diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/main/java/org/apache/hadoop/mapred/ResourceMgrDelegate.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/main/java/org/apache/hadoop/mapred/ResourceMgrDelegate.java index 2b7cd5f..b82c8a1 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/main/java/org/apache/hadoop/mapred/ResourceMgrDelegate.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/main/java/org/apache/hadoop/mapred/ResourceMgrDelegate.java @@ -466,4 +466,11 @@ public ReservationDeleteResponse deleteReservation( throws YarnException, IOException { return client.getClusterNodeLabels(); } + + @Override + public YarnApplicationState waitForApplicationState(ApplicationId appId, + boolean verbose, Set terminateStates) + throws YarnException, IOException { + return client.waitForApplicationState(appId, verbose, terminateStates); + } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/YarnClient.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/YarnClient.java index 5ce626c..632d37d 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/YarnClient.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/YarnClient.java @@ -664,6 +664,27 @@ public abstract ReservationDeleteResponse deleteReservation( */ @Public @Unstable - public abstract List getClusterNodeLabels() + public abstract List getClusterNodeLabels() throws YarnException, + IOException; + + /** + *

+ * Wait for application until it's state becomes one of + * {terminateStates} OR stopped state {completed, failed, killed} + *

+ * + * @param appId + * Application to wait + * @param verbose + * Print verbose message when wait + * @param terminateStates + * States this method waiting for, pass null in if you only want to + * wait for stopped state - {completed, failed, killed} + * @return The latest state + * @throws YarnException + * @throws IOException + */ + public abstract YarnApplicationState waitForApplicationState(ApplicationId appId, + boolean verbose, Set terminateStates) throws YarnException, IOException; } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/impl/YarnClientImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/impl/YarnClientImpl.java index 42dd5cd..7d5e0d2 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/impl/YarnClientImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/impl/YarnClientImpl.java @@ -134,6 +134,9 @@ protected boolean timelineServiceBestEffort; private static final String ROOT = "root"; + private static final Set STOPPED_APPLICATION_STATES = EnumSet + .of(YarnApplicationState.KILLED, YarnApplicationState.FAILED, + YarnApplicationState.FINISHED); public YarnClientImpl() { super(YarnClientImpl.class.getName()); @@ -820,4 +823,59 @@ public ReservationDeleteResponse deleteReservation( return rmClient.getClusterNodeLabels( GetClusterNodeLabelsRequest.newInstance()).getNodeLabels(); } + + public YarnApplicationState waitForApplicationState(ApplicationId appId, + boolean verbose, Set terminateStates) + throws YarnException, IOException { + YarnApplicationState lastState = null; + while (true) { + // Get application report for our specified appId + ApplicationReport report = getApplicationReport(appId); + YarnApplicationState state = report.getYarnApplicationState(); + + // print verbose message when state changed + if (verbose) { + if (lastState == null) { + LOG.info(String.format("Application %s's state now is [%s]", + appId.toString(), state.name())); + lastState = state; + } else if (lastState != state) { + LOG.info(String.format( + "Application %s's state transfered from [%s] to [%s]", + appId.toString(), lastState.name(), state.name())); + lastState = state; + } + } + + if (STOPPED_APPLICATION_STATES.contains(state) + || (null != terminateStates && terminateStates.contains(state))) { + + // print verbose message if it's stopped state + if (verbose && STOPPED_APPLICATION_STATES.contains(state)) { + // if it's not succeed, print diagnostic message + if (YarnApplicationState.FINISHED != state) { + String diagMsg = report.getDiagnostics(); + if (null != diagMsg) { + LOG.info("Diagnostic message:"); + LOG.info(diagMsg); + } + } + + LOG.info(String.format( + "Application %s's total running time is %f seconds", + appId.toString(), + ((report.getFinishTime() - report.getStartTime()) / 1000.0))); + } + + return state; + } + + // Check app status after asyncApiPollIntervalMillis + try { + Thread.sleep(asyncApiPollIntervalMillis); + } catch (InterruptedException e) { + LOG.debug("Thread sleep in monitoring loop interrupted"); + } + } + } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestYarnClient.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestYarnClient.java index 10b9bbb..9792b38 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestYarnClient.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestYarnClient.java @@ -955,7 +955,49 @@ public void testAsyncAPIPollTimeout() { testAsyncAPIPollTimeoutHelper(0L, true); testAsyncAPIPollTimeoutHelper(1L, true); } + + @Test(timeout = 10000) + public void testWaitForApplicationStates() throws YarnException, IOException { + testWaitForApplicationStateInternal(null, true); + + Set states = + EnumSet.of(YarnApplicationState.RUNNING, YarnApplicationState.SUBMITTED); + testWaitForApplicationStateInternal(states, false); + } + + private void + testWaitForApplicationStateInternal(Set states, boolean verbose) + throws YarnException, IOException { + Configuration conf = new Configuration(); + + // set a shorter pull interval to make this test finished earlier + final int yarnAppStateChangeInterval = 50; + conf.setLong( + YarnConfiguration.YARN_CLIENT_APPLICATION_CLIENT_PROTOCOL_POLL_INTERVAL_MS, + yarnAppStateChangeInterval); + MockYarnClient client = new MockYarnClient(); + client.init(conf); + client.start(); + + YarnApplicationState returnedState = null; + + if (null == states) { + // if user not set, simply use Finished + client.setYarnApplicationState(YarnApplicationState.FINISHED); + returnedState = YarnApplicationState.FINISHED; + } else { + // otherwise, use the first state passed in + returnedState = states.iterator().next(); + client.setYarnApplicationState(returnedState); + } + ApplicationId applicationId = ApplicationId.newInstance(1234, 5); + YarnApplicationState state = + client.waitForApplicationState(applicationId, verbose, states); + + Assert.assertEquals(returnedState, state); + } + private void testAsyncAPIPollTimeoutHelper(Long valueForTimeout, boolean expectedTimeoutEnforcement) { YarnClientImpl client = new YarnClientImpl();