diff --git a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java index a6ecb37..ddf2085 100644 --- a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java +++ b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java @@ -3422,6 +3422,13 @@ private static void populateLlapDaemonVarsSet(Set llapDaemonVarsSetLocal "Interval for validating triggers during execution of a query. Triggers defined in resource plan will get\n" + "validated for all SQL operations after every defined interval (default: 500ms) and corresponding action\n" + "defined in the trigger will be taken"), + HIVE_RESOURCE_PLAN_MAX_POOL_SIZE("hive.resource.plan.max.pool.size", 10, + "Maximum number of pools in a resource plan"), + HIVE_TRIGGER_ACTION_PRIORITIZE_KILL("hive.trigger.action.prioritize.kill", true, + "When a query violates multiple triggers, one with KILL QUERY action and another with MOVE TO different pool\n" + + "action, setting this flag to true will make KILL action take priority over MOVE. If this setting is false\n" + + "MOVE TO different pool action will be tried (applicable only for managed pools) as long as move happens\n" + + "to a different pool"), SPARK_USE_OP_STATS("hive.spark.use.op.stats", true, "Whether to use operator stats to determine reducer parallelism for Hive on Spark.\n" + diff --git a/itests/hive-unit/src/test/java/org/apache/hive/jdbc/AbstractTriggersJdbcTest.java b/itests/hive-unit/src/test/java/org/apache/hive/jdbc/AbstractTriggersJdbcTest.java new file mode 100644 index 0000000..1ba256a --- /dev/null +++ b/itests/hive-unit/src/test/java/org/apache/hive/jdbc/AbstractTriggersJdbcTest.java @@ -0,0 +1,173 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hive.jdbc; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertNull; +import static org.junit.Assert.assertTrue; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + +import java.io.File; +import java.net.URL; +import java.sql.Connection; +import java.sql.SQLException; +import java.sql.Statement; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hive.conf.HiveConf; +import org.apache.hadoop.hive.conf.HiveConf.ConfVars; +import org.apache.hadoop.hive.llap.LlapBaseInputFormat; +import org.apache.hadoop.hive.ql.exec.tez.TezSessionPoolManager; +import org.apache.hadoop.hive.ql.wm.ExecutionTrigger; +import org.apache.hadoop.hive.ql.wm.Expression; +import org.apache.hadoop.hive.ql.wm.ExpressionFactory; +import org.apache.hadoop.hive.ql.wm.MetastoreGlobalTriggersFetcher; +import org.apache.hadoop.hive.ql.wm.Trigger; +import org.apache.hive.jdbc.miniHS2.MiniHS2; +import org.apache.hive.jdbc.miniHS2.MiniHS2.MiniClusterType; +import org.junit.After; +import org.junit.AfterClass; +import org.junit.Before; +import org.junit.BeforeClass; +import org.junit.Test; + +import com.google.common.collect.Lists; + +public abstract class 
AbstractTriggersJdbcTest { + protected static MiniHS2 miniHS2 = null; + protected static String dataFileDir; + static Path kvDataFilePath; + protected static String tableName = "testtab1"; + + protected static HiveConf conf = null; + protected Connection hs2Conn = null; + + @BeforeClass + public static void beforeTest() throws Exception { + Class.forName(MiniHS2.getJdbcDriverName()); + + String confDir = "../../data/conf/llap/"; + HiveConf.setHiveSiteLocation(new URL("file://" + new File(confDir).toURI().getPath() + "/hive-site.xml")); + System.out.println("Setting hive-site: " + HiveConf.getHiveSiteLocation()); + + conf = new HiveConf(); + conf.setBoolVar(ConfVars.HIVE_SUPPORT_CONCURRENCY, false); + conf.setBoolVar(ConfVars.HIVE_SERVER2_ENABLE_DOAS, false); + conf.setVar(ConfVars.HIVE_SERVER2_TEZ_DEFAULT_QUEUES, "default"); + conf.setBoolVar(ConfVars.HIVE_SERVER2_TEZ_INITIALIZE_DEFAULT_SESSIONS, true); + conf.setBoolVar(ConfVars.TEZ_EXEC_SUMMARY, true); + conf.setBoolVar(ConfVars.HIVE_STRICT_CHECKS_CARTESIAN, false); + // don't want cache hits from llap io for testing filesystem bytes read counters + conf.setVar(ConfVars.LLAP_IO_MEMORY_MODE, "none"); + + conf.addResource(new URL("file://" + new File(confDir).toURI().getPath() + + "/tez-site.xml")); + + miniHS2 = new MiniHS2(conf, MiniClusterType.LLAP); + dataFileDir = conf.get("test.data.files").replace('\\', '/').replace("c:", ""); + kvDataFilePath = new Path(dataFileDir, "kv1.txt"); + + Map confOverlay = new HashMap<>(); + miniHS2.start(confOverlay); + miniHS2.getDFS().getFileSystem().mkdirs(new Path("/apps_staging_dir/anonymous")); + } + + @Before + public void setUp() throws Exception { + hs2Conn = TestJdbcWithMiniLlap.getConnection(miniHS2.getJdbcURL(), System.getProperty("user.name"), "bar"); + } + + @After + public void tearDown() throws Exception { + LlapBaseInputFormat.closeAll(); + hs2Conn.close(); + } + + @AfterClass + public static void afterTest() throws Exception { + if (miniHS2.isStarted()) { + 
miniHS2.stop(); + } + } + + private void createSleepUDF() throws SQLException { + String udfName = TestJdbcWithMiniHS2.SleepMsUDF.class.getName(); + Connection con = hs2Conn; + Statement stmt = con.createStatement(); + stmt.execute("create temporary function sleep as '" + udfName + "'"); + stmt.close(); + } + + protected void runQueryWithTrigger(final String query, final List setCmds, + final String expect) + throws Exception { + + Connection con = hs2Conn; + TestJdbcWithMiniLlap.createTestTable(con, null, tableName, kvDataFilePath.toString()); + createSleepUDF(); + + final Statement selStmt = con.createStatement(); + final Throwable[] throwable = new Throwable[1]; + Thread queryThread = new Thread(() -> { + try { + if (setCmds != null) { + for (String setCmd : setCmds) { + selStmt.execute(setCmd); + } + } + selStmt.execute(query); + } catch (SQLException e) { + throwable[0] = e; + } + }); + queryThread.start(); + + queryThread.join(); + selStmt.close(); + + if (expect == null) { + assertNull("Expected query to succeed", throwable[0]); + } else { + assertNotNull("Expected non-null throwable", throwable[0]); + assertEquals(SQLException.class, throwable[0].getClass()); + assertTrue(expect + " is not contained in " + throwable[0].getMessage(), + throwable[0].getMessage().contains(expect)); + } + } + + abstract void setupTriggers(final List triggers) throws Exception; + + protected List getConfigs(String... 
more) { + List setCmds = new ArrayList<>(); + setCmds.add("set hive.exec.dynamic.partition.mode=nonstrict"); + setCmds.add("set mapred.min.split.size=100"); + setCmds.add("set mapred.max.split.size=100"); + setCmds.add("set tez.grouping.min-size=100"); + setCmds.add("set tez.grouping.max-size=100"); + if (more != null) { + setCmds.addAll(Arrays.asList(more)); + } + return setCmds; + } +} \ No newline at end of file diff --git a/itests/hive-unit/src/test/java/org/apache/hive/jdbc/TestTriggersActionPriorityKill.java b/itests/hive-unit/src/test/java/org/apache/hive/jdbc/TestTriggersActionPriorityKill.java new file mode 100644 index 0000000..7d21cc6 --- /dev/null +++ b/itests/hive-unit/src/test/java/org/apache/hive/jdbc/TestTriggersActionPriorityKill.java @@ -0,0 +1,96 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hive.jdbc; + +import java.io.File; +import java.net.URL; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.concurrent.TimeUnit; + +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hive.conf.HiveConf; +import org.apache.hadoop.hive.conf.HiveConf.ConfVars; +import org.apache.hadoop.hive.ql.exec.tez.WorkloadManager; +import org.apache.hadoop.hive.ql.wm.ExecutionTrigger; +import org.apache.hadoop.hive.ql.wm.Expression; +import org.apache.hadoop.hive.ql.wm.ExpressionFactory; +import org.apache.hadoop.hive.ql.wm.Trigger; +import org.apache.hive.jdbc.miniHS2.MiniHS2; +import org.apache.hive.jdbc.miniHS2.MiniHS2.MiniClusterType; +import org.junit.BeforeClass; +import org.junit.Test; + +import com.google.common.collect.Lists; + +public class TestTriggersActionPriorityKill extends AbstractTriggersJdbcTest { + + @BeforeClass + public static void beforeTest() throws Exception { + Class.forName(MiniHS2.getJdbcDriverName()); + + String confDir = "../../data/conf/llap/"; + if (confDir != null && !confDir.isEmpty()) { + HiveConf.setHiveSiteLocation(new URL("file://" + new File(confDir).toURI().getPath() + "/hive-site.xml")); + System.out.println("Setting hive-site: " + HiveConf.getHiveSiteLocation()); + } + + conf = new HiveConf(); + conf.setBoolVar(ConfVars.HIVE_SUPPORT_CONCURRENCY, false); + conf.setBoolVar(ConfVars.HIVE_SERVER2_ENABLE_DOAS, false); + conf.setTimeVar(ConfVars.HIVE_TRIGGER_VALIDATION_INTERVAL_MS, 100, TimeUnit.MILLISECONDS); + conf.setVar(ConfVars.HIVE_SERVER2_TEZ_INTERACTIVE_QUEUE, "default"); + conf.setBoolean("hive.test.workload.management", true); + conf.setBoolVar(ConfVars.TEZ_EXEC_SUMMARY, true); + conf.setBoolVar(ConfVars.HIVE_STRICT_CHECKS_CARTESIAN, false); + // don't want cache hits from llap io for testing filesystem bytes read counters + conf.setVar(ConfVars.LLAP_IO_MEMORY_MODE, "none"); + conf.setBoolVar(ConfVars.HIVE_TRIGGER_ACTION_PRIORITIZE_KILL, true); + + 
conf.addResource(new URL("file://" + new File(confDir).toURI().getPath() + + "/tez-site.xml")); + + miniHS2 = new MiniHS2(conf, MiniClusterType.LLAP); + dataFileDir = conf.get("test.data.files").replace('\\', '/').replace("c:", ""); + kvDataFilePath = new Path(dataFileDir, "kv1.txt"); + + Map confOverlay = new HashMap(); + miniHS2.start(confOverlay); + miniHS2.getDFS().getFileSystem().mkdirs(new Path("/apps_staging_dir/anonymous")); + } + + @Test(timeout = 60000) + public void testTriggerSlowQueryPriorityKill() throws Exception { + Expression expression = ExpressionFactory.fromString("EXECUTION_TIME > 1000"); + Trigger killTrigger = new ExecutionTrigger("slow_query_kill", expression, Trigger.Action.KILL_QUERY); + Trigger moveTrigger = new ExecutionTrigger("slow_query_move", expression, + Trigger.Action.MOVE_TO_POOL.setPoolName("r2")); + setupTriggers(Lists.newArrayList(killTrigger, moveTrigger)); + String query = "select t1.under_col, t1.value from " + tableName + " t1 join " + tableName + + " t2 on t1.under_col>=t2.under_col"; + runQueryWithTrigger(query, getConfigs(), "Query was cancelled"); + } + + @Override + protected void setupTriggers(final List triggers) throws Exception { + WorkloadManager wm = WorkloadManager.getInstance(); + Map poolStateMap = wm.getPools(); + WorkloadManager.PoolState originalPoolState = poolStateMap.get("llap"); + originalPoolState.setTriggers(triggers); + } +} \ No newline at end of file diff --git a/itests/hive-unit/src/test/java/org/apache/hive/jdbc/TestTriggersActionPriorityMove.java b/itests/hive-unit/src/test/java/org/apache/hive/jdbc/TestTriggersActionPriorityMove.java new file mode 100644 index 0000000..e405a99 --- /dev/null +++ b/itests/hive-unit/src/test/java/org/apache/hive/jdbc/TestTriggersActionPriorityMove.java @@ -0,0 +1,98 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hive.jdbc; + +import java.io.File; +import java.net.URL; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.concurrent.TimeUnit; + +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hive.conf.HiveConf; +import org.apache.hadoop.hive.conf.HiveConf.ConfVars; +import org.apache.hadoop.hive.ql.exec.tez.WorkloadManager; +import org.apache.hadoop.hive.ql.wm.ExecutionTrigger; +import org.apache.hadoop.hive.ql.wm.Expression; +import org.apache.hadoop.hive.ql.wm.ExpressionFactory; +import org.apache.hadoop.hive.ql.wm.Trigger; +import org.apache.hive.jdbc.miniHS2.MiniHS2; +import org.apache.hive.jdbc.miniHS2.MiniHS2.MiniClusterType; +import org.junit.BeforeClass; +import org.junit.Test; + +import com.google.common.collect.Lists; + +public class TestTriggersActionPriorityMove extends AbstractTriggersJdbcTest { + + @BeforeClass + public static void beforeTest() throws Exception { + Class.forName(MiniHS2.getJdbcDriverName()); + + String confDir = "../../data/conf/llap/"; + if (confDir != null && !confDir.isEmpty()) { + HiveConf.setHiveSiteLocation(new URL("file://" + new File(confDir).toURI().getPath() + "/hive-site.xml")); + System.out.println("Setting hive-site: " + HiveConf.getHiveSiteLocation()); + } + + conf = new 
HiveConf(); + conf.setBoolVar(ConfVars.HIVE_SUPPORT_CONCURRENCY, false); + conf.setBoolVar(ConfVars.HIVE_SERVER2_ENABLE_DOAS, false); + conf.setTimeVar(ConfVars.HIVE_TRIGGER_VALIDATION_INTERVAL_MS, 100, TimeUnit.MILLISECONDS); + conf.setVar(ConfVars.HIVE_SERVER2_TEZ_INTERACTIVE_QUEUE, "default"); + conf.setBoolean("hive.test.workload.management", true); + conf.setBoolVar(ConfVars.TEZ_EXEC_SUMMARY, true); + conf.setBoolVar(ConfVars.HIVE_STRICT_CHECKS_CARTESIAN, false); + // don't want cache hits from llap io for testing filesystem bytes read counters + conf.setVar(ConfVars.LLAP_IO_MEMORY_MODE, "none"); + conf.setBoolVar(ConfVars.HIVE_TRIGGER_ACTION_PRIORITIZE_KILL, false); + + conf.addResource(new URL("file://" + new File(confDir).toURI().getPath() + + "/tez-site.xml")); + + miniHS2 = new MiniHS2(conf, MiniClusterType.LLAP); + dataFileDir = conf.get("test.data.files").replace('\\', '/').replace("c:", ""); + kvDataFilePath = new Path(dataFileDir, "kv1.txt"); + + Map confOverlay = new HashMap(); + miniHS2.start(confOverlay); + miniHS2.getDFS().getFileSystem().mkdirs(new Path("/apps_staging_dir/anonymous")); + } + + @Test(timeout = 60000) + public void testTriggerSlowQueryPriorityMove() throws Exception { + Expression expression = ExpressionFactory.fromString("EXECUTION_TIME > 1000"); + Trigger killTrigger = new ExecutionTrigger("slow_query_kill", expression, Trigger.Action.KILL_QUERY); + // FIXME: this pool 'r2' does not exist. 
Will have to validate if the query actually moved to a different pool after + // HIVE-17841 and commands for creating pools (easier that way else new RP has to be injected during WM init) + Trigger moveTrigger = new ExecutionTrigger("slow_query_move", expression, + Trigger.Action.MOVE_TO_POOL.setPoolName("r2")); + setupTriggers(Lists.newArrayList(killTrigger, moveTrigger)); + String query = "select t1.under_col, t1.value from " + tableName + " t1 join " + tableName + + " t2 on t1.under_col>=t2.under_col"; + runQueryWithTrigger(query, getConfigs(), null); + } + + @Override + protected void setupTriggers(final List triggers) throws Exception { + WorkloadManager wm = WorkloadManager.getInstance(); + Map poolStateMap = wm.getPools(); + WorkloadManager.PoolState originalPoolState = poolStateMap.get("llap"); + originalPoolState.setTriggers(triggers); + } +} \ No newline at end of file diff --git a/itests/hive-unit/src/test/java/org/apache/hive/jdbc/TestTriggersTezSessionPoolManager.java b/itests/hive-unit/src/test/java/org/apache/hive/jdbc/TestTriggersTezSessionPoolManager.java index c227a63..39497f7 100644 --- a/itests/hive-unit/src/test/java/org/apache/hive/jdbc/TestTriggersTezSessionPoolManager.java +++ b/itests/hive-unit/src/test/java/org/apache/hive/jdbc/TestTriggersTezSessionPoolManager.java @@ -16,100 +16,22 @@ package org.apache.hive.jdbc; -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertNotNull; -import static org.junit.Assert.assertNull; -import static org.junit.Assert.assertTrue; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.when; -import java.io.File; -import java.net.URL; -import java.sql.Connection; -import java.sql.SQLException; -import java.sql.Statement; -import java.util.ArrayList; -import java.util.Arrays; -import java.util.HashMap; import java.util.List; -import java.util.Map; -import org.apache.hadoop.fs.Path; -import org.apache.hadoop.hive.conf.HiveConf; -import
org.apache.hadoop.hive.conf.HiveConf.ConfVars; -import org.apache.hadoop.hive.llap.LlapBaseInputFormat; import org.apache.hadoop.hive.ql.exec.tez.TezSessionPoolManager; import org.apache.hadoop.hive.ql.wm.ExecutionTrigger; import org.apache.hadoop.hive.ql.wm.Expression; import org.apache.hadoop.hive.ql.wm.ExpressionFactory; import org.apache.hadoop.hive.ql.wm.MetastoreGlobalTriggersFetcher; import org.apache.hadoop.hive.ql.wm.Trigger; -import org.apache.hive.jdbc.miniHS2.MiniHS2; -import org.apache.hive.jdbc.miniHS2.MiniHS2.MiniClusterType; -import org.junit.After; -import org.junit.AfterClass; -import org.junit.Before; -import org.junit.BeforeClass; import org.junit.Test; import com.google.common.collect.Lists; -public class TestTriggersTezSessionPoolManager { - protected static MiniHS2 miniHS2 = null; - protected static String dataFileDir; - static Path kvDataFilePath; - private static String tableName = "testtab1"; - - protected static HiveConf conf = null; - protected Connection hs2Conn = null; - - @BeforeClass - public static void beforeTest() throws Exception { - Class.forName(MiniHS2.getJdbcDriverName()); - - String confDir = "../../data/conf/llap/"; - HiveConf.setHiveSiteLocation(new URL("file://" + new File(confDir).toURI().getPath() + "/hive-site.xml")); - System.out.println("Setting hive-site: " + HiveConf.getHiveSiteLocation()); - - conf = new HiveConf(); - conf.setBoolVar(ConfVars.HIVE_SUPPORT_CONCURRENCY, false); - conf.setBoolVar(ConfVars.HIVE_SERVER2_ENABLE_DOAS, false); - conf.setVar(ConfVars.HIVE_SERVER2_TEZ_DEFAULT_QUEUES, "default"); - conf.setBoolVar(ConfVars.HIVE_SERVER2_TEZ_INITIALIZE_DEFAULT_SESSIONS, true); - conf.setBoolVar(ConfVars.TEZ_EXEC_SUMMARY, true); - conf.setBoolVar(ConfVars.HIVE_STRICT_CHECKS_CARTESIAN, false); - // don't want cache hits from llap io for testing filesystem bytes read counters - conf.setVar(ConfVars.LLAP_IO_MEMORY_MODE, "none"); - - conf.addResource(new URL("file://" + new File(confDir).toURI().getPath() - + 
"/tez-site.xml")); - - miniHS2 = new MiniHS2(conf, MiniClusterType.LLAP); - dataFileDir = conf.get("test.data.files").replace('\\', '/').replace("c:", ""); - kvDataFilePath = new Path(dataFileDir, "kv1.txt"); - - Map confOverlay = new HashMap<>(); - miniHS2.start(confOverlay); - miniHS2.getDFS().getFileSystem().mkdirs(new Path("/apps_staging_dir/anonymous")); - } - - @Before - public void setUp() throws Exception { - hs2Conn = TestJdbcWithMiniLlap.getConnection(miniHS2.getJdbcURL(), System.getProperty("user.name"), "bar"); - } - - @After - public void tearDown() throws Exception { - LlapBaseInputFormat.closeAll(); - hs2Conn.close(); - } - - @AfterClass - public static void afterTest() throws Exception { - if (miniHS2.isStarted()) { - miniHS2.stop(); - } - } +public class TestTriggersTezSessionPoolManager extends AbstractTriggersJdbcTest { @Test(timeout = 60000) public void testTriggerSlowQueryElapsedTime() throws Exception { @@ -131,14 +53,14 @@ public void testTriggerSlowQueryExecutionTime() throws Exception { runQueryWithTrigger(query, null, "Query was cancelled"); } - @Test(timeout = 60000) + @Test(timeout = 90000) public void testTriggerHighShuffleBytes() throws Exception { Expression expression = ExpressionFactory.fromString("SHUFFLE_BYTES > 100"); Trigger trigger = new ExecutionTrigger("big_shuffle", expression, Trigger.Action.KILL_QUERY); setupTriggers(Lists.newArrayList(trigger)); - String query = "select sleep(t1.under_col, 5), t1.value from " + tableName + " t1 join " + tableName + - " t2 on t1.under_col>=t2.under_col"; - runQueryWithTrigger(query, null, "Query was cancelled"); + String query = "select sleep(t1.under_col, 10), t1.value from " + tableName + " t1 join " + tableName + + " t2 on t1.under_col>=t2.under_col order by t1.under_col"; + runQueryWithTrigger(query, getConfigs(), "Query was cancelled"); } @Test(timeout = 60000) @@ -195,67 +117,10 @@ public void testMultipleTriggers2() throws Exception { runQueryWithTrigger(query, null, "Query was 
cancelled"); } - private void createSleepUDF() throws SQLException { - String udfName = TestJdbcWithMiniHS2.SleepMsUDF.class.getName(); - Connection con = hs2Conn; - Statement stmt = con.createStatement(); - stmt.execute("create temporary function sleep as '" + udfName + "'"); - stmt.close(); - } - - private void runQueryWithTrigger(final String query, final List setCmds, - final String expect) - throws Exception { - - Connection con = hs2Conn; - TestJdbcWithMiniLlap.createTestTable(con, null, tableName, kvDataFilePath.toString()); - createSleepUDF(); - - final Statement selStmt = con.createStatement(); - final Throwable[] throwable = new Throwable[1]; - Thread queryThread = new Thread(() -> { - try { - if (setCmds != null) { - for (String setCmd : setCmds) { - selStmt.execute(setCmd); - } - } - selStmt.execute(query); - } catch (SQLException e) { - throwable[0] = e; - } - }); - queryThread.start(); - - queryThread.join(); - selStmt.close(); - - if (expect == null) { - assertNull("Expected query to succeed", throwable[0]); - } else { - assertNotNull("Expected non-null throwable", throwable[0]); - assertEquals(SQLException.class, throwable[0].getClass()); - assertTrue(expect + " is not contained in " + throwable[0].getMessage(), - throwable[0].getMessage().contains(expect)); - } - } - - protected void setupTriggers(final List triggers) throws Exception { + @Override + void setupTriggers(final List triggers) throws Exception { MetastoreGlobalTriggersFetcher triggersFetcher = mock(MetastoreGlobalTriggersFetcher.class); when(triggersFetcher.fetch()).thenReturn(triggers); TezSessionPoolManager.getInstance().setGlobalTriggersFetcher(triggersFetcher); } - - private List getConfigs(String... 
more) { - List setCmds = new ArrayList<>(); - setCmds.add("set hive.exec.dynamic.partition.mode=nonstrict"); - setCmds.add("set mapred.min.split.size=100"); - setCmds.add("set mapred.max.split.size=100"); - setCmds.add("set tez.grouping.min-size=100"); - setCmds.add("set tez.grouping.max-size=100"); - if (more != null) { - setCmds.addAll(Arrays.asList(more)); - } - return setCmds; - } } \ No newline at end of file diff --git a/itests/hive-unit/src/test/java/org/apache/hive/jdbc/TestTriggersWorkloadManager.java b/itests/hive-unit/src/test/java/org/apache/hive/jdbc/TestTriggersWorkloadManager.java index 8d7693d..3519894 100644 --- a/itests/hive-unit/src/test/java/org/apache/hive/jdbc/TestTriggersWorkloadManager.java +++ b/itests/hive-unit/src/test/java/org/apache/hive/jdbc/TestTriggersWorkloadManager.java @@ -16,9 +16,6 @@ package org.apache.hive.jdbc; -import static org.mockito.Mockito.spy; -import static org.mockito.Mockito.when; - import java.io.File; import java.net.URL; import java.util.HashMap; @@ -57,6 +54,7 @@ public static void beforeTest() throws Exception { conf.setBoolVar(ConfVars.HIVE_STRICT_CHECKS_CARTESIAN, false); // don't want cache hits from llap io for testing filesystem bytes read counters conf.setVar(ConfVars.LLAP_IO_MEMORY_MODE, "none"); + conf.setBoolVar(ConfVars.HIVE_TRIGGER_ACTION_PRIORITIZE_KILL, false); conf.addResource(new URL("file://" + new File(confDir).toURI().getPath() + "/tez-site.xml")); @@ -73,8 +71,8 @@ public static void beforeTest() throws Exception { @Override protected void setupTriggers(final List triggers) throws Exception { WorkloadManager wm = WorkloadManager.getInstance(); - WorkloadManager.PoolState poolState = spy(new WorkloadManager.PoolState("llap", 1, 1f)); - when(poolState.getTriggers()).thenReturn(triggers); - wm.getPools().put("llap", poolState); + Map poolStateMap = wm.getPools(); + WorkloadManager.PoolState originalPoolState = poolStateMap.get("llap"); + originalPoolState.setTriggers(triggers); } } \ No newline 
at end of file diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/PerPoolTriggerValidatorRunnable.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/PerPoolTriggerValidatorRunnable.java new file mode 100644 index 0000000..5278dad --- /dev/null +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/PerPoolTriggerValidatorRunnable.java @@ -0,0 +1,69 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hadoop.hive.ql.exec.tez; + +import java.util.HashMap; +import java.util.Map; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; + +import org.apache.hadoop.hive.ql.wm.TriggerActionHandler; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class PerPoolTriggerValidatorRunnable implements Runnable { + protected static transient Logger LOG = LoggerFactory.getLogger(PerPoolTriggerValidatorRunnable.class); + private final WorkloadManager wm; + private final TriggerActionHandler triggerActionHandler; + private final ScheduledExecutorService validatorExecutorService; + private final Map poolValidators; + private final long triggerValidationIntervalMs; + private final boolean prioritizeKill; + + PerPoolTriggerValidatorRunnable(final WorkloadManager wm, final TriggerActionHandler triggerActionHandler, + final long triggerValidationIntervalMs, final int maxPoolSize, final boolean prioritizeKill) { + this.wm = wm; + this.triggerActionHandler = triggerActionHandler; + this.validatorExecutorService = Executors.newScheduledThreadPool(maxPoolSize); + this.poolValidators = new HashMap<>(); + this.triggerValidationIntervalMs = triggerValidationIntervalMs; + this.prioritizeKill = prioritizeKill; + } + + @Override + public void run() { + try { + Map poolStates = wm.getPools(); + for (Map.Entry entry : poolStates.entrySet()) { + String poolName = entry.getKey(); + WorkloadManager.PoolState poolState = entry.getValue(); + if (!poolValidators.containsKey(poolName)) { + LOG.info("Creating trigger validator for pool: {}", poolName); + TriggerValidatorRunnable poolValidator = new TriggerValidatorRunnable(poolState, triggerActionHandler, + prioritizeKill); + validatorExecutorService.scheduleWithFixedDelay(poolValidator, triggerValidationIntervalMs, + triggerValidationIntervalMs, TimeUnit.MILLISECONDS); + poolValidators.put(poolName, poolValidator); + } + } + } 
catch (Throwable t) { + // if exception is thrown in scheduled tasks, no further tasks will be scheduled, hence this ugly catch + LOG.warn(PerPoolTriggerValidatorRunnable.class.getSimpleName() + " caught exception.", t); + } + } +} diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionPoolManager.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionPoolManager.java index 9b4714f..e601733 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionPoolManager.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionPoolManager.java @@ -25,6 +25,7 @@ import java.util.List; import java.util.Set; import java.util.concurrent.Semaphore; +import java.util.concurrent.TimeUnit; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hive.conf.HiveConf; @@ -129,13 +130,14 @@ public void setupPool(HiveConf conf) throws Exception { numConcurrentLlapQueries = conf.getIntVar(ConfVars.HIVE_SERVER2_LLAP_CONCURRENT_QUERIES); llapQueue = new Semaphore(numConcurrentLlapQueries, true); + long triggerValidationIntervalMs = HiveConf.getTimeVar(conf, + HiveConf.ConfVars.HIVE_TRIGGER_VALIDATION_INTERVAL_MS, TimeUnit.MILLISECONDS); sessionTriggerProvider = new SessionTriggerProvider(); triggerActionHandler = new KillTriggerActionHandler(); - // TODO: update runnable to handle per pool validation - triggerValidatorRunnable = new TriggerValidatorRunnable(getSessionTriggerProvider(), getTriggerActionHandler()); + triggerValidatorRunnable = new TriggerValidatorRunnable(sessionTriggerProvider, triggerActionHandler, false); Hive db = Hive.get(conf); globalTriggersFetcher = new MetastoreGlobalTriggersFetcher(db); - startTriggerValidator(conf); + startTriggerValidator(triggerValidationIntervalMs); String queueAllowedStr = HiveConf.getVar(initConf, ConfVars.HIVE_SERVER2_TEZ_SESSION_CUSTOM_QUEUE_ALLOWED); @@ -345,16 +347,6 @@ public void destroy(TezSessionState tezSessionState) throws Exception { } @Override - SessionTriggerProvider 
getSessionTriggerProvider() { - return sessionTriggerProvider; - } - - @Override - TriggerActionHandler getTriggerActionHandler() { - return triggerActionHandler; - } - - @Override TriggerValidatorRunnable getTriggerValidatorRunnable() { return triggerValidatorRunnable; } @@ -488,8 +480,8 @@ public void registerOpenSession(TezSessionPoolSession session) { private void updateSessionsTriggers() { if (sessionTriggerProvider != null && globalTriggersFetcher != null) { - sessionTriggerProvider.setOpenSessions(Collections.unmodifiableList(openSessions)); - sessionTriggerProvider.setActiveTriggers(Collections.unmodifiableList(globalTriggersFetcher.fetch())); + sessionTriggerProvider.setSessions(Collections.unmodifiableList(openSessions)); + sessionTriggerProvider.setTriggers(Collections.unmodifiableList(globalTriggersFetcher.fetch())); } } @@ -520,7 +512,7 @@ public void setGlobalTriggersFetcher(MetastoreGlobalTriggersFetcher metastoreGlo public List getTriggerCounterNames() { List counterNames = new ArrayList<>(); if (sessionTriggerProvider != null) { - List activeTriggers = sessionTriggerProvider.getActiveTriggers(); + List activeTriggers = sessionTriggerProvider.getTriggers(); for (Trigger trigger : activeTriggers) { counterNames.add(trigger.getExpression().getCounterLimit().getName()); } diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionPoolSession.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionPoolSession.java index 6135223..7f484cd 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionPoolSession.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionPoolSession.java @@ -67,18 +67,12 @@ TezSessionState reopen(TezSessionState session, Configuration conf, } public static abstract class AbstractTriggerValidator { - abstract SessionTriggerProvider getSessionTriggerProvider(); + abstract Runnable getTriggerValidatorRunnable(); - abstract TriggerActionHandler getTriggerActionHandler(); - - abstract 
TriggerValidatorRunnable getTriggerValidatorRunnable(); - - public void startTriggerValidator(Configuration conf) { - long triggerValidationIntervalMs = HiveConf.getTimeVar(conf, - HiveConf.ConfVars.HIVE_TRIGGER_VALIDATION_INTERVAL_MS, TimeUnit.MILLISECONDS); + public void startTriggerValidator(long triggerValidationIntervalMs) { final ScheduledExecutorService scheduledExecutorService = Executors.newSingleThreadScheduledExecutor( new ThreadFactoryBuilder().setDaemon(true).setNameFormat("TriggerValidator").build()); - TriggerValidatorRunnable triggerValidatorRunnable = getTriggerValidatorRunnable(); + Runnable triggerValidatorRunnable = getTriggerValidatorRunnable(); scheduledExecutorService.scheduleWithFixedDelay(triggerValidatorRunnable, triggerValidationIntervalMs, triggerValidationIntervalMs, TimeUnit.MILLISECONDS); } diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezTask.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezTask.java index c3a2a2b..0ee553f 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezTask.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezTask.java @@ -162,7 +162,7 @@ public int execute(DriverContext driverContext) { // TODO: in future, we may also pass getUserIpAddress. // Note: for now this will just block to wait for a session based on parallelism. 
session = wm.getSession(session, ss.getUserName(), conf); - desiredCounters.addAll(wm.getTriggerCounterNames()); + desiredCounters.addAll(wm.getTriggerCounterNames(session)); } else { TezSessionPoolManager pm = TezSessionPoolManager.getInstance(); session = pm.getSession(session, conf, false, getWork().getLlapMode()); diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TriggerValidatorRunnable.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TriggerValidatorRunnable.java index 42cb3d8..196d678 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TriggerValidatorRunnable.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TriggerValidatorRunnable.java @@ -31,19 +31,23 @@ protected static transient Logger LOG = LoggerFactory.getLogger(TriggerValidatorRunnable.class); private final SessionTriggerProvider sessionTriggerProvider; private final TriggerActionHandler triggerActionHandler; + private final boolean prioritizeKill; + private final boolean prioritizeMove; TriggerValidatorRunnable(final SessionTriggerProvider sessionTriggerProvider, - final TriggerActionHandler triggerActionHandler) { + final TriggerActionHandler triggerActionHandler, final boolean prioritizeKill) { this.sessionTriggerProvider = sessionTriggerProvider; this.triggerActionHandler = triggerActionHandler; + this.prioritizeKill = prioritizeKill; + this.prioritizeMove = !prioritizeKill; } @Override public void run() { try { Map violatedSessions = new HashMap<>(); - final List sessions = sessionTriggerProvider.getOpenSessions(); - final List triggers = sessionTriggerProvider.getActiveTriggers(); + final List sessions = sessionTriggerProvider.getSessions(); + final List triggers = sessionTriggerProvider.getTriggers(); for (TezSessionState s : sessions) { TriggerContext triggerContext = s.getTriggerContext(); if (triggerContext != null) { @@ -54,8 +58,37 @@ public void run() { if (currentCounters.containsKey(desiredCounter)) { if (t.apply(currentCounters.get(desiredCounter))) { 
String queryId = s.getTriggerContext().getQueryId(); - LOG.info("Query {} violated trigger {}. Going to apply action {}", queryId, t, t.getAction()); - violatedSessions.put(s, t.getAction()); + Trigger.Action currentAction = t.getAction(); + if (violatedSessions.containsKey(s)) { + // session already has a violation + Trigger.Action existingAction = violatedSessions.get(s); + if (existingAction.name().equals(Trigger.Action.KILL_QUERY.name()) && + currentAction.name().equals(Trigger.Action.MOVE_TO_POOL.name()) && prioritizeMove) { + // [KILL, MOVE] -> [MOVE] + violatedSessions.put(s, currentAction); + LOG.info("Query {} violated trigger {}. Going to apply action {} over {} as kill is not " + + "prioritized", queryId, t, currentAction, existingAction); + } else if (existingAction.name().equals(Trigger.Action.MOVE_TO_POOL.name()) && + currentAction.name().equals(Trigger.Action.KILL_QUERY.name()) && prioritizeKill) { + // [MOVE, KILL] -> [KILL] + // we are prioritizing kill here over move + violatedSessions.put(s, currentAction); + LOG.info("Query {} violated trigger {}. Going to apply action {} over {} as kill is " + + "prioritized", queryId, t, currentAction, existingAction); + } else if (existingAction.name().equals(currentAction.name())) { + // [MOVE, MOVE'] -> [MOVE'] + // latest move takes precedence over old. ideally, multiple triggers should be spaced out + // properly in time, say >5 mins MOVE TO etl, >1 hr MOVE TO blackhole, > 5hr KILL. + // If both are kills, it doesn't matter. For non-llap, MOVE cannot be there. + violatedSessions.put(s, currentAction); + LOG.info("Query {} violated trigger {}. Going to apply action {} over {}", queryId, t, + currentAction, existingAction); + } + } else { + // first violation for the session + violatedSessions.put(s, currentAction); + LOG.info("Query {} violated trigger {}. 
Going to apply action {}", queryId, t, currentAction); + } } } } diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TriggerViolationActionHandler.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TriggerViolationActionHandler.java index ce9147f..ab58632 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TriggerViolationActionHandler.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TriggerViolationActionHandler.java @@ -26,13 +26,18 @@ public class TriggerViolationActionHandler implements TriggerActionHandler { private static final Logger LOG = LoggerFactory.getLogger(TriggerViolationActionHandler.class); + private final WorkloadManager wm; + public TriggerViolationActionHandler(final WorkloadManager wm) { + this.wm = wm; + } @Override public void applyAction(final Map queriesViolated) { + TezSessionState sessionState; for (Map.Entry entry : queriesViolated.entrySet()) { switch (entry.getValue()) { case KILL_QUERY: - TezSessionState sessionState = entry.getKey(); + sessionState = entry.getKey(); String queryId = sessionState.getTriggerContext().getQueryId(); try { sessionState.getKillQuery().killQuery(queryId); @@ -40,6 +45,21 @@ public void applyAction(final Map queriesViolat LOG.warn("Unable to kill query {} for trigger violation"); } break; + case MOVE_TO_POOL: + sessionState = entry.getKey(); + if (sessionState instanceof WmTezSession) { + WmTezSession wmTezSession = (WmTezSession) sessionState; + String destPoolName = entry.getValue().getPoolName(); + String srcPoolName = wmTezSession.getPoolName(); + if (!srcPoolName.equalsIgnoreCase(destPoolName)) { + LOG.warn("Moving session {} from {} pool to {} pool", wmTezSession.getSessionId(), srcPoolName, destPoolName); + wm.redistributePoolAllocations(destPoolName, wmTezSession, null, false); + } else { + LOG.warn("Ignoring moving session {} to same pool. 
srcPoolName: {} destPoolName: {}", + wmTezSession.getSessionId(), srcPoolName, destPoolName); + } + } + break; default: throw new RuntimeException("Unsupported action: " + entry.getValue()); } diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/WorkloadManager.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/WorkloadManager.java index d725e90..786f989 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/WorkloadManager.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/WorkloadManager.java @@ -20,16 +20,17 @@ import com.google.common.annotations.VisibleForTesting; import java.util.ArrayList; +import java.util.Collection; import java.util.Collections; import java.util.HashMap; import java.util.IdentityHashMap; -import java.util.LinkedList; import java.util.List; import java.util.Map; import java.util.concurrent.Semaphore; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; import java.util.concurrent.locks.ReentrantReadWriteLock; +import java.util.stream.Collectors; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hive.conf.HiveConf; @@ -77,11 +78,28 @@ private int internalPoolsVersion; private UserPoolMapping userPoolMapping; - private SessionTriggerProvider sessionTriggerProvider; - private TriggerActionHandler triggerActionHandler; - private TriggerValidatorRunnable triggerValidatorRunnable; + private PerPoolTriggerValidatorRunnable triggerValidatorRunnable; - public static class PoolState { + public List getTriggerCounterNames(final TezSessionState session) { + if (session instanceof WmTezSession) { + WmTezSession wmTezSession = (WmTezSession) session; + String poolName = wmTezSession.getPoolName(); + PoolState poolState = pools.get(poolName); + if (poolState != null) { + List counterNames = new ArrayList<>(); + List triggers = poolState.getTriggers(); + if (triggers != null) { + for (Trigger trigger : triggers) { + 
counterNames.add(trigger.getExpression().getCounterLimit().getName()); + } + } + return counterNames; + } + } + return null; + } + + public static class PoolState extends SessionTriggerProvider { // Add stuff here as WM is implemented. private final Object lock = new Object(); private final List sessions = new ArrayList<>(); @@ -91,14 +109,14 @@ private final double finalFraction; private double finalFractionRemaining; private final int queryParallelism; - private final List triggers = new ArrayList<>(); - public PoolState(String fullName, int queryParallelism, double fraction) { + public PoolState(String fullName, int queryParallelism, double fraction, List triggers) { this.fullName = fullName; this.queryParallelism = queryParallelism; // A fair semaphore to ensure correct queue order. this.sessionsClaimed = new Semaphore(queryParallelism, true); this.finalFraction = this.finalFractionRemaining = fraction; + this.triggers = triggers; } @Override @@ -106,10 +124,16 @@ public String toString() { return "[" + fullName + ", query parallelism " + queryParallelism + ", fraction of the cluster " + finalFraction + ", fraction used by child pools " + (finalFraction - finalFractionRemaining) + ", active sessions " + sessions.size() - + "]"; + + ", triggers " + triggers + "]"; } + @Override + public List getSessions() { + // also casts WmTezSession to TezSessionState + return Collections.unmodifiableList(sessions); + } + @Override public List getTriggers() { return triggers; } @@ -167,11 +191,14 @@ public static WorkloadManager create(String yarnQueue, HiveConf conf, TmpResourc for (int i = 0; i < numSessions; i++) { sessions.addInitialSession(createSession()); } - // TODO: add support for per pool action handler and triggers fetcher (+atomic update to active triggers) - sessionTriggerProvider = new SessionTriggerProvider(); - triggerActionHandler = new TriggerViolationActionHandler(); - triggerValidatorRunnable = new TriggerValidatorRunnable(getSessionTriggerProvider(), 
getTriggerActionHandler()); - startTriggerValidator(conf); + final long triggerValidationIntervalMs = HiveConf.getTimeVar(conf, + HiveConf.ConfVars.HIVE_TRIGGER_VALIDATION_INTERVAL_MS, TimeUnit.MILLISECONDS); + final int maxPoolSize = HiveConf.getIntVar(conf, ConfVars.HIVE_RESOURCE_PLAN_MAX_POOL_SIZE); + final boolean prioritizeKill = HiveConf.getBoolVar(conf, ConfVars.HIVE_TRIGGER_ACTION_PRIORITIZE_KILL); + TriggerActionHandler triggerActionHandler = new TriggerViolationActionHandler(this); + triggerValidatorRunnable = new PerPoolTriggerValidatorRunnable(this, triggerActionHandler, + triggerValidationIntervalMs, maxPoolSize, prioritizeKill); + startTriggerValidator(triggerValidationIntervalMs); } private int initializeHivePools(TmpResourcePlan plan) { @@ -203,7 +230,7 @@ private int addHivePool(TmpHivePool pool, PoolState parent) { fraction = parent.finalFraction * pool.getResourceFraction(); parent.finalFractionRemaining -= fraction; } - PoolState state = new PoolState(fullName, totalQueryParallelism, fraction); + PoolState state = new PoolState(fullName, totalQueryParallelism, fraction, pool.getTriggers()); if (pool.getChildren() != null) { for (TmpHivePool child : pool.getChildren()) { totalQueryParallelism += addHivePool(child, state); @@ -333,17 +360,17 @@ protected boolean ensureAmIsRegistered(WmTezSession session) throws Exception { return true; } - private void redistributePoolAllocations( - String poolName, WmTezSession sessionToAdd, WmTezSession sessionToRemove, + void redistributePoolAllocations( + String destPoolName, WmTezSession sessionToAdd, WmTezSession sessionToRemove, boolean releaseParallelism) { List sessionsToUpdate = null; double totalAlloc = 0; - assert sessionToAdd == null || poolName.equals(sessionToAdd.getPoolName()); - assert sessionToRemove == null || poolName.equals(sessionToRemove.getPoolName()); + assert sessionToAdd == null || destPoolName.equals(sessionToAdd.getPoolName()); + assert sessionToRemove == null || 
destPoolName.equals(sessionToRemove.getPoolName()); poolsLock.readLock().lock(); boolean hasRemoveFailed = false; try { - PoolState pool = pools.get(poolName); + PoolState pool = pools.get(destPoolName); synchronized (pool.lock) { // This should be a 2nd order fn but it's too much pain in Java for one LOC. if (sessionToAdd != null) { @@ -371,7 +398,6 @@ private void redistributePoolAllocations( poolsLock.readLock().unlock(); } allocationManager.updateSessionsAsync(totalAlloc, sessionsToUpdate); - updateSessionsTriggers(); if (hasRemoveFailed) { throw new AssertionError("Cannot remove the session from the pool and release " + "the query slot; HS2 may fail to accept queries"); @@ -503,7 +529,6 @@ public void registerOpenSession(TezSessionPoolSession session) { synchronized (openSessions) { openSessions.put(session, null); } - updateSessionsTriggers(); } /** Called by TezSessionPoolSession when closed. */ @@ -512,20 +537,6 @@ public void unregisterOpenSession(TezSessionPoolSession session) { synchronized (openSessions) { openSessions.remove(session); } - updateSessionsTriggers(); - } - - private void updateSessionsTriggers() { - if (sessionTriggerProvider != null) { - List openSessions = new ArrayList<>(); - List activeTriggers = new ArrayList<>(); - for (PoolState poolState : pools.values()) { - activeTriggers.addAll(poolState.getTriggers()); - openSessions.addAll(poolState.sessions); - } - sessionTriggerProvider.setOpenSessions(Collections.unmodifiableList(openSessions)); - sessionTriggerProvider.setActiveTriggers(Collections.unmodifiableList(activeTriggers)); - } } @VisibleForTesting @@ -569,17 +580,7 @@ int getNumSessions() { } @Override - SessionTriggerProvider getSessionTriggerProvider() { - return sessionTriggerProvider; - } - - @Override - TriggerActionHandler getTriggerActionHandler() { - return triggerActionHandler; - } - - @Override - TriggerValidatorRunnable getTriggerValidatorRunnable() { + Runnable getTriggerValidatorRunnable() { return 
triggerValidatorRunnable; } @@ -592,29 +593,21 @@ protected final HiveConf getConf() { return conf; } - public List getTriggerCounterNames() { - List activeTriggers = sessionTriggerProvider.getActiveTriggers(); - List counterNames = new ArrayList<>(); - for (Trigger trigger : activeTriggers) { - counterNames.add(trigger.getExpression().getCounterLimit().getName()); - } - return counterNames; - } - - // TODO: temporary until real WM schema is created. public static class TmpHivePool { private final String name; private final List children; private final int queryParallelism; private final double resourceFraction; + private final List triggers; - public TmpHivePool(String name, - List children, int queryParallelism, double resourceFraction) { + public TmpHivePool(String name, List children, int queryParallelism, + double resourceFraction, List triggers) { this.name = name; this.children = children; this.queryParallelism = queryParallelism; this.resourceFraction = resourceFraction; + this.triggers = triggers; } public String getName() { @@ -629,6 +622,7 @@ public int getQueryParallelism() { public double getResourceFraction() { return resourceFraction; } + public List getTriggers() {return triggers;} } public static enum TmpUserMappingType { diff --git a/ql/src/java/org/apache/hadoop/hive/ql/wm/SessionTriggerProvider.java b/ql/src/java/org/apache/hadoop/hive/ql/wm/SessionTriggerProvider.java index 408aa2d..addc86e 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/wm/SessionTriggerProvider.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/wm/SessionTriggerProvider.java @@ -16,8 +16,6 @@ package org.apache.hadoop.hive.ql.wm; import java.util.ArrayList; -import java.util.Collections; -import java.util.LinkedList; import java.util.List; import org.apache.hadoop.hive.ql.exec.tez.TezSessionState; @@ -26,31 +24,31 @@ * Implementation for providing current open sessions and active trigger. 
*/ public class SessionTriggerProvider { - private List openSessions = new ArrayList<>(); - private List activeTriggers = new ArrayList<>(); + protected List sessions = new ArrayList<>(); + protected List triggers = new ArrayList<>(); public SessionTriggerProvider() { } public SessionTriggerProvider(final List openSessions, final List triggers) { - this.openSessions = openSessions; - this.activeTriggers = triggers; + this.sessions = openSessions; + this.triggers = triggers; } - public void setOpenSessions(final List openSessions) { - this.openSessions = openSessions; + public void setSessions(final List openSessions) { + this.sessions = openSessions; } - public void setActiveTriggers(final List activeTriggers) { - this.activeTriggers = activeTriggers; + public void setTriggers(final List activeTriggers) { + this.triggers = activeTriggers; } - public List getOpenSessions() { - return Collections.unmodifiableList(openSessions); + public List getSessions() { + return sessions; } - public List getActiveTriggers() { - return Collections.unmodifiableList(activeTriggers); + public List getTriggers() { + return triggers; } } diff --git a/ql/src/test/org/apache/hadoop/hive/ql/exec/tez/TestWorkloadManager.java b/ql/src/test/org/apache/hadoop/hive/ql/exec/tez/TestWorkloadManager.java index 258a865..44de9da 100644 --- a/ql/src/test/org/apache/hadoop/hive/ql/exec/tez/TestWorkloadManager.java +++ b/ql/src/test/org/apache/hadoop/hive/ql/exec/tez/TestWorkloadManager.java @@ -115,7 +115,7 @@ public WorkloadManagerForTest(String yarnQueue, HiveConf conf, private static TmpResourcePlan createDummyPlan(int numSessions) { return new TmpResourcePlan( - Lists.newArrayList(new TmpHivePool("llap", null, numSessions, 1.0f)), + Lists.newArrayList(new TmpHivePool("llap", null, numSessions, 1.0f, null)), Lists.newArrayList(new TmpUserMapping(TmpUserMappingType.DEFAULT, "", "llap", 0))); } @@ -227,9 +227,9 @@ public void testClusterFractions() throws Exception { HiveConf conf = createConf(); 
MockQam qam = new MockQam(); List l2 = Lists.newArrayList( - new TmpHivePool("p1", null, 1, 0.5f), new TmpHivePool("p2", null, 2, 0.3f)); + new TmpHivePool("p1", null, 1, 0.5f, null), new TmpHivePool("p2", null, 2, 0.3f, null)); TmpResourcePlan plan = new TmpResourcePlan(Lists.newArrayList( - new TmpHivePool("r1", l2, 1, 0.6f), new TmpHivePool("r2", null, 1, 0.4f)), + new TmpHivePool("r1", l2, 1, 0.6f, null), new TmpHivePool("r2", null, 1, 0.4f, null)), Lists.newArrayList(create("p1", "r1/p1"), create("p2", "r1/p2"), create("r1", "r1"), create("r2", "r2"))); WorkloadManager wm = new WorkloadManagerForTest("test", conf, qam, plan); @@ -259,7 +259,7 @@ public void testQueueing() throws Exception { final HiveConf conf = createConf(); MockQam qam = new MockQam(); TmpResourcePlan plan = new TmpResourcePlan(Lists.newArrayList( - new TmpHivePool("A", null, 2, 0.5f), new TmpHivePool("B", null, 2, 0.5f)), + new TmpHivePool("A", null, 2, 0.5f, null), new TmpHivePool("B", null, 2, 0.5f, null)), Lists.newArrayList(create("A", "A"), create("B", "B"))); final WorkloadManager wm = new WorkloadManagerForTest("test", conf, qam, plan); wm.start(); @@ -367,7 +367,7 @@ public void testReuseWithDifferentPool() throws Exception { final HiveConf conf = createConf(); MockQam qam = new MockQam(); TmpResourcePlan plan = new TmpResourcePlan(Lists.newArrayList( - new TmpHivePool("A", null, 2, 0.6f), new TmpHivePool("B", null, 1, 0.4f)), + new TmpHivePool("A", null, 2, 0.6f, null), new TmpHivePool("B", null, 1, 0.4f, null)), Lists.newArrayList(create("A", "A"), create("B", "B"))); final WorkloadManager wm = new WorkloadManagerForTest("test", conf, qam, plan); wm.start(); diff --git a/service/src/java/org/apache/hive/service/server/HiveServer2.java b/service/src/java/org/apache/hive/service/server/HiveServer2.java index 2c4fe7f..448e818 100644 --- a/service/src/java/org/apache/hive/service/server/HiveServer2.java +++ b/service/src/java/org/apache/hive/service/server/HiveServer2.java @@ -187,7 
+187,7 @@ public void run() { String wmQueue = HiveConf.getVar(hiveConf, ConfVars.HIVE_SERVER2_TEZ_INTERACTIVE_QUEUE); if (wmQueue != null && !wmQueue.isEmpty()) { wm = WorkloadManager.create(wmQueue, hiveConf, new TmpResourcePlan( - Lists.newArrayList(new TmpHivePool("llap", null, 1, 1.0f)), + Lists.newArrayList(new TmpHivePool("llap", null, 1, 1.0f, null)), Lists.newArrayList(new TmpUserMapping(TmpUserMappingType.DEFAULT, "", "llap", 0)))); } else { wm = null;