diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/AllocateResponse.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/AllocateResponse.java index 18ce01a..844924e 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/AllocateResponse.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/AllocateResponse.java @@ -29,10 +29,10 @@ import org.apache.hadoop.yarn.api.records.AMCommand; import org.apache.hadoop.yarn.api.records.Container; import org.apache.hadoop.yarn.api.records.ContainerStatus; +import org.apache.hadoop.yarn.api.records.NMToken; import org.apache.hadoop.yarn.api.records.NodeReport; import org.apache.hadoop.yarn.api.records.PreemptionMessage; import org.apache.hadoop.yarn.api.records.Resource; -import org.apache.hadoop.yarn.api.records.Token; import org.apache.hadoop.yarn.util.Records; /** @@ -67,7 +67,7 @@ public static AllocateResponse newInstance(int responseId, List completedContainers, List allocatedContainers, List updatedNodes, Resource availResources, AMCommand command, int numClusterNodes, - PreemptionMessage preempt) { + PreemptionMessage preempt, List nmTokens) { AllocateResponse response = Records.newRecord(AllocateResponse.class); response.setNumClusterNodes(numClusterNodes); response.setResponseId(responseId); @@ -77,6 +77,7 @@ public static AllocateResponse newInstance(int responseId, response.setAvailableResources(availResources); response.setAMCommand(command); response.setPreemptionMessage(preempt); + response.setNMTokens(nmTokens); return response; } @@ -202,7 +203,7 @@ public static AllocateResponse newInstance(int responseId, @Public @Stable - public abstract void setNMTokens(List nmTokens); + public abstract void setNMTokens(List nmTokens); /** * Get the list of NMTokens required for communicating with NM. New NMTokens @@ -217,6 +218,6 @@ public static AllocateResponse newInstance(int responseId, */ @Public @Stable - public abstract List getNMTokens(); + public abstract List getNMTokens(); } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/impl/pb/AllocateResponsePBImpl.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/impl/pb/AllocateResponsePBImpl.java index 311bba5..112080f 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/impl/pb/AllocateResponsePBImpl.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/impl/pb/AllocateResponsePBImpl.java @@ -23,21 +23,20 @@ import java.util.Iterator; import java.util.List; -import org.apache.hadoop.security.proto.SecurityProtos.TokenProto; import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse; import org.apache.hadoop.yarn.api.records.AMCommand; import org.apache.hadoop.yarn.api.records.Container; import org.apache.hadoop.yarn.api.records.ContainerStatus; +import org.apache.hadoop.yarn.api.records.NMToken; import org.apache.hadoop.yarn.api.records.NodeReport; import org.apache.hadoop.yarn.api.records.PreemptionMessage; import org.apache.hadoop.yarn.api.records.Resource; -import org.apache.hadoop.yarn.api.records.Token; import org.apache.hadoop.yarn.api.records.impl.pb.ContainerPBImpl; import org.apache.hadoop.yarn.api.records.impl.pb.ContainerStatusPBImpl; +import org.apache.hadoop.yarn.api.records.impl.pb.NMTokenPBImpl; import org.apache.hadoop.yarn.api.records.impl.pb.NodeReportPBImpl; import org.apache.hadoop.yarn.api.records.impl.pb.PreemptionMessagePBImpl; import org.apache.hadoop.yarn.api.records.impl.pb.ResourcePBImpl; -import org.apache.hadoop.yarn.api.records.impl.pb.TokenPBImpl; import org.apache.hadoop.yarn.proto.YarnProtos.ContainerProto; import org.apache.hadoop.yarn.proto.YarnProtos.ContainerStatusProto; import org.apache.hadoop.yarn.proto.YarnProtos.NodeReportProto; @@ -46,6 +45,7 @@ import org.apache.hadoop.yarn.proto.YarnServiceProtos.AllocateResponseProto; import org.apache.hadoop.yarn.proto.YarnServiceProtos.AllocateResponseProtoOrBuilder; import org.apache.hadoop.yarn.util.ProtoUtils; +import org.apache.hadoop.yarn.proto.YarnServiceProtos.NMTokenProto; public class AllocateResponsePBImpl extends AllocateResponse { @@ -56,7 +56,7 @@ Resource limit; private List allocatedContainers = null; - private List nmTokens = null; + private List nmTokens = null; private List completedContainersStatuses = null; private List updatedNodes = null; @@ -108,7 +108,7 @@ private synchronized void mergeLocalToBuilder() { } if (nmTokens != null) { builder.clearNmTokens(); - Iterable iterable = getTokenProtoIterable(nmTokens); + Iterable iterable = getTokenProtoIterable(nmTokens); builder.addAllNmTokens(iterable); } if (this.completedContainersStatuses != null) { @@ -248,9 +248,11 @@ public synchronized void setCompletedContainersStatuses( } @Override - public synchronized void setNMTokens(List nmTokens) { + public synchronized void setNMTokens(List nmTokens) { if (nmTokens == null || nmTokens.isEmpty()) { - this.nmTokens.clear(); + if (this.nmTokens != null) { + this.nmTokens.clear(); + } builder.clearNmTokens(); return; } @@ -260,7 +262,7 @@ public synchronized void setNMTokens(List nmTokens) { } @Override - public synchronized List getNMTokens() { + public synchronized List getNMTokens() { initLocalNewNMTokenList(); return nmTokens; } @@ -334,9 +336,9 @@ private synchronized void initLocalNewNMTokenList() { return; } AllocateResponseProtoOrBuilder p = viaProto ? proto : builder; - List list = p.getNmTokensList(); - nmTokens = new ArrayList(); - for (TokenProto t : list) { + List list = p.getNmTokensList(); + nmTokens = new ArrayList(); + for (NMTokenProto t : list) { nmTokens.add(convertFromProtoFormat(t)); } } @@ -372,15 +374,15 @@ public synchronized void remove() { }; } - private synchronized Iterable getTokenProtoIterable( - final List nmTokenList) { + private synchronized Iterable getTokenProtoIterable( + final List nmTokenList) { maybeInitBuilder(); - return new Iterable() { + return new Iterable() { @Override - public synchronized Iterator iterator() { - return new Iterator() { + public synchronized Iterator iterator() { + return new Iterator() { - Iterator iter = nmTokenList.iterator(); + Iterator iter = nmTokenList.iterator(); @Override public boolean hasNext() { @@ -388,7 +390,7 @@ public boolean hasNext() { } @Override - public TokenProto next() { + public NMTokenProto next() { return convertToProtoFormat(iter.next()); } @@ -524,11 +526,11 @@ private synchronized PreemptionMessageProto convertToProtoFormat(PreemptionMessa return ((PreemptionMessagePBImpl)r).getProto(); } - private synchronized TokenProto convertToProtoFormat(Token token) { - return ((TokenPBImpl)token).getProto(); + private synchronized NMTokenProto convertToProtoFormat(NMToken token) { + return ((NMTokenPBImpl)token).getProto(); } - private synchronized Token convertFromProtoFormat(TokenProto proto) { - return new TokenPBImpl(proto); + private synchronized NMToken convertFromProtoFormat(NMTokenProto proto) { + return new NMTokenPBImpl(proto); } } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/NMToken.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/NMToken.java new file mode 100644 index 0000000..a901447 --- /dev/null +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/NMToken.java @@ -0,0 +1,54 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.api.records; + +import org.apache.hadoop.classification.InterfaceAudience.Private; +import org.apache.hadoop.classification.InterfaceAudience.Public; +import org.apache.hadoop.classification.InterfaceStability.Stable; +import org.apache.hadoop.yarn.util.Records; + +/** + * NMToken is returned by RM on AllocateResponse. + */ +public abstract class NMToken { + + @Public + @Stable + public abstract NodeId getNodeId(); + + @Public + @Stable + public abstract void setNodeId(NodeId nodeId); + + @Public + @Stable + public abstract Token getToken(); + + @Public + @Stable + public abstract void setToken(Token token); + + @Private + public static NMToken newInstance(NodeId nodeId, Token token) { + NMToken nmToken = Records.newRecord(NMToken.class); + nmToken.setNodeId(nodeId); + nmToken.setToken(token); + return nmToken; + } +} diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/NMTokenPBImpl.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/NMTokenPBImpl.java new file mode 100644 index 0000000..28876aa --- /dev/null +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/NMTokenPBImpl.java @@ -0,0 +1,139 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.api.records.impl.pb; + +import org.apache.hadoop.security.proto.SecurityProtos.TokenProto; +import org.apache.hadoop.yarn.api.records.NMToken; +import org.apache.hadoop.yarn.api.records.NodeId; +import org.apache.hadoop.yarn.api.records.Token; +import org.apache.hadoop.yarn.proto.YarnProtos.NodeIdProto; +import org.apache.hadoop.yarn.proto.YarnServiceProtos.NMTokenProto; +import org.apache.hadoop.yarn.proto.YarnServiceProtos.NMTokenProtoOrBuilder; + + +public class NMTokenPBImpl extends NMToken{ + + NMTokenProto proto = NMTokenProto.getDefaultInstance(); + NMTokenProto.Builder builder = null; + boolean viaProto = false; + + private Token token = null; + private NodeId nodeId = null; + + public NMTokenPBImpl() { + builder = NMTokenProto.newBuilder(); + } + + public NMTokenPBImpl(NMTokenProto proto) { + this.proto = proto; + viaProto = true; + } + + @Override + public synchronized NodeId getNodeId() { + NMTokenProtoOrBuilder p = viaProto ? proto : builder; + if (this.nodeId != null) { + return nodeId; + } + if (!p.hasNodeId()) { + return null; + } + this.nodeId = convertFromProtoFormat(p.getNodeId()); + return nodeId; + } + + @Override + public synchronized void setNodeId(NodeId nodeId) { + maybeInitBuilder(); + if (nodeId == null) { + builder.clearNodeId(); + } + this.nodeId = nodeId; + } + + @Override + public synchronized Token getToken() { + NMTokenProtoOrBuilder p = viaProto ? proto : builder; + if (this.token != null) { + return this.token; + } + if (!p.hasToken()) { + return null; + } + this.token = convertFromProtoFormat(p.getToken()); + return token; + } + + @Override + public synchronized void setToken(Token token) { + maybeInitBuilder(); + if (token == null) { + builder.clearToken(); + } + this.token = token; + } + + public synchronized NMTokenProto getProto() { + mergeLocalToProto(); + proto = viaProto ? proto : builder.build(); + viaProto = true; + return proto; + } + + private synchronized void mergeLocalToProto() { + if (viaProto) { + maybeInitBuilder(); + } + mergeLocalToBuilder(); + proto = builder.build(); + viaProto = true; + } + + private synchronized void mergeLocalToBuilder() { + if (this.nodeId != null) { + builder.setNodeId(convertToProtoFormat(nodeId)); + } + if (this.token != null) { + builder.setToken(convertToProtoFormat(token)); + } + } + + private synchronized void maybeInitBuilder() { + if(viaProto || builder == null) { + builder = NMTokenProto.newBuilder(proto); + } + viaProto = false; + } + + private synchronized NodeId convertFromProtoFormat(NodeIdProto p) { + return new NodeIdPBImpl(p); + } + + private synchronized NodeIdProto convertToProtoFormat(NodeId nodeId) { + return ((NodeIdPBImpl)nodeId).getProto(); + } + + private synchronized TokenProto convertToProtoFormat(Token token) { + return ((TokenPBImpl)token).getProto(); + } + + private synchronized Token convertFromProtoFormat(TokenProto proto) { + return new TokenPBImpl(proto); + } +} diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_service_protos.proto hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_service_protos.proto index 8e81f21..d697fb5 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_service_protos.proto +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_service_protos.proto @@ -59,6 +59,11 @@ message AllocateRequestProto { optional float progress = 6; } +message NMTokenProto { + optional NodeIdProto nodeId = 1; + optional hadoop.common.TokenProto token = 2; +} + message AllocateResponseProto { optional AMCommandProto a_m_command = 1; optional int32 response_id = 2; @@ -68,7 +73,7 @@ message AllocateResponseProto { repeated NodeReportProto updated_nodes = 6; optional int32 num_cluster_nodes = 7; optional PreemptionMessageProto preempt = 8; - repeated hadoop.common.TokenProto nm_tokens = 9; + repeated NMTokenProto nm_tokens = 9; } ////////////////////////////////////////////////////// diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/AMRMClient.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/AMRMClient.java index 71c59f9..5a0bc50 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/AMRMClient.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/AMRMClient.java @@ -21,15 +21,17 @@ import java.io.IOException; import java.util.Collection; import java.util.List; +import java.util.concurrent.ConcurrentMap; import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.classification.InterfaceStability; -import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterResponse; import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse; +import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterResponse; import org.apache.hadoop.yarn.api.records.ContainerId; import org.apache.hadoop.yarn.api.records.FinalApplicationStatus; import org.apache.hadoop.yarn.api.records.Priority; import org.apache.hadoop.yarn.api.records.Resource; +import org.apache.hadoop.yarn.api.records.Token; import org.apache.hadoop.yarn.exceptions.YarnException; import org.apache.hadoop.yarn.service.Service; @@ -208,4 +210,13 @@ public void unregisterApplicationMaster(FinalApplicationStatus appStatus, String resourceName, Resource capability); + /** + * It returns the NMToken received on allocate call. It will not communicate + * with RM to get NMTokens. On allocate call whenever we receive new token + * along with container AMRMClient will cache this NMToken per node manager. + * This map returned should be shared with any application which is + * communicating with NodeManager (ex. NMClient) using NMTokens. If a new + * NMToken is received for the same node manager then it will be replaced. + */ + public ConcurrentMap getNMTokens(); } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/AMRMClientAsync.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/AMRMClientAsync.java index 47a405c..425c984 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/AMRMClientAsync.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/AMRMClientAsync.java @@ -22,6 +22,7 @@ import java.util.Collection; import java.util.List; import java.util.concurrent.BlockingQueue; +import java.util.concurrent.ConcurrentMap; import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.atomic.AtomicInteger; @@ -41,6 +42,7 @@ import org.apache.hadoop.yarn.api.records.NodeReport; import org.apache.hadoop.yarn.api.records.Priority; import org.apache.hadoop.yarn.api.records.Resource; +import org.apache.hadoop.yarn.api.records.Token; import org.apache.hadoop.yarn.client.AMRMClient.ContainerRequest; import org.apache.hadoop.yarn.exceptions.YarnException; import org.apache.hadoop.yarn.exceptions.YarnRuntimeException; @@ -260,6 +262,19 @@ public Resource getClusterAvailableResources() { public int getClusterNodeCount() { return client.getClusterNodeCount(); } + + /** + * It returns the NMToken received on allocate call. It will not communicate + * with RM to get NMTokens. On allocate call whenever we receive new token + * along with new container AMRMClientAsync will cache this NMToken per node + * manager. This map returned should be shared with any application which is + * communicating with NodeManager (ex. NMClient / NMClientAsync) using + * NMTokens. If a new NMToken is received for the same node manager + * then it will be replaced. + */ + public ConcurrentMap getNMTokens() { + return client.getNMTokens(); + } private class HeartbeatThread extends Thread { public HeartbeatThread() { diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/AMRMClientImpl.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/AMRMClientImpl.java index 86718b0..ac392d8 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/AMRMClientImpl.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/AMRMClientImpl.java @@ -33,9 +33,11 @@ import java.util.SortedMap; import java.util.TreeMap; import java.util.TreeSet; +import java.util.concurrent.ConcurrentHashMap; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.classification.InterfaceAudience.Private; import org.apache.hadoop.classification.InterfaceStability.Unstable; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.ipc.RPC; @@ -49,9 +51,11 @@ import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; import org.apache.hadoop.yarn.api.records.ContainerId; import org.apache.hadoop.yarn.api.records.FinalApplicationStatus; +import org.apache.hadoop.yarn.api.records.NMToken; import org.apache.hadoop.yarn.api.records.Priority; import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.api.records.ResourceRequest; +import org.apache.hadoop.yarn.api.records.Token; import org.apache.hadoop.yarn.client.AMRMClient.ContainerRequest; import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.exceptions.YarnException; @@ -61,6 +65,8 @@ import org.apache.hadoop.yarn.ipc.YarnRPC; import org.apache.hadoop.yarn.service.AbstractService; +import com.google.common.annotations.VisibleForTesting; + // TODO check inputs for null etc. YARN-654 @Unstable @@ -73,6 +79,7 @@ RecordFactoryProvider.getRecordFactory(null); private int lastResponseId = 0; + private ConcurrentHashMap nmTokens; protected AMRMProtocol rmClient; protected final ApplicationAttemptId appAttemptId; @@ -148,6 +155,7 @@ static boolean canFit(Resource arg0, Resource arg1) { public AMRMClientImpl(ApplicationAttemptId appAttemptId) { super(AMRMClientImpl.class.getName()); this.appAttemptId = appAttemptId; + this.nmTokens = new ConcurrentHashMap(); } @Override @@ -238,6 +246,9 @@ public AllocateResponse allocate(float progressIndicator) clusterNodeCount = allocateResponse.getNumClusterNodes(); lastResponseId = allocateResponse.getResponseId(); clusterAvailableResources = allocateResponse.getAvailableResources(); + if (!allocateResponse.getNMTokens().isEmpty()) { + populateNMTokens(allocateResponse); + } } } finally { // TODO how to differentiate remote yarn exception vs error in rpc @@ -265,6 +276,20 @@ public AllocateResponse allocate(float progressIndicator) return allocateResponse; } + @Private + @VisibleForTesting + protected void populateNMTokens(AllocateResponse allocateResponse) { + for (NMToken token : allocateResponse.getNMTokens()) { + String nodeId = token.getNodeId().toString(); + if (nmTokens.containsKey(nodeId)) { + LOG.debug("Replacing token for : " + nodeId); + } else { + LOG.debug("Received new token for : " + nodeId); + } + nmTokens.put(nodeId, token.getToken()); + } + } + @Override public void unregisterApplicationMaster(FinalApplicationStatus appStatus, String appMessage, String appTrackingUrl) throws YarnException, @@ -512,4 +537,8 @@ private void decResourceRequest(Priority priority, } } + @Override + public ConcurrentHashMap getNMTokens() { + return nmTokens; + } } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/TestAMRMClient.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/TestAMRMClient.java index 6f3bbe1..0db5eab 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/TestAMRMClient.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/TestAMRMClient.java @@ -25,9 +25,14 @@ import java.io.IOException; import java.util.Collection; +import java.util.HashMap; +import java.util.Iterator; import java.util.List; import java.util.Set; import java.util.TreeSet; +import java.util.concurrent.ConcurrentHashMap; + +import junit.framework.Assert; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.yarn.api.AMRMProtocol; @@ -49,6 +54,7 @@ import org.apache.hadoop.yarn.api.records.Priority; import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.api.records.ResourceRequest; +import org.apache.hadoop.yarn.api.records.Token; import org.apache.hadoop.yarn.api.records.YarnApplicationState; import org.apache.hadoop.yarn.client.AMRMClient.ContainerRequest; import org.apache.hadoop.yarn.client.AMRMClient.StoredContainerRequest; @@ -437,6 +443,11 @@ private void testAllocation(final AMRMClientImpl amClient) int allocatedContainerCount = 0; int iterationsLeft = 2; Set releases = new TreeSet(); + + ConcurrentHashMap nmTokens = amClient.getNMTokens(); + Assert.assertEquals(0, nmTokens.size()); + HashMap receivedNMTokens = new HashMap(); + while (allocatedContainerCount < containersRequestedAny && iterationsLeft-- > 0) { AllocateResponse allocResponse = amClient.allocate(0.1f); @@ -450,12 +461,32 @@ private void testAllocation(final AMRMClientImpl amClient) releases.add(rejectContainerId); amClient.releaseAssignedContainer(rejectContainerId); } + Assert.assertEquals(nmTokens.size(), amClient.getNMTokens().size()); + Iterator nodeI = nmTokens.keySet().iterator(); + while (nodeI.hasNext()) { + String nodeId = nodeI.next(); + if (!receivedNMTokens.containsKey(nodeId)) { + receivedNMTokens.put(nodeId, nmTokens.get(nodeId)); + } else { + Assert.fail("Received token again for : " + nodeId); + } + } + nodeI = receivedNMTokens.keySet().iterator(); + while (nodeI.hasNext()) { + nmTokens.remove(nodeI.next()); + } + if(allocatedContainerCount < containersRequestedAny) { // sleep to let NM's heartbeat to RM and trigger allocations sleep(1000); } } - + + Assert.assertEquals(0, amClient.getNMTokens().size()); + // Should receive atleast 1 token + Assert.assertTrue(receivedNMTokens.size() > 0 + && receivedNMTokens.size() <= nodeCount); + assertTrue(allocatedContainerCount == containersRequestedAny); assertTrue(amClient.release.size() == 2); assertTrue(amClient.ask.size() == 0); @@ -523,7 +554,6 @@ public AllocateResponse answer(InvocationOnMock invocation) sleep(1000); } } - assertTrue(amClient.ask.size() == 0); assertTrue(amClient.release.size() == 0); } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/TestAMRMClientAsync.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/TestAMRMClientAsync.java index c5c687c..dd8a1c9 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/TestAMRMClientAsync.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/TestAMRMClientAsync.java @@ -43,6 +43,7 @@ import org.apache.hadoop.yarn.api.records.ContainerId; import org.apache.hadoop.yarn.api.records.ContainerState; import org.apache.hadoop.yarn.api.records.ContainerStatus; +import org.apache.hadoop.yarn.api.records.NMToken; import org.apache.hadoop.yarn.api.records.NodeReport; import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.client.AMRMClient.ContainerRequest; @@ -66,11 +67,11 @@ public void testAMRMClientAsync() throws Exception { List allocated1 = Arrays.asList( Container.newInstance(null, null, null, null, null, null)); final AllocateResponse response1 = createAllocateResponse( - new ArrayList(), allocated1); + new ArrayList(), allocated1, null); final AllocateResponse response2 = createAllocateResponse(completed1, - new ArrayList()); + new ArrayList(), null); final AllocateResponse emptyResponse = createAllocateResponse( - new ArrayList(), new ArrayList()); + new ArrayList(), new ArrayList(), null); TestCallbackHandler callbackHandler = new TestCallbackHandler(); final AMRMClient client = mock(AMRMClientImpl.class); @@ -146,7 +147,7 @@ public Resource answer(InvocationOnMock invocation) Assert.assertEquals(null, callbackHandler.takeAllocatedContainers()); Assert.assertEquals(null, callbackHandler.takeCompletedContainers()); } - + @Test(timeout=10000) public void testAMRMClientAsyncException() throws Exception { Configuration conf = new Configuration(); @@ -189,7 +190,7 @@ public void testAMRMClientAsyncReboot() throws Exception { AMRMClient client = mock(AMRMClientImpl.class); final AllocateResponse rebootResponse = createAllocateResponse( - new ArrayList(), new ArrayList()); + new ArrayList(), new ArrayList(), null); rebootResponse.setAMCommand(AMCommand.AM_RESYNC); when(client.allocate(anyFloat())).thenReturn(rebootResponse); @@ -215,9 +216,11 @@ public void testAMRMClientAsyncReboot() throws Exception { } private AllocateResponse createAllocateResponse( - List completed, List allocated) { - AllocateResponse response = AllocateResponse.newInstance(0, completed, allocated, - new ArrayList(), null, null, 1, null); + List completed, List allocated, + List nmTokens) { + AllocateResponse response = + AllocateResponse.newInstance(0, completed, allocated, + new ArrayList(), null, null, 1, null, nmTokens); return response; } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/security/NMTokenIdentifier.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/security/NMTokenIdentifier.java index 3a93071..958a1c5 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/security/NMTokenIdentifier.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/security/NMTokenIdentifier.java @@ -21,15 +21,19 @@ import java.io.DataInput; import java.io.DataOutput; import java.io.IOException; +import java.net.InetSocketAddress; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.io.Text; +import org.apache.hadoop.net.NetUtils; +import org.apache.hadoop.security.SecurityUtil; import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.security.token.TokenIdentifier; import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.api.records.NodeId; +import org.apache.hadoop.yarn.api.records.Token; public class NMTokenIdentifier extends TokenIdentifier { @@ -106,5 +110,4 @@ public Text getKind() { public UserGroupInformation getUser() { return UserGroupInformation.createRemoteUser(appAttemptId.toString()); } - } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/security/BaseContainerTokenSecretManager.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/security/BaseContainerTokenSecretManager.java index 0a35989..ccfe8f5 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/security/BaseContainerTokenSecretManager.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/security/BaseContainerTokenSecretManager.java @@ -18,14 +18,11 @@ package org.apache.hadoop.yarn.server.security; -import java.nio.ByteBuffer; import java.security.SecureRandom; import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReadWriteLock; import java.util.concurrent.locks.ReentrantReadWriteLock; -import javax.crypto.SecretKey; - import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.classification.InterfaceAudience.Private; @@ -34,7 +31,6 @@ import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.security.ContainerTokenIdentifier; import org.apache.hadoop.yarn.server.api.records.MasterKey; -import org.apache.hadoop.yarn.util.Records; /** * SecretManager for ContainerTokens. Extended by both RM and NM and hence is diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/security/BaseNMTokenSecretManager.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/security/BaseNMTokenSecretManager.java index b483d02..1094541 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/security/BaseNMTokenSecretManager.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/security/BaseNMTokenSecretManager.java @@ -114,39 +114,4 @@ public MasterKey getCurrentKey() { public NMTokenIdentifier createIdentifier() { return new NMTokenIdentifier(); } - - /** - * Helper function for creating NMTokens. - */ - public Token createNMToken(ApplicationAttemptId applicationAttemptId, - NodeId nodeId, String applicationSubmitter) { - byte[] password; - NMTokenIdentifier identifier; - - this.readLock.lock(); - try { - identifier = - new NMTokenIdentifier(applicationAttemptId, nodeId, - applicationSubmitter, this.currentMasterKey.getMasterKey() - .getKeyId()); - password = this.createPassword(identifier); - } finally { - this.readLock.unlock(); - } - return newNMToken(password, identifier); - } - - public static Token newNMToken(byte[] password, - NMTokenIdentifier identifier) { - NodeId nodeId = identifier.getNodeId(); - // RPC layer client expects ip:port as service for tokens - InetSocketAddress addr = - NetUtils.createSocketAddrForHost(nodeId.getHost(), nodeId.getPort()); - Token nmToken = - Token.newInstance(identifier.getBytes(), - NMTokenIdentifier.KIND.toString(), password, SecurityUtil - .buildTokenService(addr).toString()); - return nmToken; - - } } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ApplicationMasterService.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ApplicationMasterService.java index 4a86c2e..410fc8a 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ApplicationMasterService.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ApplicationMasterService.java @@ -373,6 +373,12 @@ public AllocateResponse allocate(AllocateRequest request) // add preemption to the allocateResponse message (if any) allocateResponse.setPreemptionMessage(generatePreemptionMessage(allocation)); + // Adding NMTokens for allocated containers. + if (!allocation.getContainers().isEmpty()) { + allocateResponse.setNMTokens(rmContext.getNMTokenSecretManager() + .getNMTokens(app.getUser(), appAttemptId, + allocation.getContainers())); + } return allocateResponse; } } @@ -433,12 +439,15 @@ public void registerAppAttempt(ApplicationAttemptId attemptId) { AllocateResponse response = recordFactory.newRecordInstance(AllocateResponse.class); response.setResponseId(0); - LOG.info("Registering " + attemptId); + LOG.info("Registering app attempt : " + attemptId); responseMap.put(attemptId, response); + rmContext.getNMTokenSecretManager().registerApplicationAttempt(attemptId); } public void unregisterAttempt(ApplicationAttemptId attemptId) { + LOG.info("Unregistering app attempt : " + attemptId); responseMap.remove(attemptId); + rmContext.getNMTokenSecretManager().unregisterApplicationAttempt(attemptId); } public void refreshServiceAcls(Configuration configuration, diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceTrackerService.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceTrackerService.java index 0036880..d011739 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceTrackerService.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceTrackerService.java @@ -218,7 +218,9 @@ public RegisterNodeManagerResponse registerNodeManager( this.rmContext.getDispatcher().getEventHandler().handle( new RMNodeReconnectEvent(nodeId, rmNode)); } - + // On every node manager register we will be clearing NMToken keys if + // present for any running application. + this.nmTokenSecretManager.removeNodeKey(nodeId); this.nmLivelinessMonitor.register(nodeId); String message = diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/security/NMTokenSecretManagerInRM.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/security/NMTokenSecretManagerInRM.java index 6046f24..3931c6c 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/security/NMTokenSecretManagerInRM.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/security/NMTokenSecretManagerInRM.java @@ -18,18 +18,35 @@ package org.apache.hadoop.yarn.server.resourcemanager.security; +import java.net.InetSocketAddress; +import java.util.ArrayList; +import java.util.HashSet; +import java.util.Iterator; +import java.util.List; import java.util.Timer; import java.util.TimerTask; +import java.util.concurrent.ConcurrentHashMap; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.classification.InterfaceAudience.Private; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.net.NetUtils; +import org.apache.hadoop.security.SecurityUtil; +import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; +import org.apache.hadoop.yarn.api.records.Container; +import org.apache.hadoop.yarn.api.records.NMToken; +import org.apache.hadoop.yarn.api.records.NodeId; +import org.apache.hadoop.yarn.api.records.Token; +import org.apache.hadoop.yarn.api.records.impl.pb.NMTokenPBImpl; import org.apache.hadoop.yarn.conf.YarnConfiguration; +import org.apache.hadoop.yarn.security.NMTokenIdentifier; import org.apache.hadoop.yarn.server.api.records.MasterKey; import org.apache.hadoop.yarn.server.security.BaseNMTokenSecretManager; import org.apache.hadoop.yarn.server.security.MasterKeyData; +import com.google.common.annotations.VisibleForTesting; + public class NMTokenSecretManagerInRM extends BaseNMTokenSecretManager { @@ -42,6 +59,7 @@ private final Timer timer; private final long rollingInterval; private final long activationDelay; + private final ConcurrentHashMap> appAttemptToNodeKeyMap; public NMTokenSecretManagerInRM(Configuration conf) { this.conf = conf; @@ -70,6 +88,8 @@ public NMTokenSecretManagerInRM(Configuration conf) { + " should be more than 2 X " + YarnConfiguration.RM_NM_EXPIRY_INTERVAL_MS); } + appAttemptToNodeKeyMap = + new ConcurrentHashMap>(); } /** @@ -119,11 +139,23 @@ public void activateNextMasterKey() { + this.nextMasterKey.getMasterKey().getKeyId()); this.currentMasterKey = this.nextMasterKey; this.nextMasterKey = null; + clearApplicationNMTokenKeys(); } finally { super.writeLock.unlock(); } } + private void clearApplicationNMTokenKeys() { + // We should clear all node entries from this set. + // TODO : Once we have per node master key then it will change to only + // remove specific node from it. + Iterator> nodeSetI = + this.appAttemptToNodeKeyMap.values().iterator(); + while (nodeSetI.hasNext()) { + nodeSetI.next().clear(); + } + } + public void start() { rollMasterKey(); this.timer.scheduleAtFixedRate(new MasterKeyRoller(), rollingInterval, @@ -150,4 +182,129 @@ public void run() { activateNextMasterKey(); } } + + public List getNMTokens(String applicationSubmitter, + ApplicationAttemptId appAttemptId, List containers) { + try { + this.readLock.lock(); + List nmTokens = new ArrayList(); + HashSet nodeSet = this.appAttemptToNodeKeyMap.get(appAttemptId); + if (nodeSet != null) { + for (Container container : containers) { + if (!nodeSet.contains(container.getNodeId())) { + LOG.debug("Sending NMToken for nodeId : " + + container.getNodeId().toString()); + Token token = createNMToken(appAttemptId, container.getNodeId(), + applicationSubmitter); + NMToken nmToken = + NMToken.newInstance(container.getNodeId(), token); + nmTokens.add(nmToken); + nodeSet.add(container.getNodeId()); + } + } + } + return nmTokens; + } finally { + this.readLock.unlock(); + } + } + + public void registerApplicationAttempt(ApplicationAttemptId appAttemptId) { + try { + this.writeLock.lock(); + this.appAttemptToNodeKeyMap.put(appAttemptId, new HashSet()); + } finally { + this.writeLock.unlock(); + } + } + + @Private + @VisibleForTesting + public boolean isApplicationAttemptRegistered( + ApplicationAttemptId appAttemptId) { + try { + this.readLock.lock(); + return this.appAttemptToNodeKeyMap.containsKey(appAttemptId); + } finally { + this.readLock.unlock(); + } + } + + @Private + @VisibleForTesting + public boolean isApplicationAttemptNMTokenPresent( + ApplicationAttemptId appAttemptId, NodeId nodeId) { + try { + this.readLock.lock(); + HashSet nodes = this.appAttemptToNodeKeyMap.get(appAttemptId); + if (nodes != null && nodes.contains(nodeId)) { + return true; + } else { + return false; + } + } finally { + this.readLock.unlock(); + } + } + + public void unregisterApplicationAttempt(ApplicationAttemptId appAttemptId) { + try { + this.writeLock.lock(); + this.appAttemptToNodeKeyMap.remove(appAttemptId); + } finally { + this.writeLock.unlock(); + } + } + + /** + * This is to be called when NodeManager reconnects or goes down. This will + * remove if NMTokens if present for any running application from cache. + * @param nodeId + */ + public void removeNodeKey(NodeId nodeId) { + try { + this.writeLock.lock(); + Iterator> appNodeKeySetIterator = + this.appAttemptToNodeKeyMap.values().iterator(); + while (appNodeKeySetIterator.hasNext()) { + appNodeKeySetIterator.next().remove(nodeId); + } + } finally { + this.writeLock.unlock(); + } + } + + public static Token newNMToken(byte[] password, + NMTokenIdentifier identifier) { + NodeId nodeId = identifier.getNodeId(); + // RPC layer client expects ip:port as service for tokens + InetSocketAddress addr = + NetUtils.createSocketAddrForHost(nodeId.getHost(), nodeId.getPort()); + Token nmToken = + Token.newInstance(identifier.getBytes(), + NMTokenIdentifier.KIND.toString(), password, SecurityUtil + .buildTokenService(addr).toString()); + return nmToken; + } + + /** + * Helper function for creating NMTokens. + */ + public Token createNMToken(ApplicationAttemptId applicationAttemptId, + NodeId nodeId, String applicationSubmitter) { + byte[] password; + NMTokenIdentifier identifier; + + this.readLock.lock(); + try { + identifier = + new NMTokenIdentifier(applicationAttemptId, nodeId, + applicationSubmitter, this.currentMasterKey.getMasterKey() + .getKeyId()); + password = this.createPassword(identifier); + } finally { + this.readLock.unlock(); + } + return newNMToken(password, identifier); + } } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockAM.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockAM.java index 7fe0711..a110597 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockAM.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockAM.java @@ -65,12 +65,12 @@ public void waitForState(RMAppAttemptState finalState) throws Exception { RMAppAttempt attempt = app.getRMAppAttempt(attemptId); int timeoutSecs = 0; while (!finalState.equals(attempt.getAppAttemptState()) - && timeoutSecs++ < 20) { + && timeoutSecs++ < 40) { System.out .println("AppAttempt : " + attemptId + " State is : " + attempt.getAppAttemptState() + " Waiting for state : " + finalState); - Thread.sleep(500); + Thread.sleep(1000); } System.out.println("AppAttempt State is : " + attempt.getAppAttemptState()); Assert.assertEquals("AppAttempt state is not correct (timedout)", diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRM.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRM.java index 71d6d7f..295382e 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRM.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRM.java @@ -19,22 +19,27 @@ package org.apache.hadoop.yarn.server.resourcemanager; import java.util.ArrayList; +import java.util.HashMap; import java.util.List; import junit.framework.Assert; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse; import org.apache.hadoop.yarn.api.protocolrecords.GetNewApplicationResponse; import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; import org.apache.hadoop.yarn.api.records.Container; import org.apache.hadoop.yarn.api.records.ContainerId; import org.apache.hadoop.yarn.api.records.ContainerState; +import org.apache.hadoop.yarn.api.records.NMToken; import org.apache.hadoop.yarn.api.records.ResourceRequest; +import org.apache.hadoop.yarn.api.records.Token; import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptState; +import org.apache.hadoop.yarn.server.resourcemanager.security.NMTokenSecretManagerInRM; import org.apache.log4j.Level; import org.apache.log4j.LogManager; import org.apache.log4j.Logger; @@ -136,6 +141,184 @@ public void testAppOnMultiNode() throws Exception { rm.stop(); } + + @Test + public void testNMToken() throws Exception { + MockRM rm = new MockRM(); + try { + rm.start(); + MockNM nm1 = rm.registerNode("h1:1234", 10000); + + NMTokenSecretManagerInRM nmTokenSecretManager = + rm.getRMContext().getNMTokenSecretManager(); + + // submitting new application + RMApp app = rm.submitApp(1000); + + // start scheduling. + nm1.nodeHeartbeat(true); + + // Starting application attempt and launching + // It should get registered with NMTokenSecretManager. + RMAppAttempt attempt = app.getCurrentAppAttempt(); + + MockAM am = rm.sendAMLaunched(attempt.getAppAttemptId()); + + Assert.assertTrue(nmTokenSecretManager + .isApplicationAttemptRegistered(attempt.getAppAttemptId())); + + // This will register application master. + am.registerAppAttempt(); + + ArrayList containersReceivedForNM1 = + new ArrayList(); + List releaseContainerList = + new ArrayList(); + HashMap nmTokens = new HashMap(); + + // initially requesting 2 containers. + AllocateResponse response = + am.allocate("h1", 1000, 2, releaseContainerList); + nm1.nodeHeartbeat(true); + Assert.assertEquals(0, response.getAllocatedContainers().size()); + allocateContainersAndValidateNMTokens(am, containersReceivedForNM1, 2, + nmTokens); + Assert.assertEquals(1, nmTokens.size()); + + + // requesting 2 more containers. + response = am.allocate("h1", 1000, 2, releaseContainerList); + nm1.nodeHeartbeat(true); + Assert.assertEquals(0, response.getAllocatedContainers().size()); + allocateContainersAndValidateNMTokens(am, containersReceivedForNM1, 4, + nmTokens); + Assert.assertEquals(1, nmTokens.size()); + + + // We will be simulating NM restart so restarting newly added h2:1234 + // NM 2 now registers. + MockNM nm2 = rm.registerNode("h2:1234", 10000); + nm2.nodeHeartbeat(true); + ArrayList containersReceivedForNM2 = + new ArrayList(); + + response = am.allocate("h2", 1000, 2, releaseContainerList); + nm2.nodeHeartbeat(true); + Assert.assertEquals(0, response.getAllocatedContainers().size()); + allocateContainersAndValidateNMTokens(am, containersReceivedForNM2, 2, + nmTokens); + Assert.assertEquals(2, nmTokens.size()); + + // Simulating NM-2 restart. + nm2 = rm.registerNode("h2:1234", 10000); + nm2.nodeHeartbeat(true); + + int interval = 40; + // Wait for nm Token to be cleared. + while (nmTokenSecretManager + .isApplicationAttemptNMTokenPresent(attempt.getAppAttemptId(), + nm2.getNodeId()) && interval-- > 0) { + LOG.info("waiting for nmToken to be cleared for : " + nm2.getNodeId()); + Thread.sleep(1000); + } + Assert.assertTrue(nmTokenSecretManager + .isApplicationAttemptRegistered(attempt.getAppAttemptId())); + + // removing NMToken for h2:1234 + nmTokens.remove(nm2.getNodeId().toString()); + Assert.assertEquals(1, nmTokens.size()); + + // We should again receive the NMToken. + response = am.allocate("h2", 1000, 2, releaseContainerList); + nm2.nodeHeartbeat(true); + Assert.assertEquals(0, response.getAllocatedContainers().size()); + allocateContainersAndValidateNMTokens(am, containersReceivedForNM2, 4, + nmTokens); + Assert.assertEquals(2, nmTokens.size()); + + // Now rolling over NMToken masterKey. it should resend the NMToken in + // next allocate call. + Assert.assertTrue(nmTokenSecretManager + .isApplicationAttemptNMTokenPresent(attempt.getAppAttemptId(), + nm1.getNodeId())); + Assert.assertTrue(nmTokenSecretManager + .isApplicationAttemptNMTokenPresent(attempt.getAppAttemptId(), + nm2.getNodeId())); + + nmTokenSecretManager.rollMasterKey(); + nmTokenSecretManager.activateNextMasterKey(); + + Assert.assertFalse(nmTokenSecretManager + .isApplicationAttemptNMTokenPresent(attempt.getAppAttemptId(), + nm1.getNodeId())); + Assert.assertFalse(nmTokenSecretManager + .isApplicationAttemptNMTokenPresent(attempt.getAppAttemptId(), + nm2.getNodeId())); + // It should not remove application attempt entry. + Assert.assertTrue(nmTokenSecretManager + .isApplicationAttemptRegistered(attempt.getAppAttemptId())); + + nmTokens.clear(); + Assert.assertEquals(0, nmTokens.size()); + // We should again receive the NMToken. + response = am.allocate("h2", 1000, 1, releaseContainerList); + nm2.nodeHeartbeat(true); + Assert.assertEquals(0, response.getAllocatedContainers().size()); + allocateContainersAndValidateNMTokens(am, containersReceivedForNM2, 5, + nmTokens); + Assert.assertEquals(1, nmTokens.size()); + Assert.assertTrue(nmTokenSecretManager + .isApplicationAttemptNMTokenPresent(attempt.getAppAttemptId(), + nm2.getNodeId())); + + + // After AM is finished making sure that nmtoken entry for app + Assert.assertTrue(nmTokenSecretManager + .isApplicationAttemptRegistered(attempt.getAppAttemptId())); + am.unregisterAppAttempt(); + // marking all the containers as finished. + for (Container container : containersReceivedForNM1) { + nm1.nodeHeartbeat(attempt.getAppAttemptId(), container.getId().getId(), + ContainerState.COMPLETE); + } + for (Container container : containersReceivedForNM2) { + nm2.nodeHeartbeat(attempt.getAppAttemptId(), container.getId().getId(), + ContainerState.COMPLETE); + } + am.waitForState(RMAppAttemptState.FINISHED); + Assert.assertFalse(nmTokenSecretManager + .isApplicationAttemptRegistered(attempt.getAppAttemptId())); + } finally { + rm.stop(); + } + } + + protected void allocateContainersAndValidateNMTokens(MockAM am, + ArrayList containersReceived, int totalContainerRequested, + HashMap nmTokens) throws Exception, InterruptedException { + ArrayList releaseContainerList = new ArrayList(); + AllocateResponse response; + ArrayList resourceRequest = + new ArrayList(); + while (containersReceived.size() < totalContainerRequested) { + LOG.info("requesting containers.."); + response = + am.allocate(resourceRequest, releaseContainerList); + containersReceived.addAll(response.getAllocatedContainers()); + if (!response.getNMTokens().isEmpty()) { + for (NMToken nmToken : response.getNMTokens()) { + String nodeId = nmToken.getNodeId().toString(); + if (nmTokens.containsKey(nodeId)) { + Assert.fail("Duplicate NMToken received for : " + nodeId); + } + nmTokens.put(nodeId, nmToken.getToken()); + } + } + LOG.info("Got " + containersReceived.size() + + " containers. Waiting to get " + totalContainerRequested); + Thread.sleep(500); + } + } @Test (timeout = 300000) public void testActivatingApplicationAfterAddingNM() throws Exception {