diff --git src/contrib/streaming/src/test/org/apache/hadoop/streaming/TestStreamingAsDifferentUser.java src/contrib/streaming/src/test/org/apache/hadoop/streaming/TestStreamingAsDifferentUser.java
index 8e6842e..8439824 100644
--- src/contrib/streaming/src/test/org/apache/hadoop/streaming/TestStreamingAsDifferentUser.java
+++ src/contrib/streaming/src/test/org/apache/hadoop/streaming/TestStreamingAsDifferentUser.java
@@ -65,7 +65,7 @@ public class TestStreamingAsDifferentUser extends
"stream.tmpdir=" + System.getProperty("test.build.data", "/tmp") };
StreamJob streamJob = new StreamJob(args, true);
streamJob.setConf(myConf);
- streamJob.go();
+ assertTrue("Job has not succeeded", streamJob.go() == 0);
assertOwnerShip(outputPath);
}
}
diff --git src/docs/src/documentation/content/xdocs/cluster_setup.xml src/docs/src/documentation/content/xdocs/cluster_setup.xml
index 28350fe..ef6e888 100644
--- src/docs/src/documentation/content/xdocs/cluster_setup.xml
+++ src/docs/src/documentation/content/xdocs/cluster_setup.xml
@@ -649,11 +649,13 @@
distribution. The task tracker uses this executable to
launch and kill tasks. The setuid executable switches to
the user who has submitted the job and launches or kills
- the tasks. Currently, this task controller
- opens up permissions to local files and directories used
- by the tasks such as the job jar files, distributed archive
- files, intermediate files and task log files. In future,
- it is expected that stricter file permissions are used.
+ the tasks. For maximum security, this task controller
+ sets up restricted permissions and user/group ownership of
+ local files and directories used by the tasks such as the
+ job jar files, intermediate files and task log files. Currently,
+ permissions on distributed cache files are opened up to be
+ accessible by all users. In the future, it is expected that stricter
+ file permissions will be set for these files too.
@@ -697,6 +699,10 @@
The executable must be deployed as a setuid executable, by changing
the ownership to root and giving it permissions 4755.
+ The executable must be group-owned by the group of the user who runs
+ the TaskTracker. For security reasons, it is *required* that the user
+ running the TaskTracker be the sole member of the group to which he/she
+ belongs.
The executable requires a configuration file called
@@ -721,10 +727,10 @@
- The LinuxTaskController requires that paths leading up to
+ The LinuxTaskController requires that paths including and leading up to
the directories specified in
- mapred.local.dir and hadoop.log.dir to be 755
- and directories themselves having 777 permissions.
+ mapred.local.dir and hadoop.log.dir have 755
+ permissions.
diff --git src/java/org/apache/hadoop/mapred/BackupStore.java src/java/org/apache/hadoop/mapred/BackupStore.java
index 52d0175..77724a3 100644
--- src/java/org/apache/hadoop/mapred/BackupStore.java
+++ src/java/org/apache/hadoop/mapred/BackupStore.java
@@ -19,7 +19,6 @@
package org.apache.hadoop.mapred;
import java.io.DataOutputStream;
-import java.io.EOFException;
import java.io.IOException;
import java.io.InputStream;
import java.util.Iterator;
@@ -548,10 +547,9 @@ public class BackupStore {
boolean isActive() { return isActive; }
private Writer createSpillFile() throws IOException {
- Path tmp = new Path(
- TaskTracker.getIntermediateOutputDir(
- tid.getJobID().toString(), tid.toString()) +
- "/backup_" + tid.getId() + "_" + (spillNumber++) + ".out");
+ Path tmp =
+ new Path(TaskTracker.OUTPUT + "/backup_" + tid.getId() + "_"
+ + (spillNumber++) + ".out");
LOG.info("Created file: " + tmp);
diff --git src/java/org/apache/hadoop/mapred/Child.java src/java/org/apache/hadoop/mapred/Child.java
index 3ea7dc8..02ae78c 100644
--- src/java/org/apache/hadoop/mapred/Child.java
+++ src/java/org/apache/hadoop/mapred/Child.java
@@ -36,6 +36,7 @@ import org.apache.hadoop.metrics.MetricsUtil;
import org.apache.hadoop.metrics.jvm.JvmMetrics;
import org.apache.log4j.LogManager;
import org.apache.hadoop.util.Shell;
+import org.apache.hadoop.util.StringUtils;
/**
* The main() for child processes.
@@ -138,6 +139,10 @@ class Child {
TaskLog.syncLogs(firstTaskid, taskid, isCleanup);
JobConf job = new JobConf(task.getJobFile());
+ // Set up the child's mapred-local-dir. The child is now sandboxed and
+ // can only see files under its attempt directory.
+ TaskRunner.setupChildMapredLocalDirs(task, job);
+
//setupWorkDir actually sets up the symlinks for the distributed
//cache. After a task exits we wipe the workdir clean, and hence
//the symlinks have to be rebuilt.
@@ -168,14 +173,15 @@ class Child {
LOG.fatal("FSError from child", e);
umbilical.fsError(taskid, e.getMessage());
} catch (Throwable throwable) {
- LOG.warn("Error running child", throwable);
+ LOG.warn("Error running child : "
+ + StringUtils.stringifyException(throwable));
try {
if (task != null) {
// do cleanup for the task
task.taskCleanup(umbilical);
}
} catch (Throwable th) {
- LOG.info("Error cleaning up" + th);
+ LOG.info("Error cleaning up : " + StringUtils.stringifyException(th));
}
// Report back any failures, for diagnostic purposes
ByteArrayOutputStream baos = new ByteArrayOutputStream();
diff --git src/java/org/apache/hadoop/mapred/DefaultTaskController.java src/java/org/apache/hadoop/mapred/DefaultTaskController.java
index 174403b..fd6b4aa 100644
--- src/java/org/apache/hadoop/mapred/DefaultTaskController.java
+++ src/java/org/apache/hadoop/mapred/DefaultTaskController.java
@@ -48,6 +48,8 @@ class DefaultTaskController extends TaskController {
*/
void launchTaskJVM(TaskController.TaskControllerContext context)
throws IOException {
+ initializeTask(context);
+
JvmEnv env = context.env;
List wrappedCommand =
TaskLog.captureOutAndError(env.setup, env.vargs, env.stdout, env.stderr,
@@ -72,20 +74,13 @@ class DefaultTaskController extends TaskController {
// So this is a dummy method.
return;
}
-
-
- @Override
- void setup() {
- // nothing to setup
- return;
- }
/*
* No need to do anything as we don't need to do as we dont need anything
* extra from what TaskTracker has done.
*/
@Override
- void initializeJob(JobID jobId) {
+ void initializeJob(JobInitializationContext context) {
}
@Override
diff --git src/java/org/apache/hadoop/mapred/IsolationRunner.java src/java/org/apache/hadoop/mapred/IsolationRunner.java
index edf9d0d..56dbe69 100644
--- src/java/org/apache/hadoop/mapred/IsolationRunner.java
+++ src/java/org/apache/hadoop/mapred/IsolationRunner.java
@@ -31,13 +31,8 @@ import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.LocalDirAllocator;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.BytesWritable;
-import org.apache.hadoop.io.SequenceFile;
import org.apache.hadoop.io.Text;
-import org.apache.hadoop.io.Writable;
-import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.mapred.JvmTask;
-import org.apache.hadoop.mapreduce.TaskType;
-import org.apache.hadoop.util.DiskChecker.DiskErrorException;
/**
* IsolationRunner is intended to facilitate debugging by re-running a specific
@@ -169,17 +164,24 @@ public class IsolationRunner {
// setup the local and user working directories
FileSystem local = FileSystem.getLocal(conf);
LocalDirAllocator lDirAlloc = new LocalDirAllocator("mapred.local.dir");
+
File workDirName = TaskRunner.formWorkDir(lDirAlloc, taskId, false, conf);
local.setWorkingDirectory(new Path(workDirName.toString()));
FileSystem.get(conf).setWorkingDirectory(conf.getWorkingDirectory());
// set up a classloader with the right classpath
- ClassLoader classLoader = makeClassLoader(conf, workDirName);
+ ClassLoader classLoader =
+ makeClassLoader(conf, new File(workDirName.toString()));
Thread.currentThread().setContextClassLoader(classLoader);
conf.setClassLoader(classLoader);
-
- Path localSplit = new Path(new Path(jobFilename.toString()).getParent(),
- "split.dta");
+
+ // split.dta file is used only by IsolationRunner. The file can now be in
+ // any of the configured local disks, so use LocalDirAllocator to find out
+ // where it is.
+ Path localSplit =
+ new LocalDirAllocator("mapred.local.dir").getLocalPathToRead(
+ TaskTracker.getLocalSplitFile(taskId.getJobID().toString(), taskId
+ .toString()), conf);
DataInputStream splitFile = FileSystem.getLocal(conf).open(localSplit);
String splitClass = Text.readString(splitFile);
BytesWritable split = new BytesWritable();
diff --git src/java/org/apache/hadoop/mapred/JobConf.java src/java/org/apache/hadoop/mapred/JobConf.java
index 5b5a478..bab43bb 100644
--- src/java/org/apache/hadoop/mapred/JobConf.java
+++ src/java/org/apache/hadoop/mapred/JobConf.java
@@ -1418,7 +1418,7 @@ public class JobConf extends Configuration {
* @return The localized job specific shared directory
*/
public String getJobLocalDir() {
- return get("job.local.dir");
+ return get(TaskTracker.JOB_LOCAL_DIR);
}
public long getMemoryForMapTask() {
diff --git src/java/org/apache/hadoop/mapred/JvmManager.java src/java/org/apache/hadoop/mapred/JvmManager.java
index c2a463b..36c1632 100644
--- src/java/org/apache/hadoop/mapred/JvmManager.java
+++ src/java/org/apache/hadoop/mapred/JvmManager.java
@@ -35,6 +35,7 @@ import org.apache.hadoop.mapred.TaskController.TaskControllerContext;
import org.apache.hadoop.mapred.TaskTracker.TaskInProgress;
import org.apache.hadoop.util.Shell.ShellCommandExecutor;
import org.apache.hadoop.util.ProcessTree;
+import org.apache.hadoop.util.StringUtils;
class JvmManager {
@@ -111,7 +112,8 @@ class JvmManager {
}
}
- public TaskInProgress getTaskForJvm(JVMId jvmId) {
+ public TaskInProgress getTaskForJvm(JVMId jvmId)
+ throws IOException {
if (jvmId.isMapJVM()) {
return mapJvmManager.getTaskForJvm(jvmId);
} else {
@@ -177,7 +179,8 @@ class JvmManager {
jvmIdToRunner.get(jvmId).setBusy(true);
}
- synchronized public TaskInProgress getTaskForJvm(JVMId jvmId) {
+ synchronized public TaskInProgress getTaskForJvm(JVMId jvmId)
+ throws IOException {
if (jvmToRunningTask.containsKey(jvmId)) {
//Incase of JVM reuse, tasks are returned to previously launched
//JVM via this method. However when a new task is launched
@@ -185,16 +188,26 @@ class JvmManager {
TaskRunner taskRunner = jvmToRunningTask.get(jvmId);
JvmRunner jvmRunner = jvmIdToRunner.get(jvmId);
Task task = taskRunner.getTaskInProgress().getTask();
- TaskControllerContext context =
- new TaskController.TaskControllerContext();
+
+ // Initialize task dirs
+ TaskControllerContext context =
+ new TaskController.TaskControllerContext();
context.env = jvmRunner.env;
context.task = task;
- //If we are returning the same task as which the JVM was launched
- //we don't initialize task once again.
- if(!jvmRunner.env.conf.get("mapred.task.id").
- equals(task.getTaskID().toString())) {
- tracker.getTaskController().initializeTask(context);
+ // If we are returning the same task as which the JVM was launched
+ // we don't initialize task once again.
+ if (!jvmRunner.env.conf.get("mapred.task.id").equals(
+ task.getTaskID().toString())) {
+ try {
+ tracker.getTaskController().initializeTask(context);
+ } catch (IOException e) {
+ LOG.warn("Failed to initialize the new task "
+ + task.getTaskID().toString() + " to be given to JVM with id "
+ + jvmId);
+ throw e;
+ }
}
+
return taskRunner.getTaskInProgress();
}
return null;
@@ -393,7 +406,6 @@ class JvmManager {
//Launch the task controller to run task JVM
initalContext.task = jvmToRunningTask.get(jvmId).getTask();
initalContext.env = env;
- tracker.getTaskController().initializeTask(initalContext);
tracker.getTaskController().launchTaskJVM(initalContext);
} catch (IOException ioe) {
// do nothing
@@ -403,13 +415,13 @@ class JvmManager {
if (shexec == null) {
return;
}
-
+
kill();
-
+
int exitCode = shexec.getExitCode();
updateOnJvmExit(jvmId, exitCode);
- LOG.info("JVM : " + jvmId +" exited. Number of tasks it ran: " +
- numTasksRan);
+ LOG.info("JVM : " + jvmId + " exited with exit code " + exitCode
+ + ". Number of tasks it ran: " + numTasksRan);
try {
// In case of jvm-reuse,
//the task jvm cleans up the common workdir for every
@@ -438,6 +450,7 @@ class JvmManager {
.getLong("mapred.tasktracker.tasks.sleeptime-before-sigkill",
ProcessTree.DEFAULT_SLEEPTIME_BEFORE_SIGKILL);
+ // Destroy the task jvm
controller.destroyTaskJVM(initalContext);
} else {
LOG.info(String.format("JVM Not killed %s but just removed", jvmId
diff --git src/java/org/apache/hadoop/mapred/LinuxTaskController.java src/java/org/apache/hadoop/mapred/LinuxTaskController.java
index c21ccb1..18c4236 100644
--- src/java/org/apache/hadoop/mapred/LinuxTaskController.java
+++ src/java/org/apache/hadoop/mapred/LinuxTaskController.java
@@ -24,13 +24,13 @@ import java.io.IOException;
import java.io.PrintWriter;
import java.util.ArrayList;
import java.util.List;
+import java.util.Map;
import java.util.Map.Entry;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileUtil;
import org.apache.hadoop.mapred.JvmManager.JvmEnv;
+import org.apache.hadoop.mapred.TaskController.TaskControllerContext;
import org.apache.hadoop.util.StringUtils;
import org.apache.hadoop.util.Shell.ShellCommandExecutor;
@@ -73,52 +73,27 @@ class LinuxTaskController extends TaskController {
new File(hadoopBin, "task-controller").getAbsolutePath();
}
- // The list of directory paths specified in the
- // variable mapred.local.dir. This is used to determine
- // which among the list of directories is picked up
- // for storing data for a particular task.
- private String[] mapredLocalDirs;
-
- // permissions to set on files and directories created.
- // When localized files are handled securely, this string
- // will change to something more restrictive. Until then,
- // it opens up the permissions for all, so that the tasktracker
- // and job owners can access files together.
- private static final String FILE_PERMISSIONS = "ugo+rwx";
-
- // permissions to set on components of the path leading to
- // localized files and directories. Read and execute permissions
- // are required for different users to be able to access the
- // files.
- private static final String PATH_PERMISSIONS = "go+rx";
-
public LinuxTaskController() {
super();
}
- @Override
- public void setConf(Configuration conf) {
- super.setConf(conf);
- mapredLocalDirs = conf.getStrings("mapred.local.dir");
- //Setting of the permissions of the local directory is done in
- //setup()
- }
-
/**
* List of commands that the setuid script will execute.
*/
enum TaskCommands {
+ INITIALIZE_JOB,
LAUNCH_TASK_JVM,
+ INITIALIZE_TASK,
TERMINATE_TASK_JVM,
- KILL_TASK_JVM
+ KILL_TASK_JVM,
}
-
+
/**
* Launch a task JVM that will run as the owner of the job.
*
- * This method launches a task JVM by executing a setuid
- * executable that will switch to the user and run the
- * task.
+ * This method launches a task JVM by executing a setuid executable that will
+ * switch to the user and run the task. Also does initialization of the first
+ * task in the same setuid process launch.
*/
@Override
void launchTaskJVM(TaskController.TaskControllerContext context)
@@ -150,48 +125,103 @@ class LinuxTaskController extends TaskController {
ShellCommandExecutor shExec = buildTaskControllerExecutor(
TaskCommands.LAUNCH_TASK_JVM,
env.conf.getUser(),
- launchTaskJVMArgs, env);
+ launchTaskJVMArgs, env.workDir, env.env);
context.shExec = shExec;
try {
shExec.execute();
} catch (Exception e) {
- LOG.warn("Exception thrown while launching task JVM : " +
- StringUtils.stringifyException(e));
- LOG.warn("Exit code from task is : " + shExec.getExitCode());
- LOG.warn("Output from task-contoller is : " + shExec.getOutput());
+ int exitCode = shExec.getExitCode();
+ LOG.warn("Exit code from task is : " + exitCode);
+ // Exit codes 143 (SIGTERM) and 137 (SIGKILL) mean the task was
+ // terminated/killed forcefully. In all other cases, log the
+ // task-controller output.
+ if (exitCode != 143 && exitCode != 137) {
+ LOG.warn("Exception thrown while launching task JVM : "
+ + StringUtils.stringifyException(e));
+ LOG.info("Output from LinuxTaskController's launchTaskJVM follows:");
+ logOutput(shExec.getOutput());
+ }
throw new IOException(e);
}
- if(LOG.isDebugEnabled()) {
- LOG.debug("output after executing task jvm = " + shExec.getOutput());
+ if (LOG.isDebugEnabled()) {
+ LOG.info("Output from LinuxTaskController's launchTaskJVM follows:");
+ logOutput(shExec.getOutput());
}
}
/**
- * Returns list of arguments to be passed while launching task VM.
- * See {@code buildTaskControllerExecutor(TaskCommands,
- * String, List, JvmEnv)} documentation.
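+ * Helper method that runs a LinuxTaskController command.
+ * @param taskCommand the task-controller command to be executed
+ * @param user user name
+ * @param cmdArgs list of extra arguments
+ * @param workDir working directory for the task-controller
+ * @param env environment variables
+ * @throws IOException wrapping any exception thrown while executing
+ */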
+ private void runCommand(TaskCommands taskCommand, String user,
+ List cmdArgs, File workDir, Map env)
+ throws IOException {
+
+ ShellCommandExecutor shExec =
+ buildTaskControllerExecutor(taskCommand, user, cmdArgs, workDir, env);
+ try {
+ shExec.execute();
+ } catch (Exception e) {
+ LOG.warn("Exit code from " + taskCommand.toString() + " is : "
+ + shExec.getExitCode());
+ LOG.warn("Exception thrown by " + taskCommand.toString() + " : "
+ + StringUtils.stringifyException(e));
+ LOG.info("Output from LinuxTaskController's " + taskCommand.toString()
+ + " follows:");
+ logOutput(shExec.getOutput());
+ throw new IOException(e);
+ }
+ if (LOG.isDebugEnabled()) {
+ LOG.info("Output from LinuxTaskController's " + taskCommand.toString()
+ + " follows:");
+ logOutput(shExec.getOutput());
+ }
+ }
+
+ /**
+ * Returns list of arguments to be passed while initializing a new task. See
+ * {@code buildTaskControllerExecutor(TaskCommands, String, List,
+ * File, Map)} documentation.
+ *
* @param context
* @return Argument to be used while launching Task VM
*/
- private List buildLaunchTaskArgs(TaskControllerContext context) {
+ private List buildInitializeTaskArgs(TaskControllerContext context) {
List commandArgs = new ArrayList(3);
String taskId = context.task.getTaskID().toString();
String jobId = getJobId(context);
- LOG.debug("getting the task directory as: "
- + getTaskCacheDirectory(context));
- commandArgs.add(getDirectoryChosenForTask(
- new File(getTaskCacheDirectory(context)),
- context));
commandArgs.add(jobId);
- if(!context.task.isTaskCleanupTask()) {
+ if (!context.task.isTaskCleanupTask()) {
commandArgs.add(taskId);
- }else {
+ } else {
commandArgs.add(taskId + TaskTracker.TASK_CLEANUP_SUFFIX);
}
return commandArgs;
}
-
- // get the Job ID from the information in the TaskControllerContext
+
+ @Override
+ void initializeTask(TaskControllerContext context)
+ throws IOException {
+ LOG.info("Going to do " + TaskCommands.INITIALIZE_TASK.toString()
+ + " for " + context.task.getTaskID().toString());
+ runCommand(TaskCommands.INITIALIZE_TASK, context.env.conf.getUser(),
+ buildInitializeTaskArgs(context), context.env.workDir, context.env.env);
+ }
+
+ private void logOutput(String output) {
+ String shExecOutput = output;
+ if (shExecOutput != null) {
+ for (String str : shExecOutput.split("\n")) {
+ LOG.info(str);
+ }
+ }
+ }
+
private String getJobId(TaskControllerContext context) {
String taskId = context.task.getTaskID().toString();
TaskAttemptID tId = TaskAttemptID.forName(taskId);
@@ -199,6 +229,29 @@ class LinuxTaskController extends TaskController {
return jobId;
}
+ /**
+ * Returns list of arguments to be passed while launching task VM.
+ * See {@code buildTaskControllerExecutor(TaskCommands,
+ * String, List, File, Map)} documentation.
+ * @param context
+ * @return Argument to be used while launching Task VM
+ */
+ private List buildLaunchTaskArgs(TaskControllerContext context) {
+ List commandArgs = new ArrayList(3);
+ String taskId = context.task.getTaskID().toString();
+ String jobId = getJobId(context);
+ LOG.info("getting the task directory as: "
+ + getTaskCacheDirectory(context));
+ LOG.info("getting the tt_root as " +getDirectoryChosenForTask(
+ new File(getTaskCacheDirectory(context)),
+ context) );
+ commandArgs.add(getDirectoryChosenForTask(
+ new File(getTaskCacheDirectory(context)),
+ context));
+ commandArgs.addAll(buildInitializeTaskArgs(context));
+ return commandArgs;
+ }
+
// Get the directory from the list of directories configured
// in mapred.local.dir chosen for storing data pertaining to
// this task.
@@ -208,8 +261,8 @@ class LinuxTaskController extends TaskController {
String taskId = context.task.getTaskID().toString();
for (String dir : mapredLocalDirs) {
File mapredDir = new File(dir);
- File taskDir = new File(mapredDir, TaskTracker.getLocalTaskDir(
- jobId, taskId, context.task.isTaskCleanupTask()));
+ File taskDir = new File(mapredDir, TaskTracker.getTaskWorkDir(
+ jobId, taskId, context.task.isTaskCleanupTask())).getParentFile();
if (directory.equals(taskDir)) {
return dir;
}
@@ -219,68 +272,7 @@ class LinuxTaskController extends TaskController {
throw new IllegalArgumentException("invalid task cache directory "
+ directory.getAbsolutePath());
}
-
- /**
- * Setup appropriate permissions for directories and files that
- * are used by the task.
- *
- * As the LinuxTaskController launches tasks as a user, different
- * from the daemon, all directories and files that are potentially
- * used by the tasks are setup with appropriate permissions that
- * will allow access.
- *
- * Until secure data handling is implemented (see HADOOP-4491 and
- * HADOOP-4493, for e.g.), the permissions are set up to allow
- * read, write and execute access for everyone. This will be
- * changed to restricted access as data is handled securely.
- */
- void initializeTask(TaskControllerContext context) {
- // Setup permissions for the job and task cache directories.
- setupTaskCacheFileAccess(context);
- // setup permissions for task log directory
- setupTaskLogFileAccess(context);
- }
-
- // Allows access for the task to create log files under
- // the task log directory
- private void setupTaskLogFileAccess(TaskControllerContext context) {
- TaskAttemptID taskId = context.task.getTaskID();
- File f = TaskLog.getTaskLogFile(taskId, TaskLog.LogName.SYSLOG);
- String taskAttemptLogDir = f.getParentFile().getAbsolutePath();
- changeDirectoryPermissions(taskAttemptLogDir, FILE_PERMISSIONS, false);
- }
-
- // Allows access for the task to read, write and execute
- // the files under the job and task cache directories
- private void setupTaskCacheFileAccess(TaskControllerContext context) {
- String taskId = context.task.getTaskID().toString();
- JobID jobId = JobID.forName(getJobId(context));
- //Change permission for the task across all the disks
- for(String localDir : mapredLocalDirs) {
- File f = new File(localDir);
- File taskCacheDir = new File(f,TaskTracker.getLocalTaskDir(
- jobId.toString(), taskId, context.task.isTaskCleanupTask()));
- if(taskCacheDir.exists()) {
- changeDirectoryPermissions(taskCacheDir.getPath(),
- FILE_PERMISSIONS, true);
- }
- }//end of local directory Iteration
- }
- // convenience method to execute chmod.
- private void changeDirectoryPermissions(String dir, String mode,
- boolean isRecursive) {
- int ret = 0;
- try {
- ret = FileUtil.chmod(dir, mode, isRecursive);
- } catch (Exception e) {
- LOG.warn("Exception in changing permissions for directory " + dir +
- ". Exception: " + e.getMessage());
- }
- if (ret != 0) {
- LOG.warn("Could not change permissions for directory " + dir);
- }
- }
/**
* Builds the command line for launching/terminating/killing task JVM.
* Following is the format for launching/terminating/killing task JVM
@@ -295,14 +287,15 @@ class LinuxTaskController extends TaskController {
* @param command command to be executed.
* @param userName user name
* @param cmdArgs list of extra arguments
+ * @param workDir working directory for the task-controller
* @param env JVM environment variables.
* @return {@link ShellCommandExecutor}
* @throws IOException
*/
- private ShellCommandExecutor buildTaskControllerExecutor(TaskCommands command,
- String userName,
- List cmdArgs, JvmEnv env)
- throws IOException {
+ private ShellCommandExecutor buildTaskControllerExecutor(
+ TaskCommands command, String userName, List cmdArgs,
+ File workDir, Map env)
+ throws IOException {
String[] taskControllerCmd = new String[3 + cmdArgs.size()];
taskControllerCmd[0] = getTaskControllerExecutablePath();
taskControllerCmd[1] = userName;
@@ -317,9 +310,9 @@ class LinuxTaskController extends TaskController {
}
}
ShellCommandExecutor shExec = null;
- if(env.workDir != null && env.workDir.exists()) {
+ if(workDir != null && workDir.exists()) {
shExec = new ShellCommandExecutor(taskControllerCmd,
- env.workDir, env.env);
+ workDir, env);
} else {
shExec = new ShellCommandExecutor(taskControllerCmd);
}
@@ -371,68 +364,21 @@ class LinuxTaskController extends TaskController {
}
}
}
-
- /**
- * Sets up the permissions of the following directories:
- *
- * Job cache directory
- * Archive directory
- * Hadoop log directories
- *
- */
- @Override
- void setup() {
- //set up job cache directory and associated permissions
- String localDirs[] = this.mapredLocalDirs;
- for(String localDir : localDirs) {
- //Cache root
- File cacheDirectory = new File(localDir,TaskTracker.getCacheSubdir());
- File jobCacheDirectory = new File(localDir,TaskTracker.getJobCacheSubdir());
- if(!cacheDirectory.exists()) {
- if(!cacheDirectory.mkdirs()) {
- LOG.warn("Unable to create cache directory : " +
- cacheDirectory.getPath());
- }
- }
- if(!jobCacheDirectory.exists()) {
- if(!jobCacheDirectory.mkdirs()) {
- LOG.warn("Unable to create job cache directory : " +
- jobCacheDirectory.getPath());
- }
- }
- //Give world writable permission for every directory under
- //mapred-local-dir.
- //Child tries to write files under it when executing.
- changeDirectoryPermissions(localDir, FILE_PERMISSIONS, true);
- }//end of local directory manipulations
- //setting up perms for user logs
- File taskLog = TaskLog.getUserLogDir();
- changeDirectoryPermissions(taskLog.getPath(), FILE_PERMISSIONS,false);
+ private List buildInitializeJobCommandArgs(
+ JobInitializationContext context) {
+ List initJobCmdArgs = new ArrayList();
+ initJobCmdArgs.add(context.jobid.toString());
+ return initJobCmdArgs;
}
- /*
- * Create Job directories across disks and set their permissions to 777
- * This way when tasks are run we just need to setup permissions for
- * task folder.
- */
@Override
- void initializeJob(JobID jobid) {
- for(String localDir : this.mapredLocalDirs) {
- File jobDirectory = new File(localDir,
- TaskTracker.getLocalJobDir(jobid.toString()));
- if(!jobDirectory.exists()) {
- if(!jobDirectory.mkdir()) {
- LOG.warn("Unable to create job cache directory : "
- + jobDirectory.getPath());
- continue;
- }
- }
- //Should be recursive because the jar and work folders might be
- //present under the job cache directory
- changeDirectoryPermissions(
- jobDirectory.getPath(), FILE_PERMISSIONS, true);
- }
+ void initializeJob(JobInitializationContext context)
+ throws IOException {
+ LOG.info("Going to initialize job " + context.jobid.toString()
+ + " on the TT");
+ runCommand(TaskCommands.INITIALIZE_JOB, context.user,
+ buildInitializeJobCommandArgs(context), context.workDir, null);
}
/**
@@ -467,7 +413,7 @@ class LinuxTaskController extends TaskController {
}
ShellCommandExecutor shExec = buildTaskControllerExecutor(
command, context.env.conf.getUser(),
- buildKillTaskCommandArgs(context), context.env);
+ buildKillTaskCommandArgs(context), context.env.workDir, context.env.env);
try {
shExec.execute();
} catch (Exception e) {
@@ -498,6 +444,5 @@ class LinuxTaskController extends TaskController {
protected String getTaskControllerExecutablePath() {
return taskControllerExe;
- }
+ }
}
-
diff --git src/java/org/apache/hadoop/mapred/LocalJobRunner.java src/java/org/apache/hadoop/mapred/LocalJobRunner.java
index eb48616..47ecae2 100644
--- src/java/org/apache/hadoop/mapred/LocalJobRunner.java
+++ src/java/org/apache/hadoop/mapred/LocalJobRunner.java
@@ -22,12 +22,12 @@ import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
+import java.util.Map;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.DataOutputBuffer;
import org.apache.hadoop.io.serializer.SerializationFactory;
import org.apache.hadoop.io.serializer.Serializer;
@@ -90,7 +90,7 @@ class LocalJobRunner implements JobSubmissionProtocol {
private JobStatus status;
private ArrayList mapIds = new ArrayList();
- private MapOutputFile mapoutputFile;
+
private JobProfile profile;
private Path localFile;
private FileSystem localFs;
@@ -110,8 +110,6 @@ class LocalJobRunner implements JobSubmissionProtocol {
public Job(JobID jobid, JobConf conf) throws IOException {
this.file = new Path(getSystemDir(), jobid + "/job.xml");
this.id = jobid;
- this.mapoutputFile = new MapOutputFile(jobid);
- this.mapoutputFile.setConf(conf);
this.localFile = new JobConf(conf).getLocalPath(jobDir+id+".xml");
this.localFs = FileSystem.getLocal(conf);
@@ -166,7 +164,9 @@ class LocalJobRunner implements JobSubmissionProtocol {
}
outputCommitter.setupJob(jContext);
status.setSetupProgress(1.0f);
-
+
+ Map mapOutputFiles =
+ new HashMap();
for (int i = 0; i < rawSplits.length; i++) {
if (!this.isInterrupted()) {
TaskAttemptID mapId = new TaskAttemptID(
@@ -177,6 +177,12 @@ class LocalJobRunner implements JobSubmissionProtocol {
rawSplits[i].getClassName(),
rawSplits[i].getBytes(), 1);
JobConf localConf = new JobConf(job);
+ TaskRunner.setupChildMapredLocalDirs(map, localConf);
+
+ MapOutputFile mapOutput = new MapOutputFile(map.getJobID());
+ mapOutput.setConf(localConf);
+ mapOutputFiles.put(mapId, mapOutput);
+
map.setJobFile(localFile.toString());
map.localizeConfiguration(localConf);
map.setConf(localConf);
@@ -194,14 +200,20 @@ class LocalJobRunner implements JobSubmissionProtocol {
new TaskAttemptID(new TaskID(jobId, TaskType.REDUCE, 0), 0);
try {
if (numReduceTasks > 0) {
+ ReduceTask reduce = new ReduceTask(file.toString(),
+ reduceId, 0, mapIds.size(), 1);
+ JobConf localConf = new JobConf(job);
+ TaskRunner.setupChildMapredLocalDirs(reduce, localConf);
// move map output to reduce input
for (int i = 0; i < mapIds.size(); i++) {
if (!this.isInterrupted()) {
TaskAttemptID mapId = mapIds.get(i);
- Path mapOut = this.mapoutputFile.getOutputFile(mapId);
- Path reduceIn = this.mapoutputFile.getInputFileForWrite(
- mapId.getTaskID(),reduceId,
- localFs.getFileStatus(mapOut).getLen());
+ Path mapOut = mapOutputFiles.get(mapId).getOutputFile();
+ MapOutputFile localOutputFile = new MapOutputFile(jobId);
+ localOutputFile.setConf(localConf);
+ Path reduceIn =
+ localOutputFile.getInputFileForWrite(mapId.getTaskID(),
+ localFs.getFileStatus(mapOut).getLen());
if (!localFs.mkdirs(reduceIn.getParent())) {
throw new IOException("Mkdirs failed to create "
+ reduceIn.getParent().toString());
@@ -213,9 +225,6 @@ class LocalJobRunner implements JobSubmissionProtocol {
}
}
if (!this.isInterrupted()) {
- ReduceTask reduce = new ReduceTask(file.toString(),
- reduceId, 0, mapIds.size(), 1);
- JobConf localConf = new JobConf(job);
reduce.setJobFile(localFile.toString());
reduce.localizeConfiguration(localConf);
reduce.setConf(localConf);
@@ -230,11 +239,8 @@ class LocalJobRunner implements JobSubmissionProtocol {
}
}
} finally {
- for (TaskAttemptID mapId: mapIds) {
- this.mapoutputFile.removeAll(mapId);
- }
- if (numReduceTasks == 1) {
- this.mapoutputFile.removeAll(reduceId);
+ for (MapOutputFile output : mapOutputFiles.values()) {
+ output.removeAll();
}
}
// delete the temporary directory in output directory
diff --git src/java/org/apache/hadoop/mapred/MapOutputFile.java src/java/org/apache/hadoop/mapred/MapOutputFile.java
index 30b71c9..d4afd05 100644
--- src/java/org/apache/hadoop/mapred/MapOutputFile.java
+++ src/java/org/apache/hadoop/mapred/MapOutputFile.java
@@ -31,7 +31,9 @@ class MapOutputFile {
private JobConf conf;
private JobID jobId;
-
+
+ static final String REDUCE_INPUT_FILE_FORMAT_STRING = "%s/map_%d.out";
+
MapOutputFile() {
}
@@ -42,132 +44,143 @@ class MapOutputFile {
private LocalDirAllocator lDirAlloc =
new LocalDirAllocator("mapred.local.dir");
- /** Return the path to local map output file created earlier
- * @param mapTaskId a map task id
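+ /**
+ * Return the path to the local map output file ("file.out") created earlier.
+ *
+ * @return path of file.out under TaskTracker.OUTPUT, resolved for reading by the local dir allocator
+ * @throws IOException propagated from LocalDirAllocator.getLocalPathToRead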
*/
- public Path getOutputFile(TaskAttemptID mapTaskId)
- throws IOException {
- return lDirAlloc.getLocalPathToRead(TaskTracker.getIntermediateOutputDir(
- jobId.toString(), mapTaskId.toString())
- + "/file.out", conf);
+ public Path getOutputFile()
+ throws IOException {
+ return lDirAlloc.getLocalPathToRead(TaskTracker.OUTPUT + Path.SEPARATOR
+ + "file.out", conf);
}
- /** Create a local map output file name.
- * @param mapTaskId a map task id
+ /**
+ * Create a local map output file name.
+ *
* @param size the size of the file
+ * @return path
+ * @throws IOException
*/
- public Path getOutputFileForWrite(TaskAttemptID mapTaskId, long size)
- throws IOException {
- return lDirAlloc.getLocalPathForWrite(TaskTracker.getIntermediateOutputDir(
- jobId.toString(), mapTaskId.toString())
- + "/file.out", size, conf);
+ public Path getOutputFileForWrite(long size)
+ throws IOException {
+ return lDirAlloc.getLocalPathForWrite(TaskTracker.OUTPUT + Path.SEPARATOR
+ + "file.out", size, conf);
}
- /** Return the path to a local map output index file created earlier
- * @param mapTaskId a map task id
+ /**
+ * Return the path to a local map output index file created earlier
+ *
+ * @return path
+ * @throws IOException
*/
- public Path getOutputIndexFile(TaskAttemptID mapTaskId)
- throws IOException {
- return lDirAlloc.getLocalPathToRead(TaskTracker.getIntermediateOutputDir(
- jobId.toString(), mapTaskId.toString())
- + "/file.out.index", conf);
+ public Path getOutputIndexFile()
+ throws IOException {
+ return lDirAlloc.getLocalPathToRead(TaskTracker.OUTPUT + Path.SEPARATOR
+ + "file.out.index", conf);
}
- /** Create a local map output index file name.
- * @param mapTaskId a map task id
+ /**
+ * Create a local map output index file name.
+ *
* @param size the size of the file
+ * @return path
+ * @throws IOException
*/
- public Path getOutputIndexFileForWrite(TaskAttemptID mapTaskId, long size)
- throws IOException {
- return lDirAlloc.getLocalPathForWrite(TaskTracker.getIntermediateOutputDir(
- jobId.toString(), mapTaskId.toString())
- + "/file.out.index",
- size, conf);
+ public Path getOutputIndexFileForWrite(long size)
+ throws IOException {
+ return lDirAlloc.getLocalPathForWrite(TaskTracker.OUTPUT + Path.SEPARATOR
+ + "file.out.index", size, conf);
}
- /** Return a local map spill file created earlier.
- * @param mapTaskId a map task id
+ /**
+ * Return a local map spill file created earlier.
+ *
* @param spillNumber the number
+ * @return path
+ * @throws IOException
*/
- public Path getSpillFile(TaskAttemptID mapTaskId, int spillNumber)
- throws IOException {
- return lDirAlloc.getLocalPathToRead(TaskTracker.getIntermediateOutputDir(
- jobId.toString(), mapTaskId.toString())
- + "/spill"
- + spillNumber + ".out", conf);
+ public Path getSpillFile(int spillNumber)
+ throws IOException {
+ return lDirAlloc.getLocalPathToRead(TaskTracker.OUTPUT + "/spill"
+ + spillNumber + ".out", conf);
}
- /** Create a local map spill file name.
- * @param mapTaskId a map task id
+ /**
+ * Create a local map spill file name.
+ *
* @param spillNumber the number
* @param size the size of the file
+ * @return path
+ * @throws IOException
*/
- public Path getSpillFileForWrite(TaskAttemptID mapTaskId, int spillNumber,
- long size) throws IOException {
- return lDirAlloc.getLocalPathForWrite(TaskTracker.getIntermediateOutputDir(
- jobId.toString(), mapTaskId.toString())
- + "/spill" +
- spillNumber + ".out", size, conf);
+ public Path getSpillFileForWrite(int spillNumber, long size)
+ throws IOException {
+ return lDirAlloc.getLocalPathForWrite(TaskTracker.OUTPUT + "/spill"
+ + spillNumber + ".out", size, conf);
}
- /** Return a local map spill index file created earlier
- * @param mapTaskId a map task id
+ /**
+ * Return a local map spill index file created earlier
+ *
* @param spillNumber the number
+ * @return path
+ * @throws IOException
*/
- public Path getSpillIndexFile(TaskAttemptID mapTaskId, int spillNumber)
- throws IOException {
- return lDirAlloc.getLocalPathToRead(TaskTracker.getIntermediateOutputDir(
- jobId.toString(), mapTaskId.toString())
- + "/spill" +
- spillNumber + ".out.index", conf);
+ public Path getSpillIndexFile(int spillNumber)
+ throws IOException {
+ return lDirAlloc.getLocalPathToRead(TaskTracker.OUTPUT + "/spill"
+ + spillNumber + ".out.index", conf);
}
- /** Create a local map spill index file name.
- * @param mapTaskId a map task id
+ /**
+ * Create a local map spill index file name.
+ *
* @param spillNumber the number
* @param size the size of the file
+ * @return path
+ * @throws IOException
*/
- public Path getSpillIndexFileForWrite(TaskAttemptID mapTaskId, int spillNumber,
- long size) throws IOException {
- return lDirAlloc.getLocalPathForWrite(TaskTracker.getIntermediateOutputDir(
- jobId.toString(), mapTaskId.toString())
- + "/spill" + spillNumber +
- ".out.index", size, conf);
+ public Path getSpillIndexFileForWrite(int spillNumber, long size)
+ throws IOException {
+ return lDirAlloc.getLocalPathForWrite(TaskTracker.OUTPUT + "/spill"
+ + spillNumber + ".out.index", size, conf);
}
- /** Return a local reduce input file created earlier
- * @param mapTaskId a map task id
- * @param reduceTaskId a reduce task id
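+ /**
+ * Return a local reduce input file created earlier.
+ *
+ * @param mapId numeric id of the map task, substituted into REDUCE_INPUT_FILE_FORMAT_STRING
+ * @return path of map_{mapId}.out under TaskTracker.OUTPUT, resolved for reading
+ * @throws IOException propagated from LocalDirAllocator.getLocalPathToRead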
*/
- public Path getInputFile(int mapId, TaskAttemptID reduceTaskId)
- throws IOException {
- // TODO *oom* should use a format here
- return lDirAlloc.getLocalPathToRead(TaskTracker.getIntermediateOutputDir(
- jobId.toString(), reduceTaskId.toString())
- + "/map_" + mapId + ".out",
- conf);
+ public Path getInputFile(int mapId)
+ throws IOException {
+ return lDirAlloc.getLocalPathToRead(String.format(
+ REDUCE_INPUT_FILE_FORMAT_STRING, TaskTracker.OUTPUT, Integer
+ .valueOf(mapId)), conf);
}
- /** Create a local reduce input file name.
- * @param mapTaskId a map task id
- * @param reduceTaskId a reduce task id
+ /**
+ * Create a local reduce input file name.
+ *
+ * @param mapId a map task id
* @param size the size of the file
+ * @return path
+ * @throws IOException
*/
- public Path getInputFileForWrite(TaskID mapId, TaskAttemptID reduceTaskId,
- long size)
- throws IOException {
- // TODO *oom* should use a format here
- return lDirAlloc.getLocalPathForWrite(TaskTracker.getIntermediateOutputDir(
- jobId.toString(), reduceTaskId.toString())
- + "/map_" + mapId.getId() + ".out",
- size, conf);
+ public Path getInputFileForWrite(TaskID mapId, long size)
+ throws IOException {
+ return lDirAlloc.getLocalPathForWrite(String.format(
+ REDUCE_INPUT_FILE_FORMAT_STRING, TaskTracker.OUTPUT, mapId.getId()),
+ size, conf);
}
/** Removes all of the files related to a task. */
- public void removeAll(TaskAttemptID taskId) throws IOException {
- conf.deleteLocalFiles(TaskTracker.getIntermediateOutputDir(
- jobId.toString(), taskId.toString())
-);
+ public void removeAll()
+ throws IOException {
+ conf.deleteLocalFiles(TaskTracker.OUTPUT);
}
public void setConf(Configuration conf) {
diff --git src/java/org/apache/hadoop/mapred/MapTask.java src/java/org/apache/hadoop/mapred/MapTask.java
index d5892f3..0656e7c 100644
--- src/java/org/apache/hadoop/mapred/MapTask.java
+++ src/java/org/apache/hadoop/mapred/MapTask.java
@@ -35,6 +35,7 @@ import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.LocalDirAllocator;
import org.apache.hadoop.fs.LocalFileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.BytesWritable;
@@ -66,7 +67,6 @@ class MapTask extends Task {
* The size of each record in the index file for the map-outputs.
*/
public static final int MAP_OUTPUT_INDEX_RECORD_LENGTH = 24;
-
private BytesWritable split = new BytesWritable();
private String splitClass;
@@ -101,11 +101,21 @@ class MapTask extends Task {
}
@Override
- public void localizeConfiguration(JobConf conf) throws IOException {
+ public void localizeConfiguration(JobConf conf)
+ throws IOException {
super.localizeConfiguration(conf);
- if (isMapOrReduce()) {
- Path localSplit = new Path(new Path(getJobFile()).getParent(),
- "split.dta");
+ // split.dta file is used only by IsolationRunner.
+ // Write the split file to the local disk if it is a normal map task (not a
+ // job-setup or a job-cleanup task) and if the user wishes to run
+ // IsolationRunner either by setting keep.failed.tasks.files to true or by
+ // using keep.tasks.files.pattern
+ if (isMapOrReduce()
+ && (conf.getKeepTaskFilesPattern() != null || conf
+ .getKeepFailedTaskFiles())) {
+ Path localSplit =
+ new LocalDirAllocator("mapred.local.dir").getLocalPathForWrite(
+ TaskTracker.getLocalSplitFile(getJobID().toString(), getTaskID()
+ .toString()), conf);
LOG.debug("Writing local split to " + localSplit);
DataOutputStream out = FileSystem.getLocal(conf).create(localSplit);
Text.writeString(out, splitClass);
@@ -1220,8 +1230,8 @@ class MapTask extends Task {
try {
// create spill file
final SpillRecord spillRec = new SpillRecord(partitions);
- final Path filename = mapOutputFile.getSpillFileForWrite(getTaskID(),
- numSpills, size);
+ final Path filename =
+ mapOutputFile.getSpillFileForWrite(numSpills, size);
out = rfs.create(filename);
final int endPosition = (kvend > kvstart)
@@ -1285,9 +1295,9 @@ class MapTask extends Task {
if (totalIndexCacheMemory >= INDEX_CACHE_MEMORY_LIMIT) {
// create spill index file
- Path indexFilename = mapOutputFile.getSpillIndexFileForWrite(
- getTaskID(), numSpills,
- partitions * MAP_OUTPUT_INDEX_RECORD_LENGTH);
+ Path indexFilename =
+ mapOutputFile.getSpillIndexFileForWrite(numSpills, partitions
+ * MAP_OUTPUT_INDEX_RECORD_LENGTH);
spillRec.writeToFile(indexFilename, job);
} else {
indexCacheList.add(spillRec);
@@ -1313,8 +1323,8 @@ class MapTask extends Task {
try {
// create spill file
final SpillRecord spillRec = new SpillRecord(partitions);
- final Path filename = mapOutputFile.getSpillFileForWrite(getTaskID(),
- numSpills, size);
+ final Path filename =
+ mapOutputFile.getSpillFileForWrite(numSpills, size);
out = rfs.create(filename);
// we don't run the combiner for a single record
@@ -1350,9 +1360,9 @@ class MapTask extends Task {
}
if (totalIndexCacheMemory >= INDEX_CACHE_MEMORY_LIMIT) {
// create spill index file
- Path indexFilename = mapOutputFile.getSpillIndexFileForWrite(
- getTaskID(), numSpills,
- partitions * MAP_OUTPUT_INDEX_RECORD_LENGTH);
+ Path indexFilename =
+ mapOutputFile.getSpillIndexFileForWrite(numSpills, partitions
+ * MAP_OUTPUT_INDEX_RECORD_LENGTH);
spillRec.writeToFile(indexFilename, job);
} else {
indexCacheList.add(spillRec);
@@ -1442,14 +1452,14 @@ class MapTask extends Task {
final TaskAttemptID mapId = getTaskID();
for(int i = 0; i < numSpills; i++) {
- filename[i] = mapOutputFile.getSpillFile(mapId, i);
+ filename[i] = mapOutputFile.getSpillFile(i);
finalOutFileSize += rfs.getFileStatus(filename[i]).getLen();
}
if (numSpills == 1) { //the spill is the final output
rfs.rename(filename[0],
new Path(filename[0].getParent(), "file.out"));
if (indexCacheList.size() == 0) {
- rfs.rename(mapOutputFile.getSpillIndexFile(mapId, 0),
+ rfs.rename(mapOutputFile.getSpillIndexFile(0),
new Path(filename[0].getParent(),"file.out.index"));
} else {
indexCacheList.get(0).writeToFile(
@@ -1460,7 +1470,7 @@ class MapTask extends Task {
// read in paged indices
for (int i = indexCacheList.size(); i < numSpills; ++i) {
- Path indexFileName = mapOutputFile.getSpillIndexFile(mapId, i);
+ Path indexFileName = mapOutputFile.getSpillIndexFile(i);
indexCacheList.add(new SpillRecord(indexFileName, job));
}
@@ -1468,10 +1478,10 @@ class MapTask extends Task {
//lengths for each partition
finalOutFileSize += partitions * APPROX_HEADER_LENGTH;
finalIndexFileSize = partitions * MAP_OUTPUT_INDEX_RECORD_LENGTH;
- Path finalOutputFile = mapOutputFile.getOutputFileForWrite(mapId,
- finalOutFileSize);
- Path finalIndexFile = mapOutputFile.getOutputIndexFileForWrite(
- mapId, finalIndexFileSize);
+ Path finalOutputFile =
+ mapOutputFile.getOutputFileForWrite(finalOutFileSize);
+ Path finalIndexFile =
+ mapOutputFile.getOutputIndexFileForWrite(finalIndexFileSize);
//The output stream for the final single output file
FSDataOutputStream finalOut = rfs.create(finalOutputFile, true, 4096);
diff --git src/java/org/apache/hadoop/mapred/MapTaskRunner.java src/java/org/apache/hadoop/mapred/MapTaskRunner.java
index 918b2a6..94750d0 100644
--- src/java/org/apache/hadoop/mapred/MapTaskRunner.java
+++ src/java/org/apache/hadoop/mapred/MapTaskRunner.java
@@ -34,13 +34,13 @@ class MapTaskRunner extends TaskRunner {
return false;
}
- mapOutputFile.removeAll(getTask().getTaskID());
+ mapOutputFile.removeAll();
return true;
}
/** Delete all of the temporary map output files. */
public void close() throws IOException {
LOG.info(getTask()+" done; removing files.");
- mapOutputFile.removeAll(getTask().getTaskID());
+ mapOutputFile.removeAll();
}
}
diff --git src/java/org/apache/hadoop/mapred/ReduceTask.java src/java/org/apache/hadoop/mapred/ReduceTask.java
index f24ea10..5982595 100644
--- src/java/org/apache/hadoop/mapred/ReduceTask.java
+++ src/java/org/apache/hadoop/mapred/ReduceTask.java
@@ -208,7 +208,7 @@ class ReduceTask extends Task {
if (isLocal) {
// for local jobs
for(int i = 0; i < numMaps; ++i) {
- fileList.add(mapOutputFile.getInputFile(i, getTaskID()));
+ fileList.add(mapOutputFile.getInputFile(i));
}
} else {
// for non local jobs
@@ -1283,12 +1283,11 @@ class ReduceTask extends Task {
// else, we will check the localFS to find a suitable final location
// for this path
TaskAttemptID reduceId = reduceTask.getTaskID();
- Path filename = new Path("/" + TaskTracker.getIntermediateOutputDir(
- reduceId.getJobID().toString(),
- reduceId.toString())
- + "/map_" +
- loc.getTaskId().getId() + ".out");
-
+ Path filename =
+ new Path(String.format(
+ MapOutputFile.REDUCE_INPUT_FILE_FORMAT_STRING,
+ TaskTracker.OUTPUT, loc.getTaskId().getId()));
+
// Copy the map output to a temp file whose name is unique to this attempt
Path tmpMapOutput = new Path(filename+"-"+id);
@@ -2325,8 +2324,8 @@ class ReduceTask extends Task {
sortPhaseFinished = true;
// must spill to disk, but can't retain in-mem for intermediate merge
- final Path outputPath = mapOutputFile.getInputFileForWrite(mapId,
- reduceTask.getTaskID(), inMemToDiskBytes);
+ final Path outputPath =
+ mapOutputFile.getInputFileForWrite(mapId, inMemToDiskBytes);
final RawKeyValueIterator rIter = Merger.merge(job, fs,
keyClass, valueClass, memDiskSegments, numMemDiskSegments,
tmpDir, comparator, reporter, spilledRecordsCounter, null,
@@ -2620,8 +2619,8 @@ class ReduceTask extends Task {
long mergeOutputSize = createInMemorySegments(inMemorySegments, 0);
int noInMemorySegments = inMemorySegments.size();
- Path outputPath = mapOutputFile.getInputFileForWrite(mapId,
- reduceTask.getTaskID(), mergeOutputSize);
+ Path outputPath =
+ mapOutputFile.getInputFileForWrite(mapId, mergeOutputSize);
Writer writer =
new Writer(conf, rfs, outputPath,
diff --git src/java/org/apache/hadoop/mapred/ReduceTaskRunner.java src/java/org/apache/hadoop/mapred/ReduceTaskRunner.java
index 903354e..fc6d1ae 100644
--- src/java/org/apache/hadoop/mapred/ReduceTaskRunner.java
+++ src/java/org/apache/hadoop/mapred/ReduceTaskRunner.java
@@ -37,7 +37,7 @@ class ReduceTaskRunner extends TaskRunner {
}
// cleanup from failures
- mapOutputFile.removeAll(getTask().getTaskID());
+ mapOutputFile.removeAll();
return true;
}
@@ -46,6 +46,6 @@ class ReduceTaskRunner extends TaskRunner {
public void close() throws IOException {
LOG.info(getTask()+" done; removing files.");
getTask().getProgress().setStatus("closed");
- mapOutputFile.removeAll(getTask().getTaskID());
+ mapOutputFile.removeAll();
}
}
diff --git src/java/org/apache/hadoop/mapred/TaskController.java src/java/org/apache/hadoop/mapred/TaskController.java
index 030bf6b..e727177 100644
--- src/java/org/apache/hadoop/mapred/TaskController.java
+++ src/java/org/apache/hadoop/mapred/TaskController.java
@@ -17,12 +17,15 @@
*/
package org.apache.hadoop.mapred;
+import java.io.File;
import java.io.IOException;
+import java.util.Map;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configurable;
import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileUtil;
import org.apache.hadoop.mapred.JvmManager.JvmEnv;
import org.apache.hadoop.util.StringUtils;
import org.apache.hadoop.util.Shell.ShellCommandExecutor;
@@ -45,27 +48,92 @@ abstract class TaskController implements Configurable {
public Configuration getConf() {
return conf;
}
-
+
+ // The list of directory paths specified in the variable mapred.local.dir.
+ // This is used to determine which among the list of directories is picked up
+ // for storing data for a particular task.
+ protected String[] mapredLocalDirs;
+
public void setConf(Configuration conf) {
this.conf = conf;
+ mapredLocalDirs = conf.getStrings("mapred.local.dir");
}
-
+
/**
- * Setup task controller component.
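+ * Creates, if missing, the following directories and sets their permissions
+ * to FileUtil.sevenFiveFive. A directory that cannot be created is only logged.
+ *
+ * - each mapred-local directory listed in mapred.local.dir
+ * - the job cache directory under each mapred-local directory
+ * - the distributed cache directory under each mapred-local directory
+ * - the user log directory returned by TaskLog.getUserLogDir()
+ *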
+ */
+ void setup() {
+ for (String localDir : this.mapredLocalDirs) {
+ // Set up the mapred-local directories.
+ File mapredlocalDir = new File(localDir);
+ if (!mapredlocalDir.exists() && !mapredlocalDir.mkdirs()) {
+ LOG.warn("Unable to create mapred-local directory : "
+ + mapredlocalDir.getPath());
+ } else {
+ FileUtil.setPermissions(mapredlocalDir, FileUtil.sevenFiveFive);
+ }
+ // TODO: should throw exception if failed on all disks?
+
+ // Set up the cache directory used for distributed cache files
+ File distributedCacheDir =
+ new File(localDir, TaskTracker.getDistributedCacheDir());
+ if (!distributedCacheDir.exists() && !distributedCacheDir.mkdirs()) {
+ LOG.warn("Unable to create cache directory : "
+ + distributedCacheDir.getPath());
+ } else {
+ FileUtil.setPermissions(distributedCacheDir, FileUtil.sevenFiveFive);
+ }
+
+ // Set up the jobcache directory
+ File jobCacheDir = new File(localDir, TaskTracker.getJobCacheSubdir());
+ if (!jobCacheDir.exists() && !jobCacheDir.mkdirs()) {
+ LOG.warn("Unable to create job cache directory : "
+ + jobCacheDir.getPath());
+ } else {
+ FileUtil.setPermissions(jobCacheDir, FileUtil.sevenFiveFive);
+ }
+ }
+
+ // Set up the user log directory
+ File taskLog = TaskLog.getUserLogDir();
+ if (!taskLog.exists() && !taskLog.mkdirs()) {
+ LOG.warn("Unable to create taskLog directory : " + taskLog.getPath());
+ } else {
+ FileUtil.setPermissions(taskLog, FileUtil.sevenFiveFive);
+ }
+ }
+
+ /**
+ * Take task-controller specific actions to initialize job. This involves
+ * setting appropriate permissions to job-files so as to secure the files to
+ * be accessible only by the user's tasks.
*
+ * @throws IOException
*/
- abstract void setup();
-
-
+ abstract void initializeJob(JobInitializationContext context) throws IOException;
+
/**
* Launch a task JVM
*
- * This method defines how a JVM will be launched to run a task.
+ * This method defines how a JVM will be launched to run a task. Each
+ * task-controller should also call
+ * {@link #initializeTask(TaskControllerContext)} inside this method so that
+ * the task is initialized before it is launched. Initialization is left to
+ * this method so that a task-controller can optimize by combining
+ * initialization and launching of tasks.
+ *
* @param context the context associated to the task
*/
abstract void launchTaskJVM(TaskControllerContext context)
throws IOException;
-
+
/**
* Top level cleanup a task JVM method.
*
@@ -90,47 +158,44 @@ abstract class TaskController implements Configurable {
}
killTask(context);
}
-
- /**
- * Perform initializing actions required before a task can run.
- *
- * For instance, this method can be used to setup appropriate
- * access permissions for files and directories that will be
- * used by tasks. Tasks use the job cache, log, PID and distributed cache
- * directories and files as part of their functioning. Typically,
- * these files are shared between the daemon and the tasks
- * themselves. So, a TaskController that is launching tasks
- * as different users can implement this method to setup
- * appropriate ownership and permissions for these directories
- * and files.
- */
- abstract void initializeTask(TaskControllerContext context);
-
-
+
+ /** Perform initializing actions required before a task can run.
+ *
+ * For instance, this method can be used to setup appropriate
+ * access permissions for files and directories that will be
+ * used by tasks. Tasks use the job cache, log, and distributed cache
+ * directories and files as part of their functioning. Typically,
+ * these files are shared between the daemon and the tasks
+ * themselves. So, a TaskController that is launching tasks
+ * as different users can implement this method to setup
+ * appropriate ownership and permissions for these directories
+ * and files.
+ */
+ abstract void initializeTask(TaskControllerContext context)
+ throws IOException;
+
/**
* Contains task information required for the task controller.
*/
static class TaskControllerContext {
// task being executed
- Task task;
- // the JVM environment for the task
- JvmEnv env;
- // the Shell executor executing the JVM for this task
- ShellCommandExecutor shExec;
- // process handle of task JVM
- String pid;
- // waiting time before sending SIGKILL to task JVM after sending SIGTERM
- long sleeptimeBeforeSigkill;
+ Task task;
+ ShellCommandExecutor shExec; // the Shell executor executing the JVM for this task.
+
+ // Information used only when this context is used for launching new tasks.
+ JvmEnv env; // the JVM environment for the task.
+
+ // Information used only when this context is used for destroying a task jvm.
+ String pid; // process handle of task JVM.
+ long sleeptimeBeforeSigkill; // waiting time before sending SIGKILL to task JVM after sending SIGTERM
+ }
+
+ static class JobInitializationContext {
+ JobID jobid;
+ File workDir;
+ String user;
}
- /**
- * Method which is called after the job is localized so that task controllers
- * can implement their own job localization logic.
- *
- * @param tip Task of job for which localization happens.
- */
- abstract void initializeJob(JobID jobId);
-
/**
* Sends a graceful terminate signal to taskJVM and it sub-processes.
*
@@ -144,6 +209,5 @@ abstract class TaskController implements Configurable {
*
* @param context task context
*/
-
abstract void killTask(TaskControllerContext context);
}
diff --git src/java/org/apache/hadoop/mapred/TaskLog.java src/java/org/apache/hadoop/mapred/TaskLog.java
index 9baafd8..fa1ef28 100644
--- src/java/org/apache/hadoop/mapred/TaskLog.java
+++ src/java/org/apache/hadoop/mapred/TaskLog.java
@@ -54,9 +54,10 @@ public class TaskLog {
private static final Log LOG =
LogFactory.getLog(TaskLog.class);
+ static final String USERLOGS_DIR_NAME = "userlogs";
+
private static final File LOG_DIR =
- new File(System.getProperty("hadoop.log.dir"),
- "userlogs").getAbsoluteFile();
+ new File(getBaseLogDir(), USERLOGS_DIR_NAME).getAbsoluteFile();
static LocalFileSystem localFS = null;
static {
@@ -156,8 +157,12 @@ public class TaskLog {
return new File(getBaseDir(taskid), "log.index");
}
}
-
- private static File getBaseDir(String taskid) {
+
+ static String getBaseLogDir() {
+ return System.getProperty("hadoop.log.dir");
+ }
+
+ static File getBaseDir(String taskid) {
return new File(LOG_DIR, taskid);
}
private static long prevOutLength;
diff --git src/java/org/apache/hadoop/mapred/TaskRunner.java src/java/org/apache/hadoop/mapred/TaskRunner.java
index 86c5868..2c6c1bf 100644
--- src/java/org/apache/hadoop/mapred/TaskRunner.java
+++ src/java/org/apache/hadoop/mapred/TaskRunner.java
@@ -121,213 +121,45 @@ abstract class TaskRunner extends Thread {
TaskAttemptID taskid = t.getTaskID();
LocalDirAllocator lDirAlloc = new LocalDirAllocator("mapred.local.dir");
File workDir = formWorkDir(lDirAlloc, taskid, t.isTaskCleanupTask(), conf);
-
+
URI[] archives = DistributedCache.getCacheArchives(conf);
URI[] files = DistributedCache.getCacheFiles(conf);
+ // We don't create any symlinks yet, so presence/absence of workDir
+ // actually on the file system doesn't matter.
setupDistributedCache(lDirAlloc, workDir, archives, files);
-
- if (!prepare()) {
- return;
- }
- // Accumulates class paths for child.
- List classPaths = new ArrayList();
- // start with same classpath as parent process
- appendSystemClasspaths(classPaths);
+ // Set up the child task's configuration. After this call, no localization
+ // of files should happen in the TaskTracker's process space. Any changes to
+ // the conf object after this will NOT be reflected to the child.
+ setupChildTaskConfiguration(lDirAlloc);
- if (!workDir.mkdirs()) {
- if (!workDir.isDirectory()) {
- LOG.fatal("Mkdirs failed to create " + workDir.toString());
- }
+ if (!prepare()) {
+ return;
}
- // include the user specified classpath
- appendJobJarClasspaths(conf.getJar(), classPaths);
-
- // Distributed cache paths
- appendDistributedCacheClasspaths(conf, archives, files, classPaths);
-
- // Include the working dir too
- classPaths.add(workDir.toString());
-
// Build classpath
-
-
- // Build exec child JVM args.
- Vector vargs = new Vector(8);
- File jvm = // use same jvm as parent
- new File(new File(System.getProperty("java.home"), "bin"), "java");
-
- vargs.add(jvm.toString());
-
- // Add child (task) java-vm options.
- //
- // The following symbols if present in mapred.child.java.opts value are
- // replaced:
- // + @taskid@ is interpolated with value of TaskID.
- // Other occurrences of @ will not be altered.
- //
- // Example with multiple arguments and substitutions, showing
- // jvm GC logging, and start of a passwordless JVM JMX agent so can
- // connect with jconsole and the likes to watch child memory, threads
- // and get thread dumps.
- //
- //
- // mapred.child.java.opts
- // -verbose:gc -Xloggc:/tmp/@taskid@.gc \
- // -Dcom.sun.management.jmxremote.authenticate=false \
- // -Dcom.sun.management.jmxremote.ssl=false \
- //
- //
- //
- String javaOpts = conf.get("mapred.child.java.opts", "-Xmx200m");
- javaOpts = javaOpts.replace("@taskid@", taskid.toString());
- String [] javaOptsSplit = javaOpts.split(" ");
-
- // Add java.library.path; necessary for loading native libraries.
- //
- // 1. To support native-hadoop library i.e. libhadoop.so, we add the
- // parent processes' java.library.path to the child.
- // 2. We also add the 'cwd' of the task to it's java.library.path to help
- // users distribute native libraries via the DistributedCache.
- // 3. The user can also specify extra paths to be added to the
- // java.library.path via mapred.child.java.opts.
- //
- String libraryPath = System.getProperty("java.library.path");
- if (libraryPath == null) {
- libraryPath = workDir.getAbsolutePath();
- } else {
- libraryPath += SYSTEM_PATH_SEPARATOR + workDir;
- }
- boolean hasUserLDPath = false;
- for(int i=0; i classPaths = getClassPaths(conf, workDir, archives, files);
- // Setup the log4j prop
long logSize = TaskLog.getTaskLogLength(conf);
- vargs.add("-Dhadoop.log.dir=" +
- new File(System.getProperty("hadoop.log.dir")
- ).getAbsolutePath());
- vargs.add("-Dhadoop.root.logger=INFO,TLA");
- vargs.add("-Dhadoop.tasklog.taskid=" + taskid);
- vargs.add("-Dhadoop.tasklog.totalLogFileSize=" + logSize);
-
- if (conf.getProfileEnabled()) {
- if (conf.getProfileTaskRange(t.isMapTask()
- ).isIncluded(t.getPartition())) {
- File prof = TaskLog.getTaskLogFile(taskid, TaskLog.LogName.PROFILE);
- vargs.add(String.format(conf.getProfileParams(), prof.toString()));
- }
- }
- // Add main class and its arguments
- vargs.add(Child.class.getName()); // main of Child
- // pass umbilical address
- InetSocketAddress address = tracker.getTaskTrackerReportAddress();
- vargs.add(address.getAddress().getHostAddress());
- vargs.add(Integer.toString(address.getPort()));
- vargs.add(taskid.toString()); // pass task identifier
+ // Build exec child JVM args.
+ Vector vargs =
+ getVMArgs(taskid, workDir, classPaths, logSize);
tracker.addToMemoryManager(t.getTaskID(), t.isMapTask(), conf);
// set memory limit using ulimit if feasible and necessary ...
- String[] ulimitCmd = Shell.getUlimitMemoryCommand(conf);
- List setup = null;
- if (ulimitCmd != null) {
- setup = new ArrayList();
- for (String arg : ulimitCmd) {
- setup.add(arg);
- }
- }
+ List setup = getVMSetupCmd();
// Set up the redirection of the task's stdout and stderr streams
- File stdout = TaskLog.getTaskLogFile(taskid, TaskLog.LogName.STDOUT);
- File stderr = TaskLog.getTaskLogFile(taskid, TaskLog.LogName.STDERR);
- boolean b = stdout.getParentFile().mkdirs();
- if (!b) {
- LOG.warn("mkdirs failed. Ignoring");
- }
- tracker.getTaskTrackerInstrumentation().reportTaskLaunch(taskid, stdout, stderr);
+ File[] logFiles = prepareLogFiles(taskid);
+ File stdout = logFiles[0];
+ File stderr = logFiles[1];
+ tracker.getTaskTrackerInstrumentation().reportTaskLaunch(taskid, stdout,
+ stderr);
Map env = new HashMap();
- StringBuffer ldLibraryPath = new StringBuffer();
- ldLibraryPath.append(workDir.toString());
- String oldLdLibraryPath = null;
- oldLdLibraryPath = System.getenv("LD_LIBRARY_PATH");
- if (oldLdLibraryPath != null) {
- ldLibraryPath.append(SYSTEM_PATH_SEPARATOR);
- ldLibraryPath.append(oldLdLibraryPath);
- }
- env.put("LD_LIBRARY_PATH", ldLibraryPath.toString());
-
- // add the env variables passed by the user
- String mapredChildEnv = conf.get("mapred.child.env");
- if (mapredChildEnv != null && mapredChildEnv.length() > 0) {
- String childEnvs[] = mapredChildEnv.split(",");
- for (String cEnv : childEnvs) {
- try {
- String[] parts = cEnv.split("="); // split on '='
- String value = env.get(parts[0]);
- if (value != null) {
- // replace $env with the child's env constructed by tt's
- // example LD_LIBRARY_PATH=$LD_LIBRARY_PATH:/tmp
- value = parts[1].replace("$" + parts[0], value);
- } else {
- // this key is not configured by the tt for the child .. get it
- // from the tt's env
- // example PATH=$PATH:/tmp
- value = System.getenv(parts[0]);
- if (value != null) {
- // the env key is present in the tt's env
- value = parts[1].replace("$" + parts[0], value);
- } else {
- // the env key is note present anywhere .. simply set it
- // example X=$X:/tmp or X=/tmp
- value = parts[1].replace("$" + parts[0], "");
- }
- }
- env.put(parts[0], value);
- } catch (Throwable t) {
- // set the error msg
- errorInfo = "Invalid User environment settings : " + mapredChildEnv
- + ". Failed to parse user-passed environment param."
- + " Expecting : env1=value1,env2=value2...";
- LOG.warn(errorInfo);
- throw t;
- }
- }
- }
+ errorInfo = getVMEnvironment(errorInfo, workDir, conf, env);
jvmManager.launchJvm(this,
jvmManager.constructJvmEnv(setup,vargs,stdout,stderr,logSize,
@@ -355,7 +187,7 @@ abstract class TaskRunner extends Thread {
LOG.fatal(t.getTaskID()+" reporting FSError", ie);
}
} catch (Throwable throwable) {
- LOG.warn(t.getTaskID() + errorInfo, throwable);
+ LOG.warn(t.getTaskID() + " : " + errorInfo, throwable);
Throwable causeThrowable = new Throwable(errorInfo, throwable);
ByteArrayOutputStream baos = new ByteArrayOutputStream();
causeThrowable.printStackTrace(new PrintStream(baos));
@@ -385,15 +217,327 @@ abstract class TaskRunner extends Thread {
}
}
+ /**
+ * Prepare the log files for the task
+ *
+ * @param taskid
+ * @return an array of files. The first file is stdout, the second is stderr.
+ */
+ static File[] prepareLogFiles(TaskAttemptID taskid) {
+ File[] logFiles = new File[2];
+ logFiles[0] = TaskLog.getTaskLogFile(taskid, TaskLog.LogName.STDOUT);
+ logFiles[1] = TaskLog.getTaskLogFile(taskid, TaskLog.LogName.STDERR);
+ File logDir = logFiles[0].getParentFile();
+ boolean b = logDir.mkdirs();
+ if (!b) {
+ LOG.warn("mkdirs failed. Ignoring");
+ } else {
+ FileUtil.setPermissions(logDir, FileUtil.sevenZeroZero);
+ }
+ return logFiles;
+ }
+
+ /**
+ * Write the child's configuration to the disk and set it in configuration so
+ * that the child can pick it up from there.
+ *
+ * @param lDirAlloc
+ * @throws IOException
+ */
+ void setupChildTaskConfiguration(LocalDirAllocator lDirAlloc)
+ throws IOException {
+
+ Path localTaskFile =
+ lDirAlloc.getLocalPathForWrite(TaskTracker.getTaskConfFile(t
+ .getJobID().toString(), t.getTaskID().toString(), t
+ .isTaskCleanupTask()), conf);
+
+ // write the child's task configuration file to the local disk
+ writeLocalTaskFile(localTaskFile.toString(), conf);
+
+ // Set the final job file in the task. The child needs to know the correct
+ // path to job.xml. So set this path accordingly.
+ t.setJobFile(localTaskFile.toString());
+ }
+
+ /**
+ * @return the ulimit memory command as a list of arguments, or null if none
+ */
+ private List getVMSetupCmd() {
+ String[] ulimitCmd = Shell.getUlimitMemoryCommand(conf);
+ List setup = null;
+ if (ulimitCmd != null) {
+ setup = new ArrayList();
+ for (String arg : ulimitCmd) {
+ setup.add(arg);
+ }
+ }
+ return setup;
+ }
+
+ /**
+ * @param taskid
+ * @param workDir
+ * @param classPaths
+ * @param logSize
+ * @return
+ * @throws IOException
+ */
+ private Vector getVMArgs(TaskAttemptID taskid, File workDir,
+ List classPaths, long logSize)
+ throws IOException {
+ Vector vargs = new Vector(8);
+ File jvm = // use same jvm as parent
+ new File(new File(System.getProperty("java.home"), "bin"), "java");
+
+ vargs.add(jvm.toString());
+
+ // Add child (task) java-vm options.
+ //
+ // The following symbols if present in mapred.child.java.opts value are
+ // replaced:
+ // + @taskid@ is interpolated with value of TaskID.
+ // Other occurrences of @ will not be altered.
+ //
+ // Example with multiple arguments and substitutions, showing
+ // jvm GC logging, and start of a passwordless JVM JMX agent so can
+ // connect with jconsole and the likes to watch child memory, threads
+ // and get thread dumps.
+ //
+ //
+ // mapred.child.java.opts
+ // -verbose:gc -Xloggc:/tmp/@taskid@.gc \
+ // -Dcom.sun.management.jmxremote.authenticate=false \
+ // -Dcom.sun.management.jmxremote.ssl=false \
+ //
+ //
+ //
+ String javaOpts = conf.get("mapred.child.java.opts", "-Xmx200m");
+ javaOpts = javaOpts.replace("@taskid@", taskid.toString());
+ String [] javaOptsSplit = javaOpts.split(" ");
+
+ // Add java.library.path; necessary for loading native libraries.
+ //
+ // 1. To support native-hadoop library i.e. libhadoop.so, we add the
+ // parent process's java.library.path to the child.
+ // 2. We also add the 'cwd' of the task to its java.library.path to help
+ // users distribute native libraries via the DistributedCache.
+ // 3. The user can also specify extra paths to be added to the
+ // java.library.path via mapred.child.java.opts.
+ //
+ String libraryPath = System.getProperty("java.library.path");
+ if (libraryPath == null) {
+ libraryPath = workDir.getAbsolutePath();
+ } else {
+ libraryPath += SYSTEM_PATH_SEPARATOR + workDir;
+ }
+ boolean hasUserLDPath = false;
+ for(int i=0; i getClassPaths(JobConf conf, File workDir,
+ URI[] archives, URI[] files)
+ throws IOException {
+ // Accumulates class paths for child.
+ List classPaths = new ArrayList();
+ // start with same classpath as parent process
+ appendSystemClasspaths(classPaths);
+
+ // include the user specified classpath
+ appendJobJarClasspaths(conf.getJar(), classPaths);
+
+ // Distributed cache paths
+ appendDistributedCacheClasspaths(conf, archives, files, classPaths);
+
+ // Include the working dir too
+ classPaths.add(workDir.toString());
+ return classPaths;
+ }
+
+ /**
+ * @param errorInfo
+ * @param workDir
+ * @param env
+ * @return errorInfo as passed in; it is only reassigned just before rethrowing
+ * @throws Throwable
+ */
+ private static String getVMEnvironment(String errorInfo, File workDir, JobConf conf,
+ Map env)
+ throws Throwable {
+ StringBuffer ldLibraryPath = new StringBuffer();
+ ldLibraryPath.append(workDir.toString());
+ String oldLdLibraryPath = null;
+ oldLdLibraryPath = System.getenv("LD_LIBRARY_PATH");
+ if (oldLdLibraryPath != null) {
+ ldLibraryPath.append(SYSTEM_PATH_SEPARATOR);
+ ldLibraryPath.append(oldLdLibraryPath);
+ }
+ env.put("LD_LIBRARY_PATH", ldLibraryPath.toString());
+
+ // add the env variables passed by the user
+ String mapredChildEnv = conf.get("mapred.child.env");
+ if (mapredChildEnv != null && mapredChildEnv.length() > 0) {
+ String childEnvs[] = mapredChildEnv.split(",");
+ for (String cEnv : childEnvs) {
+ try {
+ String[] parts = cEnv.split("="); // split on '='
+ String value = env.get(parts[0]);
+ if (value != null) {
+ // replace $env with the child's env constructed by tt's
+ // example LD_LIBRARY_PATH=$LD_LIBRARY_PATH:/tmp
+ value = parts[1].replace("$" + parts[0], value);
+ } else {
+ // this key is not configured by the tt for the child .. get it
+ // from the tt's env
+ // example PATH=$PATH:/tmp
+ value = System.getenv(parts[0]);
+ if (value != null) {
+ // the env key is present in the tt's env
+ value = parts[1].replace("$" + parts[0], value);
+ } else {
+ // the env key is not present anywhere .. simply set it
+ // example X=$X:/tmp or X=/tmp
+ value = parts[1].replace("$" + parts[0], "");
+ }
+ }
+ env.put(parts[0], value);
+ } catch (Throwable t) {
+ // set the error msg
+ errorInfo = "Invalid User environment settings : " + mapredChildEnv
+ + ". Failed to parse user-passed environment param."
+ + " Expecting : env1=value1,env2=value2...";
+ LOG.warn(errorInfo);
+ throw t;
+ }
+ }
+ }
+ return errorInfo;
+ }
+
+ /**
+ * Write the task specific job-configuration file.
+ *
+ * @param jobFile path on the local file system that conf is written to
+ * @throws IOException
+ */
+ private static void writeLocalTaskFile(String jobFile, JobConf conf)
+ throws IOException {
+ Path localTaskFile = new Path(jobFile);
+ FileSystem localFs = FileSystem.getLocal(conf);
+ localFs.delete(localTaskFile, true);
+ OutputStream out = localFs.create(localTaskFile);
+ try {
+ conf.writeXml(out);
+ } finally {
+ out.close();
+ }
+ }
+
+ /**
+ * Prepare the mapred.local.dir for the child. The child is sand-boxed now.
+ * Whenever it uses LocalDirAllocator from now on inside the child, it will
+ * only see files inside the attempt-directory. This is done in the Child's
+ * process space.
+ */
+ static void setupChildMapredLocalDirs(Task t, JobConf conf) {
+ String[] localDirs = conf.getStrings("mapred.local.dir");
+ String jobId = t.getJobID().toString();
+ String taskId = t.getTaskID().toString();
+ boolean isCleanup = t.isTaskCleanupTask();
+ String childMapredLocalDir =
+ localDirs[0] + Path.SEPARATOR
+ + TaskTracker.getLocalTaskDir(jobId, taskId, isCleanup);
+ for (int i = 1; i < localDirs.length; i++) {
+ childMapredLocalDir +=
+ "," + localDirs[i] + Path.SEPARATOR
+ + TaskTracker.getLocalTaskDir(jobId, taskId, isCleanup);
+ }
+ LOG.info("mapred.local.dir for child : " + childMapredLocalDir);
+ conf.set("mapred.local.dir", childMapredLocalDir);
+ }
+
/** Creates the working directory pathname for a task attempt. */
static File formWorkDir(LocalDirAllocator lDirAlloc,
TaskAttemptID task, boolean isCleanup, JobConf conf)
throws IOException {
- File workDir = new File(lDirAlloc.getLocalPathToRead(
- TaskTracker.getLocalTaskDir(task.getJobID().toString(),
- task.toString(), isCleanup)
- + Path.SEPARATOR + MRConstants.WORKDIR, conf).toString());
- return workDir;
+ Path workDir =
+ lDirAlloc.getLocalPathToRead(TaskTracker.getTaskWorkDir(task
+ .getJobID().toString(), task.toString(), isCleanup), conf);
+
+ return new File(workDir.toString());
}
private void setupDistributedCache(LocalDirAllocator lDirAlloc, File workDir,
@@ -412,7 +556,7 @@ abstract class TaskRunner extends Thread {
fileStatus = fileSystem.getFileStatus(
new Path(archives[i].getPath()));
String cacheId = DistributedCache.makeRelative(archives[i],conf);
- String cachePath = TaskTracker.getCacheSubdir() +
+ String cachePath = TaskTracker.getDistributedCacheDir() +
Path.SEPARATOR + cacheId;
localPath = lDirAlloc.getLocalPathForWrite(cachePath,
@@ -438,7 +582,7 @@ abstract class TaskRunner extends Thread {
fileStatus = fileSystem.getFileStatus(
new Path(files[i].getPath()));
String cacheId = DistributedCache.makeRelative(files[i], conf);
- String cachePath = TaskTracker.getCacheSubdir() +
+ String cachePath = TaskTracker.getDistributedCacheDir() +
Path.SEPARATOR + cacheId;
localPath = lDirAlloc.getLocalPathForWrite(cachePath,
@@ -455,20 +599,12 @@ abstract class TaskRunner extends Thread {
}
DistributedCache.setLocalFiles(conf, stringifyPathArray(p));
}
- Path localTaskFile = new Path(t.getJobFile());
- FileSystem localFs = FileSystem.getLocal(conf);
- localFs.delete(localTaskFile, true);
- OutputStream out = localFs.create(localTaskFile);
- try {
- conf.writeXml(out);
- } finally {
- out.close();
- }
}
}
- private void appendDistributedCacheClasspaths(JobConf conf, URI[] archives,
- URI[] files, List classPaths) throws IOException {
+ private static void appendDistributedCacheClasspaths(JobConf conf,
+ URI[] archives, URI[] files, List classPaths)
+ throws IOException {
// Archive paths
Path[] archiveClasspaths = DistributedCache.getArchiveClassPaths(conf);
if (archiveClasspaths != null && archives != null) {
@@ -503,8 +639,9 @@ abstract class TaskRunner extends Thread {
}
}
- private void appendSystemClasspaths(List classPaths) {
- for (String c : System.getProperty("java.class.path").split(SYSTEM_PATH_SEPARATOR)) {
+ private static void appendSystemClasspaths(List classPaths) {
+ for (String c : System.getProperty("java.class.path").split(
+ SYSTEM_PATH_SEPARATOR)) {
classPaths.add(c);
}
}
@@ -586,19 +723,8 @@ abstract class TaskRunner extends Thread {
// Do not exit even if symlinks have not been created.
LOG.warn(StringUtils.stringifyException(ie));
}
- // add java.io.tmpdir given by mapred.child.tmp
- String tmp = conf.get("mapred.child.tmp", "./tmp");
- Path tmpDir = new Path(tmp);
- // if temp directory path is not absolute
- // prepend it with workDir.
- if (!tmpDir.isAbsolute()) {
- tmpDir = new Path(workDir.toString(), tmp);
- FileSystem localFs = FileSystem.getLocal(conf);
- if (!localFs.mkdirs(tmpDir) && !localFs.getFileStatus(tmpDir).isDir()){
- throw new IOException("Mkdirs failed to create " + tmpDir.toString());
- }
- }
+ createChildTmpDir(workDir, conf);
}
/**
diff --git src/java/org/apache/hadoop/mapred/TaskTracker.java src/java/org/apache/hadoop/mapred/TaskTracker.java
index 2ec5c41..c4b7270 100644
--- src/java/org/apache/hadoop/mapred/TaskTracker.java
+++ src/java/org/apache/hadoop/mapred/TaskTracker.java
@@ -65,6 +65,7 @@ import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.ipc.RPC;
import org.apache.hadoop.ipc.RemoteException;
import org.apache.hadoop.ipc.Server;
+import org.apache.hadoop.mapred.TaskController.JobInitializationContext;
import org.apache.hadoop.mapred.TaskStatus.Phase;
import org.apache.hadoop.mapred.TaskTrackerStatus.TaskTrackerHealthStatus;
import org.apache.hadoop.mapred.pipes.Submitter;
@@ -197,10 +198,16 @@ public class TaskTracker
//for serving map output to the other nodes
static Random r = new Random();
- private static final String SUBDIR = "taskTracker";
- private static final String CACHEDIR = "archive";
- private static final String JOBCACHE = "jobcache";
- private static final String OUTPUT = "output";
+ static final String SUBDIR = "taskTracker";
+ private static final String DISTCACHEDIR = "distcache";
+ static final String JOBCACHE = "jobcache";
+ static final String OUTPUT = "output";
+ private static final String JARSDIR = "jars";
+ static final String LOCAL_SPLIT_FILE = "split.dta";
+ static final String JOBFILE = "job.xml";
+
+ static final String JOB_LOCAL_DIR = "job.local.dir";
+
private JobConf fConf;
private FileSystem localFs;
private int maxMapSlots;
@@ -388,25 +395,52 @@ public class TaskTracker
}
}
- static String getCacheSubdir() {
- return TaskTracker.SUBDIR + Path.SEPARATOR + TaskTracker.CACHEDIR;
+ static String getDistributedCacheDir() {
+ return TaskTracker.SUBDIR + Path.SEPARATOR + TaskTracker.DISTCACHEDIR;
}
static String getJobCacheSubdir() {
return TaskTracker.SUBDIR + Path.SEPARATOR + TaskTracker.JOBCACHE;
}
-
+
static String getLocalJobDir(String jobid) {
- return getJobCacheSubdir() + Path.SEPARATOR + jobid;
+ return getJobCacheSubdir() + Path.SEPARATOR + jobid;
}
- static String getLocalTaskDir(String jobid, String taskid) {
- return getLocalTaskDir(jobid, taskid, false) ;
+ static String getLocalJobConfFile(String jobid) {
+ return getLocalJobDir(jobid) + Path.SEPARATOR + TaskTracker.JOBFILE;
+ }
+
+ static String getTaskConfFile(String jobid, String taskid,
+ boolean isCleanupAttempt) {
+ return getLocalTaskDir(jobid, taskid, isCleanupAttempt) + Path.SEPARATOR
+ + TaskTracker.JOBFILE;
+ }
+
+ static String getJobJarsDir(String jobid) {
+ return getLocalJobDir(jobid) + Path.SEPARATOR + TaskTracker.JARSDIR;
+ }
+
+ static String getJobJarFile(String jobid) {
+ return getJobJarsDir(jobid) + Path.SEPARATOR + "job.jar";
+ }
+
+ static String getJobWorkDir(String jobid) {
+ return getLocalJobDir(jobid) + Path.SEPARATOR + MRConstants.WORKDIR;
+ }
+
+ static String getLocalSplitFile(String jobid, String taskid) {
+ return TaskTracker.getLocalTaskDir(jobid, taskid) + Path.SEPARATOR
+ + TaskTracker.LOCAL_SPLIT_FILE;
}
static String getIntermediateOutputDir(String jobid, String taskid) {
- return getLocalTaskDir(jobid, taskid)
- + Path.SEPARATOR + TaskTracker.OUTPUT ;
+ return getLocalTaskDir(jobid, taskid) + Path.SEPARATOR
+ + TaskTracker.OUTPUT;
+ }
+
+ static String getLocalTaskDir(String jobid, String taskid) {
+ return getLocalTaskDir(jobid, taskid, false);
}
static String getLocalTaskDir(String jobid,
@@ -418,7 +452,17 @@ public class TaskTracker
}
return taskDir;
}
-
+
+ static String getTaskWorkDir(String jobid, String taskid,
+ boolean isCleanupAttempt) {
+ String dir =
+ getLocalJobDir(jobid) + Path.SEPARATOR + taskid;
+ if (isCleanupAttempt) {
+ dir = dir + TASK_CLEANUP_SUFFIX;
+ }
+ return dir + Path.SEPARATOR + MRConstants.WORKDIR;
+ }
+
String getPid(TaskAttemptID tid) {
TaskInProgress tip = tasks.get(tid);
if (tip != null) {
@@ -762,10 +806,140 @@ public class TaskTracker
// intialize the job directory
private void localizeJob(TaskInProgress tip) throws IOException {
- Path localJarFile = null;
Task t = tip.getTask();
JobID jobId = t.getJobID();
- Path jobFile = new Path(t.getJobFile());
+ RunningJob rjob = addTaskToJob(jobId, tip);
+
+ synchronized (rjob) {
+ if (!rjob.localized) {
+
+ JobConf localJobConf = localizeJobFiles(t);
+
+ // Now initialize the job via task-controller so as to set
+ // ownership/permissions of jars, job-work-dir. Note that initializeJob
+ // should be the last call after every other directory/file to be
+ // directly under the job directory is created.
+ JobInitializationContext context = new JobInitializationContext();
+ context.jobid = jobId;
+ context.user = localJobConf.getUser();
+ context.workDir = new File(localJobConf.get(JOB_LOCAL_DIR));
+ taskController.initializeJob(context);
+
+ rjob.jobConf = localJobConf;
+ rjob.keepJobFiles = ((localJobConf.getKeepTaskFilesPattern() != null) ||
+ localJobConf.getKeepFailedTaskFiles());
+ rjob.localized = true;
+ }
+ }
+ launchTaskForJob(tip, new JobConf(rjob.jobConf));
+ }
+
+ /**
+ * Localize the job on this tasktracker. Specifically
+ *
+ * - Cleanup and create job directories on all disks
+ * - Download the job config file job.xml from the DFS
+ * - Create the job work directory and set {@link TaskTracker#JOB_LOCAL_DIR}
+ * in the configuration.
+ *
+ * - Download the job jar file job.jar from the DFS, unjar it and set jar
+ * file in the configuration.
+ *
+ *
+ * @param t task whose job has to be localized on this TT
+ * @return the modified job configuration to be used for all the tasks of this
+ * job as a starting point.
+ * @throws IOException
+ */
+ JobConf localizeJobFiles(Task t)
+ throws IOException {
+ JobID jobId = t.getJobID();
+
+ // Initialize the job directories first
+ FileSystem localFs = FileSystem.getLocal(fConf);
+ initializeJobDirs(jobId, localFs, fConf.getStrings("mapred.local.dir"));
+
+ // Download the job.xml for this job from the system FS
+ Path localJobFile = localizeJobConfFile(new Path(t.getJobFile()), jobId);
+
+ JobConf localJobConf = new JobConf(localJobFile);
+
+ // create the 'job-work' directory: job-specific shared directory for use as
+ // scratch space by all tasks of the same job running on this TaskTracker.
+ Path workDir =
+ lDirAlloc.getLocalPathForWrite(getJobWorkDir(jobId.toString()),
+ fConf);
+ if (!localFs.mkdirs(workDir)) {
+ throw new IOException("Mkdirs failed to create "
+ + workDir.toString());
+ }
+ System.setProperty(JOB_LOCAL_DIR, workDir.toUri().getPath());
+ localJobConf.set(JOB_LOCAL_DIR, workDir.toUri().getPath());
+
+ // Download the job.jar for this job from the system FS
+ localizeJobJarFile(jobId, localFs, localJobConf);
+ return localJobConf;
+ }
+
+ /**
+ * Prepare the job directories for a given job. To be called by the job
+ * localization code, only if the job is not already localized.
+ *
+ *
+ * Here, we set 700 permissions on the job directories created on all disks.
+ * We do this to avoid any misuse by other users until
+ * {@link TaskController#initializeJob(JobInitializationContext)} is run at a
+ * later time to set proper private permissions on the job directories.
+ *
+ * @param jobId
+ * @param fs
+ * @param localDirs
+ * @throws IOException
+ */
+ private static void initializeJobDirs(JobID jobId, FileSystem fs,
+ String[] localDirs)
+ throws IOException {
+ boolean initJobDirStatus = false;
+ String jobDirPath = getLocalJobDir(jobId.toString());
+ for (String localDir : localDirs) {
+ Path jobDir = new Path(localDir, jobDirPath);
+ if (fs.exists(jobDir)) {
+ // this will happen on a partial execution of localizeJob. Sometimes
+ // copying job.xml to the local disk succeeds but copying job.jar might
+ // throw out an exception. We should clean up and then try again.
+ fs.delete(jobDir, true);
+ }
+
+ boolean jobDirStatus = fs.mkdirs(jobDir);
+ if (!jobDirStatus) {
+ LOG.warn("Not able to create job directory " + jobDir.toString());
+ }
+
+ initJobDirStatus = initJobDirStatus || jobDirStatus;
+
+ // TODO: fix return status for the following.
+ // TODO: is this really needed? Try to fix/use RawLocalFileSystem.mkdirs
+ // job-dir has to be private to the TT
+ FileUtil.setPermissions(new File(jobDir.toUri().getPath()),
+ FileUtil.sevenZeroZero);
+ }
+
+ if (!initJobDirStatus) {
+ throw new IOException("Not able to initialize job directories "
+ + "in any of the configured local directories for job "
+ + jobId.toString());
+ }
+ }
+
+ /**
+ * Download the job configuration file from the DFS.
+ *
+ * @param jobFile path of the job configuration file on the system FS
+ * @param jobId jobid of the task
+ * @return the local file system path of the downloaded file.
+ * @throws IOException
+ */
+ private Path localizeJobConfFile(Path jobFile, JobID jobId)
+ throws IOException {
// Get sizes of JobFile and JarFile
// sizes are -1 if they are not present.
FileStatus status = null;
@@ -776,82 +950,56 @@ public class TaskTracker
} catch(FileNotFoundException fe) {
jobFileSize = -1;
}
- Path localJobFile = lDirAlloc.getLocalPathForWrite(
- getLocalJobDir(jobId.toString())
- + Path.SEPARATOR + "job.xml",
- jobFileSize, fConf);
- RunningJob rjob = addTaskToJob(jobId, tip);
- synchronized (rjob) {
- if (!rjob.localized) {
-
- FileSystem localFs = FileSystem.getLocal(fConf);
- // this will happen on a partial execution of localizeJob.
- // Sometimes the job.xml gets copied but copying job.jar
- // might throw out an exception
- // we should clean up and then try again
- Path jobDir = localJobFile.getParent();
- if (localFs.exists(jobDir)){
- localFs.delete(jobDir, true);
- boolean b = localFs.mkdirs(jobDir);
- if (!b)
- throw new IOException("Not able to create job directory "
- + jobDir.toString());
- }
- systemFS.copyToLocalFile(jobFile, localJobFile);
- JobConf localJobConf = new JobConf(localJobFile);
-
- // create the 'work' directory
- // job-specific shared directory for use as scratch space
- Path workDir = lDirAlloc.getLocalPathForWrite(
- (getLocalJobDir(jobId.toString())
- + Path.SEPARATOR + MRConstants.WORKDIR), fConf);
- if (!localFs.mkdirs(workDir)) {
- throw new IOException("Mkdirs failed to create "
- + workDir.toString());
- }
- System.setProperty("job.local.dir", workDir.toString());
- localJobConf.set("job.local.dir", workDir.toString());
-
- // copy Jar file to the local FS and unjar it.
- String jarFile = localJobConf.getJar();
- long jarFileSize = -1;
- if (jarFile != null) {
- Path jarFilePath = new Path(jarFile);
- try {
- status = systemFS.getFileStatus(jarFilePath);
- jarFileSize = status.getLen();
- } catch(FileNotFoundException fe) {
- jarFileSize = -1;
- }
- // Here we check for and we check five times the size of jarFileSize
- // to accommodate for unjarring the jar file in work directory
- localJarFile = new Path(lDirAlloc.getLocalPathForWrite(
- getLocalJobDir(jobId.toString())
- + Path.SEPARATOR + "jars",
- 5 * jarFileSize, fConf), "job.jar");
- if (!localFs.mkdirs(localJarFile.getParent())) {
- throw new IOException("Mkdirs failed to create jars directory ");
- }
- systemFS.copyToLocalFile(jarFilePath, localJarFile);
- localJobConf.setJar(localJarFile.toString());
- OutputStream out = localFs.create(localJobFile);
- try {
- localJobConf.writeXml(out);
- } finally {
- out.close();
- }
- // also unjar the job.jar files
- RunJar.unJar(new File(localJarFile.toString()),
- new File(localJarFile.getParent().toString()));
- }
- rjob.keepJobFiles = ((localJobConf.getKeepTaskFilesPattern() != null) ||
- localJobConf.getKeepFailedTaskFiles());
- rjob.localized = true;
- rjob.jobConf = localJobConf;
- taskController.initializeJob(jobId);
+
+ Path localJobFile =
+ lDirAlloc.getLocalPathForWrite(getLocalJobConfFile(jobId.toString()),
+ jobFileSize, fConf);
+
+ // Download job.xml
+ systemFS.copyToLocalFile(jobFile, localJobFile);
+ return localJobFile;
+ }
+
+ /**
+ * Download the job jar file from DFS to the local file system and unjar it.
+ * Set the local jar file in the passed configuration.
+ *
+ * @param jobId
+ * @param localFs
+ * @param localJobConf
+ * @throws IOException
+ */
+ private void localizeJobJarFile(JobID jobId, FileSystem localFs,
+ JobConf localJobConf)
+ throws IOException {
+ // copy Jar file to the local FS and unjar it.
+ String jarFile = localJobConf.getJar();
+ FileStatus status = null;
+ long jarFileSize = -1;
+ if (jarFile != null) {
+ Path jarFilePath = new Path(jarFile);
+ try {
+ status = systemFS.getFileStatus(jarFilePath);
+ jarFileSize = status.getLen();
+ } catch (FileNotFoundException fe) {
+ jarFileSize = -1;
}
+ // Here we check for five times the size of jarFileSize to accommodate
+ // unjarring the jar file in the job's jars directory
+ Path localJarFile =
+ lDirAlloc.getLocalPathForWrite(getJobJarFile(jobId.toString()),
+ 5 * jarFileSize, fConf);
+
+ // Download job.jar
+ systemFS.copyToLocalFile(jarFilePath, localJarFile);
+
+ localJobConf.setJar(localJarFile.toString());
+
+ // Also un-jar the job.jar files. We un-jar it so that classes inside
+ // sub-directories, e.g., lib/, classes/ are available on class-path
+ RunJar.unJar(new File(localJarFile.toString()), new File(localJarFile
+ .getParent().toString()));
}
- launchTaskForJob(tip, new JobConf(rjob.jobConf));
}
private void launchTaskForJob(TaskInProgress tip, JobConf jobConf) throws IOException{
@@ -890,7 +1038,7 @@ public class TaskTracker
for (TaskInProgress tip : tasksToClose.values()) {
tip.jobHasFinished(false);
}
-
+
this.running = false;
// Clear local storage
@@ -929,6 +1077,17 @@ public class TaskTracker
}
/**
+ * For testing
+ */
+ TaskTracker() {
+ server = null;
+ }
+
+ void setConf(JobConf conf) {
+ fConf = conf;
+ }
+
+ /**
* Start with the local machine name, and the default JobTracker
*/
public TaskTracker(JobConf conf) throws IOException {
@@ -1571,7 +1730,7 @@ public class TaskTracker
mapOutputFile.setJobId(taskId.getJobID());
mapOutputFile.setConf(conf);
- Path tmp_output = mapOutputFile.getOutputFile(taskId);
+ Path tmp_output = mapOutputFile.getOutputFile();
if(tmp_output == null)
return 0;
FileSystem localFS = FileSystem.getLocal(conf);
@@ -1847,54 +2006,31 @@ public class TaskTracker
taskTimeout = (10 * 60 * 1000);
}
- private void localizeTask(Task task) throws IOException{
+ void localizeTask(Task task) throws IOException{
- Path localTaskDir =
- lDirAlloc.getLocalPathForWrite(
- TaskTracker.getLocalTaskDir(task.getJobID().toString(),
- task.getTaskID().toString(), task.isTaskCleanupTask()),
- defaultJobConf );
-
FileSystem localFs = FileSystem.getLocal(fConf);
- if (!localFs.mkdirs(localTaskDir)) {
- throw new IOException("Mkdirs failed to create "
- + localTaskDir.toString());
- }
-
- // create symlink for ../work if it already doesnt exist
- String workDir = lDirAlloc.getLocalPathToRead(
- TaskTracker.getLocalJobDir(task.getJobID().toString())
- + Path.SEPARATOR
- + "work", defaultJobConf).toString();
- String link = localTaskDir.getParent().toString()
- + Path.SEPARATOR + "work";
- File flink = new File(link);
- if (!flink.exists())
- FileUtil.symLink(workDir, link);
-
+
// create the working-directory of the task
- Path cwd = lDirAlloc.getLocalPathForWrite(
- getLocalTaskDir(task.getJobID().toString(),
- task.getTaskID().toString(), task.isTaskCleanupTask())
- + Path.SEPARATOR + MRConstants.WORKDIR,
- defaultJobConf);
+ Path cwd =
+ lDirAlloc.getLocalPathForWrite(getTaskWorkDir(task.getJobID()
+ .toString(), task.getTaskID().toString(), task
+ .isTaskCleanupTask()), defaultJobConf);
if (!localFs.mkdirs(cwd)) {
throw new IOException("Mkdirs failed to create "
+ cwd.toString());
}
- Path localTaskFile = new Path(localTaskDir, "job.xml");
- task.setJobFile(localTaskFile.toString());
localJobConf.set("mapred.local.dir",
fConf.get("mapred.local.dir"));
+
if (fConf.get("slave.host.name") != null) {
localJobConf.set("slave.host.name",
fConf.get("slave.host.name"));
}
- localJobConf.set("mapred.task.id", task.getTaskID().toString());
keepFailedTaskFiles = localJobConf.getKeepFailedTaskFiles();
+ // Do the task-type specific localization
task.localizeConfiguration(localJobConf);
List staticResolutions = NetUtils.getAllStaticResolutions();
@@ -1927,12 +2063,6 @@ public class TaskTracker
//disable jvm reuse
localJobConf.setNumTasksToExecutePerJvm(1);
}
- OutputStream out = localFs.create(localTaskFile);
- try {
- localJobConf.writeXml(out);
- } finally {
- out.close();
- }
task.setConf(localJobConf);
}
@@ -2188,7 +2318,7 @@ public class TaskTracker
localJobConf). toString());
} catch (IOException e) {
LOG.warn("Working Directory of the task " + task.getTaskID() +
- "doesnt exist. Caught exception " +
+ " doesnt exist. Caught exception " +
StringUtils.stringifyException(e));
}
// Build the command
@@ -2463,34 +2593,39 @@ public class TaskTracker
if (localJobConf == null) {
return;
}
- String taskDir = getLocalTaskDir(task.getJobID().toString(),
- taskId.toString(), task.isTaskCleanupTask());
+ String localTaskDir =
+ getLocalTaskDir(task.getJobID().toString(), taskId.toString(),
+ task.isTaskCleanupTask());
+ String taskWorkDir =
+ getTaskWorkDir(task.getJobID().toString(), taskId.toString(),
+ task.isTaskCleanupTask());
if (needCleanup) {
if (runner != null) {
//cleans up the output directory of the task (where map outputs
//and reduce inputs get stored)
runner.close();
}
- //We don't delete the workdir
- //since some other task (running in the same JVM)
- //might be using the dir. The JVM running the tasks would clean
- //the workdir per a task in the task process itself.
+
if (localJobConf.getNumTasksToExecutePerJvm() == 1) {
+ // No jvm reuse, remove everything
directoryCleanupThread.addToQueue(localFs,
getLocalFiles(defaultJobConf,
- taskDir));
+ localTaskDir));
}
-
else {
- directoryCleanupThread.addToQueue(localFs,
- getLocalFiles(defaultJobConf,
- taskDir+"/job.xml"));
+ // Jvm reuse. We don't delete the workdir since some other task
+ // (running in the same JVM) might be using the dir. The JVM
+ // running the tasks would clean the workdir per a task in the
+ // task process itself.
+ directoryCleanupThread.addToQueue(localFs, getLocalFiles(
+ defaultJobConf, localTaskDir + Path.SEPARATOR
+ + TaskTracker.JOBFILE));
}
} else {
if (localJobConf.getNumTasksToExecutePerJvm() == 1) {
directoryCleanupThread.addToQueue(localFs,
getLocalFiles(defaultJobConf,
- taskDir+"/work"));
+ taskWorkDir));
}
}
} catch (Throwable ie) {
diff --git src/test/mapred/org/apache/hadoop/mapred/ClusterWithLinuxTaskController.java src/test/mapred/org/apache/hadoop/mapred/ClusterWithLinuxTaskController.java
index 521c758..56febee 100644
--- src/test/mapred/org/apache/hadoop/mapred/ClusterWithLinuxTaskController.java
+++ src/test/mapred/org/apache/hadoop/mapred/ClusterWithLinuxTaskController.java
@@ -46,7 +46,7 @@ import junit.framework.TestCase;
* Make the built binary to setuid executable
* Execute following targets:
* ant test -Dcompile.c++=true -Dtaskcontroller-path=path to built binary
- * -Dtaskcontroller-user=user,group
+ * -Dtaskcontroller-ugi=user,group
*
*
*/
@@ -81,6 +81,9 @@ public class ClusterWithLinuxTaskController extends TestCase {
private static final int NUMBER_OF_NODES = 1;
+ static final String TASKCONTROLLER_PATH = "taskcontroller-path";
+ static final String TASKCONTROLLER_UGI = "taskcontroller-ugi";
+
private File configurationFile = null;
private UserGroupInformation taskControllerUser;
@@ -97,18 +100,20 @@ public class ClusterWithLinuxTaskController extends TestCase {
MyLinuxTaskController.class.getName());
mrCluster =
new MiniMRCluster(NUMBER_OF_NODES, dfsCluster.getFileSystem().getUri()
- .toString(), 1, null, null, conf);
+ .toString(), 4, null, null, conf);
// Get the configured taskcontroller-path
- String path = System.getProperty("taskcontroller-path");
- createTaskControllerConf(path);
+ String path = System.getProperty(TASKCONTROLLER_PATH);
+ configurationFile =
+ createTaskControllerConf(path, mrCluster.getTaskTrackerRunner(0)
+ .getLocalDirs());
String execPath = path + "/task-controller";
TaskTracker tracker = mrCluster.getTaskTrackerRunner(0).tt;
// TypeCasting the parent to our TaskController instance as we
// know that that would be instance which should be present in TT.
((MyLinuxTaskController) tracker.getTaskController())
.setTaskControllerExe(execPath);
- String ugi = System.getProperty("taskcontroller-user");
+ String ugi = System.getProperty(TASKCONTROLLER_UGI);
clusterConf = mrCluster.createJobConf();
String[] splits = ugi.split(",");
taskControllerUser = new UnixUserGroupInformation(splits);
@@ -133,21 +138,39 @@ public class ClusterWithLinuxTaskController extends TestCase {
taskControllerUser.getGroupNames()[0]);
}
- private void createTaskControllerConf(String path)
+ /**
+ * Create taskcontroller.cfg.
+ *
+ * @param path Path to the taskcontroller binary.
+ * @param localDirs directories written out as the mapred.local.dir value
+ * @return the created conf file
+ * @throws IOException
+ */
+ static File createTaskControllerConf(String path, String[] localDirs)
throws IOException {
File confDirectory = new File(path, "../conf");
if (!confDirectory.exists()) {
confDirectory.mkdirs();
}
- configurationFile = new File(confDirectory, "taskcontroller.cfg");
+ File configurationFile = new File(confDirectory, "taskcontroller.cfg");
PrintWriter writer =
new PrintWriter(new FileOutputStream(configurationFile));
- writer.println(String.format("mapred.local.dir=%s", mrCluster
- .getTaskTrackerLocalDir(0)));
+ StringBuffer sb = new StringBuffer();
+ for (int i = 0; i < localDirs.length; i++) {
+ sb.append(localDirs[i]);
+ if ((i + 1) != localDirs.length) {
+ sb.append(",");
+ }
+ }
+ writer.println(String.format("mapred.local.dir=%s", sb.toString()));
+
+ writer
+ .println(String.format("hadoop.log.dir=%s", TaskLog.getBaseLogDir()));
writer.flush();
writer.close();
+ return configurationFile;
}
/**
@@ -155,28 +178,35 @@ public class ClusterWithLinuxTaskController extends TestCase {
*
* @return boolean
*/
- protected boolean shouldRun() {
- return isTaskExecPathPassed() && isUserPassed();
+ protected static boolean shouldRun() {
+ if (!isTaskExecPathPassed() || !isUserPassed()) {
+ LOG.info("Not running test.");
+ return false;
+ }
+ return true;
}
- private boolean isTaskExecPathPassed() {
- String path = System.getProperty("taskcontroller-path");
+ private static boolean isTaskExecPathPassed() {
+ String path = System.getProperty(TASKCONTROLLER_PATH);
if (path == null || path.isEmpty()
- || path.equals("${taskcontroller-path}")) {
+ || path.equals("${" + TASKCONTROLLER_PATH + "}")) {
+ LOG.info("Invalid taskcontroller-path : " + path);
return false;
}
return true;
}
- private boolean isUserPassed() {
- String ugi = System.getProperty("taskcontroller-user");
- if (ugi != null && !(ugi.equals("${taskcontroller-user}"))
+ private static boolean isUserPassed() {
+ String ugi = System.getProperty(TASKCONTROLLER_UGI);
+ if (ugi != null && !(ugi.equals("${" + TASKCONTROLLER_UGI + "}"))
&& !ugi.isEmpty()) {
if (ugi.indexOf(",") > 1) {
return true;
}
+ LOG.info("Invalid taskcontroller-ugi : " + ugi);
return false;
}
+ LOG.info("Invalid taskcontroller-ugi : " + ugi);
return false;
}
diff --git src/test/mapred/org/apache/hadoop/mapred/MiniMRCluster.java src/test/mapred/org/apache/hadoop/mapred/MiniMRCluster.java
index 5c26fd1..5f2c49f 100644
--- src/test/mapred/org/apache/hadoop/mapred/MiniMRCluster.java
+++ src/test/mapred/org/apache/hadoop/mapred/MiniMRCluster.java
@@ -171,7 +171,7 @@ public class MiniMRCluster {
StringBuffer localPath = new StringBuffer();
for(int i=0; i < numDir; ++i) {
File ttDir = new File(localDirBase,
- Integer.toString(trackerId) + "_" + 0);
+ Integer.toString(trackerId) + "_" + i);
if (!ttDir.mkdirs()) {
if (!ttDir.isDirectory()) {
throw new IOException("Mkdirs failed to create " + ttDir);
diff --git src/test/mapred/org/apache/hadoop/mapred/TestIsolationRunner.java src/test/mapred/org/apache/hadoop/mapred/TestIsolationRunner.java
index 3bf0bc5..9ee549c 100644
--- src/test/mapred/org/apache/hadoop/mapred/TestIsolationRunner.java
+++ src/test/mapred/org/apache/hadoop/mapred/TestIsolationRunner.java
@@ -26,6 +26,7 @@ import junit.framework.TestCase;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.LocalDirAllocator;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.PathFilter;
import org.apache.hadoop.mapred.lib.IdentityMapper;
@@ -96,23 +97,20 @@ public class TestIsolationRunner extends TestCase {
});
return files.length;
}
-
+
private Path getAttemptJobXml(JobConf conf, JobID jobId, TaskType taskType)
throws IOException {
- String[] localDirs = conf.getLocalDirs();
- assertEquals(1, localDirs.length);
- Path jobCacheDir = new Path(localDirs[0], "0_0" + Path.SEPARATOR +
- "taskTracker" + Path.SEPARATOR + "jobcache" + Path.SEPARATOR + jobId);
- Path attemptDir = new Path(jobCacheDir,
- new TaskAttemptID(new TaskID(jobId, taskType, 0), 0).toString());
- return new Path(attemptDir, "job.xml");
+ String taskid =
+ new TaskAttemptID(new TaskID(jobId, taskType, 0), 0).toString();
+ return new LocalDirAllocator("mapred.local.dir").getLocalPathToRead(
+ TaskTracker.getTaskConfFile(jobId.toString(), taskid, false), conf);
}
public void testIsolationRunOfMapTask() throws
IOException, InterruptedException, ClassNotFoundException {
MiniMRCluster mr = null;
try {
- mr = new MiniMRCluster(1, "file:///", 1);
+ mr = new MiniMRCluster(1, "file:///", 4);
// Run a job succesfully; keep task files.
JobConf conf = mr.createJobConf();
@@ -130,8 +128,10 @@ public class TestIsolationRunner extends TestCase {
// Retrieve succesful job's configuration and
// run IsolationRunner against the map task.
FileSystem localFs = FileSystem.getLocal(conf);
- Path mapJobXml = getAttemptJobXml(conf, jobId,
- TaskType.MAP).makeQualified(localFs);
+ Path mapJobXml =
+ getAttemptJobXml(
+ mr.getTaskTrackerRunner(0).getTaskTracker().getJobConf(), jobId,
+ TaskType.MAP).makeQualified(localFs);
assertTrue(localFs.exists(mapJobXml));
new IsolationRunner().run(new String[] {
diff --git src/test/mapred/org/apache/hadoop/mapred/TestJobExecutionAsDifferentUser.java src/test/mapred/org/apache/hadoop/mapred/TestJobExecutionAsDifferentUser.java
index f923f88..5c9caa1 100644
--- src/test/mapred/org/apache/hadoop/mapred/TestJobExecutionAsDifferentUser.java
+++ src/test/mapred/org/apache/hadoop/mapred/TestJobExecutionAsDifferentUser.java
@@ -20,8 +20,10 @@ package org.apache.hadoop.mapred;
import java.io.IOException;
+import org.apache.hadoop.examples.SleepJob;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.util.ToolRunner;
/**
* Test a java-based mapred job with LinuxTaskController running the jobs as a
@@ -39,10 +41,32 @@ public class TestJobExecutionAsDifferentUser extends
startCluster();
Path inDir = new Path("input");
Path outDir = new Path("output");
- RunningJob job =
- UtilsForTests.runJobSucceed(getClusterConf(), inDir, outDir);
+
+ RunningJob job;
+
+ // Run a job with zero maps/reduces
+ job = UtilsForTests.runJob(getClusterConf(), inDir, outDir, 0, 0);
+ job.waitForCompletion();
+ assertTrue("Job failed", job.isSuccessful());
+ assertOwnerShip(outDir);
+
+ // Run a job with 1 map and zero reduces
+ job = UtilsForTests.runJob(getClusterConf(), inDir, outDir, 1, 0);
+ job.waitForCompletion();
assertTrue("Job failed", job.isSuccessful());
assertOwnerShip(outDir);
+
+ // Run a normal job with maps/reduces
+ job = UtilsForTests.runJob(getClusterConf(), inDir, outDir, 1, 1);
+ job.waitForCompletion();
+ assertTrue("Job failed", job.isSuccessful());
+ assertOwnerShip(outDir);
+
+ // Run a job with jvm reuse
+ JobConf myConf = getClusterConf();
+ myConf.set("mapred.job.reuse.jvm.num.tasks", "-1");
+ String[] args = { "-m", "6", "-r", "3", "-mt", "1000", "-rt", "1000" };
+ assertEquals(0, ToolRunner.run(myConf, new SleepJob(), args));
}
public void testEnvironment() throws IOException {
diff --git src/test/mapred/org/apache/hadoop/mapred/TestKillSubProcesses.java src/test/mapred/org/apache/hadoop/mapred/TestKillSubProcesses.java
index 2ed2bec..e65adae 100644
--- src/test/mapred/org/apache/hadoop/mapred/TestKillSubProcesses.java
+++ src/test/mapred/org/apache/hadoop/mapred/TestKillSubProcesses.java
@@ -351,15 +351,26 @@ public class TestKillSubProcesses extends TestCase {
if (ProcessTree.isSetsidAvailable) {
FileSystem fs = FileSystem.getLocal(conf);
- if(fs.exists(scriptDir)){
+ if (fs.exists(scriptDir)) {
fs.delete(scriptDir, true);
}
- // create shell script
- Random rm = new Random();
+
+ // Create the directory and set open permissions so that the TT can
+ // access.
+ fs.mkdirs(scriptDir);
+ fs.setPermission(scriptDir, new FsPermission(FsAction.ALL, FsAction.ALL,
+ FsAction.ALL));
+
+ // create shell script
+ Random rm = new Random();
Path scriptPath = new Path(scriptDirName, "_shellScript_" + rm.nextInt()
+ ".sh");
String shellScript = scriptPath.toString();
+
+ // Construct the script. Set umask to 0000 so that TT can access all the
+ // files.
String script =
+ "umask 000\n" +
"echo $$ > " + scriptDirName + "/childPidFile" + "$1\n" +
"echo hello\n" +
"trap 'echo got SIGTERM' 15 \n" +
@@ -374,7 +385,10 @@ public class TestKillSubProcesses extends TestCase {
file.writeBytes(script);
file.close();
- LOG.info("Calling script from map task of failjob : " + shellScript);
+ // Set executable permissions on the script.
+ new File(scriptPath.toUri().getPath()).setExecutable(true);
+
+ LOG.info("Calling script from map task : " + shellScript);
Runtime.getRuntime()
.exec(shellScript + " " + numLevelsOfSubProcesses);
diff --git src/test/mapred/org/apache/hadoop/mapred/TestLocalizationWithLinuxTaskController.java src/test/mapred/org/apache/hadoop/mapred/TestLocalizationWithLinuxTaskController.java
new file mode 100644
index 0000000..3013615
--- /dev/null
+++ src/test/mapred/org/apache/hadoop/mapred/TestLocalizationWithLinuxTaskController.java
@@ -0,0 +1,237 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.mapred;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+import javax.security.auth.login.LoginException;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.mapred.TaskController.JobInitializationContext;
+import org.apache.hadoop.mapred.TaskController.TaskControllerContext;
+import org.apache.hadoop.mapred.TaskTracker.TaskInProgress;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.mapred.ClusterWithLinuxTaskController.MyLinuxTaskController;
+import org.apache.hadoop.mapred.JvmManager.JvmEnv;
+
+/**
+ * Test to verify localization of a job and localization of a task on a
+ * TaskTracker when {@link LinuxTaskController} is used.
+ *
+ */
+public class TestLocalizationWithLinuxTaskController extends
+ TestTaskTrackerLocalization {
+
+ private static final Log LOG =
+ LogFactory.getLog(TestLocalizationWithLinuxTaskController.class);
+
+ private File configFile;
+ private MyLinuxTaskController taskController;
+
+ @Override
+ protected void setUp()
+ throws Exception {
+
+ if (!ClusterWithLinuxTaskController.shouldRun()) {
+ return;
+ }
+
+ super.setUp();
+
+ taskController = new MyLinuxTaskController();
+ String path =
+ System.getProperty(ClusterWithLinuxTaskController.TASKCONTROLLER_PATH);
+ configFile =
+ ClusterWithLinuxTaskController.createTaskControllerConf(path,
+ localDirs);
+ String execPath = path + "/task-controller";
+ taskController.setTaskControllerExe(execPath);
+ taskController.setConf(trackerFConf);
+ taskController.setup();
+ }
+
+ @Override
+ protected void tearDown()
+ throws Exception {
+ if (!ClusterWithLinuxTaskController.shouldRun()) {
+ return;
+ }
+ super.tearDown();
+ if (configFile != null) {
+ configFile.delete();
+ }
+ }
+
+ /**
+ * Test job localization with {@link LinuxTaskController}. Also check the
+ * permissions and file ownership of the job related files.
+ */
+ @Override
+ public void testJobLocalization()
+ throws IOException,
+ LoginException {
+
+ if (!ClusterWithLinuxTaskController.shouldRun()) {
+ return;
+ }
+
+ // Do job localization
+ JobConf localizedJobConf = tracker.localizeJobFiles(task);
+
+ String ugi =
+ System.getProperty(ClusterWithLinuxTaskController.TASKCONTROLLER_UGI);
+ localizedJobConf.setUser(ugi.split(",")[0]);
+
+ // Now initialize the job via task-controller so as to set
+ // ownership/permissions of jars, job-work-dir
+ JobInitializationContext context = new JobInitializationContext();
+ context.jobid = jobId;
+ context.user = localizedJobConf.getUser();
+ context.workDir =
+ new File(localizedJobConf.get(TaskTracker.JOB_LOCAL_DIR));
+
+ // /////////// The method being tested
+ taskController.initializeJob(context);
+ // ///////////
+
+ UserGroupInformation taskTrackerugi =
+ UserGroupInformation.login(localizedJobConf);
+ for (String localDir : trackerFConf.getStrings("mapred.local.dir")) {
+ File jobDir =
+ new File(localDir, TaskTracker.getLocalJobDir(jobId.toString()));
+ // check the private permissions on the job directory
+ checkFilePermissions(jobDir.getAbsolutePath(), "dr-xrws---",
+ localizedJobConf.getUser(), taskTrackerugi.getGroupNames()[0]);
+ }
+
+ // check the private permissions of various directories
+ List<Path> dirs = new ArrayList<Path>();
+ Path jarsDir =
+ lDirAlloc.getLocalPathToRead(TaskTracker.getJobJarsDir(jobId
+ .toString()), trackerFConf);
+ dirs.add(jarsDir);
+ dirs.add(new Path(jarsDir, "lib"));
+ for (Path dir : dirs) {
+ checkFilePermissions(dir.toUri().getPath(), "dr-xrws---",
+ localizedJobConf.getUser(), taskTrackerugi.getGroupNames()[0]);
+ }
+
+ // job-work dir needs user writable permissions
+ Path jobWorkDir =
+ lDirAlloc.getLocalPathToRead(TaskTracker.getJobWorkDir(jobId
+ .toString()), trackerFConf);
+ checkFilePermissions(jobWorkDir.toUri().getPath(), "drwxrws---",
+ localizedJobConf.getUser(), taskTrackerugi.getGroupNames()[0]);
+
+ // check the private permissions of various files
+ List<Path> files = new ArrayList<Path>();
+ files.add(lDirAlloc.getLocalPathToRead(TaskTracker
+ .getLocalJobConfFile(jobId.toString()), trackerFConf));
+ files.add(lDirAlloc.getLocalPathToRead(TaskTracker.getJobJarFile(jobId
+ .toString()), trackerFConf));
+ files.add(new Path(jarsDir, "lib" + Path.SEPARATOR + "lib1.jar"));
+ files.add(new Path(jarsDir, "lib" + Path.SEPARATOR + "lib2.jar"));
+ for (Path file : files) {
+ checkFilePermissions(file.toUri().getPath(), "-r-xrwx---",
+ localizedJobConf.getUser(), taskTrackerugi.getGroupNames()[0]);
+ }
+ }
+
+ /**
+ * Test task localization with {@link LinuxTaskController}. Also check the
+ * permissions and file ownership of task related files.
+ */
+ @Override
+ public void testTaskLocalization()
+ throws IOException,
+ LoginException {
+
+ if (!ClusterWithLinuxTaskController.shouldRun()) {
+ return;
+ }
+
+ JobConf localizedJobConf = tracker.localizeJobFiles(task);
+ String ugi =
+ System.getProperty(ClusterWithLinuxTaskController.TASKCONTROLLER_UGI);
+ localizedJobConf.setUser(ugi.split(",")[0]);
+
+ // Now initialize the job via task-controller so as to set
+ // ownership/permissions of jars, job-work-dir
+ JobInitializationContext jobContext = new JobInitializationContext();
+ jobContext.jobid = jobId;
+ jobContext.user = localizedJobConf.getUser();
+ jobContext.workDir =
+ new File(localizedJobConf.get(TaskTracker.JOB_LOCAL_DIR));
+ taskController.initializeJob(jobContext);
+
+ TaskInProgress tip = tracker.new TaskInProgress(task, trackerFConf);
+ tip.setJobConf(localizedJobConf);
+
+ // localize the task.
+ tip.localizeTask(task);
+ TaskRunner runner = task.createRunner(tracker, tip);
+ runner.setupChildTaskConfiguration(lDirAlloc);
+ Path workDir =
+ lDirAlloc.getLocalPathToRead(TaskTracker.getTaskWorkDir(task
+ .getJobID().toString(), task.getTaskID().toString(), task
+ .isTaskCleanupTask()), trackerFConf);
+ TaskRunner.createChildTmpDir(new File(workDir.toUri().getPath()),
+ localizedJobConf);
+ File[] logFiles = TaskRunner.prepareLogFiles(task.getTaskID());
+
+ // Initialize task
+ TaskControllerContext taskContext =
+ new TaskController.TaskControllerContext();
+ taskContext.env =
+ new JvmEnv(null, null, null, null, -1, new File(localizedJobConf
+ .get(TaskTracker.JOB_LOCAL_DIR)), null, localizedJobConf);
+ taskContext.task = task;
+ // /////////// The method being tested
+ taskController.initializeTask(taskContext);
+ // ///////////
+
+ // check the private permissions of various directories
+ List<Path> dirs = new ArrayList<Path>();
+ dirs.add(lDirAlloc.getLocalPathToRead(TaskTracker.getLocalTaskDir(jobId
+ .toString(), taskId.toString()), trackerFConf));
+ dirs.add(workDir);
+ dirs.add(new Path(workDir, "tmp"));
+ dirs.add(new Path(logFiles[1].getParentFile().getAbsolutePath()));
+ UserGroupInformation taskTrackerugi =
+ UserGroupInformation.login(localizedJobConf);
+ for (Path dir : dirs) {
+ checkFilePermissions(dir.toUri().getPath(), "drwxrws---",
+ localizedJobConf.getUser(), taskTrackerugi.getGroupNames()[0]);
+ }
+
+ // check the private permissions of various files
+ List<Path> files = new ArrayList<Path>();
+ files.add(lDirAlloc.getLocalPathToRead(TaskTracker.getTaskConfFile(task
+ .getJobID().toString(), task.getTaskID().toString(), task
+ .isTaskCleanupTask()), trackerFConf));
+ for (Path file : files) {
+ checkFilePermissions(file.toUri().getPath(), "-rwxrwx---",
+ localizedJobConf.getUser(), taskTrackerugi.getGroupNames()[0]);
+ }
+ }
+}
diff --git src/test/mapred/org/apache/hadoop/mapred/TestMapRed.java src/test/mapred/org/apache/hadoop/mapred/TestMapRed.java
index 665dc33..2f70803 100644
--- src/test/mapred/org/apache/hadoop/mapred/TestMapRed.java
+++ src/test/mapred/org/apache/hadoop/mapred/TestMapRed.java
@@ -289,7 +289,7 @@ public class TestMapRed extends TestCase {
first = false;
MapOutputFile mapOutputFile = new MapOutputFile(taskId.getJobID());
mapOutputFile.setConf(conf);
- Path input = mapOutputFile.getInputFile(0, taskId);
+ Path input = mapOutputFile.getInputFile(0);
FileSystem fs = FileSystem.get(conf);
assertTrue("reduce input exists " + input, fs.exists(input));
SequenceFile.Reader rdr =
diff --git src/test/mapred/org/apache/hadoop/mapred/TestMiniMRWithDFS.java src/test/mapred/org/apache/hadoop/mapred/TestMiniMRWithDFS.java
index 1dac0ac..f14234c 100644
--- src/test/mapred/org/apache/hadoop/mapred/TestMiniMRWithDFS.java
+++ src/test/mapred/org/apache/hadoop/mapred/TestMiniMRWithDFS.java
@@ -135,7 +135,7 @@ public class TestMiniMRWithDFS extends TestCase {
int numNotDel = 0;
File localDir = new File(mr.getTaskTrackerLocalDir(i));
LOG.debug("Tracker directory: " + localDir);
- File trackerDir = new File(localDir, "taskTracker");
+ File trackerDir = new File(localDir, TaskTracker.SUBDIR);
assertTrue("local dir " + localDir + " does not exist.",
localDir.isDirectory());
assertTrue("task tracker dir " + trackerDir + " does not exist.",
@@ -150,7 +150,7 @@ public class TestMiniMRWithDFS extends TestCase {
}
for(int fileIdx = 0; fileIdx < contents.length; ++fileIdx) {
String name = contents[fileIdx];
- if (!("taskTracker".equals(contents[fileIdx]))) {
+ if (!(TaskTracker.SUBDIR.equals(contents[fileIdx]))) {
LOG.debug("Looking at " + name);
assertTrue("Spurious directory " + name + " found in " +
localDir, false);
@@ -158,7 +158,7 @@ public class TestMiniMRWithDFS extends TestCase {
}
for (int idx = 0; idx < neededDirs.size(); ++idx) {
String name = neededDirs.get(idx);
- if (new File(new File(new File(trackerDir, "jobcache"),
+ if (new File(new File(new File(trackerDir, TaskTracker.JOBCACHE),
jobIds[idx]), name).isDirectory()) {
found[idx] = true;
numNotDel++;
diff --git src/test/mapred/org/apache/hadoop/mapred/TestTaskTrackerLocalization.java src/test/mapred/org/apache/hadoop/mapred/TestTaskTrackerLocalization.java
new file mode 100644
index 0000000..d8dba9b
--- /dev/null
+++ src/test/mapred/org/apache/hadoop/mapred/TestTaskTrackerLocalization.java
@@ -0,0 +1,380 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.mapred;
+
+import java.io.File;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.util.jar.JarOutputStream;
+import java.util.zip.ZipEntry;
+
+import javax.security.auth.login.LoginException;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.FileUtil;
+import org.apache.hadoop.fs.LocalDirAllocator;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.mapreduce.TaskType;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.util.Shell;
+import org.apache.hadoop.mapred.TaskTracker.TaskInProgress;
+
+import junit.framework.TestCase;
+
+/**
+ * Test to verify localization of a job and localization of a task on a
+ * TaskTracker.
+ *
+ */
+public class TestTaskTrackerLocalization extends TestCase {
+
+ private File TEST_ROOT_DIR;
+ private File ROOT_MAPRED_LOCAL_DIR;
+ private File HADOOP_LOG_DIR;
+
+ private int numLocalDirs = 6;
+ private static final Log LOG =
+ LogFactory.getLog(TestTaskTrackerLocalization.class);
+
+ protected TaskTracker tracker;
+ protected JobConf trackerFConf;
+ protected JobID jobId;
+ protected TaskAttemptID taskId;
+ protected Task task;
+ protected String[] localDirs;
+ protected static LocalDirAllocator lDirAlloc =
+ new LocalDirAllocator("mapred.local.dir");
+
+ @Override
+ protected void setUp()
+ throws Exception {
+ TEST_ROOT_DIR =
+ new File(System.getProperty("test.build.data", "/tmp"),
+ "testTaskTrackerLocalization");
+ if (!TEST_ROOT_DIR.exists()) {
+ TEST_ROOT_DIR.mkdirs();
+ }
+
+ ROOT_MAPRED_LOCAL_DIR = new File(TEST_ROOT_DIR, "mapred/local");
+ ROOT_MAPRED_LOCAL_DIR.mkdirs();
+
+ HADOOP_LOG_DIR = new File(TEST_ROOT_DIR, "logs");
+ HADOOP_LOG_DIR.mkdir();
+ System.setProperty("hadoop.log.dir", HADOOP_LOG_DIR.getAbsolutePath());
+
+ trackerFConf = new JobConf();
+ trackerFConf.set(FileSystem.FS_DEFAULT_NAME_KEY, "file:///");
+ localDirs = new String[numLocalDirs];
+ for (int i = 0; i < numLocalDirs; i++) {
+ localDirs[i] = new File(ROOT_MAPRED_LOCAL_DIR, "0_" + i).getPath();
+ }
+ trackerFConf.setStrings("mapred.local.dir", localDirs);
+
+ // Create the job jar file
+ File jobJarFile = new File(TEST_ROOT_DIR, "jobjar-on-dfs.jar");
+ JarOutputStream jstream =
+ new JarOutputStream(new FileOutputStream(jobJarFile));
+ ZipEntry ze = new ZipEntry("lib/lib1.jar");
+ jstream.putNextEntry(ze);
+ jstream.closeEntry();
+ ze = new ZipEntry("lib/lib2.jar");
+ jstream.putNextEntry(ze);
+ jstream.closeEntry();
+ jstream.finish();
+ jstream.close();
+ trackerFConf.setJar(jobJarFile.toURI().toString());
+
+ // Create the job configuration file
+ File jobConfFile = new File(TEST_ROOT_DIR, "jobconf-on-dfs.xml");
+ FileOutputStream out = new FileOutputStream(jobConfFile);
+ trackerFConf.writeXml(out);
+ out.close();
+
+ // Set up the TaskTracker
+ tracker = new TaskTracker();
+ tracker.setConf(trackerFConf);
+ tracker.systemFS = FileSystem.getLocal(trackerFConf); // for test case
+
+ // Set up the task to be localized
+ String jtIdentifier = "200907202331";
+ jobId = new JobID(jtIdentifier, 1);
+ taskId =
+ new TaskAttemptID(jtIdentifier, jobId.getId(), TaskType.MAP, 1, 0);
+ task =
+ new MapTask(jobConfFile.toURI().toString(), taskId, 1, null, null, 1);
+
+ TaskController taskController = new DefaultTaskController();
+ taskController.setConf(trackerFConf);
+ taskController.setup();
+ }
+
+ @Override
+ protected void tearDown()
+ throws Exception {
+ FileUtil.fullyDelete(TEST_ROOT_DIR);
+ }
+
+ private static String[] getFilePermissionAttrs(String path)
+ throws IOException {
+ String output = Shell.execCommand("stat", path, "-c", "%A:%U:%G");
+ return output.split(":|\n");
+ }
+
+ static void checkFilePermissions(String path, String expectedPermissions,
+ String expectedOwnerUser, String expectedOwnerGroup)
+ throws IOException {
+ String[] attrs = getFilePermissionAttrs(path);
+ assertTrue("File attrs length is not 3 but " + attrs.length,
+ attrs.length == 3);
+ assertTrue("Path " + path + " has the permissions " + attrs[0]
+ + " instead of the expected " + expectedPermissions, attrs[0]
+ .equals(expectedPermissions));
+ assertTrue("Path " + path + " is not user owned by "
+ + expectedOwnerUser + " but by " + attrs[1], attrs[1]
+ .equals(expectedOwnerUser));
+ assertTrue("Path " + path + " is not group owned by "
+ + expectedOwnerGroup + " but by " + attrs[2], attrs[2]
+ .equals(expectedOwnerGroup));
+ }
+
+ /**
+ * Verify the task-controller's setup functionality
+ *
+ * @throws IOException
+ * @throws LoginException
+ */
+ public void testTaskControllerSetup()
+ throws IOException,
+ LoginException {
+ // Task-controller is already set up in the test's setup method. Now verify.
+ UserGroupInformation ugi = UserGroupInformation.login(new JobConf());
+ for (String localDir : localDirs) {
+
+ // Verify the local-dir itself.
+ File lDir = new File(localDir);
+ assertTrue("localDir " + lDir + " doesn't exists!", lDir.exists());
+ checkFilePermissions(lDir.getAbsolutePath(), "drwxr-xr-x", ugi
+ .getUserName(), ugi.getGroupNames()[0]);
+
+ // Verify the distributed cache dir.
+ File distributedCacheDir =
+ new File(localDir, TaskTracker.getDistributedCacheDir());
+ assertTrue("distributed cache dir " + distributedCacheDir
+ + " doesn't exists!", distributedCacheDir.exists());
+ checkFilePermissions(distributedCacheDir.getAbsolutePath(),
+ "drwxr-xr-x", ugi.getUserName(), ugi.getGroupNames()[0]);
+
+ // Verify the job cache dir.
+ File jobCacheDir = new File(localDir, TaskTracker.getJobCacheSubdir());
+ assertTrue("jobCacheDir " + jobCacheDir + " doesn't exists!",
+ jobCacheDir.exists());
+ checkFilePermissions(jobCacheDir.getAbsolutePath(), "drwxr-xr-x", ugi
+ .getUserName(), ugi.getGroupNames()[0]);
+ }
+
+ // Verify the permissions on the userlogs dir
+ File taskLog = TaskLog.getUserLogDir();
+ checkFilePermissions(taskLog.getAbsolutePath(), "drwxr-xr-x", ugi
+ .getUserName(), ugi.getGroupNames()[0]);
+ }
+
+ /**
+ * Test job localization on a TT. Tests localization of job.xml, job.jar and
+ * corresponding setting of configuration.
+ *
+ * @throws IOException
+ * @throws LoginException
+ */
+ public void testJobLocalization()
+ throws IOException,
+ LoginException {
+
+ // /////////// The main method being tested
+ JobConf localizedJobConf = tracker.localizeJobFiles(task);
+ // ///////////
+
+ // Check the directory structure
+ for (String dir : localDirs) {
+
+ File localDir = new File(dir);
+ assertTrue("mapred.local.dir " + localDir + " isn't created!",
+ localDir.exists());
+
+ File taskTrackerSubDir = new File(localDir, TaskTracker.SUBDIR);
+ assertTrue("taskTracker sub-dir in the local-dir " + localDir
+ + "is not created!", taskTrackerSubDir.exists());
+
+ File jobCache = new File(taskTrackerSubDir, TaskTracker.JOBCACHE);
+ assertTrue("jobcache in the taskTrackerSubdir " + taskTrackerSubDir
+ + " isn't created!", jobCache.exists());
+
+ File jobDir = new File(jobCache, jobId.toString());
+ assertTrue("job-dir in " + jobCache + " isn't created!", jobDir
+ .exists());
+
+ // check the private permissions on the job directory
+ UserGroupInformation ugi = UserGroupInformation.login(localizedJobConf);
+ checkFilePermissions(jobDir.getAbsolutePath(), "drwx------", ugi
+ .getUserName(), ugi.getGroupNames()[0]);
+ }
+
+ // check the localization of job.xml
+ LocalDirAllocator lDirAlloc = new LocalDirAllocator("mapred.local.dir");
+
+ assertTrue("job.xml is not localized on this TaskTracker!!", lDirAlloc
+ .getLocalPathToRead(TaskTracker.getLocalJobConfFile(jobId.toString()),
+ trackerFConf) != null);
+
+ // check the localization of job.jar
+ Path jarFileLocalized =
+ lDirAlloc.getLocalPathToRead(TaskTracker.getJobJarFile(jobId
+ .toString()), trackerFConf);
+ assertTrue("job.jar is not localized on this TaskTracker!!",
+ jarFileLocalized != null);
+ assertTrue("lib/lib1.jar is not unjarred on this TaskTracker!!", new File(
+ jarFileLocalized.getParent() + Path.SEPARATOR + "lib/lib1.jar")
+ .exists());
+ assertTrue("lib/lib2.jar is not unjarred on this TaskTracker!!", new File(
+ jarFileLocalized.getParent() + Path.SEPARATOR + "lib/lib2.jar")
+ .exists());
+
+ // check the creation of job work directory
+ assertTrue("job-work dir is not created on this TaskTracker!!", lDirAlloc
+ .getLocalPathToRead(TaskTracker.getJobWorkDir(jobId.toString()),
+ trackerFConf) != null);
+
+ // Check the setting of job.local.dir and job.jar which will eventually be
+ // used by the user's task
+ boolean jobLocalDirFlag = false, mapredJarFlag = false;
+ String localizedJobLocalDir =
+ localizedJobConf.get(TaskTracker.JOB_LOCAL_DIR);
+ String localizedJobJar = localizedJobConf.getJar();
+ for (String localDir : localizedJobConf.getStrings("mapred.local.dir")) {
+ if (localizedJobLocalDir.equals(localDir + Path.SEPARATOR
+ + TaskTracker.getJobWorkDir(jobId.toString()))) {
+ jobLocalDirFlag = true;
+ }
+ if (localizedJobJar.equals(localDir + Path.SEPARATOR
+ + TaskTracker.getJobJarFile(jobId.toString()))) {
+ mapredJarFlag = true;
+ }
+ }
+ assertTrue(TaskTracker.JOB_LOCAL_DIR
+ + " is not set properly to the target users directory : "
+ + localizedJobLocalDir, jobLocalDirFlag);
+ assertTrue(
+ "mapred.jar is not set properly to the target users directory : "
+ + localizedJobJar, mapredJarFlag);
+ }
+
+ /**
+ * Test task localization on a TT.
+ *
+ * @throws IOException
+ * @throws LoginException
+ */
+ public void testTaskLocalization()
+ throws IOException,
+ LoginException {
+
+ JobConf localizedJobConf = tracker.localizeJobFiles(task);
+
+ TaskInProgress tip = tracker.new TaskInProgress(task, trackerFConf);
+ tip.setJobConf(localizedJobConf);
+
+ // ////////// The central method being tested
+ tip.localizeTask(task);
+ // //////////
+
+ // check the functionality of localizeTask
+ assertTrue("attempt-dir for " + taskId.toString()
+ + " is not created in any of the configured dirs!!", lDirAlloc
+ .getLocalPathToRead(TaskTracker.getLocalTaskDir(jobId.toString(),
+ taskId.toString()), trackerFConf) != null);
+
+ Path workDir =
+ lDirAlloc.getLocalPathToRead(TaskTracker.getTaskWorkDir(task
+ .getJobID().toString(), task.getTaskID().toString(), task
+ .isTaskCleanupTask()), trackerFConf);
+ assertTrue("attempt work dir for " + taskId.toString()
+ + " is not created in any of the configured dirs!!", workDir != null);
+
+ TaskRunner runner = task.createRunner(tracker, tip);
+
+ // /////// Few more methods being tested
+ runner.setupChildTaskConfiguration(lDirAlloc);
+ TaskRunner.createChildTmpDir(new File(workDir.toUri().getPath()),
+ localizedJobConf);
+ File[] logFiles = TaskRunner.prepareLogFiles(task.getTaskID());
+ // ///////
+
+ // Make sure the task-conf file is created
+ Path localTaskFile =
+ lDirAlloc.getLocalPathToRead(TaskTracker.getTaskConfFile(task
+ .getJobID().toString(), task.getTaskID().toString(), task
+ .isTaskCleanupTask()), trackerFConf);
+ assertTrue("Task conf file " + localTaskFile.toString()
+ + " is not created!!", new File(localTaskFile.toUri().getPath())
+ .exists());
+
+ // /////// One more method being tested. This happens in child space.
+ JobConf localizedTaskConf = new JobConf(localTaskFile);
+ TaskRunner.setupChildMapredLocalDirs(task, localizedTaskConf);
+ // ///////
+
+ // Make sure that the mapred.local.dir is sandboxed
+ for (String childMapredLocalDir : localizedTaskConf
+ .getStrings("mapred.local.dir")) {
+ assertTrue("Local dir " + childMapredLocalDir + " is not sandboxed !!",
+ childMapredLocalDir.endsWith(TaskTracker.getLocalTaskDir(jobId
+ .toString(), taskId.toString(), false)));
+ }
+
+ // Make sure that task.getJobFile is changed and pointed correctly.
+ assertTrue(task.getJobFile().endsWith(
+ TaskTracker
+ .getTaskConfFile(jobId.toString(), taskId.toString(), false)));
+
+ // Make sure that the tmp directories are created
+ assertTrue("tmp dir is not created in workDir "
+ + workDir.toUri().getPath(),
+ new File(workDir.toUri().getPath(), "tmp").exists());
+
+ // Make sure that the log are setup properly
+ File logDir =
+ new File(HADOOP_LOG_DIR, TaskLog.USERLOGS_DIR_NAME + Path.SEPARATOR
+ + task.getTaskID().toString());
+ assertTrue("task's log dir " + logDir.toString() + " doesn't exist!",
+ logDir.exists());
+ UserGroupInformation ugi = UserGroupInformation.login(localizedJobConf);
+ checkFilePermissions(logDir.getAbsolutePath(), "drwx------", ugi
+ .getUserName(), ugi.getGroupNames()[0]);
+
+ File expectedStdout = new File(logDir, TaskLog.LogName.STDOUT.toString());
+ assertTrue("stdout log file is improper. Expected : "
+ + expectedStdout.toString() + " Observed : " + logFiles[0].toString(),
+ expectedStdout.toString().equals(logFiles[0].toString()));
+ File expectedStderr =
+ new File(logDir, Path.SEPARATOR + TaskLog.LogName.STDERR.toString());
+ assertTrue("stderr log file is improper. Expected : "
+ + expectedStderr.toString() + " Observed : " + logFiles[1].toString(),
+ expectedStderr.toString().equals(logFiles[1].toString()));
+ }
+}