diff --git ql/src/java/org/apache/hadoop/hive/ql/Driver.java ql/src/java/org/apache/hadoop/hive/ql/Driver.java index 318e21a..deadeae 100644 --- ql/src/java/org/apache/hadoop/hive/ql/Driver.java +++ ql/src/java/org/apache/hadoop/hive/ql/Driver.java @@ -19,6 +19,19 @@ package org.apache.hadoop.hive.ql; +import java.io.DataInput; +import java.io.IOException; +import java.io.Serializable; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.HashSet; +import java.util.LinkedHashMap; +import java.util.LinkedList; +import java.util.List; +import java.util.Map; +import java.util.Queue; +import java.util.Set; + import org.apache.commons.lang.StringUtils; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; @@ -67,13 +80,6 @@ import org.apache.hadoop.mapred.JobClient; import org.apache.hadoop.mapred.JobConf; -import java.io.DataInput; -import java.io.IOException; -import java.io.Serializable; -import java.util.*; -import java.util.concurrent.ConcurrentLinkedQueue; - - public class Driver implements CommandProcessor { static final private String CLASS_NAME = Driver.class.getName(); @@ -88,6 +94,7 @@ private HiveConf conf; private DataInput resStream; private Context ctx; + private volatile DriverContext driverCxt; // read and cleared by close(), possibly from another thread private QueryPlan plan; private Schema schema; private String errorMessage; @@ -97,8 +104,9 @@ // A limit on the number of threads that can be launched private int maxthreads; - private static final int SLEEP_TIME = 2000; - protected int tryCount = Integer.MAX_VALUE; + private int tryCount = Integer.MAX_VALUE; + + private volatile boolean destroyed; // set by destroy(), possibly from another thread; read by the execute() loop private String userName; @@ -1198,14 +1206,13 @@ public int execute() throws CommandNeedRetryException { // At any time, at most maxthreads tasks can be running // The main thread polls the TaskRunners to check if they have finished.
- Queue> runnable = new ConcurrentLinkedQueue>(); - Map running = new HashMap(); - - DriverContext driverCxt = new DriverContext(runnable, ctx); + DriverContext driverCxt = new DriverContext(ctx); driverCxt.prepare(plan); ctx.setHDFSCleanup(true); + this.driverCxt = driverCxt; // for canceling the query (should be bound to session?) + SessionState.get().setLastMapRedStatsList(new ArrayList()); SessionState.get().setStackTraces(new HashMap>>()); SessionState.get().setLocalMapRedErrors(new HashMap>()); @@ -1221,27 +1228,33 @@ public int execute() throws CommandNeedRetryException { perfLogger.PerfLogEnd(CLASS_NAME, PerfLogger.TIME_TO_SUBMIT); perfLogger.PerfLogBegin(CLASS_NAME, PerfLogger.RUN_TASKS); // Loop while you either have tasks running, or tasks queued up - while (running.size() != 0 || runnable.peek() != null) { + while (!destroyed && driverCxt.isRunning()) { + // Launch upto maxthreads tasks - while (runnable.peek() != null && running.size() < maxthreads) { - Task tsk = runnable.remove(); - perfLogger.PerfLogBegin(CLASS_NAME, PerfLogger.TASK + tsk.getName() + "." + tsk.getId()); - launchTask(tsk, queryId, noName, running, jobname, jobs, driverCxt); + Task task; + while ((task = driverCxt.getRunnable(maxthreads)) != null) { + perfLogger.PerfLogBegin(CLASS_NAME, PerfLogger.TASK + task.getName() + "." + task.getId()); + TaskRunner runner = launchTask(task, queryId, noName, jobname, jobs, driverCxt); + if (!runner.isRunning()) { + break; + } } // poll the Tasks to see which one completed - TaskResult tskRes = pollTasks(running.keySet()); - TaskRunner tskRun = running.remove(tskRes); - Task tsk = tskRun.getTask(); - perfLogger.PerfLogEnd(CLASS_NAME, PerfLogger.TASK + tsk.getName() + "."
+ tsk.getId()); + TaskRunner tskRun = driverCxt.pollFinished(); + if (tskRun == null) { + continue; + } hookContext.addCompleteTask(tskRun); - int exitVal = tskRes.getExitVal(); + Task tsk = tskRun.getTask(); + perfLogger.PerfLogEnd(CLASS_NAME, PerfLogger.TASK + tsk.getName() + "." + tsk.getId()); // pairs with PerfLogBegin(TASK) in the launch loop + TaskResult result = tskRun.getTaskResult(); + + int exitVal = result.getExitVal(); if (exitVal != 0) { if (tsk.ifRetryCmdWhenFail()) { - if (!running.isEmpty()) { - taskCleanup(running); - } + driverCxt.shutdown(); // in case we decided to run everything in local mode, restore the // the jobtracker setting to its initial value ctx.restoreOriginalTracker(); @@ -1249,7 +1262,7 @@ public int execute() throws CommandNeedRetryException { } Task backupTask = tsk.getAndInitBackupTask(); if (backupTask != null) { - setErrorMsgAndDetail(exitVal, tskRes.getTaskError(), tsk); + setErrorMsgAndDetail(exitVal, result.getTaskError(), tsk); console.printError(errorMessage); errorMessage = "ATTEMPT: Execute BackupTask: " + backupTask.getClass().getName(); console.printError(errorMessage); @@ -1270,12 +1283,10 @@ public int execute() throws CommandNeedRetryException { perfLogger.PerfLogEnd(CLASS_NAME, PerfLogger.FAILURE_HOOK + ofh.getClass().getName()); } - setErrorMsgAndDetail(exitVal, tskRes.getTaskError(), tsk); + setErrorMsgAndDetail(exitVal, result.getTaskError(), tsk); SQLState = "08S01"; console.printError(errorMessage); - if (!running.isEmpty()) { - taskCleanup(running); - } + driverCxt.shutdown(); // in case we decided to run everything in local mode, restore the // the jobtracker setting to its initial value ctx.restoreOriginalTracker(); @@ -1305,6 +1316,13 @@ public int execute() throws CommandNeedRetryException { // the jobtracker setting to its initial value ctx.restoreOriginalTracker(); + if (destroyed || driverCxt.isShutdown()) { // the loop also exits when destroy() was called + SQLState = "HY008"; + errorMessage = "FAILED: Operation cancelled"; + console.printError(errorMessage); + return 1000; + } + // remove incomplete outputs. // Some incomplete outputs may be added at the beginning, for eg: for dynamic partitions.
// remove them @@ -1426,10 +1444,8 @@ private void setErrorMsgAndDetail(int exitVal, Throwable downstreamError, Task t * @param cxt * the driver context */ - - public void launchTask(Task tsk, String queryId, boolean noName, - Map running, String jobname, int jobs, DriverContext cxt) { - + private TaskRunner launchTask(Task tsk, String queryId, boolean noName, + String jobname, int jobs, DriverContext cxt) throws HiveException { if (SessionState.get() != null) { SessionState.get().getHiveHistory().startTask(queryId, tsk, tsk.getClass().getName()); } @@ -1446,8 +1462,7 @@ public void launchTask(Task tsk, String queryId, boolean TaskResult tskRes = new TaskResult(); TaskRunner tskRun = new TaskRunner(tsk, tskRes); - cxt.prepare(tskRun); - + cxt.launching(tskRun); // Launch Task if (HiveConf.getBoolVar(conf, HiveConf.ConfVars.EXECPARALLEL) && tsk.isMapRedTask()) { // Launch it in the parallel mode, as a separate thread only for MR tasks @@ -1455,53 +1470,7 @@ public void launchTask(Task tsk, String queryId, boolean } else { tskRun.runSequential(); } - running.put(tskRes, tskRun); - } - - /** - * Cleans up remaining tasks in case of failure - */ - public void taskCleanup(Map running) { - for (Map.Entry entry : running.entrySet()) { - if (entry.getKey().isRunning()) { - Task task = entry.getValue().getTask(); - try { - task.shutdown(); - } catch (Exception e) { - console.printError("Exception on shutting down task " + task.getId() + ": " + e); - } - } - } - running.clear(); - } - - /** - * Polls running tasks to see if a task has ended.
- * - * @param results - * Set of result objects for running tasks - * @return The result object for any completed/failed task - */ - - public TaskResult pollTasks(Set results) { - Iterator resultIterator = results.iterator(); - while (true) { - while (resultIterator.hasNext()) { - TaskResult tskRes = resultIterator.next(); - if (!tskRes.isRunning()) { - return tskRes; - } - } - - // In this loop, nothing was found - // Sleep 10 seconds and restart - try { - Thread.sleep(SLEEP_TIME); - } catch (InterruptedException ie) { - // Do Nothing - } - resultIterator = results.iterator(); - } + return tskRun; } public boolean isFetchingTable() { @@ -1509,6 +1478,9 @@ public boolean isFetchingTable() { } public boolean getResults(List res) throws IOException, CommandNeedRetryException { + if (destroyed) { + throw new IOException("FAILED: Operation cancelled"); + } if (isFetchingTable()) { FetchTask ft = plan.getFetchTask(); ft.setMaxRows(maxRows); @@ -1596,6 +1568,10 @@ public int close() { } } } + if (driverCxt != null) { + driverCxt.shutdown(); + driverCxt = null; + } if (ctx != null) { ctx.clear(); } @@ -1616,6 +1592,10 @@ public int close() { } public void destroy() { + if (destroyed) { + return; + } + destroyed = true; if (ctx != null) { try { releaseLocks(ctx.getHiveLocks()); diff --git ql/src/java/org/apache/hadoop/hive/ql/DriverContext.java ql/src/java/org/apache/hadoop/hive/ql/DriverContext.java index c51a9c8..c64aa46 100644 --- ql/src/java/org/apache/hadoop/hive/ql/DriverContext.java +++ ql/src/java/org/apache/hadoop/hive/ql/DriverContext.java @@ -26,16 +26,23 @@ import org.apache.hadoop.hive.ql.exec.Task; import org.apache.hadoop.hive.ql.exec.TaskRunner; import org.apache.hadoop.hive.ql.exec.mr.MapRedTask; +import org.apache.hadoop.hive.ql.metadata.HiveException; import org.apache.hadoop.hive.ql.plan.MapWork; import org.apache.hadoop.hive.ql.plan.ReduceWork; import java.io.Serializable; import java.util.ArrayList; import java.util.HashMap; -import
java.util.LinkedList; import java.util.List; import java.util.Map; +import java.util.Iterator; import java.util.Queue; +import java.util.concurrent.ConcurrentLinkedQueue; +import java.util.concurrent.LinkedBlockingQueue; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.hive.ql.session.SessionState; /** * DriverContext. @@ -43,27 +50,104 @@ */ public class DriverContext { - Queue> runnable = new LinkedList>(); + private static final Log LOG = LogFactory.getLog(Driver.class.getName()); + private static final SessionState.LogHelper console = new SessionState.LogHelper(LOG); + + private static final int SLEEP_TIME = 2000; + + private Queue> runnable; + private Queue running; // how many jobs have been started - int curJobNo; + private int curJobNo; - Context ctx; + private Context ctx; + private boolean shutdown; final Map statsTasks = new HashMap(1); public DriverContext() { - this.runnable = null; - this.ctx = null; } - public DriverContext(Queue> runnable, Context ctx) { - this.runnable = runnable; + public DriverContext(Context ctx) { + this.runnable = new ConcurrentLinkedQueue>(); + this.running = new LinkedBlockingQueue(); this.ctx = ctx; } - public Queue> getRunnable() { - return runnable; + public synchronized boolean isShutdown() { + return shutdown; + } + + public synchronized boolean isRunning() { + return !shutdown && (!running.isEmpty() || !runnable.isEmpty()); + } + + public synchronized void remove(Task task) { + runnable.remove(task); + } + + public synchronized void launching(TaskRunner runner) throws HiveException { + checkShutdown(); + running.add(runner); + } + + public synchronized Task getRunnable(int maxthreads) throws HiveException { + checkShutdown(); + if (runnable.peek() != null && running.size() < maxthreads) { + return runnable.remove(); + } + return null; + } + + /** + * Polls running tasks to see if a task has ended.
+ * + * @return the runner of a completed/failed task, or null once the context has been shut down + */ + public synchronized TaskRunner pollFinished() throws InterruptedException { + while (!shutdown) { + Iterator it = running.iterator(); + while (it.hasNext()) { + TaskRunner runner = it.next(); + if (runner != null && !runner.isRunning()) { + it.remove(); + return runner; + } + } + wait(SLEEP_TIME); + } + return null; + } + + private void checkShutdown() throws HiveException { + if (shutdown) { + throw new HiveException("FAILED: Operation cancelled"); + } + } + /** + * Marks the context shut down and stops any tasks still running (on failure or cancel) + */ + public synchronized void shutdown() { + LOG.warn("Shutting down query " + ctx.getCmd()); + shutdown = true; + for (TaskRunner runner : running) { + if (runner.isRunning()) { + Task task = runner.getTask(); + LOG.warn("Shutting down task : " + task); + try { + task.shutdown(); + } catch (Exception e) { + console.printError("Exception on shutting down task " + task.getId() + ": " + e); + } + Thread thread = runner.getRunner(); + if (thread != null) { + thread.interrupt(); + } + } + } + running.clear(); + notifyAll(); // wake a pollFinished() caller blocked in wait(SLEEP_TIME) so it sees shutdown now } /** @@ -80,9 +164,14 @@ public static boolean isLaunchable(Task tsk) { return !tsk.getQueued() && !tsk.getInitialized() && tsk.isRunnable(); } - public void addToRunnable(Task tsk) { + public synchronized boolean addToRunnable(Task tsk) throws HiveException { + if (runnable.contains(tsk)) { + return false; + } + checkShutdown(); runnable.add(tsk); tsk.setQueued(); + return true; } public int getCurJobNo() { diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/ConditionalTask.java ql/src/java/org/apache/hadoop/hive/ql/exec/ConditionalTask.java index 854cd52..031331e 100644 --- ql/src/java/org/apache/hadoop/hive/ql/exec/ConditionalTask.java +++ ql/src/java/org/apache/hadoop/hive/ql/exec/ConditionalTask.java @@ -22,9 +22,9 @@ import java.util.List; import org.apache.hadoop.hive.conf.HiveConf; -import org.apache.hadoop.hive.ql.Context; import org.apache.hadoop.hive.ql.DriverContext;
import org.apache.hadoop.hive.ql.QueryPlan; +import org.apache.hadoop.hive.ql.metadata.HiveException; import org.apache.hadoop.hive.ql.plan.ConditionalResolver; import org.apache.hadoop.hive.ql.plan.ConditionalWork; import org.apache.hadoop.hive.ql.plan.api.StageType; @@ -81,9 +81,19 @@ public int execute(DriverContext driverContext) { resTasks = resolver.getTasks(conf, resolverCtx); resolved = true; + try { + resolveTask(driverContext); + } catch (Exception e) { + setException(e); + return 1; + } + return 0; + } + + private void resolveTask(DriverContext driverContext) throws HiveException { for (Task tsk : getListTasks()) { if (!resTasks.contains(tsk)) { - driverContext.getRunnable().remove(tsk); + driverContext.remove(tsk); console.printInfo(tsk.getId() + " is filtered out by condition resolver."); if (tsk.isMapRedTask()) { driverContext.incCurJobNo(1); @@ -98,13 +108,11 @@ public int execute(DriverContext driverContext) { } } // resolved task - if (!driverContext.getRunnable().contains(tsk)) { + if (driverContext.addToRunnable(tsk)) { console.printInfo(tsk.getId() + " is selected by condition resolver."); - driverContext.addToRunnable(tsk); } } } - return 0; } diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/TaskRunner.java ql/src/java/org/apache/hadoop/hive/ql/exec/TaskRunner.java index ead7b59..99adf2e 100644 --- ql/src/java/org/apache/hadoop/hive/ql/exec/TaskRunner.java +++ ql/src/java/org/apache/hadoop/hive/ql/exec/TaskRunner.java @@ -28,6 +28,7 @@ **/ public class TaskRunner extends Thread { + protected Task tsk; protected TaskResult result; protected SessionState ss; @@ -39,6 +40,8 @@ protected Long initialValue() { } }; + protected volatile Thread runner; // set/cleared in run() on the task thread, read by DriverContext.shutdown() on another thread + public TaskRunner(Task tsk, TaskResult result) { this.tsk = tsk; this.result = result; @@ -49,10 +52,27 @@ public TaskRunner(Task tsk, TaskResult result) { return tsk; } + public TaskResult getTaskResult() { + return result; + } + + public Thread getRunner() { + return runner; + } + + public boolean
isRunning() { + return result.isRunning(); + } + @Override public void run() { - SessionState.start(ss); - runSequential(); + runner = Thread.currentThread(); + try { + SessionState.start(ss); + runSequential(); + } finally { + runner = null; + } } /** @@ -64,6 +84,9 @@ public void runSequential() { try { exitVal = tsk.executeTask(); } catch (Throwable t) { + if (tsk.getException() == null) { + tsk.setException(t); + } t.printStackTrace(); } result.setExitVal(exitVal, tsk.getException());