diff --git a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/BaseReplicationScenariosAcidTables.java b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/BaseReplicationScenariosAcidTables.java
index 8593480724..8a4bb38e97 100644
--- a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/BaseReplicationScenariosAcidTables.java
+++ b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/BaseReplicationScenariosAcidTables.java
@@ -376,8 +376,8 @@ void verifyCompactionQueue(Map<String, Long> tables, String dbName, HiveConf con
       Assert.assertEquals(TxnDbUtil.queryToString(conf, "select * from COMPACTION_QUEUE"),
           entry.getValue().longValue(), TxnDbUtil.countQueryAgent(conf,
-              "select count(*) from COMPACTION_QUEUE where cq_database = '" + dbName
-                      + "' and cq_table = '" + entry.getKey() + "'"));
+              "select count(*) from COMPACTION_QUEUE where cq_database = '" + dbName.toLowerCase()
+                      + "' and cq_table = '" + entry.getKey().toLowerCase() + "'"));
     }
   }
 }
diff --git a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosAcidTables.java b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosAcidTables.java
index 63b32c83db..bb31fe2a07 100644
--- a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosAcidTables.java
+++ b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosAcidTables.java
@@ -17,11 +17,17 @@
  */
 package org.apache.hadoop.hive.ql.parse;
 
+import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hdfs.MiniDFSCluster;
 import org.apache.hadoop.hive.cli.CliSessionState;
 import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.metastore.Warehouse;
+import org.apache.hadoop.hive.metastore.api.CompactionRequest;
+import org.apache.hadoop.hive.metastore.api.CompactionType;
+import org.apache.hadoop.hive.metastore.api.Partition;
+import org.apache.hadoop.hive.metastore.api.Table;
 import org.apache.hadoop.hive.metastore.conf.MetastoreConf;
 import org.apache.hadoop.hive.metastore.messaging.json.gzip.GzipJSONMessageEncoder;
 import org.apache.hadoop.hive.metastore.txn.TxnStore;
@@ -33,6 +39,7 @@
 import org.apache.hadoop.hive.ql.ErrorMsg;
 import org.apache.hadoop.hive.ql.IDriver;
 import org.apache.hadoop.hive.ql.processors.CommandProcessorException;
+import org.apache.hadoop.hive.ql.io.AcidUtils;
 import org.apache.hadoop.hive.ql.session.SessionState;
 import org.apache.hadoop.hive.shims.Utils;
 
@@ -50,11 +57,15 @@
 import java.util.Collections;
 import java.util.Map;
 
+import static org.apache.hadoop.hive.ql.TestTxnCommands2.runCleaner;
+import static org.apache.hadoop.hive.ql.TestTxnCommands2.runWorker;
 import static org.apache.hadoop.hive.metastore.ReplChangeManager.SOURCE_OF_REPLICATION;
+import static org.junit.Assert.assertTrue;
 
 /**
  * TestReplicationScenariosAcidTables - test replication for ACID tables.
 */
