diff --git a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/BaseReplicationScenariosAcidTables.java b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/BaseReplicationScenariosAcidTables.java
index 8593480724..8a4bb38e97 100644
--- a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/BaseReplicationScenariosAcidTables.java
+++ b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/BaseReplicationScenariosAcidTables.java
@@ -376,8 +376,8 @@ void verifyCompactionQueue(Map tables, String dbName, HiveConf con
       Assert.assertEquals(TxnDbUtil.queryToString(conf, "select * from COMPACTION_QUEUE"),
               entry.getValue().longValue(), TxnDbUtil.countQueryAgent(conf,
-              "select count(*) from COMPACTION_QUEUE where cq_database = '" + dbName
-                      + "' and cq_table = '" + entry.getKey() + "'"));
+              "select count(*) from COMPACTION_QUEUE where cq_database = '" + dbName.toLowerCase()
+                      + "' and cq_table = '" + entry.getKey().toLowerCase() + "'"));
     }
   }
 }
diff --git a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosAcidTables.java b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosAcidTables.java
index 63b32c83db..5ea393272a 100644
--- a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosAcidTables.java
+++ b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosAcidTables.java
@@ -17,11 +17,15 @@
  */
 package org.apache.hadoop.hive.ql.parse;
 
+import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hdfs.MiniDFSCluster;
 import org.apache.hadoop.hive.cli.CliSessionState;
 import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.metastore.api.CompactionRequest;
+import org.apache.hadoop.hive.metastore.api.CompactionType;
+import org.apache.hadoop.hive.metastore.api.AbortTxnsRequest;
 import org.apache.hadoop.hive.metastore.conf.MetastoreConf;
 import org.apache.hadoop.hive.metastore.messaging.json.gzip.GzipJSONMessageEncoder;
 import org.apache.hadoop.hive.metastore.txn.TxnStore;
@@ -33,8 +37,11 @@
 import org.apache.hadoop.hive.ql.ErrorMsg;
 import org.apache.hadoop.hive.ql.IDriver;
 import org.apache.hadoop.hive.ql.processors.CommandProcessorException;
+import org.apache.hadoop.hive.ql.io.AcidUtils;
+import org.apache.hadoop.hive.ql.processors.CommandProcessorResponse;
 import org.apache.hadoop.hive.ql.session.SessionState;
 import org.apache.hadoop.hive.shims.Utils;
+import org.apache.hadoop.hive.ql.parse.WarehouseInstance;
 
 import org.junit.After;
 import org.junit.Assert;
@@ -50,11 +57,15 @@
 import java.util.Collections;
 import java.util.Map;
 
+import static org.apache.hadoop.hive.ql.TestTxnCommands2.runCleaner;
+import static org.apache.hadoop.hive.ql.TestTxnCommands2.runWorker;
 import static org.apache.hadoop.hive.metastore.ReplChangeManager.SOURCE_OF_REPLICATION;
+import static org.junit.Assert.assertTrue;
 
 /**
  * TestReplicationScenariosAcidTables - test replication for ACID tables.
  */
+
 public class TestReplicationScenariosAcidTables extends BaseReplicationScenariosAcidTables {
 
   @BeforeClass
@@ -670,4 +681,63 @@ public void testMultiDBTxn() throws Throwable {
     replica.run("drop database " + dbName1 + " cascade");
     replica.run("drop database " + dbName2 + " cascade");
   }
+
+  private void runCompaction(String dbName, String tblName, CompactionType compactionType) throws Throwable {
+    HiveConf hiveConf = new HiveConf(primary.getConf());
+    TxnStore txnHandler = TxnUtils.getTxnStore(hiveConf);
+    txnHandler.compact(new CompactionRequest(dbName, tblName, compactionType));
+    hiveConf.setBoolVar(HiveConf.ConfVars.COMPACTOR_CRUD_QUERY_BASED, false);
+    runWorker(hiveConf);
+    runCleaner(hiveConf);
+  }
+
+  private FileStatus[] getDirsInTableLoc(WarehouseInstance wh, String db, String table) throws Throwable {
+    Path tblLoc = new Path(wh.getTable(db, table).getSd().getLocation());
+    FileSystem fs = tblLoc.getFileSystem(wh.getConf());
+    return fs.listStatus(tblLoc, EximUtil.getDirectoryFilter(fs));
+  }
+
+  @Test
+  public void testAcidTablesBootstrapWithCompaction() throws Throwable {
+    String tableName = testName.getMethodName();
+    primary.run("use " + primaryDbName)
+            .run("create table " + tableName + " (id int) clustered by(id) into 3 buckets stored as orc " + "tblproperties (\"transactional\"=\"true\")")
+            .run("insert into " + tableName + " values(1)")
+            .run("insert into " + tableName + " values(2)");
+    runCompaction(primaryDbName, tableName, CompactionType.MAJOR);
+    WarehouseInstance.Tuple bootstrapDump = primary.dump(primaryDbName, null);
+    replica.load(replicatedDbName, bootstrapDump.dumpLocation);
+    replica.run("use " + replicatedDbName)
+            .run("show tables")
+            .verifyResults(new String[] {tableName})
+            .run("repl status " + replicatedDbName)
+            .verifyResult(bootstrapDump.lastReplicationId)
+            .run("select id from " + tableName + " order by id")
+            .verifyResults(new String[]{"1", "2"});
+
+    FileStatus[] dirsInLoadPath = getDirsInTableLoc(primary, primaryDbName, tableName);
+    long writeId = -1;
+    for (FileStatus fileStatus : dirsInLoadPath) {
+      if (fileStatus.getPath().getName().startsWith(AcidUtils.BASE_PREFIX)) {
+        writeId = AcidUtils.ParsedBase.parseBase(fileStatus.getPath()).getWriteId();
+        assertTrue(AcidUtils.getVisibilityTxnId(fileStatus.getPath().getName()) != -1);
+        break;
+      }
+    }
+    // Compaction is done, so there should be a base directory.
+    assertTrue(writeId != -1);
+
+    dirsInLoadPath = getDirsInTableLoc(replica, replicatedDbName, tableName);
+    for (FileStatus fileStatus : dirsInLoadPath) {
+      if (fileStatus.getPath().getName().startsWith(AcidUtils.BASE_PREFIX)) {
+        assertTrue(writeId == AcidUtils.ParsedBase.parseBase(fileStatus.getPath()).getWriteId());
+        assertTrue(AcidUtils.getVisibilityTxnId(fileStatus.getPath().getName()) == -1);
+        writeId = -1;
+        break;
+      }
+    }
+    // Make sure the verification above was actually done (writeId is reset inside the loop).
+    assertTrue(writeId == -1);
+  }
 }
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/AcidUtils.java b/ql/src/java/org/apache/hadoop/hive/ql/io/AcidUtils.java
index 3c508ec6cf..5eaa3a1d42 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/io/AcidUtils.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/io/AcidUtils.java
@@ -364,6 +364,15 @@ static String addVisibilitySuffix(String baseOrDeltaDir, long visibilityTxnId) {
     return baseOrDeltaDir + VISIBILITY_PREFIX + String.format(DELTA_DIGITS, visibilityTxnId);
   }
+
+  public static long getVisibilityTxnId(String filename) {
+    int idxOfv = filename.indexOf(VISIBILITY_PREFIX);
+    if (idxOfv < 0) {
+      return -1;
+    }
+    return Long.parseLong(filename.substring(idxOfv + VISIBILITY_PREFIX.length()));
+  }
+
 
 /**
  * Represents bucketId and copy_N suffix
 */
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/CopyUtils.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/CopyUtils.java
index 73c863ed1a..6be8733fc2 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/CopyUtils.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/CopyUtils.java
@@ -27,6 +27,7 @@
 import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.metastore.ReplChangeManager;
 import org.apache.hadoop.hive.ql.ErrorMsg;
+import org.apache.hadoop.hive.ql.io.AcidUtils;
 import org.apache.hadoop.hive.ql.metadata.HiveFatalException;
 import org.apache.hadoop.hive.shims.ShimLoader;
 import org.apache.hadoop.hive.shims.Utils;
@@ -463,7 +464,29 @@ public static Path getCopyDestination(ReplChangeManager.FileInfo fileInfo, Path
     String[] subDirs = fileInfo.getSubDir().split(Path.SEPARATOR);
     Path destination = destRoot;
     for (String subDir: subDirs) {
-      destination = new Path(destination, subDir);
+      // If the directory was created by the compactor, its name will also carry the compactor's transaction id
+      // (the visibility suffix). In case of replication, the same txn id cannot be reused at the target, as a txn
+      // with the same id might be an aborted or live txn at the target.
+      // In case of bootstrap load, we copy only the committed data, so the directory can be created with just the
+      // write id and the visibility txn id can be dropped from the directory name.
+      // TODO : Support the incremental load flow. This can be done once replication of compaction is decided.
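+      // e.g. (illustrative names) a compacted directory base_0000005_v0000123 at the source is copied as
+      // base_0000005 at the target.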
+      if (AcidUtils.getVisibilityTxnId(subDir) > 0) {
+        if (subDir.startsWith(AcidUtils.BASE_PREFIX)) {
+          AcidUtils.ParsedBase pb = AcidUtils.ParsedBase.parseBase(new Path(subDir));
+          destination = new Path(destination, AcidUtils.baseDir(pb.getWriteId()));
+        } else if (subDir.startsWith(AcidUtils.DELTA_PREFIX)) {
+          AcidUtils.ParsedDeltaLight pdl = AcidUtils.ParsedDeltaLight.parse(new Path(subDir));
+          destination = new Path(destination, AcidUtils.deltaSubdir(pdl.getMinWriteId(), pdl.getMaxWriteId()));
+        } else if (subDir.startsWith(AcidUtils.DELETE_DELTA_PREFIX)) {
+          AcidUtils.ParsedDeltaLight pdl = AcidUtils.ParsedDeltaLight.parse(new Path(subDir));
+          destination = new Path(destination, AcidUtils.deleteDeltaSubdir(pdl.getMinWriteId(), pdl.getMaxWriteId()));
+        } else {
+          LOG.error("Invalid directory prefix " + subDir);
+          assert (false); // should never happen
+        }
+      } else {
+        destination = new Path(destination, subDir);
+      }
     }
     return destination;
   }
diff --git a/ql/src/test/org/apache/hadoop/hive/ql/io/TestAcidUtils.java b/ql/src/test/org/apache/hadoop/hive/ql/io/TestAcidUtils.java
index 9e6d47ebc5..36b0718d5c 100644
--- a/ql/src/test/org/apache/hadoop/hive/ql/io/TestAcidUtils.java
+++ b/ql/src/test/org/apache/hadoop/hive/ql/io/TestAcidUtils.java
@@ -692,6 +692,16 @@ public void testAcidOperationalPropertiesSettersAndGetters() throws Exception {
     assertEquals(1, AcidUtils.getAcidOperationalProperties(parameters).toInt());
   }
+  @Test
+  public void testgetVisibilityTxnId() {
+    assertEquals(6, AcidUtils.getVisibilityTxnId("base_0000002_v0000006"));
+    assertEquals(-1, AcidUtils.getVisibilityTxnId("base_0000102"));
+    assertEquals(60, AcidUtils.getVisibilityTxnId("delta_0000010_v0000060"));
+    assertEquals(-1, AcidUtils.getVisibilityTxnId("delta_0000102"));
+    assertEquals(1000060, AcidUtils.getVisibilityTxnId("delete_delta_0000010_v1000060"));
+    assertEquals(-1, AcidUtils.getVisibilityTxnId("delete_delta_0000102"));
+  }
+
 
   /**
    * See {@link TestOrcRawRecordMerger#testGetLogicalLength()}
   */
diff --git a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java
index 5fb6d863d1..552509ca22 100644
--- a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java
+++ b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java
@@ -1467,7 +1467,7 @@ public void replTableWriteIdState(ReplTblWriteIdStateRequest rqst) throws MetaEx
       // Schedule Major compaction on all the partitions/table to clean aborted data
       if (numAbortedWrites > 0) {
-        CompactionRequest compactRqst = new CompactionRequest(rqst.getDbName(), rqst.getTableName(),
+        CompactionRequest compactRqst = new CompactionRequest(dbName, tblName,
                 CompactionType.MAJOR);
         if (rqst.isSetPartNames()) {
           for (String partName : rqst.getPartNames()) {
@@ -3044,7 +3044,8 @@ public CompactionResponse compact(CompactionRequest rqst) throws MetaException {
       }
       pst = sqlGenerator.prepareStmtWithParameters(dbConn, sb.toString(), params);
-      LOG.debug("Going to execute query <" + sb.toString() + ">");
+      LOG.debug("Going to execute query <" + sb.toString().replaceAll("\\?", "{}") + ">",
+          rqst.getDbname(), rqst.getTablename(), rqst.getPartitionname());
       ResultSet rs = pst.executeQuery();
       if(rs.next()) {
         long enqueuedId = rs.getLong(1);
@@ -3107,7 +3108,8 @@ public CompactionResponse compact(CompactionRequest rqst) throws MetaException {
buf.append(")"); String s = buf.toString(); pst = sqlGenerator.prepareStmtWithParameters(dbConn, s, params); - LOG.debug("Going to execute update <" + s + ">"); + LOG.debug("Going to execute update <" + s.replaceAll("\\?", "{}") + ">", + rqst.getDbname(), rqst.getTablename(), partName, rqst.getProperties(), rqst.getRunas()); pst.executeUpdate(); LOG.debug("Going to commit"); dbConn.commit();