diff --git a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java index 3025692..af53ab7 100644 --- a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java +++ b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java @@ -19,6 +19,7 @@ package org.apache.hadoop.hive.conf; import com.google.common.base.Joiner; +import com.google.common.collect.ImmutableSet; import com.google.common.collect.Iterables; import org.apache.commons.lang.StringUtils; import org.apache.hadoop.conf.Configuration; @@ -3400,6 +3401,12 @@ private static void populateLlapDaemonVarsSet(Set<String> llapDaemonVarsSetLocal Constants.LLAP_LOGGER_NAME_CONSOLE), "logger used for llap-daemons."), + HIVE_TRIGGER_VALIDATION_INTERVAL_MS("hive.trigger.validation.interval.ms", "500ms", + new TimeValidator(TimeUnit.MILLISECONDS), + "Interval at which triggers defined in the resource plan are validated during query execution. Triggers are\n" + + "validated for all SQL operations after every such interval (default: 500ms), and the corresponding action\n" + + "defined in the violated trigger is applied"), + SPARK_USE_OP_STATS("hive.spark.use.op.stats", true, "Whether to use operator stats to determine reducer parallelism for Hive on Spark.\n" + "If this is false, Hive will use source table stats to determine reducer\n" + @@ -3947,6 +3954,18 @@ public static long toSizeBytes(String value) { return new String[] {value.substring(0, i), value.substring(i)}; } + private static Set<String> daysSet = ImmutableSet.of("d", "D", "day", "DAY", "days", "DAYS"); + private static Set<String> hoursSet = ImmutableSet.of("h", "H", "hour", "HOUR", "hours", "HOURS"); + private static Set<String> minutesSet = ImmutableSet.of("m", "M", "min", "MIN", "mins", "MINS", + "minute", "MINUTE", "minutes", "MINUTES"); + private static Set<String> secondsSet = ImmutableSet.of("s", "S", "sec", "SEC", "secs", "SECS", + "second", "SECOND", "seconds", "SECONDS"); + private static Set<String> millisSet = ImmutableSet.of("ms", "MS", "msec", "MSEC", "msecs", "MSECS", + "millisecond", "MILLISECOND", "milliseconds", "MILLISECONDS"); + private static Set<String> microsSet = ImmutableSet.of("us", "US", "usec", "USEC", "usecs", "USECS", + "microsecond", "MICROSECOND", "microseconds", "MICROSECONDS"); + private static Set<String> nanosSet = ImmutableSet.of("ns", "NS", "nsec", "NSEC", "nsecs", "NSECS", + "nanosecond", "NANOSECOND", "nanoseconds", "NANOSECONDS"); public static TimeUnit unitFor(String unit, TimeUnit defaultUnit) { unit = unit.trim().toLowerCase(); if (unit.isEmpty() || unit.equals("l")) { @@ -3954,19 +3973,19 @@ public static TimeUnit unitFor(String unit, TimeUnit defaultUnit) { throw new IllegalArgumentException("Time unit is not specified"); } return defaultUnit; - } else if (unit.equals("d") || unit.startsWith("day")) { + } else if (daysSet.contains(unit)) { return TimeUnit.DAYS; - } else if (unit.equals("h") || unit.startsWith("hour")) { + } else if (hoursSet.contains(unit)) { return TimeUnit.HOURS; - } else if (unit.equals("m") || unit.startsWith("min")) { + } else if (minutesSet.contains(unit)) { return TimeUnit.MINUTES; - } else if (unit.equals("s") || unit.startsWith("sec")) { + } else if (secondsSet.contains(unit)) { return TimeUnit.SECONDS; - } else if (unit.equals("ms") || unit.startsWith("msec")) { + } else if (millisSet.contains(unit)) { return TimeUnit.MILLISECONDS; - } else if (unit.equals("us") || unit.startsWith("usec")) { + } else if (microsSet.contains(unit)) { return TimeUnit.MICROSECONDS; - } else if (unit.equals("ns") || unit.startsWith("nsec")) {
+ } else if (nanosSet.contains(unit)) { return TimeUnit.NANOSECONDS; } throw new IllegalArgumentException("Invalid time unit " + unit); diff --git a/common/src/test/org/apache/hadoop/hive/conf/TestHiveConf.java b/common/src/test/org/apache/hadoop/hive/conf/TestHiveConf.java index 1fac2b0..d24668f 100644 --- a/common/src/test/org/apache/hadoop/hive/conf/TestHiveConf.java +++ b/common/src/test/org/apache/hadoop/hive/conf/TestHiveConf.java @@ -111,7 +111,7 @@ public void testUnitFor() throws Exception { Assert.assertEquals(TimeUnit.MILLISECONDS, HiveConf.unitFor("ms", null)); Assert.assertEquals(TimeUnit.MILLISECONDS, HiveConf.unitFor("msecs", null)); Assert.assertEquals(TimeUnit.MICROSECONDS, HiveConf.unitFor("us", null)); - Assert.assertEquals(TimeUnit.MICROSECONDS, HiveConf.unitFor("useconds", null)); + Assert.assertEquals(TimeUnit.MICROSECONDS, HiveConf.unitFor("usecs", null)); Assert.assertEquals(TimeUnit.NANOSECONDS, HiveConf.unitFor("ns", null)); Assert.assertEquals(TimeUnit.NANOSECONDS, HiveConf.unitFor("nsecs", null)); } diff --git a/itests/hive-unit/pom.xml b/itests/hive-unit/pom.xml index eeb6e58..1440983 100644 --- a/itests/hive-unit/pom.xml +++ b/itests/hive-unit/pom.xml @@ -204,6 +204,12 @@ <version>${hadoop.version}</version> <classifier>tests</classifier> <scope>test</scope> + <exclusions> + <exclusion> + <groupId>io.netty</groupId> + <artifactId>netty-all</artifactId> + </exclusion> + </exclusions> <groupId>org.apache.hadoop</groupId> diff --git a/itests/hive-unit/src/test/java/org/apache/hive/jdbc/TestJdbcWithMiniLlap.java b/itests/hive-unit/src/test/java/org/apache/hive/jdbc/TestJdbcWithMiniLlap.java index 91d0377..71aee8f 100644 --- a/itests/hive-unit/src/test/java/org/apache/hive/jdbc/TestJdbcWithMiniLlap.java +++ b/itests/hive-unit/src/test/java/org/apache/hive/jdbc/TestJdbcWithMiniLlap.java @@ -101,7 +101,6 @@ private static String dataFileDir; private static Path kvDataFilePath; private static Path dataTypesFilePath; - private static final String tmpDir = System.getProperty("test.tmp.dir"); private static HiveConf conf = null; private Connection hs2Conn = null; @@ -138,7 +137,7 @@ public void setUp() throws Exception { hs2Conn = getConnection(miniHS2.getJdbcURL(), System.getProperty("user.name"), "bar"); } - private Connection getConnection(String jdbcURL, String user, String pwd) throws SQLException { + public static Connection getConnection(String jdbcURL, String user, String pwd) throws SQLException { Connection conn = DriverManager.getConnection(jdbcURL, user, pwd); conn.createStatement().execute("set hive.support.concurrency = false"); return conn; } @@ -158,11 +157,12 @@ public static void afterTest() throws Exception { } private void createTestTable(String tableName) throws Exception { - createTestTable(null, tableName); + createTestTable(hs2Conn, null, tableName, kvDataFilePath.toString()); } - private void createTestTable(String database, String tableName) throws Exception { - Statement stmt = hs2Conn.createStatement(); + public static void createTestTable(Connection connection, String database, String tableName, String srcFile) throws + Exception { + Statement stmt = connection.createStatement(); if (database != null) { stmt.execute("CREATE DATABASE IF NOT EXISTS " + database); @@ -175,8 +175,7 @@ private void createTestTable(String database, String tableName) throws Exception + " (under_col INT COMMENT 'the under column', value STRING) COMMENT ' test table'"); // load data - stmt.execute("load data local inpath '" - + kvDataFilePath.toString() + "' into table " + tableName); + stmt.execute("load data local inpath '" + srcFile + "' into table " + tableName); ResultSet res = stmt.executeQuery("SELECT * FROM " + tableName);
assertTrue(res.next()); @@ -236,7 +235,7 @@ public void testLlapInputFormatEndToEnd() throws Exception { @Test(timeout = 60000) public void testNonAsciiStrings() throws Exception { - createTestTable("nonascii", "testtab_nonascii"); + createTestTable(hs2Conn, "nonascii", "testtab_nonascii", kvDataFilePath.toString()); RowCollector rowCollector = new RowCollector(); String nonAscii = "À côté du garçon"; diff --git a/itests/hive-unit/src/test/java/org/apache/hive/jdbc/TestTriggersTezSessionPoolManager.java b/itests/hive-unit/src/test/java/org/apache/hive/jdbc/TestTriggersTezSessionPoolManager.java new file mode 100644 index 0000000..0a68f09 --- /dev/null +++ b/itests/hive-unit/src/test/java/org/apache/hive/jdbc/TestTriggersTezSessionPoolManager.java @@ -0,0 +1,230 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hive.jdbc; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertTrue; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + +import java.io.File; +import java.net.URL; +import java.sql.Connection; +import java.sql.SQLException; +import java.sql.Statement; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hive.conf.HiveConf; +import org.apache.hadoop.hive.conf.HiveConf.ConfVars; +import org.apache.hadoop.hive.llap.LlapBaseInputFormat; +import org.apache.hadoop.hive.ql.exec.tez.TezSessionPoolManager; +import org.apache.hadoop.hive.ql.wm.ExecutionTrigger; +import org.apache.hadoop.hive.ql.wm.ExpressionFactory; +import org.apache.hadoop.hive.ql.wm.MetastoreGlobalTriggersFetcher; +import org.apache.hadoop.hive.ql.wm.Trigger; +import org.apache.hive.jdbc.miniHS2.MiniHS2; +import org.apache.hive.jdbc.miniHS2.MiniHS2.MiniClusterType; +import org.junit.After; +import org.junit.AfterClass; +import org.junit.Before; +import org.junit.BeforeClass; +import org.junit.Test; + +import com.google.common.collect.Lists; + +public class TestTriggersTezSessionPoolManager { + private static MiniHS2 miniHS2 = null; + private static String dataFileDir; + private static Path kvDataFilePath; + private static String triggerTestTable = "testtab1"; + private static String test5msSleepQuery = "select sleep(t1.under_col, 5), t1.value from " + triggerTestTable + + " t1 join " + triggerTestTable + " t2 on t1.under_col>=t2.under_col"; + private static String test500msSleepQuery = "select sleep(t1.under_col, 500), t1.value from " + triggerTestTable + + " t1 join " + triggerTestTable + " t2 on t1.under_col>=t2.under_col"; + + private static HiveConf conf = null; + private Connection hs2Conn = null; + + @BeforeClass + public 
static void beforeTest() throws Exception { + Class.forName(MiniHS2.getJdbcDriverName()); + + String confDir = "../../data/conf/llap/"; + if (confDir != null && !confDir.isEmpty()) { + HiveConf.setHiveSiteLocation(new URL("file://" + new File(confDir).toURI().getPath() + "/hive-site.xml")); + System.out.println("Setting hive-site: " + HiveConf.getHiveSiteLocation()); + } + + conf = new HiveConf(); + conf.setBoolVar(ConfVars.HIVE_SUPPORT_CONCURRENCY, false); + conf.setBoolVar(ConfVars.HIVE_SERVER2_ENABLE_DOAS, false); + conf.setVar(ConfVars.HIVE_SERVER2_TEZ_DEFAULT_QUEUES, "default"); + conf.setBoolVar(ConfVars.HIVE_SERVER2_TEZ_INITIALIZE_DEFAULT_SESSIONS, true); + conf.setBoolVar(ConfVars.TEZ_EXEC_SUMMARY, true); + conf.setBoolVar(ConfVars.HIVE_STRICT_CHECKS_CARTESIAN, false); + // don't want cache hits from llap io for testing filesystem bytes read counters + conf.setVar(ConfVars.LLAP_IO_MEMORY_MODE, "none"); + + conf.addResource(new URL("file://" + new File(confDir).toURI().getPath() + + "/tez-site.xml")); + + miniHS2 = new MiniHS2(conf, MiniClusterType.LLAP); + dataFileDir = conf.get("test.data.files").replace('\\', '/').replace("c:", ""); + kvDataFilePath = new Path(dataFileDir, "kv1.txt"); + + Map<String, String> confOverlay = new HashMap<>(); + miniHS2.start(confOverlay); + miniHS2.getDFS().getFileSystem().mkdirs(new Path("/apps_staging_dir/anonymous")); + } + + @Before + public void setUp() throws Exception { + hs2Conn = TestJdbcWithMiniLlap.getConnection(miniHS2.getJdbcURL(), System.getProperty("user.name"), "bar"); + } + + @After + public void tearDown() throws Exception { + LlapBaseInputFormat.closeAll(); + hs2Conn.close(); + } + + @AfterClass + public static void afterTest() throws Exception { + if (miniHS2.isStarted()) { + miniHS2.stop(); + } + } + + @Test(timeout = 60000) + public void testTriggerSlowQueryElapsedTime() throws Exception { + Trigger trigger = new ExecutionTrigger("slow_query", ExpressionFactory.fromString("ELAPSED_TIME > 20000"), + Trigger.Action.KILL_QUERY); + runQueryWithTrigger(test500msSleepQuery, null, Lists.newArrayList(trigger), "Query was cancelled"); + } + + @Test(timeout = 60000) + public void testTriggerSlowQueryExecutionTime() throws Exception { + Trigger trigger = new ExecutionTrigger("slow_query", ExpressionFactory.fromString("EXECUTION_TIME > 1000"), + Trigger.Action.KILL_QUERY); + runQueryWithTrigger(test5msSleepQuery, null, Lists.newArrayList(trigger), "Query was cancelled"); + } + + @Test(timeout = 60000) + public void testTriggerHighShuffleBytes() throws Exception { + Trigger trigger = new ExecutionTrigger("big_shuffle", ExpressionFactory.fromString("SHUFFLE_BYTES > 100"), + Trigger.Action.KILL_QUERY); + runQueryWithTrigger(test5msSleepQuery, null, Lists.newArrayList(trigger), "Query was cancelled"); + } + + @Test(timeout = 60000) + public void testTriggerHighBytesRead() throws Exception { + Trigger trigger = new ExecutionTrigger("big_read", ExpressionFactory.fromString("HDFS_BYTES_READ > 100"), + Trigger.Action.KILL_QUERY); + runQueryWithTrigger(test5msSleepQuery, null, Lists.newArrayList(trigger), "Query was cancelled"); + } + + @Test(timeout = 60000) + public void testTriggerHighBytesWrite() throws Exception { + Trigger trigger = new ExecutionTrigger("big_write", ExpressionFactory.fromString("FILE_BYTES_WRITTEN > 100"), + Trigger.Action.KILL_QUERY); + runQueryWithTrigger(test5msSleepQuery, null, Lists.newArrayList(trigger), "Query was cancelled"); + } + + @Test(timeout = 60000) + public void testTriggerTotalTasks() throws Exception { + Trigger trigger = new
ExecutionTrigger("highly_parallel", ExpressionFactory.fromString("TOTAL_TASKS > 50"), + Trigger.Action.KILL_QUERY); + List<String> setCmds = new ArrayList<>(); + setCmds.add("set mapred.min.split.size=100"); + setCmds.add("set mapred.max.split.size=100"); + setCmds.add("set tez.grouping.min-size=100"); + setCmds.add("set tez.grouping.max-size=100"); + runQueryWithTrigger(test5msSleepQuery, setCmds, Lists.newArrayList(trigger), "Query was cancelled"); + } + + @Test(timeout = 60000) + public void testMultipleTriggers1() throws Exception { + Trigger shuffleTrigger = new ExecutionTrigger("big_shuffle", ExpressionFactory.fromString("HDFS_BYTES_READ > " + + "1000000"), + Trigger.Action.KILL_QUERY); + Trigger execTimeTrigger = new ExecutionTrigger("slow_query", ExpressionFactory.fromString("EXECUTION_TIME > 1000"), + Trigger.Action.KILL_QUERY); + runQueryWithTrigger(test5msSleepQuery, null, Lists.newArrayList(shuffleTrigger, execTimeTrigger), "Query was " + + "cancelled"); + } + + @Test(timeout = 60000) + public void testMultipleTriggers2() throws Exception { + Trigger shuffleTrigger = new ExecutionTrigger("big_shuffle", ExpressionFactory.fromString("HDFS_BYTES_READ > " + + "100"), + Trigger.Action.KILL_QUERY); + Trigger execTimeTrigger = new ExecutionTrigger("slow_query", + ExpressionFactory.fromString("EXECUTION_TIME > 100000"), + Trigger.Action.KILL_QUERY); + runQueryWithTrigger(test5msSleepQuery, null, Lists.newArrayList(shuffleTrigger, execTimeTrigger), "Query was " + + "cancelled"); + } + + private void createSleepUDF() throws SQLException { + String udfName = TestJdbcWithMiniHS2.SleepMsUDF.class.getName(); + Connection con = hs2Conn; + Statement stmt = con.createStatement(); + stmt.execute("create temporary function sleep as '" + udfName + "'"); + stmt.close(); + } + + private void runQueryWithTrigger(final String query, final List<String> setCmds, final List<Trigger> triggers, + final String expect) + throws Exception { + MetastoreGlobalTriggersFetcher triggersFetcher = mock(MetastoreGlobalTriggersFetcher.class); + when(triggersFetcher.fetch()).thenReturn(triggers); + TezSessionPoolManager.getInstance().setGlobalTriggersFetcher(triggersFetcher); + + Connection con = hs2Conn; + TestJdbcWithMiniLlap.createTestTable(con, null, triggerTestTable, kvDataFilePath.toString()); + createSleepUDF(); + + final Statement selStmt = con.createStatement(); + final Throwable[] throwable = new Throwable[1]; + Thread queryThread = new Thread(() -> { + try { + if (setCmds != null) { + for (String setCmd : setCmds) { + selStmt.execute(setCmd); + } + } + selStmt.executeQuery(query); + } catch (SQLException e) { + throwable[0] = e; + } + }); + queryThread.start(); + + queryThread.join(); + selStmt.close(); + + assertNotNull("Expected non-null throwable", throwable[0]); + assertEquals(SQLException.class, throwable[0].getClass()); + assertTrue(expect + " is not contained in " + throwable[0].getMessage(), + throwable[0].getMessage().contains(expect)); + } +} \ No newline at end of file diff --git a/itests/hive-unit/src/test/java/org/apache/hive/jdbc/TestTriggersWorkloadManager.java b/itests/hive-unit/src/test/java/org/apache/hive/jdbc/TestTriggersWorkloadManager.java new file mode 100644 index 0000000..86f5972 --- /dev/null +++ b/itests/hive-unit/src/test/java/org/apache/hive/jdbc/TestTriggersWorkloadManager.java @@ -0,0 +1,232 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements.
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hive.jdbc; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertTrue; +import static org.mockito.Mockito.spy; +import static org.mockito.Mockito.when; + +import java.io.File; +import java.net.URL; +import java.sql.Connection; +import java.sql.SQLException; +import java.sql.Statement; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.concurrent.TimeUnit; + +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hive.conf.HiveConf; +import org.apache.hadoop.hive.conf.HiveConf.ConfVars; +import org.apache.hadoop.hive.llap.LlapBaseInputFormat; +import org.apache.hadoop.hive.ql.exec.tez.TestWorkloadManager; +import org.apache.hadoop.hive.ql.exec.tez.WorkloadManager; +import org.apache.hadoop.hive.ql.wm.ExecutionTrigger; +import org.apache.hadoop.hive.ql.wm.ExpressionFactory; +import org.apache.hadoop.hive.ql.wm.Trigger; +import org.apache.hive.jdbc.miniHS2.MiniHS2; +import org.apache.hive.jdbc.miniHS2.MiniHS2.MiniClusterType; +import org.junit.After; +import org.junit.AfterClass; +import org.junit.Before; +import org.junit.BeforeClass; +import org.junit.Test; + +import com.google.common.collect.Lists; + +public class TestTriggersWorkloadManager { + private static MiniHS2 miniHS2 = null; + private static String dataFileDir; + private static Path kvDataFilePath; + private static String triggerTestTable = "testtab1"; + private static String test5msSleepQuery = "select sleep(t1.under_col, 5), t1.value from " + triggerTestTable + + " t1 join " + triggerTestTable + " t2 on t1.under_col>=t2.under_col"; + private static String test500msSleepQuery = "select sleep(t1.under_col, 500), t1.value from " + triggerTestTable + + " t1 join " + triggerTestTable + " t2 on t1.under_col>=t2.under_col"; + + private static HiveConf conf = null; + private static TestWorkloadManager.WorkloadManagerForTest wm; + private Connection hs2Conn = null; + + @BeforeClass + public static void beforeTest() throws Exception { + Class.forName(MiniHS2.getJdbcDriverName()); + + String confDir = "../../data/conf/llap/"; + if (confDir != null && !confDir.isEmpty()) { + HiveConf.setHiveSiteLocation(new URL("file://" + new File(confDir).toURI().getPath() + "/hive-site.xml")); + System.out.println("Setting hive-site: " + HiveConf.getHiveSiteLocation()); + } + + conf = new HiveConf(); + conf.setBoolVar(ConfVars.HIVE_SUPPORT_CONCURRENCY, false); + conf.setBoolVar(ConfVars.HIVE_SERVER2_ENABLE_DOAS, false); + conf.setTimeVar(ConfVars.HIVE_TRIGGER_VALIDATION_INTERVAL_MS, 100, TimeUnit.MILLISECONDS); + conf.setVar(ConfVars.HIVE_SERVER2_TEZ_INTERACTIVE_QUEUE, "default"); + conf.setBoolean("hive.test.workload.management", true); + conf.setBoolVar(ConfVars.TEZ_EXEC_SUMMARY, true); + 
conf.setBoolVar(ConfVars.HIVE_STRICT_CHECKS_CARTESIAN, false); + // don't want cache hits from llap io for testing filesystem bytes read counters + conf.setVar(ConfVars.LLAP_IO_MEMORY_MODE, "none"); + + conf.addResource(new URL("file://" + new File(confDir).toURI().getPath() + + "/tez-site.xml")); + + miniHS2 = new MiniHS2(conf, MiniClusterType.LLAP); + dataFileDir = conf.get("test.data.files").replace('\\', '/').replace("c:", ""); + kvDataFilePath = new Path(dataFileDir, "kv1.txt"); + + Map<String, String> confOverlay = new HashMap<>(); + miniHS2.start(confOverlay); + miniHS2.getDFS().getFileSystem().mkdirs(new Path("/apps_staging_dir/anonymous")); + } + + @Before + public void setUp() throws Exception { + hs2Conn = TestJdbcWithMiniLlap.getConnection(miniHS2.getJdbcURL(), System.getProperty("user.name"), "bar"); + } + + @After + public void tearDown() throws Exception { + LlapBaseInputFormat.closeAll(); + hs2Conn.close(); + } + + @AfterClass + public static void afterTest() throws Exception { + if (miniHS2.isStarted()) { + miniHS2.stop(); + } + } + + @Test(timeout = 60000) + public void testTriggerSlowQueryElapsedTime() throws Exception { + Trigger trigger = new ExecutionTrigger("slow_query", ExpressionFactory.fromString("ELAPSED_TIME > 20000"), + Trigger.Action.KILL_QUERY); + runQueryWithTrigger(test500msSleepQuery, null, Lists.newArrayList(trigger), "Query was cancelled"); + } + + @Test(timeout = 60000) + public void testTriggerSlowQueryExecutionTime() throws Exception { + Trigger trigger = new ExecutionTrigger("slow_query", ExpressionFactory.fromString("EXECUTION_TIME > 1000"), + Trigger.Action.KILL_QUERY); + runQueryWithTrigger(test5msSleepQuery, null, Lists.newArrayList(trigger), "Query was cancelled"); + } + + @Test(timeout = 60000) + public void testTriggerHighShuffleBytes() throws Exception { + Trigger trigger = new ExecutionTrigger("big_shuffle", ExpressionFactory.fromString("SHUFFLE_BYTES > 100"), + Trigger.Action.KILL_QUERY); + runQueryWithTrigger(test5msSleepQuery, null, Lists.newArrayList(trigger), "Query was cancelled"); + } + + @Test(timeout = 60000) + public void testTriggerHighBytesRead() throws Exception { + Trigger trigger = new ExecutionTrigger("big_read", ExpressionFactory.fromString("HDFS_BYTES_READ > 100"), + Trigger.Action.KILL_QUERY); + runQueryWithTrigger(test5msSleepQuery, null, Lists.newArrayList(trigger), "Query was cancelled"); + } + + @Test(timeout = 60000) + public void testTriggerHighBytesWrite() throws Exception { + Trigger trigger = new ExecutionTrigger("big_write", ExpressionFactory.fromString("FILE_BYTES_WRITTEN > 100"), + Trigger.Action.KILL_QUERY); + runQueryWithTrigger(test5msSleepQuery, null, Lists.newArrayList(trigger), "Query was cancelled"); + } + + @Test(timeout = 60000) + public void testTriggerTotalTasks() throws Exception { + Trigger trigger = new ExecutionTrigger("highly_parallel", ExpressionFactory.fromString("TOTAL_TASKS > 50"), + Trigger.Action.KILL_QUERY); + List<String> setCmds = new ArrayList<>(); + setCmds.add("set mapred.min.split.size=100"); + setCmds.add("set mapred.max.split.size=100"); + setCmds.add("set tez.grouping.min-size=100"); + setCmds.add("set tez.grouping.max-size=100"); + runQueryWithTrigger(test5msSleepQuery, setCmds, Lists.newArrayList(trigger), "Query was cancelled"); + } + + @Test(timeout = 60000) + public void testMultipleTriggers1() throws Exception { + Trigger shuffleTrigger = new ExecutionTrigger("big_shuffle", + ExpressionFactory.fromString("HDFS_BYTES_READ > 100000"), + Trigger.Action.KILL_QUERY); + Trigger execTimeTrigger = new
ExecutionTrigger("slow_query", ExpressionFactory.fromString("EXECUTION_TIME > 1000"), + Trigger.Action.KILL_QUERY); + runQueryWithTrigger(test5msSleepQuery, null, Lists.newArrayList(shuffleTrigger, execTimeTrigger), + "Query was cancelled"); + } + + @Test(timeout = 60000) + public void testMultipleTriggers2() throws Exception { + Trigger shuffleTrigger = new ExecutionTrigger("big_shuffle", ExpressionFactory.fromString("HDFS_BYTES_READ > 100"), + Trigger.Action.KILL_QUERY); + Trigger execTimeTrigger = new ExecutionTrigger("slow_query", + ExpressionFactory.fromString("EXECUTION_TIME > 100000"), + Trigger.Action.KILL_QUERY); + runQueryWithTrigger(test5msSleepQuery, null, Lists.newArrayList(shuffleTrigger, execTimeTrigger), + "Query was cancelled"); + } + + private void createSleepUDF() throws SQLException { + String udfName = TestJdbcWithMiniHS2.SleepMsUDF.class.getName(); + Connection con = hs2Conn; + Statement stmt = con.createStatement(); + stmt.execute("create temporary function sleep as '" + udfName + "'"); + stmt.close(); + } + + private void runQueryWithTrigger(final String query, final List<String> setCmds, final List<Trigger> triggers, + final String expect) throws Exception { + WorkloadManager wm = WorkloadManager.getInstance(); + WorkloadManager.PoolState poolState = spy(new WorkloadManager.PoolState()); + when(poolState.getTriggers()).thenReturn(triggers); + wm.getPools().put("llap", poolState); + + Connection con = hs2Conn; + TestJdbcWithMiniLlap.createTestTable(con, null, triggerTestTable, kvDataFilePath.toString()); + createSleepUDF(); + + final Statement selStmt = con.createStatement(); + final Throwable[] throwable = new Throwable[1]; + Thread queryThread = new Thread(() -> { + try { + if (setCmds != null) { + for (String setCmd : setCmds) { + selStmt.execute(setCmd); + } + } + selStmt.executeQuery(query); + } catch (SQLException e) { + throwable[0] = e; + } + }); + queryThread.start(); + + queryThread.join(); + selStmt.close(); + + assertNotNull("Expected non-null throwable", throwable[0]); + assertEquals(SQLException.class, throwable[0].getClass()); + assertTrue(expect + " is not contained in " + throwable[0].getMessage(), + throwable[0].getMessage().contains(expect)); + } +} \ No newline at end of file diff --git a/llap-common/src/java/org/apache/hadoop/hive/llap/AsyncPbRpcProxy.java b/llap-common/src/java/org/apache/hadoop/hive/llap/AsyncPbRpcProxy.java index 0415169..9f3ec38 100644 --- a/llap-common/src/java/org/apache/hadoop/hive/llap/AsyncPbRpcProxy.java +++ b/llap-common/src/java/org/apache/hadoop/hive/llap/AsyncPbRpcProxy.java @@ -392,8 +392,12 @@ public void onRemoval(RemovalNotification arg) { this.hostProxies = cb.build(); this.socketFactory = NetUtils.getDefaultSocketFactory(conf); this.token = token; - String tokenUser = getTokenUser(token); - this.tokenUser = tokenUser; + if (token != null) { + String tokenUser = getTokenUser(token); + this.tokenUser = tokenUser; + } else { + this.tokenUser = null; + } this.retryPolicy = RetryPolicies.retryUpToMaximumTimeWithFixedSleep( connectionTimeoutMs, retrySleepMs, TimeUnit.MILLISECONDS); diff --git a/ql/src/java/org/apache/hadoop/hive/ql/Context.java b/ql/src/java/org/apache/hadoop/hive/ql/Context.java index 2753f1f..aaa3102 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/Context.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/Context.java @@ -59,6 +59,7 @@ import org.apache.hadoop.hive.ql.parse.QB; import org.apache.hadoop.hive.ql.plan.LoadTableDesc; import org.apache.hadoop.hive.ql.session.SessionState; +import
org.apache.hadoop.hive.ql.wm.TriggerContext; import org.apache.hadoop.hive.shims.ShimLoader; import org.apache.hadoop.util.StringUtils; import org.slf4j.Logger; @@ -149,10 +150,20 @@ */ private Map<Integer, DestClausePrefix> insertBranchToNamePrefix = new HashMap<>(); private Operation operation = Operation.OTHER; + private TriggerContext triggerContext; + public void setOperation(Operation operation) { this.operation = operation; } + public TriggerContext getTriggerContext() { + return triggerContext; + } + + public void setTriggerContext(final TriggerContext triggerContext) { + this.triggerContext = triggerContext; + } + /** * These ops require special handling in various places * (note that Insert into Acid table is in OTHER category) diff --git a/ql/src/java/org/apache/hadoop/hive/ql/Driver.java b/ql/src/java/org/apache/hadoop/hive/ql/Driver.java index 1943c6d..927c572 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/Driver.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/Driver.java @@ -114,6 +114,7 @@ import org.apache.hadoop.hive.ql.security.authorization.plugin.HivePrivilegeObject.HivePrivilegeObjectType; import org.apache.hadoop.hive.ql.session.SessionState; import org.apache.hadoop.hive.ql.session.SessionState.LogHelper; +import org.apache.hadoop.hive.ql.wm.TriggerContext; import org.apache.hadoop.hive.serde2.ByteStream; import org.apache.hadoop.hive.shims.Utils; import org.apache.hadoop.mapred.ClusterStatus; @@ -484,6 +485,9 @@ private int compile(String command, boolean resetTaskIds, boolean deferClose) { String queryId = queryState.getQueryId(); + if (ctx != null) { + setTriggerContext(); + } //save some info for webUI for use after plan is freed this.queryDisplay.setQueryStr(queryStr); this.queryDisplay.setQueryId(queryId); @@ -528,6 +532,7 @@ public void run() { } if (ctx == null) { ctx = new Context(conf); + setTriggerContext(); } ctx.setTryCount(getTryCount()); @@ -712,6 +717,11 @@ public void run() { } } + private void setTriggerContext() { + TriggerContext triggerContext = new TriggerContext(queryInfo); + ctx.setTriggerContext(triggerContext); + } + private boolean startImplicitTxn(HiveTxnManager txnManager) throws LockException { boolean shouldOpenImplicitTxn = !ctx.isExplainPlan(); //this is dumb. HiveOperation is not always set. see HIVE-16447/HIVE-16443 diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/KillTriggerActionHandler.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/KillTriggerActionHandler.java new file mode 100644 index 0000000..d299787 --- /dev/null +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/KillTriggerActionHandler.java @@ -0,0 +1,57 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License.
+ */ + +package org.apache.hadoop.hive.ql.exec.tez; + +import java.util.Map; + +import org.apache.hadoop.hive.ql.metadata.HiveException; +import org.apache.hadoop.hive.ql.session.KillQuery; +import org.apache.hadoop.hive.ql.wm.Trigger; +import org.apache.hadoop.hive.ql.wm.TriggerActionHandler; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Handles only Kill Action. + */ +public class KillTriggerActionHandler implements TriggerActionHandler { + private static final Logger LOG = LoggerFactory.getLogger(KillTriggerActionHandler.class); + + @Override + public void applyAction(final Map<TezSessionState, Trigger.Action> queriesViolated) { + for (Map.Entry<TezSessionState, Trigger.Action> entry : queriesViolated.entrySet()) { + switch (entry.getValue()) { + case KILL_QUERY: + TezSessionState sessionState = entry.getKey(); + String queryId = sessionState.getTriggerContext().getQueryId(); + try { + KillQuery killQuery = sessionState.getKillQuery(); + // if killQuery is null, the session might have been released to the pool or closed already + if (killQuery != null) { + killQuery.killQuery(queryId); + } + } catch (HiveException e) { + LOG.warn("Unable to kill query {} for trigger violation", queryId, e); + } + break; + default: + LOG.warn("Unsupported action: {}", entry.getValue()); + break; + } + } + } +} diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionPoolManager.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionPoolManager.java index edcecb0..8b72b60 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionPoolManager.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionPoolManager.java @@ -18,25 +18,29 @@ package org.apache.hadoop.hive.ql.exec.tez; -import java.util.HashSet; - -import java.util.concurrent.Semaphore; import java.util.ArrayList; +import java.util.Collections; +import java.util.HashSet; import java.util.LinkedList; import java.util.List; -import java.util.Random; import java.util.Set; +import java.util.concurrent.Semaphore; -import org.apache.tez.dag.api.TezConfiguration; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hive.conf.HiveConf; import org.apache.hadoop.hive.conf.HiveConf.ConfVars; import org.apache.hadoop.hive.ql.exec.tez.TezSessionPoolSession.Manager; +import org.apache.hadoop.hive.ql.metadata.Hive; import org.apache.hadoop.hive.ql.metadata.HiveException; +import org.apache.hadoop.hive.ql.wm.MetastoreGlobalTriggersFetcher; +import org.apache.hadoop.hive.ql.wm.SessionTriggerProvider; +import org.apache.hadoop.hive.ql.wm.Trigger; +import org.apache.hadoop.hive.ql.wm.TriggerActionHandler; import org.apache.hadoop.hive.shims.Utils; import org.apache.hadoop.security.UserGroupInformation; +import org.apache.tez.dag.api.TezConfiguration; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import com.google.common.annotations.VisibleForTesting; @@ -47,8 +51,7 @@ * In case the user specifies a queue explicitly, a new session is created * on that queue and assigned to the session state.
*/ -public class TezSessionPoolManager - implements SessionExpirationTracker.RestartImpl, Manager { +public class TezSessionPoolManager extends Manager implements SessionExpirationTracker.RestartImpl { private enum CustomQueueAllowed { TRUE, @@ -57,7 +60,6 @@ } private static final Logger LOG = LoggerFactory.getLogger(TezSessionPoolManager.class); - static final Random rdm = new Random(); private Semaphore llapQueue; private HiveConf initConf = null; @@ -79,6 +81,10 @@ /** This is used to close non-default sessions, and also all sessions when stopping. */ private final List<TezSessionPoolSession> openSessions = new LinkedList<>(); + private MetastoreGlobalTriggersFetcher globalTriggersFetcher; + private SessionTriggerProvider sessionTriggerProvider; + private TriggerActionHandler triggerActionHandler; + private TriggerValidatorRunnable triggerValidatorRunnable; /** Note: this is not thread-safe. */ public static TezSessionPoolManager getInstance() throws Exception { @@ -102,7 +108,7 @@ public void startPool() throws Exception { } } - public void setupPool(HiveConf conf) throws InterruptedException { + public void setupPool(HiveConf conf) throws Exception { String[] defaultQueueList = HiveConf.getTrimmedStringsVar( conf, HiveConf.ConfVars.HIVE_SERVER2_TEZ_DEFAULT_QUEUES); this.initConf = conf; @@ -122,6 +128,14 @@ public void setupPool(HiveConf conf) throws InterruptedException { numConcurrentLlapQueries = conf.getIntVar(ConfVars.HIVE_SERVER2_LLAP_CONCURRENT_QUERIES); llapQueue = new Semaphore(numConcurrentLlapQueries, true); + sessionTriggerProvider = new SessionTriggerProvider(); + triggerActionHandler = new KillTriggerActionHandler(); + // TODO: update runnable to handle per pool validation + triggerValidatorRunnable = new TriggerValidatorRunnable(getSessionTriggerProvider(), getTriggerActionHandler()); + Hive db = Hive.get(conf); + globalTriggersFetcher = new MetastoreGlobalTriggersFetcher(db); + startManager(conf); + String queueAllowedStr = HiveConf.getVar(initConf, ConfVars.HIVE_SERVER2_TEZ_SESSION_CUSTOM_QUEUE_ALLOWED); try { @@ -295,20 +309,23 @@ public void stop() throws Exception { if ((instance == null) || !this.hasInitialSessions) { return; } - List<TezSessionState> sessionsToClose = null; synchronized (openSessions) { sessionsToClose = new ArrayList<>(openSessions); } + // we can just stop all the sessions for (TezSessionState sessionState : sessionsToClose) { if (sessionState.isDefault()) { sessionState.close(false); } } + if (expirationTracker != null) { expirationTracker.stop(); } + + instance = null; } /** @@ -326,6 +343,21 @@ public void destroy(TezSessionState tezSessionState) throws Exception { tezSessionState.close(false); } + @Override + SessionTriggerProvider getSessionTriggerProvider() { + return sessionTriggerProvider; + } + + @Override + TriggerActionHandler getTriggerActionHandler() { + return triggerActionHandler; + } + + @Override + TriggerValidatorRunnable getTriggerValidatorRunnable() { + return triggerValidatorRunnable; + } + protected TezSessionPoolSession createSession(String sessionId, HiveConf conf) { return new TezSessionPoolSession(sessionId, this, expirationTracker, conf); } @@ -450,6 +482,12 @@ public void registerOpenSession(TezSessionPoolSession session) { synchronized (openSessions) { openSessions.add(session); } + updateSessionsTriggers(); + } + + private void updateSessionsTriggers() { + sessionTriggerProvider.setOpenSessions(Collections.unmodifiableList(openSessions)); + sessionTriggerProvider.setActiveTriggers(Collections.unmodifiableList(globalTriggersFetcher.fetch())); } /** Called
by TezSessionPoolSession when closed. */ @@ -461,10 +499,27 @@ public void unregisterOpenSession(TezSessionPoolSession session) { synchronized (openSessions) { openSessions.remove(session); } + updateSessionsTriggers(); } @VisibleForTesting public SessionExpirationTracker getExpirationTracker() { return expirationTracker; } + + + @VisibleForTesting + public void setGlobalTriggersFetcher(MetastoreGlobalTriggersFetcher metastoreGlobalTriggersFetcher) { + this.globalTriggersFetcher = metastoreGlobalTriggersFetcher; + updateSessionsTriggers(); + } + + public List<String> getTriggerCounterNames() { + List<Trigger> activeTriggers = sessionTriggerProvider.getActiveTriggers(); + List<String> counterNames = new ArrayList<>(); + for (Trigger trigger : activeTriggers) { + counterNames.add(trigger.getExpression().getCounterLimit().getName()); + } + return counterNames; + } } diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionPoolSession.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionPoolSession.java index 694f15b..5f20183 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionPoolSession.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionPoolSession.java @@ -18,26 +18,27 @@ package org.apache.hadoop.hive.ql.exec.tez; -import org.apache.hadoop.hive.registry.impl.TezAmInstance; - -import org.apache.hadoop.security.token.Token; -import org.apache.tez.common.security.JobTokenIdentifier; - -import org.apache.hadoop.conf.Configuration; - import java.io.IOException; import java.net.URISyntaxException; import java.util.Collection; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; import javax.security.auth.login.LoginException; +import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; import org.apache.hadoop.hive.conf.HiveConf; import org.apache.hadoop.hive.ql.session.SessionState.LogHelper; +import org.apache.hadoop.hive.ql.wm.SessionTriggerProvider; +import org.apache.hadoop.hive.ql.wm.TriggerActionHandler; +import org.apache.hadoop.hive.registry.impl.TezAmInstance; import org.apache.tez.dag.api.TezException; import com.google.common.annotations.VisibleForTesting; +import com.google.common.util.concurrent.ThreadFactoryBuilder; /** * TezSession that is aware of the session pool, and also keeps track of expiration and use.
@@ -52,13 +53,27 @@ class TezSessionPoolSession extends TezSessionState { private static final int STATE_NONE = 0, STATE_IN_USE = 1, STATE_EXPIRED = 2; - interface Manager { - void registerOpenSession(TezSessionPoolSession session); - void unregisterOpenSession(TezSessionPoolSession session); - void returnAfterUse(TezSessionPoolSession session) throws Exception; - TezSessionState reopen(TezSessionState session, Configuration conf, - String[] inputOutputJars) throws Exception; - void destroy(TezSessionState session) throws Exception; + public static abstract class Manager { + abstract void registerOpenSession(TezSessionPoolSession session); + abstract void unregisterOpenSession(TezSessionPoolSession session); + abstract void returnAfterUse(TezSessionPoolSession session) throws Exception; + abstract TezSessionState reopen(TezSessionState session, Configuration conf, + String[] inputOutputJars) throws Exception; + abstract void destroy(TezSessionState session) throws Exception; + + abstract SessionTriggerProvider getSessionTriggerProvider(); + abstract TriggerActionHandler getTriggerActionHandler(); + abstract TriggerValidatorRunnable getTriggerValidatorRunnable(); + + public void startManager(Configuration conf) { + long triggerValidationIntervalMs = HiveConf.getTimeVar(conf, + HiveConf.ConfVars.HIVE_TRIGGER_VALIDATION_INTERVAL_MS, TimeUnit.MILLISECONDS); + final ScheduledExecutorService scheduledExecutorService = Executors.newSingleThreadScheduledExecutor( + new ThreadFactoryBuilder().setDaemon(true).setNameFormat("TriggerValidator").build()); + TriggerValidatorRunnable triggerValidatorRunnable = getTriggerValidatorRunnable(); + scheduledExecutorService.scheduleWithFixedDelay(triggerValidatorRunnable, triggerValidationIntervalMs, + triggerValidationIntervalMs, TimeUnit.MILLISECONDS); + } } private final AtomicInteger sessionState = new AtomicInteger(STATE_NONE); diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionState.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionState.java index 363443d..aab4ac4 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionState.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionState.java @@ -57,8 +57,10 @@ import org.apache.hadoop.hive.llap.tezplugins.LlapTaskCommunicator; import org.apache.hadoop.hive.llap.tezplugins.LlapTaskSchedulerService; import org.apache.hadoop.hive.ql.exec.Utilities; +import org.apache.hadoop.hive.ql.session.KillQuery; import org.apache.hadoop.hive.ql.session.SessionState; import org.apache.hadoop.hive.ql.session.SessionState.LogHelper; +import org.apache.hadoop.hive.ql.wm.TriggerContext; import org.apache.hadoop.hive.shims.Utils; import org.apache.hadoop.security.Credentials; import org.apache.hadoop.security.UserGroupInformation; @@ -112,6 +114,8 @@ private final Set<LocalResource> localizedResources = new HashSet<LocalResource>(); private boolean doAsEnabled; private boolean isLegacyLlapMode; + private TriggerContext triggerContext; + private KillQuery killQuery; /** * Constructor. We do not automatically connect, because we only want to @@ -751,4 +755,20 @@ public void destroy() throws Exception { // By default, TezSessionPoolManager handles this for both pool and non-pool session.
TezSessionPoolManager.getInstance().destroy(this); } + + public TriggerContext getTriggerContext() { + return triggerContext; + } + + public void setTriggerContext(final TriggerContext triggerContext) { + this.triggerContext = triggerContext; + } + + public void setKillQuery(final KillQuery killQuery) { + this.killQuery = killQuery; + } + + public KillQuery getKillQuery() { + return killQuery; + } } diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezTask.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezTask.java index 28d91cc..101e079 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezTask.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezTask.java @@ -18,14 +18,13 @@ package org.apache.hadoop.hive.ql.exec.tez; -import org.apache.hadoop.hive.ql.exec.FileSinkOperator; - import java.io.IOException; import java.util.Arrays; import java.util.Collection; import java.util.Collections; import java.util.EnumSet; import java.util.HashMap; +import java.util.HashSet; import java.util.LinkedHashMap; import java.util.LinkedList; import java.util.List; @@ -43,9 +42,12 @@ import org.apache.hadoop.hive.conf.HiveConf; import org.apache.hadoop.hive.ql.Context; import org.apache.hadoop.hive.ql.DriverContext; +import org.apache.hadoop.hive.ql.QueryInfo; +import org.apache.hadoop.hive.ql.exec.FileSinkOperator; import org.apache.hadoop.hive.ql.exec.Operator; import org.apache.hadoop.hive.ql.exec.Task; import org.apache.hadoop.hive.ql.exec.Utilities; +import org.apache.hadoop.hive.ql.exec.tez.monitoring.TezJobMonitor; import org.apache.hadoop.hive.ql.log.PerfLogger; import org.apache.hadoop.hive.ql.metadata.HiveException; import org.apache.hadoop.hive.ql.plan.BaseWork; @@ -59,6 +61,7 @@ import org.apache.hadoop.hive.ql.plan.UnionWork; import org.apache.hadoop.hive.ql.plan.api.StageType; import org.apache.hadoop.hive.ql.session.SessionState; +import org.apache.hadoop.hive.ql.wm.TriggerContext; import org.apache.hadoop.mapred.JobConf; import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.util.StringUtils; @@ -84,7 +87,6 @@ import org.apache.tez.dag.api.client.StatusGetOpts; import org.apache.tez.dag.api.client.VertexStatus; import org.json.JSONObject; -import org.apache.hadoop.hive.ql.exec.tez.monitoring.TezJobMonitor; import com.google.common.annotations.VisibleForTesting; @@ -149,14 +151,23 @@ public int execute(DriverContext driverContext) { if (session != null && !session.isOpen()) { LOG.warn("The session: " + session + " has not been opened"); } + Set<String> desiredCounters = new HashSet<>(); if (WorkloadManager.isInUse(ss.getConf())) { + WorkloadManager wm = WorkloadManager.getInstance(); // TODO: in future, we may also pass getUserIpAddress. // Note: for now this will just block to wait for a session based on parallelism.
- session = WorkloadManager.getInstance().getSession(session, ss.getUserName(), conf); + session = wm.getSession(session, ss.getUserName(), conf); + desiredCounters.addAll(wm.getTriggerCounterNames()); } else { - session = TezSessionPoolManager.getInstance().getSession( - session, conf, false, getWork().getLlapMode()); + TezSessionPoolManager pm = TezSessionPoolManager.getInstance(); + session = pm.getSession(session, conf, false, getWork().getLlapMode()); + desiredCounters.addAll(pm.getTriggerCounterNames()); } + + TriggerContext triggerContext = ctx.getTriggerContext(); + triggerContext.setDesiredCounters(desiredCounters); + session.setTriggerContext(triggerContext); + LOG.info("Subscribed to counters: {} for queryId: {}", desiredCounters, triggerContext.getQueryId()); ss.setTezSession(session); try { // jobConf will hold all the configuration for hadoop, tez, and hive @@ -219,7 +230,7 @@ public int execute(DriverContext driverContext) { } // finally monitor will print progress until the job is done - TezJobMonitor monitor = new TezJobMonitor(work.getAllWork(),dagClient, conf, dag, ctx); + TezJobMonitor monitor = new TezJobMonitor(work.getAllWork(), dagClient, conf, dag, ctx); rc = monitor.monitorExecution(); if (rc != 0) { diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TriggerValidatorRunnable.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TriggerValidatorRunnable.java new file mode 100644 index 0000000..42cb3d8 --- /dev/null +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TriggerValidatorRunnable.java @@ -0,0 +1,73 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hadoop.hive.ql.exec.tez; + +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +import org.apache.hadoop.hive.ql.wm.SessionTriggerProvider; +import org.apache.hadoop.hive.ql.wm.Trigger; +import org.apache.hadoop.hive.ql.wm.TriggerActionHandler; +import org.apache.hadoop.hive.ql.wm.TriggerContext; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class TriggerValidatorRunnable implements Runnable { + protected static transient Logger LOG = LoggerFactory.getLogger(TriggerValidatorRunnable.class); + private final SessionTriggerProvider sessionTriggerProvider; + private final TriggerActionHandler triggerActionHandler; + + TriggerValidatorRunnable(final SessionTriggerProvider sessionTriggerProvider, + final TriggerActionHandler triggerActionHandler) { + this.sessionTriggerProvider = sessionTriggerProvider; + this.triggerActionHandler = triggerActionHandler; + } + + @Override + public void run() { + try { + Map<TezSessionState, Trigger.Action> violatedSessions = new HashMap<>(); + final List<TezSessionState> sessions = sessionTriggerProvider.getOpenSessions(); + final List<Trigger> triggers = sessionTriggerProvider.getActiveTriggers(); + for (TezSessionState s : sessions) { + TriggerContext triggerContext = s.getTriggerContext(); + if (triggerContext != null) { + Map<String, Long> currentCounters = triggerContext.getCurrentCounters(); + for (Trigger t : triggers) { + String desiredCounter = t.getExpression().getCounterLimit().getName(); + // there could be an interval where the desired counter value is not yet populated by the time we make this check + if (currentCounters.containsKey(desiredCounter)) { + if (t.apply(currentCounters.get(desiredCounter))) { + String queryId = s.getTriggerContext().getQueryId(); + LOG.info("Query {} violated trigger {}. Going to apply action {}", queryId, t, t.getAction()); + violatedSessions.put(s, t.getAction()); + } + } + } + } + } + + if (!violatedSessions.isEmpty()) { + triggerActionHandler.applyAction(violatedSessions); + } + } catch (Throwable t) { + // if exception is thrown in scheduled tasks, no further tasks will be scheduled, hence this ugly catch + LOG.warn(TriggerValidatorRunnable.class.getSimpleName() + " caught exception.", t); + } + } +} diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TriggerViolationActionHandler.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TriggerViolationActionHandler.java new file mode 100644 index 0000000..85b5049 --- /dev/null +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TriggerViolationActionHandler.java @@ -0,0 +1,49 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License.
+ */ + +package org.apache.hadoop.hive.ql.exec.tez; + +import java.util.Map; + +import org.apache.hadoop.hive.ql.metadata.HiveException; +import org.apache.hadoop.hive.ql.wm.Trigger; +import org.apache.hadoop.hive.ql.wm.TriggerActionHandler; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class TriggerViolationActionHandler implements TriggerActionHandler { + private static final Logger LOG = LoggerFactory.getLogger(TriggerViolationActionHandler.class); + + @Override + public void applyAction(final Map<TezSessionState, Trigger.Action> queriesViolated) { + for (Map.Entry<TezSessionState, Trigger.Action> entry : queriesViolated.entrySet()) { + switch (entry.getValue()) { + case KILL_QUERY: + TezSessionState sessionState = entry.getKey(); + String queryId = sessionState.getTriggerContext().getQueryId(); + try { + sessionState.getKillQuery().killQuery(queryId); + } catch (HiveException e) { + LOG.warn("Unable to kill query {} for trigger violation", queryId, e); + } + break; + case MOVE_TO_POOL: + // TODO + break; + } + } + } +} diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/WorkloadManager.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/WorkloadManager.java index 3f62127..104b69f 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/WorkloadManager.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/WorkloadManager.java @@ -17,20 +17,23 @@ */ package org.apache.hadoop.hive.ql.exec.tez; -import java.util.concurrent.TimeoutException; - -import java.util.concurrent.TimeUnit; - import java.util.ArrayList; +import java.util.Collections; import java.util.HashMap; -import java.util.IdentityHashMap; +import java.util.LinkedList; import java.util.List; +import java.util.Map; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; import java.util.concurrent.locks.ReentrantReadWriteLock; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hive.conf.HiveConf; import org.apache.hadoop.hive.conf.HiveConf.ConfVars; import org.apache.hadoop.hive.ql.metadata.HiveException; +import org.apache.hadoop.hive.ql.wm.SessionTriggerProvider; +import org.apache.hadoop.hive.ql.wm.Trigger; +import org.apache.hadoop.hive.ql.wm.TriggerActionHandler; import org.apache.hadoop.io.Text; import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.security.token.Token; @@ -45,8 +48,7 @@ /** Workload management entry point for HS2. */ -public class WorkloadManager - implements TezSessionPoolSession.Manager, SessionExpirationTracker.RestartImpl { +public class WorkloadManager extends TezSessionPoolSession.Manager implements SessionExpirationTracker.RestartImpl { private static final Logger LOG = LoggerFactory.getLogger(WorkloadManager.class); // TODO: this is a temporary setting that will go away, so it's not in HiveConf. public static final String TEST_WM_CONFIG = "hive.test.workload.management"; @@ -57,20 +59,24 @@ private final RestrictedConfigChecker restrictedConfig; private final QueryAllocationManager allocationManager; private final String yarnQueue; - // TODO: it's not clear that we need to track this - unlike PoolManager we don't have non-pool - // sessions, so the pool itself could internally track the sessions it gave out, since - // calling close on an unopened session is probably harmless. - private final IdentityHashMap<TezSessionPoolSession, Boolean> openSessions = - new IdentityHashMap<>(); /** Sessions given out (i.e. between get... and return... calls), separated by Hive pool.
*/ private final ReentrantReadWriteLock poolsLock = new ReentrantReadWriteLock(); - private final HashMap<String, PoolState> pools = new HashMap<>(); + private final List<TezSessionPoolSession> openSessions = new LinkedList<>(); + private final Map<String, PoolState> pools = new HashMap<>(); private final int amRegistryTimeoutMs; + private SessionTriggerProvider sessionTriggerProvider; + private TriggerActionHandler triggerActionHandler; + private TriggerValidatorRunnable triggerValidatorRunnable; - private static class PoolState { + public static class PoolState { // Add stuff here as WM is implemented. private final Object lock = new Object(); private final List<WmTezSession> sessions = new ArrayList<>(); + private final List<Trigger> triggers = new ArrayList<>(); + + public List<Trigger> getTriggers() { + return triggers; + } } // TODO: this is temporary before HiveServerEnvironment is merged. @@ -114,7 +120,6 @@ public static WorkloadManager create(String yarnQueue, HiveConf conf) { this.yarnQueue = yarnQueue; this.conf = conf; initializeHivePools(); - this.amRegistryTimeoutMs = (int)HiveConf.getTimeVar( conf, ConfVars.HIVE_SERVER2_TEZ_WM_AM_REGISTRY_TIMEOUT, TimeUnit.MILLISECONDS); sessions = new TezSessionPool<>(conf, numSessions, true); @@ -125,12 +130,18 @@ public static WorkloadManager create(String yarnQueue, HiveConf conf) { for (int i = 0; i < numSessions; i++) { sessions.addInitialSession(createSession()); } + // TODO: add support for per pool action handler and triggers fetcher (+atomic update to active triggers) + sessionTriggerProvider = new SessionTriggerProvider(); + triggerActionHandler = new TriggerViolationActionHandler(); + triggerValidatorRunnable = new TriggerValidatorRunnable(getSessionTriggerProvider(), getTriggerActionHandler()); + startManager(conf); } private void initializeHivePools() { // TODO: real implementation poolsLock.writeLock().lock(); try { + // FIXME: Add Triggers from metastore to poolstate pools.put("llap", new PoolState()); } finally { poolsLock.writeLock().unlock(); } @@ -195,6 +206,7 @@ private void redistributePoolAllocations( poolsLock.readLock().unlock(); } allocationManager.updateSessionsAsync(totalAlloc, sessionsToUpdate); + updateSessionsTriggers(); } private WmTezSession checkSessionForReuse(TezSessionState session) throws Exception { @@ -259,15 +271,19 @@ public void start() throws Exception { public void stop() throws Exception { List<TezSessionPoolSession> sessionsToClose = null; synchronized (openSessions) { - sessionsToClose = new ArrayList<>(openSessions.keySet()); + sessionsToClose = new ArrayList<>(openSessions); } - for (TezSessionState sessionState : sessionsToClose) { + + for (TezSessionPoolSession sessionState : sessionsToClose) { sessionState.close(false); } + if (expirationTracker != null) { expirationTracker.stop(); } allocationManager.stop(); + + INSTANCE = null; } private WmTezSession createSession() { @@ -317,8 +333,9 @@ private WmTezSession ensureOwnedSession(TezSessionState oldSession) { @Override public void registerOpenSession(TezSessionPoolSession session) { synchronized (openSessions) { - openSessions.put(session, true); + openSessions.add(session); } + updateSessionsTriggers(); } /** Called by TezSessionPoolSession when closed.
*/ @@ -327,6 +344,18 @@ public void unregisterOpenSession(TezSessionPoolSession session) { synchronized (openSessions) { openSessions.remove(session); } + updateSessionsTriggers(); + } + + private void updateSessionsTriggers() { + List<TezSessionState> openSessions = new ArrayList<>(); + List<Trigger> activeTriggers = new ArrayList<>(); + for (PoolState poolState : pools.values()) { + activeTriggers.addAll(poolState.getTriggers()); + openSessions.addAll(poolState.sessions); + } + sessionTriggerProvider.setOpenSessions(Collections.unmodifiableList(openSessions)); + sessionTriggerProvider.setActiveTriggers(Collections.unmodifiableList(activeTriggers)); } @VisibleForTesting @@ -363,7 +392,36 @@ public void destroy(TezSessionState session) throws Exception { redistributePoolAllocations(wmSession.getPoolName(), null, wmSession); } + @Override + SessionTriggerProvider getSessionTriggerProvider() { + return sessionTriggerProvider; + } + + @Override + TriggerActionHandler getTriggerActionHandler() { + return triggerActionHandler; + } + + @Override + TriggerValidatorRunnable getTriggerValidatorRunnable() { + return triggerValidatorRunnable; + } + + @VisibleForTesting + public Map<String, PoolState> getPools() { + return pools; + } + protected final HiveConf getConf() { + return conf; + } + + public List<String> getTriggerCounterNames() { + List<Trigger> activeTriggers = sessionTriggerProvider.getActiveTriggers(); + List<String> counterNames = new ArrayList<>(); + for (Trigger trigger : activeTriggers) { + counterNames.add(trigger.getExpression().getCounterLimit().getName()); + } + return counterNames; + } } diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/monitoring/TezJobMonitor.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/monitoring/TezJobMonitor.java index 0de9de5..ee63697 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/monitoring/TezJobMonitor.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/monitoring/TezJobMonitor.java @@ -18,20 +18,35 @@ Licensed to the Apache Software Foundation (ASF) under one package org.apache.hadoop.hive.ql.exec.tez.monitoring; -import com.google.common.base.Preconditions; +import static org.apache.tez.dag.api.client.DAGStatus.State.RUNNING; + +import java.io.IOException; +import java.io.InterruptedIOException; +import java.io.StringWriter; +import java.util.EnumSet; +import java.util.HashMap; +import java.util.LinkedList; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.TimeUnit; import org.apache.commons.lang3.exception.ExceptionUtils; +import org.apache.hadoop.hive.common.log.InPlaceUpdate; +import org.apache.hadoop.hive.common.log.ProgressMonitor; import org.apache.hadoop.hive.conf.HiveConf; import org.apache.hadoop.hive.ql.Context; import org.apache.hadoop.hive.ql.exec.Utilities; import org.apache.hadoop.hive.ql.exec.tez.TezSessionPoolManager; -import org.apache.hadoop.hive.common.log.InPlaceUpdate; import org.apache.hadoop.hive.ql.log.PerfLogger; -import org.apache.hadoop.hive.common.log.ProgressMonitor; import org.apache.hadoop.hive.ql.plan.BaseWork; import org.apache.hadoop.hive.ql.session.SessionState; import org.apache.hadoop.hive.ql.session.SessionState.LogHelper; +import org.apache.hadoop.hive.ql.wm.TimeCounterLimit; +import org.apache.hadoop.hive.ql.wm.TriggerContext; +import org.apache.hadoop.hive.ql.wm.VertexCounterLimit; import org.apache.hive.common.util.ShutdownHookManager; +import org.apache.tez.common.counters.CounterGroup; import org.apache.tez.common.counters.TezCounter; import org.apache.tez.common.counters.TezCounters; import
org.apache.tez.dag.api.DAG; @@ -41,17 +56,10 @@ Licensed to the Apache Software Foundation (ASF) under one import org.apache.tez.dag.api.client.Progress; import org.apache.tez.dag.api.client.StatusGetOpts; import org.apache.tez.util.StopWatch; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; -import java.io.IOException; -import java.io.InterruptedIOException; -import java.io.StringWriter; -import java.util.HashSet; -import java.util.LinkedList; -import java.util.List; -import java.util.Map; -import java.util.concurrent.TimeUnit; - -import static org.apache.tez.dag.api.client.DAGStatus.State.RUNNING; +import com.google.common.base.Preconditions; /** * TezJobMonitor keeps track of a tez job while it's being executed. It will @@ -61,6 +69,7 @@ Licensed to the Apache Software Foundation (ASF) under one public class TezJobMonitor { static final String CLASS_NAME = TezJobMonitor.class.getName(); + private static final Logger LOG = LoggerFactory.getLogger(CLASS_NAME); private static final int MIN_CHECK_INTERVAL = 200; private static final int MAX_CHECK_INTERVAL = 1000; private static final int MAX_RETRY_INTERVAL = 2500; @@ -146,7 +155,7 @@ public int monitorExecution() { DAGStatus.State lastState = null; boolean running = false; - int checkInterval = MIN_CHECK_INTERVAL; + long checkInterval = MIN_CHECK_INTERVAL; while (true) { try { @@ -154,8 +163,17 @@ public int monitorExecution() { context.checkHeartbeaterLockException(); } - status = dagClient.getDAGStatus(new HashSet(), checkInterval); + status = dagClient.getDAGStatus(EnumSet.of(StatusGetOpts.GET_COUNTERS), checkInterval); + TezCounters dagCounters = status.getDAGCounters(); vertexProgressMap = status.getVertexProgress(); + TriggerContext triggerContext = context.getTriggerContext(); + if (dagCounters != null && triggerContext != null) { + Set desiredCounters = triggerContext.getDesiredCounters(); + if (desiredCounters != null) { + Map currentCounters = getCounterValues(dagCounters, vertexProgressMap, desiredCounters); + triggerContext.setCurrentCounters(currentCounters); + } + } DAGStatus.State state = status.getState(); failedCounter = 0; // AM is responsive again (recovery?) @@ -235,8 +253,7 @@ public int monitorExecution() { } catch (IOException | TezException tezException) { // best effort } - console - .printError("Execution has failed. stack trace: " + ExceptionUtils.getStackTrace(e)); + console.printError("Execution has failed. stack trace: " + ExceptionUtils.getStackTrace(e)); rc = 1; done = true; } else { @@ -263,6 +280,50 @@ public int monitorExecution() { return rc; } + private Map getCounterValues(final TezCounters dagCounters, + final Map vertexProgressMap, + final Set desiredCounters) { + // DAG specific counters + Map updatedCounters = new HashMap<>(); + for (CounterGroup counterGroup : dagCounters) { + for (TezCounter tezCounter : counterGroup) { + String counterName = tezCounter.getName(); + if (desiredCounters.contains(counterName)) { + updatedCounters.put(counterName, tezCounter.getValue()); + } + } + } + + // Process per vertex counters. + String counterName = VertexCounterLimit.VertexCounter.TOTAL_TASKS.name(); + if (desiredCounters.contains(counterName) && vertexProgressMap != null) { + for (Map.Entry entry : vertexProgressMap.entrySet()) { + // TOTAL_TASKS counter is per vertex counter, but triggers are validated at query level + // looking for query level violations. So we always choose max TOTAL_TASKS among all vertices. 
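+        // e.g. (illustrative): if one vertex reports 900 total tasks and another reports 400, the
+        // value published for TOTAL_TASKS is 900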
+        // Publishing TOTAL_TASKS for all vertices is not really useful from the context of triggers.
+        long currentMax = 0;
+        if (updatedCounters.containsKey(counterName)) {
+          currentMax = updatedCounters.get(counterName);
+        }
+        long totalTasks = Math.max(currentMax, entry.getValue().getTotalTaskCount());
+        updatedCounters.put(counterName, totalTasks);
+      }
+    }
+
+    // Time based counters
+    counterName = TimeCounterLimit.TimeCounter.ELAPSED_TIME.name();
+    if (desiredCounters.contains(counterName)) {
+      updatedCounters.put(counterName, context.getTriggerContext().getQueryInfo().getElapsedTime());
+    }
+
+    counterName = TimeCounterLimit.TimeCounter.EXECUTION_TIME.name();
+    if (desiredCounters.contains(counterName) && executionStartTime > 0) {
+      updatedCounters.put(counterName, System.currentTimeMillis() - executionStartTime);
+    }
+
+    return updatedCounters;
+  }
+
   private void printSummary(boolean success, Map<String, Progress> progressMap) {
     if (isProfilingEnabled() && success && progressMap != null) {
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/session/SessionState.java b/ql/src/java/org/apache/hadoop/hive/ql/session/SessionState.java
index 4820fed..2bdb719 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/session/SessionState.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/session/SessionState.java
@@ -1830,12 +1830,14 @@ public void setTezSession(TezSessionState session) {
   if (tezSessionState == session) return; // The same object.
   if (tezSessionState != null) {
     tezSessionState.markFree();
+    tezSessionState.setKillQuery(null);
     tezSessionState = null;
   }
+  tezSessionState = session;
   if (session != null) {
     session.markInUse();
+    tezSessionState.setKillQuery(getKillQuery());
   }
-  tezSessionState = session;
 }
 
 public String getUserName() {
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/wm/CounterLimit.java b/ql/src/java/org/apache/hadoop/hive/ql/wm/CounterLimit.java
new file mode 100644
index 0000000..0fc5eaa
--- /dev/null
+++ b/ql/src/java/org/apache/hadoop/hive/ql/wm/CounterLimit.java
@@ -0,0 +1,43 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.ql.wm;
+
+/**
+ * Counter limit interface: pairs a counter name with a limit; when the counter exceeds the limit, the
+ * action defined in the trigger is executed.
+ */
+public interface CounterLimit {
+  /**
+   * Get the name of the counter.
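+   * For scheme-qualified file system counters this is e.g. {@code HDFS_BYTES_READ}; for time and vertex
+   * counters it is the enum name, e.g. {@code ELAPSED_TIME} or {@code TOTAL_TASKS}.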
+   *
+   * @return name
+   */
+  String getName();
+
+  /**
+   * Get the threshold value for the counter.
+   *
+   * @return limit
+   */
+  long getLimit();
+
+  /**
+   * Return a cloned copy of this counter limit.
+   *
+   * @return cloned copy
+   */
+  CounterLimit clone();
+}
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/wm/ExecutionTrigger.java b/ql/src/java/org/apache/hadoop/hive/ql/wm/ExecutionTrigger.java
new file mode 100644
index 0000000..3529011
--- /dev/null
+++ b/ql/src/java/org/apache/hadoop/hive/ql/wm/ExecutionTrigger.java
@@ -0,0 +1,88 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.ql.wm;
+
+import java.util.Objects;
+
+/**
+ * Trigger with query-level scope: a named trigger whose expression, when violated, causes the defined
+ * action to be executed.
+ */
+public class ExecutionTrigger implements Trigger {
+  private String name;
+  private Expression expression;
+  private Action action;
+
+  public ExecutionTrigger(final String name, final Expression expression, final Action action) {
+    this.name = name;
+    this.expression = expression;
+    this.action = action;
+  }
+
+  @Override
+  public String getName() {
+    return name;
+  }
+
+  @Override
+  public Expression getExpression() {
+    return expression;
+  }
+
+  @Override
+  public Action getAction() {
+    return action;
+  }
+
+  @Override
+  public Trigger clone() {
+    return new ExecutionTrigger(name, expression.clone(), action);
+  }
+
+  @Override
+  public boolean apply(final long current) {
+    return expression.evaluate(current);
+  }
+
+  @Override
+  public String toString() {
+    return "{ name: " + name + ", expression: " + expression + ", action: " + action + " }";
+  }
+
+  @Override
+  public int hashCode() {
+    int hash = name == null ? 31 : 31 * name.hashCode();
+    hash += expression == null ? 31 * hash : 31 * hash * expression.hashCode();
+    hash += action == null ? 31 * hash : 31 * hash * action.hashCode();
+    return hash;
+  }
+
+  @Override
+  public boolean equals(final Object other) {
+    if (other == null || !(other instanceof ExecutionTrigger)) {
+      return false;
+    }
+
+    if (other == this) {
+      return true;
+    }
+
+    ExecutionTrigger otherQR = (ExecutionTrigger) other;
+    return Objects.equals(name, otherQR.name) &&
+        Objects.equals(expression, otherQR.expression) &&
+        Objects.equals(action, otherQR.action);
+  }
+}
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/wm/Expression.java b/ql/src/java/org/apache/hadoop/hive/ql/wm/Expression.java
new file mode 100644
index 0000000..add933f
--- /dev/null
+++ b/ql/src/java/org/apache/hadoop/hive/ql/wm/Expression.java
@@ -0,0 +1,74 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.ql.wm;
+
+/**
+ * Expression that is defined in triggers.
+ * Most expressions fire only after a limit is exceeded, so only the greater-than (>) predicate is
+ * supported.
+ */
+public interface Expression {
+
+  enum Predicate {
+    GREATER_THAN(">");
+
+    String symbol;
+
+    Predicate(final String symbol) {
+      this.symbol = symbol;
+    }
+
+    public String getSymbol() {
+      return symbol;
+    }
+  }
+
+  interface Builder {
+    Builder greaterThan(CounterLimit counter);
+
+    Expression build();
+  }
+
+  /**
+   * Evaluate the current value against this expression. Returns true if the expression evaluates to
+   * true (current > limit), false otherwise.
+   *
+   * @param current - current value against which expression will be evaluated
+   * @return true if the expression is violated, false otherwise
+   */
+  boolean evaluate(final long current);
+
+  /**
+   * Return the counter limit.
+   *
+   * @return counter limit
+   */
+  CounterLimit getCounterLimit();
+
+  /**
+   * Return the predicate defined in the expression.
+   *
+   * @return predicate
+   */
+  Predicate getPredicate();
+
+  /**
+   * Return a cloned copy of this expression.
+   *
+   * @return cloned copy
+   */
+  Expression clone();
+}
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/wm/ExpressionFactory.java b/ql/src/java/org/apache/hadoop/hive/ql/wm/ExpressionFactory.java
new file mode 100644
index 0000000..f16125d
--- /dev/null
+++ b/ql/src/java/org/apache/hadoop/hive/ql/wm/ExpressionFactory.java
@@ -0,0 +1,123 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.ql.wm;
+
+import java.util.concurrent.TimeUnit;
+
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.conf.Validator;
+
+/**
+ * Factory to create expressions.
+ */
+public class ExpressionFactory {
+
+  public static Expression fromString(final String expression) {
+    if (expression == null || expression.isEmpty()) {
+      return null;
+    }
+
+    // TODO: Only ">" predicate is supported right now; this has to be extended to support an
+    // expression tree when multiple conditions are required. HIVE-17622
+
+    String[] tokens = expression.split(Expression.Predicate.GREATER_THAN.getSymbol());
+    if (tokens.length != 2) {
+      throw new IllegalArgumentException("Invalid predicate in expression");
+    }
+
+    final String counterName = tokens[0].trim();
+    final String counterValueStr = tokens[1].trim();
+    if (counterName.isEmpty()) {
+      throw new IllegalArgumentException("Counter name cannot be empty!");
+    }
+
+    // look for matches in file system counters
+    long counterValue;
+    for (FileSystemCounterLimit.FSCounter fsCounter : FileSystemCounterLimit.FSCounter.values()) {
+      if (counterName.toUpperCase().endsWith(fsCounter.name())) {
+        try {
+          counterValue = getCounterValue(counterValueStr, new Validator.SizeValidator());
+          if (counterValue < 0) {
+            throw new IllegalArgumentException("Illegal value for counter limit. Expected a positive long value.");
+          }
+        } catch (NumberFormatException e) {
+          throw new IllegalArgumentException("Invalid counter value: " + counterValueStr);
+        }
+        // this is a valid file system counter; create the corresponding limit
+        FileSystemCounterLimit fsCounterLimit = FileSystemCounterLimit.fromName(counterName, counterValue);
+        return createExpression(fsCounterLimit);
+      }
+    }
+
+    // look for matches in time based counters
+    for (TimeCounterLimit.TimeCounter timeCounter : TimeCounterLimit.TimeCounter.values()) {
+      if (counterName.equalsIgnoreCase(timeCounter.name())) {
+        try {
+          counterValue = getCounterValue(counterValueStr, new Validator.TimeValidator(TimeUnit.MILLISECONDS));
+          if (counterValue < 0) {
+            throw new IllegalArgumentException("Illegal value for counter limit. Expected a positive long value.");
+          }
+        } catch (NumberFormatException e) {
+          throw new IllegalArgumentException("Invalid counter value: " + counterValueStr);
+        }
+        TimeCounterLimit timeCounterLimit = new TimeCounterLimit(
+            TimeCounterLimit.TimeCounter.valueOf(counterName.toUpperCase()), counterValue);
+        return createExpression(timeCounterLimit);
+      }
+    }
+
+    // look for matches in vertex specific counters
+    for (VertexCounterLimit.VertexCounter vertexCounter : VertexCounterLimit.VertexCounter.values()) {
+      if (counterName.equalsIgnoreCase(vertexCounter.name())) {
+        try {
+          counterValue = getCounterValue(counterValueStr, null);
+          if (counterValue < 0) {
+            throw new IllegalArgumentException("Illegal value for counter limit. Expected a positive long value.");
+          }
+        } catch (NumberFormatException e) {
+          throw new IllegalArgumentException("Invalid counter value: " + counterValueStr);
+        }
+        VertexCounterLimit vertexCounterLimit = new VertexCounterLimit(
+            VertexCounterLimit.VertexCounter.valueOf(counterName.toUpperCase()), counterValue);
+        return createExpression(vertexCounterLimit);
+      }
+    }
+    // unable to create expression at this point, invalid expression
+    throw new IllegalArgumentException("Invalid expression! " + expression);
+  }
+
+  private static long getCounterValue(final String counterValueStr, final Validator validator) throws
+      NumberFormatException {
+    long counter;
+    try {
+      counter = Long.parseLong(counterValueStr);
+    } catch (NumberFormatException e) {
+      if (validator != null) {
+        if (validator instanceof Validator.SizeValidator) {
+          return HiveConf.toSizeBytes(counterValueStr);
+        } else if (validator instanceof Validator.TimeValidator) {
+          return HiveConf.toTime(counterValueStr, TimeUnit.MILLISECONDS, TimeUnit.MILLISECONDS);
+        }
+      }
+      throw e;
+    }
+    return counter;
+  }
+
+  static Expression createExpression(CounterLimit counterLimit) {
+    return new TriggerExpression(counterLimit, Expression.Predicate.GREATER_THAN);
+  }
+}
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/wm/FileSystemCounterLimit.java b/ql/src/java/org/apache/hadoop/hive/ql/wm/FileSystemCounterLimit.java
new file mode 100644
index 0000000..656747e
--- /dev/null
+++ b/ql/src/java/org/apache/hadoop/hive/ql/wm/FileSystemCounterLimit.java
@@ -0,0 +1,98 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.ql.wm;
+
+/**
+ * File system specific counters with defined limits.
+ */
+public class FileSystemCounterLimit implements CounterLimit {
+
+  public enum FSCounter {
+    BYTES_READ,
+    BYTES_WRITTEN,
+    SHUFFLE_BYTES
+  }
+
+  private String scheme;
+  private FSCounter fsCounter;
+  private long limit;
+
+  public FileSystemCounterLimit(final String scheme, final FSCounter fsCounter, final long limit) {
+    this.scheme = scheme == null || scheme.isEmpty() ? "" : scheme.toUpperCase();
+    this.fsCounter = fsCounter;
+    this.limit = limit;
+  }
+
+  public static FileSystemCounterLimit fromName(final String counterName, final long limit) {
+    String counterNameStr = counterName.toUpperCase();
+    for (FSCounter fsCounter : FSCounter.values()) {
+      if (counterNameStr.endsWith(fsCounter.name())) {
+        int startIdx = counterNameStr.indexOf(fsCounter.name());
+        if (startIdx == 0) {
+          // exact match; use the uppercased name so that mixed-case input still resolves to the enum
+          return new FileSystemCounterLimit(null, FSCounter.valueOf(counterNameStr), limit);
+        } else {
+          String scheme = counterNameStr.substring(0, startIdx - 1);
+          // scheme/counter name validation will be done in grammar as part of HIVE-17622
+          return new FileSystemCounterLimit(scheme, FSCounter.valueOf(fsCounter.name()), limit);
+        }
+      }
+    }
+
+    throw new IllegalArgumentException("Invalid counter name specified " + counterNameStr);
+  }
+
+  @Override
+  public String getName() {
+    return scheme.isEmpty() ?
fsCounter.name() : scheme.toUpperCase() + "_" + fsCounter.name(); + } + + @Override + public long getLimit() { + return limit; + } + + @Override + public CounterLimit clone() { + return new FileSystemCounterLimit(scheme, fsCounter, limit); + } + + @Override + public String toString() { + return "counter: " + getName() + " limit: " + limit; + } + + @Override + public int hashCode() { + int hash = 31 * scheme.hashCode(); + hash += 31 * fsCounter.hashCode(); + hash += 31 * limit; + return 31 * hash; + } + + @Override + public boolean equals(final Object other) { + if (other == null || !(other instanceof FileSystemCounterLimit)) { + return false; + } + + if (other == this) { + return true; + } + + FileSystemCounterLimit otherFscl = (FileSystemCounterLimit) other; + return scheme.equals(otherFscl.scheme) && fsCounter.equals(otherFscl.fsCounter) && limit == otherFscl.limit; + } +} diff --git a/ql/src/java/org/apache/hadoop/hive/ql/wm/MetastoreGlobalTriggersFetcher.java b/ql/src/java/org/apache/hadoop/hive/ql/wm/MetastoreGlobalTriggersFetcher.java new file mode 100644 index 0000000..db1a037 --- /dev/null +++ b/ql/src/java/org/apache/hadoop/hive/ql/wm/MetastoreGlobalTriggersFetcher.java @@ -0,0 +1,42 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hive.ql.wm; + +import java.util.List; + +import org.apache.hadoop.hive.ql.metadata.Hive; + +/** + * Fetch global (non-llap) rules from metastore + */ +public class MetastoreGlobalTriggersFetcher implements TriggersFetcher { + public static final String GLOBAL_TRIGGER_NAME = "global"; + private final MetastoreResourcePlanTriggersFetcher rpTriggersFetcher; + + public MetastoreGlobalTriggersFetcher(final Hive db) { + this.rpTriggersFetcher = new MetastoreResourcePlanTriggersFetcher(db); + } + + @Override + public List fetch(final String ignore) { + return fetch(); + } + + public List fetch() { + // TODO: + return rpTriggersFetcher.fetch(GLOBAL_TRIGGER_NAME); + } +} diff --git a/ql/src/java/org/apache/hadoop/hive/ql/wm/MetastoreResourcePlanTriggersFetcher.java b/ql/src/java/org/apache/hadoop/hive/ql/wm/MetastoreResourcePlanTriggersFetcher.java new file mode 100644 index 0000000..db390f2 --- /dev/null +++ b/ql/src/java/org/apache/hadoop/hive/ql/wm/MetastoreResourcePlanTriggersFetcher.java @@ -0,0 +1,38 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hive.ql.wm; + +import java.util.ArrayList; +import java.util.List; + +import org.apache.hadoop.hive.ql.metadata.Hive; + +/** + * Fetch pool specific rules from metastore + */ +public class MetastoreResourcePlanTriggersFetcher implements TriggersFetcher { + private final Hive db; + + public MetastoreResourcePlanTriggersFetcher(final Hive db) { + this.db = db; + } + + @Override + public List fetch(final String resourcePlanName) { + // TODO: implement after integration. + return new ArrayList<>(); + } +} diff --git a/ql/src/java/org/apache/hadoop/hive/ql/wm/SessionTriggerProvider.java b/ql/src/java/org/apache/hadoop/hive/ql/wm/SessionTriggerProvider.java new file mode 100644 index 0000000..408aa2d --- /dev/null +++ b/ql/src/java/org/apache/hadoop/hive/ql/wm/SessionTriggerProvider.java @@ -0,0 +1,56 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hive.ql.wm; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.LinkedList; +import java.util.List; + +import org.apache.hadoop.hive.ql.exec.tez.TezSessionState; + +/** + * Implementation for providing current open sessions and active trigger. + */ +public class SessionTriggerProvider { + private List openSessions = new ArrayList<>(); + private List activeTriggers = new ArrayList<>(); + + public SessionTriggerProvider() { + + } + + public SessionTriggerProvider(final List openSessions, final List triggers) { + this.openSessions = openSessions; + this.activeTriggers = triggers; + } + + public void setOpenSessions(final List openSessions) { + this.openSessions = openSessions; + } + + public void setActiveTriggers(final List activeTriggers) { + this.activeTriggers = activeTriggers; + } + + public List getOpenSessions() { + return Collections.unmodifiableList(openSessions); + } + + public List getActiveTriggers() { + return Collections.unmodifiableList(activeTriggers); + } +} diff --git a/ql/src/java/org/apache/hadoop/hive/ql/wm/TimeCounterLimit.java b/ql/src/java/org/apache/hadoop/hive/ql/wm/TimeCounterLimit.java new file mode 100644 index 0000000..3c16e1d --- /dev/null +++ b/ql/src/java/org/apache/hadoop/hive/ql/wm/TimeCounterLimit.java @@ -0,0 +1,75 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.ql.wm;
+
+/**
+ * Time based counters with limits.
+ */
+public class TimeCounterLimit implements CounterLimit {
+  public enum TimeCounter {
+    ELAPSED_TIME,
+    EXECUTION_TIME
+  }
+
+  private TimeCounter timeCounter;
+  private long limit;
+
+  public TimeCounterLimit(final TimeCounter timeCounter, final long limit) {
+    this.timeCounter = timeCounter;
+    this.limit = limit;
+  }
+
+  @Override
+  public String getName() {
+    return timeCounter.name();
+  }
+
+  @Override
+  public long getLimit() {
+    return limit;
+  }
+
+  @Override
+  public CounterLimit clone() {
+    return new TimeCounterLimit(timeCounter, limit);
+  }
+
+  @Override
+  public String toString() {
+    return "counter: " + timeCounter.name() + " limit: " + limit;
+  }
+
+  @Override
+  public int hashCode() {
+    int hash = 31 * timeCounter.hashCode();
+    hash += 31 * limit;
+    return 31 * hash;
+  }
+
+  @Override
+  public boolean equals(final Object other) {
+    if (other == null || !(other instanceof TimeCounterLimit)) {
+      return false;
+    }
+
+    if (other == this) {
+      return true;
+    }
+
+    TimeCounterLimit otherTcl = (TimeCounterLimit) other;
+    return timeCounter.equals(otherTcl.timeCounter) && limit == otherTcl.limit;
+  }
+}
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/wm/Trigger.java b/ql/src/java/org/apache/hadoop/hive/ql/wm/Trigger.java
new file mode 100644
index 0000000..bed0ac1
--- /dev/null
+++ b/ql/src/java/org/apache/hadoop/hive/ql/wm/Trigger.java
@@ -0,0 +1,80 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.ql.wm;
+
+/**
+ * Trigger interface, which maps to CREATE TRIGGER .. queries. A trigger has a name, an expression and
+ * an action. The expression is evaluated during the lifecycle of a query, and when it evaluates to
+ * true the trigger's action is executed.
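+ * <p>
+ * For illustration (mirroring the usage in TestTrigger), a trigger can be built from a string
+ * expression and applied to the current value of the counter it watches:
+ * <pre>{@code
+ *   Expression expression = ExpressionFactory.fromString("ELAPSED_TIME > 300");
+ *   Trigger trigger = new ExecutionTrigger("slow_query", expression, Trigger.Action.KILL_QUERY);
+ *   trigger.apply(1000); // true: 1000 > 300, so the KILL_QUERY action would be taken
+ * }</pre>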
+ */ +public interface Trigger { + + enum Action { + KILL_QUERY(""), + MOVE_TO_POOL(""); + + String poolName; + + Action(final String poolName) { + this.poolName = poolName; + } + + public Action setPoolName(final String poolName) { + this.poolName = poolName; + return this; + } + + public String getPoolName() { + return poolName; + } + } + + /** + * Based on current value, returns true if trigger is applied else false. + * + * @param current - current value + * @return true if trigger got applied false otherwise + */ + boolean apply(long current); + + /** + * Get trigger expression + * + * @return expression + */ + Expression getExpression(); + + /** + * Return the name of the trigger + * + * @return trigger name + */ + String getName(); + + /** + * Return the action that will get executed when trigger expression evaluates to true + * + * @return action + */ + Action getAction(); + + /** + * Return cloned copy of this trigger + * + * @return clone copy + */ + Trigger clone(); +} diff --git a/ql/src/java/org/apache/hadoop/hive/ql/wm/TriggerActionHandler.java b/ql/src/java/org/apache/hadoop/hive/ql/wm/TriggerActionHandler.java new file mode 100644 index 0000000..5cd24d5 --- /dev/null +++ b/ql/src/java/org/apache/hadoop/hive/ql/wm/TriggerActionHandler.java @@ -0,0 +1,32 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hive.ql.wm; + +import java.util.Map; + +import org.apache.hadoop.hive.ql.exec.tez.TezSessionState; + +/** + * Interface for handling rule violations by queries and for performing actions defined in the rules. + */ +public interface TriggerActionHandler { + /** + * Applies the action defined in the rule for the specified queries + * + * @param queriesViolated - violated queries and the rule it violated + */ + void applyAction(Map queriesViolated); +} diff --git a/ql/src/java/org/apache/hadoop/hive/ql/wm/TriggerContext.java b/ql/src/java/org/apache/hadoop/hive/ql/wm/TriggerContext.java new file mode 100644 index 0000000..d15283a --- /dev/null +++ b/ql/src/java/org/apache/hadoop/hive/ql/wm/TriggerContext.java @@ -0,0 +1,64 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.ql.wm;
+
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.hadoop.hive.ql.QueryInfo;
+
+/**
+ * Context information that is required for rule evaluation.
+ */
+public class TriggerContext {
+  private Set<String> desiredCounters = new HashSet<>();
+  private Map<String, Long> currentCounters = new HashMap<>();
+  private QueryInfo queryInfo;
+
+  public TriggerContext(final QueryInfo queryInfo) {
+    this.queryInfo = queryInfo;
+  }
+
+  public QueryInfo getQueryInfo() {
+    return queryInfo;
+  }
+
+  public void setQueryInfo(final QueryInfo queryInfo) {
+    this.queryInfo = queryInfo;
+  }
+
+  public String getQueryId() {
+    return queryInfo.getQueryDisplay().getQueryId();
+  }
+
+  public Set<String> getDesiredCounters() {
+    return desiredCounters;
+  }
+
+  public void setDesiredCounters(final Set<String> desiredCounters) {
+    this.desiredCounters = desiredCounters;
+  }
+
+  public Map<String, Long> getCurrentCounters() {
+    return currentCounters;
+  }
+
+  public void setCurrentCounters(final Map<String, Long> currentCounters) {
+    this.currentCounters = currentCounters;
+  }
+}
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/wm/TriggerExpression.java b/ql/src/java/org/apache/hadoop/hive/ql/wm/TriggerExpression.java
new file mode 100644
index 0000000..065ab79
--- /dev/null
+++ b/ql/src/java/org/apache/hadoop/hive/ql/wm/TriggerExpression.java
@@ -0,0 +1,82 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.ql.wm;
+
+import java.util.Objects;
+
+/**
+ * Simple trigger expression for a rule.
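+ * For example, {@code new TriggerExpression(new FileSystemCounterLimit("hdfs", FSCounter.BYTES_READ, 1024),
+ * Predicate.GREATER_THAN)} evaluates to true for any current value greater than 1024.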
+ */ +public class TriggerExpression implements Expression { + private CounterLimit counterLimit; + private Predicate predicate; + + public TriggerExpression(final CounterLimit counter, final Predicate predicate) { + this.counterLimit = counter; + this.predicate = predicate; + } + + @Override + public boolean evaluate(final long current) { + if (counterLimit.getLimit() > 0) { + if (predicate.equals(Predicate.GREATER_THAN)) { + return current > counterLimit.getLimit(); + } + } + return false; + } + + @Override + public CounterLimit getCounterLimit() { + return counterLimit; + } + + @Override + public Predicate getPredicate() { + return predicate; + } + + @Override + public Expression clone() { + return new TriggerExpression(counterLimit.clone(), predicate); + } + + @Override + public String toString() { + return counterLimit.getName() + " " + predicate.getSymbol() + " " + counterLimit.getLimit(); + } + + @Override + public int hashCode() { + int hash = counterLimit == null ? 31 : 31 * counterLimit.hashCode(); + hash += predicate == null ? 31 * hash : 31 * hash * predicate.hashCode(); + return 31 * hash; + } + + @Override + public boolean equals(final Object other) { + if (other == null || !(other instanceof TriggerExpression)) { + return false; + } + + if (other == this) { + return true; + } + + return Objects.equals(counterLimit, ((TriggerExpression) other).counterLimit) && + Objects.equals(predicate, ((TriggerExpression) other).predicate); + } +} diff --git a/ql/src/java/org/apache/hadoop/hive/ql/wm/TriggersFetcher.java b/ql/src/java/org/apache/hadoop/hive/ql/wm/TriggersFetcher.java new file mode 100644 index 0000000..c25ea3c --- /dev/null +++ b/ql/src/java/org/apache/hadoop/hive/ql/wm/TriggersFetcher.java @@ -0,0 +1,25 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hive.ql.wm; + +import java.util.List; + +/** + * Interface to fetch rules + */ +public interface TriggersFetcher { + List fetch(final String resourcePlanName); +} diff --git a/ql/src/java/org/apache/hadoop/hive/ql/wm/VertexCounterLimit.java b/ql/src/java/org/apache/hadoop/hive/ql/wm/VertexCounterLimit.java new file mode 100644 index 0000000..dd19ce6 --- /dev/null +++ b/ql/src/java/org/apache/hadoop/hive/ql/wm/VertexCounterLimit.java @@ -0,0 +1,74 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hive.ql.wm; + +/** + * Vertex specific counters with limits + */ +public class VertexCounterLimit implements CounterLimit { + public enum VertexCounter { + TOTAL_TASKS + } + + private VertexCounter vertexCounter; + private long limit; + + public VertexCounterLimit(final VertexCounter vertexCounter, final long limit) { + this.vertexCounter = vertexCounter; + this.limit = limit; + } + + @Override + public String getName() { + return vertexCounter.name(); + } + + @Override + public long getLimit() { + return limit; + } + + @Override + public CounterLimit clone() { + return new VertexCounterLimit(vertexCounter, limit); + } + + @Override + public String toString() { + return "counter: " + vertexCounter.name() + " limit: " + limit; + } + + @Override + public int hashCode() { + int hash = 31 * vertexCounter.hashCode(); + hash += 31 * limit; + return 31 * hash; + } + + @Override + public boolean equals(final Object other) { + if (other == null || !(other instanceof VertexCounterLimit)) { + return false; + } + + if (other == this) { + return true; + } + + VertexCounterLimit otherVcl = (VertexCounterLimit) other; + return vertexCounter.equals(otherVcl.vertexCounter) && limit == otherVcl.limit; + } +} diff --git a/ql/src/test/org/apache/hadoop/hive/ql/exec/tez/TestTezSessionPool.java b/ql/src/test/org/apache/hadoop/hive/ql/exec/tez/TestTezSessionPool.java index 05eb761..829ea8c 100644 --- a/ql/src/test/org/apache/hadoop/hive/ql/exec/tez/TestTezSessionPool.java +++ b/ql/src/test/org/apache/hadoop/hive/ql/exec/tez/TestTezSessionPool.java @@ -48,7 +48,7 @@ public TestTezSessionPoolManager() { } @Override - public void setupPool(HiveConf conf) throws InterruptedException { + public void setupPool(HiveConf conf) throws Exception { conf.setVar(ConfVars.LLAP_TASK_SCHEDULER_AM_REGISTRY_NAME, ""); super.setupPool(conf); } diff --git a/ql/src/test/org/apache/hadoop/hive/ql/exec/tez/TestWorkloadManager.java b/ql/src/test/org/apache/hadoop/hive/ql/exec/tez/TestWorkloadManager.java index 7adf895..17c62cf 100644 --- a/ql/src/test/org/apache/hadoop/hive/ql/exec/tez/TestWorkloadManager.java +++ b/ql/src/test/org/apache/hadoop/hive/ql/exec/tez/TestWorkloadManager.java @@ -18,19 +18,27 @@ package org.apache.hadoop.hive.ql.exec.tez; -import static org.junit.Assert.*; -import static org.mockito.Mockito.*; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotSame; +import static org.junit.Assert.assertSame; +import static org.junit.Assert.assertTrue; +import static org.mockito.Mockito.anyBoolean; +import static org.mockito.Mockito.doNothing; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; + +import java.io.IOException; +import java.util.List; import org.apache.hadoop.hive.conf.HiveConf; import org.apache.hadoop.hive.conf.HiveConf.ConfVars; +import org.apache.hadoop.hive.ql.Context; import org.apache.tez.dag.api.TezConfiguration; - -import java.util.List; - import org.junit.Test; public class TestWorkloadManager { - private static class MockQam implements 
QueryAllocationManager { + public static class MockQam implements QueryAllocationManager { boolean isCalled = false; @Override @@ -52,10 +60,10 @@ void assertWasCalled() { } } - private static class WorkloadManagerForTest extends WorkloadManager { + public static class WorkloadManagerForTest extends WorkloadManager { - WorkloadManagerForTest(String yarnQueue, HiveConf conf, int numSessions, - QueryAllocationManager qam) { + public WorkloadManagerForTest(String yarnQueue, HiveConf conf, int numSessions, + QueryAllocationManager qam) { super(yarnQueue, conf, numSessions, qam, null); } diff --git a/ql/src/test/org/apache/hadoop/hive/ql/wm/TestTrigger.java b/ql/src/test/org/apache/hadoop/hive/ql/wm/TestTrigger.java new file mode 100644 index 0000000..ce1dc6e --- /dev/null +++ b/ql/src/test/org/apache/hadoop/hive/ql/wm/TestTrigger.java @@ -0,0 +1,362 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hive.ql.wm; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertNotEquals; +import static org.junit.Assert.assertTrue; + +import org.junit.Test; +import org.junit.rules.ExpectedException; + +/** + * + */ +public class TestTrigger { + @org.junit.Rule + public ExpectedException thrown = ExpectedException.none(); + + @Test + public void testSimpleQueryTrigger() { + Expression expression = ExpressionFactory.createExpression(new FileSystemCounterLimit("hdfs", + FileSystemCounterLimit.FSCounter.BYTES_READ, 1024)); + Trigger trigger = new ExecutionTrigger("hdfs_read_heavy", expression, Trigger.Action.KILL_QUERY); + assertEquals("counter: HDFS_BYTES_READ limit: 1024", expression.getCounterLimit().toString()); + assertFalse(trigger.apply(1000)); + assertTrue(trigger.apply(1025)); + + expression = ExpressionFactory.createExpression(new FileSystemCounterLimit("hdfs", + FileSystemCounterLimit.FSCounter.BYTES_WRITTEN, 1024)); + trigger = new ExecutionTrigger("hdfs_write_heavy", expression, Trigger.Action.KILL_QUERY); + assertEquals("counter: HDFS_BYTES_WRITTEN limit: 1024", expression.getCounterLimit().toString()); + assertFalse(trigger.apply(1000)); + assertTrue(trigger.apply(1025)); + + expression = ExpressionFactory.createExpression(new FileSystemCounterLimit("", + FileSystemCounterLimit.FSCounter.BYTES_READ, 1024)); + trigger = new ExecutionTrigger("local_read_heavy", expression, Trigger.Action.KILL_QUERY); + assertEquals("counter: BYTES_READ limit: 1024", expression.getCounterLimit().toString()); + assertFalse(trigger.apply(1000)); + assertTrue(trigger.apply(1025)); + + expression = ExpressionFactory.createExpression(new FileSystemCounterLimit("", + FileSystemCounterLimit.FSCounter.BYTES_WRITTEN, 1024)); + trigger = new 
ExecutionTrigger("local_write_heavy", expression, Trigger.Action.KILL_QUERY); + assertEquals("counter: BYTES_WRITTEN limit: 1024", expression.getCounterLimit().toString()); + assertFalse(trigger.apply(1000)); + assertTrue(trigger.apply(1025)); + + expression = ExpressionFactory.createExpression(new FileSystemCounterLimit("", + FileSystemCounterLimit.FSCounter.SHUFFLE_BYTES, 1024)); + trigger = new ExecutionTrigger("shuffle_heavy", expression, Trigger.Action.KILL_QUERY); + assertEquals("counter: SHUFFLE_BYTES limit: 1024", expression.getCounterLimit().toString()); + assertFalse(trigger.apply(1000)); + assertTrue(trigger.apply(1025)); + + expression = ExpressionFactory.createExpression(new TimeCounterLimit(TimeCounterLimit.TimeCounter + .EXECUTION_TIME, 10000)); + trigger = new ExecutionTrigger("slow_query", expression, Trigger.Action.MOVE_TO_POOL.setPoolName("fake_pool")); + assertEquals("counter: EXECUTION_TIME limit: 10000", expression.getCounterLimit().toString()); + assertFalse(trigger.apply(1000)); + assertTrue(trigger.apply(100000)); + + expression = ExpressionFactory.createExpression(new VertexCounterLimit(VertexCounterLimit.VertexCounter + .TOTAL_TASKS,10000)); + trigger = new ExecutionTrigger("highly_parallel", expression, Trigger.Action.KILL_QUERY); + assertEquals("counter: TOTAL_TASKS limit: 10000", expression.getCounterLimit().toString()); + assertFalse(trigger.apply(1000)); + assertTrue(trigger.apply(100000)); + } + + @Test + public void testExpressionFromString() { + Expression expression = ExpressionFactory.fromString("BYTES_READ>1024"); + Expression expected = ExpressionFactory.createExpression(new FileSystemCounterLimit("", + FileSystemCounterLimit.FSCounter.BYTES_READ, 1024)); + assertEquals(expected, expression); + assertEquals(expected.hashCode(), expression.hashCode()); + expression = ExpressionFactory.fromString("BYTES_READ > 1024"); + assertEquals(expected, expression); + + expression = ExpressionFactory.fromString(expected.toString()); + assertEquals(expected.toString(), expression.toString()); + + assertEquals(expected.hashCode(), expression.hashCode()); + expression = ExpressionFactory.fromString(" BYTES_READ > 1024 "); + assertEquals(expected, expression); + assertEquals(expected.hashCode(), expression.hashCode()); + + expression = ExpressionFactory.fromString("BYTES_WRITTEN > 1024"); + expected = ExpressionFactory.createExpression(new FileSystemCounterLimit("", + FileSystemCounterLimit.FSCounter.BYTES_WRITTEN, 1024)); + assertEquals(expected, expression); + assertEquals(expected.hashCode(), expression.hashCode()); + + expression = ExpressionFactory.fromString(" HDFS_BYTES_READ > 1024 "); + expected = ExpressionFactory.createExpression(new FileSystemCounterLimit("hdfs", + FileSystemCounterLimit.FSCounter.BYTES_READ, 1024)); + assertEquals(expected, expression); + assertEquals(expected.hashCode(), expression.hashCode()); + + expression = ExpressionFactory.fromString(" HDFS_BYTES_WRITTEN > 1024"); + expected = ExpressionFactory.createExpression(new FileSystemCounterLimit("hdfs", + FileSystemCounterLimit.FSCounter.BYTES_WRITTEN, 1024)); + assertEquals(expected, expression); + assertEquals(expected.hashCode(), expression.hashCode()); + + expression = ExpressionFactory.fromString(" S3A_BYTES_READ > 1024"); + expected = ExpressionFactory.createExpression(new FileSystemCounterLimit("s3a", + FileSystemCounterLimit.FSCounter.BYTES_READ, 1024)); + assertEquals(expected, expression); + assertEquals(expected.hashCode(), expression.hashCode()); + + expression = 
ExpressionFactory.fromString(" S3A_BYTES_WRITTEN > 1024"); + expected = ExpressionFactory.createExpression(new FileSystemCounterLimit("s3a", + FileSystemCounterLimit.FSCounter.BYTES_WRITTEN, 1024)); + assertEquals(expected, expression); + assertEquals(expected.hashCode(), expression.hashCode()); + + expression = ExpressionFactory.fromString(" s3a_ByTeS_WRiTTeN > 1024"); + expected = ExpressionFactory.createExpression(new FileSystemCounterLimit("s3a", + FileSystemCounterLimit.FSCounter.BYTES_WRITTEN, 1024)); + assertEquals(expected, expression); + assertEquals(expected.hashCode(), expression.hashCode()); + + expression = ExpressionFactory.fromString(" SHUFFLE_BYTES > 1024"); + expected = ExpressionFactory.createExpression(new FileSystemCounterLimit("", + FileSystemCounterLimit.FSCounter.SHUFFLE_BYTES, 1024)); + assertEquals(expected, expression); + assertEquals(expected.hashCode(), expression.hashCode()); + + expression = ExpressionFactory.fromString(" EXECUTION_TIME > 300"); + expected = ExpressionFactory.createExpression(new TimeCounterLimit(TimeCounterLimit.TimeCounter + .EXECUTION_TIME, 300)); + assertEquals(expected, expression); + assertEquals(expected.hashCode(), expression.hashCode()); + + expression = ExpressionFactory.fromString(" ELAPSED_TIME > 300"); + expected = ExpressionFactory.createExpression(new TimeCounterLimit(TimeCounterLimit.TimeCounter + .ELAPSED_TIME, 300)); + assertEquals(expected, expression); + assertEquals(expected.hashCode(), expression.hashCode()); + + expression = ExpressionFactory.fromString(" elapsed_TIME > 300"); + expected = ExpressionFactory.createExpression(new TimeCounterLimit(TimeCounterLimit.TimeCounter + .ELAPSED_TIME, 300)); + assertEquals(expected, expression); + assertEquals(expected.hashCode(), expression.hashCode()); + + expression = ExpressionFactory.fromString(" TOTAL_TASKS > 10000"); + expected = ExpressionFactory.createExpression(new VertexCounterLimit(VertexCounterLimit.VertexCounter + .TOTAL_TASKS,10000)); + assertEquals("counter: TOTAL_TASKS limit: 10000", expression.getCounterLimit().toString()); + assertEquals(expected, expression); + assertEquals(expected.hashCode(), expression.hashCode()); + } + + @Test + public void testSizeValidationInTrigger() { + Expression expression = ExpressionFactory.fromString(" SHUFFLE_BYTES > 100MB"); + Expression expected = ExpressionFactory.createExpression(new FileSystemCounterLimit("", + FileSystemCounterLimit.FSCounter.SHUFFLE_BYTES, 100 * 1024 * 1024)); + assertEquals(expected, expression); + assertEquals(expected.hashCode(), expression.hashCode()); + + expression = ExpressionFactory.fromString(" SHUFFLE_BYTES > 1 gB"); + expected = ExpressionFactory.createExpression(new FileSystemCounterLimit("", + FileSystemCounterLimit.FSCounter.SHUFFLE_BYTES, 1024 * 1024 * 1024)); + assertEquals(expected, expression); + assertEquals(expected.hashCode(), expression.hashCode()); + + expression = ExpressionFactory.fromString(" SHUFFLE_BYTES > 1 TB"); + expected = ExpressionFactory.createExpression(new FileSystemCounterLimit("", + FileSystemCounterLimit.FSCounter.SHUFFLE_BYTES, 1024L * 1024 * 1024 * 1024)); + assertEquals(expected, expression); + assertEquals(expected.hashCode(), expression.hashCode()); + + expression = ExpressionFactory.fromString(" SHUFFLE_BYTES > 100 B"); + expected = ExpressionFactory.createExpression(new FileSystemCounterLimit("", + FileSystemCounterLimit.FSCounter.SHUFFLE_BYTES, 100)); + assertEquals(expected, expression); + assertEquals(expected.hashCode(), expression.hashCode()); + + 
expression = ExpressionFactory.fromString(" SHUFFLE_BYTES > 100bytes"); + expected = ExpressionFactory.createExpression(new FileSystemCounterLimit("", + FileSystemCounterLimit.FSCounter.SHUFFLE_BYTES, 100)); + assertEquals(expected, expression); + assertEquals(expected.hashCode(), expression.hashCode()); + } + + @Test + public void testIllegalSizeCounterValue1() { + thrown.expect(IllegalArgumentException.class); + thrown.expectMessage("Invalid size unit"); + ExpressionFactory.fromString(" SHUFFLE_BYTES > 300GiB"); + } + + @Test + public void testIllegalSizeCounterValue2() { + thrown.expect(IllegalArgumentException.class); + thrown.expectMessage("Invalid size unit"); + ExpressionFactory.fromString(" SHUFFLE_BYTES > 300 foo"); + } + + @Test + public void testTimeValidationInTrigger() { + Expression expression = ExpressionFactory.fromString(" elapsed_TIME > 300 s"); + Expression expected = ExpressionFactory.createExpression(new TimeCounterLimit(TimeCounterLimit.TimeCounter + .ELAPSED_TIME, 300000)); + assertEquals(expected, expression); + assertEquals(expected.hashCode(), expression.hashCode()); + + expression = ExpressionFactory.fromString(" elapsed_TIME > 300 seconds"); + expected = ExpressionFactory.createExpression(new TimeCounterLimit(TimeCounterLimit.TimeCounter + .ELAPSED_TIME, 300000)); + assertEquals(expected, expression); + assertEquals(expected.hashCode(), expression.hashCode()); + + expression = ExpressionFactory.fromString(" elapsed_TIME > 300 sec"); + expected = ExpressionFactory.createExpression(new TimeCounterLimit(TimeCounterLimit.TimeCounter + .ELAPSED_TIME, 300000)); + assertEquals(expected, expression); + assertEquals(expected.hashCode(), expression.hashCode()); + + expression = ExpressionFactory.fromString(" elapsed_TIME > 300s"); + expected = ExpressionFactory.createExpression(new TimeCounterLimit(TimeCounterLimit.TimeCounter + .ELAPSED_TIME, 300000)); + assertEquals(expected, expression); + assertEquals(expected.hashCode(), expression.hashCode()); + + expression = ExpressionFactory.fromString(" elapsed_TIME > 300seconds"); + expected = ExpressionFactory.createExpression(new TimeCounterLimit(TimeCounterLimit.TimeCounter + .ELAPSED_TIME, 300000)); + assertEquals(expected, expression); + assertEquals(expected.hashCode(), expression.hashCode()); + + expression = ExpressionFactory.fromString(" elapsed_TIME > 300sec"); + expected = ExpressionFactory.createExpression(new TimeCounterLimit(TimeCounterLimit.TimeCounter + .ELAPSED_TIME, 300000)); + assertEquals(expected, expression); + assertEquals(expected.hashCode(), expression.hashCode()); + + expression = ExpressionFactory.fromString(" elapsed_TIME > 300000000 microseconds"); + expected = ExpressionFactory.createExpression(new TimeCounterLimit(TimeCounterLimit.TimeCounter + .ELAPSED_TIME, 300000)); + assertEquals(expected, expression); + assertEquals(expected.hashCode(), expression.hashCode()); + + expression = ExpressionFactory.fromString(" elapsed_TIME > 1DAY"); + expected = ExpressionFactory.createExpression(new TimeCounterLimit(TimeCounterLimit.TimeCounter + .ELAPSED_TIME, 24 * 60 * 60 * 1000)); + assertEquals(expected, expression); + assertEquals(expected.hashCode(), expression.hashCode()); + } + + @Test + public void testIllegalTimeCounterValue1() { + thrown.expect(IllegalArgumentException.class); + thrown.expectMessage("Invalid time unit"); + ExpressionFactory.fromString(" elapsed_TIME > 300 light years"); + } + + @Test + public void testIllegalTimeCounterValue2() { + thrown.expect(IllegalArgumentException.class); + 
thrown.expectMessage("Invalid time unit"); + ExpressionFactory.fromString(" elapsed_TIME > 300secTOR"); + } + + @Test + public void testTriggerClone() { + Expression expression = ExpressionFactory.createExpression(new FileSystemCounterLimit("hdfs", + FileSystemCounterLimit.FSCounter.BYTES_READ, 1024)); + Trigger trigger = new ExecutionTrigger("hdfs_read_heavy", expression, Trigger.Action.KILL_QUERY); + Trigger clonedTrigger = trigger.clone(); + assertNotEquals(System.identityHashCode(trigger), System.identityHashCode(clonedTrigger)); + assertNotEquals(System.identityHashCode(trigger.getExpression()), System.identityHashCode(clonedTrigger.getExpression())); + assertNotEquals(System.identityHashCode(trigger.getExpression().getCounterLimit()), + System.identityHashCode(clonedTrigger.getExpression().getCounterLimit())); + assertEquals(trigger, clonedTrigger); + assertEquals(trigger.hashCode(), clonedTrigger.hashCode()); + + expression = ExpressionFactory.fromString(" ELAPSED_TIME > 300"); + trigger = new ExecutionTrigger("slow_query", expression, Trigger.Action.KILL_QUERY); + clonedTrigger = trigger.clone(); + assertNotEquals(System.identityHashCode(trigger), System.identityHashCode(clonedTrigger)); + assertNotEquals(System.identityHashCode(trigger.getExpression()), System.identityHashCode(clonedTrigger.getExpression())); + assertNotEquals(System.identityHashCode(trigger.getExpression().getCounterLimit()), + System.identityHashCode(clonedTrigger.getExpression().getCounterLimit())); + assertEquals(trigger, clonedTrigger); + assertEquals(trigger.hashCode(), clonedTrigger.hashCode()); + } + + @Test + public void testIllegalExpressionsUnsupportedPredicate() { + thrown.expect(IllegalArgumentException.class); + thrown.expectMessage("Invalid predicate in expression"); + ExpressionFactory.fromString("BYTES_READ < 1024"); + } + + @Test + public void testIllegalExpressionsMissingLimit() { + thrown.expect(IllegalArgumentException.class); + thrown.expectMessage("Invalid predicate in expression"); + ExpressionFactory.fromString("BYTES_READ >"); + } + + @Test + public void testIllegalExpressionsMissingCounter() { + thrown.expect(IllegalArgumentException.class); + thrown.expectMessage("Counter name cannot be empty!"); + ExpressionFactory.fromString("> 1024"); + } + + @Test + public void testIllegalExpressionsMultipleLimit() { + thrown.expect(IllegalArgumentException.class); + thrown.expectMessage("Invalid predicate in expression"); + ExpressionFactory.fromString("BYTES_READ > 1024 > 1025"); + } + + @Test + public void testIllegalExpressionsMultipleCounters() { + thrown.expect(IllegalArgumentException.class); + thrown.expectMessage("Invalid predicate in expression"); + ExpressionFactory.fromString("BYTES_READ > BYTES_READ > 1025"); + } + + @Test + public void testIllegalExpressionsInvalidLimitPost() { + thrown.expect(IllegalArgumentException.class); + thrown.expectMessage("Invalid size unit"); + ExpressionFactory.fromString("BYTES_READ > 1024aaaa"); + } + + @Test + public void testIllegalExpressionsInvalidLimitPre() { + thrown.expect(IllegalArgumentException.class); + thrown.expectMessage("Invalid counter value"); + ExpressionFactory.fromString("BYTES_READ > foo1024"); + } + + @Test + public void testIllegalExpressionsInvalidNegativeLimit() { + thrown.expect(IllegalArgumentException.class); + thrown.expectMessage("Illegal value for counter limit. 
+
+  @Test
+  public void testIllegalExpressionsUnsupportedPredicate() {
+    thrown.expect(IllegalArgumentException.class);
+    thrown.expectMessage("Invalid predicate in expression");
+    ExpressionFactory.fromString("BYTES_READ < 1024");
+  }
+
+  @Test
+  public void testIllegalExpressionsMissingLimit() {
+    thrown.expect(IllegalArgumentException.class);
+    thrown.expectMessage("Invalid predicate in expression");
+    ExpressionFactory.fromString("BYTES_READ >");
+  }
+
+  @Test
+  public void testIllegalExpressionsMissingCounter() {
+    thrown.expect(IllegalArgumentException.class);
+    thrown.expectMessage("Counter name cannot be empty!");
+    ExpressionFactory.fromString("> 1024");
+  }
+
+  @Test
+  public void testIllegalExpressionsMultipleLimit() {
+    thrown.expect(IllegalArgumentException.class);
+    thrown.expectMessage("Invalid predicate in expression");
+    ExpressionFactory.fromString("BYTES_READ > 1024 > 1025");
+  }
+
+  @Test
+  public void testIllegalExpressionsMultipleCounters() {
+    thrown.expect(IllegalArgumentException.class);
+    thrown.expectMessage("Invalid predicate in expression");
+    ExpressionFactory.fromString("BYTES_READ > BYTES_READ > 1025");
+  }
+
+  @Test
+  public void testIllegalExpressionsInvalidLimitPost() {
+    thrown.expect(IllegalArgumentException.class);
+    thrown.expectMessage("Invalid size unit");
+    ExpressionFactory.fromString("BYTES_READ > 1024aaaa");
+  }
+
+  @Test
+  public void testIllegalExpressionsInvalidLimitPre() {
+    thrown.expect(IllegalArgumentException.class);
+    thrown.expectMessage("Invalid counter value");
+    ExpressionFactory.fromString("BYTES_READ > foo1024");
+  }
+
+  @Test
+  public void testIllegalExpressionsInvalidNegativeLimit() {
+    thrown.expect(IllegalArgumentException.class);
+    thrown.expectMessage("Illegal value for counter limit. Expected a positive long value.");
+    ExpressionFactory.fromString("BYTES_READ > -1024");
+  }
+}
diff --git a/service/src/java/org/apache/hive/service/cli/operation/OperationManager.java b/service/src/java/org/apache/hive/service/cli/operation/OperationManager.java
index 1cf4392..8224bcc 100644
--- a/service/src/java/org/apache/hive/service/cli/operation/OperationManager.java
+++ b/service/src/java/org/apache/hive/service/cli/operation/OperationManager.java
@@ -210,7 +210,9 @@ private void addOperation(Operation operation) {
   private Operation removeOperation(OperationHandle opHandle) {
     Operation operation = handleToOperation.remove(opHandle);
-    queryIdOperation.remove(getQueryId(operation));
+    String queryId = getQueryId(operation);
+    queryIdOperation.remove(queryId);
+    LOG.info("Removed queryId: {} corresponding to operation: {}", queryId, opHandle);
     if (operation instanceof SQLOperation) {
       removeSafeQueryInfo(opHandle);
     }
diff --git a/service/src/java/org/apache/hive/service/server/HiveServer2.java b/service/src/java/org/apache/hive/service/server/HiveServer2.java
index ec6657d..b121a06 100644
--- a/service/src/java/org/apache/hive/service/server/HiveServer2.java
+++ b/service/src/java/org/apache/hive/service/server/HiveServer2.java
@@ -112,6 +112,7 @@
   private CuratorFramework zooKeeperClient;
   private boolean deregisteredWithZooKeeper = false; // Set to true only when deregistration happens
   private HttpServer webServer; // Web UI
+  private TezSessionPoolManager tezSessionPoolManager;
   private WorkloadManager wm;
 
   public HiveServer2() {
@@ -126,6 +127,11 @@ public synchronized void init(HiveConf hiveConf) {
       if (hiveConf.getBoolVar(ConfVars.HIVE_SERVER2_METRICS_ENABLED)) {
         MetricsFactory.init(hiveConf);
       }
+
+      if (hiveConf.getBoolVar(ConfVars.HIVE_SERVER2_TEZ_INITIALIZE_DEFAULT_SESSIONS)) {
+        tezSessionPoolManager = TezSessionPoolManager.getInstance();
+        tezSessionPoolManager.setupPool(hiveConf);
+      }
     } catch (Throwable t) {
       LOG.warn("Could not initiate the HiveServer2 Metrics system. Metrics may not be reported.", t);
     }
@@ -528,6 +534,24 @@ public synchronized void start() {
         throw new ServiceException(e);
       }
     }
+    if (tezSessionPoolManager != null) {
+      try {
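+        // The pool was only configured in init() via setupPool(); the default
+        // Tez sessions are launched here, once the server itself is starting.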
+        tezSessionPoolManager.startPool();
+        LOG.info("Started Tez session pool manager.");
+      } catch (Exception e) {
+        LOG.error("Error starting Tez session pool manager: ", e);
+        throw new ServiceException(e);
+      }
+    }
+    if (wm != null) {
+      try {
+        wm.start();
+        LOG.info("Started workload manager.");
+      } catch (Exception e) {
+        LOG.error("Error starting workload manager", e);
+        throw new ServiceException(e);
+      }
+    }
   }
 
   @Override
@@ -562,9 +586,10 @@ public synchronized void stop() {
     }
     // There should already be an instance of the session pool manager.
     // If not, ignoring is fine while stopping HiveServer2.
-    if (hiveConf != null && hiveConf.getBoolVar(ConfVars.HIVE_SERVER2_TEZ_INITIALIZE_DEFAULT_SESSIONS)) {
+    if (hiveConf != null && hiveConf.getBoolVar(ConfVars.HIVE_SERVER2_TEZ_INITIALIZE_DEFAULT_SESSIONS) &&
+        tezSessionPoolManager != null) {
       try {
-        TezSessionPoolManager.getInstance().stop();
+        tezSessionPoolManager.stop();
       } catch (Exception e) {
         LOG.error("Tez session pool manager stop had an error during stop of HiveServer2. " +
             "Shutting down HiveServer2 anyway.", e);
@@ -614,13 +639,6 @@ private static void startHiveServer2() throws Throwable {
         TimeUnit.MILLISECONDS);
     HiveServer2 server = null;
     try {
-      // Initialize the pool before we start the server; don't start yet.
-      TezSessionPoolManager sessionPool = null;
-      if (hiveConf.getBoolVar(ConfVars.HIVE_SERVER2_TEZ_INITIALIZE_DEFAULT_SESSIONS)) {
-        sessionPool = TezSessionPoolManager.getInstance();
-        sessionPool.setupPool(hiveConf);
-      }
-
       // Cleanup the scratch dir before starting
       ServerUtils.cleanUpScratchDir(hiveConf);
       // Schedule task to cleanup dangling scratch dir periodically,
@@ -640,13 +658,6 @@
             "warned upon.", t);
       }
 
-      if (sessionPool != null) {
-        sessionPool.startPool();
-      }
-      if (server.wm != null) {
-        server.wm.start();
-      }
-
      if (hiveConf.getVar(ConfVars.HIVE_EXECUTION_ENGINE).equals("spark")) {
        SparkSessionManagerImpl.getInstance().setup(hiveConf);
      }