diff --git hcatalog/streaming/src/test/org/apache/hive/hcatalog/streaming/TestStreaming.java hcatalog/streaming/src/test/org/apache/hive/hcatalog/streaming/TestStreaming.java index da2ca7276d..49cb302023 100644 --- hcatalog/streaming/src/test/org/apache/hive/hcatalog/streaming/TestStreaming.java +++ hcatalog/streaming/src/test/org/apache/hive/hcatalog/streaming/TestStreaming.java @@ -20,6 +20,7 @@ import java.io.ByteArrayOutputStream; import java.io.File; +import java.io.FileFilter; import java.io.FileNotFoundException; import java.io.IOException; import java.io.PrintStream; @@ -2056,7 +2057,13 @@ public void testErrorHandling() throws Exception { if(!deltaDir.getName().startsWith("delta")) { continue; } - File[] bucketFiles = deltaDir.listFiles(); + File[] bucketFiles = deltaDir.listFiles(new FileFilter() { + @Override + public boolean accept(File pathname) { + String name = pathname.getName(); + return !name.startsWith("_") && !name.startsWith("."); + } + }); for (File bucketFile : bucketFiles) { if(bucketFile.toString().endsWith("length")) { continue; diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/FileSinkOperator.java ql/src/java/org/apache/hadoop/hive/ql/exec/FileSinkOperator.java index 98bb938c13..adba60eaa3 100644 --- ql/src/java/org/apache/hadoop/hive/ql/exec/FileSinkOperator.java +++ ql/src/java/org/apache/hadoop/hive/ql/exec/FileSinkOperator.java @@ -264,15 +264,7 @@ private void commitOneOutPath(int idx, FileSystem fs, List commitPaths) } FileUtils.mkdir(fs, finalPaths[idx].getParent(), hconf); } - // If we're updating or deleting there may be no file to close. This can happen - // because the where clause strained out all of the records for a given bucket. So - // before attempting the rename below, check if our file exists. If it doesn't, - // then skip the rename. If it does try it. We could just blindly try the rename - // and avoid the extra stat, but that would mask other errors. 
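A note on the TestStreaming hunk above: with this patch every delta directory also contains the `_orc_acid_version` marker written via `AcidUtils.OrcAcidVersion` (added further down), so the test can no longer treat every child of a delta directory as a bucket file. Below is a minimal sketch of an equivalent filter, assuming a Java 8 lambda is acceptable in the test code; the wrapper class and method name are illustrative only, not part of the patch.

```java
import java.io.File;

class HiddenFileFilterSketch {
  // Lambda form of the anonymous FileFilter added to TestStreaming above: skip the
  // _orc_acid_version marker and any "."-prefixed side/CRC files so only real
  // bucket files are inspected.
  static File[] listBucketFiles(File deltaDir) {
    return deltaDir.listFiles(f -> {
      String name = f.getName();
      return !name.startsWith("_") && !name.startsWith(".");
    });
  }
}
```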
- Operation acidOp = conf.getWriteType(); - boolean needToRename = outPaths[idx] != null && ((acidOp != Operation.UPDATE - && acidOp != Operation.DELETE) || fs.exists(outPaths[idx])); - if (needToRename && outPaths[idx] != null) { + if(outPaths[idx] != null && fs.exists(outPaths[idx])) { if (Utilities.FILE_OP_LOGGER.isTraceEnabled()) { Utilities.FILE_OP_LOGGER.trace("committing " + outPaths[idx] + " to " + finalPaths[idx] + " (" + isMmTable + ")"); diff --git ql/src/java/org/apache/hadoop/hive/ql/io/AcidUtils.java ql/src/java/org/apache/hadoop/hive/ql/io/AcidUtils.java index 553e8bcf4e..d33d56699a 100644 --- ql/src/java/org/apache/hadoop/hive/ql/io/AcidUtils.java +++ ql/src/java/org/apache/hadoop/hive/ql/io/AcidUtils.java @@ -18,17 +18,7 @@ package org.apache.hadoop.hive.ql.io; -import java.io.IOException; -import java.io.Serializable; -import java.util.ArrayList; -import java.util.Collections; -import java.util.HashMap; -import java.util.List; -import java.util.Map; -import java.util.Properties; -import java.util.Set; -import java.util.regex.Pattern; - +import com.google.common.annotations.VisibleForTesting; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FSDataInputStream; import org.apache.hadoop.fs.FSDataOutputStream; @@ -39,14 +29,15 @@ import org.apache.hadoop.hive.common.ValidTxnList; import org.apache.hadoop.hive.conf.HiveConf; import org.apache.hadoop.hive.conf.HiveConf.ConfVars; +import org.apache.hadoop.hive.metastore.TransactionalValidationListener; import org.apache.hadoop.hive.metastore.api.DataOperationType; import org.apache.hadoop.hive.metastore.api.hive_metastoreConstants; -import org.apache.hadoop.hive.metastore.TransactionalValidationListener; import org.apache.hadoop.hive.ql.ErrorMsg; import org.apache.hadoop.hive.ql.io.orc.OrcFile; import org.apache.hadoop.hive.ql.io.orc.OrcInputFormat; import org.apache.hadoop.hive.ql.io.orc.OrcRecordUpdater; import org.apache.hadoop.hive.ql.io.orc.Reader; +import org.apache.hadoop.hive.ql.io.orc.Writer; import org.apache.hadoop.hive.ql.metadata.Table; import org.apache.hadoop.hive.ql.plan.CreateTableDesc; import org.apache.hadoop.hive.shims.HadoopShims; @@ -59,7 +50,17 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import com.google.common.annotations.VisibleForTesting; +import java.io.IOException; +import java.io.Serializable; +import java.nio.ByteBuffer; +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Properties; +import java.util.Set; +import java.util.regex.Pattern; import static org.apache.hadoop.hive.ql.exec.Utilities.COPY_KEYWORD; @@ -1605,4 +1606,88 @@ public static boolean isRawFormat(Path baseOrDeltaDir, FileSystem fs) throws IOE } } } + + /** + * Logic related to versioning acid data format. An {@code ACID_FORMAT} file is written to each + * base/delta/delete_delta dir written by a full acid write or compaction. This is the primary + * mechanism for versioning acid data. + * + * Each individual ORC file written stores the current version as a user property in ORC footer. + * All data files produced by Acid write should have this (starting with Hive 3.0), including + * those written by compactor. This is more for sanity checking in case someone moved the files + * around or something like that. 
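To make the two mechanisms described in this javadoc concrete, here is a hedged usage sketch built on the helpers that the `OrcAcidVersion` class defines immediately below. The warehouse path, delta directory, and bucket file name are hypothetical; only the `AcidUtils.OrcAcidVersion` calls and constants come from the patch.

```java
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.ql.io.AcidUtils;

class AcidVersionCheckSketch {
  public static void main(String[] args) throws Exception {
    FileSystem fs = FileSystem.get(new HiveConf());
    // Hypothetical delta directory; real callers would discover it by listing the
    // table/partition location.
    Path delta = new Path("/warehouse/t/delta_0000019_0000019_0001");
    // Directory-level marker: the _orc_acid_version file created by writeVersionFile().
    int dirVersion = AcidUtils.OrcAcidVersion.getAcidVersionFromMetaFile(delta, fs);
    // File-level marker: the "hive.acid.version" user property stamped into each ORC footer.
    int fileVersion = AcidUtils.OrcAcidVersion.getAcidVersionFromDataFile(
        new Path(delta, "bucket_00000"), fs);
    // Data written by this version of Hive reports ORC_ACID_VERSION (2) from both;
    // pre-existing data falls back to ORC_ACID_VERSION_DEFAULT (0).
    System.out.println("dir marker=" + dirVersion + ", footer property=" + fileVersion);
  }
}
```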
+ */ + public static final class OrcAcidVersion { + private static final String ACID_VERSION_KEY = "hive.acid.version"; + private static final String ACID_FORMAT = "_orc_acid_version"; + public static final int ORC_ACID_VERSION_DEFAULT = 0; + /** + * 2 is the version of Acid released in Hive 3.0 + */ + public static final int ORC_ACID_VERSION = 2; + /** + * Include current acid version in file footer + * @param writer - file writer + */ + public static void setAcidVersionInDataFile(Writer writer) { + //so that we know which version wrote the file + ByteBuffer bf = ByteBuffer.allocate(4).putInt(ORC_ACID_VERSION); + bf.rewind();//don't ask - some ByteBuffer weirdness. w/o this, empty buffer is written + writer.addUserMetadata(ACID_VERSION_KEY, bf); + } + /** + * This is smart enough to handle streaming ingest where there could be a + * {@link OrcAcidUtils#DELTA_SIDE_FILE_SUFFIX} side file. + * @param dataFile - ORC acid data file + * @return version property from file if present, + * {@link #ORC_ACID_VERSION_DEFAULT} otherwise + */ + @VisibleForTesting + public static int getAcidVersionFromDataFile(Path dataFile, FileSystem fs) throws IOException { + FileStatus fileStatus = fs.getFileStatus(dataFile); + Reader orcReader = OrcFile.createReader(dataFile, + OrcFile.readerOptions(fs.getConf()) + .filesystem(fs) + //make sure to check for side file in case streaming ingest died + .maxLength(getLogicalLength(fs, fileStatus))); + if(orcReader.hasMetadataValue(ACID_VERSION_KEY)) { + return orcReader.getMetadataValue(ACID_VERSION_KEY).getInt(); + } + return ORC_ACID_VERSION_DEFAULT; + } + /** + * This creates a version file in {@code deltaOrBaseDir} + * @param deltaOrBaseDir - where to create the version file + */ + public static void writeVersionFile(Path deltaOrBaseDir, FileSystem fs) throws IOException { + Path formatFile = getVersionFilePath(deltaOrBaseDir); + if(!fs.exists(formatFile)) { + try (FSDataOutputStream strm = fs.create(formatFile, false)) { + strm.writeInt(ORC_ACID_VERSION); + } catch (IOException ioe) { + LOG.error("Failed to create " + formatFile + " due to: " + ioe.getMessage(), ioe); + throw ioe; + } + } + } + public static Path getVersionFilePath(Path deltaOrBase) { + return new Path(deltaOrBase, ACID_FORMAT); + } + @VisibleForTesting + public static int getAcidVersionFromMetaFile(Path deltaOrBaseDir, FileSystem fs) + throws IOException { + Path formatFile = getVersionFilePath(deltaOrBaseDir); + if(!fs.exists(formatFile)) { + LOG.debug(formatFile + " not found, returning default: " + ORC_ACID_VERSION_DEFAULT); + return ORC_ACID_VERSION_DEFAULT; + } + try (FSDataInputStream inputStream = fs.open(formatFile)) { + return inputStream.readInt(); + } + catch(IOException ex) { + LOG.error(formatFile + " is unreadable due to: " + ex.getMessage(), ex); + throw ex; + } + } + } } diff --git ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcOutputFormat.java ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcOutputFormat.java index f1f638d980..d714bef486 100644 --- ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcOutputFormat.java +++ ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcOutputFormat.java @@ -40,19 +40,11 @@ import org.apache.hadoop.hive.serde2.SerDeStats; import org.apache.hadoop.hive.serde2.SerDeUtils; import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector; -import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorFactory; import org.apache.hadoop.hive.serde2.objectinspector.PrimitiveObjectInspector; import 
org.apache.hadoop.hive.serde2.objectinspector.StructField; import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector; -import org.apache.hadoop.hive.serde2.typeinfo.BaseCharTypeInfo; -import org.apache.hadoop.hive.serde2.typeinfo.DecimalTypeInfo; -import org.apache.hadoop.hive.serde2.typeinfo.ListTypeInfo; -import org.apache.hadoop.hive.serde2.typeinfo.MapTypeInfo; -import org.apache.hadoop.hive.serde2.typeinfo.PrimitiveTypeInfo; -import org.apache.hadoop.hive.serde2.typeinfo.StructTypeInfo; import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo; import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoUtils; -import org.apache.hadoop.hive.serde2.typeinfo.UnionTypeInfo; import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.NullWritable; @@ -300,6 +292,7 @@ public RecordUpdater getRecordUpdater(Path path, opts.inspector(options.getInspector()) .callback(watcher); final Writer writer = OrcFile.createWriter(filename, opts); + AcidUtils.OrcAcidVersion.setAcidVersionInDataFile(writer); return new org.apache.hadoop.hive.ql.exec.FileSinkOperator.RecordWriter() { @Override public void write(Writable w) throws IOException { diff --git ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcRecordUpdater.java ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcRecordUpdater.java index b90ce6ed2f..097e89032b 100644 --- ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcRecordUpdater.java +++ ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcRecordUpdater.java @@ -35,7 +35,6 @@ import org.apache.hadoop.hive.ql.io.BucketCodec; import org.apache.hadoop.hive.ql.io.RecordIdentifier; import org.apache.hadoop.hive.ql.io.RecordUpdater; -import org.apache.hadoop.hive.ql.metadata.Hive; import org.apache.hadoop.hive.serde2.SerDeStats; import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector; import org.apache.hadoop.hive.serde2.objectinspector.StructField; @@ -63,9 +62,6 @@ private static final Logger LOG = LoggerFactory.getLogger(OrcRecordUpdater.class); static final String ACID_KEY_INDEX_NAME = "hive.acid.key.index"; - private static final String ACID_FORMAT = "_orc_acid_version"; - private static final int ORC_ACID_VERSION = 0; - final static int INSERT_OPERATION = 0; final static int UPDATE_OPERATION = 1; @@ -86,6 +82,7 @@ final static long DELTA_STRIPE_SIZE = 16 * 1024 * 1024; private static final Charset UTF8 = Charset.forName("UTF-8"); + private static final CharsetDecoder utf8Decoder = UTF8.newDecoder(); private final AcidOutputFormat.Options options; private final AcidUtils.AcidOperationalProperties acidOperationalProperties; @@ -197,9 +194,9 @@ static StructObjectInspector createEventSchema(ObjectInspector rowInspector) { return new OrcStruct.OrcStructInspector(fields); } /** - * @param path - partition root + * @param partitionRoot - partition root (or table root if not partitioned) */ - OrcRecordUpdater(Path path, + OrcRecordUpdater(Path partitionRoot, AcidOutputFormat.Options options) throws IOException { this.options = options; // Initialize acidOperationalProperties based on table properties, and @@ -227,27 +224,16 @@ static StructObjectInspector createEventSchema(ObjectInspector rowInspector) { } } this.bucket.set(bucketCodec.encode(options)); - this.path = AcidUtils.createFilename(path, options); + this.path = AcidUtils.createFilename(partitionRoot, options); this.deleteEventWriter = null; this.deleteEventPath = null; FileSystem fs = options.getFilesystem(); if (fs == null) { - fs = path.getFileSystem(options.getConfiguration()); 
+ fs = partitionRoot.getFileSystem(options.getConfiguration()); } this.fs = fs; - Path formatFile = new Path(path, ACID_FORMAT); - if(!fs.exists(formatFile)) { - try (FSDataOutputStream strm = fs.create(formatFile, false)) { - strm.writeInt(ORC_ACID_VERSION); - } catch (IOException ioe) { - if (LOG.isDebugEnabled()) { - LOG.debug("Failed to create " + path + "/" + ACID_FORMAT + " with " + - ioe); - } - } - } if (options.getMinimumTransactionId() != options.getMaximumTransactionId() - && !options.isWritingBase()){ + && !options.isWritingBase()) { //throw if file already exists as that should never happen flushLengths = fs.create(OrcAcidUtils.getSideFile(this.path), false, 8, options.getReporter()); @@ -284,7 +270,7 @@ static StructObjectInspector createEventSchema(ObjectInspector rowInspector) { // This writes to a file in directory which starts with "delete_delta_..." // The actual initialization of a writer only happens if any delete events are written //to avoid empty files. - this.deleteEventPath = AcidUtils.createFilename(path, deleteOptions); + this.deleteEventPath = AcidUtils.createFilename(partitionRoot, deleteOptions); /** * HIVE-14514 is not done so we can't clone writerOptions(). So here we create a new * options object to make sure insert and delete writers don't share them (like the @@ -321,7 +307,6 @@ static StructObjectInspector createEventSchema(ObjectInspector rowInspector) { item.setFieldValue(BUCKET, bucket); item.setFieldValue(ROW_ID, rowId); } - @Override public String toString() { return getClass().getName() + "[" + path +"]"; @@ -382,9 +367,7 @@ private void addSimpleEvent(int operation, long currentTransaction, long rowId, item.setFieldValue(OrcRecordUpdater.OPERATION, new IntWritable(operation)); item.setFieldValue(OrcRecordUpdater.ROW, (operation == DELETE_OPERATION ? null : row)); indexBuilder.addKey(operation, originalTransaction, bucket.get(), rowId); - if (writer == null) { - writer = OrcFile.createWriter(path, writerOptions); - } + initWriter(); writer.addRow(item); restoreBucket(currentBucket, operation); } @@ -418,7 +401,9 @@ private void addSplitUpdateEvent(int operation, long currentTransaction, long ro // Initialize an indexBuilder for deleteEvents. (HIVE-17284) deleteEventIndexBuilder = new KeyIndexBuilder("delete"); this.deleteEventWriter = OrcFile.createWriter(deleteEventPath, - deleteWriterOptions.callback(deleteEventIndexBuilder)); + deleteWriterOptions.callback(deleteEventIndexBuilder)); + AcidUtils.OrcAcidVersion.setAcidVersionInDataFile(deleteEventWriter); + AcidUtils.OrcAcidVersion.writeVersionFile(this.deleteEventPath.getParent(), fs); } // A delete/update generates a delete event for the original row. @@ -484,9 +469,7 @@ public void flush() throws IOException { throw new IllegalStateException("Attempting to flush a RecordUpdater on " + path + " with a single transaction."); } - if (writer == null) { - writer = OrcFile.createWriter(path, writerOptions); - } + initWriter(); long len = writer.writeIntermediateFooter(); flushLengths.writeLong(len); OrcInputFormat.SHIMS.hflush(flushLengths); @@ -509,10 +492,8 @@ public void close(boolean abort) throws IOException { writer.close(); // normal close, when there are inserts. } } else { - if (writer == null) { - //so that we create empty bucket files when needed (but see HIVE-17138) - writer = OrcFile.createWriter(path, writerOptions); - } + //so that we create empty bucket files when needed (but see HIVE-17138) + initWriter(); writer.close(); // normal close. 
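The OrcRecordUpdater hunks above route all writer creation through one helper so the version stamping cannot be skipped; the helper itself (`initWriter()`) appears in the next hunk. The sketch below restates that pattern in isolation, since the patch applies it at every ORC writer creation site (the insert writer here, the delete-event writer, the raw writer in `OrcOutputFormat#getRecordUpdater`, and the compactor output). The wrapper class is illustrative; `path`, `writerOptions`, and `fs` are fields of the real OrcRecordUpdater.

```java
import java.io.IOException;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.ql.io.AcidUtils;
import org.apache.hadoop.hive.ql.io.orc.OrcFile;
import org.apache.hadoop.hive.ql.io.orc.Writer;

class LazyAcidWriterSketch {
  private Writer writer;

  // Create the bucket file lazily and stamp both version markers at the same moment:
  // the "hive.acid.version" user property in the ORC footer, and the _orc_acid_version
  // file in the enclosing delta/base directory. Doing it here guarantees the markers
  // exist exactly when (and only when) the data file itself is created.
  void initWriter(Path path, OrcFile.WriterOptions writerOptions, FileSystem fs)
      throws IOException {
    if (writer == null) {
      writer = OrcFile.createWriter(path, writerOptions);
      AcidUtils.OrcAcidVersion.setAcidVersionInDataFile(writer);
      AcidUtils.OrcAcidVersion.writeVersionFile(path.getParent(), fs);
    }
  }
}
```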
} if (deleteEventWriter != null) { @@ -533,6 +514,13 @@ public void close(boolean abort) throws IOException { deleteEventWriter = null; writerClosed = true; } + private void initWriter() throws IOException { + if (writer == null) { + writer = OrcFile.createWriter(path, writerOptions); + AcidUtils.OrcAcidVersion.setAcidVersionInDataFile(writer); + AcidUtils.OrcAcidVersion.writeVersionFile(path.getParent(), fs); + } + } @Override public SerDeStats getStats() { @@ -543,9 +531,6 @@ public SerDeStats getStats() { return stats; } - private static final Charset utf8 = Charset.forName("UTF-8"); - private static final CharsetDecoder utf8Decoder = utf8.newDecoder(); - static RecordIdentifier[] parseKeyIndex(Reader reader) { String[] stripes; try { diff --git ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java index a45cac60cb..9408bf1836 100644 --- ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java +++ ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java @@ -3627,13 +3627,13 @@ produced by a (optimized) Union All query └── -ext-10000 ├── HIVE_UNION_SUBDIR_1 │   └── 000000_0 - │   ├── _orc_acid_version │   └── delta_0000019_0000019_0001 + │   ├── _orc_acid_version │   └── bucket_00000 ├── HIVE_UNION_SUBDIR_2 │   └── 000000_0 - │   ├── _orc_acid_version │   └── delta_0000019_0000019_0002 + │   ├── _orc_acid_version │   └── bucket_00000 The assumption is that we either have all data in subdirs or root of srcPath but not both. @@ -3692,7 +3692,10 @@ private static void moveAcidFiles(String deltaFileType, PathFilter pathFilter, F try { if (!createdDeltaDirs.contains(deltaDest)) { try { - fs.mkdirs(deltaDest); + if(fs.mkdirs(deltaDest)) { + fs.rename(AcidUtils.OrcAcidVersion.getVersionFilePath(deltaStat.getPath()), + AcidUtils.OrcAcidVersion.getVersionFilePath(deltaDest)); + } createdDeltaDirs.add(deltaDest); } catch (IOException swallowIt) { // Don't worry about this, as it likely just means it's already been created. diff --git ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorMR.java ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorMR.java index 236e585dc8..4bc83aa6d0 100644 --- ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorMR.java +++ ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorMR.java @@ -48,7 +48,6 @@ import org.apache.hadoop.hive.ql.io.AcidInputFormat; import org.apache.hadoop.hive.ql.io.AcidOutputFormat; import org.apache.hadoop.hive.ql.io.AcidUtils; -import org.apache.hadoop.hive.ql.io.AcidUtils.AcidOperationalProperties; import org.apache.hadoop.hive.ql.io.HiveInputFormat; import org.apache.hadoop.hive.ql.io.IOConstants; import org.apache.hadoop.hive.ql.io.RecordIdentifier; @@ -935,6 +934,7 @@ public void commitJob(JobContext context) throws IOException { " not found. Assuming 0 splits. Creating " + newDeltaDir); fs.mkdirs(newDeltaDir); createCompactorMarker(conf, newDeltaDir, fs); + AcidUtils.OrcAcidVersion.writeVersionFile(newDeltaDir, fs); return; } FileStatus[] contents = fs.listStatus(tmpLocation);//expect 1 base or delta dir in this list @@ -943,7 +943,12 @@ public void commitJob(JobContext context) throws IOException { for (FileStatus fileStatus : contents) { //newPath is the base/delta dir Path newPath = new Path(finalLocation, fileStatus.getPath().getName()); + /*rename(A, B) has "interesting" behavior if A and B are directories. If B doesn't exist, + * it does the expected operation and everything that was in A is now in B. 
If B exists, + * it will make A a child of B... thus make sure the rename() is done before creating the + * meta files which will create base_x/ (i.e. B)...*/ fs.rename(fileStatus.getPath(), newPath); + AcidUtils.OrcAcidVersion.writeVersionFile(newPath, fs); createCompactorMarker(conf, newPath, fs); } fs.delete(tmpLocation, true); diff --git ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands.java ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands.java index 2a1545f1da..4682e7f580 100644 --- ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands.java +++ ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands.java @@ -17,16 +17,20 @@ */ package org.apache.hadoop.hive.ql; +import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.hive.conf.HiveConf; import org.apache.hadoop.hive.metastore.MetastoreTaskThread; import org.apache.hadoop.hive.metastore.api.GetOpenTxnsInfoResponse; import org.apache.hadoop.hive.metastore.api.LockState; import org.apache.hadoop.hive.metastore.api.LockType; +import org.apache.hadoop.hive.metastore.api.ShowCompactRequest; +import org.apache.hadoop.hive.metastore.api.ShowCompactResponse; import org.apache.hadoop.hive.metastore.api.ShowLocksRequest; import org.apache.hadoop.hive.metastore.api.ShowLocksResponse; import org.apache.hadoop.hive.metastore.api.TxnInfo; import org.apache.hadoop.hive.metastore.api.TxnState; +import org.apache.hadoop.hive.metastore.conf.MetastoreConf; import org.apache.hadoop.hive.metastore.txn.AcidHouseKeeperService; import org.apache.hadoop.hive.metastore.txn.TxnDbUtil; import org.apache.hadoop.hive.metastore.txn.TxnStore; @@ -834,4 +838,59 @@ public void testMoreBucketsThanReducers2() throws Exception { int[][] expected = {{0, -1},{0, -1}, {1, -1}, {1, -1}, {2, -1}, {2, -1}, {3, -1}, {3, -1}}; Assert.assertEquals(stringifyValues(expected), r); } + @Test + public void testVersioning() throws Exception { + hiveConf.set(MetastoreConf.ConfVars.CREATE_TABLES_AS_ACID.getVarname(), "true"); + runStatementOnDriver("drop table if exists T"); + runStatementOnDriver("create table T (a int, b int) stored as orc"); + int[][] data = {{1,2}}; + //create 1 delta file bucket_00000 + runStatementOnDriver("insert into T" + makeValuesClause(data)); + + //delete the bucket files so now we have empty delta dirs + List rs = runStatementOnDriver("select distinct INPUT__FILE__NAME from T"); + FileSystem fs = FileSystem.get(hiveConf); + Assert.assertTrue(rs != null && rs.size() == 1 && rs.get(0).contains(AcidUtils.DELTA_PREFIX)); + Path filePath = new Path(rs.get(0)); + int version = AcidUtils.OrcAcidVersion.getAcidVersionFromDataFile(filePath, fs); + //check it has expected version marker + Assert.assertEquals("Unexpected version marker in " + filePath, + AcidUtils.OrcAcidVersion.ORC_ACID_VERSION, version); + + //check that delta dir has a version file with expected value + filePath = filePath.getParent(); + Assert.assertTrue(filePath.getName().startsWith(AcidUtils.DELTA_PREFIX)); + int versionFromMetaFile = AcidUtils.OrcAcidVersion + .getAcidVersionFromMetaFile(filePath, fs); + Assert.assertEquals("Unexpected version marker in " + filePath, + AcidUtils.OrcAcidVersion.ORC_ACID_VERSION, versionFromMetaFile); + + runStatementOnDriver("insert into T" + makeValuesClause(data)); + runStatementOnDriver("alter table T compact 'major'"); + TestTxnCommands2.runWorker(hiveConf); + + //check status of compaction job + TxnStore txnHandler = TxnUtils.getTxnStore(hiveConf); + ShowCompactResponse resp = txnHandler.showCompact(new 
ShowCompactRequest()); + Assert.assertEquals("Unexpected number of compactions in history", 1, resp.getCompactsSize()); + Assert.assertEquals("Unexpected 0 compaction state", TxnStore.CLEANING_RESPONSE, resp.getCompacts().get(0).getState()); + Assert.assertTrue(resp.getCompacts().get(0).getHadoopJobId().startsWith("job_local")); + + rs = runStatementOnDriver("select distinct INPUT__FILE__NAME from T"); + Assert.assertTrue(rs != null && rs.size() == 1 && rs.get(0).contains(AcidUtils.BASE_PREFIX)); + + filePath = new Path(rs.get(0)); + version = AcidUtils.OrcAcidVersion.getAcidVersionFromDataFile(filePath, fs); + //check that files produced by compaction still have the version marker + Assert.assertEquals("Unexpected version marker in " + filePath, + AcidUtils.OrcAcidVersion.ORC_ACID_VERSION, version); + + //check that compacted base dir has a version file with expected value + filePath = filePath.getParent(); + Assert.assertTrue(filePath.getName().startsWith(AcidUtils.BASE_PREFIX)); + versionFromMetaFile = AcidUtils.OrcAcidVersion.getAcidVersionFromMetaFile( + filePath, fs); + Assert.assertEquals("Unexpected version marker in " + filePath, + AcidUtils.OrcAcidVersion.ORC_ACID_VERSION, versionFromMetaFile); + } } diff --git ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestInputOutputFormat.java ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestInputOutputFormat.java index 8a6a056326..37c15a8770 100644 --- ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestInputOutputFormat.java +++ ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestInputOutputFormat.java @@ -2473,14 +2473,14 @@ public void testCombinationInputFormatWithAcid() throws Exception { assertEquals("mock:/combinationAcid/p=0/base_0000010/bucket_00000", split.getPath().toString()); assertEquals(0, split.getStart()); - assertEquals(648, split.getLength()); + assertEquals(663, split.getLength()); split = (HiveInputFormat.HiveInputSplit) splits[1]; assertEquals("org.apache.hadoop.hive.ql.io.orc.OrcInputFormat", split.inputFormatClassName()); assertEquals("mock:/combinationAcid/p=0/base_0000010/bucket_00001", split.getPath().toString()); assertEquals(0, split.getStart()); - assertEquals(674, split.getLength()); + assertEquals(690, split.getLength()); CombineHiveInputFormat.CombineHiveInputSplit combineSplit = (CombineHiveInputFormat.CombineHiveInputSplit) splits[2]; assertEquals(BUCKETS, combineSplit.getNumPaths()); diff --git ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/TestWorker.java ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/TestWorker.java index 063812610e..ce5e526832 100644 --- ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/TestWorker.java +++ ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/TestWorker.java @@ -307,7 +307,7 @@ public void minorTableWithBase() throws Exception { for (int i = 0; i < stat.length; i++) { if (stat[i].getPath().getName().equals(makeDeltaDirNameCompacted(21, 24))) { sawNewDelta = true; - FileStatus[] buckets = fs.listStatus(stat[i].getPath()); + FileStatus[] buckets = fs.listStatus(stat[i].getPath(), AcidUtils.hiddenFileFilter); Assert.assertEquals(2, buckets.length); Assert.assertTrue(buckets[0].getPath().getName().matches("bucket_0000[01]")); Assert.assertTrue(buckets[1].getPath().getName().matches("bucket_0000[01]")); @@ -316,7 +316,7 @@ public void minorTableWithBase() throws Exception { } if (stat[i].getPath().getName().equals(makeDeleteDeltaDirNameCompacted(21, 24))) { sawNewDelta = true; - FileStatus[] buckets = fs.listStatus(stat[i].getPath()); + FileStatus[] buckets = 
fs.listStatus(stat[i].getPath(), AcidUtils.hiddenFileFilter); Assert.assertEquals(2, buckets.length); Assert.assertTrue(buckets[0].getPath().getName().matches("bucket_0000[01]")); Assert.assertTrue(buckets[1].getPath().getName().matches("bucket_0000[01]")); @@ -439,7 +439,7 @@ public void minorPartitionWithBase() throws Exception { for (int i = 0; i < stat.length; i++) { if (stat[i].getPath().getName().equals(makeDeltaDirNameCompacted(21, 24))) { sawNewDelta = true; - FileStatus[] buckets = fs.listStatus(stat[i].getPath()); + FileStatus[] buckets = fs.listStatus(stat[i].getPath(), AcidUtils.hiddenFileFilter); Assert.assertEquals(2, buckets.length); Assert.assertTrue(buckets[0].getPath().getName().matches("bucket_0000[01]")); Assert.assertTrue(buckets[1].getPath().getName().matches("bucket_0000[01]")); @@ -448,7 +448,7 @@ public void minorPartitionWithBase() throws Exception { } if (stat[i].getPath().getName().equals(makeDeleteDeltaDirNameCompacted(21, 24))) { sawNewDelta = true; - FileStatus[] buckets = fs.listStatus(stat[i].getPath()); + FileStatus[] buckets = fs.listStatus(stat[i].getPath(), AcidUtils.hiddenFileFilter); Assert.assertEquals(2, buckets.length); Assert.assertTrue(buckets[0].getPath().getName().matches("bucket_0000[01]")); Assert.assertTrue(buckets[1].getPath().getName().matches("bucket_0000[01]")); @@ -491,7 +491,7 @@ public void minorTableNoBase() throws Exception { for (int i = 0; i < stat.length; i++) { if (stat[i].getPath().getName().equals(makeDeltaDirNameCompacted(1, 4))) { sawNewDelta = true; - FileStatus[] buckets = fs.listStatus(stat[i].getPath()); + FileStatus[] buckets = fs.listStatus(stat[i].getPath(), AcidUtils.hiddenFileFilter); Assert.assertEquals(2, buckets.length); Assert.assertTrue(buckets[0].getPath().getName().matches("bucket_0000[01]")); Assert.assertTrue(buckets[1].getPath().getName().matches("bucket_0000[01]")); @@ -500,7 +500,7 @@ public void minorTableNoBase() throws Exception { } if (stat[i].getPath().getName().equals(makeDeleteDeltaDirNameCompacted(1, 4))) { sawNewDelta = true; - FileStatus[] buckets = fs.listStatus(stat[i].getPath()); + FileStatus[] buckets = fs.listStatus(stat[i].getPath(), AcidUtils.hiddenFileFilter); Assert.assertEquals(2, buckets.length); Assert.assertTrue(buckets[0].getPath().getName().matches("bucket_0000[01]")); Assert.assertTrue(buckets[1].getPath().getName().matches("bucket_0000[01]")); @@ -843,7 +843,7 @@ public void minorTableLegacy() throws Exception { for (int i = 0; i < stat.length; i++) { if (stat[i].getPath().getName().equals(makeDeltaDirNameCompacted(21, 24))) { sawNewDelta = true; - FileStatus[] buckets = fs.listStatus(stat[i].getPath()); + FileStatus[] buckets = fs.listStatus(stat[i].getPath(), AcidUtils.hiddenFileFilter); Assert.assertEquals(2, buckets.length); Assert.assertTrue(buckets[0].getPath().getName().matches("bucket_0000[01]")); Assert.assertTrue(buckets[1].getPath().getName().matches("bucket_0000[01]"));
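The TestWorker edits above all make the same adjustment: after this patch a compacted base/delta directory also holds the `_orc_acid_version` marker (besides the compactor's own marker file), so an unfiltered `listStatus` would return more entries than the expected `bucket_0000N` files and break the bucket-count assertions. A minimal sketch of the filtered listing; the wrapper class and parameter names are illustrative only.

```java
import java.io.IOException;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.ql.io.AcidUtils;

class ListBucketsSketch {
  // AcidUtils.hiddenFileFilter (already used elsewhere in Hive and in this patch)
  // drops "_"- and "."-prefixed entries, leaving just the bucket files.
  static FileStatus[] listBuckets(FileSystem fs, Path compactedDir) throws IOException {
    return fs.listStatus(compactedDir, AcidUtils.hiddenFileFilter);
  }
}
```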