diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/YarnClient.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/YarnClient.java index 20235d0..afd21d0 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/YarnClient.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/YarnClient.java @@ -63,7 +63,9 @@ /** *

- * Submit a new application to YARN. + * Submit a new application to YARN. It is a blocking call, such + * that it will not return {@link ApplicationId} until the submitted + * application has been submitted and accepted by the ResourceManager. *

* * @param appContext diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/YarnClientImpl.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/YarnClientImpl.java index eb84b31..6be9052 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/YarnClientImpl.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/YarnClientImpl.java @@ -53,9 +53,11 @@ import org.apache.hadoop.yarn.api.records.NodeReport; import org.apache.hadoop.yarn.api.records.QueueInfo; import org.apache.hadoop.yarn.api.records.QueueUserACLInfo; +import org.apache.hadoop.yarn.api.records.YarnApplicationState; import org.apache.hadoop.yarn.api.records.YarnClusterMetrics; import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.exceptions.YarnRemoteException; +import org.apache.hadoop.yarn.ipc.RPCUtil; import org.apache.hadoop.yarn.ipc.YarnRPC; import org.apache.hadoop.yarn.service.AbstractService; import org.apache.hadoop.yarn.util.Records; @@ -68,6 +70,7 @@ protected ClientRMProtocol rmClient; protected InetSocketAddress rmAddress; + protected long statePollIntervalMillis; private static final String ROOT = "root"; @@ -90,6 +93,9 @@ public synchronized void init(Configuration conf) { if (this.rmAddress == null) { this.rmAddress = getRmAddress(conf); } + statePollIntervalMillis = conf.getLong( + YarnConfiguration.YARN_CLIENT_APP_SUBMISSION_POLL_INTERVAL_MS, + YarnConfiguration.DEFAULT_YARN_CLIENT_APP_SUBMISSION_POLL_INTERVAL_MS); super.init(conf); } @@ -131,6 +137,29 @@ public GetNewApplicationResponse getNewApplication() Records.newRecord(SubmitApplicationRequest.class); request.setApplicationSubmissionContext(appContext); rmClient.submitApplication(request); + + int pollCount = 0; + while (true) { + YarnApplicationState state = + getApplicationReport(applicationId).getYarnApplicationState(); + if (!state.equals(YarnApplicationState.NEW) && + !state.equals(YarnApplicationState.NEW_SAVING)) { + break; + } + // Notify the client through the log every 10 poll, in case the client + // is blocked here too long. + if (++pollCount % 10 == 0) { + LOG.info("Application submission is not finished, " + + "submitted application " + applicationId + + " is still in " + state); + } + try { + Thread.sleep(statePollIntervalMillis); + } catch (InterruptedException ie) { + } + } + + LOG.info("Submitted application " + applicationId + " to ResourceManager" + " at " + rmAddress); return applicationId; diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/TestYarnClient.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/TestYarnClient.java index 3d7f120..ccfc8d9 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/TestYarnClient.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/TestYarnClient.java @@ -18,10 +18,25 @@ package org.apache.hadoop.yarn.client; +import static org.mockito.Matchers.any; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; +import junit.framework.Assert; + import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.yarn.client.YarnClient; -import org.apache.hadoop.yarn.client.YarnClientImpl; +import org.apache.hadoop.yarn.api.ClientRMProtocol; +import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationReportRequest; +import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationReportResponse; +import org.apache.hadoop.yarn.api.records.ApplicationId; +import org.apache.hadoop.yarn.api.records.ApplicationReport; +import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext; +import org.apache.hadoop.yarn.api.records.YarnApplicationState; +import org.apache.hadoop.yarn.conf.YarnConfiguration; +import org.apache.hadoop.yarn.exceptions.YarnRemoteException; import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager; +import org.apache.hadoop.yarn.util.Records; import org.junit.Test; public class TestYarnClient { @@ -43,4 +58,76 @@ public void testClientStop() { client.start(); client.stop(); } + + @Test (timeout = 30000) + public void testSubmitApplication() { + Configuration conf = new Configuration(); + conf.setLong(YarnConfiguration.YARN_CLIENT_APP_SUBMISSION_POLL_INTERVAL_MS, + 100); // speed up tests + final YarnClient client = new MockYarnClient(); + client.init(conf); + client.start(); + + YarnApplicationState[] exitStates = new YarnApplicationState[] + { + YarnApplicationState.SUBMITTED, + YarnApplicationState.ACCEPTED, + YarnApplicationState.RUNNING, + YarnApplicationState.FINISHED, + YarnApplicationState.FAILED, + YarnApplicationState.KILLED + }; + for (int i = 0; i < exitStates.length; ++i) { + ApplicationSubmissionContext context = + mock(ApplicationSubmissionContext.class); + ApplicationId applicationId = Records.newRecord(ApplicationId.class); + applicationId.setClusterTimestamp(System.currentTimeMillis()); + applicationId.setId(i); + when(context.getApplicationId()).thenReturn(applicationId); + ((MockYarnClient) client).setYarnApplicationState(exitStates[i]); + try { + client.submitApplication(context); + } catch (YarnRemoteException e) { + Assert.fail("Exception is not expected."); + } + verify(((MockYarnClient) client).mockReport,times(4 * i + 4)) + .getYarnApplicationState(); + } + + client.stop(); + } + + private static class MockYarnClient extends YarnClientImpl { + private ApplicationReport mockReport; + + public MockYarnClient() { + super(); + } + + @Override + public void start() { + rmClient = mock(ClientRMProtocol.class); + GetApplicationReportResponse mockResponse = + mock(GetApplicationReportResponse.class); + mockReport = mock(ApplicationReport.class); + try{ + when(rmClient.getApplicationReport(any( + GetApplicationReportRequest.class))).thenReturn(mockResponse); + } catch (YarnRemoteException e) { + Assert.fail("Exception is not expected."); + } + when(mockResponse.getApplicationReport()).thenReturn(mockReport); + } + + @Override + public void stop() { + } + + public void setYarnApplicationState(YarnApplicationState state) { + when(mockReport.getYarnApplicationState()).thenReturn( + YarnApplicationState.NEW, YarnApplicationState.NEW_SAVING, + YarnApplicationState.NEW_SAVING, state); + } + } + } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java index 1ff85ed..f9b017d 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java @@ -692,6 +692,19 @@ */ public static boolean DEFAULT_YARN_MINICLUSTER_FIXED_PORTS = false; + //////////////////////////////// + // Other Configs + //////////////////////////////// + + /** + * The interval of the yarn client's querying application state after + * application submission. The unit is millisecond. + */ + public static final String YARN_CLIENT_APP_SUBMISSION_POLL_INTERVAL_MS = + YARN_PREFIX + "client.app-submission.poll-interval"; + public static final long DEFAULT_YARN_CLIENT_APP_SUBMISSION_POLL_INTERVAL_MS = + 1000; + public YarnConfiguration() { super(); } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml index 15775be..599f8a9 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml @@ -708,4 +708,12 @@ $HADOOP_CONF_DIR,$HADOOP_COMMON_HOME/share/hadoop/common/*,$HADOOP_COMMON_HOME/share/hadoop/common/lib/*,$HADOOP_HDFS_HOME/share/hadoop/hdfs/*,$HADOOP_HDFS_HOME/share/hadoop/hdfs/lib/*,$HADOOP_YARN_HOME/share/hadoop/yarn/*,$HADOOP_YARN_HOME/share/hadoop/yarn/lib/* + + + The interval of the yarn client's querying application state + after application submission. The unit is millisecond. + yarn.client.app-submission.poll-interval + 1000 + +