diff --git a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
index 8a561e5771..91f42c0d97 100644
--- a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
+++ b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
@@ -495,6 +495,12 @@ private static void populateLlapDaemonVarsSet(Set<String> llapDaemonVarsSetLocal
         + " the REPL LOAD on object data stores such as S3 or WASB where creating a directory and move \n"
         + " files are costly operations. In file system like HDFS where move operation is atomic, this \n"
         + " optimization should not be enabled as it may lead to inconsistent data read for non acid tables."),
+    REPL_MOVE_OPTIMIZED_FILE_SCHEMES("hive.repl.move.optimized.scheme", "s3a, wasb",
+        "Comma separated list of schemes for which move optimization will be enabled during repl load. \n"
+            + "This configuration overrides the value set using REPL_ENABLE_MOVE_OPTIMIZATION for the given schemes. \n"
+            + " Schemes of file systems which do not support atomic move (rename) can be specified here to \n"
+            + " speed up the repl load operation. In file systems like HDFS, where the move operation is atomic, this \n"
+            + " optimization should not be enabled as it may lead to inconsistent data reads for non-acid tables."),
     LOCALSCRATCHDIR("hive.exec.local.scratchdir",
         "${system:java.io.tmpdir}" + File.separator + "${system:user.name}",
         "Local scratch space for Hive jobs"),
diff --git a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosAcrossInstances.java b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosAcrossInstances.java
index fae1a2fa10..3072488388 100644
--- a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosAcrossInstances.java
+++ b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosAcrossInstances.java
@@ -1353,7 +1353,7 @@ public void testMoveOptimizationIncrementalFailureAfterCopyReplace() throws Thro
         .run("insert overwrite table t1 select * from t2")
         .dump(primaryDbName, tuple.lastReplicationId);
 
-    testMoveOptimization(primaryDbName, replicatedDbName, replicatedDbName_CM, "t1", "INSERT", tuple);
+    testMoveOptimization(primaryDbName, replicatedDbName, replicatedDbName_CM, "t1", "ADD_PARTITION", tuple);
   }
 
   @Test
@@ -1430,6 +1430,84 @@ public Boolean apply(@Nullable NotificationEvent entry) {
             .run(" drop database if exists " + replicatedDbName_CM + " cascade");
   }
 
+  private void injectableForTaskCreation(String primarydb, String replicadb, String tbl, String eventType,
+                                         String eventTypeExpect, WarehouseInstance.Tuple tuple) throws Throwable {
+    List<String> withConfigs = Arrays.asList("'hive.repl.move.optimized.scheme'= ' hDfs , '");
+
+    // Fail add notification for the given event type.
+    BehaviourInjection<NotificationEvent, Boolean> callerVerifier
+        = new BehaviourInjection<NotificationEvent, Boolean>() {
+      @Nullable
+      @Override
+      public Boolean apply(@Nullable NotificationEvent entry) {
+        if (entry.getEventType().equalsIgnoreCase(eventType) && entry.getTableName().equalsIgnoreCase(tbl)) {
+          injectionPathCalled = true;
+          LOG.warn("Verifier - DB: " + String.valueOf(entry.getDbName())
+                  + " Table: " + String.valueOf(entry.getTableName())
+                  + " Event: " + String.valueOf(entry.getEventType()));
+          return false;
+        }
+        if (entry.getEventType().equalsIgnoreCase(eventTypeExpect) && entry.getTableName().equalsIgnoreCase(tbl)) {
+          nonInjectedPathCalled = true;
+        }
+        return true;
+      }
+    };
+
+    InjectableBehaviourObjectStore.setAddNotificationModifier(callerVerifier);
+    try {
+      replica.load(replicadb, tuple.dumpLocation, withConfigs);
+    } finally {
+      InjectableBehaviourObjectStore.resetAddNotificationModifier();
+    }
+
+    callerVerifier.assertInjectionsPerformed(false, true);
+  }
+
+  @Test
+  public void testTaskCreationOptimization() throws Throwable {
+
+    WarehouseInstance.Tuple tuple = primary
+            .run("use " + primaryDbName)
+            .run("create table t2 (place string) partitioned by (country string)")
+            .run("insert into table t2 partition(country='india') values ('bangalore')")
+            .dump(primaryDbName, null);
+
+    // no insert event should be added
+    injectableForTaskCreation(primaryDbName, replicatedDbName, "t2",
+            "INSERT", "ADD_PARTITION", tuple);
+
+    replica.run("use " + replicatedDbName)
+            .run("select country from t2 where country == 'india'")
+            .verifyResults(Arrays.asList("india"));
+
+    tuple = primary.run("use " + primaryDbName)
+            .run("insert into table t2 partition(country='india') values ('delhi')")
+            .dump(primaryDbName, tuple.lastReplicationId);
+
+    // no ADD_PARTITION event should be added
+    injectableForTaskCreation(primaryDbName, replicatedDbName, "t2", "ADD_PARTITION",
+            "INSERT", tuple);
+
+    replica.run("use " + replicatedDbName)
+            .run("select place from t2 where country == 'india'")
+            .verifyResults(Arrays.asList("bangalore", "delhi"));
+
+    tuple = primary.run("use " + primaryDbName)
+            .run("insert into table t2 partition(country='us') values ('sf')")
+            .dump(primaryDbName, tuple.lastReplicationId);
+
+    // no insert event should be added
+    injectableForTaskCreation(primaryDbName, replicatedDbName, "t2", "INSERT",
+            "ADD_PARTITION", tuple);
+
+    replica.run("use " + replicatedDbName)
+            .run("select place from t2 where country == 'india'")
+            .verifyResults(Arrays.asList("bangalore", "delhi"))
+            .run("select place from t2 where country == 'us'")
+            .verifyResults(Arrays.asList("sf"));
+  }
+
   @Test
   public void testDumpExternalTableSetFalse() throws Throwable {
     WarehouseInstance.Tuple tuple = primary
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/load/table/LoadPartitions.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/load/table/LoadPartitions.java
index 45b674e287..172b4ac446 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/load/table/LoadPartitions.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/load/table/LoadPartitions.java
@@ -244,7 +244,12 @@ private void addPartition(boolean hasMorePartitions, AddPartitionDesc addPartiti
         tmpPath, context.hiveConf, false, false
     );
 
-    Task movePartitionTask = movePartitionTask(table, partSpec, tmpPath, loadFileType);
+
+    Task movePartitionTask = null;
+    if (loadFileType != LoadFileType.IGNORE) {
+      // No need to create a move task if the file is moved directly to the target location.
+      movePartitionTask = movePartitionTask(table, partSpec, tmpPath, loadFileType);
+    }
 
     // Set Checkpoint task as dependant to add partition tasks. So, if same dump is retried for
     // bootstrap, we skip current partition update.
@@ -260,10 +265,14 @@ private void addPartition(boolean hasMorePartitions, AddPartitionDesc addPartiti
       } else {
         ptnRootTask.addDependentTask(copyTask);
       }
-    copyTask.addDependentTask(addPartTask);
-    addPartTask.addDependentTask(movePartitionTask);
-    movePartitionTask.addDependentTask(ckptTask);
 
+    copyTask.addDependentTask(addPartTask);
+    if (movePartitionTask != null) {
+      addPartTask.addDependentTask(movePartitionTask);
+      movePartitionTask.addDependentTask(ckptTask);
+    } else {
+      addPartTask.addDependentTask(ckptTask);
+    }
     return ptnRootTask;
   }
 
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/load/table/LoadTable.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/load/table/LoadTable.java
index 82f687b7d6..8538463cc6 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/load/table/LoadTable.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/load/table/LoadTable.java
@@ -234,7 +234,7 @@ private String location(ImportTableDesc tblDesc, Database parentDb)
 
     LOG.debug("adding dependent CopyWork/AddPart/MoveWork for table "
         + table.getCompleteName() + " with source location: "
-        + dataPath.toString() + " and target location " + tmpPath.toString());
+        + dataPath.toString() + " and target location " + tgtPath.toString());
 
     Task copyTask = ReplCopyTask.getLoadCopyTask(replicationSpec, dataPath, tmpPath, context.hiveConf, false, false);
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/ImportSemanticAnalyzer.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/ImportSemanticAnalyzer.java
index c99d9c1b1f..1c3416e527 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/ImportSemanticAnalyzer.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/ImportSemanticAnalyzer.java
@@ -609,6 +609,16 @@ private static ImportTableDesc getBaseCreateTableDescFromTable(String dbName,
       moveWork.setLoadTableWork(loadTableWork);
     }
 
+    if (loadFileType == LoadFileType.IGNORE) {
+      // If the file is copied directly to the target location, no move task is needed when the operation being
+      // replayed is an add partition. For insert operations, the add partition task is a no-op anyway.
+      if (x.getEventType() == DumpType.EVENT_INSERT) {
+        copyTask.addDependentTask(TaskFactory.get(moveWork, x.getConf()));
+      } else {
+        copyTask.addDependentTask(addPartTask);
+      }
+      return copyTask;
+    }
     Task loadPartTask = TaskFactory.get(moveWork, x.getConf());
     copyTask.addDependentTask(loadPartTask);
     addPartTask.addDependentTask(loadPartTask);
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/ReplicationSemanticAnalyzer.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/ReplicationSemanticAnalyzer.java
index e4a128182c..eea3ae8bae 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/ReplicationSemanticAnalyzer.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/ReplicationSemanticAnalyzer.java
@@ -50,6 +50,7 @@
 import static org.apache.hadoop.hive.conf.HiveConf.ConfVars.HIVEQUERYID;
 import static org.apache.hadoop.hive.conf.HiveConf.ConfVars.REPL_DUMP_METADATA_ONLY;
 import static org.apache.hadoop.hive.conf.HiveConf.ConfVars.REPL_ENABLE_MOVE_OPTIMIZATION;
+import static org.apache.hadoop.hive.conf.HiveConf.ConfVars.REPL_MOVE_OPTIMIZED_FILE_SCHEMES;
 import static org.apache.hadoop.hive.ql.parse.HiveParser.TOK_DBNAME;
 import static org.apache.hadoop.hive.ql.parse.HiveParser.TOK_LIMIT;
 import static org.apache.hadoop.hive.ql.parse.HiveParser.TOK_REPL_CONFIG;
@@ -84,8 +85,6 @@
   public static final String FUNCTIONS_ROOT_DIR_NAME = "_functions";
   public static final String CONSTRAINTS_ROOT_DIR_NAME = "_constraints";
 
-  private static final List<String> CLOUD_SCHEME_PREFIXES = Arrays.asList("s3a", "wasb");
-
   ReplicationSemanticAnalyzer(QueryState queryState) throws SemanticException {
     super(queryState);
     this.db = super.db;
@@ -222,7 +221,7 @@ private void analyzeReplDump(ASTNode ast) throws SemanticException {
     }
   }
 
-  private boolean isCloudFS(Path filePath, org.apache.hadoop.conf.Configuration conf) throws Exception {
+  private boolean ifEnableMoveOptimization(Path filePath, org.apache.hadoop.conf.Configuration conf) throws Exception {
     if (filePath == null) {
       throw new HiveException("filePath cannot be null");
     }
@@ -233,7 +232,16 @@ private boolean isCloudFS(Path filePath, org.apache.hadoop.conf.Configuration co
     if (StringUtils.isBlank(scheme)) {
       throw new HiveException("Cannot get valid scheme for " + filePath);
     }
-    return CLOUD_SCHEME_PREFIXES.contains(scheme.toLowerCase().trim());
+
+    LOG.info("scheme is " + scheme);
+
+    String[] schemeList = conf.get(REPL_MOVE_OPTIMIZED_FILE_SCHEMES.varname).toLowerCase().split(",");
+    for (String schemeIter : schemeList) {
+      if (schemeIter.trim().equalsIgnoreCase(scheme.trim())) {
+        return true;
+      }
+    }
+    return false;
   }
 
   // REPL LOAD
@@ -326,7 +334,7 @@ private void analyzeReplLoad(ASTNode ast) throws SemanticException {
       try {
         Warehouse wh = new Warehouse(conf);
         Path filePath = wh.getWhRoot();
-        if (isCloudFS(filePath, conf)) {
+        if (ifEnableMoveOptimization(filePath, conf)) {
           conf.setBoolVar(REPL_ENABLE_MOVE_OPTIMIZATION, true);
           LOG.info(" Set move optimization to true for warehouse " + filePath.toString());
         }
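
Note on the scheme matching added in ReplicationSemanticAnalyzer: the check is a case-insensitive, whitespace-tolerant membership test of the warehouse file system scheme against the comma-separated value of hive.repl.move.optimized.scheme. The standalone sketch below illustrates that matching behaviour outside of Hive; the class and method names here are illustrative only and are not part of this patch.

import java.util.Arrays;

public class MoveOptimizedSchemeCheck {
  // Illustrative helper: true when the given scheme appears in the comma-separated
  // scheme list, ignoring case and surrounding whitespace (mirrors the loop over
  // REPL_MOVE_OPTIMIZED_FILE_SCHEMES in ifEnableMoveOptimization above).
  static boolean isMoveOptimizedScheme(String scheme, String configuredSchemes) {
    if (scheme == null || configuredSchemes == null) {
      return false;
    }
    return Arrays.stream(configuredSchemes.split(","))
        .map(String::trim)
        .anyMatch(s -> s.equalsIgnoreCase(scheme.trim()));
  }

  public static void main(String[] args) {
    System.out.println(isMoveOptimizedScheme("hdfs", " hDfs , "));  // true, the value exercised by testTaskCreationOptimization
    System.out.println(isMoveOptimizedScheme("s3a", "s3a, wasb"));  // true, matches the default value
    System.out.println(isMoveOptimizedScheme("hdfs", "s3a, wasb")); // false
  }
}

As the new test does via withConfigs, the scheme list can also be supplied per REPL LOAD through its WITH clause instead of being set globally in hive-site.xml.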