Forbid opening sessions on a passive HiveServer2 instance (active-passive HA).

SessionManager gains an allowSessions flag guarded by sessionAddLock. createSession()
checks the flag before doing any work and again, under the lock, before registering the
session; a session that loses that race is closed and the open is rejected with
"Cannot open sessions on an inactive HS2 instance; use service discovery to connect".
HiveServer2 sets the flag in start() when active-passive HA is off and in isLeader()
otherwise, and clears it in notLeader() before closing the existing sessions.
MiniHS2.waitForStartup() treats that rejection message as "passive instance has started".

Review changes relative to the submitted patch (only added lines were touched):
 * MiniHS2, TestActivePassiveHA: null-check Exception.getMessage() before contains().
 * HiveServer2.closeAndDisallowHiveSessions(): catch HiveSQLException per session so one
   failed close does not leave the remaining sessions open; brace the null guard.
 * TestActivePassiveHA: start miniHS2_2 the same way as miniHS2_1 (no single-use Map local).
 * SessionManager: comment wording; Javadoc for allowSessions(boolean).
Hunk line counts were updated accordingly; the index hashes below are those of the
submitted patch.

diff --git itests/hive-unit/src/test/java/org/apache/hive/jdbc/TestActivePassiveHA.java itests/hive-unit/src/test/java/org/apache/hive/jdbc/TestActivePassiveHA.java
index 4055f13fd0..bf24ebf0af 100644
--- itests/hive-unit/src/test/java/org/apache/hive/jdbc/TestActivePassiveHA.java
+++ itests/hive-unit/src/test/java/org/apache/hive/jdbc/TestActivePassiveHA.java
@@ -43,7 +43,9 @@
 import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
 import org.apache.hadoop.hive.registry.impl.ZkRegistryBase;
 import org.apache.hive.http.security.PamAuthenticator;
+import org.apache.hive.jdbc.Utils.JdbcConnectionParams;
 import org.apache.hive.jdbc.miniHS2.MiniHS2;
+import org.apache.hive.service.cli.HiveSQLException;
 import org.apache.hive.service.server.HS2ActivePassiveHARegistry;
 import org.apache.hive.service.server.HS2ActivePassiveHARegistryClient;
 import org.apache.hive.service.server.HiveServer2Instance;
@@ -383,11 +385,7 @@ public void testManualFailover() throws Exception {
       assertEquals("false", sendGet(url2, true, true));
       assertEquals(false, miniHS2_2.isLeader());
     } finally {
-      // revert configs to not affect other tests
-      unsetPamConfs(hiveConf1);
-      unsetPamConfs(hiveConf2);
-      hiveConf1.unset(ConfVars.HIVE_SERVER2_WEBUI_ENABLE_CORS.varname);
-      hiveConf2.unset(ConfVars.HIVE_SERVER2_WEBUI_ENABLE_CORS.varname);
+      resetFailoverConfs();
     }
   }
 
@@ -427,6 +425,62 @@ public void testManualFailoverUnauthorized() throws Exception {
   }
 
   @Test(timeout = 60000)
