diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplLoadTask.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplLoadTask.java index 76ba975987..6b5fc545ab 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplLoadTask.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplLoadTask.java @@ -329,11 +329,7 @@ public StageType getType() { private int executeIncrementalLoad(DriverContext driverContext) { try { IncrementalLoadTasksBuilder load = work.getIncrementalLoadTaskBuilder(); - this.childTasks = Collections.singletonList(load.build(driverContext, getHive(), LOG)); - if (work.getIncrementalIterator().hasNext()) { - // attach a load task at the tail of task list to start the next iteration. - createBuilderTask(this.childTasks); - } + this.childTasks = Collections.singletonList(load.build(driverContext, getHive(), LOG, work)); return 0; } catch (Exception e) { LOG.error("failed replication", e); diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/incremental/IncrementalLoadTasksBuilder.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/incremental/IncrementalLoadTasksBuilder.java index 5e113c1241..2aefb574ca 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/incremental/IncrementalLoadTasksBuilder.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/incremental/IncrementalLoadTasksBuilder.java @@ -25,6 +25,7 @@ import org.apache.hadoop.hive.ql.DriverContext; import org.apache.hadoop.hive.ql.exec.Task; import org.apache.hadoop.hive.ql.exec.TaskFactory; +import org.apache.hadoop.hive.ql.exec.repl.ReplLoadWork; import org.apache.hadoop.hive.ql.exec.repl.ReplStateLogWork; import org.apache.hadoop.hive.ql.exec.repl.util.AddDependencyToLeaves; import org.apache.hadoop.hive.ql.exec.repl.util.TaskTracker; @@ -84,7 +85,8 @@ public IncrementalLoadTasksBuilder(String dbName, String tableName, String loadP replLogger.startLog(); } - public Task build(DriverContext driverContext, Hive hive, Logger log) throws Exception { + public Task build(DriverContext driverContext, Hive hive, Logger log, + ReplLoadWork loadWork) throws Exception { Task evTaskRoot = TaskFactory.get(new DependencyCollectionWork()); Task taskChainTail = evTaskRoot; Long lastReplayedEvent = null; @@ -145,8 +147,10 @@ public IncrementalLoadTasksBuilder(String dbName, String tableName, String loadP lastReplayedEvent = eventDmd.getEventTo(); } - // If any event is there and db name is known, then dump the start and end logs - if (!evTaskRoot.equals(taskChainTail) && !iterator.hasNext()) { + if (iterator.hasNext()) { + // add load task to start the next iteration + taskChainTail.addDependentTask(TaskFactory.get(loadWork, conf)); + } else { Map dbProps = new HashMap<>(); dbProps.put(ReplicationSpec.KEY.CURR_STATE_ID.toString(), String.valueOf(lastReplayedEvent)); ReplStateLogWork replStateLogWork = new ReplStateLogWork(replLogger, dbProps);