diff --git a/itests/hive-unit/src/test/java/org/apache/hive/jdbc/TestTriggersTezSessionPoolManager.java b/itests/hive-unit/src/test/java/org/apache/hive/jdbc/TestTriggersTezSessionPoolManager.java index cf24247..30d7700 100644 --- a/itests/hive-unit/src/test/java/org/apache/hive/jdbc/TestTriggersTezSessionPoolManager.java +++ b/itests/hive-unit/src/test/java/org/apache/hive/jdbc/TestTriggersTezSessionPoolManager.java @@ -120,7 +120,7 @@ public void testTriggerSlowQueryElapsedTime() throws Exception { setupTriggers(Lists.newArrayList(trigger)); String query = "select sleep(t1.under_col, 500), t1.value from " + tableName + " t1 join " + tableName + " t2 on t1.under_col>=t2.under_col"; - runQueryWithTrigger(query, null, "Query was cancelled"); + runQueryWithTrigger(query, null, trigger + " violated"); } @Test(timeout = 60000) @@ -130,7 +130,7 @@ public void testTriggerSlowQueryExecutionTime() throws Exception { setupTriggers(Lists.newArrayList(trigger)); String query = "select sleep(t1.under_col, 5), t1.value from " + tableName + " t1 join " + tableName + " t2 on t1.under_col>=t2.under_col"; - runQueryWithTrigger(query, null, "Query was cancelled"); + runQueryWithTrigger(query, null, trigger + " violated"); } @Test(timeout = 60000) @@ -140,7 +140,7 @@ public void testTriggerHighShuffleBytes() throws Exception { setupTriggers(Lists.newArrayList(trigger)); String query = "select sleep(t1.under_col, 5), t1.value from " + tableName + " t1 join " + tableName + " t2 on t1.under_col>=t2.under_col"; - runQueryWithTrigger(query, null, "Query was cancelled"); + runQueryWithTrigger(query, null, trigger + " violated"); } @Test(timeout = 60000) @@ -150,7 +150,7 @@ public void testTriggerHighBytesRead() throws Exception { setupTriggers(Lists.newArrayList(trigger)); String query = "select sleep(t1.under_col, 5), t1.value from " + tableName + " t1 join " + tableName + " t2 on t1.under_col>=t2.under_col"; - runQueryWithTrigger(query, null, "Query was cancelled"); + runQueryWithTrigger(query, null, trigger + " violated"); } @Test(timeout = 60000) @@ -160,7 +160,7 @@ public void testTriggerHighBytesWrite() throws Exception { setupTriggers(Lists.newArrayList(trigger)); String query = "select sleep(t1.under_col, 5), t1.value from " + tableName + " t1 join " + tableName + " t2 on t1.under_col>=t2.under_col"; - runQueryWithTrigger(query, null, "Query was cancelled"); + runQueryWithTrigger(query, null, trigger + " violated"); } @Test(timeout = 60000) @@ -170,7 +170,7 @@ public void testTriggerTotalTasks() throws Exception { setupTriggers(Lists.newArrayList(trigger)); String query = "select sleep(t1.under_col, 5), t1.value from " + tableName + " t1 join " + tableName + " t2 on t1.under_col>=t2.under_col"; - runQueryWithTrigger(query, getConfigs(), "Query was cancelled"); + runQueryWithTrigger(query, getConfigs(), trigger + " violated"); } @Test(timeout = 60000) @@ -301,7 +301,7 @@ public void testMultipleTriggers1() throws Exception { setupTriggers(Lists.newArrayList(shuffleTrigger, execTimeTrigger)); String query = "select sleep(t1.under_col, 5), t1.value from " + tableName + " t1 join " + tableName + " t2 on t1.under_col>=t2.under_col"; - runQueryWithTrigger(query, null, "Query was cancelled"); + runQueryWithTrigger(query, null, execTimeTrigger + " violated"); } @Test(timeout = 60000) @@ -313,7 +313,7 @@ public void testMultipleTriggers2() throws Exception { setupTriggers(Lists.newArrayList(shuffleTrigger, execTimeTrigger)); String query = "select sleep(t1.under_col, 5), t1.value from " + tableName + " t1 join " + tableName + " t2 on t1.under_col>=t2.under_col"; - runQueryWithTrigger(query, null, "Query was cancelled"); + runQueryWithTrigger(query, null, shuffleTrigger + " violated"); } private void createSleepUDF() throws SQLException { diff --git a/jdbc/src/java/org/apache/hive/jdbc/HiveStatement.java b/jdbc/src/java/org/apache/hive/jdbc/HiveStatement.java index a8e3bd9..0f2193f 100644 --- a/jdbc/src/java/org/apache/hive/jdbc/HiveStatement.java +++ b/jdbc/src/java/org/apache/hive/jdbc/HiveStatement.java @@ -37,6 +37,7 @@ import org.apache.hive.service.rpc.thrift.TGetOperationStatusResp; import org.apache.hive.service.rpc.thrift.TGetQueryIdReq; import org.apache.hive.service.rpc.thrift.TOperationHandle; +import org.apache.hive.service.rpc.thrift.TOperationState; import org.apache.hive.service.rpc.thrift.TSessionHandle; import org.apache.thrift.TException; import org.slf4j.Logger; @@ -386,7 +387,12 @@ TGetOperationStatusResp waitForOperationToComplete() throws SQLException { break; case CANCELED_STATE: // 01000 -> warning - throw new SQLException("Query was cancelled", "01000"); + String errMsg = statusResp.getErrorMessage(); + if (errMsg != null && !errMsg.isEmpty()) { + throw new SQLException("Query was cancelled. " + errMsg, "01000"); + } else { + throw new SQLException("Query was cancelled", "01000"); + } case TIMEDOUT_STATE: throw new SQLTimeoutException("Query timed out after " + queryTimeout + " seconds"); case ERROR_STATE: diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/DDLTask.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/DDLTask.java index 87f6e1d..e33c7ae 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/DDLTask.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/DDLTask.java @@ -3136,7 +3136,7 @@ private int abortTxns(Hive db, AbortTxnsDesc desc) throws HiveException { private int killQuery(Hive db, KillQueryDesc desc) throws HiveException { SessionState sessionState = SessionState.get(); for (String queryId : desc.getQueryIds()) { - sessionState.getKillQuery().killQuery(queryId); + sessionState.getKillQuery().killQuery(queryId, "User invoked KILL QUERY"); } LOG.info("kill query called ({})", desc.getQueryIds()); return 0; diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/KillTriggerActionHandler.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/KillTriggerActionHandler.java index 4c15d90..474fae9 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/KillTriggerActionHandler.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/KillTriggerActionHandler.java @@ -42,7 +42,7 @@ public void applyAction(final Map queriesViolat KillQuery killQuery = sessionState.getKillQuery(); // if kill query is null then session might have been released to pool or closed already if (killQuery != null) { - sessionState.getKillQuery().killQuery(queryId); + sessionState.getKillQuery().killQuery(queryId, entry.getValue().getMsg()); } } catch (HiveException e) { LOG.warn("Unable to kill query {} for trigger violation"); diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TriggerValidatorRunnable.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TriggerValidatorRunnable.java index b0378eb..33bccbe 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TriggerValidatorRunnable.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TriggerValidatorRunnable.java @@ -57,7 +57,10 @@ public void run() { String queryId = s.getTriggerContext().getQueryId(); LOG.info("Query {} violated trigger {}. Current counter value: {}. Going to apply action {}", queryId, t, currentCounterValue, t.getAction()); - violatedSessions.put(s, t.getAction()); + Trigger.Action action = t.getAction(); + String msg = "Trigger " + t + " violated. Current value: " + currentCounterValue; + action.setMsg(msg); + violatedSessions.put(s, action); } } } diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TriggerViolationActionHandler.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TriggerViolationActionHandler.java index ce9147f..c46569b 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TriggerViolationActionHandler.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TriggerViolationActionHandler.java @@ -35,7 +35,7 @@ public void applyAction(final Map queriesViolat TezSessionState sessionState = entry.getKey(); String queryId = sessionState.getTriggerContext().getQueryId(); try { - sessionState.getKillQuery().killQuery(queryId); + sessionState.getKillQuery().killQuery(queryId, entry.getValue().getMsg()); } catch (HiveException e) { LOG.warn("Unable to kill query {} for trigger violation"); } diff --git a/ql/src/java/org/apache/hadoop/hive/ql/session/KillQuery.java b/ql/src/java/org/apache/hadoop/hive/ql/session/KillQuery.java index 4bc81b3..a110ce7 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/session/KillQuery.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/session/KillQuery.java @@ -21,5 +21,5 @@ import org.apache.hadoop.hive.ql.metadata.HiveException; public interface KillQuery { - void killQuery(String queryId) throws HiveException; + void killQuery(String queryId, String errMsg) throws HiveException; } diff --git a/ql/src/java/org/apache/hadoop/hive/ql/session/NullKillQuery.java b/ql/src/java/org/apache/hadoop/hive/ql/session/NullKillQuery.java index eab8fbf..ea633ba 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/session/NullKillQuery.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/session/NullKillQuery.java @@ -22,7 +22,7 @@ public class NullKillQuery implements KillQuery { @Override - public void killQuery(String queryId) throws HiveException { + public void killQuery(String queryId, String errMsg) throws HiveException { // Do nothing } } diff --git a/ql/src/java/org/apache/hadoop/hive/ql/wm/Trigger.java b/ql/src/java/org/apache/hadoop/hive/ql/wm/Trigger.java index bed0ac1..6299f2b 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/wm/Trigger.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/wm/Trigger.java @@ -27,6 +27,7 @@ MOVE_TO_POOL(""); String poolName; + String msg; Action(final String poolName) { this.poolName = poolName; @@ -40,6 +41,14 @@ public Action setPoolName(final String poolName) { public String getPoolName() { return poolName; } + + public String getMsg() { + return msg; + } + + public void setMsg(final String msg) { + this.msg = msg; + } } /** diff --git a/service/src/java/org/apache/hive/service/cli/OperationState.java b/service/src/java/org/apache/hive/service/cli/OperationState.java index ae1ff5e..cda319a 100644 --- a/service/src/java/org/apache/hive/service/cli/OperationState.java +++ b/service/src/java/org/apache/hive/service/cli/OperationState.java @@ -37,6 +37,7 @@ private final TOperationState tOperationState; private final boolean terminal; + private String errorMessage; OperationState(TOperationState tOperationState, boolean terminal) { this.tOperationState = tOperationState; @@ -109,4 +110,12 @@ public TOperationState toTOperationState() { public boolean isTerminal() { return terminal; } + + public String getErrorMessage() { + return errorMessage; + } + + public void setErrorMessage(final String errorMessage) { + this.errorMessage = errorMessage; + } } diff --git a/service/src/java/org/apache/hive/service/cli/operation/OperationManager.java b/service/src/java/org/apache/hive/service/cli/operation/OperationManager.java index 8224bcc..257ab54 100644 --- a/service/src/java/org/apache/hive/service/cli/operation/OperationManager.java +++ b/service/src/java/org/apache/hive/service/cli/operation/OperationManager.java @@ -39,7 +39,6 @@ import org.apache.hadoop.hive.ql.log.LogDivertAppender; import org.apache.hadoop.hive.ql.log.LogDivertAppenderForTest; import org.apache.hadoop.hive.ql.session.OperationLog; -import org.apache.hadoop.hive.ql.session.SessionState; import org.apache.hive.service.AbstractService; import org.apache.hive.service.cli.FetchOrientation; import org.apache.hive.service.cli.HiveSQLException; @@ -50,12 +49,6 @@ import org.apache.hive.service.cli.RowSetFactory; import org.apache.hive.service.cli.TableSchema; import org.apache.hive.service.cli.session.HiveSession; -import org.apache.hive.service.server.KillQueryImpl; -import org.apache.logging.log4j.LogManager; -import org.apache.logging.log4j.core.Appender; -import org.apache.logging.log4j.core.LoggerContext; -import org.apache.logging.log4j.core.config.Configuration; -import org.apache.logging.log4j.core.config.LoggerConfig; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -259,10 +252,11 @@ public OperationStatus getOperationStatus(OperationHandle opHandle) /** * Cancel the running operation unless it is already in a terminal state - * @param opHandle + * @param opHandle operation handle + * @param errMsg error message * @throws HiveSQLException */ - public void cancelOperation(OperationHandle opHandle) throws HiveSQLException { + public void cancelOperation(OperationHandle opHandle, String errMsg) throws HiveSQLException { Operation operation = getOperation(opHandle); OperationState opState = operation.getStatus().getState(); if (opState.isTerminal()) { @@ -270,13 +264,24 @@ public void cancelOperation(OperationHandle opHandle) throws HiveSQLException { LOG.debug(opHandle + ": Operation is already aborted in state - " + opState); } else { LOG.debug(opHandle + ": Attempting to cancel from state - " + opState); - operation.cancel(OperationState.CANCELED); + OperationState operationState = OperationState.CANCELED; + operationState.setErrorMessage(errMsg); + operation.cancel(operationState); if (operation instanceof SQLOperation) { removeSafeQueryInfo(opHandle); } } } + /** + * Cancel the running operation unless it is already in a terminal state + * @param opHandle + * @throws HiveSQLException + */ + public void cancelOperation(OperationHandle opHandle) throws HiveSQLException { + cancelOperation(opHandle, ""); + } + public void closeOperation(OperationHandle opHandle) throws HiveSQLException { LOG.info("Closing operation: " + opHandle); Operation operation = removeOperation(opHandle); diff --git a/service/src/java/org/apache/hive/service/cli/thrift/ThriftCLIService.java b/service/src/java/org/apache/hive/service/cli/thrift/ThriftCLIService.java index 7012a25..6354c8c 100644 --- a/service/src/java/org/apache/hive/service/cli/thrift/ThriftCLIService.java +++ b/service/src/java/org/apache/hive/service/cli/thrift/ThriftCLIService.java @@ -661,6 +661,7 @@ public TGetOperationStatusResp GetOperationStatus(TGetOperationStatusReq req) th OperationStatus operationStatus = cliService.getOperationStatus(operationHandle, req.isGetProgressUpdate()); resp.setOperationState(operationStatus.getState().toTOperationState()); + resp.setErrorMessage(operationStatus.getState().getErrorMessage()); HiveSQLException opException = operationStatus.getOperationException(); resp.setTaskStatus(operationStatus.getTaskStatus()); resp.setOperationStarted(operationStatus.getOperationStarted()); diff --git a/service/src/java/org/apache/hive/service/server/KillQueryImpl.java b/service/src/java/org/apache/hive/service/server/KillQueryImpl.java index 5412371..f302b4b 100644 --- a/service/src/java/org/apache/hive/service/server/KillQueryImpl.java +++ b/service/src/java/org/apache/hive/service/server/KillQueryImpl.java @@ -37,14 +37,14 @@ public KillQueryImpl(OperationManager operationManager) { } @Override - public void killQuery(String queryId) throws HiveException { + public void killQuery(String queryId, String errMsg) throws HiveException { try { Operation operation = operationManager.getOperationByQueryId(queryId); if (operation == null) { LOG.info("Query not found: " + queryId); } else { OperationHandle handle = operation.getHandle(); - operationManager.cancelOperation(handle); + operationManager.cancelOperation(handle, errMsg); } } catch (HiveSQLException e) { throw new HiveException(e);