diff --git build.xml build.xml
index f9e0694..e69d869 100644
--- build.xml
+++ build.xml
@@ -1658,5 +1658,16 @@
-
+
+
+
+
+
+
+
+
+
+
diff --git conf/taskcontroller.cfg conf/taskcontroller.cfg
deleted file mode 100644
index fb4cced..0000000
--- conf/taskcontroller.cfg
+++ /dev/null
@@ -1,3 +0,0 @@
-mapred.local.dir=#configured value of hadoop.tmp.dir it can be a list of paths comma seperated
-hadoop.pid.dir=#configured HADOOP_PID_DIR
-hadoop.indent.str=#configured HADOOP_IDENT_STR
diff --git lib/hadoop-core-0.21.0-dev.jar lib/hadoop-core-0.21.0-dev.jar
index 9494544..8107ec7 100644
Binary files lib/hadoop-core-0.21.0-dev.jar and lib/hadoop-core-0.21.0-dev.jar differ
diff --git src/c++/task-controller/Makefile.in src/c++/task-controller/Makefile.in
index ba35f96..83f418f 100644
--- src/c++/task-controller/Makefile.in
+++ src/c++/task-controller/Makefile.in
@@ -16,9 +16,11 @@
# limitations under the License.
#
OBJS=main.o task-controller.o configuration.o
+TESTOBJS=test-task-controller.o task-controller.o configuration.o
CC=@CC@
CFLAGS = @CFLAGS@
BINARY=task-controller
+TESTBINARY=test-task-controller
installdir = @prefix@
@@ -34,9 +36,15 @@ task-controller.o: task-controller.c task-controller.h
configuration.o: configuration.h configuration.c
$(CC) $(CFLAG) -o configuration.o -c configuration.c
+test-task-controller.o: test-task-controller.c task-controller.h
+	$(CC) $(CFLAGS) -o test-task-controller.o -c test-task-controller.c
+
+test: $(TESTOBJS)
+	$(CC) $(CFLAGS) -o $(TESTBINARY) $(TESTOBJS)
+	cp $(TESTBINARY) $(installdir)
clean:
- rm -rf $(BINARY) $(OBJS)
+	rm -rf $(BINARY) $(OBJS) $(TESTOBJS) $(TESTBINARY)
install: all
cp $(BINARY) $(installdir)
diff --git src/c++/task-controller/configuration.c src/c++/task-controller/configuration.c
index 05b267a..b6abbf5 100644
--- src/c++/task-controller/configuration.c
+++ src/c++/task-controller/configuration.c
@@ -71,9 +71,8 @@ void get_configs() {
snprintf(file_name, str_len, CONF_FILE_PATTERN, HADOOP_CONF_DIR);
#endif
-#ifdef DEBUG
- fprintf(LOGFILE,"get_configs :Conf file name is : %s \n", file_name);
-#endif
+ fprintf(LOGFILE, "get_configs :Conf file name is : %s \n", file_name);
+
//allocate space for ten configuration items.
config.confdetails = (struct confentry **) malloc(sizeof(struct confentry *)
* MAX_SIZE);
@@ -87,7 +86,7 @@ void get_configs() {
while(!feof(conf_file)) {
line = (char *) malloc(linesize);
if(line == NULL) {
- fprintf(LOGFILE,"malloc failed while reading configuration file.\n");
+ fprintf(LOGFILE, "malloc failed while reading configuration file.\n");
goto cleanup;
}
size_read = getline(&line,&linesize,conf_file);
@@ -123,9 +122,9 @@ void get_configs() {
"Failed allocating memory for single configuration item\n");
goto cleanup;
}
-#ifdef DEBUG
- fprintf(LOGFILE,"get_configs : Adding conf key : %s \n", equaltok);
-#endif
+
+ fprintf(LOGFILE, "get_configs : Adding conf key : %s \n", equaltok);
+
memset(config.confdetails[config.size], 0, sizeof(struct confentry));
config.confdetails[config.size]->key = (char *) malloc(
sizeof(char) * (strlen(equaltok)+1));
@@ -142,9 +141,9 @@ void get_configs() {
free(config.confdetails[config.size]);
continue;
}
-#ifdef DEBUG
- fprintf(LOGFILE,"get_configs : Adding conf value : %s \n", equaltok);
-#endif
+
+ fprintf(LOGFILE, "get_configs : Adding conf value : %s \n", equaltok);
+
config.confdetails[config.size]->value = (char *) malloc(
sizeof(char) * (strlen(equaltok)+1));
strcpy((char *)config.confdetails[config.size]->value, equaltok);
@@ -184,8 +183,7 @@ void get_configs() {
* array, next time onwards used the populated array.
*
*/
-
-const char * get_value(char* key) {
+const char * get_value(const char* key) {
int count;
if (config.size == 0) {
get_configs();
@@ -196,15 +194,19 @@ const char * get_value(char* key) {
}
for (count = 0; count < config.size; count++) {
if (strcmp(config.confdetails[count]->key, key) == 0) {
- return config.confdetails[count]->value;
+ return strdup(config.confdetails[count]->value);
}
}
return NULL;
}
-const char ** get_values(char * key) {
+/**
+ * Function to return an array of values for a key.
+ * Value delimiter is assumed to be a comma.
+ */
+const char ** get_values(const char * key) {
const char ** toPass = NULL;
- const char * value = get_value(key);
+ const char *value = get_value(key);
char *tempTok = NULL;
char *tempstr = NULL;
int size = 0;
diff --git src/c++/task-controller/configuration.h.in src/c++/task-controller/configuration.h.in
index af8086b..ebdf4d8 100644
--- src/c++/task-controller/configuration.h.in
+++ src/c++/task-controller/configuration.h.in
@@ -53,10 +53,10 @@ extern struct configuration config;
extern char *hadoop_conf_dir;
#endif
//method exposed to get the configurations
-const char * get_value(char* key);
+const char * get_value(const char* key);
//method to free allocated configuration
void free_configurations();
//function to return array of values pointing to the key. Values are
//comma seperated strings.
-const char ** get_values(char* key);
+const char ** get_values(const char* key);
diff --git src/c++/task-controller/configure src/c++/task-controller/configure
old mode 100644
new mode 100755
diff --git src/c++/task-controller/configure.ac src/c++/task-controller/configure.ac
index 2fd52a8..8605ac8 100644
--- src/c++/task-controller/configure.ac
+++ src/c++/task-controller/configure.ac
@@ -38,7 +38,7 @@ AC_PROG_CC
# Checks for header files.
AC_HEADER_STDC
-AC_CHECK_HEADERS([stdlib.h string.h unistd.h])
+AC_CHECK_HEADERS([stdlib.h string.h unistd.h fcntl.h])
#check for HADOOP_CONF_DIR
@@ -50,12 +50,14 @@ fi
# Checks for typedefs, structures, and compiler characteristics.
AC_C_CONST
AC_TYPE_PID_T
+AC_TYPE_MODE_T
+AC_TYPE_SIZE_T
# Checks for library functions.
AC_FUNC_MALLOC
AC_FUNC_REALLOC
-AC_CHECK_FUNCS([strerror])
+AC_FUNC_CHOWN
+AC_CHECK_FUNCS([strerror memset mkdir rmdir strdup])
AC_CONFIG_FILES([Makefile])
AC_OUTPUT
-
diff --git src/c++/task-controller/main.c src/c++/task-controller/main.c
index c19785a..13f13f6 100644
--- src/c++/task-controller/main.c
+++ src/c++/task-controller/main.c
@@ -17,6 +17,26 @@
*/
#include "task-controller.h"
+/**
+ * Opens log_file for appending; falls back to stdout on NULL or on failure.
+ */
+void open_log_file(char *log_file) {
+  if (log_file == NULL) {
+    LOGFILE = stdout;
+    return;
+  }
+  LOGFILE = fopen(log_file, "a");
+  if (LOGFILE == NULL) {
+    fprintf(stdout, "Unable to open LOGFILE : %s \n", log_file);
+    LOGFILE = stdout;
+  } else if (chmod(log_file, S_IREAD | S_IEXEC | S_IWRITE | S_IROTH | S_IWOTH
+      | S_IRGRP | S_IWGRP) < 0) {
+    fprintf(stdout, "Unable to change permission of the log file %s \n", log_file);
+    fprintf(stdout, "changing log file to stdout\n");
+    fclose(LOGFILE); // don't leak the stream being abandoned
+    LOGFILE = stdout;
+  }
+}
int main(int argc, char **argv) {
int command;
@@ -24,6 +44,7 @@ int main(int argc, char **argv) {
const char * job_id = NULL;
const char * task_id = NULL;
const char * tt_root = NULL;
+ const char *log_dir = NULL;
int exit_code = 0;
const char * task_pid = NULL;
const char* const short_options = "l:";
@@ -35,7 +56,7 @@ int main(int argc, char **argv) {
//Minimum number of arguments required to run the task-controller
//command-name user command tt-root
if (argc < 3) {
- display_usage(stderr);
+ display_usage(stdout);
return INVALID_ARGUMENT_NUMBER;
}
@@ -54,24 +75,9 @@ int main(int argc, char **argv) {
break;
}
} while (next_option != -1);
- if (log_file == NULL) {
- LOGFILE = stderr;
- } else {
- LOGFILE = fopen(log_file, "a");
- if (LOGFILE == NULL) {
- fprintf(stderr, "Unable to open LOGFILE : %s \n", log_file);
- LOGFILE = stderr;
- }
- if (LOGFILE != stderr) {
- if (chmod(log_file, S_IREAD | S_IEXEC | S_IWRITE | S_IROTH | S_IWOTH
- | S_IRGRP | S_IWGRP) < 0) {
- fprintf(stderr, "Unable to change permission of the log file %s \n",
- log_file);
- fprintf(stderr, "changing log file to stderr");
- LOGFILE = stderr;
- }
- }
- }
+
+ open_log_file(log_file);
+
//checks done for user name
//checks done if the user is root or not.
if (argv[optind] == NULL) {
@@ -88,26 +94,50 @@ int main(int argc, char **argv) {
}
optind = optind + 1;
command = atoi(argv[optind++]);
-#ifdef DEBUG
+
fprintf(LOGFILE, "main : command provided %d\n",command);
fprintf(LOGFILE, "main : user is %s\n", user_detail->pw_name);
-#endif
+
switch (command) {
case LAUNCH_TASK_JVM:
tt_root = argv[optind++];
job_id = argv[optind++];
task_id = argv[optind++];
+ log_dir = argv[optind++];
exit_code
- = run_task_as_user(user_detail->pw_name, job_id, task_id, tt_root);
+ = run_task_as_user(user_detail->pw_name, job_id, task_id, tt_root, log_dir);
+ break;
+ case INITIALIZE_TASK:
+ job_id = argv[optind++];
+ task_id = argv[optind++];
+ log_dir = argv[optind++];
+ exit_code = initialize_task(job_id, task_id, log_dir, user_detail->pw_name);
break;
case TERMINATE_TASK_JVM:
+ job_id = argv[optind++];
+ task_id = argv[optind++];
+ log_dir = argv[optind++];
task_pid = argv[optind++];
+ exit_code = finalize_task_dirs(job_id, task_id, log_dir, 1); // Not the first task.
+ // TODO: fix
+ open_log_file(log_file);
exit_code = kill_user_task(user_detail->pw_name, task_pid, SIGTERM);
break;
case KILL_TASK_JVM:
task_pid = argv[optind++];
exit_code = kill_user_task(user_detail->pw_name, task_pid, SIGKILL);
break;
+ case FINALIZE_TASK_DIRS:
+ job_id = argv[optind++];
+ task_id = argv[optind++];
+ log_dir = argv[optind++];
+ int first_task = atoi(argv[optind++]); // TODO: put in checks for -ve values.
+ exit_code = finalize_task_dirs(job_id, task_id, log_dir, first_task);
+ break;
+ case FINALIZE_JOB:
+ job_id = argv[optind++];
+ exit_code = finalize_job(job_id);
+ break;
default:
exit_code = INVALID_COMMAND_PROVIDED;
}
diff --git src/c++/task-controller/task-controller.c src/c++/task-controller/task-controller.c
index 511f0cf..3e56f2b 100644
--- src/c++/task-controller/task-controller.c
+++ src/c++/task-controller/task-controller.c
@@ -71,104 +71,610 @@ int change_user(const char * user) {
return 0;
}
-// function to check if the passed tt_root is present in hadoop.tmp.dir
-int check_tt_root(const char *tt_root) {
- char ** mapred_local_dir;
- int found = -1;
+/**
+ * Checks the passed value for the variable config_key against the values in
+ * the configuration.
+ * Returns 0 if the passed value is found in the configuration, -1 otherwise
+ * (including NULL arguments or an unconfigured key).
+ */
+int check_variable_against_config(const char *config_key,
+    const char *passed_value) {
- if (tt_root == NULL) {
+  if (config_key == NULL || passed_value == NULL) {
return -1;
}
- mapred_local_dir = (char **)get_values(TT_SYS_DIR_KEY);
+  int found = -1;
+
+  const char **config_value = get_values(config_key);
- if (mapred_local_dir == NULL) {
+  if (config_value == NULL) {
+    fprintf(LOGFILE, "%s is not configured.\n", config_key);
return -1;
}
- while(*mapred_local_dir != NULL) {
- if(strcmp(*mapred_local_dir,tt_root) == 0) {
+  char **config_val_ptr = (char **) config_value;
+  while (*config_val_ptr != NULL) {
+    if (strcmp(*config_val_ptr, passed_value) == 0) {
found = 0;
break;
}
+    config_val_ptr++;
+  }
+
+  if (found != 0) {
+    // get_value() returns a heap copy; release it once it has been logged.
+    char *configured = (char *) get_value(config_key);
+    fprintf(LOGFILE, "Invalid value passed: Configured value of %s is %s. "
+        "Passed value is %s.\n",
+        config_key, configured, passed_value);
+    free(configured);
}
- free(mapred_local_dir);
+  free(config_value);
return found;
}
/**
+ * Formats concat_pattern with (argA, argB) into a new malloc'ed string that the caller must free. Returns NULL (after logging) on NULL arguments or allocation failure.
+ */
+char *concatenate(const char *argA, const char *argB, char *concat_pattern,
+ char *return_path_name) { // return_path_name is only used to label log messages
+ if (argA == NULL || argB == NULL) {
+ fprintf(LOGFILE, "One of the arguments passed for %s in null.\n",
+ return_path_name);
+ return NULL;
+ }
+
+ char *return_path = NULL;
+ int str_len = strlen(concat_pattern) + strlen(argA) + strlen(argB); // counts the pattern's own characters as slack
+
+ return_path = (char *) malloc(sizeof(char) * (str_len + 1));
+ if (return_path == NULL) {
+ fprintf(LOGFILE, "Unable to allocate memory for %s.\n", return_path_name);
+ return NULL;
+ }
+ memset(return_path, '\0', str_len + 1);
+ snprintf(return_path, str_len, concat_pattern, argA, argB); // assumes concat_pattern has exactly two %s conversions (argA, argB)
+ return return_path;
+}
+
+/**
+ * Get the job-directory path from tt_root and job-id. Caller frees; NULL on error.
+ */
+char *get_job_directory(const char * tt_root, const char *jobid) {
+ return concatenate(tt_root, jobid, TT_JOB_DIR_PATTERN, "job_dir_path");
+}
+
+/**
+ * Get the job-jars directory from the job_dir. Caller frees; NULL on error.
+ */
+char *get_job_jars_directory(const char *job_dir) {
+ return concatenate(job_dir, "", JOB_DIR_TO_JARS_PATTERN,
+ "job_jars_dir_path");
+}
+
+/**
+ * Get the work directory from the job_dir. Caller frees; NULL on error.
+ */
+char *get_work_directory(const char *job_dir) {
+ return concatenate(job_dir, "", JOB_DIR_TO_WORK_DIR_PATTERN,
+ "job_work_dir_path");
+}
+
+/**
+ * Get the attempt directory for the given attempt_id. Caller frees; NULL on error.
+ */
+char *get_attempt_directory(const char *job_dir, const char *attempt_id) {
+ return concatenate(job_dir, attempt_id, JOB_DIR_TO_ATTEMPT_DIR_PATTERN,
+ "attempt_dir_path");
+}
+
+/**
+ * Get the task_work directory for the given attempt_id. Caller frees; NULL on error.
+ */
+char *get_task_work_directory(const char *job_dir, const char *attempt_id) {
+ return concatenate(job_dir, attempt_id, JOB_DIR_TO_TASK_WORK_DIR_PATTERN,
+ "task_work_dir_path");
+}
+
+/*
+ * Get the path to the task launcher file which is created by the TT. Caller frees; NULL on error.
+ */
+char *get_task_launcher_file(const char *attempt_dir) {
+ return concatenate(attempt_dir, "", ATTEMPT_DIR_TO_TASK_SCRIPT_PATTERN,
+ "task_script_path");
+}
+
+/**
+ * Get the log directory for the given attempt. Caller frees; NULL on error.
+ */
+char *get_task_log_dir(const char *log_dir, const char *attempt_id) {
+ return concatenate(log_dir, attempt_id, ATTEMPT_LOG_DIR_PATTERN,
+ "task_log_dir");
+}
+
+/**
+ * Function to check if the passed tt_root is present in mapred.local.dir
+ * the task-controller is configured with. Returns 0 if found, -1 otherwise.
+ */
+int check_tt_root(const char *tt_root) {
+ return check_variable_against_config(TT_SYS_DIR_KEY, tt_root);
+}
+
+/**
+ * Function to check if the passed log_dir is present in hadoop.log.dir
+ * the task-controller is configured with. Returns 0 if found, -1 otherwise.
+ */
+int check_task_logs_dir(const char *log_dir) {
+ return check_variable_against_config(TT_LOG_DIR_KEY, log_dir);
+}
+
+/**
* Function to check if the constructed path and absolute
* path resolve to one and same.
*/
-
int check_path(char *path) {
+  fprintf(LOGFILE, "Passed path : %s\n", path);
char * resolved_path = (char *) canonicalize_file_name(path);
- if(resolved_path == NULL) {
+  if (resolved_path == NULL) {
+    switch (errno) {
+    case ENAMETOOLONG:
+      fprintf(LOGFILE, "The resulting path is too long.\n");
+      break;
+    case EACCES:
+      fprintf(LOGFILE, "The path is not readable.\n");
+      break;
+    case ENOENT:
+      fprintf(LOGFILE, "The input file name is empty or the path components does not exist.\n");
+      break;
+    case ELOOP:
+      fprintf(LOGFILE, "More than `MAXSYMLINKS' many symlinks have been followed.\n");
+      break;
+    default: fprintf(LOGFILE, "canonicalize_file_name failed with error number : %d.\n", errno);
+    }
return ERROR_RESOLVING_FILE_PATH;
}
- if(strcmp(resolved_path, path) !=0) {
+  fprintf(LOGFILE, "Resolved path : %s\n", resolved_path);
+  if (strcmp(resolved_path, path) != 0) {
free(resolved_path);
return RELATIVE_PATH_COMPONENTS_IN_FILE_PATH;
}
free(resolved_path);
return 0;
}
+
/**
- * Function to check if a user actually owns the file.
+ * Function to change the owner/group of a given path (chown(2) follows symbolic links). Returns chown's result (0 on success) and logs the reason on failure.
*/
-int check_owner(uid_t uid, char *path) {
- struct stat filestat;
- if(stat(path, &filestat)!=0) {
- return UNABLE_TO_STAT_FILE;
+int change_owner(const char *path, uid_t uid, gid_t gid) {
+  int exit_code = chown(path, uid, gid);
+  if (exit_code != 0) {
+    // Translate chown(2)'s errno into a readable diagnostic; errno is read
+    // before any other libc call can overwrite it.
+    switch (errno) {
+    case EPERM:
+      fprintf(LOGFILE,
+          "Process lacks permission to make the requested change.\n");
+      break;
+    case EROFS:
+      fprintf(LOGFILE, "File is on a read-only file system.\n");
+      break;
+    case EACCES:
+      fprintf(LOGFILE,
+          "Process does not have search permission for a directory "
+          "component of the file name.\n");
+      break;
+    case ENAMETOOLONG:
+      fprintf(LOGFILE, "The file name is too long.\n");
+      break;
+    case ENOENT:
+      fprintf(LOGFILE,
+          "A directory component in the file name doesn't exist.\n");
+      break;
+    case ENOTDIR:
+      fprintf(LOGFILE,
+          "A directory component in the file name exists, "
+          "but it isn't a directory.\n");
+      break;
+    case ELOOP:
+      fprintf(LOGFILE,
+          "Too many symbolic links were resolved "
+          "while trying to look up the file name.\n");
+      break;
+    default:
+      fprintf(LOGFILE, "chown failed with error number : %d.\n", errno);
+      break;
+    }
}
- //check owner.
- if(uid != filestat.st_uid){
- return FILE_NOT_OWNED_BY_TASKTRACKER;
+  return exit_code;
+}
+
+/**
+ * Function to change the mode of a given path. Returns chmod()'s result
+ * (0 on success) and logs a readable reason on failure.
+ */
+int change_mode(const char *path, mode_t mode) {
+  int exit_code = chmod(path, mode);
+  if (exit_code != 0) {
+    // Translate chmod(2)'s errno into a readable diagnostic; errno is read
+    // before any other libc call can overwrite it.
+    switch (errno) {
+    case ENOENT:
+      fprintf(LOGFILE, "The named file doesn't exist.\n");
+      break;
+    case EPERM:
+      fprintf(LOGFILE,
+          "This process does not have permission to change the access "
+          "permissions of this file.\n");
+      break;
+    case EROFS:
+      fprintf(LOGFILE, "The file resides on a read-only file system.\n");
+      break;
+    case EACCES:
+      fprintf(LOGFILE,
+          "Process does not have search permission for a directory "
+          "component of the file name.\n");
+      break;
+    case ENAMETOOLONG:
+      fprintf(LOGFILE, "The file name is too long.\n");
+      break;
+    case ENOTDIR:
+      fprintf(LOGFILE,
+          "A directory component in the file name exists, "
+          "but it isn't a directory.\n");
+      break;
+    case ELOOP:
+      fprintf(LOGFILE,
+          "Too many symbolic links were resolved "
+          "while trying to look up the file name.\n");
+      break;
+    default:
+      fprintf(LOGFILE, "chmod failed with error number : %d.\n", errno);
+      break;
+    }
+  }
+  return exit_code;
+}
+
+/**
+ * Function to prepare the attempt directories for the task JVM.
+ * This is done by changing the ownership of the attempt directory and the
+ * task work directory recursively to the job owner, in every configured
+ * mapred.local.dir; directories that don't exist are skipped.
+ * Returns 0 on success, -1 on bad arguments or an unknown user, and
+ * PREPARE_ATTEMPT_DIRECTORIES_FAILED if any directory can't be secured.
+ */
+int prepare_attempt_directories(const char *job_id, const char *attempt_id,
+    const char *user) {
+  if (job_id == NULL || attempt_id == NULL || user == NULL) {
+    fprintf(LOGFILE, "Either job_id, attempt_id or the user passed is null.\n");
+    return -1;
+  }
+
+  if (get_user_details(user) < 0) {
+    fprintf(LOGFILE, "Couldn't get the user details of %s.\n", user);
+    return -1;
+  }
+
+  char *full_local_dir_str = (char *) get_value(TT_SYS_DIR_KEY);
+  char **local_dir = (char **) get_values(TT_SYS_DIR_KEY);
+  if (local_dir == NULL) {
+    fprintf(LOGFILE, "%s is not configured.\n", TT_SYS_DIR_KEY);
+    free(full_local_dir_str);
+    cleanup();
+    return PREPARE_ATTEMPT_DIRECTORIES_FAILED;
+  }
+  fprintf(LOGFILE, "Value from config for %s is %s.\n", TT_SYS_DIR_KEY,
+      full_local_dir_str);
+
+  // Labels used in log messages for the two directories handled per job dir.
+  const char *labels[2] = { "attempt_dir", "task_work_dir" };
+  int failed = 0; // becomes 1 when any directory cannot be secured
+  char **local_dir_ptr;
+  for (local_dir_ptr = local_dir; *local_dir_ptr != NULL; local_dir_ptr++) {
+    char *job_dir = get_job_directory(*local_dir_ptr, job_id);
+    char *dirs[2] = { NULL, NULL };
+    if (job_dir != NULL) {
+      dirs[0] = get_attempt_directory(job_dir, attempt_id);
+      dirs[1] = get_task_work_directory(job_dir, attempt_id);
+    }
+    int i;
+    for (i = 0; i < 2; i++) {
+      if (dirs[i] == NULL) {
+        failed = 1; // path construction failed; concatenate() logged why
+        continue;
+      }
+      DIR *dir = opendir(dirs[i]);
+      if (dir == NULL) {
+        fprintf(LOGFILE, "%s %s doesn't exist. Not doing anything.\n",
+            labels[i], dirs[i]);
+        continue;
+      }
+      closedir(dir); // opendir() only probes for existence; don't leak it
+      if (secure_path(dirs[i], user_detail->pw_uid, user_detail->pw_gid) != 0) {
+        fprintf(LOGFILE, "Failed to secure the %s %s\n", labels[i], dirs[i]);
+        failed = 1;
+      }
+    }
+    free(dirs[0]);
+    free(dirs[1]);
+    free(job_dir);
+  }
+  free(local_dir);
+  free(full_local_dir_str);
+
+  cleanup();
+  if (failed) {
+    return PREPARE_ATTEMPT_DIRECTORIES_FAILED;
}
return 0;
}
-/*
- * function to provide path to the task file which is created by the tt
- *
- *Check TT_LOCAL_TASK_SCRIPT_PATTERN for pattern
+/**
+ * Function to prepare the job jars and work directories for the task JVM. The
+ * task JVM creates symbolic links to the jars in the job directories and hence
+ * needs access permissions, so the ownership of the jars and work directories
+ * is changed recursively to the job owner in every configured local dir.
*/
-void get_task_file_path(const char * jobid, const char * taskid,
- const char * tt_root, char **task_script_path) {
- const char ** mapred_local_dir = get_values(TT_SYS_DIR_KEY);
- *task_script_path = NULL;
- int str_len = strlen(TT_LOCAL_TASK_SCRIPT_PATTERN) + strlen(jobid) + (strlen(
- taskid)) + strlen(tt_root);
-
- if (mapred_local_dir == NULL) {
- return;
- }
-
- *task_script_path = (char *) malloc(sizeof(char) * (str_len + 1));
- if (*task_script_path == NULL) {
- fprintf(LOGFILE, "Unable to allocate memory for task_script_path \n");
- free(mapred_local_dir);
- return;
- }
-
- memset(*task_script_path,'\0',str_len+1);
- snprintf(*task_script_path, str_len, TT_LOCAL_TASK_SCRIPT_PATTERN, tt_root,
- jobid, taskid);
-#ifdef DEBUG
- fprintf(LOGFILE, "get_task_file_path : task script path = %s\n", *task_script_path);
- fflush(LOGFILE);
-#endif
- free(mapred_local_dir);
+int prepare_jars_and_work_directories(const char *jobid, const char *user) {
+  if (jobid == NULL || user == NULL) {
+    fprintf(LOGFILE, "Either jobid is null or the user passed is null.\n");
+    return -1;
+  }
+
+  if (get_user_details(user) < 0) {
+    fprintf(LOGFILE, "Couldn't get the user details of %s.\n", user);
+    return -1;
+  }
+
+  char *full_local_dir_str = (char *) get_value(TT_SYS_DIR_KEY);
+  char **local_dir = (char **) get_values(TT_SYS_DIR_KEY);
+
+  if (local_dir == NULL) {
+    fprintf(LOGFILE, "%s is not configured.\n", TT_SYS_DIR_KEY);
+    free(full_local_dir_str);
+    cleanup();
+    return PREPARE_JARS_AND_WORK_DIRECTORIES_FAILED;
+  }
+
+  fprintf(LOGFILE, "Value from config for %s is %s.\n", TT_SYS_DIR_KEY,
+      full_local_dir_str);
+
+  // Labels used in log messages for the two directories handled per job dir.
+  const char *labels[2] = { "jars_dir", "work_dir" };
+  int failed = 0; // becomes 1 when any directory cannot be secured
+  char **local_dir_ptr;
+  for (local_dir_ptr = local_dir; *local_dir_ptr != NULL; local_dir_ptr++) {
+    // Secure this job's jars and work dirs under each configured local dir.
+    char *job_dir = get_job_directory(*local_dir_ptr, jobid);
+    char *dirs[2] = { NULL, NULL };
+    if (job_dir != NULL) {
+      dirs[0] = get_job_jars_directory(job_dir);
+      dirs[1] = get_work_directory(job_dir);
+    }
+    int i;
+    for (i = 0; i < 2; i++) {
+      if (dirs[i] == NULL) {
+        failed = 1; // path construction failed; concatenate() logged why
+        continue;
+      }
+      DIR *dir = opendir(dirs[i]);
+      if (dir == NULL) {
+        fprintf(LOGFILE, "%s %s doesn't exist. Not doing anything.\n",
+            labels[i], dirs[i]);
+        continue;
+      }
+      closedir(dir); // opendir() only probes for existence; don't leak it
+      if (secure_path(dirs[i], user_detail->pw_uid, user_detail->pw_gid) != 0) {
+        fprintf(LOGFILE, "Failed to secure the %s %s\n", labels[i], dirs[i]);
+        failed = 1;
+      }
+    }
+    free(dirs[0]);
+    free(dirs[1]);
+    free(job_dir);
+  }
+  free(local_dir);
+  free(full_local_dir_str);
+
+  cleanup();
+  if (failed) {
+    return PREPARE_JARS_AND_WORK_DIRECTORIES_FAILED;
+  }
+  return 0;
+}
+
+/**
+ * Function to prepare the task logs for the child: hands the attempt's log-dir
+ * and log files (readable by everyone) to the user. This is only till security
+ * is fixed for task-logs. Returns 0 on success or if the log-dir is absent.
+ */
+int prepare_task_logs(const char *log_dir, const char *task_id) {
+  char *task_log_dir = get_task_log_dir(log_dir, task_id);
+  if (task_log_dir == NULL) {
+    return -1; // concatenate() has already logged the reason
+  }
+  DIR *dir = opendir(task_log_dir);
+  if (dir == NULL) {
+    fprintf(LOGFILE, "task_log_dir %s doesn't exist. Not doing anything.\n",
+        task_log_dir);
+    free(task_log_dir);
+    return 0; // See TaskRunner.java: an absent log-dir doesn't fail the task.
+  }
+  closedir(dir); // opendir() only probes for existence; don't leak the handle
+  if (change_owner(task_log_dir, user_detail->pw_uid, user_detail->pw_gid) != 0) {
+    fprintf(LOGFILE, "couldn't change the ownership of attempt log dir %s\n",
+        task_log_dir);
+    free(task_log_dir);
+    return -1;
+  }
+  // Create and set permissions for all the log related files.
+  // See org.apache.hadoop.mapred.TaskLog for information about these files.
+  // TODO: Once MAPREDUCE-AAAA is fixed, the following will not be needed.
+  const char *all_files[] = { "stdout", "stderr", "syslog", "profile.out",
+      "debugout", strstr(task_id, ".cleanup") != NULL ? "log.index.cleanup"
+          : "log.index" }; // a ".cleanup" attempt gets its own index file
+  int i, exit_code = 0;
+  mode_t old_umask = umask(0033);
+  for (i = 0; i < (int) (sizeof(all_files) / sizeof(all_files[0])); i++) {
+    char *file_path = concatenate(task_log_dir, all_files[i], "%s/%s",
+        "log_file_path");
+    if (file_path == NULL) {
+      exit_code = -1;
+      continue;
+    }
+    // Give read permissions to everyone and write permissions to the user.
+    // O_NOFOLLOW + fchown(): never follow a symlink planted in the user's dir.
+    int fd = open(file_path, O_WRONLY | O_CREAT | O_TRUNC | O_NOFOLLOW,
+        S_IRUSR | S_IWUSR | S_IRGRP | S_IROTH);
+    if (fd == -1) {
+      fprintf(LOGFILE, "couldn't create the file %s in the log_dir %s\n",
+          file_path, task_log_dir);
+      exit_code = -1;
+    } else {
+      if (fchown(fd, user_detail->pw_uid, user_detail->pw_gid) != 0) {
+        fprintf(LOGFILE, "couldn't change the ownership of the file %s\n", file_path);
+        exit_code = -1;
+      }
+      close(fd);
+    }
+    free(file_path);
+  }
+  umask(old_umask);
+  free(task_log_dir);
+  return exit_code;
+}
+
+/**
+ * Function to secure the given path. It does the following recursively:
+ * 1) changes the owner/group of the paths to the passed owner/group
+ * 2) changes the file permission to be accessible (rwx) only by the owner.
+ * Symbolic links are never followed: the link itself is re-owned with
+ * lchown() and its mode is left alone, so a link planted inside the tree
+ * cannot redirect the change to a file outside it.
+ * Returns 0 on success, -1 if the tree can't be opened or an entry can't be
+ * read/stat'ed, and -3 if an entry can't be re-owned or re-moded; the walk
+ * stops at the first error.
+ */
+int secure_path(const char *path, uid_t uid, gid_t gid) {
+  // fts_open() requires a NULL-terminated array of paths.
+  char *paths[] = { (char *) path, NULL };
+  int error_code = 0;
+
+  // Secure permissions
+  mode_t file_mode = S_IREAD | S_IWRITE | S_IEXEC;
+
+  // Get physical locations and don't resolve the symlinks.
+  // Don't change directory while walking the directory.
+  int ftsoptions = FTS_PHYSICAL | FTS_NOCHDIR;
+  FTS *tree = fts_open(paths, ftsoptions, NULL);
+  if (tree == NULL) {
+    fprintf(LOGFILE, "Cannot open file traversal structure for %s\n", path);
+    return -1;
+  }
+
+  FTSENT *entry = NULL; // a file in the hierarchy
+  while (error_code == 0 && (entry = fts_read(tree)) != NULL) {
+    int is_link = 0;
+    switch (entry->fts_info) {
+    case FTS_F:
+      // A regular file
+    case FTS_DP:
+      // A directory being visited in post-order
+      break;
+    case FTS_SL:
+      // A symbolic link
+    case FTS_SLNONE:
+      // A symbolic link with a nonexistent target
+      is_link = 1;
+      break;
+    case FTS_DNR:
+      // A directory which cannot be read
+    case FTS_NS:
+      // A file for which no stat(2) information was available
+    case FTS_ERR:
+      // An error return; fts_errno says why
+      fprintf(LOGFILE, "Couldn't read %s : %s\n", entry->fts_path,
+          strerror(entry->fts_errno));
+      error_code = -1;
+      continue;
+    case FTS_D:
+      // A directory being visited in pre-order.
+      // We change ownership of directories in post-order,
+      // so ignore the pre-order visit.
+    case FTS_DC:
+      // A directory that causes a cycle in the tree; not expected, ignore.
+    case FTS_DOT:
+      // "." or ".."
+    default:
+      // Anything else (e.g. FTS_DEFAULT) is ignored as well.
+      continue;
+    }
+
+    if (is_link) {
+      // lchown() re-owns the link itself; chown()/chmod() would follow it.
+      if (lchown(entry->fts_path, uid, gid) != 0) {
+        fprintf(LOGFILE, "couldn't change the ownership of %s\n",
+            entry->fts_path);
+        error_code = -3;
+      }
+      continue;
+    }
+    // NOTE(review): chown()/chmod() by path can still race with the entry
+    // being swapped for a symlink after fts_read() returned it.
+    if (change_owner(entry->fts_path, uid, gid) != 0) {
+      fprintf(LOGFILE, "couldn't change the ownership of %s\n",
+          entry->fts_path);
+      error_code = -3;
+    } else if (change_mode(entry->fts_path, file_mode) != 0) {
+      fprintf(LOGFILE, "couldn't change the permissions of %s\n",
+          entry->fts_path);
+      error_code = -3;
+    }
+  }
+  fts_close(tree);
+  return error_code;
+}
+
+/**
+ * Function to finalise directories to be owned by the TaskTracker back again
+ * (the real uid/gid of this process). This is used in the following cases:
+ * 1) For changing ownership of the attempt-directory when the attempt finishes
+ * 2) For changing ownership of the jars directory when the job finishes.
+ * Returns 0 on success or when the directory is absent, -1 otherwise.
+ */
+int finalize_directories(const char *dir_to_be_finalised) {
+  if (dir_to_be_finalised == NULL) {
+    fprintf(LOGFILE, "dir_to_be_finalised is null.\n");
+    return -1;
+  }
+
+  DIR *dir = opendir(dir_to_be_finalised);
+  if (dir == NULL) {
+    fprintf(LOGFILE,
+        "dir_to_be_finalised %s doesn't exist. Not doing anything.\n",
+        dir_to_be_finalised);
+    return 0;
+  }
+  closedir(dir); // opendir() only probes for existence; don't leak the handle
+  uid_t uid = getuid();
+  gid_t gid = getgid();
+  fprintf(LOGFILE, "Securing the path %s recursively to TT.\n",
+      dir_to_be_finalised);
+  if (secure_path(dir_to_be_finalised, uid, gid) != 0) {
+    fprintf(LOGFILE, "Failed to secure the path %s to TT.\n",
+        dir_to_be_finalised);
+    return -1;
+  }
+  return 0;
}
-//end of private functions
void display_usage(FILE *stream) {
fprintf(stream,
"Usage: task-controller [-l logfile] user command command-args\n");
}
//function used to populate and user_details structure.
-
int get_user_details(const char *user) {
if (user_detail == NULL) {
user_detail = getpwnam(user);
@@ -180,31 +686,66 @@ int get_user_details(const char *user) {
return 0;
}
+/**
+ * Function used to initialize task. Prepares the attempt directories and the
+ * attempt's log directory under log_dir to be accessible by the child.
+ * TODO: fix exit codes
+ */
+int initialize_task(const char *jobid, const char *taskid,
+    const char *log_dir, const char *user) {
+  int exit_code = 0;
+  fprintf(LOGFILE, "job-id passed to initialize_task : %s.\n", jobid);
+  fprintf(LOGFILE, "task-id passed to initialize_task : %s.\n", taskid);
+  fprintf(LOGFILE, "log_dir passed to initialize_task : %s.\n", log_dir);
+
+  if (prepare_attempt_directories(jobid, taskid, user) != 0) {
+    fprintf(LOGFILE,
+        "Couldn't prepare the attempt directories for %s of user %s.\n",
+        taskid, user);
+    exit_code = PREPARE_ATTEMPT_DIRECTORIES_FAILED;
+    goto cleanup;
+  }
+
+  if (check_task_logs_dir(log_dir) < 0) {
+    fprintf(LOGFILE, "Problem with the log directory passed or configured.\n");
+    exit_code = INVALID_TT_LOG_DIR;
+    goto cleanup;
+  }
+
+  if (prepare_task_logs(log_dir, taskid) != 0) {
+    fprintf(LOGFILE, "Couldn't prepare task logs directory %s for %s.\n",
+        log_dir, taskid);
+    exit_code = PREPARE_TASK_LOGS_FAILED;
+  }
+
+  cleanup:
+  // free configurations
+  cleanup();
+  return exit_code;
+}
+
/*
- *Function used to launch a task as the provided user.
- * First the function checks if the tt_root passed is found in
- * hadoop.temp.dir
- * Uses get_task_file_path to fetch the task script file path.
- * Does an execlp on the same in order to replace the current image with
+ * Function used to launch a task as the provided user. It does the following :
+ * 1) Checks if the tt_root passed is found in mapred.local.dir
+ * 2) Prepares attempt_dir, jars_dir and log_dir to be accessible by the child
+ * 3) Uses get_task_launcher_file to fetch the task script file path
+ * 4) Does an execlp on the same in order to replace the current image with
* task image.
*/
-
int run_task_as_user(const char * user, const char *jobid, const char *taskid,
- const char *tt_root) {
- char *task_script_path = NULL;
+ const char *tt_root, const char *log_dir) {
int exit_code = 0;
uid_t uid = getuid();
- if(jobid == NULL || taskid == NULL) {
+ if (jobid == NULL || taskid == NULL || tt_root == NULL || log_dir == NULL) {
return INVALID_ARGUMENT_NUMBER;
}
-#ifdef DEBUG
- fprintf(LOGFILE,"run_task_as_user : Job id : %s \n", jobid);
- fprintf(LOGFILE,"run_task_as_user : task id : %s \n", taskid);
- fprintf(LOGFILE,"run_task_as_user : tt_root : %s \n", tt_root);
- fflush(LOGFILE);
-#endif
+ fprintf(LOGFILE, "Job-id passed to run_task_as_user : %s.\n", jobid);
+ fprintf(LOGFILE, "task-d passed to run_task_as_user : %s.\n", taskid);
+ fprintf(LOGFILE, "tt_root passed to run_task_as_user : %s.\n", tt_root);
+ fprintf(LOGFILE, "log_dir passed to run_task_as_user : %s.\n", log_dir);
+
//Check tt_root before switching the user, as reading configuration
//file requires privileged access.
if (check_tt_root(tt_root) < 0) {
@@ -213,41 +754,113 @@ int run_task_as_user(const char * user, const char *jobid, const char *taskid,
return INVALID_TT_ROOT;
}
- //change the user
- fclose(LOGFILE);
- fcloseall();
- umask(0);
- if (change_user(user) != 0) {
- cleanup();
- return SETUID_OPER_FAILED;
+ char *job_dir = NULL, *task_work_dir = NULL, *jars_dir =
+ NULL, *task_script_path = NULL;
+
+ if (prepare_attempt_directories(jobid, taskid, user) != 0) {
+ fprintf(LOGFILE,
+ "Couldn't prepare the attempt directories for %s of user %s.\n",
+ taskid, user);
+ exit_code = PREPARE_ATTEMPT_DIRECTORIES_FAILED;
+ goto cleanup;
+ }
+
+ if (prepare_jars_and_work_directories(jobid, user) != 0) {
+ fprintf(LOGFILE,
+ "Couldn't prepare jars and work directories %s of user %s.\n", jobid,
+ user);
+ exit_code = PREPARE_JARS_AND_WORK_DIRECTORIES_FAILED;
+ goto cleanup;
+ }
+
+ if (check_task_logs_dir(log_dir) < 0) {
+ fprintf(LOGFILE, "Problem with the log directory passed or configured.\n");
+ exit_code = INVALID_TT_LOG_DIR;
+ goto cleanup;
+ }
+
+ if (prepare_task_logs(log_dir, taskid) != 0) {
+ fprintf(LOGFILE, "Couldn't prepare task logs directory %s for %s.\n", log_dir,
+ taskid);
+ exit_code = PREPARE_TASK_LOGS_FAILED;
+ goto cleanup;
+ }
+
+ job_dir = get_job_directory(tt_root, jobid);
+ if (job_dir == NULL) {
+ fprintf(LOGFILE, "Couldn't obtain job_dir for %s in %s.\n", jobid, tt_root);
+ exit_code = OUT_OF_MEMORY;
+ goto cleanup;
}
- get_task_file_path(jobid, taskid, tt_root, &task_script_path);
+ task_work_dir = get_task_work_directory(job_dir, taskid);
+ if (task_work_dir == NULL) {
+ fprintf(LOGFILE, "Couldn't obtain task_work_dir for %s in %s.\n", taskid, job_dir);
+ exit_code = OUT_OF_MEMORY;
+ goto cleanup;
+ }
+
+ task_script_path = get_task_launcher_file(task_work_dir);
if (task_script_path == NULL) {
- cleanup();
- return INVALID_TASK_SCRIPT_PATH;
+ fprintf(LOGFILE, "Couldn't obtain task_script_path in %s.\n", job_dir);
+ exit_code = OUT_OF_MEMORY;
+ goto cleanup;
}
+
errno = 0;
exit_code = check_path(task_script_path);
if(exit_code != 0) {
goto cleanup;
}
- errno = 0;
- exit_code = check_owner(uid, task_script_path);
- if(exit_code != 0) {
+
+ //change the user
+ fcloseall();
+ umask(0077);
+ if (change_user(user) != 0) {
+ exit_code = SETUID_OPER_FAILED;
goto cleanup;
}
+
errno = 0;
cleanup();
execlp(task_script_path, task_script_path, NULL);
if (errno != 0) {
- free(task_script_path);
- exit_code = UNABLE_TO_EXECUTE_TASK_SCRIPT;
- }
+ switch (errno) {
+ case E2BIG:
+ fprintf(LOGFILE,
+ "The total number of bytes in the environment (envp)\
+ and argument list (argv) is too large.");
+ break;
+ case EACCES:
+ fprintf(LOGFILE,
+ "Search permission is denied on a component of the path prefix\
+ of filename or the name of a script interpreter. or");
+ fprintf(LOGFILE,
+ "Execute permission is denied for the file or a script or \
+ ELF interpreter.");
+ break;
+ case ENOENT:
+ fprintf(LOGFILE,
+ "The file filename or a script or ELF interpreter does not exist,\
+ or a shared library needed for file or interpreter cannot be found.");
+ break;
+ }
+ free(task_script_path);
+ exit_code = UNABLE_TO_EXECUTE_TASK_SCRIPT;
+ }
return exit_code;
cleanup:
+ if (job_dir != NULL) {
+ free(job_dir);
+ }
+ if (task_work_dir != NULL) {
+ free(task_work_dir);
+ }
+ if (jars_dir != NULL) {
+ free(jars_dir);
+ }
if (task_script_path != NULL) {
free(task_script_path);
}
@@ -261,19 +874,23 @@ cleanup:
* The function sends appropriate signal to the process group
* specified by the task_pid.
*/
-
int kill_user_task(const char *user, const char *task_pid, int sig) {
int pid = 0;
if(task_pid == NULL) {
return INVALID_ARGUMENT_NUMBER;
}
+
+ fprintf(LOGFILE, "user passed to kill_user_task : %s.\n", user);
+ fprintf(LOGFILE, "task-pid passed to kill_user_task : %s.\n", task_pid);
+ fprintf(LOGFILE, "signal passed to kill_user_task : %d.\n", sig);
+
pid = atoi(task_pid);
if(pid <= 0) {
return INVALID_TASK_PID;
}
- fclose(LOGFILE);
+
fcloseall();
if (change_user(user) != 0) {
cleanup();
@@ -283,6 +900,7 @@ int kill_user_task(const char *user, const char *task_pid, int sig) {
//Don't continue if the process-group is not alive anymore.
if(kill(-pid,0) < 0) {
errno = 0;
+ cleanup();
return 0;
}
@@ -298,3 +916,147 @@ int kill_user_task(const char *user, const char *task_pid, int sig) {
return 0;
}
+/*
+ * Runs finalize_directories() on dir if opendir() can open it; otherwise logs
+ * that there is nothing to do. label only prefixes the log messages.
+ * Returns the result of finalize_directories(), or 0 when dir was skipped.
+ * The DIR handle used for the existence check is always closed again so that
+ * repeated calls do not leak directory streams.
+ */
+static int finalize_if_present(const char *label, char *dir) {
+  DIR *handle;
+  int ret;
+
+  handle = opendir(dir);
+  if (handle == NULL) {
+    fprintf(LOGFILE, "%s %s doesn't exist. Not doing anything.\n", label, dir);
+    return 0;
+  }
+  closedir(handle);
+
+  ret = finalize_directories(dir);
+  if (ret != 0) {
+    fprintf(LOGFILE, "Failed to finalise the directory %s\n", dir);
+  }
+  return ret;
+}
+
+/**
+ * Function to tidy up things when a task given by task-id finishes.
+ * As of now, it changes the ownership of the attempt directory and log
+ * directory back to TT so that
+ * 1) TT can serve the output in case of map-tasks,
+ * 2) TT can directly read the logs of finished tasks and
+ * 3) task dirs can be cleaned up directly by TT once not needed anymore.
+ *
+ * For every directory listed in mapred.local.dir, finalize_directories() is
+ * run on the task's attempt directory and, when first_task == 1, on its
+ * task-work directory. Directories that cannot be opened are skipped.
+ *
+ * NOTE(review): log_dir is currently only logged here; nothing below touches
+ * the task log directory — confirm whether that is intended.
+ *
+ * Returns INVALID_ARGUMENT_NUMBER if any argument is NULL,
+ * FINALIZE_TASK_DIRS_FAILED if mapred.local.dir is not configured or any
+ * directory could not be computed or finalised, and 0 otherwise. Apart from
+ * the NULL-argument case, configurations are freed via cleanup() on return.
+ */
+int finalize_task_dirs(const char *job_id, const char *task_id,
+    const char *log_dir, int first_task) {
+
+  if (job_id == NULL || task_id == NULL || log_dir == NULL) {
+    return INVALID_ARGUMENT_NUMBER;
+  }
+
+  fprintf(LOGFILE, "job-id passed to finalize_task_dirs : %s.\n", job_id);
+  fprintf(LOGFILE, "task-id passed to finalize_task_dirs : %s.\n", task_id);
+  fprintf(LOGFILE, "log_dir passed to finalize_task_dirs : %s.\n", log_dir);
+  fprintf(LOGFILE, "first_task passed to finalize_task_dirs : %d.\n",
+      first_task);
+
+  char *full_local_dir_str = (char *) get_value(TT_SYS_DIR_KEY);
+  char **local_dir = (char **) get_values(TT_SYS_DIR_KEY);
+
+  if (local_dir == NULL) {
+    fprintf(LOGFILE, "%s is not configured.\n", TT_SYS_DIR_KEY);
+    free(full_local_dir_str);
+    cleanup();
+    // A finalisation failure, not an attempt-directory preparation failure.
+    return FINALIZE_TASK_DIRS_FAILED;
+  }
+
+  fprintf(LOGFILE, "Value from config for %s is %s.\n", TT_SYS_DIR_KEY,
+      full_local_dir_str);
+
+  int failures = 0;
+  char **local_dir_ptr;
+  for (local_dir_ptr = local_dir; *local_dir_ptr != NULL; local_dir_ptr++) {
+    char *job_dir = get_job_directory(*local_dir_ptr, job_id);
+    char *attempt_dir = NULL;
+    char *task_work_dir = NULL;
+
+    if (job_dir == NULL) {
+      fprintf(LOGFILE, "Couldn't obtain job_dir for %s in %s.\n", job_id,
+          *local_dir_ptr);
+      failures++;
+      continue;
+    }
+
+    // finalise attempt-dir in each of the mapred_local_dir
+    attempt_dir = get_attempt_directory(job_dir, task_id);
+    if (attempt_dir == NULL) {
+      fprintf(LOGFILE, "Couldn't obtain attempt_dir for %s in %s.\n", task_id,
+          job_dir);
+      failures++;
+    } else if (finalize_if_present("attempt_dir", attempt_dir) != 0) {
+      failures++;
+    }
+
+    if (first_task == 1) {
+      // Also finalise the task-work-dir, which is used in case of jvm reuse.
+      // NOTE(review): a previous comment here read "Not the first task.",
+      // contradicting the first_task == 1 test; confirm the flag's meaning
+      // with the Java caller.
+      task_work_dir = get_task_work_directory(job_dir, task_id);
+      if (task_work_dir == NULL) {
+        fprintf(LOGFILE, "Couldn't obtain task_work_dir for %s in %s.\n",
+            task_id, job_dir);
+        failures++;
+      } else if (finalize_if_present("task_work_dir", task_work_dir) != 0) {
+        failures++;
+      }
+    }
+
+    free(task_work_dir);
+    free(attempt_dir);
+    free(job_dir);
+  }
+  free(local_dir);
+  free(full_local_dir_str);
+
+  cleanup();
+  if (failures != 0) {
+    return FINALIZE_TASK_DIRS_FAILED;
+  }
+  return 0;
+}
+
+/**
+ * Function to tidy up things once the job given by job-id finishes.
+ * As of now, it changes the ownership of job jars back to the TT so that they
+ * can be cleaned up by TT itself.
+ *
+ * For every directory in mapred.local.dir, finalize_directories() is run on
+ * the job's jars directory and on its work directory.
+ *
+ * NOTE(review): unlike finalize_task_dirs(), there is no existence check
+ * before finalize_directories(); if that helper fails on missing paths, a
+ * local dir that holds no copy of this job is reported as a failure — confirm.
+ *
+ * Returns INVALID_ARGUMENT_NUMBER for a NULL jobid, FINALIZE_JOB_FAILED when
+ * mapred.local.dir is not configured or any directory could not be computed
+ * or finalised, and 0 otherwise. Apart from the NULL-argument case,
+ * configurations are freed via cleanup() before returning.
+ */
+int finalize_job(const char *jobid) {
+  if (jobid == NULL) {
+    return INVALID_ARGUMENT_NUMBER;
+  }
+
+  fprintf(LOGFILE, "Job-id passed to finalize_job : %s.\n", jobid);
+
+  char *full_local_dir_str = (char *) get_value(TT_SYS_DIR_KEY);
+  char **local_dir = (char **) get_values(TT_SYS_DIR_KEY);
+
+  if (local_dir == NULL) {
+    fprintf(LOGFILE, "%s is not configured.\n", TT_SYS_DIR_KEY);
+    free(full_local_dir_str);
+    cleanup();
+    return FINALIZE_JOB_FAILED;
+  }
+
+  fprintf(LOGFILE, "Value from config for %s is %s.\n", TT_SYS_DIR_KEY,
+      full_local_dir_str);
+
+  int failures = 0;
+  char **local_dir_ptr;
+  for (local_dir_ptr = local_dir; *local_dir_ptr != NULL; local_dir_ptr++) {
+    char *job_dir = get_job_directory(*local_dir_ptr, jobid);
+    char *jars_dir = NULL;
+    char *work_dir = NULL;
+
+    // A NULL path must never reach finalize_directories() or printf("%s").
+    if (job_dir == NULL) {
+      fprintf(LOGFILE, "Couldn't obtain job_dir for %s in %s.\n", jobid,
+          *local_dir_ptr);
+      failures++;
+      continue;
+    }
+
+    // finalise jars in each of the mapred_local_dir
+    jars_dir = get_job_jars_directory(job_dir);
+    if (jars_dir == NULL) {
+      fprintf(LOGFILE, "Couldn't obtain jars_dir in %s.\n", job_dir);
+      failures++;
+    } else if (finalize_directories(jars_dir) != 0) {
+      fprintf(LOGFILE, "Failed to finalise the jars_dir %s\n", jars_dir);
+      failures++;
+    }
+
+    // finalise work dir in each of the mapred_local_dir
+    work_dir = get_work_directory(job_dir);
+    if (work_dir == NULL) {
+      fprintf(LOGFILE, "Couldn't obtain work_dir in %s.\n", job_dir);
+      failures++;
+    } else if (finalize_directories(work_dir) != 0) {
+      fprintf(LOGFILE, "Failed to finalise the work_dir %s\n", work_dir);
+      failures++;
+    }
+
+    free(work_dir);
+    free(jars_dir);
+    free(job_dir);
+  }
+  free(local_dir);
+  free(full_local_dir_str);
+  cleanup();
+  if (failures != 0) {
+    return FINALIZE_JOB_FAILED;
+  }
+  return 0;
+}
diff --git src/c++/task-controller/task-controller.h src/c++/task-controller/task-controller.h
index 8e545b5..0f21910 100644
--- src/c++/task-controller/task-controller.h
+++ src/c++/task-controller/task-controller.h
@@ -28,14 +28,21 @@
#include
#include
#include
-#include
+#include
+#include
+#include
+#include
+
#include "configuration.h"
//command definitions
enum command {
LAUNCH_TASK_JVM,
+ INITIALIZE_TASK,
TERMINATE_TASK_JVM,
- KILL_TASK_JVM
+ KILL_TASK_JVM,
+ FINALIZE_TASK_DIRS,
+ FINALIZE_JOB,
};
enum errorcodes {
@@ -45,22 +52,43 @@ enum errorcodes {
SUPER_USER_NOT_ALLOWED_TO_RUN_TASKS, //4
INVALID_TT_ROOT, //5
SETUID_OPER_FAILED, //6
- INVALID_TASK_SCRIPT_PATH, //7
- UNABLE_TO_EXECUTE_TASK_SCRIPT, //8
- UNABLE_TO_KILL_TASK, //9
- INVALID_PROCESS_LAUNCHING_TASKCONTROLLER, //10
- INVALID_TASK_PID, //11
- ERROR_RESOLVING_FILE_PATH, //12
- RELATIVE_PATH_COMPONENTS_IN_FILE_PATH, //13
- UNABLE_TO_STAT_FILE, //14
- FILE_NOT_OWNED_BY_TASKTRACKER //15
+ UNABLE_TO_EXECUTE_TASK_SCRIPT, //7
+ UNABLE_TO_KILL_TASK, //8
+ INVALID_PROCESS_LAUNCHING_TASKCONTROLLER, //9
+ INVALID_TASK_PID, //10
+ ERROR_RESOLVING_FILE_PATH, //11
+ RELATIVE_PATH_COMPONENTS_IN_FILE_PATH, //12
+ UNABLE_TO_STAT_FILE, //13
+ FILE_NOT_OWNED_BY_TASKTRACKER, //14
+ PREPARE_ATTEMPT_DIRECTORIES_FAILED, //15
+ PREPARE_JARS_AND_WORK_DIRECTORIES_FAILED, //16
+ PREPARE_TASK_LOGS_FAILED, //17
+ INVALID_TT_LOG_DIR, //18
+ FINALIZE_TASK_DIRS_FAILED, //19
+ FINALIZE_JOB_FAILED, //20
+ OUT_OF_MEMORY, //21
};
+#define TT_JOB_DIR_PATTERN "%s/taskTracker/jobcache/%s"
+
+#define TT_ATTEMPT_DIR_PATTERN TT_JOB_DIR_PATTERN"/%s"
+
+#define JOB_DIR_TO_JARS_PATTERN "%s/jars"
+
+#define JOB_DIR_TO_WORK_DIR_PATTERN "%s/work"
-#define TT_LOCAL_TASK_SCRIPT_PATTERN "%s/taskTracker/jobcache/%s/%s/taskjvm.sh"
+#define JOB_DIR_TO_ATTEMPT_DIR_PATTERN "%s/%s"
+
+#define JOB_DIR_TO_TASK_WORK_DIR_PATTERN "%s/task-work/%s"
+
+#define ATTEMPT_LOG_DIR_PATTERN "%s/userlogs/%s"
+
+#define ATTEMPT_DIR_TO_TASK_SCRIPT_PATTERN "%s/taskjvm.sh"
#define TT_SYS_DIR_KEY "mapred.local.dir"
+#define TT_LOG_DIR_KEY "hadoop.log.dir"
+
#define MAX_ITEMS 10
#ifndef HADOOP_CONF_DIR
@@ -74,8 +102,28 @@ extern FILE *LOGFILE;
void display_usage(FILE *stream);
-int run_task_as_user(const char * user, const char *jobid, const char *taskid, const char *tt_root);
+int run_task_as_user(const char * user, const char *jobid, const char *taskid,
+ const char *tt_root, const char *log_dir);
+
+int initialize_task(const char *jobid, const char *taskid,
+ const char *log_dir, const char *user);
int kill_user_task(const char *user, const char *task_pid, int sig);
+int finalize_task_dirs(const char *jobid, const char *taskid,
+ const char *log_dir, int first_task);
+
+int prepare_attempt_directory(const char *attempt_dir, const char *user);
+
+int finalize_job(const char *job_id);
+
+// The following functions are exposed for testing
+
+int check_variable_against_config(const char *config_key,
+ const char *passed_value);
+
int get_user_details(const char *user);
+
+int check_path(char *path);
+
+char *get_job_cache_directory(const char * tt_root);
diff --git src/c++/task-controller/test-task-controller.c src/c++/task-controller/test-task-controller.c
new file mode 100644
index 0000000..9660f1a
--- /dev/null
+++ src/c++/task-controller/test-task-controller.c
@@ -0,0 +1,181 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#include "task-controller.h"
+
+#define HADOOP_CONF_DIR "/tmp"
+
+/*
+ * Writes a minimal taskcontroller.cfg containing a single mapred.local.dir
+ * entry (four comma-separated /tmp/testing* paths) to file_name.
+ * Returns 0 on success and EXIT_FAILURE if the file cannot be opened,
+ * completely written or closed.
+ */
+int write_config_file(char *file_name) {
+  FILE *file;
+  char const *str =
+      "mapred.local.dir=/tmp/testing1,/tmp/testing2,/tmp/testing3,/tmp/testing4\n";
+  size_t len = strlen(str);
+
+  file = fopen(file_name, "w");
+  if (file == NULL) {
+    printf("Failed to open %s.\n", file_name);
+    return EXIT_FAILURE;
+  }
+  // A short write or a failed flush on close would leave a truncated config
+  // behind, making the later checks fail for the wrong reason.
+  if (fwrite(str, 1, len, file) != len) {
+    printf("Failed to write %s.\n", file_name);
+    fclose(file);
+    return EXIT_FAILURE;
+  }
+  if (fclose(file) != 0) {
+    printf("Failed to close %s.\n", file_name);
+    return EXIT_FAILURE;
+  }
+  return 0;
+}
+
+/*
+ * Writes a known taskcontroller.cfg into a fresh temporary configuration
+ * directory, points hadoop_conf_dir at it and checks get_value(),
+ * get_values() and check_variable_against_config() against its contents.
+ * Temporary files are removed before the final assert, so a failing check
+ * aborts the test run (like the other tests here) without leaving
+ * directories behind.
+ */
+void test_check_variable_against_config() {
+  // A temporary configuration directory
+  char *conf_dir_templ = "/tmp/test-task-controller-conf-dir-XXXXXX";
+  char *config_values[4] = { "/tmp/testing1", "/tmp/testing2",
+      "/tmp/testing3", "/tmp/testing4" };
+  // Everything released under "cleanup" starts out NULL, so an early failure
+  // never frees or removes something this test did not create.
+  char *conf_dir = NULL;
+  char *value = NULL;
+  char **values = NULL;
+  char **values_ptr = NULL;
+  int i = 0;
+  int ret = -1;
+
+  // Room for the template, "/conf/taskcontroller.cfg" and the trailing NUL;
+  // without the +1 the second strcat() below writes one byte past the array.
+  char template[strlen(conf_dir_templ) + strlen("/conf/taskcontroller.cfg") + 1];
+
+  strcpy(template, conf_dir_templ);
+  char *temp_dir = mkdtemp(template);
+  if (temp_dir == NULL) {
+    printf("Couldn't create a temporary dir for conf.\n");
+    goto cleanup;
+  }
+
+  // Set the configuration directory
+  hadoop_conf_dir = strdup(temp_dir);
+
+  // create the configuration directory
+  strcat(template, "/conf");
+  conf_dir = strdup(template);
+  if (conf_dir == NULL) {
+    printf("Couldn't allocate the conf dir name.\n");
+    goto cleanup;
+  }
+  // From here on template always holds the configuration file path.
+  strcat(template, "/taskcontroller.cfg");
+  if (mkdir(conf_dir, S_IRWXU) != 0) {
+    printf("Couldn't create the conf dir %s.\n", conf_dir);
+    goto cleanup;
+  }
+
+  // create the configuration file
+  if (write_config_file(template) != 0) {
+    printf("Couldn't write the configuration file.\n");
+    goto cleanup;
+  }
+
+  // Test obtaining a value for a key from the config
+  value = (char *) get_value("mapred.local.dir");
+  if (value == NULL || strcmp(value,
+      "/tmp/testing1,/tmp/testing2,/tmp/testing3,/tmp/testing4") != 0) {
+    printf("Obtaining a value for a key from the config failed.\n");
+    goto cleanup;
+  }
+
+  // Test the parsing of a multiple valued key from the config
+  values = (char **) get_values("mapred.local.dir");
+  if (values == NULL) {
+    printf("Configured values are not read out properly. Test failed!\n");
+    goto cleanup;
+  }
+  values_ptr = values;
+  while (*values_ptr != NULL) {
+    printf(" value : %s\n", *values_ptr);
+    // i is bounded so that surplus values cannot index past config_values.
+    if (i >= 4 || strcmp(*values_ptr, config_values[i]) != 0) {
+      printf("Configured values are not read out properly. Test failed!\n");
+      goto cleanup;
+    }
+    i++;
+    values_ptr++;
+  }
+  if (i != 4) {
+    printf("Configured values are not read out properly. Test failed!\n");
+    goto cleanup;
+  }
+
+  if (check_variable_against_config("mapred.local.dir", "/tmp/testing5") == 0) {
+    printf("Configuration should not contain /tmp/testing5! \n");
+    goto cleanup;
+  }
+
+  if (check_variable_against_config("mapred.local.dir", "/tmp/testing4") != 0) {
+    printf("Configuration should contain /tmp/testing4! \n");
+    goto cleanup;
+  }
+
+  ret = 0;
+
+ cleanup:
+  free(value);
+  free(values);
+  if (conf_dir != NULL) {
+    unlink(template);
+    rmdir(conf_dir);
+    free(conf_dir);
+  }
+  if (temp_dir != NULL) {
+    // Cut template back to the directory created by mkdtemp() and remove it
+    // (hadoop_conf_dir must not be used for this after it has been freed).
+    template[strlen(conf_dir_templ)] = '\0';
+    rmdir(template);
+    free(hadoop_conf_dir);
+    hadoop_conf_dir = NULL;
+  }
+  assert(ret == 0);
+}
+
+/* Checks that get_job_directory() builds <tt_root>/taskTracker/jobcache/<jobid>. */
+void test_get_job_directory() {
+  // NOTE(review): the (char *) cast hints that no prototype for
+  // get_job_directory() may be in scope here (an implicit int return would
+  // truncate a 64-bit pointer); confirm a header declares it.
+  char *job_dir = (char *) get_job_directory("/tmp", "job_200906101234_0001");
+  int ret = 0;
+  // printf("%s") and strcmp() on NULL are undefined; fail cleanly instead.
+  if (job_dir == NULL) {
+    printf("get_job_directory returned NULL\n");
+    ret = -1;
+  } else {
+    printf("job_dir obtained is %s\n", job_dir);
+    if (strcmp(job_dir, "/tmp/taskTracker/jobcache/job_200906101234_0001") != 0) {
+      ret = -1;
+    }
+  }
+  free(job_dir);
+  assert(ret == 0);
+}
+
+/* Checks that get_attempt_directory() appends "/<attempt-id>" to the job dir. */
+void test_get_attempt_directory() {
+  // NOTE(review): the (char *) cast hints that no prototype for
+  // get_attempt_directory() may be in scope here; confirm a header declares it.
+  char *attempt_dir = (char *) get_attempt_directory(
+      "/tmp/taskTracker/jobcache/job_200906101234_0001",
+      "attempt_200906112028_0001_m_000000_0");
+  int ret = 0;
+  // printf("%s") and strcmp() on NULL are undefined; fail cleanly instead.
+  if (attempt_dir == NULL) {
+    printf("get_attempt_directory returned NULL\n");
+    ret = -1;
+  } else {
+    printf("attempt_dir obtained is %s\n", attempt_dir);
+    if (strcmp(attempt_dir,
+        "/tmp/taskTracker/jobcache/job_200906101234_0001/attempt_200906112028_0001_m_000000_0")
+        != 0) {
+      ret = -1;
+    }
+  }
+  free(attempt_dir);
+  assert(ret == 0);
+}
+
+/* Checks that get_task_launcher_file() appends "/taskjvm.sh" to the given dir. */
+void test_get_task_launcher_file() {
+  // NOTE(review): the (char *) cast hints that no prototype for
+  // get_task_launcher_file() may be in scope here; confirm a header declares it.
+  char *task_file = (char *) get_task_launcher_file(
+      "/tmp/taskTracker/jobcache/job_200906101234_0001/attempt_200906112028_0001_m_000000_0");
+  int ret = 0;
+  // printf("%s") and strcmp() on NULL are undefined; fail cleanly instead.
+  if (task_file == NULL) {
+    printf("get_task_launcher_file returned NULL\n");
+    ret = -1;
+  } else {
+    printf("task_file obtained is %s\n", task_file);
+    if (strcmp(task_file,
+        "/tmp/taskTracker/jobcache/job_200906101234_0001/attempt_200906112028_0001_m_000000_0/taskjvm.sh")
+        != 0) {
+      ret = -1;
+    }
+  }
+  free(task_file);
+  assert(ret == 0);
+}
+
+/* Checks that get_task_log_dir() builds <log_dir>/userlogs/<attempt-id>. */
+void test_get_task_log_dir() {
+  // NOTE(review): the (char *) cast hints that no prototype for
+  // get_task_log_dir() may be in scope here; confirm a header declares it.
+  char *logdir = (char *) get_task_log_dir("/tmp/testing",
+      "attempt_200906112028_0001_m_000000_0");
+  int ret = 0;
+  // printf("%s") and strcmp() on NULL are undefined; fail cleanly instead.
+  if (logdir == NULL) {
+    printf("get_task_log_dir returned NULL\n");
+    ret = -1;
+  } else {
+    printf("logdir obtained is %s\n", logdir);
+    if (strcmp(logdir,
+        "/tmp/testing/userlogs/attempt_200906112028_0001_m_000000_0") != 0) {
+      ret = -1;
+    }
+  }
+  free(logdir);
+  assert(ret == 0);
+}
+
+/*
+ * Test driver: runs every test in order. Failures surface through assert()
+ * inside the individual tests; reaching the end means all tests passed.
+ */
+int main(int argc, char **argv) {
+  size_t idx;
+  void (*const suite[])(void) = {
+    test_check_variable_against_config,
+    test_get_job_directory,
+    test_get_attempt_directory,
+    test_get_task_launcher_file,
+    test_get_task_log_dir,
+  };
+
+  printf("Starting tests\n");
+  // The task-controller helpers log through LOGFILE; route it to stdout.
+  LOGFILE = stdout;
+  for (idx = 0; idx < sizeof suite / sizeof suite[0]; idx++) {
+    suite[idx]();
+  }
+  printf("Finished tests\n");
+  return 0;
+}
diff --git src/contrib/streaming/src/test/org/apache/hadoop/streaming/TestStreamingAsDifferentUser.java src/contrib/streaming/src/test/org/apache/hadoop/streaming/TestStreamingAsDifferentUser.java
index 8e6842e..8439824 100644
--- src/contrib/streaming/src/test/org/apache/hadoop/streaming/TestStreamingAsDifferentUser.java
+++ src/contrib/streaming/src/test/org/apache/hadoop/streaming/TestStreamingAsDifferentUser.java
@@ -65,7 +65,7 @@ public class TestStreamingAsDifferentUser extends
"stream.tmpdir=" + System.getProperty("test.build.data", "/tmp") };
StreamJob streamJob = new StreamJob(args, true);
streamJob.setConf(myConf);
- streamJob.go();
+ assertTrue("Job has not succeeded", streamJob.go() == 0);
assertOwnerShip(outputPath);
}
}
diff --git src/java/org/apache/hadoop/mapred/BackupStore.java src/java/org/apache/hadoop/mapred/BackupStore.java
index 52d0175..ed1db93 100644
--- src/java/org/apache/hadoop/mapred/BackupStore.java
+++ src/java/org/apache/hadoop/mapred/BackupStore.java
@@ -548,10 +548,9 @@ public class BackupStore {
boolean isActive() { return isActive; }
private Writer createSpillFile() throws IOException {
- Path tmp = new Path(
- TaskTracker.getIntermediateOutputDir(
- tid.getJobID().toString(), tid.toString()) +
- "/backup_" + tid.getId() + "_" + (spillNumber++) + ".out");
+ Path tmp =
+ new Path(TaskTracker.getBaseIntermediateOutputDir() + "/backup_"
+ + tid.getId() + "_" + (spillNumber++) + ".out");
LOG.info("Created file: " + tmp);
diff --git src/java/org/apache/hadoop/mapred/Child.java src/java/org/apache/hadoop/mapred/Child.java
index 3ea7dc8..ab593cf 100644
--- src/java/org/apache/hadoop/mapred/Child.java
+++ src/java/org/apache/hadoop/mapred/Child.java
@@ -36,6 +36,7 @@ import org.apache.hadoop.metrics.MetricsUtil;
import org.apache.hadoop.metrics.jvm.JvmMetrics;
import org.apache.log4j.LogManager;
import org.apache.hadoop.util.Shell;
+import org.apache.hadoop.util.StringUtils;
/**
* The main() for child processes.
@@ -168,14 +169,15 @@ class Child {
LOG.fatal("FSError from child", e);
umbilical.fsError(taskid, e.getMessage());
} catch (Throwable throwable) {
- LOG.warn("Error running child", throwable);
+ LOG.warn("Error running child : "
+ + StringUtils.stringifyException(throwable));
try {
if (task != null) {
// do cleanup for the task
task.taskCleanup(umbilical);
}
} catch (Throwable th) {
- LOG.info("Error cleaning up" + th);
+ LOG.info("Error cleaning up : " + StringUtils.stringifyException(th));
}
// Report back any failures, for diagnostic purposes
ByteArrayOutputStream baos = new ByteArrayOutputStream();
diff --git src/java/org/apache/hadoop/mapred/DefaultTaskController.java src/java/org/apache/hadoop/mapred/DefaultTaskController.java
index 174403b..58007fd 100644
--- src/java/org/apache/hadoop/mapred/DefaultTaskController.java
+++ src/java/org/apache/hadoop/mapred/DefaultTaskController.java
@@ -46,6 +46,7 @@ class DefaultTaskController extends TaskController {
* This method launches the new JVM for the task by executing the
* the JVM command using the {@link Shell.ShellCommandExecutor}
*/
+ @Override
void launchTaskJVM(TaskController.TaskControllerContext context)
throws IOException {
JvmEnv env = context.env;
@@ -59,7 +60,7 @@ class DefaultTaskController extends TaskController {
context.shExec = shexec;
shexec.execute();
}
-
+
/**
* Initialize the task environment.
*
@@ -72,21 +73,6 @@ class DefaultTaskController extends TaskController {
// So this is a dummy method.
return;
}
-
-
- @Override
- void setup() {
- // nothing to setup
- return;
- }
-
- /*
- * No need to do anything as we don't need to do as we dont need anything
- * extra from what TaskTracker has done.
- */
- @Override
- void initializeJob(JobID jobId) {
- }
@Override
void terminateTask(TaskControllerContext context) {
@@ -132,5 +118,17 @@ class DefaultTaskController extends TaskController {
}
}
}
-
+
+ @Override
+ void finalizeTaskDirs(TaskControllerContext context)
+ throws IOException {
+ // Do nothing
+ }
+
+ @Override
+ void finalizeJob(JobID jobId, String workDir, String user)
+ throws IOException {
+ // Do nothing
+
+ }
}
diff --git src/java/org/apache/hadoop/mapred/JvmManager.java src/java/org/apache/hadoop/mapred/JvmManager.java
index c2a463b..35aae9d 100644
--- src/java/org/apache/hadoop/mapred/JvmManager.java
+++ src/java/org/apache/hadoop/mapred/JvmManager.java
@@ -35,6 +35,7 @@ import org.apache.hadoop.mapred.TaskController.TaskControllerContext;
import org.apache.hadoop.mapred.TaskTracker.TaskInProgress;
import org.apache.hadoop.util.Shell.ShellCommandExecutor;
import org.apache.hadoop.util.ProcessTree;
+import org.apache.hadoop.util.StringUtils;
class JvmManager {
@@ -185,16 +186,24 @@ class JvmManager {
TaskRunner taskRunner = jvmToRunningTask.get(jvmId);
JvmRunner jvmRunner = jvmIdToRunner.get(jvmId);
Task task = taskRunner.getTaskInProgress().getTask();
- TaskControllerContext context =
- new TaskController.TaskControllerContext();
+
+ // Initialize task dirs
+ TaskControllerContext context =
+ new TaskController.TaskControllerContext();
context.env = jvmRunner.env;
context.task = task;
- //If we are returning the same task as which the JVM was launched
- //we don't initialize task once again.
- if(!jvmRunner.env.conf.get("mapred.task.id").
- equals(task.getTaskID().toString())) {
- tracker.getTaskController().initializeTask(context);
+ // If we are returning the same task as which the JVM was launched
+ // we don't initialize task once again.
+ if (!jvmRunner.env.conf.get("mapred.task.id").equals(
+ task.getTaskID().toString())) {
+ try {
+ tracker.getTaskController().initializeTask(context);
+ } catch (IOException e) {
+ // TODO Auto-generated catch block
+ e.printStackTrace();
+ }
}
+
return taskRunner.getTaskInProgress();
}
return null;
@@ -218,7 +227,7 @@ class JvmManager {
synchronized public void taskKilled(TaskRunner tr) {
JVMId jvmId = runningTaskToJvm.remove(tr);
if (jvmId != null) {
- jvmToRunningTask.remove(jvmId);
+ TaskRunner runner = jvmToRunningTask.remove(jvmId);
killJvm(jvmId);
}
}
@@ -393,8 +402,9 @@ class JvmManager {
//Launch the task controller to run task JVM
initalContext.task = jvmToRunningTask.get(jvmId).getTask();
initalContext.env = env;
- tracker.getTaskController().initializeTask(initalContext);
+ LOG.info("Launching "+ initalContext.task.getTaskID());
tracker.getTaskController().launchTaskJVM(initalContext);
+ LOG.info("Finished "+ initalContext.task.getTaskID());
} catch (IOException ioe) {
// do nothing
// error and output are appropriately redirected
@@ -403,9 +413,9 @@ class JvmManager {
if (shexec == null) {
return;
}
-
+
kill();
-
+
int exitCode = shexec.getExitCode();
updateOnJvmExit(jvmId, exitCode);
LOG.info("JVM : " + jvmId +" exited. Number of tasks it ran: " +
@@ -438,6 +448,7 @@ class JvmManager {
.getLong("mapred.tasktracker.tasks.sleeptime-before-sigkill",
ProcessTree.DEFAULT_SLEEPTIME_BEFORE_SIGKILL);
+ // Destroy the task jvm
controller.destroyTaskJVM(initalContext);
} else {
LOG.info(String.format("JVM Not killed %s but just removed", jvmId
@@ -462,6 +473,35 @@ class JvmManager {
return busy;
}
}
+
+ /**
+ * Finalize the task directories.
+ * @param runner
+ * @throws IOException
+ */
+ public void finalizeTaskDirs(TaskRunner runner)
+ throws IOException {
+ JVMId jvmId = runningTaskToJvm.get(runner);
+ if (jvmId == null) {
+ LOG.debug("JvmId is null, not doing anything for finalizeTaskDirs.");
+ return;
+ }
+ JvmRunner jvmRunner = jvmIdToRunner.get(jvmId);
+ if (jvmRunner == null) {
+ LOG
+ .debug("JvmRunner is null, not doing anything for finalizeTaskDirs.");
+ return;
+ }
+ TaskControllerContext taskContext = new TaskControllerContext();
+ taskContext.env = jvmRunner.initalContext.env;
+ taskContext.task = runner.getTask();
+ if (taskContext.env != null && taskContext.task != null) {
+ tracker.getTaskController().finalizeTaskDirs(taskContext);
+ } else {
+ LOG.debug("taskContext.env or taskContext.env is null,"
+ + " not doing anything for finalizeTaskDirs.");
+ }
+ }
}
static class JvmEnv { //Helper class
List vargs;
@@ -485,4 +525,19 @@ class JvmManager {
this.conf = conf;
}
}
+
+ /**
+ * Finalize the task directories once the task finishes.
+ *
+ * @param runner
+ * @throws IOException
+ */
+ void finalizeTaskDirs(TaskRunner runner)
+ throws IOException {
+ if (runner.getTask().isMapTask()) {
+ mapJvmManager.finalizeTaskDirs(runner);
+ } else {
+ reduceJvmManager.finalizeTaskDirs(runner);
+ }
+ }
}
diff --git src/java/org/apache/hadoop/mapred/LinuxTaskController.java src/java/org/apache/hadoop/mapred/LinuxTaskController.java
index c21ccb1..b62c150 100644
--- src/java/org/apache/hadoop/mapred/LinuxTaskController.java
+++ src/java/org/apache/hadoop/mapred/LinuxTaskController.java
@@ -28,9 +28,9 @@ import java.util.Map.Entry;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileUtil;
import org.apache.hadoop.mapred.JvmManager.JvmEnv;
+import org.apache.hadoop.mapred.TaskController.TaskControllerContext;
import org.apache.hadoop.util.StringUtils;
import org.apache.hadoop.util.Shell.ShellCommandExecutor;
@@ -73,44 +73,20 @@ class LinuxTaskController extends TaskController {
new File(hadoopBin, "task-controller").getAbsolutePath();
}
- // The list of directory paths specified in the
- // variable mapred.local.dir. This is used to determine
- // which among the list of directories is picked up
- // for storing data for a particular task.
- private String[] mapredLocalDirs;
-
- // permissions to set on files and directories created.
- // When localized files are handled securely, this string
- // will change to something more restrictive. Until then,
- // it opens up the permissions for all, so that the tasktracker
- // and job owners can access files together.
- private static final String FILE_PERMISSIONS = "ugo+rwx";
-
- // permissions to set on components of the path leading to
- // localized files and directories. Read and execute permissions
- // are required for different users to be able to access the
- // files.
- private static final String PATH_PERMISSIONS = "go+rx";
-
public LinuxTaskController() {
super();
}
- @Override
- public void setConf(Configuration conf) {
- super.setConf(conf);
- mapredLocalDirs = conf.getStrings("mapred.local.dir");
- //Setting of the permissions of the local directory is done in
- //setup()
- }
-
/**
* List of commands that the setuid script will execute.
*/
enum TaskCommands {
LAUNCH_TASK_JVM,
+ INITIALIZE_TASK,
TERMINATE_TASK_JVM,
- KILL_TASK_JVM
+ KILL_TASK_JVM,
+ FINALIZE_TASK_DIRS,
+ FINALIZE_JOB,
}
/**
@@ -155,17 +131,56 @@ class LinuxTaskController extends TaskController {
try {
shExec.execute();
} catch (Exception e) {
- LOG.warn("Exception thrown while launching task JVM : " +
- StringUtils.stringifyException(e));
+ // TODO: check for 143 (SIGTERM) and 137 (SIGKILL) exit codes
+ LOG.warn("Exception thrown while launching task JVM : "
+ + StringUtils.stringifyException(e));
LOG.warn("Exit code from task is : " + shExec.getExitCode());
- LOG.warn("Output from task-contoller is : " + shExec.getOutput());
+ LOG.info("Output from LinuxTaskController's launchTaskJVM follows:");
+ logOutput(shExec.getOutput());
+ throw new IOException(e);
+ }
+ if (LOG.isDebugEnabled()) {
+ LOG.info("Output from LinuxTaskController's launchTaskJVM follows:");
+ logOutput(shExec.getOutput());
+ }
+ }
+
+
+ @Override
+ void initializeTask(TaskControllerContext context)
+ throws IOException {
+ LOG.info("Going to initialize task for "
+ + context.task.getTaskID().toString());
+
+ ShellCommandExecutor shExec =
+ buildTaskControllerExecutor(TaskCommands.INITIALIZE_TASK,
+ context.env.conf.getUser(), buildInitializeTaskArgs(context),
+ context.env);
+ try {
+ shExec.execute();
+ } catch (Exception e) {
+ LOG.warn("Exit code from initializeTaskDirs is : "
+ + shExec.getExitCode());
+ LOG.warn("Exception thrown by initializeTaskDirs : "
+ + StringUtils.stringifyException(e));
+ LOG.info("Output from LinuxTaskController's initializeTaskDirs follows:");
+ logOutput(shExec.getOutput());
throw new IOException(e);
}
- if(LOG.isDebugEnabled()) {
- LOG.debug("output after executing task jvm = " + shExec.getOutput());
+ if (LOG.isDebugEnabled()) {
+ LOG.info("Output from LinuxTaskController's initializeTaskDirs follows:");
+ logOutput(shExec.getOutput());
}
}
+ private void logOutput(String output) {
+ String shExecOutput = output;
+ if (shExecOutput != null) {
+ for (String str : shExecOutput.split("\n")) {
+ LOG.info(str);
+ }
+ }
+ }
/**
* Returns list of arguments to be passed while launching task VM.
* See {@code buildTaskControllerExecutor(TaskCommands,
@@ -177,8 +192,11 @@ class LinuxTaskController extends TaskController {
List commandArgs = new ArrayList(3);
String taskId = context.task.getTaskID().toString();
String jobId = getJobId(context);
- LOG.debug("getting the task directory as: "
+ LOG.info("getting the task directory as: "
+ getTaskCacheDirectory(context));
+ LOG.info("getting the tt_root as " +getDirectoryChosenForTask(
+ new File(getTaskCacheDirectory(context)),
+ context) );
commandArgs.add(getDirectoryChosenForTask(
new File(getTaskCacheDirectory(context)),
context));
@@ -188,9 +206,43 @@ class LinuxTaskController extends TaskController {
}else {
commandArgs.add(taskId + TaskTracker.TASK_CLEANUP_SUFFIX);
}
+ File logDir = new File(TaskLog.getBaseLogDir());
+ try {
+ commandArgs.add(logDir.getCanonicalPath());
+ } catch (IOException e) {
+ // TODO Auto-generated catch block
+ e.printStackTrace();
+ }
return commandArgs;
}
-
+
+ /**
+ * Returns list of arguments to be passed while initializing a new task.
+ * See {@code buildTaskControllerExecutor(TaskCommands,
+ * String, List, JvmEnv)} documentation.
+ * @param context
+ * @return Argument to be used while launching Task VM
+ */
+ private List buildInitializeTaskArgs(TaskControllerContext context) {
+ List commandArgs = new ArrayList(3);
+ String taskId = context.task.getTaskID().toString();
+ String jobId = getJobId(context);
+ commandArgs.add(jobId);
+ if (!context.task.isTaskCleanupTask()) {
+ commandArgs.add(taskId);
+ } else {
+ commandArgs.add(taskId + TaskTracker.TASK_CLEANUP_SUFFIX);
+ }
+ File logDir = new File(TaskLog.getBaseLogDir());
+ try {
+ commandArgs.add(logDir.getCanonicalPath());
+ } catch (IOException e) {
+ // TODO Auto-generated catch block
+ e.printStackTrace();
+ }
+ return commandArgs;
+ }
+
// get the Job ID from the information in the TaskControllerContext
private String getJobId(TaskControllerContext context) {
String taskId = context.task.getTaskID().toString();
@@ -208,8 +260,9 @@ class LinuxTaskController extends TaskController {
String taskId = context.task.getTaskID().toString();
for (String dir : mapredLocalDirs) {
File mapredDir = new File(dir);
- File taskDir = new File(mapredDir, TaskTracker.getLocalTaskDir(
- jobId, taskId, context.task.isTaskCleanupTask()));
+ // TODO: fix
+ File taskDir = new File(mapredDir, TaskTracker.getTaskWorkDir(
+ jobId, taskId, context.task.isTaskCleanupTask())).getParentFile();
if (directory.equals(taskDir)) {
return dir;
}
@@ -219,68 +272,7 @@ class LinuxTaskController extends TaskController {
throw new IllegalArgumentException("invalid task cache directory "
+ directory.getAbsolutePath());
}
-
- /**
- * Setup appropriate permissions for directories and files that
- * are used by the task.
- *
- * As the LinuxTaskController launches tasks as a user, different
- * from the daemon, all directories and files that are potentially
- * used by the tasks are setup with appropriate permissions that
- * will allow access.
- *
- * Until secure data handling is implemented (see HADOOP-4491 and
- * HADOOP-4493, for e.g.), the permissions are set up to allow
- * read, write and execute access for everyone. This will be
- * changed to restricted access as data is handled securely.
- */
- void initializeTask(TaskControllerContext context) {
- // Setup permissions for the job and task cache directories.
- setupTaskCacheFileAccess(context);
- // setup permissions for task log directory
- setupTaskLogFileAccess(context);
- }
-
- // Allows access for the task to create log files under
- // the task log directory
- private void setupTaskLogFileAccess(TaskControllerContext context) {
- TaskAttemptID taskId = context.task.getTaskID();
- File f = TaskLog.getTaskLogFile(taskId, TaskLog.LogName.SYSLOG);
- String taskAttemptLogDir = f.getParentFile().getAbsolutePath();
- changeDirectoryPermissions(taskAttemptLogDir, FILE_PERMISSIONS, false);
- }
- // Allows access for the task to read, write and execute
- // the files under the job and task cache directories
- private void setupTaskCacheFileAccess(TaskControllerContext context) {
- String taskId = context.task.getTaskID().toString();
- JobID jobId = JobID.forName(getJobId(context));
- //Change permission for the task across all the disks
- for(String localDir : mapredLocalDirs) {
- File f = new File(localDir);
- File taskCacheDir = new File(f,TaskTracker.getLocalTaskDir(
- jobId.toString(), taskId, context.task.isTaskCleanupTask()));
- if(taskCacheDir.exists()) {
- changeDirectoryPermissions(taskCacheDir.getPath(),
- FILE_PERMISSIONS, true);
- }
- }//end of local directory Iteration
- }
-
- // convenience method to execute chmod.
- private void changeDirectoryPermissions(String dir, String mode,
- boolean isRecursive) {
- int ret = 0;
- try {
- ret = FileUtil.chmod(dir, mode, isRecursive);
- } catch (Exception e) {
- LOG.warn("Exception in changing permissions for directory " + dir +
- ". Exception: " + e.getMessage());
- }
- if (ret != 0) {
- LOG.warn("Could not change permissions for directory " + dir);
- }
- }
/**
* Builds the command line for launching/terminating/killing task JVM.
* Following is the format for launching/terminating/killing task JVM
@@ -333,10 +325,12 @@ class LinuxTaskController extends TaskController {
// is different from what is set with respect with
// env.workDir. Hence building this from the taskId everytime.
String taskId = context.task.getTaskID().toString();
+ // TODO: fix the following line or document it.
File cacheDirForJob = context.env.workDir.getParentFile().getParentFile();
if(context.task.isTaskCleanupTask()) {
taskId = taskId + TaskTracker.TASK_CLEANUP_SUFFIX;
}
+
return new File(cacheDirForJob, taskId).getAbsolutePath();
}
@@ -363,75 +357,11 @@ class LinuxTaskController extends TaskController {
if (pw != null) {
pw.close();
}
- // set execute permissions for all on the file.
File f = new File(commandFile);
- if (f.exists()) {
- f.setReadable(true, false);
- f.setExecutable(true, false);
+ if (!f.exists()) {
+ throw new IOException("Taskjvm.sh doesn't exist!!!");
}
- }
- }
-
-
- /**
- * Sets up the permissions of the following directories:
- *
- * Job cache directory
- * Archive directory
- * Hadoop log directories
- *
- */
- @Override
- void setup() {
- //set up job cache directory and associated permissions
- String localDirs[] = this.mapredLocalDirs;
- for(String localDir : localDirs) {
- //Cache root
- File cacheDirectory = new File(localDir,TaskTracker.getCacheSubdir());
- File jobCacheDirectory = new File(localDir,TaskTracker.getJobCacheSubdir());
- if(!cacheDirectory.exists()) {
- if(!cacheDirectory.mkdirs()) {
- LOG.warn("Unable to create cache directory : " +
- cacheDirectory.getPath());
- }
- }
- if(!jobCacheDirectory.exists()) {
- if(!jobCacheDirectory.mkdirs()) {
- LOG.warn("Unable to create job cache directory : " +
- jobCacheDirectory.getPath());
- }
- }
- //Give world writable permission for every directory under
- //mapred-local-dir.
- //Child tries to write files under it when executing.
- changeDirectoryPermissions(localDir, FILE_PERMISSIONS, true);
- }//end of local directory manipulations
- //setting up perms for user logs
- File taskLog = TaskLog.getUserLogDir();
- changeDirectoryPermissions(taskLog.getPath(), FILE_PERMISSIONS,false);
- }
-
- /*
- * Create Job directories across disks and set their permissions to 777
- * This way when tasks are run we just need to setup permissions for
- * task folder.
- */
- @Override
- void initializeJob(JobID jobid) {
- for(String localDir : this.mapredLocalDirs) {
- File jobDirectory = new File(localDir,
- TaskTracker.getLocalJobDir(jobid.toString()));
- if(!jobDirectory.exists()) {
- if(!jobDirectory.mkdir()) {
- LOG.warn("Unable to create job cache directory : "
- + jobDirectory.getPath());
- continue;
- }
- }
- //Should be recursive because the jar and work folders might be
- //present under the job cache directory
- changeDirectoryPermissions(
- jobDirectory.getPath(), FILE_PERMISSIONS, true);
+ FileUtil.setPermissions(f, FileUtil.sevenZeroZero);
}
}
@@ -443,15 +373,63 @@ class LinuxTaskController extends TaskController {
*
*
* @param context context of task which has to be passed kill signal.
+ * @throws IOException
*
*/
private List buildKillTaskCommandArgs(TaskControllerContext
- context){
+ context) throws IOException{
List killTaskJVMArgs = new ArrayList();
killTaskJVMArgs.add(context.pid);
return killTaskJVMArgs;
}
-
+
+ /**
+ * Build the command line to be passed to LinuxTaskController binary to
+ * finalize task directories once a task finishes.
+ *
+ * @param context
+ * @return
+ */
+ private List buildFinalizeTaskDirsCommandArgs(
+ TaskControllerContext context) {
+ List commandArgs = new ArrayList(3);
+ String taskId = context.task.getTaskID().toString();
+ String jobId = getJobId(context);
+ commandArgs.add(jobId);
+ if (!context.task.isTaskCleanupTask()) {
+ commandArgs.add(taskId);
+ } else {
+ commandArgs.add(taskId + TaskTracker.TASK_CLEANUP_SUFFIX);
+ }
+ File logDir = new File(TaskLog.getBaseLogDir());
+ try {
+ commandArgs.add(logDir.getCanonicalPath());
+ } catch (IOException e) {
+ // TODO Auto-generated catch block
+ e.printStackTrace();
+ }
+ if (context.env.conf.get("mapred.task.id").equals(
+ context.task.getTaskID().toString())) {
+ commandArgs.add(String.valueOf(0)); // true. First task
+ } else {
+ commandArgs.add(String.valueOf(1)); // false. Not the first task.
+ }
+ return commandArgs;
+ }
+
+ /**
+ * Build the command line to be passed to LinuxTaskController binary to
+ * actions for finalizing job.
+ *
+ * @param context
+ * @return
+ */
+ private List buildFinalizeJobCommandArgs(JobID jobId) {
+ List finalizeJobArgs = new ArrayList();
+ finalizeJobArgs.add(jobId.toString());
+ return finalizeJobArgs;
+ }
+
/**
* Convenience method used to sending appropriate Kill signal to the task
* VM
@@ -465,15 +443,33 @@ class LinuxTaskController extends TaskController {
LOG.info("Context task null not killing the JVM");
return;
}
- ShellCommandExecutor shExec = buildTaskControllerExecutor(
- command, context.env.conf.getUser(),
- buildKillTaskCommandArgs(context), context.env);
+ if (context.pid == null) {
+ LOG.warn("pid is null. Cannot kill task");
+ return;
+ }
+
+ List cmdArgs = null;
+ if (command.equals(TaskCommands.TERMINATE_TASK_JVM)) {
+ cmdArgs = buildFinalizeTaskDirsCommandArgs(context);
+ cmdArgs.addAll(buildKillTaskCommandArgs(context));
+ } else if (command.equals(TaskCommands.KILL_TASK_JVM)) {
+ cmdArgs = buildKillTaskCommandArgs(context);
+ }
+ ShellCommandExecutor shExec =
+ buildTaskControllerExecutor(command, context.env.conf.getUser(),
+ cmdArgs, context.env);
try {
shExec.execute();
} catch (Exception e) {
- LOG.warn("Output from task-contoller is : " + shExec.getOutput());
+ LOG.warn("Exit code from finishTask is : " + shExec.getExitCode());
+ LOG.info("Output from LinuxTaskController's finishTask follows:");
+ logOutput(shExec.getOutput());
throw new IOException(e);
}
+ if (LOG.isDebugEnabled()) {
+ LOG.info("Output from LinuxTaskController's finishTask follows:");
+ logOutput(shExec.getOutput());
+ }
}
@Override
@@ -498,6 +494,56 @@ class LinuxTaskController extends TaskController {
protected String getTaskControllerExecutablePath() {
return taskControllerExe;
- }
+ }
+
+ @Override
+ void finalizeTaskDirs(TaskControllerContext context)
+ throws IOException {
+ LOG.info("Going to finalize task directories for "
+ + context.task.getTaskID().toString());
+ ShellCommandExecutor shExec =
+ buildTaskControllerExecutor(TaskCommands.FINALIZE_TASK_DIRS,
+ context.env.conf.getUser(),
+ buildFinalizeTaskDirsCommandArgs(context), context.env);
+ try {
+ shExec.execute();
+ } catch (Exception e) {
+ LOG.warn("Exit code from finalizeTaskDirs is : " + shExec.getExitCode());
+ LOG.warn("Exception thrown by finalizeTaskDirs : "
+ + StringUtils.stringifyException(e));
+ LOG.info("Output from LinuxTaskController's finalizeTaskDirs follows:");
+ logOutput(shExec.getOutput());
+ throw new IOException(e);
+ }
+ if (LOG.isDebugEnabled()) {
+ LOG.info("Output from LinuxTaskController's finalizeTaskDirs follows:");
+ logOutput(shExec.getOutput());
+ }
+ }
+
+ @Override
+ void finalizeJob(JobID jobId, String workDir, String user)
+ throws IOException {
+ LOG.info("Going to finalize job for " + jobId);
+ JvmEnv env =
+ new JvmEnv(null, null, null, null, -1, new File(workDir), null, null);
+ ShellCommandExecutor shExec =
+ buildTaskControllerExecutor(TaskCommands.FINALIZE_JOB, user,
+ buildFinalizeJobCommandArgs(jobId), env);
+ try {
+ shExec.execute();
+ } catch (Exception e) {
+ LOG.warn("Exit code from finalizeJob is : " + shExec.getExitCode());
+ LOG.warn("Exception thrown by finalizeJob : "
+ + StringUtils.stringifyException(e));
+ LOG.info("Output from LinuxTaskController's finalizeJob follows:");
+ logOutput(shExec.getOutput());
+ throw new IOException(e);
+ }
+ if (LOG.isDebugEnabled()) {
+ LOG.info("Output from LinuxTaskController's finalizeJob follows:");
+ logOutput(shExec.getOutput());
+ }
+ }
}
diff --git src/java/org/apache/hadoop/mapred/LocalJobRunner.java src/java/org/apache/hadoop/mapred/LocalJobRunner.java
index 407f5e6..a3a24c8 100644
--- src/java/org/apache/hadoop/mapred/LocalJobRunner.java
+++ src/java/org/apache/hadoop/mapred/LocalJobRunner.java
@@ -191,10 +191,10 @@ class LocalJobRunner implements JobSubmissionProtocol {
for (int i = 0; i < mapIds.size(); i++) {
if (!this.isInterrupted()) {
TaskAttemptID mapId = mapIds.get(i);
- Path mapOut = this.mapoutputFile.getOutputFile(mapId);
- Path reduceIn = this.mapoutputFile.getInputFileForWrite(
- mapId.getTaskID(),reduceId,
- localFs.getFileStatus(mapOut).getLen());
+ Path mapOut = this.mapoutputFile.getOutputFile();
+ Path reduceIn =
+ this.mapoutputFile.getInputFileForWrite(mapId.getTaskID(),
+ localFs.getFileStatus(mapOut).getLen());
if (!localFs.mkdirs(reduceIn.getParent())) {
throw new IOException("Mkdirs failed to create "
+ reduceIn.getParent().toString());
@@ -224,10 +224,10 @@ class LocalJobRunner implements JobSubmissionProtocol {
}
} finally {
for (TaskAttemptID mapId: mapIds) {
- this.mapoutputFile.removeAll(mapId);
+ this.mapoutputFile.removeAll();
}
if (numReduceTasks == 1) {
- this.mapoutputFile.removeAll(reduceId);
+ this.mapoutputFile.removeAll();
}
}
// delete the temporary directory in output directory
diff --git src/java/org/apache/hadoop/mapred/MapOutputFile.java src/java/org/apache/hadoop/mapred/MapOutputFile.java
index 30b71c9..0af1151 100644
--- src/java/org/apache/hadoop/mapred/MapOutputFile.java
+++ src/java/org/apache/hadoop/mapred/MapOutputFile.java
@@ -31,7 +31,9 @@ class MapOutputFile {
private JobConf conf;
private JobID jobId;
-
+
+ static final String INPUT_FILE_FORMAT_STRING = "%s/map_%d.out";
+
MapOutputFile() {
}
@@ -42,132 +44,151 @@ class MapOutputFile {
private LocalDirAllocator lDirAlloc =
new LocalDirAllocator("mapred.local.dir");
- /** Return the path to local map output file created earlier
- * @param mapTaskId a map task id
+ /**
+ * Return the path to local map output file created earlier
+ *
+ * @return path
+ * @throws IOException
*/
- public Path getOutputFile(TaskAttemptID mapTaskId)
- throws IOException {
- return lDirAlloc.getLocalPathToRead(TaskTracker.getIntermediateOutputDir(
- jobId.toString(), mapTaskId.toString())
- + "/file.out", conf);
+ public Path getOutputFile()
+ throws IOException {
+ return lDirAlloc.getLocalPathToRead(TaskTracker
+ .getBaseIntermediateOutputDir()
+ + "/file.out", conf);
}
- /** Create a local map output file name.
- * @param mapTaskId a map task id
+ /**
+ * Create a local map output file name.
+ *
* @param size the size of the file
+ * @return path
+ * @throws IOException
*/
- public Path getOutputFileForWrite(TaskAttemptID mapTaskId, long size)
- throws IOException {
- return lDirAlloc.getLocalPathForWrite(TaskTracker.getIntermediateOutputDir(
- jobId.toString(), mapTaskId.toString())
- + "/file.out", size, conf);
+ public Path getOutputFileForWrite(long size)
+ throws IOException {
+ return lDirAlloc.getLocalPathForWrite(TaskTracker
+ .getBaseIntermediateOutputDir()
+ + "/file.out", size, conf);
}
- /** Return the path to a local map output index file created earlier
- * @param mapTaskId a map task id
+ /**
+ * Return the path to a local map output index file created earlier
+ *
+ * @return path
+ * @throws IOException
*/
- public Path getOutputIndexFile(TaskAttemptID mapTaskId)
- throws IOException {
- return lDirAlloc.getLocalPathToRead(TaskTracker.getIntermediateOutputDir(
- jobId.toString(), mapTaskId.toString())
- + "/file.out.index", conf);
+ public Path getOutputIndexFile()
+ throws IOException {
+ return lDirAlloc.getLocalPathToRead(TaskTracker
+ .getBaseIntermediateOutputDir()
+ + "/file.out.index", conf);
}
- /** Create a local map output index file name.
- * @param mapTaskId a map task id
+ /**
+ * Create a local map output index file name.
+ *
* @param size the size of the file
+ * @return path
+ * @throws IOException
*/
- public Path getOutputIndexFileForWrite(TaskAttemptID mapTaskId, long size)
- throws IOException {
- return lDirAlloc.getLocalPathForWrite(TaskTracker.getIntermediateOutputDir(
- jobId.toString(), mapTaskId.toString())
- + "/file.out.index",
- size, conf);
+ public Path getOutputIndexFileForWrite(long size)
+ throws IOException {
+ return lDirAlloc.getLocalPathForWrite(TaskTracker
+ .getBaseIntermediateOutputDir()
+ + "/file.out.index", size, conf);
}
- /** Return a local map spill file created earlier.
- * @param mapTaskId a map task id
+ /**
+ * Return a local map spill file created earlier.
+ *
* @param spillNumber the number
+ * @return path
+ * @throws IOException
*/
- public Path getSpillFile(TaskAttemptID mapTaskId, int spillNumber)
- throws IOException {
- return lDirAlloc.getLocalPathToRead(TaskTracker.getIntermediateOutputDir(
- jobId.toString(), mapTaskId.toString())
- + "/spill"
- + spillNumber + ".out", conf);
+ public Path getSpillFile(int spillNumber)
+ throws IOException {
+ return lDirAlloc.getLocalPathToRead(TaskTracker
+ .getBaseIntermediateOutputDir()
+ + "/spill" + spillNumber + ".out", conf);
}
- /** Create a local map spill file name.
- * @param mapTaskId a map task id
+ /**
+ * Create a local map spill file name.
+ *
* @param spillNumber the number
* @param size the size of the file
+ * @return path
+ * @throws IOException
*/
- public Path getSpillFileForWrite(TaskAttemptID mapTaskId, int spillNumber,
- long size) throws IOException {
- return lDirAlloc.getLocalPathForWrite(TaskTracker.getIntermediateOutputDir(
- jobId.toString(), mapTaskId.toString())
- + "/spill" +
- spillNumber + ".out", size, conf);
+ public Path getSpillFileForWrite(int spillNumber, long size)
+ throws IOException {
+ return lDirAlloc.getLocalPathForWrite(TaskTracker
+ .getBaseIntermediateOutputDir()
+ + "/spill" + spillNumber + ".out", size, conf);
}
- /** Return a local map spill index file created earlier
- * @param mapTaskId a map task id
+ /**
+ * Return a local map spill index file created earlier
+ *
* @param spillNumber the number
+ * @return path
+ * @throws IOException
*/
- public Path getSpillIndexFile(TaskAttemptID mapTaskId, int spillNumber)
- throws IOException {
- return lDirAlloc.getLocalPathToRead(TaskTracker.getIntermediateOutputDir(
- jobId.toString(), mapTaskId.toString())
- + "/spill" +
- spillNumber + ".out.index", conf);
+ public Path getSpillIndexFile(int spillNumber)
+ throws IOException {
+ return lDirAlloc.getLocalPathToRead(TaskTracker
+ .getBaseIntermediateOutputDir()
+ + "/spill" + spillNumber + ".out.index", conf);
}
- /** Create a local map spill index file name.
- * @param mapTaskId a map task id
+ /**
+ * Create a local map spill index file name.
+ *
* @param spillNumber the number
* @param size the size of the file
+ * @return path
+ * @throws IOException
*/
- public Path getSpillIndexFileForWrite(TaskAttemptID mapTaskId, int spillNumber,
- long size) throws IOException {
- return lDirAlloc.getLocalPathForWrite(TaskTracker.getIntermediateOutputDir(
- jobId.toString(), mapTaskId.toString())
- + "/spill" + spillNumber +
- ".out.index", size, conf);
+ public Path getSpillIndexFileForWrite(int spillNumber, long size)
+ throws IOException {
+ return lDirAlloc.getLocalPathForWrite(TaskTracker
+ .getBaseIntermediateOutputDir()
+ + "/spill" + spillNumber + ".out.index", size, conf);
}
- /** Return a local reduce input file created earlier
- * @param mapTaskId a map task id
- * @param reduceTaskId a reduce task id
+ /**
+ * Return a local reduce input file created earlier
+ *
+ * @param mapId a map task id
+ * @return path
+ * @throws IOException
*/
- public Path getInputFile(int mapId, TaskAttemptID reduceTaskId)
- throws IOException {
- // TODO *oom* should use a format here
- return lDirAlloc.getLocalPathToRead(TaskTracker.getIntermediateOutputDir(
- jobId.toString(), reduceTaskId.toString())
- + "/map_" + mapId + ".out",
- conf);
+ public Path getInputFile(int mapId)
+ throws IOException {
+ String outputDir = TaskTracker.getBaseIntermediateOutputDir();
+ return lDirAlloc.getLocalPathToRead(String.format(
+ INPUT_FILE_FORMAT_STRING, outputDir, Integer.valueOf(mapId)), conf);
}
- /** Create a local reduce input file name.
- * @param mapTaskId a map task id
- * @param reduceTaskId a reduce task id
+ /**
+ * Create a local reduce input file name.
+ *
+ * @param mapId a map task id
* @param size the size of the file
+ * @return path
+ * @throws IOException
*/
- public Path getInputFileForWrite(TaskID mapId, TaskAttemptID reduceTaskId,
- long size)
- throws IOException {
- // TODO *oom* should use a format here
- return lDirAlloc.getLocalPathForWrite(TaskTracker.getIntermediateOutputDir(
- jobId.toString(), reduceTaskId.toString())
- + "/map_" + mapId.getId() + ".out",
- size, conf);
+ public Path getInputFileForWrite(TaskID mapId, long size)
+ throws IOException {
+ String outputDir = TaskTracker.getBaseIntermediateOutputDir();
+ return lDirAlloc.getLocalPathForWrite(String.format(
+ INPUT_FILE_FORMAT_STRING, outputDir, mapId.getId()), size, conf);
}
/** Removes all of the files related to a task. */
- public void removeAll(TaskAttemptID taskId) throws IOException {
- conf.deleteLocalFiles(TaskTracker.getIntermediateOutputDir(
- jobId.toString(), taskId.toString())
-);
+ public void removeAll()
+ throws IOException {
+ conf.deleteLocalFiles(TaskTracker.getBaseIntermediateOutputDir());
}
public void setConf(Configuration conf) {
diff --git src/java/org/apache/hadoop/mapred/MapTask.java src/java/org/apache/hadoop/mapred/MapTask.java
index 5f8b88b..96dd469 100644
--- src/java/org/apache/hadoop/mapred/MapTask.java
+++ src/java/org/apache/hadoop/mapred/MapTask.java
@@ -1164,8 +1164,8 @@ class MapTask extends Task {
try {
// create spill file
final SpillRecord spillRec = new SpillRecord(partitions);
- final Path filename = mapOutputFile.getSpillFileForWrite(getTaskID(),
- numSpills, size);
+ final Path filename =
+ mapOutputFile.getSpillFileForWrite(numSpills, size);
out = rfs.create(filename);
final int endPosition = (kvend > kvstart)
@@ -1229,9 +1229,9 @@ class MapTask extends Task {
if (totalIndexCacheMemory >= INDEX_CACHE_MEMORY_LIMIT) {
// create spill index file
- Path indexFilename = mapOutputFile.getSpillIndexFileForWrite(
- getTaskID(), numSpills,
- partitions * MAP_OUTPUT_INDEX_RECORD_LENGTH);
+ Path indexFilename =
+ mapOutputFile.getSpillIndexFileForWrite(numSpills, partitions
+ * MAP_OUTPUT_INDEX_RECORD_LENGTH);
spillRec.writeToFile(indexFilename, job);
} else {
indexCacheList.add(spillRec);
@@ -1258,8 +1258,8 @@ class MapTask extends Task {
try {
// create spill file
final SpillRecord spillRec = new SpillRecord(partitions);
- final Path filename = mapOutputFile.getSpillFileForWrite(getTaskID(),
- numSpills, size);
+ final Path filename =
+ mapOutputFile.getSpillFileForWrite(numSpills, size);
out = rfs.create(filename);
// we don't run the combiner for a single record
@@ -1295,9 +1295,9 @@ class MapTask extends Task {
}
if (totalIndexCacheMemory >= INDEX_CACHE_MEMORY_LIMIT) {
// create spill index file
- Path indexFilename = mapOutputFile.getSpillIndexFileForWrite(
- getTaskID(), numSpills,
- partitions * MAP_OUTPUT_INDEX_RECORD_LENGTH);
+ Path indexFilename =
+ mapOutputFile.getSpillIndexFileForWrite(numSpills, partitions
+ * MAP_OUTPUT_INDEX_RECORD_LENGTH);
spillRec.writeToFile(indexFilename, job);
} else {
indexCacheList.add(spillRec);
@@ -1387,14 +1387,14 @@ class MapTask extends Task {
final TaskAttemptID mapId = getTaskID();
for(int i = 0; i < numSpills; i++) {
- filename[i] = mapOutputFile.getSpillFile(mapId, i);
+ filename[i] = mapOutputFile.getSpillFile(i);
finalOutFileSize += rfs.getFileStatus(filename[i]).getLen();
}
if (numSpills == 1) { //the spill is the final output
rfs.rename(filename[0],
new Path(filename[0].getParent(), "file.out"));
if (indexCacheList.size() == 0) {
- rfs.rename(mapOutputFile.getSpillIndexFile(mapId, 0),
+ rfs.rename(mapOutputFile.getSpillIndexFile(0),
new Path(filename[0].getParent(),"file.out.index"));
} else {
indexCacheList.get(0).writeToFile(
@@ -1405,7 +1405,7 @@ class MapTask extends Task {
// read in paged indices
for (int i = indexCacheList.size(); i < numSpills; ++i) {
- Path indexFileName = mapOutputFile.getSpillIndexFile(mapId, i);
+ Path indexFileName = mapOutputFile.getSpillIndexFile(i);
indexCacheList.add(new SpillRecord(indexFileName, job));
}
@@ -1413,10 +1413,10 @@ class MapTask extends Task {
//lengths for each partition
finalOutFileSize += partitions * APPROX_HEADER_LENGTH;
finalIndexFileSize = partitions * MAP_OUTPUT_INDEX_RECORD_LENGTH;
- Path finalOutputFile = mapOutputFile.getOutputFileForWrite(mapId,
- finalOutFileSize);
- Path finalIndexFile = mapOutputFile.getOutputIndexFileForWrite(
- mapId, finalIndexFileSize);
+ Path finalOutputFile =
+ mapOutputFile.getOutputFileForWrite(finalOutFileSize);
+ Path finalIndexFile =
+ mapOutputFile.getOutputIndexFileForWrite(finalIndexFileSize);
//The output stream for the final single output file
FSDataOutputStream finalOut = rfs.create(finalOutputFile, true, 4096);
diff --git src/java/org/apache/hadoop/mapred/MapTaskRunner.java src/java/org/apache/hadoop/mapred/MapTaskRunner.java
index 918b2a6..94750d0 100644
--- src/java/org/apache/hadoop/mapred/MapTaskRunner.java
+++ src/java/org/apache/hadoop/mapred/MapTaskRunner.java
@@ -34,13 +34,13 @@ class MapTaskRunner extends TaskRunner {
return false;
}
- mapOutputFile.removeAll(getTask().getTaskID());
+ mapOutputFile.removeAll();
return true;
}
/** Delete all of the temporary map output files. */
public void close() throws IOException {
LOG.info(getTask()+" done; removing files.");
- mapOutputFile.removeAll(getTask().getTaskID());
+ mapOutputFile.removeAll();
}
}
diff --git src/java/org/apache/hadoop/mapred/ReduceTask.java src/java/org/apache/hadoop/mapred/ReduceTask.java
index 64780db..591b5b1 100644
--- src/java/org/apache/hadoop/mapred/ReduceTask.java
+++ src/java/org/apache/hadoop/mapred/ReduceTask.java
@@ -208,7 +208,7 @@ class ReduceTask extends Task {
if (isLocal) {
// for local jobs
for(int i = 0; i < numMaps; ++i) {
- fileList.add(mapOutputFile.getInputFile(i, getTaskID()));
+ fileList.add(mapOutputFile.getInputFile(i));
}
} else {
// for non local jobs
@@ -1276,12 +1276,9 @@ class ReduceTask extends Task {
// a temp filename. If this file gets created in ramfs, we're fine,
// else, we will check the localFS to find a suitable final location
// for this path
- TaskAttemptID reduceId = reduceTask.getTaskID();
- Path filename = new Path("/" + TaskTracker.getIntermediateOutputDir(
- reduceId.getJobID().toString(),
- reduceId.toString())
- + "/map_" +
- loc.getTaskId().getId() + ".out");
+ Path filename =
+ new Path("/" + TaskTracker.getBaseIntermediateOutputDir()
+ + "/map_" + loc.getTaskId().getId() + ".out");
// Copy the map output to a temp file whose name is unique to this attempt
Path tmpMapOutput = new Path(filename+"-"+id);
@@ -1332,6 +1329,8 @@ class ReduceTask extends Task {
throw new IOException("Failed to rename map output " +
tmpMapOutput + " to " + filename);
}
+ // Secure permissions from the tmpMapOutput still remain. No need
+ // for doing anything more.
synchronized (mapOutputFilesOnDisk) {
addToMapOutputFilesOnDisk(localFileSys.getFileStatus(filename));
@@ -2282,8 +2281,8 @@ class ReduceTask extends Task {
sortPhaseFinished = true;
// must spill to disk, but can't retain in-mem for intermediate merge
- final Path outputPath = mapOutputFile.getInputFileForWrite(mapId,
- reduceTask.getTaskID(), inMemToDiskBytes);
+ final Path outputPath =
+ mapOutputFile.getInputFileForWrite(mapId, inMemToDiskBytes);
final RawKeyValueIterator rIter = Merger.merge(job, fs,
keyClass, valueClass, memDiskSegments, numMemDiskSegments,
tmpDir, comparator, reporter, spilledRecordsCounter, null,
@@ -2577,8 +2576,8 @@ class ReduceTask extends Task {
long mergeOutputSize = createInMemorySegments(inMemorySegments, 0);
int noInMemorySegments = inMemorySegments.size();
- Path outputPath = mapOutputFile.getInputFileForWrite(mapId,
- reduceTask.getTaskID(), mergeOutputSize);
+ Path outputPath =
+ mapOutputFile.getInputFileForWrite(mapId, mergeOutputSize);
Writer writer =
new Writer(conf, rfs, outputPath,
diff --git src/java/org/apache/hadoop/mapred/ReduceTaskRunner.java src/java/org/apache/hadoop/mapred/ReduceTaskRunner.java
index 903354e..fc6d1ae 100644
--- src/java/org/apache/hadoop/mapred/ReduceTaskRunner.java
+++ src/java/org/apache/hadoop/mapred/ReduceTaskRunner.java
@@ -37,7 +37,7 @@ class ReduceTaskRunner extends TaskRunner {
}
// cleanup from failures
- mapOutputFile.removeAll(getTask().getTaskID());
+ mapOutputFile.removeAll();
return true;
}
@@ -46,6 +46,6 @@ class ReduceTaskRunner extends TaskRunner {
public void close() throws IOException {
LOG.info(getTask()+" done; removing files.");
getTask().getProgress().setStatus("closed");
- mapOutputFile.removeAll(getTask().getTaskID());
+ mapOutputFile.removeAll();
}
}
diff --git src/java/org/apache/hadoop/mapred/SpillRecord.java src/java/org/apache/hadoop/mapred/SpillRecord.java
index 7595898..b9c9b12 100644
--- src/java/org/apache/hadoop/mapred/SpillRecord.java
+++ src/java/org/apache/hadoop/mapred/SpillRecord.java
@@ -17,6 +17,7 @@
*/
package org.apache.hadoop.mapred;
+import java.io.File;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.LongBuffer;
@@ -31,6 +32,7 @@ import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IOUtils;
+import org.apache.hadoop.util.DiskChecker;
import static org.apache.hadoop.mapred.MapTask.MAP_OUTPUT_INDEX_RECORD_LENGTH;
diff --git src/java/org/apache/hadoop/mapred/TaskController.java src/java/org/apache/hadoop/mapred/TaskController.java
index 030bf6b..9f842f7 100644
--- src/java/org/apache/hadoop/mapred/TaskController.java
+++ src/java/org/apache/hadoop/mapred/TaskController.java
@@ -17,12 +17,14 @@
*/
package org.apache.hadoop.mapred;
+import java.io.File;
import java.io.IOException;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configurable;
import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileUtil;
import org.apache.hadoop.mapred.JvmManager.JvmEnv;
import org.apache.hadoop.util.StringUtils;
import org.apache.hadoop.util.Shell.ShellCommandExecutor;
@@ -45,17 +47,55 @@ abstract class TaskController implements Configurable {
public Configuration getConf() {
return conf;
}
-
+
+ // The list of directory paths specified in the
+ // variable mapred.local.dir. This is used to determine
+ // which among the list of directories is picked up
+ // for storing data for a particular task.
+ protected String[] mapredLocalDirs;
+
public void setConf(Configuration conf) {
this.conf = conf;
+ mapredLocalDirs = conf.getStrings("mapred.local.dir");
}
-
- /**
- * Setup task controller component.
+
+ private static final String FILE_PERMISSIONS = "ugo+rwx";
+
+/**
+ * Sets up the permissions of the following directories:
+ *
+ * Job cache directory Archive directory Hadoop log directories
*
*/
- abstract void setup();
-
+ void setup() {
+ // set up job cache directory and associated permissions
+ String localDirs[] = this.mapredLocalDirs;
+ for (String localDir : localDirs) {
+ // Cache root
+ File cacheDirectory = new File(localDir, TaskTracker.getCacheSubdir());
+ File jobCacheDirectory =
+ new File(localDir, TaskTracker.getJobCacheSubdir());
+ if (!cacheDirectory.exists()) {
+ if (!cacheDirectory.mkdirs()) {
+ LOG.warn("Unable to create cache directory : "
+ + cacheDirectory.getPath());
+ }
+ FileUtil
+ .setPermissions(cacheDirectory, FileUtil.sevenSevenSeven);
+ }
+ if (!jobCacheDirectory.exists()) {
+ if (!jobCacheDirectory.mkdirs()) {
+ LOG.warn("Unable to create job cache directory : "
+ + jobCacheDirectory.getPath());
+ FileUtil.setPermissions(jobCacheDirectory,
+ FileUtil.sevenFiveFive);
+ }
+ }
+ }
+ // setting up permissions for user logs
+ File taskLog = TaskLog.getUserLogDir();
+ FileUtil.setPermissions(taskLog, FileUtil.sevenFiveFive);
+ }
/**
* Launch a task JVM
@@ -65,7 +105,7 @@ abstract class TaskController implements Configurable {
*/
abstract void launchTaskJVM(TaskControllerContext context)
throws IOException;
-
+
/**
* Top level cleanup a task JVM method.
*
@@ -90,23 +130,22 @@ abstract class TaskController implements Configurable {
}
killTask(context);
}
-
- /**
- * Perform initializing actions required before a task can run.
- *
- * For instance, this method can be used to setup appropriate
- * access permissions for files and directories that will be
- * used by tasks. Tasks use the job cache, log, PID and distributed cache
- * directories and files as part of their functioning. Typically,
- * these files are shared between the daemon and the tasks
- * themselves. So, a TaskController that is launching tasks
- * as different users can implement this method to setup
- * appropriate ownership and permissions for these directories
- * and files.
- */
- abstract void initializeTask(TaskControllerContext context);
-
-
+
+ /** Perform initializing actions required before a task can run.
+ *
+ * For instance, this method can be used to setup appropriate
+ * access permissions for files and directories that will be
+ * used by tasks. Tasks use the job cache, log, PID and distributed cache
+ * directories and files as part of their functioning. Typically,
+ * these files are shared between the daemon and the tasks
+ * themselves. So, a TaskController that is launching tasks
+ * as different users can implement this method to setup
+ * appropriate ownership and permissions for these directories
+ * and files.
+ */
+ abstract void initializeTask(TaskControllerContext context)
+ throws IOException;
+
/**
* Contains task information required for the task controller.
*/
@@ -122,14 +161,6 @@ abstract class TaskController implements Configurable {
// waiting time before sending SIGKILL to task JVM after sending SIGTERM
long sleeptimeBeforeSigkill;
}
-
- /**
- * Method which is called after the job is localized so that task controllers
- * can implement their own job localization logic.
- *
- * @param tip Task of job for which localization happens.
- */
- abstract void initializeJob(JobID jobId);
/**
* Sends a graceful terminate signal to taskJVM and it sub-processes.
@@ -144,6 +175,24 @@ abstract class TaskController implements Configurable {
*
* @param context task context
*/
-
abstract void killTask(TaskControllerContext context);
+
+ /**
+ * Take task-controller specific actions to finalize task directories once the
+ * task finishes
+ *
+ * @param context
+ * @throws IOException
+ */
+ abstract void finalizeTaskDirs(TaskControllerContext context)
+ throws IOException;
+
+ /**
+ * Take task-controller specific actions to finalize job on its finishing
+ *
+ * @param context
+ * @throws IOException
+ */
+ abstract void finalizeJob(JobID jobId, String workDir, String user)
+ throws IOException;
}
diff --git src/java/org/apache/hadoop/mapred/TaskLog.java src/java/org/apache/hadoop/mapred/TaskLog.java
index 9baafd8..b2aca7c 100644
--- src/java/org/apache/hadoop/mapred/TaskLog.java
+++ src/java/org/apache/hadoop/mapred/TaskLog.java
@@ -39,6 +39,7 @@ import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.LocalFileSystem;
import org.apache.hadoop.fs.FileUtil;
import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.util.ProcessTree;
import org.apache.hadoop.util.Shell;
import org.apache.log4j.Appender;
@@ -55,8 +56,7 @@ public class TaskLog {
LogFactory.getLog(TaskLog.class);
private static final File LOG_DIR =
- new File(System.getProperty("hadoop.log.dir"),
- "userlogs").getAbsoluteFile();
+ new File(getBaseLogDir(), "userlogs").getAbsoluteFile();
static LocalFileSystem localFS = null;
static {
@@ -156,8 +156,12 @@ public class TaskLog {
return new File(getBaseDir(taskid), "log.index");
}
}
-
- private static File getBaseDir(String taskid) {
+
+ static String getBaseLogDir() {
+ return System.getProperty("hadoop.log.dir");
+ }
+
+ static File getBaseDir(String taskid) {
return new File(LOG_DIR, taskid);
}
private static long prevOutLength;
@@ -196,6 +200,10 @@ public class TaskLog {
Path indexFilePath = new Path(indexFile.getAbsolutePath());
Path tmpIndexFilePath = new Path(tmpIndexFile.getAbsolutePath());
localFS.rename (tmpIndexFilePath, indexFilePath);
+
+ // TODO: Not needed after MAPREDUCEE-AAAA
+ // Retain the original permissions for the newly created file
+ localFS.setPermission(indexFilePath, new FsPermission((short)00644));
}
private static void resetPrevLengths(TaskAttemptID firstTaskid) {
prevOutLength = getTaskLogFile(firstTaskid, LogName.STDOUT).length();
diff --git src/java/org/apache/hadoop/mapred/TaskRunner.java src/java/org/apache/hadoop/mapred/TaskRunner.java
index 86c5868..bb52b03 100644
--- src/java/org/apache/hadoop/mapred/TaskRunner.java
+++ src/java/org/apache/hadoop/mapred/TaskRunner.java
@@ -135,12 +135,6 @@ abstract class TaskRunner extends Thread {
// start with same classpath as parent process
appendSystemClasspaths(classPaths);
- if (!workDir.mkdirs()) {
- if (!workDir.isDirectory()) {
- LOG.fatal("Mkdirs failed to create " + workDir.toString());
- }
- }
-
// include the user specified classpath
appendJobJarClasspaths(conf.getJar(), classPaths);
@@ -227,6 +221,9 @@ abstract class TaskRunner extends Thread {
if (!localFs.mkdirs(tmpDir) && !localFs.getFileStatus(tmpDir).isDir()) {
throw new IOException("Mkdirs failed to create " + tmpDir.toString());
}
+ // attempt's work dir is already private, no need for setting private
+ // permissions for the tmp directory inside.
+
vargs.add("-Djava.io.tmpdir=" + tmpDir.toString());
// Add classpath.
@@ -274,9 +271,13 @@ abstract class TaskRunner extends Thread {
// Set up the redirection of the task's stdout and stderr streams
File stdout = TaskLog.getTaskLogFile(taskid, TaskLog.LogName.STDOUT);
File stderr = TaskLog.getTaskLogFile(taskid, TaskLog.LogName.STDERR);
- boolean b = stdout.getParentFile().mkdirs();
+ File logDir = stdout.getParentFile();
+ boolean b = logDir.mkdirs();
if (!b) {
LOG.warn("mkdirs failed. Ignoring");
+ } else {
+ // TODO: This is not needed once MAPREDUCE-AAAA
+ FileUtil.setPermissions(logDir, FileUtil.sevenSevenSeven);
}
tracker.getTaskTrackerInstrumentation().reportTaskLaunch(taskid, stdout, stderr);
@@ -389,10 +390,10 @@ abstract class TaskRunner extends Thread {
static File formWorkDir(LocalDirAllocator lDirAlloc,
TaskAttemptID task, boolean isCleanup, JobConf conf)
throws IOException {
- File workDir = new File(lDirAlloc.getLocalPathToRead(
- TaskTracker.getLocalTaskDir(task.getJobID().toString(),
- task.toString(), isCleanup)
- + Path.SEPARATOR + MRConstants.WORKDIR, conf).toString());
+ File workDir =
+ new File(lDirAlloc.getLocalPathToRead(
+ TaskTracker.getTaskWorkDir(task.getJobID().toString(), task
+ .toString(), isCleanup), conf).toString());
return workDir;
}
@@ -460,7 +461,11 @@ abstract class TaskRunner extends Thread {
localFs.delete(localTaskFile, true);
OutputStream out = localFs.create(localTaskFile);
try {
+ String originalMapredLocalDir = conf.get("mapred.local.dir");
+ conf.set("mapred.local.dir", conf.get("mapred.child.local.dir"));
conf.writeXml(out);
+ // Reset back the original mapred.local.dir
+ conf.set("mapred.local.dir", originalMapredLocalDir);
} finally {
out.close();
}
diff --git src/java/org/apache/hadoop/mapred/TaskTracker.java src/java/org/apache/hadoop/mapred/TaskTracker.java
index c922ba3..531fca6 100644
--- src/java/org/apache/hadoop/mapred/TaskTracker.java
+++ src/java/org/apache/hadoop/mapred/TaskTracker.java
@@ -189,6 +189,9 @@ public class TaskTracker
private static final String CACHEDIR = "archive";
private static final String JOBCACHE = "jobcache";
private static final String OUTPUT = "output";
+ // TODO: fix name
+ private static final String TASKWORK = "task-work";
+ private static final String JARSDIR = "jars";
private JobConf fConf;
private FileSystem localFs;
private int maxMapSlots;
@@ -385,16 +388,34 @@ public class TaskTracker
}
static String getLocalJobDir(String jobid) {
- return getJobCacheSubdir() + Path.SEPARATOR + jobid;
+ return getJobCacheSubdir() + Path.SEPARATOR + jobid;
}
static String getLocalTaskDir(String jobid, String taskid) {
- return getLocalTaskDir(jobid, taskid, false) ;
+ return getLocalTaskDir(jobid, taskid, false);
}
+ /**
+ * Intermediate output directory for a given task as seen by the child.
+ * TODO
+ * @param jobid
+ * @param taskid
+ * @return
+ */
+ static String getBaseIntermediateOutputDir() {
+ return TaskTracker.OUTPUT;
+ }
+
+ /**
+ * Intermediate output directory for a given task as seen by the TaskTracker.
+ *
+ * @param jobid
+ * @param taskid
+ * @return
+ */
static String getIntermediateOutputDir(String jobid, String taskid) {
- return getLocalTaskDir(jobid, taskid)
- + Path.SEPARATOR + TaskTracker.OUTPUT ;
+ return getLocalTaskDir(jobid, taskid) + Path.SEPARATOR
+ + TaskTracker.OUTPUT;
}
static String getLocalTaskDir(String jobid,
@@ -406,7 +427,18 @@ public class TaskTracker
}
return taskDir;
}
-
+
+ static String getTaskWorkDir(String jobid, String taskid,
+ boolean isCleanupAttempt) {
+ String dir =
+ getLocalJobDir(jobid) + Path.SEPARATOR + TaskTracker.TASKWORK
+ + Path.SEPARATOR + taskid;
+ if (isCleanupAttempt) {
+ dir = dir + TASK_CLEANUP_SUFFIX;
+ }
+ return dir + Path.SEPARATOR + MRConstants.WORKDIR;
+ }
+
String getPid(TaskAttemptID tid) {
TaskInProgress tip = tasks.get(tid);
if (tip != null) {
@@ -567,7 +599,7 @@ public class TaskTracker
* startup, to remove any leftovers from previous run.
*/
public void cleanupStorage() throws IOException {
- this.fConf.deleteLocalFiles();
+ //this.fConf.deleteLocalFiles();
}
// Object on wait which MapEventsFetcherThread is going to wait.
@@ -762,28 +794,27 @@ public class TaskTracker
} catch(FileNotFoundException fe) {
jobFileSize = -1;
}
- Path localJobFile = lDirAlloc.getLocalPathForWrite(
- getLocalJobDir(jobId.toString())
- + Path.SEPARATOR + "job.xml",
- jobFileSize, fConf);
+
RunningJob rjob = addTaskToJob(jobId, tip);
synchronized (rjob) {
if (!rjob.localized) {
-
+
+ // Initialize the job directories first
FileSystem localFs = FileSystem.getLocal(fConf);
- // this will happen on a partial execution of localizeJob.
- // Sometimes the job.xml gets copied but copying job.jar
- // might throw out an exception
- // we should clean up and then try again
- Path jobDir = localJobFile.getParent();
- if (localFs.exists(jobDir)){
- localFs.delete(jobDir, true);
- boolean b = localFs.mkdirs(jobDir);
- if (!b)
- throw new IOException("Not able to create job directory "
- + jobDir.toString());
- }
+ initializeJobDirs(jobId, localFs, fConf.getStrings("mapred.local.dir"));
+
+ Path localJobFile = lDirAlloc.getLocalPathForWrite(
+ getLocalJobDir(jobId.toString())
+ + Path.SEPARATOR + "job.xml",
+ jobFileSize, fConf);
+
+ // Download job.xml
systemFS.copyToLocalFile(jobFile, localJobFile);
+ // Job dir is not private, set private permissions for job file.
+ FileUtil.setPermissions(new File(localJobFile.toUri().getPath()),
+ FileUtil.sevenZeroZero);
+
+ // TODO: Fix this. TT config is not loaded at all!
JobConf localJobConf = new JobConf(localJobFile);
// create the 'work' directory
@@ -795,6 +826,9 @@ public class TaskTracker
throw new IOException("Mkdirs failed to create "
+ workDir.toString());
}
+ // Job dir is not private, set private permissions for work dir.
+ FileUtil.setPermissions(new File(workDir.toUri().getPath()),
+ FileUtil.sevenZeroZero);
System.setProperty("job.local.dir", workDir.toString());
localJobConf.set("job.local.dir", workDir.toString());
@@ -811,35 +845,145 @@ public class TaskTracker
}
// Here we check for and we check five times the size of jarFileSize
// to accommodate for unjarring the jar file in work directory
- localJarFile = new Path(lDirAlloc.getLocalPathForWrite(
- getLocalJobDir(jobId.toString())
- + Path.SEPARATOR + "jars",
- 5 * jarFileSize, fConf), "job.jar");
- if (!localFs.mkdirs(localJarFile.getParent())) {
- throw new IOException("Mkdirs failed to create jars directory ");
- }
+ localJarFile =
+ lDirAlloc.getLocalPathForWrite(getLocalJobDir(jobId
+ .toString())
+ + Path.SEPARATOR + "jars" + Path.SEPARATOR + "job.jar",
+ 5 * jarFileSize, fConf);
+
+ // Download job.jar
+ // jars dir is already private. No need for setting private
+ // permissions for localJarFile
systemFS.copyToLocalFile(jarFilePath, localJarFile);
+
localJobConf.setJar(localJarFile.toString());
OutputStream out = localFs.create(localJobFile);
+ // Job dir is not private, set private permissions for the re-created
+ // job file.
+ FileUtil.setPermissions(new File(localJobFile.toUri().getPath()),
+ FileUtil.sevenZeroZero);
try {
localJobConf.writeXml(out);
} finally {
out.close();
}
- // also unjar the job.jar files
- RunJar.unJar(new File(localJarFile.toString()),
- new File(localJarFile.getParent().toString()));
+ // also unjar the job.jar files
+ // jars dir is already private. No need for setting private
+ // permissions for the un-jarred files.
+ RunJar.unJar(new File(localJarFile.toString()), new File(
+ localJarFile.getParent().toString()));
}
rjob.keepJobFiles = ((localJobConf.getKeepTaskFilesPattern() != null) ||
localJobConf.getKeepFailedTaskFiles());
rjob.localized = true;
rjob.jobConf = localJobConf;
- taskController.initializeJob(jobId);
}
}
launchTaskForJob(tip, new JobConf(rjob.jobConf));
}
+ /**
+ * Prepare the job directories for a given job. To be called only if the job
+ * is not already localized.
+ *
+ * @param jobId
+ * @param fs
+ * @param localDirs
+ * @throws IOException
+ */
+ private void initializeJobDirs(JobID jobId, FileSystem fs, String[] localDirs)
+ throws IOException {
+ boolean initStatus = true;
+ String jobDirPath = getLocalJobDir(jobId.toString());
+ for (String localDir : localDirs) {
+ Path jobDir = new Path(localDir, jobDirPath);
+ if (fs.exists(jobDir)) {
+ // this will happen on a partial execution of localizeJob.
+ // Sometimes the job.xml gets copied but copying job.jar
+ // might throw out an exception
+ // we should clean up and then try again
+ fs.delete(jobDir, true);
+ }
+
+ boolean jobDirStatus = fs.mkdirs(jobDir);
+ if (!jobDirStatus) {
+ LOG.warn("Not able to create job directory " + jobDir.toString());
+ }
+
+ Path taskWorkDir = new Path(jobDir, TaskTracker.TASKWORK);
+ if (!fs.mkdirs(taskWorkDir)) {
+ LOG.warn("Not able to create job's " + TaskTracker.TASKWORK
+ + " directory for " + jobDir.toString());
+ }
+
+ Path jarsDir = new Path(jobDir, TaskTracker.JARSDIR);
+ if (!fs.mkdirs(jarsDir)) {
+ LOG.warn("Not able to create job's " + TaskTracker.JARSDIR
+ + " directory for " + jobDir.toString());
+ }
+
+ initStatus = initStatus || jobDirStatus;
+ // TODO: fix return status for the following.
+ FileUtil.setPermissions(new File(jobDir.toUri().getPath()),
+ FileUtil.sevenFiveFive);
+ FileUtil.setPermissions(new File(taskWorkDir.toUri().getPath()),
+ FileUtil.sevenFiveFive);
+ FileUtil.setPermissions(new File(jarsDir.toUri().getPath()),
+ FileUtil.sevenZeroZero);
+ }
+ if (!initStatus) {
+ throw new IOException("Not able to initialize job directories "
+ + "in any of the configured local directories for job "
+ + jobId.toString());
+ }
+ }
+
+ /**
+ * create taskDirs on all the disks. Otherwise, in some cases, like when
+ * LinuxTaskController is in use, child might wish to balance load across
+ * disks but cannot itself create attempt directory because of the fact that
+ * job directory is writable only by the TT.
+ *
+ * @param jobId
+ * @param attemptId
+ * @param isCleanupAttempt
+ * @param fs
+ * @param localDirs
+ * @throws IOException
+ */
+ private void initializeAttemptDirs(String jobId, String attemptId,
+ boolean isCleanupAttempt, FileSystem fs, String[] localDirs)
+ throws IOException {
+
+ boolean initStatus = true;
+ String attemptDirPath =
+ getLocalTaskDir(jobId, attemptId, isCleanupAttempt);
+
+ for (String localDir : localDirs) {
+
+ Path localAttemptDir = new Path(localDir, attemptDirPath);
+
+ boolean attemptDirStatus = fs.mkdirs(localAttemptDir);
+ if (!attemptDirStatus) {
+ LOG.warn("localAttemptDir " + localAttemptDir.toString()
+ + " couldn't be created.");
+ }
+
+ // job dir is not private, set private permissions for attempt-dir
+ FileUtil.setPermissions(new File(localAttemptDir.toUri().getPath()),
+ FileUtil.sevenZeroZero);
+ // TODO: fix return code of the above.
+
+ initStatus = initStatus || attemptDirStatus;
+ }
+
+ if (!initStatus) {
+ throw new IOException("Not able to initialize attempt directories "
+ + "in any of the configured local directories for the attempt "
+ + attemptId.toString());
+ }
+ }
+
private void launchTaskForJob(TaskInProgress tip, JobConf jobConf) throws IOException{
synchronized (tip) {
tip.setJobConf(jobConf);
@@ -1381,6 +1525,16 @@ public class TaskTracker
indexCache.removeMap(tip.getTask().getTaskID().toString());
}
}
+ // Finalize the job directories
+ try {
+ taskController.finalizeJob(jobId, TaskTracker.getLocalJobDir(jobId
+ .toString())
+ + Path.SEPARATOR + "work", rjob.jobConf.getUser());
+ } catch (IOException ioe) {
+ LOG.warn("Finalize job failed with the exception : "
+ + StringUtils.stringifyException(ioe));
+ }
+
// Delete the job directory for this
// task if the job is done/failed
if (!rjob.keepJobFiles){
@@ -1557,7 +1711,7 @@ public class TaskTracker
mapOutputFile.setJobId(taskId.getJobID());
mapOutputFile.setConf(conf);
- Path tmp_output = mapOutputFile.getOutputFile(taskId);
+ Path tmp_output = mapOutputFile.getOutputFile();
if(tmp_output == null)
return 0;
FileSystem localFS = FileSystem.getLocal(conf);
@@ -1835,16 +1989,22 @@ public class TaskTracker
private void localizeTask(Task task) throws IOException{
+ FileSystem localFs = FileSystem.getLocal(fConf);
+
+ // create taskDirs on all the disks.
+ initializeAttemptDirs(task.getJobID().toString(), task.getTaskID()
+ .toString(), task.isTaskCleanupTask(), localFs, fConf
+ .getStrings("mapred.local.dir"));
+
Path localTaskDir =
lDirAlloc.getLocalPathForWrite(
TaskTracker.getLocalTaskDir(task.getJobID().toString(),
task.getTaskID().toString(), task.isTaskCleanupTask()),
defaultJobConf );
- FileSystem localFs = FileSystem.getLocal(fConf);
- if (!localFs.mkdirs(localTaskDir)) {
- throw new IOException("Mkdirs failed to create "
- + localTaskDir.toString());
+ if (!localFs.exists(localTaskDir)) {
+ throw new IOException("localTaskDir "
+ + localTaskDir.toString() + " doesn't exist!!" );
}
// create symlink for ../work if it already doesnt exist
@@ -1855,24 +2015,46 @@ public class TaskTracker
String link = localTaskDir.getParent().toString()
+ Path.SEPARATOR + "work";
File flink = new File(link);
- if (!flink.exists())
+ if (!flink.exists()) {
FileUtil.symLink(workDir, link);
-
+ // job dir is not private, set private permissions for the newly created
+ // link
+ FileUtil.setPermissions(flink, FileUtil.sevenZeroZero);
+ }
+
// create the working-directory of the task
- Path cwd = lDirAlloc.getLocalPathForWrite(
- getLocalTaskDir(task.getJobID().toString(),
- task.getTaskID().toString(), task.isTaskCleanupTask())
- + Path.SEPARATOR + MRConstants.WORKDIR,
- defaultJobConf);
+ Path cwd =
+ lDirAlloc.getLocalPathForWrite(getTaskWorkDir(task.getJobID()
+ .toString(), task.getTaskID().toString(), task
+ .isTaskCleanupTask()), defaultJobConf);
if (!localFs.mkdirs(cwd)) {
throw new IOException("Mkdirs failed to create "
+ cwd.toString());
}
+ // attempt dir is not private, no need for setting private permissions for
+ // the task workDir.
Path localTaskFile = new Path(localTaskDir, "job.xml");
task.setJobFile(localTaskFile.toString());
localJobConf.set("mapred.local.dir",
fConf.get("mapred.local.dir"));
+
+ // Prepare the mapred.local.dir for the child. The child is
+ // sand-boxed now. Whenever it uses LocalDirAllocator from now on, it
+ // will only see files inside the attempt-directory.
+ String childMapredLocalDir = "";
+ LOG.info("mapred.local.dir for child before changing : "
+ + fConf.get("mapred.local.dir"));
+ for (String mapredLocalDir : fConf.getStrings("mapred.local.dir")) {
+ childMapredLocalDir +=
+ mapredLocalDir
+ + Path.SEPARATOR
+ + TaskTracker.getLocalTaskDir(task.getJobID().toString(), task
+ .getTaskID().toString(), task.isJobCleanupTask()) + ",";
+ }
+ LOG.info("mapred.child.local.dir for child : " + childMapredLocalDir);
+ localJobConf.set("mapred.child.local.dir", childMapredLocalDir);
+
if (fConf.get("slave.host.name") != null) {
localJobConf.set("slave.host.name",
fConf.get("slave.host.name"));
@@ -1914,11 +2096,19 @@ public class TaskTracker
localJobConf.setNumTasksToExecutePerJvm(1);
}
OutputStream out = localFs.create(localTaskFile);
+ // attempt dir is already private, no need for setting private permissions
+ // for the newly created localTaskFile.
+
try {
+ localJobConf.set("mapred.local.dir", localJobConf
+ .get("mapred.child.local.dir"));
localJobConf.writeXml(out);
+ // Reset back the original mapred.local.dir
+ localJobConf.set("mapred.local.dir", fConf.get("mapred.local.dir"));
} finally {
out.close();
}
+
task.setConf(localJobConf);
}
@@ -2049,6 +2239,22 @@ public class TaskTracker
* The task is reporting that it's done running
*/
public synchronized void reportDone() {
+
+ // Finalize the task directories if needed.
+ if (localJobConf.getNumTasksToExecutePerJvm() != 1) {
+ // JVMs are reused. Task succeeded. Try finalizing the task directories.
+ try {
+ jvmManager.finalizeTaskDirs(runner);
+ } catch (IOException e) {
+ LOG.warn("Finalizing task-directories failed with the exception : "
+ + StringUtils.stringifyException(e));
+ }
+ } else {
+ // JVMs are not reused, we do task-directories' finalization when the
+ // JVM finishes. See the final block in JvmRunner.runChild()
+ ; // do nothing
+ }
+
if (isCleaningup()) {
if (this.taskStatus.getRunState() == TaskStatus.State.FAILED_UNCLEAN) {
this.taskStatus.setRunState(TaskStatus.State.FAILED);
@@ -2165,13 +2371,12 @@ public class TaskTracker
}
File workDir = null;
try {
- workDir = new File(lDirAlloc.getLocalPathToRead(
- TaskTracker.getLocalTaskDir(
- task.getJobID().toString(),
- task.getTaskID().toString(),
- task.isTaskCleanupTask())
- + Path.SEPARATOR + MRConstants.WORKDIR,
- localJobConf). toString());
+ workDir =
+ new File(lDirAlloc.getLocalPathToRead(
+ TaskTracker.getTaskWorkDir(task.getJobID().toString(),
+ task.getTaskID().toString(), task
+ .isTaskCleanupTask()), localJobConf)
+ .toString());
} catch (IOException e) {
LOG.warn("Working Directory of the task " + task.getTaskID() +
"doesnt exist. Caught exception " +
@@ -2451,6 +2656,8 @@ public class TaskTracker
}
String taskDir = getLocalTaskDir(task.getJobID().toString(),
taskId.toString(), task.isTaskCleanupTask());
+ String taskWorkDir = getTaskWorkDir(task.getJobID().toString(),
+ taskId.toString(), task.isTaskCleanupTask());
if (needCleanup) {
if (runner != null) {
//cleans up the output directory of the task (where map outputs
@@ -2465,18 +2672,25 @@ public class TaskTracker
directoryCleanupThread.addToQueue(localFs,
getLocalFiles(defaultJobConf,
taskDir));
+ directoryCleanupThread.addToQueue(localFs,
+ getLocalFiles(defaultJobConf,
+ taskWorkDir));
}
else {
directoryCleanupThread.addToQueue(localFs,
getLocalFiles(defaultJobConf,
taskDir+"/job.xml"));
+ // TODO: fix
+ directoryCleanupThread.addToQueue(localFs,
+ getLocalFiles(defaultJobConf,
+ taskWorkDir+"/taskjvm.sh"));
}
} else {
if (localJobConf.getNumTasksToExecutePerJvm() == 1) {
directoryCleanupThread.addToQueue(localFs,
getLocalFiles(defaultJobConf,
- taskDir+"/work"));
+ taskWorkDir));
}
}
} catch (Throwable ie) {
diff --git src/test/mapred/org/apache/hadoop/mapred/ClusterWithLinuxTaskController.java src/test/mapred/org/apache/hadoop/mapred/ClusterWithLinuxTaskController.java
index 521c758..d7e8b96 100644
--- src/test/mapred/org/apache/hadoop/mapred/ClusterWithLinuxTaskController.java
+++ src/test/mapred/org/apache/hadoop/mapred/ClusterWithLinuxTaskController.java
@@ -97,7 +97,7 @@ public class ClusterWithLinuxTaskController extends TestCase {
MyLinuxTaskController.class.getName());
mrCluster =
new MiniMRCluster(NUMBER_OF_NODES, dfsCluster.getFileSystem().getUri()
- .toString(), 1, null, null, conf);
+ .toString(), 4, null, null, conf);
// Get the configured taskcontroller-path
String path = System.getProperty("taskcontroller-path");
@@ -143,8 +143,18 @@ public class ClusterWithLinuxTaskController extends TestCase {
PrintWriter writer =
new PrintWriter(new FileOutputStream(configurationFile));
- writer.println(String.format("mapred.local.dir=%s", mrCluster
- .getTaskTrackerLocalDir(0)));
+ StringBuffer sb = new StringBuffer();
+ String[] localDirs = mrCluster.getTaskTrackerRunner(0).getLocalDirs();
+ for(int i = 0 ; i < localDirs.length; i++) {
+ sb.append(localDirs[i]);
+ if((i + 1) != localDirs.length) {
+ sb.append(",");
+ }
+ }
+ writer.println(String.format("mapred.local.dir=%s", sb.toString()));
+
+ writer
+ .println(String.format("hadoop.log.dir=%s", TaskLog.getBaseLogDir()));
writer.flush();
writer.close();
diff --git src/test/mapred/org/apache/hadoop/mapred/MiniMRCluster.java src/test/mapred/org/apache/hadoop/mapred/MiniMRCluster.java
index 5c26fd1..5f2c49f 100644
--- src/test/mapred/org/apache/hadoop/mapred/MiniMRCluster.java
+++ src/test/mapred/org/apache/hadoop/mapred/MiniMRCluster.java
@@ -171,7 +171,7 @@ public class MiniMRCluster {
StringBuffer localPath = new StringBuffer();
for(int i=0; i < numDir; ++i) {
File ttDir = new File(localDirBase,
- Integer.toString(trackerId) + "_" + 0);
+ Integer.toString(trackerId) + "_" + i);
if (!ttDir.mkdirs()) {
if (!ttDir.isDirectory()) {
throw new IOException("Mkdirs failed to create " + ttDir);
diff --git src/test/mapred/org/apache/hadoop/mapred/TestJobExecutionAsDifferentUser.java src/test/mapred/org/apache/hadoop/mapred/TestJobExecutionAsDifferentUser.java
index f923f88..c75da59 100644
--- src/test/mapred/org/apache/hadoop/mapred/TestJobExecutionAsDifferentUser.java
+++ src/test/mapred/org/apache/hadoop/mapred/TestJobExecutionAsDifferentUser.java
@@ -20,8 +20,10 @@ package org.apache.hadoop.mapred;
import java.io.IOException;
+import org.apache.hadoop.examples.SleepJob;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.util.ToolRunner;
/**
* Test a java-based mapred job with LinuxTaskController running the jobs as a
@@ -39,13 +41,36 @@ public class TestJobExecutionAsDifferentUser extends
startCluster();
Path inDir = new Path("input");
Path outDir = new Path("output");
- RunningJob job =
- UtilsForTests.runJobSucceed(getClusterConf(), inDir, outDir);
+
+ RunningJob job;
+
+ // Run a job with zero maps/reduces
+ job = UtilsForTests.runJob(getClusterConf(), inDir, outDir, 0, 0);
+ job.waitForCompletion();
+ assertTrue("Job failed", job.isSuccessful());
+ assertOwnerShip(outDir);
+
+ // Run a job with 1 map and zero reduces
+ job = UtilsForTests.runJob(getClusterConf(), inDir, outDir, 1, 0);
+ job.waitForCompletion();
assertTrue("Job failed", job.isSuccessful());
assertOwnerShip(outDir);
+
+ // Run a normal job with maps/reduces
+ job = UtilsForTests.runJob(getClusterConf(), inDir, outDir, 1, 1);
+ job.waitForCompletion();
+ assertTrue("Job failed", job.isSuccessful());
+ assertOwnerShip(outDir);
+
+ // Run a job with jvm reuse
+ JobConf myConf = getClusterConf();
+ myConf.set("mapred.job.reuse.jvm.num.tasks", "-1");
+ String[] args = { "-m", "6", "-r", "3", "-mt", "1000", "-rt", "1000" };
+ assertEquals(0, ToolRunner.run(myConf, new SleepJob(), args));
+ // TODO: check reuse
}
- public void testEnvironment() throws IOException {
+ public void TestEnvironment() throws IOException {
if (!shouldRun()) {
return;
}
diff --git src/test/mapred/org/apache/hadoop/mapred/TestKillSubProcesses.java src/test/mapred/org/apache/hadoop/mapred/TestKillSubProcesses.java
index 37c2c94..99633d9 100644
--- src/test/mapred/org/apache/hadoop/mapred/TestKillSubProcesses.java
+++ src/test/mapred/org/apache/hadoop/mapred/TestKillSubProcesses.java
@@ -348,15 +348,26 @@ public class TestKillSubProcesses extends TestCase {
FileSystem fs = FileSystem.getLocal(conf);
TEST_ROOT_DIR = new Path(conf.get("test.build.data")).toUri().getPath();
scriptDir = new Path(TEST_ROOT_DIR + "/script");
- if(fs.exists(scriptDir)){
+ if (fs.exists(scriptDir)) {
fs.delete(scriptDir, true);
}
+
+ // Create the directory and set open permissions so that the TT can
+ // access.
+ fs.mkdirs(scriptDir);
+ fs.setPermission(scriptDir, new FsPermission(FsAction.ALL, FsAction.ALL,
+ FsAction.ALL));
+
// create shell script
Random rm = new Random();
Path scriptPath = new Path(scriptDir, "_shellScript_" + rm.nextInt()
+ ".sh");
String shellScript = scriptPath.toString();
+
+ // Construct the script. Set umask to 0000 so that TT can access all the
+ // files.
String script =
+ "umask 0000\n" +
"echo $$ > " + scriptDir.toString() + "/childPidFile" + "$1\n" +
"echo hello\n" +
"trap 'echo got SIGTERM' 15 \n" +
diff --git src/test/mapred/org/apache/hadoop/mapred/TestMapRed.java src/test/mapred/org/apache/hadoop/mapred/TestMapRed.java
index 665dc33..2f70803 100644
--- src/test/mapred/org/apache/hadoop/mapred/TestMapRed.java
+++ src/test/mapred/org/apache/hadoop/mapred/TestMapRed.java
@@ -289,7 +289,7 @@ public class TestMapRed extends TestCase {
first = false;
MapOutputFile mapOutputFile = new MapOutputFile(taskId.getJobID());
mapOutputFile.setConf(conf);
- Path input = mapOutputFile.getInputFile(0, taskId);
+ Path input = mapOutputFile.getInputFile(0);
FileSystem fs = FileSystem.get(conf);
assertTrue("reduce input exists " + input, fs.exists(input));
SequenceFile.Reader rdr =