diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/RegisterApplicationMasterResponse.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/RegisterApplicationMasterResponse.java index 2755a9d..2823f11 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/RegisterApplicationMasterResponse.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/RegisterApplicationMasterResponse.java @@ -29,6 +29,7 @@ import org.apache.hadoop.yarn.api.ApplicationMasterProtocol; import org.apache.hadoop.yarn.api.records.ApplicationAccessType; import org.apache.hadoop.yarn.api.records.Container; +import org.apache.hadoop.yarn.api.records.NMToken; import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.util.Records; @@ -119,6 +120,7 @@ public static RegisterApplicationMasterResponse newInstance( * * @return the list of running containers as viewed by * ResourceManager from previous application attempt + * @see RegisterApplicationMasterResponse#getNMTokensFromPreviousAttempt() */ @Public @Unstable @@ -136,4 +138,29 @@ public static RegisterApplicationMasterResponse newInstance( @Unstable public abstract void setContainersFromPreviousAttempt( List containersFromPreviousAttempt); + + /** + * Get the list of NMTokens for communicating with the NMs where the previous + * application attempt's containers are running. + * + * @return the list of NMTokens for communicating with the NMs where the + * previous application attempt's containers are running. + * + * @see RegisterApplicationMasterResponse#getContainersFromPreviousAttempt() + */ + @Public + @Stable + public abstract List getNMTokensFromPreviousAttempt(); + + /** + * Set the list of NMTokens for communicating with the NMs where the previous + * application attempt's containers are running. + * + * @param nmTokens + * the list of NMTokens for communicating with the NMs where the + * previous application attempt's containers are running. + */ + @Private + @Unstable + public abstract void setNMTokensFromPreviousAttempt(List nmTokens); } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_service_protos.proto hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_service_protos.proto index dc97eec..fbf230f 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_service_protos.proto +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_service_protos.proto @@ -45,6 +45,7 @@ message RegisterApplicationMasterResponseProto { optional bytes client_to_am_token_master_key = 2; repeated ApplicationACLMapProto application_ACLs = 3; repeated ContainerProto containers_from_previous_attempt = 4; + repeated NMTokenProto nm_tokens_from_previous_attempt = 5; } message FinishApplicationMasterRequestProto { diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/impl/pb/RegisterApplicationMasterResponsePBImpl.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/impl/pb/RegisterApplicationMasterResponsePBImpl.java index 0e593d3..3bf3deb 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/impl/pb/RegisterApplicationMasterResponsePBImpl.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/impl/pb/RegisterApplicationMasterResponsePBImpl.java @@ -31,13 +31,16 @@ import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterResponse; import org.apache.hadoop.yarn.api.records.ApplicationAccessType; import org.apache.hadoop.yarn.api.records.Container; +import org.apache.hadoop.yarn.api.records.NMToken; import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.api.records.impl.pb.ContainerPBImpl; +import org.apache.hadoop.yarn.api.records.impl.pb.NMTokenPBImpl; import org.apache.hadoop.yarn.api.records.impl.pb.ProtoUtils; import org.apache.hadoop.yarn.api.records.impl.pb.ResourcePBImpl; import org.apache.hadoop.yarn.proto.YarnProtos.ApplicationACLMapProto; import org.apache.hadoop.yarn.proto.YarnProtos.ContainerProto; import org.apache.hadoop.yarn.proto.YarnProtos.ResourceProto; +import org.apache.hadoop.yarn.proto.YarnServiceProtos.NMTokenProto; import org.apache.hadoop.yarn.proto.YarnServiceProtos.RegisterApplicationMasterResponseProto; import org.apache.hadoop.yarn.proto.YarnServiceProtos.RegisterApplicationMasterResponseProtoOrBuilder; @@ -57,6 +60,7 @@ private Resource maximumResourceCapability; private Map applicationACLS = null; private List containersFromPreviousAttempt = null; + private List nmTokens = null; public RegisterApplicationMasterResponsePBImpl() { builder = RegisterApplicationMasterResponseProto.newBuilder(); @@ -111,7 +115,12 @@ private void mergeLocalToBuilder() { addApplicationACLs(); } if (this.containersFromPreviousAttempt != null) { - addRunningContainersToProto(); + addContainersFromPreviousAttemptToProto(); + } + if (nmTokens != null) { + builder.clearNmTokensFromPreviousAttempt(); + Iterable iterable = getTokenProtoIterable(nmTokens); + builder.addAllNmTokensFromPreviousAttempt(iterable); } } @@ -240,12 +249,13 @@ public ByteBuffer getClientToAMTokenMasterKey() { if (this.containersFromPreviousAttempt != null) { return this.containersFromPreviousAttempt; } - initRunningContainersList(); + initContainersPreviousAttemptList(); return this.containersFromPreviousAttempt; } @Override - public void setContainersFromPreviousAttempt(final List containers) { + public void + setContainersFromPreviousAttempt(final List containers) { if (containers == null) { return; } @@ -253,8 +263,10 @@ public void setContainersFromPreviousAttempt(final List containers) { this.containersFromPreviousAttempt.addAll(containers); } - private void initRunningContainersList() { - RegisterApplicationMasterResponseProtoOrBuilder p = viaProto ? proto : builder; + + private void initContainersPreviousAttemptList() { + RegisterApplicationMasterResponseProtoOrBuilder p = + viaProto ? proto : builder; List list = p.getContainersFromPreviousAttemptList(); containersFromPreviousAttempt = new ArrayList(); for (ContainerProto c : list) { @@ -262,7 +274,7 @@ private void initRunningContainersList() { } } - private void addRunningContainersToProto() { + private void addContainersFromPreviousAttemptToProto() { maybeInitBuilder(); builder.clearContainersFromPreviousAttempt(); List list = new ArrayList(); @@ -271,7 +283,68 @@ private void addRunningContainersToProto() { } builder.addAllContainersFromPreviousAttempt(list); } + + + @Override + public List getNMTokensFromPreviousAttempt() { + if (nmTokens != null) { + return nmTokens; + } + initLocalNewNMTokenList(); + return nmTokens; + } + @Override + public void setNMTokensFromPreviousAttempt(final List nmTokens) { + if (nmTokens == null || nmTokens.isEmpty()) { + if (this.nmTokens != null) { + this.nmTokens.clear(); + } + builder.clearNmTokensFromPreviousAttempt(); + return; + } + this.nmTokens = new ArrayList(); + this.nmTokens.addAll(nmTokens); + } + + private synchronized void initLocalNewNMTokenList() { + RegisterApplicationMasterResponseProtoOrBuilder p = viaProto ? proto : builder; + List list = p.getNmTokensFromPreviousAttemptList(); + nmTokens = new ArrayList(); + for (NMTokenProto t : list) { + nmTokens.add(convertFromProtoFormat(t)); + } + } + + private synchronized Iterable getTokenProtoIterable( + final List nmTokenList) { + maybeInitBuilder(); + return new Iterable() { + @Override + public synchronized Iterator iterator() { + return new Iterator() { + + Iterator iter = nmTokenList.iterator(); + + @Override + public boolean hasNext() { + return iter.hasNext(); + } + + @Override + public NMTokenProto next() { + return convertToProtoFormat(iter.next()); + } + + @Override + public void remove() { + throw new UnsupportedOperationException(); + } + }; + } + }; + } + private Resource convertFromProtoFormat(ResourceProto resource) { return new ResourcePBImpl(resource); } @@ -287,4 +360,12 @@ private ContainerPBImpl convertFromProtoFormat(ContainerProto p) { private ContainerProto convertToProtoFormat(Container t) { return ((ContainerPBImpl) t).getProto(); } + + private NMTokenProto convertToProtoFormat(NMToken token) { + return ((NMTokenPBImpl) token).getProto(); + } + + private NMToken convertFromProtoFormat(NMTokenProto proto) { + return new NMTokenPBImpl(proto); + } } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ApplicationMasterService.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ApplicationMasterService.java index 761bdb1..5f306be 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ApplicationMasterService.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ApplicationMasterService.java @@ -274,10 +274,14 @@ public RegisterApplicationMasterResponse registerApplicationMaster( .getMasterKey(applicationAttemptId).getEncoded())); } - List containerList = + List transferredContainers = ((AbstractYarnScheduler) rScheduler) .getTransferredContainers(applicationAttemptId); - response.setContainersFromPreviousAttempt(containerList); + response.setContainersFromPreviousAttempt(transferredContainers); + response.setNMTokensFromPreviousAttempt(rmContext + .getNMTokenSecretManager().createAndGetNMTokens(app.getUser(), + applicationAttemptId, transferredContainers)); + return response; } }