diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java
index ab45020..86fedb5 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java
@@ -1212,9 +1212,17 @@ private void resetDispatcher() {
     ((Service)dispatcher).init(this.conf);
     ((Service)dispatcher).start();
     removeService((Service)rmDispatcher);
+
     // Need to stop previous rmDispatcher before assigning new dispatcher
     // otherwise causes "AsyncDispatcher event handler" thread leak
-    ((Service) rmDispatcher).stop();
+    // The stop runs on a separate thread so that resetDispatcher() does not
+    // wait for it. NOTE(review): presumably needed because the caller can be
+    // the old dispatcher's own event-handling thread -- confirm.
+    Thread dispatcherShutdownThread =
+        new Thread(new DispatcherShutdownThread(rmDispatcher),
+            "DispatcherShutdown Handler");
+    dispatcherShutdownThread.start();
+
     rmDispatcher = dispatcher;
     addIfService(rmDispatcher);
     rmContext.setDispatcher(rmDispatcher);
@@ -1259,4 +1267,22 @@ private static void deleteRMStateStore(Configuration conf) throws Exception {
       rmStore.stop();
     }
   }
+
+  /**
+   * Stops the dispatcher it was given. {@code resetDispatcher()} runs this on
+   * a separate thread so that it does not wait for the stop to complete.
+   */
+  private static class DispatcherShutdownThread implements Runnable {
+    private final Dispatcher dispatcherToStop;
+
+    DispatcherShutdownThread(Dispatcher dispatcher) {
+      dispatcherToStop = dispatcher;
+    }
+
+    @Override
+    public void run() {
+      ((Service) dispatcherToStop).stop();
+    }
+  }
+
 }
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMHA.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMHA.java
index 1066b3a..4c0c351 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMHA.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMHA.java
@@ -398,6 +398,75 @@ public void testHAWithRMHostName() throws Exception {
     innerTestHAWithRMHostName(true);
   }
 
+  /**
+   * Verifies that {@code transitionToStandby} does not hang when an
+   * {@code RMFatalEvent} of type {@code STATE_STORE_FENCED} is dispatched
+   * while another thread is already transitioning the RM to standby.
+   */
+  @Test(timeout = 90000)
+  public void testTransitionedToStandbyShouldNotHang() throws Exception {
+    configuration.setBoolean(YarnConfiguration.AUTO_FAILOVER_ENABLED, false);
+    Configuration conf = new YarnConfiguration(configuration);
+
+    rm = new MockRM(conf) {
+      @Override
+      void stopActiveServices() throws Exception {
+        // NOTE(review): presumably the delay keeps this transition in progress
+        // long enough for the fatal event below to be handled concurrently.
+        Thread.sleep(10000);
+        super.stopActiveServices();
+      }
+    };
+    rm.init(conf);
+    final StateChangeRequestInfo requestInfo =
+        new StateChangeRequestInfo(
+            HAServiceProtocol.RequestSource.REQUEST_BY_USER);
+
+    assertEquals(STATE_ERR, HAServiceState.INITIALIZING, rm.adminService
+        .getServiceStatus().getState());
+    assertFalse("RM is ready to become active before being started",
+        rm.adminService.getServiceStatus().isReadyToBecomeActive());
+    checkMonitorHealth();
+
+    rm.start();
+    checkMonitorHealth();
+    checkStandbyRMFunctionality();
+
+    // 1. Transition to Active.
+    rm.adminService.transitionToActive(requestInfo);
+
+    // 2. Start a transition to standby on a separate thread. A failure is
+    // recorded here and rethrown after join() so that it fails the test.
+    final Exception[] transitionFailure = new Exception[1];
+    Thread t = new Thread(new Runnable() {
+      @Override
+      public void run() {
+        try {
+          rm.transitionToStandby(true);
+        } catch (Exception e) {
+          transitionFailure[0] = e;
+        }
+      }
+    });
+    t.start();
+
+    // 3. Dispatching this event used to make transitionToStandby hang.
+    rm.getRMContext()
+        .getDispatcher()
+        .getEventHandler()
+        .handle(
+            new RMFatalEvent(RMFatalEventType.STATE_STORE_FENCED,
+                "test failure"));
+
+    t.join(); // wait for thread to finish
+    if (transitionFailure[0] != null) {
+      throw transitionFailure[0];
+    }
+
+    rm.adminService.transitionToStandby(requestInfo);
+    checkStandbyRMFunctionality();
+  }
+
   public void innerTestHAWithRMHostName(boolean includeBindHost) {
     //this is run two times, with and without a bind host configured
     if (includeBindHost) {