diff --git build.xml build.xml
index f9e0694..e69d869 100644
--- build.xml
+++ build.xml
@@ -1658,5 +1658,16 @@
-
+
+
+
+
+
+
+
+
+
+
diff --git conf/taskcontroller.cfg conf/taskcontroller.cfg
index fb4cced..eb2e5ee 100644
--- conf/taskcontroller.cfg
+++ conf/taskcontroller.cfg
@@ -1,3 +1,2 @@
-mapred.local.dir=#configured value of hadoop.tmp.dir it can be a list of paths comma seperated
-hadoop.pid.dir=#configured HADOOP_PID_DIR
-hadoop.indent.str=#configured HADOOP_IDENT_STR
+mapred.local.dir=#configured value of mapred.local.dir. It can be a list of comma separated paths.
+hadoop.log.dir=#configured value of hadoop.log.dir.
\ No newline at end of file
diff --git lib/hadoop-core-0.21.0-dev.jar lib/hadoop-core-0.21.0-dev.jar
index 9494544..ad05a05 100644
Binary files lib/hadoop-core-0.21.0-dev.jar and lib/hadoop-core-0.21.0-dev.jar differ
diff --git src/c++/task-controller/Makefile.in src/c++/task-controller/Makefile.in
index ba35f96..83f418f 100644
--- src/c++/task-controller/Makefile.in
+++ src/c++/task-controller/Makefile.in
@@ -16,9 +16,11 @@
# limitations under the License.
#
OBJS=main.o task-controller.o configuration.o
+TESTOBJS=test-task-controller.o task-controller.o configuration.o
CC=@CC@
CFLAGS = @CFLAGS@
BINARY=task-controller
+TESTBINARY=test-task-controller
installdir = @prefix@
@@ -34,9 +36,15 @@ task-controller.o: task-controller.c task-controller.h
configuration.o: configuration.h configuration.c
$(CC) $(CFLAG) -o configuration.o -c configuration.c
+test-task-controller.o: test-task-controller.c task-controller.h
+	$(CC) $(CFLAG) -o test-task-controller.o -c test-task-controller.c
+
+test: $(TESTOBJS)
+ $(CC) $(CFLAG) -o $(TESTBINARY) $(TESTOBJS)
+ cp $(TESTBINARY) $(installdir)
clean:
- rm -rf $(BINARY) $(OBJS)
+	rm -rf $(BINARY) $(OBJS) $(TESTOBJS) $(TESTBINARY)
install: all
cp $(BINARY) $(installdir)
diff --git src/c++/task-controller/configuration.c src/c++/task-controller/configuration.c
index 05b267a..b6abbf5 100644
--- src/c++/task-controller/configuration.c
+++ src/c++/task-controller/configuration.c
@@ -71,9 +71,8 @@ void get_configs() {
snprintf(file_name, str_len, CONF_FILE_PATTERN, HADOOP_CONF_DIR);
#endif
-#ifdef DEBUG
- fprintf(LOGFILE,"get_configs :Conf file name is : %s \n", file_name);
-#endif
+ fprintf(LOGFILE, "get_configs :Conf file name is : %s \n", file_name);
+
//allocate space for ten configuration items.
config.confdetails = (struct confentry **) malloc(sizeof(struct confentry *)
* MAX_SIZE);
@@ -87,7 +86,7 @@ void get_configs() {
while(!feof(conf_file)) {
line = (char *) malloc(linesize);
if(line == NULL) {
- fprintf(LOGFILE,"malloc failed while reading configuration file.\n");
+ fprintf(LOGFILE, "malloc failed while reading configuration file.\n");
goto cleanup;
}
size_read = getline(&line,&linesize,conf_file);
@@ -123,9 +122,9 @@ void get_configs() {
"Failed allocating memory for single configuration item\n");
goto cleanup;
}
-#ifdef DEBUG
- fprintf(LOGFILE,"get_configs : Adding conf key : %s \n", equaltok);
-#endif
+
+ fprintf(LOGFILE, "get_configs : Adding conf key : %s \n", equaltok);
+
memset(config.confdetails[config.size], 0, sizeof(struct confentry));
config.confdetails[config.size]->key = (char *) malloc(
sizeof(char) * (strlen(equaltok)+1));
@@ -142,9 +141,9 @@ void get_configs() {
free(config.confdetails[config.size]);
continue;
}
-#ifdef DEBUG
- fprintf(LOGFILE,"get_configs : Adding conf value : %s \n", equaltok);
-#endif
+
+ fprintf(LOGFILE, "get_configs : Adding conf value : %s \n", equaltok);
+
config.confdetails[config.size]->value = (char *) malloc(
sizeof(char) * (strlen(equaltok)+1));
strcpy((char *)config.confdetails[config.size]->value, equaltok);
@@ -184,8 +183,7 @@ void get_configs() {
* array, next time onwards used the populated array.
*
*/
-
-const char * get_value(char* key) {
+const char * get_value(const char* key) {
int count;
if (config.size == 0) {
get_configs();
@@ -196,15 +194,19 @@ const char * get_value(char* key) {
}
for (count = 0; count < config.size; count++) {
if (strcmp(config.confdetails[count]->key, key) == 0) {
- return config.confdetails[count]->value;
+ return strdup(config.confdetails[count]->value);
}
}
return NULL;
}
-const char ** get_values(char * key) {
+/**
+ * Function to return an array of values for a key.
+ * Value delimiter is assumed to be a comma.
+ */
+const char ** get_values(const char * key) {
const char ** toPass = NULL;
- const char * value = get_value(key);
+ const char *value = get_value(key);
char *tempTok = NULL;
char *tempstr = NULL;
int size = 0;
diff --git src/c++/task-controller/configuration.h.in src/c++/task-controller/configuration.h.in
index af8086b..ebdf4d8 100644
--- src/c++/task-controller/configuration.h.in
+++ src/c++/task-controller/configuration.h.in
@@ -53,10 +53,10 @@ extern struct configuration config;
extern char *hadoop_conf_dir;
#endif
//method exposed to get the configurations
-const char * get_value(char* key);
+const char * get_value(const char* key);
//method to free allocated configuration
void free_configurations();
//function to return array of values pointing to the key. Values are
//comma seperated strings.
-const char ** get_values(char* key);
+const char ** get_values(const char* key);
diff --git src/c++/task-controller/configure.ac src/c++/task-controller/configure.ac
index 2fd52a8..8605ac8 100644
--- src/c++/task-controller/configure.ac
+++ src/c++/task-controller/configure.ac
@@ -38,7 +38,7 @@ AC_PROG_CC
# Checks for header files.
AC_HEADER_STDC
-AC_CHECK_HEADERS([stdlib.h string.h unistd.h])
+AC_CHECK_HEADERS([stdlib.h string.h unistd.h fcntl.h])
#check for HADOOP_CONF_DIR
@@ -50,12 +50,14 @@ fi
# Checks for typedefs, structures, and compiler characteristics.
AC_C_CONST
AC_TYPE_PID_T
+AC_TYPE_MODE_T
+AC_TYPE_SIZE_T
# Checks for library functions.
AC_FUNC_MALLOC
AC_FUNC_REALLOC
-AC_CHECK_FUNCS([strerror])
+AC_FUNC_CHOWN
+AC_CHECK_FUNCS([strerror memset mkdir rmdir strdup])
AC_CONFIG_FILES([Makefile])
AC_OUTPUT
-
diff --git src/c++/task-controller/main.c src/c++/task-controller/main.c
index c19785a..39154a8 100644
--- src/c++/task-controller/main.c
+++ src/c++/task-controller/main.c
@@ -17,6 +17,26 @@
*/
#include "task-controller.h"
+/* Points LOGFILE at log_file (append mode); falls back to stdout on any error. */
+void open_log_file(const char *log_file) {
+  const mode_t log_mode = S_IREAD | S_IEXEC | S_IWRITE | S_IROTH | S_IWOTH
+      | S_IRGRP | S_IWGRP;
+  LOGFILE = stdout;
+  if (log_file == NULL) {
+    return;
+  }
+  FILE *opened = fopen(log_file, "a");
+  if (opened == NULL) {
+    fprintf(stdout, "Unable to open LOGFILE : %s \n", log_file);
+  } else if (chmod(log_file, log_mode) < 0) {
+    fprintf(stdout, "Unable to change permission of the log file %s \n",
+        log_file);
+    fprintf(stdout, "changing log file to stdout\n");
+    fclose(opened); // don't leak the stream that is being abandoned
+  } else {
+    LOGFILE = opened;
+  }
+}
int main(int argc, char **argv) {
int command;
@@ -24,6 +44,7 @@ int main(int argc, char **argv) {
const char * job_id = NULL;
const char * task_id = NULL;
const char * tt_root = NULL;
+ const char *log_dir = NULL;
int exit_code = 0;
const char * task_pid = NULL;
const char* const short_options = "l:";
@@ -35,7 +56,7 @@ int main(int argc, char **argv) {
//Minimum number of arguments required to run the task-controller
//command-name user command tt-root
if (argc < 3) {
- display_usage(stderr);
+ display_usage(stdout);
return INVALID_ARGUMENT_NUMBER;
}
@@ -54,24 +75,9 @@ int main(int argc, char **argv) {
break;
}
} while (next_option != -1);
- if (log_file == NULL) {
- LOGFILE = stderr;
- } else {
- LOGFILE = fopen(log_file, "a");
- if (LOGFILE == NULL) {
- fprintf(stderr, "Unable to open LOGFILE : %s \n", log_file);
- LOGFILE = stderr;
- }
- if (LOGFILE != stderr) {
- if (chmod(log_file, S_IREAD | S_IEXEC | S_IWRITE | S_IROTH | S_IWOTH
- | S_IRGRP | S_IWGRP) < 0) {
- fprintf(stderr, "Unable to change permission of the log file %s \n",
- log_file);
- fprintf(stderr, "changing log file to stderr");
- LOGFILE = stderr;
- }
- }
- }
+
+ open_log_file(log_file);
+
//checks done for user name
//checks done if the user is root or not.
if (argv[optind] == NULL) {
@@ -88,17 +94,28 @@ int main(int argc, char **argv) {
}
optind = optind + 1;
command = atoi(argv[optind++]);
-#ifdef DEBUG
+
fprintf(LOGFILE, "main : command provided %d\n",command);
fprintf(LOGFILE, "main : user is %s\n", user_detail->pw_name);
-#endif
+
switch (command) {
+ case INITIALIZE_JOB:
+ job_id = argv[optind++];
+ exit_code = initialize_job(job_id, user_detail->pw_name);
+ break;
case LAUNCH_TASK_JVM:
tt_root = argv[optind++];
job_id = argv[optind++];
task_id = argv[optind++];
+ log_dir = argv[optind++];
exit_code
- = run_task_as_user(user_detail->pw_name, job_id, task_id, tt_root);
+ = run_task_as_user(user_detail->pw_name, job_id, task_id, tt_root, log_dir);
+ break;
+ case INITIALIZE_TASK:
+ job_id = argv[optind++];
+ task_id = argv[optind++];
+ log_dir = argv[optind++];
+ exit_code = initialize_task(job_id, task_id, log_dir, user_detail->pw_name);
break;
case TERMINATE_TASK_JVM:
task_pid = argv[optind++];
diff --git src/c++/task-controller/task-controller.c src/c++/task-controller/task-controller.c
index 511f0cf..eb7e2ef 100644
--- src/c++/task-controller/task-controller.c
+++ src/c++/task-controller/task-controller.c
@@ -71,104 +71,462 @@ int change_user(const char * user) {
return 0;
}
-// function to check if the passed tt_root is present in hadoop.tmp.dir
-int check_tt_root(const char *tt_root) {
- char ** mapred_local_dir;
- int found = -1;
+/**
+ * Checks the passed value for the variable config_key against the values in
+ * the configuration, as returned by get_values(config_key).
+ * Returns 0 if the passed value is found in the configuration,
+ * -1 otherwise (also when an argument is NULL or get_values() returns NULL).
+ */
+int check_variable_against_config(const char *config_key,
+    const char *passed_value) {
- if (tt_root == NULL) {
+  if (config_key == NULL || passed_value == NULL) {
return -1;
}
- mapred_local_dir = (char **)get_values(TT_SYS_DIR_KEY);
+  int found = -1;
+
+  const char **config_value = get_values(config_key);
- if (mapred_local_dir == NULL) {
+  if (config_value == NULL) {
+    fprintf(LOGFILE, "%s is not configured.\n", config_key);
return -1;
}
- while(*mapred_local_dir != NULL) {
- if(strcmp(*mapred_local_dir,tt_root) == 0) {
+  char **config_val_ptr = (char **) config_value;
+  while (*config_val_ptr != NULL) {
+    if (strcmp(*config_val_ptr, passed_value) == 0) {
found = 0;
break;
}
+    config_val_ptr++;
+  }
+
+  if (found != 0) {
+    // get_value() returns a strdup()'d copy, so release it after logging.
+    char *configured = (char *) get_value(config_key);
+    fprintf(LOGFILE, "Invalid value passed: Configured value of %s is %s. "
+        "Passed value is %s.\n", config_key,
+        configured != NULL ? configured : "(null)", passed_value);
+    free(configured);
}
- free(mapred_local_dir);
+  free((void *) config_value);
return found;
}
/**
+ * Utility function to concatenate argB to argA using the concat_pattern
+ */
+char *concatenate(const char *argA, const char *argB, char *concat_pattern,
+    char *return_path_name) {
+  if (argA == NULL || argB == NULL) {
+    fprintf(LOGFILE, "One of the arguments passed for %s is null.\n",
+        return_path_name);
+    return NULL;
+  }
+
+  // Upper bound: the pattern's own length already over-counts its "%s" markers.
+  size_t str_len = strlen(concat_pattern) + strlen(argA) + strlen(argB);
+  char *return_path = (char *) malloc(sizeof(char) * (str_len + 1));
+  if (return_path == NULL) {
+    fprintf(LOGFILE, "Unable to allocate memory for %s.\n", return_path_name);
+    return NULL;
+  }
+  memset(return_path, '\0', str_len + 1);
+  // Give snprintf the real buffer size (terminator included), not one less.
+  snprintf(return_path, str_len + 1, concat_pattern, argA, argB);
+  return return_path;
+}
+
+/** Builds the job directory path by joining tt_root and jobid with
+ *  TT_JOB_DIR_PATTERN; the result comes straight from concatenate(). */
+char *get_job_directory(const char * tt_root, const char *jobid) {
+  char *job_dir_path = concatenate(tt_root, jobid, TT_JOB_DIR_PATTERN, "job_dir_path");
+  return job_dir_path;
+}
+
+/** Derives a job's jars directory from job_dir via JOB_DIR_TO_JARS_PATTERN.
+ *  The pattern has a single %s, so the empty second argument is a filler. */
+char *get_job_jars_directory(const char *job_dir) {
+  char *jars_path = concatenate(job_dir, "", JOB_DIR_TO_JARS_PATTERN,
+      "job_jars_dir_path");
+  return jars_path;
+}
+
+/** Derives a job's work directory from job_dir via JOB_DIR_TO_WORK_DIR_PATTERN.
+ *  The pattern has a single %s, so the empty second argument is a filler. */
+char *get_work_directory(const char *job_dir) {
+  char *work_path = concatenate(job_dir, "", JOB_DIR_TO_WORK_DIR_PATTERN,
+      "job_work_dir_path");
+  return work_path;
+}
+
+/** Joins job_dir and attempt_id with JOB_DIR_TO_ATTEMPT_DIR_PATTERN to name
+ *  the attempt directory; the result comes straight from concatenate(). */
+char *get_attempt_directory(const char *job_dir, const char *attempt_id) {
+  char *attempt_path = concatenate(job_dir, attempt_id,
+      JOB_DIR_TO_ATTEMPT_DIR_PATTERN, "attempt_dir_path");
+  return attempt_path;
+}
+
+/** Names the task launcher script (created by the TT) by joining job_dir and
+ *  attempt_dir with TASK_SCRIPT_PATTERN, i.e. <job_dir>/<attempt_dir>/taskjvm.sh. */
+char *get_task_launcher_file(const char *job_dir, const char *attempt_dir) {
+  char *script_path = concatenate(job_dir, attempt_dir, TASK_SCRIPT_PATTERN,
+      "task_script_path");
+  return script_path;
+}
+
+/** Names the log directory of an attempt by joining log_dir and attempt_id
+ *  with ATTEMPT_LOG_DIR_PATTERN, i.e. <log_dir>/userlogs/<attempt_id>. */
+char *get_task_log_dir(const char *log_dir, const char *attempt_id) {
+  char *attempt_log_path = concatenate(log_dir, attempt_id,
+      ATTEMPT_LOG_DIR_PATTERN, "task_log_dir");
+  return attempt_log_path;
+}
+
+/** Validates tt_root against the values configured under TT_SYS_DIR_KEY
+ *  (mapred.local.dir) by delegating to check_variable_against_config(). */
+int check_tt_root(const char *tt_root) {
+  const char *config_key = TT_SYS_DIR_KEY;
+  int found = check_variable_against_config(config_key, tt_root);
+  return found;
+}
+
+/** Validates log_dir against the values configured under TT_LOG_DIR_KEY
+ *  (hadoop.log.dir) by delegating to check_variable_against_config(). */
+int check_task_logs_dir(const char *log_dir) {
+  const char *config_key = TT_LOG_DIR_KEY;
+  int found = check_variable_against_config(config_key, log_dir);
+  return found;
+}
+
+/**
* Function to check if the constructed path and absolute
* path resolve to one and same.
*/
-
int check_path(char *path) {
+ fprintf(LOGFILE, "Passed path : %s\n", path);
char * resolved_path = (char *) canonicalize_file_name(path);
- if(resolved_path == NULL) {
+ if (resolved_path == NULL) {
+ switch (errno) {
+ case ENAMETOOLONG:
+ fprintf(LOGFILE, "The resulting path is too long.\n");
+ break;
+ case EACCES:
+ fprintf(LOGFILE, "The path is not readable.\n");
+ break;
+ case ENOENT:
+ fprintf(LOGFILE, "The input file name is empty or %s",
+ "the path components does not exist.\n");
+ break;
+ case ELOOP:
+ fprintf(LOGFILE, "More than `MAXSYMLINKS' many symlinks have been followed.\n");
+ break;
+ }
return ERROR_RESOLVING_FILE_PATH;
}
- if(strcmp(resolved_path, path) !=0) {
+ fprintf(LOGFILE, "Resolved path : %s\n", resolved_path);
+ if (strcmp(resolved_path, path) != 0) {
free(resolved_path);
return RELATIVE_PATH_COMPONENTS_IN_FILE_PATH;
}
free(resolved_path);
return 0;
}
+
/**
- * Function to check if a user actually owns the file.
+ * Function to change the owner/group of a given path.
*/
-int check_owner(uid_t uid, char *path) {
- struct stat filestat;
- if(stat(path, &filestat)!=0) {
- return UNABLE_TO_STAT_FILE;
+int change_owner(const char *path, uid_t uid, gid_t gid) {
+ int exit_code = chown(path, uid, gid);
+ if (exit_code != 0) {
+ switch (errno) {
+ case (EPERM):
+ fprintf(LOGFILE,
+ "Process lacks permission to make the requested change.\n");
+ break;
+ case (EROFS):
+ fprintf(LOGFILE, "File is on a read-only file system.\n");
+ break;
+ case (EACCES):
+ fprintf(
+ LOGFILE,
+ "Process does not have search permission for a directory \
+ component of the file name.\n");
+ break;
+ case (ENAMETOOLONG):
+ fprintf(LOGFILE, "The file name is too long.\n");
+ break;
+ case (ENOENT):
+ fprintf(LOGFILE,
+ "A directory component in the file name doesn't exist.\n");
+ break;
+ case (ENOTDIR):
+ fprintf(
+ LOGFILE,
+ "A directory component in the file name exists, \
+ but it isn't a directory.\n");
+ break;
+ case (ELOOP):
+ fprintf(
+ LOGFILE,
+ "Too many symbolic links were resolved \
+ while trying to look up the file name.\n");
+ break;
+ default:
+ fprintf(LOGFILE, "chown failed with error number : %d.\n", errno);
+ }
}
- //check owner.
- if(uid != filestat.st_uid){
- return FILE_NOT_OWNED_BY_TASKTRACKER;
+ return exit_code;
+}
+
+/**
+ * Function to change the mode of a given path.
+ * Thin wrapper around chmod(): on failure a human-readable reason chosen
+ * from errno is written to LOGFILE.
+ * Returns whatever chmod() returned: 0 on success, non-zero on failure.
+ */
+int change_mode(const char *path, mode_t mode) {
+  int exit_code = chmod(path, mode);
+  if (exit_code != 0) {
+    switch (errno) {
+      case (ENOENT):
+        fprintf(LOGFILE, "The named file doesn't exist.\n");
+        break;
+      case (EPERM):
+        fprintf(LOGFILE,
+            "This process does not have permission to change the access "
+            "permissions of this file.\n");
+        break;
+      case (EROFS):
+        fprintf(LOGFILE, "The file resides on a read-only file system.\n");
+        break;
+      case (EACCES):
+        fprintf(LOGFILE,
+            "Process does not have search permission for a directory "
+            "component of the file name.\n");
+        break;
+      case (ENAMETOOLONG):
+        fprintf(LOGFILE, "The file name is too long.\n");
+        break;
+      case (ENOTDIR):
+        fprintf(LOGFILE,
+            "A directory component in the file name exists, "
+            "but it isn't a directory.\n");
+        break;
+      case (ELOOP):
+        fprintf(LOGFILE,
+            "Too many symbolic links were resolved "
+            "while trying to look up the file name.\n");
+        break;
+      default:
+        // Any other errno: report the raw number.
+        fprintf(LOGFILE, "chmod failed with error number : %d.\n", errno);
+    }
+  }
+  return exit_code;
+}
+
+/**
+ * Function to prepare the attempt directories for the task JVM.
+ * For every configured mapred.local.dir in which the directory
+ * taskTracker/jobcache/$jobid/$attemptid exists, secure_path() recursively
+ * gives it to the user and the calling process's group (dirs 2770, files 0770).
+ * Returns 0 on success, otherwise one of the errorcodes values.
+ */
+int prepare_attempt_directories(const char *job_id, const char *attempt_id,
+    const char *user) {
+  if (job_id == NULL || attempt_id == NULL || user == NULL) {
+    fprintf(LOGFILE, "Either attempt_id is null or the user passed is null.\n");
+    return INVALID_ARGUMENT_NUMBER;
+  }
+
+  if (get_user_details(user) < 0) {
+    fprintf(LOGFILE, "Couldn't get the user details of %s.\n", user);
+    return INVALID_USER_NAME;
+  }
+
+  gid_t tasktracker_gid = getgid();
+
+  char **local_dir = (char **) get_values(TT_SYS_DIR_KEY);
+  if (local_dir == NULL) {
+    fprintf(LOGFILE, "%s is not configured.\n", TT_SYS_DIR_KEY);
+    cleanup();
+    return PREPARE_ATTEMPT_DIRECTORIES_FAILED;
+  }
+
+  char *full_local_dir_str = (char *) get_value(TT_SYS_DIR_KEY);
+  fprintf(LOGFILE, "Value from config for %s is %s.\n", TT_SYS_DIR_KEY,
+      full_local_dir_str);
+
+  int failed = 0;
+  for (char **dir_ptr = local_dir; *dir_ptr != NULL; dir_ptr++) {
+    // prepare attempt-dir in each of the mapred_local_dir
+    char *job_dir = get_job_directory(*dir_ptr, job_id);
+    char *attempt_dir = get_attempt_directory(job_dir, attempt_id);
+    DIR *probe = (attempt_dir != NULL) ? opendir(attempt_dir) : NULL;
+    if (attempt_dir == NULL) {
+      // The path could not be built; concatenate() has logged the reason.
+      failed = 1;
+    } else if (probe == NULL) {
+      fprintf(LOGFILE, "attempt_dir %s doesn't exist. Not doing anything.\n",
+          attempt_dir);
+    } else {
+      // The handle only proved that the directory exists; release it.
+      closedir(probe);
+      if (secure_path(attempt_dir, user_detail->pw_uid, tasktracker_gid) != 0) {
+        fprintf(LOGFILE, "Failed to secure the attempt_dir %s\n", attempt_dir);
+        failed = 1;
+      }
+    }
+    free(attempt_dir);
+    free(job_dir);
+  }
+  free(local_dir);
+  free(full_local_dir_str);
+
+  cleanup();
+  if (failed) {
+    return PREPARE_ATTEMPT_DIRECTORIES_FAILED;
+  }
return 0;
}
-/*
- * function to provide path to the task file which is created by the tt
- *
- *Check TT_LOCAL_TASK_SCRIPT_PATTERN for pattern
+/**
+ * Function to prepare the task logs for the child: secure_path() gives
+ * log-dir/userlogs/$attemptid to the user (dirs 2770, files 0770).
+ */
-void get_task_file_path(const char * jobid, const char * taskid,
- const char * tt_root, char **task_script_path) {
- const char ** mapred_local_dir = get_values(TT_SYS_DIR_KEY);
- *task_script_path = NULL;
- int str_len = strlen(TT_LOCAL_TASK_SCRIPT_PATTERN) + strlen(jobid) + (strlen(
- taskid)) + strlen(tt_root);
-
- if (mapred_local_dir == NULL) {
- return;
- }
-
- *task_script_path = (char *) malloc(sizeof(char) * (str_len + 1));
- if (*task_script_path == NULL) {
- fprintf(LOGFILE, "Unable to allocate memory for task_script_path \n");
- free(mapred_local_dir);
- return;
- }
-
- memset(*task_script_path,'\0',str_len+1);
- snprintf(*task_script_path, str_len, TT_LOCAL_TASK_SCRIPT_PATTERN, tt_root,
- jobid, taskid);
-#ifdef DEBUG
- fprintf(LOGFILE, "get_task_file_path : task script path = %s\n", *task_script_path);
- fflush(LOGFILE);
-#endif
- free(mapred_local_dir);
+int prepare_task_logs(const char *log_dir, const char *task_id) {
+  char *task_log_dir = get_task_log_dir(log_dir, task_id);
+  if (task_log_dir == NULL) {
+    return -1; // get_task_log_dir() has already logged the reason
+  }
+  int result = 0;
+  DIR *probe = opendir(task_log_dir);
+  if (probe == NULL) {
+    // See TaskRunner.java to see that an absent log-dir doesn't fail the task.
+    fprintf(LOGFILE, "task_log_dir %s doesn't exist. Not doing anything.\n",
+        task_log_dir);
+  } else {
+    closedir(probe); // the handle only proved that the directory exists
+    if (secure_path(task_log_dir, user_detail->pw_uid, getgid()) != 0) {
+      fprintf(LOGFILE, "Failed to secure the log_dir %s\n", task_log_dir);
+      result = -1;
+    }
+  }
+  free(task_log_dir);
+  return result;
+}
+
+/**
+ * Function to secure the given path. It does the following recursively:
+ *    1) changes the owner/group of the paths to the passed owner/group
+ *    2) changes the file permission to the secure file_mode
+ *
+ * Directories get mode 2770 (setgid plus rwx for owner and group) and are
+ * handled in post-order, i.e. after everything beneath them. Regular files
+ * get mode 0770. Symbolic links are re-owned with lchown() and never
+ * chmod'ed, so nothing is changed through a link.
+ *
+ * Returns 0 on success, -1 if the tree cannot be opened or an entry cannot
+ * be read or stat'ed, and -3 if changing an owner or a mode fails. The walk
+ * stops at the first error.
+ */
+int secure_path(const char *path, uid_t uid, gid_t gid) {
+  // fts_open() expects a NULL-terminated array of root paths.
+  char *paths[] = { (char *) path, NULL };
+  const mode_t file_mode = S_IRWXU | S_IRWXG; // No setgid on files
+  const mode_t dir_mode = S_ISGID | S_IRWXU | S_IRWXG;
+  FTSENT *entry = NULL; // a file in the hierarchy
+  int error_code = 0;
+
+  if (path == NULL) {
+    fprintf(LOGFILE, "No path was passed to secure_path.\n");
+    return -1;
+  }
+
+  // Get physical locations and don't resolve the symlinks.
+  // Don't change directory while walking the directory.
+  FTS *tree = fts_open(paths, FTS_PHYSICAL | FTS_NOCHDIR, NULL);
+  if (tree == NULL) {
+    fprintf(LOGFILE, "Couldn't open the file hierarchy rooted at %s\n", path);
+    return -1;
+  }
+
+  // error_code is tested first so that nothing more is read after a failure.
+  while (error_code == 0 && (entry = fts_read(tree)) != NULL) {
+    int is_dir = 0;
+    int is_symlink = 0;
+
+    switch (entry->fts_info) {
+      case FTS_F:
+        // A regular file
+        break;
+      case FTS_DP:
+        // A directory being visited in post-order
+        is_dir = 1;
+        break;
+      case FTS_SL:
+        // A symbolic link
+        /* fallthrough */
+      case FTS_SLNONE:
+        // A symbolic link with a nonexistent target
+        is_symlink = 1;
+        break;
+      case FTS_DNR:
+        // A directory which cannot be read
+        /* fallthrough */
+      case FTS_NS:
+        // A file for which no stat(2) information was available
+        /* fallthrough */
+      case FTS_ERR:
+        // Any other error reported for this entry
+        fprintf(LOGFILE, "couldn't read %s while securing the path\n",
+            entry->fts_path);
+        error_code = -1;
+        continue;
+      default:
+        // FTS_D (pre-order visit; directories are handled in post-order),
+        // FTS_DC (cycle), FTS_DOT ("." or "..") and FTS_DEFAULT (any other
+        // file type) are deliberately left alone.
+        continue;
+    }
+
+    if (is_symlink) {
+      // chown()/chmod() follow links and would change the link's target,
+      // which may live outside this tree. Re-own the link itself instead
+      // and leave its mode alone.
+      if (lchown(entry->fts_path, uid, gid) != 0) {
+        fprintf(LOGFILE, "couldn't change the ownership of %s\n",
+            entry->fts_path);
+        error_code = -3;
+      }
+      continue;
+    }
+
+    if (change_owner(entry->fts_path, uid, gid) != 0) {
+      fprintf(LOGFILE, "couldn't change the ownership of %s\n",
+          entry->fts_path);
+      error_code = -3;
+    } else if (change_mode(entry->fts_path, is_dir ? dir_mode : file_mode)
+        != 0) {
+      fprintf(LOGFILE, "couldn't change the permissions of %s\n",
+          entry->fts_path);
+      error_code = -3;
+    }
+  }
+
+  fts_close(tree);
+  return error_code;
+}
-//end of private functions
void display_usage(FILE *stream) {
fprintf(stream,
"Usage: task-controller [-l logfile] user command command-args\n");
}
//function used to populate and user_details structure.
-
int get_user_details(const char *user) {
if (user_detail == NULL) {
user_detail = getpwnam(user);
@@ -180,31 +538,126 @@ int get_user_details(const char *user) {
return 0;
}
+
+/**
+ * Function to prepare the job directories for the task JVM.
+ * Returns 0 on success. We do the following:
+ *     *  sudo chown user:mapred -R taskTracker/jobcache/$jobid
+ *     *  sudo chmod 2770 -R taskTracker/jobcache/$jobid
+ *     *  sudo chmod 0570 taskTracker/jobcache/$jobid/
+ */
+int initialize_job(const char *jobid, const char *user) {
+  if (jobid == NULL || user == NULL) {
+    fprintf(LOGFILE, "Either jobid is null or the user passed is null.\n");
+    return INVALID_ARGUMENT_NUMBER;
+  }
+
+  if (get_user_details(user) < 0) {
+    fprintf(LOGFILE, "Couldn't get the user details of %s.\n", user);
+    return INVALID_USER_NAME;
+  }
+
+  gid_t tasktracker_gid = getgid(); // TaskTracker's group-id
+  char **local_dir = (char **) get_values(TT_SYS_DIR_KEY);
+  if (local_dir == NULL) {
+    fprintf(LOGFILE, "%s is not configured.\n", TT_SYS_DIR_KEY);
+    cleanup();
+    return INVALID_TT_ROOT;
+  }
+
+  char *full_local_dir_str = (char *) get_value(TT_SYS_DIR_KEY);
+  fprintf(LOGFILE, "Value from config for %s is %s.\n", TT_SYS_DIR_KEY,
+      full_local_dir_str);
+
+  int failed = 0;
+  for (char **dir_ptr = local_dir; *dir_ptr != NULL; dir_ptr++) {
+    char *job_dir = get_job_directory(*dir_ptr, jobid);
+    DIR *probe = (job_dir != NULL) ? opendir(job_dir) : NULL;
+    if (job_dir == NULL) {
+      failed = 1; // the path could not be built; concatenate() logged why
+    } else if (probe == NULL) {
+      fprintf(LOGFILE, "job_dir %s doesn't exist. Not doing anything.\n",
+          job_dir);
+    } else {
+      closedir(probe); // the handle only proved that the directory exists
+      if (secure_path(job_dir, user_detail->pw_uid, tasktracker_gid) != 0) {
+        fprintf(LOGFILE, "Failed to secure the job_dir %s\n", job_dir);
+        failed = 1;
+      } else if (change_mode(job_dir, S_IREAD | S_IEXEC | S_IRWXG) != 0) {
+        fprintf(LOGFILE, "couldn't change the permissions of %s\n", job_dir);
+        failed = 1;
+      }
+    }
+    free(job_dir);
+  }
+  free(local_dir);
+  free(full_local_dir_str);
+  cleanup();
+  if (failed) {
+    return INITIALIZE_JOB_FAILED;
+  }
+  return 0;
+}
+
+/**
+ * Function used to initialize task. Prepares the attempt directories and
+ * the attempt's log directory to be accessible by the child.
+ * TODO: fix exit codes
+ */
+int initialize_task(const char *jobid, const char *taskid,
+    const char *log_dir, const char *user) {
+  int exit_code = 0;
+  // A NULL argument must not reach the %s conversions below.
+  if (jobid == NULL || taskid == NULL || log_dir == NULL || user == NULL) {
+    return INVALID_ARGUMENT_NUMBER;
+  }
+  fprintf(LOGFILE, "job-id passed to initialize_task : %s.\n", jobid);
+  fprintf(LOGFILE, "task-id passed to initialize_task : %s.\n", taskid);
+  fprintf(LOGFILE, "log_dir passed to initialize_task : %s.\n", log_dir);
+  if (prepare_attempt_directories(jobid, taskid, user) != 0) {
+    fprintf(LOGFILE,
+        "Couldn't prepare the attempt directories for %s of user %s.\n",
+        taskid, user);
+    exit_code = PREPARE_ATTEMPT_DIRECTORIES_FAILED;
+    goto cleanup;
+  }
+  if (check_task_logs_dir(log_dir) < 0) {
+    fprintf(LOGFILE, "Problem with the log directory passed or configured.\n");
+    exit_code = INVALID_TT_LOG_DIR;
+    goto cleanup;
+  }
+  if (prepare_task_logs(log_dir, taskid) != 0) {
+    fprintf(LOGFILE, "Couldn't prepare task logs directory %s for %s.\n",
+        log_dir, taskid);
+    exit_code = PREPARE_TASK_LOGS_FAILED;
+  }
+
+cleanup:
+  cleanup(); // free configurations
+  return exit_code;
+}
+
/*
- *Function used to launch a task as the provided user.
- * First the function checks if the tt_root passed is found in
- * hadoop.temp.dir
- * Uses get_task_file_path to fetch the task script file path.
- * Does an execlp on the same in order to replace the current image with
+ * Function used to launch a task as the provided user. It does the following :
+ * 1) Checks if the tt_root passed is found in mapred.local.dir
+ * 2) Prepares attempt_dir, jars_dir and log_dir to be accessible by the child
+ * 3) Uses get_task_launcher_file to fetch the task script file path
+ * 4) Does an execlp on the same in order to replace the current image with
* task image.
*/
-
int run_task_as_user(const char * user, const char *jobid, const char *taskid,
- const char *tt_root) {
- char *task_script_path = NULL;
+ const char *tt_root, const char *log_dir) {
int exit_code = 0;
- uid_t uid = getuid();
- if(jobid == NULL || taskid == NULL) {
+ if (jobid == NULL || taskid == NULL || tt_root == NULL || log_dir == NULL) {
return INVALID_ARGUMENT_NUMBER;
}
-#ifdef DEBUG
- fprintf(LOGFILE,"run_task_as_user : Job id : %s \n", jobid);
- fprintf(LOGFILE,"run_task_as_user : task id : %s \n", taskid);
- fprintf(LOGFILE,"run_task_as_user : tt_root : %s \n", tt_root);
- fflush(LOGFILE);
-#endif
+ fprintf(LOGFILE, "Job-id passed to run_task_as_user : %s.\n", jobid);
+ fprintf(LOGFILE, "task-d passed to run_task_as_user : %s.\n", taskid);
+ fprintf(LOGFILE, "tt_root passed to run_task_as_user : %s.\n", tt_root);
+ fprintf(LOGFILE, "log_dir passed to run_task_as_user : %s.\n", log_dir);
+
//Check tt_root before switching the user, as reading configuration
//file requires privileged access.
if (check_tt_root(tt_root) < 0) {
@@ -213,41 +666,94 @@ int run_task_as_user(const char * user, const char *jobid, const char *taskid,
return INVALID_TT_ROOT;
}
- //change the user
- fclose(LOGFILE);
- fcloseall();
- umask(0);
- if (change_user(user) != 0) {
- cleanup();
- return SETUID_OPER_FAILED;
+ char *job_dir = NULL, *jars_dir = NULL, *task_script_path = NULL;
+
+ if (prepare_attempt_directories(jobid, taskid, user) != 0) {
+ fprintf(LOGFILE,
+ "Couldn't prepare the attempt directories for %s of user %s.\n",
+ taskid, user);
+ exit_code = PREPARE_ATTEMPT_DIRECTORIES_FAILED;
+ goto cleanup;
}
- get_task_file_path(jobid, taskid, tt_root, &task_script_path);
+ if (check_task_logs_dir(log_dir) < 0) {
+ fprintf(LOGFILE, "Problem with the log directory passed or configured.\n");
+ exit_code = INVALID_TT_LOG_DIR;
+ goto cleanup;
+ }
+
+ if (prepare_task_logs(log_dir, taskid) != 0) {
+ fprintf(LOGFILE, "Couldn't prepare task logs directory %s for %s.\n", log_dir,
+ taskid);
+ exit_code = PREPARE_TASK_LOGS_FAILED;
+ goto cleanup;
+ }
+
+ job_dir = get_job_directory(tt_root, jobid);
+ if (job_dir == NULL) {
+ fprintf(LOGFILE, "Couldn't obtain job_dir for %s in %s.\n", jobid, tt_root);
+ exit_code = OUT_OF_MEMORY;
+ goto cleanup;
+ }
+
+ task_script_path = get_task_launcher_file(job_dir, taskid);
if (task_script_path == NULL) {
- cleanup();
- return INVALID_TASK_SCRIPT_PATH;
+ fprintf(LOGFILE, "Couldn't obtain task_script_path in %s.\n", job_dir);
+ exit_code = OUT_OF_MEMORY;
+ goto cleanup;
}
+
errno = 0;
exit_code = check_path(task_script_path);
if(exit_code != 0) {
goto cleanup;
}
- errno = 0;
- exit_code = check_owner(uid, task_script_path);
- if(exit_code != 0) {
+
+ //change the user
+ fcloseall();
+ umask(0007);
+ if (change_user(user) != 0) {
+ exit_code = SETUID_OPER_FAILED;
goto cleanup;
}
+
errno = 0;
cleanup();
execlp(task_script_path, task_script_path, NULL);
if (errno != 0) {
- free(task_script_path);
- exit_code = UNABLE_TO_EXECUTE_TASK_SCRIPT;
- }
+ switch (errno) {
+ case E2BIG:
+ fprintf(LOGFILE,
+ "The total number of bytes in the environment (envp)\
+ and argument list (argv) is too large.");
+ break;
+ case EACCES:
+ fprintf(LOGFILE,
+ "Search permission is denied on a component of the path prefix\
+ of filename or the name of a script interpreter. or");
+ fprintf(LOGFILE,
+ "Execute permission is denied for the file or a script or \
+ ELF interpreter.");
+ break;
+ case ENOENT:
+ fprintf(LOGFILE,
+ "The file filename or a script or ELF interpreter does not exist,\
+ or a shared library needed for file or interpreter cannot be found.");
+ break;
+ }
+ free(task_script_path);
+ exit_code = UNABLE_TO_EXECUTE_TASK_SCRIPT;
+ }
return exit_code;
cleanup:
+ if (job_dir != NULL) {
+ free(job_dir);
+ }
+ if (jars_dir != NULL) {
+ free(jars_dir);
+ }
if (task_script_path != NULL) {
free(task_script_path);
}
@@ -261,19 +767,23 @@ cleanup:
* The function sends appropriate signal to the process group
* specified by the task_pid.
*/
-
int kill_user_task(const char *user, const char *task_pid, int sig) {
int pid = 0;
if(task_pid == NULL) {
return INVALID_ARGUMENT_NUMBER;
}
+
+ fprintf(LOGFILE, "user passed to kill_user_task : %s.\n", user);
+ fprintf(LOGFILE, "task-pid passed to kill_user_task : %s.\n", task_pid);
+ fprintf(LOGFILE, "signal passed to kill_user_task : %d.\n", sig);
+
pid = atoi(task_pid);
if(pid <= 0) {
return INVALID_TASK_PID;
}
- fclose(LOGFILE);
+
fcloseall();
if (change_user(user) != 0) {
cleanup();
@@ -283,6 +793,7 @@ int kill_user_task(const char *user, const char *task_pid, int sig) {
//Don't continue if the process-group is not alive anymore.
if(kill(-pid,0) < 0) {
errno = 0;
+ cleanup();
return 0;
}
@@ -297,4 +808,3 @@ int kill_user_task(const char *user, const char *task_pid, int sig) {
cleanup();
return 0;
}
-
diff --git src/c++/task-controller/task-controller.h src/c++/task-controller/task-controller.h
index 8e545b5..f160688 100644
--- src/c++/task-controller/task-controller.h
+++ src/c++/task-controller/task-controller.h
@@ -28,14 +28,20 @@
#include
#include
#include
-#include
+#include
+#include
+#include
+#include
+
#include "configuration.h"
//command definitions
enum command {
+ INITIALIZE_JOB,
LAUNCH_TASK_JVM,
+ INITIALIZE_TASK,
TERMINATE_TASK_JVM,
- KILL_TASK_JVM
+ KILL_TASK_JVM,
};
enum errorcodes {
@@ -45,22 +51,43 @@ enum errorcodes {
SUPER_USER_NOT_ALLOWED_TO_RUN_TASKS, //4
INVALID_TT_ROOT, //5
SETUID_OPER_FAILED, //6
- INVALID_TASK_SCRIPT_PATH, //7
- UNABLE_TO_EXECUTE_TASK_SCRIPT, //8
- UNABLE_TO_KILL_TASK, //9
- INVALID_PROCESS_LAUNCHING_TASKCONTROLLER, //10
- INVALID_TASK_PID, //11
- ERROR_RESOLVING_FILE_PATH, //12
- RELATIVE_PATH_COMPONENTS_IN_FILE_PATH, //13
- UNABLE_TO_STAT_FILE, //14
- FILE_NOT_OWNED_BY_TASKTRACKER //15
+ UNABLE_TO_EXECUTE_TASK_SCRIPT, //7
+ UNABLE_TO_KILL_TASK, //8
+ INVALID_PROCESS_LAUNCHING_TASKCONTROLLER, //9
+ INVALID_TASK_PID, //10
+ ERROR_RESOLVING_FILE_PATH, //11
+ RELATIVE_PATH_COMPONENTS_IN_FILE_PATH, //12
+ UNABLE_TO_STAT_FILE, //13
+ FILE_NOT_OWNED_BY_TASKTRACKER, //14
+ PREPARE_ATTEMPT_DIRECTORIES_FAILED, //15
+ INITIALIZE_JOB_FAILED, //16
+ PREPARE_TASK_LOGS_FAILED, //17
+ INVALID_TT_LOG_DIR, //18
+ FINALIZE_TASK_DIRS_FAILED, //19
+ FINALIZE_JOB_FAILED, //20
+ OUT_OF_MEMORY, //21
};
+#define TT_JOB_DIR_PATTERN "%s/taskTracker/jobcache/%s"
+
+#define TT_ATTEMPT_DIR_PATTERN TT_JOB_DIR_PATTERN"/%s"
+
+#define JOB_DIR_TO_JARS_PATTERN "%s/jars"
+
+#define JOB_DIR_TO_WORK_DIR_PATTERN "%s/work"
-#define TT_LOCAL_TASK_SCRIPT_PATTERN "%s/taskTracker/jobcache/%s/%s/taskjvm.sh"
+#define JOB_DIR_TO_ATTEMPT_DIR_PATTERN "%s/%s"
+
+#define JOB_DIR_TO_TASK_WORK_DIR_PATTERN "%s/%s/work"
+
+#define ATTEMPT_LOG_DIR_PATTERN "%s/userlogs/%s"
+
+#define TASK_SCRIPT_PATTERN "%s/%s/taskjvm.sh"
#define TT_SYS_DIR_KEY "mapred.local.dir"
+#define TT_LOG_DIR_KEY "hadoop.log.dir"
+
#define MAX_ITEMS 10
#ifndef HADOOP_CONF_DIR
@@ -74,8 +101,25 @@ extern FILE *LOGFILE;
void display_usage(FILE *stream);
-int run_task_as_user(const char * user, const char *jobid, const char *taskid, const char *tt_root);
+int run_task_as_user(const char * user, const char *jobid, const char *taskid,
+ const char *tt_root, const char *log_dir);
+
+int initialize_task(const char *jobid, const char *taskid,
+ const char *log_dir, const char *user);
+
+int initialize_job(const char *jobid, const char *user);
int kill_user_task(const char *user, const char *task_pid, int sig);
+int prepare_attempt_directory(const char *attempt_dir, const char *user);
+
+// The following functions are exposed for testing
+
+int check_variable_against_config(const char *config_key,
+ const char *passed_value);
+
int get_user_details(const char *user);
+
+int check_path(char *path);
+
+char *get_job_cache_directory(const char * tt_root);
diff --git src/c++/task-controller/test-task-controller.c src/c++/task-controller/test-task-controller.c
new file mode 100644
index 0000000..9660f1a
--- /dev/null
+++ src/c++/task-controller/test-task-controller.c
@@ -0,0 +1,181 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#include "task-controller.h"
+
+#define HADOOP_CONF_DIR "/tmp"
+
+int write_config_file(char *file_name) {
+  // Writes one mapred.local.dir line to file_name; 0 on success, else EXIT_FAILURE.
+  char const *str =
+      "mapred.local.dir=/tmp/testing1,/tmp/testing2,/tmp/testing3,/tmp/testing4\n";
+  FILE *file = fopen(file_name, "w");
+  if (file == NULL) {
+    printf("Failed to open %s.\n", file_name);
+    return EXIT_FAILURE;
+  }
+  // A short write or a failed flush-on-close would leave a truncated config behind.
+  int short_write = fwrite(str, 1, strlen(str), file) != strlen(str);
+  int close_failed = fclose(file) != 0;
+  return (short_write || close_failed) ? EXIT_FAILURE : 0;
+}
+
+void test_check_variable_against_config() {
+  // NULL-initialised before the first "goto cleanup" so cleanup never reads garbage.
+  char *conf_dir = NULL, *value = NULL, **values = NULL;
+  // A temporary configuration directory
+  char *conf_dir_templ = "/tmp/test-task-controller-conf-dir-XXXXXX";
+  // To accommodate "/conf/taskcontroller.cfg" and the terminating NUL
+  char template[strlen(conf_dir_templ) + strlen("/conf/taskcontroller.cfg") + 1];
+  strcpy(template, conf_dir_templ);
+  char *temp_dir = mkdtemp(template);
+  if (temp_dir == NULL) {
+    printf("Couldn't create a temporary dir for conf.\n");
+    goto cleanup;
+  }
+
+  // Set the configuration directory
+  hadoop_conf_dir = strdup(temp_dir);
+
+  // create the configuration directory
+  strcat(template, "/conf");
+  conf_dir = strdup(template);
+  mkdir(conf_dir, S_IRWXU);
+
+  // create the configuration file
+  strcat(template, "/taskcontroller.cfg");
+  if (write_config_file(template) != 0) {
+    printf("Couldn't write the configuration file.\n");
+    goto cleanup;
+  }
+
+  // Test obtaining a value for a key from the config
+  char *config_values[4] = { "/tmp/testing1", "/tmp/testing2",
+      "/tmp/testing3", "/tmp/testing4" };
+  value = (char *) get_value("mapred.local.dir");
+  if (value == NULL || strcmp(value,
+      "/tmp/testing1,/tmp/testing2,/tmp/testing3,/tmp/testing4") != 0) {
+    printf("Obtaining a value for a key from the config failed.\n");
+    goto cleanup;
+  }
+
+  // Test the parsing of a multiple valued key from the config
+  values = (char **)get_values("mapred.local.dir");
+  char **values_ptr = values;
+  int i = 0;
+  while (values_ptr != NULL && *values_ptr != NULL) {
+    printf(" value : %s\n", *values_ptr);
+    // More than four parsed values is a failure too, not an out-of-bounds read.
+    if (i >= 4 || strcmp(*values_ptr, config_values[i++]) != 0) {
+      printf("Configured values are not read out properly. Test failed!");
+      goto cleanup;
+    }
+    values_ptr++;
+  }
+
+  if (check_variable_against_config("mapred.local.dir", "/tmp/testing5") == 0) {
+    printf("Configuration should not contain /tmp/testing5! \n");
+    goto cleanup;
+  }
+  if (check_variable_against_config("mapred.local.dir", "/tmp/testing4") != 0) {
+    printf("Configuration should contain /tmp/testing4! \n");
+    goto cleanup;
+  }
+cleanup:
+  // Remove the file and directories first; their path strings are freed afterwards.
+  unlink(template);
+  if (conf_dir != NULL) {
+    rmdir(conf_dir);
+  }
+  if (hadoop_conf_dir != NULL) {
+    rmdir(hadoop_conf_dir);
+  }
+  free(conf_dir);
+  free(value);
+  free(values);
+  free(hadoop_conf_dir);
+  hadoop_conf_dir = NULL;
+}
+
+void test_get_job_directory() {
+  // The job directory is expected at "<tt_root>/taskTracker/jobcache/<jobid>".
+  const char *expected = "/tmp/taskTracker/jobcache/job_200906101234_0001";
+  char *job_dir = (char *) get_job_directory("/tmp", "job_200906101234_0001");
+  printf("job_dir obtained is %s\n", job_dir);
+  // Record the comparison first so the buffer is freed before assert can abort.
+  int ret = (strcmp(job_dir, expected) == 0) ? 0 : -1;
+  free(job_dir);
+  assert(ret == 0);
+}
+
+void test_get_attempt_directory() {
+  // The attempt directory is expected to be "<job_dir>/<attempt_id>".
+  char *job_dir = "/tmp/taskTracker/jobcache/job_200906101234_0001";
+  char *attempt_id = "attempt_200906112028_0001_m_000000_0";
+  const char *expected =
+      "/tmp/taskTracker/jobcache/job_200906101234_0001/attempt_200906112028_0001_m_000000_0";
+
+  char *attempt_dir = (char *) get_attempt_directory(job_dir, attempt_id);
+  printf("attempt_dir obtained is %s\n", attempt_dir);
+
+  // Record the comparison first so the buffer is freed before assert can abort.
+  int ret = (strcmp(attempt_dir, expected) == 0) ? 0 : -1;
+  free(attempt_dir);
+  assert(ret == 0);
+}
+
+void test_get_task_launcher_file() {
+  // The launcher script is expected at "<attempt_dir>/taskjvm.sh".
+  char *attempt_dir =
+      "/tmp/taskTracker/jobcache/job_200906101234_0001/attempt_200906112028_0001_m_000000_0";
+  const char *expected =
+      "/tmp/taskTracker/jobcache/job_200906101234_0001/attempt_200906112028_0001_m_000000_0/taskjvm.sh";
+
+  char *task_file = (char *) get_task_launcher_file(attempt_dir);
+  printf("task_file obtained is %s\n", task_file);
+
+  // Record the comparison first so the buffer is freed before assert can abort.
+  int ret = (strcmp(task_file, expected) == 0) ? 0 : -1;
+  free(task_file);
+
+  assert(ret == 0);
+}
+
+void test_get_task_log_dir() {
+  // The per-attempt log directory is expected at "<log_dir>/userlogs/<attempt_id>".
+  const char *expected =
+      "/tmp/testing/userlogs/attempt_200906112028_0001_m_000000_0";
+  char *logdir = (char *) get_task_log_dir("/tmp/testing",
+      "attempt_200906112028_0001_m_000000_0");
+  printf("logdir obtained is %s\n", logdir);
+
+  int ret = (strcmp(logdir, expected) == 0) ? 0 : -1;
+  free(logdir);
+  assert(ret == 0);
+}
+
+int main(int argc, char **argv) {
+  // Each entry runs in order; the path tests abort through assert() on a mismatch.
+  void (*tests[])() = { test_check_variable_against_config,
+      test_get_job_directory, test_get_attempt_directory,
+      test_get_task_launcher_file, test_get_task_log_dir };
+  printf("Starting tests\n");
+  LOGFILE = stdout;
+  for (size_t t = 0; t < sizeof(tests) / sizeof(tests[0]); t++) { tests[t](); }
+  printf("Finished tests\n");
+  return 0;
+}
diff --git src/contrib/streaming/src/test/org/apache/hadoop/streaming/TestStreamingAsDifferentUser.java src/contrib/streaming/src/test/org/apache/hadoop/streaming/TestStreamingAsDifferentUser.java
index 8e6842e..8439824 100644
--- src/contrib/streaming/src/test/org/apache/hadoop/streaming/TestStreamingAsDifferentUser.java
+++ src/contrib/streaming/src/test/org/apache/hadoop/streaming/TestStreamingAsDifferentUser.java
@@ -65,7 +65,7 @@ public class TestStreamingAsDifferentUser extends
"stream.tmpdir=" + System.getProperty("test.build.data", "/tmp") };
StreamJob streamJob = new StreamJob(args, true);
streamJob.setConf(myConf);
- streamJob.go();
+ assertTrue("Job has not succeeded", streamJob.go() == 0);
assertOwnerShip(outputPath);
}
}
diff --git src/java/org/apache/hadoop/mapred/BackupStore.java src/java/org/apache/hadoop/mapred/BackupStore.java
index 52d0175..77724a3 100644
--- src/java/org/apache/hadoop/mapred/BackupStore.java
+++ src/java/org/apache/hadoop/mapred/BackupStore.java
@@ -19,7 +19,6 @@
package org.apache.hadoop.mapred;
import java.io.DataOutputStream;
-import java.io.EOFException;
import java.io.IOException;
import java.io.InputStream;
import java.util.Iterator;
@@ -548,10 +547,9 @@ public class BackupStore {
boolean isActive() { return isActive; }
private Writer createSpillFile() throws IOException {
- Path tmp = new Path(
- TaskTracker.getIntermediateOutputDir(
- tid.getJobID().toString(), tid.toString()) +
- "/backup_" + tid.getId() + "_" + (spillNumber++) + ".out");
+ Path tmp =
+ new Path(TaskTracker.OUTPUT + "/backup_" + tid.getId() + "_"
+ + (spillNumber++) + ".out");
LOG.info("Created file: " + tmp);
diff --git src/java/org/apache/hadoop/mapred/Child.java src/java/org/apache/hadoop/mapred/Child.java
index 3ea7dc8..ab593cf 100644
--- src/java/org/apache/hadoop/mapred/Child.java
+++ src/java/org/apache/hadoop/mapred/Child.java
@@ -36,6 +36,7 @@ import org.apache.hadoop.metrics.MetricsUtil;
import org.apache.hadoop.metrics.jvm.JvmMetrics;
import org.apache.log4j.LogManager;
import org.apache.hadoop.util.Shell;
+import org.apache.hadoop.util.StringUtils;
/**
* The main() for child processes.
@@ -168,14 +169,15 @@ class Child {
LOG.fatal("FSError from child", e);
umbilical.fsError(taskid, e.getMessage());
} catch (Throwable throwable) {
- LOG.warn("Error running child", throwable);
+ LOG.warn("Error running child : "
+ + StringUtils.stringifyException(throwable));
try {
if (task != null) {
// do cleanup for the task
task.taskCleanup(umbilical);
}
} catch (Throwable th) {
- LOG.info("Error cleaning up" + th);
+ LOG.info("Error cleaning up : " + StringUtils.stringifyException(th));
}
// Report back any failures, for diagnostic purposes
ByteArrayOutputStream baos = new ByteArrayOutputStream();
diff --git src/java/org/apache/hadoop/mapred/DefaultTaskController.java src/java/org/apache/hadoop/mapred/DefaultTaskController.java
index 174403b..c8e42bf 100644
--- src/java/org/apache/hadoop/mapred/DefaultTaskController.java
+++ src/java/org/apache/hadoop/mapred/DefaultTaskController.java
@@ -85,7 +85,7 @@ class DefaultTaskController extends TaskController {
* extra from what TaskTracker has done.
*/
@Override
- void initializeJob(JobID jobId) {
+ void initializeJob(InitializeJobContext context) {
}
@Override
diff --git src/java/org/apache/hadoop/mapred/IsolationRunner.java src/java/org/apache/hadoop/mapred/IsolationRunner.java
index 4ae1b2a..6d302f2 100644
--- src/java/org/apache/hadoop/mapred/IsolationRunner.java
+++ src/java/org/apache/hadoop/mapred/IsolationRunner.java
@@ -31,13 +31,8 @@ import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.LocalDirAllocator;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.BytesWritable;
-import org.apache.hadoop.io.SequenceFile;
import org.apache.hadoop.io.Text;
-import org.apache.hadoop.io.Writable;
-import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.mapred.JvmTask;
-import org.apache.hadoop.mapreduce.TaskType;
-import org.apache.hadoop.util.DiskChecker.DiskErrorException;
/**
* IsolationRunner is intended to facilitate debugging by re-running a specific
@@ -169,17 +164,23 @@ public class IsolationRunner {
// setup the local and user working directories
FileSystem local = FileSystem.getLocal(conf);
LocalDirAllocator lDirAlloc = new LocalDirAllocator("mapred.local.dir");
- File workDirName = TaskRunner.formWorkDir(lDirAlloc, taskId, false, conf);
- local.setWorkingDirectory(new Path(workDirName.toString()));
+ // Child's conf.xml is the sandboxed version, so the mapred.local.dir list
+ // consists of attempt-dirs.
+ Path workDirName = lDirAlloc.getLocalPathToRead(MRConstants.WORKDIR, conf);
+ local.setWorkingDirectory(workDirName);
FileSystem.get(conf).setWorkingDirectory(conf.getWorkingDirectory());
// set up a classloader with the right classpath
- ClassLoader classLoader = makeClassLoader(conf, workDirName);
+ ClassLoader classLoader =
+ makeClassLoader(conf, new File(workDirName.toString()));
Thread.currentThread().setContextClassLoader(classLoader);
conf.setClassLoader(classLoader);
-
- Path localSplit = new Path(new Path(jobFilename.toString()).getParent(),
- "split.dta");
+
+ // Child's conf.xml is the sandboxed version, so the mapred.local.dir list
+ // consists of attempt-dirs.
+ Path localSplit =
+ new LocalDirAllocator("mapred.local.dir").getLocalPathToRead(
+ TaskTracker.LOCAL_SPLIT_FILE, conf);
DataInputStream splitFile = FileSystem.getLocal(conf).open(localSplit);
String splitClass = Text.readString(splitFile);
BytesWritable split = new BytesWritable();
diff --git src/java/org/apache/hadoop/mapred/JvmManager.java src/java/org/apache/hadoop/mapred/JvmManager.java
index c2a463b..d1566aa 100644
--- src/java/org/apache/hadoop/mapred/JvmManager.java
+++ src/java/org/apache/hadoop/mapred/JvmManager.java
@@ -35,6 +35,7 @@ import org.apache.hadoop.mapred.TaskController.TaskControllerContext;
import org.apache.hadoop.mapred.TaskTracker.TaskInProgress;
import org.apache.hadoop.util.Shell.ShellCommandExecutor;
import org.apache.hadoop.util.ProcessTree;
+import org.apache.hadoop.util.StringUtils;
class JvmManager {
@@ -111,7 +112,8 @@ class JvmManager {
}
}
- public TaskInProgress getTaskForJvm(JVMId jvmId) {
+ public TaskInProgress getTaskForJvm(JVMId jvmId)
+ throws IOException {
if (jvmId.isMapJVM()) {
return mapJvmManager.getTaskForJvm(jvmId);
} else {
@@ -177,7 +179,8 @@ class JvmManager {
jvmIdToRunner.get(jvmId).setBusy(true);
}
- synchronized public TaskInProgress getTaskForJvm(JVMId jvmId) {
+ synchronized public TaskInProgress getTaskForJvm(JVMId jvmId)
+ throws IOException {
if (jvmToRunningTask.containsKey(jvmId)) {
//Incase of JVM reuse, tasks are returned to previously launched
//JVM via this method. However when a new task is launched
@@ -185,16 +188,26 @@ class JvmManager {
TaskRunner taskRunner = jvmToRunningTask.get(jvmId);
JvmRunner jvmRunner = jvmIdToRunner.get(jvmId);
Task task = taskRunner.getTaskInProgress().getTask();
- TaskControllerContext context =
- new TaskController.TaskControllerContext();
+
+ // Initialize task dirs
+ TaskControllerContext context =
+ new TaskController.TaskControllerContext();
context.env = jvmRunner.env;
context.task = task;
- //If we are returning the same task as which the JVM was launched
- //we don't initialize task once again.
- if(!jvmRunner.env.conf.get("mapred.task.id").
- equals(task.getTaskID().toString())) {
- tracker.getTaskController().initializeTask(context);
+ // If we are returning the same task as which the JVM was launched
+ // we don't initialize task once again.
+ if (!jvmRunner.env.conf.get("mapred.task.id").equals(
+ task.getTaskID().toString())) {
+ try {
+ tracker.getTaskController().initializeTask(context);
+ } catch (IOException e) {
+ LOG.warn("Failed to initialize the new task "
+ + task.getTaskID().toString() + " to be given to JVM with id "
+ + jvmId);
+ throw e;
+ }
}
+
return taskRunner.getTaskInProgress();
}
return null;
@@ -393,23 +406,24 @@ class JvmManager {
//Launch the task controller to run task JVM
initalContext.task = jvmToRunningTask.get(jvmId).getTask();
initalContext.env = env;
- tracker.getTaskController().initializeTask(initalContext);
tracker.getTaskController().launchTaskJVM(initalContext);
} catch (IOException ioe) {
// do nothing
// error and output are appropriately redirected
+ LOG.info("Child exited with the exception "
+ + StringUtils.stringifyException(ioe));
} finally { // handle the exit code
shexec = initalContext.shExec;
if (shexec == null) {
return;
}
-
+
kill();
-
+
int exitCode = shexec.getExitCode();
updateOnJvmExit(jvmId, exitCode);
- LOG.info("JVM : " + jvmId +" exited. Number of tasks it ran: " +
- numTasksRan);
+ LOG.info("JVM : " + jvmId + " exited with exit code " + exitCode
+ + ". Number of tasks it ran: " + numTasksRan);
try {
// In case of jvm-reuse,
//the task jvm cleans up the common workdir for every
@@ -438,6 +452,7 @@ class JvmManager {
.getLong("mapred.tasktracker.tasks.sleeptime-before-sigkill",
ProcessTree.DEFAULT_SLEEPTIME_BEFORE_SIGKILL);
+ // Destroy the task jvm
controller.destroyTaskJVM(initalContext);
} else {
LOG.info(String.format("JVM Not killed %s but just removed", jvmId
diff --git src/java/org/apache/hadoop/mapred/LinuxTaskController.java src/java/org/apache/hadoop/mapred/LinuxTaskController.java
index c21ccb1..f67bcc0 100644
--- src/java/org/apache/hadoop/mapred/LinuxTaskController.java
+++ src/java/org/apache/hadoop/mapred/LinuxTaskController.java
@@ -24,13 +24,13 @@ import java.io.IOException;
import java.io.PrintWriter;
import java.util.ArrayList;
import java.util.List;
+import java.util.Map;
import java.util.Map.Entry;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileUtil;
import org.apache.hadoop.mapred.JvmManager.JvmEnv;
+import org.apache.hadoop.mapred.TaskController.TaskControllerContext;
import org.apache.hadoop.util.StringUtils;
import org.apache.hadoop.util.Shell.ShellCommandExecutor;
@@ -73,44 +73,19 @@ class LinuxTaskController extends TaskController {
new File(hadoopBin, "task-controller").getAbsolutePath();
}
- // The list of directory paths specified in the
- // variable mapred.local.dir. This is used to determine
- // which among the list of directories is picked up
- // for storing data for a particular task.
- private String[] mapredLocalDirs;
-
- // permissions to set on files and directories created.
- // When localized files are handled securely, this string
- // will change to something more restrictive. Until then,
- // it opens up the permissions for all, so that the tasktracker
- // and job owners can access files together.
- private static final String FILE_PERMISSIONS = "ugo+rwx";
-
- // permissions to set on components of the path leading to
- // localized files and directories. Read and execute permissions
- // are required for different users to be able to access the
- // files.
- private static final String PATH_PERMISSIONS = "go+rx";
-
public LinuxTaskController() {
super();
}
- @Override
- public void setConf(Configuration conf) {
- super.setConf(conf);
- mapredLocalDirs = conf.getStrings("mapred.local.dir");
- //Setting of the permissions of the local directory is done in
- //setup()
- }
-
/**
* List of commands that the setuid script will execute.
*/
enum TaskCommands {
+ INITIALIZE_JOB,
LAUNCH_TASK_JVM,
+ INITIALIZE_TASK,
TERMINATE_TASK_JVM,
- KILL_TASK_JVM
+ KILL_TASK_JVM,
}
/**
@@ -150,48 +125,105 @@ class LinuxTaskController extends TaskController {
ShellCommandExecutor shExec = buildTaskControllerExecutor(
TaskCommands.LAUNCH_TASK_JVM,
env.conf.getUser(),
- launchTaskJVMArgs, env);
+ launchTaskJVMArgs, env.workDir, env.env);
context.shExec = shExec;
try {
shExec.execute();
} catch (Exception e) {
- LOG.warn("Exception thrown while launching task JVM : " +
- StringUtils.stringifyException(e));
+ // TODO: check for 143 (SIGTERM) and 137 (SIGKILL) exit codes
+ LOG.warn("Exception thrown while launching task JVM : "
+ + StringUtils.stringifyException(e));
LOG.warn("Exit code from task is : " + shExec.getExitCode());
- LOG.warn("Output from task-contoller is : " + shExec.getOutput());
+ LOG.info("Output from LinuxTaskController's launchTaskJVM follows:");
+ logOutput(shExec.getOutput());
throw new IOException(e);
}
- if(LOG.isDebugEnabled()) {
- LOG.debug("output after executing task jvm = " + shExec.getOutput());
+ if (LOG.isDebugEnabled()) {
+ LOG.info("Output from LinuxTaskController's launchTaskJVM follows:");
+ logOutput(shExec.getOutput());
}
}
/**
- * Returns list of arguments to be passed while launching task VM.
- * See {@code buildTaskControllerExecutor(TaskCommands,
- * String, List, JvmEnv)} documentation.
+ * Helper method that runs a LinuxTaskController command
+ *
+ * @param taskCommand
+ * @param user
+ * @param cmdArgs
+ * @param env
+ * @throws IOException
+ */
+  private void runCommand(TaskCommands taskCommand, String user,
+      List cmdArgs, File workDir, Map env) throws IOException {
+    final String commandName = taskCommand.toString(); // shared by all log lines
+    final String outputBanner =
+        "Output from LinuxTaskController's " + commandName + " follows:";
+    ShellCommandExecutor shExec =
+        buildTaskControllerExecutor(taskCommand, user, cmdArgs, workDir, env);
+    try {
+      shExec.execute();
+    } catch (Exception e) { // log the diagnostics, then rethrow as IOException
+      LOG.warn("Exit code from " + commandName + " is : "
+          + shExec.getExitCode());
+      LOG.warn("Exception thrown by " + commandName + " : "
+          + StringUtils.stringifyException(e));
+      LOG.info(outputBanner);
+      logOutput(shExec.getOutput());
+      throw new IOException(e);
+    }
+    // On success the captured output is only logged when debug is enabled.
+    if (LOG.isDebugEnabled()) {
+      LOG.info(outputBanner);
+      logOutput(shExec.getOutput());
+    }
+  }
+
+ /**
+ * Returns list of arguments to be passed while initializing a new task. See
+ * {@code buildTaskControllerExecutor(TaskCommands, String, List, File,
+ * Map)} documentation.
+ *
* @param context
* @return Argument to be used while launching Task VM
*/
- private List buildLaunchTaskArgs(TaskControllerContext context) {
+ private List buildInitializeTaskArgs(TaskControllerContext context) {
List commandArgs = new ArrayList(3);
String taskId = context.task.getTaskID().toString();
String jobId = getJobId(context);
- LOG.debug("getting the task directory as: "
- + getTaskCacheDirectory(context));
- commandArgs.add(getDirectoryChosenForTask(
- new File(getTaskCacheDirectory(context)),
- context));
commandArgs.add(jobId);
- if(!context.task.isTaskCleanupTask()) {
+ if (!context.task.isTaskCleanupTask()) {
commandArgs.add(taskId);
- }else {
+ } else {
commandArgs.add(taskId + TaskTracker.TASK_CLEANUP_SUFFIX);
}
+ File logDir = new File(TaskLog.getBaseLogDir());
+ try {
+ commandArgs.add(logDir.getCanonicalPath());
+ } catch (IOException e) {
+ // TODO Auto-generated catch block
+ e.printStackTrace();
+ }
return commandArgs;
}
-
- // get the Job ID from the information in the TaskControllerContext
+
+  @Override
+  void initializeTask(TaskControllerContext context) throws IOException {
+    final TaskCommands cmd = TaskCommands.INITIALIZE_TASK; // logged, then executed
+    LOG.info("Going to do " + cmd.toString() + " for "
+        + context.task.getTaskID().toString());
+    runCommand(cmd, context.env.conf.getUser(),
+        buildInitializeTaskArgs(context), context.env.workDir, context.env.env);
+  }
+
+  private void logOutput(String output) {
+    if (output == null) {
+      return; // nothing was captured, so there is nothing to log
+    }
+    for (String line : output.split("\n")) {
+      LOG.info(line);
+    }
+  }
+
private String getJobId(TaskControllerContext context) {
String taskId = context.task.getTaskID().toString();
TaskAttemptID tId = TaskAttemptID.forName(taskId);
@@ -199,6 +231,29 @@ class LinuxTaskController extends TaskController {
return jobId;
}
+  /**
+   * Returns the list of arguments to be passed while launching the task JVM:
+   * the mapred.local.dir entry chosen for this task, followed by the
+   * arguments built by {@link #buildInitializeTaskArgs(TaskControllerContext)}.
+   *
+   * @param context the task-controller context of the task being launched
+   * @return arguments to be used while launching the task JVM
+   */
+  private List buildLaunchTaskArgs(TaskControllerContext context) {
+    // Resolve each path once instead of recomputing it for every use.
+    String taskCacheDir = getTaskCacheDirectory(context);
+    LOG.info("getting the task directory as: " + taskCacheDir);
+    String ttRoot =
+        getDirectoryChosenForTask(new File(taskCacheDir), context);
+    LOG.info("getting the tt_root as " + ttRoot);
+
+    // One slot for tt_root plus the arguments shared with INITIALIZE_TASK.
+    List commandArgs = new ArrayList(4);
+    commandArgs.add(ttRoot);
+    commandArgs.addAll(buildInitializeTaskArgs(context));
+    return commandArgs;
+  }
+
// Get the directory from the list of directories configured
// in mapred.local.dir chosen for storing data pertaining to
// this task.
@@ -208,8 +263,9 @@ class LinuxTaskController extends TaskController {
String taskId = context.task.getTaskID().toString();
for (String dir : mapredLocalDirs) {
File mapredDir = new File(dir);
- File taskDir = new File(mapredDir, TaskTracker.getLocalTaskDir(
- jobId, taskId, context.task.isTaskCleanupTask()));
+ // TODO: fix
+ File taskDir = new File(mapredDir, TaskTracker.getTaskWorkDir(
+ jobId, taskId, context.task.isTaskCleanupTask())).getParentFile();
if (directory.equals(taskDir)) {
return dir;
}
@@ -219,68 +275,7 @@ class LinuxTaskController extends TaskController {
throw new IllegalArgumentException("invalid task cache directory "
+ directory.getAbsolutePath());
}
-
- /**
- * Setup appropriate permissions for directories and files that
- * are used by the task.
- *
- * As the LinuxTaskController launches tasks as a user, different
- * from the daemon, all directories and files that are potentially
- * used by the tasks are setup with appropriate permissions that
- * will allow access.
- *
- * Until secure data handling is implemented (see HADOOP-4491 and
- * HADOOP-4493, for e.g.), the permissions are set up to allow
- * read, write and execute access for everyone. This will be
- * changed to restricted access as data is handled securely.
- */
- void initializeTask(TaskControllerContext context) {
- // Setup permissions for the job and task cache directories.
- setupTaskCacheFileAccess(context);
- // setup permissions for task log directory
- setupTaskLogFileAccess(context);
- }
-
- // Allows access for the task to create log files under
- // the task log directory
- private void setupTaskLogFileAccess(TaskControllerContext context) {
- TaskAttemptID taskId = context.task.getTaskID();
- File f = TaskLog.getTaskLogFile(taskId, TaskLog.LogName.SYSLOG);
- String taskAttemptLogDir = f.getParentFile().getAbsolutePath();
- changeDirectoryPermissions(taskAttemptLogDir, FILE_PERMISSIONS, false);
- }
-
- // Allows access for the task to read, write and execute
- // the files under the job and task cache directories
- private void setupTaskCacheFileAccess(TaskControllerContext context) {
- String taskId = context.task.getTaskID().toString();
- JobID jobId = JobID.forName(getJobId(context));
- //Change permission for the task across all the disks
- for(String localDir : mapredLocalDirs) {
- File f = new File(localDir);
- File taskCacheDir = new File(f,TaskTracker.getLocalTaskDir(
- jobId.toString(), taskId, context.task.isTaskCleanupTask()));
- if(taskCacheDir.exists()) {
- changeDirectoryPermissions(taskCacheDir.getPath(),
- FILE_PERMISSIONS, true);
- }
- }//end of local directory Iteration
- }
- // convenience method to execute chmod.
- private void changeDirectoryPermissions(String dir, String mode,
- boolean isRecursive) {
- int ret = 0;
- try {
- ret = FileUtil.chmod(dir, mode, isRecursive);
- } catch (Exception e) {
- LOG.warn("Exception in changing permissions for directory " + dir +
- ". Exception: " + e.getMessage());
- }
- if (ret != 0) {
- LOG.warn("Could not change permissions for directory " + dir);
- }
- }
/**
* Builds the command line for launching/terminating/killing task JVM.
* Following is the format for launching/terminating/killing task JVM
@@ -295,14 +290,14 @@ class LinuxTaskController extends TaskController {
* @param command command to be executed.
* @param userName user name
* @param cmdArgs list of extra arguments
- * @param env JVM environment variables.
+ * @param env JVM environment variables. : TODO: fix
* @return {@link ShellCommandExecutor}
* @throws IOException
*/
- private ShellCommandExecutor buildTaskControllerExecutor(TaskCommands command,
- String userName,
- List cmdArgs, JvmEnv env)
- throws IOException {
+ private ShellCommandExecutor buildTaskControllerExecutor(
+ TaskCommands command, String userName, List cmdArgs,
+ File workDir, Map env)
+ throws IOException {
String[] taskControllerCmd = new String[3 + cmdArgs.size()];
taskControllerCmd[0] = getTaskControllerExecutablePath();
taskControllerCmd[1] = userName;
@@ -317,9 +312,9 @@ class LinuxTaskController extends TaskController {
}
}
ShellCommandExecutor shExec = null;
- if(env.workDir != null && env.workDir.exists()) {
+ if(workDir != null && workDir.exists()) {
shExec = new ShellCommandExecutor(taskControllerCmd,
- env.workDir, env.env);
+ workDir, env);
} else {
shExec = new ShellCommandExecutor(taskControllerCmd);
}
@@ -333,6 +328,7 @@ class LinuxTaskController extends TaskController {
// is different from what is set with respect with
// env.workDir. Hence building this from the taskId everytime.
String taskId = context.task.getTaskID().toString();
+ // TODO: fix the following line or document it.
File cacheDirForJob = context.env.workDir.getParentFile().getParentFile();
if(context.task.isTaskCleanupTask()) {
taskId = taskId + TaskTracker.TASK_CLEANUP_SUFFIX;
@@ -371,68 +367,27 @@ class LinuxTaskController extends TaskController {
}
}
}
-
- /**
- * Sets up the permissions of the following directories:
- *
- * Job cache directory
- * Archive directory
- * Hadoop log directories
- *
- */
- @Override
- void setup() {
- //set up job cache directory and associated permissions
- String localDirs[] = this.mapredLocalDirs;
- for(String localDir : localDirs) {
- //Cache root
- File cacheDirectory = new File(localDir,TaskTracker.getCacheSubdir());
- File jobCacheDirectory = new File(localDir,TaskTracker.getJobCacheSubdir());
- if(!cacheDirectory.exists()) {
- if(!cacheDirectory.mkdirs()) {
- LOG.warn("Unable to create cache directory : " +
- cacheDirectory.getPath());
- }
- }
- if(!jobCacheDirectory.exists()) {
- if(!jobCacheDirectory.mkdirs()) {
- LOG.warn("Unable to create job cache directory : " +
- jobCacheDirectory.getPath());
- }
- }
- //Give world writable permission for every directory under
- //mapred-local-dir.
- //Child tries to write files under it when executing.
- changeDirectoryPermissions(localDir, FILE_PERMISSIONS, true);
- }//end of local directory manipulations
- //setting up perms for user logs
- File taskLog = TaskLog.getUserLogDir();
- changeDirectoryPermissions(taskLog.getPath(), FILE_PERMISSIONS,false);
+ private List buildInitializeJobCommandArgs(
+ InitializeJobContext context) {
+ List initJobCmdArgs = new ArrayList();
+ initJobCmdArgs.add(context.jobid.toString());
+ return initJobCmdArgs;
}
/*
+ *TODO: fix
* Create Job directories across disks and set their permissions to 777
* This way when tasks are run we just need to setup permissions for
* task folder.
*/
@Override
- void initializeJob(JobID jobid) {
- for(String localDir : this.mapredLocalDirs) {
- File jobDirectory = new File(localDir,
- TaskTracker.getLocalJobDir(jobid.toString()));
- if(!jobDirectory.exists()) {
- if(!jobDirectory.mkdir()) {
- LOG.warn("Unable to create job cache directory : "
- + jobDirectory.getPath());
- continue;
- }
- }
- //Should be recursive because the jar and work folders might be
- //present under the job cache directory
- changeDirectoryPermissions(
- jobDirectory.getPath(), FILE_PERMISSIONS, true);
- }
+ void initializeJob(InitializeJobContext context)
+ throws IOException {
+ LOG.info("Going to initialize job " + context.jobid.toString()
+ + " on the TT");
+ runCommand(TaskCommands.INITIALIZE_JOB, context.user,
+ buildInitializeJobCommandArgs(context), context.workDir, null);
}
/**
@@ -467,7 +422,7 @@ class LinuxTaskController extends TaskController {
}
ShellCommandExecutor shExec = buildTaskControllerExecutor(
command, context.env.conf.getUser(),
- buildKillTaskCommandArgs(context), context.env);
+ buildKillTaskCommandArgs(context), context.env.workDir, context.env.env);
try {
shExec.execute();
} catch (Exception e) {
@@ -498,6 +453,5 @@ class LinuxTaskController extends TaskController {
protected String getTaskControllerExecutablePath() {
return taskControllerExe;
- }
+ }
}
-
diff --git src/java/org/apache/hadoop/mapred/LocalJobRunner.java src/java/org/apache/hadoop/mapred/LocalJobRunner.java
index bc80fd4..6df3093 100644
--- src/java/org/apache/hadoop/mapred/LocalJobRunner.java
+++ src/java/org/apache/hadoop/mapred/LocalJobRunner.java
@@ -22,12 +22,12 @@ import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
+import java.util.Map;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.DataOutputBuffer;
import org.apache.hadoop.io.serializer.SerializationFactory;
import org.apache.hadoop.io.serializer.Serializer;
@@ -64,7 +64,7 @@ class LocalJobRunner implements JobSubmissionProtocol {
private JobStatus status;
private ArrayList mapIds = new ArrayList();
- private MapOutputFile mapoutputFile;
+
private JobProfile profile;
private Path localFile;
private FileSystem localFs;
@@ -84,8 +84,6 @@ class LocalJobRunner implements JobSubmissionProtocol {
public Job(JobID jobid, JobConf conf) throws IOException {
this.file = new Path(getSystemDir(), jobid + "/job.xml");
this.id = jobid;
- this.mapoutputFile = new MapOutputFile(jobid);
- this.mapoutputFile.setConf(conf);
this.localFile = new JobConf(conf).getLocalPath(jobDir+id+".xml");
this.localFs = FileSystem.getLocal(conf);
@@ -160,7 +158,9 @@ class LocalJobRunner implements JobSubmissionProtocol {
}
outputCommitter.setupJob(jContext);
status.setSetupProgress(1.0f);
-
+
+ Map mapOutputFiles =
+ new HashMap();
for (int i = 0; i < rawSplits.length; i++) {
if (!this.isInterrupted()) {
TaskAttemptID mapId = new TaskAttemptID(
@@ -171,6 +171,12 @@ class LocalJobRunner implements JobSubmissionProtocol {
rawSplits[i].getClassName(),
rawSplits[i].getBytes(), 1);
JobConf localConf = new JobConf(job);
+ TaskRunner.setupChildMapredLocalDirs(map, localConf);
+
+ MapOutputFile mapOutput = new MapOutputFile(map.getJobID());
+ mapOutput.setConf(localConf);
+ mapOutputFiles.put(mapId, mapOutput);
+
map.setJobFile(localFile.toString());
map.localizeConfiguration(localConf);
map.setConf(localConf);
@@ -188,14 +194,20 @@ class LocalJobRunner implements JobSubmissionProtocol {
new TaskAttemptID(new TaskID(jobId, TaskType.REDUCE, 0), 0);
try {
if (numReduceTasks > 0) {
+ ReduceTask reduce = new ReduceTask(file.toString(),
+ reduceId, 0, mapIds.size(), 1);
+ JobConf localConf = new JobConf(job);
+ TaskRunner.setupChildMapredLocalDirs(reduce, localConf);
// move map output to reduce input
for (int i = 0; i < mapIds.size(); i++) {
if (!this.isInterrupted()) {
TaskAttemptID mapId = mapIds.get(i);
- Path mapOut = this.mapoutputFile.getOutputFile(mapId);
- Path reduceIn = this.mapoutputFile.getInputFileForWrite(
- mapId.getTaskID(),reduceId,
- localFs.getFileStatus(mapOut).getLen());
+ Path mapOut = mapOutputFiles.get(mapId).getOutputFile();
+ MapOutputFile localOutputFile = new MapOutputFile(jobId);
+ localOutputFile.setConf(localConf);
+ Path reduceIn =
+ localOutputFile.getInputFileForWrite(mapId.getTaskID(),
+ localFs.getFileStatus(mapOut).getLen());
if (!localFs.mkdirs(reduceIn.getParent())) {
throw new IOException("Mkdirs failed to create "
+ reduceIn.getParent().toString());
@@ -207,9 +219,6 @@ class LocalJobRunner implements JobSubmissionProtocol {
}
}
if (!this.isInterrupted()) {
- ReduceTask reduce = new ReduceTask(file.toString(),
- reduceId, 0, mapIds.size(), 1);
- JobConf localConf = new JobConf(job);
reduce.setJobFile(localFile.toString());
reduce.localizeConfiguration(localConf);
reduce.setConf(localConf);
@@ -224,11 +233,8 @@ class LocalJobRunner implements JobSubmissionProtocol {
}
}
} finally {
- for (TaskAttemptID mapId: mapIds) {
- this.mapoutputFile.removeAll(mapId);
- }
- if (numReduceTasks == 1) {
- this.mapoutputFile.removeAll(reduceId);
+ for (MapOutputFile output : mapOutputFiles.values()) {
+ output.removeAll();
}
}
// delete the temporary directory in output directory
diff --git src/java/org/apache/hadoop/mapred/MapOutputFile.java src/java/org/apache/hadoop/mapred/MapOutputFile.java
index 30b71c9..d4afd05 100644
--- src/java/org/apache/hadoop/mapred/MapOutputFile.java
+++ src/java/org/apache/hadoop/mapred/MapOutputFile.java
@@ -31,7 +31,9 @@ class MapOutputFile {
private JobConf conf;
private JobID jobId;
-
+
+ static final String REDUCE_INPUT_FILE_FORMAT_STRING = "%s/map_%d.out";
+
MapOutputFile() {
}
@@ -42,132 +44,143 @@ class MapOutputFile {
private LocalDirAllocator lDirAlloc =
new LocalDirAllocator("mapred.local.dir");
- /** Return the path to local map output file created earlier
- * @param mapTaskId a map task id
+ /**
+ * Return the path to local map output file created earlier
+ *
+ * @return path
+ * @throws IOException
*/
- public Path getOutputFile(TaskAttemptID mapTaskId)
- throws IOException {
- return lDirAlloc.getLocalPathToRead(TaskTracker.getIntermediateOutputDir(
- jobId.toString(), mapTaskId.toString())
- + "/file.out", conf);
+ public Path getOutputFile()
+ throws IOException {
+ return lDirAlloc.getLocalPathToRead(TaskTracker.OUTPUT + Path.SEPARATOR
+ + "file.out", conf);
}
- /** Create a local map output file name.
- * @param mapTaskId a map task id
+ /**
+ * Create a local map output file name.
+ *
* @param size the size of the file
+ * @return path
+ * @throws IOException
*/
- public Path getOutputFileForWrite(TaskAttemptID mapTaskId, long size)
- throws IOException {
- return lDirAlloc.getLocalPathForWrite(TaskTracker.getIntermediateOutputDir(
- jobId.toString(), mapTaskId.toString())
- + "/file.out", size, conf);
+ public Path getOutputFileForWrite(long size)
+ throws IOException {
+ return lDirAlloc.getLocalPathForWrite(TaskTracker.OUTPUT + Path.SEPARATOR
+ + "file.out", size, conf);
}
- /** Return the path to a local map output index file created earlier
- * @param mapTaskId a map task id
+ /**
+ * Return the path to a local map output index file created earlier
+ *
+ * @return path
+ * @throws IOException
*/
- public Path getOutputIndexFile(TaskAttemptID mapTaskId)
- throws IOException {
- return lDirAlloc.getLocalPathToRead(TaskTracker.getIntermediateOutputDir(
- jobId.toString(), mapTaskId.toString())
- + "/file.out.index", conf);
+ public Path getOutputIndexFile()
+ throws IOException {
+ return lDirAlloc.getLocalPathToRead(TaskTracker.OUTPUT + Path.SEPARATOR
+ + "file.out.index", conf);
}
- /** Create a local map output index file name.
- * @param mapTaskId a map task id
+ /**
+ * Create a local map output index file name.
+ *
* @param size the size of the file
+ * @return path
+ * @throws IOException
*/
- public Path getOutputIndexFileForWrite(TaskAttemptID mapTaskId, long size)
- throws IOException {
- return lDirAlloc.getLocalPathForWrite(TaskTracker.getIntermediateOutputDir(
- jobId.toString(), mapTaskId.toString())
- + "/file.out.index",
- size, conf);
+ public Path getOutputIndexFileForWrite(long size)
+ throws IOException {
+ return lDirAlloc.getLocalPathForWrite(TaskTracker.OUTPUT + Path.SEPARATOR
+ + "file.out.index", size, conf);
}
- /** Return a local map spill file created earlier.
- * @param mapTaskId a map task id
+ /**
+ * Return a local map spill file created earlier.
+ *
* @param spillNumber the number
+ * @return path
+ * @throws IOException
*/
- public Path getSpillFile(TaskAttemptID mapTaskId, int spillNumber)
- throws IOException {
- return lDirAlloc.getLocalPathToRead(TaskTracker.getIntermediateOutputDir(
- jobId.toString(), mapTaskId.toString())
- + "/spill"
- + spillNumber + ".out", conf);
+ public Path getSpillFile(int spillNumber)
+ throws IOException {
+ return lDirAlloc.getLocalPathToRead(TaskTracker.OUTPUT + "/spill"
+ + spillNumber + ".out", conf);
}
- /** Create a local map spill file name.
- * @param mapTaskId a map task id
+ /**
+ * Create a local map spill file name.
+ *
* @param spillNumber the number
* @param size the size of the file
+ * @return path
+ * @throws IOException
*/
- public Path getSpillFileForWrite(TaskAttemptID mapTaskId, int spillNumber,
- long size) throws IOException {
- return lDirAlloc.getLocalPathForWrite(TaskTracker.getIntermediateOutputDir(
- jobId.toString(), mapTaskId.toString())
- + "/spill" +
- spillNumber + ".out", size, conf);
+ public Path getSpillFileForWrite(int spillNumber, long size)
+ throws IOException {
+ return lDirAlloc.getLocalPathForWrite(TaskTracker.OUTPUT + "/spill"
+ + spillNumber + ".out", size, conf);
}
- /** Return a local map spill index file created earlier
- * @param mapTaskId a map task id
+ /**
+ * Return a local map spill index file created earlier
+ *
* @param spillNumber the number
+ * @return path
+ * @throws IOException
*/
- public Path getSpillIndexFile(TaskAttemptID mapTaskId, int spillNumber)
- throws IOException {
- return lDirAlloc.getLocalPathToRead(TaskTracker.getIntermediateOutputDir(
- jobId.toString(), mapTaskId.toString())
- + "/spill" +
- spillNumber + ".out.index", conf);
+ public Path getSpillIndexFile(int spillNumber)
+ throws IOException {
+ return lDirAlloc.getLocalPathToRead(TaskTracker.OUTPUT + "/spill"
+ + spillNumber + ".out.index", conf);
}
- /** Create a local map spill index file name.
- * @param mapTaskId a map task id
+ /**
+ * Create a local map spill index file name.
+ *
* @param spillNumber the number
* @param size the size of the file
+ * @return path
+ * @throws IOException
*/
- public Path getSpillIndexFileForWrite(TaskAttemptID mapTaskId, int spillNumber,
- long size) throws IOException {
- return lDirAlloc.getLocalPathForWrite(TaskTracker.getIntermediateOutputDir(
- jobId.toString(), mapTaskId.toString())
- + "/spill" + spillNumber +
- ".out.index", size, conf);
+ public Path getSpillIndexFileForWrite(int spillNumber, long size)
+ throws IOException {
+ return lDirAlloc.getLocalPathForWrite(TaskTracker.OUTPUT + "/spill"
+ + spillNumber + ".out.index", size, conf);
}
- /** Return a local reduce input file created earlier
- * @param mapTaskId a map task id
- * @param reduceTaskId a reduce task id
+ /**
+ * Return a local reduce input file created earlier
+ *
+ * @param mapId a map task id
+ * @return path
+ * @throws IOException
*/
- public Path getInputFile(int mapId, TaskAttemptID reduceTaskId)
- throws IOException {
- // TODO *oom* should use a format here
- return lDirAlloc.getLocalPathToRead(TaskTracker.getIntermediateOutputDir(
- jobId.toString(), reduceTaskId.toString())
- + "/map_" + mapId + ".out",
- conf);
+ public Path getInputFile(int mapId)
+ throws IOException {
+ return lDirAlloc.getLocalPathToRead(String.format(
+ REDUCE_INPUT_FILE_FORMAT_STRING, TaskTracker.OUTPUT, Integer
+ .valueOf(mapId)), conf);
}
- /** Create a local reduce input file name.
- * @param mapTaskId a map task id
- * @param reduceTaskId a reduce task id
+ /**
+ * Create a local reduce input file name.
+ *
+ * @param mapId a map task id
* @param size the size of the file
+ * @return path
+ * @throws IOException
*/
- public Path getInputFileForWrite(TaskID mapId, TaskAttemptID reduceTaskId,
- long size)
- throws IOException {
- // TODO *oom* should use a format here
- return lDirAlloc.getLocalPathForWrite(TaskTracker.getIntermediateOutputDir(
- jobId.toString(), reduceTaskId.toString())
- + "/map_" + mapId.getId() + ".out",
- size, conf);
+ public Path getInputFileForWrite(TaskID mapId, long size)
+ throws IOException {
+ return lDirAlloc.getLocalPathForWrite(String.format(
+ REDUCE_INPUT_FILE_FORMAT_STRING, TaskTracker.OUTPUT, mapId.getId()),
+ size, conf);
}
/** Removes all of the files related to a task. */
- public void removeAll(TaskAttemptID taskId) throws IOException {
- conf.deleteLocalFiles(TaskTracker.getIntermediateOutputDir(
- jobId.toString(), taskId.toString())
-);
+ public void removeAll()
+ throws IOException {
+ conf.deleteLocalFiles(TaskTracker.OUTPUT);
}
public void setConf(Configuration conf) {
diff --git src/java/org/apache/hadoop/mapred/MapTask.java src/java/org/apache/hadoop/mapred/MapTask.java
index 996415d..780af71 100644
--- src/java/org/apache/hadoop/mapred/MapTask.java
+++ src/java/org/apache/hadoop/mapred/MapTask.java
@@ -35,6 +35,7 @@ import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.LocalDirAllocator;
import org.apache.hadoop.fs.LocalFileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.BytesWritable;
@@ -66,7 +67,6 @@ class MapTask extends Task {
* The size of each record in the index file for the map-outputs.
*/
public static final int MAP_OUTPUT_INDEX_RECORD_LENGTH = 24;
-
private BytesWritable split = new BytesWritable();
private String splitClass;
@@ -101,11 +101,20 @@ class MapTask extends Task {
}
@Override
- public void localizeConfiguration(JobConf conf) throws IOException {
+ public void localizeConfiguration(JobConf conf)
+ throws IOException {
super.localizeConfiguration(conf);
- if (isMapOrReduce()) {
- Path localSplit = new Path(new Path(getJobFile()).getParent(),
- "split.dta");
+ // Write the split file to the local disk if it is a normal map task (not a
+ // job-setup or a job-cleanup task) and if the user wishes to run
+ // IsolationRunner either by setting keep.failed.task.files to true or by
+ // using keep.task.files.pattern
+ if (isMapOrReduce()
+ && (conf.getKeepTaskFilesPattern() != null || conf
+ .getKeepFailedTaskFiles())) {
+ Path localSplit =
+ new LocalDirAllocator("mapred.local.dir").getLocalPathForWrite(
+ TaskTracker.getLocalSplitFile(getJobID().toString(), getTaskID()
+ .toString()), conf);
LOG.debug("Writing local split to " + localSplit);
DataOutputStream out = FileSystem.getLocal(conf).create(localSplit);
Text.writeString(out, splitClass);
@@ -1222,8 +1231,8 @@ class MapTask extends Task {
try {
// create spill file
final SpillRecord spillRec = new SpillRecord(partitions);
- final Path filename = mapOutputFile.getSpillFileForWrite(getTaskID(),
- numSpills, size);
+ final Path filename =
+ mapOutputFile.getSpillFileForWrite(numSpills, size);
out = rfs.create(filename);
final int endPosition = (kvend > kvstart)
@@ -1287,9 +1296,9 @@ class MapTask extends Task {
if (totalIndexCacheMemory >= INDEX_CACHE_MEMORY_LIMIT) {
// create spill index file
- Path indexFilename = mapOutputFile.getSpillIndexFileForWrite(
- getTaskID(), numSpills,
- partitions * MAP_OUTPUT_INDEX_RECORD_LENGTH);
+ Path indexFilename =
+ mapOutputFile.getSpillIndexFileForWrite(numSpills, partitions
+ * MAP_OUTPUT_INDEX_RECORD_LENGTH);
spillRec.writeToFile(indexFilename, job);
} else {
indexCacheList.add(spillRec);
@@ -1315,8 +1324,8 @@ class MapTask extends Task {
try {
// create spill file
final SpillRecord spillRec = new SpillRecord(partitions);
- final Path filename = mapOutputFile.getSpillFileForWrite(getTaskID(),
- numSpills, size);
+ final Path filename =
+ mapOutputFile.getSpillFileForWrite(numSpills, size);
out = rfs.create(filename);
// we don't run the combiner for a single record
@@ -1352,9 +1361,9 @@ class MapTask extends Task {
}
if (totalIndexCacheMemory >= INDEX_CACHE_MEMORY_LIMIT) {
// create spill index file
- Path indexFilename = mapOutputFile.getSpillIndexFileForWrite(
- getTaskID(), numSpills,
- partitions * MAP_OUTPUT_INDEX_RECORD_LENGTH);
+ Path indexFilename =
+ mapOutputFile.getSpillIndexFileForWrite(numSpills, partitions
+ * MAP_OUTPUT_INDEX_RECORD_LENGTH);
spillRec.writeToFile(indexFilename, job);
} else {
indexCacheList.add(spillRec);
@@ -1444,14 +1453,14 @@ class MapTask extends Task {
final TaskAttemptID mapId = getTaskID();
for(int i = 0; i < numSpills; i++) {
- filename[i] = mapOutputFile.getSpillFile(mapId, i);
+ filename[i] = mapOutputFile.getSpillFile(i);
finalOutFileSize += rfs.getFileStatus(filename[i]).getLen();
}
if (numSpills == 1) { //the spill is the final output
rfs.rename(filename[0],
new Path(filename[0].getParent(), "file.out"));
if (indexCacheList.size() == 0) {
- rfs.rename(mapOutputFile.getSpillIndexFile(mapId, 0),
+ rfs.rename(mapOutputFile.getSpillIndexFile(0),
new Path(filename[0].getParent(),"file.out.index"));
} else {
indexCacheList.get(0).writeToFile(
@@ -1462,7 +1471,7 @@ class MapTask extends Task {
// read in paged indices
for (int i = indexCacheList.size(); i < numSpills; ++i) {
- Path indexFileName = mapOutputFile.getSpillIndexFile(mapId, i);
+ Path indexFileName = mapOutputFile.getSpillIndexFile(i);
indexCacheList.add(new SpillRecord(indexFileName, job));
}
@@ -1470,10 +1479,10 @@ class MapTask extends Task {
//lengths for each partition
finalOutFileSize += partitions * APPROX_HEADER_LENGTH;
finalIndexFileSize = partitions * MAP_OUTPUT_INDEX_RECORD_LENGTH;
- Path finalOutputFile = mapOutputFile.getOutputFileForWrite(mapId,
- finalOutFileSize);
- Path finalIndexFile = mapOutputFile.getOutputIndexFileForWrite(
- mapId, finalIndexFileSize);
+ Path finalOutputFile =
+ mapOutputFile.getOutputFileForWrite(finalOutFileSize);
+ Path finalIndexFile =
+ mapOutputFile.getOutputIndexFileForWrite(finalIndexFileSize);
//The output stream for the final single output file
FSDataOutputStream finalOut = rfs.create(finalOutputFile, true, 4096);
diff --git src/java/org/apache/hadoop/mapred/MapTaskRunner.java src/java/org/apache/hadoop/mapred/MapTaskRunner.java
index 918b2a6..94750d0 100644
--- src/java/org/apache/hadoop/mapred/MapTaskRunner.java
+++ src/java/org/apache/hadoop/mapred/MapTaskRunner.java
@@ -34,13 +34,13 @@ class MapTaskRunner extends TaskRunner {
return false;
}
- mapOutputFile.removeAll(getTask().getTaskID());
+ mapOutputFile.removeAll();
return true;
}
/** Delete all of the temporary map output files. */
public void close() throws IOException {
LOG.info(getTask()+" done; removing files.");
- mapOutputFile.removeAll(getTask().getTaskID());
+ mapOutputFile.removeAll();
}
}
diff --git src/java/org/apache/hadoop/mapred/ReduceTask.java src/java/org/apache/hadoop/mapred/ReduceTask.java
index 9401f90..f01f363 100644
--- src/java/org/apache/hadoop/mapred/ReduceTask.java
+++ src/java/org/apache/hadoop/mapred/ReduceTask.java
@@ -208,7 +208,7 @@ class ReduceTask extends Task {
if (isLocal) {
// for local jobs
for(int i = 0; i < numMaps; ++i) {
- fileList.add(mapOutputFile.getInputFile(i, getTaskID()));
+ fileList.add(mapOutputFile.getInputFile(i));
}
} else {
// for non local jobs
@@ -1284,12 +1284,11 @@ class ReduceTask extends Task {
// else, we will check the localFS to find a suitable final location
// for this path
TaskAttemptID reduceId = reduceTask.getTaskID();
- Path filename = new Path("/" + TaskTracker.getIntermediateOutputDir(
- reduceId.getJobID().toString(),
- reduceId.toString())
- + "/map_" +
- loc.getTaskId().getId() + ".out");
-
+ Path filename =
+ new Path(String.format(
+ MapOutputFile.REDUCE_INPUT_FILE_FORMAT_STRING,
+ TaskTracker.OUTPUT, loc.getTaskId().getId()));
+
// Copy the map output to a temp file whose name is unique to this attempt
Path tmpMapOutput = new Path(filename+"-"+id);
@@ -2326,8 +2325,8 @@ class ReduceTask extends Task {
sortPhaseFinished = true;
// must spill to disk, but can't retain in-mem for intermediate merge
- final Path outputPath = mapOutputFile.getInputFileForWrite(mapId,
- reduceTask.getTaskID(), inMemToDiskBytes);
+ final Path outputPath =
+ mapOutputFile.getInputFileForWrite(mapId, inMemToDiskBytes);
final RawKeyValueIterator rIter = Merger.merge(job, fs,
keyClass, valueClass, memDiskSegments, numMemDiskSegments,
tmpDir, comparator, reporter, spilledRecordsCounter, null,
@@ -2621,8 +2620,8 @@ class ReduceTask extends Task {
long mergeOutputSize = createInMemorySegments(inMemorySegments, 0);
int noInMemorySegments = inMemorySegments.size();
- Path outputPath = mapOutputFile.getInputFileForWrite(mapId,
- reduceTask.getTaskID(), mergeOutputSize);
+ Path outputPath =
+ mapOutputFile.getInputFileForWrite(mapId, mergeOutputSize);
Writer writer =
new Writer(conf, rfs, outputPath,
diff --git src/java/org/apache/hadoop/mapred/ReduceTaskRunner.java src/java/org/apache/hadoop/mapred/ReduceTaskRunner.java
index 903354e..fc6d1ae 100644
--- src/java/org/apache/hadoop/mapred/ReduceTaskRunner.java
+++ src/java/org/apache/hadoop/mapred/ReduceTaskRunner.java
@@ -37,7 +37,7 @@ class ReduceTaskRunner extends TaskRunner {
}
// cleanup from failures
- mapOutputFile.removeAll(getTask().getTaskID());
+ mapOutputFile.removeAll();
return true;
}
@@ -46,6 +46,6 @@ class ReduceTaskRunner extends TaskRunner {
public void close() throws IOException {
LOG.info(getTask()+" done; removing files.");
getTask().getProgress().setStatus("closed");
- mapOutputFile.removeAll(getTask().getTaskID());
+ mapOutputFile.removeAll();
}
}
diff --git src/java/org/apache/hadoop/mapred/TaskController.java src/java/org/apache/hadoop/mapred/TaskController.java
index 030bf6b..8ec42a3 100644
--- src/java/org/apache/hadoop/mapred/TaskController.java
+++ src/java/org/apache/hadoop/mapred/TaskController.java
@@ -17,12 +17,14 @@
*/
package org.apache.hadoop.mapred;
+import java.io.File;
import java.io.IOException;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configurable;
import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileUtil;
import org.apache.hadoop.mapred.JvmManager.JvmEnv;
import org.apache.hadoop.util.StringUtils;
import org.apache.hadoop.util.Shell.ShellCommandExecutor;
@@ -45,18 +47,74 @@ abstract class TaskController implements Configurable {
public Configuration getConf() {
return conf;
}
-
+
+ // The list of directory paths specified in the variable mapred.local.dir.
+ // This is used to determine which among the list of directories is picked up
+ // for storing data for a particular task.
+ protected String[] mapredLocalDirs;
+
public void setConf(Configuration conf) {
this.conf = conf;
+ mapredLocalDirs = conf.getStrings("mapred.local.dir");
}
-
+
+ /**
+ * Creates, if missing, and sets up the permissions of the following
+ * directories on all the configured disks:
+ *
+ * - mapred-local directories
+ * - Distributed cache directories
+ * - Job cache directories
+ * - User log directory (permissions only; not per-disk)
+ *
+ */
+ void setup() {
+ for (String localDir : this.mapredLocalDirs) {
+ // Set up the mapred-local directories.
+ File mapredlocalDir = new File(localDir);
+ if (!mapredlocalDir.exists() && !mapredlocalDir.mkdirs()) {
+ LOG.warn("Unable to create mapred-local directory : "
+ + mapredlocalDir.getPath());
+ } else {
+ FileUtil.setPermissions(mapredlocalDir, FileUtil.sevenFiveFive);
+ }
+ // TODO: should throw exception if failed on all disks?
+
+ // Set up the cache directory used for distributed cache files
+ File distributedCacheDir = new File(localDir, TaskTracker.getDistributedCacheDir());
+ if (!distributedCacheDir.exists() && !distributedCacheDir.mkdirs()) {
+ LOG.warn("Unable to create cache directory : "
+ + distributedCacheDir.getPath());
+ } else {
+ FileUtil.setPermissions(distributedCacheDir, FileUtil.sevenFiveFive);
+ }
+
+ // Set up the jobcache directory
+ File jobCacheDir =
+ new File(localDir, TaskTracker.getJobCacheSubdir());
+ if (!jobCacheDir.exists() && !jobCacheDir.mkdirs()) {
+ LOG.warn("Unable to create job cache directory : "
+ + jobCacheDir.getPath());
+ } else {
+ FileUtil.setPermissions(jobCacheDir, FileUtil.sevenFiveFive);
+ }
+ }
+
+ // Set up the user log directory
+ File taskLog = TaskLog.getUserLogDir();
+ FileUtil.setPermissions(taskLog, FileUtil.sevenFiveFive);
+ }
+
/**
- * Setup task controller component.
+ * Take task-controller specific actions to initialize a job. This involves
+ * moving the job files, i.e. the contents of the directory
+ * taskTracker/system/jobid/userfiles, to taskTracker/user/jobid/ so that
+ * launched tasks can share the contents from the user's directory.
*
+ * @throws IOException
*/
- abstract void setup();
-
-
+ abstract void initializeJob(InitializeJobContext context) throws IOException;
+
/**
* Launch a task JVM
*
@@ -65,7 +123,7 @@ abstract class TaskController implements Configurable {
*/
abstract void launchTaskJVM(TaskControllerContext context)
throws IOException;
-
+
/**
* Top level cleanup a task JVM method.
*
@@ -90,47 +148,44 @@ abstract class TaskController implements Configurable {
}
killTask(context);
}
-
- /**
- * Perform initializing actions required before a task can run.
- *
- * For instance, this method can be used to setup appropriate
- * access permissions for files and directories that will be
- * used by tasks. Tasks use the job cache, log, PID and distributed cache
- * directories and files as part of their functioning. Typically,
- * these files are shared between the daemon and the tasks
- * themselves. So, a TaskController that is launching tasks
- * as different users can implement this method to setup
- * appropriate ownership and permissions for these directories
- * and files.
- */
- abstract void initializeTask(TaskControllerContext context);
-
-
+
+ /** Perform initializing actions required before a task can run.
+ *
+ * For instance, this method can be used to setup appropriate
+ * access permissions for files and directories that will be
+ * used by tasks. Tasks use the job cache, log, PID and distributed cache
+ * directories and files as part of their functioning. Typically,
+ * these files are shared between the daemon and the tasks
+ * themselves. So, a TaskController that is launching tasks
+ * as different users can implement this method to setup
+ * appropriate ownership and permissions for these directories
+ * and files.
+ */
+ abstract void initializeTask(TaskControllerContext context)
+ throws IOException;
+
/**
* Contains task information required for the task controller.
*/
static class TaskControllerContext {
// task being executed
- Task task;
- // the JVM environment for the task
- JvmEnv env;
- // the Shell executor executing the JVM for this task
- ShellCommandExecutor shExec;
- // process handle of task JVM
- String pid;
- // waiting time before sending SIGKILL to task JVM after sending SIGTERM
- long sleeptimeBeforeSigkill;
+ Task task;
+ ShellCommandExecutor shExec; // the Shell executor executing the JVM for this task.
+
+ // Information used only when this context is used for launching new tasks.
+ JvmEnv env; // the JVM environment for the task.
+
+ // Information used only when this context is used for destroying a task jvm.
+ String pid; // process handle of task JVM.
+ long sleeptimeBeforeSigkill; // waiting time before sending SIGKILL to task JVM after sending SIGTERM
+ }
+
+ static class InitializeJobContext {
+ JobID jobid;
+ File workDir;
+ String user;
}
- /**
- * Method which is called after the job is localized so that task controllers
- * can implement their own job localization logic.
- *
- * @param tip Task of job for which localization happens.
- */
- abstract void initializeJob(JobID jobId);
-
/**
* Sends a graceful terminate signal to taskJVM and it sub-processes.
*
@@ -144,6 +199,5 @@ abstract class TaskController implements Configurable {
*
* @param context task context
*/
-
abstract void killTask(TaskControllerContext context);
}
diff --git src/java/org/apache/hadoop/mapred/TaskLog.java src/java/org/apache/hadoop/mapred/TaskLog.java
index 9baafd8..2091cfc 100644
--- src/java/org/apache/hadoop/mapred/TaskLog.java
+++ src/java/org/apache/hadoop/mapred/TaskLog.java
@@ -39,6 +39,7 @@ import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.LocalFileSystem;
import org.apache.hadoop.fs.FileUtil;
import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.util.ProcessTree;
import org.apache.hadoop.util.Shell;
import org.apache.log4j.Appender;
@@ -55,8 +56,7 @@ public class TaskLog {
LogFactory.getLog(TaskLog.class);
private static final File LOG_DIR =
- new File(System.getProperty("hadoop.log.dir"),
- "userlogs").getAbsoluteFile();
+ new File(getBaseLogDir(), "userlogs").getAbsoluteFile();
static LocalFileSystem localFS = null;
static {
@@ -156,8 +156,12 @@ public class TaskLog {
return new File(getBaseDir(taskid), "log.index");
}
}
-
- private static File getBaseDir(String taskid) {
+
+ static String getBaseLogDir() {
+ return System.getProperty("hadoop.log.dir");
+ }
+
+ static File getBaseDir(String taskid) {
return new File(LOG_DIR, taskid);
}
private static long prevOutLength;
diff --git src/java/org/apache/hadoop/mapred/TaskRunner.java src/java/org/apache/hadoop/mapred/TaskRunner.java
index 86c5868..c80208f 100644
--- src/java/org/apache/hadoop/mapred/TaskRunner.java
+++ src/java/org/apache/hadoop/mapred/TaskRunner.java
@@ -121,213 +121,51 @@ abstract class TaskRunner extends Thread {
TaskAttemptID taskid = t.getTaskID();
LocalDirAllocator lDirAlloc = new LocalDirAllocator("mapred.local.dir");
File workDir = formWorkDir(lDirAlloc, taskid, t.isTaskCleanupTask(), conf);
-
+
URI[] archives = DistributedCache.getCacheArchives(conf);
URI[] files = DistributedCache.getCacheFiles(conf);
+ // We don't create any symlinks yet, so presence/absence of workDir
+ // actually on the file system doesn't matter.
setupDistributedCache(lDirAlloc, workDir, archives, files);
-
- if (!prepare()) {
- return;
- }
- // Accumulates class paths for child.
- List classPaths = new ArrayList();
- // start with same classpath as parent process
- appendSystemClasspaths(classPaths);
+ // Set up the child task's configuration. After this call, no localization
+ // of files should happen in the TaskTracker's process space. Any changes to
+ // the conf object after this will NOT be reflected to the child.
+ setupChildTaskConfiguration(lDirAlloc);
- if (!workDir.mkdirs()) {
- if (!workDir.isDirectory()) {
- LOG.fatal("Mkdirs failed to create " + workDir.toString());
- }
+ if (!prepare()) {
+ return;
}
- // include the user specified classpath
- appendJobJarClasspaths(conf.getJar(), classPaths);
-
- // Distributed cache paths
- appendDistributedCacheClasspaths(conf, archives, files, classPaths);
-
- // Include the working dir too
- classPaths.add(workDir.toString());
-
// Build classpath
-
-
- // Build exec child JVM args.
- Vector vargs = new Vector(8);
- File jvm = // use same jvm as parent
- new File(new File(System.getProperty("java.home"), "bin"), "java");
-
- vargs.add(jvm.toString());
-
- // Add child (task) java-vm options.
- //
- // The following symbols if present in mapred.child.java.opts value are
- // replaced:
- // + @taskid@ is interpolated with value of TaskID.
- // Other occurrences of @ will not be altered.
- //
- // Example with multiple arguments and substitutions, showing
- // jvm GC logging, and start of a passwordless JVM JMX agent so can
- // connect with jconsole and the likes to watch child memory, threads
- // and get thread dumps.
- //
- //
- // mapred.child.java.opts
- // -verbose:gc -Xloggc:/tmp/@taskid@.gc \
- // -Dcom.sun.management.jmxremote.authenticate=false \
- // -Dcom.sun.management.jmxremote.ssl=false \
- //
- //
- //
- String javaOpts = conf.get("mapred.child.java.opts", "-Xmx200m");
- javaOpts = javaOpts.replace("@taskid@", taskid.toString());
- String [] javaOptsSplit = javaOpts.split(" ");
-
- // Add java.library.path; necessary for loading native libraries.
- //
- // 1. To support native-hadoop library i.e. libhadoop.so, we add the
- // parent processes' java.library.path to the child.
- // 2. We also add the 'cwd' of the task to it's java.library.path to help
- // users distribute native libraries via the DistributedCache.
- // 3. The user can also specify extra paths to be added to the
- // java.library.path via mapred.child.java.opts.
- //
- String libraryPath = System.getProperty("java.library.path");
- if (libraryPath == null) {
- libraryPath = workDir.getAbsolutePath();
- } else {
- libraryPath += SYSTEM_PATH_SEPARATOR + workDir;
- }
- boolean hasUserLDPath = false;
- for(int i=0; i classPaths = getClassPaths(conf, workDir, archives, files);
- // Setup the log4j prop
long logSize = TaskLog.getTaskLogLength(conf);
- vargs.add("-Dhadoop.log.dir=" +
- new File(System.getProperty("hadoop.log.dir")
- ).getAbsolutePath());
- vargs.add("-Dhadoop.root.logger=INFO,TLA");
- vargs.add("-Dhadoop.tasklog.taskid=" + taskid);
- vargs.add("-Dhadoop.tasklog.totalLogFileSize=" + logSize);
-
- if (conf.getProfileEnabled()) {
- if (conf.getProfileTaskRange(t.isMapTask()
- ).isIncluded(t.getPartition())) {
- File prof = TaskLog.getTaskLogFile(taskid, TaskLog.LogName.PROFILE);
- vargs.add(String.format(conf.getProfileParams(), prof.toString()));
- }
- }
- // Add main class and its arguments
- vargs.add(Child.class.getName()); // main of Child
- // pass umbilical address
- InetSocketAddress address = tracker.getTaskTrackerReportAddress();
- vargs.add(address.getAddress().getHostAddress());
- vargs.add(Integer.toString(address.getPort()));
- vargs.add(taskid.toString()); // pass task identifier
+ // Build exec child JVM args.
+ Vector vargs =
+ getVMArgs(taskid, workDir, classPaths, logSize);
tracker.addToMemoryManager(t.getTaskID(), t.isMapTask(), conf);
// set memory limit using ulimit if feasible and necessary ...
- String[] ulimitCmd = Shell.getUlimitMemoryCommand(conf);
- List setup = null;
- if (ulimitCmd != null) {
- setup = new ArrayList();
- for (String arg : ulimitCmd) {
- setup.add(arg);
- }
- }
+ List setup = getVMSetupCmd();
// Set up the redirection of the task's stdout and stderr streams
File stdout = TaskLog.getTaskLogFile(taskid, TaskLog.LogName.STDOUT);
File stderr = TaskLog.getTaskLogFile(taskid, TaskLog.LogName.STDERR);
- boolean b = stdout.getParentFile().mkdirs();
+ File logDir = stdout.getParentFile();
+ boolean b = logDir.mkdirs();
if (!b) {
LOG.warn("mkdirs failed. Ignoring");
+ } else {
+ // TODO: This is not needed once MAPREDUCE-AAAA
+ FileUtil.setPermissions(logDir, FileUtil.sevenSevenSeven);
}
tracker.getTaskTrackerInstrumentation().reportTaskLaunch(taskid, stdout, stderr);
Map env = new HashMap();
- StringBuffer ldLibraryPath = new StringBuffer();
- ldLibraryPath.append(workDir.toString());
- String oldLdLibraryPath = null;
- oldLdLibraryPath = System.getenv("LD_LIBRARY_PATH");
- if (oldLdLibraryPath != null) {
- ldLibraryPath.append(SYSTEM_PATH_SEPARATOR);
- ldLibraryPath.append(oldLdLibraryPath);
- }
- env.put("LD_LIBRARY_PATH", ldLibraryPath.toString());
-
- // add the env variables passed by the user
- String mapredChildEnv = conf.get("mapred.child.env");
- if (mapredChildEnv != null && mapredChildEnv.length() > 0) {
- String childEnvs[] = mapredChildEnv.split(",");
- for (String cEnv : childEnvs) {
- try {
- String[] parts = cEnv.split("="); // split on '='
- String value = env.get(parts[0]);
- if (value != null) {
- // replace $env with the child's env constructed by tt's
- // example LD_LIBRARY_PATH=$LD_LIBRARY_PATH:/tmp
- value = parts[1].replace("$" + parts[0], value);
- } else {
- // this key is not configured by the tt for the child .. get it
- // from the tt's env
- // example PATH=$PATH:/tmp
- value = System.getenv(parts[0]);
- if (value != null) {
- // the env key is present in the tt's env
- value = parts[1].replace("$" + parts[0], value);
- } else {
- // the env key is note present anywhere .. simply set it
- // example X=$X:/tmp or X=/tmp
- value = parts[1].replace("$" + parts[0], "");
- }
- }
- env.put(parts[0], value);
- } catch (Throwable t) {
- // set the error msg
- errorInfo = "Invalid User environment settings : " + mapredChildEnv
- + ". Failed to parse user-passed environment param."
- + " Expecting : env1=value1,env2=value2...";
- LOG.warn(errorInfo);
- throw t;
- }
- }
- }
+ errorInfo = getVMEnvironment(errorInfo, workDir, conf, env);
jvmManager.launchJvm(this,
jvmManager.constructJvmEnv(setup,vargs,stdout,stderr,logSize,
@@ -355,7 +193,7 @@ abstract class TaskRunner extends Thread {
LOG.fatal(t.getTaskID()+" reporting FSError", ie);
}
} catch (Throwable throwable) {
- LOG.warn(t.getTaskID() + errorInfo, throwable);
+ LOG.warn(t.getTaskID() + " : " + errorInfo, throwable);
Throwable causeThrowable = new Throwable(errorInfo, throwable);
ByteArrayOutputStream baos = new ByteArrayOutputStream();
causeThrowable.printStackTrace(new PrintStream(baos));
@@ -385,15 +223,309 @@ abstract class TaskRunner extends Thread {
}
}
+ /** Write the child's task configuration file and point the task at it.
+ * @param lDirAlloc allocator used to pick the local path of the task file
+ * @throws IOException if the path cannot be allocated or the file written
+ */
+ void setupChildTaskConfiguration(LocalDirAllocator lDirAlloc)
+ throws IOException {
+
+ Path localTaskFile =
+ lDirAlloc.getLocalPathForWrite(TaskTracker.getTaskConfFile(t
+ .getJobID().toString(), t.getTaskID().toString(), t
+ .isTaskCleanupTask()), conf);
+
+ // setup the child's mapred-local-dir. This is done just before writing the
+ // task file to the disk.
+ setupChildMapredLocalDirs(t, conf);
+
+ // write the child's task configuration file to the local disk
+ writeLocalTaskFile(localTaskFile.toString(), conf);
+
+ // Set the final job file in the task. The child needs to know the correct
+ // path to job.xml. So set this path accordingly.
+ LOG.info("Task configuration file for " + t.getTaskID() + " : " + localTaskFile);
+ t.setJobFile(localTaskFile.toString());
+ }
+
+ /**
+ * @return the ulimit memory command as a list of arguments, or null if none
+ */
+ private List getVMSetupCmd() {
+ String[] ulimitCmd = Shell.getUlimitMemoryCommand(conf);
+ List setup = null;
+ if (ulimitCmd != null) {
+ setup = new ArrayList();
+ for (String arg : ulimitCmd) {
+ setup.add(arg);
+ }
+ }
+ return setup;
+ }
+
+ /**
+ * @param taskid
+ * @param workDir
+ * @param classPaths
+ * @param logSize
+ * @return
+ * @throws IOException
+ */
+ private Vector getVMArgs(TaskAttemptID taskid, File workDir,
+ List classPaths, long logSize)
+ throws IOException {
+ Vector vargs = new Vector(8);
+ File jvm = // use same jvm as parent
+ new File(new File(System.getProperty("java.home"), "bin"), "java");
+
+ vargs.add(jvm.toString());
+
+ // Add child (task) java-vm options.
+ //
+ // The following symbols if present in mapred.child.java.opts value are
+ // replaced:
+ // + @taskid@ is interpolated with value of TaskID.
+ // Other occurrences of @ will not be altered.
+ //
+ // Example with multiple arguments and substitutions, showing
+ // jvm GC logging, and start of a passwordless JVM JMX agent so can
+ // connect with jconsole and the likes to watch child memory, threads
+ // and get thread dumps.
+ //
+ //
+ // mapred.child.java.opts
+ // -verbose:gc -Xloggc:/tmp/@taskid@.gc \
+ // -Dcom.sun.management.jmxremote.authenticate=false \
+ // -Dcom.sun.management.jmxremote.ssl=false \
+ //
+ //
+ //
+ String javaOpts = conf.get("mapred.child.java.opts", "-Xmx200m");
+ javaOpts = javaOpts.replace("@taskid@", taskid.toString());
+ String [] javaOptsSplit = javaOpts.split(" ");
+
+ // Add java.library.path; necessary for loading native libraries.
+ //
+ // 1. To support native-hadoop library i.e. libhadoop.so, we add the
+ // parent processes' java.library.path to the child.
+ // 2. We also add the 'cwd' of the task to it's java.library.path to help
+ // users distribute native libraries via the DistributedCache.
+ // 3. The user can also specify extra paths to be added to the
+ // java.library.path via mapred.child.java.opts.
+ //
+ String libraryPath = System.getProperty("java.library.path");
+ if (libraryPath == null) {
+ libraryPath = workDir.getAbsolutePath();
+ } else {
+ libraryPath += SYSTEM_PATH_SEPARATOR + workDir;
+ }
+ boolean hasUserLDPath = false;
+ for(int i=0; i getClassPaths(JobConf conf, File workDir,
+ URI[] archives, URI[] files)
+ throws IOException {
+ // Accumulates class paths for child.
+ List classPaths = new ArrayList();
+ // start with same classpath as parent process
+ appendSystemClasspaths(classPaths);
+
+ // include the user specified classpath
+ appendJobJarClasspaths(conf.getJar(), classPaths);
+
+ // Distributed cache paths
+ appendDistributedCacheClasspaths(conf, archives, files, classPaths);
+
+ // Include the working dir too
+ classPaths.add(workDir.toString());
+ return classPaths;
+ }
+
+ /** Fill env with the environment variables for the child JVM.
+ * @param errorInfo message handed back unchanged on success
+ * @param workDir task working directory; placed first on LD_LIBRARY_PATH
+ * @param conf job configuration; mapred.child.env carries the user settings
+ * @param env map that receives the child's environment
+ * @return errorInfo, exactly as passed in
+ * @throws Throwable whatever parsing a mapred.child.env entry threw
+ */
+ private static String getVMEnvironment(String errorInfo, File workDir, JobConf conf,
+ Map env)
+ throws Throwable {
+ StringBuffer ldLibraryPath = new StringBuffer();
+ ldLibraryPath.append(workDir.toString());
+ String oldLdLibraryPath = System.getenv("LD_LIBRARY_PATH");
+ if (oldLdLibraryPath != null) {
+ ldLibraryPath.append(SYSTEM_PATH_SEPARATOR);
+ ldLibraryPath.append(oldLdLibraryPath);
+ }
+ env.put("LD_LIBRARY_PATH", ldLibraryPath.toString());
+
+ // add the env variables passed by the user
+ String mapredChildEnv = conf.get("mapred.child.env");
+ if (mapredChildEnv != null && mapredChildEnv.length() > 0) {
+ String childEnvs[] = mapredChildEnv.split(",");
+ for (String cEnv : childEnvs) {
+ try {
+ String[] parts = cEnv.split("=", 2); // first '=' only: values may contain '='
+ String value = env.get(parts[0]);
+ if (value != null) {
+ // replace $env with the child's env constructed by tt's
+ // example LD_LIBRARY_PATH=$LD_LIBRARY_PATH:/tmp
+ value = parts[1].replace("$" + parts[0], value);
+ } else {
+ // this key is not configured by the tt for the child .. get it
+ // from the tt's env
+ // example PATH=$PATH:/tmp
+ value = System.getenv(parts[0]);
+ if (value != null) {
+ // the env key is present in the tt's env
+ value = parts[1].replace("$" + parts[0], value);
+ } else {
+ // the env key is not present anywhere .. simply set it
+ // example X=$X:/tmp or X=/tmp
+ value = parts[1].replace("$" + parts[0], "");
+ }
+ }
+ env.put(parts[0], value);
+ } catch (Throwable t) {
+ // set the error msg (only logged here: the rethrow skips the return)
+ errorInfo = "Invalid User environment settings : " + mapredChildEnv
+ + ". Failed to parse user-passed environment param."
+ + " Expecting : env1=value1,env2=value2...";
+ LOG.warn(errorInfo);
+ throw t;
+ }
+ }
+ }
+ return errorInfo;
+ }
+
+ /**
+ * Write the task specific job-configuration file, replacing any old copy.
+ * @param jobFile path of the task file on the local file system
+ * @param conf configuration to serialize as XML into jobFile
+ * @throws IOException
+ */
+ private static void writeLocalTaskFile(String jobFile, JobConf conf)
+ throws IOException {
+ Path localTaskFile = new Path(jobFile);
+ FileSystem localFs = FileSystem.getLocal(conf);
+ localFs.delete(localTaskFile, true);
+ OutputStream out = localFs.create(localTaskFile);
+ try {
+ conf.writeXml(out);
+ } finally {
+ out.close();
+ }
+ }
+
+ /**
+ * Point mapred.local.dir in conf at the attempt directory on every configured
+ * local dir (comma separated). This sand-boxes the child: a LocalDirAllocator
+ * used inside the child will only see files inside the attempt-directory.
+ */
+ static void setupChildMapredLocalDirs(Task t, JobConf conf) {
+ String[] localDirs = conf.getStrings("mapred.local.dir");
+ String jobId = t.getJobID().toString();
+ String taskId = t.getTaskID().toString();
+ boolean isCleanup = t.isTaskCleanupTask();
+ // TODO: check localDirs; a null or empty array fails at localDirs[0] below
+ String childMapredLocalDir =
+ localDirs[0] + Path.SEPARATOR
+ + TaskTracker.getLocalTaskDir(jobId, taskId, isCleanup);
+ for (int i = 1; i < localDirs.length; i++) {
+ childMapredLocalDir +=
+ "," + localDirs[i] + Path.SEPARATOR
+ + TaskTracker.getLocalTaskDir(jobId, taskId, isCleanup);
+ }
+ LOG.info("mapred.local.dir for child : " + childMapredLocalDir);
+ conf.set("mapred.local.dir", childMapredLocalDir);
+ }
+
/** Creates the working directory pathname for a task attempt. */
static File formWorkDir(LocalDirAllocator lDirAlloc,
TaskAttemptID task, boolean isCleanup, JobConf conf)
throws IOException {
- File workDir = new File(lDirAlloc.getLocalPathToRead(
- TaskTracker.getLocalTaskDir(task.getJobID().toString(),
- task.toString(), isCleanup)
- + Path.SEPARATOR + MRConstants.WORKDIR, conf).toString());
- return workDir;
+ Path workDir =
+ lDirAlloc.getLocalPathToRead(TaskTracker.getTaskWorkDir(task
+ .getJobID().toString(), task.toString(), isCleanup), conf);
+
+ return new File(workDir.toString());
}
private void setupDistributedCache(LocalDirAllocator lDirAlloc, File workDir,
@@ -412,7 +544,7 @@ abstract class TaskRunner extends Thread {
fileStatus = fileSystem.getFileStatus(
new Path(archives[i].getPath()));
String cacheId = DistributedCache.makeRelative(archives[i],conf);
- String cachePath = TaskTracker.getCacheSubdir() +
+ String cachePath = TaskTracker.getDistributedCacheDir() +
Path.SEPARATOR + cacheId;
localPath = lDirAlloc.getLocalPathForWrite(cachePath,
@@ -438,7 +570,7 @@ abstract class TaskRunner extends Thread {
fileStatus = fileSystem.getFileStatus(
new Path(files[i].getPath()));
String cacheId = DistributedCache.makeRelative(files[i], conf);
- String cachePath = TaskTracker.getCacheSubdir() +
+ String cachePath = TaskTracker.getDistributedCacheDir() +
Path.SEPARATOR + cacheId;
localPath = lDirAlloc.getLocalPathForWrite(cachePath,
@@ -455,20 +587,12 @@ abstract class TaskRunner extends Thread {
}
DistributedCache.setLocalFiles(conf, stringifyPathArray(p));
}
- Path localTaskFile = new Path(t.getJobFile());
- FileSystem localFs = FileSystem.getLocal(conf);
- localFs.delete(localTaskFile, true);
- OutputStream out = localFs.create(localTaskFile);
- try {
- conf.writeXml(out);
- } finally {
- out.close();
- }
}
}
- private void appendDistributedCacheClasspaths(JobConf conf, URI[] archives,
- URI[] files, List classPaths) throws IOException {
+ private static void appendDistributedCacheClasspaths(JobConf conf,
+ URI[] archives, URI[] files, List classPaths)
+ throws IOException {
// Archive paths
Path[] archiveClasspaths = DistributedCache.getArchiveClassPaths(conf);
if (archiveClasspaths != null && archives != null) {
@@ -503,8 +627,9 @@ abstract class TaskRunner extends Thread {
}
}
- private void appendSystemClasspaths(List classPaths) {
- for (String c : System.getProperty("java.class.path").split(SYSTEM_PATH_SEPARATOR)) {
+ private static void appendSystemClasspaths(List classPaths) {
+ for (String c : System.getProperty("java.class.path").split(
+ SYSTEM_PATH_SEPARATOR)) {
classPaths.add(c);
}
}
@@ -539,6 +664,7 @@ abstract class TaskRunner extends Thread {
//cache, we didn't create the symlinks. This is done on a per task basis
//by the currently executing task.
public static void setupWorkDir(JobConf conf) throws IOException {
+ // TODO: why do it for the first task
File workDir = new File(".").getAbsoluteFile();
FileUtil.fullyDelete(workDir);
if (DistributedCache.getSymlink(conf)) {
@@ -586,19 +712,8 @@ abstract class TaskRunner extends Thread {
// Do not exit even if symlinks have not been created.
LOG.warn(StringUtils.stringifyException(ie));
}
- // add java.io.tmpdir given by mapred.child.tmp
- String tmp = conf.get("mapred.child.tmp", "./tmp");
- Path tmpDir = new Path(tmp);
- // if temp directory path is not absolute
- // prepend it with workDir.
- if (!tmpDir.isAbsolute()) {
- tmpDir = new Path(workDir.toString(), tmp);
- FileSystem localFs = FileSystem.getLocal(conf);
- if (!localFs.mkdirs(tmpDir) && !localFs.getFileStatus(tmpDir).isDir()){
- throw new IOException("Mkdirs failed to create " + tmpDir.toString());
- }
- }
+ createChildTmpDir(workDir, conf);
}
/**
diff --git src/java/org/apache/hadoop/mapred/TaskTracker.java src/java/org/apache/hadoop/mapred/TaskTracker.java
index a044326..cad6b17 100644
--- src/java/org/apache/hadoop/mapred/TaskTracker.java
+++ src/java/org/apache/hadoop/mapred/TaskTracker.java
@@ -65,6 +65,7 @@ import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.ipc.RPC;
import org.apache.hadoop.ipc.RemoteException;
import org.apache.hadoop.ipc.Server;
+import org.apache.hadoop.mapred.TaskController.InitializeJobContext;
import org.apache.hadoop.mapred.TaskStatus.Phase;
import org.apache.hadoop.mapred.TaskTrackerStatus.TaskTrackerHealthStatus;
import org.apache.hadoop.mapred.pipes.Submitter;
@@ -185,10 +186,16 @@ public class TaskTracker
//for serving map output to the other nodes
static Random r = new Random();
- private static final String SUBDIR = "taskTracker";
- private static final String CACHEDIR = "archive";
- private static final String JOBCACHE = "jobcache";
- private static final String OUTPUT = "output";
+ static final String SUBDIR = "taskTracker";
+ private static final String DISTCACHEDIR = "distcache";
+ static final String JOBCACHE = "jobcache";
+ static final String OUTPUT = "output";
+ private static final String JARSDIR = "jars";
+ static final String LOCAL_SPLIT_FILE = "split.dta";
+ static final String JOBFILE = "job.xml";
+
+ static final String JOB_LOCAL_DIR = "job.local.dir";
+
private JobConf fConf;
private FileSystem localFs;
private int maxMapSlots;
@@ -376,25 +383,49 @@ public class TaskTracker
}
}
- static String getCacheSubdir() {
- return TaskTracker.SUBDIR + Path.SEPARATOR + TaskTracker.CACHEDIR;
+ static String getDistributedCacheDir() {
+ return TaskTracker.SUBDIR + Path.SEPARATOR + TaskTracker.DISTCACHEDIR;
}
static String getJobCacheSubdir() {
return TaskTracker.SUBDIR + Path.SEPARATOR + TaskTracker.JOBCACHE;
}
-
+
static String getLocalJobDir(String jobid) {
- return getJobCacheSubdir() + Path.SEPARATOR + jobid;
+ return getJobCacheSubdir() + Path.SEPARATOR + jobid;
}
- static String getLocalTaskDir(String jobid, String taskid) {
- return getLocalTaskDir(jobid, taskid, false) ;
+ static String getLocalJobConfFile(String jobid) {
+ return getLocalJobDir(jobid) + Path.SEPARATOR + TaskTracker.JOBFILE;
+ }
+
+ static String getTaskConfFile(String jobid, String taskid,
+ boolean isCleanupAttempt) {
+ return getLocalTaskDir(jobid, taskid, isCleanupAttempt) + Path.SEPARATOR
+ + TaskTracker.JOBFILE;
+ }
+
+ static String getJobJarFile(String jobid) {
+ return getLocalJobDir(jobid) + Path.SEPARATOR + TaskTracker.JARSDIR
+ + Path.SEPARATOR + "job.jar";
+ }
+
+ static String getJobWorkDir(String jobid) {
+ return getLocalJobDir(jobid) + Path.SEPARATOR + MRConstants.WORKDIR;
+ }
+
+ static String getLocalSplitFile(String jobid, String taskid) {
+ return TaskTracker.getLocalTaskDir(jobid, taskid) + Path.SEPARATOR
+ + TaskTracker.LOCAL_SPLIT_FILE;
}
static String getIntermediateOutputDir(String jobid, String taskid) {
- return getLocalTaskDir(jobid, taskid)
- + Path.SEPARATOR + TaskTracker.OUTPUT ;
+ return getLocalTaskDir(jobid, taskid) + Path.SEPARATOR
+ + TaskTracker.OUTPUT;
+ }
+
+ static String getLocalTaskDir(String jobid, String taskid) {
+ return getLocalTaskDir(jobid, taskid, false);
}
static String getLocalTaskDir(String jobid,
@@ -406,7 +437,17 @@ public class TaskTracker
}
return taskDir;
}
-
+
+ static String getTaskWorkDir(String jobid, String taskid,
+ boolean isCleanupAttempt) {
+ String dir =
+ getLocalJobDir(jobid) + Path.SEPARATOR + taskid;
+ if (isCleanupAttempt) {
+ dir = dir + TASK_CLEANUP_SUFFIX;
+ }
+ return dir + Path.SEPARATOR + MRConstants.WORKDIR;
+ }
+
String getPid(TaskAttemptID tid) {
TaskInProgress tip = tasks.get(tid);
if (tip != null) {
@@ -748,10 +789,133 @@ public class TaskTracker
// intialize the job directory
private void localizeJob(TaskInProgress tip) throws IOException {
- Path localJarFile = null;
Task t = tip.getTask();
JobID jobId = t.getJobID();
- Path jobFile = new Path(t.getJobFile());
+ RunningJob rjob = addTaskToJob(jobId, tip);
+
+ synchronized (rjob) {
+ if (!rjob.localized) {
+
+ JobConf localJobConf = doJobLocalization(t);
+
+ // Now initialize the job via task-controller so as to set
+ // ownership/permissions of jars, job-work-dir
+ InitializeJobContext context = new InitializeJobContext();
+ context.jobid = jobId;
+ context.user = localJobConf.getUser();
+ context.workDir = new File(localJobConf.get(JOB_LOCAL_DIR)); // TODO: checks
+ taskController.initializeJob(context);
+
+ rjob.jobConf = localJobConf;
+ rjob.keepJobFiles = ((localJobConf.getKeepTaskFilesPattern() != null) ||
+ localJobConf.getKeepFailedTaskFiles());
+ rjob.localized = true;
+ }
+ }
+ launchTaskForJob(tip, new JobConf(rjob.jobConf));
+ }
+
+ /**
+ * Localize the job on this tasktracker. Specifically
+ *
+ * - Cleanup and create job directories on all disks
+ * - Download the job config file job.xml from the DFS
+ * - Create the job work directory and set {@link TaskTracker#JOB_LOCAL_DIR}
+ * in the configuration.
+ * - Download the job jar file job.jar from the DFS, unjar it and set jar
+ * file in the configuration.
+ *
+ *
+ * @param t task whose job has to be localized on this TT
+ * @return the modified job configuration to be used for all the tasks of this
+ * job as a starting point.
+ * @throws IOException if a directory cannot be created or a download fails
+ */
+ JobConf doJobLocalization(Task t)
+ throws IOException {
+ JobID jobId = t.getJobID();
+
+ // Initialize the job directories first
+ FileSystem localFs = FileSystem.getLocal(fConf);
+ initializeJobDirs(jobId, localFs, fConf.getStrings("mapred.local.dir"));
+
+ // Download the job.xml for this job from the system FS
+ Path localJobFile = localizeJobConfFile(new Path(t.getJobFile()), jobId);
+
+ // TODO: Fix this. TT config is not loaded at all!
+ JobConf localJobConf = new JobConf(localJobFile);
+
+ // create the 'job-work' directory: job-specific shared directory for use as
+ // scratch space by all tasks of the same job running on this TaskTracker.
+ Path workDir =
+ lDirAlloc.getLocalPathForWrite(getJobWorkDir(jobId.toString()),
+ fConf);
+ if (!localFs.mkdirs(workDir)) {
+ throw new IOException("Mkdirs failed to create "
+ + workDir.toString());
+ }
+ System.setProperty(JOB_LOCAL_DIR, workDir.toUri().getPath());
+ localJobConf.set(JOB_LOCAL_DIR, workDir.toUri().getPath());
+
+ // Download the job.jar for this job from the system FS
+ localizeJobJarFile(jobId, localFs, localJobConf);
+ return localJobConf;
+ }
+
+
+ /**
+ * Prepare the job directories for a given job. To be called by the job
+ * localization code, only if the job is not already localized.
+ *
+ * @param jobId job whose directory is (re)created under every local dir
+ * @param fs file system used to delete and create the directories
+ * @param localDirs local directories in which the job directory is created
+ * @throws IOException if the job directory could not be created in any of them
+ */
+ private static void initializeJobDirs(JobID jobId, FileSystem fs,
+ String[] localDirs)
+ throws IOException {
+ boolean initJobDirStatus = false;
+ String jobDirPath = getLocalJobDir(jobId.toString());
+ for (String localDir : localDirs) {
+ Path jobDir = new Path(localDir, jobDirPath);
+ if (fs.exists(jobDir)) {
+ // this will happen on a partial execution of localizeJob. Sometimes
+ // copying job.xml to the local disk succeeds but copying job.jar might
+ // throw out an exception. We should clean up and then try again.
+ fs.delete(jobDir, true);
+ }
+
+ boolean jobDirStatus = fs.mkdirs(jobDir);
+ if (!jobDirStatus) {
+ LOG.warn("Not able to create job directory " + jobDir.toString());
+ }
+
+ initJobDirStatus = initJobDirStatus || jobDirStatus;
+
+ // TODO: fix return status for the following.
+ // TODO: is this really needed? Try to fix/use RawLocalFileSystem.mkdirs
+ // job-dir has to be private to the TT
+ FileUtil.setPermissions(new File(jobDir.toUri().getPath()),
+ FileUtil.sevenZeroZero);
+ }
+
+ if (!initJobDirStatus) {
+ throw new IOException("Not able to initialize job directories "
+ + "in any of the configured local directories for job "
+ + jobId.toString());
+ }
+ }
+
+ /**
+ * Download the job configuration file from the DFS.
+ *
+ * @param jobFile path of the job configuration file to be downloaded
+ * @param jobId jobid of the task
+ * @return the local file system path of the downloaded file.
+ * @throws IOException
+ */
+ private Path localizeJobConfFile(Path jobFile, JobID jobId)
+ throws IOException {
// Get sizes of JobFile and JarFile
// sizes are -1 if they are not present.
FileStatus status = null;
@@ -762,82 +926,102 @@ public class TaskTracker
} catch(FileNotFoundException fe) {
jobFileSize = -1;
}
- Path localJobFile = lDirAlloc.getLocalPathForWrite(
- getLocalJobDir(jobId.toString())
- + Path.SEPARATOR + "job.xml",
- jobFileSize, fConf);
- RunningJob rjob = addTaskToJob(jobId, tip);
- synchronized (rjob) {
- if (!rjob.localized) {
-
- FileSystem localFs = FileSystem.getLocal(fConf);
- // this will happen on a partial execution of localizeJob.
- // Sometimes the job.xml gets copied but copying job.jar
- // might throw out an exception
- // we should clean up and then try again
- Path jobDir = localJobFile.getParent();
- if (localFs.exists(jobDir)){
- localFs.delete(jobDir, true);
- boolean b = localFs.mkdirs(jobDir);
- if (!b)
- throw new IOException("Not able to create job directory "
- + jobDir.toString());
- }
- systemFS.copyToLocalFile(jobFile, localJobFile);
- JobConf localJobConf = new JobConf(localJobFile);
-
- // create the 'work' directory
- // job-specific shared directory for use as scratch space
- Path workDir = lDirAlloc.getLocalPathForWrite(
- (getLocalJobDir(jobId.toString())
- + Path.SEPARATOR + MRConstants.WORKDIR), fConf);
- if (!localFs.mkdirs(workDir)) {
- throw new IOException("Mkdirs failed to create "
- + workDir.toString());
- }
- System.setProperty("job.local.dir", workDir.toString());
- localJobConf.set("job.local.dir", workDir.toString());
-
- // copy Jar file to the local FS and unjar it.
- String jarFile = localJobConf.getJar();
- long jarFileSize = -1;
- if (jarFile != null) {
- Path jarFilePath = new Path(jarFile);
- try {
- status = systemFS.getFileStatus(jarFilePath);
- jarFileSize = status.getLen();
- } catch(FileNotFoundException fe) {
- jarFileSize = -1;
- }
- // Here we check for and we check five times the size of jarFileSize
- // to accommodate for unjarring the jar file in work directory
- localJarFile = new Path(lDirAlloc.getLocalPathForWrite(
- getLocalJobDir(jobId.toString())
- + Path.SEPARATOR + "jars",
- 5 * jarFileSize, fConf), "job.jar");
- if (!localFs.mkdirs(localJarFile.getParent())) {
- throw new IOException("Mkdirs failed to create jars directory ");
- }
- systemFS.copyToLocalFile(jarFilePath, localJarFile);
- localJobConf.setJar(localJarFile.toString());
- OutputStream out = localFs.create(localJobFile);
- try {
- localJobConf.writeXml(out);
- } finally {
- out.close();
- }
- // also unjar the job.jar files
- RunJar.unJar(new File(localJarFile.toString()),
- new File(localJarFile.getParent().toString()));
- }
- rjob.keepJobFiles = ((localJobConf.getKeepTaskFilesPattern() != null) ||
- localJobConf.getKeepFailedTaskFiles());
- rjob.localized = true;
- rjob.jobConf = localJobConf;
- taskController.initializeJob(jobId);
+
+ Path localJobFile =
+ lDirAlloc.getLocalPathForWrite(getLocalJobConfFile(jobId.toString()),
+ jobFileSize, fConf);
+
+ // Download job.xml
+ systemFS.copyToLocalFile(jobFile, localJobFile);
+ return localJobFile;
+ }
+
+ /**
+ * Download the job jar file from DFS to the local file system and unjar it.
+ * Set the local jar file in the passed configuration.
+ *
+ * @param jobId job whose jar is localized
+ * @param localFs local file system; currently unused in this method
+ * @param localJobConf configuration naming the jar; updated with its local path
+ * @throws IOException if the jar cannot be downloaded or un-jarred
+ */
+ private void localizeJobJarFile(JobID jobId, FileSystem localFs,
+ JobConf localJobConf)
+ throws IOException {
+ // copy Jar file to the local FS and unjar it.
+ String jarFile = localJobConf.getJar();
+ FileStatus status = null;
+ long jarFileSize = -1;
+ if (jarFile != null) {
+ Path jarFilePath = new Path(jarFile);
+ try {
+ status = systemFS.getFileStatus(jarFilePath);
+ jarFileSize = status.getLen();
+ } catch (FileNotFoundException fe) {
+ jarFileSize = -1;
}
+ // Ask the allocator for five times jarFileSize so that there is room to
+ // un-jar the jar file beside it (see RunJar.unJar below)
+ Path localJarFile =
+ lDirAlloc.getLocalPathForWrite(getJobJarFile(jobId.toString()),
+ 5 * jarFileSize, fConf);
+
+ // Download job.jar
+ systemFS.copyToLocalFile(jarFilePath, localJarFile);
+
+ localJobConf.setJar(localJarFile.toString());
+
+ // Also un-jar the job.jar files. We un-jar it so that classes inside
+ // sub-directories, for e.g., lib/, classes/ are available on class-path
+ RunJar.unJar(new File(localJarFile.toString()), new File(localJarFile
+ .getParent().toString()));
+ }
+ }
+
+ /**
+ * create taskDirs on all the disks. Otherwise, in some cases, like when
+ * LinuxTaskController is in use, child might wish to balance load across
+ * disks but cannot itself create attempt directory because of the fact that
+ * job directory is writable only by the TT.
+ *
+ * @param jobId job to which the attempt belongs
+ * @param attemptId attempt whose directories are created
+ * @param isCleanupAttempt whether this is a cleanup attempt
+ * @param fs file system used to create the directories
+ * @param localDirs local directories in which the attempt directory is created
+ * @throws IOException if the attempt directory could not be created in any of them
+ */
+ private static void initializeAttemptDirs(String jobId, String attemptId,
+ boolean isCleanupAttempt, FileSystem fs, String[] localDirs)
+ throws IOException {
+
+ boolean initStatus = false;
+ String attemptDirPath =
+ getLocalTaskDir(jobId, attemptId, isCleanupAttempt);
+
+ for (String localDir : localDirs) {
+
+ Path localAttemptDir = new Path(localDir, attemptDirPath);
+
+ boolean attemptDirStatus = fs.mkdirs(localAttemptDir);
+ if (!attemptDirStatus) {
+ LOG.warn("localAttemptDir " + localAttemptDir.toString()
+ + " couldn't be created.");
+ }
+
+ // job dir is not private, set private permissions for attempt-dir
+ FileUtil.setPermissions(new File(localAttemptDir.toUri().getPath()),
+ FileUtil.sevenZeroZero);
+ // TODO: fix return code of the above.
+
+ initStatus = initStatus || attemptDirStatus;
+ }
+
+ if (!initStatus) {
+ throw new IOException("Not able to initialize attempt directories "
+ + "in any of the configured local directories for the attempt "
+ + attemptId.toString());
}
- launchTaskForJob(tip, new JobConf(rjob.jobConf));
}
private void launchTaskForJob(TaskInProgress tip, JobConf jobConf) throws IOException{
@@ -876,7 +1060,7 @@ public class TaskTracker
for (TaskInProgress tip : tasksToClose.values()) {
tip.jobHasFinished(false);
}
-
+
this.running = false;
// Clear local storage
@@ -915,6 +1099,17 @@ public class TaskTracker
}
/**
+ * For testing
+ */
+ TaskTracker() {
+ server = null;
+ }
+
+ void setConf(JobConf conf) {
+ fConf = conf;
+ }
+
+ /**
* Start with the local machine name, and the default JobTracker
*/
public TaskTracker(JobConf conf) throws IOException {
@@ -1557,7 +1752,7 @@ public class TaskTracker
mapOutputFile.setJobId(taskId.getJobID());
mapOutputFile.setConf(conf);
- Path tmp_output = mapOutputFile.getOutputFile(taskId);
+ Path tmp_output = mapOutputFile.getOutputFile();
if(tmp_output == null)
return 0;
FileSystem localFS = FileSystem.getLocal(conf);
@@ -1833,54 +2028,36 @@ public class TaskTracker
taskTimeout = (10 * 60 * 1000);
}
- private void localizeTask(Task task) throws IOException{
+ void localizeTask(Task task) throws IOException{
- Path localTaskDir =
- lDirAlloc.getLocalPathForWrite(
- TaskTracker.getLocalTaskDir(task.getJobID().toString(),
- task.getTaskID().toString(), task.isTaskCleanupTask()),
- defaultJobConf );
-
FileSystem localFs = FileSystem.getLocal(fConf);
- if (!localFs.mkdirs(localTaskDir)) {
- throw new IOException("Mkdirs failed to create "
- + localTaskDir.toString());
- }
-
- // create symlink for ../work if it already doesnt exist
- String workDir = lDirAlloc.getLocalPathToRead(
- TaskTracker.getLocalJobDir(task.getJobID().toString())
- + Path.SEPARATOR
- + "work", defaultJobConf).toString();
- String link = localTaskDir.getParent().toString()
- + Path.SEPARATOR + "work";
- File flink = new File(link);
- if (!flink.exists())
- FileUtil.symLink(workDir, link);
-
+
+ // create taskDirs on all the disks.
+ initializeAttemptDirs(task.getJobID().toString(), task.getTaskID()
+ .toString(), task.isTaskCleanupTask(), localFs, fConf
+ .getStrings("mapred.local.dir"));
+
// create the working-directory of the task
- Path cwd = lDirAlloc.getLocalPathForWrite(
- getLocalTaskDir(task.getJobID().toString(),
- task.getTaskID().toString(), task.isTaskCleanupTask())
- + Path.SEPARATOR + MRConstants.WORKDIR,
- defaultJobConf);
+ Path cwd =
+ lDirAlloc.getLocalPathForWrite(getTaskWorkDir(task.getJobID()
+ .toString(), task.getTaskID().toString(), task
+ .isTaskCleanupTask()), defaultJobConf);
if (!localFs.mkdirs(cwd)) {
throw new IOException("Mkdirs failed to create "
+ cwd.toString());
}
- Path localTaskFile = new Path(localTaskDir, "job.xml");
- task.setJobFile(localTaskFile.toString());
localJobConf.set("mapred.local.dir",
fConf.get("mapred.local.dir"));
+
if (fConf.get("slave.host.name") != null) {
localJobConf.set("slave.host.name",
fConf.get("slave.host.name"));
}
- localJobConf.set("mapred.task.id", task.getTaskID().toString());
keepFailedTaskFiles = localJobConf.getKeepFailedTaskFiles();
+ // Do the task-type specific localization
task.localizeConfiguration(localJobConf);
List staticResolutions = NetUtils.getAllStaticResolutions();
@@ -1913,12 +2090,6 @@ public class TaskTracker
//disable jvm reuse
localJobConf.setNumTasksToExecutePerJvm(1);
}
- OutputStream out = localFs.create(localTaskFile);
- try {
- localJobConf.writeXml(out);
- } finally {
- out.close();
- }
task.setConf(localJobConf);
}
@@ -2165,16 +2336,13 @@ public class TaskTracker
}
File workDir = null;
try {
- workDir = new File(lDirAlloc.getLocalPathToRead(
- TaskTracker.getLocalTaskDir(
- task.getJobID().toString(),
- task.getTaskID().toString(),
- task.isTaskCleanupTask())
- + Path.SEPARATOR + MRConstants.WORKDIR,
- localJobConf). toString());
+ // localJobConf is already sandboxed.
+ workDir =
+ new File(lDirAlloc.getLocalPathToRead(MRConstants.WORKDIR,
+ localJobConf).toString());
} catch (IOException e) {
LOG.warn("Working Directory of the task " + task.getTaskID() +
- "doesnt exist. Caught exception " +
+ " doesnt exist. Caught exception " +
StringUtils.stringifyException(e));
}
// Build the command
@@ -2449,34 +2617,42 @@ public class TaskTracker
if (localJobConf == null) {
return;
}
- String taskDir = getLocalTaskDir(task.getJobID().toString(),
- taskId.toString(), task.isTaskCleanupTask());
+ String localTaskDir =
+ getLocalTaskDir(task.getJobID().toString(), taskId.toString(),
+ task.isTaskCleanupTask());
+ String taskWorkDir =
+ getTaskWorkDir(task.getJobID().toString(), taskId.toString(),
+ task.isTaskCleanupTask());
if (needCleanup) {
if (runner != null) {
//cleans up the output directory of the task (where map outputs
//and reduce inputs get stored)
runner.close();
}
- //We don't delete the workdir
- //since some other task (running in the same JVM)
- //might be using the dir. The JVM running the tasks would clean
- //the workdir per a task in the task process itself.
+
if (localJobConf.getNumTasksToExecutePerJvm() == 1) {
+ // No jvm reuse, remove everything
directoryCleanupThread.addToQueue(localFs,
getLocalFiles(defaultJobConf,
- taskDir));
+ localTaskDir));
}
-
else {
- directoryCleanupThread.addToQueue(localFs,
- getLocalFiles(defaultJobConf,
- taskDir+"/job.xml"));
+ // Jvm reuse. We don't delete the workdir since some other task
+ // (running in the same JVM) might be using the dir. The JVM
+ // running the tasks would clean the workdir per a task in the
+ // task process itself.
+ directoryCleanupThread.addToQueue(localFs, getLocalFiles(
+ defaultJobConf, localTaskDir + Path.SEPARATOR
+ + TaskTracker.JOBFILE));
+ // TODO: fix: "/taskjvm.sh" is hard-coded; use a constant like JOBFILE above.
+ directoryCleanupThread.addToQueue(localFs, getLocalFiles(
+ defaultJobConf, localTaskDir + "/taskjvm.sh"));
}
} else {
if (localJobConf.getNumTasksToExecutePerJvm() == 1) {
directoryCleanupThread.addToQueue(localFs,
getLocalFiles(defaultJobConf,
- taskDir+"/work"));
+ taskWorkDir));
}
}
} catch (Throwable ie) {
diff --git src/test/mapred/org/apache/hadoop/mapred/ClusterWithLinuxTaskController.java src/test/mapred/org/apache/hadoop/mapred/ClusterWithLinuxTaskController.java
index 521c758..07f84b2 100644
--- src/test/mapred/org/apache/hadoop/mapred/ClusterWithLinuxTaskController.java
+++ src/test/mapred/org/apache/hadoop/mapred/ClusterWithLinuxTaskController.java
@@ -97,7 +97,7 @@ public class ClusterWithLinuxTaskController extends TestCase {
MyLinuxTaskController.class.getName());
mrCluster =
new MiniMRCluster(NUMBER_OF_NODES, dfsCluster.getFileSystem().getUri()
- .toString(), 1, null, null, conf);
+ .toString(), 4, null, null, conf);
// Get the configured taskcontroller-path
String path = System.getProperty("taskcontroller-path");
@@ -143,8 +143,18 @@ public class ClusterWithLinuxTaskController extends TestCase {
PrintWriter writer =
new PrintWriter(new FileOutputStream(configurationFile));
- writer.println(String.format("mapred.local.dir=%s", mrCluster
- .getTaskTrackerLocalDir(0)));
+ StringBuffer sb = new StringBuffer();
+ String[] localDirs = mrCluster.getTaskTrackerRunner(0).getLocalDirs();
+ for(int i = 0 ; i < localDirs.length; i++) {
+ sb.append(localDirs[i]);
+ if((i + 1) != localDirs.length) {
+ sb.append(",");
+ }
+ }
+ writer.println(String.format("mapred.local.dir=%s", sb.toString()));
+
+ writer
+ .println(String.format("hadoop.log.dir=%s", TaskLog.getBaseLogDir()));
writer.flush();
writer.close();
@@ -156,7 +166,13 @@ public class ClusterWithLinuxTaskController extends TestCase {
* @return boolean
*/
protected boolean shouldRun() {
- return isTaskExecPathPassed() && isUserPassed();
+ if (!isTaskExecPathPassed() || !isUserPassed()) {
+ LOG.info("Invalid taskcontroller-path or taskcontroller-user!"
+ + " Not running test.");
+ // Both prerequisites are required; tell the caller to skip the test.
+ return false;
+ }
+ return true;
}
private boolean isTaskExecPathPassed() {
diff --git src/test/mapred/org/apache/hadoop/mapred/MiniMRCluster.java src/test/mapred/org/apache/hadoop/mapred/MiniMRCluster.java
index 5c26fd1..5f2c49f 100644
--- src/test/mapred/org/apache/hadoop/mapred/MiniMRCluster.java
+++ src/test/mapred/org/apache/hadoop/mapred/MiniMRCluster.java
@@ -171,7 +171,7 @@ public class MiniMRCluster {
StringBuffer localPath = new StringBuffer();
for(int i=0; i < numDir; ++i) {
File ttDir = new File(localDirBase,
- Integer.toString(trackerId) + "_" + 0);
+ Integer.toString(trackerId) + "_" + i);
if (!ttDir.mkdirs()) {
if (!ttDir.isDirectory()) {
throw new IOException("Mkdirs failed to create " + ttDir);
diff --git src/test/mapred/org/apache/hadoop/mapred/TestIsolationRunner.java src/test/mapred/org/apache/hadoop/mapred/TestIsolationRunner.java
index 3bf0bc5..9ee549c 100644
--- src/test/mapred/org/apache/hadoop/mapred/TestIsolationRunner.java
+++ src/test/mapred/org/apache/hadoop/mapred/TestIsolationRunner.java
@@ -26,6 +26,7 @@ import junit.framework.TestCase;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.LocalDirAllocator;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.PathFilter;
import org.apache.hadoop.mapred.lib.IdentityMapper;
@@ -96,23 +97,20 @@ public class TestIsolationRunner extends TestCase {
});
return files.length;
}
-
+
private Path getAttemptJobXml(JobConf conf, JobID jobId, TaskType taskType)
throws IOException {
- String[] localDirs = conf.getLocalDirs();
- assertEquals(1, localDirs.length);
- Path jobCacheDir = new Path(localDirs[0], "0_0" + Path.SEPARATOR +
- "taskTracker" + Path.SEPARATOR + "jobcache" + Path.SEPARATOR + jobId);
- Path attemptDir = new Path(jobCacheDir,
- new TaskAttemptID(new TaskID(jobId, taskType, 0), 0).toString());
- return new Path(attemptDir, "job.xml");
+ String taskid =
+ new TaskAttemptID(new TaskID(jobId, taskType, 0), 0).toString();
+ return new LocalDirAllocator("mapred.local.dir").getLocalPathToRead(
+ TaskTracker.getTaskConfFile(jobId.toString(), taskid, false), conf);
}
public void testIsolationRunOfMapTask() throws
IOException, InterruptedException, ClassNotFoundException {
MiniMRCluster mr = null;
try {
- mr = new MiniMRCluster(1, "file:///", 1);
+ mr = new MiniMRCluster(1, "file:///", 4);
// Run a job succesfully; keep task files.
JobConf conf = mr.createJobConf();
@@ -130,8 +128,10 @@ public class TestIsolationRunner extends TestCase {
// Retrieve succesful job's configuration and
// run IsolationRunner against the map task.
FileSystem localFs = FileSystem.getLocal(conf);
- Path mapJobXml = getAttemptJobXml(conf, jobId,
- TaskType.MAP).makeQualified(localFs);
+ Path mapJobXml =
+ getAttemptJobXml(
+ mr.getTaskTrackerRunner(0).getTaskTracker().getJobConf(), jobId,
+ TaskType.MAP).makeQualified(localFs);
assertTrue(localFs.exists(mapJobXml));
new IsolationRunner().run(new String[] {
diff --git src/test/mapred/org/apache/hadoop/mapred/TestJobExecutionAsDifferentUser.java src/test/mapred/org/apache/hadoop/mapred/TestJobExecutionAsDifferentUser.java
index f923f88..ab678d5 100644
--- src/test/mapred/org/apache/hadoop/mapred/TestJobExecutionAsDifferentUser.java
+++ src/test/mapred/org/apache/hadoop/mapred/TestJobExecutionAsDifferentUser.java
@@ -20,8 +20,10 @@ package org.apache.hadoop.mapred;
import java.io.IOException;
+import org.apache.hadoop.examples.SleepJob;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.util.ToolRunner;
/**
* Test a java-based mapred job with LinuxTaskController running the jobs as a
@@ -39,10 +41,33 @@ public class TestJobExecutionAsDifferentUser extends
startCluster();
Path inDir = new Path("input");
Path outDir = new Path("output");
- RunningJob job =
- UtilsForTests.runJobSucceed(getClusterConf(), inDir, outDir);
+
+ RunningJob job;
+
+ // Run a job with zero maps/reduces
+ job = UtilsForTests.runJob(getClusterConf(), inDir, outDir, 0, 0);
+ job.waitForCompletion();
+ assertTrue("Job failed", job.isSuccessful());
+ assertOwnerShip(outDir);
+
+ // Run a job with 1 map and zero reduces
+ job = UtilsForTests.runJob(getClusterConf(), inDir, outDir, 1, 0);
+ job.waitForCompletion();
assertTrue("Job failed", job.isSuccessful());
assertOwnerShip(outDir);
+
+ // Run a normal job with maps/reduces
+ job = UtilsForTests.runJob(getClusterConf(), inDir, outDir, 1, 1);
+ job.waitForCompletion();
+ assertTrue("Job failed", job.isSuccessful());
+ assertOwnerShip(outDir);
+
+ // Run a job with jvm reuse
+ JobConf myConf = getClusterConf();
+ myConf.set("mapred.job.reuse.jvm.num.tasks", "-1");
+ String[] args = { "-m", "6", "-r", "3", "-mt", "1000", "-rt", "1000" };
+ assertEquals(0, ToolRunner.run(myConf, new SleepJob(), args));
+ // TODO: assert that task JVMs were actually reused, not just exit code 0.
}
public void testEnvironment() throws IOException {
diff --git src/test/mapred/org/apache/hadoop/mapred/TestMapRed.java src/test/mapred/org/apache/hadoop/mapred/TestMapRed.java
index 665dc33..2f70803 100644
--- src/test/mapred/org/apache/hadoop/mapred/TestMapRed.java
+++ src/test/mapred/org/apache/hadoop/mapred/TestMapRed.java
@@ -289,7 +289,7 @@ public class TestMapRed extends TestCase {
first = false;
MapOutputFile mapOutputFile = new MapOutputFile(taskId.getJobID());
mapOutputFile.setConf(conf);
- Path input = mapOutputFile.getInputFile(0, taskId);
+ Path input = mapOutputFile.getInputFile(0);
FileSystem fs = FileSystem.get(conf);
assertTrue("reduce input exists " + input, fs.exists(input));
SequenceFile.Reader rdr =
diff --git src/test/mapred/org/apache/hadoop/mapred/TestMiniMRWithDFS.java src/test/mapred/org/apache/hadoop/mapred/TestMiniMRWithDFS.java
index 1dac0ac..f14234c 100644
--- src/test/mapred/org/apache/hadoop/mapred/TestMiniMRWithDFS.java
+++ src/test/mapred/org/apache/hadoop/mapred/TestMiniMRWithDFS.java
@@ -135,7 +135,7 @@ public class TestMiniMRWithDFS extends TestCase {
int numNotDel = 0;
File localDir = new File(mr.getTaskTrackerLocalDir(i));
LOG.debug("Tracker directory: " + localDir);
- File trackerDir = new File(localDir, "taskTracker");
+ File trackerDir = new File(localDir, TaskTracker.SUBDIR);
assertTrue("local dir " + localDir + " does not exist.",
localDir.isDirectory());
assertTrue("task tracker dir " + trackerDir + " does not exist.",
@@ -150,7 +150,7 @@ public class TestMiniMRWithDFS extends TestCase {
}
for(int fileIdx = 0; fileIdx < contents.length; ++fileIdx) {
String name = contents[fileIdx];
- if (!("taskTracker".equals(contents[fileIdx]))) {
+ if (!(TaskTracker.SUBDIR.equals(contents[fileIdx]))) {
LOG.debug("Looking at " + name);
assertTrue("Spurious directory " + name + " found in " +
localDir, false);
@@ -158,7 +158,7 @@ public class TestMiniMRWithDFS extends TestCase {
}
for (int idx = 0; idx < neededDirs.size(); ++idx) {
String name = neededDirs.get(idx);
- if (new File(new File(new File(trackerDir, "jobcache"),
+ if (new File(new File(new File(trackerDir, TaskTracker.JOBCACHE),
jobIds[idx]), name).isDirectory()) {
found[idx] = true;
numNotDel++;
diff --git src/test/mapred/org/apache/hadoop/mapred/TestTaskTrackerLocalization.java src/test/mapred/org/apache/hadoop/mapred/TestTaskTrackerLocalization.java
new file mode 100644
index 0000000..12f41f7
--- /dev/null
+++ src/test/mapred/org/apache/hadoop/mapred/TestTaskTrackerLocalization.java
@@ -0,0 +1,260 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.mapred;
+
+import java.io.File;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.util.jar.JarOutputStream;
+import java.util.zip.ZipEntry;
+
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.FileUtil;
+import org.apache.hadoop.fs.LocalDirAllocator;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.mapred.TaskTracker.TaskInProgress;
+import org.apache.hadoop.mapreduce.TaskType;
+
+import junit.framework.TestCase;
+
+/**
+ * Test to verify localization of a job and localization of a task on a
+ * TaskTracker.
+ * Each test configures two mapred.local.dir entries under TEST_ROOT_DIR.
+ */
+public class TestTaskTrackerLocalization extends TestCase {
+
+ File TEST_ROOT_DIR;
+
+ @Override
+ protected void setUp()
+ throws Exception {
+ TEST_ROOT_DIR =
+ new File(System.getProperty("test.build.data", "/tmp"),
+ "testJobLocalization");
+ if (!TEST_ROOT_DIR.exists()) {
+ TEST_ROOT_DIR.mkdirs();
+ }
+ }
+
+ @Override
+ protected void tearDown()
+ throws Exception {
+ FileUtil.fullyDelete(TEST_ROOT_DIR);
+ }
+
+ /**
+ * Test job localization on a TT. Tests localization of job.xml, job.jar and
+ * corresponding setting of configuration.
+ *
+ * @throws IOException
+ */
+ public void testJobLocalization()
+ throws IOException {
+ JobConf conf = new JobConf();
+ conf.set(FileSystem.FS_DEFAULT_NAME_KEY, "file:///");
+ conf.setStrings("mapred.local.dir", new File(TEST_ROOT_DIR, "local_0")
+ .getPath(), new File(TEST_ROOT_DIR, "local_1").getPath());
+
+ // Create the job jar file
+ File jobJarFile = new File(TEST_ROOT_DIR, "jobjar-on-dfs.jar");
+ JarOutputStream jstream =
+ new JarOutputStream(new FileOutputStream(jobJarFile));
+ ZipEntry ze = new ZipEntry("lib/lib1.jar");
+ jstream.putNextEntry(ze);
+ jstream.closeEntry();
+ ze = new ZipEntry("lib/lib2.jar");
+ jstream.putNextEntry(ze);
+ jstream.closeEntry();
+ jstream.finish();
+ jstream.close();
+ conf.setJar(jobJarFile.toURI().toString());
+
+ // Create the job configuration file
+ File jobConfFile = new File(TEST_ROOT_DIR, "jobconf-on-dfs.xml");
+ FileOutputStream out = new FileOutputStream(jobConfFile);
+ conf.writeXml(out);
+ out.close();
+
+ // Set up the TaskTracker
+ TaskTracker tracker = new TaskTracker();
+ tracker.setConf(conf);
+ tracker.systemFS = FileSystem.getLocal(conf); // for test case
+
+ // Set up the task to be localized
+ String jtIdentifier = "200907202331";
+ JobID jobId = new JobID(jtIdentifier, 1);
+ TaskAttemptID taskId =
+ new TaskAttemptID(jtIdentifier, jobId.getId(), TaskType.MAP, 1, 1);
+ Task t =
+ new MapTask(jobConfFile.toURI().toString(), taskId, 1, null, null, 1);
+
+ // /////////// The main method being tested
+ JobConf localizedJobConf = tracker.doJobLocalization(t);
+ // ///////////
+
+ // Check the directory structure
+ for (String dir : conf.getStrings("mapred.local.dir")) {
+ File localDir = new File(dir);
+ assertTrue("mapred.local.dir " + localDir + " isn't created!", localDir
+ .exists());
+ File taskTrackerSubDir = new File(localDir, TaskTracker.SUBDIR);
+ assertTrue("taskTracker sub-dir in the local-dir " + localDir
+ + " is not created!", taskTrackerSubDir.exists());
+ File systemDir = new File(taskTrackerSubDir, TaskTracker.JOBCACHE);
+ assertTrue("system dir in the taskTrackerSubdir " + taskTrackerSubDir
+ + " isn't created!", systemDir.exists());
+ File systemJobDir = new File(systemDir, jobId.toString());
+ assertTrue("job-dir in system dir " + systemDir + " isn't created!",
+ systemJobDir.exists());
+ }
+
+ // check the localization of job.xml
+ LocalDirAllocator lDirAlloc = new LocalDirAllocator("mapred.local.dir");
+
+ assertTrue("job.xml is not localized on this TaskTracker!!", lDirAlloc
+ .getLocalPathToRead(TaskTracker.getLocalJobConfFile(jobId.toString()),
+ conf) != null);
+
+ // check the localization of job.jar
+ Path jarFileLocalized =
+ lDirAlloc.getLocalPathToRead(TaskTracker.getJobJarFile(jobId
+ .toString()), conf);
+ assertTrue("job.jar is not localized on this TaskTracker!!",
+ jarFileLocalized != null);
+ assertTrue("lib/lib1.jar is not unjarred on this TaskTracker!!", new File(
+ jarFileLocalized.getParent() + Path.SEPARATOR + "lib/lib1.jar")
+ .exists());
+ assertTrue("lib/lib2.jar is not unjarred on this TaskTracker!!", new File(
+ jarFileLocalized.getParent() + Path.SEPARATOR + "lib/lib2.jar")
+ .exists());
+
+ // check the creation of job work directory
+ assertTrue("job-work dir is not created on this TaskTracker!!",
+ lDirAlloc.getLocalPathToRead(TaskTracker.getJobWorkDir(jobId
+ .toString()), conf) != null);
+
+ // Check the setting of job.local.dir and job.jar which will eventually be
+ // used by the user's task
+ boolean jobLocalDirFlag = false, mapredJarFlag = false;
+ String localizedJobLocalDir =
+ localizedJobConf.get(TaskTracker.JOB_LOCAL_DIR);
+ String localizedJobJar = localizedJobConf.getJar();
+ for (String localDir : localizedJobConf.getStrings("mapred.local.dir")) {
+ if (localizedJobLocalDir.equals(localDir + Path.SEPARATOR
+ + TaskTracker.getJobWorkDir(jobId.toString()))) {
+ jobLocalDirFlag = true;
+ }
+ if (localizedJobJar.equals(localDir + Path.SEPARATOR
+ + TaskTracker.getJobJarFile(jobId.toString()))) {
+ mapredJarFlag = true;
+ }
+ }
+ assertTrue(TaskTracker.JOB_LOCAL_DIR
+ + " is not set properly to the target users directory : "
+ + localizedJobLocalDir, jobLocalDirFlag);
+ assertTrue(
+ "mapred.jar is not set properly to the target users directory : "
+ + localizedJobJar, mapredJarFlag);
+ }
+
+ /**
+ * Test task localization on a TT.
+ * Checks the attempt dir, its work dir, the task conf file and sandboxing.
+ * @throws IOException
+ */
+ public void testTaskLocalization()
+ throws IOException {
+ JobConf conf = new JobConf();
+ conf.set(FileSystem.FS_DEFAULT_NAME_KEY, "file:///");
+
+ conf.setStrings("mapred.local.dir", new File(TEST_ROOT_DIR, "local_0")
+ .getPath(), new File(TEST_ROOT_DIR, "local_1").getPath());
+
+ // Create the job configuration file
+ File jobConfFile = new File(TEST_ROOT_DIR, "jobconf-on-dfs.xml");
+ FileOutputStream out = new FileOutputStream(jobConfFile);
+ conf.writeXml(out);
+ out.close();
+
+ // Set up the TaskTracker
+ TaskTracker tracker = new TaskTracker();
+ tracker.setConf(conf);
+ tracker.systemFS = FileSystem.getLocal(conf); // for test case
+
+ // Set up the task to be localized
+ String jtIdentifier = "200907202331";
+ JobID jobId = new JobID(jtIdentifier, 1);
+ TaskAttemptID taskId =
+ new TaskAttemptID(jtIdentifier, jobId.getId(), TaskType.MAP, 1, 0);
+ Task task =
+ new MapTask(jobConfFile.toURI().toString(), taskId, 1, null, null, 1);
+
+ JobConf localizedJobConf = tracker.doJobLocalization(task);
+
+ TaskInProgress tip = tracker.new TaskInProgress(task, conf);
+ tip.setJobConf(localizedJobConf);
+
+ // ////////// The central method being tested
+ tip.localizeTask(task);
+ // //////////
+
+ // check the functionality of localizeTask
+ for (String dir : conf.getStrings("mapred.local.dir")) {
+ assertTrue("attempt-dir in localDir " + dir + " is not created!!",
+ new File(dir, TaskTracker.getLocalTaskDir(jobId.toString(), taskId
+ .toString())).exists());
+ }
+
+ LocalDirAllocator lDirAlloc = new LocalDirAllocator("mapred.local.dir");
+ assertTrue("attempt work dir for " + taskId.toString()
+ + " is not created in any of the configured dirs!!", lDirAlloc
+ .getLocalPathToRead(TaskTracker
+ .getTaskWorkDir(task.getJobID().toString(), task.getTaskID()
+ .toString(), task.isTaskCleanupTask()), conf) != null);
+
+ TaskRunner runner = task.createRunner(tracker, tip);
+
+ // /////// Another method being tested
+ runner.setupChildTaskConfiguration(lDirAlloc);
+ // ///////
+
+ // Make sure the task-conf file is created
+ Path localTaskFile =
+ lDirAlloc.getLocalPathToRead(TaskTracker.getTaskConfFile(task
+ .getJobID().toString(), task.getTaskID().toString(), task
+ .isTaskCleanupTask()), conf);
+ assertTrue("Task conf file " + localTaskFile.toString()
+ + " is not created!!", new File(localTaskFile.toUri().getPath())
+ .exists());
+
+ // Make sure that the mapred.local.dir is sandboxed
+ JobConf localizedTaskConf = new JobConf(localTaskFile);
+ for (String childMapredLocalDir : localizedTaskConf
+ .getStrings("mapred.local.dir")) {
+ assertTrue("Local dir " + childMapredLocalDir + " is not sandboxed !!",
+ childMapredLocalDir.endsWith(TaskTracker.getLocalTaskDir(jobId
+ .toString(), taskId.toString(), false)));
+ }
+
+ // Make sure task.getJobFile() now points at the localized task conf file.
+ assertTrue(task.getJobFile().endsWith(
+ TaskTracker
+ .getTaskConfFile(jobId.toString(), taskId.toString(), false)));
+ }
+}