diff --git hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FileContext.java hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FileContext.java index d878d17..2323650 100644 --- hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FileContext.java +++ hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FileContext.java @@ -161,7 +161,7 @@ @InterfaceAudience.Public @InterfaceStability.Evolving /*Evolving for a release,to be changed to Stable */ -public final class FileContext { +public class FileContext { public static final Log LOG = LogFactory.getLog(FileContext.class); /** diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/DefaultContainerExecutor.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/DefaultContainerExecutor.java index a7af1c5..4ce3804 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/DefaultContainerExecutor.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/DefaultContainerExecutor.java @@ -96,12 +96,28 @@ public synchronized void startLocalizer(Path nmPrivateContainerTokensPath, createAppLogDirs(appId, logDirs); // TODO: Why pick first app dir. The same in LCE why not random? - Path appStorageDir = getFirstApplicationDir(localDirs, user, appId); + Path appStorageDir = null; String tokenFn = String.format(ContainerLocalizer.TOKEN_FILE_NAME_FMT, locId); - Path tokenDst = new Path(appStorageDir, tokenFn); - lfs.util().copy(nmPrivateContainerTokensPath, tokenDst); - LOG.info("Copying from " + nmPrivateContainerTokensPath + " to " + tokenDst); + for (int i = 0; i < localDirs.size(); ++i) { + Path curBase = getApplicationDir(new Path(localDirs.get(i)), + user, appId); + Path tokenDst = new Path(curBase, tokenFn); + try { + lfs.util().copy(nmPrivateContainerTokensPath, tokenDst); + LOG.info("Copying from " + nmPrivateContainerTokensPath + + " to " + tokenDst); + appStorageDir = curBase; + break; + } catch (IOException e) { + LOG.info("Failed to copy " + nmPrivateContainerTokensPath + + " to " + tokenDst); + } + } + if (appStorageDir == null) { + throw new IOException("Not able to find a working directory " + + "in any of the configured local directories to store tokens"); + } lfs.setWorkingDirectory(appStorageDir); LOG.info("CWD set to " + appStorageDir + " = " + lfs.getWorkingDirectory()); // TODO: DO it over RPC for maintaining similarity? @@ -449,11 +465,6 @@ public void deleteAsUser(String user, Path subDir, Path... baseDirs) * $logdir/$user/$appId */ static final short LOGDIR_PERM = (short)0710; - private Path getFirstApplicationDir(List localDirs, String user, - String appId) { - return getApplicationDir(new Path(localDirs.get(0)), user, appId); - } - private Path getApplicationDir(Path base, String user, String appId) { return new Path(getAppcacheDir(base, user), appId); } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestDefaultContainerExecutor.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestDefaultContainerExecutor.java index 9c86c71..75cab95 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestDefaultContainerExecutor.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestDefaultContainerExecutor.java @@ -27,6 +27,7 @@ import static org.mockito.Mockito.doAnswer; import static org.junit.Assert.assertTrue; +import java.io.DataOutputStream; import java.io.File; import java.io.FileNotFoundException; import java.io.FileReader; @@ -41,14 +42,6 @@ import java.util.List; import java.util.Random; -import org.apache.hadoop.fs.FileUtil; -import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; -import org.apache.hadoop.yarn.api.records.ApplicationId; -import org.apache.hadoop.yarn.api.records.ContainerId; -import org.apache.hadoop.yarn.api.records.ContainerLaunchContext; -import org.apache.hadoop.yarn.conf.YarnConfiguration; -import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container; -import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerDiagnosticsUpdateEvent; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.AbstractFileSystem; @@ -57,20 +50,29 @@ import org.apache.hadoop.fs.FSDataOutputStream; import org.apache.hadoop.fs.FileContext; import org.apache.hadoop.fs.FileStatus; +import org.apache.hadoop.fs.FileUtil; import org.apache.hadoop.fs.Options.CreateOpts; import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.permission.FsPermission; import org.apache.hadoop.io.DataInputBuffer; import org.apache.hadoop.io.DataOutputBuffer; +import org.apache.hadoop.security.Credentials; import org.apache.hadoop.util.Progressable; +import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; +import org.apache.hadoop.yarn.api.records.ApplicationId; +import org.apache.hadoop.yarn.api.records.ContainerId; +import org.apache.hadoop.yarn.api.records.ContainerLaunchContext; +import org.apache.hadoop.yarn.conf.YarnConfiguration; +import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container; +import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerDiagnosticsUpdateEvent; import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.ContainerLocalizer; import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.FakeFSDataInputStream; +import org.junit.After; import org.junit.AfterClass; +import org.junit.Assert; import org.junit.Before; import org.junit.Test; -import org.junit.After; -import org.junit.Assert; import org.mockito.invocation.InvocationOnMock; import org.mockito.stubbing.Answer; @@ -296,6 +298,89 @@ public Object answer(InvocationOnMock invocationOnMock) } } + @Test(timeout = 30000) + public void testStartLocalizer() + throws IOException, InterruptedException { + InetSocketAddress localizationServerAddress; + final Path firstDir = new Path(BASE_TMP_PATH, "localDir1"); + List localDirs = new ArrayList(); + final Path secondDir = new Path(BASE_TMP_PATH, "localDir2"); + List logDirs = new ArrayList(); + final Path logDir = new Path(BASE_TMP_PATH, "logDir"); + final Path tokenDir = new Path(BASE_TMP_PATH, "tokenDir"); + FsPermission perms = new FsPermission((short)0770); + + Configuration conf = new Configuration(); + localizationServerAddress = conf.getSocketAddr( + YarnConfiguration.NM_BIND_HOST, + YarnConfiguration.NM_LOCALIZER_ADDRESS, + YarnConfiguration.DEFAULT_NM_LOCALIZER_ADDRESS, + YarnConfiguration.DEFAULT_NM_LOCALIZER_PORT); + + final FileContext mockLfs = spy(FileContext.getLocalFSFileContext(conf)); + final FileContext.Util mockUtil = spy(mockLfs.util()); + doAnswer(new Answer() { + @Override + public Object answer(InvocationOnMock invocationOnMock) + throws Throwable { + return mockUtil; + } + }).when(mockLfs).util(); + doAnswer(new Answer() { + @Override + public Object answer(InvocationOnMock invocationOnMock) + throws Throwable { + Path dest = (Path) invocationOnMock.getArguments()[1]; + if (dest.toString().contains(firstDir.toString())) { + // throw an Exception when copy token to the first local dir + // to simulate no space on the first drive + throw new IOException("No space on this drive " + + dest.toString()); + } else { + // copy token to the second local dir + DataOutputStream tokenOut = null; + try { + Credentials credentials = new Credentials(); + tokenOut = mockLfs.create(dest, + EnumSet.of(CREATE, OVERWRITE)); + credentials.writeTokenStorageToStream(tokenOut); + } finally { + if (tokenOut != null) { + tokenOut.close(); + } + } + } + return null; + } + }).when(mockUtil).copy(any(Path.class), any(Path.class)); + + DefaultContainerExecutor mockExec = spy(new DefaultContainerExecutor( + mockLfs)); + mockExec.setConf(conf); + localDirs.add(mockLfs.makeQualified(firstDir).toString()); + localDirs.add(mockLfs.makeQualified(secondDir).toString()); + logDirs.add(mockLfs.makeQualified(logDir).toString()); + conf.setStrings(YarnConfiguration.NM_LOCAL_DIRS, + localDirs.toArray(new String[localDirs.size()])); + conf.set(YarnConfiguration.NM_LOG_DIRS, logDir.toString()); + mockLfs.mkdir(tokenDir, perms, true); + Path nmPrivateCTokensPath = new Path(tokenDir, "test.tokens"); + String appSubmitter = "nobody"; + String appId = "APP_ID"; + String locId = "LOC_ID"; + try { + mockExec.startLocalizer(nmPrivateCTokensPath, localizationServerAddress, + appSubmitter, appId, locId, localDirs, logDirs); + } catch (IOException e) { + Assert.assertTrue("StartLocalizer failed to copy token file " + e, + false); + } finally { + mockExec.deleteAsUser(appSubmitter, firstDir); + mockExec.deleteAsUser(appSubmitter, secondDir); + mockExec.deleteAsUser(appSubmitter, logDir); + deleteTmpFiles(); + } + } // @Test // public void testInit() throws IOException, InterruptedException { // Configuration conf = new Configuration();