diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/util/SystemdLCEResourceHandler.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/util/SystemdLCEResourceHandler.java
new file mode 100644
index 0000000..e809a85
--- /dev/null
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/util/SystemdLCEResourceHandler.java
@@ -0,0 +1,168 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.yarn.server.nodemanager.util;
+
+import com.google.common.annotations.VisibleForTesting;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.util.Shell;
+import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.server.nodemanager.LinuxContainerExecutor;
+import org.apache.hadoop.yarn.util.Clock;
+import org.apache.hadoop.yarn.util.SystemClock;
+
+import java.io.IOException;
+import java.util.Arrays;
+
+public class SystemdLCEResourceHandler implements LCEResourcesHandler {
+
+  static final Log LOG = LogFactory.getLog(SystemdLCEResourceHandler.class);
+
+  private Configuration conf;
+  private String rootSliceName;
+  private boolean cpuWeightEnabled = true;
+  private static final int CPU_DEFAULT_WEIGHT = 1024;
+
+  Clock clock;
+
+  public SystemdLCEResourceHandler() {
+    clock = new SystemClock();
+  }
+
+  @Override
+  public void setConf(Configuration conf) {
+    this.conf = conf;
+  }
+
+  @Override
+  public Configuration getConf() {
+    return conf;
+  }
+
+  @VisibleForTesting
+  void initConfig() throws IOException {
+    rootSliceName = conf.get(
+        YarnConfiguration.NM_LINUX_CONTAINER_CGROUPS_HIERARCHY, "hadoop_yarn");
+    // systemd reserves "-" as the slice hierarchy separator, so it must not
+    // appear inside the slice name itself.
+    rootSliceName = rootSliceName.replaceAll("-", "_");
+    if (rootSliceName.startsWith("/")) {
+      rootSliceName = rootSliceName.substring(1);
+    }
+  }
+
+  @Override
+  public void init(LinuxContainerExecutor lce) throws IOException {
+    initConfig();
+    // Start the root Hadoop YARN slice; every YARN container launched later
+    // runs inside this slice.
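+    // With the default hierarchy this amounts to
+    // "sudo systemctl start hadoop_yarn.slice"; per-container slices
+    // (see nameForSystemdSlice) are then nested under this root slice.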
+    startSystemdSlice(rootSliceName + ".slice");
+    setSliceProperty(rootSliceName + ".slice", "CPUAccounting=true");
+  }
+
+  boolean isCpuWeightEnabled() {
+    return this.cpuWeightEnabled;
+  }
+
+  @Override
+  public void preExecute(ContainerId containerId, Resource containerResource)
+      throws IOException {
+    if (isCpuWeightEnabled()) {
+      // Create a slice and set the CPU sharing for this container
+      String containerSliceName = nameForSystemdSlice(containerId);
+      startSystemdSlice(containerSliceName);
+      // Scale CPUShares by the number of virtual cores the container
+      // was allocated
+      int containerVCores = containerResource.getVirtualCores();
+      setSliceProperty(containerSliceName,
+          "CPUShares=" + (CPU_DEFAULT_WEIGHT * containerVCores)
+          + " CPUAccounting=true");
+    }
+  }
+
+  @Override
+  public void postExecute(ContainerId containerId) {
+    if (isCpuWeightEnabled()) {
+      try {
+        stopSystemdSlice(nameForSystemdSlice(containerId));
+      } catch (IOException e) {
+        LOG.warn("Failed to stop the slice for container " + containerId, e);
+      }
+    }
+  }
+
+  @Override
+  public String getResourcesOption(ContainerId containerId) {
+    StringBuilder sb = new StringBuilder("systemd=");
+
+    if (isCpuWeightEnabled()) {
+      sb.append(nameForSystemdSlice(containerId));
+    }
+    return sb.toString();
+  }
+
+  /**
+   * Start a new systemd slice with the given slice name.
+   */
+  private void startSystemdSlice(String sliceName) throws IOException {
+    String[] commandArray = new String[] {"/bin/sh", "-c",
+        "sudo systemctl start " + sliceName};
+    Shell.ShellCommandExecutor sce =
+        new Shell.ShellCommandExecutor(commandArray);
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("Start a systemd slice: " + Arrays.toString(commandArray));
+    }
+    sce.execute();
+  }
+
+  /**
+   * Stop a systemd slice.
+   */
+  private void stopSystemdSlice(String sliceName) throws IOException {
+    String[] commandArray = new String[] {"/bin/sh", "-c",
+        "sudo systemctl stop " + sliceName};
+    Shell.ShellCommandExecutor sce =
+        new Shell.ShellCommandExecutor(commandArray);
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("Stop a systemd slice: " + Arrays.toString(commandArray));
+    }
+    sce.execute();
+  }
+
+  /**
+   * Set properties for the given slice.
+   */
+  private void setSliceProperty(String sliceName, String property)
+      throws IOException {
+    String[] commandArray = new String[] {"/bin/sh", "-c",
+        "sudo systemctl set-property " + sliceName + " " + property};
+    Shell.ShellCommandExecutor sce =
+        new Shell.ShellCommandExecutor(commandArray);
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("Set property for a systemd slice: "
+          + Arrays.toString(commandArray));
+    }
+    sce.execute();
+  }
+
+  /**
+   * Generate the slice name for the given container, e.g.
+   * hadoop_yarn-container_1400000000000_0001_01_000001.slice.
+   */
+  private String nameForSystemdSlice(ContainerId containerId) {
+    String containerName = containerId.toString().replaceAll("-", "_");
+    return rootSliceName + "-" + containerName + ".slice";
+  }
+}
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/native/container-executor/impl/container-executor.c hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/native/container-executor/impl/container-executor.c
index f582d85..9488695 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/native/container-executor/impl/container-executor.c
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/native/container-executor/impl/container-executor.c
@@ -1120,6 +1120,153 @@ int launch_container_as_user(const char *user, const char *app_id,
   return exit_code;
 }
 
+int launch_container_as_user_systemd(const char *user, const char *app_id,
+    const char *container_id, const char *work_dir,
+    const char *script_name, const char *cred_file,
+    const char* pid_file, char* const* local_dirs,
+    char* const* log_dirs, const char *resources_key,
+    const char *resources_value) {
+  int exit_code = -1;
+  char *script_file_dest = NULL;
+  char *cred_file_dest = NULL;
+  char *exit_code_file = NULL;
+
+  script_file_dest = get_container_launcher_file(work_dir);
+  if (script_file_dest == NULL) {
+    exit_code = OUT_OF_MEMORY;
+    goto cleanup;
+  }
+
+  cred_file_dest = get_container_credentials_file(work_dir);
+  if (NULL == cred_file_dest) {
+    exit_code = OUT_OF_MEMORY;
+    goto cleanup;
+  }
+
+  exit_code_file = get_exit_code_file(pid_file);
+  if (NULL == exit_code_file) {
+    exit_code = OUT_OF_MEMORY;
+    goto cleanup;
+  }
+
+  // open launch script
+  int container_file_source = open_file_as_nm(script_name);
+  if (container_file_source == -1) {
+    goto cleanup;
+  }
+
+  // open credentials
+  int cred_file_source = open_file_as_nm(cred_file);
+  if (cred_file_source == -1) {
+    goto cleanup;
+  }
+
+  pid_t child_pid = fork();
+  if (child_pid != 0) {
+    // parent: wait for the child and record its exit code
+    exit_code = wait_and_write_exit_code(child_pid, exit_code_file);
+    goto cleanup;
+  }
+
+  // child: detach into a new session
+  pid_t pid = setsid();
+  if (pid == -1) {
+    exit_code = SETSID_OPER_FAILED;
+    goto cleanup;
+  }
+
+  // write pid to pidfile
+  if (pid_file == NULL
+      || write_pid_to_file_as_nm(pid_file, pid) != 0) {
+    exit_code = WRITE_PIDFILE_FAILED;
+    goto cleanup;
+  }
+
+  // create the user directory on all disks
+  int result = initialize_user(user, local_dirs);
+  if (result != 0) {
+    exit_code = result;
+    goto cleanup;
+  }
+
+  // initialize log dirs
+  int log_create_result = create_log_dirs(app_id, log_dirs);
+  if (log_create_result != 0) {
+    exit_code = log_create_result;
+    goto cleanup;
+  }
+
+  // Create container specific directories as user. If there are no resources
+  // to localize for this container, app-directories and log-directories are
+  // also created automatically as part of this call.
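+  // (This mirrors the directory setup done by launch_container_as_user;
+  // the directories must exist before the launch script is copied below.)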
+  if (create_container_directories(user, app_id, container_id, local_dirs,
+      log_dirs, work_dir) != 0) {
+    fprintf(LOGFILE, "Could not create container dirs\n");
+    goto cleanup;
+  }
+
+  // 700
+  if (copy_file(container_file_source, script_name, script_file_dest,
+      S_IRWXU) != 0) {
+    goto cleanup;
+  }
+
+  // 600
+  if (copy_file(cred_file_source, cred_file, cred_file_dest,
+      S_IRUSR | S_IWUSR) != 0) {
+    goto cleanup;
+  }
+
+#if HAVE_FCLOSEALL
+  fcloseall();
+#else
+  // only those fds are opened assuming no bug
+  fclose(LOGFILE);
+  fclose(ERRORFILE);
+  fclose(stdin);
+  fclose(stdout);
+  fclose(stderr);
+#endif
+  umask(0027);
+  if (chdir(work_dir) != 0) {
+    fprintf(LOGFILE, "Can't change directory to %s - %s\n", work_dir,
+        strerror(errno));
+    goto cleanup;
+  }
+
+  // Become root so that systemd-run can talk to the system manager;
+  // runuser drops back to the container user below.
+  if (change_effective_user(0, 0) != 0) {
+    fprintf(LOGFILE, "Can't change to root user, %s\n", strerror(errno));
+    goto cleanup;
+  }
+
+  // Launch the container script in a transient systemd scope placed in the
+  // container's slice: systemd-run --scope --slice <slice> ...
+  char* commands[4];
+  commands[0] = "sh";
+  commands[1] = "-c";
+  size_t size = strlen("systemd-run --scope --slice ")
+      + strlen(resources_value)
+      + strlen(" /usr/sbin/runuser -u ") + strlen(user)
+      + strlen(" sh ") + strlen(script_file_dest) + 1;
+  commands[2] = malloc(size);
+  if (commands[2] == NULL) {
+    exit_code = OUT_OF_MEMORY;
+    goto cleanup;
+  }
+  snprintf(commands[2], size,
+      "systemd-run --scope --slice %s /usr/sbin/runuser -u %s sh %s",
+      resources_value, user, script_file_dest);
+  commands[3] = NULL;
+
+  if (execvp(commands[0], commands) != 0) {
+    fprintf(LOGFILE, "Couldn't execute the container launch file %s - %s",
+        script_file_dest, strerror(errno));
+    exit_code = UNABLE_TO_EXECUTE_CONTAINER_SCRIPT;
+    goto cleanup;
+  }
+
+  exit_code = 0;
+
+ cleanup:
+  free(exit_code_file);
+  free(script_file_dest);
+  free(cred_file_dest);
+  return exit_code;
+}
+
 int signal_container_as_user(const char *user, int pid, int sig) {
   if(pid <= 0) {
     return INVALID_CONTAINER_PID;
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/native/container-executor/impl/main.c hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/native/container-executor/impl/main.c
index 9b5e784..1a765eb 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/native/container-executor/impl/main.c
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/native/container-executor/impl/main.c
@@ -240,12 +240,21 @@ int main(int argc, char **argv) {
       free(resources_value);
       return INVALID_ARGUMENT_NUMBER;
     }
-    char** resources_values = extract_values(resources_value);
-    exit_code = launch_container_as_user(yarn_user_name, app_id,
-                    container_id, current_dir, script_file, cred_file,
-                    pid_file, extract_values(local_dirs),
-                    extract_values(log_dirs), resources_key,
-                    resources_values);
+
+    // Dispatch on the resources key: "cgroups" keeps the existing cgroups
+    // launch path, "systemd" uses the new systemd-run based launcher.
+    if (strcmp(resources_key, "cgroups") == 0) {
+      char** resources_values = extract_values(resources_value);
+      exit_code = launch_container_as_user(yarn_user_name, app_id,
+                      container_id, current_dir, script_file, cred_file,
+                      pid_file, extract_values(local_dirs),
+                      extract_values(log_dirs), resources_key,
+                      resources_values);
+    } else if (strcmp(resources_key, "systemd") == 0) {
+      exit_code = launch_container_as_user_systemd(yarn_user_name, app_id,
+                      container_id, current_dir, script_file, cred_file,
+                      pid_file, extract_values(local_dirs),
+                      extract_values(log_dirs), resources_key,
+                      resources_value);
+    }
     free(resources_key);
     free(resources_value);
     break;
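
For reviewers, a minimal sketch of how the NodeManager-side handler composes with the native changes above for a single container. This is illustrative only, not code from the patch: the SystemdHandlerFlow driver class is hypothetical, init(null) only works because this patch never dereferences the executor argument, ContainerId.newContainerId assumes a 2.6+ API, and actually running it requires a systemd host where the NM user may run "sudo systemctl".

    import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
    import org.apache.hadoop.yarn.api.records.ApplicationId;
    import org.apache.hadoop.yarn.api.records.ContainerId;
    import org.apache.hadoop.yarn.api.records.Resource;
    import org.apache.hadoop.yarn.conf.YarnConfiguration;
    import org.apache.hadoop.yarn.server.nodemanager.util.SystemdLCEResourceHandler;

    public class SystemdHandlerFlow {
      public static void main(String[] args) throws Exception {
        SystemdLCEResourceHandler handler = new SystemdLCEResourceHandler();
        handler.setConf(new YarnConfiguration());
        handler.init(null);   // "sudo systemctl start hadoop_yarn.slice"

        ApplicationId appId = ApplicationId.newInstance(1400000000000L, 1);
        ApplicationAttemptId attemptId =
            ApplicationAttemptId.newInstance(appId, 1);
        ContainerId containerId = ContainerId.newContainerId(attemptId, 1);

        // Starts the container slice and sets CPUShares = 1024 * 2 vcores.
        handler.preExecute(containerId, Resource.newInstance(1024, 2));

        // Prints the option handed to container-executor, e.g.
        // systemd=hadoop_yarn-container_1400000000000_0001_01_000001.slice
        System.out.println(handler.getResourcesOption(containerId));

        // "sudo systemctl stop" on the same per-container slice
        handler.postExecute(containerId);
      }
    }

The printed resources option is the string that main.c matches against the "systemd" key; container-executor then hands the slice name unchanged to "systemd-run --scope --slice", so the launched script lands in the cgroup that preExecute configured.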