commit d3bb33c07fac48efbcc0ff84847d8dbfe40175ed Author: Eric Yang Date: Wed Mar 14 21:41:30 2018 -0400 Draft 2 diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/ContainerExecutor.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/ContainerExecutor.java index 0254dac..da63b2b 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/ContainerExecutor.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/ContainerExecutor.java @@ -18,7 +18,9 @@ package org.apache.hadoop.yarn.server.nodemanager; +import java.io.BufferedWriter; import java.io.File; +import java.io.FileWriter; import java.io.IOException; import java.io.OutputStream; import java.io.PrintStream; @@ -436,28 +438,7 @@ public void writeLaunchEnv(OutputStream out, Map environment, } sb.initializeLogs(user, logDir); sb.echo("Launching container"); - if (environment.containsKey(ENV_DOCKER_CONTAINER_RUN_OVERRIDE_DISABLE) && - environment.containsKey(ENV_CONTAINER_TYPE) && - environment.get(ENV_DOCKER_CONTAINER_RUN_OVERRIDE_DISABLE) - .equalsIgnoreCase("true") && - environment.get(ENV_CONTAINER_TYPE) - .equals("docker") - ) { - for (String launchCmd : command) { - sb.env(ENV_DOCKER_CONTAINER_RUN_CMD,launchCmd); - } - StringBuilder userEnv = new StringBuilder(); - for (Map.Entry kv: environment.entrySet()) { - userEnv.append("-e "); - userEnv.append(kv.getKey()); - userEnv.append("=${"); - userEnv.append(kv.getKey()); - userEnv.append("} "); - } - sb.env(ENV_DOCKER_CONTAINER_RUN_USER_ENV, userEnv.toString()); - } else { - sb.command(command); - } + sb.command(command); PrintStream pout = null; try { diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/linux/runtime/DockerLinuxContainerRuntime.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/linux/runtime/DockerLinuxContainerRuntime.java index 889eb40..b46d662 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/linux/runtime/DockerLinuxContainerRuntime.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/linux/runtime/DockerLinuxContainerRuntime.java @@ -726,6 +726,8 @@ public void launchContainer(ContainerRuntimeContext ctx) String imageName = environment.get(ENV_DOCKER_CONTAINER_IMAGE); String network = environment.get(ENV_DOCKER_CONTAINER_NETWORK); String hostname = environment.get(ENV_DOCKER_CONTAINER_HOSTNAME); + String disableOverride = environment.get( + ENV_DOCKER_CONTAINER_RUN_OVERRIDE_DISABLE); if(network == null || network.isEmpty()) { network = defaultNetwork; @@ -784,8 +786,11 @@ public void launchContainer(ContainerRuntimeContext ctx) @SuppressWarnings("unchecked") DockerRunCommand runCommand = new DockerRunCommand(containerIdStr, dockerRunAsUser, imageName) - .setContainerWorkDir(containerWorkDir.toString()) .setNetworkType(network); + + if (disableOverride == null || disableOverride.equals("false")) { + runCommand.setContainerWorkDir(containerWorkDir.toString()); + } setHostname(runCommand, containerIdStr, hostname); runCommand.setCapabilities(capabilities); @@ -853,15 +858,19 @@ public void launchContainer(ContainerRuntimeContext ctx) addCGroupParentIfRequired(resourcesOpts, containerIdStr, runCommand); - String disableOverride = environment.get( - ENV_DOCKER_CONTAINER_RUN_OVERRIDE_DISABLE); - if (disableOverride != null && disableOverride.equals("true")) { LOG.info("command override disabled"); runCommand.setOverrideDisabled(true); - List overrideCommands = new ArrayList<>(); - overrideCommands.add("$YARN_CONTAINER_RUNTIME_DOCKER_RUN_CMD"); - runCommand.setOverrideCommandWithArgs(overrideCommands); + StringBuilder userEnv = new StringBuilder(); + for (Map.Entry kv: environment.entrySet()) { + userEnv.append("-e "); + userEnv.append(kv.getKey()); + userEnv.append("|"); + userEnv.append(kv.getValue()); + userEnv.append(" "); + } + runCommand.setEnv(userEnv.toString()); + runCommand.setOverrideCommandWithArgs(container.getLaunchContext().getCommands()); runCommand.disableDetach(); runCommand.setLogDir(container.getLogDir()); } else { diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/linux/runtime/docker/DockerRunCommand.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/linux/runtime/docker/DockerRunCommand.java index b70193d..5de7b5d 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/linux/runtime/docker/DockerRunCommand.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/linux/runtime/docker/DockerRunCommand.java @@ -169,6 +169,11 @@ public DockerRunCommand setOverrideDisabled(boolean toggle) { super.addCommandArguments("use-entry-point", value); return this; } + + public DockerRunCommand setEnv(String env) { + super.addCommandArguments("environ", env); + return this; + } public DockerRunCommand setLogDir(String logDir) { super.addCommandArguments("log-dir", logDir); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/native/container-executor/impl/container-executor.c b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/native/container-executor/impl/container-executor.c index bd57bfd..5113b07 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/native/container-executor/impl/container-executor.c +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/native/container-executor/impl/container-executor.c @@ -1530,17 +1530,8 @@ int launch_docker_container_as_user(const char * user, const char *app_id, goto cleanup; } - if (use_entry_point(command_file)) { -/* snprintf(docker_command_with_binary, command_size, "%s %s", docker_binary, docker_command); - executable_binary = get_container_launcher_file(work_dir); - FILE *LAUNCH_SCRIPT_FP = fopen(executable_binary, "a"); - if (LAUNCH_SCRIPT_FP == NULL) { - fprintf(LOGFILE, "write to %s failed.\n", executable_binary); - goto cleanup; - } - fprintf(LAUNCH_SCRIPT_FP, "%s\n", docker_command_with_binary); - fflush(LAUNCH_SCRIPT_FP); - fclose(LAUNCH_SCRIPT_FP);*/ + snprintf(docker_command_with_binary, command_size, "%s %s", docker_binary, docker_command); +// if (use_entry_point(command_file) == 0) { char *so = init_log_path(chosen_container_log_dir, "stdout.txt"); if (so == NULL) { exit_code = UNABLE_TO_EXECUTE_CONTAINER_SCRIPT; @@ -1551,7 +1542,14 @@ int launch_docker_container_as_user(const char * user, const char *app_id, exit_code = UNABLE_TO_EXECUTE_CONTAINER_SCRIPT; goto cleanup; } - if (fork() == 0) { + pid_t pid = fork(); + if (pid == -1) { + fprintf (ERRORFILE, + "Could not invoke docker %s.\n", docker_command_with_binary); + fflush(ERRORFILE); + exit_code = UNABLE_TO_EXECUTE_CONTAINER_SCRIPT; + goto cleanup; + } else if (pid == 0) { FILE* so_fd = fopen(so, "a+"); if (so_fd == NULL) { fprintf(ERRORFILE, "Could not append to %s\n", so); @@ -1564,27 +1562,30 @@ int launch_docker_container_as_user(const char * user, const char *app_id, exit_code = UNABLE_TO_EXECUTE_CONTAINER_SCRIPT; goto cleanup; } + fprintf(so_fd, "Launching docker container...\n"); + fprintf(so_fd, "Docker run command: %s\n", docker_command_with_binary); dup2(fileno(so_fd), fileno(stdout)); dup2(fileno(se_fd), fileno(stderr)); fclose(so_fd); fclose(se_fd); - execl(docker_binary, "docker", "ps", "-a", NULL); + char **args = split_delimiter(docker_command_with_binary, " "); + execv(docker_binary, args); + exit(0); } free(so); free(se); - } else { - snprintf(docker_command_with_binary, command_size, "%s %s", docker_binary, docker_command); - fprintf(LOGFILE, "Launching docker container...\n"); - fprintf(LOGFILE, "Docker run command: %s\n", docker_command_with_binary); - FILE* start_docker = popen(docker_command_with_binary, "r"); - if (pclose (start_docker) != 0) { - fprintf (ERRORFILE, - "Could not invoke docker %s.\n", docker_command_with_binary); - fflush(ERRORFILE); - exit_code = UNABLE_TO_EXECUTE_CONTAINER_SCRIPT; - goto cleanup; - } - } +// } else { +// fprintf(LOGFILE, "Launching docker container...\n"); +// fprintf(LOGFILE, "Docker run command: %s\n", docker_command_with_binary); +// FILE* start_docker = popen(docker_command_with_binary, "r"); +// if (pclose (start_docker) != 0) { +// fprintf (ERRORFILE, +// "Could not invoke docker %s.\n", docker_command_with_binary); +// fflush(ERRORFILE); +// exit_code = UNABLE_TO_EXECUTE_CONTAINER_SCRIPT; +// goto cleanup; +// } +// } snprintf(docker_inspect_command, command_size, "%s inspect --format {{.State.Pid}} %s", @@ -1596,10 +1597,9 @@ int launch_docker_container_as_user(const char * user, const char *app_id, int pid = 0; int res = fscanf (inspect_docker, "%d", &pid); fprintf(LOGFILE, "pid from docker inspect: %d\n", pid); - if (pclose (inspect_docker) != 0 || res <= 0) - { + if (pclose (inspect_docker) != 0 || res <= 0) { fprintf (ERRORFILE, - "Could not inspect docker to get pid %s.\n", docker_inspect_command); + "Could not inspect docker to get pid %s.\n", docker_inspect_command); fflush(ERRORFILE); exit_code = UNABLE_TO_EXECUTE_CONTAINER_SCRIPT; goto cleanup; @@ -1634,6 +1634,7 @@ int launch_docker_container_as_user(const char * user, const char *app_id, } fprintf(LOGFILE, "Waiting for docker container to finish.\n"); + #ifdef __linux size_t command_size = MIN(sysconf(_SC_ARG_MAX), 128*1024); char* proc_pid_path = alloc_and_clear_memory(command_size, sizeof(char)); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/native/container-executor/impl/utils/docker-util.c b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/native/container-executor/impl/utils/docker-util.c index 75fa721..2d502de 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/native/container-executor/impl/utils/docker-util.c +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/native/container-executor/impl/utils/docker-util.c @@ -59,11 +59,16 @@ static int add_param_to_command(const struct configuration *command_config, cons const int with_argument, char *out, const size_t outlen) { size_t tmp_buffer_size = 4096; int ret = 0; + int docker_override=0; char *tmp_buffer = (char *) alloc_and_clear_memory(tmp_buffer_size, sizeof(char)); char *value = get_configuration_value(key, DOCKER_COMMAND_FILE_SECTION, command_config); if (value != NULL) { if (with_argument) { - quote_and_append_arg(&tmp_buffer, &tmp_buffer_size, param, value); +// if (docker_override == 0) { + snprintf(tmp_buffer, tmp_buffer_size, "%s%s ", param, value); +// } else { +// quote_and_append_arg(&tmp_buffer, &tmp_buffer_size, param, value); +// } ret = add_to_buffer(out, outlen, tmp_buffer); } else if (strcmp(value, "true") == 0) { ret = add_to_buffer(out, outlen, param); @@ -131,6 +136,7 @@ static int add_param_to_command_if_allowed(const struct configuration *command_c char **permitted_values = get_configuration_values_delimiter(allowed_key, CONTAINER_EXECUTOR_CFG_DOCKER_SECTION, executor_cfg, ","); + int docker_override = 0; //use_entry_point2(command_config); int i = 0, j = 0, permitted = 0, ret = 0; if (multiple_values) { values = get_configuration_values_delimiter(key, DOCKER_COMMAND_FILE_SECTION, command_config, ","); @@ -178,7 +184,11 @@ static int add_param_to_command_if_allowed(const struct configuration *command_c } } if (permitted == 1) { - quote_and_append_arg(&tmp_buffer, &tmp_buffer_size, param, values[i]); +// if (docker_override == 0) { + snprintf(tmp_buffer, tmp_buffer_size, "%s%s ", param, values[i]); +// } else { +// quote_and_append_arg(&tmp_buffer, &tmp_buffer_size, param, values[i]); +// } ret = add_to_buffer(out, outlen, tmp_buffer); if (ret != 0) { fprintf(ERRORFILE, "Output buffer too small\n"); @@ -230,6 +240,15 @@ static int validate_container_name(const char *container_name) { return INVALID_DOCKER_CONTAINER_NAME; } +static int use_entry_point2(const struct configuration *command_config) { + int use_entry_point = -1; + char *value = get_configuration_value("use-entry-point", DOCKER_COMMAND_FILE_SECTION, command_config); + if (strcasecmp(value, "true") == 0) { + use_entry_point = 0; + } + return use_entry_point; +} + const char *get_docker_error_message(const int error_code) { switch (error_code) { @@ -295,7 +314,7 @@ char *get_docker_binary(const struct configuration *conf) { } int use_entry_point(const char *command_file) { - int use_entry_point = 0; + int use_entry_point = -1; int ret = 0; struct configuration command_config = {0, NULL}; ret = read_config(command_file, &command_config); @@ -304,7 +323,7 @@ int use_entry_point(const char *command_file) { } char *value = get_configuration_value("use-entry-point", DOCKER_COMMAND_FILE_SECTION, &command_config); if (strcasecmp(value, "true") == 0) { - use_entry_point = 1; + use_entry_point = 0; } return use_entry_point; } @@ -825,6 +844,7 @@ static int set_group_add(const struct configuration *command_config, char *out, char *tmp_buffer = NULL; char *privileged = NULL; + int docker_override = 0; //use_entry_point2(command_config); privileged = get_configuration_value("privileged", DOCKER_COMMAND_FILE_SECTION, command_config); if (privileged != NULL && strcasecmp(privileged, "true") == 0 ) { free(privileged); @@ -835,7 +855,11 @@ static int set_group_add(const struct configuration *command_config, char *out, if (group_add != NULL) { for (i = 0; group_add[i] != NULL; ++i) { tmp_buffer = (char *) alloc_and_clear_memory(tmp_buffer_size, sizeof(char)); - quote_and_append_arg(&tmp_buffer, &tmp_buffer_size, "--group-add ", group_add[i]); +// if (docker_override == 0) { + snprintf(tmp_buffer, tmp_buffer_size, "--group-add %s ", group_add[i]); +// } else { +// quote_and_append_arg(&tmp_buffer, &tmp_buffer_size, "--group-add ", group_add[i]); +// } ret = add_to_buffer(out, outlen, tmp_buffer); if (ret != 0) { return BUFFER_TOO_SMALL; @@ -914,7 +938,7 @@ static int set_capabilities(const struct configuration *command_config, int ret = 0; - ret = add_to_buffer(out, outlen, "--cap-drop='ALL' "); + ret = add_to_buffer(out, outlen, "--cap-drop=ALL "); if (ret != 0) { return BUFFER_TOO_SMALL; } @@ -953,9 +977,19 @@ static int set_devices(const struct configuration *command_config, const struct return ret; } -static int set_env(char *out, const size_t outlen) { +static int set_env(const struct configuration *command_config, char *out, const size_t outlen) { int ret = 0; - ret = add_to_buffer(out, outlen, "$YARN_CONTAINER_RUNTIME_DOCKER_RUN_USER_ENV "); + char *value = get_configuration_value("environ", DOCKER_COMMAND_FILE_SECTION, command_config); + for (int i = 0; i <= strlen(value); i++) { + if (value[i]=='|') { + value[i]='='; + } + } + ret = add_to_buffer(out, outlen, value); + if (ret != 0) { + return BUFFER_TOO_SMALL; + } + ret = add_to_buffer(out, outlen, " "); if (ret != 0) { return BUFFER_TOO_SMALL; } @@ -1077,6 +1111,7 @@ static char* get_mount_source(const char *mount) { static int add_mounts(const struct configuration *command_config, const struct configuration *conf, const char *key, const int ro, char *out, const size_t outlen) { + int docker_override = 0; size_t tmp_buffer_size = 1024; const char *ro_suffix = ""; const char *tmp_path_buffer[2] = {NULL, NULL}; @@ -1158,7 +1193,11 @@ static int add_mounts(const struct configuration *command_config, const struct c tmp_buffer_2 = (char *) alloc_and_clear_memory(strlen(values[i]) + strlen(ro_suffix) + 1, sizeof(char)); strncpy(tmp_buffer_2, values[i], strlen(values[i])); strncpy(tmp_buffer_2 + strlen(values[i]), ro_suffix, strlen(ro_suffix)); - quote_and_append_arg(&tmp_buffer, &tmp_buffer_size, "-v ", tmp_buffer_2); +// if (docker_override == 0) { + snprintf(tmp_buffer, tmp_buffer_size, "-v %s ", tmp_buffer_2); +// } else { +// quote_and_append_arg(&tmp_buffer, &tmp_buffer_size, "-v ", tmp_buffer_2); +// } ret = add_to_buffer(out, outlen, tmp_buffer); free(tmp_buffer_2); free(mount_src); @@ -1201,8 +1240,9 @@ static int check_privileges(const char *user) { char tmpl[] = "id -G -n %s"; char buffer[4096]; if (fork()==0) { - char *cmd = (char *) alloc_and_clear_memory(strlen(tmpl) + strlen(user), sizeof(char)); - sprintf(cmd, tmpl, user); + int cmd_size = strlen(tmpl) + strlen(user); + char *cmd = (char *) alloc_and_clear_memory(cmd_size, sizeof(char)); + snprintf(cmd, cmd_size, tmpl, user); fp = popen(cmd, "r"); if (fp == NULL) { exit(127); @@ -1301,6 +1341,7 @@ int get_docker_run_command(const char *command_file, const struct configuration char *tmp_buffer = NULL; char **launch_command = NULL; char *privileged = NULL; + int docker_override = use_entry_point(command_file); struct configuration command_config = {0, NULL}; ret = read_and_verify_command_file(command_file, DOCKER_RUN_COMMAND, &command_config); if (ret != 0) { @@ -1333,7 +1374,11 @@ int get_docker_run_command(const char *command_file, const struct configuration tmp_buffer = (char *) alloc_and_clear_memory(tmp_buffer_size, sizeof(char)); - quote_and_append_arg(&tmp_buffer, &tmp_buffer_size, " --name=", container_name); +// if (docker_override == 0) { + snprintf(tmp_buffer, tmp_buffer_size, " --name=%s ", container_name); +// } else { +// quote_and_append_arg(&tmp_buffer, &tmp_buffer_size, " --name=", container_name); +// } ret = add_to_buffer(out, outlen, tmp_buffer); if (ret != 0) { return BUFFER_TOO_SMALL; @@ -1343,7 +1388,11 @@ int get_docker_run_command(const char *command_file, const struct configuration privileged = get_configuration_value("privileged", DOCKER_COMMAND_FILE_SECTION, &command_config); if (privileged == NULL || strcmp(privileged, "false") == 0) { - quote_and_append_arg(&tmp_buffer, &tmp_buffer_size, "--user=", user); +// if (docker_override == 0) { + snprintf(tmp_buffer, tmp_buffer_size, "--user=%s ", user); +// } else { +// quote_and_append_arg(&tmp_buffer, &tmp_buffer_size, "--user=", user); +// } ret = add_to_buffer(out, outlen, tmp_buffer); if (ret != 0) { return BUFFER_TOO_SMALL; @@ -1417,14 +1466,16 @@ int get_docker_run_command(const char *command_file, const struct configuration return ret; } - if (use_entry_point(command_file)) { - ret = set_env(out, outlen); +// if (docker_override == 0) { + ret = set_env(&command_config, out, outlen); if (ret != 0) { - return ret; + return BUFFER_TOO_SMALL; } - } + snprintf(tmp_buffer, tmp_buffer_size, "%s ", image); +// } else { +// quote_and_append_arg(&tmp_buffer, &tmp_buffer_size, "", image); +// } - quote_and_append_arg(&tmp_buffer, &tmp_buffer_size, "", image); ret = add_to_buffer(out, outlen, tmp_buffer); if (ret != 0) { return BUFFER_TOO_SMALL; @@ -1440,12 +1491,12 @@ int get_docker_run_command(const char *command_file, const struct configuration if (launch_command != NULL) { for (i = 0; launch_command[i] != NULL; ++i) { memset(tmp_buffer, 0, tmp_buffer_size); - if (use_entry_point(command_file)) { +// if (docker_override == 0) { ret = add_to_buffer(out, outlen, launch_command[i]); - } else { - quote_and_append_arg(&tmp_buffer, &tmp_buffer_size, "", launch_command[i]); - ret = add_to_buffer(out, outlen, tmp_buffer); - } +// } else { +// quote_and_append_arg(&tmp_buffer, &tmp_buffer_size, "", launch_command[i]); +// ret = add_to_buffer(out, outlen, tmp_buffer); +// } if (ret != 0) { free_values(launch_command); free(tmp_buffer);