diff --git a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/WarehouseInstance.java b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/WarehouseInstance.java
index aeafe856b5..01f8cda2ff 100644
--- a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/WarehouseInstance.java
+++ b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/WarehouseInstance.java
@@ -293,7 +293,7 @@ WarehouseInstance load(String replicatedDbName, String dumpLocation) throws Thro
   }
 
   WarehouseInstance loadWithoutExplain(String replicatedDbName, String dumpLocation) throws Throwable {
-    run("REPL LOAD " + replicatedDbName + " FROM '" + dumpLocation + "'");
+    run("REPL LOAD " + replicatedDbName + " FROM '" + dumpLocation + "' with ('hive.exec.parallel'='true')");
     return this;
   }
 
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/Driver.java b/ql/src/java/org/apache/hadoop/hive/ql/Driver.java
index 3a20130820..255c65aa73 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/Driver.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/Driver.java
@@ -2692,7 +2692,7 @@ private TaskRunner launchTask(Task<? extends Serializable> tsk, String queryId,
       console.printInfo("Launching Job " + cxt.getCurJobNo() + " out of " + jobs);
     }
     tsk.initialize(queryState, plan, cxt, ctx.getOpContext());
-    TaskRunner tskRun = new TaskRunner(tsk);
+    TaskRunner tskRun = new TaskRunner(tsk, cxt);
     cxt.launching(tskRun);
 
     // Launch Task
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/DriverContext.java b/ql/src/java/org/apache/hadoop/hive/ql/DriverContext.java
index 2dd83fbbc3..1938e5c672 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/DriverContext.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/DriverContext.java
@@ -27,6 +27,8 @@
 import java.util.Queue;
 import java.util.concurrent.ConcurrentLinkedQueue;
 import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.Semaphore;
+import java.util.concurrent.TimeUnit;
 
 import org.apache.hadoop.hive.ql.exec.FileSinkOperator;
 import org.apache.hadoop.hive.ql.exec.NodeUtils;
@@ -65,6 +67,8 @@
   final Map<String, StatsTask> statsTasks = new HashMap<>(1);
 
+  private final Semaphore counter = new Semaphore(0);
+
   public DriverContext() {
   }
 
@@ -99,6 +103,10 @@ public synchronized void launching(TaskRunner runner) throws HiveException {
     return null;
   }
 
+  public void releaseRunnable() {
+    counter.release();
+  }
+
   /**
    * Polls running tasks to see if a task has ended.
    *
@@ -106,6 +114,9 @@ public synchronized void launching(TaskRunner runner) throws HiveException {
    */
   public synchronized TaskRunner pollFinished() throws InterruptedException {
     while (!shutdown) {
+      if (!counter.tryAcquire(SLEEP_TIME, TimeUnit.MILLISECONDS)) {
+        continue;
+      }
       Iterator<TaskRunner> it = running.iterator();
       while (it.hasNext()) {
         TaskRunner runner = it.next();
@@ -114,7 +125,6 @@ public synchronized void launching(TaskRunner runner) throws HiveException {
           return runner;
         }
       }
-      wait(SLEEP_TIME);
     }
     return null;
   }
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/TaskRunner.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/TaskRunner.java
index 8904139114..6f6d721515 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/TaskRunner.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/TaskRunner.java
@@ -21,6 +21,7 @@
 import java.io.Serializable;
 import java.util.concurrent.atomic.AtomicLong;
 
+import org.apache.hadoop.hive.ql.DriverContext;
 import org.apache.hadoop.hive.ql.metadata.Hive;
 import org.apache.hadoop.hive.ql.session.SessionState;
 import org.slf4j.Logger;
@@ -46,10 +47,13 @@ protected Long initialValue() {
 
   private static transient final Logger LOG = LoggerFactory.getLogger(TaskRunner.class);
 
-  public TaskRunner(Task<? extends Serializable> tsk) {
+  private final DriverContext driverCtx;
+
+  public TaskRunner(Task<? extends Serializable> tsk, DriverContext ctx) {
     this.tsk = tsk;
     this.result = new TaskResult();
     ss = SessionState.get();
+    driverCtx = ctx;
   }
 
   public Task<? extends Serializable> getTask() {
@@ -102,6 +106,7 @@ public void runSequential() {
       LOG.error("Error in executeTask", t);
     }
     result.setExitVal(exitVal);
+    driverCtx.releaseRunnable();
     if (tsk.getException() != null) {
       result.setTaskError(tsk.getException());
     }