diff --git a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java index aa58d7445c..bc75c3c00e 100644 --- a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java +++ b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java @@ -489,6 +489,11 @@ private static void populateLlapDaemonVarsSet(Set<String> llapDaemonVarsSetLocal + "used in conjunction with 'hive.repl.dump.metadata.only' set to false. if 'hive.repl.dump.metadata.only' \n" + " is set to true then this config parameter has no effect as external table meta data is flushed \n" + " always by default."), + REPL_ENABLE_MOVE_OPTIMIZATION("hive.repl.enable.move.optimization", false, + "If it is set to true, the repl copy task copies the source files directly to the destination path. \n" + + "So no move happens from the staging directory to the final destination directory. Creation of \n" + + " the staging directory is also skipped if it is set to true. This avoids the overhead of creating \n" + + " the staging directory and moving files in file systems like S3 and WASB."), LOCALSCRATCHDIR("hive.exec.local.scratchdir", "${system:java.io.tmpdir}" + File.separator + "${system:user.name}", "Local scratch space for Hive jobs"), diff --git a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenarios.java b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenarios.java index 3d509f3532..bc6c594fe9 100644 --- a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenarios.java +++ b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenarios.java @@ -3225,6 +3225,94 @@ public void testRecycleFileNonReplDatabase() throws IOException { assertTrue(fileCount == fileCountAfter); } + @Test + public void testMoveOptimizationBootstrap() throws IOException { + String name = testName.getMethodName(); + String dbName = createDB(name, driver); + String tableNameNoPart = dbName + "_no_part"; + String tableNamePart = dbName + "_part"; + + run(" use " + dbName, driver); + run("CREATE TABLE " + tableNameNoPart + " (fld int) STORED AS TEXTFILE", driver); + run("CREATE TABLE " + tableNamePart + " (fld int) partitioned by (part int) STORED AS TEXTFILE", driver); + + run("insert into " + tableNameNoPart + " values (1) ", driver); + run("insert into " + tableNameNoPart + " values (2) ", driver); + verifySetup("SELECT fld from " + tableNameNoPart , new String[]{ "1" , "2" }, driver); + + run("insert into " + tableNamePart + " partition (part=10) values (1) ", driver); + run("insert into " + tableNamePart + " partition (part=10) values (2) ", driver); + run("insert into " + tableNamePart + " partition (part=11) values (3) ", driver); + verifySetup("SELECT fld from " + tableNamePart , new String[]{ "1" , "2" , "3"}, driver); + verifySetup("SELECT fld from " + tableNamePart + " where part = 10" , new String[]{ "1" , "2"}, driver); + verifySetup("SELECT fld from " + tableNamePart + " where part = 11" , new String[]{ "3" }, driver); + + String replDbName = dbName + "_replica"; + Tuple dump = replDumpDb(dbName, null, null, null); + run("REPL LOAD " + replDbName + " FROM '" + dump.dumpLocation + + "' with ('hive.repl.enable.move.optimization'='true')", driverMirror); + verifyRun("REPL STATUS " + replDbName, dump.lastReplId, driverMirror); + + run(" use " + replDbName, driverMirror); + verifyRun("SELECT fld from " + tableNamePart , new String[]{ "1" , "2" , "3"}, driverMirror); + verifyRun("SELECT fld from " + tableNamePart + " 
where part = 10" , new String[]{ "1" , "2"}, driverMirror); + verifyRun("SELECT fld from " + tableNamePart + " where part = 11" , new String[]{ "3" }, driverMirror); + verifyRun("SELECT fld from " + tableNameNoPart , new String[]{ "1" , "2" }, driverMirror); + verifyRun("SELECT count(*) from " + tableNamePart , new String[]{ "3"}, driverMirror); + verifyRun("SELECT count(*) from " + tableNamePart + " where part = 10" , new String[]{ "2"}, driverMirror); + verifyRun("SELECT count(*) from " + tableNamePart + " where part = 11" , new String[]{ "1" }, driverMirror); + verifyRun("SELECT count(*) from " + tableNameNoPart , new String[]{ "2" }, driverMirror); + } + + @Test + public void testMoveOptimizationIncremental() throws IOException { + String testName = "testMoveOptimizationIncremental"; + String dbName = createDB(testName, driver); + String replDbName = dbName + "_replica"; + + Tuple bootstrapDump = bootstrapLoadAndVerify(dbName, replDbName); + String replDumpId = bootstrapDump.lastReplId; + + String[] unptn_data = new String[] { "eleven", "twelve" }; + + run("CREATE TABLE " + dbName + ".unptned(a string) STORED AS TEXTFILE", driver); + run("INSERT INTO TABLE " + dbName + ".unptned values('" + unptn_data[0] + "')", driver); + run("INSERT INTO TABLE " + dbName + ".unptned values('" + unptn_data[1] + "')", driver); + verifySetup("SELECT a from " + dbName + ".unptned ORDER BY a", unptn_data, driver); + + run("CREATE TABLE " + dbName + ".unptned_late LIKE " + dbName + ".unptned", driver); + run("INSERT INTO TABLE " + dbName + ".unptned_late SELECT * FROM " + dbName + ".unptned", driver); + verifySetup("SELECT * from " + dbName + ".unptned_late ORDER BY a", unptn_data, driver); + + Tuple incrementalDump = replDumpDb(dbName, replDumpId, null, null); + run("REPL LOAD " + replDbName + " FROM '" + incrementalDump.dumpLocation + + "' with ('hive.repl.enable.move.optimization'='true')", driverMirror); + verifyRun("REPL STATUS " + replDbName, incrementalDump.lastReplId, driverMirror); + replDumpId = incrementalDump.lastReplId; + + verifyRun("SELECT a from " + replDbName + ".unptned ORDER BY a", unptn_data, driverMirror); + verifyRun("SELECT a from " + replDbName + ".unptned_late ORDER BY a", unptn_data, driverMirror); + verifyRun("SELECT count(*) from " + replDbName + ".unptned ", "2", driverMirror); + verifyRun("SELECT count(*) from " + replDbName + ".unptned_late", "2", driverMirror); + + String[] unptn_data_after_ins = new String[] { "eleven", "thirteen", "twelve" }; + String[] data_after_ovwrite = new String[] { "hundred" }; + run("INSERT INTO TABLE " + dbName + ".unptned_late values('" + unptn_data_after_ins[1] + "')", driver); + verifySetup("SELECT a from " + dbName + ".unptned_late ORDER BY a", unptn_data_after_ins, driver); + run("INSERT OVERWRITE TABLE " + dbName + ".unptned values('" + data_after_ovwrite[0] + "')", driver); + verifySetup("SELECT a from " + dbName + ".unptned", data_after_ovwrite, driver); + + incrementalDump = replDumpDb(dbName, replDumpId, null, null); + run("REPL LOAD " + replDbName + " FROM '" + incrementalDump.dumpLocation + + "' with ('hive.repl.enable.move.optimization'='true')", driverMirror); + verifyRun("REPL STATUS " + replDbName, incrementalDump.lastReplId, driverMirror); + + verifyRun("SELECT a from " + replDbName + ".unptned_late ORDER BY a", unptn_data_after_ins, driverMirror); + verifyRun("SELECT a from " + replDbName + ".unptned", data_after_ovwrite, driverMirror); + verifyRun("SELECT count(*) from " + replDbName + ".unptned", "1", driverMirror); + 
verifyRun("SELECT count(*) from " + replDbName + ".unptned_late ", "3", driverMirror); + } + private static String createDB(String name, IDriver myDriver) { LOG.info("Testing " + name); run("CREATE DATABASE " + name + " WITH DBPROPERTIES ( '" + diff --git a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosAcidTables.java b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosAcidTables.java index f074428dd5..e043e5446f 100644 --- a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosAcidTables.java +++ b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosAcidTables.java @@ -141,10 +141,8 @@ public void tearDown() throws Throwable { primary.run("drop database if exists " + primaryDbName + "_extra cascade"); } - @Test - public void testAcidTablesBootstrap() throws Throwable { - WarehouseInstance.Tuple bootstrapDump = primary - .run("use " + primaryDbName) + private WarehouseInstance.Tuple prepareDataAndDump(String primaryDbName, String fromReplId) throws Throwable { + return primary.run("use " + primaryDbName) .run("create table t1 (id int) clustered by(id) into 3 buckets stored as orc " + "tblproperties (\"transactional\"=\"true\")") .run("insert into t1 values(1)") @@ -165,14 +163,15 @@ public void testAcidTablesBootstrap() throws Throwable { .run("insert into t5 values(1111), (2222)") .run("alter table t5 set tblproperties (\"transactional\"=\"true\")") .run("insert into t5 values(3333)") - .dump(primaryDbName, null); + .dump(primaryDbName, fromReplId); + } - replica.load(replicatedDbName, bootstrapDump.dumpLocation) - .run("use " + replicatedDbName) + private void verifyLoadExecution(String replicatedDbName, String lastReplId) throws Throwable { + replica.run("use " + replicatedDbName) .run("show tables") .verifyResults(new String[] {"t1", "t2", "t3", "t4", "t5"}) .run("repl status " + replicatedDbName) - .verifyResult(bootstrapDump.lastReplicationId) + .verifyResult(lastReplId) .run("select id from t1 order by id") .verifyResults(new String[]{"1", "2"}) .run("select country from t2 order by country") @@ -185,6 +184,32 @@ public void testAcidTablesBootstrap() throws Throwable { .verifyResults(new String[] {"1111", "2222", "3333"}); } + @Test + public void testAcidTablesBootstrap() throws Throwable { + WarehouseInstance.Tuple bootstrapDump = prepareDataAndDump(primaryDbName, null); + replica.load(replicatedDbName, bootstrapDump.dumpLocation); + verifyLoadExecution(replicatedDbName, bootstrapDump.lastReplicationId); + } + + @Test + public void testAcidTablesMoveOptimizationBootStrap() throws Throwable { + WarehouseInstance.Tuple bootstrapDump = prepareDataAndDump(primaryDbName, null); + replica.load(replicatedDbName, bootstrapDump.dumpLocation, + Collections.singletonList("'hive.repl.enable.move.optimization'='true'")); + verifyLoadExecution(replicatedDbName, bootstrapDump.lastReplicationId); + } + + @Test + public void testAcidTablesMoveOptimizationIncremental() throws Throwable { + WarehouseInstance.Tuple bootstrapDump = primary.dump(primaryDbName, null); + replica.load(replicatedDbName, bootstrapDump.dumpLocation, + Collections.singletonList("'hive.repl.enable.move.optimization'='true'")); + WarehouseInstance.Tuple incrDump = prepareDataAndDump(primaryDbName, bootstrapDump.lastReplicationId); + replica.load(replicatedDbName, incrDump.dumpLocation, + Collections.singletonList("'hive.repl.enable.move.optimization'='true'")); + 
verifyLoadExecution(replicatedDbName, incrDump.lastReplicationId); + } + @Test public void testAcidTablesBootstrapWithOpenTxnsTimeout() throws Throwable { // Open 5 txns diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/MoveTask.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/MoveTask.java index 827721f3e8..c1cc6335de 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/MoveTask.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/MoveTask.java @@ -333,6 +333,9 @@ public int execute(DriverContext driverContext) { for (int i = 0; i implements Serializable { private static final long serialVersionUID = 1L; @@ -136,6 +138,13 @@ protected int execute(DriverContext driverContext) { } LOG.debug("ReplCopyTask numFiles: {}", srcFiles.size()); + + // In case of move optimization, the file is copied directly to the destination, so we need to clear the old + // content if it's a replace (insert overwrite) operation. + if (work.getDeleteDestIfExist() && FileUtils.getFileStatusOrNull(dstFs, toPath) != null) { + dstFs.delete(toPath, true); + } + if (!FileUtils.mkdir(dstFs, toPath, conf)) { console.printError("Cannot make target directory: " + toPath.toString()); return 2; @@ -228,6 +237,9 @@ public String getName() { LOG.debug("ReplCopyTask:getLoadCopyTask: {}=>{}", srcPath, dstPath); if ((replicationSpec != null) && replicationSpec.isInReplicationScope()){ ReplCopyWork rcwork = new ReplCopyWork(srcPath, dstPath, false); + if (replicationSpec.isReplace() && conf.getBoolVar(REPL_ENABLE_MOVE_OPTIMIZATION)) { + rcwork.setDeleteDestIfExist(true); + } LOG.debug("ReplCopyTask:\trcwork"); if (replicationSpec.isLazy()) { LOG.debug("ReplCopyTask:\tlazy"); diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/load/table/LoadPartitions.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/load/table/LoadPartitions.java index c0cfc439d2..24b75df452 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/load/table/LoadPartitions.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/load/table/LoadPartitions.java @@ -63,6 +63,7 @@ import java.util.List; import java.util.Map; +import static org.apache.hadoop.hive.conf.HiveConf.ConfVars.REPL_ENABLE_MOVE_OPTIMIZATION; import static org.apache.hadoop.hive.ql.exec.repl.bootstrap.load.ReplicationState.PartitionState; import static org.apache.hadoop.hive.ql.parse.ImportSemanticAnalyzer.isPartitioned; import static org.apache.hadoop.hive.ql.parse.ImportSemanticAnalyzer.partSpecToString; @@ -224,14 +225,25 @@ private void addPartition(boolean hasMorePartitions, AddPartitionDesc addPartiti + partSpecToString(partSpec.getPartSpec()) + " with source location: " + partSpec.getLocation()); - Path tmpPath = PathUtils.getExternalTmpPath(replicaWarehousePartitionLocation, context.pathInfo); + Path tmpPath = replicaWarehousePartitionLocation; + + // If move optimization is enabled, copy the files directly to the target path; no need to create the staging dir. + LoadFileType loadFileType = + event.replicationSpec().isReplace() ? 
LoadFileType.REPLACE_ALL : LoadFileType.OVERWRITE_EXISTING; + if (event.replicationSpec().isInReplicationScope() && + context.hiveConf.getBoolVar(REPL_ENABLE_MOVE_OPTIMIZATION)) { + loadFileType = LoadFileType.IGNORE; + } else { + tmpPath = PathUtils.getExternalTmpPath(replicaWarehousePartitionLocation, context.pathInfo); + } + Task copyTask = ReplCopyTask.getLoadCopyTask( event.replicationSpec(), sourceWarehousePartitionLocation, tmpPath, context.hiveConf ); - Task movePartitionTask = movePartitionTask(table, partSpec, tmpPath); + Task movePartitionTask = movePartitionTask(table, partSpec, tmpPath, loadFileType); // Set Checkpoint task as dependant to add partition tasks. So, if same dump is retried for // bootstrap, we skip current partition update. @@ -257,7 +269,8 @@ private void addPartition(boolean hasMorePartitions, AddPartitionDesc addPartiti /** * This will create the move of partition data from temp path to actual path */ - private Task movePartitionTask(Table table, AddPartitionDesc.OnePartitionDesc partSpec, Path tmpPath) { + private Task movePartitionTask(Table table, AddPartitionDesc.OnePartitionDesc partSpec, Path tmpPath, + LoadFileType loadFileType) { MoveWork moveWork = new MoveWork(new HashSet<>(), new HashSet<>(), null, null, false); if (AcidUtils.isTransactionalTable(table)) { LoadMultiFilesDesc loadFilesWork = new LoadMultiFilesDesc( @@ -268,7 +281,7 @@ private void addPartition(boolean hasMorePartitions, AddPartitionDesc addPartiti } else { LoadTableDesc loadTableWork = new LoadTableDesc( tmpPath, Utilities.getTableDesc(table), partSpec.getPartSpec(), - event.replicationSpec().isReplace() ? LoadFileType.REPLACE_ALL : LoadFileType.OVERWRITE_EXISTING, 0L + loadFileType, 0L ); loadTableWork.setInheritTableSpecs(false); moveWork.setLoadTableWork(loadTableWork); diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/load/table/LoadTable.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/load/table/LoadTable.java index 089b529b7d..f1f020d457 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/load/table/LoadTable.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/load/table/LoadTable.java @@ -59,6 +59,7 @@ import java.util.List; import java.util.TreeMap; +import static org.apache.hadoop.hive.conf.HiveConf.ConfVars.REPL_ENABLE_MOVE_OPTIMIZATION; import static org.apache.hadoop.hive.ql.parse.ImportSemanticAnalyzer.isPartitioned; public class LoadTable { @@ -218,7 +219,22 @@ private String location(ImportTableDesc tblDesc, Database parentDb) private Task loadTableTask(Table table, ReplicationSpec replicationSpec, Path tgtPath, Path fromURI) { Path dataPath = new Path(fromURI, EximUtil.DATA_PATH_NAME); - Path tmpPath = PathUtils.getExternalTmpPath(tgtPath, context.pathInfo); + Path tmpPath = tgtPath; + + // if move optimization is enabled, copy the files directly to the target path. No need to create the staging dir. + LoadFileType loadFileType = + replicationSpec.isReplace() ? 
LoadFileType.REPLACE_ALL : LoadFileType.OVERWRITE_EXISTING; + if (replicationSpec.isInReplicationScope() && + context.hiveConf.getBoolVar(REPL_ENABLE_MOVE_OPTIMIZATION)) { + loadFileType = LoadFileType.IGNORE; + } else { + tmpPath = PathUtils.getExternalTmpPath(tgtPath, context.pathInfo); + } + + LOG.debug("adding dependent CopyWork/AddPart/MoveWork for table " + + table.getCompleteName() + " with source location: " + + dataPath.toString() + " and target location " + tmpPath.toString()); + Task copyTask = ReplCopyTask.getLoadCopyTask(replicationSpec, dataPath, tmpPath, context.hiveConf); @@ -232,7 +248,7 @@ private String location(ImportTableDesc tblDesc, Database parentDb) } else { LoadTableDesc loadTableWork = new LoadTableDesc( tmpPath, Utilities.getTableDesc(table), new TreeMap<>(), - replicationSpec.isReplace() ? LoadFileType.REPLACE_ALL : LoadFileType.OVERWRITE_EXISTING, 0L + loadFileType, 0L ); moveWork.setLoadTableWork(loadTableWork); } diff --git a/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java b/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java index 0b1048c589..954cf4f9a7 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java @@ -1886,7 +1886,7 @@ public Partition loadPartition(Path loadPath, Table tbl, Map par // to ACID updates. So the are not themselves ACID. // Note: this assumes both paths are qualified; which they are, currently. - if ((isMmTableWrite || isFullAcidTable) && loadPath.equals(newPartPath)) { + if ((isMmTableWrite || isFullAcidTable) && loadPath.equals(newPartPath) || loadFileType == LoadFileType.IGNORE) { // MM insert query, move itself is a no-op. if (Utilities.FILE_OP_LOGGER.isTraceEnabled()) { Utilities.FILE_OP_LOGGER.trace("not moving " + loadPath + " to " + newPartPath + " (MM)"); @@ -2497,7 +2497,7 @@ public void loadTable(Path loadPath, String tableName, LoadFileType loadFileType } // Note: this assumes both paths are qualified; which they are, currently. - if ((isMmTable || isFullAcidTable) && loadPath.equals(tbl.getPath())) { + if ((isMmTable || isFullAcidTable) && loadPath.equals(tbl.getPath()) || loadFileType == LoadFileType.IGNORE) { /** * some operations on Transactional tables (e.g. Import) write directly to the final location * and avoid the 'move' operation. Since MoveTask does other things, setting 'loadPath' to be diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/ImportSemanticAnalyzer.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/ImportSemanticAnalyzer.java index 6fbe29c5ec..fe711c6ae9 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/parse/ImportSemanticAnalyzer.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/ImportSemanticAnalyzer.java @@ -78,6 +78,8 @@ import java.util.Map; import java.util.TreeMap; +import static org.apache.hadoop.hive.conf.HiveConf.ConfVars.REPL_ENABLE_MOVE_OPTIMIZATION; + /** * ImportSemanticAnalyzer. * @@ -395,24 +397,31 @@ private static ImportTableDesc getBaseCreateTableDescFromTable(String dbName, Path dataPath = new Path(fromURI.toString(), EximUtil.DATA_PATH_NAME); Path destPath = null, loadPath = null; LoadFileType lft; - if (AcidUtils.isTransactionalTable(table) && !replicationSpec.isInReplicationScope()) { - String mmSubdir = replace ? AcidUtils.baseDir(writeId) - : AcidUtils.deltaSubdir(writeId, writeId, stmtId); - destPath = new Path(tgtPath, mmSubdir); - /** - * CopyTask below will copy files from the 'archive' to a delta_x_x in the table/partition - * directory, i.e. 
the final destination for these files. This has to be a copy to preserve - * the archive. MoveTask is optimized to do a 'rename' if files are on the same FileSystem. - * So setting 'loadPath' this way will make - * {@link Hive#loadTable(Path, String, LoadFileType, boolean, boolean, boolean, - * boolean, Long, int)} - * skip the unnecessary file (rename) operation but it will perform other things. - */ - loadPath = tgtPath; - lft = LoadFileType.KEEP_EXISTING; + + if (replicationSpec.isInReplicationScope() && + x.getCtx().getConf().getBoolean(REPL_ENABLE_MOVE_OPTIMIZATION.varname, false)) { + lft = LoadFileType.IGNORE; + destPath = loadPath = tgtPath; } else { - destPath = loadPath = x.getCtx().getExternalTmpPath(tgtPath); - lft = replace ? LoadFileType.REPLACE_ALL : LoadFileType.OVERWRITE_EXISTING; + if (AcidUtils.isTransactionalTable(table) && !replicationSpec.isInReplicationScope()) { + String mmSubdir = replace ? AcidUtils.baseDir(writeId) + : AcidUtils.deltaSubdir(writeId, writeId, stmtId); + destPath = new Path(tgtPath, mmSubdir); + /** + * CopyTask below will copy files from the 'archive' to a delta_x_x in the table/partition + * directory, i.e. the final destination for these files. This has to be a copy to preserve + * the archive. MoveTask is optimized to do a 'rename' if files are on the same FileSystem. + * So setting 'loadPath' this way will make + * {@link Hive#loadTable(Path, String, LoadFileType, boolean, boolean, boolean, + * boolean, Long, int)} + * skip the unnecessary file (rename) operation but it will perform other things. + */ + loadPath = tgtPath; + lft = LoadFileType.KEEP_EXISTING; + } else { + destPath = loadPath = x.getCtx().getExternalTmpPath(tgtPath); + lft = replace ? LoadFileType.REPLACE_ALL : LoadFileType.OVERWRITE_EXISTING; + } } @@ -496,7 +505,7 @@ private static ImportTableDesc getBaseCreateTableDescFromTable(String dbName, return TaskFactory.get(new DDLWork(x.getInputs(), x.getOutputs(), addPartitionDesc), x.getConf()); } - private static Task addSinglePartition(URI fromURI, FileSystem fs, ImportTableDesc tblDesc, + private static Task addSinglePartition(URI fromURI, FileSystem fs, ImportTableDesc tblDesc, Table table, Warehouse wh, AddPartitionDesc addPartitionDesc, ReplicationSpec replicationSpec, EximUtil.SemanticAnalyzerWrapperContext x, Long writeId, int stmtId) throws MetaException, IOException, HiveException { @@ -516,11 +525,21 @@ private static ImportTableDesc getBaseCreateTableDescFromTable(String dbName, + partSpecToString(partSpec.getPartSpec()) + " with source location: " + srcLocation); Path tgtLocation = new Path(partSpec.getLocation()); - //Replication scope the write id will be invalid - Boolean useStagingDirectory = !AcidUtils.isTransactionalTable(table.getParameters()) || - replicationSpec.isInReplicationScope(); - Path destPath = useStagingDirectory ? x.getCtx().getExternalTmpPath(tgtLocation) - : new Path(tgtLocation, AcidUtils.deltaSubdir(writeId, writeId, stmtId)); + + LoadFileType loadFileType = replicationSpec.isReplace() ? LoadFileType.REPLACE_ALL : LoadFileType.OVERWRITE_EXISTING; + Path destPath; + if (replicationSpec.isInReplicationScope() && + x.getCtx().getConf().getBoolean(REPL_ENABLE_MOVE_OPTIMIZATION.varname, false)) { + loadFileType = LoadFileType.IGNORE; + destPath = tgtLocation; + } else { + //Replication scope the write id will be invalid + Boolean useStagingDirectory = !AcidUtils.isTransactionalTable(table.getParameters()) || + replicationSpec.isInReplicationScope(); + destPath = useStagingDirectory ? 
x.getCtx().getExternalTmpPath(tgtLocation) + : new Path(tgtLocation, AcidUtils.deltaSubdir(writeId, writeId, stmtId)); + } + Path moveTaskSrc = !AcidUtils.isTransactionalTable(table.getParameters()) ? destPath : tgtLocation; if (Utilities.FILE_OP_LOGGER.isTraceEnabled()) { Utilities.FILE_OP_LOGGER.trace("adding import work for partition with source location: " @@ -558,7 +577,7 @@ private static ImportTableDesc getBaseCreateTableDescFromTable(String dbName, } else { LoadTableDesc loadTableWork = new LoadTableDesc(moveTaskSrc, Utilities.getTableDesc(table), partSpec.getPartSpec(), - replicationSpec.isReplace() ? LoadFileType.REPLACE_ALL : LoadFileType.OVERWRITE_EXISTING, + loadFileType, writeId); loadTableWork.setStmtId(stmtId); loadTableWork.setInheritTableSpecs(false); diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/ReplicationSemanticAnalyzer.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/ReplicationSemanticAnalyzer.java index e4186c45a8..e4a128182c 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/parse/ReplicationSemanticAnalyzer.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/ReplicationSemanticAnalyzer.java @@ -24,6 +24,7 @@ import org.apache.hadoop.fs.Path; import org.apache.hadoop.hive.conf.HiveConf; import org.apache.hadoop.hive.metastore.ReplChangeManager; +import org.apache.hadoop.hive.metastore.Warehouse; import org.apache.hadoop.hive.metastore.api.Database; import org.apache.hadoop.hive.ql.ErrorMsg; import org.apache.hadoop.hive.ql.QueryState; @@ -40,12 +41,15 @@ import org.apache.hadoop.hive.ql.plan.PlanUtils; import java.io.FileNotFoundException; +import java.net.URI; +import java.util.Arrays; import java.util.Collections; import java.util.List; import java.util.Map; import static org.apache.hadoop.hive.conf.HiveConf.ConfVars.HIVEQUERYID; import static org.apache.hadoop.hive.conf.HiveConf.ConfVars.REPL_DUMP_METADATA_ONLY; +import static org.apache.hadoop.hive.conf.HiveConf.ConfVars.REPL_ENABLE_MOVE_OPTIMIZATION; import static org.apache.hadoop.hive.ql.parse.HiveParser.TOK_DBNAME; import static org.apache.hadoop.hive.ql.parse.HiveParser.TOK_LIMIT; import static org.apache.hadoop.hive.ql.parse.HiveParser.TOK_REPL_CONFIG; @@ -80,6 +84,8 @@ public static final String FUNCTIONS_ROOT_DIR_NAME = "_functions"; public static final String CONSTRAINTS_ROOT_DIR_NAME = "_constraints"; + private static final List CLOUD_SCHEME_PREFIXES = Arrays.asList("s3a", "wasb"); + ReplicationSemanticAnalyzer(QueryState queryState) throws SemanticException { super(queryState); this.db = super.db; @@ -216,6 +222,20 @@ private void analyzeReplDump(ASTNode ast) throws SemanticException { } } + private boolean isCloudFS(Path filePath, org.apache.hadoop.conf.Configuration conf) throws Exception { + if (filePath == null) { + throw new HiveException("filePath cannot be null"); + } + + URI uri = filePath.toUri(); + String scheme = uri.getScheme(); + scheme = StringUtils.isBlank(scheme) ? 
FileSystem.get(uri, conf).getScheme() : scheme; + if (StringUtils.isBlank(scheme)) { + throw new HiveException("Cannot get valid scheme for " + filePath); + } + return CLOUD_SCHEME_PREFIXES.contains(scheme.toLowerCase().trim()); + } + // REPL LOAD private void initReplLoad(ASTNode ast) throws SemanticException { path = PlanUtils.stripQuotes(ast.getChild(0).getText()); @@ -302,6 +322,18 @@ private void analyzeReplLoad(ASTNode ast) throws SemanticException { throw new FileNotFoundException(ErrorMsg.REPL_LOAD_PATH_NOT_FOUND.getMsg()); } + // This config is set to make sure that in case of S3 replication, the move is skipped. + try { + Warehouse wh = new Warehouse(conf); + Path filePath = wh.getWhRoot(); + if (isCloudFS(filePath, conf)) { + conf.setBoolVar(REPL_ENABLE_MOVE_OPTIMIZATION, true); + LOG.info("Set move optimization to true for warehouse " + filePath.toString()); + } + } catch (Exception e) { + throw new SemanticException(e.getMessage(), e); + } + // Now, the dumped path can be one of three things: // a) It can be a db dump, in which case we expect a set of dirs, each with a // db name, and with a _metadata file in each, and table dirs inside that. diff --git a/ql/src/java/org/apache/hadoop/hive/ql/plan/LoadTableDesc.java b/ql/src/java/org/apache/hadoop/hive/ql/plan/LoadTableDesc.java index f32016725a..78480f22fe 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/plan/LoadTableDesc.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/plan/LoadTableDesc.java @@ -65,7 +65,11 @@ * the file instead of making a duplicate copy. * If any file exist while copy, then just overwrite the file */ - OVERWRITE_EXISTING + OVERWRITE_EXISTING, + /** + * No need to move or rename the file; used in case of replication to S3. + */ + IGNORE } public LoadTableDesc(final LoadTableDesc o) { super(o.getSourcePath(), o.getWriteType()); diff --git a/ql/src/java/org/apache/hadoop/hive/ql/plan/ReplCopyWork.java b/ql/src/java/org/apache/hadoop/hive/ql/plan/ReplCopyWork.java index 3ff9f2fdf2..c14b751681 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/plan/ReplCopyWork.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/plan/ReplCopyWork.java @@ -49,6 +49,8 @@ // If set to false, it'll behave as a traditional CopyTask. protected boolean readSrcAsFilesList = false; + protected boolean deleteDestIfExist = false; + private String distCpDoAsUser = null; public ReplCopyWork(final Path srcPath, final Path destPath, boolean errorOnSrcEmpty) { @@ -70,4 +72,12 @@ public void setDistCpDoAsUser(String distCpDoAsUser) { public String distCpDoAsUser() { return distCpDoAsUser; } + + public boolean getDeleteDestIfExist() { + return deleteDestIfExist; + } + + public void setDeleteDestIfExist(boolean deleteDestIfExist) { + this.deleteDestIfExist = deleteDestIfExist; + } }
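
For reviewers, a minimal usage sketch of the new flag (the database names and dump path below are illustrative placeholders, not part of this patch): the optimization can be enabled per REPL LOAD through the WITH clause, exactly as the new tests do, and ReplicationSemanticAnalyzer also switches it on automatically when the target warehouse root resolves to a cloud scheme (s3a or wasb).

    -- On the source cluster: produce a bootstrap (or incremental) dump.
    REPL DUMP srcdb;

    -- On the target cluster: load with the move optimization enabled explicitly.
    -- When the warehouse root is on s3a/wasb, the flag is set automatically during REPL LOAD.
    REPL LOAD srcdb_replica FROM '<dump_location>'
    WITH ('hive.repl.enable.move.optimization'='true');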