diff --git a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
index 27b1b73..3a8dc84e 100644
--- a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
+++ b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
@@ -495,6 +495,12 @@ private static void populateLlapDaemonVarsSet(Set<String> llapDaemonVarsSetLocal
         + " the REPL LOAD on object data stores such as S3 or WASB where creating a directory and move \n"
         + " files are costly operations. In file system like HDFS where move operation is atomic, this \n"
         + " optimization should not be enabled as it may lead to inconsistent data read for non acid tables."),
+    REPL_MOVE_OPTIMIZED_FILE_SCHEMES("hive.repl.move.optimized.scheme", "s3a, wasb",
+        "Comma separated list of schemes for which move optimization will be enabled during repl load. \n"
+        + "This configuration overrides the value set using REPL_ENABLE_MOVE_OPTIMIZATION for the given schemes. \n"
+        + " Schemes of file systems which do not support atomic move (rename) can be specified here to \n"
+        + " speed up the repl load operation. In file systems like HDFS, where the move operation is atomic, this \n"
+        + " optimization should not be enabled as it may lead to inconsistent data reads for non-acid tables."),
     LOCALSCRATCHDIR("hive.exec.local.scratchdir",
         "${system:java.io.tmpdir}" + File.separator + "${system:user.name}",
         "Local scratch space for Hive jobs"),
diff --git a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenarios.java b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenarios.java
index a9783ab..9c35aa6 100644
--- a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenarios.java
+++ b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenarios.java
@@ -52,9 +52,17 @@
 import org.apache.hadoop.hive.metastore.messaging.event.filters.DatabaseAndTableFilter;
 import org.apache.hadoop.hive.metastore.messaging.event.filters.EventBoundaryFilter;
 import org.apache.hadoop.hive.metastore.messaging.event.filters.MessageFormatFilter;
+import org.apache.hadoop.hive.ql.DriverContext;
 import org.apache.hadoop.hive.ql.DriverFactory;
 import org.apache.hadoop.hive.ql.IDriver;
+import org.apache.hadoop.hive.ql.exec.DDLTask;
+import org.apache.hadoop.hive.ql.exec.MoveTask;
+import org.apache.hadoop.hive.ql.exec.Task;
+import org.apache.hadoop.hive.ql.exec.TaskFactory;
 import org.apache.hadoop.hive.ql.exec.repl.ReplDumpWork;
+import org.apache.hadoop.hive.ql.exec.repl.ReplLoadWork;
+import org.apache.hadoop.hive.ql.metadata.Hive;
+import org.apache.hadoop.hive.ql.metadata.HiveException;
 import org.apache.hadoop.hive.ql.parse.repl.load.EventDumpDirComparator;
 import org.apache.hadoop.hive.ql.processors.CommandProcessorResponse;
 import org.apache.hadoop.hive.ql.session.SessionState;
@@ -80,6 +88,7 @@
 import java.io.File;
 import java.io.FileWriter;
 import java.io.IOException;
+import java.io.Serializable;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.List;
@@ -315,6 +324,101 @@ public void testBasic() throws IOException {
     verifyRun("SELECT * from " + replicatedDbName + ".unptned_empty", empty, driverMirror);
   }
 
+  private abstract class checkTaskPresent {
+    public boolean hasTask(Task<? extends Serializable> rootTask) {
+      if (rootTask == null) {
+        return false;
+      }
+      if (validate(rootTask)) {
+        return true;
+      }
+      List<Task<? extends Serializable>> childTasks = rootTask.getChildTasks();
+      if (childTasks == null) {
+        return false;
+      }
+      for (Task<? extends Serializable> childTask : childTasks) {
+        if (hasTask(childTask)) {
+          return true;
+        }
+      }
+      return false;
+    }
+
+    public abstract boolean validate(Task<? extends Serializable> task);
+  }
+
+  private boolean hasMoveTask(Task<? extends Serializable> rootTask) {
+    checkTaskPresent validator = new checkTaskPresent() {
+      public boolean validate(Task<? extends Serializable> task) {
+        return (task instanceof MoveTask);
+      }
+    };
+    return validator.hasTask(rootTask);
+  }
+
+  private boolean hasPartitionTask(Task<? extends Serializable> rootTask) {
+    checkTaskPresent validator = new checkTaskPresent() {
+      public boolean validate(Task<? extends Serializable> task) {
+        if (task instanceof DDLTask) {
+          DDLTask ddlTask = (DDLTask) task;
+          if (ddlTask.getWork().getAddPartitionDesc() != null) {
+            return true;
+          }
+        }
+        return false;
+      }
+    };
+    return validator.hasTask(rootTask);
+  }
+
+  private Task<? extends Serializable> getReplLoadRootTask(String replicadb, boolean isIncrementalDump, Tuple tuple)
+      throws Throwable {
+    HiveConf confTemp = new HiveConf();
+    confTemp.set("hive.repl.enable.move.optimization", "true");
+    ReplLoadWork replLoadWork = new ReplLoadWork(confTemp, tuple.dumpLocation, replicadb,
+        null, null, isIncrementalDump, Long.valueOf(tuple.lastReplId));
+    Task<? extends Serializable> replLoadTask = TaskFactory.get(replLoadWork, confTemp);
+    replLoadTask.initialize(null, null, new DriverContext(driver.getContext()), null);
+    replLoadTask.executeTask(null);
+    Hive.closeCurrent();
+    return replLoadWork.getRootTask();
+  }
+
+  @Test
+  public void testTaskCreationOptimization() throws Throwable {
+    String name = testName.getMethodName();
+    String dbName = createDB(name, driver);
+    String dbNameReplica = dbName + "_replica";
+    run("create table " + dbName + ".t2 (place string) partitioned by (country string)", driver);
+    run("insert into table " + dbName + ".t2 partition(country='india') values ('bangalore')", driver);
+
+    Tuple dump = replDumpDb(dbName, null, null, null);
+
+    // bootstrap load should not have a move task
+    Task<? extends Serializable> task = getReplLoadRootTask(dbNameReplica, false, dump);
+    assertEquals(false, hasMoveTask(task));
+    assertEquals(true, hasPartitionTask(task));
+
+    loadAndVerify(dbNameReplica, dump.dumpLocation, dump.lastReplId);
+
+    run("insert into table " + dbName + ".t2 partition(country='india') values ('delhi')", driver);
+    dump = replDumpDb(dbName, dump.lastReplId, null, null);
+
+    // no partition task should be added as the operation is inserting into an existing partition
+    task = getReplLoadRootTask(dbNameReplica, true, dump);
+    assertEquals(true, hasMoveTask(task));
+    assertEquals(false, hasPartitionTask(task));
+
+    loadAndVerify(dbNameReplica, dump.dumpLocation, dump.lastReplId);
+
+    run("insert into table " + dbName + ".t2 partition(country='us') values ('sf')", driver);
+    dump = replDumpDb(dbName, dump.lastReplId, null, null);
+
+    // no move task should be added as the operation is adding a dynamic partition
+    task = getReplLoadRootTask(dbNameReplica, true, dump);
+    assertEquals(false, hasMoveTask(task));
+    assertEquals(true, hasPartitionTask(task));
+  }
+
   @Test
   public void testBasicWithCM() throws Exception {
     String name = testName.getMethodName();
diff --git a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosAcrossInstances.java b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosAcrossInstances.java
index 1ea85eb..16d49cf 100644
--- a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosAcrossInstances.java
+++ b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosAcrossInstances.java
@@ -50,6 +50,7 @@ import org.junit.Assert;
 
 import java.io.IOException;
+import java.io.Serializable;
 import java.net.URI;
 import java.net.URISyntaxException;
 import java.util.ArrayList;
@@ -1393,7 +1394,7 @@ public void testMoveOptimizationIncrementalFailureAfterCopyReplace() throws Thro
         .run("insert overwrite table t1 select * from t2")
         .dump(primaryDbName, tuple.lastReplicationId);
 
-    testMoveOptimization(primaryDbName, replicatedDbName, replicatedDbName_CM, "t1", "INSERT", tuple);
+    testMoveOptimization(primaryDbName, replicatedDbName, replicatedDbName_CM, "t1", "ADD_PARTITION", tuple);
   }
 
   @Test
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/load/table/LoadPartitions.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/load/table/LoadPartitions.java
index 45b674e..172b4ac 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/load/table/LoadPartitions.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/load/table/LoadPartitions.java
@@ -244,7 +244,12 @@ private void addPartition(boolean hasMorePartitions, AddPartitionDesc addPartiti
         tmpPath, context.hiveConf, false, false
     );
 
-    Task movePartitionTask = movePartitionTask(table, partSpec, tmpPath, loadFileType);
+
+    Task movePartitionTask = null;
+    if (loadFileType != LoadFileType.IGNORE) {
+      // No need to create a move task if the file is copied directly to the target location.
+      movePartitionTask = movePartitionTask(table, partSpec, tmpPath, loadFileType);
+    }
 
     // Set Checkpoint task as dependant to add partition tasks. So, if same dump is retried for
     // bootstrap, we skip current partition update.
@@ -260,10 +265,14 @@ private void addPartition(boolean hasMorePartitions, AddPartitionDesc addPartiti
     } else {
       ptnRootTask.addDependentTask(copyTask);
     }
 
-    copyTask.addDependentTask(addPartTask);
-    addPartTask.addDependentTask(movePartitionTask);
-    movePartitionTask.addDependentTask(ckptTask);
+    copyTask.addDependentTask(addPartTask);
+    if (movePartitionTask != null) {
+      addPartTask.addDependentTask(movePartitionTask);
+      movePartitionTask.addDependentTask(ckptTask);
+    } else {
+      addPartTask.addDependentTask(ckptTask);
+    }
 
     return ptnRootTask;
   }
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/load/table/LoadTable.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/load/table/LoadTable.java
index 82f687b..8538463 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/load/table/LoadTable.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/load/table/LoadTable.java
@@ -234,7 +234,7 @@ private String location(ImportTableDesc tblDesc, Database parentDb)
 
     LOG.debug("adding dependent CopyWork/AddPart/MoveWork for table "
         + table.getCompleteName() + " with source location: "
-        + dataPath.toString() + " and target location " + tmpPath.toString());
+        + dataPath.toString() + " and target location " + tgtPath.toString());
 
     Task copyTask = ReplCopyTask.getLoadCopyTask(replicationSpec, dataPath, tmpPath, context.hiveConf, false, false);
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/ImportSemanticAnalyzer.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/ImportSemanticAnalyzer.java
index c99d9c1..16ce5d5 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/ImportSemanticAnalyzer.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/ImportSemanticAnalyzer.java
@@ -609,6 +609,19 @@ private static ImportTableDesc getBaseCreateTableDescFromTable(String dbName,
       moveWork.setLoadTableWork(loadTableWork);
     }
 
+    if (loadFileType == LoadFileType.IGNORE) {
+      // If the file is copied directly to the target location, no move task is needed when the operation being
+      // replayed is an add partition: the add partition task itself raises the create-partition event, and the
+      // statistics are updated correctly in the create-partition flow because the copy goes straight to the
+      // partition location. For insert operations, the add partition task is effectively a no-op, since the alter
+      // partition it performs only updates statistics, which the move task already does as part of the load.
+      if (x.getEventType() == DumpType.EVENT_INSERT) {
+        copyTask.addDependentTask(TaskFactory.get(moveWork, x.getConf()));
+      } else {
+        copyTask.addDependentTask(addPartTask);
+      }
+      return copyTask;
+    }
     Task loadPartTask = TaskFactory.get(moveWork, x.getConf());
     copyTask.addDependentTask(loadPartTask);
     addPartTask.addDependentTask(loadPartTask);
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/ReplicationSemanticAnalyzer.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/ReplicationSemanticAnalyzer.java
index cfeb31a..87c69cf 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/ReplicationSemanticAnalyzer.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/ReplicationSemanticAnalyzer.java
@@ -50,6 +50,7 @@
 import static org.apache.hadoop.hive.conf.HiveConf.ConfVars.HIVEQUERYID;
 import static org.apache.hadoop.hive.conf.HiveConf.ConfVars.REPL_DUMP_METADATA_ONLY;
 import static org.apache.hadoop.hive.conf.HiveConf.ConfVars.REPL_ENABLE_MOVE_OPTIMIZATION;
+import static org.apache.hadoop.hive.conf.HiveConf.ConfVars.REPL_MOVE_OPTIMIZED_FILE_SCHEMES;
 import static org.apache.hadoop.hive.ql.parse.HiveParser.TOK_DBNAME;
 import static org.apache.hadoop.hive.ql.parse.HiveParser.TOK_LIMIT;
 import static org.apache.hadoop.hive.ql.parse.HiveParser.TOK_REPL_CONFIG;
@@ -84,8 +85,6 @@
   public static final String FUNCTIONS_ROOT_DIR_NAME = "_functions";
   public static final String CONSTRAINTS_ROOT_DIR_NAME = "_constraints";
 
-  private static final List<String> CLOUD_SCHEME_PREFIXES = Arrays.asList("s3a", "wasb");
-
   ReplicationSemanticAnalyzer(QueryState queryState) throws SemanticException {
     super(queryState);
     this.db = super.db;
@@ -222,7 +221,7 @@ private void analyzeReplDump(ASTNode ast) throws SemanticException {
     }
   }
 
-  private boolean isCloudFS(Path filePath, org.apache.hadoop.conf.Configuration conf) throws Exception {
+  private boolean ifEnableMoveOptimization(Path filePath, org.apache.hadoop.conf.Configuration conf) throws Exception {
     if (filePath == null) {
       throw new HiveException("filePath cannot be null");
     }
@@ -233,7 +232,16 @@ private boolean isCloudFS(Path filePath, org.apache.hadoop.conf.Configuration co
     if (StringUtils.isBlank(scheme)) {
       throw new HiveException("Cannot get valid scheme for " + filePath);
     }
-    return CLOUD_SCHEME_PREFIXES.contains(scheme.toLowerCase().trim());
+
+    LOG.info("scheme is " + scheme);
+
+    String[] schemeList = conf.get(REPL_MOVE_OPTIMIZED_FILE_SCHEMES.varname).toLowerCase().split(",");
+    for (String schemeIter : schemeList) {
+      if (schemeIter.trim().equalsIgnoreCase(scheme.trim())) {
+        return true;
+      }
+    }
+    return false;
   }
 
   // REPL LOAD
@@ -326,7 +334,7 @@ private void analyzeReplLoad(ASTNode ast) throws SemanticException {
     try {
       Warehouse wh = new Warehouse(conf);
       Path filePath = wh.getWhRoot();
-      if (isCloudFS(filePath, conf)) {
+      if (ifEnableMoveOptimization(filePath, conf)) {
         conf.setBoolVar(REPL_ENABLE_MOVE_OPTIMIZATION, true);
         LOG.info(" Set move optimization to true for warehouse " + filePath.toString());
       }
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/plan/LoadTableDesc.java b/ql/src/java/org/apache/hadoop/hive/ql/plan/LoadTableDesc.java
index 2267df9..d9333b5 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/plan/LoadTableDesc.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/plan/LoadTableDesc.java
@@ -67,7 +67,9 @@
      */
     OVERWRITE_EXISTING,
     /**
-     * No need to move the file, used in case of replication to s3
+     * No need to move the file; used in case of replication to S3. If the load type is set to IGNORE, only the
+     * file operations (move/rename) are skipped in the load table/partition methods. Other operations, such as
+     * statistics updates and event notification, happen as usual.
      */
     IGNORE
   }
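
For reference, the scheme matching that the new ReplicationSemanticAnalyzer#ifEnableMoveOptimization performs against hive.repl.move.optimized.scheme can be sketched in isolation roughly as follows. The class and method names below are illustrative only; the configuration key and its "s3a, wasb" default come from the HiveConf change above.

import java.util.Arrays;

public class MoveOptimizationSchemeCheck {

  // Compare the warehouse scheme against the comma-separated scheme list, ignoring case and
  // surrounding whitespace, the way ifEnableMoveOptimization() decides whether to force
  // REPL_ENABLE_MOVE_OPTIMIZATION on for a REPL LOAD.
  static boolean isMoveOptimizedScheme(String scheme, String configuredSchemes) {
    if (scheme == null || configuredSchemes == null) {
      return false;
    }
    return Arrays.stream(configuredSchemes.split(","))
        .map(String::trim)
        .anyMatch(s -> s.equalsIgnoreCase(scheme.trim()));
  }

  public static void main(String[] args) {
    System.out.println(isMoveOptimizedScheme("s3a", "s3a, wasb"));  // true: move tasks are skipped
    System.out.println(isMoveOptimizedScheme("hdfs", "s3a, wasb")); // false: move tasks are kept
  }
}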