diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/OrcFileMergeOperator.java ql/src/java/org/apache/hadoop/hive/ql/exec/OrcFileMergeOperator.java
index 5ee8e32..bad88d3 100644
--- ql/src/java/org/apache/hadoop/hive/ql/exec/OrcFileMergeOperator.java
+++ ql/src/java/org/apache/hadoop/hive/ql/exec/OrcFileMergeOperator.java
@@ -33,7 +33,6 @@
 import org.apache.hadoop.hive.shims.CombineHiveKey;
 
 import java.io.IOException;
-import java.util.List;
 
 /**
  * Fast file merge operator for ORC files.
@@ -47,7 +46,7 @@
   // not be merged.
   CompressionKind compression = null;
   long compressBuffSize = 0;
-  List<Integer> version;
+  OrcFile.Version version;
   int columnCount = 0;
   int rowIndexStride = 0;
 
@@ -90,13 +89,16 @@ private void processKeyValuePairs(Object key, Object value)
       if (outWriter == null) {
         compression = k.getCompression();
         compressBuffSize = k.getCompressBufferSize();
-        version = k.getVersionList();
+        version = k.getVersion();
         columnCount = k.getTypes().get(0).getSubtypesCount();
         rowIndexStride = k.getRowIndexStride();
 
         // block size and stripe size will be from config
         outWriter = OrcFile.createWriter(outPath,
-            OrcFile.writerOptions(jc).compress(compression)
+            OrcFile.writerOptions(jc)
+                .compress(compression)
+                .version(version)
+                .rowIndexStride(rowIndexStride)
                 .inspector(reader.getObjectInspector()));
         LOG.info("ORC merge file output path: " + outPath);
       }
@@ -167,7 +169,7 @@ private boolean checkCompatibility(OrcFileKeyWrapper k) {
     }
 
-    if (!k.getVersionList().equals(version)) {
+    if (!k.getVersion().equals(version)) {
       LOG.info("Incompatible ORC file merge! Version does not match for "
           + k.getInputPath());
       return false;
     }
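Note on OrcFileMergeOperator: previously only the compression codec was copied from the first input file, so a merged file could be written with a different ORC writer version and row index stride than the stripes it absorbed. The hunks above also propagate those two properties into the output writer. A minimal sketch of the resulting writer construction, reusing the names from the hunk (illustration only, not part of the patch):

    // "k" is the OrcFileKeyWrapper of the first key processed; "jc", "outPath"
    // and "reader" come from the operator, exactly as in the hunk above.
    outWriter = OrcFile.createWriter(outPath,
        OrcFile.writerOptions(jc)
            .compress(k.getCompression())          // codec, e.g. ZLIB or SNAPPY
            .version(k.getVersion())               // writer version of the inputs
            .rowIndexStride(k.getRowIndexStride()) // rows per row-index entry
            .inspector(reader.getObjectInspector()));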
diff --git ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcFileKeyWrapper.java ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcFileKeyWrapper.java
index 6eb0e22..11f05c6 100644
--- ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcFileKeyWrapper.java
+++ ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcFileKeyWrapper.java
@@ -18,15 +18,14 @@
 package org.apache.hadoop.hive.ql.io.orc;
 
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.WritableComparable;
+
 import java.io.DataInput;
 import java.io.DataOutput;
 import java.io.IOException;
 import java.util.List;
 
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hive.ql.io.orc.OrcProto;
-import org.apache.hadoop.io.WritableComparable;
-
 /**
  * Key for OrcFileMergeMapper task. Contains orc file related information that
  * should match before merging two orc files.
  */
@@ -38,14 +37,14 @@
   protected long compressBufferSize;
   protected List<OrcProto.Type> types;
   protected int rowIndexStride;
-  protected List<Integer> versionList;
+  protected OrcFile.Version version;
 
-  public List<Integer> getVersionList() {
-    return versionList;
+  public OrcFile.Version getVersion() {
+    return version;
   }
 
-  public void setVersionList(List<Integer> versionList) {
-    this.versionList = versionList;
+  public void setVersion(OrcFile.Version version) {
+    this.version = version;
   }
 
   public int getRowIndexStride() {
diff --git ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcFileStripeMergeRecordReader.java ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcFileStripeMergeRecordReader.java
index db5bbb5..603b75a 100644
--- ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcFileStripeMergeRecordReader.java
+++ ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcFileStripeMergeRecordReader.java
@@ -18,16 +18,16 @@
 package org.apache.hadoop.hive.ql.io.orc;
 
-import java.io.IOException;
-import java.util.Iterator;
-import java.util.List;
-
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.mapred.FileSplit;
 import org.apache.hadoop.mapred.RecordReader;
 
+import java.io.IOException;
+import java.util.Iterator;
+import java.util.List;
+
 public class OrcFileStripeMergeRecordReader implements
     RecordReader<OrcFileKeyWrapper, OrcFileValueWrapper> {
 
@@ -88,7 +88,7 @@ protected boolean nextStripe(OrcFileKeyWrapper keyWrapper, OrcFileValueWrapper v
       keyWrapper.setInputPath(path);
       keyWrapper.setCompression(reader.getCompression());
       keyWrapper.setCompressBufferSize(reader.getCompressionSize());
-      keyWrapper.setVersionList(((ReaderImpl) reader).getFileMetaInfo().versionList);
+      keyWrapper.setVersion(reader.getFileVersion());
       keyWrapper.setRowIndexStride(reader.getRowIndexStride());
       keyWrapper.setTypes(reader.getTypes());
     } else {
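Note on OrcFileKeyWrapper and OrcFileStripeMergeRecordReader: the key now carries the parsed OrcFile.Version instead of the raw protobuf version list, which lets the record reader use the public Reader.getFileVersion() accessor rather than reaching through ReaderImpl into the file metadata. For context, a condensed sketch of the file-level checks that OrcFileMergeOperator.checkCompatibility performs before fast-merging a candidate file; the real method tests the properties one at a time so it can log which one mismatched, so read the exact list from that method, not from this sketch:

    // "k" describes the candidate file; the bare fields hold the values
    // captured from the first input file (see the operator's fields above).
    boolean compatible =
        k.getTypes().get(0).getSubtypesCount() == columnCount
        && k.getCompression() == compression
        && k.getCompressBufferSize() == compressBuffSize
        && k.getVersion().equals(version)        // changed by this patch
        && k.getRowIndexStride() == rowIndexStride;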
diff --git ql/src/java/org/apache/hadoop/hive/ql/io/orc/WriterImpl.java ql/src/java/org/apache/hadoop/hive/ql/io/orc/WriterImpl.java
index 9e69de6..24da301 100644
--- ql/src/java/org/apache/hadoop/hive/ql/io/orc/WriterImpl.java
+++ ql/src/java/org/apache/hadoop/hive/ql/io/orc/WriterImpl.java
@@ -18,21 +18,13 @@
 package org.apache.hadoop.hive.ql.io.orc;
 
-import java.io.IOException;
-import java.io.OutputStream;
-import java.lang.management.ManagementFactory;
-import java.nio.ByteBuffer;
-import java.sql.Timestamp;
-import java.util.ArrayList;
-import java.util.EnumSet;
-import java.util.List;
-import java.util.Map;
-import java.util.TreeMap;
+import static com.google.common.base.Preconditions.checkArgument;
 
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.collect.Lists;
 import com.google.protobuf.ByteString;
 import com.google.protobuf.CodedOutputStream;
+
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
@@ -84,12 +76,11 @@
 import java.nio.ByteBuffer;
 import java.sql.Timestamp;
 import java.util.ArrayList;
+import java.util.EnumSet;
 import java.util.List;
 import java.util.Map;
 import java.util.TreeMap;
 
-import static com.google.common.base.Preconditions.checkArgument;
-
 /**
  * An ORC file writer. The file is divided into stripes, which is the natural
  * unit of work when reading. Each stripe is buffered in memory until the
@@ -2387,13 +2378,23 @@ public void appendStripe(byte[] stripe, int offset, int length,
 
   private void updateFileStatistics(OrcProto.StripeStatistics stripeStatistics) {
     List<OrcProto.ColumnStatistics> cs = stripeStatistics.getColStatsList();
+    List<TreeWriter> allWriters = getAllColumnTreeWriters(treeWriter);
+    for (int i = 0; i < allWriters.size(); i++) {
+      allWriters.get(i).fileStatistics.merge(ColumnStatisticsImpl.deserialize(cs.get(i)));
+    }
+  }
+
+  private List<TreeWriter> getAllColumnTreeWriters(TreeWriter rootTreeWriter) {
+    List<TreeWriter> result = Lists.newArrayList();
+    getAllColumnTreeWritersImpl(rootTreeWriter, result);
+    return result;
+  }
 
-    // root element
-    treeWriter.fileStatistics.merge(ColumnStatisticsImpl.deserialize(cs.get(0)));
-    TreeWriter[] childWriters = treeWriter.getChildrenWriters();
-    for (int i = 0; i < childWriters.length; i++) {
-      childWriters[i].fileStatistics.merge(
-          ColumnStatisticsImpl.deserialize(cs.get(i + 1)));
+  private void getAllColumnTreeWritersImpl(TreeWriter tw,
+      List<TreeWriter> result) {
+    result.add(tw);
+    for (TreeWriter child : tw.childrenWriters) {
+      getAllColumnTreeWritersImpl(child, result);
     }
   }
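Note on WriterImpl.updateFileStatistics: OrcProto.StripeStatistics carries one ColumnStatistics entry per column id, and ORC assigns column ids in pre-order over the type tree (the root struct is column 0, followed by each field's entire subtree before the next field). The old code merged entry 0 into the root writer and entry i+1 into direct child i, which is only correct for flat, all-primitive schemas; with a nested top-level column (map, list, struct, union) the later entries landed on the wrong writers and the nested children were never updated. The new getAllColumnTreeWriters walks the writer tree in the same pre-order as the column ids. A self-contained illustration of that numbering; Node is a hypothetical stand-in for TreeWriter and is not part of the patch:

    import java.util.ArrayList;
    import java.util.List;

    public class PreOrderDemo {
      static class Node {
        final String label;
        final List<Node> children = new ArrayList<Node>();
        Node(String label) { this.label = label; }
      }

      // Parent first, then each child's whole subtree: the same order in which
      // ORC numbers columns and getAllColumnTreeWritersImpl visits writers.
      static void flatten(Node n, List<Node> out) {
        out.add(n);
        for (Node c : n.children) {
          flatten(c, out);
        }
      }

      public static void main(String[] args) {
        // Schema: struct<a:int, b:map<string,int>>
        Node root = new Node("struct");              // column 0
        Node a = new Node("a:int");                  // column 1
        Node b = new Node("b:map");                  // column 2
        b.children.add(new Node("map key:string"));  // column 3
        b.children.add(new Node("map value:int"));   // column 4
        root.children.add(a);
        root.children.add(b);

        List<Node> order = new ArrayList<Node>();
        flatten(root, order);
        for (int i = 0; i < order.size(); i++) {
          System.out.println(i + ": " + order.get(i).label);
        }
        // Prints columns 0..4 in the order listed above. The old loop stopped
        // at depth one, so columns 3 and 4 were never merged here.
      }
    }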
diff --git ql/src/test/queries/clientpositive/orc_merge8.q ql/src/test/queries/clientpositive/orc_merge8.q
new file mode 100644
index 0000000..61ea4bf
--- /dev/null
+++ ql/src/test/queries/clientpositive/orc_merge8.q
@@ -0,0 +1,46 @@
+create table if not exists alltypes (
+ bo boolean,
+ ti tinyint,
+ si smallint,
+ i int,
+ bi bigint,
+ f float,
+ d double,
+ de decimal(10,3),
+ ts timestamp,
+ da date,
+ s string,
+ c char(5),
+ vc varchar(5),
+ m map<string,string>,
+ l array<int>,
+ st struct<c1:int,c2:string>
+) row format delimited fields terminated by '|'
+collection items terminated by ','
+map keys terminated by ':' stored as textfile;
+
+create table alltypes_orc like alltypes;
+alter table alltypes_orc set fileformat orc;
+
+load data local inpath '../../data/files/alltypes2.txt' overwrite into table alltypes;
+
+SET hive.input.format=org.apache.hadoop.hive.ql.io.HiveInputFormat;
+SET hive.optimize.index.filter=true;
+set hive.merge.orcfile.stripe.level=false;
+set hive.merge.tezfiles=false;
+set hive.merge.mapfiles=false;
+set hive.merge.mapredfiles=false;
+
+insert overwrite table alltypes_orc select * from alltypes;
+insert into table alltypes_orc select * from alltypes;
+
+dfs -ls ${hiveconf:hive.metastore.warehouse.dir}/alltypes_orc/;
+
+set hive.merge.orcfile.stripe.level=true;
+set hive.merge.tezfiles=true;
+set hive.merge.mapfiles=true;
+set hive.merge.mapredfiles=true;
+
+alter table alltypes_orc concatenate;
+
+dfs -ls ${hiveconf:hive.metastore.warehouse.dir}/alltypes_orc/;
diff --git ql/src/test/results/clientpositive/orc_merge8.q.out ql/src/test/results/clientpositive/orc_merge8.q.out
new file mode 100644
index 0000000..f4f4b4a
--- /dev/null
+++ ql/src/test/results/clientpositive/orc_merge8.q.out
@@ -0,0 +1,130 @@
+PREHOOK: query: create table if not exists alltypes (
+ bo boolean,
+ ti tinyint,
+ si smallint,
+ i int,
+ bi bigint,
+ f float,
+ d double,
+ de decimal(10,3),
+ ts timestamp,
+ da date,
+ s string,
+ c char(5),
+ vc varchar(5),
+ m map<string,string>,
+ l array<int>,
+ st struct<c1:int,c2:string>
+) row format delimited fields terminated by '|'
+collection items terminated by ','
+map keys terminated by ':' stored as textfile
+PREHOOK: type: CREATETABLE
+PREHOOK: Output: database:default
+PREHOOK: Output: default@alltypes
+POSTHOOK: query: create table if not exists alltypes (
+ bo boolean,
+ ti tinyint,
+ si smallint,
+ i int,
+ bi bigint,
+ f float,
+ d double,
+ de decimal(10,3),
+ ts timestamp,
+ da date,
+ s string,
+ c char(5),
+ vc varchar(5),
+ m map<string,string>,
+ l array<int>,
+ st struct<c1:int,c2:string>
+) row format delimited fields terminated by '|'
+collection items terminated by ','
+map keys terminated by ':' stored as textfile
+POSTHOOK: type: CREATETABLE
+POSTHOOK: Output: database:default
+POSTHOOK: Output: default@alltypes
+PREHOOK: query: create table alltypes_orc like alltypes
+PREHOOK: type: CREATETABLE
+PREHOOK: Output: database:default
+PREHOOK: Output: default@alltypes_orc
+POSTHOOK: query: create table alltypes_orc like alltypes
+POSTHOOK: type: CREATETABLE
+POSTHOOK: Output: database:default
+POSTHOOK: Output: default@alltypes_orc
+PREHOOK: query: alter table alltypes_orc set fileformat orc
+PREHOOK: type: ALTERTABLE_FILEFORMAT
+PREHOOK: Input: default@alltypes_orc
+PREHOOK: Output: default@alltypes_orc
+POSTHOOK: query: alter table alltypes_orc set fileformat orc
+POSTHOOK: type: ALTERTABLE_FILEFORMAT
+POSTHOOK: Input: default@alltypes_orc
+POSTHOOK: Output: default@alltypes_orc
+PREHOOK: query: load data local inpath '../../data/files/alltypes2.txt' overwrite into table alltypes
+PREHOOK: type: LOAD
+#### A masked pattern was here ####
+PREHOOK: Output: default@alltypes
+POSTHOOK: query: load data local inpath '../../data/files/alltypes2.txt' overwrite into table alltypes
+POSTHOOK: type: LOAD
+#### A masked pattern was here ####
+POSTHOOK: Output: default@alltypes
+PREHOOK: query: insert overwrite table alltypes_orc select * from alltypes
+PREHOOK: type: QUERY
+PREHOOK: Input: default@alltypes
+PREHOOK: Output: default@alltypes_orc
+POSTHOOK: query: insert overwrite table alltypes_orc select * from alltypes
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@alltypes
+POSTHOOK: Output: default@alltypes_orc
+POSTHOOK: Lineage: alltypes_orc.bi SIMPLE [(alltypes)alltypes.FieldSchema(name:bi, type:bigint, comment:null), ]
+POSTHOOK: Lineage: alltypes_orc.bo SIMPLE [(alltypes)alltypes.FieldSchema(name:bo, type:boolean, comment:null), ]
+POSTHOOK: Lineage: alltypes_orc.c SIMPLE [(alltypes)alltypes.FieldSchema(name:c, type:char(5), comment:null), ]
+POSTHOOK: Lineage: alltypes_orc.d SIMPLE [(alltypes)alltypes.FieldSchema(name:d, type:double, comment:null), ]
+POSTHOOK: Lineage: alltypes_orc.da SIMPLE [(alltypes)alltypes.FieldSchema(name:da, type:date, comment:null), ]
+POSTHOOK: Lineage: alltypes_orc.de SIMPLE [(alltypes)alltypes.FieldSchema(name:de, type:decimal(10,3), comment:null), ]
+POSTHOOK: Lineage: alltypes_orc.f SIMPLE [(alltypes)alltypes.FieldSchema(name:f, type:float, comment:null), ]
+POSTHOOK: Lineage: alltypes_orc.i SIMPLE [(alltypes)alltypes.FieldSchema(name:i, type:int, comment:null), ]
+POSTHOOK: Lineage: alltypes_orc.l SIMPLE [(alltypes)alltypes.FieldSchema(name:l, type:array<int>, comment:null), ]
+POSTHOOK: Lineage: alltypes_orc.m SIMPLE [(alltypes)alltypes.FieldSchema(name:m, type:map<string,string>, comment:null), ]
+POSTHOOK: Lineage: alltypes_orc.s SIMPLE [(alltypes)alltypes.FieldSchema(name:s, type:string, comment:null), ]
+POSTHOOK: Lineage: alltypes_orc.si SIMPLE [(alltypes)alltypes.FieldSchema(name:si, type:smallint, comment:null), ]
+POSTHOOK: Lineage: alltypes_orc.st SIMPLE [(alltypes)alltypes.FieldSchema(name:st, type:struct<c1:int,c2:string>, comment:null), ]
+POSTHOOK: Lineage: alltypes_orc.ti SIMPLE [(alltypes)alltypes.FieldSchema(name:ti, type:tinyint, comment:null), ]
+POSTHOOK: Lineage: alltypes_orc.ts SIMPLE [(alltypes)alltypes.FieldSchema(name:ts, type:timestamp, comment:null), ]
+POSTHOOK: Lineage: alltypes_orc.vc SIMPLE [(alltypes)alltypes.FieldSchema(name:vc, type:varchar(5), comment:null), ]
+PREHOOK: query: insert into table alltypes_orc select * from alltypes
+PREHOOK: type: QUERY
+PREHOOK: Input: default@alltypes
+PREHOOK: Output: default@alltypes_orc
+POSTHOOK: query: insert into table alltypes_orc select * from alltypes
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@alltypes
+POSTHOOK: Output: default@alltypes_orc
+POSTHOOK: Lineage: alltypes_orc.bi SIMPLE [(alltypes)alltypes.FieldSchema(name:bi, type:bigint, comment:null), ]
+POSTHOOK: Lineage: alltypes_orc.bo SIMPLE [(alltypes)alltypes.FieldSchema(name:bo, type:boolean, comment:null), ]
+POSTHOOK: Lineage: alltypes_orc.c SIMPLE [(alltypes)alltypes.FieldSchema(name:c, type:char(5), comment:null), ]
+POSTHOOK: Lineage: alltypes_orc.d SIMPLE [(alltypes)alltypes.FieldSchema(name:d, type:double, comment:null), ]
+POSTHOOK: Lineage: alltypes_orc.da SIMPLE [(alltypes)alltypes.FieldSchema(name:da, type:date, comment:null), ]
+POSTHOOK: Lineage: alltypes_orc.de SIMPLE [(alltypes)alltypes.FieldSchema(name:de, type:decimal(10,3), comment:null), ]
+POSTHOOK: Lineage: alltypes_orc.f SIMPLE [(alltypes)alltypes.FieldSchema(name:f, type:float, comment:null), ]
+POSTHOOK: Lineage: alltypes_orc.i SIMPLE [(alltypes)alltypes.FieldSchema(name:i, type:int, comment:null), ]
+POSTHOOK: Lineage: alltypes_orc.l SIMPLE [(alltypes)alltypes.FieldSchema(name:l, type:array<int>, comment:null), ]
+POSTHOOK: Lineage: alltypes_orc.m SIMPLE [(alltypes)alltypes.FieldSchema(name:m, type:map<string,string>, comment:null), ]
+POSTHOOK: Lineage: alltypes_orc.s SIMPLE [(alltypes)alltypes.FieldSchema(name:s, type:string, comment:null), ]
+POSTHOOK: Lineage: alltypes_orc.si SIMPLE [(alltypes)alltypes.FieldSchema(name:si, type:smallint, comment:null), ]
+POSTHOOK: Lineage: alltypes_orc.st SIMPLE [(alltypes)alltypes.FieldSchema(name:st, type:struct<c1:int,c2:string>, comment:null), ]
+POSTHOOK: Lineage: alltypes_orc.ti SIMPLE [(alltypes)alltypes.FieldSchema(name:ti, type:tinyint, comment:null), ]
+POSTHOOK: Lineage: alltypes_orc.ts SIMPLE [(alltypes)alltypes.FieldSchema(name:ts, type:timestamp, comment:null), ]
+POSTHOOK: Lineage: alltypes_orc.vc SIMPLE [(alltypes)alltypes.FieldSchema(name:vc, type:varchar(5), comment:null), ]
+Found 2 items
+#### A masked pattern was here ####
+PREHOOK: query: alter table alltypes_orc concatenate
+PREHOOK: type: ALTER_TABLE_MERGE
+PREHOOK: Input: default@alltypes_orc
+PREHOOK: Output: default@alltypes_orc
+POSTHOOK: query: alter table alltypes_orc concatenate
+POSTHOOK: type: ALTER_TABLE_MERGE
+POSTHOOK: Input: default@alltypes_orc
+POSTHOOK: Output: default@alltypes_orc
+Found 1 items
+#### A masked pattern was here ####