diff --git common/src/java/org/apache/hadoop/hive/conf/HiveConf.java common/src/java/org/apache/hadoop/hive/conf/HiveConf.java index aa58d7445c..91511b91bb 100644 --- common/src/java/org/apache/hadoop/hive/conf/HiveConf.java +++ common/src/java/org/apache/hadoop/hive/conf/HiveConf.java @@ -3167,6 +3167,12 @@ private static void populateLlapDaemonVarsSet(Set llapDaemonVarsSetLocal "true", new StringSet("true", "false", "ignore"), "Whether Tez session pool should allow submitting queries to custom queues. The options\n" + "are true, false (error out), ignore (accept the query but ignore the queue setting)."), + HIVE_SERVER2_TEZ_USE_EXTERNAL_SESSIONS("hive.server2.use.external.sessions", false, + "This flag is used in HiveServer2 to use externally started tez sessions"), + HIVE_SERVER2_TEZ_EXTERNAL_SESSIONS_NAMESPACE("hive.server2.tez.external.sessions.namespace", "", + "ZK namespace to use for tez external sessions"), + HIVE_SERVER2_TEZ_EXTERNAL_SESSIONS_WAIT_MAX_ATTEMPTS("hive.server2.tez.external.sessions.wait.max.attempts", + 60, "Number of attempts before giving up waiting for external sessions (each attempt is 1 sec long)"), // Operation log configuration HIVE_SERVER2_LOGGING_OPERATION_ENABLED("hive.server2.logging.operation.enabled", true, diff --git itests/util/src/main/java/org/apache/hadoop/hive/ql/QTestUtil.java itests/util/src/main/java/org/apache/hadoop/hive/ql/QTestUtil.java index bfa3d5d7d2..20c673bfdf 100644 --- itests/util/src/main/java/org/apache/hadoop/hive/ql/QTestUtil.java +++ itests/util/src/main/java/org/apache/hadoop/hive/ql/QTestUtil.java @@ -95,6 +95,7 @@ import org.apache.hadoop.hive.ql.exec.Utilities; import org.apache.hadoop.hive.ql.exec.spark.session.SparkSession; import org.apache.hadoop.hive.ql.exec.spark.session.SparkSessionManagerImpl; +import org.apache.hadoop.hive.ql.exec.tez.TezSession; import org.apache.hadoop.hive.ql.exec.tez.TezSessionState; import org.apache.hadoop.hive.ql.lockmgr.zookeeper.CuratorFrameworkSingleton; import 
org.apache.hadoop.hive.ql.lockmgr.zookeeper.ZooKeeperHiveLockManager; @@ -1287,7 +1288,7 @@ private void restartSessions(boolean canReuseSession, CliSessionState ss, Sessio if (oldSs != null && canReuseSession && clusterType.getCoreClusterType() == CoreClusterType.TEZ) { // Copy the tezSessionState from the old CliSessionState. - TezSessionState tezSessionState = oldSs.getTezSession(); + TezSession tezSessionState = oldSs.getTezSession(); oldSs.setTezSession(null); ss.setTezSession(tezSessionState); oldSs.close(); diff --git ql/pom.xml ql/pom.xml index a55cbe380d..da7521ff88 100644 --- ql/pom.xml +++ ql/pom.xml @@ -250,7 +250,6 @@ org.apache.hadoop hadoop-yarn-api ${hadoop.version} - true org.apache.hadoop diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/tez/AbstractTriggerValidator.java ql/src/java/org/apache/hadoop/hive/ql/exec/tez/AbstractTriggerValidator.java new file mode 100644 index 0000000000..08b203c390 --- /dev/null +++ ql/src/java/org/apache/hadoop/hive/ql/exec/tez/AbstractTriggerValidator.java @@ -0,0 +1,49 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hadoop.hive.ql.exec.tez; + +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; + +import com.google.common.util.concurrent.ThreadFactoryBuilder; + +public abstract class AbstractTriggerValidator { + private ScheduledExecutorService scheduledExecutorService = null; + abstract Runnable getTriggerValidatorRunnable(); + + void startTriggerValidator(long triggerValidationIntervalMs) { + if (scheduledExecutorService == null) { + scheduledExecutorService = Executors.newSingleThreadScheduledExecutor( + new ThreadFactoryBuilder().setDaemon(true).setNameFormat("TriggerValidator").build()); + Runnable triggerValidatorRunnable = getTriggerValidatorRunnable(); + scheduledExecutorService.scheduleWithFixedDelay(triggerValidatorRunnable, triggerValidationIntervalMs, + triggerValidationIntervalMs, TimeUnit.MILLISECONDS); + TezSessionPoolSession.LOG.info("Started trigger validator with interval: {} ms", triggerValidationIntervalMs); + } + } + + void stopTriggerValidator() { + if (scheduledExecutorService != null) { + scheduledExecutorService.shutdownNow(); + scheduledExecutorService = null; + TezSessionPoolSession.LOG.info("Stopped trigger validator"); + } + } +} \ No newline at end of file diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/tez/KillTriggerActionHandler.java ql/src/java/org/apache/hadoop/hive/ql/exec/tez/KillTriggerActionHandler.java index f357775c86..c2fee51786 100644 --- ql/src/java/org/apache/hadoop/hive/ql/exec/tez/KillTriggerActionHandler.java +++ ql/src/java/org/apache/hadoop/hive/ql/exec/tez/KillTriggerActionHandler.java @@ -28,22 +28,22 @@ /** * Handles only Kill Action. 
*/ -public class KillTriggerActionHandler implements TriggerActionHandler { +public class KillTriggerActionHandler implements TriggerActionHandler { private static final Logger LOG = LoggerFactory.getLogger(KillTriggerActionHandler.class); @Override - public void applyAction(final Map queriesViolated) { - for (Map.Entry entry : queriesViolated.entrySet()) { + public void applyAction(final Map queriesViolated) { + for (Map.Entry entry : queriesViolated.entrySet()) { switch (entry.getValue().getAction().getType()) { case KILL_QUERY: - TezSessionState sessionState = entry.getKey(); + TezSession sessionState = entry.getKey(); String queryId = sessionState.getWmContext().getQueryId(); try { KillQuery killQuery = sessionState.getKillQuery(); // if kill query is null then session might have been released to pool or closed already if (killQuery != null) { - sessionState.getKillQuery().killQuery(queryId, entry.getValue().getViolationMsg(), - sessionState.getConf()); + killQuery.killQuery( /* reuse the null-checked local instead of re-reading it from the session */ + queryId, entry.getValue().getViolationMsg(), sessionState.getConf()); } } catch (HiveException e) { - LOG.warn("Unable to kill query {} for trigger violation"); + LOG.warn("Unable to kill query {} for trigger violation", queryId, e); diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezExternalSessionState.java ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezExternalSessionState.java new file mode 100644 index 0000000000..4a59d6a10f --- /dev/null +++ ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezExternalSessionState.java @@ -0,0 +1,142 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hive.ql.exec.tez; + +import java.io.IOException; +import java.net.URISyntaxException; +import java.util.HashMap; +import java.util.Map; + +import javax.security.auth.login.LoginException; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hive.conf.HiveConf; +import org.apache.hadoop.hive.ql.exec.tez.monitoring.TezJobMonitor; +import org.apache.hadoop.hive.ql.session.KillQuery; +import org.apache.hadoop.hive.ql.session.SessionState.LogHelper; +import org.apache.hadoop.security.Credentials; +import org.apache.hadoop.yarn.api.records.LocalResource; +import org.apache.tez.client.TezClient; +import org.apache.tez.dag.api.TezConfiguration; +import org.apache.tez.dag.api.TezException; +import org.apache.tez.mapreduce.hadoop.MRHelpers; +import org.apache.tez.serviceplugins.api.ServicePluginsDescriptor; + +public class TezExternalSessionState extends TezSessionState { + private String externalAppId; + private boolean isDestroying = false; + private final TezExternalSessionsRegistryClient registry; + + public TezExternalSessionState( + DagUtils utils, HiveConf conf, TezExternalSessionsRegistryClient registry) { + super(utils, conf); + this.registry = registry; + } + + public TezExternalSessionState(String sessionId, HiveConf conf, + TezExternalSessionsRegistryClient registry) { + super(sessionId, conf); + this.registry = registry; + } + + @Override + public void ensureLocalResources(Configuration conf, + String[] newFilesNotFromConf) throws IOException, LoginException, + 
URISyntaxException, TezException { + return; // A no-op for an external session. + } + + @Override + protected void openInternal(String[] additionalFilesNotFromConf, + boolean isAsync, LogHelper console, HiveResources resources, boolean isPoolInit) + throws IOException, LoginException, URISyntaxException, TezException { + initQueueAndUser(); + + // TODO: is the resource stuff really needed for external? + appJarLr = createJarLocalResource(utils.getExecJarPathLocal(conf)); + Map commonLocalResources = new HashMap<>(); + boolean llapMode = addLlapJarsIfNeeded(commonLocalResources); + + Map amEnv = new HashMap(); + MRHelpers.updateEnvBasedOnMRAMEnv(conf, amEnv); + + TezConfiguration tezConfig = createTezConfig(); + ServicePluginsDescriptor spd = createServicePluginDescriptor(llapMode, tezConfig); + Credentials llapCredentials = createLlapCredentials(llapMode, tezConfig); + + final TezClient session = TezClient.newBuilder("HIVE-" + getSessionId(), tezConfig) + .setIsSession(true).setLocalResources(commonLocalResources) + .setCredentials(llapCredentials).setServicePluginDescriptor(spd) + .build(); + + LOG.info("Opening new Tez Session (id: " + getSessionId() + ")"); + TezJobMonitor.initShutdownHook(); + + // External sessions doesn't support async mode (getClient should be much cheaper than open, + // and the async mode is anyway only used for CLI). 
+ if (isAsync) { + LOG.info("Ignoring the async argument for an external session {}", getSessionId()); + } + try { + externalAppId = registry.getSession(isPoolInit); + } catch (TezException | LoginException | IOException e) { + throw e; + } catch (Exception e) { + throw new IOException(e); + } + + // TODO## blocked on Tez release; session.getClient(ApplicationId.fromString(externalAppId)); + LOG.info("Started an external session; client name {}, app ID {}", + session.getClientName(), externalAppId); + setTezClient(session); + } + + @Override + public void close(boolean keepDagFilesDir) throws Exception { + // We never close external sessions that don't have errors. + if (externalAppId != null) { + registry.returnSession(externalAppId); + } + externalAppId = null; + if (isDestroying) { + super.close(keepDagFilesDir); + } + } + + public TezSession reopen() throws Exception { + isDestroying = true; + // Reopen will actually close this session, and get a new external app. + // It could instead somehow communicate to the external manager that the session is bad. + return super.reopen(); + } + + public void destroy() throws Exception { + isDestroying = true; + // This will actually close the session. We assume the external manager will restart it. + // It could instead somehow communicate to the external manager that the session is bad. + super.destroy(); + } + + @Override + public KillQuery getKillQuery() { + // TODO: should this return something custom for external sessions? 
+ return super.getKillQuery(); + } +} diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezExternalSessionsRegistryClient.java ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezExternalSessionsRegistryClient.java new file mode 100644 index 0000000000..0412643ec1 --- /dev/null +++ ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezExternalSessionsRegistryClient.java @@ -0,0 +1,163 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.hadoop.hive.ql.exec.tez; + +import java.io.IOException; +import java.util.HashSet; +import java.util.Iterator; + +import org.apache.curator.framework.CuratorFramework; +import org.apache.curator.framework.CuratorFrameworkFactory; +import org.apache.curator.framework.imps.CuratorFrameworkState; +import org.apache.curator.framework.recipes.cache.ChildData; +import org.apache.curator.framework.recipes.cache.PathChildrenCache; +import org.apache.curator.framework.recipes.cache.PathChildrenCacheEvent; +import org.apache.curator.framework.recipes.cache.PathChildrenCacheListener; +import org.apache.curator.retry.ExponentialBackoffRetry; +import org.apache.hadoop.hive.conf.HiveConf; +import org.apache.hadoop.hive.conf.HiveConf.ConfVars; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.google.common.base.Preconditions; + +// TODO: tez should provide this registry +public class TezExternalSessionsRegistryClient { + private static final Logger LOG = LoggerFactory.getLogger(TezExternalSessionsRegistryClient.class); + + // TODO: internal only for now to start and reconnect to external session + private static final String ZK_PATH = "/tez_am/server"; + private final HiveConf initConf; + private final HashSet available = new HashSet<>(), taken = new HashSet<>(); + private final Object lock = new Object(); + private final int maxAttempts; + + private PathChildrenCache cache; + private boolean isInitialized; + + + public TezExternalSessionsRegistryClient(final HiveConf initConf) { + this.initConf = initConf; + this.maxAttempts = HiveConf.getIntVar(initConf, + ConfVars.HIVE_SERVER2_TEZ_EXTERNAL_SESSIONS_WAIT_MAX_ATTEMPTS); + } + + public void close() { + if (cache != null) { + try { + cache.close(); + } catch (IOException e) { + LOG.error("Failed to close the cache: {}", e.getMessage()); + LOG.debug("Failed to close the cache", e); + } + } + } + + private void init() throws Exception { + String zkServer = HiveConf.getVar(initConf, 
ConfVars.HIVE_ZOOKEEPER_QUORUM); + String zkNamespace = HiveConf.getVar(initConf, + ConfVars.HIVE_SERVER2_TEZ_EXTERNAL_SESSIONS_NAMESPACE); + String effectivePath = zkNamespace + ZK_PATH; + CuratorFramework client = CuratorFrameworkFactory.newClient(zkServer, + new ExponentialBackoffRetry(1000, 3)); + synchronized (lock) { + this.cache = new PathChildrenCache(client, effectivePath, true); + client.start(); + cache.start(PathChildrenCache.StartMode.BUILD_INITIAL_CACHE); + for (ChildData childData : cache.getCurrentData()) { + available.add(getApplicationId(childData)); + } + cache.getListenable().addListener(new ExternalSessionsPathListener()); + LOG.info("Initial external sessions: {}", available); + isInitialized = true; + } + } + + private static String getApplicationId(final ChildData childData) { + return childData.getPath().substring(childData.getPath().lastIndexOf("/") + 1); + } + + private class ExternalSessionsPathListener implements PathChildrenCacheListener { + + @Override + public void childEvent(final CuratorFramework client, + final PathChildrenCacheEvent event) throws Exception { + Preconditions.checkArgument(client != null + && client.getState() == CuratorFrameworkState.STARTED, "client is not started"); + ChildData childData = event.getData(); + if (childData == null) { + return; + } + String applicationId = getApplicationId(childData); + LOG.info("{} for external session {}", event.getType(), applicationId); + + synchronized (lock) { + switch (event.getType()) { + case CHILD_UPDATED: + case CHILD_ADDED: + if (available.contains(applicationId) || taken.contains(applicationId)) { + return; // We do not expect updates to existing sessions; ignore them for now. 
+ } + available.add(applicationId); + break; + case CHILD_REMOVED: + if (taken.remove(applicationId)) { + LOG.warn("The session in use has disappeared from the registry ({})", applicationId); + } else if (!available.remove(applicationId)) { + LOG.warn("An unknown session has been removed ({})", applicationId); + } + break; + default: + // Ignore all the other events; logged above. + } + } + } + } + + public String getSession(boolean isLimitedWait) throws Exception, InterruptedException { + synchronized (lock) { + if (!isInitialized) { + init(); + } + long endTimeNs = System.nanoTime() + (1000000000L * maxAttempts); /* long math: an int multiply overflows for maxAttempts >= 3 */ + while (available.isEmpty() && (!isLimitedWait || ((endTimeNs - System.nanoTime()) > 0))) { + lock.wait(1000L); + } + Iterator<String> iter = available.iterator(); + if (!iter.hasNext()) { + assert isLimitedWait; + throw new IOException("Cannot get a session after " + maxAttempts + " attempts"); + } + String appId = iter.next(); + iter.remove(); + taken.add(appId); + return appId; + } + } + + public void returnSession(String appId) { + synchronized (lock) { + if (!isInitialized) { + throw new AssertionError("Not initialized"); + } + if (!taken.remove(appId)) { + return; // Session has been removed from ZK. + } + available.add(appId); + lock.notifyAll(); + } + } +} diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSession.java ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSession.java new file mode 100644 index 0000000000..3223a29091 --- /dev/null +++ ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSession.java @@ -0,0 +1,111 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hive.ql.exec.tez; + +import java.io.IOException; +import java.net.URISyntaxException; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.CancellationException; + +import javax.security.auth.login.LoginException; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hive.conf.HiveConf; +import org.apache.hadoop.hive.ql.exec.tez.TezSession.HiveResources; +import org.apache.hadoop.hive.ql.session.KillQuery; +import org.apache.hadoop.hive.ql.session.SessionState.LogHelper; +import org.apache.hadoop.hive.ql.wm.WmContext; +import org.apache.hadoop.yarn.api.records.LocalResource; +import org.apache.tez.client.TezClient; +import org.apache.tez.dag.api.TezException; + +/** + * A bogus interface that basically describes the evolved usage patterns of TezSessionStateImpl. + * Needed due to lack of multiple inheritance in Java; probably good to have, too - may make + * TezSessionState interface a little bit clearer or even encourage some future cleanup. + * + * It's implemented in two ways - core implementations (regular session, external session), + * and extra functionality implementation (pool session, WM session, etc.) that wraps an instance + * of the core implementation (i.e. use composition). With MI, each session type would just inherit + * from one of each. 
+ */ +public interface TezSession { + public static final class HiveResources { + public HiveResources(Path dagResourcesDir) { + this.dagResourcesDir = dagResourcesDir; + } + /** A directory that will contain resources related to DAGs and specified in configs. */ + public final Path dagResourcesDir; + public final Map additionalFilesNotFromConf = new HashMap(); + /** Localized resources of this session; both from conf and not from conf (above). */ + public final Set localizedResources = new HashSet<>(); + + @Override + public String toString() { + return dagResourcesDir + "; " + additionalFilesNotFromConf.size() + " additional files, " + + localizedResources.size() + " localized resources"; + } + } + + // Core session operations. + void open() throws IOException, LoginException, URISyntaxException, TezException; + void open(boolean isPoolInit) throws IOException, LoginException, URISyntaxException, TezException; + void open(HiveResources resources) throws LoginException, IOException, URISyntaxException, TezException; + void open(String[] additionalFilesNotFromConf) throws IOException, LoginException, URISyntaxException, TezException; + void beginOpen(String[] additionalFiles, LogHelper console) + throws IOException, LoginException, URISyntaxException, TezException; + void endOpen() throws InterruptedException, CancellationException; + TezSession reopen() throws Exception; + void destroy() throws Exception; + void close(boolean keepTmpDir) throws Exception; + void returnToSessionManager() throws Exception; + + /** This is called during open and update (i.e. internally and externally) to localize conf resources. 
*/ + void ensureLocalResources(Configuration conf, String[] newFilesNotFromConf) + throws IOException, LoginException, URISyntaxException, TezException; + HiveResources extractHiveResources(); + Path replaceHiveResources(HiveResources resources, boolean isAsync); + + List getLocalizedResources(); + LocalResource getAppJarLr(); + + HiveConf getConf(); + TezClient getTezClient(); + boolean isOpen(); + boolean isOpening(); + boolean getDoAsEnabled(); + String getSessionId(); + String getUser(); + WmContext getWmContext(); // Necessary for triggers, even for non-WM sessions. + void setWmContext(WmContext ctx); + void setQueueName(String queueName); + String getQueueName(); + void setDefault(); + boolean isDefault(); + boolean getLegacyLlapMode(); + void setLegacyLlapMode(boolean b); + void unsetOwnerThread(); + void setOwnerThread(); + KillQuery getKillQuery(); + void setKillQuery(KillQuery kq); +} \ No newline at end of file diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionPool.java ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionPool.java index 89954cba67..5128e184cb 100644 --- ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionPool.java +++ ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionPool.java @@ -17,19 +17,10 @@ */ package org.apache.hadoop.hive.ql.exec.tez; -import com.google.common.annotations.VisibleForTesting; -import com.google.common.base.Preconditions; -import com.google.common.util.concurrent.Futures; -import com.google.common.util.concurrent.ListenableFuture; -import com.google.common.util.concurrent.ListenableFutureTask; -import com.google.common.util.concurrent.SettableFuture; import java.io.IOException; import java.util.ArrayList; -import java.util.Collections; -import java.util.HashSet; import java.util.LinkedList; import java.util.List; -import java.util.Set; import java.util.concurrent.Callable; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.FutureTask; @@ -37,7 +28,7 @@ 
import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.locks.Condition; import java.util.concurrent.locks.ReentrantLock; -import org.apache.hadoop.fs.Path; + import org.apache.hadoop.hive.conf.HiveConf; import org.apache.hadoop.hive.conf.HiveConf.ConfVars; import org.apache.hadoop.hive.ql.session.SessionState; @@ -48,6 +39,13 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import com.google.common.annotations.VisibleForTesting; +import com.google.common.base.Preconditions; +import com.google.common.util.concurrent.Futures; +import com.google.common.util.concurrent.ListenableFuture; +import com.google.common.util.concurrent.ListenableFutureTask; +import com.google.common.util.concurrent.SettableFuture; + /** * Distinct from TezSessionPool manager in that it implements a session pool, and nothing else. */ @@ -107,8 +105,13 @@ void start() throws Exception { this.parentSessionState = SessionState.get(); if (initialSize == 0) return; // May be resized later. - int threadCount = Math.min(initialSize, + int threadCount = 1; + if (!HiveConf.getBoolVar(initConf, ConfVars.HIVE_SERVER2_TEZ_USE_EXTERNAL_SESSIONS)) { + // Don't use multiple threads for external sessions. 
+ threadCount = Math.min(initialSize, HiveConf.getIntVar(initConf, ConfVars.HIVE_SERVER2_TEZ_SESSION_MAX_INIT_THREADS)); + } + Preconditions.checkArgument(threadCount > 0); if (threadCount == 1) { for (int i = 0; i < initialSize; ++i) { @@ -294,7 +297,7 @@ private void startInitialSession(SessionType session) throws Exception { if (!isUsable) throw new IOException(session + " is not usable at pool startup"); session.getConf().set(TezConfiguration.TEZ_QUEUE_NAME, session.getQueueName()); configureAmRegistry(session); - session.open(); + session.open(true); if (session.stopUsing()) { if (!putSessionBack(session, false)) { LOG.warn("Couldn't add a session during initialization"); @@ -481,7 +484,7 @@ int getCurrentSize() { /** * Should be called when the session is no longer needed, to remove it from bySessionId. */ - public void notifyClosed(TezSessionState session) { + public void notifyClosed(SessionType session) { bySessionId.remove(session.getSessionId()); } } diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionPoolManager.java ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionPoolManager.java index 2633390861..ab672c6545 100644 --- ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionPoolManager.java +++ ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionPoolManager.java @@ -18,7 +18,7 @@ package org.apache.hadoop.hive.ql.exec.tez; -import org.apache.hadoop.hive.ql.exec.tez.TezSessionState.HiveResources; +import org.apache.hadoop.hive.ql.exec.tez.TezSession.HiveResources; import java.util.ArrayList; import java.util.Collections; @@ -52,7 +52,7 @@ * In case the user specifies a queue explicitly, a new session is created * on that queue and assigned to the session state. 
*/ -public class TezSessionPoolManager extends TezSessionPoolSession.AbstractTriggerValidator +public class TezSessionPoolManager extends AbstractTriggerValidator implements Manager, SessionExpirationTracker.RestartImpl { private enum CustomQueueAllowed { @@ -82,12 +82,14 @@ private static TezSessionPoolManager instance = null; /** This is used to close non-default sessions, and also all sessions when stopping. */ - private final List openSessions = new LinkedList<>(); + private final List openSessions = new LinkedList<>(); private SessionTriggerProvider sessionTriggerProvider; private TriggerActionHandler triggerActionHandler; private TriggerValidatorRunnable triggerValidatorRunnable; private YarnQueueHelper yarnQueueChecker; + private TezExternalSessionsRegistryClient externalSessions = null; + /** Note: this is not thread-safe. */ public static TezSessionPoolManager getInstance() { TezSessionPoolManager local = instance; @@ -193,6 +195,10 @@ public void setupNonPool(HiveConf conf) { this.yarnQueueChecker = new YarnQueueHelper(conf); } + if (HiveConf.getBoolVar(conf, ConfVars.HIVE_SERVER2_TEZ_USE_EXTERNAL_SESSIONS)) { + externalSessions = new TezExternalSessionsRegistryClient(conf); + } + restrictedConfig = new RestrictedConfigChecker(conf); } @@ -225,7 +231,7 @@ private TezSessionPoolSession createAndInitSession( return sessionState; } - private TezSessionState getSession(HiveConf conf, boolean doOpen) throws Exception { + private TezSession getSession(HiveConf conf, boolean doOpen) throws Exception { // NOTE: this can be called outside of HS2, without calling setupPool. Basically it should be // able to handle not being initialized. Perhaps we should get rid of the instance and // move the setupPool code to ctor. For now, at least hasInitialSessions will be false. 
@@ -298,7 +304,7 @@ private TezSessionState getSession(HiveConf conf, boolean doOpen) throws Excepti * @return * @throws Exception */ - private TezSessionState getNewSessionState(HiveConf conf, + private TezSession getNewSessionState(HiveConf conf, String queueName, boolean doOpen) throws Exception { TezSessionPoolSession retTezSessionState = createAndInitSession(queueName, false, conf); if (queueName != null) { @@ -317,7 +323,7 @@ public void returnAfterUse(TezSessionPoolSession session) throws Exception { returnSession(session); } - void returnSession(TezSessionState tezSessionState) throws Exception { + void returnSession(TezSession tezSessionState) throws Exception { // Ignore the interrupt status while returning the session, but set it back // on the thread in case anything else needs to deal with it. boolean isInterrupted = Thread.interrupted(); @@ -347,7 +353,7 @@ void returnSession(TezSessionState tezSessionState) throws Exception { } public static void closeIfNotDefault( - TezSessionState tezSessionState, boolean keepTmpDir) throws Exception { + TezSession tezSessionState, boolean keepTmpDir) throws Exception { LOG.info("Closing tez session if not default: " + tezSessionState); if (!tezSessionState.isDefault()) { tezSessionState.close(keepTmpDir); @@ -358,13 +364,13 @@ public void stop() throws Exception { if ((instance == null) || !this.hasInitialSessions) { return; } - List sessionsToClose = null; + List sessionsToClose = null; synchronized (openSessions) { - sessionsToClose = new ArrayList(openSessions); + sessionsToClose = new ArrayList(openSessions); } // we can just stop all the sessions - for (TezSessionState sessionState : sessionsToClose) { + for (TezSession sessionState : sessionsToClose) { if (sessionState.isDefault()) { sessionState.close(false); } @@ -390,7 +396,7 @@ public void stop() throws Exception { * @throws Exception */ @Override - public void destroy(TezSessionState tezSessionState) throws Exception { + public void destroy(TezSession 
tezSessionState) throws Exception { LOG.warn("We are closing a " + (tezSessionState.isDefault() ? "default" : "non-default") + " session because of retry failure."); tezSessionState.close(false); @@ -402,7 +408,13 @@ TriggerValidatorRunnable getTriggerValidatorRunnable() { } protected TezSessionPoolSession createSession(String sessionId, HiveConf conf) { - return new TezSessionPoolSession(sessionId, this, expirationTracker, conf); + TezSessionState base = null; + if (externalSessions != null) { + base = new TezExternalSessionState(sessionId, conf, externalSessions); + } else { + base = new TezSessionState(sessionId, conf); + } + return new TezSessionPoolSession(this, expirationTracker, base); } /* @@ -411,7 +423,7 @@ protected TezSessionPoolSession createSession(String sessionId, HiveConf conf) { * sessions for e.g. when a CLI session is started. The CLI session could re-use the * same tez session eliminating the latencies of new AM and containers. */ - private static boolean canWorkWithSameSession(TezSessionState session, HiveConf conf) + private static boolean canWorkWithSameSession(TezSession session, HiveConf conf) throws HiveException { if (session == null || conf == null || !session.isOpen()) { return false; @@ -437,7 +449,8 @@ private static boolean canWorkWithSameSession(TezSessionState session, HiveConf boolean doAsEnabled = conf.getBoolVar(HiveConf.ConfVars.HIVE_SERVER2_ENABLE_DOAS); // either variables will never be null because a default value is returned in case of absence - if (doAsEnabled != session.getConf().getBoolVar(HiveConf.ConfVars.HIVE_SERVER2_ENABLE_DOAS)) { + if (doAsEnabled != session.getConf().getBoolVar( + HiveConf.ConfVars.HIVE_SERVER2_ENABLE_DOAS)) { return false; } @@ -448,12 +461,12 @@ private static boolean canWorkWithSameSession(TezSessionState session, HiveConf return (queueName == null) ? 
confQueueName == null : queueName.equals(confQueueName); } else { // this session should never be a default session unless something has messed up. - throw new HiveException("The pool session " + session + " should have been returned to the pool"); + throw new HiveException("The pool session " + session + " should have been returned to the pool"); } } - public TezSessionState getSession( - TezSessionState session, HiveConf conf, boolean doOpen, boolean llap) throws Exception { + public TezSession getSession( + TezSession session, HiveConf conf, boolean doOpen, boolean llap) throws Exception { if (llap && (this.numConcurrentLlapQueries > 0)) { llapQueue.acquire(); // blocks if no more llap queries can be submitted. } @@ -474,7 +487,7 @@ public TezSessionState getSession( /** Reopens the session that was found to not be running. */ @Override - public TezSessionState reopen(TezSessionState sessionState) throws Exception { + public TezSession reopen(TezSession sessionState) throws Exception { HiveConf sessionConf = sessionState.getConf(); if (sessionState.getQueueName() != null && sessionConf.get(TezConfiguration.TEZ_QUEUE_NAME) == null) { @@ -485,7 +498,7 @@ public TezSessionState reopen(TezSessionState sessionState) throws Exception { } static void reopenInternal( - TezSessionState sessionState) throws Exception { + TezSession sessionState) throws Exception { HiveResources resources = sessionState.extractHiveResources(); // TODO: close basically resets the object to a bunch of nulls. // We should ideally not reuse the object because it's pointless and error-prone. 
@@ -496,11 +509,11 @@ static void reopenInternal( public void closeNonDefaultSessions() throws Exception { - List sessionsToClose = null; + List sessionsToClose = null; synchronized (openSessions) { - sessionsToClose = new ArrayList(openSessions); + sessionsToClose = new ArrayList(openSessions); } - for (TezSessionState sessionState : sessionsToClose) { + for (TezSession sessionState : sessionsToClose) { System.err.println("Shutting down tez session."); closeIfNotDefault(sessionState, false); } diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionPoolSession.java ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionPoolSession.java index d3748edb86..0387d8938c 100644 --- ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionPoolSession.java +++ ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionPoolSession.java @@ -20,20 +20,26 @@ import java.io.IOException; import java.net.URISyntaxException; -import java.util.concurrent.Executors; -import java.util.concurrent.ScheduledExecutorService; -import java.util.concurrent.TimeUnit; +import java.util.List; +import java.util.concurrent.CancellationException; import java.util.concurrent.atomic.AtomicInteger; import javax.security.auth.login.LoginException; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.Path; import org.apache.hadoop.hive.conf.HiveConf; +import org.apache.hadoop.hive.ql.session.KillQuery; import org.apache.hadoop.hive.ql.session.SessionState.LogHelper; +import org.apache.hadoop.hive.ql.wm.WmContext; import org.apache.hadoop.hive.registry.impl.TezAmInstance; +import org.apache.hadoop.yarn.api.records.LocalResource; +import org.apache.tez.client.TezClient; import org.apache.tez.dag.api.TezException; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import com.google.common.annotations.VisibleForTesting; -import com.google.common.util.concurrent.ThreadFactoryBuilder; /** * TezSession that is aware of the session pool, and also keeps track of 
expiration and use. @@ -43,57 +49,35 @@ * if it's time, the expiration is triggered; in that case, or if it was already triggered, the * caller gets a different session. When the session is in use when it expires, the expiration * thread ignores it and lets the return to the pool take care of the expiration. + * + * Because of the lack of multiple inheritance in Java, this uses composition. */ @VisibleForTesting -class TezSessionPoolSession extends TezSessionState { +class TezSessionPoolSession implements TezSession { + protected static final Logger LOG = LoggerFactory.getLogger(TezSessionPoolSession.class); private static final int STATE_NONE = 0, STATE_IN_USE = 1, STATE_EXPIRED = 2; public interface Manager { void registerOpenSession(TezSessionPoolSession session); - void unregisterOpenSession(TezSessionPoolSession session); - void returnAfterUse(TezSessionPoolSession session) throws Exception; - - TezSessionState reopen(TezSessionState session) throws Exception; - - void destroy(TezSessionState session) throws Exception; + TezSession reopen(TezSession session) throws Exception; + void destroy(TezSession session) throws Exception; } - public static abstract class AbstractTriggerValidator { - private ScheduledExecutorService scheduledExecutorService = null; - abstract Runnable getTriggerValidatorRunnable(); - - void startTriggerValidator(long triggerValidationIntervalMs) { - if (scheduledExecutorService == null) { - scheduledExecutorService = Executors.newSingleThreadScheduledExecutor( - new ThreadFactoryBuilder().setDaemon(true).setNameFormat("TriggerValidator").build()); - Runnable triggerValidatorRunnable = getTriggerValidatorRunnable(); - scheduledExecutorService.scheduleWithFixedDelay(triggerValidatorRunnable, triggerValidationIntervalMs, - triggerValidationIntervalMs, TimeUnit.MILLISECONDS); - LOG.info("Started trigger validator with interval: {} ms", triggerValidationIntervalMs); - } - } - void stopTriggerValidator() { - if (scheduledExecutorService != 
null) { - scheduledExecutorService.shutdownNow(); - scheduledExecutorService = null; - LOG.info("Stopped trigger validator"); - } - } - } private final AtomicInteger sessionState = new AtomicInteger(STATE_NONE); private Long expirationNs; - private final Manager parent; + private final Manager manager; private final SessionExpirationTracker expirationTracker; + private final TezSession superr; - public TezSessionPoolSession(String sessionId, Manager parent, - SessionExpirationTracker tracker, HiveConf conf) { - super(sessionId, conf); - this.parent = parent; + public TezSessionPoolSession(Manager manager, + SessionExpirationTracker tracker, TezSession superr) { + this.superr = superr; + this.manager = manager; this.expirationTracker = tracker; } @@ -106,11 +90,11 @@ Long getExpirationNs() { } @Override - void close(boolean keepTmpDir) throws Exception { + public void close(boolean keepTmpDir) throws Exception { try { - super.close(keepTmpDir); + superr.close(keepTmpDir); } finally { - parent.unregisterOpenSession(this); + manager.unregisterOpenSession(this); if (expirationTracker != null) { expirationTracker.removeFromExpirationQueue(this); } @@ -118,23 +102,70 @@ void close(boolean keepTmpDir) throws Exception { } @Override - protected void openInternal(String[] additionalFiles, - boolean isAsync, LogHelper console, HiveResources resources) - throws IOException, LoginException, URISyntaxException, TezException { - super.openInternal(additionalFiles, isAsync, console, resources); - parent.registerOpenSession(this); + public void open(String[] additionalFilesNotFromConf) + throws LoginException, IOException, URISyntaxException, TezException { + superr.open(additionalFilesNotFromConf); + afterOpen(); + } + + @Override + public void open() throws IOException, LoginException, URISyntaxException, TezException { + superr.open(); + afterOpen(); + } + + @Override + public void open(boolean isPoolInit) throws IOException, LoginException, URISyntaxException, TezException { + 
superr.open(isPoolInit); + afterOpen(); + } + + private void afterOpen() { + manager.registerOpenSession(this); if (expirationTracker != null) { expirationTracker.addToExpirationQueue(this); } } @Override - public String toString() { - if (expirationNs == null) return super.toString(); - long expiresInMs = (expirationNs - System.nanoTime()) / 1000000L; - return super.toString() + ", expires in " + expiresInMs + "ms"; + public void open(HiveResources resources) + throws LoginException, IOException, URISyntaxException, TezException { + superr.open(resources); + afterOpen(); } + // TODO: this is only supported in CLI, might be good to try to remove it. + @Override + public void beginOpen(String[] additionalFiles, LogHelper console) + throws IOException, LoginException, URISyntaxException, TezException { + superr.beginOpen(additionalFiles, console); + afterOpen(); + } + + @Override + public void endOpen() throws InterruptedException, CancellationException { + superr.endOpen(); + } + + @Override + public void ensureLocalResources(Configuration conf, + String[] newFilesNotFromConf) throws IOException, LoginException, + URISyntaxException, TezException { + superr.ensureLocalResources(conf, newFilesNotFromConf); + } + + @Override + public HiveResources extractHiveResources() { + return superr.extractHiveResources(); + } + + @Override + public Path replaceHiveResources(HiveResources resources, boolean isAsync) { + return superr.replaceHiveResources(resources, isAsync); + } + + // *********** Methods specific to a pool session. + /** * Tries to use this session. When the session is in use, it will not expire. * @return true if the session can be used; false if it has already expired. 
@@ -192,24 +223,149 @@ private final boolean shouldExpire() { @Override public void returnToSessionManager() throws Exception { - parent.returnAfterUse(this); + manager.returnAfterUse(this); } @Override - public TezSessionState reopen() throws Exception { - return parent.reopen(this); + public TezSession reopen() throws Exception { + return manager.reopen(this); } @Override public void destroy() throws Exception { - parent.destroy(this); + manager.destroy(this); } - boolean isOwnedBy(Manager parent) { - return this.parent == parent; + public boolean isOwnedBy(Manager parent) { + return this.manager == parent; } - void updateFromRegistry(TezAmInstance si, int ephSeqVersion) { + public void updateFromRegistry(TezAmInstance si, int ephSeqVersion) { // Nothing to do. } + + @Override + public String toString() { + return superr.toString() + getExpirationString(); // describe the wrapped session, not Object + } + + private String getExpirationString() { + if (expirationNs == null) return ""; + long expiresInMs = (expirationNs - System.nanoTime()) / 1000000L; + return ", expires in " + expiresInMs + "ms"; + } + + // ********** The methods that we redirect to base. + // We could instead have a separate "data" interface that would "return superr" here, and + // "return this" in the actual session implementation; however that would require everyone to + // call session.getData().method() for some arbitrary set of methods. Let's keep all the + // ugliness in one place. 
+ + @Override + public HiveConf getConf() { + return superr.getConf(); + } + + @Override + public String getSessionId() { + return superr.getSessionId(); + } + + @Override + public String getUser() { + return superr.getUser(); + } + + @Override + public boolean isOpen() { + return superr.isOpen(); + } + + @Override + public void setQueueName(String queueName) { + superr.setQueueName(queueName); + } + + @Override + public String getQueueName() { + return superr.getQueueName(); + } + + @Override + public void setDefault() { + superr.setDefault(); + + } + + @Override + public boolean isDefault() { + return superr.isDefault(); + } + + @Override + public boolean getDoAsEnabled() { + return superr.getDoAsEnabled(); + } + + @Override + public boolean getLegacyLlapMode() { + return superr.getLegacyLlapMode(); + } + + @Override + public void setLegacyLlapMode(boolean b) { + superr.setLegacyLlapMode(b); + } + + @Override + public KillQuery getKillQuery() { + return superr.getKillQuery(); + } + + @Override + public WmContext getWmContext() { + return superr.getWmContext(); + } + + @Override + public void setWmContext(WmContext ctx) { + superr.setWmContext(ctx); + } + + @Override + public LocalResource getAppJarLr() { + return superr.getAppJarLr(); + } + + @Override + public List getLocalizedResources() { + return superr.getLocalizedResources(); + } + + @Override + public TezClient getTezClient() { + return superr.getTezClient(); + } + + @Override + public boolean isOpening() { + return superr.isOpening(); + } + + @Override + public void setOwnerThread() { + superr.setOwnerThread(); + } + + @Override + public void unsetOwnerThread() { + superr.unsetOwnerThread(); + } + + @Override + public void setKillQuery(KillQuery kq) { + superr.setKillQuery(kq); + } + + // ********** End of the methods that we redirect to base. 
} diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionState.java ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionState.java index 08e65a4a6d..54d92a6c3c 100644 --- ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionState.java +++ ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionState.java @@ -17,17 +17,13 @@ */ package org.apache.hadoop.hive.ql.exec.tez; -import org.apache.hadoop.registry.client.api.RegistryOperations; - import java.io.File; import java.io.IOException; import java.net.URISyntaxException; import java.util.ArrayList; import java.util.HashMap; -import java.util.HashSet; import java.util.List; import java.util.Map; -import java.util.Set; import java.util.UUID; import java.util.concurrent.Callable; import java.util.concurrent.CancellationException; @@ -37,6 +33,7 @@ import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; import java.util.concurrent.atomic.AtomicReference; + import javax.security.auth.login.LoginException; import org.apache.commons.codec.digest.DigestUtils; @@ -59,12 +56,14 @@ import org.apache.hadoop.hive.llap.tezplugins.LlapTaskCommunicator; import org.apache.hadoop.hive.llap.tezplugins.LlapTaskSchedulerService; import org.apache.hadoop.hive.ql.exec.Utilities; +import org.apache.hadoop.hive.ql.exec.tez.monitoring.TezJobMonitor; import org.apache.hadoop.hive.ql.session.KillQuery; import org.apache.hadoop.hive.ql.session.SessionState; import org.apache.hadoop.hive.ql.session.SessionState.LogHelper; import org.apache.hadoop.hive.ql.wm.WmContext; import org.apache.hadoop.hive.shims.Utils; import org.apache.hadoop.mapred.JobContext; +import org.apache.hadoop.registry.client.api.RegistryOperations; import org.apache.hadoop.security.Credentials; import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.security.token.Token; @@ -89,16 +88,15 @@ import org.codehaus.jackson.map.annotate.JsonSerialize; import org.slf4j.Logger; import org.slf4j.LoggerFactory; 
-import org.apache.hadoop.hive.ql.exec.tez.monitoring.TezJobMonitor; import com.google.common.cache.Cache; import com.google.common.cache.CacheBuilder; /** - * Holds session state related to Tez + * The basic implementation of TezSession. */ @JsonSerialize -public class TezSessionState { +public class TezSessionState implements TezSession { protected static final Logger LOG = LoggerFactory.getLogger(TezSessionState.class.getName()); private static final String TEZ_DIR = "_tez_session_dir"; @@ -107,16 +105,16 @@ private static final String LLAP_LAUNCHER = LlapContainerLauncher.class.getName(); private static final String LLAP_TASK_COMMUNICATOR = LlapTaskCommunicator.class.getName(); - private final HiveConf conf; + protected final HiveConf conf; private Path tezScratchDir; - private LocalResource appJarLr; + protected LocalResource appJarLr; private TezClient session; private Future sessionFuture; /** Console used for user feedback during async session opening. */ private LogHelper console; @JsonProperty("sessionId") private String sessionId; - private final DagUtils utils; + protected final DagUtils utils; @JsonProperty("queueName") private String queueName; @JsonProperty("defaultQueue") @@ -126,22 +124,6 @@ private AtomicReference ownerThread = new AtomicReference<>(null); - public static final class HiveResources { - public HiveResources(Path dagResourcesDir) { - this.dagResourcesDir = dagResourcesDir; - } - /** A directory that will contain resources related to DAGs and specified in configs. */ - public final Path dagResourcesDir; - public final Map additionalFilesNotFromConf = new HashMap(); - /** Localized resources of this session; both from conf and not from conf (above). 
*/ - public final Set localizedResources = new HashSet<>(); - - @Override - public String toString() { - return dagResourcesDir + "; " + additionalFilesNotFromConf.size() + " additional files, " - + localizedResources.size() + " localized resources"; - } - } private HiveResources resources; @JsonProperty("doAsEnabled") @@ -231,50 +213,45 @@ public static String makeSessionId() { return UUID.randomUUID().toString(); } + @Override public void open() throws IOException, LoginException, URISyntaxException, TezException { - String[] noFiles = null; - open(noFiles); + open(false); } /** * Creates a tez session. A session is tied to either a cli/hs2 session. You can * submit multiple DAGs against a session (as long as they are executed serially). */ + @Override public void open(String[] additionalFilesNotFromConf) throws IOException, LoginException, URISyntaxException, TezException { - openInternal(additionalFilesNotFromConf, false, null, null); + openInternal(additionalFilesNotFromConf, false, null, null, false); } + @Override public void open(HiveResources resources) throws LoginException, IOException, URISyntaxException, TezException { - openInternal(null, false, null, resources); + openInternal(null, false, null, resources, false); } + @Override + public void open(boolean isPoolInit) + throws LoginException, IOException, URISyntaxException, TezException { + String[] noFiles = null; + openInternal(noFiles, false, null, null, isPoolInit); + } + + @Override public void beginOpen(String[] additionalFiles, LogHelper console) throws IOException, LoginException, URISyntaxException, TezException { - openInternal(additionalFiles, true, console, null); + openInternal(additionalFiles, true, console, null, false); } protected void openInternal(String[] additionalFilesNotFromConf, - boolean isAsync, LogHelper console, HiveResources resources) + boolean isAsync, LogHelper console, HiveResources resources, boolean isPoolInit) throws IOException, LoginException, URISyntaxException, 
TezException { - // TODO Why is the queue name set again. It has already been setup via setQueueName. Do only one of the two. - String confQueueName = conf.get(TezConfiguration.TEZ_QUEUE_NAME); - if (queueName != null && !queueName.equals(confQueueName)) { - LOG.warn("Resetting a queue name that was already set: was " - + queueName + ", now " + confQueueName); - } - this.queueName = confQueueName; - this.doAsEnabled = conf.getBoolVar(HiveConf.ConfVars.HIVE_SERVER2_ENABLE_DOAS); - - final boolean llapMode = "llap".equalsIgnoreCase(HiveConf.getVar( - conf, HiveConf.ConfVars.HIVE_EXECUTION_MODE)); - - // TODO This - at least for the session pool - will always be the hive user. How does doAs above this affect things ? - UserGroupInformation ugi = Utils.getUGI(); - user = ugi.getShortUserName(); - LOG.info("User of session id " + sessionId + " is " + user); + initQueueAndUser(); // Create the tez tmp dir and a directory for Hive resources. tezScratchDir = createTezDir(sessionId, null); @@ -299,13 +276,7 @@ protected void openInternal(String[] additionalFilesNotFromConf, commonLocalResources.put(DagUtils.getBaseName(lr), lr); } - if (llapMode) { - // localize llap client jars - addJarLRByClass(LlapTaskSchedulerService.class, commonLocalResources); - addJarLRByClass(LlapProtocolClientImpl.class, commonLocalResources); - addJarLRByClass(LlapProtocolClientProxy.class, commonLocalResources); - addJarLRByClass(RegistryOperations.class, commonLocalResources); - } + final boolean llapMode = addLlapJarsIfNeeded(commonLocalResources); // Create environment for AM. 
Map amEnv = new HashMap(); @@ -313,36 +284,11 @@ protected void openInternal(String[] additionalFilesNotFromConf, // and finally we're ready to create and start the session // generate basic tez config - final TezConfiguration tezConfig = new TezConfiguration(true); - tezConfig.addResource(conf); - - setupTezParamsBasedOnMR(tezConfig); - - // set up the staging directory to use + final TezConfiguration tezConfig = createTezConfig(); tezConfig.set(TezConfiguration.TEZ_AM_STAGING_DIR, tezScratchDir.toUri().toString()); - conf.stripHiddenConfigurations(tezConfig); - - ServicePluginsDescriptor servicePluginsDescriptor; - Credentials llapCredentials = null; - if (llapMode) { - if (UserGroupInformation.isSecurityEnabled()) { - llapCredentials = new Credentials(); - llapCredentials.addToken(LlapTokenIdentifier.KIND_NAME, getLlapToken(user, tezConfig)); - } - // TODO Change this to not serialize the entire Configuration - minor. - UserPayload servicePluginPayload = TezUtils.createUserPayloadFromConf(tezConfig); - // we need plugins to handle llap and uber mode - servicePluginsDescriptor = ServicePluginsDescriptor.create(true, - new TaskSchedulerDescriptor[] { TaskSchedulerDescriptor.create( - LLAP_SERVICE, LLAP_SCHEDULER).setUserPayload(servicePluginPayload) }, - new ContainerLauncherDescriptor[] { ContainerLauncherDescriptor.create( - LLAP_SERVICE, LLAP_LAUNCHER) }, - new TaskCommunicatorDescriptor[] { TaskCommunicatorDescriptor.create( - LLAP_SERVICE, LLAP_TASK_COMMUNICATOR).setUserPayload(servicePluginPayload) }); - } else { - servicePluginsDescriptor = ServicePluginsDescriptor.create(true); - } + Credentials llapCredentials = createLlapCredentials(llapMode, tezConfig); + ServicePluginsDescriptor spd = createServicePluginDescriptor(llapMode, tezConfig); // container prewarming. 
tell the am how many containers we need if (HiveConf.getBoolVar(conf, ConfVars.HIVE_PREWARM_ENABLED)) { @@ -353,11 +299,9 @@ protected void openInternal(String[] additionalFilesNotFromConf, tezConfig.setInt(TezConfiguration.TEZ_AM_SESSION_MIN_HELD_CONTAINERS, n); } - setupSessionAcls(tezConfig, conf); - final TezClient session = TezClient.newBuilder("HIVE-" + sessionId, tezConfig) .setIsSession(true).setLocalResources(commonLocalResources) - .setCredentials(llapCredentials).setServicePluginDescriptor(servicePluginsDescriptor) + .setCredentials(llapCredentials).setServicePluginDescriptor(spd) .build(); LOG.info("Opening new Tez Session (id: " + sessionId @@ -366,7 +310,7 @@ protected void openInternal(String[] additionalFilesNotFromConf, TezJobMonitor.initShutdownHook(); if (!isAsync) { startSessionAndContainers(session, conf, commonLocalResources, tezConfig, false); - this.session = session; + setTezClient(session); } else { FutureTask sessionFuture = new FutureTask<>(new Callable() { @Override @@ -378,7 +322,7 @@ public TezClient call() throws Exception { } catch (Throwable t) { // The caller has already stopped the session. LOG.error("Failed to start Tez session", t); - throw (t instanceof Exception) ? (Exception)t : new Exception(t); + throw (t instanceof Exception) ? (Exception) t : new Exception(t); } // Check interrupt at the last moment in case we get cancelled quickly. // This is not bulletproof but should allow us to close session in most cases. @@ -397,6 +341,67 @@ public TezClient call() throws Exception { } } + protected static ServicePluginsDescriptor createServicePluginDescriptor(boolean llapMode, + TezConfiguration tezConfig) throws IOException { + if (!llapMode) return ServicePluginsDescriptor.create(true); + // TODO Change this to not serialize the entire Configuration - minor. 
+ UserPayload servicePluginPayload = TezUtils.createUserPayloadFromConf(tezConfig); + // we need plugins to handle llap and uber mode + return ServicePluginsDescriptor.create(true, + new TaskSchedulerDescriptor[] { TaskSchedulerDescriptor.create( + LLAP_SERVICE, LLAP_SCHEDULER).setUserPayload(servicePluginPayload) }, + new ContainerLauncherDescriptor[] { ContainerLauncherDescriptor.create( + LLAP_SERVICE, LLAP_LAUNCHER) }, + new TaskCommunicatorDescriptor[] { TaskCommunicatorDescriptor.create( + LLAP_SERVICE, LLAP_TASK_COMMUNICATOR).setUserPayload(servicePluginPayload) }); + } + + protected final Credentials createLlapCredentials(boolean llapMode, + TezConfiguration tezConfig) throws IOException { + if (!llapMode || !UserGroupInformation.isSecurityEnabled()) return null; + Credentials llapCredentials = new Credentials(); + llapCredentials.addToken(LlapTokenIdentifier.KIND_NAME, getLlapToken(user, tezConfig)); + return llapCredentials; + } + + protected final TezConfiguration createTezConfig() throws IOException { + TezConfiguration tezConfig = new TezConfiguration(true); + tezConfig.addResource(conf); + setupTezParamsBasedOnMR(tezConfig); + conf.stripHiddenConfigurations(tezConfig); + setupSessionAcls(tezConfig, conf); + return tezConfig; + } + + protected final boolean addLlapJarsIfNeeded(Map commonLocalResources) + throws IOException, LoginException { + if (!"llap".equalsIgnoreCase(HiveConf.getVar(conf, ConfVars.HIVE_EXECUTION_MODE))) { + return false; + } + // localize llap client jars + addJarLRByClass(LlapTaskSchedulerService.class, commonLocalResources); + addJarLRByClass(LlapProtocolClientImpl.class, commonLocalResources); + addJarLRByClass(LlapProtocolClientProxy.class, commonLocalResources); + addJarLRByClass(RegistryOperations.class, commonLocalResources); + return true; + } + + protected final void initQueueAndUser() throws LoginException, IOException { + // TODO Why is the queue name set again. It has already been setup via setQueueName. 
Do only one of the two. + String confQueueName = conf.get(TezConfiguration.TEZ_QUEUE_NAME); + if (queueName != null && !queueName.equals(confQueueName)) { + LOG.warn("Resetting a queue name that was already set: was " + + queueName + ", now " + confQueueName); + } + this.queueName = confQueueName; + this.doAsEnabled = conf.getBoolVar(HiveConf.ConfVars.HIVE_SERVER2_ENABLE_DOAS); + + // TODO This - at least for the session pool - will always be the hive user. How does doAs above this affect things ? + UserGroupInformation ugi = Utils.getUGI(); + user = ugi.getShortUserName(); + LOG.info("User of session id " + sessionId + " is " + user); + } + private static Token getLlapToken( String user, final Configuration conf) throws IOException { // TODO: parts of this should be moved out of TezSession to reuse the clients, but there's @@ -494,7 +499,7 @@ public void endOpen() throws InterruptedException, CancellationException { if (session == null) { throw new RuntimeException("Initialization was interrupted"); } - this.session = session; + setTezClient(session); } catch (ExecutionException e) { throw new RuntimeException(e); } @@ -596,6 +601,7 @@ private void setupSessionAcls(Configuration tezConf, HiveConf hiveConf) throws } /** This is called in openInternal and in TezTask.updateSession to localize conf resources. */ + @Override public void ensureLocalResources(Configuration conf, String[] newFilesNotFromConf) throws IOException, LoginException, URISyntaxException, TezException { if (resources == null) { @@ -663,7 +669,8 @@ public void ensureLocalResources(Configuration conf, String[] newFilesNotFromCon * whether or not to remove the scratch dir at the same time. 
* @throws Exception */ - void close(boolean keepDagFilesDir) throws Exception { + @Override + public void close(boolean keepDagFilesDir) throws Exception { console = null; appJarLr = null; @@ -726,11 +733,17 @@ protected final void cleanupDagResources() throws IOException { } } + @Override public String getSessionId() { return sessionId; } - public TezClient getSession() { + protected final void setTezClient(TezClient session) { + this.session = session; + } + + @Override + public TezClient getTezClient() { if (session == null && sessionFuture != null) { if (!sessionFuture.isDone()) { console.printInfo("Waiting for Tez session and AM to be ready..."); @@ -787,7 +800,7 @@ private Path createTezDir(String sessionId, String suffix) throws IOException { * @throws LoginException when we are unable to determine the user. * @throws URISyntaxException when current jar location cannot be determined. */ - private LocalResource createJarLocalResource(String localJarPath) + protected final LocalResource createJarLocalResource(String localJarPath) throws IOException, LoginException, IllegalArgumentException { // TODO Reduce the number of lookups that happen here. This shouldn't go to HDFS for each call. // The hiveJarDir can be determined once per client. 
@@ -862,27 +875,34 @@ private String getSha(final Path localFile) throws IOException, IllegalArgumentE } return sha256; } + + @Override public void setQueueName(String queueName) { this.queueName = queueName; } + @Override public String getQueueName() { return queueName; } + @Override public void setDefault() { defaultQueue = true; } + @Override public boolean isDefault() { return defaultQueue; } + @Override public HiveConf getConf() { return conf; } public List getLocalizedResources() { + if (resources == null) return new ArrayList<>(); return new ArrayList<>(resources.localizedResources); } @@ -890,19 +910,22 @@ public String getUser() { return user; } + @Override public boolean getDoAsEnabled() { return doAsEnabled; } /** Mark session as free for use from TezTask, for safety/debugging purposes. */ - public void markFree() { + @Override + public void unsetOwnerThread() { if (ownerThread.getAndSet(null) == null) { throw new AssertionError("Not in use"); } } /** Mark session as being in use from TezTask, for safety/debugging purposes. */ - public void markInUse() { + @Override + public void setOwnerThread() { String newName = Thread.currentThread().getName(); do { String oldName = ownerThread.get(); @@ -913,29 +936,35 @@ public void markInUse() { } while (!ownerThread.compareAndSet(null, newName)); } - void setLegacyLlapMode(boolean value) { + @Override + public void setLegacyLlapMode(boolean value) { this.isLegacyLlapMode = value; } - boolean getLegacyLlapMode() { + @Override + public boolean getLegacyLlapMode() { return this.isLegacyLlapMode; } + @Override public void returnToSessionManager() throws Exception { // By default, TezSessionPoolManager handles this for both pool and non-pool session. TezSessionPoolManager.getInstance().returnSession(this); } - public TezSessionState reopen() throws Exception { + @Override + public TezSession reopen() throws Exception { // By default, TezSessionPoolManager handles this for both pool and non-pool session. 
return TezSessionPoolManager.getInstance().reopen(this); } + @Override public void destroy() throws Exception { // By default, TezSessionPoolManager handles this for both pool and non-pool session. TezSessionPoolManager.getInstance().destroy(this); } + @Override public WmContext getWmContext() { return wmContext; } @@ -948,6 +977,7 @@ public void setKillQuery(final KillQuery killQuery) { this.killQuery = killQuery; } + @Override public KillQuery getKillQuery() { return killQuery; } @@ -958,6 +988,7 @@ public HiveResources extractHiveResources() { return result; } + @Override public Path replaceHiveResources(HiveResources resources, boolean isAsync) { Path dir = null; if (this.resources != null) { diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezTask.java ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezTask.java index f2ed07add5..f2dddfa0d0 100644 --- ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezTask.java +++ ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezTask.java @@ -133,7 +133,7 @@ public int execute(DriverContext driverContext) { int rc = 1; boolean cleanContext = false; Context ctx = null; - Ref sessionRef = Ref.from(null); + Ref sessionRef = Ref.from(null); try { // Get or create Context object. If we create it we have to clean it later as well. @@ -151,7 +151,7 @@ public int execute(DriverContext driverContext) { SessionState ss = SessionState.get(); // Note: given that we return pool sessions to the pool in the finally block below, and that // we need to set the global to null to do that, this "reuse" may be pointless. 
- TezSessionState session = sessionRef.value = ss.getTezSession(); + TezSession session = sessionRef.value = ss.getTezSession(); if (session != null && !session.isOpen()) { LOG.warn("The session: " + session + " has not been opened"); } @@ -360,8 +360,8 @@ private void logResources(List additionalLr) { */ @VisibleForTesting void ensureSessionHasResources( - TezSessionState session, String[] nonConfResources) throws Exception { - TezClient client = session.getSession(); + TezSession session, String[] nonConfResources) throws Exception { + TezClient client = session.getTezClient(); // TODO null can also mean that this operation was interrupted. Should we really try to re-create the session in that case ? if (client == null) { // Note: the only sane case where this can happen is the non-pool one. We should get rid @@ -523,29 +523,29 @@ private static void setAccessControlsForCurrentUser(DAG dag, String queryId, dag.setAccessControls(ac); } - private TezSessionState getNewTezSessionOnError( - TezSessionState oldSession) throws Exception { + private TezSession getNewTezSessionOnError( + TezSession oldSession) throws Exception { // Note: we don't pass the config to reopen. If the session was already open, it would // have kept running with its current config - preserve that behavior. 
- TezSessionState newSession = oldSession.reopen(); + TezSession newSession = oldSession.reopen(); console.printInfo("Session re-established."); return newSession; } - DAGClient submit(JobConf conf, DAG dag, Ref sessionStateRef) throws Exception { + DAGClient submit(JobConf conf, DAG dag, Ref sessionStateRef) throws Exception { perfLogger.PerfLogBegin(CLASS_NAME, PerfLogger.TEZ_SUBMIT_DAG); DAGClient dagClient = null; - TezSessionState sessionState = sessionStateRef.value; + TezSession sessionState = sessionStateRef.value; try { try { // ready to start execution on the cluster - dagClient = sessionState.getSession().submitDAG(dag); + dagClient = sessionState.getTezClient().submitDAG(dag); } catch (SessionNotRunning nr) { console.printInfo("Tez session was closed. Reopening..."); sessionStateRef.value = null; sessionStateRef.value = sessionState = getNewTezSessionOnError(sessionState); console.printInfo("Session re-established."); - dagClient = sessionState.getSession().submitDAG(dag); + dagClient = sessionState.getTezClient().submitDAG(dag); } } catch (Exception e) { // In case of any other exception, retry. If this also fails, report original error and exit. @@ -554,7 +554,7 @@ DAGClient submit(JobConf conf, DAG dag, Ref sessionStateRef) th + Arrays.toString(e.getStackTrace()) + " retrying..."); sessionStateRef.value = null; sessionStateRef.value = sessionState = getNewTezSessionOnError(sessionState); - dagClient = sessionState.getSession().submitDAG(dag); + dagClient = sessionState.getTezClient().submitDAG(dag); } catch (Exception retryException) { // we failed to submit after retrying. Destroy session and bail. 
sessionStateRef.value = null; diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TriggerValidatorRunnable.java ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TriggerValidatorRunnable.java index 670184b0ac..c78ce5a21e 100644 --- ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TriggerValidatorRunnable.java +++ ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TriggerValidatorRunnable.java @@ -43,10 +43,10 @@ @Override public void run() { try { - Map violatedSessions = new HashMap<>(); - final List sessions = sessionTriggerProvider.getSessions(); + Map violatedSessions = new HashMap<>(); + final List sessions = sessionTriggerProvider.getSessions(); final List triggers = sessionTriggerProvider.getTriggers(); - for (TezSessionState sessionState : sessions) { + for (TezSession sessionState : sessions) { WmContext wmContext = sessionState.getWmContext(); if (wmContext != null && !wmContext.isQueryCompleted() && !wmContext.getSubscribedCounters().isEmpty()) { diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/tez/WmTezSession.java ql/src/java/org/apache/hadoop/hive/ql/exec/tez/WmTezSession.java index fa2b02e591..8387b43e22 100644 --- ql/src/java/org/apache/hadoop/hive/ql/exec/tez/WmTezSession.java +++ ql/src/java/org/apache/hadoop/hive/ql/exec/tez/WmTezSession.java @@ -18,21 +18,19 @@ package org.apache.hadoop.hive.ql.exec.tez; -import com.google.common.annotations.VisibleForTesting; -import com.google.common.util.concurrent.ListenableFuture; -import com.google.common.util.concurrent.SettableFuture; - -import java.util.concurrent.Future; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.ScheduledFuture; import java.util.concurrent.TimeUnit; -import org.apache.hadoop.hive.conf.HiveConf; + import org.apache.hadoop.hive.registry.impl.TezAmInstance; import org.apache.hive.common.util.Ref; -import org.codehaus.jackson.annotate.JsonIgnore; import org.codehaus.jackson.annotate.JsonProperty; import 
org.codehaus.jackson.map.annotate.JsonSerialize; +import com.google.common.annotations.VisibleForTesting; +import com.google.common.util.concurrent.ListenableFuture; +import com.google.common.util.concurrent.SettableFuture; + @JsonSerialize public class WmTezSession extends TezSessionPoolSession implements AmPluginNode { @JsonProperty("poolName") @@ -56,7 +54,7 @@ private String queryId; private SettableFuture returnFuture = null; - private final WorkloadManager wmParent; + private final WorkloadManager wmManager; /** The actual state of the guaranteed task, and the update state, for the session. */ // Note: hypothetically, a generic WM-aware-session should not know about guaranteed tasks. @@ -68,17 +66,16 @@ } private final ActualWmState actualState = new ActualWmState(); - public WmTezSession(String sessionId, WorkloadManager parent, - SessionExpirationTracker expiration, HiveConf conf) { - super(sessionId, parent, expiration, conf); - wmParent = parent; + public WmTezSession( + WorkloadManager manager, SessionExpirationTracker expiration, TezSessionState superr) { + super(manager, expiration, superr); + wmManager = manager; } @VisibleForTesting - WmTezSession(String sessionId, Manager testParent, - SessionExpirationTracker expiration, HiveConf conf) { - super(sessionId, testParent, expiration, conf); - wmParent = null; + WmTezSession(Manager testParent, SessionExpirationTracker expiration, TezSessionState superr) { + super(testParent, expiration, superr); + wmManager = null; } public ListenableFuture waitForAmRegistryAsync( @@ -105,7 +102,7 @@ public WmTezSession(String sessionId, WorkloadManager parent, @Override - void updateFromRegistry(TezAmInstance si, int ephSeqVersion) { + public void updateFromRegistry(TezAmInstance si, int ephSeqVersion) { updateAmEndpointInfo(si, ephSeqVersion); if (si != null) { handleGuaranteedTasksChange(si.getGuaranteedCount()); @@ -138,7 +135,7 @@ public void updateAmEndpointInfo(TezAmInstance si, int ephSeqVersion) { } } } - + 
private void handleGuaranteedTasksChange(int guaranteedCount) { boolean doNotify = false; @@ -149,7 +146,7 @@ private void handleGuaranteedTasksChange(int guaranteedCount) { doNotify = actualState.target != guaranteedCount; } if (!doNotify) return; - wmParent.notifyOfInconsistentAllocation(this); + wmManager.notifyOfInconsistentAllocation(this); } @Override @@ -199,7 +196,7 @@ Integer setSendingGuaranteed(Integer intAlloc) { return intAlloc; } } - + public String getAllocationState() { synchronized (actualState) { return "actual/target " + actualState.sent + "/" + actualState.target @@ -227,7 +224,7 @@ boolean setFailedToSendGuaranteed() { } public void handleUpdateError(int endpointVersion) { - wmParent.addUpdateError(this, endpointVersion); + wmManager.addUpdateError(this, endpointVersion); } @Override @@ -294,5 +291,4 @@ public String toString() { return super.toString() + ", WM state poolName=" + poolName + ", clusterFraction=" + clusterFraction + ", queryId=" + queryId + ", killReason=" + killReason; } - } diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/tez/WorkloadManager.java ql/src/java/org/apache/hadoop/hive/ql/exec/tez/WorkloadManager.java index 5326e3590f..61f4d04995 100644 --- ql/src/java/org/apache/hadoop/hive/ql/exec/tez/WorkloadManager.java +++ ql/src/java/org/apache/hadoop/hive/ql/exec/tez/WorkloadManager.java @@ -63,7 +63,7 @@ import org.apache.hadoop.hive.metastore.api.WMPoolTrigger; import org.apache.hadoop.hive.metastore.api.WMTrigger; import org.apache.hadoop.hive.ql.exec.tez.AmPluginNode.AmPluginInfo; -import org.apache.hadoop.hive.ql.exec.tez.TezSessionState.HiveResources; +import org.apache.hadoop.hive.ql.exec.tez.TezSession.HiveResources; import org.apache.hadoop.hive.ql.exec.tez.UserPoolMapping.MappingInput; import org.apache.hadoop.hive.ql.exec.tez.WmEvent.EventType; import org.apache.hadoop.hive.ql.metadata.HiveException; @@ -96,7 +96,7 @@ * none of that state can be accessed directly - most changes that touch pool state, or 
interact * with background operations like init, need to go thru eventstate; see e.g. returnAfterUse. */ -public class WorkloadManager extends TezSessionPoolSession.AbstractTriggerValidator +public class WorkloadManager extends AbstractTriggerValidator implements TezSessionPoolSession.Manager, SessionExpirationTracker.RestartImpl, WorkloadManagerMxBean { private static final Logger LOG = LoggerFactory.getLogger(WorkloadManager.class); @@ -114,6 +114,8 @@ private final int amRegistryTimeoutMs; private final boolean allowAnyPool; private final MetricsSystem metricsSystem; + private final TezExternalSessionsRegistryClient externalSessions; + // Note: it's not clear that we need to track this - unlike PoolManager we don't have non-pool // sessions, so the pool itself could internally track the ses sions it gave out, since // calling close on an unopened session is probably harmless. @@ -231,6 +233,12 @@ public static WorkloadManager create(String yarnQueue, HiveConf conf, WMFullReso metricsSystem = null; } + if (HiveConf.getBoolVar(conf, ConfVars.HIVE_SERVER2_TEZ_USE_EXTERNAL_SESSIONS)) { + externalSessions = new TezExternalSessionsRegistryClient(conf); + } else { + externalSessions = null; + } + wmThread = new Thread(() -> runWmThread(), "Workload management master"); wmThread.setDaemon(true); wmThread.start(); @@ -280,9 +288,9 @@ private void initTriggers() { public void stop() throws Exception { List sessionsToClose = null; synchronized (openSessions) { - sessionsToClose = new ArrayList(openSessions.keySet()); + sessionsToClose = new ArrayList<>(openSessions.keySet()); } - for (TezSessionState sessionState : sessionsToClose) { + for (TezSessionPoolSession sessionState : sessionsToClose) { sessionState.close(false); } if (expirationTracker != null) { @@ -309,7 +317,7 @@ private void updateSessionTriggerProvidersOnMasterThread() { String poolName = entry.getKey(); PoolState poolState = entry.getValue(); final List triggers = 
Collections.unmodifiableList(poolState.getTriggers()); - final List sessionStates = Collections.unmodifiableList(poolState.getSessions()); + final List sessionStates = Collections.unmodifiableList(poolState.getSessions()); SessionTriggerProvider sessionTriggerProvider = perPoolProviders.get(poolName); if (sessionTriggerProvider != null) { perPoolProviders.get(poolName).setTriggers(triggers); @@ -580,7 +588,8 @@ private void processCurrentEvents(EventState e, WmThreadSyncWork syncWork) throw if (!wasReturned) { syncWork.toDestroyNoRestart.add(sessionToReturn); } else { - if (sessionToReturn.getWmContext() != null && sessionToReturn.getWmContext().isQueryCompleted()) { + WmContext ctx = sessionToReturn.getWmContext(); + if (ctx != null && ctx.isQueryCompleted()) { sessionToReturn.resolveReturnFuture(); } wmEvent.endEvent(sessionToReturn); @@ -665,7 +674,8 @@ private void processCurrentEvents(EventState e, WmThreadSyncWork syncWork) throw if (!tezAmPool.returnSessionAsync(ctx.session)) { syncWork.toDestroyNoRestart.add(ctx.session); } else { - if (ctx.session.getWmContext() != null && ctx.session.getWmContext().isQueryCompleted()) { + WmContext wmCtx = ctx.session.getWmContext(); + if (wmCtx != null && wmCtx.isQueryCompleted()) { ctx.session.resolveReturnFuture(); } wmEvent.endEvent(ctx.session); @@ -1241,7 +1251,8 @@ private void returnSessionOnFailedReuse( if (!tezAmPool.returnSessionAsync(session)) { syncWork.toDestroyNoRestart.add(session); } else { - if (session.getWmContext() != null && session.getWmContext().isQueryCompleted()) { + WmContext wmCtx = session.getWmContext(); + if (wmCtx != null && wmCtx.isQueryCompleted()) { session.resolveReturnFuture(); } wmEvent.endEvent(session); @@ -1407,11 +1418,11 @@ public String toString() { @VisibleForTesting public WmTezSession getSession( - TezSessionState session, MappingInput input, HiveConf conf) throws Exception { + TezSession session, MappingInput input, HiveConf conf) throws Exception { return 
getSession(session, input, conf, null); } - public WmTezSession getSession(TezSessionState session, MappingInput input, HiveConf conf, + public WmTezSession getSession(TezSession session, MappingInput input, HiveConf conf, final WmContext wmContext) throws Exception { WmEvent wmEvent = new WmEvent(WmEvent.EventType.GET); // Note: not actually used for pool sessions; verify some things like doAs are not set. @@ -1443,7 +1454,7 @@ public WmTezSession getSession(TezSessionState session, MappingInput input, Hive } @Override - public void destroy(TezSessionState session) throws Exception { + public void destroy(TezSession session) throws Exception { WmTezSession wmTezSession = ensureOwnedSession(session); resetGlobalTezSession(wmTezSession); currentLock.lock(); @@ -1577,7 +1588,7 @@ public void notifyInitializationCompleted(SessionInitContext initCtx) { @Override - public TezSessionState reopen(TezSessionState session) throws Exception { + public TezSession reopen(TezSession session) throws Exception { WmTezSession wmTezSession = ensureOwnedSession(session); HiveConf sessionConf = wmTezSession.getConf(); if (sessionConf == null) { @@ -1615,7 +1626,7 @@ private void notifyWmThreadUnderLock() { hasChangesCondition.signalAll(); } - private WmTezSession checkSessionForReuse(TezSessionState session) throws Exception { + private WmTezSession checkSessionForReuse(TezSession session) throws Exception { if (session == null) return null; WmTezSession result = null; if (session instanceof WmTezSession) { @@ -1664,10 +1675,16 @@ private WmTezSession createSession(HiveConf conf) { protected WmTezSession createSessionObject(String sessionId, HiveConf conf) { conf = (conf == null) ? 
new HiveConf(this.conf) : conf; conf.set(LlapTaskSchedulerService.LLAP_PLUGIN_ENDPOINT_ENABLED, "true"); - return new WmTezSession(sessionId, this, expirationTracker, conf); + TezSessionState base = null; + if (externalSessions != null) { + base = new TezExternalSessionState(sessionId, conf, externalSessions); + } else { + base = new TezSessionState(sessionId, conf); + } + return new WmTezSession(this, expirationTracker, base); } - private WmTezSession ensureOwnedSession(TezSessionState oldSession) { + private WmTezSession ensureOwnedSession(TezSession oldSession) { if (!(oldSession instanceof WmTezSession) || !((WmTezSession)oldSession).isOwnedBy(this)) { throw new AssertionError("Not a WM session " + oldSession); } @@ -1689,7 +1706,7 @@ public void unregisterOpenSession(TezSessionPoolSession session) { synchronized (openSessions) { openSessions.remove(session); } - tezAmPool.notifyClosed(session); + tezAmPool.notifyClosed(ensureOwnedSession(session)); } @VisibleForTesting diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/tez/WorkloadManagerFederation.java ql/src/java/org/apache/hadoop/hive/ql/exec/tez/WorkloadManagerFederation.java index 4b5022a101..91ec1bf05b 100644 --- ql/src/java/org/apache/hadoop/hive/ql/exec/tez/WorkloadManagerFederation.java +++ ql/src/java/org/apache/hadoop/hive/ql/exec/tez/WorkloadManagerFederation.java @@ -28,7 +28,7 @@ public class WorkloadManagerFederation { private static final Logger LOG = LoggerFactory.getLogger(WorkloadManagerFederation.class); - public static TezSessionState getSession(TezSessionState session, HiveConf conf, + public static TezSession getSession(TezSession session, HiveConf conf, MappingInput input, boolean isUnmanagedLlapMode, WmContext wmContext) throws Exception { Set desiredCounters = new HashSet<>(); // 1. If WM is not present just go to unmanaged. 
@@ -59,8 +59,8 @@ public static TezSessionState getSession(TezSessionState session, HiveConf conf, } } - private static TezSessionState getUnmanagedSession( - TezSessionState session, HiveConf conf, Set desiredCounters, boolean isWorkLlapNode, + private static TezSession getUnmanagedSession( + TezSession session, HiveConf conf, Set desiredCounters, boolean isWorkLlapNode, final WmContext wmContext) throws Exception { TezSessionPoolManager pm = TezSessionPoolManager.getInstance(); session = pm.getSession(session, conf, false, isWorkLlapNode); diff --git ql/src/java/org/apache/hadoop/hive/ql/session/SessionState.java ql/src/java/org/apache/hadoop/hive/ql/session/SessionState.java index 71e130b608..66e0ea937b 100644 --- ql/src/java/org/apache/hadoop/hive/ql/session/SessionState.java +++ ql/src/java/org/apache/hadoop/hive/ql/session/SessionState.java @@ -73,6 +73,7 @@ import org.apache.hadoop.hive.ql.exec.spark.session.SparkSession; import org.apache.hadoop.hive.ql.exec.spark.session.SparkSessionManagerImpl; import org.apache.hadoop.hive.ql.exec.tez.TezSessionPoolManager; +import org.apache.hadoop.hive.ql.exec.tez.TezSession; import org.apache.hadoop.hive.ql.exec.tez.TezSessionState; import org.apache.hadoop.hive.ql.history.HiveHistory; import org.apache.hadoop.hive.ql.history.HiveHistoryImpl; @@ -230,7 +231,7 @@ private Map> localMapRedErrors; - private TezSessionState tezSessionState; + private TezSession tezSessionState; private String currentDatabase; @@ -1877,23 +1878,23 @@ public static PerfLogger getPerfLogger(boolean resetPerfLogger) { } } - public TezSessionState getTezSession() { + public TezSession getTezSession() { return tezSessionState; } /** Called from TezTask to attach a TezSession to use to the threadlocal. Ugly pattern... */ - public void setTezSession(TezSessionState session) { + public void setTezSession(TezSession session) { if (tezSessionState == session) { return; // The same object. 
} if (tezSessionState != null) { - tezSessionState.markFree(); + tezSessionState.unsetOwnerThread(); tezSessionState.setKillQuery(null); tezSessionState = null; } tezSessionState = session; if (session != null) { - session.markInUse(); + session.setOwnerThread(); tezSessionState.setKillQuery(getKillQuery()); } } diff --git ql/src/java/org/apache/hadoop/hive/ql/wm/SessionTriggerProvider.java ql/src/java/org/apache/hadoop/hive/ql/wm/SessionTriggerProvider.java index 16106f481b..677c7412ca 100644 --- ql/src/java/org/apache/hadoop/hive/ql/wm/SessionTriggerProvider.java +++ ql/src/java/org/apache/hadoop/hive/ql/wm/SessionTriggerProvider.java @@ -17,21 +17,22 @@ import java.util.List; +import org.apache.hadoop.hive.ql.exec.tez.TezSession; import org.apache.hadoop.hive.ql.exec.tez.TezSessionState; /** * Implementation for providing current open sessions and active trigger. */ public class SessionTriggerProvider { - private List sessions; + private List sessions; private List triggers; - public SessionTriggerProvider(List openSessions, List triggers) { + public SessionTriggerProvider(List openSessions, List triggers) { this.sessions = openSessions; this.triggers = triggers; } - public List getSessions() { + public List getSessions() { return sessions; } @@ -39,7 +40,7 @@ public SessionTriggerProvider(List openSessions, List return triggers; } - public void setSessions(final List sessions) { + public void setSessions(final List sessions) { this.sessions = sessions; } diff --git ql/src/test/org/apache/hadoop/hive/ql/exec/tez/SampleTezSessionState.java ql/src/test/org/apache/hadoop/hive/ql/exec/tez/SampleTezSessionState.java index f5ab981f26..b856e10d9b 100644 --- ql/src/test/org/apache/hadoop/hive/ql/exec/tez/SampleTezSessionState.java +++ ql/src/test/org/apache/hadoop/hive/ql/exec/tez/SampleTezSessionState.java @@ -23,9 +23,13 @@ import com.google.common.util.concurrent.FutureCallback; import com.google.common.util.concurrent.ListenableFuture; import 
com.google.common.util.concurrent.SettableFuture; + import java.io.IOException; +import java.net.URISyntaxException; import java.util.concurrent.ScheduledExecutorService; + import javax.security.auth.login.LoginException; + import org.apache.hadoop.hive.conf.HiveConf; import org.apache.hadoop.hive.shims.Utils; import org.apache.hadoop.security.UserGroupInformation; @@ -48,8 +52,9 @@ public SampleTezSessionState( String sessionId, TezSessionPoolSession.Manager parent, HiveConf conf) { - super(sessionId, parent, (parent instanceof TezSessionPoolManager) - ? ((TezSessionPoolManager)parent).getExpirationTracker() : null, conf); + super(parent, (parent instanceof TezSessionPoolManager) + ? ((TezSessionPoolManager)parent).getExpirationTracker() : null, + new TezSessionState(sessionId, conf)); this.sessionId = sessionId; this.hiveConf = conf; waitForAmRegFuture = createDefaultWaitForAmRegistryFuture(); @@ -79,6 +84,12 @@ public void open() throws LoginException, IOException { } @Override + public void open(boolean isPoolInit) throws IOException, LoginException, + URISyntaxException, TezException { + open(); + } + + @Override public void open(HiveResources resources) throws LoginException, IOException { open(); } @@ -89,7 +100,7 @@ public void open(String[] additionalFiles) throws IOException, LoginException { } @Override - void close(boolean keepTmpDir) throws TezException, IOException { + public void close(boolean keepTmpDir) throws TezException, IOException { open = keepTmpDir; } diff --git ql/src/test/org/apache/hadoop/hive/ql/exec/tez/TestTezSessionPool.java ql/src/test/org/apache/hadoop/hive/ql/exec/tez/TestTezSessionPool.java index d5b683f788..68b9c3e6a1 100644 --- ql/src/test/org/apache/hadoop/hive/ql/exec/tez/TestTezSessionPool.java +++ ql/src/test/org/apache/hadoop/hive/ql/exec/tez/TestTezSessionPool.java @@ -67,13 +67,13 @@ public void setUp() { public void testGetNonDefaultSession() { poolManager = new TestTezSessionPoolManager(); try { - TezSessionState 
sessionState = poolManager.getSession(null, conf, true, false); - TezSessionState sessionState1 = poolManager.getSession(sessionState, conf, true, false); + TezSession sessionState = poolManager.getSession(null, conf, true, false); + TezSession sessionState1 = poolManager.getSession(sessionState, conf, true, false); if (sessionState1 != sessionState) { fail(); } conf.set("tez.queue.name", "nondefault"); - TezSessionState sessionState2 = poolManager.getSession(sessionState, conf, true, false); + TezSession sessionState2 = poolManager.getSession(sessionState, conf, true, false); if (sessionState2 == sessionState) { fail(); } @@ -97,7 +97,7 @@ public void testSessionPoolGetInOrder() { // this is now a LIFO operation // draw 1 and replace - TezSessionState sessionState = poolManager.getSession(null, conf, true, false); + TezSession sessionState = poolManager.getSession(null, conf, true, false); assertEquals("a", sessionState.getQueueName()); poolManager.returnSession(sessionState); @@ -108,13 +108,13 @@ public void testSessionPoolGetInOrder() { // [a,b,c,a,b,c] // draw 2 and return in order - further run should return last returned - TezSessionState first = poolManager.getSession(null, conf, true, false); - TezSessionState second = poolManager.getSession(null, conf, true, false); + TezSession first = poolManager.getSession(null, conf, true, false); + TezSession second = poolManager.getSession(null, conf, true, false); assertEquals("a", first.getQueueName()); assertEquals("b", second.getQueueName()); poolManager.returnSession(first); poolManager.returnSession(second); - TezSessionState third = poolManager.getSession(null, conf, true, false); + TezSession third = poolManager.getSession(null, conf, true, false); assertEquals("b", third.getQueueName()); poolManager.returnSession(third); @@ -157,7 +157,7 @@ public void testSessionPoolThreads() { poolManager = new TestTezSessionPoolManager(); poolManager.setupPool(conf); poolManager.startPool(conf, null); - TezSessionState[] 
sessions = new TezSessionState[12]; + TezSession[] sessions = new TezSession[12]; int[] queueCounts = new int[3]; for (int i = 0; i < sessions.length; ++i) { sessions[i] = poolManager.getSession(null, conf, true, false); @@ -184,7 +184,7 @@ public void testSessionReopen() { conf.setIntVar(ConfVars.HIVE_SERVER2_TEZ_SESSIONS_PER_DEFAULT_QUEUE, 1); poolManager = new TestTezSessionPoolManager(); - TezSessionState session = Mockito.mock(TezSessionState.class); + TezSession session = Mockito.mock(TezSession.class); Mockito.when(session.getQueueName()).thenReturn("default"); Mockito.when(session.isDefault()).thenReturn(false); Mockito.when(session.getConf()).thenReturn(conf); @@ -192,7 +192,7 @@ public void testSessionReopen() { poolManager.reopen(session); Mockito.verify(session).close(false); - Mockito.verify(session).open(Mockito.any()); + Mockito.verify(session).open(Mockito.any()); // mocked session starts with default queue assertEquals("default", session.getQueueName()); @@ -278,7 +278,7 @@ public void run() { tmpConf.set("tez.queue.name", ""); } - TezSessionState session = poolManager.getSession(null, tmpConf, true, llap); + TezSession session = poolManager.getSession(null, tmpConf, true, llap); Thread.sleep((random.nextInt(9) % 10) * 1000); session.setLegacyLlapMode(llap); poolManager.returnSession(session); @@ -323,20 +323,20 @@ public void testReturn() { @Test public void testCloseAndOpenDefault() throws Exception { poolManager = new TestTezSessionPoolManager(); - TezSessionState session = Mockito.mock(TezSessionState.class); + TezSession session = Mockito.mock(TezSession.class); Mockito.when(session.isDefault()).thenReturn(false); Mockito.when(session.getConf()).thenReturn(conf); poolManager.reopen(session); Mockito.verify(session).close(false); - Mockito.verify(session).open(Mockito.any()); + Mockito.verify(session).open(Mockito.any()); } @Test public void testSessionDestroy() throws Exception { poolManager = new TestTezSessionPoolManager(); - TezSessionState 
session = Mockito.mock(TezSessionState.class); + TezSession session = Mockito.mock(TezSession.class); Mockito.when(session.isDefault()).thenReturn(false); poolManager.destroy(session); diff --git ql/src/test/org/apache/hadoop/hive/ql/exec/tez/TestTezTask.java ql/src/test/org/apache/hadoop/hive/ql/exec/tez/TestTezTask.java index b67aec371d..a8eb3b14b9 100644 --- ql/src/test/org/apache/hadoop/hive/ql/exec/tez/TestTezTask.java +++ ql/src/test/org/apache/hadoop/hive/ql/exec/tez/TestTezTask.java @@ -172,7 +172,7 @@ public Edge answer(InvocationOnMock invocation) throws Throwable { SessionState.start(hiveConf); session = mock(TezClient.class); sessionState = mock(TezSessionState.class); - when(sessionState.getSession()).thenReturn(session); + when(sessionState.getTezClient()).thenReturn(session); when(sessionState.reopen()).thenReturn(sessionState); when(session.submitDAG(any(DAG.class))) .thenThrow(new SessionNotRunning("")) diff --git ql/src/test/org/apache/hadoop/hive/ql/exec/tez/TestWorkloadManager.java ql/src/test/org/apache/hadoop/hive/ql/exec/tez/TestWorkloadManager.java index 4659ecb97b..45366ece1c 100644 --- ql/src/test/org/apache/hadoop/hive/ql/exec/tez/TestWorkloadManager.java +++ ql/src/test/org/apache/hadoop/hive/ql/exec/tez/TestWorkloadManager.java @@ -62,7 +62,6 @@ @RunWith(RetryTestRunner.class) public class TestWorkloadManager { - @SuppressWarnings("unused") private static final Logger LOG = LoggerFactory.getLogger(TestWorkloadManager.class); private final class GetSessionRunnable implements Runnable { @@ -227,7 +226,7 @@ protected WmTezSession createSessionObject(String sessionId, HiveConf conf) { @Override public WmTezSession getSession( - TezSessionState session, MappingInput input, HiveConf conf, + TezSession session, MappingInput input, HiveConf conf, final WmContext wmContext) throws Exception { // We want to wait for the iteration to finish and set the cluster fraction. 
WmTezSession state = super.getSession(session, input, conf, null); @@ -236,7 +235,7 @@ public WmTezSession getSession( } @Override - public void destroy(TezSessionState session) throws Exception { + public void destroy(TezSession session) throws Exception { super.destroy(session); ensureWm(); } @@ -252,7 +251,7 @@ public void returnAfterUse(TezSessionPoolSession session) throws Exception { } @Override - public TezSessionState reopen(TezSessionState session) throws Exception { + public TezSession reopen(TezSession session) throws Exception { session = super.reopen(session); ensureWm(); return session; @@ -269,10 +268,10 @@ public void testReuse() throws Exception { MockQam qam = new MockQam(); WorkloadManager wm = new WorkloadManagerForTest("test", conf, 1, qam); wm.start(); - TezSessionState nonPool = mock(TezSessionState.class); + TezSession nonPool = mock(TezSession.class); when(nonPool.getConf()).thenReturn(conf); doNothing().when(nonPool).close(anyBoolean()); - TezSessionState session = wm.getSession(nonPool, mappingInput("user"), conf); + TezSession session = wm.getSession(nonPool, mappingInput("user"), conf); verify(nonPool).close(anyBoolean()); assertNotSame(nonPool, session); session.returnToSessionManager(); @@ -282,7 +281,7 @@ public void testReuse() throws Exception { session = wm.getSession(diffPool, mappingInput("user"), conf); verify(diffPool).returnToSessionManager(); assertNotSame(diffPool, session); - TezSessionState session2 = wm.getSession(session, mappingInput("user"), conf); + TezSession session2 = wm.getSession(session, mappingInput("user"), conf); assertSame(session, session2); } @@ -294,7 +293,7 @@ public void testQueueName() throws Exception { wm.start(); // The queue should be ignored. 
conf.set(TezConfiguration.TEZ_QUEUE_NAME, "test2"); - TezSessionState session = wm.getSession(null, mappingInput("user"), conf); + TezSession session = wm.getSession(null, mappingInput("user"), conf); assertEquals("test", session.getQueueName()); assertEquals("test", conf.get(TezConfiguration.TEZ_QUEUE_NAME)); session.setQueueName("test2"); @@ -415,7 +414,7 @@ public void testMappings() throws Exception { verifyMapping(wm, conf, mappingInput("zzz", groups("g0", "g1"), "g1"), "g1"); // Explicit pool specification - invalid - there's no mapping that matches. try { - TezSessionState r = wm.getSession( + TezSession r = wm.getSession( null, mappingInput("u0", groups("g0", "g1"), "u2"), conf); fail("Expected failure, but got " + r); } catch (Exception ex) { @@ -428,7 +427,7 @@ public void testMappings() throws Exception { verifyMapping(wm, conf, mappingInput("u0", groups("g0", "g1"), "u2"), "u2"); // The mapping that doesn't exist still shouldn't work. try { - TezSessionState r = wm.getSession( + TezSession r = wm.getSession( null, mappingInput("u0", groups("g0", "g1"), "zzz"), conf); fail("Expected failure, but got " + r); } catch (Exception ex) { @@ -824,7 +823,7 @@ public void testDisableEnable() throws Exception { assertEquals(0, tezAmPool.getCurrentSize()); try { - TezSessionState r = wm.getSession(null, mappingInput("A", null), conf, null); + TezSession r = wm.getSession(null, mappingInput("A", null), conf, null); fail("Expected an error but got " + r); } catch (WorkloadManager.NoPoolMappingException ex) { // Ignore, this particular error is expected. 
@@ -1165,7 +1164,7 @@ public void testAsyncSessionInitFailures() throws Exception { SettableFuture failedWait = SettableFuture.create(); failedWait.setException(new Exception("foo")); theOnlySession.setWaitForAmRegistryFuture(failedWait); - TezSessionState retriedSession = wm.getSession(null, mappingInput("A"), conf); + TezSession retriedSession = wm.getSession(null, mappingInput("A"), conf); assertNotNull(retriedSession); assertNotSame(theOnlySession, retriedSession); // Should have been replaced. retriedSession.returnToSessionManager(); @@ -1175,7 +1174,7 @@ public void testAsyncSessionInitFailures() throws Exception { theOnlySession.setWaitForAmRegistryFuture(failedWait); wm.setNextWaitForAmRegistryFuture(failedWait); // Fail the retry. try { - TezSessionState r = wm.getSession(null, mappingInput("A"), conf); + TezSession r = wm.getSession(null, mappingInput("A"), conf); fail("Expected an error but got " + r); } catch (Exception ex) { // Expected.