diff --git a/itests/hive-unit/src/test/java/org/apache/hive/jdbc/TestTriggersTezSessionPoolManager.java b/itests/hive-unit/src/test/java/org/apache/hive/jdbc/TestTriggersTezSessionPoolManager.java index 0a68f09..34eec2a 100644 --- a/itests/hive-unit/src/test/java/org/apache/hive/jdbc/TestTriggersTezSessionPoolManager.java +++ b/itests/hive-unit/src/test/java/org/apache/hive/jdbc/TestTriggersTezSessionPoolManager.java @@ -18,6 +18,7 @@ import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertNull; import static org.junit.Assert.assertTrue; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.when; @@ -52,17 +53,19 @@ import com.google.common.collect.Lists; public class TestTriggersTezSessionPoolManager { - private static MiniHS2 miniHS2 = null; - private static String dataFileDir; - private static Path kvDataFilePath; + protected static MiniHS2 miniHS2 = null; + protected static String dataFileDir; + protected static Path kvDataFilePath; private static String triggerTestTable = "testtab1"; + private static String testNoSleep = "select t1.under_col, t1.value from " + triggerTestTable + + " t1 join " + triggerTestTable + " t2 on t1.under_col>=t2.under_col"; private static String test5msSleepQuery = "select sleep(t1.under_col, 5), t1.value from " + triggerTestTable + " t1 join " + triggerTestTable + " t2 on t1.under_col>=t2.under_col"; private static String test500msSleepQuery = "select sleep(t1.under_col, 500), t1.value from " + triggerTestTable + " t1 join " + triggerTestTable + " t2 on t1.under_col>=t2.under_col"; - private static HiveConf conf = null; - private Connection hs2Conn = null; + protected static HiveConf conf = null; + protected Connection hs2Conn = null; @BeforeClass public static void beforeTest() throws Exception { @@ -118,35 +121,40 @@ public static void afterTest() throws Exception { public void testTriggerSlowQueryElapsedTime() throws Exception { Trigger 
trigger = new ExecutionTrigger("slow_query", ExpressionFactory.fromString("ELAPSED_TIME > 20000"), Trigger.Action.KILL_QUERY); - runQueryWithTrigger(test500msSleepQuery, null, Lists.newArrayList(trigger), "Query was cancelled"); + setupTriggers(Lists.newArrayList(trigger)); + runQueryWithTrigger(test500msSleepQuery, null, "Query was cancelled"); } @Test(timeout = 60000) public void testTriggerSlowQueryExecutionTime() throws Exception { Trigger trigger = new ExecutionTrigger("slow_query", ExpressionFactory.fromString("EXECUTION_TIME > 1000"), Trigger.Action.KILL_QUERY); - runQueryWithTrigger(test5msSleepQuery, null, Lists.newArrayList(trigger), "Query was cancelled"); + setupTriggers(Lists.newArrayList(trigger)); + runQueryWithTrigger(test5msSleepQuery, null, "Query was cancelled"); } @Test(timeout = 60000) public void testTriggerHighShuffleBytes() throws Exception { Trigger trigger = new ExecutionTrigger("big_shuffle", ExpressionFactory.fromString("SHUFFLE_BYTES > 100"), Trigger.Action.KILL_QUERY); - runQueryWithTrigger(test5msSleepQuery, null, Lists.newArrayList(trigger), "Query was cancelled"); + setupTriggers(Lists.newArrayList(trigger)); + runQueryWithTrigger(test5msSleepQuery, null, "Query was cancelled"); } @Test(timeout = 60000) public void testTriggerHighBytesRead() throws Exception { Trigger trigger = new ExecutionTrigger("big_read", ExpressionFactory.fromString("HDFS_BYTES_READ > 100"), Trigger.Action.KILL_QUERY); - runQueryWithTrigger(test5msSleepQuery, null, Lists.newArrayList(trigger), "Query was cancelled"); + setupTriggers(Lists.newArrayList(trigger)); + runQueryWithTrigger(test5msSleepQuery, null, "Query was cancelled"); } @Test(timeout = 60000) public void testTriggerHighBytesWrite() throws Exception { Trigger trigger = new ExecutionTrigger("big_write", ExpressionFactory.fromString("FILE_BYTES_WRITTEN > 100"), Trigger.Action.KILL_QUERY); - runQueryWithTrigger(test5msSleepQuery, null, Lists.newArrayList(trigger), "Query was cancelled"); + 
setupTriggers(Lists.newArrayList(trigger)); + runQueryWithTrigger(test5msSleepQuery, null, "Query was cancelled"); } @Test(timeout = 60000) @@ -158,7 +166,29 @@ public void testTriggerTotalTasks() throws Exception { setCmds.add("set mapred.max.split.size=100"); setCmds.add("set tez.grouping.min-size=100"); setCmds.add("set tez.grouping.max-size=100"); - runQueryWithTrigger(test5msSleepQuery, setCmds, Lists.newArrayList(trigger), "Query was cancelled"); + setupTriggers(Lists.newArrayList(trigger)); + runQueryWithTrigger(test5msSleepQuery, setCmds, "Query was cancelled"); + } + + @Test(timeout = 60000) + public void testTriggerCustomReadOps() throws Exception { + Trigger trigger = new ExecutionTrigger("high_read_ops", ExpressionFactory.fromString("HDFS_READ_OPS > 50"), + Trigger.Action.KILL_QUERY); + List setCmds = new ArrayList<>(); + setCmds.add("set mapred.min.split.size=100"); + setCmds.add("set mapred.max.split.size=100"); + setCmds.add("set tez.grouping.min-size=100"); + setCmds.add("set tez.grouping.max-size=100"); + setupTriggers(Lists.newArrayList(trigger)); + runQueryWithTrigger(test5msSleepQuery, setCmds, "Query was cancelled"); + } + + @Test(timeout = 60000) + public void testTriggerCustomNonExistent() throws Exception { + Trigger trigger = new ExecutionTrigger("non_existent", ExpressionFactory.fromString("OPEN_FILES > 50"), + Trigger.Action.KILL_QUERY); + setupTriggers(Lists.newArrayList(trigger)); + runQueryWithTrigger(testNoSleep, null, null); } @Test(timeout = 60000) @@ -168,8 +198,8 @@ public void testMultipleTriggers1() throws Exception { Trigger.Action.KILL_QUERY); Trigger execTimeTrigger = new ExecutionTrigger("slow_query", ExpressionFactory.fromString("EXECUTION_TIME > 1000"), Trigger.Action.KILL_QUERY); - runQueryWithTrigger(test5msSleepQuery, null, Lists.newArrayList(shuffleTrigger, execTimeTrigger), "Query was " + - "cancelled"); + setupTriggers(Lists.newArrayList(shuffleTrigger, execTimeTrigger)); + runQueryWithTrigger(test5msSleepQuery, 
null, "Query was cancelled"); } @Test(timeout = 60000) @@ -180,8 +210,8 @@ public void testMultipleTriggers2() throws Exception { Trigger execTimeTrigger = new ExecutionTrigger("slow_query", ExpressionFactory.fromString("EXECUTION_TIME > 100000"), Trigger.Action.KILL_QUERY); - runQueryWithTrigger(test5msSleepQuery, null, Lists.newArrayList(shuffleTrigger, execTimeTrigger), "Query was " + - "cancelled"); + setupTriggers(Lists.newArrayList(shuffleTrigger, execTimeTrigger)); + runQueryWithTrigger(test5msSleepQuery, null, "Query was cancelled"); } private void createSleepUDF() throws SQLException { @@ -192,12 +222,9 @@ private void createSleepUDF() throws SQLException { stmt.close(); } - private void runQueryWithTrigger(final String query, final List setCmds, final List triggers, + protected void runQueryWithTrigger(final String query, final List setCmds, final String expect) throws Exception { - MetastoreGlobalTriggersFetcher triggersFetcher = mock(MetastoreGlobalTriggersFetcher.class); - when(triggersFetcher.fetch()).thenReturn(triggers); - TezSessionPoolManager.getInstance().setGlobalTriggersFetcher(triggersFetcher); Connection con = hs2Conn; TestJdbcWithMiniLlap.createTestTable(con, null, triggerTestTable, kvDataFilePath.toString()); @@ -222,9 +249,19 @@ private void runQueryWithTrigger(final String query, final List setCmds, queryThread.join(); selStmt.close(); - assertNotNull("Expected non-null throwable", throwable[0]); - assertEquals(SQLException.class, throwable[0].getClass()); - assertTrue(expect + " is not contained in " + throwable[0].getMessage(), - throwable[0].getMessage().contains(expect)); + if (expect == null) { + assertNull("Expected query to succeed", throwable[0]); + } else { + assertNotNull("Expected non-null throwable", throwable[0]); + assertEquals(SQLException.class, throwable[0].getClass()); + assertTrue(expect + " is not contained in " + throwable[0].getMessage(), + throwable[0].getMessage().contains(expect)); + } + } + + protected void 
setupTriggers(final List triggers) throws Exception { + MetastoreGlobalTriggersFetcher triggersFetcher = mock(MetastoreGlobalTriggersFetcher.class); + when(triggersFetcher.fetch()).thenReturn(triggers); + TezSessionPoolManager.getInstance().setGlobalTriggersFetcher(triggersFetcher); } } \ No newline at end of file diff --git a/itests/hive-unit/src/test/java/org/apache/hive/jdbc/TestTriggersWorkloadManager.java b/itests/hive-unit/src/test/java/org/apache/hive/jdbc/TestTriggersWorkloadManager.java index 86f5972..8485ba6 100644 --- a/itests/hive-unit/src/test/java/org/apache/hive/jdbc/TestTriggersWorkloadManager.java +++ b/itests/hive-unit/src/test/java/org/apache/hive/jdbc/TestTriggersWorkloadManager.java @@ -16,18 +16,11 @@ package org.apache.hive.jdbc; -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertNotNull; -import static org.junit.Assert.assertTrue; import static org.mockito.Mockito.spy; import static org.mockito.Mockito.when; import java.io.File; import java.net.URL; -import java.sql.Connection; -import java.sql.SQLException; -import java.sql.Statement; -import java.util.ArrayList; import java.util.HashMap; import java.util.List; import java.util.Map; @@ -36,35 +29,13 @@ import org.apache.hadoop.fs.Path; import org.apache.hadoop.hive.conf.HiveConf; import org.apache.hadoop.hive.conf.HiveConf.ConfVars; -import org.apache.hadoop.hive.llap.LlapBaseInputFormat; -import org.apache.hadoop.hive.ql.exec.tez.TestWorkloadManager; import org.apache.hadoop.hive.ql.exec.tez.WorkloadManager; -import org.apache.hadoop.hive.ql.wm.ExecutionTrigger; -import org.apache.hadoop.hive.ql.wm.ExpressionFactory; import org.apache.hadoop.hive.ql.wm.Trigger; import org.apache.hive.jdbc.miniHS2.MiniHS2; import org.apache.hive.jdbc.miniHS2.MiniHS2.MiniClusterType; -import org.junit.After; -import org.junit.AfterClass; -import org.junit.Before; import org.junit.BeforeClass; -import org.junit.Test; -import com.google.common.collect.Lists; - -public class 
TestTriggersWorkloadManager { - private static MiniHS2 miniHS2 = null; - private static String dataFileDir; - private static Path kvDataFilePath; - private static String triggerTestTable = "testtab1"; - private static String test5msSleepQuery = "select sleep(t1.under_col, 5), t1.value from " + triggerTestTable + - " t1 join " + triggerTestTable + " t2 on t1.under_col>=t2.under_col"; - private static String test500msSleepQuery = "select sleep(t1.under_col, 500), t1.value from " + triggerTestTable + - " t1 join " + triggerTestTable + " t2 on t1.under_col>=t2.under_col"; - - private static HiveConf conf = null; - private static TestWorkloadManager.WorkloadManagerForTest wm; - private Connection hs2Conn = null; +public class TestTriggersWorkloadManager extends TestTriggersTezSessionPoolManager { @BeforeClass public static void beforeTest() throws Exception { @@ -99,134 +70,11 @@ public static void beforeTest() throws Exception { miniHS2.getDFS().getFileSystem().mkdirs(new Path("/apps_staging_dir/anonymous")); } - @Before - public void setUp() throws Exception { - hs2Conn = TestJdbcWithMiniLlap.getConnection(miniHS2.getJdbcURL(), System.getProperty("user.name"), "bar"); - } - - @After - public void tearDown() throws Exception { - LlapBaseInputFormat.closeAll(); - hs2Conn.close(); - } - - @AfterClass - public static void afterTest() throws Exception { - if (miniHS2.isStarted()) { - miniHS2.stop(); - } - } - - @Test(timeout = 60000) - public void testTriggerSlowQueryElapsedTime() throws Exception { - Trigger trigger = new ExecutionTrigger("slow_query", ExpressionFactory.fromString("ELAPSED_TIME > 20000"), - Trigger.Action.KILL_QUERY); - runQueryWithTrigger(test500msSleepQuery, null, Lists.newArrayList(trigger), "Query was cancelled"); - } - - @Test(timeout = 60000) - public void testTriggerSlowQueryExecutionTime() throws Exception { - Trigger trigger = new ExecutionTrigger("slow_query", ExpressionFactory.fromString("EXECUTION_TIME > 1000"), - Trigger.Action.KILL_QUERY); - 
runQueryWithTrigger(test5msSleepQuery, null, Lists.newArrayList(trigger), "Query was cancelled"); - } - - @Test(timeout = 60000) - public void testTriggerHighShuffleBytes() throws Exception { - Trigger trigger = new ExecutionTrigger("big_shuffle", ExpressionFactory.fromString("SHUFFLE_BYTES > 100"), - Trigger.Action.KILL_QUERY); - runQueryWithTrigger(test5msSleepQuery, null, Lists.newArrayList(trigger), "Query was cancelled"); - } - - @Test(timeout = 60000) - public void testTriggerHighBytesRead() throws Exception { - Trigger trigger = new ExecutionTrigger("big_read", ExpressionFactory.fromString("HDFS_BYTES_READ > 100"), - Trigger.Action.KILL_QUERY); - runQueryWithTrigger(test5msSleepQuery, null, Lists.newArrayList(trigger), "Query was cancelled"); - } - - @Test(timeout = 60000) - public void testTriggerHighBytesWrite() throws Exception { - Trigger trigger = new ExecutionTrigger("big_write", ExpressionFactory.fromString("FILE_BYTES_WRITTEN > 100"), - Trigger.Action.KILL_QUERY); - runQueryWithTrigger(test5msSleepQuery, null, Lists.newArrayList(trigger), "Query was cancelled"); - } - - @Test(timeout = 60000) - public void testTriggerTotalTasks() throws Exception { - Trigger trigger = new ExecutionTrigger("highly_parallel", ExpressionFactory.fromString("TOTAL_TASKS > 50"), - Trigger.Action.KILL_QUERY); - List setCmds = new ArrayList<>(); - setCmds.add("set mapred.min.split.size=100"); - setCmds.add("set mapred.max.split.size=100"); - setCmds.add("set tez.grouping.min-size=100"); - setCmds.add("set tez.grouping.max-size=100"); - runQueryWithTrigger(test5msSleepQuery, setCmds, Lists.newArrayList(trigger), "Query was cancelled"); - } - - @Test(timeout = 60000) - public void testMultipleTriggers1() throws Exception { - Trigger shuffleTrigger = new ExecutionTrigger("big_shuffle", - ExpressionFactory.fromString("HDFS_BYTES_READ > 100000"), - Trigger.Action.KILL_QUERY); - Trigger execTimeTrigger = new ExecutionTrigger("slow_query", 
ExpressionFactory.fromString("EXECUTION_TIME > 1000"), - Trigger.Action.KILL_QUERY); - runQueryWithTrigger(test5msSleepQuery, null, Lists.newArrayList(shuffleTrigger, execTimeTrigger), - "Query was cancelled"); - } - - @Test(timeout = 60000) - public void testMultipleTriggers2() throws Exception { - Trigger shuffleTrigger = new ExecutionTrigger("big_shuffle", ExpressionFactory.fromString("HDFS_BYTES_READ > 100"), - Trigger.Action.KILL_QUERY); - Trigger execTimeTrigger = new ExecutionTrigger("slow_query", - ExpressionFactory.fromString("EXECUTION_TIME > 100000"), - Trigger.Action.KILL_QUERY); - runQueryWithTrigger(test5msSleepQuery, null, Lists.newArrayList(shuffleTrigger, execTimeTrigger), - "Query was cancelled"); - } - - private void createSleepUDF() throws SQLException { - String udfName = TestJdbcWithMiniHS2.SleepMsUDF.class.getName(); - Connection con = hs2Conn; - Statement stmt = con.createStatement(); - stmt.execute("create temporary function sleep as '" + udfName + "'"); - stmt.close(); - } - - private void runQueryWithTrigger(final String query, final List setCmds, final List triggers, - final String expect) throws Exception { + @Override + protected void setupTriggers(final List triggers) throws Exception { WorkloadManager wm = WorkloadManager.getInstance(); WorkloadManager.PoolState poolState = spy(new WorkloadManager.PoolState()); when(poolState.getTriggers()).thenReturn(triggers); wm.getPools().put("llap", poolState); - - Connection con = hs2Conn; - TestJdbcWithMiniLlap.createTestTable(con, null, triggerTestTable, kvDataFilePath.toString()); - createSleepUDF(); - - final Statement selStmt = con.createStatement(); - final Throwable[] throwable = new Throwable[1]; - Thread queryThread = new Thread(() -> { - try { - if (setCmds != null) { - for (String setCmd : setCmds) { - selStmt.execute(setCmd); - } - } - selStmt.executeQuery(query); - } catch (SQLException e) { - throwable[0] = e; - } - }); - queryThread.start(); - - queryThread.join(); - 
selStmt.close(); - - assertNotNull("Expected non-null throwable", throwable[0]); - assertEquals(SQLException.class, throwable[0].getClass()); - assertTrue(expect + " is not contained in " + throwable[0].getMessage(), - throwable[0].getMessage().contains(expect)); } } \ No newline at end of file diff --git a/ql/src/java/org/apache/hadoop/hive/ql/wm/CustomCounterLimit.java b/ql/src/java/org/apache/hadoop/hive/ql/wm/CustomCounterLimit.java new file mode 100644 index 0000000..ad1f7fc --- /dev/null +++ b/ql/src/java/org/apache/hadoop/hive/ql/wm/CustomCounterLimit.java @@ -0,0 +1,71 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.hadoop.hive.ql.wm; + +/** + * Custom counters with limits (this will only work if the execution engine exposes this counter) + */ +public class CustomCounterLimit implements CounterLimit { + + private String counterName; + private long limit; + + public CustomCounterLimit(final String counterName, final long limit) { + this.counterName = counterName; + this.limit = limit; + } + + @Override + public String getName() { + return counterName; + } + + @Override + public long getLimit() { + return limit; + } + + @Override + public CounterLimit clone() { + return new CustomCounterLimit(counterName, limit); + } + + @Override + public String toString() { + return "counter: " + counterName + " limit: " + limit; + } + + @Override + public int hashCode() { + int hash = 31 * counterName.toLowerCase().hashCode(); + hash += 31 * limit; + return 31 * hash; + } + + @Override + public boolean equals(final Object other) { + if (other == null || !(other instanceof CustomCounterLimit)) { + return false; + } + + if (other == this) { + return true; + } + + CustomCounterLimit otherVcl = (CustomCounterLimit) other; + return counterName.equalsIgnoreCase(otherVcl.counterName) && limit == otherVcl.limit; + } +} diff --git a/ql/src/java/org/apache/hadoop/hive/ql/wm/ExpressionFactory.java b/ql/src/java/org/apache/hadoop/hive/ql/wm/ExpressionFactory.java index f16125d..29f7c89 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/wm/ExpressionFactory.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/wm/ExpressionFactory.java @@ -95,8 +95,18 @@ public static Expression fromString(final String expression) { return createExpression(vertexCounterLimit); } } - // unable to create expression at this point, invalid expression - throw new IllegalArgumentException("Invalid expression!
" + expression); + + // if nothing matches, try creating a custom counter + try { + counterValue = getCounterValue(counterValueStr, null); + if (counterValue < 0) { + throw new IllegalArgumentException("Illegal value for counter limit. Expected a positive long value."); + } + } catch (NumberFormatException e) { + throw new IllegalArgumentException("Invalid counter value: " + counterValueStr); + } + CustomCounterLimit customCounterLimit = new CustomCounterLimit(counterName, counterValue); + return createExpression(customCounterLimit); } private static long getCounterValue(final String counterValueStr, final Validator validator) throws diff --git a/ql/src/test/org/apache/hadoop/hive/ql/wm/TestTrigger.java b/ql/src/test/org/apache/hadoop/hive/ql/wm/TestTrigger.java index ce1dc6e..cd78545 100644 --- a/ql/src/test/org/apache/hadoop/hive/ql/wm/TestTrigger.java +++ b/ql/src/test/org/apache/hadoop/hive/ql/wm/TestTrigger.java @@ -80,6 +80,12 @@ public void testSimpleQueryTrigger() { assertEquals("counter: TOTAL_TASKS limit: 10000", expression.getCounterLimit().toString()); assertFalse(trigger.apply(1000)); assertTrue(trigger.apply(100000)); + + expression = ExpressionFactory.createExpression(new CustomCounterLimit("HDFS_WRITE_OPS",10000)); + trigger = new ExecutionTrigger("write_heavy", expression, Trigger.Action.KILL_QUERY); + assertEquals("counter: HDFS_WRITE_OPS limit: 10000", expression.getCounterLimit().toString()); + assertFalse(trigger.apply(1000)); + assertTrue(trigger.apply(100000)); } @Test @@ -166,6 +172,12 @@ public void testExpressionFromString() { assertEquals("counter: TOTAL_TASKS limit: 10000", expression.getCounterLimit().toString()); assertEquals(expected, expression); assertEquals(expected.hashCode(), expression.hashCode()); + + expression = ExpressionFactory.fromString(" HDFS_WRITE_OPS > 10000"); + expected = ExpressionFactory.createExpression(new CustomCounterLimit("HDFS_WRITE_OPS",10000)); + assertEquals("counter: HDFS_WRITE_OPS limit: 10000", 
expression.getCounterLimit().toString()); + assertEquals(expected, expression); + assertEquals(expected.hashCode(), expression.hashCode()); } @Test