diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/DefaultContainerExecutor.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/DefaultContainerExecutor.java index 9a0549d..59b69ac 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/DefaultContainerExecutor.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/DefaultContainerExecutor.java @@ -89,7 +89,7 @@ public DefaultContainerExecutor() { } protected void copyFile(Path src, Path dst, String owner) throws IOException { - lfs.util().copy(src, dst); + lfs.util().copy(src, dst, false, true); } protected void setScriptExecutable(Path script, String owner) throws IOException { diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManagerImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManagerImpl.java index 52d8566..b3a5caa 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManagerImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManagerImpl.java @@ -110,6 +110,9 @@ import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerEventType; import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerImpl; import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerKillEvent; + +import org.apache.hadoop.yarn.server.nodemanager.containermanager.container + .ContainerUpgradeEvent; import org.apache.hadoop.yarn.server.nodemanager.containermanager.launcher.ContainersLauncher; import org.apache.hadoop.yarn.server.nodemanager.containermanager.launcher.ContainersLauncherEventType; import org.apache.hadoop.yarn.server.nodemanager.containermanager.launcher.SignalContainersLauncherEvent; @@ -1535,10 +1538,13 @@ public ResourceLocalizationResponse localize( } if (!container.getContainerState() .equals(org.apache.hadoop.yarn.server.nodemanager. - containermanager.container.ContainerState.RUNNING)) { + containermanager.container.ContainerState.RUNNING) + || container.isUpgrading()) { throw new YarnException( - containerId + " is at " + container.getContainerState() - + " state. Not able to localize new resources."); + containerId + " is at [" + container.getContainerState() + + "] state or container might be upgrading ? [" + + container.isUpgrading() + "]." + + " Not able to localize new resources."); } try { @@ -1556,6 +1562,43 @@ public ResourceLocalizationResponse localize( return ResourceLocalizationResponse.newInstance(); } + public void upgradeContainer(ContainerId containerId, + ContainerLaunchContext upgradeLaunchContext) throws YarnException { + Container container = context.getContainers().get(containerId); + if (container == null) { + throw new YarnException("Specified " + containerId + " does not exist!"); + } + if (!container.getContainerState() + .equals(org.apache.hadoop.yarn.server.nodemanager. + containermanager.container.ContainerState.RUNNING) + || container.isUpgrading()) { + throw new YarnException( + containerId + " is at [" + container.getContainerState() + + "] state or container might already be upgrading ? [" + + container.isUpgrading() + "]."); + } + dispatcher.getEventHandler().handle( + new ContainerUpgradeEvent(containerId, upgradeLaunchContext)); + } + + public void commitLastUpgrade(ContainerId containerId) throws YarnException { + Container container = context.getContainers().get(containerId); + if (container == null) { + throw new YarnException("Specified " + containerId + " does not exist!"); + } + if (!container.getContainerState() + .equals(org.apache.hadoop.yarn.server.nodemanager. + containermanager.container.ContainerState.RUNNING) + || container.isUpgrading()) { + throw new YarnException( + containerId + " is at [" + container.getContainerState() + + "] state or container might have finished upgrading [" + + container.isUpgrading() + "]."); + } + dispatcher.getEventHandler().handle( + new ContainerEvent(containerId, ContainerEventType.COMMIT_UPGRADE)); + } + @SuppressWarnings("unchecked") private void internalSignalToContainer(SignalContainerRequest request, String sentBy) { diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/Container.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/Container.java index c4cea18..9850797 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/Container.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/Container.java @@ -76,4 +76,6 @@ Priority getPriority(); ResourceSet getResourceSet(); + + boolean isUpgrading(); } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/ContainerEventType.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/ContainerEventType.java index 5622f8c..51f3fd6 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/ContainerEventType.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/ContainerEventType.java @@ -25,6 +25,8 @@ KILL_CONTAINER, UPDATE_DIAGNOSTICS_MSG, CONTAINER_DONE, + UPGRADE_CONTAINER, + COMMIT_UPGRADE, // DownloadManager CONTAINER_INITED, diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/ContainerImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/ContainerImpl.java index ce9e581..096c6b2 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/ContainerImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/ContainerImpl.java @@ -90,13 +90,23 @@ public class ContainerImpl implements Container { + private static class UpgradeContext { + private ResourceSet resourceSet = new ResourceSet(); + private volatile ContainerLaunchContext oldLaunchContext = null; + private final ContainerLaunchContext launchContext; + + private UpgradeContext(ContainerLaunchContext launchContext) { + this.launchContext = launchContext; + } + } + private final Lock readLock; private final Lock writeLock; private final Dispatcher dispatcher; private final NMStateStoreService stateStore; private final Credentials credentials; private final NodeManagerMetrics metrics; - private final ContainerLaunchContext launchContext; + private volatile ContainerLaunchContext launchContext; private final ContainerTokenIdentifier containerTokenIdentifier; private final ContainerId containerId; private volatile Resource resource; @@ -117,6 +127,8 @@ private String logDir; private String host; private String ips; + private volatile UpgradeContext upgradeContext; + private volatile UpgradeContext rollabackContext; /** The NM-wide configuration - not specific to this container */ private final Configuration daemonConf; @@ -296,10 +308,17 @@ public ContainerImpl(Configuration conf, Dispatcher dispatcher, new ExitedWithSuccessTransition(true)) .addTransition(ContainerState.RUNNING, EnumSet.of(ContainerState.RELAUNCHING, + ContainerState.LOCALIZED, ContainerState.EXITED_WITH_FAILURE), ContainerEventType.CONTAINER_EXITED_WITH_FAILURE, new RetryFailureTransition()) .addTransition(ContainerState.RUNNING, ContainerState.RUNNING, + ContainerEventType.UPGRADE_CONTAINER, + new UpgradeContainerTransition()) + .addTransition(ContainerState.RUNNING, ContainerState.RUNNING, + ContainerEventType.COMMIT_UPGRADE, + new CommitUpgradeTransition()) + .addTransition(ContainerState.RUNNING, ContainerState.RUNNING, ContainerEventType.RESOURCE_LOCALIZED, new ResourceLocalizedWhileRunningTransition()) .addTransition(ContainerState.RUNNING, ContainerState.RUNNING, @@ -310,7 +329,9 @@ public ContainerImpl(Configuration conf, Dispatcher dispatcher, UPDATE_DIAGNOSTICS_TRANSITION) .addTransition(ContainerState.RUNNING, ContainerState.KILLING, ContainerEventType.KILL_CONTAINER, new KillTransition()) - .addTransition(ContainerState.RUNNING, ContainerState.EXITED_WITH_FAILURE, + .addTransition(ContainerState.RUNNING, + EnumSet.of(ContainerState.EXITED_WITH_FAILURE, + ContainerState.LOCALIZED), ContainerEventType.CONTAINER_KILLED_ON_REQUEST, new KilledExternallyTransition()) @@ -822,6 +843,50 @@ public ContainerState transition(ContainerImpl container, } /** + * Transition to start the Upgrade process. + */ + static class UpgradeContainerTransition extends ContainerTransition { + + public void transition(ContainerImpl container, + ContainerEvent event) { + ContainerUpgradeEvent upgradeEvent = (ContainerUpgradeEvent) event; + container.upgradeContext = new UpgradeContext( + upgradeEvent.getUpgradeLaunchContext()); + try { + Map> + req = container.upgradeContext.resourceSet.addResources( + upgradeEvent.getUpgradeLaunchContext().getLocalResources()); + container.dispatcher.getEventHandler().handle( + new ContainerLocalizationRequestEvent(container, req)); + } catch (Exception e) { + LOG.error("Container [" + container.getContainerId() + "] upgrade" + + " failure..", e); + container.addDiagnostics("Error upgrading due to" + + "[" + e.getMessage() + "]"); + } + + } + } + + /** + * Transition to start the Upgrade process. + */ + static class CommitUpgradeTransition extends ContainerTransition { + + public void transition(ContainerImpl container, + ContainerEvent event) { + if (container.rollabackContext != null) { + LOG.info("Container Upgrade for container [" + + container.getContainerId() + "] has been committed !!"); + container.rollabackContext = null; + } else { + LOG.info("No rollback Context found for container [" + + container.getContainerId() + "].."); + } + } + } + + /** * Resource is localized while the container is running - create symlinks */ static class ResourceLocalizedWhileRunningTransition @@ -831,8 +896,10 @@ public ContainerState transition(ContainerImpl container, public void transition(ContainerImpl container, ContainerEvent event) { ContainerResourceLocalizedEvent rsrcEvent = (ContainerResourceLocalizedEvent) event; - List links = container.resourceSet - .resourceLocalized(rsrcEvent.getResource(), rsrcEvent.getLocation()); + ResourceSet rSet = container.isUpgrading() ? + container.upgradeContext.resourceSet : container.resourceSet; + List links = rSet.resourceLocalized(rsrcEvent.getResource(), + rsrcEvent.getLocation()); // creating symlinks. for (String link : links) { try { @@ -852,6 +919,16 @@ public void transition(ContainerImpl container, ContainerEvent event) { LOG.error(message, e); } } + if (container.isUpgrading()) { + // Check if all ResourceLocalization has completed + if (container.upgradeContext.resourceSet.getPendingResources() + .isEmpty()) { + // Kill the current container. + container.dispatcher.getEventHandler().handle( + new ContainersLauncherEvent(container, + ContainersLauncherEventType.CLEANUP_CONTAINER)); + } + } } } @@ -883,6 +960,17 @@ public void transition(ContainerImpl container, ContainerEvent event) { container.metrics.runningContainer(); container.wasLaunched = true; + if (container.isUpgrading()) { + // Container has finished upgrading.. + // Set the rollbackContext.. If container is Retried/Relaunchced + // AFTER upgrade, it will automatically rollback, unless the + // upgrade is committed. + synchronized (container) { + container.rollabackContext = container.upgradeContext; + container.upgradeContext = null; + } + } + if (container.recoveredAsKilled) { LOG.info("Killing " + container.containerId + " due to recovered as killed"); @@ -959,7 +1047,7 @@ public void transition(ContainerImpl container, ContainerEvent event) { } /** - * Transition to EXITED_WITH_FAILURE or LOCALIZED state upon + * Transition to EXITED_WITH_FAILURE or RELAUNCHING state upon * CONTAINER_EXITED_WITH_FAILURE state. **/ @SuppressWarnings("unchecked") // dispatcher not typed @@ -982,43 +1070,55 @@ public ContainerState transition(final ContainerImpl container, container.addDiagnostics(exitEvent.getDiagnosticInfo(), "\n"); } - if (container.shouldRetry(container.exitCode)) { - if (container.remainingRetryAttempts > 0) { - container.remainingRetryAttempts--; - try { - container.stateStore.storeContainerRemainingRetryAttempts( - container.getContainerId(), container.remainingRetryAttempts); - } catch (IOException e) { - LOG.warn( - "Unable to update remainingRetryAttempts in state store for " - + container.getContainerId(), e); - } - } - LOG.info("Relaunching Container " + container.getContainerId() - + ". Remaining retry attempts(after relaunch) : " - + container.remainingRetryAttempts - + ". Interval between retries is " - + container.containerRetryContext.getRetryInterval() + "ms"); - container.wasLaunched = false; - container.metrics.endRunningContainer(); - if (container.containerRetryContext.getRetryInterval() == 0) { - container.sendRelaunchEvent(); + // If Container died during an upgrade, dont bother retrying. + if (container.shouldRetry(container.exitCode) && + !container.isUpgrading()) { + if (container.rollabackContext != null) { + container.launchContext = container.rollabackContext.oldLaunchContext; + container.rollabackContext = null; + container.addDiagnostics("Container upgrade will be Rolled Back."); + LOG.info("Rolling back Container upgrade for [" + + container.getContainerId() + "] !!"); + container.sendLaunchEvent(); + return ContainerState.LOCALIZED; } else { - // wait for some time, then send launch event - new Thread() { - @Override - public void run() { - try { - Thread.sleep( - container.containerRetryContext.getRetryInterval()); - container.sendRelaunchEvent(); - } catch (InterruptedException e) { - return; - } + if (container.remainingRetryAttempts > 0) { + container.remainingRetryAttempts--; + try { + container.stateStore.storeContainerRemainingRetryAttempts( + container.getContainerId(), container.remainingRetryAttempts); + } catch (IOException e) { + LOG.warn( + "Unable to update remainingRetryAttempts in state store for " + + container.getContainerId(), e); } - }.start(); + } + LOG.info("Relaunching Container " + container.getContainerId() + + ". Remaining retry attempts(after relaunch) : " + + container.remainingRetryAttempts + + ". Interval between retries is " + + container.containerRetryContext.getRetryInterval() + "ms"); + container.wasLaunched = false; + container.metrics.endRunningContainer(); + if (container.containerRetryContext.getRetryInterval() == 0) { + container.sendRelaunchEvent(); + } else { + // wait for some time, then send launch event + new Thread() { + @Override + public void run() { + try { + Thread.sleep( + container.containerRetryContext.getRetryInterval()); + container.sendRelaunchEvent(); + } catch (InterruptedException e) { + return; + } + } + }.start(); + } + return ContainerState.RELAUNCHING; } - return ContainerState.RELAUNCHING; } else { new ExitedWithFailureTransition(true).transition(container, event); return ContainerState.EXITED_WITH_FAILURE; @@ -1053,17 +1153,32 @@ public boolean shouldRetry(int errorCode) { } /** - * Transition to EXITED_WITH_FAILURE upon receiving KILLED_ON_REQUEST + * Transition to EXITED_WITH_FAILURE or RELAUCHING + * upon receiving KILLED_ON_REQUEST */ - static class KilledExternallyTransition extends ExitedWithFailureTransition { - KilledExternallyTransition() { - super(true); - } + static class KilledExternallyTransition implements + MultipleArcTransition { @Override - public void transition(ContainerImpl container, ContainerEvent event) { - super.transition(container, event); - container.addDiagnostics("Killed by external signal\n"); + public ContainerState transition(ContainerImpl container, + ContainerEvent event) { + if (!container.isUpgrading()) { + new ExitedWithFailureTransition(true).transition(container, event); + container.addDiagnostics("Killed by external signal\n"); + return ContainerState.EXITED_WITH_FAILURE; + } else { + LOG.info("Relaunching Container [" + container.getContainerId() + + "] for upgrade !!"); + container.wasLaunched = false; + container.metrics.endRunningContainer(); + + container.upgradeContext.oldLaunchContext = container.launchContext; + container.launchContext = container.upgradeContext.launchContext; + container.resourceSet.merge(container.upgradeContext.resourceSet); + + container.sendLaunchEvent(); + return ContainerState.LOCALIZED; + } } } @@ -1385,4 +1500,9 @@ ContainerRetryContext getContainerRetryContext() { public Priority getPriority() { return containerTokenIdentifier.getPriority(); } + + @Override + public boolean isUpgrading() { + return upgradeContext != null; + } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/ContainerState.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/ContainerState.java index 6b96204..93484ec 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/ContainerState.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/ContainerState.java @@ -19,7 +19,7 @@ package org.apache.hadoop.yarn.server.nodemanager.containermanager.container; public enum ContainerState { - NEW, LOCALIZING, LOCALIZATION_FAILED, LOCALIZED, RUNNING, RELAUNCHING, - EXITED_WITH_SUCCESS, EXITED_WITH_FAILURE, KILLING, + NEW, LOCALIZING, LOCALIZATION_FAILED, LOCALIZED, RUNNING, UPGRADING, + RELAUNCHING, EXITED_WITH_SUCCESS, EXITED_WITH_FAILURE, KILLING, CONTAINER_CLEANEDUP_AFTER_KILL, CONTAINER_RESOURCES_CLEANINGUP, DONE } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/ContainerUpgradeEvent.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/ContainerUpgradeEvent.java new file mode 100644 index 0000000..96bd05a --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/ContainerUpgradeEvent.java @@ -0,0 +1,37 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.server.nodemanager.containermanager.container; + +import org.apache.hadoop.yarn.api.records.ContainerId; +import org.apache.hadoop.yarn.api.records.ContainerLaunchContext; + +public class ContainerUpgradeEvent extends ContainerEvent { + + private final ContainerLaunchContext upgradeLaunchContext; + + public ContainerUpgradeEvent(ContainerId cID, + ContainerLaunchContext upgradeContext) { + super(cID, ContainerEventType.UPGRADE_CONTAINER); + this.upgradeLaunchContext = upgradeContext; + } + + public ContainerLaunchContext getUpgradeLaunchContext() { + return upgradeLaunchContext; + } +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/ResourceSet.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/ResourceSet.java index a41ee20..5f4ebd9 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/ResourceSet.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/ResourceSet.java @@ -132,6 +132,20 @@ } } + public void merge(ResourceSet upgradeResourceSet) { + this.localizedResources.putAll(upgradeResourceSet.localizedResources); + + // TODO : START : Should we de-dup here ? + this.publicRsrcs.addAll(upgradeResourceSet.publicRsrcs); + this.privateRsrcs.addAll(upgradeResourceSet.privateRsrcs); + this.appRsrcs.addAll(upgradeResourceSet.appRsrcs); + // TODO : END + + this.resourcesToBeUploaded.putAll(upgradeResourceSet.resourcesToBeUploaded); + this.resourcesUploadPolicies.putAll( + upgradeResourceSet.resourcesUploadPolicies); + } + public void resourceLocalizationFailed(LocalResourceRequest request) { pendingResources.remove(request); resourcesFailedToBeLocalized.add(request); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/TestContainerManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/TestContainerManager.java index 5785e1f..f8c1793 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/TestContainerManager.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/TestContainerManager.java @@ -33,6 +33,7 @@ import java.util.ArrayList; import java.util.Arrays; import java.util.HashMap; +import java.util.HashSet; import java.util.List; import java.util.Map; @@ -64,6 +65,8 @@ import org.apache.hadoop.yarn.api.records.ContainerExitStatus; import org.apache.hadoop.yarn.api.records.ContainerId; import org.apache.hadoop.yarn.api.records.ContainerLaunchContext; +import org.apache.hadoop.yarn.api.records.ContainerRetryContext; +import org.apache.hadoop.yarn.api.records.ContainerRetryPolicy; import org.apache.hadoop.yarn.api.records.ContainerState; import org.apache.hadoop.yarn.api.records.ContainerStatus; import org.apache.hadoop.yarn.api.records.LocalResource; @@ -366,6 +369,182 @@ public void testContainerLaunchAndStop() throws IOException, DefaultContainerExecutor.containerIsAlive(pid)); } + @Test + public void testContainerUpgradeSuccess() throws IOException, + InterruptedException, YarnException { + testContainerUpgrade(false); + } + + @Test + public void testContainerUpgradeWithRollBack() throws IOException, + InterruptedException, YarnException { + testContainerUpgrade(true); + } + + private void testContainerUpgrade(boolean rollback) throws IOException, + InterruptedException, YarnException { + containerManager.start(); + + File scriptFile = Shell.appendScriptExtension(tmpDir, "scriptFile"); + PrintWriter fileWriter = new PrintWriter(scriptFile); + File processStartFileOld = + new File(tmpDir, "start_file.txt").getAbsoluteFile(); + + // ////// Construct the Container-id + ContainerId cId = createContainerId(0); + + if (Shell.WINDOWS) { + fileWriter.println("@echo Hello World!> " + processStartFileOld); + fileWriter.println("@echo " + cId + ">> " + processStartFileOld); + fileWriter.println("@ping -n 100 127.0.0.1 >nul"); + } else { + fileWriter.write("\numask 0"); // So that start file is readable by the + // test + fileWriter.write("\necho Hello World! > " + processStartFileOld); + fileWriter.write("\necho $$ >> " + processStartFileOld); + fileWriter.write("\nexec sleep 100"); + } + fileWriter.close(); + + ContainerLaunchContext containerLaunchContext = + recordFactory.newRecordInstance(ContainerLaunchContext.class); + + URL resource_alpha = + URL.fromPath(localFS + .makeQualified(new Path(scriptFile.getAbsolutePath()))); + LocalResource rsrc_alpha = + recordFactory.newRecordInstance(LocalResource.class); + rsrc_alpha.setResource(resource_alpha); + rsrc_alpha.setSize(-1); + rsrc_alpha.setVisibility(LocalResourceVisibility.APPLICATION); + rsrc_alpha.setType(LocalResourceType.FILE); + rsrc_alpha.setTimestamp(scriptFile.lastModified()); + String destinationFile = "dest_file"; + Map localResources = + new HashMap(); + localResources.put(destinationFile, rsrc_alpha); + containerLaunchContext.setLocalResources(localResources); + + ContainerRetryContext containerRetryContext = ContainerRetryContext + .newInstance( + ContainerRetryPolicy.RETRY_ON_SPECIFIC_ERROR_CODES, + new HashSet<>(Arrays.asList(Integer.valueOf(111))), 4, 0); + containerLaunchContext.setContainerRetryContext(containerRetryContext); + List commands = Arrays.asList(Shell.getRunScriptCommand + (scriptFile)); + containerLaunchContext.setCommands(commands); + + StartContainerRequest scRequest = + StartContainerRequest.newInstance(containerLaunchContext, + createContainerToken(cId, + DUMMY_RM_IDENTIFIER, context.getNodeId(), user, + context.getContainerTokenSecretManager())); + List list = new ArrayList<>(); + list.add(scRequest); + StartContainersRequest allRequests = + StartContainersRequest.newInstance(list); + containerManager.startContainers(allRequests); + + int timeoutSecs = 0; + while (!processStartFileOld.exists() && timeoutSecs++ < 20) { + Thread.sleep(1000); + LOG.info("Waiting for process start-file to be created"); + } + Assert.assertTrue("ProcessStartFile doesn't exist!", + processStartFileOld.exists()); + + // Now verify the contents of the file + BufferedReader reader = + new BufferedReader(new FileReader(processStartFileOld)); + Assert.assertEquals("Hello World!", reader.readLine()); + // Get the pid of the process + String pid = reader.readLine().trim(); + // No more lines + Assert.assertEquals(null, reader.readLine()); + + // Now test the stop functionality. + + // Assert that the process is alive + Assert.assertTrue("Process is not alive!", + DefaultContainerExecutor.containerIsAlive(pid)); + // Once more + Assert.assertTrue("Process is not alive!", + DefaultContainerExecutor.containerIsAlive(pid)); + + // Re-write scriptfile and processStartFile + scriptFile = Shell.appendScriptExtension(tmpDir, "scriptFile_new"); + fileWriter = new PrintWriter(scriptFile); + File processStartFileNew = new File(tmpDir, "start_file_new.txt") + .getAbsoluteFile(); + + if (Shell.WINDOWS) { + fileWriter.println("@echo Upgrade World!> " + processStartFileNew); + fileWriter.println("@echo " + cId + ">> " + processStartFileNew); + fileWriter.println("@ping -n 100 127.0.0.1 >nul"); + } else { + fileWriter.write("\numask 0"); // So that start file is readable by the + // test + if (rollback) { + // Throw some error code and rollback + fileWriter.write("\necho $$ >> " + processStartFileNew); + fileWriter.write("\nexit 111"); + } else { + fileWriter.write("\necho Upgrade World! > " + processStartFileNew); + fileWriter.write("\necho $$ >> " + processStartFileNew); + fileWriter.write("\nexec sleep 100"); + } + } + fileWriter.close(); + + containerLaunchContext = + recordFactory.newRecordInstance(ContainerLaunchContext.class); + resource_alpha = + URL.fromPath(localFS + .makeQualified(new Path(scriptFile.getAbsolutePath()))); + rsrc_alpha = recordFactory.newRecordInstance(LocalResource.class); + rsrc_alpha.setResource(resource_alpha); + rsrc_alpha.setSize(-1); + rsrc_alpha.setVisibility(LocalResourceVisibility.APPLICATION); + rsrc_alpha.setType(LocalResourceType.FILE); + rsrc_alpha.setTimestamp(scriptFile.lastModified()); + localResources = new HashMap(); + localResources.put(destinationFile, rsrc_alpha); + containerLaunchContext.setLocalResources(localResources); + commands = Arrays.asList(Shell.getRunScriptCommand(scriptFile)); + containerLaunchContext.setCommands(commands); + + containerManager.upgradeContainer(cId, containerLaunchContext); + timeoutSecs = 0; + // Wait for new processStartfile to be creases + while (!processStartFileNew.exists() && timeoutSecs++ < 20) { + Thread.sleep(1000); + LOG.info("Waiting for process start-file to be created"); + } + + // Assert that the First process is not alive anymore + Assert.assertFalse("Process is still alive!", + DefaultContainerExecutor.containerIsAlive(pid)); + + // Now verify the contents of the file has changed + if (rollback) { + reader = new BufferedReader(new FileReader(processStartFileOld)); + Assert.assertEquals("Hello World!", reader.readLine()); + } else { + reader = new BufferedReader(new FileReader(processStartFileNew)); + Assert.assertEquals("Upgrade World!", reader.readLine()); + containerManager.commitLastUpgrade(cId); + } + // Get the pid of the process + String newPid = reader.readLine().trim(); + Assert.assertNotEquals("Old and New Pids must be different !", pid, newPid); + // No more lines + Assert.assertEquals(null, reader.readLine()); + + // Assert that the process is alive + Assert.assertTrue("New Process is not alive!", + DefaultContainerExecutor.containerIsAlive(newPid)); + } + protected void testContainerLaunchAndExit(int exitCode) throws IOException, InterruptedException, YarnException { diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/webapp/MockContainer.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/webapp/MockContainer.java index c176556..e076e07 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/webapp/MockContainer.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/webapp/MockContainer.java @@ -190,4 +190,9 @@ public Priority getPriority() { public void setIpAndHost(String[] ipAndHost) { } + + @Override + public boolean isUpgrading() { + return false; + } }