diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManagerImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManagerImpl.java index 9f1655f..740ff29 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManagerImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManagerImpl.java @@ -402,8 +402,9 @@ private void recoverContainer(RecoveredContainerState rcs) LOG.info("Recovering " + containerId + " in state " + rcs.getStatus() + " with exit code " + rcs.getExitCode()); - if (context.getApplications().containsKey(appId)) { - recoverActiveContainer(launchContext, token, rcs); + Application app = context.getApplications().get(appId); + if (app != null) { + recoverActiveContainer(app, launchContext, token, rcs); if (rcs.getRecoveryType() == RecoveredContainerType.KILL) { dispatcher.getEventHandler().handle( new ContainerKillEvent(containerId, ContainerExitStatus.ABORTED, @@ -423,7 +424,7 @@ private void recoverContainer(RecoveredContainerState rcs) * Recover a running container. */ @SuppressWarnings("unchecked") - protected void recoverActiveContainer( + protected void recoverActiveContainer(Application app, ContainerLaunchContext launchContext, ContainerTokenIdentifier token, RecoveredContainerState rcs) throws IOException { Credentials credentials = YarnServerSecurityUtils.parseCredentials( @@ -431,8 +432,7 @@ protected void recoverActiveContainer( Container container = new ContainerImpl(getConfig(), dispatcher, launchContext, credentials, metrics, token, context, rcs); context.getContainers().put(token.getContainerID(), container); - dispatcher.getEventHandler().handle(new ApplicationContainerInitEvent( - container)); + app.handle(new ApplicationContainerInitEvent(container)); } private void waitForRecoveredContainers() throws InterruptedException { @@ -1286,6 +1286,10 @@ protected void stopContainerInternal(ContainerId containerID) + " is not handled by this NodeManager"); } } else { + if (container.isRecovering()) { + throw RPCUtil.getRemoteException("Container " + containerIDStr + + " is recovering, try later"); + } context.getNMStateStore().storeContainerKilled(containerID); container.sendKillEvent(ContainerExitStatus.KILLED_BY_APPMASTER, "Container killed by the ApplicationMaster."); @@ -1455,6 +1459,21 @@ public void handle(ContainerManagerEvent event) { + " FINISH_APPS event"); continue; } + + boolean shouldDropEvent = false; + for (Container container : app.getContainers().values()) { + if (container.isRecovering()) { + LOG.info("drop FINISH_APPS event to " + appID + ",because " + + "container " + container.getContainerId() + + " is recovering"); + shouldDropEvent = true; + break; + } + } + if (shouldDropEvent) { + continue; + } + String diagnostic = ""; if (appsFinishedEvent.getReason() == CMgrCompletedAppsEvent.Reason.ON_SHUTDOWN) { diagnostic = "Application killed on shutdown"; @@ -1469,10 +1488,31 @@ public void handle(ContainerManagerEvent event) { case FINISH_CONTAINERS: CMgrCompletedContainersEvent containersFinishedEvent = (CMgrCompletedContainersEvent) event; - for (ContainerId container : containersFinishedEvent + for (ContainerId containerId : containersFinishedEvent .getContainersToCleanup()) { - this.dispatcher.getEventHandler().handle( - new ContainerKillEvent(container, + ApplicationId appId = containerId.getApplicationAttemptId().getApplicationId(); + Application app = this.context.getApplications().get(appId); + if (app == null) { + LOG.warn("couldn't find app " + appId + " while processing" + + " FINISH_CONTAINERS event"); + continue; + } + + Container container = app.getContainers().get(containerId); + if (container == null) { + LOG.warn("couldn't find container " + containerId + + " while processing FINISH_CONTAINERS event"); + continue; + } + + if (container.isRecovering()) { + LOG.info("drop FINISH_CONTAINERS event to " + containerId + + " because container is recovering"); + continue; + } + + this.dispatcher.getEventHandler().handle( + new ContainerKillEvent(containerId, ContainerExitStatus.KILLED_BY_RESOURCEMANAGER, "Container Killed by ResourceManager")); } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/application/ApplicationImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/application/ApplicationImpl.java index 112b43a..444581c 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/application/ApplicationImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/application/ApplicationImpl.java @@ -20,8 +20,8 @@ import java.io.IOException; import java.util.EnumSet; -import java.util.HashMap; import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.locks.ReentrantReadWriteLock; import java.util.concurrent.locks.ReentrantReadWriteLock.ReadLock; import java.util.concurrent.locks.ReentrantReadWriteLock.WriteLock; @@ -89,7 +89,7 @@ private LogAggregationContext logAggregationContext; Map containers = - new HashMap(); + new ConcurrentHashMap<>(); /** * The timestamp when the log aggregation has started for this application. diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/Container.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/Container.java index 8004f33..bd3f06d 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/Container.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/Container.java @@ -92,4 +92,6 @@ void sendLaunchEvent(); void sendKillEvent(int exitStatus, String description); + + boolean isRecovering(); } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/ContainerImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/ContainerImpl.java index cae30cd..0583c48 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/ContainerImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/ContainerImpl.java @@ -1756,4 +1756,11 @@ public boolean canRollback() { public void commitUpgrade() { this.reInitContext = null; } + + @Override + public boolean isRecovering() { + boolean isRecovering = (getContainerState() == ContainerState.NEW && + recoveredStatus != RecoveredContainerStatus.REQUESTED); + return isRecovering; + } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/webapp/MockContainer.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/webapp/MockContainer.java index 686a0d9..022baea 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/webapp/MockContainer.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/webapp/MockContainer.java @@ -230,4 +230,9 @@ public void sendLaunchEvent() { public void sendKillEvent(int exitStatus, String description) { } + + @Override + public boolean isRecovering() { + return false; + } }