diff --git common/src/java/org/apache/hadoop/hive/conf/HiveConf.java common/src/java/org/apache/hadoop/hive/conf/HiveConf.java index 7ef22d6ff2..e566c20562 100644 --- common/src/java/org/apache/hadoop/hive/conf/HiveConf.java +++ common/src/java/org/apache/hadoop/hive/conf/HiveConf.java @@ -3091,6 +3091,8 @@ private static void populateLlapDaemonVarsSet(Set llapDaemonVarsSetLocal HIVE_SERVER2_ACTIVE_PASSIVE_HA_ENABLE("hive.server2.active.passive.ha.enable", false, "Whether HiveServer2 Active/Passive High Availability be enabled when Hive Interactive sessions are enabled." + "This will also require hive.server2.support.dynamic.service.discovery to be enabled."), + HIVE_SERVER2_AP_HA_RECOVER_SESSIONS("hive.server2.active.passive.ha.recover.sessions", + true, "Whether to recover sessions if using active-passive HA."), HIVE_SERVER2_ACTIVE_PASSIVE_HA_REGISTRY_NAMESPACE("hive.server2.active.passive.ha.registry.namespace", "hs2ActivePassiveHA", "When HiveServer2 Active/Passive High Availability is enabled, uses this namespace for registering HS2\n" + diff --git llap-client/src/java/org/apache/hadoop/hive/registry/ServiceInstanceStateChangeListener.java llap-client/src/java/org/apache/hadoop/hive/registry/ServiceInstanceStateChangeListener.java index cc1ba33752..44f5b2ee26 100644 --- llap-client/src/java/org/apache/hadoop/hive/registry/ServiceInstanceStateChangeListener.java +++ llap-client/src/java/org/apache/hadoop/hive/registry/ServiceInstanceStateChangeListener.java @@ -15,6 +15,8 @@ */ package org.apache.hadoop.hive.registry; +import java.io.IOException; + /** * Callback listener for instance state change events */ @@ -24,7 +26,7 @@ * * @param serviceInstance - created service instance */ - void onCreate(InstanceType serviceInstance, int ephSeqVersion); + void onCreate(InstanceType serviceInstance, int ephSeqVersion) throws IOException; /** * Called when an existing {@link ServiceInstance} is updated. diff --git llap-client/src/java/org/apache/hadoop/hive/registry/impl/TezAmInstance.java llap-client/src/java/org/apache/hadoop/hive/registry/impl/TezAmInstance.java index a8629479ec..a19908ca6a 100644 --- llap-client/src/java/org/apache/hadoop/hive/registry/impl/TezAmInstance.java +++ llap-client/src/java/org/apache/hadoop/hive/registry/impl/TezAmInstance.java @@ -58,10 +58,22 @@ public int getGuaranteedCount() { return Integer.parseInt(str); } + public String getApplicationId() { + return getProperties().get(TezAmRegistryImpl.AM_APP_ID); + } + public String getPluginTokenJobId() { return getProperties().get(TezAmRegistryImpl.AM_PLUGIN_JOBID); } + public long getAmAgeMs() { + // See the TODO where AM_APP_START_MS is populated in the service record. 
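+ // Returns the approximate AM uptime based on the start time recorded in the registry, or -1 if none was recorded.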
+ String amStartMsStr = getProperties().get(TezAmRegistryImpl.AM_APP_START_MS); + if (StringUtils.isBlank(amStartMsStr)) return -1; + long amStartMs = Long.parseLong(amStartMsStr); + return System.currentTimeMillis() - amStartMs; + } + public Token getPluginToken() { if (this.token != null) return token; String tokenString = getProperties().get(TezAmRegistryImpl.AM_PLUGIN_TOKEN); @@ -83,5 +95,4 @@ public String toString() { return "TezAmInstance [" + getSessionId() + ", host=" + getHost() + ", rpcPort=" + getRpcPort() + ", pluginPort=" + pluginPort + ", token=" + token + "]"; } - } \ No newline at end of file diff --git llap-client/src/java/org/apache/hadoop/hive/registry/impl/TezAmRegistryImpl.java llap-client/src/java/org/apache/hadoop/hive/registry/impl/TezAmRegistryImpl.java index 3ff732d9b7..cc6588a326 100644 --- llap-client/src/java/org/apache/hadoop/hive/registry/impl/TezAmRegistryImpl.java +++ llap-client/src/java/org/apache/hadoop/hive/registry/impl/TezAmRegistryImpl.java @@ -36,15 +36,17 @@ static final String IPC_TEZCLIENT = "tez-client"; static final String IPC_PLUGIN = "llap-plugin"; static final String AM_SESSION_ID = "am.session.id", AM_PLUGIN_TOKEN = "am.plugin.token", - AM_PLUGIN_JOBID = "am.plugin.jobid", AM_GUARANTEED_COUNT = "am.guaranteed.count"; + AM_PLUGIN_JOBID = "am.plugin.jobid", AM_GUARANTEED_COUNT = "am.guaranteed.count", + AM_APP_ID = "am.app.id", AM_APP_START_MS = "am.age"; + private final static String NAMESPACE_PREFIX = "tez-am-"; private static final String SASL_LOGIN_CONTEXT_NAME = "TezAmZooKeeperClient"; private final String registryName; private ServiceRecord srv; - public static TezAmRegistryImpl create(Configuration conf, boolean useSecureZk) { - String amRegistryName = HiveConf.getVar(conf, ConfVars.LLAP_TASK_SCHEDULER_AM_REGISTRY_NAME); + public static TezAmRegistryImpl create( + String amRegistryName, Configuration conf, boolean useSecureZk) { return StringUtils.isBlank(amRegistryName) ? null : new TezAmRegistryImpl(amRegistryName, conf, useSecureZk); } @@ -69,8 +71,8 @@ public void populateCache(boolean doInvokeListeners) throws IOException { populateCache(pcc, doInvokeListeners); } - public String register(int amPort, int pluginPort, String sessionId, - String serializedToken, String jobIdForToken, int guaranteedCount) throws IOException { + public String register(int amPort, int pluginPort, String sessionId, String serializedToken, + String jobIdForToken, int guaranteedCount, String applicationId) throws IOException { if (srv != null) { throw new UnsupportedOperationException("Already registered with " + srv); } @@ -84,8 +86,14 @@ public String register(int amPort, int pluginPort, String sessionId, IPC_PLUGIN, new InetSocketAddress(hostname, pluginPort)); srv.addInternalEndpoint(pluginEndpoint); } + srv.set(AM_APP_ID, applicationId); srv.set(AM_SESSION_ID, sessionId); boolean hasToken = serializedToken != null; + // TODO: this is kind of brittle. First, the clock can be adjusted and then, there could be + // discrepancies between machines. A better approach would be storing AM age based on + // nanoTime, and updating it every now and then. However, we are measuring days, or at + // least hours, so this should be reasonable for the first cut. + srv.set(AM_APP_START_MS, System.currentTimeMillis()); srv.set(AM_PLUGIN_TOKEN, hasToken ? serializedToken : ""); srv.set(AM_PLUGIN_JOBID, jobIdForToken != null ? 
jobIdForToken : ""); srv.set(AM_GUARANTEED_COUNT, Integer.toString(guaranteedCount)); diff --git llap-client/src/java/org/apache/hadoop/hive/registry/impl/ZkRegistryBase.java llap-client/src/java/org/apache/hadoop/hive/registry/impl/ZkRegistryBase.java index 7ca3548561..f6f986a9d2 100644 --- llap-client/src/java/org/apache/hadoop/hive/registry/impl/ZkRegistryBase.java +++ llap-client/src/java/org/apache/hadoop/hive/registry/impl/ZkRegistryBase.java @@ -485,7 +485,8 @@ protected final void populateCache(PathChildrenCache instancesCache, boolean doI private final Logger LOG = LoggerFactory.getLogger(InstanceStateChangeListener.class); @Override - public void childEvent(final CuratorFramework client, final PathChildrenCacheEvent event) { + public void childEvent(final CuratorFramework client, + final PathChildrenCacheEvent event) throws IOException { Preconditions.checkArgument(client != null && client.getState() == CuratorFrameworkState.STARTED, "client is not started"); diff --git llap-tez/src/java/org/apache/hadoop/hive/llap/tezplugins/LlapTaskSchedulerService.java llap-tez/src/java/org/apache/hadoop/hive/llap/tezplugins/LlapTaskSchedulerService.java index 82179645da..ebf35b0d4c 100644 --- llap-tez/src/java/org/apache/hadoop/hive/llap/tezplugins/LlapTaskSchedulerService.java +++ llap-tez/src/java/org/apache/hadoop/hive/llap/tezplugins/LlapTaskSchedulerService.java @@ -268,6 +268,7 @@ public void setError(TaskInfo ctx, Throwable t) { private long tgVersionSent = Long.MIN_VALUE; private LlapTaskCommunicator communicator; + private final String amAppId, amHiveSessionId; private final int amPort; private final String serializedToken, jobIdForToken; // We expect the DAGs to not be super large, so store full dependency set for each vertex to @@ -287,11 +288,12 @@ public LlapTaskSchedulerService(TaskSchedulerContext taskSchedulerContext) { public static final String LLAP_PLUGIN_ENDPOINT_ENABLED = "llap.plugin.endpoint.enabled"; @VisibleForTesting - public LlapTaskSchedulerService(TaskSchedulerContext taskSchedulerContext, Clock clock, - boolean initMetrics) { + public LlapTaskSchedulerService( + TaskSchedulerContext taskSchedulerContext, Clock clock, boolean initMetrics) { super(taskSchedulerContext); this.clock = clock; this.amPort = taskSchedulerContext.getAppClientPort(); + this.amAppId = taskSchedulerContext.getApplicationAttemptId().getApplicationId().toString(); this.delayedTaskSchedulerCallable = createDelayedTaskSchedulerCallable(); try { this.conf = TezUtils.createConfFromUserPayload(taskSchedulerContext.getInitialUserPayload()); @@ -299,6 +301,7 @@ public LlapTaskSchedulerService(TaskSchedulerContext taskSchedulerContext, Clock throw new TezUncheckedException( "Failed to parse user payload for " + LlapTaskSchedulerService.class.getSimpleName(), e); } + this.amHiveSessionId = HiveConf.getVar(conf, ConfVars.HIVESESSIONID); if (conf.getBoolean(LLAP_PLUGIN_ENDPOINT_ENABLED, false)) { JobTokenSecretManager sm = null; @@ -394,7 +397,8 @@ public LlapTaskSchedulerService(TaskSchedulerContext taskSchedulerContext, Clock LOG.info("Running with configuration: hosts={}, numSchedulableTasksPerNode={}, " + "nodeBlacklistConf={}, localityConf={}", hostsString, numSchedulableTasksPerNode, nodeBlacklistConf, localityDelayConf); - this.amRegistry = TezAmRegistryImpl.create(conf, true); + String registryName = HiveConf.getVar(conf, ConfVars.LLAP_TASK_SCHEDULER_AM_REGISTRY_NAME); + this.amRegistry = TezAmRegistryImpl.create(registryName, conf, true); synchronized (LlapTaskCommunicator.pluginInitLock) { 
LlapTaskCommunicator peer = LlapTaskCommunicator.instance; @@ -764,8 +768,8 @@ public Void call() throws Exception { if (amRegistry != null) { amRegistry.start(); int pluginPort = pluginEndpoint != null ? pluginEndpoint.getActualPort() : -1; - amRegistry.register(amPort, pluginPort, HiveConf.getVar(conf, ConfVars.HIVESESSIONID), - serializedToken, jobIdForToken, 0); + amRegistry.register(amPort, pluginPort, amHiveSessionId, + serializedToken, jobIdForToken, 0, amAppId); } } finally { writeLock.unlock(); diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/tez/SessionExpirationTracker.java ql/src/java/org/apache/hadoop/hive/ql/exec/tez/SessionExpirationTracker.java index df14f11226..c018920246 100644 --- ql/src/java/org/apache/hadoop/hive/ql/exec/tez/SessionExpirationTracker.java +++ ql/src/java/org/apache/hadoop/hive/ql/exec/tez/SessionExpirationTracker.java @@ -194,10 +194,22 @@ public void stop() { } } + // A paranoid safety margin to ageMs in case of recovery. + private static final double AM_AGE_SAFETY_MARGIN = 1.05; - public void addToExpirationQueue(TezSessionPoolSession session) { + public boolean isOldAmUsable(long amAgeMs) { + return sessionLifetimeMs > (long)(amAgeMs * AM_AGE_SAFETY_MARGIN); + } + + public boolean addToExpirationQueue(TezSessionPoolSession session, long amAgeMs) { long jitterModMs = (long)(sessionLifetimeJitterMs * rdm.nextFloat()); - session.setExpirationNs(System.nanoTime() + (sessionLifetimeMs + jitterModMs) * 1000000L); + // We'll add a paranoid safety margin to ageMs in case of recovery. + long amLifetimeMs = sessionLifetimeMs + jitterModMs - (long)(amAgeMs * AM_AGE_SAFETY_MARGIN); + if (amLifetimeMs <= 0) { + assert amAgeMs > 0; + return false; + } + session.setExpirationNs(System.nanoTime() + amLifetimeMs * 1000000L); if (LOG.isDebugEnabled()) { LOG.debug("Adding a pool session [" + this + "] to expiration queue"); } @@ -218,6 +230,7 @@ public void addToExpirationQueue(TezSessionPoolSession session) { expirationQueue.add(session); expirationQueue.notifyAll(); } + return true; } diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionPool.java ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionPool.java index 89954cba67..922d5699f8 100644 --- ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionPool.java +++ ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionPool.java @@ -23,13 +23,12 @@ import com.google.common.util.concurrent.ListenableFuture; import com.google.common.util.concurrent.ListenableFutureTask; import com.google.common.util.concurrent.SettableFuture; + import java.io.IOException; +import java.net.URISyntaxException; import java.util.ArrayList; -import java.util.Collections; -import java.util.HashSet; import java.util.LinkedList; import java.util.List; -import java.util.Set; import java.util.concurrent.Callable; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.FutureTask; @@ -37,7 +36,10 @@ import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.locks.Condition; import java.util.concurrent.locks.ReentrantLock; -import org.apache.hadoop.fs.Path; + +import javax.security.auth.login.LoginException; + +import org.apache.commons.lang3.StringUtils; import org.apache.hadoop.hive.conf.HiveConf; import org.apache.hadoop.hive.conf.HiveConf.ConfVars; import org.apache.hadoop.hive.ql.session.SessionState; @@ -45,6 +47,7 @@ import org.apache.hadoop.hive.registry.impl.TezAmInstance; import org.apache.hadoop.hive.registry.impl.TezAmRegistryImpl; import 
org.apache.tez.dag.api.TezConfiguration; +import org.apache.tez.dag.api.TezException; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -55,15 +58,18 @@ private static final Logger LOG = LoggerFactory.getLogger(TezSessionPool.class); public interface SessionObjectFactory { - SessionType create(SessionType oldSession); + SessionType create(SessionType oldSession, String sessionId); } private final HiveConf initConf; - private int initialSize = 0; // For testing only. + private int initialSize = 0; private final SessionObjectFactory sessionObjFactory; + /** The main lock for pool, asyncRequests, etc. */ private final ReentrantLock poolLock = new ReentrantLock(true); private final Condition notEmpty = poolLock.newCondition(); + /** The exclusion between pool initialization and resize; see resizeAsync comments. */ + private final Object poolInitLock = new Object(); private final LinkedList pool = new LinkedList<>(); private final LinkedList> asyncRequests = new LinkedList<>(); /** @@ -79,6 +85,7 @@ private final String amRegistryName; private final TezAmRegistryImpl amRegistry; + private final ChangeListener amChangeListener; private final ConcurrentHashMap bySessionId = new ConcurrentHashMap<>(); @@ -86,38 +93,80 @@ // TODO: rather, Tez sessions should not depend on SessionState. private SessionState parentSessionState; - TezSessionPool(HiveConf initConf, int numSessionsTotal, boolean useAmRegistryIfPresent, + TezSessionPool(HiveConf initConf, int numSessionsTotal, String amRegistryName, SessionObjectFactory sessionFactory) { this.initConf = initConf; this.initialSize = numSessionsTotal; - this.amRegistry = useAmRegistryIfPresent ? TezAmRegistryImpl.create(initConf, true) : null; - this.amRegistryName = amRegistry == null ? null : amRegistry.getRegistryName(); + if (amRegistryName != null) { + this.amRegistry = TezAmRegistryImpl.create(amRegistryName, initConf, true); + this.amRegistryName = amRegistry.getRegistryName(); + this.amChangeListener = new ChangeListener(); + } else { + this.amRegistry = null; + this.amRegistryName = null; + this.amChangeListener = null; + } this.sessionObjFactory = sessionFactory; } - void start() throws Exception { + void start(boolean recoverAms) throws Exception { + if (amRegistry == null && recoverAms) { + throw new IllegalStateException("Registry not initialized for AM recovery"); + } + this.parentSessionState = SessionState.get(); + if (parentSessionState == null) { + // Tez session wrapper currently depends on SessionState for some unnecessary stuff. + LOG.warn("Hive session state is not present during initialization"); + } + + synchronized (poolInitLock) { + startUnderInitLock(recoverAms); + } + } + + private void startUnderInitLock(boolean recoverAms) throws Exception { if (amRegistry != null) { amRegistry.start(); amRegistry.initializeWithoutRegistering(); - // Note: we may later have special logic to pick up old AMs, if any. - amRegistry.registerStateChangeListener(new ChangeListener()); + // Note: this style of state management assumes that noone else (in this process) + // will use the pool, or create sessions, while this is ongoing. + amChangeListener.setRecoveryMode(recoverAms); + amRegistry.registerStateChangeListener(amChangeListener); amRegistry.populateCache(true); + amChangeListener.setRecoveryMode(false); } - this.parentSessionState = SessionState.get(); - if (initialSize == 0) return; // May be resized later. 
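+ // Sessions recovered by the registry listener above are already in the pool; only create the remainder below.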
+ int sessionsToCreate = initialSize; + + if (recoverAms) { + poolLock.lock(); + try { + if (sessionsToCreate < pool.size()) { + // The recovery code should have handled this. + throw new AssertionError("We've recovered more sessions than we need: " + + pool.size() + "/" + sessionsToCreate); + } + sessionsToCreate -= pool.size(); + } finally { + poolLock.unlock(); + } + } + + LOG.info("Creating " + sessionsToCreate + " new sessions"); - int threadCount = Math.min(initialSize, + if (sessionsToCreate == 0) return; // May be resized later. + + int threadCount = Math.min(sessionsToCreate, HiveConf.getIntVar(initConf, ConfVars.HIVE_SERVER2_TEZ_SESSION_MAX_INIT_THREADS)); Preconditions.checkArgument(threadCount > 0); if (threadCount == 1) { - for (int i = 0; i < initialSize; ++i) { - SessionType session = sessionObjFactory.create(null); + for (int i = 0; i < sessionsToCreate; ++i) { + SessionType session = sessionObjFactory.create(null, null); if (session == null) break; startInitialSession(session); } } else { - final AtomicInteger remaining = new AtomicInteger(initialSize); + final AtomicInteger remaining = new AtomicInteger(sessionsToCreate); @SuppressWarnings("unchecked") FutureTask[] threadTasks = new FutureTask[threadCount]; for (int i = threadTasks.length - 1; i >= 0; --i) { @@ -186,7 +235,7 @@ private boolean returnSessionInternal(SessionType session, boolean isAsync) { sessionState.setTezSession(null); } if (!session.stopUsing()) return true; // The session will be restarted and return to us. - boolean canPutBack = putSessionBack(session, true); + boolean canPutBack = putSessionBack(session, true, false); if (canPutBack) return true; if (LOG.isDebugEnabled()) { LOG.debug("Closing an unneeded returned session " + session); @@ -205,7 +254,7 @@ private boolean returnSessionInternal(SessionType session, boolean isAsync) { * Puts session back into the pool. * @return true if the session has been put back; false if it's not needed and should be killed. */ - private boolean putSessionBack(SessionType session, boolean isFirst) { + private boolean putSessionBack(SessionType session, boolean isFirst, boolean isRecovery) { SettableFuture future = null; poolLock.lock(); try { @@ -218,6 +267,14 @@ private boolean putSessionBack(SessionType session, boolean isFirst) { return false; } } + if (isRecovery && initialSize <= pool.size()) { + // For the reconnect case only, validate against the initialSize. + // When not in recovery mode, we'll never put back more sessions than needed unless the + // pool has been resized down, which is handled by delta due to sync convenience for + // multiple parallel resize requests; recovery cannot use the delta because the initial + // pool size is unknown. + return false; + } // If there are async requests, satisfy them first. if (!asyncRequests.isEmpty()) { if (!session.tryUse(false)) { @@ -245,7 +302,7 @@ private boolean putSessionBack(SessionType session, boolean isFirst) { void replaceSession(SessionType oldSession) throws Exception { // Re-setting the queue config is an old hack that we may remove in future. 
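+ // Pass a null session id so the factory generates a fresh one for the replacement session.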
- SessionType newSession = sessionObjFactory.create(oldSession); + SessionType newSession = sessionObjFactory.create(oldSession, null); String queueName = oldSession.getQueueName(); try { oldSession.close(false); @@ -274,7 +331,7 @@ void replaceSession(SessionType oldSession) throws Exception { SessionState.setCurrentSessionState(parentSessionState); } newSession.open(); - if (!putSessionBack(newSession, false)) { + if (!putSessionBack(newSession, false, false)) { if (LOG.isDebugEnabled()) { LOG.debug("Closing an unneeded session " + newSession + "; trying to replace " + oldSession); @@ -296,7 +353,7 @@ private void startInitialSession(SessionType session) throws Exception { configureAmRegistry(session); session.open(); if (session.stopUsing()) { - if (!putSessionBack(session, false)) { + if (!putSessionBack(session, false, false)) { LOG.warn("Couldn't add a session during initialization"); try { session.close(false); @@ -320,10 +377,32 @@ private void configureAmRegistry(SessionType session) { private final class ChangeListener implements ServiceInstanceStateChangeListener { + private boolean isRecoveryMode = false; + + public void setRecoveryMode(boolean value) { + this.isRecoveryMode = value; + } + @Override - public void onCreate(TezAmInstance si, int ephSeqVersion) { + public void onCreate(TezAmInstance si, int ephSeqVersion) throws IOException { String sessionId = si.getSessionId(); SessionType session = bySessionId.get(sessionId); + if (!isRecoveryMode) { + onCreateNew(si, ephSeqVersion, sessionId, session); + return; + } + // In collect mode, we treat every AM as old; this HS2 is definitely not starting AMs. + // We don't expect anyone else to be either, but there could be Tez-side recovery. + if (session != null) { + // No one should populate bySessionId at this stage. Ignore; we could also kill it in YARN. + LOG.warn("We are collecting existing AMs; the session " + session + " is unexpected"); + return; + } + reconnectToExistingSession(si, ephSeqVersion, sessionId); + } + + private void onCreateNew(TezAmInstance si, int ephSeqVersion, + String sessionId, SessionType session) { if (session != null) { LOG.info("AM for " + sessionId + ", v." + ephSeqVersion + " has registered; updating [" + session + "] with an endpoint at " + si.getPluginPort()); @@ -374,15 +453,21 @@ int getInitialSize() { */ public ListenableFuture resizeAsync(int delta, List toClose) { if (delta == 0) return createDummyFuture(); - poolLock.lock(); - try { - if (delta < 0) { - return resizeDownInternal(-delta, toClose); - } else { - return resizeUpInternal(delta); + // We are potentially going to block the WM thread here for a long time, a terrible crime. + // This only happens if the resource plan changes during pool initialization; the complexity + // of syncing resize with initialization in a non-blocking manner is not justified, esp. given + // that no-one can use the sessions anyway until the initialization is completed. 
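+ // (See start(), which takes the same init lock around the whole pool initialization.)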
+ synchronized (poolInitLock) { + poolLock.lock(); + try { + if (delta < 0) { + return resizeDownInternal(-delta, toClose); + } else { + return resizeUpInternal(delta); + } + } finally { + poolLock.unlock(); } - } finally { - poolLock.unlock(); } } @@ -463,7 +548,7 @@ public Boolean call() throws Exception { int oldVal = remaining.get(); if (oldVal <= 0) return true; if (!remaining.compareAndSet(oldVal, oldVal - 1)) continue; - startInitialSession(sessionObjFactory.create(null)); + startInitialSession(sessionObjFactory.create(null, null)); } } } @@ -484,4 +569,52 @@ int getCurrentSize() { public void notifyClosed(TezSessionState session) { bySessionId.remove(session.getSessionId()); } + + private void reconnectToExistingSession( + TezAmInstance si, int ephSeqVersion, String sessionId) throws IOException { + SessionType session = sessionObjFactory.create(null, sessionId); + if (session == null) { + // This can only happen in the non-WM factory; the factory itself would need to be fixed. + throw new RuntimeException("Cannot create a session object"); + } + String applicationId = si.getApplicationId(); + if (StringUtils.isBlank(applicationId)) { + LOG.warn("Cannot reconnect; no applicationId in " + si); + return; // Ignore; we couldn't even kill it in YARN; maybe via Tez AM host/port info. + } + boolean isUsable = session.tryUse(true); + if (!isUsable) { + throw new IOException(session + " is not usable at pool startup"); + } + session.getConf().set(TezConfiguration.TEZ_QUEUE_NAME, session.getQueueName()); + long ageMs = si.getAmAgeMs(); + try { + // Note: this depends on SessionState object, sadly (e.g. for scratch dir). + // We need to make sure it's set up - right now, we are running on the init + // thread, so it should be ok. + // TODO: how expensive can this be? do we need a threadpool, like the one for init? + if (!session.reconnect(applicationId, ageMs)) { + return; // The session is too old; reconnect takes care of getting rid of it. + } + } catch (LoginException | URISyntaxException | TezException e) { + throw new IOException(e); + } + // Account for the session in internal structures, and update session conf. + configureAmRegistry(session); + if (!session.stopUsing()) { + LOG.warn("The session has expired during initialization: " + session); + return; + } + // See if we need this session. There may be more running than configured pool size. + if (!putSessionBack(session, false, true)) { + LOG.warn("Closing an unneeded session during initialization: " + session); + try { + session.close(false); + } catch (Exception ex) { + LOG.error("Failed to close an unneeded session", ex); + } + } + // Propagate registry information to the session itself. 
+ session.updateFromRegistry(si, ephSeqVersion); + } } diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionPoolManager.java ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionPoolManager.java index 2633390861..ca30f1a4d2 100644 --- ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionPoolManager.java +++ ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionPoolManager.java @@ -106,7 +106,7 @@ public void startPool(HiveConf conf, final WMFullResourcePlan resourcePlan) thro throw new AssertionError("setupPool or setupNonPool needs to be called first"); } if (defaultSessionPool != null) { - defaultSessionPool.start(); + defaultSessionPool.start(false); } if (expirationTracker != null) { expirationTracker.start(); @@ -132,16 +132,16 @@ public void setupPool(HiveConf conf) throws Exception { int numSessions = conf.getIntVar(ConfVars.HIVE_SERVER2_TEZ_SESSIONS_PER_DEFAULT_QUEUE); int numSessionsTotal = numSessions * (defaultQueueList.length - emptyNames); if (numSessionsTotal > 0) { - boolean enableAmRegistry = false; - defaultSessionPool = new TezSessionPool<>(initConf, numSessionsTotal, enableAmRegistry, + defaultSessionPool = new TezSessionPool<>(initConf, numSessionsTotal, null, new TezSessionPool.SessionObjectFactory() { int queueIx = 0; @Override - public TezSessionPoolSession create(TezSessionPoolSession oldSession) { + public TezSessionPoolSession create( + TezSessionPoolSession oldSession, String sessionId) { if (oldSession != null) { - return createAndInitSession( - oldSession.getQueueName(), oldSession.isDefault(), oldSession.getConf()); + return createAndInitSession(null, oldSession.getQueueName(), + oldSession.isDefault(), oldSession.getConf()); } // We never resize the pool, so assume this is initialization. // If that changes, we might have to make the factory interface more complicated. @@ -161,7 +161,7 @@ public TezSessionPoolSession create(TezSessionPoolSession oldSession) { } } HiveConf sessionConf = new HiveConf(initConf); - return createAndInitSession(defaultQueueList[localQueueIx], true, sessionConf); + return createAndInitSession(null, defaultQueueList[localQueueIx], true, sessionConf); } }); } @@ -210,8 +210,9 @@ public void initTriggers(final HiveConf conf) { // TODO Create and init session sets up queue, isDefault - but does not initialize the configuration private TezSessionPoolSession createAndInitSession( - String queue, boolean isDefault, HiveConf conf) { - TezSessionPoolSession sessionState = createSession(TezSessionState.makeSessionId(), conf); + String sessionId, String queue, boolean isDefault, HiveConf conf) { + TezSessionPoolSession sessionState = createSession( + sessionId != null ? sessionId : TezSessionState.makeSessionId(), conf); // TODO When will the queue ever be null. // Pass queue and default in as constructor parameters, and make them final. 
if (queue != null) { @@ -300,7 +301,7 @@ private TezSessionState getSession(HiveConf conf, boolean doOpen) throws Excepti */ private TezSessionState getNewSessionState(HiveConf conf, String queueName, boolean doOpen) throws Exception { - TezSessionPoolSession retTezSessionState = createAndInitSession(queueName, false, conf); + TezSessionPoolSession retTezSessionState = createAndInitSession(null, queueName, false, conf); if (queueName != null) { conf.set(TezConfiguration.TEZ_QUEUE_NAME, queueName); } diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionPoolSession.java ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionPoolSession.java index d3748edb86..0dc3892d99 100644 --- ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionPoolSession.java +++ ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionPoolSession.java @@ -124,7 +124,35 @@ protected void openInternal(String[] additionalFiles, super.openInternal(additionalFiles, isAsync, console, resources); parent.registerOpenSession(this); if (expirationTracker != null) { - expirationTracker.addToExpirationQueue(this); + boolean isNotExpired = expirationTracker.addToExpirationQueue(this, 0L); + assert isNotExpired; + } + } + + @Override + public boolean reconnect(String applicationId, long amAgeMs) throws IOException, + LoginException, URISyntaxException, TezException { + if (expirationTracker != null && !expirationTracker.isOldAmUsable(amAgeMs)) { + closeExpiredOnReconnect(applicationId); + return false; + } + if (!super.reconnect(applicationId, amAgeMs)) { + return false; + } + parent.registerOpenSession(this); + if (expirationTracker != null && !expirationTracker.addToExpirationQueue(this, amAgeMs)) { + closeExpiredOnReconnect(applicationId); + return false; + } + return true; + } + + private void closeExpiredOnReconnect(String applicationId) { + LOG.warn("Not using an old AM due to expiration timeout: " + applicationId); + try { + this.close(false); + } catch (Exception e) { + LOG.info("Failed to close the old AM", e); } } diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionState.java ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionState.java index 08e65a4a6d..f75d750306 100644 --- ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionState.java +++ ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionState.java @@ -236,6 +236,134 @@ public void open() throws IOException, LoginException, URISyntaxException, TezEx open(noFiles); } + public boolean reconnect(String applicationId, long amAgeMs) + throws IOException, LoginException, URISyntaxException, TezException { + this.queueName = conf.get(TezConfiguration.TEZ_QUEUE_NAME); + this.doAsEnabled = conf.getBoolVar(HiveConf.ConfVars.HIVE_SERVER2_ENABLE_DOAS); + this.user = Utils.getUGI().getShortUserName(); + + // TODO: we currently have to create the client as if it was new, to have the correct + // local object state. So, go thru most of the same things that open does. 
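+ // Overall flow: locate the old scratch dirs, rebuild local resources and the Tez config, then attach to the existing AM via TezClient#getClient(applicationId).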
+ + Path scratchDir = reconnectTezDir(sessionId, null); + if (scratchDir == null) { + LOG.warn("Cannot find the original session scratchDir, will create a new one"); + scratchDir = createTezDir(sessionId, null); + } + tezScratchDir = scratchDir; + scratchDir = reconnectTezDir(sessionId, "resources"); + if (scratchDir == null) { + LOG.warn("Cannot find the original resources scratchDir, will create a new one"); + scratchDir = createTezDir(sessionId, "resources"); + } + this.resources = new HiveResources(scratchDir); + LOG.info("Created new resources: " + resources); + + // We won't actually localize anything here; the files should already be there. + // The NotFromConf files are unknown - if any are needed later and were localized by + // the previous HS2, this one will try to localize them again and find they're already there. + ensureLocalResources(conf, null); + + boolean llapMode = isLlapMode(); + + // exec and LLAP jars are definitely already localized... we just need to init the fields. + // TODO: perhaps we could improve this method to share some fields when multiple + // sessions are being reconnected at the same time. + final Map commonLocalResources = makeCombinedJarMap(llapMode); + + // generate basic tez config + final TezConfiguration tezConfig = createTezConfiguration(); + final Credentials llapCredentials = createLlapCredentials(llapMode, tezConfig); + final ServicePluginsDescriptor spd = createPluginDescriptor(llapMode, tezConfig); + setupSessionAcls(tezConfig, conf); + + // Do not initialize prewarm here. Either it's already prewarmed or we don't care (for now). + + TezClient session = createTezClientObject( + tezConfig, commonLocalResources, llapCredentials, spd); + + // Note: if this ever moved on a threadpool, see isOnThread stuff + // in open() w.r.t. how to propagate errors correctly. + // Can this ever return a different client? Not as of this writing. Hmm. + session = session.getClient(applicationId); + this.session = session; + return true; + } + + private boolean isLlapMode() { + return "llap".equalsIgnoreCase(HiveConf.getVar( + conf, HiveConf.ConfVars.HIVE_EXECUTION_MODE)); + } + + private TezClient createTezClientObject(TezConfiguration tezConfig, + Map commonLocalResources, + Credentials llapCredentials, ServicePluginsDescriptor spd) { + return TezClient.newBuilder("HIVE-" + sessionId, tezConfig).setIsSession(true) + .setLocalResources(commonLocalResources).setCredentials(llapCredentials) + .setServicePluginDescriptor(spd).build(); + } + + private ServicePluginsDescriptor createPluginDescriptor( + final boolean llapMode, final TezConfiguration tezConfig) + throws IOException { + ServicePluginsDescriptor servicePluginsDescriptor; + if (llapMode) { + // TODO Change this to not serialize the entire Configuration - minor. 
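+ // The plugin descriptors below are the same ones openInternal used to build inline; the logic is now shared with reconnect().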
+ UserPayload servicePluginPayload = TezUtils.createUserPayloadFromConf(tezConfig); + // we need plugins to handle llap and uber mode + servicePluginsDescriptor = ServicePluginsDescriptor.create(true, + new TaskSchedulerDescriptor[] { TaskSchedulerDescriptor.create( + LLAP_SERVICE, LLAP_SCHEDULER).setUserPayload(servicePluginPayload) }, + new ContainerLauncherDescriptor[] { ContainerLauncherDescriptor.create( + LLAP_SERVICE, LLAP_LAUNCHER) }, + new TaskCommunicatorDescriptor[] { TaskCommunicatorDescriptor.create( + LLAP_SERVICE, LLAP_TASK_COMMUNICATOR).setUserPayload(servicePluginPayload) }); + } else { + servicePluginsDescriptor = ServicePluginsDescriptor.create(true); + } + return servicePluginsDescriptor; + } + + private Credentials createLlapCredentials( + boolean llapMode, TezConfiguration tezConfig) throws IOException { + Credentials llapCredentials = null; + if (llapMode && UserGroupInformation.isSecurityEnabled()) { + llapCredentials = new Credentials(); + llapCredentials.addToken(LlapTokenIdentifier.KIND_NAME, getLlapToken(user, tezConfig)); + } + return llapCredentials; + } + + private TezConfiguration createTezConfiguration() { + final TezConfiguration tezConfig = new TezConfiguration(true); + tezConfig.addResource(conf); + + setupTezParamsBasedOnMR(tezConfig); + tezConfig.set(TezConfiguration.TEZ_AM_STAGING_DIR, tezScratchDir.toUri().toString()); + conf.stripHiddenConfigurations(tezConfig); + return tezConfig; + } + + private Map makeCombinedJarMap(final boolean llapMode) + throws IOException, LoginException, URISyntaxException { + appJarLr = createJarLocalResource(utils.getExecJarPathLocal(conf)); + + final Map commonLocalResources = new HashMap(); + commonLocalResources.put(DagUtils.getBaseName(appJarLr), appJarLr); + for (LocalResource lr : this.resources.localizedResources) { + commonLocalResources.put(DagUtils.getBaseName(lr), lr); + } + + if (llapMode) { + // localize llap client jars + addJarLRByClass(LlapTaskSchedulerService.class, commonLocalResources); + addJarLRByClass(LlapProtocolClientImpl.class, commonLocalResources); + addJarLRByClass(LlapProtocolClientProxy.class, commonLocalResources); + addJarLRByClass(RegistryOperations.class, commonLocalResources); + } + return commonLocalResources; + } + /** * Creates a tez session. A session is tied to either a cli/hs2 session. You can * submit multiple DAGs against a session (as long as they are executed serially). @@ -268,8 +396,7 @@ protected void openInternal(String[] additionalFilesNotFromConf, this.queueName = confQueueName; this.doAsEnabled = conf.getBoolVar(HiveConf.ConfVars.HIVE_SERVER2_ENABLE_DOAS); - final boolean llapMode = "llap".equalsIgnoreCase(HiveConf.getVar( - conf, HiveConf.ConfVars.HIVE_EXECUTION_MODE)); + final boolean llapMode = isLlapMode(); // TODO This - at least for the session pool - will always be the hive user. How does doAs above this affect things ? UserGroupInformation ugi = Utils.getUGI(); @@ -288,61 +415,19 @@ protected void openInternal(String[] additionalFilesNotFromConf, LOG.info("Created new resources: " + resources); } - // unless already installed on all the cluster nodes, we'll have to - // localize hive-exec.jar as well. 
- appJarLr = createJarLocalResource(utils.getExecJarPathLocal(conf)); - - // configuration for the application master - final Map commonLocalResources = new HashMap(); - commonLocalResources.put(DagUtils.getBaseName(appJarLr), appJarLr); - for (LocalResource lr : this.resources.localizedResources) { - commonLocalResources.put(DagUtils.getBaseName(lr), lr); - } - - if (llapMode) { - // localize llap client jars - addJarLRByClass(LlapTaskSchedulerService.class, commonLocalResources); - addJarLRByClass(LlapProtocolClientImpl.class, commonLocalResources); - addJarLRByClass(LlapProtocolClientProxy.class, commonLocalResources); - addJarLRByClass(RegistryOperations.class, commonLocalResources); - } + final Map commonLocalResources = makeCombinedJarMap(llapMode); // Create environment for AM. + // TODO: this is unused since AMConfiguration was replaced with TezConfiguration. Map amEnv = new HashMap(); MRHelpers.updateEnvBasedOnMRAMEnv(conf, amEnv); - // and finally we're ready to create and start the session - // generate basic tez config - final TezConfiguration tezConfig = new TezConfiguration(true); - tezConfig.addResource(conf); - - setupTezParamsBasedOnMR(tezConfig); - - // set up the staging directory to use - tezConfig.set(TezConfiguration.TEZ_AM_STAGING_DIR, tezScratchDir.toUri().toString()); - conf.stripHiddenConfigurations(tezConfig); + final TezConfiguration tezConfig = createTezConfiguration(); ServicePluginsDescriptor servicePluginsDescriptor; - Credentials llapCredentials = null; - if (llapMode) { - if (UserGroupInformation.isSecurityEnabled()) { - llapCredentials = new Credentials(); - llapCredentials.addToken(LlapTokenIdentifier.KIND_NAME, getLlapToken(user, tezConfig)); - } - // TODO Change this to not serialize the entire Configuration - minor. - UserPayload servicePluginPayload = TezUtils.createUserPayloadFromConf(tezConfig); - // we need plugins to handle llap and uber mode - servicePluginsDescriptor = ServicePluginsDescriptor.create(true, - new TaskSchedulerDescriptor[] { TaskSchedulerDescriptor.create( - LLAP_SERVICE, LLAP_SCHEDULER).setUserPayload(servicePluginPayload) }, - new ContainerLauncherDescriptor[] { ContainerLauncherDescriptor.create( - LLAP_SERVICE, LLAP_LAUNCHER) }, - new TaskCommunicatorDescriptor[] { TaskCommunicatorDescriptor.create( - LLAP_SERVICE, LLAP_TASK_COMMUNICATOR).setUserPayload(servicePluginPayload) }); - } else { - servicePluginsDescriptor = ServicePluginsDescriptor.create(true); - } + Credentials llapCredentials = createLlapCredentials(llapMode, tezConfig); + servicePluginsDescriptor = createPluginDescriptor(llapMode, tezConfig); // container prewarming. 
tell the am how many containers we need if (HiveConf.getBoolVar(conf, ConfVars.HIVE_PREWARM_ENABLED)) { @@ -355,10 +440,8 @@ protected void openInternal(String[] additionalFilesNotFromConf, setupSessionAcls(tezConfig, conf); - final TezClient session = TezClient.newBuilder("HIVE-" + sessionId, tezConfig) - .setIsSession(true).setLocalResources(commonLocalResources) - .setCredentials(llapCredentials).setServicePluginDescriptor(servicePluginsDescriptor) - .build(); + final TezClient session = createTezClientObject( + tezConfig, commonLocalResources, llapCredentials, servicePluginsDescriptor); LOG.info("Opening new Tez Session (id: " + sessionId + ", scratch dir: " + tezScratchDir + ")"); @@ -559,7 +642,7 @@ private void setupTezParamsBasedOnMR(TezConfiguration conf) { } else if (!preferTez) { conf.set(dep.getValue(), mrValue, "TRANSLATED_TO_TEZ_AND_MR_OVERRIDE"); } - LOG.info("Config: mr(unset):" + dep.getKey() + ", mr initial value=" + LOG.debug("Config: mr(unset):" + dep.getKey() + ", mr initial value=" + mrValue + ", tez(original):" + dep.getValue() + "=" + tezValue + ", tez(final):" + dep.getValue() + "=" + conf.get(dep.getValue())); @@ -761,23 +844,42 @@ public LocalResource getAppJarLr() { * be used with Tez. Assumes scratchDir exists. */ private Path createTezDir(String sessionId, String suffix) throws IOException { - // tez needs its own scratch dir (per session) - // TODO: De-link from SessionState. A TezSession can be linked to different Hive Sessions via the pool. - SessionState sessionState = SessionState.get(); - String hdfsScratchDir = sessionState == null ? HiveConf.getVar(conf, HiveConf.ConfVars.SCRATCHDIR) : sessionState - .getHdfsScratchDirURIString(); - Path tezDir = new Path(hdfsScratchDir, TEZ_DIR); - tezDir = new Path(tezDir, sessionId + ((suffix == null) ? "" : ("-" + suffix))); + Path tezDir = getScratchDirPath(sessionId, suffix, false); FileSystem fs = tezDir.getFileSystem(conf); - FsPermission fsPermission = new FsPermission(HiveConf.getVar(conf, HiveConf.ConfVars.SCRATCHDIRPERMISSION)); + FsPermission fsPermission = new FsPermission( + HiveConf.getVar(conf, HiveConf.ConfVars.SCRATCHDIRPERMISSION)); fs.mkdirs(tezDir, fsPermission); // Make sure the path is normalized (we expect validation to pass since we just created it). tezDir = DagUtils.validateTargetDir(tezDir, conf).getPath(); - + // Directory removal will be handled by cleanup at the SessionState level. return tezDir; } + private Path reconnectTezDir(String sessionId, String suffix) throws IOException { + // Check whether the original session directory still exists; validation also normalizes the path. + Path dir = getScratchDirPath(sessionId, suffix, false); + FileStatus dirFs = DagUtils.validateTargetDir(dir, conf); + if (dirFs != null) { + return dirFs.getPath(); + } + // Perhaps the original session open used a different config. + dir = getScratchDirPath(sessionId, suffix, true); + if (dir == null) return null; + dirFs = DagUtils.validateTargetDir(dir, conf); + return (dirFs != null) ? dirFs.getPath() : null; + } + + private Path getScratchDirPath(String sessionId, String suffix, boolean isAlternative) { + // TODO: De-link from SessionState. A TezSession can be linked to different Hive Sessions via the pool. + SessionState sessionState = SessionState.get(); + if (sessionState == null && isAlternative) return null; // No alternative. + String hdfsScratchDir = (isAlternative || sessionState == null) + ? 
HiveConf.getVar(conf, ConfVars.SCRATCHDIR) : sessionState.getHdfsScratchDirURIString(); + Path tezDir = new Path(hdfsScratchDir, TEZ_DIR); + return new Path(tezDir, sessionId + ((suffix == null) ? "" : ("-" + suffix))); + } + /** * Returns a local resource representing a jar. * This resource will be used to execute the plan on the cluster. diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/tez/WorkloadManager.java ql/src/java/org/apache/hadoop/hive/ql/exec/tez/WorkloadManager.java index 7137a17aae..60895e0266 100644 --- ql/src/java/org/apache/hadoop/hive/ql/exec/tez/WorkloadManager.java +++ ql/src/java/org/apache/hadoop/hive/ql/exec/tez/WorkloadManager.java @@ -110,6 +110,7 @@ private final SessionExpirationTracker expirationTracker; private final RestrictedConfigChecker restrictedConfig; private final QueryAllocationManager allocationManager; + private final boolean recoverAms; private final String yarnQueue; private final int amRegistryTimeoutMs; private final boolean allowAnyPool; @@ -183,7 +184,8 @@ public static WorkloadManager getInstance() { } /** Called once, when HS2 initializes. */ - public static WorkloadManager create(String yarnQueue, HiveConf conf, WMFullResourcePlan plan) + public static WorkloadManager create(String yarnQueue, HiveConf conf, + WMFullResourcePlan plan, boolean recoverAms) throws ExecutionException, InterruptedException { assert INSTANCE == null; // We could derive the expected number of AMs to pass in. @@ -211,8 +213,9 @@ public static WorkloadManager create(String yarnQueue, HiveConf conf, WMFullReso this.amRegistryTimeoutMs = (int)HiveConf.getTimeVar( conf, ConfVars.HIVE_SERVER2_TEZ_WM_AM_REGISTRY_TIMEOUT, TimeUnit.MILLISECONDS); - tezAmPool = new TezSessionPool<>(conf, totalQueryParallelism, true, - oldSession -> createSession(oldSession == null ? null : oldSession.getConf())); + tezAmPool = new TezSessionPool<>(conf, totalQueryParallelism, + HiveConf.getVar(conf, ConfVars.LLAP_TASK_SCHEDULER_AM_REGISTRY_NAME), + (oldSession, id) -> createSession(oldSession == null ? null : oldSession.getConf(), id)); restrictedConfig = new RestrictedConfigChecker(conf); // Only creates the expiration tracker if expiration is configured. 
expirationTracker = SessionExpirationTracker.create(conf, this); @@ -256,7 +259,7 @@ private static int determineQueryParallelism(WMFullResourcePlan plan) { public void start() throws Exception { initTriggers(); - tezAmPool.start(); + tezAmPool.start(recoverAms); if (expirationTracker != null) { expirationTracker.start(); } @@ -277,13 +280,24 @@ private void initTriggers() { } } - public void stop() throws Exception { - List sessionsToClose = null; - synchronized (openSessions) { - sessionsToClose = new ArrayList(openSessions.keySet()); - } - for (TezSessionState sessionState : sessionsToClose) { - sessionState.close(false); + public void stop(boolean isStop) throws Exception { + if (!recoverAms || isStop) { + List sessionsToClose = null; + synchronized (openSessions) { + sessionsToClose = new ArrayList(openSessions.keySet()); + } + for (TezSessionState sessionState : sessionsToClose) { + sessionState.close(false); + } + } else { + int count = 0; + synchronized (openSessions) { + count = openSessions.size(); + openSessions.clear(); + } + if (count > 0) { + LOG.info("AM recovery enabled; leaving " + count + " Tez sessions running"); + } } if (expirationTracker != null) { expirationTracker.stop(); @@ -1652,8 +1666,9 @@ private void validateConfig(HiveConf conf) throws HiveException { } } - private WmTezSession createSession(HiveConf conf) { - WmTezSession session = createSessionObject(TezSessionState.makeSessionId(), conf); + private WmTezSession createSession(HiveConf conf, String sessionId) { + WmTezSession session = createSessionObject( + sessionId != null ? sessionId : TezSessionState.makeSessionId(), conf); session.setQueueName(yarnQueue); session.setDefault(); LOG.info("Created new interactive session object " + session.getSessionId()); diff --git ql/src/test/org/apache/hadoop/hive/ql/exec/tez/TestWorkloadManager.java ql/src/test/org/apache/hadoop/hive/ql/exec/tez/TestWorkloadManager.java index 4659ecb97b..2efb0dd34e 100644 --- ql/src/test/org/apache/hadoop/hive/ql/exec/tez/TestWorkloadManager.java +++ ql/src/test/org/apache/hadoop/hive/ql/exec/tez/TestWorkloadManager.java @@ -189,12 +189,12 @@ public static MappingInput mappingInput(String userName, List groups, St public WorkloadManagerForTest(String yarnQueue, HiveConf conf, int numSessions, QueryAllocationManager qam) throws ExecutionException, InterruptedException { - super(null, yarnQueue, conf, qam, createDummyPlan(numSessions)); + super(null, yarnQueue, conf, qam, createDummyPlan(numSessions), false); } public WorkloadManagerForTest(String yarnQueue, HiveConf conf, QueryAllocationManager qam, WMFullResourcePlan plan) throws ExecutionException, InterruptedException { - super(null, yarnQueue, conf, qam, plan); + super(null, yarnQueue, conf, qam, plan, false); } @Override diff --git service/src/java/org/apache/hive/service/server/HiveServer2.java service/src/java/org/apache/hive/service/server/HiveServer2.java index 6184fdcc91..b7ebd44f89 100644 --- service/src/java/org/apache/hive/service/server/HiveServer2.java +++ service/src/java/org/apache/hive/service/server/HiveServer2.java @@ -156,6 +156,7 @@ private String serviceUri; private boolean serviceDiscovery; private boolean activePassiveHA; + private boolean recoverAms; private LeaderLatchListener leaderLatchListener; private ExecutorService leaderActionsExecutorService; private HS2ActivePassiveHARegistry hs2HARegistry; @@ -279,6 +280,8 @@ public void run() { this.serviceDiscovery = hiveConf.getBoolVar(ConfVars.HIVE_SERVER2_SUPPORT_DYNAMIC_SERVICE_DISCOVERY); 
this.activePassiveHA = hiveConf.getBoolVar(ConfVars.HIVE_SERVER2_ACTIVE_PASSIVE_HA_ENABLE); + this.recoverAms = activePassiveHA + && hiveConf.getBoolVar(ConfVars.HIVE_SERVER2_AP_HA_RECOVER_SESSIONS); try { if (serviceDiscovery) { @@ -415,7 +418,7 @@ private WMFullResourcePlan createTestResourcePlan() { pool.setQueryParallelism(1); resourcePlan = new WMFullResourcePlan( new WMResourcePlan("testDefault"), Lists.newArrayList(pool)); - resourcePlan.getPlan().setDefaultPoolPath("testDefault"); + resourcePlan.getPlan().setDefaultPoolPath("llap"); return resourcePlan; } @@ -787,7 +790,7 @@ public void notLeader() { LOG.info("HS2 instance {} LOST LEADERSHIP. Stopping/Disconnecting tez sessions..", hiveServer2.serviceUri); hiveServer2.isLeader.set(false); hiveServer2.closeHiveSessions(); - hiveServer2.stopOrDisconnectTezSessions(); + hiveServer2.stopOrDisconnectTezSessions(false); LOG.info("Stopped/Disconnected tez sessions."); // resolve futures used for testing @@ -832,6 +835,11 @@ private void initAndStartTezSessionPoolManager(final WMFullResourcePlan resource tezSessionPoolManager = TezSessionPoolManager.getInstance(); HiveConf hiveConf = getHiveConf(); if (hiveConf.getBoolVar(ConfVars.HIVE_SERVER2_TEZ_INITIALIZE_DEFAULT_SESSIONS)) { + if (recoverAms) { + // Note: the main problem being the setups with multiple default queues. + // the session factory will also need to be adjusted by recovery. + LOG.warn("AM recovery for non-WM pool is not yet supported."); + } tezSessionPoolManager.setupPool(hiveConf); } else { tezSessionPoolManager.setupNonPool(hiveConf); @@ -844,18 +852,17 @@ private void initAndStartTezSessionPoolManager(final WMFullResourcePlan resource } private void initAndStartWorkloadManager(final WMFullResourcePlan resourcePlan) { - if (!StringUtils.isEmpty(wmQueue)) { - // Initialize workload management. - LOG.info("Initializing workload management"); - try { - wm = WorkloadManager.create(wmQueue, getHiveConf(), resourcePlan); - wm.start(); - LOG.info("Workload manager initialized."); - } catch (Exception e) { - throw new ServiceException("Unable to instantiate and start Workload Manager", e); - } - } else { + if (StringUtils.isEmpty(wmQueue)) { LOG.info("Workload management is not enabled."); + return; + } + LOG.info("Initializing workload management"); + try { + wm = WorkloadManager.create(wmQueue, getHiveConf(), resourcePlan, recoverAms); + wm.start(); + LOG.info("Workload manager initialized."); + } catch (Exception e) { + throw new ServiceException("Unable to instantiate and start Workload Manager", e); } } @@ -873,7 +880,7 @@ private void closeHiveSessions() { } } - private void stopOrDisconnectTezSessions() { + private void stopOrDisconnectTezSessions(boolean isStop) { LOG.info("Stopping/Disconnecting tez sessions."); // There should already be an instance of the session pool manager. // If not, ignoring is fine while stopping HiveServer2. @@ -887,7 +894,7 @@ private void stopOrDisconnectTezSessions() { } if (wm != null) { try { - wm.stop(); + wm.stop(isStop); LOG.info("Stopped workload manager."); } catch (Exception e) { LOG.error("Error while stopping workload manager.", e); @@ -932,7 +939,7 @@ public synchronized void stop() { } } - stopOrDisconnectTezSessions(); + stopOrDisconnectTezSessions(true); if (hiveConf != null && hiveConf.getVar(ConfVars.HIVE_EXECUTION_ENGINE).equals("spark")) { try {