diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/amlauncher/AMLauncher.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/amlauncher/AMLauncher.java index 713e75f..b1d8506 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/amlauncher/AMLauncher.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/amlauncher/AMLauncher.java @@ -21,7 +21,6 @@ import java.io.IOException; import java.net.InetSocketAddress; import java.nio.ByteBuffer; -import java.security.PrivilegedAction; import java.util.ArrayList; import java.util.List; import java.util.Map; @@ -51,6 +50,7 @@ import org.apache.hadoop.yarn.api.records.ContainerId; import org.apache.hadoop.yarn.api.records.ContainerLaunchContext; import org.apache.hadoop.yarn.api.records.NodeId; +import org.apache.hadoop.yarn.client.NMProxy; import org.apache.hadoop.yarn.event.EventHandler; import org.apache.hadoop.yarn.exceptions.YarnException; import org.apache.hadoop.yarn.ipc.YarnRPC; @@ -151,10 +151,10 @@ protected ContainerManagementProtocol getContainerMgrProxy( final ContainerId containerId) { final NodeId node = masterContainer.getNodeId(); - final InetSocketAddress containerManagerBindAddress = + final InetSocketAddress containerManagerConnectAddress = NetUtils.createSocketAddrForHost(node.getHost(), node.getPort()); - final YarnRPC rpc = YarnRPC.create(conf); // TODO: Don't create again and again. + final YarnRPC rpc = getYarnRPC(); UserGroupInformation currentUser = UserGroupInformation.createRemoteUser(containerId @@ -168,18 +168,15 @@ protected ContainerManagementProtocol getContainerMgrProxy( rmContext.getNMTokenSecretManager().createNMToken( containerId.getApplicationAttemptId(), node, user); currentUser.addToken(ConverterUtils.convertFromYarn(token, - containerManagerBindAddress)); - - return currentUser - .doAs(new PrivilegedAction() { - - @Override - public ContainerManagementProtocol run() { - return (ContainerManagementProtocol) rpc.getProxy( - ContainerManagementProtocol.class, - containerManagerBindAddress, conf); - } - }); + containerManagerConnectAddress)); + + return NMProxy.createNMProxy(conf, ContainerManagementProtocol.class, + currentUser, rpc, containerManagerConnectAddress); + } + + @VisibleForTesting + protected YarnRPC getYarnRPC() { + return YarnRPC.create(conf); // TODO: Don't create again and again. } private ContainerLaunchContext createAMContainerLaunchContext( diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockRM.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockRM.java index 249f093..413b02c 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockRM.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockRM.java @@ -160,14 +160,18 @@ public void waitForState(ApplicationId appId, RMAppState finalState) " for the application " + appId); } } - - public void waitForState(ApplicationAttemptId attemptId, - RMAppAttemptState finalState) + + public void waitForState(ApplicationAttemptId attemptId, + RMAppAttemptState finalState) throws Exception { + waitForState(attemptId, finalState, 40000); + } + + public void waitForState(ApplicationAttemptId attemptId, + RMAppAttemptState finalState, int timeoutMsecs) throws Exception { RMApp app = getRMContext().getRMApps().get(attemptId.getApplicationId()); Assert.assertNotNull("app shouldn't be null", app); RMAppAttempt attempt = app.getRMAppAttempt(attemptId); - final int timeoutMsecs = 40000; final int minWaitMsecs = 1000; final int waitMsPerLoop = 10; int loop = 0; diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestApplicationMasterLauncher.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestApplicationMasterLauncher.java index d4f8e93..d19bfbf 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestApplicationMasterLauncher.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestApplicationMasterLauncher.java @@ -19,6 +19,7 @@ package org.apache.hadoop.yarn.server.resourcemanager; import java.io.IOException; +import java.net.InetSocketAddress; import java.nio.ByteBuffer; import java.util.ArrayList; import java.util.HashMap; @@ -47,14 +48,19 @@ import org.apache.hadoop.yarn.api.records.SerializedException; import org.apache.hadoop.yarn.api.records.Token; import org.apache.hadoop.yarn.conf.YarnConfiguration; +import org.apache.hadoop.yarn.event.Dispatcher; +import org.apache.hadoop.yarn.event.DrainDispatcher; import org.apache.hadoop.yarn.exceptions.ApplicationAttemptNotFoundException; import org.apache.hadoop.yarn.exceptions.ApplicationMasterNotRegisteredException; +import org.apache.hadoop.yarn.exceptions.NMNotYetReadyException; import org.apache.hadoop.yarn.exceptions.YarnException; import org.apache.hadoop.yarn.ipc.RPCUtil; +import org.apache.hadoop.yarn.ipc.YarnRPC; import org.apache.hadoop.yarn.security.AMRMTokenIdentifier; import org.apache.hadoop.yarn.security.ContainerTokenIdentifier; import org.apache.hadoop.yarn.server.resourcemanager.amlauncher.AMLauncher; import org.apache.hadoop.yarn.server.resourcemanager.amlauncher.AMLauncherEventType; +import org.apache.hadoop.yarn.server.resourcemanager.amlauncher.ApplicationMasterLauncher; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptState; @@ -65,6 +71,10 @@ import org.junit.Assert; import org.junit.Test; +import static org.mockito.Matchers.any; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + public class TestApplicationMasterLauncher { private static final Log LOG = LogFactory @@ -184,6 +194,62 @@ public void testAMLaunchAndCleanup() throws Exception { rm.stop(); } + @Test + public void testRetriesOnFailures() throws Exception { + final ContainerManagementProtocol mockProxy = + mock(ContainerManagementProtocol.class); + final StartContainersResponse mockResponse = + mock(StartContainersResponse.class); + when(mockProxy.startContainers(any(StartContainersRequest.class))) + .thenThrow(new NMNotYetReadyException("foo")).thenReturn(mockResponse); + Configuration conf = new Configuration(); + conf.setInt(YarnConfiguration.RM_AM_MAX_ATTEMPTS, 1); + conf.setInt(YarnConfiguration.CLIENT_NM_CONNECT_RETRY_INTERVAL_MS, 1); + final DrainDispatcher dispatcher = new DrainDispatcher(); + MockRM rm = new MockRMWithCustomAMLauncher(conf, null) { + @Override + protected ApplicationMasterLauncher createAMLauncher() { + return new ApplicationMasterLauncher(getRMContext()) { + @Override + protected Runnable createRunnableLauncher(RMAppAttempt application, + AMLauncherEventType event) { + return new AMLauncher(context, application, event, getConfig()) { + @Override + protected YarnRPC getYarnRPC() { + YarnRPC mockRpc = mock(YarnRPC.class); + + when(mockRpc.getProxy( + any(Class.class), + any(InetSocketAddress.class), + any(Configuration.class))) + .thenReturn(mockProxy); + return mockRpc; + } + }; + } + }; + } + + @Override + protected Dispatcher createDispatcher() { + return dispatcher; + } + }; + rm.start(); + MockNM nm1 = rm.registerNode("127.0.0.1:1234", 5120); + + RMApp app = rm.submitApp(2000); + final ApplicationAttemptId appAttemptId = app.getCurrentAppAttempt() + .getAppAttemptId(); + + // kick the scheduling + nm1.nodeHeartbeat(true); + dispatcher.await(); + + rm.waitForState(appAttemptId, RMAppAttemptState.LAUNCHED, 500); + } + + @SuppressWarnings("unused") @Test(timeout = 100000)