diff --git a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/WarehouseInstance.java b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/WarehouseInstance.java index 92f245652a..85c188a773 100644 --- a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/WarehouseInstance.java +++ b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/WarehouseInstance.java @@ -299,12 +299,12 @@ WarehouseInstance dumpFailure(String dbName, String lastReplicationId) throws Th WarehouseInstance load(String replicatedDbName, String dumpLocation) throws Throwable { run("EXPLAIN REPL LOAD " + replicatedDbName + " FROM '" + dumpLocation + "'"); printOutput(); - run("REPL LOAD " + replicatedDbName + " FROM '" + dumpLocation + "'"); + run("REPL LOAD " + replicatedDbName + " FROM '" + dumpLocation + "' with ('hive.exec.parallel' = 'true')"); return this; } WarehouseInstance loadWithoutExplain(String replicatedDbName, String dumpLocation) throws Throwable { - run("REPL LOAD " + replicatedDbName + " FROM '" + dumpLocation + "'"); + run("REPL LOAD " + replicatedDbName + " FROM '" + dumpLocation + "' with ('hive.exec.parallel' = 'true')"); return this; } diff --git a/ql/src/java/org/apache/hadoop/hive/ql/Driver.java b/ql/src/java/org/apache/hadoop/hive/ql/Driver.java index 608cbd560c..65a325a16d 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/Driver.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/Driver.java @@ -2684,7 +2684,7 @@ private TaskRunner launchTask(Task tsk, String queryId, cxt.launching(tskRun); // Launch Task - if (HiveConf.getBoolVar(conf, HiveConf.ConfVars.EXECPARALLEL) && tsk.canExecuteInParallel()) { + if (HiveConf.getBoolVar(tsk.getConf(), HiveConf.ConfVars.EXECPARALLEL) && tsk.canExecuteInParallel()) { // Launch it in the parallel mode, as a separate thread only for MR tasks if (LOG.isInfoEnabled()){ LOG.info("Starting task [" + tsk + "] in parallel"); diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/MoveTask.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/MoveTask.java index ca4391fce0..0b8d9b407b 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/MoveTask.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/MoveTask.java @@ -848,4 +848,9 @@ public StageType getType() { public String getName() { return "MOVE"; } + + @Override + public boolean canExecuteInParallel() { + return false; + } } diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/Task.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/Task.java index 11ef62c1c6..30732f9acf 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/Task.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/Task.java @@ -433,6 +433,10 @@ public void setConf(HiveConf conf) { this.conf = conf; } + public HiveConf getConf() { + return this.conf; + } + public void setWork(T work) { this.work = work; } diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/incremental/IncrementalLoadTasksBuilder.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/incremental/IncrementalLoadTasksBuilder.java index ae6411d4f1..73577f5a9d 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/incremental/IncrementalLoadTasksBuilder.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/incremental/IncrementalLoadTasksBuilder.java @@ -383,11 +383,7 @@ private boolean shouldReplayEvent(FileStatus dir, DumpType dumpType, String dbNa } // Link import tasks to the barrier task which will in-turn linked with repl state update tasks - for (Task t : importTasks){ - t.addDependentTask(barrierTask); - log.debug("Added {}:{} as a precursor of barrier task {}:{}", - t.getClass(), t.getId(), barrierTask.getClass(), barrierTask.getId()); - } + DAGTraversal.traverse(importTasks, new AddDependencyToLeaves(barrierTask)); // At least one task would have been added to update the repl state return tasks;