diff --git service/src/java/org/apache/hive/service/cli/session/SessionManager.java service/src/java/org/apache/hive/service/cli/session/SessionManager.java index e9649824f1..f653ace73c 100644 --- service/src/java/org/apache/hive/service/cli/session/SessionManager.java +++ service/src/java/org/apache/hive/service/cli/session/SessionManager.java @@ -37,6 +37,7 @@ import com.google.common.base.Predicate; import com.google.common.collect.Iterables; + import org.apache.commons.io.FileUtils; import org.apache.hadoop.hive.common.metrics.common.Metrics; import org.apache.hadoop.hive.common.metrics.common.MetricsConstant; @@ -63,9 +64,14 @@ */ public class SessionManager extends CompositeService { + private static final String INACTIVE_ERROR_MESSAGE = + "Cannot open sessions on an inactive HS2 instance; use service discovery to connect"; public static final String HIVERCFILE = ".hiverc"; private static final Logger LOG = LoggerFactory.getLogger(CompositeService.class); private HiveConf hiveConf; + /** The lock that synchronizes the allowSessions flag and handleToSession map. */ + private final Object sessionAddLock = new Object(); + private boolean allowSessions; private final Map handleToSession = new ConcurrentHashMap(); private final Map connectionsCount = new ConcurrentHashMap<>(); @@ -373,10 +379,18 @@ public SessionHandle openSession(TProtocolVersion protocol, String username, Str return createSession(null, protocol, username, password, ipAddress, sessionConf, withImpersonation, delegationToken).getSessionHandle(); } + public HiveSession createSession(SessionHandle sessionHandle, TProtocolVersion protocol, String username, String password, String ipAddress, Map sessionConf, boolean withImpersonation, String delegationToken) throws HiveSQLException { + // Check the flag opportunistically. + synchronized (sessionAddLock) { + if (!allowSessions) { + throw new HiveSQLException(INACTIVE_ERROR_MESSAGE); + } + } + // Do the expensive operations outside of any locks; we'll recheck the flag again at the end. // if client proxies connection, use forwarded ip-addresses instead of just the gateway final List forwardedAddresses = getForwardedAddresses(); @@ -448,8 +462,23 @@ public HiveSession createSession(SessionHandle sessionHandle, TProtocolVersion p session = null; throw new HiveSQLException("Failed to execute session hooks: " + e.getMessage(), e); } - handleToSession.put(session.getSessionHandle(), session); - LOG.info("Session opened, " + session.getSessionHandle() + ", current sessions:" + getOpenSessionCount()); + boolean isAdded = false; + synchronized (sessionAddLock) { + if (allowSessions) { + handleToSession.put(session.getSessionHandle(), session); + isAdded = true; + } + } + if (!isAdded) { + try { + closeSessionInternal(session); + } catch (Exception e) { + LOG.warn("Failed to close the session opened during an HA state change; ignoring", e); + } + throw new HiveSQLException(INACTIVE_ERROR_MESSAGE); + } + LOG.info("Session opened, " + session.getSessionHandle() + + ", current sessions:" + getOpenSessionCount()); return session; } @@ -548,6 +577,10 @@ public synchronized void closeSession(SessionHandle sessionHandle) throws HiveSQ throw new HiveSQLException("Session does not exist: " + sessionHandle); } LOG.info("Session closed, " + sessionHandle + ", current sessions:" + getOpenSessionCount()); + closeSessionInternal(session); + } + + private void closeSessionInternal(HiveSession session) throws HiveSQLException { try { session.close(); } finally { @@ -683,5 +716,11 @@ public String getHiveServer2HostName() throws Exception { } return hiveServer2.getServerHost(); } + + public void allowSessions(boolean b) { + synchronized (sessionAddLock) { + this.allowSessions = b; + } + } } diff --git service/src/java/org/apache/hive/service/server/HiveServer2.java service/src/java/org/apache/hive/service/server/HiveServer2.java index 432a341bef..35e40b1357 100644 --- service/src/java/org/apache/hive/service/server/HiveServer2.java +++ service/src/java/org/apache/hive/service/server/HiveServer2.java @@ -18,14 +18,9 @@ package org.apache.hive.service.server; -import java.io.BufferedReader; import java.io.IOException; -import java.io.InputStreamReader; -import java.net.HttpURLConnection; -import java.net.URL; import java.nio.charset.Charset; import java.util.ArrayList; -import java.util.Base64; import java.util.Collection; import java.util.HashMap; import java.util.List; @@ -738,6 +733,10 @@ public synchronized void start() { } } + if (!activePassiveHA) { + allowClientSessions(); + } + if (hiveConf.getVar(ConfVars.HIVE_EXECUTION_ENGINE).equals("tez")) { if (!activePassiveHA) { LOG.info("HS2 interactive HA not enabled. Starting tez sessions.."); @@ -775,6 +774,7 @@ public void isLeader() { } hiveServer2.startOrReconnectTezSessions(); LOG.info("Started/Reconnected tez sessions."); + hiveServer2.allowClientSessions(); // resolve futures used for testing if (HiveConf.getBoolVar(hiveServer2.getHiveConf(), ConfVars.HIVE_IN_TEST)) { @@ -787,7 +787,7 @@ public void isLeader() { public void notLeader() { LOG.info("HS2 instance {} LOST LEADERSHIP. Stopping/Disconnecting tez sessions..", hiveServer2.serviceUri); hiveServer2.isLeader.set(false); - hiveServer2.closeHiveSessions(); + hiveServer2.closeAndDisallowHiveSessions(); hiveServer2.stopOrDisconnectTezSessions(); LOG.info("Stopped/Disconnected tez sessions."); @@ -824,6 +824,10 @@ private void startOrReconnectTezSessions() { initAndStartWorkloadManager(resourcePlan); } + private void allowClientSessions() { + cliService.getSessionManager().allowSessions(true); + } + private void initAndStartTezSessionPoolManager(final WMFullResourcePlan resourcePlan) { // starting Tez session pool in start here to let parent session state initialize on CliService state, to avoid // SessionState.get() return null during createTezDir @@ -860,17 +864,18 @@ private void initAndStartWorkloadManager(final WMFullResourcePlan resourcePlan) } } - private void closeHiveSessions() { + private void closeAndDisallowHiveSessions() { LOG.info("Closing all open hive sessions."); - if (cliService != null && cliService.getSessionManager().getOpenSessionCount() > 0) { - try { - for (HiveSession session : cliService.getSessionManager().getSessions()) { - cliService.getSessionManager().closeSession(session.getSessionHandle()); - } - LOG.info("Closed all open hive sessions"); - } catch (HiveSQLException e) { - LOG.error("Unable to close all open sessions.", e); + if (cliService == null) return; + cliService.getSessionManager().allowSessions(false); + // No sessions can be opened after the above call. Close the existing ones if any. + try { + for (HiveSession session : cliService.getSessionManager().getSessions()) { + cliService.getSessionManager().closeSession(session.getSessionHandle()); } + LOG.info("Closed all open hive sessions"); + } catch (HiveSQLException e) { + LOG.error("Unable to close all open sessions.", e); } }