diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/async/impl/AMRMClientAsyncImpl.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/async/impl/AMRMClientAsyncImpl.java index cc3969d..3a6a9c9 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/async/impl/AMRMClientAsyncImpl.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/async/impl/AMRMClientAsyncImpl.java @@ -31,6 +31,7 @@ import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse; import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterResponse; +import org.apache.hadoop.yarn.api.records.AMCommand; import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; import org.apache.hadoop.yarn.api.records.Container; import org.apache.hadoop.yarn.api.records.ContainerId; @@ -92,6 +93,7 @@ protected void serviceInit(Configuration conf) throws Exception { @Override protected void serviceStart() throws Exception { + handlerThread.setDaemon(true); handlerThread.start(); client.start(); super.serviceStart(); @@ -248,6 +250,10 @@ public void run() { while (true) { try { responseQueue.put(response); + if (response.getAMCommand() == AMCommand.AM_RESYNC + || response.getAMCommand() == AMCommand.AM_SHUTDOWN) { + return; + } break; } catch (InterruptedException ex) { LOG.info("Interrupted while waiting to put on response queue", ex); @@ -285,24 +291,18 @@ public void run() { } if (response.getAMCommand() != null) { - boolean stop = false; switch(response.getAMCommand()) { case AM_RESYNC: case AM_SHUTDOWN: handler.onShutdownRequest(); LOG.info("Shutdown requested. Stopping callback."); - stop = true; - break; + return; default: String msg = "Unhandled value of AMCommand: " + response.getAMCommand(); LOG.error(msg); throw new YarnRuntimeException(msg); } - if(stop) { - // should probably stop heartbeating also YARN-763 - break; - } } List updatedNodes = response.getUpdatedNodes(); if (!updatedNodes.isEmpty()) { @@ -324,4 +324,9 @@ public void run() { } } } + + @VisibleForTesting + boolean isHeartbeatThreadAlive() { + return heartbeatThread.isAlive(); + } } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/async/impl/TestAMRMClientAsync.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/async/impl/TestAMRMClientAsync.java index b50742d..6ae5d7b 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/async/impl/TestAMRMClientAsync.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/async/impl/TestAMRMClientAsync.java @@ -23,6 +23,8 @@ import static org.mockito.Matchers.anyString; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.when; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; import java.util.ArrayList; import java.util.Arrays; @@ -204,7 +206,7 @@ public void testAMRMClientAsyncReboot() throws Exception { synchronized (callbackHandler.notifier) { asyncClient.registerApplicationMaster("localhost", 1234, null); - while(callbackHandler.reboot == false) { + while(callbackHandler.rebootOrshutDown == false) { try { callbackHandler.notifier.wait(); } catch (InterruptedException e) { @@ -218,6 +220,41 @@ public void testAMRMClientAsyncReboot() throws Exception { Assert.assertTrue(callbackHandler.callbackCount == 0); } + @SuppressWarnings("rawtypes") + @Test (timeout = 10000) + public void testAMRMClientAsyncShutDown() throws Exception { + Configuration conf = new Configuration(); + TestCallbackHandler callbackHandler = new TestCallbackHandler(); + @SuppressWarnings("unchecked") + AMRMClient client = mock(AMRMClientImpl.class); + + final AllocateResponse shutDownResponse = createAllocateResponse( + new ArrayList(), new ArrayList(), null); + shutDownResponse.setAMCommand(AMCommand.AM_SHUTDOWN); + when(client.allocate(anyFloat())).thenReturn(shutDownResponse); + + AMRMClientAsync asyncClient = + AMRMClientAsync.createAMRMClientAsync(client, 20, callbackHandler); + asyncClient.init(conf); + asyncClient.start(); + + synchronized (callbackHandler.notifier) { + asyncClient.registerApplicationMaster("localhost", 1234, null); + while(callbackHandler.rebootOrshutDown == false) { + try { + callbackHandler.notifier.wait(); + } catch (InterruptedException e) { + e.printStackTrace(); + } + } + } + + Assert.assertFalse(((AMRMClientAsyncImpl) asyncClient) + .isHeartbeatThreadAlive()); + verify(client, times(1)).allocate(anyFloat()); + asyncClient.stop(); + } + private AllocateResponse createAllocateResponse( List completed, List allocated, List nmTokens) { @@ -239,7 +276,7 @@ public static ContainerId newContainerId(int appId, int appAttemptId, private volatile List completedContainers; private volatile List allocatedContainers; Exception savedException = null; - boolean reboot = false; + boolean rebootOrshutDown = false; Object notifier = new Object(); int callbackCount = 0; @@ -300,7 +337,7 @@ public void onContainersAllocated(List containers) { @Override public void onShutdownRequest() { - reboot = true; + rebootOrshutDown = true; synchronized (notifier) { notifier.notifyAll(); }