diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/async/impl/AMRMClientAsyncImpl.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/async/impl/AMRMClientAsyncImpl.java index cc3969d..b87a55d 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/async/impl/AMRMClientAsyncImpl.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/async/impl/AMRMClientAsyncImpl.java @@ -300,7 +300,13 @@ public void run() { throw new YarnRuntimeException(msg); } if(stop) { - // should probably stop heartbeating also YARN-763 + try { + keepRunning = false; + heartbeatThread.interrupt(); + heartbeatThread.join(); + } catch (InterruptedException ex) { + LOG.error("Error joining with hander thread", ex); + } break; } } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/async/impl/TestAMRMClientAsync.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/async/impl/TestAMRMClientAsync.java index b50742d..06cb63d 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/async/impl/TestAMRMClientAsync.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/async/impl/TestAMRMClientAsync.java @@ -218,6 +218,54 @@ public void testAMRMClientAsyncReboot() throws Exception { Assert.assertTrue(callbackHandler.callbackCount == 0); } + @Test (timeout=5000) + public void testAMRMClientAsyncShutDown() throws Exception { + Configuration conf = new Configuration(); + List completed1 = Arrays.asList( + ContainerStatus.newInstance(newContainerId(0, 0, 0, 0), + ContainerState.COMPLETE, "", 0)); + List allocated1 = Arrays.asList( + Container.newInstance(null, null, null, null, null, null)); + final AllocateResponse response1 = createAllocateResponse( + new ArrayList(), allocated1, null); + final AllocateResponse response2 = createAllocateResponse(completed1, + new ArrayList(), null); + final AllocateResponse shutDownResponse = createAllocateResponse( + new ArrayList(), new ArrayList(), null); + shutDownResponse.setAMCommand(AMCommand.AM_SHUTDOWN); + + TestCallbackHandler callbackHandler = new TestCallbackHandler(); + final AMRMClient client = mock(AMRMClientImpl.class); + when(client.allocate(anyFloat())).thenReturn(shutDownResponse) + .thenReturn(response1).thenReturn(response2); + + when(client.registerApplicationMaster(anyString(), anyInt(), anyString())) + .thenReturn(null); + when(client.getAvailableResources()).thenAnswer(new Answer() { + @Override + public Resource answer(InvocationOnMock invocation) + throws Throwable { + // take client lock to simulate behavior of real impl + synchronized (client) { + Thread.sleep(10); + } + return null; + } + }); + + AMRMClientAsyncImpl asyncClient = + new AMRMClientAsyncImpl( + client, 2000, callbackHandler); + asyncClient.init(conf); + asyncClient.start(); + asyncClient.registerApplicationMaster("localhost", 1234, null); + + Assert.assertEquals(null, callbackHandler.takeCompletedContainers()); + Assert.assertEquals(null, callbackHandler.takeAllocatedContainers()); + + asyncClient.stop(); + } + private AllocateResponse createAllocateResponse( List completed, List allocated, List nmTokens) {