diff --git hcatalog/streaming/src/test/org/apache/hive/hcatalog/streaming/TestStreaming.java hcatalog/streaming/src/test/org/apache/hive/hcatalog/streaming/TestStreaming.java index 84e559d..ef27111 100644 --- hcatalog/streaming/src/test/org/apache/hive/hcatalog/streaming/TestStreaming.java +++ hcatalog/streaming/src/test/org/apache/hive/hcatalog/streaming/TestStreaming.java @@ -1095,7 +1095,8 @@ public void testConcurrentTransactionBatchCommits() throws Exception { Reader reader = OrcFile.createReader(orcFile, OrcFile.readerOptions(conf).filesystem(fs)); - RecordReader rows = reader.rows(); + Reader.Options options = new Reader.Options().lowLevelAccess(true); + RecordReader rows = reader.rowsOptions(options); StructObjectInspector inspector = (StructObjectInspector) reader .getObjectInspector(); @@ -1718,7 +1719,7 @@ public void testErrorHandling() throws Exception { txnBatch.write("name4,2,more Streaming unlimited".getBytes()); txnBatch.write("name5,2,even more Streaming unlimited".getBytes()); txnBatch.commit(); - + expectedEx = null; txnBatch.beginNextTransaction(); writer.enableErrors(); @@ -1764,7 +1765,7 @@ public void testErrorHandling() throws Exception { } Assert.assertTrue("Wrong exception: " + (expectedEx != null ? expectedEx.getMessage() : "?"), expectedEx != null && expectedEx.getMessage().contains("Simulated fault occurred")); - + r = msClient.showTxns(); Assert.assertEquals("HWM didn't match", 6, r.getTxn_high_water_mark()); ti = r.getOpen_txns(); diff --git llap-server/src/java/org/apache/hadoop/hive/llap/io/encoded/OrcEncodedDataReader.java llap-server/src/java/org/apache/hadoop/hive/llap/io/encoded/OrcEncodedDataReader.java index 1befba7..f0a1b3e 100644 --- llap-server/src/java/org/apache/hadoop/hive/llap/io/encoded/OrcEncodedDataReader.java +++ llap-server/src/java/org/apache/hadoop/hive/llap/io/encoded/OrcEncodedDataReader.java @@ -704,7 +704,7 @@ private boolean determineRgsToRead(boolean[] globalIncludes, int rowIndexStride, String[] colNamesForSarg = OrcInputFormat.getSargColumnNames( columnNames, types, globalIncludes, fileMetadata.isOriginalFormat()); sargApp = new RecordReaderImpl.SargApplier(sarg, colNamesForSarg, - rowIndexStride, types, globalIncludes.length); + rowIndexStride, globalIncludes.length); } boolean hasAnyData = false; // readState should have been initialized by this time with an empty array. diff --git orc/src/java/org/apache/orc/OrcUtils.java orc/src/java/org/apache/orc/OrcUtils.java index dc83b9c..0f33b0a 100644 --- orc/src/java/org/apache/orc/OrcUtils.java +++ orc/src/java/org/apache/orc/OrcUtils.java @@ -246,7 +246,7 @@ public static void appendOrcTypesRebuildSubtypes(List result, { // Make room for MAP type. result.add(null); - + // Add MAP type pair in order to determine their subtype values. appendOrcTypesRebuildSubtypes(result, children.get(0)); int subtype2 = result.size(); @@ -384,7 +384,7 @@ public static int appendOrcTypesRebuildSubtypes(List result, { // Make room for MAP type. result.add(null); - + // Add MAP type pair in order to determine their subtype values. columnId = appendOrcTypesRebuildSubtypes(result, types, columnId); int subtype2 = result.size(); @@ -538,4 +538,47 @@ TypeDescription convertTypeFromProtobuf(List types, } return result; } + + /** + * Determine whether a schema has the ACID struct by examining whether the transaction related + * column names are present. 
+ * @param type + * @return + */ + public static boolean checkAcidSchema(TypeDescription type) { + if (type.getCategory().equals(TypeDescription.Category.STRUCT)) { + List rootFields = type.getFieldNames(); + if (acidEventFieldNames.equals(rootFields)) { + return true; + } + } + return false; + } + + /** + * @param typeDescr + * @return ORC types for the ACID event based on the row's type description + */ + public static TypeDescription createEventSchema(TypeDescription typeDescr) { + TypeDescription result = TypeDescription.createStruct() + .addField("operation", TypeDescription.createInt()) + .addField("originalTransaction", TypeDescription.createLong()) + .addField("bucket", TypeDescription.createInt()) + .addField("rowId", TypeDescription.createLong()) + .addField("currentTransaction", TypeDescription.createLong()) + .addField("row", typeDescr.clone()); + return result; + } + + public static int ACID_ROW_CHILD = 6; + + public static final List acidEventFieldNames= new ArrayList(); + static { + acidEventFieldNames.add("operation"); + acidEventFieldNames.add("originalTransaction"); + acidEventFieldNames.add("bucket"); + acidEventFieldNames.add("rowId"); + acidEventFieldNames.add("currentTransaction"); + acidEventFieldNames.add("row"); + } } diff --git orc/src/java/org/apache/orc/Reader.java orc/src/java/org/apache/orc/Reader.java index c2d5235..6b38608 100644 --- orc/src/java/org/apache/orc/Reader.java +++ orc/src/java/org/apache/orc/Reader.java @@ -157,6 +157,7 @@ private Boolean skipCorruptRecords = null; private TypeDescription schema = null; private DataReader dataReader = null; + private boolean lowLevelAccess = false; /** * Set the list of columns to read. @@ -225,6 +226,15 @@ public Options skipCorruptRecords(boolean value) { return this; } + /** + * Set whether the reader wants to read the file schema ignoring Schema Evolution and ACID. + * @return + */ + public Options lowLevelAccess(boolean lowLevelAccess) { + this.lowLevelAccess = lowLevelAccess; + return this; + } + public boolean[] getInclude() { return include; } @@ -269,6 +279,10 @@ public DataReader getDataReader() { return dataReader; } + public boolean getLowLevelAccess() { + return lowLevelAccess; + } + public Options clone() { Options result = new Options(); result.include = include; diff --git orc/src/java/org/apache/orc/impl/ConvertTreeReaderFactory.java orc/src/java/org/apache/orc/impl/ConvertTreeReaderFactory.java index 9e5f5cc..74e9b1b 100644 --- orc/src/java/org/apache/orc/impl/ConvertTreeReaderFactory.java +++ orc/src/java/org/apache/orc/impl/ConvertTreeReaderFactory.java @@ -2065,7 +2065,6 @@ private static TreeReader createAnyIntegerConvertTreeReader(int columnId, TypeDescription fileType, TypeDescription readerType, SchemaEvolution evolution, - boolean[] included, boolean skipCorrupt) throws IOException { // CONVERT from (BOOLEAN, BYTE, SHORT, INT, LONG) to schema type. @@ -2122,7 +2121,6 @@ private static TreeReader createFloatConvertTreeReader(int columnId, TypeDescription fileType, TypeDescription readerType, SchemaEvolution evolution, - boolean[] included, boolean skipCorrupt) throws IOException { // CONVERT from FLOAT to schema type. @@ -2171,7 +2169,6 @@ private static TreeReader createDoubleConvertTreeReader(int columnId, TypeDescription fileType, TypeDescription readerType, SchemaEvolution evolution, - boolean[] included, boolean skipCorrupt) throws IOException { // CONVERT from DOUBLE to schema type. 
@@ -2220,7 +2217,6 @@ private static TreeReader createDecimalConvertTreeReader(int columnId, TypeDescription fileType, TypeDescription readerType, SchemaEvolution evolution, - boolean[] included, boolean skipCorrupt) throws IOException { // CONVERT from DECIMAL to schema type. @@ -2268,7 +2264,6 @@ private static TreeReader createStringConvertTreeReader(int columnId, TypeDescription fileType, TypeDescription readerType, SchemaEvolution evolution, - boolean[] included, boolean skipCorrupt) throws IOException { // CONVERT from STRING to schema type. @@ -2325,7 +2320,6 @@ private static TreeReader createCharConvertTreeReader(int columnId, TypeDescription fileType, TypeDescription readerType, SchemaEvolution evolution, - boolean[] included, boolean skipCorrupt) throws IOException { // CONVERT from CHAR to schema type. @@ -2381,7 +2375,6 @@ private static TreeReader createVarcharConvertTreeReader(int columnId, TypeDescription fileType, TypeDescription readerType, SchemaEvolution evolution, - boolean[] included, boolean skipCorrupt) throws IOException { // CONVERT from VARCHAR to schema type. @@ -2437,7 +2430,6 @@ private static TreeReader createTimestampConvertTreeReader(int columnId, TypeDescription fileType, TypeDescription readerType, SchemaEvolution evolution, - boolean[] included, boolean skipCorrupt) throws IOException { // CONVERT from TIMESTAMP to schema type. @@ -2488,7 +2480,6 @@ private static TreeReader createDateConvertTreeReader(int columnId, TypeDescription fileType, TypeDescription readerType, SchemaEvolution evolution, - boolean[] included, boolean skipCorrupt) throws IOException { // CONVERT from DATE to schema type. @@ -2531,7 +2522,6 @@ private static TreeReader createBinaryConvertTreeReader(int columnId, TypeDescription fileType, TypeDescription readerType, SchemaEvolution evolution, - boolean[] included, boolean skipCorrupt) throws IOException { // CONVERT from DATE to schema type. 
@@ -2697,7 +2687,6 @@ private static TreeReader createBinaryConvertTreeReader(int columnId, */ public static TreeReader createConvertTreeReader(TypeDescription readerType, SchemaEvolution evolution, - boolean[] included, boolean skipCorrupt ) throws IOException { @@ -2712,43 +2701,43 @@ public static TreeReader createConvertTreeReader(TypeDescription readerType, case INT: case LONG: return createAnyIntegerConvertTreeReader(columnId, fileType, readerType, evolution, - included, skipCorrupt); + skipCorrupt); case FLOAT: return createFloatConvertTreeReader(columnId, fileType, readerType, evolution, - included, skipCorrupt); + skipCorrupt); case DOUBLE: return createDoubleConvertTreeReader(columnId, fileType, readerType, evolution, - included, skipCorrupt); + skipCorrupt); case DECIMAL: return createDecimalConvertTreeReader(columnId, fileType, readerType, evolution, - included, skipCorrupt); + skipCorrupt); case STRING: return createStringConvertTreeReader(columnId, fileType, readerType, evolution, - included, skipCorrupt); + skipCorrupt); case CHAR: return createCharConvertTreeReader(columnId, fileType, readerType, evolution, - included, skipCorrupt); + skipCorrupt); case VARCHAR: return createVarcharConvertTreeReader(columnId, fileType, readerType, evolution, - included, skipCorrupt); + skipCorrupt); case TIMESTAMP: return createTimestampConvertTreeReader(columnId, fileType, readerType, evolution, - included, skipCorrupt); + skipCorrupt); case DATE: return createDateConvertTreeReader(columnId, fileType, readerType, evolution, - included, skipCorrupt); + skipCorrupt); case BINARY: return createBinaryConvertTreeReader(columnId, fileType, readerType, evolution, - included, skipCorrupt); + skipCorrupt); // UNDONE: Complex conversions... case STRUCT: diff --git orc/src/java/org/apache/orc/impl/ReaderImpl.java orc/src/java/org/apache/orc/impl/ReaderImpl.java index d6df7d7..93fc0ce 100644 --- orc/src/java/org/apache/orc/impl/ReaderImpl.java +++ orc/src/java/org/apache/orc/impl/ReaderImpl.java @@ -572,13 +572,6 @@ public RecordReader rows() throws IOException { @Override public RecordReader rows(Options options) throws IOException { LOG.info("Reading ORC rows from " + path + " with " + options); - boolean[] include = options.getInclude(); - // if included columns is null, then include all columns - if (include == null) { - include = new boolean[types.size()]; - Arrays.fill(include, true); - options.include(include); - } return new RecordReaderImpl(this, options); } diff --git orc/src/java/org/apache/orc/impl/RecordReaderImpl.java orc/src/java/org/apache/orc/impl/RecordReaderImpl.java index 36a802e..abe6c18 100644 --- orc/src/java/org/apache/orc/impl/RecordReaderImpl.java +++ orc/src/java/org/apache/orc/impl/RecordReaderImpl.java @@ -134,24 +134,26 @@ static int findColumns(String[] columnNames, protected RecordReaderImpl(ReaderImpl fileReader, Reader.Options options) throws IOException { - SchemaEvolution treeReaderSchema; - this.included = options.getInclude(); - included[0] = true; - if (options.getSchema() == null) { + SchemaEvolution evolution; + if (options.getLowLevelAccess()) { + evolution = new SchemaEvolution(fileReader.getSchema()); + } else if (options.getSchema() == null) { if (LOG.isInfoEnabled()) { LOG.info("Schema on read not provided -- using file schema " + fileReader.getSchema()); } - treeReaderSchema = new SchemaEvolution(fileReader.getSchema(), included); + evolution = new SchemaEvolution(fileReader.getSchema(), options.getInclude(), + options.getColumnNames()); } else { // Now 
that we are creating a record reader for a file, validate that the schema to read // is compatible with the file schema. // - treeReaderSchema = new SchemaEvolution(fileReader.getSchema(), - options.getSchema(),included); + evolution = new SchemaEvolution(fileReader.getSchema(), options.getSchema(), + options.getInclude(), options.getColumnNames()); } - this.schema = treeReaderSchema.getReaderSchema(); + this.schema = evolution.getReaderSchema(); + this.included = evolution.getReaderIncluded(); this.path = fileReader.path; this.codec = fileReader.codec; this.types = fileReader.types; @@ -160,8 +162,7 @@ protected RecordReaderImpl(ReaderImpl fileReader, SearchArgument sarg = options.getSearchArgument(); if (sarg != null && rowIndexStride != 0) { sargApp = new SargApplier( - sarg, options.getColumnNames(), rowIndexStride, types, - included.length); + sarg, evolution.getReaderColumnNames(), rowIndexStride, included.length); } else { sargApp = null; } @@ -205,8 +206,8 @@ protected RecordReaderImpl(ReaderImpl fileReader, skipCorrupt = OrcConf.SKIP_CORRUPT_DATA.getBoolean(fileReader.conf); } - reader = TreeReaderFactory.createTreeReader(treeReaderSchema.getReaderSchema(), - treeReaderSchema, included, skipCorrupt); + reader = TreeReaderFactory.createTreeReader(evolution.getReaderSchema(), evolution, + skipCorrupt); indexes = new OrcProto.RowIndex[types.size()]; bloomFilterIndices = new OrcProto.BloomFilterIndex[types.size()]; advanceToNextRow(reader, 0L, true); @@ -707,7 +708,7 @@ private static Object getBaseObjectForComparison(PredicateLeaf.Type type, Object private final boolean[] sargColumns; public SargApplier(SearchArgument sarg, String[] columnNames, long rowIndexStride, - List types, int includedCount) { + int includedCount) { this.sarg = sarg; sargLeaves = sarg.getLeaves(); filterColumns = mapSargColumnsToOrcInternalColIdx(sargLeaves, columnNames, 0); diff --git orc/src/java/org/apache/orc/impl/SchemaEvolution.java orc/src/java/org/apache/orc/impl/SchemaEvolution.java index a6c1d60..b6a7e74 100644 --- orc/src/java/org/apache/orc/impl/SchemaEvolution.java +++ orc/src/java/org/apache/orc/impl/SchemaEvolution.java @@ -20,51 +20,213 @@ import java.io.IOException; import java.util.ArrayList; +import java.util.Arrays; import java.util.HashMap; import java.util.List; import java.util.Map; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; +import org.apache.orc.OrcUtils; import org.apache.orc.TypeDescription; +import org.apache.orc.TypeDescription.Category; + +import com.google.common.base.Preconditions; /** * Take the file types and the (optional) configuration column names/types and see if there * has been schema evolution. + * + * The logical schema is the schema desired by the client of the ORC input file format, e.g. Hive. + * + * The reader schema will be the same as the logical schema for non-ACID ORC files. But for ACID ORC + * files, the reader schema will have the wrapper ACID STRUCT. In this case, the logical schema + * will be the ROW field of the ACID STRUCT. + * + * The file schema has the actual file columns. If it is different from the reader schema, then + * there is schema evolution. + * + * The getFileType method maps a reader type to its corresponding file type. It is used when + * creating the tree readers. If the column is not needed (i.e. not included), getFileType + * returns null. 
*/ public class SchemaEvolution { - private final Map readerToFile; - private final boolean[] included; + private final TypeDescription logicalSchema; + private final boolean[] logicalIncluded; + private final int logicalFirstId; + private final String[] logicalColumnNames; private final TypeDescription readerSchema; + private final boolean[] readerIncluded; + private final String[] readerColumnNames; + private final Map readerToFile; + private static final Log LOG = LogFactory.getLog(SchemaEvolution.class); - public SchemaEvolution(TypeDescription readerSchema, boolean[] included) { - this.included = included; + /** + * Use when the logical schema and file schema are identical. + * + * I.e. when no schema evolution is needed. + * + * @param logicalSchema + * @param logicalIncluded The columns to include from the schema. Indexed by the schema + * type id. + * @param logicalColumnNames The column names. Can be null. + */ + public SchemaEvolution(TypeDescription logicalSchema, + boolean[] logicalIncluded, + String[] logicalColumnNames) { + + // Reading ACID tables must go through the next constructor that accepts file and logical + // schema. Or, it needs to go through the low level access constructor for file dump, etc. + Preconditions.checkState(!OrcUtils.checkAcidSchema(logicalSchema)); + + this.logicalSchema = logicalSchema; + this.logicalIncluded = makeLogicalIncluded(logicalIncluded); + logicalFirstId = 0; + this.logicalColumnNames = logicalColumnNames; + + // No mapping. readerToFile = null; - this.readerSchema = readerSchema; + + readerSchema = this.logicalSchema; + readerIncluded = this.logicalIncluded; + readerColumnNames = this.logicalColumnNames; } + /** + * Use when the file and reader schema may be different and require schema evolution. + * + * @param fileSchema + * @param logicalSchema The schema (excluding ACID metadata columns) desired by the reader. + * @param logicalIncluded The columns to include from the logical schema. Indexed by the + * schema type id. + * @throws IOException + */ public SchemaEvolution(TypeDescription fileSchema, - TypeDescription readerSchema, - boolean[] included) throws IOException { - readerToFile = new HashMap<>(readerSchema.getMaximumId() + 1); - this.included = included; - if (checkAcidSchema(fileSchema)) { - this.readerSchema = createEventSchema(readerSchema); + TypeDescription logicalSchema, + boolean[] logicalIncluded, + String[] logicalColumnNames) throws IOException { + Preconditions.checkState(fileSchema.getCategory() == Category.STRUCT); + Preconditions.checkState(logicalSchema.getCategory() == Category.STRUCT); + + this.logicalSchema = logicalSchema; + this.logicalIncluded = makeLogicalIncluded(logicalIncluded); + + /* + * We need special handling for ACID. + */ + if (OrcUtils.checkAcidSchema(fileSchema)) { + readerSchema = OrcUtils.createEventSchema(this.logicalSchema); + + /* + * We need the range of type ids for the logical schema so we can create the + * logicalFileTypes below. + */ + logicalFirstId = OrcUtils.ACID_ROW_CHILD; } else { - this.readerSchema = readerSchema; + this.readerSchema = this.logicalSchema; + logicalFirstId = 0; } - buildMapping(fileSchema, this.readerSchema); + this.logicalColumnNames = logicalColumnNames; + + readerIncluded = makeReaderIncluded(); + readerColumnNames = makeReaderColumnNames(); + + readerToFile = new HashMap<>(readerSchema.getMaximumId() + 1); + + // Void call to make sure all the ids are assigned. 
+ fileSchema.getId(); + + buildMapping(fileSchema, readerSchema); + } + + /** + * Use for low-level reading of the file schema. + * + * @param fileSchema + */ + public SchemaEvolution(TypeDescription fileSchema) { + + logicalSchema = fileSchema; + logicalIncluded = makeLogicalIncluded(null); + logicalFirstId = 0; + logicalColumnNames = null; + + // No mapping. + readerToFile = null; + + readerSchema = logicalSchema; + readerIncluded = logicalIncluded; + readerColumnNames = null; } + /** + * The schema desired by the reader. + * + * For an ACID table, it will have the ACID metadata. + * + * @return + */ public TypeDescription getReaderSchema() { return readerSchema; } + /** + * The needed columns from the reader schema. + * + * For an ACID table, the boolean array will include the ACID metadata columns. + * + * We always provide a non-null include boolean array as a convenience. + * + * @return + */ + public boolean[] getReaderIncluded() { + return readerIncluded; + } + + /** + * The reader column names when logical column names were provided to the constructor. + * + * For an ACID table, the column name array will have the ACID metadata columns. + * + * @return + */ + public String[] getReaderColumnNames() { + return readerColumnNames; + } + + /** + * The schema desired by the reader *without* ACID metadata columns. + * + * @return + */ + public TypeDescription getLogicalSchema() { + return logicalSchema; + } + + /** + * The needed columns from the logical schema. + * + * For an ACID table, the boolean array will *not* include the ACID metadata columns. + * + * We always provide a non-null include boolean array as a convenience. + * + * @return + */ + public boolean[] getLogicalIncluded() { + return logicalIncluded; + } + + + /** + * Get the file type for a reader type. + * @param readerType + * @return + */ public TypeDescription getFileType(TypeDescription readerType) { TypeDescription result; if (readerToFile == null) { - if (included == null || included[readerType.getId()]) { + if (readerIncluded[readerType.getId()]) { result = readerType; } else { result = null; @@ -77,8 +239,8 @@ public TypeDescription getFileType(TypeDescription readerType) { void buildMapping(TypeDescription fileType, TypeDescription readerType) throws IOException { - // if the column isn't included, don't map it - if (included != null && !included[readerType.getId()]) { + // When a column isn't included, don't map it. + if (!readerIncluded[readerType.getId()]) { return; } boolean isOk = true; @@ -151,38 +313,56 @@ void buildMapping(TypeDescription fileType, } } - private static boolean checkAcidSchema(TypeDescription type) { - if (type.getCategory().equals(TypeDescription.Category.STRUCT)) { - List rootFields = type.getFieldNames(); - if (acidEventFieldNames.equals(rootFields)) { - return true; + private boolean[] makeLogicalIncluded(boolean[] logicalIncluded) { + final int logicalLength = logicalSchema.getMaximumId() + 1; + boolean[] result = new boolean[logicalLength]; + if (logicalIncluded == null) { + Arrays.fill(result, true); + } else { + + // When logicalIncluded is shorter, the extra columns are not included. + System.arraycopy(logicalIncluded, 0, result, 0, logicalIncluded.length); + + if (logicalSchema.getCategory() == Category.STRUCT) { + // Always include top level struct. 
+ result[0] = true; } } - return false; + return result; } - /** - * @param typeDescr - * @return ORC types for the ACID event based on the row's type description - */ - public static TypeDescription createEventSchema(TypeDescription typeDescr) { - TypeDescription result = TypeDescription.createStruct() - .addField("operation", TypeDescription.createInt()) - .addField("originalTransaction", TypeDescription.createLong()) - .addField("bucket", TypeDescription.createInt()) - .addField("rowId", TypeDescription.createLong()) - .addField("currentTransaction", TypeDescription.createLong()) - .addField("row", typeDescr.clone()); + private boolean[] makeReaderIncluded() { + final int logicalLength = logicalSchema.getMaximumId() + 1; + final int readerLength = readerSchema.getMaximumId() + 1; + boolean[] result; + if (logicalLength == readerLength) { + result = logicalIncluded; + } else { + result = new boolean[readerLength]; + Arrays.fill(result, true); + System.arraycopy(logicalIncluded, 0, result, logicalFirstId, logicalLength); + } return result; } - public static final List acidEventFieldNames= new ArrayList(); - static { - acidEventFieldNames.add("operation"); - acidEventFieldNames.add("originalTransaction"); - acidEventFieldNames.add("bucket"); - acidEventFieldNames.add("rowId"); - acidEventFieldNames.add("currentTransaction"); - acidEventFieldNames.add("row"); + /** + * If logical column names are provided, return the reader column name array. + * Otherwise, null is returned. + * @return + */ + private String[] makeReaderColumnNames() { + if (logicalColumnNames == null) { + return null; + } + final int logicalLength = logicalSchema.getMaximumId() + 1; + final int readerLength = readerSchema.getMaximumId() + 1; + String[] result; + if (logicalLength == readerLength) { + result = logicalColumnNames; + } else { + result = new String[readerLength]; + System.arraycopy(logicalColumnNames, 0, result, logicalFirstId, logicalLength); + } + return result; } } diff --git orc/src/java/org/apache/orc/impl/TreeReaderFactory.java orc/src/java/org/apache/orc/impl/TreeReaderFactory.java index c4a2093..5d8daf2 100644 --- orc/src/java/org/apache/orc/impl/TreeReaderFactory.java +++ orc/src/java/org/apache/orc/impl/TreeReaderFactory.java @@ -1705,7 +1705,6 @@ public void nextVector(ColumnVector previousVector, protected StructTreeReader(int columnId, TypeDescription readerSchema, SchemaEvolution evolution, - boolean[] included, boolean skipCorrupt) throws IOException { super(columnId); @@ -1713,7 +1712,7 @@ protected StructTreeReader(int columnId, this.fields = new TreeReader[childrenTypes.size()]; for (int i = 0; i < fields.length; ++i) { TypeDescription subtype = childrenTypes.get(i); - this.fields[i] = createTreeReader(subtype, evolution, included, skipCorrupt); + this.fields[i] = createTreeReader(subtype, evolution, skipCorrupt); } } @@ -1790,7 +1789,6 @@ void skipRows(long items) throws IOException { protected UnionTreeReader(int fileColumn, TypeDescription readerSchema, SchemaEvolution evolution, - boolean[] included, boolean skipCorrupt) throws IOException { super(fileColumn); List childrenTypes = readerSchema.getChildren(); @@ -1798,7 +1796,7 @@ protected UnionTreeReader(int fileColumn, this.fields = new TreeReader[fieldCount]; for (int i = 0; i < fieldCount; ++i) { TypeDescription subtype = childrenTypes.get(i); - this.fields[i] = createTreeReader(subtype, evolution, included, skipCorrupt); + this.fields[i] = createTreeReader(subtype, evolution, skipCorrupt); } } @@ -1867,12 +1865,10 @@ void 
skipRows(long items) throws IOException { protected ListTreeReader(int fileColumn, TypeDescription readerSchema, SchemaEvolution evolution, - boolean[] included, boolean skipCorrupt) throws IOException { super(fileColumn); TypeDescription elementType = readerSchema.getChildren().get(0); - elementReader = createTreeReader(elementType, evolution, included, - skipCorrupt); + elementReader = createTreeReader(elementType, evolution, skipCorrupt); } @Override @@ -1947,13 +1943,12 @@ void skipRows(long items) throws IOException { protected MapTreeReader(int fileColumn, TypeDescription readerSchema, SchemaEvolution evolution, - boolean[] included, boolean skipCorrupt) throws IOException { super(fileColumn); TypeDescription keyType = readerSchema.getChildren().get(0); TypeDescription valueType = readerSchema.getChildren().get(1); - keyReader = createTreeReader(keyType, evolution, included, skipCorrupt); - valueReader = createTreeReader(valueType, evolution, included, skipCorrupt); + keyReader = createTreeReader(keyType, evolution, skipCorrupt); + valueReader = createTreeReader(valueType, evolution, skipCorrupt); } @Override @@ -2028,12 +2023,10 @@ void skipRows(long items) throws IOException { public static TreeReader createTreeReader(TypeDescription readerType, SchemaEvolution evolution, - boolean[] included, boolean skipCorrupt ) throws IOException { TypeDescription fileType = evolution.getFileType(readerType); - if (fileType == null || - (included != null && !included[readerType.getId()])) { + if (fileType == null) { return new NullTreeReader(0); } TypeDescription.Category readerTypeCategory = readerType.getCategory(); @@ -2043,8 +2036,7 @@ public static TreeReader createTreeReader(TypeDescription readerType, readerTypeCategory != TypeDescription.Category.LIST && readerTypeCategory != TypeDescription.Category.UNION)) { // We only convert complex children. 
- return ConvertTreeReaderFactory.createConvertTreeReader(readerType, evolution, - included, skipCorrupt); + return ConvertTreeReaderFactory.createConvertTreeReader(readerType, evolution, skipCorrupt); } switch (readerTypeCategory) { case BOOLEAN: @@ -2078,16 +2070,16 @@ public static TreeReader createTreeReader(TypeDescription readerType, readerType.getScale()); case STRUCT: return new StructTreeReader(fileType.getId(), readerType, - evolution, included, skipCorrupt); + evolution, skipCorrupt); case LIST: return new ListTreeReader(fileType.getId(), readerType, - evolution, included, skipCorrupt); + evolution, skipCorrupt); case MAP: return new MapTreeReader(fileType.getId(), readerType, evolution, - included, skipCorrupt); + skipCorrupt); case UNION: return new UnionTreeReader(fileType.getId(), readerType, - evolution, included, skipCorrupt); + evolution, skipCorrupt); default: throw new IllegalArgumentException("Unsupported type " + readerTypeCategory); diff --git orc/src/java/org/apache/orc/tools/FileDump.java orc/src/java/org/apache/orc/tools/FileDump.java index 1a1d8ab..8219867 100644 --- orc/src/java/org/apache/orc/tools/FileDump.java +++ orc/src/java/org/apache/orc/tools/FileDump.java @@ -334,7 +334,8 @@ private static void printMetaDataImpl(final String filename, System.out.println("Structure for " + filename); System.out.println("File Version: " + reader.getFileVersion().getName() + " with " + reader.getWriterVersion()); - RecordReaderImpl rows = (RecordReaderImpl) reader.rows(); + Reader.Options options = new Reader.Options().lowLevelAccess(true); + RecordReaderImpl rows = (RecordReaderImpl) reader.rows(options); System.out.println("Rows: " + reader.getNumberOfRows()); System.out.println("Compression: " + reader.getCompressionKind()); if (reader.getCompressionKind() != CompressionKind.NONE) { diff --git ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcInputFormat.java ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcInputFormat.java index 69d58d6..625fb16 100644 --- ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcInputFormat.java +++ ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcInputFormat.java @@ -116,6 +116,7 @@ import org.apache.orc.OrcProto; import com.google.common.annotations.VisibleForTesting; +import com.google.common.base.Preconditions; import com.google.common.collect.Lists; import com.google.common.util.concurrent.ThreadFactoryBuilder; import com.google.protobuf.CodedInputStream; @@ -306,14 +307,15 @@ public static RecordReader createReaderFromFile(Reader file, /** * Do we have schema on read in the configuration variables? */ - TypeDescription schema = getDesiredRowTypeDescr(conf, false, Integer.MAX_VALUE); + TypeDescription logicalSchema = getDesiredRowTypeDescr(conf, false, Integer.MAX_VALUE); Reader.Options options = new Reader.Options().range(offset, length); - options.schema(schema); - boolean isOriginal = isOriginal(file); - List types = file.getTypes(); - options.include(genIncludedColumns(types, conf, isOriginal)); - setSearchArgument(options, types, conf, isOriginal); + options.schema(logicalSchema); + if (logicalSchema == null) { + logicalSchema = file.getSchema(); + } + options.include(genIncludedColumns(logicalSchema, conf)); + setSearchArgument(options, logicalSchema, conf); return (RecordReader) file.rowsOptions(options); } @@ -323,6 +325,114 @@ public static boolean isOriginal(Reader file) { /** * Recurse down into a type subtree turning on all of the sub-columns. + * + * NOTE: Currently, the included columns are just the top level columns. 
Any STRUCT type columns + * will have all of their columns included, recursively. There is an open JIRA, HIVE-13873, that + * would enhance this so that included columns can have depth into STRUCT type columns and + * unnecessary STRUCT columns are not read. + * + * @param typeDescr the possible complex type. + * @param result the global view of columns that should be included + */ + private static void includeColumnRecursive(TypeDescription typeDescr, + boolean[] result) { + result[typeDescr.getId()] = true; + final List children = typeDescr.getChildren(); + if (children == null) { + return; + } + final int numChildren = children.size(); + for(int i=0; i < numChildren; ++i) { + TypeDescription child = children.get(i); + includeColumnRecursive(child, result); + } + } + + /** + * Generate an included boolean array from the logical schema and a list of top level column ids. + * @param logicalSchema + * @param included + * @return + */ + public static boolean[] genIncludedColumns(TypeDescription logicalSchema, + List included) { + boolean[] result = new boolean[logicalSchema.getMaximumId() + 1]; + result[0] = true; + final List children = logicalSchema.getChildren(); + final int numChildren = children.size(); + for (int i = 0; i < numChildren; ++i) { + TypeDescription child = children.get(i); + if (included.contains(i)) { + includeColumnRecursive(child, result); + } + } + return result; + } + + /** + * Determine the logical included columns from the reader schema and configuration. + * + * @param schema + * @param conf + * @return Will be null if included columns are not present in the configuration. + */ + public static boolean[] genIncludedColumns( + TypeDescription schema, Configuration conf) { + if (!ColumnProjectionUtils.isReadAllColumns(conf)) { + List included = ColumnProjectionUtils.getReadColumnIDs(conf); + return genIncludedColumns(schema, included); + } else { + return null; + } + } + + public static String[] getSargColumnNames(String[] originalColumnNames, + TypeDescription logicalSchema, boolean[] includedColumns) { + String[] columnNames = new String[logicalSchema.getMaximumId() + 1]; + int i = 0; + final List children = logicalSchema.getChildren(); + // The way this works is as follows. originalColumnNames is the equivalent of getNeededColumns + // from TSOP. They are assumed to be in the same order as the columns in the ORC file, AND they + // are assumed to be equivalent to the columns in includedColumns (because both were generated + // from the same column list). So here we go through all the top level ORC file columns that + // are included, in order, and they match originalColumnNames. This way, we do not depend on + // names stored inside ORC for SARG leaf column name resolution (see mapSargColumns method). + for(TypeDescription child : children) { + int id = child.getId(); + if (includedColumns == null || includedColumns[id]) { + // this is guaranteed to be positive because types only have children + // ids greater than their own id. 
+ columnNames[id] = originalColumnNames[i++]; + } + } + return columnNames; + } + + static void setSearchArgument(Reader.Options options, + TypeDescription logicalSchema, + Configuration conf) { + String neededColumnNames = getNeededColumnNamesString(conf); + if (neededColumnNames == null) { + LOG.debug("No ORC pushdown predicate - no column names"); + options.searchArgument(null, null); + return; + } + SearchArgument sarg = ConvertAstToSearchArg.createFromConf(conf); + if (sarg == null) { + LOG.debug("No ORC pushdown predicate"); + options.searchArgument(null, null); + return; + } + + if (LOG.isInfoEnabled()) { + LOG.info("ORC pushdown predicate: " + sarg); + } + options.searchArgument(sarg, getSargColumnNames( + neededColumnNames.split(","), logicalSchema, options.getInclude())); + } + + /** + * Recurse down into a type subtree turning on all of the sub-columns. * @param types the types of the file * @param result the global view of columns that should be included * @param typeId the root of tree to enable @@ -1703,19 +1813,18 @@ public float getProgress() throws IOException { /** - * Do we have schema on read in the configuration variables? + * We always have the schema in the configuration variables for ACID. */ - TypeDescription schema = getDesiredRowTypeDescr(conf, true, Integer.MAX_VALUE); + final TypeDescription logicalSchema = getDesiredRowTypeDescr(conf, true, Integer.MAX_VALUE); + Preconditions.checkState(logicalSchema != null); final Reader reader; final int bucket; - Reader.Options readOptions = new Reader.Options().schema(schema); + Reader.Options readOptions = new Reader.Options().schema(logicalSchema); readOptions.range(split.getStart(), split.getLength()); - // TODO: Convert genIncludedColumns and setSearchArgument to use TypeDescription. - final List schemaTypes = OrcUtils.getOrcTypes(schema); - readOptions.include(genIncludedColumns(schemaTypes, conf, SCHEMA_TYPES_IS_ORIGINAL)); - setSearchArgument(readOptions, schemaTypes, conf, SCHEMA_TYPES_IS_ORIGINAL); + readOptions.include(genIncludedColumns(logicalSchema, conf)); + setSearchArgument(readOptions, logicalSchema, conf); if (split.hasBase()) { bucket = AcidUtils.parseBaseBucketFilename(split.getPath(), conf) @@ -1741,7 +1850,7 @@ public float getProgress() throws IOException { @Override public ObjectInspector getObjectInspector() { - return OrcStruct.createObjectInspector(0, schemaTypes); + return OrcStruct.createObjectInspector(0, OrcUtils.getOrcTypes(logicalSchema)); } @Override diff --git ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcRawRecordMerger.java ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcRawRecordMerger.java index b9094bf..010bdd8 100644 --- ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcRawRecordMerger.java +++ ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcRawRecordMerger.java @@ -146,7 +146,7 @@ public int compareTo(RecordIdentifier other) { private boolean isSameRow(ReaderKey other) { return compareRow(other) == 0 && currentTransactionId == other.currentTransactionId; } - + public long getCurrentTransactionId() { return currentTransactionId; } @@ -385,40 +385,6 @@ private void discoverKeyBounds(Reader reader, } /** - * Convert from the row include/sarg/columnNames to the event equivalent - * for the underlying file. 
- * @param options options for the row reader - * @return a cloned options object that is modified for the event reader - */ - static Reader.Options createEventOptions(Reader.Options options) { - Reader.Options result = options.clone(); - result.range(options.getOffset(), Long.MAX_VALUE); - // slide the columns down by 6 for the include array - if (options.getInclude() != null) { - boolean[] orig = options.getInclude(); - // we always need the base row - orig[0] = true; - boolean[] include = new boolean[orig.length + OrcRecordUpdater.FIELDS]; - Arrays.fill(include, 0, OrcRecordUpdater.FIELDS, true); - for(int i= 0; i < orig.length; ++i) { - include[i + OrcRecordUpdater.FIELDS] = orig[i]; - } - result.include(include); - } - - // slide the column names down by 6 for the name array - if (options.getColumnNames() != null) { - String[] orig = options.getColumnNames(); - String[] cols = new String[orig.length + OrcRecordUpdater.FIELDS]; - for(int i=0; i < orig.length; ++i) { - cols[i + OrcRecordUpdater.FIELDS] = orig[i]; - } - result.searchArgument(options.getSearchArgument(), cols); - } - return result; - } - - /** * Create a reader that merge sorts the ACID events together. * @param conf the configuration * @param collapseEvents should the events on the same row be collapsed @@ -442,14 +408,15 @@ private void discoverKeyBounds(Reader reader, this.length = options.getLength(); this.validTxnList = validTxnList; - TypeDescription typeDescr = + TypeDescription logicalSchema = OrcInputFormat.getDesiredRowTypeDescr(conf, true, Integer.MAX_VALUE); + options.schema(logicalSchema); objectInspector = OrcRecordUpdater.createEventSchema - (OrcStruct.createObjectInspector(0, OrcUtils.getOrcTypes(typeDescr))); + (OrcStruct.createObjectInspector(0, OrcUtils.getOrcTypes(logicalSchema))); + + Reader.Options eventOptions = options.clone().range(options.getOffset(), Long.MAX_VALUE); - // modify the options to reflect the event instead of the base row - Reader.Options eventOptions = createEventOptions(options); if (reader == null) { baseReader = null; } else { diff --git ql/src/java/org/apache/hadoop/hive/ql/io/orc/ReaderImpl.java ql/src/java/org/apache/hadoop/hive/ql/io/orc/ReaderImpl.java index 9bcdb39..4160e77 100644 --- ql/src/java/org/apache/hadoop/hive/ql/io/orc/ReaderImpl.java +++ ql/src/java/org/apache/hadoop/hive/ql/io/orc/ReaderImpl.java @@ -76,13 +76,6 @@ public RecordReader rows() throws IOException { @Override public RecordReader rowsOptions(Options options) throws IOException { LOG.info("Reading ORC rows from " + path + " with " + options); - boolean[] include = options.getInclude(); - // if included columns is null, then include all columns - if (include == null) { - include = new boolean[types.size()]; - Arrays.fill(include, true); - options.include(include); - } return new RecordReaderImpl(this, options); } diff --git ql/src/java/org/apache/hadoop/hive/ql/io/orc/VectorizedOrcInputFormat.java ql/src/java/org/apache/hadoop/hive/ql/io/orc/VectorizedOrcInputFormat.java index 9275aa9..09eb6bb 100644 --- ql/src/java/org/apache/hadoop/hive/ql/io/orc/VectorizedOrcInputFormat.java +++ ql/src/java/org/apache/hadoop/hive/ql/io/orc/VectorizedOrcInputFormat.java @@ -74,30 +74,29 @@ /** * Do we have schema on read in the configuration variables? 
*/ - List types = file.getTypes(); int dataColumns = rbCtx.getDataColumnCount(); - TypeDescription schema = + TypeDescription logicalSchema = OrcInputFormat.getDesiredRowTypeDescr(conf, false, dataColumns); - if (schema == null) { - schema = file.getSchema(); + if (logicalSchema == null) { + logicalSchema = file.getSchema(); // Even if the user isn't doing schema evolution, cut the schema // to the desired size. - if (schema.getCategory() == TypeDescription.Category.STRUCT && - schema.getChildren().size() > dataColumns) { - schema = schema.clone(); - List children = schema.getChildren(); + if (logicalSchema.getCategory() == TypeDescription.Category.STRUCT && + logicalSchema.getChildren().size() > dataColumns) { + logicalSchema = logicalSchema.clone(); + List children = logicalSchema.getChildren(); for(int c = children.size() - 1; c >= dataColumns; --c) { children.remove(c); } } } - Reader.Options options = new Reader.Options().schema(schema); + Reader.Options options = new Reader.Options().schema(logicalSchema); this.offset = fileSplit.getStart(); this.length = fileSplit.getLength(); options.range(offset, length); - options.include(OrcInputFormat.genIncludedColumns(types, conf, true)); - OrcInputFormat.setSearchArgument(options, types, conf, true); + options.include(OrcInputFormat.genIncludedColumns(logicalSchema, conf)); + OrcInputFormat.setSearchArgument(options, logicalSchema, conf); this.reader = file.rowsOptions(options); diff --git ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands2.java ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands2.java index e76c925..165017a 100644 --- ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands2.java +++ ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands2.java @@ -846,7 +846,7 @@ public void testInitiatorWithMultipleFailedCompactions() throws Exception { init.run(); int numAttemptedCompactions = 1; checkCompactionState(new CompactionsByState(numAttemptedCompactions,numFailedCompactions,0,0,0,0,numFailedCompactions + numAttemptedCompactions), countCompacts(txnHandler)); - + hiveConf.setTimeVar(HiveConf.ConfVars.COMPACTOR_HISTORY_REAPER_INTERVAL, 10, TimeUnit.MILLISECONDS); AcidCompactionHistoryService compactionHistoryService = new AcidCompactionHistoryService(); runHouseKeeperService(compactionHistoryService, hiveConf);//should not remove anything from history @@ -868,7 +868,7 @@ public void testInitiatorWithMultipleFailedCompactions() throws Exception { hiveConf.getIntVar(HiveConf.ConfVars.COMPACTOR_HISTORY_RETENTION_ATTEMPTED), hiveConf.getIntVar(HiveConf.ConfVars.COMPACTOR_HISTORY_RETENTION_FAILED),0,0,0,0, hiveConf.getIntVar(HiveConf.ConfVars.COMPACTOR_HISTORY_RETENTION_FAILED) + hiveConf.getIntVar(HiveConf.ConfVars.COMPACTOR_HISTORY_RETENTION_ATTEMPTED)), countCompacts(txnHandler)); - + hiveConf.setBoolVar(HiveConf.ConfVars.HIVETESTMODEFAILCOMPACTION, false); txnHandler.compact(new CompactionRequest("default", tblName, CompactionType.MINOR)); //at this point "show compactions" should have (COMPACTOR_HISTORY_RETENTION_FAILED) failed + 1 initiated (explicitly by user) @@ -1139,6 +1139,31 @@ public void testOpenTxnsCounter() throws Exception { Assert.assertNull(exception); } + @Test + public void testCompactWithDelete() throws Exception { + int[][] tableData = {{1,2},{3,4}}; + runStatementOnDriver("insert into " + Table.ACIDTBL + "(a,b) " + makeValuesClause(tableData)); + runStatementOnDriver("alter table "+ Table.ACIDTBL + " compact 'MAJOR'"); + Worker t = new Worker(); + t.setThreadId((int) t.getId()); + t.setHiveConf(hiveConf); + 
AtomicBoolean stop = new AtomicBoolean(); + AtomicBoolean looped = new AtomicBoolean(); + stop.set(true); + t.init(stop, looped); + t.run(); + runStatementOnDriver("delete from " + Table.ACIDTBL + " where b = 4"); + runStatementOnDriver("update " + Table.ACIDTBL + " set b = -2 where b = 2"); + runStatementOnDriver("alter table "+ Table.ACIDTBL + " compact 'MINOR'"); + t.run(); + TxnStore txnHandler = TxnUtils.getTxnStore(hiveConf); + ShowCompactResponse resp = txnHandler.showCompact(new ShowCompactRequest()); + Assert.assertEquals("Unexpected number of compactions in history", 2, resp.getCompactsSize()); + Assert.assertEquals("Unexpected 0 compaction state", TxnStore.CLEANING_RESPONSE, resp.getCompacts().get(0).getState()); + Assert.assertEquals("Unexpected 1 compaction state", TxnStore.CLEANING_RESPONSE, resp.getCompacts().get(1).getState()); + } + + /** * takes raw data and turns it into a string as if from Driver.getResults() * sorts rows in dictionary order diff --git ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestOrcRecordUpdater.java ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestOrcRecordUpdater.java index 67c473e..3ba4653 100644 --- ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestOrcRecordUpdater.java +++ ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestOrcRecordUpdater.java @@ -134,7 +134,8 @@ public void testWriter() throws Exception { reader = OrcFile.createReader(bucketPath, new OrcFile.ReaderOptions(conf).filesystem(fs).maxLength(len)); assertEquals(5, reader.getNumberOfRows()); - RecordReader rows = reader.rows(); + Reader.Options readerOptions = new Reader.Options().lowLevelAccess(true); + RecordReader rows = reader.rowsOptions(readerOptions); // check the contents of the file assertEquals(true, rows.hasNext()); @@ -263,7 +264,8 @@ public void testUpdates() throws Exception { new OrcFile.ReaderOptions(conf).filesystem(fs)); assertEquals(2, reader.getNumberOfRows()); - RecordReader rows = reader.rows(); + Reader.Options readerOptions = new Reader.Options().lowLevelAccess(true); + RecordReader rows = reader.rowsOptions(readerOptions); // check the contents of the file assertEquals(true, rows.hasNext());
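
Reviewer note (not part of the patch): below is a minimal sketch of how a caller might exercise the two read paths this change separates. Low-level access reads the raw file schema and bypasses schema evolution and the ACID event wrapper (as the updated FileDump and tests do); schema-on-read passes a logical schema plus an include array built by OrcInputFormat.genIncludedColumns. The file path and the two-column layout are hypothetical; the API calls mirror the ones used in the patched tests.

import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.ql.io.orc.OrcFile;
import org.apache.hadoop.hive.ql.io.orc.OrcInputFormat;
import org.apache.hadoop.hive.ql.io.orc.Reader;
import org.apache.hadoop.hive.ql.io.orc.RecordReader;
import org.apache.orc.TypeDescription;

public class SchemaEvolutionReadSketch {
  public static void main(String[] args) throws IOException {
    Configuration conf = new Configuration();
    Path path = new Path("/tmp/example.orc");          // hypothetical file

    Reader reader = OrcFile.createReader(path, OrcFile.readerOptions(conf));

    // Path 1: low-level access -- read exactly the file schema, ignoring
    // schema evolution and ACID unwrapping (used by FileDump and the tests above).
    Reader.Options raw = new Reader.Options().lowLevelAccess(true);
    RecordReader rawRows = reader.rowsOptions(raw);
    rawRows.close();

    // Path 2: schema on read -- supply the logical schema the caller wants and
    // an include array indexed by that schema's type ids (null means all columns).
    TypeDescription logicalSchema = TypeDescription.createStruct()   // hypothetical layout
        .addField("a", TypeDescription.createInt())
        .addField("b", TypeDescription.createString());
    Reader.Options evolved = new Reader.Options()
        .schema(logicalSchema)
        .include(OrcInputFormat.genIncludedColumns(logicalSchema, conf));
    RecordReader evolvedRows = reader.rowsOptions(evolved);
    evolvedRows.close();
  }
}

For an ACID file, RecordReaderImpl now derives the reader schema internally: SchemaEvolution wraps the supplied logical schema with OrcUtils.createEventSchema, so callers only ever pass the logical (row) schema.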