+
 public class TestReplicationScenariosAcidTables extends BaseReplicationScenariosAcidTables {
 
   @BeforeClass
@@ -670,4 +681,164 @@ public void testMultiDBTxn() throws Throwable {
     replica.run("drop database " + dbName1 + " cascade");
     replica.run("drop database " + dbName2 + " cascade");
   }
+
+  private void runCompaction(String dbName, String tblName, String partName, CompactionType compactionType)
+      throws Throwable {
+    HiveConf hiveConf = new HiveConf(primary.getConf());
+    TxnStore txnHandler = TxnUtils.getTxnStore(hiveConf);
+    CompactionRequest rqst = new CompactionRequest(dbName, tblName, compactionType);
+    rqst.setPartitionname(partName);
+    txnHandler.compact(rqst);
+    hiveConf.setBoolVar(HiveConf.ConfVars.COMPACTOR_CRUD_QUERY_BASED, false);
+    runWorker(hiveConf);
+    runCleaner(hiveConf);
+  }
+
+  private FileStatus[] getDirsInTableLoc(WarehouseInstance wh, String db, String table) throws Throwable {
+    Path tblLoc = new Path(wh.getTable(db, table).getSd().getLocation());
+    FileSystem fs = tblLoc.getFileSystem(wh.getConf());
+    return fs.listStatus(tblLoc, EximUtil.getDirectoryFilter(fs));
+  }
+
+  private FileStatus[] getDirsInPartitionLoc(WarehouseInstance wh, Partition partition)
+      throws Throwable {
+    Path tblLoc = new Path(partition.getSd().getLocation());
+    FileSystem fs = tblLoc.getFileSystem(wh.getConf());
+    return fs.listStatus(tblLoc, EximUtil.getDirectoryFilter(fs));
+  }
+
+  private long getMinorCompactedTxnId(FileStatus[] fileStatuses) {
+    for (FileStatus fileStatus : fileStatuses) {
+      if (fileStatus.getPath().getName().startsWith(AcidUtils.DELTA_PREFIX)) {
+        AcidUtils.ParsedDeltaLight delta = AcidUtils.ParsedDelta.parse(fileStatus.getPath());
+        if (delta.getVisibilityTxnId() != 0) {
+          return delta.getVisibilityTxnId();
+        }
+      }
+    }
+    return -1;
+  }
+
+  private long getMajorCompactedWriteId(FileStatus[] fileStatuses, boolean replica) {
+    for (FileStatus fileStatus : fileStatuses) {
+      if (fileStatus.getPath().getName().startsWith(AcidUtils.BASE_PREFIX)) {
+        long writeId = AcidUtils.ParsedBase.parseBase(fileStatus.getPath()).getWriteId();
+        if (replica) {
+          // for replica database, visibility txn id should be removed during repl copy.
+          assertTrue(AcidUtils.getVisibilityTxnId(fileStatus.getPath().getName()) == -1);
+        } else {
+          assertTrue(AcidUtils.getVisibilityTxnId(fileStatus.getPath().getName()) != -1);
+        }
+        return writeId;
+      }
+    }
+    return -1;
+  }
+
+  @Test
+  public void testAcidTablesBootstrapWithMajorCompaction() throws Throwable {
+    String tableName = testName.getMethodName();
+    String tableNamepart = testName.getMethodName() + "_part";
+    primary.run("use " + primaryDbName)
+        .run("create table " + tableName + " (id int) clustered by(id) into 3 buckets stored as orc " +
+            "tblproperties (\"transactional\"=\"true\")")
+        .run("insert into " + tableName + " values(1)")
+        .run("insert into " + tableName + " values(2)")
+        .run("create table " + tableNamepart + " (id int) partitioned by (part int) clustered by(id) " +
+            "into 3 buckets stored as orc " +
+            "tblproperties (\"transactional\"=\"true\") ")
+        .run("insert into " + tableNamepart + " values(1, 2)")
+        .run("insert into " + tableNamepart + " values(2, 2)");
+
+    runCompaction(primaryDbName, tableName, null, CompactionType.MAJOR);
+
+    List<Partition> partList = primary.getAllPartitions(primaryDbName, tableNamepart);
+    for (Partition part : partList) {
+      Table tbl = primary.getTable(primaryDbName, tableNamepart);
+      String partName = Warehouse.makePartName(tbl.getPartitionKeys(), part.getValues());
+      runCompaction(primaryDbName, tableNamepart, partName, CompactionType.MAJOR);
+    }
+
+    WarehouseInstance.Tuple dump = primary.dump(primaryDbName, null);
+    replica.load(replicatedDbName, dump.dumpLocation);
+    replica.run("use " + replicatedDbName)
+        .run("show tables")
+        .verifyResults(new String[] {tableName, tableNamepart})
+        .run("repl status " + replicatedDbName)
+        .verifyResult(dump.lastReplicationId)
+        .run("select id from " + tableName + " order by id")
+        .verifyResults(new String[]{"1", "2"})
+        .run("select id from " + tableNamepart + " order by id")
+        .verifyResults(new String[]{"1", "2"});
+
+    FileStatus[] fileStatuses = getDirsInTableLoc(primary, primaryDbName, tableName);
+    long writeId = getMajorCompactedWriteId(fileStatuses, false);
+    assertTrue(writeId != -1);
+
+    fileStatuses = getDirsInTableLoc(replica, replicatedDbName, tableName);
+    // replica write id should be same as source write id.
+    assertTrue(writeId == getMajorCompactedWriteId(fileStatuses, true));
+
+    // check for partitioned table.
+    for (Partition part : partList) {
+      fileStatuses = getDirsInPartitionLoc(primary, part);
+      writeId = getMajorCompactedWriteId(fileStatuses, false);
+      assertTrue(writeId != -1);
+      Partition partReplica = replica.getPartition(replicatedDbName, tableNamepart, part.getValues());
+      fileStatuses = getDirsInPartitionLoc(replica, partReplica);
+      assertTrue(writeId == getMajorCompactedWriteId(fileStatuses, true));
+    }
+  }
+
+  @Test
+  public void testAcidTablesBootstrapWithMinorCompaction() throws Throwable {
+    String tableName = testName.getMethodName();
+    String tableNamepart = testName.getMethodName() + "_part";
+    primary.run("use " + primaryDbName)
+        .run("create table " + tableName + " (id int) clustered by(id) into 3 buckets stored as orc " +
+            "tblproperties (\"transactional\"=\"true\")")
+        .run("insert into " + tableName + " values(1)")
+        .run("insert into " + tableName + " values(2)")
+        .run("create table " + tableNamepart + " (id int) partitioned by (part int) clustered by(id) " +
+            "into 3 buckets stored as orc " +
+            "tblproperties (\"transactional\"=\"true\") ")
+        .run("insert into " + tableNamepart + " values(1, 2)")
+        .run("insert into " + tableNamepart + " values(2, 2)");
+
+    runCompaction(primaryDbName, tableName, null, CompactionType.MINOR);
+
+    List<Partition> partList = primary.getAllPartitions(primaryDbName, tableNamepart);
+    for (Partition part : partList) {
+      Table tbl = primary.getTable(primaryDbName, tableNamepart);
+      String partName = Warehouse.makePartName(tbl.getPartitionKeys(), part.getValues());
+      runCompaction(primaryDbName, tableNamepart, partName, CompactionType.MINOR);
+    }
+
+    WarehouseInstance.Tuple dump = primary.dump(primaryDbName, null);
+    replica.load(replicatedDbName, dump.dumpLocation);
+    replica.run("use " + replicatedDbName)
+        .run("show tables")
+        .verifyResults(new String[] {tableName, tableNamepart})
+        .run("repl status " + replicatedDbName)
+        .verifyResult(dump.lastReplicationId)
+        .run("select id from " + tableName + " order by id")
+        .verifyResults(new String[]{"1", "2"})
+        .run("select id from " + tableNamepart + " order by id")
+        .verifyResults(new String[]{"1", "2"});
+
+    FileStatus[] fileStatuses = getDirsInTableLoc(primary, primaryDbName, tableName);
+    assertTrue(-1 != getMinorCompactedTxnId(fileStatuses));
+
+    fileStatuses = getDirsInTableLoc(replica, replicatedDbName, tableName);
+    Assert.assertEquals(-1, getMinorCompactedTxnId(fileStatuses));
+
+    // check for partitioned table.
+    for (Partition part : partList) {
+      fileStatuses = getDirsInPartitionLoc(primary, part);
+      assertTrue(-1 != getMinorCompactedTxnId(fileStatuses));
+      Partition partReplica = replica.getPartition(replicatedDbName, tableNamepart, part.getValues());
+      fileStatuses = getDirsInPartitionLoc(replica, partReplica);
+      assertTrue(-1 == getMinorCompactedTxnId(fileStatuses));
+    }
+  }
 }
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/AcidUtils.java b/ql/src/java/org/apache/hadoop/hive/ql/io/AcidUtils.java
index 3c508ec6cf..5eaa3a1d42 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/io/AcidUtils.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/io/AcidUtils.java
@@ -364,6 +364,15 @@ static String addVisibilitySuffix(String baseOrDeltaDir, long visibilityTxnId) {
     return baseOrDeltaDir + VISIBILITY_PREFIX + String.format(DELTA_DIGITS, visibilityTxnId);
   }
