diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/DefaultContainerExecutor.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/DefaultContainerExecutor.java index 13ad9ac..16512ff 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/DefaultContainerExecutor.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/DefaultContainerExecutor.java @@ -64,6 +64,11 @@ import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Optional; +/** + * The {@code DefaultContainerExecuter} class offers generic container + * execution services. Process execution is handled in a platform-independent + * way via {@link ProcessBuilder}. + */ public class DefaultContainerExecutor extends ContainerExecutor { private static final Log LOG = LogFactory @@ -71,32 +76,63 @@ private static final int WIN_MAX_PATH = 260; - protected final FileContext lfs; + /** + * A {@link FileContext} for the local file system. + */ + protected final FileContext LFS; private String logDirPermissions = null; + /** + * Default constructor to allow for creation via reflection. + */ public DefaultContainerExecutor() { try { - this.lfs = FileContext.getLocalFSFileContext(); + this.LFS = FileContext.getLocalFSFileContext(); } catch (UnsupportedFileSystemException e) { throw new RuntimeException(e); } } + /** + * Create an instance with a given {@link FileContext}. + * + * @param lfs the given {@link FileContext} + */ DefaultContainerExecutor(FileContext lfs) { - this.lfs = lfs; + this.LFS = lfs; } + /** + * Copy a file using the {@link LFS} {@link FileContext}. + * + * @param src the file to copy + * @param dst where to copy the file + * @param owner the owner of the new copy. Ignored except in secure Windows + * clusters + * @throws IOException when the copy fails + * @see WindowsSecureContainerExecutor + */ protected void copyFile(Path src, Path dst, String owner) throws IOException { - lfs.util().copy(src, dst); + LFS.util().copy(src, dst); } - protected void setScriptExecutable(Path script, String owner) throws IOException { - lfs.setPermission(script, ContainerExecutor.TASK_LAUNCH_SCRIPT_PERMISSION); + /** + * Make a file executable using the {@link LFS} {@link FileContext}. + * + * @param script the path make executable + * @param owner the new owner for the file. Ignored except in secure Windows + * clusters + * @throws IOException when the change mode operation fails + * @see WindowsSecureContainerExecutor + */ + protected void setScriptExecutable(Path script, String owner) + throws IOException { + LFS.setPermission(script, ContainerExecutor.TASK_LAUNCH_SCRIPT_PERMISSION); } @Override - public void init() throws IOException { + public void init() { // nothing to do or verify here } @@ -121,15 +157,17 @@ public void startLocalizer(LocalizerStartContext ctx) // randomly choose the local directory Path appStorageDir = getWorkingDir(localDirs, user, appId); - String tokenFn = String.format(ContainerLocalizer.TOKEN_FILE_NAME_FMT, locId); + String tokenFn = + String.format(ContainerLocalizer.TOKEN_FILE_NAME_FMT, locId); Path tokenDst = new Path(appStorageDir, tokenFn); copyFile(nmPrivateContainerTokensPath, tokenDst, user); - LOG.info("Copying from " + nmPrivateContainerTokensPath + " to " + tokenDst); + LOG.info("Copying from " + nmPrivateContainerTokensPath + + " to " + tokenDst); - FileContext localizerFc = FileContext.getFileContext( - lfs.getDefaultFileSystem(), getConf()); - localizerFc.setUMask(lfs.getUMask()); + FileContext localizerFc = + FileContext.getFileContext(LFS.getDefaultFileSystem(), getConf()); + localizerFc.setUMask(LFS.getUMask()); localizerFc.setWorkingDirectory(appStorageDir); LOG.info("Localizer CWD set to " + appStorageDir + " = " + localizerFc.getWorkingDirectory()); @@ -257,15 +295,17 @@ public int launchContainer(ContainerStartContext ctx) throws IOException { StringBuilder builder = new StringBuilder(); builder.append("Exception from container-launch.\n"); - builder.append("Container id: " + containerId + "\n"); - builder.append("Exit code: " + exitCode + "\n"); + builder.append("Container id: ").append(containerId).append("\n"); + builder.append("Exit code: ").append(exitCode).append("\n"); if (!Optional.fromNullable(e.getMessage()).or("").isEmpty()) { - builder.append("Exception message: " + e.getMessage() + "\n"); + builder.append("Exception message: "); + builder.append(e.getMessage()).append("\n"); } - builder.append("Stack trace: " - + StringUtils.stringifyException(e) + "\n"); + builder.append("Stack trace: "); + builder.append(StringUtils.stringifyException(e)).append("\n"); if (!shExec.getOutput().isEmpty()) { - builder.append("Shell output: " + shExec.getOutput() + "\n"); + builder.append("Shell output: "); + builder.append(shExec.getOutput()).append("\n"); } String diagnostics = builder.toString(); logOutput(diagnostics); @@ -282,10 +322,24 @@ public int launchContainer(ContainerStartContext ctx) throws IOException { return 0; } + /** + * Create a new {@link ShellCommandExecutor} using the parameters. + * + * @param wrapperScriptPath the path to the script to execute + * @param containerIdStr the container ID + * @param user the application owner's username + * @param pidFile the path to the container's PID file + * @param resource this parameter controls memory and CPU limits. + * @param wordDir If not-null, specifies the directory which should be set + * as the current working directory for the command. If null, + * the current working directory is not modified. + * @param environment the container environment + * @return the new {@link ShellCommandExecutor} + * @see ShellCommandExecutor + */ protected CommandExecutor buildCommandExecutor(String wrapperScriptPath, String containerIdStr, String user, Path pidFile, Resource resource, - File wordDir, Map environment) - throws IOException { + File wordDir, Map environment) { String[] command = getRunCommand(wrapperScriptPath, containerIdStr, user, pidFile, this.getConf(), resource); @@ -299,6 +353,14 @@ protected CommandExecutor buildCommandExecutor(String wrapperScriptPath, false); } + /** + * Create a {@link LocalWrapperScriptBuilder} for the given container ID + * and path that is appropriate to the current platform. + * + * @param containerIdStr the container ID + * @param containerWorkDir the container's working directory + * @return a new {@link LocalWrapperScriptBuilder} + */ protected LocalWrapperScriptBuilder getLocalWrapperScriptBuilder( String containerIdStr, Path containerWorkDir) { return Shell.WINDOWS ? @@ -306,20 +368,39 @@ protected LocalWrapperScriptBuilder getLocalWrapperScriptBuilder( new UnixLocalWrapperScriptBuilder(containerWorkDir); } + /** + * This class is a utility to create a wrapper script that is platform + * appropriate. + */ protected abstract class LocalWrapperScriptBuilder { private final Path wrapperScriptPath; + /** + * Return the path for the wrapper script. + * + * @return the path for the wrapper script + */ public Path getWrapperScriptPath() { return wrapperScriptPath; } - public void writeLocalWrapperScript(Path launchDst, Path pidFile) throws IOException { + /** + * Write out the wrapper script for the container launch script. This method + * will create the script at the configured wrapper script path. + * + * @param launchDst the script to launch + * @param pidFile the file that will hold the PID + * @throws IOException if the wrapper script cannot be created + * @see #getWrapperScriptPath + */ + public void writeLocalWrapperScript(Path launchDst, Path pidFile) + throws IOException { DataOutputStream out = null; PrintStream pout = null; try { - out = lfs.create(wrapperScriptPath, EnumSet.of(CREATE, OVERWRITE)); + out = LFS.create(wrapperScriptPath, EnumSet.of(CREATE, OVERWRITE)); pout = new PrintStream(out, false, "UTF-8"); writeLocalWrapperScript(launchDst, pidFile, pout); } finally { @@ -327,19 +408,40 @@ public void writeLocalWrapperScript(Path launchDst, Path pidFile) throws IOExcep } } - protected abstract void writeLocalWrapperScript(Path launchDst, Path pidFile, - PrintStream pout); - + /** + * Write out the wrapper script for the container launch script. + * + * @param launchDst the script to launch + * @param pidFile the file that will hold the PID + * @param pout the stream to use to write out the wrapper script + */ + protected abstract void writeLocalWrapperScript(Path launchDst, + Path pidFile, PrintStream pout); + + /** + * Create an instance for the given container working directory. + * + * @param containerWorkDir the working directory for the container + */ protected LocalWrapperScriptBuilder(Path containerWorkDir) { this.wrapperScriptPath = new Path(containerWorkDir, Shell.appendScriptExtension("default_container_executor")); } } + /** + * This class is an instance of {@link LocalWrapperScriptBuilder} for + * non-Windows hosts. + */ private final class UnixLocalWrapperScriptBuilder extends LocalWrapperScriptBuilder { private final Path sessionScriptPath; + /** + * Create an instance for the given container path. + * + * @param containerWorkDir the container's working directory + */ public UnixLocalWrapperScriptBuilder(Path containerWorkDir) { super(containerWorkDir); this.sessionScriptPath = new Path(containerWorkDir, @@ -372,7 +474,7 @@ private void writeSessionScript(Path launchDst, Path pidFile) DataOutputStream out = null; PrintStream pout = null; try { - out = lfs.create(sessionScriptPath, EnumSet.of(CREATE, OVERWRITE)); + out = LFS.create(sessionScriptPath, EnumSet.of(CREATE, OVERWRITE)); pout = new PrintStream(out, false, "UTF-8"); // We need to do a move as writing to a file is not atomic // Process reading a file being written to may get garbled data @@ -383,20 +485,30 @@ private void writeSessionScript(Path launchDst, Path pidFile) pout.println("/bin/mv -f " + pidFile.toString() + ".tmp " + pidFile); String exec = Shell.isSetsidAvailable? "exec setsid" : "exec"; pout.println(exec + " /bin/bash \"" + - launchDst.toUri().getPath().toString() + "\""); + launchDst.toUri().getPath() + "\""); } finally { IOUtils.cleanup(LOG, pout, out); } - lfs.setPermission(sessionScriptPath, + LFS.setPermission(sessionScriptPath, ContainerExecutor.TASK_LAUNCH_SCRIPT_PERMISSION); } } + /** + * This class is an instance of {@link LocalWrapperScriptBuilder} for + * Windows hosts. + */ private final class WindowsLocalWrapperScriptBuilder extends LocalWrapperScriptBuilder { private final String containerIdStr; + /** + * Create an instance for the given container and working directory. + * + * @param containerIdStr the container ID + * @param containerWorkDir the container's working directory + */ public WindowsLocalWrapperScriptBuilder(String containerIdStr, Path containerWorkDir) { @@ -457,6 +569,7 @@ public boolean isContainerAlive(ContainerLivenessContext ctx) * * @param pid String pid * @return boolean true if the process is alive + * @throws IOException if the command to test process liveliness fails */ @VisibleForTesting public static boolean containerIsAlive(String pid) throws IOException { @@ -478,6 +591,7 @@ public static boolean containerIsAlive(String pid) throws IOException { * @param pid the pid of the process [group] to signal. * @param signal signal to send * (for logging). + * @throws IOException if the command to kill the process fails */ protected void killContainer(String pid, Signal signal) throws IOException { new ShellCommandExecutor(Shell.getSignalKillCommand(signal.getValue(), pid)) @@ -492,7 +606,7 @@ public void deleteAsUser(DeletionAsUserContext ctx) if (baseDirs == null || baseDirs.size() == 0) { LOG.info("Deleting absolute path : " + subDir); - if (!lfs.delete(subDir, true)) { + if (!LFS.delete(subDir, true)) { //Maybe retry LOG.warn("delete returned false for path: [" + subDir + "]"); } @@ -502,7 +616,7 @@ public void deleteAsUser(DeletionAsUserContext ctx) Path del = subDir == null ? baseDir : new Path(baseDir, subDir); LOG.info("Deleting path : " + del); try { - if (!lfs.delete(del, true)) { + if (!LFS.delete(del, true)) { LOG.warn("delete returned false for path: [" + del + "]"); } } catch (FileNotFoundException e) { @@ -511,21 +625,29 @@ public void deleteAsUser(DeletionAsUserContext ctx) } } - /** Permissions for user dir. - * $local.dir/usercache/$user */ + /** + * Permissions for user dir. + * $local.dir/usercache/$user + */ static final short USER_PERM = (short)0750; - /** Permissions for user appcache dir. - * $local.dir/usercache/$user/appcache */ + /** + * Permissions for user appcache dir. + * $local.dir/usercache/$user/appcache + */ static final short APPCACHE_PERM = (short)0710; - /** Permissions for user filecache dir. - * $local.dir/usercache/$user/filecache */ + /** + * Permissions for user filecache dir. + * $local.dir/usercache/$user/filecache + */ static final short FILECACHE_PERM = (short)0710; - /** Permissions for user app dir. - * $local.dir/usercache/$user/appcache/$appId */ + /** + * Permissions for user app dir. + * $local.dir/usercache/$user/appcache/$appId + */ static final short APPDIR_PERM = (short)0710; private long getDiskFreeSpace(Path base) throws IOException { - return lfs.getFsStatus(base).getRemaining(); + return LFS.getFsStatus(base).getRemaining(); } private Path getApplicationDir(Path base, String user, String appId) { @@ -546,9 +668,20 @@ private Path getFileCacheDir(Path base, String user) { ContainerLocalizer.FILECACHE); } + /** + * Return a randomly chosen application directory from a list of local storage + * directories. The probability of selecting a directory is proportional to + * its size. + * + * @param localDirs the target directories from which to select + * @param user the user who owns the application + * @param appId the application ID + * @return the selected directory + * @throws IOException if no application directories for the user can be + * found + */ protected Path getWorkingDir(List localDirs, String user, String appId) throws IOException { - Path appStorageDir = null; long totalAvailable = 0L; long[] availableOnDisk = new long[localDirs.size()]; int i = 0; @@ -557,8 +690,7 @@ protected Path getWorkingDir(List localDirs, String user, // the available space on the directory. // firstly calculate the sum of all available space on these directories for (String localDir : localDirs) { - Path curBase = getApplicationDir(new Path(localDir), - user, appId); + Path curBase = getApplicationDir(new Path(localDir), user, appId); long space = 0L; try { space = getDiskFreeSpace(curBase); @@ -571,8 +703,7 @@ protected Path getWorkingDir(List localDirs, String user, // throw an IOException if totalAvailable is 0. if (totalAvailable <= 0L) { - throw new IOException("Not able to find a working directory for " - + user); + throw new IOException("Not able to find a working directory for " + user); } // make probability to pick a directory proportional to @@ -589,17 +720,26 @@ protected Path getWorkingDir(List localDirs, String user, while (randomPosition > availableOnDisk[dir]) { randomPosition -= availableOnDisk[dir++]; } - appStorageDir = getApplicationDir(new Path(localDirs.get(dir)), - user, appId); - return appStorageDir; + return getApplicationDir(new Path(localDirs.get(dir)), user, appId); } + /** + * Use the {@link LFS} {@link FileContext} to create the target directory. + * + * @param dirPath the target directory + * @param perms the target permissions for the target directory + * @param createParent whether the parent directories should also be created + * @param user the user as whom the target directory should be created. + * Ignored except on secure Windows hosts. + * @throws IOException + * @see WindowsSecureContainerExecutor + */ protected void createDir(Path dirPath, FsPermission perms, boolean createParent, String user) throws IOException { - lfs.mkdir(dirPath, perms, createParent); - if (!perms.equals(perms.applyUMask(lfs.getUMask()))) { - lfs.setPermission(dirPath, perms); + LFS.mkdir(dirPath, perms, createParent); + if (!perms.equals(perms.applyUMask(LFS.getUMask()))) { + LFS.setPermission(dirPath, perms); } } @@ -608,6 +748,9 @@ protected void createDir(Path dirPath, FsPermission perms, *
    .mkdir *
  • $local.dir/usercache/$user
  • *
+ * + * @param localDirs the target directories to create + * @param user the user whose local cache directories should be initialized */ void createUserLocalDirs(List localDirs, String user) throws IOException { @@ -616,7 +759,8 @@ void createUserLocalDirs(List localDirs, String user) for (String localDir : localDirs) { // create $local.dir/usercache/$user and its immediate parent try { - createDir(getUserCacheDir(new Path(localDir), user), userperms, true, user); + createDir(getUserCacheDir(new Path(localDir), user), userperms, true, + user); } catch (IOException e) { LOG.warn("Unable to create the user directory : " + localDir, e); continue; @@ -637,6 +781,9 @@ void createUserLocalDirs(List localDirs, String user) *
  • $local.dir/usercache/$user/appcache
  • *
  • $local.dir/usercache/$user/filecache
  • * + * + * @param localDirs the target directories to create + * @param user the user whose local cache directories should be initialized */ void createUserCacheDirs(List localDirs, String user) throws IOException { @@ -683,7 +830,10 @@ void createUserCacheDirs(List localDirs, String user) *
      *
    • $local.dir/usercache/$user/appcache/$appid
    • *
    - * @param localDirs + * + * @param localDirs the target directories to create + * @param user the user whose local cache directories should be initialized + * @param appId the application ID */ void createAppDirs(List localDirs, String user, String appId) throws IOException { @@ -708,6 +858,10 @@ void createAppDirs(List localDirs, String user, String appId) /** * Create application log directories on all disks. + * + * @param appId the application ID + * @param logDirs the target directories to create + * @param user the user whose local cache directories should be initialized */ void createAppLogDirs(String appId, List logDirs, String user) throws IOException { @@ -734,6 +888,12 @@ void createAppLogDirs(String appId, List logDirs, String user) /** * Create application log directories on all disks. + * + * @param appId the application ID + * @param containerId the container ID + * @param logDirs the target directories to create + * @param user the user as whom the directories should be created. + * Ignored except on secure Windows hosts. */ void createContainerLogDirs(String appId, String containerId, List logDirs, String user) throws IOException { @@ -763,7 +923,9 @@ void createContainerLogDirs(String appId, String containerId, } /** - * Return default container log directory permissions. + * Return the default container log directory permissions. + * + * @return the default container log directory permissions */ @VisibleForTesting public String getLogDirPermissions() { @@ -784,10 +946,12 @@ public void clearLogDirPermissions() { } /** + * Return the list of paths of given local directories. + * * @return the list of paths of given local directories */ private static List getPaths(List dirs) { - List paths = new ArrayList(dirs.size()); + List paths = new ArrayList<>(dirs.size()); for (int i = 0; i < dirs.size(); i++) { paths.add(new Path(dirs.get(i))); } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/WindowsSecureContainerExecutor.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/WindowsSecureContainerExecutor.java index c75ecb1..886c53f 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/WindowsSecureContainerExecutor.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/WindowsSecureContainerExecutor.java @@ -613,7 +613,7 @@ protected void createDir(Path dirPath, FsPermission perms, } super.createDir(dirPath, perms, createParent, owner); - lfs.setOwner(dirPath, owner, nodeManagerGroup); + LFS.setOwner(dirPath, owner, nodeManagerGroup); } @Override @@ -732,7 +732,7 @@ public void startLocalizer(LocalizerStartContext ctx) throws IOException, @Override protected CommandExecutor buildCommandExecutor(String wrapperScriptPath, String containerIdStr, String userName, Path pidFile, Resource resource, - File wordDir, Map environment) throws IOException { + File wordDir, Map environment) { return new WintuilsProcessStubExecutor( wordDir.toString(), containerIdStr, userName, pidFile.toString(),