diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/AllocateResponse.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/AllocateResponse.java index 8da0d95..01fd916 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/AllocateResponse.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/AllocateResponse.java @@ -28,6 +28,7 @@ import org.apache.hadoop.yarn.api.AMRMProtocol; import org.apache.hadoop.yarn.api.records.Container; import org.apache.hadoop.yarn.api.records.ContainerStatus; +import org.apache.hadoop.yarn.api.records.NMToken; import org.apache.hadoop.yarn.api.records.NodeReport; import org.apache.hadoop.yarn.api.records.Resource; @@ -176,5 +177,24 @@ @Private @Unstable public void setPreemptionMessage(PreemptionMessage request); + + @Public + @Stable + public void setNMTokens(List nmTokens); + + /** + * Get the list of NMTokens required for communicating with NM. New NMTokens + * issued only if + * 1) AM is receiving first container on underlying NodeManager. + * OR + * 2) NMToken master key rolled over in ResourceManager and AM is getting new + * container on the same underlying NodeManager. + * AM will receive one NMToken per NM irrespective of the number of containers + * issued on same NM. AM is expected to store these tokens until issued a + * new token for the same NM. + */ + @Public + @Stable + public List getNMTokens(); } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/impl/pb/AllocateResponsePBImpl.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/impl/pb/AllocateResponsePBImpl.java index dac8c73..9fc51cd 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/impl/pb/AllocateResponsePBImpl.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/impl/pb/AllocateResponsePBImpl.java @@ -23,15 +23,19 @@ import java.util.Iterator; import java.util.List; +import org.apache.hadoop.security.proto.SecurityProtos.TokenProto; +import org.apache.hadoop.security.proto.SecurityProtos.TokenProtoOrBuilder; import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse; import org.apache.hadoop.yarn.api.protocolrecords.PreemptionMessage; import org.apache.hadoop.yarn.api.records.Container; import org.apache.hadoop.yarn.api.records.ContainerStatus; +import org.apache.hadoop.yarn.api.records.NMToken; import org.apache.hadoop.yarn.api.records.NodeReport; import org.apache.hadoop.yarn.api.records.ProtoBase; import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.api.records.impl.pb.ContainerPBImpl; import org.apache.hadoop.yarn.api.records.impl.pb.ContainerStatusPBImpl; +import org.apache.hadoop.yarn.api.records.impl.pb.NMTokenPBImpl; import org.apache.hadoop.yarn.api.records.impl.pb.NodeReportPBImpl; import org.apache.hadoop.yarn.api.records.impl.pb.ResourcePBImpl; import org.apache.hadoop.yarn.proto.YarnProtos.ContainerProto; @@ -52,6 +56,7 @@ Resource limit; private List allocatedContainers = null; + private List nmTokens = null; private List completedContainersStatuses = null; private List updatedNodes = null; @@ -81,6 +86,11 @@ private synchronized void mergeLocalToBuilder() { getProtoIterable(this.allocatedContainers); builder.addAllAllocatedContainers(iterable); } + if (nmTokens != null) { + builder.clearNmTokens(); + Iterable iterable = getTokenProtoIterable(nmTokens); + builder.addAllNmTokens(iterable); + } if (this.completedContainersStatuses != null) { builder.clearCompletedContainerStatuses(); Iterable iterable = @@ -211,6 +221,24 @@ public synchronized void setCompletedContainersStatuses( } @Override + public void setNMTokens(List nmTokens) { + if (nmTokens == null || nmTokens.isEmpty()) { + this.nmTokens.clear(); + builder.clearNmTokens(); + return; + } + // Implementing it as an append rather than set for consistency + initLocalNewNMTokenList(); + nmTokens.addAll(nmTokens); + } + + @Override + public List getNMTokens() { + initLocalNewNMTokenList(); + return nmTokens; + } + + @Override public synchronized int getNumClusterNodes() { AllocateResponseProtoOrBuilder p = viaProto ? proto : builder; return p.getNumClusterNodes(); @@ -274,6 +302,17 @@ private synchronized void initLocalNewContainerList() { } } + private synchronized void initLocalNewNMTokenList() { + if (nmTokens != null) { + return; + } + AllocateResponseProtoOrBuilder p = viaProto ? proto : builder; + List list = p.getNmTokensList(); + for (TokenProto t : list) { + nmTokens.add(convertFromProtoFormat(t)); + } + } + private synchronized Iterable getProtoIterable( final List newContainersList) { maybeInitBuilder(); @@ -305,6 +344,35 @@ public synchronized void remove() { }; } + private synchronized Iterable getTokenProtoIterable( + final List nmTokenList) { + maybeInitBuilder(); + return new Iterable() { + @Override + public synchronized Iterator iterator() { + return new Iterator() { + + Iterator iter = nmTokenList.iterator(); + + @Override + public boolean hasNext() { + return iter.hasNext(); + } + + @Override + public TokenProto next() { + return convertToProtoFormat(iter.next()); + } + + @Override + public void remove() { + throw new UnsupportedOperationException(); + } + }; + } + }; + } + private synchronized Iterable getContainerStatusProtoIterable( final List newContainersList) { @@ -427,4 +495,12 @@ private synchronized PreemptionMessagePBImpl convertFromProtoFormat(PreemptionMe private synchronized PreemptionMessageProto convertToProtoFormat(PreemptionMessage r) { return ((PreemptionMessagePBImpl)r).getProto(); } + + private synchronized TokenProto convertToProtoFormat(NMToken token) { + return ((NMTokenPBImpl)token).getProto(); + } + + private synchronized NMToken convertFromProtoFormat(TokenProto proto) { + return new NMTokenPBImpl(proto); + } } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/NMToken.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/NMToken.java new file mode 100644 index 0000000..1c8d0cb --- /dev/null +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/NMToken.java @@ -0,0 +1,50 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.api.records; + +import org.apache.hadoop.classification.InterfaceAudience.Public; +import org.apache.hadoop.classification.InterfaceStability.Stable; + +/** + * NMToken is the security token used by the framework to verify authenticity + * of ApplicationMaster or any container trying to communicate with NodeManager. + * + *

+ * The ResourceManager issues an NMToken to AM on allocate call if + * 1) AM is receiving new container for the first time on underlying NodeManager. + * OR + * 2) AM is receiving new container on underlying NodeManager after the NMToken + * master key is rolled over in ResourceManager. + *

