diff --git a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java index 9d41790dff..cd8c1ed4c4 100644 --- a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java +++ b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java @@ -489,6 +489,12 @@ private static void populateLlapDaemonVarsSet(Set<String> llapDaemonVarsSetLocal + "used in conjunction with 'hive.repl.dump.metadata.only' set to false. if 'hive.repl.dump.metadata.only' \n" + " is set to true then this config parameter has no effect as external table meta data is flushed \n" + " always by default."), + REPL_ENABLE_MOVE_OPTIMIZATION("hive.repl.enable.move.optimization", false, + "If it is set to true, REPL LOAD copies data files directly to the target table/partition location \n" + + "instead of copying them to a staging directory first and then moving them to the target location. This optimizes \n" + + " REPL LOAD on object stores such as S3 or WASB, where creating a directory and moving \n" + + " files are costly operations. On a file system like HDFS, where the move operation is atomic, this \n" + + " optimization should not be enabled as it may lead to inconsistent data reads for non-ACID tables."), LOCALSCRATCHDIR("hive.exec.local.scratchdir", "${system:java.io.tmpdir}" + File.separator + "${system:user.name}", "Local scratch space for Hive jobs"), diff --git a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenarios.java b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenarios.java index 3d509f3532..a9783abe10 100644 --- a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenarios.java +++ b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenarios.java @@ -3225,6 +3225,93 @@ public void testRecycleFileNonReplDatabase() throws IOException { assertTrue(fileCount == fileCountAfter); } + @Test + public void testMoveOptimizationBootstrap() throws IOException { + String name = testName.getMethodName(); + String dbName = createDB(name, driver); + String tableNameNoPart = dbName + "_no_part"; + String tableNamePart = dbName + "_part"; + + run(" use " + dbName, driver); + run("CREATE TABLE " + tableNameNoPart + " (fld int) STORED AS TEXTFILE", driver); + run("CREATE TABLE " + tableNamePart + " (fld int) partitioned by (part int) STORED AS TEXTFILE", driver); + + run("insert into " + tableNameNoPart + " values (1) ", driver); + run("insert into " + tableNameNoPart + " values (2) ", driver); + verifyRun("SELECT fld from " + tableNameNoPart , new String[]{ "1" , "2" }, driver); + + run("insert into " + tableNamePart + " partition (part=10) values (1) ", driver); + run("insert into " + tableNamePart + " partition (part=10) values (2) ", driver); + run("insert into " + tableNamePart + " partition (part=11) values (3) ", driver); + verifyRun("SELECT fld from " + tableNamePart , new String[]{ "1" , "2" , "3"}, driver); + verifyRun("SELECT fld from " + tableNamePart + " where part = 10" , new String[]{ "1" , "2"}, driver); + verifyRun("SELECT fld from " + tableNamePart + " where part = 11" , new String[]{ "3" }, driver); + + String replDbName = dbName + "_replica"; + Tuple dump = replDumpDb(dbName, null, null, null); + run("REPL LOAD " + replDbName + " FROM '" + dump.dumpLocation + + "' with ('hive.repl.enable.move.optimization'='true')", driverMirror); + verifyRun("REPL STATUS " + replDbName, dump.lastReplId, driverMirror); + + run(" use " + replDbName, driverMirror); + 
verifyRun("SELECT fld from " + tableNamePart , new String[]{ "1" , "2" , "3"}, driverMirror); + verifyRun("SELECT fld from " + tableNamePart + " where part = 10" , new String[]{ "1" , "2"}, driverMirror); + verifyRun("SELECT fld from " + tableNamePart + " where part = 11" , new String[]{ "3" }, driverMirror); + verifyRun("SELECT fld from " + tableNameNoPart , new String[]{ "1" , "2" }, driverMirror); + verifyRun("SELECT count(*) from " + tableNamePart , new String[]{ "3"}, driverMirror); + verifyRun("SELECT count(*) from " + tableNamePart + " where part = 10" , new String[]{ "2"}, driverMirror); + verifyRun("SELECT count(*) from " + tableNamePart + " where part = 11" , new String[]{ "1" }, driverMirror); + verifyRun("SELECT count(*) from " + tableNameNoPart , new String[]{ "2" }, driverMirror); + } + + @Test + public void testMoveOptimizationIncremental() throws IOException { + String testName = "testMoveOptimizationIncremental"; + String dbName = createDB(testName, driver); + String replDbName = dbName + "_replica"; + + Tuple bootstrapDump = bootstrapLoadAndVerify(dbName, replDbName); + String replDumpId = bootstrapDump.lastReplId; + + String[] unptn_data = new String[] { "eleven", "twelve" }; + + run("CREATE TABLE " + dbName + ".unptned(a string) STORED AS TEXTFILE", driver); + run("INSERT INTO TABLE " + dbName + ".unptned values('" + unptn_data[0] + "')", driver); + run("INSERT INTO TABLE " + dbName + ".unptned values('" + unptn_data[1] + "')", driver); + verifySetup("SELECT a from " + dbName + ".unptned ORDER BY a", unptn_data, driver); + + run("CREATE TABLE " + dbName + ".unptned_late AS SELECT * FROM " + dbName + ".unptned", driver); + verifySetup("SELECT * from " + dbName + ".unptned_late ORDER BY a", unptn_data, driver); + + Tuple incrementalDump = replDumpDb(dbName, replDumpId, null, null); + run("REPL LOAD " + replDbName + " FROM '" + incrementalDump.dumpLocation + + "' with ('hive.repl.enable.move.optimization'='true')", driverMirror); + verifyRun("REPL STATUS " + replDbName, incrementalDump.lastReplId, driverMirror); + replDumpId = incrementalDump.lastReplId; + + verifyRun("SELECT a from " + replDbName + ".unptned ORDER BY a", unptn_data, driverMirror); + verifyRun("SELECT a from " + replDbName + ".unptned_late ORDER BY a", unptn_data, driverMirror); + verifyRun("SELECT count(*) from " + replDbName + ".unptned ", "2", driverMirror); + verifyRun("SELECT count(*) from " + replDbName + ".unptned_late", "2", driverMirror); + + String[] unptn_data_after_ins = new String[] { "eleven", "thirteen", "twelve" }; + String[] data_after_ovwrite = new String[] { "hundred" }; + run("INSERT INTO TABLE " + dbName + ".unptned_late values('" + unptn_data_after_ins[1] + "')", driver); + verifySetup("SELECT a from " + dbName + ".unptned_late ORDER BY a", unptn_data_after_ins, driver); + run("INSERT OVERWRITE TABLE " + dbName + ".unptned values('" + data_after_ovwrite[0] + "')", driver); + verifySetup("SELECT a from " + dbName + ".unptned", data_after_ovwrite, driver); + + incrementalDump = replDumpDb(dbName, replDumpId, null, null); + run("REPL LOAD " + replDbName + " FROM '" + incrementalDump.dumpLocation + + "' with ('hive.repl.enable.move.optimization'='true')", driverMirror); + verifyRun("REPL STATUS " + replDbName, incrementalDump.lastReplId, driverMirror); + + verifyRun("SELECT a from " + replDbName + ".unptned_late ORDER BY a", unptn_data_after_ins, driverMirror); + verifyRun("SELECT a from " + replDbName + ".unptned", data_after_ovwrite, driverMirror); + verifyRun("SELECT count(*) from " + 
replDbName + ".unptned", "1", driverMirror); + verifyRun("SELECT count(*) from " + replDbName + ".unptned_late ", "3", driverMirror); + } + private static String createDB(String name, IDriver myDriver) { LOG.info("Testing " + name); run("CREATE DATABASE " + name + " WITH DBPROPERTIES ( '" + diff --git a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosAcidTables.java b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosAcidTables.java index f074428dd5..e043e5446f 100644 --- a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosAcidTables.java +++ b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosAcidTables.java @@ -141,10 +141,8 @@ public void tearDown() throws Throwable { primary.run("drop database if exists " + primaryDbName + "_extra cascade"); } - @Test - public void testAcidTablesBootstrap() throws Throwable { - WarehouseInstance.Tuple bootstrapDump = primary - .run("use " + primaryDbName) + private WarehouseInstance.Tuple prepareDataAndDump(String primaryDbName, String fromReplId) throws Throwable { + return primary.run("use " + primaryDbName) .run("create table t1 (id int) clustered by(id) into 3 buckets stored as orc " + "tblproperties (\"transactional\"=\"true\")") .run("insert into t1 values(1)") @@ -165,14 +163,15 @@ public void testAcidTablesBootstrap() throws Throwable { .run("insert into t5 values(1111), (2222)") .run("alter table t5 set tblproperties (\"transactional\"=\"true\")") .run("insert into t5 values(3333)") - .dump(primaryDbName, null); + .dump(primaryDbName, fromReplId); + } - replica.load(replicatedDbName, bootstrapDump.dumpLocation) - .run("use " + replicatedDbName) + private void verifyLoadExecution(String replicatedDbName, String lastReplId) throws Throwable { + replica.run("use " + replicatedDbName) .run("show tables") .verifyResults(new String[] {"t1", "t2", "t3", "t4", "t5"}) .run("repl status " + replicatedDbName) - .verifyResult(bootstrapDump.lastReplicationId) + .verifyResult(lastReplId) .run("select id from t1 order by id") .verifyResults(new String[]{"1", "2"}) .run("select country from t2 order by country") @@ -185,6 +184,32 @@ public void testAcidTablesBootstrap() throws Throwable { .verifyResults(new String[] {"1111", "2222", "3333"}); } + @Test + public void testAcidTablesBootstrap() throws Throwable { + WarehouseInstance.Tuple bootstrapDump = prepareDataAndDump(primaryDbName, null); + replica.load(replicatedDbName, bootstrapDump.dumpLocation); + verifyLoadExecution(replicatedDbName, bootstrapDump.lastReplicationId); + } + + @Test + public void testAcidTablesMoveOptimizationBootStrap() throws Throwable { + WarehouseInstance.Tuple bootstrapDump = prepareDataAndDump(primaryDbName, null); + replica.load(replicatedDbName, bootstrapDump.dumpLocation, + Collections.singletonList("'hive.repl.enable.move.optimization'='true'")); + verifyLoadExecution(replicatedDbName, bootstrapDump.lastReplicationId); + } + + @Test + public void testAcidTablesMoveOptimizationIncremental() throws Throwable { + WarehouseInstance.Tuple bootstrapDump = primary.dump(primaryDbName, null); + replica.load(replicatedDbName, bootstrapDump.dumpLocation, + Collections.singletonList("'hive.repl.enable.move.optimization'='true'")); + WarehouseInstance.Tuple incrDump = prepareDataAndDump(primaryDbName, bootstrapDump.lastReplicationId); + replica.load(replicatedDbName, incrDump.dumpLocation, + 
Collections.singletonList("'hive.repl.enable.move.optimization'='true'")); + verifyLoadExecution(replicatedDbName, incrDump.lastReplicationId); + } + @Test public void testAcidTablesBootstrapWithOpenTxnsTimeout() throws Throwable { // Open 5 txns diff --git a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosAcrossInstances.java b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosAcrossInstances.java index 35ad982f20..d983dfa936 100644 --- a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosAcrossInstances.java +++ b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosAcrossInstances.java @@ -35,6 +35,7 @@ import org.apache.hadoop.hive.metastore.api.Database; import org.apache.hadoop.hive.metastore.api.Partition; import org.apache.hadoop.hive.metastore.api.Table; +import org.apache.hadoop.hive.metastore.api.NotificationEvent; import org.junit.After; import org.junit.AfterClass; import org.junit.Before; @@ -1321,6 +1322,114 @@ public Boolean apply(@Nullable CallerArguments args) { .verifyResult(replicatedDbName + ".testFunctionOne"); } + @Test + public void testMoveOptimizationBootstrapReplLoadRetryAfterFailure() throws Throwable { + String replicatedDbName_CM = replicatedDbName + "_CM"; + WarehouseInstance.Tuple tuple = primary + .run("use " + primaryDbName) + .run("create table t2 (place string) partitioned by (country string)") + .run("insert into table t2 partition(country='india') values ('bangalore')") + .dump(primaryDbName, null); + + testMoveOptimization(primaryDbName, replicatedDbName, replicatedDbName_CM, "t2", + "ADD_PARTITION", tuple); + } + + @Test + public void testMoveOptimizationIncrementalFailureAfterCopyReplace() throws Throwable { + List withConfigs = Arrays.asList("'hive.repl.enable.move.optimization'='true'"); + String replicatedDbName_CM = replicatedDbName + "_CM"; + WarehouseInstance.Tuple tuple = primary.run("use " + primaryDbName) + .run("create table t2 (place string) partitioned by (country string)") + .run("insert into table t2 partition(country='india') values ('bangalore')") + .run("create table t1 (place string) partitioned by (country string)") + .dump(primaryDbName, null); + replica.load(replicatedDbName, tuple.dumpLocation, withConfigs); + replica.load(replicatedDbName_CM, tuple.dumpLocation, withConfigs); + replica.run("alter database " + replicatedDbName + " set DBPROPERTIES ('" + SOURCE_OF_REPLICATION + "' = '1,2,3')") + .run("alter database " + replicatedDbName_CM + " set DBPROPERTIES ('" + SOURCE_OF_REPLICATION + "' = '1,2,3')"); + + tuple = primary.run("use " + primaryDbName) + .run("insert overwrite table t1 select * from t2") + .dump(primaryDbName, tuple.lastReplicationId); + + testMoveOptimization(primaryDbName, replicatedDbName, replicatedDbName_CM, "t1", "INSERT", tuple); + } + + @Test + public void testMoveOptimizationIncrementalFailureAfterCopy() throws Throwable { + List withConfigs = Arrays.asList("'hive.repl.enable.move.optimization'='true'"); + String replicatedDbName_CM = replicatedDbName + "_CM"; + WarehouseInstance.Tuple tuple = primary.run("use " + primaryDbName) + .run("create table t2 (place string) partitioned by (country string)") + .run("ALTER TABLE t2 ADD PARTITION (country='india')") + .dump(primaryDbName, null); + replica.load(replicatedDbName, tuple.dumpLocation, withConfigs); + replica.load(replicatedDbName_CM, tuple.dumpLocation, withConfigs); + replica.run("alter database " + 
replicatedDbName + " set DBPROPERTIES ('" + SOURCE_OF_REPLICATION + "' = '1,2,3')") + .run("alter database " + replicatedDbName_CM + " set DBPROPERTIES ('" + SOURCE_OF_REPLICATION + "' = '1,2,3')"); + + tuple = primary.run("use " + primaryDbName) + .run("insert into table t2 partition(country='india') values ('bangalore')") + .dump(primaryDbName, tuple.lastReplicationId); + + testMoveOptimization(primaryDbName, replicatedDbName, replicatedDbName_CM, "t2", "INSERT", tuple); + } + + private void testMoveOptimization(String primarydb, String replicadb, String replicatedDbName_CM, + String tbl, String eventType, WarehouseInstance.Tuple tuple) throws Throwable { + List withConfigs = Arrays.asList("'hive.repl.enable.move.optimization'='true'"); + + // fail add notification for given event type. + BehaviourInjection callerVerifier + = new BehaviourInjection() { + @Nullable + @Override + public Boolean apply(@Nullable NotificationEvent entry) { + if (entry.getEventType().equalsIgnoreCase(eventType) && entry.getTableName().equalsIgnoreCase(tbl)) { + injectionPathCalled = true; + LOG.warn("Verifier - DB: " + String.valueOf(entry.getDbName()) + + " Table: " + String.valueOf(entry.getTableName()) + + " Event: " + String.valueOf(entry.getEventType())); + return false; + } + return true; + } + }; + + InjectableBehaviourObjectStore.setAddNotificationModifier(callerVerifier); + try { + replica.loadFailure(replicadb, tuple.dumpLocation, withConfigs); + } finally { + InjectableBehaviourObjectStore.resetAddNotificationModifier(); + } + + callerVerifier.assertInjectionsPerformed(true, false); + replica.load(replicadb, tuple.dumpLocation, withConfigs); + + replica.run("use " + replicadb) + .run("select country from " + tbl + " where country == 'india'") + .verifyResults(Arrays.asList("india")); + + primary.run("use " + primarydb) + .run("drop table " + tbl); + + InjectableBehaviourObjectStore.setAddNotificationModifier(callerVerifier); + try { + replica.loadFailure(replicatedDbName_CM, tuple.dumpLocation, withConfigs); + } finally { + InjectableBehaviourObjectStore.resetAddNotificationModifier(); + } + + callerVerifier.assertInjectionsPerformed(true, false); + replica.load(replicatedDbName_CM, tuple.dumpLocation, withConfigs); + + replica.run("use " + replicatedDbName_CM) + .run("select country from " + tbl) + .verifyResults(Arrays.asList("india")) + .run(" drop database if exists " + replicatedDbName_CM + " cascade"); + } + @Test public void testDumpExternalTableSetFalse() throws Throwable { WarehouseInstance.Tuple tuple = primary diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/MoveTask.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/MoveTask.java index 827721f3e8..c1cc6335de 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/MoveTask.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/MoveTask.java @@ -333,6 +333,9 @@ public int execute(DriverContext driverContext) { for (int i = 0; i implements Serializable { private static final long serialVersionUID = 1L; @@ -61,7 +65,10 @@ protected int execute(DriverContext driverContext) { LOG.debug("ReplCopyTask.execute()"); FileSystem dstFs = null; Path toPath = null; + try { + Hive hiveDb = getHive(); + // Note: CopyWork supports copying multiple files, but ReplCopyWork doesn't. // Not clear of ReplCopyWork should inherit from CopyWork. 
if (work.getFromPaths().length > 1 || work.getToPaths().length > 1) { @@ -136,6 +143,15 @@ protected int execute(DriverContext driverContext) { } LOG.debug("ReplCopyTask numFiles: {}", srcFiles.size()); + + // In case of move optimization, the file is copied directly to the destination, so we need to clear the old + // content if it is a replace (insert overwrite) operation. + if (work.getDeleteDestIfExist() && dstFs.exists(toPath)) { + LOG.debug(" path " + toPath + " is cleaned before renaming"); + hiveDb.cleanUpOneDirectoryForReplace(toPath, dstFs, HIDDEN_FILES_PATH_FILTER, conf, work.getNeedRecycle(), + work.getIsAutoPurge()); + } + if (!FileUtils.mkdir(dstFs, toPath, conf)) { console.printError("Cannot make target directory: " + toPath.toString()); return 2; @@ -156,6 +172,14 @@ protected int execute(DriverContext driverContext) { if (dstFs.exists(destFile)) { String destFileWithSourceName = srcFile.getSourcePath().getName(); Path newDestFile = new Path(destRoot, destFileWithSourceName); + + // If the new file exists, delete it before renaming to avoid a rename failure. If the copy is done + // directly to the table path (bypassing the staging directory), there might be some stale files from a previous + // incomplete/failed load. No need to recycle, as these are stale files. + if (dstFs.exists(newDestFile)) { + LOG.debug(" file " + newDestFile + " is deleted before renaming"); + dstFs.delete(newDestFile, true); + } boolean result = dstFs.rename(destFile, newDestFile); if (!result) { throw new IllegalStateException( @@ -223,11 +247,17 @@ public String getName() { return "REPL_COPY"; } - public static Task<?> getLoadCopyTask(ReplicationSpec replicationSpec, Path srcPath, Path dstPath, HiveConf conf) { + public static Task<?> getLoadCopyTask(ReplicationSpec replicationSpec, Path srcPath, Path dstPath, + HiveConf conf, boolean isAutoPurge, boolean needRecycle) { Task<?> copyTask = null; LOG.debug("ReplCopyTask:getLoadCopyTask: {}=>{}", srcPath, dstPath); if ((replicationSpec != null) && replicationSpec.isInReplicationScope()){ ReplCopyWork rcwork = new ReplCopyWork(srcPath, dstPath, false); + if (replicationSpec.isReplace() && conf.getBoolVar(REPL_ENABLE_MOVE_OPTIMIZATION)) { + rcwork.setDeleteDestIfExist(true); + rcwork.setAutoPurge(isAutoPurge); + rcwork.setNeedRecycle(needRecycle); + } LOG.debug("ReplCopyTask:\trcwork"); if (replicationSpec.isLazy()) { LOG.debug("ReplCopyTask:\tlazy"); diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/load/table/LoadPartitions.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/load/table/LoadPartitions.java index c0cfc439d2..45b674e287 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/load/table/LoadPartitions.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/load/table/LoadPartitions.java @@ -63,6 +63,7 @@ import java.util.List; import java.util.Map; +import static org.apache.hadoop.hive.conf.HiveConf.ConfVars.REPL_ENABLE_MOVE_OPTIMIZATION; import static org.apache.hadoop.hive.ql.exec.repl.bootstrap.load.ReplicationState.PartitionState; import static org.apache.hadoop.hive.ql.parse.ImportSemanticAnalyzer.isPartitioned; import static org.apache.hadoop.hive.ql.parse.ImportSemanticAnalyzer.partSpecToString; @@ -224,14 +225,26 @@ private void addPartition(boolean hasMorePartitions, AddPartitionDesc addPartiti + partSpecToString(partSpec.getPartSpec()) + " with source location: " + partSpec.getLocation()); - Path tmpPath = PathUtils.getExternalTmpPath(replicaWarehousePartitionLocation, 
context.pathInfo); + Path tmpPath = replicaWarehousePartitionLocation; + + // if move optimization is enabled, copy the files directly to the target path. No need to create the staging dir. + LoadFileType loadFileType; + if (event.replicationSpec().isInReplicationScope() && + context.hiveConf.getBoolVar(REPL_ENABLE_MOVE_OPTIMIZATION)) { + loadFileType = LoadFileType.IGNORE; + } else { + loadFileType = + event.replicationSpec().isReplace() ? LoadFileType.REPLACE_ALL : LoadFileType.OVERWRITE_EXISTING; + tmpPath = PathUtils.getExternalTmpPath(replicaWarehousePartitionLocation, context.pathInfo); + } + Task copyTask = ReplCopyTask.getLoadCopyTask( event.replicationSpec(), sourceWarehousePartitionLocation, tmpPath, - context.hiveConf + context.hiveConf, false, false ); - Task movePartitionTask = movePartitionTask(table, partSpec, tmpPath); + Task movePartitionTask = movePartitionTask(table, partSpec, tmpPath, loadFileType); // Set Checkpoint task as dependant to add partition tasks. So, if same dump is retried for // bootstrap, we skip current partition update. @@ -257,7 +270,8 @@ private void addPartition(boolean hasMorePartitions, AddPartitionDesc addPartiti /** * This will create the move of partition data from temp path to actual path */ - private Task movePartitionTask(Table table, AddPartitionDesc.OnePartitionDesc partSpec, Path tmpPath) { + private Task movePartitionTask(Table table, AddPartitionDesc.OnePartitionDesc partSpec, Path tmpPath, + LoadFileType loadFileType) { MoveWork moveWork = new MoveWork(new HashSet<>(), new HashSet<>(), null, null, false); if (AcidUtils.isTransactionalTable(table)) { LoadMultiFilesDesc loadFilesWork = new LoadMultiFilesDesc( @@ -268,7 +282,7 @@ private void addPartition(boolean hasMorePartitions, AddPartitionDesc addPartiti } else { LoadTableDesc loadTableWork = new LoadTableDesc( tmpPath, Utilities.getTableDesc(table), partSpec.getPartSpec(), - event.replicationSpec().isReplace() ? LoadFileType.REPLACE_ALL : LoadFileType.OVERWRITE_EXISTING, 0L + loadFileType, 0L ); loadTableWork.setInheritTableSpecs(false); moveWork.setLoadTableWork(loadTableWork); diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/load/table/LoadTable.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/load/table/LoadTable.java index 089b529b7d..82f687b7d6 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/load/table/LoadTable.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/load/table/LoadTable.java @@ -59,6 +59,7 @@ import java.util.List; import java.util.TreeMap; +import static org.apache.hadoop.hive.conf.HiveConf.ConfVars.REPL_ENABLE_MOVE_OPTIMIZATION; import static org.apache.hadoop.hive.ql.parse.ImportSemanticAnalyzer.isPartitioned; public class LoadTable { @@ -218,9 +219,25 @@ private String location(ImportTableDesc tblDesc, Database parentDb) private Task loadTableTask(Table table, ReplicationSpec replicationSpec, Path tgtPath, Path fromURI) { Path dataPath = new Path(fromURI, EximUtil.DATA_PATH_NAME); - Path tmpPath = PathUtils.getExternalTmpPath(tgtPath, context.pathInfo); - Task copyTask = - ReplCopyTask.getLoadCopyTask(replicationSpec, dataPath, tmpPath, context.hiveConf); + Path tmpPath = tgtPath; + + // if move optimization is enabled, copy the files directly to the target path. No need to create the staging dir. 
+ LoadFileType loadFileType; + if (replicationSpec.isInReplicationScope() && + context.hiveConf.getBoolVar(REPL_ENABLE_MOVE_OPTIMIZATION)) { + loadFileType = LoadFileType.IGNORE; + } else { + loadFileType = + replicationSpec.isReplace() ? LoadFileType.REPLACE_ALL : LoadFileType.OVERWRITE_EXISTING; + tmpPath = PathUtils.getExternalTmpPath(tgtPath, context.pathInfo); + } + + LOG.debug("adding dependent CopyWork/AddPart/MoveWork for table " + + table.getCompleteName() + " with source location: " + + dataPath.toString() + " and target location " + tmpPath.toString()); + + Task copyTask = ReplCopyTask.getLoadCopyTask(replicationSpec, dataPath, tmpPath, context.hiveConf, + false, false); MoveWork moveWork = new MoveWork(new HashSet<>(), new HashSet<>(), null, null, false); if (AcidUtils.isTransactionalTable(table)) { @@ -232,7 +249,7 @@ private String location(ImportTableDesc tblDesc, Database parentDb) } else { LoadTableDesc loadTableWork = new LoadTableDesc( tmpPath, Utilities.getTableDesc(table), new TreeMap<>(), - replicationSpec.isReplace() ? LoadFileType.REPLACE_ALL : LoadFileType.OVERWRITE_EXISTING, 0L + loadFileType, 0L ); moveWork.setLoadTableWork(loadTableWork); } diff --git a/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java b/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java index 0b1048c589..50e8c14ac0 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java @@ -1886,7 +1886,8 @@ public Partition loadPartition(Path loadPath, Table tbl, Map par // to ACID updates. So the are not themselves ACID. // Note: this assumes both paths are qualified; which they are, currently. - if ((isMmTableWrite || isFullAcidTable) && loadPath.equals(newPartPath)) { + if (((isMmTableWrite || isFullAcidTable) && loadPath.equals(newPartPath)) || + (loadFileType == LoadFileType.IGNORE)) { // MM insert query, move itself is a no-op. if (Utilities.FILE_OP_LOGGER.isTraceEnabled()) { Utilities.FILE_OP_LOGGER.trace("not moving " + loadPath + " to " + newPartPath + " (MM)"); @@ -2497,7 +2498,7 @@ public void loadTable(Path loadPath, String tableName, LoadFileType loadFileType } // Note: this assumes both paths are qualified; which they are, currently. - if ((isMmTable || isFullAcidTable) && loadPath.equals(tbl.getPath())) { + if (((isMmTable || isFullAcidTable) && loadPath.equals(tbl.getPath())) || (loadFileType == LoadFileType.IGNORE)) { /** * some operations on Transactional tables (e.g. Import) write directly to the final location * and avoid the 'move' operation. 
Since MoveTask does other things, setting 'loadPath' to be @@ -4446,7 +4447,7 @@ private void deleteOldPathForReplace(Path destPath, Path oldPath, HiveConf conf, } - private void cleanUpOneDirectoryForReplace(Path path, FileSystem fs, + public void cleanUpOneDirectoryForReplace(Path path, FileSystem fs, PathFilter pathFilter, HiveConf conf, boolean purge, boolean isNeedRecycle) throws IOException, HiveException { if (isNeedRecycle && conf.getBoolVar(HiveConf.ConfVars.REPLCMENABLED)) { recycleDirToCmPath(path, purge); diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/ImportSemanticAnalyzer.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/ImportSemanticAnalyzer.java index 6fbe29c5ec..fc9a95cd24 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/parse/ImportSemanticAnalyzer.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/ImportSemanticAnalyzer.java @@ -33,6 +33,7 @@ import org.apache.hadoop.hive.metastore.api.MetaException; import org.apache.hadoop.hive.metastore.api.Order; import org.apache.hadoop.hive.metastore.api.Partition; +import org.apache.hadoop.hive.metastore.ReplChangeManager; import org.apache.hadoop.hive.ql.ErrorMsg; import org.apache.hadoop.hive.ql.QueryState; import org.apache.hadoop.hive.ql.exec.ReplCopyTask; @@ -78,6 +79,8 @@ import java.util.Map; import java.util.TreeMap; +import static org.apache.hadoop.hive.conf.HiveConf.ConfVars.REPL_ENABLE_MOVE_OPTIMIZATION; + /** * ImportSemanticAnalyzer. * @@ -389,33 +392,50 @@ private static ImportTableDesc getBaseCreateTableDescFromTable(String dbName, private static Task loadTable(URI fromURI, Table table, boolean replace, Path tgtPath, ReplicationSpec replicationSpec, EximUtil.SemanticAnalyzerWrapperContext x, - Long writeId, int stmtId) { + Long writeId, int stmtId) throws HiveException { assert table != null; assert table.getParameters() != null; Path dataPath = new Path(fromURI.toString(), EximUtil.DATA_PATH_NAME); Path destPath = null, loadPath = null; LoadFileType lft; - if (AcidUtils.isTransactionalTable(table) && !replicationSpec.isInReplicationScope()) { - String mmSubdir = replace ? AcidUtils.baseDir(writeId) - : AcidUtils.deltaSubdir(writeId, writeId, stmtId); - destPath = new Path(tgtPath, mmSubdir); - /** - * CopyTask below will copy files from the 'archive' to a delta_x_x in the table/partition - * directory, i.e. the final destination for these files. This has to be a copy to preserve - * the archive. MoveTask is optimized to do a 'rename' if files are on the same FileSystem. - * So setting 'loadPath' this way will make - * {@link Hive#loadTable(Path, String, LoadFileType, boolean, boolean, boolean, - * boolean, Long, int)} - * skip the unnecessary file (rename) operation but it will perform other things. - */ - loadPath = tgtPath; - lft = LoadFileType.KEEP_EXISTING; + boolean isAutoPurge; + boolean needRecycle; + + if (replicationSpec.isInReplicationScope() && + x.getCtx().getConf().getBoolean(REPL_ENABLE_MOVE_OPTIMIZATION.varname, false)) { + lft = LoadFileType.IGNORE; + destPath = loadPath = tgtPath; + isAutoPurge = "true".equalsIgnoreCase(table.getProperty("auto.purge")); + if (table.isTemporary()) { + needRecycle = false; + } else { + org.apache.hadoop.hive.metastore.api.Database db = x.getHive().getDatabase(table.getDbName()); + needRecycle = db != null && ReplChangeManager.isSourceOfReplication(db); + } } else { - destPath = loadPath = x.getCtx().getExternalTmpPath(tgtPath); - lft = replace ? 
LoadFileType.REPLACE_ALL : LoadFileType.OVERWRITE_EXISTING; + if (AcidUtils.isTransactionalTable(table) && !replicationSpec.isInReplicationScope()) { + String mmSubdir = replace ? AcidUtils.baseDir(writeId) + : AcidUtils.deltaSubdir(writeId, writeId, stmtId); + destPath = new Path(tgtPath, mmSubdir); + /** + * CopyTask below will copy files from the 'archive' to a delta_x_x in the table/partition + * directory, i.e. the final destination for these files. This has to be a copy to preserve + * the archive. MoveTask is optimized to do a 'rename' if files are on the same FileSystem. + * So setting 'loadPath' this way will make + * {@link Hive#loadTable(Path, String, LoadFileType, boolean, boolean, boolean, + * boolean, Long, int)} + * skip the unnecessary file (rename) operation but it will perform other things. + */ + loadPath = tgtPath; + lft = LoadFileType.KEEP_EXISTING; + } else { + destPath = loadPath = x.getCtx().getExternalTmpPath(tgtPath); + lft = replace ? LoadFileType.REPLACE_ALL : LoadFileType.OVERWRITE_EXISTING; + } + needRecycle = false; + isAutoPurge = false; } - if (Utilities.FILE_OP_LOGGER.isTraceEnabled()) { Utilities.FILE_OP_LOGGER.trace("adding import work for table with source location: " + dataPath + "; table: " + tgtPath + "; copy destination " + destPath + "; mm " + writeId + @@ -428,7 +448,8 @@ private static ImportTableDesc getBaseCreateTableDescFromTable(String dbName, Task copyTask = null; if (replicationSpec.isInReplicationScope()) { - copyTask = ReplCopyTask.getLoadCopyTask(replicationSpec, dataPath, destPath, x.getConf()); + copyTask = ReplCopyTask.getLoadCopyTask(replicationSpec, dataPath, destPath, x.getConf(), + isAutoPurge, needRecycle); } else { copyTask = TaskFactory.get(new CopyWork(dataPath, destPath, false)); } @@ -442,7 +463,7 @@ private static ImportTableDesc getBaseCreateTableDescFromTable(String dbName, Collections.singletonList(tgtPath), true, null, null); moveWork.setMultiFilesDesc(loadFilesWork); - moveWork.setNeedCleanTarget(false); + moveWork.setNeedCleanTarget(replace); } else { LoadTableDesc loadTableWork = new LoadTableDesc( loadPath, Utilities.getTableDesc(table), new TreeMap<>(), lft, writeId); @@ -496,11 +517,14 @@ private static ImportTableDesc getBaseCreateTableDescFromTable(String dbName, return TaskFactory.get(new DDLWork(x.getInputs(), x.getOutputs(), addPartitionDesc), x.getConf()); } - private static Task addSinglePartition(URI fromURI, FileSystem fs, ImportTableDesc tblDesc, + private static Task addSinglePartition(URI fromURI, FileSystem fs, ImportTableDesc tblDesc, Table table, Warehouse wh, AddPartitionDesc addPartitionDesc, ReplicationSpec replicationSpec, EximUtil.SemanticAnalyzerWrapperContext x, Long writeId, int stmtId) throws MetaException, IOException, HiveException { AddPartitionDesc.OnePartitionDesc partSpec = addPartitionDesc.getPartition(0); + boolean isAutoPurge; + boolean needRecycle; + if (tblDesc.isExternal() && tblDesc.getLocation() == null) { x.getLOG().debug("Importing in-place: adding AddPart for partition " + partSpecToString(partSpec.getPartSpec())); @@ -516,11 +540,30 @@ private static ImportTableDesc getBaseCreateTableDescFromTable(String dbName, + partSpecToString(partSpec.getPartSpec()) + " with source location: " + srcLocation); Path tgtLocation = new Path(partSpec.getLocation()); - //Replication scope the write id will be invalid - Boolean useStagingDirectory = !AcidUtils.isTransactionalTable(table.getParameters()) || - replicationSpec.isInReplicationScope(); - Path destPath = useStagingDirectory ? 
x.getCtx().getExternalTmpPath(tgtLocation) - : new Path(tgtLocation, AcidUtils.deltaSubdir(writeId, writeId, stmtId)); + + LoadFileType loadFileType = replicationSpec.isReplace() ? LoadFileType.REPLACE_ALL : LoadFileType.OVERWRITE_EXISTING; + Path destPath; + if (replicationSpec.isInReplicationScope() && + x.getCtx().getConf().getBoolean(REPL_ENABLE_MOVE_OPTIMIZATION.varname, false)) { + loadFileType = LoadFileType.IGNORE; + destPath = tgtLocation; + isAutoPurge = "true".equalsIgnoreCase(table.getProperty("auto.purge")); + if (table.isTemporary()) { + needRecycle = false; + } else { + org.apache.hadoop.hive.metastore.api.Database db = x.getHive().getDatabase(table.getDbName()); + needRecycle = db != null && ReplChangeManager.isSourceOfReplication(db); + } + } else { + //Replication scope the write id will be invalid + Boolean useStagingDirectory = !AcidUtils.isTransactionalTable(table.getParameters()) || + replicationSpec.isInReplicationScope(); + destPath = useStagingDirectory ? x.getCtx().getExternalTmpPath(tgtLocation) + : new Path(tgtLocation, AcidUtils.deltaSubdir(writeId, writeId, stmtId)); + isAutoPurge = false; + needRecycle = false; + } + Path moveTaskSrc = !AcidUtils.isTransactionalTable(table.getParameters()) ? destPath : tgtLocation; if (Utilities.FILE_OP_LOGGER.isTraceEnabled()) { Utilities.FILE_OP_LOGGER.trace("adding import work for partition with source location: " @@ -535,7 +578,7 @@ private static ImportTableDesc getBaseCreateTableDescFromTable(String dbName, Task copyTask = null; if (replicationSpec.isInReplicationScope()) { copyTask = ReplCopyTask.getLoadCopyTask( - replicationSpec, new Path(srcLocation), destPath, x.getConf()); + replicationSpec, new Path(srcLocation), destPath, x.getConf(), isAutoPurge, needRecycle); } else { copyTask = TaskFactory.get(new CopyWork(new Path(srcLocation), destPath, false)); } @@ -554,11 +597,11 @@ private static ImportTableDesc getBaseCreateTableDescFromTable(String dbName, Collections.singletonList(tgtLocation), true, null, null); moveWork.setMultiFilesDesc(loadFilesWork); - moveWork.setNeedCleanTarget(false); + moveWork.setNeedCleanTarget(replicationSpec.isReplace()); } else { LoadTableDesc loadTableWork = new LoadTableDesc(moveTaskSrc, Utilities.getTableDesc(table), partSpec.getPartSpec(), - replicationSpec.isReplace() ? 
LoadFileType.REPLACE_ALL : LoadFileType.OVERWRITE_EXISTING, + loadFileType, writeId); loadTableWork.setStmtId(stmtId); loadTableWork.setInheritTableSpecs(false); diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/ReplicationSemanticAnalyzer.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/ReplicationSemanticAnalyzer.java index e4186c45a8..e4a128182c 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/parse/ReplicationSemanticAnalyzer.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/ReplicationSemanticAnalyzer.java @@ -24,6 +24,7 @@ import org.apache.hadoop.fs.Path; import org.apache.hadoop.hive.conf.HiveConf; import org.apache.hadoop.hive.metastore.ReplChangeManager; +import org.apache.hadoop.hive.metastore.Warehouse; import org.apache.hadoop.hive.metastore.api.Database; import org.apache.hadoop.hive.ql.ErrorMsg; import org.apache.hadoop.hive.ql.QueryState; @@ -40,12 +41,15 @@ import org.apache.hadoop.hive.ql.plan.PlanUtils; import java.io.FileNotFoundException; +import java.net.URI; +import java.util.Arrays; import java.util.Collections; import java.util.List; import java.util.Map; import static org.apache.hadoop.hive.conf.HiveConf.ConfVars.HIVEQUERYID; import static org.apache.hadoop.hive.conf.HiveConf.ConfVars.REPL_DUMP_METADATA_ONLY; +import static org.apache.hadoop.hive.conf.HiveConf.ConfVars.REPL_ENABLE_MOVE_OPTIMIZATION; import static org.apache.hadoop.hive.ql.parse.HiveParser.TOK_DBNAME; import static org.apache.hadoop.hive.ql.parse.HiveParser.TOK_LIMIT; import static org.apache.hadoop.hive.ql.parse.HiveParser.TOK_REPL_CONFIG; @@ -80,6 +84,8 @@ public static final String FUNCTIONS_ROOT_DIR_NAME = "_functions"; public static final String CONSTRAINTS_ROOT_DIR_NAME = "_constraints"; + private static final List<String> CLOUD_SCHEME_PREFIXES = Arrays.asList("s3a", "wasb"); + ReplicationSemanticAnalyzer(QueryState queryState) throws SemanticException { super(queryState); this.db = super.db; @@ -216,6 +222,20 @@ private void analyzeReplDump(ASTNode ast) throws SemanticException { } } + private boolean isCloudFS(Path filePath, org.apache.hadoop.conf.Configuration conf) throws Exception { + if (filePath == null) { + throw new HiveException("filePath cannot be null"); + } + + URI uri = filePath.toUri(); + String scheme = uri.getScheme(); + scheme = StringUtils.isBlank(scheme) ? FileSystem.get(uri, conf).getScheme() : scheme; + if (StringUtils.isBlank(scheme)) { + throw new HiveException("Cannot get valid scheme for " + filePath); + } + return CLOUD_SCHEME_PREFIXES.contains(scheme.toLowerCase().trim()); + } + // REPL LOAD private void initReplLoad(ASTNode ast) throws SemanticException { path = PlanUtils.stripQuotes(ast.getChild(0).getText()); @@ -302,6 +322,18 @@ private void analyzeReplLoad(ASTNode ast) throws SemanticException { throw new FileNotFoundException(ErrorMsg.REPL_LOAD_PATH_NOT_FOUND.getMsg()); } + + // This config is set to make sure that in case of S3 replication, the move is skipped. + try { + Warehouse wh = new Warehouse(conf); + Path filePath = wh.getWhRoot(); + if (isCloudFS(filePath, conf)) { + conf.setBoolVar(REPL_ENABLE_MOVE_OPTIMIZATION, true); + LOG.info(" Set move optimization to true for warehouse " + filePath.toString()); + } + } catch (Exception e) { + throw new SemanticException(e.getMessage(), e); + } + // Now, the dumped path can be one of three things: // a) It can be a db dump, in which case we expect a set of dirs, each with a // db name, and with a _metadata file in each, and table dirs inside that. 
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/CommitTxnHandler.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/CommitTxnHandler.java index f04cd93069..82a722fbc8 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/CommitTxnHandler.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/CommitTxnHandler.java @@ -68,7 +68,9 @@ private void createDumpFile(Context withinContext, org.apache.hadoop.hive.ql.met } Path metaDataPath = new Path(withinContext.eventRoot, EximUtil.METADATA_NAME); - withinContext.replicationSpec.setIsReplace(true); + // In case of ACID operations, the same directory may have many other subdirectories for different write id and + // statement id combinations, so we cannot set isReplace to true. + withinContext.replicationSpec.setIsReplace(false); EximUtil.createExportDump(metaDataPath.getFileSystem(withinContext.hiveConf), metaDataPath, qlMdTable, qlPtns, withinContext.replicationSpec, diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/message/CreateFunctionHandler.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/message/CreateFunctionHandler.java index 3a32885d1d..14c7f06af6 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/message/CreateFunctionHandler.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/message/CreateFunctionHandler.java @@ -195,7 +195,7 @@ ResourceUri destinationResourceUri(ResourceUri resourceUri) Task copyTask = ReplCopyTask.getLoadCopyTask( metadata.getReplicationSpec(), new Path(sourceUri), qualifiedDestinationPath, - context.hiveConf + context.hiveConf, false, false ); replCopyTasks.add(copyTask); ResourceUri destinationUri = diff --git a/ql/src/java/org/apache/hadoop/hive/ql/plan/LoadTableDesc.java b/ql/src/java/org/apache/hadoop/hive/ql/plan/LoadTableDesc.java index f32016725a..2267df934e 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/plan/LoadTableDesc.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/plan/LoadTableDesc.java @@ -65,7 +65,11 @@ * the file instead of making a duplicate copy. * If any file exist while copy, then just overwrite the file */ - OVERWRITE_EXISTING + OVERWRITE_EXISTING, + /** + * No need to move the file; used in case of replication to object stores such as S3. + */ + IGNORE } public LoadTableDesc(final LoadTableDesc o) { super(o.getSourcePath(), o.getWriteType()); diff --git a/ql/src/java/org/apache/hadoop/hive/ql/plan/ReplCopyWork.java b/ql/src/java/org/apache/hadoop/hive/ql/plan/ReplCopyWork.java index 3ff9f2fdf2..fac5affdc0 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/plan/ReplCopyWork.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/plan/ReplCopyWork.java @@ -49,6 +49,12 @@ // If set to false, it'll behave as a traditional CopyTask. 
protected boolean readSrcAsFilesList = false; + private boolean deleteDestIfExist = false; + + private boolean isAutoPurge = false; + + private boolean needRecycle = false; + private String distCpDoAsUser = null; public ReplCopyWork(final Path srcPath, final Path destPath, boolean errorOnSrcEmpty) { @@ -70,4 +76,28 @@ public void setDistCpDoAsUser(String distCpDoAsUser) { public String distCpDoAsUser() { return distCpDoAsUser; } + + public boolean getDeleteDestIfExist() { + return deleteDestIfExist; + } + + public void setDeleteDestIfExist(boolean deleteDestIfExist) { + this.deleteDestIfExist = deleteDestIfExist; + } + + public boolean getNeedRecycle() { + return needRecycle; + } + + public void setNeedRecycle(boolean needRecycle) { + this.needRecycle = needRecycle; + } + + public boolean getIsAutoPurge() { + return isAutoPurge; + } + + public void setAutoPurge(boolean isAutoPurge) { + this.isAutoPurge = isAutoPurge; + } } diff --git a/ql/src/test/org/apache/hadoop/hive/ql/parse/repl/load/message/TestPrimaryToReplicaResourceFunction.java b/ql/src/test/org/apache/hadoop/hive/ql/parse/repl/load/message/TestPrimaryToReplicaResourceFunction.java index 7b6c3e7507..2771cf00b4 100644 --- a/ql/src/test/org/apache/hadoop/hive/ql/parse/repl/load/message/TestPrimaryToReplicaResourceFunction.java +++ b/ql/src/test/org/apache/hadoop/hive/ql/parse/repl/load/message/TestPrimaryToReplicaResourceFunction.java @@ -92,7 +92,7 @@ public void createDestinationPath() throws IOException, SemanticException, URISy mockStatic(ReplCopyTask.class); Task mock = mock(Task.class); when(ReplCopyTask.getLoadCopyTask(any(ReplicationSpec.class), any(Path.class), any(Path.class), - any(HiveConf.class))).thenReturn(mock); + any(HiveConf.class), false, false)).thenReturn(mock); ResourceUri resourceUri = function.destinationResourceUri(new ResourceUri(ResourceType.JAR, "hdfs://localhost:9000/user/someplace/ab.jar#e094828883")); diff --git a/standalone-metastore/metastore-server/src/test/java/org/apache/hadoop/hive/metastore/InjectableBehaviourObjectStore.java b/standalone-metastore/metastore-server/src/test/java/org/apache/hadoop/hive/metastore/InjectableBehaviourObjectStore.java index 6ca3e5d8bb..3d5395adde 100644 --- a/standalone-metastore/metastore-server/src/test/java/org/apache/hadoop/hive/metastore/InjectableBehaviourObjectStore.java +++ b/standalone-metastore/metastore-server/src/test/java/org/apache/hadoop/hive/metastore/InjectableBehaviourObjectStore.java @@ -30,6 +30,7 @@ import org.apache.hadoop.hive.metastore.api.SQLForeignKey; import org.apache.hadoop.hive.metastore.api.SQLPrimaryKey; import org.apache.hadoop.hive.metastore.api.Table; +import org.apache.hadoop.hive.metastore.api.NotificationEvent; import static org.junit.Assert.assertEquals; @@ -82,6 +83,8 @@ public CallerArguments(String dbName) { private static com.google.common.base.Function callerVerifier = null; + private static com.google.common.base.Function<NotificationEvent, Boolean> addNotificationEventModifier = null; + // Methods to set/reset getTable modifier public static void setGetTableBehaviour(com.google.common.base.Function modifier){ getTableModifier = (modifier == null) ? com.google.common.base.Functions.identity() : modifier; @@ -115,6 +118,14 @@ public static void setGetNextNotificationBehaviour( getNextNotificationModifier = (modifier == null)? 
com.google.common.base.Functions.identity() : modifier; } + public static void setAddNotificationModifier(com.google.common.base.Function<NotificationEvent, Boolean> modifier) { + addNotificationEventModifier = modifier; + } + + public static void resetAddNotificationModifier() { + setAddNotificationModifier(null); + } + public static void resetGetNextNotificationBehaviour(){ setGetNextNotificationBehaviour(null); } @@ -156,6 +167,18 @@ public NotificationEventResponse getNextNotification(NotificationEventRequest rq return getNextNotificationModifier.apply(super.getNextNotification(rqst)); } + @Override + public void addNotificationEvent(NotificationEvent entry) throws MetaException { + if (addNotificationEventModifier != null) { + Boolean success = addNotificationEventModifier.apply(entry); + if ((success != null) && !success) { + throw new MetaException("InjectableBehaviourObjectStore: Invalid addNotificationEvent operation on DB: " + + entry.getDbName() + " table: " + entry.getTableName() + " event : " + entry.getEventType()); + } + } + super.addNotificationEvent(entry); + } + @Override public void createTable(Table tbl) throws InvalidObjectException, MetaException { if (callerVerifier != null) {