diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/RegisterApplicationMasterResponse.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/RegisterApplicationMasterResponse.java index 363d872..772fdb3 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/RegisterApplicationMasterResponse.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/RegisterApplicationMasterResponse.java @@ -29,6 +29,7 @@ import org.apache.hadoop.yarn.api.ApplicationMasterProtocol; import org.apache.hadoop.yarn.api.records.ApplicationAccessType; import org.apache.hadoop.yarn.api.records.Container; +import org.apache.hadoop.yarn.api.records.NMToken; import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.util.Records; @@ -55,13 +56,15 @@ public static RegisterApplicationMasterResponse newInstance( Resource minCapability, Resource maxCapability, Map acls, ByteBuffer key, - List containersFromPreviousAttempt, String queue) { + List containersFromPreviousAttempt, String queue, + List nmTokensFromPreviousAttempts) { RegisterApplicationMasterResponse response = Records.newRecord(RegisterApplicationMasterResponse.class); response.setMaximumResourceCapability(maxCapability); response.setApplicationACLs(acls); response.setClientToAMTokenMasterKey(key); response.setContainersFromPreviousAttempt(containersFromPreviousAttempt); + response.setNMTokensFromPreviousAttempts(nmTokensFromPreviousAttempts); response.setQueue(queue); return response; } @@ -134,6 +137,7 @@ public static RegisterApplicationMasterResponse newInstance( * * @return the list of running containers as viewed by * ResourceManager from previous application attempt + * @see RegisterApplicationMasterResponse#getNMTokensFromPreviousAttempts() */ @Public @Unstable @@ -151,4 
+155,29 @@ public static RegisterApplicationMasterResponse newInstance( @Unstable public abstract void setContainersFromPreviousAttempt( List containersFromPreviousAttempt); + + /** + * Get the list of NMTokens for communicating with the NMs where the + * containers of previous application attempts are running. + * + * @return the list of NMTokens for communicating with the NMs where the + * containers of previous application attempts are running. + * + * @see RegisterApplicationMasterResponse#getContainersFromPreviousAttempt() + */ + @Public + @Stable + public abstract List getNMTokensFromPreviousAttempts(); + + /** + * Set the list of NMTokens for communicating with the NMs where the + * containers of previous application attempts are running. + * + * @param nmTokens + * the list of NMTokens for communicating with the NMs where the + * containers of previous application attempts are running. + */ + @Private + @Unstable + public abstract void setNMTokensFromPreviousAttempts(List nmTokens); } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_service_protos.proto hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_service_protos.proto index eff5cd7..265c431 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_service_protos.proto +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_service_protos.proto @@ -46,6 +46,7 @@ message RegisterApplicationMasterResponseProto { repeated ApplicationACLMapProto application_ACLs = 3; repeated ContainerProto containers_from_previous_attempt = 4; optional string queue = 5; + repeated NMTokenProto nm_tokens_from_previous_attempts = 6; } message FinishApplicationMasterRequestProto { diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/impl/pb/RegisterApplicationMasterResponsePBImpl.java 
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/impl/pb/RegisterApplicationMasterResponsePBImpl.java index ae488c4..557339f 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/impl/pb/RegisterApplicationMasterResponsePBImpl.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/impl/pb/RegisterApplicationMasterResponsePBImpl.java @@ -31,13 +31,16 @@ import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterResponse; import org.apache.hadoop.yarn.api.records.ApplicationAccessType; import org.apache.hadoop.yarn.api.records.Container; +import org.apache.hadoop.yarn.api.records.NMToken; import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.api.records.impl.pb.ContainerPBImpl; +import org.apache.hadoop.yarn.api.records.impl.pb.NMTokenPBImpl; import org.apache.hadoop.yarn.api.records.impl.pb.ProtoUtils; import org.apache.hadoop.yarn.api.records.impl.pb.ResourcePBImpl; import org.apache.hadoop.yarn.proto.YarnProtos.ApplicationACLMapProto; import org.apache.hadoop.yarn.proto.YarnProtos.ContainerProto; import org.apache.hadoop.yarn.proto.YarnProtos.ResourceProto; +import org.apache.hadoop.yarn.proto.YarnServiceProtos.NMTokenProto; import org.apache.hadoop.yarn.proto.YarnServiceProtos.RegisterApplicationMasterResponseProto; import org.apache.hadoop.yarn.proto.YarnServiceProtos.RegisterApplicationMasterResponseProtoOrBuilder; @@ -57,6 +60,7 @@ private Resource maximumResourceCapability; private Map applicationACLS = null; private List containersFromPreviousAttempt = null; + private List nmTokens = null; public RegisterApplicationMasterResponsePBImpl() { builder = RegisterApplicationMasterResponseProto.newBuilder(); @@ -111,7 +115,12 @@ private void mergeLocalToBuilder() { addApplicationACLs(); } if (this.containersFromPreviousAttempt != null) { - 
addRunningContainersToProto(); + addContainersFromPreviousAttemptToProto(); + } + if (nmTokens != null) { + builder.clearNmTokensFromPreviousAttempts(); + Iterable iterable = getTokenProtoIterable(nmTokens); + builder.addAllNmTokensFromPreviousAttempts(iterable); } } @@ -240,12 +249,13 @@ public ByteBuffer getClientToAMTokenMasterKey() { if (this.containersFromPreviousAttempt != null) { return this.containersFromPreviousAttempt; } - initRunningContainersList(); + initContainersPreviousAttemptList(); return this.containersFromPreviousAttempt; } @Override - public void setContainersFromPreviousAttempt(final List containers) { + public void + setContainersFromPreviousAttempt(final List containers) { if (containers == null) { return; } @@ -272,8 +282,10 @@ public void setQueue(String queue) { } } - private void initRunningContainersList() { - RegisterApplicationMasterResponseProtoOrBuilder p = viaProto ? proto : builder; + + private void initContainersPreviousAttemptList() { + RegisterApplicationMasterResponseProtoOrBuilder p = + viaProto ? 
proto : builder; List list = p.getContainersFromPreviousAttemptList(); containersFromPreviousAttempt = new ArrayList(); for (ContainerProto c : list) { @@ -281,7 +293,7 @@ private void initRunningContainersList() { } } - private void addRunningContainersToProto() { + private void addContainersFromPreviousAttemptToProto() { maybeInitBuilder(); builder.clearContainersFromPreviousAttempt(); List list = new ArrayList(); @@ -290,7 +302,68 @@ private void addRunningContainersToProto() { } builder.addAllContainersFromPreviousAttempt(list); } + + + @Override + public List getNMTokensFromPreviousAttempts() { + if (nmTokens != null) { + return nmTokens; + } + initLocalNewNMTokenList(); + return nmTokens; + } + @Override + public void setNMTokensFromPreviousAttempts(final List nmTokens) { + maybeInitBuilder(); /* the builder is cleared directly below, so prepare it first */ + if (nmTokens == null || nmTokens.isEmpty()) { + if (this.nmTokens != null) { + this.nmTokens.clear(); + } + builder.clearNmTokensFromPreviousAttempts(); + return; + } + this.nmTokens = new ArrayList(nmTokens); + } + + private synchronized void initLocalNewNMTokenList() { + RegisterApplicationMasterResponseProtoOrBuilder p = viaProto ? 
proto : builder; + List list = p.getNmTokensFromPreviousAttemptsList(); + nmTokens = new ArrayList(); + for (NMTokenProto t : list) { + nmTokens.add(convertFromProtoFormat(t)); + } + } + + private synchronized Iterable getTokenProtoIterable( + final List nmTokenList) { + maybeInitBuilder(); + return new Iterable() { + @Override + public synchronized Iterator iterator() { + return new Iterator() { + + Iterator iter = nmTokenList.iterator(); + + @Override + public boolean hasNext() { + return iter.hasNext(); + } + + @Override + public NMTokenProto next() { + return convertToProtoFormat(iter.next()); + } + + @Override + public void remove() { + throw new UnsupportedOperationException(); + } + }; + } + }; + } + private Resource convertFromProtoFormat(ResourceProto resource) { return new ResourcePBImpl(resource); } @@ -306,4 +379,12 @@ private ContainerPBImpl convertFromProtoFormat(ContainerProto p) { private ContainerProto convertToProtoFormat(Container t) { return ((ContainerPBImpl) t).getProto(); } + + private NMTokenProto convertToProtoFormat(NMToken token) { + return ((NMTokenPBImpl) token).getProto(); + } + + private NMToken convertFromProtoFormat(NMTokenProto proto) { + return new NMTokenPBImpl(proto); + } } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/NMTokenPBImpl.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/NMTokenPBImpl.java index bcbf0a3..bc74928 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/NMTokenPBImpl.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/NMTokenPBImpl.java @@ -47,7 +47,7 @@ public NMTokenPBImpl(NMTokenProto proto) { this.proto = proto; viaProto = true; } - + @Override public synchronized NodeId getNodeId() { NMTokenProtoOrBuilder p = viaProto ? 
proto : builder; @@ -139,4 +139,43 @@ private synchronized TokenProto convertToProtoFormat(Token token) { private synchronized Token convertFromProtoFormat(TokenProto proto) { return new TokenPBImpl(proto); } + + /** + * Hashes the node id and token as returned by the accessors, so an + * instance that is still backed only by its proto hashes by content. + */ + @Override + public int hashCode() { + final int prime = 31; + NodeId id = getNodeId(); + Token tok = getToken(); + int result = 1; + result = prime * result + ((id == null) ? 0 : id.hashCode()); + result = prime * result + ((tok == null) ? 0 : tok.hashCode()); + return result; + } + + /** + * Compares node id and token through the accessors. Not synchronized + * itself: getNodeId() is already synchronized, and holding this lock while + * calling into the other token could deadlock two opposite comparisons. + */ + @Override + public boolean equals(Object obj) { + if (this == obj) { + return true; + } + if (obj == null || getClass() != obj.getClass()) { + return false; + } + NMTokenPBImpl other = (NMTokenPBImpl) obj; + NodeId id = getNodeId(); + NodeId otherId = other.getNodeId(); + if ((id == null) ? (otherId != null) : !id.equals(otherId)) { + return false; + } + Token tok = getToken(); + Token otherTok = other.getToken(); + return (tok == null) ? (otherTok == null) : tok.equals(otherTok); + } } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ApplicationMasterService.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ApplicationMasterService.java index 0c56134..779bdf5 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ApplicationMasterService.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ApplicationMasterService.java @@ -281,10 +281,16 @@ public RegisterApplicationMasterResponse registerApplicationMaster( .getMasterKey(applicationAttemptId).getEncoded())); } - List containerList = + List transferredContainers = ((AbstractYarnScheduler) rScheduler) 
.getTransferredContainers(applicationAttemptId); - response.setContainersFromPreviousAttempt(containerList); + response.setContainersFromPreviousAttempt(transferredContainers); + response.setNMTokensFromPreviousAttempts(rmContext + .getNMTokenSecretManager().createAndGetNMTokens(app.getUser(), + applicationAttemptId, transferredContainers)); + LOG.info("Application " + appID + " retrieved " + + transferredContainers.size() + " containers from previous" + + " attempts and the corresponding NM tokens."); return response; } } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/security/NMTokenSecretManagerInRM.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/security/NMTokenSecretManagerInRM.java index ab31eaf..ad1ed24 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/security/NMTokenSecretManagerInRM.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/security/NMTokenSecretManagerInRM.java @@ -31,6 +31,7 @@ import org.apache.hadoop.classification.InterfaceAudience.Private; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; +import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.api.records.Container; import org.apache.hadoop.yarn.api.records.NMToken; import org.apache.hadoop.yarn.api.records.NodeId; @@ -190,8 +191,9 @@ public void run() { LOG.debug("Sending NMToken for nodeId : " + container.getNodeId().toString() + " for application attempt : " + appAttemptId.toString()); - Token token = createNMToken(appAttemptId, container.getNodeId(), - applicationSubmitter); + 
Token token = + createNMToken(container.getId().getApplicationAttemptId(), + container.getNodeId(), applicationSubmitter); NMToken nmToken = NMToken.newInstance(container.getNodeId(), token); nmTokens.add(nmToken); @@ -205,7 +207,44 @@ public void run() { this.readLock.unlock(); } } - + + /** + * Creates NMTokens for the nodes that host the given containers and that + * are not yet recorded in the node set of appAttemptId. + * Holds the write lock because it adds to that node set. + * + * @return the newly created NMTokens; empty when no node set is registered + * for appAttemptId. + */ + public List getNMTokensFromPreviousAttempts( + String applicationSubmitter, ApplicationAttemptId appAttemptId, + List containers) { + this.writeLock.lock(); + try { + List nmTokens = new ArrayList(); + HashSet nodeSet = this.appAttemptToNodeKeyMap.get(appAttemptId); + if (nodeSet == null) { + return nmTokens; + } + for (Container container : containers) { + if (!nodeSet.contains(container.getNodeId())) { + Token token = + createNMToken(container.getId().getApplicationAttemptId(), + container.getNodeId(), applicationSubmitter); + NMToken nmToken = NMToken.newInstance(container.getNodeId(), token); + nmTokens.add(nmToken); + nodeSet.add(container.getNodeId()); + } + } + LOG.info("Application " + appAttemptId.getApplicationId() + " retrieved " + + nmTokens.size() + " NM tokens for the NMs where the" + + " containers of previous attempts were running."); + return nmTokens; + } finally { + this.writeLock.unlock(); + } + } + public void registerApplicationAttempt(ApplicationAttemptId appAttemptId) { try { this.writeLock.lock(); diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockRM.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockRM.java index 31035b4..a350b74 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockRM.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockRM.java @@ -479,6 +479,7 @@ public static void 
finishApplicationMaster(RMApp rmApp, MockRM rm, MockNM nm, public static MockAM launchAM(RMApp app, MockRM rm, MockNM nm) throws Exception { + rm.waitForState(app.getApplicationId(), RMAppState.ACCEPTED); RMAppAttempt attempt = app.getCurrentAppAttempt(); nm.nodeHeartbeat(true); MockAM am = rm.sendAMLaunched(attempt.getAppAttemptId()); diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/TestAMRestart.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/TestAMRestart.java index ca9befd..d64f489 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/TestAMRestart.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/TestAMRestart.java @@ -24,6 +24,7 @@ import junit.framework.Assert; +import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse; import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterResponse; import org.apache.hadoop.yarn.api.records.ApplicationAccessType; import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; @@ -31,6 +32,7 @@ import org.apache.hadoop.yarn.api.records.ContainerId; import org.apache.hadoop.yarn.api.records.ContainerState; import org.apache.hadoop.yarn.api.records.ContainerStatus; +import org.apache.hadoop.yarn.api.records.NMToken; import org.apache.hadoop.yarn.api.records.ResourceRequest; import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.server.resourcemanager.MockAM; @@ -232,4 +234,100 @@ public void testAMRestartWithExistingContainers() throws Exception { rm1.stop(); } + + @Test + public 
void testNMTokensRebindOnAMRestart() throws Exception { + YarnConfiguration conf = new YarnConfiguration(); + conf.setInt(YarnConfiguration.RM_AM_MAX_ATTEMPTS, 3); + + MockRM rm1 = new MockRM(conf); + rm1.start(); + RMApp app1 = + rm1.submitApp(200, "myname", "myuser", + new HashMap(), false, "default", -1, + null, "MAPREDUCE", false, true); + MockNM nm1 = + new MockNM("127.0.0.1:1234", 8000, rm1.getResourceTrackerService()); + nm1.registerNode(); + MockNM nm2 = + new MockNM("127.1.1.1:4321", 8000, rm1.getResourceTrackerService()); + nm2.registerNode(); + MockAM am1 = MockRM.launchAndRegisterAM(app1, rm1, nm1); + + int NUM_CONTAINERS = 1; + List containers = new ArrayList(); + // nmTokens keeps track of all the nmTokens issued in the allocate call. + List expectedNMTokens = new ArrayList(); + + // am1 allocate 1 container on nm1. + while (true) { + AllocateResponse response = + am1.allocate("127.0.0.1", 2000, NUM_CONTAINERS, + new ArrayList()); + nm1.nodeHeartbeat(true); + containers.addAll(response.getAllocatedContainers()); + expectedNMTokens.addAll(response.getNMTokens()); + if (containers.size() == NUM_CONTAINERS) { + break; + } + Thread.sleep(200); + System.out.println("Waiting for container to be allocated."); + } + // launch the container + nm1.nodeHeartbeat(am1.getApplicationAttemptId(), 2, ContainerState.RUNNING); + ContainerId containerId2 = + ContainerId.newInstance(am1.getApplicationAttemptId(), 2); + rm1.waitForState(nm1, containerId2, RMContainerState.RUNNING); + + // fail am1 + nm1.nodeHeartbeat(am1.getApplicationAttemptId(), 1, ContainerState.COMPLETE); + am1.waitForState(RMAppAttemptState.FAILED); + rm1.waitForState(app1.getApplicationId(), RMAppState.ACCEPTED); + + // restart the am + MockAM am2 = MockRM.launchAM(app1, rm1, nm1); + RegisterApplicationMasterResponse registerResponse = + am2.registerAppAttempt(); + rm1.waitForState(app1.getApplicationId(), RMAppState.RUNNING); + + // check am2 get the nm token from am1. 
+ Assert.assertEquals(expectedNMTokens, + registerResponse.getNMTokensFromPreviousAttempts()); + + // am2 allocate 1 container on nm2 + containers = new ArrayList(); + while (true) { + AllocateResponse allocateResponse = + am2.allocate("127.1.1.1", 4000, NUM_CONTAINERS, + new ArrayList()); + nm2.nodeHeartbeat(true); + containers.addAll(allocateResponse.getAllocatedContainers()); + expectedNMTokens.addAll(allocateResponse.getNMTokens()); + if (containers.size() == NUM_CONTAINERS) { + break; + } + Thread.sleep(200); + System.out.println("Waiting for container to be allocated."); + } + nm1.nodeHeartbeat(am2.getApplicationAttemptId(), 2, ContainerState.RUNNING); + ContainerId am2ContainerId2 = + ContainerId.newInstance(am2.getApplicationAttemptId(), 2); + rm1.waitForState(nm1, am2ContainerId2, RMContainerState.RUNNING); + + // fail am2. + nm1.nodeHeartbeat(am2.getApplicationAttemptId(), 1, ContainerState.COMPLETE); + am2.waitForState(RMAppAttemptState.FAILED); + rm1.waitForState(app1.getApplicationId(), RMAppState.ACCEPTED); + + // restart am + MockAM am3 = MockRM.launchAM(app1, rm1, nm1); + registerResponse = am3.registerAppAttempt(); + rm1.waitForState(app1.getApplicationId(), RMAppState.RUNNING); + + // check am3 get the NM token from both am1 and am2; + List transferredTokens = registerResponse.getNMTokensFromPreviousAttempts(); + Assert.assertEquals(2, transferredTokens.size()); + Assert.assertTrue(transferredTokens.containsAll(expectedNMTokens)); + rm1.stop(); + } }