diff --git a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosAcidTables.java b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosAcidTables.java
index 342985e8b7..ac4d07fd0b 100644
--- a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosAcidTables.java
+++ b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosAcidTables.java
@@ -17,14 +17,18 @@
  */
 package org.apache.hadoop.hive.ql.parse;
 
+import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hdfs.MiniDFSCluster;
 import org.apache.hadoop.hive.cli.CliSessionState;
 import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.metastore.api.CompactionRequest;
+import org.apache.hadoop.hive.metastore.api.CompactionType;
 import org.apache.hadoop.hive.metastore.api.AllocateTableWriteIdsRequest;
 import org.apache.hadoop.hive.metastore.api.OpenTxnRequest;
 import org.apache.hadoop.hive.metastore.api.OpenTxnsResponse;
+import org.apache.hadoop.hive.metastore.api.AbortTxnsRequest;
 import org.apache.hadoop.hive.metastore.conf.MetastoreConf;
 import org.apache.hadoop.hive.metastore.messaging.json.gzip.GzipJSONMessageEncoder;
 import org.apache.hadoop.hive.metastore.txn.TxnDbUtil;
@@ -36,9 +40,11 @@
 import org.apache.hadoop.hive.ql.DriverFactory;
 import org.apache.hadoop.hive.ql.ErrorMsg;
 import org.apache.hadoop.hive.ql.IDriver;
+import org.apache.hadoop.hive.ql.io.AcidUtils;
 import org.apache.hadoop.hive.ql.processors.CommandProcessorResponse;
 import org.apache.hadoop.hive.ql.session.SessionState;
 import org.apache.hadoop.hive.shims.Utils;
+import org.apache.hadoop.hive.ql.parse.WarehouseInstance;
 
 import org.junit.rules.TestName;
 import org.junit.After;
@@ -59,6 +65,8 @@
 import java.util.Collections;
 import java.util.Map;
 
+import static org.apache.hadoop.hive.ql.TestTxnCommands2.runCleaner;
+import static org.apache.hadoop.hive.ql.TestTxnCommands2.runWorker;
 import static org.junit.Assert.assertTrue;
 import static org.apache.hadoop.hive.metastore.ReplChangeManager.SOURCE_OF_REPLICATION;
 
@@ -70,7 +78,7 @@
   @Rule
   public final TestName testName = new TestName();
 
-  protected static final Logger LOG = LoggerFactory.getLogger(TestReplicationScenarios.class);
+  protected static final Logger LOG = LoggerFactory.getLogger(TestReplicationScenariosAcidTables.class);
   static WarehouseInstance primary;
   private static WarehouseInstance replica, replicaNonAcid;
   static HiveConf conf;
@@ -323,6 +331,7 @@ public void testAcidTablesBootstrapWithOpenTxnsTimeout() throws Throwable {
             2, TxnDbUtil.countQueryAgent(replicaConf,
                     "select count(*) from COMPACTION_QUEUE where cq_database = '" + replicatedDbName
                             + "' and cq_table = 't2'"));
+    txnHandler.abortTxns(new AbortTxnsRequest(txns));
   }
 
   @Test
@@ -761,4 +770,62 @@ public void testMultiDBTxn() throws Throwable {
     replica.run("drop database " + dbName1 + " cascade");
     replica.run("drop database " + dbName2 + " cascade");
   }
+
+  private void runCompaction(String dbName, String tblName, CompactionType compactionType) throws Throwable {
+    HiveConf hiveConf = new HiveConf(primary.getConf());
+    TxnStore txnHandler = TxnUtils.getTxnStore(hiveConf);
+    txnHandler.compact(new CompactionRequest(dbName, tblName, compactionType));
+    hiveConf.setBoolVar(HiveConf.ConfVars.COMPACTOR_CRUD_QUERY_BASED, false);
+    runWorker(hiveConf);
+    runCleaner(hiveConf);
+  }
+
+  private FileStatus[] getDirsInTableLoc(WarehouseInstance wh, String db, String table) throws Throwable {
+    Path tblLoc = new Path(wh.getTable(db, table).getSd().getLocation());
+    FileSystem fs = tblLoc.getFileSystem(wh.getConf());
+    return fs.listStatus(tblLoc, EximUtil.getDirectoryFilter(fs));
+  }
+
+  @Test
+  public void testAcidTablesBootstrapWithCompaction() throws Throwable {
+    primary.run("use " + primaryDbName)
+        .run("create table t1 (id int) clustered by(id) into 3 buckets stored as orc " +
+            "tblproperties (\"transactional\"=\"true\")")
+        .run("insert into t1 values(1)")
+        .run("insert into t1 values(2)");
+    runCompaction(primaryDbName, "t1", CompactionType.MAJOR);
+    WarehouseInstance.Tuple bootstrapDump = primary.dump(primaryDbName, null);
+    replica.load(replicatedDbName, bootstrapDump.dumpLocation);
+    replica.run("use " + replicatedDbName)
+        .run("show tables")
+        .verifyResults(new String[] {"t1"})
+        .run("repl status " + replicatedDbName)
+        .verifyResult(bootstrapDump.lastReplicationId)
+        .run("select id from t1 order by id")
+        .verifyResults(new String[]{"1", "2"});
+
+    FileStatus[] dirsInLoadPath = getDirsInTableLoc(primary, primaryDbName, "t1");
+    long writeId = -1;
+    for (FileStatus fileStatus : dirsInLoadPath) {
+      if (fileStatus.getPath().getName().startsWith(AcidUtils.BASE_PREFIX)) {
+        writeId = AcidUtils.ParsedBase.parseBase(fileStatus.getPath()).getWriteId();
+        assertTrue(AcidUtils.getVisibilityTxnId(fileStatus.getPath().getName()) != -1);
+        break;
+      }
+    }
+    // Compaction has run, so the source table must have a base directory (with a visibility suffix).
+    assertTrue(writeId != -1);
+
+    dirsInLoadPath = getDirsInTableLoc(replica, replicatedDbName, "t1");
+    for (FileStatus fileStatus : dirsInLoadPath) {
+      if (fileStatus.getPath().getName().startsWith(AcidUtils.BASE_PREFIX)) {
+        assertTrue(writeId == AcidUtils.ParsedBase.parseBase(fileStatus.getPath()).getWriteId());
+        assertTrue(AcidUtils.getVisibilityTxnId(fileStatus.getPath().getName()) == -1);
+        writeId = -1;
+        break;
+      }
+    }
+    // writeId is reset to -1 only if the replica-side base directory was found and verified above.
+    assertTrue(writeId == -1);
+  }
 }
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/AcidUtils.java b/ql/src/java/org/apache/hadoop/hive/ql/io/AcidUtils.java
index af8743d0f3..ec33ffbd44 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/io/AcidUtils.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/io/AcidUtils.java
@@ -313,6 +313,15 @@
   static String addVisibilitySuffix(String baseOrDeltaDir, long visibilityTxnId) {
     return baseOrDeltaDir + VISIBILITY_PREFIX + String.format(DELTA_DIGITS, visibilityTxnId);
   }
+
+  public static long getVisibilityTxnId(String filename) {
+    int idxOfv = filename.indexOf(VISIBILITY_PREFIX);
+    if (idxOfv < 0) {
+      return -1;
+    }
+    return Long.parseLong(filename.substring(idxOfv + VISIBILITY_PREFIX.length()));
+  }
+
   /**
    * Represents bucketId and copy_N suffix
    */
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/CopyUtils.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/CopyUtils.java
index 73c863ed1a..6be8733fc2 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/CopyUtils.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/CopyUtils.java
@@ -27,6 +27,7 @@
 import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.metastore.ReplChangeManager;
 import org.apache.hadoop.hive.ql.ErrorMsg;
+import org.apache.hadoop.hive.ql.io.AcidUtils;
 import org.apache.hadoop.hive.ql.metadata.HiveFatalException;
 import org.apache.hadoop.hive.shims.ShimLoader;
 import org.apache.hadoop.hive.shims.Utils;
@@ -463,7 +464,29 @@ public static Path getCopyDestination(ReplChangeManager.FileInfo fileInfo, Path
     String[] subDirs = fileInfo.getSubDir().split(Path.SEPARATOR);
     Path destination = destRoot;
     for (String subDir: subDirs) {
-      destination = new Path(destination, subDir);
+      // If a directory was created by the compactor, its name also carries the compactor's transaction id
+      // (the visibility suffix). During replication that txn id cannot be reused at the target, because a
+      // txn with the same id may be an aborted or live txn there.
+      // Bootstrap load copies only committed data, so the directory can be created with just the write id
+      // and the visibility txn id can be dropped from the directory name.
+      // TODO : Support for incremental load flow. This can be done once replication of compaction is decided.
+      if (AcidUtils.getVisibilityTxnId(subDir) > 0) {
+        if (subDir.startsWith(AcidUtils.BASE_PREFIX)) {
+          AcidUtils.ParsedBase pb = AcidUtils.ParsedBase.parseBase(new Path(subDir));
+          destination = new Path(destination, AcidUtils.baseDir(pb.getWriteId()));
+        } else if (subDir.startsWith(AcidUtils.DELTA_PREFIX)) {
+          AcidUtils.ParsedDeltaLight pdl = AcidUtils.ParsedDeltaLight.parse(new Path(subDir));
+          destination = new Path(destination, AcidUtils.deltaSubdir(pdl.getMinWriteId(), pdl.getMaxWriteId()));
+        } else if (subDir.startsWith(AcidUtils.DELETE_DELTA_PREFIX)) {
+          AcidUtils.ParsedDeltaLight pdl = AcidUtils.ParsedDeltaLight.parse(new Path(subDir));
+          destination = new Path(destination, AcidUtils.deleteDeltaSubdir(pdl.getMinWriteId(), pdl.getMaxWriteId()));
+        } else {
+          LOG.error("Invalid directory prefix " + subDir);
+          assert (false); // should never happen
+        }
+      } else {
+        destination = new Path(destination, subDir);
+      }
     }
     return destination;
   }
diff --git a/ql/src/test/org/apache/hadoop/hive/ql/io/TestAcidUtils.java b/ql/src/test/org/apache/hadoop/hive/ql/io/TestAcidUtils.java
index c5faec5e95..ba253984ed 100644
--- a/ql/src/test/org/apache/hadoop/hive/ql/io/TestAcidUtils.java
+++ b/ql/src/test/org/apache/hadoop/hive/ql/io/TestAcidUtils.java
@@ -648,6 +648,16 @@ public void testAcidOperationalPropertiesSettersAndGetters() throws Exception {
     assertEquals(1, AcidUtils.getAcidOperationalProperties(parameters).toInt());
   }
 
+  @Test
+  public void testGetVisibilityTxnId() {
+    assertEquals(6, AcidUtils.getVisibilityTxnId("base_0000002_v0000006"));
+    assertEquals(-1, AcidUtils.getVisibilityTxnId("base_0000102"));
+    assertEquals(60, AcidUtils.getVisibilityTxnId("delta_0000010_v0000060"));
+    assertEquals(-1, AcidUtils.getVisibilityTxnId("delta_0000102"));
+    assertEquals(1000060, AcidUtils.getVisibilityTxnId("delete_delta_0000010_v1000060"));
+    assertEquals(-1, AcidUtils.getVisibilityTxnId("delete_delta_0000102"));
+  }
+
   /**
    * See {@link TestOrcRawRecordMerger#testGetLogicalLength()}
    */
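
Reviewer note: the sketch below is not part of the patch. It is a small, self-contained Java approximation of what the new AcidUtils.getVisibilityTxnId helper and the CopyUtils.getCopyDestination change do to a compacted directory name during bootstrap load: read the _v<txnId> visibility suffix written by the compactor and rebuild the target-side name without it. The class name VisibilitySuffixDemo is made up for illustration, and the plain string stripping is a simplification; the patch itself rebuilds the name through AcidUtils.baseDir/deltaSubdir/deleteDeltaSubdir.

public class VisibilitySuffixDemo {
  // Compacted directories look like base_0000002_v0000006; "_v" mirrors AcidUtils.VISIBILITY_PREFIX.
  private static final String VISIBILITY_PREFIX = "_v";

  // Mirrors AcidUtils.getVisibilityTxnId: the compactor's txn id, or -1 when there is no suffix.
  static long visibilityTxnId(String dirName) {
    int idx = dirName.indexOf(VISIBILITY_PREFIX);
    return idx < 0 ? -1 : Long.parseLong(dirName.substring(idx + VISIBILITY_PREFIX.length()));
  }

  // Approximates the bootstrap-copy renaming in CopyUtils.getCopyDestination:
  // drop the visibility suffix so the target does not inherit a source-only txn id.
  static String targetDirName(String dirName) {
    int idx = dirName.indexOf(VISIBILITY_PREFIX);
    return idx < 0 ? dirName : dirName.substring(0, idx);
  }

  public static void main(String[] args) {
    String compactedBase = "base_0000002_v0000006";        // written by the compactor on the source
    System.out.println(visibilityTxnId(compactedBase));    // 6
    System.out.println(targetDirName(compactedBase));      // base_0000002, the name created on the replica
    System.out.println(visibilityTxnId("delta_0000102"));  // -1, not a compactor-created directory
  }
}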