diff --git a/itests/hive-unit/src/test/java/org/apache/hive/jdbc/TestTriggersTezSessionPoolManager.java b/itests/hive-unit/src/test/java/org/apache/hive/jdbc/TestTriggersTezSessionPoolManager.java index b2bb443..1691f44 100644 --- a/itests/hive-unit/src/test/java/org/apache/hive/jdbc/TestTriggersTezSessionPoolManager.java +++ b/itests/hive-unit/src/test/java/org/apache/hive/jdbc/TestTriggersTezSessionPoolManager.java @@ -45,6 +45,16 @@ public void testTriggerSlowQueryElapsedTime() throws Exception { } @Test(timeout = 60000) + public void testTriggerShortQueryElapsedTime() throws Exception { + Expression expression = ExpressionFactory.fromString("ELAPSED_TIME > 100"); + Trigger trigger = new ExecutionTrigger("slow_query", expression, new Action(Action.Type.KILL_QUERY)); + setupTriggers(Lists.newArrayList(trigger)); + String query = "select sleep(t1.under_col, 500), t1.value from " + tableName + " t1 join " + tableName + + " t2 on t1.under_col>=t2.under_col"; + runQueryWithTrigger(query, null, trigger + " violated"); + } + + @Test(timeout = 60000) public void testTriggerSlowQueryExecutionTime() throws Exception { Expression expression = ExpressionFactory.fromString("EXECUTION_TIME > 1000"); Trigger trigger = new ExecutionTrigger("slow_query", expression, new Action(Action.Type.KILL_QUERY)); diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TriggerValidatorRunnable.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TriggerValidatorRunnable.java index 9ccaa1f..5187596 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TriggerValidatorRunnable.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TriggerValidatorRunnable.java @@ -22,6 +22,7 @@ import org.apache.hadoop.hive.ql.wm.Action; import org.apache.hadoop.hive.ql.wm.SessionTriggerProvider; +import org.apache.hadoop.hive.ql.wm.TimeCounterLimit; import org.apache.hadoop.hive.ql.wm.Trigger; import org.apache.hadoop.hive.ql.wm.TriggerActionHandler; import org.apache.hadoop.hive.ql.wm.WmContext; @@ -48,8 +49,9 @@ public void run() { for (TezSessionState sessionState : sessions) { WmContext wmContext = sessionState.getWmContext(); if (wmContext != null && !wmContext.isQueryCompleted() - && !wmContext.getCurrentCounters().isEmpty()) { + && !wmContext.getSubscribedCounters().isEmpty()) { Map currentCounters = wmContext.getCurrentCounters(); + updateElapsedTimeCounter(wmContext); for (Trigger currentTrigger : triggers) { String desiredCounter = currentTrigger.getExpression().getCounterLimit().getName(); // there could be interval where desired counter value is not populated by the time we make this check @@ -101,4 +103,11 @@ public void run() { LOG.warn(TriggerValidatorRunnable.class.getSimpleName() + " caught exception.", t); } } + + private void updateElapsedTimeCounter(final WmContext wmContext) { + Map currentCounters = wmContext.getCurrentCounters(); + if (wmContext.getSubscribedCounters().contains(TimeCounterLimit.TimeCounter.ELAPSED_TIME.name())) { + currentCounters.put(TimeCounterLimit.TimeCounter.ELAPSED_TIME.name(), wmContext.getElapsedTime()); + } + } } diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/monitoring/TezJobMonitor.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/monitoring/TezJobMonitor.java index 166ecfc..98048c8 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/monitoring/TezJobMonitor.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/monitoring/TezJobMonitor.java @@ -360,11 +360,6 @@ public int monitorExecution() { // Time based counters. If DAG is done already don't update these counters. if (!done) { - counterName = TimeCounterLimit.TimeCounter.ELAPSED_TIME.name(); - if (desiredCounters.contains(counterName)) { - updatedCounters.put(counterName, context.getWmContext().getElapsedTime()); - } - counterName = TimeCounterLimit.TimeCounter.EXECUTION_TIME.name(); if (desiredCounters.contains(counterName) && executionStartTime > 0) { updatedCounters.put(counterName, System.currentTimeMillis() - executionStartTime);