diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/TestResourceTrackerOnHA.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/TestResourceTrackerOnHA.java index 363f666..8885769 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/TestResourceTrackerOnHA.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/TestResourceTrackerOnHA.java @@ -60,7 +60,7 @@ public void testResourceTrackerOnHA() throws Exception { // make sure registerNodeManager works when failover happens RegisterNodeManagerRequest request = RegisterNodeManagerRequest.newInstance(nodeId, 0, resource, - YarnVersionInfo.getVersion(), null); + YarnVersionInfo.getVersion(), null, null); resourceTracker.registerNodeManager(request); Assert.assertTrue(waitForNodeManagerToConnect(10000, nodeId)); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/RegisterNodeManagerRequest.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/RegisterNodeManagerRequest.java index 6ca3861..116dbcb 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/RegisterNodeManagerRequest.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/RegisterNodeManagerRequest.java @@ -20,6 +20,7 @@ import java.util.List; +import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.api.records.ContainerStatus; import org.apache.hadoop.yarn.api.records.NodeId; import org.apache.hadoop.yarn.api.records.Resource; @@ -29,7 +30,7 @@ public static RegisterNodeManagerRequest newInstance(NodeId nodeId, int httpPort, Resource resource, String nodeManagerVersionId, - List containerStatuses) { + List containerStatuses, List runningApplication) { RegisterNodeManagerRequest request = Records.newRecord(RegisterNodeManagerRequest.class); request.setHttpPort(httpPort); @@ -37,6 +38,7 @@ public static RegisterNodeManagerRequest newInstance(NodeId nodeId, request.setNodeId(nodeId); request.setNMVersion(nodeManagerVersionId); request.setContainerStatuses(containerStatuses); + request.setRunningApplications(runningApplication); return request; } @@ -45,10 +47,12 @@ public static RegisterNodeManagerRequest newInstance(NodeId nodeId, public abstract Resource getResource(); public abstract String getNMVersion(); public abstract List getContainerStatuses(); + public abstract List getRunningApplications(); public abstract void setNodeId(NodeId nodeId); public abstract void setHttpPort(int port); public abstract void setResource(Resource resource); public abstract void setNMVersion(String version); public abstract void setContainerStatuses(List containerStatuses); + public abstract void setRunningApplications(List apps); } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/RegisterNodeManagerRequestPBImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/RegisterNodeManagerRequestPBImpl.java index 6f9c43d..9202c3b 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/RegisterNodeManagerRequestPBImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/RegisterNodeManagerRequestPBImpl.java @@ -23,16 +23,15 @@ import java.util.Iterator; import java.util.List; -import org.apache.hadoop.yarn.api.records.ContainerId; +import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.api.records.ContainerStatus; import org.apache.hadoop.yarn.api.records.NodeId; import org.apache.hadoop.yarn.api.records.Resource; -import org.apache.hadoop.yarn.api.records.impl.pb.ContainerIdPBImpl; +import org.apache.hadoop.yarn.api.records.impl.pb.ApplicationIdPBImpl; import org.apache.hadoop.yarn.api.records.impl.pb.ContainerStatusPBImpl; import org.apache.hadoop.yarn.api.records.impl.pb.NodeIdPBImpl; -import org.apache.hadoop.yarn.api.records.impl.pb.ProtoBase; import org.apache.hadoop.yarn.api.records.impl.pb.ResourcePBImpl; -import org.apache.hadoop.yarn.proto.YarnProtos.ContainerIdProto; +import org.apache.hadoop.yarn.proto.YarnProtos.ApplicationIdProto; import org.apache.hadoop.yarn.proto.YarnProtos.ContainerStatusProto; import org.apache.hadoop.yarn.proto.YarnProtos.NodeIdProto; import org.apache.hadoop.yarn.proto.YarnProtos.ResourceProto; @@ -50,6 +49,7 @@ private Resource resource = null; private NodeId nodeId = null; private List containerStatuses = null; + private List runningApplications = null; public RegisterNodeManagerRequestPBImpl() { builder = RegisterNodeManagerRequestProto.newBuilder(); @@ -71,6 +71,9 @@ private void mergeLocalToBuilder() { if (this.containerStatuses != null) { addContainerStatusesToProto(); } + if (this.runningApplications != null) { + addRunningApplicationsToProto(); + } if (this.resource != null) { builder.setResource(convertToProtoFormat(this.resource)); } @@ -152,6 +155,66 @@ public void setHttpPort(int httpPort) { maybeInitBuilder(); builder.setHttpPort(httpPort); } + + @Override + public List getRunningApplications() { + initRunningApplications(); + return runningApplications; + } + + private void initRunningApplications() { + if (this.runningApplications != null) { + return; + } + RegisterNodeManagerRequestProtoOrBuilder p = viaProto ? proto : builder; + List list = p.getRunningApplicationsList(); + this.runningApplications = new ArrayList(); + for (ApplicationIdProto c : list) { + this.runningApplications.add(convertFromProtoFormat(c)); + } + } + + @Override + public void setRunningApplications(List apps) { + if (apps == null) { + return; + } + initRunningApplications(); + this.runningApplications.addAll(apps); + } + + private void addRunningApplicationsToProto() { + maybeInitBuilder(); + builder.clearRunningApplications(); + if (runningApplications == null) { + return; + } + Iterable it = new Iterable() { + + @Override + public Iterator iterator() { + return new Iterator() { + Iterator iter = runningApplications.iterator(); + + @Override + public boolean hasNext() { + return iter.hasNext(); + } + + @Override + public ApplicationIdProto next() { + return convertToProtoFormat(iter.next()); + } + + @Override + public void remove() { + throw new UnsupportedOperationException(); + } + }; + } + }; + builder.addAllRunningApplications(it); + } @Override public List getContainerStatuses() { @@ -242,6 +305,14 @@ public void setNMVersion(String version) { maybeInitBuilder(); builder.setNmVersion(version); } + + private ApplicationIdPBImpl convertFromProtoFormat(ApplicationIdProto p) { + return new ApplicationIdPBImpl(p); + } + + private ApplicationIdProto convertToProtoFormat(ApplicationId t) { + return ((ApplicationIdPBImpl)t).getProto(); + } private NodeIdPBImpl convertFromProtoFormat(NodeIdProto p) { return new NodeIdPBImpl(p); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/proto/yarn_server_common_service_protos.proto b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/proto/yarn_server_common_service_protos.proto index c544905..e412709 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/proto/yarn_server_common_service_protos.proto +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/proto/yarn_server_common_service_protos.proto @@ -31,6 +31,7 @@ message RegisterNodeManagerRequestProto { optional ResourceProto resource = 4; optional string nm_version = 5; repeated ContainerStatusProto containerStatuses = 6; + repeated ApplicationIdProto runningApplications = 7; } message RegisterNodeManagerResponseProto { diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/api/protocolrecords/TestRegisterNodeManagerRequest.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/api/protocolrecords/TestRegisterNodeManagerRequest.java new file mode 100644 index 0000000..9f24fb4 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/api/protocolrecords/TestRegisterNodeManagerRequest.java @@ -0,0 +1,81 @@ +/** +* Licensed to the Apache Software Foundation (ASF) under one +* or more contributor license agreements. See the NOTICE file +* distributed with this work for additional information +* regarding copyright ownership. The ASF licenses this file +* to you under the Apache License, Version 2.0 (the +* "License"); you may not use this file except in compliance +* with the License. You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, software +* distributed under the License is distributed on an "AS IS" BASIS, +* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +* See the License for the specific language governing permissions and +* limitations under the License. +*/ + +package org.apache.hadoop.yarn.server.api.protocolrecords; + +import java.util.Arrays; + +import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; +import org.apache.hadoop.yarn.api.records.ApplicationId; +import org.apache.hadoop.yarn.api.records.ContainerId; +import org.apache.hadoop.yarn.api.records.ContainerState; +import org.apache.hadoop.yarn.api.records.ContainerStatus; +import org.apache.hadoop.yarn.api.records.NodeId; +import org.apache.hadoop.yarn.api.records.Resource; +import org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb.RegisterNodeManagerRequestPBImpl; +import org.junit.Assert; +import org.junit.Test; + +public class TestRegisterNodeManagerRequest { + @Test + public void testRegisterNodeManagerRequest() { + RegisterNodeManagerRequest request = + RegisterNodeManagerRequest.newInstance(NodeId.newInstance("host", 1234), + 1234, Resource.newInstance(0, 0), "version", Arrays + .asList(ContainerStatus.newInstance( + ContainerId.newInstance( + ApplicationAttemptId.newInstance( + ApplicationId.newInstance(1234L, 1), 1), 1), + ContainerState.RUNNING, "good", -1)), Arrays.asList( + ApplicationId.newInstance(1234L, 1), + ApplicationId.newInstance(1234L, 2))); + + // serialze to proto, and get request from proto + RegisterNodeManagerRequest request1 = + new RegisterNodeManagerRequestPBImpl( + ((RegisterNodeManagerRequestPBImpl) request).getProto()); + + // check values + Assert.assertEquals(request1.getContainerStatuses().size(), request + .getContainerStatuses().size()); + Assert.assertEquals(request1.getContainerStatuses().get(0).getContainerId(), + request.getContainerStatuses().get(0).getContainerId()); + Assert.assertEquals(request1.getRunningApplications().size(), request + .getRunningApplications().size()); + Assert.assertEquals(request1.getRunningApplications().get(0), request + .getRunningApplications().get(0)); + Assert.assertEquals(request1.getRunningApplications().get(1), request + .getRunningApplications().get(1)); + } + + @Test + public void testRegisterNodeManagerRequestWithNullArrays() { + RegisterNodeManagerRequest request = + RegisterNodeManagerRequest.newInstance(NodeId.newInstance("host", 1234), + 1234, Resource.newInstance(0, 0), "version", null, null); + + // serialze to proto, and get request from proto + RegisterNodeManagerRequest request1 = + new RegisterNodeManagerRequestPBImpl( + ((RegisterNodeManagerRequestPBImpl) request).getProto()); + + // check values + Assert.assertEquals(0, request1.getContainerStatuses().size()); + Assert.assertEquals(0, request1.getRunningApplications().size()); + } +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java index 4db000c..fa347d8 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java @@ -247,9 +247,10 @@ protected ResourceTracker getRMClient() throws IOException { protected void registerWithRM() throws YarnException, IOException { List containerStatuses = getContainerStatuses(); + List runningApplications = getRunningApplications(); RegisterNodeManagerRequest request = RegisterNodeManagerRequest.newInstance(nodeId, httpPort, totalResource, - nodeManagerVersionId, containerStatuses); + nodeManagerVersionId, containerStatuses, runningApplications); if (containerStatuses != null) { LOG.info("Registering with RM using finished containers :" + containerStatuses); @@ -374,6 +375,12 @@ private NodeStatus getNodeStatus(int responseId) { } return containerStatuses; } + + private List getRunningApplications() { + List runningApplications = new ArrayList(); + runningApplications.addAll(this.context.getApplications().keySet()); + return runningApplications; + } private void addCompletedContainer(Container container) { synchronized (previousCompletedContainers) { diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceTrackerService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceTrackerService.java index 1d40320..0ebece8 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceTrackerService.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceTrackerService.java @@ -31,8 +31,8 @@ import org.apache.hadoop.service.AbstractService; import org.apache.hadoop.util.VersionUtil; import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; +import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.api.records.Container; -import org.apache.hadoop.yarn.api.records.ContainerId; import org.apache.hadoop.yarn.api.records.ContainerState; import org.apache.hadoop.yarn.api.records.ContainerStatus; import org.apache.hadoop.yarn.api.records.NodeId; @@ -56,6 +56,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event.RMAppAttemptContainerFinishedEvent; import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode; +import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeCleanAppEvent; import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeEvent; import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeEventType; import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeImpl; @@ -229,6 +230,30 @@ void handleContainerStatus(ContainerStatus containerStatus) { } @SuppressWarnings("unchecked") + void handleRunningAppOnNode(ApplicationId appId, NodeId nodeId) { + RMApp app = rmContext.getRMApps().get(appId); + if (null == app) { + rmContext.getDispatcher().getEventHandler() + .handle(new RMNodeCleanAppEvent(nodeId, appId)); + return; + } + + // Add this nodeId to appAttempt's ranNode + // Why add this nodeId to appAttempt's ranNode before check app.isAppFinalStateStored? + // Because app may become finalStateStored=true before we add it to ranNode, + // and after appFinalStateStored check. This should be a rared edge case. + RMAppAttempt appAttempt = app.getCurrentAppAttempt(); + appAttempt.addRanNode(nodeId); + + // Check is this application already finished. If yes, create a clean-up + // event in RMNode + if (app.isAppFinalStateStored()) { + rmContext.getDispatcher().getEventHandler() + .handle(new RMNodeCleanAppEvent(nodeId, appId)); + } + } + + @SuppressWarnings("unchecked") @Override public RegisterNodeManagerResponse registerNodeManager( RegisterNodeManagerRequest request) throws YarnException, @@ -240,13 +265,6 @@ public RegisterNodeManagerResponse registerNodeManager( Resource capability = request.getResource(); String nodeManagerVersion = request.getNMVersion(); - if (!request.getContainerStatuses().isEmpty()) { - LOG.info("received container statuses on node manager register :" - + request.getContainerStatuses()); - for (ContainerStatus containerStatus : request.getContainerStatuses()) { - handleContainerStatus(containerStatus); - } - } RegisterNodeManagerResponse response = recordFactory .newRecordInstance(RegisterNodeManagerResponse.class); @@ -316,6 +334,22 @@ public RegisterNodeManagerResponse registerNodeManager( // present for any running application. this.nmTokenSecretManager.removeNodeKey(nodeId); this.nmLivelinessMonitor.register(nodeId); + + // Handle container statuses reported by NM + if (!request.getContainerStatuses().isEmpty()) { + LOG.info("received container statuses on node manager register :" + + request.getContainerStatuses()); + for (ContainerStatus containerStatus : request.getContainerStatuses()) { + handleContainerStatus(containerStatus); + } + } + + // Handle running applications reported by NM + if (null != request.getRunningApplications()) { + for (ApplicationId appId : request.getRunningApplications()) { + handleRunningAppOnNode(appId, request.getNodeId()); + } + } String message = "NodeManager from node " + host + "(cmPort: " + cmPort + " httpPort: " diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttempt.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttempt.java index b4bad12..3122707 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttempt.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttempt.java @@ -119,6 +119,11 @@ * @return the set of nodes that ran any containers from this {@link RMAppAttempt} */ Set getRanNodes(); + + /** + * Add more node on which containers for this {@link RMAppAttempt} ran + */ + void addRanNode(NodeId nodes); /** * Return a list of the last set of finished containers, resetting the diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttemptImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttemptImpl.java index efe0721..1e87107 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttemptImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttemptImpl.java @@ -26,16 +26,15 @@ import java.util.ArrayList; import java.util.Collections; import java.util.EnumSet; -import java.util.HashSet; import java.util.List; import java.util.Set; +import java.util.concurrent.ConcurrentSkipListSet; import java.util.concurrent.locks.ReentrantReadWriteLock; import java.util.concurrent.locks.ReentrantReadWriteLock.ReadLock; import java.util.concurrent.locks.ReentrantReadWriteLock.WriteLock; import javax.crypto.SecretKey; -import com.google.common.annotations.VisibleForTesting; import org.apache.commons.lang.StringUtils; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; @@ -103,6 +102,8 @@ import org.apache.hadoop.yarn.state.StateMachineFactory; import org.apache.hadoop.yarn.webapp.util.WebAppUtils; +import com.google.common.annotations.VisibleForTesting; + @SuppressWarnings({"unchecked", "rawtypes"}) public class RMAppAttemptImpl implements RMAppAttempt, Recoverable { @@ -136,7 +137,7 @@ //nodes on while this attempt's containers ran private Set ranNodes = - new HashSet(); + new ConcurrentSkipListSet(); private List justFinishedContainers = new ArrayList(); private Container masterContainer; @@ -1663,4 +1664,9 @@ public ApplicationAttemptReport createApplicationAttemptReport() { } return attemptReport; } + + @Override + public void addRanNode(NodeId nodes) { + ranNodes.add(nodes); + } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockNM.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockNM.java index 1dcac06..b09d3a6 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockNM.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockNM.java @@ -56,6 +56,7 @@ private MasterKey currentContainerTokenMasterKey; private MasterKey currentNMTokenMasterKey; private String version; + private List runningApplications; public MockNM(String nodeIdStr, int memory, ResourceTrackerService resourceTracker) { // scale vcores based on the requested memory @@ -67,17 +68,19 @@ public MockNM(String nodeIdStr, int memory, ResourceTrackerService resourceTrack public MockNM(String nodeIdStr, int memory, int vcores, ResourceTrackerService resourceTracker) { - this(nodeIdStr, memory, vcores, resourceTracker, YarnVersionInfo.getVersion()); + this(nodeIdStr, memory, vcores, resourceTracker, YarnVersionInfo.getVersion(), null); } public MockNM(String nodeIdStr, int memory, int vcores, - ResourceTrackerService resourceTracker, String version) { + ResourceTrackerService resourceTracker, String version, + List runningApplications) { this.memory = memory; this.vCores = vcores; this.resourceTracker = resourceTracker; this.version = version; String[] splits = nodeIdStr.split(":"); nodeId = BuilderUtils.newNodeId(splits[0], Integer.parseInt(splits[1])); + this.runningApplications = runningApplications; } public NodeId getNodeId() { @@ -114,6 +117,7 @@ public RegisterNodeManagerResponse registerNode( req.setResource(resource); req.setContainerStatuses(containerStatus); req.setNMVersion(version); + req.setRunningApplications(runningApplications); RegisterNodeManagerResponse registrationResponse = resourceTracker.registerNodeManager(req); this.currentContainerTokenMasterKey = diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockRM.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockRM.java index caee228..a0f148d 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockRM.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockRM.java @@ -21,10 +21,9 @@ import java.io.IOException; import java.nio.ByteBuffer; import java.security.PrivilegedAction; +import java.util.List; import java.util.Map; -import org.junit.Assert; - import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.io.DataOutputBuffer; import org.apache.hadoop.security.Credentials; @@ -73,9 +72,11 @@ import org.apache.hadoop.yarn.server.resourcemanager.security.NMTokenSecretManagerInRM; import org.apache.hadoop.yarn.server.resourcemanager.security.RMContainerTokenSecretManager; import org.apache.hadoop.yarn.util.Records; +import org.apache.hadoop.yarn.util.YarnVersionInfo; import org.apache.log4j.Level; import org.apache.log4j.LogManager; import org.apache.log4j.Logger; +import org.junit.Assert; @SuppressWarnings("unchecked") public class MockRM extends ResourceManager { @@ -329,6 +330,15 @@ public MockNM registerNode(String nodeIdStr, int memory, int vCores) nm.registerNode(); return nm; } + + public MockNM registerNode(String nodeIdStr, int memory, int vCores, + List runningApplication) throws Exception { + MockNM nm = + new MockNM(nodeIdStr, memory, vCores, getResourceTrackerService(), + YarnVersionInfo.getVersion(), runningApplication); + nm.registerNode(); + return nm; + } public void sendNodeStarted(MockNM nm) throws Exception { RMNodeImpl node = (RMNodeImpl) getRMContext().getRMNodes().get( diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestResourceTrackerService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestResourceTrackerService.java index 2f16b85..9ef3305 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestResourceTrackerService.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestResourceTrackerService.java @@ -18,13 +18,24 @@ package org.apache.hadoop.yarn.server.resourcemanager; +import static org.mockito.Matchers.any; +import static org.mockito.Mockito.doAnswer; +import static org.mockito.Mockito.doReturn; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.never; +import static org.mockito.Mockito.spy; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; + import java.io.File; import java.io.FileOutputStream; import java.io.IOException; -import java.util.ArrayList; -import java.util.Collections; +import java.util.Arrays; import java.util.HashMap; +import java.util.HashSet; import java.util.List; +import java.util.Set; +import java.util.concurrent.ConcurrentMap; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.io.IOUtils; @@ -33,7 +44,6 @@ import org.apache.hadoop.net.NetUtils; import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; import org.apache.hadoop.yarn.api.records.ApplicationId; -import org.apache.hadoop.yarn.api.records.Container; import org.apache.hadoop.yarn.api.records.ContainerId; import org.apache.hadoop.yarn.api.records.ContainerState; import org.apache.hadoop.yarn.api.records.ContainerStatus; @@ -50,22 +60,18 @@ import org.apache.hadoop.yarn.server.api.protocolrecords.RegisterNodeManagerResponse; import org.apache.hadoop.yarn.server.api.records.NodeAction; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp; +import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptImpl; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueMetrics; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.SchedulerEvent; import org.apache.hadoop.yarn.server.utils.BuilderUtils; import org.apache.hadoop.yarn.util.Records; import org.apache.hadoop.yarn.util.YarnVersionInfo; - import org.junit.After; import org.junit.Assert; import org.junit.Test; - -import static org.junit.Assert.assertEquals; -import static org.mockito.Matchers.any; -import static org.mockito.Mockito.never; -import static org.mockito.Mockito.spy; -import static org.mockito.Mockito.verify; +import org.mockito.invocation.InvocationOnMock; +import org.mockito.stubbing.Answer; public class TestResourceTrackerService { @@ -598,6 +604,40 @@ protected Dispatcher createDispatcher() { Assert.assertTrue(NodeAction.NORMAL.equals(response.getNodeAction())); Assert.assertEquals(5120 + 10240, metrics.getAvailableMB()); } + + @SuppressWarnings({ "unchecked", "rawtypes" }) + @Test + public void testReconnectNodeWithRunningApplications() throws Exception { + ApplicationId appId = ApplicationId.newInstance(1234L, 1); + + // set ranNodes + final Set ranNodes = new HashSet(); + + // add a bunch of mocks + RMApp mockRMApp = mock(RMApp.class); + RMAppAttempt mockRMAppAttempt = mock(RMAppAttempt.class); + when(mockRMApp.getCurrentAppAttempt()).thenReturn(mockRMAppAttempt); + ConcurrentMap mockMap = mock(ConcurrentMap.class); + when(mockMap.get(appId)).thenReturn(mockRMApp); + + doAnswer(new Answer() { + @Override + public Object answer(InvocationOnMock invocation) throws Throwable { + ranNodes.add((NodeId) invocation.getArguments()[0]); + return null; + } + }).when(mockRMAppAttempt).addRanNode(any(NodeId.class)); + + rm = new MockRM(); + rm.start(); + rm.getRMContext().getRMApps().put(appId, mockRMApp); + + // register nm1 + MockNM nm1 = rm.registerNode("host1:1234", 5120, 1, Arrays.asList(appId)); + + // We should have nm1 in ranNode + Assert.assertTrue(ranNodes.contains(nm1.getNodeId())); + } private void writeToHostsFile(String... hosts) throws IOException { if (!hostFile.exists()) {