diff --git a/itests/hive-unit/src/test/java/org/apache/hive/jdbc/TestActivePassiveHA.java b/itests/hive-unit/src/test/java/org/apache/hive/jdbc/TestActivePassiveHA.java index c94c0e1..fb846b4 100644 --- a/itests/hive-unit/src/test/java/org/apache/hive/jdbc/TestActivePassiveHA.java +++ b/itests/hive-unit/src/test/java/org/apache/hive/jdbc/TestActivePassiveHA.java @@ -134,6 +134,7 @@ public void testActivePassiveHA() throws Exception { String instanceId2 = UUID.randomUUID().toString(); miniHS2_2.start(getConfOverlay(instanceId2)); + assertEquals(true, miniHS2_1.getIsLeaderTestFuture().get()); assertEquals(true, miniHS2_1.isLeader()); String url = "http://localhost:" + hiveConf1.get(ConfVars.HIVE_SERVER2_WEBUI_PORT.varname) + "/leader"; assertEquals("true", sendGet(url)); @@ -175,6 +176,7 @@ public void testActivePassiveHA() throws Exception { miniHS2_1.stop(); + assertEquals(true, miniHS2_2.getIsLeaderTestFuture().get()); assertEquals(true, miniHS2_2.isLeader()); url = "http://localhost:" + hiveConf2.get(ConfVars.HIVE_SERVER2_WEBUI_PORT.varname) + "/leader"; assertEquals("true", sendGet(url)); @@ -264,6 +266,7 @@ public void testConnectionActivePassiveHAServiceDiscovery() throws Exception { confOverlay.put(ConfVars.HIVE_SERVER2_THRIFT_HTTP_PATH.varname, "clidriverTest"); miniHS2_2.start(confOverlay); + assertEquals(true, miniHS2_1.getIsLeaderTestFuture().get()); assertEquals(true, miniHS2_1.isLeader()); String url = "http://localhost:" + hiveConf1.get(ConfVars.HIVE_SERVER2_WEBUI_PORT.varname) + "/leader"; assertEquals("true", sendGet(url)); @@ -285,6 +288,7 @@ public void testConnectionActivePassiveHAServiceDiscovery() throws Exception { // miniHS2_2 will become leader miniHS2_1.stop(); + assertEquals(true, miniHS2_2.getIsLeaderTestFuture().get()); parsedUrl = HiveConnection.getAllUrls(zkJdbcUrl).get(0).getJdbcUriString(); String hs2_2_directUrl = "jdbc:hive2://" + miniHS2_2.getHost() + ":" + miniHS2_2.getHttpPort() + "/default;serviceDiscoveryMode=" + serviceDiscoveryMode + ";zooKeeperNamespace=" + zkHANamespace + ";"; @@ -302,6 +306,7 @@ public void testConnectionActivePassiveHAServiceDiscovery() throws Exception { // miniHS2_1 will become leader miniHS2_2.stop(); + assertEquals(true, miniHS2_1.getIsLeaderTestFuture().get()); parsedUrl = HiveConnection.getAllUrls(zkJdbcUrl).get(0).getJdbcUriString(); hs2_1_directUrl = "jdbc:hive2://" + miniHS2_1.getHost() + ":" + miniHS2_1.getBinaryPort() + "/default;serviceDiscoveryMode=" + serviceDiscoveryMode + ";zooKeeperNamespace=" + zkHANamespace + ";"; @@ -330,6 +335,7 @@ public void testManualFailover() throws Exception { String url2 = "http://localhost:" + hiveConf2.get(ConfVars.HIVE_SERVER2_WEBUI_PORT.varname) + "/leader"; // when we start miniHS2_1 will be leader (sequential start) + assertEquals(true, miniHS2_1.getIsLeaderTestFuture().get()); assertEquals(true, miniHS2_1.isLeader()); assertEquals("true", sendGet(url1, true)); @@ -338,23 +344,28 @@ public void testManualFailover() throws Exception { assertTrue(resp.contains("Failover successful!")); // make sure miniHS2_1 is not leader + assertEquals(true, miniHS2_1.getNotLeaderTestFuture().get()); assertEquals(false, miniHS2_1.isLeader()); assertEquals("false", sendGet(url1, true)); // make sure miniHS2_2 is the new leader + assertEquals(true, miniHS2_2.getIsLeaderTestFuture().get()); assertEquals(true, miniHS2_2.isLeader()); assertEquals("true", sendGet(url2, true)); // send failover request again to miniHS2_1 and get a failure resp = sendDelete(url1, true); assertTrue(resp.contains("Cannot failover an instance that is not a leader")); + assertEquals(true, miniHS2_1.getNotLeaderTestFuture().get()); assertEquals(false, miniHS2_1.isLeader()); // send failover request to miniHS2_2 and make sure miniHS2_1 takes over (returning back to leader, test listeners) resp = sendDelete(url2, true); assertTrue(resp.contains("Failover successful!")); + assertEquals(true, miniHS2_1.getIsLeaderTestFuture().get()); assertEquals(true, miniHS2_1.isLeader()); assertEquals("true", sendGet(url1, true)); + assertEquals(true, miniHS2_2.getNotLeaderTestFuture().get()); assertEquals("false", sendGet(url2, true)); assertEquals(false, miniHS2_2.isLeader()); } finally { @@ -382,13 +393,16 @@ public void testManualFailoverUnauthorized() throws Exception { String url1 = "http://localhost:" + hiveConf1.get(ConfVars.HIVE_SERVER2_WEBUI_PORT.varname) + "/leader"; // when we start miniHS2_1 will be leader (sequential start) + assertEquals(true, miniHS2_1.getIsLeaderTestFuture().get()); assertEquals(true, miniHS2_1.isLeader()); assertEquals("true", sendGet(url1, true)); // trigger failover on miniHS2_1 without authorization header assertEquals("Unauthorized", sendDelete(url1, false)); assertTrue(sendDelete(url1, true).contains("Failover successful!")); + assertEquals(true, miniHS2_1.getNotLeaderTestFuture().get()); assertEquals(false, miniHS2_1.isLeader()); + assertEquals(true, miniHS2_2.getIsLeaderTestFuture().get()); assertEquals(true, miniHS2_2.isLeader()); } finally { // revert configs to not affect other tests @@ -419,6 +433,7 @@ public void testClientConnectionsOnFailover() throws Exception { assertTrue(zkJdbcUrl.contains(zkConnectString)); // when we start miniHS2_1 will be leader (sequential start) + assertEquals(true, miniHS2_1.getIsLeaderTestFuture().get()); assertEquals(true, miniHS2_1.isLeader()); assertEquals("true", sendGet(url1, true)); @@ -441,10 +456,12 @@ public void testClientConnectionsOnFailover() throws Exception { } // make sure miniHS2_1 is not leader + assertEquals(true, miniHS2_1.getNotLeaderTestFuture().get()); assertEquals(false, miniHS2_1.isLeader()); assertEquals("false", sendGet(url1, true)); // make sure miniHS2_2 is the new leader + assertEquals(true, miniHS2_2.getIsLeaderTestFuture().get()); assertEquals(true, miniHS2_2.isLeader()); assertEquals("true", sendGet(url2, true)); @@ -461,13 +478,16 @@ public void testClientConnectionsOnFailover() throws Exception { // send failover request again to miniHS2_1 and get a failure resp = sendDelete(url1, true); assertTrue(resp.contains("Cannot failover an instance that is not a leader")); + assertEquals(true, miniHS2_1.getNotLeaderTestFuture().get()); assertEquals(false, miniHS2_1.isLeader()); // send failover request to miniHS2_2 and make sure miniHS2_1 takes over (returning back to leader, test listeners) resp = sendDelete(url2, true); assertTrue(resp.contains("Failover successful!")); + assertEquals(true, miniHS2_1.getIsLeaderTestFuture().get()); assertEquals(true, miniHS2_1.isLeader()); assertEquals("true", sendGet(url1, true)); + assertEquals(true, miniHS2_2.getNotLeaderTestFuture().get()); assertEquals("false", sendGet(url2, true)); assertEquals(false, miniHS2_2.isLeader()); // make sure miniHS2_2 closes all its connections diff --git a/itests/util/src/main/java/org/apache/hive/jdbc/miniHS2/MiniHS2.java b/itests/util/src/main/java/org/apache/hive/jdbc/miniHS2/MiniHS2.java index 23fcbe8..cafdb98 100644 --- a/itests/util/src/main/java/org/apache/hive/jdbc/miniHS2/MiniHS2.java +++ b/itests/util/src/main/java/org/apache/hive/jdbc/miniHS2/MiniHS2.java @@ -27,6 +27,8 @@ import java.util.concurrent.TimeoutException; import com.google.common.base.Preconditions; +import com.google.common.util.concurrent.SettableFuture; + import org.apache.commons.io.FileUtils; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; @@ -416,6 +418,14 @@ public boolean isLeader() { return hiveServer2.isLeader(); } + public SettableFuture getIsLeaderTestFuture() { + return hiveServer2.getIsLeaderTestFuture(); + } + + public SettableFuture getNotLeaderTestFuture() { + return hiveServer2.getNotLeaderTestFuture(); + } + public void setPamAuthenticator(final PamAuthenticator pamAuthenticator) { this.pamAuthenticator = pamAuthenticator; } diff --git a/service/src/java/org/apache/hive/service/server/HiveServer2.java b/service/src/java/org/apache/hive/service/server/HiveServer2.java index 47f84b5..4e88565 100644 --- a/service/src/java/org/apache/hive/service/server/HiveServer2.java +++ b/service/src/java/org/apache/hive/service/server/HiveServer2.java @@ -119,6 +119,7 @@ import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Joiner; import com.google.common.collect.Lists; +import com.google.common.util.concurrent.SettableFuture; import com.google.common.util.concurrent.ThreadFactoryBuilder; /** @@ -149,6 +150,9 @@ private Hive sessionHive; private String wmQueue; private AtomicBoolean isLeader = new AtomicBoolean(false); + // used for testing + private SettableFuture isLeaderTestFuture = SettableFuture.create(); + private SettableFuture notLeaderTestFuture = SettableFuture.create(); public HiveServer2() { super(HiveServer2.class.getSimpleName()); @@ -167,6 +171,24 @@ public void setPamAuthenticator(PamAuthenticator pamAuthenticator) { this.pamAuthenticator = pamAuthenticator; } + @VisibleForTesting + public SettableFuture getIsLeaderTestFuture() { + return isLeaderTestFuture; + } + + @VisibleForTesting + public SettableFuture getNotLeaderTestFuture() { + return notLeaderTestFuture; + } + + private void resetIsLeaderTestFuture() { + isLeaderTestFuture = SettableFuture.create(); + } + + private void resetNotLeaderTestFuture() { + notLeaderTestFuture = SettableFuture.create(); + } + @Override public synchronized void init(HiveConf hiveConf) { //Initialize metrics first, as some metrics are for initialization stuff. @@ -698,6 +720,12 @@ public void isLeader() { } hiveServer2.startOrReconnectTezSessions(); LOG.info("Started/Reconnected tez sessions."); + + // resolve futures used for testing + if (HiveConf.getBoolVar(hiveServer2.hiveConf, ConfVars.HIVE_IN_TEST)) { + hiveServer2.isLeaderTestFuture.set(true); + hiveServer2.resetNotLeaderTestFuture(); + } } @Override @@ -707,6 +735,12 @@ public void notLeader() { hiveServer2.closeHiveSessions(); hiveServer2.stopOrDisconnectTezSessions(); LOG.info("Stopped/Disconnected tez sessions."); + + // resolve futures used for testing + if (HiveConf.getBoolVar(hiveServer2.hiveConf, ConfVars.HIVE_IN_TEST)) { + hiveServer2.notLeaderTestFuture.set(true); + hiveServer2.resetIsLeaderTestFuture(); + } } }