diff --git a/itests/hive-unit/src/test/java/org/apache/hive/jdbc/TestActivePassiveHA.java b/itests/hive-unit/src/test/java/org/apache/hive/jdbc/TestActivePassiveHA.java
index e53826d..d2d0bee 100644
--- a/itests/hive-unit/src/test/java/org/apache/hive/jdbc/TestActivePassiveHA.java
+++ b/itests/hive-unit/src/test/java/org/apache/hive/jdbc/TestActivePassiveHA.java
@@ -66,6 +66,7 @@
   private Connection hs2Conn = null;
   private static String ADMIN_USER = "user1"; // user from TestPamAuthenticator
   private static String ADMIN_PASSWORD = "1";
+  private static final String serviceDiscoveryMode = "zooKeeperHA";
   private static String zkHANamespace = "hs2ActivePassiveHATest";
   private HiveConf hiveConf1;
   private HiveConf hiveConf2;
@@ -276,7 +277,6 @@ public void testConnectionActivePassiveHAServiceDiscovery() throws Exception {
     String zkJdbcUrl = miniHS2_1.getJdbcURL();
     // getAllUrls will parse zkJdbcUrl and will plugin the active HS2's host:port
     String parsedUrl = HiveConnection.getAllUrls(zkJdbcUrl).get(0).getJdbcUriString();
-    final String serviceDiscoveryMode = "zooKeeperHA";
     String hs2_1_directUrl = "jdbc:hive2://" + miniHS2_1.getHost() + ":" + miniHS2_1.getBinaryPort() +
       "/default;serviceDiscoveryMode=" + serviceDiscoveryMode + ";zooKeeperNamespace=" + zkHANamespace + ";";
     assertTrue(zkJdbcUrl.contains(zkConnectString));
@@ -396,6 +396,101 @@ public void testManualFailoverUnauthorized() throws Exception {
     }
   }
 
