diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/ApplicationMaster.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/ApplicationMaster.java index 012af3f..3c458f2 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/ApplicationMaster.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/ApplicationMaster.java @@ -647,6 +647,7 @@ public float getProgress() { @Override public void onError(Exception e) { done = true; + resourceManager.stop(); } } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/async/AMRMClientAsync.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/async/AMRMClientAsync.java index ae781b6..cc65b2c 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/async/AMRMClientAsync.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/async/AMRMClientAsync.java @@ -227,6 +227,13 @@ public abstract void unregisterApplicationMaster( public float getProgress(); + /** + * Called when error comes from RM communications as well as from errors in + * the callback itself from the app. Calling + * @link{AMRMClientAsync.stop()} is the recommended action. + * + * @param e + */ public void onError(Exception e); } } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/async/impl/AMRMClientAsyncImpl.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/async/impl/AMRMClientAsyncImpl.java index f700286..5d096c2 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/async/impl/AMRMClientAsyncImpl.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/async/impl/AMRMClientAsyncImpl.java @@ -269,50 +269,54 @@ public CallbackHandlerThread() { public void run() { while (keepRunning) { - AllocateResponse response; try { - if(savedException != null) { - LOG.error("Stopping callback due to: ", savedException); - handler.onError(savedException); - break; + AllocateResponse response; + try { + if(savedException != null) { + LOG.error("Stopping callback due to: ", savedException); + handler.onError(savedException); + break; + } + response = responseQueue.take(); + } catch (InterruptedException ex) { + LOG.info("Interrupted while waiting for queue", ex); + continue; } - response = responseQueue.take(); - } catch (InterruptedException ex) { - LOG.info("Interrupted while waiting for queue", ex); - continue; - } - if (response.getAMCommand() != null) { - switch(response.getAMCommand()) { - case AM_RESYNC: - case AM_SHUTDOWN: - handler.onShutdownRequest(); - LOG.info("Shutdown requested. Stopping callback."); - return; - default: - String msg = - "Unhandled value of AMCommand: " + response.getAMCommand(); - LOG.error(msg); - throw new YarnRuntimeException(msg); + if (response.getAMCommand() != null) { + switch(response.getAMCommand()) { + case AM_RESYNC: + case AM_SHUTDOWN: + handler.onShutdownRequest(); + LOG.info("Shutdown requested. Stopping callback."); + return; + default: + String msg = + "Unhandled value of AMCommand: " + response.getAMCommand(); + LOG.error(msg); + throw new YarnRuntimeException(msg); + } + } + List updatedNodes = response.getUpdatedNodes(); + if (!updatedNodes.isEmpty()) { + handler.onNodesUpdated(updatedNodes); + } + + List completed = + response.getCompletedContainersStatuses(); + if (!completed.isEmpty()) { + handler.onContainersCompleted(completed); + } + + List allocated = response.getAllocatedContainers(); + if (!allocated.isEmpty()) { + handler.onContainersAllocated(allocated); } - } - List updatedNodes = response.getUpdatedNodes(); - if (!updatedNodes.isEmpty()) { - handler.onNodesUpdated(updatedNodes); - } - - List completed = - response.getCompletedContainersStatuses(); - if (!completed.isEmpty()) { - handler.onContainersCompleted(completed); - } - List allocated = response.getAllocatedContainers(); - if (!allocated.isEmpty()) { - handler.onContainersAllocated(allocated); + progress = handler.getProgress(); + } catch (Exception ex) { + handler.onError(ex); } - - progress = handler.getProgress(); } } } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/async/impl/TestAMRMClientAsync.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/async/impl/TestAMRMClientAsync.java index 8dd4ac6..356c4af 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/async/impl/TestAMRMClientAsync.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/async/impl/TestAMRMClientAsync.java @@ -21,10 +21,13 @@ import static org.mockito.Matchers.anyFloat; import static org.mockito.Matchers.anyInt; import static org.mockito.Matchers.anyString; +import static org.mockito.Matchers.any; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.when; import static org.mockito.Mockito.times; +import static org.mockito.Mockito.timeout; import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.spy; import java.io.IOException; import java.util.ArrayList; @@ -54,6 +57,7 @@ import org.apache.hadoop.yarn.client.api.async.AMRMClientAsync; import org.apache.hadoop.yarn.client.api.impl.AMRMClientImpl; import org.apache.hadoop.yarn.exceptions.YarnException; +import org.apache.hadoop.yarn.exceptions.YarnRuntimeException; import org.junit.Test; import org.mockito.invocation.InvocationOnMock; import org.mockito.stubbing.Answer; @@ -280,6 +284,30 @@ public void testCallAMRMClientAsyncStopFromCallbackHandler() } } + @Test (timeout = 5000) + public void testCallBackThrowOutException() throws YarnException, IOException { + Configuration conf = new Configuration(); + TestCallbackHandler2 callbackHandler = spy(new TestCallbackHandler2()); + @SuppressWarnings("unchecked") + AMRMClient client = mock(AMRMClientImpl.class); + + List completed = Arrays.asList( + ContainerStatus.newInstance(newContainerId(0, 0, 0, 0), + ContainerState.COMPLETE, "", 0)); + final AllocateResponse response = createAllocateResponse(completed, + new ArrayList(), null); + + when(client.allocate(anyFloat())).thenReturn(response); + AMRMClientAsync asyncClient = + AMRMClientAsync.createAMRMClientAsync(client, 20, callbackHandler); + callbackHandler.registerAsyncClient(asyncClient); + callbackHandler.setThrowOutException(true); + asyncClient.init(conf); + asyncClient.start(); + asyncClient.registerApplicationMaster("localhost", 1234, null); + verify(callbackHandler, timeout(5000)).onError(any(Exception.class)); + } + private AllocateResponse createAllocateResponse( List completed, List allocated, List nmTokens) { @@ -391,9 +419,14 @@ public void onError(Exception e) { @SuppressWarnings("rawtypes") AMRMClientAsync asynClient; boolean stop = false; + boolean throwOutException = false; @Override - public void onContainersCompleted(List statuses) {} + public void onContainersCompleted(List statuses) { + if (throwOutException) { + throw new YarnRuntimeException("Exception from callback handler"); + } + } @Override public void onContainersAllocated(List containers) {} @@ -415,11 +448,18 @@ public float getProgress() { } @Override - public void onError(Exception e) {} + public void onError(Exception e) { + Assert.assertEquals(e.getMessage(), "Exception from callback handler"); + asynClient.stop(); + } public void registerAsyncClient( AMRMClientAsync asyncClient) { this.asynClient = asyncClient; } + + public void setThrowOutException(boolean throwOutException) { + this.throwOutException = throwOutException; + } } }