diff --git build.xml build.xml index f10cc84..2af891b 100644 --- build.xml +++ build.xml @@ -1657,5 +1657,16 @@ - + + + + + + + + + + diff --git conf/taskcontroller.cfg conf/taskcontroller.cfg index fb4cced..eb2e5ee 100644 --- conf/taskcontroller.cfg +++ conf/taskcontroller.cfg @@ -1,3 +1,2 @@ -mapred.local.dir=#configured value of hadoop.tmp.dir it can be a list of paths comma seperated -hadoop.pid.dir=#configured HADOOP_PID_DIR -hadoop.indent.str=#configured HADOOP_IDENT_STR +mapred.local.dir=#configured value of mapred.local.dir. It can be a list of comma separated paths. +hadoop.log.dir=#configured value of hadoop.log.dir. \ No newline at end of file diff --git src/c++/task-controller/Makefile.in src/c++/task-controller/Makefile.in index ba35f96..83f418f 100644 --- src/c++/task-controller/Makefile.in +++ src/c++/task-controller/Makefile.in @@ -16,9 +16,11 @@ # limitations under the License. # OBJS=main.o task-controller.o configuration.o +TESTOBJS=test-task-controller.o task-controller.o configuration.o CC=@CC@ CFLAGS = @CFLAGS@ BINARY=task-controller +TESTBINARY=test-task-controller installdir = @prefix@ @@ -34,9 +36,15 @@ task-controller.o: task-controller.c task-controller.h configuration.o: configuration.h configuration.c $(CC) $(CFLAG) -o configuration.o -c configuration.c +test-task-controller.o: task-controller.c task-controller.h + $(CC) $(CFLAG) -o test-task-controller.o -c test-task-controller.c + +test: $(TESTOBJS) + $(CC) $(CFLAG) -o $(TESTBINARY) $(TESTOBJS) + cp $(TESTBINARY) $(installdir) clean: - rm -rf $(BINARY) $(OBJS) + rm -rf $(BINARY) $(OBJS) $(TESTOBJS) install: all cp $(BINARY) $(installdir) diff --git src/c++/task-controller/configuration.c src/c++/task-controller/configuration.c index 05b267a..b6abbf5 100644 --- src/c++/task-controller/configuration.c +++ src/c++/task-controller/configuration.c @@ -71,9 +71,8 @@ void get_configs() { snprintf(file_name, str_len, CONF_FILE_PATTERN, HADOOP_CONF_DIR); #endif -#ifdef DEBUG - 
fprintf(LOGFILE,"get_configs :Conf file name is : %s \n", file_name); -#endif + fprintf(LOGFILE, "get_configs :Conf file name is : %s \n", file_name); + //allocate space for ten configuration items. config.confdetails = (struct confentry **) malloc(sizeof(struct confentry *) * MAX_SIZE); @@ -87,7 +86,7 @@ void get_configs() { while(!feof(conf_file)) { line = (char *) malloc(linesize); if(line == NULL) { - fprintf(LOGFILE,"malloc failed while reading configuration file.\n"); + fprintf(LOGFILE, "malloc failed while reading configuration file.\n"); goto cleanup; } size_read = getline(&line,&linesize,conf_file); @@ -123,9 +122,9 @@ void get_configs() { "Failed allocating memory for single configuration item\n"); goto cleanup; } -#ifdef DEBUG - fprintf(LOGFILE,"get_configs : Adding conf key : %s \n", equaltok); -#endif + + fprintf(LOGFILE, "get_configs : Adding conf key : %s \n", equaltok); + memset(config.confdetails[config.size], 0, sizeof(struct confentry)); config.confdetails[config.size]->key = (char *) malloc( sizeof(char) * (strlen(equaltok)+1)); @@ -142,9 +141,9 @@ void get_configs() { free(config.confdetails[config.size]); continue; } -#ifdef DEBUG - fprintf(LOGFILE,"get_configs : Adding conf value : %s \n", equaltok); -#endif + + fprintf(LOGFILE, "get_configs : Adding conf value : %s \n", equaltok); + config.confdetails[config.size]->value = (char *) malloc( sizeof(char) * (strlen(equaltok)+1)); strcpy((char *)config.confdetails[config.size]->value, equaltok); @@ -184,8 +183,7 @@ void get_configs() { * array, next time onwards used the populated array. 
* */ - -const char * get_value(char* key) { +const char * get_value(const char* key) { int count; if (config.size == 0) { get_configs(); @@ -196,15 +194,19 @@ const char * get_value(char* key) { } for (count = 0; count < config.size; count++) { if (strcmp(config.confdetails[count]->key, key) == 0) { - return config.confdetails[count]->value; + return strdup(config.confdetails[count]->value); } } return NULL; } -const char ** get_values(char * key) { +/** + * Function to return an array of values for a key. + * Value delimiter is assumed to be a comma. + */ +const char ** get_values(const char * key) { const char ** toPass = NULL; - const char * value = get_value(key); + const char *value = get_value(key); char *tempTok = NULL; char *tempstr = NULL; int size = 0; diff --git src/c++/task-controller/configuration.h.in src/c++/task-controller/configuration.h.in index af8086b..ebdf4d8 100644 --- src/c++/task-controller/configuration.h.in +++ src/c++/task-controller/configuration.h.in @@ -53,10 +53,10 @@ extern struct configuration config; extern char *hadoop_conf_dir; #endif //method exposed to get the configurations -const char * get_value(char* key); +const char * get_value(const char* key); //method to free allocated configuration void free_configurations(); //function to return array of values pointing to the key. Values are //comma seperated strings. -const char ** get_values(char* key); +const char ** get_values(const char* key); diff --git src/c++/task-controller/configure.ac src/c++/task-controller/configure.ac index 2fd52a8..8605ac8 100644 --- src/c++/task-controller/configure.ac +++ src/c++/task-controller/configure.ac @@ -38,7 +38,7 @@ AC_PROG_CC # Checks for header files. AC_HEADER_STDC -AC_CHECK_HEADERS([stdlib.h string.h unistd.h]) +AC_CHECK_HEADERS([stdlib.h string.h unistd.h fcntl.h]) #check for HADOOP_CONF_DIR @@ -50,12 +50,14 @@ fi # Checks for typedefs, structures, and compiler characteristics. 
AC_C_CONST AC_TYPE_PID_T +AC_TYPE_MODE_T +AC_TYPE_SIZE_T # Checks for library functions. AC_FUNC_MALLOC AC_FUNC_REALLOC -AC_CHECK_FUNCS([strerror]) +AC_FUNC_CHOWN +AC_CHECK_FUNCS([strerror memset mkdir rmdir strdup]) AC_CONFIG_FILES([Makefile]) AC_OUTPUT - diff --git src/c++/task-controller/main.c src/c++/task-controller/main.c index c19785a..13f13f6 100644 --- src/c++/task-controller/main.c +++ src/c++/task-controller/main.c @@ -17,6 +17,26 @@ */ #include "task-controller.h" +void open_log_file(char *log_file) { + if (log_file == NULL) { + LOGFILE = stdout; + } else { + LOGFILE = fopen(log_file, "a"); + if (LOGFILE == NULL) { + fprintf(stdout, "Unable to open LOGFILE : %s \n", log_file); + LOGFILE = stdout; + } + if (LOGFILE != stdout) { + if (chmod(log_file, S_IREAD | S_IEXEC | S_IWRITE | S_IROTH | S_IWOTH + | S_IRGRP | S_IWGRP) < 0) { + fprintf(stdout, "Unable to change permission of the log file %s \n", + log_file); + fprintf(stdout, "changing log file to stdout"); + LOGFILE = stdout; + } + } + } +} int main(int argc, char **argv) { int command; @@ -24,6 +44,7 @@ int main(int argc, char **argv) { const char * job_id = NULL; const char * task_id = NULL; const char * tt_root = NULL; + const char *log_dir = NULL; int exit_code = 0; const char * task_pid = NULL; const char* const short_options = "l:"; @@ -35,7 +56,7 @@ int main(int argc, char **argv) { //Minimum number of arguments required to run the task-controller //command-name user command tt-root if (argc < 3) { - display_usage(stderr); + display_usage(stdout); return INVALID_ARGUMENT_NUMBER; } @@ -54,24 +75,9 @@ int main(int argc, char **argv) { break; } } while (next_option != -1); - if (log_file == NULL) { - LOGFILE = stderr; - } else { - LOGFILE = fopen(log_file, "a"); - if (LOGFILE == NULL) { - fprintf(stderr, "Unable to open LOGFILE : %s \n", log_file); - LOGFILE = stderr; - } - if (LOGFILE != stderr) { - if (chmod(log_file, S_IREAD | S_IEXEC | S_IWRITE | S_IROTH | S_IWOTH - | S_IRGRP | S_IWGRP) < 0) { 
- fprintf(stderr, "Unable to change permission of the log file %s \n", - log_file); - fprintf(stderr, "changing log file to stderr"); - LOGFILE = stderr; - } - } - } + + open_log_file(log_file); + //checks done for user name //checks done if the user is root or not. if (argv[optind] == NULL) { @@ -88,26 +94,50 @@ int main(int argc, char **argv) { } optind = optind + 1; command = atoi(argv[optind++]); -#ifdef DEBUG + fprintf(LOGFILE, "main : command provided %d\n",command); fprintf(LOGFILE, "main : user is %s\n", user_detail->pw_name); -#endif + switch (command) { case LAUNCH_TASK_JVM: tt_root = argv[optind++]; job_id = argv[optind++]; task_id = argv[optind++]; + log_dir = argv[optind++]; exit_code - = run_task_as_user(user_detail->pw_name, job_id, task_id, tt_root); + = run_task_as_user(user_detail->pw_name, job_id, task_id, tt_root, log_dir); + break; + case INITIALIZE_TASK: + job_id = argv[optind++]; + task_id = argv[optind++]; + log_dir = argv[optind++]; + exit_code = initialize_task(job_id, task_id, log_dir, user_detail->pw_name); break; case TERMINATE_TASK_JVM: + job_id = argv[optind++]; + task_id = argv[optind++]; + log_dir = argv[optind++]; task_pid = argv[optind++]; + exit_code = finalize_task_dirs(job_id, task_id, log_dir, 1); // Not the first task. + // TODO: fix + open_log_file(log_file); exit_code = kill_user_task(user_detail->pw_name, task_pid, SIGTERM); break; case KILL_TASK_JVM: task_pid = argv[optind++]; exit_code = kill_user_task(user_detail->pw_name, task_pid, SIGKILL); break; + case FINALIZE_TASK_DIRS: + job_id = argv[optind++]; + task_id = argv[optind++]; + log_dir = argv[optind++]; + int first_task = atoi(argv[optind++]); // TODO: put in checks for -ve values. 
+ exit_code = finalize_task_dirs(job_id, task_id, log_dir, first_task); + break; + case FINALIZE_JOB: + job_id = argv[optind++]; + exit_code = finalize_job(job_id); + break; default: exit_code = INVALID_COMMAND_PROVIDED; } diff --git src/c++/task-controller/task-controller.c src/c++/task-controller/task-controller.c index 511f0cf..3e56f2b 100644 --- src/c++/task-controller/task-controller.c +++ src/c++/task-controller/task-controller.c @@ -71,104 +71,610 @@ int change_user(const char * user) { return 0; } -// function to check if the passed tt_root is present in hadoop.tmp.dir -int check_tt_root(const char *tt_root) { - char ** mapred_local_dir; - int found = -1; +/** + * Checks the passed value for the variable config_key against the values in + * the configuration. + * Returns 0 if the passed value is found in the configuration, + * -1 otherwise + */ +int check_variable_against_config(const char *config_key, + const char *passed_value) { - if (tt_root == NULL) { + if (config_key == NULL || passed_value == NULL) { return -1; } - mapred_local_dir = (char **)get_values(TT_SYS_DIR_KEY); + int found = -1; + + const char **config_value = get_values(config_key); - if (mapred_local_dir == NULL) { + if (config_value == NULL) { + fprintf(LOGFILE, "%s is not configured.\n", config_key); return -1; } - while(*mapred_local_dir != NULL) { - if(strcmp(*mapred_local_dir,tt_root) == 0) { + char **config_val_ptr = (char **) config_value; + while (*config_val_ptr != NULL) { + if (strcmp(*config_val_ptr, passed_value) == 0) { found = 0; break; } + config_val_ptr++; + } + + if (found != 0) { + fprintf( + LOGFILE, + "Invalid value passed: \ + Configured value of %s is %s. 
\ + Passed value is %s.\n", + config_key, get_value(config_key), passed_value); } - free(mapred_local_dir); + free(config_value); return found; } /** + * Utility function to concatenate argB to argA using the concat_pattern + */ +char *concatenate(const char *argA, const char *argB, char *concat_pattern, + char *return_path_name) { + if (argA == NULL || argB == NULL) { + fprintf(LOGFILE, "One of the arguments passed for %s in null.\n", + return_path_name); + return NULL; + } + + char *return_path = NULL; + int str_len = strlen(concat_pattern) + strlen(argA) + strlen(argB); + + return_path = (char *) malloc(sizeof(char) * (str_len + 1)); + if (return_path == NULL) { + fprintf(LOGFILE, "Unable to allocate memory for %s.\n", return_path_name); + return NULL; + } + memset(return_path, '\0', str_len + 1); + snprintf(return_path, str_len, concat_pattern, argA, argB); + return return_path; +} + +/** + * Get the job-directory path from tt_root and job-id + */ +char *get_job_directory(const char * tt_root, const char *jobid) { + return concatenate(tt_root, jobid, TT_JOB_DIR_PATTERN, "job_dir_path"); +} + +/** + * Get the job-jars directory from the job_dir + */ +char *get_job_jars_directory(const char *job_dir) { + return concatenate(job_dir, "", JOB_DIR_TO_JARS_PATTERN, + "job_jars_dir_path"); +} + +/** + * Get the work directory from the job_dir + */ +char *get_work_directory(const char *job_dir) { + return concatenate(job_dir, "", JOB_DIR_TO_WORK_DIR_PATTERN, + "job_work_dir_path"); +} + +/** + * Get the attempt directory for the given attempt_id + */ +char *get_attempt_directory(const char *job_dir, const char *attempt_id) { + return concatenate(job_dir, attempt_id, JOB_DIR_TO_ATTEMPT_DIR_PATTERN, + "attempt_dir_path"); +} + +/** + * Get the task_work directory for the given attempt_id + */ +char *get_task_work_directory(const char *job_dir, const char *attempt_id) { + return concatenate(job_dir, attempt_id, JOB_DIR_TO_TASK_WORK_DIR_PATTERN, + "task_work_dir_path"); +} 
+ +/* + * Get the path to the task launcher file which is created by the TT + */ +char *get_task_launcher_file(const char *attempt_dir) { + return concatenate(attempt_dir, "", ATTEMPT_DIR_TO_TASK_SCRIPT_PATTERN, + "task_script_path"); +} + +/** + * Get the log directory for the given attempt. + */ +char *get_task_log_dir(const char *log_dir, const char *attempt_id) { + return concatenate(log_dir, attempt_id, ATTEMPT_LOG_DIR_PATTERN, + "task_log_dir"); +} + +/** + * Function to check if the passed tt_root is present in mapred.local.dir + * the task-controller is configured with. + */ +int check_tt_root(const char *tt_root) { + return check_variable_against_config(TT_SYS_DIR_KEY, tt_root); +} + +/** + * Function to check if the passed log_dir is present in hadoop.log.dir + * the task-controller is configured with. + */ +int check_task_logs_dir(const char *log_dir) { + return check_variable_against_config(TT_LOG_DIR_KEY, log_dir); +} + +/** * Function to check if the constructed path and absolute * path resolve to one and same. 
*/ - int check_path(char *path) { + fprintf(LOGFILE, "Passed path : %s\n", path); char * resolved_path = (char *) canonicalize_file_name(path); - if(resolved_path == NULL) { + if (resolved_path == NULL) { + switch (errno) { + case ENAMETOOLONG: + fprintf(LOGFILE, "The resulting path is too long.\n"); + break; + case EACCES: + fprintf(LOGFILE, "The path is not readable.\n"); + break; + case ENOENT: + fprintf(LOGFILE, "The input file name is empty or %s", + "the path components does not exist.\n"); + break; + case ELOOP: + fprintf(LOGFILE, "More than `MAXSYMLINKS' many symlinks have been followed.\n"); + break; + } return ERROR_RESOLVING_FILE_PATH; } - if(strcmp(resolved_path, path) !=0) { + fprintf(LOGFILE, "Resolved path : %s\n", resolved_path); + if (strcmp(resolved_path, path) != 0) { free(resolved_path); return RELATIVE_PATH_COMPONENTS_IN_FILE_PATH; } free(resolved_path); return 0; } + /** - * Function to check if a user actually owns the file. + * Function to change the owner/group of a given path. 
*/ -int check_owner(uid_t uid, char *path) { - struct stat filestat; - if(stat(path, &filestat)!=0) { - return UNABLE_TO_STAT_FILE; +int change_owner(const char *path, uid_t uid, gid_t gid) { + int exit_code = chown(path, uid, gid); + if (exit_code != 0) { + switch (errno) { + case (EPERM): + fprintf(LOGFILE, + "Process lacks permission to make the requested change.\n"); + break; + case (EROFS): + fprintf(LOGFILE, "File is on a read-only file system.\n"); + break; + case (EACCES): + fprintf( + LOGFILE, + "Process does not have search permission for a directory \ + component of the file name.\n"); + break; + case (ENAMETOOLONG): + fprintf(LOGFILE, "The file name is too long.\n"); + break; + case (ENOENT): + fprintf(LOGFILE, + "A directory component in the file name doesn't exist.\n"); + break; + case (ENOTDIR): + fprintf( + LOGFILE, + "A directory component in the file name exists, \ + but it isn't a directory.\n"); + break; + case (ELOOP): + fprintf( + LOGFILE, + "Too many symbolic links were resolved \ + while trying to look up the file name.\n"); + break; + default: + fprintf(LOGFILE, "chown failed with error number : %d.\n", errno); + } } - //check owner. - if(uid != filestat.st_uid){ - return FILE_NOT_OWNED_BY_TASKTRACKER; + return exit_code; +} + +/** + * Function to change the mode of a given path. 
+ */ +int change_mode(const char *path, mode_t mode) { + int exit_code = chmod(path, mode); + if (exit_code != 0) { + switch (errno) { + case (ENOENT): + fprintf(LOGFILE, "The named file doesn't exist.\n"); + break; + case (EPERM): + fprintf( + LOGFILE, + "This process does not have permission to change the access\ + permissions of this file.\n"); + break; + case (EROFS): + fprintf(LOGFILE, "The file resides on a read-only file system.\n"); + break; + case (EACCES): + fprintf( + LOGFILE, + "Process does not have search permission for a directory \ + component of the file name.\n"); + break; + case (ENAMETOOLONG): + fprintf(LOGFILE, "The file name is too long.\n"); + break; + case (ENOTDIR): + fprintf( + LOGFILE, + "A directory component in the file name exists, \ + but it isn't a directory.\n"); + break; + case (ELOOP): + fprintf( + LOGFILE, + "Too many symbolic links were resolved \ + while trying to look up the file name.\n"); + break; + default: + fprintf(LOGFILE, "chown failed with error number : %d.\n", errno); + } + } + return exit_code; +} + +/** + * Function to prepare the attempt directories for the task JVM. + * This is done by changing the ownership of the attempt directory recursively + * to the job owner. 
+ */ +int prepare_attempt_directories(const char *job_id, const char *attempt_id, + const char *user) { + if (job_id == NULL || attempt_id == NULL || user == NULL) { + fprintf(LOGFILE, "Either attempt_id is null or the user passed is null.\n"); + return -1; + } + + if (get_user_details(user) < 0) { + fprintf(LOGFILE, "Couldn't get the user details of %s.\n", user); + return -1; + } + + char *full_local_dir_str = (char *)get_value(TT_SYS_DIR_KEY); + char **local_dir = (char **) get_values(TT_SYS_DIR_KEY); + + if (local_dir == NULL) { + fprintf(LOGFILE, "%s is not configured.\n", TT_SYS_DIR_KEY); + cleanup(); + return PREPARE_ATTEMPT_DIRECTORIES_FAILED; + } + + fprintf(LOGFILE, "Value from config for %s is %s.\n", TT_SYS_DIR_KEY, + full_local_dir_str); + + char *job_dir; + char *attempt_dir; + char *task_work_dir; + char **local_dir_ptr = local_dir; + int failed = 1; + while (*local_dir_ptr != NULL) { + job_dir = get_job_directory(*local_dir_ptr, job_id); + + // prepare attempt-dir in each of the mapred_local_dir + attempt_dir = get_attempt_directory(job_dir, attempt_id); + if (opendir(attempt_dir) == NULL) { + fprintf(LOGFILE, "attempt_dir %s doesn't exist. Not doing anything.\n", + attempt_dir); + } else if (secure_path(attempt_dir, user_detail->pw_uid, + user_detail->pw_gid) != 0) { + fprintf(LOGFILE, "Failed to secure the attempt_dir %s\n", attempt_dir); + failed = 0; + } + + // prepare task-work_dir in each of the mapred_local_dir + task_work_dir = get_task_work_directory(job_dir, attempt_id); + if (opendir(task_work_dir) == NULL) { + fprintf(LOGFILE, "attempt_dir %s doesn't exist. 
Not doing anything.\n", + task_work_dir); + } else if (secure_path(task_work_dir, user_detail->pw_uid, + user_detail->pw_gid) != 0) { + fprintf(LOGFILE, "Failed to secure the attempt_dir %s\n", task_work_dir); + failed = 0; + } + + local_dir_ptr++; + free(attempt_dir); + free(task_work_dir); + free(job_dir); + } + free(local_dir); + free(full_local_dir_str); + + cleanup(); + if (failed == 0) { + return PREPARE_ATTEMPT_DIRECTORIES_FAILED; } return 0; } -/* - * function to provide path to the task file which is created by the tt - * - *Check TT_LOCAL_TASK_SCRIPT_PATTERN for pattern +/** + * Function to prepare the job jars directories for the task JVM. Task JVM + * creates symbolic links to the jars in the job directories and hence needs + * access permissions. So here, we change the ownership of the jars directory + * recursively to the job owner. */ -void get_task_file_path(const char * jobid, const char * taskid, - const char * tt_root, char **task_script_path) { - const char ** mapred_local_dir = get_values(TT_SYS_DIR_KEY); - *task_script_path = NULL; - int str_len = strlen(TT_LOCAL_TASK_SCRIPT_PATTERN) + strlen(jobid) + (strlen( - taskid)) + strlen(tt_root); - - if (mapred_local_dir == NULL) { - return; - } - - *task_script_path = (char *) malloc(sizeof(char) * (str_len + 1)); - if (*task_script_path == NULL) { - fprintf(LOGFILE, "Unable to allocate memory for task_script_path \n"); - free(mapred_local_dir); - return; - } - - memset(*task_script_path,'\0',str_len+1); - snprintf(*task_script_path, str_len, TT_LOCAL_TASK_SCRIPT_PATTERN, tt_root, - jobid, taskid); -#ifdef DEBUG - fprintf(LOGFILE, "get_task_file_path : task script path = %s\n", *task_script_path); - fflush(LOGFILE); -#endif - free(mapred_local_dir); +int prepare_jars_and_work_directories(const char *jobid, const char *user) { + if (jobid == NULL || user == NULL) { + fprintf(LOGFILE, "Either jobid is null or the user passed is null.\n"); + return -1; + } + + if (get_user_details(user) < 0) { + 
fprintf(LOGFILE, "Couldn't get the user details of %s", user); + return -1; + } + + char *full_local_dir_str = (char *) get_value(TT_SYS_DIR_KEY); + char **local_dir = (char **) get_values(TT_SYS_DIR_KEY); + + if (local_dir == NULL) { + fprintf(LOGFILE, "%s is not configured.\n", TT_SYS_DIR_KEY); + cleanup(); + return PREPARE_JARS_AND_WORK_DIRECTORIES_FAILED; + } + + fprintf(LOGFILE, "Value from config for %s is %s.\n", TT_SYS_DIR_KEY, + full_local_dir_str); + + char *job_dir; + char *jars_dir; + char *work_dir; + char **local_dir_ptr = local_dir; + int failed = 1; + while (*local_dir_ptr != NULL) { + // prepare jars in each of the mapred_local_dir + job_dir = get_job_directory(*local_dir_ptr, jobid); + + // prepare jars in each of the mapred_local_dir + jars_dir = get_job_jars_directory(job_dir); + if (opendir(jars_dir) == NULL) { + fprintf(LOGFILE, "jars_dir %s doesn't exist. Not doing anything.\n", + jars_dir); + } else if (secure_path(jars_dir, user_detail->pw_uid, user_detail->pw_gid) + != 0) { + fprintf(LOGFILE, "Failed to secure the jars_dir %s\n", jars_dir); + failed = 0; + } + + // prepare work in each of the mapred_local_dir + work_dir = get_work_directory(job_dir); + if (opendir(work_dir) == NULL) { + fprintf(LOGFILE, "work_dir %s doesn't exist. Not doing anything.\n", + jars_dir); + } else if (secure_path(work_dir, user_detail->pw_uid, user_detail->pw_gid) + != 0) { + fprintf(LOGFILE, "Failed to secure the work_dir %s\n", jars_dir); + failed = 0; + } + + local_dir_ptr++; + free(job_dir); + free(jars_dir); + free(work_dir); + } + free(local_dir); + free(full_local_dir_str); + cleanup(); + if (failed == 0) { + return PREPARE_JARS_AND_WORK_DIRECTORIES_FAILED; + } + return 0; +} + +/** + * Function to prepare the task logs for the child. It gives the ownership + * of the attempt's log-dir to the user. It gives readable permissions to + * everyone for the attempt's dir and it's contents. This is only till + * security is fixed for task-logs. 
+ */ +int prepare_task_logs(const char *log_dir, const char *task_id) { + + char *task_log_dir = get_task_log_dir(log_dir, task_id); + if (opendir(task_log_dir) == NULL) { + fprintf(LOGFILE, "task_log_dir %s doesn't exist. Not doing anything.\n", + task_log_dir); + // See TaskRunner.java to see that an absent log-dir doesn't fail the task. + return 0; + } + + if (change_owner(task_log_dir, user_detail->pw_uid, user_detail->pw_gid) + != 0) { + fprintf(LOGFILE, "couldn't change the ownership of attempt log dir %s\n", + task_log_dir); + return -1; + } + + // Create and set permissions for all the log related files. + // See org.apache.hadoop.mapred.TaskLog for information about these files. + // TODO: Once MAPREDUCE-AAAA is fixed, the following will not be needed. + char *files[5] = { "stdout", "stderr", "syslog", "profile.out", "debugout" }; + char *all_files[6]; + int i; + for (i = 0; i < 5; i++) { + all_files[i] = files[i]; + } + if (strstr(task_id, ".cleanup") != NULL) { + // This is a cleanup attempt + all_files[5] = "log.index.cleanup"; + } else { + all_files[5] = "log.index"; + } + + char *file_path; + int exit_code = 0; + mode_t old_umask = umask(0033); + for (i = 0; i < 7; i++) { + file_path = concatenate(task_log_dir, all_files[i], "%s/%s", + ("%s_file_path", all_files[i])); + // Give read permissions to everyone and write permissions to the user. + if (creat(file_path, S_IRUSR | S_IWUSR | S_IRGRP | S_IROTH) == -1) { + fprintf(LOGFILE, "couldn't create the file %s in the log_dir %s\n", + file_path, task_log_dir); + exit_code = -1; + } else if (change_owner(file_path, user_detail->pw_uid, + user_detail->pw_gid) != 0) { + fprintf(LOGFILE, "couldn't change the ownership of the file %s\n", + file_path); + exit_code = -1; + } + } + umask(old_umask); + return exit_code; +} + +/** + * Function to secure the given path. 
It does the following recursively: + * 1) changes the owner/group of the paths to the passed owner/group + * 2) changes the file permission to be only readable to the owner. + */ +int secure_path(const char *path, uid_t uid, gid_t gid) { + FTS *tree = NULL; // the file hierarchy + FTSENT *entry = NULL; // a file in the hierarchy + char *paths[] = { (char *)path }; + int process_path = 1; + int error_code = 0; + + // Secure permissions + mode_t file_mode = S_IREAD | S_IWRITE | S_IEXEC; + + // Get physical locations and don't resolve the symlinks. + // Don't change directory while walking the directory. + int ftsoptions = FTS_PHYSICAL | FTS_NOCHDIR; + tree = fts_open(paths, ftsoptions, NULL); + while (((entry = fts_read(tree)) != NULL) && error_code == 0) { + switch (entry->fts_info) { + case FTS_D: + // A directory being visited in pre-order. + // We change ownership of directories in post-order. + // so ignore the pre-order visit. + process_path = 1; + break; + case FTS_DC: + // A directory that causes a cycle in the tree + // We don't expect cycles, ignore. + process_path = 1; + break; + case FTS_DNR: + // A directory which cannot be read + // Ignore and set error code. + process_path = 1; + error_code = -1; + break; + case FTS_DOT: + // "." or ".." + process_path = 1; + break; + case FTS_F: + // A regular file + process_path = 0; + break; + case FTS_DP: + // A directory being visited in post-order + process_path = 0; + break; + case FTS_SL: + // A symbolic link + process_path = 0; + break; + case FTS_SLNONE: + // A symbolic link with a nonexistent target + process_path = 0; + break; + case FTS_NS: + // A file for which no stat(2) information was available + // Ignore and set error code + process_path = 1; + error_code = -1; + break; + case FTS_DEFAULT: + // File that doesn't belong to any of the above type. Ignore. 
+ process_path = 1; + break; + } + + if (error_code != 0) { + break; + } + if (process_path == 1) { + continue; + } + + if (change_owner(entry->fts_path, uid, gid) != 0) { + fprintf(LOGFILE, "couldn't change the ownership of %s\n", + entry->fts_path); + error_code = -3; + } else if (change_mode(entry->fts_path, file_mode) != 0) { + fprintf(LOGFILE, "couldn't change the permissions of %s\n", + entry->fts_path); + error_code = -3; + } + } + fts_close(tree); + return error_code; +} + +/** + * Function to finalise directories to be owned by the TaskTracker back again. + * This is used in the following cases: + * 1) For changing ownership of the attempt-directory when the + * attempt finishes and + * 2) For changing ownership of the jars directory when the job finishes. + */ +int finalize_directories(const char *dir_to_be_finalised) { + if (dir_to_be_finalised == NULL) { + fprintf(LOGFILE, "dir_to_be_finalised is null.\n"); + return -1; + } + + if (opendir(dir_to_be_finalised) == NULL) { + fprintf(LOGFILE, + "dir_to_be_finalised %s doesn't exist. Not doing anything.\n", + dir_to_be_finalised); + return 0; + } + + uid_t uid = getuid(); + gid_t gid = getgid(); + + fprintf(LOGFILE, "Securing the path %s recursively to TT.\n", + dir_to_be_finalised); + if (secure_path(dir_to_be_finalised, uid, gid) != 0) { + fprintf(LOGFILE, "Failed to secure the path %s to TT.\n", + dir_to_be_finalised); + return -1; + } + return 0; } -//end of private functions void display_usage(FILE *stream) { fprintf(stream, "Usage: task-controller [-l logfile] user command command-args\n"); } //function used to populate and user_details structure. - int get_user_details(const char *user) { if (user_detail == NULL) { user_detail = getpwnam(user); @@ -180,31 +686,66 @@ int get_user_details(const char *user) { return 0; } +/** + * Function used to initialize task. 
Prepares attempt_dir, jars_dir and + * log_dir to be accessible by the child + * TODO: fix exit codes + */ +int initialize_task(const char *jobid, const char *taskid, + const char *log_dir, const char *user) { + int exit_code = 0; + fprintf(LOGFILE, "job-id passed to initialize_task : %s.\n", jobid); + fprintf(LOGFILE, "task-d passed to initialize_task : %s.\n", taskid); + fprintf(LOGFILE, "log_dir passed to initialize_task : %s.\n", log_dir); + + if (prepare_attempt_directories(jobid, taskid, user) != 0) { + fprintf(LOGFILE, + "Couldn't prepare the attempt directories for %s of user %s.\n", + taskid, user); + exit_code = PREPARE_ATTEMPT_DIRECTORIES_FAILED; + goto cleanup; + } + + if (check_task_logs_dir(log_dir) < 0) { + fprintf(LOGFILE, "Problem with the log directory passed or configured.\n"); + exit_code = INVALID_TT_LOG_DIR; + goto cleanup; + } + + if (prepare_task_logs(log_dir, taskid) != 0) { + fprintf(LOGFILE, "Couldn't prepare task logs directory %s for %s.\n", + log_dir, taskid); + exit_code = PREPARE_TASK_LOGS_FAILED; + } + + cleanup: + // free configurations + cleanup(); + return exit_code; +} + /* - *Function used to launch a task as the provided user. - * First the function checks if the tt_root passed is found in - * hadoop.temp.dir - * Uses get_task_file_path to fetch the task script file path. - * Does an execlp on the same in order to replace the current image with + * Function used to launch a task as the provided user. It does the following : + * 1) Checks if the tt_root passed is found in mapred.local.dir + * 2) Prepares attempt_dir, jars_dir and log_dir to be accessible by the child + * 3) Uses get_task_launcher_file to fetch the task script file path + * 4) Does an execlp on the same in order to replace the current image with * task image. 
*/ - int run_task_as_user(const char * user, const char *jobid, const char *taskid, - const char *tt_root) { - char *task_script_path = NULL; + const char *tt_root, const char *log_dir) { int exit_code = 0; uid_t uid = getuid(); - if(jobid == NULL || taskid == NULL) { + if (jobid == NULL || taskid == NULL || tt_root == NULL || log_dir == NULL) { return INVALID_ARGUMENT_NUMBER; } -#ifdef DEBUG - fprintf(LOGFILE,"run_task_as_user : Job id : %s \n", jobid); - fprintf(LOGFILE,"run_task_as_user : task id : %s \n", taskid); - fprintf(LOGFILE,"run_task_as_user : tt_root : %s \n", tt_root); - fflush(LOGFILE); -#endif + fprintf(LOGFILE, "Job-id passed to run_task_as_user : %s.\n", jobid); + fprintf(LOGFILE, "task-d passed to run_task_as_user : %s.\n", taskid); + fprintf(LOGFILE, "tt_root passed to run_task_as_user : %s.\n", tt_root); + fprintf(LOGFILE, "log_dir passed to run_task_as_user : %s.\n", log_dir); + //Check tt_root before switching the user, as reading configuration //file requires privileged access. 
if (check_tt_root(tt_root) < 0) { @@ -213,41 +754,113 @@ int run_task_as_user(const char * user, const char *jobid, const char *taskid, return INVALID_TT_ROOT; } - //change the user - fclose(LOGFILE); - fcloseall(); - umask(0); - if (change_user(user) != 0) { - cleanup(); - return SETUID_OPER_FAILED; + char *job_dir = NULL, *task_work_dir = NULL, *jars_dir = + NULL, *task_script_path = NULL; + + if (prepare_attempt_directories(jobid, taskid, user) != 0) { + fprintf(LOGFILE, + "Couldn't prepare the attempt directories for %s of user %s.\n", + taskid, user); + exit_code = PREPARE_ATTEMPT_DIRECTORIES_FAILED; + goto cleanup; + } + + if (prepare_jars_and_work_directories(jobid, user) != 0) { + fprintf(LOGFILE, + "Couldn't prepare jars and work directories %s of user %s.\n", jobid, + user); + exit_code = PREPARE_JARS_AND_WORK_DIRECTORIES_FAILED; + goto cleanup; + } + + if (check_task_logs_dir(log_dir) < 0) { + fprintf(LOGFILE, "Problem with the log directory passed or configured.\n"); + exit_code = INVALID_TT_LOG_DIR; + goto cleanup; + } + + if (prepare_task_logs(log_dir, taskid) != 0) { + fprintf(LOGFILE, "Couldn't prepare task logs directory %s for %s.\n", log_dir, + taskid); + exit_code = PREPARE_TASK_LOGS_FAILED; + goto cleanup; + } + + job_dir = get_job_directory(tt_root, jobid); + if (job_dir == NULL) { + fprintf(LOGFILE, "Couldn't obtain job_dir for %s in %s.\n", jobid, tt_root); + exit_code = OUT_OF_MEMORY; + goto cleanup; } - get_task_file_path(jobid, taskid, tt_root, &task_script_path); + task_work_dir = get_task_work_directory(job_dir, taskid); + if (task_work_dir == NULL) { + fprintf(LOGFILE, "Couldn't obtain task_work_dir for %s in %s.\n", taskid, job_dir); + exit_code = OUT_OF_MEMORY; + goto cleanup; + } + + task_script_path = get_task_launcher_file(task_work_dir); if (task_script_path == NULL) { - cleanup(); - return INVALID_TASK_SCRIPT_PATH; + fprintf(LOGFILE, "Couldn't obtain task_script_path in %s.\n", job_dir); + exit_code = OUT_OF_MEMORY; + goto 
cleanup; } + errno = 0; exit_code = check_path(task_script_path); if(exit_code != 0) { goto cleanup; } - errno = 0; - exit_code = check_owner(uid, task_script_path); - if(exit_code != 0) { + + //change the user + fcloseall(); + umask(0077); + if (change_user(user) != 0) { + exit_code = SETUID_OPER_FAILED; goto cleanup; } + errno = 0; cleanup(); execlp(task_script_path, task_script_path, NULL); if (errno != 0) { - free(task_script_path); - exit_code = UNABLE_TO_EXECUTE_TASK_SCRIPT; - } + switch (errno) { + case E2BIG: + fprintf(LOGFILE, + "The total number of bytes in the environment (envp)\ + and argument list (argv) is too large."); + break; + case EACCES: + fprintf(LOGFILE, + "Search permission is denied on a component of the path prefix\ + of filename or the name of a script interpreter. or"); + fprintf(LOGFILE, + "Execute permission is denied for the file or a script or \ + ELF interpreter."); + break; + case ENOENT: + fprintf(LOGFILE, + "The file filename or a script or ELF interpreter does not exist,\ + or a shared library needed for file or interpreter cannot be found."); + break; + } + free(task_script_path); + exit_code = UNABLE_TO_EXECUTE_TASK_SCRIPT; + } return exit_code; cleanup: + if (job_dir != NULL) { + free(job_dir); + } + if (task_work_dir != NULL) { + free(task_work_dir); + } + if (jars_dir != NULL) { + free(jars_dir); + } if (task_script_path != NULL) { free(task_script_path); } @@ -261,19 +874,23 @@ cleanup: * The function sends appropriate signal to the process group * specified by the task_pid. 
*/ - int kill_user_task(const char *user, const char *task_pid, int sig) { int pid = 0; if(task_pid == NULL) { return INVALID_ARGUMENT_NUMBER; } + + fprintf(LOGFILE, "user passed to kill_user_task : %s.\n", user); + fprintf(LOGFILE, "task-pid passed to kill_user_task : %s.\n", task_pid); + fprintf(LOGFILE, "signal passed to kill_user_task : %d.\n", sig); + pid = atoi(task_pid); if(pid <= 0) { return INVALID_TASK_PID; } - fclose(LOGFILE); + fcloseall(); if (change_user(user) != 0) { cleanup(); @@ -283,6 +900,7 @@ int kill_user_task(const char *user, const char *task_pid, int sig) { //Don't continue if the process-group is not alive anymore. if(kill(-pid,0) < 0) { errno = 0; + cleanup(); return 0; } @@ -298,3 +916,147 @@ int kill_user_task(const char *user, const char *task_pid, int sig) { return 0; } +/** + * Function to tidy up things when a task given by task-id finishes. + * As of now, it changes the ownership of the attempt directory and log + * directory back to TT so that + * 1) TT can serve the output in case of map-tasks, + * 2) TT can directly read the logs of finished tasks and + * 3) task dirs can be cleaned up directly by TT once not needed anymore. 
+ */ +int finalize_task_dirs(const char *job_id, const char *task_id, + const char *log_dir, int first_task) { + + if (job_id == NULL || task_id == NULL || log_dir == NULL) { + return INVALID_ARGUMENT_NUMBER; + } + + fprintf(LOGFILE, "job-id passed to finalize_task_dirs : %s.\n", job_id); + fprintf(LOGFILE, "task-d passed to finalize_task_dirs : %s.\n", task_id); + fprintf(LOGFILE, "log_dir passed to finalize_task_dirs : %s.\n", log_dir); + fprintf(LOGFILE, + "first_task passed to finalize_task_dirs : %d.\n", + first_task); + + char *full_local_dir_str = (char *)get_value(TT_SYS_DIR_KEY); + char **local_dir = (char **) get_values(TT_SYS_DIR_KEY); + + if (local_dir == NULL) { + fprintf(LOGFILE, "%s is not configured.\n", TT_SYS_DIR_KEY); + cleanup(); + return PREPARE_ATTEMPT_DIRECTORIES_FAILED; + } + + fprintf(LOGFILE, "Value from config for %s is %s.\n", TT_SYS_DIR_KEY, + full_local_dir_str); + + char *job_dir; + char *attempt_dir; + char *task_work_dir; + char *task_log_dir; + char **local_dir_ptr = local_dir; + int failed = 1; + while (*local_dir_ptr != NULL) { + job_dir = get_job_directory(*local_dir_ptr, job_id); + + // finalise attempt-dir in each of the mapred_local_dir + attempt_dir = get_attempt_directory(job_dir, task_id); + if (opendir(attempt_dir) == NULL) { + fprintf(LOGFILE, "attempt_dir %s doesn't exist. Not doing anything.\n", + attempt_dir); + } else if (finalize_directories(attempt_dir)) { + fprintf(LOGFILE, "Failed to finalise the directory %s\n", attempt_dir); + failed = 0; + } + + if (first_task == 1) { + // Not the first task. + + // finalise task-work-dir in each of the mapred_local_dir + // this is used in case of jvm reuse. + task_work_dir = get_task_work_directory(job_dir, task_id); + if (opendir(task_work_dir) == NULL) { + fprintf(LOGFILE, + "task_work_dir %s doesn't exist. 
Not doing anything.\n", + task_work_dir); + } else if (finalize_directories(task_work_dir)) { + fprintf(LOGFILE, "Failed to finalise the directory %s\n", + task_work_dir); + failed = 0; + } + free(task_work_dir); + } + + local_dir_ptr++; + free(attempt_dir); + free(job_dir); + } + free(local_dir); + free(full_local_dir_str); + + cleanup(); + if (failed == 0) { + return FINALIZE_TASK_DIRS_FAILED; + } + return 0; +} + +/** + * Function to tidy up things once the job given by job-id finishes. + * As of now, it changes the ownership of job jars back to the TT so that they + * can be cleaned up by TT itself. + */ +int finalize_job(const char *jobid) { + if (jobid == NULL) { + return INVALID_ARGUMENT_NUMBER; + } + + fprintf(LOGFILE, "Job-id passed to finalize_job : %s.\n", jobid); + + char *full_local_dir_str = (char *) get_value(TT_SYS_DIR_KEY); + char **local_dir = (char **) get_values(TT_SYS_DIR_KEY); + + if (local_dir == NULL) { + fprintf(LOGFILE, "%s is not configured.\n", TT_SYS_DIR_KEY); + cleanup(); + return FINALIZE_JOB_FAILED; + } + + fprintf(LOGFILE, "Value from config for %s is %s.\n", TT_SYS_DIR_KEY, + full_local_dir_str); + + char *job_dir; + char *jars_dir; + char *work_dir; + char **local_dir_ptr = local_dir; + int failed = 1; + while (*local_dir_ptr != NULL) { + job_dir = get_job_directory(*local_dir_ptr, jobid); + + // finalise jars in each of the mapred_local_dir + jars_dir = get_job_jars_directory(job_dir); + if (finalize_directories(jars_dir) != 0) { + fprintf(LOGFILE, "Failed to finalise the jars_dir %s\n", jars_dir); + failed = 0; + } + + // finalise work dir in each of the mapred_local_dir + work_dir = get_work_directory(job_dir); + if (finalize_directories(work_dir) != 0) { + fprintf(LOGFILE, "Failed to finalise the work_dir %s\n", work_dir); + failed = 0; + } + + local_dir_ptr++; + free(job_dir); + free(work_dir); + free(jars_dir); + } + free(local_dir); + free(full_local_dir_str); + cleanup(); + if (failed == 0) { + return FINALIZE_JOB_FAILED; 
+ } + return 0; +} diff --git src/c++/task-controller/task-controller.h src/c++/task-controller/task-controller.h index 8e545b5..7665190 100644 --- src/c++/task-controller/task-controller.h +++ src/c++/task-controller/task-controller.h @@ -28,14 +28,21 @@ #include #include #include -#include +#include +#include +#include +#include + #include "configuration.h" //command definitions enum command { LAUNCH_TASK_JVM, + INITIALIZE_TASK, TERMINATE_TASK_JVM, - KILL_TASK_JVM + KILL_TASK_JVM, + FINALIZE_TASK_DIRS, + FINALIZE_JOB, }; enum errorcodes { @@ -45,21 +52,41 @@ enum errorcodes { SUPER_USER_NOT_ALLOWED_TO_RUN_TASKS, //4 INVALID_TT_ROOT, //5 SETUID_OPER_FAILED, //6 - INVALID_TASK_SCRIPT_PATH, //7 - UNABLE_TO_EXECUTE_TASK_SCRIPT, //8 - UNABLE_TO_KILL_TASK, //9 - INVALID_PROCESS_LAUNCHING_TASKCONTROLLER, //10 - INVALID_TASK_PID, //11 - ERROR_RESOLVING_FILE_PATH, //12 - RELATIVE_PATH_COMPONENTS_IN_FILE_PATH, //13 - UNABLE_TO_STAT_FILE, //14 - FILE_NOT_OWNED_BY_TASKTRACKER //15 + UNABLE_TO_EXECUTE_TASK_SCRIPT, //7 + UNABLE_TO_KILL_TASK, //8 + INVALID_PROCESS_LAUNCHING_TASKCONTROLLER, //9 + INVALID_TASK_PID, //10 + ERROR_RESOLVING_FILE_PATH, //11 + RELATIVE_PATH_COMPONENTS_IN_FILE_PATH, //12 + UNABLE_TO_STAT_FILE, //13 + FILE_NOT_OWNED_BY_TASKTRACKER, //14 + PREPARE_ATTEMPT_DIRECTORIES_FAILED, //15 + PREPARE_JARS_AND_WORK_DIRECTORIES_FAILED, //16 + PREPARE_TASK_LOGS_FAILED, //17 + INVALID_TT_LOG_DIR, //18 + FINALIZE_TASK_DIRS_FAILED, //19 + FINALIZE_JOB_FAILED, //20 + OUT_OF_MEMORY, //21 }; +#define TT_JOB_DIR_PATTERN "%s/taskTracker/jobcache/%s" + +#define TT_ATTEMPT_DIR_PATTERN TT_JOB_DIR_PATTERN"/%s" + +#define JOB_DIR_TO_JARS_PATTERN "%s/jars" + +#define JOB_DIR_TO_WORK_DIR_PATTERN "%s/work" -#define TT_LOCAL_TASK_SCRIPT_PATTERN "%s/taskTracker/jobcache/%s/%s/taskjvm.sh" +#define JOB_DIR_TO_ATTEMPT_DIR_PATTERN "%s/%s" + +#define JOB_DIR_TO_TASK_WORK_DIR_PATTERN "%s/task-work/%s" + +#define ATTEMPT_LOG_DIR_PATTERN "%s/userlogs/%s" + +#define 
ATTEMPT_DIR_TO_TASK_SCRIPT_PATTERN "%s/taskjvm.sh" #define TT_SYS_DIR_KEY "mapred.local.dir" +#define TT_LOG_DIR_KEY "hadoop.log.dir" #define MAX_ITEMS 10 @@ -74,8 +101,28 @@ extern FILE *LOGFILE; void display_usage(FILE *stream); -int run_task_as_user(const char * user, const char *jobid, const char *taskid, const char *tt_root); +int run_task_as_user(const char * user, const char *jobid, const char *taskid, + const char *tt_root, const char *log_dir); + +int initialize_task(const char *jobid, const char *taskid, + const char *log_dir, const char *user); int kill_user_task(const char *user, const char *task_pid, int sig); +int finalize_task_dirs(const char *jobid, const char *taskid, + const char *log_dir, int first_task); + +int prepare_attempt_directory(const char *attempt_dir, const char *user); + +int finalize_job(const char *job_id); + +// The following functions are exposed for testing + +int check_variable_against_config(const char *config_key, + const char *passed_value); + int get_user_details(const char *user); + +int check_path(char *path); + +char *get_job_cache_directory(const char * tt_root); diff --git src/c++/task-controller/test-task-controller.c src/c++/task-controller/test-task-controller.c new file mode 100644 index 0000000..9660f1a --- /dev/null +++ src/c++/task-controller/test-task-controller.c @@ -0,0 +1,181 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +#include "task-controller.h" + +#define HADOOP_CONF_DIR "/tmp" + +int write_config_file(char *file_name) { + FILE *file; + char const *str = + "mapred.local.dir=/tmp/testing1,/tmp/testing2,/tmp/testing3,/tmp/testing4\n"; + + file = fopen(file_name, "w"); + if (file == NULL) { + printf("Failed to open %s.\n", file_name); + return EXIT_FAILURE; + } + fwrite(str, 1, strlen(str), file); + fclose(file); + return 0; +} + +void test_check_variable_against_config() { + char *conf_dir = NULL, *value = NULL, **values = NULL; // initialised before any goto cleanup + // A temporary configuration directory + char *conf_dir_templ = "/tmp/test-task-controller-conf-dir-XXXXXX"; + + // Room for "/conf/taskcontroller.cfg" plus the terminating NUL + char template[strlen(conf_dir_templ) + strlen("/conf/taskcontroller.cfg") + 1]; + + strcpy(template, conf_dir_templ); + char *temp_dir = mkdtemp(template); + if (temp_dir == NULL) { + printf("Couldn't create a temporary dir for conf.\n"); + goto cleanup; + } + + // Set the configuration directory + hadoop_conf_dir = strdup(temp_dir); + + // create the configuration directory + strcat(template, "/conf"); + conf_dir = strdup(template); + mkdir(conf_dir, S_IRWXU); + + // create the configuration file + strcat(template, "/taskcontroller.cfg"); + if (write_config_file(template) != 0) { + printf("Couldn't write the configuration file.\n"); + goto cleanup; + } + + // Test obtaining a value for a key from the config + char *config_values[4] = { "/tmp/testing1", "/tmp/testing2", + "/tmp/testing3", "/tmp/testing4" }; + value = (char *) get_value("mapred.local.dir"); + if (value == NULL || strcmp(value,
"/tmp/testing1,/tmp/testing2,/tmp/testing3,/tmp/testing4") + != 0) { + printf("Obtaining a value for a key from the config failed.\n"); + goto cleanup; + } + + // Test the parsing of a multiple valued key from the config + values = (char **)get_values("mapred.local.dir"); + char **values_ptr = values; + int i = 0; + while (values_ptr != NULL && *values_ptr != NULL) { + printf(" value : %s\n", *values_ptr); + if (i >= (int) (sizeof(config_values) / sizeof(config_values[0])) || strcmp(*values_ptr, config_values[i++]) != 0) { + printf("Configured values are not read out properly. Test failed!"); + goto cleanup; + } + values_ptr++; + } + + if (check_variable_against_config("mapred.local.dir", "/tmp/testing5") == 0) { + printf("Configuration should not contain /tmp/testing5! \n"); + goto cleanup; + } + + if (check_variable_against_config("mapred.local.dir", "/tmp/testing4") != 0) { + printf("Configuration should contain /tmp/testing4! \n"); + goto cleanup; + } + + cleanup: if (value != NULL) { + free(value); + } + if (values != NULL) { + free(values); + } + unlink(template); + if (conf_dir != NULL) { rmdir(conf_dir); free(conf_dir); } + if (hadoop_conf_dir != NULL) { + rmdir(hadoop_conf_dir); + free(hadoop_conf_dir); hadoop_conf_dir = NULL; + } +} + +void test_get_job_directory() { + char *job_dir = (char *) get_job_directory("/tmp", "job_200906101234_0001"); + printf("job_dir obtained is %s\n", job_dir); + int ret = 0; + if (strcmp(job_dir, "/tmp/taskTracker/jobcache/job_200906101234_0001") != 0) { + ret = -1; + } + free(job_dir); + assert(ret == 0); +} + +void test_get_attempt_directory() { + char *attempt_dir = (char *) get_attempt_directory( + "/tmp/taskTracker/jobcache/job_200906101234_0001", + "attempt_200906112028_0001_m_000000_0"); + printf("attempt_dir obtained is %s\n", attempt_dir); + int ret = 0; + if (strcmp( + attempt_dir, + "/tmp/taskTracker/jobcache/job_200906101234_0001/attempt_200906112028_0001_m_000000_0") + != 0) { + ret = -1; + } + free(attempt_dir); + assert(ret == 0); +} + +void test_get_task_launcher_file() { + char + *task_file = + (char *) get_task_launcher_file(
"/tmp/taskTracker/jobcache/job_200906101234_0001/attempt_200906112028_0001_m_000000_0"); + printf("task_file obtained is %s\n", task_file); + int ret = 0; + if (strcmp( + task_file, + "/tmp/taskTracker/jobcache/job_200906101234_0001/attempt_200906112028_0001_m_000000_0/taskjvm.sh") + != 0) { + ret = -1; + } + free(task_file); + assert(ret == 0); +} + +void test_get_task_log_dir() { + char *logdir = (char *) get_task_log_dir("/tmp/testing", + "attempt_200906112028_0001_m_000000_0"); + printf("logdir obtained is %s\n", logdir); + int ret = 0; + if (strcmp(logdir, + "/tmp/testing/userlogs/attempt_200906112028_0001_m_000000_0") != 0) { + ret = -1; + } + free(logdir); + assert(ret == 0); +} + +int main(int argc, char **argv) { + printf("Starting tests\n"); + LOGFILE = stdout; + test_check_variable_against_config(); + test_get_job_directory(); + test_get_attempt_directory(); + test_get_task_launcher_file(); + test_get_task_log_dir(); + printf("Finished tests\n"); + return 0; +} diff --git src/contrib/streaming/src/test/org/apache/hadoop/streaming/TestStreamingAsDifferentUser.java src/contrib/streaming/src/test/org/apache/hadoop/streaming/TestStreamingAsDifferentUser.java index 8e6842e..8439824 100644 --- src/contrib/streaming/src/test/org/apache/hadoop/streaming/TestStreamingAsDifferentUser.java +++ src/contrib/streaming/src/test/org/apache/hadoop/streaming/TestStreamingAsDifferentUser.java @@ -65,7 +65,7 @@ public class TestStreamingAsDifferentUser extends "stream.tmpdir=" + System.getProperty("test.build.data", "/tmp") }; StreamJob streamJob = new StreamJob(args, true); streamJob.setConf(myConf); - streamJob.go(); + assertTrue("Job has not succeeded", streamJob.go() == 0); assertOwnerShip(outputPath); } } diff --git src/java/org/apache/hadoop/mapred/BackupStore.java src/java/org/apache/hadoop/mapred/BackupStore.java index 52d0175..ed1db93 100644 --- src/java/org/apache/hadoop/mapred/BackupStore.java +++ src/java/org/apache/hadoop/mapred/BackupStore.java @@ -548,10 
+548,9 @@ public class BackupStore { boolean isActive() { return isActive; } private Writer createSpillFile() throws IOException { - Path tmp = new Path( - TaskTracker.getIntermediateOutputDir( - tid.getJobID().toString(), tid.toString()) + - "/backup_" + tid.getId() + "_" + (spillNumber++) + ".out"); + Path tmp = + new Path(TaskTracker.getBaseIntermediateOutputDir() + "/backup_" + + tid.getId() + "_" + (spillNumber++) + ".out"); LOG.info("Created file: " + tmp); diff --git src/java/org/apache/hadoop/mapred/Child.java src/java/org/apache/hadoop/mapred/Child.java index 3ea7dc8..ab593cf 100644 --- src/java/org/apache/hadoop/mapred/Child.java +++ src/java/org/apache/hadoop/mapred/Child.java @@ -36,6 +36,7 @@ import org.apache.hadoop.metrics.MetricsUtil; import org.apache.hadoop.metrics.jvm.JvmMetrics; import org.apache.log4j.LogManager; import org.apache.hadoop.util.Shell; +import org.apache.hadoop.util.StringUtils; /** * The main() for child processes. @@ -168,14 +169,15 @@ class Child { LOG.fatal("FSError from child", e); umbilical.fsError(taskid, e.getMessage()); } catch (Throwable throwable) { - LOG.warn("Error running child", throwable); + LOG.warn("Error running child : " + + StringUtils.stringifyException(throwable)); try { if (task != null) { // do cleanup for the task task.taskCleanup(umbilical); } } catch (Throwable th) { - LOG.info("Error cleaning up" + th); + LOG.info("Error cleaning up : " + StringUtils.stringifyException(th)); } // Report back any failures, for diagnostic purposes ByteArrayOutputStream baos = new ByteArrayOutputStream(); diff --git src/java/org/apache/hadoop/mapred/DefaultTaskController.java src/java/org/apache/hadoop/mapred/DefaultTaskController.java index 174403b..58007fd 100644 --- src/java/org/apache/hadoop/mapred/DefaultTaskController.java +++ src/java/org/apache/hadoop/mapred/DefaultTaskController.java @@ -46,6 +46,7 @@ class DefaultTaskController extends TaskController { * This method launches the new JVM for the task by executing 
the * the JVM command using the {@link Shell.ShellCommandExecutor} */ + @Override void launchTaskJVM(TaskController.TaskControllerContext context) throws IOException { JvmEnv env = context.env; @@ -59,7 +60,7 @@ class DefaultTaskController extends TaskController { context.shExec = shexec; shexec.execute(); } - + /** * Initialize the task environment. * @@ -72,21 +73,6 @@ class DefaultTaskController extends TaskController { // So this is a dummy method. return; } - - - @Override - void setup() { - // nothing to setup - return; - } - - /* - * No need to do anything as we don't need to do as we dont need anything - * extra from what TaskTracker has done. - */ - @Override - void initializeJob(JobID jobId) { - } @Override void terminateTask(TaskControllerContext context) { @@ -132,5 +118,17 @@ class DefaultTaskController extends TaskController { } } } - + + @Override + void finalizeTaskDirs(TaskControllerContext context) + throws IOException { + // Do nothing + } + + @Override + void finalizeJob(JobID jobId, String workDir, String user) + throws IOException { + // Do nothing + + } } diff --git src/java/org/apache/hadoop/mapred/JvmManager.java src/java/org/apache/hadoop/mapred/JvmManager.java index c2a463b..35aae9d 100644 --- src/java/org/apache/hadoop/mapred/JvmManager.java +++ src/java/org/apache/hadoop/mapred/JvmManager.java @@ -35,6 +35,7 @@ import org.apache.hadoop.mapred.TaskController.TaskControllerContext; import org.apache.hadoop.mapred.TaskTracker.TaskInProgress; import org.apache.hadoop.util.Shell.ShellCommandExecutor; import org.apache.hadoop.util.ProcessTree; +import org.apache.hadoop.util.StringUtils; class JvmManager { @@ -185,16 +186,24 @@ class JvmManager { TaskRunner taskRunner = jvmToRunningTask.get(jvmId); JvmRunner jvmRunner = jvmIdToRunner.get(jvmId); Task task = taskRunner.getTaskInProgress().getTask(); - TaskControllerContext context = - new TaskController.TaskControllerContext(); + + // Initialize task dirs + TaskControllerContext context = + new 
TaskController.TaskControllerContext(); context.env = jvmRunner.env; context.task = task; - //If we are returning the same task as which the JVM was launched - //we don't initialize task once again. - if(!jvmRunner.env.conf.get("mapred.task.id"). - equals(task.getTaskID().toString())) { - tracker.getTaskController().initializeTask(context); + // If we are returning the same task as which the JVM was launched + // we don't initialize task once again. + if (!jvmRunner.env.conf.get("mapred.task.id").equals( + task.getTaskID().toString())) { + try { + tracker.getTaskController().initializeTask(context); + } catch (IOException e) { + LOG.warn("Failed to initialize task " + task.getTaskID() + " : " + + StringUtils.stringifyException(e)); + } } + return taskRunner.getTaskInProgress(); } return null; @@ -218,7 +227,7 @@ class JvmManager { synchronized public void taskKilled(TaskRunner tr) { JVMId jvmId = runningTaskToJvm.remove(tr); if (jvmId != null) { - jvmToRunningTask.remove(jvmId); + TaskRunner runner = jvmToRunningTask.remove(jvmId); killJvm(jvmId); } } @@ -393,8 +402,9 @@ class JvmManager { //Launch the task controller to run task JVM initalContext.task = jvmToRunningTask.get(jvmId).getTask(); initalContext.env = env; - tracker.getTaskController().initializeTask(initalContext); + LOG.info("Launching "+ initalContext.task.getTaskID()); tracker.getTaskController().launchTaskJVM(initalContext); + LOG.info("Finished "+ initalContext.task.getTaskID()); } catch (IOException ioe) { // do nothing // error and output are appropriately redirected @@ -403,9 +413,9 @@ class JvmManager { if (shexec == null) { return; } - + kill(); - + int exitCode = shexec.getExitCode(); updateOnJvmExit(jvmId, exitCode); LOG.info("JVM : " + jvmId +" exited.
Number of tasks it ran: " + @@ -438,6 +448,7 @@ class JvmManager { .getLong("mapred.tasktracker.tasks.sleeptime-before-sigkill", ProcessTree.DEFAULT_SLEEPTIME_BEFORE_SIGKILL); + // Destroy the task jvm controller.destroyTaskJVM(initalContext); } else { LOG.info(String.format("JVM Not killed %s but just removed", jvmId @@ -462,6 +473,35 @@ class JvmManager { return busy; } } + + /** + * Finalize the task directories. + * @param runner the runner of the task whose directories are finalized + * @throws IOException if the task controller fails to finalize them + */ + public void finalizeTaskDirs(TaskRunner runner) + throws IOException { + JVMId jvmId = runningTaskToJvm.get(runner); + if (jvmId == null) { + LOG.debug("JvmId is null, not doing anything for finalizeTaskDirs."); + return; + } + JvmRunner jvmRunner = jvmIdToRunner.get(jvmId); + if (jvmRunner == null) { + LOG + .debug("JvmRunner is null, not doing anything for finalizeTaskDirs."); + return; + } + TaskControllerContext taskContext = new TaskControllerContext(); + taskContext.env = jvmRunner.initalContext.env; + taskContext.task = runner.getTask(); + if (taskContext.env != null && taskContext.task != null) { + tracker.getTaskController().finalizeTaskDirs(taskContext); + } else { + LOG.debug("taskContext.env or taskContext.task is null," + + " not doing anything for finalizeTaskDirs."); + } + } } static class JvmEnv { //Helper class List vargs; @@ -485,4 +525,19 @@ class JvmManager { this.conf = conf; } } + + /** + * Finalize the task directories once the task finishes.
+ * + * @param runner + * @throws IOException + */ + void finalizeTaskDirs(TaskRunner runner) + throws IOException { + if (runner.getTask().isMapTask()) { + mapJvmManager.finalizeTaskDirs(runner); + } else { + reduceJvmManager.finalizeTaskDirs(runner); + } + } } diff --git src/java/org/apache/hadoop/mapred/LinuxTaskController.java src/java/org/apache/hadoop/mapred/LinuxTaskController.java index c21ccb1..ad1d7a4 100644 --- src/java/org/apache/hadoop/mapred/LinuxTaskController.java +++ src/java/org/apache/hadoop/mapred/LinuxTaskController.java @@ -28,9 +28,9 @@ import java.util.Map.Entry; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.fs.FileUtil; import org.apache.hadoop.mapred.JvmManager.JvmEnv; +import org.apache.hadoop.mapred.TaskController.TaskControllerContext; +import org.apache.hadoop.util.DiskChecker; import org.apache.hadoop.util.StringUtils; import org.apache.hadoop.util.Shell.ShellCommandExecutor; @@ -73,44 +73,20 @@ class LinuxTaskController extends TaskController { new File(hadoopBin, "task-controller").getAbsolutePath(); } - // The list of directory paths specified in the - // variable mapred.local.dir. This is used to determine - // which among the list of directories is picked up - // for storing data for a particular task. - private String[] mapredLocalDirs; - - // permissions to set on files and directories created. - // When localized files are handled securely, this string - // will change to something more restrictive. Until then, - // it opens up the permissions for all, so that the tasktracker - // and job owners can access files together. - private static final String FILE_PERMISSIONS = "ugo+rwx"; - - // permissions to set on components of the path leading to - // localized files and directories. Read and execute permissions - // are required for different users to be able to access the - // files. 
- private static final String PATH_PERMISSIONS = "go+rx"; - public LinuxTaskController() { super(); } - @Override - public void setConf(Configuration conf) { - super.setConf(conf); - mapredLocalDirs = conf.getStrings("mapred.local.dir"); - //Setting of the permissions of the local directory is done in - //setup() - } - /** * List of commands that the setuid script will execute. */ enum TaskCommands { LAUNCH_TASK_JVM, + INITIALIZE_TASK, TERMINATE_TASK_JVM, - KILL_TASK_JVM + KILL_TASK_JVM, + FINALIZE_TASK_DIRS, + FINALIZE_JOB, } /** @@ -155,17 +131,59 @@ class LinuxTaskController extends TaskController { try { shExec.execute(); } catch (Exception e) { - LOG.warn("Exception thrown while launching task JVM : " + - StringUtils.stringifyException(e)); + // TODO: check for 143 (SIGTERM) and 137 (SIGKILL) exit codes + LOG.warn("Exception thrown while launching task JVM : " + + StringUtils.stringifyException(e)); LOG.warn("Exit code from task is : " + shExec.getExitCode()); - LOG.warn("Output from task-contoller is : " + shExec.getOutput()); + LOG.info("Output from LinuxTaskController's launchTaskJVM follows:"); + logOutput(shExec.getOutput()); + throw new IOException(e); + } + if (LOG.isDebugEnabled()) { + LOG.info("Output from LinuxTaskController's launchTaskJVM follows:"); + logOutput(shExec.getOutput()); + } + } + + + @Override + void initializeTask(TaskControllerContext context) + throws IOException { + LOG.info("Going to initialize task for " + + context.task.getTaskID().toString()); + + // TODO: check archive related chmod error from logs of child in pipes + // application. In debug mode. 
+ + ShellCommandExecutor shExec = + buildTaskControllerExecutor(TaskCommands.INITIALIZE_TASK, + context.env.conf.getUser(), buildInitializeTaskArgs(context), + context.env); + try { + shExec.execute(); + } catch (Exception e) { + LOG.warn("Exit code from initializeTaskDirs is : " + + shExec.getExitCode()); + LOG.warn("Exception thrown by initializeTaskDirs : " + + StringUtils.stringifyException(e)); + LOG.info("Output from LinuxTaskController's initializeTaskDirs follows:"); + logOutput(shExec.getOutput()); throw new IOException(e); } - if(LOG.isDebugEnabled()) { - LOG.debug("output after executing task jvm = " + shExec.getOutput()); + if (LOG.isDebugEnabled()) { + LOG.info("Output from LinuxTaskController's initializeTaskDirs follows:"); + logOutput(shExec.getOutput()); } } + private void logOutput(String output) { + String shExecOutput = output; + if (shExecOutput != null) { + for (String str : shExecOutput.split("\n")) { + LOG.info(str); + } + } + } /** * Returns list of arguments to be passed while launching task VM. 
* See {@code buildTaskControllerExecutor(TaskCommands, @@ -177,8 +195,11 @@ class LinuxTaskController extends TaskController { List commandArgs = new ArrayList(3); String taskId = context.task.getTaskID().toString(); String jobId = getJobId(context); - LOG.debug("getting the task directory as: " + LOG.info("getting the task directory as: " + getTaskCacheDirectory(context)); + LOG.info("getting the tt_root as " +getDirectoryChosenForTask( + new File(getTaskCacheDirectory(context)), + context) ); commandArgs.add(getDirectoryChosenForTask( new File(getTaskCacheDirectory(context)), context)); @@ -188,9 +209,43 @@ class LinuxTaskController extends TaskController { }else { commandArgs.add(taskId + TaskTracker.TASK_CLEANUP_SUFFIX); } + File logDir = new File(TaskLog.getBaseLogDir()); + try { + commandArgs.add(logDir.getCanonicalPath()); + } catch (IOException e) { + LOG.warn("Couldn't canonicalize " + logDir + ", passing its absolute path instead", e); + commandArgs.add(logDir.getAbsolutePath()); // never omit the log_dir argument + } return commandArgs; } - + + /** + * Returns list of arguments to be passed while initializing a new task. + * See {@code buildTaskControllerExecutor(TaskCommands, + * String, List, JvmEnv)} documentation.
+ * @param context + * @return Arguments to be used while initializing the task + */ + private List buildInitializeTaskArgs(TaskControllerContext context) { + List commandArgs = new ArrayList(3); + String taskId = context.task.getTaskID().toString(); + String jobId = getJobId(context); + commandArgs.add(jobId); + if (!context.task.isTaskCleanupTask()) { + commandArgs.add(taskId); + } else { + commandArgs.add(taskId + TaskTracker.TASK_CLEANUP_SUFFIX); + } + File logDir = new File(TaskLog.getBaseLogDir()); + try { + commandArgs.add(logDir.getCanonicalPath()); + } catch (IOException e) { + LOG.warn("Couldn't canonicalize " + logDir + ", passing its absolute path instead", e); + commandArgs.add(logDir.getAbsolutePath()); // never omit the log_dir argument + } + return commandArgs; + } + // get the Job ID from the information in the TaskControllerContext private String getJobId(TaskControllerContext context) { String taskId = context.task.getTaskID().toString(); @@ -208,8 +263,9 @@ class LinuxTaskController extends TaskController { String taskId = context.task.getTaskID().toString(); for (String dir : mapredLocalDirs) { File mapredDir = new File(dir); - File taskDir = new File(mapredDir, TaskTracker.getLocalTaskDir( - jobId, taskId, context.task.isTaskCleanupTask())); + // TODO: fix + File taskDir = new File(mapredDir, TaskTracker.getTaskWorkDir( + jobId, taskId, context.task.isTaskCleanupTask())).getParentFile(); if (directory.equals(taskDir)) { return dir; } @@ -219,68 +275,7 @@ class LinuxTaskController extends TaskController { throw new IllegalArgumentException("invalid task cache directory " + directory.getAbsolutePath()); } - - /** - * Setup appropriate permissions for directories and files that - * are used by the task. - * - * As the LinuxTaskController launches tasks as a user, different - * from the daemon, all directories and files that are potentially - * used by the tasks are setup with appropriate permissions that - * will allow access.
- * - * Until secure data handling is implemented (see HADOOP-4491 and - * HADOOP-4493, for e.g.), the permissions are set up to allow - * read, write and execute access for everyone. This will be - * changed to restricted access as data is handled securely. - */ - void initializeTask(TaskControllerContext context) { - // Setup permissions for the job and task cache directories. - setupTaskCacheFileAccess(context); - // setup permissions for task log directory - setupTaskLogFileAccess(context); - } - - // Allows access for the task to create log files under - // the task log directory - private void setupTaskLogFileAccess(TaskControllerContext context) { - TaskAttemptID taskId = context.task.getTaskID(); - File f = TaskLog.getTaskLogFile(taskId, TaskLog.LogName.SYSLOG); - String taskAttemptLogDir = f.getParentFile().getAbsolutePath(); - changeDirectoryPermissions(taskAttemptLogDir, FILE_PERMISSIONS, false); - } - // Allows access for the task to read, write and execute - // the files under the job and task cache directories - private void setupTaskCacheFileAccess(TaskControllerContext context) { - String taskId = context.task.getTaskID().toString(); - JobID jobId = JobID.forName(getJobId(context)); - //Change permission for the task across all the disks - for(String localDir : mapredLocalDirs) { - File f = new File(localDir); - File taskCacheDir = new File(f,TaskTracker.getLocalTaskDir( - jobId.toString(), taskId, context.task.isTaskCleanupTask())); - if(taskCacheDir.exists()) { - changeDirectoryPermissions(taskCacheDir.getPath(), - FILE_PERMISSIONS, true); - } - }//end of local directory Iteration - } - - // convenience method to execute chmod. - private void changeDirectoryPermissions(String dir, String mode, - boolean isRecursive) { - int ret = 0; - try { - ret = FileUtil.chmod(dir, mode, isRecursive); - } catch (Exception e) { - LOG.warn("Exception in changing permissions for directory " + dir + - ". 
Exception: " + e.getMessage()); - } - if (ret != 0) { - LOG.warn("Could not change permissions for directory " + dir); - } - } /** * Builds the command line for launching/terminating/killing task JVM. * Following is the format for launching/terminating/killing task JVM @@ -333,10 +328,12 @@ class LinuxTaskController extends TaskController { // is different from what is set with respect with // env.workDir. Hence building this from the taskId everytime. String taskId = context.task.getTaskID().toString(); + // TODO: fix the following line or document it. File cacheDirForJob = context.env.workDir.getParentFile().getParentFile(); if(context.task.isTaskCleanupTask()) { taskId = taskId + TaskTracker.TASK_CLEANUP_SUFFIX; } + return new File(cacheDirForJob, taskId).getAbsolutePath(); } @@ -363,75 +360,11 @@ class LinuxTaskController extends TaskController { if (pw != null) { pw.close(); } - // set execute permissions for all on the file. File f = new File(commandFile); - if (f.exists()) { - f.setReadable(true, false); - f.setExecutable(true, false); - } - } - } - - - /** - * Sets up the permissions of the following directories: - * - * Job cache directory - * Archive directory - * Hadoop log directories - * - */ - @Override - void setup() { - //set up job cache directory and associated permissions - String localDirs[] = this.mapredLocalDirs; - for(String localDir : localDirs) { - //Cache root - File cacheDirectory = new File(localDir,TaskTracker.getCacheSubdir()); - File jobCacheDirectory = new File(localDir,TaskTracker.getJobCacheSubdir()); - if(!cacheDirectory.exists()) { - if(!cacheDirectory.mkdirs()) { - LOG.warn("Unable to create cache directory : " + - cacheDirectory.getPath()); - } + if (!f.exists()) { + throw new IOException("Taskjvm.sh doesn't exist!!!"); } - if(!jobCacheDirectory.exists()) { - if(!jobCacheDirectory.mkdirs()) { - LOG.warn("Unable to create job cache directory : " + - jobCacheDirectory.getPath()); - } - } - //Give world writable permission for 
every directory under - //mapred-local-dir. - //Child tries to write files under it when executing. - changeDirectoryPermissions(localDir, FILE_PERMISSIONS, true); - }//end of local directory manipulations - //setting up perms for user logs - File taskLog = TaskLog.getUserLogDir(); - changeDirectoryPermissions(taskLog.getPath(), FILE_PERMISSIONS,false); - } - - /* - * Create Job directories across disks and set their permissions to 777 - * This way when tasks are run we just need to setup permissions for - * task folder. - */ - @Override - void initializeJob(JobID jobid) { - for(String localDir : this.mapredLocalDirs) { - File jobDirectory = new File(localDir, - TaskTracker.getLocalJobDir(jobid.toString())); - if(!jobDirectory.exists()) { - if(!jobDirectory.mkdir()) { - LOG.warn("Unable to create job cache directory : " - + jobDirectory.getPath()); - continue; - } - } - //Should be recursive because the jar and work folders might be - //present under the job cache directory - changeDirectoryPermissions( - jobDirectory.getPath(), FILE_PERMISSIONS, true); + DiskChecker.setPermissions(f, DiskChecker.sevenZeroZero); } } @@ -443,15 +376,63 @@ class LinuxTaskController extends TaskController { * * * @param context context of task which has to be passed kill signal. + * @throws IOException * */ private List buildKillTaskCommandArgs(TaskControllerContext - context){ + context) throws IOException{ List killTaskJVMArgs = new ArrayList(); killTaskJVMArgs.add(context.pid); return killTaskJVMArgs; } - + + /** + * Build the command line to be passed to LinuxTaskController binary to + * finalize task directories once a task finishes. 
+ * + * @param context + * @return + */ + private List buildFinalizeTaskDirsCommandArgs( + TaskControllerContext context) { + List commandArgs = new ArrayList(3); + String taskId = context.task.getTaskID().toString(); + String jobId = getJobId(context); + commandArgs.add(jobId); + if (!context.task.isTaskCleanupTask()) { + commandArgs.add(taskId); + } else { + commandArgs.add(taskId + TaskTracker.TASK_CLEANUP_SUFFIX); + } + File logDir = new File(TaskLog.getBaseLogDir()); + try { + commandArgs.add(logDir.getCanonicalPath()); + } catch (IOException e) { + // TODO Auto-generated catch block + e.printStackTrace(); + } + if (context.env.conf.get("mapred.task.id").equals( + context.task.getTaskID().toString())) { + commandArgs.add(String.valueOf(0)); // true. First task + } else { + commandArgs.add(String.valueOf(1)); // false. Not the first task. + } + return commandArgs; + } + + /** + * Build the command line to be passed to LinuxTaskController binary to + * actions for finalizing job. + * + * @param context + * @return + */ + private List buildFinalizeJobCommandArgs(JobID jobId) { + List finalizeJobArgs = new ArrayList(); + finalizeJobArgs.add(jobId.toString()); + return finalizeJobArgs; + } + /** * Convenience method used to sending appropriate Kill signal to the task * VM @@ -465,15 +446,33 @@ class LinuxTaskController extends TaskController { LOG.info("Context task null not killing the JVM"); return; } - ShellCommandExecutor shExec = buildTaskControllerExecutor( - command, context.env.conf.getUser(), - buildKillTaskCommandArgs(context), context.env); + if (context.pid == null) { + LOG.warn("pid is null. 
Cannot kill task"); + return; + } + + List cmdArgs = null; + if (command.equals(TaskCommands.TERMINATE_TASK_JVM)) { + cmdArgs = buildFinalizeTaskDirsCommandArgs(context); + cmdArgs.addAll(buildKillTaskCommandArgs(context)); + } else if (command.equals(TaskCommands.KILL_TASK_JVM)) { + cmdArgs = buildKillTaskCommandArgs(context); + } + ShellCommandExecutor shExec = + buildTaskControllerExecutor(command, context.env.conf.getUser(), + cmdArgs, context.env); try { shExec.execute(); } catch (Exception e) { - LOG.warn("Output from task-contoller is : " + shExec.getOutput()); + LOG.warn("Exit code from finishTask is : " + shExec.getExitCode()); + LOG.info("Output from LinuxTaskController's finishTask follows:"); + logOutput(shExec.getOutput()); throw new IOException(e); } + if (LOG.isDebugEnabled()) { + LOG.info("Output from LinuxTaskController's finishTask follows:"); + logOutput(shExec.getOutput()); + } } @Override @@ -498,6 +497,56 @@ class LinuxTaskController extends TaskController { protected String getTaskControllerExecutablePath() { return taskControllerExe; - } + } + + @Override + void finalizeTaskDirs(TaskControllerContext context) + throws IOException { + LOG.info("Going to finalize task directories for " + + context.task.getTaskID().toString()); + ShellCommandExecutor shExec = + buildTaskControllerExecutor(TaskCommands.FINALIZE_TASK_DIRS, + context.env.conf.getUser(), + buildFinalizeTaskDirsCommandArgs(context), context.env); + try { + shExec.execute(); + } catch (Exception e) { + LOG.warn("Exit code from finalizeTaskDirs is : " + shExec.getExitCode()); + LOG.warn("Exception thrown by finalizeTaskDirs : " + + StringUtils.stringifyException(e)); + LOG.info("Output from LinuxTaskController's finalizeTaskDirs follows:"); + logOutput(shExec.getOutput()); + throw new IOException(e); + } + if (LOG.isDebugEnabled()) { + LOG.info("Output from LinuxTaskController's finalizeTaskDirs follows:"); + logOutput(shExec.getOutput()); + } + } + + @Override + void 
finalizeJob(JobID jobId, String workDir, String user) + throws IOException { + LOG.info("Going to finalize job for " + jobId); + JvmEnv env = + new JvmEnv(null, null, null, null, -1, new File(workDir), null, null); + ShellCommandExecutor shExec = + buildTaskControllerExecutor(TaskCommands.FINALIZE_JOB, user, + buildFinalizeJobCommandArgs(jobId), env); + try { + shExec.execute(); + } catch (Exception e) { + LOG.warn("Exit code from finalizeJob is : " + shExec.getExitCode()); + LOG.warn("Exception thrown by finalizeJob : " + + StringUtils.stringifyException(e)); + LOG.info("Output from LinuxTaskController's finalizeJob follows:"); + logOutput(shExec.getOutput()); + throw new IOException(e); + } + if (LOG.isDebugEnabled()) { + LOG.info("Output from LinuxTaskController's finalizeJob follows:"); + logOutput(shExec.getOutput()); + } + } } diff --git src/java/org/apache/hadoop/mapred/LocalJobRunner.java src/java/org/apache/hadoop/mapred/LocalJobRunner.java index 407f5e6..a3a24c8 100644 --- src/java/org/apache/hadoop/mapred/LocalJobRunner.java +++ src/java/org/apache/hadoop/mapred/LocalJobRunner.java @@ -191,10 +191,10 @@ class LocalJobRunner implements JobSubmissionProtocol { for (int i = 0; i < mapIds.size(); i++) { if (!this.isInterrupted()) { TaskAttemptID mapId = mapIds.get(i); - Path mapOut = this.mapoutputFile.getOutputFile(mapId); - Path reduceIn = this.mapoutputFile.getInputFileForWrite( - mapId.getTaskID(),reduceId, - localFs.getFileStatus(mapOut).getLen()); + Path mapOut = this.mapoutputFile.getOutputFile(); + Path reduceIn = + this.mapoutputFile.getInputFileForWrite(mapId.getTaskID(), + localFs.getFileStatus(mapOut).getLen()); if (!localFs.mkdirs(reduceIn.getParent())) { throw new IOException("Mkdirs failed to create " + reduceIn.getParent().toString()); @@ -224,10 +224,10 @@ class LocalJobRunner implements JobSubmissionProtocol { } } finally { for (TaskAttemptID mapId: mapIds) { - this.mapoutputFile.removeAll(mapId); + this.mapoutputFile.removeAll(); } if 
(numReduceTasks == 1) { - this.mapoutputFile.removeAll(reduceId); + this.mapoutputFile.removeAll(); } } // delete the temporary directory in output directory diff --git src/java/org/apache/hadoop/mapred/MapOutputFile.java src/java/org/apache/hadoop/mapred/MapOutputFile.java index 30b71c9..0af1151 100644 --- src/java/org/apache/hadoop/mapred/MapOutputFile.java +++ src/java/org/apache/hadoop/mapred/MapOutputFile.java @@ -31,7 +31,9 @@ class MapOutputFile { private JobConf conf; private JobID jobId; - + + static final String INPUT_FILE_FORMAT_STRING = "%s/map_%d.out"; + MapOutputFile() { } @@ -42,132 +44,151 @@ class MapOutputFile { private LocalDirAllocator lDirAlloc = new LocalDirAllocator("mapred.local.dir"); - /** Return the path to local map output file created earlier - * @param mapTaskId a map task id + /** + * Return the path to local map output file created earlier + * + * @return path + * @throws IOException */ - public Path getOutputFile(TaskAttemptID mapTaskId) - throws IOException { - return lDirAlloc.getLocalPathToRead(TaskTracker.getIntermediateOutputDir( - jobId.toString(), mapTaskId.toString()) - + "/file.out", conf); + public Path getOutputFile() + throws IOException { + return lDirAlloc.getLocalPathToRead(TaskTracker + .getBaseIntermediateOutputDir() + + "/file.out", conf); } - /** Create a local map output file name. - * @param mapTaskId a map task id + /** + * Create a local map output file name. 
+ * * @param size the size of the file + * @return path + * @throws IOException */ - public Path getOutputFileForWrite(TaskAttemptID mapTaskId, long size) - throws IOException { - return lDirAlloc.getLocalPathForWrite(TaskTracker.getIntermediateOutputDir( - jobId.toString(), mapTaskId.toString()) - + "/file.out", size, conf); + public Path getOutputFileForWrite(long size) + throws IOException { + return lDirAlloc.getLocalPathForWrite(TaskTracker + .getBaseIntermediateOutputDir() + + "/file.out", size, conf); } - /** Return the path to a local map output index file created earlier - * @param mapTaskId a map task id + /** + * Return the path to a local map output index file created earlier + * + * @return path + * @throws IOException */ - public Path getOutputIndexFile(TaskAttemptID mapTaskId) - throws IOException { - return lDirAlloc.getLocalPathToRead(TaskTracker.getIntermediateOutputDir( - jobId.toString(), mapTaskId.toString()) - + "/file.out.index", conf); + public Path getOutputIndexFile() + throws IOException { + return lDirAlloc.getLocalPathToRead(TaskTracker + .getBaseIntermediateOutputDir() + + "/file.out.index", conf); } - /** Create a local map output index file name. - * @param mapTaskId a map task id + /** + * Create a local map output index file name. + * * @param size the size of the file + * @return path + * @throws IOException */ - public Path getOutputIndexFileForWrite(TaskAttemptID mapTaskId, long size) - throws IOException { - return lDirAlloc.getLocalPathForWrite(TaskTracker.getIntermediateOutputDir( - jobId.toString(), mapTaskId.toString()) - + "/file.out.index", - size, conf); + public Path getOutputIndexFileForWrite(long size) + throws IOException { + return lDirAlloc.getLocalPathForWrite(TaskTracker + .getBaseIntermediateOutputDir() + + "/file.out.index", size, conf); } - /** Return a local map spill file created earlier. - * @param mapTaskId a map task id + /** + * Return a local map spill file created earlier. 
+ * * @param spillNumber the number + * @return path + * @throws IOException */ - public Path getSpillFile(TaskAttemptID mapTaskId, int spillNumber) - throws IOException { - return lDirAlloc.getLocalPathToRead(TaskTracker.getIntermediateOutputDir( - jobId.toString(), mapTaskId.toString()) - + "/spill" - + spillNumber + ".out", conf); + public Path getSpillFile(int spillNumber) + throws IOException { + return lDirAlloc.getLocalPathToRead(TaskTracker + .getBaseIntermediateOutputDir() + + "/spill" + spillNumber + ".out", conf); } - /** Create a local map spill file name. - * @param mapTaskId a map task id + /** + * Create a local map spill file name. + * * @param spillNumber the number * @param size the size of the file + * @return path + * @throws IOException */ - public Path getSpillFileForWrite(TaskAttemptID mapTaskId, int spillNumber, - long size) throws IOException { - return lDirAlloc.getLocalPathForWrite(TaskTracker.getIntermediateOutputDir( - jobId.toString(), mapTaskId.toString()) - + "/spill" + - spillNumber + ".out", size, conf); + public Path getSpillFileForWrite(int spillNumber, long size) + throws IOException { + return lDirAlloc.getLocalPathForWrite(TaskTracker + .getBaseIntermediateOutputDir() + + "/spill" + spillNumber + ".out", size, conf); } - /** Return a local map spill index file created earlier - * @param mapTaskId a map task id + /** + * Return a local map spill index file created earlier + * * @param spillNumber the number + * @return path + * @throws IOException */ - public Path getSpillIndexFile(TaskAttemptID mapTaskId, int spillNumber) - throws IOException { - return lDirAlloc.getLocalPathToRead(TaskTracker.getIntermediateOutputDir( - jobId.toString(), mapTaskId.toString()) - + "/spill" + - spillNumber + ".out.index", conf); + public Path getSpillIndexFile(int spillNumber) + throws IOException { + return lDirAlloc.getLocalPathToRead(TaskTracker + .getBaseIntermediateOutputDir() + + "/spill" + spillNumber + ".out.index", conf); } - /** 
Create a local map spill index file name. - * @param mapTaskId a map task id + /** + * Create a local map spill index file name. + * * @param spillNumber the number * @param size the size of the file + * @return path + * @throws IOException */ - public Path getSpillIndexFileForWrite(TaskAttemptID mapTaskId, int spillNumber, - long size) throws IOException { - return lDirAlloc.getLocalPathForWrite(TaskTracker.getIntermediateOutputDir( - jobId.toString(), mapTaskId.toString()) - + "/spill" + spillNumber + - ".out.index", size, conf); + public Path getSpillIndexFileForWrite(int spillNumber, long size) + throws IOException { + return lDirAlloc.getLocalPathForWrite(TaskTracker + .getBaseIntermediateOutputDir() + + "/spill" + spillNumber + ".out.index", size, conf); } - /** Return a local reduce input file created earlier - * @param mapTaskId a map task id - * @param reduceTaskId a reduce task id + /** + * Return a local reduce input file created earlier + * + * @param mapId a map task id + * @return path + * @throws IOException */ - public Path getInputFile(int mapId, TaskAttemptID reduceTaskId) - throws IOException { - // TODO *oom* should use a format here - return lDirAlloc.getLocalPathToRead(TaskTracker.getIntermediateOutputDir( - jobId.toString(), reduceTaskId.toString()) - + "/map_" + mapId + ".out", - conf); + public Path getInputFile(int mapId) + throws IOException { + String outputDir = TaskTracker.getBaseIntermediateOutputDir(); + return lDirAlloc.getLocalPathToRead(String.format( + INPUT_FILE_FORMAT_STRING, outputDir, Integer.valueOf(mapId)), conf); } - /** Create a local reduce input file name. - * @param mapTaskId a map task id - * @param reduceTaskId a reduce task id + /** + * Create a local reduce input file name. 
+ * + * @param mapId a map task id * @param size the size of the file + * @return path + * @throws IOException */ - public Path getInputFileForWrite(TaskID mapId, TaskAttemptID reduceTaskId, - long size) - throws IOException { - // TODO *oom* should use a format here - return lDirAlloc.getLocalPathForWrite(TaskTracker.getIntermediateOutputDir( - jobId.toString(), reduceTaskId.toString()) - + "/map_" + mapId.getId() + ".out", - size, conf); + public Path getInputFileForWrite(TaskID mapId, long size) + throws IOException { + String outputDir = TaskTracker.getBaseIntermediateOutputDir(); + return lDirAlloc.getLocalPathForWrite(String.format( + INPUT_FILE_FORMAT_STRING, outputDir, mapId.getId()), size, conf); } /** Removes all of the files related to a task. */ - public void removeAll(TaskAttemptID taskId) throws IOException { - conf.deleteLocalFiles(TaskTracker.getIntermediateOutputDir( - jobId.toString(), taskId.toString()) -); + public void removeAll() + throws IOException { + conf.deleteLocalFiles(TaskTracker.getBaseIntermediateOutputDir()); } public void setConf(Configuration conf) { diff --git src/java/org/apache/hadoop/mapred/MapTask.java src/java/org/apache/hadoop/mapred/MapTask.java index 5f8b88b..96dd469 100644 --- src/java/org/apache/hadoop/mapred/MapTask.java +++ src/java/org/apache/hadoop/mapred/MapTask.java @@ -1164,8 +1164,8 @@ class MapTask extends Task { try { // create spill file final SpillRecord spillRec = new SpillRecord(partitions); - final Path filename = mapOutputFile.getSpillFileForWrite(getTaskID(), - numSpills, size); + final Path filename = + mapOutputFile.getSpillFileForWrite(numSpills, size); out = rfs.create(filename); final int endPosition = (kvend > kvstart) @@ -1229,9 +1229,9 @@ class MapTask extends Task { if (totalIndexCacheMemory >= INDEX_CACHE_MEMORY_LIMIT) { // create spill index file - Path indexFilename = mapOutputFile.getSpillIndexFileForWrite( - getTaskID(), numSpills, - partitions * MAP_OUTPUT_INDEX_RECORD_LENGTH); + Path 
indexFilename = + mapOutputFile.getSpillIndexFileForWrite(numSpills, partitions + * MAP_OUTPUT_INDEX_RECORD_LENGTH); spillRec.writeToFile(indexFilename, job); } else { indexCacheList.add(spillRec); @@ -1258,8 +1258,8 @@ class MapTask extends Task { try { // create spill file final SpillRecord spillRec = new SpillRecord(partitions); - final Path filename = mapOutputFile.getSpillFileForWrite(getTaskID(), - numSpills, size); + final Path filename = + mapOutputFile.getSpillFileForWrite(numSpills, size); out = rfs.create(filename); // we don't run the combiner for a single record @@ -1295,9 +1295,9 @@ class MapTask extends Task { } if (totalIndexCacheMemory >= INDEX_CACHE_MEMORY_LIMIT) { // create spill index file - Path indexFilename = mapOutputFile.getSpillIndexFileForWrite( - getTaskID(), numSpills, - partitions * MAP_OUTPUT_INDEX_RECORD_LENGTH); + Path indexFilename = + mapOutputFile.getSpillIndexFileForWrite(numSpills, partitions + * MAP_OUTPUT_INDEX_RECORD_LENGTH); spillRec.writeToFile(indexFilename, job); } else { indexCacheList.add(spillRec); @@ -1387,14 +1387,14 @@ class MapTask extends Task { final TaskAttemptID mapId = getTaskID(); for(int i = 0; i < numSpills; i++) { - filename[i] = mapOutputFile.getSpillFile(mapId, i); + filename[i] = mapOutputFile.getSpillFile(i); finalOutFileSize += rfs.getFileStatus(filename[i]).getLen(); } if (numSpills == 1) { //the spill is the final output rfs.rename(filename[0], new Path(filename[0].getParent(), "file.out")); if (indexCacheList.size() == 0) { - rfs.rename(mapOutputFile.getSpillIndexFile(mapId, 0), + rfs.rename(mapOutputFile.getSpillIndexFile(0), new Path(filename[0].getParent(),"file.out.index")); } else { indexCacheList.get(0).writeToFile( @@ -1405,7 +1405,7 @@ class MapTask extends Task { // read in paged indices for (int i = indexCacheList.size(); i < numSpills; ++i) { - Path indexFileName = mapOutputFile.getSpillIndexFile(mapId, i); + Path indexFileName = mapOutputFile.getSpillIndexFile(i); 
indexCacheList.add(new SpillRecord(indexFileName, job)); } @@ -1413,10 +1413,10 @@ class MapTask extends Task { //lengths for each partition finalOutFileSize += partitions * APPROX_HEADER_LENGTH; finalIndexFileSize = partitions * MAP_OUTPUT_INDEX_RECORD_LENGTH; - Path finalOutputFile = mapOutputFile.getOutputFileForWrite(mapId, - finalOutFileSize); - Path finalIndexFile = mapOutputFile.getOutputIndexFileForWrite( - mapId, finalIndexFileSize); + Path finalOutputFile = + mapOutputFile.getOutputFileForWrite(finalOutFileSize); + Path finalIndexFile = + mapOutputFile.getOutputIndexFileForWrite(finalIndexFileSize); //The output stream for the final single output file FSDataOutputStream finalOut = rfs.create(finalOutputFile, true, 4096); diff --git src/java/org/apache/hadoop/mapred/MapTaskRunner.java src/java/org/apache/hadoop/mapred/MapTaskRunner.java index 918b2a6..94750d0 100644 --- src/java/org/apache/hadoop/mapred/MapTaskRunner.java +++ src/java/org/apache/hadoop/mapred/MapTaskRunner.java @@ -34,13 +34,13 @@ class MapTaskRunner extends TaskRunner { return false; } - mapOutputFile.removeAll(getTask().getTaskID()); + mapOutputFile.removeAll(); return true; } /** Delete all of the temporary map output files. */ public void close() throws IOException { LOG.info(getTask()+" done; removing files."); - mapOutputFile.removeAll(getTask().getTaskID()); + mapOutputFile.removeAll(); } } diff --git src/java/org/apache/hadoop/mapred/ReduceTask.java src/java/org/apache/hadoop/mapred/ReduceTask.java index 64780db..591b5b1 100644 --- src/java/org/apache/hadoop/mapred/ReduceTask.java +++ src/java/org/apache/hadoop/mapred/ReduceTask.java @@ -208,7 +208,7 @@ class ReduceTask extends Task { if (isLocal) { // for local jobs for(int i = 0; i < numMaps; ++i) { - fileList.add(mapOutputFile.getInputFile(i, getTaskID())); + fileList.add(mapOutputFile.getInputFile(i)); } } else { // for non local jobs @@ -1276,12 +1276,9 @@ class ReduceTask extends Task { // a temp filename. 
If this file gets created in ramfs, we're fine, // else, we will check the localFS to find a suitable final location // for this path - TaskAttemptID reduceId = reduceTask.getTaskID(); - Path filename = new Path("/" + TaskTracker.getIntermediateOutputDir( - reduceId.getJobID().toString(), - reduceId.toString()) - + "/map_" + - loc.getTaskId().getId() + ".out"); + Path filename = + new Path("/" + TaskTracker.getBaseIntermediateOutputDir() + + "/map_" + loc.getTaskId().getId() + ".out"); // Copy the map output to a temp file whose name is unique to this attempt Path tmpMapOutput = new Path(filename+"-"+id); @@ -1332,6 +1329,8 @@ class ReduceTask extends Task { throw new IOException("Failed to rename map output " + tmpMapOutput + " to " + filename); } + // Secure permissions from the tmpMapOutput still remain. No need + // for doing anything more. synchronized (mapOutputFilesOnDisk) { addToMapOutputFilesOnDisk(localFileSys.getFileStatus(filename)); @@ -2282,8 +2281,8 @@ class ReduceTask extends Task { sortPhaseFinished = true; // must spill to disk, but can't retain in-mem for intermediate merge - final Path outputPath = mapOutputFile.getInputFileForWrite(mapId, - reduceTask.getTaskID(), inMemToDiskBytes); + final Path outputPath = + mapOutputFile.getInputFileForWrite(mapId, inMemToDiskBytes); final RawKeyValueIterator rIter = Merger.merge(job, fs, keyClass, valueClass, memDiskSegments, numMemDiskSegments, tmpDir, comparator, reporter, spilledRecordsCounter, null, @@ -2577,8 +2576,8 @@ class ReduceTask extends Task { long mergeOutputSize = createInMemorySegments(inMemorySegments, 0); int noInMemorySegments = inMemorySegments.size(); - Path outputPath = mapOutputFile.getInputFileForWrite(mapId, - reduceTask.getTaskID(), mergeOutputSize); + Path outputPath = + mapOutputFile.getInputFileForWrite(mapId, mergeOutputSize); Writer writer = new Writer(conf, rfs, outputPath, diff --git src/java/org/apache/hadoop/mapred/ReduceTaskRunner.java 
src/java/org/apache/hadoop/mapred/ReduceTaskRunner.java index 903354e..fc6d1ae 100644 --- src/java/org/apache/hadoop/mapred/ReduceTaskRunner.java +++ src/java/org/apache/hadoop/mapred/ReduceTaskRunner.java @@ -37,7 +37,7 @@ class ReduceTaskRunner extends TaskRunner { } // cleanup from failures - mapOutputFile.removeAll(getTask().getTaskID()); + mapOutputFile.removeAll(); return true; } @@ -46,6 +46,6 @@ class ReduceTaskRunner extends TaskRunner { public void close() throws IOException { LOG.info(getTask()+" done; removing files."); getTask().getProgress().setStatus("closed"); - mapOutputFile.removeAll(getTask().getTaskID()); + mapOutputFile.removeAll(); } } diff --git src/java/org/apache/hadoop/mapred/SpillRecord.java src/java/org/apache/hadoop/mapred/SpillRecord.java index 7595898..b9c9b12 100644 --- src/java/org/apache/hadoop/mapred/SpillRecord.java +++ src/java/org/apache/hadoop/mapred/SpillRecord.java @@ -17,6 +17,7 @@ */ package org.apache.hadoop.mapred; +import java.io.File; import java.io.IOException; import java.nio.ByteBuffer; import java.nio.LongBuffer; @@ -31,6 +32,7 @@ import org.apache.hadoop.fs.FSDataOutputStream; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.IOUtils; +import org.apache.hadoop.util.DiskChecker; import static org.apache.hadoop.mapred.MapTask.MAP_OUTPUT_INDEX_RECORD_LENGTH; diff --git src/java/org/apache/hadoop/mapred/TaskController.java src/java/org/apache/hadoop/mapred/TaskController.java index 030bf6b..1b044e2 100644 --- src/java/org/apache/hadoop/mapred/TaskController.java +++ src/java/org/apache/hadoop/mapred/TaskController.java @@ -17,6 +17,7 @@ */ package org.apache.hadoop.mapred; +import java.io.File; import java.io.IOException; import org.apache.commons.logging.Log; @@ -24,7 +25,9 @@ import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configurable; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.mapred.JvmManager.JvmEnv; 
+import org.apache.hadoop.util.DiskChecker; import org.apache.hadoop.util.StringUtils; +import org.apache.hadoop.util.DiskChecker.PermissionsInfo; import org.apache.hadoop.util.Shell.ShellCommandExecutor; /** @@ -45,17 +48,55 @@ abstract class TaskController implements Configurable { public Configuration getConf() { return conf; } - + + // The list of directory paths specified in the + // variable mapred.local.dir. This is used to determine + // which among the list of directories is picked up + // for storing data for a particular task. + protected String[] mapredLocalDirs; + public void setConf(Configuration conf) { this.conf = conf; + mapredLocalDirs = conf.getStrings("mapred.local.dir"); } - - /** - * Setup task controller component. + + private static final String FILE_PERMISSIONS = "ugo+rwx"; + +/** + * Sets up the permissions of the following directories: + * + * Job cache directory Archive directory Hadoop log directories * */ - abstract void setup(); - + void setup() { + // set up job cache directory and associated permissions + String localDirs[] = this.mapredLocalDirs; + for (String localDir : localDirs) { + // Cache root + File cacheDirectory = new File(localDir, TaskTracker.getCacheSubdir()); + File jobCacheDirectory = + new File(localDir, TaskTracker.getJobCacheSubdir()); + if (!cacheDirectory.exists()) { + if (!cacheDirectory.mkdirs()) { + LOG.warn("Unable to create cache directory : " + + cacheDirectory.getPath()); + } + DiskChecker + .setPermissions(cacheDirectory, DiskChecker.sevenSevenSeven); + } + if (!jobCacheDirectory.exists()) { + if (!jobCacheDirectory.mkdirs()) { + LOG.warn("Unable to create job cache directory : " + + jobCacheDirectory.getPath()); + DiskChecker.setPermissions(jobCacheDirectory, + DiskChecker.sevenFiveFive); + } + } + } + // setting up permissions for user logs + File taskLog = TaskLog.getUserLogDir(); + DiskChecker.setPermissions(taskLog, DiskChecker.sevenFiveFive); + } /** * Launch a task JVM @@ -65,7 +106,7 @@ abstract 
class TaskController implements Configurable { */ abstract void launchTaskJVM(TaskControllerContext context) throws IOException; - + /** * Top level cleanup a task JVM method. * @@ -90,23 +131,21 @@ abstract class TaskController implements Configurable { } killTask(context); } - - /** - * Perform initializing actions required before a task can run. - * - * For instance, this method can be used to setup appropriate - * access permissions for files and directories that will be - * used by tasks. Tasks use the job cache, log, PID and distributed cache - * directories and files as part of their functioning. Typically, - * these files are shared between the daemon and the tasks - * themselves. So, a TaskController that is launching tasks - * as different users can implement this method to setup - * appropriate ownership and permissions for these directories - * and files. - */ - abstract void initializeTask(TaskControllerContext context); - - + + /** Perform initializing actions required before a task can run. + * + * For instance, this method can be used to setup appropriate + * access permissions for files and directories that will be + * used by tasks. Tasks use the job cache, log, PID and distributed cache + * directories and files as part of their functioning. Typically, + * these files are shared between the daemon and the tasks + * themselves. So, a TaskController that is launching tasks + * as different users can implement this method to setup + * appropriate ownership and permissions for these directories + * and files. + */ + abstract void initializeTask(TaskControllerContext context) throws IOException; + /** * Contains task information required for the task controller. 
*/ @@ -122,14 +161,6 @@ abstract class TaskController implements Configurable { // waiting time before sending SIGKILL to task JVM after sending SIGTERM long sleeptimeBeforeSigkill; } - - /** - * Method which is called after the job is localized so that task controllers - * can implement their own job localization logic. - * - * @param tip Task of job for which localization happens. - */ - abstract void initializeJob(JobID jobId); /** * Sends a graceful terminate signal to taskJVM and it sub-processes. @@ -144,6 +175,24 @@ abstract class TaskController implements Configurable { * * @param context task context */ - abstract void killTask(TaskControllerContext context); + + /** + * Take task-controller specific actions to finalize task directories once the + * task finishes + * + * @param context + * @throws IOException + */ + abstract void finalizeTaskDirs(TaskControllerContext context) + throws IOException; + + /** + * Take task-controller specific actions to finalize job on its finishing + * + * @param context + * @throws IOException + */ + abstract void finalizeJob(JobID jobId, String workDir, String user) + throws IOException; } diff --git src/java/org/apache/hadoop/mapred/TaskLog.java src/java/org/apache/hadoop/mapred/TaskLog.java index 9baafd8..b2aca7c 100644 --- src/java/org/apache/hadoop/mapred/TaskLog.java +++ src/java/org/apache/hadoop/mapred/TaskLog.java @@ -39,6 +39,7 @@ import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.LocalFileSystem; import org.apache.hadoop.fs.FileUtil; import org.apache.hadoop.fs.Path; +import org.apache.hadoop.fs.permission.FsPermission; import org.apache.hadoop.util.ProcessTree; import org.apache.hadoop.util.Shell; import org.apache.log4j.Appender; @@ -55,8 +56,7 @@ public class TaskLog { LogFactory.getLog(TaskLog.class); private static final File LOG_DIR = - new File(System.getProperty("hadoop.log.dir"), - "userlogs").getAbsoluteFile(); + new File(getBaseLogDir(), "userlogs").getAbsoluteFile(); static 
LocalFileSystem localFS = null; static { @@ -156,8 +156,12 @@ public class TaskLog { return new File(getBaseDir(taskid), "log.index"); } } - - private static File getBaseDir(String taskid) { + + static String getBaseLogDir() { + return System.getProperty("hadoop.log.dir"); + } + + static File getBaseDir(String taskid) { return new File(LOG_DIR, taskid); } private static long prevOutLength; @@ -196,6 +200,10 @@ public class TaskLog { Path indexFilePath = new Path(indexFile.getAbsolutePath()); Path tmpIndexFilePath = new Path(tmpIndexFile.getAbsolutePath()); localFS.rename (tmpIndexFilePath, indexFilePath); + + // TODO: Not needed after MAPREDUCEE-AAAA + // Retain the original permissions for the newly created file + localFS.setPermission(indexFilePath, new FsPermission((short)00644)); } private static void resetPrevLengths(TaskAttemptID firstTaskid) { prevOutLength = getTaskLogFile(firstTaskid, LogName.STDOUT).length(); diff --git src/java/org/apache/hadoop/mapred/TaskRunner.java src/java/org/apache/hadoop/mapred/TaskRunner.java index 86c5868..8e3e320 100644 --- src/java/org/apache/hadoop/mapred/TaskRunner.java +++ src/java/org/apache/hadoop/mapred/TaskRunner.java @@ -39,6 +39,7 @@ import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.FileUtil; import org.apache.hadoop.fs.LocalDirAllocator; import org.apache.hadoop.fs.Path; +import org.apache.hadoop.util.DiskChecker; import org.apache.hadoop.util.Shell; import org.apache.hadoop.util.StringUtils; @@ -135,12 +136,6 @@ abstract class TaskRunner extends Thread { // start with same classpath as parent process appendSystemClasspaths(classPaths); - if (!workDir.mkdirs()) { - if (!workDir.isDirectory()) { - LOG.fatal("Mkdirs failed to create " + workDir.toString()); - } - } - // include the user specified classpath appendJobJarClasspaths(conf.getJar(), classPaths); @@ -227,6 +222,9 @@ abstract class TaskRunner extends Thread { if (!localFs.mkdirs(tmpDir) && !localFs.getFileStatus(tmpDir).isDir()) { throw new 
IOException("Mkdirs failed to create " + tmpDir.toString()); } + // attempt's work dir is already private, no need for setting private + // permissions for the tmp directory inside. + vargs.add("-Djava.io.tmpdir=" + tmpDir.toString()); // Add classpath. @@ -274,9 +272,13 @@ abstract class TaskRunner extends Thread { // Set up the redirection of the task's stdout and stderr streams File stdout = TaskLog.getTaskLogFile(taskid, TaskLog.LogName.STDOUT); File stderr = TaskLog.getTaskLogFile(taskid, TaskLog.LogName.STDERR); - boolean b = stdout.getParentFile().mkdirs(); + File logDir = stdout.getParentFile(); + boolean b = logDir.mkdirs(); if (!b) { LOG.warn("mkdirs failed. Ignoring"); + } else { + // TODO: This is not needed once MAPREDUCE-AAAA + DiskChecker.setPermissions(logDir, DiskChecker.sevenSevenSeven); } tracker.getTaskTrackerInstrumentation().reportTaskLaunch(taskid, stdout, stderr); @@ -389,10 +391,10 @@ abstract class TaskRunner extends Thread { static File formWorkDir(LocalDirAllocator lDirAlloc, TaskAttemptID task, boolean isCleanup, JobConf conf) throws IOException { - File workDir = new File(lDirAlloc.getLocalPathToRead( - TaskTracker.getLocalTaskDir(task.getJobID().toString(), - task.toString(), isCleanup) - + Path.SEPARATOR + MRConstants.WORKDIR, conf).toString()); + File workDir = + new File(lDirAlloc.getLocalPathToRead( + TaskTracker.getTaskWorkDir(task.getJobID().toString(), task + .toString(), isCleanup), conf).toString()); return workDir; } @@ -460,7 +462,11 @@ abstract class TaskRunner extends Thread { localFs.delete(localTaskFile, true); OutputStream out = localFs.create(localTaskFile); try { + String originalMapredLocalDir = conf.get("mapred.local.dir"); + conf.set("mapred.local.dir", conf.get("mapred.child.local.dir")); conf.writeXml(out); + // Reset back the original mapred.local.dir + conf.set("mapred.local.dir", originalMapredLocalDir); } finally { out.close(); } diff --git src/java/org/apache/hadoop/mapred/TaskTracker.java 
src/java/org/apache/hadoop/mapred/TaskTracker.java index c922ba3..f07b3b9 100644 --- src/java/org/apache/hadoop/mapred/TaskTracker.java +++ src/java/org/apache/hadoop/mapred/TaskTracker.java @@ -189,6 +189,9 @@ public class TaskTracker private static final String CACHEDIR = "archive"; private static final String JOBCACHE = "jobcache"; private static final String OUTPUT = "output"; + // TODO: fix name + private static final String TASKWORK = "task-work"; + private static final String JARSDIR = "jars"; private JobConf fConf; private FileSystem localFs; private int maxMapSlots; @@ -385,16 +388,34 @@ public class TaskTracker } static String getLocalJobDir(String jobid) { - return getJobCacheSubdir() + Path.SEPARATOR + jobid; + return getJobCacheSubdir() + Path.SEPARATOR + jobid; } static String getLocalTaskDir(String jobid, String taskid) { - return getLocalTaskDir(jobid, taskid, false) ; + return getLocalTaskDir(jobid, taskid, false); } + /** + * Intermediate output directory for a given task as seen by the child. + * TODO + * @param jobid + * @param taskid + * @return + */ + static String getBaseIntermediateOutputDir() { + return TaskTracker.OUTPUT; + } + + /** + * Intermediate output directory for a given task as seen by the TaskTracker. 
+ * + * @param jobid + * @param taskid + * @return + */ static String getIntermediateOutputDir(String jobid, String taskid) { - return getLocalTaskDir(jobid, taskid) - + Path.SEPARATOR + TaskTracker.OUTPUT ; + return getLocalTaskDir(jobid, taskid) + Path.SEPARATOR + + TaskTracker.OUTPUT; } static String getLocalTaskDir(String jobid, @@ -406,7 +427,18 @@ public class TaskTracker } return taskDir; } - + + static String getTaskWorkDir(String jobid, String taskid, + boolean isCleanupAttempt) { + String dir = + getLocalJobDir(jobid) + Path.SEPARATOR + TaskTracker.TASKWORK + + Path.SEPARATOR + taskid; + if (isCleanupAttempt) { + dir = dir + TASK_CLEANUP_SUFFIX; + } + return dir + Path.SEPARATOR + MRConstants.WORKDIR; + } + String getPid(TaskAttemptID tid) { TaskInProgress tip = tasks.get(tid); if (tip != null) { @@ -567,7 +599,7 @@ public class TaskTracker * startup, to remove any leftovers from previous run. */ public void cleanupStorage() throws IOException { - this.fConf.deleteLocalFiles(); + //this.fConf.deleteLocalFiles(); } // Object on wait which MapEventsFetcherThread is going to wait. @@ -762,39 +794,39 @@ public class TaskTracker } catch(FileNotFoundException fe) { jobFileSize = -1; } - Path localJobFile = lDirAlloc.getLocalPathForWrite( + Path localJobFile = lDirAlloc.getPrivateLocalPathForWrite( getLocalJobDir(jobId.toString()) + Path.SEPARATOR + "job.xml", jobFileSize, fConf); RunningJob rjob = addTaskToJob(jobId, tip); synchronized (rjob) { if (!rjob.localized) { - + + // Initialize the job directories first FileSystem localFs = FileSystem.getLocal(fConf); - // this will happen on a partial execution of localizeJob. 
- // Sometimes the job.xml gets copied but copying job.jar - // might throw out an exception - // we should clean up and then try again - Path jobDir = localJobFile.getParent(); - if (localFs.exists(jobDir)){ - localFs.delete(jobDir, true); - boolean b = localFs.mkdirs(jobDir); - if (!b) - throw new IOException("Not able to create job directory " - + jobDir.toString()); - } + initializeJobDirs(jobId, localFs, fConf.getStrings("mapred.local.dir")); + + // Download job.xml systemFS.copyToLocalFile(jobFile, localJobFile); + // Job dir is not private, set private permissions for job file. + DiskChecker.setPermissions(new File(localJobFile.toUri().getPath()), + DiskChecker.sevenZeroZero); + + // TODO: Fix this. TT config is not loaded at all! JobConf localJobConf = new JobConf(localJobFile); // create the 'work' directory // job-specific shared directory for use as scratch space - Path workDir = lDirAlloc.getLocalPathForWrite( + Path workDir = lDirAlloc.getPrivateLocalPathForWrite( (getLocalJobDir(jobId.toString()) + Path.SEPARATOR + MRConstants.WORKDIR), fConf); if (!localFs.mkdirs(workDir)) { throw new IOException("Mkdirs failed to create " + workDir.toString()); } + // Job dir is not private, set private permissions for work dir. 
+ DiskChecker.setPermissions(new File(workDir.toUri().getPath()), + DiskChecker.sevenZeroZero); System.setProperty("job.local.dir", workDir.toString()); localJobConf.set("job.local.dir", workDir.toString()); @@ -811,35 +843,93 @@ public class TaskTracker } // Here we check for and we check five times the size of jarFileSize // to accommodate for unjarring the jar file in work directory - localJarFile = new Path(lDirAlloc.getLocalPathForWrite( - getLocalJobDir(jobId.toString()) - + Path.SEPARATOR + "jars", - 5 * jarFileSize, fConf), "job.jar"); - if (!localFs.mkdirs(localJarFile.getParent())) { - throw new IOException("Mkdirs failed to create jars directory "); - } + localJarFile = + lDirAlloc.getPrivateLocalPathForWrite(getLocalJobDir(jobId + .toString()) + + Path.SEPARATOR + "jars" + Path.SEPARATOR + "job.jar", + 5 * jarFileSize, fConf); + + // Download job.jar + // jars dir is already private. No need for setting private + // permissions for localJarFile systemFS.copyToLocalFile(jarFilePath, localJarFile); + localJobConf.setJar(localJarFile.toString()); OutputStream out = localFs.create(localJobFile); + // Job dir is not private, set private permissions for the re-created + // job file. + DiskChecker.setPermissions(new File(localJobFile.toUri().getPath()), + DiskChecker.sevenZeroZero); try { localJobConf.writeXml(out); } finally { out.close(); } - // also unjar the job.jar files - RunJar.unJar(new File(localJarFile.toString()), - new File(localJarFile.getParent().toString())); + // also unjar the job.jar files + // jars dir is already private. No need for setting private + // permissions for the un-jarred files. 
+ RunJar.unJar(new File(localJarFile.toString()), new File( + localJarFile.getParent().toString())); } rjob.keepJobFiles = ((localJobConf.getKeepTaskFilesPattern() != null) || localJobConf.getKeepFailedTaskFiles()); rjob.localized = true; rjob.jobConf = localJobConf; - taskController.initializeJob(jobId); } } launchTaskForJob(tip, new JobConf(rjob.jobConf)); } + /** + * Prepare the job directories for a given job. To be called only if the job + * is not already localized. + */ + private void initializeJobDirs(JobID jobId, FileSystem fs, String[] localDirs) + throws IOException { + boolean initStatus = true; + String jobDirPath = getLocalJobDir(jobId.toString()); + for (String localDir : localDirs) { + Path jobDir = new Path(localDir, jobDirPath); + if (fs.exists(jobDir)) { + // this will happen on a partial execution of localizeJob. + // Sometimes the job.xml gets copied but copying job.jar + // might throw out an exception + // we should clean up and then try again + fs.delete(jobDir, true); + } + + boolean jobDirStatus = fs.mkdirs(jobDir); + if (!jobDirStatus) { + LOG.warn("Not able to create job directory " + jobDir.toString()); + } + + Path taskWorkDir = new Path(jobDir, TaskTracker.TASKWORK); + if (!fs.mkdirs(taskWorkDir)) { + LOG.warn("Not able to create job's " + TaskTracker.TASKWORK + + " directory for " + jobDir.toString()); + } + + Path jarsDir = new Path(jobDir, TaskTracker.JARSDIR); + if (!fs.mkdirs(jarsDir)) { + LOG.warn("Not able to create job's " + TaskTracker.JARSDIR + + " directory for " + jobDir.toString()); + } + + initStatus = initStatus || jobDirStatus; + // TODO: fix return status for the following. 
+ DiskChecker.setPermissions(new File(jobDir.toUri().getPath()), + DiskChecker.sevenFiveFive); + DiskChecker.setPermissions(new File(taskWorkDir.toUri().getPath()), + DiskChecker.sevenFiveFive); + DiskChecker.setPermissions(new File(jarsDir.toUri().getPath()), + DiskChecker.sevenZeroZero); + } + if (!initStatus) { + throw new IOException("Not able to initialize job directories " + + "in any of the configured local directories for job " + jobId.toString()); + } + } + private void launchTaskForJob(TaskInProgress tip, JobConf jobConf) throws IOException{ synchronized (tip) { tip.setJobConf(jobConf); @@ -1381,6 +1471,16 @@ public class TaskTracker indexCache.removeMap(tip.getTask().getTaskID().toString()); } } + // Finalize the job directories + try { + taskController.finalizeJob(jobId, TaskTracker.getLocalJobDir(jobId + .toString()) + + Path.SEPARATOR + "work", rjob.jobConf.getUser()); + } catch (IOException ioe) { + LOG.warn("Finalize job failed with the exception : " + + StringUtils.stringifyException(ioe)); + } + // Delete the job directory for this // task if the job is done/failed if (!rjob.keepJobFiles){ @@ -1557,7 +1657,7 @@ public class TaskTracker mapOutputFile.setJobId(taskId.getJobID()); mapOutputFile.setConf(conf); - Path tmp_output = mapOutputFile.getOutputFile(taskId); + Path tmp_output = mapOutputFile.getOutputFile(); if(tmp_output == null) return 0; FileSystem localFS = FileSystem.getLocal(conf); @@ -1836,7 +1936,7 @@ public class TaskTracker private void localizeTask(Task task) throws IOException{ Path localTaskDir = - lDirAlloc.getLocalPathForWrite( + lDirAlloc.getPrivateLocalPathForWrite( TaskTracker.getLocalTaskDir(task.getJobID().toString(), task.getTaskID().toString(), task.isTaskCleanupTask()), defaultJobConf ); @@ -1846,6 +1946,9 @@ public class TaskTracker throw new IOException("Mkdirs failed to create " + localTaskDir.toString()); } + // job dir is not private, set private permissions for attempt-dir + DiskChecker.setPermissions(new 
File(localTaskDir.toUri().getPath()), + DiskChecker.sevenZeroZero); // create symlink for ../work if it already doesnt exist String workDir = lDirAlloc.getLocalPathToRead( @@ -1855,24 +1958,46 @@ public class TaskTracker String link = localTaskDir.getParent().toString() + Path.SEPARATOR + "work"; File flink = new File(link); - if (!flink.exists()) + if (!flink.exists()) { FileUtil.symLink(workDir, link); - + // job dir is not private, set private permissions for the newly created + // link + DiskChecker.setPermissions(flink, DiskChecker.sevenZeroZero); + } + // create the working-directory of the task - Path cwd = lDirAlloc.getLocalPathForWrite( - getLocalTaskDir(task.getJobID().toString(), - task.getTaskID().toString(), task.isTaskCleanupTask()) - + Path.SEPARATOR + MRConstants.WORKDIR, - defaultJobConf); + Path cwd = + lDirAlloc.getPrivateLocalPathForWrite(getTaskWorkDir(task.getJobID() + .toString(), task.getTaskID().toString(), task + .isTaskCleanupTask()), defaultJobConf); if (!localFs.mkdirs(cwd)) { throw new IOException("Mkdirs failed to create " + cwd.toString()); } + // attempt dir is not private, no need for setting private permissions for + // the task workDir. Path localTaskFile = new Path(localTaskDir, "job.xml"); task.setJobFile(localTaskFile.toString()); localJobConf.set("mapred.local.dir", fConf.get("mapred.local.dir")); + + // Prepare the mapred.local.dir for the child. The child is + // sand-boxed now. Whenever it uses LocalDirAllocator from now on, it + // will only see files inside the attempt-directory. 
+ String childMapredLocalDir = ""; + LOG.info("mapred.local.dir for child before changing : " + + fConf.get("mapred.local.dir")); + for (String mapredLocalDir : fConf.getStrings("mapred.local.dir")) { + childMapredLocalDir += + mapredLocalDir + + Path.SEPARATOR + + TaskTracker.getLocalTaskDir(task.getJobID().toString(), task + .getTaskID().toString(), task.isJobCleanupTask()) + ","; + } + LOG.info("mapred.child.local.dir for child : " + childMapredLocalDir); + localJobConf.set("mapred.child.local.dir", childMapredLocalDir); + if (fConf.get("slave.host.name") != null) { localJobConf.set("slave.host.name", fConf.get("slave.host.name")); @@ -1914,11 +2039,19 @@ public class TaskTracker localJobConf.setNumTasksToExecutePerJvm(1); } OutputStream out = localFs.create(localTaskFile); + // attempt dir is already private, no need for setting private permissions + // for the newly created localTaskFile. + try { + localJobConf.set("mapred.local.dir", localJobConf + .get("mapred.child.local.dir")); localJobConf.writeXml(out); + // Reset back the original mapred.local.dir + localJobConf.set("mapred.local.dir", fConf.get("mapred.local.dir")); } finally { out.close(); } + task.setConf(localJobConf); } @@ -2049,6 +2182,22 @@ public class TaskTracker * The task is reporting that it's done running */ public synchronized void reportDone() { + + // Finalize the task directories if needed. + if (localJobConf.getNumTasksToExecutePerJvm() != 1) { + // JVMs are reused. Task succeeded. Try finalizing the task directories. + try { + jvmManager.finalizeTaskDirs(runner); + } catch (IOException e) { + LOG.warn("Finalizing task-directories failed with the exception : " + + StringUtils.stringifyException(e)); + } + } else { + // JVMs are not reused, we do task-directories' finalization when the + // JVM finishes. 
See the final block in JvmRunner.runChild() + ; // do nothing + } + if (isCleaningup()) { if (this.taskStatus.getRunState() == TaskStatus.State.FAILED_UNCLEAN) { this.taskStatus.setRunState(TaskStatus.State.FAILED); @@ -2165,13 +2314,12 @@ public class TaskTracker } File workDir = null; try { - workDir = new File(lDirAlloc.getLocalPathToRead( - TaskTracker.getLocalTaskDir( - task.getJobID().toString(), - task.getTaskID().toString(), - task.isTaskCleanupTask()) - + Path.SEPARATOR + MRConstants.WORKDIR, - localJobConf). toString()); + workDir = + new File(lDirAlloc.getLocalPathToRead( + TaskTracker.getTaskWorkDir(task.getJobID().toString(), + task.getTaskID().toString(), task + .isTaskCleanupTask()), localJobConf) + .toString()); } catch (IOException e) { LOG.warn("Working Directory of the task " + task.getTaskID() + "doesnt exist. Caught exception " + @@ -2451,6 +2599,8 @@ public class TaskTracker } String taskDir = getLocalTaskDir(task.getJobID().toString(), taskId.toString(), task.isTaskCleanupTask()); + String taskWorkDir = getTaskWorkDir(task.getJobID().toString(), + taskId.toString(), task.isTaskCleanupTask()); if (needCleanup) { if (runner != null) { //cleans up the output directory of the task (where map outputs @@ -2465,18 +2615,25 @@ public class TaskTracker directoryCleanupThread.addToQueue(localFs, getLocalFiles(defaultJobConf, taskDir)); + directoryCleanupThread.addToQueue(localFs, + getLocalFiles(defaultJobConf, + taskWorkDir)); } else { directoryCleanupThread.addToQueue(localFs, getLocalFiles(defaultJobConf, taskDir+"/job.xml")); + // TODO: fix + directoryCleanupThread.addToQueue(localFs, + getLocalFiles(defaultJobConf, + taskWorkDir+"/taskjvm.sh")); } } else { if (localJobConf.getNumTasksToExecutePerJvm() == 1) { directoryCleanupThread.addToQueue(localFs, getLocalFiles(defaultJobConf, - taskDir+"/work")); + taskWorkDir)); } } } catch (Throwable ie) { diff --git src/test/mapred/org/apache/hadoop/mapred/ClusterWithLinuxTaskController.java 
src/test/mapred/org/apache/hadoop/mapred/ClusterWithLinuxTaskController.java index 521c758..d7e8b96 100644 --- src/test/mapred/org/apache/hadoop/mapred/ClusterWithLinuxTaskController.java +++ src/test/mapred/org/apache/hadoop/mapred/ClusterWithLinuxTaskController.java @@ -97,7 +97,7 @@ public class ClusterWithLinuxTaskController extends TestCase { MyLinuxTaskController.class.getName()); mrCluster = new MiniMRCluster(NUMBER_OF_NODES, dfsCluster.getFileSystem().getUri() - .toString(), 1, null, null, conf); + .toString(), 4, null, null, conf); // Get the configured taskcontroller-path String path = System.getProperty("taskcontroller-path"); @@ -143,8 +143,18 @@ public class ClusterWithLinuxTaskController extends TestCase { PrintWriter writer = new PrintWriter(new FileOutputStream(configurationFile)); - writer.println(String.format("mapred.local.dir=%s", mrCluster - .getTaskTrackerLocalDir(0))); + StringBuffer sb = new StringBuffer(); + String[] localDirs = mrCluster.getTaskTrackerRunner(0).getLocalDirs(); + for(int i = 0 ; i < localDirs.length; i++) { + sb.append(localDirs[i]); + if((i + 1) != localDirs.length) { + sb.append(","); + } + } + writer.println(String.format("mapred.local.dir=%s", sb.toString())); + + writer + .println(String.format("hadoop.log.dir=%s", TaskLog.getBaseLogDir())); writer.flush(); writer.close(); diff --git src/test/mapred/org/apache/hadoop/mapred/MiniMRCluster.java src/test/mapred/org/apache/hadoop/mapred/MiniMRCluster.java index 5c26fd1..5f2c49f 100644 --- src/test/mapred/org/apache/hadoop/mapred/MiniMRCluster.java +++ src/test/mapred/org/apache/hadoop/mapred/MiniMRCluster.java @@ -171,7 +171,7 @@ public class MiniMRCluster { StringBuffer localPath = new StringBuffer(); for(int i=0; i < numDir; ++i) { File ttDir = new File(localDirBase, - Integer.toString(trackerId) + "_" + 0); + Integer.toString(trackerId) + "_" + i); if (!ttDir.mkdirs()) { if (!ttDir.isDirectory()) { throw new IOException("Mkdirs failed to create " + ttDir); diff --git 
src/test/mapred/org/apache/hadoop/mapred/TestJobExecutionAsDifferentUser.java src/test/mapred/org/apache/hadoop/mapred/TestJobExecutionAsDifferentUser.java index f923f88..c75da59 100644 --- src/test/mapred/org/apache/hadoop/mapred/TestJobExecutionAsDifferentUser.java +++ src/test/mapred/org/apache/hadoop/mapred/TestJobExecutionAsDifferentUser.java @@ -20,8 +20,10 @@ package org.apache.hadoop.mapred; import java.io.IOException; +import org.apache.hadoop.examples.SleepJob; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; +import org.apache.hadoop.util.ToolRunner; /** * Test a java-based mapred job with LinuxTaskController running the jobs as a @@ -39,13 +41,36 @@ public class TestJobExecutionAsDifferentUser extends startCluster(); Path inDir = new Path("input"); Path outDir = new Path("output"); - RunningJob job = - UtilsForTests.runJobSucceed(getClusterConf(), inDir, outDir); + + RunningJob job; + + // Run a job with zero maps/reduces + job = UtilsForTests.runJob(getClusterConf(), inDir, outDir, 0, 0); + job.waitForCompletion(); + assertTrue("Job failed", job.isSuccessful()); + assertOwnerShip(outDir); + + // Run a job with 1 map and zero reduces + job = UtilsForTests.runJob(getClusterConf(), inDir, outDir, 1, 0); + job.waitForCompletion(); assertTrue("Job failed", job.isSuccessful()); assertOwnerShip(outDir); + + // Run a normal job with maps/reduces + job = UtilsForTests.runJob(getClusterConf(), inDir, outDir, 1, 1); + job.waitForCompletion(); + assertTrue("Job failed", job.isSuccessful()); + assertOwnerShip(outDir); + + // Run a job with jvm reuse + JobConf myConf = getClusterConf(); + myConf.set("mapred.job.reuse.jvm.num.tasks", "-1"); + String[] args = { "-m", "6", "-r", "3", "-mt", "1000", "-rt", "1000" }; + assertEquals(0, ToolRunner.run(myConf, new SleepJob(), args)); + // TODO: check reuse } - public void testEnvironment() throws IOException { + public void TestEnvironment() throws IOException { if (!shouldRun()) { return; } diff 
--git src/test/mapred/org/apache/hadoop/mapred/TestKillSubProcesses.java src/test/mapred/org/apache/hadoop/mapred/TestKillSubProcesses.java index 37c2c94..99633d9 100644 --- src/test/mapred/org/apache/hadoop/mapred/TestKillSubProcesses.java +++ src/test/mapred/org/apache/hadoop/mapred/TestKillSubProcesses.java @@ -348,15 +348,26 @@ public class TestKillSubProcesses extends TestCase { FileSystem fs = FileSystem.getLocal(conf); TEST_ROOT_DIR = new Path(conf.get("test.build.data")).toUri().getPath(); scriptDir = new Path(TEST_ROOT_DIR + "/script"); - if(fs.exists(scriptDir)){ + if (fs.exists(scriptDir)) { fs.delete(scriptDir, true); } + + // Create the directory and set open permissions so that the TT can + // access. + fs.mkdirs(scriptDir); + fs.setPermission(scriptDir, new FsPermission(FsAction.ALL, FsAction.ALL, + FsAction.ALL)); + // create shell script Random rm = new Random(); Path scriptPath = new Path(scriptDir, "_shellScript_" + rm.nextInt() + ".sh"); String shellScript = scriptPath.toString(); + + // Construct the script. Set umask to 0000 so that TT can access all the + // files. String script = + "umask 0000\n" + "echo $$ > " + scriptDir.toString() + "/childPidFile" + "$1\n" + "echo hello\n" + "trap 'echo got SIGTERM' 15 \n" + diff --git src/test/mapred/org/apache/hadoop/mapred/TestMapRed.java src/test/mapred/org/apache/hadoop/mapred/TestMapRed.java index 665dc33..2f70803 100644 --- src/test/mapred/org/apache/hadoop/mapred/TestMapRed.java +++ src/test/mapred/org/apache/hadoop/mapred/TestMapRed.java @@ -289,7 +289,7 @@ public class TestMapRed extends TestCase { first = false; MapOutputFile mapOutputFile = new MapOutputFile(taskId.getJobID()); mapOutputFile.setConf(conf); - Path input = mapOutputFile.getInputFile(0, taskId); + Path input = mapOutputFile.getInputFile(0); FileSystem fs = FileSystem.get(conf); assertTrue("reduce input exists " + input, fs.exists(input)); SequenceFile.Reader rdr =