diff --git a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosAcrossInstances.java b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosAcrossInstances.java
index 2af728fdc5..3dee0d23d6 100644
--- a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosAcrossInstances.java
+++ b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosAcrossInstances.java
@@ -232,8 +232,8 @@ public void testMultipleStagesOfReplicationLoadTask() throws Throwable {
         .run("create table t1 (id int)")
         .run("create table t2 (place string) partitioned by (country string)")
         .run("insert into table t2 partition(country='india') values ('bangalore')")
-        .run("insert into table t2 partition(country='india') values ('mumbai')")
-        .run("insert into table t2 partition(country='india') values ('delhi')")
+        .run("insert into table t2 partition(country='us') values ('austin')")
+        .run("insert into table t2 partition(country='france') values ('paris')")
         .run("create table t3 (rank int)")
         .dump(primaryDbName, null);
@@ -244,7 +244,8 @@ public void testMultipleStagesOfReplicationLoadTask() throws Throwable {
         .run("show tables")
         .verifyResults(new String[] { "t1", "t2", "t3" })
         .run("repl status " + replicatedDbName)
-        .verifyResult(tuple.lastReplicationId);
-
+        .verifyResult(tuple.lastReplicationId)
+        .run("select country from t2 order by country")
+        .verifyResults(new String[] { "france", "india", "us" });
   }
 }
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/load/table/LoadPartitions.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/load/table/LoadPartitions.java
index f088ba9c69..ee01d6df5f 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/load/table/LoadPartitions.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/load/table/LoadPartitions.java
@@ -41,6 +41,7 @@ Licensed to the Apache Software Foundation (ASF) under one
 import org.apache.hadoop.hive.ql.plan.ImportTableDesc;
 import org.apache.hadoop.hive.ql.plan.LoadTableDesc;
 import org.apache.hadoop.hive.ql.plan.MoveWork;
+import org.datanucleus.util.StringUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -147,19 +148,22 @@ private ReplicationState initialReplicationState() throws SemanticException {
   private TaskTracker forNewTable() throws Exception {
     Iterator<AddPartitionDesc> iterator = event.partitionDescriptions(tableDesc).iterator();
     while (iterator.hasNext() && tracker.canAddMoreTasks()) {
-      AddPartitionDesc addPartitionDesc = iterator.next();
-      tracker.addTask(addSinglePartition(table, addPartitionDesc));
-      ReplicationState currentReplicationState =
-          new ReplicationState(new PartitionState(table.getTableName(), addPartitionDesc));
-      updateReplicationState(currentReplicationState);
+      addPartition(iterator.next());
     }
     return tracker;
   }
 
+  private void addPartition(AddPartitionDesc addPartitionDesc) throws Exception {
+    tracker.addTask(tasksForAddPartition(table, addPartitionDesc));
+    ReplicationState currentReplicationState =
+        new ReplicationState(new PartitionState(table.getTableName(), addPartitionDesc));
+    updateReplicationState(currentReplicationState);
+  }
+
   /**
    * returns the root task for adding a partition
    */
-  private Task<? extends Serializable> addSinglePartition(Table table,
+  private Task<? extends Serializable> tasksForAddPartition(Table table,
       AddPartitionDesc addPartitionDesc) throws MetaException, IOException, HiveException {
     AddPartitionDesc.OnePartitionDesc partSpec = addPartitionDesc.getPartition(0);
     Path sourceWarehousePartitionLocation = new Path(partSpec.getLocation());
@@ -234,47 +238,48 @@ private Path locationOnReplicaWarehouse(Table table, AddPartitionDesc.OnePartiti
   private TaskTracker forExistingTable(AddPartitionDesc lastPartitionReplicated) throws Exception {
     boolean encounteredTheLastReplicatedPartition = (lastPartitionReplicated == null);
+    Map<String, String> lastReplicatedPartSpec = null;
+    if (!encounteredTheLastReplicatedPartition) {
+      lastReplicatedPartSpec = lastPartitionReplicated.getPartition(0).getPartSpec();
+      LOG.info("Start processing from partition info spec : {}",
+          StringUtils.mapToString(lastReplicatedPartSpec));
+    }
+
     ReplicationSpec replicationSpec = event.replicationSpec();
-    LOG.debug("table partitioned");
-    for (AddPartitionDesc addPartitionDesc : event.partitionDescriptions(tableDesc)) {
-      /*
-      encounteredTheLastReplicatedPartition will be set, when we break creation of partition tasks
-      for a table, as we have reached the limit of number of tasks we should create for execution.
-      in this case on the next run we have to iterate over the partitions desc to reach the last replicated
-      partition so that we can start replicating partitions after that.
-      */
-      if (encounteredTheLastReplicatedPartition && tracker.canAddMoreTasks()) {
-        Map<String, String> partSpec = addPartitionDesc.getPartition(0).getPartSpec();
-        Partition ptn;
+    Iterator<AddPartitionDesc> partitionIterator = event.partitionDescriptions(tableDesc).iterator();
+    while (!encounteredTheLastReplicatedPartition && partitionIterator.hasNext()) {
+      AddPartitionDesc addPartitionDesc = partitionIterator.next();
+      Map<String, String> currentSpec = addPartitionDesc.getPartition(0).getPartSpec();
+      encounteredTheLastReplicatedPartition = lastReplicatedPartSpec.equals(currentSpec);
+    }
 
-        if ((ptn = context.hiveDb.getPartition(table, partSpec, false)) == null) {
-          if (!replicationSpec.isMetadataOnly()) {
-            forNewTable();
-          }
-        } else {
-          // If replicating, then the partition already existing means we need to replace, maybe, if
-          // the destination ptn's repl.last.id is older than the replacement's.
-          if (replicationSpec.allowReplacementInto(ptn.getParameters())) {
-            if (replicationSpec.isMetadataOnly()) {
-              tracker.addTask(alterSinglePartition(addPartitionDesc, replicationSpec, ptn));
-              if (!tracker.canAddMoreTasks()) {
-                tracker.setReplicationState(
-                    new ReplicationState(new PartitionState(table.getTableName(), addPartitionDesc)
-                    )
-                );
-              }
-            } else {
-              forNewTable();
+    while (partitionIterator.hasNext() && tracker.canAddMoreTasks()) {
+      AddPartitionDesc addPartitionDesc = partitionIterator.next();
+      Map<String, String> partSpec = addPartitionDesc.getPartition(0).getPartSpec();
+      Partition ptn = context.hiveDb.getPartition(table, partSpec, false);
+      if (ptn == null) {
+        if (!replicationSpec.isMetadataOnly()) {
+          addPartition(addPartitionDesc);
+        }
+      } else {
+        // If replicating, then the partition already existing means we need to replace, maybe, if
+        // the destination ptn's repl.last.id is older than the replacement's.
+        if (replicationSpec.allowReplacementInto(ptn.getParameters())) {
+          if (replicationSpec.isMetadataOnly()) {
+            tracker.addTask(alterSinglePartition(addPartitionDesc, replicationSpec, ptn));
+            if (!tracker.canAddMoreTasks()) {
+              tracker.setReplicationState(
+                  new ReplicationState(
+                      new PartitionState(table.getTableName(), addPartitionDesc)
+                  )
+              );
             }
           } else {
-            // ignore this ptn, do nothing, not an error.
+            addPartition(addPartitionDesc);
           }
+        } else {
+          // ignore this ptn, do nothing, not an error.
         }
-      } else {
-        Map<String, String> currentSpec = addPartitionDesc.getPartition(0).getPartSpec();
-        Map<String, String> lastReplicatedPartSpec =
-            lastPartitionReplicated.getPartition(0).getPartSpec();
-        encounteredTheLastReplicatedPartition = lastReplicatedPartSpec.equals(currentSpec);
       }
     }
     return tracker;
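
For reference, the control flow this patch introduces in forExistingTable() is a two-phase pass over the partition descriptors: a first loop advances the iterator until it passes the partition recorded in the last ReplicationState checkpoint, and a second loop adds load tasks for the remaining partitions until the TaskTracker refuses more work. The standalone Java sketch below mirrors that pattern under simplified, hypothetical stand-ins (plain String partitions and an int task budget in place of Hive's TaskTracker, AddPartitionDesc, and ReplicationState); it illustrates the resume technique only and is not Hive code.

import java.util.Arrays;
import java.util.Iterator;
import java.util.List;

public class ResumeSketch {
  public static void main(String[] args) {
    List<String> partitions = Arrays.asList("india", "us", "france", "uk");
    // First run: no checkpoint, budget of 2 -> processes india, us.
    String checkpoint = processPartitions(partitions, null, 2);
    System.out.println("checkpoint after run 1: " + checkpoint);
    // Second run resumes after the checkpoint -> processes france, uk.
    checkpoint = processPartitions(partitions, checkpoint, 2);
    System.out.println("checkpoint after run 2: " + checkpoint);
  }

  /** Returns the last partition processed, i.e. the new checkpoint. */
  static String processPartitions(List<String> partitions, String lastProcessed, int budget) {
    Iterator<String> it = partitions.iterator();
    // Phase 1: advance the iterator past everything already replicated.
    // If the checkpoint is never found, the iterator is simply exhausted
    // and phase 2 does nothing, as in the patched forExistingTable().
    boolean reachedCheckpoint = (lastProcessed == null);
    while (!reachedCheckpoint && it.hasNext()) {
      reachedCheckpoint = it.next().equals(lastProcessed);
    }
    // Phase 2: process the remainder, stopping when the task budget runs out.
    String checkpoint = lastProcessed;
    while (it.hasNext() && budget > 0) {
      checkpoint = it.next();
      System.out.println("adding load task for partition: " + checkpoint);
      budget--;
    }
    return checkpoint;
  }
}

Checkpointing by partition spec rather than by index costs a linear re-scan on each resume, but it stays correct if the descriptor order is regenerated between runs; that is the same trade-off the patch makes, and it also removes the old bug where the non-metadata-only branches called forNewTable() and restarted iteration from the first partition instead of adding a task for just the current one. The widened test (distinct country partitions plus a verified select) would catch that regression.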