diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcFile.java b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcFile.java
index 68e88cf..56d590c 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcFile.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcFile.java
@@ -125,8 +125,10 @@ public WriterOptions inspector(ObjectInspector value) {
      * @return this
      */
     public WriterOptions setSchema(TypeDescription schema) {
-      this.explicitSchema = true;
-      super.setSchema(schema);
+      if (schema != null) {
+        this.explicitSchema = true;
+        super.setSchema(schema);
+      }
       return this;
     }
 
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcRawRecordMerger.java b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcRawRecordMerger.java
index 9d954ca..aa68024 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcRawRecordMerger.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcRawRecordMerger.java
@@ -32,6 +32,7 @@
 import org.apache.orc.OrcUtils;
 import org.apache.orc.StripeInformation;
 import org.apache.orc.TypeDescription;
+import org.apache.orc.impl.SchemaEvolution;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.apache.hadoop.conf.Configuration;
@@ -771,9 +772,10 @@ private KeyInterval discoverKeyBounds(Reader reader,
    * Convert from the row include/sarg/columnNames to the event equivalent
    * for the underlying file.
    * @param options options for the row reader
+   * @param rowSchema schema of the row, excluding ACID columns
    * @return a cloned options object that is modified for the event reader
    */
-  static Reader.Options createEventOptions(Reader.Options options) {
+  static Reader.Options createEventOptions(Reader.Options options, TypeDescription rowSchema) {
     Reader.Options result = options.clone();
     result.include(options.getInclude());
 
@@ -786,6 +788,10 @@ private KeyInterval discoverKeyBounds(Reader reader,
       }
       result.searchArgument(options.getSearchArgument(), cols);
     }
+
+    // schema evolution will insert the acid columns to row schema for ACID read
+    result.schema(rowSchema);
+
     return result;
   }
 
@@ -968,12 +974,12 @@ public Options clone() {
     TypeDescription typeDescr =
         OrcInputFormat.getDesiredRowTypeDescr(conf, true, Integer.MAX_VALUE);
 
-    objectInspector = OrcRecordUpdater.createEventSchema
+    objectInspector = OrcRecordUpdater.createEventObjectInspector
         (OrcStruct.createObjectInspector(0, OrcUtils.getOrcTypes(typeDescr)));
     assert !(mergerOptions.isCompacting() && reader != null) : "don't need a reader for compaction";
 
     // modify the options to reflect the event instead of the base row
-    Reader.Options eventOptions = createEventOptions(options);
+    Reader.Options eventOptions = createEventOptions(options, typeDescr);
     //suppose it's the first Major compaction so we only have deltas
     boolean isMajorNoBase = mergerOptions.isCompacting() && mergerOptions.isMajorCompaction()
         && mergerOptions.getBaseDir() == null;
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcRecordUpdater.java b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcRecordUpdater.java
index 1359dc3..5590470 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcRecordUpdater.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcRecordUpdater.java
@@ -23,7 +23,9 @@
 import java.nio.charset.Charset;
 import java.nio.charset.CharsetDecoder;
 import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.List;
+import java.util.Properties;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FSDataOutputStream;
@@ -33,6 +35,7 @@
 import org.apache.hadoop.hive.ql.io.AcidOutputFormat;
 import org.apache.hadoop.hive.ql.io.AcidUtils;
 import org.apache.hadoop.hive.ql.io.BucketCodec;
+import org.apache.hadoop.hive.ql.io.IOConstants;
 import org.apache.hadoop.hive.ql.io.RecordIdentifier;
 import org.apache.hadoop.hive.ql.io.RecordUpdater;
 import org.apache.hadoop.hive.serde2.SerDeStats;
@@ -42,15 +45,21 @@
 import org.apache.hadoop.hive.serde2.objectinspector.primitive.IntObjectInspector;
 import org.apache.hadoop.hive.serde2.objectinspector.primitive.LongObjectInspector;
 import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory;
+import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo;
+import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoUtils;
 import org.apache.hadoop.io.IntWritable;
 import org.apache.hadoop.io.LongWritable;
 import org.apache.orc.OrcConf;
+import org.apache.orc.TypeDescription;
 import org.apache.orc.impl.AcidStats;
 import org.apache.orc.impl.OrcAcidUtils;
+import org.apache.orc.impl.SchemaEvolution;
 import org.apache.orc.impl.WriterImpl;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import com.google.common.base.Strings;
+
 /**
  * A RecordUpdater where the files are stored as ORC.
  * A note on various record structures: the {@code row} coming in (as in {@link #insert(long, Object)}
@@ -179,7 +188,7 @@ OrcOptions orcOptions(OrcFile.WriterOptions opts) {
    * @param rowInspector the row's object inspector
    * @return an object inspector for the event stream
    */
-  static StructObjectInspector createEventSchema(ObjectInspector rowInspector) {
+  static StructObjectInspector createEventObjectInspector(ObjectInspector rowInspector) {
     List<OrcStruct.Field> fields = new ArrayList<OrcStruct.Field>();
     fields.add(new OrcStruct.Field("operation",
         PrimitiveObjectInspectorFactory.writableIntObjectInspector, OPERATION));
@@ -194,6 +203,41 @@ static StructObjectInspector createEventSchema(ObjectInspector rowInspector) {
     fields.add(new OrcStruct.Field("row", rowInspector, ROW));
     return new OrcStruct.OrcStructInspector(fields);
   }
+
+  private static TypeDescription createEventSchemaFromTableProperties(Properties tableProps) {
+    TypeDescription rowSchema = getTypeDescriptionFromTableProperties(tableProps);
+    if (rowSchema == null) {
+      return null;
+    }
+
+    return SchemaEvolution.createEventSchema(rowSchema);
+  }
+
+  private static TypeDescription getTypeDescriptionFromTableProperties(Properties tableProperties) {
+    TypeDescription schema = null;
+    if (tableProperties != null) {
+      final String columnNameProperty = tableProperties.getProperty(IOConstants.COLUMNS);
+      final String columnTypeProperty = tableProperties.getProperty(IOConstants.COLUMNS_TYPES);
+      if (!Strings.isNullOrEmpty(columnNameProperty) && !Strings.isNullOrEmpty(columnTypeProperty)) {
+        List<String> columnNames =
+            columnNameProperty.length() == 0 ? new ArrayList<String>() : Arrays.asList(columnNameProperty.split(","));
+        List<TypeInfo> columnTypes = columnTypeProperty.length() == 0 ? new ArrayList<TypeInfo>() : TypeInfoUtils
+            .getTypeInfosFromTypeString(columnTypeProperty);
+
+        schema = TypeDescription.createStruct();
+        for (int i = 0; i < columnNames.size(); i++) {
+          schema.addField(columnNames.get(i), OrcInputFormat.convertTypeInfo(columnTypes.get(i)));
+        }
+      }
+    }
+
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("ORC schema = " + schema);
+    }
+
+    return schema;
+  }
+
   /**
    * @param partitionRoot - partition root (or table root if not partitioned)
    */
@@ -282,8 +326,9 @@ static StructObjectInspector createEventSchema(ObjectInspector rowInspector) {
      */
     this.deleteWriterOptions = OrcFile.writerOptions(optionsCloneForDelta.getTableProperties(),
         optionsCloneForDelta.getConfiguration());
-    this.deleteWriterOptions.inspector(createEventSchema(findRecId(options.getInspector(),
+    this.deleteWriterOptions.inspector(createEventObjectInspector(findRecId(options.getInspector(),
         options.getRecordIdColumn())));
+    this.deleteWriterOptions.setSchema(createEventSchemaFromTableProperties(options.getTableProperties()));
   }
 
   // get buffer size and stripe size for base writer
@@ -305,8 +350,9 @@ static StructObjectInspector createEventSchema(ObjectInspector rowInspector) {
     }
     writerOptions.fileSystem(fs).callback(indexBuilder);
     rowInspector = (StructObjectInspector)options.getInspector();
-    writerOptions.inspector(createEventSchema(findRecId(options.getInspector(),
+    writerOptions.inspector(createEventObjectInspector(findRecId(options.getInspector(),
         options.getRecordIdColumn())));
+    writerOptions.setSchema(createEventSchemaFromTableProperties(options.getTableProperties()));
     item = new OrcStruct(FIELDS);
     item.setFieldValue(OPERATION, operation);
     item.setFieldValue(CURRENT_WRITEID, currentWriteId);
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/VectorizedOrcAcidRowBatchReader.java b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/VectorizedOrcAcidRowBatchReader.java
index 66ffcae..551a4f6 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/VectorizedOrcAcidRowBatchReader.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/VectorizedOrcAcidRowBatchReader.java
@@ -176,7 +176,7 @@ private VectorizedOrcAcidRowBatchReader(JobConf conf, OrcSplit orcSplit, Reporter reporter,
     }
     reporter.setStatus(orcSplit.toString());
 
-    readerOptions = OrcRawRecordMerger.createEventOptions(OrcInputFormat.createOptionsForReader(conf));
+    readerOptions = OrcInputFormat.createOptionsForReader(conf);
 
     this.offset = orcSplit.getStart();
     this.length = orcSplit.getLength();
diff --git a/ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands2.java b/ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands2.java
index 7c201b6..3e98a09 100644
--- a/ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands2.java
+++ b/ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands2.java
@@ -50,6 +50,7 @@
 import org.apache.hadoop.hive.metastore.txn.TxnStore;
 import org.apache.hadoop.hive.metastore.txn.TxnUtils;
 import org.apache.hadoop.hive.ql.io.AcidOutputFormat;
+import org.apache.hadoop.hive.ql.io.AcidUtils;
 import org.apache.hadoop.hive.ql.io.BucketCodec;
 import org.apache.hadoop.hive.ql.io.HiveInputFormat;
 import org.apache.hadoop.hive.ql.lockmgr.HiveTxnManager;
@@ -60,6 +61,9 @@
 import org.apache.hadoop.hive.ql.txn.compactor.Cleaner;
 import org.apache.hadoop.hive.ql.txn.compactor.Initiator;
 import org.apache.hadoop.hive.ql.txn.compactor.Worker;
+import org.apache.orc.OrcFile;
+import org.apache.orc.Reader;
+import org.apache.orc.TypeDescription;
 import org.junit.After;
 import org.junit.Assert;
 import org.junit.Before;
@@ -2168,6 +2172,62 @@ private void verifyDirAndResult(int expectedDeltas) throws Exception {
     Assert.assertEquals(stringifyValues(resultData), rs);
   }
 
+  @Test
+  public void testAcidOrcWritePreservesFieldNames() throws Exception {
+    // with vectorization
+    String tableName = "acidorcwritefieldnames";
+    hiveConf.setBoolVar(HiveConf.ConfVars.HIVE_VECTORIZATION_ENABLED, true);
+    runStatementOnDriver("DROP TABLE IF EXISTS " + tableName);
+    runStatementOnDriver("CREATE TABLE " + tableName + " (a INT, b STRING) CLUSTERED BY (a) INTO " +
+        BUCKET_COUNT + " BUCKETS STORED AS ORC TBLPROPERTIES ('transactional'='true')");
+    runStatementOnDriver("INSERT INTO " + tableName + " VALUES (1, 'foo'), (2, 'bar')");
+
+    tableName = "acidorcwritefieldnames_complex";
+    runStatementOnDriver("DROP TABLE IF EXISTS " + tableName);
+    runStatementOnDriver("CREATE TABLE " + tableName + " (a INT, b STRING, s STRUCT<c:INT, si:STRUCT<d:DOUBLE, e:FLOAT>>) CLUSTERED BY (a) INTO " + BUCKET_COUNT +
+        " BUCKETS STORED AS ORC TBLPROPERTIES ('transactional'='true')");
+    runStatementOnDriver("INSERT INTO " + tableName + " select a, b, named_struct('c',10,'si'," +
+        "named_struct('d',cast(1.0 as double),'e',cast(2.0 as float))) from acidorcwritefieldnames");
+
+    FileSystem fs = FileSystem.get(hiveConf);
+    FileStatus[] fileStatuses = fs.globStatus(new Path(TEST_WAREHOUSE_DIR + "/" + tableName + "/" + AcidUtils.DELTA_PREFIX + "*/" + AcidUtils.BUCKET_PREFIX + "*"));
+    Assert.assertEquals(BUCKET_COUNT, fileStatuses.length);
+
+    OrcFile.ReaderOptions readerOptions = OrcFile.readerOptions(hiveConf);
+    for (FileStatus fileStatus : fileStatuses) {
+      Reader r = OrcFile.createReader(fileStatus.getPath(), readerOptions);
+      TypeDescription rowSchema = r.getSchema().getChildren().get(5);
+      Assert.assertEquals("struct<a:int,b:string,s:struct<c:int,si:struct<d:double,e:float>>>", rowSchema.toString());
+    }
+
+    // without vectorization
+    tableName = "acidorcwritefieldnames";
+    hiveConf.setBoolVar(HiveConf.ConfVars.HIVE_VECTORIZATION_ENABLED, false);
+    runStatementOnDriver("DROP TABLE IF EXISTS " + tableName);
+    runStatementOnDriver("CREATE TABLE " + tableName + " (a INT, b STRING) CLUSTERED BY (a) INTO " +
+        BUCKET_COUNT + " BUCKETS STORED AS ORC TBLPROPERTIES ('transactional'='true')");
+    runStatementOnDriver("INSERT INTO " + tableName + " VALUES (1, 'foo'), (2, 'bar')");
+
+    tableName = "acidorcwritefieldnames_complex";
+    runStatementOnDriver("DROP TABLE IF EXISTS " + tableName);
+    runStatementOnDriver("CREATE TABLE " + tableName + " (a INT, b STRING, s STRUCT<c:INT, si:STRUCT<d:DOUBLE, e:FLOAT>>) CLUSTERED BY (a) INTO " + BUCKET_COUNT +
+        " BUCKETS STORED AS ORC TBLPROPERTIES ('transactional'='true')");
+    runStatementOnDriver("INSERT INTO " + tableName + " select a, b, named_struct('c',10,'si'," +
+        "named_struct('d',cast(1.0 as double),'e',cast(2.0 as float))) from acidorcwritefieldnames");
+
+    fs = FileSystem.get(hiveConf);
+    fileStatuses = fs.globStatus(new Path(TEST_WAREHOUSE_DIR + "/" + tableName + "/" + AcidUtils.DELTA_PREFIX + "*/" + AcidUtils.BUCKET_PREFIX + "*"));
+    Assert.assertEquals(BUCKET_COUNT, fileStatuses.length);
+
+    readerOptions = OrcFile.readerOptions(hiveConf);
+    for (FileStatus fileStatus : fileStatuses) {
+      Reader r = OrcFile.createReader(fileStatus.getPath(), readerOptions);
+      TypeDescription rowSchema = r.getSchema().getChildren().get(5);
+      Assert.assertEquals("struct<a:int,b:string,s:struct<c:int,si:struct<d:double,e:float>>>", rowSchema.toString());
+    }
+    hiveConf.setBoolVar(HiveConf.ConfVars.HIVE_VECTORIZATION_ENABLED, true);
+  }
+
   /**
    * takes raw data and turns it into a string as if from Driver.getResults()
    * sorts rows in dictionary order
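Note (not part of the patch): a minimal standalone sketch of the ORC call this change relies on, SchemaEvolution.createEventSchema, which wraps a row schema with the ACID event columns while preserving the row's field names under the "row" child; the class name and the column names used here are illustrative only, not taken from the patch.

import org.apache.orc.TypeDescription;
import org.apache.orc.impl.SchemaEvolution;

public class AcidEventSchemaSketch {
  public static void main(String[] args) {
    // Illustrative row schema, standing in for the one derived from table properties.
    TypeDescription rowSchema = TypeDescription.fromString("struct<a:int,b:string>");

    // Wrap the row type with the ACID event columns:
    // operation, originalTransaction, bucket, rowId, currentTransaction, row.
    TypeDescription eventSchema = SchemaEvolution.createEventSchema(rowSchema);

    // Child 5 is the "row" struct and keeps the original field names (a, b),
    // which is what testAcidOrcWritePreservesFieldNames asserts on the delta files.
    System.out.println(eventSchema);
  }
}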