diff --git a/itests/hive-unit/src/test/java/org/apache/hive/jdbc/TestTriggersTezSessionPoolManager.java b/itests/hive-unit/src/test/java/org/apache/hive/jdbc/TestTriggersTezSessionPoolManager.java index c227a63..2391ec0 100644 --- a/itests/hive-unit/src/test/java/org/apache/hive/jdbc/TestTriggersTezSessionPoolManager.java +++ b/itests/hive-unit/src/test/java/org/apache/hive/jdbc/TestTriggersTezSessionPoolManager.java @@ -118,7 +118,7 @@ public void testTriggerSlowQueryElapsedTime() throws Exception { setupTriggers(Lists.newArrayList(trigger)); String query = "select sleep(t1.under_col, 500), t1.value from " + tableName + " t1 join " + tableName + " t2 on t1.under_col>=t2.under_col"; - runQueryWithTrigger(query, null, "Query was cancelled"); + runQueryWithTrigger(query, null, trigger + " violated"); } @Test(timeout = 60000) @@ -128,7 +128,7 @@ public void testTriggerSlowQueryExecutionTime() throws Exception { setupTriggers(Lists.newArrayList(trigger)); String query = "select sleep(t1.under_col, 5), t1.value from " + tableName + " t1 join " + tableName + " t2 on t1.under_col>=t2.under_col"; - runQueryWithTrigger(query, null, "Query was cancelled"); + runQueryWithTrigger(query, null, trigger + " violated"); } @Test(timeout = 60000) @@ -138,7 +138,7 @@ public void testTriggerHighShuffleBytes() throws Exception { setupTriggers(Lists.newArrayList(trigger)); String query = "select sleep(t1.under_col, 5), t1.value from " + tableName + " t1 join " + tableName + " t2 on t1.under_col>=t2.under_col"; - runQueryWithTrigger(query, null, "Query was cancelled"); + runQueryWithTrigger(query, null, trigger + " violated"); } @Test(timeout = 60000) @@ -148,7 +148,7 @@ public void testTriggerHighBytesRead() throws Exception { setupTriggers(Lists.newArrayList(trigger)); String query = "select sleep(t1.under_col, 5), t1.value from " + tableName + " t1 join " + tableName + " t2 on t1.under_col>=t2.under_col"; - runQueryWithTrigger(query, null, "Query was cancelled"); + runQueryWithTrigger(query, null, trigger + " violated"); } @Test(timeout = 60000) @@ -158,7 +158,7 @@ public void testTriggerHighBytesWrite() throws Exception { setupTriggers(Lists.newArrayList(trigger)); String query = "select sleep(t1.under_col, 5), t1.value from " + tableName + " t1 join " + tableName + " t2 on t1.under_col>=t2.under_col"; - runQueryWithTrigger(query, null, "Query was cancelled"); + runQueryWithTrigger(query, null, trigger + " violated"); } @Test(timeout = 60000) @@ -168,7 +168,7 @@ public void testTriggerTotalTasks() throws Exception { setupTriggers(Lists.newArrayList(trigger)); String query = "select sleep(t1.under_col, 5), t1.value from " + tableName + " t1 join " + tableName + " t2 on t1.under_col>=t2.under_col"; - runQueryWithTrigger(query, getConfigs(), "Query was cancelled"); + runQueryWithTrigger(query, getConfigs(), trigger + " violated"); } @Test(timeout = 60000) @@ -180,7 +180,7 @@ public void testMultipleTriggers1() throws Exception { setupTriggers(Lists.newArrayList(shuffleTrigger, execTimeTrigger)); String query = "select sleep(t1.under_col, 5), t1.value from " + tableName + " t1 join " + tableName + " t2 on t1.under_col>=t2.under_col"; - runQueryWithTrigger(query, null, "Query was cancelled"); + runQueryWithTrigger(query, null, execTimeTrigger + " violated"); } @Test(timeout = 60000) @@ -192,7 +192,7 @@ public void testMultipleTriggers2() throws Exception { setupTriggers(Lists.newArrayList(shuffleTrigger, execTimeTrigger)); String query = "select sleep(t1.under_col, 5), t1.value from " + tableName + " t1 join " + tableName + " t2 on t1.under_col>=t2.under_col"; - runQueryWithTrigger(query, null, "Query was cancelled"); + runQueryWithTrigger(query, null, shuffleTrigger + " violated"); } private void createSleepUDF() throws SQLException { diff --git a/jdbc/src/java/org/apache/hive/jdbc/HiveStatement.java b/jdbc/src/java/org/apache/hive/jdbc/HiveStatement.java index a8e3bd9..d693d96 100644 --- a/jdbc/src/java/org/apache/hive/jdbc/HiveStatement.java +++ b/jdbc/src/java/org/apache/hive/jdbc/HiveStatement.java @@ -37,6 +37,7 @@ import org.apache.hive.service.rpc.thrift.TGetOperationStatusResp; import org.apache.hive.service.rpc.thrift.TGetQueryIdReq; import org.apache.hive.service.rpc.thrift.TOperationHandle; +import org.apache.hive.service.rpc.thrift.TOperationState; import org.apache.hive.service.rpc.thrift.TSessionHandle; import org.apache.thrift.TException; import org.slf4j.Logger; @@ -386,7 +387,12 @@ TGetOperationStatusResp waitForOperationToComplete() throws SQLException { break; case CANCELED_STATE: // 01000 -> warning - throw new SQLException("Query was cancelled", "01000"); + String reason = statusResp.getErrorMessage(); + if (reason != null && !reason.isEmpty()) { + throw new SQLException("Query was cancelled. Reason: " + reason, "01000"); + } else { + throw new SQLException("Query was cancelled", "01000"); + } case TIMEDOUT_STATE: throw new SQLTimeoutException("Query timed out after " + queryTimeout + " seconds"); case ERROR_STATE: diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/DDLTask.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/DDLTask.java index 87f6e1d..e33c7ae 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/DDLTask.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/DDLTask.java @@ -3136,7 +3136,7 @@ private int abortTxns(Hive db, AbortTxnsDesc desc) throws HiveException { private int killQuery(Hive db, KillQueryDesc desc) throws HiveException { SessionState sessionState = SessionState.get(); for (String queryId : desc.getQueryIds()) { - sessionState.getKillQuery().killQuery(queryId); + sessionState.getKillQuery().killQuery(queryId, "User invoked KILL QUERY"); } LOG.info("kill query called ({})", desc.getQueryIds()); return 0; diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/KillTriggerActionHandler.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/KillTriggerActionHandler.java index 4c15d90..474fae9 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/KillTriggerActionHandler.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/KillTriggerActionHandler.java @@ -42,7 +42,7 @@ public void applyAction(final Map queriesViolat KillQuery killQuery = sessionState.getKillQuery(); // if kill query is null then session might have been released to pool or closed already if (killQuery != null) { - sessionState.getKillQuery().killQuery(queryId); + sessionState.getKillQuery().killQuery(queryId, entry.getValue().getMsg()); } } catch (HiveException e) { LOG.warn("Unable to kill query {} for trigger violation"); diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TriggerValidatorRunnable.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TriggerValidatorRunnable.java index 42cb3d8..9969f42 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TriggerValidatorRunnable.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TriggerValidatorRunnable.java @@ -52,10 +52,14 @@ public void run() { String desiredCounter = t.getExpression().getCounterLimit().getName(); // there could be interval where desired counter value is not populated by the time we make this check if (currentCounters.containsKey(desiredCounter)) { - if (t.apply(currentCounters.get(desiredCounter))) { + long currentValue = currentCounters.get(desiredCounter); + if (t.apply(currentValue)) { String queryId = s.getTriggerContext().getQueryId(); LOG.info("Query {} violated trigger {}. Going to apply action {}", queryId, t, t.getAction()); - violatedSessions.put(s, t.getAction()); + Trigger.Action action = t.getAction(); + String msg = "Trigger " + t + " violated. Current value: " + currentValue; + action.setMsg(msg); + violatedSessions.put(s, action); } } } diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TriggerViolationActionHandler.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TriggerViolationActionHandler.java index ce9147f..c46569b 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TriggerViolationActionHandler.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TriggerViolationActionHandler.java @@ -35,7 +35,7 @@ public void applyAction(final Map queriesViolat TezSessionState sessionState = entry.getKey(); String queryId = sessionState.getTriggerContext().getQueryId(); try { - sessionState.getKillQuery().killQuery(queryId); + sessionState.getKillQuery().killQuery(queryId, entry.getValue().getMsg()); } catch (HiveException e) { LOG.warn("Unable to kill query {} for trigger violation"); } diff --git a/ql/src/java/org/apache/hadoop/hive/ql/session/KillQuery.java b/ql/src/java/org/apache/hadoop/hive/ql/session/KillQuery.java index 4bc81b3..5f815c5 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/session/KillQuery.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/session/KillQuery.java @@ -21,5 +21,5 @@ import org.apache.hadoop.hive.ql.metadata.HiveException; public interface KillQuery { - void killQuery(String queryId) throws HiveException; + void killQuery(String queryId, String reason) throws HiveException; } diff --git a/ql/src/java/org/apache/hadoop/hive/ql/session/NullKillQuery.java b/ql/src/java/org/apache/hadoop/hive/ql/session/NullKillQuery.java index eab8fbf..af5f214 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/session/NullKillQuery.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/session/NullKillQuery.java @@ -22,7 +22,7 @@ public class NullKillQuery implements KillQuery { @Override - public void killQuery(String queryId) throws HiveException { + public void killQuery(String queryId, String reason) throws HiveException { // Do nothing } } diff --git a/ql/src/java/org/apache/hadoop/hive/ql/wm/Trigger.java b/ql/src/java/org/apache/hadoop/hive/ql/wm/Trigger.java index bed0ac1..6299f2b 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/wm/Trigger.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/wm/Trigger.java @@ -27,6 +27,7 @@ MOVE_TO_POOL(""); String poolName; + String msg; Action(final String poolName) { this.poolName = poolName; @@ -40,6 +41,14 @@ public Action setPoolName(final String poolName) { public String getPoolName() { return poolName; } + + public String getMsg() { + return msg; + } + + public void setMsg(final String msg) { + this.msg = msg; + } } /** diff --git a/service/src/java/org/apache/hive/service/cli/OperationState.java b/service/src/java/org/apache/hive/service/cli/OperationState.java index ae1ff5e..5c16f9e 100644 --- a/service/src/java/org/apache/hive/service/cli/OperationState.java +++ b/service/src/java/org/apache/hive/service/cli/OperationState.java @@ -37,6 +37,7 @@ private final TOperationState tOperationState; private final boolean terminal; + private String reason; OperationState(TOperationState tOperationState, boolean terminal) { this.tOperationState = tOperationState; @@ -109,4 +110,12 @@ public TOperationState toTOperationState() { public boolean isTerminal() { return terminal; } + + public String getReason() { + return reason; + } + + public void setReason(final String reason) { + this.reason = reason; + } } diff --git a/service/src/java/org/apache/hive/service/cli/operation/Operation.java b/service/src/java/org/apache/hive/service/cli/operation/Operation.java index 21809f9..dcb0ec4 100644 --- a/service/src/java/org/apache/hive/service/cli/operation/Operation.java +++ b/service/src/java/org/apache/hive/service/cli/operation/Operation.java @@ -272,6 +272,10 @@ protected synchronized void cleanupOperationLog() { } } + public void cancel(OperationState stateAfterCancel, String reason) throws HiveSQLException { + cancel(stateAfterCancel); + } + public abstract void cancel(OperationState stateAfterCancel) throws HiveSQLException; public abstract void close() throws HiveSQLException; diff --git a/service/src/java/org/apache/hive/service/cli/operation/OperationManager.java b/service/src/java/org/apache/hive/service/cli/operation/OperationManager.java index 8224bcc..f4cf2f8 100644 --- a/service/src/java/org/apache/hive/service/cli/operation/OperationManager.java +++ b/service/src/java/org/apache/hive/service/cli/operation/OperationManager.java @@ -259,10 +259,11 @@ public OperationStatus getOperationStatus(OperationHandle opHandle) /** * Cancel the running operation unless it is already in a terminal state - * @param opHandle + * @param opHandle operation handle + * @param reason reason for query cancellation * @throws HiveSQLException */ - public void cancelOperation(OperationHandle opHandle) throws HiveSQLException { + public void cancelOperation(OperationHandle opHandle, String reason) throws HiveSQLException { Operation operation = getOperation(opHandle); OperationState opState = operation.getStatus().getState(); if (opState.isTerminal()) { @@ -270,13 +271,24 @@ public void cancelOperation(OperationHandle opHandle) throws HiveSQLException { LOG.debug(opHandle + ": Operation is already aborted in state - " + opState); } else { LOG.debug(opHandle + ": Attempting to cancel from state - " + opState); - operation.cancel(OperationState.CANCELED); + OperationState operationState = OperationState.CANCELED; + operationState.setReason(reason); + operation.cancel(operationState); if (operation instanceof SQLOperation) { removeSafeQueryInfo(opHandle); } } } + /** + * Cancel the running operation unless it is already in a terminal state + * @param opHandle + * @throws HiveSQLException + */ + public void cancelOperation(OperationHandle opHandle) throws HiveSQLException { + cancelOperation(opHandle, ""); + } + public void closeOperation(OperationHandle opHandle) throws HiveSQLException { LOG.info("Closing operation: " + opHandle); Operation operation = removeOperation(opHandle); diff --git a/service/src/java/org/apache/hive/service/cli/thrift/ThriftCLIService.java b/service/src/java/org/apache/hive/service/cli/thrift/ThriftCLIService.java index 7012a25..6b73992 100644 --- a/service/src/java/org/apache/hive/service/cli/thrift/ThriftCLIService.java +++ b/service/src/java/org/apache/hive/service/cli/thrift/ThriftCLIService.java @@ -661,6 +661,7 @@ public TGetOperationStatusResp GetOperationStatus(TGetOperationStatusReq req) th OperationStatus operationStatus = cliService.getOperationStatus(operationHandle, req.isGetProgressUpdate()); resp.setOperationState(operationStatus.getState().toTOperationState()); + resp.setErrorMessage(operationStatus.getState().getReason()); HiveSQLException opException = operationStatus.getOperationException(); resp.setTaskStatus(operationStatus.getTaskStatus()); resp.setOperationStarted(operationStatus.getOperationStarted()); diff --git a/service/src/java/org/apache/hive/service/server/KillQueryImpl.java b/service/src/java/org/apache/hive/service/server/KillQueryImpl.java index 5412371..67b748e 100644 --- a/service/src/java/org/apache/hive/service/server/KillQueryImpl.java +++ b/service/src/java/org/apache/hive/service/server/KillQueryImpl.java @@ -37,14 +37,14 @@ public KillQueryImpl(OperationManager operationManager) { } @Override - public void killQuery(String queryId) throws HiveException { + public void killQuery(String queryId, String reason) throws HiveException { try { Operation operation = operationManager.getOperationByQueryId(queryId); if (operation == null) { LOG.info("Query not found: " + queryId); } else { OperationHandle handle = operation.getHandle(); - operationManager.cancelOperation(handle); + operationManager.cancelOperation(handle, reason); } } catch (HiveSQLException e) { throw new HiveException(e);