diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ApplicationMasterService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ApplicationMasterService.java index 6b2cb7f..94dc474 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ApplicationMasterService.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ApplicationMasterService.java @@ -298,9 +298,12 @@ public RegisterApplicationMasterResponse registerApplicationMaster( List nmTokens = new ArrayList(); for (Container container : transferredContainers) { try { - nmTokens.add(rmContext.getNMTokenSecretManager() - .createAndGetNMToken(app.getUser(), applicationAttemptId, - container)); + NMToken token = rmContext.getNMTokenSecretManager() + .createAndGetNMToken(app.getUser(), applicationAttemptId, + container); + if (null != token) { + nmTokens.add(token); + } } catch (IllegalArgumentException e) { // if it's a DNS issue, throw UnknowHostException directly and that // will be automatically retried by RMProxy in RPC layer. diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/TestAMRestart.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/TestAMRestart.java index 6b7b464..66ed414 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/TestAMRestart.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/TestAMRestart.java @@ -340,4 +340,106 @@ public void testNMTokensRebindOnAMRestart() throws Exception { Assert.assertTrue(transferredTokens.containsAll(expectedNMTokens)); rm1.stop(); } + + @Test + public void testNMTokensRebindOnAMRestartWithMultipleContainerOnSameNode() + throws Exception { + YarnConfiguration conf = new YarnConfiguration(); + conf.setInt(YarnConfiguration.RM_AM_MAX_ATTEMPTS, 3); + + MockRM rm1 = new MockRM(conf); + rm1.start(); + RMApp app1 = + rm1.submitApp(200, "myname", "myuser", + new HashMap(), false, "default", -1, + null, "MAPREDUCE", false, true); + MockNM nm1 = + new MockNM("127.0.0.1:1234", 8000, rm1.getResourceTrackerService()); + nm1.registerNode(); + MockNM nm2 = + new MockNM("127.1.1.1:4321", 8000, rm1.getResourceTrackerService()); + nm2.registerNode(); + MockAM am1 = MockRM.launchAndRegisterAM(app1, rm1, nm1); + + List containers = new ArrayList(); + // nmTokens keeps track of all the nmTokens issued in the allocate call. + List expectedNMTokens = new ArrayList(); + + // am1 allocate 2 container on nm1. + // first container + while (true) { + AllocateResponse response = + am1.allocate("127.0.0.1", 2000, 2, + new ArrayList()); + nm1.nodeHeartbeat(true); + containers.addAll(response.getAllocatedContainers()); + expectedNMTokens.addAll(response.getNMTokens()); + if (containers.size() == 2) { + break; + } + Thread.sleep(200); + System.out.println("Waiting for container to be allocated."); + } + // launch the container-2 + nm1.nodeHeartbeat(am1.getApplicationAttemptId(), 2, ContainerState.RUNNING); + ContainerId containerId2 = + ContainerId.newInstance(am1.getApplicationAttemptId(), 2); + rm1.waitForState(nm1, containerId2, RMContainerState.RUNNING); + // launch the container-3 + nm1.nodeHeartbeat(am1.getApplicationAttemptId(), 3, ContainerState.RUNNING); + ContainerId containerId3 = + ContainerId.newInstance(am1.getApplicationAttemptId(), 3); + rm1.waitForState(nm1, containerId3, RMContainerState.RUNNING); + + // fail am1 + nm1.nodeHeartbeat(am1.getApplicationAttemptId(), 1, ContainerState.COMPLETE); + am1.waitForState(RMAppAttemptState.FAILED); + rm1.waitForState(app1.getApplicationId(), RMAppState.ACCEPTED); + + // restart the am + MockAM am2 = MockRM.launchAM(app1, rm1, nm1); + RegisterApplicationMasterResponse registerResponse = + am2.registerAppAttempt(); + rm1.waitForState(app1.getApplicationId(), RMAppState.RUNNING); + + // check am2 get the nm token from am1. + Assert.assertEquals(expectedNMTokens, + registerResponse.getNMTokensFromPreviousAttempts()); + + // am2 allocate 1 container on nm2 + containers = new ArrayList(); + while (true) { + AllocateResponse allocateResponse = + am2.allocate("127.1.1.1", 4000, 1, + new ArrayList()); + nm2.nodeHeartbeat(true); + containers.addAll(allocateResponse.getAllocatedContainers()); + expectedNMTokens.addAll(allocateResponse.getNMTokens()); + if (containers.size() == 1) { + break; + } + Thread.sleep(200); + System.out.println("Waiting for container to be allocated."); + } + nm1.nodeHeartbeat(am2.getApplicationAttemptId(), 2, ContainerState.RUNNING); + ContainerId am2ContainerId2 = + ContainerId.newInstance(am2.getApplicationAttemptId(), 2); + rm1.waitForState(nm1, am2ContainerId2, RMContainerState.RUNNING); + + // fail am2. + nm1.nodeHeartbeat(am2.getApplicationAttemptId(), 1, ContainerState.COMPLETE); + am2.waitForState(RMAppAttemptState.FAILED); + rm1.waitForState(app1.getApplicationId(), RMAppState.ACCEPTED); + + // restart am + MockAM am3 = MockRM.launchAM(app1, rm1, nm1); + registerResponse = am3.registerAppAttempt(); + rm1.waitForState(app1.getApplicationId(), RMAppState.RUNNING); + + // check am3 get the NM token from both am1 and am2; + List transferredTokens = registerResponse.getNMTokensFromPreviousAttempts(); + Assert.assertEquals(2, transferredTokens.size()); + Assert.assertTrue(transferredTokens.containsAll(expectedNMTokens)); + rm1.stop(); + } }