diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/RegisterNodeManagerRequest.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/RegisterNodeManagerRequest.java index 32f44a4..a334f06 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/RegisterNodeManagerRequest.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/RegisterNodeManagerRequest.java @@ -18,6 +18,9 @@ package org.apache.hadoop.yarn.server.api.protocolrecords; +import java.util.List; + +import org.apache.hadoop.yarn.api.records.ContainerStatus; import org.apache.hadoop.yarn.api.records.NodeId; import org.apache.hadoop.yarn.api.records.Resource; @@ -26,9 +29,11 @@ int getHttpPort(); Resource getResource(); String getNMVersion(); + List getContainerStatuses(); void setNodeId(NodeId nodeId); void setHttpPort(int port); void setResource(Resource resource); void setNMVersion(String version); + void addAllContainerStatuses(List containerStatuses); } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/RegisterNodeManagerRequestPBImpl.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/RegisterNodeManagerRequestPBImpl.java index b81a590..c51fd6f 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/RegisterNodeManagerRequestPBImpl.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/RegisterNodeManagerRequestPBImpl.java @@ -19,11 +19,21 @@ package org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb; +import java.util.ArrayList; +import java.util.Iterator; +import java.util.List; + +import org.apache.hadoop.yarn.api.records.ContainerId; +import org.apache.hadoop.yarn.api.records.ContainerStatus; import org.apache.hadoop.yarn.api.records.NodeId; import org.apache.hadoop.yarn.api.records.Resource; +import org.apache.hadoop.yarn.api.records.impl.pb.ContainerIdPBImpl; +import org.apache.hadoop.yarn.api.records.impl.pb.ContainerStatusPBImpl; import org.apache.hadoop.yarn.api.records.impl.pb.NodeIdPBImpl; import org.apache.hadoop.yarn.api.records.impl.pb.ProtoBase; import org.apache.hadoop.yarn.api.records.impl.pb.ResourcePBImpl; +import org.apache.hadoop.yarn.proto.YarnProtos.ContainerIdProto; +import org.apache.hadoop.yarn.proto.YarnProtos.ContainerStatusProto; import org.apache.hadoop.yarn.proto.YarnProtos.NodeIdProto; import org.apache.hadoop.yarn.proto.YarnProtos.ResourceProto; import org.apache.hadoop.yarn.proto.YarnServerCommonServiceProtos.RegisterNodeManagerRequestProto; @@ -39,6 +49,7 @@ private Resource resource = null; private NodeId nodeId = null; + private List containerStatuses = null; public RegisterNodeManagerRequestPBImpl() { builder = RegisterNodeManagerRequestProto.newBuilder(); @@ -57,6 +68,9 @@ public RegisterNodeManagerRequestProto getProto() { } private void mergeLocalToBuilder() { + if (this.containerStatuses != null) { + addFinishedContainersToProto(); + } if (this.resource != null) { builder.setResource(convertToProtoFormat(this.resource)); } @@ -140,6 +154,66 @@ public void setHttpPort(int httpPort) { } @Override + public List getContainerStatuses() { + initFinishedContainers(); + return containerStatuses; + } + + private void initFinishedContainers() { + if (this.containerStatuses != null) { + return; + } + RegisterNodeManagerRequestProtoOrBuilder p = viaProto ? proto : builder; + List list = p.getContainerStatusesList(); + this.containerStatuses = new ArrayList(); + for (ContainerStatusProto c : list) { + this.containerStatuses.add(convertFromProtoFormat(c)); + } + } + + @Override + public void addAllContainerStatuses(List containers) { + if (containers == null) { + return; + } + initFinishedContainers(); + this.containerStatuses.addAll(containers); + } + + private void addFinishedContainersToProto() { + maybeInitBuilder(); + builder.clearContainerStatuses(); + if (containerStatuses == null) { + return; + } + Iterable it = new Iterable() { + + @Override + public Iterator iterator() { + return new Iterator() { + Iterator iter = containerStatuses.iterator(); + + @Override + public boolean hasNext() { + return iter.hasNext(); + } + + @Override + public ContainerStatusProto next() { + return convertToProtoFormat(iter.next()); + } + + @Override + public void remove() { + throw new UnsupportedOperationException(); + } + }; + } + }; + builder.addAllContainerStatuses(it); + } + + @Override public String getNMVersion() { RegisterNodeManagerRequestProtoOrBuilder p = viaProto ? proto : builder; if (!p.hasNmVersion()) { @@ -170,6 +244,11 @@ private ResourceProto convertToProtoFormat(Resource t) { return ((ResourcePBImpl)t).getProto(); } - - -} + private ContainerStatusPBImpl convertFromProtoFormat(ContainerStatusProto c) { + return new ContainerStatusPBImpl(c); + } + + private ContainerStatusProto convertToProtoFormat(ContainerStatus c) { + return ((ContainerStatusPBImpl)c).getProto(); + } +} \ No newline at end of file diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/proto/yarn_server_common_service_protos.proto hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/proto/yarn_server_common_service_protos.proto index 70434c8..c544905 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/proto/yarn_server_common_service_protos.proto +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/proto/yarn_server_common_service_protos.proto @@ -30,6 +30,7 @@ message RegisterNodeManagerRequestProto { optional int32 http_port = 3; optional ResourceProto resource = 4; optional string nm_version = 5; + repeated ContainerStatusProto containerStatuses = 6; } message RegisterNodeManagerResponseProto { diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeManager.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeManager.java index a169c12..ae06cb3 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeManager.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeManager.java @@ -19,6 +19,8 @@ package org.apache.hadoop.yarn.server.nodemanager; import java.io.IOException; +import java.util.ArrayList; +import java.util.List; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; import java.util.concurrent.ConcurrentSkipListMap; @@ -40,6 +42,8 @@ import org.apache.hadoop.yarn.api.ContainerManagementProtocol; import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.api.records.ContainerId; +import org.apache.hadoop.yarn.api.records.ContainerState; +import org.apache.hadoop.yarn.api.records.ContainerStatus; import org.apache.hadoop.yarn.api.records.NodeId; import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.event.AsyncDispatcher; @@ -48,6 +52,7 @@ import org.apache.hadoop.yarn.exceptions.YarnRuntimeException; import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider; import org.apache.hadoop.yarn.server.api.records.NodeHealthStatus; +import org.apache.hadoop.yarn.server.nodemanager.ContainerExecutor.ExitCode; import org.apache.hadoop.yarn.server.nodemanager.containermanager.ContainerManagerImpl; import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.Application; import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container; @@ -228,9 +233,20 @@ protected void resyncWithRM() { public void run() { LOG.info("Notifying ContainerManager to block new container-requests"); containerManager.setBlockNewContainerRequests(true); + List runningContainers = new ArrayList(); + runningContainers.addAll(context.getContainers().keySet()); LOG.info("Cleaning up running containers on resync"); + List killedContainerStatuses = + new ArrayList(); + for (ContainerId container : runningContainers) { + killedContainerStatuses.add(ContainerStatus.newInstance(container, + ContainerState.COMPLETE, "Killed due to RM restart", + ExitCode.FORCE_KILLED.getExitCode())); + } containerManager.cleanupContainersOnNMResync(); - ((NodeStatusUpdaterImpl) nodeStatusUpdater ).rebootNodeStatusUpdater(); + ((NodeStatusUpdaterImpl) nodeStatusUpdater) + .rebootNodeStatusUpdater(((NodeStatusUpdaterImpl) nodeStatusUpdater) + .getContainerStatuses()); } }.start(); } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java index 8a06418..c0af438 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java @@ -199,13 +199,13 @@ protected void serviceStop() throws Exception { super.serviceStop(); } - protected void rebootNodeStatusUpdater() { + protected void rebootNodeStatusUpdater(List containers) { // Interrupt the updater. this.isStopped = true; try { statusUpdater.join(); - registerWithRM(); + registerWithRM(containers); statusUpdater = new Thread(statusUpdaterRunnable, "Node Status Updater"); this.isStopped = false; statusUpdater.start(); @@ -239,12 +239,23 @@ protected ResourceTracker getRMClient() throws IOException { @VisibleForTesting protected void registerWithRM() throws YarnException, IOException { + registerWithRM(null); + } + + @VisibleForTesting + protected void registerWithRM(List containers) + throws YarnException, IOException { RegisterNodeManagerRequest request = recordFactory.newRecordInstance(RegisterNodeManagerRequest.class); request.setHttpPort(this.httpPort); request.setResource(this.totalResource); request.setNodeId(this.nodeId); request.setNMVersion(this.nodeManagerVersionId); + request.addAllContainerStatuses(containers); + if (containers != null) { + LOG.info("Registering with RM using finished containers :" + + containers); + } RegisterNodeManagerResponse regNMResponse = resourceTracker.registerNodeManager(request); this.rmIdentifier = regNMResponse.getRMIdentifier(); @@ -328,8 +339,35 @@ public NodeStatus getNodeStatusAndUpdateContainersInContext() { NodeStatus nodeStatus = recordFactory.newRecordInstance(NodeStatus.class); nodeStatus.setNodeId(this.nodeId); - int numActiveContainers = 0; - List containersStatuses = new ArrayList(); + List containersStatuses = getContainerStatuses(); + nodeStatus.setContainersStatuses(containersStatuses); + + LOG.debug(this.nodeId + " sending out status for " + + containersStatuses.size() + " containers"); + + NodeHealthStatus nodeHealthStatus = this.context.getNodeHealthStatus(); + nodeHealthStatus.setHealthReport(healthChecker.getHealthReport()); + nodeHealthStatus.setIsNodeHealthy(healthChecker.isHealthy()); + nodeHealthStatus.setLastHealthReportTime( + healthChecker.getLastHealthReportTime()); + if (LOG.isDebugEnabled()) { + LOG.debug("Node's health-status : " + nodeHealthStatus.getIsNodeHealthy() + + ", " + nodeHealthStatus.getHealthReport()); + } + nodeStatus.setNodeHealthStatus(nodeHealthStatus); + + List keepAliveAppIds = createKeepAliveApplicationList(); + nodeStatus.setKeepAliveApplications(keepAliveAppIds); + + return nodeStatus; + } + + /* + * It will return current container statuses. If any container has + * COMPLETED then it will be removed from context. + */ + public List getContainerStatuses() { + List containerStatuses = new ArrayList(); for (Iterator> i = this.context.getContainers().entrySet().iterator(); i.hasNext();) { Entry e = i.next(); @@ -339,8 +377,7 @@ public NodeStatus getNodeStatusAndUpdateContainersInContext() { // Clone the container to send it to the RM org.apache.hadoop.yarn.api.records.ContainerStatus containerStatus = container.cloneAndGetContainerStatus(); - containersStatuses.add(containerStatus); - ++numActiveContainers; + containerStatuses.add(containerStatus); if (LOG.isDebugEnabled()) { LOG.debug("Sending out status for container: " + containerStatus); } @@ -356,26 +393,7 @@ public NodeStatus getNodeStatusAndUpdateContainersInContext() { LOG.info("Removed completed container " + containerId); } } - nodeStatus.setContainersStatuses(containersStatuses); - - LOG.debug(this.nodeId + " sending out status for " - + numActiveContainers + " containers"); - - NodeHealthStatus nodeHealthStatus = this.context.getNodeHealthStatus(); - nodeHealthStatus.setHealthReport(healthChecker.getHealthReport()); - nodeHealthStatus.setIsNodeHealthy(healthChecker.isHealthy()); - nodeHealthStatus.setLastHealthReportTime( - healthChecker.getLastHealthReportTime()); - if (LOG.isDebugEnabled()) { - LOG.debug("Node's health-status : " + nodeHealthStatus.getIsNodeHealthy() - + ", " + nodeHealthStatus.getHealthReport()); - } - nodeStatus.setNodeHealthStatus(nodeHealthStatus); - - List keepAliveAppIds = createKeepAliveApplicationList(); - nodeStatus.setKeepAliveApplications(keepAliveAppIds); - - return nodeStatus; + return containerStatuses; } private void trackAppsForKeepAlive(List appIds) { diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManagerImpl.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManagerImpl.java index 3091c4a..fdd2f56 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManagerImpl.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManagerImpl.java @@ -28,6 +28,7 @@ import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.Map.Entry; import java.util.Set; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.locks.ReentrantReadWriteLock; @@ -64,6 +65,7 @@ import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.api.records.ContainerId; import org.apache.hadoop.yarn.api.records.ContainerLaunchContext; +import org.apache.hadoop.yarn.api.records.ContainerState; import org.apache.hadoop.yarn.api.records.ContainerStatus; import org.apache.hadoop.yarn.api.records.NodeId; import org.apache.hadoop.yarn.api.records.SerializedException; @@ -371,17 +373,26 @@ public void cleanupContainersOnNMResync() { this.handle(new CMgrCompletedContainersEvent(containerIds, CMgrCompletedContainersEvent.Reason.ON_NODEMANAGER_RESYNC)); - while (!containers.isEmpty()) { - try { - Thread.sleep(1000); - nodeStatusUpdater.getNodeStatusAndUpdateContainersInContext(); - } catch (InterruptedException ex) { - LOG.warn("Interrupted while sleeping on container kill on resync", ex); + + boolean allConainersCompleted = false; + while (!containers.isEmpty() && !allConainersCompleted) { + allConainersCompleted = true; + for (Entry container : containers.entrySet()) { + if (container.getValue().cloneAndGetContainerStatus().getState() + != ContainerState.COMPLETE) { + allConainersCompleted = false; + try { + Thread.sleep(1000); + } catch (InterruptedException ex) { + LOG.warn("Interrupted while sleeping on container kill on resync", + ex); + } + break; + } } } - // All containers killed - if (containers.isEmpty()) { + if (allConainersCompleted) { LOG.info("All containers in DONE state"); } else { LOG.info("Done waiting for containers to be killed. Still alive: " + diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/launcher/ContainersLauncher.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/launcher/ContainersLauncher.java index ce865e3..337e193 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/launcher/ContainersLauncher.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/launcher/ContainersLauncher.java @@ -24,7 +24,6 @@ import java.util.Map; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; -import java.util.concurrent.Future; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; @@ -32,21 +31,16 @@ import org.apache.hadoop.fs.FileContext; import org.apache.hadoop.fs.UnsupportedFileSystemException; import org.apache.hadoop.service.AbstractService; -import org.apache.hadoop.util.Shell; import org.apache.hadoop.yarn.api.records.ContainerId; import org.apache.hadoop.yarn.event.Dispatcher; import org.apache.hadoop.yarn.event.EventHandler; import org.apache.hadoop.yarn.exceptions.YarnRuntimeException; import org.apache.hadoop.yarn.server.nodemanager.ContainerExecutor; -import org.apache.hadoop.yarn.server.nodemanager.ContainerExecutor.ExitCode; import org.apache.hadoop.yarn.server.nodemanager.Context; import org.apache.hadoop.yarn.server.nodemanager.LocalDirsHandlerService; import org.apache.hadoop.yarn.server.nodemanager.containermanager.ContainerManagerImpl; import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.Application; import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container; -import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerEventType; -import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerExitEvent; -import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerState; import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.ResourceLocalizationService; import com.google.common.annotations.VisibleForTesting; diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestNodeManagerResync.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestNodeManagerResync.java index f2090fa..4454d26 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestNodeManagerResync.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestNodeManagerResync.java @@ -36,6 +36,7 @@ import org.apache.hadoop.yarn.api.protocolrecords.StartContainersRequest; import org.apache.hadoop.yarn.api.records.ContainerId; import org.apache.hadoop.yarn.api.records.ContainerLaunchContext; +import org.apache.hadoop.yarn.api.records.ContainerStatus; import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.event.Dispatcher; import org.apache.hadoop.yarn.exceptions.NMNotYetReadyException; @@ -174,20 +175,22 @@ public TestNodeStatusUpdaterImpl1(Context context, Dispatcher dispatcher, } @Override - protected void registerWithRM() throws YarnException, IOException { - super.registerWithRM(); + protected void registerWithRM(List containers) + throws YarnException, IOException { + super.registerWithRM(containers); registrationCount++; } @Override - protected void rebootNodeStatusUpdater() { + protected void + rebootNodeStatusUpdater(List finishedContainers) { ConcurrentMap containers = getNMContext().getContainers(); try { // ensure that containers are empty before restart nodeStatusUpdater Assert.assertTrue(containers.isEmpty()); - super.rebootNodeStatusUpdater(); + super.rebootNodeStatusUpdater(new ArrayList()); syncBarrier.await(); } catch (InterruptedException e) { } catch (BrokenBarrierException e) { @@ -252,7 +255,8 @@ public TestNodeStatusUpdaterImpl2(Context context, Dispatcher dispatcher, } @Override - protected void rebootNodeStatusUpdater() { + protected void rebootNodeStatusUpdater( + List finishedContainers) { ConcurrentMap containers = getNMContext().getContainers(); @@ -260,7 +264,7 @@ protected void rebootNodeStatusUpdater() { try { // ensure that containers are empty before restart nodeStatusUpdater Assert.assertTrue(containers.isEmpty()); - super.rebootNodeStatusUpdater(); + super.rebootNodeStatusUpdater(new ArrayList()); // After this point new containers are free to be launched, except // containers from previous RM // Wait here so as to sync with the main test thread. diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/launcher/TestContainerLaunch.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/launcher/TestContainerLaunch.java index 2f0565d..4d39677 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/launcher/TestContainerLaunch.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/launcher/TestContainerLaunch.java @@ -21,7 +21,6 @@ import static org.junit.Assert.assertEquals; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.when; -import static org.mockito.Mockito.spy; import java.io.BufferedReader; import java.io.File; @@ -73,7 +72,6 @@ import org.apache.hadoop.yarn.server.nodemanager.ContainerExecutor.ExitCode; import org.apache.hadoop.yarn.server.nodemanager.DefaultContainerExecutor; import org.apache.hadoop.yarn.server.nodemanager.containermanager.BaseContainerManagerTest; -import org.apache.hadoop.yarn.server.nodemanager.containermanager.ContainerManagerImpl; import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container; import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerEventType; import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerExitEvent; diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceTrackerService.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceTrackerService.java index 23f8754..a870a4e 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceTrackerService.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceTrackerService.java @@ -29,6 +29,10 @@ import org.apache.hadoop.security.authorize.PolicyProvider; import org.apache.hadoop.service.AbstractService; import org.apache.hadoop.util.VersionUtil; +import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; +import org.apache.hadoop.yarn.api.records.ContainerId; +import org.apache.hadoop.yarn.api.records.ContainerState; +import org.apache.hadoop.yarn.api.records.ContainerStatus; import org.apache.hadoop.yarn.api.records.NodeId; import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.api.records.ResourceOption; @@ -46,14 +50,17 @@ import org.apache.hadoop.yarn.server.api.records.MasterKey; import org.apache.hadoop.yarn.server.api.records.NodeAction; import org.apache.hadoop.yarn.server.api.records.NodeStatus; +import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp; +import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt; +import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event.RMAppAttemptContainerFinishedEvent; import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode; import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeEvent; import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeEventType; import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeImpl; import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeReconnectEvent; import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeStatusEvent; -import org.apache.hadoop.yarn.server.resourcemanager.security.RMContainerTokenSecretManager; import org.apache.hadoop.yarn.server.resourcemanager.security.NMTokenSecretManagerInRM; +import org.apache.hadoop.yarn.server.resourcemanager.security.RMContainerTokenSecretManager; import org.apache.hadoop.yarn.server.resourcemanager.security.authorize.RMPolicyProvider; import org.apache.hadoop.yarn.server.utils.YarnServerBuilderUtils; import org.apache.hadoop.yarn.util.RackResolver; @@ -183,6 +190,32 @@ public RegisterNodeManagerResponse registerNodeManager( Resource capability = request.getResource(); String nodeManagerVersion = request.getNMVersion(); + if (!request.getContainerStatuses().isEmpty()) { + LOG.info("received container statuses on node manager register :" + + request.getContainerStatuses()); + for (ContainerStatus containerStatus : request.getContainerStatuses()) { + ApplicationAttemptId appAttemptId = + containerStatus.getContainerId().getApplicationAttemptId(); + RMApp rmApp = + rmContext.getRMApps().get(appAttemptId.getApplicationId()); + if (rmApp != null) { + RMAppAttempt rmAppAttempt = rmApp.getRMAppAttempt(appAttemptId); + if (rmAppAttempt.getMasterContainer().getId() + .equals(containerStatus.getContainerId())) { + // sending master container finished event. + RMAppAttemptContainerFinishedEvent evt = + new RMAppAttemptContainerFinishedEvent(appAttemptId, + containerStatus); + rmContext.getDispatcher().getEventHandler().handle(evt); + } + } else { + LOG.error("Received finished container :" + + containerStatus.getContainerId() + + " for non existing application :" + + appAttemptId.getApplicationId()); + } + } + } RegisterNodeManagerResponse response = recordFactory .newRecordInstance(RegisterNodeManagerResponse.class); diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java index e3b083c..cce6a81 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java @@ -24,6 +24,7 @@ import java.util.HashSet; import java.util.LinkedHashMap; import java.util.Map; +import java.util.Map.Entry; import java.util.Set; import java.util.concurrent.locks.ReentrantReadWriteLock; import java.util.concurrent.locks.ReentrantReadWriteLock.ReadLock; @@ -131,7 +132,7 @@ RMAppEventType.NODE_UPDATE, new RMAppNodeUpdateTransition()) .addTransition(RMAppState.NEW, RMAppState.NEW_SAVING, RMAppEventType.START, new RMAppSavingTransition()) - .addTransition(RMAppState.NEW, EnumSet.of(RMAppState.SUBMITTED, + .addTransition(RMAppState.NEW, EnumSet.of(RMAppState.RUNNING, RMAppState.FINISHED, RMAppState.FAILED, RMAppState.KILLED, RMAppState.FINAL_SAVING), RMAppEventType.RECOVER, new RMAppRecoveredTransition()) @@ -664,22 +665,37 @@ public RMAppState transition(RMAppImpl app, RMAppEvent event) { if (app.recoveredFinalState != null) { FINAL_TRANSITION.transition(app, event); return app.recoveredFinalState; - } - // Directly call AttemptFailedTransition, since now we deem that an - // application fails because of RM restart as a normal AM failure. + } else { + // check if any application attempt was running + // if yes then don't start new application attempt. + for (Entry attempt : app.attempts + .entrySet()) { + boolean appAttemptInFinalState = + RMAppAttemptImpl.isAttemptInFinalState(attempt.getValue()); + LOG.info("attempt :" + attempt.getKey().toString() + + " in final state :" + appAttemptInFinalState); + if (!appAttemptInFinalState) { + // One of the application attempt is not in final state. + // Not starting new application attempt. + return RMAppState.RUNNING; + } + } + // Directly call AttemptFailedTransition, since now we deem that an + // application fails because of RM restart as a normal AM failure. - // Do not recover unmanaged applications since current recovery - // mechanism of restarting attempts does not work for them. - // This will need to be changed in work preserving recovery in which - // RM will re-connect with the running AM's instead of restarting them + // Do not recover unmanaged applications since current recovery + // mechanism of restarting attempts does not work for them. + // This will need to be changed in work preserving recovery in which + // RM will re-connect with the running AM's instead of restarting them - // In work-preserve restart, if attemptCount == maxAttempts, the job still - // needs to be recovered because the last attempt may still be running. + // In work-preserve restart, if attemptCount == maxAttempts, the job still + // needs to be recovered because the last attempt may still be running. - // As part of YARN-1210, we may return ACCECPTED state waiting for AM to - // reregister or fail and remove the following code. - return new AttemptFailedTransition(RMAppState.SUBMITTED).transition(app, - event); + // As part of YARN-1210, we may return ACCECPTED state waiting for AM to + // reregister or fail and remove the following code. + return new AttemptFailedTransition(RMAppState.RUNNING).transition(app, + event); + } } } @@ -958,11 +974,14 @@ public AttemptFailedTransition(RMAppState initialState) { @Override public RMAppState transition(RMAppImpl app, RMAppEvent event) { + LOG.info("Application received attempt failed transition "); if (!app.submissionContext.getUnmanagedAM() && app.attempts.size() < app.maxAppAttempts) { + LOG.info("launching new attempt"); app.createNewAttempt(true); return initialState; } else { + LOG.info("failing the application"); app.rememberTargetTransitionsAndStoreState(event, new AttemptFailedFinalStateSavedTransition(), RMAppState.FAILED, RMAppState.FAILED); diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttemptImpl.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttemptImpl.java index 1247bb7..d2b67d9 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttemptImpl.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttemptImpl.java @@ -179,7 +179,7 @@ new UnexpectedAMRegisteredTransition(), RMAppAttemptState.FAILED)) .addTransition( RMAppAttemptState.NEW, EnumSet.of(RMAppAttemptState.FINISHED, RMAppAttemptState.KILLED, - RMAppAttemptState.FAILED, RMAppAttemptState.RECOVERED), + RMAppAttemptState.FAILED, RMAppAttemptState.LAUNCHED), RMAppAttemptEventType.RECOVER, new AttemptRecoveredTransition()) // Transitions from SUBMITTED state @@ -386,25 +386,6 @@ RMAppAttemptEventType.UNREGISTERED, RMAppAttemptEventType.KILL, RMAppAttemptEventType.STATUS_UPDATE)) - - // Transitions from RECOVERED State - .addTransition( - RMAppAttemptState.RECOVERED, - RMAppAttemptState.RECOVERED, - EnumSet.of(RMAppAttemptEventType.START, - RMAppAttemptEventType.APP_ACCEPTED, - RMAppAttemptEventType.APP_REJECTED, - RMAppAttemptEventType.EXPIRE, - RMAppAttemptEventType.LAUNCHED, - RMAppAttemptEventType.LAUNCH_FAILED, - RMAppAttemptEventType.REGISTERED, - RMAppAttemptEventType.CONTAINER_ALLOCATED, - RMAppAttemptEventType.CONTAINER_ACQUIRED, - RMAppAttemptEventType.ATTEMPT_NEW_SAVED, - RMAppAttemptEventType.CONTAINER_FINISHED, - RMAppAttemptEventType.UNREGISTERED, - RMAppAttemptEventType.KILL, - RMAppAttemptEventType.STATUS_UPDATE)) .installTopology(); public RMAppAttemptImpl(ApplicationAttemptId appAttemptId, @@ -865,11 +846,14 @@ public void transition(RMAppAttemptImpl appAttempt, @Override public RMAppAttemptState transition(RMAppAttemptImpl appAttempt, RMAppAttemptEvent event) { + LOG.info("Recovering attempt : recoverdFinalState :" + + appAttempt.recoveredFinalState); if (appAttempt.recoveredFinalState != null) { appAttempt.progress = 1.0f; return appAttempt.recoveredFinalState; } else { - return RMAppAttemptState.RECOVERED; + (new AMLaunchedTransition()).transition(appAttempt, event); + return RMAppAttemptState.LAUNCHED; } } } @@ -1552,4 +1536,11 @@ private void removeCredentials(RMAppAttemptImpl appAttempt) { private static String sanitizeTrackingUrl(String url) { return (url == null || url.trim().isEmpty()) ? "N/A" : url; } + + public static boolean isAttemptInFinalState(RMAppAttempt attempt) { + return attempt.getAppAttemptState() == RMAppAttemptState.FAILED || + attempt.getAppAttemptState() == RMAppAttemptState.FINISHED || + attempt.getAppAttemptState() == RMAppAttemptState.KILLED; + } + } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/TestRMAppAttemptTransitions.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/TestRMAppAttemptTransitions.java index b9fc15f..d391d95 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/TestRMAppAttemptTransitions.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/TestRMAppAttemptTransitions.java @@ -372,10 +372,10 @@ private void testAppAttemptKilledState(Container amContainer, } /** - * {@link RMAppAttemptState#RECOVERED} + * {@link RMAppAttemptState#LAUNCHED} */ private void testAppAttemptRecoveredState() { - assertEquals(RMAppAttemptState.RECOVERED, + assertEquals(RMAppAttemptState.LAUNCHED, applicationAttempt.getAppAttemptState()); }