diff --git a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
index da30b37..a029d5b 100644
--- a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
+++ b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
@@ -19,6 +19,7 @@ package org.apache.hadoop.hive.conf;
 
 import com.google.common.base.Joiner;
+import com.google.common.collect.ImmutableSet;
 import com.google.common.collect.Iterables;
 import org.apache.commons.lang.StringUtils;
 import org.apache.hadoop.conf.Configuration;
@@ -3396,6 +3397,12 @@ private static void populateLlapDaemonVarsSet(Set<String> llapDaemonVarsSetLocal
         Constants.LLAP_LOGGER_NAME_CONSOLE),
         "logger used for llap-daemons."),
 
+    HIVE_RULE_VALIDATION_INTERVAL_MS("hive.rule.validation.interval.ms", "500ms",
+        new TimeValidator(TimeUnit.MILLISECONDS),
+        "Interval for validating rules during execution of a query. Rules defined in the resource plan are validated\n" +
+        "for all SQL operations after every such interval (default: 500ms), and the corresponding action defined in\n" +
+        "the rule is taken."),
+
     SPARK_USE_OP_STATS("hive.spark.use.op.stats", true,
         "Whether to use operator stats to determine reducer parallelism for Hive on Spark.\n" +
         "If this is false, Hive will use source table stats to determine reducer\n" +
@@ -3943,6 +3950,18 @@ public static long toSizeBytes(String value) {
     return new String[] {value.substring(0, i), value.substring(i)};
   }
 
+  private static Set<String> daysSet = ImmutableSet.of("d", "D", "day", "DAY", "days", "DAYS");
+  private static Set<String> hoursSet = ImmutableSet.of("h", "H", "hour", "HOUR", "hours", "HOURS");
+  private static Set<String> minutesSet = ImmutableSet.of("m", "M", "min", "MIN", "mins", "MINS",
+      "minute", "MINUTE", "minutes", "MINUTES");
+  private static Set<String> secondsSet = ImmutableSet.of("s", "S", "sec", "SEC", "secs", "SECS",
+      "second", "SECOND", "seconds", "SECONDS");
+  private static Set<String> millisSet = ImmutableSet.of("ms", "MS", "msec", "MSEC", "msecs", "MSECS",
+      "millisecond", "MILLISECOND", "milliseconds", "MILLISECONDS");
+  private static Set<String> microsSet = ImmutableSet.of("us", "US", "usec", "USEC", "usecs", "USECS",
+      "microsecond", "MICROSECOND", "microseconds", "MICROSECONDS");
+  private static Set<String> nanosSet = ImmutableSet.of("ns", "NS", "nsec", "NSEC", "nsecs", "NSECS",
+      "nanosecond", "NANOSECOND", "nanoseconds", "NANOSECONDS");
+
   public static TimeUnit unitFor(String unit, TimeUnit defaultUnit) {
     unit = unit.trim().toLowerCase();
     if (unit.isEmpty() || unit.equals("l")) {
@@ -3950,19 +3969,19 @@ public static TimeUnit unitFor(String unit, TimeUnit defaultUnit) {
       throw new IllegalArgumentException("Time unit is not specified");
     }
     return defaultUnit;
-    } else if (unit.equals("d") || unit.startsWith("day")) {
+    } else if (daysSet.contains(unit)) {
       return TimeUnit.DAYS;
-    } else if (unit.equals("h") || unit.startsWith("hour")) {
+    } else if (hoursSet.contains(unit)) {
       return TimeUnit.HOURS;
-    } else if (unit.equals("m") || unit.startsWith("min")) {
+    } else if (minutesSet.contains(unit)) {
       return TimeUnit.MINUTES;
-    } else if (unit.equals("s") || unit.startsWith("sec")) {
+    } else if (secondsSet.contains(unit)) {
       return TimeUnit.SECONDS;
-    } else if (unit.equals("ms") || unit.startsWith("msec")) {
+    } else if (millisSet.contains(unit)) {
      return TimeUnit.MILLISECONDS;
-    } else if (unit.equals("us") || unit.startsWith("usec")) {
+    } else if (microsSet.contains(unit)) {
       return TimeUnit.MICROSECONDS;
-    } else if (unit.equals("ns") || unit.startsWith("nsec")) {
+    } else if (nanosSet.contains(unit)) {
       return TimeUnit.NANOSECONDS;
     }
     throw new IllegalArgumentException("Invalid time unit " + unit);
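Editor's note: the set-based lookup above is equivalent to a table-driven map. A minimal standalone sketch of the same parsing logic (hypothetical class, not part of this patch) — it also makes visible that, because the input is lower-cased first, the upper-case aliases in the patch's sets can never actually match:

```java
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.TimeUnit;

public class TimeUnitParser {
  // Hypothetical table-driven equivalent of the patched unitFor(). Only
  // lower-case aliases are needed, since unitFor() lower-cases its input.
  private static final Map<String, TimeUnit> UNITS = new HashMap<>();
  static {
    put(TimeUnit.DAYS, "d", "day", "days");
    put(TimeUnit.HOURS, "h", "hour", "hours");
    put(TimeUnit.MINUTES, "m", "min", "mins", "minute", "minutes");
    put(TimeUnit.SECONDS, "s", "sec", "secs", "second", "seconds");
    put(TimeUnit.MILLISECONDS, "ms", "msec", "msecs", "millisecond", "milliseconds");
    put(TimeUnit.MICROSECONDS, "us", "usec", "usecs", "microsecond", "microseconds");
    put(TimeUnit.NANOSECONDS, "ns", "nsec", "nsecs", "nanosecond", "nanoseconds");
  }

  private static void put(TimeUnit unit, String... aliases) {
    for (String alias : aliases) {
      UNITS.put(alias, unit);
    }
  }

  public static TimeUnit unitFor(String unit, TimeUnit defaultUnit) {
    unit = unit.trim().toLowerCase();
    if (unit.isEmpty() || unit.equals("l")) {
      if (defaultUnit == null) {
        throw new IllegalArgumentException("Time unit is not specified");
      }
      return defaultUnit;
    }
    TimeUnit result = UNITS.get(unit);
    if (result == null) {
      throw new IllegalArgumentException("Invalid time unit " + unit);
    }
    return result;
  }
}
```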
diff --git a/itests/hive-unit/pom.xml b/itests/hive-unit/pom.xml
index eeb6e58..1440983 100644
--- a/itests/hive-unit/pom.xml
+++ b/itests/hive-unit/pom.xml
@@ -204,6 +204,12 @@
       <version>${hadoop.version}</version>
       <classifier>tests</classifier>
       <scope>test</scope>
+      <exclusions>
+        <exclusion>
+          <groupId>io.netty</groupId>
+          <artifactId>netty-all</artifactId>
+        </exclusion>
+      </exclusions>
     </dependency>
     <dependency>
       <groupId>org.apache.hadoop</groupId>
diff --git a/itests/hive-unit/src/test/java/org/apache/hive/jdbc/TestRulesTezSessionPoolManager.java b/itests/hive-unit/src/test/java/org/apache/hive/jdbc/TestRulesTezSessionPoolManager.java
new file mode 100644
index 0000000..84321bb
--- /dev/null
+++ b/itests/hive-unit/src/test/java/org/apache/hive/jdbc/TestRulesTezSessionPoolManager.java
@@ -0,0 +1,272 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hive.jdbc;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
+
+import java.io.File;
+import java.net.URL;
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.sql.Statement;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
+import org.apache.hadoop.hive.llap.LlapBaseInputFormat;
+import org.apache.hadoop.hive.ql.exec.tez.TezSessionPoolManager;
+import org.apache.hadoop.hive.ql.wm.ExpressionFactory;
+import org.apache.hadoop.hive.ql.wm.QueryRule;
+import org.apache.hadoop.hive.ql.wm.Rule;
+import org.apache.hadoop.hive.ql.wm.RuleFetcher;
+import org.apache.hive.jdbc.miniHS2.MiniHS2;
+import org.apache.hive.jdbc.miniHS2.MiniHS2.MiniClusterType;
+import org.junit.After;
+import org.junit.AfterClass;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import com.google.common.collect.Lists;
+
+public class TestRulesTezSessionPoolManager {
+  private static MiniHS2 miniHS2 = null;
+  private static String dataFileDir;
+  private static Path kvDataFilePath;
+  private static String ruleTestTable = "testtab1";
+  private static String ruleQuery5msSleep = "select sleep(t1.under_col, 5), t1.under_col, t2.under_col " + "from "
+      + ruleTestTable + " t1 cross join " + ruleTestTable + " t2 on t1.under_col = t2.under_col";
+  private static String ruleQuery500msSleep = "select sleep(t1.under_col, 500), t1.under_col, t2.under_col " + "from "
+      + ruleTestTable + " t1 cross join " + ruleTestTable + " t2 on t1.under_col = t2.under_col";
+
+  private static HiveConf conf = null;
+  private Connection hs2Conn = null;
+
+  @BeforeClass
+  public static void beforeTest() throws Exception {
+    Class.forName(MiniHS2.getJdbcDriverName());
+
+    String confDir = "../../data/conf/llap/";
+    if (confDir != null && !confDir.isEmpty()) {
+      HiveConf.setHiveSiteLocation(new URL("file://" + new File(confDir).toURI().getPath() + "/hive-site.xml"));
+      System.out.println("Setting hive-site: " + HiveConf.getHiveSiteLocation());
+    }
+
+    conf = new HiveConf();
+    conf.setBoolVar(ConfVars.HIVE_SUPPORT_CONCURRENCY, false);
+    conf.setBoolVar(ConfVars.HIVE_SERVER2_ENABLE_DOAS, false);
+    conf.setTimeVar(ConfVars.HIVE_RULE_VALIDATION_INTERVAL_MS, 100, TimeUnit.MILLISECONDS);
+    conf.setVar(ConfVars.HIVE_SERVER2_TEZ_DEFAULT_QUEUES, "default");
+    conf.setBoolVar(ConfVars.HIVE_SERVER2_TEZ_INITIALIZE_DEFAULT_SESSIONS, true);
+    conf.setBoolVar(ConfVars.TEZ_EXEC_SUMMARY, true);
+
+    conf.addResource(new URL("file://" + new File(confDir).toURI().getPath()
+        + "/tez-site.xml"));
+
+    miniHS2 = new MiniHS2(conf, MiniClusterType.LLAP);
+
+    dataFileDir = conf.get("test.data.files").replace('\\', '/').replace("c:", "");
+    kvDataFilePath = new Path(dataFileDir, "kv1.txt");
+    Map<String, String> confOverlay = new HashMap<String, String>();
+    miniHS2.start(confOverlay);
+    miniHS2.getDFS().getFileSystem().mkdirs(new Path("/apps_staging_dir/anonymous"));
+  }
+
+  @Before
+  public void setUp() throws Exception {
+    hs2Conn = getConnection(miniHS2.getJdbcURL(), System.getProperty("user.name"), "bar");
+  }
+
+  private Connection getConnection(String jdbcURL, String user, String pwd) throws SQLException {
+    Connection conn = DriverManager.getConnection(jdbcURL, user, pwd);
+    conn.createStatement().execute("set hive.support.concurrency = false");
+    return conn;
+  }
+
+  @After
+  public void tearDown() throws Exception {
+    LlapBaseInputFormat.closeAll();
+    hs2Conn.close();
+  }
+
+  @AfterClass
+  public static void afterTest() throws Exception {
+    if (miniHS2.isStarted()) {
+      miniHS2.stop();
+    }
+  }
+
+  private void createTestTable(String tableName) throws Exception {
+    createTestTable(null, tableName);
+  }
+
+  private void createTestTable(String database, String tableName) throws Exception {
+    Statement stmt = hs2Conn.createStatement();
+
+    if (database != null) {
+      stmt.execute("CREATE DATABASE IF NOT EXISTS " + database);
+      stmt.execute("USE " + database);
+    }
+
+    // create table
+    stmt.execute("DROP TABLE IF EXISTS " + tableName);
+    stmt.execute("CREATE TABLE " + tableName
+        + " (under_col INT COMMENT 'the under column', value STRING) COMMENT ' test table'");
+
+    // load data
+    stmt.execute("load data local inpath '"
+        + kvDataFilePath.toString() + "' into table " + tableName);
+
+    ResultSet res = stmt.executeQuery("SELECT * FROM " + tableName);
+    assertTrue(res.next());
+    assertEquals("val_238", res.getString(2));
+    res.close();
+    stmt.close();
+  }
+
+  @Test(timeout = 60000)
+  public void testRuleSlowQueryElapsedTime() throws Exception {
+    Rule rule = new QueryRule("slow_query", ExpressionFactory.fromString("ELAPSED_TIME > 20000"),
+        Rule.Action.KILL_QUERY);
+    runQueryWithRule(ruleQuery500msSleep, Lists.newArrayList(rule), "Query was cancelled");
+  }
+
+  @Test(timeout = 60000)
+  public void testRuleSlowQueryExecutionTime() throws Exception {
+    Rule rule = new QueryRule("slow_query", ExpressionFactory.fromString("EXECUTION_TIME > 1000"),
+        Rule.Action.KILL_QUERY);
+    runQueryWithRule(ruleQuery5msSleep, Lists.newArrayList(rule), "Query was cancelled");
+  }
+
+  @Test(timeout = 60000)
+  public void testRuleHighShuffleBytes() throws Exception {
+    Rule rule = new QueryRule("big_shuffle", ExpressionFactory.fromString("SHUFFLE_BYTES > 100"),
+        Rule.Action.KILL_QUERY);
+    runQueryWithRule(ruleQuery5msSleep, Lists.newArrayList(rule), "Query was cancelled");
+  }
+
+  @Test(timeout = 60000)
+  public void testRuleHighBytesRead() throws Exception {
+    Rule rule = new QueryRule("big_read", ExpressionFactory.fromString("FILE_BYTES_READ > 100"),
+        Rule.Action.KILL_QUERY);
+    runQueryWithRule(ruleQuery5msSleep, Lists.newArrayList(rule), "Query was cancelled");
+  }
+
+  @Test(timeout = 60000)
+  public void testRuleHighBytesWrite() throws Exception {
+    Rule rule = new QueryRule("big_write", ExpressionFactory.fromString("FILE_BYTES_WRITTEN > 100"),
+        Rule.Action.KILL_QUERY);
+    runQueryWithRule(ruleQuery5msSleep, Lists.newArrayList(rule), "Query was cancelled");
+  }
+
+  @Test(timeout = 60000)
+  public void testMultipleRules1() throws Exception {
+    Rule shuffleRule = new QueryRule("big_shuffle", ExpressionFactory.fromString("SHUFFLE_BYTES > 100000"),
+        Rule.Action.KILL_QUERY);
+    Rule execTimeRule = new QueryRule("slow_query", ExpressionFactory.fromString("EXECUTION_TIME > 1000"),
+        Rule.Action.KILL_QUERY);
+    runQueryWithRule(ruleQuery5msSleep, Lists.newArrayList(shuffleRule, execTimeRule), "Query was cancelled");
+  }
+
+  @Test(timeout = 60000)
+  public void testMultipleRules2() throws Exception {
+    Rule shuffleRule = new QueryRule("big_shuffle", ExpressionFactory.fromString("SHUFFLE_BYTES > 100"),
+        Rule.Action.KILL_QUERY);
+    Rule execTimeRule = new QueryRule("slow_query", ExpressionFactory.fromString("EXECUTION_TIME > 100000"),
+        Rule.Action.KILL_QUERY);
+    runQueryWithRule(ruleQuery5msSleep, Lists.newArrayList(shuffleRule, execTimeRule), "Query was cancelled");
+  }
+
+  private void createSleepUDF() throws SQLException {
+    String udfName = TestJdbcWithMiniHS2.SleepMsUDF.class.getName();
+    Connection con = hs2Conn;
+    Statement stmt = con.createStatement();
+    stmt.execute("create temporary function sleep as '" + udfName + "'");
+    stmt.close();
+  }
+
+  private static class StaticRulesFetcher implements RuleFetcher {
+    private List<Rule> rules;
+
+    public StaticRulesFetcher(final List<Rule> rules) {
+      this.rules = rules;
+    }
+
+    @Override
+    public List<Rule> fetch(final String resourcePlanName) {
+      return rules;
+    }
+  }
+
+  private void runQueryWithRule(final String query, final List<Rule> rules, final String expect, int queryTimeout,
+      final Class<? extends Throwable> exceptionClass) throws Exception {
+    TezSessionPoolManager tezSessionPoolManager = TezSessionPoolManager.getInstance();
+    StaticRulesFetcher staticRulesFetcher = new StaticRulesFetcher(rules);
+    // install (or replace) the fetcher so that this test's rules become the active rule set
+    tezSessionPoolManager.setGlobalRulesFetcher(staticRulesFetcher);
+
+    Connection con = hs2Conn;
+    createTestTable(ruleTestTable);
+    createSleepUDF();
+
+    final Statement selStmt = con.createStatement();
+    final Throwable[] throwable = new Throwable[1];
+    Thread queryThread = new Thread(() -> {
+      try {
+        selStmt.setQueryTimeout(queryTimeout);
+        selStmt.executeQuery(query);
+      } catch (SQLException e) {
+        throwable[0] = e;
+      }
+    });
+    queryThread.start();
+
+    queryThread.join();
+    selStmt.close();
+
+    assertNotNull("Expected non-null throwable", throwable[0]);
+    assertEquals(exceptionClass, throwable[0].getClass());
+    assertTrue(expect + " is not contained in " + throwable[0].getMessage(),
+        throwable[0].getMessage().contains(expect));
+  }
+
+  private void runQueryWithRule(final String query, final List<Rule> rules, final String expect)
+      throws Exception {
+    runQueryWithRule(query, rules, expect, 0, SQLException.class);
+  }
+}
\ No newline at end of file
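Editor's note: the tests above assume a fixed vocabulary of counter names understood by ExpressionFactory.fromString(). A compact sketch collecting one rule per counter exercised by these tests (class name and thresholds are illustrative only):

```java
import java.util.List;

import org.apache.hadoop.hive.ql.wm.ExpressionFactory;
import org.apache.hadoop.hive.ql.wm.QueryRule;
import org.apache.hadoop.hive.ql.wm.Rule;

import com.google.common.collect.Lists;

public class ExampleRules {
  // One rule per counter the tests exercise; every expression has the shape
  // "<COUNTER> > <limit>", and KILL_QUERY is the only action this patch wires up.
  public static List<Rule> exampleRules() {
    return Lists.newArrayList(
        new QueryRule("slow_query_wallclock", ExpressionFactory.fromString("ELAPSED_TIME > 20000"), Rule.Action.KILL_QUERY),
        new QueryRule("slow_query_cpu", ExpressionFactory.fromString("EXECUTION_TIME > 1000"), Rule.Action.KILL_QUERY),
        new QueryRule("big_shuffle", ExpressionFactory.fromString("SHUFFLE_BYTES > 100"), Rule.Action.KILL_QUERY),
        new QueryRule("big_read", ExpressionFactory.fromString("FILE_BYTES_READ > 100"), Rule.Action.KILL_QUERY),
        new QueryRule("big_write", ExpressionFactory.fromString("FILE_BYTES_WRITTEN > 100"), Rule.Action.KILL_QUERY));
  }
}
```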
diff --git a/itests/hive-unit/src/test/java/org/apache/hive/jdbc/TestRulesWorkloadManager.java b/itests/hive-unit/src/test/java/org/apache/hive/jdbc/TestRulesWorkloadManager.java
new file mode 100644
index 0000000..4b56a6f
--- /dev/null
+++ b/itests/hive-unit/src/test/java/org/apache/hive/jdbc/TestRulesWorkloadManager.java
@@ -0,0 +1,274 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hive.jdbc;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
+
+import java.io.File;
+import java.net.URL;
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.sql.Statement;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
+import org.apache.hadoop.hive.llap.LlapBaseInputFormat;
+import org.apache.hadoop.hive.ql.exec.tez.TestWorkloadManager;
+import org.apache.hadoop.hive.ql.exec.tez.WorkloadManager;
+import org.apache.hadoop.hive.ql.wm.ExpressionFactory;
+import org.apache.hadoop.hive.ql.wm.QueryRule;
+import org.apache.hadoop.hive.ql.wm.Rule;
+import org.apache.hadoop.hive.ql.wm.RuleFetcher;
+import org.apache.hive.jdbc.miniHS2.MiniHS2;
+import org.apache.hive.jdbc.miniHS2.MiniHS2.MiniClusterType;
+import org.junit.After;
+import org.junit.AfterClass;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import com.google.common.collect.Lists;
+
+public class TestRulesWorkloadManager {
+  private static MiniHS2 miniHS2 = null;
+  private static String dataFileDir;
+  private static Path kvDataFilePath;
+  private static String ruleTestTable = "testtab1";
+  private static String ruleQuery10msSleep = "select sleep(t1.under_col, 10), t1.under_col, t2.under_col " + "from "
+      + ruleTestTable + " t1 cross join " + ruleTestTable + " t2 on t1.under_col = t2.under_col";
+  private static String ruleQuery500msSleep = "select sleep(t1.under_col, 500), t1.under_col, t2.under_col " + "from "
+      + ruleTestTable + " t1 cross join " + ruleTestTable + " t2 on t1.under_col = t2.under_col";
+
+  private static HiveConf conf = null;
+  private static TestWorkloadManager.WorkloadManagerForTest wm;
+  private Connection hs2Conn = null;
+
+  @BeforeClass
+  public static void beforeTest() throws Exception {
+    Class.forName(MiniHS2.getJdbcDriverName());
+
+    String confDir = "../../data/conf/llap/";
+    if (confDir != null && !confDir.isEmpty()) {
+      HiveConf.setHiveSiteLocation(new URL("file://" + new File(confDir).toURI().getPath() + "/hive-site.xml"));
+      System.out.println("Setting hive-site: " + HiveConf.getHiveSiteLocation());
+    }
+
+    conf = new HiveConf();
+    conf.setBoolVar(ConfVars.HIVE_SUPPORT_CONCURRENCY, false);
+    conf.setBoolVar(ConfVars.HIVE_SERVER2_ENABLE_DOAS, false);
+    conf.setTimeVar(ConfVars.HIVE_RULE_VALIDATION_INTERVAL_MS, 100, TimeUnit.MILLISECONDS);
+    conf.setVar(ConfVars.HIVE_SERVER2_TEZ_INTERACTIVE_QUEUE, "default");
+    conf.setBoolean("hive.test.workload.management", true);
+    conf.setBoolVar(ConfVars.TEZ_EXEC_SUMMARY, true);
+
+    conf.addResource(new URL("file://" + new File(confDir).toURI().getPath()
+        + "/tez-site.xml"));
+
+    miniHS2 = new MiniHS2(conf, MiniClusterType.LLAP);
+
+    dataFileDir = conf.get("test.data.files").replace('\\', '/').replace("c:", "");
+    kvDataFilePath = new Path(dataFileDir, "kv1.txt");
+    Map<String, String> confOverlay = new HashMap<String, String>();
+    miniHS2.start(confOverlay);
+    miniHS2.getDFS().getFileSystem().mkdirs(new Path("/apps_staging_dir/anonymous"));
+  }
+
+  @Before
+  public void setUp() throws Exception {
+    hs2Conn = getConnection(miniHS2.getJdbcURL(), System.getProperty("user.name"), "bar");
+  }
+
+  private Connection getConnection(String jdbcURL, String user, String pwd) throws SQLException {
+    Connection conn = DriverManager.getConnection(jdbcURL, user, pwd);
+    conn.createStatement().execute("set hive.support.concurrency = false");
+    return conn;
+  }
+
+  @After
+  public void tearDown() throws Exception {
+    LlapBaseInputFormat.closeAll();
+    hs2Conn.close();
+  }
+
+  @AfterClass
+  public static void afterTest() throws Exception {
+    if (miniHS2.isStarted()) {
+      miniHS2.stop();
+    }
+  }
+
+  private void createTestTable(String tableName) throws Exception {
+    createTestTable(null, tableName);
+  }
+
+  private void createTestTable(String database, String tableName) throws Exception {
+    Statement stmt = hs2Conn.createStatement();
+
+    if (database != null) {
+      stmt.execute("CREATE DATABASE IF NOT EXISTS " + database);
+      stmt.execute("USE " + database);
+    }
+
+    // create table
+    stmt.execute("DROP TABLE IF EXISTS " + tableName);
+    stmt.execute("CREATE TABLE " + tableName
+        + " (under_col INT COMMENT 'the under column', value STRING) COMMENT ' test table'");
+
+    // load data
+    stmt.execute("load data local inpath '"
+        + kvDataFilePath.toString() + "' into table " + tableName);
+
+    ResultSet res = stmt.executeQuery("SELECT * FROM " + tableName);
+    assertTrue(res.next());
+    assertEquals("val_238", res.getString(2));
+    res.close();
+    stmt.close();
+  }
+
+  @Test(timeout = 60000)
+  public void testRuleSlowQueryElapsedTime() throws Exception {
+    Rule rule = new QueryRule("slow_query", ExpressionFactory.fromString("ELAPSED_TIME > 20000"),
+        Rule.Action.KILL_QUERY);
+    runQueryWithRule(ruleQuery500msSleep, Lists.newArrayList(rule), "Query was cancelled");
+  }
+
+  @Test(timeout = 60000)
+  public void testRuleSlowQueryExecutionTime() throws Exception {
+    Rule rule = new QueryRule("slow_query", ExpressionFactory.fromString("EXECUTION_TIME > 1000"),
+        Rule.Action.KILL_QUERY);
+    runQueryWithRule(ruleQuery10msSleep, Lists.newArrayList(rule), "Query was cancelled");
+  }
+
+  @Test(timeout = 60000)
+  public void testRuleHighShuffleBytes() throws Exception {
+    Rule rule = new QueryRule("big_shuffle", ExpressionFactory.fromString("SHUFFLE_BYTES > 100"),
+        Rule.Action.KILL_QUERY);
+    runQueryWithRule(ruleQuery10msSleep, Lists.newArrayList(rule), "Query was cancelled");
+  }
+
+  @Test(timeout = 60000)
+  public void testRuleHighBytesRead() throws Exception {
+    Rule rule = new QueryRule("big_read", ExpressionFactory.fromString("FILE_BYTES_READ > 100"),
+        Rule.Action.KILL_QUERY);
+    runQueryWithRule(ruleQuery10msSleep, Lists.newArrayList(rule), "Query was cancelled");
+  }
+
+  @Test(timeout = 60000)
+  public void testRuleHighBytesWrite() throws Exception {
+    Rule rule = new QueryRule("big_write", ExpressionFactory.fromString("FILE_BYTES_WRITTEN > 100"),
+        Rule.Action.KILL_QUERY);
+    runQueryWithRule(ruleQuery10msSleep, Lists.newArrayList(rule), "Query was cancelled");
+  }
+
+  @Test(timeout = 60000)
+  public void testMultipleRules1() throws Exception {
+    Rule shuffleRule = new QueryRule("big_shuffle", ExpressionFactory.fromString("SHUFFLE_BYTES > 100000"),
+        Rule.Action.KILL_QUERY);
+    Rule execTimeRule = new QueryRule("slow_query", ExpressionFactory.fromString("EXECUTION_TIME > 1000"),
+        Rule.Action.KILL_QUERY);
+    runQueryWithRule(ruleQuery10msSleep, Lists.newArrayList(shuffleRule, execTimeRule), "Query was cancelled");
+  }
+
+  @Test(timeout = 60000)
+  public void testMultipleRules2() throws Exception {
+    Rule shuffleRule = new QueryRule("big_shuffle", ExpressionFactory.fromString("SHUFFLE_BYTES > 100"),
+        Rule.Action.KILL_QUERY);
+    Rule execTimeRule = new QueryRule("slow_query", ExpressionFactory.fromString("EXECUTION_TIME > 100000"),
+        Rule.Action.KILL_QUERY);
+    runQueryWithRule(ruleQuery10msSleep, Lists.newArrayList(shuffleRule, execTimeRule), "Query was cancelled");
+  }
+
+  private void createSleepUDF() throws SQLException {
+    String udfName = TestJdbcWithMiniHS2.SleepMsUDF.class.getName();
+    Connection con = hs2Conn;
+    Statement stmt = con.createStatement();
+    stmt.execute("create temporary function sleep as '" + udfName + "'");
+    stmt.close();
+  }
+
+  private static class StaticRulesFetcher implements RuleFetcher {
+    private List<Rule> rules;
+
+    public StaticRulesFetcher(final List<Rule> rules) {
+      this.rules = rules;
+    }
+
+    @Override
+    public List<Rule> fetch(final String resourcePlanName) {
+      return rules;
+    }
+  }
+
+  private void runQueryWithRule(final String query, final List<Rule> rules, final String expect, int queryTimeout,
+      final Class<? extends Throwable> exceptionClass) throws Exception {
+    StaticRulesFetcher staticRulesFetcher = new StaticRulesFetcher(rules);
+    WorkloadManager wm = WorkloadManager.getInstance();
+    // install (or replace) the fetcher so that this test's rules become the active rule set
+    wm.setGlobalRulesFetcher(staticRulesFetcher);
+
+    Connection con = hs2Conn;
+    createTestTable(ruleTestTable);
+    createSleepUDF();
+
+    final Statement selStmt = con.createStatement();
+    final Throwable[] throwable = new Throwable[1];
+    Thread queryThread = new Thread(() -> {
+      try {
+        selStmt.setQueryTimeout(queryTimeout);
+        selStmt.executeQuery(query);
+      } catch (SQLException e) {
+        throwable[0] = e;
+      }
+    });
+    queryThread.start();
+
+    queryThread.join();
+    selStmt.close();
+
+    assertNotNull("Expected non-null throwable", throwable[0]);
+    assertEquals(exceptionClass, throwable[0].getClass());
+    assertTrue(expect + " is not contained in " + throwable[0].getMessage(),
+        throwable[0].getMessage().contains(expect));
+  }
+
+  private void runQueryWithRule(final String query, final List<Rule> rules, final String expect)
+      throws Exception {
+    runQueryWithRule(query, rules, expect, 0, SQLException.class);
+  }
+}
\ No newline at end of file
diff --git a/llap-common/src/java/org/apache/hadoop/hive/llap/AsyncPbRpcProxy.java b/llap-common/src/java/org/apache/hadoop/hive/llap/AsyncPbRpcProxy.java
index 0415169..9f3ec38 100644
--- a/llap-common/src/java/org/apache/hadoop/hive/llap/AsyncPbRpcProxy.java
+++ b/llap-common/src/java/org/apache/hadoop/hive/llap/AsyncPbRpcProxy.java
@@ -392,8 +392,12 @@ public void onRemoval(RemovalNotification arg) {
     this.hostProxies = cb.build();
     this.socketFactory = NetUtils.getDefaultSocketFactory(conf);
     this.token = token;
-    String tokenUser = getTokenUser(token);
-    this.tokenUser = tokenUser;
+    if (token != null) {
+      String tokenUser = getTokenUser(token);
+      this.tokenUser = tokenUser;
+    } else {
+      this.tokenUser = null;
+    }
     this.retryPolicy = RetryPolicies.retryUpToMaximumTimeWithFixedSleep(
         connectionTimeoutMs, retrySleepMs, TimeUnit.MILLISECONDS);
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/Context.java b/ql/src/java/org/apache/hadoop/hive/ql/Context.java
index 2753f1f..d67af2e 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/Context.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/Context.java
@@ -59,6 +59,7 @@
 import org.apache.hadoop.hive.ql.parse.QB;
 import org.apache.hadoop.hive.ql.plan.LoadTableDesc;
 import org.apache.hadoop.hive.ql.session.SessionState;
+import org.apache.hadoop.hive.ql.wm.RuleContext;
 import org.apache.hadoop.hive.shims.ShimLoader;
 import org.apache.hadoop.util.StringUtils;
 import org.slf4j.Logger;
@@ -149,10 +150,20 @@
   */
  private Map insertBranchToNamePrefix = new HashMap<>();
  private Operation operation = Operation.OTHER;
+ private RuleContext ruleContext;
+
  public void setOperation(Operation operation) {
    this.operation = operation;
  }
+
+ public RuleContext getRuleContext() {
+   return ruleContext;
+ }
+
+ public void setRuleContext(final RuleContext ruleContext) {
+   this.ruleContext = ruleContext;
+ }
+
  /**
   * These ops require special handling in various places
   * (note that Insert into Acid table is in OTHER category)
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/Driver.java b/ql/src/java/org/apache/hadoop/hive/ql/Driver.java
index 1943c6d..baff0ba 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/Driver.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/Driver.java
@@ -114,6 +114,7 @@
 import org.apache.hadoop.hive.ql.security.authorization.plugin.HivePrivilegeObject.HivePrivilegeObjectType;
 import org.apache.hadoop.hive.ql.session.SessionState;
 import org.apache.hadoop.hive.ql.session.SessionState.LogHelper;
+import org.apache.hadoop.hive.ql.wm.RuleContext;
 import org.apache.hadoop.hive.serde2.ByteStream;
 import org.apache.hadoop.hive.shims.Utils;
 import org.apache.hadoop.mapred.ClusterStatus;
@@ -484,6 +485,9 @@ private int compile(String command, boolean resetTaskIds, boolean deferClose) {
     String queryId = queryState.getQueryId();
 
+    if (ctx != null) {
+      setRuleContext();
+    }
     //save some info for webUI for use after plan is freed
     this.queryDisplay.setQueryStr(queryStr);
     this.queryDisplay.setQueryId(queryId);
@@ -528,6 +532,7 @@ public void run() {
     }
     if (ctx == null) {
       ctx = new Context(conf);
+      setRuleContext();
     }
 
     ctx.setTryCount(getTryCount());
@@ -712,6 +717,11 @@ public void run() {
     }
   }
 
+  private void setRuleContext() {
+    RuleContext ruleContext = new RuleContext(queryInfo);
+    ctx.setRuleContext(ruleContext);
+  }
+
   private boolean startImplicitTxn(HiveTxnManager txnManager) throws LockException {
     boolean shouldOpenImplicitTxn = !ctx.isExplainPlan();
     //this is dumb. HiveOperation is not always set. see HIVE-16447/HIVE-16443
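Editor's note: RuleContext itself is not included in this diff. Inferred from its call sites (Driver constructs it from queryInfo, TezTask attaches a JobCounterListener, RuleValidatorRunnable and RuleViolationActionHandler read the listener and the query id), its minimal shape is roughly the sketch below; the constructor is simplified to take the query id directly, which is an assumption:

```java
package org.apache.hadoop.hive.ql.wm;

// Inferred sketch only -- not part of this patch. Field layout and the
// query-id derivation are assumptions based on the call sites.
public class RuleContext {
  private final String queryId;                  // consumed by RuleViolationActionHandler via KillQuery
  private JobCounterListener jobCounterListener; // set by TezTask, polled by RuleValidatorRunnable

  public RuleContext(String queryId) {
    this.queryId = queryId;
  }

  public String getQueryId() {
    return queryId;
  }

  public JobCounterListener getJobCounterListener() {
    return jobCounterListener;
  }

  public void setJobCounterListener(JobCounterListener jobCounterListener) {
    this.jobCounterListener = jobCounterListener;
  }
}
```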
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/GuaranteedTasksAllocator.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/GuaranteedTasksAllocator.java
index d978a25..ffd7d90 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/GuaranteedTasksAllocator.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/GuaranteedTasksAllocator.java
@@ -95,7 +95,7 @@ protected int getExecutorCount(boolean allowUpdate) {
   public void updateSessionsAsync(double totalMaxAlloc, List<WmTezSession> sessionsToUpdate) {
     // Do not make a remote call unless we have no information at all.
     int totalCount = getExecutorCount(false);
-    int totalToDistribute = (int)Math.round(totalCount * totalMaxAlloc);
+    int totalToDistribute = (int) Math.round(totalCount * totalMaxAlloc);
     double lastDelta = 0;
     for (int i = 0; i < sessionsToUpdate.size(); ++i) {
       WmTezSession session = sessionsToUpdate.get(i);
@@ -116,7 +116,7 @@ public void updateSessionsAsync(double totalMaxAlloc, List<WmTezSession> sessionsToUpdate) {
       if (roundedAlloc < 0) {
         roundedAlloc = 0; // Can this happen? Delta cannot exceed 0.5.
       }
-      intAlloc = (int)roundedAlloc;
+      intAlloc = (int) roundedAlloc;
     }
     // Make sure we don't give out more than allowed due to double/rounding artifacts.
     if (intAlloc > totalToDistribute) {
@@ -136,7 +136,7 @@ private void updateSessionAsync(final WmTezSession session, final int intAlloc) {
     // HS2 session pool paths, and this patch removes the last one (reopen).
     UpdateQueryRequestProto request = UpdateQueryRequestProto
         .newBuilder().setGuaranteedTaskCount(intAlloc).build();
-    amCommunicator.sendUpdateQuery(request, (AmPluginNode)session, new UpdateCallback(session));
+    amCommunicator.sendUpdateQuery(request, session, new UpdateCallback(session));
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/RestrictedConfigChecker.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/RestrictedConfigChecker.java
index f6b1c1d..5b2d0a4 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/RestrictedConfigChecker.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/RestrictedConfigChecker.java
@@ -22,6 +22,8 @@
 import java.util.HashMap;
 import java.util.List;
+
+import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
 import org.apache.hadoop.hive.ql.metadata.HiveException;
@@ -30,7 +32,7 @@
 class RestrictedConfigChecker {
   private final static Logger LOG = LoggerFactory.getLogger(RestrictedConfigChecker.class);
-  private final List<ConfVars> restrictedHiveConf = new ArrayList<>();
+  private final List<String> restrictedHiveConf = new ArrayList<>();
   private final List<String> restrictedNonHiveConf = new ArrayList<>();
   private final HiveConf initConf;
@@ -45,7 +47,7 @@
       confName = confName.toLowerCase();
       ConfVars cv = confVars.get(confName);
       if (cv != null) {
-        restrictedHiveConf.add(cv);
+        restrictedHiveConf.add(cv.name());
       } else {
         LOG.warn("A restricted config " + confName + " is not recognized as a Hive setting.");
         restrictedNonHiveConf.add(confName);
@@ -53,13 +55,13 @@
     }
   }
 
-  public void validate(HiveConf conf) throws HiveException {
-    for (ConfVars var : restrictedHiveConf) {
-      String userValue = HiveConf.getVarWithoutType(conf, var),
-          serverValue = HiveConf.getVarWithoutType(initConf, var);
+  public void validate(Configuration conf) throws HiveException {
+    for (String var : restrictedHiveConf) {
+      ConfVars cv = ConfVars.valueOf(var);
+      String userValue = HiveConf.getVarWithoutType(conf, cv),
+          serverValue = HiveConf.getVarWithoutType(initConf, cv);
       // Note: with some trickery, we could add logic for each type in ConfVars; for now the
       // potential spurious mismatches (e.g. 0 and 0.0 for float) should be easy to work around.
-      validateRestrictedConfigValues(var.varname, userValue, serverValue);
+      validateRestrictedConfigValues(cv.varname, userValue, serverValue);
     }
     for (String var : restrictedNonHiveConf) {
       String userValue = conf.get(var), serverValue = initConf.get(var);
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/RuleValidatorRunnable.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/RuleValidatorRunnable.java
new file mode 100644
index 0000000..7185d43
--- /dev/null
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/RuleValidatorRunnable.java
@@ -0,0 +1,76 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.exec.tez;
+
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+
+import org.apache.hadoop.hive.ql.wm.JobCounterListener;
+import org.apache.hadoop.hive.ql.wm.Rule;
+import org.apache.hadoop.hive.ql.wm.RuleContext;
+
+public class RuleValidatorRunnable implements Runnable {
+  private final List<? extends TezSessionState> sessions;
+  private final List<Rule> rules;
+  private final ReentrantReadWriteLock sessionsPoolsLock;
+  private final RuleViolationActionHandler actionHandler;
+
+  RuleValidatorRunnable(final List<? extends TezSessionState> sessions,
+      final List<Rule> rules, final ReentrantReadWriteLock sessionsPoolsLock,
+      final RuleViolationActionHandler actionHandler) {
+    this.sessions = sessions;
+    this.rules = rules;
+    this.sessionsPoolsLock = sessionsPoolsLock;
+    this.actionHandler = actionHandler;
+  }
+
+  @Override
+  public void run() {
+    Map<TezSessionState, Rule> violatedSessions = new HashMap<>();
+    sessionsPoolsLock.readLock().lock();
+    try {
+      if (sessions != null) {
+        sessions.forEach(s -> {
+          RuleContext ruleContext = s.getRuleContext();
+          if (ruleContext != null) {
+            JobCounterListener jobCounterListener = ruleContext.getJobCounterListener();
+            if (jobCounterListener != null) {
+              Map<String, Long> currentCounters = jobCounterListener.getCounters();
+              if (rules != null && currentCounters != null) {
+                rules.forEach(r -> {
+                  String desiredCounter = r.getExpression().getCounterLimit().getName();
+                  if (r.apply(currentCounters.get(desiredCounter))) {
+                    violatedSessions.put(s, r);
+                  }
+                });
+              }
+            }
+          }
+        });
+      }
+    } finally {
+      sessionsPoolsLock.readLock().unlock();
+    }
+
+    // TODO: should this go into a separate thread pool?
+    if (actionHandler != null && !violatedSessions.isEmpty()) {
+      actionHandler.applyAction(violatedSessions);
+    }
+  }
+}
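Editor's note: the evaluation step inside run() above reduces to "look up the counter named by the rule's expression and ask the rule whether the value violates it". A standalone illustration, assuming counter values are Longs as the restored generics suggest; counter values here are made up:

```java
import java.util.HashMap;
import java.util.Map;

import org.apache.hadoop.hive.ql.wm.ExpressionFactory;
import org.apache.hadoop.hive.ql.wm.QueryRule;
import org.apache.hadoop.hive.ql.wm.Rule;

public class RuleEvalExample {
  public static void main(String[] args) {
    Rule rule = new QueryRule("big_shuffle",
        ExpressionFactory.fromString("SHUFFLE_BYTES > 100"), Rule.Action.KILL_QUERY);

    // Hypothetical snapshot as delivered by JobCounterListener.getCounters().
    Map<String, Long> counters = new HashMap<>();
    counters.put("SHUFFLE_BYTES", 1024L);
    counters.put("EXECUTION_TIME", 50L);

    String counterName = rule.getExpression().getCounterLimit().getName(); // "SHUFFLE_BYTES"
    if (rule.apply(counters.get(counterName))) {
      System.out.println("Rule " + rule + " violated; action: " + rule.getAction());
    }
  }
}
```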
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/RuleViolationActionHandler.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/RuleViolationActionHandler.java
new file mode 100644
index 0000000..d38936d
--- /dev/null
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/RuleViolationActionHandler.java
@@ -0,0 +1,48 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.exec.tez;
+
+import java.util.Map;
+
+import org.apache.hadoop.hive.ql.metadata.HiveException;
+import org.apache.hadoop.hive.ql.wm.Rule;
+import org.apache.hadoop.hive.ql.wm.RuleActionHandler;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class RuleViolationActionHandler implements RuleActionHandler {
+  private static final Logger LOG = LoggerFactory.getLogger(RuleViolationActionHandler.class);
+
+  @Override
+  public void applyAction(final Map<TezSessionState, Rule> queriesViolated) {
+    if (queriesViolated != null) {
+      queriesViolated.forEach((s, r) -> {
+        if (r.getAction().equals(Rule.Action.KILL_QUERY)) {
+          try {
+            String queryId = s.getRuleContext().getQueryId();
+            LOG.info("Query: {} violated Rule: {}", queryId, r);
+            s.getKillQuery().killQuery(queryId);
+          } catch (HiveException e) {
+            LOG.warn("Unable to kill query {} for rule {} violation", s.getRuleContext().getQueryId(), r, e);
+          }
+        } else {
+          // TODO: add support for moving to different pools
+        }
+      });
+    }
+  }
+}
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionPoolManager.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionPoolManager.java
index edcecb0..0649931 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionPoolManager.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionPoolManager.java
@@ -20,13 +20,18 @@
 import java.util.HashSet;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.Semaphore;
 import java.util.ArrayList;
 import java.util.LinkedList;
 import java.util.List;
-import java.util.Random;
 import java.util.Set;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+
+import org.apache.hadoop.hive.ql.wm.Rule;
+import org.apache.hadoop.hive.ql.wm.RuleFetcher;
 import org.apache.tez.dag.api.TezConfiguration;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -39,6 +44,7 @@
 import org.apache.hadoop.security.UserGroupInformation;
 
 import com.google.common.annotations.VisibleForTesting;
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
 
 /**
  * This class is for managing multiple tez sessions particularly when
@@ -50,6 +56,8 @@ public class TezSessionPoolManager
     implements SessionExpirationTracker.RestartImpl, Manager {
 
+  private RuleFetcher globalRulesFetcher;
+
   private enum CustomQueueAllowed {
     TRUE,
     FALSE,
@@ -57,7 +65,6 @@
   }
 
   private static final Logger LOG = LoggerFactory.getLogger(TezSessionPoolManager.class);
-  static final Random rdm = new Random();
 
   private Semaphore llapQueue;
   private HiveConf initConf = null;
@@ -79,6 +86,9 @@
   /** This is used to close non-default sessions, and also all sessions when stopping. */
   private final List<TezSessionPoolSession> openSessions = new LinkedList<>();
+  private List<Rule> rules = new ArrayList<>();
+  private final ReentrantReadWriteLock sessionsRulesLock = new ReentrantReadWriteLock();
+  private boolean isRuleValidatorStarted = false;
 
   /** Note: this is not thread-safe. */
   public static TezSessionPoolManager getInstance() throws Exception {
@@ -91,6 +101,9 @@ public static TezSessionPoolManager getInstance() throws Exception {
   }
 
   protected TezSessionPoolManager() {
+    if (!isRuleValidatorStarted) {
+      setupRuleValidator();
+    }
   }
 
   public void startPool() throws Exception {
@@ -100,6 +113,9 @@ public void startPool() throws Exception {
     if (expirationTracker != null) {
       expirationTracker.start();
     }
+    if (!isRuleValidatorStarted) {
+      setupRuleValidator();
+    }
   }
 
   public void setupPool(HiveConf conf) throws InterruptedException {
@@ -131,6 +147,10 @@ public void setupPool(HiveConf conf) throws InterruptedException {
           ConfVars.HIVE_SERVER2_TEZ_SESSION_CUSTOM_QUEUE_ALLOWED.varname);
     }
 
+    if (!isRuleValidatorStarted) {
+      setupRuleValidator();
+    }
+
     restrictedConfig = new RestrictedConfigChecker(conf);
     // Only creates the expiration tracker if expiration is configured.
    expirationTracker = SessionExpirationTracker.create(conf, this);
@@ -160,6 +180,20 @@
     }
   }
 
+  private void setupRuleValidator() {
+    if (initConf != null) {
+      long ruleValidationIntervalMs = HiveConf.getTimeVar(initConf,
+          ConfVars.HIVE_RULE_VALIDATION_INTERVAL_MS, TimeUnit.MILLISECONDS);
+      final ScheduledExecutorService scheduledExecutorService = Executors.newSingleThreadScheduledExecutor(
+          new ThreadFactoryBuilder().setDaemon(true).setNameFormat("RuleValidator").build());
+      RuleValidatorRunnable ruleValidatorRunnable = new RuleValidatorRunnable(openSessions, rules, sessionsRulesLock,
+          new RuleViolationActionHandler());
+      scheduledExecutorService.scheduleWithFixedDelay(ruleValidatorRunnable, ruleValidationIntervalMs,
+          ruleValidationIntervalMs, TimeUnit.MILLISECONDS);
+      isRuleValidatorStarted = true;
+    }
+  }
+
   // TODO Create and init session sets up queue, isDefault - but does not initialize the configuration
   private TezSessionPoolSession createAndInitSession(
       String queue, boolean isDefault, HiveConf conf) {
@@ -295,17 +329,18 @@ public void stop() throws Exception {
     if ((instance == null) || !this.hasInitialSessions) {
       return;
     }
-
-    List<TezSessionState> sessionsToClose = null;
+    List<TezSessionState> sessionsToClose;
     synchronized (openSessions) {
-      sessionsToClose = new ArrayList<TezSessionState>(openSessions);
+      sessionsToClose = new ArrayList<>(openSessions);
     }
+
+    // we can just stop all the sessions
     for (TezSessionState sessionState : sessionsToClose) {
       if (sessionState.isDefault()) {
         sessionState.close(false);
       }
     }
+
     if (expirationTracker != null) {
       expirationTracker.stop();
     }
@@ -423,12 +458,14 @@ public TezSessionState reopen(TezSessionState sessionState,
 
   public void closeNonDefaultSessions(boolean keepTmpDir) throws Exception {
-    List<TezSessionPoolSession> sessionsToClose = null;
-    synchronized (openSessions) {
-      sessionsToClose = new ArrayList<TezSessionPoolSession>(openSessions);
-    }
-    for (TezSessionPoolSession sessionState : sessionsToClose) {
-      System.err.println("Shutting down tez session.");
-      closeIfNotDefault(sessionState, keepTmpDir);
+    List<TezSessionPoolSession> sessionsToClose;
+    sessionsRulesLock.readLock().lock();
+    try {
+      sessionsToClose = new ArrayList<>(openSessions);
+    } finally {
+      sessionsRulesLock.readLock().unlock();
+    }
+    for (TezSessionPoolSession sessionState : sessionsToClose) {
+      System.err.println("Shutting down tez session.");
+      closeIfNotDefault(sessionState, keepTmpDir);
     }
   }
@@ -447,8 +484,11 @@ public void closeAndReopenPoolSession(TezSessionPoolSession oldSession) throws E
   /** Called by TezSessionPoolSession when opened. */
   @Override
   public void registerOpenSession(TezSessionPoolSession session) {
-    synchronized (openSessions) {
+    sessionsRulesLock.writeLock().lock();
+    try {
       openSessions.add(session);
+    } finally {
+      sessionsRulesLock.writeLock().unlock();
     }
   }
@@ -458,8 +498,11 @@ public void unregisterOpenSession(TezSessionPoolSession session) {
     if (LOG.isDebugEnabled()) {
       LOG.debug("Closed a pool session [" + this + "]");
     }
-    synchronized (openSessions) {
+    sessionsRulesLock.writeLock().lock();
+    try {
       openSessions.remove(session);
+    } finally {
+      sessionsRulesLock.writeLock().unlock();
     }
   }
@@ -467,4 +510,30 @@ public void unregisterOpenSession(TezSessionPoolSession session) {
   public SessionExpirationTracker getExpirationTracker() {
     return expirationTracker;
   }
+
+  public void refreshRules() {
+    RuleFetcher fetcher = getGlobalRulesFetcher();
+    if (fetcher != null) {
+      sessionsRulesLock.writeLock().lock();
+      try {
+        this.rules.clear();
+        List<Rule> newRules = fetcher.fetch("global");
+        if (newRules != null) {
+          this.rules.addAll(newRules);
+        }
+      } finally {
+        sessionsRulesLock.writeLock().unlock();
+      }
+    }
+  }
+
+  public void setGlobalRulesFetcher(final RuleFetcher globalRulesFetcher) {
+    this.globalRulesFetcher = globalRulesFetcher;
+    refreshRules();
+  }
+
+  public RuleFetcher getGlobalRulesFetcher() {
+    return globalRulesFetcher;
+  }
 }
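Editor's note: a short usage sketch of the new hooks above, mirroring the StaticRulesFetcher the tests install; a production fetcher would read the rules from the metastore resource plan rather than returning a fixed list:

```java
import java.util.List;

import org.apache.hadoop.hive.ql.exec.tez.TezSessionPoolManager;
import org.apache.hadoop.hive.ql.wm.ExpressionFactory;
import org.apache.hadoop.hive.ql.wm.QueryRule;
import org.apache.hadoop.hive.ql.wm.Rule;
import org.apache.hadoop.hive.ql.wm.RuleFetcher;

import com.google.common.collect.Lists;

public class GlobalRulesWiring {
  public static void install() throws Exception {
    RuleFetcher fetcher = new RuleFetcher() {
      @Override
      public List<Rule> fetch(String resourcePlanName) {
        // Fixed rule set for illustration; threshold is arbitrary.
        return Lists.newArrayList(new QueryRule("slow_query",
            ExpressionFactory.fromString("EXECUTION_TIME > 60000"), Rule.Action.KILL_QUERY));
      }
    };
    // setGlobalRulesFetcher() triggers refreshRules(), so the scheduled
    // RuleValidator thread picks up the new rule set on its next pass.
    TezSessionPoolManager.getInstance().setGlobalRulesFetcher(fetcher);
  }
}
```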
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionState.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionState.java
index 363443d..435cab0 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionState.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionState.java
@@ -57,8 +57,10 @@
 import org.apache.hadoop.hive.llap.tezplugins.LlapTaskCommunicator;
 import org.apache.hadoop.hive.llap.tezplugins.LlapTaskSchedulerService;
 import org.apache.hadoop.hive.ql.exec.Utilities;
+import org.apache.hadoop.hive.ql.session.KillQuery;
 import org.apache.hadoop.hive.ql.session.SessionState;
 import org.apache.hadoop.hive.ql.session.SessionState.LogHelper;
+import org.apache.hadoop.hive.ql.wm.RuleContext;
 import org.apache.hadoop.hive.shims.Utils;
 import org.apache.hadoop.security.Credentials;
 import org.apache.hadoop.security.UserGroupInformation;
@@ -112,6 +114,9 @@
   private final Set<LocalResource> localizedResources = new HashSet<>();
   private boolean doAsEnabled;
   private boolean isLegacyLlapMode;
+  private RuleContext ruleContext;
+  private Set<String> desiredCounters;
+  private KillQuery killQuery;
 
   /**
    * Constructor. We do not automatically connect, because we only want to
@@ -533,8 +538,8 @@ void close(boolean keepTmpDir) throws Exception {
     console = null;
     tezScratchDir = null;
     appJarLr = null;
-    additionalFilesNotFromConf.clear();
-    localizedResources.clear();
+    getAdditionalFilesNotFromConf().clear();
+    getLocalizedResources().clear();
   }
 
   public Set<String> getAdditionalFilesNotFromConf() {
@@ -551,9 +556,11 @@ private void closeClient(TezClient client) throws TezException,
   }
 
   public void cleanupScratchDir() throws IOException {
-    FileSystem fs = tezScratchDir.getFileSystem(conf);
-    fs.delete(tezScratchDir, true);
-    tezScratchDir = null;
+    if (tezScratchDir != null) {
+      FileSystem fs = tezScratchDir.getFileSystem(conf);
+      fs.delete(tezScratchDir, true);
+      tezScratchDir = null;
+    }
   }
 
   public String getSessionId() {
@@ -751,4 +758,36 @@ public void destroy() throws Exception {
     // By default, TezSessionPoolManager handles this for both pool and non-pool session.
     TezSessionPoolManager.getInstance().destroy(this);
   }
+
+  public RuleContext getRuleContext() {
+    return ruleContext;
+  }
+
+  public void setRuleContext(final RuleContext ruleContext) {
+    this.ruleContext = ruleContext;
+  }
+
+  public void setKillQuery(final KillQuery killQuery) {
+    this.killQuery = killQuery;
+  }
+
+  public KillQuery getKillQuery() {
+    return killQuery;
+  }
+
+  public void setDesiredCounters(final Set<String> desiredCounters) {
+    this.desiredCounters = desiredCounters;
+  }
+
+  public void mergeDesiredCounters(final Set<String> moreCounters) {
+    if (this.desiredCounters == null) {
+      this.desiredCounters = moreCounters;
+    } else {
+      desiredCounters.addAll(moreCounters);
+    }
+  }
+
+  public Set<String> getDesiredCounters() {
+    return desiredCounters;
+  }
 }
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezTask.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezTask.java
index 28d91cc..45664bb 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezTask.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezTask.java
@@ -18,8 +18,6 @@
 package org.apache.hadoop.hive.ql.exec.tez;
 
-import org.apache.hadoop.hive.ql.exec.FileSinkOperator;
-
 import java.io.IOException;
 import java.util.Arrays;
 import java.util.Collection;
@@ -31,6 +29,7 @@
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
+import java.util.stream.Collectors;
 
 import javax.annotation.Nullable;
 
@@ -43,9 +42,11 @@
 import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.ql.Context;
 import org.apache.hadoop.hive.ql.DriverContext;
+import org.apache.hadoop.hive.ql.exec.FileSinkOperator;
 import org.apache.hadoop.hive.ql.exec.Operator;
 import org.apache.hadoop.hive.ql.exec.Task;
 import org.apache.hadoop.hive.ql.exec.Utilities;
+import org.apache.hadoop.hive.ql.exec.tez.monitoring.TezJobMonitor;
 import org.apache.hadoop.hive.ql.log.PerfLogger;
 import org.apache.hadoop.hive.ql.metadata.HiveException;
 import org.apache.hadoop.hive.ql.plan.BaseWork;
@@ -59,6 +60,10 @@
 import org.apache.hadoop.hive.ql.plan.UnionWork;
 import org.apache.hadoop.hive.ql.plan.api.StageType;
 import org.apache.hadoop.hive.ql.session.SessionState;
+import org.apache.hadoop.hive.ql.wm.JobCounterListener;
+import org.apache.hadoop.hive.ql.wm.Rule;
+import org.apache.hadoop.hive.ql.wm.RuleContext;
+import org.apache.hadoop.hive.ql.wm.RuleFetcher;
 import org.apache.hadoop.mapred.JobConf;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.util.StringUtils;
@@ -84,7 +89,6 @@
 import org.apache.tez.dag.api.client.StatusGetOpts;
 import org.apache.tez.dag.api.client.VertexStatus;
 import org.json.JSONObject;
-import org.apache.hadoop.hive.ql.exec.tez.monitoring.TezJobMonitor;
 
 import com.google.common.annotations.VisibleForTesting;
 
@@ -149,14 +153,35 @@ public int execute(DriverContext driverContext) {
       if (session != null && !session.isOpen()) {
         LOG.warn("The session: " + session + " has not been opened");
       }
+      RuleFetcher globalRuleFetcher;
       if (WorkloadManager.isInUse(ss.getConf())) {
+        WorkloadManager wm = WorkloadManager.getInstance();
         // TODO: in future, we may also pass getUserIpAddress.
         // Note: for now this will just block to wait for a session based on parallelism.
-        session = WorkloadManager.getInstance().getSession(session, ss.getUserName(), conf);
+        session = wm.getSession(session, ss.getUserName(), ctx);
+        // if WM is enabled, rules are bound to pools. There can be rules that have scope outside of LLAP.
+        globalRuleFetcher = wm.getGlobalRulesFetcher();
       } else {
-        session = TezSessionPoolManager.getInstance().getSession(
-            session, conf, false, getWork().getLlapMode());
+        TezSessionPoolManager pm = TezSessionPoolManager.getInstance();
+        session = pm.getSession(session, conf, false, getWork().getLlapMode());
+        globalRuleFetcher = pm.getGlobalRulesFetcher();
+      }
+
+      if (globalRuleFetcher != null) {
+        List<Rule> globalRules = globalRuleFetcher.fetch("global"); // FIXME: correct RP?
+        if (globalRules != null) {
+          Set<String> desiredCounters = globalRules.stream().map(r -> r.getExpression().getCounterLimit().getName())
+              .collect(Collectors.toSet());
+          session.mergeDesiredCounters(desiredCounters);
+        }
+      }
+      JobCounterListener jcl = new JobCounterListenerImpl(session.getDesiredCounters());
+      RuleContext ruleContext = ctx.getRuleContext();
+      if (ruleContext != null) {
+        ruleContext.setJobCounterListener(jcl);
+        session.setRuleContext(ruleContext);
       }
+      session.setKillQuery(ss.getKillQuery());
       ss.setTezSession(session);
       try {
         // jobConf will hold all the configuration for hadoop, tez, and hive
@@ -219,7 +244,7 @@ public int execute(DriverContext driverContext) {
       }
 
       // finally monitor will print progress until the job is done
-      TezJobMonitor monitor = new TezJobMonitor(work.getAllWork(),dagClient, conf, dag, ctx);
+      TezJobMonitor monitor = new TezJobMonitor(work.getAllWork(), dagClient, conf, dag, ctx);
       rc = monitor.monitorExecution();
 
       if (rc != 0) {
@@ -781,4 +806,29 @@ public DAGStatus waitForCompletionWithStatusUpdates(@Nullable Set<StatusGetOpts>
       }
     }
   }
+
+  static class JobCounterListenerImpl implements JobCounterListener {
+    private Set<String> counters;
+    private Map<String, Long> currentCounters;
+
+    public JobCounterListenerImpl(final Set<String> subscribedCounters) {
+      this.counters = subscribedCounters;
+    }
+
+    @Override
+    public Set<String> getSubscribedCounters() {
+      return counters;
+    }
+
+    @Override
+    public void updateCounters(final Map<String, Long> counters) {
+      this.currentCounters = counters;
+    }
+
+    @Override
+    public Map<String, Long> getCounters() {
+      return currentCounters;
+    }
+  }
 }
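Editor's note: the producer side of JobCounterListener is in the TezJobMonitor changes, which are truncated at the end of this patch. A hypothetical monitor-side update, assuming the monitor flattens the DAG's TezCounters into the name-to-value map the listener stores and copies only the subscribed counters:

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Set;

import org.apache.hadoop.hive.ql.wm.JobCounterListener;
import org.apache.tez.common.counters.TezCounter;
import org.apache.tez.common.counters.TezCounters;

// Sketch only: how a monitor could push a counter snapshot to the listener.
class CounterPublisher {
  static void publish(TezCounters dagCounters, JobCounterListener listener) {
    Set<String> subscribed = listener.getSubscribedCounters();
    Map<String, Long> snapshot = new HashMap<>();
    // TezCounters iterates counter groups; each group iterates its counters.
    dagCounters.forEach(group -> group.forEach((TezCounter counter) -> {
      if (subscribed != null && subscribed.contains(counter.getName())) {
        snapshot.put(counter.getName(), counter.getValue());
      }
    }));
    listener.updateCounters(snapshot);
  }
}
```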
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/WorkloadManager.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/WorkloadManager.java
index 3f62127..3185e29 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/WorkloadManager.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/WorkloadManager.java
@@ -17,20 +17,27 @@
  */
 package org.apache.hadoop.hive.ql.exec.tez;
 
-import java.util.concurrent.TimeoutException;
-
-import java.util.concurrent.TimeUnit;
-
 import java.util.ArrayList;
 import java.util.HashMap;
-import java.util.IdentityHashMap;
+import java.util.LinkedList;
 import java.util.List;
+import java.util.Objects;
+import java.util.Set;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
 import java.util.concurrent.locks.ReentrantReadWriteLock;
+import java.util.stream.Collectors;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
+import org.apache.hadoop.hive.ql.Context;
 import org.apache.hadoop.hive.ql.metadata.HiveException;
+import org.apache.hadoop.hive.ql.wm.MetastoreResourcePlanRulesFetcher;
+import org.apache.hadoop.hive.ql.wm.Rule;
+import org.apache.hadoop.hive.ql.wm.RuleFetcher;
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.security.token.Token;
@@ -42,6 +49,7 @@
 import org.slf4j.LoggerFactory;
 
 import com.google.common.annotations.VisibleForTesting;
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
 
 /** Workload management entry point for HS2. */
@@ -57,20 +65,44 @@
   private final RestrictedConfigChecker restrictedConfig;
   private final QueryAllocationManager allocationManager;
   private final String yarnQueue;
-  // TODO: it's not clear that we need to track this - unlike PoolManager we don't have non-pool
-  //       sessions, so the pool itself could internally track the sessions it gave out, since
-  //       calling close on an unopened session is probably harmless.
-  private final IdentityHashMap<TezSessionPoolSession, Boolean> openSessions =
-      new IdentityHashMap<>();
-  /** Sessions given out (i.e. between get... and return... calls), separated by Hive pool. */
-  private final ReentrantReadWriteLock poolsLock = new ReentrantReadWriteLock();
+  private final List<TezSessionPoolSession> openSessions = new LinkedList<>();
   private final HashMap<String, PoolState> pools = new HashMap<>();
   private final int amRegistryTimeoutMs;
+  // rules are usually updated less frequently.
+  // sessions can be updated much more frequently (as and when a new DAG/query is submitted).
+  // any updates to rules or sessions will have to be validated to make sure the rules are not violated.
+  // lock to avoid simultaneous updates to rules and sessions during rule validation
+  private final ReentrantReadWriteLock sessionsRulesLock = new ReentrantReadWriteLock();
+  private List<Rule> activeRules = new ArrayList<>();
+  private RuleFetcher resourcePlanRulesFetcher;
+  private RuleFetcher globalRulesFetcher;
+  private boolean isRuleValidatorStarted = false;
 
-  private static class PoolState {
+  public static class PoolState {
     // Add stuff here as WM is implemented.
     private final Object lock = new Object();
-    private final List<WmTezSession> sessions = new ArrayList<>();
+    private List<WmTezSession> sessions = new ArrayList<>();
+    private List<Rule> rules;
+
+    public PoolState(final String poolName) {
+      updateCounterNames();
+    }
+
+    private void updateCounterNames() {
+      if (rules != null && sessions != null) {
+        Set<String> counters = rules.stream().map(r -> r.getExpression().getCounterLimit().getName())
+            .collect(Collectors.toSet());
+        sessions.forEach(s -> s.setDesiredCounters(counters));
+      }
+    }
+
+    public void setRules(final List<Rule> rules) {
+      this.rules = rules;
+      updateCounterNames();
+    }
+
+    public List<Rule> getRules() {
+      return rules;
+    }
   }
 
   // TODO: this is temporary before HiveServerEnvironment is merged.
@@ -113,8 +145,8 @@ public static WorkloadManager create(String yarnQueue, HiveConf conf) {
       QueryAllocationManager qam, Token amsToken) {
     this.yarnQueue = yarnQueue;
     this.conf = conf;
+    this.resourcePlanRulesFetcher = new MetastoreResourcePlanRulesFetcher();
     initializeHivePools();
-
     this.amRegistryTimeoutMs = (int) HiveConf.getTimeVar(
         conf, ConfVars.HIVE_SERVER2_TEZ_WM_AM_REGISTRY_TIMEOUT, TimeUnit.MILLISECONDS);
     sessions = new TezSessionPool<>(conf, numSessions, true);
@@ -125,21 +157,77 @@ public static WorkloadManager create(String yarnQueue, HiveConf conf) {
     for (int i = 0; i < numSessions; i++) {
       sessions.addInitialSession(createSession());
     }
+    if (!isRuleValidatorStarted) {
+      refreshRules();
+      setupRuleValidator();
+    }
+  }
+
+  private void refreshRules() {
+    sessionsRulesLock.writeLock().lock();
+    try {
+      if (pools != null) {
+        activeRules.clear();
+        List<Rule> newRules = pools.values()
+            .stream()
+            .map(PoolState::getRules)  // get rules from PoolState
+            .filter(Objects::nonNull)  // remove nulls
+            .flatMap(List::stream)     // flatten multiple List<Rule>s into a single List<Rule>
+            .collect(Collectors.toList());
+        activeRules.addAll(newRules);
+        if (globalRulesFetcher != null) {
+          activeRules.addAll(globalRulesFetcher.fetch("global"));
+        }
+      }
+    } finally {
+      sessionsRulesLock.writeLock().unlock();
+    }
+  }
+
+  private void setupRuleValidator() {
+    if (conf != null) {
+      long ruleValidationIntervalMs = HiveConf.getTimeVar(conf,
+          ConfVars.HIVE_RULE_VALIDATION_INTERVAL_MS, TimeUnit.MILLISECONDS);
+      final ScheduledExecutorService scheduledExecutorService = Executors.newSingleThreadScheduledExecutor(
+          new ThreadFactoryBuilder().setDaemon(true).setNameFormat("RuleValidator").build());
+      RuleValidatorRunnable ruleValidatorRunnable = new RuleValidatorRunnable(openSessions, activeRules,
+          sessionsRulesLock, new RuleViolationActionHandler());
+      scheduledExecutorService.scheduleWithFixedDelay(ruleValidatorRunnable, ruleValidationIntervalMs,
+          ruleValidationIntervalMs, TimeUnit.MILLISECONDS);
+      isRuleValidatorStarted = true;
+    }
   }
 
   private void initializeHivePools() {
     // TODO: real implementation
-    poolsLock.writeLock().lock();
+    sessionsRulesLock.writeLock().lock();
     try {
-      pools.put("llap", new PoolState());
+      String poolName = "llap";
+      PoolState poolState = new PoolState(poolName);
+      poolState.setRules(getResourcePlanRulesFetcher().fetch("resource-plan-name")); // FIXME:
+      pools.put(poolName, poolState);
     } finally {
-      poolsLock.writeLock().unlock();
+      sessionsRulesLock.writeLock().unlock();
+    }
+  }
+
+  @VisibleForTesting
+  public void updateHivePools() {
+    sessionsRulesLock.writeLock().lock();
+    try {
+      // TODO: updates any changes to rules in poolstate
+      String poolName = "llap";
+      PoolState poolState = new PoolState(poolName);
+      poolState.setRules(getResourcePlanRulesFetcher().fetch("resource-plan-name")); // FIXME:
+      pools.put(poolName, poolState);
+    } finally {
+      sessionsRulesLock.writeLock().unlock();
     }
   }
 
   public TezSessionState getSession(
-      TezSessionState session, String userName, HiveConf conf) throws Exception {
-    validateConfig(conf);
+      TezSessionState session, String userName, Context ctx) throws Exception {
+    validateConfig(ctx.getConf());
     String poolName = mapSessionToPoolName(userName);
     // TODO: do query parallelism enforcement here based on the policies and pools.
while (true) { @@ -174,7 +262,7 @@ private void redistributePoolAllocations( double totalAlloc = 0; assert sessionToAdd == null || poolName.equals(sessionToAdd.getPoolName()); assert sessionToRemove == null || poolName.equals(sessionToRemove.getPoolName()); - poolsLock.readLock().lock(); + sessionsRulesLock.readLock().lock(); try { PoolState pool = pools.get(poolName); synchronized (pool.lock) { @@ -192,7 +280,7 @@ private void redistributePoolAllocations( sessionsToUpdate = new ArrayList<>(pool.sessions); } } finally { - poolsLock.readLock().unlock(); + sessionsRulesLock.readLock().unlock(); } allocationManager.updateSessionsAsync(totalAlloc, sessionsToUpdate); } @@ -233,13 +321,13 @@ private String mapSessionToPoolName(String userName) { return "llap"; } - private void validateConfig(HiveConf conf) throws HiveException { + private void validateConfig(Configuration conf) throws HiveException { String queueName = conf.get(TezConfiguration.TEZ_QUEUE_NAME); if ((queueName != null) && !queueName.isEmpty()) { LOG.warn("Ignoring " + TezConfiguration.TEZ_QUEUE_NAME + "=" + queueName); conf.set(TezConfiguration.TEZ_QUEUE_NAME, yarnQueue); } - if (conf.getBoolVar(ConfVars.HIVE_SERVER2_ENABLE_DOAS)) { + if (HiveConf.getBoolVar(conf, ConfVars.HIVE_SERVER2_ENABLE_DOAS)) { // Should this also just be ignored? Throw for now, doAs unlike queue is often set by admin. throw new HiveException(ConfVars.HIVE_SERVER2_ENABLE_DOAS.varname + " is not supported"); } @@ -254,16 +342,21 @@ public void start() throws Exception { expirationTracker.start(); } allocationManager.start(); + if (!isRuleValidatorStarted) { + setupRuleValidator(); + } } public void stop() throws Exception { - List sessionsToClose = null; + List sessionsToClose; synchronized (openSessions) { - sessionsToClose = new ArrayList(openSessions.keySet()); + sessionsToClose = new ArrayList<>(openSessions); } - for (TezSessionState sessionState : sessionsToClose) { + + for (TezSessionPoolSession sessionState : sessionsToClose) { sessionState.close(false); } + if (expirationTracker != null) { expirationTracker.stop(); } @@ -316,16 +409,22 @@ private WmTezSession ensureOwnedSession(TezSessionState oldSession) { /** Called by TezSessionPoolSession when opened. */ @Override public void registerOpenSession(TezSessionPoolSession session) { - synchronized (openSessions) { - openSessions.put(session, true); + sessionsRulesLock.writeLock().lock(); + try { + openSessions.add(session); + } finally { + sessionsRulesLock.writeLock().unlock(); } } /** Called by TezSessionPoolSession when closed. 
*/ @Override public void unregisterOpenSession(TezSessionPoolSession session) { - synchronized (openSessions) { + sessionsRulesLock.writeLock().lock(); + try { openSessions.remove(session); + } finally { + sessionsRulesLock.writeLock().unlock(); } } @@ -366,4 +465,22 @@ public void destroy(TezSessionState session) throws Exception { protected final HiveConf getConf() { return conf; } + + public RuleFetcher getResourcePlanRulesFetcher() { + return resourcePlanRulesFetcher; + } + + public void setResourcePlanRulesFetcher(final RuleFetcher resourcePlanRulesFetcher) { + this.resourcePlanRulesFetcher = resourcePlanRulesFetcher; + refreshRules(); + } + + public void setGlobalRulesFetcher(final RuleFetcher globalRulesFetcher) { + this.globalRulesFetcher = globalRulesFetcher; + refreshRules(); + } + + public RuleFetcher getGlobalRulesFetcher() { + return globalRulesFetcher; + } } diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/monitoring/TezJobMonitor.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/monitoring/TezJobMonitor.java index 0de9de5..7785817 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/monitoring/TezJobMonitor.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/monitoring/TezJobMonitor.java @@ -26,11 +26,14 @@ Licensed to the Apache Software Foundation (ASF) under one import org.apache.hadoop.hive.ql.exec.Utilities; import org.apache.hadoop.hive.ql.exec.tez.TezSessionPoolManager; import org.apache.hadoop.hive.common.log.InPlaceUpdate; +import org.apache.hadoop.hive.ql.wm.JobCounterListener; import org.apache.hadoop.hive.ql.log.PerfLogger; import org.apache.hadoop.hive.common.log.ProgressMonitor; import org.apache.hadoop.hive.ql.plan.BaseWork; import org.apache.hadoop.hive.ql.session.SessionState; import org.apache.hadoop.hive.ql.session.SessionState.LogHelper; +import org.apache.hadoop.hive.ql.wm.RuleContext; +import org.apache.hadoop.hive.ql.wm.TimeCounterLimit; import org.apache.hive.common.util.ShutdownHookManager; import org.apache.tez.common.counters.TezCounter; import org.apache.tez.common.counters.TezCounters; @@ -41,15 +44,20 @@ Licensed to the Apache Software Foundation (ASF) under one import org.apache.tez.dag.api.client.Progress; import org.apache.tez.dag.api.client.StatusGetOpts; import org.apache.tez.util.StopWatch; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import java.io.IOException; import java.io.InterruptedIOException; import java.io.StringWriter; -import java.util.HashSet; +import java.util.EnumSet; +import java.util.HashMap; import java.util.LinkedList; import java.util.List; import java.util.Map; +import java.util.Set; import java.util.concurrent.TimeUnit; +import java.util.stream.StreamSupport; import static org.apache.tez.dag.api.client.DAGStatus.State.RUNNING; @@ -61,6 +69,7 @@ Licensed to the Apache Software Foundation (ASF) under one public class TezJobMonitor { static final String CLASS_NAME = TezJobMonitor.class.getName(); + private static final Logger LOG = LoggerFactory.getLogger(CLASS_NAME); private static final int MIN_CHECK_INTERVAL = 200; private static final int MAX_CHECK_INTERVAL = 1000; private static final int MAX_RETRY_INTERVAL = 2500; @@ -146,7 +155,7 @@ public int monitorExecution() { DAGStatus.State lastState = null; boolean running = false; - int checkInterval = MIN_CHECK_INTERVAL; + long checkInterval = MIN_CHECK_INTERVAL; while (true) { try { @@ -154,7 +163,14 @@ public int monitorExecution() { context.checkHeartbeaterLockException(); } - status = dagClient.getDAGStatus(new HashSet(), 
checkInterval); + status = dagClient.getDAGStatus(EnumSet.of(StatusGetOpts.GET_COUNTERS), checkInterval); + RuleContext ruleContext = context.getRuleContext(); + if (ruleContext != null) { + JobCounterListener jobCounterListener = ruleContext.getJobCounterListener(); + if (jobCounterListener != null) { + publishCounterUpdates(status.getDAGCounters(), jobCounterListener); + } + } vertexProgressMap = status.getVertexProgress(); DAGStatus.State state = status.getState(); @@ -235,8 +251,7 @@ public int monitorExecution() { } catch (IOException | TezException tezException) { // best effort } - console - .printError("Execution has failed. stack trace: " + ExceptionUtils.getStackTrace(e)); + console.printError("Execution has failed. stack trace: " + ExceptionUtils.getStackTrace(e)); rc = 1; done = true; } else { @@ -263,6 +278,36 @@ public int monitorExecution() { return rc; } + private void publishCounterUpdates(final TezCounters dagCounters, final JobCounterListener jobCounterListener) { + if (dagCounters != null && jobCounterListener != null && jobCounterListener.getSubscribedCounters() != null) { + Set subscribedCounters = jobCounterListener.getSubscribedCounters(); + Map updatedCounters = new HashMap<>(); + StreamSupport.stream(dagCounters.spliterator(), false) + .forEach(cg -> cg.getUnderlyingGroup().forEach( + cgb -> { + String counterName = cgb.getName(); + if (subscribedCounters.contains(counterName)) { + updatedCounters.put(counterName, cgb.getValue()); + } + } + )); + + if (subscribedCounters.contains(TimeCounterLimit.TimeCounter.ELAPSED_TIME.name())) { + updatedCounters.put(TimeCounterLimit.TimeCounter.ELAPSED_TIME.name(), context.getRuleContext() + .getQueryInfo().getElapsedTime()); + } + + if (subscribedCounters.contains(TimeCounterLimit.TimeCounter.EXECUTION_TIME.name()) && executionStartTime > 0) { + updatedCounters.put(TimeCounterLimit.TimeCounter.EXECUTION_TIME.name(), + System.currentTimeMillis() - executionStartTime); + } + + if (!updatedCounters.isEmpty()) { + jobCounterListener.updateCounters(updatedCounters); + } + } + } + private void printSummary(boolean success, Map progressMap) { if (isProfilingEnabled() && success && progressMap != null) { diff --git a/ql/src/java/org/apache/hadoop/hive/ql/wm/CounterLimit.java b/ql/src/java/org/apache/hadoop/hive/ql/wm/CounterLimit.java new file mode 100644 index 0000000..0fc5eaa --- /dev/null +++ b/ql/src/java/org/apache/hadoop/hive/ql/wm/CounterLimit.java @@ -0,0 +1,43 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hive.ql.wm; + +/** + * Counter limit interface for defining counter name and its limit exceeding which action defined in trigger will get + * executed. + */ +public interface CounterLimit { + /** + * Get the name of the counter. 
+ * + * @return name + */ + String getName(); + + /** + * Get the threshold value for the counter. + * + * @return limit + */ + long getLimit(); + + /** + * Return a cloned copy of this counter limit. + * + * @return cloned copy + */ + CounterLimit clone(); +} diff --git a/ql/src/java/org/apache/hadoop/hive/ql/wm/Expression.java b/ql/src/java/org/apache/hadoop/hive/ql/wm/Expression.java new file mode 100644 index 0000000..add933f --- /dev/null +++ b/ql/src/java/org/apache/hadoop/hive/ql/wm/Expression.java @@ -0,0 +1,74 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hive.ql.wm; + +/** + * Expression that is defined in triggers. + * Most expressions fire only after a limit is exceeded; as a result, only the greater-than (>) predicate + * is supported. + */ +public interface Expression { + + enum Predicate { + GREATER_THAN(">"); + + String symbol; + + Predicate(final String symbol) { + this.symbol = symbol; + } + + public String getSymbol() { + return symbol; + } + } + + interface Builder { + Builder greaterThan(CounterLimit counter); + + Expression build(); + } + + /** + * Evaluate the current value against this expression. Returns true if the expression evaluates to true + * (current > limit), false otherwise. + * + * @param current - current value against which the expression will be evaluated + * @return true if the limit was exceeded, false otherwise + */ + boolean evaluate(final long current); + + /** + * Return the counter limit. + * + * @return counter limit + */ + CounterLimit getCounterLimit(); + + /** + * Return the predicate defined in the expression. + * + * @return predicate + */ + Predicate getPredicate(); + + /** + * Return a cloned copy of this expression. + * + * @return cloned copy + */ + Expression clone(); +} diff --git a/ql/src/java/org/apache/hadoop/hive/ql/wm/ExpressionFactory.java b/ql/src/java/org/apache/hadoop/hive/ql/wm/ExpressionFactory.java new file mode 100644 index 0000000..f19bf82 --- /dev/null +++ b/ql/src/java/org/apache/hadoop/hive/ql/wm/ExpressionFactory.java @@ -0,0 +1,104 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hive.ql.wm; + +import java.util.concurrent.TimeUnit; + +import org.apache.hadoop.hive.conf.HiveConf; +import org.apache.hadoop.hive.conf.Validator; + +/** + * Factory to create expressions. + */ +public class ExpressionFactory { + + public static Expression fromString(final String expression) { + if (expression == null || expression.isEmpty()) { + return null; + } + + // TODO: Only ">" predicate is supported right now; this has to be extended to support an expression tree when + // multiple conditions are required. HIVE-17622 + + String[] tokens = expression.split(Expression.Predicate.GREATER_THAN.getSymbol()); + if (tokens.length != 2) { + throw new IllegalArgumentException("Invalid predicate in expression"); + } + + final String counterName = tokens[0].trim(); + final String counterValueStr = tokens[1].trim(); + if (counterName.isEmpty()) { + throw new IllegalArgumentException("Counter name cannot be empty!"); + } + + long counterValue; + for (FileSystemCounterLimit.FSCounter fsCounter : FileSystemCounterLimit.FSCounter.values()) { + if (counterName.toUpperCase().endsWith(fsCounter.name())) { + try { + counterValue = getCounterValue(counterValueStr, new Validator.SizeValidator()); + if (counterValue < 0) { + throw new IllegalArgumentException("Illegal value for counter limit. Expected a positive long value."); + } + } catch (NumberFormatException e) { + throw new IllegalArgumentException("Invalid counter value: " + counterValueStr); + } + // this is a file system counter; validate it and create the counter limit + FileSystemCounterLimit fsCounterLimit = FileSystemCounterLimit.fromName(counterName, counterValue); + return createExpression(fsCounterLimit); + } + } + + for (TimeCounterLimit.TimeCounter timeCounter : TimeCounterLimit.TimeCounter.values()) { + if (counterName.equalsIgnoreCase(timeCounter.name())) { + try { + counterValue = getCounterValue(counterValueStr, new Validator.TimeValidator(TimeUnit.MILLISECONDS)); + if (counterValue < 0) { + throw new IllegalArgumentException("Illegal value for counter limit. Expected a positive long value."); + } + } catch (NumberFormatException e) { + throw new IllegalArgumentException("Invalid counter value: " + counterValueStr); + } + TimeCounterLimit timeCounterLimit = new TimeCounterLimit( + TimeCounterLimit.TimeCounter.valueOf(counterName.toUpperCase()), counterValue); + return createExpression(timeCounterLimit); + } + } + + // unable to create an expression at this point; invalid expression + throw new IllegalArgumentException("Invalid expression! 
" + expression); + } + + private static long getCounterValue(final String counterValueStr, final Validator validator) throws + NumberFormatException { + long counter; + try { + counter = Long.parseLong(counterValueStr); + } catch (NumberFormatException e) { + if (validator instanceof Validator.SizeValidator) { + counter = HiveConf.toSizeBytes(counterValueStr); + } else if (validator instanceof Validator.TimeValidator) { + counter = HiveConf.toTime(counterValueStr, TimeUnit.MILLISECONDS, TimeUnit.MILLISECONDS); + } else { + throw e; + } + } + return counter; + } + + static Expression createExpression(CounterLimit counterLimit) { + return new TriggerExpression(counterLimit, Expression.Predicate.GREATER_THAN); + } +} diff --git a/ql/src/java/org/apache/hadoop/hive/ql/wm/FileSystemCounterLimit.java b/ql/src/java/org/apache/hadoop/hive/ql/wm/FileSystemCounterLimit.java new file mode 100644 index 0000000..45d6bf8 --- /dev/null +++ b/ql/src/java/org/apache/hadoop/hive/ql/wm/FileSystemCounterLimit.java @@ -0,0 +1,98 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hive.ql.wm; + +/** + * File system specific counters with defined limits. + */ +public class FileSystemCounterLimit implements CounterLimit { + + public enum FSCounter { + BYTES_READ, + BYTES_WRITTEN, + SHUFFLE_BYTES + } + + private String scheme; + private FSCounter fsCounter; + private long limit; + + public FileSystemCounterLimit(final String scheme, final FSCounter fsCounter, final long limit) { + this.scheme = scheme == null || scheme.isEmpty() ? "" : scheme.toUpperCase(); + this.fsCounter = fsCounter; + this.limit = limit; + } + + public static FileSystemCounterLimit fromName(final String counterName, final long limit) { + String counterNameStr = counterName.toUpperCase(); + for (FSCounter fsCounter : FSCounter.values()) { + if (counterNameStr.endsWith(fsCounter.name())) { + int startIdx = counterNameStr.indexOf(fsCounter.name()); + if (startIdx == 0) { // exact match; use the upper-cased name so lower-case input is accepted + return new FileSystemCounterLimit(null, FSCounter.valueOf(counterNameStr), limit); + } else { + String scheme = counterNameStr.substring(0, startIdx - 1); + // scheme/counter name validation will be done in grammar as part of HIVE-17622 + return new FileSystemCounterLimit(scheme, FSCounter.valueOf(fsCounter.name()), limit); + } + } + } + + throw new IllegalArgumentException("Invalid counter name specified " + counterName.toUpperCase()); + } + + @Override + public String getName() { + return scheme.isEmpty() ?
fsCounter.name() : scheme.toUpperCase() + "_" + fsCounter.name(); + } + + @Override + public long getLimit() { + return limit; + } + + @Override + public CounterLimit clone() { + return new FileSystemCounterLimit(scheme, fsCounter, limit); + } + + @Override + public String toString() { + return "counter: " + getName() + " limit: " + limit; + } + + @Override + public int hashCode() { + int hash = 31 * scheme.hashCode(); + hash += 31 * fsCounter.hashCode(); + hash += 31 * limit; + return 31 * hash; + } + + @Override + public boolean equals(final Object other) { + if (other == null || !(other instanceof FileSystemCounterLimit)) { + return false; + } + + if (other == this) { + return true; + } + + return scheme.equals(((FileSystemCounterLimit) other).scheme) && fsCounter.equals(( + (FileSystemCounterLimit) other).fsCounter) && limit == ((FileSystemCounterLimit) other).limit; + } +} diff --git a/ql/src/java/org/apache/hadoop/hive/ql/wm/JobCounterListener.java b/ql/src/java/org/apache/hadoop/hive/ql/wm/JobCounterListener.java new file mode 100644 index 0000000..9d82243 --- /dev/null +++ b/ql/src/java/org/apache/hadoop/hive/ql/wm/JobCounterListener.java @@ -0,0 +1,30 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hive.ql.wm; + +import java.util.Map; +import java.util.Set; + +/** + * + */ +public interface JobCounterListener { + Set getSubscribedCounters(); + + void updateCounters(Map counters); + + Map getCounters(); +} diff --git a/ql/src/java/org/apache/hadoop/hive/ql/wm/MetastoreGlobalRulesFetcher.java b/ql/src/java/org/apache/hadoop/hive/ql/wm/MetastoreGlobalRulesFetcher.java new file mode 100644 index 0000000..3fc7f6b --- /dev/null +++ b/ql/src/java/org/apache/hadoop/hive/ql/wm/MetastoreGlobalRulesFetcher.java @@ -0,0 +1,30 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.hadoop.hive.ql.wm; + +import java.util.Collections; +import java.util.List; + +/** + * Fetch global (non-llap) rules from metastore. + */ +public class MetastoreGlobalRulesFetcher implements RuleFetcher { + + @Override + public List fetch(final String resourcePlanName) { + // TODO: implement after integration. Return an empty list rather than null so that callers + // (e.g. WorkloadManager.refreshRules()) do not need null checks. + return Collections.emptyList(); + } +} diff --git a/ql/src/java/org/apache/hadoop/hive/ql/wm/MetastoreResourcePlanRulesFetcher.java b/ql/src/java/org/apache/hadoop/hive/ql/wm/MetastoreResourcePlanRulesFetcher.java new file mode 100644 index 0000000..ee5a73e --- /dev/null +++ b/ql/src/java/org/apache/hadoop/hive/ql/wm/MetastoreResourcePlanRulesFetcher.java @@ -0,0 +1,32 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hive.ql.wm; + +import java.util.Collections; +import java.util.List; + +/** + * Fetch pool specific rules from metastore. + */ +public class MetastoreResourcePlanRulesFetcher implements RuleFetcher { + + @Override + public List fetch(final String resourcePlanName) { + // TODO: implement after integration. Return an empty list rather than null so that callers + // do not need null checks. + return Collections.emptyList(); + } +} diff --git a/ql/src/java/org/apache/hadoop/hive/ql/wm/QueryRule.java b/ql/src/java/org/apache/hadoop/hive/ql/wm/QueryRule.java new file mode 100644 index 0000000..8a0e554 --- /dev/null +++ b/ql/src/java/org/apache/hadoop/hive/ql/wm/QueryRule.java @@ -0,0 +1,96 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hive.ql.wm; + +import java.util.Objects; + +/** + * Rule with query-level scope. A rule contains a name, a trigger expression and an action that is executed when + * the expression is violated.
+ */ +public class QueryRule implements Rule { + private String name; + private Expression expression; + private Action action; + + public QueryRule(final String name, final Expression expression, final Action action) { + this.name = name; + this.expression = expression; + this.action = action; + } + + @Override + public String getName() { + return name; + } + + @Override + public Expression getExpression() { + return expression; + } + + @Override + public Action getAction() { + return action; + } + + @Override + public Rule clone() { + return new QueryRule(name, expression.clone(), action); + } + + @Override + public boolean apply(final long current) { + if (expression.evaluate(current)) { + switch (action) { + case KILL_QUERY: + return true; + case MOVE_TO_POOL: + // FIXME: act upon the Action + return true; + } + } + return false; + } + + @Override + public String toString() { + return "{ name: " + name + ", expression: " + expression + ", action: " + action + " }"; + } + + @Override + public int hashCode() { + int hash = name == null ? 31 : 31 * name.hashCode(); + hash += expression == null ? 31 * hash : 31 * hash * expression.hashCode(); + hash += action == null ? 31 * hash : 31 * hash * action.hashCode(); + return hash; + } + + @Override + public boolean equals(final Object other) { + if (other == null || !(other instanceof QueryRule)) { + return false; + } + + if (other == this) { + return true; + } + + QueryRule otherQR = (QueryRule) other; + return Objects.equals(name, otherQR.name) && + Objects.equals(expression, otherQR.expression) && + Objects.equals(action, otherQR.action); + } +} diff --git a/ql/src/java/org/apache/hadoop/hive/ql/wm/Rule.java b/ql/src/java/org/apache/hadoop/hive/ql/wm/Rule.java new file mode 100644 index 0000000..7e95e13 --- /dev/null +++ b/ql/src/java/org/apache/hadoop/hive/ql/wm/Rule.java @@ -0,0 +1,80 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hive.ql.wm; + +/** + * Rule interface which gets mapped to CREATE RULE .. queries. A rule can have a name, trigger and action. + * Trigger is a simple expression which gets evaluated during the lifecycle of query and executes an action + * if the expression defined in trigger evaluates to true. + */ +public interface Rule { + + enum Action { + KILL_QUERY(""), + MOVE_TO_POOL(""); + + String poolName; + + Action(final String poolName) { + this.poolName = poolName; + } + + public Action setPoolName(final String poolName) { + this.poolName = poolName; + return this; + } + + public String getPoolName() { + return poolName; + } + } + + /** + * Based on current value, returns true if rule is applied else false. 
+ * + * @param current - current value + * @return true if the rule was applied, false otherwise + */ + boolean apply(long current); + + /** + * Get the trigger expression. + * + * @return expression + */ + Expression getExpression(); + + /** + * Return the name of the rule. + * + * @return rule name + */ + String getName(); + + /** + * Return the action that will be executed when the trigger expression evaluates to true. + * + * @return action + */ + Action getAction(); + + /** + * Return a cloned copy of this rule. + * + * @return cloned copy + */ + Rule clone(); +} diff --git a/ql/src/java/org/apache/hadoop/hive/ql/wm/RuleActionHandler.java b/ql/src/java/org/apache/hadoop/hive/ql/wm/RuleActionHandler.java new file mode 100644 index 0000000..cb55e7f --- /dev/null +++ b/ql/src/java/org/apache/hadoop/hive/ql/wm/RuleActionHandler.java @@ -0,0 +1,32 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hive.ql.wm; + +import java.util.Map; + +import org.apache.hadoop.hive.ql.exec.tez.TezSessionState; + +/** + * Interface for handling rule violations by queries and for performing actions defined in the rules. + */ +public interface RuleActionHandler { + /** + * Applies the action defined in the rule for the specified queries. + * + * @param queriesViolated - violated queries mapped to the rules they violated + */ + void applyAction(Map queriesViolated); +} diff --git a/ql/src/java/org/apache/hadoop/hive/ql/wm/RuleContext.java b/ql/src/java/org/apache/hadoop/hive/ql/wm/RuleContext.java new file mode 100644 index 0000000..5d62f89 --- /dev/null +++ b/ql/src/java/org/apache/hadoop/hive/ql/wm/RuleContext.java @@ -0,0 +1,50 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hive.ql.wm; + +import org.apache.hadoop.hive.ql.QueryInfo; + +/** + * Context information required for rule evaluation.
+ */ +public class RuleContext { + private JobCounterListener jobCounterListener; + private QueryInfo queryInfo; + + public RuleContext(final QueryInfo queryInfo) { + this.queryInfo = queryInfo; + } + + public void setJobCounterListener(final JobCounterListener jobCounterListener) { + this.jobCounterListener = jobCounterListener; + } + + public JobCounterListener getJobCounterListener() { + return jobCounterListener; + } + + public QueryInfo getQueryInfo() { + return queryInfo; + } + + public void setQueryInfo(final QueryInfo queryInfo) { + this.queryInfo = queryInfo; + } + + public String getQueryId() { + return queryInfo.getQueryDisplay().getQueryId(); + } +} diff --git a/ql/src/java/org/apache/hadoop/hive/ql/wm/RuleFetcher.java b/ql/src/java/org/apache/hadoop/hive/ql/wm/RuleFetcher.java new file mode 100644 index 0000000..8549366 --- /dev/null +++ b/ql/src/java/org/apache/hadoop/hive/ql/wm/RuleFetcher.java @@ -0,0 +1,28 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hive.ql.wm; + +import java.util.List; +import java.util.Map; + +import org.apache.hadoop.hive.ql.exec.tez.TezSessionState; + +/** + * Interface to fetch rules + */ +public interface RuleFetcher { + List fetch(final String resourcePlanName); +} diff --git a/ql/src/java/org/apache/hadoop/hive/ql/wm/TimeCounterLimit.java b/ql/src/java/org/apache/hadoop/hive/ql/wm/TimeCounterLimit.java new file mode 100644 index 0000000..687fa5e --- /dev/null +++ b/ql/src/java/org/apache/hadoop/hive/ql/wm/TimeCounterLimit.java @@ -0,0 +1,74 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.hadoop.hive.ql.wm; + +/** + * Time based counters with limits + */ +public class TimeCounterLimit implements CounterLimit { + public enum TimeCounter { + ELAPSED_TIME, + EXECUTION_TIME + } + + private TimeCounter timeCounter; + private long limit; + + public TimeCounterLimit(final TimeCounter timeCounter, final long limit) { + this.timeCounter = timeCounter; + this.limit = limit; + } + + @Override + public String getName() { + return timeCounter.name(); + } + + @Override + public long getLimit() { + return limit; + } + + @Override + public CounterLimit clone() { + return new TimeCounterLimit(timeCounter, limit); + } + + @Override + public String toString() { + return "counter: " + timeCounter.name() + " limit: " + limit; + } + + @Override + public int hashCode() { + int hash = 31 * timeCounter.hashCode(); + hash += 31 * limit; + return 31 * hash; + } + + @Override + public boolean equals(final Object other) { + if (other == null || !(other instanceof TimeCounterLimit)) { + return false; + } + + if (other == this) { + return true; + } + + return timeCounter.equals(((TimeCounterLimit) other).timeCounter) && limit == ((TimeCounterLimit) other).limit; + } +} diff --git a/ql/src/java/org/apache/hadoop/hive/ql/wm/TriggerExpression.java b/ql/src/java/org/apache/hadoop/hive/ql/wm/TriggerExpression.java new file mode 100644 index 0000000..065ab79 --- /dev/null +++ b/ql/src/java/org/apache/hadoop/hive/ql/wm/TriggerExpression.java @@ -0,0 +1,82 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hive.ql.wm; + +import java.util.Objects; + +/** + * Simple trigger expression for a rule. + */ +public class TriggerExpression implements Expression { + private CounterLimit counterLimit; + private Predicate predicate; + + public TriggerExpression(final CounterLimit counter, final Predicate predicate) { + this.counterLimit = counter; + this.predicate = predicate; + } + + @Override + public boolean evaluate(final long current) { + if (counterLimit.getLimit() > 0) { + if (predicate.equals(Predicate.GREATER_THAN)) { + return current > counterLimit.getLimit(); + } + } + return false; + } + + @Override + public CounterLimit getCounterLimit() { + return counterLimit; + } + + @Override + public Predicate getPredicate() { + return predicate; + } + + @Override + public Expression clone() { + return new TriggerExpression(counterLimit.clone(), predicate); + } + + @Override + public String toString() { + return counterLimit.getName() + " " + predicate.getSymbol() + " " + counterLimit.getLimit(); + } + + @Override + public int hashCode() { + int hash = counterLimit == null ? 31 : 31 * counterLimit.hashCode(); + hash += predicate == null ? 
31 * hash : 31 * hash * predicate.hashCode(); + return 31 * hash; + } + + @Override + public boolean equals(final Object other) { + if (other == null || !(other instanceof TriggerExpression)) { + return false; + } + + if (other == this) { + return true; + } + + return Objects.equals(counterLimit, ((TriggerExpression) other).counterLimit) && + Objects.equals(predicate, ((TriggerExpression) other).predicate); + } +} diff --git a/ql/src/test/org/apache/hadoop/hive/ql/exec/tez/TestWorkloadManager.java b/ql/src/test/org/apache/hadoop/hive/ql/exec/tez/TestWorkloadManager.java index 7adf895..d73759d 100644 --- a/ql/src/test/org/apache/hadoop/hive/ql/exec/tez/TestWorkloadManager.java +++ b/ql/src/test/org/apache/hadoop/hive/ql/exec/tez/TestWorkloadManager.java @@ -18,19 +18,27 @@ package org.apache.hadoop.hive.ql.exec.tez; -import static org.junit.Assert.*; -import static org.mockito.Mockito.*; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotSame; +import static org.junit.Assert.assertSame; +import static org.junit.Assert.assertTrue; +import static org.mockito.Mockito.anyBoolean; +import static org.mockito.Mockito.doNothing; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; + +import java.io.IOException; +import java.util.List; import org.apache.hadoop.hive.conf.HiveConf; import org.apache.hadoop.hive.conf.HiveConf.ConfVars; +import org.apache.hadoop.hive.ql.Context; import org.apache.tez.dag.api.TezConfiguration; - -import java.util.List; - import org.junit.Test; public class TestWorkloadManager { - private static class MockQam implements QueryAllocationManager { + public static class MockQam implements QueryAllocationManager { boolean isCalled = false; @Override @@ -52,10 +60,10 @@ void assertWasCalled() { } } - private static class WorkloadManagerForTest extends WorkloadManager { + public static class WorkloadManagerForTest extends WorkloadManager { - WorkloadManagerForTest(String yarnQueue, HiveConf conf, int numSessions, - QueryAllocationManager qam) { + public WorkloadManagerForTest(String yarnQueue, HiveConf conf, int numSessions, + QueryAllocationManager qam) { super(yarnQueue, conf, numSessions, qam, null); } @@ -73,39 +81,41 @@ protected boolean ensureAmIsRegistered(WmTezSession session) throws Exception { @Test(timeout = 10000) public void testReuse() throws Exception { HiveConf conf = createConf(); + Context ctx = createMockContext(conf); MockQam qam = new MockQam(); WorkloadManager wm = new WorkloadManagerForTest("test", conf, 1, qam); wm.start(); TezSessionState nonPool = mock(TezSessionState.class); when(nonPool.getConf()).thenReturn(conf); doNothing().when(nonPool).close(anyBoolean()); - TezSessionState session = wm.getSession(nonPool, null, conf); + TezSessionState session = wm.getSession(nonPool, null, ctx); verify(nonPool).close(anyBoolean()); assertNotSame(nonPool, session); session.returnToSessionManager(); TezSessionPoolSession diffPool = mock(TezSessionPoolSession.class); when(diffPool.getConf()).thenReturn(conf); doNothing().when(diffPool).returnToSessionManager(); - session = wm.getSession(diffPool, null, conf); + session = wm.getSession(diffPool, null, ctx); verify(diffPool).returnToSessionManager(); assertNotSame(diffPool, session); - TezSessionState session2 = wm.getSession(session, null, conf); + TezSessionState session2 = wm.getSession(session, null, ctx); assertSame(session, session2); } @Test(timeout = 10000) public void testQueueName() throws Exception 
{ HiveConf conf = createConf(); + Context ctx = createMockContext(conf); MockQam qam = new MockQam(); WorkloadManager wm = new WorkloadManagerForTest("test", conf, 1, qam); wm.start(); // The queue should be ignored. conf.set(TezConfiguration.TEZ_QUEUE_NAME, "test2"); - TezSessionState session = wm.getSession(null, null, conf); + TezSessionState session = wm.getSession(null, null, ctx); assertEquals("test", session.getQueueName()); assertEquals("test", conf.get(TezConfiguration.TEZ_QUEUE_NAME)); session.setQueueName("test2"); - session = wm.getSession(session, null, conf); + session = wm.getSession(session, null, ctx); assertEquals("test", session.getQueueName()); } @@ -117,10 +127,11 @@ public void testQueueName() throws Exception { public void testReopen() throws Exception { // We should always get a different object, and cluster fraction should be propagated. HiveConf conf = createConf(); + Context ctx = createMockContext(conf); MockQam qam = new MockQam(); WorkloadManager wm = new WorkloadManagerForTest("test", conf, 1, qam); wm.start(); - WmTezSession session = (WmTezSession) wm.getSession(null, null, conf); + WmTezSession session = (WmTezSession) wm.getSession(null, null, ctx); assertEquals(1.0, session.getClusterFraction(), EPSILON); qam.assertWasCalled(); WmTezSession session2 = (WmTezSession) session.reopen(conf, null); @@ -134,13 +145,14 @@ public void testReopen() throws Exception { public void testDestroyAndReturn() throws Exception { // Session should not be lost; however the fraction should be discarded. HiveConf conf = createConf(); + Context ctx = createMockContext(conf); MockQam qam = new MockQam(); WorkloadManager wm = new WorkloadManagerForTest("test", conf, 2, qam); wm.start(); - WmTezSession session = (WmTezSession) wm.getSession(null, null, conf); + WmTezSession session = (WmTezSession) wm.getSession(null, null, ctx); assertEquals(1.0, session.getClusterFraction(), EPSILON); qam.assertWasCalled(); - WmTezSession session2 = (WmTezSession) wm.getSession(null, null, conf); + WmTezSession session2 = (WmTezSession) wm.getSession(null, null, ctx); assertEquals(0.5, session.getClusterFraction(), EPSILON); assertEquals(0.5, session2.getClusterFraction(), EPSILON); qam.assertWasCalled(); @@ -151,7 +163,7 @@ public void testDestroyAndReturn() throws Exception { qam.assertWasCalled(); // We never lose pool session, so we should still be able to get. - session = (WmTezSession) wm.getSession(null, null, conf); + session = (WmTezSession) wm.getSession(null, null, ctx); session.returnToSessionManager(); assertEquals(1.0, session2.getClusterFraction(), EPSILON); assertEquals(0.0, session.getClusterFraction(), EPSILON); @@ -170,4 +182,10 @@ private HiveConf createConf() { conf.set(ConfVars.LLAP_TASK_SCHEDULER_AM_REGISTRY_NAME.varname, ""); return conf; } + + private Context createMockContext(HiveConf hiveConf) throws IOException { + Context ctx = mock(Context.class); + when(ctx.getConf()).thenReturn(hiveConf); + return ctx; + } } diff --git a/ql/src/test/org/apache/hadoop/hive/ql/wm/TestRule.java b/ql/src/test/org/apache/hadoop/hive/ql/wm/TestRule.java new file mode 100644 index 0000000..011b018 --- /dev/null +++ b/ql/src/test/org/apache/hadoop/hive/ql/wm/TestRule.java @@ -0,0 +1,348 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hive.ql.wm; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertNotEquals; +import static org.junit.Assert.assertTrue; + +import org.junit.Test; +import org.junit.rules.ExpectedException; + +/** + * + */ +public class TestRule { + @org.junit.Rule + public ExpectedException thrown = ExpectedException.none(); + + @Test + public void testSimpleQueryRule() { + Expression expression = ExpressionFactory.createExpression(new FileSystemCounterLimit("hdfs", + FileSystemCounterLimit.FSCounter.BYTES_READ, 1024)); + Rule rule = new QueryRule("hdfs_read_heavy", expression, Rule.Action.KILL_QUERY); + assertEquals("counter: HDFS_BYTES_READ limit: 1024", expression.getCounterLimit().toString()); + assertFalse(rule.apply(1000)); + assertTrue(rule.apply(1025)); + + expression = ExpressionFactory.createExpression(new FileSystemCounterLimit("hdfs", + FileSystemCounterLimit.FSCounter.BYTES_WRITTEN, 1024)); + rule = new QueryRule("hdfs_write_heavy", expression, Rule.Action.KILL_QUERY); + assertEquals("counter: HDFS_BYTES_WRITTEN limit: 1024", expression.getCounterLimit().toString()); + assertFalse(rule.apply(1000)); + assertTrue(rule.apply(1025)); + + expression = ExpressionFactory.createExpression(new FileSystemCounterLimit("", + FileSystemCounterLimit.FSCounter.BYTES_READ, 1024)); + rule = new QueryRule("local_read_heavy", expression, Rule.Action.KILL_QUERY); + assertEquals("counter: BYTES_READ limit: 1024", expression.getCounterLimit().toString()); + assertFalse(rule.apply(1000)); + assertTrue(rule.apply(1025)); + + expression = ExpressionFactory.createExpression(new FileSystemCounterLimit("", + FileSystemCounterLimit.FSCounter.BYTES_WRITTEN, 1024)); + rule = new QueryRule("local_write_heavy", expression, Rule.Action.KILL_QUERY); + assertEquals("counter: BYTES_WRITTEN limit: 1024", expression.getCounterLimit().toString()); + assertFalse(rule.apply(1000)); + assertTrue(rule.apply(1025)); + + expression = ExpressionFactory.createExpression(new FileSystemCounterLimit("", + FileSystemCounterLimit.FSCounter.SHUFFLE_BYTES, 1024)); + rule = new QueryRule("shuffle_heavy", expression, Rule.Action.KILL_QUERY); + assertEquals("counter: SHUFFLE_BYTES limit: 1024", expression.getCounterLimit().toString()); + assertFalse(rule.apply(1000)); + assertTrue(rule.apply(1025)); + + expression = ExpressionFactory.createExpression(new TimeCounterLimit(TimeCounterLimit.TimeCounter + .EXECUTION_TIME, 10000)); + rule = new QueryRule("slow_query", expression, Rule.Action.MOVE_TO_POOL.setPoolName("fake_pool")); + assertEquals("counter: EXECUTION_TIME limit: 10000", expression.getCounterLimit().toString()); + assertFalse(rule.apply(1000)); + assertTrue(rule.apply(100000)); + } + + @Test + public void testExpressionFromString() { + Expression expression = ExpressionFactory.fromString("BYTES_READ>1024"); + Expression expected = 
ExpressionFactory.createExpression(new FileSystemCounterLimit("", + FileSystemCounterLimit.FSCounter.BYTES_READ, 1024)); + assertEquals(expected, expression); + assertEquals(expected.hashCode(), expression.hashCode()); + expression = ExpressionFactory.fromString("BYTES_READ > 1024"); + assertEquals(expected, expression); + + expression = ExpressionFactory.fromString(expected.toString()); + assertEquals(expected.toString(), expression.toString()); + + assertEquals(expected.hashCode(), expression.hashCode()); + expression = ExpressionFactory.fromString(" BYTES_READ > 1024 "); + assertEquals(expected, expression); + assertEquals(expected.hashCode(), expression.hashCode()); + + expression = ExpressionFactory.fromString("BYTES_WRITTEN > 1024"); + expected = ExpressionFactory.createExpression(new FileSystemCounterLimit("", + FileSystemCounterLimit.FSCounter.BYTES_WRITTEN, 1024)); + assertEquals(expected, expression); + assertEquals(expected.hashCode(), expression.hashCode()); + + expression = ExpressionFactory.fromString(" HDFS_BYTES_READ > 1024 "); + expected = ExpressionFactory.createExpression(new FileSystemCounterLimit("hdfs", + FileSystemCounterLimit.FSCounter.BYTES_READ, 1024)); + assertEquals(expected, expression); + assertEquals(expected.hashCode(), expression.hashCode()); + + expression = ExpressionFactory.fromString(" HDFS_BYTES_WRITTEN > 1024"); + expected = ExpressionFactory.createExpression(new FileSystemCounterLimit("hdfs", + FileSystemCounterLimit.FSCounter.BYTES_WRITTEN, 1024)); + assertEquals(expected, expression); + assertEquals(expected.hashCode(), expression.hashCode()); + + expression = ExpressionFactory.fromString(" S3A_BYTES_READ > 1024"); + expected = ExpressionFactory.createExpression(new FileSystemCounterLimit("s3a", + FileSystemCounterLimit.FSCounter.BYTES_READ, 1024)); + assertEquals(expected, expression); + assertEquals(expected.hashCode(), expression.hashCode()); + + expression = ExpressionFactory.fromString(" S3A_BYTES_WRITTEN > 1024"); + expected = ExpressionFactory.createExpression(new FileSystemCounterLimit("s3a", + FileSystemCounterLimit.FSCounter.BYTES_WRITTEN, 1024)); + assertEquals(expected, expression); + assertEquals(expected.hashCode(), expression.hashCode()); + + expression = ExpressionFactory.fromString(" s3a_ByTeS_WRiTTeN > 1024"); + expected = ExpressionFactory.createExpression(new FileSystemCounterLimit("s3a", + FileSystemCounterLimit.FSCounter.BYTES_WRITTEN, 1024)); + assertEquals(expected, expression); + assertEquals(expected.hashCode(), expression.hashCode()); + + expression = ExpressionFactory.fromString(" SHUFFLE_BYTES > 1024"); + expected = ExpressionFactory.createExpression(new FileSystemCounterLimit("", + FileSystemCounterLimit.FSCounter.SHUFFLE_BYTES, 1024)); + assertEquals(expected, expression); + assertEquals(expected.hashCode(), expression.hashCode()); + + expression = ExpressionFactory.fromString(" EXECUTION_TIME > 300"); + expected = ExpressionFactory.createExpression(new TimeCounterLimit(TimeCounterLimit.TimeCounter + .EXECUTION_TIME, 300)); + assertEquals(expected, expression); + assertEquals(expected.hashCode(), expression.hashCode()); + + expression = ExpressionFactory.fromString(" ELAPSED_TIME > 300"); + expected = ExpressionFactory.createExpression(new TimeCounterLimit(TimeCounterLimit.TimeCounter + .ELAPSED_TIME, 300)); + assertEquals(expected, expression); + assertEquals(expected.hashCode(), expression.hashCode()); + + expression = ExpressionFactory.fromString(" elapsed_TIME > 300"); + expected = 
ExpressionFactory.createExpression(new TimeCounterLimit(TimeCounterLimit.TimeCounter + .ELAPSED_TIME, 300)); + assertEquals(expected, expression); + assertEquals(expected.hashCode(), expression.hashCode()); + } + + @Test + public void testSizeValidationInRule() { + Expression expression = ExpressionFactory.fromString(" SHUFFLE_BYTES > 100MB"); + Expression expected = ExpressionFactory.createExpression(new FileSystemCounterLimit("", + FileSystemCounterLimit.FSCounter.SHUFFLE_BYTES, 100 * 1024 * 1024)); + assertEquals(expected, expression); + assertEquals(expected.hashCode(), expression.hashCode()); + + expression = ExpressionFactory.fromString(" SHUFFLE_BYTES > 1 gB"); + expected = ExpressionFactory.createExpression(new FileSystemCounterLimit("", + FileSystemCounterLimit.FSCounter.SHUFFLE_BYTES, 1024 * 1024 * 1024)); + assertEquals(expected, expression); + assertEquals(expected.hashCode(), expression.hashCode()); + + expression = ExpressionFactory.fromString(" SHUFFLE_BYTES > 1 TB"); + expected = ExpressionFactory.createExpression(new FileSystemCounterLimit("", + FileSystemCounterLimit.FSCounter.SHUFFLE_BYTES, 1024L * 1024 * 1024 * 1024)); + assertEquals(expected, expression); + assertEquals(expected.hashCode(), expression.hashCode()); + + expression = ExpressionFactory.fromString(" SHUFFLE_BYTES > 100 B"); + expected = ExpressionFactory.createExpression(new FileSystemCounterLimit("", + FileSystemCounterLimit.FSCounter.SHUFFLE_BYTES, 100)); + assertEquals(expected, expression); + assertEquals(expected.hashCode(), expression.hashCode()); + + expression = ExpressionFactory.fromString(" SHUFFLE_BYTES > 100bytes"); + expected = ExpressionFactory.createExpression(new FileSystemCounterLimit("", + FileSystemCounterLimit.FSCounter.SHUFFLE_BYTES, 100)); + assertEquals(expected, expression); + assertEquals(expected.hashCode(), expression.hashCode()); + } + + @Test + public void testIllegalSizeCounterValue1() { + thrown.expect(IllegalArgumentException.class); + thrown.expectMessage("Invalid size unit"); + ExpressionFactory.fromString(" SHUFFLE_BYTES > 300GiB"); + } + + @Test + public void testIllegalSizeCounterValue2() { + thrown.expect(IllegalArgumentException.class); + thrown.expectMessage("Invalid size unit"); + ExpressionFactory.fromString(" SHUFFLE_BYTES > 300 foo"); + } + + @Test + public void testTimeValidationInRule() { + Expression expression = ExpressionFactory.fromString(" elapsed_TIME > 300 s"); + Expression expected = ExpressionFactory.createExpression(new TimeCounterLimit(TimeCounterLimit.TimeCounter + .ELAPSED_TIME, 300000)); + assertEquals(expected, expression); + assertEquals(expected.hashCode(), expression.hashCode()); + + expression = ExpressionFactory.fromString(" elapsed_TIME > 300 seconds"); + expected = ExpressionFactory.createExpression(new TimeCounterLimit(TimeCounterLimit.TimeCounter + .ELAPSED_TIME, 300000)); + assertEquals(expected, expression); + assertEquals(expected.hashCode(), expression.hashCode()); + + expression = ExpressionFactory.fromString(" elapsed_TIME > 300 sec"); + expected = ExpressionFactory.createExpression(new TimeCounterLimit(TimeCounterLimit.TimeCounter + .ELAPSED_TIME, 300000)); + assertEquals(expected, expression); + assertEquals(expected.hashCode(), expression.hashCode()); + + expression = ExpressionFactory.fromString(" elapsed_TIME > 300s"); + expected = ExpressionFactory.createExpression(new TimeCounterLimit(TimeCounterLimit.TimeCounter + .ELAPSED_TIME, 300000)); + assertEquals(expected, expression); + assertEquals(expected.hashCode(), 
expression.hashCode()); + + expression = ExpressionFactory.fromString(" elapsed_TIME > 300seconds"); + expected = ExpressionFactory.createExpression(new TimeCounterLimit(TimeCounterLimit.TimeCounter + .ELAPSED_TIME, 300000)); + assertEquals(expected, expression); + assertEquals(expected.hashCode(), expression.hashCode()); + + expression = ExpressionFactory.fromString(" elapsed_TIME > 300sec"); + expected = ExpressionFactory.createExpression(new TimeCounterLimit(TimeCounterLimit.TimeCounter + .ELAPSED_TIME, 300000)); + assertEquals(expected, expression); + assertEquals(expected.hashCode(), expression.hashCode()); + + expression = ExpressionFactory.fromString(" elapsed_TIME > 300000000 microseconds"); + expected = ExpressionFactory.createExpression(new TimeCounterLimit(TimeCounterLimit.TimeCounter + .ELAPSED_TIME, 300000)); + assertEquals(expected, expression); + assertEquals(expected.hashCode(), expression.hashCode()); + + expression = ExpressionFactory.fromString(" elapsed_TIME > 1DAY"); + expected = ExpressionFactory.createExpression(new TimeCounterLimit(TimeCounterLimit.TimeCounter + .ELAPSED_TIME, 24 * 60 * 60 * 1000)); + assertEquals(expected, expression); + assertEquals(expected.hashCode(), expression.hashCode()); + } + + @Test + public void testIllegalTimeCounterValue1() { + thrown.expect(IllegalArgumentException.class); + thrown.expectMessage("Invalid time unit"); + ExpressionFactory.fromString(" elapsed_TIME > 300 light years"); + } + + @Test + public void testIllegalTimeCounterValue2() { + thrown.expect(IllegalArgumentException.class); + thrown.expectMessage("Invalid time unit"); + ExpressionFactory.fromString(" elapsed_TIME > 300secTOR"); + } + + @Test + public void testRuleClone() { + Expression expression = ExpressionFactory.createExpression(new FileSystemCounterLimit("hdfs", + FileSystemCounterLimit.FSCounter.BYTES_READ, 1024)); + Rule rule = new QueryRule("hdfs_read_heavy", expression, Rule.Action.KILL_QUERY); + Rule clonedRule = rule.clone(); + assertNotEquals(System.identityHashCode(rule), System.identityHashCode(clonedRule)); + assertNotEquals(System.identityHashCode(rule.getExpression()), System.identityHashCode(clonedRule.getExpression())); + assertNotEquals(System.identityHashCode(rule.getExpression().getCounterLimit()), + System.identityHashCode(clonedRule.getExpression().getCounterLimit())); + assertEquals(rule, clonedRule); + assertEquals(rule.hashCode(), clonedRule.hashCode()); + + expression = ExpressionFactory.fromString(" ELAPSED_TIME > 300"); + rule = new QueryRule("slow_query", expression, Rule.Action.KILL_QUERY); + clonedRule = rule.clone(); + assertNotEquals(System.identityHashCode(rule), System.identityHashCode(clonedRule)); + assertNotEquals(System.identityHashCode(rule.getExpression()), System.identityHashCode(clonedRule.getExpression())); + assertNotEquals(System.identityHashCode(rule.getExpression().getCounterLimit()), + System.identityHashCode(clonedRule.getExpression().getCounterLimit())); + assertEquals(rule, clonedRule); + assertEquals(rule.hashCode(), clonedRule.hashCode()); + } + + @Test + public void testIllegalExpressionsUnsupportedPredicate() { + thrown.expect(IllegalArgumentException.class); + thrown.expectMessage("Invalid predicate in expression"); + ExpressionFactory.fromString("BYTES_READ < 1024"); + } + + @Test + public void testIllegalExpressionsMissingLimit() { + thrown.expect(IllegalArgumentException.class); + thrown.expectMessage("Invalid predicate in expression"); + ExpressionFactory.fromString("BYTES_READ >"); + } + + @Test + public 
+
+  @Test
+  public void testRuleClone() {
+    Expression expression = ExpressionFactory.createExpression(new FileSystemCounterLimit("hdfs",
+        FileSystemCounterLimit.FSCounter.BYTES_READ, 1024));
+    Rule rule = new QueryRule("hdfs_read_heavy", expression, Rule.Action.KILL_QUERY);
+    Rule clonedRule = rule.clone();
+    assertNotEquals(System.identityHashCode(rule), System.identityHashCode(clonedRule));
+    assertNotEquals(System.identityHashCode(rule.getExpression()), System.identityHashCode(clonedRule.getExpression()));
+    assertNotEquals(System.identityHashCode(rule.getExpression().getCounterLimit()),
+        System.identityHashCode(clonedRule.getExpression().getCounterLimit()));
+    assertEquals(rule, clonedRule);
+    assertEquals(rule.hashCode(), clonedRule.hashCode());
+
+    expression = ExpressionFactory.fromString(" ELAPSED_TIME > 300");
+    rule = new QueryRule("slow_query", expression, Rule.Action.KILL_QUERY);
+    clonedRule = rule.clone();
+    assertNotEquals(System.identityHashCode(rule), System.identityHashCode(clonedRule));
+    assertNotEquals(System.identityHashCode(rule.getExpression()), System.identityHashCode(clonedRule.getExpression()));
+    assertNotEquals(System.identityHashCode(rule.getExpression().getCounterLimit()),
+        System.identityHashCode(clonedRule.getExpression().getCounterLimit()));
+    assertEquals(rule, clonedRule);
+    assertEquals(rule.hashCode(), clonedRule.hashCode());
+  }
+
+  @Test
+  public void testIllegalExpressionsUnsupportedPredicate() {
+    thrown.expect(IllegalArgumentException.class);
+    thrown.expectMessage("Invalid predicate in expression");
+    ExpressionFactory.fromString("BYTES_READ < 1024");
+  }
+
+  @Test
+  public void testIllegalExpressionsMissingLimit() {
+    thrown.expect(IllegalArgumentException.class);
+    thrown.expectMessage("Invalid predicate in expression");
+    ExpressionFactory.fromString("BYTES_READ >");
+  }
+
+  @Test
+  public void testIllegalExpressionsMissingCounter() {
+    thrown.expect(IllegalArgumentException.class);
+    thrown.expectMessage("Counter name cannot be empty!");
+    ExpressionFactory.fromString("> 1024");
+  }
+
+  @Test
+  public void testIllegalExpressionsMultipleLimit() {
+    thrown.expect(IllegalArgumentException.class);
+    thrown.expectMessage("Invalid predicate in expression");
+    ExpressionFactory.fromString("BYTES_READ > 1024 > 1025");
+  }
+
+  @Test
+  public void testIllegalExpressionsMultipleCounters() {
+    thrown.expect(IllegalArgumentException.class);
+    thrown.expectMessage("Invalid predicate in expression");
+    ExpressionFactory.fromString("BYTES_READ > BYTES_READ > 1025");
+  }
+
+  @Test
+  public void testIllegalExpressionsInvalidLimitPost() {
+    thrown.expect(IllegalArgumentException.class);
+    thrown.expectMessage("Invalid size unit");
+    ExpressionFactory.fromString("BYTES_READ > 1024aaaa");
+  }
+
+  @Test
+  public void testIllegalExpressionsInvalidLimitPre() {
+    thrown.expect(IllegalArgumentException.class);
+    thrown.expectMessage("Invalid counter value");
+    ExpressionFactory.fromString("BYTES_READ > foo1024");
+  }
+
+  @Test
+  public void testIllegalExpressionsInvalidNegativeLimit() {
+    thrown.expect(IllegalArgumentException.class);
+    thrown.expectMessage("Illegal value for counter limit. Expected a positive long value.");
+    ExpressionFactory.fromString("BYTES_READ > -1024");
+  }
+}
diff --git a/service/src/java/org/apache/hive/service/cli/operation/OperationManager.java b/service/src/java/org/apache/hive/service/cli/operation/OperationManager.java
index 1cf4392..8224bcc 100644
--- a/service/src/java/org/apache/hive/service/cli/operation/OperationManager.java
+++ b/service/src/java/org/apache/hive/service/cli/operation/OperationManager.java
@@ -210,7 +210,9 @@ private void addOperation(Operation operation) {
   private Operation removeOperation(OperationHandle opHandle) {
     Operation operation = handleToOperation.remove(opHandle);
-    queryIdOperation.remove(getQueryId(operation));
+    String queryId = getQueryId(operation);
+    queryIdOperation.remove(queryId);
+    LOG.info("Removed queryId: {} corresponding to operation: {}", queryId, opHandle);
     if (operation instanceof SQLOperation) {
       removeSafeQueryInfo(opHandle);
     }
   }
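One caveat on the removeOperation() hunk above: handleToOperation.remove() returns null for an unknown or already-removed handle, and the new code passes that result straight to getQueryId() before logging, preserving the original line's assumption that the operation exists. If getQueryId() dereferences its argument, a defensive variant would bail out early; a hypothetical sketch with stand-in types, not the patch's code:

import java.util.concurrent.ConcurrentHashMap;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class OperationRegistrySketch {
  // Stand-ins for the real OperationHandle/Operation types.
  static class OperationHandle { }
  static class Operation {
    final String queryId;
    Operation(String queryId) { this.queryId = queryId; }
  }

  private static final Logger LOG = LoggerFactory.getLogger(OperationRegistrySketch.class);
  private final ConcurrentHashMap<OperationHandle, Operation> handleToOperation = new ConcurrentHashMap<>();
  private final ConcurrentHashMap<String, Operation> queryIdOperation = new ConcurrentHashMap<>();

  Operation removeOperation(OperationHandle opHandle) {
    Operation operation = handleToOperation.remove(opHandle);
    if (operation == null) {
      // Unknown or already-removed handle: nothing else to clean up.
      LOG.info("No operation found for handle: {}", opHandle);
      return null;
    }
    String queryId = operation.queryId;
    queryIdOperation.remove(queryId);
    LOG.info("Removed queryId: {} corresponding to operation: {}", queryId, opHandle);
    return operation;
  }
}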
diff --git a/service/src/java/org/apache/hive/service/server/HiveServer2.java b/service/src/java/org/apache/hive/service/server/HiveServer2.java
index 5cb973c..3ada6b2 100644
--- a/service/src/java/org/apache/hive/service/server/HiveServer2.java
+++ b/service/src/java/org/apache/hive/service/server/HiveServer2.java
@@ -107,6 +107,7 @@
   private CuratorFramework zooKeeperClient;
   private boolean deregisteredWithZooKeeper = false; // Set to true only when deregistration happens
   private HttpServer webServer; // Web UI
+  private TezSessionPoolManager tezSessionPoolManager;
   private WorkloadManager wm;

   public HiveServer2() {
@@ -121,6 +122,11 @@ public synchronized void init(HiveConf hiveConf) {
       if (hiveConf.getBoolVar(ConfVars.HIVE_SERVER2_METRICS_ENABLED)) {
         MetricsFactory.init(hiveConf);
       }
+
+      if (hiveConf.getBoolVar(ConfVars.HIVE_SERVER2_TEZ_INITIALIZE_DEFAULT_SESSIONS)) {
+        tezSessionPoolManager = TezSessionPoolManager.getInstance();
+        tezSessionPoolManager.setupPool(hiveConf);
+      }
     } catch (Throwable t) {
       LOG.warn("Could not initiate the HiveServer2 Metrics system. Metrics may not be reported.", t);
     }
@@ -520,6 +526,24 @@ public synchronized void start() {
         throw new ServiceException(e);
       }
     }
+    if (tezSessionPoolManager != null) {
+      try {
+        tezSessionPoolManager.startPool();
+        LOG.info("Started tez session pool manager.");
+      } catch (Exception e) {
+        LOG.error("Error starting tez session pool manager", e);
+        throw new ServiceException(e);
+      }
+    }
+    if (wm != null) {
+      try {
+        wm.start();
+        LOG.info("Started workload manager.");
+      } catch (Exception e) {
+        LOG.error("Error starting workload manager", e);
+        throw new ServiceException(e);
+      }
+    }
   }

   @Override
@@ -554,9 +578,10 @@ public synchronized void stop() {
     }
     // There should already be an instance of the session pool manager.
     // If not, ignoring is fine while stopping HiveServer2.
-    if (hiveConf != null && hiveConf.getBoolVar(ConfVars.HIVE_SERVER2_TEZ_INITIALIZE_DEFAULT_SESSIONS)) {
+    if (hiveConf != null && hiveConf.getBoolVar(ConfVars.HIVE_SERVER2_TEZ_INITIALIZE_DEFAULT_SESSIONS) &&
+        tezSessionPoolManager != null) {
       try {
-        TezSessionPoolManager.getInstance().stop();
+        tezSessionPoolManager.stop();
       } catch (Exception e) {
         LOG.error("Tez session pool manager stop had an error during stop of HiveServer2. "
             + "Shutting down HiveServer2 anyway.", e);
@@ -606,13 +631,6 @@ private static void startHiveServer2() throws Throwable {
         TimeUnit.MILLISECONDS);
     HiveServer2 server = null;
     try {
-      // Initialize the pool before we start the server; don't start yet.
-      TezSessionPoolManager sessionPool = null;
-      if (hiveConf.getBoolVar(ConfVars.HIVE_SERVER2_TEZ_INITIALIZE_DEFAULT_SESSIONS)) {
-        sessionPool = TezSessionPoolManager.getInstance();
-        sessionPool.setupPool(hiveConf);
-      }
-
       // Cleanup the scratch dir before starting
       ServerUtils.cleanUpScratchDir(hiveConf);
       // Schedule task to cleanup dangling scratch dir periodically,
@@ -632,13 +650,6 @@
             "warned upon.", t);
       }

-      if (sessionPool != null) {
-        sessionPool.startPool();
-      }
-      if (server.wm != null) {
-        server.wm.start();
-      }
-
       if (hiveConf.getVar(ConfVars.HIVE_EXECUTION_ENGINE).equals("spark")) {
         SparkSessionManagerImpl.getInstance().setup(hiveConf);
       }
@@ -668,6 +679,22 @@
     }
   }
+  private static TezSessionPoolManager initTezSessionPool(final HiveConf hiveConf) throws Exception {
+    TezSessionPoolManager sessionPool = null;
+    if (hiveConf.getBoolVar(ConfVars.HIVE_SERVER2_TEZ_INITIALIZE_DEFAULT_SESSIONS)) {
+      sessionPool = TezSessionPoolManager.getInstance();
+      sessionPool.setupPool(hiveConf);
+    }
+    return sessionPool;
+  }
+
+  private void startTezSessionPool(final TezSessionPoolManager sessionPool) throws Exception {
+    // Mirrors the inline start logic in start(); no-op when no pool was set up.
+    if (sessionPool != null) {
+      sessionPool.startPool();
+    }
+  }
+

   /**
    * Remove all znodes corresponding to the given version number from ZooKeeper
   *