Index: ql/src/java/org/apache/hadoop/hive/ql/Driver.java =================================================================== --- ql/src/java/org/apache/hadoop/hive/ql/Driver.java (revision 1571905) +++ ql/src/java/org/apache/hadoop/hive/ql/Driver.java (working copy) @@ -22,10 +22,10 @@ import java.io.DataInput; import java.io.IOException; import java.io.Serializable; +import java.nio.channels.AsynchronousCloseException; import java.util.ArrayList; import java.util.HashMap; import java.util.HashSet; -import java.util.Iterator; import java.util.LinkedHashMap; import java.util.LinkedList; import java.util.List; @@ -32,7 +32,6 @@ import java.util.Map; import java.util.Queue; import java.util.Set; -import java.util.concurrent.ConcurrentLinkedQueue; import org.apache.commons.lang.StringUtils; import org.apache.commons.logging.Log; @@ -128,6 +127,7 @@ private HiveConf conf; private DataInput resStream; private Context ctx; + private DriverContext driverCxt; private QueryPlan plan; private Schema schema; private HiveLockManager hiveLockMgr; @@ -138,9 +138,10 @@ // A limit on the number of threads that can be launched private int maxthreads; - private static final int SLEEP_TIME = 2000; - protected int tryCount = Integer.MAX_VALUE; + private int tryCount = Integer.MAX_VALUE; + private volatile boolean destroyed; /* set by destroy(), which may run on a different thread than the task loop */ + private String userName; private boolean checkConcurrency() throws SemanticException { @@ -1312,14 +1313,13 @@ // At any time, at most maxthreads tasks can be running // The main thread polls the TaskRunners to check if they have finished. - Queue> runnable = new ConcurrentLinkedQueue>(); - Map running = new HashMap(); - - DriverContext driverCxt = new DriverContext(runnable, ctx); + DriverContext driverCxt = new DriverContext(ctx); driverCxt.prepare(plan); ctx.setHDFSCleanup(true); + this.driverCxt = driverCxt; // for canceling the query (should be bound to session?) 
+ SessionState.get().setLastMapRedStatsList(new ArrayList()); SessionState.get().setStackTraces(new HashMap>>()); SessionState.get().setLocalMapRedErrors(new HashMap>()); @@ -1335,27 +1335,33 @@ perfLogger.PerfLogEnd(CLASS_NAME, PerfLogger.TIME_TO_SUBMIT); perfLogger.PerfLogBegin(CLASS_NAME, PerfLogger.RUN_TASKS); // Loop while you either have tasks running, or tasks queued up - while (running.size() != 0 || runnable.peek() != null) { + while (!destroyed && driverCxt.isRunning()) { + // Launch upto maxthreads tasks - while (runnable.peek() != null && running.size() < maxthreads) { - Task tsk = runnable.remove(); - perfLogger.PerfLogBegin(CLASS_NAME, PerfLogger.TASK + tsk.getName() + "." + tsk.getId()); - launchTask(tsk, queryId, noName, running, jobname, jobs, driverCxt); + Task task; + while ((task = driverCxt.getRunnable(maxthreads)) != null) { + perfLogger.PerfLogBegin(CLASS_NAME, PerfLogger.TASK + task.getName() + "." + task.getId()); + TaskRunner runner = launchTask(task, queryId, noName, jobname, jobs, driverCxt); + if (!runner.isRunning()) { + break; + } } // poll the Tasks to see which one completed - TaskResult tskRes = pollTasks(running.keySet()); - TaskRunner tskRun = running.remove(tskRes); - Task tsk = tskRun.getTask(); - perfLogger.PerfLogEnd(CLASS_NAME, PerfLogger.TASK + tsk.getName() + "." 
+ tsk.getId()); + TaskRunner tskRun = driverCxt.pollFinished(); + if (tskRun == null) { + continue; + } hookContext.addCompleteTask(tskRun); - int exitVal = tskRes.getExitVal(); + Task tsk = tskRun.getTask(); + perfLogger.PerfLogEnd(CLASS_NAME, PerfLogger.TASK + tsk.getName() + "." + tsk.getId()); + TaskResult result = tskRun.getTaskResult(); + + int exitVal = result.getExitVal(); if (exitVal != 0) { if (tsk.ifRetryCmdWhenFail()) { - if (!running.isEmpty()) { - taskCleanup(running); - } + driverCxt.shutdown(); // in case we decided to run everything in local mode, restore the // the jobtracker setting to its initial value ctx.restoreOriginalTracker(); @@ -1363,7 +1369,7 @@ } Task backupTask = tsk.getAndInitBackupTask(); if (backupTask != null) { - setErrorMsgAndDetail(exitVal, tskRes.getTaskError(), tsk); + setErrorMsgAndDetail(exitVal, result.getTaskError(), tsk); console.printError(errorMessage); errorMessage = "ATTEMPT: Execute BackupTask: " + backupTask.getClass().getName(); console.printError(errorMessage); @@ -1384,12 +1390,10 @@ perfLogger.PerfLogEnd(CLASS_NAME, PerfLogger.FAILURE_HOOK + ofh.getClass().getName()); } - setErrorMsgAndDetail(exitVal, tskRes.getTaskError(), tsk); + setErrorMsgAndDetail(exitVal, result.getTaskError(), tsk); SQLState = "08S01"; console.printError(errorMessage); - if (!running.isEmpty()) { - taskCleanup(running); - } + driverCxt.shutdown(); // in case we decided to run everything in local mode, restore the // the jobtracker setting to its initial value ctx.restoreOriginalTracker(); @@ -1419,6 +1423,13 @@ // the jobtracker setting to its initial value ctx.restoreOriginalTracker(); + if (destroyed || driverCxt.isShutdown()) { + SQLState = "HY008"; + errorMessage = "FAILED: Operation cancelled"; + console.printError(errorMessage); + return 1000; + } + // remove incomplete outputs. // Some incomplete outputs may be added at the beginning, for eg: for dynamic partitions. 
// remove them @@ -1540,10 +1551,8 @@ * @param cxt * the driver context */ - - public void launchTask(Task tsk, String queryId, boolean noName, - Map running, String jobname, int jobs, DriverContext cxt) { - + private TaskRunner launchTask(Task tsk, String queryId, boolean noName, + String jobname, int jobs, DriverContext cxt) { if (SessionState.get() != null) { SessionState.get().getHiveHistory().startTask(queryId, tsk, tsk.getClass().getName()); } @@ -1560,8 +1569,7 @@ TaskResult tskRes = new TaskResult(); TaskRunner tskRun = new TaskRunner(tsk, tskRes); - cxt.prepare(tskRun); - + cxt.launching(tskRun); // Launch Task if (HiveConf.getBoolVar(conf, HiveConf.ConfVars.EXECPARALLEL) && tsk.isMapRedTask()) { // Launch it in the parallel mode, as a separate thread only for MR tasks @@ -1569,60 +1577,17 @@ } else { tskRun.runSequential(); } - running.put(tskRes, tskRun); + return tskRun; } - /** - * Cleans up remaining tasks in case of failure - */ - public void taskCleanup(Map running) { - for (Map.Entry entry : running.entrySet()) { - if (entry.getKey().isRunning()) { - Task task = entry.getValue().getTask(); - try { - task.shutdown(); - } catch (Exception e) { - console.printError("Exception on shutting down task " + task.getId() + ": " + e); - } - } - } - running.clear(); - } - - /** - * Polls running tasks to see if a task has ended. 
- * - * @param results - * Set of result objects for running tasks - * @return The result object for any completed/failed task - */ - - public TaskResult pollTasks(Set results) { - Iterator resultIterator = results.iterator(); - while (true) { - while (resultIterator.hasNext()) { - TaskResult tskRes = resultIterator.next(); - if (!tskRes.isRunning()) { - return tskRes; - } - } - - // In this loop, nothing was found - // Sleep 10 seconds and restart - try { - Thread.sleep(SLEEP_TIME); - } catch (InterruptedException ie) { - // Do Nothing - } - resultIterator = results.iterator(); - } - } - public boolean isFetchingTable() { return plan != null && plan.getFetchTask() != null; } public boolean getResults(List res) throws IOException, CommandNeedRetryException { + if (destroyed) { + throw new AsynchronousCloseException(); + } if (isFetchingTable()) { FetchTask ft = plan.getFetchTask(); ft.setMaxRows(maxRows); @@ -1710,6 +1675,10 @@ } } } + if (driverCxt != null) { + driverCxt.shutdown(); + driverCxt = null; + } if (ctx != null) { ctx.clear(); } @@ -1730,6 +1699,10 @@ } public void destroy() { + if (destroyed) { + return; + } + destroyed = true; if (ctx != null) { releaseLocks(ctx.getHiveLocks()); } Index: ql/src/java/org/apache/hadoop/hive/ql/DriverContext.java =================================================================== --- ql/src/java/org/apache/hadoop/hive/ql/DriverContext.java (revision 1571905) +++ ql/src/java/org/apache/hadoop/hive/ql/DriverContext.java (working copy) @@ -32,11 +32,17 @@ import java.io.Serializable; import java.util.ArrayList; import java.util.HashMap; -import java.util.LinkedList; import java.util.List; import java.util.Map; +import java.util.Iterator; import java.util.Queue; +import java.util.concurrent.ConcurrentLinkedQueue; +import java.util.concurrent.LinkedBlockingQueue; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.hive.ql.session.SessionState; + /** * DriverContext. 
* @@ -43,30 +49,107 @@ */ public class DriverContext { - Queue> runnable = new LinkedList>(); + private static final Log LOG = LogFactory.getLog(Driver.class.getName()); + private static final SessionState.LogHelper console = new SessionState.LogHelper(LOG); + private static final int SLEEP_TIME = 2000; + + private Queue> runnable; + private Queue running; + + // how many jobs have been started - int curJobNo; + private int curJobNo; - Context ctx; + private Context ctx; + private volatile boolean shutdown; final Map statsTasks = new HashMap(1); public DriverContext() { - this.runnable = null; - this.ctx = null; } - public DriverContext(Queue> runnable, Context ctx) { - this.runnable = runnable; + public DriverContext(Context ctx) { + this.runnable = new ConcurrentLinkedQueue>(); + this.running = new LinkedBlockingQueue(); this.ctx = ctx; } - public Queue> getRunnable() { - return runnable; + public synchronized boolean isShutdown() { + return shutdown; } + public synchronized boolean isRunning() { + return !shutdown && (!running.isEmpty() || !runnable.isEmpty()); + } + + public synchronized void remove(Task task) { + runnable.remove(task); + } + + public synchronized void launching(TaskRunner runner) { + checkShutdown(); + running.add(runner); + } + + public synchronized Task getRunnable(int maxthreads) { + checkShutdown(); + if (runnable.peek() != null && running.size() < maxthreads) { + return runnable.remove(); + } + return null; + } + /** + * Polls running tasks to see if a task has ended. 
+ * + * @return the runner of a completed or failed task, or null once the context has been shut down + */ + public synchronized TaskRunner pollFinished() throws InterruptedException { + while (!shutdown) { + Iterator it = running.iterator(); + while (it.hasNext()) { + TaskRunner runner = it.next(); + if (runner != null && !runner.isRunning()) { + it.remove(); + return runner; + } + } + wait(SLEEP_TIME); + } + return null; + } + + private void checkShutdown() { + if (shutdown) { + throw new IllegalStateException("shutdown"); + } + } + /** + * Cleans up remaining tasks in case of failure + */ + public synchronized void shutdown() { + LOG.debug("Shutting down query " + ctx.getCmd()); + shutdown = true; + notifyAll(); /* wake a thread waiting in pollFinished() so it sees the shutdown without waiting out SLEEP_TIME */ + for (TaskRunner runner : running) { + if (runner.isRunning()) { + Task task = runner.getTask(); + LOG.warn("Shutting down task : " + task); + try { + task.shutdown(); + } catch (Exception e) { + console.printError("Exception on shutting down task " + task.getId() + ": " + e); + } + Thread thread = runner.getRunner(); + if (thread != null) { + thread.interrupt(); + } + } + } + running.clear(); + } + + /** * Checks if a task can be launched. 
* * @param tsk @@ -80,9 +163,14 @@ return !tsk.getQueued() && !tsk.getInitialized() && tsk.isRunnable(); } - public void addToRunnable(Task tsk) { + public synchronized boolean addToRunnable(Task tsk) { + if (runnable.contains(tsk)) { + return false; + } + checkShutdown(); runnable.add(tsk); tsk.setQueued(); + return true; } public int getCurJobNo() { Index: ql/src/java/org/apache/hadoop/hive/ql/exec/ConditionalTask.java =================================================================== --- ql/src/java/org/apache/hadoop/hive/ql/exec/ConditionalTask.java (revision 1571905) +++ ql/src/java/org/apache/hadoop/hive/ql/exec/ConditionalTask.java (working copy) @@ -22,7 +22,6 @@ import java.util.List; import org.apache.hadoop.hive.conf.HiveConf; -import org.apache.hadoop.hive.ql.Context; import org.apache.hadoop.hive.ql.DriverContext; import org.apache.hadoop.hive.ql.QueryPlan; import org.apache.hadoop.hive.ql.plan.ConditionalResolver; @@ -83,7 +82,7 @@ for (Task tsk : getListTasks()) { if (!resTasks.contains(tsk)) { - driverContext.getRunnable().remove(tsk); + driverContext.remove(tsk); console.printInfo(tsk.getId() + " is filtered out by condition resolver."); if (tsk.isMapRedTask()) { driverContext.incCurJobNo(1); @@ -98,7 +97,6 @@ } } // resolved task - if (!driverContext.getRunnable().contains(tsk)) { + if (driverContext.addToRunnable(tsk)) { /* queues tsk; false if it was already queued */ console.printInfo(tsk.getId() + " is selected by condition resolver."); - driverContext.addToRunnable(tsk); } Index: ql/src/java/org/apache/hadoop/hive/ql/exec/TaskRunner.java =================================================================== --- ql/src/java/org/apache/hadoop/hive/ql/exec/TaskRunner.java (revision 1571905) +++ ql/src/java/org/apache/hadoop/hive/ql/exec/TaskRunner.java (working copy) @@ -28,6 +28,7 @@ **/ public class TaskRunner extends Thread { + protected Task tsk; protected TaskResult result; protected SessionState ss; @@ -39,6 +40,8 @@ } }; + protected volatile Thread runner; /* written in run(), read via getRunner() from DriverContext.shutdown() on another thread */ + public TaskRunner(Task tsk, TaskResult 
result) { this.tsk = tsk; this.result = result; @@ -49,10 +52,27 @@ return tsk; } + public TaskResult getTaskResult() { + return result; + } + + public Thread getRunner() { + return runner; + } + + public boolean isRunning() { + return result.isRunning(); + } + @Override public void run() { - SessionState.start(ss); - runSequential(); + runner = Thread.currentThread(); + try { + SessionState.start(ss); + runSequential(); + } finally { + runner = null; + } } /** @@ -64,6 +84,9 @@ try { exitVal = tsk.executeTask(); } catch (Throwable t) { + if (tsk.getException() == null) { + tsk.setException(t); + } t.printStackTrace(); } result.setExitVal(exitVal, tsk.getException());