diff --git orc/src/java/org/apache/orc/impl/SchemaEvolution.java orc/src/java/org/apache/orc/impl/SchemaEvolution.java
index 7379de9..fd4305b 100644
--- orc/src/java/org/apache/orc/impl/SchemaEvolution.java
+++ orc/src/java/org/apache/orc/impl/SchemaEvolution.java
@@ -50,7 +50,7 @@ public SchemaEvolution(TypeDescription fileSchema,
     this.hasConversion = false;
     this.fileSchema = fileSchema;
     if (readerSchema != null) {
-      if (checkAcidSchema(fileSchema)) {
+      if (checkAcidSchema(fileSchema) && !checkAcidSchema(readerSchema)) {
         this.readerSchema = createEventSchema(readerSchema);
       } else {
         this.readerSchema = readerSchema;
diff --git ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcFile.java ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcFile.java
index 5366020..a5e0482 100644
--- ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcFile.java
+++ ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcFile.java
@@ -124,8 +124,10 @@ public WriterOptions inspector(ObjectInspector value) {
      * @return this
      */
     public WriterOptions setSchema(TypeDescription schema) {
-      this.explicitSchema = true;
-      super.setSchema(schema);
+      if (schema != null) {
+        this.explicitSchema = true;
+        super.setSchema(schema);
+      }
       return this;
     }
 
diff --git ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcRawRecordMerger.java ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcRawRecordMerger.java
index efde2db..4a1c3f3 100644
--- ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcRawRecordMerger.java
+++ ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcRawRecordMerger.java
@@ -28,6 +28,7 @@
 import org.apache.orc.TypeDescription;
 import org.apache.orc.impl.AcidStats;
 import org.apache.orc.impl.OrcAcidUtils;
+import org.apache.orc.impl.SchemaEvolution;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.apache.hadoop.conf.Configuration;
@@ -388,9 +389,10 @@ private void discoverKeyBounds(Reader reader,
    * Convert from the row include/sarg/columnNames to the event equivalent
    * for the underlying file.
    * @param options options for the row reader
+   * @param rowSchema schema of the row, excluding ACID columns
    * @return a cloned options object that is modified for the event reader
    */
-  static Reader.Options createEventOptions(Reader.Options options) {
+  static Reader.Options createEventOptions(Reader.Options options, TypeDescription rowSchema) {
     Reader.Options result = options.clone();
     result.range(options.getOffset(), Long.MAX_VALUE);
     // slide the columns down by 6 for the include array
@@ -415,6 +417,9 @@ private void discoverKeyBounds(Reader reader,
       }
       result.searchArgument(options.getSearchArgument(), cols);
     }
+
+    result.schema(SchemaEvolution.createEventSchema(rowSchema));
+
     return result;
   }
 
@@ -445,11 +450,11 @@ private void discoverKeyBounds(Reader reader,
       TypeDescription typeDescr =
           OrcInputFormat.getDesiredRowTypeDescr(conf, true, Integer.MAX_VALUE);
 
-      objectInspector = OrcRecordUpdater.createEventSchema
+      objectInspector = OrcRecordUpdater.createEventObjectInspector
           (OrcStruct.createObjectInspector(0, OrcUtils.getOrcTypes(typeDescr)));
 
       // modify the options to reflect the event instead of the base row
-      Reader.Options eventOptions = createEventOptions(options);
+      Reader.Options eventOptions = createEventOptions(options, typeDescr);
       if (reader == null) {
         baseReader = null;
       } else {
diff --git ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcRecordUpdater.java ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcRecordUpdater.java
index 492c64c..916a9f0 100644
--- ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcRecordUpdater.java
+++ ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcRecordUpdater.java
@@ -23,28 +23,35 @@
 import java.nio.charset.Charset;
 import java.nio.charset.CharsetDecoder;
 import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.List;
+import java.util.Properties;
 
+import com.google.common.base.Strings;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FSDataOutputStream;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hive.ql.io.AcidOutputFormat;
 import org.apache.hadoop.hive.ql.io.AcidUtils;
+import org.apache.hadoop.hive.ql.io.IOConstants;
 import org.apache.hadoop.hive.ql.io.RecordIdentifier;
 import org.apache.hadoop.hive.ql.io.RecordUpdater;
 import org.apache.hadoop.hive.serde2.SerDeStats;
 import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
 import org.apache.hadoop.hive.serde2.objectinspector.StructField;
 import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;
-import org.apache.hadoop.hive.serde2.objectinspector.primitive.IntObjectInspector;
 import org.apache.hadoop.hive.serde2.objectinspector.primitive.LongObjectInspector;
 import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory;
+import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo;
+import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoUtils;
 import org.apache.hadoop.io.IntWritable;
 import org.apache.hadoop.io.LongWritable;
 import org.apache.orc.OrcConf;
+import org.apache.orc.TypeDescription;
 import org.apache.orc.impl.AcidStats;
 import org.apache.orc.impl.OrcAcidUtils;
+import org.apache.orc.impl.SchemaEvolution;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -166,7 +173,7 @@ public OrcOptions orcOptions(OrcFile.WriterOptions opts) {
    * @param rowInspector the row's object inspector
    * @return an object inspector for the event stream
    */
-  static StructObjectInspector createEventSchema(ObjectInspector rowInspector) {
+  static StructObjectInspector createEventObjectInspector(ObjectInspector rowInspector) {
     List<StructField> fields = new ArrayList<StructField>();
     fields.add(new OrcStruct.Field("operation",
         PrimitiveObjectInspectorFactory.writableIntObjectInspector, OPERATION));
@@ -184,6 +191,38 @@ static StructObjectInspector createEventSchema(ObjectInspector rowInspector) {
     return new OrcStruct.OrcStructInspector(fields);
   }
 
+  private static TypeDescription createEventSchemaFromTableProperties(Properties tableProps) {
+    TypeDescription rowSchema = getTypeDescriptionFromTableProperties(tableProps);
+    if (rowSchema == null) {
+      return null;
+    }
+
+    return SchemaEvolution.createEventSchema(rowSchema);
+  }
+
+  private static TypeDescription getTypeDescriptionFromTableProperties(Properties tableProperties) {
+    TypeDescription schema = null;
+    if (tableProperties != null) {
+      final String columnNameProperty = tableProperties.getProperty(IOConstants.COLUMNS);
+      final String columnTypeProperty = tableProperties.getProperty(IOConstants.COLUMNS_TYPES);
+      if (!Strings.isNullOrEmpty(columnNameProperty) && !Strings.isNullOrEmpty(columnTypeProperty)) {
+        List<String> columnNames = columnNameProperty.length() == 0 ? new ArrayList<String>() : Arrays.asList(columnNameProperty.split(","));
+        List<TypeInfo> columnTypes = columnTypeProperty.length() == 0 ? new ArrayList<TypeInfo>() : TypeInfoUtils.getTypeInfosFromTypeString(columnTypeProperty);
+
+        schema = TypeDescription.createStruct();
+        for (int i = 0; i < columnNames.size(); i++) {
+          schema.addField(columnNames.get(i), OrcInputFormat.convertTypeInfo(columnTypes.get(i)));
+        }
+      }
+    }
+
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("ORC schema = " + schema);
+    }
+
+    return schema;
+  }
+
   OrcRecordUpdater(Path path,
                    AcidOutputFormat.Options options) throws IOException {
     this.options = options;
@@ -260,8 +299,9 @@ static StructObjectInspector createEventSchema(ObjectInspector rowInspector) {
     }
     writerOptions.fileSystem(fs).callback(indexBuilder);
     rowInspector = (StructObjectInspector)options.getInspector();
-    writerOptions.inspector(createEventSchema(findRecId(options.getInspector(),
+    writerOptions.inspector(createEventObjectInspector(findRecId(options.getInspector(),
         options.getRecordIdColumn())));
+    writerOptions.setSchema(createEventSchemaFromTableProperties(options.getTableProperties()));
     this.writer = OrcFile.createWriter(this.path, writerOptions);
     if (this.acidOperationalProperties.isSplitUpdate()) {
       // If this is a split-update, we initialize a delete delta file path in anticipation that
diff --git ql/src/java/org/apache/hadoop/hive/ql/io/orc/VectorizedOrcAcidRowBatchReader.java ql/src/java/org/apache/hadoop/hive/ql/io/orc/VectorizedOrcAcidRowBatchReader.java
index 75c7680..69742a2 100644
--- ql/src/java/org/apache/hadoop/hive/ql/io/orc/VectorizedOrcAcidRowBatchReader.java
+++ ql/src/java/org/apache/hadoop/hive/ql/io/orc/VectorizedOrcAcidRowBatchReader.java
@@ -21,7 +21,6 @@
 import java.io.IOException;
 import java.util.Arrays;
 import java.util.BitSet;
-import java.util.List;
 import java.util.Map.Entry;
 import java.util.TreeMap;
 
@@ -42,9 +41,6 @@
 import org.apache.hadoop.mapred.InputSplit;
 import org.apache.hadoop.mapred.JobConf;
 import org.apache.hadoop.mapred.Reporter;
-import org.apache.orc.OrcProto;
-import org.apache.orc.OrcUtils;
-import org.apache.orc.TypeDescription;
 import org.apache.orc.impl.AcidStats;
 import org.apache.orc.impl.OrcAcidUtils;
 import org.slf4j.Logger;
@@ -96,7 +92,7 @@ public VectorizedOrcAcidRowBatchReader(InputSplit inputSplit, JobConf conf,
     reporter.setStatus(orcSplit.toString());
     Reader reader = OrcInputFormat.createOrcReaderForSplit(conf, orcSplit);
     Reader.Options readerOptions = OrcInputFormat.createOptionsForReader(conf);
-    readerOptions = OrcRawRecordMerger.createEventOptions(readerOptions);
+    readerOptions = OrcRawRecordMerger.createEventOptions(readerOptions, readerOptions.getSchema());
     this.offset = orcSplit.getStart();
     this.length = orcSplit.getLength();
diff --git ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands2.java ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands2.java
index 49ba667..8cd9460 100644
--- ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands2.java
+++ ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands2.java
@@ -49,6 +49,7 @@
 import org.apache.hadoop.hive.metastore.txn.TxnDbUtil;
 import org.apache.hadoop.hive.metastore.txn.TxnStore;
 import org.apache.hadoop.hive.metastore.txn.TxnUtils;
+import org.apache.hadoop.hive.ql.io.AcidUtils;
 import org.apache.hadoop.hive.ql.io.HiveInputFormat;
 import org.apache.hadoop.hive.ql.processors.CommandProcessorResponse;
 import org.apache.hadoop.hive.ql.session.SessionState;
@@ -57,6 +58,9 @@
 import org.apache.hadoop.hive.ql.txn.compactor.Cleaner;
 import org.apache.hadoop.hive.ql.txn.compactor.Initiator;
 import org.apache.hadoop.hive.ql.txn.compactor.Worker;
+import org.apache.orc.OrcFile;
+import org.apache.orc.Reader;
+import org.apache.orc.TypeDescription;
 import org.junit.After;
 import org.junit.Assert;
 import org.junit.Before;
@@ -1748,6 +1752,28 @@ public void testValuesSource() throws Exception {
     Assert.assertEquals(stringifyValues(rExpected), r);
   }
 
+  @Test
+  public void testAcidOrcWritePreservesFieldNames() throws Exception {
+    String tableName = "acidorcwritefieldnames";
+    runStatementOnDriver("DROP TABLE IF EXISTS " + tableName);
+    runStatementOnDriver("CREATE TABLE " + tableName + " (a INT, b STRING) CLUSTERED BY (a) INTO " + BUCKET_COUNT + " BUCKETS STORED AS ORC TBLPROPERTIES ('transactional'='true')");
+    runStatementOnDriver("INSERT INTO " + tableName + " VALUES (1, 'foo'), (2, 'bar')");
+
+    FileSystem fs = FileSystem.get(hiveConf);
+    FileStatus[] fileStatuses = fs.globStatus(new Path(TEST_WAREHOUSE_DIR + "/" + tableName + "/" + AcidUtils.DELTA_PREFIX + "*/" + AcidUtils.BUCKET_PREFIX + "*"));
+    Assert.assertEquals(BUCKET_COUNT, fileStatuses.length);
+
+    OrcFile.ReaderOptions readerOptions = OrcFile.readerOptions(hiveConf);
+    for (FileStatus fileStatus : fileStatuses) {
+      Reader r = OrcFile.createReader(fileStatus.getPath(), readerOptions);
+      TypeDescription rowSchema = r.getSchema().getChildren().get(5);
+      List<String> fieldNames = rowSchema.getFieldNames();
+      Assert.assertEquals(2, fieldNames.size());
+      Assert.assertEquals("a", fieldNames.get(0));
+      Assert.assertEquals("b", fieldNames.get(1));
+    }
+  }
+
   /**
    * takes raw data and turns it into a string as if from Driver.getResults()
    * sorts rows in dictionary order