+  /**
+   * Verifies that client connections follow the leader across manual failovers: sessions on the
+   * instance that loses leadership are closed, and new connections made through the ZooKeeper
+   * JDBC URL are served by the new leader.
+   */
+  @Test(timeout = 60000)
+  public void testClientConnectionsOnFailover() throws Exception {
+    setPamConfs(hiveConf1);
+    setPamConfs(hiveConf2);
+    PamAuthenticator pamAuthenticator1 = new TestHS2HttpServerPam.TestPamAuthenticator(hiveConf1);
+    PamAuthenticator pamAuthenticator2 = new TestHS2HttpServerPam.TestPamAuthenticator(hiveConf2);
+    try {
+      String instanceId1 = UUID.randomUUID().toString();
+      miniHS2_1.setPamAuthenticator(pamAuthenticator1);
+      miniHS2_1.start(getSecureConfOverlay(instanceId1));
+      String instanceId2 = UUID.randomUUID().toString();
+      Map<String, String> confOverlay = getSecureConfOverlay(instanceId2);
+      confOverlay.put(ConfVars.HIVE_SERVER2_TRANSPORT_MODE.varname, "http");
+      confOverlay.put(ConfVars.HIVE_SERVER2_THRIFT_HTTP_PATH.varname, "clidriverTest");
+      miniHS2_2.setPamAuthenticator(pamAuthenticator2);
+      miniHS2_2.start(confOverlay);
+      String url1 = "http://localhost:" + hiveConf1.get(ConfVars.HIVE_SERVER2_WEBUI_PORT.varname) + "/leader";
+      String url2 = "http://localhost:" + hiveConf2.get(ConfVars.HIVE_SERVER2_WEBUI_PORT.varname) + "/leader";
+      String zkJdbcUrl = miniHS2_1.getJdbcURL();
+      String zkConnectString = zkServer.getConnectString();
+      assertTrue(zkJdbcUrl.contains(zkConnectString));
+
+      // when we start miniHS2_1 will be leader (sequential start)
+      assertEquals(true, miniHS2_1.isLeader());
+      assertEquals("true", sendGet(url1, true));
+
+      // before failover, check if we are getting connection from miniHS2_1
+      String hs2_1_directUrl = "jdbc:hive2://" + miniHS2_1.getHost() + ":" + miniHS2_1.getBinaryPort() +
+        "/default;serviceDiscoveryMode=" + serviceDiscoveryMode + ";zooKeeperNamespace=" + zkHANamespace + ";";
+      String parsedUrl = HiveConnection.getAllUrls(zkJdbcUrl).get(0).getJdbcUriString();
+      assertEquals(hs2_1_directUrl, parsedUrl);
+      hs2Conn = getConnection(zkJdbcUrl, System.getProperty("user.name"));
+      while (miniHS2_1.getOpenSessionsCount() != 1) {
+        Thread.sleep(100);
+      }
+
+      // trigger failover on miniHS2_1 and make sure the connections are closed
+      String resp = sendDelete(url1, true);
+      assertTrue(resp.contains("Failover successful!"));
+      // wait for failover to close sessions
+      while (miniHS2_1.getOpenSessionsCount() != 0) {
+        Thread.sleep(100);
+      }
+
+      // make sure miniHS2_1 is not leader
+      assertEquals(false, miniHS2_1.isLeader());
+      assertEquals("false", sendGet(url1, true));
+
+      // make sure miniHS2_2 is the new leader
+      assertEquals(true, miniHS2_2.isLeader());
+      assertEquals("true", sendGet(url2, true));
+
+      // when we make a new connection we should get it from miniHS2_2 this time
+      String hs2_2_directUrl = "jdbc:hive2://" + miniHS2_2.getHost() + ":" + miniHS2_2.getHttpPort() +
+        "/default;serviceDiscoveryMode=" + serviceDiscoveryMode + ";zooKeeperNamespace=" + zkHANamespace + ";";
+      parsedUrl = HiveConnection.getAllUrls(zkJdbcUrl).get(0).getJdbcUriString();
+      assertEquals(hs2_2_directUrl, parsedUrl);
+      hs2Conn = getConnection(zkJdbcUrl, System.getProperty("user.name"));
+      while (miniHS2_2.getOpenSessionsCount() != 1) {
+        Thread.sleep(100);
+      }
+
+      // send failover request again to miniHS2_1 and get a failure
+      resp = sendDelete(url1, true);
+      assertTrue(resp.contains("Cannot failover an instance that is not a leader"));
+      assertEquals(false, miniHS2_1.isLeader());
+
+      // send failover request to miniHS2_2 and make sure miniHS2_1 takes over (returning back to leader, test listeners)
+      resp = sendDelete(url2, true);
+      assertTrue(resp.contains("Failover successful!"));
+      assertEquals(true, miniHS2_1.isLeader());
+      assertEquals("true", sendGet(url1, true));
+      assertEquals("false", sendGet(url2, true));
+      assertEquals(false, miniHS2_2.isLeader());
+      // make sure miniHS2_2 closes all its connections
+      while (miniHS2_2.getOpenSessionsCount() != 0) {
+        Thread.sleep(100);
+      }
+      // new connections goes to miniHS2_1 now
+      hs2Conn = getConnection(zkJdbcUrl, System.getProperty("user.name"));
+      while (miniHS2_1.getOpenSessionsCount() != 1) {
+        Thread.sleep(100);
+      }
+    } finally {
+      // revert configs to not affect other tests
+      unsetPamConfs(hiveConf1);
+      unsetPamConfs(hiveConf2);
+    }
+  }
+
   private Connection getConnection(String jdbcURL, String user) throws SQLException {
     return DriverManager.getConnection(jdbcURL, user, "bar");
   }
diff --git a/itests/util/src/main/java/org/apache/hive/jdbc/miniHS2/MiniHS2.java b/itests/util/src/main/java/org/apache/hive/jdbc/miniHS2/MiniHS2.java
index fa5edec..23fcbe8 100644
--- a/itests/util/src/main/java/org/apache/hive/jdbc/miniHS2/MiniHS2.java
+++ b/itests/util/src/main/java/org/apache/hive/jdbc/miniHS2/MiniHS2.java
@@ -420,6 +420,13 @@ public void setPamAuthenticator(final PamAuthenticator pamAuthenticator) {
     this.pamAuthenticator = pamAuthenticator;
   }
 