+
+  public static long getVisibilityTxnId(String filename) {
+    int idxOfv = filename.indexOf(VISIBILITY_PREFIX);
+    if (idxOfv < 0) {
+      return -1;
+    }
+    return Long.parseLong(filename.substring(idxOfv + VISIBILITY_PREFIX.length()));
+  }
+
   /**
    * Represents bucketId and copy_N suffix
    */
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/CopyUtils.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/CopyUtils.java
index 73c863ed1a..b7bae68535 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/CopyUtils.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/CopyUtils.java
@@ -27,6 +27,7 @@
 import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.metastore.ReplChangeManager;
 import org.apache.hadoop.hive.ql.ErrorMsg;
+import org.apache.hadoop.hive.ql.io.AcidUtils;
 import org.apache.hadoop.hive.ql.metadata.HiveFatalException;
 import org.apache.hadoop.hive.shims.ShimLoader;
 import org.apache.hadoop.hive.shims.Utils;
@@ -463,7 +464,30 @@ public static Path getCopyDestination(ReplChangeManager.FileInfo fileInfo, Path
     String[] subDirs = fileInfo.getSubDir().split(Path.SEPARATOR);
     Path destination = destRoot;
     for (String subDir: subDirs) {
-      destination = new Path(destination, subDir);
+      // If the directory was created by the compactor, then its name also carries the visibility
+      // transaction id. In case of replication, the same txn id cannot be used at the target, as the
+      // txn with the same id might be an aborted or live txn at the target.
+      // In case of bootstrap load, we copy only the committed data, so a directory with only the
+      // write id can be created. The visibility txn id can be removed from the directory name.
+      // TODO : As of now compaction is not replicated, so this logic works. Need to revisit the logic
+      // once we start replicating compaction in incremental load.
+      if (AcidUtils.getVisibilityTxnId(subDir) > 0) {
+        if (subDir.startsWith(AcidUtils.BASE_PREFIX)) {
+          AcidUtils.ParsedBase pb = AcidUtils.ParsedBase.parseBase(new Path(subDir));
+          destination = new Path(destination, AcidUtils.baseDir(pb.getWriteId()));
+        } else if (subDir.startsWith(AcidUtils.DELTA_PREFIX)) {
+          AcidUtils.ParsedDeltaLight pdl = AcidUtils.ParsedDeltaLight.parse(new Path(subDir));
+          destination = new Path(destination, AcidUtils.deltaSubdir(pdl.getMinWriteId(), pdl.getMaxWriteId()));
+        } else if (subDir.startsWith(AcidUtils.DELETE_DELTA_PREFIX)) {
+          AcidUtils.ParsedDeltaLight pdl = AcidUtils.ParsedDeltaLight.parse(new Path(subDir));
+          destination = new Path(destination, AcidUtils.deleteDeltaSubdir(pdl.getMinWriteId(), pdl.getMaxWriteId()));
+        } else {
+          LOG.error("Invalid directory prefix " + subDir);
+          assert (false); // should never happen
+        }
+      } else {
+        destination = new Path(destination, subDir);
+      }
     }
     return destination;
   }