+  public void testNoConnectionOnPassive() throws Exception {
+    hiveConf1.setBoolVar(ConfVars.HIVE_SERVER2_WEBUI_ENABLE_CORS, true);
+    hiveConf2.setBoolVar(ConfVars.HIVE_SERVER2_WEBUI_ENABLE_CORS, true);
+    setPamConfs(hiveConf1);
+    setPamConfs(hiveConf2);
+    try {
+      PamAuthenticator pamAuthenticator1 = new TestHS2HttpServerPam.TestPamAuthenticator(hiveConf1);
+      PamAuthenticator pamAuthenticator2 = new TestHS2HttpServerPam.TestPamAuthenticator(hiveConf2);
+      String instanceId1 = UUID.randomUUID().toString();
+      miniHS2_1.setPamAuthenticator(pamAuthenticator1);
+      miniHS2_1.start(getSecureConfOverlay(instanceId1));
+      String instanceId2 = UUID.randomUUID().toString();
+      miniHS2_2.setPamAuthenticator(pamAuthenticator2);
+      miniHS2_2.start(getSecureConfOverlay(instanceId2));
+      String url1 = "http://localhost:" + hiveConf1.get(ConfVars.HIVE_SERVER2_WEBUI_PORT.varname) + "/leader";
+      assertEquals(true, miniHS2_1.getIsLeaderTestFuture().get());
+      assertEquals(true, miniHS2_1.isLeader());
+
+      // Don't get urls from ZK, it will actually be a service discovery URL that we don't want.
+      String hs1Url = "jdbc:hive2://" + miniHS2_1.getHost() + ":" + miniHS2_1.getBinaryPort();
+      Connection hs2Conn = getConnection(hs1Url, System.getProperty("user.name")); // Should work.
+      hs2Conn.close();
+
+      String resp = sendDelete(url1, true);
+      assertTrue(resp, resp.contains("Failover successful!"));
+      // wait for failover to close sessions
+      while (miniHS2_1.getOpenSessionsCount() != 0) {
+        Thread.sleep(100);
+      }
+
+      assertEquals(true, miniHS2_2.getIsLeaderTestFuture().get());
+      assertEquals(true, miniHS2_2.isLeader());
+
+      try {
+        hs2Conn = getConnection(hs1Url, System.getProperty("user.name"));
+        fail("Should throw");
+      } catch (Exception e) {
+        String message = e.getMessage();
+        if (message == null || !message.contains("Cannot open sessions on an inactive HS2")) {
+          throw e;
+        }
+      }
+    } finally {
+      resetFailoverConfs();
+    }
+  }
+
+  private void resetFailoverConfs() {
+    // revert configs to not affect other tests
+    unsetPamConfs(hiveConf1);
+    unsetPamConfs(hiveConf2);
+    hiveConf1.unset(ConfVars.HIVE_SERVER2_WEBUI_ENABLE_CORS.varname);
+    hiveConf2.unset(ConfVars.HIVE_SERVER2_WEBUI_ENABLE_CORS.varname);
+  }
+
+  @Test(timeout = 60000)
   public void testClientConnectionsOnFailover() throws Exception {
     setPamConfs(hiveConf1);
     setPamConfs(hiveConf2);
diff --git itests/util/src/main/java/org/apache/hive/jdbc/miniHS2/MiniHS2.java itests/util/src/main/java/org/apache/hive/jdbc/miniHS2/MiniHS2.java
index a78dd739b1..e4ac0a927e 100644
--- itests/util/src/main/java/org/apache/hive/jdbc/miniHS2/MiniHS2.java
+++ itests/util/src/main/java/org/apache/hive/jdbc/miniHS2/MiniHS2.java
@@ -621,6 +621,11 @@ private void waitForStartup() throws Exception {
          */
         sessionHandle = hs2Client.openSession("foo", "bar", sessionConf);
       } catch (Exception e) {
+        String message = e.getMessage();
+        if (message != null && message.contains("Cannot open sessions on an inactive HS2")) {
+          // Passive HS2 has started. TODO: seems fragile
+          return;
+        }
         // service not started yet
         continue;
       }
diff --git service/src/java/org/apache/hive/service/cli/session/SessionManager.java service/src/java/org/apache/hive/service/cli/session/SessionManager.java
index e9649824f1..dccaa34c7f 100644
--- service/src/java/org/apache/hive/service/cli/session/SessionManager.java
+++ service/src/java/org/apache/hive/service/cli/session/SessionManager.java
@@ -37,6 +37,7 @@
 
 import com.google.common.base.Predicate;
 import com.google.common.collect.Iterables;
+
 import org.apache.commons.io.FileUtils;
 import org.apache.hadoop.hive.common.metrics.common.Metrics;
 import org.apache.hadoop.hive.common.metrics.common.MetricsConstant;
@@ -63,9 +64,16 @@
  */
 public class SessionManager extends CompositeService {
 
+  private static final String INACTIVE_ERROR_MESSAGE =
+      "Cannot open sessions on an inactive HS2 instance; use service discovery to connect";
   public static final String HIVERCFILE = ".hiverc";
   private static final Logger LOG = LoggerFactory.getLogger(CompositeService.class);
   private HiveConf hiveConf;
+  /** The lock that synchronizes the allowSessions flag and handleToSession map.
+      Active-passive HA first disables the connections, then closes the existing ones, making
+      sure there are no races between these two processes. */
+  private final Object sessionAddLock = new Object();
+  private boolean allowSessions;
   private final Map handleToSession =
       new ConcurrentHashMap();
   private final Map connectionsCount = new ConcurrentHashMap<>();
@@ -373,10 +381,18 @@ public SessionHandle openSession(TProtocolVersion protocol, String username, Str
     return createSession(null, protocol, username, password, ipAddress, sessionConf,
       withImpersonation, delegationToken).getSessionHandle();
   }
+
   public HiveSession createSession(SessionHandle sessionHandle, TProtocolVersion protocol, String username,
     String password, String ipAddress, Map sessionConf, boolean withImpersonation,
     String delegationToken)
     throws HiveSQLException {
+    // Check the flag opportunistically.
+    synchronized (sessionAddLock) {
+      if (!allowSessions) {
+        throw new HiveSQLException(INACTIVE_ERROR_MESSAGE);
+      }
+    }
+    // Do the expensive operations outside of any locks; we'll recheck the flag again at the end.
     // if client proxies connection, use forwarded ip-addresses instead of just the gateway
     final List forwardedAddresses = getForwardedAddresses();
 
@@ -448,8 +464,23 @@ public HiveSession createSession(SessionHandle sessionHandle, TProtocolVersion p
       session = null;
       throw new HiveSQLException("Failed to execute session hooks: " + e.getMessage(), e);
     }
-    handleToSession.put(session.getSessionHandle(), session);
-    LOG.info("Session opened, " + session.getSessionHandle() + ", current sessions:" + getOpenSessionCount());
+    boolean isAdded = false;
+    synchronized (sessionAddLock) {
+      if (allowSessions) {
+        handleToSession.put(session.getSessionHandle(), session);
+        isAdded = true;
+      }
+    }
+    if (!isAdded) {
+      try {
+        closeSessionInternal(session);
+      } catch (Exception e) {
+        LOG.warn("Failed to close the session opened during an HA state change; ignoring", e);
+      }
+      throw new HiveSQLException(INACTIVE_ERROR_MESSAGE);
+    }
+    LOG.info("Session opened, " + session.getSessionHandle() +
+        ", current sessions:" + getOpenSessionCount());
     return session;
   }
 
@@ -548,6 +579,10 @@ public synchronized void closeSession(SessionHandle sessionHandle) throws HiveSQ
       throw new HiveSQLException("Session does not exist: " + sessionHandle);
     }
     LOG.info("Session closed, " + sessionHandle + ", current sessions:" + getOpenSessionCount());
+    closeSessionInternal(session);
+  }
+
+  private void closeSessionInternal(HiveSession session) throws HiveSQLException {
     try {
       session.close();
     } finally {
@@ -683,5 +718,15 @@ public String getHiveServer2HostName() throws Exception {
     }
     return hiveServer2.getServerHost();
   }
+
+  /**
+   * Enables or disables opening of new sessions on this instance.
+   * @param b true to accept new sessions; false to reject them with a HiveSQLException
+   */
+  public void allowSessions(boolean b) {
+    synchronized (sessionAddLock) {
+      this.allowSessions = b;
+    }
+  }
 }
 
