diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java index 1a49645..f77231f 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java @@ -952,6 +952,17 @@ YARN_PREFIX + "client.application-client-protocol.poll-interval-ms"; public static final long DEFAULT_YARN_CLIENT_APPLICATION_CLIENT_PROTOCOL_POLL_INTERVAL_MS = 200; + + /** + * The timeout that the yarn client library uses on RPCs made to the + * ResourceManager. The value is specified in milliseconds. A value of zero or + * less indicates that no timeout will be enforced. + */ + public static final String YARN_CLIENT_RPC_TIMEOUT_MS = + YARN_PREFIX + "client.rpc-timeout-ms"; + public static final long DEFAULT_YARN_CLIENT_RPC_TIMEOUT_MS = + -1; + /** * Max number of threads in NMClientAsync to process container management * events diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/impl/YarnClientImpl.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/impl/YarnClientImpl.java index a5ff9f6..6c0547b 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/impl/YarnClientImpl.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/impl/YarnClientImpl.java @@ -80,6 +80,7 @@ protected ApplicationClientProtocol rmClient; protected long submitPollIntervalMillis; private long asyncApiPollIntervalMillis; + private long rpcTimeoutMillis; private static final String ROOT = "root"; @@ -100,6 +101,9 @@ protected void serviceInit(Configuration conf) throws Exception 
{ YarnConfiguration.YARN_CLIENT_APP_SUBMISSION_POLL_INTERVAL_MS, YarnConfiguration.DEFAULT_YARN_CLIENT_APPLICATION_CLIENT_PROTOCOL_POLL_INTERVAL_MS); } + rpcTimeoutMillis = conf.getLong( + YarnConfiguration.YARN_CLIENT_RPC_TIMEOUT_MS, + YarnConfiguration.DEFAULT_YARN_CLIENT_RPC_TIMEOUT_MS); super.serviceInit(conf); } @@ -152,11 +156,14 @@ public YarnClientApplication createApplication() rmClient.submitApplication(request); int pollCount = 0; + long startTime = System.currentTimeMillis(); + while (true) { YarnApplicationState state = getApplicationReport(applicationId).getYarnApplicationState(); if (!state.equals(YarnApplicationState.NEW) && !state.equals(YarnApplicationState.NEW_SAVING)) { + LOG.info("Submitted application " + applicationId); break; } // Notify the client through the log every 10 poll, in case the client @@ -166,13 +173,21 @@ public YarnClientApplication createApplication() "submitted application " + applicationId + " is still in " + state); } + + long elapsedMillis = System.currentTimeMillis() - startTime; + if (rpcTimeoutMillis > 0 && elapsedMillis >= rpcTimeoutMillis) { + throw new YarnException("Timed out waiting for application submission " + + "for application " + applicationId); + } + try { Thread.sleep(submitPollIntervalMillis); } catch (InterruptedException ie) { + LOG.error("Interrupted while waiting for application submission for " + + "application " + applicationId); } } - LOG.info("Submitted application " + applicationId); return applicationId; } @@ -185,14 +200,24 @@ public void killApplication(ApplicationId applicationId) try { int pollCount = 0; + long startTime = System.currentTimeMillis(); + while (true) { KillApplicationResponse response = rmClient.forceKillApplication(request); if (response.getIsKillCompleted()) { + LOG.info("Killed application " + applicationId); break; } + + long elapsedMillis = System.currentTimeMillis() - startTime; + if (rpcTimeoutMillis > 0 && elapsedMillis >= rpcTimeoutMillis) { + throw new 
YarnException("Timed out waiting to kill application " + + applicationId); + } + if (++pollCount % 10 == 0) { - LOG.info("Watiting for application " + applicationId + LOG.info("Waiting for application " + applicationId + " to be killed."); } Thread.sleep(asyncApiPollIntervalMillis); @@ -201,7 +226,6 @@ public void killApplication(ApplicationId applicationId) LOG.error("Interrupted while waiting for application " + applicationId + " to be killed."); } - LOG.info("Killed application " + applicationId); } @Override