diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/ApplicationMaster.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/ApplicationMaster.java index 703595c..2b4ca12 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/ApplicationMaster.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/ApplicationMaster.java @@ -710,8 +710,7 @@ NMCallbackHandler createNMCallbackHandler() { @VisibleForTesting protected boolean finish() { // wait for completion. - while (!done - && (numCompletedContainers.get() != numTotalContainers)) { + while (!done && !shouldExit()) { try { Thread.sleep(200); } catch (InterruptedException ex) {} @@ -840,8 +839,8 @@ public void onContainersCompleted(List completedContainers) { amRMClient.addContainerRequest(containerAsk); } } - - if (numCompletedContainers.get() == numTotalContainers) { + + if (shouldExit()) { done = true; } } @@ -1297,6 +1296,24 @@ boolean getDone() { return done; } + private boolean shouldExit() { + /* + * AM should exit when either of following happens: + * 1) completed >= allocated + requested (When all allocated containers are + * finished, and no more pending requested containers) + * 2) completed - failed >= total (number of succeeded containers is completed + * - total, so when #succeeded >= total, we should exit.) + */ + int completed = numCompletedContainers.get(); + int allocated = numAllocatedContainers.get(); + int requesting = numRequestedContainers.get(); + int failed = numFailedContainers.get(); + int succeeded = numCompletedContainers.get() - failed; + + return (completed >= allocated && requesting <= 0) || (succeeded + >= numTotalContainers); + } + @VisibleForTesting Thread createLaunchContainerThread(Container allocatedContainer, String shellId) {