diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/ZKRMStateStore.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/ZKRMStateStore.java index 5343a8b..aaef538 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/ZKRMStateStore.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/ZKRMStateStore.java @@ -389,7 +389,7 @@ private synchronized void closeZkClients() throws IOException { } @Override - protected synchronized void closeInternal() throws Exception { + protected void closeInternal() throws Exception { if (verifyActiveStatusThread != null) { verifyActiveStatusThread.interrupt(); verifyActiveStatusThread.join(1000); @@ -976,9 +976,12 @@ Void run() throws KeeperException, InterruptedException { /** * Helper method that creates fencing node, executes the passed operations, * and deletes the fencing node. + * + * @param opList the list of ZK operations to perform + * @throws Exception if any of the ZK operations fail */ - private synchronized void doMultiWithRetries( - final List opList) throws Exception { + @VisibleForTesting + synchronized void doMultiWithRetries(final List opList) throws Exception { final List execOpList = new ArrayList(opList.size() + 2); execOpList.add(createFencingNodePathOp); execOpList.addAll(opList); diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/TestZKRMStateStore.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/TestZKRMStateStore.java index 6af0edd..1848058 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/TestZKRMStateStore.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/TestZKRMStateStore.java @@ -23,6 +23,9 @@ import java.io.IOException; import java.util.List; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; @@ -34,12 +37,15 @@ import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.server.records.Version; import org.apache.hadoop.yarn.server.records.impl.pb.VersionPBImpl; +import org.apache.hadoop.yarn.server.resourcemanager.MockRM; import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp; import org.apache.zookeeper.ZooKeeper; +import org.apache.zookeeper.Op; import org.apache.zookeeper.data.Stat; import org.junit.Assert; import org.junit.Test; +import static org.junit.Assert.assertFalse; public class TestZKRMStateStore extends RMStateStoreTestBase { @@ -47,7 +53,6 @@ private static final int ZK_TIMEOUT_MS = 1000; class TestZKRMStateStoreTester implements RMStateStoreHelper { - ZooKeeper client; TestZKRMStateStoreInternal store; String workingZnode; @@ -58,7 +63,7 @@ public TestZKRMStateStoreInternal(Configuration conf, String workingZnode) throws Exception { init(conf); start(); - assertTrue(znodeWorkingPath.equals(workingZnode)); + assertEquals(workingZnode, znodeWorkingPath); } @Override @@ -238,4 +243,88 @@ public void testFencing() throws Exception { HAServiceProtocol.HAServiceState.ACTIVE, rm2.getRMContext().getRMAdminService().getServiceStatus().getState()); } + + @Test + public void testTransitionWithUnreachableZK() throws Exception { + final AtomicBoolean zkUnreachable = new AtomicBoolean(false); + final CountDownLatch threadHung = new CountDownLatch(1); + final Configuration conf = createHARMConf("rm1,rm2", "rm1", 1234); + conf.setBoolean(YarnConfiguration.AUTO_FAILOVER_ENABLED, false); + + // Create a state store that can simulate losing contact with the ZK node + TestZKRMStateStoreTester zkTester = new TestZKRMStateStoreTester() { + @Override + public RMStateStore getRMStateStore() throws Exception { + YarnConfiguration storeConf = new YarnConfiguration(conf); + storeConf.set(YarnConfiguration.RM_ZK_ADDRESS, hostPort); + storeConf.set(YarnConfiguration.ZK_RM_STATE_STORE_PARENT_PATH, + workingZnode); + storeConf.setInt(YarnConfiguration.RM_ZK_TIMEOUT_MS, 500); + this.client = createClient(); + this.store = new TestZKRMStateStoreInternal(storeConf, workingZnode) { + @Override + synchronized void doMultiWithRetries(final List opList) + throws Exception { + if (zkUnreachable.get()) { + // Let the test know that it can now proceed + threadHung.countDown(); + + // Take a long nap while holding the lock to simulate the ZK node + // being unreachable. This behavior models what happens in + // super.doStoreMultiWithRetries() when the ZK node it unreachble. + // If that behavior changes, then this test should also change or + // be phased out. + Thread.sleep(60000); + } else { + // Business as usual + super.doMultiWithRetries(opList); + } + } + }; + return this.store; + } + }; + + // Start with a single RM in HA mode + final RMStateStore store = zkTester.getRMStateStore(); + final MockRM rm = new MockRM(conf, store); + rm.start(); + + // Make the RM active + final StateChangeRequestInfo req = new StateChangeRequestInfo( + HAServiceProtocol.RequestSource.REQUEST_BY_USER); + + rm.getRMContext().getRMAdminService().transitionToActive(req); + assertEquals("RM with ZKStore didn't start", + Service.STATE.STARTED, rm.getServiceState()); + assertEquals("RM should be Active", + HAServiceProtocol.HAServiceState.ACTIVE, + rm.getRMContext().getRMAdminService().getServiceStatus().getState()); + + // Simulate the ZK node going dark and wait for the + // VerifyActiveStatusThread to hang + zkUnreachable.set(true); + + assertTrue("Unable to perform test because Verify Active Status Thread " + + "did not run", threadHung.await(2, TimeUnit.SECONDS)); + + // Try to transition the RM to standby. Give up after 2000ms. + Thread standby = new Thread(new Runnable() { + @Override + public void run() { + try { + rm.getRMContext().getRMAdminService().transitionToStandby(req); + } catch (IOException ex) { + // OK to exit + } + } + }, "Test Unreachable ZK Thread"); + + standby.start(); + standby.join(2000); + + assertFalse("The thread initiating the transition to standby is hung", + standby.isAlive()); + zkUnreachable.set(false); + } }