diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
index 16bd73a..90f2a45 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
@@ -3042,6 +3042,26 @@ public static boolean areNodeLabelsEnabled(
   public static final String TIMELINE_XFS_OPTIONS =
       TIMELINE_XFS_PREFIX + "xframe-options";
 
+  // NUMA awareness props
+  public static final String NM_NUMA_AWARENESS_ENABLED = NM_PREFIX
+      + "numa-awareness.enabled";
+  public static final boolean DEFAULT_NM_NUMA_AWARENESS_ENABLED = false;
+  public static final String NM_NUMA_AWARENESS_READ_TOPOLOGY = NM_PREFIX
+      + "numa-awareness.read-topology";
+  public static final boolean DEFAULT_NM_NUMA_AWARENESS_READ_TOPOLOGY = false;
+  public static final String NM_NUMA_AWARENESS_NODE_IDS = NM_PREFIX
+      + "numa-awareness.node-ids";
+  // Per-node key templates: <NODE_ID> stands for one of the ids listed under
+  // NM_NUMA_AWARENESS_NODE_IDS and has to be substituted before the lookup.
+  public static final String NM_NUMA_AWARENESS_NODE_MEMORY = NM_PREFIX
+      + "numa-awareness.<NODE_ID>.memory";
+  public static final String NM_NUMA_AWARENESS_NODE_CPUS = NM_PREFIX
+      + "numa-awareness.<NODE_ID>.cpus";
+  public static final String NM_NUMA_AWARENESS_NUMACTL_CMD = NM_PREFIX
+      + "numa-awareness.numactl.cmd";
+  public static final String DEFAULT_NM_NUMA_AWARENESS_NUMACTL_CMD =
+      "/usr/bin/numactl";
+
   public YarnConfiguration() {
     super();
   }
@@ -3196,6 +3216,17 @@ public static boolean systemMetricsPublisherEnabled(Configuration conf) {
         YarnConfiguration.DEFAULT_SYSTEM_METRICS_PUBLISHER_ENABLED);
   }
 
