diff --git a/data/conf/hive-log4j2.properties b/data/conf/hive-log4j2.properties
index 73c7d90..e5bb166 100644
--- a/data/conf/hive-log4j2.properties
+++ b/data/conf/hive-log4j2.properties
@@ -50,7 +50,7 @@ appender.DRFA.strategy.type = DefaultRolloverStrategy
 appender.DRFA.strategy.max = 30
 
 # list of all loggers
-loggers = HadoopIPC, HadoopSecurity, Hdfs, HdfsServer, HadoopMetrics2, Mortbay, Yarn, YarnServer, Tez, HadoopConf, Zookeeper, ServerCnxn, NIOServerCnxn, ClientCnxn, ClientCnxnSocket, ClientCnxnSocketNIO, DataNucleus, Datastore, JPOX, Operator, Serde2Lazy, ObjectStore, CalcitePlanner, AmazonAws, ApacheHttp
+loggers = HadoopIPC, HadoopSecurity, Hdfs, HdfsServer, HadoopMetrics2, Mortbay, Yarn, YarnServer, Tez, HadoopConf, Zookeeper, ServerCnxn, NIOServerCnxn, ClientCnxn, ClientCnxnSocket, ClientCnxnSocketNIO, DataNucleus, Datastore, JPOX, Operator, Serde2Lazy, ObjectStore, CalcitePlanner, AmazonAws, ApacheHttp, Thrift, Jetty, BlockStateChange
 
 logger.HadoopIPC.name = org.apache.hadoop.ipc
 logger.HadoopIPC.level = WARN
@@ -127,6 +127,15 @@ logger.AmazonAws.level = INFO
 logger.ApacheHttp.name=org.apache.http
 logger.ApacheHttp.level = INFO
 
+logger.Thrift.name = org.apache.thrift
+logger.Thrift.level = WARN
+
+logger.Jetty.name = org.eclipse.jetty
+logger.Jetty.level = WARN
+
+logger.BlockStateChange.name = BlockStateChange
+logger.BlockStateChange.level = WARN
+
 # root logger
 rootLogger.level = ${sys:hive.log.level}
 rootLogger.appenderRefs = root, console
diff --git a/itests/hive-unit/src/test/java/org/apache/hive/jdbc/AbstractJdbcTriggersTest.java b/itests/hive-unit/src/test/java/org/apache/hive/jdbc/AbstractJdbcTriggersTest.java
index 68d57ca..235e6c3 100644
--- a/itests/hive-unit/src/test/java/org/apache/hive/jdbc/AbstractJdbcTriggersTest.java
+++ b/itests/hive-unit/src/test/java/org/apache/hive/jdbc/AbstractJdbcTriggersTest.java
@@ -1,4 +1,4 @@
-/**
+/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
@@ -37,6 +37,7 @@
 import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
 import org.apache.hadoop.hive.llap.LlapBaseInputFormat;
+import org.apache.hadoop.hive.metastore.api.WMTrigger;
 import org.apache.hadoop.hive.ql.wm.Trigger;
 import org.apache.hive.jdbc.miniHS2.MiniHS2;
 import org.apache.hive.jdbc.miniHS2.MiniHS2.MiniClusterType;
@@ -102,7 +103,7 @@ public static void afterTest() throws Exception {
     }
   }
 
-  private void createSleepUDF() throws SQLException {
+  void createSleepUDF() throws SQLException {
    String udfName = TestJdbcWithMiniHS2.SleepMsUDF.class.getName();
    Connection con = hs2Conn;
    Statement stmt = con.createStatement();
@@ -110,7 +111,7 @@ private void createSleepUDF() throws SQLException {
    stmt.close();
  }
 
-  protected void runQueryWithTrigger(final String query, final List<String> setCmds,
+  void runQueryWithTrigger(final String query, final List<String> setCmds,
    final String expect) throws Exception {
 
@@ -149,7 +150,7 @@ protected void runQueryWithTrigger(final String query, final List<String> setCmd
 
  abstract void setupTriggers(final List<Trigger> triggers) throws Exception;
 
-  protected List<String> getConfigs(String... more) {
+  List<String> getConfigs(String... more) {
    List<String> setCmds = new ArrayList<>();
    setCmds.add("set hive.exec.dynamic.partition.mode=nonstrict");
    setCmds.add("set mapred.min.split.size=100");
@@ -161,4 +162,11 @@ protected void runQueryWithTrigger(final String query, final List<String> setCmd
    }
    return setCmds;
  }
+
+  WMTrigger wmTriggerFromTrigger(Trigger trigger) {
+    WMTrigger result = new WMTrigger("rp", trigger.getName());
+    result.setTriggerExpression(trigger.getExpression().toString());
+    result.setActionExpression(trigger.getAction().toString());
+    return result;
+  }
 }
\ No newline at end of file
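Note on the new wmTriggerFromTrigger() helper: it is how subclasses publish ql-level Trigger objects into a metastore resource plan, serializing the expression and action via toString() under the hardcoded plan name "rp". A rough usage sketch, assuming only the ql.wm types already used elsewhere in this patch:

    // Hypothetical caller inside a test subclass:
    Expression expr = ExpressionFactory.fromString("ELAPSED_TIME > 30000");
    Trigger trigger = new ExecutionTrigger("slow_query_kill", expr, new Action(Action.Type.KILL_QUERY));
    WMTrigger wmTrigger = wmTriggerFromTrigger(trigger); // plan name is fixed to "rp"
    // wmTrigger.getTriggerExpression()/getActionExpression() now hold the toString() forms.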
"/hive-site.xml")); + System.out.println("Setting hive-site: " + HiveConf.getHiveSiteLocation()); + + conf = new HiveConf(); + conf.setBoolVar(ConfVars.HIVE_SUPPORT_CONCURRENCY, false); + conf.setBoolVar(ConfVars.HIVE_SERVER2_ENABLE_DOAS, false); + conf.setTimeVar(ConfVars.HIVE_TRIGGER_VALIDATION_INTERVAL_MS, 100, TimeUnit.MILLISECONDS); + conf.setVar(ConfVars.HIVE_SERVER2_TEZ_INTERACTIVE_QUEUE, "default"); + conf.setBoolean("hive.test.workload.management", true); + conf.setBoolVar(ConfVars.TEZ_EXEC_SUMMARY, true); + conf.setBoolVar(ConfVars.HIVE_STRICT_CHECKS_CARTESIAN, false); + // don't want cache hits from llap io for testing filesystem bytes read counters + conf.setVar(ConfVars.LLAP_IO_MEMORY_MODE, "none"); + + conf.addResource(new URL("file://" + new File(confDir).toURI().getPath() + + "/tez-site.xml")); + + miniHS2 = new MiniHS2(conf, MiniClusterType.LLAP); + dataFileDir = conf.get("test.data.files").replace('\\', '/').replace("c:", ""); + kvDataFilePath = new Path(dataFileDir, "kv1.txt"); + + Map confOverlay = new HashMap<>(); + miniHS2.start(confOverlay); + miniHS2.getDFS().getFileSystem().mkdirs(new Path("/apps_staging_dir/anonymous")); + } + + @Test(timeout = 60000) + public void testTriggerMoveAndKill() throws Exception { + Expression moveExpression = ExpressionFactory.fromString("EXECUTION_TIME > 1000"); + Expression killExpression = ExpressionFactory.fromString("EXECUTION_TIME > 5000"); + Trigger moveTrigger = new ExecutionTrigger("slow_query_move", moveExpression, + new Action(Action.Type.MOVE_TO_POOL, "ETL")); + Trigger killTrigger = new ExecutionTrigger("slow_query_kill", killExpression, + new Action(Action.Type.KILL_QUERY)); + setupTriggers(Lists.newArrayList(moveTrigger, killTrigger), Lists.newArrayList(killTrigger)); + String query = "select sleep(t1.under_col, 5), t1.value from " + tableName + " t1 join " + tableName + + " t2 on t1.under_col>=t2.under_col"; + runQueryWithTrigger(query, null, killTrigger + " violated"); + } + + @Test(timeout = 60000) + public void testTriggerMoveEscapeKill() throws Exception { + Expression moveExpression = ExpressionFactory.fromString("HDFS_BYTES_READ > 100"); + Expression killExpression = ExpressionFactory.fromString("EXECUTION_TIME > 5000"); + Trigger moveTrigger = new ExecutionTrigger("move_big_read", moveExpression, + new Action(Action.Type.MOVE_TO_POOL, "ETL")); + Trigger killTrigger = new ExecutionTrigger("slow_query_kill", killExpression, + new Action(Action.Type.KILL_QUERY)); + setupTriggers(Lists.newArrayList(moveTrigger, killTrigger), Lists.newArrayList()); + String query = "select sleep(t1.under_col, 1), t1.value from " + tableName + " t1 join " + tableName + + " t2 on t1.under_col==t2.under_col"; + runQueryWithTrigger(query, null, null); + } + + @Test(timeout = 60000) + public void testTriggerMoveConflictKill() throws Exception { + Expression moveExpression = ExpressionFactory.fromString("HDFS_BYTES_READ > 100"); + Expression killExpression = ExpressionFactory.fromString("HDFS_BYTES_READ > 100"); + Trigger moveTrigger = new ExecutionTrigger("move_big_read", moveExpression, + new Action(Action.Type.MOVE_TO_POOL, "ETL")); + Trigger killTrigger = new ExecutionTrigger("kill_big_read", killExpression, + new Action(Action.Type.KILL_QUERY)); + setupTriggers(Lists.newArrayList(moveTrigger, killTrigger), Lists.newArrayList()); + String query = "select sleep(t1.under_col, 5), t1.value from " + tableName + " t1 join " + tableName + + " t2 on t1.under_col>=t2.under_col"; + runQueryWithTrigger(query, null, killTrigger + " violated"); + } 
+ + @Override + protected void setupTriggers(final List triggers) throws Exception { + setupTriggers(triggers, new ArrayList<>()); + } + + private void setupTriggers(final List biTriggers, final List etlTriggers) throws Exception { + WorkloadManager wm = WorkloadManager.getInstance(); + WMPool biPool = pool("BI", 1, 0.8f); + WMPool etlPool = pool("ETL", 1, 0.2f); + WMFullResourcePlan plan = new WMFullResourcePlan(plan(), Lists.newArrayList(biPool, etlPool)); + plan.getPlan().setDefaultPoolPath("BI"); + + for (Trigger trigger : biTriggers) { + plan.addToTriggers(wmTriggerFromTrigger(trigger)); + plan.addToPoolTriggers(new WMPoolTrigger("BI", trigger.getName())); + } + + for (Trigger trigger : etlTriggers) { + plan.addToTriggers(wmTriggerFromTrigger(trigger)); + plan.addToPoolTriggers(new WMPoolTrigger("ETL", trigger.getName())); + } + wm.updateResourcePlanAsync(plan).get(10, TimeUnit.SECONDS); + } +} \ No newline at end of file diff --git a/itests/hive-unit/src/test/java/org/apache/hive/jdbc/TestTriggersNoTezSessionPool.java b/itests/hive-unit/src/test/java/org/apache/hive/jdbc/TestTriggersNoTezSessionPool.java index fb3af932..bcce3dc 100644 --- a/itests/hive-unit/src/test/java/org/apache/hive/jdbc/TestTriggersNoTezSessionPool.java +++ b/itests/hive-unit/src/test/java/org/apache/hive/jdbc/TestTriggersNoTezSessionPool.java @@ -1,4 +1,4 @@ -/** +/* * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file * distributed with this work for additional information @@ -22,6 +22,7 @@ import java.util.List; import org.apache.hadoop.hive.ql.exec.tez.TezSessionPoolManager; +import org.apache.hadoop.hive.ql.wm.Action; import org.apache.hadoop.hive.ql.wm.ExecutionTrigger; import org.apache.hadoop.hive.ql.wm.Expression; import org.apache.hadoop.hive.ql.wm.ExpressionFactory; @@ -36,7 +37,7 @@ @Test(timeout = 60000) public void testTriggerSlowQueryExecutionTime() throws Exception { Expression expression = ExpressionFactory.fromString("EXECUTION_TIME > 1000"); - Trigger trigger = new ExecutionTrigger("slow_query", expression, Trigger.Action.KILL_QUERY); + Trigger trigger = new ExecutionTrigger("slow_query", expression, new Action(Action.Type.KILL_QUERY)); setupTriggers(Lists.newArrayList(trigger)); String query = "select sleep(t1.under_col, 5), t1.value from " + tableName + " t1 join " + tableName + " t2 on t1.under_col>=t2.under_col"; @@ -46,7 +47,7 @@ public void testTriggerSlowQueryExecutionTime() throws Exception { @Test(timeout = 60000) public void testTriggerTotalTasks() throws Exception { Expression expression = ExpressionFactory.fromString("TOTAL_TASKS > 50"); - Trigger trigger = new ExecutionTrigger("highly_parallel", expression, Trigger.Action.KILL_QUERY); + Trigger trigger = new ExecutionTrigger("highly_parallel", expression, new Action(Action.Type.KILL_QUERY)); setupTriggers(Lists.newArrayList(trigger)); String query = "select sleep(t1.under_col, 5), t1.value from " + tableName + " t1 join " + tableName + " t2 on t1.under_col>=t2.under_col"; diff --git a/itests/hive-unit/src/test/java/org/apache/hive/jdbc/TestTriggersTezSessionPoolManager.java b/itests/hive-unit/src/test/java/org/apache/hive/jdbc/TestTriggersTezSessionPoolManager.java index 7b87a65..b377275 100644 --- a/itests/hive-unit/src/test/java/org/apache/hive/jdbc/TestTriggersTezSessionPoolManager.java +++ b/itests/hive-unit/src/test/java/org/apache/hive/jdbc/TestTriggersTezSessionPoolManager.java @@ -23,6 +23,7 @@ import java.util.List; import 
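For orientation, the resource plan the move tests install reduces to two pools with an 80/20 cluster split and BI as the default; a condensed sketch using the same pool()/plan() helpers imported from TestWorkloadManager (argument order assumed to be name, query parallelism, cluster fraction):

    WMPool bi = pool("BI", 1, 0.8f);   // default pool; queries start here
    WMPool etl = pool("ETL", 1, 0.2f); // destination of the MOVE_TO_POOL action
    WMFullResourcePlan rp = new WMFullResourcePlan(plan(), Lists.newArrayList(bi, etl));
    rp.getPlan().setDefaultPoolPath("BI");
    // each trigger then attaches per pool:
    rp.addToTriggers(wmTriggerFromTrigger(trigger));
    rp.addToPoolTriggers(new WMPoolTrigger("BI", trigger.getName()));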
diff --git a/itests/hive-unit/src/test/java/org/apache/hive/jdbc/TestTriggersNoTezSessionPool.java b/itests/hive-unit/src/test/java/org/apache/hive/jdbc/TestTriggersNoTezSessionPool.java
index fb3af932..bcce3dc 100644
--- a/itests/hive-unit/src/test/java/org/apache/hive/jdbc/TestTriggersNoTezSessionPool.java
+++ b/itests/hive-unit/src/test/java/org/apache/hive/jdbc/TestTriggersNoTezSessionPool.java
@@ -1,4 +1,4 @@
-/**
+/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
@@ -22,6 +22,7 @@
 import java.util.List;
 
 import org.apache.hadoop.hive.ql.exec.tez.TezSessionPoolManager;
+import org.apache.hadoop.hive.ql.wm.Action;
 import org.apache.hadoop.hive.ql.wm.ExecutionTrigger;
 import org.apache.hadoop.hive.ql.wm.Expression;
 import org.apache.hadoop.hive.ql.wm.ExpressionFactory;
@@ -36,7 +37,7 @@
  @Test(timeout = 60000)
  public void testTriggerSlowQueryExecutionTime() throws Exception {
    Expression expression = ExpressionFactory.fromString("EXECUTION_TIME > 1000");
-    Trigger trigger = new ExecutionTrigger("slow_query", expression, Trigger.Action.KILL_QUERY);
+    Trigger trigger = new ExecutionTrigger("slow_query", expression, new Action(Action.Type.KILL_QUERY));
    setupTriggers(Lists.newArrayList(trigger));
    String query = "select sleep(t1.under_col, 5), t1.value from " + tableName + " t1 join " + tableName +
      " t2 on t1.under_col>=t2.under_col";
@@ -46,7 +47,7 @@ public void testTriggerSlowQueryExecutionTime() throws Exception {
  @Test(timeout = 60000)
  public void testTriggerTotalTasks() throws Exception {
    Expression expression = ExpressionFactory.fromString("TOTAL_TASKS > 50");
-    Trigger trigger = new ExecutionTrigger("highly_parallel", expression, Trigger.Action.KILL_QUERY);
+    Trigger trigger = new ExecutionTrigger("highly_parallel", expression, new Action(Action.Type.KILL_QUERY));
    setupTriggers(Lists.newArrayList(trigger));
    String query = "select sleep(t1.under_col, 5), t1.value from " + tableName + " t1 join " + tableName +
      " t2 on t1.under_col>=t2.under_col";
diff --git a/itests/hive-unit/src/test/java/org/apache/hive/jdbc/TestTriggersTezSessionPoolManager.java b/itests/hive-unit/src/test/java/org/apache/hive/jdbc/TestTriggersTezSessionPoolManager.java
index 7b87a65..b377275 100644
--- a/itests/hive-unit/src/test/java/org/apache/hive/jdbc/TestTriggersTezSessionPoolManager.java
+++ b/itests/hive-unit/src/test/java/org/apache/hive/jdbc/TestTriggersTezSessionPoolManager.java
@@ -23,6 +23,7 @@
 import java.util.List;
 
 import org.apache.hadoop.hive.ql.exec.tez.TezSessionPoolManager;
+import org.apache.hadoop.hive.ql.wm.Action;
 import org.apache.hadoop.hive.ql.wm.ExecutionTrigger;
 import org.apache.hadoop.hive.ql.wm.Expression;
 import org.apache.hadoop.hive.ql.wm.ExpressionFactory;
@@ -37,7 +38,7 @@
  @Test(timeout = 60000)
  public void testTriggerSlowQueryElapsedTime() throws Exception {
    Expression expression = ExpressionFactory.fromString("ELAPSED_TIME > 20000");
-    Trigger trigger = new ExecutionTrigger("slow_query", expression, Trigger.Action.KILL_QUERY);
+    Trigger trigger = new ExecutionTrigger("slow_query", expression, new Action(Action.Type.KILL_QUERY));
    setupTriggers(Lists.newArrayList(trigger));
    String query = "select sleep(t1.under_col, 500), t1.value from " + tableName + " t1 join " + tableName +
      " t2 on t1.under_col>=t2.under_col";
@@ -47,7 +48,7 @@ public void testTriggerSlowQueryElapsedTime() throws Exception {
  @Test(timeout = 60000)
  public void testTriggerSlowQueryExecutionTime() throws Exception {
    Expression expression = ExpressionFactory.fromString("EXECUTION_TIME > 1000");
-    Trigger trigger = new ExecutionTrigger("slow_query", expression, Trigger.Action.KILL_QUERY);
+    Trigger trigger = new ExecutionTrigger("slow_query", expression, new Action(Action.Type.KILL_QUERY));
    setupTriggers(Lists.newArrayList(trigger));
    String query = "select sleep(t1.under_col, 5), t1.value from " + tableName + " t1 join " + tableName +
      " t2 on t1.under_col>=t2.under_col";
@@ -57,7 +58,7 @@ public void testTriggerSlowQueryExecutionTime() throws Exception {
  @Test(timeout = 60000)
  public void testTriggerHighShuffleBytes() throws Exception {
    Expression expression = ExpressionFactory.fromString("SHUFFLE_BYTES > 100");
-    Trigger trigger = new ExecutionTrigger("big_shuffle", expression, Trigger.Action.KILL_QUERY);
+    Trigger trigger = new ExecutionTrigger("big_shuffle", expression, new Action(Action.Type.KILL_QUERY));
    setupTriggers(Lists.newArrayList(trigger));
    List<String> cmds = new ArrayList<>();
    cmds.add("set hive.auto.convert.join=false");
@@ -72,7 +73,7 @@ public void testTriggerHighShuffleBytes() throws Exception {
  @Test(timeout = 60000)
  public void testTriggerHighBytesRead() throws Exception {
    Expression expression = ExpressionFactory.fromString("HDFS_BYTES_READ > 100");
-    Trigger trigger = new ExecutionTrigger("big_read", expression, Trigger.Action.KILL_QUERY);
+    Trigger trigger = new ExecutionTrigger("big_read", expression, new Action(Action.Type.KILL_QUERY));
    setupTriggers(Lists.newArrayList(trigger));
    String query = "select sleep(t1.under_col, 5), t1.value from " + tableName + " t1 join " + tableName +
      " t2 on t1.under_col>=t2.under_col";
@@ -82,7 +83,7 @@ public void testTriggerHighBytesRead() throws Exception {
  @Test(timeout = 60000)
  public void testTriggerHighBytesWrite() throws Exception {
    Expression expression = ExpressionFactory.fromString("FILE_BYTES_WRITTEN > 100");
-    Trigger trigger = new ExecutionTrigger("big_write", expression, Trigger.Action.KILL_QUERY);
+    Trigger trigger = new ExecutionTrigger("big_write", expression, new Action(Action.Type.KILL_QUERY));
    setupTriggers(Lists.newArrayList(trigger));
    String query = "select sleep(t1.under_col, 5), t1.value from " + tableName + " t1 join " + tableName +
      " t2 on t1.under_col>=t2.under_col";
@@ -92,7 +93,7 @@ public void testTriggerHighBytesWrite() throws Exception {
  @Test(timeout = 60000)
  public void testTriggerTotalTasks() throws Exception {
    Expression expression = ExpressionFactory.fromString("TOTAL_TASKS > 50");
-    Trigger trigger = new ExecutionTrigger("highly_parallel", expression, Trigger.Action.KILL_QUERY);
+    Trigger trigger = new ExecutionTrigger("highly_parallel", expression, new Action(Action.Type.KILL_QUERY));
    setupTriggers(Lists.newArrayList(trigger));
    String query = "select sleep(t1.under_col, 5), t1.value from " + tableName + " t1 join " + tableName +
      " t2 on t1.under_col>=t2.under_col";
@@ -102,11 +103,11 @@ public void testTriggerTotalTasks() throws Exception {
  @Test(timeout = 60000)
  public void testTriggerCustomReadOps() throws Exception {
    Expression expression = ExpressionFactory.fromString("HDFS_READ_OPS > 50");
-    Trigger trigger = new ExecutionTrigger("high_read_ops", expression, Trigger.Action.KILL_QUERY);
+    Trigger trigger = new ExecutionTrigger("high_read_ops", expression, new Action(Action.Type.KILL_QUERY));
    setupTriggers(Lists.newArrayList(trigger));
    String query = "select sleep(t1.under_col, 5), t1.value from " + tableName + " t1 join " + tableName +
      " t2 on t1.under_col>=t2.under_col";
-    runQueryWithTrigger(query, getConfigs(), "Query was cancelled");
+    runQueryWithTrigger(query, getConfigs(), trigger + " violated");
  }
 
@@ -114,20 +115,20 @@ public void testTriggerCustomCreatedFiles() throws Exception {
    List<String> cmds = getConfigs();
 
    Expression expression = ExpressionFactory.fromString("CREATED_FILES > 5");
-    Trigger trigger = new ExecutionTrigger("high_read_ops", expression, Trigger.Action.KILL_QUERY);
+    Trigger trigger = new ExecutionTrigger("high_read_ops", expression, new Action(Action.Type.KILL_QUERY));
    setupTriggers(Lists.newArrayList(trigger));
    String query = "create table testtab2 as select * from " + tableName;
-    runQueryWithTrigger(query, cmds, "Query was cancelled");
+    runQueryWithTrigger(query, cmds, trigger + " violated");
 
    // partitioned insert
    expression = ExpressionFactory.fromString("CREATED_FILES > 10");
-    trigger = new ExecutionTrigger("high_read_ops", expression, Trigger.Action.KILL_QUERY);
+    trigger = new ExecutionTrigger("high_read_ops", expression, new Action(Action.Type.KILL_QUERY));
    setupTriggers(Lists.newArrayList(trigger));
    cmds.add("drop table src3");
    cmds.add("create table src3 (key int) partitioned by (value string)");
    query = "insert overwrite table src3 partition (value) select sleep(under_col, 10), value from " + tableName +
      " where under_col < 100";
-    runQueryWithTrigger(query, cmds, "Query was cancelled");
+    runQueryWithTrigger(query, cmds, trigger + " violated");
  }
 
@@ -140,9 +141,9 @@ public void testTriggerCustomCreatedDynamicPartitions() throws Exception {
    String query = "insert overwrite table src2 partition (value) select * from " + tableName +
      " where under_col < 100";
    Expression expression = ExpressionFactory.fromString("CREATED_DYNAMIC_PARTITIONS > 20");
-    Trigger trigger = new ExecutionTrigger("high_read_ops", expression, Trigger.Action.KILL_QUERY);
+    Trigger trigger = new ExecutionTrigger("high_read_ops", expression, new Action(Action.Type.KILL_QUERY));
    setupTriggers(Lists.newArrayList(trigger));
-    runQueryWithTrigger(query, cmds, "Query was cancelled");
+    runQueryWithTrigger(query, cmds, trigger + " violated");
 
    cmds = getConfigs();
    // let it create 57 partitions without any triggers
@@ -154,9 +155,9 @@ public void testTriggerCustomCreatedDynamicPartitions() throws Exception {
    // query will try to add 64 more partitions to already existing 57 partitions but will get cancelled for violation
    query = "insert into table src2 partition (value) select * from " + tableName + " where under_col < 200";
    expression = ExpressionFactory.fromString("CREATED_DYNAMIC_PARTITIONS > 30");
-    trigger = new ExecutionTrigger("high_read_ops", expression, Trigger.Action.KILL_QUERY);
+    trigger = new ExecutionTrigger("high_read_ops", expression, new Action(Action.Type.KILL_QUERY));
    setupTriggers(Lists.newArrayList(trigger));
-    runQueryWithTrigger(query, cmds, "Query was cancelled");
+    runQueryWithTrigger(query, cmds, trigger + " violated");
 
    // let it create 64 more partitions (total 57 + 64 = 121) without any triggers
    query = "insert into table src2 partition (value) select * from " + tableName + " where under_col < 200";
@@ -166,7 +167,7 @@ public void testTriggerCustomCreatedDynamicPartitions() throws Exception {
    // re-run insert into but this time no new partitions will be created, so there will be no violation
    query = "insert into table src2 partition (value) select * from " + tableName + " where under_col < 200";
    expression = ExpressionFactory.fromString("CREATED_DYNAMIC_PARTITIONS > 10");
-    trigger = new ExecutionTrigger("high_read_ops", expression, Trigger.Action.KILL_QUERY);
+    trigger = new ExecutionTrigger("high_read_ops", expression, new Action(Action.Type.KILL_QUERY));
    setupTriggers(Lists.newArrayList(trigger));
    runQueryWithTrigger(query, cmds, null);
  }
@@ -184,9 +185,9 @@ public void testTriggerCustomCreatedDynamicPartitionsMultiInsert() throws Except
      " insert overwrite table src2 partition (value) select * where under_col < 100 " +
      " insert overwrite table src3 partition (value) select * where under_col >= 100 and under_col < 200";
    Expression expression = ExpressionFactory.fromString("CREATED_DYNAMIC_PARTITIONS > 70");
-    Trigger trigger = new ExecutionTrigger("high_partitions", expression, Trigger.Action.KILL_QUERY);
+    Trigger trigger = new ExecutionTrigger("high_partitions", expression, new Action(Action.Type.KILL_QUERY));
    setupTriggers(Lists.newArrayList(trigger));
-    runQueryWithTrigger(query, cmds, "Query was cancelled");
+    runQueryWithTrigger(query, cmds, trigger + " violated");
  }
 
  @Test(timeout = 60000)
@@ -203,15 +204,15 @@ public void testTriggerCustomCreatedDynamicPartitionsUnionAll() throws Exception
      "union all " +
      "select * from " + tableName + " where under_col >= 100 and under_col < 200) temps";
    Expression expression = ExpressionFactory.fromString("CREATED_DYNAMIC_PARTITIONS > 70");
-    Trigger trigger = new ExecutionTrigger("high_partitions", expression, Trigger.Action.KILL_QUERY);
+    Trigger trigger = new ExecutionTrigger("high_partitions", expression, new Action(Action.Type.KILL_QUERY));
    setupTriggers(Lists.newArrayList(trigger));
-    runQueryWithTrigger(query, cmds, "Query was cancelled");
+    runQueryWithTrigger(query, cmds, trigger + " violated");
  }
 
  @Test(timeout = 60000)
  public void testTriggerCustomNonExistent() throws Exception {
    Expression expression = ExpressionFactory.fromString("OPEN_FILES > 50");
-    Trigger trigger = new ExecutionTrigger("non_existent", expression, Trigger.Action.KILL_QUERY);
+    Trigger trigger = new ExecutionTrigger("non_existent", expression, new Action(Action.Type.KILL_QUERY));
    setupTriggers(Lists.newArrayList(trigger));
    String query = "select l.under_col, l.value from " + tableName + " l join " + tableName +
      " r on l.under_col>=r.under_col";
@@ -221,9 +222,9 @@ public void testTriggerCustomNonExistent() throws Exception {
  @Test(timeout = 60000)
  public void testMultipleTriggers1() throws Exception {
    Expression shuffleExpression = ExpressionFactory.fromString("HDFS_BYTES_READ > 1000000");
-    Trigger shuffleTrigger = new ExecutionTrigger("big_shuffle", shuffleExpression, Trigger.Action.KILL_QUERY);
+    Trigger shuffleTrigger = new ExecutionTrigger("big_shuffle", shuffleExpression, new Action(Action.Type.KILL_QUERY));
    Expression execTimeExpression = ExpressionFactory.fromString("EXECUTION_TIME > 1000");
-    Trigger execTimeTrigger = new ExecutionTrigger("slow_query", execTimeExpression, Trigger.Action.KILL_QUERY);
+    Trigger execTimeTrigger = new ExecutionTrigger("slow_query", execTimeExpression, new Action(Action.Type.KILL_QUERY));
    setupTriggers(Lists.newArrayList(shuffleTrigger, execTimeTrigger));
    String query = "select sleep(t1.under_col, 5), t1.value from " + tableName + " t1 join " + tableName +
      " t2 on t1.under_col>=t2.under_col";
@@ -233,9 +234,9 @@ public void testMultipleTriggers1() throws Exception {
  @Test(timeout = 60000)
  public void testMultipleTriggers2() throws Exception {
    Expression shuffleExpression = ExpressionFactory.fromString("HDFS_BYTES_READ > 100");
-    Trigger shuffleTrigger = new ExecutionTrigger("big_shuffle", shuffleExpression, Trigger.Action.KILL_QUERY);
+    Trigger shuffleTrigger = new ExecutionTrigger("big_shuffle", shuffleExpression, new Action(Action.Type.KILL_QUERY));
    Expression execTimeExpression = ExpressionFactory.fromString("EXECUTION_TIME > 100000");
-    Trigger execTimeTrigger = new ExecutionTrigger("slow_query", execTimeExpression, Trigger.Action.KILL_QUERY);
+    Trigger execTimeTrigger = new ExecutionTrigger("slow_query", execTimeExpression, new Action(Action.Type.KILL_QUERY));
    setupTriggers(Lists.newArrayList(shuffleTrigger, execTimeTrigger));
    String query = "select sleep(t1.under_col, 5), t1.value from " + tableName + " t1 join " + tableName +
      " t2 on t1.under_col>=t2.under_col";
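The expected failure text in the tests above changes from the generic "Query was cancelled" to the trigger's own violation message. Judging from the message format TriggerValidatorRunnable sets later in this patch, the string matched is roughly the following (the exact trigger rendering depends on ExecutionTrigger.toString(), which this patch does not show):

    String expect = trigger + " violated";
    // validator side: "Trigger " + trigger + " violated. Current value: " + currentCounterValue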
diff --git a/itests/hive-unit/src/test/java/org/apache/hive/jdbc/TestTriggersWorkloadManager.java b/itests/hive-unit/src/test/java/org/apache/hive/jdbc/TestTriggersWorkloadManager.java
index 79ba1f4..0506f67 100644
--- a/itests/hive-unit/src/test/java/org/apache/hive/jdbc/TestTriggersWorkloadManager.java
+++ b/itests/hive-unit/src/test/java/org/apache/hive/jdbc/TestTriggersWorkloadManager.java
@@ -1,4 +1,4 @@
-/**
+/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
@@ -16,44 +16,37 @@
 package org.apache.hive.jdbc;
 
-import org.slf4j.Logger;
-
-import org.slf4j.LoggerFactory;
-
-import com.google.common.collect.Lists;
 import java.io.File;
 import java.net.URL;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.concurrent.TimeUnit;
+
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
 import org.apache.hadoop.hive.metastore.api.WMFullResourcePlan;
-import org.apache.hadoop.hive.metastore.api.WMMapping;
 import org.apache.hadoop.hive.metastore.api.WMPool;
 import org.apache.hadoop.hive.metastore.api.WMPoolTrigger;
 import org.apache.hadoop.hive.metastore.api.WMResourcePlan;
-import org.apache.hadoop.hive.metastore.api.WMTrigger;
 import org.apache.hadoop.hive.ql.exec.tez.WorkloadManager;
 import org.apache.hadoop.hive.ql.wm.Trigger;
 import org.apache.hive.jdbc.miniHS2.MiniHS2;
 import org.apache.hive.jdbc.miniHS2.MiniHS2.MiniClusterType;
 import org.junit.BeforeClass;
+import com.google.common.collect.Lists;
+
 public class TestTriggersWorkloadManager extends TestTriggersTezSessionPoolManager {
-  private final static Logger LOG = LoggerFactory.getLogger(TestTriggersWorkloadManager.class);
 
  @BeforeClass
  public static void beforeTest() throws Exception {
    Class.forName(MiniHS2.getJdbcDriverName());
 
    String confDir = "../../data/conf/llap/";
-    if (confDir != null && !confDir.isEmpty()) {
-      HiveConf.setHiveSiteLocation(new URL("file://" + new File(confDir).toURI().getPath() + "/hive-site.xml"));
-      System.out.println("Setting hive-site: " + HiveConf.getHiveSiteLocation());
-    }
+    HiveConf.setHiveSiteLocation(new URL("file://" + new File(confDir).toURI().getPath() + "/hive-site.xml"));
+    System.out.println("Setting hive-site: " + HiveConf.getHiveSiteLocation());
 
    conf = new HiveConf();
    conf.setBoolVar(ConfVars.HIVE_SUPPORT_CONCURRENCY, false);
@@ -73,7 +66,7 @@ public static void beforeTest() throws Exception {
    dataFileDir = conf.get("test.data.files").replace('\\', '/').replace("c:", "");
    kvDataFilePath = new Path(dataFileDir, "kv1.txt");
 
-    Map<String, String> confOverlay = new HashMap<String, String>();
+    Map<String, String> confOverlay = new HashMap<>();
    miniHS2.start(confOverlay);
    miniHS2.getDFS().getFileSystem().mkdirs(new Path("/apps_staging_dir/anonymous"));
  }
@@ -93,12 +86,4 @@ protected void setupTriggers(final List<Trigger> triggers) throws Exception {
    }
    wm.updateResourcePlanAsync(rp).get(10, TimeUnit.SECONDS);
  }
-
-  private WMTrigger wmTriggerFromTrigger(Trigger trigger) {
-    WMTrigger result = new WMTrigger("rp", trigger.getName());
-    result.setTriggerExpression(trigger.getExpression().toString()); // TODO: hmm
-    result.setActionExpression(trigger.getAction().toString()); // TODO: hmm
-    LOG.debug("Produced " + result + " from " + trigger);
-    return result;
-  }
 }
\ No newline at end of file
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/KillMoveTriggerActionHandler.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/KillMoveTriggerActionHandler.java
new file mode 100644
index 0000000..076b4eb
--- /dev/null
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/KillMoveTriggerActionHandler.java
@@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.exec.tez;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Future;
+
+import org.apache.hadoop.hive.ql.metadata.HiveException;
+import org.apache.hadoop.hive.ql.wm.Trigger;
+import org.apache.hadoop.hive.ql.wm.TriggerActionHandler;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class KillMoveTriggerActionHandler implements TriggerActionHandler {
+  private static final Logger LOG = LoggerFactory.getLogger(KillMoveTriggerActionHandler.class);
+  private final WorkloadManager wm;
+
+  KillMoveTriggerActionHandler(final WorkloadManager wm) {
+    this.wm = wm;
+  }
+
+  @Override
+  public void applyAction(final Map<TezSessionState, Trigger> queriesViolated) {
+    TezSessionState sessionState;
+    Map<WmTezSession, Future<Boolean>> moveFutures = new HashMap<>(queriesViolated.size());
+    for (Map.Entry<TezSessionState, Trigger> entry : queriesViolated.entrySet()) {
+      switch (entry.getValue().getAction().getType()) {
+        case KILL_QUERY:
+          sessionState = entry.getKey();
+          String queryId = sessionState.getTriggerContext().getQueryId();
+          try {
+            sessionState.getKillQuery().killQuery(queryId, entry.getValue().getViolationMsg());
+          } catch (HiveException e) {
+            LOG.warn("Unable to kill query {} for trigger violation", queryId);
+          }
+          break;
+        case MOVE_TO_POOL:
+          sessionState = entry.getKey();
+          if (sessionState instanceof WmTezSession) {
+            WmTezSession wmTezSession = (WmTezSession) sessionState;
+            String destPoolName = entry.getValue().getAction().getPoolName();
+            Future<Boolean> moveFuture = wm.applyMoveSessionAsync(wmTezSession, destPoolName);
+            if (moveFuture != null) {
+              moveFutures.put(wmTezSession, moveFuture);
+            }
+          } else {
+            throw new RuntimeException("WmTezSession is expected. Got: " + sessionState.getClass().getSimpleName() +
+              ". SessionId: " + sessionState.getSessionId());
+          }
+          break;
+        default:
+          throw new RuntimeException("Unsupported action: " + entry.getValue());
+      }
+    }
+
+    for (Map.Entry<WmTezSession, Future<Boolean>> entry : moveFutures.entrySet()) {
+      WmTezSession wmTezSession = entry.getKey();
+      Future<Boolean> moveFuture = entry.getValue();
+      try {
+        // block to make sure move happened successfully
+        if (moveFuture.get()) {
+          LOG.info("Moved session {} to pool {}", wmTezSession.getSessionId(), wmTezSession.getPoolName());
+        }
+      } catch (InterruptedException | ExecutionException e) {
+        LOG.error("Exception while moving session {}", wmTezSession.getSessionId(), e);
+      }
+    }
+  }
+}
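One detail worth calling out in the handler above: kill actions are applied inline, while move actions are only submitted in the first pass and awaited in a second, so one slow move cannot stall the remaining kills or move submissions. Schematically, with the same applyMoveSessionAsync() API (the sessionsToMove collection and pool name are illustrative):

    Map<WmTezSession, Future<Boolean>> pending = new HashMap<>();
    for (WmTezSession session : sessionsToMove) {
      pending.put(session, wm.applyMoveSessionAsync(session, "ETL")); // submit, don't wait
    }
    for (Map.Entry<WmTezSession, Future<Boolean>> e : pending.entrySet()) {
      e.getValue().get(); // now block, surfacing success/failure per session
    }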
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/KillTriggerActionHandler.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/KillTriggerActionHandler.java
index 474fae9..8c60b6f 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/KillTriggerActionHandler.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/KillTriggerActionHandler.java
@@ -1,4 +1,4 @@
-/**
+/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
@@ -32,9 +32,9 @@
  private static final Logger LOG = LoggerFactory.getLogger(KillTriggerActionHandler.class);
 
  @Override
-  public void applyAction(final Map<TezSessionState, Trigger.Action> queriesViolated) {
-    for (Map.Entry<TezSessionState, Trigger.Action> entry : queriesViolated.entrySet()) {
-      switch (entry.getValue()) {
+  public void applyAction(final Map<TezSessionState, Trigger> queriesViolated) {
+    for (Map.Entry<TezSessionState, Trigger> entry : queriesViolated.entrySet()) {
+      switch (entry.getValue().getAction().getType()) {
        case KILL_QUERY:
          TezSessionState sessionState = entry.getKey();
          String queryId = sessionState.getTriggerContext().getQueryId();
@@ -42,7 +42,7 @@ public void applyAction(final Map<TezSessionState, Trigger.Action> queriesViolat
          KillQuery killQuery = sessionState.getKillQuery();
          // if kill query is null then session might have been released to pool or closed already
          if (killQuery != null) {
-            sessionState.getKillQuery().killQuery(queryId, entry.getValue().getMsg());
+            sessionState.getKillQuery().killQuery(queryId, entry.getValue().getViolationMsg());
          }
        } catch (HiveException e) {
          LOG.warn("Unable to kill query {} for trigger violation");
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/PerPoolTriggerValidatorRunnable.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/PerPoolTriggerValidatorRunnable.java
new file mode 100644
index 0000000..8f29197
--- /dev/null
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/PerPoolTriggerValidatorRunnable.java
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.exec.tez;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.hadoop.hive.ql.wm.SessionTriggerProvider;
+import org.apache.hadoop.hive.ql.wm.TriggerActionHandler;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class PerPoolTriggerValidatorRunnable implements Runnable {
+  protected static transient Logger LOG = LoggerFactory.getLogger(PerPoolTriggerValidatorRunnable.class);
+  private final Map<String, SessionTriggerProvider> sessionTriggerProviders;
+  private final TriggerActionHandler triggerActionHandler;
+  private final Map<String, TriggerValidatorRunnable> poolValidators;
+  private final long triggerValidationIntervalMs;
+
+  PerPoolTriggerValidatorRunnable(final Map<String, SessionTriggerProvider> sessionTriggerProviders,
+    final TriggerActionHandler triggerActionHandler,
+    final long triggerValidationIntervalMs) {
+    this.sessionTriggerProviders = sessionTriggerProviders;
+    this.triggerActionHandler = triggerActionHandler;
+    this.poolValidators = new HashMap<>();
+    this.triggerValidationIntervalMs = triggerValidationIntervalMs;
+  }
+
+  @Override
+  public void run() {
+    try {
+      ScheduledExecutorService validatorExecutorService = Executors
+        .newScheduledThreadPool(sessionTriggerProviders.size());
+      for (Map.Entry<String, SessionTriggerProvider> entry : sessionTriggerProviders.entrySet()) {
+        String poolName = entry.getKey();
+        if (!poolValidators.containsKey(poolName)) {
+          LOG.info("Creating trigger validator for pool: {}", poolName);
+          TriggerValidatorRunnable poolValidator = new TriggerValidatorRunnable(entry.getValue(), triggerActionHandler);
+          validatorExecutorService.scheduleWithFixedDelay(poolValidator, triggerValidationIntervalMs,
+            triggerValidationIntervalMs, TimeUnit.MILLISECONDS);
+          poolValidators.put(poolName, poolValidator);
+        }
+      }
+    } catch (Throwable t) {
+      // if exception is thrown in scheduled tasks, no further tasks will be scheduled, hence this ugly catch
+      LOG.warn(PerPoolTriggerValidatorRunnable.class.getSimpleName() + " caught exception.", t);
+    }
+  }
+}
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionPoolManager.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionPoolManager.java
index be8b9a4..ad8a5c9 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionPoolManager.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionPoolManager.java
@@ -18,11 +18,6 @@
 
 package org.apache.hadoop.hive.ql.exec.tez;
 
-import java.io.IOException;
-import java.net.URISyntaxException;
-import javax.security.auth.login.LoginException;
-import org.apache.tez.dag.api.TezException;
-
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.HashSet;
@@ -30,6 +25,7 @@
 import java.util.List;
 import java.util.Set;
 import java.util.concurrent.Semaphore;
+import java.util.concurrent.TimeUnit;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hive.conf.HiveConf;
@@ -193,10 +189,12 @@ public void initTriggers(final HiveConf conf) throws HiveException {
    }
 
    if (triggerValidatorRunnable == null) {
+      final long triggerValidationIntervalMs = HiveConf.getTimeVar(conf, ConfVars
+        .HIVE_TRIGGER_VALIDATION_INTERVAL_MS, TimeUnit.MILLISECONDS);
      sessionTriggerProvider = new SessionTriggerProvider(openSessions, globalTriggersFetcher.fetch());
      triggerActionHandler = new KillTriggerActionHandler();
-      triggerValidatorRunnable = new TriggerValidatorRunnable(getSessionTriggerProvider(), getTriggerActionHandler());
-      startTriggerValidator(conf);
+      triggerValidatorRunnable = new TriggerValidatorRunnable(sessionTriggerProvider, triggerActionHandler);
+      startTriggerValidator(triggerValidationIntervalMs);
    }
  }
@@ -370,16 +368,6 @@ public void destroy(TezSessionState tezSessionState) throws Exception {
  }
 
  @Override
-  SessionTriggerProvider getSessionTriggerProvider() {
-    return sessionTriggerProvider;
-  }
-
-  @Override
-  TriggerActionHandler getTriggerActionHandler() {
-    return triggerActionHandler;
-  }
-
-  @Override
  TriggerValidatorRunnable getTriggerValidatorRunnable() {
    return triggerValidatorRunnable;
  }
@@ -520,8 +508,8 @@ public void registerOpenSession(TezSessionPoolSession session) {
 
  private void updateSessionsTriggers() {
    if (sessionTriggerProvider != null && globalTriggersFetcher != null) {
-      sessionTriggerProvider.setOpenSessions(Collections.unmodifiableList(openSessions));
-      sessionTriggerProvider.setActiveTriggers(Collections.unmodifiableList(globalTriggersFetcher.fetch()));
+      sessionTriggerProvider.setSessions(Collections.unmodifiableList(openSessions));
+      sessionTriggerProvider.setTriggers(Collections.unmodifiableList(globalTriggersFetcher.fetch()));
    }
  }
@@ -552,7 +540,7 @@ public void setGlobalTriggersFetcher(MetastoreGlobalTriggersFetcher metastoreGlo
  public List<String> getTriggerCounterNames() {
    List<String> counterNames = new ArrayList<>();
    if (sessionTriggerProvider != null) {
-      List<Trigger> activeTriggers = sessionTriggerProvider.getActiveTriggers();
+      List<Trigger> activeTriggers = sessionTriggerProvider.getTriggers();
      for (Trigger trigger : activeTriggers) {
        counterNames.add(trigger.getExpression().getCounterLimit().getName());
      }
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionPoolSession.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionPoolSession.java
index 6887d7a..db2848c 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionPoolSession.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionPoolSession.java
@@ -73,18 +73,12 @@ TezSessionState reopen(TezSessionState session, Configuration conf,
  }
 
  public static abstract class AbstractTriggerValidator {
-    abstract SessionTriggerProvider getSessionTriggerProvider();
+    abstract Runnable getTriggerValidatorRunnable();
 
-    abstract TriggerActionHandler getTriggerActionHandler();
-
-    abstract TriggerValidatorRunnable getTriggerValidatorRunnable();
-
-    public void startTriggerValidator(Configuration conf) {
-      long triggerValidationIntervalMs = HiveConf.getTimeVar(conf,
-        HiveConf.ConfVars.HIVE_TRIGGER_VALIDATION_INTERVAL_MS, TimeUnit.MILLISECONDS);
+    public void startTriggerValidator(long triggerValidationIntervalMs) {
      final ScheduledExecutorService scheduledExecutorService = Executors.newSingleThreadScheduledExecutor(
        new ThreadFactoryBuilder().setDaemon(true).setNameFormat("TriggerValidator").build());
-      TriggerValidatorRunnable triggerValidatorRunnable = getTriggerValidatorRunnable();
+      Runnable triggerValidatorRunnable = getTriggerValidatorRunnable();
      scheduledExecutorService.scheduleWithFixedDelay(triggerValidatorRunnable, triggerValidationIntervalMs,
        triggerValidationIntervalMs, TimeUnit.MILLISECONDS);
    }
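With the signature change above, startTriggerValidator() no longer consults the configuration itself; each caller resolves HIVE_TRIGGER_VALIDATION_INTERVAL_MS once and passes it in, and any Runnable can now be scheduled. The caller side reduces to the pattern already visible in initTriggers():

    long intervalMs = HiveConf.getTimeVar(conf,
        HiveConf.ConfVars.HIVE_TRIGGER_VALIDATION_INTERVAL_MS, TimeUnit.MILLISECONDS);
    startTriggerValidator(intervalMs); // schedules getTriggerValidatorRunnable() at a fixed delay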
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TriggerValidatorRunnable.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TriggerValidatorRunnable.java
index 33bccbe..5821659 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TriggerValidatorRunnable.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TriggerValidatorRunnable.java
@@ -1,4 +1,4 @@
-/**
+/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
@@ -20,6 +20,7 @@
 import java.util.List;
 import java.util.Map;
 
+import org.apache.hadoop.hive.ql.wm.Action;
 import org.apache.hadoop.hive.ql.wm.SessionTriggerProvider;
 import org.apache.hadoop.hive.ql.wm.Trigger;
 import org.apache.hadoop.hive.ql.wm.TriggerActionHandler;
@@ -41,29 +42,51 @@
 
  @Override
  public void run() {
    try {
-      Map<TezSessionState, Trigger.Action> violatedSessions = new HashMap<>();
-      final List<TezSessionState> sessions = sessionTriggerProvider.getOpenSessions();
-      final List<Trigger> triggers = sessionTriggerProvider.getActiveTriggers();
-      for (TezSessionState s : sessions) {
-        TriggerContext triggerContext = s.getTriggerContext();
-        if (triggerContext != null && !triggerContext.isQueryCompleted()) {
+      Map<TezSessionState, Trigger> violatedSessions = new HashMap<>();
+      final List<TezSessionState> sessions = sessionTriggerProvider.getSessions();
+      final List<Trigger> triggers = sessionTriggerProvider.getTriggers();
+      for (TezSessionState sessionState : sessions) {
+        TriggerContext triggerContext = sessionState.getTriggerContext();
+        if (triggerContext != null && !triggerContext.isQueryCompleted()
+          && !triggerContext.getCurrentCounters().isEmpty()) {
          Map<String, Long> currentCounters = triggerContext.getCurrentCounters();
-          for (Trigger t : triggers) {
-            String desiredCounter = t.getExpression().getCounterLimit().getName();
+          for (Trigger currentTrigger : triggers) {
+            String desiredCounter = currentTrigger.getExpression().getCounterLimit().getName();
            // there could be interval where desired counter value is not populated by the time we make this check
            if (currentCounters.containsKey(desiredCounter)) {
              long currentCounterValue = currentCounters.get(desiredCounter);
-              if (t.apply(currentCounterValue)) {
-                String queryId = s.getTriggerContext().getQueryId();
-                LOG.info("Query {} violated trigger {}. Current counter value: {}. Going to apply action {}", queryId,
-                  t, currentCounterValue, t.getAction());
-                Trigger.Action action = t.getAction();
-                String msg = "Trigger " + t + " violated. Current value: " + currentCounterValue;
-                action.setMsg(msg);
-                violatedSessions.put(s, action);
+              if (currentTrigger.apply(currentCounterValue)) {
+                String queryId = sessionState.getTriggerContext().getQueryId();
+                if (violatedSessions.containsKey(sessionState)) {
+                  // session already has a violation
+                  Trigger existingTrigger = violatedSessions.get(sessionState);
+                  // KILL always takes priority over MOVE
+                  if (existingTrigger.getAction().getType().equals(Action.Type.MOVE_TO_POOL) &&
+                    currentTrigger.getAction().getType().equals(Action.Type.KILL_QUERY)) {
+                    currentTrigger.setViolationMsg("Trigger " + currentTrigger + " violated. Current value: " +
+                      currentCounterValue);
+                    violatedSessions.put(sessionState, currentTrigger);
+                    LOG.info("KILL trigger replacing MOVE for query {}", queryId);
+                  } else {
+                    // if multiple MOVE happens, only first move will be chosen
+                    LOG.warn("Conflicting MOVE triggers ({} and {}). Choosing the first MOVE trigger: {}",
+                      existingTrigger, currentTrigger, existingTrigger.getName());
+                  }
+                } else {
+                  // first violation for the session
+                  currentTrigger.setViolationMsg("Trigger " + currentTrigger + " violated. Current value: " +
+                    currentCounterValue);
+                  violatedSessions.put(sessionState, currentTrigger);
+                }
              }
            }
          }
+
+          Trigger chosenTrigger = violatedSessions.get(sessionState);
+          if (chosenTrigger != null) {
+            LOG.info("Query: {}. {}. Applying action.", sessionState.getTriggerContext().getQueryId(),
+              chosenTrigger.getViolationMsg());
+          }
        }
      }
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TriggerViolationActionHandler.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TriggerViolationActionHandler.java
deleted file mode 100644
index c46569b..0000000
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TriggerViolationActionHandler.java
+++ /dev/null
@@ -1,48 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *     http://www.apache.org/licenses/LICENSE-2.0
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hadoop.hive.ql.exec.tez;
-
-import java.util.Map;
-
-import org.apache.hadoop.hive.ql.metadata.HiveException;
-import org.apache.hadoop.hive.ql.wm.Trigger;
-import org.apache.hadoop.hive.ql.wm.TriggerActionHandler;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-public class TriggerViolationActionHandler implements TriggerActionHandler {
-  private static final Logger LOG = LoggerFactory.getLogger(TriggerViolationActionHandler.class);
-
-  @Override
-  public void applyAction(final Map<TezSessionState, Trigger.Action> queriesViolated) {
-    for (Map.Entry<TezSessionState, Trigger.Action> entry : queriesViolated.entrySet()) {
-      switch (entry.getValue()) {
-        case KILL_QUERY:
-          TezSessionState sessionState = entry.getKey();
-          String queryId = sessionState.getTriggerContext().getQueryId();
-          try {
-            sessionState.getKillQuery().killQuery(queryId, entry.getValue().getMsg());
-          } catch (HiveException e) {
-            LOG.warn("Unable to kill query {} for trigger violation");
-          }
-          break;
-        default:
-          throw new RuntimeException("Unsupported action: " + entry.getValue());
-      }
-    }
-  }
-}
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/WmTezSession.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/WmTezSession.java
index 0dd1433..6cf2aad 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/WmTezSession.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/WmTezSession.java
@@ -20,8 +20,6 @@
 
 import com.google.common.util.concurrent.ListenableFuture;
 
-import org.apache.hadoop.hive.ql.exec.Utilities;
-
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.util.concurrent.SettableFuture;
 
@@ -188,6 +186,11 @@ void setIsIrrelevantForWm(String killReason) {
    this.killReason = killReason;
  }
 
+  @Override
+  public String toString() {
+    return super.toString() + ", poolName: " + poolName + ", clusterFraction: " + clusterFraction;
+  }
+
  private final class TimeoutRunnable implements Runnable {
    @Override
    public void run() {
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/WorkloadManager.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/WorkloadManager.java
index 16f5dce..96a4b08 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/WorkloadManager.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/WorkloadManager.java
@@ -1,4 +1,5 @@
 /**
+/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
@@ -17,12 +18,6 @@
 */
 package org.apache.hadoop.hive.ql.exec.tez;
 
-import com.google.common.annotations.VisibleForTesting;
-import com.google.common.collect.Sets;
-import com.google.common.util.concurrent.FutureCallback;
-import com.google.common.util.concurrent.Futures;
-import com.google.common.util.concurrent.ListenableFuture;
-import com.google.common.util.concurrent.SettableFuture;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.Comparator;
@@ -33,17 +28,17 @@
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.Future;
 import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.ThreadFactory;
 import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicLong;
 import java.util.concurrent.locks.Condition;
 import java.util.concurrent.locks.ReentrantLock;
-import org.apache.commons.lang3.StringUtils;
+
+import org.apache.commons.lang.StringUtils;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
@@ -56,19 +51,25 @@
 import org.apache.hadoop.hive.ql.metadata.HiveException;
 import org.apache.hadoop.hive.ql.session.SessionState;
 import org.apache.hadoop.hive.ql.wm.ExecutionTrigger;
-import org.apache.hadoop.hive.ql.wm.ExpressionFactory;
 import org.apache.hadoop.hive.ql.wm.SessionTriggerProvider;
 import org.apache.hadoop.hive.ql.wm.Trigger;
-import org.apache.hadoop.hive.ql.wm.Trigger.Action;
 import org.apache.hadoop.hive.ql.wm.TriggerActionHandler;
 import org.apache.tez.dag.api.TezConfiguration;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.collect.Sets;
+import com.google.common.util.concurrent.FutureCallback;
+import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.ListenableFuture;
+import com.google.common.util.concurrent.SettableFuture;
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
+
 /** Workload management entry point for HS2. */
 public class WorkloadManager extends TezSessionPoolSession.AbstractTriggerValidator
-    implements TezSessionPoolSession.Manager, SessionExpirationTracker.RestartImpl {
+  implements TezSessionPoolSession.Manager, SessionExpirationTracker.RestartImpl {
 private static final Logger LOG = LoggerFactory.getLogger(WorkloadManager.class);
 private static final char POOL_SEPARATOR = '.';
 private static final String POOL_SEPARATOR_STR = "" + POOL_SEPARATOR;
@@ -83,7 +84,7 @@
  // sessions, so the pool itself could internally track the sessions it gave out, since
  // calling close on an unopened session is probably harmless.
  private final IdentityHashMap<WmTezSession, Boolean> openSessions =
-    new IdentityHashMap<>();
+      new IdentityHashMap<>();
  private final int amRegistryTimeoutMs;
 
  // Note: pools can only be modified by the master thread.
@@ -94,9 +95,7 @@
  // We index the get requests to make sure there are no ordering artifacts when we requeue.
  private final AtomicLong getRequestVersion = new AtomicLong(Long.MIN_VALUE);
 
-  private SessionTriggerProvider sessionTriggerProvider;
-  private TriggerActionHandler triggerActionHandler;
-  private TriggerValidatorRunnable triggerValidatorRunnable;
+  private PerPoolTriggerValidatorRunnable triggerValidatorRunnable;
 
  // Note: we could use RW lock to allow concurrent calls for different sessions, however all
  // those calls do is add elements to lists and maps; and we'd need to sync those separately
@@ -108,6 +107,7 @@
  private final EventState one = new EventState(), two = new EventState();
  private boolean hasChanges = false;
  private EventState current = one;
+  private Map<String, SessionTriggerProvider> perPoolProviders = new ConcurrentHashMap<>();
 
  /** The master thread the processes the events from EventState. */
  @VisibleForTesting
@@ -133,6 +133,7 @@
 
  // TODO: this is temporary before HiveServerEnvironment is merged.
  private static volatile WorkloadManager INSTANCE;
+
  public static WorkloadManager getInstance() {
    return INSTANCE;
  }
@@ -156,52 +157,29 @@ public static WorkloadManager create(String yarnQueue, HiveConf conf, WMFullReso
    LOG.info("Initializing with " + totalQueryParallelism + " total query parallelism");
 
    this.amRegistryTimeoutMs = (int)HiveConf.getTimeVar(
-      conf, ConfVars.HIVE_SERVER2_TEZ_WM_AM_REGISTRY_TIMEOUT, TimeUnit.MILLISECONDS);
+        conf, ConfVars.HIVE_SERVER2_TEZ_WM_AM_REGISTRY_TIMEOUT, TimeUnit.MILLISECONDS);
    tezAmPool = new TezSessionPool<>(conf, totalQueryParallelism, true,
-      new TezSessionPool.SessionObjectFactory<WmTezSession>() {
-        @Override
-        public WmTezSession create(WmTezSession oldSession) {
-          return createSession(oldSession == null ? null : oldSession.getConf());
-        }
-      });
+      oldSession -> createSession(oldSession == null ? null : oldSession.getConf()));
    restrictedConfig = new RestrictedConfigChecker(conf);
    allocationManager = qam;
    // Only creates the expiration tracker if expiration is configured.
expirationTracker = SessionExpirationTracker.create(conf, this); - ThreadFactory workerFactory = new ThreadFactory() { - private final AtomicInteger threadNumber = new AtomicInteger(-1); - @Override - public Thread newThread(Runnable r) { - Thread t = new Thread(r, "Workload management worker " + threadNumber.incrementAndGet()); - t.setDaemon(true); - return t; - } - }; - workPool = Executors.newFixedThreadPool(HiveConf.getIntVar(conf, - ConfVars.HIVE_SERVER2_TEZ_WM_WORKER_THREADS), workerFactory); - ThreadFactory timeoutFactory = new ThreadFactory() { - @Override - public Thread newThread(Runnable r) { - Thread t = new Thread(r, "Workload management timeout thread"); - t.setDaemon(true); - return t; - } - }; - timeoutPool = Executors.newScheduledThreadPool(1, timeoutFactory); - wmThread = new Thread(new Runnable() { - @Override - public void run() { - runWmThread(); - } - }, "Workload management master"); + workPool = Executors.newFixedThreadPool(HiveConf.getIntVar(conf, ConfVars.HIVE_SERVER2_TEZ_WM_WORKER_THREADS), + new ThreadFactoryBuilder().setDaemon(true).setNameFormat("Workload management worker %d").build()); + + timeoutPool = Executors.newSingleThreadScheduledExecutor(new ThreadFactoryBuilder().setDaemon(true) + .setNameFormat("Workload management timeout thread").build()); + + wmThread = new Thread(() -> runWmThread(), "Workload management master"); wmThread.setDaemon(true); - // TODO: add support for per pool action handler and triggers fetcher (+atomic update to active triggers) - sessionTriggerProvider = new SessionTriggerProvider(); - triggerActionHandler = new TriggerViolationActionHandler(); - triggerValidatorRunnable = new TriggerValidatorRunnable( - getSessionTriggerProvider(), getTriggerActionHandler()); - startTriggerValidator(conf); + + final long triggerValidationIntervalMs = HiveConf.getTimeVar(conf, + HiveConf.ConfVars.HIVE_TRIGGER_VALIDATION_INTERVAL_MS, TimeUnit.MILLISECONDS); + TriggerActionHandler triggerActionHandler = new KillMoveTriggerActionHandler(this); + triggerValidatorRunnable = new PerPoolTriggerValidatorRunnable(perPoolProviders, triggerActionHandler, + triggerValidationIntervalMs); + startTriggerValidator(triggerValidationIntervalMs); } private int determineQueryParallelism(WMFullResourcePlan plan) { @@ -243,18 +221,59 @@ public void stop() throws Exception { INSTANCE = null; } + private void updateSessionTriggerProvidersOnMasterThread() { + if (pools != null) { + for (Map.Entry entry : pools.entrySet()) { + String poolName = entry.getKey(); + PoolState poolState = entry.getValue(); + final List triggers = Collections.unmodifiableList(poolState.getTriggers()); + final List sessionStates = Collections.unmodifiableList(poolState.getSessions()); + if (perPoolProviders.containsKey(poolName)) { + perPoolProviders.get(poolName).setTriggers(triggers); + perPoolProviders.get(poolName).setSessions(sessionStates); + } else { + SessionTriggerProvider sessionTriggerProvider = new SessionTriggerProvider(sessionStates, triggers); + perPoolProviders.put(poolName, sessionTriggerProvider); + } + } + } + } + + @VisibleForTesting + public Map getAllSessionTriggerProviders() { + return perPoolProviders; + } + /** Represent a single iteration of work for the master thread. 
   /** Represents a single iteration of work for the master thread. */
   private final static class EventState {
     private final Set<WmTezSession> toReturn = Sets.newIdentityHashSet(),
-      toDestroy = Sets.newIdentityHashSet(), updateErrors = Sets.newIdentityHashSet();
+        toDestroy = Sets.newIdentityHashSet(), updateErrors = Sets.newIdentityHashSet();
     private final LinkedList<SessionInitContext> initResults = new LinkedList<>();
     private final IdentityHashMap<WmTezSession, SettableFuture<WmTezSession>> toReopen =
-      new IdentityHashMap<>();
+        new IdentityHashMap<>();
     private final LinkedList<GetRequest> getRequests = new LinkedList<>();
     private final IdentityHashMap<WmTezSession, GetRequest> toReuse = new IdentityHashMap<>();
     private WMFullResourcePlan resourcePlanToApply = null;
     private boolean hasClusterStateChanged = false;
     private SettableFuture<Boolean> testEvent, applyRpFuture;
+    private final Set<MoveSession> moveSessions = Sets.newIdentityHashSet();
+  }
+
+  private final static class MoveSession {
+    private final WmTezSession srcSession;
+    private final String destPool;
+    private final SettableFuture<Boolean> future;
+
+    public MoveSession(final WmTezSession srcSession, final String destPool) {
+      this.srcSession = srcSession;
+      this.destPool = destPool;
+      this.future = SettableFuture.create();
+    }
+
+    @Override
+    public String toString() {
+      return srcSession.getSessionId() + " moving from " + srcSession.getPoolName() + " to " + destPool;
+    }
   }

   /**
@@ -263,7 +282,7 @@ public void stop() throws Exception {
    */
   private final static class WmThreadSyncWork {
     private LinkedList<WmTezSession> toRestartInUse = new LinkedList<>(),
-      toDestroyNoRestart = new LinkedList<>();
+        toDestroyNoRestart = new LinkedList<>();
   }

   private void runWmThread() {
@@ -289,6 +308,7 @@ private void runWmThread() {
         LOG.info("Processing current events");
         processCurrentEvents(currentEvents, syncWork);
         scheduleWork(syncWork);
+        updateSessionTriggerProvidersOnMasterThread();
       } catch (InterruptedException ex) {
         LOG.warn("WM thread was interrupted and will now exit");
         return;
@@ -363,7 +383,7 @@ private void processCurrentEvents(EventState e, WmThreadSyncWork syncWork) throw
       }
       LOG.info("Destroying {}", sessionToDestroy);
       Boolean shouldReturn = handleReturnedInUseSessionOnMasterThread(
-        e, sessionToDestroy, poolsToRedistribute);
+          e, sessionToDestroy, poolsToRedistribute);
       if (shouldReturn == null || shouldReturn) {
         // Restart if this session is still relevant, even if there's an internal error.
         syncWork.toRestartInUse.add(sessionToDestroy);
@@ -375,7 +395,7 @@ private void processCurrentEvents(EventState e, WmThreadSyncWork syncWork) throw
     for (WmTezSession sessionToReturn: e.toReturn) {
       LOG.info("Returning {}", sessionToReturn);
       Boolean shouldReturn = handleReturnedInUseSessionOnMasterThread(
-        e, sessionToReturn, poolsToRedistribute);
+          e, sessionToReturn, poolsToRedistribute);
       if (shouldReturn == null) {
         // Restart if there's an internal error.
         syncWork.toRestartInUse.add(sessionToReturn);
@@ -393,7 +413,7 @@ private void processCurrentEvents(EventState e, WmThreadSyncWork syncWork) throw
     for (Map.Entry<WmTezSession, SettableFuture<WmTezSession>> entry : e.toReopen.entrySet()) {
       LOG.info("Reopening {}", entry.getKey());
       handeReopenRequestOnMasterThread(
-        e, entry.getKey(), entry.getValue(), poolsToRedistribute, syncWork);
+          e, entry.getKey(), entry.getValue(), poolsToRedistribute, syncWork);
     }
     e.toReopen.clear();
@@ -414,7 +434,21 @@ private void processCurrentEvents(EventState e, WmThreadSyncWork syncWork) throw
     }
     e.resourcePlanToApply = null;

-    // 6. Handle all the get/reuse requests. We won't actually give out anything here, but merely
+    // 6. Handle any move session requests.
+    // The way move session works right now is:
+    //  a) sessions get moved to the destination pool if there is capacity in the destination pool
+    //  b) if there is no capacity in the destination pool, the session gets killed (since we cannot pause a query)
+    // TODO: in the future, the process of killing can be delayed until the point where a session is actually
+    // required. We could consider delaying the move (when destination capacity is full) until there is a claim
+    // in the src pool. Maybe change the command to support ... DELAYED MOVE TO etl ... which will run under the
+    // src cluster fraction as long as possible.
+    if (e.moveSessions != null) {
+      for (MoveSession moveSession : e.moveSessions) {
+        handleMoveSessionOnMasterThread(moveSession, syncWork, poolsToRedistribute);
+      }
+    }
+    e.moveSessions.clear();
+
+    // 7. Handle all the get/reuse requests. We won't actually give out anything here, but merely
     // map all the requests and place them in an appropriate order in pool queues. The only
     // exception is the reuse without queue contention; can be granted immediately. If we can't
     // reuse the session immediately, we will convert the reuse to a normal get, because we
@@ -426,14 +460,14 @@ private void processCurrentEvents(EventState e, WmThreadSyncWork syncWork) throw
     }
     e.toReuse.clear();

-    // 7. If there was a cluster state change, make sure we redistribute all the pools.
+    // 8. If there was a cluster state change, make sure we redistribute all the pools.
     if (e.hasClusterStateChanged) {
       LOG.info("Processing a cluster state change");
       poolsToRedistribute.addAll(pools.keySet());
       e.hasClusterStateChanged = false;
     }

-    // 8. Finally, for all the pools that have changes, promote queued queries and rebalance.
+    // 9. Finally, for all the pools that have changes, promote queued queries and rebalance.
     for (String poolName : poolsToRedistribute) {
       if (LOG.isDebugEnabled()) {
         LOG.info("Processing changes for pool " + poolName + ": " + pools.get(poolName));
@@ -441,7 +475,7 @@ private void processCurrentEvents(EventState e, WmThreadSyncWork syncWork) throw
       processPoolChangesOnMasterThread(poolName, syncWork, hasRequeues);
     }

-    // 9. Notify tests and global async ops.
+    // 10. Notify tests and global async ops.
     if (e.testEvent != null) {
       e.testEvent.set(true);
       e.testEvent = null;
@@ -452,10 +486,65 @@ private void processCurrentEvents(EventState e, WmThreadSyncWork syncWork) throw
     }
   }

+  private void handleMoveSessionOnMasterThread(final MoveSession moveSession,
+      final WmThreadSyncWork syncWork,
+      final HashSet<String> poolsToRedistribute) {
+    String destPoolName = moveSession.destPool;
+    LOG.info("Handling move session event: {}", moveSession);
+    if (validMove(moveSession.srcSession, destPoolName)) {
+      // remove from src pool
+      Boolean removed = checkAndRemoveSessionFromItsPool(moveSession.srcSession, poolsToRedistribute);
+      if (removed != null && removed) {
+        // check if there is capacity in dest pool, if so move else kill the session
+        Boolean capacityAvailable = checkCapacityDestination(destPoolName);
+        if (capacityAvailable != null && capacityAvailable) {
+          // add to destination pool
+          Boolean added = checkAndAddSessionToAnotherPool(moveSession.srcSession, destPoolName, poolsToRedistribute);
+          if (added != null && added) {
+            moveSession.future.set(true);
+            return;
+          } else {
+            LOG.error("Failed to move session: {}. Session is not added to destination.", moveSession);
+          }
+        } else if (capacityAvailable != null && !capacityAvailable) {
+          moveSession.srcSession.clearWm();
+          moveSession.srcSession.setIsIrrelevantForWm("Destination pool " + destPoolName + " is full. Killing query.");
+          syncWork.toRestartInUse.add(moveSession.srcSession);
+        } else {
+          LOG.error("Failed to move session: {}. Destination pool {} is removed.", moveSession, destPoolName);
+        }
+      } else {
+        LOG.error("Failed to move session: {}. Session is not removed from its pool.", moveSession);
+      }
+    } else {
+      LOG.error("Validation failed for move session: {}. Invalid move or session/pool got removed.", moveSession);
+    }
+
+    moveSession.future.set(false);
+  }
+
+  private Boolean checkCapacityDestination(final String destPoolName) {
+    PoolState destPool = pools.get(destPoolName);
+    if (destPool != null) {
+      return destPool.getTotalActiveSessions() < destPool.queryParallelism;
+    }
+    return null;
+  }
+
+  private boolean validMove(final WmTezSession srcSession, final String destPool) {
+    return srcSession != null &&
+      destPool != null &&
+      !srcSession.isIrrelevantForWm() &&
+      srcSession.getPoolName() != null &&
+      pools.containsKey(srcSession.getPoolName()) &&
+      pools.containsKey(destPool) &&
+      !srcSession.getPoolName().equalsIgnoreCase(destPool);
+  }
+
   // ========= Master thread methods

   private void handleInitResultOnMasterThread(
-    SessionInitContext sw, WmThreadSyncWork syncWork, HashSet<String> poolsToRedistribute) {
+      SessionInitContext sw, WmThreadSyncWork syncWork, HashSet<String> poolsToRedistribute) {
     // For the failures, the users have been notified, we just need to clean up. There's no
     // session here (or it's unused), so no conflicts are possible. We just remove it.
     // For successes, the user has also been notified, so various requests are also possible;
@@ -480,7 +569,7 @@ private void handleInitResultOnMasterThread(
     if (pool == null || !pool.initializingSessions.remove(sw)) {
       // Query parallelism might be fubar.
       LOG.error("Cannot remove initializing session from the pool "
-        + sw.poolName + " - internal error");
+          + sw.poolName + " - internal error");
     }
     poolsToRedistribute.add(sw.poolName);
     if (session != null) {
@@ -495,7 +584,7 @@ private void handleInitResultOnMasterThread(
   }

   private Boolean handleReturnedInUseSessionOnMasterThread(
-    EventState e, WmTezSession session, HashSet<String> poolsToRedistribute) {
+      EventState e, WmTezSession session, HashSet<String> poolsToRedistribute) {
     // This handles the common logic for destroy and return - everything except
     // the invalid combination of destroy and return themselves, as well as the actual
     // statement that destroys or returns it.
@@ -514,8 +603,8 @@ private Boolean handleReturnedInUseSessionOnMasterThread(
   }

   private void handeReopenRequestOnMasterThread(EventState e, WmTezSession session,
-    SettableFuture<WmTezSession> future, HashSet<String> poolsToRedistribute,
-    WmThreadSyncWork syncWork) throws Exception {
+      SettableFuture<WmTezSession> future, HashSet<String> poolsToRedistribute,
+      WmThreadSyncWork syncWork) throws Exception {
     if (e.updateErrors.remove(session)) {
       LOG.info("Ignoring an update error for a session being reopened");
     }
@@ -536,7 +625,7 @@ private void handeReopenRequestOnMasterThread(EventState e, WmTezSession session
       return;
     } else if (!isRemoved) {
       future.setException(new RuntimeException("WM killed this session during reopen: "
-        + session.getReasonForKill()));
+          + session.getReasonForKill()));
       return; // No longer relevant for WM - bail.
     }
     // If pool didn't exist, removeSessionFromItsPool would have returned null.
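For context: handleMoveSessionOnMasterThread() above has three outcomes — the session is added to the destination pool (the future resolves to true), the destination is full and the session is killed, or validation/removal fails (the future resolves to false in the latter two cases). The component that turns trigger violations into these events is KillMoveTriggerActionHandler, which the constructor instantiates with the WorkloadManager but whose source is not in this diff. A hedged sketch of how such a handler could dispatch on Action.Type (introduced later in this patch); applyMoveSessionAsync() is real and added later in this file, while killQuery() and the map's type parameters are assumptions:

// Editorial sketch, not part of the patch: plausible dispatch for KillMoveTriggerActionHandler.
public void applyAction(Map<TezSessionState, Trigger> queriesViolated) {
  for (Map.Entry<TezSessionState, Trigger> entry : queriesViolated.entrySet()) {
    WmTezSession session = (WmTezSession) entry.getKey();
    Action action = entry.getValue().getAction();
    switch (action.getType()) {
      case MOVE_TO_POOL:
        // Fire-and-forget: the master thread resolves the returned future to true (moved)
        // or false (invalid move, or destination full, in which case the query is killed).
        wm.applyMoveSessionAsync(session, action.getPoolName());
        break;
      case KILL_QUERY:
        killQuery(session, entry.getValue().getViolationMsg()); // hypothetical kill path
        break;
    }
  }
}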
@@ -550,7 +639,7 @@ private void handeReopenRequestOnMasterThread(EventState e, WmTezSession session
   }

   private void handleUpdateErrorOnMasterThread(WmTezSession sessionWithUpdateError,
-    EventState e, WmThreadSyncWork syncWork, HashSet<String> poolsToRedistribute) {
+      EventState e, WmThreadSyncWork syncWork, HashSet<String> poolsToRedistribute) {
     GetRequest reuseRequest = e.toReuse.remove(sessionWithUpdateError);
     if (reuseRequest != null) {
       // This session is bad, so don't allow reuse; just convert it to normal get.
@@ -573,7 +662,7 @@ private void handleUpdateErrorOnMasterThread(WmTezSession sessionWithUpdateError
   }

   private void applyNewResourcePlanOnMasterThread(
-    EventState e, WmThreadSyncWork syncWork, HashSet<String> poolsToRedistribute) {
+      EventState e, WmThreadSyncWork syncWork, HashSet<String> poolsToRedistribute) {
     int totalQueryParallelism = 0;
     // FIXME: Add Triggers from metastore to poolstate
     // Note: we assume here that plan has been validated beforehand, so we don't verify
@@ -623,9 +712,7 @@ private void applyNewResourcePlanOnMasterThread(
     if (e.resourcePlanToApply.isSetTriggers() && e.resourcePlanToApply.isSetPoolTriggers()) {
       Map<String, Trigger> triggers = new HashMap<>();
       for (WMTrigger trigger : e.resourcePlanToApply.getTriggers()) {
-        // TODO: parse trigger.getActionExpression() correctly; right now the Action enum is invalid.
-        ExecutionTrigger execTrigger = new ExecutionTrigger(trigger.getTriggerName(),
-          ExpressionFactory.fromString(trigger.getTriggerExpression()), Action.KILL_QUERY);
+        ExecutionTrigger execTrigger = ExecutionTrigger.fromWMTrigger(trigger);
         triggers.put(trigger.getTriggerName(), execTrigger);
       }
       for (WMPoolTrigger poolTrigger : e.resourcePlanToApply.getPoolTriggers()) {
@@ -658,7 +745,7 @@ private void applyNewResourcePlanOnMasterThread(
     }
     if (deltaSessions != 0) {
       failOnFutureFailure(tezAmPool.resizeAsync(
-        deltaSessions, syncWork.toDestroyNoRestart));
+          deltaSessions, syncWork.toDestroyNoRestart));
     }
   }
@@ -714,6 +801,7 @@ private void queueGetRequestOnMasterThread(
       req.future.set(req.sessionToReuse);
       return;
     }
+
+    // Otherwise, queue the session and make sure we update this pool.
     pool.queue.addLast(req);
     poolsToRedistribute.add(poolName);
@@ -721,13 +809,14 @@
   private void processPoolChangesOnMasterThread(
-    String poolName, WmThreadSyncWork context, boolean hasRequeues) throws Exception {
+      String poolName, WmThreadSyncWork context, boolean hasRequeues) throws Exception {
     PoolState pool = pools.get(poolName);
     if (pool == null) return; // Might be from before the new resource plan.

     // 1. First, start the queries from the queue.
     int queriesToStart = Math.min(pool.queue.size(),
-      pool.queryParallelism - pool.getTotalActiveSessions());
+        pool.queryParallelism - pool.getTotalActiveSessions());
+
     if (queriesToStart > 0) {
       LOG.info("Starting {} queries in pool {}", queriesToStart, pool);
     }
@@ -763,18 +852,10 @@ private void processPoolChangesOnMasterThread(
     // Note: If allocation manager does not have cluster state, it won't update anything. When the
     // cluster state changes, it will notify us, and we'd update the queries again.
     allocationManager.updateSessionsAsync(totalAlloc, pool.sessions);
-
-    // 3. Update triggers for this pool.
-    // TODO: need to merge with per-pool enforcement, it will only work for one pool for now.
-    if (sessionTriggerProvider != null) {
-      sessionTriggerProvider.setOpenSessions(
-          Collections.unmodifiableList(pool.sessions));
-      sessionTriggerProvider.setActiveTriggers(Collections.unmodifiableList(pool.triggers));
-    }
   }

   private void returnSessionOnFailedReuse(
-    GetRequest req, WmThreadSyncWork syncWork, HashSet<String> poolsToRedistribute) {
+      GetRequest req, WmThreadSyncWork syncWork, HashSet<String> poolsToRedistribute) {
     if (req.sessionToReuse == null) return;
     if (poolsToRedistribute != null) {
       Boolean isRemoved = checkAndRemoveSessionFromItsPool(req.sessionToReuse, poolsToRedistribute);
@@ -795,7 +876,7 @@ private void returnSessionOnFailedReuse(
    * in WM but wasn't found in the requisite pool (internal error?). */
   private Boolean checkAndRemoveSessionFromItsPool(
-    WmTezSession session, HashSet<String> poolsToRedistribute) {
+      WmTezSession session, HashSet<String> poolsToRedistribute) {
     // It is possible for some request to be queued after a main thread has decided to kill this
     // session; on the next iteration, we'd be processing that request with an irrelevant session.
     if (session.isIrrelevantForWm()) {
@@ -803,16 +884,37 @@ private Boolean checkAndRemoveSessionFromItsPool(
     }
     // If we did not kill this session we expect everything to be present.
     String poolName = session.getPoolName();
-    session.clearWm();
     if (poolName != null) {
       poolsToRedistribute.add(poolName);
       PoolState pool = pools.get(poolName);
-      if (pool != null && pool.sessions.remove(session)) return true;
+      session.clearWm();
+      if (pool != null && pool.sessions.remove(session)) {
+        return true;
+      }
     }
     LOG.error("Session was not in the pool (internal error) " + poolName + ": " + session);
     return null;
   }

+  private Boolean checkAndAddSessionToAnotherPool(
+      WmTezSession session, String destPoolName, HashSet<String> poolsToRedistribute) {
+    if (session.isIrrelevantForWm()) {
+      // This is only called during move-session handling, and the removal from the source pool
+      // already checks for irrelevant sessions, so a failed removal never invokes this method.
+      LOG.error("Unexpected irrelevant session while adding to another pool; "
+          + "removal from the source pool should have caught this.");
+      return false;
+    }
+
+    PoolState destPool = pools.get(destPoolName);
+    if (destPool != null && destPool.sessions.add(session)) {
+      session.setPoolName(destPoolName);
+      poolsToRedistribute.add(destPoolName);
+      return true;
+    }
+    LOG.error("Session {} was not added to pool {}", session, destPoolName);
+    return null;
+  }
+
   // ===== EVENT METHODS

   public ListenableFuture<Boolean> updateResourcePlanAsync(WMFullResourcePlan plan) {
@@ -824,7 +926,7 @@ private Boolean checkAndRemoveSessionFromItsPool(
     if (current.resourcePlanToApply != null) {
       LOG.warn("Several resource plans are being applied at the same time; using the latest");
       current.applyRpFuture.setException(
-        new HiveException("Another plan was applied in parallel"));
+          new HiveException("Another plan was applied in parallel"));
     }
     current.resourcePlanToApply = plan;
     current.applyRpFuture = applyRpFuture;
@@ -835,6 +937,20 @@ private Boolean checkAndRemoveSessionFromItsPool(
     return applyRpFuture;
   }

+  public Future<Boolean> applyMoveSessionAsync(WmTezSession srcSession, String destPoolName) {
+    currentLock.lock();
+    MoveSession moveSession;
+    try {
+      moveSession = new MoveSession(srcSession, destPoolName);
+      current.moveSessions.add(moveSession);
+      LOG.info("Queued move session: {}", moveSession);
+      notifyWmThreadUnderLock();
+    } finally {
+      currentLock.unlock();
+    }
+    return moveSession.future;
+  }
+
   private final static class GetRequest {
     public static final Comparator<GetRequest> ORDER_COMPARATOR = new Comparator<GetRequest>() {
       @Override
@@ -976,12 +1092,13 @@ public void notifyInitializationCompleted(SessionInitContext initCtx) {
   @Override
   public TezSessionState reopen(TezSessionState session, Configuration conf,
-    String[] additionalFiles) throws Exception {
+      String[] additionalFiles) throws Exception {
     WmTezSession wmTezSession = ensureOwnedSession(session);
     HiveConf sessionConf = wmTezSession.getConf();
     if (sessionConf == null) {
       LOG.warn("Session configuration is null for " + wmTezSession);
       sessionConf = new HiveConf(conf, WorkloadManager.class);
+    }
     // TODO: ideally, we should handle reopen the same way no matter what.
    // However, the cases with additional files will have to wait until HIVE-17827 is fixed, because it's
@@ -1111,28 +1228,27 @@ protected final HiveConf getConf() {
     return conf;
   }

-  public List<String> getTriggerCounterNames() {
-    List<Trigger> activeTriggers = sessionTriggerProvider.getActiveTriggers();
-    List<String> counterNames = new ArrayList<>();
-    for (Trigger trigger : activeTriggers) {
-      counterNames.add(trigger.getExpression().getCounterLimit().getName());
+  public List<String> getTriggerCounterNames(final TezSessionState session) {
+    if (session instanceof WmTezSession) {
+      WmTezSession wmTezSession = (WmTezSession) session;
+      String poolName = wmTezSession.getPoolName();
+      PoolState poolState = pools.get(poolName);
+      if (poolState != null) {
+        List<String> counterNames = new ArrayList<>();
+        List<Trigger> triggers = poolState.getTriggers();
+        if (triggers != null) {
+          for (Trigger trigger : triggers) {
+            counterNames.add(trigger.getExpression().getCounterLimit().getName());
+          }
+        }
+        return counterNames;
+      }
     }
-    return counterNames;
-  }
-
-
-  @Override
-  SessionTriggerProvider getSessionTriggerProvider() {
-    return sessionTriggerProvider;
-  }
-
-  @Override
-  TriggerActionHandler getTriggerActionHandler() {
-    return triggerActionHandler;
+    return null;
   }

   @Override
-  TriggerValidatorRunnable getTriggerValidatorRunnable() {
+  Runnable getTriggerValidatorRunnable() {
     return triggerValidatorRunnable;
   }
@@ -1163,7 +1279,7 @@ public int getTotalActiveSessions() {
   }

   public void update(int queryParallelism, double fraction,
-    List<WmTezSession> toKill, EventState e) {
+      List<WmTezSession> toKill, EventState e) {
     this.finalFraction = this.finalFractionRemaining = fraction;
     this.queryParallelism = queryParallelism;
     // TODO: two possible improvements
@@ -1183,7 +1299,7 @@ public void update(int queryParallelism, double fraction,
   }

   public void destroy(List<WmTezSession> toKill, LinkedList<GetRequest> globalQueue,
-    IdentityHashMap<WmTezSession, GetRequest> toReuse) {
+      IdentityHashMap<WmTezSession, GetRequest> toReuse) {
     extractAllSessionsToKill("The query pool was removed by administrator", toReuse, toKill);
     // All the pending get requests should just be requeued elsewhere.
     // Note that we never queue session reuse so sessionToReuse would be null.
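For context: applyMoveSessionAsync() (added above) only queues a MoveSession event under currentLock and wakes the master thread; the returned future resolves on a later master-thread iteration. A minimal caller sketch, mirroring the tests added later in this patch — the pool name "etl" and the surrounding method are illustrative only:

// Editorial sketch, not part of the patch: requesting a move and interpreting the result.
void moveToEtl(WorkloadManager wm, WmTezSession session) throws Exception {
  Future<Boolean> moved = wm.applyMoveSessionAsync(session, "etl");
  if (moved.get()) { // blocks until the master thread has processed the event
    // The session now lives in pool "etl". Note that cluster fractions are updated
    // slightly after the future resolves (the tests sleep to account for this).
  } else {
    // The move failed: validation failed, or the destination pool was full, in which
    // case the session was killed; see session.getReasonForKill().
  }
}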
@@ -1202,16 +1318,20 @@ public double updateAllocationPercentages() { return finalFractionRemaining - allocation * initializingSessions.size(); } + public LinkedList getSessions() { + return sessions; + } + @Override public String toString() { return "[" + fullName + ", query parallelism " + queryParallelism - + ", fraction of the cluster " + finalFraction + ", fraction used by child pools " - + (finalFraction - finalFractionRemaining) + ", active sessions " + sessions.size() - + ", initializing sessions " + initializingSessions.size() + "]"; + + ", fraction of the cluster " + finalFraction + ", fraction used by child pools " + + (finalFraction - finalFractionRemaining) + ", active sessions " + sessions.size() + + ", initializing sessions " + initializingSessions.size() + "]"; } private void extractAllSessionsToKill(String killReason, - IdentityHashMap toReuse, List toKill) { + IdentityHashMap toReuse, List toKill) { for (WmTezSession sessionToKill : sessions) { resetRemovedSession(sessionToKill, killReason, toReuse); toKill.add(sessionToKill); @@ -1231,7 +1351,7 @@ private void extractAllSessionsToKill(String killReason, } private void resetRemovedSession(WmTezSession sessionToKill, String killReason, - IdentityHashMap toReuse) { + IdentityHashMap toReuse) { assert killReason != null; sessionToKill.setIsIrrelevantForWm(killReason); sessionToKill.clearWm(); diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/WorkloadManagerFederation.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/WorkloadManagerFederation.java index c7ed534..315a2dc 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/WorkloadManagerFederation.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/WorkloadManagerFederation.java @@ -48,7 +48,7 @@ public static TezSessionState getSession(TezSessionState session, HiveConf conf, // Note: this may just block to wait for a session based on parallelism. LOG.info("Getting a WM session for " + input); TezSessionState result = wm.getSession(session, input, conf); - desiredCounters.addAll(wm.getTriggerCounterNames()); + desiredCounters.addAll(wm.getTriggerCounterNames(result)); return result; } catch (WorkloadManager.NoPoolMappingException ex) { return getUnmanagedSession(session, conf, desiredCounters, isUnmanagedLlapMode); diff --git a/ql/src/java/org/apache/hadoop/hive/ql/wm/Action.java b/ql/src/java/org/apache/hadoop/hive/ql/wm/Action.java new file mode 100644 index 0000000..921ad54 --- /dev/null +++ b/ql/src/java/org/apache/hadoop/hive/ql/wm/Action.java @@ -0,0 +1,107 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hive.ql.wm; + +import java.util.Objects; + +/** + * Action that gets invoked for trigger violations. 
+ */ +public class Action { + + public enum Type { + KILL_QUERY("KILL"), + MOVE_TO_POOL("MOVE TO"); + + String displayName; + + Type(final String displayName) { + this.displayName = displayName; + } + + public String getDisplayName() { + return displayName; + } + + @Override + public String toString() { + return displayName; + } + } + + private final Type type; + private final String poolName; + + public static Action fromMetastoreExpression(String metastoreActionExpression) { + if (metastoreActionExpression.equalsIgnoreCase(Type.KILL_QUERY.getDisplayName())) { + return new Action(Type.KILL_QUERY); + } else { + final String poolName = metastoreActionExpression.substring(Type.MOVE_TO_POOL.getDisplayName().length()).trim(); + if (poolName.isEmpty()) { + throw new IllegalArgumentException("Invalid move action expression (" + metastoreActionExpression + "). Pool " + + "name is empty"); + } else { + return new Action(Type.MOVE_TO_POOL, poolName); + } + } + } + + public Action(Type type) { + this(type, null); + } + + public Action(Type type, String poolName) { + this.type = type; + if (type == Type.MOVE_TO_POOL && (poolName == null || poolName.trim().isEmpty())) { + throw new IllegalArgumentException("Pool name cannot be null or empty for action type " + type); + } + this.poolName = poolName; + } + + public Type getType() { + return type; + } + + public String getPoolName() { + return poolName; + } + + @Override + public boolean equals(final Object other) { + if (other == null || !(other instanceof Action)) { + return false; + } + + if (other == this) { + return true; + } + + Action otherAction = (Action) other; + return type == otherAction.type && Objects.equals(poolName, otherAction.poolName); + } + + @Override + public int hashCode() { + int hash = poolName == null ? 31 : 31 * poolName.hashCode(); + hash += type == null ? 31 * hash : 31 * hash * type.hashCode(); + return hash; + } + + @Override + public String toString() { + return type.getDisplayName() + (poolName == null ? "" : " " + poolName); + } +} diff --git a/ql/src/java/org/apache/hadoop/hive/ql/wm/CustomCounterLimit.java b/ql/src/java/org/apache/hadoop/hive/ql/wm/CustomCounterLimit.java index ad1f7fc..fae8dfe 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/wm/CustomCounterLimit.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/wm/CustomCounterLimit.java @@ -1,4 +1,4 @@ -/** +/* * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file * distributed with this work for additional information @@ -23,7 +23,7 @@ private String counterName; private long limit; - public CustomCounterLimit(final String counterName, final long limit) { + CustomCounterLimit(final String counterName, final long limit) { this.counterName = counterName; this.limit = limit; } diff --git a/ql/src/java/org/apache/hadoop/hive/ql/wm/ExecutionTrigger.java b/ql/src/java/org/apache/hadoop/hive/ql/wm/ExecutionTrigger.java index 3529011..05b3d3c 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/wm/ExecutionTrigger.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/wm/ExecutionTrigger.java @@ -1,4 +1,4 @@ -/** +/* * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. 
See the NOTICE file
 * distributed with this work for additional information
@@ -17,6 +17,8 @@

 import java.util.Objects;

+import org.apache.hadoop.hive.metastore.api.WMTrigger;
+
 /**
  * Trigger with query-level scope that contains a name and a trigger expression; when the expression
  * is violated, the defined action will be executed.
@@ -25,6 +27,7 @@
   private String name;
   private Expression expression;
   private Action action;
+  private String violationMsg;

   public ExecutionTrigger(final String name, final Expression expression, final Action action) {
     this.name = name;
@@ -53,6 +56,16 @@ public Trigger clone() {
   }

   @Override
+  public String getViolationMsg() {
+    return violationMsg;
+  }
+
+  @Override
+  public void setViolationMsg(final String violationMsg) {
+    this.violationMsg = violationMsg;
+  }
+
+  @Override
   public boolean apply(final long current) {
     return expression.evaluate(current);
   }
@@ -85,4 +98,11 @@ public boolean equals(final Object other) {
       Objects.equals(expression, otherQR.expression) &&
       Objects.equals(action, otherQR.action);
   }
+
+  public static ExecutionTrigger fromWMTrigger(final WMTrigger trigger) {
+    final Action action = Action.fromMetastoreExpression(trigger.getActionExpression());
+    ExecutionTrigger execTrigger = new ExecutionTrigger(trigger.getTriggerName(),
+      ExpressionFactory.fromString(trigger.getTriggerExpression()), action);
+    return execTrigger;
+  }
 }
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/wm/Expression.java b/ql/src/java/org/apache/hadoop/hive/ql/wm/Expression.java
index add933f..76753488 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/wm/Expression.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/wm/Expression.java
@@ -1,4 +1,4 @@
-/**
+/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
@@ -47,7 +47,7 @@ public String getSymbol() {
   * else false otherwise
   *
   * @param current - current value against which expression will be evaluated
-  * @return
+  * @return true if current value exceeds limit
   */
  boolean evaluate(final long current);

diff --git a/ql/src/java/org/apache/hadoop/hive/ql/wm/ExpressionFactory.java b/ql/src/java/org/apache/hadoop/hive/ql/wm/ExpressionFactory.java
index 29f7c89..953faa8 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/wm/ExpressionFactory.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/wm/ExpressionFactory.java
@@ -1,4 +1,4 @@
-/**
+/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/wm/FileSystemCounterLimit.java b/ql/src/java/org/apache/hadoop/hive/ql/wm/FileSystemCounterLimit.java
index 656747e..e6ea997 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/wm/FileSystemCounterLimit.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/wm/FileSystemCounterLimit.java
@@ -1,4 +1,4 @@
-/**
+/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
@@ -30,13 +30,13 @@
  private FSCounter fsCounter;
  private long limit;

-  public FileSystemCounterLimit(final String scheme, final FSCounter fsCounter, final long limit) {
+  FileSystemCounterLimit(final String scheme, final FSCounter fsCounter, final long limit) {
    this.scheme = scheme == null || scheme.isEmpty() ?
"" : scheme.toUpperCase(); this.fsCounter = fsCounter; this.limit = limit; } - public static FileSystemCounterLimit fromName(final String counterName, final long limit) { + static FileSystemCounterLimit fromName(final String counterName, final long limit) { String counterNameStr = counterName.toUpperCase(); for (FSCounter fsCounter : FSCounter.values()) { if (counterNameStr.endsWith(fsCounter.name())) { diff --git a/ql/src/java/org/apache/hadoop/hive/ql/wm/MetastoreGlobalTriggersFetcher.java b/ql/src/java/org/apache/hadoop/hive/ql/wm/MetastoreGlobalTriggersFetcher.java index db1a037..87c007f 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/wm/MetastoreGlobalTriggersFetcher.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/wm/MetastoreGlobalTriggersFetcher.java @@ -1,4 +1,4 @@ -/** +/* * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file * distributed with this work for additional information @@ -15,6 +15,7 @@ */ package org.apache.hadoop.hive.ql.wm; +import java.util.ArrayList; import java.util.List; import org.apache.hadoop.hive.ql.metadata.Hive; @@ -22,21 +23,16 @@ /** * Fetch global (non-llap) rules from metastore */ -public class MetastoreGlobalTriggersFetcher implements TriggersFetcher { - public static final String GLOBAL_TRIGGER_NAME = "global"; - private final MetastoreResourcePlanTriggersFetcher rpTriggersFetcher; +public class MetastoreGlobalTriggersFetcher { + private static final String GLOBAL_TRIGGER_NAME = "global"; + private Hive db; public MetastoreGlobalTriggersFetcher(final Hive db) { - this.rpTriggersFetcher = new MetastoreResourcePlanTriggersFetcher(db); - } - - @Override - public List fetch(final String ignore) { - return fetch(); + this.db = db; } public List fetch() { - // TODO: - return rpTriggersFetcher.fetch(GLOBAL_TRIGGER_NAME); + // TODO: this entire class will go away, DDLTask will push RP to TezSessionPoolManager where triggers are available + return new ArrayList<>(); } } diff --git a/ql/src/java/org/apache/hadoop/hive/ql/wm/MetastoreResourcePlanTriggersFetcher.java b/ql/src/java/org/apache/hadoop/hive/ql/wm/MetastoreResourcePlanTriggersFetcher.java deleted file mode 100644 index db390f2..0000000 --- a/ql/src/java/org/apache/hadoop/hive/ql/wm/MetastoreResourcePlanTriggersFetcher.java +++ /dev/null @@ -1,38 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * http://www.apache.org/licenses/LICENSE-2.0 - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package org.apache.hadoop.hive.ql.wm; - -import java.util.ArrayList; -import java.util.List; - -import org.apache.hadoop.hive.ql.metadata.Hive; - -/** - * Fetch pool specific rules from metastore - */ -public class MetastoreResourcePlanTriggersFetcher implements TriggersFetcher { - private final Hive db; - - public MetastoreResourcePlanTriggersFetcher(final Hive db) { - this.db = db; - } - - @Override - public List fetch(final String resourcePlanName) { - // TODO: implement after integration. - return new ArrayList<>(); - } -} diff --git a/ql/src/java/org/apache/hadoop/hive/ql/wm/SessionTriggerProvider.java b/ql/src/java/org/apache/hadoop/hive/ql/wm/SessionTriggerProvider.java index 408aa2d..06562cf 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/wm/SessionTriggerProvider.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/wm/SessionTriggerProvider.java @@ -1,4 +1,4 @@ -/** +/* * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file * distributed with this work for additional information @@ -15,9 +15,6 @@ */ package org.apache.hadoop.hive.ql.wm; -import java.util.ArrayList; -import java.util.Collections; -import java.util.LinkedList; import java.util.List; import org.apache.hadoop.hive.ql.exec.tez.TezSessionState; @@ -26,31 +23,27 @@ * Implementation for providing current open sessions and active trigger. */ public class SessionTriggerProvider { - private List openSessions = new ArrayList<>(); - private List activeTriggers = new ArrayList<>(); - - public SessionTriggerProvider() { - - } + private List sessions; + private List triggers; public SessionTriggerProvider(final List openSessions, final List triggers) { - this.openSessions = openSessions; - this.activeTriggers = triggers; + this.sessions = openSessions; + this.triggers = triggers; } - public void setOpenSessions(final List openSessions) { - this.openSessions = openSessions; + public List getSessions() { + return sessions; } - public void setActiveTriggers(final List activeTriggers) { - this.activeTriggers = activeTriggers; + public List getTriggers() { + return triggers; } - public List getOpenSessions() { - return Collections.unmodifiableList(openSessions); + public void setSessions(final List sessions) { + this.sessions = sessions; } - public List getActiveTriggers() { - return Collections.unmodifiableList(activeTriggers); + public void setTriggers(final List triggers) { + this.triggers = triggers; } } diff --git a/ql/src/java/org/apache/hadoop/hive/ql/wm/TimeCounterLimit.java b/ql/src/java/org/apache/hadoop/hive/ql/wm/TimeCounterLimit.java index 3c16e1d..89e61b2 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/wm/TimeCounterLimit.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/wm/TimeCounterLimit.java @@ -1,4 +1,4 @@ -/** +/* * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. 
See the NOTICE file * distributed with this work for additional information @@ -27,7 +27,7 @@ private TimeCounter timeCounter; private long limit; - public TimeCounterLimit(final TimeCounter timeCounter, final long limit) { + TimeCounterLimit(final TimeCounter timeCounter, final long limit) { this.timeCounter = timeCounter; this.limit = limit; } diff --git a/ql/src/java/org/apache/hadoop/hive/ql/wm/Trigger.java b/ql/src/java/org/apache/hadoop/hive/ql/wm/Trigger.java index 6299f2b..e41b460 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/wm/Trigger.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/wm/Trigger.java @@ -1,4 +1,4 @@ -/** +/* * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file * distributed with this work for additional information @@ -22,35 +22,6 @@ */ public interface Trigger { - enum Action { - KILL_QUERY(""), - MOVE_TO_POOL(""); - - String poolName; - String msg; - - Action(final String poolName) { - this.poolName = poolName; - } - - public Action setPoolName(final String poolName) { - this.poolName = poolName; - return this; - } - - public String getPoolName() { - return poolName; - } - - public String getMsg() { - return msg; - } - - public void setMsg(final String msg) { - this.msg = msg; - } - } - /** * Based on current value, returns true if trigger is applied else false. * @@ -86,4 +57,19 @@ public void setMsg(final String msg) { * @return clone copy */ Trigger clone(); + + /** + * Set trigger violation message. When {@link #apply(long)} returns false, this can be used + * to set message for trigger violation which will be sent as response to clients. + * + * @param violationMsg violation message + */ + void setViolationMsg(String violationMsg); + + /** + * Get error message set during trigger violation. + * + * @return trigger violation message + */ + String getViolationMsg(); } diff --git a/ql/src/java/org/apache/hadoop/hive/ql/wm/TriggerActionHandler.java b/ql/src/java/org/apache/hadoop/hive/ql/wm/TriggerActionHandler.java index 5cd24d5..8b142da 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/wm/TriggerActionHandler.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/wm/TriggerActionHandler.java @@ -1,4 +1,4 @@ -/** +/* * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file * distributed with this work for additional information @@ -28,5 +28,5 @@ * * @param queriesViolated - violated queries and the rule it violated */ - void applyAction(Map queriesViolated); + void applyAction(Map queriesViolated); } diff --git a/ql/src/java/org/apache/hadoop/hive/ql/wm/TriggerContext.java b/ql/src/java/org/apache/hadoop/hive/ql/wm/TriggerContext.java index a2c7a51..16072c3 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/wm/TriggerContext.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/wm/TriggerContext.java @@ -1,4 +1,4 @@ -/** +/* * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file * distributed with this work for additional information @@ -20,8 +20,6 @@ import java.util.Map; import java.util.Set; -import org.apache.hadoop.hive.ql.QueryInfo; - /** * Some context information that are required for rule evaluation. 
*/ @@ -46,14 +44,6 @@ public void setQueryId(final String queryId) { this.queryId = queryId; } - public long getQueryStartTime() { - return queryStartTime; - } - - public void setQueryStartTime(final long queryStartTime) { - this.queryStartTime = queryStartTime; - } - public Set getDesiredCounters() { return desiredCounters; } diff --git a/ql/src/java/org/apache/hadoop/hive/ql/wm/TriggerExpression.java b/ql/src/java/org/apache/hadoop/hive/ql/wm/TriggerExpression.java index 065ab79..ea25b68 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/wm/TriggerExpression.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/wm/TriggerExpression.java @@ -1,4 +1,4 @@ -/** +/* * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file * distributed with this work for additional information diff --git a/ql/src/java/org/apache/hadoop/hive/ql/wm/TriggersFetcher.java b/ql/src/java/org/apache/hadoop/hive/ql/wm/TriggersFetcher.java deleted file mode 100644 index c25ea3c..0000000 --- a/ql/src/java/org/apache/hadoop/hive/ql/wm/TriggersFetcher.java +++ /dev/null @@ -1,25 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * http://www.apache.org/licenses/LICENSE-2.0 - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.hadoop.hive.ql.wm; - -import java.util.List; - -/** - * Interface to fetch rules - */ -public interface TriggersFetcher { - List fetch(final String resourcePlanName); -} diff --git a/ql/src/java/org/apache/hadoop/hive/ql/wm/VertexCounterLimit.java b/ql/src/java/org/apache/hadoop/hive/ql/wm/VertexCounterLimit.java index dd19ce6..7d6482a 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/wm/VertexCounterLimit.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/wm/VertexCounterLimit.java @@ -1,4 +1,4 @@ -/** +/* * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file * distributed with this work for additional information @@ -26,7 +26,7 @@ private VertexCounter vertexCounter; private long limit; - public VertexCounterLimit(final VertexCounter vertexCounter, final long limit) { + VertexCounterLimit(final VertexCounter vertexCounter, final long limit) { this.vertexCounter = vertexCounter; this.limit = limit; } diff --git a/ql/src/test/org/apache/hadoop/hive/ql/exec/tez/TestWorkloadManager.java b/ql/src/test/org/apache/hadoop/hive/ql/exec/tez/TestWorkloadManager.java index 84a35cc..d69bb1d 100644 --- a/ql/src/test/org/apache/hadoop/hive/ql/exec/tez/TestWorkloadManager.java +++ b/ql/src/test/org/apache/hadoop/hive/ql/exec/tez/TestWorkloadManager.java @@ -1,4 +1,4 @@ -/** +/* * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. 
See the NOTICE file * distributed with this work for additional information @@ -19,32 +19,45 @@ package org.apache.hadoop.hive.ql.exec.tez; -import static org.junit.Assert.*; +import static org.apache.hadoop.hive.ql.exec.tez.UserPoolMapping.MappingInput; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertNotSame; +import static org.junit.Assert.assertNull; +import static org.junit.Assert.assertSame; +import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; import static org.mockito.Matchers.anyBoolean; -import static org.mockito.Mockito.*; +import static org.mockito.Mockito.doNothing; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; -import org.apache.hadoop.hive.ql.exec.tez.UserPoolMapping.MappingInput; -import org.apache.hadoop.hive.metastore.api.WMResourcePlan; -import org.apache.hadoop.hive.metastore.api.WMMapping; -import org.apache.hadoop.hive.metastore.api.WMPool; -import org.apache.hadoop.hive.metastore.api.WMFullResourcePlan; - -import com.google.common.util.concurrent.SettableFuture; - -import com.google.common.collect.Lists; import java.lang.Thread.State; import java.util.List; +import java.util.Map; import java.util.concurrent.CountDownLatch; import java.util.concurrent.ExecutionException; +import java.util.concurrent.Future; import java.util.concurrent.atomic.AtomicReference; + import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hive.conf.HiveConf; import org.apache.hadoop.hive.conf.HiveConf.ConfVars; +import org.apache.hadoop.hive.metastore.api.WMFullResourcePlan; +import org.apache.hadoop.hive.metastore.api.WMMapping; +import org.apache.hadoop.hive.metastore.api.WMPool; +import org.apache.hadoop.hive.metastore.api.WMResourcePlan; +import org.apache.hadoop.hive.ql.wm.SessionTriggerProvider; import org.apache.tez.dag.api.TezConfiguration; import org.junit.Test; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import com.google.common.collect.Lists; +import com.google.common.util.concurrent.SettableFuture; + public class TestWorkloadManager { @SuppressWarnings("unused") private static final Logger LOG = LoggerFactory.getLogger(TestWorkloadManager.class); @@ -624,6 +637,212 @@ public void testAmPoolInteractions() throws Exception { } @Test(timeout=10000) + public void testMoveSessions() throws Exception { + final HiveConf conf = createConf(); + MockQam qam = new MockQam(); + WMFullResourcePlan plan = new WMFullResourcePlan(plan(), Lists.newArrayList( + pool("A", 1, 0.6f), pool("B", 2, 0.4f))); + plan.setMappings(Lists.newArrayList(mapping("A", "A"), mapping("B", "B"))); + final WorkloadManager wm = new WorkloadManagerForTest("test", conf, qam, plan); + wm.start(); + + WmTezSession sessionA1 = (WmTezSession) wm.getSession(null, new MappingInput("A"), conf); + + // [A: 1, B: 0] + Map allSessionProviders = wm.getAllSessionTriggerProviders(); + assertEquals(1, allSessionProviders.get("A").getSessions().size()); + assertEquals(0, allSessionProviders.get("B").getSessions().size()); + assertTrue(allSessionProviders.get("A").getSessions().contains(sessionA1)); + assertFalse(allSessionProviders.get("B").getSessions().contains(sessionA1)); + assertEquals(0.6f, sessionA1.getClusterFraction(), EPSILON); + assertEquals("A", sessionA1.getPoolName()); + + // [A: 0, B: 1] + Future future = wm.applyMoveSessionAsync(sessionA1, "B"); + 
assertNotNull(future.get()); + assertTrue(future.get()); + // future.get() gets resolved before updating cluster fractions (last step of event processing). so adding a delay + Thread.sleep(2000); + allSessionProviders = wm.getAllSessionTriggerProviders(); + assertEquals(0, allSessionProviders.get("A").getSessions().size()); + assertEquals(1, allSessionProviders.get("B").getSessions().size()); + assertFalse(allSessionProviders.get("A").getSessions().contains(sessionA1)); + assertTrue(allSessionProviders.get("B").getSessions().contains(sessionA1)); + assertEquals(0.4f, sessionA1.getClusterFraction(), EPSILON); + assertEquals("B", sessionA1.getPoolName()); + + WmTezSession sessionA2 = (WmTezSession) wm.getSession(null, new MappingInput("A"), conf); + // [A: 1, B: 1] + allSessionProviders = wm.getAllSessionTriggerProviders(); + assertEquals(1, allSessionProviders.get("A").getSessions().size()); + assertEquals(1, allSessionProviders.get("B").getSessions().size()); + assertTrue(allSessionProviders.get("A").getSessions().contains(sessionA2)); + assertTrue(allSessionProviders.get("B").getSessions().contains(sessionA1)); + assertEquals(0.6f, sessionA2.getClusterFraction(), EPSILON); + assertEquals(0.4f, sessionA1.getClusterFraction(), EPSILON); + assertEquals("A", sessionA2.getPoolName()); + assertEquals("B", sessionA1.getPoolName()); + + // [A: 0, B: 2] + future = wm.applyMoveSessionAsync(sessionA2, "B"); + assertNotNull(future.get()); + assertTrue(future.get()); + // future.get() gets resolved before updating cluster fractions (last step of event processing). so adding a delay + Thread.sleep(2000); + allSessionProviders = wm.getAllSessionTriggerProviders(); + assertEquals(0, allSessionProviders.get("A").getSessions().size()); + assertEquals(2, allSessionProviders.get("B").getSessions().size()); + assertTrue(allSessionProviders.get("B").getSessions().contains(sessionA2)); + assertTrue(allSessionProviders.get("B").getSessions().contains(sessionA1)); + assertEquals(0.2f, sessionA2.getClusterFraction(), EPSILON); + assertEquals(0.2f, sessionA1.getClusterFraction(), EPSILON); + assertEquals("B", sessionA2.getPoolName()); + assertEquals("B", sessionA1.getPoolName()); + + WmTezSession sessionA3 = (WmTezSession) wm.getSession(null, new MappingInput("A"), conf); + // [A: 1, B: 2] + allSessionProviders = wm.getAllSessionTriggerProviders(); + assertEquals(1, allSessionProviders.get("A").getSessions().size()); + assertEquals(2, allSessionProviders.get("B").getSessions().size()); + assertTrue(allSessionProviders.get("A").getSessions().contains(sessionA3)); + assertTrue(allSessionProviders.get("B").getSessions().contains(sessionA2)); + assertTrue(allSessionProviders.get("B").getSessions().contains(sessionA1)); + assertEquals(0.6f, sessionA3.getClusterFraction(), EPSILON); + assertEquals(0.2f, sessionA2.getClusterFraction(), EPSILON); + assertEquals(0.2f, sessionA1.getClusterFraction(), EPSILON); + assertEquals("A", sessionA3.getPoolName()); + assertEquals("B", sessionA2.getPoolName()); + assertEquals("B", sessionA1.getPoolName()); + + // B is maxed out on capacity, so this move should fail the session + future = wm.applyMoveSessionAsync(sessionA3, "B"); + assertNotNull(future.get()); + assertFalse(future.get()); + // future.get() gets resolved before closing a session + Thread.sleep(2000); + assertFalse(sessionA3.isOpen()); + assertNull(sessionA3.getPoolName()); + assertEquals("Destination pool B is full. 
Killing query.", sessionA3.getReasonForKill()); + assertEquals(0, allSessionProviders.get("A").getSessions().size()); + assertEquals(2, allSessionProviders.get("B").getSessions().size()); + } + + @Test(timeout=20000) + public void testMoveSessionsMultiPool() throws Exception { + final HiveConf conf = createConf(); + MockQam qam = new MockQam(); + WMFullResourcePlan plan = new WMFullResourcePlan(plan(), Lists.newArrayList( + pool("A", 1, 0.4f), pool("B", 1, 0.4f), pool("B.x", 1, 0.2f), + pool("B.y", 1, 0.8f), pool("C", 1, 0.2f))); + plan.setMappings(Lists.newArrayList(mapping("A", "A"), mapping("B", "B"), mapping("C", "C"))); + final WorkloadManager wm = new WorkloadManagerForTest("test", conf, qam, plan); + wm.start(); + + WmTezSession sessionA1 = (WmTezSession) wm.getSession(null, new MappingInput("A"), conf); + + // [A: 1, B: 0, B.x: 0, B.y: 0, C: 0] + Map allSessionProviders = wm.getAllSessionTriggerProviders(); + assertEquals(1, allSessionProviders.get("A").getSessions().size()); + assertEquals(0, allSessionProviders.get("B").getSessions().size()); + assertEquals(0, allSessionProviders.get("B.x").getSessions().size()); + assertEquals(0, allSessionProviders.get("B.y").getSessions().size()); + assertEquals(0, allSessionProviders.get("C").getSessions().size()); + assertEquals(0.4f, sessionA1.getClusterFraction(), EPSILON); + assertTrue(allSessionProviders.get("A").getSessions().contains(sessionA1)); + assertEquals("A", sessionA1.getPoolName()); + + // [A: 0, B: 1, B.x: 0, B.y: 0, C: 0] + Future future = wm.applyMoveSessionAsync(sessionA1, "B.y"); + assertNotNull(future.get()); + assertTrue(future.get()); + // future.get() gets resolved before updating cluster fractions (last step of event processing). so adding a delay + Thread.sleep(2000); + allSessionProviders = wm.getAllSessionTriggerProviders(); + assertEquals(0, allSessionProviders.get("A").getSessions().size()); + assertEquals(0, allSessionProviders.get("B").getSessions().size()); + assertEquals(0, allSessionProviders.get("B.x").getSessions().size()); + assertEquals(1, allSessionProviders.get("B.y").getSessions().size()); + assertEquals(0, allSessionProviders.get("C").getSessions().size()); + assertEquals(0.32f, sessionA1.getClusterFraction(), EPSILON); + assertTrue(allSessionProviders.get("B.y").getSessions().contains(sessionA1)); + assertEquals("B.y", sessionA1.getPoolName()); + + // [A: 0, B: 0, B.x: 0, B.y: 0, C: 1] + future = wm.applyMoveSessionAsync(sessionA1, "C"); + assertNotNull(future.get()); + assertTrue(future.get()); + // future.get() gets resolved before updating cluster fractions (last step of event processing). so adding a delay + Thread.sleep(2000); + allSessionProviders = wm.getAllSessionTriggerProviders(); + assertEquals(0, allSessionProviders.get("A").getSessions().size()); + assertEquals(0, allSessionProviders.get("B").getSessions().size()); + assertEquals(0, allSessionProviders.get("B.x").getSessions().size()); + assertEquals(0, allSessionProviders.get("B.y").getSessions().size()); + assertEquals(1, allSessionProviders.get("C").getSessions().size()); + assertEquals(0.2f, sessionA1.getClusterFraction(), EPSILON); + assertTrue(allSessionProviders.get("C").getSessions().contains(sessionA1)); + assertEquals("C", sessionA1.getPoolName()); + + // [A: 0, B: 0, B.x: 1, B.y: 0, C: 0] + future = wm.applyMoveSessionAsync(sessionA1, "B.x"); + assertNotNull(future.get()); + assertTrue(future.get()); + // future.get() gets resolved before updating cluster fractions (last step of event processing). 
so adding a delay + Thread.sleep(2000); + allSessionProviders = wm.getAllSessionTriggerProviders(); + assertEquals(0, allSessionProviders.get("A").getSessions().size()); + assertEquals(0, allSessionProviders.get("B").getSessions().size()); + assertEquals(1, allSessionProviders.get("B.x").getSessions().size()); + assertEquals(0, allSessionProviders.get("B.y").getSessions().size()); + assertEquals(0, allSessionProviders.get("C").getSessions().size()); + assertEquals(0.08f, sessionA1.getClusterFraction(), EPSILON); + assertTrue(allSessionProviders.get("B.x").getSessions().contains(sessionA1)); + assertEquals("B.x", sessionA1.getPoolName()); + + WmTezSession sessionA2 = (WmTezSession) wm.getSession(null, new MappingInput("A"), conf); + // [A: 1, B: 0, B.x: 1, B.y: 0, C: 0] + allSessionProviders = wm.getAllSessionTriggerProviders(); + assertEquals(1, allSessionProviders.get("A").getSessions().size()); + assertEquals(0, allSessionProviders.get("B").getSessions().size()); + assertEquals(1, allSessionProviders.get("B.x").getSessions().size()); + assertEquals(0, allSessionProviders.get("B.y").getSessions().size()); + assertEquals(0, allSessionProviders.get("C").getSessions().size()); + assertEquals(0.4f, sessionA2.getClusterFraction(), EPSILON); + assertEquals(0.08f, sessionA1.getClusterFraction(), EPSILON); + assertTrue(allSessionProviders.get("A").getSessions().contains(sessionA2)); + assertTrue(allSessionProviders.get("B.x").getSessions().contains(sessionA1)); + assertEquals("A", sessionA2.getPoolName()); + assertEquals("B.x", sessionA1.getPoolName()); + + // A is maxed out on capacity, so this move should fail the session + // [A: 1, B: 0, B.x: 0, B.y: 0, C: 0] + future = wm.applyMoveSessionAsync(sessionA1, "A"); + assertNotNull(future.get()); + assertFalse(future.get()); + // wait sometime for sync work to finish + Thread.sleep(2000); + assertFalse(sessionA1.isOpen()); + assertNull(sessionA1.getPoolName()); + assertEquals("Destination pool A is full. Killing query.", sessionA1.getReasonForKill()); + assertEquals(1, allSessionProviders.get("A").getSessions().size()); + assertEquals(0, allSessionProviders.get("B.x").getSessions().size()); + + // return a loaned session goes back to tez am pool + // [A: 0, B: 0, B.x: 0, B.y: 0, C: 0] + wm.returnAfterUse(sessionA2); + // future.get() gets resolved before closing a session + Thread.sleep(2000); + allSessionProviders = wm.getAllSessionTriggerProviders(); + assertEquals(0, allSessionProviders.get("A").getSessions().size()); + assertEquals(0, allSessionProviders.get("B").getSessions().size()); + assertEquals(0, allSessionProviders.get("B.x").getSessions().size()); + assertEquals(0, allSessionProviders.get("B.y").getSessions().size()); + assertEquals(0, allSessionProviders.get("C").getSessions().size()); + assertEquals(0.0f, sessionA1.getClusterFraction(), EPSILON); + assertFalse(allSessionProviders.get("A").getSessions().contains(sessionA1)); + } + + @Test(timeout=10000) public void testAsyncSessionInitFailures() throws Exception { final HiveConf conf = createConf(); MockQam qam = new MockQam(); diff --git a/ql/src/test/org/apache/hadoop/hive/ql/wm/TestTrigger.java b/ql/src/test/org/apache/hadoop/hive/ql/wm/TestTrigger.java index cd78545..b686783 100644 --- a/ql/src/test/org/apache/hadoop/hive/ql/wm/TestTrigger.java +++ b/ql/src/test/org/apache/hadoop/hive/ql/wm/TestTrigger.java @@ -1,4 +1,4 @@ -/** +/* * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. 
diff --git a/ql/src/test/org/apache/hadoop/hive/ql/wm/TestTrigger.java b/ql/src/test/org/apache/hadoop/hive/ql/wm/TestTrigger.java
index cd78545..b686783 100644
--- a/ql/src/test/org/apache/hadoop/hive/ql/wm/TestTrigger.java
+++ b/ql/src/test/org/apache/hadoop/hive/ql/wm/TestTrigger.java
@@ -1,4 +1,4 @@
-/**
+/*
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
  * distributed with this work for additional information
@@ -23,9 +23,6 @@
 import org.junit.Test;
 import org.junit.rules.ExpectedException;
 
-/**
- *
- */
 public class TestTrigger {
   @org.junit.Rule
   public ExpectedException thrown = ExpectedException.none();
@@ -34,55 +31,55 @@ public void testSimpleQueryTrigger() {
     Expression expression = ExpressionFactory.createExpression(new FileSystemCounterLimit("hdfs",
       FileSystemCounterLimit.FSCounter.BYTES_READ, 1024));
-    Trigger trigger = new ExecutionTrigger("hdfs_read_heavy", expression, Trigger.Action.KILL_QUERY);
+    Trigger trigger = new ExecutionTrigger("hdfs_read_heavy", expression, new Action(Action.Type.KILL_QUERY));
     assertEquals("counter: HDFS_BYTES_READ limit: 1024", expression.getCounterLimit().toString());
     assertFalse(trigger.apply(1000));
     assertTrue(trigger.apply(1025));
 
     expression = ExpressionFactory.createExpression(new FileSystemCounterLimit("hdfs",
       FileSystemCounterLimit.FSCounter.BYTES_WRITTEN, 1024));
-    trigger = new ExecutionTrigger("hdfs_write_heavy", expression, Trigger.Action.KILL_QUERY);
+    trigger = new ExecutionTrigger("hdfs_write_heavy", expression, new Action(Action.Type.KILL_QUERY));
     assertEquals("counter: HDFS_BYTES_WRITTEN limit: 1024", expression.getCounterLimit().toString());
     assertFalse(trigger.apply(1000));
     assertTrue(trigger.apply(1025));
 
     expression = ExpressionFactory.createExpression(new FileSystemCounterLimit("",
       FileSystemCounterLimit.FSCounter.BYTES_READ, 1024));
-    trigger = new ExecutionTrigger("local_read_heavy", expression, Trigger.Action.KILL_QUERY);
+    trigger = new ExecutionTrigger("local_read_heavy", expression, new Action(Action.Type.KILL_QUERY));
     assertEquals("counter: BYTES_READ limit: 1024", expression.getCounterLimit().toString());
     assertFalse(trigger.apply(1000));
     assertTrue(trigger.apply(1025));
 
     expression = ExpressionFactory.createExpression(new FileSystemCounterLimit("",
       FileSystemCounterLimit.FSCounter.BYTES_WRITTEN, 1024));
-    trigger = new ExecutionTrigger("local_write_heavy", expression, Trigger.Action.KILL_QUERY);
+    trigger = new ExecutionTrigger("local_write_heavy", expression, new Action(Action.Type.KILL_QUERY));
     assertEquals("counter: BYTES_WRITTEN limit: 1024", expression.getCounterLimit().toString());
     assertFalse(trigger.apply(1000));
     assertTrue(trigger.apply(1025));
 
     expression = ExpressionFactory.createExpression(new FileSystemCounterLimit("",
       FileSystemCounterLimit.FSCounter.SHUFFLE_BYTES, 1024));
-    trigger = new ExecutionTrigger("shuffle_heavy", expression, Trigger.Action.KILL_QUERY);
+    trigger = new ExecutionTrigger("shuffle_heavy", expression, new Action(Action.Type.KILL_QUERY));
     assertEquals("counter: SHUFFLE_BYTES limit: 1024", expression.getCounterLimit().toString());
     assertFalse(trigger.apply(1000));
     assertTrue(trigger.apply(1025));
 
     expression = ExpressionFactory.createExpression(new TimeCounterLimit(TimeCounterLimit.TimeCounter
       .EXECUTION_TIME, 10000));
-    trigger = new ExecutionTrigger("slow_query", expression, Trigger.Action.MOVE_TO_POOL.setPoolName("fake_pool"));
+    trigger = new ExecutionTrigger("slow_query", expression, new Action(Action.Type.MOVE_TO_POOL, "fake_pool"));
     assertEquals("counter: EXECUTION_TIME limit: 10000", expression.getCounterLimit().toString());
     assertFalse(trigger.apply(1000));
     assertTrue(trigger.apply(100000));
 
     expression = ExpressionFactory.createExpression(new VertexCounterLimit(VertexCounterLimit.VertexCounter
       .TOTAL_TASKS, 10000));
-    trigger = new ExecutionTrigger("highly_parallel", expression, Trigger.Action.KILL_QUERY);
+    trigger = new ExecutionTrigger("highly_parallel", expression, new Action(Action.Type.KILL_QUERY));
     assertEquals("counter: TOTAL_TASKS limit: 10000", expression.getCounterLimit().toString());
     assertFalse(trigger.apply(1000));
     assertTrue(trigger.apply(100000));
 
     expression = ExpressionFactory.createExpression(new CustomCounterLimit("HDFS_WRITE_OPS", 10000));
-    trigger = new ExecutionTrigger("write_heavy", expression, Trigger.Action.KILL_QUERY);
+    trigger = new ExecutionTrigger("write_heavy", expression, new Action(Action.Type.KILL_QUERY));
     assertEquals("counter: HDFS_WRITE_OPS limit: 10000", expression.getCounterLimit().toString());
     assertFalse(trigger.apply(1000));
     assertTrue(trigger.apply(100000));
@@ -293,10 +290,21 @@ public void testIllegalTimeCounterValue2() {
   }
 
   @Test
+  public void testActionFromMetastoreStr() {
+    assertEquals(Action.Type.KILL_QUERY, Action.fromMetastoreExpression("KILL").getType());
+    assertEquals(Action.Type.MOVE_TO_POOL, Action.fromMetastoreExpression("MOVE TO bi").getType());
+    assertEquals("MOVE TO etl", Action.fromMetastoreExpression("MOVE TO etl").toString());
+
+    thrown.expect(IllegalArgumentException.class);
+    thrown.expectMessage("Invalid move action expression (MOVE TO ). Pool name is empty");
+    assertEquals(Action.Type.MOVE_TO_POOL, Action.fromMetastoreExpression("MOVE TO ").getType());
+  }
+
+  @Test
   public void testTriggerClone() {
     Expression expression = ExpressionFactory.createExpression(new FileSystemCounterLimit("hdfs",
       FileSystemCounterLimit.FSCounter.BYTES_READ, 1024));
-    Trigger trigger = new ExecutionTrigger("hdfs_read_heavy", expression, Trigger.Action.KILL_QUERY);
+    Trigger trigger = new ExecutionTrigger("hdfs_read_heavy", expression, new Action(Action.Type.KILL_QUERY));
     Trigger clonedTrigger = trigger.clone();
     assertNotEquals(System.identityHashCode(trigger), System.identityHashCode(clonedTrigger));
     assertNotEquals(System.identityHashCode(trigger.getExpression()), System.identityHashCode(clonedTrigger.getExpression()));
@@ -306,7 +314,7 @@ public void testTriggerClone() {
     assertEquals(trigger.hashCode(), clonedTrigger.hashCode());
 
     expression = ExpressionFactory.fromString(" ELAPSED_TIME > 300");
-    trigger = new ExecutionTrigger("slow_query", expression, Trigger.Action.KILL_QUERY);
+    trigger = new ExecutionTrigger("slow_query", expression, new Action(Action.Type.KILL_QUERY));
     clonedTrigger = trigger.clone();
     assertNotEquals(System.identityHashCode(trigger), System.identityHashCode(clonedTrigger));
     assertNotEquals(System.identityHashCode(trigger.getExpression()), System.identityHashCode(clonedTrigger.getExpression()));
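For readers tracking the new Action API, the sketch below restates the parsing contract that testActionFromMetastoreStr pins down: "KILL" maps to KILL_QUERY, "MOVE TO <pool>" maps to MOVE_TO_POOL, and an empty pool name is rejected. It is a minimal illustration under those assumptions, not the actual org.apache.hadoop.hive.ql.wm.Action implementation; ActionSketch is a hypothetical name.

    // Illustrative sketch of the contract implied by testActionFromMetastoreStr,
    // not Hive's actual Action source.
    public final class ActionSketch {
      public enum Type { KILL_QUERY, MOVE_TO_POOL }

      private final Type type;
      private final String poolName;    // only set for MOVE_TO_POOL

      private ActionSketch(Type type, String poolName) {
        this.type = type;
        this.poolName = poolName;
      }

      public static ActionSketch fromMetastoreExpression(String expr) {
        if ("KILL".equalsIgnoreCase(expr.trim())) {
          return new ActionSketch(Type.KILL_QUERY, null);
        }
        if (expr.startsWith("MOVE TO")) {
          String pool = expr.substring("MOVE TO".length()).trim();
          if (pool.isEmpty()) {
            // matches the message the test expects for "MOVE TO "
            throw new IllegalArgumentException(
                "Invalid move action expression (" + expr + "). Pool name is empty");
          }
          return new ActionSketch(Type.MOVE_TO_POOL, pool);
        }
        throw new IllegalArgumentException("Invalid action expression: " + expr);
      }

      public Type getType() { return type; }

      @Override
      public String toString() {
        // round-trips back to the metastore form, e.g. "MOVE TO etl"
        return type == Type.KILL_QUERY ? "KILL" : "MOVE TO " + poolName;
      }
    }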