Index: conf/taskcontroller.cfg =================================================================== --- conf/taskcontroller.cfg (revision 787560) +++ conf/taskcontroller.cfg (working copy) @@ -1,3 +1,2 @@ -mapred.local.dir=#configured value of hadoop.tmp.dir it can be a list of paths comma seperated -hadoop.pid.dir=#configured HADOOP_PID_DIR -hadoop.indent.str=#configured HADOOP_IDENT_STR +mapred.local.dir=#configured value of mapred.local.dir. It can be a list of comma separated paths. +hadoop.log.dir=#configured value of hadoop.log.dir \ No newline at end of file Index: src/test/mapred/org/apache/hadoop/mapred/TestJobExecutionAsDifferentUser.java =================================================================== --- src/test/mapred/org/apache/hadoop/mapred/TestJobExecutionAsDifferentUser.java (revision 787560) +++ src/test/mapred/org/apache/hadoop/mapred/TestJobExecutionAsDifferentUser.java (working copy) @@ -39,10 +39,37 @@ startCluster(); Path inDir = new Path("input"); Path outDir = new Path("output"); - RunningJob job = - UtilsForTests.runJobSucceed(getClusterConf(), inDir, outDir); + JobConf myConf = getClusterConf(); + myConf.setBoolean("keep.failed.task.files", true); + myConf.setInt("mapred.map.max.attempts", 1); + myConf.setInt("mapred.reduce.max.attempts", 1); + + RunningJob job; + // Run a job with zero maps/reduces + job = UtilsForTests.runJob(myConf, inDir, outDir, 0, 0); + job.waitForCompletion(); assertTrue("Job failed", job.isSuccessful()); assertOwnerShip(outDir); + + // Run a job with 1 map and zero reduces + job = UtilsForTests.runJob(myConf, inDir, outDir, 1, 0); + job.waitForCompletion(); + assertTrue("Job failed", job.isSuccessful()); + assertOwnerShip(outDir); + + // Run a normal job with maps/reduces + job = UtilsForTests.runJob(myConf, inDir, outDir, 1, 1); + job.waitForCompletion(); + assertTrue("Job failed", job.isSuccessful()); + assertOwnerShip(outDir); + + // Run a job with jvm reuse + // TODO: succeeds. 
to be confirmed + myConf.set("mapred.job.reuse.jvm.num.tasks", "-1"); + job = UtilsForTests.runJob(myConf, inDir, outDir, 4, 4); + job.waitForCompletion(); + assertTrue("Job failed", job.isSuccessful()); + assertOwnerShip(outDir); } public void testEnvironment() throws IOException { Index: src/test/mapred/org/apache/hadoop/mapred/MiniMRCluster.java =================================================================== --- src/test/mapred/org/apache/hadoop/mapred/MiniMRCluster.java (revision 787560) +++ src/test/mapred/org/apache/hadoop/mapred/MiniMRCluster.java (working copy) @@ -160,7 +160,7 @@ StringBuffer localPath = new StringBuffer(); for(int i=0; i < numDir; ++i) { File ttDir = new File(localDirBase, - Integer.toString(trackerId) + "_" + 0); + Integer.toString(trackerId) + "_" + i); if (!ttDir.mkdirs()) { if (!ttDir.isDirectory()) { throw new IOException("Mkdirs failed to create " + ttDir); Index: src/test/mapred/org/apache/hadoop/mapred/TestKillSubProcesses.java =================================================================== --- src/test/mapred/org/apache/hadoop/mapred/TestKillSubProcesses.java (revision 787560) +++ src/test/mapred/org/apache/hadoop/mapred/TestKillSubProcesses.java (working copy) @@ -348,15 +348,26 @@ FileSystem fs = FileSystem.getLocal(conf); TEST_ROOT_DIR = new Path(conf.get("test.build.data")).toUri().getPath(); scriptDir = new Path(TEST_ROOT_DIR + "/script"); - if(fs.exists(scriptDir)){ + if (fs.exists(scriptDir)) { fs.delete(scriptDir, true); } + + // Create the directory and set open permissions so that the TT can + // access. + fs.mkdirs(scriptDir); + fs.setPermission(scriptDir, new FsPermission(FsAction.ALL, FsAction.ALL, + FsAction.ALL)); + // create shell script Random rm = new Random(); Path scriptPath = new Path(scriptDir, "_shellScript_" + rm.nextInt() + ".sh"); String shellScript = scriptPath.toString(); + + // Construct the script. Set umask to 0000 so that TT can access all the + // files. 
String script = + "umask 0000\n" + "echo $$ > " + scriptDir.toString() + "/childPidFile" + "$1\n" + "echo hello\n" + "trap 'echo got SIGTERM' 15 \n" + Index: src/test/mapred/org/apache/hadoop/mapred/ClusterWithLinuxTaskController.java =================================================================== --- src/test/mapred/org/apache/hadoop/mapred/ClusterWithLinuxTaskController.java (revision 787560) +++ src/test/mapred/org/apache/hadoop/mapred/ClusterWithLinuxTaskController.java (working copy) @@ -97,7 +97,7 @@ MyLinuxTaskController.class.getName()); mrCluster = new MiniMRCluster(NUMBER_OF_NODES, dfsCluster.getFileSystem().getUri() - .toString(), 1, null, null, conf); + .toString(), 4, null, null, conf); // Get the configured taskcontroller-path String path = System.getProperty("taskcontroller-path"); @@ -143,9 +143,19 @@ PrintWriter writer = new PrintWriter(new FileOutputStream(configurationFile)); - writer.println(String.format("mapred.local.dir=%s", mrCluster - .getTaskTrackerLocalDir(0))); + StringBuffer sb = new StringBuffer(); + String[] localDirs = mrCluster.getTaskTrackerRunner(0).getLocalDirs(); + for(int i = 0 ; i < localDirs.length; i++) { + sb.append(localDirs[i]); + if((i + 1) != localDirs.length) { + sb.append(","); + } + } + writer.println(String.format("mapred.local.dir=%s", sb.toString())); + writer + .println(String.format("hadoop.log.dir=%s", TaskLog.getBaseLogDir())); + writer.flush(); writer.close(); } Index: src/test/log4j.properties =================================================================== --- src/test/log4j.properties (revision 787560) +++ src/test/log4j.properties (working copy) @@ -1,6 +1,6 @@ # log4j configuration used during build and unit tests -log4j.rootLogger=info,stdout +log4j.rootLogger=debug,stdout log4j.threshhold=ALL log4j.appender.stdout=org.apache.log4j.ConsoleAppender log4j.appender.stdout.layout=org.apache.log4j.PatternLayout Index: 
src/contrib/streaming/src/test/org/apache/hadoop/streaming/TestStreamingAsDifferentUser.java =================================================================== --- src/contrib/streaming/src/test/org/apache/hadoop/streaming/TestStreamingAsDifferentUser.java (revision 787560) +++ src/contrib/streaming/src/test/org/apache/hadoop/streaming/TestStreamingAsDifferentUser.java (working copy) @@ -65,7 +65,7 @@ "stream.tmpdir=" + System.getProperty("test.build.data", "/tmp") }; StreamJob streamJob = new StreamJob(args, true); streamJob.setConf(myConf); - streamJob.go(); + assertTrue("Job has not succeeded", streamJob.go() == 0); assertOwnerShip(outputPath); } } Index: src/java/org/apache/hadoop/mapred/TaskController.java =================================================================== --- src/java/org/apache/hadoop/mapred/TaskController.java (revision 787560) +++ src/java/org/apache/hadoop/mapred/TaskController.java (working copy) @@ -17,6 +17,7 @@ */ package org.apache.hadoop.mapred; +import java.io.File; import java.io.IOException; import org.apache.commons.logging.Log; @@ -24,6 +25,7 @@ import org.apache.hadoop.conf.Configurable; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.mapred.JvmManager.JvmEnv; +import org.apache.hadoop.util.DiskChecker; import org.apache.hadoop.util.StringUtils; import org.apache.hadoop.util.Shell.ShellCommandExecutor; @@ -45,18 +47,55 @@ public Configuration getConf() { return conf; } - + + // The list of directory paths specified in the + // variable mapred.local.dir. This is used to determine + // which among the list of directories is picked up + // for storing data for a particular task. + protected String[] mapredLocalDirs; + public void setConf(Configuration conf) { this.conf = conf; + mapredLocalDirs = conf.getStrings("mapred.local.dir"); } - - /** - * Setup task controller component. 
+
+  private static final String FILE_PERMISSIONS = "ugo+rwx";
+
+  /**
+   * Sets up the permissions of the following directories:
   * 
+   * cache directory, job cache directory (per local dir) and user log directory.
+   *
   */
-  abstract void setup();
+  void setup() {
+    // Create the cache and job-cache directories and set their permissions.
+    String localDirs[] = this.mapredLocalDirs;
+    for (String localDir : localDirs) {
+      // Cache roots; permissions are applied whenever a directory was missing.
+      File cacheDirectory = new File(localDir, TaskTracker.getCacheSubdir());
+      File jobCacheDirectory =
+          new File(localDir, TaskTracker.getJobCacheSubdir());
+      if (!cacheDirectory.exists()) {
+        if (!cacheDirectory.mkdirs()) {
+          LOG.warn("Unable to create cache directory : "
+              + cacheDirectory.getPath());
+        }
+        DiskChecker.setPermissions(cacheDirectory, DiskChecker.sevenFiveFive);
+      }
+      if (!jobCacheDirectory.exists()) {
+        if (!jobCacheDirectory.mkdirs()) {
+          LOG.warn("Unable to create job cache directory : "
+              + jobCacheDirectory.getPath());
+        }
+        DiskChecker.setPermissions(jobCacheDirectory,
+            DiskChecker.sevenFiveFive);
+      }
+    }
+    // setting up permissions for user logs
+    File taskLog = TaskLog.getUserLogDir();
+    DiskChecker.setPermissions(taskLog, DiskChecker.sevenFiveFive);
+  }
 
