commit d75c29cc7d6b05fdc90808fc96e8d700a295420e Author: Owen O'Malley Date: Fri Sep 4 16:11:13 2015 -0700 HIVE-4243. Fix column names in ORC metadata. diff --git ql/src/java/org/apache/hadoop/hive/ql/io/orc/ColumnStatisticsImpl.java ql/src/java/org/apache/hadoop/hive/ql/io/orc/ColumnStatisticsImpl.java index 15a3e2c..f39d3e2 100644 --- ql/src/java/org/apache/hadoop/hive/ql/io/orc/ColumnStatisticsImpl.java +++ ql/src/java/org/apache/hadoop/hive/ql/io/orc/ColumnStatisticsImpl.java @@ -22,8 +22,6 @@ import org.apache.hadoop.hive.common.type.HiveDecimal; import org.apache.hadoop.hive.serde2.io.DateWritable; -import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector; -import org.apache.hadoop.hive.serde2.objectinspector.PrimitiveObjectInspector; import org.apache.hadoop.io.BytesWritable; import org.apache.hadoop.io.Text; @@ -964,35 +962,30 @@ public String toString() { return builder; } - static ColumnStatisticsImpl create(ObjectInspector inspector) { - switch (inspector.getCategory()) { - case PRIMITIVE: - switch (((PrimitiveObjectInspector) inspector).getPrimitiveCategory()) { - case BOOLEAN: - return new BooleanStatisticsImpl(); - case BYTE: - case SHORT: - case INT: - case LONG: - return new IntegerStatisticsImpl(); - case FLOAT: - case DOUBLE: - return new DoubleStatisticsImpl(); - case STRING: - case CHAR: - case VARCHAR: - return new StringStatisticsImpl(); - case DECIMAL: - return new DecimalStatisticsImpl(); - case DATE: - return new DateStatisticsImpl(); - case TIMESTAMP: - return new TimestampStatisticsImpl(); - case BINARY: - return new BinaryStatisticsImpl(); - default: - return new ColumnStatisticsImpl(); - } + static ColumnStatisticsImpl create(TypeDescription schema) { + switch (schema.getCategory()) { + case BOOLEAN: + return new BooleanStatisticsImpl(); + case BYTE: + case SHORT: + case INT: + case LONG: + return new IntegerStatisticsImpl(); + case FLOAT: + case DOUBLE: + return new DoubleStatisticsImpl(); + case STRING: + case CHAR: + case VARCHAR: + return new StringStatisticsImpl(); + case DECIMAL: + return new DecimalStatisticsImpl(); + case DATE: + return new DateStatisticsImpl(); + case TIMESTAMP: + return new TimestampStatisticsImpl(); + case BINARY: + return new BinaryStatisticsImpl(); default: return new ColumnStatisticsImpl(); } diff --git ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcFile.java ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcFile.java index a60ebb4..cea684a 100644 --- ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcFile.java +++ ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcFile.java @@ -25,6 +25,7 @@ import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector; +import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoUtils; /** * Contains factory methods to read or write ORC files. @@ -205,7 +206,8 @@ public static Reader createReader(Path path, public static class WriterOptions { private final Configuration configuration; private FileSystem fileSystemValue = null; - private ObjectInspector inspectorValue = null; + private TypeDescription schema = null; + private ObjectInspector inspector = null; private long stripeSizeValue; private long blockSizeValue; private int rowIndexStrideValue; @@ -359,7 +361,19 @@ public WriterOptions compress(CompressionKind value) { * to determine the schema for the file. 
*/ public WriterOptions inspector(ObjectInspector value) { - inspectorValue = value; + this.inspector = value; + this.schema = OrcOutputFormat.convertTypeInfo( + TypeInfoUtils.getTypeInfoFromObjectInspector(value)); + return this; + } + + /** + * Set the schema for the file. This is a required parameter. + * @param schema the schema for the file. + * @return this + */ + public WriterOptions setSchema(TypeDescription schema) { + this.schema = schema; return this; } @@ -426,8 +440,8 @@ public static Writer createWriter(Path path, FileSystem fs = opts.fileSystemValue == null ? path.getFileSystem(opts.configuration) : opts.fileSystemValue; - return new WriterImpl(fs, path, opts.configuration, opts.inspectorValue, - opts.stripeSizeValue, opts.compressValue, + return new WriterImpl(fs, path, opts.configuration, opts.schema, + opts.inspector, opts.stripeSizeValue, opts.compressValue, opts.bufferSizeValue, opts.rowIndexStrideValue, opts.memoryManagerValue, opts.blockPaddingValue, opts.versionValue, opts.callback, diff --git ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcNewOutputFormat.java ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcNewOutputFormat.java index 95e95c6..9ed4d51 100644 --- ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcNewOutputFormat.java +++ ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcNewOutputFormat.java @@ -50,7 +50,7 @@ public void write(NullWritable key, OrcSerdeRow row) options.inspector(row.getInspector()); writer = OrcFile.createWriter(path, options); } - writer.addRow(row.getRow()); + writer.addRow(row.getRow(), row.getInspector()); } @Override diff --git ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcOutputFormat.java ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcOutputFormat.java index ea4ebb4..1f4df9a 100644 --- ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcOutputFormat.java +++ ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcOutputFormat.java @@ -20,22 +20,32 @@ import java.io.IOException; import java.io.PrintStream; import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; import java.util.Properties; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.hive.ql.io.AcidOutputFormat; import org.apache.hadoop.hive.ql.io.AcidUtils; +import org.apache.hadoop.hive.ql.io.IOConstants; import org.apache.hadoop.hive.ql.io.RecordUpdater; import org.apache.hadoop.hive.ql.io.StatsProvidingRecordWriter; -import org.apache.hadoop.hive.ql.io.orc.OrcFile.EncodingStrategy; import org.apache.hadoop.hive.ql.io.orc.OrcSerde.OrcSerdeRow; import org.apache.hadoop.hive.serde2.SerDeStats; import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector; -import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorFactory; import org.apache.hadoop.hive.serde2.objectinspector.PrimitiveObjectInspector; import org.apache.hadoop.hive.serde2.objectinspector.StructField; import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector; +import org.apache.hadoop.hive.serde2.typeinfo.BaseCharTypeInfo; +import org.apache.hadoop.hive.serde2.typeinfo.DecimalTypeInfo; +import org.apache.hadoop.hive.serde2.typeinfo.ListTypeInfo; +import org.apache.hadoop.hive.serde2.typeinfo.MapTypeInfo; +import org.apache.hadoop.hive.serde2.typeinfo.PrimitiveTypeInfo; +import org.apache.hadoop.hive.serde2.typeinfo.StructTypeInfo; +import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo; +import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoUtils; +import org.apache.hadoop.hive.serde2.typeinfo.UnionTypeInfo; import 
org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.NullWritable; @@ -55,54 +65,55 @@ private static class OrcRecordWriter implements RecordWriter, StatsProvidingRecordWriter { - private Writer writer = null; - private final Path path; - private final OrcFile.WriterOptions options; + private Writer writer; private final SerDeStats stats; + private final Path path; + private final OrcFile.WriterOptions opts; - OrcRecordWriter(Path path, OrcFile.WriterOptions options) { + OrcRecordWriter(Path path, + OrcFile.WriterOptions options) throws IOException { + this.stats = new SerDeStats(); this.path = path; - this.options = options; + opts = options; + writer = OrcFile.createWriter(path, options); + } + + OrcRecordWriter(Path path, + JobConf conf) throws IOException { this.stats = new SerDeStats(); + this.path = path; + opts = OrcFile.writerOptions(conf); } @Override public void write(NullWritable nullWritable, OrcSerdeRow row) throws IOException { if (writer == null) { - options.inspector(row.getInspector()); - writer = OrcFile.createWriter(path, options); + opts.inspector(row.getInspector()); + writer = OrcFile.createWriter(path, opts); } - writer.addRow(row.getRow()); + writer.addRow(row.getRow(), row.getInspector()); } @Override public void write(Writable row) throws IOException { OrcSerdeRow serdeRow = (OrcSerdeRow) row; - if (writer == null) { - options.inspector(serdeRow.getInspector()); - writer = OrcFile.createWriter(path, options); - } - writer.addRow(serdeRow.getRow()); + writer.addRow(serdeRow.getRow(), serdeRow.getInspector()); } @Override public void close(Reporter reporter) throws IOException { + if (writer == null) { + TypeDescription schema = TypeDescription.createStruct(); + schema.addField("dummy", TypeDescription.createString()); + opts.setSchema(schema); + writer = OrcFile.createWriter(path, opts); + } close(true); } @Override public void close(boolean b) throws IOException { - // if we haven't written any rows, we need to create a file with a - // generic schema. - if (writer == null) { - // a row with no columns - ObjectInspector inspector = ObjectInspectorFactory. 
- getStandardStructObjectInspector(new ArrayList(), - new ArrayList()); - options.inspector(inspector); - writer = OrcFile.createWriter(path, options); - } writer.close(); } @@ -114,8 +125,83 @@ public SerDeStats getStats() { } } - private OrcFile.WriterOptions getOptions(JobConf conf, Properties props) { - return OrcFile.writerOptions(props, conf); + static TypeDescription convertTypeInfo(TypeInfo info) { + switch (info.getCategory()) { + case PRIMITIVE: { + PrimitiveTypeInfo pinfo = (PrimitiveTypeInfo) info; + switch (pinfo.getPrimitiveCategory()) { + case BOOLEAN: + return TypeDescription.createBoolean(); + case BYTE: + return TypeDescription.createByte(); + case SHORT: + return TypeDescription.createShort(); + case INT: + return TypeDescription.createInt(); + case LONG: + return TypeDescription.createLong(); + case FLOAT: + return TypeDescription.createFloat(); + case DOUBLE: + return TypeDescription.createDouble(); + case STRING: + return TypeDescription.createString(); + case DATE: + return TypeDescription.createDate(); + case TIMESTAMP: + return TypeDescription.createTimestamp(); + case BINARY: + return TypeDescription.createBinary(); + case DECIMAL: { + DecimalTypeInfo dinfo = (DecimalTypeInfo) pinfo; + return TypeDescription.createDecimal(dinfo.getPrecision(), + dinfo.getScale()); + } + case VARCHAR: { + BaseCharTypeInfo cinfo = (BaseCharTypeInfo) pinfo; + return TypeDescription.createVarchar(cinfo.getLength()); + } + case CHAR: { + BaseCharTypeInfo cinfo = (BaseCharTypeInfo) pinfo; + return TypeDescription.createChar(cinfo.getLength()); + } + default: + throw new IllegalArgumentException("ORC doesn't handle primitive" + + " category " + pinfo.getPrimitiveCategory()); + } + } + case LIST: { + ListTypeInfo linfo = (ListTypeInfo) info; + return TypeDescription.createList + (convertTypeInfo(linfo.getListElementTypeInfo())); + } + case MAP: { + MapTypeInfo minfo = (MapTypeInfo) info; + return TypeDescription.createMap + (convertTypeInfo(minfo.getMapKeyTypeInfo()), + convertTypeInfo(minfo.getMapValueTypeInfo())); + } + case UNION: { + UnionTypeInfo minfo = (UnionTypeInfo) info; + TypeDescription result = TypeDescription.createUnion(); + for (TypeInfo child: minfo.getAllUnionObjectTypeInfos()) { + result.addUnionChild(convertTypeInfo(child)); + } + return result; + } + case STRUCT: { + StructTypeInfo sinfo = (StructTypeInfo) info; + TypeDescription result = TypeDescription.createStruct(); + for(String fieldName: sinfo.getAllStructFieldNames()) { + result.addField(fieldName, + convertTypeInfo(sinfo.getStructFieldTypeInfo(fieldName))); + } + return result; + } + default: + throw new IllegalArgumentException("ORC doesn't handle " + + info.getCategory()); + } } @Override @@ -123,7 +209,7 @@ public SerDeStats getStats() { getRecordWriter(FileSystem fileSystem, JobConf conf, String name, Progressable reporter) throws IOException { return new - OrcRecordWriter(new Path(name), getOptions(conf,null)); + OrcRecordWriter(new Path(name), conf); } @@ -135,7 +221,33 @@ public SerDeStats getStats() { boolean isCompressed, Properties tableProperties, Progressable reporter) throws IOException { - return new OrcRecordWriter(path, getOptions(conf,tableProperties)); + OrcFile.WriterOptions opts = OrcFile.writerOptions(tableProperties, conf); + final String columnNameProperty = + tableProperties.getProperty(IOConstants.COLUMNS); + final String columnTypeProperty = + tableProperties.getProperty(IOConstants.COLUMNS_TYPES); + List columnNames; + List columnTypes; + + if (columnNameProperty.length() == 0) { + 
columnNames = new ArrayList(); + } else { + columnNames = Arrays.asList(columnNameProperty.split(",")); + } + + if (columnTypeProperty.length() == 0) { + columnTypes = new ArrayList(); + } else { + columnTypes = TypeInfoUtils.getTypeInfosFromTypeString(columnTypeProperty); + } + + TypeDescription schema = TypeDescription.createStruct(); + for(int i=0; i < columnNames.size(); ++i) { + schema.addField(columnNames.get(i), + convertTypeInfo(columnTypes.get(i))); + } + opts.setSchema(schema); + return new OrcRecordWriter(path, opts); } private class DummyOrcRecordUpdater implements RecordUpdater { @@ -230,7 +342,7 @@ public RecordUpdater getRecordUpdater(Path path, @Override public org.apache.hadoop.hive.ql.exec.FileSinkOperator.RecordWriter getRawRecordWriter(Path path, - Options options) throws IOException { + final Options options) throws IOException { final Path filename = AcidUtils.createFilename(path, options); final OrcFile.WriterOptions opts = OrcFile.writerOptions(options.getConfiguration()); @@ -256,7 +368,7 @@ public void write(Writable w) throws IOException { orc.getFieldValue(OrcRecordUpdater.ORIGINAL_TRANSACTION)).get(), ((IntWritable) orc.getFieldValue(OrcRecordUpdater.BUCKET)).get(), ((LongWritable) orc.getFieldValue(OrcRecordUpdater.ROW_ID)).get()); - writer.addRow(w); + writer.addRow(w, options.getInspector()); } @Override diff --git ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcRecordUpdater.java ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcRecordUpdater.java index 2220b8e..4256e7f 100644 --- ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcRecordUpdater.java +++ ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcRecordUpdater.java @@ -41,6 +41,7 @@ import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector; import org.apache.hadoop.hive.serde2.objectinspector.primitive.LongObjectInspector; import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory; +import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoUtils; import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.LongWritable; @@ -88,6 +89,7 @@ private final LongWritable originalTransaction = new LongWritable(-1); private final IntWritable bucket = new IntWritable(); private final LongWritable rowId = new LongWritable(); + private final ObjectInspector inspector; private long insertedRows = 0; private long rowIdOffset = 0; // This records how many rows have been inserted or deleted. It is separate from insertedRows @@ -248,8 +250,10 @@ static StructObjectInspector createEventSchema(ObjectInspector rowInspector) { writerOptions.stripeSize(DELTA_STRIPE_SIZE); } rowInspector = (StructObjectInspector)options.getInspector(); - writerOptions.inspector(createEventSchema(findRecId(options.getInspector(), - options.getRecordIdColumn()))); + inspector = createEventSchema(findRecId(options.getInspector(), + options.getRecordIdColumn())); + writerOptions.setSchema(OrcOutputFormat.convertTypeInfo( + TypeInfoUtils.getTypeInfoFromObjectInspector(inspector))); this.writer = OrcFile.createWriter(this.path, writerOptions); item = new OrcStruct(FIELDS); item.setFieldValue(OPERATION, operation); @@ -342,7 +346,7 @@ else if(operation == INSERT_OPERATION) { this.originalTransaction.set(originalTransaction); item.setFieldValue(OrcRecordUpdater.ROW, (operation == DELETE_OPERATION ? 
null : row)); indexBuilder.addKey(operation, originalTransaction, bucket.get(), rowId); - writer.addRow(item); + writer.addRow(item, inspector); } @Override diff --git ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcUtils.java ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcUtils.java index db2ca15..3e2af23 100644 --- ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcUtils.java +++ ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcUtils.java @@ -18,20 +18,10 @@ package org.apache.hadoop.hive.ql.io.orc; import java.util.Arrays; -import java.util.HashMap; import java.util.List; -import java.util.Map; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; -import org.apache.hadoop.hive.serde2.objectinspector.ListObjectInspector; -import org.apache.hadoop.hive.serde2.objectinspector.MapObjectInspector; -import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector; -import org.apache.hadoop.hive.serde2.objectinspector.StructField; -import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector; -import org.apache.hadoop.hive.serde2.objectinspector.UnionObjectInspector; - -import com.google.common.collect.Lists; public class OrcUtils { private static final Log LOG = LogFactory.getLog(OrcUtils.class); @@ -49,159 +39,44 @@ * index 5 correspond to column d. After flattening list gets 2 columns. * * @param selectedColumns - comma separated list of selected column names - * @param allColumns - comma separated list of all column names - * @param inspector - object inspector + * @param schema - object schema * @return - boolean array with true value set for the specified column names */ - public static boolean[] includeColumns(String selectedColumns, String allColumns, - ObjectInspector inspector) { - int numFlattenedCols = getFlattenedColumnsCount(inspector); - boolean[] results = new boolean[numFlattenedCols]; + public static boolean[] includeColumns(String selectedColumns, + TypeDescription schema) { + int numFlattenedCols = schema.getMaximumId(); + boolean[] results = new boolean[numFlattenedCols + 1]; if ("*".equals(selectedColumns)) { Arrays.fill(results, true); return results; } - if (selectedColumns != null && !selectedColumns.isEmpty()) { - includeColumnsImpl(results, selectedColumns.toLowerCase(), allColumns, inspector); - } - return results; - } - - private static void includeColumnsImpl(boolean[] includeColumns, String selectedColumns, - String allColumns, - ObjectInspector inspector) { - Map> columnSpanMap = getColumnSpan(allColumns, inspector); - LOG.info("columnSpanMap: " + columnSpanMap); - - String[] selCols = selectedColumns.split(","); - for (String sc : selCols) { - if (columnSpanMap.containsKey(sc)) { - List colSpan = columnSpanMap.get(sc); - int start = colSpan.get(0); - int end = colSpan.get(1); - for (int i = start; i <= end; i++) { - includeColumns[i] = true; + if (selectedColumns != null && + schema.getCategory() == TypeDescription.Category.STRUCT) { + List fieldNames = schema.getFieldNames(); + List fields = schema.getChildren(); + for (String column: selectedColumns.split((","))) { + TypeDescription col = findColumn(column, fieldNames, fields); + if (col != null) { + for(int i=col.getId(); i <= col.getMaximumId(); ++i) { + results[i] = true; } } } - - LOG.info("includeColumns: " + Arrays.toString(includeColumns)); } - - private static Map> getColumnSpan(String allColumns, - ObjectInspector inspector) { - // map that contains the column span for each column. Column span is the number of columns - // required after flattening. 
For a given object inspector this map contains the start column - // id and end column id (both inclusive) after flattening. - // EXAMPLE: - // schema: struct> - // column span map for the above struct will be - // a => [1,1], b => [2,2], c => [3,5] - Map> columnSpanMap = new HashMap>(); - if (allColumns != null) { - String[] columns = allColumns.split(","); - int startIdx = 0; - int endIdx = 0; - if (inspector instanceof StructObjectInspector) { - StructObjectInspector soi = (StructObjectInspector) inspector; - List fields = soi.getAllStructFieldRefs(); - for (int i = 0; i < fields.size(); i++) { - StructField sf = fields.get(i); - - // we get the type (category) from object inspector but column name from the argument. - // The reason for this is hive (FileSinkOperator) does not pass the actual column names, - // instead it passes the internal column names (_col1,_col2). - ObjectInspector sfOI = sf.getFieldObjectInspector(); - String colName = columns[i]; - - startIdx = endIdx + 1; - switch (sfOI.getCategory()) { - case PRIMITIVE: - endIdx += 1; - break; - case STRUCT: - endIdx += 1; - StructObjectInspector structInsp = (StructObjectInspector) sfOI; - List structFields = structInsp.getAllStructFieldRefs(); - for (int j = 0; j < structFields.size(); ++j) { - endIdx += getFlattenedColumnsCount(structFields.get(j).getFieldObjectInspector()); - } - break; - case MAP: - endIdx += 1; - MapObjectInspector mapInsp = (MapObjectInspector) sfOI; - endIdx += getFlattenedColumnsCount(mapInsp.getMapKeyObjectInspector()); - endIdx += getFlattenedColumnsCount(mapInsp.getMapValueObjectInspector()); - break; - case LIST: - endIdx += 1; - ListObjectInspector listInsp = (ListObjectInspector) sfOI; - endIdx += getFlattenedColumnsCount(listInsp.getListElementObjectInspector()); - break; - case UNION: - endIdx += 1; - UnionObjectInspector unionInsp = (UnionObjectInspector) sfOI; - List choices = unionInsp.getObjectInspectors(); - for (int j = 0; j < choices.size(); ++j) { - endIdx += getFlattenedColumnsCount(choices.get(j)); - } - break; - default: - throw new IllegalArgumentException("Bad category: " + - inspector.getCategory()); - } - - columnSpanMap.put(colName, Lists.newArrayList(startIdx, endIdx)); - } - } - } - return columnSpanMap; + return results; } - /** - * Returns the number of columns after flatting complex types. 
- * - * @param inspector - object inspector - * @return - */ - public static int getFlattenedColumnsCount(ObjectInspector inspector) { - int numWriters = 0; - switch (inspector.getCategory()) { - case PRIMITIVE: - numWriters += 1; - break; - case STRUCT: - numWriters += 1; - StructObjectInspector structInsp = (StructObjectInspector) inspector; - List fields = structInsp.getAllStructFieldRefs(); - for (int i = 0; i < fields.size(); ++i) { - numWriters += getFlattenedColumnsCount(fields.get(i).getFieldObjectInspector()); - } - break; - case MAP: - numWriters += 1; - MapObjectInspector mapInsp = (MapObjectInspector) inspector; - numWriters += getFlattenedColumnsCount(mapInsp.getMapKeyObjectInspector()); - numWriters += getFlattenedColumnsCount(mapInsp.getMapValueObjectInspector()); - break; - case LIST: - numWriters += 1; - ListObjectInspector listInsp = (ListObjectInspector) inspector; - numWriters += getFlattenedColumnsCount(listInsp.getListElementObjectInspector()); - break; - case UNION: - numWriters += 1; - UnionObjectInspector unionInsp = (UnionObjectInspector) inspector; - List choices = unionInsp.getObjectInspectors(); - for (int i = 0; i < choices.size(); ++i) { - numWriters += getFlattenedColumnsCount(choices.get(i)); - } - break; - default: - throw new IllegalArgumentException("Bad category: " + - inspector.getCategory()); + private static TypeDescription findColumn(String columnName, + List fieldNames, + List fields) { + int i = 0; + for(String fieldName: fieldNames) { + if (fieldName.equalsIgnoreCase(columnName)) { + return fields.get(i); + } else { + i += 1; + } } - return numWriters; + return null; } - } diff --git ql/src/java/org/apache/hadoop/hive/ql/io/orc/TypeDescription.java ql/src/java/org/apache/hadoop/hive/ql/io/orc/TypeDescription.java new file mode 100644 index 0000000..ac489ec --- /dev/null +++ ql/src/java/org/apache/hadoop/hive/ql/io/orc/TypeDescription.java @@ -0,0 +1,379 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hive.ql.io.orc; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; + +/** + * This is the description of the types in an ORC file. 
+ */ +public class TypeDescription { + public enum Category { + BOOLEAN("boolean", true), + BYTE("tinyint", true), + SHORT("smallint", true), + INT("int", true), + LONG("bigint", true), + FLOAT("float", true), + DOUBLE("double", true), + STRING("string", true), + DATE("date", true), + TIMESTAMP("timestamp", true), + BINARY("binary", true), + DECIMAL("decimal", true), + VARCHAR("varchar", true), + CHAR("char", true), + LIST("array", false), + MAP("map", false), + STRUCT("struct", false), + UNION("union", false); + + private Category(String name, boolean isPrimitive) { + this.name = name; + this.isPrimitive = isPrimitive; + } + + final boolean isPrimitive; + final String name; + + public boolean isPrimitive() { + return isPrimitive; + } + + public String getName() { + return name; + } + } + + public static TypeDescription createBoolean() { + return new TypeDescription(Category.BOOLEAN); + } + + public static TypeDescription createByte() { + return new TypeDescription(Category.BYTE); + } + + public static TypeDescription createShort() { + return new TypeDescription(Category.SHORT); + } + + public static TypeDescription createInt() { + return new TypeDescription(Category.INT); + } + + public static TypeDescription createLong() { + return new TypeDescription(Category.LONG); + } + + public static TypeDescription createFloat() { + return new TypeDescription(Category.FLOAT); + } + + public static TypeDescription createDouble() { + return new TypeDescription(Category.DOUBLE); + } + + public static TypeDescription createString() { + return new TypeDescription(Category.STRING); + } + + public static TypeDescription createDate() { + return new TypeDescription(Category.DATE); + } + + public static TypeDescription createTimestamp() { + return new TypeDescription(Category.TIMESTAMP); + } + + public static TypeDescription createBinary() { + return new TypeDescription(Category.BINARY); + } + + public static TypeDescription createDecimal(int precision, int scale) { + if (scale < 0 || precision < 1 || precision > 38 || scale > precision) { + throw new IllegalArgumentException("Invalid decimal type scale = " + + scale + ", precision = " + precision); + } + return new TypeDescription(Category.DECIMAL, precision, scale); + } + + public static TypeDescription createVarchar(int maxLength) { + return new TypeDescription(Category.VARCHAR, maxLength); + } + + public static TypeDescription createChar(int maxLength) { + return new TypeDescription(Category.CHAR, maxLength); + } + + public static TypeDescription createList(TypeDescription childType) { + TypeDescription result = new TypeDescription(Category.LIST); + result.children.add(childType); + childType.parent = result; + return result; + } + + public static TypeDescription createMap(TypeDescription keyType, + TypeDescription valueType) { + TypeDescription result = new TypeDescription(Category.MAP); + result.children.add(keyType); + result.children.add(valueType); + keyType.parent = result; + valueType.parent = result; + return result; + } + + public static TypeDescription createUnion() { + return new TypeDescription(Category.UNION); + } + + public static TypeDescription createStruct() { + return new TypeDescription(Category.STRUCT); + } + + /** + * Add a child to a union type. + * @param child a new child type to add + * @return the union type. 
+ */ + public TypeDescription addUnionChild(TypeDescription child) { + if (category != Category.UNION) { + throw new IllegalArgumentException("Can only add types to union type" + + " and not " + category); + } + children.add(child); + child.parent = this; + return this; + } + + /** + * Add a field to a struct type as it is built. + * @param field the field name + * @param fieldType the type of the field + * @return the struct type + */ + public TypeDescription addField(String field, TypeDescription fieldType) { + if (category != Category.STRUCT) { + throw new IllegalArgumentException("Can only add fields to struct type" + + " and not " + category); + } + fieldNames.add(field); + children.add(fieldType); + fieldType.parent = this; + return this; + } + + /** + * Get the id for this type. + * The first call will cause all of the ids in the tree to be assigned, so + * it should not be called before the type is completely built. + * @return the sequential id + */ + public int getId() { + // if the id hasn't been assigned, assign all of the ids from the root + if (id == -1) { + TypeDescription root = this; + while (root.parent != null) { + root = root.parent; + } + root.assignIds(0); + } + return id; + } + + /** + * Get the maximum id assigned to this type or its children. + * The first call will cause all of the ids in the tree to be assigned, so + * it should not be called before the type is completely built. + * @return the maximum id assigned under this type + */ + public int getMaximumId() { + // if the id hasn't been assigned, assign all of the ids from the root + if (maxId == -1) { + TypeDescription root = this; + while (root.parent != null) { + root = root.parent; + } + root.assignIds(0); + } + return maxId; + } + + /** + * Get the kind of this type. + * @return the category for this type. + */ + public Category getCategory() { + return category; + } + + /** + * Get the maximum length of the type. Only used for char and varchar types. + * @return the maximum length of the string type + */ + public int getMaxLength() { + return maxLength; + } + + /** + * Get the precision of the decimal type. + * @return the number of digits for the precision. + */ + public int getPrecision() { + return precision; + } + + /** + * Get the scale of the decimal type. + * @return the number of digits for the scale. + */ + public int getScale() { + return scale; + } + + /** + * For struct types, get the list of field names. + * @return the list of field names. + */ + public List<String> getFieldNames() { + return Collections.unmodifiableList(fieldNames); + } + + /** + * Get the subtypes of this type. + * @return the list of children types + */ + public List<TypeDescription> getChildren() { + return children == null ? null : Collections.unmodifiableList(children); + } + + /** + * Assign ids to all of the nodes under this one.
+ * @param startId the lowest id to assign + * @return the next available id + */ + private int assignIds(int startId) { + id = startId++; + if (children != null) { + for (TypeDescription child : children) { + startId = child.assignIds(startId); + } + } + maxId = startId - 1; + return startId; + } + + private TypeDescription(Category category) { + this.category = category; + if (category.isPrimitive) { + children = null; + } else { + children = new ArrayList<>(); + } + if (category == Category.STRUCT) { + fieldNames = new ArrayList<>(); + } else { + fieldNames = null; + } + this.maxLength = 0; + this.precision = 0; + this.scale = 0; + } + + private TypeDescription(Category category, int precision, + int scale) { + this.category = category; + children = null; + fieldNames = null; + this.maxLength = 0; + this.precision = precision; + this.scale = scale; + } + + private TypeDescription(Category category, int maxLength) { + this.category = category; + children = null; + fieldNames = null; + this.maxLength = maxLength; + this.precision = 0; + this.scale = 0; + } + + private int id = -1; + private int maxId = -1; + private TypeDescription parent; + private final Category category; + private final List children; + private final List fieldNames; + private final int maxLength; + private final int precision; + private final int scale; + + public void printToBuffer(StringBuilder buffer) { + buffer.append(category.name); + switch (category) { + case DECIMAL: + buffer.append('('); + buffer.append(precision); + buffer.append(','); + buffer.append(scale); + buffer.append(')'); + break; + case CHAR: + case VARCHAR: + buffer.append('('); + buffer.append(maxLength); + buffer.append(')'); + break; + case LIST: + case MAP: + case UNION: + buffer.append('<'); + for(int i=0; i < children.size(); ++i) { + if (i != 0) { + buffer.append(','); + } + children.get(i).printToBuffer(buffer); + } + buffer.append('>'); + break; + case STRUCT: + buffer.append('<'); + for(int i=0; i < children.size(); ++i) { + if (i != 0) { + buffer.append(','); + } + buffer.append(fieldNames.get(i)); + buffer.append(':'); + children.get(i).printToBuffer(buffer); + } + buffer.append('>'); + break; + default: + break; + } + } + + public String toString() { + StringBuilder buffer = new StringBuilder(); + printToBuffer(buffer); + return buffer.toString(); + } +} diff --git ql/src/java/org/apache/hadoop/hive/ql/io/orc/Writer.java ql/src/java/org/apache/hadoop/hive/ql/io/orc/Writer.java index 6411e3f..e10c287 100644 --- ql/src/java/org/apache/hadoop/hive/ql/io/orc/Writer.java +++ ql/src/java/org/apache/hadoop/hive/ql/io/orc/Writer.java @@ -18,6 +18,8 @@ package org.apache.hadoop.hive.ql.io.orc; +import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector; + import java.io.IOException; import java.nio.ByteBuffer; import java.util.List; @@ -26,6 +28,13 @@ * The interface for writing ORC files. */ public interface Writer { + + /** + * Get the schema for this writer + * @return the file schema + */ + TypeDescription getSchema(); + /** * Add arbitrary meta-data to the ORC file. This may be called at any point * until the Writer is closed. If the same key is passed a second time, the @@ -36,11 +45,19 @@ void addUserMetadata(String key, ByteBuffer value); /** + * Add a row to the file. This version of addRow is for backwards + * compatibility and the addRow(Object,ObjectInspector) is preferred. + * @param row the row to write + */ + void addRow(Object row) throws IOException; + + /** * Add a row to the ORC file. 
* @param row the row to add + * @param inspector the object inspector * @throws IOException */ - void addRow(Object row) throws IOException; + void addRow(Object row, ObjectInspector inspector) throws IOException; /** * Flush all of the buffers and close the file. No methods on this writer diff --git ql/src/java/org/apache/hadoop/hive/ql/io/orc/WriterImpl.java ql/src/java/org/apache/hadoop/hive/ql/io/orc/WriterImpl.java index 7aa8d65..9c345df 100644 --- ql/src/java/org/apache/hadoop/hive/ql/io/orc/WriterImpl.java +++ ql/src/java/org/apache/hadoop/hive/ql/io/orc/WriterImpl.java @@ -40,7 +40,6 @@ import org.apache.hadoop.fs.Path; import org.apache.hadoop.hive.common.JavaUtils; import org.apache.hadoop.hive.common.type.HiveDecimal; -import org.apache.hadoop.hive.ql.io.IOConstants; import org.apache.hadoop.hive.ql.io.filters.BloomFilterIO; import org.apache.hadoop.hive.ql.io.orc.CompressionCodec.Modifier; import org.apache.hadoop.hive.ql.io.orc.OrcFile.CompressionStrategy; @@ -54,7 +53,6 @@ import org.apache.hadoop.hive.serde2.objectinspector.ListObjectInspector; import org.apache.hadoop.hive.serde2.objectinspector.MapObjectInspector; import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector; -import org.apache.hadoop.hive.serde2.objectinspector.PrimitiveObjectInspector; import org.apache.hadoop.hive.serde2.objectinspector.StructField; import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector; import org.apache.hadoop.hive.serde2.objectinspector.UnionObjectInspector; @@ -72,9 +70,6 @@ import org.apache.hadoop.hive.serde2.objectinspector.primitive.ShortObjectInspector; import org.apache.hadoop.hive.serde2.objectinspector.primitive.StringObjectInspector; import org.apache.hadoop.hive.serde2.objectinspector.primitive.TimestampObjectInspector; -import org.apache.hadoop.hive.serde2.typeinfo.CharTypeInfo; -import org.apache.hadoop.hive.serde2.typeinfo.DecimalTypeInfo; -import org.apache.hadoop.hive.serde2.typeinfo.VarcharTypeInfo; import org.apache.hadoop.hive.shims.HadoopShims; import org.apache.hadoop.hive.shims.ShimLoader; import org.apache.hadoop.io.BytesWritable; @@ -127,6 +122,10 @@ private final int bufferSize; private final long blockSize; private final double paddingTolerance; + private final TypeDescription schema; + // This is for backwards compatibility and may be null. 
+ private final ObjectInspector inspector; + // the streams that make up the current stripe private final Map streams = new TreeMap(); @@ -162,27 +161,30 @@ private boolean writeTimeZone; WriterImpl(FileSystem fs, - Path path, - Configuration conf, - ObjectInspector inspector, - long stripeSize, - CompressionKind compress, - int bufferSize, - int rowIndexStride, - MemoryManager memoryManager, - boolean addBlockPadding, - OrcFile.Version version, - OrcFile.WriterCallback callback, - EncodingStrategy encodingStrategy, - CompressionStrategy compressionStrategy, - double paddingTolerance, - long blockSizeValue, - String bloomFilterColumnNames, - double bloomFilterFpp) throws IOException { + Path path, + Configuration conf, + TypeDescription schema, + ObjectInspector inspector, + long stripeSize, + CompressionKind compress, + int bufferSize, + int rowIndexStride, + MemoryManager memoryManager, + boolean addBlockPadding, + OrcFile.Version version, + OrcFile.WriterCallback callback, + EncodingStrategy encodingStrategy, + CompressionStrategy compressionStrategy, + double paddingTolerance, + long blockSizeValue, + String bloomFilterColumnNames, + double bloomFilterFpp) throws IOException { this.fs = fs; this.path = path; this.conf = conf; this.callback = callback; + this.schema = schema; + this.inspector = inspector; if (callback != null) { callbackContext = new OrcFile.WriterContext(){ @@ -207,21 +209,18 @@ public Writer getWriter() { this.memoryManager = memoryManager; buildIndex = rowIndexStride > 0; codec = createCodec(compress); - String allColumns = conf.get(IOConstants.COLUMNS); - if (allColumns == null) { - allColumns = getColumnNamesFromInspector(inspector); - } - this.bufferSize = getEstimatedBufferSize(allColumns, bufferSize); + int numColumns = schema.getMaximumId() + 1; + this.bufferSize = getEstimatedBufferSize(getMemoryAvailableForORC(), + codec != null, numColumns, bufferSize); if (version == OrcFile.Version.V_0_11) { /* do not write bloom filters for ORC v11 */ - this.bloomFilterColumns = - OrcUtils.includeColumns(null, allColumns, inspector); + this.bloomFilterColumns = new boolean[schema.getMaximumId() + 1]; } else { this.bloomFilterColumns = - OrcUtils.includeColumns(bloomFilterColumnNames, allColumns, inspector); + OrcUtils.includeColumns(bloomFilterColumnNames, schema); } this.bloomFilterFpp = bloomFilterFpp; - treeWriter = createTreeWriter(inspector, streamFactory, false); + treeWriter = createTreeWriter(schema, streamFactory, false); if (buildIndex && rowIndexStride < MIN_ROW_INDEX_STRIDE) { throw new IllegalArgumentException("Row stride must be at least " + MIN_ROW_INDEX_STRIDE); @@ -231,62 +230,42 @@ public Writer getWriter() { memoryManager.addWriter(path, stripeSize, this); } - private String getColumnNamesFromInspector(ObjectInspector inspector) { - List fieldNames = Lists.newArrayList(); - Joiner joiner = Joiner.on(","); - if (inspector instanceof StructObjectInspector) { - StructObjectInspector soi = (StructObjectInspector) inspector; - List fields = soi.getAllStructFieldRefs(); - for(StructField sf : fields) { - fieldNames.add(sf.getFieldName()); - } - } - return joiner.join(fieldNames); - } + static int getEstimatedBufferSize(long availableMem, + boolean isCompressed, + int columnCount, int bs) { + if (columnCount > COLUMN_COUNT_THRESHOLD) { + // In BufferedStream, there are 3 outstream buffers (compressed, + // uncompressed and overflow) and list of previously compressed buffers. + // Since overflow buffer is rarely used, lets consider only 2 allocation. 
+ // Also, initially, the list of compression buffers will be empty. + final int outStreamBuffers = isCompressed ? 2 : 1; - @VisibleForTesting - int getEstimatedBufferSize(int bs) { - return getEstimatedBufferSize(conf.get(IOConstants.COLUMNS), bs); - } + // max possible streams per column is 5. For string columns, there is + // ROW_INDEX, PRESENT, DATA, LENGTH, DICTIONARY_DATA streams. + final int maxStreams = 5; - int getEstimatedBufferSize(String colNames, int bs) { - long availableMem = getMemoryAvailableForORC(); - if (colNames != null) { - final int numCols = colNames.split(",").length; - if (numCols > COLUMN_COUNT_THRESHOLD) { - // In BufferedStream, there are 3 outstream buffers (compressed, - // uncompressed and overflow) and list of previously compressed buffers. - // Since overflow buffer is rarely used, lets consider only 2 allocation. - // Also, initially, the list of compression buffers will be empty. - final int outStreamBuffers = codec == null ? 1 : 2; - - // max possible streams per column is 5. For string columns, there is - // ROW_INDEX, PRESENT, DATA, LENGTH, DICTIONARY_DATA streams. - final int maxStreams = 5; - - // Lets assume 10% memory for holding dictionary in memory and other - // object allocations - final long miscAllocation = (long) (0.1f * availableMem); - - // compute the available memory - final long remainingMem = availableMem - miscAllocation; - - int estBufferSize = (int) (remainingMem / - (maxStreams * outStreamBuffers * numCols)); - estBufferSize = getClosestBufferSize(estBufferSize, bs); - if (estBufferSize > bs) { - estBufferSize = bs; - } + // Lets assume 10% memory for holding dictionary in memory and other + // object allocations + final long miscAllocation = (long) (0.1f * availableMem); - LOG.info("WIDE TABLE - Number of columns: " + numCols + - " Chosen compression buffer size: " + estBufferSize); - return estBufferSize; + // compute the available memory + final long remainingMem = availableMem - miscAllocation; + + int estBufferSize = (int) (remainingMem / + (maxStreams * outStreamBuffers * columnCount)); + estBufferSize = getClosestBufferSize(estBufferSize); + if (estBufferSize > bs) { + estBufferSize = bs; } + + LOG.info("WIDE TABLE - Number of columns: " + columnCount + + " Chosen compression buffer size: " + estBufferSize); + return estBufferSize; } return bs; } - private int getClosestBufferSize(int estBufferSize, int bs) { + private static int getClosestBufferSize(int estBufferSize) { final int kb4 = 4 * 1024; final int kb8 = 8 * 1024; final int kb16 = 16 * 1024; @@ -546,15 +525,6 @@ public int getNextColumnId() { } /** - * Get the current column id. After creating all tree writers this count should tell how many - * columns (including columns within nested complex objects) are created in total. - * @return current column id - */ - public int getCurrentColumnId() { - return columnCount; - } - - /** * Get the stride rate of the row index. */ public int getRowIndexStride() { @@ -641,7 +611,6 @@ public boolean hasWriterTimeZone() { */ private abstract static class TreeWriter { protected final int id; - protected final ObjectInspector inspector; private final BitFieldWriter isPresent; private final boolean isCompressed; protected final ColumnStatisticsImpl indexStatistics; @@ -665,18 +634,18 @@ public boolean hasWriterTimeZone() { /** * Create a tree writer. 
* @param columnId the column id of the column to write - * @param inspector the object inspector to use + * @param schema the row schema * @param streamFactory limited access to the Writer's data. * @param nullable can the value be null? * @throws IOException */ - TreeWriter(int columnId, ObjectInspector inspector, + TreeWriter(int columnId, TypeDescription schema, StreamFactory streamFactory, boolean nullable) throws IOException { this.streamFactory = streamFactory; this.isCompressed = streamFactory.isCompressed(); this.id = columnId; - this.inspector = inspector; + if (nullable) { isPresentOutStream = streamFactory.createStream(id, OrcProto.Stream.Kind.PRESENT); @@ -686,9 +655,9 @@ public boolean hasWriterTimeZone() { } this.foundNulls = false; createBloomFilter = streamFactory.getBloomFilterColumns()[columnId]; - indexStatistics = ColumnStatisticsImpl.create(inspector); - stripeColStatistics = ColumnStatisticsImpl.create(inspector); - fileStatistics = ColumnStatisticsImpl.create(inspector); + indexStatistics = ColumnStatisticsImpl.create(schema); + stripeColStatistics = ColumnStatisticsImpl.create(schema); + fileStatistics = ColumnStatisticsImpl.create(schema); childrenWriters = new TreeWriter[0]; rowIndex = OrcProto.RowIndex.newBuilder(); rowIndexEntry = OrcProto.RowIndexEntry.newBuilder(); @@ -749,10 +718,11 @@ boolean isNewWriteFormat(StreamFactory writer) { /** * Add a new value to the column. - * @param obj + * @param obj the object to write + * @param inspector the object inspector * @throws IOException */ - void write(Object obj) throws IOException { + void write(Object obj, ObjectInspector inspector) throws IOException { if (obj != null) { indexStatistics.increment(); } else { @@ -918,10 +888,10 @@ long estimateMemory() { private final BitFieldWriter writer; BooleanTreeWriter(int columnId, - ObjectInspector inspector, + TypeDescription schema, StreamFactory writer, boolean nullable) throws IOException { - super(columnId, inspector, writer, nullable); + super(columnId, schema, writer, nullable); PositionedOutputStream out = writer.createStream(id, OrcProto.Stream.Kind.DATA); this.writer = new BitFieldWriter(out, 1); @@ -929,8 +899,8 @@ long estimateMemory() { } @Override - void write(Object obj) throws IOException { - super.write(obj); + void write(Object obj, ObjectInspector inspector) throws IOException { + super.write(obj, inspector); if (obj != null) { boolean val = ((BooleanObjectInspector) inspector).get(obj); indexStatistics.updateBoolean(val); @@ -957,18 +927,18 @@ void recordPosition(PositionRecorder recorder) throws IOException { private final RunLengthByteWriter writer; ByteTreeWriter(int columnId, - ObjectInspector inspector, + TypeDescription schema, StreamFactory writer, boolean nullable) throws IOException { - super(columnId, inspector, writer, nullable); + super(columnId, schema, writer, nullable); this.writer = new RunLengthByteWriter(writer.createStream(id, OrcProto.Stream.Kind.DATA)); recordPosition(rowIndexPosition); } @Override - void write(Object obj) throws IOException { - super.write(obj); + void write(Object obj, ObjectInspector inspector) throws IOException { + super.write(obj, inspector); if (obj != null) { byte val = ((ByteObjectInspector) inspector).get(obj); indexStatistics.updateInteger(val); @@ -996,35 +966,20 @@ void recordPosition(PositionRecorder recorder) throws IOException { private static class IntegerTreeWriter extends TreeWriter { private final IntegerWriter writer; - private final ShortObjectInspector shortInspector; - private final 
IntObjectInspector intInspector; - private final LongObjectInspector longInspector; private boolean isDirectV2 = true; + private final TypeDescription.Category category; IntegerTreeWriter(int columnId, - ObjectInspector inspector, + TypeDescription schema, StreamFactory writer, boolean nullable) throws IOException { - super(columnId, inspector, writer, nullable); + super(columnId, schema, writer, nullable); OutStream out = writer.createStream(id, OrcProto.Stream.Kind.DATA); this.isDirectV2 = isNewWriteFormat(writer); this.writer = createIntegerWriter(out, true, isDirectV2, writer); - if (inspector instanceof IntObjectInspector) { - intInspector = (IntObjectInspector) inspector; - shortInspector = null; - longInspector = null; - } else { - intInspector = null; - if (inspector instanceof LongObjectInspector) { - longInspector = (LongObjectInspector) inspector; - shortInspector = null; - } else { - shortInspector = (ShortObjectInspector) inspector; - longInspector = null; - } - } recordPosition(rowIndexPosition); + this.category = schema.getCategory(); } @Override @@ -1038,16 +993,23 @@ void recordPosition(PositionRecorder recorder) throws IOException { } @Override - void write(Object obj) throws IOException { - super.write(obj); + void write(Object obj, ObjectInspector inspector) throws IOException { + super.write(obj, inspector); if (obj != null) { long val; - if (intInspector != null) { - val = intInspector.get(obj); - } else if (longInspector != null) { - val = longInspector.get(obj); - } else { - val = shortInspector.get(obj); + switch (category) { + case SHORT: + val = ((ShortObjectInspector) inspector).get(obj); + break; + case INT: + val = ((IntObjectInspector) inspector).get(obj); + break; + case LONG: + val = ((LongObjectInspector) inspector).get(obj); + break; + default: + throw new IllegalArgumentException("Unknown integer type " + + category); } indexStatistics.updateInteger(val); if (createBloomFilter) { @@ -1078,10 +1040,10 @@ void recordPosition(PositionRecorder recorder) throws IOException { private final SerializationUtils utils; FloatTreeWriter(int columnId, - ObjectInspector inspector, + TypeDescription schema, StreamFactory writer, boolean nullable) throws IOException { - super(columnId, inspector, writer, nullable); + super(columnId, schema, writer, nullable); this.stream = writer.createStream(id, OrcProto.Stream.Kind.DATA); this.utils = new SerializationUtils(); @@ -1089,8 +1051,8 @@ void recordPosition(PositionRecorder recorder) throws IOException { } @Override - void write(Object obj) throws IOException { - super.write(obj); + void write(Object obj, ObjectInspector inspector) throws IOException { + super.write(obj, inspector); if (obj != null) { float val = ((FloatObjectInspector) inspector).get(obj); indexStatistics.updateDouble(val); @@ -1122,10 +1084,10 @@ void recordPosition(PositionRecorder recorder) throws IOException { private final SerializationUtils utils; DoubleTreeWriter(int columnId, - ObjectInspector inspector, + TypeDescription schema, StreamFactory writer, boolean nullable) throws IOException { - super(columnId, inspector, writer, nullable); + super(columnId, schema, writer, nullable); this.stream = writer.createStream(id, OrcProto.Stream.Kind.DATA); this.utils = new SerializationUtils(); @@ -1133,8 +1095,8 @@ void recordPosition(PositionRecorder recorder) throws IOException { } @Override - void write(Object obj) throws IOException { - super.write(obj); + void write(Object obj, ObjectInspector inspector) throws IOException { + super.write(obj, 
inspector); if (obj != null) { double val = ((DoubleObjectInspector) inspector).get(obj); indexStatistics.updateDouble(val); @@ -1183,10 +1145,10 @@ void recordPosition(PositionRecorder recorder) throws IOException { private final boolean strideDictionaryCheck; StringTreeWriter(int columnId, - ObjectInspector inspector, + TypeDescription schema, StreamFactory writer, boolean nullable) throws IOException { - super(columnId, inspector, writer, nullable); + super(columnId, schema, writer, nullable); this.isDirectV2 = isNewWriteFormat(writer); stringOutput = writer.createStream(id, OrcProto.Stream.Kind.DICTIONARY_DATA); @@ -1214,15 +1176,15 @@ void recordPosition(PositionRecorder recorder) throws IOException { * @param obj value * @return Text text value from obj */ - Text getTextValue(Object obj) { + Text getTextValue(Object obj, ObjectInspector inspector) { return ((StringObjectInspector) inspector).getPrimitiveWritableObject(obj); } @Override - void write(Object obj) throws IOException { - super.write(obj); + void write(Object obj, ObjectInspector inspector) throws IOException { + super.write(obj, inspector); if (obj != null) { - Text val = getTextValue(obj); + Text val = getTextValue(obj, inspector); if (useDictionaryEncoding || !strideDictionaryCheck) { rows.add(dictionary.add(val)); } else { @@ -1422,17 +1384,17 @@ long estimateMemory() { private static class CharTreeWriter extends StringTreeWriter { CharTreeWriter(int columnId, - ObjectInspector inspector, + TypeDescription schema, StreamFactory writer, boolean nullable) throws IOException { - super(columnId, inspector, writer, nullable); + super(columnId, schema, writer, nullable); } /** * Override base class implementation to support char values. */ @Override - Text getTextValue(Object obj) { + Text getTextValue(Object obj, ObjectInspector inspector) { return (((HiveCharObjectInspector) inspector) .getPrimitiveWritableObject(obj)).getTextValue(); } @@ -1444,17 +1406,17 @@ Text getTextValue(Object obj) { private static class VarcharTreeWriter extends StringTreeWriter { VarcharTreeWriter(int columnId, - ObjectInspector inspector, + TypeDescription schema, StreamFactory writer, boolean nullable) throws IOException { - super(columnId, inspector, writer, nullable); + super(columnId, schema, writer, nullable); } /** * Override base class implementation to support varchar values. 
*/ @Override - Text getTextValue(Object obj) { + Text getTextValue(Object obj, ObjectInspector inspector) { return (((HiveVarcharObjectInspector) inspector) .getPrimitiveWritableObject(obj)).getTextValue(); } @@ -1466,10 +1428,10 @@ Text getTextValue(Object obj) { private boolean isDirectV2 = true; BinaryTreeWriter(int columnId, - ObjectInspector inspector, + TypeDescription schema, StreamFactory writer, boolean nullable) throws IOException { - super(columnId, inspector, writer, nullable); + super(columnId, schema, writer, nullable); this.stream = writer.createStream(id, OrcProto.Stream.Kind.DATA); this.isDirectV2 = isNewWriteFormat(writer); @@ -1489,8 +1451,8 @@ Text getTextValue(Object obj) { } @Override - void write(Object obj) throws IOException { - super.write(obj); + void write(Object obj, ObjectInspector inspector) throws IOException { + super.write(obj, inspector); if (obj != null) { BytesWritable val = ((BinaryObjectInspector) inspector).getPrimitiveWritableObject(obj); @@ -1530,10 +1492,10 @@ void recordPosition(PositionRecorder recorder) throws IOException { private final long base_timestamp; TimestampTreeWriter(int columnId, - ObjectInspector inspector, + TypeDescription schema, StreamFactory writer, boolean nullable) throws IOException { - super(columnId, inspector, writer, nullable); + super(columnId, schema, writer, nullable); this.isDirectV2 = isNewWriteFormat(writer); this.seconds = createIntegerWriter(writer.createStream(id, OrcProto.Stream.Kind.DATA), true, isDirectV2, writer); @@ -1556,8 +1518,8 @@ void recordPosition(PositionRecorder recorder) throws IOException { } @Override - void write(Object obj) throws IOException { - super.write(obj); + void write(Object obj, ObjectInspector inspector) throws IOException { + super.write(obj, inspector); if (obj != null) { Timestamp val = ((TimestampObjectInspector) inspector). @@ -1609,10 +1571,10 @@ void recordPosition(PositionRecorder recorder) throws IOException { private final boolean isDirectV2; DateTreeWriter(int columnId, - ObjectInspector inspector, + TypeDescription schema, StreamFactory writer, boolean nullable) throws IOException { - super(columnId, inspector, writer, nullable); + super(columnId, schema, writer, nullable); OutStream out = writer.createStream(id, OrcProto.Stream.Kind.DATA); this.isDirectV2 = isNewWriteFormat(writer); @@ -1621,8 +1583,8 @@ void recordPosition(PositionRecorder recorder) throws IOException { } @Override - void write(Object obj) throws IOException { - super.write(obj); + void write(Object obj, ObjectInspector inspector) throws IOException { + super.write(obj, inspector); if (obj != null) { // Using the Writable here as it's used directly for writing as well as for stats. 
DateWritable val = ((DateObjectInspector) inspector).getPrimitiveWritableObject(obj); @@ -1665,10 +1627,10 @@ void recordPosition(PositionRecorder recorder) throws IOException { private final boolean isDirectV2; DecimalTreeWriter(int columnId, - ObjectInspector inspector, + TypeDescription schema, StreamFactory writer, boolean nullable) throws IOException { - super(columnId, inspector, writer, nullable); + super(columnId, schema, writer, nullable); this.isDirectV2 = isNewWriteFormat(writer); valueStream = writer.createStream(id, OrcProto.Stream.Kind.DATA); this.scaleStream = createIntegerWriter(writer.createStream(id, @@ -1687,8 +1649,8 @@ void recordPosition(PositionRecorder recorder) throws IOException { } @Override - void write(Object obj) throws IOException { - super.write(obj); + void write(Object obj, ObjectInspector inspector) throws IOException { + super.write(obj, inspector); if (obj != null) { HiveDecimal decimal = ((HiveDecimalObjectInspector) inspector). getPrimitiveJavaObject(obj); @@ -1723,32 +1685,31 @@ void recordPosition(PositionRecorder recorder) throws IOException { } private static class StructTreeWriter extends TreeWriter { - private final List fields; StructTreeWriter(int columnId, - ObjectInspector inspector, + TypeDescription schema, StreamFactory writer, boolean nullable) throws IOException { - super(columnId, inspector, writer, nullable); - StructObjectInspector structObjectInspector = - (StructObjectInspector) inspector; - fields = structObjectInspector.getAllStructFieldRefs(); - childrenWriters = new TreeWriter[fields.size()]; + super(columnId, schema, writer, nullable); + List children = schema.getChildren(); + childrenWriters = new TreeWriter[children.size()]; for(int i=0; i < childrenWriters.length; ++i) { childrenWriters[i] = createTreeWriter( - fields.get(i).getFieldObjectInspector(), writer, true); + children.get(i), writer, true); } recordPosition(rowIndexPosition); } @Override - void write(Object obj) throws IOException { - super.write(obj); + void write(Object obj, ObjectInspector inspector) throws IOException { + super.write(obj, inspector); if (obj != null) { StructObjectInspector insp = (StructObjectInspector) inspector; + List fields = insp.getAllStructFieldRefs(); for(int i = 0; i < fields.size(); ++i) { StructField field = fields.get(i); TreeWriter writer = childrenWriters[i]; - writer.write(insp.getStructFieldData(obj, field)); + writer.write(insp.getStructFieldData(obj, field), + field.getFieldObjectInspector()); } } } @@ -1769,16 +1730,14 @@ void writeStripe(OrcProto.StripeFooter.Builder builder, private final boolean isDirectV2; ListTreeWriter(int columnId, - ObjectInspector inspector, + TypeDescription schema, StreamFactory writer, boolean nullable) throws IOException { - super(columnId, inspector, writer, nullable); + super(columnId, schema, writer, nullable); this.isDirectV2 = isNewWriteFormat(writer); - ListObjectInspector listObjectInspector = (ListObjectInspector) inspector; childrenWriters = new TreeWriter[1]; childrenWriters[0] = - createTreeWriter(listObjectInspector.getListElementObjectInspector(), - writer, true); + createTreeWriter(schema.getChildren().get(0), writer, true); lengths = createIntegerWriter(writer.createStream(columnId, OrcProto.Stream.Kind.LENGTH), false, isDirectV2, writer); recordPosition(rowIndexPosition); @@ -1795,8 +1754,8 @@ void writeStripe(OrcProto.StripeFooter.Builder builder, } @Override - void write(Object obj) throws IOException { - super.write(obj); + void write(Object obj, ObjectInspector inspector) 
throws IOException { + super.write(obj, inspector); if (obj != null) { ListObjectInspector insp = (ListObjectInspector) inspector; int len = insp.getListLength(obj); @@ -1805,7 +1764,8 @@ void write(Object obj) throws IOException { bloomFilter.addLong(len); } for(int i=0; i < len; ++i) { - childrenWriters[0].write(insp.getListElement(obj, i)); + childrenWriters[0].write(insp.getListElement(obj, i), + insp.getListElementObjectInspector()); } } } @@ -1833,17 +1793,17 @@ void recordPosition(PositionRecorder recorder) throws IOException { private final boolean isDirectV2; MapTreeWriter(int columnId, - ObjectInspector inspector, + TypeDescription schema, StreamFactory writer, boolean nullable) throws IOException { - super(columnId, inspector, writer, nullable); + super(columnId, schema, writer, nullable); this.isDirectV2 = isNewWriteFormat(writer); - MapObjectInspector insp = (MapObjectInspector) inspector; childrenWriters = new TreeWriter[2]; - childrenWriters[0] = - createTreeWriter(insp.getMapKeyObjectInspector(), writer, true); - childrenWriters[1] = - createTreeWriter(insp.getMapValueObjectInspector(), writer, true); + List children = schema.getChildren(); + for(int i=0; i < 2; ++i) { + childrenWriters[i] = + createTreeWriter(children.get(i), writer, true); + } lengths = createIntegerWriter(writer.createStream(columnId, OrcProto.Stream.Kind.LENGTH), false, isDirectV2, writer); recordPosition(rowIndexPosition); @@ -1860,8 +1820,8 @@ void recordPosition(PositionRecorder recorder) throws IOException { } @Override - void write(Object obj) throws IOException { - super.write(obj); + void write(Object obj, ObjectInspector inspector) throws IOException { + super.write(obj, inspector); if (obj != null) { MapObjectInspector insp = (MapObjectInspector) inspector; // this sucks, but it will have to do until we can get a better @@ -1872,8 +1832,10 @@ void write(Object obj) throws IOException { bloomFilter.addLong(valueMap.size()); } for(Map.Entry entry: valueMap.entrySet()) { - childrenWriters[0].write(entry.getKey()); - childrenWriters[1].write(entry.getValue()); + childrenWriters[0].write(entry.getKey(), + insp.getMapKeyObjectInspector()); + childrenWriters[1].write(entry.getValue(), + insp.getMapValueObjectInspector()); } } } @@ -1900,15 +1862,14 @@ void recordPosition(PositionRecorder recorder) throws IOException { private final RunLengthByteWriter tags; UnionTreeWriter(int columnId, - ObjectInspector inspector, + TypeDescription schema, StreamFactory writer, boolean nullable) throws IOException { - super(columnId, inspector, writer, nullable); - UnionObjectInspector insp = (UnionObjectInspector) inspector; - List choices = insp.getObjectInspectors(); - childrenWriters = new TreeWriter[choices.size()]; + super(columnId, schema, writer, nullable); + List children = schema.getChildren(); + childrenWriters = new TreeWriter[children.size()]; for(int i=0; i < childrenWriters.length; ++i) { - childrenWriters[i] = createTreeWriter(choices.get(i), writer, true); + childrenWriters[i] = createTreeWriter(children.get(i), writer, true); } tags = new RunLengthByteWriter(writer.createStream(columnId, @@ -1917,8 +1878,8 @@ void recordPosition(PositionRecorder recorder) throws IOException { } @Override - void write(Object obj) throws IOException { - super.write(obj); + void write(Object obj, ObjectInspector inspector) throws IOException { + super.write(obj, inspector); if (obj != null) { UnionObjectInspector insp = (UnionObjectInspector) inspector; byte tag = insp.getTag(obj); @@ -1926,7 +1887,8 @@ void 
write(Object obj) throws IOException { if (createBloomFilter) { bloomFilter.addLong(tag); } - childrenWriters[tag].write(insp.getField(obj)); + childrenWriters[tag].write(insp.getField(obj), + insp.getObjectInspectors().get(tag)); } } @@ -1948,169 +1910,151 @@ void recordPosition(PositionRecorder recorder) throws IOException { } } - private static TreeWriter createTreeWriter(ObjectInspector inspector, + private static TreeWriter createTreeWriter(TypeDescription schema, StreamFactory streamFactory, boolean nullable) throws IOException { - switch (inspector.getCategory()) { - case PRIMITIVE: - switch (((PrimitiveObjectInspector) inspector).getPrimitiveCategory()) { - case BOOLEAN: - return new BooleanTreeWriter(streamFactory.getNextColumnId(), - inspector, streamFactory, nullable); - case BYTE: - return new ByteTreeWriter(streamFactory.getNextColumnId(), - inspector, streamFactory, nullable); - case SHORT: - case INT: - case LONG: - return new IntegerTreeWriter(streamFactory.getNextColumnId(), - inspector, streamFactory, nullable); - case FLOAT: - return new FloatTreeWriter(streamFactory.getNextColumnId(), - inspector, streamFactory, nullable); - case DOUBLE: - return new DoubleTreeWriter(streamFactory.getNextColumnId(), - inspector, streamFactory, nullable); - case STRING: - return new StringTreeWriter(streamFactory.getNextColumnId(), - inspector, streamFactory, nullable); - case CHAR: - return new CharTreeWriter(streamFactory.getNextColumnId(), - inspector, streamFactory, nullable); - case VARCHAR: - return new VarcharTreeWriter(streamFactory.getNextColumnId(), - inspector, streamFactory, nullable); - case BINARY: - return new BinaryTreeWriter(streamFactory.getNextColumnId(), - inspector, streamFactory, nullable); - case TIMESTAMP: - return new TimestampTreeWriter(streamFactory.getNextColumnId(), - inspector, streamFactory, nullable); - case DATE: - return new DateTreeWriter(streamFactory.getNextColumnId(), - inspector, streamFactory, nullable); - case DECIMAL: - return new DecimalTreeWriter(streamFactory.getNextColumnId(), - inspector, streamFactory, nullable); - default: - throw new IllegalArgumentException("Bad primitive category " + - ((PrimitiveObjectInspector) inspector).getPrimitiveCategory()); - } + switch (schema.getCategory()) { + case BOOLEAN: + return new BooleanTreeWriter(streamFactory.getNextColumnId(), + schema, streamFactory, nullable); + case BYTE: + return new ByteTreeWriter(streamFactory.getNextColumnId(), + schema, streamFactory, nullable); + case SHORT: + case INT: + case LONG: + return new IntegerTreeWriter(streamFactory.getNextColumnId(), + schema, streamFactory, nullable); + case FLOAT: + return new FloatTreeWriter(streamFactory.getNextColumnId(), + schema, streamFactory, nullable); + case DOUBLE: + return new DoubleTreeWriter(streamFactory.getNextColumnId(), + schema, streamFactory, nullable); + case STRING: + return new StringTreeWriter(streamFactory.getNextColumnId(), + schema, streamFactory, nullable); + case CHAR: + return new CharTreeWriter(streamFactory.getNextColumnId(), + schema, streamFactory, nullable); + case VARCHAR: + return new VarcharTreeWriter(streamFactory.getNextColumnId(), + schema, streamFactory, nullable); + case BINARY: + return new BinaryTreeWriter(streamFactory.getNextColumnId(), + schema, streamFactory, nullable); + case TIMESTAMP: + return new TimestampTreeWriter(streamFactory.getNextColumnId(), + schema, streamFactory, nullable); + case DATE: + return new DateTreeWriter(streamFactory.getNextColumnId(), + schema, streamFactory, nullable); 
+ case DECIMAL: + return new DecimalTreeWriter(streamFactory.getNextColumnId(), + schema, streamFactory, nullable); case STRUCT: - return new StructTreeWriter(streamFactory.getNextColumnId(), inspector, + return new StructTreeWriter(streamFactory.getNextColumnId(), schema, streamFactory, nullable); case MAP: - return new MapTreeWriter(streamFactory.getNextColumnId(), inspector, + return new MapTreeWriter(streamFactory.getNextColumnId(), schema, streamFactory, nullable); case LIST: - return new ListTreeWriter(streamFactory.getNextColumnId(), inspector, + return new ListTreeWriter(streamFactory.getNextColumnId(), schema, streamFactory, nullable); case UNION: - return new UnionTreeWriter(streamFactory.getNextColumnId(), inspector, + return new UnionTreeWriter(streamFactory.getNextColumnId(), schema, streamFactory, nullable); default: throw new IllegalArgumentException("Bad category: " + - inspector.getCategory()); + schema.getCategory()); } } private static void writeTypes(OrcProto.Footer.Builder builder, - TreeWriter treeWriter) { + TypeDescription schema) { OrcProto.Type.Builder type = OrcProto.Type.newBuilder(); - switch (treeWriter.inspector.getCategory()) { - case PRIMITIVE: - switch (((PrimitiveObjectInspector) treeWriter.inspector). - getPrimitiveCategory()) { - case BOOLEAN: - type.setKind(OrcProto.Type.Kind.BOOLEAN); - break; - case BYTE: - type.setKind(OrcProto.Type.Kind.BYTE); - break; - case SHORT: - type.setKind(OrcProto.Type.Kind.SHORT); - break; - case INT: - type.setKind(OrcProto.Type.Kind.INT); - break; - case LONG: - type.setKind(OrcProto.Type.Kind.LONG); - break; - case FLOAT: - type.setKind(OrcProto.Type.Kind.FLOAT); - break; - case DOUBLE: - type.setKind(OrcProto.Type.Kind.DOUBLE); - break; - case STRING: - type.setKind(OrcProto.Type.Kind.STRING); - break; - case CHAR: - // The char length needs to be written to file and should be available - // from the object inspector - CharTypeInfo charTypeInfo = (CharTypeInfo) ((PrimitiveObjectInspector) treeWriter.inspector).getTypeInfo(); - type.setKind(Type.Kind.CHAR); - type.setMaximumLength(charTypeInfo.getLength()); - break; - case VARCHAR: - // The varchar length needs to be written to file and should be available - // from the object inspector - VarcharTypeInfo typeInfo = (VarcharTypeInfo) ((PrimitiveObjectInspector) treeWriter.inspector).getTypeInfo(); - type.setKind(Type.Kind.VARCHAR); - type.setMaximumLength(typeInfo.getLength()); - break; - case BINARY: - type.setKind(OrcProto.Type.Kind.BINARY); - break; - case TIMESTAMP: - type.setKind(OrcProto.Type.Kind.TIMESTAMP); - break; - case DATE: - type.setKind(OrcProto.Type.Kind.DATE); - break; - case DECIMAL: - DecimalTypeInfo decTypeInfo = (DecimalTypeInfo)((PrimitiveObjectInspector)treeWriter.inspector).getTypeInfo(); - type.setKind(OrcProto.Type.Kind.DECIMAL); - type.setPrecision(decTypeInfo.precision()); - type.setScale(decTypeInfo.scale()); - break; - default: - throw new IllegalArgumentException("Unknown primitive category: " + - ((PrimitiveObjectInspector) treeWriter.inspector). 
- getPrimitiveCategory()); - } + List children = schema.getChildren(); + switch (schema.getCategory()) { + case BOOLEAN: + type.setKind(OrcProto.Type.Kind.BOOLEAN); + break; + case BYTE: + type.setKind(OrcProto.Type.Kind.BYTE); + break; + case SHORT: + type.setKind(OrcProto.Type.Kind.SHORT); + break; + case INT: + type.setKind(OrcProto.Type.Kind.INT); + break; + case LONG: + type.setKind(OrcProto.Type.Kind.LONG); + break; + case FLOAT: + type.setKind(OrcProto.Type.Kind.FLOAT); + break; + case DOUBLE: + type.setKind(OrcProto.Type.Kind.DOUBLE); + break; + case STRING: + type.setKind(OrcProto.Type.Kind.STRING); + break; + case CHAR: + type.setKind(OrcProto.Type.Kind.CHAR); + type.setMaximumLength(schema.getMaxLength()); + break; + case VARCHAR: + type.setKind(Type.Kind.VARCHAR); + type.setMaximumLength(schema.getMaxLength()); + break; + case BINARY: + type.setKind(OrcProto.Type.Kind.BINARY); + break; + case TIMESTAMP: + type.setKind(OrcProto.Type.Kind.TIMESTAMP); + break; + case DATE: + type.setKind(OrcProto.Type.Kind.DATE); + break; + case DECIMAL: + type.setKind(OrcProto.Type.Kind.DECIMAL); + type.setPrecision(schema.getPrecision()); + type.setScale(schema.getScale()); break; case LIST: type.setKind(OrcProto.Type.Kind.LIST); - type.addSubtypes(treeWriter.childrenWriters[0].id); + type.addSubtypes(children.get(0).getId()); break; case MAP: type.setKind(OrcProto.Type.Kind.MAP); - type.addSubtypes(treeWriter.childrenWriters[0].id); - type.addSubtypes(treeWriter.childrenWriters[1].id); + for(TypeDescription t: children) { + type.addSubtypes(t.getId()); + } break; case STRUCT: type.setKind(OrcProto.Type.Kind.STRUCT); - for(TreeWriter child: treeWriter.childrenWriters) { - type.addSubtypes(child.id); + for(TypeDescription t: children) { + type.addSubtypes(t.getId()); } - for(StructField field: ((StructTreeWriter) treeWriter).fields) { - type.addFieldNames(field.getFieldName()); + for(String field: schema.getFieldNames()) { + type.addFieldNames(field); } break; case UNION: type.setKind(OrcProto.Type.Kind.UNION); - for(TreeWriter child: treeWriter.childrenWriters) { - type.addSubtypes(child.id); + for(TypeDescription t: children) { + type.addSubtypes(t.getId()); } break; default: throw new IllegalArgumentException("Unknown category: " + - treeWriter.inspector.getCategory()); + schema.getCategory()); } builder.addTypes(type); - for(TreeWriter child: treeWriter.childrenWriters) { - writeTypes(builder, child); + if (children != null) { + for(TypeDescription child: children) { + writeTypes(builder, child); + } } } @@ -2243,73 +2187,58 @@ private void flushStripe() throws IOException { } private long computeRawDataSize() { - long result = 0; - for (TreeWriter child : treeWriter.getChildrenWriters()) { - result += getRawDataSizeFromInspectors(child, child.inspector); - } - return result; + return getRawDataSize(treeWriter, schema); } - private long getRawDataSizeFromInspectors(TreeWriter child, ObjectInspector oi) { + private long getRawDataSize(TreeWriter child, + TypeDescription schema) { long total = 0; - switch (oi.getCategory()) { - case PRIMITIVE: - total += getRawDataSizeFromPrimitives(child, oi); - break; - case LIST: - case MAP: - case UNION: - case STRUCT: - for (TreeWriter tw : child.childrenWriters) { - total += getRawDataSizeFromInspectors(tw, tw.inspector); - } - break; - default: - LOG.debug("Unknown object inspector category."); - break; - } - return total; - } - - private long getRawDataSizeFromPrimitives(TreeWriter child, ObjectInspector oi) { - long result = 0; long numVals = 
child.fileStatistics.getNumberOfValues(); - switch (((PrimitiveObjectInspector) oi).getPrimitiveCategory()) { - case BOOLEAN: - case BYTE: - case SHORT: - case INT: - case FLOAT: - return numVals * JavaDataModel.get().primitive1(); - case LONG: - case DOUBLE: - return numVals * JavaDataModel.get().primitive2(); - case STRING: - case VARCHAR: - case CHAR: - // ORC strings are converted to java Strings. so use JavaDataModel to - // compute the overall size of strings - child = (StringTreeWriter) child; - StringColumnStatistics scs = (StringColumnStatistics) child.fileStatistics; - numVals = numVals == 0 ? 1 : numVals; - int avgStringLen = (int) (scs.getSum() / numVals); - return numVals * JavaDataModel.get().lengthForStringOfLength(avgStringLen); - case DECIMAL: - return numVals * JavaDataModel.get().lengthOfDecimal(); - case DATE: - return numVals * JavaDataModel.get().lengthOfDate(); - case BINARY: - // get total length of binary blob - BinaryColumnStatistics bcs = (BinaryColumnStatistics) child.fileStatistics; - return bcs.getSum(); - case TIMESTAMP: - return numVals * JavaDataModel.get().lengthOfTimestamp(); - default: - LOG.debug("Unknown primitive category."); - break; + switch (schema.getCategory()) { + case BOOLEAN: + case BYTE: + case SHORT: + case INT: + case FLOAT: + return numVals * JavaDataModel.get().primitive1(); + case LONG: + case DOUBLE: + return numVals * JavaDataModel.get().primitive2(); + case STRING: + case VARCHAR: + case CHAR: + // ORC strings are converted to java Strings. so use JavaDataModel to + // compute the overall size of strings + StringColumnStatistics scs = (StringColumnStatistics) child.fileStatistics; + numVals = numVals == 0 ? 1 : numVals; + int avgStringLen = (int) (scs.getSum() / numVals); + return numVals * JavaDataModel.get().lengthForStringOfLength(avgStringLen); + case DECIMAL: + return numVals * JavaDataModel.get().lengthOfDecimal(); + case DATE: + return numVals * JavaDataModel.get().lengthOfDate(); + case BINARY: + // get total length of binary blob + BinaryColumnStatistics bcs = (BinaryColumnStatistics) child.fileStatistics; + return bcs.getSum(); + case TIMESTAMP: + return numVals * JavaDataModel.get().lengthOfTimestamp(); + case LIST: + case MAP: + case UNION: + case STRUCT: { + TreeWriter[] childWriters = child.getChildrenWriters(); + List childTypes = schema.getChildren(); + for (int i=0; i < childWriters.length; ++i) { + total += getRawDataSize(childWriters[i], childTypes.get(i)); + } + break; + } + default: + LOG.debug("Unknown object inspector category."); + break; } - - return result; + return total; } private OrcProto.CompressionKind writeCompressionKind(CompressionKind kind) { @@ -2356,7 +2285,7 @@ private int writeFooter(long bodyLength) throws IOException { // populate raw data size rawDataSize = computeRawDataSize(); // serialize the types - writeTypes(builder, treeWriter); + writeTypes(builder, schema); // add the stripe information for(OrcProto.StripeInformation stripe: stripes) { builder.addStripes(stripe); @@ -2410,13 +2339,23 @@ private long estimateStripeSize() { } @Override + public TypeDescription getSchema() { + return schema; + } + + @Override public void addUserMetadata(String name, ByteBuffer value) { userMetadata.put(name, ByteString.copyFrom(value)); } @Override public void addRow(Object row) throws IOException { - treeWriter.write(row); + addRow(row, inspector); + } + + @Override + public void addRow(Object row, ObjectInspector inspector) throws IOException { + treeWriter.write(row, inspector); rowsInStripe += 1; if 
(buildIndex) { rowsInIndex += 1; @@ -2493,12 +2432,11 @@ public void appendStripe(byte[] stripe, int offset, int length, getStream(); long start = rawWriter.getPos(); - long stripeLen = length; long availBlockSpace = blockSize - (start % blockSize); // see if stripe can fit in the current hdfs block, else pad the remaining // space in the block - if (stripeLen < blockSize && stripeLen > availBlockSpace && + if (length < blockSize && length > availBlockSpace && addBlockPadding) { byte[] pad = new byte[(int) Math.min(HDFS_BUFFER_SIZE, availBlockSpace)]; LOG.info(String.format("Padding ORC by %d bytes while merging..", diff --git ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestColumnStatistics.java ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestColumnStatistics.java index 4d30377..c2afc6c 100644 --- ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestColumnStatistics.java +++ ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestColumnStatistics.java @@ -48,11 +48,10 @@ @Test public void testLongMerge() throws Exception { - ObjectInspector inspector = - PrimitiveObjectInspectorFactory.javaIntObjectInspector; + TypeDescription schema = TypeDescription.createInt(); - ColumnStatisticsImpl stats1 = ColumnStatisticsImpl.create(inspector); - ColumnStatisticsImpl stats2 = ColumnStatisticsImpl.create(inspector); + ColumnStatisticsImpl stats1 = ColumnStatisticsImpl.create(schema); + ColumnStatisticsImpl stats2 = ColumnStatisticsImpl.create(schema); stats1.updateInteger(10); stats1.updateInteger(10); stats2.updateInteger(1); @@ -71,11 +70,10 @@ public void testLongMerge() throws Exception { @Test public void testDoubleMerge() throws Exception { - ObjectInspector inspector = - PrimitiveObjectInspectorFactory.javaDoubleObjectInspector; + TypeDescription schema = TypeDescription.createDouble(); - ColumnStatisticsImpl stats1 = ColumnStatisticsImpl.create(inspector); - ColumnStatisticsImpl stats2 = ColumnStatisticsImpl.create(inspector); + ColumnStatisticsImpl stats1 = ColumnStatisticsImpl.create(schema); + ColumnStatisticsImpl stats2 = ColumnStatisticsImpl.create(schema); stats1.updateDouble(10.0); stats1.updateDouble(100.0); stats2.updateDouble(1.0); @@ -95,11 +93,10 @@ public void testDoubleMerge() throws Exception { @Test public void testStringMerge() throws Exception { - ObjectInspector inspector = - PrimitiveObjectInspectorFactory.javaStringObjectInspector; + TypeDescription schema = TypeDescription.createString(); - ColumnStatisticsImpl stats1 = ColumnStatisticsImpl.create(inspector); - ColumnStatisticsImpl stats2 = ColumnStatisticsImpl.create(inspector); + ColumnStatisticsImpl stats1 = ColumnStatisticsImpl.create(schema); + ColumnStatisticsImpl stats2 = ColumnStatisticsImpl.create(schema); stats1.updateString(new Text("bob")); stats1.updateString(new Text("david")); stats1.updateString(new Text("charles")); @@ -119,11 +116,10 @@ public void testStringMerge() throws Exception { @Test public void testDateMerge() throws Exception { - ObjectInspector inspector = - PrimitiveObjectInspectorFactory.javaDateObjectInspector; + TypeDescription schema = TypeDescription.createDate(); - ColumnStatisticsImpl stats1 = ColumnStatisticsImpl.create(inspector); - ColumnStatisticsImpl stats2 = ColumnStatisticsImpl.create(inspector); + ColumnStatisticsImpl stats1 = ColumnStatisticsImpl.create(schema); + ColumnStatisticsImpl stats2 = ColumnStatisticsImpl.create(schema); stats1.updateDate(new DateWritable(1000)); stats1.updateDate(new DateWritable(100)); stats2.updateDate(new DateWritable(10)); @@ -142,11 +138,10 @@ public 
void testDateMerge() throws Exception { @Test public void testTimestampMerge() throws Exception { - ObjectInspector inspector = - PrimitiveObjectInspectorFactory.javaTimestampObjectInspector; + TypeDescription schema = TypeDescription.createTimestamp(); - ColumnStatisticsImpl stats1 = ColumnStatisticsImpl.create(inspector); - ColumnStatisticsImpl stats2 = ColumnStatisticsImpl.create(inspector); + ColumnStatisticsImpl stats1 = ColumnStatisticsImpl.create(schema); + ColumnStatisticsImpl stats2 = ColumnStatisticsImpl.create(schema); stats1.updateTimestamp(new Timestamp(10)); stats1.updateTimestamp(new Timestamp(100)); stats2.updateTimestamp(new Timestamp(1)); @@ -165,11 +160,10 @@ public void testTimestampMerge() throws Exception { @Test public void testDecimalMerge() throws Exception { - ObjectInspector inspector = - PrimitiveObjectInspectorFactory.javaHiveDecimalObjectInspector; + TypeDescription schema = TypeDescription.createDecimal(38, 16); - ColumnStatisticsImpl stats1 = ColumnStatisticsImpl.create(inspector); - ColumnStatisticsImpl stats2 = ColumnStatisticsImpl.create(inspector); + ColumnStatisticsImpl stats1 = ColumnStatisticsImpl.create(schema); + ColumnStatisticsImpl stats2 = ColumnStatisticsImpl.create(schema); stats1.updateDecimal(HiveDecimal.create(10)); stats1.updateDecimal(HiveDecimal.create(100)); stats2.updateDecimal(HiveDecimal.create(1)); diff --git ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestInputOutputFormat.java ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestInputOutputFormat.java index 8ba4d2e..85b44a2 100644 --- ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestInputOutputFormat.java +++ ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestInputOutputFormat.java @@ -1106,6 +1106,8 @@ public void testProjectedColumnSize() throws Exception { @SuppressWarnings("unchecked,deprecation") public void testInOutFormat() throws Exception { Properties properties = new Properties(); + properties.setProperty("columns", "x,y"); + properties.setProperty("columns.types", "int:int"); StructObjectInspector inspector; synchronized (TestOrcFile.class) { inspector = (StructObjectInspector) @@ -1122,8 +1124,6 @@ public void testInOutFormat() throws Exception { writer.write(serde.serialize(new MyRow(3,2), inspector)); writer.close(true); serde = new OrcSerde(); - properties.setProperty("columns", "x,y"); - properties.setProperty("columns.types", "int:int"); SerDeUtils.initializeSerDe(serde, conf, properties, null); assertEquals(OrcSerde.OrcSerdeRow.class, serde.getSerializedClass()); inspector = (StructObjectInspector) serde.getObjectInspector(); @@ -1295,13 +1295,13 @@ public void testMROutput() throws Exception { @SuppressWarnings("deprecation") public void testEmptyFile() throws Exception { Properties properties = new Properties(); + properties.setProperty("columns", "x,y"); + properties.setProperty("columns.types", "int:int"); HiveOutputFormat outFormat = new OrcOutputFormat(); org.apache.hadoop.hive.ql.exec.FileSinkOperator.RecordWriter writer = outFormat.getHiveRecordWriter(conf, testFilePath, MyRow.class, true, properties, Reporter.NULL); writer.close(true); - properties.setProperty("columns", "x,y"); - properties.setProperty("columns.types", "int:int"); SerDe serde = new OrcSerde(); SerDeUtils.initializeSerDe(serde, conf, properties, null); InputFormat in = new OrcInputFormat(); @@ -1333,6 +1333,8 @@ public void readFields(DataInput dataInput) throws IOException { @SuppressWarnings("unchecked,deprecation") public void testDefaultTypes() throws Exception { Properties properties = 
new Properties(); + properties.setProperty("columns", "str,str2"); + properties.setProperty("columns.types", "string:string"); StructObjectInspector inspector; synchronized (TestOrcFile.class) { inspector = (StructObjectInspector) @@ -1352,7 +1354,6 @@ public void testDefaultTypes() throws Exception { writer.write(serde.serialize(new StringRow("miles"), inspector)); writer.close(true); serde = new OrcSerde(); - properties.setProperty("columns", "str,str2"); SerDeUtils.initializeSerDe(serde, conf, properties, null); inspector = (StructObjectInspector) serde.getObjectInspector(); assertEquals("struct", inspector.getTypeName()); @@ -1873,6 +1874,8 @@ public void testSetSearchArgument() throws Exception { @SuppressWarnings("unchecked,deprecation") public void testSplitElimination() throws Exception { Properties properties = new Properties(); + properties.setProperty("columns", "z,r"); + properties.setProperty("columns.types", "int:struct"); StructObjectInspector inspector; synchronized (TestOrcFile.class) { inspector = (StructObjectInspector) @@ -1901,8 +1904,6 @@ public void testSplitElimination() throws Exception { .build(); conf.set("sarg.pushdown", toKryo(sarg)); conf.set("hive.io.file.readcolumn.names", "z,r"); - properties.setProperty("columns", "z,r"); - properties.setProperty("columns.types", "int:struct"); SerDeUtils.initializeSerDe(serde, conf, properties, null); inspector = (StructObjectInspector) serde.getObjectInspector(); InputFormat in = new OrcInputFormat(); diff --git ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestOrcFile.java ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestOrcFile.java index 0bb8401..06e3362 100644 --- ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestOrcFile.java +++ ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestOrcFile.java @@ -519,9 +519,9 @@ public void testTimestamp() throws Exception { Object row = rows.next(null); assertEquals(tslist.get(idx++).getNanos(), ((TimestampWritable) row).getNanos()); } - assertEquals(1, OrcUtils.getFlattenedColumnsCount(inspector)); + assertEquals(0, writer.getSchema().getMaximumId()); boolean[] expected = new boolean[] {false}; - boolean[] included = OrcUtils.includeColumns("", "ts", inspector); + boolean[] included = OrcUtils.includeColumns("", writer.getSchema()); assertEquals(true, Arrays.equals(expected, included)); } @@ -546,17 +546,18 @@ public void testStringAndBinaryStatistics() throws Exception { Reader reader = OrcFile.createReader(testFilePath, OrcFile.readerOptions(conf).filesystem(fs)); - assertEquals(3, OrcUtils.getFlattenedColumnsCount(inspector)); + TypeDescription schema = writer.getSchema(); + assertEquals(2, schema.getMaximumId()); boolean[] expected = new boolean[] {false, false, true}; - boolean[] included = OrcUtils.includeColumns("string1", "bytes1,string1", inspector); + boolean[] included = OrcUtils.includeColumns("string1", schema); assertEquals(true, Arrays.equals(expected, included)); expected = new boolean[] {false, false, false}; - included = OrcUtils.includeColumns("", "bytes1,string1", inspector); + included = OrcUtils.includeColumns("", schema); assertEquals(true, Arrays.equals(expected, included)); expected = new boolean[] {false, false, false}; - included = OrcUtils.includeColumns(null, "bytes1,string1", inspector); + included = OrcUtils.includeColumns(null, schema); assertEquals(true, Arrays.equals(expected, included)); // check the stats @@ -656,9 +657,10 @@ public void testStripeLevelStats() throws Exception { Reader reader = OrcFile.createReader(testFilePath, 
OrcFile.readerOptions(conf).filesystem(fs)); - assertEquals(3, OrcUtils.getFlattenedColumnsCount(inspector)); + TypeDescription schema = writer.getSchema(); + assertEquals(2, schema.getMaximumId()); boolean[] expected = new boolean[] {false, true, false}; - boolean[] included = OrcUtils.includeColumns("int1", "int1,string1", inspector); + boolean[] included = OrcUtils.includeColumns("int1", schema); assertEquals(true, Arrays.equals(expected, included)); Metadata metadata = reader.getMetadata(); @@ -742,14 +744,14 @@ public void test1() throws Exception { Reader reader = OrcFile.createReader(testFilePath, OrcFile.readerOptions(conf).filesystem(fs)); - assertEquals(24, OrcUtils.getFlattenedColumnsCount(inspector)); + TypeDescription schema = writer.getSchema(); + assertEquals(23, schema.getMaximumId()); boolean[] expected = new boolean[] {false, false, false, false, false, false, false, false, false, false, false, false, false, false, false, false, false, false, false, false, false, false, false, false}; - boolean[] included = OrcUtils.includeColumns("", - "boolean1,byte1,short1,int1,long1,float1,double1,bytes1,string1,middle,list,map", inspector); + boolean[] included = OrcUtils.includeColumns("", schema); assertEquals(true, Arrays.equals(expected, included)); expected = new boolean[] {false, true, false, false, false, @@ -757,8 +759,7 @@ public void test1() throws Exception { true, true, true, true, true, false, false, false, false, true, true, true, true, true}; - included = OrcUtils.includeColumns("boolean1,string1,middle,map", - "boolean1,byte1,short1,int1,long1,float1,double1,bytes1,string1,middle,list,map", inspector); + included = OrcUtils.includeColumns("boolean1,string1,middle,map", schema); assertEquals(true, Arrays.equals(expected, included)); expected = new boolean[] {false, true, false, false, false, @@ -766,8 +767,7 @@ public void test1() throws Exception { true, true, true, true, true, false, false, false, false, true, true, true, true, true}; - included = OrcUtils.includeColumns("boolean1,string1,middle,map", - "boolean1,byte1,short1,int1,long1,float1,double1,bytes1,string1,middle,list,map", inspector); + included = OrcUtils.includeColumns("boolean1,string1,middle,map", schema); assertEquals(true, Arrays.equals(expected, included)); expected = new boolean[] {false, true, true, true, true, @@ -777,7 +777,7 @@ public void test1() throws Exception { true, true, true, true}; included = OrcUtils.includeColumns( "boolean1,byte1,short1,int1,long1,float1,double1,bytes1,string1,middle,list,map", - "boolean1,byte1,short1,int1,long1,float1,double1,bytes1,string1,middle,list,map", inspector); + schema); assertEquals(true, Arrays.equals(expected, included)); Metadata metadata = reader.getMetadata(); @@ -1312,17 +1312,18 @@ public void testUnionAndTimestamp() throws Exception { Reader reader = OrcFile.createReader(testFilePath, OrcFile.readerOptions(conf).filesystem(fs)); - assertEquals(6, OrcUtils.getFlattenedColumnsCount(inspector)); + TypeDescription schema = writer.getSchema(); + assertEquals(5, schema.getMaximumId()); boolean[] expected = new boolean[] {false, false, false, false, false, false}; - boolean[] included = OrcUtils.includeColumns("", "time,union,decimal", inspector); + boolean[] included = OrcUtils.includeColumns("", schema); assertEquals(true, Arrays.equals(expected, included)); expected = new boolean[] {false, true, false, false, false, true}; - included = OrcUtils.includeColumns("time,decimal", "time,union,decimal", inspector); + included = 
OrcUtils.includeColumns("time,decimal", schema); assertEquals(true, Arrays.equals(expected, included)); expected = new boolean[] {false, false, true, true, true, false}; - included = OrcUtils.includeColumns("union", "time,union,decimal", inspector); + included = OrcUtils.includeColumns("union", schema); assertEquals(true, Arrays.equals(expected, included)); assertEquals(false, reader.getMetadataKeys().iterator().hasNext()); diff --git ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestOrcWideTable.java ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestOrcWideTable.java index a3d3ec5..f838cbc 100644 --- ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestOrcWideTable.java +++ ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestOrcWideTable.java @@ -38,241 +38,45 @@ public class TestOrcWideTable { - private static final int MEMORY_FOR_ORC = 512 * 1024 * 1024; - Path workDir = new Path(System.getProperty("test.tmp.dir", "target" + File.separator + "test" - + File.separator + "tmp")); - - Configuration conf; - FileSystem fs; - Path testFilePath; - float memoryPercent; - - @Rule - public TestName testCaseName = new TestName(); - - @Before - public void openFileSystem() throws Exception { - conf = new Configuration(); - fs = FileSystem.getLocal(conf); - testFilePath = new Path(workDir, "TestOrcFile." + testCaseName.getMethodName() + ".orc"); - fs.delete(testFilePath, false); - // make sure constant memory is available for ORC always - memoryPercent = (float) MEMORY_FOR_ORC / (float) ManagementFactory.getMemoryMXBean(). - getHeapMemoryUsage().getMax(); - conf.setFloat(HiveConf.ConfVars.HIVE_ORC_FILE_MEMORY_POOL.varname, memoryPercent); - } - @Test public void testBufferSizeFor1Col() throws IOException { - ObjectInspector inspector; - synchronized (TestOrcFile.class) { - inspector = ObjectInspectorFactory.getReflectionObjectInspector(Long.class, - ObjectInspectorFactory.ObjectInspectorOptions.JAVA); - } - int bufferSize = 128 * 1024; - Writer writer = OrcFile.createWriter( - testFilePath, - OrcFile.writerOptions(conf).inspector(inspector).stripeSize(100000) - .compress(CompressionKind.NONE).bufferSize(bufferSize)); - final int newBufferSize; - if (writer instanceof WriterImpl) { - WriterImpl orcWriter = (WriterImpl) writer; - newBufferSize = orcWriter.getEstimatedBufferSize(bufferSize); - assertEquals(bufferSize, newBufferSize); - } + assertEquals(128 * 1024, WriterImpl.getEstimatedBufferSize(512 * 1024 * 1024, + false, 1, 128*1024)); } @Test public void testBufferSizeFor1000Col() throws IOException { - ObjectInspector inspector; - synchronized (TestOrcFile.class) { - inspector = ObjectInspectorFactory.getReflectionObjectInspector(Long.class, - ObjectInspectorFactory.ObjectInspectorOptions.JAVA); - } - int bufferSize = 128 * 1024; - String columns = getRandomColumnNames(1000); - // just for testing. 
manually write the column names - conf.set(IOConstants.COLUMNS, columns); - Writer writer = OrcFile.createWriter( - testFilePath, - OrcFile.writerOptions(conf).inspector(inspector).stripeSize(100000) - .compress(CompressionKind.NONE).bufferSize(bufferSize)); - final int newBufferSize; - if (writer instanceof WriterImpl) { - WriterImpl orcWriter = (WriterImpl) writer; - newBufferSize = orcWriter.getEstimatedBufferSize(bufferSize); - assertEquals(bufferSize, newBufferSize); - } + assertEquals(128 * 1024, WriterImpl.getEstimatedBufferSize(512 * 1024 * 1024, + false, 1000, 128*1024)); } @Test public void testBufferSizeFor2000Col() throws IOException { - ObjectInspector inspector; - synchronized (TestOrcFile.class) { - inspector = ObjectInspectorFactory.getReflectionObjectInspector(Long.class, - ObjectInspectorFactory.ObjectInspectorOptions.JAVA); - } - int bufferSize = 256 * 1024; - String columns = getRandomColumnNames(2000); - // just for testing. manually write the column names - conf.set(IOConstants.COLUMNS, columns); - Writer writer = OrcFile.createWriter( - testFilePath, - OrcFile.writerOptions(conf).inspector(inspector).stripeSize(100000) - .compress(CompressionKind.ZLIB).bufferSize(bufferSize)); - final int newBufferSize; - if (writer instanceof WriterImpl) { - WriterImpl orcWriter = (WriterImpl) writer; - newBufferSize = orcWriter.getEstimatedBufferSize(bufferSize); - assertEquals(32 * 1024, newBufferSize); - } + assertEquals(32 * 1024, WriterImpl.getEstimatedBufferSize(512 * 1024 * 1024, + true, 2000, 256*1024)); } @Test public void testBufferSizeFor2000ColNoCompression() throws IOException { - ObjectInspector inspector; - synchronized (TestOrcFile.class) { - inspector = ObjectInspectorFactory.getReflectionObjectInspector(Long.class, - ObjectInspectorFactory.ObjectInspectorOptions.JAVA); - } - int bufferSize = 256 * 1024; - String columns = getRandomColumnNames(2000); - // just for testing. manually write the column names - conf.set(IOConstants.COLUMNS, columns); - Writer writer = OrcFile.createWriter( - testFilePath, - OrcFile.writerOptions(conf).inspector(inspector).stripeSize(100000) - .compress(CompressionKind.NONE).bufferSize(bufferSize)); - final int newBufferSize; - if (writer instanceof WriterImpl) { - WriterImpl orcWriter = (WriterImpl) writer; - newBufferSize = orcWriter.getEstimatedBufferSize(bufferSize); - assertEquals(64 * 1024, newBufferSize); - } + assertEquals(64 * 1024, WriterImpl.getEstimatedBufferSize(512 * 1024 * 1024, + false, 2000, 256*1024)); } @Test public void testBufferSizeFor4000Col() throws IOException { - ObjectInspector inspector; - synchronized (TestOrcFile.class) { - inspector = ObjectInspectorFactory.getReflectionObjectInspector(Long.class, - ObjectInspectorFactory.ObjectInspectorOptions.JAVA); - } - int bufferSize = 256 * 1024; - String columns = getRandomColumnNames(4000); - // just for testing. 
manually write the column names - conf.set(IOConstants.COLUMNS, columns); - Writer writer = OrcFile.createWriter( - testFilePath, - OrcFile.writerOptions(conf).inspector(inspector).stripeSize(100000) - .compress(CompressionKind.ZLIB).bufferSize(bufferSize)); - final int newBufferSize; - if (writer instanceof WriterImpl) { - WriterImpl orcWriter = (WriterImpl) writer; - newBufferSize = orcWriter.getEstimatedBufferSize(bufferSize); - assertEquals(16 * 1024, newBufferSize); - } + assertEquals(16 * 1024, WriterImpl.getEstimatedBufferSize(512 * 1024 * 1024, + true, 4000, 256*1024)); } @Test public void testBufferSizeFor4000ColNoCompression() throws IOException { - ObjectInspector inspector; - synchronized (TestOrcFile.class) { - inspector = ObjectInspectorFactory.getReflectionObjectInspector(Long.class, - ObjectInspectorFactory.ObjectInspectorOptions.JAVA); - } - int bufferSize = 256 * 1024; - String columns = getRandomColumnNames(4000); - // just for testing. manually write the column names - conf.set(IOConstants.COLUMNS, columns); - Writer writer = OrcFile.createWriter( - testFilePath, - OrcFile.writerOptions(conf).inspector(inspector).stripeSize(100000) - .compress(CompressionKind.NONE).bufferSize(bufferSize)); - final int newBufferSize; - if (writer instanceof WriterImpl) { - WriterImpl orcWriter = (WriterImpl) writer; - newBufferSize = orcWriter.getEstimatedBufferSize(bufferSize); - assertEquals(32 * 1024, newBufferSize); - } + assertEquals(32 * 1024, WriterImpl.getEstimatedBufferSize(512 * 1024 * 1024, + false, 4000, 256*1024)); } @Test public void testBufferSizeFor25000Col() throws IOException { - ObjectInspector inspector; - synchronized (TestOrcFile.class) { - inspector = ObjectInspectorFactory.getReflectionObjectInspector(Long.class, - ObjectInspectorFactory.ObjectInspectorOptions.JAVA); - } - int bufferSize = 256 * 1024; - String columns = getRandomColumnNames(25000); - // just for testing. manually write the column names - conf.set(IOConstants.COLUMNS, columns); - Writer writer = OrcFile.createWriter( - testFilePath, - OrcFile.writerOptions(conf).inspector(inspector).stripeSize(100000) - .compress(CompressionKind.NONE).bufferSize(bufferSize)); - final int newBufferSize; - if (writer instanceof WriterImpl) { - WriterImpl orcWriter = (WriterImpl) writer; - newBufferSize = orcWriter.getEstimatedBufferSize(bufferSize); - // 4K is the minimum buffer size - assertEquals(4 * 1024, newBufferSize); - } - } - - @Test - public void testBufferSizeManualOverride1() throws IOException { - ObjectInspector inspector; - synchronized (TestOrcFile.class) { - inspector = ObjectInspectorFactory.getReflectionObjectInspector(Long.class, - ObjectInspectorFactory.ObjectInspectorOptions.JAVA); - } - int bufferSize = 1024; - String columns = getRandomColumnNames(2000); - // just for testing. 
manually write the column names - conf.set(IOConstants.COLUMNS, columns); - Writer writer = OrcFile.createWriter( - testFilePath, - OrcFile.writerOptions(conf).inspector(inspector).stripeSize(100000) - .compress(CompressionKind.NONE).bufferSize(bufferSize)); - final int newBufferSize; - if (writer instanceof WriterImpl) { - WriterImpl orcWriter = (WriterImpl) writer; - newBufferSize = orcWriter.getEstimatedBufferSize(bufferSize); - assertEquals(bufferSize, newBufferSize); - } - } - - @Test - public void testBufferSizeManualOverride2() throws IOException { - ObjectInspector inspector; - synchronized (TestOrcFile.class) { - inspector = ObjectInspectorFactory.getReflectionObjectInspector(Long.class, - ObjectInspectorFactory.ObjectInspectorOptions.JAVA); - } - int bufferSize = 2 * 1024; - String columns = getRandomColumnNames(4000); - // just for testing. manually write the column names - conf.set(IOConstants.COLUMNS, columns); - Writer writer = OrcFile.createWriter( - testFilePath, - OrcFile.writerOptions(conf).inspector(inspector).stripeSize(100000) - .compress(CompressionKind.NONE).bufferSize(bufferSize)); - final int newBufferSize; - if (writer instanceof WriterImpl) { - WriterImpl orcWriter = (WriterImpl) writer; - newBufferSize = orcWriter.getEstimatedBufferSize(bufferSize); - assertEquals(bufferSize, newBufferSize); - } - } - - private String getRandomColumnNames(int n) { - StringBuilder sb = new StringBuilder(); - for (int i = 0; i < n - 1; i++) { - sb.append("col").append(i).append(","); - } - sb.append("col").append(n - 1); - return sb.toString(); + assertEquals(4 * 1024, WriterImpl.getEstimatedBufferSize(512 * 1024 * 1024, + false, 25000, 256*1024)); } } diff --git ql/src/test/results/clientpositive/orc_analyze.q.out ql/src/test/results/clientpositive/orc_analyze.q.out index 6eb9a93..bc46852 100644 --- ql/src/test/results/clientpositive/orc_analyze.q.out +++ ql/src/test/results/clientpositive/orc_analyze.q.out @@ -106,7 +106,7 @@ Table Parameters: numFiles 1 numRows 100 rawDataSize 52600 - totalSize 3174 + totalSize 3202 #### A masked pattern was here #### # Storage Information @@ -154,7 +154,7 @@ Table Parameters: numFiles 1 numRows 100 rawDataSize 52600 - totalSize 3174 + totalSize 3202 #### A masked pattern was here #### # Storage Information @@ -202,7 +202,7 @@ Table Parameters: numFiles 1 numRows 100 rawDataSize 52600 - totalSize 3174 + totalSize 3202 #### A masked pattern was here #### # Storage Information @@ -291,7 +291,7 @@ Table Parameters: numFiles 1 numRows 100 rawDataSize 52600 - totalSize 3174 + totalSize 3202 #### A masked pattern was here #### # Storage Information @@ -405,7 +405,7 @@ Partition Parameters: numFiles 1 numRows 50 rawDataSize 21950 - totalSize 2073 + totalSize 2102 #### A masked pattern was here #### # Storage Information @@ -448,7 +448,7 @@ Partition Parameters: numFiles 1 numRows 50 rawDataSize 22050 - totalSize 2088 + totalSize 2118 #### A masked pattern was here #### # Storage Information @@ -503,7 +503,7 @@ Partition Parameters: numFiles 1 numRows 50 rawDataSize 21950 - totalSize 2073 + totalSize 2102 #### A masked pattern was here #### # Storage Information @@ -546,7 +546,7 @@ Partition Parameters: numFiles 1 numRows 50 rawDataSize 22050 - totalSize 2088 + totalSize 2118 #### A masked pattern was here #### # Storage Information @@ -601,7 +601,7 @@ Partition Parameters: numFiles 1 numRows 50 rawDataSize 21950 - totalSize 2073 + totalSize 2102 #### A masked pattern was here #### # Storage Information @@ -644,7 +644,7 @@ Partition Parameters: 
numFiles 1 numRows 50 rawDataSize 22050 - totalSize 2088 + totalSize 2118 #### A masked pattern was here #### # Storage Information @@ -744,7 +744,7 @@ Partition Parameters: numFiles 1 numRows 50 rawDataSize 21950 - totalSize 2073 + totalSize 2102 #### A masked pattern was here #### # Storage Information @@ -787,7 +787,7 @@ Partition Parameters: numFiles 1 numRows 50 rawDataSize 22050 - totalSize 2088 + totalSize 2118 #### A masked pattern was here #### # Storage Information @@ -907,7 +907,7 @@ Partition Parameters: numFiles 1 numRows 50 rawDataSize 21950 - totalSize 2073 + totalSize 2102 #### A masked pattern was here #### # Storage Information @@ -950,7 +950,7 @@ Partition Parameters: numFiles 1 numRows 50 rawDataSize 22050 - totalSize 2088 + totalSize 2118 #### A masked pattern was here #### # Storage Information @@ -1005,7 +1005,7 @@ Partition Parameters: numFiles 1 numRows 50 rawDataSize 21950 - totalSize 2073 + totalSize 2102 #### A masked pattern was here #### # Storage Information @@ -1048,7 +1048,7 @@ Partition Parameters: numFiles 1 numRows 50 rawDataSize 22050 - totalSize 2088 + totalSize 2118 #### A masked pattern was here #### # Storage Information @@ -1103,7 +1103,7 @@ Partition Parameters: numFiles 1 numRows 50 rawDataSize 21950 - totalSize 2073 + totalSize 2102 #### A masked pattern was here #### # Storage Information @@ -1146,7 +1146,7 @@ Partition Parameters: numFiles 1 numRows 50 rawDataSize 22050 - totalSize 2088 + totalSize 2118 #### A masked pattern was here #### # Storage Information @@ -1252,7 +1252,7 @@ Partition Parameters: numFiles 1 numRows 50 rawDataSize 21950 - totalSize 2073 + totalSize 2102 #### A masked pattern was here #### # Storage Information @@ -1295,7 +1295,7 @@ Partition Parameters: numFiles 1 numRows 50 rawDataSize 22050 - totalSize 2088 + totalSize 2118 #### A masked pattern was here #### # Storage Information @@ -1460,7 +1460,7 @@ Partition Parameters: numFiles 1 numRows 50 rawDataSize 21950 - totalSize 2073 + totalSize 2102 #### A masked pattern was here #### # Storage Information @@ -1560,7 +1560,7 @@ Partition Parameters: numFiles 1 numRows 50 rawDataSize 21950 - totalSize 2073 + totalSize 2102 #### A masked pattern was here #### # Storage Information @@ -1660,7 +1660,7 @@ Partition Parameters: numFiles 1 numRows 50 rawDataSize 21950 - totalSize 2073 + totalSize 2102 #### A masked pattern was here #### # Storage Information diff --git ql/src/test/results/clientpositive/orc_file_dump.q.out ql/src/test/results/clientpositive/orc_file_dump.q.out index 67aa189..32c41d3 100644 --- ql/src/test/results/clientpositive/orc_file_dump.q.out +++ ql/src/test/results/clientpositive/orc_file_dump.q.out @@ -97,7 +97,7 @@ File Version: 0.12 with HIVE_8732 Rows: 1049 Compression: ZLIB Compression size: 262144 -Type: struct<_col0:tinyint,_col1:smallint,_col2:int,_col3:bigint,_col4:float,_col5:double,_col6:boolean,_col7:string,_col8:timestamp,_col9:decimal(4,2),_col10:binary> +Type: struct Stripe Statistics: Stripe 1: @@ -192,7 +192,7 @@ Stripes: Entry 1: numHashFunctions: 4 bitCount: 6272 popCount: 168 loadFactor: 0.0268 expectedFpp: 5.147697E-7 Stripe level merge: numHashFunctions: 4 bitCount: 6272 popCount: 492 loadFactor: 0.0784 expectedFpp: 3.7864847E-5 -File length: 33456 bytes +File length: 33458 bytes Padding length: 0 bytes Padding ratio: 0% -- END ORC FILE DUMP -- @@ -215,7 +215,7 @@ File Version: 0.12 with HIVE_8732 Rows: 1049 Compression: ZLIB Compression size: 262144 -Type: 
struct<_col0:tinyint,_col1:smallint,_col2:int,_col3:bigint,_col4:float,_col5:double,_col6:boolean,_col7:string,_col8:timestamp,_col9:decimal(4,2),_col10:binary> +Type: struct Stripe Statistics: Stripe 1: @@ -310,7 +310,7 @@ Stripes: Entry 1: numHashFunctions: 7 bitCount: 9600 popCount: 285 loadFactor: 0.0297 expectedFpp: 2.0324289E-11 Stripe level merge: numHashFunctions: 7 bitCount: 9600 popCount: 849 loadFactor: 0.0884 expectedFpp: 4.231118E-8 -File length: 38610 bytes +File length: 38613 bytes Padding length: 0 bytes Padding ratio: 0% -- END ORC FILE DUMP -- @@ -345,7 +345,7 @@ File Version: 0.12 with HIVE_8732 Rows: 1049 Compression: ZLIB Compression size: 262144 -Type: struct<_col0:tinyint,_col1:smallint,_col2:int,_col3:bigint,_col4:float,_col5:double,_col6:boolean,_col7:string,_col8:timestamp,_col9:decimal(4,2),_col10:binary> +Type: struct Stripe Statistics: Stripe 1: @@ -440,7 +440,7 @@ Stripes: Entry 1: numHashFunctions: 4 bitCount: 6272 popCount: 168 loadFactor: 0.0268 expectedFpp: 5.147697E-7 Stripe level merge: numHashFunctions: 4 bitCount: 6272 popCount: 492 loadFactor: 0.0784 expectedFpp: 3.7864847E-5 -File length: 33456 bytes +File length: 33458 bytes Padding length: 0 bytes Padding ratio: 0% -- END ORC FILE DUMP -- diff --git ql/src/test/results/clientpositive/orc_int_type_promotion.q.out ql/src/test/results/clientpositive/orc_int_type_promotion.q.out index d26dff2..03e2f7f 100644 --- ql/src/test/results/clientpositive/orc_int_type_promotion.q.out +++ ql/src/test/results/clientpositive/orc_int_type_promotion.q.out @@ -220,14 +220,14 @@ STAGE PLANS: Map Operator Tree: TableScan alias: alltypes_orc - Statistics: Num rows: 88 Data size: 1772 Basic stats: COMPLETE Column stats: NONE + Statistics: Num rows: 88 Data size: 1766 Basic stats: COMPLETE Column stats: NONE Select Operator expressions: ti (type: tinyint), si (type: smallint), i (type: int), bi (type: bigint) outputColumnNames: _col0, _col1, _col2, _col3 - Statistics: Num rows: 88 Data size: 1772 Basic stats: COMPLETE Column stats: NONE + Statistics: Num rows: 88 Data size: 1766 Basic stats: COMPLETE Column stats: NONE File Output Operator compressed: false - Statistics: Num rows: 88 Data size: 1772 Basic stats: COMPLETE Column stats: NONE + Statistics: Num rows: 88 Data size: 1766 Basic stats: COMPLETE Column stats: NONE table: input format: org.apache.hadoop.mapred.TextInputFormat output format: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat
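Note (illustrative, not part of the patch): the statistics tests above show the new pattern of keying column statistics off a TypeDescription instead of an ObjectInspector. The sketch below restates that pattern as a standalone snippet. It uses only calls visible in the hunks above (TypeDescription.createInt/createString, ColumnStatisticsImpl.create, updateInteger, updateString); the final merge call is implied by the test names but sits outside the visible hunks, so treat it as an assumption. ColumnStatisticsImpl.create has package access, hence the package declaration.

    package org.apache.hadoop.hive.ql.io.orc;  // create() is package-private, as in the tests

    import org.apache.hadoop.io.Text;

    public class ColumnStatsSketch {
      public static void main(String[] args) {
        // Build the column type directly; no ObjectInspector involved.
        TypeDescription intSchema = TypeDescription.createInt();
        ColumnStatisticsImpl intStats = ColumnStatisticsImpl.create(intSchema);
        intStats.updateInteger(10);
        intStats.updateInteger(1);

        TypeDescription strSchema = TypeDescription.createString();
        ColumnStatisticsImpl left = ColumnStatisticsImpl.create(strSchema);
        ColumnStatisticsImpl right = ColumnStatisticsImpl.create(strSchema);
        left.updateString(new Text("bob"));
        right.updateString(new Text("charles"));
        // The tests then merge per-stripe statistics; this call is assumed from the
        // test names (testStringMerge etc.) and is not shown in the hunks above.
        left.merge(right);
      }
    }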
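The TestOrcWideTable rewrite above also reflects that the compression buffer estimate is now a plain static function, so it can be asserted without constructing a Writer. The argument values below are copied from the updated assertions; the interpretation of the four parameters as (available ORC memory, compression enabled, column count, requested buffer size) is inferred from those values and should be read as an assumption.

    package org.apache.hadoop.hive.ql.io.orc;  // same package as the tests, in case the method is not public

    public class BufferSizeSketch {
      public static void main(String[] args) {
        // Values lifted from the updated TestOrcWideTable assertions above.
        System.out.println(WriterImpl.getEstimatedBufferSize(512 * 1024 * 1024, false, 1, 128 * 1024));     // 128K: a narrow schema keeps the requested size
        System.out.println(WriterImpl.getEstimatedBufferSize(512 * 1024 * 1024, true, 2000, 256 * 1024));   // 32K: 2000 compressed columns shrink it
        System.out.println(WriterImpl.getEstimatedBufferSize(512 * 1024 * 1024, false, 25000, 256 * 1024)); // 4K: the minimum buffer size
      }
    }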
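Finally, the TestOrcFile changes above show column projection being resolved against the file schema rather than the old "read columns"/"all columns" string pair. The fragment below is in the spirit of those assertions; `writer` is assumed to be a Writer produced as in the tests (hypothetical name), and the comment about the array shape is derived from the expected arrays in the hunks above.

    TypeDescription schema = writer.getSchema();
    // One flag per column id: index 0 is the root type, so the array has
    // schema.getMaximumId() + 1 entries; true marks columns named in the read list.
    boolean[] included = OrcUtils.includeColumns("boolean1,string1,middle,map", schema);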