diff --git hadoop-yarn-project/CHANGES.txt hadoop-yarn-project/CHANGES.txt index a39face..a785bdd 100644 --- hadoop-yarn-project/CHANGES.txt +++ hadoop-yarn-project/CHANGES.txt @@ -85,6 +85,9 @@ Release 2.7.2 - UNRELEASED YARN-3624. ApplicationHistoryServer should not reverse the order of the filters it gets. (Mit Desai via xgong) + YARN-4180. AMLauncher does not retry on failures when talking to NM. + (adhoot) + Release 2.7.1 - 2015-07-06 INCOMPATIBLE CHANGES diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/amlauncher/AMLauncher.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/amlauncher/AMLauncher.java index 0dd9ba1..f5ecbaa 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/amlauncher/AMLauncher.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/amlauncher/AMLauncher.java @@ -21,7 +21,6 @@ import java.io.IOException; import java.net.InetSocketAddress; import java.nio.ByteBuffer; -import java.security.PrivilegedAction; import java.util.ArrayList; import java.util.List; import java.util.Map; @@ -50,6 +49,7 @@ import org.apache.hadoop.yarn.api.records.ContainerId; import org.apache.hadoop.yarn.api.records.ContainerLaunchContext; import org.apache.hadoop.yarn.api.records.NodeId; +import org.apache.hadoop.yarn.client.NMProxy; import org.apache.hadoop.yarn.event.EventHandler; import org.apache.hadoop.yarn.exceptions.YarnException; import org.apache.hadoop.yarn.ipc.YarnRPC; @@ -150,10 +150,10 @@ protected ContainerManagementProtocol getContainerMgrProxy( final ContainerId containerId) { final NodeId node = masterContainer.getNodeId(); - final 
InetSocketAddress containerManagerBindAddress = + final InetSocketAddress containerManagerConnectAddress = NetUtils.createSocketAddrForHost(node.getHost(), node.getPort()); - final YarnRPC rpc = YarnRPC.create(conf); // TODO: Don't create again and again. + final YarnRPC rpc = getYarnRPC(); UserGroupInformation currentUser = UserGroupInformation.createRemoteUser(containerId @@ -167,18 +167,19 @@ protected ContainerManagementProtocol getContainerMgrProxy( rmContext.getNMTokenSecretManager().createNMToken( containerId.getApplicationAttemptId(), node, user); currentUser.addToken(ConverterUtils.convertFromYarn(token, - containerManagerBindAddress)); + containerManagerConnectAddress)); - return currentUser - .doAs(new PrivilegedAction() { + return NMProxy.createNMProxy(conf, ContainerManagementProtocol.class, + currentUser, rpc, containerManagerConnectAddress); + } - @Override - public ContainerManagementProtocol run() { - return (ContainerManagementProtocol) rpc.getProxy( - ContainerManagementProtocol.class, - containerManagerBindAddress, conf); - } - }); + /** + * Creates the {@link YarnRPC} used to build the NM proxy; a new instance is + * created on every call. Protected so tests can substitute a mock RPC. + */ + @VisibleForTesting + protected YarnRPC getYarnRPC() { + return YarnRPC.create(conf); // TODO: Don't create again and again. 
} private ContainerLaunchContext createAMContainerLaunchContext( diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockRM.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockRM.java index 9f7bd88..18558e5 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockRM.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockRM.java @@ -147,10 +147,15 @@ public void waitForState(ApplicationId appId, RMAppState finalState) Assert.assertEquals("App state is not correct (timedout)", finalState, app.getState()); } - - public void waitForState(ApplicationAttemptId attemptId, - RMAppAttemptState finalState) + + public void waitForState(ApplicationAttemptId attemptId, + RMAppAttemptState finalState) throws Exception { + waitForState(attemptId, finalState, 40000); + } + + public void waitForState(ApplicationAttemptId attemptId, + RMAppAttemptState finalState, int timeoutMsecs) throws Exception { RMApp app = getRMContext().getRMApps().get(attemptId.getApplicationId()); Assert.assertNotNull("app shouldn't be null", app); RMAppAttempt attempt = app.getRMAppAttempt(attemptId); diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestApplicationMasterLauncher.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestApplicationMasterLauncher.java index 11cd1fd..e54caa4 100644 --- 
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestApplicationMasterLauncher.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestApplicationMasterLauncher.java @@ -19,6 +19,7 @@ package org.apache.hadoop.yarn.server.resourcemanager; import java.io.IOException; +import java.net.InetSocketAddress; import java.nio.ByteBuffer; import java.util.ArrayList; import java.util.HashMap; @@ -26,6 +27,7 @@ import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.yarn.api.ApplicationConstants; import org.apache.hadoop.yarn.api.ContainerManagementProtocol; import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse; @@ -43,11 +45,18 @@ import org.apache.hadoop.yarn.api.records.SerializedException; import org.apache.hadoop.yarn.api.records.Token; import org.apache.hadoop.yarn.conf.YarnConfiguration; +import org.apache.hadoop.yarn.event.Dispatcher; +import org.apache.hadoop.yarn.event.DrainDispatcher; import org.apache.hadoop.yarn.exceptions.ApplicationAttemptNotFoundException; import org.apache.hadoop.yarn.exceptions.ApplicationMasterNotRegisteredException; +import org.apache.hadoop.yarn.exceptions.NMNotYetReadyException; import org.apache.hadoop.yarn.exceptions.YarnException; import org.apache.hadoop.yarn.ipc.RPCUtil; +import org.apache.hadoop.yarn.ipc.YarnRPC; import org.apache.hadoop.yarn.security.ContainerTokenIdentifier; +import org.apache.hadoop.yarn.server.resourcemanager.amlauncher.AMLauncher; +import org.apache.hadoop.yarn.server.resourcemanager.amlauncher.AMLauncherEventType; +import org.apache.hadoop.yarn.server.resourcemanager.amlauncher.ApplicationMasterLauncher; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp; import 
org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptState; @@ -58,6 +67,10 @@ import org.junit.Assert; import org.junit.Test; +import static org.mockito.Matchers.any; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + public class TestApplicationMasterLauncher { private static final Log LOG = LogFactory @@ -177,8 +190,68 @@ public void testAMLaunchAndCleanup() throws Exception { am.waitForState(RMAppAttemptState.FINISHED); rm.stop(); } - - + + @Test(timeout = 100000) + public void testRetriesOnFailures() throws Exception { + final ContainerManagementProtocol mockProxy = + mock(ContainerManagementProtocol.class); + final StartContainersResponse mockResponse = + mock(StartContainersResponse.class); + when(mockProxy.startContainers(any(StartContainersRequest.class))) + .thenThrow(new NMNotYetReadyException("foo")).thenReturn(mockResponse); + Configuration conf = new Configuration(); + conf.setInt(YarnConfiguration.RM_AM_MAX_ATTEMPTS, 1); + conf.setInt(YarnConfiguration.CLIENT_NM_CONNECT_RETRY_INTERVAL_MS, 1); + final DrainDispatcher dispatcher = new DrainDispatcher(); + MockRM rm = new MockRMWithCustomAMLauncher(conf, null) { + @Override + protected ApplicationMasterLauncher createAMLauncher() { + return new ApplicationMasterLauncher(getRMContext()) { + @Override + protected Runnable createRunnableLauncher(RMAppAttempt application, + AMLauncherEventType event) { + return new AMLauncher(context, application, event, getConfig()) { + @Override + protected YarnRPC getYarnRPC() { + YarnRPC mockRpc = mock(YarnRPC.class); + + when(mockRpc.getProxy( + any(Class.class), + any(InetSocketAddress.class), + any(Configuration.class))) + .thenReturn(mockProxy); + return mockRpc; + } + }; + } + }; + } + + @Override + protected Dispatcher createDispatcher() { + return dispatcher; + } + }; + rm.start(); + try { + MockNM nm1 = rm.registerNode("127.0.0.1:1234", 5120); + + RMApp app
= rm.submitApp(2000); + final ApplicationAttemptId appAttemptId = app.getCurrentAppAttempt() + .getAppAttemptId(); + + // kick the scheduling + nm1.nodeHeartbeat(true); + dispatcher.await(); + + // The first startContainers() call throws NMNotYetReadyException, so the + // attempt is expected to reach LAUNCHED only if the launch is retried. + rm.waitForState(appAttemptId, RMAppAttemptState.LAUNCHED, 500); + } finally { + rm.stop(); + } + } + @SuppressWarnings("unused") @Test(timeout = 100000) public void testallocateBeforeAMRegistration() throws Exception {