-  
   /**
    * Launch a task JVM
    * 
@@ -92,22 +131,6 @@
   }
 
   /**
-   * Perform initializing actions required before a task can run.
-   * 
-   * For instance, this method can be used to setup appropriate
-   * access permissions for files and directories that will be
-   * used by tasks. Tasks use the job cache, log, PID and distributed cache
-   * directories and files as part of their functioning. Typically,
-   * these files are shared between the daemon and the tasks
-   * themselves. So, a TaskController that is launching tasks
-   * as different users can implement this method to setup
-   * appropriate ownership and permissions for these directories
-   * and files.
-   */
-  abstract void initializeTask(TaskControllerContext context);
-  
-  
-  /**
    * Contains task information required for the task controller.
*/ static class TaskControllerContext { @@ -122,14 +145,6 @@ // waiting time before sending SIGKILL to task JVM after sending SIGTERM long sleeptimeBeforeSigkill; } - - /** - * Method which is called after the job is localized so that task controllers - * can implement their own job localization logic. - * - * @param tip Task of job for which localization happens. - */ - abstract void initializeJob(JobID jobId); /** * Sends a graceful terminate signal to taskJVM and it sub-processes. @@ -144,6 +159,24 @@ * * @param context task context */ - abstract void killTask(TaskControllerContext context); + + /** + * Take task-controller specific actions to finalize task directories once the + * task finishes + * + * @param context + * @throws IOException + */ + abstract void finalizeTaskDirs(TaskControllerContext context) + throws IOException; + + /** + * Take task-controller specific actions to finalize job on its finishing + * + * @param context + * @throws IOException + */ + abstract void finalizeJob(JobID jobId, String workDir, String user) + throws IOException; } Index: src/java/org/apache/hadoop/mapred/Child.java =================================================================== --- src/java/org/apache/hadoop/mapred/Child.java (revision 787560) +++ src/java/org/apache/hadoop/mapred/Child.java (working copy) @@ -36,6 +36,7 @@ import org.apache.hadoop.metrics.jvm.JvmMetrics; import org.apache.log4j.LogManager; import org.apache.hadoop.util.Shell; +import org.apache.hadoop.util.StringUtils; /** * The main() for child processes. @@ -137,6 +138,8 @@ //are viewable immediately TaskLog.syncLogs(firstTaskid, taskid, isCleanup); JobConf job = new JobConf(task.getJobFile()); + LOG.info(" task file is " + task.getJobFile()); + LOG.info("Local dir in child : " + job.get("mapred.local.dir")); //setupWorkDir actually sets up the symlinks for the distributed //cache. 
After a task exits we wipe the workdir clean, and hence
@@ -147,6 +150,25 @@
         assert(numTasksToExecute != 0);
         TaskLog.cleanup(job.getInt("mapred.userlog.retain.hours", 24));
 
+        // Set up the mapred.local.dir for the child. The child is
+        // sand-boxed now. Whenever it uses LocalDirAllocator from now on, it
+        // will only see files inside the attempt-directory.
+        String childMapredLocalDir = "";
+        LOG.info("mapred.local.dir for child before changing : "
+            + job.get("mapred.local.dir"));
+        for (String mapredLocalDir : job.getStrings("mapred.local.dir")) {
+          // Use the same cleanup flag that TaskTracker.localizeTask() uses when
+          // it creates the attempt directory, so both sides resolve one path.
+          childMapredLocalDir +=
+              mapredLocalDir
+                  + Path.SEPARATOR
+                  + TaskTracker.getLocalTaskDir(task.getJobID().toString(),
+                      task.getTaskID().toString(), task.isTaskCleanupTask())
+                  + ",";
+        }
+        LOG.info("mapred.local.dir for child : " + childMapredLocalDir);
+        job.set("mapred.local.dir", childMapredLocalDir);
+
         task.setConf(job);
 
         defaultConf.addResource(new Path(task.getJobFile()));
@@ -168,14 +190,15 @@
       LOG.fatal("FSError from child", e);
       umbilical.fsError(taskid, e.getMessage());
     } catch (Throwable throwable) {
-      LOG.warn("Error running child", throwable);
+      LOG.warn("Error running child : "
+          + StringUtils.stringifyException(throwable));
       try {
         if (task != null) {
           // do cleanup for the task
           task.taskCleanup(umbilical);
         }
       } catch (Throwable th) {
-        LOG.info("Error cleaning up" + th);
+        LOG.info("Error cleaning up : " + StringUtils.stringifyException(th));
       }
       // Report back any failures, for diagnostic purposes
       ByteArrayOutputStream baos = new ByteArrayOutputStream();
Index: src/java/org/apache/hadoop/mapred/JvmManager.java
===================================================================
--- src/java/org/apache/hadoop/mapred/JvmManager.java (revision 787560)
+++ src/java/org/apache/hadoop/mapred/JvmManager.java (working copy)
@@ -35,6 +35,7 @@
 import org.apache.hadoop.mapred.TaskTracker.TaskInProgress;
 import org.apache.hadoop.util.Shell.ShellCommandExecutor;
 import org.apache.hadoop.util.ProcessTree;
+import
org.apache.hadoop.util.StringUtils; class JvmManager { @@ -185,16 +186,6 @@ TaskRunner taskRunner = jvmToRunningTask.get(jvmId); JvmRunner jvmRunner = jvmIdToRunner.get(jvmId); Task task = taskRunner.getTaskInProgress().getTask(); - TaskControllerContext context = - new TaskController.TaskControllerContext(); - context.env = jvmRunner.env; - context.task = task; - //If we are returning the same task as which the JVM was launched - //we don't initialize task once again. - if(!jvmRunner.env.conf.get("mapred.task.id"). - equals(task.getTaskID().toString())) { - tracker.getTaskController().initializeTask(context); - } return taskRunner.getTaskInProgress(); } return null; @@ -218,8 +209,15 @@ synchronized public void taskKilled(TaskRunner tr) { JVMId jvmId = runningTaskToJvm.remove(tr); if (jvmId != null) { - jvmToRunningTask.remove(jvmId); + TaskRunner runner = jvmToRunningTask.remove(jvmId); killJvm(jvmId); + try { + finalizeTaskDirs(runner); + } catch (IOException e) { + LOG.info("FinalizeTaskDirs for " + tr.getTask().getTaskID() + + " failed with an exception : " + + StringUtils.stringifyException(e)); + } } } @@ -393,7 +391,6 @@ //Launch the task controller to run task JVM initalContext.task = jvmToRunningTask.get(jvmId).getTask(); initalContext.env = env; - tracker.getTaskController().initializeTask(initalContext); tracker.getTaskController().launchTaskJVM(initalContext); } catch (IOException ioe) { // do nothing @@ -405,7 +402,16 @@ } kill(); - + + // Task finished. Try finalizing the task directories. + try { + finalizeTaskDirs(jvmToRunningTask.get(jvmId)); + } catch (IOException e) { + LOG.info("FinalizeTaskDirs for " + initalContext.task.getTaskID() + + " failed with an exception : " + + StringUtils.stringifyException(e)); + } + int exitCode = shexec.getExitCode(); updateOnJvmExit(jvmId, exitCode); LOG.info("JVM : " + jvmId +" exited. 
Number of tasks it ran: " + @@ -438,7 +444,18 @@ .getLong("mapred.tasktracker.tasks.sleeptime-before-sigkill", ProcessTree.DEFAULT_SLEEPTIME_BEFORE_SIGKILL); + // Destroy the task jvm controller.destroyTaskJVM(initalContext); + + // Now finalize the directories. + try { + controller.finalizeTaskDirs(initalContext); + } catch (IOException e) { + LOG.info("FinalizeTaskDirs for " + + initalContext.task.getTaskID() + + " failed with an exception : " + + StringUtils.stringifyException(e)); + } } else { LOG.info(String.format("JVM Not killed %s but just removed", jvmId .toString())); @@ -462,6 +479,33 @@ return busy; } } + + /** + * Finalize the task directories. + * @param runner + * @throws IOException + */ + public void finalizeTaskDirs(TaskRunner runner) + throws IOException { + JVMId jvmId = runningTaskToJvm.get(runner); + if (jvmId == null) { + LOG.debug("JvmId is null, not doing anything for finalizeTaskDirs."); + return; + } + JvmRunner jvmRunner = jvmIdToRunner.get(jvmId); + if (jvmRunner == null) { + LOG + .debug("JvmRunner is null, not doing anything for finalizeTaskDirs."); + return; + } + TaskControllerContext initialContext = jvmRunner.initalContext; + if (initialContext != null && initialContext.env != null) { + tracker.getTaskController().finalizeTaskDirs(initialContext); + } else { + LOG.debug("initialContext or initialContext.env is null," + + " not doing anything for finalizeTaskDirs."); + } + } } static class JvmEnv { //Helper class List vargs; @@ -485,4 +529,20 @@ this.conf = conf; } } + + // TODO: check where else it has to be called. + /** + * Finalize the task directories once the task finishes. 
+ * + * @param runner + * @throws IOException + */ + void finalizeTaskDirs(TaskRunner runner) + throws IOException { + if (runner.getTask().isMapTask()) { + mapJvmManager.finalizeTaskDirs(runner); + } else { + reduceJvmManager.finalizeTaskDirs(runner); + } + } } Index: src/java/org/apache/hadoop/mapred/TaskRunner.java =================================================================== --- src/java/org/apache/hadoop/mapred/TaskRunner.java (revision 787560) +++ src/java/org/apache/hadoop/mapred/TaskRunner.java (working copy) @@ -39,6 +39,7 @@ import org.apache.hadoop.fs.FileUtil; import org.apache.hadoop.fs.LocalDirAllocator; import org.apache.hadoop.fs.Path; +import org.apache.hadoop.util.DiskChecker; import org.apache.hadoop.util.Shell; import org.apache.hadoop.util.StringUtils; @@ -140,7 +141,9 @@ LOG.fatal("Mkdirs failed to create " + workDir.toString()); } } + DiskChecker.setPermissions(workDir, DiskChecker.sevenZeroZero); + // include the user specified classpath appendJobJarClasspaths(conf.getJar(), classPaths); @@ -227,6 +230,8 @@ if (!localFs.mkdirs(tmpDir) && !localFs.getFileStatus(tmpDir).isDir()) { throw new IOException("Mkdirs failed to create " + tmpDir.toString()); } + DiskChecker.setPermissions(new File(tmpDir.toUri().getPath()), + DiskChecker.sevenZeroZero); vargs.add("-Djava.io.tmpdir=" + tmpDir.toString()); // Add classpath. @@ -278,6 +283,12 @@ if (!b) { LOG.warn("mkdirs failed. 
Ignoring"); } + + // TODO: LOG_SECCURITY: The following needs to be modified after security + // is fixed for the logs + DiskChecker.setPermissions(stdout.getParentFile(), + DiskChecker.sevenFiveFive); + tracker.getTaskTrackerInstrumentation().reportTaskLaunch(taskid, stdout, stderr); Map env = new HashMap(); @@ -415,7 +426,7 @@ String cachePath = TaskTracker.getCacheSubdir() + Path.SEPARATOR + cacheId; - localPath = lDirAlloc.getLocalPathForWrite(cachePath, + localPath = lDirAlloc.getSecureLocalPathForWrite(cachePath, fileStatus.getLen(), conf); baseDir = localPath.toString().replace(cacheId, ""); p[i] = DistributedCache.getLocalCache(archives[i], conf, @@ -459,11 +470,15 @@ FileSystem localFs = FileSystem.getLocal(conf); localFs.delete(localTaskFile, true); OutputStream out = localFs.create(localTaskFile); + DiskChecker.setPermissions(new File(localTaskFile.toUri().getPath()), + DiskChecker.sevenZeroZero); try { conf.writeXml(out); } finally { out.close(); } + DiskChecker.setPermissions(new File(localTaskFile.toUri().getPath()), + DiskChecker.sevenZeroZero); } } @@ -554,6 +569,7 @@ File flink = new File(link); if (!flink.exists()) { FileUtil.symLink(localArchives[i].toString(), link); + DiskChecker.setPermissions(flink, DiskChecker.sevenZeroZero); } } } @@ -566,6 +582,7 @@ File flink = new File(link); if (!flink.exists()) { FileUtil.symLink(localFiles[i].toString(), link); + DiskChecker.setPermissions(flink, DiskChecker.sevenZeroZero); } } } @@ -598,6 +615,8 @@ if (!localFs.mkdirs(tmpDir) && !localFs.getFileStatus(tmpDir).isDir()){ throw new IOException("Mkdirs failed to create " + tmpDir.toString()); } + DiskChecker.setPermissions(new File(tmpDir.toUri().getPath()), + DiskChecker.sevenZeroZero); } } Index: src/java/org/apache/hadoop/mapred/TaskTracker.java =================================================================== --- src/java/org/apache/hadoop/mapred/TaskTracker.java (revision 787560) +++ src/java/org/apache/hadoop/mapred/TaskTracker.java (working 
copy) @@ -385,11 +385,29 @@ return getLocalTaskDir(jobid, taskid, false) ; } - static String getIntermediateOutputDir(String jobid, String taskid) { - return getLocalTaskDir(jobid, taskid) - + Path.SEPARATOR + TaskTracker.OUTPUT ; + /** + * Intermediate output directory for a given task as seen by the child. + * + * @param jobid + * @param taskid + * @return + */ + static String getIntermediateOutputDirForChild(String jobid, String taskid) { + return TaskTracker.OUTPUT; } + /** + * Intermediate output directory for a given task as seen by the TaskTracker. + * + * @param jobid + * @param taskid + * @return + */ + static String getIntermediateOutputDirForTT(String jobid, String taskid) { + return getLocalTaskDir(jobid, taskid) + Path.SEPARATOR + + TaskTracker.OUTPUT; + } + static String getLocalTaskDir(String jobid, String taskid, boolean isCleanupAttempt) { @@ -750,7 +768,7 @@ } catch(FileNotFoundException fe) { jobFileSize = -1; } - Path localJobFile = lDirAlloc.getLocalPathForWrite( + Path localJobFile = lDirAlloc.getSecureLocalPathForWrite( getLocalJobDir(jobId.toString()) + Path.SEPARATOR + "job.xml", jobFileSize, fConf); @@ -759,30 +777,25 @@ if (!rjob.localized) { FileSystem localFs = FileSystem.getLocal(fConf); - // this will happen on a partial execution of localizeJob. - // Sometimes the job.xml gets copied but copying job.jar - // might throw out an exception - // we should clean up and then try again - Path jobDir = localJobFile.getParent(); - if (localFs.exists(jobDir)){ - localFs.delete(jobDir, true); - boolean b = localFs.mkdirs(jobDir); - if (!b) - throw new IOException("Not able to create job directory " - + jobDir.toString()); - } + initializeJobDirs(jobId, localFs); systemFS.copyToLocalFile(jobFile, localJobFile); + DiskChecker.setPermissions(new File(localJobFile.toUri().getPath()), + DiskChecker.sevenZeroZero); + + // TODO: Fix this. TT config is not loaded at all! 
JobConf localJobConf = new JobConf(localJobFile); // create the 'work' directory // job-specific shared directory for use as scratch space - Path workDir = lDirAlloc.getLocalPathForWrite( + Path workDir = lDirAlloc.getSecureLocalPathForWrite( (getLocalJobDir(jobId.toString()) + Path.SEPARATOR + MRConstants.WORKDIR), fConf); if (!localFs.mkdirs(workDir)) { throw new IOException("Mkdirs failed to create " + workDir.toString()); } + DiskChecker.setPermissions(new File(workDir.toUri().getPath()), + DiskChecker.sevenZeroZero); System.setProperty("job.local.dir", workDir.toString()); localJobConf.set("job.local.dir", workDir.toString()); @@ -799,35 +812,67 @@ } // Here we check for and we check five times the size of jarFileSize // to accommodate for unjarring the jar file in work directory - localJarFile = new Path(lDirAlloc.getLocalPathForWrite( - getLocalJobDir(jobId.toString()) - + Path.SEPARATOR + "jars", - 5 * jarFileSize, fConf), "job.jar"); - if (!localFs.mkdirs(localJarFile.getParent())) { - throw new IOException("Mkdirs failed to create jars directory "); - } + localJarFile = + lDirAlloc.getSecureLocalPathForWrite(getLocalJobDir(jobId + .toString()) + + Path.SEPARATOR + "jars" + Path.SEPARATOR + "job.jar", + 5 * jarFileSize, fConf); systemFS.copyToLocalFile(jarFilePath, localJarFile); + DiskChecker.setPermissions(new File(localJarFile.toUri().getPath()), + DiskChecker.sevenZeroZero); localJobConf.setJar(localJarFile.toString()); OutputStream out = localFs.create(localJobFile); + DiskChecker.setPermissions(new File(localJobFile.toUri().getPath()), + DiskChecker.sevenZeroZero); try { localJobConf.writeXml(out); } finally { out.close(); } // also unjar the job.jar files - RunJar.unJar(new File(localJarFile.toString()), - new File(localJarFile.getParent().toString())); + RunJar.unJar(new File(localJarFile.toString()), new File( + localJarFile.getParent().toString()), DiskChecker.sevenZeroZero); } rjob.keepJobFiles = ((localJobConf.getKeepTaskFilesPattern() != null) 
|| localJobConf.getKeepFailedTaskFiles()); rjob.localized = true; rjob.jobConf = localJobConf; - taskController.initializeJob(jobId); } } launchTaskForJob(tip, new JobConf(rjob.jobConf)); } + /** + * Prepare the job directories for a given job. To be called only if the job + * is not already localized. + */ + private void initializeJobDirs(JobID jobId, FileSystem fs) + throws IOException { + boolean initStatus = true; + String jobDirPath = getLocalJobDir(jobId.toString()); + for (String localDir : fConf.getStrings("mapred.local.dir")) { + Path jobDir = new Path(localDir, jobDirPath); + if (fs.exists(jobDir)) { + // this will happen on a partial execution of localizeJob. + // Sometimes the job.xml gets copied but copying job.jar + // might throw out an exception + // we should clean up and then try again + fs.delete(jobDir, true); + } + boolean dirCreationStatus = fs.mkdirs(jobDir); + if (!dirCreationStatus) { + LOG.warn("Not able to create job directory " + jobDir.toString()); + } + initStatus = initStatus && dirCreationStatus; + DiskChecker.setPermissions(new File(jobDir.toUri().getPath()), + DiskChecker.sevenFiveFive); + } + if (!initStatus) { + throw new IOException("Not able to create job directory for " + + jobId.toString()); + } + } + private void launchTaskForJob(TaskInProgress tip, JobConf jobConf) throws IOException{ synchronized (tip) { tip.setJobConf(jobConf); @@ -1354,6 +1399,16 @@ indexCache.removeMap(tip.getTask().getTaskID().toString()); } } + // Finalize the job directories + try { + taskController.finalizeJob(jobId, TaskTracker.getLocalJobDir(jobId + .toString()) + + Path.SEPARATOR + "work", rjob.jobConf.getUser()); + } catch (IOException ioe) { + LOG.warn("Finalize job failed with the exception : " + + StringUtils.stringifyException(ioe)); + } + // Delete the job directory for this // task if the job is done/failed if (!rjob.keepJobFiles){ @@ -1801,7 +1856,7 @@ private void localizeTask(Task task) throws IOException{ Path localTaskDir = - 
lDirAlloc.getLocalPathForWrite( + lDirAlloc.getSecureLocalPathForWrite( TaskTracker.getLocalTaskDir(task.getJobID().toString(), task.getTaskID().toString(), task.isTaskCleanupTask()), defaultJobConf ); @@ -1811,6 +1866,8 @@ throw new IOException("Mkdirs failed to create " + localTaskDir.toString()); } + DiskChecker.setPermissions(new File(localTaskDir.toUri().getPath()), + DiskChecker.sevenZeroZero); // create symlink for ../work if it already doesnt exist String workDir = lDirAlloc.getLocalPathToRead( @@ -1820,11 +1877,13 @@ String link = localTaskDir.getParent().toString() + Path.SEPARATOR + "work"; File flink = new File(link); - if (!flink.exists()) + if (!flink.exists()) { FileUtil.symLink(workDir, link); + DiskChecker.setPermissions(flink, DiskChecker.sevenZeroZero); + } // create the working-directory of the task - Path cwd = lDirAlloc.getLocalPathForWrite( + Path cwd = lDirAlloc.getSecureLocalPathForWrite( getLocalTaskDir(task.getJobID().toString(), task.getTaskID().toString(), task.isTaskCleanupTask()) + Path.SEPARATOR + MRConstants.WORKDIR, @@ -1833,11 +1892,16 @@ throw new IOException("Mkdirs failed to create " + cwd.toString()); } + DiskChecker.setPermissions(new File(cwd.toUri().getPath()), + DiskChecker.sevenZeroZero); Path localTaskFile = new Path(localTaskDir, "job.xml"); task.setJobFile(localTaskFile.toString()); - localJobConf.set("mapred.local.dir", - fConf.get("mapred.local.dir")); + + // Expand mapred.local.dir components to be full paths + LOG.info(" From fComf " + fConf.get("mapred.local.dir")); + localJobConf.set("mapred.local.dir", fConf.get("mapred.local.dir")); + if (fConf.get("slave.host.name") != null) { localJobConf.set("slave.host.name", fConf.get("slave.host.name")); @@ -1879,6 +1943,9 @@ localJobConf.setNumTasksToExecutePerJvm(1); } OutputStream out = localFs.create(localTaskFile); + DiskChecker.setPermissions(new File(localTaskFile.toUri().getPath()), + DiskChecker.sevenZeroZero); + try { localJobConf.writeXml(out); } finally { @@ 
-2022,7 +2089,15 @@ this.taskStatus.setRunState(TaskStatus.State.KILLED); } } else { - this.taskStatus.setRunState(TaskStatus.State.SUCCEEDED); + // Task succeeded. Try finalizing the task directories. + try { + jvmManager.finalizeTaskDirs(runner); + this.taskStatus.setRunState(TaskStatus.State.SUCCEEDED); + } catch (IOException e) { + LOG.warn("Finalizing task-directories failed with the exception : " + + StringUtils.stringifyException(e)); + this.taskStatus.setRunState(TaskStatus.State.FAILED_UNCLEAN); + } } this.taskStatus.setProgress(1.0f); this.taskStatus.setFinishTime(System.currentTimeMillis()); @@ -2878,12 +2953,12 @@ // Index file Path indexFileName = lDirAlloc.getLocalPathToRead( - TaskTracker.getIntermediateOutputDir(jobId, mapId) + TaskTracker.getIntermediateOutputDirForTT(jobId, mapId) + "/file.out.index", conf); // Map-output file Path mapOutputFileName = lDirAlloc.getLocalPathToRead( - TaskTracker.getIntermediateOutputDir(jobId, mapId) + TaskTracker.getIntermediateOutputDirForTT(jobId, mapId) + "/file.out", conf); /** Index: src/java/org/apache/hadoop/mapred/ReduceTask.java =================================================================== --- src/java/org/apache/hadoop/mapred/ReduceTask.java (revision 787560) +++ src/java/org/apache/hadoop/mapred/ReduceTask.java (working copy) @@ -1276,7 +1276,7 @@ // else, we will check the localFS to find a suitable final location // for this path TaskAttemptID reduceId = reduceTask.getTaskID(); - Path filename = new Path("/" + TaskTracker.getIntermediateOutputDir( + Path filename = new Path("/" + TaskTracker.getIntermediateOutputDirForChild( reduceId.getJobID().toString(), reduceId.toString()) + "/map_" + @@ -1331,6 +1331,8 @@ throw new IOException("Failed to rename map output " + tmpMapOutput + " to " + filename); } + // Secure permissions from the tmpMapOutput still remain. No need + // for doing anything more. 
synchronized (mapOutputFilesOnDisk) { addToMapOutputFilesOnDisk(localFileSys.getFileStatus(filename)); @@ -1595,7 +1597,7 @@ throws IOException { // Find out a suitable location for the output on local-filesystem Path localFilename = - lDirAlloc.getLocalPathForWrite(filename.toUri().getPath(), + lDirAlloc.getSecureLocalPathForWrite(filename.toUri().getPath(), mapOutputLength, conf); MapOutput mapOutput = @@ -2482,7 +2484,7 @@ // 2. Start the on-disk merge process Path outputPath = - lDirAlloc.getLocalPathForWrite(mapFiles.get(0).toString(), + lDirAlloc.getSecureLocalPathForWrite(mapFiles.get(0).toString(), approxOutputSize, conf) .suffix(".merged"); Writer writer = Index: src/java/org/apache/hadoop/mapred/MapTask.java =================================================================== --- src/java/org/apache/hadoop/mapred/MapTask.java (revision 787560) +++ src/java/org/apache/hadoop/mapred/MapTask.java (working copy) @@ -21,6 +21,7 @@ import java.io.DataInput; import java.io.DataOutput; import java.io.DataOutputStream; +import java.io.File; import java.io.IOException; import java.io.OutputStream; import java.lang.reflect.Constructor; @@ -54,6 +55,7 @@ import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; import org.apache.hadoop.mapreduce.TaskAttemptContext; import org.apache.hadoop.mapreduce.TaskCounter; +import org.apache.hadoop.util.DiskChecker; import org.apache.hadoop.util.IndexedSortable; import org.apache.hadoop.util.IndexedSorter; import org.apache.hadoop.util.Progress; @@ -108,6 +110,8 @@ "split.dta"); LOG.debug("Writing local split to " + localSplit); DataOutputStream out = FileSystem.getLocal(conf).create(localSplit); + DiskChecker.setPermissions(new File(localSplit.toUri().getPath()), + DiskChecker.sevenZeroZero); Text.writeString(out, splitClass); split.write(out); out.close(); @@ -1164,6 +1168,8 @@ final Path filename = mapOutputFile.getSpillFileForWrite(getTaskID(), numSpills, size); out = rfs.create(filename); + 
DiskChecker.setPermissions(new File(filename.toUri().getPath()), + DiskChecker.sevenZeroZero); final int endPosition = (kvend > kvstart) ? kvend @@ -1258,6 +1264,8 @@ final Path filename = mapOutputFile.getSpillFileForWrite(getTaskID(), numSpills, size); out = rfs.create(filename); + DiskChecker.setPermissions(new File(filename.toUri().getPath()), + DiskChecker.sevenZeroZero); // we don't run the combiner for a single record IndexRecord rec = new IndexRecord(); @@ -1417,6 +1425,8 @@ //The output stream for the final single output file FSDataOutputStream finalOut = rfs.create(finalOutputFile, true, 4096); + DiskChecker.setPermissions(new File(finalOutputFile.toUri().getPath()), + DiskChecker.sevenZeroZero); if (numSpills == 0) { //create dummy files Index: src/java/org/apache/hadoop/mapred/MapOutputFile.java =================================================================== --- src/java/org/apache/hadoop/mapred/MapOutputFile.java (revision 787560) +++ src/java/org/apache/hadoop/mapred/MapOutputFile.java (working copy) @@ -31,7 +31,9 @@ private JobConf conf; private JobID jobId; - + + static final String INPUT_FILE_FORMAT_STRING = "%s/map_%d.out"; + MapOutputFile() { } @@ -47,7 +49,7 @@ */ public Path getOutputFile(TaskAttemptID mapTaskId) throws IOException { - return lDirAlloc.getLocalPathToRead(TaskTracker.getIntermediateOutputDir( + return lDirAlloc.getLocalPathToRead(TaskTracker.getIntermediateOutputDirForChild( jobId.toString(), mapTaskId.toString()) + "/file.out", conf); } @@ -58,9 +60,9 @@ */ public Path getOutputFileForWrite(TaskAttemptID mapTaskId, long size) throws IOException { - return lDirAlloc.getLocalPathForWrite(TaskTracker.getIntermediateOutputDir( - jobId.toString(), mapTaskId.toString()) - + "/file.out", size, conf); + return lDirAlloc.getSecureLocalPathForWrite(TaskTracker + .getIntermediateOutputDirForChild(jobId.toString(), mapTaskId.toString()) + + "/file.out", size, conf); } /** Return the path to a local map output index file created 
earlier @@ -68,7 +70,7 @@ */ public Path getOutputIndexFile(TaskAttemptID mapTaskId) throws IOException { - return lDirAlloc.getLocalPathToRead(TaskTracker.getIntermediateOutputDir( + return lDirAlloc.getLocalPathToRead(TaskTracker.getIntermediateOutputDirForChild( jobId.toString(), mapTaskId.toString()) + "/file.out.index", conf); } @@ -79,10 +81,9 @@ */ public Path getOutputIndexFileForWrite(TaskAttemptID mapTaskId, long size) throws IOException { - return lDirAlloc.getLocalPathForWrite(TaskTracker.getIntermediateOutputDir( - jobId.toString(), mapTaskId.toString()) - + "/file.out.index", - size, conf); + return lDirAlloc.getSecureLocalPathForWrite(TaskTracker + .getIntermediateOutputDirForChild(jobId.toString(), mapTaskId.toString()) + + "/file.out.index", size, conf); } /** Return a local map spill file created earlier. @@ -91,7 +92,7 @@ */ public Path getSpillFile(TaskAttemptID mapTaskId, int spillNumber) throws IOException { - return lDirAlloc.getLocalPathToRead(TaskTracker.getIntermediateOutputDir( + return lDirAlloc.getLocalPathToRead(TaskTracker.getIntermediateOutputDirForChild( jobId.toString(), mapTaskId.toString()) + "/spill" + spillNumber + ".out", conf); @@ -103,11 +104,11 @@ * @param size the size of the file */ public Path getSpillFileForWrite(TaskAttemptID mapTaskId, int spillNumber, - long size) throws IOException { - return lDirAlloc.getLocalPathForWrite(TaskTracker.getIntermediateOutputDir( - jobId.toString(), mapTaskId.toString()) - + "/spill" + - spillNumber + ".out", size, conf); + long size) + throws IOException { + return lDirAlloc.getSecureLocalPathForWrite(TaskTracker + .getIntermediateOutputDirForChild(jobId.toString(), mapTaskId.toString()) + + "/spill" + spillNumber + ".out", size, conf); } /** Return a local map spill index file created earlier @@ -116,7 +117,7 @@ */ public Path getSpillIndexFile(TaskAttemptID mapTaskId, int spillNumber) throws IOException { - return lDirAlloc.getLocalPathToRead(TaskTracker.getIntermediateOutputDir( + 
return lDirAlloc.getLocalPathToRead(TaskTracker.getIntermediateOutputDirForChild( jobId.toString(), mapTaskId.toString()) + "/spill" + spillNumber + ".out.index", conf); @@ -128,11 +129,11 @@ * @param size the size of the file */ public Path getSpillIndexFileForWrite(TaskAttemptID mapTaskId, int spillNumber, - long size) throws IOException { - return lDirAlloc.getLocalPathForWrite(TaskTracker.getIntermediateOutputDir( - jobId.toString(), mapTaskId.toString()) - + "/spill" + spillNumber + - ".out.index", size, conf); + long size) + throws IOException { + return lDirAlloc.getSecureLocalPathForWrite(TaskTracker + .getIntermediateOutputDirForChild(jobId.toString(), mapTaskId.toString()) + + "/spill" + spillNumber + ".out.index", size, conf); } /** Return a local reduce input file created earlier @@ -141,11 +142,11 @@ */ public Path getInputFile(int mapId, TaskAttemptID reduceTaskId) throws IOException { - // TODO *oom* should use a format here - return lDirAlloc.getLocalPathToRead(TaskTracker.getIntermediateOutputDir( - jobId.toString(), reduceTaskId.toString()) - + "/map_" + mapId + ".out", - conf); + String outputDir = + TaskTracker.getIntermediateOutputDirForChild(jobId.toString(), + reduceTaskId.toString()); + return lDirAlloc.getLocalPathToRead(String.format( + INPUT_FILE_FORMAT_STRING, outputDir, Integer.valueOf(mapId)), conf); } /** Create a local reduce input file name. 
@@ -156,16 +157,16 @@ public Path getInputFileForWrite(TaskID mapId, TaskAttemptID reduceTaskId, long size) throws IOException { - // TODO *oom* should use a format here - return lDirAlloc.getLocalPathForWrite(TaskTracker.getIntermediateOutputDir( - jobId.toString(), reduceTaskId.toString()) - + "/map_" + mapId.getId() + ".out", - size, conf); - } + String outputDir = + TaskTracker.getIntermediateOutputDirForChild(jobId.toString(), + reduceTaskId.toString()); + return lDirAlloc.getSecureLocalPathForWrite(String.format( + INPUT_FILE_FORMAT_STRING, outputDir, mapId.getId()), size, conf); +} /** Removes all of the files related to a task. */ public void removeAll(TaskAttemptID taskId) throws IOException { - conf.deleteLocalFiles(TaskTracker.getIntermediateOutputDir( + conf.deleteLocalFiles(TaskTracker.getIntermediateOutputDirForChild( jobId.toString(), taskId.toString()) ); } Index: src/java/org/apache/hadoop/mapred/TaskLog.java =================================================================== --- src/java/org/apache/hadoop/mapred/TaskLog.java (revision 787560) +++ src/java/org/apache/hadoop/mapred/TaskLog.java (working copy) @@ -39,6 +39,7 @@ import org.apache.hadoop.fs.LocalFileSystem; import org.apache.hadoop.fs.FileUtil; import org.apache.hadoop.fs.Path; +import org.apache.hadoop.fs.permission.FsPermission; import org.apache.hadoop.util.ProcessTree; import org.apache.hadoop.util.Shell; import org.apache.log4j.Appender; @@ -55,8 +56,7 @@ LogFactory.getLog(TaskLog.class); private static final File LOG_DIR = - new File(System.getProperty("hadoop.log.dir"), - "userlogs").getAbsoluteFile(); + new File(getBaseLogDir(), "userlogs").getAbsoluteFile(); static LocalFileSystem localFS = null; static { @@ -156,8 +156,12 @@ return new File(getBaseDir(taskid), "log.index"); } } - - private static File getBaseDir(String taskid) { + + static String getBaseLogDir() { + return System.getProperty("hadoop.log.dir"); + } + + static File getBaseDir(String taskid) { return new 
File(LOG_DIR, taskid); } private static long prevOutLength; @@ -196,6 +200,10 @@ Path indexFilePath = new Path(indexFile.getAbsolutePath()); Path tmpIndexFilePath = new Path(tmpIndexFile.getAbsolutePath()); localFS.rename (tmpIndexFilePath, indexFilePath); + + // TODO: LOG_SECCURITY: The following is not needed after security is fixed + // for logs + localFS.setPermission(indexFilePath, new FsPermission((short)00544)); } private static void resetPrevLengths(TaskAttemptID firstTaskid) { prevOutLength = getTaskLogFile(firstTaskid, LogName.STDOUT).length(); Index: src/java/org/apache/hadoop/mapred/LinuxTaskController.java =================================================================== --- src/java/org/apache/hadoop/mapred/LinuxTaskController.java (revision 787560) +++ src/java/org/apache/hadoop/mapred/LinuxTaskController.java (working copy) @@ -28,9 +28,8 @@ import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.fs.FileUtil; import org.apache.hadoop.mapred.JvmManager.JvmEnv; +import org.apache.hadoop.util.DiskChecker; import org.apache.hadoop.util.StringUtils; import org.apache.hadoop.util.Shell.ShellCommandExecutor; @@ -73,44 +72,19 @@ new File(hadoopBin, "task-controller").getAbsolutePath(); } - // The list of directory paths specified in the - // variable mapred.local.dir. This is used to determine - // which among the list of directories is picked up - // for storing data for a particular task. - private String[] mapredLocalDirs; - - // permissions to set on files and directories created. - // When localized files are handled securely, this string - // will change to something more restrictive. Until then, - // it opens up the permissions for all, so that the tasktracker - // and job owners can access files together. 
- private static final String FILE_PERMISSIONS = "ugo+rwx"; - - // permissions to set on components of the path leading to - // localized files and directories. Read and execute permissions - // are required for different users to be able to access the - // files. - private static final String PATH_PERMISSIONS = "go+rx"; - public LinuxTaskController() { super(); } - @Override - public void setConf(Configuration conf) { - super.setConf(conf); - mapredLocalDirs = conf.getStrings("mapred.local.dir"); - //Setting of the permissions of the local directory is done in - //setup() - } - /** * List of commands that the setuid script will execute. */ enum TaskCommands { LAUNCH_TASK_JVM, TERMINATE_TASK_JVM, - KILL_TASK_JVM + KILL_TASK_JVM, + FINALIZE_TASK_DIRS, + FINALIZE_JOB, } /** @@ -155,17 +129,27 @@ try { shExec.execute(); } catch (Exception e) { - LOG.warn("Exception thrown while launching task JVM : " + - StringUtils.stringifyException(e)); + // TODO: check for 143 (SIGTERM) and 137 (SIGKILL) exit codes + LOG.warn("Exception thrown while launching task JVM : " + + StringUtils.stringifyException(e)); LOG.warn("Exit code from task is : " + shExec.getExitCode()); - LOG.warn("Output from task-contoller is : " + shExec.getOutput()); + logOutput(shExec.getOutput()); throw new IOException(e); } - if(LOG.isDebugEnabled()) { - LOG.debug("output after executing task jvm = " + shExec.getOutput()); + if (LOG.isDebugEnabled()) { + logOutput(shExec.getOutput()); } } + private void logOutput(String output) { + String shExecOutput = output; + if (shExecOutput != null) { + LOG.info("output after executing task jvm follows:"); + for (String str : shExecOutput.split("\n")) { + LOG.info(str); + } + } + } /** * Returns list of arguments to be passed while launching task VM. 
* See {@code buildTaskControllerExecutor(TaskCommands, @@ -177,8 +161,11 @@ List commandArgs = new ArrayList(3); String taskId = context.task.getTaskID().toString(); String jobId = getJobId(context); - LOG.debug("getting the task directory as: " + LOG.info("getting the task directory as: " + getTaskCacheDirectory(context)); + LOG.info("getting the tt_root as " +getDirectoryChosenForTask( + new File(getTaskCacheDirectory(context)), + context) ); commandArgs.add(getDirectoryChosenForTask( new File(getTaskCacheDirectory(context)), context)); @@ -188,6 +175,13 @@ }else { commandArgs.add(taskId + TaskTracker.TASK_CLEANUP_SUFFIX); } + File logDir = new File(TaskLog.getBaseLogDir()); + try { + commandArgs.add(logDir.getCanonicalPath()); + } catch (IOException e) { + // Report through the TaskTracker log instead of dumping to stderr. + LOG.warn("Unable to resolve the canonical path of hadoop.log.dir", e); + } return commandArgs; } @@ -219,68 +213,7 @@ throw new IllegalArgumentException("invalid task cache directory " + directory.getAbsolutePath()); } - - /** - * Setup appropriate permissions for directories and files that - * are used by the task. - * - * As the LinuxTaskController launches tasks as a user, different - * from the daemon, all directories and files that are potentially - * used by the tasks are setup with appropriate permissions that - * will allow access. - * - * Until secure data handling is implemented (see HADOOP-4491 and - * HADOOP-4493, for e.g.), the permissions are set up to allow - * read, write and execute access for everyone. This will be - * changed to restricted access as data is handled securely. - */ - void initializeTask(TaskControllerContext context) { - // Setup permissions for the job and task cache directories. 
- setupTaskCacheFileAccess(context); - // setup permissions for task log directory - setupTaskLogFileAccess(context); - } - - // Allows access for the task to create log files under - // the task log directory - private void setupTaskLogFileAccess(TaskControllerContext context) { - TaskAttemptID taskId = context.task.getTaskID(); - File f = TaskLog.getTaskLogFile(taskId, TaskLog.LogName.SYSLOG); - String taskAttemptLogDir = f.getParentFile().getAbsolutePath(); - changeDirectoryPermissions(taskAttemptLogDir, FILE_PERMISSIONS, false); - } - // Allows access for the task to read, write and execute - // the files under the job and task cache directories - private void setupTaskCacheFileAccess(TaskControllerContext context) { - String taskId = context.task.getTaskID().toString(); - JobID jobId = JobID.forName(getJobId(context)); - //Change permission for the task across all the disks - for(String localDir : mapredLocalDirs) { - File f = new File(localDir); - File taskCacheDir = new File(f,TaskTracker.getLocalTaskDir( - jobId.toString(), taskId, context.task.isTaskCleanupTask())); - if(taskCacheDir.exists()) { - changeDirectoryPermissions(taskCacheDir.getPath(), - FILE_PERMISSIONS, true); - } - }//end of local directory Iteration - } - - // convenience method to execute chmod. - private void changeDirectoryPermissions(String dir, String mode, - boolean isRecursive) { - int ret = 0; - try { - ret = FileUtil.chmod(dir, mode, isRecursive); - } catch (Exception e) { - LOG.warn("Exception in changing permissions for directory " + dir + - ". Exception: " + e.getMessage()); - } - if (ret != 0) { - LOG.warn("Could not change permissions for directory " + dir); - } - } /** * Builds the command line for launching/terminating/killing task JVM. * Following is the format for launching/terminating/killing task JVM @@ -363,79 +296,15 @@ if (pw != null) { pw.close(); } - // set execute permissions for all on the file. 
File f = new File(commandFile); - if (f.exists()) { - f.setReadable(true, false); - f.setExecutable(true, false); + if (!f.exists()) { + throw new IOException("Taskjvm.sh doesn't exist!!!"); } + DiskChecker.setPermissions(f, DiskChecker.sevenZeroZero); } } - /** - * Sets up the permissions of the following directories: - * - * Job cache directory - * Archive directory - * Hadoop log directories - * - */ - @Override - void setup() { - //set up job cache directory and associated permissions - String localDirs[] = this.mapredLocalDirs; - for(String localDir : localDirs) { - //Cache root - File cacheDirectory = new File(localDir,TaskTracker.getCacheSubdir()); - File jobCacheDirectory = new File(localDir,TaskTracker.getJobCacheSubdir()); - if(!cacheDirectory.exists()) { - if(!cacheDirectory.mkdirs()) { - LOG.warn("Unable to create cache directory : " + - cacheDirectory.getPath()); - } - } - if(!jobCacheDirectory.exists()) { - if(!jobCacheDirectory.mkdirs()) { - LOG.warn("Unable to create job cache directory : " + - jobCacheDirectory.getPath()); - } - } - //Give world writable permission for every directory under - //mapred-local-dir. - //Child tries to write files under it when executing. - changeDirectoryPermissions(localDir, FILE_PERMISSIONS, true); - }//end of local directory manipulations - //setting up perms for user logs - File taskLog = TaskLog.getUserLogDir(); - changeDirectoryPermissions(taskLog.getPath(), FILE_PERMISSIONS,false); - } - - /* - * Create Job directories across disks and set their permissions to 777 - * This way when tasks are run we just need to setup permissions for - * task folder. 
- */ - @Override - void initializeJob(JobID jobid) { - for(String localDir : this.mapredLocalDirs) { - File jobDirectory = new File(localDir, - TaskTracker.getLocalJobDir(jobid.toString())); - if(!jobDirectory.exists()) { - if(!jobDirectory.mkdir()) { - LOG.warn("Unable to create job cache directory : " - + jobDirectory.getPath()); - continue; - } - } - //Should be recursive because the jar and work folders might be - //present under the job cache directory - changeDirectoryPermissions( - jobDirectory.getPath(), FILE_PERMISSIONS, true); - } - } - - /** * API which builds the command line to be pass to LinuxTaskController * binary to terminate/kill the task. See * {@code buildTaskControllerExecutor(TaskCommands, @@ -443,16 +312,58 @@ * * * @param context context of task which has to be passed kill signal. + * @throws IOException * */ private List buildKillTaskCommandArgs(TaskControllerContext - context){ + context) throws IOException{ List killTaskJVMArgs = new ArrayList(); killTaskJVMArgs.add(context.pid); return killTaskJVMArgs; } - + /** + * Build the command line to be passed to LinuxTaskController binary to + * finalize task directories once a task finishes. + * + * @param context context of the task whose directories are to be finalized + * @return the job id, the attempt id and, when resolvable, the log directory + */ + private List buildBuildFinalizeTaskDirsCommandArgs( + TaskControllerContext context) { + List commandArgs = new ArrayList(3); + String taskId = context.task.getTaskID().toString(); + String jobId = getJobId(context); + commandArgs.add(jobId); + if (!context.task.isTaskCleanupTask()) { + commandArgs.add(taskId); + } else { + commandArgs.add(taskId + TaskTracker.TASK_CLEANUP_SUFFIX); + } + File logDir = new File(TaskLog.getBaseLogDir()); + try { + commandArgs.add(logDir.getCanonicalPath()); + } catch (IOException e) { + // Report through the TaskTracker log instead of dumping to stderr. + LOG.warn("Unable to resolve the canonical path of hadoop.log.dir", e); + } + return commandArgs; + } + + /** + * Build the command line to be passed to LinuxTaskController binary to + * perform the actions for finalizing a job. 
+ * + * @param jobId id of the job being finalized + * @return arguments for the FINALIZE_JOB command + */ + private List buildFinalizeJobCommandArgs(JobID jobId) { + List finalizeJobArgs = new ArrayList(); + finalizeJobArgs.add(jobId.toString()); + return finalizeJobArgs; + } + + /** * Convenience method used to sending appropriate Kill signal to the task * VM * @param context @@ -465,13 +376,18 @@ LOG.info("Context task null not killing the JVM"); return; } + if (context.pid == null) { + LOG.warn("pid is null. Cannot kill task"); + return; + } ShellCommandExecutor shExec = buildTaskControllerExecutor( command, context.env.conf.getUser(), buildKillTaskCommandArgs(context), context.env); try { shExec.execute(); } catch (Exception e) { - LOG.warn("Output from task-contoller is : " + shExec.getOutput()); + LOG.warn("Exit code from finishtask is : " + shExec.getExitCode()); + logOutput(shExec.getOutput()); throw new IOException(e); } } @@ -498,6 +414,46 @@ protected String getTaskControllerExecutablePath() { return taskControllerExe; - } + } + + @Override + void finalizeTaskDirs(TaskControllerContext context) + throws IOException { + LOG.info("Going to finalize task directories for " + + context.task.getTaskID().toString()); + ShellCommandExecutor shExec = + buildTaskControllerExecutor(TaskCommands.FINALIZE_TASK_DIRS, + context.env.conf.getUser(), + buildBuildFinalizeTaskDirsCommandArgs(context), context.env); + try { + shExec.execute(); + } catch (Exception e) { + LOG.warn("Exit code from finalizeTaskDirs is : " + shExec.getExitCode()); + LOG.warn("Exception thrown by finalizeTaskDirs : " + + StringUtils.stringifyException(e)); + logOutput(shExec.getOutput()); + throw new IOException(e); + } + } + + @Override + void finalizeJob(JobID jobId, String workDir, String user) + throws IOException { + LOG.info("Going to finalize job for " + jobId); + JvmEnv env = + new JvmEnv(null, null, null, null, -1, new File(workDir), null, null); + ShellCommandExecutor shExec = + buildTaskControllerExecutor(TaskCommands.FINALIZE_JOB, user, + 
buildFinalizeJobCommandArgs(jobId), env); + try { + shExec.execute(); + } catch (Exception e) { + LOG.warn("Exit code from finalizeJob is : " + shExec.getExitCode()); + LOG.warn("Exception thrown by finalizeJob : " + + StringUtils.stringifyException(e)); + logOutput(shExec.getOutput()); + throw new IOException(e); + } + } } Index: src/java/org/apache/hadoop/mapred/BackupStore.java =================================================================== --- src/java/org/apache/hadoop/mapred/BackupStore.java (revision 787560) +++ src/java/org/apache/hadoop/mapred/BackupStore.java (working copy) @@ -549,7 +549,7 @@ private Writer createSpillFile() throws IOException { Path tmp = new Path( - TaskTracker.getIntermediateOutputDir( + TaskTracker.getIntermediateOutputDirForChild( tid.getJobID().toString(), tid.toString()) + "/backup_" + tid.getId() + "_" + (spillNumber++) + ".out"); Index: src/java/org/apache/hadoop/mapred/DefaultTaskController.java =================================================================== --- src/java/org/apache/hadoop/mapred/DefaultTaskController.java (revision 787560) +++ src/java/org/apache/hadoop/mapred/DefaultTaskController.java (working copy) @@ -46,6 +46,7 @@ * This method launches the new JVM for the task by executing the * the JVM command using the {@link Shell.ShellCommandExecutor} */ + @Override void launchTaskJVM(TaskController.TaskControllerContext context) throws IOException { JvmEnv env = context.env; @@ -59,36 +60,8 @@ context.shExec = shexec; shexec.execute(); } - - /** - * Initialize the task environment. - * - * Since tasks are launched as the tasktracker user itself, this - * method has no action to perform. - */ - void initializeTask(TaskController.TaskControllerContext context) { - // The default task controller does not need to set up - // any permissions for proper execution. - // So this is a dummy method. 
- return; - } - @Override - void setup() { - // nothing to setup - return; - } - - /* - * No need to do anything as we don't need to do as we dont need anything - * extra from what TaskTracker has done. - */ - @Override - void initializeJob(JobID jobId) { - } - - @Override void terminateTask(TaskControllerContext context) { ShellCommandExecutor shexec = context.shExec; if (shexec != null) { @@ -132,5 +105,17 @@ } } } - + + @Override + void finalizeTaskDirs(TaskControllerContext context) + throws IOException { + // Do nothing + } + + @Override + void finalizeJob(JobID jobId, String workDir, String user) + throws IOException { + // Do nothing + + } } Index: src/java/org/apache/hadoop/mapred/Merger.java =================================================================== --- src/java/org/apache/hadoop/mapred/Merger.java (revision 787560) +++ src/java/org/apache/hadoop/mapred/Merger.java (working copy) @@ -588,7 +588,7 @@ Path tmpFilename = new Path(tmpDir, "intermediate").suffix("." + passNo); - Path outputFile = lDirAlloc.getLocalPathForWrite( + Path outputFile = lDirAlloc.getSecureLocalPathForWrite( tmpFilename.toString(), approxOutputSize, conf); Index: src/java/org/apache/hadoop/mapred/SpillRecord.java =================================================================== --- src/java/org/apache/hadoop/mapred/SpillRecord.java (revision 787560) +++ src/java/org/apache/hadoop/mapred/SpillRecord.java (working copy) @@ -17,6 +17,7 @@ */ package org.apache.hadoop.mapred; +import java.io.File; import java.io.IOException; import java.nio.ByteBuffer; import java.nio.LongBuffer; @@ -31,6 +32,7 @@ import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.IOUtils; +import org.apache.hadoop.util.DiskChecker; import static org.apache.hadoop.mapred.MapTask.MAP_OUTPUT_INDEX_RECORD_LENGTH; @@ -118,6 +120,8 @@ final FileSystem rfs = FileSystem.getLocal(job).getRaw(); CheckedOutputStream chk = null; final FSDataOutputStream out = 
rfs.create(loc); + DiskChecker.setPermissions(new File(loc.toUri().getPath()), + DiskChecker.sevenZeroZero); try { if (crc != null) { crc.reset(); Index: src/c++/task-controller/Makefile.in =================================================================== --- src/c++/task-controller/Makefile.in (revision 787560) +++ src/c++/task-controller/Makefile.in (working copy) @@ -16,9 +16,11 @@ # limitations under the License. # OBJS=main.o task-controller.o configuration.o +TESTOBJS=test-task-controller.o task-controller.o configuration.o CC=@CC@ CFLAGS = @CFLAGS@ BINARY=task-controller +TESTBINARY=test-task-controller installdir = @prefix@ @@ -34,9 +36,15 @@ configuration.o: configuration.h configuration.c $(CC) $(CFLAG) -o configuration.o -c configuration.c +test-task-controller.o: task-controller.c task-controller.h + $(CC) $(CFLAG) -o test-task-controller.o -c test-task-controller.c +test: $(TESTOBJS) + $(CC) $(CFLAG) -o $(TESTBINARY) $(TESTOBJS) + cp $(TESTBINARY) $(installdir) + clean: - rm -rf $(BINARY) $(OBJS) + rm -rf $(BINARY) $(OBJS) $(TESTOBJS) install: all cp $(BINARY) $(installdir) Index: src/c++/task-controller/configuration.h.in =================================================================== --- src/c++/task-controller/configuration.h.in (revision 787560) +++ src/c++/task-controller/configuration.h.in (working copy) @@ -53,10 +53,10 @@ extern char *hadoop_conf_dir; #endif //method exposed to get the configurations -const char * get_value(char* key); +const char * get_value(const char* key); //method to free allocated configuration void free_configurations(); //function to return array of values pointing to the key. Values are //comma seperated strings. 
-const char ** get_values(char* key); +const char ** get_values(const char* key); Index: src/c++/task-controller/configure.ac =================================================================== --- src/c++/task-controller/configure.ac (revision 787560) +++ src/c++/task-controller/configure.ac (working copy) @@ -38,7 +38,7 @@ # Checks for header files. AC_HEADER_STDC -AC_CHECK_HEADERS([stdlib.h string.h unistd.h]) +AC_CHECK_HEADERS([stdlib.h string.h unistd.h fcntl.h]) #check for HADOOP_CONF_DIR @@ -50,12 +50,14 @@ # Checks for typedefs, structures, and compiler characteristics. AC_C_CONST AC_TYPE_PID_T +AC_TYPE_MODE_T +AC_TYPE_SIZE_T # Checks for library functions. AC_FUNC_MALLOC AC_FUNC_REALLOC -AC_CHECK_FUNCS([strerror]) +AC_FUNC_CHOWN +AC_CHECK_FUNCS([strerror memset mkdir rmdir strdup]) AC_CONFIG_FILES([Makefile]) AC_OUTPUT - Index: src/c++/task-controller/task-controller.c =================================================================== --- src/c++/task-controller/task-controller.c (revision 787560) +++ src/c++/task-controller/task-controller.c (working copy) @@ -71,104 +71,564 @@ return 0; } -// function to check if the passed tt_root is present in hadoop.tmp.dir -int check_tt_root(const char *tt_root) { - char ** mapred_local_dir; - int found = -1; +/** + * Checks the passed value for the variable config_key against the values in + * the configuration. 
+ * Returns 0 if the passed value is found in the configuration, + * -1 otherwise + */ +int check_variable_against_config(const char *config_key, + const char *passed_value) { - if (tt_root == NULL) { + if (config_key == NULL || passed_value == NULL) { return -1; } - mapred_local_dir = (char **)get_values(TT_SYS_DIR_KEY); + int found = -1; - if (mapred_local_dir == NULL) { + const char **config_value = get_values(config_key); + + if (config_value == NULL) { + fprintf(LOGFILE, "%s is not configured.\n", config_key); return -1; } - while(*mapred_local_dir != NULL) { - if(strcmp(*mapred_local_dir,tt_root) == 0) { + char **config_val_ptr = (char **) config_value; + while (*config_val_ptr != NULL) { + if (strcmp(*config_val_ptr, passed_value) == 0) { found = 0; break; } + config_val_ptr++; } - free(mapred_local_dir); + + if (found != 0) { + fprintf( + LOGFILE, + "Invalid value passed: \ + Configured value of %s is %s. \ + Passed value is %s.\n", + config_key, get_value(config_key), passed_value); + } + free(config_value); return found; } /** + * Utility function to concatenate argB to argA using the concat_pattern + */ +char *concatenate(const char *argA, const char *argB, char *concat_pattern, + char *return_path_name) { + if (argA == NULL || argB == NULL) { + fprintf(LOGFILE, "One of the arguments passed for %s is null.\n", + return_path_name); + return NULL; + } + + char *return_path = NULL; + int str_len = strlen(concat_pattern) + strlen(argA) + strlen(argB); + + return_path = (char *) malloc(sizeof(char) * (str_len + 1)); + if (return_path == NULL) { + fprintf(LOGFILE, "Unable to allocate memory for %s.\n", return_path_name); + return NULL; + } + memset(return_path, '\0', str_len + 1); + snprintf(return_path, str_len, concat_pattern, argA, argB); + return return_path; +} + +/** + * Get the job-directory path from tt_root and job-id + */ +char *get_job_directory(const char * tt_root, const char *jobid) { + return concatenate(tt_root, jobid, TT_JOB_DIR_PATTERN, 
"job_dir_path"); +} + +/** + * Get the job-jars directory from the job_dir + */ +char *get_job_jars_directory(const char *job_dir) { + return concatenate(job_dir, "", JOB_DIR_TO_JARS_PATTERN, + "job_jars_dir_path"); +} + +/** + * Get the attempt directory for the given attempt_id + */ +char *get_attempt_directory(const char *job_dir, const char *attempt_id) { + return concatenate(job_dir, attempt_id, JOB_DIR_TO_ATTEMPT_DIR_PATTERN, + "attempt_dir_path"); +} + +/* + * Get the path to the task launcher file which is created by the TT + */ +char *get_task_launcher_file(const char *attempt_dir) { + return concatenate(attempt_dir, "", ATTEMPT_DIR_TO_TASK_SCRIPT_PATTERN, + "task_script_path"); +} + +/** + * Get the log directory for the given attempt. + */ +char *get_task_log_dir(const char *log_dir, const char *attempt_id) { + return concatenate(log_dir, attempt_id, ATTEMPT_LOG_DIR_PATTERN, + "task_log_dir"); +} + +/** + * Function to check if the passed tt_root is present in mapred.local.dir + * the task-controller is configured with. + */ +int check_tt_root(const char *tt_root) { + return check_variable_against_config(TT_SYS_DIR_KEY, tt_root); +} + +/** + * Function to check if the passed log_dir is present in hadoop.log.dir + * the task-controller is configured with. + */ +int check_task_logs_dir(const char *log_dir) { + return check_variable_against_config(TT_LOG_DIR_KEY, log_dir); +} + +/** * Function to check if the constructed path and absolute * path resolve to one and same. 
*/ - int check_path(char *path) { + fprintf(LOGFILE, "Passed path : %s\n", path); char * resolved_path = (char *) canonicalize_file_name(path); - if(resolved_path == NULL) { + if (resolved_path == NULL) { + switch (errno) { + case ENAMETOOLONG: + fprintf(LOGFILE, "The resulting path is too long.\n"); + break; + case EACCES: + fprintf(LOGFILE, "The path is not readable.\n"); + break; + case ENOENT: + fprintf(LOGFILE, "The input file name is empty or %s", + "the path components does not exist.\n"); + break; + case ELOOP: + fprintf(LOGFILE, "More than `MAXSYMLINKS' many symlinks have been followed.\n"); + break; + } return ERROR_RESOLVING_FILE_PATH; } - if(strcmp(resolved_path, path) !=0) { + fprintf(LOGFILE, "Resolved path : %s\n", resolved_path); + if (strcmp(resolved_path, path) != 0) { free(resolved_path); return RELATIVE_PATH_COMPONENTS_IN_FILE_PATH; } free(resolved_path); return 0; } + /** - * Function to check if a user actually owns the file. + * Function to change the owner/group of a given path. 
*/ -int check_owner(uid_t uid, char *path) { - struct stat filestat; - if(stat(path, &filestat)!=0) { - return UNABLE_TO_STAT_FILE; +int change_owner(const char *path, uid_t uid, gid_t gid) { + int exit_code = chown(path, uid, gid); + if (exit_code != 0) { + switch (errno) { + case (EPERM): + fprintf(LOGFILE, + "Process lacks permission to make the requested change.\n"); + break; + case (EROFS): + fprintf(LOGFILE, "File is on a read-only file system.\n"); + break; + case (EACCES): + fprintf( + LOGFILE, + "Process does not have search permission for a directory \ + component of the file name.\n"); + break; + case (ENAMETOOLONG): + fprintf(LOGFILE, "The file name is too long.\n"); + break; + case (ENOENT): + fprintf(LOGFILE, + "A directory component in the file name doesn't exist.\n"); + break; + case (ENOTDIR): + fprintf( + LOGFILE, + "A directory component in the file name exists, \ + but it isn't a directory.\n"); + break; + case (ELOOP): + fprintf( + LOGFILE, + "Too many symbolic links were resolved \ + while trying to look up the file name.\n"); + break; + default: + fprintf(LOGFILE, "chown failed with error number : %d.\n", errno); + } } - //check owner. - if(uid != filestat.st_uid){ - return FILE_NOT_OWNED_BY_TASKTRACKER; + return exit_code; +} + +/** + * Function to change the mode of a given path. 
+ */
+int change_mode(const char *path, mode_t mode) {
+  int exit_code = chmod(path, mode); // chmod(2) result is returned to the caller unchanged
+  if (exit_code != 0) {
+    switch (errno) { // log a readable reason for the errno values handled below
+    case (ENOENT):
+      fprintf(LOGFILE, "The named file doesn't exist.\n");
+      break;
+    case (EPERM):
+      fprintf(
+          LOGFILE,
+          "This process does not have permission to change the access\
+ permissions of this file.\n");
+      break;
+    case (EROFS):
+      fprintf(LOGFILE, "The file resides on a read-only file system.\n");
+      break;
+    case (EACCES):
+      fprintf(
+          LOGFILE,
+          "Process does not have search permission for a directory \
+ component of the file name.\n");
+      break;
+    case (ENAMETOOLONG):
+      fprintf(LOGFILE, "The file name is too long.\n");
+      break;
+    case (ENOTDIR):
+      fprintf(
+          LOGFILE,
+          "A directory component in the file name exists, \
+ but it isn't a directory.\n");
+      break;
+    case (ELOOP):
+      fprintf(
+          LOGFILE,
+          "Too many symbolic links were resolved \
+ while trying to look up the file name.\n");
+      break;
+    default: // any other errno is reported numerically, naming the call that failed
+      fprintf(LOGFILE, "chmod failed with error number : %d.\n", errno);
+    }
   }
+  return exit_code;
+}
+
+/**
+ * Function to prepare the attempt directories for the task JVM.
+ * This is done by changing the ownership of the attempt directory recursively
+ * to the job owner.
+ */ +int prepare_attempt_directories(const char *job_id, const char *attempt_id, + const char *user) { + if (job_id == NULL || attempt_id == NULL || user == NULL) { + fprintf(LOGFILE, "Either attempt_id is null or the user passed is null.\n"); + return -1; + } + + if (get_user_details(user) < 0) { + fprintf(LOGFILE, "Couldn't get the user details of %s.\n", user); + return -1; + } + + char *full_local_dir_str = (char *)get_value(TT_SYS_DIR_KEY); + char **local_dir = (char **) get_values(TT_SYS_DIR_KEY); + + if (local_dir == NULL) { + fprintf(LOGFILE, "%s is not configured.\n", TT_SYS_DIR_KEY); + cleanup(); + return PREPARE_ATTEMPT_DIRECTORIES_FAILED; + } + + fprintf(LOGFILE, "Value from config for %s is %s.\n", TT_SYS_DIR_KEY, + full_local_dir_str); + + char *job_dir; + char *attempt_dir; + char **local_dir_ptr = local_dir; + int failed = 1; + while (*local_dir_ptr != NULL) { + // prepare attempt-dir in each of the mapred_local_dir + job_dir = get_job_directory(*local_dir_ptr, job_id); + attempt_dir = get_attempt_directory(job_dir, attempt_id); + if (opendir(attempt_dir) == NULL) { + fprintf(LOGFILE, "attempt_dir %s doesn't exist. Not doing anything.\n", + attempt_dir); + } else if (secure_path(attempt_dir, user_detail->pw_uid, + user_detail->pw_gid) != 0) { + fprintf(LOGFILE, "Failed to secure the attempt_dir %s\n", attempt_dir); + failed = 0; + } + local_dir_ptr++; + free(attempt_dir); + free(job_dir); + } + free(local_dir); + free(full_local_dir_str); + + cleanup(); + if (failed == 0) { + return PREPARE_ATTEMPT_DIRECTORIES_FAILED; + } return 0; } -/* - * function to provide path to the task file which is created by the tt - * - *Check TT_LOCAL_TASK_SCRIPT_PATTERN for pattern +/** + * Function to prepare the job jars directories for the task JVM. Task JVM + * creates symbolic links to the jars in the job directories and hence needs + * access permissions. So here, we change the ownership of the jars directory + * recursively to the job owner. 
*/ -void get_task_file_path(const char * jobid, const char * taskid, - const char * tt_root, char **task_script_path) { - const char ** mapred_local_dir = get_values(TT_SYS_DIR_KEY); - *task_script_path = NULL; - int str_len = strlen(TT_LOCAL_TASK_SCRIPT_PATTERN) + strlen(jobid) + (strlen( - taskid)) + strlen(tt_root); +int prepare_jars_directories(const char *jobid, const char *user) { + if (jobid == NULL || user == NULL) { + fprintf(LOGFILE, "Either jars_dir is null or the user passed is null.\n"); + return -1; + } - if (mapred_local_dir == NULL) { - return; + if (get_user_details(user) < 0) { + fprintf(LOGFILE, "Couldn't get the user details of %s", user); + return -1; } - *task_script_path = (char *) malloc(sizeof(char) * (str_len + 1)); - if (*task_script_path == NULL) { - fprintf(LOGFILE, "Unable to allocate memory for task_script_path \n"); - free(mapred_local_dir); - return; + char *full_local_dir_str = (char *)get_value(TT_SYS_DIR_KEY); + char **local_dir = (char **) get_values(TT_SYS_DIR_KEY); + + if (local_dir == NULL) { + fprintf(LOGFILE, "%s is not configured.\n", TT_SYS_DIR_KEY); + cleanup(); + return FINALIZE_JOB_FAILED; } - memset(*task_script_path,'\0',str_len+1); - snprintf(*task_script_path, str_len, TT_LOCAL_TASK_SCRIPT_PATTERN, tt_root, - jobid, taskid); -#ifdef DEBUG - fprintf(LOGFILE, "get_task_file_path : task script path = %s\n", *task_script_path); - fflush(LOGFILE); -#endif - free(mapred_local_dir); + fprintf(LOGFILE, "Value from config for %s is %s.\n", TT_SYS_DIR_KEY, + full_local_dir_str); + + char *job_dir; + char *jars_dir; + char **local_dir_ptr = local_dir; + int failed = 1; + while (*local_dir_ptr != NULL) { + // finalise jars in each of the mapred_local_dir + job_dir = get_job_directory(*local_dir_ptr, jobid); + jars_dir = get_job_jars_directory(job_dir); + if (opendir(jars_dir) == NULL) { + fprintf(LOGFILE, "jars_dir %s doesn't exist. 
Not doing anything.\n", + jars_dir); + } else if (secure_path(jars_dir, user_detail->pw_uid, user_detail->pw_gid) + != 0) { + fprintf(LOGFILE, "Failed to secure the jars_dir %s\n", jars_dir); + failed = 0; + } + local_dir_ptr++; + free(job_dir); + free(jars_dir); + } + free(local_dir); + free(full_local_dir_str); + cleanup(); + if (failed == 0) { + return FINALIZE_JOB_FAILED; + } + return 0; } -//end of private functions +/** + * Function to prepare the task logs for the child. It gives the ownership + * of the attempt's log-dir to the user. It gives readable permissions to + * everyone for the attempt's dir and it's contents. This is only till + * security is fixed for task-logs. + */ +int prepare_task_logs(const char *log_dir, const char *task_id) { + + char *task_log_dir = get_task_log_dir(log_dir, task_id); + if (opendir(task_log_dir) == NULL) { + fprintf(LOGFILE, "task_log_dir %s doesn't exist. Not doing anything.\n", + task_log_dir); + // See TaskRunner.java to see that an absent log-dir doesn't fail the task. + return 0; + } + + if (change_owner(task_log_dir, user_detail->pw_uid, user_detail->pw_gid) + != 0) { + fprintf(LOGFILE, "couldn't change the ownership of attempt log dir %s\n", + task_log_dir); + return -1; + } + + // Create and set permissions for all the log related files. + // See org.apache.hadoop.mapred.TaskLog for information about these files. + // TODO: LOG_SECCURITY: + // Once security for logs is fixed, the following will not be needed. 
+  char *files[5] = { "stdout", "stderr", "syslog", "profile.out", "debugout" };
+  char *all_files[6]; // the five fixed log files plus one index file chosen below
+  int i;
+  for (i = 0; i < 5; i++) {
+    all_files[i] = files[i];
+  }
+  if (strstr(task_id, ".cleanup") != NULL) {
+    // This is a cleanup attempt
+    all_files[5] = "log.index.cleanup";
+  } else {
+    all_files[5] = "log.index";
+  }
+
+  char *file_path;
+  int exit_code = 0;
+  mode_t old_umask = umask(0033); // previous mask is restored after the loop
+  for (i = 0; i < 6; i++) { // all_files holds exactly 6 entries; a larger bound reads past the array
+    file_path = concatenate(task_log_dir, all_files[i], "%s/%s",
+        ("%s_file_path", all_files[i]));
+    // Give user the read and write permissions
+    if (creat(file_path, S_IRUSR | S_IWUSR) == -1) {
+      fprintf(LOGFILE, "couldn't create the file %s in the log_dir %s\n",
+          file_path, task_log_dir);
+      exit_code = -1;
+    } else if (change_owner(file_path, user_detail->pw_uid,
+        user_detail->pw_gid) != 0) {
+      fprintf(LOGFILE, "couldn't change the ownership of the file %s\n",
+          file_path);
+      exit_code = -1;
+    }
+  }
+  umask(old_umask);
+  return exit_code;
+}
+
+/**
+ * Function to secure the given path. It does the following recursively:
+ * 1) changes the owner/group of the paths to the passed owner/group
+ * 2) changes the permissions to read/write/execute for the owner only.
+ */
+int secure_path(const char *path, uid_t uid, gid_t gid) {
+  FTS *tree = NULL; // the file hierarchy
+  FTSENT *entry = NULL; // a file in the hierarchy
+  char *paths[] = { (char *)path, NULL }; // fts_open() requires a NULL-terminated array
+  int process_path = 1;
+  int error_code = 0;
+
+  // Secure permissions
+  mode_t file_mode = S_IREAD | S_IWRITE | S_IEXEC;
+
+  // Get physical locations and don't resolve the symlinks.
+  // Don't change directory while walking the directory.
+  int ftsoptions = FTS_PHYSICAL | FTS_NOCHDIR;
+  tree = fts_open(paths, ftsoptions, NULL); // NOTE(review): fts_open() can return NULL; the result is not checked before fts_read()
+  while (((entry = fts_read(tree)) != NULL) && error_code == 0) {
+    switch (entry->fts_info) {
+    case FTS_D:
+      // A directory being visited in pre-order.
+      // We change ownership of directories in post-order.
+      // so ignore the pre-order visit.
+ process_path = 1; + break; + case FTS_DC: + // A directory that causes a cycle in the tree + // We don't expect cycles, ignore. + process_path = 1; + break; + case FTS_DNR: + // A directory which cannot be read + // Ignore and set error code. + process_path = 1; + error_code = -1; + break; + case FTS_DOT: + // "." or ".." + process_path = 1; + break; + case FTS_F: + // A regular file + process_path = 0; + break; + case FTS_DP: + // A directory being visited in post-order + process_path = 0; + break; + case FTS_SL: + // A symbolic link + process_path = 0; + break; + case FTS_SLNONE: + // A symbolic link with a nonexistent target + process_path = 0; + break; + case FTS_NS: + // A file for which no stat(2) information was available + // Ignore and set error code + process_path = 1; + error_code = -1; + break; + case FTS_DEFAULT: + // File that doesn't belong to any of the above type. Ignore. + process_path = 1; + break; + } + + if (error_code != 0) { + break; + } + if (process_path == 1) { + continue; + } + + if (change_owner(entry->fts_path, uid, gid) != 0) { + fprintf(LOGFILE, "couldn't change the ownership of %s\n", + entry->fts_path); + error_code = -3; + } else if (change_mode(entry->fts_path, file_mode) != 0) { + fprintf(LOGFILE, "couldn't change the permissions of %s\n", + entry->fts_path); + error_code = -3; + } + } + fts_close(tree); + return error_code; +} + +/** + * Function to finalise directories to be owned by the TaskTracker back again. + * This is used in the following cases: + * 1) For changing ownership of the attempt-directory when the + * attempt finishes and + * 2) For changing ownership of the jars directory when the job finishes. + */ +int finalize_directories(const char *dir_to_be_finalised) { + if (dir_to_be_finalised == NULL) { + fprintf(LOGFILE, "dir_to_be_finalised is null.\n"); + return -1; + } + + if (opendir(dir_to_be_finalised) == NULL) { + fprintf(LOGFILE, + "dir_to_be_finalised %s doesn't exist. 
Not doing anything.\n", + dir_to_be_finalised); + return 0; + } + + uid_t uid = getuid(); + gid_t gid = getgid(); + + fprintf(LOGFILE, "Securing the path %s recursively to TT.\n", + dir_to_be_finalised); + if (secure_path(dir_to_be_finalised, uid, gid) != 0) { + fprintf(LOGFILE, "Failed to secure the path %s to TT.\n", + dir_to_be_finalised); + return -1; + } + return 0; +} + void display_usage(FILE *stream) { fprintf(stream, "Usage: task-controller [-l logfile] user command command-args\n"); } //function used to populate and user_details structure. - int get_user_details(const char *user) { if (user_detail == NULL) { user_detail = getpwnam(user); @@ -181,30 +641,27 @@ } /* - *Function used to launch a task as the provided user. - * First the function checks if the tt_root passed is found in - * hadoop.temp.dir - * Uses get_task_file_path to fetch the task script file path. - * Does an execlp on the same in order to replace the current image with + * Function used to launch a task as the provided user. It does the following : + * 1) Checks if the tt_root passed is found in mapred.local.dir + * 2) Prepares attempt_dir, jars_dir and log_dir to be accessible by the child + * 3) Uses get_task_launcher_file to fetch the task script file path + * 4) Does an execlp on the same in order to replace the current image with * task image. 
*/ - int run_task_as_user(const char * user, const char *jobid, const char *taskid, - const char *tt_root) { - char *task_script_path = NULL; + const char *tt_root, const char *log_dir) { int exit_code = 0; uid_t uid = getuid(); - if(jobid == NULL || taskid == NULL) { + if (jobid == NULL || taskid == NULL || tt_root == NULL || log_dir == NULL) { return INVALID_ARGUMENT_NUMBER; } -#ifdef DEBUG - fprintf(LOGFILE,"run_task_as_user : Job id : %s \n", jobid); - fprintf(LOGFILE,"run_task_as_user : task id : %s \n", taskid); - fprintf(LOGFILE,"run_task_as_user : tt_root : %s \n", tt_root); - fflush(LOGFILE); -#endif + fprintf(LOGFILE, "Job-id passed to run_task_as_user : %s.\n", jobid); + fprintf(LOGFILE, "Task-d passed to run_task_as_user : %s.\n", taskid); + fprintf(LOGFILE, "tt_root passed to run_task_as_user : %s.\n", tt_root); + fprintf(LOGFILE, "log_dir passed to run_task_as_user : %s.\n", log_dir); + //Check tt_root before switching the user, as reading configuration //file requires privileged access. 
if (check_tt_root(tt_root) < 0) { @@ -213,41 +670,112 @@ return INVALID_TT_ROOT; } - //change the user - fclose(LOGFILE); - fcloseall(); - umask(0); - if (change_user(user) != 0) { - cleanup(); - return SETUID_OPER_FAILED; + char *job_dir = NULL, *attempt_dir = NULL, *jars_dir = NULL, + *task_script_path = NULL; + + if (prepare_attempt_directories(jobid, taskid, user) != 0) { + fprintf(LOGFILE, + "Couldn't prepare the attempt directories for %s of user %s.\n", + taskid, user); + exit_code = PREPARE_ATTEMPT_DIRECTORIES_FAILED; + goto cleanup; } - get_task_file_path(jobid, taskid, tt_root, &task_script_path); + if (prepare_jars_directories(jobid, user) != 0) { + fprintf(LOGFILE, "Couldn't prepare jars directory %s of user %s.\n", + jobid, user); + exit_code = PREPARE_JARS_DIRECTORY_FAILED; + goto cleanup; + } + + if (check_task_logs_dir(log_dir) < 0) { + fprintf(LOGFILE, "Problem with the log directory passed or configured.\n"); + exit_code = INVALID_TT_LOG_DIR; + goto cleanup; + } + + if (prepare_task_logs(log_dir, taskid) != 0) { + fprintf(LOGFILE, "Couldn't prepare task logs directory %s for %s.\n", log_dir, + taskid); + exit_code = PREPARE_TASK_LOGS_FAILED; + goto cleanup; + } + + job_dir = get_job_directory(tt_root, jobid); + if (job_dir == NULL) { + fprintf(LOGFILE, "Couldn't obtain job_dir for %s in %s.\n", jobid, tt_root); + exit_code = OUT_OF_MEMORY; + goto cleanup; + } + + attempt_dir = get_attempt_directory(job_dir, taskid); + if (attempt_dir == NULL) { + fprintf(LOGFILE, "Couldn't obtain attempt_dir for %s in %s.\n", taskid, job_dir); + exit_code = OUT_OF_MEMORY; + goto cleanup; + } + + task_script_path = get_task_launcher_file(attempt_dir); if (task_script_path == NULL) { - cleanup(); - return INVALID_TASK_SCRIPT_PATH; + fprintf(LOGFILE, "Couldn't obtain task_script_path in %s.\n", job_dir); + exit_code = OUT_OF_MEMORY; + goto cleanup; } + errno = 0; exit_code = check_path(task_script_path); if(exit_code != 0) { goto cleanup; } - errno = 0; - exit_code = 
check_owner(uid, task_script_path); - if(exit_code != 0) { + + //change the user + fcloseall(); + umask(0077); + if (change_user(user) != 0) { + exit_code = SETUID_OPER_FAILED; goto cleanup; } + errno = 0; cleanup(); execlp(task_script_path, task_script_path, NULL); if (errno != 0) { - free(task_script_path); - exit_code = UNABLE_TO_EXECUTE_TASK_SCRIPT; - } + switch (errno) { + case E2BIG: + fprintf(LOGFILE, + "The total number of bytes in the environment (envp)\ + and argument list (argv) is too large."); + break; + case EACCES: + fprintf(LOGFILE, + "Search permission is denied on a component of the path prefix\ + of filename or the name of a script interpreter. or"); + fprintf(LOGFILE, + "Execute permission is denied for the file or a script or \ + ELF interpreter."); + break; + case ENOENT: + fprintf(LOGFILE, + "The file filename or a script or ELF interpreter does not exist,\ + or a shared library needed for file or interpreter cannot be found."); + break; + } + free(task_script_path); + exit_code = UNABLE_TO_EXECUTE_TASK_SCRIPT; + } return exit_code; cleanup: + if (job_dir != NULL) { + free(job_dir); + } + if (attempt_dir != NULL) { + free(attempt_dir); + } + if (jars_dir != NULL) { + free(jars_dir); + } if (task_script_path != NULL) { free(task_script_path); } @@ -261,19 +789,23 @@ * The function sends appropriate signal to the process group * specified by the task_pid. */ - int kill_user_task(const char *user, const char *task_pid, int sig) { int pid = 0; if(task_pid == NULL) { return INVALID_ARGUMENT_NUMBER; } + + fprintf(LOGFILE, "user passed to kill_user_task : %s.\n", user); + fprintf(LOGFILE, "task-pid passed to kill_user_task : %s.\n", task_pid); + fprintf(LOGFILE, "signal passed to kill_user_task : %d.\n", sig); + pid = atoi(task_pid); if(pid <= 0) { return INVALID_TASK_PID; } - fclose(LOGFILE); + fcloseall(); if (change_user(user) != 0) { cleanup(); @@ -283,6 +815,7 @@ //Don't continue if the process-group is not alive anymore. 
if(kill(-pid,0) < 0) { errno = 0; + cleanup(); return 0; } @@ -298,3 +831,110 @@ return 0; } +/** + * Function to tidy up things when a task given by task-id finishes. + * As of now, it changes the ownership of the attempt directory and log + * directory back to TT so that + * 1) TT can serve the output in case of map-tasks, + * 2) TT can directly read the logs of finished tasks and + * 3) task dirs can be cleaned up directly by TT once not needed anymore. + */ +int finalize_task_dirs(const char *job_id, const char *task_id, + const char *log_dir) { + + if (job_id == NULL || task_id == NULL || log_dir == NULL) { + return INVALID_ARGUMENT_NUMBER; + } + + fprintf(LOGFILE, "job-id passed to finalize_task_dirs : %s.\n", job_id); + fprintf(LOGFILE, "task-d passed to finalize_task_dirs : %s.\n", task_id); + fprintf(LOGFILE, "log_dir passed to finalize_task_dirs : %s.\n", log_dir); + + char *full_local_dir_str = (char *)get_value(TT_SYS_DIR_KEY); + char **local_dir = (char **) get_values(TT_SYS_DIR_KEY); + + if (local_dir == NULL) { + fprintf(LOGFILE, "%s is not configured.\n", TT_SYS_DIR_KEY); + cleanup(); + return PREPARE_ATTEMPT_DIRECTORIES_FAILED; + } + + fprintf(LOGFILE, "Value from config for %s is %s.\n", TT_SYS_DIR_KEY, + full_local_dir_str); + + char *job_dir; + char *attempt_dir; + char **local_dir_ptr = local_dir; + int failed = 1; + while (*local_dir_ptr != NULL) { + // prepare attempt-dir in each of the mapred_local_dir + job_dir = get_job_directory(*local_dir_ptr, job_id); + attempt_dir = get_attempt_directory(job_dir, task_id); + if (opendir(attempt_dir) == NULL) { + fprintf(LOGFILE, "attempt_dir %s doesn't exist. 
Not doing anything.\n", + attempt_dir); + } else if (finalize_directories(attempt_dir)) { + fprintf(LOGFILE, "Failed to finalise the directory %s\n", attempt_dir); + failed = 0; + } + local_dir_ptr++; + free(attempt_dir); + free(job_dir); + } + free(local_dir); + free(full_local_dir_str); + + cleanup(); + if (failed == 0) { + return FINALIZE_TASK_DIRS_FAILED; + } + return 0; +} + +/** + * Function to tidy up things once the job given by job-id finishes. + * As of now, it changes the ownership of job jars back to the TT so that they + * can be cleaned up by TT itself. + */ +int finalize_job(const char *jobid) { + if (jobid == NULL) { + return INVALID_ARGUMENT_NUMBER; + } + + fprintf(LOGFILE, "Job-id passed to finalize_job : %s.\n", jobid); + + char *full_local_dir_str = (char *)get_value(TT_SYS_DIR_KEY); + char **local_dir = (char **) get_values(TT_SYS_DIR_KEY); + + if (local_dir == NULL) { + fprintf(LOGFILE, "%s is not configured.\n", TT_SYS_DIR_KEY); + cleanup(); + return FINALIZE_JOB_FAILED; + } + + fprintf(LOGFILE, "Value from config for %s is %s.\n", TT_SYS_DIR_KEY, + full_local_dir_str); + + char *job_dir; + char *jars_dir; + char **local_dir_ptr = local_dir; + int failed = 1; + while (*local_dir_ptr != NULL) { + // finalise jars in each of the mapred_local_dir + job_dir = get_job_directory(*local_dir_ptr, jobid); + jars_dir = get_job_jars_directory(job_dir); + if (finalize_directories(jars_dir) != 0) { + failed = 0; + } + local_dir_ptr++; + free(job_dir); + free(jars_dir); + } + free(local_dir); + free(full_local_dir_str); + cleanup(); + if (failed == 0) { + return FINALIZE_JOB_FAILED; + } + return 0; +} Index: src/c++/task-controller/main.c =================================================================== --- src/c++/task-controller/main.c (revision 787560) +++ src/c++/task-controller/main.c (working copy) @@ -24,6 +24,7 @@ const char * job_id = NULL; const char * task_id = NULL; const char * tt_root = NULL; + const char *log_dir = NULL; int exit_code = 0; 
const char * task_pid = NULL; const char* const short_options = "l:"; @@ -35,7 +36,7 @@ //Minimum number of arguments required to run the task-controller //command-name user command tt-root if (argc < 3) { - display_usage(stderr); + display_usage(stdout); return INVALID_ARGUMENT_NUMBER; } @@ -55,20 +56,20 @@ } } while (next_option != -1); if (log_file == NULL) { - LOGFILE = stderr; + LOGFILE = stdout; } else { LOGFILE = fopen(log_file, "a"); if (LOGFILE == NULL) { - fprintf(stderr, "Unable to open LOGFILE : %s \n", log_file); - LOGFILE = stderr; + fprintf(stdout, "Unable to open LOGFILE : %s \n", log_file); + LOGFILE = stdout; } - if (LOGFILE != stderr) { + if (LOGFILE != stdout) { if (chmod(log_file, S_IREAD | S_IEXEC | S_IWRITE | S_IROTH | S_IWOTH | S_IRGRP | S_IWGRP) < 0) { - fprintf(stderr, "Unable to change permission of the log file %s \n", + fprintf(stdout, "Unable to change permission of the log file %s \n", log_file); - fprintf(stderr, "changing log file to stderr"); - LOGFILE = stderr; + fprintf(stdout, "changing log file to stdout"); + LOGFILE = stdout; } } } @@ -88,17 +89,18 @@ } optind = optind + 1; command = atoi(argv[optind++]); -#ifdef DEBUG + fprintf(LOGFILE, "main : command provided %d\n",command); fprintf(LOGFILE, "main : user is %s\n", user_detail->pw_name); -#endif + switch (command) { case LAUNCH_TASK_JVM: tt_root = argv[optind++]; job_id = argv[optind++]; task_id = argv[optind++]; + log_dir = argv[optind++]; exit_code - = run_task_as_user(user_detail->pw_name, job_id, task_id, tt_root); + = run_task_as_user(user_detail->pw_name, job_id, task_id, tt_root, log_dir); break; case TERMINATE_TASK_JVM: task_pid = argv[optind++]; @@ -108,6 +110,16 @@ task_pid = argv[optind++]; exit_code = kill_user_task(user_detail->pw_name, task_pid, SIGKILL); break; + case FINALIZE_TASK_DIRS: + job_id = argv[optind++]; + task_id = argv[optind++]; + log_dir = argv[optind++]; + exit_code = finalize_task_dirs(job_id, task_id, log_dir); + break; + case FINALIZE_JOB: + 
job_id = argv[optind++]; + exit_code = finalize_job(job_id); + break; default: exit_code = INVALID_COMMAND_PROVIDED; } Index: src/c++/task-controller/configuration.c =================================================================== --- src/c++/task-controller/configuration.c (revision 787560) +++ src/c++/task-controller/configuration.c (working copy) @@ -71,9 +71,8 @@ snprintf(file_name, str_len, CONF_FILE_PATTERN, HADOOP_CONF_DIR); #endif -#ifdef DEBUG - fprintf(LOGFILE,"get_configs :Conf file name is : %s \n", file_name); -#endif + fprintf(LOGFILE, "get_configs :Conf file name is : %s \n", file_name); + //allocate space for ten configuration items. config.confdetails = (struct confentry **) malloc(sizeof(struct confentry *) * MAX_SIZE); @@ -87,7 +86,7 @@ while(!feof(conf_file)) { line = (char *) malloc(linesize); if(line == NULL) { - fprintf(LOGFILE,"malloc failed while reading configuration file.\n"); + fprintf(LOGFILE, "malloc failed while reading configuration file.\n"); goto cleanup; } size_read = getline(&line,&linesize,conf_file); @@ -123,9 +122,9 @@ "Failed allocating memory for single configuration item\n"); goto cleanup; } -#ifdef DEBUG - fprintf(LOGFILE,"get_configs : Adding conf key : %s \n", equaltok); -#endif + + fprintf(LOGFILE, "get_configs : Adding conf key : %s \n", equaltok); + memset(config.confdetails[config.size], 0, sizeof(struct confentry)); config.confdetails[config.size]->key = (char *) malloc( sizeof(char) * (strlen(equaltok)+1)); @@ -142,9 +141,9 @@ free(config.confdetails[config.size]); continue; } -#ifdef DEBUG - fprintf(LOGFILE,"get_configs : Adding conf value : %s \n", equaltok); -#endif + + fprintf(LOGFILE, "get_configs : Adding conf value : %s \n", equaltok); + config.confdetails[config.size]->value = (char *) malloc( sizeof(char) * (strlen(equaltok)+1)); strcpy((char *)config.confdetails[config.size]->value, equaltok); @@ -184,8 +183,7 @@ * array, next time onwards used the populated array. 
* */ - -const char * get_value(char* key) { +const char * get_value(const char* key) { int count; if (config.size == 0) { get_configs(); @@ -196,15 +194,19 @@ } for (count = 0; count < config.size; count++) { if (strcmp(config.confdetails[count]->key, key) == 0) { - return config.confdetails[count]->value; + return strdup(config.confdetails[count]->value); } } return NULL; } -const char ** get_values(char * key) { +/** + * Function to return an array of values for a key. + * Value delimiter is assumed to be a comma. + */ +const char ** get_values(const char * key) { const char ** toPass = NULL; - const char * value = get_value(key); + const char *value = get_value(key); char *tempTok = NULL; char *tempstr = NULL; int size = 0; Index: src/c++/task-controller/task-controller.h =================================================================== --- src/c++/task-controller/task-controller.h (revision 787560) +++ src/c++/task-controller/task-controller.h (working copy) @@ -28,14 +28,20 @@ #include #include #include -#include +#include +#include +#include +#include + #include "configuration.h" //command definitions enum command { LAUNCH_TASK_JVM, TERMINATE_TASK_JVM, - KILL_TASK_JVM + KILL_TASK_JVM, + FINALIZE_TASK_DIRS, + FINALIZE_JOB, }; enum errorcodes { @@ -45,21 +51,37 @@ SUPER_USER_NOT_ALLOWED_TO_RUN_TASKS, //4 INVALID_TT_ROOT, //5 SETUID_OPER_FAILED, //6 - INVALID_TASK_SCRIPT_PATH, //7 - UNABLE_TO_EXECUTE_TASK_SCRIPT, //8 - UNABLE_TO_KILL_TASK, //9 - INVALID_PROCESS_LAUNCHING_TASKCONTROLLER, //10 - INVALID_TASK_PID, //11 - ERROR_RESOLVING_FILE_PATH, //12 - RELATIVE_PATH_COMPONENTS_IN_FILE_PATH, //13 - UNABLE_TO_STAT_FILE, //14 - FILE_NOT_OWNED_BY_TASKTRACKER //15 + UNABLE_TO_EXECUTE_TASK_SCRIPT, //7 + UNABLE_TO_KILL_TASK, //8 + INVALID_PROCESS_LAUNCHING_TASKCONTROLLER, //9 + INVALID_TASK_PID, //10 + ERROR_RESOLVING_FILE_PATH, //11 + RELATIVE_PATH_COMPONENTS_IN_FILE_PATH, //12 + UNABLE_TO_STAT_FILE, //13 + FILE_NOT_OWNED_BY_TASKTRACKER, //14 + 
PREPARE_ATTEMPT_DIRECTORIES_FAILED, //15 + PREPARE_JARS_DIRECTORY_FAILED, //16 + PREPARE_TASK_LOGS_FAILED, //17 + INVALID_TT_LOG_DIR, //18 + FINALIZE_TASK_DIRS_FAILED, //19 + FINALIZE_JOB_FAILED, //20 + OUT_OF_MEMORY, //21 }; +#define TT_JOB_DIR_PATTERN "%s/taskTracker/jobcache/%s" -#define TT_LOCAL_TASK_SCRIPT_PATTERN "%s/taskTracker/jobcache/%s/%s/taskjvm.sh" +#define TT_ATTEMPT_DIR_PATTERN TT_JOB_DIR_PATTERN"/%s" +#define JOB_DIR_TO_JARS_PATTERN "%s/jars" + +#define JOB_DIR_TO_ATTEMPT_DIR_PATTERN "%s/%s" + +#define ATTEMPT_LOG_DIR_PATTERN "%s/userlogs/%s" + +#define ATTEMPT_DIR_TO_TASK_SCRIPT_PATTERN "%s/taskjvm.sh" + #define TT_SYS_DIR_KEY "mapred.local.dir" +#define TT_LOG_DIR_KEY "hadoop.log.dir" #define MAX_ITEMS 10 @@ -74,8 +96,25 @@ void display_usage(FILE *stream); -int run_task_as_user(const char * user, const char *jobid, const char *taskid, const char *tt_root); +int run_task_as_user(const char * user, const char *jobid, const char *taskid, + const char *tt_root, const char *log_dir); int kill_user_task(const char *user, const char *task_pid, int sig); +int finalize_task_dirs(const char *jobid, const char *taskid, + const char *log_dir); + +int prepare_attempt_directory(const char *attempt_dir, const char *user); + +int finalize_job(const char *job_id); + +// The following functions are exposed for testing + +int check_variable_against_config(const char *config_key, + const char *passed_value); + int get_user_details(const char *user); + +int check_path(char *path); + +char *get_job_cache_directory(const char * tt_root); Index: build.xml =================================================================== --- build.xml (revision 787560) +++ build.xml (working copy) @@ -1657,5 +1657,16 @@ - + + + + + + + + + +