diff --git ql/src/java/org/apache/hadoop/hive/ql/Driver.java ql/src/java/org/apache/hadoop/hive/ql/Driver.java index a800046..a7c535c 100644 --- ql/src/java/org/apache/hadoop/hive/ql/Driver.java +++ ql/src/java/org/apache/hadoop/hive/ql/Driver.java @@ -408,7 +408,7 @@ public int compile(String command, boolean resetTaskIds, boolean deferClose) { } if (isInterrupted()) { - return handleInterruption("at beginning of compilation."); //indicate if need clean resource + return handleInterruption("at beginning of compilation.", null, null); //indicate if need clean resource } if (ctx != null && ctx.getExplainAnalyze() != AnalyzeState.RUNNING) { @@ -454,7 +454,7 @@ public void run() { ShutdownHookManager.addShutdownHook(shutdownRunner, SHUTDOWN_HOOK_PRIORITY); if (isInterrupted()) { - return handleInterruption("before parsing and analysing the query"); + return handleInterruption("before parsing and analysing the query", null, null); } if (ctx == null) { ctx = new Context(conf); @@ -522,7 +522,7 @@ public void run() { perfLogger.PerfLogEnd(CLASS_NAME, PerfLogger.ANALYZE); if (isInterrupted()) { - return handleInterruption("after analyzing query."); + return handleInterruption("after analyzing query.", null, null); } // get the output schema @@ -573,7 +573,7 @@ public void run() { return 0; } catch (Exception e) { if (isInterrupted()) { - return handleInterruption("during query compilation: " + e.getMessage()); + return handleInterruption("during query compilation: " + e.getMessage(), null, null); } compileError = true; @@ -644,10 +644,18 @@ public void run() { } - private int handleInterruption(String msg) { + private int handleInterruption(String msg, HookContext hookContext, + PerfLogger perfLogger) { SQLState = "HY008"; //SQLState for cancel operation errorMessage = "FAILED: command has been interrupted: " + msg; console.printError(errorMessage); + if (hookContext != null) { + try { + invokeFailureHooks(perfLogger, hookContext, errorMessage, null); + } catch (Exception e) { + LOG.warn("Caught exception attempting to invoke Failure Hooks", e); + } + } return 1000; } @@ -1508,7 +1516,7 @@ else if(!plan.getAutoCommitValue() && txnManager.getAutoCommit()) { if (requiresLock()) { // a checkpoint to see if the thread is interrupted or not before an expensive operation if (isInterrupted()) { - ret = handleInterruption("at acquiring the lock."); + ret = handleInterruption("at acquiring the lock.", null, null); } else { ret = acquireLocksAndOpenTxn(startTxnImplicitly); } @@ -1798,7 +1806,7 @@ public int execute(boolean deferClose) throws CommandNeedRetryException { // The main thread polls the TaskRunners to check if they have finished. if (isInterrupted()) { - return handleInterruption("before running tasks."); + return handleInterruption("before running tasks.", hookContext, perfLogger); } DriverContext driverCxt = new DriverContext(ctx); driverCxt.prepare(plan); @@ -1848,7 +1856,7 @@ public int execute(boolean deferClose) throws CommandNeedRetryException { int exitVal = result.getExitVal(); if (isInterrupted()) { - return handleInterruption("when checking the execution result."); + return handleInterruption("when checking the execution result.", hookContext, perfLogger); } if (exitVal != 0) { if (tsk.ifRetryCmdWhenFail()) { @@ -1873,6 +1881,9 @@ public int execute(boolean deferClose) throws CommandNeedRetryException { } else { setErrorMsgAndDetail(exitVal, result.getTaskError(), tsk); + if (driverCxt.isShutdown()) { + errorMessage = "FAILED: Operation cancelled. " + errorMessage; + } invokeFailureHooks(perfLogger, hookContext, errorMessage + Strings.nullToEmpty(tsk.getDiagnosticsMessage()), result.getTaskError()); SQLState = "08S01"; @@ -1961,7 +1972,7 @@ public int execute(boolean deferClose) throws CommandNeedRetryException { } catch (Throwable e) { executionError = true; if (isInterrupted()) { - return handleInterruption("during query execution: \n" + e.getMessage()); + return handleInterruption("during query execution: \n" + e.getMessage(), hookContext, perfLogger); } ctx.restoreOriginalTracker();