diff --git a/itests/hive-unit/src/test/java/org/apache/hive/jdbc/TestTriggersTezSessionPoolManager.java b/itests/hive-unit/src/test/java/org/apache/hive/jdbc/TestTriggersTezSessionPoolManager.java
index 0a68f09..38f2c79 100644
--- a/itests/hive-unit/src/test/java/org/apache/hive/jdbc/TestTriggersTezSessionPoolManager.java
+++ b/itests/hive-unit/src/test/java/org/apache/hive/jdbc/TestTriggersTezSessionPoolManager.java
@@ -18,6 +18,7 @@
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
 import static org.junit.Assert.assertTrue;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.when;
@@ -28,6 +29,7 @@
 import java.sql.SQLException;
 import java.sql.Statement;
 import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
@@ -38,6 +40,7 @@
 import org.apache.hadoop.hive.llap.LlapBaseInputFormat;
 import org.apache.hadoop.hive.ql.exec.tez.TezSessionPoolManager;
 import org.apache.hadoop.hive.ql.wm.ExecutionTrigger;
+import org.apache.hadoop.hive.ql.wm.Expression;
 import org.apache.hadoop.hive.ql.wm.ExpressionFactory;
 import org.apache.hadoop.hive.ql.wm.MetastoreGlobalTriggersFetcher;
 import org.apache.hadoop.hive.ql.wm.Trigger;
