diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-unmanaged-am-launcher/src/main/java/org/apache/hadoop/yarn/applications/unmanagedamlauncher/UnmanagedAMLauncher.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-unmanaged-am-launcher/src/main/java/org/apache/hadoop/yarn/applications/unmanagedamlauncher/UnmanagedAMLauncher.java index f75358a..94577ed 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-unmanaged-am-launcher/src/main/java/org/apache/hadoop/yarn/applications/unmanagedamlauncher/UnmanagedAMLauncher.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-unmanaged-am-launcher/src/main/java/org/apache/hadoop/yarn/applications/unmanagedamlauncher/UnmanagedAMLauncher.java @@ -44,6 +44,7 @@ import org.apache.hadoop.yarn.api.ApplicationConstants; import org.apache.hadoop.yarn.api.ApplicationConstants.Environment; import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; +import org.apache.hadoop.yarn.api.records.ApplicationAttemptReport; import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.api.records.ApplicationReport; import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext; @@ -51,6 +52,7 @@ import org.apache.hadoop.yarn.api.records.ContainerLaunchContext; import org.apache.hadoop.yarn.api.records.FinalApplicationStatus; import org.apache.hadoop.yarn.api.records.Priority; +import org.apache.hadoop.yarn.api.records.YarnApplicationAttemptState; import org.apache.hadoop.yarn.api.records.YarnApplicationState; import org.apache.hadoop.yarn.client.api.YarnClient; import org.apache.hadoop.yarn.conf.YarnConfiguration; @@ -77,7 +79,7 @@ private Configuration conf; // Handle to talk to the Resource Manager/Applications Manager - private YarnClient rmClient; + protected YarnClient rmClient; // Application master specific info to register a new Application with 
RM/ASM private String appName = ""; @@ -92,6 +94,7 @@ private volatile boolean amCompleted = false; + private static final long AM_STATE_WAIT_TIMEOUT_MS = 10000; /** * @param args * Command line arguments @@ -340,16 +343,17 @@ public boolean run() throws IOException, YarnException { rmClient.submitApplication(appContext); // Monitor the application to wait for launch state - ApplicationReport appReport = monitorApplication(appId, - EnumSet.of(YarnApplicationState.ACCEPTED)); - ApplicationAttemptId attemptId = appReport.getCurrentApplicationAttemptId(); - LOG.info("Launching application with id: " + attemptId); + ApplicationAttemptReport attemptReport = + monitorCurrentAppAttempt(appId, + YarnApplicationAttemptState.LAUNCHED); + ApplicationAttemptId attemptId = attemptReport.getApplicationAttemptId(); + LOG.info("Launching AM with application attempt id " + attemptId); // launch AM launchAM(attemptId); // Monitor the application for end state - appReport = monitorApplication(appId, EnumSet.of( + ApplicationReport appReport = monitorApplication(appId, EnumSet.of( YarnApplicationState.KILLED, YarnApplicationState.FAILED, YarnApplicationState.FINISHED)); @@ -376,6 +380,56 @@ public boolean run() throws IOException, YarnException { } } + /** + * Poll the RM once a second until the current attempt of the application + * reaches the given state. + * + * @param appId id of the application whose current attempt is monitored + * @param attemptState attempt state to wait for + * @return report of the current attempt once it is in the given state + * @throws YarnException declared; propagated from the RM client calls + * @throws IOException declared; propagated from the RM client calls + * @throws RuntimeException if the state is not reached within + * AM_STATE_WAIT_TIMEOUT_MS milliseconds + */ + private ApplicationAttemptReport monitorCurrentAppAttempt( + ApplicationId appId, YarnApplicationAttemptState attemptState) + throws YarnException, IOException { + long startTime = System.currentTimeMillis(); + ApplicationAttemptId attemptId = null; + while (true) { + if (attemptId == null) { + attemptId = + rmClient.getApplicationReport(appId) + .getCurrentApplicationAttemptId(); + } + ApplicationAttemptReport attemptReport = null; + if (attemptId != null) { + attemptReport = rmClient.getApplicationAttemptReport(attemptId); + if (attemptState.equals(attemptReport.getYarnApplicationAttemptState())) { + return attemptReport; + } + } + // Keep the ternary parenthesized: string concatenation binds tighter than the null check. + LOG.info("Current attempt state of " + appId + " is " + (attemptReport == null + ? 
"N/A" : attemptReport.getYarnApplicationAttemptState()) + ", waiting for current attempt to reach " + attemptState); + try { + Thread.sleep(1000); + } catch (InterruptedException e) { + LOG.warn("Interrupted while waiting for current attempt of " + appId + " to reach " + attemptState); + } + if (System.currentTimeMillis() - startTime > AM_STATE_WAIT_TIMEOUT_MS) { + String errmsg = + "Timeout for waiting current attempt of " + appId + " to reach " + attemptState; + LOG.error(errmsg); + throw new RuntimeException(errmsg); + } + } + } + /** * Monitor the submitted application for completion. Kill application if time * expires. @@ -391,7 +445,6 @@ private ApplicationReport monitorApplication(ApplicationId appId, IOException { long foundAMCompletedTime = 0; - final int timeToWaitMS = 10000; StringBuilder expectedFinalState = new StringBuilder(); boolean first = true; for (YarnApplicationState state : finalState) { @@ -438,8 +491,8 @@ private ApplicationReport monitorApplication(ApplicationId appId, if (foundAMCompletedTime == 0) { foundAMCompletedTime = System.currentTimeMillis(); } else if ((System.currentTimeMillis() - foundAMCompletedTime) - > timeToWaitMS) { - LOG.warn("Waited " + timeToWaitMS/1000 + > AM_STATE_WAIT_TIMEOUT_MS) { + LOG.warn("Waited " + AM_STATE_WAIT_TIMEOUT_MS/1000 + " seconds after process completed for AppReport" + " to reach desired final state. Not waiting anymore." 
+ "CurrentState = " + state diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-unmanaged-am-launcher/src/test/java/org/apache/hadoop/yarn/applications/unmanagedamlauncher/TestUnmanagedAMLauncher.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-unmanaged-am-launcher/src/test/java/org/apache/hadoop/yarn/applications/unmanagedamlauncher/TestUnmanagedAMLauncher.java index a4c7832..08cacee 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-unmanaged-am-launcher/src/test/java/org/apache/hadoop/yarn/applications/unmanagedamlauncher/TestUnmanagedAMLauncher.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-unmanaged-am-launcher/src/test/java/org/apache/hadoop/yarn/applications/unmanagedamlauncher/TestUnmanagedAMLauncher.java @@ -28,8 +28,6 @@ import java.io.OutputStream; import java.net.URL; -import org.junit.Assert; - import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; @@ -38,11 +36,15 @@ import org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationMasterRequest; import org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationMasterResponse; import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterRequest; +import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; import org.apache.hadoop.yarn.api.records.FinalApplicationStatus; +import org.apache.hadoop.yarn.api.records.YarnApplicationAttemptState; import org.apache.hadoop.yarn.client.ClientRMProxy; import org.apache.hadoop.yarn.conf.YarnConfiguration; +import org.apache.hadoop.yarn.exceptions.YarnException; import org.apache.hadoop.yarn.server.MiniYARNCluster; import org.junit.AfterClass; +import org.junit.Assert; import org.junit.BeforeClass; import org.junit.Test; @@ -122,7 +124,7 @@ private static String getTestRuntimeClasspath() { } 
@Test(timeout=30000) - public void testDSShell() throws Exception { + public void testUMALauncher() throws Exception { String classpath = getTestRuntimeClasspath(); String javaHome = System.getenv("JAVA_HOME"); if (javaHome == null) { @@ -141,8 +143,20 @@ public void testDSShell() throws Exception { + " success" }; LOG.info("Initializing Launcher"); - UnmanagedAMLauncher launcher = new UnmanagedAMLauncher(new Configuration( - yarnCluster.getConfig())); + UnmanagedAMLauncher launcher = + new UnmanagedAMLauncher(new Configuration(yarnCluster.getConfig())) { + // Check that the attempt is already LAUNCHED when the launcher starts the AM. + @Override + public void launchAM(ApplicationAttemptId attemptId) + throws IOException, YarnException { + YarnApplicationAttemptState attemptState = + rmClient.getApplicationAttemptReport(attemptId) + .getYarnApplicationAttemptState(); + Assert.assertEquals(YarnApplicationAttemptState.LAUNCHED, + attemptState); + super.launchAM(attemptId); + } + }; boolean initSuccess = launcher.init(args); Assert.assertTrue(initSuccess); LOG.info("Running Launcher"); @@ -154,7 +168,7 @@ public void testDSShell() throws Exception { } @Test(timeout=30000) - public void testDSShellError() throws Exception { + public void testUMALauncherError() throws Exception { String classpath = getTestRuntimeClasspath(); String javaHome = System.getenv("JAVA_HOME"); if (javaHome == null) { diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttemptImpl.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttemptImpl.java index 697a180..fd03373 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttemptImpl.java +++ 
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttemptImpl.java @@ -1646,11 +1646,14 @@ public ApplicationAttemptReport createApplicationAttemptReport() { this.readLock.lock(); ApplicationAttemptReport attemptReport = null; try { + // The AM container may not be allocated yet, and an unmanaged AM has no + // AM container, so masterContainer can be null here. + ContainerId amId = + masterContainer == null ? null : masterContainer.getId(); attemptReport = ApplicationAttemptReport.newInstance(this .getAppAttemptId(), this.getHost(), this.getRpcPort(), this .getTrackingUrl(), this.getDiagnostics(), YarnApplicationAttemptState - .valueOf(this.getState().toString()), this.getMasterContainer() - .getId()); + .valueOf(this.getState().toString()), amId); } finally { this.readLock.unlock(); }