diff --git ql/src/java/org/apache/hadoop/hive/ql/Driver.java ql/src/java/org/apache/hadoop/hive/ql/Driver.java index 62fc150..328663f 100644 --- ql/src/java/org/apache/hadoop/hive/ql/Driver.java +++ ql/src/java/org/apache/hadoop/hive/ql/Driver.java @@ -22,17 +22,16 @@ import java.io.DataInput; import java.io.IOException; import java.io.Serializable; +import java.nio.channels.AsynchronousCloseException; import java.util.ArrayList; import java.util.HashMap; import java.util.HashSet; -import java.util.Iterator; import java.util.LinkedHashMap; import java.util.LinkedList; import java.util.List; import java.util.Map; import java.util.Queue; import java.util.Set; -import java.util.concurrent.ConcurrentLinkedQueue; import org.apache.commons.lang.StringUtils; import org.apache.commons.logging.Log; @@ -124,6 +123,7 @@ private HiveConf conf; private DataInput resStream; private Context ctx; + private volatile DriverContext driverCxt; // volatile: assigned in execute(), read by close(), which may run on a different (cancelling) thread private QueryPlan plan; private Schema schema; private HiveLockManager hiveLockMgr; @@ -134,8 +134,9 @@ // A limit on the number of threads that can be launched private int maxthreads; - private static final int SLEEP_TIME = 2000; - protected int tryCount = Integer.MAX_VALUE; + private int tryCount = Integer.MAX_VALUE; + + private volatile boolean destroyed; // volatile: set by destroy(), polled by the execute() loop and getResults() private String userName; @@ -1229,12 +1230,11 @@ public int execute() throws CommandNeedRetryException { // At any time, at most maxthreads tasks can be running // The main thread polls the TaskRunners to check if they have finished. - Queue> runnable = new ConcurrentLinkedQueue>(); - Map running = new HashMap(); - - DriverContext driverCxt = new DriverContext(runnable, ctx); + DriverContext driverCxt = new DriverContext(ctx); ctx.setHDFSCleanup(true); + this.driverCxt = driverCxt; // for canceling the query (should be bound to session?)
+ SessionState.get().setLastMapRedStatsList(new ArrayList()); SessionState.get().setStackTraces(new HashMap>>()); SessionState.get().setLocalMapRedErrors(new HashMap>()); @@ -1250,27 +1250,32 @@ public int execute() throws CommandNeedRetryException { perfLogger.PerfLogEnd(CLASS_NAME, PerfLogger.TIME_TO_SUBMIT); perfLogger.PerfLogBegin(CLASS_NAME, PerfLogger.RUN_TASKS); // Loop while you either have tasks running, or tasks queued up - while (running.size() != 0 || runnable.peek() != null) { + while (!destroyed && driverCxt.isRunning()) { + // Launch upto maxthreads tasks - while (runnable.peek() != null && running.size() < maxthreads) { - Task tsk = runnable.remove(); - perfLogger.PerfLogBegin(CLASS_NAME, PerfLogger.TASK + tsk.getName() + "." + tsk.getId()); - launchTask(tsk, queryId, noName, running, jobname, jobs, driverCxt); + Task task; + while ((task = driverCxt.getRunnable(maxthreads)) != null) { + perfLogger.PerfLogBegin(CLASS_NAME, PerfLogger.TASK + task.getName() + "." + task.getId()); + TaskRunner runner = launchTask(task, queryId, noName, jobname, jobs, driverCxt); + if (!runner.isRunning()) { + break; + } } // poll the Tasks to see which one completed - TaskResult tskRes = pollTasks(running.keySet()); - TaskRunner tskRun = running.remove(tskRes); - Task tsk = tskRun.getTask(); - perfLogger.PerfLogEnd(CLASS_NAME, PerfLogger.TASK + tsk.getName() + "."
+ tsk.getId()); + TaskRunner tskRun = driverCxt.pollFinished(); + if (tskRun == null) { + continue; + } hookContext.addCompleteTask(tskRun); - int exitVal = tskRes.getExitVal(); + Task tsk = tskRun.getTask(); + TaskResult result = tskRun.getTaskResult(); + perfLogger.PerfLogEnd(CLASS_NAME, PerfLogger.TASK + tsk.getName() + "." + tsk.getId()); // closes the per-task PerfLogBegin issued when the task was launched + int exitVal = result.getExitVal(); if (exitVal != 0) { if (tsk.ifRetryCmdWhenFail()) { - if (!running.isEmpty()) { - taskCleanup(running); - } + driverCxt.shutdown(); // in case we decided to run everything in local mode, restore the // the jobtracker setting to its initial value ctx.restoreOriginalTracker(); @@ -1278,7 +1283,7 @@ public int execute() throws CommandNeedRetryException { } Task backupTask = tsk.getAndInitBackupTask(); if (backupTask != null) { - setErrorMsgAndDetail(exitVal, tskRes.getTaskError(), tsk); + setErrorMsgAndDetail(exitVal, result.getTaskError(), tsk); console.printError(errorMessage); errorMessage = "ATTEMPT: Execute BackupTask: " + backupTask.getClass().getName(); console.printError(errorMessage); @@ -1299,12 +1304,10 @@ public int execute() throws CommandNeedRetryException { perfLogger.PerfLogEnd(CLASS_NAME, PerfLogger.FAILURE_HOOK + ofh.getClass().getName()); } - setErrorMsgAndDetail(exitVal, tskRes.getTaskError(), tsk); + setErrorMsgAndDetail(exitVal, result.getTaskError(), tsk); SQLState = "08S01"; console.printError(errorMessage); - if (!running.isEmpty()) { - taskCleanup(running); - } + driverCxt.shutdown(); // in case we decided to run everything in local mode, restore the // the jobtracker setting to its initial value ctx.restoreOriginalTracker(); @@ -1332,6 +1335,13 @@ public int execute() throws CommandNeedRetryException { // the jobtracker setting to its initial value ctx.restoreOriginalTracker(); + if (destroyed || driverCxt.isShutdown()) { // destroy() alone also ends the task loop early; report that as a cancellation, not success + SQLState = "HY008"; + errorMessage = "FAILED: Operation cancelled"; + console.printError(errorMessage); + return 1000; + } + // remove incomplete outputs. // Some incomplete outputs may be added at the beginning, for eg: for dynamic partitions.
// remove them @@ -1453,10 +1463,8 @@ private void setErrorMsgAndDetail(int exitVal, Throwable downstreamError, Task t * @param cxt * the driver context */ - - public void launchTask(Task tsk, String queryId, boolean noName, - Map running, String jobname, int jobs, DriverContext cxt) { - + private TaskRunner launchTask(Task tsk, String queryId, boolean noName, + String jobname, int jobs, DriverContext cxt) { if (SessionState.get() != null) { SessionState.get().getHiveHistory().startTask(queryId, tsk, tsk.getClass().getName()); } @@ -1473,6 +1481,7 @@ public void launchTask(Task tsk, String queryId, boolean TaskResult tskRes = new TaskResult(); TaskRunner tskRun = new TaskRunner(tsk, tskRes); + cxt.launching(tskRun); // register before launching so pollFinished()/shutdown() can see this runner // Launch Task if (HiveConf.getBoolVar(conf, HiveConf.ConfVars.EXECPARALLEL) && tsk.isMapRedTask()) { // Launch it in the parallel mode, as a separate thread only for MR tasks @@ -1480,53 +1489,7 @@ public void launchTask(Task tsk, String queryId, boolean } else { tskRun.runSequential(); } - running.put(tskRes, tskRun); - } - - /** - * Cleans up remaining tasks in case of failure - */ - public void taskCleanup(Map running) { - for (Map.Entry entry : running.entrySet()) { - if (entry.getKey().isRunning()) { - Task task = entry.getValue().getTask(); - try { - task.shutdown(); - } catch (Exception e) { - console.printError("Exception on shutting down task " + task.getId() + ": " + e); - } - } - } - running.clear(); - } - - /** - * Polls running tasks to see if a task has ended.
- * - * @param results - * Set of result objects for running tasks - * @return The result object for any completed/failed task - */ - - public TaskResult pollTasks(Set results) { - Iterator resultIterator = results.iterator(); - while (true) { - while (resultIterator.hasNext()) { - TaskResult tskRes = resultIterator.next(); - if (!tskRes.isRunning()) { - return tskRes; - } - } - - // In this loop, nothing was found - // Sleep 10 seconds and restart - try { - Thread.sleep(SLEEP_TIME); - } catch (InterruptedException ie) { - // Do Nothing - } - resultIterator = results.iterator(); - } + return tskRun; } public boolean isFetchingTable() { @@ -1534,6 +1497,9 @@ public boolean isFetchingTable() { } public boolean getResults(List res) throws IOException, CommandNeedRetryException { + if (destroyed) { // reject result fetches once destroy() has been called + throw new AsynchronousCloseException(); + } if (isFetchingTable()) { FetchTask ft = plan.getFetchTask(); ft.setMaxRows(maxRows); @@ -1621,6 +1587,10 @@ public int close() { } } } + if (driverCxt != null) { + driverCxt.shutdown(); // stops tasks the context still tracks as running + driverCxt = null; + } if (ctx != null) { ctx.clear(); } @@ -1641,6 +1611,10 @@ public int close() { } public void destroy() { + if (destroyed) { + return; + } + destroyed = true; if (ctx != null) { releaseLocks(ctx.getHiveLocks()); } diff --git ql/src/java/org/apache/hadoop/hive/ql/DriverContext.java ql/src/java/org/apache/hadoop/hive/ql/DriverContext.java index 1c84523..dd37e0c 100644 --- ql/src/java/org/apache/hadoop/hive/ql/DriverContext.java +++ ql/src/java/org/apache/hadoop/hive/ql/DriverContext.java @@ -19,11 +19,16 @@ package org.apache.hadoop.hive.ql; import java.io.Serializable; -import java.util.LinkedList; +import java.util.Iterator; import java.util.Queue; +import java.util.concurrent.ConcurrentLinkedQueue; +import java.util.concurrent.LinkedBlockingQueue; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; import org.apache.hadoop.hive.ql.exec.Task; -import org.apache.hadoop.mapred.JobConf; +import
org.apache.hadoop.hive.ql.exec.TaskRunner; +import org.apache.hadoop.hive.ql.session.SessionState; /** * DriverContext. @@ -31,27 +36,96 @@ */ public class DriverContext { - Queue> runnable = new LinkedList>(); + private static final Log LOG = LogFactory.getLog(Driver.class.getName()); + private static final SessionState.LogHelper console = new SessionState.LogHelper(LOG); + + private static final int SLEEP_TIME = 2000; + + private Queue> runnable; + private Queue running; // how many jobs have been started - int curJobNo; + private int curJobNo; - Context ctx; + private Context ctx; + private volatile boolean shutdown; // volatile: written by shutdown(), possibly from another thread, while pollFinished()/isRunning() read it public DriverContext() { - this.runnable = null; - this.ctx = null; } - public DriverContext(Queue> runnable, Context ctx) { - this.runnable = runnable; + public DriverContext(Context ctx) { + this.runnable = new ConcurrentLinkedQueue>(); + this.running = new LinkedBlockingQueue(); this.ctx = ctx; } + public boolean isShutdown() { + return shutdown; + } + + public boolean isRunning() { + return !shutdown && (!running.isEmpty() || !runnable.isEmpty()); + } + public Queue> getRunnable() { return runnable; } + public void launching(TaskRunner runner) { + running.add(runner); // NOTE(review): a runner added after shutdown() has already cleared 'running' is never stopped by it + } + + public Task getRunnable(int maxthreads) { + if (runnable.peek() != null && running.size() < maxthreads) { + return runnable.remove(); + } + return null; + } + + /** + * Polls running tasks to see if a task has ended.
+ * Sleeps SLEEP_TIME ms between scans until a task finishes or shutdown() is called. + * @return the runner of a completed/failed task, or null once the context has been shut down + */ + public TaskRunner pollFinished() throws InterruptedException { + while (!shutdown) { + Iterator it = running.iterator(); + while (it.hasNext()) { + TaskRunner runner = it.next(); + if (runner != null && !runner.isRunning()) { + it.remove(); + return runner; + } + } + Thread.sleep(SLEEP_TIME); + } + return null; + } + + /** + * Marks the context as shut down and stops every task still running (used on task failure and from Driver.close()) + */ + public void shutdown() { + LOG.warn("Shutting down query " + ctx.getCmd()); + shutdown = true; + for (TaskRunner runner : running) { + if (runner.isRunning()) { + Task task = runner.getTask(); + LOG.warn("Shutting down task : " + task); + try { + task.shutdown(); + } catch (Exception e) { + console.printError("Exception on shutting down task " + task.getId() + ": " + e); + } + Thread thread = runner.getRunner(); + if (thread != null) { + thread.interrupt(); + } + } + } + running.clear(); + } + /** * Checks if a task can be launched. * @@ -82,5 +156,4 @@ public Context getCtx() { public void incCurJobNo(int amount) { this.curJobNo = this.curJobNo + amount; } - } diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/TaskRunner.java ql/src/java/org/apache/hadoop/hive/ql/exec/TaskRunner.java index ead7b59..beec968 100644 --- ql/src/java/org/apache/hadoop/hive/ql/exec/TaskRunner.java +++ ql/src/java/org/apache/hadoop/hive/ql/exec/TaskRunner.java @@ -28,6 +28,7 @@ **/ public class TaskRunner extends Thread { + protected Task tsk; protected TaskResult result; protected SessionState ss; @@ -39,6 +40,8 @@ protected Long initialValue() { } }; + protected volatile Thread runner; // volatile: set/cleared by run() on the task thread, read via getRunner() by DriverContext.shutdown() + public TaskRunner(Task tsk, TaskResult result) { this.tsk = tsk; this.result = result; @@ -49,10 +52,27 @@ public TaskRunner(Task tsk, TaskResult result) { return tsk; } + public TaskResult getTaskResult() { + return result; + } + + public Thread getRunner() { + return runner; + } + + public boolean isRunning() { + return result.isRunning(); + } + @Override public
void run() { - SessionState.start(ss); - runSequential(); + runner = Thread.currentThread(); // exposed via getRunner() so DriverContext.shutdown() can interrupt this thread + try { + SessionState.start(ss); + runSequential(); + } finally { + runner = null; + } } /**