diff --git a/common/src/java/org/apache/hive/wm/ISessionPoolManager.java b/common/src/java/org/apache/hive/wm/ISessionPoolManager.java new file mode 100644 index 0000000..c6d48f4 --- /dev/null +++ b/common/src/java/org/apache/hive/wm/ISessionPoolManager.java @@ -0,0 +1,81 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hive.wm; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hive.conf.HiveConf; + +/** + * Session pool interface that is also restartable. 
+ */ +public interface ISessionPoolManager extends ISessionRestart { + /** + * Setup session pool + * + * @param hiveConf - hive conf + * @throws Exception + */ + void setupPool(final HiveConf hiveConf) throws Exception; + + /** + * Start session pool + * + * @throws Exception + */ + void startPool() throws Exception; + + /** + * Register an open session with session pool manager + * + * @param session - open session + */ + void registerOpenSession(T session); + + /** + * Unregister an open session from session pool manager + * + * @param session - open session + */ + void unregisterOpenSession(T session); + + /** + * Return a session after use + * + * @param session - session + * @throws Exception + */ + void returnAfterUse(T session) throws Exception; + + /** + * Reopen a specified session + * + * @param session - session + * @param conf - hive conf + * @param inputOutputJars - jars location + * @return the session to use in place of the given one (may be a new instance) + * @throws Exception + */ + T reopen(T session, Configuration conf, String[] inputOutputJars) throws Exception; + + /** + * Destroy a session + * + * @param session - session + * @throws Exception + */ + void destroy(T session) throws Exception; +} diff --git a/common/src/java/org/apache/hive/wm/ISessionRestart.java b/common/src/java/org/apache/hive/wm/ISessionRestart.java new file mode 100644 index 0000000..ef4a8d0 --- /dev/null +++ b/common/src/java/org/apache/hive/wm/ISessionRestart.java @@ -0,0 +1,30 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hive.wm; + +/** + * Interface for closing and reopening a session. + */ +public interface ISessionRestart { + /** + * Close and reopen a session + * + * @param session - session + * @throws Exception + */ + void closeAndReopenSession(T session) throws Exception; +} diff --git a/common/src/java/org/apache/hive/wm/IWorkloadManager.java b/common/src/java/org/apache/hive/wm/IWorkloadManager.java new file mode 100644 index 0000000..0cae611 --- /dev/null +++ b/common/src/java/org/apache/hive/wm/IWorkloadManager.java @@ -0,0 +1,24 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hive.wm; + +/** + * Interface for workload management. 
+ */ +public interface IWorkloadManager extends ISessionPoolManager { + +} diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/SessionExpirationTracker.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/SessionExpirationTracker.java index da93a3a..438c17e 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/SessionExpirationTracker.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/SessionExpirationTracker.java @@ -28,6 +28,7 @@ import org.apache.hadoop.hive.conf.HiveConf; import org.apache.hadoop.hive.conf.HiveConf.ConfVars; import org.apache.hadoop.hive.ql.session.SessionState; +import org.apache.hive.wm.ISessionRestart; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -44,14 +45,10 @@ private final Thread restartThread; private final long sessionLifetimeMs; private final long sessionLifetimeJitterMs; - private final RestartImpl sessionRestartImpl; + private final ISessionRestart sessionRestartImpl; private volatile SessionState initSessionState; - interface RestartImpl { - void closeAndReopenPoolSession(TezSessionPoolSession session) throws Exception; - } - - public static SessionExpirationTracker create(HiveConf conf, RestartImpl restartImpl) { + public static SessionExpirationTracker create(HiveConf conf, ISessionRestart restartImpl) { long sessionLifetimeMs = conf.getTimeVar( ConfVars.HIVE_SERVER2_TEZ_SESSION_LIFETIME, TimeUnit.MILLISECONDS); if (sessionLifetimeMs == 0) return null; @@ -60,7 +57,7 @@ public static SessionExpirationTracker create(HiveConf conf, RestartImpl restart } private SessionExpirationTracker( - long sessionLifetimeMs, long sessionLifetimeJitterMs, RestartImpl restartImpl) { + long sessionLifetimeMs, long sessionLifetimeJitterMs, ISessionRestart restartImpl) { this.sessionRestartImpl = restartImpl; this.sessionLifetimeMs = sessionLifetimeMs; this.sessionLifetimeJitterMs = sessionLifetimeJitterMs; @@ -114,7 +111,7 @@ private void runRestartThread() { TezSessionPoolSession next = restartQueue.take(); 
LOG.info("Restarting the expired session [" + next + "]"); try { - sessionRestartImpl.closeAndReopenPoolSession(next); + sessionRestartImpl.closeAndReopenSession(next); } catch (InterruptedException ie) { throw ie; } catch (Exception e) { @@ -230,7 +227,7 @@ public void closeAndRestartExpiredSession( if (isAsync) { restartQueue.add(session); } else { - sessionRestartImpl.closeAndReopenPoolSession(session); + sessionRestartImpl.closeAndReopenSession(session); } } } \ No newline at end of file diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionPoolManager.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionPoolManager.java index 538d745..4f5e930 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionPoolManager.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionPoolManager.java @@ -18,24 +18,21 @@ package org.apache.hadoop.hive.ql.exec.tez; -import org.apache.hadoop.hive.registry.impl.TezAmInstance; - import java.util.HashSet; import java.util.concurrent.Semaphore; import java.util.ArrayList; import java.util.LinkedList; import java.util.List; -import java.util.Random; import java.util.Set; +import org.apache.hive.wm.ISessionPoolManager; import org.apache.tez.dag.api.TezConfiguration; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hive.conf.HiveConf; import org.apache.hadoop.hive.conf.HiveConf.ConfVars; -import org.apache.hadoop.hive.ql.exec.tez.TezSessionPoolSession.Manager; import org.apache.hadoop.hive.ql.metadata.HiveException; import org.apache.hadoop.hive.shims.Utils; import org.apache.hadoop.security.UserGroupInformation; @@ -49,8 +46,7 @@ * In case the user specifies a queue explicitly, a new session is created * on that queue and assigned to the session state. 
*/ -public class TezSessionPoolManager - implements SessionExpirationTracker.RestartImpl, Manager { +public class TezSessionPoolManager implements ISessionPoolManager { private enum CustomQueueAllowed { TRUE, @@ -59,7 +55,6 @@ } private static final Logger LOG = LoggerFactory.getLogger(TezSessionPoolManager.class); - static final Random rdm = new Random(); private Semaphore llapQueue; private HiveConf initConf = null; @@ -83,7 +78,7 @@ private final List openSessions = new LinkedList<>(); /** Note: this is not thread-safe. */ - public static TezSessionPoolManager getInstance() throws Exception { + public static TezSessionPoolManager getInstance() { TezSessionPoolManager local = instance; if (local == null) { instance = local = new TezSessionPoolManager(); @@ -95,6 +90,7 @@ public static TezSessionPoolManager getInstance() throws Exception { protected TezSessionPoolManager() { } + @Override public void startPool() throws Exception { if (defaultSessionPool != null) { defaultSessionPool.startInitialSessions(); @@ -104,6 +100,7 @@ public void startPool() throws Exception { } } + @Override public void setupPool(HiveConf conf) throws InterruptedException { String[] defaultQueueList = HiveConf.getTrimmedStringsVar( conf, HiveConf.ConfVars.HIVE_SERVER2_TEZ_DEFAULT_QUEUES); @@ -434,7 +431,7 @@ public void closeNonDefaultSessions(boolean keepTmpDir) throws Exception { /** Closes a running (expired) pool session and reopens it. 
*/ @Override - public void closeAndReopenPoolSession(TezSessionPoolSession oldSession) throws Exception { + public void closeAndReopenSession(TezSessionPoolSession oldSession) throws Exception { String queueName = oldSession.getQueueName(); if (queueName == null) { LOG.warn("Pool session has a null queue: " + oldSession); @@ -470,7 +467,7 @@ public SessionExpirationTracker getExpirationTracker() { @Override public TezSessionPoolSession reopen( - TezSessionPoolSession session, Configuration conf, String[] inputOutputJars) { + TezSessionPoolSession session, Configuration conf, String[] inputOutputJars) { return reopen(session, conf, inputOutputJars); } diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionPoolSession.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionPoolSession.java index 4488c12..5127256 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionPoolSession.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionPoolSession.java @@ -20,8 +20,7 @@ import org.apache.hadoop.hive.registry.impl.TezAmInstance; -import org.apache.hadoop.security.token.Token; -import org.apache.tez.common.security.JobTokenIdentifier; +import org.apache.hive.wm.ISessionPoolManager; import org.apache.hadoop.conf.Configuration; @@ -52,22 +51,13 @@ class TezSessionPoolSession extends TezSessionState { private static final int STATE_NONE = 0, STATE_IN_USE = 1, STATE_EXPIRED = 2; - interface Manager { - void registerOpenSession(TezSessionPoolSession session); - void unregisterOpenSession(TezSessionPoolSession session); - void returnAfterUse(TezSessionPoolSession session) throws Exception; - TezSessionState reopen(TezSessionPoolSession session, Configuration conf, - String[] inputOutputJars) throws Exception; - void destroy(TezSessionPoolSession session) throws Exception; - } - private final AtomicInteger sessionState = new AtomicInteger(STATE_NONE); private Long expirationNs; - private final Manager parent; + private final 
ISessionPoolManager parent; private final SessionExpirationTracker expirationTracker; - public TezSessionPoolSession(String sessionId, Manager parent, + public TezSessionPoolSession(String sessionId, ISessionPoolManager parent, SessionExpirationTracker tracker, HiveConf conf) { super(sessionId, conf); this.parent = parent; @@ -179,7 +169,7 @@ public void destroy() throws Exception { parent.destroy(this); } - boolean isOwnedBy(Manager parent) { + boolean isOwnedBy(ISessionPoolManager parent) { return this.parent == parent; } diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezTask.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezTask.java index 29d6fe6..dab8e25 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezTask.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezTask.java @@ -149,10 +149,10 @@ public int execute(DriverContext driverContext) { if (session != null && !session.isOpen()) { LOG.warn("The session: " + session + " has not been opened"); } - if (WorkloadManager.isInUse(ss.getConf())) { + if (TezWorkloadManager.isInUse(ss.getConf())) { // TODO: in future, we may also pass getUserIpAddress. // Note: for now this will just block to wait for a session based on parallelism. - session = WorkloadManager.getInstance().getSession(session, ss.getUserName(), conf); + session = TezWorkloadManager.getInstance().getSession(session, ss.getUserName(), conf); } else { session = TezSessionPoolManager.getInstance().getSession( session, conf, false, getWork().getLlapMode()); diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezWorkloadManager.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezWorkloadManager.java new file mode 100644 index 0000000..a6cf86c --- /dev/null +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezWorkloadManager.java @@ -0,0 +1,378 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hive.ql.exec.tez; + +import java.util.concurrent.TimeoutException; + +import java.util.concurrent.TimeUnit; + +import java.util.ArrayList; +import java.util.HashMap; +import java.util.IdentityHashMap; +import java.util.List; +import java.util.concurrent.locks.ReentrantReadWriteLock; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hive.conf.HiveConf; +import org.apache.hadoop.hive.conf.HiveConf.ConfVars; +import org.apache.hadoop.hive.ql.metadata.HiveException; +import org.apache.hadoop.io.Text; +import org.apache.hadoop.security.UserGroupInformation; +import org.apache.hadoop.security.token.Token; +import org.apache.hadoop.yarn.api.records.ApplicationId; +import org.apache.hive.wm.IWorkloadManager; +import org.apache.tez.common.security.JobTokenIdentifier; +import org.apache.tez.common.security.JobTokenSecretManager; +import org.apache.tez.dag.api.TezConfiguration; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.google.common.annotations.VisibleForTesting; + + +/** Workload management entry point for HS2. 
*/ +public class TezWorkloadManager implements IWorkloadManager { + private static final Logger LOG = LoggerFactory.getLogger(TezWorkloadManager.class); + // TODO: this is a temporary setting that will go away, so it's not in HiveConf. + public static final String TEST_WM_CONFIG = "hive.test.workload.management"; + + private final HiveConf conf; + private final TezSessionPool sessions; + private final SessionExpirationTracker expirationTracker; + private final RestrictedConfigChecker restrictedConfig; + private final QueryAllocationManager allocationManager; + private final String yarnQueue; + // TODO: it's not clear that we need to track this - unlike PoolManager we don't have non-pool + // sessions, so the pool itself could internally track the sessions it gave out, since + // calling close on an unopened session is probably harmless. + private final IdentityHashMap openSessions = + new IdentityHashMap<>(); + /** Sessions given out (i.e. between get... and return... calls), separated by Hive pool. */ + private final ReentrantReadWriteLock poolsLock = new ReentrantReadWriteLock(); + private final HashMap pools = new HashMap<>(); + private final int amRegistryTimeoutMs; + + private static class PoolState { + // Add stuff here as WM is implemented. + private final Object lock = new Object(); + private final List sessions = new ArrayList<>(); + } + + // TODO: this is temporary before HiveServerEnvironment is merged. + private static volatile TezWorkloadManager INSTANCE; + public static TezWorkloadManager getInstance() { + TezWorkloadManager wm = INSTANCE; + assert wm != null; + return wm; + } + + public static boolean isInUse(Configuration conf) { + return INSTANCE != null && conf.getBoolean(TEST_WM_CONFIG, false); + } + + /** Called once, when HS2 initializes. */ + public static TezWorkloadManager create(String yarnQueue, HiveConf conf) { + assert INSTANCE == null; + Token amsToken = createAmsToken(); + // We could derive the expected number of AMs to pass in. 
+ LlapPluginEndpointClient amComm = new LlapPluginEndpointClientImpl(conf, amsToken, -1); + QueryAllocationManager qam = new GuaranteedTasksAllocator(conf, amComm); + // TODO: Hardcode one session for now; initial policies should be passed in. + return (INSTANCE = new TezWorkloadManager(yarnQueue, conf, 1, qam, amsToken)); + } + + private static Token createAmsToken() { + if (!UserGroupInformation.isSecurityEnabled()) return null; + // This application ID is completely bogus. + ApplicationId id = ApplicationId.newInstance( + System.nanoTime(), (int)(System.nanoTime() % 100000)); + JobTokenIdentifier identifier = new JobTokenIdentifier(new Text(id.toString())); + JobTokenSecretManager jobTokenManager = new JobTokenSecretManager(); + Token sessionToken = new Token<>(identifier, jobTokenManager); + sessionToken.setService(identifier.getJobId()); + return sessionToken; + } + + @VisibleForTesting + TezWorkloadManager(String yarnQueue, HiveConf conf, int numSessions, + QueryAllocationManager qam, Token amsToken) { + this.yarnQueue = yarnQueue; + this.conf = conf; + initializeHivePools(); + + this.amRegistryTimeoutMs = (int)HiveConf.getTimeVar( + conf, ConfVars.HIVE_SERVER2_TEZ_WM_AM_REGISTRY_TIMEOUT, TimeUnit.MILLISECONDS); + sessions = new TezSessionPool<>(conf, numSessions, true); + restrictedConfig = new RestrictedConfigChecker(conf); + allocationManager = qam; + // Only creates the expiration tracker if expiration is configured. 
+ expirationTracker = SessionExpirationTracker.create(conf, this); + for (int i = 0; i < numSessions; i++) { + sessions.addInitialSession(createSession()); + } + } + + private void initializeHivePools() { + // TODO: real implementation + poolsLock.writeLock().lock(); + try { + pools.put("llap", new PoolState()); + } finally { + poolsLock.writeLock().unlock(); + } + } + + public TezSessionState getSession( + TezSessionState session, String userName, HiveConf conf) throws Exception { + validateConfig(conf); + String poolName = mapSessionToPoolName(userName); + // TODO: do query parallelism enforcement here based on the policies and pools. + while (true) { + WmTezSession result = checkSessionForReuse(session); + // TODO: when proper AM management is implemented, we should call tryGet... here, because the + // parallelism will be enforced here, and pool would always have a session for us. + result = (result == null ? sessions.getSession() : result); + result.setQueueName(yarnQueue); + result.setPoolName(poolName); + if (!ensureAmIsRegistered(result)) continue; // Try another. + redistributePoolAllocations(poolName, result, null); + return result; + } + } + + @VisibleForTesting + protected boolean ensureAmIsRegistered(WmTezSession session) throws Exception { + // Make sure AM is ready to use and registered with AM registry. 
+ try { + session.waitForAmPluginInfo(amRegistryTimeoutMs); + } catch (TimeoutException ex) { + LOG.error("Timed out waiting for AM registry information for " + session.getSessionId()); + session.destroy(); + return false; + } + return true; + } + + private void redistributePoolAllocations( + String poolName, WmTezSession sessionToAdd, WmTezSession sessionToRemove) { + List sessionsToUpdate = null; + double totalAlloc = 0; + assert sessionToAdd == null || poolName.equals(sessionToAdd.getPoolName()); + assert sessionToRemove == null || poolName.equals(sessionToRemove.getPoolName()); + poolsLock.readLock().lock(); + try { + PoolState pool = pools.get(poolName); + synchronized (pool.lock) { + // This should be a 2nd order fn but it's too much pain in Java for one LOC. + if (sessionToAdd != null) { + pool.sessions.add(sessionToAdd); + } + if (sessionToRemove != null) { + if (!pool.sessions.remove(sessionToRemove)) { + LOG.error("Session " + sessionToRemove + " could not be removed from the pool"); + } + sessionToRemove.setClusterFraction(0); + } + totalAlloc = updatePoolAllocations(pool.sessions); + sessionsToUpdate = new ArrayList<>(pool.sessions); + } + } finally { + poolsLock.readLock().unlock(); + } + allocationManager.updateSessionsAsync(totalAlloc, sessionsToUpdate); + } + + private WmTezSession checkSessionForReuse(TezSessionState session) throws Exception { + if (session == null) return null; + WmTezSession result = null; + if (session instanceof WmTezSession) { + result = (WmTezSession) session; + if (result.isOwnedBy(this)) { + return result; + } + // TODO: this should never happen, at least for now. Throw? 
+ LOG.warn("Attempting to reuse a session not belonging to us: " + result); + result.returnToSessionManager(); + return null; + } + LOG.warn("Attempting to reuse a non-WM session for workload management:" + session); + if (session instanceof TezSessionPoolSession) { + session.returnToSessionManager(); + } else { + session.close(false); // This is a non-pool session, get rid of it. + } + return null; + } + + private double updatePoolAllocations(List sessions) { + // TODO: real implementation involving in-the-pool policy interface, etc. + double allocation = 1.0 / sessions.size(); + for (WmTezSession session : sessions) { + session.setClusterFraction(allocation); + } + return 1.0; + } + + private String mapSessionToPoolName(String userName) { + // TODO: real implementation, probably calling into another class initialized with policies. + return "llap"; + } + + private void validateConfig(HiveConf conf) throws HiveException { + String queueName = conf.get(TezConfiguration.TEZ_QUEUE_NAME); + if ((queueName != null) && !queueName.isEmpty()) { + LOG.warn("Ignoring " + TezConfiguration.TEZ_QUEUE_NAME + "=" + queueName); + conf.set(TezConfiguration.TEZ_QUEUE_NAME, yarnQueue); + } + if (conf.getBoolVar(ConfVars.HIVE_SERVER2_ENABLE_DOAS)) { + // Should this also just be ignored? Throw for now, doAs unlike queue is often set by admin. 
+ throw new HiveException(ConfVars.HIVE_SERVER2_ENABLE_DOAS.varname + " is not supported"); + } + if (restrictedConfig != null) { + restrictedConfig.validate(conf); + } + } + + public void start() throws Exception { + sessions.startInitialSessions(); + if (expirationTracker != null) { + expirationTracker.start(); + } + allocationManager.start(); + } + + public void stop() throws Exception { + List sessionsToClose = null; + synchronized (openSessions) { + sessionsToClose = new ArrayList(openSessions.keySet()); + } + for (TezSessionState sessionState : sessionsToClose) { + sessionState.close(false); + } + if (expirationTracker != null) { + expirationTracker.stop(); + } + allocationManager.stop(); + } + + private WmTezSession createSession() { + WmTezSession session = createSessionObject(TezSessionState.makeSessionId()); + session.setQueueName(yarnQueue); + session.setDefault(); + LOG.info("Created new interactive session " + session.getSessionId()); + return session; + } + + @VisibleForTesting + protected WmTezSession createSessionObject(String sessionId) { + return new WmTezSession(sessionId, this, expirationTracker, new HiveConf(conf)); + } + + /** Closes a running (expired) pool session and reopens it. 
*/ + @Override + public void closeAndReopenSession(final TezSessionState oldSession) throws Exception { + sessions.replaceSession(ensureOwnedSession(oldSession), createSession(), false, null, null); + } + + private WmTezSession ensureOwnedSession(TezSessionState oldSession) { + if (!(oldSession instanceof WmTezSession) || !((WmTezSession) oldSession).isOwnedBy(this)) { + throw new AssertionError("Not a WM session " + oldSession); + } + WmTezSession session = (WmTezSession) oldSession; + return session; + } + + @Override + public void setupPool(final HiveConf hiveConf) throws InterruptedException { + TezSessionPoolManager.getInstance().setupPool(hiveConf); + } + + @Override + public void startPool() throws Exception { + TezSessionPoolManager.getInstance().startPool(); + } + + /** Called by TezSessionPoolSession when opened. */ + @Override + public void registerOpenSession(final TezSessionState session) { + synchronized (openSessions) { + openSessions.put(session, true); + } + } + + /** Called by TezSessionPoolSession when closed. */ + @Override + public void unregisterOpenSession(final TezSessionState session) { + synchronized (openSessions) { + openSessions.remove(session); + } + } + + @VisibleForTesting + public SessionExpirationTracker getExpirationTracker() { + return expirationTracker; + } + + @Override + public void returnAfterUse(final TezSessionState session) throws Exception { + boolean isInterrupted = Thread.interrupted(); + try { + WmTezSession wmSession = ensureOwnedSession(session); + redistributePoolAllocations(wmSession.getPoolName(), null, wmSession); + sessions.returnSession((WmTezSession) session); + } finally { + // Reset the interrupt status. 
+ if (isInterrupted) { + Thread.currentThread().interrupt(); + } + } + } + + @Override + public TezSessionState reopen(final TezSessionState session, final Configuration conf, final String[] additionalFiles) + throws Exception { + WmTezSession oldSession = ensureOwnedSession(session), newSession = createSession(); + newSession.setPoolName(oldSession.getPoolName()); + HiveConf sessionConf = session.getConf(); + if (sessionConf == null) { + LOG.warn("Session configuration is null for " + session); + // default queue name when the initial session was created + sessionConf = new HiveConf(conf, TezWorkloadManager.class); + } + sessions.replaceSession(oldSession, newSession, true, additionalFiles, sessionConf); + // We are going to immediately give this session out, so ensure AM registry. + if (!ensureAmIsRegistered(newSession)) { + throw new Exception("Session is not usable after reopen"); + } + redistributePoolAllocations(oldSession.getPoolName(), newSession, oldSession); + return newSession; + } + + @Override + public void destroy(final TezSessionState session) throws Exception { + LOG.warn("Closing a pool session because of retry failure."); + // We never want to lose pool sessions. Replace it instead; also trigger duck redistribution. 
+ WmTezSession wmSession = ensureOwnedSession(session); + closeAndReopenSession(wmSession); + redistributePoolAllocations(wmSession.getPoolName(), null, wmSession); + } + + protected final HiveConf getConf() { + return conf; + } +} diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/WmTezSession.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/WmTezSession.java index 00501ee..ee7609e 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/WmTezSession.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/WmTezSession.java @@ -22,6 +22,7 @@ import org.apache.hadoop.hive.conf.HiveConf; import org.apache.hadoop.hive.registry.impl.TezAmInstance; +import org.apache.hive.wm.ISessionPoolManager; public class WmTezSession extends TezSessionPoolSession implements AmPluginNode { private String poolName; @@ -41,7 +42,7 @@ } private final ActualWmState actualState = new ActualWmState(); - public WmTezSession(String sessionId, Manager parent, + public WmTezSession(String sessionId, ISessionPoolManager parent, SessionExpirationTracker expiration, HiveConf conf) { super(sessionId, parent, expiration, conf); } diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/WorkloadManager.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/WorkloadManager.java deleted file mode 100644 index 288d705..0000000 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/WorkloadManager.java +++ /dev/null @@ -1,372 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.hadoop.hive.ql.exec.tez; - -import java.util.concurrent.TimeoutException; - -import java.util.concurrent.TimeUnit; - -import java.io.IOException; -import java.util.ArrayList; -import java.util.HashMap; -import java.util.IdentityHashMap; -import java.util.List; -import java.util.concurrent.atomic.AtomicInteger; -import java.util.concurrent.locks.ReentrantReadWriteLock; - -import org.apache.commons.codec.binary.Base64; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.hive.conf.HiveConf; -import org.apache.hadoop.hive.conf.HiveConf.ConfVars; -import org.apache.hadoop.hive.ql.metadata.HiveException; -import org.apache.hadoop.io.Text; -import org.apache.hadoop.security.UserGroupInformation; -import org.apache.hadoop.security.token.Token; -import org.apache.hadoop.yarn.api.records.ApplicationId; -import org.apache.tez.common.security.JobTokenIdentifier; -import org.apache.tez.common.security.JobTokenSecretManager; -import org.apache.tez.dag.api.TezConfiguration; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import com.google.common.annotations.VisibleForTesting; - - -/** Workload management entry point for HS2. */ -public class WorkloadManager - implements TezSessionPoolSession.Manager, SessionExpirationTracker.RestartImpl { - private static final Logger LOG = LoggerFactory.getLogger(WorkloadManager.class); - // TODO: this is a temporary setting that will go away, so it's not in HiveConf. 
- public static final String TEST_WM_CONFIG = "hive.test.workload.management"; - - private final HiveConf conf; - private final TezSessionPool sessions; - private final SessionExpirationTracker expirationTracker; - private final RestrictedConfigChecker restrictedConfig; - private final QueryAllocationManager allocationManager; - private final String yarnQueue; - // TODO: it's not clear that we need to track this - unlike PoolManager we don't have non-pool - // sessions, so the pool itself could internally track the sessions it gave out, since - // calling close on an unopened session is probably harmless. - private final IdentityHashMap openSessions = - new IdentityHashMap<>(); - /** Sessions given out (i.e. between get... and return... calls), separated by Hive pool. */ - private final ReentrantReadWriteLock poolsLock = new ReentrantReadWriteLock(); - private final HashMap pools = new HashMap<>(); - private final int amRegistryTimeoutMs; - - private static class PoolState { - // Add stuff here as WM is implemented. - private final Object lock = new Object(); - private final List sessions = new ArrayList<>(); - } - - // TODO: this is temporary before HiveServerEnvironment is merged. - private static volatile WorkloadManager INSTANCE; - public static WorkloadManager getInstance() { - WorkloadManager wm = INSTANCE; - assert wm != null; - return wm; - } - - public static boolean isInUse(Configuration conf) { - return INSTANCE != null && conf.getBoolean(TEST_WM_CONFIG, false); - } - - /** Called once, when HS2 initializes. */ - public static WorkloadManager create(String yarnQueue, HiveConf conf) { - assert INSTANCE == null; - Token amsToken = createAmsToken(); - // We could derive the expected number of AMs to pass in. - LlapPluginEndpointClient amComm = new LlapPluginEndpointClientImpl(conf, amsToken, -1); - QueryAllocationManager qam = new GuaranteedTasksAllocator(conf, amComm); - // TODO: Hardcode one session for now; initial policies should be passed in. 
- return (INSTANCE = new WorkloadManager(yarnQueue, conf, 1, qam, amsToken)); - } - - private static Token createAmsToken() { - if (!UserGroupInformation.isSecurityEnabled()) return null; - // This application ID is completely bogus. - ApplicationId id = ApplicationId.newInstance( - System.nanoTime(), (int)(System.nanoTime() % 100000)); - JobTokenIdentifier identifier = new JobTokenIdentifier(new Text(id.toString())); - JobTokenSecretManager jobTokenManager = new JobTokenSecretManager(); - Token sessionToken = new Token<>(identifier, jobTokenManager); - sessionToken.setService(identifier.getJobId()); - return sessionToken; - } - - @VisibleForTesting - WorkloadManager(String yarnQueue, HiveConf conf, int numSessions, - QueryAllocationManager qam, Token amsToken) { - this.yarnQueue = yarnQueue; - this.conf = conf; - initializeHivePools(); - - this.amRegistryTimeoutMs = (int)HiveConf.getTimeVar( - conf, ConfVars.HIVE_SERVER2_TEZ_WM_AM_REGISTRY_TIMEOUT, TimeUnit.MILLISECONDS); - sessions = new TezSessionPool<>(conf, numSessions, true); - restrictedConfig = new RestrictedConfigChecker(conf); - allocationManager = qam; - // Only creates the expiration tracker if expiration is configured. - expirationTracker = SessionExpirationTracker.create(conf, this); - for (int i = 0; i < numSessions; i++) { - sessions.addInitialSession(createSession()); - } - } - - private void initializeHivePools() { - // TODO: real implementation - poolsLock.writeLock().lock(); - try { - pools.put("llap", new PoolState()); - } finally { - poolsLock.writeLock().unlock(); - } - } - - public TezSessionState getSession( - TezSessionState session, String userName, HiveConf conf) throws Exception { - validateConfig(conf); - String poolName = mapSessionToPoolName(userName); - // TODO: do query parallelism enforcement here based on the policies and pools. - while (true) { - WmTezSession result = checkSessionForReuse(session); - // TODO: when proper AM management is implemented, we should call tryGet... 
here, because the - // parallelism will be enforced here, and pool would always have a session for us. - result = (result == null ? sessions.getSession() : result); - result.setQueueName(yarnQueue); - result.setPoolName(poolName); - if (!ensureAmIsRegistered(result)) continue; // Try another. - redistributePoolAllocations(poolName, result, null); - return result; - } - } - - @VisibleForTesting - protected boolean ensureAmIsRegistered(WmTezSession session) throws Exception { - // Make sure AM is ready to use and registered with AM registry. - try { - session.waitForAmPluginInfo(amRegistryTimeoutMs); - } catch (TimeoutException ex) { - LOG.error("Timed out waiting for AM registry information for " + session.getSessionId()); - session.destroy(); - return false; - } - return true; - } - - private void redistributePoolAllocations( - String poolName, WmTezSession sessionToAdd, WmTezSession sessionToRemove) { - List sessionsToUpdate = null; - double totalAlloc = 0; - assert sessionToAdd == null || poolName.equals(sessionToAdd.getPoolName()); - assert sessionToRemove == null || poolName.equals(sessionToRemove.getPoolName()); - poolsLock.readLock().lock(); - try { - PoolState pool = pools.get(poolName); - synchronized (pool.lock) { - // This should be a 2nd order fn but it's too much pain in Java for one LOC. 
- if (sessionToAdd != null) { - pool.sessions.add(sessionToAdd); - } - if (sessionToRemove != null) { - if (!pool.sessions.remove(sessionToRemove)) { - LOG.error("Session " + sessionToRemove + " could not be removed from the pool"); - } - sessionToRemove.setClusterFraction(0); - } - totalAlloc = updatePoolAllocations(pool.sessions); - sessionsToUpdate = new ArrayList<>(pool.sessions); - } - } finally { - poolsLock.readLock().unlock(); - } - allocationManager.updateSessionsAsync(totalAlloc, sessionsToUpdate); - } - - private WmTezSession checkSessionForReuse(TezSessionState session) throws Exception { - if (session == null) return null; - WmTezSession result = null; - if (session instanceof WmTezSession) { - result = (WmTezSession) session; - if (result.isOwnedBy(this)) { - return result; - } - // TODO: this should never happen, at least for now. Throw? - LOG.warn("Attempting to reuse a session not belonging to us: " + result); - result.returnToSessionManager(); - return null; - } - LOG.warn("Attempting to reuse a non-WM session for workload management:" + session); - if (session instanceof TezSessionPoolSession) { - session.returnToSessionManager(); - } else { - session.close(false); // This is a non-pool session, get rid of it. - } - return null; - } - - private double updatePoolAllocations(List sessions) { - // TODO: real implementation involving in-the-pool policy interface, etc. - double allocation = 1.0 / sessions.size(); - for (WmTezSession session : sessions) { - session.setClusterFraction(allocation); - } - return 1.0; - } - - private String mapSessionToPoolName(String userName) { - // TODO: real implementation, probably calling into another class initialized with policies. 
- return "llap"; - } - - private void validateConfig(HiveConf conf) throws HiveException { - String queueName = conf.get(TezConfiguration.TEZ_QUEUE_NAME); - if ((queueName != null) && !queueName.isEmpty()) { - LOG.warn("Ignoring " + TezConfiguration.TEZ_QUEUE_NAME + "=" + queueName); - conf.set(TezConfiguration.TEZ_QUEUE_NAME, yarnQueue); - } - if (conf.getBoolVar(ConfVars.HIVE_SERVER2_ENABLE_DOAS)) { - // Should this also just be ignored? Throw for now, doAs unlike queue is often set by admin. - throw new HiveException(ConfVars.HIVE_SERVER2_ENABLE_DOAS.varname + " is not supported"); - } - if (restrictedConfig != null) { - restrictedConfig.validate(conf); - } - } - - public void start() throws Exception { - sessions.startInitialSessions(); - if (expirationTracker != null) { - expirationTracker.start(); - } - allocationManager.start(); - } - - public void stop() throws Exception { - List sessionsToClose = null; - synchronized (openSessions) { - sessionsToClose = new ArrayList(openSessions.keySet()); - } - for (TezSessionState sessionState : sessionsToClose) { - sessionState.close(false); - } - if (expirationTracker != null) { - expirationTracker.stop(); - } - allocationManager.stop(); - } - - private WmTezSession createSession() { - WmTezSession session = createSessionObject(TezSessionState.makeSessionId()); - session.setQueueName(yarnQueue); - session.setDefault(); - LOG.info("Created new interactive session " + session.getSessionId()); - return session; - } - - @VisibleForTesting - protected WmTezSession createSessionObject(String sessionId) { - return new WmTezSession(sessionId, this, expirationTracker, new HiveConf(conf)); - } - - @Override - public void returnAfterUse(TezSessionPoolSession session) throws Exception { - boolean isInterrupted = Thread.interrupted(); - try { - WmTezSession wmSession = ensureOwnedSession(session); - redistributePoolAllocations(wmSession.getPoolName(), null, wmSession); - sessions.returnSession((WmTezSession) session); - } finally 
{ - // Reset the interrupt status. - if (isInterrupted) { - Thread.currentThread().interrupt(); - } - } - } - - - /** Closes a running (expired) pool session and reopens it. */ - @Override - public void closeAndReopenPoolSession(TezSessionPoolSession oldSession) throws Exception { - sessions.replaceSession(ensureOwnedSession(oldSession), createSession(), false, null, null); - } - - private WmTezSession ensureOwnedSession(TezSessionPoolSession oldSession) { - if (!(oldSession instanceof WmTezSession) || !oldSession.isOwnedBy(this)) { - throw new AssertionError("Not a WM session " + oldSession); - } - WmTezSession session = (WmTezSession) oldSession; - return session; - } - - /** Called by TezSessionPoolSession when opened. */ - @Override - public void registerOpenSession(TezSessionPoolSession session) { - synchronized (openSessions) { - openSessions.put(session, true); - } - } - - /** Called by TezSessionPoolSession when closed. */ - @Override - public void unregisterOpenSession(TezSessionPoolSession session) { - synchronized (openSessions) { - openSessions.remove(session); - } - } - - @VisibleForTesting - public SessionExpirationTracker getExpirationTracker() { - return expirationTracker; - } - - @Override - public TezSessionState reopen(TezSessionPoolSession session, Configuration conf, - String[] additionalFiles) throws Exception { - WmTezSession oldSession = ensureOwnedSession(session), newSession = createSession(); - newSession.setPoolName(oldSession.getPoolName()); - HiveConf sessionConf = session.getConf(); - if (sessionConf == null) { - LOG.warn("Session configuration is null for " + session); - // default queue name when the initial session was created - sessionConf = new HiveConf(conf, WorkloadManager.class); - } - sessions.replaceSession(oldSession, newSession, true, additionalFiles, sessionConf); - // We are going to immediately give this session out, so ensure AM registry. 
- if (!ensureAmIsRegistered(newSession)) { - throw new Exception("Session is not usable after reopen"); - } - redistributePoolAllocations(oldSession.getPoolName(), newSession, oldSession); - return newSession; - } - - @Override - public void destroy(TezSessionPoolSession session) throws Exception { - LOG.warn("Closing a pool session because of retry failure."); - // We never want to lose pool sessions. Replace it instead; al trigger duck redistribution. - WmTezSession wmSession = ensureOwnedSession(session); - closeAndReopenPoolSession(wmSession); - redistributePoolAllocations(wmSession.getPoolName(), null, wmSession); - } - - protected final HiveConf getConf() { - return conf; - } -} diff --git a/ql/src/test/org/apache/hadoop/hive/ql/exec/tez/SampleTezSessionState.java b/ql/src/test/org/apache/hadoop/hive/ql/exec/tez/SampleTezSessionState.java index 59efd43..b9023b5 100644 --- a/ql/src/test/org/apache/hadoop/hive/ql/exec/tez/SampleTezSessionState.java +++ b/ql/src/test/org/apache/hadoop/hive/ql/exec/tez/SampleTezSessionState.java @@ -30,6 +30,7 @@ import org.apache.hadoop.hive.conf.HiveConf; import org.apache.hadoop.hive.shims.Utils; import org.apache.hadoop.security.UserGroupInformation; +import org.apache.hive.wm.ISessionPoolManager; import org.apache.tez.dag.api.TezException; @@ -47,7 +48,7 @@ private boolean doAsEnabled; public SampleTezSessionState( - String sessionId, TezSessionPoolSession.Manager parent, HiveConf conf) { + String sessionId, ISessionPoolManager parent, HiveConf conf) { super(sessionId, parent, (parent instanceof TezSessionPoolManager) ? 
((TezSessionPoolManager)parent).getExpirationTracker() : null, conf); this.sessionId = sessionId; diff --git a/ql/src/test/org/apache/hadoop/hive/ql/exec/tez/TestTezWorkloadManager.java b/ql/src/test/org/apache/hadoop/hive/ql/exec/tez/TestTezWorkloadManager.java new file mode 100644 index 0000000..3264a38 --- /dev/null +++ b/ql/src/test/org/apache/hadoop/hive/ql/exec/tez/TestTezWorkloadManager.java @@ -0,0 +1,173 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hadoop.hive.ql.exec.tez; + +import static org.junit.Assert.*; +import static org.mockito.Mockito.*; + +import org.apache.hadoop.hive.conf.HiveConf; +import org.apache.hadoop.hive.conf.HiveConf.ConfVars; +import org.apache.tez.dag.api.TezConfiguration; + +import java.util.List; + +import org.junit.Test; + +public class TestTezWorkloadManager { + private static class MockQam implements QueryAllocationManager { + boolean isCalled = false; + + @Override + public void start() { + } + + @Override + public void stop() { + } + + @Override + public void updateSessionsAsync(double totalMaxAlloc, List sessions) { + isCalled = true; + } + + void assertWasCalled() { + assertTrue(isCalled); + isCalled = false; + } + } + + private static class TezWorkloadManagerForTest extends TezWorkloadManager { + + TezWorkloadManagerForTest(String yarnQueue, HiveConf conf, int numSessions, + QueryAllocationManager qam) { + super(yarnQueue, conf, numSessions, qam, null); + } + + @Override + protected WmTezSession createSessionObject(String sessionId) { + return new SampleTezSessionState(sessionId, this, new HiveConf(getConf())); + } + + @Override + protected boolean ensureAmIsRegistered(WmTezSession session) throws Exception { + return true; + } + } + + @Test(timeout = 10000) + public void testReuse() throws Exception { + HiveConf conf = createConf(); + MockQam qam = new MockQam(); + TezWorkloadManager wm = new TezWorkloadManagerForTest("test", conf, 1, qam); + wm.start(); + TezSessionState nonPool = mock(TezSessionState.class); + when(nonPool.getConf()).thenReturn(conf); + doNothing().when(nonPool).close(anyBoolean()); + TezSessionState session = wm.getSession(nonPool, null, conf); + verify(nonPool).close(anyBoolean()); + assertNotSame(nonPool, session); + session.returnToSessionManager(); + TezSessionPoolSession diffPool = mock(TezSessionPoolSession.class); + when(diffPool.getConf()).thenReturn(conf); + doNothing().when(diffPool).returnToSessionManager(); + session 
= wm.getSession(diffPool, null, conf); + verify(diffPool).returnToSessionManager(); + assertNotSame(diffPool, session); + TezSessionState session2 = wm.getSession(session, null, conf); + assertSame(session, session2); + } + + @Test(timeout = 10000) + public void testQueueName() throws Exception { + HiveConf conf = createConf(); + MockQam qam = new MockQam(); + TezWorkloadManager wm = new TezWorkloadManagerForTest("test", conf, 1, qam); + wm.start(); + // The queue should be ignored. + conf.set(TezConfiguration.TEZ_QUEUE_NAME, "test2"); + TezSessionState session = wm.getSession(null, null, conf); + assertEquals("test", session.getQueueName()); + assertEquals("test", conf.get(TezConfiguration.TEZ_QUEUE_NAME)); + session.setQueueName("test2"); + session = wm.getSession(session, null, conf); + assertEquals("test", session.getQueueName()); + } + + // Note (unrelated to epsilon): all the fraction checks are valid with the current logic in the + // absence of policies. This will change when there are policies. + private final static double EPSILON = 0.001; + + @Test(timeout = 10000) + public void testReopen() throws Exception { + // We should always get a different object, and cluster fraction should be propagated. + HiveConf conf = createConf(); + MockQam qam = new MockQam(); + TezWorkloadManager wm = new TezWorkloadManagerForTest("test", conf, 1, qam); + wm.start(); + WmTezSession session = (WmTezSession) wm.getSession(null, null, conf); + assertEquals(1.0, session.getClusterFraction(), EPSILON); + qam.assertWasCalled(); + WmTezSession session2 = (WmTezSession) session.reopen(conf, null); + assertNotSame(session, session2); + assertEquals(1.0, session2.getClusterFraction(), EPSILON); + assertEquals(0.0, session.getClusterFraction(), EPSILON); + qam.assertWasCalled(); + } + + @Test(timeout = 10000) + public void testDestroyAndReturn() throws Exception { + // Session should not be lost; however the fraction should be discarded. 
+ HiveConf conf = createConf(); + MockQam qam = new MockQam(); + TezWorkloadManager wm = new TezWorkloadManagerForTest("test", conf, 2, qam); + wm.start(); + WmTezSession session = (WmTezSession) wm.getSession(null, null, conf); + assertEquals(1.0, session.getClusterFraction(), EPSILON); + qam.assertWasCalled(); + WmTezSession session2 = (WmTezSession) wm.getSession(null, null, conf); + assertEquals(0.5, session.getClusterFraction(), EPSILON); + assertEquals(0.5, session2.getClusterFraction(), EPSILON); + qam.assertWasCalled(); + assertNotSame(session, session2); + session.destroy(); // Destroy before returning to the pool. + assertEquals(1.0, session2.getClusterFraction(), EPSILON); + assertEquals(0.0, session.getClusterFraction(), EPSILON); + qam.assertWasCalled(); + + // We never lose pool session, so we should still be able to get. + session = (WmTezSession) wm.getSession(null, null, conf); + session.returnToSessionManager(); + assertEquals(1.0, session2.getClusterFraction(), EPSILON); + assertEquals(0.0, session.getClusterFraction(), EPSILON); + qam.assertWasCalled(); + + // Now destroy the returned session (which is technically not valid) and confirm correctness. 
+ session.destroy(); + assertEquals(1.0, session2.getClusterFraction(), EPSILON); + //qam.assertWasNotCalled(); + } + + private HiveConf createConf() { + HiveConf conf = new HiveConf(); + conf.set(ConfVars.HIVE_SERVER2_TEZ_SESSION_LIFETIME.varname, "-1"); + conf.set(ConfVars.HIVE_SERVER2_ENABLE_DOAS.varname, "false"); + conf.set(ConfVars.LLAP_TASK_SCHEDULER_AM_REGISTRY_NAME.varname, ""); + return conf; + } +} diff --git a/ql/src/test/org/apache/hadoop/hive/ql/exec/tez/TestWorkloadManager.java b/ql/src/test/org/apache/hadoop/hive/ql/exec/tez/TestWorkloadManager.java deleted file mode 100644 index 7adf895..0000000 --- a/ql/src/test/org/apache/hadoop/hive/ql/exec/tez/TestWorkloadManager.java +++ /dev/null @@ -1,173 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.hadoop.hive.ql.exec.tez; - -import static org.junit.Assert.*; -import static org.mockito.Mockito.*; - -import org.apache.hadoop.hive.conf.HiveConf; -import org.apache.hadoop.hive.conf.HiveConf.ConfVars; -import org.apache.tez.dag.api.TezConfiguration; - -import java.util.List; - -import org.junit.Test; - -public class TestWorkloadManager { - private static class MockQam implements QueryAllocationManager { - boolean isCalled = false; - - @Override - public void start() { - } - - @Override - public void stop() { - } - - @Override - public void updateSessionsAsync(double totalMaxAlloc, List sessions) { - isCalled = true; - } - - void assertWasCalled() { - assertTrue(isCalled); - isCalled = false; - } - } - - private static class WorkloadManagerForTest extends WorkloadManager { - - WorkloadManagerForTest(String yarnQueue, HiveConf conf, int numSessions, - QueryAllocationManager qam) { - super(yarnQueue, conf, numSessions, qam, null); - } - - @Override - protected WmTezSession createSessionObject(String sessionId) { - return new SampleTezSessionState(sessionId, this, new HiveConf(getConf())); - } - - @Override - protected boolean ensureAmIsRegistered(WmTezSession session) throws Exception { - return true; - } - } - - @Test(timeout = 10000) - public void testReuse() throws Exception { - HiveConf conf = createConf(); - MockQam qam = new MockQam(); - WorkloadManager wm = new WorkloadManagerForTest("test", conf, 1, qam); - wm.start(); - TezSessionState nonPool = mock(TezSessionState.class); - when(nonPool.getConf()).thenReturn(conf); - doNothing().when(nonPool).close(anyBoolean()); - TezSessionState session = wm.getSession(nonPool, null, conf); - verify(nonPool).close(anyBoolean()); - assertNotSame(nonPool, session); - session.returnToSessionManager(); - TezSessionPoolSession diffPool = mock(TezSessionPoolSession.class); - when(diffPool.getConf()).thenReturn(conf); - doNothing().when(diffPool).returnToSessionManager(); - session = 
wm.getSession(diffPool, null, conf); - verify(diffPool).returnToSessionManager(); - assertNotSame(diffPool, session); - TezSessionState session2 = wm.getSession(session, null, conf); - assertSame(session, session2); - } - - @Test(timeout = 10000) - public void testQueueName() throws Exception { - HiveConf conf = createConf(); - MockQam qam = new MockQam(); - WorkloadManager wm = new WorkloadManagerForTest("test", conf, 1, qam); - wm.start(); - // The queue should be ignored. - conf.set(TezConfiguration.TEZ_QUEUE_NAME, "test2"); - TezSessionState session = wm.getSession(null, null, conf); - assertEquals("test", session.getQueueName()); - assertEquals("test", conf.get(TezConfiguration.TEZ_QUEUE_NAME)); - session.setQueueName("test2"); - session = wm.getSession(session, null, conf); - assertEquals("test", session.getQueueName()); - } - - // Note (unrelated to epsilon): all the fraction checks are valid with the current logic in the - // absence of policies. This will change when there are policies. - private final static double EPSILON = 0.001; - - @Test(timeout = 10000) - public void testReopen() throws Exception { - // We should always get a different object, and cluster fraction should be propagated. - HiveConf conf = createConf(); - MockQam qam = new MockQam(); - WorkloadManager wm = new WorkloadManagerForTest("test", conf, 1, qam); - wm.start(); - WmTezSession session = (WmTezSession) wm.getSession(null, null, conf); - assertEquals(1.0, session.getClusterFraction(), EPSILON); - qam.assertWasCalled(); - WmTezSession session2 = (WmTezSession) session.reopen(conf, null); - assertNotSame(session, session2); - assertEquals(1.0, session2.getClusterFraction(), EPSILON); - assertEquals(0.0, session.getClusterFraction(), EPSILON); - qam.assertWasCalled(); - } - - @Test(timeout = 10000) - public void testDestroyAndReturn() throws Exception { - // Session should not be lost; however the fraction should be discarded. 
- HiveConf conf = createConf(); - MockQam qam = new MockQam(); - WorkloadManager wm = new WorkloadManagerForTest("test", conf, 2, qam); - wm.start(); - WmTezSession session = (WmTezSession) wm.getSession(null, null, conf); - assertEquals(1.0, session.getClusterFraction(), EPSILON); - qam.assertWasCalled(); - WmTezSession session2 = (WmTezSession) wm.getSession(null, null, conf); - assertEquals(0.5, session.getClusterFraction(), EPSILON); - assertEquals(0.5, session2.getClusterFraction(), EPSILON); - qam.assertWasCalled(); - assertNotSame(session, session2); - session.destroy(); // Destroy before returning to the pool. - assertEquals(1.0, session2.getClusterFraction(), EPSILON); - assertEquals(0.0, session.getClusterFraction(), EPSILON); - qam.assertWasCalled(); - - // We never lose pool session, so we should still be able to get. - session = (WmTezSession) wm.getSession(null, null, conf); - session.returnToSessionManager(); - assertEquals(1.0, session2.getClusterFraction(), EPSILON); - assertEquals(0.0, session.getClusterFraction(), EPSILON); - qam.assertWasCalled(); - - // Now destroy the returned session (which is technically not valid) and confirm correctness. 
- session.destroy(); - assertEquals(1.0, session2.getClusterFraction(), EPSILON); - //qam.assertWasNotCalled(); - } - - private HiveConf createConf() { - HiveConf conf = new HiveConf(); - conf.set(ConfVars.HIVE_SERVER2_TEZ_SESSION_LIFETIME.varname, "-1"); - conf.set(ConfVars.HIVE_SERVER2_ENABLE_DOAS.varname, "false"); - conf.set(ConfVars.LLAP_TASK_SCHEDULER_AM_REGISTRY_NAME.varname, ""); - return conf; - } -} diff --git a/service/src/java/org/apache/hive/service/server/HiveServer2.java b/service/src/java/org/apache/hive/service/server/HiveServer2.java index 5cb973c..2371888 100644 --- a/service/src/java/org/apache/hive/service/server/HiveServer2.java +++ b/service/src/java/org/apache/hive/service/server/HiveServer2.java @@ -57,8 +57,6 @@ import org.apache.hadoop.hive.llap.coordinator.LlapCoordinator; import org.apache.hadoop.hive.llap.registry.impl.LlapRegistryService; import org.apache.hadoop.hive.ql.exec.spark.session.SparkSessionManagerImpl; -import org.apache.hadoop.hive.ql.exec.tez.TezSessionPoolManager; -import org.apache.hadoop.hive.ql.exec.tez.WorkloadManager; import org.apache.hadoop.hive.ql.metadata.Hive; import org.apache.hadoop.hive.ql.metadata.HiveException; import org.apache.hadoop.hive.ql.metadata.HiveMaterializedViewsRegistry; @@ -79,6 +77,8 @@ import org.apache.hive.service.cli.thrift.ThriftCLIService; import org.apache.hive.service.cli.thrift.ThriftHttpCLIService; import org.apache.hive.service.servlet.QueryProfileServlet; +import org.apache.hive.service.wm.tez.WorkloadManager; +import org.apache.hive.wm.IWorkloadManager; import org.apache.logging.log4j.util.Strings; import org.apache.zookeeper.CreateMode; import org.apache.zookeeper.KeeperException; @@ -107,7 +107,7 @@ private CuratorFramework zooKeeperClient; private boolean deregisteredWithZooKeeper = false; // Set to true only when deregistration happens private HttpServer webServer; // Web UI - private WorkloadManager wm; + private IWorkloadManager wm; public HiveServer2() { 
super(HiveServer2.class.getSimpleName()); @@ -165,7 +165,9 @@ public void run() { // Initialize workload management. String wmQueue = HiveConf.getVar(hiveConf, ConfVars.HIVE_SERVER2_TEZ_INTERACTIVE_QUEUE); if (wmQueue != null && !wmQueue.isEmpty()) { - wm = WorkloadManager.create(wmQueue, hiveConf); + WorkloadManager workloadManager = new WorkloadManager(WorkloadManager.class.getSimpleName(), wmQueue, + cliService.getSessionManager()); + addService(workloadManager); } else { wm = null; } @@ -552,24 +554,6 @@ public synchronized void stop() { LOG.error("Error removing znode for this HiveServer2 instance from ZooKeeper.", e); } } - // There should already be an instance of the session pool manager. - // If not, ignoring is fine while stopping HiveServer2. - if (hiveConf != null && hiveConf.getBoolVar(ConfVars.HIVE_SERVER2_TEZ_INITIALIZE_DEFAULT_SESSIONS)) { - try { - TezSessionPoolManager.getInstance().stop(); - } catch (Exception e) { - LOG.error("Tez session pool manager stop had an error during stop of HiveServer2. " - + "Shutting down HiveServer2 anyway.", e); - } - } - if (wm != null) { - try { - wm.stop(); - } catch (Exception e) { - LOG.error("Workload manager stop had an error during stop of HiveServer2. " - + "Shutting down HiveServer2 anyway.", e); - } - } if (hiveConf != null && hiveConf.getVar(ConfVars.HIVE_EXECUTION_ENGINE).equals("spark")) { try { @@ -607,9 +591,8 @@ private static void startHiveServer2() throws Throwable { HiveServer2 server = null; try { // Initialize the pool before we start the server; don't start yet. 
- TezSessionPoolManager sessionPool = null; + IWorkloadManager sessionPool = null; if (hiveConf.getBoolVar(ConfVars.HIVE_SERVER2_TEZ_INITIALIZE_DEFAULT_SESSIONS)) { - sessionPool = TezSessionPoolManager.getInstance(); sessionPool.setupPool(hiveConf); } @@ -635,9 +618,6 @@ private static void startHiveServer2() throws Throwable { if (sessionPool != null) { sessionPool.startPool(); } - if (server.wm != null) { - server.wm.start(); - } if (hiveConf.getVar(ConfVars.HIVE_EXECUTION_ENGINE).equals("spark")) { SparkSessionManagerImpl.getInstance().setup(hiveConf); diff --git a/service/src/java/org/apache/hive/service/wm/tez/WorkloadManager.java b/service/src/java/org/apache/hive/service/wm/tez/WorkloadManager.java new file mode 100644 index 0000000..713923e --- /dev/null +++ b/service/src/java/org/apache/hive/service/wm/tez/WorkloadManager.java @@ -0,0 +1,121 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hive.service.wm.tez; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hive.conf.HiveConf; +import org.apache.hadoop.hive.ql.exec.tez.TezSessionState; +import org.apache.hadoop.hive.ql.exec.tez.TezWorkloadManager; +import org.apache.hive.service.AbstractService; +import org.apache.hive.service.cli.session.SessionManager; +import org.apache.hive.wm.IWorkloadManager; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * + */ +public class WorkloadManager extends AbstractService implements IWorkloadManager { + private static final Logger LOG = LoggerFactory.getLogger(WorkloadManager.class); + private TezWorkloadManager tezWorkloadManager; + private SessionManager sessionManager; + private String yarnQueue; + + public WorkloadManager(final String serviceName, final String yarnQueue, final SessionManager sessionManager) { + super(serviceName); + this.yarnQueue = yarnQueue; + this.sessionManager = sessionManager; + } + + @Override + public synchronized void init(final HiveConf hiveConf) { + if (tezWorkloadManager == null) { + tezWorkloadManager = TezWorkloadManager.create(yarnQueue, hiveConf); + } + } + + @Override + public synchronized void start() { + try { + if (tezWorkloadManager != null) { + tezWorkloadManager.start(); + } + } catch (Exception e) { + LOG.error("Unable to start {} service. ", getName(), e); + } + } + + @Override + public synchronized void stop() { + try { + if (tezWorkloadManager != null) { + tezWorkloadManager.stop(); + } + } catch (Exception e) { + LOG.error("Unable to stop {} service. 
", getName(), e); + } + } + + @Override + public void closeAndReopenSession(final TezSessionState session) throws Exception { + tezWorkloadManager.closeAndReopenSession(session); + } + + @Override + public void registerOpenSession(final TezSessionState session) { + tezWorkloadManager.registerOpenSession(session); + } + + @Override + public void unregisterOpenSession(final TezSessionState session) { + tezWorkloadManager.unregisterOpenSession(session); + } + + @Override + public void returnAfterUse(final TezSessionState session) throws Exception { + tezWorkloadManager.returnAfterUse(session); + } + + @Override + public TezSessionState reopen(final TezSessionState session, final Configuration conf, final String[] inputOutputJars) + throws Exception { + return tezWorkloadManager.reopen(session, conf, inputOutputJars); + } + + @Override + public void destroy(final TezSessionState session) throws Exception { + tezWorkloadManager.destroy(session); + } + + public SessionManager getSessionManager() { + return sessionManager; + } + + public void setSessionManager(final SessionManager sessionManager) { + this.sessionManager = sessionManager; + } + + @Override + public void setupPool(final HiveConf hiveConf) throws InterruptedException { + tezWorkloadManager.setupPool(hiveConf); + } + + @Override + public void startPool() throws Exception { + tezWorkloadManager.startPool(); + } +}