diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/YarnClient.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/YarnClient.java index 9e27de5..17e3f26 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/YarnClient.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/YarnClient.java @@ -474,4 +474,25 @@ public abstract ContainerReport getContainerReport(ContainerId containerId) */ public abstract void moveApplicationAcrossQueues(ApplicationId appId, String queue) throws YarnException, IOException; + + /** + *

+ * Wait for application until it's state becomes one of + * {terminateStates} OR stopped state {completed, failed, killed} + *

+ * + * @param appId + * Application to wait + * @param verbose + * Print verbose message when wait + * @param terminateStates + * States this method waiting for, pass null in if you only want to + * wait for stopped state - {completed, failed, killed} + * @return The latest state + * @throws YarnException + * @throws IOException + */ + public abstract YarnApplicationState waitForApplicationState(ApplicationId appId, + boolean verbose, Set terminateStates) + throws YarnException, IOException; } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/impl/YarnClientImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/impl/YarnClientImpl.java index 8a0348b..1b89f5f 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/impl/YarnClientImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/impl/YarnClientImpl.java @@ -101,6 +101,9 @@ private boolean historyServiceEnabled; private static final String ROOT = "root"; + private static final Set STOPPED_APPLICATION_STATES = EnumSet + .of(YarnApplicationState.KILLED, YarnApplicationState.FAILED, + YarnApplicationState.FINISHED); public YarnClientImpl() { super(YarnClientImpl.class.getName()); @@ -566,4 +569,60 @@ public void moveApplicationAcrossQueues(ApplicationId appId, MoveApplicationAcrossQueuesRequest.newInstance(appId, queue); rmClient.moveApplicationAcrossQueues(request); } + + @Override + public YarnApplicationState waitForApplicationState(ApplicationId appId, + boolean verbose, Set terminateStates) + throws YarnException, IOException { + YarnApplicationState lastState = null; + while (true) { + // Get application report for our specified appId + ApplicationReport report = getApplicationReport(appId); + YarnApplicationState state = report.getYarnApplicationState(); + + // print verbose message when state changed + if (verbose) { + if (lastState == null) { + LOG.info(String.format("Application %s's state now is [%s]", + appId.toString(), state.name())); + lastState = state; + } else if (lastState != state) { + LOG.info(String.format( + "Application %s's state transfered from [%s] to [%s]", + appId.toString(), lastState.name(), state.name())); + lastState = state; + } + } + + if (STOPPED_APPLICATION_STATES.contains(state) + || (null != terminateStates && terminateStates.contains(state))) { + + // print verbose message if it's stopped state + if (verbose && STOPPED_APPLICATION_STATES.contains(state)) { + // if it's not succeed, print diagnostic message + if (YarnApplicationState.FINISHED != state) { + String diagMsg = report.getDiagnostics(); + if (null != diagMsg) { + LOG.info("Diagnostic message:"); + LOG.info(diagMsg); + } + } + + LOG.info(String.format( + "Application %s's total running time is %f seconds", + appId.toString(), + ((report.getFinishTime() - report.getStartTime()) / 1000.0))); + } + + return state; + } + + // Check app status after asyncApiPollIntervalMillis + try { + Thread.sleep(asyncApiPollIntervalMillis); + } catch (InterruptedException e) { + LOG.debug("Thread sleep in monitoring loop interrupted"); + } + } + } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestYarnClient.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestYarnClient.java index cfee6f7..511a9db 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestYarnClient.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestYarnClient.java @@ -32,8 +32,8 @@ import java.util.HashSet; import java.util.List; import java.util.Set; - -import org.junit.Assert; +import java.util.Timer; +import java.util.TimerTask; import org.apache.commons.io.IOUtils; import org.apache.hadoop.conf.Configuration; @@ -82,6 +82,7 @@ import org.apache.log4j.Level; import org.apache.log4j.LogManager; import org.apache.log4j.Logger; +import org.junit.Assert; import org.junit.Test; public class TestYarnClient { @@ -705,7 +706,49 @@ public void testAsyncAPIPollTimeout() { testAsyncAPIPollTimeoutHelper(0L, true); testAsyncAPIPollTimeoutHelper(1L, true); } + + @Test(timeout = 10000) + public void testWaitForApplicationStates() throws YarnException, IOException { + testWaitForApplicationStateInternal(null, true); + + Set states = + EnumSet.of(YarnApplicationState.RUNNING, YarnApplicationState.SUBMITTED); + testWaitForApplicationStateInternal(states, false); + } + private void + testWaitForApplicationStateInternal(Set states, boolean verbose) + throws YarnException, IOException { + Configuration conf = new Configuration(); + + // set a shorter pull interval to make this test finished earlier + final int yarnAppStateChangeInterval = 50; + conf.setLong( + YarnConfiguration.YARN_CLIENT_APPLICATION_CLIENT_PROTOCOL_POLL_INTERVAL_MS, + yarnAppStateChangeInterval); + MockYarnClient client = new MockYarnClient(); + client.init(conf); + client.start(); + + YarnApplicationState returnedState = null; + + if (null == states) { + // if user not set, simply use Finished + client.setYarnApplicationState(YarnApplicationState.FINISHED); + returnedState = YarnApplicationState.FINISHED; + } else { + // otherwise, use the first state passed in + returnedState = states.iterator().next(); + client.setYarnApplicationState(returnedState); + } + + ApplicationId applicationId = ApplicationId.newInstance(1234, 5); + YarnApplicationState state = + client.waitForApplicationState(applicationId, verbose, states); + + Assert.assertEquals(returnedState, state); + } + private void testAsyncAPIPollTimeoutHelper(Long valueForTimeout, boolean expectedTimeoutEnforcement) { YarnClientImpl client = new YarnClientImpl();