diff --git a/itests/hive-unit/src/test/java/org/apache/hive/jdbc/TestTriggersNoTezSessionPool.java b/itests/hive-unit/src/test/java/org/apache/hive/jdbc/TestTriggersNoTezSessionPool.java new file mode 100644 index 0000000..263d954 --- /dev/null +++ b/itests/hive-unit/src/test/java/org/apache/hive/jdbc/TestTriggersNoTezSessionPool.java @@ -0,0 +1,63 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hive.jdbc; + +import java.io.File; +import java.net.URL; +import java.sql.Connection; +import java.util.HashMap; +import java.util.Map; + +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hive.conf.HiveConf; +import org.apache.hadoop.hive.conf.HiveConf.ConfVars; +import org.apache.hive.jdbc.miniHS2.MiniHS2; +import org.apache.hive.jdbc.miniHS2.MiniHS2.MiniClusterType; +import org.junit.BeforeClass; + +public class TestTriggersNoTezSessionPool extends TestTriggersTezSessionPoolManager { + @BeforeClass + public static void beforeTest() throws Exception { + Class.forName(MiniHS2.getJdbcDriverName()); + + String confDir = "../../data/conf/llap/"; + HiveConf.setHiveSiteLocation(new URL("file://" + new File(confDir).toURI().getPath() + "/hive-site.xml")); + System.out.println("Setting hive-site: " + HiveConf.getHiveSiteLocation()); + + conf = new HiveConf(); + conf.setBoolVar(ConfVars.HIVE_SUPPORT_CONCURRENCY, false); + conf.setBoolVar(ConfVars.HIVE_SERVER2_ENABLE_DOAS, false); + conf.setVar(ConfVars.HIVE_SERVER2_TEZ_DEFAULT_QUEUES, "default"); + // disable session pool + conf.setBoolVar(ConfVars.HIVE_SERVER2_TEZ_INITIALIZE_DEFAULT_SESSIONS, false); + conf.setBoolVar(ConfVars.TEZ_EXEC_SUMMARY, true); + conf.setBoolVar(ConfVars.HIVE_STRICT_CHECKS_CARTESIAN, false); + // don't want cache hits from llap io for testing filesystem bytes read counters + conf.setVar(ConfVars.LLAP_IO_MEMORY_MODE, "none"); + + conf.addResource(new URL("file://" + new File(confDir).toURI().getPath() + + "/tez-site.xml")); + + miniHS2 = new MiniHS2(conf, MiniClusterType.LLAP); + dataFileDir = conf.get("test.data.files").replace('\\', '/').replace("c:", ""); + kvDataFilePath = new Path(dataFileDir, "kv1.txt"); + + Map confOverlay = new HashMap<>(); + miniHS2.start(confOverlay); + miniHS2.getDFS().getFileSystem().mkdirs(new Path("/apps_staging_dir/anonymous")); + } +} \ No newline at end of file diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionPool.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionPool.java index b67c933..1810905 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionPool.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionPool.java @@ -20,6 +20,7 @@ import com.google.common.base.Preconditions; import java.io.IOException; +import java.util.Collections; import java.util.HashSet; import java.util.Queue; import java.util.Set; diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionPoolManager.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionPoolManager.java index 9b4714f..f27c689 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionPoolManager.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionPoolManager.java @@ -81,7 +81,7 @@ private static TezSessionPoolManager instance = null; /** This is used to close non-default sessions, and also all sessions when stopping. */ - private final List openSessions = new LinkedList<>(); + private final List openSessions = new LinkedList<>(); private MetastoreGlobalTriggersFetcher globalTriggersFetcher; private SessionTriggerProvider sessionTriggerProvider; private TriggerActionHandler triggerActionHandler; @@ -129,13 +129,7 @@ public void setupPool(HiveConf conf) throws Exception { numConcurrentLlapQueries = conf.getIntVar(ConfVars.HIVE_SERVER2_LLAP_CONCURRENT_QUERIES); llapQueue = new Semaphore(numConcurrentLlapQueries, true); - sessionTriggerProvider = new SessionTriggerProvider(); - triggerActionHandler = new KillTriggerActionHandler(); - // TODO: update runnable to handle per pool validation - triggerValidatorRunnable = new TriggerValidatorRunnable(getSessionTriggerProvider(), getTriggerActionHandler()); - Hive db = Hive.get(conf); - globalTriggersFetcher = new MetastoreGlobalTriggersFetcher(db); - startTriggerValidator(conf); + initTriggers(conf); String queueAllowedStr = HiveConf.getVar(initConf, ConfVars.HIVE_SERVER2_TEZ_SESSION_CUSTOM_QUEUE_ALLOWED); @@ -175,6 +169,20 @@ public void setupPool(HiveConf conf) throws Exception { } } + public void initTriggers(final HiveConf conf) throws HiveException { + if (globalTriggersFetcher == null) { + Hive db = Hive.get(conf); + globalTriggersFetcher = new MetastoreGlobalTriggersFetcher(db); + } + + if (triggerValidatorRunnable == null) { + sessionTriggerProvider = new SessionTriggerProvider(openSessions, globalTriggersFetcher.fetch()); + triggerActionHandler = new KillTriggerActionHandler(); + triggerValidatorRunnable = new TriggerValidatorRunnable(getSessionTriggerProvider(), getTriggerActionHandler()); + startTriggerValidator(conf); + } + } + // TODO Create and init session sets up queue, isDefault - but does not initialize the configuration private TezSessionPoolSession createAndInitSession( String queue, boolean isDefault, HiveConf conf) { @@ -310,9 +318,9 @@ public void stop() throws Exception { if ((instance == null) || !this.hasInitialSessions) { return; } - List sessionsToClose = null; + List sessionsToClose = null; synchronized (openSessions) { - sessionsToClose = new ArrayList(openSessions); + sessionsToClose = new ArrayList(openSessions); } // we can just stop all the sessions @@ -455,11 +463,11 @@ public TezSessionState reopen(TezSessionState sessionState, } public void closeNonDefaultSessions(boolean keepTmpDir) throws Exception { - List sessionsToClose = null; + List sessionsToClose = null; synchronized (openSessions) { - sessionsToClose = new ArrayList(openSessions); + sessionsToClose = new ArrayList(openSessions); } - for (TezSessionPoolSession sessionState : sessionsToClose) { + for (TezSessionState sessionState : sessionsToClose) { System.err.println("Shutting down tez session."); closeIfNotDefault(sessionState, keepTmpDir); } diff --git a/service/src/java/org/apache/hive/service/server/HiveServer2.java b/service/src/java/org/apache/hive/service/server/HiveServer2.java index 2c4fe7f..6b99015 100644 --- a/service/src/java/org/apache/hive/service/server/HiveServer2.java +++ b/service/src/java/org/apache/hive/service/server/HiveServer2.java @@ -135,8 +135,10 @@ public synchronized void init(HiveConf hiveConf) { MetricsFactory.init(hiveConf); } + // will be invoked anyway in TezTask. Doing it early to initialize triggers for non-pool tez session. + tezSessionPoolManager = TezSessionPoolManager.getInstance(); + tezSessionPoolManager.initTriggers(hiveConf); if (hiveConf.getBoolVar(ConfVars.HIVE_SERVER2_TEZ_INITIALIZE_DEFAULT_SESSIONS)) { - tezSessionPoolManager = TezSessionPoolManager.getInstance(); tezSessionPoolManager.setupPool(hiveConf); } } catch (Throwable t) {