+  /**
+   * Returns whether the NUMA awareness is enabled.
+   *
+   * @param conf the configuration
+   * @return whether the NUMA awareness is enabled.
+ */ + public static boolean numaAwarenessEnabled(Configuration conf) { + return conf.getBoolean(NM_NUMA_AWARENESS_ENABLED, + DEFAULT_NM_NUMA_AWARENESS_ENABLED); + } + /* For debugging. mp configurations to system output as XML format. */ public static void main(String[] args) throws Exception { new YarnConfiguration(new Configuration()).writeXml(System.out); diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml index 0823dfe..27b9e8f 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml @@ -3245,4 +3245,48 @@ false + + + Whether to enable the NUMA awareness for containers in Node Manager. + + yarn.nodemanager.numa-awareness.enabled + false + + + + + Whether to read the NUMA topology from the system or from the + configurations. If the value is true then NM reads the NUMA topology from + system using the command 'numactl --hardware'. If the value is false then NM + reads the topology from the configurations + 'yarn.nodemanager.numa-awareness.node-ids'(for node id's), + 'yarn.nodemanager.numa-awareness.<NODE_ID>.memory'(for each node memory), + 'yarn.nodemanager.numa-awareness.<NODE_ID>.cpus'(for each node cpus). + + yarn.nodemanager.numa-awareness.read-topology + false + + + + + NUMA node id's in the form of comma separated list. Memory and No of CPUs + will be read using the properties + 'yarn.nodemanager.numa-awareness.<NODE_ID>.memory' and + 'yarn.nodemanager.numa-awareness.<NODE_ID>.cpus' for each id specified + in this value. This property value will be read only when + 'yarn.nodemanager.numa-awareness.read-topology=false'. + + yarn.nodemanager.numa-awareness.node-ids + + + + + + The numactl command path which controls NUMA policy for processes or + shared memory. 
+ + yarn.nodemanager.numa-awareness.numactl.cmd + /usr/bin/numactl + + diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/LinuxContainerExecutor.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/LinuxContainerExecutor.java index dc68680..f84c4ad 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/LinuxContainerExecutor.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/LinuxContainerExecutor.java @@ -455,6 +455,7 @@ public int launchContainer(ContainerStartContext ctx) container.getResource()); String resourcesOptions = resourcesHandler.getResourcesOption(containerId); String tcCommandFile = null; + List numaArgs = null; try { if (resourceHandlerChain != null) { @@ -476,6 +477,9 @@ public int launchContainer(ContainerStartContext ctx) case TC_MODIFY_STATE: tcCommandFile = op.getArguments().get(0); break; + case ADD_NUMA_PARAMS: + numaArgs = op.getArguments(); + break; default: LOG.warn("PrivilegedOperation type unsupported in launch: " + op.getOperationType()); @@ -506,7 +510,7 @@ public int launchContainer(ContainerStartContext ctx) if (pidFilePath != null) { ContainerRuntimeContext runtimeContext = buildContainerRuntimeContext( - ctx, pidFilePath, resourcesOptions, tcCommandFile); + ctx, pidFilePath, resourcesOptions, tcCommandFile, numaArgs); linuxContainerRuntime.launchContainer(runtimeContext); } else { @@ -581,11 +585,12 @@ public int launchContainer(ContainerStartContext ctx) } private ContainerRuntimeContext buildContainerRuntimeContext( - ContainerStartContext ctx, Path pidFilePath, - String resourcesOptions, String tcCommandFile) { + ContainerStartContext ctx, Path pidFilePath, 
String resourcesOptions, + String tcCommandFile, List numaArgs) { List prefixCommands = new ArrayList<>(); addSchedPriorityCommand(prefixCommands); + addNumaArgsToCommand(prefixCommands, numaArgs); Container container = ctx.getContainer(); @@ -624,6 +629,13 @@ private ContainerRuntimeContext buildContainerRuntimeContext( return builder.build(); } + private void addNumaArgsToCommand(List prefixCommands, + List numaArgs) { + if (numaArgs != null) { + prefixCommands.addAll(numaArgs); + } + } + @Override public String[] getIpAndHost(Container container) { return linuxContainerRuntime.getIpAndHost(container); diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeManager.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeManager.java index bf4b43c..9c3fe62 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeManager.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeManager.java @@ -70,6 +70,8 @@ import org.apache.hadoop.yarn.server.nodemanager.recovery.NMLeveldbStateStoreService; import org.apache.hadoop.yarn.server.nodemanager.recovery.NMNullStateStoreService; import org.apache.hadoop.yarn.server.nodemanager.recovery.NMStateStoreService; +import org.apache.hadoop.yarn.server.nodemanager.scheduler.allocators.LocalResourceAllocators; +import org.apache.hadoop.yarn.server.nodemanager.scheduler.allocators.NumaResourceAllocator; import org.apache.hadoop.yarn.server.scheduler.OpportunisticContainerAllocator; import org.apache.hadoop.yarn.server.nodemanager.security.NMContainerTokenSecretManager; import org.apache.hadoop.yarn.server.nodemanager.security.NMTokenSecretManagerInNM; @@ -404,6 
+406,12 @@ protected void serviceInit(Configuration conf) throws Exception { addService(nodeStatusUpdater); ((NMContext) context).setNodeStatusUpdater(nodeStatusUpdater); + // Intialize Numa Resource Allocator + if (YarnConfiguration.numaAwarenessEnabled(conf)) { + LocalResourceAllocators + .setNumaResourceAllocator(new NumaResourceAllocator(context)); + } + super.serviceInit(conf); // TODO add local dirs to del } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/linux/privileged/PrivilegedOperation.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/linux/privileged/PrivilegedOperation.java index 8402a16..aaa3f93 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/linux/privileged/PrivilegedOperation.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/linux/privileged/PrivilegedOperation.java @@ -51,7 +51,8 @@ TC_READ_STATS("--tc-read-stats"), ADD_PID_TO_CGROUP(""), //no CLI switch supported yet. RUN_DOCKER_CMD("--run-docker"), - LIST_AS_USER(""); //no CLI switch supported yet. + LIST_AS_USER(""), //no CLI switch supported yet. 
+ ADD_NUMA_PARAMS(""); private final String option; diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/linux/resources/NumaResourceHandlerImpl.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/linux/resources/NumaResourceHandlerImpl.java new file mode 100644 index 0000000..0722a43 --- /dev/null +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/linux/resources/NumaResourceHandlerImpl.java @@ -0,0 +1,98 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.resources; + +import java.util.ArrayList; +import java.util.List; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.yarn.api.records.ContainerId; +import org.apache.hadoop.yarn.conf.YarnConfiguration; +import org.apache.hadoop.yarn.exceptions.YarnException; +import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container; +import org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.privileged.PrivilegedOperation; +import org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.privileged.PrivilegedOperation.OperationType; +import org.apache.hadoop.yarn.server.nodemanager.scheduler.allocators.LocalResourceAllocators; +import org.apache.hadoop.yarn.server.nodemanager.scheduler.allocators.NumaResourceAllocation; +import org.apache.hadoop.yarn.server.nodemanager.scheduler.allocators.NumaResourceAllocator; + +public class NumaResourceHandlerImpl implements ResourceHandler { + + private NumaResourceAllocator numaResourceAllocator; + private String numaCtlCmd; + + public NumaResourceHandlerImpl(Configuration conf) { + numaResourceAllocator = LocalResourceAllocators.getNumaResourceAllocator(); + numaCtlCmd = conf.get(YarnConfiguration.NM_NUMA_AWARENESS_NUMACTL_CMD, + YarnConfiguration.DEFAULT_NM_NUMA_AWARENESS_NUMACTL_CMD); + } + + @Override + public List bootstrap(Configuration configuration) + throws ResourceHandlerException { + try { + numaResourceAllocator.init(configuration); + } catch (YarnException e) { + throw new ResourceHandlerException(e); + } + return null; + } + + @Override + public List preStart(Container container) + throws ResourceHandlerException { + List ret = null; + NumaResourceAllocation numaAllocation = numaResourceAllocator + .allocateNumaNode(container.getContainerId(), container.getResource()); + if (numaAllocation != null) { + ret = new ArrayList<>(); + ArrayList args = new ArrayList<>(); + 
args.add(numaCtlCmd); + args.add( + "--interleave=" + String.join(",", numaAllocation.getMemNodes())); + args.add( + "--cpunodebind=" + String.join(",", numaAllocation.getCpuNodes())); + ret.add(new PrivilegedOperation(OperationType.ADD_NUMA_PARAMS, args)); + } + return ret; + } + + @Override + public List reacquireContainer(ContainerId containerId) + throws ResourceHandlerException { + try { + numaResourceAllocator.recoverNumaResource(containerId); + } catch (Throwable e) { + throw new ResourceHandlerException( + "Failed to recover numa resource for " + containerId, e); + } + return null; + } + + @Override + public List postComplete(ContainerId containerId) + throws ResourceHandlerException { + numaResourceAllocator.releaseNumaResource(containerId); + return null; + } + + @Override + public List teardown() throws ResourceHandlerException { + return null; + } +} diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/linux/resources/ResourceHandlerModule.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/linux/resources/ResourceHandlerModule.java index a81a77b..df2befb 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/linux/resources/ResourceHandlerModule.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/linux/resources/ResourceHandlerModule.java @@ -198,6 +198,13 @@ public static MemoryResourceHandler getMemoryResourceHandler( return cGroupsMemoryResourceHandler; } + private static ResourceHandler getNumaResourceHandler(Configuration conf) { + if (YarnConfiguration.numaAwarenessEnabled(conf)) { + return new 
NumaResourceHandlerImpl(conf); + } + return null; + } + private static void addHandlerIfNotNull(List handlerList, ResourceHandler handler) { if (handler != null) { @@ -213,6 +220,7 @@ private static void initializeConfiguredResourceHandlerChain( addHandlerIfNotNull(handlerList, getDiskResourceHandler(conf)); addHandlerIfNotNull(handlerList, getMemoryResourceHandler(conf)); addHandlerIfNotNull(handlerList, getCGroupsCpuResourceHandler(conf)); + addHandlerIfNotNull(handlerList, getNumaResourceHandler(conf)); resourceHandlerChain = new ResourceHandlerChain(handlerList); } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/scheduler/allocators/LocalResourceAllocators.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/scheduler/allocators/LocalResourceAllocators.java new file mode 100644 index 0000000..fd40c36 --- /dev/null +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/scheduler/allocators/LocalResourceAllocators.java @@ -0,0 +1,32 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.yarn.server.nodemanager.scheduler.allocators; + +public class LocalResourceAllocators { + + private static NumaResourceAllocator numaResourceAllocator; + + public static void setNumaResourceAllocator( + NumaResourceAllocator numaResourceAllocator) { + LocalResourceAllocators.numaResourceAllocator = numaResourceAllocator; + } + + public static NumaResourceAllocator getNumaResourceAllocator() { + return numaResourceAllocator; + } +} diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/scheduler/allocators/NumaNodeResource.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/scheduler/allocators/NumaNodeResource.java new file mode 100644 index 0000000..b18bef6 --- /dev/null +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/scheduler/allocators/NumaNodeResource.java @@ -0,0 +1,161 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.yarn.server.nodemanager.scheduler.allocators; + +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.yarn.api.records.ContainerId; +import org.apache.hadoop.yarn.api.records.Resource; + +/** + * NumaNodeResource class holds the NUMA node topology with the total and used + * resources. + */ +public class NumaNodeResource { + private String nodeId; + private long totalMemory; + private int totalCpus; + private long usedMemory; + private int usedCpus; + + private static final Log LOG = LogFactory.getLog(NumaNodeResource.class); + + private Map containerVsMemUsage = + new ConcurrentHashMap<>(); + private Map containerVsCpusUsage = + new ConcurrentHashMap<>(); + + public NumaNodeResource(String nodeId, long totalMemory, int totalCpus) { + this.nodeId = nodeId; + this.totalMemory = totalMemory; + this.totalCpus = totalCpus; + } + + public boolean isResourcesAvailable(Resource resource) { + LOG.debug( + "Memory available:" + (totalMemory - usedMemory) + ", CPUs available:" + + (totalCpus - usedCpus) + ", requested:" + resource); + if ((totalMemory - usedMemory) >= resource.getMemorySize() + && (totalCpus - usedCpus) >= resource.getVirtualCores()) { + return true; + } + return false; + } + + public long assignAvailableMemory(long memreq, ContainerId containerId) { + long memAvailable = totalMemory - usedMemory; + if (memAvailable >= memreq) { + containerVsMemUsage.put(containerId, memreq); + usedMemory += memreq; + return 0; + } else { + usedMemory += memAvailable; + containerVsMemUsage.put(containerId, memAvailable); + return memreq - memAvailable; + } + } + + public int assignAvailableCpus(int cpusreq, ContainerId containerId) { + int cpusAvailable = totalCpus - usedCpus; + if (cpusAvailable >= cpusreq) 
{ + containerVsCpusUsage.put(containerId, cpusreq); + usedCpus += cpusreq; + return 0; + } else { + usedCpus += cpusAvailable; + containerVsCpusUsage.put(containerId, cpusAvailable); + return cpusreq - cpusAvailable; + } + } + + public void assignResources(Resource resource, ContainerId containerId) { + containerVsMemUsage.put(containerId, resource.getMemorySize()); + containerVsCpusUsage.put(containerId, resource.getVirtualCores()); + usedMemory += resource.getMemorySize(); + usedCpus += resource.getVirtualCores(); + } + + public void releaseResources(ContainerId containerId) { + if (containerVsMemUsage.containsKey(containerId)) { + usedMemory -= containerVsMemUsage.get(containerId); + containerVsMemUsage.remove(containerId); + } + if (containerVsCpusUsage.containsKey(containerId)) { + usedCpus -= containerVsCpusUsage.get(containerId); + containerVsCpusUsage.remove(containerId); + } + } + + public void recoverMemory(ContainerId containerId, long memory) { + containerVsMemUsage.put(containerId, memory); + usedMemory += memory; + } + + public void recoverCpus(ContainerId containerId, int cpus) { + containerVsCpusUsage.put(containerId, cpus); + usedCpus += cpus; + } + + @Override + public String toString() { + return "Node Id:" + nodeId + "\tMemory:" + totalMemory + "\tCPus:" + + totalCpus; + } + + @Override + public int hashCode() { + final int prime = 31; + int result = 1; + result = prime * result + ((nodeId == null) ? 
0 : nodeId.hashCode()); + result = prime * result + (int) (totalMemory ^ (totalMemory >>> 32)); + result = prime * result + totalCpus; + return result; + } + + @Override + public boolean equals(Object obj) { + if (this == obj) { + return true; + } + if (obj == null || getClass() != obj.getClass()) { + return false; + } + NumaNodeResource other = (NumaNodeResource) obj; + if (nodeId == null) { + if (other.nodeId != null) { + return false; + } + } else if (!nodeId.equals(other.nodeId)) { + return false; + } + if (totalMemory != other.totalMemory) { + return false; + } + if (totalCpus != other.totalCpus) { + return false; + } + return true; + } + + public String getNodeId() { + return nodeId; + } +} \ No newline at end of file diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/scheduler/allocators/NumaResourceAllocation.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/scheduler/allocators/NumaResourceAllocation.java new file mode 100644 index 0000000..65569c3 --- /dev/null +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/scheduler/allocators/NumaResourceAllocation.java @@ -0,0 +1,69 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.yarn.server.nodemanager.scheduler.allocators; + +import java.io.Serializable; +import java.util.HashMap; +import java.util.Map; +import java.util.Set; + +/** + * NumaResourceAllocation contains Memory nodes and CPU nodes assigned to a + * container. + */ +public class NumaResourceAllocation implements Serializable { + private static final long serialVersionUID = 6339719798446595123L; + private Map nodeVsMemory; + private Map nodeVsCpus; + + public NumaResourceAllocation() { + nodeVsMemory = new HashMap<>(); + nodeVsCpus = new HashMap<>(); + } + + public NumaResourceAllocation(String memNodeId, long memory, String cpuNodeId, + int cpus) { + this(); + nodeVsMemory.put(memNodeId, memory); + nodeVsCpus.put(cpuNodeId, cpus); + } + + public void addMemoryNode(String memNodeId, long memory) { + nodeVsMemory.put(memNodeId, memory); + } + + public void addCpuNode(String cpuNodeId, int cpus) { + nodeVsCpus.put(cpuNodeId, cpus); + } + + public Set getMemNodes() { + return nodeVsMemory.keySet(); + } + + public Set getCpuNodes() { + return nodeVsCpus.keySet(); + } + + public Map getNodeVsMemory() { + return nodeVsMemory; + } + + public Map getNodeVsCpus() { + return nodeVsCpus; + } +} diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/scheduler/allocators/NumaResourceAllocator.java 
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/scheduler/allocators/NumaResourceAllocator.java new file mode 100644 index 0000000..6e6840e --- /dev/null +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/scheduler/allocators/NumaResourceAllocator.java @@ -0,0 +1,327 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */
+package org.apache.hadoop.yarn.server.nodemanager.scheduler.allocators;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.util.Shell.ShellCommandExecutor;
+import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.exceptions.YarnException;
+import org.apache.hadoop.yarn.server.nodemanager.Context;
+
+import com.google.common.annotations.VisibleForTesting;
+
+/**
+ * NUMA Resources Allocator reads the NUMA topology and assigns NUMA nodes to
+ * the containers.
+ */
+public class NumaResourceAllocator {
+
+  private static final Log LOG = LogFactory.getLog(NumaResourceAllocator.class);
+
+  // Regex to find node ids, Ex: 'available: 2 nodes (0-1)'
+  private static final String NUMA_NODEIDS_REGEX =
+      "available:\\s*[0-9]+\\s*nodes\\s*\\(([0-9\\-,]*)\\)";
+
+  // Format of the regex to find node memory, Ex: 'node 0 size: 73717 MB'.
+  // The %s is filled with the quoted node id before compiling.
+  private static final String NUMA_NODE_MEMORY_REGEX =
+      "node\\s*%s\\s*size:\\s*([0-9]+)\\s*([KMG]B)";
+
+  // Format of the regex to find node cpus, Ex: 'node 0 cpus: 0 2 4 6'.
+  // The %s is filled with the quoted node id before compiling.
+  private static final String NUMA_NODE_CPUS_REGEX =
+      "node\\s*%s\\s*cpus:\\s*([0-9\\s]+)";
+
+  private static final String GB = "GB";
+  private static final String KB = "KB";
+  // One or more whitespace characters, so repeated blanks between cpu ids do
+  // not produce empty tokens.
+  private static final String SPACE = "\\s+";
+  private static final long DEFAULT_NUMA_NODE_MEMORY = 1024;
+  private static final int DEFAULT_NUMA_NODE_CPUS = 1;
+
+  private List<NumaNodeResource> numaNodesList = new ArrayList<>();
+  private Map<String, NumaNodeResource> numaNodeIdVsResource =
+      new HashMap<>();
+  private int noOfNumaNodes;
+  private int currentAssignNode;
+  // Command used to query the topology; replaced by the configured value in
+  // init().
+  private String numaCtlCmd =
+      YarnConfiguration.DEFAULT_NM_NUMA_AWARENESS_NUMACTL_CMD;
+
+  /**
+   * Creates an allocator with no NUMA nodes registered; call
+   * {@link #init(Configuration)} before allocating.
+   *
+   * @param context the NodeManager context; it is accepted but not retained
+   */
+  public NumaResourceAllocator(Context context) {
+    // Nothing to do: the context is currently unused.
+  }
+
+  /**
+   * Loads the NUMA topology. When
+   * {@link YarnConfiguration#NM_NUMA_AWARENESS_READ_TOPOLOGY} is true the
+   * topology is parsed from the output of 'numactl --hardware', otherwise it
+   * is read from the configured node ids and their per-node memory/cpus keys.
+   *
+   * @param conf the configuration to read the NUMA settings from
+   * @throws YarnException if the command fails, its output cannot be parsed,
+   *           or no NUMA node is found
+   */
+  public void init(Configuration conf) throws YarnException {
+    numaCtlCmd = conf.get(YarnConfiguration.NM_NUMA_AWARENESS_NUMACTL_CMD,
+        YarnConfiguration.DEFAULT_NM_NUMA_AWARENESS_NUMACTL_CMD);
+    if (conf.getBoolean(YarnConfiguration.NM_NUMA_AWARENESS_READ_TOPOLOGY,
+        YarnConfiguration.DEFAULT_NM_NUMA_AWARENESS_READ_TOPOLOGY)) {
+      LOG.info("Reading NUMA topology using 'numactl --hardware' command.");
+      readTopologyFromSystem();
+    } else {
+      LOG.info("Reading NUMA topology using configurations.");
+      readTopologyFromConfiguration(conf);
+    }
+    if (numaNodesList.isEmpty()) {
+      throw new YarnException("There are no available NUMA nodes"
+          + " for making containers NUMA aware.");
+    }
+    noOfNumaNodes = numaNodesList.size();
+    LOG.info("Available numa nodes with capacities : " + numaNodesList);
+  }
+
+  // Registers every node reported by the numactl hardware listing.
+  private void readTopologyFromSystem() throws YarnException {
+    String cmdOutput = executeNGetCmdOutput();
+    String[] outputLines = cmdOutput.split("\\n");
+    Pattern pattern = Pattern.compile(NUMA_NODEIDS_REGEX);
+    String nodeIdsStr = null;
+    for (String line : outputLines) {
+      Matcher matcher = pattern.matcher(line);
+      if (matcher.find()) {
+        nodeIdsStr = matcher.group(1);
+        break;
+      }
+    }
+    if (nodeIdsStr == null) {
+      throw new YarnException("Failed to get numa nodes from"
+          + " 'numactl --hardware' output and output is:\n" + cmdOutput);
+    }
+    for (String nodeId : expandNodeIds(nodeIdsStr)) {
+      long memory = parseMemory(outputLines, nodeId);
+      int cpus = parseCpus(outputLines, nodeId);
+      addToCollection(nodeId, memory, cpus);
+    }
+  }
+
+  // Registers every node listed in the configuration, using the per-node
+  // memory/cpus keys (or the defaults when a key is absent).
+  private void readTopologyFromConfiguration(Configuration conf) {
+    Collection<String> nodeIds = conf
+        .getStringCollection(YarnConfiguration.NM_NUMA_AWARENESS_NODE_IDS);
+    for (String nodeId : nodeIds) {
+      long mem = conf.getLong(
+          "yarn.nodemanager.numa-awareness." + nodeId + ".memory",
+          DEFAULT_NUMA_NODE_MEMORY);
+      int cpus = conf.getInt(
+          "yarn.nodemanager.numa-awareness." + nodeId + ".cpus",
+          DEFAULT_NUMA_NODE_CPUS);
+      addToCollection(nodeId, mem, cpus);
+    }
+  }
+
+  // Expands a list such as "0-1,3" into the individual ids "0", "1", "3".
+  // Empty tokens are skipped; a malformed range is reported as YarnException
+  // instead of escaping as an unchecked exception.
+  private static List<String> expandNodeIds(String nodeIdsStr)
+      throws YarnException {
+    List<String> nodeIds = new ArrayList<>();
+    for (String nodeIdOrRange : nodeIdsStr.split("[,\\s]")) {
+      if (nodeIdOrRange.isEmpty()) {
+        continue;
+      }
+      if (!nodeIdOrRange.contains("-")) {
+        nodeIds.add(nodeIdOrRange);
+        continue;
+      }
+      String[] beginNEnd = nodeIdOrRange.split("-");
+      if (beginNEnd.length != 2) {
+        throw new YarnException(
+            "Invalid numa node id range:" + nodeIdOrRange);
+      }
+      try {
+        int end = Integer.parseInt(beginNEnd[1]);
+        for (int nodeId = Integer.parseInt(beginNEnd[0]); nodeId <= end;
+            nodeId++) {
+          nodeIds.add(String.valueOf(nodeId));
+        }
+      } catch (NumberFormatException ex) {
+        throw new YarnException(
+            "Invalid numa node id range:" + nodeIdOrRange, ex);
+      }
+    }
+    return nodeIds;
+  }
+
+  /**
+   * Runs the configured numactl command with '--hardware' and returns what it
+   * printed.
+   *
+   * @return the command output
+   * @throws YarnException if the command cannot be executed
+   */
+  @VisibleForTesting
+  String executeNGetCmdOutput() throws YarnException {
+    String[] args = new String[] {numaCtlCmd, "--hardware"};
+    ShellCommandExecutor shExec = new ShellCommandExecutor(args);
+    try {
+      shExec.execute();
+    } catch (IOException e) {
+      throw new YarnException("Failed to read the numa configurations.", e);
+    }
+    return shExec.getOutput();
+  }
+
+  // Counts the cpu ids on the 'node <id> cpus:' line; 0 when the line is
+  // missing or lists no cpus.
+  private int parseCpus(String[] outputLines, String nodeId) {
+    Pattern patternNodeCPUs = Pattern.compile(
+        String.format(NUMA_NODE_CPUS_REGEX, Pattern.quote(nodeId)));
+    for (String line : outputLines) {
+      Matcher matcherNodeCPUs = patternNodeCPUs.matcher(line);
+      if (matcherNodeCPUs.find()) {
+        String cpusStr = matcherNodeCPUs.group(1).trim();
+        return cpusStr.isEmpty() ? 0 : cpusStr.split(SPACE).length;
+      }
+    }
+    return 0;
+  }
+
+  // Reads the size on the 'node <id> size:' line, converted from KB/GB to the
+  // MB scale; 0 when the line is missing.
+  private long parseMemory(String[] outputLines, String nodeId)
+      throws YarnException {
+    Pattern patternNodeMem = Pattern.compile(
+        String.format(NUMA_NODE_MEMORY_REGEX, Pattern.quote(nodeId)));
+    for (String line : outputLines) {
+      Matcher matcherNodeMem = patternNodeMem.matcher(line);
+      if (!matcherNodeMem.find()) {
+        continue;
+      }
+      long memory;
+      try {
+        memory = Long.parseLong(matcherNodeMem.group(1));
+      } catch (NumberFormatException ex) {
+        throw new YarnException("Failed to get memory for node:" + nodeId,
+            ex);
+      }
+      String units = matcherNodeMem.group(2);
+      if (GB.equals(units)) {
+        return memory * 1024;
+      }
+      if (KB.equals(units)) {
+        return memory / 1024;
+      }
+      return memory;
+    }
+    return 0;
+  }
+
+
private void addToCollection(String nodeId, long memory, int cpus) { + NumaNodeResource numaNode = new NumaNodeResource(nodeId, memory, cpus); + numaNodesList.add(numaNode); + numaNodeIdVsResource.put(nodeId, numaNode); + } + + /** + * Allocates the available NUMA nodes for the requested containerId with + * resource in a round robin fashion. + * + * @param containerId the container ID + * @param resource resource for the Numa Node + * @return the assigned NUMA Node info or null if resources not available. + */ + public synchronized NumaResourceAllocation allocateNumaNode( + ContainerId containerId, Resource resource) { + NumaResourceAllocation numaNode = allocate(containerId, resource); +// if (numaNode != null) { +// AssignedResources assigned = new AssignedResources(); +// assigned.updateAssignedResources(Arrays.asList(numaNode)); +// context.getContainers().get(containerId).getResourceMappings() +// .addAssignedResources(NUMA_RESOURCE_TYPE, assigned); +// } + return numaNode; + } + + private NumaResourceAllocation allocate(ContainerId containerId, + Resource resource) { + for (int index = 0; index < noOfNumaNodes; index++) { + NumaNodeResource numaNode = numaNodesList + .get((currentAssignNode + index) % noOfNumaNodes); + if (numaNode.isResourcesAvailable(resource)) { + numaNode.assignResources(resource, containerId); + LOG.info("Assigning NUMA node " + numaNode.getNodeId() + " for memory, " + + numaNode.getNodeId() + " for cpus for the " + containerId); + currentAssignNode = (currentAssignNode + index + 1) % noOfNumaNodes; + return new NumaResourceAllocation(numaNode.getNodeId(), + resource.getMemorySize(), numaNode.getNodeId(), + resource.getVirtualCores()); + } + } + + // If there is no single node matched for the container resource + // Check the NUMA nodes for Memory resources + NumaResourceAllocation assignedNumaNodeInfo = new NumaResourceAllocation(); + StringBuilder memNodes = new StringBuilder(); + long memreq = resource.getMemorySize(); + for 
(NumaNodeResource numaNode : numaNodesList) { + long memrem = numaNode.assignAvailableMemory(memreq, containerId); + assignedNumaNodeInfo.addMemoryNode(numaNode.getNodeId(), memreq - memrem); + memreq = memrem; + if (memreq == 0) { + break; + } + } + if (memreq != 0) { + LOG.info("There is no available memory:" + resource.getMemorySize() + + " in numa nodes for " + containerId); + releaseNumaResource(containerId); + return null; + } + + // Check the NUMA nodes for CPU resources + StringBuilder cpuNodes = new StringBuilder(); + int cpusreq = resource.getVirtualCores(); + for (int index = 0; index < noOfNumaNodes; index++) { + NumaNodeResource numaNode = numaNodesList + .get((currentAssignNode + index) % noOfNumaNodes); + int cpusrem = numaNode.assignAvailableCpus(cpusreq, containerId); + assignedNumaNodeInfo.addCpuNode(numaNode.getNodeId(), cpusreq - cpusrem); + cpusreq = cpusrem; + if (cpusreq == 0) { + currentAssignNode = (currentAssignNode + index + 1) % noOfNumaNodes; + break; + } + } + + if (cpusreq != 0) { + LOG.info("There are no available cpus:" + resource.getVirtualCores() + + " in numa nodes for " + containerId); + releaseNumaResource(containerId); + return null; + } + LOG.info("Assigning multiple NUMA nodes (" + memNodes.toString() + + ") for memory, (" + cpuNodes.toString() + ") for cpus for " + + containerId); + return assignedNumaNodeInfo; + } + + /** + * Release assigned NUMA resources for the container. 
+ * + * @param containerId the container ID + */ + public synchronized void releaseNumaResource(ContainerId containerId) { + LOG.info("Releasing the assigned NUMA resources for " + containerId); + for (NumaNodeResource numaNode : numaNodesList) { + numaNode.releaseResources(containerId); + } + } + + /** + * Recovers assigned numa resources + * + * @param containerId + */ + public synchronized void recoverNumaResource(ContainerId containerId) { +// Container container = context.getContainers().get(containerId); +// ResourceMappings resourceMappings = container.getResourceMappings(); +// List assignedResources = resourceMappings +// .getAssignedResources(NUMA_RESOURCE_TYPE); +// if (assignedResources.size() == 1) { +// NumaResourceAllocation numaResourceAllocation = +// (NumaResourceAllocation) assignedResources.get(0); +// for (Entry nodeAndMemory : numaResourceAllocation +// .getNodeVsMemory().entrySet()) { +// numaNodeIdVsResource.get(nodeAndMemory.getKey()) +// .recoverMemory(containerId, nodeAndMemory.getValue()); +// } +// for (Entry nodeAndCpus : numaResourceAllocation +// .getNodeVsCpus().entrySet()) { +// numaNodeIdVsResource.get(nodeAndCpus.getKey()).recoverCpus(containerId, +// nodeAndCpus.getValue()); +// } +// } else { +// LOG.error("Unexpected number:" + assignedResources.size() +// + " of assigned numa resources for " + containerId +// + " while recovering."); +// } + } + + @VisibleForTesting + Collection getNumaNodesList() { + return numaNodesList; + } +} diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/scheduler/allocators/package-info.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/scheduler/allocators/package-info.java new file mode 100644 index 0000000..5d01cf1 --- /dev/null +++ 
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/scheduler/allocators/package-info.java @@ -0,0 +1,28 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +/** + * Package org.apache.hadoop.yarn.server.nodemanager.scheduler.allocators + * contains classes related to NM local scheduler allocators. 
+ */ +@InterfaceAudience.Private +@InterfaceStability.Unstable +package org.apache.hadoop.yarn.server.nodemanager.scheduler.allocators; + +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.classification.InterfaceStability; diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/linux/resources/TestNumaResourceHandlerImpl.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/linux/resources/TestNumaResourceHandlerImpl.java new file mode 100644 index 0000000..76778a6 --- /dev/null +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/linux/resources/TestNumaResourceHandlerImpl.java @@ -0,0 +1,183 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.resources; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNull; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + +import java.io.IOException; +import java.util.Arrays; +import java.util.List; +import java.util.concurrent.ConcurrentHashMap; + +import org.apache.hadoop.yarn.api.records.ContainerId; +import org.apache.hadoop.yarn.api.records.Resource; +import org.apache.hadoop.yarn.conf.YarnConfiguration; +import org.apache.hadoop.yarn.server.nodemanager.Context; +import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container; +//import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ResourceMappings; +//import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ResourceMappings.AssignedResources; +import org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.privileged.PrivilegedOperation; +import org.apache.hadoop.yarn.server.nodemanager.scheduler.allocators.LocalResourceAllocators; +import org.apache.hadoop.yarn.server.nodemanager.scheduler.allocators.NumaResourceAllocator; +import org.junit.Before; +import org.junit.Test; +import org.mockito.Matchers; + +/** + * Test class for NumaResourceHandlerImpl. 
+ * + */ +public class TestNumaResourceHandlerImpl { + + private YarnConfiguration conf; + private NumaResourceHandlerImpl numaResourceHandler; + private Container mockContainer; + + @Before + public void setUp() throws IOException, ResourceHandlerException { + conf = new YarnConfiguration(); + setNumaTopologyConfigs(); + Context mockContext = createAndGetMockContext(); + NumaResourceAllocator numaResourceAllocator = new NumaResourceAllocator( + mockContext); + LocalResourceAllocators.setNumaResourceAllocator(numaResourceAllocator); + numaResourceHandler = new NumaResourceHandlerImpl(conf); + numaResourceHandler.bootstrap(conf); + mockContainer = mock(Container.class); + } + + @Test + public void testAllocateNumaMemoryResource() throws ResourceHandlerException { + // allocates node 0 for memory and cpu + testAllocateNumaResource("container_1481156246874_0001_01_000001", + Resource.newInstance(2048, 2), "0", "0"); + // allocates node 1 for memory and cpu since allocator uses round + // robin assignment + testAllocateNumaResource("container_1481156246874_0001_01_000002", + Resource.newInstance(60000, 2), "1", "1"); + // allocates node 0,1 for memory since there is no sufficient memory in any + // one node + testAllocateNumaResource("container_1481156246874_0001_01_000003", + Resource.newInstance(80000, 2), "0,1", "0"); + // returns null since there are no sufficient resources available for the + // request + when(mockContainer.getContainerId()).thenReturn( + ContainerId.fromString("container_1481156246874_0001_01_000004")); + when(mockContainer.getResource()) + .thenReturn(Resource.newInstance(80000, 2)); + assertNull(numaResourceHandler.preStart(mockContainer)); + // allocates node 1 for memory and cpu + testAllocateNumaResource("container_1481156246874_0001_01_000005", + Resource.newInstance(1024, 2), "1", "1"); + } + + @Test + public void testAllocateNumaCpusResource() throws ResourceHandlerException { + // allocates node 0 for memory and cpu + 
testAllocateNumaResource("container_1481156246874_0001_01_000001", + Resource.newInstance(2048, 2), "0", "0"); + // allocates node 1 for memory and cpu since allocator uses round + // robin assignment + testAllocateNumaResource("container_1481156246874_0001_01_000002", + Resource.newInstance(2048, 2), "1", "1"); + // allocates node 0,1 for cpus since there is are no sufficient cpus + // available in any one node + testAllocateNumaResource("container_1481156246874_0001_01_000003", + Resource.newInstance(2048, 3), "0", "0,1"); + // returns null since there are no sufficient resources available for the + // request + when(mockContainer.getContainerId()).thenReturn( + ContainerId.fromString("container_1481156246874_0001_01_000004")); + when(mockContainer.getResource()).thenReturn(Resource.newInstance(2048, 2)); + assertNull(numaResourceHandler.preStart(mockContainer)); + // allocates node 1 for memory and cpu + testAllocateNumaResource("container_1481156246874_0001_01_000005", + Resource.newInstance(2048, 1), "1", "1"); + } + +// @Test +// public void testReacquireContainer() throws Exception { +// @SuppressWarnings("unchecked") +// ConcurrentHashMap mockContainers = mock( +// ConcurrentHashMap.class); +// Context mockContext = mock(Context.class); +// ResourceMappings resourceMappings = new ResourceMappings(); +// AssignedResources assignedRscs = new AssignedResources(); +// NumaResourceAllocation numaResourceAllocation = new NumaResourceAllocation( +// "0", 70000, "0", 4); +// assignedRscs.updateAssignedResources(Arrays.asList(numaResourceAllocation)); +// resourceMappings.addAssignedResources("numa", assignedRscs); +// when(mockContainer.getResourceMappings()).thenReturn(resourceMappings); +// when(mockContainers.get(Matchers.any())).thenReturn(mockContainer); +// when(mockContext.getContainers()).thenReturn(mockContainers); +// NumaResourceAllocator numaResourceAllocator = new NumaResourceAllocator( +// mockContext); +// 
LocalResourceAllocators.setNumaResourceAllocator(numaResourceAllocator); +// numaResourceHandler = new NumaResourceHandlerImpl(conf); +// numaResourceHandler.bootstrap(conf); +// // recovered numa resources should be added to the used resources and +// // remaining will be available for further allocation. +// numaResourceHandler.reacquireContainer( +// ContainerId.fromString("container_1481156246874_0001_01_000001")); +// +// testAllocateNumaResource("container_1481156246874_0001_01_000005", +// Resource.newInstance(2048, 1), "1", "1"); +// when(mockContainer.getContainerId()).thenReturn( +// ContainerId.fromString("container_1481156246874_0001_01_000005")); +// when(mockContainer.getResource()).thenReturn(Resource.newInstance(2048, 4)); +// List preStart = numaResourceHandler +// .preStart(mockContainer); +// assertNull(preStart); +// } + + private void setNumaTopologyConfigs() { + conf.set(YarnConfiguration.NM_NUMA_AWARENESS_NODE_IDS, "0,1"); + conf.set("yarn.nodemanager.numa-awareness.0.memory", "73717"); + conf.set("yarn.nodemanager.numa-awareness.0.cpus", "4"); + conf.set("yarn.nodemanager.numa-awareness.1.memory", "73727"); + conf.set("yarn.nodemanager.numa-awareness.1.cpus", "4"); + } + + private Context createAndGetMockContext() { + Context mockContext = mock(Context.class); + @SuppressWarnings("unchecked") + ConcurrentHashMap mockContainers = mock( + ConcurrentHashMap.class); + Container mockContainer = mock(Container.class); +// when(mockContainer.getResourceMappings()) +// .thenReturn(new ResourceMappings()); + when(mockContainers.get(Matchers.any())).thenReturn(mockContainer); + when(mockContext.getContainers()).thenReturn(mockContainers); + return mockContext; + } + + private void testAllocateNumaResource(String containerId, Resource resource, + String memNodes, String cpuNodes) throws ResourceHandlerException { + when(mockContainer.getContainerId()) + .thenReturn(ContainerId.fromString(containerId)); + 
when(mockContainer.getResource()).thenReturn(resource); + List preStart = numaResourceHandler + .preStart(mockContainer); + List arguments = preStart.get(0).getArguments(); + assertEquals(arguments, Arrays.asList("/usr/bin/numactl", + "--interleave=" + memNodes, "--cpunodebind=" + cpuNodes)); + } +} diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/scheduler/allocators/TestNumaResourceAllocator.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/scheduler/allocators/TestNumaResourceAllocator.java new file mode 100644 index 0000000..05d4712 --- /dev/null +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/scheduler/allocators/TestNumaResourceAllocator.java @@ -0,0 +1,253 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.hadoop.yarn.server.nodemanager.scheduler.allocators; + +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Collection; +import java.util.concurrent.ConcurrentHashMap; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.yarn.api.records.ContainerId; +import org.apache.hadoop.yarn.api.records.Resource; +import org.apache.hadoop.yarn.conf.YarnConfiguration; +import org.apache.hadoop.yarn.exceptions.YarnException; +import org.apache.hadoop.yarn.exceptions.YarnRuntimeException; +import org.apache.hadoop.yarn.server.nodemanager.Context; +import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container; +//import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ResourceMappings; +//import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ResourceMappings.AssignedResources; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; +import org.mockito.Matchers; + +/** + * Test class for NumaResourceAllocator. 
+ */ +public class TestNumaResourceAllocator { + + private Configuration conf; + private NumaResourceAllocator numaResourceAllocator; + + @Before + public void setUp() throws IOException, YarnException { + conf = new YarnConfiguration(); + Context mockContext = mock(Context.class); + @SuppressWarnings("unchecked") + ConcurrentHashMap mockContainers = mock( + ConcurrentHashMap.class); + Container mockContainer = mock(Container.class); +// when(mockContainer.getResourceMappings()) +// .thenReturn(new ResourceMappings()); + when(mockContainers.get(Matchers.any())).thenReturn(mockContainer); + when(mockContext.getContainers()).thenReturn(mockContainers); + numaResourceAllocator = new NumaResourceAllocator(mockContext); + setNumaTopologyConfigs(); + numaResourceAllocator.init(conf); + } + + @Test + public void testReadNumaTopologyFromConfigurations() throws Exception { + Collection nodesList = numaResourceAllocator.getNumaNodesList(); + Collection expectedNodesList = getExpectedNumaNodesList(); + Assert.assertEquals(expectedNodesList, nodesList); + } + + @Test + public void testReadNumaTopologyFromCmdOutput() throws Exception { + conf.setBoolean(YarnConfiguration.NM_NUMA_AWARENESS_READ_TOPOLOGY, true); + String cmdOutput = "available: 2 nodes (0-1)\n\t" + + "node 0 cpus: 0 2 4 6\n\t" + + "node 0 size: 73717 MB\n\t" + + "node 0 free: 17272 MB\n\t" + + "node 1 cpus: 1 3 5 7\n\t" + + "node 1 size: 73727 MB\n\t" + + "node 1 free: 10699 MB\n\t" + + "node distances:\n\t" + + "node 0 1\n\t" + + "0: 10 20\n\t" + + "1: 20 10"; + numaResourceAllocator = new NumaResourceAllocator(mock(Context.class)) { + @Override + String executeNGetCmdOutput() throws YarnRuntimeException { + return cmdOutput; + } + }; + numaResourceAllocator.init(conf); + Collection nodesList = numaResourceAllocator.getNumaNodesList(); + Collection expectedNodesList = getExpectedNumaNodesList(); + Assert.assertEquals(expectedNodesList, nodesList); + } + + @Test + public void testAllocateNumaNode() throws 
Exception { + NumaResourceAllocation nodeInfo = numaResourceAllocator.allocateNumaNode( + ContainerId.fromString("container_1481156246874_0001_01_000001"), + Resource.newInstance(2048, 2)); + Assert.assertEquals("0", String.join(",", nodeInfo.getMemNodes())); + Assert.assertEquals("0", String.join(",", nodeInfo.getCpuNodes())); + } + + @Test + public void testAllocateNumaNodeWithRoundRobinFashionAssignment() + throws Exception { + NumaResourceAllocation nodeInfo1 = numaResourceAllocator.allocateNumaNode( + ContainerId.fromString("container_1481156246874_0001_01_000001"), + Resource.newInstance(2048, 2)); + Assert.assertEquals("0", String.join(",", nodeInfo1.getMemNodes())); + Assert.assertEquals("0", String.join(",", nodeInfo1.getCpuNodes())); + + NumaResourceAllocation nodeInfo2 = numaResourceAllocator.allocateNumaNode( + ContainerId.fromString("container_1481156246874_0001_01_000002"), + Resource.newInstance(2048, 2)); + Assert.assertEquals("1", String.join(",", nodeInfo2.getMemNodes())); + Assert.assertEquals("1", String.join(",", nodeInfo2.getCpuNodes())); + + NumaResourceAllocation nodeInfo3 = numaResourceAllocator.allocateNumaNode( + ContainerId.fromString("container_1481156246874_0001_01_000003"), + Resource.newInstance(2048, 2)); + Assert.assertEquals("0", String.join(",", nodeInfo3.getMemNodes())); + Assert.assertEquals("0", String.join(",", nodeInfo3.getCpuNodes())); + + NumaResourceAllocation nodeInfo4 = numaResourceAllocator.allocateNumaNode( + ContainerId.fromString("container_1481156246874_0001_01_000003"), + Resource.newInstance(2048, 2)); + Assert.assertEquals("1", String.join(",", nodeInfo4.getMemNodes())); + Assert.assertEquals("1", String.join(",", nodeInfo4.getCpuNodes())); + } + + @Test + public void testAllocateNumaNodeWithMultipleNodesForMemory() + throws Exception { + NumaResourceAllocation nodeInfo = numaResourceAllocator.allocateNumaNode( + ContainerId.fromString("container_1481156246874_0001_01_000001"), + Resource.newInstance(102400, 
2)); + Assert.assertEquals("0,1", String.join(",", nodeInfo.getMemNodes())); + Assert.assertEquals("0", String.join(",", nodeInfo.getCpuNodes())); + } + + @Test + public void testAllocateNumaNodeWithMultipleNodesForCpus() throws Exception { + NumaResourceAllocation nodeInfo = numaResourceAllocator.allocateNumaNode( + ContainerId.fromString("container_1481156246874_0001_01_000001"), + Resource.newInstance(2048, 6)); + Assert.assertEquals("0", String.join(",", nodeInfo.getMemNodes())); + Assert.assertEquals("0,1", String.join(",", nodeInfo.getCpuNodes())); + } + + @Test + public void testAllocateNumaNodeWhenNoNumaMemResourcesAvailable() + throws Exception { + NumaResourceAllocation nodeInfo = numaResourceAllocator.allocateNumaNode( + ContainerId.fromString("container_1481156246874_0001_01_000001"), + Resource.newInstance(2048000, 6)); + Assert.assertNull("Should not assign numa nodes when there" + + " are no sufficient memory resources available.", nodeInfo); + } + + @Test + public void testAllocateNumaNodeWhenNoNumaCpuResourcesAvailable() + throws Exception { + NumaResourceAllocation nodeInfo = numaResourceAllocator.allocateNumaNode( + ContainerId.fromString("container_1481156246874_0001_01_000001"), + Resource.newInstance(2048, 600)); + Assert.assertNull("Should not assign numa nodes when there" + + " are no sufficient cpu resources available.", nodeInfo); + } + + @Test + public void testReleaseNumaResourcess() throws Exception { + NumaResourceAllocation nodeInfo = numaResourceAllocator.allocateNumaNode( + ContainerId.fromString("container_1481156246874_0001_01_000001"), + Resource.newInstance(2048, 8)); + Assert.assertEquals("0", String.join(",", nodeInfo.getMemNodes())); + Assert.assertEquals("0,1", String.join(",", nodeInfo.getCpuNodes())); + + // Request the resource when all cpu nodes occupied + nodeInfo = numaResourceAllocator.allocateNumaNode( + ContainerId.fromString("container_1481156246874_0001_01_000002"), + Resource.newInstance(2048, 4)); + 
Assert.assertNull("Should not assign numa nodes when there" + + " are no sufficient cpu resources available.", nodeInfo); + + // Release the resources + numaResourceAllocator.releaseNumaResource( + ContainerId.fromString("container_1481156246874_0001_01_000001")); + // Request the resources + nodeInfo = numaResourceAllocator.allocateNumaNode( + ContainerId.fromString("container_1481156246874_0001_01_000003"), + Resource.newInstance(1024, 2)); + Assert.assertEquals("0", String.join(",", nodeInfo.getMemNodes())); + Assert.assertEquals("0", String.join(",", nodeInfo.getCpuNodes())); + } + +// @Test +// public void testRecoverNumaResource() throws Exception { +// @SuppressWarnings("unchecked") +// ConcurrentHashMap mockContainers = mock( +// ConcurrentHashMap.class); +// Context mockContext = mock(Context.class); +// Container mockContainer = mock(Container.class); +// ResourceMappings value = new ResourceMappings(); +// AssignedResources assignedResources = new AssignedResources(); +// assignedResources.updateAssignedResources( +// Arrays.asList(new NumaResourceAllocation("0", 70000, "0", 4))); +// value.addAssignedResources("numa", assignedResources); +// when(mockContainer.getResourceMappings()).thenReturn(value); +// when(mockContainers.get(Matchers.any())).thenReturn(mockContainer); +// when(mockContext.getContainers()).thenReturn(mockContainers); +// +// numaResourceAllocator = new NumaResourceAllocator(mockContext); +// numaResourceAllocator.init(conf); +// // Recover the resources +// numaResourceAllocator.recoverNumaResource( +// ContainerId.fromString("container_1481156246874_0001_01_000001")); +// +// // Request resources based on the availability +// NumaResourceAllocation numaNode = numaResourceAllocator.allocateNumaNode( +// ContainerId.fromString("container_1481156246874_0001_01_000005"), +// Resource.newInstance(2048, 1)); +// assertEquals("1", String.join(",", numaNode.getMemNodes())); +// assertEquals("1", String.join(",", numaNode.getCpuNodes())); 
+// +// // Request resources more than the available +// numaNode = numaResourceAllocator.allocateNumaNode( +// ContainerId.fromString("container_1481156246874_0001_01_000006"), +// Resource.newInstance(2048, 4)); +// assertNull(numaNode); +// } + + private void setNumaTopologyConfigs() { + conf.set(YarnConfiguration.NM_NUMA_AWARENESS_NODE_IDS, "0,1"); + conf.set("yarn.nodemanager.numa-awareness.0.memory", "73717"); + conf.set("yarn.nodemanager.numa-awareness.0.cpus", "4"); + conf.set("yarn.nodemanager.numa-awareness.1.memory", "73727"); + conf.set("yarn.nodemanager.numa-awareness.1.cpus", "4"); + } + + private Collection getExpectedNumaNodesList() { + Collection expectedNodesList = new ArrayList<>(2); + expectedNodesList.add(new NumaNodeResource("0", 73717, 4)); + expectedNodesList.add(new NumaNodeResource("1", 73727, 4)); + return expectedNodesList; + } +}