Index: hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java =================================================================== --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java (revision 1617734) +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java (working copy) @@ -1133,6 +1133,9 @@ ((Service)dispatcher).init(this.conf); ((Service)dispatcher).start(); removeService((Service)rmDispatcher); + // Need to stop previous rmDispatcher before assigning new dispatcher + // otherwise causes "AsyncDispatcher event handler" thread leak + ((Service) rmDispatcher).stop(); rmDispatcher = dispatcher; addIfService(rmDispatcher); rmContext.setDispatcher(rmDispatcher); Index: hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMHA.java =================================================================== --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMHA.java (revision 1617734) +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMHA.java (working copy) @@ -331,6 +331,10 @@ rm.adminService.transitionToStandby(requestInfo); rm.adminService.transitionToActive(requestInfo); rm.adminService.transitionToStandby(requestInfo); + + MyCountingDispatcher dispatcher = + (MyCountingDispatcher) rm.getRMContext().getDispatcher(); + assertTrue(!dispatcher.isStopped()); rm.adminService.transitionToActive(requestInfo); assertEquals(errorMessageForEventHandler, expectedEventHandlerCount, @@ -339,6 +343,11 @@ assertEquals(errorMessageForService, expectedServiceCount, rm.getServices().size()); + + // Keep the dispatcher reference before transitioning to standby + dispatcher = (MyCountingDispatcher) rm.getRMContext().getDispatcher(); + + rm.adminService.transitionToStandby(requestInfo); assertEquals(errorMessageForEventHandler, expectedEventHandlerCount, ((MyCountingDispatcher) rm.getRMContext().getDispatcher()) @@ -346,6 +355,8 @@ assertEquals(errorMessageForService, expectedServiceCount, rm.getServices().size()); + assertTrue(dispatcher.isStopped()); + rm.stop(); } @@ -492,6 +503,8 @@ private int eventHandlerCount; + private volatile boolean stopped = false; + public MyCountingDispatcher() { super("MyCountingDispatcher"); this.eventHandlerCount = 0; @@ -510,5 +523,15 @@ public int getEventHandlerCount() { return this.eventHandlerCount; } + + @Override + protected void serviceStop() throws Exception { + this.stopped = true; + super.serviceStop(); + } + + public boolean isStopped() { + return this.stopped; + } } }