commit 2f6408f6f4db76e32049c83d0796b1887235daa4 Author: Owen O'Malley Date: Fri Nov 4 10:45:56 2016 -0700 HIVE-15124. Fix OrcInputFormat to use reader's schema for include boolean array. diff --git llap-server/src/java/org/apache/hadoop/hive/llap/io/encoded/OrcEncodedDataReader.java llap-server/src/java/org/apache/hadoop/hive/llap/io/encoded/OrcEncodedDataReader.java index 39f4112..9aa595c 100644 --- llap-server/src/java/org/apache/hadoop/hive/llap/io/encoded/OrcEncodedDataReader.java +++ llap-server/src/java/org/apache/hadoop/hive/llap/io/encoded/OrcEncodedDataReader.java @@ -268,8 +268,11 @@ protected Void performDataRead() throws IOException { try { if (sarg != null && stride != 0) { // TODO: move this to a common method + SchemaEvolution evolution = new SchemaEvolution( + OrcUtils.convertTypeFromProtobuf(fileMetadata.getTypes(), 0), + null, null); int[] filterColumns = RecordReaderImpl.mapSargColumnsToOrcInternalColIdx( - sarg.getLeaves(), columnNames, 0); + sarg.getLeaves(), evolution); // included will not be null, row options will fill the array with trues if null sargColumns = new boolean[globalIncludes.length]; for (int i : filterColumns) { @@ -697,7 +700,7 @@ private boolean determineRgsToRead(boolean[] globalIncludes, int rowIndexStride, TypeDescription schema = OrcUtils.convertTypeFromProtobuf(types, 0); SchemaEvolution schemaEvolution = new SchemaEvolution(schema, globalIncludes); sargApp = new RecordReaderImpl.SargApplier(sarg, colNamesForSarg, - rowIndexStride, globalIncludes.length, schemaEvolution); + rowIndexStride, schemaEvolution); } boolean hasAnyData = false; // readState should have been initialized by this time with an empty array. diff --git orc/src/java/org/apache/orc/TypeDescription.java orc/src/java/org/apache/orc/TypeDescription.java index 1585e43..2e9328b 100644 --- orc/src/java/org/apache/orc/TypeDescription.java +++ orc/src/java/org/apache/orc/TypeDescription.java @@ -178,6 +178,191 @@ public static TypeDescription createDecimal() { return new TypeDescription(Category.DECIMAL); } + static class StringPosition { + final String value; + int position; + final int length; + + StringPosition(String value) { + this.value = value; + position = 0; + length = value.length(); + } + + @Override + public String toString() { + StringBuilder buffer = new StringBuilder(); + buffer.append('\''); + buffer.append(value.substring(0, position)); + buffer.append('^'); + buffer.append(value.substring(position)); + buffer.append('\''); + return buffer.toString(); + } + } + + static Category parseCategory(StringPosition source) { + int start = source.position; + while (source.position < source.length) { + char ch = source.value.charAt(source.position); + if (!Character.isLetter(ch)) { + break; + } + source.position += 1; + } + if (source.position != start) { + String word = source.value.substring(start, source.position).toLowerCase(); + for (Category cat : Category.values()) { + if (cat.getName().equals(word)) { + return cat; + } + } + } + throw new IllegalArgumentException("Can't parse category at " + source); + } + + static int parseInt(StringPosition source) { + int start = source.position; + int result = 0; + while (source.position < source.length) { + char ch = source.value.charAt(source.position); + if (!Character.isDigit(ch)) { + break; + } + result = result * 10 + (ch - '0'); + source.position += 1; + } + if (source.position == start) { + throw new IllegalArgumentException("Missing integer at " + source); + } + return result; + } + + static String parseName(StringPosition 
source) { + int start = source.position; + while (source.position < source.length) { + char ch = source.value.charAt(source.position); + if (!Character.isLetterOrDigit(ch) && ch != '.' && ch != '_') { + break; + } + source.position += 1; + } + if (source.position == start) { + throw new IllegalArgumentException("Missing name at " + source); + } + return source.value.substring(start, source.position); + } + + static void requireChar(StringPosition source, char required) { + if (source.position >= source.length || + source.value.charAt(source.position) != required) { + throw new IllegalArgumentException("Missing required char '" + + required + "' at " + source); + } + source.position += 1; + } + + static boolean consumeChar(StringPosition source, char ch) { + boolean result = source.position < source.length && + source.value.charAt(source.position) == ch; + if (result) { + source.position += 1; + } + return result; + } + + static void parseUnion(TypeDescription type, StringPosition source) { + requireChar(source, '<'); + do { + type.addUnionChild(parseType(source)); + } while (consumeChar(source, ',')); + requireChar(source, '>'); + } + + static void parseStruct(TypeDescription type, StringPosition source) { + requireChar(source, '<'); + do { + String fieldName = parseName(source); + requireChar(source, ':'); + type.addField(fieldName, parseType(source)); + } while (consumeChar(source, ',')); + requireChar(source, '>'); + } + + static TypeDescription parseType(StringPosition source) { + TypeDescription result = new TypeDescription(parseCategory(source)); + switch (result.getCategory()) { + case BINARY: + case BOOLEAN: + case BYTE: + case DATE: + case DOUBLE: + case FLOAT: + case INT: + case LONG: + case SHORT: + case STRING: + case TIMESTAMP: + break; + case CHAR: + case VARCHAR: + requireChar(source, '('); + result.withMaxLength(parseInt(source)); + requireChar(source, ')'); + break; + case DECIMAL: { + requireChar(source, '('); + int precision = parseInt(source); + requireChar(source, ','); + result.withScale(parseInt(source)); + result.withPrecision(precision); + requireChar(source, ')'); + break; + } + case LIST: + requireChar(source, '<'); + result.children.add(parseType(source)); + requireChar(source, '>'); + break; + case MAP: + requireChar(source, '<'); + result.children.add(parseType(source)); + requireChar(source, ','); + result.children.add(parseType(source)); + requireChar(source, '>'); + break; + case UNION: + parseUnion(result, source); + break; + case STRUCT: + parseStruct(result, source); + break; + default: + throw new IllegalArgumentException("Unknown type " + + result.getCategory() + " at " + source); + } + return result; + } + + /** + * Parse TypeDescription from the Hive type names. This is the inverse + * of TypeDescription.toString() + * @param typeName the name of the type + * @return a new TypeDescription or null if typeName was null + * @throws IllegalArgumentException if the string is badly formed + */ + public static TypeDescription fromString(String typeName) { + if (typeName == null) { + return null; + } + StringPosition source = new StringPosition(typeName); + TypeDescription result = parseType(source); + if (source.position != source.length) { + throw new IllegalArgumentException("Extra characters at " + source); + } + return result; + } + /** * For decimal types, set the precision. 
* @param precision the new precision @@ -657,4 +842,29 @@ public String toJson() { printJsonToBuffer("", buffer, 0); return buffer.toString(); } -} + + /** + * Locate a subtype by its id. + * @param goal the column id to look for + * @return the subtype + */ + public TypeDescription findSubtype(int goal) { + // call getId method to make sure the ids are assigned + int id = getId(); + if (goal < id || goal > maxId) { + throw new IllegalArgumentException("Unknown type id " + id + " in " + + toJson()); + } + if (goal == id) { + return this; + } else { + TypeDescription prev = null; + for(TypeDescription next: children) { + if (next.id > goal) { + return prev.findSubtype(goal); + } + prev = next; + } + return prev.findSubtype(goal); + } + }} diff --git orc/src/java/org/apache/orc/impl/ReaderImpl.java orc/src/java/org/apache/orc/impl/ReaderImpl.java index d6df7d7..93fc0ce 100644 --- orc/src/java/org/apache/orc/impl/ReaderImpl.java +++ orc/src/java/org/apache/orc/impl/ReaderImpl.java @@ -572,13 +572,6 @@ public RecordReader rows() throws IOException { @Override public RecordReader rows(Options options) throws IOException { LOG.info("Reading ORC rows from " + path + " with " + options); - boolean[] include = options.getInclude(); - // if included columns is null, then include all columns - if (include == null) { - include = new boolean[types.size()]; - Arrays.fill(include, true); - options.include(include); - } return new RecordReaderImpl(this, options); } diff --git orc/src/java/org/apache/orc/impl/RecordReaderImpl.java orc/src/java/org/apache/orc/impl/RecordReaderImpl.java index 92b6a8b..6f864c3 100644 --- orc/src/java/org/apache/orc/impl/RecordReaderImpl.java +++ orc/src/java/org/apache/orc/impl/RecordReaderImpl.java @@ -75,6 +75,7 @@ private final List types; private final int bufferSize; private final SchemaEvolution evolution; + // the file included columns indexed by the file's column ids. private final boolean[] included; private final long rowIndexStride; private long rowInStripe = 0; @@ -95,17 +96,18 @@ /** * Given a list of column names, find the given column and return the index. * - * @param columnNames the list of potential column names + * @param evolution the mapping from reader to file schema * @param columnName the column name to look for - * @param rootColumn offset the result with the rootColumn - * @return the column number or -1 if the column wasn't found + * @return the file column id or -1 if the column wasn't found */ - static int findColumns(String[] columnNames, - String columnName, - int rootColumn) { - for(int i=0; i < columnNames.length; ++i) { - if (columnName.equals(columnNames[i])) { - return i + rootColumn; + static int findColumns(SchemaEvolution evolution, + String columnName) { + TypeDescription readerSchema = evolution.getReaderBaseSchema(); + List fieldNames = readerSchema.getFieldNames(); + List children = readerSchema.getChildren(); + for (int i = 0; i < fieldNames.size(); ++i) { + if (columnName.equals(fieldNames.get(i))) { + return evolution.getFileType(children.get(i).getId()).getId(); } } return -1; @@ -114,40 +116,36 @@ static int findColumns(String[] columnNames, /** * Find the mapping from predicate leaves to columns. 
* @param sargLeaves the search argument that we need to map - * @param columnNames the names of the columns - * @param rootColumn the offset of the top level row, which offsets the - * result - * @return an array mapping the sarg leaves to concrete column numbers + * @param evolution the mapping from reader to file schema + * @return an array mapping the sarg leaves to file column ids */ public static int[] mapSargColumnsToOrcInternalColIdx(List sargLeaves, - String[] columnNames, - int rootColumn) { + SchemaEvolution evolution) { int[] result = new int[sargLeaves.size()]; Arrays.fill(result, -1); for(int i=0; i < result.length; ++i) { String colName = sargLeaves.get(i).getColumnName(); - result[i] = findColumns(columnNames, colName, rootColumn); + result[i] = findColumns(evolution, colName); } return result; } protected RecordReaderImpl(ReaderImpl fileReader, Reader.Options options) throws IOException { - this.included = options.getInclude(); - included[0] = true; + boolean[] readerIncluded = options.getInclude(); if (options.getSchema() == null) { if (LOG.isInfoEnabled()) { LOG.info("Reader schema not provided -- using file schema " + fileReader.getSchema()); } - evolution = new SchemaEvolution(fileReader.getSchema(), included); + evolution = new SchemaEvolution(fileReader.getSchema(), readerIncluded); } else { // Now that we are creating a record reader for a file, validate that the schema to read // is compatible with the file schema. // evolution = new SchemaEvolution(fileReader.getSchema(), - options.getSchema(),included); + options.getSchema(), readerIncluded); if (LOG.isDebugEnabled() && evolution.hasConversion()) { LOG.debug("ORC file " + fileReader.path.toString() + " has data type conversion --\n" + @@ -164,7 +162,7 @@ protected RecordReaderImpl(ReaderImpl fileReader, SearchArgument sarg = options.getSearchArgument(); if (sarg != null && rowIndexStride != 0) { sargApp = new SargApplier(sarg, options.getColumnNames(), rowIndexStride, - included.length, evolution); + evolution); } else { sargApp = null; } @@ -209,9 +207,10 @@ protected RecordReaderImpl(ReaderImpl fileReader, } reader = TreeReaderFactory.createTreeReader(evolution.getReaderSchema(), - evolution, included, skipCorrupt); + evolution, readerIncluded, skipCorrupt); indexes = new OrcProto.RowIndex[types.size()]; bloomFilterIndices = new OrcProto.BloomFilterIndex[types.size()]; + this.included = evolution.getFileIncluded(); advanceToNextRow(reader, 0L, true); } @@ -710,14 +709,15 @@ private static Object getBaseObjectForComparison(PredicateLeaf.Type type, Object private final boolean[] sargColumns; private SchemaEvolution evolution; - public SargApplier(SearchArgument sarg, String[] columnNames, long rowIndexStride, - int includedCount, final SchemaEvolution evolution) { + public SargApplier(SearchArgument sarg, String[] columnNames, + long rowIndexStride, + SchemaEvolution evolution) { this.sarg = sarg; sargLeaves = sarg.getLeaves(); - filterColumns = mapSargColumnsToOrcInternalColIdx(sargLeaves, columnNames, 0); + filterColumns = mapSargColumnsToOrcInternalColIdx(sargLeaves, evolution); this.rowIndexStride = rowIndexStride; // included will not be null, row options will fill the array with trues if null - sargColumns = new boolean[includedCount]; + sargColumns = new boolean[evolution.getFileIncluded().length]; for (int i : filterColumns) { // filter columns may have -1 as index which could be partition column in SARG. 
if (i > 0) { diff --git orc/src/java/org/apache/orc/impl/SchemaEvolution.java orc/src/java/org/apache/orc/impl/SchemaEvolution.java index 7379de9..bb5bcf7 100644 --- orc/src/java/org/apache/orc/impl/SchemaEvolution.java +++ orc/src/java/org/apache/orc/impl/SchemaEvolution.java @@ -32,7 +32,11 @@ // indexed by reader column id private final TypeDescription[] readerFileTypes; // indexed by reader column id - private final boolean[] included; + private final boolean[] readerIncluded; + // the offset to the first column id ignoring any ACID columns + private final int readerColumnOffset; + // indexed by file column id + private final boolean[] fileIncluded; private final TypeDescription fileSchema; private final TypeDescription readerSchema; private boolean hasConversion; @@ -46,20 +50,36 @@ public SchemaEvolution(TypeDescription fileSchema, boolean[] includedCols) { public SchemaEvolution(TypeDescription fileSchema, TypeDescription readerSchema, boolean[] includeCols) { - this.included = includeCols == null ? null : Arrays.copyOf(includeCols, includeCols.length); + this.readerIncluded = includeCols == null ? null : Arrays.copyOf(includeCols, includeCols.length); this.hasConversion = false; this.fileSchema = fileSchema; + boolean isAcid = checkAcidSchema(fileSchema); + this.readerColumnOffset = isAcid ? acidEventFieldNames.size() : 0; if (readerSchema != null) { - if (checkAcidSchema(fileSchema)) { + if (isAcid) { this.readerSchema = createEventSchema(readerSchema); } else { this.readerSchema = readerSchema; } + if (readerIncluded != null && + readerIncluded.length + readerColumnOffset != this.readerSchema.getMaximumId() + 1) { + throw new IllegalArgumentException("Include vector the wrong length: " + + this.readerSchema.toJson() + " with include length " + + readerIncluded.length); + } this.readerFileTypes = new TypeDescription[this.readerSchema.getMaximumId() + 1]; + this.fileIncluded = new boolean[fileSchema.getMaximumId() + 1]; buildConversionFileTypesArray(fileSchema, this.readerSchema); } else { this.readerSchema = fileSchema; this.readerFileTypes = new TypeDescription[this.readerSchema.getMaximumId() + 1]; + this.fileIncluded = readerIncluded; + if (readerIncluded != null && + readerIncluded.length + readerColumnOffset != this.readerSchema.getMaximumId() + 1) { + throw new IllegalArgumentException("Include vector the wrong length: " + + this.readerSchema.toJson() + " with include length " + + readerIncluded.length); + } buildSameSchemaFileTypesArray(); } this.ppdSafeConversion = populatePpdSafeConversion(); @@ -70,6 +90,15 @@ public TypeDescription getReaderSchema() { } /** + * Returns the non-ACID (aka base) reader type description. + * + * @return the reader type ignoring the ACID rowid columns, if any + */ + public TypeDescription getReaderBaseSchema() { + return readerSchema.findSubtype(readerColumnOffset); + } + + /** * Is there Schema Evolution data type conversion? * @return */ @@ -82,6 +111,22 @@ public TypeDescription getFileType(TypeDescription readerType) { } /** + * Get whether each column is included from the reader's point of view. + * @return a boolean array indexed by reader column id + */ + public boolean[] getReaderIncluded() { + return readerIncluded; + } + + /** + * Get whether each column is included from the file's point of view. + * @return a boolean array indexed by file column id + */ + public boolean[] getFileIncluded() { + return fileIncluded; + } + + /** * Get the file type by reader type id. 
* @param id reader column id * @return @@ -189,10 +234,22 @@ private boolean validatePPDConversion(final TypeDescription fileType, return false; } + /** + * Should we read the given reader column? + * @param readerId the id of column in the extended reader schema + * @return true if the column should be read + */ + public boolean includeReaderColumn(int readerId) { + return readerIncluded == null || + readerId <= readerColumnOffset || + readerIncluded[readerId - readerColumnOffset]; + } + void buildConversionFileTypesArray(TypeDescription fileType, TypeDescription readerType) { // if the column isn't included, don't map it - if (included != null && !included[readerType.getId()]) { + int readerId = readerType.getId(); + if (!includeReaderColumn(readerId)) { return; } boolean isOk = true; @@ -266,17 +323,17 @@ void buildConversionFileTypesArray(TypeDescription fileType, hasConversion = true; } if (isOk) { - int id = readerType.getId(); - if (readerFileTypes[id] != null) { + if (readerFileTypes[readerId] != null) { throw new RuntimeException("reader to file type entry already assigned"); } - readerFileTypes[id] = fileType; + readerFileTypes[readerId] = fileType; + fileIncluded[fileType.getId()] = true; } else { throw new IllegalArgumentException( String.format( "ORC does not support type conversion from file type %s (%d) to reader type %s (%d)", fileType.toString(), fileType.getId(), - readerType.toString(), readerType.getId())); + readerType.toString(), readerId)); } } @@ -289,10 +346,10 @@ private void buildSameSchemaFileTypesArray() { } void buildSameSchemaFileTypesArrayRecurse(TypeDescription readerType) { - if (included != null && !included[readerType.getId()]) { + int id = readerType.getId(); + if (!includeReaderColumn(id)) { return; } - int id = readerType.getId(); if (readerFileTypes[id] != null) { throw new RuntimeException("reader to file type entry already assigned"); } diff --git orc/src/java/org/apache/orc/impl/TreeReaderFactory.java orc/src/java/org/apache/orc/impl/TreeReaderFactory.java index e46a0a4..484209b 100644 --- orc/src/java/org/apache/orc/impl/TreeReaderFactory.java +++ orc/src/java/org/apache/orc/impl/TreeReaderFactory.java @@ -2082,8 +2082,7 @@ public static TreeReader createTreeReader(TypeDescription readerType, boolean skipCorrupt ) throws IOException { TypeDescription fileType = evolution.getFileType(readerType); - if (fileType == null || - (included != null && !included[readerType.getId()])) { + if (fileType == null || !evolution.includeReaderColumn(readerType.getId())){ return new NullTreeReader(0); } TypeDescription.Category readerTypeCategory = readerType.getCategory(); diff --git pom.xml pom.xml index 8c58f4c..3fc35bc 100644 --- pom.xml +++ pom.xml @@ -1011,6 +1011,7 @@ false false ${maven.test.jvm.args} + false ${test.conf.dir} ${basedir}/${hive.path.to.root}/conf diff --git ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcInputFormat.java ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcInputFormat.java index 572953a..b570d40 100644 --- ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcInputFormat.java +++ ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcInputFormat.java @@ -314,7 +314,10 @@ public static RecordReader createReaderFromFile(Reader file, Reader.Options options = new Reader.Options().range(offset, length); options.schema(schema); boolean isOriginal = isOriginal(file); - List types = file.getTypes(); + if (schema == null) { + schema = file.getSchema(); + } + List types = OrcUtils.getOrcTypes(schema); options.include(genIncludedColumns(types, conf, 
isOriginal)); setSearchArgument(options, types, conf, isOriginal); return file.rowsOptions(options); @@ -1363,7 +1366,7 @@ public String toString() { if (colNames == null) { LOG.warn("Skipping split elimination for {} as column names is null", file.getPath()); } else { - includeStripe = pickStripes(context.sarg, colNames, writerVersion, isOriginal, + includeStripe = pickStripes(context.sarg, writerVersion, stripeStats, stripes.size(), file.getPath(), evolution); } } @@ -2037,8 +2040,9 @@ static int getBucketForSplit(Configuration conf, OrcSplit orcSplit) { return pickStripesInternal(sarg, filterColumns, stripeStats, stripeCount, null, evolution); } - private static boolean[] pickStripes(SearchArgument sarg, String[] sargColNames, - OrcFile.WriterVersion writerVersion, boolean isOriginal, List stripeStats, + private static boolean[] pickStripes(SearchArgument sarg, + OrcFile.WriterVersion writerVersion, + List stripeStats, int stripeCount, Path filePath, final SchemaEvolution evolution) { if (sarg == null || stripeStats == null || writerVersion == OrcFile.WriterVersion.ORIGINAL) { return null; // only do split pruning if HIVE-8732 has been fixed in the writer @@ -2046,7 +2050,7 @@ static int getBucketForSplit(Configuration conf, OrcSplit orcSplit) { // eliminate stripes that doesn't satisfy the predicate condition List sargLeaves = sarg.getLeaves(); int[] filterColumns = RecordReaderImpl.mapSargColumnsToOrcInternalColIdx(sargLeaves, - sargColNames, getRootColumn(isOriginal)); + evolution); return pickStripesInternal(sarg, filterColumns, stripeStats, stripeCount, filePath, evolution); } diff --git ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcRawRecordMerger.java ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcRawRecordMerger.java index efde2db..fb7a6b2 100644 --- ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcRawRecordMerger.java +++ ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcRawRecordMerger.java @@ -393,18 +393,7 @@ private void discoverKeyBounds(Reader reader, static Reader.Options createEventOptions(Reader.Options options) { Reader.Options result = options.clone(); result.range(options.getOffset(), Long.MAX_VALUE); - // slide the columns down by 6 for the include array - if (options.getInclude() != null) { - boolean[] orig = options.getInclude(); - // we always need the base row - orig[0] = true; - boolean[] include = new boolean[orig.length + OrcRecordUpdater.FIELDS]; - Arrays.fill(include, 0, OrcRecordUpdater.FIELDS, true); - for(int i= 0; i < orig.length; ++i) { - include[i + OrcRecordUpdater.FIELDS] = orig[i]; - } - result.include(include); - } + result.include(options.getInclude()); // slide the column names down by 6 for the name array if (options.getColumnNames() != null) { diff --git ql/src/java/org/apache/hadoop/hive/ql/io/orc/ReaderImpl.java ql/src/java/org/apache/hadoop/hive/ql/io/orc/ReaderImpl.java index 74c5071..cbbbb15 100644 --- ql/src/java/org/apache/hadoop/hive/ql/io/orc/ReaderImpl.java +++ ql/src/java/org/apache/hadoop/hive/ql/io/orc/ReaderImpl.java @@ -26,6 +26,7 @@ import org.apache.hadoop.fs.Path; import org.apache.hadoop.hive.ql.io.sarg.SearchArgument; import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector; +import org.apache.orc.TypeDescription; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -76,14 +77,6 @@ public RecordReader rows() throws IOException { @Override public RecordReader rowsOptions(Options options) throws IOException { LOG.info("Reading ORC rows from " + path + " with " + options); - boolean[] include = 
options.getInclude(); - // if included columns is null, then include all columns - if (include == null) { - options = options.clone(); - include = new boolean[types.size()]; - Arrays.fill(include, true); - options.include(include); - } return new RecordReaderImpl(this, options); } diff --git ql/src/java/org/apache/hadoop/hive/ql/io/orc/VectorizedOrcInputFormat.java ql/src/java/org/apache/hadoop/hive/ql/io/orc/VectorizedOrcInputFormat.java index 9275aa9..9ee716f 100644 --- ql/src/java/org/apache/hadoop/hive/ql/io/orc/VectorizedOrcInputFormat.java +++ ql/src/java/org/apache/hadoop/hive/ql/io/orc/VectorizedOrcInputFormat.java @@ -43,6 +43,7 @@ import org.apache.hadoop.mapred.RecordReader; import org.apache.hadoop.mapred.Reporter; import org.apache.orc.OrcProto; +import org.apache.orc.OrcUtils; import org.apache.orc.TypeDescription; /** @@ -74,7 +75,6 @@ /** * Do we have schema on read in the configuration variables? */ - List types = file.getTypes(); int dataColumns = rbCtx.getDataColumnCount(); TypeDescription schema = OrcInputFormat.getDesiredRowTypeDescr(conf, false, dataColumns); @@ -91,6 +91,7 @@ } } } + List types = OrcUtils.getOrcTypes(schema); Reader.Options options = new Reader.Options().schema(schema); this.offset = fileSplit.getStart(); diff --git ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestInputOutputFormat.java ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestInputOutputFormat.java index 4aac90a..a35a2df 100644 --- ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestInputOutputFormat.java +++ ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestInputOutputFormat.java @@ -25,6 +25,8 @@ import java.io.IOException; import java.net.URI; import java.net.URISyntaxException; +import java.nio.ByteBuffer; +import java.nio.charset.StandardCharsets; import java.security.PrivilegedExceptionAction; import java.sql.Date; import java.sql.Timestamp; @@ -54,9 +56,11 @@ import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.RemoteIterator; import org.apache.hadoop.fs.permission.FsPermission; +import org.apache.hadoop.hive.common.ValidTxnList; import org.apache.hadoop.hive.common.type.HiveDecimal; import org.apache.hadoop.hive.conf.HiveConf; import org.apache.hadoop.hive.conf.HiveConf.ConfVars; +import org.apache.hadoop.hive.llap.TypeDesc; import org.apache.hadoop.hive.metastore.api.hive_metastoreConstants; import org.apache.hadoop.hive.ql.exec.SerializationUtilities; import org.apache.hadoop.hive.ql.exec.Utilities; @@ -66,6 +70,7 @@ import org.apache.hadoop.hive.ql.exec.vector.DecimalColumnVector; import org.apache.hadoop.hive.ql.exec.vector.DoubleColumnVector; import org.apache.hadoop.hive.ql.exec.vector.LongColumnVector; +import org.apache.hadoop.hive.ql.exec.vector.StructColumnVector; import org.apache.hadoop.hive.ql.exec.vector.TimestampColumnVector; import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch; import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatchCtx; @@ -76,6 +81,8 @@ import org.apache.hadoop.hive.ql.io.HiveOutputFormat; import org.apache.hadoop.hive.ql.io.IOConstants; import org.apache.hadoop.hive.ql.io.InputFormatChecker; +import org.apache.hadoop.hive.ql.io.RecordIdentifier; +import org.apache.hadoop.hive.ql.io.RecordUpdater; import org.apache.hadoop.hive.ql.io.orc.OrcInputFormat.Context; import org.apache.hadoop.hive.ql.io.orc.OrcInputFormat.SplitStrategy; import org.apache.hadoop.hive.ql.io.sarg.ConvertAstToSearchArg; @@ -114,7 +121,7 @@ import org.apache.hadoop.mapred.Reporter; import org.apache.hadoop.security.UserGroupInformation; import 
org.apache.hadoop.util.Progressable; -import org.apache.orc.OrcProto; +import org.apache.orc.*; import org.junit.Before; import org.junit.Rule; import org.junit.Test; @@ -2213,7 +2220,7 @@ public void testVectorizationWithAcid() throws Exception { OrcRecordUpdater writer = new OrcRecordUpdater(partDir, new AcidOutputFormat.Options(conf).maximumTransactionId(10) .writingBase(true).bucket(0).inspector(inspector).finalDestination(partDir)); - for(int i=0; i < 100; ++i) { + for (int i = 0; i < 100; ++i) { BigRow row = new BigRow(i); writer.insert(10, row); } @@ -2223,7 +2230,7 @@ public void testVectorizationWithAcid() throws Exception { .setBlocks(new MockBlock("host0", "host1")); // call getsplits - HiveInputFormat inputFormat = + HiveInputFormat inputFormat = new HiveInputFormat(); InputSplit[] splits = inputFormat.getSplits(conf, 10); assertEquals(1, splits.length); @@ -2233,7 +2240,7 @@ public void testVectorizationWithAcid() throws Exception { HiveConf.setBoolVar(conf, HiveConf.ConfVars.HIVE_TRANSACTIONAL_TABLE_SCAN, true); org.apache.hadoop.mapred.RecordReader - reader = inputFormat.getRecordReader(splits[0], conf, Reporter.NULL); + reader = inputFormat.getRecordReader(splits[0], conf, Reporter.NULL); NullWritable key = reader.createKey(); VectorizedRowBatch value = reader.createValue(); assertEquals(true, reader.next(key, value)); @@ -3682,4 +3689,188 @@ public void testRowNumberUniquenessInDifferentSplits() throws Exception { conf.unset(HiveConf.ConfVars.MAPREDMAXSPLITSIZE.varname); } } + + /** + * Test schema evolution when using the reader directly. + */ + @Test + public void testSchemaEvolution() throws Exception { + TypeDescription fileSchema = + TypeDescription.fromString("struct,d:string>"); + Writer writer = OrcFile.createWriter(testFilePath, + OrcFile.writerOptions(conf) + .fileSystem(fs) + .setSchema(fileSchema) + .compress(org.apache.orc.CompressionKind.NONE)); + VectorizedRowBatch batch = fileSchema.createRowBatch(1000); + batch.size = 1000; + LongColumnVector lcv = ((LongColumnVector) ((StructColumnVector) batch.cols[1]).fields[0]); + for(int r=0; r < 1000; r++) { + ((LongColumnVector) batch.cols[0]).vector[r] = r * 42; + lcv.vector[r] = r * 10001; + ((BytesColumnVector) batch.cols[2]).setVal(r, + Integer.toHexString(r).getBytes(StandardCharsets.UTF_8)); + } + writer.addRowBatch(batch); + writer.close(); + TypeDescription readerSchema = TypeDescription.fromString( + "struct,d:string,future2:int>"); + Reader reader = OrcFile.createReader(testFilePath, + OrcFile.readerOptions(conf).filesystem(fs)); + RecordReader rows = reader.rowsOptions(new Reader.Options() + .schema(readerSchema)); + batch = readerSchema.createRowBatch(); + lcv = ((LongColumnVector) ((StructColumnVector) batch.cols[1]).fields[0]); + LongColumnVector future1 = ((LongColumnVector) ((StructColumnVector) batch.cols[1]).fields[1]); + assertEquals(true, rows.nextBatch(batch)); + assertEquals(1000, batch.size); + assertEquals(true, future1.isRepeating); + assertEquals(true, future1.isNull[0]); + assertEquals(true, batch.cols[3].isRepeating); + assertEquals(true, batch.cols[3].isNull[0]); + for(int r=0; r < batch.size; ++r) { + assertEquals("row " + r, r * 42, ((LongColumnVector) batch.cols[0]).vector[r]); + assertEquals("row " + r, r * 10001, lcv.vector[r]); + assertEquals("row " + r, r * 10001, lcv.vector[r]); + assertEquals("row " + r, Integer.toHexString(r), + ((BytesColumnVector) batch.cols[2]).toString(r)); + } + assertEquals(false, rows.nextBatch(batch)); + rows.close(); + + // try it again with an include 
vector + rows = reader.rowsOptions(new Reader.Options() + .schema(readerSchema) + .include(new boolean[]{false, true, true, true, false, false, true})); + batch = readerSchema.createRowBatch(); + lcv = ((LongColumnVector) ((StructColumnVector) batch.cols[1]).fields[0]); + future1 = ((LongColumnVector) ((StructColumnVector) batch.cols[1]).fields[1]); + assertEquals(true, rows.nextBatch(batch)); + assertEquals(1000, batch.size); + assertEquals(true, future1.isRepeating); + assertEquals(true, future1.isNull[0]); + assertEquals(true, batch.cols[3].isRepeating); + assertEquals(true, batch.cols[3].isNull[0]); + assertEquals(true, batch.cols[2].isRepeating); + assertEquals(true, batch.cols[2].isNull[0]); + for(int r=0; r < batch.size; ++r) { + assertEquals("row " + r, r * 42, ((LongColumnVector) batch.cols[0]).vector[r]); + assertEquals("row " + r, r * 10001, lcv.vector[r]); + } + assertEquals(false, rows.nextBatch(batch)); + rows.close(); + } + + /** + * Test column projection when using ACID. + */ + @Test + public void testColumnProjectionWithAcid() throws Exception { + Path baseDir = new Path(workDir, "base_00100"); + testFilePath = new Path(baseDir, "bucket_00000"); + fs.mkdirs(baseDir); + fs.delete(testFilePath, true); + TypeDescription fileSchema = + TypeDescription.fromString("struct,d:string>>"); + Writer writer = OrcFile.createWriter(testFilePath, + OrcFile.writerOptions(conf) + .fileSystem(fs) + .setSchema(fileSchema) + .compress(org.apache.orc.CompressionKind.NONE)); + VectorizedRowBatch batch = fileSchema.createRowBatch(1000); + batch.size = 1000; + StructColumnVector scv = (StructColumnVector)batch.cols[5]; + // operation + batch.cols[0].isRepeating = true; + ((LongColumnVector) batch.cols[0]).vector[0] = 0; + // original transaction + batch.cols[1].isRepeating = true; + ((LongColumnVector) batch.cols[1]).vector[0] = 1; + // bucket + batch.cols[2].isRepeating = true; + ((LongColumnVector) batch.cols[2]).vector[0] = 0; + // current transaction + batch.cols[4].isRepeating = true; + ((LongColumnVector) batch.cols[4]).vector[0] = 1; + + LongColumnVector lcv = (LongColumnVector) + ((StructColumnVector) scv.fields[1]).fields[0]; + for(int r=0; r < 1000; r++) { + // row id + ((LongColumnVector) batch.cols[3]).vector[r] = r; + // a + ((LongColumnVector) scv.fields[0]).vector[r] = r * 42; + // b.c + lcv.vector[r] = r * 10001; + // d + ((BytesColumnVector) scv.fields[2]).setVal(r, + Integer.toHexString(r).getBytes(StandardCharsets.UTF_8)); + } + writer.addRowBatch(batch); + writer.addUserMetadata(OrcRecordUpdater.ACID_KEY_INDEX_NAME, + ByteBuffer.wrap("0,0,999".getBytes(StandardCharsets.UTF_8))); + writer.close(); + long fileLength = fs.getFileStatus(testFilePath).getLen(); + + // test with same schema with include + conf.set(ConfVars.HIVE_TXN_OPERATIONAL_PROPERTIES.toString(), "1"); + conf.set(ValidTxnList.VALID_TXNS_KEY, "100:99:"); + conf.set(IOConstants.SCHEMA_EVOLUTION_COLUMNS, "a,b,d"); + conf.set(IOConstants.SCHEMA_EVOLUTION_COLUMNS_TYPES, "int,struct,string"); + conf.set(ColumnProjectionUtils.READ_ALL_COLUMNS, "false"); + conf.set(ColumnProjectionUtils.READ_COLUMN_IDS_CONF_STR, "0,2"); + OrcSplit split = new OrcSplit(testFilePath, null, 0, fileLength, + new String[0], null, false, true, + new ArrayList(), fileLength, fileLength); + OrcInputFormat inputFormat = new OrcInputFormat(); + AcidInputFormat.RowReader reader = inputFormat.getReader(split, + new AcidInputFormat.Options(conf)); + int record = 0; + RecordIdentifier id = reader.createKey(); + OrcStruct struct = 
reader.createValue(); + while (reader.next(id, struct)) { + assertEquals("id " + record, record, id.getRowId()); + assertEquals("bucket " + record, 0, id.getBucketId()); + assertEquals("trans " + record, 1, id.getTransactionId()); + assertEquals("a " + record, + 42 * record, ((IntWritable) struct.getFieldValue(0)).get()); + assertEquals(null, struct.getFieldValue(1)); + assertEquals("d " + record, + Integer.toHexString(record), struct.getFieldValue(2).toString()); + record += 1; + } + assertEquals(1000, record); + reader.close(); + + // test with schema evolution and include + conf.set(IOConstants.SCHEMA_EVOLUTION_COLUMNS, "a,b,d,f"); + conf.set(IOConstants.SCHEMA_EVOLUTION_COLUMNS_TYPES, "int,struct,string,int"); + conf.set(ColumnProjectionUtils.READ_ALL_COLUMNS, "false"); + conf.set(ColumnProjectionUtils.READ_COLUMN_IDS_CONF_STR, "0,2,3"); + split = new OrcSplit(testFilePath, null, 0, fileLength, + new String[0], null, false, true, + new ArrayList(), fileLength, fileLength); + inputFormat = new OrcInputFormat(); + reader = inputFormat.getReader(split, new AcidInputFormat.Options(conf)); + record = 0; + id = reader.createKey(); + struct = reader.createValue(); + while (reader.next(id, struct)) { + assertEquals("id " + record, record, id.getRowId()); + assertEquals("bucket " + record, 0, id.getBucketId()); + assertEquals("trans " + record, 1, id.getTransactionId()); + assertEquals("a " + record, + 42 * record, ((IntWritable) struct.getFieldValue(0)).get()); + assertEquals(null, struct.getFieldValue(1)); + assertEquals("d " + record, + Integer.toHexString(record), struct.getFieldValue(2).toString()); + assertEquals("f " + record, null, struct.getFieldValue(3)); + record += 1; + } + assertEquals(1000, record); + reader.close(); + } }
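
The new TypeDescription.fromString parser added by this patch is the inverse of TypeDescription.toString(). A minimal usage sketch (the schema literal is an invented example; column ids are assigned in depth-first pre-order, which is what findSubtype and the include arrays rely on):

import org.apache.orc.TypeDescription;

public class FromStringExample {
  public static void main(String[] args) {
    // Pre-order column ids: 0=<root struct>, 1=a, 2=b, 3=b.c, 4=d
    TypeDescription schema =
        TypeDescription.fromString("struct<a:int,b:struct<c:bigint>,d:string>");
    System.out.println(schema.getCategory());    // STRUCT
    System.out.println(schema.getFieldNames());  // [a, b, d]
    System.out.println(schema.getMaximumId());   // 4
    // Round trip: toString() emits the same Hive type name the parser accepts.
    System.out.println(schema);
  }
}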
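
The findSubtype(int) method added to TypeDescription walks toward a column id using the fact that each child's subtree covers the contiguous id range [child.getId(), child.getMaximumId()]. A standalone sketch of the same descent (a hypothetical helper, not the patched method itself):

import java.util.List;
import org.apache.orc.TypeDescription;

public class SubtypeLookup {
  static TypeDescription findSubtype(TypeDescription root, int goal) {
    TypeDescription current = root;
    while (current.getId() != goal) {
      if (goal < current.getId() || goal > current.getMaximumId()) {
        throw new IllegalArgumentException("Unknown column id " + goal);
      }
      // descend into the child whose id range contains the goal
      List<TypeDescription> children = current.getChildren();
      TypeDescription next = null;
      for (TypeDescription child : children) {
        if (goal >= child.getId() && goal <= child.getMaximumId()) {
          next = child;
          break;
        }
      }
      current = next;
    }
    return current;
  }

  public static void main(String[] args) {
    TypeDescription schema =
        TypeDescription.fromString("struct<a:int,b:struct<c:bigint>,d:string>");
    System.out.println(findSubtype(schema, 3));  // bigint, i.e. column b.c
  }
}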
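
The OrcInputFormat change builds the include boolean array from the reader's schema rather than the file's type list, which is the point of HIVE-15124. A hedged sketch of how such an include vector is laid out over the reader schema's pre-order ids (a hypothetical helper; the real logic lives in OrcInputFormat.genIncludedColumns):

import java.util.Arrays;
import java.util.List;
import org.apache.orc.TypeDescription;

public class IncludeVectorExample {
  // Mark the root plus the whole subtree of each projected top-level column.
  static boolean[] include(TypeDescription readerSchema, int... topLevelColumns) {
    boolean[] result = new boolean[readerSchema.getMaximumId() + 1];
    result[0] = true;  // root struct is always read
    List<TypeDescription> children = readerSchema.getChildren();
    for (int col : topLevelColumns) {
      TypeDescription child = children.get(col);
      for (int id = child.getId(); id <= child.getMaximumId(); ++id) {
        result[id] = true;  // a column's subtree is a contiguous id range
      }
    }
    return result;
  }

  public static void main(String[] args) {
    TypeDescription schema =
        TypeDescription.fromString("struct<a:int,b:struct<c:bigint>,d:string>");
    // project top-level columns 0 (a) and 2 (d) => ids 0, 1 and 4
    System.out.println(Arrays.toString(include(schema, 0, 2)));
    // [true, true, false, false, true]
  }
}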
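
Replacing the (columnNames, rootColumn) parameters of mapSargColumnsToOrcInternalColIdx with a SchemaEvolution boils down to: resolve each predicate column against the reader's base schema, then translate it to the matching file column id. A simplified sketch of that lookup (top-level columns only; method names follow the patch, the schemas are invented):

import java.util.List;
import org.apache.orc.TypeDescription;
import org.apache.orc.impl.SchemaEvolution;

public class SargColumnMapping {
  // Returns the *file* column id for a top-level reader column name,
  // or -1 when the file has no such column (e.g. a partition column).
  static int fileColumnId(SchemaEvolution evolution, String columnName) {
    TypeDescription readerBase = evolution.getReaderBaseSchema();
    List<String> names = readerBase.getFieldNames();
    List<TypeDescription> children = readerBase.getChildren();
    for (int i = 0; i < names.size(); ++i) {
      if (names.get(i).equals(columnName)) {
        TypeDescription fileType = evolution.getFileType(children.get(i));
        return fileType == null ? -1 : fileType.getId();
      }
    }
    return -1;
  }

  public static void main(String[] args) {
    TypeDescription fileSchema = TypeDescription.fromString("struct<a:int,b:string>");
    TypeDescription readerSchema = TypeDescription.fromString("struct<a:int,b:string,c:double>");
    SchemaEvolution evolution = new SchemaEvolution(fileSchema, readerSchema, null);
    System.out.println(fileColumnId(evolution, "b"));  // 2: file id of column b
    System.out.println(fileColumnId(evolution, "c"));  // -1: c does not exist in the file
  }
}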
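
SchemaEvolution now keeps two include arrays: readerIncluded, indexed by reader column id with any ACID wrapper columns factored out, and fileIncluded, indexed by file column id and filled in as reader columns are mapped. The ACID handling that used to live in OrcRawRecordMerger's include-array slide is reduced to an offset. A standalone mirror of the patched includeReaderColumn check (the offset of 6 assumes the usual five ACID metadata columns plus the wrapped row struct):

public class IncludeCheck {
  private final boolean[] readerIncluded;  // sized for the base (non-ACID) reader schema
  private final int readerColumnOffset;    // 6 for ACID event files, 0 otherwise

  IncludeCheck(boolean[] readerIncluded, boolean isAcid) {
    this.readerIncluded = readerIncluded;
    this.readerColumnOffset = isAcid ? 6 : 0;
  }

  // Mirrors SchemaEvolution.includeReaderColumn(int) from the patch:
  // a null vector means read everything, and the root plus the ACID
  // wrapper columns are always read.
  boolean includeReaderColumn(int readerId) {
    return readerIncluded == null
        || readerId <= readerColumnOffset
        || readerIncluded[readerId - readerColumnOffset];
  }

  public static void main(String[] args) {
    // base reader schema struct<a:int,b:string>: ids 0=row, 1=a, 2=b
    IncludeCheck acid = new IncludeCheck(new boolean[]{true, true, false}, true);
    System.out.println(acid.includeReaderColumn(7));  // reader id 7 = a -> true
    System.out.println(acid.includeReaderColumn(8));  // reader id 8 = b -> false
  }
}

The length check the patch adds follows the same layout: readerIncluded.length + readerColumnOffset must equal readerSchema.getMaximumId() + 1, so the include vector is always sized for the user-visible schema regardless of the ACID wrapper.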
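
That offset corresponds to the ACID event wrapper that SchemaEvolution.createEventSchema builds around the user row type. A rough sketch of the wrapper (field names and types follow the ORC ACID layout; this is an approximation, not the actual implementation):

import org.apache.orc.TypeDescription;

public class AcidEventSchema {
  // Wrap a user row type in the ACID event wrapper: five metadata columns
  // plus the row itself.
  static TypeDescription wrap(TypeDescription rowType) {
    return TypeDescription.createStruct()
        .addField("operation", TypeDescription.createInt())
        .addField("originalTransaction", TypeDescription.createLong())
        .addField("bucket", TypeDescription.createInt())
        .addField("rowId", TypeDescription.createLong())
        .addField("currentTransaction", TypeDescription.createLong())
        .addField("row", rowType);
  }

  public static void main(String[] args) {
    TypeDescription row = TypeDescription.fromString("struct<a:int,d:string>");
    TypeDescription event = wrap(row);
    // The "row" struct gets column id 6, so user columns start at reader id 7.
    System.out.println(event.getChildren().get(5).getId()); // 6
  }
}

Because includeReaderColumn applies the offset itself, createEventOptions no longer needs to copy the caller's include vector into a padded array, which is what the removed OrcRawRecordMerger block used to do.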
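
The new testSchemaEvolution test reads a file with a reader schema that adds trailing "future" columns plus a reader-side include vector. The angle-bracket contents of its schema strings were lost in the patch text above, so the schemas below are assumptions chosen to match the column vectors the test touches; the read path itself uses the orc module's Reader API:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;
import org.apache.orc.OrcFile;
import org.apache.orc.Reader;
import org.apache.orc.RecordReader;
import org.apache.orc.TypeDescription;

public class EvolvedReadSketch {
  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    // hypothetical file written as struct<a:int,b:struct<c:bigint>,d:string>
    Path path = new Path("/tmp/evolved.orc");
    // Reader schema adds b.future1 and future2, which the file lacks; they come back as nulls.
    TypeDescription readerSchema = TypeDescription.fromString(
        "struct<a:int,b:struct<c:bigint,future1:int>,d:string,future2:int>");
    Reader reader = OrcFile.createReader(path, OrcFile.readerOptions(conf));
    // Reader column ids: 0=root, 1=a, 2=b, 3=b.c, 4=b.future1, 5=d, 6=future2
    RecordReader rows = reader.rows(new Reader.Options()
        .schema(readerSchema)
        .include(new boolean[]{false, true, true, true, false, false, true}));
    VectorizedRowBatch batch = readerSchema.createRowBatch();
    while (rows.nextBatch(batch)) {
      // excluded columns (d) and columns missing from the file (future1, future2)
      // come back as repeating nulls
    }
    rows.close();
  }
}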
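
testColumnProjectionWithAcid drives projection purely through configuration, and the include array is then derived from the reader schema on the other side of the patch. The relevant settings, condensed (the struct type for b is a guess, since its generic parameters were stripped from the test source above):

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hive.ql.io.IOConstants;
import org.apache.hadoop.hive.serde2.ColumnProjectionUtils;

public class AcidProjectionConf {
  static Configuration projectAandD(Configuration conf) {
    // Logical (reader) schema the query expects, independent of the file schema.
    conf.set(IOConstants.SCHEMA_EVOLUTION_COLUMNS, "a,b,d");
    conf.set(IOConstants.SCHEMA_EVOLUTION_COLUMNS_TYPES, "int,struct<c:bigint>,string");
    // Project only top-level columns 0 (a) and 2 (d); b is returned as null.
    conf.set(ColumnProjectionUtils.READ_ALL_COLUMNS, "false");
    conf.set(ColumnProjectionUtils.READ_COLUMN_IDS_CONF_STR, "0,2");
    return conf;
  }
}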
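
Finally, SargApplier no longer takes an includedCount: it sizes its sargColumns flags from the file-side include array and sets a flag for every predicate leaf that resolved to a real file column. A small sketch of that step, mirroring the patched constructor (ids <= 0 cover the root and unresolved or partition columns):

import org.apache.orc.impl.SchemaEvolution;

public class SargColumnsSketch {
  static boolean[] buildSargColumns(int[] filterColumns, SchemaEvolution evolution) {
    boolean[] sargColumns = new boolean[evolution.getFileIncluded().length];
    for (int fileColumnId : filterColumns) {
      if (fileColumnId > 0) {
        sargColumns[fileColumnId] = true;
      }
    }
    return sargColumns;
  }
}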