diff --git hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/AppContext.java hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/AppContext.java index 075f3f5..0a54add 100644 --- hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/AppContext.java +++ hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/AppContext.java @@ -27,6 +27,7 @@ import org.apache.hadoop.yarn.ClusterInfo; import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; import org.apache.hadoop.yarn.api.records.ApplicationId; +import org.apache.hadoop.yarn.api.records.Token; import org.apache.hadoop.yarn.event.EventHandler; @@ -56,4 +57,6 @@ Clock getClock(); ClusterInfo getClusterInfo(); + + Map getNMTokens(); } diff --git hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/MRAppMaster.java hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/MRAppMaster.java index eb4e6c3..b96056e 100644 --- hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/MRAppMaster.java +++ hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/MRAppMaster.java @@ -120,6 +120,8 @@ import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.api.records.ContainerId; +import org.apache.hadoop.yarn.api.records.NodeId; +import org.apache.hadoop.yarn.api.records.Token; import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.event.AsyncDispatcher; import org.apache.hadoop.yarn.event.Dispatcher; @@ -893,6 +895,8 @@ public synchronized void stop() { private final Map jobs = new ConcurrentHashMap(); private final Configuration conf; private final ClusterInfo clusterInfo = new ClusterInfo(); + private final ConcurrentHashMap nmTokens = + new ConcurrentHashMap(); public RunningAppContext(Configuration config) { this.conf = config; @@ -947,6 +951,11 @@ public Clock getClock() { public ClusterInfo getClusterInfo() { return this.clusterInfo; } + + @Override + public Map getNMTokens() { + return this.nmTokens; + } } @SuppressWarnings("unchecked") diff --git hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerAllocator.java hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerAllocator.java index 106f2f5..cdab559 100644 --- hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerAllocator.java +++ hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerAllocator.java @@ -31,6 +31,7 @@ import java.util.Map.Entry; import java.util.Set; import java.util.concurrent.BlockingQueue; +import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.atomic.AtomicBoolean; @@ -61,6 
+62,7 @@ import org.apache.hadoop.yarn.YarnRuntimeException; import org.apache.hadoop.yarn.api.ContainerExitStatus; import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse; +import org.apache.hadoop.yarn.api.protocolrecords.NMToken; import org.apache.hadoop.yarn.api.records.Container; import org.apache.hadoop.yarn.api.records.ContainerId; import org.apache.hadoop.yarn.api.records.ContainerStatus; @@ -68,6 +70,7 @@ import org.apache.hadoop.yarn.api.records.NodeReport; import org.apache.hadoop.yarn.api.records.NodeState; import org.apache.hadoop.yarn.api.records.Priority; +import org.apache.hadoop.yarn.api.records.Token; import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider; import org.apache.hadoop.yarn.util.RackResolver; @@ -99,7 +102,7 @@ PRIORITY_MAP = RecordFactoryProvider.getRecordFactory(null).newRecordInstance(Priority.class); PRIORITY_MAP.setPriority(20); } - + /* Vocabulary Used: pending -> requests which are NOT yet sent to RM @@ -556,6 +559,15 @@ public void rampDownReduces(int rampDown) { response = makeRemoteRequest(); // Reset retry count if no exception occurred. retrystartTime = System.currentTimeMillis(); + + // Updating NMTokens if newly received. + if (!response.getNMTokens().isEmpty()) { + for (NMToken nmToken : response.getNMTokens()) { + LOG.debug("Received NMToken for : " + nmToken.getNodeId()); + getContext().getNMTokens().put(nmToken.getNodeId().toString(), + nmToken.getToken()); + } + } } catch (Exception e) { // This can happen when the connection to the RM has gone down. Keep // re-trying until the retryInterval has expired. @@ -629,7 +641,7 @@ public void rampDownReduces(int rampDown) { } return newContainers; } - + @VisibleForTesting public TaskAttemptEvent createContainerFinishedEvent(ContainerStatus cont, TaskAttemptId attemptID) { diff --git hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/MockAppContext.java hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/MockAppContext.java index 8cb1eac..2b8be46 100644 --- hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/MockAppContext.java +++ hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/MockAppContext.java @@ -27,6 +27,8 @@ import org.apache.hadoop.yarn.ClusterInfo; import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; import org.apache.hadoop.yarn.api.records.ApplicationId; +import org.apache.hadoop.yarn.api.records.NodeId; +import org.apache.hadoop.yarn.api.records.Token; import org.apache.hadoop.yarn.event.EventHandler; import com.google.common.collect.Maps; @@ -52,11 +54,11 @@ public MockAppContext(int appid, int numTasks, int numAttempts, Path confPath) { map.put(job.getID(), job); jobs = map; } - + public MockAppContext(int appid, int numJobs, int numTasks, int numAttempts) { this(appid, numJobs, numTasks, numAttempts, false); } - + public MockAppContext(int appid, int numJobs, int numTasks, int numAttempts, boolean hasFailedTasks) { appID = MockJobs.newAppID(appid); @@ -115,4 +117,9 @@ public ClusterInfo getClusterInfo() { return null; } + @Override + public Map getNMTokens() { + // Not added. 
+ return null; + } } diff --git hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestRuntimeEstimators.java hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestRuntimeEstimators.java index f7cdd4f..ac55585 100644 --- hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestRuntimeEstimators.java +++ hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestRuntimeEstimators.java @@ -69,6 +69,7 @@ import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.api.records.ContainerId; import org.apache.hadoop.yarn.api.records.NodeId; +import org.apache.hadoop.yarn.api.records.Token; import org.apache.hadoop.yarn.event.AsyncDispatcher; import org.apache.hadoop.yarn.event.EventHandler; import org.apache.hadoop.yarn.factories.RecordFactory; @@ -850,5 +851,11 @@ public long getStartTime() { public ClusterInfo getClusterInfo() { return new ClusterInfo(); } + + @Override + public Map getNMTokens() { + // Not implemented. + return null; + } } } diff --git hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/webapp/TestAMWebServicesJobConf.java hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/webapp/TestAMWebServicesJobConf.java index 45c5917..a6316ef 100644 --- hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/webapp/TestAMWebServicesJobConf.java +++ hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/webapp/TestAMWebServicesJobConf.java @@ -41,14 +41,8 @@ import org.apache.hadoop.mapreduce.v2.api.records.JobId; import org.apache.hadoop.mapreduce.v2.app.AppContext; import org.apache.hadoop.mapreduce.v2.app.MockAppContext; -import org.apache.hadoop.mapreduce.v2.app.MockJobs; import org.apache.hadoop.mapreduce.v2.app.job.Job; import org.apache.hadoop.mapreduce.v2.util.MRApps; -import org.apache.hadoop.yarn.Clock; -import org.apache.hadoop.yarn.ClusterInfo; -import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; -import org.apache.hadoop.yarn.api.records.ApplicationId; -import org.apache.hadoop.yarn.event.EventHandler; import org.apache.hadoop.yarn.webapp.GenericExceptionHandler; import org.apache.hadoop.yarn.webapp.WebServicesTestUtils; import org.codehaus.jettison.json.JSONArray; @@ -62,7 +56,6 @@ import org.w3c.dom.NodeList; import org.xml.sax.InputSource; -import com.google.common.collect.Maps; import com.google.inject.Guice; import com.google.inject.Injector; import com.google.inject.servlet.GuiceServletContextListener; diff --git hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/JobHistory.java hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/JobHistory.java index 4ad42ad..5c78e04 100644 --- hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/JobHistory.java +++ 
hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/JobHistory.java @@ -43,6 +43,8 @@ import org.apache.hadoop.yarn.YarnRuntimeException; import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; import org.apache.hadoop.yarn.api.records.ApplicationId; +import org.apache.hadoop.yarn.api.records.NodeId; +import org.apache.hadoop.yarn.api.records.Token; import org.apache.hadoop.yarn.event.EventHandler; import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider; import org.apache.hadoop.yarn.service.AbstractService; @@ -299,4 +301,10 @@ public Clock getClock() { public ClusterInfo getClusterInfo() { return null; } + + @Override + public Map getNMTokens() { + // Not implemented. + return null; + } } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/AllocateResponse.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/AllocateResponse.java index 18ce01a..b9b86fc 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/AllocateResponse.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/AllocateResponse.java @@ -32,7 +32,6 @@ import org.apache.hadoop.yarn.api.records.NodeReport; import org.apache.hadoop.yarn.api.records.PreemptionMessage; import org.apache.hadoop.yarn.api.records.Resource; -import org.apache.hadoop.yarn.api.records.Token; import org.apache.hadoop.yarn.util.Records; /** @@ -67,7 +66,7 @@ public static AllocateResponse newInstance(int responseId, List completedContainers, List allocatedContainers, List updatedNodes, Resource availResources, AMCommand command, int numClusterNodes, - PreemptionMessage preempt) { + PreemptionMessage preempt, List nmTokens) { AllocateResponse response = Records.newRecord(AllocateResponse.class); response.setNumClusterNodes(numClusterNodes); response.setResponseId(responseId); @@ -77,6 +76,7 @@ public static AllocateResponse newInstance(int responseId, response.setAvailableResources(availResources); response.setAMCommand(command); response.setPreemptionMessage(preempt); + response.setNMTokens(nmTokens); return response; } @@ -202,7 +202,7 @@ public static AllocateResponse newInstance(int responseId, @Public @Stable - public abstract void setNMTokens(List nmTokens); + public abstract void setNMTokens(List nmTokens); /** * Get the list of NMTokens required for communicating with NM. New NMTokens @@ -217,6 +217,6 @@ public static AllocateResponse newInstance(int responseId, */ @Public @Stable - public abstract List getNMTokens(); + public abstract List getNMTokens(); } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/NMToken.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/NMToken.java new file mode 100644 index 0000000..11120f7 --- /dev/null +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/NMToken.java @@ -0,0 +1,46 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.api.protocolrecords; + +import org.apache.hadoop.classification.InterfaceAudience.Public; +import org.apache.hadoop.classification.InterfaceStability.Stable; +import org.apache.hadoop.yarn.api.records.NodeId; +import org.apache.hadoop.yarn.api.records.Token; + +/** + * NMToken is returned by RM on AllocateResponse. + */ +public abstract class NMToken { + + @Public + @Stable + public abstract NodeId getNodeId(); + + @Public + @Stable + public abstract void setNodeId(NodeId nodeId); + + @Public + @Stable + public abstract Token getToken(); + + @Public + @Stable + public abstract void setToken(Token token); +} diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/impl/pb/AllocateResponsePBImpl.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/impl/pb/AllocateResponsePBImpl.java index 311bba5..15e5207 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/impl/pb/AllocateResponsePBImpl.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/impl/pb/AllocateResponsePBImpl.java @@ -23,21 +23,19 @@ import java.util.Iterator; import java.util.List; -import org.apache.hadoop.security.proto.SecurityProtos.TokenProto; import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse; import org.apache.hadoop.yarn.api.records.AMCommand; +import org.apache.hadoop.yarn.api.protocolrecords.NMToken; import org.apache.hadoop.yarn.api.records.Container; import org.apache.hadoop.yarn.api.records.ContainerStatus; import org.apache.hadoop.yarn.api.records.NodeReport; import org.apache.hadoop.yarn.api.records.PreemptionMessage; import org.apache.hadoop.yarn.api.records.Resource; -import org.apache.hadoop.yarn.api.records.Token; import org.apache.hadoop.yarn.api.records.impl.pb.ContainerPBImpl; import org.apache.hadoop.yarn.api.records.impl.pb.ContainerStatusPBImpl; import org.apache.hadoop.yarn.api.records.impl.pb.NodeReportPBImpl; import org.apache.hadoop.yarn.api.records.impl.pb.PreemptionMessagePBImpl; import org.apache.hadoop.yarn.api.records.impl.pb.ResourcePBImpl; -import org.apache.hadoop.yarn.api.records.impl.pb.TokenPBImpl; import org.apache.hadoop.yarn.proto.YarnProtos.ContainerProto; import org.apache.hadoop.yarn.proto.YarnProtos.ContainerStatusProto; import org.apache.hadoop.yarn.proto.YarnProtos.NodeReportProto; @@ -46,6 +44,7 @@ import org.apache.hadoop.yarn.proto.YarnServiceProtos.AllocateResponseProto; import org.apache.hadoop.yarn.proto.YarnServiceProtos.AllocateResponseProtoOrBuilder; import org.apache.hadoop.yarn.util.ProtoUtils; +import org.apache.hadoop.yarn.proto.YarnServiceProtos.NMTokenProto; public class AllocateResponsePBImpl extends AllocateResponse { @@ -56,7 +55,7 @@ Resource limit; private List allocatedContainers = null; - 
private List nmTokens = null; + private List nmTokens = null; private List completedContainersStatuses = null; private List updatedNodes = null; @@ -108,7 +107,7 @@ private synchronized void mergeLocalToBuilder() { } if (nmTokens != null) { builder.clearNmTokens(); - Iterable iterable = getTokenProtoIterable(nmTokens); + Iterable iterable = getTokenProtoIterable(nmTokens); builder.addAllNmTokens(iterable); } if (this.completedContainersStatuses != null) { @@ -248,9 +247,11 @@ public synchronized void setCompletedContainersStatuses( } @Override - public synchronized void setNMTokens(List nmTokens) { + public synchronized void setNMTokens(List nmTokens) { if (nmTokens == null || nmTokens.isEmpty()) { - this.nmTokens.clear(); + if (this.nmTokens != null) { + this.nmTokens.clear(); + } builder.clearNmTokens(); return; } @@ -260,7 +261,7 @@ public synchronized void setNMTokens(List nmTokens) { } @Override - public synchronized List getNMTokens() { + public synchronized List getNMTokens() { initLocalNewNMTokenList(); return nmTokens; } @@ -334,9 +335,9 @@ private synchronized void initLocalNewNMTokenList() { return; } AllocateResponseProtoOrBuilder p = viaProto ? proto : builder; - List list = p.getNmTokensList(); - nmTokens = new ArrayList(); - for (TokenProto t : list) { + List list = p.getNmTokensList(); + nmTokens = new ArrayList(); + for (NMTokenProto t : list) { nmTokens.add(convertFromProtoFormat(t)); } } @@ -372,15 +373,15 @@ public synchronized void remove() { }; } - private synchronized Iterable getTokenProtoIterable( - final List nmTokenList) { + private synchronized Iterable getTokenProtoIterable( + final List nmTokenList) { maybeInitBuilder(); - return new Iterable() { + return new Iterable() { @Override - public synchronized Iterator iterator() { - return new Iterator() { + public synchronized Iterator iterator() { + return new Iterator() { - Iterator iter = nmTokenList.iterator(); + Iterator iter = nmTokenList.iterator(); @Override public boolean hasNext() { @@ -388,7 +389,7 @@ public boolean hasNext() { } @Override - public TokenProto next() { + public NMTokenProto next() { return convertToProtoFormat(iter.next()); } @@ -524,11 +525,11 @@ private synchronized PreemptionMessageProto convertToProtoFormat(PreemptionMessa return ((PreemptionMessagePBImpl)r).getProto(); } - private synchronized TokenProto convertToProtoFormat(Token token) { - return ((TokenPBImpl)token).getProto(); + private synchronized NMTokenProto convertToProtoFormat(NMToken token) { + return ((NMTokenPBImpl)token).getProto(); } - private synchronized Token convertFromProtoFormat(TokenProto proto) { - return new TokenPBImpl(proto); + private synchronized NMToken convertFromProtoFormat(NMTokenProto proto) { + return new NMTokenPBImpl(proto); } } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/impl/pb/NMTokenPBImpl.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/impl/pb/NMTokenPBImpl.java new file mode 100644 index 0000000..4c02565 --- /dev/null +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/impl/pb/NMTokenPBImpl.java @@ -0,0 +1,121 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.api.protocolrecords.impl.pb; + +import org.apache.hadoop.security.proto.SecurityProtos.TokenProto; +import org.apache.hadoop.yarn.api.protocolrecords.NMToken; +import org.apache.hadoop.yarn.api.records.NodeId; +import org.apache.hadoop.yarn.api.records.Token; +import org.apache.hadoop.yarn.api.records.impl.pb.NodeIdPBImpl; +import org.apache.hadoop.yarn.api.records.impl.pb.TokenPBImpl; +import org.apache.hadoop.yarn.proto.YarnProtos.NodeIdProto; +import org.apache.hadoop.yarn.proto.YarnServiceProtos.NMTokenProto; +import org.apache.hadoop.yarn.proto.YarnServiceProtos.NMTokenProtoOrBuilder; + + +public class NMTokenPBImpl extends NMToken{ + + NMTokenProto proto = NMTokenProto.getDefaultInstance(); + NMTokenProto.Builder builder = null; + boolean viaProto = false; + + private Token token = null; + private NodeId nodeId = null; + + public NMTokenPBImpl() { + builder = NMTokenProto.newBuilder(); + } + + public NMTokenPBImpl(NMTokenProto proto) { + this.proto = proto; + viaProto = true; + } + + @Override + public synchronized NodeId getNodeId() { + NMTokenProtoOrBuilder p = viaProto ? proto : builder; + return convertFromProtoFormat(p.getNodeId()); + } + + @Override + public synchronized void setNodeId(NodeId nodeId) { + maybeInitBuilder(); + builder.setNodeId(convertToProtoFormat(nodeId)); + } + + @Override + public synchronized Token getToken() { + NMTokenProtoOrBuilder p = viaProto ? proto : builder; + return convertFromProtoFormat(p.getToken()); + } + + @Override + public synchronized void setToken(Token token) { + maybeInitBuilder(); + builder.setToken(convertToProtoFormat(token)); + } + + public synchronized NMTokenProto getProto() { + mergeLocalToProto(); + proto = viaProto ? 
proto : builder.build(); + viaProto = true; + return proto; + } + + private synchronized void mergeLocalToProto() { + if (viaProto) { + maybeInitBuilder(); + } + mergeLocalToBuilder(); + proto = builder.build(); + viaProto = true; + } + + private synchronized void mergeLocalToBuilder() { + if (this.nodeId != null) { + builder.setNodeId(convertToProtoFormat(nodeId)); + } + if (this.token != null) { + builder.setToken(convertToProtoFormat(token)); + } + } + + private synchronized void maybeInitBuilder() { + if(viaProto || builder == null) { + builder = NMTokenProto.newBuilder(proto); + } + viaProto = false; + } + + private synchronized NodeId convertFromProtoFormat(NodeIdProto p) { + return new NodeIdPBImpl(p); + } + + private synchronized NodeIdProto convertToProtoFormat(NodeId nodeId) { + return ((NodeIdPBImpl)nodeId).getProto(); + } + + private synchronized TokenProto convertToProtoFormat(Token token) { + return ((TokenPBImpl)token).getProto(); + } + + private synchronized Token convertFromProtoFormat(TokenProto proto) { + return new TokenPBImpl(proto); + } +} diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_service_protos.proto hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_service_protos.proto index 8e81f21..d697fb5 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_service_protos.proto +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_service_protos.proto @@ -59,6 +59,11 @@ message AllocateRequestProto { optional float progress = 6; } +message NMTokenProto { + optional NodeIdProto nodeId = 1; + optional hadoop.common.TokenProto token = 2; +} + message AllocateResponseProto { optional AMCommandProto a_m_command = 1; optional int32 response_id = 2; @@ -68,7 +73,7 @@ message AllocateResponseProto { repeated NodeReportProto updated_nodes = 6; optional int32 num_cluster_nodes = 7; optional PreemptionMessageProto preempt = 8; - repeated hadoop.common.TokenProto nm_tokens = 9; + repeated NMTokenProto nm_tokens = 9; } ////////////////////////////////////////////////////// diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/AMRMClient.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/AMRMClient.java index 71c59f9..7c9d4da 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/AMRMClient.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/AMRMClient.java @@ -21,6 +21,7 @@ import java.io.IOException; import java.util.Collection; import java.util.List; +import java.util.concurrent.ConcurrentHashMap; import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.classification.InterfaceStability; @@ -30,6 +31,7 @@ import org.apache.hadoop.yarn.api.records.FinalApplicationStatus; import org.apache.hadoop.yarn.api.records.Priority; import org.apache.hadoop.yarn.api.records.Resource; +import org.apache.hadoop.yarn.api.records.Token; import org.apache.hadoop.yarn.exceptions.YarnException; import org.apache.hadoop.yarn.service.Service; @@ -208,4 +210,13 @@ public void unregisterApplicationMaster(FinalApplicationStatus appStatus, String resourceName, Resource capability); + /** + * It returns the NMToken received on allocate call. It will not communicate + * with RM to get NMTokens. 
On allocate call whenever we receive new token + * along with container AMRMClient will cache this NMToken per node manager. + * This map returned should be shared with any application which is + * communicating with NodeManager (ex. NMClient) using NMTokens. If a new + * NMToken is received for the same node manager then it will be replaced. + */ + public ConcurrentHashMap getNMTokens(); } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/AMRMClientAsync.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/AMRMClientAsync.java index f58e366..d16aa2f 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/AMRMClientAsync.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/AMRMClientAsync.java @@ -22,6 +22,7 @@ import java.util.Collection; import java.util.List; import java.util.concurrent.BlockingQueue; +import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.atomic.AtomicInteger; @@ -42,6 +43,7 @@ import org.apache.hadoop.yarn.api.records.NodeReport; import org.apache.hadoop.yarn.api.records.Priority; import org.apache.hadoop.yarn.api.records.Resource; +import org.apache.hadoop.yarn.api.records.Token; import org.apache.hadoop.yarn.client.AMRMClient.ContainerRequest; import org.apache.hadoop.yarn.exceptions.YarnException; import org.apache.hadoop.yarn.service.AbstractService; @@ -260,6 +262,19 @@ public Resource getClusterAvailableResources() { public int getClusterNodeCount() { return client.getClusterNodeCount(); } + + /** + * It returns the NMToken received on allocate call. It will not communicate + * with RM to get NMTokens. On allocate call whenever we receive new token + * along with new container AMRMClientAsync will cache this NMToken per node + * manager. This map returned should be shared with any application which is + * communicating with NodeManager (ex. NMClient / NMClientAsync) using + * NMTokens. If a new NMToken is received for the same node manager + * then it will be replaced. 
+ */ + public ConcurrentHashMap getNMTokens() { + return client.getNMTokens(); + } private class HeartbeatThread extends Thread { public HeartbeatThread() { diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/AMRMClientImpl.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/AMRMClientImpl.java index 9562135..7e32bb6 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/AMRMClientImpl.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/AMRMClientImpl.java @@ -33,9 +33,11 @@ import java.util.SortedMap; import java.util.TreeMap; import java.util.TreeSet; +import java.util.concurrent.ConcurrentHashMap; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.classification.InterfaceAudience.Private; import org.apache.hadoop.classification.InterfaceStability.Unstable; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.ipc.RPC; @@ -45,6 +47,7 @@ import org.apache.hadoop.yarn.api.protocolrecords.AllocateRequest; import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse; import org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationMasterRequest; +import org.apache.hadoop.yarn.api.protocolrecords.NMToken; import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterRequest; import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterResponse; import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; @@ -53,6 +56,7 @@ import org.apache.hadoop.yarn.api.records.Priority; import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.api.records.ResourceRequest; +import org.apache.hadoop.yarn.api.records.Token; import org.apache.hadoop.yarn.client.AMRMClient.ContainerRequest; import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.exceptions.YarnException; @@ -61,6 +65,8 @@ import org.apache.hadoop.yarn.ipc.YarnRPC; import org.apache.hadoop.yarn.service.AbstractService; +import com.google.common.annotations.VisibleForTesting; + // TODO check inputs for null etc. 
YARN-654 @Unstable @@ -73,6 +79,7 @@ RecordFactoryProvider.getRecordFactory(null); private int lastResponseId = 0; + private ConcurrentHashMap nmTokens; protected AMRMProtocol rmClient; protected final ApplicationAttemptId appAttemptId; @@ -238,6 +245,9 @@ public AllocateResponse allocate(float progressIndicator) clusterNodeCount = allocateResponse.getNumClusterNodes(); lastResponseId = allocateResponse.getResponseId(); clusterAvailableResources = allocateResponse.getAvailableResources(); + if (!allocateResponse.getNMTokens().isEmpty()) { + populateNMTokens(allocateResponse); + } } } finally { // TODO how to differentiate remote yarn exception vs error in rpc @@ -265,6 +275,20 @@ public AllocateResponse allocate(float progressIndicator) return allocateResponse; } + @Private + @VisibleForTesting + protected void populateNMTokens(AllocateResponse allocateResponse) { + for (NMToken token : allocateResponse.getNMTokens()) { + String nodeId = token.getNodeId().toString(); + if (nmTokens.containsKey(nodeId)) { + LOG.debug("Replacing token for : " + nodeId); + } else { + LOG.debug("Received new token for : " + nodeId); + } + nmTokens.put(nodeId, token.getToken()); + } + } + @Override public void unregisterApplicationMaster(FinalApplicationStatus appStatus, String appMessage, String appTrackingUrl) throws YarnException, @@ -512,4 +536,8 @@ private void decResourceRequest(Priority priority, } } + @Override + public ConcurrentHashMap getNMTokens() { + return nmTokens; + } } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/TestAMRMClient.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/TestAMRMClient.java index 6f3bbe1..1186f50 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/TestAMRMClient.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/TestAMRMClient.java @@ -25,9 +25,14 @@ import java.io.IOException; import java.util.Collection; +import java.util.HashMap; +import java.util.Iterator; import java.util.List; import java.util.Set; import java.util.TreeSet; +import java.util.concurrent.ConcurrentHashMap; + +import junit.framework.Assert; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.yarn.api.AMRMProtocol; @@ -49,6 +54,7 @@ import org.apache.hadoop.yarn.api.records.Priority; import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.api.records.ResourceRequest; +import org.apache.hadoop.yarn.api.records.Token; import org.apache.hadoop.yarn.api.records.YarnApplicationState; import org.apache.hadoop.yarn.client.AMRMClient.ContainerRequest; import org.apache.hadoop.yarn.client.AMRMClient.StoredContainerRequest; @@ -437,6 +443,11 @@ private void testAllocation(final AMRMClientImpl amClient) int allocatedContainerCount = 0; int iterationsLeft = 2; Set releases = new TreeSet(); + + ConcurrentHashMap nmTokens = amClient.getNMTokens(); + Assert.assertEquals(0, nmTokens.size()); + HashMap receivedNMTokens = new HashMap(); + while (allocatedContainerCount < containersRequestedAny && iterationsLeft-- > 0) { AllocateResponse allocResponse = amClient.allocate(0.1f); @@ -450,12 +461,31 @@ private void testAllocation(final AMRMClientImpl amClient) releases.add(rejectContainerId); amClient.releaseAssignedContainer(rejectContainerId); } + Assert.assertEquals(nmTokens.size(), amClient.getNMTokens().size()); + Iterator nodeI = nmTokens.keySet().iterator(); + 
while (nodeI.hasNext()) { + String nodeId = nodeI.next(); + if (!receivedNMTokens.containsKey(nodeId)) { + receivedNMTokens.put(nodeId, nmTokens.get(nodeId)); + } else { + Assert.fail("Received token again for : " + nodeId); + } + } + nodeI = receivedNMTokens.keySet().iterator(); + while (nodeI.hasNext()) { + nmTokens.remove(nodeI.next()); + } + if(allocatedContainerCount < containersRequestedAny) { // sleep to let NM's heartbeat to RM and trigger allocations sleep(1000); } } - + + Assert.assertEquals(0, amClient.getNMTokens().size()); + // Should receive atleast 1 token + Assert.assertTrue(receivedNMTokens.size() > 0); + assertTrue(allocatedContainerCount == containersRequestedAny); assertTrue(amClient.release.size() == 2); assertTrue(amClient.ask.size() == 0); @@ -523,7 +553,6 @@ public AllocateResponse answer(InvocationOnMock invocation) sleep(1000); } } - assertTrue(amClient.ask.size() == 0); assertTrue(amClient.release.size() == 0); } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/TestAMRMClientAsync.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/TestAMRMClientAsync.java index c5c687c..d5fdcb9 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/TestAMRMClientAsync.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/TestAMRMClientAsync.java @@ -36,6 +36,7 @@ import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse; +import org.apache.hadoop.yarn.api.protocolrecords.NMToken; import org.apache.hadoop.yarn.api.records.AMCommand; import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; import org.apache.hadoop.yarn.api.records.ApplicationId; @@ -66,11 +67,11 @@ public void testAMRMClientAsync() throws Exception { List allocated1 = Arrays.asList( Container.newInstance(null, null, null, null, null, null)); final AllocateResponse response1 = createAllocateResponse( - new ArrayList(), allocated1); + new ArrayList(), allocated1, null); final AllocateResponse response2 = createAllocateResponse(completed1, - new ArrayList()); + new ArrayList(), null); final AllocateResponse emptyResponse = createAllocateResponse( - new ArrayList(), new ArrayList()); + new ArrayList(), new ArrayList(), null); TestCallbackHandler callbackHandler = new TestCallbackHandler(); final AMRMClient client = mock(AMRMClientImpl.class); @@ -146,7 +147,7 @@ public Resource answer(InvocationOnMock invocation) Assert.assertEquals(null, callbackHandler.takeAllocatedContainers()); Assert.assertEquals(null, callbackHandler.takeCompletedContainers()); } - + @Test(timeout=10000) public void testAMRMClientAsyncException() throws Exception { Configuration conf = new Configuration(); @@ -189,7 +190,7 @@ public void testAMRMClientAsyncReboot() throws Exception { AMRMClient client = mock(AMRMClientImpl.class); final AllocateResponse rebootResponse = createAllocateResponse( - new ArrayList(), new ArrayList()); + new ArrayList(), new ArrayList(), null); rebootResponse.setAMCommand(AMCommand.AM_RESYNC); when(client.allocate(anyFloat())).thenReturn(rebootResponse); @@ -215,9 +216,11 @@ public void testAMRMClientAsyncReboot() throws Exception { } private AllocateResponse createAllocateResponse( - List completed, List allocated) { - AllocateResponse response = AllocateResponse.newInstance(0, completed, allocated, - new 
ArrayList(), null, null, 1, null); + List completed, List allocated, + List nmTokens) { + AllocateResponse response = + AllocateResponse.newInstance(0, completed, allocated, + new ArrayList(), null, null, 1, null, nmTokens); return response; } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/security/BaseContainerTokenSecretManager.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/security/BaseContainerTokenSecretManager.java index 0a35989..ccfe8f5 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/security/BaseContainerTokenSecretManager.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/security/BaseContainerTokenSecretManager.java @@ -18,14 +18,11 @@ package org.apache.hadoop.yarn.server.security; -import java.nio.ByteBuffer; import java.security.SecureRandom; import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReadWriteLock; import java.util.concurrent.locks.ReentrantReadWriteLock; -import javax.crypto.SecretKey; - import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.classification.InterfaceAudience.Private; @@ -34,7 +31,6 @@ import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.security.ContainerTokenIdentifier; import org.apache.hadoop.yarn.server.api.records.MasterKey; -import org.apache.hadoop.yarn.util.Records; /** * SecretManager for ContainerTokens. Extended by both RM and NM and hence is diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ApplicationMasterService.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ApplicationMasterService.java index d71c1de..d2667a3 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ApplicationMasterService.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ApplicationMasterService.java @@ -373,6 +373,12 @@ public AllocateResponse allocate(AllocateRequest request) // add preemption to the allocateResponse message (if any) allocateResponse.setPreemptionMessage(generatePreemptionMessage(allocation)); + // Adding NMTokens for allocated containers. 
+      if (!allocation.getContainers().isEmpty()) {
+        allocateResponse.setNMTokens(rmContext.getNMTokenSecretManager()
+            .getNMTokens(app.getUser(), appAttemptId,
+                allocation.getContainers()));
+      }
       return allocateResponse;
     }
   }
@@ -433,12 +439,15 @@
   public void registerAppAttempt(ApplicationAttemptId attemptId) {
     AllocateResponse response = recordFactory.newRecordInstance(AllocateResponse.class);
     response.setResponseId(0);
-    LOG.info("Registering " + attemptId);
+    LOG.info("Registering app attempt : " + attemptId);
     responseMap.put(attemptId, response);
+    rmContext.getNMTokenSecretManager().registerApplicationAttempt(attemptId);
   }
 
   public void unregisterAttempt(ApplicationAttemptId attemptId) {
+    LOG.info("Unregistering app attempt : " + attemptId);
     responseMap.remove(attemptId);
+    rmContext.getNMTokenSecretManager().unregisterApplicationAttempt(attemptId);
   }
 
   public void refreshServiceAcls(Configuration configuration,
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceTrackerService.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceTrackerService.java
index 626cad1..c552656 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceTrackerService.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceTrackerService.java
@@ -218,7 +218,9 @@ public RegisterNodeManagerResponse registerNodeManager(
       this.rmContext.getDispatcher().getEventHandler().handle(
           new RMNodeReconnectEvent(nodeId, rmNode));
     }
-
+    // On every node manager register we will be clearing NMToken keys if
+    // present for any running application.
+    this.nmTokenSecretManager.removeNodeKey(nodeId);
     this.nmLivelinessMonitor.register(nodeId);
 
     String message =
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/security/NMTokenSecretManagerInRM.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/security/NMTokenSecretManagerInRM.java
index 6046f24..a1ecddb 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/security/NMTokenSecretManagerInRM.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/security/NMTokenSecretManagerInRM.java
@@ -18,18 +18,31 @@
 package org.apache.hadoop.yarn.server.resourcemanager.security;
 
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
 import java.util.Timer;
 import java.util.TimerTask;
+import java.util.concurrent.ConcurrentHashMap;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.classification.InterfaceAudience.Private;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.yarn.api.protocolrecords.NMToken;
+import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.NMTokenPBImpl;
+import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
+import org.apache.hadoop.yarn.api.records.Container;
+import org.apache.hadoop.yarn.api.records.NodeId;
+import org.apache.hadoop.yarn.api.records.Token;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
 import org.apache.hadoop.yarn.server.api.records.MasterKey;
 import org.apache.hadoop.yarn.server.security.BaseNMTokenSecretManager;
 import org.apache.hadoop.yarn.server.security.MasterKeyData;
 
+import com.google.common.annotations.VisibleForTesting;
+
 public class NMTokenSecretManagerInRM extends BaseNMTokenSecretManager {
 
@@ -42,6 +55,7 @@
   private final Timer timer;
   private final long rollingInterval;
   private final long activationDelay;
+  private final ConcurrentHashMap<ApplicationAttemptId, HashSet<NodeId>> appAttemptToNodeKeyMap;
 
   public NMTokenSecretManagerInRM(Configuration conf) {
     this.conf = conf;
@@ -70,6 +84,8 @@ public NMTokenSecretManagerInRM(Configuration conf) {
           + " should be more than 2 X "
           + YarnConfiguration.RM_NM_EXPIRY_INTERVAL_MS);
     }
+    appAttemptToNodeKeyMap =
+        new ConcurrentHashMap<ApplicationAttemptId, HashSet<NodeId>>();
   }
 
   /**
@@ -119,11 +135,23 @@ public void activateNextMasterKey() {
           + this.nextMasterKey.getMasterKey().getKeyId());
       this.currentMasterKey = this.nextMasterKey;
       this.nextMasterKey = null;
+      clearApplicationNMTokenKeys();
     } finally {
       super.writeLock.unlock();
     }
   }
 
+  private void clearApplicationNMTokenKeys() {
+    // We should clear all node entries from this set.
+    // TODO : Once we have per node master key then it will change to only
+    // remove specific node from it.
+    Iterator<HashSet<NodeId>> nodeSetI =
+        this.appAttemptToNodeKeyMap.values().iterator();
+    while (nodeSetI.hasNext()) {
+      nodeSetI.next().clear();
+    }
+  }
+
   public void start() {
     rollMasterKey();
     this.timer.scheduleAtFixedRate(new MasterKeyRoller(), rollingInterval,
@@ -150,4 +178,96 @@ public void run() {
       activateNextMasterKey();
     }
   }
+
+  public List<NMToken> getNMTokens(String applicationSubmitter,
+      ApplicationAttemptId appAttemptId, List<Container> containers) {
+    try {
+      this.readLock.lock();
+      List<NMToken> nmTokens = new ArrayList<NMToken>();
+      HashSet<NodeId> nodeSet = this.appAttemptToNodeKeyMap.get(appAttemptId);
+      if (nodeSet != null) {
+        for (Container container : containers) {
+          if (!nodeSet.contains(container.getNodeId())) {
+            LOG.debug("Sending NMToken for nodeId : "
+                + container.getNodeId().toString());
+            Token token = createNMToken(appAttemptId, container.getNodeId(),
+                applicationSubmitter);
+            NMToken nmToken = new NMTokenPBImpl();
+            nmToken.setNodeId(container.getNodeId());
+            nmToken.setToken(token);
+            nmTokens.add(nmToken);
+            nodeSet.add(container.getNodeId());
+          }
+        }
+      }
+      return nmTokens;
+    } finally {
+      this.readLock.unlock();
+    }
+  }
+
+  public void registerApplicationAttempt(ApplicationAttemptId appAttemptId) {
+    try {
+      this.writeLock.lock();
+      this.appAttemptToNodeKeyMap.put(appAttemptId, new HashSet<NodeId>());
+    } finally {
+      this.writeLock.unlock();
+    }
+  }
+
+  @Private
+  @VisibleForTesting
+  public boolean isApplicationAttemptRegistered(
+      ApplicationAttemptId appAttemptId) {
+    try {
+      this.readLock.lock();
+      return this.appAttemptToNodeKeyMap.containsKey(appAttemptId);
+    } finally {
+      this.readLock.unlock();
+    }
+  }
+
+  @Private
+  @VisibleForTesting
+  public boolean isApplicationAttemptNMTokenPresent(
+      ApplicationAttemptId appAttemptId, NodeId nodeId) {
+    try {
+      this.readLock.lock();
+      HashSet<NodeId> nodes = this.appAttemptToNodeKeyMap.get(appAttemptId);
+      if (nodes != null && nodes.contains(nodeId)) {
+        return true;
+      } else {
+        return false;
+      }
+    } finally {
+      this.readLock.unlock();
+    }
+  }
+
+  public void unregisterApplicationAttempt(ApplicationAttemptId appAttemptId) {
+    try {
+      this.writeLock.lock();
+      this.appAttemptToNodeKeyMap.remove(appAttemptId);
+    } finally {
+      this.writeLock.unlock();
+    }
+  }
+
+  /**
+   * This is to be called when a NodeManager reconnects or goes down. It will
+   * remove the NMTokens, if present, for any running application from the cache.
+ * @param nodeId + */ + public void removeNodeKey(NodeId nodeId) { + try { + this.writeLock.lock(); + Iterator> appNodeKeySetIterator = + this.appAttemptToNodeKeyMap.values().iterator(); + while (appNodeKeySetIterator.hasNext()) { + appNodeKeySetIterator.next().remove(nodeId); + } + } finally { + this.writeLock.unlock(); + } + } } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockAM.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockAM.java index 97f392e..e720a3a 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockAM.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockAM.java @@ -65,12 +65,12 @@ public void waitForState(RMAppAttemptState finalState) throws Exception { RMAppAttempt attempt = app.getRMAppAttempt(attemptId); int timeoutSecs = 0; while (!finalState.equals(attempt.getAppAttemptState()) - && timeoutSecs++ < 20) { + && timeoutSecs++ < 40) { System.out .println("AppAttempt : " + attemptId + " State is : " + attempt.getAppAttemptState() + " Waiting for state : " + finalState); - Thread.sleep(500); + Thread.sleep(1000); } System.out.println("AppAttempt State is : " + attempt.getAppAttemptState()); Assert.assertEquals("AppAttempt state is not correct (timedout)", diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRM.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRM.java index 71d6d7f..478c4a0 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRM.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRM.java @@ -19,22 +19,27 @@ package org.apache.hadoop.yarn.server.resourcemanager; import java.util.ArrayList; +import java.util.HashMap; import java.util.List; import junit.framework.Assert; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse; import org.apache.hadoop.yarn.api.protocolrecords.GetNewApplicationResponse; +import org.apache.hadoop.yarn.api.protocolrecords.NMToken; import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; import org.apache.hadoop.yarn.api.records.Container; import org.apache.hadoop.yarn.api.records.ContainerId; import org.apache.hadoop.yarn.api.records.ContainerState; import org.apache.hadoop.yarn.api.records.ResourceRequest; +import org.apache.hadoop.yarn.api.records.Token; import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptState; +import org.apache.hadoop.yarn.server.resourcemanager.security.NMTokenSecretManagerInRM; import org.apache.log4j.Level; import 
org.apache.log4j.LogManager; import org.apache.log4j.Logger; @@ -136,6 +141,184 @@ public void testAppOnMultiNode() throws Exception { rm.stop(); } + + @Test + public void testNMToken() throws Exception { + MockRM rm = new MockRM(); + try { + rm.start(); + MockNM nm1 = rm.registerNode("h1:1234", 10000); + + NMTokenSecretManagerInRM nmTokenSecretManager = + rm.getRMContext().getNMTokenSecretManager(); + + // submitting new application + RMApp app = rm.submitApp(1000); + + // start scheduling. + nm1.nodeHeartbeat(true); + + // Starting application attempt and launching + // It should get registered with NMTokenSecretManager. + RMAppAttempt attempt = app.getCurrentAppAttempt(); + + MockAM am = rm.sendAMLaunched(attempt.getAppAttemptId()); + + Assert.assertTrue(nmTokenSecretManager + .isApplicationAttemptRegistered(attempt.getAppAttemptId())); + + // This will register application master. + am.registerAppAttempt(); + + ArrayList containersReceivedForNM1 = + new ArrayList(); + List releaseContainerList = + new ArrayList(); + HashMap nmTokens = new HashMap(); + + // initially requesting 2 containers. + AllocateResponse response = + am.allocate("h1", 1000, 2, releaseContainerList); + nm1.nodeHeartbeat(true); + Assert.assertEquals(0, response.getAllocatedContainers().size()); + allocateContainersAndValidateNMTokens(am, containersReceivedForNM1, 2, + nmTokens); + Assert.assertEquals(1, nmTokens.size()); + + + // requesting 2 more containers. + response = am.allocate("h1", 1000, 2, releaseContainerList); + nm1.nodeHeartbeat(true); + Assert.assertEquals(0, response.getAllocatedContainers().size()); + allocateContainersAndValidateNMTokens(am, containersReceivedForNM1, 4, + nmTokens); + Assert.assertEquals(1, nmTokens.size()); + + + // We will be simulating NM restart so restarting newly added h2:1234 + // NM 2 now registers. + MockNM nm2 = rm.registerNode("h2:1234", 10000); + nm2.nodeHeartbeat(true); + ArrayList containersReceivedForNM2 = + new ArrayList(); + + response = am.allocate("h2", 1000, 2, releaseContainerList); + nm2.nodeHeartbeat(true); + Assert.assertEquals(0, response.getAllocatedContainers().size()); + allocateContainersAndValidateNMTokens(am, containersReceivedForNM2, 2, + nmTokens); + Assert.assertEquals(2, nmTokens.size()); + + // Simulating NM-2 restart. + nm2 = rm.registerNode("h2:1234", 10000); + nm2.nodeHeartbeat(true); + + int interval = 40; + // Wait for nm Token to be cleared. + while (nmTokenSecretManager + .isApplicationAttemptNMTokenPresent(attempt.getAppAttemptId(), + nm2.getNodeId()) && interval-- > 0) { + LOG.info("waiting for nmToken to be cleared for : " + nm2.getNodeId()); + Thread.sleep(1000); + } + Assert.assertTrue(nmTokenSecretManager + .isApplicationAttemptRegistered(attempt.getAppAttemptId())); + + // removing NMToken for h2:1234 + nmTokens.remove(nm2.getNodeId().toString()); + Assert.assertEquals(1, nmTokens.size()); + + // We should again receive the NMToken. + response = am.allocate("h2", 1000, 2, releaseContainerList); + nm2.nodeHeartbeat(true); + Assert.assertEquals(0, response.getAllocatedContainers().size()); + allocateContainersAndValidateNMTokens(am, containersReceivedForNM2, 4, + nmTokens); + Assert.assertEquals(2, nmTokens.size()); + + // Now rolling over NMToken masterKey. it should resend the NMToken in + // next allocate call. 
+ Assert.assertTrue(nmTokenSecretManager + .isApplicationAttemptNMTokenPresent(attempt.getAppAttemptId(), + nm1.getNodeId())); + Assert.assertTrue(nmTokenSecretManager + .isApplicationAttemptNMTokenPresent(attempt.getAppAttemptId(), + nm2.getNodeId())); + + nmTokenSecretManager.rollMasterKey(); + nmTokenSecretManager.activateNextMasterKey(); + + Assert.assertFalse(nmTokenSecretManager + .isApplicationAttemptNMTokenPresent(attempt.getAppAttemptId(), + nm1.getNodeId())); + Assert.assertFalse(nmTokenSecretManager + .isApplicationAttemptNMTokenPresent(attempt.getAppAttemptId(), + nm2.getNodeId())); + // It should not remove application attempt entry. + Assert.assertTrue(nmTokenSecretManager + .isApplicationAttemptRegistered(attempt.getAppAttemptId())); + + nmTokens.clear(); + Assert.assertEquals(0, nmTokens.size()); + // We should again receive the NMToken. + response = am.allocate("h2", 1000, 1, releaseContainerList); + nm2.nodeHeartbeat(true); + Assert.assertEquals(0, response.getAllocatedContainers().size()); + allocateContainersAndValidateNMTokens(am, containersReceivedForNM2, 5, + nmTokens); + Assert.assertEquals(1, nmTokens.size()); + Assert.assertTrue(nmTokenSecretManager + .isApplicationAttemptNMTokenPresent(attempt.getAppAttemptId(), + nm2.getNodeId())); + + + // After AM is finished making sure that nmtoken entry for app + Assert.assertTrue(nmTokenSecretManager + .isApplicationAttemptRegistered(attempt.getAppAttemptId())); + am.unregisterAppAttempt(); + // marking all the containers as finished. + for (Container container : containersReceivedForNM1) { + nm1.nodeHeartbeat(attempt.getAppAttemptId(), container.getId().getId(), + ContainerState.COMPLETE); + } + for (Container container : containersReceivedForNM2) { + nm2.nodeHeartbeat(attempt.getAppAttemptId(), container.getId().getId(), + ContainerState.COMPLETE); + } + am.waitForState(RMAppAttemptState.FINISHED); + Assert.assertFalse(nmTokenSecretManager + .isApplicationAttemptRegistered(attempt.getAppAttemptId())); + } finally { + rm.stop(); + } + } + + protected void allocateContainersAndValidateNMTokens(MockAM am, + ArrayList containersReceived, int totalContainerRequested, + HashMap nmTokens) throws Exception, InterruptedException { + ArrayList releaseContainerList = new ArrayList(); + AllocateResponse response; + ArrayList resourceRequest = + new ArrayList(); + while (containersReceived.size() < totalContainerRequested) { + LOG.info("requesting containers.."); + response = + am.allocate(resourceRequest, releaseContainerList); + containersReceived.addAll(response.getAllocatedContainers()); + if (!response.getNMTokens().isEmpty()) { + for (NMToken nmToken : response.getNMTokens()) { + String nodeId = nmToken.getNodeId().toString(); + if (nmTokens.containsKey(nodeId)) { + Assert.fail("Duplicate NMToken received for : " + nodeId); + } + nmTokens.put(nodeId, nmToken.getToken()); + } + } + LOG.info("Got " + containersReceived.size() + + " containers. Waiting to get " + totalContainerRequested); + Thread.sleep(500); + } + } @Test (timeout = 300000) public void testActivatingApplicationAfterAddingNM() throws Exception {
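
Illustrative usage sketch (not part of the patch): with the new AMRMClient.getNMTokens() API above, an ApplicationMaster can look up the cached NMToken for the node of each newly allocated container and hand it to whatever component on the AM talks to that NodeManager. In the fragment below, "amrmClient" is assumed to be the AM's already-started AMRMClient instance, and "launcher" and "LOG" are hypothetical helpers; imports and exception handling are omitted. Only allocate(float), getNMTokens(), getAllocatedContainers() and getNodeId() come from the patched APIs.

    AllocateResponse allocateResponse = amrmClient.allocate(0.5f);
    // AMRMClientImpl caches one Token per NodeManager, keyed by NodeId.toString().
    ConcurrentHashMap<String, Token> nmTokenCache = amrmClient.getNMTokens();
    for (Container container : allocateResponse.getAllocatedContainers()) {
      String nodeIdStr = container.getNodeId().toString();
      Token nmToken = nmTokenCache.get(nodeIdStr);
      if (nmToken == null) {
        // The RM only resends an NMToken after a master-key roll or NM restart,
        // so a missing entry here would suggest the cache was not shared.
        LOG.warn("No NMToken cached for " + nodeIdStr);
        continue;
      }
      // Hand the container plus its node's NMToken to the (hypothetical)
      // container-launching helper on this AM.
      launcher.launch(container, nmToken);
    }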