diff --git a/itests/hive-unit/src/test/java/org/apache/hive/jdbc/AbstractJdbcTriggersTest.java b/itests/hive-unit/src/test/java/org/apache/hive/jdbc/AbstractJdbcTriggersTest.java index 68d57ca..b155244 100644 --- a/itests/hive-unit/src/test/java/org/apache/hive/jdbc/AbstractJdbcTriggersTest.java +++ b/itests/hive-unit/src/test/java/org/apache/hive/jdbc/AbstractJdbcTriggersTest.java @@ -1,4 +1,4 @@ -/** +/* * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file * distributed with this work for additional information @@ -110,7 +110,7 @@ private void createSleepUDF() throws SQLException { stmt.close(); } - protected void runQueryWithTrigger(final String query, final List setCmds, + void runQueryWithTrigger(final String query, final List setCmds, final String expect) throws Exception { @@ -149,7 +149,7 @@ protected void runQueryWithTrigger(final String query, final List setCmd abstract void setupTriggers(final List triggers) throws Exception; - protected List getConfigs(String... more) { + List getConfigs(String... more) { List setCmds = new ArrayList<>(); setCmds.add("set hive.exec.dynamic.partition.mode=nonstrict"); setCmds.add("set mapred.min.split.size=100"); diff --git a/itests/hive-unit/src/test/java/org/apache/hive/jdbc/TestTriggersMoveWorkloadManager.java b/itests/hive-unit/src/test/java/org/apache/hive/jdbc/TestTriggersMoveWorkloadManager.java new file mode 100644 index 0000000..6cdd1cb --- /dev/null +++ b/itests/hive-unit/src/test/java/org/apache/hive/jdbc/TestTriggersMoveWorkloadManager.java @@ -0,0 +1,160 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hive.jdbc; + +import static org.apache.hadoop.hive.ql.exec.tez.TestWorkloadManager.plan; +import static org.apache.hadoop.hive.ql.exec.tez.TestWorkloadManager.pool; + +import java.io.File; +import java.net.URL; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.concurrent.TimeUnit; + +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hive.conf.HiveConf; +import org.apache.hadoop.hive.conf.HiveConf.ConfVars; +import org.apache.hadoop.hive.metastore.api.WMFullResourcePlan; +import org.apache.hadoop.hive.metastore.api.WMMapping; +import org.apache.hadoop.hive.metastore.api.WMPool; +import org.apache.hadoop.hive.metastore.api.WMPoolTrigger; +import org.apache.hadoop.hive.metastore.api.WMTrigger; +import org.apache.hadoop.hive.ql.exec.tez.WorkloadManager; +import org.apache.hadoop.hive.ql.wm.Action; +import org.apache.hadoop.hive.ql.wm.ExecutionTrigger; +import org.apache.hadoop.hive.ql.wm.Expression; +import org.apache.hadoop.hive.ql.wm.ExpressionFactory; +import org.apache.hadoop.hive.ql.wm.Trigger; +import org.apache.hive.jdbc.miniHS2.MiniHS2; +import org.apache.hive.jdbc.miniHS2.MiniHS2.MiniClusterType; +import org.junit.BeforeClass; +import org.junit.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.google.common.collect.Lists; + +public class TestTriggersMoveWorkloadManager extends AbstractJdbcTriggersTest { + private final static Logger LOG = LoggerFactory.getLogger(TestTriggersMoveWorkloadManager.class); + + @BeforeClass + public static void beforeTest() throws Exception { + Class.forName(MiniHS2.getJdbcDriverName()); + + String confDir = "../../data/conf/llap/"; + HiveConf.setHiveSiteLocation(new URL("file://" + new File(confDir).toURI().getPath() + "/hive-site.xml")); + System.out.println("Setting hive-site: " + HiveConf.getHiveSiteLocation()); + + conf = new HiveConf(); + conf.setBoolVar(ConfVars.HIVE_SUPPORT_CONCURRENCY, false); + conf.setBoolVar(ConfVars.HIVE_SERVER2_ENABLE_DOAS, false); + conf.setTimeVar(ConfVars.HIVE_TRIGGER_VALIDATION_INTERVAL_MS, 100, TimeUnit.MILLISECONDS); + conf.setVar(ConfVars.HIVE_SERVER2_TEZ_INTERACTIVE_QUEUE, "default"); + conf.setBoolean("hive.test.workload.management", true); + conf.setBoolVar(ConfVars.TEZ_EXEC_SUMMARY, true); + conf.setBoolVar(ConfVars.HIVE_STRICT_CHECKS_CARTESIAN, false); + // don't want cache hits from llap io for testing filesystem bytes read counters + conf.setVar(ConfVars.LLAP_IO_MEMORY_MODE, "none"); + + conf.addResource(new URL("file://" + new File(confDir).toURI().getPath() + + "/tez-site.xml")); + + miniHS2 = new MiniHS2(conf, MiniClusterType.LLAP); + dataFileDir = conf.get("test.data.files").replace('\\', '/').replace("c:", ""); + kvDataFilePath = new Path(dataFileDir, "kv1.txt"); + + Map confOverlay = new HashMap<>(); + miniHS2.start(confOverlay); + miniHS2.getDFS().getFileSystem().mkdirs(new Path("/apps_staging_dir/anonymous")); + } + + @Test(timeout = 60000) + public void testTriggerMoveAndKill() throws Exception { + Expression moveExpression = ExpressionFactory.fromString("EXECUTION_TIME > 1000"); + Expression killExpression = ExpressionFactory.fromString("EXECUTION_TIME > 5000"); + Trigger moveTrigger = new ExecutionTrigger("slow_query_move", moveExpression, + new Action(Action.Type.MOVE_TO_POOL, "ETL")); + Trigger killTrigger = new ExecutionTrigger("slow_query_kill", killExpression, + new Action(Action.Type.KILL_QUERY)); + setupTriggers(Lists.newArrayList(moveTrigger, killTrigger), 
Lists.newArrayList(killTrigger)); + String query = "select sleep(t1.under_col, 5), t1.value from " + tableName + " t1 join " + tableName + + " t2 on t1.under_col>=t2.under_col"; + runQueryWithTrigger(query, null, killTrigger + " violated"); + } + + @Test(timeout = 60000) + public void testTriggerMoveEscapeKill() throws Exception { + Expression moveExpression = ExpressionFactory.fromString("HDFS_BYTES_READ > 100"); + Expression killExpression = ExpressionFactory.fromString("EXECUTION_TIME > 5000"); + Trigger moveTrigger = new ExecutionTrigger("move_big_read", moveExpression, + new Action(Action.Type.MOVE_TO_POOL, "ETL")); + Trigger killTrigger = new ExecutionTrigger("slow_query_kill", killExpression, + new Action(Action.Type.KILL_QUERY)); + setupTriggers(Lists.newArrayList(moveTrigger, killTrigger), Lists.newArrayList()); + String query = "select sleep(t1.under_col, 1), t1.value from " + tableName + " t1 join " + tableName + + " t2 on t1.under_col==t2.under_col"; + runQueryWithTrigger(query, null, null); + } + + @Test(timeout = 60000) + public void testTriggerMoveConflictKill() throws Exception { + Expression moveExpression = ExpressionFactory.fromString("HDFS_BYTES_READ > 100"); + Expression killExpression = ExpressionFactory.fromString("HDFS_BYTES_READ > 100"); + Trigger moveTrigger = new ExecutionTrigger("move_big_read", moveExpression, + new Action(Action.Type.MOVE_TO_POOL, "ETL")); + Trigger killTrigger = new ExecutionTrigger("kill_big_read", killExpression, + new Action(Action.Type.KILL_QUERY)); + setupTriggers(Lists.newArrayList(moveTrigger, killTrigger), Lists.newArrayList()); + String query = "select sleep(t1.under_col, 5), t1.value from " + tableName + " t1 join " + tableName + + " t2 on t1.under_col>=t2.under_col"; + runQueryWithTrigger(query, null, killTrigger + " violated"); + } + + @Override + protected void setupTriggers(final List triggers) throws Exception { + setupTriggers(triggers, new ArrayList<>()); + } + + private void setupTriggers(final List biTriggers, final List etlTriggers) throws Exception { + WorkloadManager wm = WorkloadManager.getInstance(); + WMPool biPool = pool("BI", 1, 0.8f); + WMPool etlPool = pool("ETL", 1, 0.2f); + WMFullResourcePlan plan = new WMFullResourcePlan(plan(), Lists.newArrayList(biPool, etlPool)); + plan.getPlan().setDefaultPoolPath("BI"); + + for (Trigger trigger : biTriggers) { + plan.addToTriggers(wmTriggerFromTrigger(trigger)); + plan.addToPoolTriggers(new WMPoolTrigger("BI", trigger.getName())); + } + + for (Trigger trigger : etlTriggers) { + plan.addToTriggers(wmTriggerFromTrigger(trigger)); + plan.addToPoolTriggers(new WMPoolTrigger("ETL", trigger.getName())); + } + wm.updateResourcePlanAsync(plan).get(10, TimeUnit.SECONDS); + } + + private WMTrigger wmTriggerFromTrigger(Trigger trigger) { + WMTrigger result = new WMTrigger("rp", trigger.getName()); + result.setTriggerExpression(trigger.getExpression().toString()); + result.setActionExpression(trigger.getAction().toString()); + LOG.debug("Produced " + result + " from " + trigger); + return result; + } +} \ No newline at end of file diff --git a/itests/hive-unit/src/test/java/org/apache/hive/jdbc/TestTriggersNoTezSessionPool.java b/itests/hive-unit/src/test/java/org/apache/hive/jdbc/TestTriggersNoTezSessionPool.java index fb3af932..bcce3dc 100644 --- a/itests/hive-unit/src/test/java/org/apache/hive/jdbc/TestTriggersNoTezSessionPool.java +++ b/itests/hive-unit/src/test/java/org/apache/hive/jdbc/TestTriggersNoTezSessionPool.java @@ -1,4 +1,4 @@ -/** +/* * Licensed to the Apache Software 
Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file * distributed with this work for additional information @@ -22,6 +22,7 @@ import java.util.List; import org.apache.hadoop.hive.ql.exec.tez.TezSessionPoolManager; +import org.apache.hadoop.hive.ql.wm.Action; import org.apache.hadoop.hive.ql.wm.ExecutionTrigger; import org.apache.hadoop.hive.ql.wm.Expression; import org.apache.hadoop.hive.ql.wm.ExpressionFactory; @@ -36,7 +37,7 @@ @Test(timeout = 60000) public void testTriggerSlowQueryExecutionTime() throws Exception { Expression expression = ExpressionFactory.fromString("EXECUTION_TIME > 1000"); - Trigger trigger = new ExecutionTrigger("slow_query", expression, Trigger.Action.KILL_QUERY); + Trigger trigger = new ExecutionTrigger("slow_query", expression, new Action(Action.Type.KILL_QUERY)); setupTriggers(Lists.newArrayList(trigger)); String query = "select sleep(t1.under_col, 5), t1.value from " + tableName + " t1 join " + tableName + " t2 on t1.under_col>=t2.under_col"; @@ -46,7 +47,7 @@ public void testTriggerSlowQueryExecutionTime() throws Exception { @Test(timeout = 60000) public void testTriggerTotalTasks() throws Exception { Expression expression = ExpressionFactory.fromString("TOTAL_TASKS > 50"); - Trigger trigger = new ExecutionTrigger("highly_parallel", expression, Trigger.Action.KILL_QUERY); + Trigger trigger = new ExecutionTrigger("highly_parallel", expression, new Action(Action.Type.KILL_QUERY)); setupTriggers(Lists.newArrayList(trigger)); String query = "select sleep(t1.under_col, 5), t1.value from " + tableName + " t1 join " + tableName + " t2 on t1.under_col>=t2.under_col"; diff --git a/itests/hive-unit/src/test/java/org/apache/hive/jdbc/TestTriggersTezSessionPoolManager.java b/itests/hive-unit/src/test/java/org/apache/hive/jdbc/TestTriggersTezSessionPoolManager.java index 7b87a65..b377275 100644 --- a/itests/hive-unit/src/test/java/org/apache/hive/jdbc/TestTriggersTezSessionPoolManager.java +++ b/itests/hive-unit/src/test/java/org/apache/hive/jdbc/TestTriggersTezSessionPoolManager.java @@ -23,6 +23,7 @@ import java.util.List; import org.apache.hadoop.hive.ql.exec.tez.TezSessionPoolManager; +import org.apache.hadoop.hive.ql.wm.Action; import org.apache.hadoop.hive.ql.wm.ExecutionTrigger; import org.apache.hadoop.hive.ql.wm.Expression; import org.apache.hadoop.hive.ql.wm.ExpressionFactory; @@ -37,7 +38,7 @@ @Test(timeout = 60000) public void testTriggerSlowQueryElapsedTime() throws Exception { Expression expression = ExpressionFactory.fromString("ELAPSED_TIME > 20000"); - Trigger trigger = new ExecutionTrigger("slow_query", expression, Trigger.Action.KILL_QUERY); + Trigger trigger = new ExecutionTrigger("slow_query", expression, new Action(Action.Type.KILL_QUERY)); setupTriggers(Lists.newArrayList(trigger)); String query = "select sleep(t1.under_col, 500), t1.value from " + tableName + " t1 join " + tableName + " t2 on t1.under_col>=t2.under_col"; @@ -47,7 +48,7 @@ public void testTriggerSlowQueryElapsedTime() throws Exception { @Test(timeout = 60000) public void testTriggerSlowQueryExecutionTime() throws Exception { Expression expression = ExpressionFactory.fromString("EXECUTION_TIME > 1000"); - Trigger trigger = new ExecutionTrigger("slow_query", expression, Trigger.Action.KILL_QUERY); + Trigger trigger = new ExecutionTrigger("slow_query", expression, new Action(Action.Type.KILL_QUERY)); setupTriggers(Lists.newArrayList(trigger)); String query = "select sleep(t1.under_col, 5), t1.value from " + tableName + " t1 join " + tableName + " t2 
on t1.under_col>=t2.under_col"; @@ -57,7 +58,7 @@ public void testTriggerSlowQueryExecutionTime() throws Exception { @Test(timeout = 60000) public void testTriggerHighShuffleBytes() throws Exception { Expression expression = ExpressionFactory.fromString("SHUFFLE_BYTES > 100"); - Trigger trigger = new ExecutionTrigger("big_shuffle", expression, Trigger.Action.KILL_QUERY); + Trigger trigger = new ExecutionTrigger("big_shuffle", expression, new Action(Action.Type.KILL_QUERY)); setupTriggers(Lists.newArrayList(trigger)); List cmds = new ArrayList<>(); cmds.add("set hive.auto.convert.join=false"); @@ -72,7 +73,7 @@ public void testTriggerHighShuffleBytes() throws Exception { @Test(timeout = 60000) public void testTriggerHighBytesRead() throws Exception { Expression expression = ExpressionFactory.fromString("HDFS_BYTES_READ > 100"); - Trigger trigger = new ExecutionTrigger("big_read", expression, Trigger.Action.KILL_QUERY); + Trigger trigger = new ExecutionTrigger("big_read", expression, new Action(Action.Type.KILL_QUERY)); setupTriggers(Lists.newArrayList(trigger)); String query = "select sleep(t1.under_col, 5), t1.value from " + tableName + " t1 join " + tableName + " t2 on t1.under_col>=t2.under_col"; @@ -82,7 +83,7 @@ public void testTriggerHighBytesRead() throws Exception { @Test(timeout = 60000) public void testTriggerHighBytesWrite() throws Exception { Expression expression = ExpressionFactory.fromString("FILE_BYTES_WRITTEN > 100"); - Trigger trigger = new ExecutionTrigger("big_write", expression, Trigger.Action.KILL_QUERY); + Trigger trigger = new ExecutionTrigger("big_write", expression, new Action(Action.Type.KILL_QUERY)); setupTriggers(Lists.newArrayList(trigger)); String query = "select sleep(t1.under_col, 5), t1.value from " + tableName + " t1 join " + tableName + " t2 on t1.under_col>=t2.under_col"; @@ -92,7 +93,7 @@ public void testTriggerHighBytesWrite() throws Exception { @Test(timeout = 60000) public void testTriggerTotalTasks() throws Exception { Expression expression = ExpressionFactory.fromString("TOTAL_TASKS > 50"); - Trigger trigger = new ExecutionTrigger("highly_parallel", expression, Trigger.Action.KILL_QUERY); + Trigger trigger = new ExecutionTrigger("highly_parallel", expression, new Action(Action.Type.KILL_QUERY)); setupTriggers(Lists.newArrayList(trigger)); String query = "select sleep(t1.under_col, 5), t1.value from " + tableName + " t1 join " + tableName + " t2 on t1.under_col>=t2.under_col"; @@ -102,11 +103,11 @@ public void testTriggerTotalTasks() throws Exception { @Test(timeout = 60000) public void testTriggerCustomReadOps() throws Exception { Expression expression = ExpressionFactory.fromString("HDFS_READ_OPS > 50"); - Trigger trigger = new ExecutionTrigger("high_read_ops", expression, Trigger.Action.KILL_QUERY); + Trigger trigger = new ExecutionTrigger("high_read_ops", expression, new Action(Action.Type.KILL_QUERY)); setupTriggers(Lists.newArrayList(trigger)); String query = "select sleep(t1.under_col, 5), t1.value from " + tableName + " t1 join " + tableName + " t2 on t1.under_col>=t2.under_col"; - runQueryWithTrigger(query, getConfigs(), "Query was cancelled"); + runQueryWithTrigger(query, getConfigs(), trigger + " violated"); } @Test(timeout = 120000) @@ -114,20 +115,20 @@ public void testTriggerCustomCreatedFiles() throws Exception { List cmds = getConfigs(); Expression expression = ExpressionFactory.fromString("CREATED_FILES > 5"); - Trigger trigger = new ExecutionTrigger("high_read_ops", expression, Trigger.Action.KILL_QUERY); + Trigger trigger = new 
ExecutionTrigger("high_read_ops", expression, new Action(Action.Type.KILL_QUERY)); setupTriggers(Lists.newArrayList(trigger)); String query = "create table testtab2 as select * from " + tableName; - runQueryWithTrigger(query, cmds, "Query was cancelled"); + runQueryWithTrigger(query, cmds, trigger + " violated"); // partitioned insert expression = ExpressionFactory.fromString("CREATED_FILES > 10"); - trigger = new ExecutionTrigger("high_read_ops", expression, Trigger.Action.KILL_QUERY); + trigger = new ExecutionTrigger("high_read_ops", expression, new Action(Action.Type.KILL_QUERY)); setupTriggers(Lists.newArrayList(trigger)); cmds.add("drop table src3"); cmds.add("create table src3 (key int) partitioned by (value string)"); query = "insert overwrite table src3 partition (value) select sleep(under_col, 10), value from " + tableName + " where under_col < 100"; - runQueryWithTrigger(query, cmds, "Query was cancelled"); + runQueryWithTrigger(query, cmds, trigger + " violated"); } @Test(timeout = 240000) @@ -140,9 +141,9 @@ public void testTriggerCustomCreatedDynamicPartitions() throws Exception { String query = "insert overwrite table src2 partition (value) select * from " + tableName + " where under_col < 100"; Expression expression = ExpressionFactory.fromString("CREATED_DYNAMIC_PARTITIONS > 20"); - Trigger trigger = new ExecutionTrigger("high_read_ops", expression, Trigger.Action.KILL_QUERY); + Trigger trigger = new ExecutionTrigger("high_read_ops", expression, new Action(Action.Type.KILL_QUERY)); setupTriggers(Lists.newArrayList(trigger)); - runQueryWithTrigger(query, cmds, "Query was cancelled"); + runQueryWithTrigger(query, cmds, trigger + " violated"); cmds = getConfigs(); // let it create 57 partitions without any triggers @@ -154,9 +155,9 @@ public void testTriggerCustomCreatedDynamicPartitions() throws Exception { // query will try to add 64 more partitions to already existing 57 partitions but will get cancelled for violation query = "insert into table src2 partition (value) select * from " + tableName + " where under_col < 200"; expression = ExpressionFactory.fromString("CREATED_DYNAMIC_PARTITIONS > 30"); - trigger = new ExecutionTrigger("high_read_ops", expression, Trigger.Action.KILL_QUERY); + trigger = new ExecutionTrigger("high_read_ops", expression, new Action(Action.Type.KILL_QUERY)); setupTriggers(Lists.newArrayList(trigger)); - runQueryWithTrigger(query, cmds, "Query was cancelled"); + runQueryWithTrigger(query, cmds, trigger + " violated"); // let it create 64 more partitions (total 57 + 64 = 121) without any triggers query = "insert into table src2 partition (value) select * from " + tableName + " where under_col < 200"; @@ -166,7 +167,7 @@ public void testTriggerCustomCreatedDynamicPartitions() throws Exception { // re-run insert into but this time no new partitions will be created, so there will be no violation query = "insert into table src2 partition (value) select * from " + tableName + " where under_col < 200"; expression = ExpressionFactory.fromString("CREATED_DYNAMIC_PARTITIONS > 10"); - trigger = new ExecutionTrigger("high_read_ops", expression, Trigger.Action.KILL_QUERY); + trigger = new ExecutionTrigger("high_read_ops", expression, new Action(Action.Type.KILL_QUERY)); setupTriggers(Lists.newArrayList(trigger)); runQueryWithTrigger(query, cmds, null); } @@ -184,9 +185,9 @@ public void testTriggerCustomCreatedDynamicPartitionsMultiInsert() throws Except " insert overwrite table src2 partition (value) select * where under_col < 100 " + " insert overwrite table 
src3 partition (value) select * where under_col >= 100 and under_col < 200"; Expression expression = ExpressionFactory.fromString("CREATED_DYNAMIC_PARTITIONS > 70"); - Trigger trigger = new ExecutionTrigger("high_partitions", expression, Trigger.Action.KILL_QUERY); + Trigger trigger = new ExecutionTrigger("high_partitions", expression, new Action(Action.Type.KILL_QUERY)); setupTriggers(Lists.newArrayList(trigger)); - runQueryWithTrigger(query, cmds, "Query was cancelled"); + runQueryWithTrigger(query, cmds, trigger + " violated"); } @Test(timeout = 60000) @@ -203,15 +204,15 @@ public void testTriggerCustomCreatedDynamicPartitionsUnionAll() throws Exception "union all " + "select * from " + tableName + " where under_col >= 100 and under_col < 200) temps"; Expression expression = ExpressionFactory.fromString("CREATED_DYNAMIC_PARTITIONS > 70"); - Trigger trigger = new ExecutionTrigger("high_partitions", expression, Trigger.Action.KILL_QUERY); + Trigger trigger = new ExecutionTrigger("high_partitions", expression, new Action(Action.Type.KILL_QUERY)); setupTriggers(Lists.newArrayList(trigger)); - runQueryWithTrigger(query, cmds, "Query was cancelled"); + runQueryWithTrigger(query, cmds, trigger + " violated"); } @Test(timeout = 60000) public void testTriggerCustomNonExistent() throws Exception { Expression expression = ExpressionFactory.fromString("OPEN_FILES > 50"); - Trigger trigger = new ExecutionTrigger("non_existent", expression, Trigger.Action.KILL_QUERY); + Trigger trigger = new ExecutionTrigger("non_existent", expression, new Action(Action.Type.KILL_QUERY)); setupTriggers(Lists.newArrayList(trigger)); String query = "select l.under_col, l.value from " + tableName + " l join " + tableName + " r on l.under_col>=r.under_col"; @@ -221,9 +222,9 @@ public void testTriggerCustomNonExistent() throws Exception { @Test(timeout = 60000) public void testMultipleTriggers1() throws Exception { Expression shuffleExpression = ExpressionFactory.fromString("HDFS_BYTES_READ > 1000000"); - Trigger shuffleTrigger = new ExecutionTrigger("big_shuffle", shuffleExpression, Trigger.Action.KILL_QUERY); + Trigger shuffleTrigger = new ExecutionTrigger("big_shuffle", shuffleExpression, new Action(Action.Type.KILL_QUERY)); Expression execTimeExpression = ExpressionFactory.fromString("EXECUTION_TIME > 1000"); - Trigger execTimeTrigger = new ExecutionTrigger("slow_query", execTimeExpression, Trigger.Action.KILL_QUERY); + Trigger execTimeTrigger = new ExecutionTrigger("slow_query", execTimeExpression, new Action(Action.Type.KILL_QUERY)); setupTriggers(Lists.newArrayList(shuffleTrigger, execTimeTrigger)); String query = "select sleep(t1.under_col, 5), t1.value from " + tableName + " t1 join " + tableName + " t2 on t1.under_col>=t2.under_col"; @@ -233,9 +234,9 @@ public void testMultipleTriggers1() throws Exception { @Test(timeout = 60000) public void testMultipleTriggers2() throws Exception { Expression shuffleExpression = ExpressionFactory.fromString("HDFS_BYTES_READ > 100"); - Trigger shuffleTrigger = new ExecutionTrigger("big_shuffle", shuffleExpression, Trigger.Action.KILL_QUERY); + Trigger shuffleTrigger = new ExecutionTrigger("big_shuffle", shuffleExpression, new Action(Action.Type.KILL_QUERY)); Expression execTimeExpression = ExpressionFactory.fromString("EXECUTION_TIME > 100000"); - Trigger execTimeTrigger = new ExecutionTrigger("slow_query", execTimeExpression, Trigger.Action.KILL_QUERY); + Trigger execTimeTrigger = new ExecutionTrigger("slow_query", execTimeExpression, new Action(Action.Type.KILL_QUERY)); 
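[Editor's note: the tests above all build triggers against the reworked trigger API, where the old `Trigger.Action` enum is replaced by an `Action` object carrying a `Type` plus an optional destination pool. A minimal self-contained sketch of what such a class can look like — a simplified stand-in for illustration, not the actual `org.apache.hadoop.hive.ql.wm.Action` source:]

```java
// Simplified, hypothetical stand-in for org.apache.hadoop.hive.ql.wm.Action:
// an action is a type plus, for MOVE_TO_POOL, a destination pool name.
public class Action {
  public enum Type { KILL_QUERY, MOVE_TO_POOL }

  private final Type type;
  private final String poolName; // only meaningful for MOVE_TO_POOL

  public Action(final Type type) {
    this(type, null);
  }

  public Action(final Type type, final String poolName) {
    if (type == Type.MOVE_TO_POOL && (poolName == null || poolName.isEmpty())) {
      throw new IllegalArgumentException("MOVE_TO_POOL requires a destination pool");
    }
    this.type = type;
    this.poolName = poolName;
  }

  public Type getType() {
    return type;
  }

  public String getPoolName() {
    return poolName;
  }

  @Override
  public String toString() {
    return type == Type.MOVE_TO_POOL ? "MOVE TO " + poolName : type.name();
  }
}
```

[Modeling the action as an object rather than an enum is what lets a move name its target pool, as in `new Action(Action.Type.MOVE_TO_POOL, "ETL")` in the tests above.]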
setupTriggers(Lists.newArrayList(shuffleTrigger, execTimeTrigger)); String query = "select sleep(t1.under_col, 5), t1.value from " + tableName + " t1 join " + tableName + " t2 on t1.under_col>=t2.under_col"; diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/KillMoveTriggerActionHandler.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/KillMoveTriggerActionHandler.java new file mode 100644 index 0000000..de3e4cf --- /dev/null +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/KillMoveTriggerActionHandler.java @@ -0,0 +1,76 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hive.ql.exec.tez; + +import java.util.Map; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.Future; + +import org.apache.hadoop.hive.ql.metadata.HiveException; +import org.apache.hadoop.hive.ql.wm.Trigger; +import org.apache.hadoop.hive.ql.wm.TriggerActionHandler; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.google.common.util.concurrent.SettableFuture; + +public class KillMoveTriggerActionHandler implements TriggerActionHandler { + private static final Logger LOG = LoggerFactory.getLogger(KillMoveTriggerActionHandler.class); + private final WorkloadManager wm; + + KillMoveTriggerActionHandler(final WorkloadManager wm) { + this.wm = wm; + } + + @Override + public void applyAction(final Map<TezSessionState, Trigger> queriesViolated) { + TezSessionState sessionState; + for (Map.Entry<TezSessionState, Trigger> entry : queriesViolated.entrySet()) { + switch (entry.getValue().getAction().getType()) { + case KILL_QUERY: + sessionState = entry.getKey(); + String queryId = sessionState.getTriggerContext().getQueryId(); + try { + sessionState.getKillQuery().killQuery(queryId, entry.getValue().getViolationMsg()); + } catch (HiveException e) { + LOG.warn("Unable to kill query {} for trigger violation", queryId, e); + } + break; + case MOVE_TO_POOL: + sessionState = entry.getKey(); + if (sessionState instanceof WmTezSession) { + WmTezSession wmTezSession = (WmTezSession) sessionState; + String destPoolName = entry.getValue().getAction().getPoolName(); + String srcPoolName = wmTezSession.getCurrentPoolName(); + Future<Boolean> moveFuture = wm.applyMoveSessionAsync(wmTezSession, destPoolName); + try { + // block to make sure move happened successfully + if (moveFuture.get()) { + LOG.info("Moved session {} to pool {}", wmTezSession.getSessionId(), wmTezSession.getCurrentPoolName()); + } + } catch (InterruptedException | ExecutionException e) { + LOG.error("Exception while moving session {} from pool {} to {}.", wmTezSession.getSessionId(), + srcPoolName, destPoolName, e); + } + } + break; + default: + throw new RuntimeException("Unsupported action: " + entry.getValue()); + } + } + } +} diff --git
a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/KillTriggerActionHandler.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/KillTriggerActionHandler.java index 474fae9..8c60b6f 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/KillTriggerActionHandler.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/KillTriggerActionHandler.java @@ -1,4 +1,4 @@ -/** +/* * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file * distributed with this work for additional information @@ -32,9 +32,9 @@ private static final Logger LOG = LoggerFactory.getLogger(KillTriggerActionHandler.class); @Override - public void applyAction(final Map<TezSessionState, Trigger.Action> queriesViolated) { - for (Map.Entry<TezSessionState, Trigger.Action> entry : queriesViolated.entrySet()) { - switch (entry.getValue()) { + public void applyAction(final Map<TezSessionState, Trigger> queriesViolated) { + for (Map.Entry<TezSessionState, Trigger> entry : queriesViolated.entrySet()) { + switch (entry.getValue().getAction().getType()) { case KILL_QUERY: TezSessionState sessionState = entry.getKey(); String queryId = sessionState.getTriggerContext().getQueryId(); @@ -42,7 +42,7 @@ public void applyAction(final Map queriesViolat KillQuery killQuery = sessionState.getKillQuery(); // if kill query is null then session might have been released to pool or closed already if (killQuery != null) { - sessionState.getKillQuery().killQuery(queryId, entry.getValue().getMsg()); + sessionState.getKillQuery().killQuery(queryId, entry.getValue().getViolationMsg()); } } catch (HiveException e) { LOG.warn("Unable to kill query {} for trigger violation"); diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/PerPoolTriggerValidatorRunnable.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/PerPoolTriggerValidatorRunnable.java new file mode 100644 index 0000000..212894c --- /dev/null +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/PerPoolTriggerValidatorRunnable.java @@ -0,0 +1,65 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License.
+ */ + +package org.apache.hadoop.hive.ql.exec.tez; + +import java.util.HashMap; +import java.util.Map; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; + +import org.apache.hadoop.hive.ql.wm.SessionTriggerProvider; +import org.apache.hadoop.hive.ql.wm.TriggerActionHandler; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class PerPoolTriggerValidatorRunnable implements Runnable { + protected static transient Logger LOG = LoggerFactory.getLogger(PerPoolTriggerValidatorRunnable.class); + private final WorkloadManager wm; + private final TriggerActionHandler triggerActionHandler; + private final Map<String, TriggerValidatorRunnable> poolValidators; + private final long triggerValidationIntervalMs; + + PerPoolTriggerValidatorRunnable(final WorkloadManager wm, final TriggerActionHandler triggerActionHandler, + final long triggerValidationIntervalMs) { + this.wm = wm; + this.triggerActionHandler = triggerActionHandler; + this.poolValidators = new HashMap<>(); + this.triggerValidationIntervalMs = triggerValidationIntervalMs; + } + + @Override + public void run() { + try { + Map<String, SessionTriggerProvider> poolStates = wm.getAllSessionTriggerProviders(); + ScheduledExecutorService validatorExecutorService = Executors.newScheduledThreadPool(poolStates.size()); + for (Map.Entry<String, SessionTriggerProvider> entry : poolStates.entrySet()) { + String poolName = entry.getKey(); + if (!poolValidators.containsKey(poolName)) { + LOG.info("Creating trigger validator for pool: {}", poolName); + TriggerValidatorRunnable poolValidator = new TriggerValidatorRunnable(entry.getValue(), triggerActionHandler); + validatorExecutorService.scheduleWithFixedDelay(poolValidator, triggerValidationIntervalMs, + triggerValidationIntervalMs, TimeUnit.MILLISECONDS); + poolValidators.put(poolName, poolValidator); + } + } + } catch (Throwable t) { + // if exception is thrown in scheduled tasks, no further tasks will be scheduled, hence this ugly catch + LOG.warn(PerPoolTriggerValidatorRunnable.class.getSimpleName() + " caught exception.", t); + } + } +} diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionPoolManager.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionPoolManager.java index be8b9a4..ad8a5c9 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionPoolManager.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionPoolManager.java @@ -18,11 +18,6 @@ package org.apache.hadoop.hive.ql.exec.tez; -import java.io.IOException; -import java.net.URISyntaxException; -import javax.security.auth.login.LoginException; -import org.apache.tez.dag.api.TezException; - import java.util.ArrayList; import java.util.Collections; import java.util.HashSet; @@ -30,6 +25,7 @@ import java.util.List; import java.util.Set; import java.util.concurrent.Semaphore; +import java.util.concurrent.TimeUnit; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hive.conf.HiveConf; @@ -193,10 +189,12 @@ public void initTriggers(final HiveConf conf) throws HiveException { } if (triggerValidatorRunnable == null) { + final long triggerValidationIntervalMs = HiveConf.getTimeVar(conf, ConfVars + .HIVE_TRIGGER_VALIDATION_INTERVAL_MS, TimeUnit.MILLISECONDS); sessionTriggerProvider = new SessionTriggerProvider(openSessions, globalTriggersFetcher.fetch()); triggerActionHandler = new KillTriggerActionHandler(); - triggerValidatorRunnable = new TriggerValidatorRunnable(getSessionTriggerProvider(), getTriggerActionHandler()); - startTriggerValidator(conf); +
triggerValidatorRunnable = new TriggerValidatorRunnable(sessionTriggerProvider, triggerActionHandler); + startTriggerValidator(triggerValidationIntervalMs); } } @@ -370,16 +368,6 @@ public void destroy(TezSessionState tezSessionState) throws Exception { } @Override - SessionTriggerProvider getSessionTriggerProvider() { - return sessionTriggerProvider; - } - - @Override - TriggerActionHandler getTriggerActionHandler() { - return triggerActionHandler; - } - - @Override TriggerValidatorRunnable getTriggerValidatorRunnable() { return triggerValidatorRunnable; } @@ -520,8 +508,8 @@ public void registerOpenSession(TezSessionPoolSession session) { private void updateSessionsTriggers() { if (sessionTriggerProvider != null && globalTriggersFetcher != null) { - sessionTriggerProvider.setOpenSessions(Collections.unmodifiableList(openSessions)); - sessionTriggerProvider.setActiveTriggers(Collections.unmodifiableList(globalTriggersFetcher.fetch())); + sessionTriggerProvider.setSessions(Collections.unmodifiableList(openSessions)); + sessionTriggerProvider.setTriggers(Collections.unmodifiableList(globalTriggersFetcher.fetch())); } } @@ -552,7 +540,7 @@ public void setGlobalTriggersFetcher(MetastoreGlobalTriggersFetcher metastoreGlo public List getTriggerCounterNames() { List counterNames = new ArrayList<>(); if (sessionTriggerProvider != null) { - List activeTriggers = sessionTriggerProvider.getActiveTriggers(); + List activeTriggers = sessionTriggerProvider.getTriggers(); for (Trigger trigger : activeTriggers) { counterNames.add(trigger.getExpression().getCounterLimit().getName()); } diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionPoolSession.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionPoolSession.java index 6887d7a..db2848c 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionPoolSession.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionPoolSession.java @@ -73,18 +73,12 @@ TezSessionState reopen(TezSessionState session, Configuration conf, } public static abstract class AbstractTriggerValidator { - abstract SessionTriggerProvider getSessionTriggerProvider(); + abstract Runnable getTriggerValidatorRunnable(); - abstract TriggerActionHandler getTriggerActionHandler(); - - abstract TriggerValidatorRunnable getTriggerValidatorRunnable(); - - public void startTriggerValidator(Configuration conf) { - long triggerValidationIntervalMs = HiveConf.getTimeVar(conf, - HiveConf.ConfVars.HIVE_TRIGGER_VALIDATION_INTERVAL_MS, TimeUnit.MILLISECONDS); + public void startTriggerValidator(long triggerValidationIntervalMs) { final ScheduledExecutorService scheduledExecutorService = Executors.newSingleThreadScheduledExecutor( new ThreadFactoryBuilder().setDaemon(true).setNameFormat("TriggerValidator").build()); - TriggerValidatorRunnable triggerValidatorRunnable = getTriggerValidatorRunnable(); + Runnable triggerValidatorRunnable = getTriggerValidatorRunnable(); scheduledExecutorService.scheduleWithFixedDelay(triggerValidatorRunnable, triggerValidationIntervalMs, triggerValidationIntervalMs, TimeUnit.MILLISECONDS); } diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TriggerValidatorRunnable.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TriggerValidatorRunnable.java index 33bccbe..f9fe438 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TriggerValidatorRunnable.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TriggerValidatorRunnable.java @@ -1,4 +1,4 @@ -/** +/* * Licensed to the Apache Software Foundation 
(ASF) under one * or more contributor license agreements. See the NOTICE file * distributed with this work for additional information @@ -20,6 +20,7 @@ import java.util.List; import java.util.Map; +import org.apache.hadoop.hive.ql.wm.Action; import org.apache.hadoop.hive.ql.wm.SessionTriggerProvider; import org.apache.hadoop.hive.ql.wm.Trigger; import org.apache.hadoop.hive.ql.wm.TriggerActionHandler; @@ -41,29 +42,48 @@ @Override public void run() { try { - Map<TezSessionState, Trigger.Action> violatedSessions = new HashMap<>(); - final List<TezSessionState> sessions = sessionTriggerProvider.getOpenSessions(); - final List<Trigger> triggers = sessionTriggerProvider.getActiveTriggers(); - for (TezSessionState s : sessions) { - TriggerContext triggerContext = s.getTriggerContext(); - if (triggerContext != null && !triggerContext.isQueryCompleted()) { + Map<TezSessionState, Trigger> violatedSessions = new HashMap<>(); + final List<TezSessionState> sessions = sessionTriggerProvider.getSessions(); + final List<Trigger> triggers = sessionTriggerProvider.getTriggers(); + for (TezSessionState sessionState : sessions) { + TriggerContext triggerContext = sessionState.getTriggerContext(); + if (triggerContext != null && !triggerContext.isQueryCompleted() + && !triggerContext.getCurrentCounters().isEmpty()) { Map<String, Long> currentCounters = triggerContext.getCurrentCounters(); - for (Trigger t : triggers) { - String desiredCounter = t.getExpression().getCounterLimit().getName(); + for (Trigger currentTrigger : triggers) { + String desiredCounter = currentTrigger.getExpression().getCounterLimit().getName(); // there could be interval where desired counter value is not populated by the time we make this check if (currentCounters.containsKey(desiredCounter)) { long currentCounterValue = currentCounters.get(desiredCounter); - if (t.apply(currentCounterValue)) { - String queryId = s.getTriggerContext().getQueryId(); - LOG.info("Query {} violated trigger {}. Current counter value: {}. Going to apply action {}", queryId, - t, currentCounterValue, t.getAction()); - Trigger.Action action = t.getAction(); - String msg = "Trigger " + t + " violated. Current value: " + currentCounterValue; - action.setMsg(msg); - violatedSessions.put(s, action); + if (currentTrigger.apply(currentCounterValue)) { + String queryId = sessionState.getTriggerContext().getQueryId(); + if (violatedSessions.containsKey(sessionState)) { + // session already has a violation + Trigger existingTrigger = violatedSessions.get(sessionState); + // KILL always takes priority over MOVE + if (existingTrigger.getAction().getType().equals(Action.Type.MOVE_TO_POOL) && + currentTrigger.getAction().getType().equals(Action.Type.KILL_QUERY)) { + currentTrigger.setViolationMsg("Trigger " + currentTrigger + " violated. Current value: " + + currentCounterValue); + violatedSessions.put(sessionState, currentTrigger); + LOG.info("KILL trigger replacing MOVE for query {}", queryId); + } + // if multiple MOVEs happen, only the first move will be chosen + } else { + // first violation for the session + currentTrigger.setViolationMsg("Trigger " + currentTrigger + " violated. Current value: " + + currentCounterValue); + violatedSessions.put(sessionState, currentTrigger); + } } } } + + Trigger chosenTrigger = violatedSessions.get(sessionState); + if (chosenTrigger != null) { + LOG.info("Query: {}. {}.
Applying action.", sessionState.getTriggerContext().getQueryId(), + chosenTrigger.getViolationMsg()); + } } } diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TriggerViolationActionHandler.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TriggerViolationActionHandler.java deleted file mode 100644 index c46569b..0000000 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TriggerViolationActionHandler.java +++ /dev/null @@ -1,48 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * http://www.apache.org/licenses/LICENSE-2.0 - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.hadoop.hive.ql.exec.tez; - -import java.util.Map; - -import org.apache.hadoop.hive.ql.metadata.HiveException; -import org.apache.hadoop.hive.ql.wm.Trigger; -import org.apache.hadoop.hive.ql.wm.TriggerActionHandler; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -public class TriggerViolationActionHandler implements TriggerActionHandler { - private static final Logger LOG = LoggerFactory.getLogger(TriggerViolationActionHandler.class); - - @Override - public void applyAction(final Map queriesViolated) { - for (Map.Entry entry : queriesViolated.entrySet()) { - switch (entry.getValue()) { - case KILL_QUERY: - TezSessionState sessionState = entry.getKey(); - String queryId = sessionState.getTriggerContext().getQueryId(); - try { - sessionState.getKillQuery().killQuery(queryId, entry.getValue().getMsg()); - } catch (HiveException e) { - LOG.warn("Unable to kill query {} for trigger violation"); - } - break; - default: - throw new RuntimeException("Unsupported action: " + entry.getValue()); - } - } - } -} diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/WmTezSession.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/WmTezSession.java index 0dd1433..30f7e50 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/WmTezSession.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/WmTezSession.java @@ -20,8 +20,6 @@ import com.google.common.util.concurrent.ListenableFuture; -import org.apache.hadoop.hive.ql.exec.Utilities; - import com.google.common.annotations.VisibleForTesting; import com.google.common.util.concurrent.SettableFuture; @@ -32,7 +30,8 @@ import org.apache.hadoop.hive.registry.impl.TezAmInstance; public class WmTezSession extends TezSessionPoolSession implements AmPluginNode { - private String poolName; + private String currentPoolName; + private String originPoolName; private double clusterFraction; private String killReason = null; @@ -111,12 +110,20 @@ public AmPluginInfo getAmPluginInfo() { return amPluginInfo; // Only has final fields, no artifacts from the absence of sync. 
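[Editor's note: the `TriggerValidatorRunnable` hunk above resolves conflicts when several triggers fire for the same session within one validation pass. A self-contained sketch of that resolution rule, using hypothetical stand-in types (`ViolationResolver`, `Violation` are illustrative names, not Hive classes):]

```java
// Per session, the first violated trigger wins, except that a KILL_QUERY
// violation always replaces an earlier MOVE_TO_POOL violation.
import java.util.HashMap;
import java.util.Map;

public class ViolationResolver {
  enum ActionType { KILL_QUERY, MOVE_TO_POOL }

  static final class Violation {
    final String triggerName;
    final ActionType action;
    Violation(String triggerName, ActionType action) {
      this.triggerName = triggerName;
      this.action = action;
    }
  }

  private final Map<String, Violation> violatedSessions = new HashMap<>();

  void record(String sessionId, Violation current) {
    Violation existing = violatedSessions.get(sessionId);
    if (existing == null) {
      // first violation for this session
      violatedSessions.put(sessionId, current);
    } else if (existing.action == ActionType.MOVE_TO_POOL
        && current.action == ActionType.KILL_QUERY) {
      // KILL always takes priority over MOVE
      violatedSessions.put(sessionId, current);
    }
    // otherwise keep the existing choice: among multiple MOVEs the first
    // one wins, and nothing displaces a KILL
  }

  Violation chosen(String sessionId) {
    return violatedSessions.get(sessionId);
  }

  public static void main(String[] args) {
    ViolationResolver r = new ViolationResolver();
    r.record("s1", new Violation("move_big_read", ActionType.MOVE_TO_POOL));
    r.record("s1", new Violation("slow_query_kill", ActionType.KILL_QUERY));
    System.out.println(r.chosen("s1").triggerName); // prints slow_query_kill
  }
}
```

[The asymmetry is deliberate: a kill is final and must win over a pending move, while honoring a second move would just thrash the session between pools.]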
} - void setPoolName(String poolName) { - this.poolName = poolName; + void setCurrentPoolName(String currentPoolName) { + this.currentPoolName = currentPoolName; + } + + String getCurrentPoolName() { + return currentPoolName; + } + + public void setOriginPoolName(final String originPoolName) { + this.originPoolName = originPoolName; } - String getPoolName() { - return poolName; + public String getOriginPoolName() { + return originPoolName; } void setClusterFraction(double fraction) { @@ -124,7 +131,7 @@ void setClusterFraction(double fraction) { } void clearWm() { - this.poolName = null; + this.currentPoolName = null; this.clusterFraction = 0f; } @@ -188,6 +195,12 @@ void setIsIrrelevantForWm(String killReason) { this.killReason = killReason; } + @Override + public String toString() { + return super.toString() + ", currentPoolName: " + currentPoolName + ", originPoolName: " + originPoolName + ", " + + "clusterFraction: " + clusterFraction; + } + private final class TimeoutRunnable implements Runnable { @Override public void run() { diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/WorkloadManager.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/WorkloadManager.java index 039881f..1587e76 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/WorkloadManager.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/WorkloadManager.java @@ -1,4 +1,4 @@ -/** +/* * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file * distributed with this work for additional information @@ -17,31 +17,8 @@ */ package org.apache.hadoop.hive.ql.exec.tez; -import org.apache.hadoop.hive.ql.exec.tez.UserPoolMapping.MappingInput; -import org.apache.hadoop.hive.ql.wm.ExpressionFactory; +import static org.apache.hadoop.hive.ql.exec.tez.UserPoolMapping.MappingInput; -import org.apache.hadoop.hive.ql.wm.Trigger.Action; - -import org.apache.hadoop.hive.ql.wm.ExecutionTrigger; - -import org.apache.hadoop.hive.metastore.api.WMPoolTrigger; - -import org.apache.hadoop.hive.metastore.api.WMTrigger; - -import org.apache.commons.lang3.StringUtils; - -import org.apache.hadoop.hive.metastore.api.WMPool; - -import org.apache.hadoop.hive.metastore.api.WMFullResourcePlan; - -import org.apache.hadoop.hive.ql.session.SessionState; - -import com.google.common.annotations.VisibleForTesting; -import com.google.common.collect.Sets; -import com.google.common.util.concurrent.FutureCallback; -import com.google.common.util.concurrent.Futures; -import com.google.common.util.concurrent.ListenableFuture; -import com.google.common.util.concurrent.SettableFuture; import java.util.ArrayList; import java.util.Collections; import java.util.Comparator; @@ -52,20 +29,27 @@ import java.util.List; import java.util.Map; import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.Future; import java.util.concurrent.ScheduledExecutorService; -import java.util.concurrent.ThreadFactory; import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.locks.Condition; import java.util.concurrent.locks.ReentrantLock; + +import org.apache.commons.lang.StringUtils; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hive.conf.HiveConf; import org.apache.hadoop.hive.conf.HiveConf.ConfVars; +import 
org.apache.hadoop.hive.metastore.api.WMFullResourcePlan; +import org.apache.hadoop.hive.metastore.api.WMPool; +import org.apache.hadoop.hive.metastore.api.WMPoolTrigger; +import org.apache.hadoop.hive.metastore.api.WMTrigger; import org.apache.hadoop.hive.ql.metadata.HiveException; +import org.apache.hadoop.hive.ql.session.SessionState; +import org.apache.hadoop.hive.ql.wm.ExecutionTrigger; import org.apache.hadoop.hive.ql.wm.SessionTriggerProvider; import org.apache.hadoop.hive.ql.wm.Trigger; import org.apache.hadoop.hive.ql.wm.TriggerActionHandler; @@ -73,10 +57,18 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import com.google.common.annotations.VisibleForTesting; +import com.google.common.collect.Sets; +import com.google.common.util.concurrent.FutureCallback; +import com.google.common.util.concurrent.Futures; +import com.google.common.util.concurrent.ListenableFuture; +import com.google.common.util.concurrent.SettableFuture; +import com.google.common.util.concurrent.ThreadFactoryBuilder; + /** Workload management entry point for HS2. */ public class WorkloadManager extends TezSessionPoolSession.AbstractTriggerValidator - implements TezSessionPoolSession.Manager, SessionExpirationTracker.RestartImpl { + implements TezSessionPoolSession.Manager, SessionExpirationTracker.RestartImpl { private static final Logger LOG = LoggerFactory.getLogger(WorkloadManager.class); // TODO: this is a temporary setting that will go away, so it's not in HiveConf. public static final String TEST_WM_CONFIG = "hive.test.workload.management"; @@ -93,7 +85,7 @@ // sessions, so the pool itself could internally track the sessions it gave out, since // calling close on an unopened session is probably harmless. private final IdentityHashMap openSessions = - new IdentityHashMap<>(); + new IdentityHashMap<>(); private final int amRegistryTimeoutMs; // Note: pools can only be modified by the master thread. @@ -104,9 +96,7 @@ // We index the get requests to make sure there are no ordering artifacts when we requeue. private final AtomicLong getRequestVersion = new AtomicLong(Long.MIN_VALUE); - private SessionTriggerProvider sessionTriggerProvider; - private TriggerActionHandler triggerActionHandler; - private TriggerValidatorRunnable triggerValidatorRunnable; + private PerPoolTriggerValidatorRunnable triggerValidatorRunnable; // Note: we could use RW lock to allow concurrent calls for different sessions, however all // those calls do is add elements to lists and maps; and we'd need to sync those separately @@ -118,6 +108,7 @@ private final EventState one = new EventState(), two = new EventState(); private boolean hasChanges = false; private EventState current = one; + private Map perPoolProviders = new ConcurrentHashMap<>(); /** The master thread the processes the events from EventState. */ @VisibleForTesting @@ -143,6 +134,7 @@ public void onFailure(Throwable t) { // TODO: this is temporary before HiveServerEnvironment is merged. 
private static volatile WorkloadManager INSTANCE; + public static WorkloadManager getInstance() { return INSTANCE; } @@ -170,52 +162,29 @@ public static WorkloadManager create(String yarnQueue, HiveConf conf, WMFullReso LOG.info("Initializing with " + totalQueryParallelism + " total query parallelism"); this.amRegistryTimeoutMs = (int)HiveConf.getTimeVar( - conf, ConfVars.HIVE_SERVER2_TEZ_WM_AM_REGISTRY_TIMEOUT, TimeUnit.MILLISECONDS); + conf, ConfVars.HIVE_SERVER2_TEZ_WM_AM_REGISTRY_TIMEOUT, TimeUnit.MILLISECONDS); tezAmPool = new TezSessionPool<>(conf, totalQueryParallelism, true, - new TezSessionPool.SessionObjectFactory() { - @Override - public WmTezSession create(WmTezSession oldSession) { - return createSession(oldSession == null ? null : oldSession.getConf()); - } - }); + oldSession -> createSession(oldSession == null ? null : oldSession.getConf())); restrictedConfig = new RestrictedConfigChecker(conf); allocationManager = qam; // Only creates the expiration tracker if expiration is configured. expirationTracker = SessionExpirationTracker.create(conf, this); - ThreadFactory workerFactory = new ThreadFactory() { - private final AtomicInteger threadNumber = new AtomicInteger(-1); - @Override - public Thread newThread(Runnable r) { - Thread t = new Thread(r, "Workload management worker " + threadNumber.incrementAndGet()); - t.setDaemon(true); - return t; - } - }; - workPool = Executors.newFixedThreadPool(HiveConf.getIntVar(conf, - ConfVars.HIVE_SERVER2_TEZ_WM_WORKER_THREADS), workerFactory); - ThreadFactory timeoutFactory = new ThreadFactory() { - @Override - public Thread newThread(Runnable r) { - Thread t = new Thread(r, "Workload management timeout thread"); - t.setDaemon(true); - return t; - } - }; - timeoutPool = Executors.newScheduledThreadPool(1, timeoutFactory); - wmThread = new Thread(new Runnable() { - @Override - public void run() { - runWmThread(); - } - }, "Workload management master"); + workPool = Executors.newFixedThreadPool(HiveConf.getIntVar(conf, ConfVars.HIVE_SERVER2_TEZ_WM_WORKER_THREADS), + new ThreadFactoryBuilder().setDaemon(true).setNameFormat("Workload management worker %d").build()); + + timeoutPool = Executors.newSingleThreadScheduledExecutor(new ThreadFactoryBuilder().setDaemon(true) + .setNameFormat("Workload management timeout thread").build()); + + wmThread = new Thread(() -> runWmThread(), "Workload management master"); wmThread.setDaemon(true); - // TODO: add support for per pool action handler and triggers fetcher (+atomic update to active triggers) - sessionTriggerProvider = new SessionTriggerProvider(); - triggerActionHandler = new TriggerViolationActionHandler(); - triggerValidatorRunnable = new TriggerValidatorRunnable( - getSessionTriggerProvider(), getTriggerActionHandler()); - startTriggerValidator(conf); + + final long triggerValidationIntervalMs = HiveConf.getTimeVar(conf, + HiveConf.ConfVars.HIVE_TRIGGER_VALIDATION_INTERVAL_MS, TimeUnit.MILLISECONDS); + TriggerActionHandler triggerActionHandler = new KillMoveTriggerActionHandler(this); + triggerValidatorRunnable = new PerPoolTriggerValidatorRunnable(this, triggerActionHandler, + triggerValidationIntervalMs); + startTriggerValidator(triggerValidationIntervalMs); } private int determineQueryParallelism(WMFullResourcePlan plan) { @@ -257,18 +226,62 @@ public void stop() throws Exception { INSTANCE = null; } + private void updateSessionTriggerProviders() { + currentLock.lock(); + try { + if (pools != null) { + for (Map.Entry entry : pools.entrySet()) { + String poolName = entry.getKey(); + 
PoolState poolState = entry.getValue(); + final List<Trigger> triggers = Collections.unmodifiableList(poolState.getTriggers()); + final List<TezSessionState> sessionStates = Collections.unmodifiableList(poolState.getSessions()); + if (perPoolProviders.containsKey(poolName)) { + perPoolProviders.get(poolName).setTriggers(triggers); + perPoolProviders.get(poolName).setSessions(sessionStates); + } else { + SessionTriggerProvider sessionTriggerProvider = new SessionTriggerProvider(sessionStates, triggers); + perPoolProviders.put(poolName, sessionTriggerProvider); + } + } + } + } finally { + currentLock.unlock(); + } + } + + public Map<String, SessionTriggerProvider> getAllSessionTriggerProviders() { + return perPoolProviders; + } + /** Represent a single iteration of work for the master thread. */ private final static class EventState { private final Set<WmTezSession> toReturn = Sets.newIdentityHashSet(), - toDestroy = Sets.newIdentityHashSet(), updateErrors = Sets.newIdentityHashSet(); + toDestroy = Sets.newIdentityHashSet(), updateErrors = Sets.newIdentityHashSet(); private final LinkedList<SessionInitContext> initResults = new LinkedList<>(); private final IdentityHashMap<WmTezSession, SettableFuture<WmTezSession>> toReopen = - new IdentityHashMap<>(); + new IdentityHashMap<>(); private final LinkedList<GetRequest> getRequests = new LinkedList<>(); private final IdentityHashMap<WmTezSession, GetRequest> toReuse = new IdentityHashMap<>(); private WMFullResourcePlan resourcePlanToApply = null; private boolean hasClusterStateChanged = false; private SettableFuture<Boolean> testEvent, applyRpFuture; + private MoveSession moveSession = null; + private SettableFuture<Boolean> moveSessionFuture = null; + } + + private final static class MoveSession { + private final WmTezSession srcSession; + private final String destPool; + + public MoveSession(final WmTezSession srcSession, final String destPool) { + this.srcSession = srcSession; + this.destPool = destPool; + } + + @Override + public String toString() { + return srcSession.getSessionId() + " moving from " + srcSession.getCurrentPoolName() + " to " + destPool; + } } /** @@ -277,7 +290,7 @@ public void stop() throws Exception { */ private final static class WmThreadSyncWork { private LinkedList<WmTezSession> toRestartInUse = new LinkedList<>(), - toDestroyNoRestart = new LinkedList<>(); + toDestroyNoRestart = new LinkedList<>(); } private void runWmThread() { @@ -303,6 +316,7 @@ private void runWmThread() { LOG.debug("Processing current events"); processCurrentEvents(currentEvents, syncWork); scheduleWork(syncWork); + updateSessionTriggerProviders(); } catch (InterruptedException ex) { LOG.warn("WM thread was interrupted and will now exit"); return; @@ -333,6 +347,7 @@ private void scheduleWork(WmThreadSyncWork context) { try { // Note: sessions in toRestart are always in use, so they cannot expire in parallel.
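[Editor's note on `updateSessionTriggerProviders()` above: the WM master thread republishes, per pool, unmodifiable session and trigger lists that the scheduled validators then read without taking the WM lock. A simplified sketch of that snapshot-publishing pattern, assuming a single writer thread; names are illustrative, and the patch itself reuses one mutable `SessionTriggerProvider` per pool rather than replacing whole snapshots:]

```java
// One writer (the master thread) publishes immutable per-pool snapshots into a
// concurrent map; validator threads read the latest fully-built snapshot.
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

public class PoolSnapshotRegistry<S, T> {
  public static final class Snapshot<S, T> {
    final List<S> sessions;
    final List<T> triggers;
    Snapshot(List<S> sessions, List<T> triggers) {
      this.sessions = sessions;
      this.triggers = triggers;
    }
  }

  private final Map<String, Snapshot<S, T>> byPool = new ConcurrentHashMap<>();

  // Called by the single writer after pool state changes.
  public void publish(String poolName, List<S> sessions, List<T> triggers) {
    byPool.put(poolName, new Snapshot<>(
        Collections.unmodifiableList(new ArrayList<>(sessions)),
        Collections.unmodifiableList(new ArrayList<>(triggers))));
  }

  // Called by validator threads; sees the latest complete snapshot for the pool.
  public Snapshot<S, T> get(String poolName) {
    return byPool.get(poolName);
  }
}
```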
tezAmPool.replaceSession(toRestart, false, null); + updateSessionTriggerProviders(); } catch (Exception ex) { LOG.error("Failed to restart an old session; ignoring " + ex.getMessage()); } @@ -345,6 +360,7 @@ private void scheduleWork(WmThreadSyncWork context) { workPool.submit(() -> { try { toDestroy.close(false); + updateSessionTriggerProviders(); } catch (Exception ex) { LOG.error("Failed to close an old session; ignoring " + ex.getMessage()); } @@ -377,7 +393,7 @@ private void processCurrentEvents(EventState e, WmThreadSyncWork syncWork) throw } LOG.debug("Destroying {}", sessionToDestroy); Boolean shouldReturn = handleReturnedInUseSessionOnMasterThread( - e, sessionToDestroy, poolsToRedistribute); + e, sessionToDestroy, poolsToRedistribute); if (shouldReturn == null || shouldReturn) { // Restart if this session is still relevant, even if there's an internal error. syncWork.toRestartInUse.add(sessionToDestroy); @@ -389,7 +405,7 @@ private void processCurrentEvents(EventState e, WmThreadSyncWork syncWork) throw for (WmTezSession sessionToReturn: e.toReturn) { LOG.debug("Returning {}", sessionToReturn); Boolean shouldReturn = handleReturnedInUseSessionOnMasterThread( - e, sessionToReturn, poolsToRedistribute); + e, sessionToReturn, poolsToRedistribute); if (shouldReturn == null) { // Restart if there's an internal error. syncWork.toRestartInUse.add(sessionToReturn); @@ -407,7 +423,7 @@ private void processCurrentEvents(EventState e, WmThreadSyncWork syncWork) throw for (Map.Entry> entry : e.toReopen.entrySet()) { LOG.debug("Reopening {}", entry.getKey()); handeReopenRequestOnMasterThread( - e, entry.getKey(), entry.getValue(), poolsToRedistribute, syncWork); + e, entry.getKey(), entry.getValue(), poolsToRedistribute, syncWork); } e.toReopen.clear(); @@ -428,6 +444,11 @@ private void processCurrentEvents(EventState e, WmThreadSyncWork syncWork) throw } e.resourcePlanToApply = null; + if (e.moveSession != null) { + handleMoveSessionOnMasterThread(e, poolsToRedistribute); + } + e.moveSession = null; + // 6. Handle all the get/reuse requests. We won't actually give out anything here, but merely // map all the requests and place them in an appropriate order in pool queues. The only // exception is the reuse without queue contention; can be granted immediately. 
If we can't @@ -436,7 +457,7 @@ private void processCurrentEvents(EventState e, WmThreadSyncWork syncWork) throw GetRequest req; while ((req = e.getRequests.pollFirst()) != null) { LOG.debug("Processing a new get request from " + req.mappingInput); - queueGetRequestOnMasterThread(req, poolsToRedistribute, syncWork); + queueGetRequestOnMasterThread(req, poolsToRedistribute, syncWork, e); } e.toReuse.clear(); @@ -466,10 +487,59 @@ private void processCurrentEvents(EventState e, WmThreadSyncWork syncWork) throw } } + private void handleMoveSessionOnMasterThread(final EventState e, + final HashSet poolsToRedistribute) { + MoveSession moveSession = e.moveSession; + SettableFuture moveSessionFuture = e.moveSessionFuture; + String destPoolName = e.moveSession.destPool; + String srcPoolName = e.moveSession.srcSession.getCurrentPoolName(); + String errMsg; + LOG.info("Handling move session event: {}", e.moveSession); + if (validMove(moveSession.srcSession, destPoolName)) { + // 1) Remove the session from its current pool. Since this is a move (isMove = true), + // loan bookkeeping is preserved: the session keeps its origin pool, if one is set. + Boolean removed = checkAndRemoveSessionFromItsPool(moveSession.srcSession, poolsToRedistribute, true); + if (removed != null && removed) { + // 2) Add the session to the destination pool. If it has no origin pool yet, the source + // pool is recorded as its origin and the session joins that pool's loaned sessions list. + Boolean added = checkAndAddSessionToAnotherPool(moveSession.srcSession, srcPoolName, destPoolName, + poolsToRedistribute); + if (added != null && added) { + moveSessionFuture.set(true); + return; + } else { + errMsg = "Unable to add session: " + moveSession.srcSession + " to destPoolName: " + destPoolName; + } + } else { + errMsg = "Unable to remove session: " + moveSession.srcSession + " from its own pool"; + } + } else { + errMsg = "Validation failed for move. Session: " + moveSession.srcSession + " destPoolName: " + destPoolName; + } + + LOG.error("Move {} failed with err: {}", e.moveSession, errMsg); + // Note: a SettableFuture can only be completed once, so failure is reported via set(false); + // a subsequent setException() would be a no-op. + moveSessionFuture.set(false); + } + + private boolean validMove(final WmTezSession srcSession, final String destPool) { + return srcSession != null && + destPool != null && + !srcSession.isIrrelevantForWm() && + srcSession.getCurrentPoolName() != null && + pools.containsKey(srcSession.getCurrentPoolName()) && + pools.containsKey(destPool) && + !srcSession.getCurrentPoolName().equalsIgnoreCase(destPool); + } + // ========= Master thread methods private void handleInitResultOnMasterThread( - SessionInitContext sw, WmThreadSyncWork syncWork, HashSet poolsToRedistribute) { + SessionInitContext sw, WmThreadSyncWork syncWork, HashSet poolsToRedistribute) { // For the failures, the users have been notified, we just need to clean up. There's no // session here (or it's unused), so no conflicts are possible. We just remove it. // For successes, the user has also been notified, so various requests are also possible; @@ -488,13 +558,13 @@ private void handleInitResultOnMasterThread( sw.lock.unlock(); } LOG.debug("Processing " + ((session == null) ? "failed" : "successful") - + " initialization result for pool " + sw.poolName); + + " initialization result for pool " + sw.poolName); // We could not have removed the pool for this session, or we would have CANCELED the init.
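For a concrete walk-through of handleMoveSessionOnMasterThread above, consider a session s moving from a pool "A" to a pool "B" (pool names hypothetical):

    // validMove(s, "B"): s is non-null, still relevant for WM, currently in "A",
    // both "A" and "B" exist in pools, and "A" differs from "B" -> true
    checkAndRemoveSessionFromItsPool(s, poolsToRedistribute, true);    // s leaves "A"; loan bookkeeping kept
    checkAndAddSessionToAnotherPool(s, "A", "B", poolsToRedistribute); // s joins "B"; origin recorded as "A"
    // afterwards s.getCurrentPoolName() == "B" and s.getOriginPoolName() == "A",
    // and both pools are redistributed in this master-thread iteration.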
PoolState pool = pools.get(sw.poolName); if (pool == null || !pool.initializingSessions.remove(sw)) { // Query parallelism might be fubar. LOG.error("Cannot remove initializing session from the pool " - + sw.poolName + " - internal error"); + + sw.poolName + " - internal error"); } poolsToRedistribute.add(sw.poolName); if (session != null) { @@ -509,7 +579,7 @@ private void handleInitResultOnMasterThread( } private Boolean handleReturnedInUseSessionOnMasterThread( - EventState e, WmTezSession session, HashSet poolsToRedistribute) { + EventState e, WmTezSession session, HashSet poolsToRedistribute) { // This handles the common logic for destroy and return - everything except // the invalid combination of destroy and return themselves, as well as the actual // statement that destroys or returns it. @@ -524,12 +594,12 @@ private Boolean handleReturnedInUseSessionOnMasterThread( if (reuseRequest != null) { reuseRequest.future.setException(new AssertionError("Invalid reuse attempt")); } - return checkAndRemoveSessionFromItsPool(session, poolsToRedistribute); + return checkAndRemoveSessionFromItsPool(session, poolsToRedistribute, false); } private void handeReopenRequestOnMasterThread(EventState e, WmTezSession session, - SettableFuture future, HashSet poolsToRedistribute, - WmThreadSyncWork syncWork) throws Exception { + SettableFuture future, HashSet poolsToRedistribute, + WmThreadSyncWork syncWork) throws Exception { if (e.updateErrors.remove(session)) { LOG.debug("Ignoring an update error for a session being reopened"); } @@ -540,8 +610,8 @@ private void handeReopenRequestOnMasterThread(EventState e, WmTezSession session // In order to expedite things in a general case, we are not actually going to reopen // anything. Instead, we will try to give out an existing session from the pool, and restart // the problematic one in background. - String poolName = session.getPoolName(); - Boolean isRemoved = checkAndRemoveSessionFromItsPool(session, poolsToRedistribute); + String poolName = session.getCurrentPoolName(); + Boolean isRemoved = checkAndRemoveSessionFromItsPool(session, poolsToRedistribute, false); // If we fail to remove, it's probably an internal error. We'd try to handle it the same way // as above - by restarting the session. We'd fail the caller to avoid exceeding parallelism. if (isRemoved == null) { @@ -550,7 +620,7 @@ private void handeReopenRequestOnMasterThread(EventState e, WmTezSession session return; } else if (!isRemoved) { future.setException(new RuntimeException("WM killed this session during reopen: " - + session.getReasonForKill())); + + session.getReasonForKill())); return; // No longer relevant for WM - bail. } // If pool didn't exist, removeSessionFromItsPool would have returned null. @@ -564,7 +634,7 @@ private void handeReopenRequestOnMasterThread(EventState e, WmTezSession session } private void handleUpdateErrorOnMasterThread(WmTezSession sessionWithUpdateError, - EventState e, WmThreadSyncWork syncWork, HashSet poolsToRedistribute) { + EventState e, WmThreadSyncWork syncWork, HashSet poolsToRedistribute) { GetRequest reuseRequest = e.toReuse.remove(sessionWithUpdateError); if (reuseRequest != null) { // This session is bad, so don't allow reuse; just convert it to normal get. @@ -573,7 +643,7 @@ private void handleUpdateErrorOnMasterThread(WmTezSession sessionWithUpdateError // TODO: we should communicate this to the user more explicitly (use kill query API, or // add an option for bg kill checking to TezTask/monitor? 
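For reference, the Boolean (rather than boolean) results threaded through these handlers are tri-state: TRUE means the session was found in its pool and removed; FALSE means WM had already killed it and it is now irrelevant, so callers bail out; null signals an internal error (the session claims a pool it was not actually in), in which case callers typically fail the request and schedule the session for a restart.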
// We are assuming the update-error AM is bad and just try to kill it. - Boolean isRemoved = checkAndRemoveSessionFromItsPool(sessionWithUpdateError, poolsToRedistribute); + Boolean isRemoved = checkAndRemoveSessionFromItsPool(sessionWithUpdateError, poolsToRedistribute, false); if (isRemoved != null && !isRemoved) { // An update error for some session that was actually already killed by us. return; @@ -587,7 +657,7 @@ private void handleUpdateErrorOnMasterThread(WmTezSession sessionWithUpdateError } private void applyNewResourcePlanOnMasterThread( - EventState e, WmThreadSyncWork syncWork, HashSet poolsToRedistribute) { + EventState e, WmThreadSyncWork syncWork, HashSet poolsToRedistribute) { int totalQueryParallelism = 0; // FIXME: Add Triggers from metastore to poolstate // Note: we assume here that plan has been validated beforehand, so we don't verify @@ -637,9 +707,7 @@ private void applyNewResourcePlanOnMasterThread( if (e.resourcePlanToApply.isSetTriggers() && e.resourcePlanToApply.isSetPoolTriggers()) { Map triggers = new HashMap<>(); for (WMTrigger trigger : e.resourcePlanToApply.getTriggers()) { - // TODO: parse trigger.getActionExpression() correctly; right now the Action enum is invalid. - ExecutionTrigger execTrigger = new ExecutionTrigger(trigger.getTriggerName(), - ExpressionFactory.fromString(trigger.getTriggerExpression()), Action.KILL_QUERY); + ExecutionTrigger execTrigger = ExecutionTrigger.fromWMTrigger(trigger); triggers.put(trigger.getTriggerName(), execTrigger); } for (WMPoolTrigger poolTrigger : e.resourcePlanToApply.getPoolTriggers()) { @@ -672,7 +740,7 @@ private void applyNewResourcePlanOnMasterThread( } if (deltaSessions != 0) { failOnFutureFailure(tezAmPool.resizeAsync( - deltaSessions, syncWork.toDestroyNoRestart)); + deltaSessions, syncWork.toDestroyNoRestart)); } } @@ -682,7 +750,8 @@ private void failOnFutureFailure(ListenableFuture future) { } private void queueGetRequestOnMasterThread( - GetRequest req, HashSet poolsToRedistribute, WmThreadSyncWork syncWork) { + GetRequest req, HashSet poolsToRedistribute, WmThreadSyncWork syncWork, + final EventState e) { String poolName = userPoolMapping.mapSessionToPoolName(req.mappingInput); if (poolName == null) { req.future.setException(new NoPoolMappingException( @@ -701,9 +770,9 @@ private void queueGetRequestOnMasterThread( if (req.sessionToReuse != null) { // Given that we are trying to reuse, this session MUST be in some pool.sessions. // Kills that could have removed it must have cleared sessionToReuse. - String oldPoolName = req.sessionToReuse.getPoolName(); + String oldPoolName = req.sessionToReuse.getCurrentPoolName(); oldPool = pools.get(oldPoolName); - Boolean isRemoved = checkAndRemoveSessionFromItsPool(req.sessionToReuse, poolsToRedistribute); + Boolean isRemoved = checkAndRemoveSessionFromItsPool(req.sessionToReuse, poolsToRedistribute, false); if (isRemoved == null || !isRemoved) { // This is probably an internal error... abandon the reuse attempt. returnSessionOnFailedReuse(req, syncWork, null); @@ -719,7 +788,7 @@ private void queueGetRequestOnMasterThread( if (req.sessionToReuse != null) { // If we can immediately reuse a session, there's nothing to wait for - just return. 
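A note on the renames in this hunk and below: getPoolName/setPoolName become getCurrentPoolName/setCurrentPoolName because, once moves exist, the pool a session currently runs in can differ from the origin pool that loaned it out, which is tracked separately via getOriginPoolName/setOriginPoolName.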
- req.sessionToReuse.setPoolName(poolName); + req.sessionToReuse.setCurrentPoolName(poolName); req.sessionToReuse.setQueueName(yarnQueue); pool.sessions.add(req.sessionToReuse); if (pool != oldPool) { @@ -728,6 +797,22 @@ req.future.set(req.sessionToReuse); return; } + + // Before adding the new request to the queue, check whether this pool has loaned out sessions + // while all of its capacity is in use. If so, kill the most recently loaned session (it is likely + // to have done the least work) and reclaim it for this pool. + // TODO: pick the victim by priority or policy (e.g. shortest running); for now use the most recently loaned. + if (pool.getTotalActiveSessions() == pool.queryParallelism && !pool.loanedSessions.isEmpty()) { + LOG.info("Sessions exhausted in pool {}; reclaiming the most recently loaned session", pool); + WmTezSession recentlyLoanedSession = pool.getLoanedSessions().removeLast(); + recentlyLoanedSession.setOriginPoolName(null); + Boolean removed = checkAndRemoveSessionFromItsPool(recentlyLoanedSession, poolsToRedistribute, false); + if (removed != null && removed) { + recentlyLoanedSession.setCurrentPoolName(poolName); + syncWork.toRestartInUse.add(recentlyLoanedSession); + LOG.info("Returning recently loaned session: {}", recentlyLoanedSession); + } + } + // In any case, queue the request and make sure we update this pool. pool.queue.addLast(req); poolsToRedistribute.add(poolName); @@ -735,13 +820,14 @@ private void processPoolChangesOnMasterThread( - String poolName, WmThreadSyncWork context, boolean hasRequeues) throws Exception { + String poolName, WmThreadSyncWork context, boolean hasRequeues) throws Exception { PoolState pool = pools.get(poolName); if (pool == null) return; // Might be from before the new resource plan. // 1. First, start the queries from the queue. int queriesToStart = Math.min(pool.queue.size(), - pool.queryParallelism - pool.getTotalActiveSessions()); + pool.queryParallelism - pool.getTotalActiveSessions()); + if (queriesToStart > 0) { LOG.debug("Starting {} queries in pool {}", queriesToStart, pool); } @@ -777,21 +863,13 @@ private void processPoolChangesOnMasterThread( // Note: If allocation manager does not have cluster state, it won't update anything. When the // cluster state changes, it will notify us, and we'd update the queries again. allocationManager.updateSessionsAsync(totalAlloc, pool.sessions); - - // 3. Update triggers for this pool. - // TODO: need to merge with per-pool enforcement, it will only work for one pool for now. - if (sessionTriggerProvider != null) { - sessionTriggerProvider.setOpenSessions( - Collections.unmodifiableList(pool.sessions)); - sessionTriggerProvider.setActiveTriggers(Collections.unmodifiableList(pool.triggers)); - } } private void returnSessionOnFailedReuse( - GetRequest req, WmThreadSyncWork syncWork, HashSet poolsToRedistribute) { + GetRequest req, WmThreadSyncWork syncWork, HashSet poolsToRedistribute) { if (req.sessionToReuse == null) return; if (poolsToRedistribute != null) { - Boolean isRemoved = checkAndRemoveSessionFromItsPool(req.sessionToReuse, poolsToRedistribute); + Boolean isRemoved = checkAndRemoveSessionFromItsPool(req.sessionToReuse, poolsToRedistribute, false); // The session cannot have been killed; this happens after all the kills in the current // iteration, so we would have cleared sessionToReuse when killing this.
assert isRemoved == null || isRemoved; @@ -809,24 +887,63 @@ private void returnSessionOnFailedReuse( * in WM but wasn't found in the requisite pool (internal error?). */ private Boolean checkAndRemoveSessionFromItsPool( - WmTezSession session, HashSet poolsToRedistribute) { + WmTezSession session, HashSet poolsToRedistribute, boolean isMove) { // It is possible for some request to be queued after a main thread has decided to kill this // session; on the next iteration, we'd be processing that request with an irrelevant session. if (session.isIrrelevantForWm()) { return false; } // If we did not kill this session we expect everything to be present. - String poolName = session.getPoolName(); - session.clearWm(); + String poolName = session.getCurrentPoolName(); if (poolName != null) { poolsToRedistribute.add(poolName); PoolState pool = pools.get(poolName); - if (pool != null && pool.sessions.remove(session)) return true; + session.clearWm(); + if (pool != null && pool.sessions.remove(session)) { + // if this is a loaned session and is being returned after use, remove it from the loaned session list. + if (session.getOriginPoolName() != null && !isMove) { + PoolState originPool = pools.get(session.getOriginPoolName()); + if (originPool != null) { + originPool.loanedSessions.remove(session); + session.setOriginPoolName(null); + } + } + return true; + } } LOG.error("Session was not in the pool (internal error) " + poolName + ": " + session); return null; } + private Boolean checkAndAddSessionToAnotherPool( + WmTezSession session, String srcPoolName, String destPoolName, HashSet poolsToRedistribute) { + // It is possible for some request to be queued after a main thread has decided to kill this + // session; on the next iteration, we'd be processing that request with an irrelevant session. 
+ if (session.isIrrelevantForWm()) { + return false; + } + + PoolState destPool = pools.get(destPoolName); + PoolState srcPool = pools.get(srcPoolName); + if (destPool != null && destPool.sessions.add(session)) { + if (srcPool != null) { + // if origin pool is already set, then this session is already a loaned session (moving to a different pool + // doesn't change the origin pool; in fact, returned sessions always go back to the origin pool, not to the + // pool they came from) + if (session.getOriginPoolName() == null) { + srcPool.getLoanedSessions().add(session); + srcPool.getSessions().remove(session); + session.setOriginPoolName(srcPoolName); + } + } + session.setCurrentPoolName(destPoolName); + poolsToRedistribute.add(destPoolName); + return true; + } + LOG.error("Session {} was not added to pool {}", session, destPoolName); + return null; + } + // ===== EVENT METHODS public ListenableFuture updateResourcePlanAsync(WMFullResourcePlan plan) { @@ -838,7 +955,7 @@ private Boolean checkAndRemoveSessionFromItsPool( if (current.resourcePlanToApply != null) { LOG.warn("Several resource plans are being applied at the same time; using the latest"); current.applyRpFuture.setException( - new HiveException("Another plan was applied in parallel")); + new HiveException("Another plan was applied in parallel")); } current.resourcePlanToApply = plan; current.applyRpFuture = applyRpFuture; @@ -849,6 +966,21 @@ private Boolean checkAndRemoveSessionFromItsPool( return applyRpFuture; } + public Future applyMoveSessionAsync(WmTezSession srcSession, String destPoolName) { + SettableFuture moveSessionFuture = SettableFuture.create(); + currentLock.lock(); + try { + if (current.moveSession != null) { + // Mirror updateResourcePlanAsync above: don't silently drop a pending move request. + current.moveSessionFuture.setException(new HiveException("Another move was requested in parallel")); + } + MoveSession moveSession = new MoveSession(srcSession, destPoolName); + current.moveSession = moveSession; + current.moveSessionFuture = moveSessionFuture; + LOG.info("Queued move session: {}", moveSession); + notifyWmThreadUnderLock(); + } finally { + currentLock.unlock(); + } + return moveSessionFuture; + } + private final static class GetRequest { public static final Comparator ORDER_COMPARATOR = new Comparator() { @Override @@ -990,12 +1122,13 @@ public void notifyInitializationCompleted(SessionInitContext initCtx) { @Override public TezSessionState reopen(TezSessionState session, Configuration conf, - String[] additionalFiles) throws Exception { + String[] additionalFiles) throws Exception { WmTezSession wmTezSession = ensureOwnedSession(session); HiveConf sessionConf = wmTezSession.getConf(); if (sessionConf == null) { LOG.warn("Session configuration is null for " + wmTezSession); sessionConf = new HiveConf(conf, WorkloadManager.class); + } // TODO: ideally, we should handle reopen the same way no matter what.
However, the cases // with additional files will have to wait until HIVE-17827 is unfucked, because it's @@ -1124,28 +1257,27 @@ protected final HiveConf getConf() { return conf; } - public List getTriggerCounterNames() { - List activeTriggers = sessionTriggerProvider.getActiveTriggers(); - List counterNames = new ArrayList<>(); - for (Trigger trigger : activeTriggers) { - counterNames.add(trigger.getExpression().getCounterLimit().getName()); + public List getTriggerCounterNames(final TezSessionState session) { + if (session instanceof WmTezSession) { + WmTezSession wmTezSession = (WmTezSession) session; + String poolName = wmTezSession.getCurrentPoolName(); + PoolState poolState = pools.get(poolName); + if (poolState != null) { + List counterNames = new ArrayList<>(); + List triggers = poolState.getTriggers(); + if (triggers != null) { + for (Trigger trigger : triggers) { + counterNames.add(trigger.getExpression().getCounterLimit().getName()); + } + } + return counterNames; + } } - return counterNames; - } - - - @Override - SessionTriggerProvider getSessionTriggerProvider() { - return sessionTriggerProvider; - } - - @Override - TriggerActionHandler getTriggerActionHandler() { - return triggerActionHandler; + return null; } @Override - TriggerValidatorRunnable getTriggerValidatorRunnable() { + Runnable getTriggerValidatorRunnable() { return triggerValidatorRunnable; } @@ -1158,6 +1290,7 @@ TriggerValidatorRunnable getTriggerValidatorRunnable() { private final LinkedList initializingSessions = new LinkedList<>(); // Note: the list is expected to be a few items; if it's longer we may want an IHM. private final LinkedList sessions = new LinkedList<>(); + private final LinkedList loanedSessions = new LinkedList<>(); private final LinkedList queue = new LinkedList<>(); private final String fullName; @@ -1172,11 +1305,11 @@ public PoolState(String fullName, int queryParallelism, double fraction) { } public int getTotalActiveSessions() { - return sessions.size() + initializingSessions.size(); + return sessions.size() + initializingSessions.size() + loanedSessions.size(); } public void update(int queryParallelism, double fraction, - List toKill, EventState e) { + List toKill, EventState e) { this.finalFraction = this.finalFractionRemaining = fraction; this.queryParallelism = queryParallelism; // TODO: two possible improvements @@ -1196,7 +1329,7 @@ public void update(int queryParallelism, double fraction, } public void destroy(List toKill, LinkedList globalQueue, - IdentityHashMap toReuse) { + IdentityHashMap toReuse) { extractAllSessionsToKill("The query pool was removed by administrator", toReuse, toKill); // All the pending get requests should just be requeued elsewhere. // Note that we never queue session reuse so sessionToReuse would be null. 
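Since getTotalActiveSessions() now counts loaned sessions, a loan keeps consuming the origin pool's parallelism until the session is reclaimed or returned. A worked example, with hypothetical state for a pool A whose queryParallelism is 2:

    // sessions = [s1], initializingSessions = [], loanedSessions = [s2]
    // getTotalActiveSessions() == 1 + 0 + 1 == 2 == queryParallelism
    // A new get request against A therefore takes the reclaim path in
    // queueGetRequestOnMasterThread: s2 leaves loanedSessions, its origin pool is
    // cleared, its current pool becomes "A", and it is restarted via
    // syncWork.toRestartInUse; the incoming request is then queued as usual.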
@@ -1215,16 +1348,20 @@ public double updateAllocationPercentages() { return finalFractionRemaining - allocation * initializingSessions.size(); } + public LinkedList getSessions() { + return sessions; + } + @Override public String toString() { return "[" + fullName + ", query parallelism " + queryParallelism - + ", fraction of the cluster " + finalFraction + ", fraction used by child pools " - + (finalFraction - finalFractionRemaining) + ", active sessions " + sessions.size() - + ", initializing sessions " + initializingSessions.size() + "]"; + + ", fraction of the cluster " + finalFraction + ", fraction used by child pools " + + (finalFraction - finalFractionRemaining) + ", active sessions " + sessions.size() + + ", initializing sessions " + initializingSessions.size() + ", loaned sessions " + loanedSessions.size() + "]"; } private void extractAllSessionsToKill(String killReason, - IdentityHashMap toReuse, List toKill) { + IdentityHashMap toReuse, List toKill) { for (WmTezSession sessionToKill : sessions) { resetRemovedSession(sessionToKill, killReason, toReuse); toKill.add(sessionToKill); @@ -1244,7 +1381,7 @@ private void extractAllSessionsToKill(String killReason, } private void resetRemovedSession(WmTezSession sessionToKill, String killReason, - IdentityHashMap toReuse) { + IdentityHashMap toReuse) { assert killReason != null; sessionToKill.setIsIrrelevantForWm(killReason); sessionToKill.clearWm(); @@ -1261,6 +1398,10 @@ public void setTriggers(final LinkedList triggers) { public List getTriggers() { return triggers; } + + public LinkedList getLoanedSessions() { + return loanedSessions; + } } @@ -1298,65 +1439,65 @@ public void onSuccess(WmTezSession session) { try { oldState = state; switch (oldState) { + case GETTING: { + assert this.state == SessionInitState.GETTING; + session.setCurrentPoolName(poolName); + session.setQueueName(yarnQueue); + this.session = session; + this.state = SessionInitState.WAITING_FOR_REGISTRY; + LOG.debug("Received a session from AM pool {}", session); + break; + } + case WAITING_FOR_REGISTRY: { + assert this.session != null; + this.state = SessionInitState.DONE; + assert session == this.session; + future = this.future; + this.future = null; + break; + } + case CANCELED: { + future = this.future; + this.session = null; + this.future = null; + break; + } + default: { + future = this.future; + this.future = null; + break; + } + } + } finally { + lock.unlock(); + } + switch (oldState) { case GETTING: { - LOG.debug("Received a session from AM pool {}", session); - assert this.state == SessionInitState.GETTING; - session.setPoolName(poolName); - session.setQueueName(yarnQueue); - this.session = session; - this.state = SessionInitState.WAITING_FOR_REGISTRY; + ListenableFuture waitFuture = session.waitForAmRegistryAsync( + amRegistryTimeoutMs, timeoutPool); + Futures.addCallback(waitFuture, this); break; } case WAITING_FOR_REGISTRY: { - assert this.session != null; - this.state = SessionInitState.DONE; - assert session == this.session; - future = this.future; - this.future = null; + // Notify the master thread and the user. + future.set(session); + notifyInitializationCompleted(this); break; } case CANCELED: { - future = this.future; - this.session = null; - this.future = null; + // Return session to the pool; we can do it directly here. 
+ future.setException(new HiveException( + "The query was killed by workload management: " + cancelReason)); + session.setCurrentPoolName(null); + session.setClusterFraction(0f); + tezAmPool.returnSession(session); break; } default: { - future = this.future; - this.future = null; - break; + AssertionError error = new AssertionError("Unexpected state " + state); + future.setException(error); + throw error; } - } - } finally { - lock.unlock(); - } - switch (oldState) { - case GETTING: { - ListenableFuture waitFuture = session.waitForAmRegistryAsync( - amRegistryTimeoutMs, timeoutPool); - Futures.addCallback(waitFuture, this); - break; - } - case WAITING_FOR_REGISTRY: { - // Notify the master thread and the user. - future.set(session); - notifyInitializationCompleted(this); - break; - } - case CANCELED: { - // Return session to the pool; we can do it directly here. - future.setException(new HiveException( - "The query was killed by workload management: " + cancelReason)); - session.setPoolName(null); - session.setClusterFraction(0f); - tezAmPool.returnSession(session); - break; - } - default: { - AssertionError error = new AssertionError("Unexpected state " + state); - future.setException(error); - throw error; - } } } diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/WorkloadManagerFederation.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/WorkloadManagerFederation.java index 70adc33..6ae7456 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/WorkloadManagerFederation.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/WorkloadManagerFederation.java @@ -39,7 +39,7 @@ public static TezSessionState getSession(TezSessionState session, HiveConf conf, try { // Note: this may just block to wait for a session based on parallelism. TezSessionState result = wm.getSession(session, input, conf); - desiredCounters.addAll(wm.getTriggerCounterNames()); + desiredCounters.addAll(wm.getTriggerCounterNames(result)); return result; } catch (WorkloadManager.NoPoolMappingException ex) { return getUnmanagedSession(session, conf, desiredCounters, isUnmanagedLlapMode); diff --git a/ql/src/java/org/apache/hadoop/hive/ql/wm/Action.java b/ql/src/java/org/apache/hadoop/hive/ql/wm/Action.java new file mode 100644 index 0000000..921ad54 --- /dev/null +++ b/ql/src/java/org/apache/hadoop/hive/ql/wm/Action.java @@ -0,0 +1,107 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hive.ql.wm; + +import java.util.Objects; + +/** + * Action that gets invoked for trigger violations. 
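To illustrate the class that follows: fromMetastoreExpression parses metastore action strings built from the display names "KILL" and "MOVE TO" (the pool name "etl" below is hypothetical):

    Action kill = Action.fromMetastoreExpression("KILL");        // Type.KILL_QUERY
    Action move = Action.fromMetastoreExpression("MOVE TO etl"); // Type.MOVE_TO_POOL, pool "etl"
    move.toString();                                             // "MOVE TO etl"
    move.equals(new Action(Action.Type.MOVE_TO_POOL, "etl"));    // true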
+ */ +public class Action { + + public enum Type { + KILL_QUERY("KILL"), + MOVE_TO_POOL("MOVE TO"); + + String displayName; + + Type(final String displayName) { + this.displayName = displayName; + } + + public String getDisplayName() { + return displayName; + } + + @Override + public String toString() { + return displayName; + } + } + + private final Type type; + private final String poolName; + + public static Action fromMetastoreExpression(String metastoreActionExpression) { + if (metastoreActionExpression.equalsIgnoreCase(Type.KILL_QUERY.getDisplayName())) { + return new Action(Type.KILL_QUERY); + } else if (metastoreActionExpression.toUpperCase().startsWith(Type.MOVE_TO_POOL.getDisplayName())) { + final String poolName = metastoreActionExpression.substring(Type.MOVE_TO_POOL.getDisplayName().length()).trim(); + if (poolName.isEmpty()) { + throw new IllegalArgumentException("Invalid move action expression (" + metastoreActionExpression + "). Pool " + + "name is empty"); + } + return new Action(Type.MOVE_TO_POOL, poolName); + } else { + // Neither KILL nor MOVE TO: reject explicitly instead of mis-parsing the expression. + throw new IllegalArgumentException("Invalid action expression: " + metastoreActionExpression); + } + } + + public Action(Type type) { + this(type, null); + } + + public Action(Type type, String poolName) { + this.type = type; + if (type == Type.MOVE_TO_POOL && (poolName == null || poolName.trim().isEmpty())) { + throw new IllegalArgumentException("Pool name cannot be null or empty for action type " + type); + } + this.poolName = poolName; + } + + public Type getType() { + return type; + } + + public String getPoolName() { + return poolName; + } + + @Override + public boolean equals(final Object other) { + if (other == null || !(other instanceof Action)) { + return false; + } + + if (other == this) { + return true; + } + + Action otherAction = (Action) other; + return type == otherAction.type && Objects.equals(poolName, otherAction.poolName); + } + + @Override + public int hashCode() { + int hash = poolName == null ? 31 : 31 * poolName.hashCode(); + hash += type == null ? 31 * hash : 31 * hash * type.hashCode(); + return hash; + } + + @Override + public String toString() { + return type.getDisplayName() + (poolName == null ? "" : " " + poolName); + } +} diff --git a/ql/src/java/org/apache/hadoop/hive/ql/wm/CustomCounterLimit.java b/ql/src/java/org/apache/hadoop/hive/ql/wm/CustomCounterLimit.java index ad1f7fc..fae8dfe 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/wm/CustomCounterLimit.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/wm/CustomCounterLimit.java @@ -1,4 +1,4 @@ -/** +/* * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file * distributed with this work for additional information @@ -23,7 +23,7 @@ private String counterName; private long limit; - public CustomCounterLimit(final String counterName, final long limit) { + CustomCounterLimit(final String counterName, final long limit) { this.counterName = counterName; this.limit = limit; } diff --git a/ql/src/java/org/apache/hadoop/hive/ql/wm/ExecutionTrigger.java b/ql/src/java/org/apache/hadoop/hive/ql/wm/ExecutionTrigger.java index 3529011..05b3d3c 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/wm/ExecutionTrigger.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/wm/ExecutionTrigger.java @@ -1,4 +1,4 @@ -/** +/* * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements.
See the NOTICE file * distributed with this work for additional information @@ -17,6 +17,8 @@ import java.util.Objects; +import org.apache.hadoop.hive.metastore.api.WMTrigger; + /** * Trigger with query level scope that contains a name, trigger expression violating which defined action will be * executed. @@ -25,6 +27,7 @@ private String name; private Expression expression; private Action action; + private String violationMsg; public ExecutionTrigger(final String name, final Expression expression, final Action action) { this.name = name; @@ -53,6 +56,16 @@ public Trigger clone() { } @Override + public String getViolationMsg() { + return violationMsg; + } + + @Override + public void setViolationMsg(final String violationMsg) { + this.violationMsg = violationMsg; + } + + @Override public boolean apply(final long current) { return expression.evaluate(current); } @@ -85,4 +98,11 @@ public boolean equals(final Object other) { Objects.equals(expression, otherQR.expression) && Objects.equals(action, otherQR.action); } + + public static ExecutionTrigger fromWMTrigger(final WMTrigger trigger) { + final Action action = Action.fromMetastoreExpression(trigger.getActionExpression()); + ExecutionTrigger execTrigger = new ExecutionTrigger(trigger.getTriggerName(), + ExpressionFactory.fromString(trigger.getTriggerExpression()), action); + return execTrigger; + } } diff --git a/ql/src/java/org/apache/hadoop/hive/ql/wm/Expression.java b/ql/src/java/org/apache/hadoop/hive/ql/wm/Expression.java index add933f..76753488 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/wm/Expression.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/wm/Expression.java @@ -1,4 +1,4 @@ -/** +/* * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file * distributed with this work for additional information @@ -47,7 +47,7 @@ public String getSymbol() { * else false otherwise * * @param current - current value against which expression will be evaluated - * @return + * @return true if current value exceeds limit */ boolean evaluate(final long current); diff --git a/ql/src/java/org/apache/hadoop/hive/ql/wm/ExpressionFactory.java b/ql/src/java/org/apache/hadoop/hive/ql/wm/ExpressionFactory.java index 29f7c89..953faa8 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/wm/ExpressionFactory.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/wm/ExpressionFactory.java @@ -1,4 +1,4 @@ -/** +/* * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file * distributed with this work for additional information diff --git a/ql/src/java/org/apache/hadoop/hive/ql/wm/FileSystemCounterLimit.java b/ql/src/java/org/apache/hadoop/hive/ql/wm/FileSystemCounterLimit.java index 656747e..e6ea997 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/wm/FileSystemCounterLimit.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/wm/FileSystemCounterLimit.java @@ -1,4 +1,4 @@ -/** +/* * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file * distributed with this work for additional information @@ -30,13 +30,13 @@ private FSCounter fsCounter; private long limit; - public FileSystemCounterLimit(final String scheme, final FSCounter fsCounter, final long limit) { + FileSystemCounterLimit(final String scheme, final FSCounter fsCounter, final long limit) { this.scheme = scheme == null || scheme.isEmpty() ? 
"" : scheme.toUpperCase(); this.fsCounter = fsCounter; this.limit = limit; } - public static FileSystemCounterLimit fromName(final String counterName, final long limit) { + static FileSystemCounterLimit fromName(final String counterName, final long limit) { String counterNameStr = counterName.toUpperCase(); for (FSCounter fsCounter : FSCounter.values()) { if (counterNameStr.endsWith(fsCounter.name())) { diff --git a/ql/src/java/org/apache/hadoop/hive/ql/wm/MetastoreGlobalTriggersFetcher.java b/ql/src/java/org/apache/hadoop/hive/ql/wm/MetastoreGlobalTriggersFetcher.java index db1a037..87c007f 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/wm/MetastoreGlobalTriggersFetcher.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/wm/MetastoreGlobalTriggersFetcher.java @@ -1,4 +1,4 @@ -/** +/* * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file * distributed with this work for additional information @@ -15,6 +15,7 @@ */ package org.apache.hadoop.hive.ql.wm; +import java.util.ArrayList; import java.util.List; import org.apache.hadoop.hive.ql.metadata.Hive; @@ -22,21 +23,16 @@ /** * Fetch global (non-llap) rules from metastore */ -public class MetastoreGlobalTriggersFetcher implements TriggersFetcher { - public static final String GLOBAL_TRIGGER_NAME = "global"; - private final MetastoreResourcePlanTriggersFetcher rpTriggersFetcher; +public class MetastoreGlobalTriggersFetcher { + private static final String GLOBAL_TRIGGER_NAME = "global"; + private Hive db; public MetastoreGlobalTriggersFetcher(final Hive db) { - this.rpTriggersFetcher = new MetastoreResourcePlanTriggersFetcher(db); - } - - @Override - public List fetch(final String ignore) { - return fetch(); + this.db = db; } public List fetch() { - // TODO: - return rpTriggersFetcher.fetch(GLOBAL_TRIGGER_NAME); + // TODO: this entire class will go away, DDLTask will push RP to TezSessionPoolManager where triggers are available + return new ArrayList<>(); } } diff --git a/ql/src/java/org/apache/hadoop/hive/ql/wm/MetastoreResourcePlanTriggersFetcher.java b/ql/src/java/org/apache/hadoop/hive/ql/wm/MetastoreResourcePlanTriggersFetcher.java deleted file mode 100644 index db390f2..0000000 --- a/ql/src/java/org/apache/hadoop/hive/ql/wm/MetastoreResourcePlanTriggersFetcher.java +++ /dev/null @@ -1,38 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * http://www.apache.org/licenses/LICENSE-2.0 - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package org.apache.hadoop.hive.ql.wm; - -import java.util.ArrayList; -import java.util.List; - -import org.apache.hadoop.hive.ql.metadata.Hive; - -/** - * Fetch pool specific rules from metastore - */ -public class MetastoreResourcePlanTriggersFetcher implements TriggersFetcher { - private final Hive db; - - public MetastoreResourcePlanTriggersFetcher(final Hive db) { - this.db = db; - } - - @Override - public List fetch(final String resourcePlanName) { - // TODO: implement after integration. - return new ArrayList<>(); - } -} diff --git a/ql/src/java/org/apache/hadoop/hive/ql/wm/SessionTriggerProvider.java b/ql/src/java/org/apache/hadoop/hive/ql/wm/SessionTriggerProvider.java index 408aa2d..06562cf 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/wm/SessionTriggerProvider.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/wm/SessionTriggerProvider.java @@ -1,4 +1,4 @@ -/** +/* * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file * distributed with this work for additional information @@ -15,9 +15,6 @@ */ package org.apache.hadoop.hive.ql.wm; -import java.util.ArrayList; -import java.util.Collections; -import java.util.LinkedList; import java.util.List; import org.apache.hadoop.hive.ql.exec.tez.TezSessionState; @@ -26,31 +23,27 @@ * Implementation for providing current open sessions and active trigger. */ public class SessionTriggerProvider { - private List openSessions = new ArrayList<>(); - private List activeTriggers = new ArrayList<>(); - - public SessionTriggerProvider() { - - } + private List sessions; + private List triggers; public SessionTriggerProvider(final List openSessions, final List triggers) { - this.openSessions = openSessions; - this.activeTriggers = triggers; + this.sessions = openSessions; + this.triggers = triggers; } - public void setOpenSessions(final List openSessions) { - this.openSessions = openSessions; + public List getSessions() { + return sessions; } - public void setActiveTriggers(final List activeTriggers) { - this.activeTriggers = activeTriggers; + public List getTriggers() { + return triggers; } - public List getOpenSessions() { - return Collections.unmodifiableList(openSessions); + public void setSessions(final List sessions) { + this.sessions = sessions; } - public List getActiveTriggers() { - return Collections.unmodifiableList(activeTriggers); + public void setTriggers(final List triggers) { + this.triggers = triggers; } } diff --git a/ql/src/java/org/apache/hadoop/hive/ql/wm/TimeCounterLimit.java b/ql/src/java/org/apache/hadoop/hive/ql/wm/TimeCounterLimit.java index 3c16e1d..89e61b2 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/wm/TimeCounterLimit.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/wm/TimeCounterLimit.java @@ -1,4 +1,4 @@ -/** +/* * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. 
See the NOTICE file * distributed with this work for additional information @@ -27,7 +27,7 @@ private TimeCounter timeCounter; private long limit; - public TimeCounterLimit(final TimeCounter timeCounter, final long limit) { + TimeCounterLimit(final TimeCounter timeCounter, final long limit) { this.timeCounter = timeCounter; this.limit = limit; } diff --git a/ql/src/java/org/apache/hadoop/hive/ql/wm/Trigger.java b/ql/src/java/org/apache/hadoop/hive/ql/wm/Trigger.java index 6299f2b..d56a88a 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/wm/Trigger.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/wm/Trigger.java @@ -1,4 +1,4 @@ -/** +/* * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file * distributed with this work for additional information @@ -22,35 +22,6 @@ */ public interface Trigger { - enum Action { - KILL_QUERY(""), - MOVE_TO_POOL(""); - - String poolName; - String msg; - - Action(final String poolName) { - this.poolName = poolName; - } - - public Action setPoolName(final String poolName) { - this.poolName = poolName; - return this; - } - - public String getPoolName() { - return poolName; - } - - public String getMsg() { - return msg; - } - - public void setMsg(final String msg) { - this.msg = msg; - } - } - /** * Based on current value, returns true if trigger is applied else false. * @@ -86,4 +57,18 @@ public void setMsg(final String msg) { * @return clone copy */ Trigger clone(); + + /** + * Get trigger violation message + * + * @return trigger violation message + */ + String getViolationMsg(); + + /** + * Set trigger violation message + * + * @param violationMsg violation message + */ + void setViolationMsg(String violationMsg); } diff --git a/ql/src/java/org/apache/hadoop/hive/ql/wm/TriggerActionHandler.java b/ql/src/java/org/apache/hadoop/hive/ql/wm/TriggerActionHandler.java index 5cd24d5..8b142da 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/wm/TriggerActionHandler.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/wm/TriggerActionHandler.java @@ -1,4 +1,4 @@ -/** +/* * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file * distributed with this work for additional information @@ -28,5 +28,5 @@ * * @param queriesViolated - violated queries and the rule it violated */ - void applyAction(Map queriesViolated); + void applyAction(Map queriesViolated); } diff --git a/ql/src/java/org/apache/hadoop/hive/ql/wm/TriggerContext.java b/ql/src/java/org/apache/hadoop/hive/ql/wm/TriggerContext.java index a2c7a51..16072c3 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/wm/TriggerContext.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/wm/TriggerContext.java @@ -1,4 +1,4 @@ -/** +/* * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file * distributed with this work for additional information @@ -20,8 +20,6 @@ import java.util.Map; import java.util.Set; -import org.apache.hadoop.hive.ql.QueryInfo; - /** * Some context information that are required for rule evaluation. 
*/ @@ -46,14 +44,6 @@ public void setQueryId(final String queryId) { this.queryId = queryId; } - public long getQueryStartTime() { - return queryStartTime; - } - - public void setQueryStartTime(final long queryStartTime) { - this.queryStartTime = queryStartTime; - } - public Set getDesiredCounters() { return desiredCounters; } diff --git a/ql/src/java/org/apache/hadoop/hive/ql/wm/TriggerExpression.java b/ql/src/java/org/apache/hadoop/hive/ql/wm/TriggerExpression.java index 065ab79..ea25b68 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/wm/TriggerExpression.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/wm/TriggerExpression.java @@ -1,4 +1,4 @@ -/** +/* * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file * distributed with this work for additional information diff --git a/ql/src/java/org/apache/hadoop/hive/ql/wm/TriggersFetcher.java b/ql/src/java/org/apache/hadoop/hive/ql/wm/TriggersFetcher.java deleted file mode 100644 index c25ea3c..0000000 --- a/ql/src/java/org/apache/hadoop/hive/ql/wm/TriggersFetcher.java +++ /dev/null @@ -1,25 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * http://www.apache.org/licenses/LICENSE-2.0 - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.hadoop.hive.ql.wm; - -import java.util.List; - -/** - * Interface to fetch rules - */ -public interface TriggersFetcher { - List fetch(final String resourcePlanName); -} diff --git a/ql/src/java/org/apache/hadoop/hive/ql/wm/VertexCounterLimit.java b/ql/src/java/org/apache/hadoop/hive/ql/wm/VertexCounterLimit.java index dd19ce6..7d6482a 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/wm/VertexCounterLimit.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/wm/VertexCounterLimit.java @@ -1,4 +1,4 @@ -/** +/* * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file * distributed with this work for additional information @@ -26,7 +26,7 @@ private VertexCounter vertexCounter; private long limit; - public VertexCounterLimit(final VertexCounter vertexCounter, final long limit) { + VertexCounterLimit(final VertexCounter vertexCounter, final long limit) { this.vertexCounter = vertexCounter; this.limit = limit; } diff --git a/ql/src/test/org/apache/hadoop/hive/ql/exec/tez/TestWorkloadManager.java b/ql/src/test/org/apache/hadoop/hive/ql/exec/tez/TestWorkloadManager.java index 84a35cc..5aaa8ff 100644 --- a/ql/src/test/org/apache/hadoop/hive/ql/exec/tez/TestWorkloadManager.java +++ b/ql/src/test/org/apache/hadoop/hive/ql/exec/tez/TestWorkloadManager.java @@ -1,4 +1,4 @@ -/** +/* * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. 
See the NOTICE file * distributed with this work for additional information @@ -19,32 +19,45 @@ package org.apache.hadoop.hive.ql.exec.tez; -import static org.junit.Assert.*; +import static org.apache.hadoop.hive.ql.exec.tez.UserPoolMapping.MappingInput; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertNotSame; +import static org.junit.Assert.assertNull; +import static org.junit.Assert.assertSame; +import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; import static org.mockito.Matchers.anyBoolean; -import static org.mockito.Mockito.*; +import static org.mockito.Mockito.doNothing; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; -import org.apache.hadoop.hive.ql.exec.tez.UserPoolMapping.MappingInput; -import org.apache.hadoop.hive.metastore.api.WMResourcePlan; -import org.apache.hadoop.hive.metastore.api.WMMapping; -import org.apache.hadoop.hive.metastore.api.WMPool; -import org.apache.hadoop.hive.metastore.api.WMFullResourcePlan; - -import com.google.common.util.concurrent.SettableFuture; - -import com.google.common.collect.Lists; import java.lang.Thread.State; import java.util.List; +import java.util.Map; import java.util.concurrent.CountDownLatch; import java.util.concurrent.ExecutionException; +import java.util.concurrent.Future; import java.util.concurrent.atomic.AtomicReference; + import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hive.conf.HiveConf; import org.apache.hadoop.hive.conf.HiveConf.ConfVars; +import org.apache.hadoop.hive.metastore.api.WMFullResourcePlan; +import org.apache.hadoop.hive.metastore.api.WMMapping; +import org.apache.hadoop.hive.metastore.api.WMPool; +import org.apache.hadoop.hive.metastore.api.WMResourcePlan; +import org.apache.hadoop.hive.ql.wm.SessionTriggerProvider; import org.apache.tez.dag.api.TezConfiguration; import org.junit.Test; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import com.google.common.collect.Lists; +import com.google.common.util.concurrent.SettableFuture; + public class TestWorkloadManager { @SuppressWarnings("unused") private static final Logger LOG = LoggerFactory.getLogger(TestWorkloadManager.class); @@ -419,18 +432,18 @@ public void testReuseWithDifferentPool() throws Exception { wm.start(); WmTezSession sessionA1 = (WmTezSession) wm.getSession(null, new MappingInput("A"), conf), sessionA2 = (WmTezSession) wm.getSession(null, new MappingInput("A"), conf); - assertEquals("A", sessionA1.getPoolName()); + assertEquals("A", sessionA1.getCurrentPoolName()); assertEquals(0.3f, sessionA1.getClusterFraction(), EPSILON); - assertEquals("A", sessionA2.getPoolName()); + assertEquals("A", sessionA2.getCurrentPoolName()); assertEquals(0.3f, sessionA2.getClusterFraction(), EPSILON); WmTezSession sessionB1 = (WmTezSession) wm.getSession(sessionA1, new MappingInput("B"), conf); assertSame(sessionA1, sessionB1); - assertEquals("B", sessionB1.getPoolName()); + assertEquals("B", sessionB1.getCurrentPoolName()); assertEquals(0.4f, sessionB1.getClusterFraction(), EPSILON); assertEquals(0.6f, sessionA2.getClusterFraction(), EPSILON); // A1 removed from A. // Make sure that we can still get a session from A. 
WmTezSession sessionA3 = (WmTezSession) wm.getSession(null, new MappingInput("A"), conf); - assertEquals("A", sessionA3.getPoolName()); + assertEquals("A", sessionA3.getCurrentPoolName()); assertEquals(0.3f, sessionA3.getClusterFraction(), EPSILON); assertEquals(0.3f, sessionA3.getClusterFraction(), EPSILON); sessionA3.returnToSessionManager(); @@ -450,7 +463,7 @@ public void testApplyPlanUserMapping() throws Exception { // One session will be running, the other will be queued in "A" WmTezSession sessionA1 = (WmTezSession) wm.getSession(null, new MappingInput("U"), conf); - assertEquals("A", sessionA1.getPoolName()); + assertEquals("A", sessionA1.getCurrentPoolName()); assertEquals(0.5f, sessionA1.getClusterFraction(), EPSILON); final AtomicReference sessionA2 = new AtomicReference<>(); final AtomicReference error = new AtomicReference<>(); @@ -470,12 +483,12 @@ public void testApplyPlanUserMapping() throws Exception { t1.join(); checkError(error); assertNotNull(sessionA2.get()); - assertEquals("B", sessionA2.get().getPoolName()); + assertEquals("B", sessionA2.get().getCurrentPoolName()); assertEquals(0.4f, sessionA2.get().getClusterFraction(), EPSILON); // The new session will also go to B now. sessionA2.get().returnToSessionManager(); WmTezSession sessionB1 = (WmTezSession) wm.getSession(null, new MappingInput("U"), conf); - assertEquals("B", sessionB1.getPoolName()); + assertEquals("B", sessionB1.getCurrentPoolName()); assertEquals(0.4f, sessionB1.getClusterFraction(), EPSILON); sessionA1.returnToSessionManager(); sessionB1.returnToSessionManager(); @@ -530,9 +543,9 @@ public void testApplyPlanQpChanges() throws Exception { checkError(error); assertNotNull(sessionA2.get()); assertNotNull(sessionD2.get()); - assertEquals("D", sessionD2.get().getPoolName()); - assertEquals("B", sessionA2.get().getPoolName()); - assertEquals("C", sessionC1.getPoolName()); + assertEquals("D", sessionD2.get().getCurrentPoolName()); + assertEquals("B", sessionA2.get().getCurrentPoolName()); + assertEquals("C", sessionC1.getCurrentPoolName()); assertEquals(0.3f, sessionA2.get().getClusterFraction(), EPSILON); assertEquals(0.2f, sessionC1.getClusterFraction(), EPSILON); assertEquals(0.25f, sessionD1.getClusterFraction(), EPSILON); @@ -578,7 +591,7 @@ public void testAmPoolInteractions() throws Exception { pool.replaceSession(oob, false, null); t1.join(); assertNotNull(sessionA1.get()); - assertEquals("A", sessionA1.get().getPoolName()); + assertEquals("A", sessionA1.get().getCurrentPoolName()); // Increase qp, check that the pool grows. 
plan = new WMFullResourcePlan(plan(), Lists.newArrayList( @@ -623,6 +636,257 @@ public void testAmPoolInteractions() throws Exception { assertEquals(2, pool.getCurrentSize()); } + @Test(timeout=20000) + public void testMoveSessions() throws Exception { + final HiveConf conf = createConf(); + MockQam qam = new MockQam(); + WMFullResourcePlan plan = new WMFullResourcePlan(plan(), Lists.newArrayList( + pool("A", 2, 0.6f), pool("B", 1, 0.4f))); + plan.setMappings(Lists.newArrayList(mapping("A", "A"), mapping("B", "B"))); + final WorkloadManager wm = new WorkloadManagerForTest("test", conf, qam, plan); + wm.start(); + + WmTezSession sessionA1 = (WmTezSession) wm.getSession(null, new MappingInput("A"), conf), + sessionA2 = (WmTezSession) wm.getSession(null, new MappingInput("A"), conf), + sessionB1 = (WmTezSession) wm.getSession(null, new MappingInput("B"), conf); + + // [A: 2, B: 1] + Map allSessionProviders = wm.getAllSessionTriggerProviders(); + assertEquals(2, allSessionProviders.get("A").getSessions().size()); + assertEquals(1, allSessionProviders.get("B").getSessions().size()); + assertTrue(allSessionProviders.get("A").getSessions().contains(sessionA1)); + assertTrue(allSessionProviders.get("A").getSessions().contains(sessionA2)); + assertTrue(allSessionProviders.get("B").getSessions().contains(sessionB1)); + assertEquals(0.3f, sessionA1.getClusterFraction(), EPSILON); + assertEquals(0.3f, sessionA2.getClusterFraction(), EPSILON); + assertEquals(0.4f, sessionB1.getClusterFraction(), EPSILON); + int aSessionCount = 0; + for (TezSessionState wmTezSession : allSessionProviders.get("A").getSessions()) { + assertTrue(wmTezSession instanceof WmTezSession); + assertEquals("A", ((WmTezSession) wmTezSession).getCurrentPoolName()); + assertNull(((WmTezSession) wmTezSession).getOriginPoolName()); + aSessionCount++; + } + assertEquals(2, aSessionCount); + + // [A: 1, B: 2] + Future future = wm.applyMoveSessionAsync(sessionA1, "B"); + assertNotNull(future.get()); + assertTrue(future.get()); + // wait sometime for cluster fraction to happen in async + Thread.sleep(2000); + allSessionProviders = wm.getAllSessionTriggerProviders(); + assertEquals(1, allSessionProviders.get("A").getSessions().size()); + assertEquals(2, allSessionProviders.get("B").getSessions().size()); + assertTrue(allSessionProviders.get("B").getSessions().contains(sessionA1)); + assertTrue(allSessionProviders.get("A").getSessions().contains(sessionA2)); + assertTrue(allSessionProviders.get("B").getSessions().contains(sessionB1)); + assertEquals(0.2f, sessionA1.getClusterFraction(), EPSILON); + assertEquals(0.6f, sessionA2.getClusterFraction(), EPSILON); + assertEquals(0.2f, sessionB1.getClusterFraction(), EPSILON); + int movedCount = 0; + for (TezSessionState wmTezSession : allSessionProviders.get("B").getSessions()) { + if (wmTezSession == sessionA1) { + assertTrue(wmTezSession instanceof WmTezSession); + assertEquals("A", ((WmTezSession) wmTezSession).getOriginPoolName()); + assertEquals("B", ((WmTezSession) wmTezSession).getCurrentPoolName()); + movedCount++; + } + } + assertEquals(1, movedCount); + + // [A: 0, B: 3] + future = wm.applyMoveSessionAsync(sessionA2, "B"); + assertNotNull(future.get()); + assertTrue(future.get()); + // wait sometime for cluster fraction to happen in async + Thread.sleep(2000); + allSessionProviders = wm.getAllSessionTriggerProviders(); + assertEquals(0, allSessionProviders.get("A").getSessions().size()); + assertEquals(3, allSessionProviders.get("B").getSessions().size()); + 
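// For reference on the fractions asserted just below: a pool's cluster fraction is split
// evenly among its sessions, so pool B (fraction 0.4) holding all three sessions gives
// each one 0.4 / 3 ≈ 0.133, just as the two sessions in A (fraction 0.6) each got 0.3 earlier.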
assertTrue(allSessionProviders.get("B").getSessions().contains(sessionA1)); + assertTrue(allSessionProviders.get("B").getSessions().contains(sessionA2)); + assertTrue(allSessionProviders.get("B").getSessions().contains(sessionB1)); + assertEquals(0.133f, sessionA1.getClusterFraction(), EPSILON); + assertEquals(0.133f, sessionA2.getClusterFraction(), EPSILON); + assertEquals(0.133f, sessionB1.getClusterFraction(), EPSILON); + movedCount = 0; + for (TezSessionState wmTezSession : allSessionProviders.get("B").getSessions()) { + if (wmTezSession == sessionA1 || wmTezSession == sessionA2) { + assertTrue(wmTezSession instanceof WmTezSession); + assertEquals("A", ((WmTezSession) wmTezSession).getOriginPoolName()); + assertEquals("B", ((WmTezSession) wmTezSession).getCurrentPoolName()); + movedCount++; + } + } + assertEquals(2, movedCount); + + // get back the loaned session + // [A: 1, B: 2] + WmTezSession sessionA3 = (WmTezSession) wm.getSession(null, new MappingInput("A"), conf); + // wait sometime for cluster fraction to happen in async + Thread.sleep(2000); + allSessionProviders = wm.getAllSessionTriggerProviders(); + assertEquals(1, allSessionProviders.get("A").getSessions().size()); + assertEquals(2, allSessionProviders.get("B").getSessions().size()); + assertTrue(allSessionProviders.get("A").getSessions().contains(sessionA3)); + assertTrue(allSessionProviders.get("B").getSessions().contains(sessionA1)); + assertTrue(allSessionProviders.get("B").getSessions().contains(sessionB1)); + assertEquals(0.0f, sessionA2.getClusterFraction(), EPSILON); + assertEquals(0.2f, sessionA1.getClusterFraction(), EPSILON); + assertEquals(0.6f, sessionA3.getClusterFraction(), EPSILON); + assertEquals(0.2f, sessionB1.getClusterFraction(), EPSILON); + assertFalse(sessionA2.isOpen()); + movedCount = 0; + for (TezSessionState wmTezSession : allSessionProviders.get("A").getSessions()) { + if (wmTezSession == sessionA3) { + assertTrue(wmTezSession instanceof WmTezSession); + assertNull(((WmTezSession) wmTezSession).getOriginPoolName()); + assertEquals("A", ((WmTezSession) wmTezSession).getCurrentPoolName()); + movedCount++; + } + } + + // return a loaned session goes back to tez am pool + // [A: 1, B: 1] + wm.returnAfterUse(sessionA1); + // wait sometime for cluster fraction to happen in async + Thread.sleep(2000); + allSessionProviders = wm.getAllSessionTriggerProviders(); + assertEquals(1, allSessionProviders.get("A").getSessions().size()); + assertEquals(1, allSessionProviders.get("B").getSessions().size()); + assertFalse(allSessionProviders.get("A").getSessions().contains(sessionA1)); + assertTrue(allSessionProviders.get("A").getSessions().contains(sessionA3)); + assertTrue(allSessionProviders.get("B").getSessions().contains(sessionB1)); + assertEquals(0.0f, sessionA2.getClusterFraction(), EPSILON); + assertEquals(0.0f, sessionA1.getClusterFraction(), EPSILON); + assertEquals(0.6f, sessionA3.getClusterFraction(), EPSILON); + assertEquals(0.4f, sessionB1.getClusterFraction(), EPSILON); + } + + @Test(timeout=20000) + public void testMoveSessionsMultiPool() throws Exception { + final HiveConf conf = createConf(); + MockQam qam = new MockQam(); + WMFullResourcePlan plan = new WMFullResourcePlan(plan(), Lists.newArrayList( + pool("A", 2, 0.4f), pool("B", 2, 0.4f), pool("B.x", 1, 0.2f), + pool("B.y", 1, 0.8f), pool("C", 1, 0.2f))); + plan.setMappings(Lists.newArrayList(mapping("A", "A"), mapping("B", "B"), mapping("C", "C"))); + final WorkloadManager wm = new WorkloadManagerForTest("test", conf, qam, plan); + 
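// Note on the fractions asserted in this test: a child pool's fraction is relative to its
// parent's, so a session in B.y gets 0.4 * 0.8 = 0.32 of the cluster, a session in B.x gets
// 0.4 * 0.2 = 0.08, and one in the top-level pool C gets 0.2.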
+
+  @Test(timeout=20000)
+  public void testMoveSessionsMultiPool() throws Exception {
+    final HiveConf conf = createConf();
+    MockQam qam = new MockQam();
+    WMFullResourcePlan plan = new WMFullResourcePlan(plan(), Lists.newArrayList(
+        pool("A", 2, 0.4f), pool("B", 2, 0.4f), pool("B.x", 1, 0.2f),
+        pool("B.y", 1, 0.8f), pool("C", 1, 0.2f)));
+    plan.setMappings(Lists.newArrayList(mapping("A", "A"), mapping("B", "B"), mapping("C", "C")));
+    final WorkloadManager wm = new WorkloadManagerForTest("test", conf, qam, plan);
+    wm.start();
+
+    WmTezSession sessionA1 = (WmTezSession) wm.getSession(null, new MappingInput("A"), conf);
+
+    // [A: 1, B: 0, B.x: 0, B.y: 0, C: 0]
+    Map<String, SessionTriggerProvider> allSessionProviders = wm.getAllSessionTriggerProviders();
+    assertEquals(1, allSessionProviders.get("A").getSessions().size());
+    assertEquals(0, allSessionProviders.get("B").getSessions().size());
+    assertEquals(0, allSessionProviders.get("B.x").getSessions().size());
+    assertEquals(0, allSessionProviders.get("B.y").getSessions().size());
+    assertEquals(0, allSessionProviders.get("C").getSessions().size());
+    assertEquals(0.4f, sessionA1.getClusterFraction(), EPSILON);
+    assertTrue(allSessionProviders.get("A").getSessions().contains(sessionA1));
+    int aSessionCount = 0;
+    for (TezSessionState wmTezSession : allSessionProviders.get("A").getSessions()) {
+      if (wmTezSession == sessionA1) {
+        assertTrue(wmTezSession instanceof WmTezSession);
+        assertEquals("A", ((WmTezSession) wmTezSession).getCurrentPoolName());
+        assertNull(((WmTezSession) wmTezSession).getOriginPoolName());
+        aSessionCount++;
+      }
+    }
+    assertEquals(1, aSessionCount);
+
+    // [A: 0, B: 0, B.x: 0, B.y: 1, C: 0]
+    Future<Boolean> future = wm.applyMoveSessionAsync(sessionA1, "B.y");
+    assertNotNull(future.get());
+    assertTrue(future.get());
+    // wait some time for the cluster fraction update to happen asynchronously
+    Thread.sleep(2000);
+    allSessionProviders = wm.getAllSessionTriggerProviders();
+    assertEquals(0, allSessionProviders.get("A").getSessions().size());
+    assertEquals(0, allSessionProviders.get("B").getSessions().size());
+    assertEquals(0, allSessionProviders.get("B.x").getSessions().size());
+    assertEquals(1, allSessionProviders.get("B.y").getSessions().size());
+    assertEquals(0, allSessionProviders.get("C").getSessions().size());
+    assertEquals(0.32f, sessionA1.getClusterFraction(), EPSILON);
+    assertTrue(allSessionProviders.get("B.y").getSessions().contains(sessionA1));
+    int movedCount = 0;
+    for (TezSessionState wmTezSession : allSessionProviders.get("B.y").getSessions()) {
+      if (wmTezSession == sessionA1) {
+        assertTrue(wmTezSession instanceof WmTezSession);
+        assertEquals("A", ((WmTezSession) wmTezSession).getOriginPoolName());
+        assertEquals("B.y", ((WmTezSession) wmTezSession).getCurrentPoolName());
+        movedCount++;
+      }
+    }
+    assertEquals(1, movedCount);
+
+    // [A: 0, B: 0, B.x: 0, B.y: 0, C: 1]
+    future = wm.applyMoveSessionAsync(sessionA1, "C");
+    assertNotNull(future.get());
+    assertTrue(future.get());
+    // wait some time for the cluster fraction update to happen asynchronously
+    Thread.sleep(2000);
+    allSessionProviders = wm.getAllSessionTriggerProviders();
+    assertEquals(0, allSessionProviders.get("A").getSessions().size());
+    assertEquals(0, allSessionProviders.get("B").getSessions().size());
+    assertEquals(0, allSessionProviders.get("B.x").getSessions().size());
+    assertEquals(0, allSessionProviders.get("B.y").getSessions().size());
+    assertEquals(1, allSessionProviders.get("C").getSessions().size());
+    assertEquals(0.2f, sessionA1.getClusterFraction(), EPSILON);
+    assertTrue(allSessionProviders.get("C").getSessions().contains(sessionA1));
+    movedCount = 0;
+    for (TezSessionState wmTezSession : allSessionProviders.get("C").getSessions()) {
+      if (wmTezSession == sessionA1) {
+        assertTrue(wmTezSession instanceof WmTezSession);
+        assertEquals("A", ((WmTezSession) wmTezSession).getOriginPoolName());
+        assertEquals("C", ((WmTezSession) wmTezSession).getCurrentPoolName());
+        movedCount++;
+      }
+    }
+    assertEquals(1, movedCount);
+
+    // [A: 0, B: 0, B.x: 1, B.y: 0, C: 0]
+    future = wm.applyMoveSessionAsync(sessionA1, "B.x");
+    assertNotNull(future.get());
+    assertTrue(future.get());
+    // wait some time for the cluster fraction update to happen asynchronously
+    Thread.sleep(2000);
+    allSessionProviders = wm.getAllSessionTriggerProviders();
+    assertEquals(0, allSessionProviders.get("A").getSessions().size());
+    assertEquals(0, allSessionProviders.get("B").getSessions().size());
+    assertEquals(1, allSessionProviders.get("B.x").getSessions().size());
+    assertEquals(0, allSessionProviders.get("B.y").getSessions().size());
+    assertEquals(0, allSessionProviders.get("C").getSessions().size());
+    assertEquals(0.08f, sessionA1.getClusterFraction(), EPSILON);
+    assertTrue(allSessionProviders.get("B.x").getSessions().contains(sessionA1));
+    movedCount = 0;
+    for (TezSessionState wmTezSession : allSessionProviders.get("B.x").getSessions()) {
+      if (wmTezSession == sessionA1) {
+        assertTrue(wmTezSession instanceof WmTezSession);
+        assertEquals("A", ((WmTezSession) wmTezSession).getOriginPoolName());
+        assertEquals("B.x", ((WmTezSession) wmTezSession).getCurrentPoolName());
+        movedCount++;
+      }
+    }
+    assertEquals(1, movedCount);
+
+    // a returned loaned session goes back to the tez am pool
+    // [A: 0, B: 0, B.x: 0, B.y: 0, C: 0]
+    wm.returnAfterUse(sessionA1);
+    // wait some time for the cluster fraction update to happen asynchronously
+    Thread.sleep(2000);
+    allSessionProviders = wm.getAllSessionTriggerProviders();
+    assertEquals(0, allSessionProviders.get("A").getSessions().size());
+    assertEquals(0, allSessionProviders.get("B").getSessions().size());
+    assertEquals(0, allSessionProviders.get("B.x").getSessions().size());
+    assertEquals(0, allSessionProviders.get("B.y").getSessions().size());
+    assertEquals(0, allSessionProviders.get("C").getSessions().size());
+    assertEquals(0.0f, sessionA1.getClusterFraction(), EPSILON);
+    assertFalse(allSessionProviders.get("A").getSessions().contains(sessionA1));
+  }
+
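
testMoveSessionsMultiPool additionally exercises nested pools: the asserted fractions are consistent with a child pool's share being its own fraction multiplied by its parent's, so B.y works out to 0.4 * 0.8 = 0.32 and B.x to 0.4 * 0.2 = 0.08. A sketch of that composition (the helper and its even-split/multiplicative assumptions are inferred from the test values, not taken from WM's API):

```java
// Illustrative sketch, not patch code: nested-pool fractions appear to compose
// multiplicatively along the pool path, matching the asserts above.
public class NestedPoolFractionSketch {

  // Effective cluster share of a pool path, e.g. B -> B.y, is the product of
  // the fractions along the path (assumption inferred from the test values).
  static double effectiveFraction(double... fractionsAlongPath) {
    double share = 1.0;
    for (double f : fractionsAlongPath) {
      share *= f;
    }
    return share;
  }

  public static void main(String[] args) {
    System.out.println(effectiveFraction(0.4, 0.8)); // B.y -> 0.32, sessionA1 after the move to B.y
    System.out.println(effectiveFraction(0.4, 0.2)); // B.x -> 0.08, sessionA1 after the move to B.x
    System.out.println(effectiveFraction(0.2));      // C   -> 0.2,  sessionA1 after the move to C
  }
}
```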
   @Test(timeout=10000)
   public void testAsyncSessionInitFailures() throws Exception {
     final HiveConf conf = createConf();
@@ -726,19 +990,19 @@ private SampleTezSessionState validatePoolAfterCleanup(
     SampleTezSessionState theOnlySession = (SampleTezSessionState) pool.getSession();
     assertNotNull(theOnlySession);
     theOnlySession.setWaitForAmRegistryFuture(null);
-    assertNull(oldSession.getPoolName());
+    assertNull(oldSession.getCurrentPoolName());
     assertEquals(0f, oldSession.getClusterFraction(), EPSILON);
     pool.returnSession(theOnlySession);
     // Make sure we can actually get a session still - parallelism/etc. should not be affected.
     WmTezSession result = (WmTezSession) wm.getSession(null, new MappingInput("A"), conf);
-    assertEquals(sessionPoolName, result.getPoolName());
+    assertEquals(sessionPoolName, result.getCurrentPoolName());
     assertEquals(1f, result.getClusterFraction(), EPSILON);
     result.returnToSessionManager();
     return theOnlySession;
   }
 
   private void assertKilledByWm(WmTezSession session) {
-    assertNull(session.getPoolName());
+    assertNull(session.getCurrentPoolName());
     assertEquals(0f, session.getClusterFraction(), EPSILON);
     assertTrue(session.isIrrelevantForWm());
   }
diff --git a/ql/src/test/org/apache/hadoop/hive/ql/wm/TestTrigger.java b/ql/src/test/org/apache/hadoop/hive/ql/wm/TestTrigger.java
index cd78545..b686783 100644
--- a/ql/src/test/org/apache/hadoop/hive/ql/wm/TestTrigger.java
+++ b/ql/src/test/org/apache/hadoop/hive/ql/wm/TestTrigger.java
@@ -1,4 +1,4 @@
-/**
+/*
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
  * distributed with this work for additional information
@@ -23,9 +23,6 @@
 import org.junit.Test;
 import org.junit.rules.ExpectedException;
 
-/**
- *
- */
 public class TestTrigger {
   @org.junit.Rule
   public ExpectedException thrown = ExpectedException.none();
@@ -34,55 +31,55 @@ public void testSimpleQueryTrigger() {
     Expression expression = ExpressionFactory.createExpression(new FileSystemCounterLimit("hdfs",
         FileSystemCounterLimit.FSCounter.BYTES_READ, 1024));
-    Trigger trigger = new ExecutionTrigger("hdfs_read_heavy", expression, Trigger.Action.KILL_QUERY);
+    Trigger trigger = new ExecutionTrigger("hdfs_read_heavy", expression, new Action(Action.Type.KILL_QUERY));
     assertEquals("counter: HDFS_BYTES_READ limit: 1024", expression.getCounterLimit().toString());
     assertFalse(trigger.apply(1000));
     assertTrue(trigger.apply(1025));
 
     expression = ExpressionFactory.createExpression(new FileSystemCounterLimit("hdfs",
         FileSystemCounterLimit.FSCounter.BYTES_WRITTEN, 1024));
-    trigger = new ExecutionTrigger("hdfs_write_heavy", expression, Trigger.Action.KILL_QUERY);
+    trigger = new ExecutionTrigger("hdfs_write_heavy", expression, new Action(Action.Type.KILL_QUERY));
     assertEquals("counter: HDFS_BYTES_WRITTEN limit: 1024", expression.getCounterLimit().toString());
     assertFalse(trigger.apply(1000));
     assertTrue(trigger.apply(1025));
 
     expression = ExpressionFactory.createExpression(new FileSystemCounterLimit("",
         FileSystemCounterLimit.FSCounter.BYTES_READ, 1024));
-    trigger = new ExecutionTrigger("local_read_heavy", expression, Trigger.Action.KILL_QUERY);
+    trigger = new ExecutionTrigger("local_read_heavy", expression, new Action(Action.Type.KILL_QUERY));
     assertEquals("counter: BYTES_READ limit: 1024", expression.getCounterLimit().toString());
     assertFalse(trigger.apply(1000));
     assertTrue(trigger.apply(1025));
 
     expression = ExpressionFactory.createExpression(new FileSystemCounterLimit("",
         FileSystemCounterLimit.FSCounter.BYTES_WRITTEN, 1024));
-    trigger = new ExecutionTrigger("local_write_heavy", expression, Trigger.Action.KILL_QUERY);
+    trigger = new ExecutionTrigger("local_write_heavy", expression, new Action(Action.Type.KILL_QUERY));
     assertEquals("counter: BYTES_WRITTEN limit: 1024", expression.getCounterLimit().toString());
     assertFalse(trigger.apply(1000));
     assertTrue(trigger.apply(1025));
 
     expression = ExpressionFactory.createExpression(new FileSystemCounterLimit("",
         FileSystemCounterLimit.FSCounter.SHUFFLE_BYTES, 1024));
-    trigger = new ExecutionTrigger("shuffle_heavy", expression, Trigger.Action.KILL_QUERY);
+    trigger = new ExecutionTrigger("shuffle_heavy", expression, new Action(Action.Type.KILL_QUERY));
     assertEquals("counter: SHUFFLE_BYTES limit: 1024", expression.getCounterLimit().toString());
     assertFalse(trigger.apply(1000));
     assertTrue(trigger.apply(1025));
 
     expression = ExpressionFactory.createExpression(new TimeCounterLimit(TimeCounterLimit.TimeCounter
         .EXECUTION_TIME, 10000));
-    trigger = new ExecutionTrigger("slow_query", expression, Trigger.Action.MOVE_TO_POOL.setPoolName("fake_pool"));
+    trigger = new ExecutionTrigger("slow_query", expression, new Action(Action.Type.MOVE_TO_POOL, "fake_pool"));
     assertEquals("counter: EXECUTION_TIME limit: 10000", expression.getCounterLimit().toString());
     assertFalse(trigger.apply(1000));
     assertTrue(trigger.apply(100000));
 
     expression = ExpressionFactory.createExpression(new VertexCounterLimit(VertexCounterLimit.VertexCounter
         .TOTAL_TASKS,10000));
-    trigger = new ExecutionTrigger("highly_parallel", expression, Trigger.Action.KILL_QUERY);
+    trigger = new ExecutionTrigger("highly_parallel", expression, new Action(Action.Type.KILL_QUERY));
new ExecutionTrigger("highly_parallel", expression, new Action(Action.Type.KILL_QUERY)); assertEquals("counter: TOTAL_TASKS limit: 10000", expression.getCounterLimit().toString()); assertFalse(trigger.apply(1000)); assertTrue(trigger.apply(100000)); expression = ExpressionFactory.createExpression(new CustomCounterLimit("HDFS_WRITE_OPS",10000)); - trigger = new ExecutionTrigger("write_heavy", expression, Trigger.Action.KILL_QUERY); + trigger = new ExecutionTrigger("write_heavy", expression, new Action(Action.Type.KILL_QUERY)); assertEquals("counter: HDFS_WRITE_OPS limit: 10000", expression.getCounterLimit().toString()); assertFalse(trigger.apply(1000)); assertTrue(trigger.apply(100000)); @@ -293,10 +290,21 @@ public void testIllegalTimeCounterValue2() { } @Test + public void testActionFromMetastoreStr() { + assertEquals(Action.Type.KILL_QUERY, Action.fromMetastoreExpression("KILL").getType()); + assertEquals(Action.Type.MOVE_TO_POOL, Action.fromMetastoreExpression("MOVE TO bi").getType()); + assertEquals("MOVE TO etl", Action.fromMetastoreExpression("MOVE TO etl").toString()); + + thrown.expect(IllegalArgumentException.class); + thrown.expectMessage("Invalid move action expression (MOVE TO ). Pool name is empty"); + assertEquals(Action.Type.MOVE_TO_POOL, Action.fromMetastoreExpression("MOVE TO ").getType()); + } + + @Test public void testTriggerClone() { Expression expression = ExpressionFactory.createExpression(new FileSystemCounterLimit("hdfs", FileSystemCounterLimit.FSCounter.BYTES_READ, 1024)); - Trigger trigger = new ExecutionTrigger("hdfs_read_heavy", expression, Trigger.Action.KILL_QUERY); + Trigger trigger = new ExecutionTrigger("hdfs_read_heavy", expression, new Action(Action.Type.KILL_QUERY)); Trigger clonedTrigger = trigger.clone(); assertNotEquals(System.identityHashCode(trigger), System.identityHashCode(clonedTrigger)); assertNotEquals(System.identityHashCode(trigger.getExpression()), System.identityHashCode(clonedTrigger.getExpression())); @@ -306,7 +314,7 @@ public void testTriggerClone() { assertEquals(trigger.hashCode(), clonedTrigger.hashCode()); expression = ExpressionFactory.fromString(" ELAPSED_TIME > 300"); - trigger = new ExecutionTrigger("slow_query", expression, Trigger.Action.KILL_QUERY); + trigger = new ExecutionTrigger("slow_query", expression, new Action(Action.Type.KILL_QUERY)); clonedTrigger = trigger.clone(); assertNotEquals(System.identityHashCode(trigger), System.identityHashCode(clonedTrigger)); assertNotEquals(System.identityHashCode(trigger.getExpression()), System.identityHashCode(clonedTrigger.getExpression()));