+ * + *

+ * ApplicationMaster needs to store the NMTokens for communicating with NM. NM + * uses these tokens for authenticating AM-NM communication. If AM receives new + * token for same NM then it should discard the older token and use latest token + * for future communication. + *

+ * + * @see {@link org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse}) + */ +@Public +@Stable + +public interface NMToken extends Token { + +} diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/NMTokenPBImpl.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/NMTokenPBImpl.java new file mode 100644 index 0000000..51d2102 --- /dev/null +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/NMTokenPBImpl.java @@ -0,0 +1,35 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.api.records.impl.pb; + +import org.apache.hadoop.security.proto.SecurityProtos.TokenProto; +import org.apache.hadoop.yarn.api.records.NMToken; + + +public class NMTokenPBImpl extends TokenPBImpl implements NMToken{ + + public NMTokenPBImpl() { + super(); + } + + public NMTokenPBImpl(TokenProto p) { + super(p); + } + +} diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_service_protos.proto hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_service_protos.proto index 6ac0274..480fe16 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_service_protos.proto +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_service_protos.proto @@ -67,6 +67,7 @@ message AllocateResponseProto { repeated NodeReportProto updated_nodes = 6; optional int32 num_cluster_nodes = 7; optional PreemptionMessageProto preempt = 8; + repeated hadoop.common.TokenProto nm_tokens = 9; } message PreemptionMessageProto {