diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/amlauncher/AMLauncher.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/amlauncher/AMLauncher.java index c8595fdba9b..24abf4f4aa8 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/amlauncher/AMLauncher.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/amlauncher/AMLauncher.java @@ -63,6 +63,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptEvent; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptEventType; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptImpl; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNode; import org.apache.hadoop.yarn.server.security.AMSecretKeys; import org.apache.hadoop.yarn.server.webproxy.ProxyCA; import org.apache.hadoop.yarn.util.ConverterUtils; @@ -73,7 +74,7 @@ /** * The launch of the AM itself. */ -public class AMLauncher implements Runnable { +public class AMLauncher implements Runnable, Comparable { private static final Logger LOG = LoggerFactory.getLogger(AMLauncher.class); @@ -140,6 +141,16 @@ private void launch() throws IOException, YarnException { } private void cleanup() throws IOException, YarnException { + // Skip container cleanup for containers on unusable nodes + // to avoid wasting time to communicate with NM. + SchedulerNode schedulerNode = + rmContext.getScheduler().getSchedulerNode(masterContainer.getNodeId()); + if (schedulerNode == null + || schedulerNode.getRMNode().getState().isInactiveState()) { + LOG.info("Skip container cleanup for " + masterContainer.getId() + + " on unusable node " + masterContainer.getNodeId()); + return; + } connect(); ContainerId containerId = masterContainer.getId(); List containerIds = new ArrayList(); @@ -386,4 +397,15 @@ protected void onAMLaunchFailed(ContainerId containerId, Exception ie) { handler.handle(new RMAppAttemptEvent(application .getAppAttemptId(), RMAppAttemptEventType.LAUNCH_FAILED, message)); } + + @Override + public int compareTo(AMLauncher o) { + if (this.eventType == o.eventType) { + return 0; + } else if (this.eventType == AMLauncherEventType.LAUNCH) { + return -1; + } else { + return 1; + } + } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/amlauncher/ApplicationMasterLauncher.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/amlauncher/ApplicationMasterLauncher.java index 55c35e3a9e0..a30da29a544 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/amlauncher/ApplicationMasterLauncher.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/amlauncher/ApplicationMasterLauncher.java @@ -19,6 +19,7 @@ package org.apache.hadoop.yarn.server.resourcemanager.amlauncher; import java.util.concurrent.BlockingQueue; import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.PriorityBlockingQueue; import java.util.concurrent.ThreadFactory; import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; @@ -41,9 +42,9 @@ ApplicationMasterLauncher.class); private ThreadPoolExecutor launcherPool; private LauncherThread launcherHandlingThread; - - private final BlockingQueue masterEvents - = new LinkedBlockingQueue(); + + private final BlockingQueue masterEvents = + new PriorityBlockingQueue<>(); protected final RMContext context; diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestApplicationMasterLauncher.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestApplicationMasterLauncher.java index 55b4935ab14..8db9112a7c0 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestApplicationMasterLauncher.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestApplicationMasterLauncher.java @@ -24,8 +24,16 @@ import java.util.ArrayList; import java.util.HashMap; import java.util.Map; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.PriorityBlockingQueue; import java.util.concurrent.TimeoutException; +import org.apache.hadoop.yarn.api.records.ApplicationId; +import org.apache.hadoop.yarn.api.records.Container; +import org.apache.hadoop.yarn.api.records.NodeId; +import org.apache.hadoop.yarn.event.Dispatcher; +import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptImpl; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.AbstractYarnScheduler; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.slf4j.event.Level; @@ -453,6 +461,77 @@ public void testSetupTokensWithHTTPS() throws Exception { testSetupTokens(true, conf); } + @Test + public void testLaunchFirstInPriorityQueue() throws InterruptedException { + RMContext mockRMContext = mock(RMContext.class); + Dispatcher mockDispatcer = mock(Dispatcher.class); + when(mockDispatcer.getEventHandler()).thenReturn(null); + when(mockRMContext.getDispatcher()).thenReturn(mockDispatcer); + Configuration mockConf = mock(Configuration.class); + BlockingQueue queue = new PriorityBlockingQueue<>(); + AMLauncher cleanupApp1 = + new AMLauncher(mockRMContext, createMockRMAppAttempt(1), + AMLauncherEventType.CLEANUP, mockConf); + AMLauncher launchApp2 = + new AMLauncher(mockRMContext, createMockRMAppAttempt(2), + AMLauncherEventType.LAUNCH, mockConf); + AMLauncher cleanupApp3 = + new AMLauncher(mockRMContext, createMockRMAppAttempt(3), + AMLauncherEventType.CLEANUP, mockConf); + AMLauncher launchApp4 = + new AMLauncher(mockRMContext, createMockRMAppAttempt(4), + AMLauncherEventType.LAUNCH, mockConf); + AMLauncher cleanupApp5 = + new AMLauncher(mockRMContext, createMockRMAppAttempt(5), + AMLauncherEventType.CLEANUP, mockConf); + AMLauncher launchApp6 = + new AMLauncher(mockRMContext, createMockRMAppAttempt(6), + AMLauncherEventType.LAUNCH, mockConf); + queue.add(cleanupApp1); + queue.add(launchApp2); + queue.add(cleanupApp3); + queue.add(launchApp4); + queue.add(cleanupApp5); + queue.add(launchApp6); + // verify that launch operations should be in front of cleanup operations + Assert.assertEquals(launchApp2, queue.take()); + Assert.assertEquals(launchApp4, queue.take()); + Assert.assertEquals(launchApp6, queue.take()); + } + + @Test + public void testAMLauncherSkipContainerCleanupOnUnusableNode() { + AbstractYarnScheduler scheduler = mock(AbstractYarnScheduler.class); + when(scheduler.getSchedulerNode(any())).thenReturn(null); + RMContext mockRMContext = mock(RMContext.class); + Dispatcher mockDispatcer = mock(Dispatcher.class); + when(mockDispatcer.getEventHandler()).thenReturn(null); + when(mockRMContext.getDispatcher()).thenReturn(mockDispatcer); + when(mockRMContext.getScheduler()).thenReturn(scheduler); + Configuration mockConf = mock(Configuration.class); + RMAppAttempt app = createMockRMAppAttempt(1); + Container mockContainer = mock(Container.class); + NodeId nodeId = NodeId.newInstance("h1", 0); + when(mockContainer.getNodeId()).thenReturn(nodeId); + when(app.getMasterContainer()).thenReturn(mockContainer); + AMLauncher amLauncher = + new AMLauncher(mockRMContext, app, + AMLauncherEventType.CLEANUP, mockConf); + // verify that container cleanup will be skipped before it truly + // communicate with NM, otherwise throw NPE exception since proxy has not + // been initialized. + amLauncher.run(); + } + + private RMAppAttempt createMockRMAppAttempt(int id) { + ApplicationId appId = ApplicationId.newInstance(0, id); + ApplicationAttemptId appAttemptId = + ApplicationAttemptId.newInstance(appId, 1); + RMAppAttemptImpl attempt = mock(RMAppAttemptImpl.class); + when(attempt.getAppAttemptId()).thenReturn(appAttemptId); + return attempt; + } + private void testSetupTokens(boolean https, YarnConfiguration conf) throws Exception { MockRM rm = new MockRM(conf);