diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/NodeHeartbeatResponse.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/NodeHeartbeatResponse.java index 38dfa58..b418d47 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/NodeHeartbeatResponse.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/NodeHeartbeatResponse.java @@ -30,6 +30,7 @@ NodeAction getNodeAction(); List getContainersToCleanup(); + List getCleanedupContainersNotified(); List getApplicationsToCleanup(); @@ -43,6 +44,7 @@ void setNMTokenMasterKey(MasterKey secretKey); void addAllContainersToCleanup(List containers); + void addCleanedupContainersNotified(List containers); void addAllApplicationsToCleanup(List applications); diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/NodeHeartbeatResponsePBImpl.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/NodeHeartbeatResponsePBImpl.java index 775f95a..0a5e7a6 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/NodeHeartbeatResponsePBImpl.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/NodeHeartbeatResponsePBImpl.java @@ -46,6 +46,7 @@ boolean viaProto = false; private List containersToCleanup = null; + private List cleanedupContainersAck = null; private List applicationsToCleanup = null; private MasterKey containerTokenMasterKey = null; private MasterKey nmTokenMasterKey = null; @@ -73,6 +74,9 @@ private void mergeLocalToBuilder() { if (this.applicationsToCleanup != null) { addApplicationsToCleanupToProto(); } + if (this.cleanedupContainersAck != null) { + addCleanedupContainersAckToProto(); + } if (this.containerTokenMasterKey != null) { builder.setContainerTokenMasterKey( convertToProtoFormat(this.containerTokenMasterKey)); @@ -199,6 +203,12 @@ public void setDiagnosticsMessage(String diagnosticsMessage) { return this.containersToCleanup; } + @Override + public List getCleanedupContainersNotified() { + initCleanedupContainersAck(); + return this.cleanedupContainersAck; + } + private void initContainersToCleanup() { if (this.containersToCleanup != null) { return; @@ -212,6 +222,19 @@ private void initContainersToCleanup() { } } + private void initCleanedupContainersAck() { + if (this.cleanedupContainersAck != null) { + return; + } + NodeHeartbeatResponseProtoOrBuilder p = viaProto ? proto : builder; + List list = p.getCleanedupContainersAckList(); + this.cleanedupContainersAck = new ArrayList(); + + for (ContainerIdProto c : list) { + this.cleanedupContainersAck.add(convertFromProtoFormat(c)); + } + } + @Override public void addAllContainersToCleanup( final List containersToCleanup) { @@ -221,6 +244,15 @@ public void addAllContainersToCleanup( this.containersToCleanup.addAll(containersToCleanup); } + @Override + public void addCleanedupContainersNotified( + final List cleanedupContainersAck) { + if (cleanedupContainersAck == null) + return; + initCleanedupContainersAck(); + this.cleanedupContainersAck.addAll(cleanedupContainersAck); + } + private void addContainersToCleanupToProto() { maybeInitBuilder(); builder.clearContainersToCleanup(); @@ -256,6 +288,42 @@ public void remove() { builder.addAllContainersToCleanup(iterable); } + + private void addCleanedupContainersAckToProto() { + maybeInitBuilder(); + builder.clearCleanedupContainersAck(); + if (cleanedupContainersAck == null) + return; + Iterable iterable = new Iterable() { + + @Override + public Iterator iterator() { + return new Iterator() { + + Iterator iter = cleanedupContainersAck.iterator(); + + @Override + public boolean hasNext() { + return iter.hasNext(); + } + + @Override + public ContainerIdProto next() { + return convertToProtoFormat(iter.next()); + } + + @Override + public void remove() { + throw new UnsupportedOperationException(); + + } + }; + + } + }; + builder.addAllCleanedupContainersAck(iterable); + } + @Override public List getApplicationsToCleanup() { initApplicationsToCleanup(); diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/proto/yarn_server_common_service_protos.proto hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/proto/yarn_server_common_service_protos.proto index 29cd64e..26519a8 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/proto/yarn_server_common_service_protos.proto +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/proto/yarn_server_common_service_protos.proto @@ -58,6 +58,7 @@ message NodeHeartbeatResponseProto { repeated ApplicationIdProto applications_to_cleanup = 6; optional int64 nextHeartBeatInterval = 7; optional string diagnostics_message = 8; + repeated ContainerIdProto cleanedup_containers_ack = 9; } message NMContainerStatusProto { diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java index 0b8f5b4..4f246be 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java @@ -408,6 +408,19 @@ private void addCompletedContainer(Container container) { } } + private void removeCompletedContainersFromContext(List + containerIds) { + synchronized (previousCompletedContainers) { + for (ContainerId cleanedUpContainerId : containerIds) { + this.context.getContainers().remove(cleanedUpContainerId); + } + if (!containerIds.isEmpty()) { + LOG.info("Removed acked completed containers from NM context: " + + containerIds); + } + } + } + private void removeCompletedContainersFromContext() { synchronized (previousCompletedContainers) { if (!previousCompletedContainers.isEmpty()) { @@ -542,7 +555,9 @@ public void run() { // don't want to remove the completed containers before resync // because these completed containers will be reported back to RM // when NM re-registers with RM. - removeCompletedContainersFromContext(); + // Only remove the cleanedup containers that are acked + removeCompletedContainersFromContext(response + .getCleanedupContainersNotified()); lastHeartBeatID = response.getResponseId(); List containersToCleanup = response diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceTrackerService.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceTrackerService.java index 6eebb4d..45ecf4f 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceTrackerService.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceTrackerService.java @@ -200,7 +200,7 @@ protected void serviceStop() throws Exception { */ @SuppressWarnings("unchecked") @VisibleForTesting - void handleNMContainerStatus(NMContainerStatus containerStatus) { + void handleNMContainerStatus(NMContainerStatus containerStatus, NodeId nodeId) { ApplicationAttemptId appAttemptId = containerStatus.getContainerId().getApplicationAttemptId(); RMApp rmApp = @@ -231,7 +231,8 @@ void handleNMContainerStatus(NMContainerStatus containerStatus) { containerStatus.getContainerExitStatus()); // sending master container finished event. RMAppAttemptContainerFinishedEvent evt = - new RMAppAttemptContainerFinishedEvent(appAttemptId, status); + new RMAppAttemptContainerFinishedEvent(appAttemptId, status, + nodeId); rmContext.getDispatcher().getEventHandler().handle(evt); } } @@ -326,7 +327,7 @@ public RegisterNodeManagerResponse registerNodeManager( LOG.info("received container statuses on node manager register :" + request.getNMContainerStatuses()); for (NMContainerStatus status : request.getNMContainerStatuses()) { - handleNMContainerStatus(status); + handleNMContainerStatus(status, nodeId); } } } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttempt.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttempt.java index b5ed92c..c211427 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttempt.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttempt.java @@ -19,6 +19,7 @@ package org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt; import java.util.List; +import java.util.Map; import javax.crypto.SecretKey; @@ -31,6 +32,7 @@ import org.apache.hadoop.yarn.api.records.Container; import org.apache.hadoop.yarn.api.records.ContainerStatus; import org.apache.hadoop.yarn.api.records.FinalApplicationStatus; +import org.apache.hadoop.yarn.api.records.NodeId; import org.apache.hadoop.yarn.api.records.YarnApplicationAttemptState; import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.event.EventHandler; @@ -120,12 +122,12 @@ List pullJustFinishedContainers(); /** - * Return the list of last set of finished containers. This does not reset the - * finished containers. + * Return the map of last set of finished containers to the corresponding + * node. This does not reset the finished containers. * @return the list of just finished contianers, this does not reset the * finished containers. */ - List getJustFinishedContainers(); + Map getJustFinishedContainers(); /** * The container on which the Application Master is running. diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttemptEventType.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttemptEventType.java index ddf782e..93572b3 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttemptEventType.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttemptEventType.java @@ -34,6 +34,7 @@ REGISTERED, STATUS_UPDATE, UNREGISTERED, + CONTAINER_CLEANUP_ACKED, // Source: Containers CONTAINER_ALLOCATED, diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttemptImpl.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttemptImpl.java index 50a0755..1369806 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttemptImpl.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttemptImpl.java @@ -26,7 +26,9 @@ import java.util.ArrayList; import java.util.Collections; import java.util.EnumSet; +import java.util.HashMap; import java.util.List; +import java.util.Map; import java.util.concurrent.locks.ReentrantReadWriteLock; import java.util.concurrent.locks.ReentrantReadWriteLock.ReadLock; import java.util.concurrent.locks.ReentrantReadWriteLock.WriteLock; @@ -52,6 +54,7 @@ import org.apache.hadoop.yarn.api.records.ContainerId; import org.apache.hadoop.yarn.api.records.ContainerStatus; import org.apache.hadoop.yarn.api.records.FinalApplicationStatus; +import org.apache.hadoop.yarn.api.records.NodeId; import org.apache.hadoop.yarn.api.records.Priority; import org.apache.hadoop.yarn.api.records.ResourceRequest; import org.apache.hadoop.yarn.api.records.YarnApplicationAttemptState; @@ -84,7 +87,10 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event.RMAppAttemptStatusupdateEvent; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event.RMAppAttemptUnregistrationEvent; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event.RMAppAttemptUpdateSavedEvent; +import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerEvent; +import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerEventType; import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerImpl; +import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeCleanedupContainerNotifiedEvent; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.Allocation; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.YarnScheduler; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppAttemptAddedSchedulerEvent; @@ -131,9 +137,14 @@ private final ApplicationSubmissionContext submissionContext; private Token amrmToken = null; private SecretKey clientTokenMasterKey = null; - - private List justFinishedContainers = - new ArrayList(); + + private Map justFinishedContainers = + new HashMap(); + // Tracks the previous finished containers that are waiting to be + // verified as received by the AM. If the AM sends the next allocate + // request it implicitly acks this list. + private Map previousJustFinishedContainers = + new HashMap(); private Container masterContainer; private float progress = 0; @@ -608,7 +619,7 @@ public float getProgress() { } @Override - public List getJustFinishedContainers() { + public Map getJustFinishedContainers() { this.readLock.lock(); try { return this.justFinishedContainers; @@ -624,7 +635,16 @@ public float getProgress() { try { List returnList = new ArrayList( this.justFinishedContainers.size()); - returnList.addAll(this.justFinishedContainers); + returnList.addAll(this.justFinishedContainers.keySet()); + for (Map.Entry finishedContainerStatus: this + .previousJustFinishedContainers.entrySet()) { + // Implicitly acks the previous list as being received by the AM + eventHandler.handle(new RMNodeCleanedupContainerNotifiedEvent( + finishedContainerStatus.getValue(), finishedContainerStatus + .getKey().getContainerId())); + } + previousJustFinishedContainers.clear(); + previousJustFinishedContainers.putAll(this.justFinishedContainers); this.justFinishedContainers.clear(); return returnList; } finally { @@ -1497,7 +1517,8 @@ public RMAppAttemptState transition(RMAppAttemptImpl appAttempt, } // Normal container.Put it in completed containers list - appAttempt.justFinishedContainers.add(containerStatus); + appAttempt.justFinishedContainers.put(containerStatus, + containerFinishedEvent.getNodeId()); return this.currentState; } } @@ -1513,7 +1534,8 @@ public RMAppAttemptState transition(RMAppAttemptImpl appAttempt, ContainerStatus containerStatus = containerFinishedEvent.getContainerStatus(); // Normal container. Add it in completed containers list - appAttempt.justFinishedContainers.add(containerStatus); + appAttempt.justFinishedContainers.put(containerStatus, + containerFinishedEvent.getNodeId()); } } @@ -1554,7 +1576,8 @@ public RMAppAttemptState transition(RMAppAttemptImpl appAttempt, return RMAppAttemptState.FINISHED; } // Normal container. - appAttempt.justFinishedContainers.add(containerStatus); + appAttempt.justFinishedContainers.put(containerStatus, + containerFinishedEvent.getNodeId()); return RMAppAttemptState.FINISHING; } } @@ -1589,7 +1612,8 @@ public RMAppAttemptState transition(RMAppAttemptImpl appAttempt, return; } // Normal container. - appAttempt.justFinishedContainers.add(containerStatus); + appAttempt.justFinishedContainers.put(containerStatus, + containerFinishedEvent.getNodeId()); } } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/event/RMAppAttemptContainerFinishedEvent.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/event/RMAppAttemptContainerFinishedEvent.java index 3660597..b1feae2 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/event/RMAppAttemptContainerFinishedEvent.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/event/RMAppAttemptContainerFinishedEvent.java @@ -20,21 +20,25 @@ import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; import org.apache.hadoop.yarn.api.records.ContainerStatus; +import org.apache.hadoop.yarn.api.records.NodeId; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptEvent; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptEventType; public class RMAppAttemptContainerFinishedEvent extends RMAppAttemptEvent { private final ContainerStatus containerStatus; + private final NodeId nodeId; public RMAppAttemptContainerFinishedEvent(ApplicationAttemptId appAttemptId, - ContainerStatus containerStatus) { + ContainerStatus containerStatus, NodeId nodeId) { super(appAttemptId, RMAppAttemptEventType.CONTAINER_FINISHED); this.containerStatus = containerStatus; + this.nodeId = nodeId; } public ContainerStatus getContainerStatus() { return this.containerStatus; } + public NodeId getNodeId() { return this.nodeId; } } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/RMContainerFinishedEvent.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/RMContainerFinishedEvent.java index d760e7d..f10cff6 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/RMContainerFinishedEvent.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/RMContainerFinishedEvent.java @@ -20,18 +20,26 @@ import org.apache.hadoop.yarn.api.records.ContainerId; import org.apache.hadoop.yarn.api.records.ContainerStatus; +import org.apache.hadoop.yarn.api.records.NodeId; public class RMContainerFinishedEvent extends RMContainerEvent { private final ContainerStatus remoteContainerStatus; + private final NodeId nodeId; public RMContainerFinishedEvent(ContainerId containerId, - ContainerStatus containerStatus, RMContainerEventType event) { + ContainerStatus containerStatus, RMContainerEventType event, + NodeId nodeId) { super(containerId, event); this.remoteContainerStatus = containerStatus; + this.nodeId = nodeId; } public ContainerStatus getRemoteContainerStatus() { return this.remoteContainerStatus; } + + public NodeId getNodeId() { + return this.nodeId; + } } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/RMContainerImpl.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/RMContainerImpl.java index eef361f..b9cc3a9 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/RMContainerImpl.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/RMContainerImpl.java @@ -76,13 +76,13 @@ RMContainerEventType.RECOVER, new ContainerRecoveredTransition()) // Transitions from RESERVED state - .addTransition(RMContainerState.RESERVED, RMContainerState.RESERVED, + .addTransition(RMContainerState.RESERVED, RMContainerState.RESERVED, RMContainerEventType.RESERVED, new ContainerReservedTransition()) - .addTransition(RMContainerState.RESERVED, RMContainerState.ALLOCATED, + .addTransition(RMContainerState.RESERVED, RMContainerState.ALLOCATED, RMContainerEventType.START, new ContainerStartedTransition()) - .addTransition(RMContainerState.RESERVED, RMContainerState.KILLED, + .addTransition(RMContainerState.RESERVED, RMContainerState.KILLED, RMContainerEventType.KILL) // nothing to do - .addTransition(RMContainerState.RESERVED, RMContainerState.RELEASED, + .addTransition(RMContainerState.RESERVED, RMContainerState.RELEASED, RMContainerEventType.RELEASED) // nothing to do @@ -98,7 +98,7 @@ .addTransition(RMContainerState.ACQUIRED, RMContainerState.RUNNING, RMContainerEventType.LAUNCHED, new LaunchedTransition()) .addTransition(RMContainerState.ACQUIRED, RMContainerState.COMPLETED, - RMContainerEventType.FINISHED, new ContainerFinishedAtAcquiredState()) + RMContainerEventType.FINISHED, new ContainerFinishedAtAcquiredState()) .addTransition(RMContainerState.ACQUIRED, RMContainerState.RELEASED, RMContainerEventType.RELEASED, new KillTransition()) .addTransition(RMContainerState.ACQUIRED, RMContainerState.EXPIRED, @@ -412,7 +412,8 @@ public RMContainerState transition(RMContainerImpl container, new FinishedTransition().transition(container, new RMContainerFinishedEvent(container.containerId, status, - RMContainerEventType.FINISHED)); + RMContainerEventType.FINISHED, ((RMContainerRecoverEvent) event) + .getNodeId())); return RMContainerState.COMPLETED; } else if (report.getContainerState().equals(ContainerState.RUNNING)) { // Tell the app @@ -491,7 +492,8 @@ public void transition(RMContainerImpl container, RMContainerEvent event) { updateMetricsIfPreempted(container); container.eventHandler.handle(new RMAppAttemptContainerFinishedEvent( - container.appAttemptId, finishedEvent.getRemoteContainerStatus())); + container.appAttemptId, finishedEvent.getRemoteContainerStatus(), + finishedEvent.getNodeId())); container.rmContext.getRMApplicationHistoryWriter().containerFinished( container); diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/RMContainerRecoverEvent.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/RMContainerRecoverEvent.java index 338ccbd..bb6a781 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/RMContainerRecoverEvent.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/RMContainerRecoverEvent.java @@ -19,19 +19,24 @@ package org.apache.hadoop.yarn.server.resourcemanager.rmcontainer; import org.apache.hadoop.yarn.api.records.ContainerId; +import org.apache.hadoop.yarn.api.records.NodeId; import org.apache.hadoop.yarn.server.api.protocolrecords.NMContainerStatus; public class RMContainerRecoverEvent extends RMContainerEvent { private final NMContainerStatus containerReport; + private NodeId nodeId; public RMContainerRecoverEvent(ContainerId containerId, - NMContainerStatus containerReport) { + NMContainerStatus containerReport, NodeId nodeId) { super(containerId, RMContainerEventType.RECOVER); this.containerReport = containerReport; + this.nodeId = nodeId; } public NMContainerStatus getContainerReport() { return containerReport; } + + public NodeId getNodeId() { return nodeId; } } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeCleanedupContainerNotifiedEvent.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeCleanedupContainerNotifiedEvent.java new file mode 100644 index 0000000..ab8cdbd --- /dev/null +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeCleanedupContainerNotifiedEvent.java @@ -0,0 +1,39 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.server.resourcemanager.rmnode; + +import org.apache.hadoop.yarn.api.records.ContainerId; +import org.apache.hadoop.yarn.api.records.NodeId; + +// Happens after an implicit ack from AM that the container completion has +// been notified successfully to the AM +public class RMNodeCleanedupContainerNotifiedEvent extends RMNodeEvent { + + private ContainerId contId; + + public RMNodeCleanedupContainerNotifiedEvent(NodeId nodeId, + ContainerId contId) { + super(nodeId, RMNodeEventType.CLEANEDUP_CONTAINER_NOTIFIED); + this.contId = contId; + } + + public ContainerId getContainerId() { + return this.contId; + } +} diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeEventType.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeEventType.java index ef644be..69a5d39 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeEventType.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeEventType.java @@ -36,6 +36,7 @@ // Source: Container CONTAINER_ALLOCATED, CLEANUP_CONTAINER, + CLEANEDUP_CONTAINER_NOTIFIED, // Source: NMLivelinessMonitor EXPIRE diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeImpl.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeImpl.java index e20adc5..3917ba9 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeImpl.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeImpl.java @@ -111,6 +111,10 @@ private final Set containersToClean = new TreeSet( new ContainerIdComparator()); + /* set of containers that were notified to AM about their completion */ + private final Set cleanupContainersNotified = + new HashSet(); + /* the list of applications that have finished and need to be purged */ private final List finishedApplications = new ArrayList(); @@ -131,7 +135,7 @@ RMNodeEventType.STARTED, new AddNodeTransition()) //Transitions from RUNNING state - .addTransition(NodeState.RUNNING, + .addTransition(NodeState.RUNNING, EnumSet.of(NodeState.RUNNING, NodeState.UNHEALTHY), RMNodeEventType.STATUS_UPDATE, new StatusUpdateWhenHealthyTransition()) .addTransition(NodeState.RUNNING, NodeState.DECOMMISSIONED, @@ -148,10 +152,13 @@ .addTransition(NodeState.RUNNING, NodeState.RUNNING, RMNodeEventType.CLEANUP_CONTAINER, new CleanUpContainerTransition()) .addTransition(NodeState.RUNNING, NodeState.RUNNING, + RMNodeEventType.CLEANEDUP_CONTAINER_NOTIFIED, + new CleanedupContainerNotifiedTransition()) + .addTransition(NodeState.RUNNING, NodeState.RUNNING, RMNodeEventType.RECONNECTED, new ReconnectNodeTransition()) //Transitions from UNHEALTHY state - .addTransition(NodeState.UNHEALTHY, + .addTransition(NodeState.UNHEALTHY, EnumSet.of(NodeState.UNHEALTHY, NodeState.RUNNING), RMNodeEventType.STATUS_UPDATE, new StatusUpdateWhenUnHealthyTransition()) .addTransition(NodeState.UNHEALTHY, NodeState.DECOMMISSIONED, @@ -169,7 +176,10 @@ RMNodeEventType.CLEANUP_APP, new CleanUpAppTransition()) .addTransition(NodeState.UNHEALTHY, NodeState.UNHEALTHY, RMNodeEventType.CLEANUP_CONTAINER, new CleanUpContainerTransition()) - + .addTransition(NodeState.UNHEALTHY, NodeState.UNHEALTHY, + RMNodeEventType.CLEANEDUP_CONTAINER_NOTIFIED, + new CleanedupContainerNotifiedTransition()) + // create the topology tables .installTopology(); @@ -352,8 +362,11 @@ public void updateNodeHeartbeatResponseForCleanup(NodeHeartbeatResponse response response.addAllContainersToCleanup( new ArrayList(this.containersToClean)); response.addAllApplicationsToCleanup(this.finishedApplications); + response.addCleanedupContainersNotified( + new ArrayList(this.cleanupContainersNotified)); this.containersToClean.clear(); this.finishedApplications.clear(); + this.cleanupContainersNotified.clear(); } finally { this.writeLock.unlock(); } @@ -577,6 +590,16 @@ public void transition(RMNodeImpl rmNode, RMNodeEvent event) { } } + public static class CleanedupContainerNotifiedTransition implements + SingleArcTransition { + + @Override + public void transition(RMNodeImpl rmNode, RMNodeEvent event) { + rmNode.cleanupContainersNotified.add((( + RMNodeCleanedupContainerNotifiedEvent) event).getContainerId()); + } + } + public static class DeactivateNodeTransition implements SingleArcTransition { diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AbstractYarnScheduler.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AbstractYarnScheduler.java index 5764c8c..505b919 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AbstractYarnScheduler.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AbstractYarnScheduler.java @@ -250,7 +250,7 @@ public synchronized void recoverContainersOnNode( // recover RMContainer rmContainer.handle(new RMContainerRecoverEvent(container.getContainerId(), - container)); + container, nm.getNodeID())); // recover scheduler node nodes.get(nm.getNodeID()).recoverContainer(rmContainer); diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerApp.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerApp.java index 846d1e1..8c43264 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerApp.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerApp.java @@ -90,7 +90,8 @@ synchronized public boolean containerCompleted(RMContainer rmContainer, new RMContainerFinishedEvent( containerId, containerStatus, - event) + event, + rmContainer.getAllocatedNode()) ); LOG.info("Completed container: " + rmContainer.getContainerId() + " in state: " + rmContainer.getState() + " event:" + event); diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSSchedulerApp.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSSchedulerApp.java index 20cf395..bc1a5fe 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSSchedulerApp.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSSchedulerApp.java @@ -91,7 +91,8 @@ synchronized public void containerCompleted(RMContainer rmContainer, new RMContainerFinishedEvent( containerId, containerStatus, - event) + event, + rmContainer.getAllocatedNode()) ); LOG.info("Completed container: " + rmContainer.getContainerId() + " in state: " + rmContainer.getState() + " event:" + event); diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockRM.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockRM.java index 6949a81..2b3d6e2 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockRM.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockRM.java @@ -23,6 +23,7 @@ import java.security.PrivilegedAction; import java.util.List; import java.util.Map; +import java.util.Set; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.io.DataOutputBuffer; @@ -154,7 +155,8 @@ public void waitForContainerAllocated(MockNM nm, ContainerId containerId) public void waitForContainerToComplete(RMAppAttempt attempt, NMContainerStatus completedContainer) throws InterruptedException { while (true) { - List containers = attempt.getJustFinishedContainers(); + Set containers = attempt.getJustFinishedContainers() + .keySet(); System.out.println("Received completed containers " + containers); for (ContainerStatus container : containers) { if (container.getContainerId().equals( diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestResourceTrackerService.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestResourceTrackerService.java index 4360aef..959478e 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestResourceTrackerService.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestResourceTrackerService.java @@ -490,7 +490,7 @@ public void testHandleContainerStatusInvalidCompletions() throws Exception { ApplicationAttemptId.newInstance(app.getApplicationId(), 2), 1), ContainerState.COMPLETE, Resource.newInstance(1024, 1), "Dummy Completed", 0, Priority.newInstance(10), 1234); - rm.getResourceTrackerService().handleNMContainerStatus(report); + rm.getResourceTrackerService().handleNMContainerStatus(report, null); verify(handler, never()).handle((Event) any()); // Case 1.2: Master container is null @@ -501,7 +501,7 @@ public void testHandleContainerStatusInvalidCompletions() throws Exception { ContainerId.newInstance(currentAttempt.getAppAttemptId(), 0), ContainerState.COMPLETE, Resource.newInstance(1024, 1), "Dummy Completed", 0, Priority.newInstance(10), 1234); - rm.getResourceTrackerService().handleNMContainerStatus(report); + rm.getResourceTrackerService().handleNMContainerStatus(report, null); verify(handler, never()).handle((Event)any()); // Case 2: Managed AM @@ -514,7 +514,7 @@ public void testHandleContainerStatusInvalidCompletions() throws Exception { ContainerState.COMPLETE, Resource.newInstance(1024, 1), "Dummy Completed", 0, Priority.newInstance(10), 1234); try { - rm.getResourceTrackerService().handleNMContainerStatus(report); + rm.getResourceTrackerService().handleNMContainerStatus(report, null); } catch (Exception e) { // expected - ignore } @@ -529,7 +529,7 @@ public void testHandleContainerStatusInvalidCompletions() throws Exception { ContainerState.COMPLETE, Resource.newInstance(1024, 1), "Dummy Completed", 0, Priority.newInstance(10), 1234); try { - rm.getResourceTrackerService().handleNMContainerStatus(report); + rm.getResourceTrackerService().handleNMContainerStatus(report, null); } catch (Exception e) { // expected - ignore } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/TestAMRestart.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/TestAMRestart.java index de09878..fceee47 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/TestAMRestart.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/TestAMRestart.java @@ -197,7 +197,8 @@ public void testAMRestartWithExistingContainers() throws Exception { waitForContainersToFinish(4, newAttempt); boolean container3Exists = false, container4Exists = false, container5Exists = false, container6Exists = false; - for(ContainerStatus status : newAttempt.getJustFinishedContainers()) { + for(ContainerStatus status : newAttempt.getJustFinishedContainers() + .keySet()) { if(status.getContainerId().equals(containerId3)) { // containerId3 is the container ran by previous attempt but finished by the // new attempt. diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/TestRMAppAttemptTransitions.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/TestRMAppAttemptTransitions.java index 01a6973..273b2f4 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/TestRMAppAttemptTransitions.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/TestRMAppAttemptTransitions.java @@ -681,7 +681,8 @@ private void testUnmanagedAMSuccess(String url) { application.handle(new RMAppRunningOnNodeEvent(application.getApplicationId(), container.getNodeId())); applicationAttempt.handle(new RMAppAttemptContainerFinishedEvent( - applicationAttempt.getAppAttemptId(), mock(ContainerStatus.class))); + applicationAttempt.getAppAttemptId(), mock(ContainerStatus.class), + container.getNodeId())); // complete AM String diagnostics = "Successful"; FinalApplicationStatus finalStatus = FinalApplicationStatus.SUCCEEDED; @@ -816,7 +817,7 @@ public void testAMCrashAtAllocated() { BuilderUtils.newContainerStatus(amContainer.getId(), ContainerState.COMPLETE, containerDiagMsg, exitCode); applicationAttempt.handle(new RMAppAttemptContainerFinishedEvent( - applicationAttempt.getAppAttemptId(), cs)); + applicationAttempt.getAppAttemptId(), cs, null)); assertEquals(YarnApplicationAttemptState.ALLOCATED, applicationAttempt.createApplicationAttemptState()); sendAttemptUpdateSavedEvent(applicationAttempt); @@ -840,7 +841,7 @@ public void testRunningToFailed() { ContainerState.COMPLETE, containerDiagMsg, exitCode); ApplicationAttemptId appAttemptId = applicationAttempt.getAppAttemptId(); applicationAttempt.handle(new RMAppAttemptContainerFinishedEvent( - appAttemptId, cs)); + appAttemptId, cs, null)); // ignored ContainerFinished and Expire at FinalSaving if we were supposed // to Failed state. @@ -848,7 +849,7 @@ public void testRunningToFailed() { applicationAttempt.getAppAttemptState()); applicationAttempt.handle(new RMAppAttemptContainerFinishedEvent( applicationAttempt.getAppAttemptId(), BuilderUtils.newContainerStatus( - amContainer.getId(), ContainerState.COMPLETE, "", 0))); + amContainer.getId(), ContainerState.COMPLETE, "", 0), null)); applicationAttempt.handle(new RMAppAttemptEvent( applicationAttempt.getAppAttemptId(), RMAppAttemptEventType.EXPIRE)); assertEquals(RMAppAttemptState.FINAL_SAVING, @@ -885,7 +886,7 @@ public void testRunningToKilled() { applicationAttempt.getAppAttemptState()); applicationAttempt.handle(new RMAppAttemptContainerFinishedEvent( applicationAttempt.getAppAttemptId(), BuilderUtils.newContainerStatus( - amContainer.getId(), ContainerState.COMPLETE, "", 0))); + amContainer.getId(), ContainerState.COMPLETE, "", 0), null)); applicationAttempt.handle(new RMAppAttemptEvent( applicationAttempt.getAppAttemptId(), RMAppAttemptEventType.EXPIRE)); assertEquals(RMAppAttemptState.FINAL_SAVING, @@ -1061,7 +1062,7 @@ public void testFinishingToFinishing() { BuilderUtils.newContainerStatus( BuilderUtils.newContainerId( applicationAttempt.getAppAttemptId(), 42), - ContainerState.COMPLETE, "", 0))); + ContainerState.COMPLETE, "", 0), null)); testAppAttemptFinishingState(amContainer, finalStatus, trackingUrl, diagnostics); } @@ -1080,7 +1081,7 @@ public void testSuccessfulFinishingToFinished() { new RMAppAttemptContainerFinishedEvent( applicationAttempt.getAppAttemptId(), BuilderUtils.newContainerStatus(amContainer.getId(), - ContainerState.COMPLETE, "", 0))); + ContainerState.COMPLETE, "", 0), null)); testAppAttemptFinishedState(amContainer, finalStatus, trackingUrl, diagnostics, 0, false); } @@ -1108,7 +1109,7 @@ public void testSuccessfulFinishingToFinished() { // Container_finished event comes before Attempt_Saved event. applicationAttempt.handle(new RMAppAttemptContainerFinishedEvent( applicationAttempt.getAppAttemptId(), BuilderUtils.newContainerStatus( - amContainer.getId(), ContainerState.COMPLETE, "", 0))); + amContainer.getId(), ContainerState.COMPLETE, "", 0), null)); assertEquals(RMAppAttemptState.FINAL_SAVING, applicationAttempt.getAppAttemptState()); // send attempt_saved @@ -1193,7 +1194,7 @@ public void testFailedToFailed() { ContainerState.COMPLETE, "some error", 123); ApplicationAttemptId appAttemptId = applicationAttempt.getAppAttemptId(); applicationAttempt.handle(new RMAppAttemptContainerFinishedEvent( - appAttemptId, cs1)); + appAttemptId, cs1, null)); assertEquals(YarnApplicationAttemptState.RUNNING, applicationAttempt.createApplicationAttemptState()); sendAttemptUpdateSavedEvent(applicationAttempt); @@ -1209,10 +1210,10 @@ public void testFailedToFailed() { ContainerStatus.newInstance(ContainerId.newInstance(appAttemptId, 2), ContainerState.COMPLETE, "", 0); applicationAttempt.handle(new RMAppAttemptContainerFinishedEvent( - appAttemptId, cs2)); + appAttemptId, cs2, null)); assertEquals(1, applicationAttempt.getJustFinishedContainers().size()); assertEquals(cs2.getContainerId(), applicationAttempt - .getJustFinishedContainers().get(0).getContainerId()); + .getJustFinishedContainers().keySet().iterator().next().getContainerId()); } @@ -1234,7 +1235,7 @@ public void testContainersCleanupForLastAttempt() { ContainerState.COMPLETE, "some error", 123); ApplicationAttemptId appAttemptId = applicationAttempt.getAppAttemptId(); applicationAttempt.handle(new RMAppAttemptContainerFinishedEvent( - appAttemptId, cs1)); + appAttemptId, cs1, null)); assertEquals(YarnApplicationAttemptState.RUNNING, applicationAttempt.createApplicationAttemptState()); sendAttemptUpdateSavedEvent(applicationAttempt); diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/TestRMContainerImpl.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/TestRMContainerImpl.java index 44f8381..0486c5c 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/TestRMContainerImpl.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/TestRMContainerImpl.java @@ -123,7 +123,7 @@ public void testReleaseWhileRunning() { .createAbnormalContainerStatus(containerId, SchedulerUtils.RELEASED_CONTAINER); rmContainer.handle(new RMContainerFinishedEvent(containerId, - containerStatus, RMContainerEventType.RELEASED)); + containerStatus, RMContainerEventType.RELEASED, nodeId)); drainDispatcher.await(); assertEquals(RMContainerState.RELEASED, rmContainer.getState()); assertEquals(SchedulerUtils.RELEASED_CONTAINER, @@ -144,7 +144,7 @@ public void testReleaseWhileRunning() { // In RELEASED state. A FINIHSED event may come in. rmContainer.handle(new RMContainerFinishedEvent(containerId, SchedulerUtils .createAbnormalContainerStatus(containerId, "FinishedContainer"), - RMContainerEventType.FINISHED)); + RMContainerEventType.FINISHED, nodeId)); assertEquals(RMContainerState.RELEASED, rmContainer.getState()); } @@ -209,7 +209,7 @@ public void testExpireWhileRunning() { .createAbnormalContainerStatus(containerId, SchedulerUtils.EXPIRED_CONTAINER); rmContainer.handle(new RMContainerFinishedEvent(containerId, - containerStatus, RMContainerEventType.EXPIRE)); + containerStatus, RMContainerEventType.EXPIRE, nodeId)); drainDispatcher.await(); assertEquals(RMContainerState.RUNNING, rmContainer.getState()); verify(writer, never()).containerFinished(any(RMContainer.class)); diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/security/TestAMRMTokens.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/security/TestAMRMTokens.java index 14385c4..6d95320 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/security/TestAMRMTokens.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/security/TestAMRMTokens.java @@ -157,7 +157,7 @@ public void testTokenExpiry() throws Exception { .getEventHandler() .handle( new RMAppAttemptContainerFinishedEvent(applicationAttemptId, - containerStatus)); + containerStatus, nm1.getNodeId())); // Make sure the RMAppAttempt is at Finished State. // Both AMRMToken and ClientToAMToken have been removed.