+  /**
+   * Returns the open session count reported by the underlying HiveServer2 instance.
+   */
+  public int getOpenSessionsCount() {
+    return hiveServer2.getOpenSessionsCount();
+  }
+
   public CLIServiceClient getServiceClient() {
     verifyStarted();
     return getServiceClientInternal();
diff --git a/service/src/java/org/apache/hive/service/server/HiveServer2.java b/service/src/java/org/apache/hive/service/server/HiveServer2.java
index 90ba752..6308c5c 100644
--- a/service/src/java/org/apache/hive/service/server/HiveServer2.java
+++ b/service/src/java/org/apache/hive/service/server/HiveServer2.java
@@ -96,7 +96,8 @@
 import org.apache.hive.service.CompositeService;
 import org.apache.hive.service.ServiceException;
 import org.apache.hive.service.cli.CLIService;
-import org.apache.hive.service.cli.session.SessionManager;
+import org.apache.hive.service.cli.HiveSQLException;
+import org.apache.hive.service.cli.session.HiveSession;
 import org.apache.hive.service.cli.thrift.ThriftBinaryCLIService;
 import org.apache.hive.service.cli.thrift.ThriftCLIService;
 import org.apache.hive.service.cli.thrift.ThriftHttpCLIService;
@@ -548,6 +549,14 @@ public boolean isLeader() {
     return isLeader.get();
   }
 
+  /**
+   * Returns the number of open sessions reported by the session manager, or 0 when
+   * {@code cliService} is null.
+   */
+  public int getOpenSessionsCount() {
+    return cliService != null ? cliService.getSessionManager().getOpenSessionCount() : 0;
+  }
+
   interface FailoverHandler {
     void failover() throws Exception;
   }
@@ -694,9 +703,7 @@ public void isLeader() {
     public void notLeader() {
       LOG.info("HS2 instance {} LOST LEADERSHIP. Stopping/Disconnecting tez sessions..", hiveServer2.serviceUri);
       hiveServer2.isLeader.set(false);
-      // TODO: should we explicitly close client connections with appropriate error msg? SessionManager.closeSession()
-      // will shut itself down upon explicit --deregister after all connections are closed. Something similar but for
-      // failover.
+      hiveServer2.closeHiveSessions();
       hiveServer2.stopOrDisconnectTezSessions();
       LOG.info("Stopped/Disconnected tez sessions.");
     }
@@ -759,8 +766,34 @@ private void initAndStartWorkloadManager(final WMFullResourcePlan resourcePlan)
     }
   }
 
+  /**
+   * Closes every hive session that is open on this instance. A session that fails to close is
+   * logged and skipped so that the remaining sessions are still closed.
+   */
+  private void closeHiveSessions() {
+    LOG.info("Closing all open hive sessions.");
+    if (cliService == null || cliService.getSessionManager().getOpenSessionCount() == 0) {
+      return;
+    }
+    int failures = 0;
+    for (HiveSession session : cliService.getSessionManager().getSessions()) {
+      try {
+        cliService.getSessionManager().closeSession(session.getSessionHandle());
+      } catch (HiveSQLException e) {
+        // Keep going: one session that fails to close must not leave the others open.
+        failures++;
+        LOG.error("Unable to close session {}.", session.getSessionHandle(), e);
+      }
+    }
+    if (failures == 0) {
+      LOG.info("Closed all open hive sessions");
+    } else {
+      LOG.error("Unable to close all open sessions; {} session(s) failed to close.", failures);
+    }
+  }
+
   private void stopOrDisconnectTezSessions() {
-    LOG.info("Stoppping/Disconnecting tez sessions.");
+    LOG.info("Stopping/Disconnecting tez sessions.");
     // There should already be an instance of the session pool manager.
     // If not, ignoring is fine while stopping HiveServer2.
     if (tezSessionPoolManager != null) {
@@ -768,8 +801,7 @@ private void stopOrDisconnectTezSessions() {
         tezSessionPoolManager.stop();
         LOG.info("Stopped tez session pool manager.");
       } catch (Exception e) {
-        LOG.error("Tez session pool manager stop had an error during stop of HiveServer2. "
-            + "Shutting down HiveServer2 anyway.", e);
+        LOG.error("Error while stopping tez session pool manager.", e);
       }
     }
     if (wm != null) {
@@ -777,8 +809,7 @@ private void stopOrDisconnectTezSessions() {
         wm.stop();
         LOG.info("Stopped workload manager.");
       } catch (Exception e) {
-        LOG.error("Workload manager stop had an error during stop of HiveServer2. "
-            + "Shutting down HiveServer2 anyway.", e);
+        LOG.error("Error while stopping workload manager.", e);
       }
     }
   }