diff --git a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosAcrossInstances.java b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosAcrossInstances.java
index 2af728fdc5..3dee0d23d6 100644
--- a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosAcrossInstances.java
+++ b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosAcrossInstances.java
@@ -232,8 +232,8 @@ public void testMultipleStagesOfReplicationLoadTask() throws Throwable {
         .run("create table t1 (id int)")
         .run("create table t2 (place string) partitioned by (country string)")
         .run("insert into table t2 partition(country='india') values ('bangalore')")
-        .run("insert into table t2 partition(country='india') values ('mumbai')")
-        .run("insert into table t2 partition(country='india') values ('delhi')")
+        .run("insert into table t2 partition(country='us') values ('austin')")
+        .run("insert into table t2 partition(country='france') values ('paris')")
         .run("create table t3 (rank int)")
         .dump(primaryDbName, null);
 
@@ -244,7 +244,8 @@ public void testMultipleStagesOfReplicationLoadTask() throws Throwable {
         .run("show tables")
         .verifyResults(new String[] { "t1", "t2", "t3" })
         .run("repl status " + replicatedDbName)
-        .verifyResult(tuple.lastReplicationId);
-
+        .verifyResult(tuple.lastReplicationId)
+        .run("select country from t2 order by country")
+        .verifyResults(new String[] { "france", "india", "us" });
   }
 }
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/load/table/LoadPartitions.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/load/table/LoadPartitions.java
index c944a13c67..e65ece10ff 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/load/table/LoadPartitions.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/load/table/LoadPartitions.java
@@ -27,9 +27,9 @@ Licensed to the Apache Software Foundation (ASF) under one
 import org.apache.hadoop.hive.ql.exec.TaskFactory;
 import org.apache.hadoop.hive.ql.exec.Utilities;
 import org.apache.hadoop.hive.ql.exec.repl.ReplStateLogWork;
+import org.apache.hadoop.hive.ql.exec.repl.bootstrap.ReplLoadTask;
 import org.apache.hadoop.hive.ql.exec.repl.bootstrap.events.TableEvent;
 import org.apache.hadoop.hive.ql.exec.repl.bootstrap.load.ReplicationState;
-import org.apache.hadoop.hive.ql.exec.repl.bootstrap.ReplLoadTask;
 import org.apache.hadoop.hive.ql.exec.repl.bootstrap.load.TaskTracker;
 import org.apache.hadoop.hive.ql.exec.repl.bootstrap.load.util.Context;
 import org.apache.hadoop.hive.ql.metadata.HiveException;
@@ -44,12 +44,17 @@ Licensed to the Apache Software Foundation (ASF) under one
 import org.apache.hadoop.hive.ql.plan.ImportTableDesc;
 import org.apache.hadoop.hive.ql.plan.LoadTableDesc;
 import org.apache.hadoop.hive.ql.plan.MoveWork;
+import org.datanucleus.util.StringUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
 import java.io.Serializable;
-import java.util.*;
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
 
 import static org.apache.hadoop.hive.ql.exec.repl.bootstrap.load.ReplicationState.PartitionState;
 import static org.apache.hadoop.hive.ql.parse.ImportSemanticAnalyzer.isPartitioned;
@@ -173,21 +178,29 @@ private ReplicationState initialReplicationState() throws SemanticException {
   private TaskTracker forNewTable() throws Exception {
     Iterator<AddPartitionDesc> iterator = event.partitionDescriptions(tableDesc).iterator();
     while (iterator.hasNext() && tracker.canAddMoreTasks()) {
-      AddPartitionDesc addPartitionDesc = iterator.next();
-      tracker.addTask(addSinglePartition(table, addPartitionDesc));
-      if (iterator.hasNext() && !tracker.canAddMoreTasks()) {
-        ReplicationState currentReplicationState =
-            new ReplicationState(new PartitionState(table.getTableName(), addPartitionDesc));
-        updateReplicationState(currentReplicationState);
-      }
+      AddPartitionDesc currentPartitionDesc = iterator.next();
+      /*
+       currentPartitionDesc cannot be inlined into the addPartition() call: iterator.hasNext()
+       must be evaluated after the current partition has been retrieved from the iterator
+      */
+      addPartition(iterator.hasNext(), currentPartitionDesc);
     }
     return tracker;
   }
 
+  private void addPartition(boolean hasMorePartitions, AddPartitionDesc addPartitionDesc) throws Exception {
+    tracker.addTask(tasksForAddPartition(table, addPartitionDesc));
+    if (hasMorePartitions && !tracker.canAddMoreTasks()) {
+      ReplicationState currentReplicationState =
+          new ReplicationState(new PartitionState(table.getTableName(), addPartitionDesc));
+      updateReplicationState(currentReplicationState);
+    }
+  }
+
   /**
    * returns the root task for adding a partition
    */
-  private Task<? extends Serializable> addSinglePartition(Table table,
+  private Task<? extends Serializable> tasksForAddPartition(Table table,
       AddPartitionDesc addPartitionDesc) throws MetaException, IOException, HiveException {
     AddPartitionDesc.OnePartitionDesc partSpec = addPartitionDesc.getPartition(0);
     Path sourceWarehousePartitionLocation = new Path(partSpec.getLocation());
@@ -262,50 +275,48 @@ private Path locationOnReplicaWarehouse(Table table, AddPartitionDesc.OnePartiti
 
   private TaskTracker forExistingTable(AddPartitionDesc lastPartitionReplicated) throws Exception {
     boolean encounteredTheLastReplicatedPartition = (lastPartitionReplicated == null);
+    Map<String, String> lastReplicatedPartSpec = null;
+    if (!encounteredTheLastReplicatedPartition) {
+      lastReplicatedPartSpec = lastPartitionReplicated.getPartition(0).getPartSpec();
+      LOG.info("Start processing from partition info spec : {}",
+          StringUtils.mapToString(lastReplicatedPartSpec));
+    }
+
     ReplicationSpec replicationSpec = event.replicationSpec();
-    LOG.debug("table partitioned");
+    Iterator<AddPartitionDesc> partitionIterator = event.partitionDescriptions(tableDesc).iterator();
+    while (!encounteredTheLastReplicatedPartition && partitionIterator.hasNext()) {
+      AddPartitionDesc addPartitionDesc = partitionIterator.next();
+      Map<String, String> currentSpec = addPartitionDesc.getPartition(0).getPartSpec();
+      encounteredTheLastReplicatedPartition = lastReplicatedPartSpec.equals(currentSpec);
+    }
 
-    Iterator<AddPartitionDesc> iterator = event.partitionDescriptions(tableDesc).iterator();
-    while (iterator.hasNext()) {
-      /*
-      encounteredTheLastReplicatedPartition will be set, when we break creation of partition tasks
-      for a table, as we have reached the limit of number of tasks we should create for execution.
-      in this case on the next run we have to iterate over the partitions desc to reach the last replicated
-      partition so that we can start replicating partitions after that.
-      */
-      AddPartitionDesc addPartitionDesc = iterator.next();
-      if (encounteredTheLastReplicatedPartition && tracker.canAddMoreTasks()) {
-        Map<String, String> partSpec = addPartitionDesc.getPartition(0).getPartSpec();
-        Partition ptn;
-
-        if ((ptn = context.hiveDb.getPartition(table, partSpec, false)) == null) {
-          if (!replicationSpec.isMetadataOnly()) {
-            forNewTable();
-          }
-        } else {
-          // If replicating, then the partition already existing means we need to replace, maybe, if
-          // the destination ptn's repl.last.id is older than the replacement's.
-          if (replicationSpec.allowReplacementInto(ptn.getParameters())) {
-            if (replicationSpec.isMetadataOnly()) {
-              tracker.addTask(alterSinglePartition(addPartitionDesc, replicationSpec, ptn));
-              if (iterator.hasNext() && !tracker.canAddMoreTasks()) {
-                tracker.setReplicationState(
-                    new ReplicationState(new PartitionState(table.getTableName(), addPartitionDesc)
-                    )
-                );
-              }
-            } else {
-              forNewTable();
-            }
+    while (partitionIterator.hasNext() && tracker.canAddMoreTasks()) {
+      AddPartitionDesc addPartitionDesc = partitionIterator.next();
+      Map<String, String> partSpec = addPartitionDesc.getPartition(0).getPartSpec();
+      Partition ptn = context.hiveDb.getPartition(table, partSpec, false);
+      if (ptn == null) {
+        if (!replicationSpec.isMetadataOnly()) {
+          addPartition(partitionIterator.hasNext(), addPartitionDesc);
+        }
+      } else {
+        // If replicating, then the partition already existing means we need to replace, maybe, if
+        // the destination ptn's repl.last.id is older than the replacement's.
+        if (replicationSpec.allowReplacementInto(ptn.getParameters())) {
+          if (replicationSpec.isMetadataOnly()) {
+            tracker.addTask(alterSinglePartition(addPartitionDesc, replicationSpec, ptn));
+            if (!tracker.canAddMoreTasks()) {
+              tracker.setReplicationState(
+                  new ReplicationState(
+                      new PartitionState(table.getTableName(), addPartitionDesc)
+                  )
+              );
+            }
           } else {
-            // ignore this ptn, do nothing, not an error.
+            addPartition(partitionIterator.hasNext(), addPartitionDesc);
           }
+        } else {
+          // ignore this ptn, do nothing, not an error.
         }
-      } else {
-        Map<String, String> currentSpec = addPartitionDesc.getPartition(0).getPartSpec();
-        Map<String, String> lastReplicatedPartSpec =
-            lastPartitionReplicated.getPartition(0).getPartSpec();
-        encounteredTheLastReplicatedPartition = lastReplicatedPartSpec.equals(currentSpec);
       }
     }
     return tracker;
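
Reviewer note: taken together, the two halves of this patch make the bootstrap partition load resumable under a task budget. forNewTable() stops creating tasks once the TaskTracker is full and records the partition it stopped at as a ReplicationState checkpoint, and forExistingTable() fast-forwards past that checkpoint before it resumes adding tasks. The sketch below is a minimal, self-contained distillation of that two-phase pattern; BoundedTracker, ResumableLoad, and the plain-String partitions are invented stand-ins for illustration, not Hive's actual TaskTracker/ReplicationState API.

    import java.util.Arrays;
    import java.util.Iterator;
    import java.util.List;

    // Hypothetical stand-in for Hive's TaskTracker: accepts a bounded number of
    // tasks per run and remembers the last partition scheduled before the budget
    // ran out (Hive persists this as a ReplicationState checkpoint instead).
    class BoundedTracker {
      private final int maxTasks;
      private int added = 0;
      private String checkpoint;

      BoundedTracker(int maxTasks) { this.maxTasks = maxTasks; }

      boolean canAddMoreTasks() { return added < maxTasks; }
      void addTask(String partition) { added++; System.out.println("task for " + partition); }
      void setCheckpoint(String partition) { checkpoint = partition; }
      String checkpoint() { return checkpoint; }
    }

    public class ResumableLoad {
      // Phase 1: skip everything up to and including the last replicated partition.
      // Phase 2: schedule tasks until the budget is exhausted, checkpointing only
      // when partitions remain -- the same rule as addPartition(hasMorePartitions, ...).
      static BoundedTracker run(List<String> partitions, String lastReplicated, int maxTasks) {
        BoundedTracker tracker = new BoundedTracker(maxTasks);
        Iterator<String> it = partitions.iterator();

        boolean reachedLast = (lastReplicated == null);
        while (!reachedLast && it.hasNext()) {
          reachedLast = it.next().equals(lastReplicated);
        }

        while (it.hasNext() && tracker.canAddMoreTasks()) {
          String current = it.next();
          boolean hasMore = it.hasNext(); // evaluated after next(), as the patch's comment requires
          tracker.addTask(current);
          if (hasMore && !tracker.canAddMoreTasks()) {
            tracker.setCheckpoint(current);
          }
        }
        return tracker;
      }

      public static void main(String[] args) {
        List<String> parts = Arrays.asList("country=france", "country=india", "country=us");
        BoundedTracker first = run(parts, null, 2);                // schedules france, india; checkpoints india
        BoundedTracker second = run(parts, first.checkpoint(), 2); // resumes and schedules us
      }
    }

Note that the checkpoint is recorded only when partitions remain beyond the one that exhausted the budget (hasMore && !canAddMoreTasks()); if the budget runs out exactly on the final partition there is nothing left to resume, so no state is saved. That is also why hasNext() is captured immediately after next() and passed along, rather than re-queried after the task has been added.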