diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/SysInfo.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/SysInfo.java
index e8a5714..8a72ea6 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/SysInfo.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/SysInfo.java
@@ -88,6 +88,15 @@ public static SysInfo newInstance() {
   public abstract int getNumCores();
 
   /**
+   * Obtain the total number of CPU sockets on the system.
+   *
+   * @return number of CPU sockets
+   */
+  public int getNumCpuSocket() {
+    return 2;
+  }
+
+  /**
    * Obtain the CPU frequency of on the system.
    *
    * @return CPU frequency in kHz
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/SysInfoLinux.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/SysInfoLinux.java
index 7fd1990..3b5ecf2 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/SysInfoLinux.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/SysInfoLinux.java
@@ -27,11 +27,13 @@
 import java.nio.charset.Charset;
 import java.util.HashMap;
 import java.util.HashSet;
+import java.util.Set;
 import java.util.regex.Matcher;
 import java.util.regex.Pattern;
 
 import com.google.common.annotations.VisibleForTesting;
+import com.google.common.collect.Sets;
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.classification.InterfaceStability;
 import org.apache.hadoop.util.Shell.ShellCommandExecutor;
@@ -139,6 +141,7 @@
   private int numProcessors = 0;
   /* number of physical cores on the system. */
   private int numCores = 0;
+  private int numCpuSocket = 0;
   private long cpuFrequency = 0L; // CPU frequency on the system (kHz)
   private long numNetBytesRead = 0L; // aggregated bytes read from network
   private long numNetBytesWritten = 0L; // aggregated bytes written to network
@@ -310,6 +313,7 @@ private void readProcCpuInfoFile() {
       return;
     }
     Matcher mat;
+    Set<String> physicalIds = Sets.newTreeSet();
     try {
       numProcessors = 0;
       numCores = 1;
@@ -327,6 +331,7 @@ private void readProcCpuInfoFile() {
         mat = PHYSICAL_ID_FORMAT.matcher(str);
         if (mat.find()) {
           currentPhysicalId = str;
+          physicalIds.add(currentPhysicalId);
         }
         mat = CORE_ID_FORMAT.matcher(str);
         if (mat.find()) {
@@ -335,6 +340,7 @@ private void readProcCpuInfoFile() {
         }
         str = in.readLine();
       }
+      numCpuSocket = physicalIds.size();
     } catch (IOException io) {
       LOG.warn("Error reading the stream " + io);
     } finally {
@@ -616,6 +622,12 @@ public int getNumCores() {
   }
 
   /** {@inheritDoc} */
+  public int getNumCpuSocket() {
+    readProcCpuInfoFile();
+    return numCpuSocket;
+  }
+
+  /** {@inheritDoc} */
   @Override
   public long getCpuFrequency() {
     readProcCpuInfoFile();
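Reviewer note: the new socket detection boils down to counting distinct "physical id" entries in /proc/cpuinfo. A minimal standalone sketch of the same idea, useful for sanity-checking the value outside the NodeManager (class name and regex are illustrative, not part of the patch):

  // Illustrative only: mirrors what SysInfoLinux.readProcCpuInfoFile() now does
  // to derive numCpuSocket; run it on a Linux host and compare the result.
  import java.io.BufferedReader;
  import java.io.FileReader;
  import java.io.IOException;
  import java.util.HashSet;
  import java.util.Set;
  import java.util.regex.Matcher;
  import java.util.regex.Pattern;

  public class CpuSocketCountDemo {
    private static final Pattern PHYSICAL_ID =
        Pattern.compile("^physical id\\s*:\\s*(\\d+)");

    public static void main(String[] args) throws IOException {
      Set<String> physicalIds = new HashSet<>();
      try (BufferedReader in =
          new BufferedReader(new FileReader("/proc/cpuinfo"))) {
        String line;
        while ((line = in.readLine()) != null) {
          Matcher mat = PHYSICAL_ID.matcher(line);
          if (mat.find()) {
            physicalIds.add(mat.group(1)); // one distinct id per socket
          }
        }
      }
      System.out.println("CPU sockets: " + physicalIds.size());
    }
  }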
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/linux/resources/CGroupsHandler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/linux/resources/CGroupsHandler.java
index e279504..9fddd62 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/linux/resources/CGroupsHandler.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/linux/resources/CGroupsHandler.java
@@ -83,6 +83,7 @@ public String getName() {
   String CGROUP_CPU_PERIOD_US = "cfs_period_us";
   String CGROUP_CPU_QUOTA_US = "cfs_quota_us";
   String CGROUP_CPU_SHARES = "shares";
+  String CGROUP_PARAM_CPUSET_CPUS = "cpus";
 
   /**
    * Mounts or initializes a cgroup controller.
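Reviewer note: CGroupsHandler composes the controller name and the parameter name when it writes a value, so updates made with the new constant should land in the controller's cpuset.cpus file. A hypothetical call and its assumed effect (mount point and hierarchy prefix depend on NM configuration and are not part of this patch; cGroupsHandler and containerId are placeholders):

  // hypothetical usage of the new constant
  cGroupsHandler.updateCGroupParam(CGroupsHandler.CGroupController.CPUSET,
      containerId.toString(), CGroupsHandler.CGROUP_PARAM_CPUSET_CPUS, "0,1,32,33");
  // assumed write target under a default cgroups v1 layout:
  //   <cgroup-mount>/cpuset/<yarn-hierarchy>/<container-id>/cpuset.cpus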

diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/linux/resources/cpuset/CGroupsCpusetResourceHandlerImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/linux/resources/cpuset/CGroupsCpusetResourceHandlerImpl.java
new file mode 100644
index 0000000..813afaf
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/linux/resources/cpuset/CGroupsCpusetResourceHandlerImpl.java
@@ -0,0 +1,93 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.resources.cpuset;
+
+import java.util.List;
+
+import org.apache.hadoop.conf.Configuration;
+
+import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.server.nodemanager.Context;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.privileged.PrivilegedOperation;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.resources.CGroupsHandler;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.resources.CGroupsHandler.CGroupController;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.resources.ResourceHandlerException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Resource handler that pins containers to logical processors through the
+ * cgroups cpuset controller, delegating placement decisions to
+ * {@link CpusetAllocator}.
+ */
+public class CGroupsCpusetResourceHandlerImpl implements CpusetResourceHandler {
+  public static final Logger LOG = LoggerFactory.getLogger(
+      CGroupsCpusetResourceHandlerImpl.class);
+
+  private CpusetAllocator cpusetAllocator;
+  private CGroupsHandler cGroupsHandler;
+
+  CGroupsCpusetResourceHandlerImpl(CGroupsHandler cGroupsHandler,
+      Context context) {
+    cpusetAllocator = new CpusetAllocator(cGroupsHandler, context);
+    this.cGroupsHandler = cGroupsHandler;
+  }
+
+  @Override
+  public List<PrivilegedOperation> bootstrap(Configuration configuration)
+      throws ResourceHandlerException {
+    return null;
+  }
+
+  @Override
+  public List<PrivilegedOperation> preStart(Container container)
+      throws ResourceHandlerException {
+    String cgroupId = container.getContainerId().toString();
+    cGroupsHandler.createCGroup(CGroupController.CPUSET, cgroupId);
+    try {
+      //allocate processors for the container
+      cpusetAllocator.allocateContainerCpuset(container);
+    } catch (ResourceHandlerException e) {
+      cpusetAllocator.releaseContainerCpuset(container.getContainerId());
+      cGroupsHandler.deleteCGroup(CGroupController.CPUSET, cgroupId);
+      LOG.warn("Could not allocate cpuset for container " + cgroupId, e);
+      throw e;
+    }
+    return null;
+  }
+
+  @Override
+  public List<PrivilegedOperation> updateContainer(Container container)
+      throws ResourceHandlerException {
+    cpusetAllocator.updateContainer(container);
+    return null;
+  }
+
+  @Override
+  public List<PrivilegedOperation> reacquireContainer(ContainerId containerId)
+      throws ResourceHandlerException {
+    cpusetAllocator.recoverContainerCpuset(containerId);
+    return null;
+  }
+
+  @Override
+  public List<PrivilegedOperation> postComplete(ContainerId containerId)
+      throws ResourceHandlerException {
+    cpusetAllocator.releaseContainerCpuset(containerId);
+    cGroupsHandler.deleteCGroup(CGroupController.CPUSET,
+        containerId.toString());
+    return null;
+  }
+
+  @Override
+  public List<PrivilegedOperation> teardown() throws ResourceHandlerException {
+    return null;
+  }
+}
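Reviewer note: the handler above only wires the cgroup lifecycle to the allocator. The sequence below is how the NodeManager's resource handler chain is expected to drive it (illustrative only; the constructor is package-private, the chain wiring is not part of this patch, and conf, nmContext, cGroupsHandler, container and containerId are placeholders):

  // illustrative lifecycle, not code from the patch
  CpusetResourceHandler handler =
      new CGroupsCpusetResourceHandlerImpl(cGroupsHandler, nmContext);
  handler.bootstrap(conf);                  // NM start-up (no-op here)
  handler.preStart(container);              // create cpuset cgroup, pin processors
  handler.updateContainer(container);       // container resize: extend/shrink cpuset
  handler.reacquireContainer(containerId);  // NM restart: restore stored assignment
  handler.postComplete(containerId);        // release processors, delete cgroup
  handler.teardown();                       // NM shutdown (no-op here)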
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/linux/resources/cpuset/CpusetAllocator.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/linux/resources/cpuset/CpusetAllocator.java
new file mode 100644
index 0000000..b5686b4
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/linux/resources/cpuset/CpusetAllocator.java
@@ -0,0 +1,512 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.resources.cpuset;
+
+import java.io.IOException;
+import java.io.Serializable;
+import java.util.Arrays;
+import java.util.Deque;
+import java.util.HashMap;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Joiner;
+import com.google.common.collect.Sets;
+import org.apache.commons.lang.StringUtils;
+import org.apache.hadoop.util.SysInfo;
+import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.api.records.ExecutionType;
+import org.apache.hadoop.yarn.server.nodemanager.Context;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ResourceMappings;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.resources.CGroupsHandler;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.resources.CGroupsHandler.CGroupController;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.resources.ResourceHandlerException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Tracks logical processor usage on the node and computes cpuset values for
+ * containers, preferring whole physical cores within a single socket.
+ */
+public class CpusetAllocator {
+  public static final int VCORE_RATIO = 100;
+  public static final String CPUSET_RESOURCE_TYPE = "cpuset";
+  public static final String CPUSET_MODE_ENV = "cpu_share_mode";
+  public static final String PROCESSOR_ID_SEPARATOR = ",";
+  private static final Logger LOG = LoggerFactory.getLogger(
+      CpusetAllocator.class);
+
+  // cpus[i] > 0 means logical processor i is not pinned to any
+  // RESERVED/EXCLUSIVE container.
+  private int[] cpus;
+  private int cpuHTNum;
+  private int cpuSocketNum;
+  private int physicalCoreNum;
+  private int physicalCorePerSocket;
+
+  private Context context;
+  private CGroupsHandler cGroupsHandler;
+  private Map<ContainerId, Deque<Integer>> assignedContainers =
+      new HashMap<>();
+
+  /**
+   * EXCLUSIVE: dedicated processors, never shared.
+   * RESERVED: dedicated processors that NONE mode containers may also use.
+   * SHARE: runs on all currently unassigned processors.
+   * NONE: runs on unassigned processors plus RESERVED ones.
+   */
+  public enum CpuShareMode {
+    EXCLUSIVE,
+    RESERVED,
+    SHARE,
+    NONE,
+  }
+
+  public CpusetAllocator(CGroupsHandler cGroupsHandler, Context context) {
+    SysInfo sysInfo = SysInfo.newInstance();
+    init(sysInfo.getNumProcessors(), sysInfo.getNumCpuSocket(),
+        sysInfo.getNumCores(), cGroupsHandler, context);
+  }
+
+  @VisibleForTesting
+  public CpusetAllocator(int cpuProcessorNum, int cpuSocketNum,
+      int physicalCoreNum, CGroupsHandler cGroupsHandler, Context context) {
+    init(cpuProcessorNum, cpuSocketNum, physicalCoreNum, cGroupsHandler,
+        context);
+  }
+
+  private void init(int cpuProcessorNum, int cpuSocketNum,
+      int physicalCoreNum, CGroupsHandler cGroupsHandler, Context context) {
+    this.cpuSocketNum = cpuSocketNum;
+    this.physicalCoreNum = physicalCoreNum;
+    this.cpuHTNum = cpuProcessorNum / physicalCoreNum;
+    cpus = new int[cpuProcessorNum];
+    for (int i = 0; i < cpus.length; i++) {
+      cpus[i] = 1;
+    }
+    physicalCorePerSocket = physicalCoreNum / cpuSocketNum;
+    this.cGroupsHandler = cGroupsHandler;
+    this.context = context;
+    this.assignedContainers = new HashMap<>();
+  }
+
+  public synchronized void allocateContainerCpuset(Container container)
+      throws ResourceHandlerException {
+    CpuShareMode cpuShareMode = getCpuShareMode(container);
+    if (cpuShareMode == CpuShareMode.SHARE
+        || cpuShareMode == CpuShareMode.NONE) {
+      // no dedicated processors are needed for SHARE/NONE
+      updateContainerCgroups(container);
+      return;
+    }
+
+    Deque<Integer> allocatedProcessorIds = new LinkedList<>();
+    int allocateProcessorNum = container.getResource().getVirtualCores()
+        / VCORE_RATIO;
+
+    if (allocateProcessor(allocateProcessorNum, allocatedProcessorIds)) {
+      String cpusetStr = getCpusetStr(allocatedProcessorIds);
+      assignedContainers.put(container.getContainerId(),
+          allocatedProcessorIds);
+      //store allocation result
+      try {
+        context.getNMStateStore().storeAssignedResources(container,
+            CPUSET_RESOURCE_TYPE, Arrays.<Serializable>asList(cpusetStr));
+      } catch (IOException ioe) {
+        LOG.warn("Failed to store cpuset allocation result", ioe);
+        throw new ResourceHandlerException(ioe);
+      }
+      updateContainerCgroups(container);
+    } else {
+      String failErrorMsg = String.format(
+          "Failed to allocate cpuset for containerId=%s, required processorNum=%d",
+          container.getContainerId().toString(), allocateProcessorNum);
+      LOG.info(failErrorMsg);
+      throw new ResourceHandlerException(failErrorMsg);
+    }
+
+    // update all SHARE/NONE containers
+    updateShareOrNoneModeContainers();
+    LOG.info("Succeeded to allocate cpuset containerId={}, processorIds={}, "
+        + "processorNum={}", container.getContainerId(),
+        allocatedProcessorIds, allocatedProcessorIds.size());
+  }
+
+  public synchronized void recoverContainerCpuset(ContainerId containerId) {
+    Container container = context.getContainers().get(containerId);
+    ResourceMappings resourceMappings = container.getResourceMappings();
+    List<Serializable> assignedResource = resourceMappings
+        .getAssignedResources(CPUSET_RESOURCE_TYPE);
+    if (assignedResource.size() == 1) {
+      String cpusetStr = (String) assignedResource.get(0);
+      String[] processorIdsStr = cpusetStr.split(PROCESSOR_ID_SEPARATOR);
+      Deque<Integer> processorIds = new LinkedList<>();
+      for (String processorIdStr : processorIdsStr) {
+        int processorId = Integer.parseInt(processorIdStr);
+        processorIds.add(processorId);
+        // mark the recovered processor as in use again
+        cpus[processorId]--;
+      }
+      assignedContainers.put(container.getContainerId(), processorIds);
+    }
+  }
+
+  public synchronized void releaseContainerCpuset(ContainerId containerId) {
+    Deque<Integer> processorIds = assignedContainers.get(containerId);
+    if (processorIds == null) {
+      return;
+    }
+
+    for (Integer processorId : processorIds) {
+      cpus[processorId]++;
+    }
+    LOG.info("Released cpuset, containerId={}, processorIds={}, "
+        + "processorNum={}", containerId, processorIds, processorIds.size());
+    assignedContainers.remove(containerId);
+    updateShareOrNoneModeContainers();
+  }
+
+  public synchronized void updateContainer(Container container)
+      throws ResourceHandlerException {
+    //TODO support updating CpuShareMode
+    CpuShareMode cpuShareMode = getCpuShareMode(container);
+    ContainerId containerId = container.getContainerId();
+    if (cpuShareMode != CpuShareMode.RESERVED
+        && cpuShareMode != CpuShareMode.EXCLUSIVE) {
+      return;
+    }
+    if (assignedContainers.containsKey(containerId)) {
+      int assignedProcessorNum = assignedContainers.get(containerId).size();
+      int targetProcessorNum = container.getResource().getVirtualCores()
+          / VCORE_RATIO;
+      if (targetProcessorNum > assignedProcessorNum) {
+        extendContainerCpuset(container);
+      } else if (targetProcessorNum < assignedProcessorNum) {
+        shrinkContainerCpuset(container);
+      }
+      //ignore error when updating cgroups failed
+      updateContainerCgroups(container);
+      updateShareOrNoneModeContainers();
+    }
+  }
+
+  private void shrinkContainerCpuset(Container container)
+      throws ResourceHandlerException {
+    CpuShareMode cpuShareMode = getCpuShareMode(container);
+    if (CpuShareMode.NONE == cpuShareMode
+        || CpuShareMode.SHARE == cpuShareMode) {
+      return;
+    }
+
+    ContainerId containerId = container.getContainerId();
+    int assignedProcessorNum = assignedContainers.get(containerId).size();
+    int targetProcessorNum = container.getResource().getVirtualCores()
+        / VCORE_RATIO;
+    if (targetProcessorNum >= assignedProcessorNum) {
+      return;
+    }
+
+    Deque<Integer> originalProcessor =
+        new LinkedList<>(assignedContainers.get(containerId));
+    int[] originalCpus = backupCpus();
+    for (int i = 0; i < assignedProcessorNum - targetProcessorNum; i++) {
+      Deque<Integer> cpuList = assignedContainers.get(containerId);
+      cpus[cpuList.removeLast()]++;
+    }
+
+    try {
+      context.getNMStateStore().storeAssignedResources(container,
+          CPUSET_RESOURCE_TYPE, Arrays.<Serializable>asList(
+              getCpusetStr(assignedContainers.get(containerId))));
+    } catch (IOException ioe) {
+      cpus = originalCpus;
+      assignedContainers.put(containerId, originalProcessor);
+      LOG.warn("Failed to store cpuset result", ioe);
+      throw new ResourceHandlerException(ioe);
+    }
+    LOG.info("Shrank cpuset, containerId={}, processorIds={}, processorNum={}",
+        containerId, assignedContainers.get(containerId),
+        assignedContainers.get(containerId).size());
+  }
+
+  public synchronized void extendContainerCpuset(Container container)
+      throws ResourceHandlerException {
+    ContainerId containerId = container.getContainerId();
+    CpuShareMode cpuShareMode = getCpuShareMode(container);
+    if (CpuShareMode.NONE == cpuShareMode
+        || CpuShareMode.SHARE == cpuShareMode) {
+      return;
+    }
+
+    int assignedProcessorNum = assignedContainers.get(containerId).size();
+    int targetProcessorNum = container.getResource().getVirtualCores()
+        / VCORE_RATIO;
+    if (targetProcessorNum <= assignedProcessorNum) {
+      return;
+    }
+
+    Deque<Integer> allocatedProcessorIds = assignedContainers.get(containerId);
+    Deque<Integer> originalProcessorIds =
+        new LinkedList<>(allocatedProcessorIds);
+    int[] originalCpus = backupCpus();
+    if (allocateProcessor(targetProcessorNum - assignedProcessorNum,
+        allocatedProcessorIds)) {
+      assignedContainers.put(containerId, allocatedProcessorIds);
+      try {
+        context.getNMStateStore().storeAssignedResources(container,
+            CPUSET_RESOURCE_TYPE,
+            Arrays.<Serializable>asList(getCpusetStr(allocatedProcessorIds)));
+      } catch (IOException ioe) {
+        LOG.warn("Failed to store cpuset result", ioe);
+        assignedContainers.put(containerId, originalProcessorIds);
+        cpus = originalCpus;
+        throw new ResourceHandlerException(ioe);
+      }
+      LOG.info("Succeeded to extend cpuset containerId={}, processorIds={}, "
+          + "processorNum={}", containerId, allocatedProcessorIds,
+          allocatedProcessorIds.size());
+    } else {
+      LOG.info("Failed to extend cpuset containerId={}, from {} to {}",
+          containerId, assignedProcessorNum, targetProcessorNum);
+    }
+  }
+
+  private int[] backupCpus() {
+    int[] backupCpus = new int[cpus.length];
+    System.arraycopy(cpus, 0, backupCpus, 0, cpus.length);
+    return backupCpus;
+  }
+
+  private void updateContainerCgroups(Container container)
+      throws ResourceHandlerException {
+    String cpusetParamValue;
+    CpuShareMode cpuShareMode = getCpuShareMode(container);
+    if (CpuShareMode.RESERVED == cpuShareMode
+        || CpuShareMode.EXCLUSIVE == cpuShareMode) {
+      cpusetParamValue = getCpusetStr(
+          assignedContainers.get(container.getContainerId()));
+    } else if (CpuShareMode.SHARE == cpuShareMode) {
+      cpusetParamValue = getShareModeProcessor();
+    } else {
+      cpusetParamValue = getNoneModeProcessor();
+    }
+    cGroupsHandler.updateCGroupParam(CGroupController.CPUSET,
+        container.getContainerId().toString(),
+        CGroupsHandler.CGROUP_PARAM_CPUSET_CPUS, cpusetParamValue);
+  }
+
+  private void updateShareOrNoneModeContainers() {
+    Map<ContainerId, Container> containers = new HashMap<>();
+    containers.putAll(context.getContainers());
+
+    //set cgroups for SHARE/NONE mode containers
+    String noneCpusetResult = getNoneModeProcessor();
+    String shareCpusetResult = getShareModeProcessor();
+    for (Container container : containers.values()) {
+      CpuShareMode cpuShareMode = getCpuShareMode(container);
+      if (cpuShareMode == CpuShareMode.RESERVED
+          || cpuShareMode == CpuShareMode.EXCLUSIVE) {
+        continue;
+      }
+
+      String cpusetParamValue;
+      if (CpuShareMode.NONE == cpuShareMode) {
+        cpusetParamValue = noneCpusetResult;
+      } else {
+        cpusetParamValue = shareCpusetResult;
+      }
+
+      try {
+        cGroupsHandler.updateCGroupParam(CGroupController.CPUSET,
+            container.getContainerId().toString(),
+            CGroupsHandler.CGROUP_PARAM_CPUSET_CPUS, cpusetParamValue);
+      } catch (ResourceHandlerException e) {
+        LOG.info(
+            "Failed to update cgroup params, containerId={}, cpusetParamValue={}",
+            container.getContainerId(), cpusetParamValue, e);
+      }
+    }
+  }
+
+  private CpuShareMode getCpuShareMode(Container container) {
+    CpuShareMode cpusetMode = CpuShareMode.NONE;
+    if (container.getContainerTokenIdentifier().getExecutionType()
+        == ExecutionType.GUARANTEED) {
+      if (container.getResource().getVirtualCores() % VCORE_RATIO == 0) {
+        cpusetMode = CpuShareMode.RESERVED;
+      } else {
+        cpusetMode = CpuShareMode.SHARE;
+      }
+    }
+
+    String cpusetModeStr = container.getLaunchContext().getEnvironment().get(
+        CPUSET_MODE_ENV);
+    if (!StringUtils.isEmpty(cpusetModeStr)) {
+      cpusetMode = CpuShareMode.valueOf(cpusetModeStr);
+    }
+    return cpusetMode;
+  }
+
+  private String getCpusetStr(Iterable<Integer> allocatedProcessorIds) {
+    return Joiner.on(PROCESSOR_ID_SEPARATOR).join(allocatedProcessorIds);
+  }
+
+  private String getNoneModeProcessor() {
+    Set<Integer> noneProcessorIds = Sets.newTreeSet();
+    Map<ContainerId, Container> containers = new HashMap<>();
+    containers.putAll(context.getContainers());
+
+    //NONE mode may use free processors plus RESERVED (not EXCLUSIVE) ones
+    for (Map.Entry<ContainerId, Deque<Integer>> cpusEntry
+        : assignedContainers.entrySet()) {
+      ContainerId containerId = cpusEntry.getKey();
+      if (!containers.containsKey(containerId)) {
+        // skip newly released containers
+        continue;
+      }
+      CpuShareMode cpusetMode = getCpuShareMode(containers.get(containerId));
+      if (cpusetMode == CpuShareMode.RESERVED) {
+        noneProcessorIds.addAll(cpusEntry.getValue());
+      }
+    }
+    for (int i = 0; i < cpus.length; i++) {
+      if (cpus[i] > 0) {
+        noneProcessorIds.add(i);
+      }
+    }
+    return getCpusetStr(noneProcessorIds);
+  }
+
+  private String getShareModeProcessor() {
+    Set<Integer> shareProcessorIds = Sets.newTreeSet();
+    for (int i = 0; i < cpus.length; i++) {
+      if (cpus[i] > 0) {
+        shareProcessorIds.add(i);
+      }
+    }
+    return getCpusetStr(shareProcessorIds);
+  }
+
+  private boolean allocateProcessor(int needProcessorNum,
+      Deque<Integer> allocatedProcessorIds) {
+    if (needProcessorNum <= 0) {
+      return true;
+    }
+    int bestSocketId = cpuSocketNum - 1;
+    if (allocatedProcessorIds.size() > 0) {
+      // prefer the socket that already holds this container's processors
+      bestSocketId = (allocatedProcessorIds.getFirst() % physicalCoreNum)
+          / physicalCorePerSocket;
+      int needMore = tryAllocateOneSocketWithFreePhysicalCore(bestSocketId,
+          needProcessorNum, allocatedProcessorIds);
+      if (needMore == 0) {
+        return true;
+      }
+    } else {
+      int minNeedMore = Integer.MAX_VALUE;
+      //try to allocate whole physical cores within a single socket
+      for (int cpuSocketId = cpuSocketNum - 1; cpuSocketId >= 0;
+          cpuSocketId--) {
+        int needMore = tryAllocateOneSocketWithFreePhysicalCore(cpuSocketId,
+            needProcessorNum, allocatedProcessorIds);
+        if (needMore == 0) {
+          return true;
+        }
+        // remember the socket with the most free whole cores
+        if (needMore < minNeedMore) {
+          minNeedMore = needMore;
+          bestSocketId = cpuSocketId;
+        }
+      }
+    }
+
+    //fall back to individual logical processors in the preferred socket
+    int needMore = allocateOneSocketWithFreeLogicalCore(bestSocketId,
+        needProcessorNum, allocatedProcessorIds);
+    if (needMore == 0) {
+      return true;
+    }
+
+    //allocate the remainder across the other sockets
+    for (int cpuSocketId = cpuSocketNum - 1; cpuSocketId >= 0; cpuSocketId--) {
+      if (cpuSocketId == bestSocketId) {
+        continue;
+      }
+      needMore = allocateOneSocketWithFreeLogicalCore(cpuSocketId, needMore,
+          allocatedProcessorIds);
+      if (needMore == 0) {
+        return true;
+      }
+    }
+    return false;
+  }
+
+  private int allocateOneSocketWithFreeLogicalCore(int cpuSocketId,
+      int processorNum, Deque<Integer> allocatedProcessorIds) {
+    int needMore = processorNum;
+    int maxPhysicalCoreId = (cpuSocketId + 1) * physicalCorePerSocket - 1;
+    int minPhysicalCoreId = maxPhysicalCoreId - physicalCorePerSocket + 1;
+    for (int physicalCoreId = maxPhysicalCoreId;
+        physicalCoreId >= minPhysicalCoreId && needMore > 0;
+        physicalCoreId--) {
+      for (int cpuHTId = cpuHTNum - 1; cpuHTId >= 0; cpuHTId--) {
+        int processorId = physicalCoreId + physicalCoreNum * cpuHTId;
+        if (cpus[processorId] > 0) {
+          allocatedProcessorIds.add(processorId);
+          cpus[processorId]--;
+          if (--needMore <= 0) {
+            break;
+          }
+        }
+      }
+    }
+    return needMore;
+  }
+
+  private int tryAllocateOneSocketWithFreePhysicalCore(int cpuSocketId,
+      int processorNum, Deque<Integer> allocatedProcessIds) {
+    Deque<Integer> tmpProcessorIds = new LinkedList<>();
+    int needMore = processorNum;
+    int maxPhysicalCoreId = (cpuSocketId + 1) * physicalCorePerSocket - 1;
+    int minPhysicalCoreId = maxPhysicalCoreId - physicalCorePerSocket + 1;
+    for (int physicalCoreId = maxPhysicalCoreId;
+        physicalCoreId >= minPhysicalCoreId && needMore > 0;
+        physicalCoreId--) {
+      //check whether all logical processors of this physical core are free
+      boolean logicalProcessorAllFree = true;
+      for (int cpuHTId = cpuHTNum - 1; cpuHTId >= 0; cpuHTId--) {
+        int processorId = physicalCoreId + physicalCoreNum * cpuHTId;
+        if (cpus[processorId] <= 0) {
+          logicalProcessorAllFree = false;
+          break;
+        }
+      }
+
+      if (!logicalProcessorAllFree) {
+        continue;
+      }
+
+      for (int cpuHTId = cpuHTNum - 1; cpuHTId >= 0; cpuHTId--) {
+        int processorId = physicalCoreId + physicalCoreNum * cpuHTId;
+        tmpProcessorIds.add(processorId);
+        if (--needMore <= 0) {
+          break;
+        }
+      }
+    }
+
+    //this socket has enough free physical cores; commit the allocation
+    if (needMore <= 0) {
+      for (Integer processorId : tmpProcessorIds) {
+        allocatedProcessIds.add(processorId);
+        cpus[processorId]--;
+      }
+    }
+    return needMore;
+  }
+}
\ No newline at end of file
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/linux/resources/cpuset/CpusetResourceHandler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/linux/resources/cpuset/CpusetResourceHandler.java
new file mode 100644
index 0000000..d3907f7
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/linux/resources/cpuset/CpusetResourceHandler.java
@@ -0,0 +1,26 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.resources.cpuset;
+
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.resources.ResourceHandler;
+
+public interface CpusetResourceHandler extends ResourceHandler {
+}
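Reviewer note: CpusetAllocator assumes the common Linux enumeration in which logical processor id = physicalCoreId + physicalCoreNum * hyperThreadIndex, and a container requesting a multiple of VCORE_RATIO (100) vcores is pinned to that many whole logical processors. A self-contained sketch of the resulting layout; the figures (2 sockets, 8 physical cores, 16 logical processors) are assumptions for illustration only:

  // Illustrative only: prints the socket/core -> logical-processor mapping
  // implied by CpusetAllocator's index arithmetic.
  public class CpusetLayoutDemo {
    public static void main(String[] args) {
      int cpuProcessorNum = 16;      // logical processors
      int cpuSocketNum = 2;          // sockets
      int physicalCoreNum = 8;       // physical cores
      int cpuHTNum = cpuProcessorNum / physicalCoreNum;           // 2 HT siblings per core
      int physicalCorePerSocket = physicalCoreNum / cpuSocketNum; // 4 cores per socket
      for (int socket = 0; socket < cpuSocketNum; socket++) {
        for (int core = socket * physicalCorePerSocket;
             core < (socket + 1) * physicalCorePerSocket; core++) {
          StringBuilder ids = new StringBuilder();
          for (int ht = 0; ht < cpuHTNum; ht++) {
            ids.append(core + physicalCoreNum * ht).append(' ');
          }
          System.out.println("socket " + socket + ", core " + core
              + " -> logical processors " + ids.toString().trim());
        }
      }
    }
  }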