diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java index dc7c629..f57735d 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java @@ -2747,6 +2747,20 @@ public static boolean areNodeLabelsEnabled( public static final String TIMELINE_XFS_OPTIONS = TIMELINE_XFS_PREFIX + "xframe-options"; + // NUMA awareness props + public static final String NM_NUMA_AWARENESS_ENABLED = NM_PREFIX + + "numa-awareness.enabled"; + public static final boolean DEFAULT_NM_NUMA_AWARENESS_ENABLED = false; + public static final String NM_NUMA_AWARENESS_READ_TOPOLOGY = NM_PREFIX + + "numa-awareness.read-topology"; + public static final boolean DEFAULT_NM_NUMA_AWARENESS_READ_TOPOLOGY = false; + public static final String NM_NUMA_AWARENESS_NODE_IDS = NM_PREFIX + + "numa-awareness.node-ids"; + public static final String NM_NUMA_AWARENESS_NODE_MEMORY = NM_PREFIX + + "numa-awareness..memory"; + public static final String NM_NUMA_AWARENESS_NODE_CPUS = NM_PREFIX + + "numa-awareness..cpus"; + public YarnConfiguration() { super(); } @@ -2901,6 +2915,17 @@ public static boolean systemMetricsPublisherEnabled(Configuration conf) { YarnConfiguration.DEFAULT_SYSTEM_METRICS_PUBLISHER_ENABLED); } + /** + * Returns whether the NUMA awareness is enabled. + * + * @param conf the configuration + * @return whether the NUMA awareness is enabled. + */ + public static boolean numaAwarenessEnabled(Configuration conf) { + return conf.getBoolean(NM_NUMA_AWARENESS_ENABLED, + DEFAULT_NM_NUMA_AWARENESS_ENABLED); + } + /* For debugging. mp configurations to system output as XML format. */ public static void main(String[] args) throws Exception { new YarnConfiguration(new Configuration()).writeXml(System.out); diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml index c8c4edd..5c9b9f3 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml @@ -2999,4 +2999,39 @@ 60000 + + + Whether to enable the NUMA awareness for containers in Node Manager. + + yarn.nodemanager.numa-awareness.enabled + false + + + + + Whether to read the NUMA topology from the system or from the + configurations. If the value is true then NM reads the NUMA topology from + system using the command 'numactl --hardware'. If the value is true then NM + reads the topology from the configurations + 'yarn.nodemanager.numa-awareness.node-ids'(for node id's), + 'yarn.nodemanager.numa-awareness.<NODE_ID>.memory'(for each node memory), + 'yarn.nodemanager.numa-awareness.<NODE_ID>.cpus'(for each node cpus). + + yarn.nodemanager.numa-awareness.read-topology + false + + + + + NUMA node id's in the form of comma separated list. Memory and No of CPUs + will be read using the properties + 'yarn.nodemanager.numa-awareness.<NODE_ID>.memory' and + 'yarn.nodemanager.numa-awareness.<NODE_ID>.cpus' for each id specified + in this value. This property value will be read only when + 'yarn.nodemanager.numa-awareness.read-topology=false'. + + yarn.nodemanager.numa-awareness.node-ids + + + diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/ContainerExecutor.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/ContainerExecutor.java index f880506..ddbb050 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/ContainerExecutor.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/ContainerExecutor.java @@ -58,6 +58,7 @@ import org.apache.hadoop.yarn.server.nodemanager.executor.ContainerStartContext; import org.apache.hadoop.yarn.server.nodemanager.executor.DeletionAsUserContext; import org.apache.hadoop.yarn.server.nodemanager.executor.LocalizerStartContext; +import org.apache.hadoop.yarn.server.nodemanager.numa.NUMAResourcesManager; import org.apache.hadoop.yarn.server.nodemanager.util.ProcessIdFileReader; import org.apache.hadoop.util.Shell; import org.apache.hadoop.util.StringUtils; @@ -90,6 +91,8 @@ private final ReadLock readLock = lock.readLock(); private final WriteLock writeLock = lock.writeLock(); + private NUMAResourcesManager numaResourcesManager; + @Override public void setConf(Configuration conf) { this.conf = conf; @@ -707,6 +710,20 @@ public String getProcessId(ContainerId containerID) { } /** + * NUMA Resources Manager for assigning numa nodes for a container. + * + * @param numaResourcesManager instance of the NUMAResourcesManager + */ + public void setNumaResourcesManager( + NUMAResourcesManager numaResourcesManager) { + this.numaResourcesManager = numaResourcesManager; + } + + public NUMAResourcesManager getNumaResourcesManager() { + return numaResourcesManager; + } + + /** * This class will signal a target container after a specified delay. * @see #signalContainer */ diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/DefaultContainerExecutor.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/DefaultContainerExecutor.java index 420035d..a8720b0 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/DefaultContainerExecutor.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/DefaultContainerExecutor.java @@ -61,6 +61,7 @@ import org.apache.hadoop.yarn.server.nodemanager.executor.ContainerStartContext; import org.apache.hadoop.yarn.server.nodemanager.executor.DeletionAsUserContext; import org.apache.hadoop.yarn.server.nodemanager.executor.LocalizerStartContext; +import org.apache.hadoop.yarn.server.nodemanager.numa.AssignedNumaNodeInfo; import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Optional; @@ -361,7 +362,18 @@ protected CommandExecutor buildCommandExecutor(String wrapperScriptPath, String[] command = getRunCommand(wrapperScriptPath, containerIdStr, user, pidFile, this.getConf(), resource); - + if (YarnConfiguration.numaAwarenessEnabled(this.getConf())) { + String[] cmd = new String[command.length + 3]; + AssignedNumaNodeInfo numaNodeInfo = getNumaResourcesManager() + .assignNumaNode(ContainerId.fromString(containerIdStr), resource); + if (numaNodeInfo != null) { + cmd[0] = "numactl"; + cmd[1] = "--membind=" + numaNodeInfo.getMemNode(); + cmd[2] = "--cpunodebind=" + numaNodeInfo.getCpuNode(); + System.arraycopy(command, 0, cmd, 3, command.length); + command = cmd; + } + } LOG.info("launchContainer: " + Arrays.toString(command)); return new ShellCommandExecutor( command, diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/LinuxContainerExecutor.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/LinuxContainerExecutor.java index 323c443..7b9d5f3 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/LinuxContainerExecutor.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/LinuxContainerExecutor.java @@ -51,6 +51,7 @@ import org.apache.hadoop.yarn.server.nodemanager.executor.ContainerStartContext; import org.apache.hadoop.yarn.server.nodemanager.executor.DeletionAsUserContext; import org.apache.hadoop.yarn.server.nodemanager.executor.LocalizerStartContext; +import org.apache.hadoop.yarn.server.nodemanager.numa.AssignedNumaNodeInfo; import org.apache.hadoop.yarn.server.nodemanager.util.CgroupsLCEResourcesHandler; import org.apache.hadoop.yarn.server.nodemanager.util.DefaultLCEResourcesHandler; import org.apache.hadoop.yarn.server.nodemanager.util.LCEResourcesHandler; @@ -222,6 +223,19 @@ protected void addSchedPriorityCommand(List command) { } } + private void addNumaAwarenessParams(List command, + Container container) { + if (YarnConfiguration.numaAwarenessEnabled(getConf())) { + AssignedNumaNodeInfo numaNodeInfo = getNumaResourcesManager() + .assignNumaNode(container.getContainerId(), container.getResource()); + if (numaNodeInfo != null) { + command.addAll( + Arrays.asList("numactl", "--membind=" + numaNodeInfo.getMemNode(), + "--cpunodebind=" + numaNodeInfo.getCpuNode())); + } + } + } + @Override public void init() throws IOException { Configuration conf = super.getConf(); @@ -440,7 +454,7 @@ public int launchContainer(ContainerStartContext ctx) throws IOException { List prefixCommands = new ArrayList<>(); ContainerRuntimeContext.Builder builder = new ContainerRuntimeContext .Builder(container); - + addNumaAwarenessParams(prefixCommands, container); addSchedPriorityCommand(prefixCommands); if (prefixCommands.size() > 0) { builder.setExecutionAttribute(CONTAINER_LAUNCH_PREFIX_COMMANDS, diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeManager.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeManager.java index 72875a9..a96bb99 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeManager.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeManager.java @@ -67,6 +67,7 @@ import org.apache.hadoop.yarn.server.nodemanager.nodelabels.ConfigurationNodeLabelsProvider; import org.apache.hadoop.yarn.server.nodemanager.nodelabels.NodeLabelsProvider; import org.apache.hadoop.yarn.server.nodemanager.nodelabels.ScriptBasedNodeLabelsProvider; +import org.apache.hadoop.yarn.server.nodemanager.numa.NUMAResourcesManager; import org.apache.hadoop.yarn.server.nodemanager.recovery.NMLeveldbStateStoreService; import org.apache.hadoop.yarn.server.nodemanager.recovery.NMNullStateStoreService; import org.apache.hadoop.yarn.server.nodemanager.recovery.NMStateStoreService; @@ -383,6 +384,12 @@ protected void serviceInit(Configuration conf) throws Exception { addService(nmCollectorService); } + if (YarnConfiguration.numaAwarenessEnabled(conf)) { + NUMAResourcesManager numaNodesScheduler = new NUMAResourcesManager(); + addService(numaNodesScheduler); + containerManager.setNumaNodesScheduler(numaNodesScheduler); + exec.setNumaResourcesManager(numaNodesScheduler); + } // StatusUpdater should be added last so that it get started last // so that we make sure everything is up before registering with RM. addService(nodeStatusUpdater); diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManagerImpl.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManagerImpl.java index e4d9346..0259fe5 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManagerImpl.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManagerImpl.java @@ -140,6 +140,7 @@ import org.apache.hadoop.yarn.server.nodemanager.containermanager.scheduler.ContainerScheduler; import org.apache.hadoop.yarn.server.nodemanager.containermanager.scheduler.ContainerSchedulerEventType; import org.apache.hadoop.yarn.server.nodemanager.metrics.NodeManagerMetrics; +import org.apache.hadoop.yarn.server.nodemanager.numa.NUMAResourcesManager; import org.apache.hadoop.yarn.server.nodemanager.recovery.NMStateStoreService; import org.apache.hadoop.yarn.server.nodemanager.recovery.NMStateStoreService.RecoveredApplicationsState; import org.apache.hadoop.yarn.server.nodemanager.recovery.NMStateStoreService.RecoveredContainerState; @@ -215,6 +216,8 @@ // NM metrics publisher is set only if the timeline service v.2 is enabled private NMTimelinePublisher nmMetricsPublisher; + private NUMAResourcesManager numaNodesScheduler; + public ContainerManagerImpl(Context context, ContainerExecutor exec, DeletionService deletionContext, NodeStatusUpdater nodeStatusUpdater, NodeManagerMetrics metrics, LocalDirsHandlerService dirsHandler) { @@ -1708,4 +1711,12 @@ private void internalSignalToContainer(SignalContainerRequest request, public ContainerScheduler getContainerScheduler() { return this.containerScheduler; } + + public NUMAResourcesManager getNumaNodesScheduler() { + return numaNodesScheduler; + } + + public void setNumaNodesScheduler(NUMAResourcesManager numaNodesScheduler) { + this.numaNodesScheduler = numaNodesScheduler; + } } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/launcher/ContainersLauncher.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/launcher/ContainersLauncher.java index d4a7bfd..e8ce1bb 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/launcher/ContainersLauncher.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/launcher/ContainersLauncher.java @@ -32,6 +32,7 @@ import org.apache.hadoop.service.AbstractService; import org.apache.hadoop.util.concurrent.HadoopExecutors; import org.apache.hadoop.yarn.api.records.ContainerId; +import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.event.Dispatcher; import org.apache.hadoop.yarn.event.EventHandler; import org.apache.hadoop.yarn.exceptions.YarnRuntimeException; @@ -139,6 +140,11 @@ public void handle(ContainersLauncherEvent event) { case CLEANUP_CONTAINER: case CLEANUP_CONTAINER_FOR_REINIT: ContainerLaunch launcher = running.remove(containerId); + // Remove from NUMA Resources Manager if enabled + if (YarnConfiguration.numaAwarenessEnabled(getConfig())) { + containerManager.getNumaNodesScheduler() + .releaseNumaResources(containerId); + } if (launcher == null) { // Container not launched. So nothing needs to be done. return; diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/numa/AssignedNumaNodeInfo.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/numa/AssignedNumaNodeInfo.java new file mode 100644 index 0000000..6a35432 --- /dev/null +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/numa/AssignedNumaNodeInfo.java @@ -0,0 +1,39 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.yarn.server.nodemanager.numa; + +/** + * AssignedNumaNodeInfo contains Memory nodes and CPU nodes. + */ +public class AssignedNumaNodeInfo { + private String memNodes; + private String cpuNodes; + + public AssignedNumaNodeInfo(String memNodes, String cpuNodes) { + this.memNodes = memNodes; + this.cpuNodes = cpuNodes; + } + + public String getMemNode() { + return memNodes; + } + + public String getCpuNode() { + return cpuNodes; + } +} diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/numa/NUMAResourcesManager.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/numa/NUMAResourcesManager.java new file mode 100644 index 0000000..9ddeb32 --- /dev/null +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/numa/NUMAResourcesManager.java @@ -0,0 +1,271 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.yarn.server.nodemanager.numa; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Collection; +import java.util.List; +import java.util.regex.Matcher; +import java.util.regex.Pattern; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.service.CompositeService; +import org.apache.hadoop.util.Shell.ShellCommandExecutor; +import org.apache.hadoop.yarn.api.records.ContainerId; +import org.apache.hadoop.yarn.api.records.Resource; +import org.apache.hadoop.yarn.conf.YarnConfiguration; +import org.apache.hadoop.yarn.exceptions.YarnRuntimeException; + +import com.google.common.annotations.VisibleForTesting; + +/** + * NUMA Resources Manager reads the NUMA topology and assigns NUMA nodes to the + * containers. + */ +public class NUMAResourcesManager extends CompositeService { + + private static final Log LOG = LogFactory.getLog(NUMAResourcesManager.class); + + // Regex to find node ids, Ex: 'available: 2 nodes (0-1)' + private static final String NUMA_NODEIDS_REGEX = + "available:\\s*[0-9]+\\s*nodes\\s*\\(([0-9\\-,]*)\\)"; + + // Regex to find node memory, Ex: 'node 0 size: 73717 MB' + private static final String NUMA_NODE_MEMORY_REGEX = + "node\\s*\\s*size:\\s*([0-9]*)\\s*([KMG]B)"; + + // Regex to find node cpus, Ex: 'node 0 cpus: 0 2 4 6' + private static final String NUMA_NODE_CPUS_REGEX = + "node\\s*\\s*cpus:\\s*([0-9\\s]*)"; + + private static final String GB = "GB"; + private static final String KB = "KB"; + private static final String NUMA_NODE = ""; + private static final String SPACE = "\\s"; + + private List numaNodesList = new ArrayList<>(); + private int noOfNumaNodes; + private int currentAssignNode; + + public NUMAResourcesManager() { + super(NUMAResourcesManager.class.getName()); + } + + @Override + public void serviceInit(Configuration conf) throws Exception { + super.serviceInit(conf); + if (conf.getBoolean(YarnConfiguration.NM_NUMA_AWARENESS_READ_TOPOLOGY, + YarnConfiguration.DEFAULT_NM_NUMA_AWARENESS_READ_TOPOLOGY)) { + LOG.info("Reading NUMA topology using 'numactl --hardware' command."); + String cmdOutput = executeNGetCmdOutput(); + String[] outputLines = cmdOutput.split("\\n"); + Pattern pattern = Pattern.compile(NUMA_NODEIDS_REGEX); + String nodeIdsStr = null; + for (String line : outputLines) { + Matcher matcher = pattern.matcher(line); + if (matcher.find()) { + nodeIdsStr = matcher.group(1); + break; + } + } + if (nodeIdsStr == null) { + throw new YarnRuntimeException("Failed to get numa nodes from" + + " 'numactl --hardware' output and output is:\n" + cmdOutput); + } + String[] nodeIdCommaSplits = nodeIdsStr.split("[,\\s]"); + for (String nodeIdRange : nodeIdCommaSplits) { + if (nodeIdRange.contains("-")) { + String[] beginNEnd = nodeIdRange.split("-"); + for (int nodeId = Integer.parseInt(beginNEnd[0]); nodeId <= Integer + .parseInt(beginNEnd[1]); nodeId++) { + long memory = parseMemory(outputLines, String.valueOf(nodeId)); + int cpus = parseCpus(outputLines, String.valueOf(nodeId)); + numaNodesList + .add(new NumaNode(String.valueOf(nodeId), memory, cpus)); + } + } else { + long memory = parseMemory(outputLines, nodeIdRange); + int cpus = parseCpus(outputLines, nodeIdRange); + numaNodesList.add(new NumaNode(nodeIdRange, memory, cpus)); + } + } + } else { + LOG.info("Reading NUMA topology using configurations."); + Collection nodeIds = conf + .getStringCollection(YarnConfiguration.NM_NUMA_AWARENESS_NODE_IDS); + for (String nodeId : nodeIds) { + long mem = conf.getLong( + "yarn.nodemanager.numa-awareness." + nodeId + ".memory", 0); + int cpus = conf + .getInt("yarn.nodemanager.numa-awareness." + nodeId + ".cpus", 0); + numaNodesList.add(new NumaNode(nodeId, mem, cpus)); + } + } + if (numaNodesList.isEmpty()) { + throw new YarnRuntimeException("There are no available NUMA nodes" + + " for making containers NUMA aware."); + } + noOfNumaNodes = numaNodesList.size(); + LOG.info("Available numa nodes with capacities : " + numaNodesList); + } + + @VisibleForTesting + String executeNGetCmdOutput() throws YarnRuntimeException { + String[] args = new String[] {"numactl", "--hardware"}; + ShellCommandExecutor shExec = new ShellCommandExecutor(args); + try { + shExec.execute(); + } catch (IOException e) { + throw new YarnRuntimeException("Failed to read the numa configurations.", + e); + } + String cmdOutput = shExec.getOutput(); + return cmdOutput; + } + + private int parseCpus(String[] outputLines, String nodeId) { + int cpus = 0; + Pattern patternNodeCPUs = Pattern + .compile(NUMA_NODE_CPUS_REGEX.replace(NUMA_NODE, nodeId)); + for (String line : outputLines) { + Matcher matcherNodeCPUs = patternNodeCPUs.matcher(line); + if (matcherNodeCPUs.find()) { + String cpusStr = matcherNodeCPUs.group(1); + cpus = cpusStr.split(SPACE).length; + break; + } + } + return cpus; + } + + private long parseMemory(String[] outputLines, String nodeId) { + long memory = 0; + String units; + Pattern patternNodeMem = Pattern + .compile(NUMA_NODE_MEMORY_REGEX.replace(NUMA_NODE, nodeId)); + for (String line : outputLines) { + Matcher matcherNodeMem = patternNodeMem.matcher(line); + if (matcherNodeMem.find()) { + try { + memory = Long.parseLong(matcherNodeMem.group(1)); + units = matcherNodeMem.group(2); + if (GB.equals(units)) { + memory = memory * 1024; + } else if (KB.equals(units)) { + memory = memory / 1024; + } + } catch (Exception ex) { + throw new YarnRuntimeException( + "Failed to get memory for node:" + nodeId, ex); + } + break; + } + } + return memory; + } + + /** + * Assigns the available NUMA nodes for the requested containerId with + * resource in a round robin fashion. + * + * @param containerId the container ID + * @param resource resource for the Numa Node + * @return the assigned NUMA Node info or null if resources not available. + */ + public synchronized AssignedNumaNodeInfo assignNumaNode( + ContainerId containerId, Resource resource) { + for (int index = 0; index < noOfNumaNodes; index++) { + NumaNode numaNode = numaNodesList + .get((currentAssignNode + index) % noOfNumaNodes); + if (numaNode.isResourcesAvailable(resource)) { + numaNode.assignResources(resource, containerId); + LOG.info("Assigning NUMA node " + numaNode.getNodeId() + " for memory, " + + numaNode.getNodeId() + " for cpus for the " + containerId); + currentAssignNode = (currentAssignNode + index + 1) % noOfNumaNodes; + return new AssignedNumaNodeInfo(numaNode.getNodeId(), + numaNode.getNodeId()); + } + } + + // If there is no single node matched for the container resource + // Check the NUMA nodes for Memory resources + String memNodes = ""; + long memreq = resource.getMemorySize(); + for (NumaNode numaNode : numaNodesList) { + memreq = numaNode.assignAvailableMemory(memreq, containerId); + memNodes += numaNode.getNodeId(); + if (memreq == 0) { + break; + } else { + memNodes += ","; + } + } + if (memreq != 0) { + LOG.info("There is no available memory:" + resource.getMemorySize() + + " in numa nodes for " + containerId); + releaseNumaResources(containerId); + return null; + } + + // Check the NUMA nodes for CPU resources + String cpuNodes = ""; + int cpusreq = resource.getVirtualCores(); + for (int index = 0; index < noOfNumaNodes; index++) { + NumaNode numaNode = numaNodesList + .get((currentAssignNode + index) % noOfNumaNodes); + cpusreq = numaNode.assignAvailableCpus(cpusreq, containerId); + cpuNodes += numaNode.getNodeId(); + if (cpusreq == 0) { + currentAssignNode = (currentAssignNode + index + 1) % noOfNumaNodes; + break; + } else { + cpuNodes += ","; + } + } + + if (cpusreq != 0) { + LOG.info("There are no available cpus:" + resource.getVirtualCores() + + " in numa nodes for " + containerId); + releaseNumaResources(containerId); + return null; + } + LOG.info("Assigning multiple NUMA nodes (" + memNodes + ") for memory, (" + + cpuNodes + ") for cpus for " + containerId); + return new AssignedNumaNodeInfo(memNodes, cpuNodes); + } + + /** + * Release assigned NUMA resources for the container. + * + * @param containerId the container ID + */ + public void releaseNumaResources(ContainerId containerId) { + LOG.info("Releasing the assigned NUMA resources for " + containerId); + for (NumaNode numaNode : numaNodesList) { + numaNode.releaseResources(containerId); + } + } + + @VisibleForTesting + Collection getNumaNodesList() { + return numaNodesList; + } +} diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/numa/NumaNode.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/numa/NumaNode.java new file mode 100644 index 0000000..0c9f7a0 --- /dev/null +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/numa/NumaNode.java @@ -0,0 +1,154 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.yarn.server.nodemanager.numa; + +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.yarn.api.records.ContainerId; +import org.apache.hadoop.yarn.api.records.Resource; + +/** + * NumaNode class holds the NUMA node topology with the total and used + * resources. + */ +public class NumaNode { + private String nodeId; + private long totalMemory; + private int totalCpus; + private long usedMemory; + private int usedCpus; + + private static final Log LOG = LogFactory.getLog(NumaNode.class); + + private Map containerVsMemUsage = + new ConcurrentHashMap<>(); + private Map containerVsCpusUsage = + new ConcurrentHashMap<>(); + + public NumaNode(String nodeId, long totalMemory, int totalCpus) { + this.nodeId = nodeId; + this.totalMemory = totalMemory; + this.totalCpus = totalCpus; + } + + public boolean isResourcesAvailable(Resource resource) { + LOG.info( + "Memory available:" + (totalMemory - usedMemory) + ", CPUs available:" + + (totalCpus - usedCpus) + ", requested:" + resource); + if ((totalMemory - usedMemory) >= resource.getMemorySize() + && (totalCpus - usedCpus) >= resource.getVirtualCores()) { + return true; + } + return false; + } + + public synchronized long assignAvailableMemory(long memreq, + ContainerId containerId) { + long memAvailable = totalMemory - usedMemory; + if (memAvailable >= memreq) { + containerVsMemUsage.put(containerId, memreq); + usedMemory += memreq; + return 0; + } else { + usedMemory += memAvailable; + containerVsMemUsage.put(containerId, memAvailable); + return memreq - memAvailable; + } + } + + public synchronized int assignAvailableCpus(int cpusreq, + ContainerId containerId) { + int cpusAvailable = totalCpus - usedCpus; + if (cpusAvailable >= cpusreq) { + containerVsCpusUsage.put(containerId, cpusreq); + usedCpus += cpusreq; + return 0; + } else { + usedCpus += cpusAvailable; + containerVsCpusUsage.put(containerId, cpusAvailable); + return cpusreq - cpusAvailable; + } + } + + public synchronized void assignResources(Resource resource, + ContainerId containerId) { + containerVsMemUsage.put(containerId, resource.getMemorySize()); + containerVsCpusUsage.put(containerId, resource.getVirtualCores()); + usedMemory += resource.getMemorySize(); + usedCpus += resource.getVirtualCores(); + } + + public synchronized void releaseResources(ContainerId containerId) { + if (containerVsMemUsage.containsKey(containerId)) { + usedMemory -= containerVsMemUsage.get(containerId); + containerVsMemUsage.remove(containerId); + } + if (containerVsCpusUsage.containsKey(containerId)) { + usedCpus -= containerVsCpusUsage.get(containerId); + containerVsCpusUsage.remove(containerId); + } + } + + @Override + public String toString() { + return "Node Id:" + nodeId + "\tMemory:" + totalMemory + "\tCPus:" + + totalCpus; + } + + @Override + public int hashCode() { + final int prime = 31; + int result = 1; + result = prime * result + ((nodeId == null) ? 0 : nodeId.hashCode()); + result = prime * result + (int) (totalMemory ^ (totalMemory >>> 32)); + result = prime * result + totalCpus; + return result; + } + + @Override + public boolean equals(Object obj) { + if (this == obj) { + return true; + } + if (obj == null || getClass() != obj.getClass()) { + return false; + } + NumaNode other = (NumaNode) obj; + if (nodeId == null) { + if (other.nodeId != null) { + return false; + } + } else if (!nodeId.equals(other.nodeId)) { + return false; + } + if (totalMemory != other.totalMemory) { + return false; + } + if (totalCpus != other.totalCpus) { + return false; + } + return true; + } + + public String getNodeId() { + return nodeId; + } +} \ No newline at end of file diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/numa/package-info.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/numa/package-info.java new file mode 100644 index 0000000..bbbafa3 --- /dev/null +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/numa/package-info.java @@ -0,0 +1,28 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +/** + * Package org.apache.hadoop.yarn.server.nodemanager.numa contains classes + * related to NUMA aware containers functionality. + */ +@InterfaceAudience.Private +@InterfaceStability.Unstable +package org.apache.hadoop.yarn.server.nodemanager.numa; + +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.classification.InterfaceStability; diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/numa/TestNUMAResourcesManager.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/numa/TestNUMAResourcesManager.java new file mode 100644 index 0000000..e7959ec --- /dev/null +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/numa/TestNUMAResourcesManager.java @@ -0,0 +1,211 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.yarn.server.nodemanager.numa; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Collection; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.yarn.api.records.ContainerId; +import org.apache.hadoop.yarn.api.records.Resource; +import org.apache.hadoop.yarn.conf.YarnConfiguration; +import org.apache.hadoop.yarn.exceptions.YarnRuntimeException; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; + +/** + * Test class for NUMAResourcesManager. + */ +public class TestNUMAResourcesManager { + + private Configuration conf; + private NUMAResourcesManager numaResourcesManager; + + @Before + public void setUp() throws IOException { + conf = new YarnConfiguration(); + numaResourcesManager = new NUMAResourcesManager(); + } + + @Test + public void testReadNumaTopologyFromConfigurations() throws Exception { + setNumaTopologyConfigs(); + numaResourcesManager.serviceInit(conf); + Collection nodesList = numaResourcesManager.getNumaNodesList(); + Collection expectedNodesList = getExpectedNumaNodesList(); + Assert.assertEquals(expectedNodesList, nodesList); + } + + @Test + public void testReadNumaTopologyFromCmdOutput() throws Exception { + conf.setBoolean(YarnConfiguration.NM_NUMA_AWARENESS_READ_TOPOLOGY, true); + String cmdOutput = "available: 2 nodes (0-1)\n\t" + + "node 0 cpus: 0 2 4 6\n\t" + + "node 0 size: 73717 MB\n\t" + + "node 0 free: 17272 MB\n\t" + + "node 1 cpus: 1 3 5 7\n\t" + + "node 1 size: 73727 MB\n\t" + + "node 1 free: 10699 MB\n\t" + + "node distances:\n\t" + + "node 0 1\n\t" + + "0: 10 20\n\t" + + "1: 20 10"; + numaResourcesManager = new NUMAResourcesManager() { + @Override + String executeNGetCmdOutput() throws YarnRuntimeException { + return cmdOutput; + } + }; + numaResourcesManager.serviceInit(conf); + Collection nodesList = numaResourcesManager.getNumaNodesList(); + Collection expectedNodesList = getExpectedNumaNodesList(); + Assert.assertEquals(expectedNodesList, nodesList); + } + + @Test + public void testAssignNumaNode() throws Exception { + setNumaTopologyConfigs(); + numaResourcesManager.serviceInit(conf); + AssignedNumaNodeInfo nodeInfo = numaResourcesManager.assignNumaNode( + ContainerId.fromString("container_1481156246874_0001_01_000001"), + Resource.newInstance(2048, 2)); + Assert.assertEquals("0", nodeInfo.getMemNode()); + Assert.assertEquals("0", nodeInfo.getCpuNode()); + } + + @Test + public void testAssignNumaNodeWithRoundRobinFashionAssignment() + throws Exception { + setNumaTopologyConfigs(); + numaResourcesManager.serviceInit(conf); + AssignedNumaNodeInfo nodeInfo1 = numaResourcesManager.assignNumaNode( + ContainerId.fromString("container_1481156246874_0001_01_000001"), + Resource.newInstance(2048, 2)); + Assert.assertEquals("0", nodeInfo1.getMemNode()); + Assert.assertEquals("0", nodeInfo1.getCpuNode()); + + AssignedNumaNodeInfo nodeInfo2 = numaResourcesManager.assignNumaNode( + ContainerId.fromString("container_1481156246874_0001_01_000002"), + Resource.newInstance(2048, 2)); + Assert.assertEquals("1", nodeInfo2.getMemNode()); + Assert.assertEquals("1", nodeInfo2.getCpuNode()); + + AssignedNumaNodeInfo nodeInfo3 = numaResourcesManager.assignNumaNode( + ContainerId.fromString("container_1481156246874_0001_01_000003"), + Resource.newInstance(2048, 2)); + Assert.assertEquals("0", nodeInfo3.getMemNode()); + Assert.assertEquals("0", nodeInfo3.getCpuNode()); + + AssignedNumaNodeInfo nodeInfo4 = numaResourcesManager.assignNumaNode( + ContainerId.fromString("container_1481156246874_0001_01_000003"), + Resource.newInstance(2048, 2)); + Assert.assertEquals("1", nodeInfo4.getMemNode()); + Assert.assertEquals("1", nodeInfo4.getCpuNode()); + } + + @Test + public void testAssignNumaNodeWithMultipleNodesForMemory() throws Exception { + setNumaTopologyConfigs(); + numaResourcesManager.serviceInit(conf); + AssignedNumaNodeInfo nodeInfo = numaResourcesManager.assignNumaNode( + ContainerId.fromString("container_1481156246874_0001_01_000001"), + Resource.newInstance(102400, 2)); + Assert.assertEquals("0,1", nodeInfo.getMemNode()); + Assert.assertEquals("0", nodeInfo.getCpuNode()); + } + + @Test + public void testAssignNumaNodeWithMultipleNodesForCpus() throws Exception { + setNumaTopologyConfigs(); + numaResourcesManager.serviceInit(conf); + AssignedNumaNodeInfo nodeInfo = numaResourcesManager.assignNumaNode( + ContainerId.fromString("container_1481156246874_0001_01_000001"), + Resource.newInstance(2048, 6)); + Assert.assertEquals("0", nodeInfo.getMemNode()); + Assert.assertEquals("0,1", nodeInfo.getCpuNode()); + } + + @Test + public void testAssignNumaNodeWhenNoNumaMemResourcesAvailable() + throws Exception { + setNumaTopologyConfigs(); + numaResourcesManager.serviceInit(conf); + AssignedNumaNodeInfo nodeInfo = numaResourcesManager.assignNumaNode( + ContainerId.fromString("container_1481156246874_0001_01_000001"), + Resource.newInstance(2048000, 6)); + Assert.assertNull("Should not assign numa nodes when there" + + " are no sufficient memory resources available.", nodeInfo); + } + + @Test + public void testAssignNumaNodeWhenNoNumaCpuResourcesAvailable() + throws Exception { + setNumaTopologyConfigs(); + numaResourcesManager.serviceInit(conf); + AssignedNumaNodeInfo nodeInfo = numaResourcesManager.assignNumaNode( + ContainerId.fromString("container_1481156246874_0001_01_000001"), + Resource.newInstance(2048, 600)); + Assert.assertNull("Should not assign numa nodes when there" + + " are no sufficient cpu resources available.", nodeInfo); + } + + @Test + public void testReleaseNumaResourcess() throws Exception { + setNumaTopologyConfigs(); + numaResourcesManager.serviceInit(conf); + AssignedNumaNodeInfo nodeInfo = numaResourcesManager.assignNumaNode( + ContainerId.fromString("container_1481156246874_0001_01_000001"), + Resource.newInstance(2048, 8)); + Assert.assertEquals("0", nodeInfo.getMemNode()); + Assert.assertEquals("0,1", nodeInfo.getCpuNode()); + + // Request the resource when all cpu nodes occupied + nodeInfo = numaResourcesManager.assignNumaNode( + ContainerId.fromString("container_1481156246874_0001_01_000002"), + Resource.newInstance(2048, 4)); + Assert.assertNull("Should not assign numa nodes when there" + + " are no sufficient cpu resources available.", nodeInfo); + + // Release the resources + numaResourcesManager.releaseNumaResources( + ContainerId.fromString("container_1481156246874_0001_01_000001")); + // Request the resources + nodeInfo = numaResourcesManager.assignNumaNode( + ContainerId.fromString("container_1481156246874_0001_01_000003"), + Resource.newInstance(1024, 2)); + Assert.assertEquals("0", nodeInfo.getMemNode()); + Assert.assertEquals("0", nodeInfo.getCpuNode()); + } + + private void setNumaTopologyConfigs() { + conf.set(YarnConfiguration.NM_NUMA_AWARENESS_NODE_IDS, "0,1"); + conf.set("yarn.nodemanager.numa-awareness.0.memory", "73717"); + conf.set("yarn.nodemanager.numa-awareness.0.cpus", "4"); + conf.set("yarn.nodemanager.numa-awareness.1.memory", "73727"); + conf.set("yarn.nodemanager.numa-awareness.1.cpus", "4"); + } + + private Collection getExpectedNumaNodesList() { + Collection expectedNodesList = new ArrayList<>(2); + expectedNodesList.add(new NumaNode("0", 73717, 4)); + expectedNodesList.add(new NumaNode("1", 73727, 4)); + return expectedNodesList; + } +}