diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/TestResourceTrackerOnHA.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/TestResourceTrackerOnHA.java index 363f666..8885769 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/TestResourceTrackerOnHA.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/TestResourceTrackerOnHA.java @@ -60,7 +60,7 @@ public void testResourceTrackerOnHA() throws Exception { // make sure registerNodeManager works when failover happens RegisterNodeManagerRequest request = RegisterNodeManagerRequest.newInstance(nodeId, 0, resource, - YarnVersionInfo.getVersion(), null); + YarnVersionInfo.getVersion(), null, null); resourceTracker.registerNodeManager(request); Assert.assertTrue(waitForNodeManagerToConnect(10000, nodeId)); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/RegisterNodeManagerRequest.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/RegisterNodeManagerRequest.java index 6ca3861..72d8d03 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/RegisterNodeManagerRequest.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/RegisterNodeManagerRequest.java @@ -20,6 +20,7 @@ import java.util.List; +import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.api.records.ContainerStatus; import org.apache.hadoop.yarn.api.records.NodeId; import org.apache.hadoop.yarn.api.records.Resource; @@ -27,9 +28,10 @@ public abstract class RegisterNodeManagerRequest { - public static RegisterNodeManagerRequest newInstance(NodeId nodeId, - int httpPort, Resource resource, String nodeManagerVersionId, - List containerStatuses) { + public static RegisterNodeManagerRequest newInstance(NodeId nodeId, int httpPort, + Resource resource, String nodeManagerVersionId, + List containerStatuses, + List runningApplications) { RegisterNodeManagerRequest request = Records.newRecord(RegisterNodeManagerRequest.class); request.setHttpPort(httpPort); @@ -37,6 +39,7 @@ public static RegisterNodeManagerRequest newInstance(NodeId nodeId, request.setNodeId(nodeId); request.setNMVersion(nodeManagerVersionId); request.setContainerStatuses(containerStatuses); + request.setRunningApplications(runningApplications); return request; } @@ -46,9 +49,28 @@ public static RegisterNodeManagerRequest newInstance(NodeId nodeId, public abstract String getNMVersion(); public abstract List getContainerStatuses(); + /** + * We introduce this here because currently YARN RM doesn't persist nodes info + * for application running. When RM restart happened, we cannot determinate if + * a node should do application cleanup (like log-aggregation, status update, + * etc.) or not.

+ * When we have this running application list in node manager register + * request, we can recover nodes info for running applications. And then we + * can take actions accordingly + * + * @return running application list in this node + */ + public abstract List getRunningApplications(); + public abstract void setNodeId(NodeId nodeId); public abstract void setHttpPort(int port); public abstract void setResource(Resource resource); public abstract void setNMVersion(String version); public abstract void setContainerStatuses(List containerStatuses); + + /** + * Setter for {@link RegisterNodeManagerRequest#getRunningApplications()} + * @param runningApplications running application in this node + */ + public abstract void setRunningApplications(List runningApplications); } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/RegisterNodeManagerRequestPBImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/RegisterNodeManagerRequestPBImpl.java index 6f9c43d..9202c3b 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/RegisterNodeManagerRequestPBImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/RegisterNodeManagerRequestPBImpl.java @@ -23,16 +23,15 @@ import java.util.Iterator; import java.util.List; -import org.apache.hadoop.yarn.api.records.ContainerId; +import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.api.records.ContainerStatus; import org.apache.hadoop.yarn.api.records.NodeId; import org.apache.hadoop.yarn.api.records.Resource; -import org.apache.hadoop.yarn.api.records.impl.pb.ContainerIdPBImpl; +import org.apache.hadoop.yarn.api.records.impl.pb.ApplicationIdPBImpl; import org.apache.hadoop.yarn.api.records.impl.pb.ContainerStatusPBImpl; import org.apache.hadoop.yarn.api.records.impl.pb.NodeIdPBImpl; -import org.apache.hadoop.yarn.api.records.impl.pb.ProtoBase; import org.apache.hadoop.yarn.api.records.impl.pb.ResourcePBImpl; -import org.apache.hadoop.yarn.proto.YarnProtos.ContainerIdProto; +import org.apache.hadoop.yarn.proto.YarnProtos.ApplicationIdProto; import org.apache.hadoop.yarn.proto.YarnProtos.ContainerStatusProto; import org.apache.hadoop.yarn.proto.YarnProtos.NodeIdProto; import org.apache.hadoop.yarn.proto.YarnProtos.ResourceProto; @@ -50,6 +49,7 @@ private Resource resource = null; private NodeId nodeId = null; private List containerStatuses = null; + private List runningApplications = null; public RegisterNodeManagerRequestPBImpl() { builder = RegisterNodeManagerRequestProto.newBuilder(); @@ -71,6 +71,9 @@ private void mergeLocalToBuilder() { if (this.containerStatuses != null) { addContainerStatusesToProto(); } + if (this.runningApplications != null) { + addRunningApplicationsToProto(); + } if (this.resource != null) { builder.setResource(convertToProtoFormat(this.resource)); } @@ -152,6 +155,66 @@ public void setHttpPort(int httpPort) { maybeInitBuilder(); builder.setHttpPort(httpPort); } + + @Override + public List getRunningApplications() { + initRunningApplications(); + return runningApplications; + } + + private void initRunningApplications() { + if (this.runningApplications != null) { + return; + } + RegisterNodeManagerRequestProtoOrBuilder p = viaProto ? proto : builder; + List list = p.getRunningApplicationsList(); + this.runningApplications = new ArrayList(); + for (ApplicationIdProto c : list) { + this.runningApplications.add(convertFromProtoFormat(c)); + } + } + + @Override + public void setRunningApplications(List apps) { + if (apps == null) { + return; + } + initRunningApplications(); + this.runningApplications.addAll(apps); + } + + private void addRunningApplicationsToProto() { + maybeInitBuilder(); + builder.clearRunningApplications(); + if (runningApplications == null) { + return; + } + Iterable it = new Iterable() { + + @Override + public Iterator iterator() { + return new Iterator() { + Iterator iter = runningApplications.iterator(); + + @Override + public boolean hasNext() { + return iter.hasNext(); + } + + @Override + public ApplicationIdProto next() { + return convertToProtoFormat(iter.next()); + } + + @Override + public void remove() { + throw new UnsupportedOperationException(); + } + }; + } + }; + builder.addAllRunningApplications(it); + } @Override public List getContainerStatuses() { @@ -242,6 +305,14 @@ public void setNMVersion(String version) { maybeInitBuilder(); builder.setNmVersion(version); } + + private ApplicationIdPBImpl convertFromProtoFormat(ApplicationIdProto p) { + return new ApplicationIdPBImpl(p); + } + + private ApplicationIdProto convertToProtoFormat(ApplicationId t) { + return ((ApplicationIdPBImpl)t).getProto(); + } private NodeIdPBImpl convertFromProtoFormat(NodeIdProto p) { return new NodeIdPBImpl(p); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/proto/yarn_server_common_service_protos.proto b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/proto/yarn_server_common_service_protos.proto index c544905..e412709 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/proto/yarn_server_common_service_protos.proto +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/proto/yarn_server_common_service_protos.proto @@ -31,6 +31,7 @@ message RegisterNodeManagerRequestProto { optional ResourceProto resource = 4; optional string nm_version = 5; repeated ContainerStatusProto containerStatuses = 6; + repeated ApplicationIdProto runningApplications = 7; } message RegisterNodeManagerResponseProto { diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/api/protocolrecords/TestRegisterNodeManagerRequest.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/api/protocolrecords/TestRegisterNodeManagerRequest.java new file mode 100644 index 0000000..9f24fb4 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/api/protocolrecords/TestRegisterNodeManagerRequest.java @@ -0,0 +1,81 @@ +/** +* Licensed to the Apache Software Foundation (ASF) under one +* or more contributor license agreements. See the NOTICE file +* distributed with this work for additional information +* regarding copyright ownership. The ASF licenses this file +* to you under the Apache License, Version 2.0 (the +* "License"); you may not use this file except in compliance +* with the License. You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, software +* distributed under the License is distributed on an "AS IS" BASIS, +* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +* See the License for the specific language governing permissions and +* limitations under the License. +*/ + +package org.apache.hadoop.yarn.server.api.protocolrecords; + +import java.util.Arrays; + +import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; +import org.apache.hadoop.yarn.api.records.ApplicationId; +import org.apache.hadoop.yarn.api.records.ContainerId; +import org.apache.hadoop.yarn.api.records.ContainerState; +import org.apache.hadoop.yarn.api.records.ContainerStatus; +import org.apache.hadoop.yarn.api.records.NodeId; +import org.apache.hadoop.yarn.api.records.Resource; +import org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb.RegisterNodeManagerRequestPBImpl; +import org.junit.Assert; +import org.junit.Test; + +public class TestRegisterNodeManagerRequest { + @Test + public void testRegisterNodeManagerRequest() { + RegisterNodeManagerRequest request = + RegisterNodeManagerRequest.newInstance(NodeId.newInstance("host", 1234), + 1234, Resource.newInstance(0, 0), "version", Arrays + .asList(ContainerStatus.newInstance( + ContainerId.newInstance( + ApplicationAttemptId.newInstance( + ApplicationId.newInstance(1234L, 1), 1), 1), + ContainerState.RUNNING, "good", -1)), Arrays.asList( + ApplicationId.newInstance(1234L, 1), + ApplicationId.newInstance(1234L, 2))); + + // serialze to proto, and get request from proto + RegisterNodeManagerRequest request1 = + new RegisterNodeManagerRequestPBImpl( + ((RegisterNodeManagerRequestPBImpl) request).getProto()); + + // check values + Assert.assertEquals(request1.getContainerStatuses().size(), request + .getContainerStatuses().size()); + Assert.assertEquals(request1.getContainerStatuses().get(0).getContainerId(), + request.getContainerStatuses().get(0).getContainerId()); + Assert.assertEquals(request1.getRunningApplications().size(), request + .getRunningApplications().size()); + Assert.assertEquals(request1.getRunningApplications().get(0), request + .getRunningApplications().get(0)); + Assert.assertEquals(request1.getRunningApplications().get(1), request + .getRunningApplications().get(1)); + } + + @Test + public void testRegisterNodeManagerRequestWithNullArrays() { + RegisterNodeManagerRequest request = + RegisterNodeManagerRequest.newInstance(NodeId.newInstance("host", 1234), + 1234, Resource.newInstance(0, 0), "version", null, null); + + // serialze to proto, and get request from proto + RegisterNodeManagerRequest request1 = + new RegisterNodeManagerRequestPBImpl( + ((RegisterNodeManagerRequestPBImpl) request).getProto()); + + // check values + Assert.assertEquals(0, request1.getContainerStatuses().size()); + Assert.assertEquals(0, request1.getRunningApplications().size()); + } +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java index 4db000c..fa347d8 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java @@ -247,9 +247,10 @@ protected ResourceTracker getRMClient() throws IOException { protected void registerWithRM() throws YarnException, IOException { List containerStatuses = getContainerStatuses(); + List runningApplications = getRunningApplications(); RegisterNodeManagerRequest request = RegisterNodeManagerRequest.newInstance(nodeId, httpPort, totalResource, - nodeManagerVersionId, containerStatuses); + nodeManagerVersionId, containerStatuses, runningApplications); if (containerStatuses != null) { LOG.info("Registering with RM using finished containers :" + containerStatuses); @@ -374,6 +375,12 @@ private NodeStatus getNodeStatus(int responseId) { } return containerStatuses; } + + private List getRunningApplications() { + List runningApplications = new ArrayList(); + runningApplications.addAll(this.context.getApplications().keySet()); + return runningApplications; + } private void addCompletedContainer(Container container) { synchronized (previousCompletedContainers) { diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceTrackerService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceTrackerService.java index 1d40320..ab368d7 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceTrackerService.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceTrackerService.java @@ -31,8 +31,8 @@ import org.apache.hadoop.service.AbstractService; import org.apache.hadoop.util.VersionUtil; import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; +import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.api.records.Container; -import org.apache.hadoop.yarn.api.records.ContainerId; import org.apache.hadoop.yarn.api.records.ContainerState; import org.apache.hadoop.yarn.api.records.ContainerStatus; import org.apache.hadoop.yarn.api.records.NodeId; @@ -53,9 +53,11 @@ import org.apache.hadoop.yarn.server.api.records.NodeAction; import org.apache.hadoop.yarn.server.api.records.NodeStatus; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp; +import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppNodeAddedEvent; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event.RMAppAttemptContainerFinishedEvent; import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode; +import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeCleanAppEvent; import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeEvent; import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeEventType; import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeImpl; @@ -229,6 +231,23 @@ void handleContainerStatus(ContainerStatus containerStatus) { } @SuppressWarnings("unchecked") + void handleRunningAppOnNode(ApplicationId appId, NodeId nodeId) { + RMApp app = rmContext.getRMApps().get(appId); + + // if we failed getting app by appId, maybe something wrong happened, just + // send a clean-up message to NM to make sure app will be cleaned. + if (null == app) { + LOG.warn("Cannot get RMApp by appId=" + appId); + rmContext.getDispatcher().getEventHandler() + .handle(new RMNodeCleanAppEvent(nodeId, appId)); + return; + } + + rmContext.getDispatcher().getEventHandler() + .handle(new RMAppNodeAddedEvent(appId, nodeId)); + } + + @SuppressWarnings("unchecked") @Override public RegisterNodeManagerResponse registerNodeManager( RegisterNodeManagerRequest request) throws YarnException, @@ -240,13 +259,6 @@ public RegisterNodeManagerResponse registerNodeManager( Resource capability = request.getResource(); String nodeManagerVersion = request.getNMVersion(); - if (!request.getContainerStatuses().isEmpty()) { - LOG.info("received container statuses on node manager register :" - + request.getContainerStatuses()); - for (ContainerStatus containerStatus : request.getContainerStatuses()) { - handleContainerStatus(containerStatus); - } - } RegisterNodeManagerResponse response = recordFactory .newRecordInstance(RegisterNodeManagerResponse.class); @@ -316,6 +328,22 @@ public RegisterNodeManagerResponse registerNodeManager( // present for any running application. this.nmTokenSecretManager.removeNodeKey(nodeId); this.nmLivelinessMonitor.register(nodeId); + + // Handle container statuses reported by NM + if (!request.getContainerStatuses().isEmpty()) { + LOG.info("received container statuses on node manager register :" + + request.getContainerStatuses()); + for (ContainerStatus containerStatus : request.getContainerStatuses()) { + handleContainerStatus(containerStatus); + } + } + + // Handle running applications reported by NM + if (null != request.getRunningApplications()) { + for (ApplicationId appId : request.getRunningApplications()) { + handleRunningAppOnNode(appId, request.getNodeId()); + } + } String message = "NodeManager from node " + host + "(cmPort: " + cmPort + " httpPort: " diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMApp.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMApp.java index 391ccf6..2b590a0 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMApp.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMApp.java @@ -19,16 +19,16 @@ package org.apache.hadoop.yarn.server.resourcemanager.rmapp; import java.util.Collection; - import java.util.Map; import java.util.Set; import org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationMasterRequest; import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; -import org.apache.hadoop.yarn.api.records.FinalApplicationStatus; import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.api.records.ApplicationReport; import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext; +import org.apache.hadoop.yarn.api.records.FinalApplicationStatus; +import org.apache.hadoop.yarn.api.records.NodeId; import org.apache.hadoop.yarn.api.records.YarnApplicationState; import org.apache.hadoop.yarn.event.EventHandler; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt; @@ -208,6 +208,14 @@ ApplicationReport createAndGetApplicationReport(String clientUserName, * @return the flag indicating whether the applications's state is stored. */ boolean isAppFinalStateStored(); + + + /** + * Nodes on which the containers for this {@link RMApp} ran. + * @return the set of nodes that ran any containers from this {@link RMApp} + * Add more node on which containers for this {@link RMApp} ran + */ + Set getRanNodes(); /** * Create the external user-facing state of ApplicationMaster from the diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppEventType.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppEventType.java index 3ab5db4..9e055c2 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppEventType.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppEventType.java @@ -38,6 +38,9 @@ ATTEMPT_FAILED, ATTEMPT_KILLED, NODE_UPDATE, + + // Source: Container and ResourceTracker + NODE_ADDED, // Source: RMStateStore APP_NEW_SAVED, diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java index f4f2e20..4be0cb5 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java @@ -25,6 +25,7 @@ import java.util.LinkedHashMap; import java.util.Map; import java.util.Set; +import java.util.concurrent.ConcurrentSkipListSet; import java.util.concurrent.locks.ReentrantReadWriteLock; import java.util.concurrent.locks.ReentrantReadWriteLock.ReadLock; import java.util.concurrent.locks.ReentrantReadWriteLock.WriteLock; @@ -115,6 +116,7 @@ private EventHandler handler; private static final AppFinishedTransition FINISHED_TRANSITION = new AppFinishedTransition(); + private Set ranNodes = new ConcurrentSkipListSet(); // These states stored are only valid when app is at killing or final_saving. private RMAppState stateBeforeKilling; @@ -148,6 +150,8 @@ new AppKilledTransition()) .addTransition(RMAppState.NEW, RMAppState.FAILED, RMAppEventType.APP_REJECTED, new AppRejectedTransition()) + .addTransition(RMAppState.NEW, RMAppState.NEW, RMAppEventType.NODE_ADDED, + new NodeAddedTransition()) // Transitions from NEW_SAVING state .addTransition(RMAppState.NEW_SAVING, RMAppState.NEW_SAVING, @@ -162,6 +166,8 @@ RMAppEventType.APP_REJECTED, new FinalSavingTransition(new AppRejectedTransition(), RMAppState.FAILED)) + .addTransition(RMAppState.NEW_SAVING, RMAppState.NEW_SAVING, RMAppEventType.NODE_ADDED, + new NodeAddedTransition()) // Transitions from SUBMITTED state .addTransition(RMAppState.SUBMITTED, RMAppState.SUBMITTED, @@ -178,6 +184,8 @@ RMAppEventType.KILL, new FinalSavingTransition( new AppKilledTransition(), RMAppState.KILLED)) + .addTransition(RMAppState.SUBMITTED, RMAppState.SUBMITTED, RMAppEventType.NODE_ADDED, + new NodeAddedTransition()) // Transitions from ACCEPTED state @@ -199,6 +207,8 @@ new FinalSavingTransition(FINISHED_TRANSITION, RMAppState.FINISHED)) .addTransition(RMAppState.ACCEPTED, RMAppState.KILLING, RMAppEventType.KILL, new KillAttemptTransition()) + .addTransition(RMAppState.ACCEPTED, RMAppState.ACCEPTED, RMAppEventType.NODE_ADDED, + new NodeAddedTransition()) // ACCECPTED state can once again receive APP_ACCEPTED event, because on // recovery the app returns ACCEPTED state and the app once again go // through the scheduler and triggers one more APP_ACCEPTED event at @@ -219,6 +229,8 @@ .addTransition(RMAppState.RUNNING, RMAppState.FINISHED, // UnManagedAM directly jumps to finished RMAppEventType.ATTEMPT_FINISHED, FINISHED_TRANSITION) + .addTransition(RMAppState.RUNNING, RMAppState.RUNNING, RMAppEventType.NODE_ADDED, + new NodeAddedTransition()) .addTransition(RMAppState.RUNNING, EnumSet.of(RMAppState.ACCEPTED, RMAppState.FINAL_SAVING), RMAppEventType.ATTEMPT_FAILED, @@ -234,6 +246,8 @@ .addTransition(RMAppState.FINAL_SAVING, RMAppState.FINAL_SAVING, RMAppEventType.ATTEMPT_FINISHED, new AttemptFinishedAtFinalSavingTransition()) + .addTransition(RMAppState.FINAL_SAVING, RMAppState.FINAL_SAVING, RMAppEventType.NODE_ADDED, + new NodeAddedTransition()) // ignorable transitions .addTransition(RMAppState.FINAL_SAVING, RMAppState.FINAL_SAVING, EnumSet.of(RMAppEventType.NODE_UPDATE, RMAppEventType.KILL, @@ -242,6 +256,8 @@ // Transitions from FINISHING state .addTransition(RMAppState.FINISHING, RMAppState.FINISHED, RMAppEventType.ATTEMPT_FINISHED, FINISHED_TRANSITION) + .addTransition(RMAppState.FINISHING, RMAppState.FINISHING, RMAppEventType.NODE_ADDED, + new NodeAddedTransition()) // ignorable transitions .addTransition(RMAppState.FINISHING, RMAppState.FINISHING, EnumSet.of(RMAppEventType.NODE_UPDATE, @@ -250,6 +266,8 @@ RMAppEventType.KILL)) // Transitions from KILLING state + .addTransition(RMAppState.KILLING, RMAppState.KILLING, RMAppEventType.NODE_ADDED, + new NodeAddedTransition()) .addTransition(RMAppState.KILLING, RMAppState.FINAL_SAVING, RMAppEventType.ATTEMPT_KILLED, new FinalSavingTransition( @@ -266,6 +284,8 @@ // Transitions from FINISHED state // ignorable transitions + .addTransition(RMAppState.FINISHED, RMAppState.FINISHED, RMAppEventType.NODE_ADDED, + new NodeAddedTransition()) .addTransition(RMAppState.FINISHED, RMAppState.FINISHED, EnumSet.of( RMAppEventType.NODE_UPDATE, @@ -275,11 +295,15 @@ // Transitions from FAILED state // ignorable transitions + .addTransition(RMAppState.FAILED, RMAppState.FAILED, RMAppEventType.NODE_ADDED, + new NodeAddedTransition()) .addTransition(RMAppState.FAILED, RMAppState.FAILED, EnumSet.of(RMAppEventType.KILL, RMAppEventType.NODE_UPDATE)) // Transitions from KILLED state // ignorable transitions + .addTransition(RMAppState.KILLED, RMAppState.KILLED, RMAppEventType.NODE_ADDED, + new NodeAddedTransition()) .addTransition( RMAppState.KILLED, RMAppState.KILLED, @@ -699,6 +723,23 @@ public void transition(RMAppImpl app, RMAppEvent event) { nodeUpdateEvent.getNode()); }; } + + private static final class NodeAddedTransition extends RMAppTransition { + public void transition(RMAppImpl app, RMAppEvent event) { + RMAppNodeAddedEvent nodeAddedEvent = (RMAppNodeAddedEvent) event; + + // if final state already stored, notify RMNode + if (app.isAppFinalStateStored()) { + app.dispatcher.getEventHandler().handle( + new RMNodeCleanAppEvent(nodeAddedEvent.getNodeId(), nodeAddedEvent + .getApplicationId())); + return; + } + + // otherwise, add it to ranNodes for further process + app.ranNodes.add(nodeAddedEvent.getNodeId()); + }; + } /** * Move an app to a new queue. @@ -1037,17 +1078,8 @@ public FinalTransition(RMAppState finalState) { this.finalState = finalState; } - private Set getNodesOnWhichAttemptRan(RMAppImpl app) { - Set nodes = new HashSet(); - for (RMAppAttempt attempt : app.attempts.values()) { - nodes.addAll(attempt.getRanNodes()); - } - return nodes; - } - public void transition(RMAppImpl app, RMAppEvent event) { - Set nodes = getNodesOnWhichAttemptRan(app); - for (NodeId nodeId : nodes) { + for (NodeId nodeId : app.getRanNodes()) { app.handler.handle( new RMNodeCleanAppEvent(nodeId, app.applicationId)); } @@ -1144,4 +1176,9 @@ public static boolean isAppInFinalState(RMApp rmApp) { private RMAppState getRecoveredFinalState() { return this.recoveredFinalState; } + + @Override + public Set getRanNodes() { + return ranNodes; + } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppNodeAddedEvent.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppNodeAddedEvent.java new file mode 100644 index 0000000..eb29c1d --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppNodeAddedEvent.java @@ -0,0 +1,35 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.server.resourcemanager.rmapp; + +import org.apache.hadoop.yarn.api.records.ApplicationId; +import org.apache.hadoop.yarn.api.records.NodeId; + +public class RMAppNodeAddedEvent extends RMAppEvent { + private final NodeId node; + + public RMAppNodeAddedEvent(ApplicationId appId, NodeId node) { + super(appId, RMAppEventType.NODE_ADDED); + this.node = node; + } + + public NodeId getNodeId() { + return node; + } +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttempt.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttempt.java index b4bad12..d472ad4 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttempt.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttempt.java @@ -19,7 +19,6 @@ package org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt; import java.util.List; -import java.util.Set; import javax.crypto.SecretKey; @@ -32,7 +31,6 @@ import org.apache.hadoop.yarn.api.records.Container; import org.apache.hadoop.yarn.api.records.ContainerStatus; import org.apache.hadoop.yarn.api.records.FinalApplicationStatus; -import org.apache.hadoop.yarn.api.records.NodeId; import org.apache.hadoop.yarn.api.records.YarnApplicationAttemptState; import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.event.EventHandler; @@ -115,12 +113,6 @@ FinalApplicationStatus getFinalApplicationStatus(); /** - * Nodes on which the containers for this {@link RMAppAttempt} ran. - * @return the set of nodes that ran any containers from this {@link RMAppAttempt} - */ - Set getRanNodes(); - - /** * Return a list of the last set of finished containers, resetting the * finished containers to empty. * @return the list of just finished containers, re setting the finished containers. diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttemptEventType.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttemptEventType.java index e1522f1..ddf782e 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttemptEventType.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttemptEventType.java @@ -36,7 +36,6 @@ UNREGISTERED, // Source: Containers - CONTAINER_ACQUIRED, CONTAINER_ALLOCATED, CONTAINER_FINISHED, diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttemptImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttemptImpl.java index efe0721..3217126 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttemptImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttemptImpl.java @@ -26,16 +26,13 @@ import java.util.ArrayList; import java.util.Collections; import java.util.EnumSet; -import java.util.HashSet; import java.util.List; -import java.util.Set; import java.util.concurrent.locks.ReentrantReadWriteLock; import java.util.concurrent.locks.ReentrantReadWriteLock.ReadLock; import java.util.concurrent.locks.ReentrantReadWriteLock.WriteLock; import javax.crypto.SecretKey; -import com.google.common.annotations.VisibleForTesting; import org.apache.commons.lang.StringUtils; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; @@ -54,7 +51,6 @@ import org.apache.hadoop.yarn.api.records.ContainerId; import org.apache.hadoop.yarn.api.records.ContainerStatus; import org.apache.hadoop.yarn.api.records.FinalApplicationStatus; -import org.apache.hadoop.yarn.api.records.NodeId; import org.apache.hadoop.yarn.api.records.Priority; import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.api.records.ResourceRequest; @@ -80,7 +76,6 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppFailedAttemptEvent; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppFinishedAttemptEvent; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppImpl; -import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event.RMAppAttemptContainerAcquiredEvent; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event.RMAppAttemptContainerAllocatedEvent; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event.RMAppAttemptContainerFinishedEvent; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event.RMAppAttemptLaunchFailedEvent; @@ -103,6 +98,8 @@ import org.apache.hadoop.yarn.state.StateMachineFactory; import org.apache.hadoop.yarn.webapp.util.WebAppUtils; +import com.google.common.annotations.VisibleForTesting; + @SuppressWarnings({"unchecked", "rawtypes"}) public class RMAppAttemptImpl implements RMAppAttempt, Recoverable { @@ -133,10 +130,7 @@ private final ApplicationSubmissionContext submissionContext; private Token amrmToken = null; private SecretKey clientTokenMasterKey = null; - - //nodes on while this attempt's containers ran - private Set ranNodes = - new HashSet(); + private List justFinishedContainers = new ArrayList(); private Container masterContainer; @@ -219,10 +213,7 @@ .addTransition(RMAppAttemptState.ALLOCATED_SAVING, RMAppAttemptState.ALLOCATED, RMAppAttemptEventType.ATTEMPT_NEW_SAVED, new AttemptStoredTransition()) - .addTransition(RMAppAttemptState.ALLOCATED_SAVING, - RMAppAttemptState.ALLOCATED_SAVING, - RMAppAttemptEventType.CONTAINER_ACQUIRED, - new ContainerAcquiredTransition()) + // App could be killed by the client. So need to handle this. .addTransition(RMAppAttemptState.ALLOCATED_SAVING, RMAppAttemptState.FINAL_SAVING, @@ -249,10 +240,6 @@ RMAppAttemptState.KILLED), RMAppAttemptState.KILLED)) // Transitions from ALLOCATED State - .addTransition(RMAppAttemptState.ALLOCATED, - RMAppAttemptState.ALLOCATED, - RMAppAttemptEventType.CONTAINER_ACQUIRED, - new ContainerAcquiredTransition()) .addTransition(RMAppAttemptState.ALLOCATED, RMAppAttemptState.LAUNCHED, RMAppAttemptEventType.LAUNCHED, new AMLaunchedTransition()) .addTransition(RMAppAttemptState.ALLOCATED, RMAppAttemptState.FINAL_SAVING, @@ -295,10 +282,6 @@ .addTransition(RMAppAttemptState.RUNNING, RMAppAttemptState.RUNNING, RMAppAttemptEventType.CONTAINER_ALLOCATED) .addTransition( - RMAppAttemptState.RUNNING, RMAppAttemptState.RUNNING, - RMAppAttemptEventType.CONTAINER_ACQUIRED, - new ContainerAcquiredTransition()) - .addTransition( RMAppAttemptState.RUNNING, EnumSet.of(RMAppAttemptState.RUNNING, RMAppAttemptState.FINAL_SAVING), RMAppAttemptEventType.CONTAINER_FINISHED, @@ -333,7 +316,6 @@ // should be fixed to reject container allocate request at Final // Saving in scheduler RMAppAttemptEventType.CONTAINER_ALLOCATED, - RMAppAttemptEventType.CONTAINER_ACQUIRED, RMAppAttemptEventType.ATTEMPT_NEW_SAVED, RMAppAttemptEventType.KILL)) @@ -616,11 +598,6 @@ public float getProgress() { } @Override - public Set getRanNodes() { - return ranNodes; - } - - @Override public Container getMasterContainer() { this.readLock.lock(); @@ -703,7 +680,6 @@ public void recover(RMState state) throws Exception { public void transferStateFromPreviousAttempt(RMAppAttempt attempt) { this.justFinishedContainers = attempt.getJustFinishedContainers(); - this.ranNodes = attempt.getRanNodes(); } private void recoverAppAttemptCredentials(Credentials appAttemptTokens) @@ -1395,17 +1371,6 @@ private void updateInfoOnAMUnregister(RMAppAttemptEvent event) { finalStatus = unregisterEvent.getFinalApplicationStatus(); } - private static final class ContainerAcquiredTransition extends - BaseTransition { - @Override - public void transition(RMAppAttemptImpl appAttempt, - RMAppAttemptEvent event) { - RMAppAttemptContainerAcquiredEvent acquiredEvent - = (RMAppAttemptContainerAcquiredEvent) event; - appAttempt.ranNodes.add(acquiredEvent.getContainer().getNodeId()); - } - } - private static final class ContainerFinishedTransition implements MultipleArcTransition { diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/event/RMAppAttemptContainerAcquiredEvent.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/event/RMAppAttemptContainerAcquiredEvent.java deleted file mode 100644 index 5902f91..0000000 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/event/RMAppAttemptContainerAcquiredEvent.java +++ /dev/null @@ -1,40 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event; - -import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; -import org.apache.hadoop.yarn.api.records.Container; -import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptEvent; -import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptEventType; - -public class RMAppAttemptContainerAcquiredEvent extends RMAppAttemptEvent { - - private final Container container; - - public RMAppAttemptContainerAcquiredEvent(ApplicationAttemptId appAttemptId, - Container container) { - super(appAttemptId, RMAppAttemptEventType.CONTAINER_ACQUIRED); - this.container = container; - } - - public Container getContainer() { - return this.container; - } - -} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/RMContainerImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/RMContainerImpl.java index 2921891..d0e3fa9 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/RMContainerImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/RMContainerImpl.java @@ -36,7 +36,7 @@ import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.event.EventHandler; import org.apache.hadoop.yarn.server.resourcemanager.RMContext; -import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event.RMAppAttemptContainerAcquiredEvent; +import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppNodeAddedEvent; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event.RMAppAttemptContainerAllocatedEvent; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event.RMAppAttemptContainerFinishedEvent; import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeCleanContainerEvent; @@ -372,8 +372,9 @@ public void transition(RMContainerImpl container, RMContainerEvent event) { container.containerAllocationExpirer.register(container.getContainerId()); // Tell the appAttempt - container.eventHandler.handle(new RMAppAttemptContainerAcquiredEvent( - container.getApplicationAttemptId(), container.getContainer())); + container.eventHandler.handle(new RMAppNodeAddedEvent(container + .getApplicationAttemptId().getApplicationId(), container + .getAllocatedNode())); } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockNM.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockNM.java index 1dcac06..d7ee3ac 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockNM.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockNM.java @@ -101,11 +101,17 @@ public void containerStatus(ContainerStatus containerStatus) throws Exception { } public RegisterNodeManagerResponse registerNode() throws Exception { - return registerNode(null); + return registerNode(null, null); + } + + public RegisterNodeManagerResponse registerNode( + List runningApplications) throws Exception { + return registerNode(null, runningApplications); } public RegisterNodeManagerResponse registerNode( - List containerStatus) throws Exception{ + List containerStatus, + List runningApplications) throws Exception { RegisterNodeManagerRequest req = Records.newRecord( RegisterNodeManagerRequest.class); req.setNodeId(nodeId); @@ -114,6 +120,7 @@ public RegisterNodeManagerResponse registerNode( req.setResource(resource); req.setContainerStatuses(containerStatus); req.setNMVersion(version); + req.setRunningApplications(runningApplications); RegisterNodeManagerResponse registrationResponse = resourceTracker.registerNodeManager(req); this.currentContainerTokenMasterKey = diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockRM.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockRM.java index caee228..03ae0f3 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockRM.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockRM.java @@ -21,10 +21,9 @@ import java.io.IOException; import java.nio.ByteBuffer; import java.security.PrivilegedAction; +import java.util.List; import java.util.Map; -import org.junit.Assert; - import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.io.DataOutputBuffer; import org.apache.hadoop.security.Credentials; @@ -73,9 +72,11 @@ import org.apache.hadoop.yarn.server.resourcemanager.security.NMTokenSecretManagerInRM; import org.apache.hadoop.yarn.server.resourcemanager.security.RMContainerTokenSecretManager; import org.apache.hadoop.yarn.util.Records; +import org.apache.hadoop.yarn.util.YarnVersionInfo; import org.apache.log4j.Level; import org.apache.log4j.LogManager; import org.apache.log4j.Logger; +import org.junit.Assert; @SuppressWarnings("unchecked") public class MockRM extends ResourceManager { @@ -329,6 +330,15 @@ public MockNM registerNode(String nodeIdStr, int memory, int vCores) nm.registerNode(); return nm; } + + public MockNM registerNode(String nodeIdStr, int memory, int vCores, + List runningApplications) throws Exception { + MockNM nm = + new MockNM(nodeIdStr, memory, vCores, getResourceTrackerService(), + YarnVersionInfo.getVersion()); + nm.registerNode(runningApplications); + return nm; + } public void sendNodeStarted(MockNM nm) throws Exception { RMNodeImpl node = (RMNodeImpl) getRMContext().getRMNodes().get( diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMRestart.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMRestart.java index 49eff8b..f457208 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMRestart.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMRestart.java @@ -30,6 +30,7 @@ import java.net.UnknownHostException; import java.nio.ByteBuffer; import java.util.ArrayList; +import java.util.Arrays; import java.util.EnumSet; import java.util.HashMap; import java.util.HashSet; @@ -309,7 +310,7 @@ public void testRMRestart() throws Exception { .getCurrentAppAttempt().getAppAttemptId(), 1), ContainerState.COMPLETE, "Killed AM container", 143); containerStatuses.add(containerStatus); - nm1.registerNode(containerStatuses); + nm1.registerNode(containerStatuses, null); nm2.registerNode(); rm2.waitForState(loadedApp1.getApplicationId(), RMAppState.ACCEPTED); @@ -393,6 +394,96 @@ public void testRMRestart() throws Exception { // And finished app is also loaded back. Assert.assertEquals(4, rmAppState.size()); } + + @Test (timeout = 60000) + public void testAppCleanupWhenRestartedAfterAppFinished() throws Exception { + conf.setInt(YarnConfiguration.RM_AM_MAX_ATTEMPTS, 1); + MemoryRMStateStore memStore = new MemoryRMStateStore(); + memStore.init(conf); + RMState rmState = memStore.getState(); + Map rmAppState = + rmState.getApplicationState(); + + // start RM + MockRM rm1 = new MockRM(conf, memStore); + rm1.start(); + MockNM nm1 = + new MockNM("127.0.0.1:1234", 15120, rm1.getResourceTrackerService()); + nm1.registerNode(); + + // create app and launch the AM + RMApp app0 = rm1.submitApp(200); + MockAM am0 = launchAM(app0, rm1, nm1); + nm1.nodeHeartbeat(am0.getApplicationAttemptId(), 1, ContainerState.COMPLETE); + rm1.waitForState(app0.getApplicationId(), RMAppState.FAILED); + + // start new RM + MockRM rm2 = new MockRM(conf, memStore); + rm2.start(); + + // nm1 register to rm2, and do a heartbeat + nm1.setResourceTrackerService(rm2.getResourceTrackerService()); + nm1.registerNode(Arrays.asList(app0.getApplicationId())); + rm2.waitForState(app0.getApplicationId(), RMAppState.FAILED); + + // sleep for a while before do next heartbeat + Thread.sleep(1000); + NodeHeartbeatResponse response = nm1.nodeHeartbeat(true); + + // check if nm1 recved app's clean-up message from RM2 + Assert.assertNotNull(response.getApplicationsToCleanup()); + Assert.assertEquals(1, response.getApplicationsToCleanup().size()); + Assert.assertEquals(app0.getApplicationId(), response.getApplicationsToCleanup().get(0)); + + rm1.stop(); + rm2.stop(); + } + + @Test (timeout = 600000) + public void testAppCleanupWhenRestartedBeforeAppFinished() throws Exception { + conf.setInt(YarnConfiguration.RM_AM_MAX_ATTEMPTS, 1); + MemoryRMStateStore memStore = new MemoryRMStateStore(); + memStore.init(conf); + RMState rmState = memStore.getState(); + Map rmAppState = + rmState.getApplicationState(); + + // start RM + MockRM rm1 = new MockRM(conf, memStore); + rm1.start(); + MockNM nm1 = + new MockNM("127.0.0.1:1234", 15120, rm1.getResourceTrackerService()); + nm1.registerNode(); + + // create app and launch the AM + RMApp app0 = rm1.submitApp(200); + MockAM am0 = launchAM(app0, rm1, nm1); + + // start new RM + MockRM rm2 = new MockRM(conf, memStore); + rm2.start(); + + // nm1 register to rm2, and do a heartbeat + nm1.setResourceTrackerService(rm2.getResourceTrackerService()); + nm1.registerNode(Arrays.asList(ContainerStatus.newInstance( + ContainerId.newInstance(am0.getApplicationAttemptId(), 1), + ContainerState.COMPLETE, "", 0)), Arrays.asList(app0.getApplicationId())); + + // assert app state has been saved. + rm2.waitForState(app0.getApplicationId(), RMAppState.FAILED); + + // sleep for a while before next heartbeat + Thread.sleep(1000); + NodeHeartbeatResponse response = nm1.nodeHeartbeat(true); + + // check if nm1 recved app's clean-up message + Assert.assertNotNull(response.getApplicationsToCleanup()); + Assert.assertEquals(1, response.getApplicationsToCleanup().size()); + Assert.assertEquals(app0.getApplicationId(), response.getApplicationsToCleanup().get(0)); + + rm1.stop(); + rm2.stop(); + } @Test (timeout = 60000) public void testRMRestartAppRunningAMFailed() throws Exception { @@ -517,7 +608,7 @@ public void testRMRestartWaitForPreviousAMToFinish() throws Exception { BuilderUtils.newContainerId(am2.getApplicationAttemptId(), 1), ContainerState.COMPLETE, "Killed AM container", 143); containerStatuses.add(containerStatus); - nm1.registerNode(containerStatuses); + nm1.registerNode(containerStatuses, null); rm2.waitForState(am2.getApplicationAttemptId(), RMAppAttemptState.FAILED); launchAM(rmApp, rm2, nm1); Assert.assertEquals(3, rmApp.getAppAttempts().size()); @@ -1677,7 +1768,7 @@ public void testQueueMetricsOnRMRestart() throws Exception { .getCurrentAppAttempt().getAppAttemptId(), 1), ContainerState.COMPLETE, "Killed AM container", 143); containerStatuses.add(containerStatus); - nm1.registerNode(containerStatuses); + nm1.registerNode(containerStatuses, null); while (loadedApp1.getAppAttempts().size() != 2) { Thread.sleep(200); } @@ -1806,7 +1897,7 @@ protected void serviceStart() throws Exception { ContainerId.newInstance(am0.getApplicationAttemptId(), 1); status.add(ContainerStatus.newInstance(amContainer, ContainerState.COMPLETE, "AM container exit", 143)); - nm1.registerNode(status); + nm1.registerNode(status, null); } }; } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestResourceTrackerService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestResourceTrackerService.java index 2f16b85..57421b2 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestResourceTrackerService.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestResourceTrackerService.java @@ -18,11 +18,14 @@ package org.apache.hadoop.yarn.server.resourcemanager; +import static org.mockito.Matchers.any; +import static org.mockito.Mockito.never; +import static org.mockito.Mockito.spy; +import static org.mockito.Mockito.verify; + import java.io.File; import java.io.FileOutputStream; import java.io.IOException; -import java.util.ArrayList; -import java.util.Collections; import java.util.HashMap; import java.util.List; @@ -33,7 +36,6 @@ import org.apache.hadoop.net.NetUtils; import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; import org.apache.hadoop.yarn.api.records.ApplicationId; -import org.apache.hadoop.yarn.api.records.Container; import org.apache.hadoop.yarn.api.records.ContainerId; import org.apache.hadoop.yarn.api.records.ContainerState; import org.apache.hadoop.yarn.api.records.ContainerStatus; @@ -56,17 +58,10 @@ import org.apache.hadoop.yarn.server.utils.BuilderUtils; import org.apache.hadoop.yarn.util.Records; import org.apache.hadoop.yarn.util.YarnVersionInfo; - import org.junit.After; import org.junit.Assert; import org.junit.Test; -import static org.junit.Assert.assertEquals; -import static org.mockito.Matchers.any; -import static org.mockito.Mockito.never; -import static org.mockito.Mockito.spy; -import static org.mockito.Mockito.verify; - public class TestResourceTrackerService { private final static File TEMP_DIR = new File(System.getProperty( diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/MockAsm.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/MockAsm.java index 2cdbf95..4349a23 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/MockAsm.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/MockAsm.java @@ -31,6 +31,7 @@ import org.apache.hadoop.yarn.api.records.Container; import org.apache.hadoop.yarn.api.records.ContainerId; import org.apache.hadoop.yarn.api.records.FinalApplicationStatus; +import org.apache.hadoop.yarn.api.records.NodeId; import org.apache.hadoop.yarn.api.records.YarnApplicationState; import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp; @@ -159,6 +160,11 @@ public boolean isAppFinalStateStored() { public YarnApplicationState createApplicationState() { throw new UnsupportedOperationException("Not supported yet."); } + + @Override + public Set getRanNodes() { + throw new UnsupportedOperationException("Not supported yet."); + } } public static RMApp newApplication(int i) { diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/MockRMApp.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/MockRMApp.java index b07525d..8f26d10 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/MockRMApp.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/MockRMApp.java @@ -29,6 +29,7 @@ import org.apache.hadoop.yarn.api.records.ApplicationReport; import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext; import org.apache.hadoop.yarn.api.records.FinalApplicationStatus; +import org.apache.hadoop.yarn.api.records.NodeId; import org.apache.hadoop.yarn.api.records.YarnApplicationState; import org.apache.hadoop.yarn.api.records.impl.pb.ApplicationSubmissionContextPBImpl; import org.apache.hadoop.yarn.conf.YarnConfiguration; @@ -232,4 +233,9 @@ public boolean isAppFinalStateStored() { public YarnApplicationState createApplicationState() { return null; } + + @Override + public Set getRanNodes() { + return null; + } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/TestRMAppAttemptTransitions.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/TestRMAppAttemptTransitions.java index 7868fa0..8c5785f 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/TestRMAppAttemptTransitions.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/TestRMAppAttemptTransitions.java @@ -56,6 +56,7 @@ import org.apache.hadoop.yarn.api.records.ContainerState; import org.apache.hadoop.yarn.api.records.ContainerStatus; import org.apache.hadoop.yarn.api.records.FinalApplicationStatus; +import org.apache.hadoop.yarn.api.records.NodeId; import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.api.records.YarnApplicationAttemptState; import org.apache.hadoop.yarn.event.AsyncDispatcher; @@ -75,8 +76,8 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppEventType; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppFailedAttemptEvent; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppImpl; +import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppNodeAddedEvent; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppRejectedEvent; -import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event.RMAppAttemptContainerAcquiredEvent; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event.RMAppAttemptContainerAllocatedEvent; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event.RMAppAttemptContainerFinishedEvent; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event.RMAppAttemptLaunchFailedEvent; @@ -315,7 +316,7 @@ private void testAppAttemptNewState() { assertEquals(0,applicationAttempt.getJustFinishedContainers().size()); assertNull(applicationAttempt.getMasterContainer()); assertEquals(0.0, (double)applicationAttempt.getProgress(), 0.0001); - assertEquals(0, applicationAttempt.getRanNodes().size()); + assertEquals(0, application.getRanNodes().size()); assertNull(applicationAttempt.getFinalApplicationStatus()); assertNotNull(applicationAttempt.getTrackingUrl()); assertFalse("N/A".equals(applicationAttempt.getTrackingUrl())); @@ -331,7 +332,7 @@ private void testAppAttemptSubmittedState() { assertEquals(0,applicationAttempt.getJustFinishedContainers().size()); assertNull(applicationAttempt.getMasterContainer()); assertEquals(0.0, (double)applicationAttempt.getProgress(), 0.0001); - assertEquals(0, applicationAttempt.getRanNodes().size()); + assertEquals(0, application.getRanNodes().size()); assertNull(applicationAttempt.getFinalApplicationStatus()); if (UserGroupInformation.isSecurityEnabled()) { verify(clientToAMTokenManager).createMasterKey( @@ -359,7 +360,7 @@ private void testAppAttemptSubmittedToFailedState(String diagnostics) { assertEquals(0,applicationAttempt.getJustFinishedContainers().size()); assertNull(applicationAttempt.getMasterContainer()); assertEquals(0.0, (double)applicationAttempt.getProgress(), 0.0001); - assertEquals(0, applicationAttempt.getRanNodes().size()); + assertEquals(0, application.getRanNodes().size()); assertNull(applicationAttempt.getFinalApplicationStatus()); // Check events @@ -385,7 +386,7 @@ private void testAppAttemptKilledState(Container amContainer, assertEquals(0,applicationAttempt.getJustFinishedContainers().size()); assertEquals(amContainer, applicationAttempt.getMasterContainer()); assertEquals(0.0, (double)applicationAttempt.getProgress(), 0.0001); - assertEquals(0, applicationAttempt.getRanNodes().size()); + assertEquals(0, application.getRanNodes().size()); assertNull(applicationAttempt.getFinalApplicationStatus()); verifyTokenCount(applicationAttempt.getAppAttemptId(), 1); verifyAttemptFinalStateSaved(); @@ -425,7 +426,7 @@ private void testAppAttemptScheduledState() { assertEquals(0,applicationAttempt.getJustFinishedContainers().size()); assertNull(applicationAttempt.getMasterContainer()); assertEquals(0.0, (double)applicationAttempt.getProgress(), 0.0001); - assertEquals(0, applicationAttempt.getRanNodes().size()); + assertEquals(0, application.getRanNodes().size()); assertNull(applicationAttempt.getFinalApplicationStatus()); } @@ -461,7 +462,7 @@ private void testAppAttemptFailedState(Container container, assertEquals(0,applicationAttempt.getJustFinishedContainers().size()); assertEquals(container, applicationAttempt.getMasterContainer()); assertEquals(0.0, (double)applicationAttempt.getProgress(), 0.0001); - assertEquals(0, applicationAttempt.getRanNodes().size()); + assertEquals(0, application.getRanNodes().size()); // Check events verify(application, times(1)).handle(any(RMAppFailedAttemptEvent.class)); @@ -666,8 +667,10 @@ private void testUnmanagedAMSuccess(String url) { runApplicationAttempt(null, "host", 8042, url, true); // complete a container - applicationAttempt.handle(new RMAppAttemptContainerAcquiredEvent( - applicationAttempt.getAppAttemptId(), mock(Container.class))); + Container container = mock(Container.class); + when(container.getNodeId()).thenReturn(NodeId.newInstance("host", 1234)); + application.handle(new RMAppNodeAddedEvent(application.getApplicationId(), + container.getNodeId())); applicationAttempt.handle(new RMAppAttemptContainerFinishedEvent( applicationAttempt.getAppAttemptId(), mock(ContainerStatus.class))); // complete AM @@ -845,7 +848,7 @@ public void testRunningToFailed() { applicationAttempt.getAppAttemptState()); assertEquals(0,applicationAttempt.getJustFinishedContainers().size()); assertEquals(amContainer, applicationAttempt.getMasterContainer()); - assertEquals(0, applicationAttempt.getRanNodes().size()); + assertEquals(0, application.getRanNodes().size()); String rmAppPageUrl = pjoin(RM_WEBAPP_ADDR, "cluster", "app", applicationAttempt.getAppAttemptId().getApplicationId()); assertEquals(rmAppPageUrl, applicationAttempt.getOriginalTrackingUrl()); @@ -882,7 +885,7 @@ public void testRunningToKilled() { applicationAttempt.getAppAttemptState()); assertEquals(0,applicationAttempt.getJustFinishedContainers().size()); assertEquals(amContainer, applicationAttempt.getMasterContainer()); - assertEquals(0, applicationAttempt.getRanNodes().size()); + assertEquals(0, application.getRanNodes().size()); String rmAppPageUrl = pjoin(RM_WEBAPP_ADDR, "cluster", "app", applicationAttempt.getAppAttemptId().getApplicationId()); assertEquals(rmAppPageUrl, applicationAttempt.getOriginalTrackingUrl());