diff --git itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosExclusiveReplica.java itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosExclusiveReplica.java
index 371f90bb1f..98c90c3260 100644
--- itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosExclusiveReplica.java
+++ itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosExclusiveReplica.java
@@ -179,6 +179,63 @@ public void externalTableReplicationWithLocalStaging() throws Throwable {
         .verifyResult("800");
   }
 
+  @Test
+  public void testHdfsMoveOptimizationOnTargetStaging() throws Throwable {
+    List<String> withClauseOptions = getStagingLocationConfig(replica.repldDir);
+    withClauseOptions.addAll(externalTableBasePathWithClause());
+    primary.run("use " + primaryDbName)
+        .run("create table t1 (id int)")
+        .run("insert into table t1 values (1)")
+        .run("create table t2 (place string) partitioned by (country string)")
+        .run("insert into table t2 partition(country='india') values ('ranchi')")
+        .run("insert into table t2 partition(country='us') values ('austin')")
+        .run("insert into table t2 partition(country='france') values ('paris')")
+        .dump(primaryDbName, withClauseOptions);
+    replica.load(replicatedDbName, primaryDbName, withClauseOptions)
+        .run("use " + replicatedDbName)
+        .run("show tables like 't1'")
+        .verifyResult("t1")
+        .run("select id from t1")
+        .verifyResult("1")
+        .run("show tables like 't2'")
+        .verifyResult("t2")
+        .run("select place from t2 where country = 'india'")
+        .verifyResult("ranchi")
+        .run("select place from t2 where country = 'us'")
+        .verifyResult("austin")
+        .run("select place from t2 where country = 'france'")
+        .verifyResult("paris");
+
+    primary.run("use " + primaryDbName)
+        .run("insert into table t1 values (2)")
+        .run("insert into table t2 partition(country='india') values ('mysore')")
+        .run("insert into table t2 partition(country='us') values ('decorah')")
+        .run("insert into table t2 partition(country='france') values ('yvoire')")
+        .run("create table t4 (id int)")
+        .run("insert into table t4 values (4)")
+        .dump(primaryDbName, withClauseOptions);
+
+    replica.load(replicatedDbName, primaryDbName, withClauseOptions)
+        .run("use " + replicatedDbName)
+        .run("show tables like 't1'")
+        .verifyResult("t1")
+        .run("select id from t1")
+        .verifyResults(new String[] {"1", "2"})
+        .run("show tables like 't2'")
+        .verifyResult("t2")
+        .run("show tables like 't4'")
+        .verifyResult("t4")
+        .run("select place from t2 where country = 'india'")
+        .verifyResults(new String[] {"ranchi", "mysore"})
+        .run("select place from t2 where country = 'us'")
+        .verifyResults(new String[]{"austin", "decorah"})
+        .run("select place from t2 where country = 'france'")
+        .verifyResults(new String[]{"paris", "yvoire"})
+        .run("select id from t4")
+        .verifyResult("4");
+
+  }
+
   private List<String> getStagingLocationConfig(String stagingLoc) {
     List<String> confList = new ArrayList<>();
     confList.add("'" + HiveConf.ConfVars.REPLDIR.varname + "'='" + stagingLoc + "'");
diff --git itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestScheduledReplicationScenarios.java itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestScheduledReplicationScenarios.java
index 2e1e5e0544..af581b6133 100644
--- itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestScheduledReplicationScenarios.java
+++ itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestScheduledReplicationScenarios.java
@@ -114,7 +114,6 @@ public void tearDown() throws Throwable {
   }
 
   @Test
-  @Ignore("HIVE-23395")
   public void testAcidTablesReplLoadBootstrapIncr() throws Throwable {
     // Bootstrap
     primary.run("use " + primaryDbName)
@@ -382,9 +381,9 @@ private void waitForAck(FileSystem fs, Path ackFile, long timeout) throws IOExce
         //no-op
       }
       if (System.currentTimeMillis() - oldTime > timeout) {
-        break;
+        throw new IOException("Timed out waiting for the ack file: " + ackFile.toString());
       }
     }
-    throw new IOException("Timed out waiting for the ack file: " + ackFile.toString());
+
   }
 }
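A self-contained sketch of the corrected wait-for-ack control flow shown above: the helper now fails fast with an IOException when the ack file does not appear within the timeout, instead of silently breaking out of the loop. The class name, 200 ms poll interval, and interrupt handling are illustrative assumptions, not the test's exact code.

import java.io.IOException;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

final class AckWaitSketch {
  private AckWaitSketch() {
  }

  // Poll until the ack file exists; surface the timeout as an IOException.
  static void waitForAck(FileSystem fs, Path ackFile, long timeoutMs) throws IOException {
    long start = System.currentTimeMillis();
    while (!fs.exists(ackFile)) {
      if (System.currentTimeMillis() - start > timeoutMs) {
        throw new IOException("Timed out waiting for the ack file: " + ackFile);
      }
      try {
        Thread.sleep(200L);
      } catch (InterruptedException e) {
        Thread.currentThread().interrupt();
        throw new IOException("Interrupted while waiting for the ack file: " + ackFile, e);
      }
    }
  }
}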
diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/load/table/LoadPartitions.java ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/load/table/LoadPartitions.java
index b78df44e84..a41a414010 100644
--- ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/load/table/LoadPartitions.java
+++ ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/load/table/LoadPartitions.java
@@ -47,6 +47,7 @@
 import org.apache.hadoop.hive.ql.parse.SemanticException;
 import org.apache.hadoop.hive.ql.parse.repl.ReplLogger;
 import org.apache.hadoop.hive.ql.parse.repl.metric.ReplicationMetricCollector;
+import org.apache.hadoop.hive.ql.parse.repl.dump.Utils;
 import org.apache.hadoop.hive.ql.plan.ExprNodeGenericFuncDesc;
 import org.apache.hadoop.hive.ql.plan.ImportTableDesc;
 import org.apache.hadoop.hive.ql.plan.LoadMultiFilesDesc;
@@ -57,6 +58,7 @@
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.io.File;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.HashSet;
@@ -225,6 +227,7 @@ private void addPartition(boolean hasMorePartitions, AlterTableAddPartitionDesc
     }
 
     Path stagingDir = replicaWarehousePartitionLocation;
+    boolean performOnlyMove = Utils.onSameHDFSFileSystem(event.dataPath(), replicaWarehousePartitionLocation);
     // if move optimization is enabled, copy the files directly to the target path. No need to create the staging dir.
     LoadFileType loadFileType;
     if (event.replicationSpec().isInReplicationScope() &&
@@ -243,29 +246,34 @@ private void addPartition(boolean hasMorePartitions, AlterTableAddPartitionDesc
           : LoadFileType.OVERWRITE_EXISTING);
       stagingDir = PathUtils.getExternalTmpPath(replicaWarehousePartitionLocation, context.pathInfo);
     }
-
-    Task copyTask = ReplCopyTask.getLoadCopyTask(
-        event.replicationSpec(),
-        new Path(event.dataPath() + Path.SEPARATOR + getPartitionName(sourceWarehousePartitionLocation)),
-        stagingDir,
-        context.hiveConf, false, false
-    );
-
+    Path partDataSrc = new Path(event.dataPath() + File.separator + getPartitionName(sourceWarehousePartitionLocation));
+    Path moveSource = performOnlyMove ? partDataSrc : stagingDir;
     Task movePartitionTask = null;
     if (loadFileType != LoadFileType.IGNORE) {
       // no need to create move task, if file is moved directly to target location.
-      movePartitionTask = movePartitionTask(table, partSpec, stagingDir, loadFileType);
+      movePartitionTask = movePartitionTask(table, partSpec, moveSource, loadFileType);
     }
-
-    if (ptnRootTask == null) {
-      ptnRootTask = copyTask;
+    if (performOnlyMove) {
+      if (ptnRootTask == null) {
+        ptnRootTask = addPartTask;
+      } else {
+        ptnRootTask.addDependentTask(addPartTask);
+      }
     } else {
-      ptnRootTask.addDependentTask(copyTask);
+      Task copyTask = ReplCopyTask.getLoadCopyTask(
+          event.replicationSpec(),
+          partDataSrc,
+          stagingDir, context.hiveConf, false, false
+      );
+      if (ptnRootTask == null) {
+        ptnRootTask = copyTask;
+      } else {
+        ptnRootTask.addDependentTask(copyTask);
+      }
+      copyTask.addDependentTask(addPartTask);
     }
-
     // Set Checkpoint task as dependant to the tail of add partition tasks. So, if same dump is
     // retried for bootstrap, we skip current partition update.
-    copyTask.addDependentTask(addPartTask);
     if (movePartitionTask != null) {
       addPartTask.addDependentTask(movePartitionTask);
       movePartitionTask.addDependentTask(ckptTask);
diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/load/table/LoadTable.java ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/load/table/LoadTable.java
index 9e236fd697..a25a7a0175 100644
--- ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/load/table/LoadTable.java
+++ ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/load/table/LoadTable.java
@@ -47,6 +47,7 @@
 import org.apache.hadoop.hive.ql.parse.SemanticException;
 import org.apache.hadoop.hive.ql.parse.repl.ReplLogger;
 import org.apache.hadoop.hive.ql.parse.repl.metric.ReplicationMetricCollector;
+import org.apache.hadoop.hive.ql.parse.repl.dump.Utils;
 import org.apache.hadoop.hive.ql.plan.ImportTableDesc;
 import org.apache.hadoop.hive.ql.plan.LoadMultiFilesDesc;
 import org.apache.hadoop.hive.ql.plan.LoadTableDesc;
@@ -56,7 +57,6 @@
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.io.Serializable;
 import java.util.BitSet;
 import java.util.Collections;
 import java.util.HashSet;
@@ -276,6 +276,7 @@ static TableLocationTuple tableLocation(ImportTableDesc tblDesc, Database parent
                               Path fromURI) {
     Path dataPath = fromURI;
     Path tmpPath = tgtPath;
+    boolean performOnlyMove = Utils.onSameHDFSFileSystem(dataPath, tgtPath);
 
     // if move optimization is enabled, copy the files directly to the target path. No need to create the staging dir.
     LoadFileType loadFileType;
@@ -297,9 +298,7 @@ static TableLocationTuple tableLocation(ImportTableDesc tblDesc, Database parent
     LOG.debug("adding dependent CopyWork/AddPart/MoveWork for table "
         + table.getCompleteName() + " with source location: "
        + dataPath.toString() + " and target location " + tgtPath.toString());
-
-    Task copyTask = ReplCopyTask.getLoadCopyTask(replicationSpec, dataPath, tmpPath, context.hiveConf,
-        false, false);
+    Path moveSrcPath = performOnlyMove ? dataPath : tmpPath;
 
     MoveWork moveWork = new MoveWork(new HashSet<>(), new HashSet<>(), null, null, false);
     if (AcidUtils.isTransactionalTable(table)) {
@@ -307,7 +306,7 @@ static TableLocationTuple tableLocation(ImportTableDesc tblDesc, Database parent
         // Write-id is hardcoded to 1 so that for migration, we just move all original files under base_1 dir.
         // ReplTxnTask added earlier in the DAG ensure that the write-id is made valid in HMS metadata.
         LoadTableDesc loadTableWork = new LoadTableDesc(
-            tmpPath, Utilities.getTableDesc(table), new TreeMap<>(),
+            moveSrcPath, Utilities.getTableDesc(table), new TreeMap<>(),
             loadFileType, ReplUtils.REPL_BOOTSTRAP_MIGRATION_BASE_WRITE_ID
         );
         loadTableWork.setStmtId(0);
@@ -317,20 +316,26 @@ static TableLocationTuple tableLocation(ImportTableDesc tblDesc, Database parent
         moveWork.setLoadTableWork(loadTableWork);
       } else {
         LoadMultiFilesDesc loadFilesWork = new LoadMultiFilesDesc(
-            Collections.singletonList(tmpPath),
+            Collections.singletonList(moveSrcPath),
             Collections.singletonList(tgtPath),
             true, null, null);
         moveWork.setMultiFilesDesc(loadFilesWork);
       }
     } else {
       LoadTableDesc loadTableWork = new LoadTableDesc(
-          tmpPath, Utilities.getTableDesc(table), new TreeMap<>(),
+          moveSrcPath, Utilities.getTableDesc(table), new TreeMap<>(),
           loadFileType, 0L
       );
       moveWork.setLoadTableWork(loadTableWork);
     }
     moveWork.setIsInReplicationScope(replicationSpec.isInReplicationScope());
     Task loadTableTask = TaskFactory.get(moveWork, context.hiveConf);
+    if (performOnlyMove) {
+      // If the staging directory is on the target cluster's HDFS, use just a move operation for the data copy.
+      return loadTableTask;
+    }
+    Task copyTask = ReplCopyTask.getLoadCopyTask(replicationSpec, dataPath, tmpPath, context.hiveConf,
+        false, false);
     copyTask.addDependentTask(loadTableTask);
     return copyTask;
   }
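To make the wiring change in LoadPartitions and LoadTable easier to follow, here is an illustrative, self-contained model of the two task DAG shapes the load now builds: when the dump data already sits on the target HDFS (performOnlyMove), the move task is the root; otherwise a copy into the staging directory still precedes it. DemoTask and the labels below are invented for the sketch; this is not Hive's Task/ReplCopyTask API.

import java.util.ArrayList;
import java.util.List;

public class ReplLoadDagSketch {

  // Minimal stand-in for a task node with dependents.
  static final class DemoTask {
    final String name;
    final List<DemoTask> children = new ArrayList<>();
    DemoTask(String name) { this.name = name; }
    void addDependentTask(DemoTask child) { children.add(child); }
  }

  static DemoTask buildLoadDag(boolean performOnlyMove) {
    DemoTask moveTask = new DemoTask("MoveTask(reads " + (performOnlyMove ? "dump dir" : "staging dir") + ")");
    if (performOnlyMove) {
      // Same HDFS: data is renamed straight into the table/partition location.
      return moveTask;
    }
    // Different filesystems: copy into staging first, then move into place.
    DemoTask copyTask = new DemoTask("ReplCopyTask(dump dir -> staging dir)");
    copyTask.addDependentTask(moveTask);
    return copyTask;
  }

  static String print(DemoTask t) {
    StringBuilder sb = new StringBuilder(t.name);
    for (DemoTask c : t.children) {
      sb.append(" -> ").append(print(c));
    }
    return sb.toString();
  }

  public static void main(String[] args) {
    System.out.println(print(buildLoadDag(true)));   // MoveTask only
    System.out.println(print(buildLoadDag(false)));  // ReplCopyTask -> MoveTask
  }
}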
diff --git ql/src/java/org/apache/hadoop/hive/ql/parse/ImportSemanticAnalyzer.java ql/src/java/org/apache/hadoop/hive/ql/parse/ImportSemanticAnalyzer.java
index cd92247338..4329f0fab3 100644
--- ql/src/java/org/apache/hadoop/hive/ql/parse/ImportSemanticAnalyzer.java
+++ ql/src/java/org/apache/hadoop/hive/ql/parse/ImportSemanticAnalyzer.java
@@ -56,6 +56,7 @@
 import org.apache.hadoop.hive.ql.metadata.InvalidTableException;
 import org.apache.hadoop.hive.ql.metadata.Table;
 import org.apache.hadoop.hive.ql.parse.repl.DumpType;
+import org.apache.hadoop.hive.ql.parse.repl.dump.Utils;
 import org.apache.hadoop.hive.ql.parse.repl.load.MetaData;
 import org.apache.hadoop.hive.ql.parse.repl.load.UpdatedMetaDataTracker;
 import org.apache.hadoop.hive.ql.plan.CopyWork;
@@ -452,7 +453,7 @@ private static ImportTableDesc getBaseCreateTableDescFromTable(String dbName,
     boolean isAutoPurge = false;
     boolean needRecycle = false;
     boolean copyToMigratedTxnTable = replicationSpec.isMigratingToTxnTable();
-
+    boolean performOnlyMove = replicationSpec.isInReplicationScope() && Utils.onSameHDFSFileSystem(dataPath, tgtPath);
     if (replicationSpec.isInReplicationScope() && (copyToMigratedTxnTable ||
         x.getCtx().getConf().getBoolean(REPL_ENABLE_MOVE_OPTIMIZATION.varname, false))) {
       lft = LoadFileType.IGNORE;
@@ -498,37 +499,39 @@ private static ImportTableDesc getBaseCreateTableDescFromTable(String dbName,
       );
     }
 
-    Task copyTask = null;
-    if (replicationSpec.isInReplicationScope()) {
-      copyTask = ReplCopyTask.getLoadCopyTask(replicationSpec, dataPath, destPath, x.getConf(),
-          isAutoPurge, needRecycle, copyToMigratedTxnTable, false);
-    } else {
-      copyTask = TaskFactory.get(new CopyWork(dataPath, destPath, false));
-    }
-
     MoveWork moveWork = new MoveWork(x.getInputs(), x.getOutputs(), null, null, false);
-
-
+    Path dataSrc = performOnlyMove ? dataPath : destPath;
     if (replicationSpec.isInReplicationScope() && AcidUtils.isTransactionalTable(table)) {
       LoadMultiFilesDesc loadFilesWork = new LoadMultiFilesDesc(
-          Collections.singletonList(destPath),
+          Collections.singletonList(dataSrc),
           Collections.singletonList(tgtPath),
           true, null, null);
       moveWork.setMultiFilesDesc(loadFilesWork);
       moveWork.setNeedCleanTarget(replace);
     } else {
       LoadTableDesc loadTableWork = new LoadTableDesc(
-          loadPath, Utilities.getTableDesc(table), new TreeMap<>(), lft, writeId);
+          dataSrc, Utilities.getTableDesc(table), new TreeMap<>(), lft, writeId);
       if (replicationSpec.isMigratingToTxnTable()) {
         loadTableWork.setInsertOverwrite(replace);
       }
       loadTableWork.setStmtId(stmtId);
       moveWork.setLoadTableWork(loadTableWork);
     }
+    Task loadTableTask = TaskFactory.get(moveWork, x.getConf());
+    if (performOnlyMove) {
+      x.getTasks().add(loadTableTask);
+      return loadTableTask;
+    }
+    Task copyTask = null;
+    if (replicationSpec.isInReplicationScope()) {
+      copyTask = ReplCopyTask.getLoadCopyTask(replicationSpec, dataPath, destPath, x.getConf(),
+          isAutoPurge, needRecycle, copyToMigratedTxnTable, false);
+    } else {
+      copyTask = TaskFactory.get(new CopyWork(dataPath, destPath, false));
+    }
 
     //if Importing into existing table, FileFormat is checked by
     // ImportSemanticAnalzyer.checked checkTable()
-    Task loadTableTask = TaskFactory.get(moveWork, x.getConf());
     copyTask.addDependentTask(loadTableTask);
     x.getTasks().add(copyTask);
     return loadTableTask;
@@ -591,6 +594,7 @@ private static ImportTableDesc getBaseCreateTableDescFromTable(String dbName,
       return addPartTask;
     } else {
       String srcLocation = partSpec.getLocation();
+      boolean performOnlyMove = false;
       if (replicationSpec.isInReplicationScope()
           && !ReplicationSpec.Type.IMPORT.equals(replicationSpec.getReplSpecType())) {
         Path partLocation = new Path(partSpec.getLocation());
@@ -602,6 +606,7 @@ private static ImportTableDesc getBaseCreateTableDescFromTable(String dbName,
         }
         String relativePartDataPath = EximUtil.DATA_PATH_NAME + File.separator + bucketDir;
         srcLocation = new Path(dataDirBase, relativePartDataPath).toString();
+        performOnlyMove = Utils.onSameHDFSFileSystem(new Path(srcLocation), new Path(partSpec.getLocation()));
       }
       fixLocationInPartSpec(tblDesc, table, wh, replicationSpec, partSpec, x);
       x.getLOG().debug("adding dependent CopyWork/AddPart/MoveWork for partition "
@@ -644,13 +649,10 @@ private static ImportTableDesc getBaseCreateTableDescFromTable(String dbName,
             )
         );
       }
-
-      Task copyTask = null;
-      if (replicationSpec.isInReplicationScope()) {
-        copyTask = ReplCopyTask.getLoadCopyTask(replicationSpec, new Path(srcLocation), destPath,
-            x.getConf(), isAutoPurge, needRecycle, copyToMigratedTxnTable, false);
-      } else {
-        copyTask = TaskFactory.get(new CopyWork(new Path(srcLocation), destPath, false));
+      if (performOnlyMove) {
+        moveTaskSrc = new Path(srcLocation);
+      } else if (replicationSpec.isInReplicationScope()
+          && AcidUtils.isTransactionalTable(tblDesc.getTblProps())) {
+        moveTaskSrc = destPath;
       }
       Task addPartTask = null;
@@ -661,7 +663,6 @@ private static ImportTableDesc getBaseCreateTableDescFromTable(String dbName,
         addPartTask = TaskFactory.get(
             new DDLWork(x.getInputs(), x.getOutputs(), addPartitionDesc), x.getConf());
       }
-
       MoveWork moveWork = new MoveWork(x.getInputs(), x.getOutputs(), null, null, false);
 
 
@@ -669,7 +670,7 @@ private static ImportTableDesc getBaseCreateTableDescFromTable(String dbName,
       // See setLoadFileType and setIsAcidIow calls elsewhere for an example.
       if (replicationSpec.isInReplicationScope() && AcidUtils.isTransactionalTable(tblDesc.getTblProps())) {
         LoadMultiFilesDesc loadFilesWork = new LoadMultiFilesDesc(
-            Collections.singletonList(destPath),
+            Collections.singletonList(moveTaskSrc),
             Collections.singletonList(tgtLocation),
             true, null, null);
         moveWork.setMultiFilesDesc(loadFilesWork);
@@ -686,9 +687,25 @@ private static ImportTableDesc getBaseCreateTableDescFromTable(String dbName,
         loadTableWork.setInheritTableSpecs(false);
         moveWork.setLoadTableWork(loadTableWork);
       }
+      Task loadPartTask = TaskFactory.get(moveWork, x.getConf());
+      if (performOnlyMove) {
+        if (addPartTask != null) {
+          addPartTask.addDependentTask(loadPartTask);
+        }
+        x.getTasks().add(loadPartTask);
+        return addPartTask == null ? loadPartTask : addPartTask;
+      }
+
+      Task copyTask = null;
+      if (replicationSpec.isInReplicationScope()) {
+        copyTask = ReplCopyTask.getLoadCopyTask(replicationSpec, new Path(srcLocation), destPath,
+            x.getConf(), isAutoPurge, needRecycle, copyToMigratedTxnTable, false);
+      } else {
+        copyTask = TaskFactory.get(new CopyWork(new Path(srcLocation), destPath, false));
+      }
 
       if (loadFileType == LoadFileType.IGNORE) {
-        // if file is coped directly to the target location, then no need of move task in case the operation getting
+        // if file is copied directly to the target location, then no need of move task in case the operation getting
         // replayed is add partition. As add partition will add the event for create partition. Even the statics are
         // updated properly in create partition flow as the copy is done directly to the partition location. For insert
         // operations, add partition task is anyways a no-op as alter partition operation does just some statistics
@@ -702,7 +719,6 @@ private static ImportTableDesc getBaseCreateTableDescFromTable(String dbName,
         }
         return copyTask;
       }
-      Task loadPartTask = TaskFactory.get(moveWork, x.getConf());
      copyTask.addDependentTask(loadPartTask);
      if (addPartTask != null) {
        addPartTask.addDependentTask(loadPartTask);
diff --git ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/Utils.java ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/Utils.java
index 154f02809e..bfb238f158 100644
--- ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/Utils.java
+++ ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/Utils.java
@@ -17,6 +17,7 @@
  */
 package org.apache.hadoop.hive.ql.parse.repl.dump;
 
+import org.apache.commons.lang3.StringUtils;
 import org.apache.hadoop.fs.FSDataOutputStream;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
@@ -48,6 +49,7 @@
 import java.io.FileNotFoundException;
 import java.io.IOException;
 import java.io.InputStream;
+import java.net.URI;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.HashMap;
@@ -367,6 +369,17 @@ public static boolean shouldReplicate(NotificationEvent tableForEvent,
     }
   }
 
+  public static boolean onSameHDFSFileSystem(Path dataPath, Path tgtPath) {
+    URI dataUri = dataPath.toUri();
+    URI tgtUri = tgtPath.toUri();
+    boolean hdfsScheme = "hdfs".equalsIgnoreCase(dataUri.getScheme())
+        && "hdfs".equalsIgnoreCase(tgtUri.getScheme());
+    if (hdfsScheme && StringUtils.equals(dataUri.getAuthority(), tgtUri.getAuthority())) {
+      return true;
+    }
+    return false;
+  }
+
   public static boolean shouldDumpMetaDataOnly(HiveConf conf) {
     return conf.getBoolVar(HiveConf.ConfVars.REPL_DUMP_METADATA_ONLY);
   }
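A minimal standalone illustration of the same-HDFS check that gates the move-only path: two paths qualify when both are hdfs URIs with the same authority (NameNode address or nameservice). The nameservice names and paths below are invented for the example, and java.util.Objects.equals stands in for the null-safe StringUtils.equals used in Utils.

import java.net.URI;
import java.util.Objects;

public class SameHdfsCheckSketch {

  // Mirrors the intent of Utils.onSameHDFSFileSystem: same scheme ("hdfs") and same authority.
  static boolean onSameHdfs(URI data, URI target) {
    boolean bothHdfs = "hdfs".equalsIgnoreCase(data.getScheme())
        && "hdfs".equalsIgnoreCase(target.getScheme());
    return bothHdfs && Objects.equals(data.getAuthority(), target.getAuthority());
  }

  public static void main(String[] args) {
    URI staging = URI.create("hdfs://replica-ns/user/hive/repl/staging/t1");
    URI warehouse = URI.create("hdfs://replica-ns/warehouse/tablespace/managed/hive/db.db/t1");
    URI remoteDump = URI.create("hdfs://primary-ns/user/hive/repl/dump/t1");
    URI external = URI.create("s3a://bucket/warehouse/db.db/t1");

    System.out.println(onSameHdfs(staging, warehouse));    // true  -> move only
    System.out.println(onSameHdfs(remoteDump, warehouse)); // false -> copy, then move
    System.out.println(onSameHdfs(external, warehouse));   // false -> copy, then move
  }
}

Only the first pair shares both the hdfs scheme and the authority, so only that load can skip the ReplCopyTask and rely on a rename within the target filesystem.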