diff --git a/itests/hive-unit/src/test/java/org/apache/hive/jdbc/TestActivePassiveHA.java b/itests/hive-unit/src/test/java/org/apache/hive/jdbc/TestActivePassiveHA.java
index f4dc129..888bfb76 100644
--- a/itests/hive-unit/src/test/java/org/apache/hive/jdbc/TestActivePassiveHA.java
+++ b/itests/hive-unit/src/test/java/org/apache/hive/jdbc/TestActivePassiveHA.java
@@ -19,6 +19,7 @@
 package org.apache.hive.jdbc;
 
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertTrue;
 
 import java.io.BufferedReader;
@@ -60,6 +61,7 @@
   private MiniHS2 miniHS2_2 = null;
   private static TestingServer zkServer;
   private Connection hs2Conn = null;
+  private static final String serviceDiscoveryMode = "zooKeeperHA";
   private static String zkHANamespace = "hs2ActivePassiveHATest";
   private HiveConf hiveConf1;
   private HiveConf hiveConf2;
@@ -270,7 +272,6 @@ public void testConnectionActivePassiveHAServiceDiscovery() throws Exception {
     String zkJdbcUrl = miniHS2_1.getJdbcURL();
     // getAllUrls will parse zkJdbcUrl and will plugin the active HS2's host:port
     String parsedUrl = HiveConnection.getAllUrls(zkJdbcUrl).get(0).getJdbcUriString();
-    final String serviceDiscoveryMode = "zooKeeperHA";
     String hs2_1_directUrl = "jdbc:hive2://" + miniHS2_1.getHost() + ":" + miniHS2_1.getBinaryPort() +
       "/default;serviceDiscoveryMode=" + serviceDiscoveryMode + ";zooKeeperNamespace=" + zkHANamespace + ";";
     assertTrue(zkJdbcUrl.contains(zkConnectString));
@@ -346,6 +347,82 @@ public void testManualFailover() throws Exception {
     assertEquals(false, miniHS2_2.isLeader());
   }
 
+  /**
+   * Verifies that a failover closes the open client sessions of the instance that loses leadership, and that
+   * connections opened afterwards through the ZooKeeper JDBC URL are served by the new leader.
+   */
+  @Test(timeout = 60000)
+  public void testClientConnectionsOnFailover() throws Exception {
+    String instanceId1 = UUID.randomUUID().toString();
+    miniHS2_1.start(getConfOverlay(instanceId1));
+    String instanceId2 = UUID.randomUUID().toString();
+    Map<String, String> confOverlay = getConfOverlay(instanceId2);
+    confOverlay.put(ConfVars.HIVE_SERVER2_TRANSPORT_MODE.varname, "http");
+    confOverlay.put(ConfVars.HIVE_SERVER2_THRIFT_HTTP_PATH.varname, "clidriverTest");
+    miniHS2_2.start(confOverlay);
+    String url1 = "http://localhost:" + hiveConf1.get(ConfVars.HIVE_SERVER2_WEBUI_PORT.varname) + "/leader";
+    String url2 = "http://localhost:" + hiveConf2.get(ConfVars.HIVE_SERVER2_WEBUI_PORT.varname) + "/leader";
+    String zkJdbcUrl = miniHS2_1.getJdbcURL();
+    String zkConnectString = zkServer.getConnectString();
+    assertTrue(zkJdbcUrl.contains(zkConnectString));
+
+    // when we start miniHS2_1 will be leader (sequential start)
+    assertTrue(miniHS2_1.isLeader());
+    assertEquals("true", sendGet(url1));
+
+    // before failover, check if we are getting connection from miniHS2_1
+    String hs2_1_directUrl = "jdbc:hive2://" + miniHS2_1.getHost() + ":" + miniHS2_1.getBinaryPort() +
+      "/default;serviceDiscoveryMode=" + serviceDiscoveryMode + ";zooKeeperNamespace=" + zkHANamespace + ";";
+    String parsedUrl = HiveConnection.getAllUrls(zkJdbcUrl).get(0).getJdbcUriString();
+    assertEquals(hs2_1_directUrl, parsedUrl);
+    hs2Conn = getConnection(zkJdbcUrl, System.getProperty("user.name"));
+    assertEquals(1, miniHS2_1.getOpenSessionsCount());
+
+    // trigger failover on miniHS2_1 and make sure the connections are closed
+    String resp = sendDelete(url1);
+    assertTrue(resp.contains("Failover successful!"));
+    // wait for failover to close sessions
+    while(miniHS2_1.getOpenSessionsCount() != 0) {
+      Thread.sleep(100);
+    }
+
+    // make sure miniHS2_1 is not leader
+    assertFalse(miniHS2_1.isLeader());
+    assertEquals("false", sendGet(url1));
+
+    // make sure miniHS2_2 is the new leader
+    assertTrue(miniHS2_2.isLeader());
+    assertEquals("true", sendGet(url2));
+
+    // when we make a new connection we should get it from miniHS2_2 this time
+    String hs2_2_directUrl = "jdbc:hive2://" + miniHS2_2.getHost() + ":" + miniHS2_2.getHttpPort() +
+      "/default;serviceDiscoveryMode=" + serviceDiscoveryMode + ";zooKeeperNamespace=" + zkHANamespace + ";";
+    parsedUrl = HiveConnection.getAllUrls(zkJdbcUrl).get(0).getJdbcUriString();
+    assertEquals(hs2_2_directUrl, parsedUrl);
+    hs2Conn = getConnection(zkJdbcUrl, System.getProperty("user.name"));
+    assertEquals(1, miniHS2_2.getOpenSessionsCount());
+
+    // send failover request again to miniHS2_1 and get a failure
+    resp = sendDelete(url1);
+    assertTrue(resp.contains("Cannot failover an instance that is not a leader"));
+    assertFalse(miniHS2_1.isLeader());
+
+    // send failover request to miniHS2_2 and make sure miniHS2_1 takes over (returning back to leader, test listeners)
+    resp = sendDelete(url2);
+    assertTrue(resp.contains("Failover successful!"));
+    assertTrue(miniHS2_1.isLeader());
+    assertEquals("true", sendGet(url1));
+    assertEquals("false", sendGet(url2));
+    assertFalse(miniHS2_2.isLeader());
+    // make sure miniHS2_2 closes all its connections
+    while(miniHS2_2.getOpenSessionsCount() != 0) {
+      Thread.sleep(100);
+    }
+    // new connections go to miniHS2_1 now
+    hs2Conn = getConnection(zkJdbcUrl, System.getProperty("user.name"));
+    assertEquals(1, miniHS2_1.getOpenSessionsCount());
+  }
+
   private Connection getConnection(String jdbcURL, String user) throws SQLException {
     return DriverManager.getConnection(jdbcURL, user, "bar");
   }
diff --git a/itests/util/src/main/java/org/apache/hive/jdbc/miniHS2/MiniHS2.java b/itests/util/src/main/java/org/apache/hive/jdbc/miniHS2/MiniHS2.java
index e1c2dd0..56abbed 100644
--- a/itests/util/src/main/java/org/apache/hive/jdbc/miniHS2/MiniHS2.java
+++ b/itests/util/src/main/java/org/apache/hive/jdbc/miniHS2/MiniHS2.java
@@ -412,6 +412,13 @@ public boolean isLeader() {
     return hiveServer2.isLeader();
   }
 
+  /**
+   * Returns the open session count reported by the wrapped HiveServer2 instance.
+   */
+  public int getOpenSessionsCount() {
+    return hiveServer2.getOpenSessionsCount();
+  }
+
   public CLIServiceClient getServiceClient() {
     verifyStarted();
     return getServiceClientInternal();
diff --git a/service/src/java/org/apache/hive/service/server/HiveServer2.java b/service/src/java/org/apache/hive/service/server/HiveServer2.java
index f91471a..593c0a5 100644
--- a/service/src/java/org/apache/hive/service/server/HiveServer2.java
+++ b/service/src/java/org/apache/hive/service/server/HiveServer2.java
@@ -92,7 +92,8 @@
 import org.apache.hive.service.CompositeService;
 import org.apache.hive.service.ServiceException;
 import org.apache.hive.service.cli.CLIService;
-import org.apache.hive.service.cli.session.SessionManager;
+import org.apache.hive.service.cli.HiveSQLException;
+import org.apache.hive.service.cli.session.HiveSession;
 import org.apache.hive.service.cli.thrift.ThriftBinaryCLIService;
 import org.apache.hive.service.cli.thrift.ThriftCLIService;
 import org.apache.hive.service.cli.thrift.ThriftHttpCLIService;
@@ -539,6 +540,13 @@ public boolean isLeader() {
     return isLeader.get();
   }
 
+  /**
+   * Returns the session manager's open session count, or 0 when {@code cliService} is null.
+   */
+  public int getOpenSessionsCount() {
+    return cliService != null ? cliService.getSessionManager().getOpenSessionCount() : 0;
+  }
+
   interface FailoverHandler {
     void failover() throws Exception;
   }
@@ -685,9 +693,7 @@ public void isLeader() {
     public void notLeader() {
       LOG.info("HS2 instance {} LOST LEADERSHIP. Stopping/Disconnecting tez sessions..", hiveServer2.serviceUri);
       hiveServer2.isLeader.set(false);
-      // TODO: should we explicitly close client connections with appropriate error msg? SessionManager.closeSession()
-      // will shut itself down upon explicit --deregister after all connections are closed. Something similar but for
-      // failover.
+      hiveServer2.closeHiveSessions();
       hiveServer2.stopOrDisconnectTezSessions();
       LOG.info("Stopped/Disconnected tez sessions.");
     }
@@ -750,8 +756,33 @@ private void initAndStartWorkloadManager(final WMFullResourcePlan resourcePlan)
     }
   }
 
+  /**
+   * Closes every open client session of this instance. Invoked when leadership is lost.
+   * Sessions are closed one by one: a failure on one session (closeSession throws
+   * HiveSQLException) is logged and the remaining sessions are still closed.
+   */
+  private void closeHiveSessions() {
+    LOG.info("Closing all open hive sessions.");
+    if (cliService != null && cliService.getSessionManager().getOpenSessionCount() > 0) {
+      int failedCount = 0;
+      for (HiveSession session : cliService.getSessionManager().getSessions()) {
+        try {
+          cliService.getSessionManager().closeSession(session.getSessionHandle());
+        } catch (HiveSQLException e) {
+          failedCount++;
+          LOG.error("Unable to close session {}.", session.getSessionHandle(), e);
+        }
+      }
+      if (failedCount == 0) {
+        LOG.info("Closed all open hive sessions");
+      } else {
+        LOG.error("Unable to close all open sessions. {} session(s) could not be closed.", failedCount);
+      }
+    }
+  }
+
   private void stopOrDisconnectTezSessions() {
-    LOG.info("Stoppping/Disconnecting tez sessions.");
+    LOG.info("Stopping/Disconnecting tez sessions.");
     // There should already be an instance of the session pool manager.
     // If not, ignoring is fine while stopping HiveServer2.
     if (tezSessionPoolManager != null) {
@@ -759,8 +790,7 @@ private void stopOrDisconnectTezSessions() {
         tezSessionPoolManager.stop();
         LOG.info("Stopped tez session pool manager.");
       } catch (Exception e) {
-        LOG.error("Tez session pool manager stop had an error during stop of HiveServer2. "
-          + "Shutting down HiveServer2 anyway.", e);
+        LOG.error("Error while stopping tez session pool manager.", e);
       }
     }
     if (wm != null) {
@@ -768,8 +798,7 @@ private void stopOrDisconnectTezSessions() {
         wm.stop();
         LOG.info("Stopped workload manager.");
       } catch (Exception e) {
-        LOG.error("Workload manager stop had an error during stop of HiveServer2. "
-          + "Shutting down HiveServer2 anyway.", e);
+        LOG.error("Error while stopping workload manager.", e);
       }
     }
   }