diff --git build.xml build.xml index f9e0694..e69d869 100644 --- build.xml +++ build.xml @@ -1658,5 +1658,16 @@ - + + + + + + + + + + diff --git conf/taskcontroller.cfg conf/taskcontroller.cfg index fb4cced..eb2e5ee 100644 --- conf/taskcontroller.cfg +++ conf/taskcontroller.cfg @@ -1,3 +1,2 @@ -mapred.local.dir=#configured value of hadoop.tmp.dir it can be a list of paths comma seperated -hadoop.pid.dir=#configured HADOOP_PID_DIR -hadoop.indent.str=#configured HADOOP_IDENT_STR +mapred.local.dir=#configured value of mapred.local.dir. It can be a list of comma separated paths. +hadoop.log.dir=#configured value of hadoop.log.dir. \ No newline at end of file diff --git lib/hadoop-core-0.21.0-dev.jar lib/hadoop-core-0.21.0-dev.jar index 9494544..ad05a05 100644 Binary files lib/hadoop-core-0.21.0-dev.jar and lib/hadoop-core-0.21.0-dev.jar differ diff --git src/c++/task-controller/Makefile.in src/c++/task-controller/Makefile.in index ba35f96..83f418f 100644 --- src/c++/task-controller/Makefile.in +++ src/c++/task-controller/Makefile.in @@ -16,9 +16,11 @@ # limitations under the License. 
# OBJS=main.o task-controller.o configuration.o +TESTOBJS=test-task-controller.o task-controller.o configuration.o CC=@CC@ CFLAGS = @CFLAGS@ BINARY=task-controller +TESTBINARY=test-task-controller installdir = @prefix@ @@ -34,9 +36,15 @@ task-controller.o: task-controller.c task-controller.h configuration.o: configuration.h configuration.c $(CC) $(CFLAG) -o configuration.o -c configuration.c +test-task-controller.o: test-task-controller.c task-controller.h + $(CC) $(CFLAG) -o test-task-controller.o -c test-task-controller.c + +test: $(TESTOBJS) + $(CC) $(CFLAG) -o $(TESTBINARY) $(TESTOBJS) + cp $(TESTBINARY) $(installdir) clean: - rm -rf $(BINARY) $(OBJS) + rm -rf $(BINARY) $(OBJS) $(TESTOBJS) $(TESTBINARY) install: all cp $(BINARY) $(installdir) diff --git src/c++/task-controller/configuration.c src/c++/task-controller/configuration.c index 05b267a..b6abbf5 100644 --- src/c++/task-controller/configuration.c +++ src/c++/task-controller/configuration.c @@ -71,9 +71,8 @@ void get_configs() { snprintf(file_name, str_len, CONF_FILE_PATTERN, HADOOP_CONF_DIR); #endif -#ifdef DEBUG - fprintf(LOGFILE,"get_configs :Conf file name is : %s \n", file_name); -#endif + fprintf(LOGFILE, "get_configs :Conf file name is : %s \n", file_name); + //allocate space for ten configuration items. 
config.confdetails = (struct confentry **) malloc(sizeof(struct confentry *) * MAX_SIZE); @@ -87,7 +86,7 @@ void get_configs() { while(!feof(conf_file)) { line = (char *) malloc(linesize); if(line == NULL) { - fprintf(LOGFILE,"malloc failed while reading configuration file.\n"); + fprintf(LOGFILE, "malloc failed while reading configuration file.\n"); goto cleanup; } size_read = getline(&line,&linesize,conf_file); @@ -123,9 +122,9 @@ void get_configs() { "Failed allocating memory for single configuration item\n"); goto cleanup; } -#ifdef DEBUG - fprintf(LOGFILE,"get_configs : Adding conf key : %s \n", equaltok); -#endif + + fprintf(LOGFILE, "get_configs : Adding conf key : %s \n", equaltok); + memset(config.confdetails[config.size], 0, sizeof(struct confentry)); config.confdetails[config.size]->key = (char *) malloc( sizeof(char) * (strlen(equaltok)+1)); @@ -142,9 +141,9 @@ void get_configs() { free(config.confdetails[config.size]); continue; } -#ifdef DEBUG - fprintf(LOGFILE,"get_configs : Adding conf value : %s \n", equaltok); -#endif + + fprintf(LOGFILE, "get_configs : Adding conf value : %s \n", equaltok); + config.confdetails[config.size]->value = (char *) malloc( sizeof(char) * (strlen(equaltok)+1)); strcpy((char *)config.confdetails[config.size]->value, equaltok); @@ -184,8 +183,7 @@ void get_configs() { * array, next time onwards used the populated array. * */ - -const char * get_value(char* key) { +const char * get_value(const char* key) { int count; if (config.size == 0) { get_configs(); @@ -196,15 +194,19 @@ const char * get_value(char* key) { } for (count = 0; count < config.size; count++) { if (strcmp(config.confdetails[count]->key, key) == 0) { - return config.confdetails[count]->value; + return strdup(config.confdetails[count]->value); } } return NULL; } -const char ** get_values(char * key) { +/** + * Function to return an array of values for a key. + * Value delimiter is assumed to be a comma. 
+ */ +const char ** get_values(const char * key) { const char ** toPass = NULL; - const char * value = get_value(key); + const char *value = get_value(key); char *tempTok = NULL; char *tempstr = NULL; int size = 0; diff --git src/c++/task-controller/configuration.h.in src/c++/task-controller/configuration.h.in index af8086b..ebdf4d8 100644 --- src/c++/task-controller/configuration.h.in +++ src/c++/task-controller/configuration.h.in @@ -53,10 +53,10 @@ extern struct configuration config; extern char *hadoop_conf_dir; #endif //method exposed to get the configurations -const char * get_value(char* key); +const char * get_value(const char* key); //method to free allocated configuration void free_configurations(); //function to return array of values pointing to the key. Values are //comma seperated strings. -const char ** get_values(char* key); +const char ** get_values(const char* key); diff --git src/c++/task-controller/configure.ac src/c++/task-controller/configure.ac index 2fd52a8..8605ac8 100644 --- src/c++/task-controller/configure.ac +++ src/c++/task-controller/configure.ac @@ -38,7 +38,7 @@ AC_PROG_CC # Checks for header files. AC_HEADER_STDC -AC_CHECK_HEADERS([stdlib.h string.h unistd.h]) +AC_CHECK_HEADERS([stdlib.h string.h unistd.h fcntl.h]) #check for HADOOP_CONF_DIR @@ -50,12 +50,14 @@ fi # Checks for typedefs, structures, and compiler characteristics. AC_C_CONST AC_TYPE_PID_T +AC_TYPE_MODE_T +AC_TYPE_SIZE_T # Checks for library functions. 
AC_FUNC_MALLOC AC_FUNC_REALLOC -AC_CHECK_FUNCS([strerror]) +AC_FUNC_CHOWN +AC_CHECK_FUNCS([strerror memset mkdir rmdir strdup]) AC_CONFIG_FILES([Makefile]) AC_OUTPUT - diff --git src/c++/task-controller/main.c src/c++/task-controller/main.c index c19785a..39154a8 100644 --- src/c++/task-controller/main.c +++ src/c++/task-controller/main.c @@ -17,6 +17,26 @@ */ #include "task-controller.h" +void open_log_file(const char *log_file) { + if (log_file == NULL) { + LOGFILE = stdout; + } else { + LOGFILE = fopen(log_file, "a"); + if (LOGFILE == NULL) { + fprintf(stdout, "Unable to open LOGFILE : %s \n", log_file); + LOGFILE = stdout; + } + if (LOGFILE != stdout) { + if (chmod(log_file, S_IREAD | S_IEXEC | S_IWRITE | S_IROTH | S_IWOTH + | S_IRGRP | S_IWGRP) < 0) { + fprintf(stdout, "Unable to change permission of the log file %s \n", + log_file); + fprintf(stdout, "changing log file to stdout"); + LOGFILE = stdout; + } + } + } +} int main(int argc, char **argv) { int command; @@ -24,6 +44,7 @@ int main(int argc, char **argv) { const char * job_id = NULL; const char * task_id = NULL; const char * tt_root = NULL; + const char *log_dir = NULL; int exit_code = 0; const char * task_pid = NULL; const char* const short_options = "l:"; @@ -35,7 +56,7 @@ int main(int argc, char **argv) { //Minimum number of arguments required to run the task-controller //command-name user command tt-root if (argc < 3) { - display_usage(stderr); + display_usage(stdout); return INVALID_ARGUMENT_NUMBER; } @@ -54,24 +75,9 @@ int main(int argc, char **argv) { break; } } while (next_option != -1); - if (log_file == NULL) { - LOGFILE = stderr; - } else { - LOGFILE = fopen(log_file, "a"); - if (LOGFILE == NULL) { - fprintf(stderr, "Unable to open LOGFILE : %s \n", log_file); - LOGFILE = stderr; - } - if (LOGFILE != stderr) { - if (chmod(log_file, S_IREAD | S_IEXEC | S_IWRITE | S_IROTH | S_IWOTH - | S_IRGRP | S_IWGRP) < 0) { - fprintf(stderr, "Unable to change permission of the log file %s \n", - log_file); 
- fprintf(stderr, "changing log file to stderr"); - LOGFILE = stderr; - } - } - } + + open_log_file(log_file); + //checks done for user name //checks done if the user is root or not. if (argv[optind] == NULL) { @@ -88,17 +94,28 @@ int main(int argc, char **argv) { } optind = optind + 1; command = atoi(argv[optind++]); -#ifdef DEBUG + fprintf(LOGFILE, "main : command provided %d\n",command); fprintf(LOGFILE, "main : user is %s\n", user_detail->pw_name); -#endif + switch (command) { + case INITIALIZE_JOB: + job_id = argv[optind++]; + exit_code = initialize_job(job_id, user_detail->pw_name); + break; case LAUNCH_TASK_JVM: tt_root = argv[optind++]; job_id = argv[optind++]; task_id = argv[optind++]; + log_dir = argv[optind++]; exit_code - = run_task_as_user(user_detail->pw_name, job_id, task_id, tt_root); + = run_task_as_user(user_detail->pw_name, job_id, task_id, tt_root, log_dir); + break; + case INITIALIZE_TASK: + job_id = argv[optind++]; + task_id = argv[optind++]; + log_dir = argv[optind++]; + exit_code = initialize_task(job_id, task_id, log_dir, user_detail->pw_name); break; case TERMINATE_TASK_JVM: task_pid = argv[optind++]; diff --git src/c++/task-controller/task-controller.c src/c++/task-controller/task-controller.c index 511f0cf..eb7e2ef 100644 --- src/c++/task-controller/task-controller.c +++ src/c++/task-controller/task-controller.c @@ -71,104 +71,462 @@ int change_user(const char * user) { return 0; } -// function to check if the passed tt_root is present in hadoop.tmp.dir -int check_tt_root(const char *tt_root) { - char ** mapred_local_dir; - int found = -1; +/** + * Checks the passed value for the variable config_key against the values in + * the configuration. 
+ * Returns 0 if the passed value is found in the configuration, + * -1 otherwise + */ +int check_variable_against_config(const char *config_key, + const char *passed_value) { - if (tt_root == NULL) { + if (config_key == NULL || passed_value == NULL) { return -1; } - mapred_local_dir = (char **)get_values(TT_SYS_DIR_KEY); + int found = -1; + + const char **config_value = get_values(config_key); - if (mapred_local_dir == NULL) { + if (config_value == NULL) { + fprintf(LOGFILE, "%s is not configured.\n", config_key); return -1; } - while(*mapred_local_dir != NULL) { - if(strcmp(*mapred_local_dir,tt_root) == 0) { + char **config_val_ptr = (char **) config_value; + while (*config_val_ptr != NULL) { + if (strcmp(*config_val_ptr, passed_value) == 0) { found = 0; break; } + config_val_ptr++; + } + + if (found != 0) { + fprintf( + LOGFILE, + "Invalid value passed: \ + Configured value of %s is %s. \ + Passed value is %s.\n", + config_key, get_value(config_key), passed_value); } - free(mapred_local_dir); + free(config_value); return found; } /** + * Utility function to concatenate argB to argA using the concat_pattern + */ +char *concatenate(const char *argA, const char *argB, char *concat_pattern, + char *return_path_name) { + if (argA == NULL || argB == NULL) { + fprintf(LOGFILE, "One of the arguments passed for %s in null.\n", + return_path_name); + return NULL; + } + + char *return_path = NULL; + int str_len = strlen(concat_pattern) + strlen(argA) + strlen(argB); + + return_path = (char *) malloc(sizeof(char) * (str_len + 1)); + if (return_path == NULL) { + fprintf(LOGFILE, "Unable to allocate memory for %s.\n", return_path_name); + return NULL; + } + memset(return_path, '\0', str_len + 1); + snprintf(return_path, str_len, concat_pattern, argA, argB); + return return_path; +} + +/** + * Get the job-directory path from tt_root and job-id + */ +char *get_job_directory(const char * tt_root, const char *jobid) { + return concatenate(tt_root, jobid, TT_JOB_DIR_PATTERN, 
"job_dir_path"); +} + +/** + * Get the job-jars directory from the job_dir + */ +char *get_job_jars_directory(const char *job_dir) { + return concatenate(job_dir, "", JOB_DIR_TO_JARS_PATTERN, + "job_jars_dir_path"); +} + +/** + * Get the work directory from the job_dir + */ +char *get_work_directory(const char *job_dir) { + return concatenate(job_dir, "", JOB_DIR_TO_WORK_DIR_PATTERN, + "job_work_dir_path"); +} + +/** + * Get the attempt directory for the given attempt_id + */ +char *get_attempt_directory(const char *job_dir, const char *attempt_id) { + return concatenate(job_dir, attempt_id, JOB_DIR_TO_ATTEMPT_DIR_PATTERN, + "attempt_dir_path"); +} + +/* + * Get the path to the task launcher file which is created by the TT + */ +char *get_task_launcher_file(const char *job_dir, const char *attempt_dir) { + return concatenate(job_dir, attempt_dir, TASK_SCRIPT_PATTERN, + "task_script_path"); +} + +/** + * Get the log directory for the given attempt. + */ +char *get_task_log_dir(const char *log_dir, const char *attempt_id) { + return concatenate(log_dir, attempt_id, ATTEMPT_LOG_DIR_PATTERN, + "task_log_dir"); +} + +/** + * Function to check if the passed tt_root is present in mapred.local.dir + * the task-controller is configured with. + */ +int check_tt_root(const char *tt_root) { + return check_variable_against_config(TT_SYS_DIR_KEY, tt_root); +} + +/** + * Function to check if the passed log_dir is present in hadoop.log.dir + * the task-controller is configured with. + */ +int check_task_logs_dir(const char *log_dir) { + return check_variable_against_config(TT_LOG_DIR_KEY, log_dir); +} + +/** * Function to check if the constructed path and absolute * path resolve to one and same. 
*/ - int check_path(char *path) { + fprintf(LOGFILE, "Passed path : %s\n", path); char * resolved_path = (char *) canonicalize_file_name(path); - if(resolved_path == NULL) { + if (resolved_path == NULL) { + switch (errno) { + case ENAMETOOLONG: + fprintf(LOGFILE, "The resulting path is too long.\n"); + break; + case EACCES: + fprintf(LOGFILE, "The path is not readable.\n"); + break; + case ENOENT: + fprintf(LOGFILE, "The input file name is empty or %s", + "the path components does not exist.\n"); + break; + case ELOOP: + fprintf(LOGFILE, "More than `MAXSYMLINKS' many symlinks have been followed.\n"); + break; + } return ERROR_RESOLVING_FILE_PATH; } - if(strcmp(resolved_path, path) !=0) { + fprintf(LOGFILE, "Resolved path : %s\n", resolved_path); + if (strcmp(resolved_path, path) != 0) { free(resolved_path); return RELATIVE_PATH_COMPONENTS_IN_FILE_PATH; } free(resolved_path); return 0; } + /** - * Function to check if a user actually owns the file. + * Function to change the owner/group of a given path. 
*/ -int check_owner(uid_t uid, char *path) { - struct stat filestat; - if(stat(path, &filestat)!=0) { - return UNABLE_TO_STAT_FILE; +int change_owner(const char *path, uid_t uid, gid_t gid) { + int exit_code = chown(path, uid, gid); + if (exit_code != 0) { + switch (errno) { + case (EPERM): + fprintf(LOGFILE, + "Process lacks permission to make the requested change.\n"); + break; + case (EROFS): + fprintf(LOGFILE, "File is on a read-only file system.\n"); + break; + case (EACCES): + fprintf( + LOGFILE, + "Process does not have search permission for a directory \ + component of the file name.\n"); + break; + case (ENAMETOOLONG): + fprintf(LOGFILE, "The file name is too long.\n"); + break; + case (ENOENT): + fprintf(LOGFILE, + "A directory component in the file name doesn't exist.\n"); + break; + case (ENOTDIR): + fprintf( + LOGFILE, + "A directory component in the file name exists, \ + but it isn't a directory.\n"); + break; + case (ELOOP): + fprintf( + LOGFILE, + "Too many symbolic links were resolved \ + while trying to look up the file name.\n"); + break; + default: + fprintf(LOGFILE, "chown failed with error number : %d.\n", errno); + } } - //check owner. - if(uid != filestat.st_uid){ - return FILE_NOT_OWNED_BY_TASKTRACKER; + return exit_code; +} + +/** + * Function to change the mode of a given path. 
+ */ +int change_mode(const char *path, mode_t mode) { + int exit_code = chmod(path, mode); + if (exit_code != 0) { + switch (errno) { + case (ENOENT): + fprintf(LOGFILE, "The named file doesn't exist.\n"); + break; + case (EPERM): + fprintf( + LOGFILE, + "This process does not have permission to change the access\ + permissions of this file.\n"); + break; + case (EROFS): + fprintf(LOGFILE, "The file resides on a read-only file system.\n"); + break; + case (EACCES): + fprintf( + LOGFILE, + "Process does not have search permission for a directory \ + component of the file name.\n"); + break; + case (ENAMETOOLONG): + fprintf(LOGFILE, "The file name is too long.\n"); + break; + case (ENOTDIR): + fprintf( + LOGFILE, + "A directory component in the file name exists, \ + but it isn't a directory.\n"); + break; + case (ELOOP): + fprintf( + LOGFILE, + "Too many symbolic links were resolved \ + while trying to look up the file name.\n"); + break; + default: + fprintf(LOGFILE, "chmod failed with error number : %d.\n", errno); + } + } + return exit_code; // chmod()'s own result: 0 on success, non-zero on failure +} + +/** + * Function to prepare the attempt directories for the task JVM. + * This is done by changing the ownership of the attempt directory recursively + * to the job owner. 
We do the following: + * * sudo chown user:mapred -R taskTracker/jobcache/$jobid/$attemptid/ + * * sudo chmod 2770 -R taskTracker/jobcache/$jobid/$attemptid/ + */ +int prepare_attempt_directories(const char *job_id, const char *attempt_id, + const char *user) { + if (job_id == NULL || attempt_id == NULL || user == NULL) { + fprintf(LOGFILE, "Either attempt_id is null or the user passed is null.\n"); + return INVALID_ARGUMENT_NUMBER; + } + + if (get_user_details(user) < 0) { + fprintf(LOGFILE, "Couldn't get the user details of %s.\n", user); + return INVALID_USER_NAME; + } + + int tasktracker_gid = getgid(); + + char **local_dir = (char **) get_values(TT_SYS_DIR_KEY); + + if (local_dir == NULL) { + fprintf(LOGFILE, "%s is not configured.\n", TT_SYS_DIR_KEY); + cleanup(); + return PREPARE_ATTEMPT_DIRECTORIES_FAILED; + } + + char *full_local_dir_str = (char *) get_value(TT_SYS_DIR_KEY); + fprintf(LOGFILE, "Value from config for %s is %s.\n", TT_SYS_DIR_KEY, + full_local_dir_str); + + char *job_dir; + char *attempt_dir; + char **local_dir_ptr = local_dir; + int failed = 1; + while (*local_dir_ptr != NULL) { + job_dir = get_job_directory(*local_dir_ptr, job_id); + + // prepare attempt-dir in each of the mapred_local_dir + attempt_dir = get_attempt_directory(job_dir, attempt_id); + if (opendir(attempt_dir) == NULL) { + fprintf(LOGFILE, "attempt_dir %s doesn't exist. Not doing anything.\n", + attempt_dir); + } else if (secure_path(attempt_dir, user_detail->pw_uid, tasktracker_gid) + != 0) { + fprintf(LOGFILE, "Failed to secure the attempt_dir %s\n", attempt_dir); + failed = 0; + } + + local_dir_ptr++; + free(attempt_dir); + free(job_dir); + } + free(local_dir); + free(full_local_dir_str); + + cleanup(); + if (failed == 0) { + return PREPARE_ATTEMPT_DIRECTORIES_FAILED; } return 0; } -/* - * function to provide path to the task file which is created by the tt - * - *Check TT_LOCAL_TASK_SCRIPT_PATTERN for pattern +/** + * Function to prepare the task logs for the child. 
It gives the ownership + * of the attempt's log-dir to the user. It gives readable permissions to + * everyone for the attempt's dir and it's contents. This is only till + * security is fixed for task-logs. + * * sudo chown user:mapred log-dir/userlogs/$attemptid + * * sudo chmod -R 2770 log-dir/userlogs/$attemptid */ -void get_task_file_path(const char * jobid, const char * taskid, - const char * tt_root, char **task_script_path) { - const char ** mapred_local_dir = get_values(TT_SYS_DIR_KEY); - *task_script_path = NULL; - int str_len = strlen(TT_LOCAL_TASK_SCRIPT_PATTERN) + strlen(jobid) + (strlen( - taskid)) + strlen(tt_root); - - if (mapred_local_dir == NULL) { - return; - } - - *task_script_path = (char *) malloc(sizeof(char) * (str_len + 1)); - if (*task_script_path == NULL) { - fprintf(LOGFILE, "Unable to allocate memory for task_script_path \n"); - free(mapred_local_dir); - return; - } - - memset(*task_script_path,'\0',str_len+1); - snprintf(*task_script_path, str_len, TT_LOCAL_TASK_SCRIPT_PATTERN, tt_root, - jobid, taskid); -#ifdef DEBUG - fprintf(LOGFILE, "get_task_file_path : task script path = %s\n", *task_script_path); - fflush(LOGFILE); -#endif - free(mapred_local_dir); +int prepare_task_logs(const char *log_dir, const char *task_id) { + + char *task_log_dir = get_task_log_dir(log_dir, task_id); + if (opendir(task_log_dir) == NULL) { + fprintf(LOGFILE, "task_log_dir %s doesn't exist. Not doing anything.\n", + task_log_dir); + // See TaskRunner.java to see that an absent log-dir doesn't fail the task. + return 0; + } + + int tasktracker_gid = getgid(); + if (secure_path(task_log_dir, user_detail->pw_uid, tasktracker_gid) != 0) { + fprintf(LOGFILE, "Failed to secure the log_dir %s\n", task_log_dir); + return -1; + } + return 0; +} + +/** + * Function to secure the given path. 
It does the following recursively: + * 1) changes the owner/group of the paths to the passed owner/group + * 2) changes the file permission to the secure file_mode + */ +int secure_path(const char *path, uid_t uid, gid_t gid) { + FTS *tree = NULL; // the file hierarchy + FTSENT *entry = NULL; // a file in the hierarchy + char *paths[] = { (char *) path, NULL }; // fts_open() requires a NULL-terminated array + int process_path = 1; + int dir = 1; + int error_code = 0; + + mode_t mode; + mode_t file_mode = S_IRWXU | S_IRWXG; // No setgid on files + mode_t dir_mode = S_ISGID | S_IRWXU | S_IRWXG; + + // Get physical locations and don't resolve the symlinks. + // Don't change directory while walking the directory. + int ftsoptions = FTS_PHYSICAL | FTS_NOCHDIR; + tree = fts_open(paths, ftsoptions, NULL); + while (((entry = fts_read(tree)) != NULL) && error_code == 0) { + dir = 1; + switch (entry->fts_info) { + case FTS_D: + // A directory being visited in pre-order. + // We change ownership of directories in post-order. + // so ignore the pre-order visit. + process_path = 1; + break; + case FTS_DC: + // A directory that causes a cycle in the tree + // We don't expect cycles, ignore. + process_path = 1; + break; + case FTS_DNR: + // A directory which cannot be read + // Ignore and set error code. + process_path = 1; + error_code = -1; + break; + case FTS_DOT: + // "." or ".." + process_path = 1; + break; + case FTS_F: + // A regular file + process_path = 0; + break; + case FTS_DP: + // A directory being visited in post-order + process_path = 0; + dir = 0; + break; + case FTS_SL: + // A symbolic link. NOTE(review): chown()/chmod() below follow links, so the link target is what gets changed - confirm this is intended. + process_path = 0; + break; + case FTS_SLNONE: + // A symbolic link with a nonexistent target + process_path = 0; + break; + case FTS_NS: + // A file for which no stat(2) information was available + // Ignore and set error code + process_path = 1; + error_code = -1; + break; + case FTS_DEFAULT: + // File that doesn't belong to any of the above type. Ignore. 
+ process_path = 1; + break; + } + + if (error_code != 0) { + break; + } + if (process_path == 1) { + continue; + } + + if (change_owner(entry->fts_path, uid, gid) != 0) { + fprintf(LOGFILE, "couldn't change the ownership of %s\n", + entry->fts_path); + error_code = -3; + } else { + if (dir == 0) { + mode = dir_mode; + } else { + mode = file_mode; + } + if (change_mode(entry->fts_path, mode) != 0) { + fprintf(LOGFILE, "couldn't change the permissions of %s\n", + entry->fts_path); + error_code = -3; + } + } + } + fts_close(tree); + return error_code; } -//end of private functions void display_usage(FILE *stream) { fprintf(stream, "Usage: task-controller [-l logfile] user command command-args\n"); } //function used to populate and user_details structure. - int get_user_details(const char *user) { if (user_detail == NULL) { user_detail = getpwnam(user); @@ -180,31 +538,126 @@ int get_user_details(const char *user) { return 0; } + +/** + * Function to prepare the job directories for the task JVM. 
+ * We do the following: + * * sudo chown user:mapred -R taskTracker/jobcache/$jobid + * * sudo chmod 2770 -R taskTracker/jobcache/$jobid + * * sudo chmod 0570 taskTracker/jobcache/$jobid/ + */ +int initialize_job(const char *jobid, const char *user) { + if (jobid == NULL || user == NULL) { + fprintf(LOGFILE, "Either jobid is null or the user passed is null.\n"); + return INVALID_ARGUMENT_NUMBER; + } + + if (get_user_details(user) < 0) { + fprintf(LOGFILE, "Couldn't get the user details of %s", user); + return INVALID_USER_NAME; + } + + gid_t tasktracker_gid = getgid(); // TaskTracker's group-id + + char **local_dir = (char **) get_values(TT_SYS_DIR_KEY); + if (local_dir == NULL) { + fprintf(LOGFILE, "%s is not configured.\n", TT_SYS_DIR_KEY); + cleanup(); + return INVALID_TT_ROOT; + } + + char *full_local_dir_str = (char *) get_value(TT_SYS_DIR_KEY); + fprintf(LOGFILE, "Value from config for %s is %s.\n", TT_SYS_DIR_KEY, + full_local_dir_str); + + char *job_dir; + char **local_dir_ptr = local_dir; + int failed = 1; + while (*local_dir_ptr != NULL) { + job_dir = get_job_directory(*local_dir_ptr, jobid); + + if (opendir(job_dir) == NULL) { + fprintf(LOGFILE, "job_dir %s doesn't exist. Not doing anything.\n", + job_dir); + } else if (secure_path(job_dir, user_detail->pw_uid, tasktracker_gid) != 0) { + fprintf(LOGFILE, "Failed to secure the job_dir %s\n", job_dir); + failed = 0; + } else if (change_mode(job_dir, S_IREAD | S_IEXEC | S_IRWXG) != 0) { + fprintf(LOGFILE, "couldn't change the permissions of %s\n", job_dir); + failed = 0; + } + + local_dir_ptr++; + free(job_dir); + } + free(local_dir); + free(full_local_dir_str); + cleanup(); + if (failed == 0) { + return INITIALIZE_JOB_FAILED; + } + return 0; +} + +/** + * Function used to initialize task. 
Prepares attempt_dir, jars_dir and + * log_dir to be accessible by the child + * TODO: fix exit codes + */ +int initialize_task(const char *jobid, const char *taskid, + const char *log_dir, const char *user) { + int exit_code = 0; + fprintf(LOGFILE, "job-id passed to initialize_task : %s.\n", jobid); + fprintf(LOGFILE, "task-d passed to initialize_task : %s.\n", taskid); + fprintf(LOGFILE, "log_dir passed to initialize_task : %s.\n", log_dir); + + if (prepare_attempt_directories(jobid, taskid, user) != 0) { + fprintf(LOGFILE, + "Couldn't prepare the attempt directories for %s of user %s.\n", + taskid, user); + exit_code = PREPARE_ATTEMPT_DIRECTORIES_FAILED; + goto cleanup; + } + + if (check_task_logs_dir(log_dir) < 0) { + fprintf(LOGFILE, "Problem with the log directory passed or configured.\n"); + exit_code = INVALID_TT_LOG_DIR; + goto cleanup; + } + + if (prepare_task_logs(log_dir, taskid) != 0) { + fprintf(LOGFILE, "Couldn't prepare task logs directory %s for %s.\n", + log_dir, taskid); + exit_code = PREPARE_TASK_LOGS_FAILED; + } + + cleanup: + // free configurations + cleanup(); + return exit_code; +} + /* - *Function used to launch a task as the provided user. - * First the function checks if the tt_root passed is found in - * hadoop.temp.dir - * Uses get_task_file_path to fetch the task script file path. - * Does an execlp on the same in order to replace the current image with + * Function used to launch a task as the provided user. It does the following : + * 1) Checks if the tt_root passed is found in mapred.local.dir + * 2) Prepares attempt_dir, jars_dir and log_dir to be accessible by the child + * 3) Uses get_task_launcher_file to fetch the task script file path + * 4) Does an execlp on the same in order to replace the current image with * task image. 
*/ - int run_task_as_user(const char * user, const char *jobid, const char *taskid, - const char *tt_root) { - char *task_script_path = NULL; + const char *tt_root, const char *log_dir) { int exit_code = 0; - uid_t uid = getuid(); - if(jobid == NULL || taskid == NULL) { + if (jobid == NULL || taskid == NULL || tt_root == NULL || log_dir == NULL) { return INVALID_ARGUMENT_NUMBER; } -#ifdef DEBUG - fprintf(LOGFILE,"run_task_as_user : Job id : %s \n", jobid); - fprintf(LOGFILE,"run_task_as_user : task id : %s \n", taskid); - fprintf(LOGFILE,"run_task_as_user : tt_root : %s \n", tt_root); - fflush(LOGFILE); -#endif + fprintf(LOGFILE, "Job-id passed to run_task_as_user : %s.\n", jobid); + fprintf(LOGFILE, "task-d passed to run_task_as_user : %s.\n", taskid); + fprintf(LOGFILE, "tt_root passed to run_task_as_user : %s.\n", tt_root); + fprintf(LOGFILE, "log_dir passed to run_task_as_user : %s.\n", log_dir); + //Check tt_root before switching the user, as reading configuration //file requires privileged access. 
if (check_tt_root(tt_root) < 0) { @@ -213,41 +666,94 @@ int run_task_as_user(const char * user, const char *jobid, const char *taskid, return INVALID_TT_ROOT; } - //change the user - fclose(LOGFILE); - fcloseall(); - umask(0); - if (change_user(user) != 0) { - cleanup(); - return SETUID_OPER_FAILED; + char *job_dir = NULL, *jars_dir = NULL, *task_script_path = NULL; + + if (prepare_attempt_directories(jobid, taskid, user) != 0) { + fprintf(LOGFILE, + "Couldn't prepare the attempt directories for %s of user %s.\n", + taskid, user); + exit_code = PREPARE_ATTEMPT_DIRECTORIES_FAILED; + goto cleanup; } - get_task_file_path(jobid, taskid, tt_root, &task_script_path); + if (check_task_logs_dir(log_dir) < 0) { + fprintf(LOGFILE, "Problem with the log directory passed or configured.\n"); + exit_code = INVALID_TT_LOG_DIR; + goto cleanup; + } + + if (prepare_task_logs(log_dir, taskid) != 0) { + fprintf(LOGFILE, "Couldn't prepare task logs directory %s for %s.\n", log_dir, + taskid); + exit_code = PREPARE_TASK_LOGS_FAILED; + goto cleanup; + } + + job_dir = get_job_directory(tt_root, jobid); + if (job_dir == NULL) { + fprintf(LOGFILE, "Couldn't obtain job_dir for %s in %s.\n", jobid, tt_root); + exit_code = OUT_OF_MEMORY; + goto cleanup; + } + + task_script_path = get_task_launcher_file(job_dir, taskid); if (task_script_path == NULL) { - cleanup(); - return INVALID_TASK_SCRIPT_PATH; + fprintf(LOGFILE, "Couldn't obtain task_script_path in %s.\n", job_dir); + exit_code = OUT_OF_MEMORY; + goto cleanup; } + errno = 0; exit_code = check_path(task_script_path); if(exit_code != 0) { goto cleanup; } - errno = 0; - exit_code = check_owner(uid, task_script_path); - if(exit_code != 0) { + + //change the user + fcloseall(); + umask(0007); + if (change_user(user) != 0) { + exit_code = SETUID_OPER_FAILED; goto cleanup; } + errno = 0; cleanup(); execlp(task_script_path, task_script_path, NULL); if (errno != 0) { - free(task_script_path); - exit_code = UNABLE_TO_EXECUTE_TASK_SCRIPT; - } + 
switch (errno) { + case E2BIG: + fprintf(LOGFILE, + "The total number of bytes in the environment (envp)\ + and argument list (argv) is too large."); + break; + case EACCES: + fprintf(LOGFILE, + "Search permission is denied on a component of the path prefix\ + of filename or the name of a script interpreter. or"); + fprintf(LOGFILE, + "Execute permission is denied for the file or a script or \ + ELF interpreter."); + break; + case ENOENT: + fprintf(LOGFILE, + "The file filename or a script or ELF interpreter does not exist,\ + or a shared library needed for file or interpreter cannot be found."); + break; + } + free(task_script_path); + exit_code = UNABLE_TO_EXECUTE_TASK_SCRIPT; + } return exit_code; cleanup: + if (job_dir != NULL) { + free(job_dir); + } + if (jars_dir != NULL) { + free(jars_dir); + } if (task_script_path != NULL) { free(task_script_path); } @@ -261,19 +767,23 @@ cleanup: * The function sends appropriate signal to the process group * specified by the task_pid. */ - int kill_user_task(const char *user, const char *task_pid, int sig) { int pid = 0; if(task_pid == NULL) { return INVALID_ARGUMENT_NUMBER; } + + fprintf(LOGFILE, "user passed to kill_user_task : %s.\n", user); + fprintf(LOGFILE, "task-pid passed to kill_user_task : %s.\n", task_pid); + fprintf(LOGFILE, "signal passed to kill_user_task : %d.\n", sig); + pid = atoi(task_pid); if(pid <= 0) { return INVALID_TASK_PID; } - fclose(LOGFILE); + fcloseall(); if (change_user(user) != 0) { cleanup(); @@ -283,6 +793,7 @@ int kill_user_task(const char *user, const char *task_pid, int sig) { //Don't continue if the process-group is not alive anymore. 
if(kill(-pid,0) < 0) { errno = 0; + cleanup(); return 0; } @@ -297,4 +808,3 @@ int kill_user_task(const char *user, const char *task_pid, int sig) { cleanup(); return 0; } - diff --git src/c++/task-controller/task-controller.h src/c++/task-controller/task-controller.h index 8e545b5..f160688 100644 --- src/c++/task-controller/task-controller.h +++ src/c++/task-controller/task-controller.h @@ -28,14 +28,20 @@ #include #include #include -#include +#include +#include +#include +#include + #include "configuration.h" //command definitions enum command { + INITIALIZE_JOB, LAUNCH_TASK_JVM, + INITIALIZE_TASK, TERMINATE_TASK_JVM, - KILL_TASK_JVM + KILL_TASK_JVM, }; enum errorcodes { @@ -45,22 +51,43 @@ enum errorcodes { SUPER_USER_NOT_ALLOWED_TO_RUN_TASKS, //4 INVALID_TT_ROOT, //5 SETUID_OPER_FAILED, //6 - INVALID_TASK_SCRIPT_PATH, //7 - UNABLE_TO_EXECUTE_TASK_SCRIPT, //8 - UNABLE_TO_KILL_TASK, //9 - INVALID_PROCESS_LAUNCHING_TASKCONTROLLER, //10 - INVALID_TASK_PID, //11 - ERROR_RESOLVING_FILE_PATH, //12 - RELATIVE_PATH_COMPONENTS_IN_FILE_PATH, //13 - UNABLE_TO_STAT_FILE, //14 - FILE_NOT_OWNED_BY_TASKTRACKER //15 + UNABLE_TO_EXECUTE_TASK_SCRIPT, //7 + UNABLE_TO_KILL_TASK, //8 + INVALID_PROCESS_LAUNCHING_TASKCONTROLLER, //9 + INVALID_TASK_PID, //10 + ERROR_RESOLVING_FILE_PATH, //11 + RELATIVE_PATH_COMPONENTS_IN_FILE_PATH, //12 + UNABLE_TO_STAT_FILE, //13 + FILE_NOT_OWNED_BY_TASKTRACKER, //14 + PREPARE_ATTEMPT_DIRECTORIES_FAILED, //15 + INITIALIZE_JOB_FAILED, //16 + PREPARE_TASK_LOGS_FAILED, //17 + INVALID_TT_LOG_DIR, //18 + FINALIZE_TASK_DIRS_FAILED, //19 + FINALIZE_JOB_FAILED, //20 + OUT_OF_MEMORY, //21 }; +#define TT_JOB_DIR_PATTERN "%s/taskTracker/jobcache/%s" + +#define TT_ATTEMPT_DIR_PATTERN TT_JOB_DIR_PATTERN"/%s" + +#define JOB_DIR_TO_JARS_PATTERN "%s/jars" + +#define JOB_DIR_TO_WORK_DIR_PATTERN "%s/work" -#define TT_LOCAL_TASK_SCRIPT_PATTERN "%s/taskTracker/jobcache/%s/%s/taskjvm.sh" +#define JOB_DIR_TO_ATTEMPT_DIR_PATTERN "%s/%s" + +#define 
JOB_DIR_TO_TASK_WORK_DIR_PATTERN "%s/%s/work" + +#define ATTEMPT_LOG_DIR_PATTERN "%s/userlogs/%s" + +#define TASK_SCRIPT_PATTERN "%s/%s/taskjvm.sh" #define TT_SYS_DIR_KEY "mapred.local.dir" +#define TT_LOG_DIR_KEY "hadoop.log.dir" + #define MAX_ITEMS 10 #ifndef HADOOP_CONF_DIR @@ -74,8 +101,25 @@ extern FILE *LOGFILE; void display_usage(FILE *stream); -int run_task_as_user(const char * user, const char *jobid, const char *taskid, const char *tt_root); +int run_task_as_user(const char * user, const char *jobid, const char *taskid, + const char *tt_root, const char *log_dir); + +int initialize_task(const char *jobid, const char *taskid, + const char *log_dir, const char *user); + +int initialize_job(const char *jobid, const char *user); int kill_user_task(const char *user, const char *task_pid, int sig); +int prepare_attempt_directory(const char *attempt_dir, const char *user); + +// The following functions are exposed for testing + +int check_variable_against_config(const char *config_key, + const char *passed_value); + int get_user_details(const char *user); + +int check_path(char *path); + +char *get_job_cache_directory(const char * tt_root); diff --git src/c++/task-controller/test-task-controller.c src/c++/task-controller/test-task-controller.c new file mode 100644 index 0000000..9660f1a --- /dev/null +++ src/c++/task-controller/test-task-controller.c @@ -0,0 +1,181 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +#include "task-controller.h" + +#define HADOOP_CONF_DIR "/tmp" + +int write_config_file(char *file_name) { + FILE *file; + char const *str = + "mapred.local.dir=/tmp/testing1,/tmp/testing2,/tmp/testing3,/tmp/testing4\n"; + + file = fopen(file_name, "w"); + if (file == NULL) { + printf("Failed to open %s.\n", file_name); + return EXIT_FAILURE; + } + fwrite(str, 1, strlen(str), file); + fclose(file); + return 0; +} + +void test_check_variable_against_config() { + + // A temporary configuration directory + char *conf_dir_templ = "/tmp/test-task-controller-conf-dir-XXXXXX"; + + // To accommodate "/conf/taskcontroller.cfg" + char template[strlen(conf_dir_templ) + strlen("/conf/taskcontroller.cfg")]; + + strcpy(template, conf_dir_templ); + char *temp_dir = mkdtemp(template); + if (temp_dir == NULL) { + printf("Couldn't create a temporary dir for conf.\n"); + goto cleanup; + } + + // Set the configuration directory + hadoop_conf_dir = strdup(temp_dir); + + // create the configuration directory + strcat(template, "/conf"); + char *conf_dir = strdup(template); + mkdir(conf_dir, S_IRWXU); + + // create the configuration file + strcat(template, "/taskcontroller.cfg"); + if (write_config_file(template) != 0) { + printf("Couldn't write the configuration file.\n"); + goto cleanup; + } + + // Test obtaining a value for a key from the config + char *config_values[4] = { "/tmp/testing1", "/tmp/testing2", + "/tmp/testing3", "/tmp/testing4" }; + char *value = (char *) get_value("mapred.local.dir"); + if (strcmp(value,
"/tmp/testing1,/tmp/testing2,/tmp/testing3,/tmp/testing4") + != 0) { + printf("Obtaining a value for a key from the config failed.\n"); + goto cleanup; + } + + // Test the parsing of a multiple valued key from the config + char **values = (char **)get_values("mapred.local.dir"); + char **values_ptr = values; + int i = 0; + while (*values_ptr != NULL) { + printf(" value : %s\n", *values_ptr); + if (strcmp(*values_ptr, config_values[i++]) != 0) { + printf("Configured values are not read out properly. Test failed!"); + goto cleanup;; + } + values_ptr++; + } + + if (check_variable_against_config("mapred.local.dir", "/tmp/testing5") == 0) { + printf("Configuration should not contain /tmp/testing5! \n"); + goto cleanup; + } + + if (check_variable_against_config("mapred.local.dir", "/tmp/testing4") != 0) { + printf("Configuration should contain /tmp/testing4! \n"); + goto cleanup; + } + + cleanup: if (value != NULL) { + free(value); + } + if (values != NULL) { + free(values); + } + if (hadoop_conf_dir != NULL) { + free(hadoop_conf_dir); + } + unlink(template); + rmdir(conf_dir); + rmdir(hadoop_conf_dir); +} + +void test_get_job_directory() { + char *job_dir = (char *) get_job_directory("/tmp", "job_200906101234_0001"); + printf("job_dir obtained is %s\n", job_dir); + int ret = 0; + if (strcmp(job_dir, "/tmp/taskTracker/jobcache/job_200906101234_0001") != 0) { + ret = -1; + } + free(job_dir); + assert(ret == 0); +} + +void test_get_attempt_directory() { + char *attempt_dir = (char *) get_attempt_directory( + "/tmp/taskTracker/jobcache/job_200906101234_0001", + "attempt_200906112028_0001_m_000000_0"); + printf("attempt_dir obtained is %s\n", attempt_dir); + int ret = 0; + if (strcmp( + attempt_dir, + "/tmp/taskTracker/jobcache/job_200906101234_0001/attempt_200906112028_0001_m_000000_0") + != 0) { + ret = -1; + } + free(attempt_dir); + assert(ret == 0); +} + +void test_get_task_launcher_file() { + char + *task_file = + (char *) get_task_launcher_file( + 
"/tmp/taskTracker/jobcache/job_200906101234_0001/attempt_200906112028_0001_m_000000_0"); + printf("task_file obtained is %s\n", task_file); + int ret = 0; + if (strcmp( + task_file, + "/tmp/taskTracker/jobcache/job_200906101234_0001/attempt_200906112028_0001_m_000000_0/taskjvm.sh") + != 0) { + ret = -1; + } + free(task_file); + assert(ret == 0); +} + +void test_get_task_log_dir() { + char *logdir = (char *) get_task_log_dir("/tmp/testing", + "attempt_200906112028_0001_m_000000_0"); + printf("logdir obtained is %s\n", logdir); + int ret = 0; + if (strcmp(logdir, + "/tmp/testing/userlogs/attempt_200906112028_0001_m_000000_0") != 0) { + ret = -1; + } + free(logdir); + assert(ret == 0); +} + +int main(int argc, char **argv) { + printf("Starting tests\n"); + LOGFILE = stdout; + test_check_variable_against_config(); + test_get_job_directory(); + test_get_attempt_directory(); + test_get_task_launcher_file(); + test_get_task_log_dir(); + printf("Finished tests\n"); + return 0; +} diff --git src/contrib/streaming/src/test/org/apache/hadoop/streaming/TestStreamingAsDifferentUser.java src/contrib/streaming/src/test/org/apache/hadoop/streaming/TestStreamingAsDifferentUser.java index 8e6842e..8439824 100644 --- src/contrib/streaming/src/test/org/apache/hadoop/streaming/TestStreamingAsDifferentUser.java +++ src/contrib/streaming/src/test/org/apache/hadoop/streaming/TestStreamingAsDifferentUser.java @@ -65,7 +65,7 @@ public class TestStreamingAsDifferentUser extends "stream.tmpdir=" + System.getProperty("test.build.data", "/tmp") }; StreamJob streamJob = new StreamJob(args, true); streamJob.setConf(myConf); - streamJob.go(); + assertTrue("Job has not succeeded", streamJob.go() == 0); assertOwnerShip(outputPath); } } diff --git src/java/org/apache/hadoop/mapred/BackupStore.java src/java/org/apache/hadoop/mapred/BackupStore.java index 52d0175..77724a3 100644 --- src/java/org/apache/hadoop/mapred/BackupStore.java +++ src/java/org/apache/hadoop/mapred/BackupStore.java @@ -19,7 +19,6 
@@ package org.apache.hadoop.mapred; import java.io.DataOutputStream; -import java.io.EOFException; import java.io.IOException; import java.io.InputStream; import java.util.Iterator; @@ -548,10 +547,9 @@ public class BackupStore { boolean isActive() { return isActive; } private Writer createSpillFile() throws IOException { - Path tmp = new Path( - TaskTracker.getIntermediateOutputDir( - tid.getJobID().toString(), tid.toString()) + - "/backup_" + tid.getId() + "_" + (spillNumber++) + ".out"); + Path tmp = + new Path(TaskTracker.OUTPUT + "/backup_" + tid.getId() + "_" + + (spillNumber++) + ".out"); LOG.info("Created file: " + tmp); diff --git src/java/org/apache/hadoop/mapred/Child.java src/java/org/apache/hadoop/mapred/Child.java index 3ea7dc8..ab593cf 100644 --- src/java/org/apache/hadoop/mapred/Child.java +++ src/java/org/apache/hadoop/mapred/Child.java @@ -36,6 +36,7 @@ import org.apache.hadoop.metrics.MetricsUtil; import org.apache.hadoop.metrics.jvm.JvmMetrics; import org.apache.log4j.LogManager; import org.apache.hadoop.util.Shell; +import org.apache.hadoop.util.StringUtils; /** * The main() for child processes. 
@@ -168,14 +169,15 @@ class Child { LOG.fatal("FSError from child", e); umbilical.fsError(taskid, e.getMessage()); } catch (Throwable throwable) { - LOG.warn("Error running child", throwable); + LOG.warn("Error running child : " + + StringUtils.stringifyException(throwable)); try { if (task != null) { // do cleanup for the task task.taskCleanup(umbilical); } } catch (Throwable th) { - LOG.info("Error cleaning up" + th); + LOG.info("Error cleaning up : " + StringUtils.stringifyException(th)); } // Report back any failures, for diagnostic purposes ByteArrayOutputStream baos = new ByteArrayOutputStream(); diff --git src/java/org/apache/hadoop/mapred/DefaultTaskController.java src/java/org/apache/hadoop/mapred/DefaultTaskController.java index 174403b..c8e42bf 100644 --- src/java/org/apache/hadoop/mapred/DefaultTaskController.java +++ src/java/org/apache/hadoop/mapred/DefaultTaskController.java @@ -85,7 +85,7 @@ class DefaultTaskController extends TaskController { * extra from what TaskTracker has done. 
*/ @Override - void initializeJob(JobID jobId) { + void initializeJob(InitializeJobContext context) { } @Override diff --git src/java/org/apache/hadoop/mapred/IsolationRunner.java src/java/org/apache/hadoop/mapred/IsolationRunner.java index 4ae1b2a..6d302f2 100644 --- src/java/org/apache/hadoop/mapred/IsolationRunner.java +++ src/java/org/apache/hadoop/mapred/IsolationRunner.java @@ -31,13 +31,8 @@ import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.LocalDirAllocator; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.BytesWritable; -import org.apache.hadoop.io.SequenceFile; import org.apache.hadoop.io.Text; -import org.apache.hadoop.io.Writable; -import org.apache.hadoop.io.WritableComparable; import org.apache.hadoop.mapred.JvmTask; -import org.apache.hadoop.mapreduce.TaskType; -import org.apache.hadoop.util.DiskChecker.DiskErrorException; /** * IsolationRunner is intended to facilitate debugging by re-running a specific @@ -169,17 +164,23 @@ public class IsolationRunner { // setup the local and user working directories FileSystem local = FileSystem.getLocal(conf); LocalDirAllocator lDirAlloc = new LocalDirAllocator("mapred.local.dir"); - File workDirName = TaskRunner.formWorkDir(lDirAlloc, taskId, false, conf); - local.setWorkingDirectory(new Path(workDirName.toString())); + // Child's conf.xml is the sandboxed version, so the mapred.local.dir list + // consists of attempt-dirs. 
+ Path workDirName = lDirAlloc.getLocalPathToRead(MRConstants.WORKDIR, conf); + local.setWorkingDirectory(workDirName); FileSystem.get(conf).setWorkingDirectory(conf.getWorkingDirectory()); // set up a classloader with the right classpath - ClassLoader classLoader = makeClassLoader(conf, workDirName); + ClassLoader classLoader = + makeClassLoader(conf, new File(workDirName.toString())); Thread.currentThread().setContextClassLoader(classLoader); conf.setClassLoader(classLoader); - - Path localSplit = new Path(new Path(jobFilename.toString()).getParent(), - "split.dta"); + + // Child's conf.xml is the sandboxed version, so the mapred.local.dir list + // consists of attempt-dirs. + Path localSplit = + new LocalDirAllocator("mapred.local.dir").getLocalPathToRead( + TaskTracker.LOCAL_SPLIT_FILE, conf); DataInputStream splitFile = FileSystem.getLocal(conf).open(localSplit); String splitClass = Text.readString(splitFile); BytesWritable split = new BytesWritable(); diff --git src/java/org/apache/hadoop/mapred/JvmManager.java src/java/org/apache/hadoop/mapred/JvmManager.java index c2a463b..d1566aa 100644 --- src/java/org/apache/hadoop/mapred/JvmManager.java +++ src/java/org/apache/hadoop/mapred/JvmManager.java @@ -35,6 +35,7 @@ import org.apache.hadoop.mapred.TaskController.TaskControllerContext; import org.apache.hadoop.mapred.TaskTracker.TaskInProgress; import org.apache.hadoop.util.Shell.ShellCommandExecutor; import org.apache.hadoop.util.ProcessTree; +import org.apache.hadoop.util.StringUtils; class JvmManager { @@ -111,7 +112,8 @@ class JvmManager { } } - public TaskInProgress getTaskForJvm(JVMId jvmId) { + public TaskInProgress getTaskForJvm(JVMId jvmId) + throws IOException { if (jvmId.isMapJVM()) { return mapJvmManager.getTaskForJvm(jvmId); } else { @@ -177,7 +179,8 @@ class JvmManager { jvmIdToRunner.get(jvmId).setBusy(true); } - synchronized public TaskInProgress getTaskForJvm(JVMId jvmId) { + synchronized public TaskInProgress getTaskForJvm(JVMId jvmId) + throws 
IOException { if (jvmToRunningTask.containsKey(jvmId)) { //Incase of JVM reuse, tasks are returned to previously launched //JVM via this method. However when a new task is launched @@ -185,16 +188,26 @@ class JvmManager { TaskRunner taskRunner = jvmToRunningTask.get(jvmId); JvmRunner jvmRunner = jvmIdToRunner.get(jvmId); Task task = taskRunner.getTaskInProgress().getTask(); - TaskControllerContext context = - new TaskController.TaskControllerContext(); + + // Initialize task dirs + TaskControllerContext context = + new TaskController.TaskControllerContext(); context.env = jvmRunner.env; context.task = task; - //If we are returning the same task as which the JVM was launched - //we don't initialize task once again. - if(!jvmRunner.env.conf.get("mapred.task.id"). - equals(task.getTaskID().toString())) { - tracker.getTaskController().initializeTask(context); + // If we are returning the same task as which the JVM was launched + // we don't initialize task once again. + if (!jvmRunner.env.conf.get("mapred.task.id").equals( + task.getTaskID().toString())) { + try { + tracker.getTaskController().initializeTask(context); + } catch (IOException e) { + LOG.warn("Failed to initialize the new task " + + task.getTaskID().toString() + " to be given to JVM with id " + + jvmId); + throw e; + } } + return taskRunner.getTaskInProgress(); } return null; @@ -393,23 +406,24 @@ class JvmManager { //Launch the task controller to run task JVM initalContext.task = jvmToRunningTask.get(jvmId).getTask(); initalContext.env = env; - tracker.getTaskController().initializeTask(initalContext); tracker.getTaskController().launchTaskJVM(initalContext); } catch (IOException ioe) { // do nothing // error and output are appropriately redirected + LOG.info("Child exited with the exception " + + StringUtils.stringifyException(ioe)); } finally { // handle the exit code shexec = initalContext.shExec; if (shexec == null) { return; } - + kill(); - + int exitCode = shexec.getExitCode(); 
updateOnJvmExit(jvmId, exitCode); - LOG.info("JVM : " + jvmId +" exited. Number of tasks it ran: " + - numTasksRan); + LOG.info("JVM : " + jvmId + " exited with exit code " + exitCode + + ". Number of tasks it ran: " + numTasksRan); try { // In case of jvm-reuse, //the task jvm cleans up the common workdir for every @@ -438,6 +452,7 @@ class JvmManager { .getLong("mapred.tasktracker.tasks.sleeptime-before-sigkill", ProcessTree.DEFAULT_SLEEPTIME_BEFORE_SIGKILL); + // Destroy the task jvm controller.destroyTaskJVM(initalContext); } else { LOG.info(String.format("JVM Not killed %s but just removed", jvmId diff --git src/java/org/apache/hadoop/mapred/LinuxTaskController.java src/java/org/apache/hadoop/mapred/LinuxTaskController.java index c21ccb1..f67bcc0 100644 --- src/java/org/apache/hadoop/mapred/LinuxTaskController.java +++ src/java/org/apache/hadoop/mapred/LinuxTaskController.java @@ -24,13 +24,13 @@ import java.io.IOException; import java.io.PrintWriter; import java.util.ArrayList; import java.util.List; +import java.util.Map; import java.util.Map.Entry; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.fs.FileUtil; import org.apache.hadoop.mapred.JvmManager.JvmEnv; +import org.apache.hadoop.mapred.TaskController.TaskControllerContext; import org.apache.hadoop.util.StringUtils; import org.apache.hadoop.util.Shell.ShellCommandExecutor; @@ -73,44 +73,19 @@ class LinuxTaskController extends TaskController { new File(hadoopBin, "task-controller").getAbsolutePath(); } - // The list of directory paths specified in the - // variable mapred.local.dir. This is used to determine - // which among the list of directories is picked up - // for storing data for a particular task. - private String[] mapredLocalDirs; - - // permissions to set on files and directories created. 
- // When localized files are handled securely, this string - // will change to something more restrictive. Until then, - // it opens up the permissions for all, so that the tasktracker - // and job owners can access files together. - private static final String FILE_PERMISSIONS = "ugo+rwx"; - - // permissions to set on components of the path leading to - // localized files and directories. Read and execute permissions - // are required for different users to be able to access the - // files. - private static final String PATH_PERMISSIONS = "go+rx"; - public LinuxTaskController() { super(); } - @Override - public void setConf(Configuration conf) { - super.setConf(conf); - mapredLocalDirs = conf.getStrings("mapred.local.dir"); - //Setting of the permissions of the local directory is done in - //setup() - } - /** * List of commands that the setuid script will execute. */ enum TaskCommands { + INITIALIZE_JOB, LAUNCH_TASK_JVM, + INITIALIZE_TASK, TERMINATE_TASK_JVM, - KILL_TASK_JVM + KILL_TASK_JVM, } /** @@ -150,48 +125,105 @@ class LinuxTaskController extends TaskController { ShellCommandExecutor shExec = buildTaskControllerExecutor( TaskCommands.LAUNCH_TASK_JVM, env.conf.getUser(), - launchTaskJVMArgs, env); + launchTaskJVMArgs, env.workDir, env.env); context.shExec = shExec; try { shExec.execute(); } catch (Exception e) { - LOG.warn("Exception thrown while launching task JVM : " + - StringUtils.stringifyException(e)); + // TODO: check for 143 (SIGTERM) and 137 (SIGKILL) exit codes + LOG.warn("Exception thrown while launching task JVM : " + + StringUtils.stringifyException(e)); LOG.warn("Exit code from task is : " + shExec.getExitCode()); - LOG.warn("Output from task-contoller is : " + shExec.getOutput()); + LOG.info("Output from LinuxTaskController's launchTaskJVM follows:"); + logOutput(shExec.getOutput()); throw new IOException(e); } - if(LOG.isDebugEnabled()) { - LOG.debug("output after executing task jvm = " + shExec.getOutput()); + if (LOG.isDebugEnabled()) { + 
LOG.info("Output from LinuxTaskController's launchTaskJVM follows:"); + logOutput(shExec.getOutput()); } } /** - * Returns list of arguments to be passed while launching task VM. - * See {@code buildTaskControllerExecutor(TaskCommands, - * String, List, JvmEnv)} documentation. + * Helper method that runs a LinuxTaskController command + * + * @param taskCommand + * @param user + * @param cmdArgs + * @param env + * @throws IOException + */ + private void runCommand(TaskCommands taskCommand, String user, + List cmdArgs, File workDir, Map env) + throws IOException { + + ShellCommandExecutor shExec = + buildTaskControllerExecutor(taskCommand, user, cmdArgs, workDir, env); + try { + shExec.execute(); + } catch (Exception e) { + LOG.warn("Exit code from " + taskCommand.toString() + " is : " + + shExec.getExitCode()); + LOG.warn("Exception thrown by " + taskCommand.toString() + " : " + + StringUtils.stringifyException(e)); + LOG.info("Output from LinuxTaskController's " + taskCommand.toString() + + " follows:"); + logOutput(shExec.getOutput()); + throw new IOException(e); + } + if (LOG.isDebugEnabled()) { + LOG.info("Output from LinuxTaskController's " + taskCommand.toString() + + " follows:"); + logOutput(shExec.getOutput()); + } + } + + /** + * Returns list of arguments to be passed while initializing a new task. See + * {@code buildTaskControllerExecutor(TaskCommands, String, List, + * JvmEnv)} documentation. 
+ * * @param context * @return Argument to be used while launching Task VM */ - private List buildLaunchTaskArgs(TaskControllerContext context) { + private List buildInitializeTaskArgs(TaskControllerContext context) { List commandArgs = new ArrayList(3); String taskId = context.task.getTaskID().toString(); String jobId = getJobId(context); - LOG.debug("getting the task directory as: " - + getTaskCacheDirectory(context)); - commandArgs.add(getDirectoryChosenForTask( - new File(getTaskCacheDirectory(context)), - context)); commandArgs.add(jobId); - if(!context.task.isTaskCleanupTask()) { + if (!context.task.isTaskCleanupTask()) { commandArgs.add(taskId); - }else { + } else { commandArgs.add(taskId + TaskTracker.TASK_CLEANUP_SUFFIX); } + File logDir = new File(TaskLog.getBaseLogDir()); + try { + commandArgs.add(logDir.getCanonicalPath()); + } catch (IOException e) { + // TODO Auto-generated catch block + e.printStackTrace(); + } return commandArgs; } - - // get the Job ID from the information in the TaskControllerContext + + @Override + void initializeTask(TaskControllerContext context) + throws IOException { + LOG.info("Going to do " + TaskCommands.INITIALIZE_TASK.toString() + + " for " + context.task.getTaskID().toString()); + runCommand(TaskCommands.INITIALIZE_TASK, context.env.conf.getUser(), + buildInitializeTaskArgs(context), context.env.workDir, context.env.env); + } + + private void logOutput(String output) { + String shExecOutput = output; + if (shExecOutput != null) { + for (String str : shExecOutput.split("\n")) { + LOG.info(str); + } + } + } + private String getJobId(TaskControllerContext context) { String taskId = context.task.getTaskID().toString(); TaskAttemptID tId = TaskAttemptID.forName(taskId); @@ -199,6 +231,29 @@ class LinuxTaskController extends TaskController { return jobId; } + /** + * Returns list of arguments to be passed while launching task VM. 
+ * See {@code buildTaskControllerExecutor(TaskCommands, + * String, List, JvmEnv)} documentation. + * @param context + * @return Argument to be used while launching Task VM + */ + private List buildLaunchTaskArgs(TaskControllerContext context) { + List commandArgs = new ArrayList(3); + String taskId = context.task.getTaskID().toString(); + String jobId = getJobId(context); + LOG.info("getting the task directory as: " + + getTaskCacheDirectory(context)); + LOG.info("getting the tt_root as " +getDirectoryChosenForTask( + new File(getTaskCacheDirectory(context)), + context) ); + commandArgs.add(getDirectoryChosenForTask( + new File(getTaskCacheDirectory(context)), + context)); + commandArgs.addAll(buildInitializeTaskArgs(context)); + return commandArgs; + } + // Get the directory from the list of directories configured // in mapred.local.dir chosen for storing data pertaining to // this task. @@ -208,8 +263,9 @@ class LinuxTaskController extends TaskController { String taskId = context.task.getTaskID().toString(); for (String dir : mapredLocalDirs) { File mapredDir = new File(dir); - File taskDir = new File(mapredDir, TaskTracker.getLocalTaskDir( - jobId, taskId, context.task.isTaskCleanupTask())); + // TODO: fix + File taskDir = new File(mapredDir, TaskTracker.getTaskWorkDir( + jobId, taskId, context.task.isTaskCleanupTask())).getParentFile(); if (directory.equals(taskDir)) { return dir; } @@ -219,68 +275,7 @@ class LinuxTaskController extends TaskController { throw new IllegalArgumentException("invalid task cache directory " + directory.getAbsolutePath()); } - - /** - * Setup appropriate permissions for directories and files that - * are used by the task. - * - * As the LinuxTaskController launches tasks as a user, different - * from the daemon, all directories and files that are potentially - * used by the tasks are setup with appropriate permissions that - * will allow access. 
- * - * Until secure data handling is implemented (see HADOOP-4491 and - * HADOOP-4493, for e.g.), the permissions are set up to allow - * read, write and execute access for everyone. This will be - * changed to restricted access as data is handled securely. - */ - void initializeTask(TaskControllerContext context) { - // Setup permissions for the job and task cache directories. - setupTaskCacheFileAccess(context); - // setup permissions for task log directory - setupTaskLogFileAccess(context); - } - - // Allows access for the task to create log files under - // the task log directory - private void setupTaskLogFileAccess(TaskControllerContext context) { - TaskAttemptID taskId = context.task.getTaskID(); - File f = TaskLog.getTaskLogFile(taskId, TaskLog.LogName.SYSLOG); - String taskAttemptLogDir = f.getParentFile().getAbsolutePath(); - changeDirectoryPermissions(taskAttemptLogDir, FILE_PERMISSIONS, false); - } - - // Allows access for the task to read, write and execute - // the files under the job and task cache directories - private void setupTaskCacheFileAccess(TaskControllerContext context) { - String taskId = context.task.getTaskID().toString(); - JobID jobId = JobID.forName(getJobId(context)); - //Change permission for the task across all the disks - for(String localDir : mapredLocalDirs) { - File f = new File(localDir); - File taskCacheDir = new File(f,TaskTracker.getLocalTaskDir( - jobId.toString(), taskId, context.task.isTaskCleanupTask())); - if(taskCacheDir.exists()) { - changeDirectoryPermissions(taskCacheDir.getPath(), - FILE_PERMISSIONS, true); - } - }//end of local directory Iteration - } - // convenience method to execute chmod. - private void changeDirectoryPermissions(String dir, String mode, - boolean isRecursive) { - int ret = 0; - try { - ret = FileUtil.chmod(dir, mode, isRecursive); - } catch (Exception e) { - LOG.warn("Exception in changing permissions for directory " + dir + - ". 
Exception: " + e.getMessage()); - } - if (ret != 0) { - LOG.warn("Could not change permissions for directory " + dir); - } - } /** * Builds the command line for launching/terminating/killing task JVM. * Following is the format for launching/terminating/killing task JVM @@ -295,14 +290,14 @@ class LinuxTaskController extends TaskController { * @param command command to be executed. * @param userName user name * @param cmdArgs list of extra arguments - * @param env JVM environment variables. + * @param env JVM environment variables. : TODO: fix * @return {@link ShellCommandExecutor} * @throws IOException */ - private ShellCommandExecutor buildTaskControllerExecutor(TaskCommands command, - String userName, - List cmdArgs, JvmEnv env) - throws IOException { + private ShellCommandExecutor buildTaskControllerExecutor( + TaskCommands command, String userName, List cmdArgs, + File workDir, Map env) + throws IOException { String[] taskControllerCmd = new String[3 + cmdArgs.size()]; taskControllerCmd[0] = getTaskControllerExecutablePath(); taskControllerCmd[1] = userName; @@ -317,9 +312,9 @@ class LinuxTaskController extends TaskController { } } ShellCommandExecutor shExec = null; - if(env.workDir != null && env.workDir.exists()) { + if(workDir != null && workDir.exists()) { shExec = new ShellCommandExecutor(taskControllerCmd, - env.workDir, env.env); + workDir, env); } else { shExec = new ShellCommandExecutor(taskControllerCmd); } @@ -333,6 +328,7 @@ class LinuxTaskController extends TaskController { // is different from what is set with respect with // env.workDir. Hence building this from the taskId everytime. String taskId = context.task.getTaskID().toString(); + // TODO: fix the following line or document it. 
File cacheDirForJob = context.env.workDir.getParentFile().getParentFile(); if(context.task.isTaskCleanupTask()) { taskId = taskId + TaskTracker.TASK_CLEANUP_SUFFIX; @@ -371,68 +367,27 @@ class LinuxTaskController extends TaskController { } } } - - /** - * Sets up the permissions of the following directories: - * - * Job cache directory - * Archive directory - * Hadoop log directories - * - */ - @Override - void setup() { - //set up job cache directory and associated permissions - String localDirs[] = this.mapredLocalDirs; - for(String localDir : localDirs) { - //Cache root - File cacheDirectory = new File(localDir,TaskTracker.getCacheSubdir()); - File jobCacheDirectory = new File(localDir,TaskTracker.getJobCacheSubdir()); - if(!cacheDirectory.exists()) { - if(!cacheDirectory.mkdirs()) { - LOG.warn("Unable to create cache directory : " + - cacheDirectory.getPath()); - } - } - if(!jobCacheDirectory.exists()) { - if(!jobCacheDirectory.mkdirs()) { - LOG.warn("Unable to create job cache directory : " + - jobCacheDirectory.getPath()); - } - } - //Give world writable permission for every directory under - //mapred-local-dir. - //Child tries to write files under it when executing. - changeDirectoryPermissions(localDir, FILE_PERMISSIONS, true); - }//end of local directory manipulations - //setting up perms for user logs - File taskLog = TaskLog.getUserLogDir(); - changeDirectoryPermissions(taskLog.getPath(), FILE_PERMISSIONS,false); + private List buildInitializeJobCommandArgs( + InitializeJobContext context) { + List initJobCmdArgs = new ArrayList(); + initJobCmdArgs.add(context.jobid.toString()); + return initJobCmdArgs; } /* + *TODO: fix * Create Job directories across disks and set their permissions to 777 * This way when tasks are run we just need to setup permissions for * task folder. 
*/ @Override - void initializeJob(JobID jobid) { - for(String localDir : this.mapredLocalDirs) { - File jobDirectory = new File(localDir, - TaskTracker.getLocalJobDir(jobid.toString())); - if(!jobDirectory.exists()) { - if(!jobDirectory.mkdir()) { - LOG.warn("Unable to create job cache directory : " - + jobDirectory.getPath()); - continue; - } - } - //Should be recursive because the jar and work folders might be - //present under the job cache directory - changeDirectoryPermissions( - jobDirectory.getPath(), FILE_PERMISSIONS, true); - } + void initializeJob(InitializeJobContext context) + throws IOException { + LOG.info("Going to initialize job " + context.jobid.toString() + + " on the TT"); + runCommand(TaskCommands.INITIALIZE_JOB, context.user, + buildInitializeJobCommandArgs(context), context.workDir, null); } /** @@ -467,7 +422,7 @@ class LinuxTaskController extends TaskController { } ShellCommandExecutor shExec = buildTaskControllerExecutor( command, context.env.conf.getUser(), - buildKillTaskCommandArgs(context), context.env); + buildKillTaskCommandArgs(context), context.env.workDir, context.env.env); try { shExec.execute(); } catch (Exception e) { @@ -498,6 +453,5 @@ class LinuxTaskController extends TaskController { protected String getTaskControllerExecutablePath() { return taskControllerExe; - } + } } - diff --git src/java/org/apache/hadoop/mapred/LocalJobRunner.java src/java/org/apache/hadoop/mapred/LocalJobRunner.java index bc80fd4..6df3093 100644 --- src/java/org/apache/hadoop/mapred/LocalJobRunner.java +++ src/java/org/apache/hadoop/mapred/LocalJobRunner.java @@ -22,12 +22,12 @@ import java.io.IOException; import java.util.ArrayList; import java.util.HashMap; import java.util.List; +import java.util.Map; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; -import org.apache.hadoop.io.BytesWritable; import org.apache.hadoop.io.DataOutputBuffer; 
import org.apache.hadoop.io.serializer.SerializationFactory; import org.apache.hadoop.io.serializer.Serializer; @@ -64,7 +64,7 @@ class LocalJobRunner implements JobSubmissionProtocol { private JobStatus status; private ArrayList mapIds = new ArrayList(); - private MapOutputFile mapoutputFile; + private JobProfile profile; private Path localFile; private FileSystem localFs; @@ -84,8 +84,6 @@ class LocalJobRunner implements JobSubmissionProtocol { public Job(JobID jobid, JobConf conf) throws IOException { this.file = new Path(getSystemDir(), jobid + "/job.xml"); this.id = jobid; - this.mapoutputFile = new MapOutputFile(jobid); - this.mapoutputFile.setConf(conf); this.localFile = new JobConf(conf).getLocalPath(jobDir+id+".xml"); this.localFs = FileSystem.getLocal(conf); @@ -160,7 +158,9 @@ class LocalJobRunner implements JobSubmissionProtocol { } outputCommitter.setupJob(jContext); status.setSetupProgress(1.0f); - + + Map mapOutputFiles = + new HashMap(); for (int i = 0; i < rawSplits.length; i++) { if (!this.isInterrupted()) { TaskAttemptID mapId = new TaskAttemptID( @@ -171,6 +171,12 @@ class LocalJobRunner implements JobSubmissionProtocol { rawSplits[i].getClassName(), rawSplits[i].getBytes(), 1); JobConf localConf = new JobConf(job); + TaskRunner.setupChildMapredLocalDirs(map, localConf); + + MapOutputFile mapOutput = new MapOutputFile(map.getJobID()); + mapOutput.setConf(localConf); + mapOutputFiles.put(mapId, mapOutput); + map.setJobFile(localFile.toString()); map.localizeConfiguration(localConf); map.setConf(localConf); @@ -188,14 +194,20 @@ class LocalJobRunner implements JobSubmissionProtocol { new TaskAttemptID(new TaskID(jobId, TaskType.REDUCE, 0), 0); try { if (numReduceTasks > 0) { + ReduceTask reduce = new ReduceTask(file.toString(), + reduceId, 0, mapIds.size(), 1); + JobConf localConf = new JobConf(job); + TaskRunner.setupChildMapredLocalDirs(reduce, localConf); // move map output to reduce input for (int i = 0; i < mapIds.size(); i++) { if 
(!this.isInterrupted()) { TaskAttemptID mapId = mapIds.get(i); - Path mapOut = this.mapoutputFile.getOutputFile(mapId); - Path reduceIn = this.mapoutputFile.getInputFileForWrite( - mapId.getTaskID(),reduceId, - localFs.getFileStatus(mapOut).getLen()); + Path mapOut = mapOutputFiles.get(mapId).getOutputFile(); + MapOutputFile localOutputFile = new MapOutputFile(jobId); + localOutputFile.setConf(localConf); + Path reduceIn = + localOutputFile.getInputFileForWrite(mapId.getTaskID(), + localFs.getFileStatus(mapOut).getLen()); if (!localFs.mkdirs(reduceIn.getParent())) { throw new IOException("Mkdirs failed to create " + reduceIn.getParent().toString()); @@ -207,9 +219,6 @@ class LocalJobRunner implements JobSubmissionProtocol { } } if (!this.isInterrupted()) { - ReduceTask reduce = new ReduceTask(file.toString(), - reduceId, 0, mapIds.size(), 1); - JobConf localConf = new JobConf(job); reduce.setJobFile(localFile.toString()); reduce.localizeConfiguration(localConf); reduce.setConf(localConf); @@ -224,11 +233,8 @@ class LocalJobRunner implements JobSubmissionProtocol { } } } finally { - for (TaskAttemptID mapId: mapIds) { - this.mapoutputFile.removeAll(mapId); - } - if (numReduceTasks == 1) { - this.mapoutputFile.removeAll(reduceId); + for (MapOutputFile output : mapOutputFiles.values()) { + output.removeAll(); } } // delete the temporary directory in output directory diff --git src/java/org/apache/hadoop/mapred/MapOutputFile.java src/java/org/apache/hadoop/mapred/MapOutputFile.java index 30b71c9..d4afd05 100644 --- src/java/org/apache/hadoop/mapred/MapOutputFile.java +++ src/java/org/apache/hadoop/mapred/MapOutputFile.java @@ -31,7 +31,9 @@ class MapOutputFile { private JobConf conf; private JobID jobId; - + + static final String REDUCE_INPUT_FILE_FORMAT_STRING = "%s/map_%d.out"; + MapOutputFile() { } @@ -42,132 +44,143 @@ class MapOutputFile { private LocalDirAllocator lDirAlloc = new LocalDirAllocator("mapred.local.dir"); - /** Return the path to local map output 
file created earlier - * @param mapTaskId a map task id + /** + * Return the path to local map output file created earlier + * + * @return path + * @throws IOException */ - public Path getOutputFile(TaskAttemptID mapTaskId) - throws IOException { - return lDirAlloc.getLocalPathToRead(TaskTracker.getIntermediateOutputDir( - jobId.toString(), mapTaskId.toString()) - + "/file.out", conf); + public Path getOutputFile() + throws IOException { + return lDirAlloc.getLocalPathToRead(TaskTracker.OUTPUT + Path.SEPARATOR + + "file.out", conf); } - /** Create a local map output file name. - * @param mapTaskId a map task id + /** + * Create a local map output file name. + * * @param size the size of the file + * @return path + * @throws IOException */ - public Path getOutputFileForWrite(TaskAttemptID mapTaskId, long size) - throws IOException { - return lDirAlloc.getLocalPathForWrite(TaskTracker.getIntermediateOutputDir( - jobId.toString(), mapTaskId.toString()) - + "/file.out", size, conf); + public Path getOutputFileForWrite(long size) + throws IOException { + return lDirAlloc.getLocalPathForWrite(TaskTracker.OUTPUT + Path.SEPARATOR + + "file.out", size, conf); } - /** Return the path to a local map output index file created earlier - * @param mapTaskId a map task id + /** + * Return the path to a local map output index file created earlier + * + * @return path + * @throws IOException */ - public Path getOutputIndexFile(TaskAttemptID mapTaskId) - throws IOException { - return lDirAlloc.getLocalPathToRead(TaskTracker.getIntermediateOutputDir( - jobId.toString(), mapTaskId.toString()) - + "/file.out.index", conf); + public Path getOutputIndexFile() + throws IOException { + return lDirAlloc.getLocalPathToRead(TaskTracker.OUTPUT + Path.SEPARATOR + + "file.out.index", conf); } - /** Create a local map output index file name. - * @param mapTaskId a map task id + /** + * Create a local map output index file name. 
+ * * @param size the size of the file + * @return path + * @throws IOException */ - public Path getOutputIndexFileForWrite(TaskAttemptID mapTaskId, long size) - throws IOException { - return lDirAlloc.getLocalPathForWrite(TaskTracker.getIntermediateOutputDir( - jobId.toString(), mapTaskId.toString()) - + "/file.out.index", - size, conf); + public Path getOutputIndexFileForWrite(long size) + throws IOException { + return lDirAlloc.getLocalPathForWrite(TaskTracker.OUTPUT + Path.SEPARATOR + + "file.out.index", size, conf); } - /** Return a local map spill file created earlier. - * @param mapTaskId a map task id + /** + * Return a local map spill file created earlier. + * * @param spillNumber the number + * @return path + * @throws IOException */ - public Path getSpillFile(TaskAttemptID mapTaskId, int spillNumber) - throws IOException { - return lDirAlloc.getLocalPathToRead(TaskTracker.getIntermediateOutputDir( - jobId.toString(), mapTaskId.toString()) - + "/spill" - + spillNumber + ".out", conf); + public Path getSpillFile(int spillNumber) + throws IOException { + return lDirAlloc.getLocalPathToRead(TaskTracker.OUTPUT + "/spill" + + spillNumber + ".out", conf); } - /** Create a local map spill file name. - * @param mapTaskId a map task id + /** + * Create a local map spill file name. 
+ * * @param spillNumber the number * @param size the size of the file + * @return path + * @throws IOException */ - public Path getSpillFileForWrite(TaskAttemptID mapTaskId, int spillNumber, - long size) throws IOException { - return lDirAlloc.getLocalPathForWrite(TaskTracker.getIntermediateOutputDir( - jobId.toString(), mapTaskId.toString()) - + "/spill" + - spillNumber + ".out", size, conf); + public Path getSpillFileForWrite(int spillNumber, long size) + throws IOException { + return lDirAlloc.getLocalPathForWrite(TaskTracker.OUTPUT + "/spill" + + spillNumber + ".out", size, conf); } - /** Return a local map spill index file created earlier - * @param mapTaskId a map task id + /** + * Return a local map spill index file created earlier + * * @param spillNumber the number + * @return path + * @throws IOException */ - public Path getSpillIndexFile(TaskAttemptID mapTaskId, int spillNumber) - throws IOException { - return lDirAlloc.getLocalPathToRead(TaskTracker.getIntermediateOutputDir( - jobId.toString(), mapTaskId.toString()) - + "/spill" + - spillNumber + ".out.index", conf); + public Path getSpillIndexFile(int spillNumber) + throws IOException { + return lDirAlloc.getLocalPathToRead(TaskTracker.OUTPUT + "/spill" + + spillNumber + ".out.index", conf); } - /** Create a local map spill index file name. - * @param mapTaskId a map task id + /** + * Create a local map spill index file name. 
+ * * @param spillNumber the number * @param size the size of the file + * @return path + * @throws IOException */ - public Path getSpillIndexFileForWrite(TaskAttemptID mapTaskId, int spillNumber, - long size) throws IOException { - return lDirAlloc.getLocalPathForWrite(TaskTracker.getIntermediateOutputDir( - jobId.toString(), mapTaskId.toString()) - + "/spill" + spillNumber + - ".out.index", size, conf); + public Path getSpillIndexFileForWrite(int spillNumber, long size) + throws IOException { + return lDirAlloc.getLocalPathForWrite(TaskTracker.OUTPUT + "/spill" + + spillNumber + ".out.index", size, conf); } - /** Return a local reduce input file created earlier - * @param mapTaskId a map task id - * @param reduceTaskId a reduce task id + /** + * Return a local reduce input file created earlier + * + * @param mapId a map task id + * @return path + * @throws IOException */ - public Path getInputFile(int mapId, TaskAttemptID reduceTaskId) - throws IOException { - // TODO *oom* should use a format here - return lDirAlloc.getLocalPathToRead(TaskTracker.getIntermediateOutputDir( - jobId.toString(), reduceTaskId.toString()) - + "/map_" + mapId + ".out", - conf); + public Path getInputFile(int mapId) + throws IOException { + return lDirAlloc.getLocalPathToRead(String.format( + REDUCE_INPUT_FILE_FORMAT_STRING, TaskTracker.OUTPUT, Integer + .valueOf(mapId)), conf); } - /** Create a local reduce input file name. - * @param mapTaskId a map task id - * @param reduceTaskId a reduce task id + /** + * Create a local reduce input file name. 
+ * + * @param mapId a map task id * @param size the size of the file + * @return path + * @throws IOException */ - public Path getInputFileForWrite(TaskID mapId, TaskAttemptID reduceTaskId, - long size) - throws IOException { - // TODO *oom* should use a format here - return lDirAlloc.getLocalPathForWrite(TaskTracker.getIntermediateOutputDir( - jobId.toString(), reduceTaskId.toString()) - + "/map_" + mapId.getId() + ".out", - size, conf); + public Path getInputFileForWrite(TaskID mapId, long size) + throws IOException { + return lDirAlloc.getLocalPathForWrite(String.format( + REDUCE_INPUT_FILE_FORMAT_STRING, TaskTracker.OUTPUT, mapId.getId()), + size, conf); } /** Removes all of the files related to a task. */ - public void removeAll(TaskAttemptID taskId) throws IOException { - conf.deleteLocalFiles(TaskTracker.getIntermediateOutputDir( - jobId.toString(), taskId.toString()) -); + public void removeAll() + throws IOException { + conf.deleteLocalFiles(TaskTracker.OUTPUT); } public void setConf(Configuration conf) { diff --git src/java/org/apache/hadoop/mapred/MapTask.java src/java/org/apache/hadoop/mapred/MapTask.java index 996415d..780af71 100644 --- src/java/org/apache/hadoop/mapred/MapTask.java +++ src/java/org/apache/hadoop/mapred/MapTask.java @@ -35,6 +35,7 @@ import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FSDataOutputStream; import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.LocalDirAllocator; import org.apache.hadoop.fs.LocalFileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.BytesWritable; @@ -66,7 +67,6 @@ class MapTask extends Task { * The size of each record in the index file for the map-outputs. 
*/ public static final int MAP_OUTPUT_INDEX_RECORD_LENGTH = 24; - private BytesWritable split = new BytesWritable(); private String splitClass; @@ -101,11 +101,20 @@ class MapTask extends Task { } @Override - public void localizeConfiguration(JobConf conf) throws IOException { + public void localizeConfiguration(JobConf conf) + throws IOException { super.localizeConfiguration(conf); - if (isMapOrReduce()) { - Path localSplit = new Path(new Path(getJobFile()).getParent(), - "split.dta"); + // Write the split file to the local disk if it is a normal map task (not a + // job-setup or a job-cleanup task) and if the user wishes to run + // IsolationRunner either by setting keep.failed.tasks.files to true or by + // using keep.tasks.files.pattern + if (isMapOrReduce() + && (conf.getKeepTaskFilesPattern() != null || conf + .getKeepFailedTaskFiles())) { + Path localSplit = + new LocalDirAllocator("mapred.local.dir").getLocalPathForWrite( + TaskTracker.getLocalSplitFile(getJobID().toString(), getTaskID() + .toString()), conf); LOG.debug("Writing local split to " + localSplit); DataOutputStream out = FileSystem.getLocal(conf).create(localSplit); Text.writeString(out, splitClass); @@ -1222,8 +1231,8 @@ class MapTask extends Task { try { // create spill file final SpillRecord spillRec = new SpillRecord(partitions); - final Path filename = mapOutputFile.getSpillFileForWrite(getTaskID(), - numSpills, size); + final Path filename = + mapOutputFile.getSpillFileForWrite(numSpills, size); out = rfs.create(filename); final int endPosition = (kvend > kvstart) @@ -1287,9 +1296,9 @@ class MapTask extends Task { if (totalIndexCacheMemory >= INDEX_CACHE_MEMORY_LIMIT) { // create spill index file - Path indexFilename = mapOutputFile.getSpillIndexFileForWrite( - getTaskID(), numSpills, - partitions * MAP_OUTPUT_INDEX_RECORD_LENGTH); + Path indexFilename = + mapOutputFile.getSpillIndexFileForWrite(numSpills, partitions + * MAP_OUTPUT_INDEX_RECORD_LENGTH); spillRec.writeToFile(indexFilename, 
job); } else { indexCacheList.add(spillRec); @@ -1315,8 +1324,8 @@ class MapTask extends Task { try { // create spill file final SpillRecord spillRec = new SpillRecord(partitions); - final Path filename = mapOutputFile.getSpillFileForWrite(getTaskID(), - numSpills, size); + final Path filename = + mapOutputFile.getSpillFileForWrite(numSpills, size); out = rfs.create(filename); // we don't run the combiner for a single record @@ -1352,9 +1361,9 @@ class MapTask extends Task { } if (totalIndexCacheMemory >= INDEX_CACHE_MEMORY_LIMIT) { // create spill index file - Path indexFilename = mapOutputFile.getSpillIndexFileForWrite( - getTaskID(), numSpills, - partitions * MAP_OUTPUT_INDEX_RECORD_LENGTH); + Path indexFilename = + mapOutputFile.getSpillIndexFileForWrite(numSpills, partitions + * MAP_OUTPUT_INDEX_RECORD_LENGTH); spillRec.writeToFile(indexFilename, job); } else { indexCacheList.add(spillRec); @@ -1444,14 +1453,14 @@ class MapTask extends Task { final TaskAttemptID mapId = getTaskID(); for(int i = 0; i < numSpills; i++) { - filename[i] = mapOutputFile.getSpillFile(mapId, i); + filename[i] = mapOutputFile.getSpillFile(i); finalOutFileSize += rfs.getFileStatus(filename[i]).getLen(); } if (numSpills == 1) { //the spill is the final output rfs.rename(filename[0], new Path(filename[0].getParent(), "file.out")); if (indexCacheList.size() == 0) { - rfs.rename(mapOutputFile.getSpillIndexFile(mapId, 0), + rfs.rename(mapOutputFile.getSpillIndexFile(0), new Path(filename[0].getParent(),"file.out.index")); } else { indexCacheList.get(0).writeToFile( @@ -1462,7 +1471,7 @@ class MapTask extends Task { // read in paged indices for (int i = indexCacheList.size(); i < numSpills; ++i) { - Path indexFileName = mapOutputFile.getSpillIndexFile(mapId, i); + Path indexFileName = mapOutputFile.getSpillIndexFile(i); indexCacheList.add(new SpillRecord(indexFileName, job)); } @@ -1470,10 +1479,10 @@ class MapTask extends Task { //lengths for each partition finalOutFileSize += partitions * 
APPROX_HEADER_LENGTH; finalIndexFileSize = partitions * MAP_OUTPUT_INDEX_RECORD_LENGTH; - Path finalOutputFile = mapOutputFile.getOutputFileForWrite(mapId, - finalOutFileSize); - Path finalIndexFile = mapOutputFile.getOutputIndexFileForWrite( - mapId, finalIndexFileSize); + Path finalOutputFile = + mapOutputFile.getOutputFileForWrite(finalOutFileSize); + Path finalIndexFile = + mapOutputFile.getOutputIndexFileForWrite(finalIndexFileSize); //The output stream for the final single output file FSDataOutputStream finalOut = rfs.create(finalOutputFile, true, 4096); diff --git src/java/org/apache/hadoop/mapred/MapTaskRunner.java src/java/org/apache/hadoop/mapred/MapTaskRunner.java index 918b2a6..94750d0 100644 --- src/java/org/apache/hadoop/mapred/MapTaskRunner.java +++ src/java/org/apache/hadoop/mapred/MapTaskRunner.java @@ -34,13 +34,13 @@ class MapTaskRunner extends TaskRunner { return false; } - mapOutputFile.removeAll(getTask().getTaskID()); + mapOutputFile.removeAll(); return true; } /** Delete all of the temporary map output files. 
*/ public void close() throws IOException { LOG.info(getTask()+" done; removing files."); - mapOutputFile.removeAll(getTask().getTaskID()); + mapOutputFile.removeAll(); } } diff --git src/java/org/apache/hadoop/mapred/ReduceTask.java src/java/org/apache/hadoop/mapred/ReduceTask.java index 9401f90..f01f363 100644 --- src/java/org/apache/hadoop/mapred/ReduceTask.java +++ src/java/org/apache/hadoop/mapred/ReduceTask.java @@ -208,7 +208,7 @@ class ReduceTask extends Task { if (isLocal) { // for local jobs for(int i = 0; i < numMaps; ++i) { - fileList.add(mapOutputFile.getInputFile(i, getTaskID())); + fileList.add(mapOutputFile.getInputFile(i)); } } else { // for non local jobs @@ -1284,12 +1284,11 @@ class ReduceTask extends Task { // else, we will check the localFS to find a suitable final location // for this path TaskAttemptID reduceId = reduceTask.getTaskID(); - Path filename = new Path("/" + TaskTracker.getIntermediateOutputDir( - reduceId.getJobID().toString(), - reduceId.toString()) - + "/map_" + - loc.getTaskId().getId() + ".out"); - + Path filename = + new Path(String.format( + MapOutputFile.REDUCE_INPUT_FILE_FORMAT_STRING, + TaskTracker.OUTPUT, loc.getTaskId().getId())); + // Copy the map output to a temp file whose name is unique to this attempt Path tmpMapOutput = new Path(filename+"-"+id); @@ -2326,8 +2325,8 @@ class ReduceTask extends Task { sortPhaseFinished = true; // must spill to disk, but can't retain in-mem for intermediate merge - final Path outputPath = mapOutputFile.getInputFileForWrite(mapId, - reduceTask.getTaskID(), inMemToDiskBytes); + final Path outputPath = + mapOutputFile.getInputFileForWrite(mapId, inMemToDiskBytes); final RawKeyValueIterator rIter = Merger.merge(job, fs, keyClass, valueClass, memDiskSegments, numMemDiskSegments, tmpDir, comparator, reporter, spilledRecordsCounter, null, @@ -2621,8 +2620,8 @@ class ReduceTask extends Task { long mergeOutputSize = createInMemorySegments(inMemorySegments, 0); int noInMemorySegments = 
inMemorySegments.size(); - Path outputPath = mapOutputFile.getInputFileForWrite(mapId, - reduceTask.getTaskID(), mergeOutputSize); + Path outputPath = + mapOutputFile.getInputFileForWrite(mapId, mergeOutputSize); Writer writer = new Writer(conf, rfs, outputPath, diff --git src/java/org/apache/hadoop/mapred/ReduceTaskRunner.java src/java/org/apache/hadoop/mapred/ReduceTaskRunner.java index 903354e..fc6d1ae 100644 --- src/java/org/apache/hadoop/mapred/ReduceTaskRunner.java +++ src/java/org/apache/hadoop/mapred/ReduceTaskRunner.java @@ -37,7 +37,7 @@ class ReduceTaskRunner extends TaskRunner { } // cleanup from failures - mapOutputFile.removeAll(getTask().getTaskID()); + mapOutputFile.removeAll(); return true; } @@ -46,6 +46,6 @@ class ReduceTaskRunner extends TaskRunner { public void close() throws IOException { LOG.info(getTask()+" done; removing files."); getTask().getProgress().setStatus("closed"); - mapOutputFile.removeAll(getTask().getTaskID()); + mapOutputFile.removeAll(); } } diff --git src/java/org/apache/hadoop/mapred/TaskController.java src/java/org/apache/hadoop/mapred/TaskController.java index 030bf6b..8ec42a3 100644 --- src/java/org/apache/hadoop/mapred/TaskController.java +++ src/java/org/apache/hadoop/mapred/TaskController.java @@ -17,12 +17,14 @@ */ package org.apache.hadoop.mapred; +import java.io.File; import java.io.IOException; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configurable; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileUtil; import org.apache.hadoop.mapred.JvmManager.JvmEnv; import org.apache.hadoop.util.StringUtils; import org.apache.hadoop.util.Shell.ShellCommandExecutor; @@ -45,18 +47,74 @@ abstract class TaskController implements Configurable { public Configuration getConf() { return conf; } - + + // The list of directory paths specified in the variable mapred.local.dir. 
+ // This is used to determine which among the list of directories is picked up + // for storing data for a particular task. + protected String[] mapredLocalDirs; + public void setConf(Configuration conf) { this.conf = conf; + mapredLocalDirs = conf.getStrings("mapred.local.dir"); } - + + /** + * Sets up the permissions of the following directories on all the configured + * disks: + *
<ul> + * <li>mapred-local directories</li> + * <li>Job cache directories</li> + * <li>Archive directories</li> + * <li>Hadoop log directories</li> + * </ul>
+ */ + void setup() { + for (String localDir : this.mapredLocalDirs) { + // Set up the mapred-local directories. + File mapredlocalDir = new File(localDir); + if (!mapredlocalDir.exists() && !mapredlocalDir.mkdirs()) { + LOG.warn("Unable to create mapred-local directory : " + + mapredlocalDir.getPath()); + } else { + FileUtil.setPermissions(mapredlocalDir, FileUtil.sevenFiveFive); + } + // TODO: should throw exception if failed on all disks? + + // Set up the cache directory used for distributed cache files + File distributedCacheDir = new File(localDir, TaskTracker.getDistributedCacheDir()); + if (!distributedCacheDir.exists() && !distributedCacheDir.mkdirs()) { + LOG.warn("Unable to create cache directory : " + + distributedCacheDir.getPath()); + } else { + FileUtil.setPermissions(distributedCacheDir, FileUtil.sevenFiveFive); + } + + // Set up the jobcache directory + File jobCacheDir = + new File(localDir, TaskTracker.getJobCacheSubdir()); + if (!jobCacheDir.exists() && !jobCacheDir.mkdirs()) { + LOG.warn("Unable to create job cache directory : " + + jobCacheDir.getPath()); + } else { + FileUtil.setPermissions(jobCacheDir, FileUtil.sevenFiveFive); + } + } + + // Set up the user log directory + File taskLog = TaskLog.getUserLogDir(); + FileUtil.setPermissions(taskLog, FileUtil.sevenFiveFive); + } + /** - * Setup task controller component. + * Take task-controller specific actions to initialize job. This involves + * moving the jobfiles i.e. contents of the directory + * taskTracker/system/jobid/userfiles to taskTracker/user/jobid/ so as to + * enable the launch tasks to share the contents from the users directory. 
* + * @throws IOException */ - abstract void setup(); - - + abstract void initializeJob(InitializeJobContext context) throws IOException; + /** * Launch a task JVM * @@ -65,7 +123,7 @@ abstract class TaskController implements Configurable { */ abstract void launchTaskJVM(TaskControllerContext context) throws IOException; - + /** * Top level cleanup a task JVM method. * @@ -90,47 +148,44 @@ abstract class TaskController implements Configurable { } killTask(context); } - - /** - * Perform initializing actions required before a task can run. - * - * For instance, this method can be used to setup appropriate - * access permissions for files and directories that will be - * used by tasks. Tasks use the job cache, log, PID and distributed cache - * directories and files as part of their functioning. Typically, - * these files are shared between the daemon and the tasks - * themselves. So, a TaskController that is launching tasks - * as different users can implement this method to setup - * appropriate ownership and permissions for these directories - * and files. - */ - abstract void initializeTask(TaskControllerContext context); - - + + /** Perform initializing actions required before a task can run. + * + * For instance, this method can be used to setup appropriate + * access permissions for files and directories that will be + * used by tasks. Tasks use the job cache, log, PID and distributed cache + * directories and files as part of their functioning. Typically, + * these files are shared between the daemon and the tasks + * themselves. So, a TaskController that is launching tasks + * as different users can implement this method to setup + * appropriate ownership and permissions for these directories + * and files. + */ + abstract void initializeTask(TaskControllerContext context) + throws IOException; + /** * Contains task information required for the task controller. 
*/ static class TaskControllerContext { // task being executed - Task task; - // the JVM environment for the task - JvmEnv env; - // the Shell executor executing the JVM for this task - ShellCommandExecutor shExec; - // process handle of task JVM - String pid; - // waiting time before sending SIGKILL to task JVM after sending SIGTERM - long sleeptimeBeforeSigkill; + Task task; + ShellCommandExecutor shExec; // the Shell executor executing the JVM for this task. + + // Information used only when this context is used for launching new tasks. + JvmEnv env; // the JVM environment for the task. + + // Information used only when this context is used for destroying a task jvm. + String pid; // process handle of task JVM. + long sleeptimeBeforeSigkill; // waiting time before sending SIGKILL to task JVM after sending SIGTERM + } + + static class InitializeJobContext { + JobID jobid; + File workDir; + String user; } - /** - * Method which is called after the job is localized so that task controllers - * can implement their own job localization logic. - * - * @param tip Task of job for which localization happens. - */ - abstract void initializeJob(JobID jobId); - /** * Sends a graceful terminate signal to taskJVM and it sub-processes. 
* @@ -144,6 +199,5 @@ abstract class TaskController implements Configurable { * * @param context task context */ - abstract void killTask(TaskControllerContext context); } diff --git src/java/org/apache/hadoop/mapred/TaskLog.java src/java/org/apache/hadoop/mapred/TaskLog.java index 9baafd8..2091cfc 100644 --- src/java/org/apache/hadoop/mapred/TaskLog.java +++ src/java/org/apache/hadoop/mapred/TaskLog.java @@ -39,6 +39,7 @@ import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.LocalFileSystem; import org.apache.hadoop.fs.FileUtil; import org.apache.hadoop.fs.Path; +import org.apache.hadoop.fs.permission.FsPermission; import org.apache.hadoop.util.ProcessTree; import org.apache.hadoop.util.Shell; import org.apache.log4j.Appender; @@ -55,8 +56,7 @@ public class TaskLog { LogFactory.getLog(TaskLog.class); private static final File LOG_DIR = - new File(System.getProperty("hadoop.log.dir"), - "userlogs").getAbsoluteFile(); + new File(getBaseLogDir(), "userlogs").getAbsoluteFile(); static LocalFileSystem localFS = null; static { @@ -156,8 +156,12 @@ public class TaskLog { return new File(getBaseDir(taskid), "log.index"); } } - - private static File getBaseDir(String taskid) { + + static String getBaseLogDir() { + return System.getProperty("hadoop.log.dir"); + } + + static File getBaseDir(String taskid) { return new File(LOG_DIR, taskid); } private static long prevOutLength; diff --git src/java/org/apache/hadoop/mapred/TaskRunner.java src/java/org/apache/hadoop/mapred/TaskRunner.java index 86c5868..c80208f 100644 --- src/java/org/apache/hadoop/mapred/TaskRunner.java +++ src/java/org/apache/hadoop/mapred/TaskRunner.java @@ -121,213 +121,51 @@ abstract class TaskRunner extends Thread { TaskAttemptID taskid = t.getTaskID(); LocalDirAllocator lDirAlloc = new LocalDirAllocator("mapred.local.dir"); File workDir = formWorkDir(lDirAlloc, taskid, t.isTaskCleanupTask(), conf); - + URI[] archives = DistributedCache.getCacheArchives(conf); URI[] files = 
DistributedCache.getCacheFiles(conf); + // We don't create any symlinks yet, so presence/absence of workDir + // actually on the file system doesn't matter. setupDistributedCache(lDirAlloc, workDir, archives, files); - - if (!prepare()) { - return; - } - // Accumulates class paths for child. - List classPaths = new ArrayList(); - // start with same classpath as parent process - appendSystemClasspaths(classPaths); + // Set up the child task's configuration. After this call, no localization + // of files should happen in the TaskTracker's process space. Any changes to + // the conf object after this will NOT be reflected to the child. + setupChildTaskConfiguration(lDirAlloc); - if (!workDir.mkdirs()) { - if (!workDir.isDirectory()) { - LOG.fatal("Mkdirs failed to create " + workDir.toString()); - } + if (!prepare()) { + return; } - // include the user specified classpath - appendJobJarClasspaths(conf.getJar(), classPaths); - - // Distributed cache paths - appendDistributedCacheClasspaths(conf, archives, files, classPaths); - - // Include the working dir too - classPaths.add(workDir.toString()); - // Build classpath - - - // Build exec child JVM args. - Vector vargs = new Vector(8); - File jvm = // use same jvm as parent - new File(new File(System.getProperty("java.home"), "bin"), "java"); - - vargs.add(jvm.toString()); - - // Add child (task) java-vm options. - // - // The following symbols if present in mapred.child.java.opts value are - // replaced: - // + @taskid@ is interpolated with value of TaskID. - // Other occurrences of @ will not be altered. - // - // Example with multiple arguments and substitutions, showing - // jvm GC logging, and start of a passwordless JVM JMX agent so can - // connect with jconsole and the likes to watch child memory, threads - // and get thread dumps. 
- // - // - // mapred.child.java.opts - // -verbose:gc -Xloggc:/tmp/@taskid@.gc \ - // -Dcom.sun.management.jmxremote.authenticate=false \ - // -Dcom.sun.management.jmxremote.ssl=false \ - // - // - // - String javaOpts = conf.get("mapred.child.java.opts", "-Xmx200m"); - javaOpts = javaOpts.replace("@taskid@", taskid.toString()); - String [] javaOptsSplit = javaOpts.split(" "); - - // Add java.library.path; necessary for loading native libraries. - // - // 1. To support native-hadoop library i.e. libhadoop.so, we add the - // parent processes' java.library.path to the child. - // 2. We also add the 'cwd' of the task to it's java.library.path to help - // users distribute native libraries via the DistributedCache. - // 3. The user can also specify extra paths to be added to the - // java.library.path via mapred.child.java.opts. - // - String libraryPath = System.getProperty("java.library.path"); - if (libraryPath == null) { - libraryPath = workDir.getAbsolutePath(); - } else { - libraryPath += SYSTEM_PATH_SEPARATOR + workDir; - } - boolean hasUserLDPath = false; - for(int i=0; i classPaths = getClassPaths(conf, workDir, archives, files); - // Setup the log4j prop long logSize = TaskLog.getTaskLogLength(conf); - vargs.add("-Dhadoop.log.dir=" + - new File(System.getProperty("hadoop.log.dir") - ).getAbsolutePath()); - vargs.add("-Dhadoop.root.logger=INFO,TLA"); - vargs.add("-Dhadoop.tasklog.taskid=" + taskid); - vargs.add("-Dhadoop.tasklog.totalLogFileSize=" + logSize); - - if (conf.getProfileEnabled()) { - if (conf.getProfileTaskRange(t.isMapTask() - ).isIncluded(t.getPartition())) { - File prof = TaskLog.getTaskLogFile(taskid, TaskLog.LogName.PROFILE); - vargs.add(String.format(conf.getProfileParams(), prof.toString())); - } - } - // Add main class and its arguments - vargs.add(Child.class.getName()); // main of Child - // pass umbilical address - InetSocketAddress address = tracker.getTaskTrackerReportAddress(); - vargs.add(address.getAddress().getHostAddress()); - 
vargs.add(Integer.toString(address.getPort())); - vargs.add(taskid.toString()); // pass task identifier + // Build exec child JVM args. + Vector vargs = + getVMArgs(taskid, workDir, classPaths, logSize); tracker.addToMemoryManager(t.getTaskID(), t.isMapTask(), conf); // set memory limit using ulimit if feasible and necessary ... - String[] ulimitCmd = Shell.getUlimitMemoryCommand(conf); - List setup = null; - if (ulimitCmd != null) { - setup = new ArrayList(); - for (String arg : ulimitCmd) { - setup.add(arg); - } - } + List setup = getVMSetupCmd(); // Set up the redirection of the task's stdout and stderr streams File stdout = TaskLog.getTaskLogFile(taskid, TaskLog.LogName.STDOUT); File stderr = TaskLog.getTaskLogFile(taskid, TaskLog.LogName.STDERR); - boolean b = stdout.getParentFile().mkdirs(); + File logDir = stdout.getParentFile(); + boolean b = logDir.mkdirs(); if (!b) { LOG.warn("mkdirs failed. Ignoring"); + } else { + // TODO: This is not needed once MAPREDUCE-AAAA + FileUtil.setPermissions(logDir, FileUtil.sevenSevenSeven); } tracker.getTaskTrackerInstrumentation().reportTaskLaunch(taskid, stdout, stderr); Map env = new HashMap(); - StringBuffer ldLibraryPath = new StringBuffer(); - ldLibraryPath.append(workDir.toString()); - String oldLdLibraryPath = null; - oldLdLibraryPath = System.getenv("LD_LIBRARY_PATH"); - if (oldLdLibraryPath != null) { - ldLibraryPath.append(SYSTEM_PATH_SEPARATOR); - ldLibraryPath.append(oldLdLibraryPath); - } - env.put("LD_LIBRARY_PATH", ldLibraryPath.toString()); - - // add the env variables passed by the user - String mapredChildEnv = conf.get("mapred.child.env"); - if (mapredChildEnv != null && mapredChildEnv.length() > 0) { - String childEnvs[] = mapredChildEnv.split(","); - for (String cEnv : childEnvs) { - try { - String[] parts = cEnv.split("="); // split on '=' - String value = env.get(parts[0]); - if (value != null) { - // replace $env with the child's env constructed by tt's - // example 
LD_LIBRARY_PATH=$LD_LIBRARY_PATH:/tmp - value = parts[1].replace("$" + parts[0], value); - } else { - // this key is not configured by the tt for the child .. get it - // from the tt's env - // example PATH=$PATH:/tmp - value = System.getenv(parts[0]); - if (value != null) { - // the env key is present in the tt's env - value = parts[1].replace("$" + parts[0], value); - } else { - // the env key is note present anywhere .. simply set it - // example X=$X:/tmp or X=/tmp - value = parts[1].replace("$" + parts[0], ""); - } - } - env.put(parts[0], value); - } catch (Throwable t) { - // set the error msg - errorInfo = "Invalid User environment settings : " + mapredChildEnv - + ". Failed to parse user-passed environment param." - + " Expecting : env1=value1,env2=value2..."; - LOG.warn(errorInfo); - throw t; - } - } - } + errorInfo = getVMEnvironment(errorInfo, workDir, conf, env); jvmManager.launchJvm(this, jvmManager.constructJvmEnv(setup,vargs,stdout,stderr,logSize, @@ -355,7 +193,7 @@ abstract class TaskRunner extends Thread { LOG.fatal(t.getTaskID()+" reporting FSError", ie); } } catch (Throwable throwable) { - LOG.warn(t.getTaskID() + errorInfo, throwable); + LOG.warn(t.getTaskID() + " : " + errorInfo, throwable); Throwable causeThrowable = new Throwable(errorInfo, throwable); ByteArrayOutputStream baos = new ByteArrayOutputStream(); causeThrowable.printStackTrace(new PrintStream(baos)); @@ -385,15 +223,309 @@ abstract class TaskRunner extends Thread { } } + /** + * @param lDirAlloc + * @throws IOException + */ + void setupChildTaskConfiguration(LocalDirAllocator lDirAlloc) + throws IOException { + + Path localTaskFile = + lDirAlloc.getLocalPathForWrite(TaskTracker.getTaskConfFile(t + .getJobID().toString(), t.getTaskID().toString(), t + .isTaskCleanupTask()), conf); + + // setup the child's mapred-local-dir. This is done just before writing the + // task file to the disk. 
+ setupChildMapredLocalDirs(t, conf); + + // write the child's task configuration file to the local disk + writeLocalTaskFile(localTaskFile.toString(), conf); + + // Set the final job file in the task. The child needs to know the correct + // path to job.xml. So set this path accordingly. + LOG.info(" $$$$$$$$$$$$ jobfile " + localTaskFile); + t.setJobFile(localTaskFile.toString()); + } + + /** + * @return + */ + private List getVMSetupCmd() { + String[] ulimitCmd = Shell.getUlimitMemoryCommand(conf); + List setup = null; + if (ulimitCmd != null) { + setup = new ArrayList(); + for (String arg : ulimitCmd) { + setup.add(arg); + } + } + return setup; + } + + /** + * @param taskid + * @param workDir + * @param classPaths + * @param logSize + * @return + * @throws IOException + */ + private Vector getVMArgs(TaskAttemptID taskid, File workDir, + List classPaths, long logSize) + throws IOException { + Vector vargs = new Vector(8); + File jvm = // use same jvm as parent + new File(new File(System.getProperty("java.home"), "bin"), "java"); + + vargs.add(jvm.toString()); + + // Add child (task) java-vm options. + // + // The following symbols if present in mapred.child.java.opts value are + // replaced: + // + @taskid@ is interpolated with value of TaskID. + // Other occurrences of @ will not be altered. + // + // Example with multiple arguments and substitutions, showing + // jvm GC logging, and start of a passwordless JVM JMX agent so can + // connect with jconsole and the likes to watch child memory, threads + // and get thread dumps. 
+ // + // + // mapred.child.java.opts + // -verbose:gc -Xloggc:/tmp/@taskid@.gc \ + // -Dcom.sun.management.jmxremote.authenticate=false \ + // -Dcom.sun.management.jmxremote.ssl=false \ + // + // + // + String javaOpts = conf.get("mapred.child.java.opts", "-Xmx200m"); + javaOpts = javaOpts.replace("@taskid@", taskid.toString()); + String [] javaOptsSplit = javaOpts.split(" "); + + // Add java.library.path; necessary for loading native libraries. + // + // 1. To support native-hadoop library i.e. libhadoop.so, we add the + // parent processes' java.library.path to the child. + // 2. We also add the 'cwd' of the task to it's java.library.path to help + // users distribute native libraries via the DistributedCache. + // 3. The user can also specify extra paths to be added to the + // java.library.path via mapred.child.java.opts. + // + String libraryPath = System.getProperty("java.library.path"); + if (libraryPath == null) { + libraryPath = workDir.getAbsolutePath(); + } else { + libraryPath += SYSTEM_PATH_SEPARATOR + workDir; + } + boolean hasUserLDPath = false; + for(int i=0; i getClassPaths(JobConf conf, File workDir, + URI[] archives, URI[] files) + throws IOException { + // Accumulates class paths for child. 
+ List classPaths = new ArrayList(); + // start with same classpath as parent process + appendSystemClasspaths(classPaths); + + // include the user specified classpath + appendJobJarClasspaths(conf.getJar(), classPaths); + + // Distributed cache paths + appendDistributedCacheClasspaths(conf, archives, files, classPaths); + + // Include the working dir too + classPaths.add(workDir.toString()); + return classPaths; + } + + /** + * @param errorInfo + * @param workDir + * @param env + * @return + * @throws Throwable + */ + private static String getVMEnvironment(String errorInfo, File workDir, JobConf conf, + Map env) + throws Throwable { + StringBuffer ldLibraryPath = new StringBuffer(); + ldLibraryPath.append(workDir.toString()); + String oldLdLibraryPath = null; + oldLdLibraryPath = System.getenv("LD_LIBRARY_PATH"); + if (oldLdLibraryPath != null) { + ldLibraryPath.append(SYSTEM_PATH_SEPARATOR); + ldLibraryPath.append(oldLdLibraryPath); + } + env.put("LD_LIBRARY_PATH", ldLibraryPath.toString()); + + // add the env variables passed by the user + String mapredChildEnv = conf.get("mapred.child.env"); + if (mapredChildEnv != null && mapredChildEnv.length() > 0) { + String childEnvs[] = mapredChildEnv.split(","); + for (String cEnv : childEnvs) { + try { + String[] parts = cEnv.split("="); // split on '=' + String value = env.get(parts[0]); + if (value != null) { + // replace $env with the child's env constructed by tt's + // example LD_LIBRARY_PATH=$LD_LIBRARY_PATH:/tmp + value = parts[1].replace("$" + parts[0], value); + } else { + // this key is not configured by the tt for the child .. get it + // from the tt's env + // example PATH=$PATH:/tmp + value = System.getenv(parts[0]); + if (value != null) { + // the env key is present in the tt's env + value = parts[1].replace("$" + parts[0], value); + } else { + // the env key is note present anywhere .. 
simply set it + // example X=$X:/tmp or X=/tmp + value = parts[1].replace("$" + parts[0], ""); + } + } + env.put(parts[0], value); + } catch (Throwable t) { + // set the error msg + errorInfo = "Invalid User environment settings : " + mapredChildEnv + + ". Failed to parse user-passed environment param." + + " Expecting : env1=value1,env2=value2..."; + LOG.warn(errorInfo); + throw t; + } + } + } + return errorInfo; + } + + /** + * Write the task specific job-configuration file. + * + * @param localFs + * @throws IOException + */ + private static void writeLocalTaskFile(String jobFile, JobConf conf) + throws IOException { + Path localTaskFile = new Path(jobFile); + FileSystem localFs = FileSystem.getLocal(conf); + localFs.delete(localTaskFile, true); + OutputStream out = localFs.create(localTaskFile); + try { + conf.writeXml(out); + } finally { + out.close(); + } + } + + /** + * Prepare the mapred.local.dir for the child. The child is sand-boxed now. + * Whenever it uses LocalDirAllocator from now on inside the child, it will + * only see files inside the attempt-directory. + */ + static void setupChildMapredLocalDirs(Task t, JobConf conf) { + String[] localDirs = conf.getStrings("mapred.local.dir"); + String jobId = t.getJobID().toString(); + String taskId = t.getTaskID().toString(); + boolean isCleanup = t.isTaskCleanupTask(); + // TODO: check localDirs length + String childMapredLocalDir = + localDirs[0] + Path.SEPARATOR + + TaskTracker.getLocalTaskDir(jobId, taskId, isCleanup); + for (int i = 1; i < localDirs.length; i++) { + childMapredLocalDir += + "," + localDirs[i] + Path.SEPARATOR + + TaskTracker.getLocalTaskDir(jobId, taskId, isCleanup); + } + LOG.info("mapred.local.dir for child : " + childMapredLocalDir); + conf.set("mapred.local.dir", childMapredLocalDir); + } + /** Creates the working directory pathname for a task attempt. 
*/ static File formWorkDir(LocalDirAllocator lDirAlloc, TaskAttemptID task, boolean isCleanup, JobConf conf) throws IOException { - File workDir = new File(lDirAlloc.getLocalPathToRead( - TaskTracker.getLocalTaskDir(task.getJobID().toString(), - task.toString(), isCleanup) - + Path.SEPARATOR + MRConstants.WORKDIR, conf).toString()); - return workDir; + Path workDir = + lDirAlloc.getLocalPathToRead(TaskTracker.getTaskWorkDir(task + .getJobID().toString(), task.toString(), isCleanup), conf); + + return new File(workDir.toString()); } private void setupDistributedCache(LocalDirAllocator lDirAlloc, File workDir, @@ -412,7 +544,7 @@ abstract class TaskRunner extends Thread { fileStatus = fileSystem.getFileStatus( new Path(archives[i].getPath())); String cacheId = DistributedCache.makeRelative(archives[i],conf); - String cachePath = TaskTracker.getCacheSubdir() + + String cachePath = TaskTracker.getDistributedCacheDir() + Path.SEPARATOR + cacheId; localPath = lDirAlloc.getLocalPathForWrite(cachePath, @@ -438,7 +570,7 @@ abstract class TaskRunner extends Thread { fileStatus = fileSystem.getFileStatus( new Path(files[i].getPath())); String cacheId = DistributedCache.makeRelative(files[i], conf); - String cachePath = TaskTracker.getCacheSubdir() + + String cachePath = TaskTracker.getDistributedCacheDir() + Path.SEPARATOR + cacheId; localPath = lDirAlloc.getLocalPathForWrite(cachePath, @@ -455,20 +587,12 @@ abstract class TaskRunner extends Thread { } DistributedCache.setLocalFiles(conf, stringifyPathArray(p)); } - Path localTaskFile = new Path(t.getJobFile()); - FileSystem localFs = FileSystem.getLocal(conf); - localFs.delete(localTaskFile, true); - OutputStream out = localFs.create(localTaskFile); - try { - conf.writeXml(out); - } finally { - out.close(); - } } } - private void appendDistributedCacheClasspaths(JobConf conf, URI[] archives, - URI[] files, List classPaths) throws IOException { + private static void appendDistributedCacheClasspaths(JobConf conf, + URI[] 
archives, URI[] files, List classPaths) + throws IOException { // Archive paths Path[] archiveClasspaths = DistributedCache.getArchiveClassPaths(conf); if (archiveClasspaths != null && archives != null) { @@ -503,8 +627,9 @@ abstract class TaskRunner extends Thread { } } - private void appendSystemClasspaths(List classPaths) { - for (String c : System.getProperty("java.class.path").split(SYSTEM_PATH_SEPARATOR)) { + private static void appendSystemClasspaths(List classPaths) { + for (String c : System.getProperty("java.class.path").split( + SYSTEM_PATH_SEPARATOR)) { classPaths.add(c); } } @@ -539,6 +664,7 @@ abstract class TaskRunner extends Thread { //cache, we didn't create the symlinks. This is done on a per task basis //by the currently executing task. public static void setupWorkDir(JobConf conf) throws IOException { + // TODO: why do it for the first task File workDir = new File(".").getAbsoluteFile(); FileUtil.fullyDelete(workDir); if (DistributedCache.getSymlink(conf)) { @@ -586,19 +712,8 @@ abstract class TaskRunner extends Thread { // Do not exit even if symlinks have not been created. LOG.warn(StringUtils.stringifyException(ie)); } - // add java.io.tmpdir given by mapred.child.tmp - String tmp = conf.get("mapred.child.tmp", "./tmp"); - Path tmpDir = new Path(tmp); - // if temp directory path is not absolute - // prepend it with workDir. 
- if (!tmpDir.isAbsolute()) { - tmpDir = new Path(workDir.toString(), tmp); - FileSystem localFs = FileSystem.getLocal(conf); - if (!localFs.mkdirs(tmpDir) && !localFs.getFileStatus(tmpDir).isDir()){ - throw new IOException("Mkdirs failed to create " + tmpDir.toString()); - } - } + createChildTmpDir(workDir, conf); } /** diff --git src/java/org/apache/hadoop/mapred/TaskTracker.java src/java/org/apache/hadoop/mapred/TaskTracker.java index a044326..cad6b17 100644 --- src/java/org/apache/hadoop/mapred/TaskTracker.java +++ src/java/org/apache/hadoop/mapred/TaskTracker.java @@ -65,6 +65,7 @@ import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.ipc.RPC; import org.apache.hadoop.ipc.RemoteException; import org.apache.hadoop.ipc.Server; +import org.apache.hadoop.mapred.TaskController.InitializeJobContext; import org.apache.hadoop.mapred.TaskStatus.Phase; import org.apache.hadoop.mapred.TaskTrackerStatus.TaskTrackerHealthStatus; import org.apache.hadoop.mapred.pipes.Submitter; @@ -185,10 +186,16 @@ public class TaskTracker //for serving map output to the other nodes static Random r = new Random(); - private static final String SUBDIR = "taskTracker"; - private static final String CACHEDIR = "archive"; - private static final String JOBCACHE = "jobcache"; - private static final String OUTPUT = "output"; + static final String SUBDIR = "taskTracker"; + private static final String DISTCACHEDIR = "distcache"; + static final String JOBCACHE = "jobcache"; + static final String OUTPUT = "output"; + private static final String JARSDIR = "jars"; + static final String LOCAL_SPLIT_FILE = "split.dta"; + static final String JOBFILE = "job.xml"; + + static final String JOB_LOCAL_DIR = "job.local.dir"; + private JobConf fConf; private FileSystem localFs; private int maxMapSlots; @@ -376,25 +383,49 @@ public class TaskTracker } } - static String getCacheSubdir() { - return TaskTracker.SUBDIR + Path.SEPARATOR + TaskTracker.CACHEDIR; + static String getDistributedCacheDir() { + 
return TaskTracker.SUBDIR + Path.SEPARATOR + TaskTracker.DISTCACHEDIR; } static String getJobCacheSubdir() { return TaskTracker.SUBDIR + Path.SEPARATOR + TaskTracker.JOBCACHE; } - + static String getLocalJobDir(String jobid) { - return getJobCacheSubdir() + Path.SEPARATOR + jobid; + return getJobCacheSubdir() + Path.SEPARATOR + jobid; } - static String getLocalTaskDir(String jobid, String taskid) { - return getLocalTaskDir(jobid, taskid, false) ; + static String getLocalJobConfFile(String jobid) { + return getLocalJobDir(jobid) + Path.SEPARATOR + TaskTracker.JOBFILE; + } + + static String getTaskConfFile(String jobid, String taskid, + boolean isCleanupAttempt) { + return getLocalTaskDir(jobid, taskid, isCleanupAttempt) + Path.SEPARATOR + + TaskTracker.JOBFILE; + } + + static String getJobJarFile(String jobid) { + return getLocalJobDir(jobid) + Path.SEPARATOR + TaskTracker.JARSDIR + + Path.SEPARATOR + "job.jar"; + } + + static String getJobWorkDir(String jobid) { + return getLocalJobDir(jobid) + Path.SEPARATOR + MRConstants.WORKDIR; + } + + static String getLocalSplitFile(String jobid, String taskid) { + return TaskTracker.getLocalTaskDir(jobid, taskid) + Path.SEPARATOR + + TaskTracker.LOCAL_SPLIT_FILE; } static String getIntermediateOutputDir(String jobid, String taskid) { - return getLocalTaskDir(jobid, taskid) - + Path.SEPARATOR + TaskTracker.OUTPUT ; + return getLocalTaskDir(jobid, taskid) + Path.SEPARATOR + + TaskTracker.OUTPUT; + } + + static String getLocalTaskDir(String jobid, String taskid) { + return getLocalTaskDir(jobid, taskid, false); } static String getLocalTaskDir(String jobid, @@ -406,7 +437,17 @@ public class TaskTracker } return taskDir; } - + + static String getTaskWorkDir(String jobid, String taskid, + boolean isCleanupAttempt) { + String dir = + getLocalJobDir(jobid) + Path.SEPARATOR + taskid; + if (isCleanupAttempt) { + dir = dir + TASK_CLEANUP_SUFFIX; + } + return dir + Path.SEPARATOR + MRConstants.WORKDIR; + } + String getPid(TaskAttemptID 
tid) { TaskInProgress tip = tasks.get(tid); if (tip != null) { @@ -748,10 +789,133 @@ public class TaskTracker // intialize the job directory private void localizeJob(TaskInProgress tip) throws IOException { - Path localJarFile = null; Task t = tip.getTask(); JobID jobId = t.getJobID(); - Path jobFile = new Path(t.getJobFile()); + RunningJob rjob = addTaskToJob(jobId, tip); + + synchronized (rjob) { + if (!rjob.localized) { + + JobConf localJobConf = doJobLocalization(t); + + // Now initialize the job via task-controller so as to set + // ownership/permissions of jars, job-work-dir + InitializeJobContext context = new InitializeJobContext(); + context.jobid = jobId; + context.user = localJobConf.getUser(); + context.workDir = new File(localJobConf.get(JOB_LOCAL_DIR)); // TODO: checks + taskController.initializeJob(context); + + rjob.jobConf = localJobConf; + rjob.keepJobFiles = ((localJobConf.getKeepTaskFilesPattern() != null) || + localJobConf.getKeepFailedTaskFiles()); + rjob.localized = true; + } + } + launchTaskForJob(tip, new JobConf(rjob.jobConf)); + } + + /** + * Localize the job on this tasktracker. Specifically + *
<ul>
+ * <li>Cleanup and create job directories on all disks</li>
+ * <li>Download the job config file job.xml from the DFS</li>
+ * <li>Create the job work directory and set {@link TaskTracker#JOB_LOCAL_DIR} + * in the configuration.
+ * <li>Download the job jar file job.jar from the DFS, unjar it and set jar + * file in the configuration.</li>
+ * </ul>
+ * + * @param t task whose job has to be localized on this TT + * @return the modified job configuration to be used for all the tasks of this + * job as a starting point. + * @throws IOException + */ + JobConf doJobLocalization(Task t) + throws IOException { + JobID jobId = t.getJobID(); + + // Initialize the job directories first + FileSystem localFs = FileSystem.getLocal(fConf); + initializeJobDirs(jobId, localFs, fConf.getStrings("mapred.local.dir")); + + // Download the job.xml for this job from the system FS + Path localJobFile = localizeJobConfFile(new Path(t.getJobFile()), jobId); + + // TODO: Fix this. TT config is not loaded at all! + JobConf localJobConf = new JobConf(localJobFile); + + // create the 'job-work' directory: job-specific shared directory for use as + // scratch space by all tasks of the same job running on this TaskTracker. + Path workDir = + lDirAlloc.getLocalPathForWrite(getJobWorkDir(jobId.toString()), + fConf); + if (!localFs.mkdirs(workDir)) { + throw new IOException("Mkdirs failed to create " + + workDir.toString()); + } + System.setProperty(JOB_LOCAL_DIR, workDir.toUri().getPath()); + localJobConf.set(JOB_LOCAL_DIR, workDir.toUri().getPath()); + + // Download the job.jar for this job from the system FS + localizeJobJarFile(jobId, localFs, localJobConf); + return localJobConf; + } + + /** + * Prepare the job directories for a given job. To be called by the job + * localization code, only if the job is not already localized. + * + * @param jobId + * @param fs + * @param localDirs + * @throws IOException + */ + private static void initializeJobDirs(JobID jobId, FileSystem fs, + String[] localDirs) + throws IOException { + boolean initJobDirStatus = false; + String jobDirPath = getLocalJobDir(jobId.toString()); + for (String localDir : localDirs) { + Path jobDir = new Path(localDir, jobDirPath); + if (fs.exists(jobDir)) { + // this will happen on a partial execution of localizeJob. 
Sometimes + // copying job.xml to the local disk succeeds but copying job.jar might + // throw out an exception. We should clean up and then try again. + fs.delete(jobDir, true); + } + + boolean jobDirStatus = fs.mkdirs(jobDir); + if (!jobDirStatus) { + LOG.warn("Not able to create job directory " + jobDir.toString()); + } + + initJobDirStatus = initJobDirStatus || jobDirStatus; + + // TODO: fix return status for the following. + // TODO: is this really needed? Try to fix/use RawLocalFileSystem.mkdirs + // job-dir has to be private to the TT + FileUtil.setPermissions(new File(jobDir.toUri().getPath()), + FileUtil.sevenZeroZero); + } + + if (!initJobDirStatus) { + throw new IOException("Not able to initialize job directories " + + "in any of the configured local directories for job " + + jobId.toString()); + } + } + + /** + * Download the job configuration file from the DFS. + * + * @param t Task whose job file has to be downloaded + * @param jobId jobid of the task + * @return the local file system path of the downloaded file. + * @throws IOException + */ + private Path localizeJobConfFile(Path jobFile, JobID jobId) + throws IOException { // Get sizes of JobFile and JarFile // sizes are -1 if they are not present. FileStatus status = null; @@ -762,82 +926,102 @@ public class TaskTracker } catch(FileNotFoundException fe) { jobFileSize = -1; } - Path localJobFile = lDirAlloc.getLocalPathForWrite( - getLocalJobDir(jobId.toString()) - + Path.SEPARATOR + "job.xml", - jobFileSize, fConf); - RunningJob rjob = addTaskToJob(jobId, tip); - synchronized (rjob) { - if (!rjob.localized) { - - FileSystem localFs = FileSystem.getLocal(fConf); - // this will happen on a partial execution of localizeJob. 
- // Sometimes the job.xml gets copied but copying job.jar - // might throw out an exception - // we should clean up and then try again - Path jobDir = localJobFile.getParent(); - if (localFs.exists(jobDir)){ - localFs.delete(jobDir, true); - boolean b = localFs.mkdirs(jobDir); - if (!b) - throw new IOException("Not able to create job directory " - + jobDir.toString()); - } - systemFS.copyToLocalFile(jobFile, localJobFile); - JobConf localJobConf = new JobConf(localJobFile); - - // create the 'work' directory - // job-specific shared directory for use as scratch space - Path workDir = lDirAlloc.getLocalPathForWrite( - (getLocalJobDir(jobId.toString()) - + Path.SEPARATOR + MRConstants.WORKDIR), fConf); - if (!localFs.mkdirs(workDir)) { - throw new IOException("Mkdirs failed to create " - + workDir.toString()); - } - System.setProperty("job.local.dir", workDir.toString()); - localJobConf.set("job.local.dir", workDir.toString()); - - // copy Jar file to the local FS and unjar it. - String jarFile = localJobConf.getJar(); - long jarFileSize = -1; - if (jarFile != null) { - Path jarFilePath = new Path(jarFile); - try { - status = systemFS.getFileStatus(jarFilePath); - jarFileSize = status.getLen(); - } catch(FileNotFoundException fe) { - jarFileSize = -1; - } - // Here we check for and we check five times the size of jarFileSize - // to accommodate for unjarring the jar file in work directory - localJarFile = new Path(lDirAlloc.getLocalPathForWrite( - getLocalJobDir(jobId.toString()) - + Path.SEPARATOR + "jars", - 5 * jarFileSize, fConf), "job.jar"); - if (!localFs.mkdirs(localJarFile.getParent())) { - throw new IOException("Mkdirs failed to create jars directory "); - } - systemFS.copyToLocalFile(jarFilePath, localJarFile); - localJobConf.setJar(localJarFile.toString()); - OutputStream out = localFs.create(localJobFile); - try { - localJobConf.writeXml(out); - } finally { - out.close(); - } - // also unjar the job.jar files - RunJar.unJar(new 
File(localJarFile.toString()), - new File(localJarFile.getParent().toString())); - } - rjob.keepJobFiles = ((localJobConf.getKeepTaskFilesPattern() != null) || - localJobConf.getKeepFailedTaskFiles()); - rjob.localized = true; - rjob.jobConf = localJobConf; - taskController.initializeJob(jobId); + + Path localJobFile = + lDirAlloc.getLocalPathForWrite(getLocalJobConfFile(jobId.toString()), + jobFileSize, fConf); + + // Download job.xml + systemFS.copyToLocalFile(jobFile, localJobFile); + return localJobFile; + } + + /** + * Download the job jar file from DFS to the local file system and unjar it. + * Set the local jar file in the passed configuration. + * + * @param jobId + * @param localFs + * @param localJobConf + * @throws IOException + */ + private void localizeJobJarFile(JobID jobId, FileSystem localFs, + JobConf localJobConf) + throws IOException { + // copy Jar file to the local FS and unjar it. + String jarFile = localJobConf.getJar(); + FileStatus status = null; + long jarFileSize = -1; + if (jarFile != null) { + Path jarFilePath = new Path(jarFile); + try { + status = systemFS.getFileStatus(jarFilePath); + jarFileSize = status.getLen(); + } catch (FileNotFoundException fe) { + jarFileSize = -1; } + // Here we check for and we check five times the size of jarFileSize + // to accommodate for unjarring the jar file in userfiles directory + Path localJarFile = + lDirAlloc.getLocalPathForWrite(getJobJarFile(jobId.toString()), + 5 * jarFileSize, fConf); + + // Download job.jar + systemFS.copyToLocalFile(jarFilePath, localJarFile); + + localJobConf.setJar(localJarFile.toString()); + + // Also un-jar the job.jar files. We un-jar it so that classes inside + // sub-directories, for e.g., lib/, classes/ are available on class-path + RunJar.unJar(new File(localJarFile.toString()), new File(localJarFile + .getParent().toString())); + } + } + + /** + * create taskDirs on all the disks. 
Otherwise, in some cases, like when + * LinuxTaskController is in use, child might wish to balance load across + * disks but cannot itself create attempt directory because of the fact that + * job directory is writable only by the TT. + * + * @param jobId + * @param attemptId + * @param isCleanupAttempt + * @param fs + * @param localDirs + * @throws IOException + */ + private static void initializeAttemptDirs(String jobId, String attemptId, + boolean isCleanupAttempt, FileSystem fs, String[] localDirs) + throws IOException { + + boolean initStatus = false; + String attemptDirPath = + getLocalTaskDir(jobId, attemptId, isCleanupAttempt); + + for (String localDir : localDirs) { + + Path localAttemptDir = new Path(localDir, attemptDirPath); + + boolean attemptDirStatus = fs.mkdirs(localAttemptDir); + if (!attemptDirStatus) { + LOG.warn("localAttemptDir " + localAttemptDir.toString() + + " couldn't be created."); + } + + // job dir is not private, set private permissions for attempt-dir + FileUtil.setPermissions(new File(localAttemptDir.toUri().getPath()), + FileUtil.sevenZeroZero); + // TODO: fix return code of the above. 
+ + initStatus = initStatus || attemptDirStatus; + } + + if (!initStatus) { + throw new IOException("Not able to initialize attempt directories " + + "in any of the configured local directories for the attempt " + + attemptId.toString()); } - launchTaskForJob(tip, new JobConf(rjob.jobConf)); } private void launchTaskForJob(TaskInProgress tip, JobConf jobConf) throws IOException{ @@ -876,7 +1060,7 @@ public class TaskTracker for (TaskInProgress tip : tasksToClose.values()) { tip.jobHasFinished(false); } - + this.running = false; // Clear local storage @@ -915,6 +1099,17 @@ public class TaskTracker } /** + * For testing + */ + TaskTracker() { + server = null; + } + + void setConf(JobConf conf) { + fConf = conf; + } + + /** * Start with the local machine name, and the default JobTracker */ public TaskTracker(JobConf conf) throws IOException { @@ -1557,7 +1752,7 @@ public class TaskTracker mapOutputFile.setJobId(taskId.getJobID()); mapOutputFile.setConf(conf); - Path tmp_output = mapOutputFile.getOutputFile(taskId); + Path tmp_output = mapOutputFile.getOutputFile(); if(tmp_output == null) return 0; FileSystem localFS = FileSystem.getLocal(conf); @@ -1833,54 +2028,36 @@ public class TaskTracker taskTimeout = (10 * 60 * 1000); } - private void localizeTask(Task task) throws IOException{ + void localizeTask(Task task) throws IOException{ - Path localTaskDir = - lDirAlloc.getLocalPathForWrite( - TaskTracker.getLocalTaskDir(task.getJobID().toString(), - task.getTaskID().toString(), task.isTaskCleanupTask()), - defaultJobConf ); - FileSystem localFs = FileSystem.getLocal(fConf); - if (!localFs.mkdirs(localTaskDir)) { - throw new IOException("Mkdirs failed to create " - + localTaskDir.toString()); - } - - // create symlink for ../work if it already doesnt exist - String workDir = lDirAlloc.getLocalPathToRead( - TaskTracker.getLocalJobDir(task.getJobID().toString()) - + Path.SEPARATOR - + "work", defaultJobConf).toString(); - String link = localTaskDir.getParent().toString() - 
+ Path.SEPARATOR + "work"; - File flink = new File(link); - if (!flink.exists()) - FileUtil.symLink(workDir, link); - + + // create taskDirs on all the disks. + initializeAttemptDirs(task.getJobID().toString(), task.getTaskID() + .toString(), task.isTaskCleanupTask(), localFs, fConf + .getStrings("mapred.local.dir")); + // create the working-directory of the task - Path cwd = lDirAlloc.getLocalPathForWrite( - getLocalTaskDir(task.getJobID().toString(), - task.getTaskID().toString(), task.isTaskCleanupTask()) - + Path.SEPARATOR + MRConstants.WORKDIR, - defaultJobConf); + Path cwd = + lDirAlloc.getLocalPathForWrite(getTaskWorkDir(task.getJobID() + .toString(), task.getTaskID().toString(), task + .isTaskCleanupTask()), defaultJobConf); if (!localFs.mkdirs(cwd)) { throw new IOException("Mkdirs failed to create " + cwd.toString()); } - Path localTaskFile = new Path(localTaskDir, "job.xml"); - task.setJobFile(localTaskFile.toString()); localJobConf.set("mapred.local.dir", fConf.get("mapred.local.dir")); + if (fConf.get("slave.host.name") != null) { localJobConf.set("slave.host.name", fConf.get("slave.host.name")); } - localJobConf.set("mapred.task.id", task.getTaskID().toString()); keepFailedTaskFiles = localJobConf.getKeepFailedTaskFiles(); + // Do the task-type specific localization task.localizeConfiguration(localJobConf); List staticResolutions = NetUtils.getAllStaticResolutions(); @@ -1913,12 +2090,6 @@ public class TaskTracker //disable jvm reuse localJobConf.setNumTasksToExecutePerJvm(1); } - OutputStream out = localFs.create(localTaskFile); - try { - localJobConf.writeXml(out); - } finally { - out.close(); - } task.setConf(localJobConf); } @@ -2165,16 +2336,13 @@ public class TaskTracker } File workDir = null; try { - workDir = new File(lDirAlloc.getLocalPathToRead( - TaskTracker.getLocalTaskDir( - task.getJobID().toString(), - task.getTaskID().toString(), - task.isTaskCleanupTask()) - + Path.SEPARATOR + MRConstants.WORKDIR, - localJobConf). 
toString()); + // localJobConf is already sandboxed. + workDir = + new File(lDirAlloc.getLocalPathToRead(MRConstants.WORKDIR, + localJobConf).toString()); } catch (IOException e) { LOG.warn("Working Directory of the task " + task.getTaskID() + - "doesnt exist. Caught exception " + + " doesnt exist. Caught exception " + StringUtils.stringifyException(e)); } // Build the command @@ -2449,34 +2617,42 @@ public class TaskTracker if (localJobConf == null) { return; } - String taskDir = getLocalTaskDir(task.getJobID().toString(), - taskId.toString(), task.isTaskCleanupTask()); + String localTaskDir = + getLocalTaskDir(task.getJobID().toString(), taskId.toString(), + task.isTaskCleanupTask()); + String taskWorkDir = + getTaskWorkDir(task.getJobID().toString(), taskId.toString(), + task.isTaskCleanupTask()); if (needCleanup) { if (runner != null) { //cleans up the output directory of the task (where map outputs //and reduce inputs get stored) runner.close(); } - //We don't delete the workdir - //since some other task (running in the same JVM) - //might be using the dir. The JVM running the tasks would clean - //the workdir per a task in the task process itself. + if (localJobConf.getNumTasksToExecutePerJvm() == 1) { + // No jvm reuse, remove everything directoryCleanupThread.addToQueue(localFs, getLocalFiles(defaultJobConf, - taskDir)); + localTaskDir)); } - else { - directoryCleanupThread.addToQueue(localFs, - getLocalFiles(defaultJobConf, - taskDir+"/job.xml")); + // Jvm reuse. We don't delete the workdir since some other task + // (running in the same JVM) might be using the dir. The JVM + // running the tasks would clean the workdir per a task in the + // task process itself. 
+ directoryCleanupThread.addToQueue(localFs, getLocalFiles( + defaultJobConf, localTaskDir + Path.SEPARATOR + + TaskTracker.JOBFILE)); + // TODO: fix + directoryCleanupThread.addToQueue(localFs, getLocalFiles( + defaultJobConf, localTaskDir + "/taskjvm.sh")); } } else { if (localJobConf.getNumTasksToExecutePerJvm() == 1) { directoryCleanupThread.addToQueue(localFs, getLocalFiles(defaultJobConf, - taskDir+"/work")); + taskWorkDir)); } } } catch (Throwable ie) { diff --git src/test/mapred/org/apache/hadoop/mapred/ClusterWithLinuxTaskController.java src/test/mapred/org/apache/hadoop/mapred/ClusterWithLinuxTaskController.java index 521c758..07f84b2 100644 --- src/test/mapred/org/apache/hadoop/mapred/ClusterWithLinuxTaskController.java +++ src/test/mapred/org/apache/hadoop/mapred/ClusterWithLinuxTaskController.java @@ -97,7 +97,7 @@ public class ClusterWithLinuxTaskController extends TestCase { MyLinuxTaskController.class.getName()); mrCluster = new MiniMRCluster(NUMBER_OF_NODES, dfsCluster.getFileSystem().getUri() - .toString(), 1, null, null, conf); + .toString(), 4, null, null, conf); // Get the configured taskcontroller-path String path = System.getProperty("taskcontroller-path"); @@ -143,8 +143,18 @@ public class ClusterWithLinuxTaskController extends TestCase { PrintWriter writer = new PrintWriter(new FileOutputStream(configurationFile)); - writer.println(String.format("mapred.local.dir=%s", mrCluster - .getTaskTrackerLocalDir(0))); + StringBuffer sb = new StringBuffer(); + String[] localDirs = mrCluster.getTaskTrackerRunner(0).getLocalDirs(); + for(int i = 0 ; i < localDirs.length; i++) { + sb.append(localDirs[i]); + if((i + 1) != localDirs.length) { + sb.append(","); + } + } + writer.println(String.format("mapred.local.dir=%s", sb.toString())); + + writer + .println(String.format("hadoop.log.dir=%s", TaskLog.getBaseLogDir())); writer.flush(); writer.close(); @@ -156,7 +166,11 @@ public class ClusterWithLinuxTaskController extends TestCase { * @return boolean */ 
protected boolean shouldRun() { - return isTaskExecPathPassed() && isUserPassed(); + if (isTaskExecPathPassed() && isUserPassed()) { + return true; + } + LOG.info("Invalid taskcontroller-path or taskcontroller-user! Not running test."); + return false; /* honour the log message: skip the test when the path or user is not configured */ } private boolean isTaskExecPathPassed() { diff --git src/test/mapred/org/apache/hadoop/mapred/MiniMRCluster.java src/test/mapred/org/apache/hadoop/mapred/MiniMRCluster.java index 5c26fd1..5f2c49f 100644 --- src/test/mapred/org/apache/hadoop/mapred/MiniMRCluster.java +++ src/test/mapred/org/apache/hadoop/mapred/MiniMRCluster.java @@ -171,7 +171,7 @@ public class MiniMRCluster { StringBuffer localPath = new StringBuffer(); for(int i=0; i < numDir; ++i) { File ttDir = new File(localDirBase, - Integer.toString(trackerId) + "_" + 0); + Integer.toString(trackerId) + "_" + i); if (!ttDir.mkdirs()) { if (!ttDir.isDirectory()) { throw new IOException("Mkdirs failed to create " + ttDir); diff --git src/test/mapred/org/apache/hadoop/mapred/TestIsolationRunner.java src/test/mapred/org/apache/hadoop/mapred/TestIsolationRunner.java index 3bf0bc5..9ee549c 100644 --- src/test/mapred/org/apache/hadoop/mapred/TestIsolationRunner.java +++ src/test/mapred/org/apache/hadoop/mapred/TestIsolationRunner.java @@ -26,6 +26,7 @@ import junit.framework.TestCase; import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.LocalDirAllocator; import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.PathFilter; import org.apache.hadoop.mapred.lib.IdentityMapper; @@ -96,23 +97,20 @@ public class TestIsolationRunner extends TestCase { }); return files.length; } - + private Path getAttemptJobXml(JobConf conf, JobID jobId, TaskType taskType) throws IOException { - String[] localDirs = conf.getLocalDirs(); - assertEquals(1, localDirs.length); - Path jobCacheDir = new Path(localDirs[0], "0_0" + Path.SEPARATOR + - "taskTracker" + Path.SEPARATOR + "jobcache" + Path.SEPARATOR + jobId); - Path attemptDir = new
Path(jobCacheDir, - new TaskAttemptID(new TaskID(jobId, taskType, 0), 0).toString()); - return new Path(attemptDir, "job.xml"); + String taskid = + new TaskAttemptID(new TaskID(jobId, taskType, 0), 0).toString(); + return new LocalDirAllocator("mapred.local.dir").getLocalPathToRead( + TaskTracker.getTaskConfFile(jobId.toString(), taskid, false), conf); } public void testIsolationRunOfMapTask() throws IOException, InterruptedException, ClassNotFoundException { MiniMRCluster mr = null; try { - mr = new MiniMRCluster(1, "file:///", 1); + mr = new MiniMRCluster(1, "file:///", 4); // Run a job succesfully; keep task files. JobConf conf = mr.createJobConf(); @@ -130,8 +128,10 @@ public class TestIsolationRunner extends TestCase { // Retrieve succesful job's configuration and // run IsolationRunner against the map task. FileSystem localFs = FileSystem.getLocal(conf); - Path mapJobXml = getAttemptJobXml(conf, jobId, - TaskType.MAP).makeQualified(localFs); + Path mapJobXml = + getAttemptJobXml( + mr.getTaskTrackerRunner(0).getTaskTracker().getJobConf(), jobId, + TaskType.MAP).makeQualified(localFs); assertTrue(localFs.exists(mapJobXml)); new IsolationRunner().run(new String[] { diff --git src/test/mapred/org/apache/hadoop/mapred/TestJobExecutionAsDifferentUser.java src/test/mapred/org/apache/hadoop/mapred/TestJobExecutionAsDifferentUser.java index f923f88..ab678d5 100644 --- src/test/mapred/org/apache/hadoop/mapred/TestJobExecutionAsDifferentUser.java +++ src/test/mapred/org/apache/hadoop/mapred/TestJobExecutionAsDifferentUser.java @@ -20,8 +20,10 @@ package org.apache.hadoop.mapred; import java.io.IOException; +import org.apache.hadoop.examples.SleepJob; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; +import org.apache.hadoop.util.ToolRunner; /** * Test a java-based mapred job with LinuxTaskController running the jobs as a @@ -39,10 +41,33 @@ public class TestJobExecutionAsDifferentUser extends startCluster(); Path inDir = new Path("input"); Path 
outDir = new Path("output"); - RunningJob job = - UtilsForTests.runJobSucceed(getClusterConf(), inDir, outDir); + + RunningJob job; + + // Run a job with zero maps/reduces + job = UtilsForTests.runJob(getClusterConf(), inDir, outDir, 0, 0); + job.waitForCompletion(); + assertTrue("Job failed", job.isSuccessful()); + assertOwnerShip(outDir); + + // Run a job with 1 map and zero reduces + job = UtilsForTests.runJob(getClusterConf(), inDir, outDir, 1, 0); + job.waitForCompletion(); assertTrue("Job failed", job.isSuccessful()); assertOwnerShip(outDir); + + // Run a normal job with maps/reduces + job = UtilsForTests.runJob(getClusterConf(), inDir, outDir, 1, 1); + job.waitForCompletion(); + assertTrue("Job failed", job.isSuccessful()); + assertOwnerShip(outDir); + + // Run a job with jvm reuse + JobConf myConf = getClusterConf(); + myConf.set("mapred.job.reuse.jvm.num.tasks", "-1"); + String[] args = { "-m", "6", "-r", "3", "-mt", "1000", "-rt", "1000" }; + assertEquals(0, ToolRunner.run(myConf, new SleepJob(), args)); + // TODO: check reuse } public void testEnvironment() throws IOException { diff --git src/test/mapred/org/apache/hadoop/mapred/TestMapRed.java src/test/mapred/org/apache/hadoop/mapred/TestMapRed.java index 665dc33..2f70803 100644 --- src/test/mapred/org/apache/hadoop/mapred/TestMapRed.java +++ src/test/mapred/org/apache/hadoop/mapred/TestMapRed.java @@ -289,7 +289,7 @@ public class TestMapRed extends TestCase { first = false; MapOutputFile mapOutputFile = new MapOutputFile(taskId.getJobID()); mapOutputFile.setConf(conf); - Path input = mapOutputFile.getInputFile(0, taskId); + Path input = mapOutputFile.getInputFile(0); FileSystem fs = FileSystem.get(conf); assertTrue("reduce input exists " + input, fs.exists(input)); SequenceFile.Reader rdr = diff --git src/test/mapred/org/apache/hadoop/mapred/TestMiniMRWithDFS.java src/test/mapred/org/apache/hadoop/mapred/TestMiniMRWithDFS.java index 1dac0ac..f14234c 100644 --- 
src/test/mapred/org/apache/hadoop/mapred/TestMiniMRWithDFS.java +++ src/test/mapred/org/apache/hadoop/mapred/TestMiniMRWithDFS.java @@ -135,7 +135,7 @@ public class TestMiniMRWithDFS extends TestCase { int numNotDel = 0; File localDir = new File(mr.getTaskTrackerLocalDir(i)); LOG.debug("Tracker directory: " + localDir); - File trackerDir = new File(localDir, "taskTracker"); + File trackerDir = new File(localDir, TaskTracker.SUBDIR); assertTrue("local dir " + localDir + " does not exist.", localDir.isDirectory()); assertTrue("task tracker dir " + trackerDir + " does not exist.", @@ -150,7 +150,7 @@ public class TestMiniMRWithDFS extends TestCase { } for(int fileIdx = 0; fileIdx < contents.length; ++fileIdx) { String name = contents[fileIdx]; - if (!("taskTracker".equals(contents[fileIdx]))) { + if (!(TaskTracker.SUBDIR.equals(contents[fileIdx]))) { LOG.debug("Looking at " + name); assertTrue("Spurious directory " + name + " found in " + localDir, false); @@ -158,7 +158,7 @@ public class TestMiniMRWithDFS extends TestCase { } for (int idx = 0; idx < neededDirs.size(); ++idx) { String name = neededDirs.get(idx); - if (new File(new File(new File(trackerDir, "jobcache"), + if (new File(new File(new File(trackerDir, TaskTracker.JOBCACHE), jobIds[idx]), name).isDirectory()) { found[idx] = true; numNotDel++; diff --git src/test/mapred/org/apache/hadoop/mapred/TestTaskTrackerLocalization.java src/test/mapred/org/apache/hadoop/mapred/TestTaskTrackerLocalization.java new file mode 100644 index 0000000..12f41f7 --- /dev/null +++ src/test/mapred/org/apache/hadoop/mapred/TestTaskTrackerLocalization.java @@ -0,0 +1,260 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.mapred; + +import java.io.File; +import java.io.FileOutputStream; +import java.io.IOException; +import java.util.jar.JarOutputStream; +import java.util.zip.ZipEntry; + +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.FileUtil; +import org.apache.hadoop.fs.LocalDirAllocator; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.mapred.TaskTracker.TaskInProgress; +import org.apache.hadoop.mapreduce.TaskType; + +import junit.framework.TestCase; + +/** + * Test to verify localization of a job and localization of a task on a + * TaskTracker. + * + */ +public class TestTaskTrackerLocalization extends TestCase { + + File TEST_ROOT_DIR; + + @Override + protected void setUp() + throws Exception { + TEST_ROOT_DIR = + new File(System.getProperty("test.build.data", "/tmp"), + "testJobLocalization"); + if (!TEST_ROOT_DIR.exists()) { + TEST_ROOT_DIR.mkdirs(); + } + } + + @Override + protected void tearDown() + throws Exception { + FileUtil.fullyDelete(TEST_ROOT_DIR); + } + + /** + * Test job localization on a TT. Tests localization of job.xml, job.jar and + * corresponding setting of configuration. 
+ * + * @throws IOException + */ + public void testJobLocalization() + throws IOException { + JobConf conf = new JobConf(); + conf.set(FileSystem.FS_DEFAULT_NAME_KEY, "file:///"); + conf.setStrings("mapred.local.dir", new File(TEST_ROOT_DIR, "local_0") + .getPath(), new File(TEST_ROOT_DIR, "local_1").getPath()); + + // Create the job jar file + File jobJarFile = new File(TEST_ROOT_DIR, "jobjar-on-dfs.jar"); + JarOutputStream jstream = + new JarOutputStream(new FileOutputStream(jobJarFile)); + ZipEntry ze = new ZipEntry("lib/lib1.jar"); + jstream.putNextEntry(ze); + jstream.closeEntry(); + ze = new ZipEntry("lib/lib2.jar"); + jstream.putNextEntry(ze); + jstream.closeEntry(); + jstream.finish(); + jstream.close(); + conf.setJar(jobJarFile.toURI().toString()); + + // Create the job configuration file + File jobConfFile = new File(TEST_ROOT_DIR, "jobconf-on-dfs.xml"); + FileOutputStream out = new FileOutputStream(jobConfFile); + conf.writeXml(out); + out.close(); + + // Set up the TaskTracker + TaskTracker tracker = new TaskTracker(); + tracker.setConf(conf); + tracker.systemFS = FileSystem.getLocal(conf); // for test case + + // Set up the task to be localized + String jtIdentifier = "200907202331"; + JobID jobId = new JobID(jtIdentifier, 1); + TaskAttemptID taskId = + new TaskAttemptID(jtIdentifier, jobId.getId(), TaskType.MAP, 1, 1); + Task t = + new MapTask(jobConfFile.toURI().toString(), taskId, 1, null, null, 1); + + // /////////// The main method being tested + JobConf localizedJobConf = tracker.doJobLocalization(t); + // /////////// + + // Check the directory structure + for (String dir : conf.getStrings("mapred.local.dir")) { + File localDir = new File(dir); + assertTrue("mapred.local.dir " + localDir + " isn't created!", localDir + .exists()); + File taskTrackerSubDir = new File(localDir, TaskTracker.SUBDIR); + assertTrue("taskTracker sub-dir in the local-dir " + localDir + + "is not created!", taskTrackerSubDir.exists()); + File systemDir = new 
File(taskTrackerSubDir, TaskTracker.JOBCACHE); + assertTrue("system dir in the taskTrackerSubdir " + taskTrackerSubDir + + " isn't created!", systemDir.exists()); + File systemJobDir = new File(systemDir, jobId.toString()); + assertTrue("job-dir in system dir " + systemDir + " isn't created!", + systemJobDir.exists()); + } + + // check the localization of job.xml + LocalDirAllocator lDirAlloc = new LocalDirAllocator("mapred.local.dir"); + + assertTrue("job.xml is not localized on this TaskTracker!!", lDirAlloc + .getLocalPathToRead(TaskTracker.getLocalJobConfFile(jobId.toString()), + conf) != null); + + // check the localization of job.jar + Path jarFileLocalized = + lDirAlloc.getLocalPathToRead(TaskTracker.getJobJarFile(jobId + .toString()), conf); + assertTrue("job.jar is not localized on this TaskTracker!!", + jarFileLocalized != null); + assertTrue("lib/lib1.jar is not unjarred on this TaskTracker!!", new File( + jarFileLocalized.getParent() + Path.SEPARATOR + "lib/lib1.jar") + .exists()); + assertTrue("lib/lib2.jar is not unjarred on this TaskTracker!!", new File( + jarFileLocalized.getParent() + Path.SEPARATOR + "lib/lib2.jar") + .exists()); + + // check the creation of job work directory + assertTrue("job-work dir is not created on this TaskTracker!!", + lDirAlloc.getLocalPathToRead(TaskTracker.getJobWorkDir(jobId + .toString()), conf) != null); + + // Check the setting of job.local.dir and job.jar which will eventually be + // used by the user's task + boolean jobLocalDirFlag = false, mapredJarFlag = false; + String localizedJobLocalDir = + localizedJobConf.get(TaskTracker.JOB_LOCAL_DIR); + String localizedJobJar = localizedJobConf.getJar(); + for (String localDir : localizedJobConf.getStrings("mapred.local.dir")) { + if (localizedJobLocalDir.equals(localDir + Path.SEPARATOR + + TaskTracker.getJobWorkDir(jobId.toString()))) { + jobLocalDirFlag = true; + } + if (localizedJobJar.equals(localDir + Path.SEPARATOR + + 
TaskTracker.getJobJarFile(jobId.toString()))) { + mapredJarFlag = true; + } + } + assertTrue(TaskTracker.JOB_LOCAL_DIR + + " is not set properly to the target users directory : " + + localizedJobLocalDir, jobLocalDirFlag); + assertTrue( + "mapred.jar is not set properly to the target users directory : " + + localizedJobJar, mapredJarFlag); + } + + /** + * Test task localization on a TT. + * + * @throws IOException + */ + public void testTaskLocalization() + throws IOException { + JobConf conf = new JobConf(); + conf.set(FileSystem.FS_DEFAULT_NAME_KEY, "file:///"); + + conf.setStrings("mapred.local.dir", new File(TEST_ROOT_DIR, "local_0") + .getPath(), new File(TEST_ROOT_DIR, "local_1").getPath()); + + // Create the job configuration file + File jobConfFile = new File(TEST_ROOT_DIR, "jobconf-on-dfs.xml"); + FileOutputStream out = new FileOutputStream(jobConfFile); + conf.writeXml(out); + out.close(); + + // Set up the TaskTracker + TaskTracker tracker = new TaskTracker(); + tracker.setConf(conf); + tracker.systemFS = FileSystem.getLocal(conf); // for test case + + // Set up the task to be localized + String jtIdentifier = "200907202331"; + JobID jobId = new JobID(jtIdentifier, 1); + TaskAttemptID taskId = + new TaskAttemptID(jtIdentifier, jobId.getId(), TaskType.MAP, 1, 0); + Task task = + new MapTask(jobConfFile.toURI().toString(), taskId, 1, null, null, 1); + + JobConf localizedJobConf = tracker.doJobLocalization(task); + + TaskInProgress tip = tracker.new TaskInProgress(task, conf); + tip.setJobConf(localizedJobConf); + + // ////////// The central method being tested + tip.localizeTask(task); + // ////////// + + // check the functionality of localizeTask + for (String dir : conf.getStrings("mapred.local.dir")) { + assertTrue("attempt-dir in localDir " + dir + " is not creted!!", + new File(dir, TaskTracker.getLocalTaskDir(jobId.toString(), taskId + .toString())).exists()); + } + + LocalDirAllocator lDirAlloc = new LocalDirAllocator("mapred.local.dir"); + 
assertTrue("atttempt work dir for " + taskId.toString() + + " is not created in any of the configured dirs!!", lDirAlloc + .getLocalPathToRead(TaskTracker + .getTaskWorkDir(task.getJobID().toString(), task.getTaskID() + .toString(), task.isTaskCleanupTask()), conf) != null); + + TaskRunner runner = task.createRunner(tracker, tip); + + // /////// Another method being tested + runner.setupChildTaskConfiguration(lDirAlloc); + // /////// + + // Make sure the task-conf file is created + Path localTaskFile = + lDirAlloc.getLocalPathToRead(TaskTracker.getTaskConfFile(task + .getJobID().toString(), task.getTaskID().toString(), task + .isTaskCleanupTask()), conf); + assertTrue("Task conf file " + localTaskFile.toString() + + " is not created!!", new File(localTaskFile.toUri().getPath()) + .exists()); + + // Make sure that the mapred.local.dir is sandboxed + JobConf localizedTaskConf = new JobConf(localTaskFile); + for (String childMapredLocalDir : localizedTaskConf + .getStrings("mapred.local.dir")) { + assertTrue("Local dir " + childMapredLocalDir + " is not sandboxed !!", + childMapredLocalDir.endsWith(TaskTracker.getLocalTaskDir(jobId + .toString(), taskId.toString(), false))); + } + + // Make sure task task.getJobFile is changed and pointed correctly. + assertTrue(task.getJobFile().endsWith( + TaskTracker + .getTaskConfFile(jobId.toString(), taskId.toString(), false))); + } +}