diff --git service/src/java/org/apache/hive/service/server/HiveServer2.java service/src/java/org/apache/hive/service/server/HiveServer2.java
index 432a341bef..35e40b1357 100644
--- service/src/java/org/apache/hive/service/server/HiveServer2.java
+++ service/src/java/org/apache/hive/service/server/HiveServer2.java
@@ -18,14 +18,9 @@
 
 package org.apache.hive.service.server;
 
-import java.io.BufferedReader;
 import java.io.IOException;
-import java.io.InputStreamReader;
-import java.net.HttpURLConnection;
-import java.net.URL;
 import java.nio.charset.Charset;
 import java.util.ArrayList;
-import java.util.Base64;
 import java.util.Collection;
 import java.util.HashMap;
 import java.util.List;
@@ -738,6 +733,10 @@ public synchronized void start() {
       }
     }
 
+    if (!activePassiveHA) {
+      allowClientSessions();
+    }
+
     if (hiveConf.getVar(ConfVars.HIVE_EXECUTION_ENGINE).equals("tez")) {
       if (!activePassiveHA) {
         LOG.info("HS2 interactive HA not enabled. Starting tez sessions..");
@@ -775,6 +774,7 @@ public void isLeader() {
       }
       hiveServer2.startOrReconnectTezSessions();
       LOG.info("Started/Reconnected tez sessions.");
+      hiveServer2.allowClientSessions();
 
       // resolve futures used for testing
       if (HiveConf.getBoolVar(hiveServer2.getHiveConf(), ConfVars.HIVE_IN_TEST)) {
@@ -787,7 +787,7 @@ public void isLeader() {
     public void notLeader() {
       LOG.info("HS2 instance {} LOST LEADERSHIP. Stopping/Disconnecting tez sessions..", hiveServer2.serviceUri);
       hiveServer2.isLeader.set(false);
-      hiveServer2.closeHiveSessions();
+      hiveServer2.closeAndDisallowHiveSessions();
       hiveServer2.stopOrDisconnectTezSessions();
       LOG.info("Stopped/Disconnected tez sessions.");
 
@@ -824,6 +824,10 @@ private void startOrReconnectTezSessions() {
     initAndStartWorkloadManager(resourcePlan);
   }
 
+  private void allowClientSessions() {
+    cliService.getSessionManager().allowSessions(true);
+  }
+
   private void initAndStartTezSessionPoolManager(final WMFullResourcePlan resourcePlan) {
     // starting Tez session pool in start here to let parent session state initialize on CliService state, to avoid
     // SessionState.get() return null during createTezDir
@@ -860,17 +864,25 @@ private void initAndStartWorkloadManager(final WMFullResourcePlan resourcePlan)
     }
   }
 
-  private void closeHiveSessions() {
+  private void closeAndDisallowHiveSessions() {
     LOG.info("Closing all open hive sessions.");
-    if (cliService != null && cliService.getSessionManager().getOpenSessionCount() > 0) {
-      try {
-        for (HiveSession session : cliService.getSessionManager().getSessions()) {
-          cliService.getSessionManager().closeSession(session.getSessionHandle());
-        }
-        LOG.info("Closed all open hive sessions");
-      } catch (HiveSQLException e) {
-        LOG.error("Unable to close all open sessions.", e);
+    if (cliService == null) {
+      return;
+    }
+    cliService.getSessionManager().allowSessions(false);
+    // No sessions can be opened after the above call. Close the existing ones if any.
+    boolean allClosed = true;
+    for (HiveSession session : cliService.getSessionManager().getSessions()) {
+      try {
+        cliService.getSessionManager().closeSession(session.getSessionHandle());
+      } catch (HiveSQLException e) {
+        // Keep going: one failed close must not leave the remaining sessions open.
+        allClosed = false;
+        LOG.error("Unable to close session {}", session.getSessionHandle(), e);
       }
     }
+    if (allClosed) {
+      LOG.info("Closed all open hive sessions");
+    }
   }
 