diff --git a/ql/src/test/org/apache/hadoop/hive/ql/io/TestAcidUtils.java b/ql/src/test/org/apache/hadoop/hive/ql/io/TestAcidUtils.java
index 9e6d47ebc5..36b0718d5c 100644
--- a/ql/src/test/org/apache/hadoop/hive/ql/io/TestAcidUtils.java
+++ b/ql/src/test/org/apache/hadoop/hive/ql/io/TestAcidUtils.java
@@ -692,6 +692,16 @@ public void testAcidOperationalPropertiesSettersAndGetters() throws Exception {
     assertEquals(1, AcidUtils.getAcidOperationalProperties(parameters).toInt());
   }
 
+  @Test
+  public void testGetVisibilityTxnId() {
+    assertEquals(6, AcidUtils.getVisibilityTxnId("base_0000002_v0000006"));
+    assertEquals(-1, AcidUtils.getVisibilityTxnId("base_0000102"));
+    assertEquals(60, AcidUtils.getVisibilityTxnId("delta_0000010_v0000060"));
+    assertEquals(-1, AcidUtils.getVisibilityTxnId("delta_0000102"));
+    assertEquals(1000060, AcidUtils.getVisibilityTxnId("delete_delta_0000010_v1000060"));
+    assertEquals(-1, AcidUtils.getVisibilityTxnId("delete_delta_0000102"));
+  }
+
   /**
    * See {@link TestOrcRawRecordMerger#testGetLogicalLength()}
    */
diff --git a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java
index 64a542997b..7cc81e7e35 100644
--- a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java
+++ b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java
@@ -1488,7 +1488,7 @@ public void replTableWriteIdState(ReplTblWriteIdStateRequest rqst) throws MetaEx
     // Schedule Major compaction on all the partitions/table to clean aborted data
     if (numAbortedWrites > 0) {
-      CompactionRequest compactRqst = new CompactionRequest(rqst.getDbName(), rqst.getTableName(),
+      CompactionRequest compactRqst = new CompactionRequest(dbName, tblName,
           CompactionType.MAJOR);
       if (rqst.isSetPartNames()) {
         for (String partName : rqst.getPartNames()) {
@@ -3068,7 +3068,8 @@ public CompactionResponse compact(CompactionRequest rqst) throws MetaException {
       }
       pst = sqlGenerator.prepareStmtWithParameters(dbConn, sb.toString(), params);
-      LOG.debug("Going to execute query <" + sb.toString() + ">");
+      LOG.debug("Going to execute query <" + sb.toString().replaceAll("\\?", "{}") + ">",
+          rqst.getDbname(), rqst.getTablename(), rqst.getPartitionname());
       ResultSet rs = pst.executeQuery();
       if(rs.next()) {
         long enqueuedId = rs.getLong(1);
@@ -3131,7 +3132,8 @@
buf.append(")"); String s = buf.toString(); pst = sqlGenerator.prepareStmtWithParameters(dbConn, s, params); - LOG.debug("Going to execute update <" + s + ">"); + LOG.debug("Going to execute update <" + s.replaceAll("\\?", "{}") + ">", + rqst.getDbname(), rqst.getTablename(), partName, rqst.getProperties(), rqst.getRunas()); pst.executeUpdate(); LOG.debug("Going to commit"); dbConn.commit();