diff --git itests/hive-unit/src/test/java/org/apache/hive/jdbc/TestActivePassiveHA.java itests/hive-unit/src/test/java/org/apache/hive/jdbc/TestActivePassiveHA.java index 4055f13fd0..bf24ebf0af 100644 --- itests/hive-unit/src/test/java/org/apache/hive/jdbc/TestActivePassiveHA.java +++ itests/hive-unit/src/test/java/org/apache/hive/jdbc/TestActivePassiveHA.java @@ -43,7 +43,9 @@ import org.apache.hadoop.hive.conf.HiveConf.ConfVars; import org.apache.hadoop.hive.registry.impl.ZkRegistryBase; import org.apache.hive.http.security.PamAuthenticator; +import org.apache.hive.jdbc.Utils.JdbcConnectionParams; import org.apache.hive.jdbc.miniHS2.MiniHS2; +import org.apache.hive.service.cli.HiveSQLException; import org.apache.hive.service.server.HS2ActivePassiveHARegistry; import org.apache.hive.service.server.HS2ActivePassiveHARegistryClient; import org.apache.hive.service.server.HiveServer2Instance; @@ -383,11 +385,7 @@ public void testManualFailover() throws Exception { assertEquals("false", sendGet(url2, true, true)); assertEquals(false, miniHS2_2.isLeader()); } finally { - // revert configs to not affect other tests - unsetPamConfs(hiveConf1); - unsetPamConfs(hiveConf2); - hiveConf1.unset(ConfVars.HIVE_SERVER2_WEBUI_ENABLE_CORS.varname); - hiveConf2.unset(ConfVars.HIVE_SERVER2_WEBUI_ENABLE_CORS.varname); + resetFailoverConfs(); } } @@ -427,6 +425,62 @@ public void testManualFailoverUnauthorized() throws Exception { } @Test(timeout = 60000) + public void testNoConnectionOnPassive() throws Exception { + hiveConf1.setBoolVar(ConfVars.HIVE_SERVER2_WEBUI_ENABLE_CORS, true); + hiveConf2.setBoolVar(ConfVars.HIVE_SERVER2_WEBUI_ENABLE_CORS, true); + setPamConfs(hiveConf1); + setPamConfs(hiveConf2); + try { + PamAuthenticator pamAuthenticator1 = new TestHS2HttpServerPam.TestPamAuthenticator(hiveConf1); + PamAuthenticator pamAuthenticator2 = new TestHS2HttpServerPam.TestPamAuthenticator(hiveConf2); + String instanceId1 = UUID.randomUUID().toString(); + miniHS2_1.setPamAuthenticator(pamAuthenticator1); + miniHS2_1.start(getSecureConfOverlay(instanceId1)); + String instanceId2 = UUID.randomUUID().toString(); + Map confOverlay = getSecureConfOverlay(instanceId2); + miniHS2_2.setPamAuthenticator(pamAuthenticator2); + miniHS2_2.start(confOverlay); + String url1 = "http://localhost:" + hiveConf1.get(ConfVars.HIVE_SERVER2_WEBUI_PORT.varname) + "/leader"; + assertEquals(true, miniHS2_1.getIsLeaderTestFuture().get()); + assertEquals(true, miniHS2_1.isLeader()); + + // Don't get urls from ZK, it will actually be a service discovery URL that we don't want. + String hs1Url = "jdbc:hive2://" + miniHS2_1.getHost() + ":" + miniHS2_1.getBinaryPort(); + Connection hs2Conn = getConnection(hs1Url, System.getProperty("user.name")); // Should work. + hs2Conn.close(); + + String resp = sendDelete(url1, true); + assertTrue(resp, resp.contains("Failover successful!")); + // wait for failover to close sessions + while (miniHS2_1.getOpenSessionsCount() != 0) { + Thread.sleep(100); + } + + assertEquals(true, miniHS2_2.getIsLeaderTestFuture().get()); + assertEquals(true, miniHS2_2.isLeader()); + + try { + hs2Conn = getConnection(hs1Url, System.getProperty("user.name")); + fail("Should throw"); + } catch (Exception e) { + if (!e.getMessage().contains("Cannot open sessions on an inactive HS2")) { + throw e; + } + } + } finally { + resetFailoverConfs(); + } + } + + private void resetFailoverConfs() { + // revert configs to not affect other tests + unsetPamConfs(hiveConf1); + unsetPamConfs(hiveConf2); + hiveConf1.unset(ConfVars.HIVE_SERVER2_WEBUI_ENABLE_CORS.varname); + hiveConf2.unset(ConfVars.HIVE_SERVER2_WEBUI_ENABLE_CORS.varname); + } + + @Test(timeout = 60000) public void testClientConnectionsOnFailover() throws Exception { setPamConfs(hiveConf1); setPamConfs(hiveConf2); diff --git itests/hive-unit/src/test/java/org/apache/hive/service/cli/session/TestQueryDisplay.java itests/hive-unit/src/test/java/org/apache/hive/service/cli/session/TestQueryDisplay.java index 8b28e2dec0..95b46a8149 100644 --- itests/hive-unit/src/test/java/org/apache/hive/service/cli/session/TestQueryDisplay.java +++ itests/hive-unit/src/test/java/org/apache/hive/service/cli/session/TestQueryDisplay.java @@ -48,7 +48,7 @@ public void setup() { conf = new HiveConf(); conf.set("hive.support.concurrency", "false"); - sessionManager = new SessionManager(null); + sessionManager = new SessionManager(null, true); sessionManager.init(conf); } diff --git itests/util/src/main/java/org/apache/hive/jdbc/miniHS2/MiniHS2.java itests/util/src/main/java/org/apache/hive/jdbc/miniHS2/MiniHS2.java index a78dd739b1..e4ac0a927e 100644 --- itests/util/src/main/java/org/apache/hive/jdbc/miniHS2/MiniHS2.java +++ itests/util/src/main/java/org/apache/hive/jdbc/miniHS2/MiniHS2.java @@ -621,6 +621,10 @@ private void waitForStartup() throws Exception { */ sessionHandle = hs2Client.openSession("foo", "bar", sessionConf); } catch (Exception e) { + if (e.getMessage().contains("Cannot open sessions on an inactive HS2")) { + // Passive HS2 has started. TODO: seems fragile + return; + } // service not started yet continue; } diff --git service/src/java/org/apache/hive/service/cli/CLIService.java service/src/java/org/apache/hive/service/cli/CLIService.java index dbfaf7154e..e28e5133a2 100644 --- service/src/java/org/apache/hive/service/cli/CLIService.java +++ service/src/java/org/apache/hive/service/cli/CLIService.java @@ -72,16 +72,19 @@ // The HiveServer2 instance running this service private final HiveServer2 hiveServer2; private int defaultFetchRows; + // This is necessary for tests and embedded mode, where HS2 init is not executed. + private boolean allowSessionsInitial; - public CLIService(HiveServer2 hiveServer2) { + public CLIService(HiveServer2 hiveServer2, boolean allowSessions) { super(CLIService.class.getSimpleName()); this.hiveServer2 = hiveServer2; + this.allowSessionsInitial = allowSessions; } @Override public synchronized void init(HiveConf hiveConf) { setHiveConf(hiveConf); - sessionManager = new SessionManager(hiveServer2); + sessionManager = new SessionManager(hiveServer2, allowSessionsInitial); defaultFetchRows = hiveConf.getIntVar(ConfVars.HIVE_SERVER2_THRIFT_RESULTSET_DEFAULT_FETCH_SIZE); addService(sessionManager); // If the hadoop cluster is secure, do a kerberos login for the service from the keytab @@ -450,7 +453,7 @@ public OperationStatus getOperationStatus(OperationHandle opHandle, boolean getP HiveConf.ConfVars.HIVE_SERVER2_LONG_POLLING_TIMEOUT, TimeUnit.MILLISECONDS); final long elapsed = System.currentTimeMillis() - operation.getBeginTime(); - // A step function to increase the polling timeout by 500 ms every 10 sec, + // A step function to increase the polling timeout by 500 ms every 10 sec, // starting from 500 ms up to HIVE_SERVER2_LONG_POLLING_TIMEOUT final long timeout = Math.min(maxTimeout, (elapsed / TimeUnit.SECONDS.toMillis(10) + 1) * 500); @@ -491,7 +494,7 @@ private JobProgressUpdate progressUpdateLog(boolean isProgressLogRequested, Oper || !OperationType.EXECUTE_STATEMENT.equals(operation.getType())) { return new JobProgressUpdate(ProgressMonitor.NULL); } - + SessionState sessionState = operation.getParentSession().getSessionState(); long startTime = System.nanoTime(); int timeOutMs = 8; diff --git service/src/java/org/apache/hive/service/cli/session/SessionManager.java service/src/java/org/apache/hive/service/cli/session/SessionManager.java index e9649824f1..694a691450 100644 --- service/src/java/org/apache/hive/service/cli/session/SessionManager.java +++ service/src/java/org/apache/hive/service/cli/session/SessionManager.java @@ -37,6 +37,7 @@ import com.google.common.base.Predicate; import com.google.common.collect.Iterables; + import org.apache.commons.io.FileUtils; import org.apache.hadoop.hive.common.metrics.common.Metrics; import org.apache.hadoop.hive.common.metrics.common.MetricsConstant; @@ -63,9 +64,16 @@ */ public class SessionManager extends CompositeService { + private static final String INACTIVE_ERROR_MESSAGE = + "Cannot open sessions on an inactive HS2 instance; use service discovery to connect"; public static final String HIVERCFILE = ".hiverc"; private static final Logger LOG = LoggerFactory.getLogger(CompositeService.class); private HiveConf hiveConf; + /** The lock that synchronizes the allowSessions flag and handleToSession map. + Active-passive HA first disables the connections, then closes existing one, making sure + there are no races between these two processes. */ + private final Object sessionAddLock = new Object(); + private boolean allowSessions; private final Map handleToSession = new ConcurrentHashMap(); private final Map connectionsCount = new ConcurrentHashMap<>(); @@ -87,9 +95,10 @@ private String sessionImplWithUGIclassName; private String sessionImplclassName; - public SessionManager(HiveServer2 hiveServer2) { + public SessionManager(HiveServer2 hiveServer2, boolean allowSessions) { super(SessionManager.class.getSimpleName()); this.hiveServer2 = hiveServer2; + this.allowSessions = allowSessions; } @Override @@ -373,10 +382,18 @@ public SessionHandle openSession(TProtocolVersion protocol, String username, Str return createSession(null, protocol, username, password, ipAddress, sessionConf, withImpersonation, delegationToken).getSessionHandle(); } + public HiveSession createSession(SessionHandle sessionHandle, TProtocolVersion protocol, String username, String password, String ipAddress, Map sessionConf, boolean withImpersonation, String delegationToken) throws HiveSQLException { + // Check the flag opportunistically. + synchronized (sessionAddLock) { + if (!allowSessions) { + throw new HiveSQLException(INACTIVE_ERROR_MESSAGE); + } + } + // Do the expensive operations outside of any locks; we'll recheck the flag again at the end. // if client proxies connection, use forwarded ip-addresses instead of just the gateway final List forwardedAddresses = getForwardedAddresses(); @@ -448,8 +465,23 @@ public HiveSession createSession(SessionHandle sessionHandle, TProtocolVersion p session = null; throw new HiveSQLException("Failed to execute session hooks: " + e.getMessage(), e); } - handleToSession.put(session.getSessionHandle(), session); - LOG.info("Session opened, " + session.getSessionHandle() + ", current sessions:" + getOpenSessionCount()); + boolean isAdded = false; + synchronized (sessionAddLock) { + if (allowSessions) { + handleToSession.put(session.getSessionHandle(), session); + isAdded = true; + } + } + if (!isAdded) { + try { + closeSessionInternal(session); + } catch (Exception e) { + LOG.warn("Failed to close the session opened during an HA state change; ignoring", e); + } + throw new HiveSQLException(INACTIVE_ERROR_MESSAGE); + } + LOG.info("Session opened, " + session.getSessionHandle() + + ", current sessions:" + getOpenSessionCount()); return session; } @@ -548,6 +580,10 @@ public synchronized void closeSession(SessionHandle sessionHandle) throws HiveSQ throw new HiveSQLException("Session does not exist: " + sessionHandle); } LOG.info("Session closed, " + sessionHandle + ", current sessions:" + getOpenSessionCount()); + closeSessionInternal(session); + } + + private void closeSessionInternal(HiveSession session) throws HiveSQLException { try { session.close(); } finally { @@ -683,5 +719,11 @@ public String getHiveServer2HostName() throws Exception { } return hiveServer2.getServerHost(); } + + public void allowSessions(boolean b) { + synchronized (sessionAddLock) { + this.allowSessions = b; + } + } } diff --git service/src/java/org/apache/hive/service/cli/thrift/EmbeddedThriftBinaryCLIService.java service/src/java/org/apache/hive/service/cli/thrift/EmbeddedThriftBinaryCLIService.java index 8b61874f93..7ab7aee7b0 100644 --- service/src/java/org/apache/hive/service/cli/thrift/EmbeddedThriftBinaryCLIService.java +++ service/src/java/org/apache/hive/service/cli/thrift/EmbeddedThriftBinaryCLIService.java @@ -32,7 +32,8 @@ public class EmbeddedThriftBinaryCLIService extends ThriftBinaryCLIService { public EmbeddedThriftBinaryCLIService() { - super(new CLIService(null), null); + // The non-test path that allows connections for the embedded service. + super(new CLIService(null, true), null); isEmbedded = true; HiveConf.setLoadHiveServer2Config(true); } diff --git service/src/java/org/apache/hive/service/server/HiveServer2.java service/src/java/org/apache/hive/service/server/HiveServer2.java index 432a341bef..c4f4e80b4b 100644 --- service/src/java/org/apache/hive/service/server/HiveServer2.java +++ service/src/java/org/apache/hive/service/server/HiveServer2.java @@ -18,14 +18,9 @@ package org.apache.hive.service.server; -import java.io.BufferedReader; import java.io.IOException; -import java.io.InputStreamReader; -import java.net.HttpURLConnection; -import java.net.URL; import java.nio.charset.Charset; import java.util.ArrayList; -import java.util.Base64; import java.util.Collection; import java.util.HashMap; import java.util.List; @@ -211,7 +206,8 @@ public synchronized void init(HiveConf hiveConf) { LOG.warn("Could not initiate the HiveServer2 Metrics system. Metrics may not be reported.", t); } - cliService = new CLIService(this); + // Do not allow sessions - leader election or initialization will allow them for an active HS2. + cliService = new CLIService(this, false); addService(cliService); final HiveServer2 hiveServer2 = this; Runnable oomHook = new Runnable() { @@ -738,6 +734,10 @@ public synchronized void start() { } } + if (!activePassiveHA) { + allowClientSessions(); + } + if (hiveConf.getVar(ConfVars.HIVE_EXECUTION_ENGINE).equals("tez")) { if (!activePassiveHA) { LOG.info("HS2 interactive HA not enabled. Starting tez sessions.."); @@ -775,6 +775,7 @@ public void isLeader() { } hiveServer2.startOrReconnectTezSessions(); LOG.info("Started/Reconnected tez sessions."); + hiveServer2.allowClientSessions(); // resolve futures used for testing if (HiveConf.getBoolVar(hiveServer2.getHiveConf(), ConfVars.HIVE_IN_TEST)) { @@ -787,7 +788,7 @@ public void isLeader() { public void notLeader() { LOG.info("HS2 instance {} LOST LEADERSHIP. Stopping/Disconnecting tez sessions..", hiveServer2.serviceUri); hiveServer2.isLeader.set(false); - hiveServer2.closeHiveSessions(); + hiveServer2.closeAndDisallowHiveSessions(); hiveServer2.stopOrDisconnectTezSessions(); LOG.info("Stopped/Disconnected tez sessions."); @@ -824,6 +825,10 @@ private void startOrReconnectTezSessions() { initAndStartWorkloadManager(resourcePlan); } + private void allowClientSessions() { + cliService.getSessionManager().allowSessions(true); + } + private void initAndStartTezSessionPoolManager(final WMFullResourcePlan resourcePlan) { // starting Tez session pool in start here to let parent session state initialize on CliService state, to avoid // SessionState.get() return null during createTezDir @@ -860,17 +865,18 @@ private void initAndStartWorkloadManager(final WMFullResourcePlan resourcePlan) } } - private void closeHiveSessions() { + private void closeAndDisallowHiveSessions() { LOG.info("Closing all open hive sessions."); - if (cliService != null && cliService.getSessionManager().getOpenSessionCount() > 0) { - try { - for (HiveSession session : cliService.getSessionManager().getSessions()) { - cliService.getSessionManager().closeSession(session.getSessionHandle()); - } - LOG.info("Closed all open hive sessions"); - } catch (HiveSQLException e) { - LOG.error("Unable to close all open sessions.", e); + if (cliService == null) return; + cliService.getSessionManager().allowSessions(false); + // No sessions can be opened after the above call. Close the existing ones if any. + try { + for (HiveSession session : cliService.getSessionManager().getSessions()) { + cliService.getSessionManager().closeSession(session.getSessionHandle()); } + LOG.info("Closed all open hive sessions"); + } catch (HiveSQLException e) { + LOG.error("Unable to close all open sessions.", e); } } diff --git service/src/test/org/apache/hive/service/auth/TestPlainSaslHelper.java service/src/test/org/apache/hive/service/auth/TestPlainSaslHelper.java index 14e28323e0..8bfa7dc157 100644 --- service/src/test/org/apache/hive/service/auth/TestPlainSaslHelper.java +++ service/src/test/org/apache/hive/service/auth/TestPlainSaslHelper.java @@ -42,7 +42,7 @@ public void testDoAsSetting(){ hconf.getBoolVar(ConfVars.HIVE_SERVER2_ENABLE_DOAS)); - CLIService cliService = new CLIService(null); + CLIService cliService = new CLIService(null, true); cliService.init(hconf); ThriftCLIService tcliService = new ThriftBinaryCLIService(cliService, null); tcliService.init(hconf); diff --git service/src/test/org/apache/hive/service/cli/TestCLIServiceConnectionLimits.java service/src/test/org/apache/hive/service/cli/TestCLIServiceConnectionLimits.java index 5ecea9a08b..6ce40ec68d 100644 --- service/src/test/org/apache/hive/service/cli/TestCLIServiceConnectionLimits.java +++ service/src/test/org/apache/hive/service/cli/TestCLIServiceConnectionLimits.java @@ -329,7 +329,7 @@ public void testConnectionForwardedIpAddresses() throws HiveSQLException { private CLIService getService(HiveConf conf) { conf.setVar(HiveConf.ConfVars.HIVE_AUTHORIZATION_MANAGER, "org.apache.hadoop.hive.ql.security.authorization.plugin.sqlstd.SQLStdHiveAuthorizerFactory"); - CLIService service = new CLIService(null); + CLIService service = new CLIService(null, true); service.init(conf); service.start(); return service; diff --git service/src/test/org/apache/hive/service/cli/TestCLIServiceRestore.java service/src/test/org/apache/hive/service/cli/TestCLIServiceRestore.java index 6b69d4d0af..1e0c4274ad 100644 --- service/src/test/org/apache/hive/service/cli/TestCLIServiceRestore.java +++ service/src/test/org/apache/hive/service/cli/TestCLIServiceRestore.java @@ -45,7 +45,7 @@ public CLIService getService() { HiveConf conf = new HiveConf(); conf.setVar(HiveConf.ConfVars.HIVE_AUTHORIZATION_MANAGER, "org.apache.hadoop.hive.ql.security.authorization.plugin.sqlstd.SQLStdHiveAuthorizerFactory"); - CLIService service = new CLIService(null); + CLIService service = new CLIService(null, true); service.init(conf); service.start(); return service; diff --git service/src/test/org/apache/hive/service/cli/TestRetryingThriftCLIServiceClient.java service/src/test/org/apache/hive/service/cli/TestRetryingThriftCLIServiceClient.java index 7bae62d977..f3770c2d25 100644 --- service/src/test/org/apache/hive/service/cli/TestRetryingThriftCLIServiceClient.java +++ service/src/test/org/apache/hive/service/cli/TestRetryingThriftCLIServiceClient.java @@ -183,7 +183,7 @@ public void testSessionLifeAfterTransportClose() throws InterruptedException, Hi } } if (service == null) { - service = new CLIService(server); + service = new CLIService(server, true); } RetryingThriftCLIServiceClient.CLIServiceClientWrapper client = RetryingThriftCLIServiceClientTest.newRetryingCLIServiceClient(hiveConf); diff --git service/src/test/org/apache/hive/service/cli/session/TestPluggableHiveSessionImpl.java service/src/test/org/apache/hive/service/cli/session/TestPluggableHiveSessionImpl.java index 90237c088f..ef096202a1 100644 --- service/src/test/org/apache/hive/service/cli/session/TestPluggableHiveSessionImpl.java +++ service/src/test/org/apache/hive/service/cli/session/TestPluggableHiveSessionImpl.java @@ -42,7 +42,7 @@ public void testSessionImpl() throws Exception { SampleHiveSessionImpl.class.getName()); hiveConf.setBoolVar(HiveConf.ConfVars.HIVE_SERVER2_ENABLE_DOAS, false); - CLIService cliService = new CLIService(null); + CLIService cliService = new CLIService(null, true); cliService.init(hiveConf); ThriftBinaryCLIService service = new ThriftBinaryCLIService(cliService, null); service.init(hiveConf); @@ -68,7 +68,7 @@ public void testSessionImplWithUGI() throws Exception { SampleHiveSessionImplWithUGI.class.getName()); hiveConf.setBoolVar(HiveConf.ConfVars.HIVE_SERVER2_ENABLE_DOAS, true); - CLIService cliService = new CLIService(null); + CLIService cliService = new CLIService(null, true); cliService.init(hiveConf); ThriftBinaryCLIService service = new ThriftBinaryCLIService(cliService, null); service.init(hiveConf); diff --git service/src/test/org/apache/hive/service/cli/session/TestSessionGlobalInitFile.java service/src/test/org/apache/hive/service/cli/session/TestSessionGlobalInitFile.java index af7a72e70f..9d00ec4353 100644 --- service/src/test/org/apache/hive/service/cli/session/TestSessionGlobalInitFile.java +++ service/src/test/org/apache/hive/service/cli/session/TestSessionGlobalInitFile.java @@ -52,7 +52,7 @@ */ private class FakeEmbeddedThriftBinaryCLIService extends ThriftBinaryCLIService { public FakeEmbeddedThriftBinaryCLIService(HiveConf hiveConf) { - super(new CLIService(null), null); + super(new CLIService(null, true), null); isEmbedded = true; cliService.init(hiveConf); cliService.start(); diff --git service/src/test/org/apache/hive/service/cli/session/TestSessionManagerMetrics.java service/src/test/org/apache/hive/service/cli/session/TestSessionManagerMetrics.java index d954692e98..565545824a 100644 --- service/src/test/org/apache/hive/service/cli/session/TestSessionManagerMetrics.java +++ service/src/test/org/apache/hive/service/cli/session/TestSessionManagerMetrics.java @@ -79,7 +79,7 @@ public void setup() throws Exception { conf.setBoolVar(HiveConf.ConfVars.HIVEOPTIMIZEMETADATAQUERIES, false); MetricsFactory.init(conf); - sm = new SessionManager(null); + sm = new SessionManager(null, true); sm.init(conf); metrics = (CodahaleMetrics) MetricsFactory.getInstance();