diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/DefaultContainerExecutor.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/DefaultContainerExecutor.java index 9a0549d..59b69ac 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/DefaultContainerExecutor.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/DefaultContainerExecutor.java @@ -89,7 +89,7 @@ public DefaultContainerExecutor() { } protected void copyFile(Path src, Path dst, String owner) throws IOException { - lfs.util().copy(src, dst); + lfs.util().copy(src, dst, false, true); } protected void setScriptExecutable(Path script, String owner) throws IOException { diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManagerImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManagerImpl.java index 52d8566..160c467 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManagerImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManagerImpl.java @@ -110,6 +110,8 @@ import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerEventType; import 
org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerImpl; import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerKillEvent; + +import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerReInitEvent; import org.apache.hadoop.yarn.server.nodemanager.containermanager.launcher.ContainersLauncher; import org.apache.hadoop.yarn.server.nodemanager.containermanager.launcher.ContainersLauncherEventType; import org.apache.hadoop.yarn.server.nodemanager.containermanager.launcher.SignalContainersLauncherEvent; @@ -1535,10 +1537,13 @@ public ResourceLocalizationResponse localize( } if (!container.getContainerState() .equals(org.apache.hadoop.yarn.server.nodemanager. - containermanager.container.ContainerState.RUNNING)) { + containermanager.container.ContainerState.RUNNING) + || container.isReInitializing()) { throw new YarnException( - containerId + " is at " + container.getContainerState() - + " state. Not able to localize new resources."); + containerId + " is at [" + container.getContainerState() + + "] state or container might be upgrading ? [" + + container.isReInitializing() + "]." + + " Not able to localize new resources."); } try { @@ -1556,6 +1561,46 @@ public ResourceLocalizationResponse localize( return ResourceLocalizationResponse.newInstance(); } + /** + * Starts an in-place upgrade of a container by dispatching a + * ContainerReInitEvent that carries the new launch context. + * + * @param containerId container to upgrade + * @param upgradeLaunchContext launch context to re-initialize with + * @throws YarnException if the container is unknown, not RUNNING, or + * is already re-initializing + */ + public void upgradeContainer(ContainerId containerId, + ContainerLaunchContext upgradeLaunchContext) throws YarnException { + preUpgradeCheck(containerId, "UPGRADE"); + dispatcher.getEventHandler().handle( + new ContainerReInitEvent(containerId, upgradeLaunchContext)); + } + + /** + * Rejects an operation unless the container exists, is RUNNING, and + * is not already re-initializing. + * + * @param containerId container the operation targets + * @param op operation name, used only in the error message + * @throws YarnException if any of the preconditions fails + */ + private void preUpgradeCheck(ContainerId containerId, String op) + throws YarnException { + Container container = context.getContainers().get(containerId); + if (container == null) { + throw new YarnException("Specified " + containerId + " does not exist!"); + } + if (!container.getContainerState() + .equals(org.apache.hadoop.yarn.server.nodemanager. 
+ containermanager.container.ContainerState.RUNNING) + || container.isReInitializing()) { + throw new YarnException("Cannot perform " + op + " on [" + containerId + + "]. Current state is [" + container.getContainerState() + ", " + + "isUpgrading=" + container.isReInitializing() + "]."); + } + } + @SuppressWarnings("unchecked") private void internalSignalToContainer(SignalContainerRequest request, String sentBy) { diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/Container.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/Container.java index c4cea18..23b4109 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/Container.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/Container.java @@ -76,4 +76,6 @@ Priority getPriority(); ResourceSet getResourceSet(); + + boolean isReInitializing(); } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/ContainerEventType.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/ContainerEventType.java index 5622f8c..1c56cd8 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/ContainerEventType.java +++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/ContainerEventType.java @@ -25,6 +25,7 @@ KILL_CONTAINER, UPDATE_DIAGNOSTICS_MSG, CONTAINER_DONE, + REINITIALIZE_CONTAINER, // DownloadManager CONTAINER_INITED, diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/ContainerImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/ContainerImpl.java index ce9e581..7966e8e 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/ContainerImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/ContainerImpl.java @@ -22,6 +22,7 @@ import java.io.IOException; import java.net.URISyntaxException; import java.nio.ByteBuffer; +import java.util.ArrayList; import java.util.Arrays; import java.util.Collection; import java.util.EnumSet; @@ -90,13 +91,22 @@ public class ContainerImpl implements Container { + private final static class ReInitializationContext { + private ResourceSet resourceSet = new ResourceSet(); + private final ContainerLaunchContext newLaunchContext; + + private ReInitializationContext(ContainerLaunchContext newLaunchContext) { + this.newLaunchContext = newLaunchContext; + } + } + private final Lock readLock; private final Lock writeLock; private final Dispatcher dispatcher; private final NMStateStoreService stateStore; private final Credentials credentials; private final NodeManagerMetrics metrics; - private final ContainerLaunchContext launchContext; + private 
volatile ContainerLaunchContext launchContext; private final ContainerTokenIdentifier containerTokenIdentifier; private final ContainerId containerId; private volatile Resource resource; @@ -117,6 +127,7 @@ private String logDir; private String host; private String ips; + private volatile ReInitializationContext reInitContext; /** The NM-wide configuration - not specific to this container */ private final Configuration daemonConf; @@ -296,10 +307,14 @@ public ContainerImpl(Configuration conf, Dispatcher dispatcher, new ExitedWithSuccessTransition(true)) .addTransition(ContainerState.RUNNING, EnumSet.of(ContainerState.RELAUNCHING, + ContainerState.LOCALIZED, ContainerState.EXITED_WITH_FAILURE), ContainerEventType.CONTAINER_EXITED_WITH_FAILURE, new RetryFailureTransition()) .addTransition(ContainerState.RUNNING, ContainerState.RUNNING, + ContainerEventType.REINITIALIZE_CONTAINER, + new ReInitializeContainerTransition()) + .addTransition(ContainerState.RUNNING, ContainerState.RUNNING, ContainerEventType.RESOURCE_LOCALIZED, new ResourceLocalizedWhileRunningTransition()) .addTransition(ContainerState.RUNNING, ContainerState.RUNNING, @@ -310,7 +325,9 @@ public ContainerImpl(Configuration conf, Dispatcher dispatcher, UPDATE_DIAGNOSTICS_TRANSITION) .addTransition(ContainerState.RUNNING, ContainerState.KILLING, ContainerEventType.KILL_CONTAINER, new KillTransition()) - .addTransition(ContainerState.RUNNING, ContainerState.EXITED_WITH_FAILURE, + .addTransition(ContainerState.RUNNING, + EnumSet.of(ContainerState.EXITED_WITH_FAILURE, + ContainerState.LOCALIZED), ContainerEventType.CONTAINER_KILLED_ON_REQUEST, new KilledExternallyTransition()) @@ -822,17 +839,71 @@ public ContainerState transition(ContainerImpl container, } /** + * Transition to start the Re-Initialization process. 
+ * Requests localization of the new launch context's resources, or + * dispatches CLEANUP_CONTAINER right away when none are pending. + */ + static class ReInitializeContainerTransition extends ContainerTransition { + + @SuppressWarnings("unchecked") + @Override + public void transition(ContainerImpl container, ContainerEvent event) { + container.reInitContext = createReInitContext(container, event); + try { + Map> + req = container.reInitContext.resourceSet.addResources( + getResourcesToLocalize(event)); + if (!req.isEmpty()) { + container.dispatcher.getEventHandler().handle( + new ContainerLocalizationRequestEvent(container, req)); + } else { + // We are not waiting on any resources, so... + // Kill the current container. + container.dispatcher.getEventHandler().handle( + new ContainersLauncherEvent(container, + ContainersLauncherEventType.CLEANUP_CONTAINER)); + } + } catch (Exception e) { + LOG.error("Container [" + container.getContainerId() + "]" + + " re-initialization failure..", e); + container.addDiagnostics("Error re-initializing due to" + + "[" + e.getMessage() + "]"); + // Abandon the failed re-init, as the localization-failure path does, + // so the container does not keep reporting isReInitializing(). + container.reInitContext = null; + } + } + + protected ReInitializationContext createReInitContext( + ContainerImpl container, ContainerEvent event) { + return new ReInitializationContext( + ((ContainerReInitEvent)event).getReInitLaunchContext()); + } + + protected Map getResourcesToLocalize( + ContainerEvent event) { + return ((ContainerReInitEvent)event).getReInitLaunchContext() + .getLocalResources(); + } + } + + /** * Resource is localized while the container is running - create symlinks */ static class ResourceLocalizedWhileRunningTransition extends ContainerTransition { + @SuppressWarnings("unchecked") @Override public void transition(ContainerImpl container, ContainerEvent event) { ContainerResourceLocalizedEvent rsrcEvent = (ContainerResourceLocalizedEvent) event; - List links = container.resourceSet - .resourceLocalized(rsrcEvent.getResource(), rsrcEvent.getLocation()); + List links = new ArrayList<>(); + checkAndUpdatePending(rsrcEvent, container.resourceSet, links); + if (container.isReInitializing()) { + checkAndUpdatePending( + rsrcEvent, 
container.reInitContext.resourceSet, links); + } // creating symlinks. for (String link : links) { try { @@ -852,6 +918,29 @@ public void transition(ContainerImpl container, ContainerEvent event) { LOG.error(message, e); } } + if (container.isReInitializing()) { + // Check if all ResourceLocalization has completed + if (container.reInitContext.resourceSet.getPendingResources() + .isEmpty()) { + // Kill the current container. + container.dispatcher.getEventHandler().handle( + new ContainersLauncherEvent(container, + ContainersLauncherEventType.CLEANUP_CONTAINER)); + } + } + } + + private void checkAndUpdatePending( + ContainerResourceLocalizedEvent rsrcEvent, + ResourceSet resourceSet, List links) { + if (resourceSet.getPendingResources().containsKey( + rsrcEvent.getResource())) { + List l = resourceSet.resourceLocalized( + rsrcEvent.getResource(), rsrcEvent.getLocation()); + if (l != null) { + links.addAll(l); + } + } } } @@ -868,6 +957,14 @@ public void transition(ContainerImpl container, ContainerEvent event) { container.resourceSet .resourceLocalizationFailed(failedEvent.getResource()); container.addDiagnostics(failedEvent.getDiagnosticMessage()); + if (container.isReInitializing() && + container.reInitContext.resourceSet.getPendingResources() + .containsKey(failedEvent.getResource())) { + LOG.error("Container [" + container.getContainerId() + "] Re-init" + + " failed !! 
Resource [" + failedEvent.getResource() + "] could" + + " not be localized !!"); + container.reInitContext = null; + } } } @@ -883,6 +980,10 @@ public void transition(ContainerImpl container, ContainerEvent event) { container.metrics.runningContainer(); container.wasLaunched = true; + if (container.isReInitializing()) { + container.reInitContext = null; + } + if (container.recoveredAsKilled) { LOG.info("Killing " + container.containerId + " due to recovered as killed"); @@ -959,7 +1060,7 @@ public void transition(ContainerImpl container, ContainerEvent event) { } /** - * Transition to EXITED_WITH_FAILURE or LOCALIZED state upon + * Transition to EXITED_WITH_FAILURE or RELAUNCHING state upon * CONTAINER_EXITED_WITH_FAILURE state. **/ @SuppressWarnings("unchecked") // dispatcher not typed @@ -982,7 +1083,9 @@ public ContainerState transition(final ContainerImpl container, container.addDiagnostics(exitEvent.getDiagnosticInfo(), "\n"); } - if (container.shouldRetry(container.exitCode)) { + // If Container died during an upgrade, don't bother retrying. 
+ if (container.shouldRetry(container.exitCode) && + !container.isReInitializing()) { if (container.remainingRetryAttempts > 0) { container.remainingRetryAttempts--; try { @@ -991,7 +1094,7 @@ public ContainerState transition(final ContainerImpl container, } catch (IOException e) { LOG.warn( "Unable to update remainingRetryAttempts in state store for " - + container.getContainerId(), e); + + container.getContainerId(), e); } } LOG.info("Relaunching Container " + container.getContainerId() @@ -1053,17 +1156,31 @@ public boolean shouldRetry(int errorCode) { } /** - * Transition to EXITED_WITH_FAILURE upon receiving KILLED_ON_REQUEST + * Transition to EXITED_WITH_FAILURE, or to LOCALIZED when the container + * is re-initializing, upon receiving CONTAINER_KILLED_ON_REQUEST. */ - static class KilledExternallyTransition extends ExitedWithFailureTransition { - KilledExternallyTransition() { - super(true); - } + static class KilledExternallyTransition implements + MultipleArcTransition { @Override - public void transition(ContainerImpl container, ContainerEvent event) { - super.transition(container, event); - container.addDiagnostics("Killed by external signal\n"); + public ContainerState transition(ContainerImpl container, + ContainerEvent event) { + if (!container.isReInitializing()) { + new ExitedWithFailureTransition(true).transition(container, event); + container.addDiagnostics("Killed by external signal\n"); + return ContainerState.EXITED_WITH_FAILURE; + } else { + LOG.info("Relaunching Container [" + container.getContainerId() + + "] for upgrade !!"); + container.wasLaunched = false; + container.metrics.endRunningContainer(); + + container.launchContext = container.reInitContext.newLaunchContext; + container.resourceSet.merge(container.reInitContext.resourceSet); + + container.sendLaunchEvent(); + return ContainerState.LOCALIZED; + } } } @@ -1129,8 +1246,13 @@ public void transition(ContainerImpl container, ContainerEvent event) { @SuppressWarnings("unchecked") // dispatcher not typed static class KillTransition 
implements SingleArcTransition { + + @SuppressWarnings("unchecked") @Override public void transition(ContainerImpl container, ContainerEvent event) { + // This is an explicit kill.. so stop any upgrade + container.reInitContext = null; + // Kill the process/process-grp container.dispatcher.getEventHandler().handle( new ContainersLauncherEvent(container, @@ -1385,4 +1507,9 @@ ContainerRetryContext getContainerRetryContext() { public Priority getPriority() { return containerTokenIdentifier.getPriority(); } + + @Override + public boolean isReInitializing() { + return reInitContext != null; + } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/ContainerReInitEvent.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/ContainerReInitEvent.java new file mode 100644 index 0000000..87beef9 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/ContainerReInitEvent.java @@ -0,0 +1,50 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.server.nodemanager.containermanager.container; + +import org.apache.hadoop.yarn.api.records.ContainerId; +import org.apache.hadoop.yarn.api.records.ContainerLaunchContext; + +/** + * ContainerEvent sent by ContainerManager to ContainerImpl to + * re-initiate Container. + */ +public class ContainerReInitEvent extends ContainerEvent { + + private final ContainerLaunchContext reInitLaunchContext; + + /** + * Container Re-Init Event. + * @param cID Container Id + * @param upgradeContext Upgrade context + */ + public ContainerReInitEvent(ContainerId cID, + ContainerLaunchContext upgradeContext) { + super(cID, ContainerEventType.REINITIALIZE_CONTAINER); + this.reInitLaunchContext = upgradeContext; + } + + /** + * Get the Launch Context to be used for upgrade. + * @return ContainerLaunchContext + */ + public ContainerLaunchContext getReInitLaunchContext() { + return reInitLaunchContext; + } +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/ResourceSet.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/ResourceSet.java index a41ee20..5f4ebd9 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/ResourceSet.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/ResourceSet.java @@ -132,6 +132,20 @@ } } + public void merge(ResourceSet upgradeResourceSet) { + this.localizedResources.putAll(upgradeResourceSet.localizedResources); + + // TODO : 
START : Should we de-dup here ? + this.publicRsrcs.addAll(upgradeResourceSet.publicRsrcs); + this.privateRsrcs.addAll(upgradeResourceSet.privateRsrcs); + this.appRsrcs.addAll(upgradeResourceSet.appRsrcs); + // TODO : END + + this.resourcesToBeUploaded.putAll(upgradeResourceSet.resourcesToBeUploaded); + this.resourcesUploadPolicies.putAll( + upgradeResourceSet.resourcesUploadPolicies); + } + public void resourceLocalizationFailed(LocalResourceRequest request) { pendingResources.remove(request); resourcesFailedToBeLocalized.add(request); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestContainerManagerWithLCE.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestContainerManagerWithLCE.java index aa0d975..8a27849 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestContainerManagerWithLCE.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestContainerManagerWithLCE.java @@ -269,6 +269,42 @@ public void testForcefulShutdownSignal() throws IOException, super.testForcefulShutdownSignal(); } + @Override + public void testContainerUpgradeSuccess() throws IOException, + InterruptedException, YarnException { + // Don't run the test if the binary is not available. + if (!shouldRunTest()) { + LOG.info("LCE binary path is not passed. Not running the test"); + return; + } + LOG.info("Running testContainerUpgradeSuccess"); + super.testContainerUpgradeSuccess(); + } + + @Override + public void testContainerUpgradeLocalizationFailure() throws IOException, + InterruptedException, YarnException { + // Don't run the test if the binary is not available. 
+ if (!shouldRunTest()) { + LOG.info("LCE binary path is not passed. Not running the test"); + return; + } + LOG.info("Running testContainerUpgradeLocalizationFailure"); + super.testContainerUpgradeLocalizationFailure(); + } + + @Override + public void testContainerUpgradeProcessFailure() throws IOException, + InterruptedException, YarnException { + // Don't run the test if the binary is not available. + if (!shouldRunTest()) { + LOG.info("LCE binary path is not passed. Not running the test"); + return; + } + LOG.info("Running testContainerUpgradeProcessFailure"); + super.testContainerUpgradeProcessFailure(); + } + private boolean shouldRunTest() { return System .getProperty(YarnConfiguration.NM_LINUX_CONTAINER_EXECUTOR_PATH) != null; diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/BaseContainerManagerTest.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/BaseContainerManagerTest.java index ec38501..d359c3d 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/BaseContainerManagerTest.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/BaseContainerManagerTest.java @@ -123,7 +123,11 @@ public BaseContainerManagerTest() throws UnsupportedFileSystemException { conf) { public int getHttpPort() { return HTTP_PORT; - }; + } + @Override + public ContainerExecutor getContainerExecutor() { + return exec; + } }; protected ContainerExecutor exec; protected DeletionService delSrvc; diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/TestContainerManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/TestContainerManager.java index 5785e1f..e0e5f4c 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/TestContainerManager.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/TestContainerManager.java @@ -25,6 +25,7 @@ import java.io.BufferedReader; import java.io.File; +import java.io.FileNotFoundException; import java.io.FileReader; import java.io.IOException; import java.io.PrintWriter; @@ -33,6 +34,7 @@ import java.util.ArrayList; import java.util.Arrays; import java.util.HashMap; +import java.util.HashSet; import java.util.List; import java.util.Map; @@ -64,6 +66,8 @@ import org.apache.hadoop.yarn.api.records.ContainerExitStatus; import org.apache.hadoop.yarn.api.records.ContainerId; import org.apache.hadoop.yarn.api.records.ContainerLaunchContext; +import org.apache.hadoop.yarn.api.records.ContainerRetryContext; +import org.apache.hadoop.yarn.api.records.ContainerRetryPolicy; import org.apache.hadoop.yarn.api.records.ContainerState; import org.apache.hadoop.yarn.api.records.ContainerStatus; import org.apache.hadoop.yarn.api.records.LocalResource; @@ -94,7 +98,6 @@ import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.ContainerLocalizer; import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.ResourceLocalizationService; import org.apache.hadoop.yarn.server.nodemanager.executor.ContainerSignalContext; -import org.apache.hadoop.yarn.util.ConverterUtils; import 
org.junit.Assert; import org.junit.Before; import org.junit.Test; @@ -366,6 +369,207 @@ public void testContainerLaunchAndStop() throws IOException, DefaultContainerExecutor.containerIsAlive(pid)); } + @Test + public void testContainerUpgradeSuccess() throws IOException, + InterruptedException, YarnException { + containerManager.start(); + // ////// Construct the Container-id + ContainerId cId = createContainerId(0); + File oldStartFile = new File(tmpDir, "start_file_o.txt").getAbsoluteFile(); + + String pid = prepareInitialContainer(cId, oldStartFile); + + File newStartFile = new File(tmpDir, "start_file_n.txt").getAbsoluteFile(); + + prepareContainerUpgrade(false, false, cId, newStartFile); + + // Assert that the First process is not alive anymore + Assert.assertFalse("Process is still alive!", + DefaultContainerExecutor.containerIsAlive(pid)); + + BufferedReader reader = + new BufferedReader(new FileReader(newStartFile)); + Assert.assertEquals("Upgrade World!", reader.readLine()); + + // Get the pid of the process + String newPid = reader.readLine().trim(); + Assert.assertNotEquals("Old and New Pids must be different !", pid, newPid); + // No more lines + Assert.assertEquals(null, reader.readLine()); + + // Assert that the New process is alive + Assert.assertTrue("New Process is not alive!", + DefaultContainerExecutor.containerIsAlive(newPid)); + } + + @Test + public void testContainerUpgradeLocalizationFailure() throws IOException, + InterruptedException, YarnException { + containerManager.start(); + // ////// Construct the Container-id + ContainerId cId = createContainerId(0); + File oldStartFile = new File(tmpDir, "start_file_o.txt").getAbsoluteFile(); + + String pid = prepareInitialContainer(cId, oldStartFile); + + File newStartFile = new File(tmpDir, "start_file_n.txt").getAbsoluteFile(); + + prepareContainerUpgrade(true, true, cId, newStartFile); + + // Assert that the First process is STILL alive + // since upgrade was terminated.. 
+ Assert.assertTrue("Process is NOT alive!", + DefaultContainerExecutor.containerIsAlive(pid)); + } + + @Test + public void testContainerUpgradeProcessFailure() throws IOException, + InterruptedException, YarnException { + containerManager.start(); + // ////// Construct the Container-id + ContainerId cId = createContainerId(0); + File oldStartFile = new File(tmpDir, "start_file_o.txt").getAbsoluteFile(); + + String pid = prepareInitialContainer(cId, oldStartFile); + + File newStartFile = new File(tmpDir, "start_file_n.txt").getAbsoluteFile(); + + prepareContainerUpgrade(true, false, cId, newStartFile); + + // Assert that the First process is not alive anymore + Assert.assertFalse("Process is still alive!", + DefaultContainerExecutor.containerIsAlive(pid)); + } + + private void prepareContainerUpgrade(boolean failCmd, boolean failLoc, + ContainerId cId, File startFile) + throws FileNotFoundException, YarnException, InterruptedException { + // Re-write scriptfile and processStartFile + File scriptFile = Shell.appendScriptExtension(tmpDir, "scriptFile_new"); + PrintWriter fileWriter = new PrintWriter(scriptFile); + + writeScriptFile(fileWriter, "Upgrade World!", startFile, cId, failCmd); + + ContainerLaunchContext containerLaunchContext = + prepareContainerLaunchContext(scriptFile, "dest_file_new", failLoc); + + containerManager.upgradeContainer(cId, containerLaunchContext); + int timeoutSecs = 0; + int maxTimeToWait = failLoc ? 
10 : 20; + // Wait for new processStartfile to be created + while (!startFile.exists() && timeoutSecs++ < maxTimeToWait) { + Thread.sleep(1000); + LOG.info("Waiting for New process start-file to be created"); + } + } + + private String prepareInitialContainer(ContainerId cId, File startFile) + throws IOException, YarnException, InterruptedException { + File scriptFileOld = Shell.appendScriptExtension(tmpDir, "scriptFile"); + PrintWriter fileWriterOld = new PrintWriter(scriptFileOld); + + writeScriptFile(fileWriterOld, "Hello World!", startFile, cId, false); + + ContainerLaunchContext containerLaunchContext = + prepareContainerLaunchContext(scriptFileOld, "dest_file", false); + + StartContainerRequest scRequest = + StartContainerRequest.newInstance(containerLaunchContext, + createContainerToken(cId, + DUMMY_RM_IDENTIFIER, context.getNodeId(), user, + context.getContainerTokenSecretManager())); + List list = new ArrayList<>(); + list.add(scRequest); + StartContainersRequest allRequests = + StartContainersRequest.newInstance(list); + containerManager.startContainers(allRequests); + + int timeoutSecs = 0; + while (!startFile.exists() && timeoutSecs++ < 20) { + Thread.sleep(1000); + LOG.info("Waiting for process start-file to be created"); + } + Assert.assertTrue("ProcessStartFile doesn't exist!", + startFile.exists()); + + // Now verify the contents of the file + BufferedReader reader = + new BufferedReader(new FileReader(startFile)); + Assert.assertEquals("Hello World!", reader.readLine()); + // Get the pid of the process + String pid = reader.readLine().trim(); + // No more lines + Assert.assertEquals(null, reader.readLine()); + + // Assert that the process is alive + Assert.assertTrue("Process is not alive!", + DefaultContainerExecutor.containerIsAlive(pid)); + // Once more + Assert.assertTrue("Process is not alive!", + DefaultContainerExecutor.containerIsAlive(pid)); + return pid; + } + + private void writeScriptFile(PrintWriter fileWriter, String startLine, + File 
processStartFile, ContainerId cId, boolean isFailure) { + if (Shell.WINDOWS) { + fileWriter.println("@echo " + startLine + "> " + processStartFile); + fileWriter.println("@echo " + cId + ">> " + processStartFile); + fileWriter.println("@ping -n 100 127.0.0.1 >nul"); + } else { + fileWriter.write("\numask 0"); // So that start file is readable by test + if (isFailure) { + // Echo PID and throw some error code + fileWriter.write("\necho $$ >> " + processStartFile); + fileWriter.write("\nexit 111"); + } else { + fileWriter.write("\necho " + startLine + " > " + processStartFile); + fileWriter.write("\necho $$ >> " + processStartFile); + fileWriter.write("\nexec sleep 100"); + } + } + fileWriter.close(); + } + + private ContainerLaunchContext prepareContainerLaunchContext(File scriptFile, + String destFName, boolean putBadFile) { + ContainerLaunchContext containerLaunchContext = + recordFactory.newRecordInstance(ContainerLaunchContext.class); + URL resourceAlpha = null; + if (putBadFile) { + File fileToDelete = new File(tmpDir, "fileToDelete") + .getAbsoluteFile(); + resourceAlpha = + URL.fromPath(localFS + .makeQualified(new Path(fileToDelete.getAbsolutePath()))); + fileToDelete.delete(); + } else { + resourceAlpha = + URL.fromPath(localFS + .makeQualified(new Path(scriptFile.getAbsolutePath()))); + } + LocalResource rsrcAlpha = + recordFactory.newRecordInstance(LocalResource.class); + rsrcAlpha.setResource(resourceAlpha); + rsrcAlpha.setSize(-1); + rsrcAlpha.setVisibility(LocalResourceVisibility.APPLICATION); + rsrcAlpha.setType(LocalResourceType.FILE); + rsrcAlpha.setTimestamp(scriptFile.lastModified()); + Map localResources = new HashMap<>(); + localResources.put(destFName, rsrcAlpha); + containerLaunchContext.setLocalResources(localResources); + + ContainerRetryContext containerRetryContext = ContainerRetryContext + .newInstance( + ContainerRetryPolicy.RETRY_ON_SPECIFIC_ERROR_CODES, + new HashSet<>(Arrays.asList(Integer.valueOf(111))), 4, 0); + 
containerLaunchContext.setContainerRetryContext(containerRetryContext); + List commands = Arrays.asList( + Shell.getRunScriptCommand(scriptFile)); + containerLaunchContext.setCommands(commands); + return containerLaunchContext; + } + protected void testContainerLaunchAndExit(int exitCode) throws IOException, InterruptedException, YarnException { diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/webapp/MockContainer.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/webapp/MockContainer.java index c176556..72d7cb7 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/webapp/MockContainer.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/webapp/MockContainer.java @@ -190,4 +190,9 @@ public Priority getPriority() { public void setIpAndHost(String[] ipAndHost) { } + + @Override + public boolean isReInitializing() { + return false; + } }