diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/YarnClient.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/YarnClient.java
index 9e27de5..17e3f26 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/YarnClient.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/YarnClient.java
@@ -474,4 +474,25 @@ public abstract ContainerReport getContainerReport(ContainerId containerId)
*/
public abstract void moveApplicationAcrossQueues(ApplicationId appId,
String queue) throws YarnException, IOException;
+
+ /**
+ *
+ * Wait for application until it's state becomes one of
+ * {terminateStates} OR stopped state {completed, failed, killed}
+ *
+ *
+ * @param appId
+ * Application to wait
+ * @param verbose
+ * Print verbose message when wait
+ * @param terminateStates
+ * States this method waiting for, pass null in if you only want to
+ * wait for stopped state - {completed, failed, killed}
+ * @return The latest state
+ * @throws YarnException
+ * @throws IOException
+ */
+ public abstract YarnApplicationState waitForApplicationState(ApplicationId appId,
+ boolean verbose, Set terminateStates)
+ throws YarnException, IOException;
}
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/impl/YarnClientImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/impl/YarnClientImpl.java
index 8a0348b..1b89f5f 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/impl/YarnClientImpl.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/impl/YarnClientImpl.java
@@ -101,6 +101,9 @@
private boolean historyServiceEnabled;
private static final String ROOT = "root";
+ private static final Set STOPPED_APPLICATION_STATES = EnumSet
+ .of(YarnApplicationState.KILLED, YarnApplicationState.FAILED,
+ YarnApplicationState.FINISHED);
public YarnClientImpl() {
super(YarnClientImpl.class.getName());
@@ -566,4 +569,60 @@ public void moveApplicationAcrossQueues(ApplicationId appId,
MoveApplicationAcrossQueuesRequest.newInstance(appId, queue);
rmClient.moveApplicationAcrossQueues(request);
}
+
+ @Override
+ public YarnApplicationState waitForApplicationState(ApplicationId appId,
+ boolean verbose, Set terminateStates)
+ throws YarnException, IOException {
+ YarnApplicationState lastState = null;
+ while (true) {
+ // Get application report for our specified appId
+ ApplicationReport report = getApplicationReport(appId);
+ YarnApplicationState state = report.getYarnApplicationState();
+
+ // print verbose message when state changed
+ if (verbose) {
+ if (lastState == null) {
+ LOG.info(String.format("Application %s's state now is [%s]",
+ appId.toString(), state.name()));
+ lastState = state;
+ } else if (lastState != state) {
+ LOG.info(String.format(
+ "Application %s's state transfered from [%s] to [%s]",
+ appId.toString(), lastState.name(), state.name()));
+ lastState = state;
+ }
+ }
+
+ if (STOPPED_APPLICATION_STATES.contains(state)
+ || (null != terminateStates && terminateStates.contains(state))) {
+
+ // print verbose message if it's stopped state
+ if (verbose && STOPPED_APPLICATION_STATES.contains(state)) {
+ // if it's not succeed, print diagnostic message
+ if (YarnApplicationState.FINISHED != state) {
+ String diagMsg = report.getDiagnostics();
+ if (null != diagMsg) {
+ LOG.info("Diagnostic message:");
+ LOG.info(diagMsg);
+ }
+ }
+
+ LOG.info(String.format(
+ "Application %s's total running time is %f seconds",
+ appId.toString(),
+ ((report.getFinishTime() - report.getStartTime()) / 1000.0)));
+ }
+
+ return state;
+ }
+
+ // Check app status after asyncApiPollIntervalMillis
+ try {
+ Thread.sleep(asyncApiPollIntervalMillis);
+ } catch (InterruptedException e) {
+ LOG.debug("Thread sleep in monitoring loop interrupted");
+ }
+ }
+ }
}
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestYarnClient.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestYarnClient.java
index cfee6f7..511a9db 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestYarnClient.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestYarnClient.java
@@ -32,8 +32,8 @@
import java.util.HashSet;
import java.util.List;
import java.util.Set;
-
-import org.junit.Assert;
+import java.util.Timer;
+import java.util.TimerTask;
import org.apache.commons.io.IOUtils;
import org.apache.hadoop.conf.Configuration;
@@ -82,6 +82,7 @@
import org.apache.log4j.Level;
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;
+import org.junit.Assert;
import org.junit.Test;
public class TestYarnClient {
@@ -705,7 +706,49 @@ public void testAsyncAPIPollTimeout() {
testAsyncAPIPollTimeoutHelper(0L, true);
testAsyncAPIPollTimeoutHelper(1L, true);
}
+
+ @Test(timeout = 10000)
+ public void testWaitForApplicationStates() throws YarnException, IOException {
+ testWaitForApplicationStateInternal(null, true);
+
+ Set states =
+ EnumSet.of(YarnApplicationState.RUNNING, YarnApplicationState.SUBMITTED);
+ testWaitForApplicationStateInternal(states, false);
+ }
+ private void
+ testWaitForApplicationStateInternal(Set states, boolean verbose)
+ throws YarnException, IOException {
+ Configuration conf = new Configuration();
+
+ // set a shorter pull interval to make this test finished earlier
+ final int yarnAppStateChangeInterval = 50;
+ conf.setLong(
+ YarnConfiguration.YARN_CLIENT_APPLICATION_CLIENT_PROTOCOL_POLL_INTERVAL_MS,
+ yarnAppStateChangeInterval);
+ MockYarnClient client = new MockYarnClient();
+ client.init(conf);
+ client.start();
+
+ YarnApplicationState returnedState = null;
+
+ if (null == states) {
+ // if user not set, simply use Finished
+ client.setYarnApplicationState(YarnApplicationState.FINISHED);
+ returnedState = YarnApplicationState.FINISHED;
+ } else {
+ // otherwise, use the first state passed in
+ returnedState = states.iterator().next();
+ client.setYarnApplicationState(returnedState);
+ }
+
+ ApplicationId applicationId = ApplicationId.newInstance(1234, 5);
+ YarnApplicationState state =
+ client.waitForApplicationState(applicationId, verbose, states);
+
+ Assert.assertEquals(returnedState, state);
+ }
+
private void testAsyncAPIPollTimeoutHelper(Long valueForTimeout,
boolean expectedTimeoutEnforcement) {
YarnClientImpl client = new YarnClientImpl();