diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/ContainerState.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/ContainerState.java index 4efd8c1..2d83cfd 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/ContainerState.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/ContainerState.java @@ -32,10 +32,13 @@ /** Running container */ RUNNING, - + /** Completed container */ COMPLETE, /** Scheduled (awaiting resources) at the NM. */ - SCHEDULED + SCHEDULED, + + /** Paused at the NM. */ + PAUSED } \ No newline at end of file diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_protos.proto b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_protos.proto index cb37126..d7599ae 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_protos.proto +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_protos.proto @@ -83,6 +83,7 @@ enum ContainerStateProto { C_RUNNING = 2; C_COMPLETE = 3; C_SCHEDULED = 4; + C_PAUSED = 5; } message ContainerProto { diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/ContainerExecutor.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/ContainerExecutor.java index f880506..b4c667e 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/ContainerExecutor.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/ContainerExecutor.java @@ -684,6 +684,28 @@ public void deactivateContainer(ContainerId containerId) { } /** + * Pause the container. The default implementation is to raise a kill event. + * Specific executor implementations can override this behavior. + * @param container + * the Container + */ + public void pauseContainer(Container container) { + LOG.info(container.getContainerId() + " doesn't support pausing."); + throw new UnsupportedOperationException(); + } + + /** + * Resume the container from pause state. The default implementation ignores + * this event. Specific implementations can override this behavior. + * @param container + * the Container + */ + public void resumeContainer(Container container) { + LOG.info(container.getContainerId() + " doesn't support resume."); + throw new UnsupportedOperationException(); + } + + /** * Get the process-identifier for the container. * * @param containerID the container ID diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/ContainerEventType.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/ContainerEventType.java index afea0e6..1475435 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/ContainerEventType.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/ContainerEventType.java @@ -27,6 +27,8 @@ CONTAINER_DONE, REINITIALIZE_CONTAINER, ROLLBACK_REINIT, + PAUSE_CONTAINER, + RESUME_CONTAINER, // DownloadManager CONTAINER_INITED, @@ -38,5 +40,7 @@ CONTAINER_LAUNCHED, CONTAINER_EXITED_WITH_SUCCESS, CONTAINER_EXITED_WITH_FAILURE, - CONTAINER_KILLED_ON_REQUEST + CONTAINER_KILLED_ON_REQUEST, + CONTAINER_PAUSED, + CONTAINER_RESUMED } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/ContainerImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/ContainerImpl.java index 4a6be32..16918f9 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/ContainerImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/ContainerImpl.java @@ -298,6 +298,8 @@ public ContainerImpl(Configuration conf, Dispatcher dispatcher, UPDATE_DIAGNOSTICS_TRANSITION) .addTransition(ContainerState.NEW, ContainerState.DONE, ContainerEventType.KILL_CONTAINER, new KillOnNewTransition()) + .addTransition(ContainerState.NEW, ContainerState.DONE, + ContainerEventType.PAUSE_CONTAINER, new KillOnPauseTransition()) // From LOCALIZING State .addTransition(ContainerState.LOCALIZING, @@ -313,6 +315,8 @@ public ContainerImpl(Configuration conf, Dispatcher dispatcher, .addTransition(ContainerState.LOCALIZING, ContainerState.KILLING, ContainerEventType.KILL_CONTAINER, new KillBeforeRunningTransition()) + .addTransition(ContainerState.LOCALIZING, ContainerState.KILLING, + ContainerEventType.PAUSE_CONTAINER, new KillOnPauseTransition()) // From LOCALIZATION_FAILED State .addTransition(ContainerState.LOCALIZATION_FAILED, @@ -326,7 +330,8 @@ public ContainerImpl(Configuration conf, Dispatcher dispatcher, // container not launched so kill is a no-op .addTransition(ContainerState.LOCALIZATION_FAILED, ContainerState.LOCALIZATION_FAILED, - ContainerEventType.KILL_CONTAINER) + EnumSet.of(ContainerEventType.KILL_CONTAINER, + ContainerEventType.PAUSE_CONTAINER)) // container cleanup triggers a release of all resources // regardless of whether they were localized or not // LocalizedResource handles release event in all states @@ -382,6 +387,76 @@ public ContainerImpl(Configuration conf, Dispatcher dispatcher, ContainerState.EXITED_WITH_FAILURE, ContainerEventType.CONTAINER_KILLED_ON_REQUEST, new KilledExternallyTransition()) + .addTransition(ContainerState.RUNNING, ContainerState.PAUSING, + ContainerEventType.PAUSE_CONTAINER, new PauseContainerTransition()) + + // From PAUSING State + .addTransition(ContainerState.PAUSING, ContainerState.KILLING, + ContainerEventType.KILL_CONTAINER, new KillTransition()) + .addTransition(ContainerState.PAUSING, ContainerState.PAUSING, + ContainerEventType.UPDATE_DIAGNOSTICS_MSG, + UPDATE_DIAGNOSTICS_TRANSITION) + .addTransition(ContainerState.PAUSING, ContainerState.PAUSED, + ContainerEventType.CONTAINER_PAUSED, new PausedContainerTransition()) + // In case something goes wrong then container will exit from the + // PAUSING state + .addTransition(ContainerState.PAUSING, + ContainerState.EXITED_WITH_SUCCESS, + ContainerEventType.CONTAINER_EXITED_WITH_SUCCESS) + .addTransition(ContainerState.PAUSING, + ContainerState.EXITED_WITH_FAILURE, + ContainerEventType.CONTAINER_EXITED_WITH_FAILURE, + new ExitedWithFailureTransition(true)) + .addTransition(ContainerState.PAUSING, ContainerState.EXITED_WITH_FAILURE, + ContainerEventType.CONTAINER_KILLED_ON_REQUEST, + new KilledExternallyTransition()) + + // From PAUSED State + .addTransition(ContainerState.PAUSED, ContainerState.KILLING, + ContainerEventType.KILL_CONTAINER, new KillTransition()) + .addTransition(ContainerState.PAUSED, ContainerState.PAUSED, + ContainerEventType.UPDATE_DIAGNOSTICS_MSG, + UPDATE_DIAGNOSTICS_TRANSITION) + .addTransition(ContainerState.PAUSED, ContainerState.PAUSED, + ContainerEventType.PAUSE_CONTAINER) + .addTransition(ContainerState.PAUSED, ContainerState.RESUMING, + ContainerEventType.RESUME_CONTAINER, new ResumeContainerTransition()) + // In case something goes wrong then container will exit from the + // PAUSED state + .addTransition(ContainerState.PAUSED, + ContainerState.EXITED_WITH_FAILURE, + ContainerEventType.CONTAINER_EXITED_WITH_FAILURE, + new ExitedWithFailureTransition(true)) + .addTransition(ContainerState.PAUSED, ContainerState.EXITED_WITH_FAILURE, + ContainerEventType.CONTAINER_KILLED_ON_REQUEST, + new KilledExternallyTransition()) + .addTransition(ContainerState.PAUSED, + ContainerState.EXITED_WITH_SUCCESS, + ContainerEventType.CONTAINER_EXITED_WITH_SUCCESS, + new ExitedWithSuccessTransition(true)) + + // From RESUMING State + .addTransition(ContainerState.RESUMING, ContainerState.KILLING, + ContainerEventType.KILL_CONTAINER, new KillTransition()) + .addTransition(ContainerState.RESUMING, ContainerState.RUNNING, + ContainerEventType.CONTAINER_RESUMED) + .addTransition(ContainerState.RESUMING, ContainerState.RESUMING, + ContainerEventType.UPDATE_DIAGNOSTICS_MSG, + UPDATE_DIAGNOSTICS_TRANSITION) + // In case something goes wrong then container will exit from the + // RESUMING state + .addTransition(ContainerState.RESUMING, + ContainerState.EXITED_WITH_FAILURE, + ContainerEventType.CONTAINER_EXITED_WITH_FAILURE, + new ExitedWithFailureTransition(true)) + .addTransition(ContainerState.RESUMING, + ContainerState.EXITED_WITH_FAILURE, + ContainerEventType.CONTAINER_KILLED_ON_REQUEST, + new KilledExternallyTransition()) + .addTransition(ContainerState.RESUMING, + ContainerState.EXITED_WITH_SUCCESS, + ContainerEventType.CONTAINER_EXITED_WITH_SUCCESS, + new ExitedWithSuccessTransition(true)) // From REINITIALIZING State .addTransition(ContainerState.REINITIALIZING, @@ -405,6 +480,8 @@ public ContainerImpl(Configuration conf, Dispatcher dispatcher, UPDATE_DIAGNOSTICS_TRANSITION) .addTransition(ContainerState.REINITIALIZING, ContainerState.KILLING, ContainerEventType.KILL_CONTAINER, new KillTransition()) + .addTransition(ContainerState.REINITIALIZING, ContainerState.KILLING, + ContainerEventType.PAUSE_CONTAINER, new KillOnPauseTransition()) .addTransition(ContainerState.REINITIALIZING, ContainerState.SCHEDULED, ContainerEventType.CONTAINER_KILLED_ON_REQUEST, @@ -422,6 +499,8 @@ public ContainerImpl(Configuration conf, Dispatcher dispatcher, UPDATE_DIAGNOSTICS_TRANSITION) .addTransition(ContainerState.RELAUNCHING, ContainerState.KILLING, ContainerEventType.KILL_CONTAINER, new KillTransition()) + .addTransition(ContainerState.RELAUNCHING, ContainerState.KILLING, + ContainerEventType.PAUSE_CONTAINER, new KillOnPauseTransition()) // From CONTAINER_EXITED_WITH_SUCCESS State .addTransition(ContainerState.EXITED_WITH_SUCCESS, ContainerState.DONE, @@ -433,7 +512,8 @@ public ContainerImpl(Configuration conf, Dispatcher dispatcher, UPDATE_DIAGNOSTICS_TRANSITION) .addTransition(ContainerState.EXITED_WITH_SUCCESS, ContainerState.EXITED_WITH_SUCCESS, - ContainerEventType.KILL_CONTAINER) + EnumSet.of(ContainerEventType.KILL_CONTAINER, + ContainerEventType.PAUSE_CONTAINER)) // From EXITED_WITH_FAILURE State .addTransition(ContainerState.EXITED_WITH_FAILURE, ContainerState.DONE, @@ -445,7 +525,8 @@ public ContainerImpl(Configuration conf, Dispatcher dispatcher, UPDATE_DIAGNOSTICS_TRANSITION) .addTransition(ContainerState.EXITED_WITH_FAILURE, ContainerState.EXITED_WITH_FAILURE, - ContainerEventType.KILL_CONTAINER) + EnumSet.of(ContainerEventType.KILL_CONTAINER, + ContainerEventType.PAUSE_CONTAINER)) // From KILLING State. .addTransition(ContainerState.KILLING, @@ -480,6 +561,9 @@ public ContainerImpl(Configuration conf, Dispatcher dispatcher, .addTransition(ContainerState.KILLING, ContainerState.KILLING, ContainerEventType.CONTAINER_LAUNCHED) + .addTransition(ContainerState.KILLING, + ContainerState.KILLING, + ContainerEventType.PAUSE_CONTAINER) // From CONTAINER_CLEANEDUP_AFTER_KILL State. .addTransition(ContainerState.CONTAINER_CLEANEDUP_AFTER_KILL, @@ -495,11 +579,13 @@ public ContainerImpl(Configuration conf, Dispatcher dispatcher, EnumSet.of(ContainerEventType.KILL_CONTAINER, ContainerEventType.RESOURCE_FAILED, ContainerEventType.CONTAINER_EXITED_WITH_SUCCESS, - ContainerEventType.CONTAINER_EXITED_WITH_FAILURE)) + ContainerEventType.CONTAINER_EXITED_WITH_FAILURE, + ContainerEventType.PAUSE_CONTAINER)) // From DONE .addTransition(ContainerState.DONE, ContainerState.DONE, - ContainerEventType.KILL_CONTAINER) + EnumSet.of(ContainerEventType.KILL_CONTAINER, + ContainerEventType.PAUSE_CONTAINER)) .addTransition(ContainerState.DONE, ContainerState.DONE, ContainerEventType.INIT_CONTAINER) .addTransition(ContainerState.DONE, ContainerState.DONE, @@ -534,7 +620,11 @@ public ContainerImpl(Configuration conf, Dispatcher dispatcher, case KILLING: case CONTAINER_CLEANEDUP_AFTER_KILL: case CONTAINER_RESOURCES_CLEANINGUP: + case PAUSING: return org.apache.hadoop.yarn.api.records.ContainerState.RUNNING; + case PAUSED: + case RESUMING: + return org.apache.hadoop.yarn.api.records.ContainerState.PAUSED; case DONE: default: return org.apache.hadoop.yarn.api.records.ContainerState.COMPLETE; @@ -1473,6 +1563,26 @@ public void transition(ContainerImpl container, ContainerEvent event) { } /** + * Transitions upon receiving PAUSE_CONTAINER. + * - LOCALIZED -> KILLING. + * - REINITIALIZING -> KILLING. + */ + @SuppressWarnings("unchecked") // dispatcher not typed + static class KillOnPauseTransition implements + SingleArcTransition { + + @SuppressWarnings("unchecked") + @Override + public void transition(ContainerImpl container, ContainerEvent event) { + // Kill the process/process-grp + container.setIsReInitializing(false); + container.dispatcher.getEventHandler().handle( + new ContainersLauncherEvent(container, + ContainersLauncherEventType.CLEANUP_CONTAINER)); + } + } + + /** * Transition from KILLING to CONTAINER_CLEANEDUP_AFTER_KILL * upon receiving CONTAINER_KILLED_ON_REQUEST. */ @@ -1661,6 +1771,57 @@ public void transition(ContainerImpl container, ContainerEvent event) { } } + /** + * Transitions upon receiving PAUSE_CONTAINER. + * - RUNNING -> PAUSED + */ + @SuppressWarnings("unchecked") // dispatcher not typed + static class PauseContainerTransition implements + SingleArcTransition { + @Override + public void transition(ContainerImpl container, ContainerEvent event) { + // Pause the process/process-grp if it is supported by the container + container.dispatcher.getEventHandler().handle( + new ContainersLauncherEvent(container, + ContainersLauncherEventType.PAUSE_CONTAINER)); + ContainerPauseEvent pauseEvent = (ContainerPauseEvent) event; + container.addDiagnostics(pauseEvent.getDiagnostic(), "\n"); + } + } + + /** + * Transitions upon receiving PAUSED_CONTAINER. + */ + @SuppressWarnings("unchecked") // dispatcher not typed + static class PausedContainerTransition implements + SingleArcTransition { + @Override + public void transition(ContainerImpl container, ContainerEvent event) { + // Container was PAUSED so tell the scheduler + container.dispatcher.getEventHandler().handle( + new ContainerSchedulerEvent(container, + ContainerSchedulerEventType.CONTAINER_PAUSED)); + } + } + + /** + * Transitions upon receiving RESUME_CONTAINER. + * - PAUSED -> RUNNING + */ + @SuppressWarnings("unchecked") // dispatcher not typed + static class ResumeContainerTransition implements + SingleArcTransition { + @Override + public void transition(ContainerImpl container, ContainerEvent event) { + // Pause the process/process-grp if it is supported by the container + container.dispatcher.getEventHandler().handle( + new ContainersLauncherEvent(container, + ContainersLauncherEventType.RESUME_CONTAINER)); + ContainerResumeEvent resumeEvent = (ContainerResumeEvent) event; + container.addDiagnostics(resumeEvent.getDiagnostic(), "\n"); + } + } + @Override public void handle(ContainerEvent event) { try { diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/ContainerPauseEvent.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/ContainerPauseEvent.java new file mode 100644 index 0000000..898304e --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/ContainerPauseEvent.java @@ -0,0 +1,40 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.server.nodemanager.containermanager.container; + +import org.apache.hadoop.yarn.api.records.ContainerId; + +/** + * ContainerEvent for ContainerEventType.PAUSE_CONTAINER. + */ +public class ContainerPauseEvent extends ContainerEvent { + + private final String diagnostic; + + public ContainerPauseEvent(ContainerId cId, + String diagnostic) { + super(cId, ContainerEventType.PAUSE_CONTAINER); + this.diagnostic = diagnostic; + } + + public String getDiagnostic() { + return this.diagnostic; + } +} + diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/ContainerResumeEvent.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/ContainerResumeEvent.java new file mode 100644 index 0000000..d7c9e9a --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/ContainerResumeEvent.java @@ -0,0 +1,39 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.server.nodemanager.containermanager.container; + +import org.apache.hadoop.yarn.api.records.ContainerId; + +/** + * ContainerEvent for ContainerEventType.RESUME_CONTAINER. + */ +public class ContainerResumeEvent extends ContainerEvent { + + private final String diagnostic; + + public ContainerResumeEvent(ContainerId cId, + String diagnostic) { + super(cId, ContainerEventType.RESUME_CONTAINER); + this.diagnostic = diagnostic; + } + + public String getDiagnostic() { + return this.diagnostic; + } +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/ContainerState.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/ContainerState.java index 91d1356..7c3fea8 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/ContainerState.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/ContainerState.java @@ -21,5 +21,6 @@ public enum ContainerState { NEW, LOCALIZING, LOCALIZATION_FAILED, SCHEDULED, RUNNING, RELAUNCHING, REINITIALIZING, EXITED_WITH_SUCCESS, EXITED_WITH_FAILURE, KILLING, - CONTAINER_CLEANEDUP_AFTER_KILL, CONTAINER_RESOURCES_CLEANINGUP, DONE + CONTAINER_CLEANEDUP_AFTER_KILL, CONTAINER_RESOURCES_CLEANINGUP, DONE, + PAUSING, PAUSED, RESUMING } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/launcher/ContainerLaunch.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/launcher/ContainerLaunch.java index 823457f..2516e11 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/launcher/ContainerLaunch.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/launcher/ContainerLaunch.java @@ -74,6 +74,7 @@ import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerEvent; import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerEventType; import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerExitEvent; +import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerKillEvent; import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerState; import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.ContainerLocalizer; import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.ResourceLocalizationService; @@ -84,6 +85,7 @@ import org.apache.hadoop.yarn.util.AuxiliaryServiceHelper; import com.google.common.annotations.VisibleForTesting; +import org.apache.hadoop.yarn.util.ConverterUtils; public class ContainerLaunch implements Callable { @@ -103,8 +105,10 @@ private final Configuration conf; private final Context context; private final ContainerManagerImpl containerManager; - + protected AtomicBoolean containerAlreadyLaunched = new AtomicBoolean(false); + protected AtomicBoolean shouldPauseContainer = new AtomicBoolean(false); + protected AtomicBoolean completed = new AtomicBoolean(false); private volatile boolean killedBeforeStart = false; @@ -746,6 +750,90 @@ public static Signal translateCommandToSignal( } /** + * Pause the container. + * Cancels the launch if the container isn't launched yet. Otherwise asks the + * executor to pause the container. + * @throws IOException in case of errors. + */ + @SuppressWarnings("unchecked") // dispatcher not typed + public void pauseContainer() throws IOException { + ContainerId containerId = container.getContainerId(); + String containerIdStr = containerId.toString(); + LOG.info("Pausing the container " + containerIdStr); + + // The pause event is only handled if the container is in the running state + // (the container state machine), so we don't check for + // shouldLaunchContainer over here + + if (!shouldPauseContainer.compareAndSet(false, true)) { + LOG.info("Container " + containerId + " not paused as " + + "resume already called"); + return; + } + + try { + // Pause the container + exec.pauseContainer(container); + + // PauseContainer is a blocking call. We are here almost means the + // container is paused, so send out the event. + dispatcher.getEventHandler().handle(new ContainerEvent( + containerId, + ContainerEventType.CONTAINER_PAUSED)); + } catch (Exception e) { + String message = + "Exception when trying to pause container " + containerIdStr + + ": " + StringUtils.stringifyException(e); + LOG.info(message); + container.handle(new ContainerKillEvent(container.getContainerId(), + ContainerExitStatus.PREEMPTED, "Container preempted as there was " + + " an exception in pausing it.")); + } + } + + /** + * Resume the container. + * Cancels the launch if the container isn't launched yet. Otherwise asks the + * executor to pause the container. + * @throws IOException in case of error. + */ + @SuppressWarnings("unchecked") // dispatcher not typed + public void resumeContainer() throws IOException { + ContainerId containerId = container.getContainerId(); + String containerIdStr = containerId.toString(); + LOG.info("Resuming the container " + containerIdStr); + + // The resume event is only handled if the container is in a paused state + // so we don't check for the launched flag here. + + // paused flag will be set to true if process already paused + boolean alreadyPaused = !shouldPauseContainer.compareAndSet(false, true); + if (!alreadyPaused) { + LOG.info("Container " + containerIdStr + " not paused." + + " No resume necessary"); + return; + } + + // If the container has already started + try { + exec.resumeContainer(container); + // ResumeContainer is a blocking call. We are here almost means the + // container is resumed, so send out the event. + dispatcher.getEventHandler().handle(new ContainerEvent( + containerId, + ContainerEventType.CONTAINER_RESUMED)); + } catch (Exception e) { + String message = + "Exception when trying to resume container " + containerIdStr + + ": " + StringUtils.stringifyException(e); + LOG.info(message); + container.handle(new ContainerKillEvent(container.getContainerId(), + ContainerExitStatus.PREEMPTED, "Container preempted as there was " + + " an exception in pausing it.")); + } + } + + /** * Loop through for a time-bounded interval waiting to * read the process id from a file generated by a running process. * @param pidFilePath File from which to read the process id diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/launcher/ContainersLauncher.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/launcher/ContainersLauncher.java index d4a7bfd..eb6eaf5 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/launcher/ContainersLauncher.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/launcher/ContainersLauncher.java @@ -30,6 +30,7 @@ import org.apache.hadoop.fs.FileContext; import org.apache.hadoop.fs.UnsupportedFileSystemException; import org.apache.hadoop.service.AbstractService; +import org.apache.hadoop.util.StringUtils; import org.apache.hadoop.util.concurrent.HadoopExecutors; import org.apache.hadoop.yarn.api.records.ContainerId; import org.apache.hadoop.yarn.event.Dispatcher; @@ -41,6 +42,7 @@ import org.apache.hadoop.yarn.server.nodemanager.containermanager.ContainerManagerImpl; import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.Application; import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container; +import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container; import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.ResourceLocalizationService; import com.google.common.annotations.VisibleForTesting; @@ -170,6 +172,36 @@ public void handle(ContainersLauncherEvent event) { + " with command " + signalEvent.getCommand()); } break; + case PAUSE_CONTAINER: + ContainerLaunch launchedContainer = running.get(containerId); + if (launchedContainer == null) { + // Container not launched. So nothing needs to be done. + return; + } + + // Pause the container + try { + launchedContainer.pauseContainer(); + } catch (Exception e) { + LOG.info("Got exception while pausing container: " + + StringUtils.stringifyException(e)); + } + break; + case RESUME_CONTAINER: + ContainerLaunch launchCont = running.get(containerId); + if (launchCont == null) { + // Container not launched. So nothing needs to be done. + return; + } + + // Resume the container. + try { + launchCont.resumeContainer(); + } catch (Exception e) { + LOG.info("Got exception while resuming container: " + + StringUtils.stringifyException(e)); + } + break; } } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/launcher/ContainersLauncherEventType.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/launcher/ContainersLauncherEventType.java index 380a032..1054e06 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/launcher/ContainersLauncherEventType.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/launcher/ContainersLauncherEventType.java @@ -25,4 +25,7 @@ CLEANUP_CONTAINER, // The process(grp) itself. CLEANUP_CONTAINER_FOR_REINIT, // The process(grp) itself. SIGNAL_CONTAINER, + PAUSE_CONTAINER, + RESUME_CONTAINER + } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/scheduler/ContainerSchedulerEventType.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/scheduler/ContainerSchedulerEventType.java index 086cb9b..9ff731f 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/scheduler/ContainerSchedulerEventType.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/scheduler/ContainerSchedulerEventType.java @@ -26,4 +26,5 @@ CONTAINER_COMPLETED, // Producer: Node HB response - RM has asked to shed the queue SHED_QUEUED_CONTAINERS, + CONTAINER_PAUSED } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/TestContainer.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/TestContainer.java index 33f4609..8909088 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/TestContainer.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/TestContainer.java @@ -103,6 +103,7 @@ import org.junit.Assert; import org.junit.Test; import org.mockito.ArgumentMatcher; +import org.mockito.Mockito; public class TestContainer { @@ -207,6 +208,42 @@ public void testExternalKill() throws Exception { @Test @SuppressWarnings("unchecked") // mocked generic + public void testContainerPauseAndResume() throws Exception { + WrappedContainer wc = null; + try { + wc = new WrappedContainer(13, 314159265358979L, 4344, "yak"); + wc.initContainer(); + wc.localizeResources(); + int running = metrics.getRunningContainers(); + wc.launchContainer(); + assertEquals(running + 1, metrics.getRunningContainers()); + reset(wc.localizerBus); + wc.pauseContainer(); + assertEquals(ContainerState.PAUSED, + wc.c.getContainerState()); + wc.resumeContainer(); + assertEquals(ContainerState.RUNNING, + wc.c.getContainerState()); + wc.containerKilledOnRequest(); + assertEquals(ContainerState.EXITED_WITH_FAILURE, + wc.c.getContainerState()); + assertNull(wc.c.getLocalizedResources()); + verifyCleanupCall(wc); + int failed = metrics.getFailedContainers(); + wc.containerResourcesCleanup(); + assertEquals(ContainerState.DONE, wc.c.getContainerState()); + assertEquals(failed + 1, metrics.getFailedContainers()); + assertEquals(running, metrics.getRunningContainers()); + } + finally { + if (wc != null) { + wc.finished(); + } + } + } + + @Test + @SuppressWarnings("unchecked") // mocked generic public void testCleanupOnFailure() throws Exception { WrappedContainer wc = null; try { @@ -955,6 +992,8 @@ protected void scheduleContainer(Container container) { NodeStatusUpdater nodeStatusUpdater = mock(NodeStatusUpdater.class); when(context.getNodeStatusUpdater()).thenReturn(nodeStatusUpdater); ContainerExecutor executor = mock(ContainerExecutor.class); + Mockito.doNothing().when(executor).pauseContainer(any(Container.class)); + Mockito.doNothing().when(executor).resumeContainer(any(Container.class)); launcher = new ContainersLauncher(context, dispatcher, executor, null, null); // create a mock ExecutorService, which will not really launch @@ -1143,6 +1182,18 @@ public void killContainer() { drainDispatcherEvents(); } + public void pauseContainer() { + c.handle(new ContainerPauseEvent(cId, + "PauseRequest")); + drainDispatcherEvents(); + } + + public void resumeContainer() { + c.handle(new ContainerResumeEvent(cId, + "ResumeRequest")); + drainDispatcherEvents(); + } + public void containerKilledOnRequest() { int exitCode = ContainerExitStatus.KILLED_BY_RESOURCEMANAGER; String diagnosticMsg = "Container completed with exit code " + exitCode; diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeImpl.java index ab13000..16fa2ca 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeImpl.java @@ -1395,7 +1395,8 @@ private void handleContainerStatus(List containerStatuses) { // Process running containers if (remoteContainer.getState() == ContainerState.RUNNING || - remoteContainer.getState() == ContainerState.SCHEDULED) { + remoteContainer.getState() == ContainerState.SCHEDULED || + remoteContainer.getState() == ContainerState.PAUSED) { ++numRemoteRunningContainers; if (!launchedContainers.contains(containerId)) { // Just launched container. RM knows about it the first time.