diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java index 71a71346321..e1af74cacfc 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java @@ -1276,6 +1276,31 @@ public static boolean isAclEnabled(Configuration conf) { public static final String NM_NETWORK_RESOURCE_OUTBOUND_BANDWIDTH_YARN_MBIT = NM_NETWORK_RESOURCE_PREFIX + "outbound-bandwidth-yarn-mbit"; + /** + * Prefix for disk configurations. Work in progress: This configuration + * parameter may be changed/removed in the future. + */ + @Private + public static final String NM_GPU_RESOURCE_PREFIX = NM_PREFIX + + "resource.gpu."; + /** + * This setting controls if resource handling for GPU operations is enabled. + */ + @Private + public static final String NM_GPU_RESOURCE_ENABLED = + NM_GPU_RESOURCE_PREFIX + "enabled"; + + @Private + public static final String NM_GPU_ALLOWED_DEVICES = + NM_GPU_RESOURCE_PREFIX + "allowed-gpu-devices"; + + /** + * Disk as a resource is disabled by default. + **/ + @Private + public static final boolean DEFAULT_NM_GPU_RESOURCE_ENABLED = false; + + /** NM Webapp address.**/ public static final String NM_WEBAPP_ADDRESS = NM_PREFIX + "webapp.address"; public static final int DEFAULT_NM_WEBAPP_PORT = 8042; diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml index 000e8929db6..90b2a7d9a47 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml @@ -3271,4 +3271,20 @@ false + + + Enable GPU on this node manager or not + + yarn.nodemanager.resource.gpu.enabled + false + + + + + Specify GPU devices which can be managed by YARN + + yarn.nodemanager.resource.gpu.allowed-gpu-devices + + + diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/pom.xml b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/pom.xml index a50a769b0eb..8cf607210c1 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/pom.xml +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/pom.xml @@ -29,7 +29,7 @@ ${project.parent.parent.basedir} - ../etc/hadoop + /etc/hadoop/conf diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeManager.java index 3c0e4984efa..a653cad0e6f 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeManager.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeManager.java @@ -63,6 +63,7 @@ import org.apache.hadoop.yarn.server.nodemanager.containermanager.ContainerManagerImpl; import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.Application; import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container; +import org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.resources.GpuResourceAllocator; import org.apache.hadoop.yarn.server.nodemanager.metrics.NodeManagerMetrics; import org.apache.hadoop.yarn.server.nodemanager.nodelabels.ConfigurationNodeLabelsProvider; import org.apache.hadoop.yarn.server.nodemanager.nodelabels.NodeLabelsProvider; @@ -70,6 +71,7 @@ import org.apache.hadoop.yarn.server.nodemanager.recovery.NMLeveldbStateStoreService; import org.apache.hadoop.yarn.server.nodemanager.recovery.NMNullStateStoreService; import org.apache.hadoop.yarn.server.nodemanager.recovery.NMStateStoreService; +import org.apache.hadoop.yarn.server.nodemanager.scheduler.allocators.LocalResourceAllocators; import org.apache.hadoop.yarn.server.scheduler.OpportunisticContainerAllocator; import org.apache.hadoop.yarn.server.nodemanager.security.NMContainerTokenSecretManager; import org.apache.hadoop.yarn.server.nodemanager.security.NMTokenSecretManagerInNM; @@ -403,6 +405,13 @@ protected void serviceInit(Configuration conf) throws Exception { addService(nodeStatusUpdater); ((NMContext) context).setNodeStatusUpdater(nodeStatusUpdater); + // Initialize local allocators + if (conf.getBoolean(YarnConfiguration.NM_GPU_RESOURCE_ENABLED, + YarnConfiguration.DEFAULT_NM_GPU_RESOURCE_ENABLED)) { + LocalResourceAllocators.setGpuResourceAllocator( + new GpuResourceAllocator(context)); + } + super.serviceInit(conf); // TODO add local dirs to del } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/Container.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/Container.java index bd3f06d1fcb..1370a02abcd 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/Container.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/Container.java @@ -94,4 +94,6 @@ void sendKillEvent(int exitStatus, String description); boolean isRecovering(); + + ResourceMappings getResourceMappings(); } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/ContainerImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/ContainerImpl.java index c0aa6b0d63f..7dd66d91668 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/ContainerImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/ContainerImpl.java @@ -185,6 +185,7 @@ private ReInitializationContext createContextForRollback() { private boolean recoveredAsKilled = false; private Context context; private ResourceSet resourceSet; + private ResourceMappings resourceMappings; public ContainerImpl(Configuration conf, Dispatcher dispatcher, ContainerLaunchContext launchContext, Credentials creds, @@ -233,6 +234,7 @@ public ContainerImpl(Configuration conf, Dispatcher dispatcher, stateMachine = stateMachineFactory.make(this); this.context = context; this.resourceSet = new ResourceSet(); + this.resourceMappings = new ResourceMappings(); } private static ContainerRetryContext configureRetryContext( @@ -280,6 +282,7 @@ public ContainerImpl(Configuration conf, Dispatcher dispatcher, this.remainingRetryAttempts = rcs.getRemainingRetryAttempts(); this.workDir = rcs.getWorkDir(); this.logDir = rcs.getLogDir(); + this.resourceMappings = rcs.getResourceMappings(); } private static final ContainerDiagnosticsUpdateTransition UPDATE_DIAGNOSTICS_TRANSITION = @@ -1775,4 +1778,9 @@ public boolean isRecovering() { getContainerState() == ContainerState.NEW); return isRecovering; } + + @Override + public ResourceMappings getResourceMappings() { + return resourceMappings; + } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/ResourceMappings.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/ResourceMappings.java new file mode 100644 index 00000000000..14ec7302cee --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/ResourceMappings.java @@ -0,0 +1,105 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.server.nodemanager.containermanager.container; + +import java.io.ByteArrayInputStream; +import java.io.ByteArrayOutputStream; +import java.io.IOException; +import java.io.ObjectInputStream; +import java.io.ObjectOutputStream; +import java.io.Serializable; +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +/** + * This class is added to store assigned resource to a single container by resource + * types. + * + * Assigned resource could be list of String + * + * For example, we can assign container to: + * "numa": ["numa0"] + * "gpu": ["0", "1", "2", "3"] + * "fpga": ["1", "3"] + * + * This will be used for NM restart container recovery + */ +public class ResourceMappings { + /** + * Record resources assigned to a container for a given resource type + */ + public static class AssignedResources implements Serializable { + private List list = Collections.emptyList(); + + public List getAssignedResources() { + return Collections.unmodifiableList(list); + } + + public void updateAssignedResources(List list) { + this.list = new ArrayList<>(list); + } + + @SuppressWarnings("unchecked") + public static AssignedResources fromBytes(byte[] bytes) + throws IOException { + ByteArrayInputStream bis = new ByteArrayInputStream(bytes); + ObjectInputStream ois = new ObjectInputStream(bis); + List stringArray; + try { + stringArray = (List) ois.readObject(); + } catch (ClassNotFoundException e) { + throw new IOException(e); + } + AssignedResources ar = new AssignedResources(); + ar.updateAssignedResources(stringArray); + return ar; + } + + public byte[] toBytes() throws IOException { + ByteArrayOutputStream bos = new ByteArrayOutputStream(); + ObjectOutputStream oos = new ObjectOutputStream(bos); + oos.writeObject(list); + byte[] bytes = bos.toByteArray(); + return bytes; + } + } + + private Map assignedResourcesMap = new HashMap<>(); + + /** + * Get all resource mappings. + * @param resourceType resourceType + * @return map of resource mapping + */ + public List getAssignedResources(String resourceType) { + AssignedResources ar = assignedResourcesMap.get(resourceType); + if (null == ar) { + return Collections.emptyList(); + } + return ar.getAssignedResources(); + } + + public void addAssignedResources(String resourceType, + AssignedResources assigned) { + assignedResourcesMap.put(resourceType, assigned); + } +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/linux/privileged/PrivilegedOperation.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/linux/privileged/PrivilegedOperation.java index 8402a16339d..7e3a53a778b 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/linux/privileged/PrivilegedOperation.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/linux/privileged/PrivilegedOperation.java @@ -51,6 +51,7 @@ TC_READ_STATS("--tc-read-stats"), ADD_PID_TO_CGROUP(""), //no CLI switch supported yet. RUN_DOCKER_CMD("--run-docker"), + GPU("gpu"), LIST_AS_USER(""); //no CLI switch supported yet. private final String option; diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/linux/resources/GpuResourceAllocator.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/linux/resources/GpuResourceAllocator.java new file mode 100644 index 00000000000..6b41be8a554 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/linux/resources/GpuResourceAllocator.java @@ -0,0 +1,202 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.resources; + +import com.google.common.annotations.VisibleForTesting; +import com.google.common.collect.ImmutableSet; +import com.google.common.collect.Sets; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.util.StringUtils; +import org.apache.hadoop.yarn.api.records.ContainerId; +import org.apache.hadoop.yarn.server.nodemanager.Context; +import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.TreeMap; +import java.util.TreeSet; + +/** + * Allocate GPU resources according to requirements + */ +public class GpuResourceAllocator { + final static Log LOG = LogFactory.getLog(GpuResourceAllocator.class); + private final static String RESOURCE_TYPE = "gpu"; + + private Set allowedGpuDevices = new TreeSet<>(); + private Map usedDevices = new TreeMap<>(); + private Context nmContext; + + public GpuResourceAllocator(Context ctx) { + this.nmContext = ctx; + } + + /** + * Contains allowed and denied devices with minor number. + * Denied devices will be useful for cgroups devices module to do blacklisting + */ + static class GpuAllocation { + private Set allowed = Collections.emptySet(); + private Set denied = Collections.emptySet(); + + GpuAllocation(Set allowed, Set denied) { + if (allowed != null) { + this.allowed = ImmutableSet.copyOf(allowed); + } + if (denied != null) { + this.denied = ImmutableSet.copyOf(denied); + } + } + + public Set getAllowedGPUs() { + return allowed; + } + + public Set getDeniedGPUs() { + return denied; + } + } + + /** + * Add GPU to allowed list + * @param minorNumber minor number of the GPU device. + */ + public synchronized void addGpu(int minorNumber) { + allowedGpuDevices.add(minorNumber); + } + + private String getResourceHandlerExceptionMessage(int numRequestedGpuDevices, + ContainerId containerId) { + return "Failed to find enough GPUs, requestor=" + containerId + + ", #RequestedGPUs=" + numRequestedGpuDevices + ", #availableGpus=" + + getAvailableGpus(); + } + + @VisibleForTesting + public synchronized int getAvailableGpus() { + return allowedGpuDevices.size() - usedDevices.size(); + } + + public synchronized void recoverAssignedGpus(ContainerId containerId) + throws ResourceHandlerException { + Container c = nmContext.getContainers().get(containerId); + if (null == c) { + throw new ResourceHandlerException( + "This shouldn't happen, cannot find container with id=" + + containerId); + } + + for (String deviceId : c.getResourceMappings().getAssignedResources( + RESOURCE_TYPE)){ + int devId = Integer.parseInt(deviceId); + + // Make sure it is in allowed GPU device. + if (!allowedGpuDevices.contains(devId)) { + throw new ResourceHandlerException("Try to recover device id = " + devId + + " however it is not in allowed device list:" + StringUtils + .join(",", allowedGpuDevices)); + } + + // Make sure it is not occupied by anybody else + if (usedDevices.containsKey(devId)) { + throw new ResourceHandlerException("Try to recover device id = " + devId + + " however it is already assigned to container=" + usedDevices + .get(devId) + ", please double check what happened."); + } + + usedDevices.put(devId, containerId); + } + } + + /** + * Assign GPU to requestor + * @param numRequestedGpuDevices How many GPU to request + * @param containerId who is requesting the resources + * @return List of denied Gpus with minor numbers + * @throws ResourceHandlerException When failed to + */ + public synchronized GpuAllocation assignGpus(int numRequestedGpuDevices, + ContainerId containerId) throws ResourceHandlerException { + // Assign Gpus to container if requested some. + if (numRequestedGpuDevices > 0) { + if (numRequestedGpuDevices > getAvailableGpus()) { + throw new ResourceHandlerException( + getResourceHandlerExceptionMessage(numRequestedGpuDevices, + containerId)); + } + + Set assignedGpus = new HashSet<>(); + + for (int deviceNum : allowedGpuDevices) { + if (!usedDevices.containsKey(deviceNum)) { + usedDevices.put(deviceNum, containerId); + assignedGpus.add(deviceNum); + if (assignedGpus.size() == numRequestedGpuDevices) { + break; + } + } + } + + // Record in state store if we allocated anything + if (!assignedGpus.isEmpty()) { + List allocatedDevices = new ArrayList<>(); + for (int gpu : assignedGpus) { + allocatedDevices.add(String.valueOf(gpu)); + } + try { + nmContext.getNMStateStore().storeAssignedResources(containerId, + RESOURCE_TYPE, allocatedDevices); + } catch (IOException e) { + throw new ResourceHandlerException(e); + } + } + + return new GpuAllocation(assignedGpus, + Sets.difference(allowedGpuDevices, assignedGpus)); + } + return new GpuAllocation(null, allowedGpuDevices); + } + + /** + * Clean up all Gpus assigned to containerId + * @param containerId containerId + */ + public synchronized void cleanupAssignGpus(ContainerId containerId) { + Iterator> iter = + usedDevices.entrySet().iterator(); + while (iter.hasNext()) { + if (iter.next().getValue().equals(containerId)) { + iter.remove(); + } + } + } + + @VisibleForTesting + public synchronized Map getDeviceAllocationMapping() { + return new HashMap<>(usedDevices); + } +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/linux/resources/GpuResourceHandlerImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/linux/resources/GpuResourceHandlerImpl.java new file mode 100644 index 00000000000..471cad825bb --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/linux/resources/GpuResourceHandlerImpl.java @@ -0,0 +1,163 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.resources; + +import com.google.common.annotations.VisibleForTesting; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.util.StringUtils; +import org.apache.hadoop.yarn.api.records.ContainerId; +import org.apache.hadoop.yarn.api.records.ContainerLaunchContext; +import org.apache.hadoop.yarn.conf.YarnConfiguration; +import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container; +import org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.privileged.PrivilegedOperation; +import org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.privileged.PrivilegedOperationException; +import org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.privileged.PrivilegedOperationExecutor; +import org.apache.hadoop.yarn.server.nodemanager.scheduler.allocators.LocalResourceAllocators; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; +import java.util.Map; + +public class GpuResourceHandlerImpl implements ResourceHandler { + final static Log LOG = LogFactory + .getLog(GpuResourceHandlerImpl.class); + + private final String REQUEST_GPU_NUM_ENV_KEY = "REQUESTED_GPU_NUM"; + + // This will be used by container-executor to add necessary clis + public static final String EXCLUDED_GPUS_CLI_OPTION = "--excluded_gpus"; + public static final String CONTAINER_ID_CLI_OPTION = "--container_id"; + + private GpuResourceAllocator gpuAllocator; + private CGroupsHandler cGroupsHandler; + private PrivilegedOperationExecutor privilegedOperationExecutor; + + GpuResourceHandlerImpl(CGroupsHandler cGroupsHandler, + PrivilegedOperationExecutor privilegedOperationExecutor, + GpuResourceAllocator gpuResourceAllocator) { + this.cGroupsHandler = cGroupsHandler; + this.privilegedOperationExecutor = privilegedOperationExecutor; + gpuAllocator = gpuResourceAllocator; + } + + @Override + public List bootstrap(Configuration configuration) + throws ResourceHandlerException { + String allowedDevicesStr = configuration.get( + YarnConfiguration.NM_GPU_ALLOWED_DEVICES); + + if (null != allowedDevicesStr) { + for (String s : allowedDevicesStr.split(",")) { + if (s.trim().length() > 0) { + Integer minorNum = Integer.valueOf(s.trim()); + gpuAllocator.addGpu(minorNum); + } + } + } + LOG.info("Allowed GPU devices with minor numbers:" + allowedDevicesStr); + + // And initialize cgroups + this.cGroupsHandler.initializeCGroupController( + CGroupsHandler.CGroupController.DEVICES); + + return null; + } + + private int getRequestedGpu(Container container) { + // TODO, use YARN-3926 after it merged + ContainerLaunchContext clc = container.getLaunchContext(); + Map envs = clc.getEnvironment(); + if (null != envs.get(REQUEST_GPU_NUM_ENV_KEY)) { + return Integer.parseInt(envs.get(REQUEST_GPU_NUM_ENV_KEY)); + } + return 0; + } + + @Override + public synchronized List preStart(Container container) + throws ResourceHandlerException { + String containerIdStr = container.getContainerId().toString(); + + int requestedGpu = getRequestedGpu(container); + + // Assign Gpus to container if requested some. + GpuResourceAllocator.GpuAllocation allocation = gpuAllocator.assignGpus( + requestedGpu, container.getContainerId()); + + // Create device cgroups for the container + cGroupsHandler.createCGroup(CGroupsHandler.CGroupController.DEVICES, + containerIdStr); + try { + // Execute c-e to setup GPU isolation before launch the container + PrivilegedOperation privilegedOperation = new PrivilegedOperation( + PrivilegedOperation.OperationType.GPU, Arrays + .asList(CONTAINER_ID_CLI_OPTION, containerIdStr)); + if (!allocation.getDeniedGPUs().isEmpty()) { + privilegedOperation.appendArgs(Arrays.asList(EXCLUDED_GPUS_CLI_OPTION, + StringUtils.join(",", allocation.getDeniedGPUs()))); + } + + privilegedOperationExecutor.executePrivilegedOperation( + privilegedOperation, true); + } catch (PrivilegedOperationException e) { + cGroupsHandler.deleteCGroup(CGroupsHandler.CGroupController.DEVICES, + containerIdStr); + LOG.warn("Could not update cgroup for container", e); + throw new ResourceHandlerException(e); + } + + List ret = new ArrayList<>(); + ret.add(new PrivilegedOperation( + PrivilegedOperation.OperationType.ADD_PID_TO_CGROUP, + PrivilegedOperation.CGROUP_ARG_PREFIX + + cGroupsHandler.getPathForCGroupTasks( + CGroupsHandler.CGroupController.DEVICES, containerIdStr))); + + return ret; + } + + @VisibleForTesting + public GpuResourceAllocator getGpuAllocator() { + return gpuAllocator; + } + + @Override + public List reacquireContainer(ContainerId containerId) + throws ResourceHandlerException { + gpuAllocator.recoverAssignedGpus(containerId); + return null; + } + + @Override + public synchronized List postComplete( + ContainerId containerId) throws ResourceHandlerException { + gpuAllocator.cleanupAssignGpus(containerId); + cGroupsHandler.deleteCGroup(CGroupsHandler.CGroupController.DEVICES, + containerId.toString()); + return null; + } + + @Override + public List teardown() throws ResourceHandlerException { + return null; + } +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/linux/resources/ResourceHandlerModule.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/linux/resources/ResourceHandlerModule.java index 4d137f0e1d6..d4cab2eb4f1 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/linux/resources/ResourceHandlerModule.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/linux/resources/ResourceHandlerModule.java @@ -28,6 +28,7 @@ import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.privileged.PrivilegedOperationExecutor; +import org.apache.hadoop.yarn.server.nodemanager.scheduler.allocators.LocalResourceAllocators; import org.apache.hadoop.yarn.server.nodemanager.util.CgroupsLCEResourcesHandler; import org.apache.hadoop.yarn.server.nodemanager.util.DefaultLCEResourcesHandler; @@ -94,6 +95,19 @@ public static CGroupsHandler getCGroupsHandler() { return cGroupsHandler; } + private static GpuResourceHandlerImpl getGpuResourceHandler( + Configuration conf) throws ResourceHandlerException { + boolean cgroupsGpuEnabled = conf.getBoolean( + YarnConfiguration.NM_GPU_RESOURCE_ENABLED, + YarnConfiguration.DEFAULT_NM_GPU_RESOURCE_ENABLED); + if (cgroupsGpuEnabled) { + return new GpuResourceHandlerImpl(getInitializedCGroupsHandler(conf), + PrivilegedOperationExecutor.getInstance(conf), + LocalResourceAllocators.getGpuResourceAllocator()); + } + return null; + } + private static CGroupsCpuResourceHandlerImpl getCGroupsCpuResourceHandler( Configuration conf) throws ResourceHandlerException { boolean cgroupsCpuEnabled = @@ -212,6 +226,7 @@ private static void initializeConfiguredResourceHandlerChain( addHandlerIfNotNull(handlerList, getDiskResourceHandler(conf)); addHandlerIfNotNull(handlerList, getMemoryResourceHandler(conf)); addHandlerIfNotNull(handlerList, getCGroupsCpuResourceHandler(conf)); + addHandlerIfNotNull(handlerList, getGpuResourceHandler(conf)); resourceHandlerChain = new ResourceHandlerChain(handlerList); } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/linux/runtime/DockerLinuxContainerRuntime.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/linux/runtime/DockerLinuxContainerRuntime.java index e058d6ef14d..3d8acfd83e3 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/linux/runtime/DockerLinuxContainerRuntime.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/linux/runtime/DockerLinuxContainerRuntime.java @@ -39,6 +39,7 @@ import org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.privileged.PrivilegedOperationException; import org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.privileged.PrivilegedOperationExecutor; import org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.resources.CGroupsHandler; +import org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.resources.GpuResourceHandlerImpl; import org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.resources.ResourceHandlerModule; import org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.runtime.docker.DockerClient; import org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.runtime.docker.DockerInspectCommand; diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMLeveldbStateStoreService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMLeveldbStateStoreService.java index c556b39d165..88d782d0862 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMLeveldbStateStoreService.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMLeveldbStateStoreService.java @@ -40,6 +40,7 @@ import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.permission.FsPermission; +import org.apache.hadoop.util.StringUtils; import org.apache.hadoop.util.Time; import org.apache.hadoop.yarn.api.protocolrecords.StartContainerRequest; import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.StartContainerRequestPBImpl; @@ -61,6 +62,7 @@ import org.apache.hadoop.yarn.proto.YarnServiceProtos.StartContainerRequestProto; import org.apache.hadoop.yarn.server.api.records.MasterKey; import org.apache.hadoop.yarn.server.api.records.impl.pb.MasterKeyPBImpl; +import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ResourceMappings; import org.apache.hadoop.yarn.server.records.Version; import org.apache.hadoop.yarn.server.records.impl.pb.VersionPBImpl; import org.apache.hadoop.yarn.server.utils.LeveldbIterator; @@ -145,6 +147,9 @@ private static final String AMRMPROXY_KEY_PREFIX = "AMRMProxy/"; + private static final String CONTAINER_ASSIGNED_RESOURCES_KEY_SUFFIX = + "/assignedResources_"; + private static final byte[] EMPTY_VALUE = new byte[0]; private DB db; @@ -285,6 +290,13 @@ private RecoveredContainerState loadContainerState(ContainerId containerId, rcs.setWorkDir(asString(entry.getValue())); } else if (suffix.equals(CONTAINER_LOG_DIR_KEY_SUFFIX)) { rcs.setLogDir(asString(entry.getValue())); + } else if (suffix.startsWith(CONTAINER_ASSIGNED_RESOURCES_KEY_SUFFIX)) { + String resourceType = suffix.substring( + CONTAINER_ASSIGNED_RESOURCES_KEY_SUFFIX.length()); + ResourceMappings.AssignedResources assignedResources = + ResourceMappings.AssignedResources.fromBytes(entry.getValue()); + rcs.getResourceMappings().addAssignedResources(resourceType, + assignedResources); } else { LOG.warn("the container " + containerId + " will be killed because of the unknown key " + key @@ -1084,6 +1096,33 @@ public void removeLogDeleter(ApplicationId appId) throws IOException { } } + @Override + public void storeAssignedResources(ContainerId containerId, + String resourceType, List assignedResources) throws IOException { + if (LOG.isDebugEnabled()) { + LOG.debug("storeAssignedResources: containerId=" + containerId + + ", assignedResources=" + StringUtils.join(",", assignedResources)); + } + + String keyResChng = CONTAINERS_KEY_PREFIX + containerId.toString() + + CONTAINER_ASSIGNED_RESOURCES_KEY_SUFFIX + resourceType; + try { + WriteBatch batch = db.createWriteBatch(); + try { + ResourceMappings.AssignedResources res = new ResourceMappings.AssignedResources(); + res.updateAssignedResources(assignedResources); + + // New value will overwrite old values for the same key + batch.put(bytes(keyResChng), res.toBytes()); + db.write(batch); + } finally { + batch.close(); + } + } catch (DBException e) { + throw new IOException(e); + } + } + @SuppressWarnings("deprecation") private void cleanupDeprecatedFinishedApps() { try { diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMNullStateStoreService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMNullStateStoreService.java index 96c3f9e5c6f..09df3e50009 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMNullStateStoreService.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMNullStateStoreService.java @@ -258,6 +258,11 @@ public void removeAMRMProxyAppContext(ApplicationAttemptId attempt) } @Override + public void storeAssignedResources(ContainerId containerId, + String resourceType, List assignedResources) throws IOException { + } + + @Override protected void initStorage(Configuration conf) throws IOException { } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMStateStoreService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMStateStoreService.java index 9f87279fb25..83c49a7bf22 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMStateStoreService.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMStateStoreService.java @@ -42,6 +42,7 @@ import org.apache.hadoop.yarn.proto.YarnServerNodemanagerRecoveryProtos.LocalizedResourceProto; import org.apache.hadoop.yarn.proto.YarnServerNodemanagerRecoveryProtos.LogDeleterProto; import org.apache.hadoop.yarn.server.api.records.MasterKey; +import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ResourceMappings; @Private @Unstable @@ -87,6 +88,7 @@ public NMStateStoreService(String name) { int version; private RecoveredContainerType recoveryType = RecoveredContainerType.RECOVER; + ResourceMappings resMappings = new ResourceMappings(); public RecoveredContainerStatus getStatus() { return status; @@ -162,6 +164,10 @@ public RecoveredContainerType getRecoveryType() { public void setRecoveryType(RecoveredContainerType recoveryType) { this.recoveryType = recoveryType; } + + public ResourceMappings getResourceMappings() { + return resMappings; + } } public static class LocalResourceTrackerState { @@ -635,7 +641,7 @@ public abstract void storeLogDeleter(ApplicationId appId, */ public abstract void removeLogDeleter(ApplicationId appId) throws IOException; - + /** * Load the state of AMRMProxy. * @return recovered state of AMRMProxy @@ -688,6 +694,9 @@ public abstract void removeAMRMProxyAppContextEntry( public abstract void removeAMRMProxyAppContext(ApplicationAttemptId attempt) throws IOException; + public abstract void storeAssignedResources(ContainerId containerId, + String resourceType, List assignedResources) throws IOException; + protected abstract void initStorage(Configuration conf) throws IOException; protected abstract void startStorage() throws IOException; diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/scheduler/allocators/LocalResourceAllocators.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/scheduler/allocators/LocalResourceAllocators.java new file mode 100644 index 00000000000..3da3515a9d5 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/scheduler/allocators/LocalResourceAllocators.java @@ -0,0 +1,36 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.server.nodemanager.scheduler.allocators; + +import org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.resources.GpuResourceAllocator; + +/** + * Manages local resource allocators such as GPU resource allocator. + */ +public class LocalResourceAllocators { + private static GpuResourceAllocator gpuResourceAllocator; + + public static GpuResourceAllocator getGpuResourceAllocator() { + return gpuResourceAllocator; + } + + public static void setGpuResourceAllocator(GpuResourceAllocator allocator) { + gpuResourceAllocator = allocator; + } +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/TestContainerManagerRecovery.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/TestContainerManagerRecovery.java index d2bd79ced22..f54a77056c7 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/TestContainerManagerRecovery.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/TestContainerManagerRecovery.java @@ -108,6 +108,7 @@ import org.apache.hadoop.yarn.server.nodemanager.timelineservice.NMTimelinePublisher; import org.apache.hadoop.yarn.server.security.ApplicationACLsManager; import org.apache.hadoop.yarn.util.timeline.TimelineUtils; +import org.junit.Assert; import org.junit.Before; import org.junit.Test; @@ -393,23 +394,8 @@ public void testNMRecoveryForAppFinishedWithLogAggregationFailure() cm.stop(); } - @Test - public void testContainerResizeRecovery() throws Exception { - conf.setBoolean(YarnConfiguration.NM_RECOVERY_ENABLED, true); - conf.setBoolean(YarnConfiguration.NM_RECOVERY_SUPERVISED, true); - NMStateStoreService stateStore = new NMMemoryStateStoreService(); - stateStore.init(conf); - stateStore.start(); - Context context = createContext(conf, stateStore); - ContainerManagerImpl cm = createContainerManager(context, delSrvc); - cm.dispatcher.disableExitOnDispatchException(); - cm.init(conf); - cm.start(); - // add an application by starting a container - ApplicationId appId = ApplicationId.newInstance(0, 1); - ApplicationAttemptId attemptId = - ApplicationAttemptId.newInstance(appId, 1); - ContainerId cid = ContainerId.newContainerId(attemptId, 1); + private void commonLaunchContainer(ApplicationId appId, ContainerId cid, + ContainerManagerImpl cm) throws Exception { Map containerEnv = new HashMap<>(); setFlowContext(containerEnv, "app_name1", appId); Map serviceData = Collections.emptyMap(); @@ -453,12 +439,35 @@ public void testContainerResizeRecovery() throws Exception { context, cm, cid, clc, null); assertTrue(startResponse.getFailedRequests().isEmpty()); assertEquals(1, context.getApplications().size()); - Application app = context.getApplications().get(appId); - assertNotNull(app); // make sure the container reaches RUNNING state waitForNMContainerState(cm, cid, org.apache.hadoop.yarn.server.nodemanager .containermanager.container.ContainerState.RUNNING); + + } + + @Test + public void testContainerResizeRecovery() throws Exception { + conf.setBoolean(YarnConfiguration.NM_RECOVERY_ENABLED, true); + conf.setBoolean(YarnConfiguration.NM_RECOVERY_SUPERVISED, true); + NMStateStoreService stateStore = new NMMemoryStateStoreService(); + stateStore.init(conf); + stateStore.start(); + context = createContext(conf, stateStore); + ContainerManagerImpl cm = createContainerManager(context, delSrvc); + cm.init(conf); + cm.start(); + // add an application by starting a container + ApplicationId appId = ApplicationId.newInstance(0, 1); + ApplicationAttemptId attemptId = + ApplicationAttemptId.newInstance(appId, 1); + ContainerId cid = ContainerId.newContainerId(attemptId, 1); + + commonLaunchContainer(appId, cid, cm); + + Application app = context.getApplications().get(appId); + assertNotNull(app); + Resource targetResource = Resource.newInstance(2048, 2); ContainerUpdateResponse updateResponse = updateContainers(context, cm, cid, targetResource); @@ -481,6 +490,52 @@ public void testContainerResizeRecovery() throws Exception { } @Test + public void testResourceMappingRecoveryForContainer() throws Exception { + conf.setBoolean(YarnConfiguration.NM_RECOVERY_ENABLED, true); + conf.setBoolean(YarnConfiguration.NM_RECOVERY_SUPERVISED, true); + NMStateStoreService stateStore = new NMMemoryStateStoreService(); + stateStore.init(conf); + stateStore.start(); + context = createContext(conf, stateStore); + ContainerManagerImpl cm = createContainerManager(context, delSrvc); + cm.init(conf); + cm.start(); + + // add an application by starting a container + ApplicationId appId = ApplicationId.newInstance(0, 1); + ApplicationAttemptId attemptId = + ApplicationAttemptId.newInstance(appId, 1); + ContainerId cid = ContainerId.newContainerId(attemptId, 1); + + commonLaunchContainer(appId, cid, cm); + + Application app = context.getApplications().get(appId); + assertNotNull(app); + + // store resource mapping of the container + stateStore.storeAssignedResources(cid, "gpu", + Arrays.asList("1", "2", "3")); + + cm.stop(); + context = createContext(conf, stateStore); + cm = createContainerManager(context); + cm.init(conf); + cm.start(); + assertEquals(1, context.getApplications().size()); + app = context.getApplications().get(appId); + assertNotNull(app); + + Container nmContainer = context.getContainers().get(cid); + Assert.assertNotNull(nmContainer); + List assignedResource = + nmContainer.getResourceMappings().getAssignedResources("gpu"); + Assert.assertNotNull(assignedResource); + Assert.assertEquals(3, assignedResource.size()); + Assert.assertTrue( + assignedResource.containsAll(Arrays.asList("1", "2", "3"))); + } + + @Test public void testContainerCleanupOnShutdown() throws Exception { ApplicationId appId = ApplicationId.newInstance(0, 1); ApplicationAttemptId attemptId = diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/linux/resources/TestGpuResourceHandler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/linux/resources/TestGpuResourceHandler.java new file mode 100644 index 00000000000..97367d54bea --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/linux/resources/TestGpuResourceHandler.java @@ -0,0 +1,323 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.resources; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.util.StringUtils; +import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; +import org.apache.hadoop.yarn.api.records.ApplicationId; +import org.apache.hadoop.yarn.api.records.ContainerId; +import org.apache.hadoop.yarn.api.records.ContainerLaunchContext; +import org.apache.hadoop.yarn.conf.YarnConfiguration; +import org.apache.hadoop.yarn.server.nodemanager.Context; +import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container; +import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ResourceMappings; +import org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.privileged.PrivilegedOperation; +import org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.privileged.PrivilegedOperationException; +import org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.privileged.PrivilegedOperationExecutor; +import org.apache.hadoop.yarn.server.nodemanager.recovery.NMStateStoreService; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; + +import java.util.Arrays; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; + +import static org.mockito.Matchers.anyList; +import static org.mockito.Matchers.eq; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.never; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; + +public class TestGpuResourceHandler { + private CGroupsHandler mockCGroupsHandler; + private PrivilegedOperationExecutor mockPrivilegedExecutor; + private GpuResourceHandlerImpl gpuResourceHandler; + private NMStateStoreService mockNMStateStore; + private ConcurrentHashMap runningContainersMap; + + @Before + public void setup() { + mockCGroupsHandler = mock(CGroupsHandler.class); + mockPrivilegedExecutor = mock(PrivilegedOperationExecutor.class); + mockNMStateStore = mock(NMStateStoreService.class); + + Context nmctx = mock(Context.class); + when(nmctx.getNMStateStore()).thenReturn(mockNMStateStore); + runningContainersMap = new ConcurrentHashMap<>(); + when(nmctx.getContainers()).thenReturn(runningContainersMap); + GpuResourceAllocator gpuResourceAllocator = new GpuResourceAllocator(nmctx); + + gpuResourceHandler = new GpuResourceHandlerImpl(mockCGroupsHandler, + mockPrivilegedExecutor, gpuResourceAllocator); + } + + @Test + public void testBootStrap() throws ResourceHandlerException { + Configuration conf = new YarnConfiguration(); + + gpuResourceHandler.bootstrap(conf); + verify(mockCGroupsHandler, times(1)).initializeCGroupController( + CGroupsHandler.CGroupController.DEVICES); + } + + private static ContainerId getContainerId(int id) { + return ContainerId.newContainerId(ApplicationAttemptId + .newInstance(ApplicationId.newInstance(1234L, 1), 1), id); + } + + private static Container mockContainerWithGpuRequest(int id, int numGpuRequest) { + Container c = mock(Container.class); + when(c.getContainerId()).thenReturn(getContainerId(id)); + ContainerLaunchContext clc = mock(ContainerLaunchContext.class); + Map envs = new HashMap<>(); + when(clc.getEnvironment()).thenReturn(envs); + envs.put("REQUESTED_GPU_NUM", String.valueOf(numGpuRequest)); + when(c.getLaunchContext()).thenReturn(clc); + return c; + } + + private void verifyDeniedDevices(ContainerId containerId, List deniedDevices) + throws ResourceHandlerException, PrivilegedOperationException { + verify(mockCGroupsHandler, times(1)).createCGroup( + CGroupsHandler.CGroupController.DEVICES, containerId.toString()); + + if (null != deniedDevices && !deniedDevices.isEmpty()) { + verify(mockPrivilegedExecutor, times(1)).executePrivilegedOperation( + new PrivilegedOperation(PrivilegedOperation.OperationType.GPU, Arrays + .asList(GpuResourceHandlerImpl.CONTAINER_ID_CLI_OPTION, + containerId.toString(), + GpuResourceHandlerImpl.EXCLUDED_GPUS_CLI_OPTION, + StringUtils.join(",", deniedDevices))), true); + } + } + + @Test + public void testAllocation() throws Exception { + Configuration conf = new YarnConfiguration(); + conf.set(YarnConfiguration.NM_GPU_ALLOWED_DEVICES, "0,1,3,4"); + conf.setBoolean(YarnConfiguration.NM_GPU_RESOURCE_ENABLED, true); + + gpuResourceHandler.bootstrap(conf); + Assert.assertEquals(4, + gpuResourceHandler.getGpuAllocator().getAvailableGpus()); + + /* Start container 1, asks 3 containers */ + gpuResourceHandler.preStart(mockContainerWithGpuRequest(1, 3)); + + // Only device=4 will be blocked. + verifyDeniedDevices(getContainerId(1), Arrays.asList(4)); + + /* Start container 2, asks 2 containers. Excepted to fail */ + boolean failedToAllocate = false; + try { + gpuResourceHandler.preStart(mockContainerWithGpuRequest(2, 2)); + } catch (ResourceHandlerException e) { + failedToAllocate = true; + } + Assert.assertTrue(failedToAllocate); + + /* Start container 3, ask 1 container, succeeded */ + gpuResourceHandler.preStart(mockContainerWithGpuRequest(3, 1)); + + // devices = 0/1/3 will be blocked + verifyDeniedDevices(getContainerId(3), Arrays.asList(0, 1, 3)); + + /* Start container 4, ask 0 container, succeeded */ + gpuResourceHandler.preStart(mockContainerWithGpuRequest(4, 0)); + + // All devices will be blocked + verifyDeniedDevices(getContainerId(4), Arrays.asList(0, 1, 3, 4)); + + /* Release container-1, expect cgroups deleted */ + gpuResourceHandler.postComplete(getContainerId(1)); + + verify(mockCGroupsHandler, times(1)).createCGroup( + CGroupsHandler.CGroupController.DEVICES, getContainerId(1).toString()); + Assert.assertEquals(3, + gpuResourceHandler.getGpuAllocator().getAvailableGpus()); + + /* Release container-3, expect cgroups deleted */ + gpuResourceHandler.postComplete(getContainerId(3)); + + verify(mockCGroupsHandler, times(1)).createCGroup( + CGroupsHandler.CGroupController.DEVICES, getContainerId(3).toString()); + Assert.assertEquals(4, + gpuResourceHandler.getGpuAllocator().getAvailableGpus()); + } + + @Test + public void testAllocationWithoutAllowedGpus() throws Exception { + Configuration conf = new YarnConfiguration(); + conf.set(YarnConfiguration.NM_GPU_ALLOWED_DEVICES, ""); + conf.setBoolean(YarnConfiguration.NM_GPU_RESOURCE_ENABLED, true); + + gpuResourceHandler.bootstrap(conf); + Assert.assertEquals(0, + gpuResourceHandler.getGpuAllocator().getAvailableGpus()); + + /* Start container 1, asks 0 containers */ + gpuResourceHandler.preStart(mockContainerWithGpuRequest(1, 0)); + verifyDeniedDevices(getContainerId(1), Collections.emptyList()); + + /* Start container 2, asks 1 containers. Excepted to fail */ + boolean failedToAllocate = false; + try { + gpuResourceHandler.preStart(mockContainerWithGpuRequest(2, 1)); + } catch (ResourceHandlerException e) { + failedToAllocate = true; + } + Assert.assertTrue(failedToAllocate); + + /* Release container 1, expect cgroups deleted */ + gpuResourceHandler.postComplete(getContainerId(1)); + + verify(mockCGroupsHandler, times(1)).createCGroup( + CGroupsHandler.CGroupController.DEVICES, getContainerId(1).toString()); + Assert.assertEquals(0, + gpuResourceHandler.getGpuAllocator().getAvailableGpus()); + } + + @Test + public void testAllocationStored() throws Exception { + Configuration conf = new YarnConfiguration(); + conf.set(YarnConfiguration.NM_GPU_ALLOWED_DEVICES, "0,1,3,4"); + conf.setBoolean(YarnConfiguration.NM_GPU_RESOURCE_ENABLED, true); + + gpuResourceHandler.bootstrap(conf); + Assert.assertEquals(4, + gpuResourceHandler.getGpuAllocator().getAvailableGpus()); + + /* Start container 1, asks 3 containers */ + gpuResourceHandler.preStart(mockContainerWithGpuRequest(1, 3)); + + verify(mockNMStateStore).storeAssignedResources(getContainerId(1), "gpu", + Arrays.asList("0", "1", "3")); + + // Only device=4 will be blocked. + verifyDeniedDevices(getContainerId(1), Arrays.asList(4)); + + /* Start container 2, ask 0 container, succeeded */ + gpuResourceHandler.preStart(mockContainerWithGpuRequest(2, 0)); + + verifyDeniedDevices(getContainerId(2), Arrays.asList(0, 1, 3, 4)); + + // Store assigned resource will not be invoked. + verify(mockNMStateStore, never()).storeAssignedResources( + eq(getContainerId(2)), eq("gpu"), anyList()); + } + + @Test + public void testRecoverResourceAllocation() throws Exception { + Configuration conf = new YarnConfiguration(); + conf.set(YarnConfiguration.NM_GPU_ALLOWED_DEVICES, "0,1,3,4"); + conf.setBoolean(YarnConfiguration.NM_GPU_RESOURCE_ENABLED, true); + + gpuResourceHandler.bootstrap(conf); + Assert.assertEquals(4, + gpuResourceHandler.getGpuAllocator().getAvailableGpus()); + + Container nmContainer = mock(Container.class); + ResourceMappings rmap = new ResourceMappings(); + ResourceMappings.AssignedResources ar = + new ResourceMappings.AssignedResources(); + ar.updateAssignedResources(Arrays.asList("1", "3")); + rmap.addAssignedResources("gpu", ar); + when(nmContainer.getResourceMappings()).thenReturn(rmap); + + runningContainersMap.put(getContainerId(1), nmContainer); + + // TEST CASE + // Reacquire container restore state of GPU Resource Allocator. + gpuResourceHandler.reacquireContainer(getContainerId(1)); + + Map deviceAllocationMapping = + gpuResourceHandler.getGpuAllocator().getDeviceAllocationMapping(); + Assert.assertEquals(2, deviceAllocationMapping.size()); + Assert.assertTrue( + deviceAllocationMapping.keySet().containsAll(Arrays.asList(1, 3))); + Assert.assertEquals(deviceAllocationMapping.get(1), getContainerId(1)); + + // TEST CASE + // Try to reacquire a container but requested device is not in allowed list. + nmContainer = mock(Container.class); + rmap = new ResourceMappings(); + ar = new ResourceMappings.AssignedResources(); + // id=5 is not in allowed list. + ar.updateAssignedResources(Arrays.asList("4", "5")); + rmap.addAssignedResources("gpu", ar); + when(nmContainer.getResourceMappings()).thenReturn(rmap); + + runningContainersMap.put(getContainerId(2), nmContainer); + + boolean caughtException = false; + try { + gpuResourceHandler.reacquireContainer(getContainerId(1)); + } catch (ResourceHandlerException e) { + caughtException = true; + } + Assert.assertTrue( + "Should fail since requested device Id is not in allowed list", + caughtException); + + // Make sure internal state not changed. + deviceAllocationMapping = + gpuResourceHandler.getGpuAllocator().getDeviceAllocationMapping(); + Assert.assertEquals(2, deviceAllocationMapping.size()); + Assert.assertTrue( + deviceAllocationMapping.keySet().containsAll(Arrays.asList(1, 3))); + Assert.assertEquals(deviceAllocationMapping.get(1), getContainerId(1)); + + // TEST CASE + // Try to reacquire a container but requested device is already assigned. + nmContainer = mock(Container.class); + rmap = new ResourceMappings(); + ar = new ResourceMappings.AssignedResources(); + // id=3 is already assigned + ar.updateAssignedResources(Arrays.asList("4", "3")); + rmap.addAssignedResources("gpu", ar); + when(nmContainer.getResourceMappings()).thenReturn(rmap); + + runningContainersMap.put(getContainerId(2), nmContainer); + + caughtException = false; + try { + gpuResourceHandler.reacquireContainer(getContainerId(1)); + } catch (ResourceHandlerException e) { + caughtException = true; + } + Assert.assertTrue( + "Should fail since requested device Id is not in allowed list", + caughtException); + + // Make sure internal state not changed. + deviceAllocationMapping = + gpuResourceHandler.getGpuAllocator().getDeviceAllocationMapping(); + Assert.assertEquals(2, deviceAllocationMapping.size()); + Assert.assertTrue( + deviceAllocationMapping.keySet().containsAll(Arrays.asList(1, 3))); + Assert.assertEquals(deviceAllocationMapping.get(1), getContainerId(1)); + } +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMMemoryStateStoreService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMMemoryStateStoreService.java index 0e039947c49..55c7757eb7b 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMMemoryStateStoreService.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMMemoryStateStoreService.java @@ -40,6 +40,7 @@ import org.apache.hadoop.yarn.proto.YarnServerNodemanagerRecoveryProtos.LogDeleterProto; import org.apache.hadoop.yarn.server.api.records.MasterKey; import org.apache.hadoop.yarn.server.api.records.impl.pb.MasterKeyPBImpl; +import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ResourceMappings; public class NMMemoryStateStoreService extends NMStateStoreService { private Map apps; @@ -119,6 +120,7 @@ public synchronized void removeApplication(ApplicationId appId) rcsCopy.setRemainingRetryAttempts(rcs.getRemainingRetryAttempts()); rcsCopy.setWorkDir(rcs.getWorkDir()); rcsCopy.setLogDir(rcs.getLogDir()); + rcsCopy.resMappings = rcs.getResourceMappings(); result.add(rcsCopy); } return result; @@ -478,6 +480,15 @@ public synchronized void removeAMRMProxyAppContext( amrmProxyState.getAppContexts().remove(attempt); } + public void storeAssignedResources(ContainerId containerId, + String resourceType, List assignedResources) throws IOException { + ResourceMappings.AssignedResources ar = + new ResourceMappings.AssignedResources(); + ar.updateAssignedResources(assignedResources); + containerStates.get(containerId).getResourceMappings().addAssignedResources( + resourceType, ar); + } + private static class TrackerState { Map inProgressMap = new HashMap(); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/recovery/TestNMLeveldbStateStoreService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/recovery/TestNMLeveldbStateStoreService.java index 01331560e6c..ef07972a833 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/recovery/TestNMLeveldbStateStoreService.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/recovery/TestNMLeveldbStateStoreService.java @@ -39,6 +39,7 @@ import java.util.List; import java.util.Map; +import com.sun.tools.javadoc.Start; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileUtil; import org.apache.hadoop.fs.Path; @@ -952,18 +953,8 @@ protected DB openDatabase(Configuration conf) { store.close(); } - @Test - public void testUnexpectedKeyDoesntThrowException() throws IOException { - // test empty when no state - List recoveredContainers = stateStore - .loadContainersState(); - assertTrue(recoveredContainers.isEmpty()); - + private StartContainerRequest storeMockContainer(ContainerId containerId) throws IOException { // create a container request - ApplicationId appId = ApplicationId.newInstance(1234, 3); - ApplicationAttemptId appAttemptId = ApplicationAttemptId.newInstance(appId, - 4); - ContainerId containerId = ContainerId.newContainerId(appAttemptId, 5); LocalResource lrsrc = LocalResource.newInstance( URL.newInstance("hdfs", "somehost", 12345, "/some/path/to/rsrc"), LocalResourceType.FILE, LocalResourceVisibility.APPLICATION, 123L, @@ -997,8 +988,23 @@ public void testUnexpectedKeyDoesntThrowException() throws IOException { "tokenservice"); StartContainerRequest containerReq = StartContainerRequest.newInstance(clc, containerToken); - stateStore.storeContainer(containerId, 0, containerReq); + return containerReq; + } + + @Test + public void testUnexpectedKeyDoesntThrowException() throws IOException { + // test empty when no state + List recoveredContainers = stateStore + .loadContainersState(); + assertTrue(recoveredContainers.isEmpty()); + + ApplicationId appId = ApplicationId.newInstance(1234, 3); + ApplicationAttemptId appAttemptId = ApplicationAttemptId.newInstance(appId, + 4); + ContainerId containerId = ContainerId.newContainerId(appAttemptId, 5); + + StartContainerRequest startContainerRequest = storeMockContainer(containerId); // add a invalid key byte[] invalidKey = ("ContainerManager/containers/" @@ -1011,7 +1017,7 @@ public void testUnexpectedKeyDoesntThrowException() throws IOException { assertEquals(RecoveredContainerStatus.REQUESTED, rcs.getStatus()); assertEquals(ContainerExitStatus.INVALID, rcs.getExitCode()); assertEquals(false, rcs.getKilled()); - assertEquals(containerReq, rcs.getStartRequest()); + assertEquals(startContainerRequest, rcs.getStartRequest()); assertTrue(rcs.getDiagnostics().isEmpty()); assertEquals(RecoveredContainerType.KILL, rcs.getRecoveryType()); // assert unknown keys are cleaned up finally @@ -1119,6 +1125,43 @@ public void testAMRMProxyStorage() throws IOException { } } + public void testStateStoreForResourceMapping() throws IOException { + // test empty when no state + List recoveredContainers = stateStore + .loadContainersState(); + assertTrue(recoveredContainers.isEmpty()); + + ApplicationId appId = ApplicationId.newInstance(1234, 3); + ApplicationAttemptId appAttemptId = ApplicationAttemptId.newInstance(appId, + 4); + ContainerId containerId = ContainerId.newContainerId(appAttemptId, 5); + storeMockContainer(containerId); + + // Store ResourceMapping + stateStore.storeAssignedResources(containerId, "gpu", + Arrays.asList("1", "2", "3")); + // This will overwrite above + stateStore.storeAssignedResources(containerId, "gpu", + Arrays.asList("1", "2", "4")); + stateStore.storeAssignedResources(containerId, "fpga", + Arrays.asList("3", "4", "5", "6")); + + // add a invalid key + restartStateStore(); + recoveredContainers = stateStore.loadContainersState(); + assertEquals(1, recoveredContainers.size()); + RecoveredContainerState rcs = recoveredContainers.get(0); + List res = rcs.getResourceMappings().getAssignedResources("gpu"); + Assert.assertNotNull(res); + Assert.assertEquals(3, res.size()); + Assert.assertTrue(res.containsAll(Arrays.asList("1", "2", "4"))); + + res = rcs.getResourceMappings().getAssignedResources("fpga"); + Assert.assertNotNull(res); + Assert.assertEquals(4, res.size()); + Assert.assertTrue(res.containsAll(Arrays.asList("3", "4", "5", "6"))); + } + private static class NMTokenSecretManagerForTest extends BaseNMTokenSecretManager { public MasterKey generateKey() { diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/webapp/MockContainer.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/webapp/MockContainer.java index 022baeac492..cc94288595e 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/webapp/MockContainer.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/webapp/MockContainer.java @@ -37,6 +37,7 @@ import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container; import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerEvent; import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerState; +import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ResourceMappings; import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.ResourceSet; import org.apache.hadoop.yarn.server.utils.BuilderUtils; @@ -235,4 +236,9 @@ public void sendKillEvent(int exitStatus, String description) { public boolean isRecovering() { return false; } + + @Override + public ResourceMappings getResourceMappings() { + return null; + } }