diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/RegisterNodeManagerRequest.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/RegisterNodeManagerRequest.java index 32f44a4..a334f06 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/RegisterNodeManagerRequest.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/RegisterNodeManagerRequest.java @@ -18,6 +18,9 @@ package org.apache.hadoop.yarn.server.api.protocolrecords; +import java.util.List; + +import org.apache.hadoop.yarn.api.records.ContainerStatus; import org.apache.hadoop.yarn.api.records.NodeId; import org.apache.hadoop.yarn.api.records.Resource; @@ -26,9 +29,11 @@ int getHttpPort(); Resource getResource(); String getNMVersion(); + List getContainerStatuses(); void setNodeId(NodeId nodeId); void setHttpPort(int port); void setResource(Resource resource); void setNMVersion(String version); + void addAllContainerStatuses(List containerStatuses); } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/RegisterNodeManagerRequestPBImpl.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/RegisterNodeManagerRequestPBImpl.java index b81a590..c51fd6f 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/RegisterNodeManagerRequestPBImpl.java +++ 
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/RegisterNodeManagerRequestPBImpl.java @@ -19,11 +19,21 @@ package org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb; +import java.util.ArrayList; +import java.util.Iterator; +import java.util.List; + +import org.apache.hadoop.yarn.api.records.ContainerId; +import org.apache.hadoop.yarn.api.records.ContainerStatus; import org.apache.hadoop.yarn.api.records.NodeId; import org.apache.hadoop.yarn.api.records.Resource; +import org.apache.hadoop.yarn.api.records.impl.pb.ContainerIdPBImpl; +import org.apache.hadoop.yarn.api.records.impl.pb.ContainerStatusPBImpl; import org.apache.hadoop.yarn.api.records.impl.pb.NodeIdPBImpl; import org.apache.hadoop.yarn.api.records.impl.pb.ProtoBase; import org.apache.hadoop.yarn.api.records.impl.pb.ResourcePBImpl; +import org.apache.hadoop.yarn.proto.YarnProtos.ContainerIdProto; +import org.apache.hadoop.yarn.proto.YarnProtos.ContainerStatusProto; import org.apache.hadoop.yarn.proto.YarnProtos.NodeIdProto; import org.apache.hadoop.yarn.proto.YarnProtos.ResourceProto; import org.apache.hadoop.yarn.proto.YarnServerCommonServiceProtos.RegisterNodeManagerRequestProto; @@ -39,6 +49,7 @@ private Resource resource = null; private NodeId nodeId = null; + private List containerStatuses = null; public RegisterNodeManagerRequestPBImpl() { builder = RegisterNodeManagerRequestProto.newBuilder(); @@ -57,6 +68,9 @@ public RegisterNodeManagerRequestProto getProto() { } private void mergeLocalToBuilder() { + if (this.containerStatuses != null) { + addFinishedContainersToProto(); + } if (this.resource != null) { builder.setResource(convertToProtoFormat(this.resource)); } @@ -140,6 +154,66 @@ public void setHttpPort(int httpPort) { } @Override + public List getContainerStatuses() { + initFinishedContainers(); + return containerStatuses; + } + + private void initFinishedContainers() { + if 
(this.containerStatuses != null) { + return; + } + RegisterNodeManagerRequestProtoOrBuilder p = viaProto ? proto : builder; + List list = p.getContainerStatusesList(); + this.containerStatuses = new ArrayList(); + for (ContainerStatusProto c : list) { + this.containerStatuses.add(convertFromProtoFormat(c)); + } + } + + @Override + public void addAllContainerStatuses(List containers) { + if (containers == null) { + return; + } + initFinishedContainers(); + this.containerStatuses.addAll(containers); + } + + private void addFinishedContainersToProto() { + maybeInitBuilder(); + builder.clearContainerStatuses(); + if (containerStatuses == null) { + return; + } + Iterable it = new Iterable() { + + @Override + public Iterator iterator() { + return new Iterator() { + Iterator iter = containerStatuses.iterator(); + + @Override + public boolean hasNext() { + return iter.hasNext(); + } + + @Override + public ContainerStatusProto next() { + return convertToProtoFormat(iter.next()); + } + + @Override + public void remove() { + throw new UnsupportedOperationException(); + } + }; + } + }; + builder.addAllContainerStatuses(it); + } + + @Override public String getNMVersion() { RegisterNodeManagerRequestProtoOrBuilder p = viaProto ? 
proto : builder; if (!p.hasNmVersion()) { @@ -170,6 +244,11 @@ private ResourceProto convertToProtoFormat(Resource t) { return ((ResourcePBImpl)t).getProto(); } - - -} + private ContainerStatusPBImpl convertFromProtoFormat(ContainerStatusProto c) { + return new ContainerStatusPBImpl(c); + } + + private ContainerStatusProto convertToProtoFormat(ContainerStatus c) { + return ((ContainerStatusPBImpl)c).getProto(); + } +} \ No newline at end of file diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/proto/yarn_server_common_service_protos.proto hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/proto/yarn_server_common_service_protos.proto index 70434c8..c544905 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/proto/yarn_server_common_service_protos.proto +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/proto/yarn_server_common_service_protos.proto @@ -30,6 +30,7 @@ message RegisterNodeManagerRequestProto { optional int32 http_port = 3; optional ResourceProto resource = 4; optional string nm_version = 5; + repeated ContainerStatusProto containerStatuses = 6; } message RegisterNodeManagerResponseProto { diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java index 8a06418..56bafec 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java @@ 
-238,13 +238,20 @@ protected ResourceTracker getRMClient() throws IOException { } @VisibleForTesting - protected void registerWithRM() throws YarnException, IOException { + protected void registerWithRM() + throws YarnException, IOException { RegisterNodeManagerRequest request = recordFactory.newRecordInstance(RegisterNodeManagerRequest.class); request.setHttpPort(this.httpPort); request.setResource(this.totalResource); request.setNodeId(this.nodeId); request.setNMVersion(this.nodeManagerVersionId); + List containerStatuses = this.getContainerStatuses(); + request.addAllContainerStatuses(containerStatuses); + if (containerStatuses != null) { + LOG.info("Registering with RM using finished containers :" + + containerStatuses); + } RegisterNodeManagerResponse regNMResponse = resourceTracker.registerNodeManager(request); this.rmIdentifier = regNMResponse.getRMIdentifier(); @@ -328,8 +335,35 @@ public NodeStatus getNodeStatusAndUpdateContainersInContext() { NodeStatus nodeStatus = recordFactory.newRecordInstance(NodeStatus.class); nodeStatus.setNodeId(this.nodeId); - int numActiveContainers = 0; - List containersStatuses = new ArrayList(); + List containersStatuses = getContainerStatuses(); + nodeStatus.setContainersStatuses(containersStatuses); + + LOG.debug(this.nodeId + " sending out status for " + + containersStatuses.size() + " containers"); + + NodeHealthStatus nodeHealthStatus = this.context.getNodeHealthStatus(); + nodeHealthStatus.setHealthReport(healthChecker.getHealthReport()); + nodeHealthStatus.setIsNodeHealthy(healthChecker.isHealthy()); + nodeHealthStatus.setLastHealthReportTime( + healthChecker.getLastHealthReportTime()); + if (LOG.isDebugEnabled()) { + LOG.debug("Node's health-status : " + nodeHealthStatus.getIsNodeHealthy() + + ", " + nodeHealthStatus.getHealthReport()); + } + nodeStatus.setNodeHealthStatus(nodeHealthStatus); + + List keepAliveAppIds = createKeepAliveApplicationList(); + nodeStatus.setKeepAliveApplications(keepAliveAppIds); + + return 
nodeStatus; + } + + /* + * It will return current container statuses. If any container has + * COMPLETED then it will be removed from context. + */ + public List getContainerStatuses() { + List containerStatuses = new ArrayList(); for (Iterator> i = this.context.getContainers().entrySet().iterator(); i.hasNext();) { Entry e = i.next(); @@ -339,8 +373,7 @@ public NodeStatus getNodeStatusAndUpdateContainersInContext() { // Clone the container to send it to the RM org.apache.hadoop.yarn.api.records.ContainerStatus containerStatus = container.cloneAndGetContainerStatus(); - containersStatuses.add(containerStatus); - ++numActiveContainers; + containerStatuses.add(containerStatus); if (LOG.isDebugEnabled()) { LOG.debug("Sending out status for container: " + containerStatus); } @@ -356,26 +389,7 @@ public NodeStatus getNodeStatusAndUpdateContainersInContext() { LOG.info("Removed completed container " + containerId); } } - nodeStatus.setContainersStatuses(containersStatuses); - - LOG.debug(this.nodeId + " sending out status for " - + numActiveContainers + " containers"); - - NodeHealthStatus nodeHealthStatus = this.context.getNodeHealthStatus(); - nodeHealthStatus.setHealthReport(healthChecker.getHealthReport()); - nodeHealthStatus.setIsNodeHealthy(healthChecker.isHealthy()); - nodeHealthStatus.setLastHealthReportTime( - healthChecker.getLastHealthReportTime()); - if (LOG.isDebugEnabled()) { - LOG.debug("Node's health-status : " + nodeHealthStatus.getIsNodeHealthy() - + ", " + nodeHealthStatus.getHealthReport()); - } - nodeStatus.setNodeHealthStatus(nodeHealthStatus); - - List keepAliveAppIds = createKeepAliveApplicationList(); - nodeStatus.setKeepAliveApplications(keepAliveAppIds); - - return nodeStatus; + return containerStatuses; } private void trackAppsForKeepAlive(List appIds) { diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManagerImpl.java 
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManagerImpl.java index 3091c4a..fdd2f56 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManagerImpl.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManagerImpl.java @@ -28,6 +28,7 @@ import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.Map.Entry; import java.util.Set; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.locks.ReentrantReadWriteLock; @@ -64,6 +65,7 @@ import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.api.records.ContainerId; import org.apache.hadoop.yarn.api.records.ContainerLaunchContext; +import org.apache.hadoop.yarn.api.records.ContainerState; import org.apache.hadoop.yarn.api.records.ContainerStatus; import org.apache.hadoop.yarn.api.records.NodeId; import org.apache.hadoop.yarn.api.records.SerializedException; @@ -371,17 +373,26 @@ public void cleanupContainersOnNMResync() { this.handle(new CMgrCompletedContainersEvent(containerIds, CMgrCompletedContainersEvent.Reason.ON_NODEMANAGER_RESYNC)); - while (!containers.isEmpty()) { - try { - Thread.sleep(1000); - nodeStatusUpdater.getNodeStatusAndUpdateContainersInContext(); - } catch (InterruptedException ex) { - LOG.warn("Interrupted while sleeping on container kill on resync", ex); + + boolean allConainersCompleted = false; + while (!containers.isEmpty() && !allConainersCompleted) { + allConainersCompleted = true; + for (Entry container : containers.entrySet()) { + if (container.getValue().cloneAndGetContainerStatus().getState() + != ContainerState.COMPLETE) { + allConainersCompleted = false; 
+ try { + Thread.sleep(1000); + } catch (InterruptedException ex) { + LOG.warn("Interrupted while sleeping on container kill on resync", + ex); + } + break; + } } } - // All containers killed - if (containers.isEmpty()) { + if (allConainersCompleted) { LOG.info("All containers in DONE state"); } else { LOG.info("Done waiting for containers to be killed. Still alive: " + diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceTrackerService.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceTrackerService.java index 23f8754..a870a4e 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceTrackerService.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceTrackerService.java @@ -29,6 +29,10 @@ import org.apache.hadoop.security.authorize.PolicyProvider; import org.apache.hadoop.service.AbstractService; import org.apache.hadoop.util.VersionUtil; +import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; +import org.apache.hadoop.yarn.api.records.ContainerId; +import org.apache.hadoop.yarn.api.records.ContainerState; +import org.apache.hadoop.yarn.api.records.ContainerStatus; import org.apache.hadoop.yarn.api.records.NodeId; import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.api.records.ResourceOption; @@ -46,14 +50,17 @@ import org.apache.hadoop.yarn.server.api.records.MasterKey; import org.apache.hadoop.yarn.server.api.records.NodeAction; import org.apache.hadoop.yarn.server.api.records.NodeStatus; +import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp; +import 
org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt; +import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event.RMAppAttemptContainerFinishedEvent; import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode; import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeEvent; import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeEventType; import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeImpl; import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeReconnectEvent; import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeStatusEvent; -import org.apache.hadoop.yarn.server.resourcemanager.security.RMContainerTokenSecretManager; import org.apache.hadoop.yarn.server.resourcemanager.security.NMTokenSecretManagerInRM; +import org.apache.hadoop.yarn.server.resourcemanager.security.RMContainerTokenSecretManager; import org.apache.hadoop.yarn.server.resourcemanager.security.authorize.RMPolicyProvider; import org.apache.hadoop.yarn.server.utils.YarnServerBuilderUtils; import org.apache.hadoop.yarn.util.RackResolver; @@ -183,6 +190,32 @@ public RegisterNodeManagerResponse registerNodeManager( Resource capability = request.getResource(); String nodeManagerVersion = request.getNMVersion(); + if (!request.getContainerStatuses().isEmpty()) { + LOG.info("received container statuses on node manager register :" + + request.getContainerStatuses()); + for (ContainerStatus containerStatus : request.getContainerStatuses()) { + ApplicationAttemptId appAttemptId = + containerStatus.getContainerId().getApplicationAttemptId(); + RMApp rmApp = + rmContext.getRMApps().get(appAttemptId.getApplicationId()); + if (rmApp != null) { + RMAppAttempt rmAppAttempt = rmApp.getRMAppAttempt(appAttemptId); + if (rmAppAttempt.getMasterContainer().getId() + .equals(containerStatus.getContainerId())) { + // sending master container finished event. 
+ RMAppAttemptContainerFinishedEvent evt = + new RMAppAttemptContainerFinishedEvent(appAttemptId, + containerStatus); + rmContext.getDispatcher().getEventHandler().handle(evt); + } + } else { + LOG.error("Received finished container :" + + containerStatus.getContainerId() + + " for non existing application :" + + appAttemptId.getApplicationId()); + } + } + } RegisterNodeManagerResponse response = recordFactory .newRecordInstance(RegisterNodeManagerResponse.class); diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java index e3b083c..4c059c5 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java @@ -18,11 +18,13 @@ package org.apache.hadoop.yarn.server.resourcemanager.rmapp; +import java.util.ArrayList; import java.util.Collection; import java.util.Collections; import java.util.EnumSet; import java.util.HashSet; import java.util.LinkedHashMap; +import java.util.List; import java.util.Map; import java.util.Set; import java.util.concurrent.locks.ReentrantReadWriteLock; @@ -132,8 +134,8 @@ .addTransition(RMAppState.NEW, RMAppState.NEW_SAVING, RMAppEventType.START, new RMAppSavingTransition()) .addTransition(RMAppState.NEW, EnumSet.of(RMAppState.SUBMITTED, - RMAppState.FINISHED, RMAppState.FAILED, RMAppState.KILLED, - RMAppState.FINAL_SAVING), + RMAppState.RUNNING, RMAppState.FINISHED, RMAppState.FAILED, + RMAppState.KILLED, RMAppState.FINAL_SAVING), RMAppEventType.RECOVER, new 
RMAppRecoveredTransition()) .addTransition(RMAppState.NEW, RMAppState.FINAL_SAVING, RMAppEventType.KILL, new FinalSavingTransition( @@ -238,7 +240,8 @@ // Transitions from FAILED state // ignorable transitions .addTransition(RMAppState.FAILED, RMAppState.FAILED, - EnumSet.of(RMAppEventType.KILL, RMAppEventType.NODE_UPDATE)) + EnumSet.of(RMAppEventType.KILL, RMAppEventType.ATTEMPT_FAILED, + RMAppEventType.NODE_UPDATE)) // Transitions from KILLED state // ignorable transitions @@ -611,11 +614,11 @@ public void recover(RMState state) throws Exception{ this.diagnostics.append(appState.getDiagnostics()); this.storedFinishTime = appState.getFinishTime(); this.startTime = appState.getStartTime(); + for(int i=0; i { + @Override public RMAppState transition(RMAppImpl app, RMAppEvent event) { - if (app.recoveredFinalState != null) { - FINAL_TRANSITION.transition(app, event); - return app.recoveredFinalState; + if (app.currentAttempt == null) { + // Saved application was not running any attempts. + app.createNewAttempt(true); + return RMAppState.SUBMITTED; + } else { + /* + * If last attempt recovered final state is + * 1) null .. it means attempt was started but AM container may or + * may not have started /finished. Therefore we should wait for it + * to finish. + * 2) FAILED/KILLED/FINISHED .. we will replay the logic when we + * recover final application attempt. + * Before recovering the last attempt we will recover all other + * attempts. 
+ */ + + for (RMAppAttempt attempt : app.getAppAttempts().values()) { + if (attempt != app.currentAttempt) { + attempt.handle(new RMAppAttemptEvent(attempt.getAppAttemptId(), + RMAppAttemptEventType.RECOVER)); + } + } + app.currentAttempt.handle(new RMAppAttemptEvent(app.currentAttempt + .getAppAttemptId(), RMAppAttemptEventType.RECOVER)); + if (app.recoveredFinalState != null) { + FINAL_TRANSITION.transition(app, event); + return app.recoveredFinalState; + } else { + return RMAppState.RUNNING; + } } - // Directly call AttemptFailedTransition, since now we deem that an - // application fails because of RM restart as a normal AM failure. - - // Do not recover unmanaged applications since current recovery - // mechanism of restarting attempts does not work for them. - // This will need to be changed in work preserving recovery in which - // RM will re-connect with the running AM's instead of restarting them - - // In work-preserve restart, if attemptCount == maxAttempts, the job still - // needs to be recovered because the last attempt may still be running. - - // As part of YARN-1210, we may return ACCECPTED state waiting for AM to - // reregister or fail and remove the following code. 
- return new AttemptFailedTransition(RMAppState.SUBMITTED).transition(app, - event); } } @@ -958,11 +974,14 @@ public AttemptFailedTransition(RMAppState initialState) { @Override public RMAppState transition(RMAppImpl app, RMAppEvent event) { + LOG.info("Application received attempt failed transition "); if (!app.submissionContext.getUnmanagedAM() && app.attempts.size() < app.maxAppAttempts) { + LOG.info("launching new attempt"); app.createNewAttempt(true); return initialState; } else { + LOG.info("failing the application"); app.rememberTargetTransitionsAndStoreState(event, new AttemptFailedFinalStateSavedTransition(), RMAppState.FAILED, RMAppState.FAILED); diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttemptImpl.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttemptImpl.java index 1247bb7..dd1b4d5 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttemptImpl.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttemptImpl.java @@ -179,7 +179,7 @@ new UnexpectedAMRegisteredTransition(), RMAppAttemptState.FAILED)) .addTransition( RMAppAttemptState.NEW, EnumSet.of(RMAppAttemptState.FINISHED, RMAppAttemptState.KILLED, - RMAppAttemptState.FAILED, RMAppAttemptState.RECOVERED), + RMAppAttemptState.FAILED, RMAppAttemptState.LAUNCHED), RMAppAttemptEventType.RECOVER, new AttemptRecoveredTransition()) // Transitions from SUBMITTED state @@ -386,25 +386,6 @@ RMAppAttemptEventType.UNREGISTERED, RMAppAttemptEventType.KILL, RMAppAttemptEventType.STATUS_UPDATE)) - - // Transitions from 
RECOVERED State - .addTransition( - RMAppAttemptState.RECOVERED, - RMAppAttemptState.RECOVERED, - EnumSet.of(RMAppAttemptEventType.START, - RMAppAttemptEventType.APP_ACCEPTED, - RMAppAttemptEventType.APP_REJECTED, - RMAppAttemptEventType.EXPIRE, - RMAppAttemptEventType.LAUNCHED, - RMAppAttemptEventType.LAUNCH_FAILED, - RMAppAttemptEventType.REGISTERED, - RMAppAttemptEventType.CONTAINER_ALLOCATED, - RMAppAttemptEventType.CONTAINER_ACQUIRED, - RMAppAttemptEventType.ATTEMPT_NEW_SAVED, - RMAppAttemptEventType.CONTAINER_FINISHED, - RMAppAttemptEventType.UNREGISTERED, - RMAppAttemptEventType.KILL, - RMAppAttemptEventType.STATUS_UPDATE)) .installTopology(); public RMAppAttemptImpl(ApplicationAttemptId appAttemptId, @@ -694,8 +675,6 @@ public void recover(RMState state) throws Exception { this.proxiedTrackingUrl = generateProxyUriWithScheme(originalTrackingUrl); this.finalStatus = attemptState.getFinalApplicationStatus(); this.startTime = attemptState.getStartTime(); - handle(new RMAppAttemptEvent(getAppAttemptId(), - RMAppAttemptEventType.RECOVER)); } private void recoverAppAttemptCredentials(Credentials appAttemptTokens) @@ -865,11 +844,20 @@ public void transition(RMAppAttemptImpl appAttempt, @Override public RMAppAttemptState transition(RMAppAttemptImpl appAttempt, RMAppAttemptEvent event) { + LOG.info("Recovering attempt : recoverdFinalState :" + + appAttempt.recoveredFinalState); if (appAttempt.recoveredFinalState != null) { appAttempt.progress = 1.0f; + if (appAttempt.rmContext.getRMApps() + .get(appAttempt.getAppAttemptId().getApplicationId()) + .getCurrentAppAttempt() == appAttempt) { + (new BaseFinalTransition(appAttempt.recoveredFinalState)).transition( + appAttempt, event); + } return appAttempt.recoveredFinalState; } else { - return RMAppAttemptState.RECOVERED; + (new AMLaunchedTransition()).transition(appAttempt, event); + return RMAppAttemptState.LAUNCHED; } } } diff --git 
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockNM.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockNM.java index c2cf147..d3a798f 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockNM.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockNM.java @@ -18,6 +18,7 @@ package org.apache.hadoop.yarn.server.resourcemanager; +import java.io.IOException; import java.util.ArrayList; import java.util.Arrays; import java.util.HashMap; @@ -31,6 +32,7 @@ import org.apache.hadoop.yarn.api.records.NodeId; import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.conf.YarnConfiguration; +import org.apache.hadoop.yarn.exceptions.YarnException; import org.apache.hadoop.yarn.server.api.protocolrecords.NodeHeartbeatRequest; import org.apache.hadoop.yarn.server.api.protocolrecords.NodeHeartbeatResponse; import org.apache.hadoop.yarn.server.api.protocolrecords.RegisterNodeManagerRequest; @@ -98,21 +100,27 @@ public void containerStatus(ContainerStatus containerStatus) throws Exception { } public RegisterNodeManagerResponse registerNode() throws Exception { + return registerNode(null); + } + + public RegisterNodeManagerResponse registerNode( + List containerStatus) throws Exception{ RegisterNodeManagerRequest req = Records.newRecord( RegisterNodeManagerRequest.class); req.setNodeId(nodeId); req.setHttpPort(httpPort); Resource resource = BuilderUtils.newResource(memory, vCores); req.setResource(resource); + req.addAllContainerStatuses(containerStatus); req.setNMVersion(version); RegisterNodeManagerResponse registrationResponse = 
resourceTracker.registerNodeManager(req); this.currentContainerTokenMasterKey = registrationResponse.getContainerTokenMasterKey(); this.currentNMTokenMasterKey = registrationResponse.getNMTokenMasterKey(); - return registrationResponse; + return registrationResponse; } - + public NodeHeartbeatResponse nodeHeartbeat(boolean isHealthy) throws Exception { return nodeHeartbeat(new HashMap>(), isHealthy, ++responseId); diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMRestart.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMRestart.java index f87f689..24a2177 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMRestart.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMRestart.java @@ -29,6 +29,7 @@ import java.util.List; import java.util.Map; import java.util.Set; +import java.util.concurrent.LinkedBlockingQueue; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.CommonConfigurationKeysPublic; @@ -57,10 +58,14 @@ import org.apache.hadoop.yarn.api.records.Container; import org.apache.hadoop.yarn.api.records.ContainerId; import org.apache.hadoop.yarn.api.records.ContainerState; +import org.apache.hadoop.yarn.api.records.ContainerStatus; import org.apache.hadoop.yarn.api.records.FinalApplicationStatus; import org.apache.hadoop.yarn.api.records.ResourceRequest; import org.apache.hadoop.yarn.api.records.YarnApplicationState; import org.apache.hadoop.yarn.conf.YarnConfiguration; +import org.apache.hadoop.yarn.event.AsyncDispatcher; +import org.apache.hadoop.yarn.event.Dispatcher; +import 
org.apache.hadoop.yarn.event.Event; import org.apache.hadoop.yarn.security.AMRMTokenIdentifier; import org.apache.hadoop.yarn.security.client.RMDelegationTokenIdentifier; import org.apache.hadoop.yarn.server.api.protocolrecords.NodeHeartbeatResponse; @@ -83,6 +88,7 @@ import org.junit.Assert; import org.junit.Before; import org.junit.Test; +import org.mortbay.log.Log; public class TestRMRestart { @@ -104,6 +110,7 @@ public void setup() throws UnknownHostException { Assert.assertTrue(YarnConfiguration.DEFAULT_RM_AM_MAX_ATTEMPTS > 1); } + @SuppressWarnings("rawtypes") @Test (timeout=180000) public void testRMRestart() throws Exception { conf.setInt(YarnConfiguration.RM_AM_MAX_ATTEMPTS, @@ -252,11 +259,14 @@ public void testRMRestart() throws Exception { .getApplicationId()); // verify state machine kicked into expected states - rm2.waitForState(loadedApp1.getApplicationId(), RMAppState.ACCEPTED); + rm2.waitForState(loadedApp1.getApplicationId(), RMAppState.RUNNING); rm2.waitForState(loadedApp2.getApplicationId(), RMAppState.ACCEPTED); - // verify new attempts created - Assert.assertEquals(2, loadedApp1.getAppAttempts().size()); + // verify attempts for apps + // The app for which AM was started will wait for previous am + // container finish event to arrive. However for an application for which + // no am container was running will start new application attempt. 
+ Assert.assertEquals(1, loadedApp1.getAppAttempts().size()); Assert.assertEquals(1, loadedApp2.getAppAttempts().size()); // verify old AM is not accepted @@ -274,8 +284,22 @@ public void testRMRestart() throws Exception { Assert.assertEquals(NodeAction.RESYNC, hbResponse.getNodeAction()); // new NM to represent NM re-register - nm1 = rm2.registerNode("127.0.0.1:1234", 15120); - nm2 = rm2.registerNode("127.0.0.2:5678", 15120); + nm1 = new MockNM("127.0.0.1:1234", 15120, rm2.getResourceTrackerService()); + nm2 = new MockNM("127.0.0.2:5678", 15120, rm2.getResourceTrackerService()); + + List containerStatuses = new ArrayList(); + ContainerStatus containerStatus = + BuilderUtils.newContainerStatus(BuilderUtils.newContainerId(loadedApp1 + .getCurrentAppAttempt().getAppAttemptId(), 1), + ContainerState.COMPLETE, "Killed AM container", 143); + containerStatuses.add(containerStatus); + nm1.registerNode(containerStatuses); + nm2.registerNode(); + +// waitForRMToProcessAllEvents(eventQueue, 20); + + rm2.waitForState(loadedApp1.getApplicationId(), RMAppState.ACCEPTED); + Assert.assertEquals(2, loadedApp1.getAppAttempts().size()); // verify no more reboot response sent hbResponse = nm1.nodeHeartbeat(true); @@ -398,6 +422,158 @@ public void testRMRestartAppRunningAMFailed() throws Exception { .getAppAttempts().get(am0.getApplicationAttemptId()).getAppAttemptState()); } + @SuppressWarnings("rawtypes") + @Test + public void testRMRestartWaitForPreviousAMToFinish() throws Exception { + // testing 3 cases + // After RM restarts + // 1) New application attempt is not started until previous AM container + // finish event is reported back to RM as a part of nm registration. + // 2) If previous AM container finish event is never reported back (i.e. + // node manager on which this AM container was running also went down) in + // that case AMLivenessMonitor should time out previous attempt and start + // new attempt. 
+ // 3) If all the stored attempts had finished then new attempt should + // be started immediately. + YarnConfiguration conf = new YarnConfiguration(this.conf); + conf.setInt(YarnConfiguration.RM_AM_MAX_ATTEMPTS, 40); + MemoryRMStateStore memStore = new MemoryRMStateStore(); + memStore.init(conf); + RMState rmState = memStore.getState(); + Map rmAppState = + rmState.getApplicationState(); + + // start RM + final MockRM rm1 = new MockRM(conf, memStore); + rm1.start(); + MockNM nm1 = + new MockNM("127.0.0.1:1234" , 16382, rm1.getResourceTrackerService()); + nm1.registerNode(); + + // submitting app + RMApp app1 = rm1.submitApp(200); + rm1.waitForState(app1.getApplicationId(), RMAppState.ACCEPTED); + MockAM am1 = launchAM(app1, rm1, nm1); + nm1.nodeHeartbeat(am1.getApplicationAttemptId(), 1, ContainerState.COMPLETE); + // Fail first AM. + am1.waitForState(RMAppAttemptState.FAILED); + + // launch another AM. + MockAM am2 = launchAM(app1, rm1, nm1); + + Assert.assertEquals(1, rmAppState.size()); + Assert.assertEquals(app1.getState(), RMAppState.RUNNING); + Assert.assertEquals(app1.getAppAttempts() + .get(app1.getCurrentAppAttempt().getAppAttemptId()) + .getAppAttemptState(), RMAppAttemptState.RUNNING); + + // start new RM. 
+ MockRM rm2 = null; + rm2 = new MockRM(conf, memStore); + rm2.start(); + + nm1.setResourceTrackerService(rm2.getResourceTrackerService()); + NodeHeartbeatResponse res = nm1.nodeHeartbeat(true); + Assert.assertEquals(NodeAction.RESYNC, res.getNodeAction()); + + RMApp rmApp = rm2.getRMContext().getRMApps().get(app1.getApplicationId()); + // application should be in running state + rm2.waitForState(app1.getApplicationId(), RMAppState.RUNNING); + + Assert.assertEquals(RMAppState.RUNNING, rmApp.getState()); + // new attempt should not be started + Assert.assertEquals(2, rmApp.getAppAttempts().size()); + // am1 attempt should be in FAILED state whereas am2 attempt should be in + // LAUNCHED state + Assert.assertEquals(RMAppAttemptState.FAILED, + rmApp.getAppAttempts().get(am1.getApplicationAttemptId()) + .getAppAttemptState()); + Assert.assertEquals(RMAppAttemptState.LAUNCHED, + rmApp.getAppAttempts().get(am2.getApplicationAttemptId()) + .getAppAttemptState()); + + List containerStatuses = new ArrayList(); + ContainerStatus containerStatus = + BuilderUtils.newContainerStatus( + BuilderUtils.newContainerId(am2.getApplicationAttemptId(), 1), + ContainerState.COMPLETE, "Killed AM container", 143); + containerStatuses.add(containerStatus); + nm1.registerNode(containerStatuses); + rm2.waitForState(am2.getApplicationAttemptId(), RMAppAttemptState.FAILED); + MockAM am3 = launchAM(rmApp, rm2, nm1); + Assert.assertEquals(3, rmApp.getAppAttempts().size()); + rm2.waitForState(rmApp.getCurrentAppAttempt().getAppAttemptId(), + RMAppAttemptState.RUNNING); + // Now restart RM ... + // Setting AMLivelinessMonitor interval to be 10 Secs. + conf.setInt(YarnConfiguration.RM_AM_EXPIRY_INTERVAL_MS, 10000); + MockRM rm3 = null; + rm3 = new MockRM(conf, memStore); + rm3.start(); + + // Wait for RM to process all the events as a part of rm recovery. 
+ nm1.setResourceTrackerService(rm3.getResourceTrackerService()); + + rmApp = rm3.getRMContext().getRMApps().get(app1.getApplicationId()); + // application should be in running state + rm3.waitForState(app1.getApplicationId(), RMAppState.RUNNING); + Assert.assertEquals(rmApp.getState(), RMAppState.RUNNING); + // new attempt should not be started + Assert.assertEquals(3, rmApp.getAppAttempts().size()); + // am1 and am2 attempts should be in FAILED state whereas am3 should be + // in LAUNCHED state + Assert.assertEquals(RMAppAttemptState.FAILED, + rmApp.getAppAttempts().get(am1.getApplicationAttemptId()) + .getAppAttemptState()); + Assert.assertEquals(RMAppAttemptState.FAILED, + rmApp.getAppAttempts().get(am2.getApplicationAttemptId()) + .getAppAttemptState()); + ApplicationAttemptId latestAppAttemptId = + rmApp.getCurrentAppAttempt().getAppAttemptId(); + Assert.assertEquals(RMAppAttemptState.LAUNCHED,rmApp.getAppAttempts() + .get(latestAppAttemptId).getAppAttemptState()); + + rm3.waitForState(latestAppAttemptId, RMAppAttemptState.FAILED); + rm3.waitForState(rmApp.getApplicationId(), RMAppState.ACCEPTED); + Assert.assertEquals(4, rmApp.getAppAttempts().size()); + Assert.assertEquals(RMAppAttemptState.FAILED, + rmApp.getAppAttempts().get(latestAppAttemptId).getAppAttemptState()); + + latestAppAttemptId = rmApp.getCurrentAppAttempt().getAppAttemptId(); + + // The 4th attempt has started but is not yet saved into RMStateStore. + // It will be saved only when we launch AM. + + // submitting app but not starting AM for it. 
+ RMApp app2 = rm3.submitApp(200); + rm3.waitForState(app2.getApplicationId(), RMAppState.ACCEPTED); + Assert.assertEquals(1, app2.getAppAttempts().size()); + Assert.assertEquals(0, + memStore.getState().getApplicationState().get(app2.getApplicationId()) + .getAttemptCount()); + + MockRM rm4 = null; + rm4 = new MockRM(conf, memStore); + rm4.start(); + + rmApp = rm4.getRMContext().getRMApps().get(app1.getApplicationId()); + rm4.waitForState(rmApp.getApplicationId(), RMAppState.ACCEPTED); + Assert.assertEquals(4, rmApp.getAppAttempts().size()); + Assert.assertEquals(RMAppState.ACCEPTED, rmApp.getState()); + Assert.assertEquals(RMAppAttemptState.SCHEDULED, rmApp.getAppAttempts() + .get(latestAppAttemptId).getAppAttemptState()); + + // The initial application for which an AM was not started should be in + // ACCEPTED state with one application attempt started. + app2 = rm4.getRMContext().getRMApps().get(app2.getApplicationId()); + rm4.waitForState(app2.getApplicationId(), RMAppState.ACCEPTED); + Assert.assertEquals(RMAppState.ACCEPTED, app2.getState()); + Assert.assertEquals(1, app2.getAppAttempts().size()); + Assert.assertEquals(RMAppAttemptState.SCHEDULED, app2 + .getCurrentAppAttempt().getAppAttemptState()); + + } + @Test public void testRMRestartFailedApp() throws Exception { conf.setInt(YarnConfiguration.RM_AM_MAX_ATTEMPTS, 1); @@ -720,6 +896,8 @@ public void testRMRestartOnMaxAppAttempts() throws Exception { Assert.assertEquals(BuilderUtils.newContainerId(attemptId1, 1), attemptState.getMasterContainer().getId()); + // Setting AMLivelinessMonitor interval to be 10 Secs. 
+ conf.setInt(YarnConfiguration.RM_AM_EXPIRY_INTERVAL_MS, 10000); // start new RM MockRM rm2 = new MockRM(conf, memStore); rm2.start(); diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/TestRMAppAttemptTransitions.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/TestRMAppAttemptTransitions.java index b9fc15f..d391d95 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/TestRMAppAttemptTransitions.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/TestRMAppAttemptTransitions.java @@ -372,10 +372,10 @@ private void testAppAttemptKilledState(Container amContainer, } /** - * {@link RMAppAttemptState#RECOVERED} + * {@link RMAppAttemptState#LAUNCHED} */ private void testAppAttemptRecoveredState() { - assertEquals(RMAppAttemptState.RECOVERED, + assertEquals(RMAppAttemptState.LAUNCHED, applicationAttempt.getAppAttemptState()); }