diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/launcher/ContainerLaunch.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/launcher/ContainerLaunch.java index 0b599a8..1558c21 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/launcher/ContainerLaunch.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/launcher/ContainerLaunch.java @@ -1264,8 +1264,6 @@ private void recordContainerLogDir(ContainerId containerId, private void recordContainerWorkDir(ContainerId containerId, String workDir) throws IOException{ container.setWorkDir(workDir); - if (container.isRetryContextSet()) { - context.getNMStateStore().storeContainerWorkDir(containerId, workDir); - } + context.getNMStateStore().storeContainerWorkDir(containerId, workDir); } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/BaseContainerManagerTest.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/BaseContainerManagerTest.java index 2991c0c..af822c1 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/BaseContainerManagerTest.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/BaseContainerManagerTest.java @@ 
-22,6 +22,7 @@ import java.io.File; import java.io.IOException; +import java.io.PrintWriter; import java.nio.ByteBuffer; import java.util.ArrayList; import java.util.Arrays; @@ -47,11 +48,15 @@ import org.apache.hadoop.yarn.api.records.ContainerState; import org.apache.hadoop.yarn.api.records.ContainerStatus; import org.apache.hadoop.yarn.api.records.ExecutionType; +import org.apache.hadoop.yarn.api.records.LocalResource; +import org.apache.hadoop.yarn.api.records.LocalResourceType; +import org.apache.hadoop.yarn.api.records.LocalResourceVisibility; import org.apache.hadoop.yarn.api.records.LogAggregationContext; import org.apache.hadoop.yarn.api.records.NodeId; import org.apache.hadoop.yarn.api.records.Priority; import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.api.records.Token; +import org.apache.hadoop.yarn.api.records.URL; import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.event.AsyncDispatcher; import org.apache.hadoop.yarn.exceptions.YarnException; @@ -75,6 +80,8 @@ import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.Application; import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.ApplicationState; import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container; +import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.ContainerLocalizer; +import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.ResourceLocalizationService; import org.apache.hadoop.yarn.server.nodemanager.executor.DeletionAsUserContext; import org.apache.hadoop.yarn.server.nodemanager.metrics.NodeManagerMetrics; import org.apache.hadoop.yarn.server.nodemanager.recovery.NMNullStateStoreService; @@ -448,4 +455,65 @@ public static ContainerId createContainerId(int cId, int aId) { ContainerId.newContainerId(appAttemptId, cId); return containerId; } + + protected void checkResourceLocalized(ContainerId containerId, + 
String symLink) { + String appId = + containerId.getApplicationAttemptId().getApplicationId().toString(); + File userCacheDir = new File(localDir, ContainerLocalizer.USERCACHE); + File userDir = new File(userCacheDir, user); + File appCache = new File(userDir, ContainerLocalizer.APPCACHE); + // localDir/usercache/nobody/appcache/application_0_0000 + File appDir = new File(appCache, appId); + // localDir/usercache/nobody/appcache/application_0_0000/container_0_0000_01_000000 + File containerDir = new File(appDir, containerId.toString()); + // localDir/usercache/nobody/appcache/application_0_0000/container_0_0000_01_000000/symLink1 + File targetFile = new File(containerDir, symLink); + + File sysDir = + new File(localDir, ResourceLocalizationService.NM_PRIVATE_DIR); + // localDir/nmPrivate/application_0_0000 + File appSysDir = new File(sysDir, appId); + // localDir/nmPrivate/application_0_0000/container_0_0000_01_000000 + File containerSysDir = new File(appSysDir, containerId.toString()); + + Assert.assertTrue("AppDir " + appDir.getAbsolutePath() + " doesn't exist!!", + appDir.exists()); + Assert.assertTrue( + "AppSysDir " + appSysDir.getAbsolutePath() + " doesn't exist!!", + appSysDir.exists()); + Assert.assertTrue( + "containerDir " + containerDir.getAbsolutePath() + " doesn't exist !", + containerDir.exists()); + Assert.assertTrue("containerSysDir " + containerSysDir.getAbsolutePath() + + " doesn't exist !", containerSysDir.exists()); + Assert.assertTrue( + "targetFile " + targetFile.getAbsolutePath() + " doesn't exist !!", + targetFile.exists()); + } + + protected Map<String, LocalResource> setupLocalResources(String fileName, + String symLink) throws Exception { + // Create the resources for the container + File dir = new File(tmpDir, "dir"); + dir.mkdirs(); + File file = new File(dir, fileName); + PrintWriter fileWriter = new PrintWriter(file); + fileWriter.write("Hello World!"); + fileWriter.close(); + + URL resourceURL = URL.fromPath(FileContext.getLocalFSFileContext() + 
.makeQualified(new Path(file.getAbsolutePath()))); + LocalResource resource = + recordFactory.newRecordInstance(LocalResource.class); + resource.setResource(resourceURL); + resource.setSize(-1); + resource.setVisibility(LocalResourceVisibility.APPLICATION); + resource.setType(LocalResourceType.FILE); + resource.setTimestamp(file.lastModified()); + Map<String, LocalResource> localResources = + new HashMap<String, LocalResource>(); + localResources.put(symLink, resource); + return localResources; + } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/TestContainerManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/TestContainerManager.java index 60df7cb8..31b10da 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/TestContainerManager.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/TestContainerManager.java @@ -888,31 +888,6 @@ public void testContainerLaunchAndExitFailure() throws IOException, testContainerLaunchAndExit(exitCode); } - private Map setupLocalResources(String fileName, - String symLink) throws Exception { - // ////// Create the resources for the container - File dir = new File(tmpDir, "dir"); - dir.mkdirs(); - File file = new File(dir, fileName); - PrintWriter fileWriter = new PrintWriter(file); - fileWriter.write("Hello World!"); - fileWriter.close(); - - URL resourceURL = URL.fromPath(FileContext.getLocalFSFileContext() - .makeQualified(new Path(file.getAbsolutePath()))); - LocalResource resource = - recordFactory.newRecordInstance(LocalResource.class); - resource.setResource(resourceURL); - resource.setSize(-1); - 
resource.setVisibility(LocalResourceVisibility.APPLICATION); - resource.setType(LocalResourceType.FILE); - resource.setTimestamp(file.lastModified()); - Map localResources = - new HashMap(); - localResources.put(symLink, resource); - return localResources; - } - // Start the container // While the container is running, localize new resources. // Verify the symlink is created properly @@ -984,41 +959,6 @@ public Boolean get() { } } - private void checkResourceLocalized(ContainerId containerId, String symLink) { - String appId = - containerId.getApplicationAttemptId().getApplicationId().toString(); - File userCacheDir = new File(localDir, ContainerLocalizer.USERCACHE); - File userDir = new File(userCacheDir, user); - File appCache = new File(userDir, ContainerLocalizer.APPCACHE); - // localDir/usercache/nobody/appcache/application_0_0000 - File appDir = new File(appCache, appId); - // localDir/usercache/nobody/appcache/application_0_0000/container_0_0000_01_000000 - File containerDir = new File(appDir, containerId.toString()); - // localDir/usercache/nobody/appcache/application_0_0000/container_0_0000_01_000000/symLink1 - File targetFile = new File(containerDir, symLink); - - File sysDir = - new File(localDir, ResourceLocalizationService.NM_PRIVATE_DIR); - // localDir/nmPrivate/application_0_0000 - File appSysDir = new File(sysDir, appId); - // localDir/nmPrivate/application_0_0000/container_0_0000_01_000000 - File containerSysDir = new File(appSysDir, containerId.toString()); - - Assert.assertTrue("AppDir " + appDir.getAbsolutePath() + " doesn't exist!!", - appDir.exists()); - Assert.assertTrue( - "AppSysDir " + appSysDir.getAbsolutePath() + " doesn't exist!!", - appSysDir.exists()); - Assert.assertTrue( - "containerDir " + containerDir.getAbsolutePath() + " doesn't exist !", - containerDir.exists()); - Assert.assertTrue("containerSysDir " + containerSysDir.getAbsolutePath() - + " doesn't exist !", containerDir.exists()); - Assert.assertTrue( - "targetFile " + 
targetFile.getAbsolutePath() + " doesn't exist !!", - targetFile.exists()); - } - @Test public void testLocalFilesCleanup() throws InterruptedException, IOException, YarnException { diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/TestContainerManagerRecovery.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/TestContainerManagerRecovery.java index 633bb6d..a7f7a3a 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/TestContainerManagerRecovery.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/TestContainerManagerRecovery.java @@ -40,6 +40,7 @@ import java.util.List; import java.util.Map; +import com.google.common.base.Supplier; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileContext; import org.apache.hadoop.fs.Path; @@ -48,10 +49,13 @@ import org.apache.hadoop.net.ServerSocketUtil; import org.apache.hadoop.security.Credentials; import org.apache.hadoop.security.UserGroupInformation; +import org.apache.hadoop.test.GenericTestUtils; import org.apache.hadoop.util.Shell; import org.apache.hadoop.yarn.api.protocolrecords.GetContainerStatusesRequest; import org.apache.hadoop.yarn.api.protocolrecords.IncreaseContainersResourceRequest; import org.apache.hadoop.yarn.api.protocolrecords.IncreaseContainersResourceResponse; +import org.apache.hadoop.yarn.api.protocolrecords.ResourceLocalizationRequest; +import org.apache.hadoop.yarn.api.protocolrecords.ResourceLocalizationResponse; import org.apache.hadoop.yarn.api.protocolrecords.StartContainerRequest; import 
org.apache.hadoop.yarn.api.protocolrecords.StartContainersRequest; import org.apache.hadoop.yarn.api.protocolrecords.StartContainersResponse; @@ -92,6 +96,7 @@ import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container; import org.apache.hadoop.yarn.server.nodemanager.containermanager.launcher.ContainersLauncher; import org.apache.hadoop.yarn.server.nodemanager.containermanager.launcher.ContainersLauncherEvent; +import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.ContainerLocalizer; import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.ResourceLocalizationService; import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.LocalizationEvent; import org.apache.hadoop.yarn.server.nodemanager.containermanager.loghandler.LogHandler; @@ -470,6 +475,138 @@ public void testContainerResizeRecovery() throws Exception { } @Test + public void testContainerResourceLocalizationRecovery() throws Exception { + conf.setBoolean(YarnConfiguration.NM_RECOVERY_ENABLED, true); + conf.setBoolean(YarnConfiguration.NM_RECOVERY_SUPERVISED, true); + NMStateStoreService stateStore = new NMMemoryStateStoreService(); + stateStore.init(conf); + stateStore.start(); + Context context = createContext(conf, stateStore); + ((NodeManager.NMContext)context).setContainerExecutor(exec); + ContainerManagerImpl cm = createContainerManager(context, delSrvc); + cm.init(conf); + cm.start(); + // add an application by starting a container + ApplicationId appId = ApplicationId.newInstance(0, 1); + ApplicationAttemptId attemptId = + ApplicationAttemptId.newInstance(appId, 1); + final ContainerId cid = ContainerId.newContainerId(attemptId, 1); + Map containerEnv = Collections.emptyMap(); + Map serviceData = Collections.emptyMap(); + Credentials containerCreds = new Credentials(); + DataOutputBuffer dob = new DataOutputBuffer(); + containerCreds.writeTokenStorageToStream(dob); + ByteBuffer containerTokens = 
ByteBuffer.wrap(dob.getData(), 0, + dob.getLength()); + Map acls = Collections.emptyMap(); + + File scriptFile = Shell.appendScriptExtension(tmpDir, "scriptFile"); + PrintWriter fileWriter = new PrintWriter(scriptFile); + if (Shell.WINDOWS) { + fileWriter.println("@ping -n 100 127.0.0.1 >nul"); + } else { + fileWriter.write("\numask 0"); + fileWriter.write("\nexec sleep 1000000"); + } + fileWriter.close(); + FileContext localFS = FileContext.getLocalFSFileContext(); + URL resource_alpha = + URL.fromPath(localFS + .makeQualified(new Path(scriptFile.getAbsolutePath()))); + LocalResource rsrc_alpha = RecordFactoryProvider + .getRecordFactory(null).newRecordInstance(LocalResource.class); + rsrc_alpha.setResource(resource_alpha); + rsrc_alpha.setSize(-1); + rsrc_alpha.setVisibility(LocalResourceVisibility.APPLICATION); + rsrc_alpha.setType(LocalResourceType.FILE); + rsrc_alpha.setTimestamp(scriptFile.lastModified()); + String destinationFile = "dest_file"; + Map localResources = new HashMap<>(); + localResources.put(destinationFile, rsrc_alpha); + List commands = + Arrays.asList(Shell.getRunScriptCommand(scriptFile)); + ContainerLaunchContext clc = ContainerLaunchContext.newInstance( + localResources, containerEnv, commands, serviceData, + containerTokens, acls); + StartContainersResponse startResponse = startContainer( + context, cm, cid, clc, null); + assertTrue(startResponse.getFailedRequests().isEmpty()); + assertEquals(1, context.getApplications().size()); + Application app = context.getApplications().get(appId); + assertNotNull(app); + // make sure the container reaches RUNNING state + waitForNMContainerState(cm, cid, + org.apache.hadoop.yarn.server.nodemanager + .containermanager.container.ContainerState.RUNNING); + // localize resource when container is running + localizeContainerResource(context, cm, cid, "newResourceFile1", "symLink1"); + this.user = cid.getApplicationAttemptId().toString(); + // Verify resource is localized and symlink is created. 
+ GenericTestUtils.waitFor(new Supplier<Boolean>() { + public Boolean get() { + try { + checkResourceLocalized(cid, "symLink1"); + return true; + } catch (Throwable e) { + return false; + } + } + }, 500, 20000); + + // restart and localize a new resource + cm.stop(); + context = createContext(conf, stateStore); + ((NodeManager.NMContext)context).setContainerExecutor(exec); + cm = createContainerManager(context, delSrvc); + cm.init(conf); + cm.start(); + assertEquals(1, context.getApplications().size()); + app = context.getApplications().get(appId); + assertNotNull(app); + Container container = context.getContainers().get(cid); + assertEquals(org.apache.hadoop.yarn.server.nodemanager + .containermanager.container.ContainerState.RUNNING, + container.getContainerState()); + localizeContainerResource(context, cm, cid, "newResourceFile2", "symLink2"); + // Verify resource is localized and symlink is created. + GenericTestUtils.waitFor(new Supplier<Boolean>() { + public Boolean get() { + try { + checkResourceLocalized(cid, "symLink1"); + checkResourceLocalized(cid, "symLink2"); + return true; + } catch (Throwable e) { + return false; + } + } + }, 500, 20000); + cm.stop(); + } + + private ResourceLocalizationResponse localizeContainerResource( + Context context, final ContainerManagerImpl cm, final ContainerId cid, + final String resource, final String symLink) throws Exception { + Map<String, LocalResource> localResource = + setupLocalResources(resource, symLink); + final ResourceLocalizationRequest request = + ResourceLocalizationRequest.newInstance(cid, localResource); + UserGroupInformation user = UserGroupInformation.createRemoteUser( + cid.getApplicationAttemptId().toString()); + NMTokenIdentifier nmToken = new NMTokenIdentifier( + cid.getApplicationAttemptId(), context.getNodeId(), + user.getShortUserName(), + context.getNMTokenSecretManager().getCurrentKey().getKeyId()); + user.addTokenIdentifier(nmToken); + return user.doAs( + new PrivilegedExceptionAction<ResourceLocalizationResponse>() { + @Override + public ResourceLocalizationResponse 
run() throws Exception { + return cm.localize(request); + } + }); + } + + @Test public void testContainerCleanupOnShutdown() throws Exception { ApplicationId appId = ApplicationId.newInstance(0, 1); ApplicationAttemptId attemptId =