diff --git pom.xml pom.xml
index e220891fdb..317587fe70 100644
--- pom.xml
+++ pom.xml
@@ -1035,6 +1035,9 @@
        <groupId>org.apache.maven.plugins</groupId>
        <artifactId>maven-surefire-plugin</artifactId>
        <version>${maven.surefire.plugin.version}</version>
+       <configuration>
+         <argLine>-Xmx1024m -XX:MaxPermSize=256m</argLine>
+       </configuration>
      </plugin>
      <plugin>
        <groupId>org.apache.maven.plugins</groupId>
diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/FileSinkOperator.java ql/src/java/org/apache/hadoop/hive/ql/exec/FileSinkOperator.java
index 98bb938c13..adba60eaa3 100644
--- ql/src/java/org/apache/hadoop/hive/ql/exec/FileSinkOperator.java
+++ ql/src/java/org/apache/hadoop/hive/ql/exec/FileSinkOperator.java
@@ -264,15 +264,7 @@ private void commitOneOutPath(int idx, FileSystem fs, List<Path> commitPaths)
}
FileUtils.mkdir(fs, finalPaths[idx].getParent(), hconf);
}
- // If we're updating or deleting there may be no file to close. This can happen
- // because the where clause strained out all of the records for a given bucket. So
- // before attempting the rename below, check if our file exists. If it doesn't,
- // then skip the rename. If it does try it. We could just blindly try the rename
- // and avoid the extra stat, but that would mask other errors.
- Operation acidOp = conf.getWriteType();
- boolean needToRename = outPaths[idx] != null && ((acidOp != Operation.UPDATE
- && acidOp != Operation.DELETE) || fs.exists(outPaths[idx]));
- if (needToRename && outPaths[idx] != null) {
+ if(outPaths[idx] != null && fs.exists(outPaths[idx])) {
if (Utilities.FILE_OP_LOGGER.isTraceEnabled()) {
Utilities.FILE_OP_LOGGER.trace("committing " + outPaths[idx] + " to "
+ finalPaths[idx] + " (" + isMmTable + ")");
diff --git ql/src/java/org/apache/hadoop/hive/ql/io/AcidUtils.java ql/src/java/org/apache/hadoop/hive/ql/io/AcidUtils.java
index 553e8bcf4e..5731d791a4 100644
--- ql/src/java/org/apache/hadoop/hive/ql/io/AcidUtils.java
+++ ql/src/java/org/apache/hadoop/hive/ql/io/AcidUtils.java
@@ -18,17 +18,7 @@
package org.apache.hadoop.hive.ql.io;
-import java.io.IOException;
-import java.io.Serializable;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.Properties;
-import java.util.Set;
-import java.util.regex.Pattern;
-
+import com.google.common.annotations.VisibleForTesting;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSDataOutputStream;
@@ -39,14 +29,15 @@
import org.apache.hadoop.hive.common.ValidTxnList;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
+import org.apache.hadoop.hive.metastore.TransactionalValidationListener;
import org.apache.hadoop.hive.metastore.api.DataOperationType;
import org.apache.hadoop.hive.metastore.api.hive_metastoreConstants;
-import org.apache.hadoop.hive.metastore.TransactionalValidationListener;
import org.apache.hadoop.hive.ql.ErrorMsg;
import org.apache.hadoop.hive.ql.io.orc.OrcFile;
import org.apache.hadoop.hive.ql.io.orc.OrcInputFormat;
import org.apache.hadoop.hive.ql.io.orc.OrcRecordUpdater;
import org.apache.hadoop.hive.ql.io.orc.Reader;
+import org.apache.hadoop.hive.ql.io.orc.Writer;
import org.apache.hadoop.hive.ql.metadata.Table;
import org.apache.hadoop.hive.ql.plan.CreateTableDesc;
import org.apache.hadoop.hive.shims.HadoopShims;
@@ -59,7 +50,17 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import com.google.common.annotations.VisibleForTesting;
+import java.io.IOException;
+import java.io.Serializable;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+import java.util.Set;
+import java.util.regex.Pattern;
import static org.apache.hadoop.hive.ql.exec.Utilities.COPY_KEYWORD;
@@ -1605,4 +1606,88 @@ public static boolean isRawFormat(Path baseOrDeltaDir, FileSystem fs) throws IOE
}
}
}
+
+ /**
+ * Logic related to versioning the acid data format. An {@code ACID_FORMAT} file is written to
+ * each base/delta/delete_delta dir produced by a full acid write or compaction. This is the
+ * primary mechanism for versioning acid data.
+ *
+ * Each individual ORC file written also stores the current version as a user property in its
+ * ORC footer. All data files produced by an acid write should have this (starting with Hive
+ * 3.0), including those written by the compactor. This is mainly a sanity check in case files
+ * are moved around outside of Hive's control.
+ */
+ public static final class OrcAcidVersion {
+ private static final String ACID_VERSION_KEY = "hive.acid.version";
+ private static final String ACID_FORMAT = "_orc_acid_version";
+ public static final int ORC_ACID_VERSION_DEFAULT = 0;
+ /**
+ * 2 is the version of Acid released in Hive 3.0
+ */
+ public static final int ORC_ACID_VERSION = 2;
+ /**
+ * Include the current acid version in the file footer.
+ * @param writer - writer for the file being written
+ */
+ public static void setAcidVersionInDataFile(Writer writer) {
+ //so that we know which version wrote the file
+ ByteBuffer bf = ByteBuffer.allocate(4).putInt(ORC_ACID_VERSION);
+ bf.rewind();//reset position to 0 so the value is read from the start; without this an empty buffer is written
+ writer.addUserMetadata(ACID_VERSION_KEY, bf);
+ }
+ /**
+ * This is smart enough to handle streaming ingest where there could be a
+ * {@link OrcAcidUtils#DELTA_SIDE_FILE_SUFFIX} side file.
+ * @param dataFile - ORC acid data file
+ * @return version property from file if there,
+ * {@link #ORC_ACID_VERSION_DEFAULT} otherwise
+ */
+ public static int getAcidVersionFromDataFile(Path dataFile, FileSystem fs) throws IOException {
+ FileStatus fileStatus = fs.getFileStatus(dataFile);
+ Reader orcReader = OrcFile.createReader(dataFile,
+ OrcFile.readerOptions(fs.getConf())
+ .filesystem(fs)
+ //make sure to check for side file in case streaming ingest died
+ .maxLength(getLogicalLength(fs, fileStatus)));
+ if(orcReader.hasMetadataValue(ACID_VERSION_KEY)) {
+ return orcReader.getMetadataValue(ACID_VERSION_KEY).getInt();
+ }
+ return ORC_ACID_VERSION_DEFAULT;
+ }
+ /**
+ * This creates a version file in {@code deltaOrBaseDir}
+ * @param deltaOrBaseDir - where to create the version file
+ */
+ public static void writeVersionFile(Path deltaOrBaseDir, FileSystem fs) throws IOException {
+ Path formatFile = getVersionFilePath(deltaOrBaseDir);
+ if(!fs.exists(formatFile)) {
+ try (FSDataOutputStream strm = fs.create(formatFile, false)) {
+ strm.writeInt(ORC_ACID_VERSION);
+ } catch (IOException ioe) {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Failed to create " + formatFile + " with " + ioe);
+ }
+ }
+ }
+ }
+ public static Path getVersionFilePath(Path deltaOrBase) {
+ return new Path(deltaOrBase, ACID_FORMAT);
+ }
+ public static int getAcidVersionFromMetaFile(Path deltaOrBaseDir, FileSystem fs)
+ throws IOException {
+ Path formatFile = getVersionFilePath(deltaOrBaseDir);
+ if(!fs.exists(formatFile)) {
+ LOG.info(formatFile + " not found, returning default: " + ORC_ACID_VERSION_DEFAULT);
+ return ORC_ACID_VERSION_DEFAULT;
+ }
+ try (FSDataInputStream inputStream = fs.open(formatFile)) {
+ return inputStream.readInt();
+ }
+ catch(IOException ex) {
+ LOG.error(formatFile + " is unreadable, returning default: " +
+ ORC_ACID_VERSION_DEFAULT, ex);
+ return ORC_ACID_VERSION_DEFAULT;
+ }
+ }
+ }
}
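
A minimal, self-contained sketch (plain JDK, not part of this patch) of the ByteBuffer behavior that setAcidVersionInDataFile() works around: putInt() leaves the buffer position at its limit, so, as the comment in that method notes, an empty buffer would be handed to writer.addUserMetadata() unless rewind() is called first. getAcidVersionFromDataFile() then reads the value back via getMetadataValue(ACID_VERSION_KEY).getInt().

    import java.nio.ByteBuffer;

    public class VersionBufferDemo {
      public static void main(String[] args) {
        ByteBuffer bf = ByteBuffer.allocate(4).putInt(2); // ORC_ACID_VERSION
        System.out.println(bf.remaining());               // 0 - nothing left to read
        bf.rewind();                                      // reset position to 0
        System.out.println(bf.remaining());               // 4
        System.out.println(bf.getInt());                  // 2 - version reads back
      }
    }
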
diff --git ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcOutputFormat.java ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcOutputFormat.java
index f1f638d980..d714bef486 100644
--- ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcOutputFormat.java
+++ ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcOutputFormat.java
@@ -40,19 +40,11 @@
import org.apache.hadoop.hive.serde2.SerDeStats;
import org.apache.hadoop.hive.serde2.SerDeUtils;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
-import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorFactory;
import org.apache.hadoop.hive.serde2.objectinspector.PrimitiveObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.StructField;
import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;
-import org.apache.hadoop.hive.serde2.typeinfo.BaseCharTypeInfo;
-import org.apache.hadoop.hive.serde2.typeinfo.DecimalTypeInfo;
-import org.apache.hadoop.hive.serde2.typeinfo.ListTypeInfo;
-import org.apache.hadoop.hive.serde2.typeinfo.MapTypeInfo;
-import org.apache.hadoop.hive.serde2.typeinfo.PrimitiveTypeInfo;
-import org.apache.hadoop.hive.serde2.typeinfo.StructTypeInfo;
import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo;
import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoUtils;
-import org.apache.hadoop.hive.serde2.typeinfo.UnionTypeInfo;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
@@ -300,6 +292,7 @@ public RecordUpdater getRecordUpdater(Path path,
opts.inspector(options.getInspector())
.callback(watcher);
final Writer writer = OrcFile.createWriter(filename, opts);
+ AcidUtils.OrcAcidVersion.setAcidVersionInDataFile(writer);
return new org.apache.hadoop.hive.ql.exec.FileSinkOperator.RecordWriter() {
@Override
public void write(Writable w) throws IOException {
diff --git ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcRecordUpdater.java ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcRecordUpdater.java
index b90ce6ed2f..5b8c07bf41 100644
--- ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcRecordUpdater.java
+++ ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcRecordUpdater.java
@@ -35,7 +35,6 @@
import org.apache.hadoop.hive.ql.io.BucketCodec;
import org.apache.hadoop.hive.ql.io.RecordIdentifier;
import org.apache.hadoop.hive.ql.io.RecordUpdater;
-import org.apache.hadoop.hive.ql.metadata.Hive;
import org.apache.hadoop.hive.serde2.SerDeStats;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.StructField;
@@ -63,9 +62,6 @@
private static final Logger LOG = LoggerFactory.getLogger(OrcRecordUpdater.class);
static final String ACID_KEY_INDEX_NAME = "hive.acid.key.index";
- private static final String ACID_FORMAT = "_orc_acid_version";
- private static final int ORC_ACID_VERSION = 0;
-
final static int INSERT_OPERATION = 0;
final static int UPDATE_OPERATION = 1;
@@ -86,6 +82,7 @@
final static long DELTA_STRIPE_SIZE = 16 * 1024 * 1024;
private static final Charset UTF8 = Charset.forName("UTF-8");
+ private static final CharsetDecoder utf8Decoder = UTF8.newDecoder();
private final AcidOutputFormat.Options options;
private final AcidUtils.AcidOperationalProperties acidOperationalProperties;
@@ -197,9 +194,9 @@ static StructObjectInspector createEventSchema(ObjectInspector rowInspector) {
return new OrcStruct.OrcStructInspector(fields);
}
/**
- * @param path - partition root
+ * @param partitionRoot - partition root (or table root if not partitioned)
*/
- OrcRecordUpdater(Path path,
+ OrcRecordUpdater(Path partitionRoot,
AcidOutputFormat.Options options) throws IOException {
this.options = options;
// Initialize acidOperationalProperties based on table properties, and
@@ -227,27 +224,16 @@ static StructObjectInspector createEventSchema(ObjectInspector rowInspector) {
}
}
this.bucket.set(bucketCodec.encode(options));
- this.path = AcidUtils.createFilename(path, options);
+ this.path = AcidUtils.createFilename(partitionRoot, options);
this.deleteEventWriter = null;
this.deleteEventPath = null;
FileSystem fs = options.getFilesystem();
if (fs == null) {
- fs = path.getFileSystem(options.getConfiguration());
+ fs = partitionRoot.getFileSystem(options.getConfiguration());
}
this.fs = fs;
- Path formatFile = new Path(path, ACID_FORMAT);
- if(!fs.exists(formatFile)) {
- try (FSDataOutputStream strm = fs.create(formatFile, false)) {
- strm.writeInt(ORC_ACID_VERSION);
- } catch (IOException ioe) {
- if (LOG.isDebugEnabled()) {
- LOG.debug("Failed to create " + path + "/" + ACID_FORMAT + " with " +
- ioe);
- }
- }
- }
if (options.getMinimumTransactionId() != options.getMaximumTransactionId()
- && !options.isWritingBase()){
+ && !options.isWritingBase()) {
//throw if file already exists as that should never happen
flushLengths = fs.create(OrcAcidUtils.getSideFile(this.path), false, 8,
options.getReporter());
@@ -284,7 +270,7 @@ static StructObjectInspector createEventSchema(ObjectInspector rowInspector) {
// This writes to a file in directory which starts with "delete_delta_..."
// The actual initialization of a writer only happens if any delete events are written
//to avoid empty files.
- this.deleteEventPath = AcidUtils.createFilename(path, deleteOptions);
+ this.deleteEventPath = AcidUtils.createFilename(partitionRoot, deleteOptions);
/**
* HIVE-14514 is not done so we can't clone writerOptions(). So here we create a new
* options object to make sure insert and delete writers don't share them (like the
@@ -321,7 +307,6 @@ static StructObjectInspector createEventSchema(ObjectInspector rowInspector) {
item.setFieldValue(BUCKET, bucket);
item.setFieldValue(ROW_ID, rowId);
}
-
@Override
public String toString() {
return getClass().getName() + "[" + path +"]";
@@ -382,9 +367,7 @@ private void addSimpleEvent(int operation, long currentTransaction, long rowId,
item.setFieldValue(OrcRecordUpdater.OPERATION, new IntWritable(operation));
item.setFieldValue(OrcRecordUpdater.ROW, (operation == DELETE_OPERATION ? null : row));
indexBuilder.addKey(operation, originalTransaction, bucket.get(), rowId);
- if (writer == null) {
- writer = OrcFile.createWriter(path, writerOptions);
- }
+ initWriter();
writer.addRow(item);
restoreBucket(currentBucket, operation);
}
@@ -418,7 +401,9 @@ private void addSplitUpdateEvent(int operation, long currentTransaction, long ro
// Initialize an indexBuilder for deleteEvents. (HIVE-17284)
deleteEventIndexBuilder = new KeyIndexBuilder("delete");
this.deleteEventWriter = OrcFile.createWriter(deleteEventPath,
- deleteWriterOptions.callback(deleteEventIndexBuilder));
+ deleteWriterOptions.callback(deleteEventIndexBuilder));
+ AcidUtils.OrcAcidVersion.setAcidVersionInDataFile(deleteEventWriter);
+ AcidUtils.OrcAcidVersion.writeVersionFile(this.deleteEventPath.getParent(), fs);
}
// A delete/update generates a delete event for the original row.
@@ -484,9 +469,7 @@ public void flush() throws IOException {
throw new IllegalStateException("Attempting to flush a RecordUpdater on "
+ path + " with a single transaction.");
}
- if (writer == null) {
- writer = OrcFile.createWriter(path, writerOptions);
- }
+ initWriter();
long len = writer.writeIntermediateFooter();
flushLengths.writeLong(len);
OrcInputFormat.SHIMS.hflush(flushLengths);
@@ -509,10 +492,8 @@ public void close(boolean abort) throws IOException {
writer.close(); // normal close, when there are inserts.
}
} else {
- if (writer == null) {
- //so that we create empty bucket files when needed (but see HIVE-17138)
- writer = OrcFile.createWriter(path, writerOptions);
- }
+ //so that we create empty bucket files when needed (but see HIVE-17138)
+ initWriter();
writer.close(); // normal close.
}
if (deleteEventWriter != null) {
@@ -533,6 +514,13 @@ public void close(boolean abort) throws IOException {
deleteEventWriter = null;
writerClosed = true;
}
+ private void initWriter() throws IOException {
+ if (writer == null) {
+ writer = OrcFile.createWriter(path, writerOptions);
+ AcidUtils.OrcAcidVersion.setAcidVersionInDataFile(writer);
+ AcidUtils.OrcAcidVersion.writeVersionFile(path.getParent(), fs);
+ }
+ }
@Override
public SerDeStats getStats() {
@@ -543,9 +531,6 @@ public SerDeStats getStats() {
return stats;
}
- private static final Charset utf8 = Charset.forName("UTF-8");
- private static final CharsetDecoder utf8Decoder = utf8.newDecoder();
-
static RecordIdentifier[] parseKeyIndex(Reader reader) {
String[] stripes;
try {
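
With the changes above, the insert writer (via initWriter()) and the delete-event writer both stamp the version into the ORC footer and write the _orc_acid_version file into their delta/delete_delta directory. Below is a hedged sketch of reading those markers back through the new AcidUtils.OrcAcidVersion helpers; the directory and file names are hypothetical examples, not output of this patch.

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.hive.ql.io.AcidUtils;

    public class AcidVersionCheck {
      public static void main(String[] args) throws Exception {
        // hypothetical delta directory, e.g. .../t/delta_0000019_0000019_0001
        Path deltaDir = new Path(args[0]);
        FileSystem fs = deltaDir.getFileSystem(new Configuration());

        // version recorded in the per-directory _orc_acid_version file
        // (ORC_ACID_VERSION_DEFAULT if the file is missing or unreadable)
        int dirVersion = AcidUtils.OrcAcidVersion.getAcidVersionFromMetaFile(deltaDir, fs);

        // version recorded in an individual bucket file's ORC footer
        Path bucketFile = new Path(deltaDir, "bucket_00000");
        int fileVersion = AcidUtils.OrcAcidVersion.getAcidVersionFromDataFile(bucketFile, fs);

        System.out.println("dir=" + dirVersion + " file=" + fileVersion
            + " expected=" + AcidUtils.OrcAcidVersion.ORC_ACID_VERSION);
      }
    }
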
diff --git ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java
index 2152f00028..7d882ed070 100644
--- ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java
+++ ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java
@@ -3895,13 +3895,13 @@ produced by a (optimized) Union All query
└── -ext-10000
├── HIVE_UNION_SUBDIR_1
│ └── 000000_0
- │ ├── _orc_acid_version
│ └── delta_0000019_0000019_0001
+ │ ├── _orc_acid_version
│ └── bucket_00000
├── HIVE_UNION_SUBDIR_2
│ └── 000000_0
- │ ├── _orc_acid_version
│ └── delta_0000019_0000019_0002
+ │ ├── _orc_acid_version
│ └── bucket_00000
The assumption is that we either have all data in subdirs or root of srcPath
but not both.
@@ -3960,7 +3960,10 @@ private static void moveAcidFiles(String deltaFileType, PathFilter pathFilter, F
try {
if (!createdDeltaDirs.contains(deltaDest)) {
try {
- fs.mkdirs(deltaDest);
+ if(fs.mkdirs(deltaDest)) {
+ fs.rename(AcidUtils.OrcAcidVersion.getVersionFilePath(deltaStat.getPath()),
+ AcidUtils.OrcAcidVersion.getVersionFilePath(deltaDest));
+ }
createdDeltaDirs.add(deltaDest);
} catch (IOException swallowIt) {
// Don't worry about this, as it likely just means it's already been created.
diff --git ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorMR.java ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorMR.java
index 236e585dc8..81f0b3b79b 100644
--- ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorMR.java
+++ ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorMR.java
@@ -48,7 +48,6 @@
import org.apache.hadoop.hive.ql.io.AcidInputFormat;
import org.apache.hadoop.hive.ql.io.AcidOutputFormat;
import org.apache.hadoop.hive.ql.io.AcidUtils;
-import org.apache.hadoop.hive.ql.io.AcidUtils.AcidOperationalProperties;
import org.apache.hadoop.hive.ql.io.HiveInputFormat;
import org.apache.hadoop.hive.ql.io.IOConstants;
import org.apache.hadoop.hive.ql.io.RecordIdentifier;
@@ -935,6 +934,7 @@ public void commitJob(JobContext context) throws IOException {
" not found. Assuming 0 splits. Creating " + newDeltaDir);
fs.mkdirs(newDeltaDir);
createCompactorMarker(conf, newDeltaDir, fs);
+ AcidUtils.OrcAcidVersion.writeVersionFile(newDeltaDir, fs);
return;
}
FileStatus[] contents = fs.listStatus(tmpLocation);//expect 1 base or delta dir in this list
@@ -943,7 +943,12 @@ public void commitJob(JobContext context) throws IOException {
for (FileStatus fileStatus : contents) {
//newPath is the base/delta dir
Path newPath = new Path(finalLocation, fileStatus.getPath().getName());
+ /* rename(A, B) behaves differently depending on whether B exists (for directories). If B
+ * doesn't exist, the expected thing happens and everything that was in A is now in B. If B
+ * already exists, A becomes a child of B. So make sure the rename() is done before creating
+ * the meta files, which would otherwise create base_x/ (i.e. B) first. */
fs.rename(fileStatus.getPath(), newPath);
+ AcidUtils.OrcAcidVersion.writeVersionFile(newPath, fs);
createCompactorMarker(conf, newPath, fs);
}
fs.delete(tmpLocation, true);
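
Taken together with the OrcRecordUpdater changes, a major compaction under this patch should leave a layout roughly like the sketch below (table name and transaction ids are hypothetical): the base/delta directory is renamed into place first, then the compactor marker and the version file are created inside it, and each bucket file carries hive.acid.version in its ORC footer.

    warehouse/t/base_0000020/
    ├── <compactor marker>     created by createCompactorMarker()
    ├── _orc_acid_version      created by OrcAcidVersion.writeVersionFile()
    └── bucket_00000           ORC footer user metadata: hive.acid.version = 2
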
diff --git ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands.java ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands.java
index 2a1545f1da..4682e7f580 100644
--- ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands.java
+++ ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands.java
@@ -17,16 +17,20 @@
*/
package org.apache.hadoop.hive.ql;
+import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.metastore.MetastoreTaskThread;
import org.apache.hadoop.hive.metastore.api.GetOpenTxnsInfoResponse;
import org.apache.hadoop.hive.metastore.api.LockState;
import org.apache.hadoop.hive.metastore.api.LockType;
+import org.apache.hadoop.hive.metastore.api.ShowCompactRequest;
+import org.apache.hadoop.hive.metastore.api.ShowCompactResponse;
import org.apache.hadoop.hive.metastore.api.ShowLocksRequest;
import org.apache.hadoop.hive.metastore.api.ShowLocksResponse;
import org.apache.hadoop.hive.metastore.api.TxnInfo;
import org.apache.hadoop.hive.metastore.api.TxnState;
+import org.apache.hadoop.hive.metastore.conf.MetastoreConf;
import org.apache.hadoop.hive.metastore.txn.AcidHouseKeeperService;
import org.apache.hadoop.hive.metastore.txn.TxnDbUtil;
import org.apache.hadoop.hive.metastore.txn.TxnStore;
@@ -834,4 +838,59 @@ public void testMoreBucketsThanReducers2() throws Exception {
int[][] expected = {{0, -1},{0, -1}, {1, -1}, {1, -1}, {2, -1}, {2, -1}, {3, -1}, {3, -1}};
Assert.assertEquals(stringifyValues(expected), r);
}
+ @Test
+ public void testVersioning() throws Exception {
+ hiveConf.set(MetastoreConf.ConfVars.CREATE_TABLES_AS_ACID.getVarname(), "true");
+ runStatementOnDriver("drop table if exists T");
+ runStatementOnDriver("create table T (a int, b int) stored as orc");
+ int[][] data = {{1,2}};
+ //create 1 delta file bucket_00000
+ runStatementOnDriver("insert into T" + makeValuesClause(data));
+
+ //find the delta file written by the insert and check its version markers
+ List<String> rs = runStatementOnDriver("select distinct INPUT__FILE__NAME from T");
+ FileSystem fs = FileSystem.get(hiveConf);
+ Assert.assertTrue(rs != null && rs.size() == 1 && rs.get(0).contains(AcidUtils.DELTA_PREFIX));
+ Path filePath = new Path(rs.get(0));
+ int version = AcidUtils.OrcAcidVersion.getAcidVersionFromDataFile(filePath, fs);
+ //check it has expected version marker
+ Assert.assertEquals("Unexpected version marker in " + filePath,
+ AcidUtils.OrcAcidVersion.ORC_ACID_VERSION, version);
+
+ //check that delta dir has a version file with expected value
+ filePath = filePath.getParent();
+ Assert.assertTrue(filePath.getName().startsWith(AcidUtils.DELTA_PREFIX));
+ int versionFromMetaFile = AcidUtils.OrcAcidVersion
+ .getAcidVersionFromMetaFile(filePath, fs);
+ Assert.assertEquals("Unexpected version marker in " + filePath,
+ AcidUtils.OrcAcidVersion.ORC_ACID_VERSION, versionFromMetaFile);
+
+ runStatementOnDriver("insert into T" + makeValuesClause(data));
+ runStatementOnDriver("alter table T compact 'major'");
+ TestTxnCommands2.runWorker(hiveConf);
+
+ //check status of compaction job
+ TxnStore txnHandler = TxnUtils.getTxnStore(hiveConf);
+ ShowCompactResponse resp = txnHandler.showCompact(new ShowCompactRequest());
+ Assert.assertEquals("Unexpected number of compactions in history", 1, resp.getCompactsSize());
+ Assert.assertEquals("Unexpected 0 compaction state", TxnStore.CLEANING_RESPONSE, resp.getCompacts().get(0).getState());
+ Assert.assertTrue(resp.getCompacts().get(0).getHadoopJobId().startsWith("job_local"));
+
+ rs = runStatementOnDriver("select distinct INPUT__FILE__NAME from T");
+ Assert.assertTrue(rs != null && rs.size() == 1 && rs.get(0).contains(AcidUtils.BASE_PREFIX));
+
+ filePath = new Path(rs.get(0));
+ version = AcidUtils.OrcAcidVersion.getAcidVersionFromDataFile(filePath, fs);
+ //check that files produced by compaction still have the version marker
+ Assert.assertEquals("Unexpected version marker in " + filePath,
+ AcidUtils.OrcAcidVersion.ORC_ACID_VERSION, version);
+
+ //check that compacted base dir has a version file with expected value
+ filePath = filePath.getParent();
+ Assert.assertTrue(filePath.getName().startsWith(AcidUtils.BASE_PREFIX));
+ versionFromMetaFile = AcidUtils.OrcAcidVersion.getAcidVersionFromMetaFile(
+ filePath, fs);
+ Assert.assertEquals("Unexpected version marker in " + filePath,
+ AcidUtils.OrcAcidVersion.ORC_ACID_VERSION, versionFromMetaFile);
+ }
}