diff --git a/itests/hive-unit/src/test/java/org/apache/hive/jdbc/TestTriggersNoTezSessionPool.java b/itests/hive-unit/src/test/java/org/apache/hive/jdbc/TestTriggersNoTezSessionPool.java new file mode 100644 index 0000000..263d954 --- /dev/null +++ b/itests/hive-unit/src/test/java/org/apache/hive/jdbc/TestTriggersNoTezSessionPool.java @@ -0,0 +1,63 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hive.jdbc; + +import java.io.File; +import java.net.URL; +import java.sql.Connection; +import java.util.HashMap; +import java.util.Map; + +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hive.conf.HiveConf; +import org.apache.hadoop.hive.conf.HiveConf.ConfVars; +import org.apache.hive.jdbc.miniHS2.MiniHS2; +import org.apache.hive.jdbc.miniHS2.MiniHS2.MiniClusterType; +import org.junit.BeforeClass; + +public class TestTriggersNoTezSessionPool extends TestTriggersTezSessionPoolManager { + @BeforeClass + public static void beforeTest() throws Exception { + Class.forName(MiniHS2.getJdbcDriverName()); + + String confDir = "../../data/conf/llap/"; + HiveConf.setHiveSiteLocation(new URL("file://" + new File(confDir).toURI().getPath() + "/hive-site.xml")); + System.out.println("Setting hive-site: " + HiveConf.getHiveSiteLocation()); + + conf = new HiveConf(); + conf.setBoolVar(ConfVars.HIVE_SUPPORT_CONCURRENCY, false); + conf.setBoolVar(ConfVars.HIVE_SERVER2_ENABLE_DOAS, false); + conf.setVar(ConfVars.HIVE_SERVER2_TEZ_DEFAULT_QUEUES, "default"); + // disable session pool + conf.setBoolVar(ConfVars.HIVE_SERVER2_TEZ_INITIALIZE_DEFAULT_SESSIONS, false); + conf.setBoolVar(ConfVars.TEZ_EXEC_SUMMARY, true); + conf.setBoolVar(ConfVars.HIVE_STRICT_CHECKS_CARTESIAN, false); + // don't want cache hits from llap io for testing filesystem bytes read counters + conf.setVar(ConfVars.LLAP_IO_MEMORY_MODE, "none"); + + conf.addResource(new URL("file://" + new File(confDir).toURI().getPath() + + "/tez-site.xml")); + + miniHS2 = new MiniHS2(conf, MiniClusterType.LLAP); + dataFileDir = conf.get("test.data.files").replace('\\', '/').replace("c:", ""); + kvDataFilePath = new Path(dataFileDir, "kv1.txt"); + + Map confOverlay = new HashMap<>(); + miniHS2.start(confOverlay); + miniHS2.getDFS().getFileSystem().mkdirs(new Path("/apps_staging_dir/anonymous")); + } +} \ No newline at end of file diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionPool.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionPool.java index b67c933..1810905 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionPool.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionPool.java @@ -20,6 +20,7 @@ import com.google.common.base.Preconditions; import java.io.IOException; +import java.util.Collections; import java.util.HashSet; import java.util.Queue; import java.util.Set; diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionPoolManager.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionPoolManager.java index 9b4714f..13b15e6 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionPoolManager.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionPoolManager.java @@ -81,7 +81,8 @@ private static TezSessionPoolManager instance = null; /** This is used to close non-default sessions, and also all sessions when stopping. */ - private final List openSessions = new LinkedList<>(); + private final List openPoolSessions = new LinkedList<>(); + private final List openNonPoolSessions = new LinkedList<>(); private MetastoreGlobalTriggersFetcher globalTriggersFetcher; private SessionTriggerProvider sessionTriggerProvider; private TriggerActionHandler triggerActionHandler; @@ -129,13 +130,7 @@ public void setupPool(HiveConf conf) throws Exception { numConcurrentLlapQueries = conf.getIntVar(ConfVars.HIVE_SERVER2_LLAP_CONCURRENT_QUERIES); llapQueue = new Semaphore(numConcurrentLlapQueries, true); - sessionTriggerProvider = new SessionTriggerProvider(); - triggerActionHandler = new KillTriggerActionHandler(); - // TODO: update runnable to handle per pool validation - triggerValidatorRunnable = new TriggerValidatorRunnable(getSessionTriggerProvider(), getTriggerActionHandler()); - Hive db = Hive.get(conf); - globalTriggersFetcher = new MetastoreGlobalTriggersFetcher(db); - startTriggerValidator(conf); + initTriggers(conf); String queueAllowedStr = HiveConf.getVar(initConf, ConfVars.HIVE_SERVER2_TEZ_SESSION_CUSTOM_QUEUE_ALLOWED); @@ -175,6 +170,20 @@ public void setupPool(HiveConf conf) throws Exception { } } + public void initTriggers(final HiveConf conf) throws HiveException { + if (triggerValidatorRunnable == null) { + sessionTriggerProvider = new SessionTriggerProvider(); + triggerActionHandler = new KillTriggerActionHandler(); + triggerValidatorRunnable = new TriggerValidatorRunnable(getSessionTriggerProvider(), getTriggerActionHandler()); + startTriggerValidator(conf); + } + + if (globalTriggersFetcher == null) { + Hive db = Hive.get(conf); + globalTriggersFetcher = new MetastoreGlobalTriggersFetcher(db); + } + } + // TODO Create and init session sets up queue, isDefault - but does not initialize the configuration private TezSessionPoolSession createAndInitSession( String queue, boolean isDefault, HiveConf conf) { @@ -288,6 +297,8 @@ void returnSession(TezSessionState tezSessionState) throws Exception { + " belongs to the pool. Put it back in"); defaultSessionPool.returnSession((TezSessionPoolSession)tezSessionState); } + openNonPoolSessions.remove(tezSessionState); + updateSessionsTriggers(); // non default session nothing changes. The user can continue to use the existing // session in the SessionState } finally { @@ -311,8 +322,8 @@ public void stop() throws Exception { return; } List sessionsToClose = null; - synchronized (openSessions) { - sessionsToClose = new ArrayList(openSessions); + synchronized (openPoolSessions) { + sessionsToClose = new ArrayList(openPoolSessions); } // we can just stop all the sessions @@ -417,6 +428,8 @@ public TezSessionState getSession( if (canWorkWithSameSession(session, conf)) { session.setLegacyLlapMode(llap); + openNonPoolSessions.add(session); + updateSessionsTriggers(); return session; } @@ -426,6 +439,8 @@ public TezSessionState getSession( session = getSession(conf, doOpen); session.setLegacyLlapMode(llap); + openNonPoolSessions.add(session); + updateSessionsTriggers(); return session; } @@ -456,8 +471,8 @@ public TezSessionState reopen(TezSessionState sessionState, public void closeNonDefaultSessions(boolean keepTmpDir) throws Exception { List sessionsToClose = null; - synchronized (openSessions) { - sessionsToClose = new ArrayList(openSessions); + synchronized (openPoolSessions) { + sessionsToClose = new ArrayList(openPoolSessions); } for (TezSessionPoolSession sessionState : sessionsToClose) { System.err.println("Shutting down tez session."); @@ -480,15 +495,18 @@ public void closeAndReopenPoolSession(TezSessionPoolSession oldSession) throws E /** Called by TezSessionPoolSession when opened. */ @Override public void registerOpenSession(TezSessionPoolSession session) { - synchronized (openSessions) { - openSessions.add(session); + synchronized (openPoolSessions) { + openPoolSessions.add(session); } updateSessionsTriggers(); } private void updateSessionsTriggers() { if (sessionTriggerProvider != null && globalTriggersFetcher != null) { - sessionTriggerProvider.setOpenSessions(Collections.unmodifiableList(openSessions)); + List sessions = new ArrayList<>(); + sessions.addAll(openNonPoolSessions); + sessions.addAll(openPoolSessions); + sessionTriggerProvider.setOpenSessions(Collections.unmodifiableList(sessions)); sessionTriggerProvider.setActiveTriggers(Collections.unmodifiableList(globalTriggersFetcher.fetch())); } } @@ -499,8 +517,8 @@ public void unregisterOpenSession(TezSessionPoolSession session) { if (LOG.isDebugEnabled()) { LOG.debug("Closed a pool session [" + this + "]"); } - synchronized (openSessions) { - openSessions.remove(session); + synchronized (openPoolSessions) { + openPoolSessions.remove(session); } updateSessionsTriggers(); } diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezTask.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezTask.java index a1b7cfb..73ec21f 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezTask.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezTask.java @@ -165,6 +165,7 @@ public int execute(DriverContext driverContext) { desiredCounters.addAll(wm.getTriggerCounterNames()); } else { TezSessionPoolManager pm = TezSessionPoolManager.getInstance(); + pm.initTriggers(conf); session = pm.getSession(session, conf, false, getWork().getLlapMode()); desiredCounters.addAll(pm.getTriggerCounterNames()); }