diff --git a/itests/hive-unit/src/test/java/org/apache/hive/jdbc/AbstractJdbcTriggersTest.java b/itests/hive-unit/src/test/java/org/apache/hive/jdbc/AbstractJdbcTriggersTest.java new file mode 100644 index 0000000..68d57ca --- /dev/null +++ b/itests/hive-unit/src/test/java/org/apache/hive/jdbc/AbstractJdbcTriggersTest.java @@ -0,0 +1,164 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hive.jdbc; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertNull; +import static org.junit.Assert.assertTrue; + +import java.io.File; +import java.net.URL; +import java.sql.Connection; +import java.sql.SQLException; +import java.sql.Statement; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.concurrent.TimeUnit; + +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hive.conf.HiveConf; +import org.apache.hadoop.hive.conf.HiveConf.ConfVars; +import org.apache.hadoop.hive.llap.LlapBaseInputFormat; +import org.apache.hadoop.hive.ql.wm.Trigger; +import org.apache.hive.jdbc.miniHS2.MiniHS2; +import org.apache.hive.jdbc.miniHS2.MiniHS2.MiniClusterType; +import org.junit.After; +import org.junit.AfterClass; +import org.junit.Before; +import org.junit.BeforeClass; + +public abstract class AbstractJdbcTriggersTest { + protected static MiniHS2 miniHS2 = null; + protected static String dataFileDir; + static Path kvDataFilePath; + protected static String tableName = "testtab1"; + + protected static HiveConf conf = null; + protected Connection hs2Conn = null; + + @BeforeClass + public static void beforeTest() throws Exception { + Class.forName(MiniHS2.getJdbcDriverName()); + + String confDir = "../../data/conf/llap/"; + HiveConf.setHiveSiteLocation(new URL("file://" + new File(confDir).toURI().getPath() + "/hive-site.xml")); + System.out.println("Setting hive-site: " + HiveConf.getHiveSiteLocation()); + + conf = new HiveConf(); + conf.setBoolVar(ConfVars.HIVE_SUPPORT_CONCURRENCY, false); + conf.setBoolVar(ConfVars.HIVE_SERVER2_ENABLE_DOAS, false); + conf.setVar(ConfVars.HIVE_SERVER2_TEZ_DEFAULT_QUEUES, "default"); + conf.setTimeVar(ConfVars.HIVE_TRIGGER_VALIDATION_INTERVAL_MS, 100, TimeUnit.MILLISECONDS); + conf.setBoolVar(ConfVars.HIVE_SERVER2_TEZ_INITIALIZE_DEFAULT_SESSIONS, true); + conf.setBoolVar(ConfVars.TEZ_EXEC_SUMMARY, true); + conf.setBoolVar(ConfVars.HIVE_STRICT_CHECKS_CARTESIAN, false); + conf.setVar(ConfVars.LLAP_IO_MEMORY_MODE, "none"); + + conf.addResource(new URL("file://" + new File(confDir).toURI().getPath() + + "/tez-site.xml")); + + miniHS2 = new MiniHS2(conf, MiniClusterType.LLAP); + dataFileDir = conf.get("test.data.files").replace('\\', '/').replace("c:", ""); + kvDataFilePath = new Path(dataFileDir, "kv1.txt"); + + Map confOverlay = new HashMap<>(); + miniHS2.start(confOverlay); + miniHS2.getDFS().getFileSystem().mkdirs(new Path("/apps_staging_dir/anonymous")); + } + + @Before + public void setUp() throws Exception { + hs2Conn = TestJdbcWithMiniLlap.getConnection(miniHS2.getJdbcURL(), System.getProperty("user.name"), "bar"); + } + + @After + public void tearDown() throws Exception { + LlapBaseInputFormat.closeAll(); + hs2Conn.close(); + } + + @AfterClass + public static void afterTest() throws Exception { + if (miniHS2.isStarted()) { + miniHS2.stop(); + } + } + + private void createSleepUDF() throws SQLException { + String udfName = TestJdbcWithMiniHS2.SleepMsUDF.class.getName(); + Connection con = hs2Conn; + Statement stmt = con.createStatement(); + stmt.execute("create temporary function sleep as '" + udfName + "'"); + stmt.close(); + } + + protected void runQueryWithTrigger(final String query, final List setCmds, + final String expect) + throws Exception { + + Connection con = hs2Conn; + TestJdbcWithMiniLlap.createTestTable(con, null, tableName, kvDataFilePath.toString()); + createSleepUDF(); + + final Statement selStmt = con.createStatement(); + final Throwable[] throwable = new Throwable[1]; + Thread queryThread = new Thread(() -> { + try { + if (setCmds != null) { + for (String setCmd : setCmds) { + selStmt.execute(setCmd); + } + } + selStmt.execute(query); + } catch (SQLException e) { + throwable[0] = e; + } + }); + queryThread.start(); + + queryThread.join(); + selStmt.close(); + + if (expect == null) { + assertNull("Expected query to succeed", throwable[0]); + } else { + assertNotNull("Expected non-null throwable", throwable[0]); + assertEquals(SQLException.class, throwable[0].getClass()); + assertTrue(expect + " is not contained in " + throwable[0].getMessage(), + throwable[0].getMessage().contains(expect)); + } + } + + abstract void setupTriggers(final List triggers) throws Exception; + + protected List getConfigs(String... more) { + List setCmds = new ArrayList<>(); + setCmds.add("set hive.exec.dynamic.partition.mode=nonstrict"); + setCmds.add("set mapred.min.split.size=100"); + setCmds.add("set mapred.max.split.size=100"); + setCmds.add("set tez.grouping.min-size=100"); + setCmds.add("set tez.grouping.max-size=100"); + if (more != null) { + setCmds.addAll(Arrays.asList(more)); + } + return setCmds; + } +} \ No newline at end of file diff --git a/itests/hive-unit/src/test/java/org/apache/hive/jdbc/TestTriggersNoTezSessionPool.java b/itests/hive-unit/src/test/java/org/apache/hive/jdbc/TestTriggersNoTezSessionPool.java new file mode 100644 index 0000000..fb3af932 --- /dev/null +++ b/itests/hive-unit/src/test/java/org/apache/hive/jdbc/TestTriggersNoTezSessionPool.java @@ -0,0 +1,62 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hive.jdbc; + +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + +import java.util.List; + +import org.apache.hadoop.hive.ql.exec.tez.TezSessionPoolManager; +import org.apache.hadoop.hive.ql.wm.ExecutionTrigger; +import org.apache.hadoop.hive.ql.wm.Expression; +import org.apache.hadoop.hive.ql.wm.ExpressionFactory; +import org.apache.hadoop.hive.ql.wm.MetastoreGlobalTriggersFetcher; +import org.apache.hadoop.hive.ql.wm.Trigger; +import org.junit.Test; + +import com.google.common.collect.Lists; + +public class TestTriggersNoTezSessionPool extends AbstractJdbcTriggersTest { + + @Test(timeout = 60000) + public void testTriggerSlowQueryExecutionTime() throws Exception { + Expression expression = ExpressionFactory.fromString("EXECUTION_TIME > 1000"); + Trigger trigger = new ExecutionTrigger("slow_query", expression, Trigger.Action.KILL_QUERY); + setupTriggers(Lists.newArrayList(trigger)); + String query = "select sleep(t1.under_col, 5), t1.value from " + tableName + " t1 join " + tableName + + " t2 on t1.under_col>=t2.under_col"; + runQueryWithTrigger(query, null, trigger + " violated"); + } + + @Test(timeout = 60000) + public void testTriggerTotalTasks() throws Exception { + Expression expression = ExpressionFactory.fromString("TOTAL_TASKS > 50"); + Trigger trigger = new ExecutionTrigger("highly_parallel", expression, Trigger.Action.KILL_QUERY); + setupTriggers(Lists.newArrayList(trigger)); + String query = "select sleep(t1.under_col, 5), t1.value from " + tableName + " t1 join " + tableName + + " t2 on t1.under_col>=t2.under_col"; + runQueryWithTrigger(query, getConfigs(), trigger + " violated"); + } + + @Override + void setupTriggers(final List triggers) throws Exception { + MetastoreGlobalTriggersFetcher triggersFetcher = mock(MetastoreGlobalTriggersFetcher.class); + when(triggersFetcher.fetch()).thenReturn(triggers); + TezSessionPoolManager.getInstance().setGlobalTriggersFetcher(triggersFetcher); + } +} \ No newline at end of file diff --git a/itests/hive-unit/src/test/java/org/apache/hive/jdbc/TestTriggersTezSessionPoolManager.java b/itests/hive-unit/src/test/java/org/apache/hive/jdbc/TestTriggersTezSessionPoolManager.java index aeca663..f5c8750 100644 --- a/itests/hive-unit/src/test/java/org/apache/hive/jdbc/TestTriggersTezSessionPoolManager.java +++ b/itests/hive-unit/src/test/java/org/apache/hive/jdbc/TestTriggersTezSessionPoolManager.java @@ -16,102 +16,22 @@ package org.apache.hive.jdbc; -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertNotNull; -import static org.junit.Assert.assertNull; -import static org.junit.Assert.assertTrue; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.when; -import java.io.File; -import java.net.URL; -import java.sql.Connection; -import java.sql.SQLException; -import java.sql.Statement; -import java.util.ArrayList; -import java.util.Arrays; -import java.util.HashMap; import java.util.List; -import java.util.Map; -import java.util.concurrent.TimeUnit; -import org.apache.hadoop.fs.Path; -import org.apache.hadoop.hive.conf.HiveConf; -import org.apache.hadoop.hive.conf.HiveConf.ConfVars; -import org.apache.hadoop.hive.llap.LlapBaseInputFormat; import org.apache.hadoop.hive.ql.exec.tez.TezSessionPoolManager; import org.apache.hadoop.hive.ql.wm.ExecutionTrigger; import org.apache.hadoop.hive.ql.wm.Expression; import org.apache.hadoop.hive.ql.wm.ExpressionFactory; import org.apache.hadoop.hive.ql.wm.MetastoreGlobalTriggersFetcher; import org.apache.hadoop.hive.ql.wm.Trigger; -import org.apache.hive.jdbc.miniHS2.MiniHS2; -import org.apache.hive.jdbc.miniHS2.MiniHS2.MiniClusterType; -import org.junit.After; -import org.junit.AfterClass; -import org.junit.Before; -import org.junit.BeforeClass; import org.junit.Test; import com.google.common.collect.Lists; -public class TestTriggersTezSessionPoolManager { - protected static MiniHS2 miniHS2 = null; - protected static String dataFileDir; - static Path kvDataFilePath; - private static String tableName = "testtab1"; - - protected static HiveConf conf = null; - protected Connection hs2Conn = null; - - @BeforeClass - public static void beforeTest() throws Exception { - Class.forName(MiniHS2.getJdbcDriverName()); - - String confDir = "../../data/conf/llap/"; - HiveConf.setHiveSiteLocation(new URL("file://" + new File(confDir).toURI().getPath() + "/hive-site.xml")); - System.out.println("Setting hive-site: " + HiveConf.getHiveSiteLocation()); - - conf = new HiveConf(); - conf.setBoolVar(ConfVars.HIVE_SUPPORT_CONCURRENCY, false); - conf.setBoolVar(ConfVars.HIVE_SERVER2_ENABLE_DOAS, false); - conf.setVar(ConfVars.HIVE_SERVER2_TEZ_DEFAULT_QUEUES, "default"); - conf.setTimeVar(ConfVars.HIVE_TRIGGER_VALIDATION_INTERVAL_MS, 100, TimeUnit.MILLISECONDS); - conf.setBoolVar(ConfVars.HIVE_SERVER2_TEZ_INITIALIZE_DEFAULT_SESSIONS, true); - conf.setBoolVar(ConfVars.TEZ_EXEC_SUMMARY, true); - conf.setBoolVar(ConfVars.HIVE_STRICT_CHECKS_CARTESIAN, false); - // don't want cache hits from llap io for testing filesystem bytes read counters - conf.setVar(ConfVars.LLAP_IO_MEMORY_MODE, "none"); - - conf.addResource(new URL("file://" + new File(confDir).toURI().getPath() - + "/tez-site.xml")); - - miniHS2 = new MiniHS2(conf, MiniClusterType.LLAP); - dataFileDir = conf.get("test.data.files").replace('\\', '/').replace("c:", ""); - kvDataFilePath = new Path(dataFileDir, "kv1.txt"); - - Map confOverlay = new HashMap<>(); - miniHS2.start(confOverlay); - miniHS2.getDFS().getFileSystem().mkdirs(new Path("/apps_staging_dir/anonymous")); - } - - @Before - public void setUp() throws Exception { - hs2Conn = TestJdbcWithMiniLlap.getConnection(miniHS2.getJdbcURL(), System.getProperty("user.name"), "bar"); - } - - @After - public void tearDown() throws Exception { - LlapBaseInputFormat.closeAll(); - hs2Conn.close(); - } - - @AfterClass - public static void afterTest() throws Exception { - if (miniHS2.isStarted()) { - miniHS2.stop(); - } - } +public class TestTriggersTezSessionPoolManager extends AbstractJdbcTriggersTest { @Test(timeout = 60000) public void testTriggerSlowQueryElapsedTime() throws Exception { @@ -316,67 +236,10 @@ public void testMultipleTriggers2() throws Exception { runQueryWithTrigger(query, null, shuffleTrigger + " violated"); } - private void createSleepUDF() throws SQLException { - String udfName = TestJdbcWithMiniHS2.SleepMsUDF.class.getName(); - Connection con = hs2Conn; - Statement stmt = con.createStatement(); - stmt.execute("create temporary function sleep as '" + udfName + "'"); - stmt.close(); - } - - private void runQueryWithTrigger(final String query, final List setCmds, - final String expect) - throws Exception { - - Connection con = hs2Conn; - TestJdbcWithMiniLlap.createTestTable(con, null, tableName, kvDataFilePath.toString()); - createSleepUDF(); - - final Statement selStmt = con.createStatement(); - final Throwable[] throwable = new Throwable[1]; - Thread queryThread = new Thread(() -> { - try { - if (setCmds != null) { - for (String setCmd : setCmds) { - selStmt.execute(setCmd); - } - } - selStmt.execute(query); - } catch (SQLException e) { - throwable[0] = e; - } - }); - queryThread.start(); - - queryThread.join(); - selStmt.close(); - - if (expect == null) { - assertNull("Expected query to succeed", throwable[0]); - } else { - assertNotNull("Expected non-null throwable", throwable[0]); - assertEquals(SQLException.class, throwable[0].getClass()); - assertTrue(expect + " is not contained in " + throwable[0].getMessage(), - throwable[0].getMessage().contains(expect)); - } - } - + @Override protected void setupTriggers(final List triggers) throws Exception { MetastoreGlobalTriggersFetcher triggersFetcher = mock(MetastoreGlobalTriggersFetcher.class); when(triggersFetcher.fetch()).thenReturn(triggers); TezSessionPoolManager.getInstance().setGlobalTriggersFetcher(triggersFetcher); } - - private List getConfigs(String... more) { - List setCmds = new ArrayList<>(); - setCmds.add("set hive.exec.dynamic.partition.mode=nonstrict"); - setCmds.add("set mapred.min.split.size=100"); - setCmds.add("set mapred.max.split.size=100"); - setCmds.add("set tez.grouping.min-size=100"); - setCmds.add("set tez.grouping.max-size=100"); - if (more != null) { - setCmds.addAll(Arrays.asList(more)); - } - return setCmds; - } } \ No newline at end of file diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionPool.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionPool.java index b67c933..1810905 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionPool.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionPool.java @@ -20,6 +20,7 @@ import com.google.common.base.Preconditions; import java.io.IOException; +import java.util.Collections; import java.util.HashSet; import java.util.Queue; import java.util.Set; diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionPoolManager.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionPoolManager.java index 9b4714f..f27c689 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionPoolManager.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionPoolManager.java @@ -81,7 +81,7 @@ private static TezSessionPoolManager instance = null; /** This is used to close non-default sessions, and also all sessions when stopping. */ - private final List openSessions = new LinkedList<>(); + private final List openSessions = new LinkedList<>(); private MetastoreGlobalTriggersFetcher globalTriggersFetcher; private SessionTriggerProvider sessionTriggerProvider; private TriggerActionHandler triggerActionHandler; @@ -129,13 +129,7 @@ public void setupPool(HiveConf conf) throws Exception { numConcurrentLlapQueries = conf.getIntVar(ConfVars.HIVE_SERVER2_LLAP_CONCURRENT_QUERIES); llapQueue = new Semaphore(numConcurrentLlapQueries, true); - sessionTriggerProvider = new SessionTriggerProvider(); - triggerActionHandler = new KillTriggerActionHandler(); - // TODO: update runnable to handle per pool validation - triggerValidatorRunnable = new TriggerValidatorRunnable(getSessionTriggerProvider(), getTriggerActionHandler()); - Hive db = Hive.get(conf); - globalTriggersFetcher = new MetastoreGlobalTriggersFetcher(db); - startTriggerValidator(conf); + initTriggers(conf); String queueAllowedStr = HiveConf.getVar(initConf, ConfVars.HIVE_SERVER2_TEZ_SESSION_CUSTOM_QUEUE_ALLOWED); @@ -175,6 +169,20 @@ public void setupPool(HiveConf conf) throws Exception { } } + public void initTriggers(final HiveConf conf) throws HiveException { + if (globalTriggersFetcher == null) { + Hive db = Hive.get(conf); + globalTriggersFetcher = new MetastoreGlobalTriggersFetcher(db); + } + + if (triggerValidatorRunnable == null) { + sessionTriggerProvider = new SessionTriggerProvider(openSessions, globalTriggersFetcher.fetch()); + triggerActionHandler = new KillTriggerActionHandler(); + triggerValidatorRunnable = new TriggerValidatorRunnable(getSessionTriggerProvider(), getTriggerActionHandler()); + startTriggerValidator(conf); + } + } + // TODO Create and init session sets up queue, isDefault - but does not initialize the configuration private TezSessionPoolSession createAndInitSession( String queue, boolean isDefault, HiveConf conf) { @@ -310,9 +318,9 @@ public void stop() throws Exception { if ((instance == null) || !this.hasInitialSessions) { return; } - List sessionsToClose = null; + List sessionsToClose = null; synchronized (openSessions) { - sessionsToClose = new ArrayList(openSessions); + sessionsToClose = new ArrayList(openSessions); } // we can just stop all the sessions @@ -455,11 +463,11 @@ public TezSessionState reopen(TezSessionState sessionState, } public void closeNonDefaultSessions(boolean keepTmpDir) throws Exception { - List sessionsToClose = null; + List sessionsToClose = null; synchronized (openSessions) { - sessionsToClose = new ArrayList(openSessions); + sessionsToClose = new ArrayList(openSessions); } - for (TezSessionPoolSession sessionState : sessionsToClose) { + for (TezSessionState sessionState : sessionsToClose) { System.err.println("Shutting down tez session."); closeIfNotDefault(sessionState, keepTmpDir); } diff --git a/service/src/java/org/apache/hive/service/server/HiveServer2.java b/service/src/java/org/apache/hive/service/server/HiveServer2.java index 2c4fe7f..6b99015 100644 --- a/service/src/java/org/apache/hive/service/server/HiveServer2.java +++ b/service/src/java/org/apache/hive/service/server/HiveServer2.java @@ -135,8 +135,10 @@ public synchronized void init(HiveConf hiveConf) { MetricsFactory.init(hiveConf); } + // will be invoked anyway in TezTask. Doing it early to initialize triggers for non-pool tez session. + tezSessionPoolManager = TezSessionPoolManager.getInstance(); + tezSessionPoolManager.initTriggers(hiveConf); if (hiveConf.getBoolVar(ConfVars.HIVE_SERVER2_TEZ_INITIALIZE_DEFAULT_SESSIONS)) { - tezSessionPoolManager = TezSessionPoolManager.getInstance(); tezSessionPoolManager.setupPool(hiveConf); } } catch (Throwable t) {