diff --git a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosAcidTables.java b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosAcidTables.java
index 70151eebb1..d47263db06 100644
--- a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosAcidTables.java
+++ b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosAcidTables.java
@@ -61,6 +61,7 @@
 import java.util.Collections;
 import java.util.Map;
+
 import static org.apache.hadoop.hive.metastore.ReplChangeManager.SOURCE_OF_REPLICATION;
 import static org.apache.hadoop.hive.ql.exec.repl.ReplAck.DUMP_ACKNOWLEDGEMENT;
 import static org.junit.Assert.assertEquals;
@@ -2207,131 +2208,154 @@ public void testORCTableRegularCopyWithCopyOnTarget() throws Throwable {
   public void testORCTableDistcpCopyWithCopyOnTarget() throws Throwable {
     //Distcp copy
     List<String> withClause = Arrays.asList(
-        "'" + HiveConf.ConfVars.REPL_RUN_DATA_COPY_TASKS_ON_TARGET.varname + "'='true'",
-        "'" + HiveConf.ConfVars.HIVE_EXEC_COPYFILE_MAXSIZE.varname + "'='1'",
-        "'" + HiveConf.ConfVars.HIVE_IN_TEST_REPL.varname + "'='false'",
-        "'" + HiveConf.ConfVars.HIVE_EXEC_COPYFILE_MAXNUMFILES.varname + "'='0'",
-        "'" + HiveConf.ConfVars.HIVE_DISTCP_DOAS_USER.varname + "'='"
-            + UserGroupInformation.getCurrentUser().getUserName() + "'");
+            "'" + HiveConf.ConfVars.REPL_RUN_DATA_COPY_TASKS_ON_TARGET.varname + "'='true'",
+            "'" + HiveConf.ConfVars.HIVE_EXEC_COPYFILE_MAXSIZE.varname + "'='1'",
+            "'" + HiveConf.ConfVars.HIVE_IN_TEST_REPL.varname + "'='false'",
+            "'" + HiveConf.ConfVars.HIVE_EXEC_COPYFILE_MAXNUMFILES.varname + "'='0'",
+            "'" + HiveConf.ConfVars.HIVE_DISTCP_DOAS_USER.varname + "'='"
+                + UserGroupInformation.getCurrentUser().getUserName() + "'");
     WarehouseInstance.Tuple bootstrapDump = primary.run("use " + primaryDbName)
-        .run("CREATE TABLE t1(a int) stored as orc TBLPROPERTIES ('transactional'='true')")
-        .run("CREATE TABLE t2(a string) clustered by (a) into 2 buckets" +
-            " stored as orc TBLPROPERTIES ('transactional'='true')")
-        .run("CREATE TABLE t3(a string) clustered by (a) into 2 buckets" +
-            " stored as orc TBLPROPERTIES ('transactional'='true')")
-        .run("CREATE TABLE tpart1(a int) partitioned by (name string)" +
-            " stored as orc TBLPROPERTIES ('transactional'='true')")
-        .run("CREATE TABLE tpart2(a int) partitioned by (name string) clustered by (a) into 2 buckets" +
-            " stored as orc TBLPROPERTIES ('transactional'='true')")
-        .run("CREATE TABLE text1(a string) STORED AS TEXTFILE")
-        .run("insert into t1 values (1)")
-        .run("insert into t1 values (11)")
-        .run("insert into t2 values (2)")
-        .run("insert into t2 values (22)")
-        .run("insert into t3 values (33)")
-        .run("insert into tpart1 partition(name='Tom') values(100)")
-        .run("insert into tpart1 partition(name='Jerry') values(101)")
-        .run("insert into tpart2 partition(name='Bob') values(200)")
-        .run("insert into tpart2 partition(name='Carl') values(201)")
-        .run("insert into text1 values ('ricky')")
-        .dump(primaryDbName, withClause);
+            .run("CREATE TABLE t1(a int) stored as orc TBLPROPERTIES ('transactional'='true')")
+            .run("CREATE TABLE t2(a string) clustered by (a) into 2 buckets" +
+                " stored as orc TBLPROPERTIES ('transactional'='true')")
+            .run("CREATE TABLE t3(a string) clustered by (a) into 2 buckets" +
+                " stored as orc TBLPROPERTIES ('transactional'='true')")
+            .run("CREATE TABLE tpart1(a int) partitioned by (name string)" +
+                " stored as orc TBLPROPERTIES ('transactional'='true')")
+            .run("CREATE TABLE tpart2(a int) partitioned by (name string) clustered by (a) into 2 buckets" +
+                " stored as orc TBLPROPERTIES ('transactional'='true')")
+            .run("CREATE TABLE text1(a string) STORED AS TEXTFILE")
+            .run("insert into t1 values (1)")
+            .run("insert into t1 values (11)")
+            .run("insert into t2 values (2)")
+            .run("insert into t2 values (22)")
+            .run("insert into t3 values (33)")
+            .run("insert into tpart1 partition(name='Tom') values(100)")
+            .run("insert into tpart1 partition(name='Jerry') values(101)")
+            .run("insert into tpart2 partition(name='Bob') values(200)")
+            .run("insert into tpart2 partition(name='Carl') values(201)")
+            .run("insert into text1 values ('ricky')")
+            .dump(primaryDbName, withClause);
 
     replica.run("DROP TABLE t3");
 
     replica.load(replicatedDbName, primaryDbName, withClause)
-        .run("use " + replicatedDbName)
-        .run("show tables")
-        .verifyResults(new String[]{"t1", "t2", "t3", "tpart1", "tpart2", "text1"})
-        .run("select * from " + replicatedDbName + ".t1")
-        .verifyResults(new String[] {"1", "11"})
-        .run("select * from " + replicatedDbName + ".t2")
-        .verifyResults(new String[]{"2", "22"})
-        .run("select a from " + replicatedDbName + ".tpart1")
-        .verifyResults(new String[]{"100", "101"})
-        .run("show partitions " + replicatedDbName + ".tpart1")
-        .verifyResults(new String[]{"name=Tom", "name=Jerry"})
-        .run("select a from " + replicatedDbName + ".tpart2")
-        .verifyResults(new String[]{"200", "201"})
-        .run("show partitions " + replicatedDbName + ".tpart2")
-        .verifyResults(new String[]{"name=Bob", "name=Carl"})
-        .run("select a from " + replicatedDbName + ".text1")
-        .verifyResults(new String[]{"ricky"});
+            .run("use " + replicatedDbName)
+            .run("show tables")
+            .verifyResults(new String[]{"t1", "t2", "t3", "tpart1", "tpart2", "text1"})
+            .run("select * from " + replicatedDbName + ".t1")
+            .verifyResults(new String[]{"1", "11"})
+            .run("select * from " + replicatedDbName + ".t2")
+            .verifyResults(new String[]{"2", "22"})
+            .run("select a from " + replicatedDbName + ".tpart1")
+            .verifyResults(new String[]{"100", "101"})
+            .run("show partitions " + replicatedDbName + ".tpart1")
+            .verifyResults(new String[]{"name=Tom", "name=Jerry"})
+            .run("select a from " + replicatedDbName + ".tpart2")
+            .verifyResults(new String[]{"200", "201"})
+            .run("show partitions " + replicatedDbName + ".tpart2")
+            .verifyResults(new String[]{"name=Bob", "name=Carl"})
+            .run("select a from " + replicatedDbName + ".text1")
+            .verifyResults(new String[]{"ricky"});
 
     WarehouseInstance.Tuple incrementalDump = primary.run("use " + primaryDbName)
-        .run("CREATE TABLE t4(a int) clustered by (a) into 2 buckets" +
-            " stored as orc TBLPROPERTIES ('transactional'='true')")
-        .run("CREATE TABLE tpart3(a int) partitioned by (name string)" +
-            " stored as orc TBLPROPERTIES ('transactional'='true')")
-        .run("CREATE TABLE tpart4(a int) partitioned by (name string) clustered by (a) into 2 buckets" +
-            " stored as orc TBLPROPERTIES ('transactional'='true')")
-        .run("insert into t1 values (111)")
-        .run("insert into t2 values (222)")
-        .run("insert into t4 values (4)")
-        .run("insert into tpart1 partition(name='Tom') values(102)")
-        .run("insert into tpart1 partition(name='Jerry') values(103)")
-        .run("insert into tpart2 partition(name='Bob') values(202)")
-        .run("insert into tpart2 partition(name='Carl') values(203)")
-        .run("insert into tpart3 partition(name='Tom3') values(300)")
-        .run("insert into tpart3 partition(name='Jerry3') values(301)")
-        .run("insert into tpart4 partition(name='Bob4') values(400)")
-        .run("insert into tpart4 partition(name='Carl4') values(401)")
-        .run("insert into text1 values ('martin')")
-        .dump(primaryDbName, withClause);
+            .run("CREATE TABLE t4(a int) clustered by (a) into 2 buckets" +
+                " stored as orc TBLPROPERTIES ('transactional'='true')")
+            .run("CREATE TABLE tpart3(a int) partitioned by (name string)" +
+                " stored as orc TBLPROPERTIES ('transactional'='true')")
+            .run("CREATE TABLE tpart4(a int) partitioned by (name string) clustered by (a) into 2 buckets" +
+                " stored as orc TBLPROPERTIES ('transactional'='true')")
+            .run("insert into t1 values (111)")
+            .run("insert into t2 values (222)")
+            .run("insert into t4 values (4)")
+            .run("insert into tpart1 partition(name='Tom') values(102)")
+            .run("insert into tpart1 partition(name='Jerry') values(103)")
+            .run("insert into tpart2 partition(name='Bob') values(202)")
+            .run("insert into tpart2 partition(name='Carl') values(203)")
+            .run("insert into tpart3 partition(name='Tom3') values(300)")
+            .run("insert into tpart3 partition(name='Jerry3') values(301)")
+            .run("insert into tpart4 partition(name='Bob4') values(400)")
+            .run("insert into tpart4 partition(name='Carl4') values(401)")
+            .run("insert into text1 values ('martin')")
+            .dump(primaryDbName, withClause);
 
     replica.load(replicatedDbName, primaryDbName, withClause)
-        .run("use " + replicatedDbName)
-        .run("show tables ")
-        .verifyResults(new String[]{"t1", "t2", "t4", "tpart1", "tpart2", "tpart3", "tpart4", "text1"})
-        .run("select * from " + replicatedDbName + ".t1")
-        .verifyResults(new String[] {"1", "11", "111"})
-        .run("select * from " + replicatedDbName + ".t2")
-        .verifyResults(new String[]{"2", "22", "222"})
-        .run("select * from " + replicatedDbName + ".t4")
-        .verifyResults(new String[]{"4"})
-        .run("select a from " + replicatedDbName + ".tpart1")
-        .verifyResults(new String[]{"100", "101", "102", "103"})
-        .run("show partitions " + replicatedDbName + ".tpart1")
-        .verifyResults(new String[]{"name=Tom", "name=Jerry"})
-        .run("select a from " + replicatedDbName + ".tpart2")
-        .verifyResults(new String[]{"200", "201", "202", "203"})
-        .run("show partitions " + replicatedDbName + ".tpart2")
-        .verifyResults(new String[]{"name=Bob", "name=Carl"})
-        .run("select a from " + replicatedDbName + ".tpart3")
-        .verifyResults(new String[]{"300", "301"})
-        .run("show partitions " + replicatedDbName + ".tpart3")
-        .verifyResults(new String[]{"name=Tom3", "name=Jerry3"})
-        .run("select a from " + replicatedDbName + ".tpart4")
-        .verifyResults(new String[]{"400", "401"})
-        .run("show partitions " + replicatedDbName + ".tpart4")
-        .verifyResults(new String[]{"name=Bob4", "name=Carl4"})
-        .run("select a from " + replicatedDbName + ".text1")
-        .verifyResults(new String[]{"ricky", "martin"});
+            .run("use " + replicatedDbName)
+            .run("show tables ")
+            .verifyResults(new String[]{"t1", "t2", "t4", "tpart1", "tpart2", "tpart3", "tpart4", "text1"})
+            .run("select * from " + replicatedDbName + ".t1")
+            .verifyResults(new String[]{"1", "11", "111"})
+            .run("select * from " + replicatedDbName + ".t2")
+            .verifyResults(new String[]{"2", "22", "222"})
+            .run("select * from " + replicatedDbName + ".t4")
+            .verifyResults(new String[]{"4"})
+            .run("select a from " + replicatedDbName + ".tpart1")
+            .verifyResults(new String[]{"100", "101", "102", "103"})
+            .run("show partitions " + replicatedDbName + ".tpart1")
+            .verifyResults(new String[]{"name=Tom", "name=Jerry"})
+            .run("select a from " + replicatedDbName + ".tpart2")
+            .verifyResults(new String[]{"200", "201", "202", "203"})
+            .run("show partitions " + replicatedDbName + ".tpart2")
+            .verifyResults(new String[]{"name=Bob", "name=Carl"})
+            .run("select a from " + replicatedDbName + ".tpart3")
+            .verifyResults(new String[]{"300", "301"})
+            .run("show partitions " + replicatedDbName + ".tpart3")
+            .verifyResults(new String[]{"name=Tom3", "name=Jerry3"})
+            .run("select a from " + replicatedDbName + ".tpart4")
+            .verifyResults(new String[]{"400", "401"})
+            .run("show partitions " + replicatedDbName + ".tpart4")
+            .verifyResults(new String[]{"name=Bob4", "name=Carl4"})
+            .run("select a from " + replicatedDbName + ".text1")
+            .verifyResults(new String[]{"ricky", "martin"});
 
     incrementalDump = primary.run("use " + primaryDbName)
-        .run("insert into t4 values (44)")
-        .run("insert into t1 values (1111)")
-        .run("DROP TABLE t1")
-        .run("insert into t2 values (2222)")
-        .run("insert into tpart1 partition(name='Tom') values(104)")
-        .run("insert into tpart1 partition(name='Tom_del') values(1000)")
-        .run("insert into tpart1 partition(name='Harry') values(10001)")
-        .run("insert into tpart1 partition(name='Jerry') values(105)")
-        .run("ALTER TABLE tpart1 DROP PARTITION(name='Tom')")
-        .run("DROP TABLE tpart2")
-        .dump(primaryDbName, withClause);
+            .run("insert into t4 values (44)")
+            .run("insert into t1 values (1111)")
+            .run("DROP TABLE t1")
+            .run("insert into t2 values (2222)")
+            .run("insert into tpart1 partition(name='Tom') values(104)")
+            .run("insert into tpart1 partition(name='Tom_del') values(1000)")
+            .run("insert into tpart1 partition(name='Harry') values(10001)")
+            .run("insert into tpart1 partition(name='Jerry') values(105)")
+            .run("ALTER TABLE tpart1 DROP PARTITION(name='Tom')")
+            .run("DROP TABLE tpart2")
+            .dump(primaryDbName, withClause);
 
     replica.run("DROP TABLE t4")
-        .run("ALTER TABLE tpart1 DROP PARTITION(name='Tom_del')");
+            .run("ALTER TABLE tpart1 DROP PARTITION(name='Tom_del')");
 
     replica.load(replicatedDbName, primaryDbName, withClause)
-        .run("use " + replicatedDbName)
-        .run("show tables ")
-        .verifyResults(new String[]{"t2", "t4", "tpart1", "tpart3", "tpart4", "text1"})
-        .run("select * from " + replicatedDbName + ".t2")
-        .verifyResults(new String[]{"2", "22", "222", "2222"})
-        .run("select a from " + replicatedDbName + ".tpart1")
-        .verifyResults(new String[]{"101", "103", "105", "1000", "10001"})
-        .run("show partitions " + replicatedDbName + ".tpart1")
-        .verifyResults(new String[]{"name=Harry", "name=Jerry", "name=Tom_del"});
+            .run("use " + replicatedDbName)
+            .run("show tables ")
+            .verifyResults(new String[]{"t2", "t4", "tpart1", "tpart3", "tpart4", "text1"})
+            .run("select * from " + replicatedDbName + ".t2")
+            .verifyResults(new String[]{"2", "22", "222", "2222"})
+            .run("select a from " + replicatedDbName + ".tpart1")
+            .verifyResults(new String[]{"101", "103", "105", "1000", "10001"})
+            .run("show partitions " + replicatedDbName + ".tpart1")
+            .verifyResults(new String[]{"name=Harry", "name=Jerry", "name=Tom_del"});
+  }
+
+  public void testTableWithPartitionsInBatch() throws Throwable {
+
+    List<String> withClause = new ArrayList<>();
+    withClause.add("'" + HiveConf.ConfVars.REPL_LOAD_PARTITIONS_BATCH_SIZE.varname + "'='" + 1 + "'");
+
+    primary.run("use " + primaryDbName)
+        .run("create table t2 (place string) partitioned by (country string)")
+        .run("insert into t2 partition(country='india') values ('bangalore')")
+        .run("insert into t2 partition(country='france') values ('paris')")
+        .run("insert into t2 partition(country='australia') values ('sydney')")
+        .dump(primaryDbName, withClause);
+
+    replica.load(replicatedDbName, primaryDbName, withClause)
+        .run("use " + replicatedDbName)
+        .run("show tables like 't2'")
+        .verifyResults(new String[] { "t2" })
+        .run("select distinct(country) from t2")
+        .verifyResults(new String[] { "india", "france", "australia" })
+        .run("select place from t2")
+        .verifyResults(new String[] { "bangalore", "paris", "sydney" })
+        .verifyReplTargetProperty(replicatedDbName);
   }
 }
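Note on the new test above: testTableWithPartitionsInBatch pins REPL_LOAD_PARTITIONS_BATCH_SIZE to 1, so each of the three partitions of t2 is loaded in its own batch on the replica. The standalone sketch below only illustrates the batching arithmetic that setting implies; the class and helper names are invented here and are not part of the patch or of Hive.

import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical stand-in for the splitting controlled by REPL_LOAD_PARTITIONS_BATCH_SIZE;
// not Hive's implementation, just the arithmetic the test relies on.
public class PartitionBatchingSketch {

  static <T> List<List<T>> toBatches(List<T> items, int batchSize) {
    List<List<T>> batches = new ArrayList<>();
    for (int from = 0; from < items.size(); from += batchSize) {
      int to = Math.min(from + batchSize, items.size());
      batches.add(new ArrayList<>(items.subList(from, to)));
    }
    return batches;
  }

  public static void main(String[] args) {
    List<String> partitions = Arrays.asList("country=india", "country=france", "country=australia");
    // Batch size 1, as in testTableWithPartitionsInBatch: every partition becomes its own batch.
    System.out.println(toBatches(partitions, 1));  // [[country=india], [country=france], [country=australia]]
    System.out.println(toBatches(partitions, 2));  // [[country=india, country=france], [country=australia]]
  }
}

Raising the batch size groups more partitions into a single add-partitions round trip; batch size 1 is the degenerate case the test exercises.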
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/ddl/table/partition/add/AlterTableAddPartitionDesc.java b/ql/src/java/org/apache/hadoop/hive/ql/ddl/table/partition/add/AlterTableAddPartitionDesc.java
index c4b2dab439..d61c575173 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/ddl/table/partition/add/AlterTableAddPartitionDesc.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/ddl/table/partition/add/AlterTableAddPartitionDesc.java
@@ -19,6 +19,7 @@
 package org.apache.hadoop.hive.ql.ddl.table.partition.add;
 
 import java.io.Serializable;
+import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 
@@ -105,6 +106,12 @@ public void setLocation(String location) {
       return params;
     }
 
+    public void addPartParams(Map<String, String> partParams) {
+      if (params != null) {
+        params.putAll(partParams);
+      }
+    }
+
     @Explain(displayName = "params", explainLevels = { Level.USER, Level.DEFAULT, Level.EXTENDED })
     public String getPartParamsForExplain() {
       return params.toString();
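The addPartParams helper added above lets callers fold extra key/value pairs, such as the replication checkpoint marker, into a partition descriptor's parameter map before the add-partition DDL runs. A minimal, self-contained model of that behaviour is sketched below; the class name, field, and the checkpoint key/value are illustrative only and are not Hive's types.

import java.util.HashMap;
import java.util.Map;

// Simplified model of AlterTableAddPartitionDesc.PartitionDesc#addPartParams as added by this
// patch: extra properties are merged into the partition's parameter map when one exists.
public class PartitionDescSketch {
  private Map<String, String> params = new HashMap<>();

  void addPartParams(Map<String, String> partParams) {
    if (params != null) {          // same null guard as the patch
      params.putAll(partParams);
    }
  }

  Map<String, String> getParams() {
    return params;
  }

  public static void main(String[] args) {
    PartitionDescSketch desc = new PartitionDescSketch();
    Map<String, String> extra = new HashMap<>();
    extra.put("hive.repl.ckpt.key", "/warehouse/dump/dir");  // hypothetical checkpoint key/value
    desc.addPartParams(extra);
    System.out.println(desc.getParams());
  }
}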
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/load/table/LoadPartitions.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/load/table/LoadPartitions.java
index 48c5e737ba..5dabbbfec0 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/load/table/LoadPartitions.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/load/table/LoadPartitions.java
@@ -58,6 +58,7 @@
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.util.ArrayList;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.HashSet;
@@ -175,7 +176,7 @@ private boolean isMetaDataOp() {
    *
    * @throws SemanticException
    */
-  private void addConsolidatedPartitionDesc() throws Exception {
+  private void addConsolidatedPartitionDesc(Task<?> ptnRootTask) throws Exception {
     //Load partitions equal to batch size at one go for metadata only and for external tables.
     int maxTasks = context.hiveConf.getIntVar(HiveConf.ConfVars.REPL_LOAD_PARTITIONS_BATCH_SIZE);
     int currentPartitionCount = 0;
@@ -205,7 +206,7 @@ private void addConsolidatedPartitionDesc() throws Exception {
         tableDesc.getTableName(), true, partitions);
 
       //don't need to add ckpt task separately. Added as part of add partition task
-      addPartition((toPartitionCount < totalPartitionCount), consolidatedPartitionDesc, null);
+      addPartition((toPartitionCount < totalPartitionCount), consolidatedPartitionDesc, ptnRootTask);
       if (partitions.size() > 0) {
         LOG.info("Added {} partitions", partitions.size());
       }
@@ -214,21 +215,8 @@
   }
 
   private TaskTracker forNewTable() throws Exception {
-    if (isMetaDataOp() || TableType.EXTERNAL_TABLE.equals(table.getTableType())) {
-      // Place all partitions in single task to reduce load on HMS.
-      addConsolidatedPartitionDesc();
-      return tracker;
-    }
-
-    Iterator<AlterTableAddPartitionDesc> iterator = event.partitionDescriptions(tableDesc).iterator();
-    while (iterator.hasNext() && tracker.canAddMoreTasks()) {
-      AlterTableAddPartitionDesc currentPartitionDesc = iterator.next();
-      /*
-       the currentPartitionDesc cannot be inlined as we need the hasNext() to be evaluated post the
-       current retrieved lastReplicatedPartition
-      */
-      addPartition(iterator.hasNext(), currentPartitionDesc, null);
-    }
+    // Place all partitions in single task to reduce load on HMS.
+    addConsolidatedPartitionDesc(null);
     return tracker;
   }
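With the change above, forNewTable no longer special-cases metadata-only and external tables: every table type now goes through addConsolidatedPartitionDesc, and the new ptnRootTask argument lets the caller hand in a task that the batched work should hang off. The toy model below shows only that root-task accumulation pattern (the first task becomes the root, later tasks are attached via addDependentTask); the Task class here is a stand-in, not org.apache.hadoop.hive.ql.exec.Task.

import java.util.ArrayList;
import java.util.List;

// Minimal model of the ptnRootTask threading used throughout this patch. Names are illustrative.
public class RootTaskChainSketch {

  static class Task {
    final String name;
    final List<Task> dependents = new ArrayList<>();
    Task(String name) { this.name = name; }
    void addDependentTask(Task t) { dependents.add(t); }
  }

  static Task chain(Task root, Task next) {
    if (root == null) {
      return next;                 // first task becomes the root of the DAG
    }
    root.addDependentTask(next);   // later tasks are scheduled after the root
    return root;
  }

  public static void main(String[] args) {
    Task root = null;
    root = chain(root, new Task("drop partition name=Tom"));
    root = chain(root, new Task("copy partition files (batch)"));
    root = chain(root, new Task("add partitions (batch metadata)"));
    System.out.println("root = " + root.name + ", dependents = " + root.dependents.size());
  }
}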
@@ -243,7 +231,7 @@ private void addPartition(boolean hasMorePartitions, AlterTableAddPartitionDesc
   }
 
   /**
-   * returns the root task for adding a partition
+   * returns the root task for adding all partitions in a batch
    */
   private Task<?> tasksForAddPartition(Table table, AlterTableAddPartitionDesc addPartitionDesc, Task<?> ptnRootTask)
       throws MetaException, HiveException {
@@ -251,7 +239,7 @@ private void addPartition(boolean hasMorePartitions, AlterTableAddPartitionDesc
         new DDLWork(new HashSet<>(), new HashSet<>(), addPartitionDesc),
         context.hiveConf
     );
-    //checkpointing task already added as part of add batch of partition in case for metadata only and external tables
+    //checkpointing task already added as part of add batch of partition
     if (isMetaDataOp() || TableType.EXTERNAL_TABLE.equals(table.getTableType())) {
       if (ptnRootTask == null) {
         ptnRootTask = addPartTask;
@@ -261,66 +249,68 @@ private void addPartition(boolean hasMorePartitions, AlterTableAddPartitionDesc
       return ptnRootTask;
     }
 
-    AlterTableAddPartitionDesc.PartitionDesc partSpec = addPartitionDesc.getPartitions().get(0);
-    Path sourceWarehousePartitionLocation = new Path(partSpec.getLocation());
-    Path replicaWarehousePartitionLocation = locationOnReplicaWarehouse(table, partSpec);
-    partSpec.setLocation(replicaWarehousePartitionLocation.toString());
-    LOG.debug("adding dependent CopyWork/AddPart/MoveWork for partition "
-        + partSpecToString(partSpec.getPartSpec()) + " with source location: "
-        + partSpec.getLocation());
-    Task<?> ckptTask = ReplUtils.getTableCheckpointTask(
-        tableDesc,
-        (HashMap<String, String>)partSpec.getPartSpec(),
-        context.dumpDirectory,
-        context.hiveConf
-    );
-
-    Path stagingDir = replicaWarehousePartitionLocation;
-    // if move optimization is enabled, copy the files directly to the target path. No need to create the staging dir.
-    LoadFileType loadFileType;
-    if (event.replicationSpec().isInReplicationScope() &&
-        context.hiveConf.getBoolVar(REPL_ENABLE_MOVE_OPTIMIZATION)) {
-      loadFileType = LoadFileType.IGNORE;
-    } else {
-      loadFileType = event.replicationSpec().isReplace() ? LoadFileType.REPLACE_ALL : LoadFileType.OVERWRITE_EXISTING;
-      stagingDir = PathUtils.getExternalTmpPath(replicaWarehousePartitionLocation, context.pathInfo);
-    }
-    boolean copyAtLoad = context.hiveConf.getBoolVar(HiveConf.ConfVars.REPL_RUN_DATA_COPY_TASKS_ON_TARGET);
-    Task<?> copyTask = ReplCopyTask.getLoadCopyTask(
+    //Add Copy task for all partitions
+    List<Task<?>> copyTaskList = new ArrayList<>();
+    List<Task<?>> moveTaskList = new ArrayList<>();
+    for (AlterTableAddPartitionDesc.PartitionDesc partSpec : addPartitionDesc.getPartitions()) {
+      Path replicaWarehousePartitionLocation = locationOnReplicaWarehouse(table, partSpec);
+      partSpec.setLocation(replicaWarehousePartitionLocation.toString());
+      LOG.debug("adding dependent CopyWork for partition "
+          + partSpecToString(partSpec.getPartSpec()) + " with source location: "
+          + partSpec.getLocation());
+
+      Path stagingDir = replicaWarehousePartitionLocation;
+      // if move optimization is enabled, copy the files directly to the target path. No need to create the staging dir.
+      LoadFileType loadFileType;
+      if (event.replicationSpec().isInReplicationScope() &&
+          context.hiveConf.getBoolVar(REPL_ENABLE_MOVE_OPTIMIZATION)) {
+        loadFileType = LoadFileType.IGNORE;
+      } else {
+        loadFileType = event.replicationSpec().isReplace() ? LoadFileType.REPLACE_ALL : LoadFileType.OVERWRITE_EXISTING;
+        stagingDir = PathUtils.getExternalTmpPath(replicaWarehousePartitionLocation, context.pathInfo);
+      }
+      boolean copyAtLoad = context.hiveConf.getBoolVar(HiveConf.ConfVars.REPL_RUN_DATA_COPY_TASKS_ON_TARGET);
+      Task<?> copyTask = ReplCopyTask.getLoadCopyTask(
         event.replicationSpec(),
-        new Path(event.dataPath() + Path.SEPARATOR + getPartitionName(sourceWarehousePartitionLocation)),
+        new Path(event.dataPath() + Path.SEPARATOR + Warehouse.makePartPath(partSpec.getPartSpec())),
         stagingDir,
         context.hiveConf, copyAtLoad, false
-    );
+      );
+      copyTaskList.add(copyTask);
 
-    Task<?> movePartitionTask = null;
-    if (loadFileType != LoadFileType.IGNORE) {
-      // no need to create move task, if file is moved directly to target location.
-      movePartitionTask = movePartitionTask(table, partSpec, stagingDir, loadFileType);
+      Task<?> movePartitionTask = null;
+      if (loadFileType != LoadFileType.IGNORE) {
+        // no need to create move task, if file is moved directly to target location.
+        movePartitionTask = movePartitionTask(table, partSpec, stagingDir, loadFileType);
+        moveTaskList.add(movePartitionTask);
+      }
+    }
+    Iterator<Task<?>> copyTaskIterator = copyTaskList.iterator();
+    Iterator<Task<?>> moveTaskIterator = moveTaskList.iterator();
+    //Add the copy tasks. once copy tasks are done, add partition metadata
+    while (copyTaskIterator.hasNext()) {
+      if (ptnRootTask == null) {
+        ptnRootTask = copyTaskIterator.next();
+      } else {
+        ptnRootTask.addDependentTask(copyTaskIterator.next());
+      }
     }
 
     if (ptnRootTask == null) {
-      ptnRootTask = copyTask;
+      ptnRootTask = addPartTask;
     } else {
-      ptnRootTask.addDependentTask(copyTask);
+      ptnRootTask.addDependentTask(addPartTask);
    }
 
-    // Set Checkpoint task as dependant to the tail of add partition tasks. So, if same dump is
-    // retried for bootstrap, we skip current partition update.
-    copyTask.addDependentTask(addPartTask);
-    if (movePartitionTask != null) {
-      addPartTask.addDependentTask(movePartitionTask);
-      movePartitionTask.addDependentTask(ckptTask);
-    } else {
-      addPartTask.addDependentTask(ckptTask);
+    //Add the move tasks after add partition metadata is done
+    while (moveTaskIterator.hasNext()) {
+      if (ptnRootTask == null) {
+        ptnRootTask = moveTaskIterator.next();
+      } else {
+        ptnRootTask.addDependentTask(moveTaskIterator.next());
+      }
     }
-    return ptnRootTask;
-  }
 
-  private String getPartitionName(Path partitionMetadataFullPath) {
-    //Get partition name by removing the metadata base path.
-    //Needed for getting the data path
-    return partitionMetadataFullPath.toString().substring(event.metadataPath().toString().length());
+    return ptnRootTask;
   }
 
   /**
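The rewritten tasksForAddPartition above builds the per-batch work in three stages: one ReplCopyTask per partition, then a single add-partitions DDL task for the whole batch, then the per-partition move tasks that publish the staged files. The sketch below only mimics that ordering with plain strings; it is not the patch's task wiring and the names stand in for Hive's ReplCopyTask, DDL task and MoveTask.

import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Illustration of the copy -> add-partitions -> move ordering per batch. Names are made up.
public class BatchTaskOrderSketch {

  public static void main(String[] args) {
    List<String> partitions = Arrays.asList("name=Bob", "name=Carl");

    List<String> plan = new ArrayList<>();
    for (String p : partitions) {
      plan.add("copy data for " + p + " into staging dir");        // one copy task per partition
    }
    plan.add("add partitions " + partitions + " in one DDL task"); // single metadata step per batch
    for (String p : partitions) {
      plan.add("move staged files for " + p + " to final location"); // move tasks run last
    }

    plan.forEach(System.out::println);
  }
}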
@@ -407,25 +397,36 @@ private TaskTracker forExistingTable(AlterTableAddPartitionDesc lastPartitionRep
       Map<String, String> currentSpec = addPartitionDesc.getPartitions().get(0).getPartSpec();
       encounteredTheLastReplicatedPartition = lastReplicatedPartSpec.equals(currentSpec);
     }
-
+    Task<?> ptnRootTask = null;
     while (partitionIterator.hasNext() && tracker.canAddMoreTasks()) {
       AlterTableAddPartitionDesc addPartitionDesc = partitionIterator.next();
-      Map<String, String> partSpec = addPartitionDesc.getPartitions().get(0).getPartSpec();
-      Task<?> ptnRootTask = null;
+      AlterTableAddPartitionDesc.PartitionDesc src = addPartitionDesc.getPartitions().get(0);
+      //Add check point task as part of add partition
+      Map<String, String> partParams = new HashMap<>();
+      partParams.put(REPL_CHECKPOINT_KEY, context.dumpDirectory);
+      Path replicaWarehousePartitionLocation = locationOnReplicaWarehouse(table, src);
+      src.setLocation(replicaWarehousePartitionLocation.toString());
+      src.addPartParams(partParams);
+      Map<String, String> partSpec = src.getPartSpec();
+
       ReplLoadOpType loadPtnType = getLoadPartitionType(partSpec);
       switch (loadPtnType) {
         case LOAD_NEW:
           break;
         case LOAD_REPLACE:
-          ptnRootTask = dropPartitionTask(table, partSpec);
+          if (ptnRootTask == null) {
+            ptnRootTask = dropPartitionTask(table, partSpec);
+          } else {
+            ptnRootTask.addDependentTask(dropPartitionTask(table, partSpec));
+          }
           break;
         case LOAD_SKIP:
           continue;
         default:
           break;
       }
-      addPartition(partitionIterator.hasNext(), addPartitionDesc, ptnRootTask);
     }
+    addConsolidatedPartitionDesc(ptnRootTask);
     return tracker;
   }
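For tables that already exist on the replica, the reworked forExistingTable loop above stamps each partition descriptor with the checkpoint property via addPartParams, chains any DROP PARTITION work for LOAD_REPLACE partitions under a shared root task, skips LOAD_SKIP partitions, and finally hands the root task to a single addConsolidatedPartitionDesc call so the remaining partitions are added in batches. The condensed sketch below walks that decision flow with made-up names and values; it is not Hive code.

import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Condensed, hypothetical view of the forExistingTable decision flow introduced by this patch.
public class ExistingTableLoadSketch {

  enum LoadOp { LOAD_NEW, LOAD_REPLACE, LOAD_SKIP }

  public static void main(String[] args) {
    Map<String, LoadOp> partitions = new LinkedHashMap<>();
    partitions.put("name=Harry", LoadOp.LOAD_NEW);
    partitions.put("name=Jerry", LoadOp.LOAD_REPLACE);
    partitions.put("name=Tom_del", LoadOp.LOAD_SKIP);

    List<String> plan = new ArrayList<>();
    List<String> toAdd = new ArrayList<>();
    for (Map.Entry<String, LoadOp> e : partitions.entrySet()) {
      if (e.getValue() == LoadOp.LOAD_SKIP) {
        continue;                                   // already replicated, nothing to do
      }
      if (e.getValue() == LoadOp.LOAD_REPLACE) {
        plan.add("drop partition " + e.getKey());   // chained under the shared root task
      }
      // checkpoint property stamped on the descriptor via addPartParams(...)
      toAdd.add(e.getKey() + " {hive.repl.ckpt.key=<dump dir>}");
    }
    plan.add("add partitions in batches: " + toAdd); // one addConsolidatedPartitionDesc(ptnRootTask) call
    plan.forEach(System.out::println);
  }
}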