@@ -52,27 +55,21 @@
 import com.google.common.collect.Lists;
 
 public class TestTriggersTezSessionPoolManager {
-  private static MiniHS2 miniHS2 = null;
-  private static String dataFileDir;
-  private static Path kvDataFilePath;
-  private static String triggerTestTable = "testtab1";
-  private static String test5msSleepQuery = "select sleep(t1.under_col, 5), t1.value from " + triggerTestTable +
-    " t1 join " + triggerTestTable + " t2 on t1.under_col>=t2.under_col";
-  private static String test500msSleepQuery = "select sleep(t1.under_col, 500), t1.value from " + triggerTestTable +
-    " t1 join " + triggerTestTable + " t2 on t1.under_col>=t2.under_col";
-
-  private static HiveConf conf = null;
-  private Connection hs2Conn = null;
+  protected static MiniHS2 miniHS2 = null;
+  protected static String dataFileDir;
+  static Path kvDataFilePath;
+  private static String tableName = "testtab1";
+
+  protected static HiveConf conf = null;
+  protected Connection hs2Conn = null;
 
   @BeforeClass
   public static void beforeTest() throws Exception {
     Class.forName(MiniHS2.getJdbcDriverName());
 
     String confDir = "../../data/conf/llap/";
-    if (confDir != null && !confDir.isEmpty()) {
-      HiveConf.setHiveSiteLocation(new URL("file://" + new File(confDir).toURI().getPath() + "/hive-site.xml"));
-      System.out.println("Setting hive-site: " + HiveConf.getHiveSiteLocation());
-    }
+    HiveConf.setHiveSiteLocation(new URL("file://" + new File(confDir).toURI().getPath() + "/hive-site.xml"));
+    System.out.println("Setting hive-site: " + HiveConf.getHiveSiteLocation());
 
     conf = new HiveConf();
     conf.setBoolVar(ConfVars.HIVE_SUPPORT_CONCURRENCY, false);
@@ -91,7 +88,7 @@ public static void beforeTest() throws Exception {
     dataFileDir = conf.get("test.data.files").replace('\\', '/').replace("c:", "");
     kvDataFilePath = new Path(dataFileDir, "kv1.txt");
 
-    Map<String, String> confOverlay = new HashMap<String, String>();
+    Map<String, String> confOverlay = new HashMap<>();
     miniHS2.start(confOverlay);
     miniHS2.getDFS().getFileSystem().mkdirs(new Path("/apps_staging_dir/anonymous"));
   }
@@ -116,72 +113,166 @@ public static void afterTest() throws Exception {
 
   @Test(timeout = 60000)
   public void testTriggerSlowQueryElapsedTime() throws Exception {
-    Trigger trigger = new ExecutionTrigger("slow_query", ExpressionFactory.fromString("ELAPSED_TIME > 20000"),
-      Trigger.Action.KILL_QUERY);
-    runQueryWithTrigger(test500msSleepQuery, null, Lists.newArrayList(trigger), "Query was cancelled");
+    Expression expression = ExpressionFactory.fromString("ELAPSED_TIME > 20000");
+    Trigger trigger = new ExecutionTrigger("slow_query", expression, Trigger.Action.KILL_QUERY);
+    setupTriggers(Lists.newArrayList(trigger));
+    String query = "select sleep(t1.under_col, 500), t1.value from " + tableName + " t1 join " + tableName +
+      " t2 on t1.under_col>=t2.under_col";
+    runQueryWithTrigger(query, null, "Query was cancelled");
   }
 
   @Test(timeout = 60000)
   public void testTriggerSlowQueryExecutionTime() throws Exception {
-    Trigger trigger = new ExecutionTrigger("slow_query", ExpressionFactory.fromString("EXECUTION_TIME > 1000"),
-      Trigger.Action.KILL_QUERY);
-    runQueryWithTrigger(test5msSleepQuery, null, Lists.newArrayList(trigger), "Query was cancelled");
+    Expression expression = ExpressionFactory.fromString("EXECUTION_TIME > 1000");
+    Trigger trigger = new ExecutionTrigger("slow_query", expression, Trigger.Action.KILL_QUERY);
+    setupTriggers(Lists.newArrayList(trigger));
+    String query = "select sleep(t1.under_col, 5), t1.value from " + tableName + " t1 join " + tableName +
+      " t2 on t1.under_col>=t2.under_col";
+    runQueryWithTrigger(query, null, "Query was cancelled");
   }
 
   @Test(timeout = 60000)
   public void testTriggerHighShuffleBytes() throws Exception {
-    Trigger trigger = new ExecutionTrigger("big_shuffle", ExpressionFactory.fromString("SHUFFLE_BYTES > 100"),
-      Trigger.Action.KILL_QUERY);
-    runQueryWithTrigger(test5msSleepQuery, null, Lists.newArrayList(trigger), "Query was cancelled");
+    Expression expression = ExpressionFactory.fromString("SHUFFLE_BYTES > 100");
+    Trigger trigger = new ExecutionTrigger("big_shuffle", expression, Trigger.Action.KILL_QUERY);
+    setupTriggers(Lists.newArrayList(trigger));
+    String query = "select sleep(t1.under_col, 5), t1.value from " + tableName + " t1 join " + tableName +
+      " t2 on t1.under_col>=t2.under_col";
+    runQueryWithTrigger(query, null, "Query was cancelled");
   }
 
   @Test(timeout = 60000)
   public void testTriggerHighBytesRead() throws Exception {
-    Trigger trigger = new ExecutionTrigger("big_read", ExpressionFactory.fromString("HDFS_BYTES_READ > 100"),
-      Trigger.Action.KILL_QUERY);
-    runQueryWithTrigger(test5msSleepQuery, null, Lists.newArrayList(trigger), "Query was cancelled");
+    Expression expression = ExpressionFactory.fromString("HDFS_BYTES_READ > 100");
+    Trigger trigger = new ExecutionTrigger("big_read", expression, Trigger.Action.KILL_QUERY);
+    setupTriggers(Lists.newArrayList(trigger));
+    String query = "select sleep(t1.under_col, 5), t1.value from " + tableName + " t1 join " + tableName +
+      " t2 on t1.under_col>=t2.under_col";
+    runQueryWithTrigger(query, null, "Query was cancelled");
   }
 
   @Test(timeout = 60000)
   public void testTriggerHighBytesWrite() throws Exception {
-    Trigger trigger = new ExecutionTrigger("big_write", ExpressionFactory.fromString("FILE_BYTES_WRITTEN > 100"),
-      Trigger.Action.KILL_QUERY);
-    runQueryWithTrigger(test5msSleepQuery, null, Lists.newArrayList(trigger), "Query was cancelled");
+    Expression expression = ExpressionFactory.fromString("FILE_BYTES_WRITTEN > 100");
+    Trigger trigger = new ExecutionTrigger("big_write", expression, Trigger.Action.KILL_QUERY);
+    setupTriggers(Lists.newArrayList(trigger));
+    String query = "select sleep(t1.under_col, 5), t1.value from " + tableName + " t1 join " + tableName +
+      " t2 on t1.under_col>=t2.under_col";
+    runQueryWithTrigger(query, null, "Query was cancelled");
   }
 
   @Test(timeout = 60000)
   public void testTriggerTotalTasks() throws Exception {
-    Trigger trigger = new ExecutionTrigger("highly_parallel", ExpressionFactory.fromString("TOTAL_TASKS > 50"),
-      Trigger.Action.KILL_QUERY);
-    List<String> setCmds = new ArrayList<>();
-    setCmds.add("set mapred.min.split.size=100");
-    setCmds.add("set mapred.max.split.size=100");
-    setCmds.add("set tez.grouping.min-size=100");
-    setCmds.add("set tez.grouping.max-size=100");
-    runQueryWithTrigger(test5msSleepQuery, setCmds, Lists.newArrayList(trigger), "Query was cancelled");
+    Expression expression = ExpressionFactory.fromString("TOTAL_TASKS > 50");
+    Trigger trigger = new ExecutionTrigger("highly_parallel", expression, Trigger.Action.KILL_QUERY);
+    setupTriggers(Lists.newArrayList(trigger));
+    String query = "select sleep(t1.under_col, 5), t1.value from " + tableName + " t1 join " + tableName +
+      " t2 on t1.under_col>=t2.under_col";
+    runQueryWithTrigger(query, getConfigs(), "Query was cancelled");
+  }
+
+  @Test(timeout = 60000)
+  public void testTriggerCustomReadOps() throws Exception {
+    Expression expression = ExpressionFactory.fromString("HDFS_READ_OPS > 50");
+    Trigger trigger = new ExecutionTrigger("high_read_ops", expression, Trigger.Action.KILL_QUERY);
+    setupTriggers(Lists.newArrayList(trigger));
+    String query = "select sleep(t1.under_col, 5), t1.value from " + tableName + " t1 join " + tableName +
+      " t2 on t1.under_col>=t2.under_col";
+    runQueryWithTrigger(query, getConfigs(), "Query was cancelled");
+  }
+
+  @Test(timeout = 60000)
+  public void testTriggerCustomCreatedFiles() throws Exception {
+    List<String> configs = getConfigs();
+
+    Expression expression = ExpressionFactory.fromString("CREATED_FILES > 5");
+    Trigger trigger = new ExecutionTrigger("high_read_ops", expression, Trigger.Action.KILL_QUERY);
+    setupTriggers(Lists.newArrayList(trigger));
+    String query = "create table testtab2 as select * from " + tableName;
+    runQueryWithTrigger(query, configs, "Query was cancelled");
+
+    runQueryWithTrigger("create table src3 (key int) partitioned by (value string)", null, null);
+
+    // partitioned insert
+    expression = ExpressionFactory.fromString("CREATED_FILES > 10");
+    trigger = new ExecutionTrigger("high_read_ops", expression, Trigger.Action.KILL_QUERY);
+    setupTriggers(Lists.newArrayList(trigger));
+    query = "insert overwrite table src3 partition (value) select sleep(under_col, 10), value from " + tableName +
+      " where under_col < 100";
+    runQueryWithTrigger(query, configs, "Query was cancelled");
+  }
+
+  @Test(timeout = 60000)
+  public void testTriggerCustomCreatedDynamicPartitions() throws Exception {
+    runQueryWithTrigger("create table src2 (key int) partitioned by (value string)", null, null);
+    List<String> configs = getConfigs();
+
+    // query will get cancelled before creating 57 partitions
+    String query =
+      "insert overwrite table src2 partition (value) select * from " + tableName + " where under_col < 100";
+    Expression expression = ExpressionFactory.fromString("CREATED_DYNAMIC_PARTITIONS > 20");
+    Trigger trigger = new ExecutionTrigger("high_read_ops", expression, Trigger.Action.KILL_QUERY);
+    setupTriggers(Lists.newArrayList(trigger));
+    runQueryWithTrigger(query, configs, "Query was cancelled");
+
+    // let it create 57 partitions without any triggers
+    query = "insert overwrite table src2 partition (value) select under_col, value from " + tableName +
+      " where under_col < 100";
+    setupTriggers(Lists.newArrayList());
+    runQueryWithTrigger(query, configs, null);
+
+    // query will try to add 64 more partitions to already existing 57 partitions but will get cancelled for violation
+    query = "insert into table src2 partition (value) select * from " + tableName + " where under_col < 200";
+    expression = ExpressionFactory.fromString("CREATED_DYNAMIC_PARTITIONS > 30");
+    trigger = new ExecutionTrigger("high_read_ops", expression, Trigger.Action.KILL_QUERY);
+    setupTriggers(Lists.newArrayList(trigger));
+    runQueryWithTrigger(query, configs, "Query was cancelled");
+
+    // let it create 64 more partitions (total 57 + 64 = 121) without any triggers
+    query = "insert into table src2 partition (value) select * from " + tableName + " where under_col < 200";
+    setupTriggers(Lists.newArrayList());
+    runQueryWithTrigger(query, configs, null);
+
+    // re-run insert into but this time no new partitions will be created, so there will be no violation
+    query = "insert into table src2 partition (value) select * from " + tableName + " where under_col < 200";
+    expression = ExpressionFactory.fromString("CREATED_DYNAMIC_PARTITIONS > 10");
+    trigger = new ExecutionTrigger("high_read_ops", expression, Trigger.Action.KILL_QUERY);
+    setupTriggers(Lists.newArrayList(trigger));
+    runQueryWithTrigger(query, configs, null);
+  }
+
+  @Test(timeout = 60000)
+  public void testTriggerCustomNonExistent() throws Exception {
+    Expression expression = ExpressionFactory.fromString("OPEN_FILES > 50");
+    Trigger trigger = new ExecutionTrigger("non_existent", expression, Trigger.Action.KILL_QUERY);
+    setupTriggers(Lists.newArrayList(trigger));
+    String query =
+      "select l.under_col, l.value from " + tableName + " l join " + tableName + " r on l.under_col>=r.under_col";
+    runQueryWithTrigger(query, null, null);
   }
 
   @Test(timeout = 60000)
   public void testMultipleTriggers1() throws Exception {
-    Trigger shuffleTrigger = new ExecutionTrigger("big_shuffle", ExpressionFactory.fromString("HDFS_BYTES_READ > " +
-      "1000000"),
-      Trigger.Action.KILL_QUERY);
-    Trigger execTimeTrigger = new ExecutionTrigger("slow_query", ExpressionFactory.fromString("EXECUTION_TIME > 1000"),
-      Trigger.Action.KILL_QUERY);
-    runQueryWithTrigger(test5msSleepQuery, null, Lists.newArrayList(shuffleTrigger, execTimeTrigger), "Query was " +
-      "cancelled");
+    Expression shuffleExpression = ExpressionFactory.fromString("HDFS_BYTES_READ > 1000000");
+    Trigger shuffleTrigger = new ExecutionTrigger("big_shuffle", shuffleExpression, Trigger.Action.KILL_QUERY);
+    Expression execTimeExpression = ExpressionFactory.fromString("EXECUTION_TIME > 1000");
+    Trigger execTimeTrigger = new ExecutionTrigger("slow_query", execTimeExpression, Trigger.Action.KILL_QUERY);
+    setupTriggers(Lists.newArrayList(shuffleTrigger, execTimeTrigger));
+    String query = "select sleep(t1.under_col, 5), t1.value from " + tableName + " t1 join " + tableName +
+      " t2 on t1.under_col>=t2.under_col";
+    runQueryWithTrigger(query, null, "Query was cancelled");
   }
 
   @Test(timeout = 60000)
   public void testMultipleTriggers2() throws Exception {
-    Trigger shuffleTrigger = new ExecutionTrigger("big_shuffle", ExpressionFactory.fromString("HDFS_BYTES_READ > " +
-      "100"),
-      Trigger.Action.KILL_QUERY);
-    Trigger execTimeTrigger = new ExecutionTrigger("slow_query",
-      ExpressionFactory.fromString("EXECUTION_TIME > 100000"),
-      Trigger.Action.KILL_QUERY);
-    runQueryWithTrigger(test5msSleepQuery, null, Lists.newArrayList(shuffleTrigger, execTimeTrigger), "Query was " +
-      "cancelled");
+    Expression shuffleExpression = ExpressionFactory.fromString("HDFS_BYTES_READ > 100");
+    Trigger shuffleTrigger = new ExecutionTrigger("big_shuffle", shuffleExpression, Trigger.Action.KILL_QUERY);
+    Expression execTimeExpression = ExpressionFactory.fromString("EXECUTION_TIME > 100000");
+    Trigger execTimeTrigger = new ExecutionTrigger("slow_query", execTimeExpression, Trigger.Action.KILL_QUERY);
+    setupTriggers(Lists.newArrayList(shuffleTrigger, execTimeTrigger));
+    String query = "select sleep(t1.under_col, 5), t1.value from " + tableName + " t1 join " + tableName +
+      " t2 on t1.under_col>=t2.under_col";
+    runQueryWithTrigger(query, null, "Query was cancelled");
   }
 
   private void createSleepUDF() throws SQLException {
@@ -192,15 +283,12 @@ private void createSleepUDF() throws SQLException {
     stmt.close();
   }
 
-  private void runQueryWithTrigger(final String query, final List<String> setCmds, final List<Trigger> triggers,
+  private void runQueryWithTrigger(final String query, final List<String> setCmds,
     final String expect) throws Exception {
-    MetastoreGlobalTriggersFetcher triggersFetcher = mock(MetastoreGlobalTriggersFetcher.class);
-    when(triggersFetcher.fetch()).thenReturn(triggers);
-    TezSessionPoolManager.getInstance().setGlobalTriggersFetcher(triggersFetcher);
 
     Connection con = hs2Conn;
-    TestJdbcWithMiniLlap.createTestTable(con, null, triggerTestTable, kvDataFilePath.toString());
+    TestJdbcWithMiniLlap.createTestTable(con, null, tableName, kvDataFilePath.toString());
     createSleepUDF();
 
     final Statement selStmt = con.createStatement();
@@ -212,7 +300,7 @@ private void runQueryWithTrigger(final String query, final List setCmds,
           selStmt.execute(setCmd);
         }
       }
-      selStmt.executeQuery(query);
+      selStmt.execute(query);
     } catch (SQLException e) {
       throwable[0] = e;
     }
@@ -222,9 +310,32 @@ private void runQueryWithTrigger(final String query, final List setCmds,
     queryThread.join();
     selStmt.close();
 
-    assertNotNull("Expected non-null throwable", throwable[0]);
-    assertEquals(SQLException.class, throwable[0].getClass());
-    assertTrue(expect + " is not contained in " + throwable[0].getMessage(),
-      throwable[0].getMessage().contains(expect));
+    if (expect == null) {
+      assertNull("Expected query to succeed", throwable[0]);
+    } else {
+      assertNotNull("Expected non-null throwable", throwable[0]);
+      assertEquals(SQLException.class, throwable[0].getClass());
+      assertTrue(expect + " is not contained in " + throwable[0].getMessage(),
+        throwable[0].getMessage().contains(expect));
+    }
+  }
+
+  protected void setupTriggers(final List<Trigger> triggers) throws Exception {
+    MetastoreGlobalTriggersFetcher triggersFetcher = mock(MetastoreGlobalTriggersFetcher.class);
+    when(triggersFetcher.fetch()).thenReturn(triggers);
+    TezSessionPoolManager.getInstance().setGlobalTriggersFetcher(triggersFetcher);
+  }
+
+  private List<String> getConfigs(String... more) {
+    List<String> setCmds = new ArrayList<>();
+    setCmds.add("set hive.exec.dynamic.partition.mode=nonstrict");
+    setCmds.add("set mapred.min.split.size=100");
+    setCmds.add("set mapred.max.split.size=100");
+    setCmds.add("set tez.grouping.min-size=100");
+    setCmds.add("set tez.grouping.max-size=100");
+    if (more != null) {
+      setCmds.addAll(Arrays.asList(more));
+    }
+    return setCmds;
   }
 }
\ No newline at end of file
diff --git a/itests/hive-unit/src/test/java/org/apache/hive/jdbc/TestTriggersWorkloadManager.java b/itests/hive-unit/src/test/java/org/apache/hive/jdbc/TestTriggersWorkloadManager.java
index 86f5972..8485ba6 100644
--- a/itests/hive-unit/src/test/java/org/apache/hive/jdbc/TestTriggersWorkloadManager.java
+++ b/itests/hive-unit/src/test/java/org/apache/hive/jdbc/TestTriggersWorkloadManager.java
@@ -16,18 +16,11 @@
 
 package org.apache.hive.jdbc;
 
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertNotNull;
-import static org.junit.Assert.assertTrue;
 import static org.mockito.Mockito.spy;
 import static org.mockito.Mockito.when;
 
 import java.io.File;
 import java.net.URL;
-import java.sql.Connection;
-import java.sql.SQLException;
-import java.sql.Statement;
-import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
@@ -36,35 +29,13 @@
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
-import org.apache.hadoop.hive.llap.LlapBaseInputFormat;
-import org.apache.hadoop.hive.ql.exec.tez.TestWorkloadManager;
 import org.apache.hadoop.hive.ql.exec.tez.WorkloadManager;
-import org.apache.hadoop.hive.ql.wm.ExecutionTrigger;
-import org.apache.hadoop.hive.ql.wm.ExpressionFactory;
 import org.apache.hadoop.hive.ql.wm.Trigger;
 import org.apache.hive.jdbc.miniHS2.MiniHS2;
 import org.apache.hive.jdbc.miniHS2.MiniHS2.MiniClusterType;
-import org.junit.After;
-import org.junit.AfterClass;
-import org.junit.Before;
 import org.junit.BeforeClass;
-import org.junit.Test;
 
-import com.google.common.collect.Lists;
-
-public class TestTriggersWorkloadManager {
-  private static MiniHS2 miniHS2 = null;
-  private static String dataFileDir;
-  private static Path kvDataFilePath;
-  private static String triggerTestTable = "testtab1";
-  private static String test5msSleepQuery = "select sleep(t1.under_col, 5), t1.value from " + triggerTestTable +
-    " t1 join " + triggerTestTable + " t2 on t1.under_col>=t2.under_col";
-  private static String test500msSleepQuery = "select sleep(t1.under_col, 500), t1.value from " + triggerTestTable +
-    " t1 join " + triggerTestTable + " t2 on t1.under_col>=t2.under_col";
-
-  private static HiveConf conf = null;
-  private static TestWorkloadManager.WorkloadManagerForTest wm;
-  private Connection hs2Conn = null;
+public class TestTriggersWorkloadManager extends TestTriggersTezSessionPoolManager {
 
   @BeforeClass
   public static void beforeTest() throws Exception {
@@ -99,134 +70,11 @@ public static void beforeTest() throws Exception {
     miniHS2.getDFS().getFileSystem().mkdirs(new Path("/apps_staging_dir/anonymous"));
   }
 
-  @Before
-  public void setUp() throws Exception {
-    hs2Conn = TestJdbcWithMiniLlap.getConnection(miniHS2.getJdbcURL(), System.getProperty("user.name"), "bar");
-  }
-
-  @After
-  public void tearDown() throws Exception {
-    LlapBaseInputFormat.closeAll();
-    hs2Conn.close();
-  }
-
-  @AfterClass
-  public static void afterTest() throws Exception {
-    if (miniHS2.isStarted()) {
-      miniHS2.stop();
-    }
-  }
-
-  @Test(timeout = 60000)
-  public void testTriggerSlowQueryElapsedTime() throws Exception {
-    Trigger trigger = new ExecutionTrigger("slow_query", ExpressionFactory.fromString("ELAPSED_TIME > 20000"),
-      Trigger.Action.KILL_QUERY);
-    runQueryWithTrigger(test500msSleepQuery, null, Lists.newArrayList(trigger), "Query was cancelled");
-  }
-
-  @Test(timeout = 60000)
-  public void testTriggerSlowQueryExecutionTime() throws Exception {
-    Trigger trigger = new ExecutionTrigger("slow_query", ExpressionFactory.fromString("EXECUTION_TIME > 1000"),
-      Trigger.Action.KILL_QUERY);
-    runQueryWithTrigger(test5msSleepQuery, null, Lists.newArrayList(trigger), "Query was cancelled");
-  }
-
-  @Test(timeout = 60000)
-  public void testTriggerHighShuffleBytes() throws Exception {
-    Trigger trigger = new ExecutionTrigger("big_shuffle", ExpressionFactory.fromString("SHUFFLE_BYTES > 100"),
-      Trigger.Action.KILL_QUERY);
-    runQueryWithTrigger(test5msSleepQuery, null, Lists.newArrayList(trigger), "Query was cancelled");
-  }
-
-  @Test(timeout = 60000)
-  public void testTriggerHighBytesRead() throws Exception {
-    Trigger trigger = new ExecutionTrigger("big_read", ExpressionFactory.fromString("HDFS_BYTES_READ > 100"),
-      Trigger.Action.KILL_QUERY);
-    runQueryWithTrigger(test5msSleepQuery, null, Lists.newArrayList(trigger), "Query was cancelled");
-  }
-
-  @Test(timeout = 60000)
-  public void testTriggerHighBytesWrite() throws Exception {
-    Trigger trigger = new ExecutionTrigger("big_write", ExpressionFactory.fromString("FILE_BYTES_WRITTEN > 100"),
-      Trigger.Action.KILL_QUERY);
-    runQueryWithTrigger(test5msSleepQuery, null, Lists.newArrayList(trigger), "Query was cancelled");
-  }
-
-  @Test(timeout = 60000)
-  public void testTriggerTotalTasks() throws Exception {
-    Trigger trigger = new ExecutionTrigger("highly_parallel", ExpressionFactory.fromString("TOTAL_TASKS > 50"),
-      Trigger.Action.KILL_QUERY);
-    List<String> setCmds = new ArrayList<>();
-    setCmds.add("set mapred.min.split.size=100");
-    setCmds.add("set mapred.max.split.size=100");
-    setCmds.add("set tez.grouping.min-size=100");
-    setCmds.add("set tez.grouping.max-size=100");
-    runQueryWithTrigger(test5msSleepQuery, setCmds, Lists.newArrayList(trigger), "Query was cancelled");
-  }
-
-  @Test(timeout = 60000)
-  public void testMultipleTriggers1() throws Exception {
-    Trigger shuffleTrigger = new ExecutionTrigger("big_shuffle",
-      ExpressionFactory.fromString("HDFS_BYTES_READ > 100000"),
-      Trigger.Action.KILL_QUERY);
-    Trigger execTimeTrigger = new ExecutionTrigger("slow_query", ExpressionFactory.fromString("EXECUTION_TIME > 1000"),
-      Trigger.Action.KILL_QUERY);
-    runQueryWithTrigger(test5msSleepQuery, null, Lists.newArrayList(shuffleTrigger, execTimeTrigger),
-      "Query was cancelled");
-  }
-
-  @Test(timeout = 60000)
-  public void testMultipleTriggers2() throws Exception {
-    Trigger shuffleTrigger = new ExecutionTrigger("big_shuffle", ExpressionFactory.fromString("HDFS_BYTES_READ > 100"),
-      Trigger.Action.KILL_QUERY);
-    Trigger execTimeTrigger = new ExecutionTrigger("slow_query",
-      ExpressionFactory.fromString("EXECUTION_TIME > 100000"),
-      Trigger.Action.KILL_QUERY);
-    runQueryWithTrigger(test5msSleepQuery, null, Lists.newArrayList(shuffleTrigger, execTimeTrigger),
-      "Query was cancelled");
-  }
-
-  private void createSleepUDF() throws SQLException {
-    String udfName = TestJdbcWithMiniHS2.SleepMsUDF.class.getName();
-    Connection con = hs2Conn;
-    Statement stmt = con.createStatement();
-    stmt.execute("create temporary function sleep as '" + udfName + "'");
-    stmt.close();
-  }
-
-  private void runQueryWithTrigger(final String query, final List<String> setCmds, final List<Trigger> triggers,
-    final String expect) throws Exception {
+  @Override
+  protected void setupTriggers(final List<Trigger> triggers) throws Exception {
     WorkloadManager wm = WorkloadManager.getInstance();
     WorkloadManager.PoolState poolState = spy(new WorkloadManager.PoolState());
     when(poolState.getTriggers()).thenReturn(triggers);
     wm.getPools().put("llap", poolState);
-
-    Connection con = hs2Conn;
-    TestJdbcWithMiniLlap.createTestTable(con, null, triggerTestTable, kvDataFilePath.toString());
-    createSleepUDF();
-
-    final Statement selStmt = con.createStatement();
-    final Throwable[] throwable = new Throwable[1];
-    Thread queryThread = new Thread(() -> {
-      try {
-        if (setCmds != null) {
-          for (String setCmd : setCmds) {
-            selStmt.execute(setCmd);
-          }
-        }
-        selStmt.executeQuery(query);
-      } catch (SQLException e) {
-        throwable[0] = e;
-      }
-    });
-    queryThread.start();
-
-    queryThread.join();
-    selStmt.close();
-
-    assertNotNull("Expected non-null throwable", throwable[0]);
-    assertEquals(SQLException.class, throwable[0].getClass());
-    assertTrue(expect + " is not contained in " + throwable[0].getMessage(),
-      throwable[0].getMessage().contains(expect));
   }
 }
\ No newline at end of file
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/FileSinkOperator.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/FileSinkOperator.java
index bc265eb..804f854 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/FileSinkOperator.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/FileSinkOperator.java
@@ -125,6 +125,8 @@
   protected transient long cntr = 1;
   protected transient long logEveryNRows = 0;
   protected transient int rowIndex = 0;
+  private transient Path destTablePath;
+  private transient boolean isInsertOverwrite;
   /**
    * Counters.
    */
@@ -150,6 +152,7 @@
     RecordWriter[] outWriters;
     RecordUpdater[] updaters;
     Stat stat;
+    String dpDir;
 
     public FSPaths(Path specPath) {
       tmpPath = Utilities.toTempPath(specPath);
@@ -378,7 +381,8 @@ protected void initializeOp(Configuration hconf) throws HiveException {
       serializer = (Serializer) conf.getTableInfo().getDeserializerClass().newInstance();
      serializer.initialize(unsetNestedColumnPaths(hconf), conf.getTableInfo().getProperties());
       outputClass = serializer.getSerializedClass();
-
+      destTablePath = conf.getDestPath();
+      isInsertOverwrite = conf.getInsertOverwrite();
       if (LOG.isInfoEnabled()) {
         LOG.info("Using serializer : " + serializer + " and formatter : " + hiveOutputFormat +
             (isCompressed ? " with compression" : ""));
@@ -613,6 +617,32 @@ protected void createBucketForFileIdx(FSPaths fsp, int filesIdx)
       autoDelete = fs.deleteOnExit(fsp.outPaths[filesIdx]);
     }
 
+    // There are 2 cases where we increment CREATED_DYNAMIC_PARTITIONS counters
+    // 1) Insert overwrite
+    // 2) Insert into table which creates new partitions
+    if (bDynParts && fsp.dpDir != null) {
+      Path destPartPath = new Path(destTablePath, fsp.dpDir);
+      Path dpStagingDir = fsp.outPaths[filesIdx].getParent();
+      try {
+        // for insert overwrite, dp directory gets atomically created in staging dir (which is typically inside
+        // destination table directory). If the dp dir in staging dir is not created yet, create it and increment the
+        // counter. Record writer will create the destination file inside the dp dir.
+        // If the dp dir already exists then we have accounted for that partition.
+        if (isInsertOverwrite && !fs.exists(dpStagingDir)) {
+          createStagingDpDir(fsp, filesIdx);
+        }
+        // for insert into case, check if dp dir already exists in the destination table, if exists then we are not
+        // creating new partition but adding to existing one. If dp dir does not exist, create one in staging dir and
+        // increment the counter.
+        if (!fs.exists(destPartPath) && !fs.exists(dpStagingDir)) {
+          createStagingDpDir(fsp, filesIdx);
+        }
+      } catch (IOException e) {
+        LOG.warn("Skipping to increment CREATED_DYNAMIC_PARTITIONS counter. destPartPath: {} dpStagingDir: {} " +
+          "Exception: {}", destPartPath, dpStagingDir, e.getMessage());
+      }
+    }
+
     Utilities.copyTableJobPropertiesToConf(conf.getTableInfo(), jc);
     // only create bucket files only if no dynamic partitions,
     // buckets of dynamic partitions will be created for each newly created partition
@@ -631,9 +661,10 @@ protected void createBucketForFileIdx(FSPaths fsp, int filesIdx)
         fsp.updaters[filesIdx] = HiveFileFormatUtils.getAcidRecordUpdater(jc, conf.getTableInfo(), acidBucketNum,
             conf, fsp.outPaths[filesIdx], inspector, reporter, -1);
       }
+
       if (reporter != null) {
         reporter.incrCounter(HiveConf.getVar(hconf, HiveConf.ConfVars.HIVECOUNTERGROUP),
-            Operator.HIVECOUNTERCREATEDFILES, 1);
+            Operator.HIVE_COUNTER_CREATED_FILES, 1);
       }
 
     } catch (IOException e) {
@@ -641,6 +672,14 @@ protected void createBucketForFileIdx(FSPaths fsp, int filesIdx)
     }
   }
 
+  private void createStagingDpDir(final FSPaths fsp, final int filesIdx) throws IOException {
+    fs.mkdirs(fsp.outPaths[filesIdx].getParent());
+    if (reporter != null) {
+      reporter.incrCounter(HiveConf.getVar(hconf, HiveConf.ConfVars.HIVECOUNTERGROUP),
+        Operator.HIVE_COUNTER_CREATED_DYNAMIC_PARTITIONS, 1);
+    }
+  }
+
   /**
    * Report status to JT so that JT won't kill this task if closing takes too long
    * due to too many files to close and the NN is overloaded.
@@ -900,6 +939,9 @@ private FSPaths createNewPaths(String dirName) throws HiveException {
       fsp2.taskOutputTempPath = new Path(fsp2.taskOutputTempPath, dirName);
     }
 
+    if (bDynParts) {
+      fsp2.dpDir = dirName;
+    }
     if(!conf.getDpSortState().equals(DPSortState.PARTITION_BUCKET_SORTED)) {
       createBucketFiles(fsp2);
       valToPaths.put(dirName, fsp2);
@@ -973,6 +1015,8 @@ protected FSPaths getDynOutPaths(List row, String lbDirName) throws Hive
 
       if (fsp2 == null) {
         // check # of dp
+        // TODO: add an option to skip this if number of partitions checks is done by Triggers via
+        // CREATED_DYNAMIC_PARTITION counter
         if (valToPaths.size() > maxPartitions) {
           // we cannot proceed and need to tell the hive client that retries won't succeed either
           throw new HiveFatalException(
@@ -1018,6 +1062,7 @@ protected FSPaths getDynOutPaths(List row, String lbDirName) throws Hive
           createBucketForFileIdx(fsp2, 0);
           valToPaths.put(pathKey, fsp2);
         }
+
       }
       fp = fsp2;
     } else {
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/Operator.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/Operator.java
index 73ddf86..391666a 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/Operator.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/Operator.java
@@ -71,8 +71,9 @@
 
   private static final long serialVersionUID = 1L;
 
-  public static final String HIVECOUNTERCREATEDFILES = "CREATED_FILES";
-  public static final String HIVECOUNTERFATAL = "FATAL_ERROR";
+  public static final String HIVE_COUNTER_CREATED_FILES = "CREATED_FILES";
+  public static final String HIVE_COUNTER_CREATED_DYNAMIC_PARTITIONS = "CREATED_DYNAMIC_PARTITIONS";
+  public static final String HIVE_COUNTER_FATAL = "FATAL_ERROR";
   public static final String CONTEXT_NAME_KEY = "__hive.context.name";
 
   private transient Configuration configuration;
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/ExecDriver.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/ExecDriver.java
index 97df36e..88a75ed 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/ExecDriver.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/ExecDriver.java
@@ -63,7 +63,6 @@
 import org.apache.hadoop.hive.ql.exec.Task;
 import org.apache.hadoop.hive.ql.exec.Utilities;
 import org.apache.hadoop.hive.ql.exec.tez.TezSessionPoolManager;
-import org.apache.hadoop.hive.ql.exec.tez.TezSessionState;
 import org.apache.hadoop.hive.ql.io.BucketizedHiveInputFormat;
 import org.apache.hadoop.hive.ql.io.HiveFileFormatUtils;
 import org.apache.hadoop.hive.ql.io.HiveKey;
@@ -208,7 +207,7 @@ public ExecDriver(MapredWork plan, JobConf job, boolean isSilent) throws HiveExc
   public boolean checkFatalErrors(Counters ctrs, StringBuilder errMsg) {
     Counters.Counter cntr = ctrs.findCounter(
         HiveConf.getVar(job, HiveConf.ConfVars.HIVECOUNTERGROUP),
-        Operator.HIVECOUNTERFATAL);
+        Operator.HIVE_COUNTER_FATAL);
     return cntr != null && cntr.getValue() > 0;
   }
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/HadoopJobExecHelper.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/HadoopJobExecHelper.java
index 3c07197..2d2eafd 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/HadoopJobExecHelper.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/HadoopJobExecHelper.java
@@ -209,7 +209,7 @@ public boolean checkFatalErrors(Counters ctrs, StringBuilder errMsg) {
     }
     // check for number of created files
     Counters.Counter cntr = ctrs.findCounter(HiveConf.getVar(job, ConfVars.HIVECOUNTERGROUP),
-        Operator.HIVECOUNTERCREATEDFILES);
+        Operator.HIVE_COUNTER_CREATED_FILES);
     long numFiles = cntr != null ? cntr.getValue() : 0;
     long upperLimit = HiveConf.getLongVar(job, HiveConf.ConfVars.MAXCREATEDFILES);
     if (numFiles > upperLimit) {
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkTask.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkTask.java
index 2ee8c93..0f5f708 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkTask.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkTask.java
@@ -352,7 +352,7 @@ private void printConfigInfo() throws IOException {
     Map<String, List<String>> counters = new HashMap<String, List<String>>();
     List<String> hiveCounters = new LinkedList<String>();
     counters.put(groupName, hiveCounters);
-    hiveCounters.add(Operator.HIVECOUNTERCREATEDFILES);
+    hiveCounters.add(Operator.HIVE_COUNTER_CREATED_FILES);
     // MapOperator is out of SparkWork, SparkMapRecordHandler use it to bridge
     // Spark transformation and Hive operators in SparkWork.
     for (MapOperator.Counter counter : MapOperator.Counter.values()) {
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java
index a5b69ec..bcb68ea 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java
@@ -7228,6 +7228,25 @@ protected Operator genFileSinkPlan(String dest, QB qb, Operator input)
       genPartnCols(dest, input, qb, table_desc, dest_tab, rsCtx);
     }
 
+    boolean isInsertOverwrite = false;
+    switch (dest_type) {
+      case QBMetaData.DEST_PARTITION:
+        //fall through
+      case QBMetaData.DEST_TABLE:
+        //INSERT [OVERWRITE] path
+        String destTableFullName = dest_tab.getCompleteName().replace('@', '.');
+        Map<String, ASTNode> iowMap = qb.getParseInfo().getInsertOverwriteTables();
+        if (iowMap.containsKey(destTableFullName)) {
+          isInsertOverwrite = true;
+        }
+        break;
+      case QBMetaData.DEST_DFS_FILE:
+        //CTAS path
+        break;
+      default:
+        throw new IllegalStateException("Unexpected dest_type=" + dest_tab);
+    }
+
     FileSinkDesc fileSinkDesc = new FileSinkDesc(
       queryTmpdir,
       table_desc,
@@ -7239,7 +7258,8 @@ protected Operator genFileSinkPlan(String dest, QB qb, Operator input)
       rsCtx.getTotalFiles(),
       rsCtx.getPartnCols(),
       dpCtx,
-      dest_path);
+      dest_path,
+      isInsertOverwrite);
 
     boolean isHiveServerQuery = SessionState.get().isHiveServerQuery();
     fileSinkDesc.setHiveServerQuery(isHiveServerQuery);
@@ -7249,24 +7269,6 @@ protected Operator genFileSinkPlan(String dest, QB qb, Operator input)
       AcidUtils.Operation wt = updating(dest) ? AcidUtils.Operation.UPDATE :
           (deleting(dest) ? AcidUtils.Operation.DELETE : AcidUtils.Operation.INSERT);
       fileSinkDesc.setWriteType(wt);
-
-      switch (dest_type) {
-        case QBMetaData.DEST_PARTITION:
-          //fall through
-        case QBMetaData.DEST_TABLE:
-          //INSERT [OVERWRITE] path
-          String destTableFullName = dest_tab.getCompleteName().replace('@', '.');
-          Map<String, ASTNode> iowMap = qb.getParseInfo().getInsertOverwriteTables();
-          if (iowMap.containsKey(destTableFullName)) {
-            fileSinkDesc.setInsertOverwrite(true);
-          }
-          break;
-        case QBMetaData.DEST_DFS_FILE:
-          //CTAS path
-          break;
-        default:
-          throw new IllegalStateException("Unexpected dest_type=" + dest_tab);
-      }
       acidFileSinks.add(fileSinkDesc);
     }
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/plan/FileSinkDesc.java b/ql/src/java/org/apache/hadoop/hive/ql/plan/FileSinkDesc.java
index a3df166..c71d86d 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/plan/FileSinkDesc.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/plan/FileSinkDesc.java
@@ -115,7 +115,8 @@ public FileSinkDesc() {
   public FileSinkDesc(final Path dirName, final TableDesc tableInfo,
       final boolean compressed, final int destTableId, final boolean multiFileSpray,
       final boolean canBeMerged, final int numFiles, final int totalFiles,
-      final ArrayList<ExprNodeDesc> partitionCols, final DynamicPartitionCtx dpCtx, Path destPath) {
+      final ArrayList<ExprNodeDesc> partitionCols, final DynamicPartitionCtx dpCtx, Path destPath,
+      final boolean isInsertOverwrite) {
 
     this.dirName = dirName;
     this.tableInfo = tableInfo;
@@ -129,6 +130,7 @@ public FileSinkDesc(final Path dirName, final TableDesc tableInfo,
     this.dpCtx = dpCtx;
     this.dpSortState = DPSortState.NONE;
     this.destPath = destPath;
+    this.isInsertOverwrite = isInsertOverwrite;
   }
 
   public FileSinkDesc(final Path dirName, final TableDesc tableInfo,
@@ -150,7 +152,7 @@ public FileSinkDesc(final Path dirName, final TableDesc tableInfo,
   public Object clone() throws CloneNotSupportedException {
     FileSinkDesc ret = new FileSinkDesc(dirName, tableInfo, compressed,
         destTableId, multiFileSpray, canBeMerged, numFiles, totalFiles,
-        partitionCols, dpCtx, destPath);
+        partitionCols, dpCtx, destPath, isInsertOverwrite);
     ret.setCompressCodec(compressCodec);
     ret.setCompressType(compressType);
     ret.setGatherStats(gatherStats);
@@ -164,6 +166,7 @@ public Object clone() throws CloneNotSupportedException {
     ret.setWriteType(writeType);
     ret.setTransactionId(txnId);
     ret.setStatsTmpDir(statsTmpDir);
+    ret.setInsertOverwrite(isInsertOverwrite);
     return ret;
   }
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/wm/CustomCounterLimit.java b/ql/src/java/org/apache/hadoop/hive/ql/wm/CustomCounterLimit.java
new file mode 100644
index 0000000..ad1f7fc
--- /dev/null
+++ b/ql/src/java/org/apache/hadoop/hive/ql/wm/CustomCounterLimit.java
@@ -0,0 +1,71 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.ql.wm;
+
+/**
+ * Custom counters with limits (this will only work if the execution engine exposes this counter)
+ */
+public class CustomCounterLimit implements CounterLimit {
+
+  private String counterName;
+  private long limit;
+
+  public CustomCounterLimit(final String counterName, final long limit) {
+    this.counterName = counterName;
+    this.limit = limit;
+  }
+
+  @Override
+  public String getName() {
+    return counterName;
+  }
+
+  @Override
+  public long getLimit() {
+    return limit;
+  }
+
+  @Override
+  public CounterLimit clone() {
+    return new CustomCounterLimit(counterName, limit);
+  }
+
+  @Override
+  public String toString() {
+    return "counter: " + counterName + " limit: " + limit;
+  }
+
+  @Override
+  public int hashCode() {
+    int hash = 31 * counterName.hashCode();
+    hash += 31 * limit;
+    return 31 * hash;
+  }
+
+  @Override
+  public boolean equals(final Object other) {
+    if (other == null || !(other instanceof CustomCounterLimit)) {
+      return false;
+    }
+
+    if (other == this) {
+      return true;
+    }
+
+    CustomCounterLimit otherVcl = (CustomCounterLimit) other;
+    return counterName.equalsIgnoreCase(otherVcl.counterName) && limit == otherVcl.limit;
+  }
+}
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/wm/ExpressionFactory.java b/ql/src/java/org/apache/hadoop/hive/ql/wm/ExpressionFactory.java
index f16125d..29f7c89 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/wm/ExpressionFactory.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/wm/ExpressionFactory.java
@@ -95,8 +95,18 @@ public static Expression fromString(final String expression) {
         return createExpression(vertexCounterLimit);
       }
     }
-    // unable to create expression at this point, invalid expression
-    throw new IllegalArgumentException("Invalid expression! " + expression);
+
+    // if nothing matches, try creating a custom counter
+    try {
+      counterValue = getCounterValue(counterValueStr, null);
+      if (counterValue < 0) {
+        throw new IllegalArgumentException("Illegal value for counter limit. Expected a positive long value.");
+      }
+    } catch (NumberFormatException e) {
+      throw new IllegalArgumentException("Invalid counter value: " + counterValueStr);
+    }
+    CustomCounterLimit customCounterLimit = new CustomCounterLimit(counterName, counterValue);
+    return createExpression(customCounterLimit);
   }
 
   private static long getCounterValue(final String counterValueStr, final Validator validator) throws
diff --git a/ql/src/test/org/apache/hadoop/hive/ql/exec/TestFileSinkOperator.java b/ql/src/test/org/apache/hadoop/hive/ql/exec/TestFileSinkOperator.java
index a8d7c9c..1c27873 100644
--- a/ql/src/test/org/apache/hadoop/hive/ql/exec/TestFileSinkOperator.java
+++ b/ql/src/test/org/apache/hadoop/hive/ql/exec/TestFileSinkOperator.java
@@ -285,7 +285,7 @@ private FileSinkOperator getFileSink(AcidUtils.Operation writeType,
       partColMap.put(PARTCOL_NAME, null);
       DynamicPartitionCtx dpCtx = new DynamicPartitionCtx(null, partColMap, "Sunday", 100);
       //todo: does this need the finalDestination?
-      desc = new FileSinkDesc(basePath, tableDesc, false, 1, false, false, 1, 1, partCols, dpCtx, null);
+      desc = new FileSinkDesc(basePath, tableDesc, false, 1, false, false, 1, 1, partCols, dpCtx, null, false);
     } else {
       desc = new FileSinkDesc(basePath, tableDesc, false);
     }
diff --git a/ql/src/test/org/apache/hadoop/hive/ql/wm/TestTrigger.java b/ql/src/test/org/apache/hadoop/hive/ql/wm/TestTrigger.java
index ce1dc6e..cd78545 100644
--- a/ql/src/test/org/apache/hadoop/hive/ql/wm/TestTrigger.java
+++ b/ql/src/test/org/apache/hadoop/hive/ql/wm/TestTrigger.java
@@ -80,6 +80,12 @@ public void testSimpleQueryTrigger() {
     assertEquals("counter: TOTAL_TASKS limit: 10000", expression.getCounterLimit().toString());
     assertFalse(trigger.apply(1000));
     assertTrue(trigger.apply(100000));
+
+    expression = ExpressionFactory.createExpression(new CustomCounterLimit("HDFS_WRITE_OPS",10000));
+    trigger = new ExecutionTrigger("write_heavy", expression, Trigger.Action.KILL_QUERY);
+    assertEquals("counter: HDFS_WRITE_OPS limit: 10000", expression.getCounterLimit().toString());
+    assertFalse(trigger.apply(1000));
+    assertTrue(trigger.apply(100000));
   }
 
   @Test
@@ -166,6 +172,12 @@ public void testExpressionFromString() {
     assertEquals("counter: TOTAL_TASKS limit: 10000", expression.getCounterLimit().toString());
     assertEquals(expected, expression);
     assertEquals(expected.hashCode(), expression.hashCode());
+
+    expression = ExpressionFactory.fromString(" HDFS_WRITE_OPS > 10000");
+    expected = ExpressionFactory.createExpression(new CustomCounterLimit("HDFS_WRITE_OPS",10000));
+    assertEquals("counter: HDFS_WRITE_OPS limit: 10000", expression.getCounterLimit().toString());
+    assertEquals(expected, expression);
+    assertEquals(expected.hashCode(), expression.hashCode());
   }
 
   @Test