diff --git ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcOutputFormat.java ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcOutputFormat.java
index f1f638d980..d61e111ee2 100644
--- ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcOutputFormat.java
+++ ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcOutputFormat.java
@@ -300,6 +300,7 @@ public RecordUpdater getRecordUpdater(Path path,
     opts.inspector(options.getInspector())
       .callback(watcher);
     final Writer writer = OrcFile.createWriter(filename, opts);
+    OrcRecordUpdater.setAcidVersion(writer);
     return new org.apache.hadoop.hive.ql.exec.FileSinkOperator.RecordWriter() {
       @Override
       public void write(Writable w) throws IOException {
diff --git ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcRecordUpdater.java ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcRecordUpdater.java
index b90ce6ed2f..2702318d61 100644
--- ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcRecordUpdater.java
+++ ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcRecordUpdater.java
@@ -27,6 +27,7 @@
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hive.conf.HiveConf;
@@ -35,7 +36,6 @@
 import org.apache.hadoop.hive.ql.io.BucketCodec;
 import org.apache.hadoop.hive.ql.io.RecordIdentifier;
 import org.apache.hadoop.hive.ql.io.RecordUpdater;
-import org.apache.hadoop.hive.ql.metadata.Hive;
 import org.apache.hadoop.hive.serde2.SerDeStats;
 import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
 import org.apache.hadoop.hive.serde2.objectinspector.StructField;
@@ -63,6 +63,15 @@
   private static final Logger LOG = LoggerFactory.getLogger(OrcRecordUpdater.class);

   static final String ACID_KEY_INDEX_NAME = "hive.acid.key.index";
+  private static final String ACID_VERSION_KEY = "hive.acid.version";
+  /**
+   * 2.0 is the version of Acid released in Hive 3.0.
+   */
+  public static final String ACID_VERSION_CURRENT = "2.0";
+  /**
+   * Anything before 2.0.
+   */
+  public static final String ACID_VERSION_DEFAULT = "0.0";
   private static final String ACID_FORMAT = "_orc_acid_version";
   private static final int ORC_ACID_VERSION = 0;
@@ -86,6 +95,7 @@
   final static long DELTA_STRIPE_SIZE = 16 * 1024 * 1024;

   private static final Charset UTF8 = Charset.forName("UTF-8");
+  private static final CharsetDecoder utf8Decoder = UTF8.newDecoder();

   private final AcidOutputFormat.Options options;
   private final AcidUtils.AcidOperationalProperties acidOperationalProperties;
@@ -382,9 +392,7 @@ private void addSimpleEvent(int operation, long currentTransaction, long rowId,
     item.setFieldValue(OrcRecordUpdater.OPERATION, new IntWritable(operation));
     item.setFieldValue(OrcRecordUpdater.ROW, (operation == DELETE_OPERATION ?
         null : row));
     indexBuilder.addKey(operation, originalTransaction, bucket.get(), rowId);
-    if (writer == null) {
-      writer = OrcFile.createWriter(path, writerOptions);
-    }
+    initWriter();
     writer.addRow(item);
     restoreBucket(currentBucket, operation);
   }
@@ -484,9 +492,7 @@ public void flush() throws IOException {
       throw new IllegalStateException("Attempting to flush a RecordUpdater on "
           + path + " with a single transaction.");
     }
-    if (writer == null) {
-      writer = OrcFile.createWriter(path, writerOptions);
-    }
+    initWriter();
     long len = writer.writeIntermediateFooter();
     flushLengths.writeLong(len);
     OrcInputFormat.SHIMS.hflush(flushLengths);
@@ -509,10 +515,8 @@ public void close(boolean abort) throws IOException {
         writer.close(); // normal close, when there are inserts.
       }
     } else {
-      if (writer == null) {
-        //so that we create empty bucket files when needed (but see HIVE-17138)
-        writer = OrcFile.createWriter(path, writerOptions);
-      }
+      //so that we create empty bucket files when needed (but see HIVE-17138)
+      initWriter();
       writer.close(); // normal close.
     }
     if (deleteEventWriter != null) {
@@ -533,6 +537,42 @@ public void close(boolean abort) throws IOException {
     deleteEventWriter = null;
     writerClosed = true;
   }
+  private void initWriter() throws IOException {
+    if (writer == null) {
+      writer = OrcFile.createWriter(path, writerOptions);
+      setAcidVersion(writer);
+    }
+  }
+
+  /**
+   * All data files produced by Acid writes should have this (starting with Hive 3.0), including
+   * those written by the compactor.
+   * @param writer - file written
+   */
+  static void setAcidVersion(Writer writer) {
+    //so that we know which version wrote the file
+    writer.addUserMetadata(ACID_VERSION_KEY, UTF8.encode(ACID_VERSION_CURRENT));
+  }
+
+  /**
+   * This is smart enough to handle streaming ingest where there could be a
+   * {@link OrcAcidUtils#DELTA_SIDE_FILE_SUFFIX} side file.
+   * @param dataFile - ORC acid data file
+   * @return version property from the file if present,
+   *         {@link OrcRecordUpdater#ACID_VERSION_DEFAULT} otherwise
+   */
+  public static String getAcidVersion(Path dataFile, FileSystem fs) throws IOException {
+    FileStatus fileStatus = fs.getFileStatus(dataFile);
+    Reader orcReader = OrcFile.createReader(dataFile,
+        OrcFile.readerOptions(fs.getConf())
+            .filesystem(fs)
+            //make sure to check for side file in case streaming ingest died
+            .maxLength(AcidUtils.getLogicalLength(fs, fileStatus)));
+    if (orcReader.hasMetadataValue(ACID_VERSION_KEY)) {
+      return utf8Decoder.decode(orcReader.getMetadataValue(ACID_VERSION_KEY)).toString();
+    }
+    return ACID_VERSION_DEFAULT;
+  }

   @Override
   public SerDeStats getStats() {
     return stats;
   }

-  private static final Charset utf8 = Charset.forName("UTF-8");
-  private static final CharsetDecoder utf8Decoder = utf8.newDecoder();
-
   static RecordIdentifier[] parseKeyIndex(Reader reader) {
     String[] stripes;
     try {
diff --git ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands.java ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands.java
index 2a1545f1da..8770d84b6d 100644
--- ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands.java
+++ ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands.java
@@ -17,16 +17,12 @@
  */
 package org.apache.hadoop.hive.ql;

+import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.metastore.MetastoreTaskThread;
-import org.apache.hadoop.hive.metastore.api.GetOpenTxnsInfoResponse;
-import org.apache.hadoop.hive.metastore.api.LockState;
-import org.apache.hadoop.hive.metastore.api.LockType;
-import org.apache.hadoop.hive.metastore.api.ShowLocksRequest;
-import org.apache.hadoop.hive.metastore.api.ShowLocksResponse;
-import org.apache.hadoop.hive.metastore.api.TxnInfo;
-import org.apache.hadoop.hive.metastore.api.TxnState;
+import org.apache.hadoop.hive.metastore.api.*;
+import org.apache.hadoop.hive.metastore.conf.MetastoreConf;
 import org.apache.hadoop.hive.metastore.txn.AcidHouseKeeperService;
 import org.apache.hadoop.hive.metastore.txn.TxnDbUtil;
 import org.apache.hadoop.hive.metastore.txn.TxnStore;
@@ -34,6 +30,7 @@
 import org.apache.hadoop.hive.ql.io.AcidOutputFormat;
 import org.apache.hadoop.hive.ql.io.AcidUtils;
 import org.apache.hadoop.hive.ql.io.BucketCodec;
+import org.apache.hadoop.hive.ql.io.orc.OrcRecordUpdater;
 import org.apache.hadoop.hive.ql.lockmgr.TestDbTxnManager2;
 import org.apache.hadoop.hive.ql.metadata.HiveException;
 import org.apache.hadoop.hive.ql.processors.CommandProcessorResponse;
@@ -834,4 +831,40 @@ public void testMoreBucketsThanReducers2() throws Exception {
     int[][] expected = {{0, -1},{0, -1}, {1, -1}, {1, -1}, {2, -1}, {2, -1}, {3, -1}, {3, -1}};
     Assert.assertEquals(stringifyValues(expected), r);
   }
+  @Test
+  public void testVersioning() throws Exception {
+    hiveConf.set(MetastoreConf.ConfVars.CREATE_TABLES_AS_ACID.getVarname(), "true");
+    runStatementOnDriver("drop table if exists T");
+    runStatementOnDriver("create table T (a int, b int) stored as orc");
+    int[][] data = {{1,2}};
+    //create 1 delta file bucket_00000
+    runStatementOnDriver("insert into T" + makeValuesClause(data));
+
+    //locate the bucket file inside the delta dir written by the insert
+    List<String> rs = runStatementOnDriver("select distinct INPUT__FILE__NAME from T");
+    FileSystem fs = FileSystem.get(hiveConf);
+    Assert.assertTrue(rs != null && rs.size() == 1 &&
+        rs.get(0).contains(AcidUtils.DELTA_PREFIX));
+    String version = OrcRecordUpdater.getAcidVersion(new Path(rs.get(0)), fs);
+    //check it has expected version marker
+    Assert.assertEquals("Unexpected version marker in " + rs.get(0),
+        OrcRecordUpdater.ACID_VERSION_CURRENT, version);
+
+    runStatementOnDriver("insert into T" + makeValuesClause(data));
+    runStatementOnDriver("alter table T compact 'major'");
+    TestTxnCommands2.runWorker(hiveConf);
+
+    //check status of compaction job
+    TxnStore txnHandler = TxnUtils.getTxnStore(hiveConf);
+    ShowCompactResponse resp = txnHandler.showCompact(new ShowCompactRequest());
+    Assert.assertEquals("Unexpected number of compactions in history", 1, resp.getCompactsSize());
+    Assert.assertEquals("Unexpected 0 compaction state", TxnStore.CLEANING_RESPONSE, resp.getCompacts().get(0).getState());
+    Assert.assertTrue(resp.getCompacts().get(0).getHadoopJobId().startsWith("job_local"));
+
+    rs = runStatementOnDriver("select distinct INPUT__FILE__NAME from T");
+    Assert.assertTrue(rs != null && rs.size() == 1 && rs.get(0).contains(AcidUtils.BASE_PREFIX));
+    version = OrcRecordUpdater.getAcidVersion(new Path(rs.get(0)), fs);
+    //check that files produced by compaction still have the version marker
+    Assert.assertEquals("Unexpected version marker in " + rs.get(0),
+        OrcRecordUpdater.ACID_VERSION_CURRENT, version);
+  }
 }
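
Reviewer note (not part of the patch): the standalone sketch below illustrates how the hive.acid.version marker that setAcidVersion() stamps into each ACID data file could be read back by external code, using only the reader calls this patch already relies on (OrcFile.createReader, hasMetadataValue, getMetadataValue). The class name AcidVersionProbe and its readVersion() helper are hypothetical, and the sketch deliberately omits the AcidUtils.getLogicalLength()/maxLength() handling that getAcidVersion() uses to cope with a streaming-ingest side file.

// Hypothetical sketch; not part of the patch.
import java.io.IOException;
import java.nio.charset.StandardCharsets;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.ql.io.orc.OrcFile;
import org.apache.hadoop.hive.ql.io.orc.Reader;

public class AcidVersionProbe {
  private static final String ACID_VERSION_KEY = "hive.acid.version";

  /**
   * Reads the version marker written by OrcRecordUpdater.setAcidVersion().
   * Returns null for files written before the marker existed, which
   * OrcRecordUpdater.getAcidVersion() reports as ACID_VERSION_DEFAULT ("0.0").
   */
  public static String readVersion(Path orcFile, Configuration conf) throws IOException {
    FileSystem fs = orcFile.getFileSystem(conf);
    Reader reader = OrcFile.createReader(orcFile,
        OrcFile.readerOptions(conf).filesystem(fs));
    if (reader.hasMetadataValue(ACID_VERSION_KEY)) {
      // getMetadataValue returns the ByteBuffer stored by Writer.addUserMetadata
      return StandardCharsets.UTF_8.decode(reader.getMetadataValue(ACID_VERSION_KEY)).toString();
    }
    return null;
  }
}

In-tree callers would be better served by OrcRecordUpdater.getAcidVersion(Path, FileSystem) itself, since it additionally caps the read at AcidUtils.getLogicalLength() so a side file left behind by a dead streaming-ingest writer does not confuse the reader.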