diff --git a/itests/hive-unit/src/test/java/org/apache/hive/jdbc/TestTriggersTezSessionPoolManager.java b/itests/hive-unit/src/test/java/org/apache/hive/jdbc/TestTriggersTezSessionPoolManager.java index b2bb443..1691f44 100644 --- a/itests/hive-unit/src/test/java/org/apache/hive/jdbc/TestTriggersTezSessionPoolManager.java +++ b/itests/hive-unit/src/test/java/org/apache/hive/jdbc/TestTriggersTezSessionPoolManager.java @@ -45,6 +45,16 @@ public void testTriggerSlowQueryElapsedTime() throws Exception { } @Test(timeout = 60000) + public void testTriggerShortQueryElapsedTime() throws Exception { + Expression expression = ExpressionFactory.fromString("ELAPSED_TIME > 100"); + Trigger trigger = new ExecutionTrigger("slow_query", expression, new Action(Action.Type.KILL_QUERY)); + setupTriggers(Lists.newArrayList(trigger)); + String query = "select sleep(t1.under_col, 500), t1.value from " + tableName + " t1 join " + tableName + + " t2 on t1.under_col>=t2.under_col"; + runQueryWithTrigger(query, null, trigger + " violated"); + } + + @Test(timeout = 60000) public void testTriggerSlowQueryExecutionTime() throws Exception { Expression expression = ExpressionFactory.fromString("EXECUTION_TIME > 1000"); Trigger trigger = new ExecutionTrigger("slow_query", expression, new Action(Action.Type.KILL_QUERY)); diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TriggerValidatorRunnable.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TriggerValidatorRunnable.java index 9ccaa1f..670184b 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TriggerValidatorRunnable.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TriggerValidatorRunnable.java @@ -22,6 +22,7 @@ import org.apache.hadoop.hive.ql.wm.Action; import org.apache.hadoop.hive.ql.wm.SessionTriggerProvider; +import org.apache.hadoop.hive.ql.wm.TimeCounterLimit; import org.apache.hadoop.hive.ql.wm.Trigger; import org.apache.hadoop.hive.ql.wm.TriggerActionHandler; import org.apache.hadoop.hive.ql.wm.WmContext; @@ -48,8 +49,9 @@ public void run() { for (TezSessionState sessionState : sessions) { WmContext wmContext = sessionState.getWmContext(); if (wmContext != null && !wmContext.isQueryCompleted() - && !wmContext.getCurrentCounters().isEmpty()) { + && !wmContext.getSubscribedCounters().isEmpty()) { Map currentCounters = wmContext.getCurrentCounters(); + wmContext.updateElapsedTimeCounter(); for (Trigger currentTrigger : triggers) { String desiredCounter = currentTrigger.getExpression().getCounterLimit().getName(); // there could be interval where desired counter value is not populated by the time we make this check diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/monitoring/TezJobMonitor.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/monitoring/TezJobMonitor.java index 166ecfc..98048c8 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/monitoring/TezJobMonitor.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/monitoring/TezJobMonitor.java @@ -360,11 +360,6 @@ public int monitorExecution() { // Time based counters. If DAG is done already don't update these counters. if (!done) { - counterName = TimeCounterLimit.TimeCounter.ELAPSED_TIME.name(); - if (desiredCounters.contains(counterName)) { - updatedCounters.put(counterName, context.getWmContext().getElapsedTime()); - } - counterName = TimeCounterLimit.TimeCounter.EXECUTION_TIME.name(); if (desiredCounters.contains(counterName) && executionStartTime > 0) { updatedCounters.put(counterName, System.currentTimeMillis() - executionStartTime); diff --git a/ql/src/java/org/apache/hadoop/hive/ql/wm/WmContext.java b/ql/src/java/org/apache/hadoop/hive/ql/wm/WmContext.java index d09bf89..b896ddc 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/wm/WmContext.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/wm/WmContext.java @@ -236,4 +236,10 @@ public void shortPrint(final SessionState.LogHelper console) throws ExecutionExc " Cluster %: " + WmContext.DECIMAL_FORMAT.format(wmEvent.getWmTezSessionInfo().getClusterPercent())); } } + + public void updateElapsedTimeCounter() { + if (subscribedCounters.contains(TimeCounterLimit.TimeCounter.ELAPSED_TIME.name())) { + currentCounters.put(TimeCounterLimit.TimeCounter.ELAPSED_TIME.name(), getElapsedTime()); + } + } }