diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
index 71a71346321..e1af74cacfc 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
@@ -1276,6 +1276,31 @@ public static boolean isAclEnabled(Configuration conf) {
public static final String NM_NETWORK_RESOURCE_OUTBOUND_BANDWIDTH_YARN_MBIT =
NM_NETWORK_RESOURCE_PREFIX + "outbound-bandwidth-yarn-mbit";
+ /**
+ * Prefix for disk configurations. Work in progress: This configuration
+ * parameter may be changed/removed in the future.
+ */
+ @Private
+ public static final String NM_GPU_RESOURCE_PREFIX = NM_PREFIX
+ + "resource.gpu.";
+ /**
+ * This setting controls if resource handling for GPU operations is enabled.
+ */
+ @Private
+ public static final String NM_GPU_RESOURCE_ENABLED =
+ NM_GPU_RESOURCE_PREFIX + "enabled";
+
+ @Private
+ public static final String NM_GPU_ALLOWED_DEVICES =
+ NM_GPU_RESOURCE_PREFIX + "allowed-gpu-devices";
+
+ /**
+ * Disk as a resource is disabled by default.
+ **/
+ @Private
+ public static final boolean DEFAULT_NM_GPU_RESOURCE_ENABLED = false;
+
+
/** NM Webapp address.**/
public static final String NM_WEBAPP_ADDRESS = NM_PREFIX + "webapp.address";
public static final int DEFAULT_NM_WEBAPP_PORT = 8042;
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/pom.xml b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/pom.xml
index a50a769b0eb..8cf607210c1 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/pom.xml
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/pom.xml
@@ -29,7 +29,7 @@
${project.parent.parent.basedir}
- ../etc/hadoop
+ /etc/hadoop/conf
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeManager.java
index 3c0e4984efa..a653cad0e6f 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeManager.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeManager.java
@@ -63,6 +63,7 @@
import org.apache.hadoop.yarn.server.nodemanager.containermanager.ContainerManagerImpl;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.Application;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.resources.GpuResourceAllocator;
import org.apache.hadoop.yarn.server.nodemanager.metrics.NodeManagerMetrics;
import org.apache.hadoop.yarn.server.nodemanager.nodelabels.ConfigurationNodeLabelsProvider;
import org.apache.hadoop.yarn.server.nodemanager.nodelabels.NodeLabelsProvider;
@@ -70,6 +71,7 @@
import org.apache.hadoop.yarn.server.nodemanager.recovery.NMLeveldbStateStoreService;
import org.apache.hadoop.yarn.server.nodemanager.recovery.NMNullStateStoreService;
import org.apache.hadoop.yarn.server.nodemanager.recovery.NMStateStoreService;
+import org.apache.hadoop.yarn.server.nodemanager.scheduler.allocators.LocalResourceAllocators;
import org.apache.hadoop.yarn.server.scheduler.OpportunisticContainerAllocator;
import org.apache.hadoop.yarn.server.nodemanager.security.NMContainerTokenSecretManager;
import org.apache.hadoop.yarn.server.nodemanager.security.NMTokenSecretManagerInNM;
@@ -403,6 +405,13 @@ protected void serviceInit(Configuration conf) throws Exception {
addService(nodeStatusUpdater);
((NMContext) context).setNodeStatusUpdater(nodeStatusUpdater);
+ // Initialize local allocators
+ if (conf.getBoolean(YarnConfiguration.NM_GPU_RESOURCE_ENABLED,
+ YarnConfiguration.DEFAULT_NM_GPU_RESOURCE_ENABLED)) {
+ LocalResourceAllocators.setGpuResourceAllocator(
+ new GpuResourceAllocator(context));
+ }
+
super.serviceInit(conf);
// TODO add local dirs to del
}
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/Container.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/Container.java
index bd3f06d1fcb..1370a02abcd 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/Container.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/Container.java
@@ -94,4 +94,6 @@
void sendKillEvent(int exitStatus, String description);
boolean isRecovering();
+
+ ResourceMappings getResourceMappings();
}
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/ContainerImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/ContainerImpl.java
index c0aa6b0d63f..7dd66d91668 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/ContainerImpl.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/ContainerImpl.java
@@ -185,6 +185,7 @@ private ReInitializationContext createContextForRollback() {
private boolean recoveredAsKilled = false;
private Context context;
private ResourceSet resourceSet;
+ private ResourceMappings resourceMappings;
public ContainerImpl(Configuration conf, Dispatcher dispatcher,
ContainerLaunchContext launchContext, Credentials creds,
@@ -233,6 +234,7 @@ public ContainerImpl(Configuration conf, Dispatcher dispatcher,
stateMachine = stateMachineFactory.make(this);
this.context = context;
this.resourceSet = new ResourceSet();
+ this.resourceMappings = new ResourceMappings();
}
private static ContainerRetryContext configureRetryContext(
@@ -280,6 +282,7 @@ public ContainerImpl(Configuration conf, Dispatcher dispatcher,
this.remainingRetryAttempts = rcs.getRemainingRetryAttempts();
this.workDir = rcs.getWorkDir();
this.logDir = rcs.getLogDir();
+ this.resourceMappings = rcs.getResourceMappings();
}
private static final ContainerDiagnosticsUpdateTransition UPDATE_DIAGNOSTICS_TRANSITION =
@@ -1775,4 +1778,9 @@ public boolean isRecovering() {
getContainerState() == ContainerState.NEW);
return isRecovering;
}
+
+ @Override
+ public ResourceMappings getResourceMappings() {
+ return resourceMappings;
+ }
}
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/ResourceMappings.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/ResourceMappings.java
new file mode 100644
index 00000000000..d227ae385cd
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/ResourceMappings.java
@@ -0,0 +1,104 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.nodemanager.containermanager.container;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.io.ObjectInputStream;
+import java.io.ObjectOutputStream;
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * This class is added to store assigned resource to a single container by resource
+ * types.
+ *
+ * Assigned resource could be list of String
+ *
+ * For example, we can assign container to:
+ * "numa": ["numa0"]
+ * "gpu": ["0", "1", "2", "3"]
+ * "fpga": ["1", "3"]
+ *
+ * This will be used for NM restart container recovery
+ */
+public class ResourceMappings {
+ /**
+ * Record resources assigned to a container for a given resource type
+ */
+ public static class AssignedResources implements Serializable {
+ private List list = Collections.emptyList();
+
+ public List getAssignedResources() {
+ return Collections.unmodifiableList(list);
+ }
+
+ public void updateAssignedResources(List list) {
+ this.list = new ArrayList<>(list);
+ }
+
+ @SuppressWarnings("unchecked")
+ public static AssignedResources fromBytes(byte[] bytes)
+ throws IOException {
+ ByteArrayInputStream bis = new ByteArrayInputStream(bytes);
+ ObjectInputStream ois = new ObjectInputStream(bis);
+ List stringArray;
+ try {
+ stringArray = (List) ois.readObject();
+ } catch (ClassNotFoundException e) {
+ throw new IOException(e);
+ }
+ AssignedResources ar = new AssignedResources();
+ ar.updateAssignedResources(stringArray);
+ return ar;
+ }
+
+ public byte[] toBytes() throws IOException {
+ ByteArrayOutputStream bos = new ByteArrayOutputStream();
+ ObjectOutputStream oos = new ObjectOutputStream(bos);
+ oos.writeObject(list);
+ byte[] bytes = bos.toByteArray();
+ return bytes;
+ }
+ }
+
+ private Map assignedResourcesMap = new HashMap<>();
+
+ /**
+ * Get all resource mappings.
+ * @return map of resource mapping
+ */
+ public List getAssignedResources(String resourceType) {
+ AssignedResources ar = assignedResourcesMap.get(resourceType);
+ if (null == ar) {
+ return Collections.emptyList();
+ }
+ return ar.getAssignedResources();
+ }
+
+ public void addAssignedResources(String resourceType,
+ AssignedResources assigned) {
+ assignedResourcesMap.put(resourceType, assigned);
+ }
+}
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/linux/privileged/PrivilegedOperation.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/linux/privileged/PrivilegedOperation.java
index 8402a16339d..7e3a53a778b 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/linux/privileged/PrivilegedOperation.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/linux/privileged/PrivilegedOperation.java
@@ -51,6 +51,7 @@
TC_READ_STATS("--tc-read-stats"),
ADD_PID_TO_CGROUP(""), //no CLI switch supported yet.
RUN_DOCKER_CMD("--run-docker"),
+ GPU("gpu"),
LIST_AS_USER(""); //no CLI switch supported yet.
private final String option;
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/linux/resources/GpuResourceAllocator.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/linux/resources/GpuResourceAllocator.java
new file mode 100644
index 00000000000..9a34f08bf47
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/linux/resources/GpuResourceAllocator.java
@@ -0,0 +1,179 @@
+package org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.resources;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.collect.ImmutableSet;
+import com.google.common.collect.Sets;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.util.StringUtils;
+import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.server.nodemanager.Context;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.TreeMap;
+import java.util.TreeSet;
+
+/**
+ * Allocate GPU resources according to requirements
+ */
+public class GpuResourceAllocator {
+ final static Log LOG = LogFactory.getLog(GpuResourceAllocator.class);
+ private final static String RESOURCE_TYPE = "gpu";
+
+ private Set allowedGpuDevices = new TreeSet<>();
+ private Map usedDevices = new TreeMap<>();
+ private Context nmContext;
+
+ public GpuResourceAllocator(Context ctx) {
+ this.nmContext = ctx;
+ }
+
+ /**
+ * Contains allowed and denied devices with minor number.
+ * Denied devices will be useful for cgroups devices module to do blacklisting
+ */
+ static class GpuAllocation {
+ private Set allowed = Collections.emptySet();
+ private Set denied = Collections.emptySet();
+
+ GpuAllocation(Set allowed, Set denied) {
+ if (allowed != null) {
+ this.allowed = ImmutableSet.copyOf(allowed);
+ }
+ if (denied != null) {
+ this.denied = ImmutableSet.copyOf(denied);
+ }
+ }
+
+ public Set getAllowedGPUs() {
+ return allowed;
+ }
+
+ public Set getDeniedGPUs() {
+ return denied;
+ }
+ }
+
+ /**
+ * Add GPU to allowed list
+ * @param minorNumber minor number of the GPU device.
+ */
+ public synchronized void addGpu(int minorNumber) {
+ allowedGpuDevices.add(minorNumber);
+ }
+
+ private String getResourceHandlerExceptionMessage(int numRequestedGpuDevices,
+ ContainerId containerId) {
+ return "Failed to find enough GPUs, requestor=" + containerId
+ + ", #RequestedGPUs=" + numRequestedGpuDevices + ", #availableGpus="
+ + getAvailableGpus();
+ }
+
+ @VisibleForTesting
+ public synchronized int getAvailableGpus() {
+ return allowedGpuDevices.size() - usedDevices.size();
+ }
+
+ public synchronized void recoverAssignedGpus(ContainerId containerId)
+ throws ResourceHandlerException {
+ Container c = nmContext.getContainers().get(containerId);
+
+ for (String dev : c.getResourceMappings().getAssignedResources(
+ RESOURCE_TYPE)){
+ int devId = Integer.valueOf(dev);
+
+ // Make sure it is in allowed GPU device.
+ if (!allowedGpuDevices.contains(devId)) {
+ throw new ResourceHandlerException("Try to recover device id = " + devId
+ + " however it is not in allowed device list:" + StringUtils
+ .join(",", allowedGpuDevices));
+ }
+
+ // Make sure it is not occupied by anybody else
+ if (usedDevices.containsKey(devId)) {
+ throw new ResourceHandlerException("Try to recover device id = " + devId
+ + " however it is already assigned to container=" + usedDevices
+ .get(devId) + ", please double check what happened.");
+ }
+
+ usedDevices.put(devId, containerId);
+ }
+ }
+
+ /**
+ * Assign GPU to requestor
+ * @param numRequestedGpuDevices How many GPU to request
+ * @param containerId who is requesting the resources
+ * @return List of denied Gpus with minor numbers
+ * @throws ResourceHandlerException When failed to
+ */
+ public synchronized GpuAllocation assignGpus(int numRequestedGpuDevices,
+ ContainerId containerId) throws ResourceHandlerException {
+ // Assign Gpus to container if requested some.
+ if (numRequestedGpuDevices > 0) {
+ if (numRequestedGpuDevices > getAvailableGpus()) {
+ throw new ResourceHandlerException(
+ getResourceHandlerExceptionMessage(numRequestedGpuDevices,
+ containerId));
+ }
+
+ Set assignedGpus = new HashSet<>();
+
+ for (int deviceNum : allowedGpuDevices) {
+ if (!usedDevices.containsKey(deviceNum)) {
+ usedDevices.put(deviceNum, containerId);
+ assignedGpus.add(deviceNum);
+ if (assignedGpus.size() == numRequestedGpuDevices) {
+ break;
+ }
+ }
+ }
+
+ // Record in state store if we allocated anything
+ if (!assignedGpus.isEmpty()) {
+ List allocatedDevices = new ArrayList<>();
+ for (int gpu : assignedGpus) {
+ allocatedDevices.add(String.valueOf(gpu));
+ }
+ try {
+ nmContext.getNMStateStore().storeAssignedResources(containerId,
+ RESOURCE_TYPE, allocatedDevices);
+ } catch (IOException e) {
+ throw new ResourceHandlerException(e);
+ }
+ }
+
+ return new GpuAllocation(assignedGpus,
+ Sets.difference(allowedGpuDevices, assignedGpus));
+ }
+ return new GpuAllocation(null, allowedGpuDevices);
+ }
+
+ /**
+ * Clean up all Gpus assigned to containerId
+ * @param containerId containerId
+ */
+ public synchronized void cleanupAssignGpus(ContainerId containerId) {
+ Iterator> iter =
+ usedDevices.entrySet().iterator();
+ while (iter.hasNext()) {
+ if (iter.next().getValue().equals(containerId)) {
+ iter.remove();
+ }
+ }
+ }
+
+ @VisibleForTesting
+ public synchronized Map getDeviceAllocationMapping() {
+ return new HashMap<>(usedDevices);
+ }
+}
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/linux/resources/GpuResourceHandlerImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/linux/resources/GpuResourceHandlerImpl.java
new file mode 100644
index 00000000000..095b0ee8fd0
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/linux/resources/GpuResourceHandlerImpl.java
@@ -0,0 +1,163 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.resources;
+
+import com.google.common.annotations.VisibleForTesting;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.util.StringUtils;
+import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.privileged.PrivilegedOperation;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.privileged.PrivilegedOperationException;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.privileged.PrivilegedOperationExecutor;
+import org.apache.hadoop.yarn.server.nodemanager.scheduler.allocators.LocalResourceAllocators;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Map;
+
+public class GpuResourceHandlerImpl implements ResourceHandler {
+ final static Log LOG = LogFactory
+ .getLog(GpuResourceHandlerImpl.class);
+
+ private final String REQUEST_GPU_NUM_ENV_KEY = "REQUESTED_GPU_NUM";
+
+ // This will be used by container-executor to add necessary clis
+ public static final String EXCLUDED_GPUS_CLI_OPTION = "--excluded_gpus";
+ public static final String CONTAINER_ID_CLI_OPTION = "--container_id";
+
+ private GpuResourceAllocator gpuAllocator;
+ private CGroupsHandler cGroupsHandler;
+ private PrivilegedOperationExecutor privilegedOperationExecutor;
+
+ GpuResourceHandlerImpl(CGroupsHandler cGroupsHandler,
+ PrivilegedOperationExecutor privilegedOperationExecutor,
+ GpuResourceAllocator gpuResourceAllocator) {
+ this.cGroupsHandler = cGroupsHandler;
+ this.privilegedOperationExecutor = privilegedOperationExecutor;
+ gpuAllocator = gpuResourceAllocator;
+ }
+
+ @Override
+ public List bootstrap(Configuration configuration)
+ throws ResourceHandlerException {
+ String allowedDevicesStr = configuration.get(
+ YarnConfiguration.NM_GPU_ALLOWED_DEVICES);
+
+ if (null != allowedDevicesStr) {
+ for (String s : allowedDevicesStr.split(",")) {
+ if (s.trim().length() > 0) {
+ Integer minorNum = Integer.valueOf(s.trim());
+ gpuAllocator.addGpu(minorNum);
+ }
+ }
+ }
+ LOG.info("Allowed GPU devices with minor numbers:" + allowedDevicesStr);
+
+ // And initialize cgroups
+ this.cGroupsHandler.initializeCGroupController(
+ CGroupsHandler.CGroupController.DEVICES);
+
+ return null;
+ }
+
+ private int getRequestedGpu(Container container) {
+ // TODO, use YARN-3926 after it merged
+ ContainerLaunchContext clc = container.getLaunchContext();
+ Map envs = clc.getEnvironment();
+ if (null != envs.get(REQUEST_GPU_NUM_ENV_KEY)) {
+ return Integer.valueOf(envs.get(REQUEST_GPU_NUM_ENV_KEY));
+ }
+ return 0;
+ }
+
+ @Override
+ public synchronized List preStart(Container container)
+ throws ResourceHandlerException {
+ String containerIdStr = container.getContainerId().toString();
+
+ int requestedGpu = getRequestedGpu(container);
+
+ // Assign Gpus to container if requested some.
+ GpuResourceAllocator.GpuAllocation allocation = gpuAllocator.assignGpus(
+ requestedGpu, container.getContainerId());
+
+ // Create device cgroups for the container
+ cGroupsHandler.createCGroup(CGroupsHandler.CGroupController.DEVICES,
+ containerIdStr);
+ try {
+ // Execute c-e to setup GPU isolation before launch the container
+ PrivilegedOperation privilegedOperation = new PrivilegedOperation(
+ PrivilegedOperation.OperationType.GPU, Arrays
+ .asList(CONTAINER_ID_CLI_OPTION, containerIdStr));
+ if (!allocation.getDeniedGPUs().isEmpty()) {
+ privilegedOperation.appendArgs(Arrays.asList(EXCLUDED_GPUS_CLI_OPTION,
+ StringUtils.join(",", allocation.getDeniedGPUs())));
+ }
+
+ privilegedOperationExecutor.executePrivilegedOperation(
+ privilegedOperation, true);
+ } catch (PrivilegedOperationException e) {
+ cGroupsHandler.deleteCGroup(CGroupsHandler.CGroupController.DEVICES,
+ containerIdStr);
+ LOG.warn("Could not update cgroup for container", e);
+ throw new ResourceHandlerException(e);
+ }
+
+ List ret = new ArrayList<>();
+ ret.add(new PrivilegedOperation(
+ PrivilegedOperation.OperationType.ADD_PID_TO_CGROUP,
+ PrivilegedOperation.CGROUP_ARG_PREFIX
+ + cGroupsHandler.getPathForCGroupTasks(
+ CGroupsHandler.CGroupController.DEVICES, containerIdStr)));
+
+ return ret;
+ }
+
+ @VisibleForTesting
+ public GpuResourceAllocator getGpuAllocator() {
+ return gpuAllocator;
+ }
+
+ @Override
+ public List reacquireContainer(ContainerId containerId)
+ throws ResourceHandlerException {
+ gpuAllocator.recoverAssignedGpus(containerId);
+ return null;
+ }
+
+ @Override
+ public synchronized List postComplete(
+ ContainerId containerId) throws ResourceHandlerException {
+ gpuAllocator.cleanupAssignGpus(containerId);
+ cGroupsHandler.deleteCGroup(CGroupsHandler.CGroupController.DEVICES,
+ containerId.toString());
+ return null;
+ }
+
+ @Override
+ public List teardown() throws ResourceHandlerException {
+ return null;
+ }
+}
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/linux/resources/ResourceHandlerModule.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/linux/resources/ResourceHandlerModule.java
index 4d137f0e1d6..d4cab2eb4f1 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/linux/resources/ResourceHandlerModule.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/linux/resources/ResourceHandlerModule.java
@@ -28,6 +28,7 @@
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.privileged.PrivilegedOperationExecutor;
+import org.apache.hadoop.yarn.server.nodemanager.scheduler.allocators.LocalResourceAllocators;
import org.apache.hadoop.yarn.server.nodemanager.util.CgroupsLCEResourcesHandler;
import org.apache.hadoop.yarn.server.nodemanager.util.DefaultLCEResourcesHandler;
@@ -94,6 +95,19 @@ public static CGroupsHandler getCGroupsHandler() {
return cGroupsHandler;
}
+ private static GpuResourceHandlerImpl getGpuResourceHandler(
+ Configuration conf) throws ResourceHandlerException {
+ boolean cgroupsGpuEnabled = conf.getBoolean(
+ YarnConfiguration.NM_GPU_RESOURCE_ENABLED,
+ YarnConfiguration.DEFAULT_NM_GPU_RESOURCE_ENABLED);
+ if (cgroupsGpuEnabled) {
+ return new GpuResourceHandlerImpl(getInitializedCGroupsHandler(conf),
+ PrivilegedOperationExecutor.getInstance(conf),
+ LocalResourceAllocators.getGpuResourceAllocator());
+ }
+ return null;
+ }
+
private static CGroupsCpuResourceHandlerImpl getCGroupsCpuResourceHandler(
Configuration conf) throws ResourceHandlerException {
boolean cgroupsCpuEnabled =
@@ -212,6 +226,7 @@ private static void initializeConfiguredResourceHandlerChain(
addHandlerIfNotNull(handlerList, getDiskResourceHandler(conf));
addHandlerIfNotNull(handlerList, getMemoryResourceHandler(conf));
addHandlerIfNotNull(handlerList, getCGroupsCpuResourceHandler(conf));
+ addHandlerIfNotNull(handlerList, getGpuResourceHandler(conf));
resourceHandlerChain = new ResourceHandlerChain(handlerList);
}
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/linux/runtime/DockerLinuxContainerRuntime.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/linux/runtime/DockerLinuxContainerRuntime.java
index e058d6ef14d..3d8acfd83e3 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/linux/runtime/DockerLinuxContainerRuntime.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/linux/runtime/DockerLinuxContainerRuntime.java
@@ -39,6 +39,7 @@
import org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.privileged.PrivilegedOperationException;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.privileged.PrivilegedOperationExecutor;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.resources.CGroupsHandler;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.resources.GpuResourceHandlerImpl;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.resources.ResourceHandlerModule;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.runtime.docker.DockerClient;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.runtime.docker.DockerInspectCommand;
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/linux/runtime/docker/DockerRunCommand.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/linux/runtime/docker/DockerRunCommand.java
index b6457540b3a..8cc8ee7ee12 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/linux/runtime/docker/DockerRunCommand.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/linux/runtime/docker/DockerRunCommand.java
@@ -31,6 +31,7 @@
private static final String RUN_COMMAND = "run";
private final String image;
private List overrrideCommandWithArgs;
+ private List additionalEnvars = new ArrayList<>();
/** The following are mandatory: */
public DockerRunCommand(String containerId, String user, String image) {
@@ -120,6 +121,10 @@ public DockerRunCommand setOverrideCommandWithArgs(
return this;
}
+ public void addDockerRunEnvars(String key, String value) {
+ additionalEnvars.add(key + "=" + value);
+ }
+
@Override
public String getCommandWithArguments() {
List argList = new ArrayList<>();
@@ -131,6 +136,10 @@ public String getCommandWithArguments() {
argList.addAll(overrrideCommandWithArgs);
}
- return StringUtils.join(" ", argList);
+ String cmd = StringUtils.join(" ", argList);
+ for (String envar : additionalEnvars) {
+ cmd = cmd +"\n" + envar;
+ }
+ return cmd;
}
}
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMLeveldbStateStoreService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMLeveldbStateStoreService.java
index c556b39d165..88d782d0862 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMLeveldbStateStoreService.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMLeveldbStateStoreService.java
@@ -40,6 +40,7 @@
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.permission.FsPermission;
+import org.apache.hadoop.util.StringUtils;
import org.apache.hadoop.util.Time;
import org.apache.hadoop.yarn.api.protocolrecords.StartContainerRequest;
import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.StartContainerRequestPBImpl;
@@ -61,6 +62,7 @@
import org.apache.hadoop.yarn.proto.YarnServiceProtos.StartContainerRequestProto;
import org.apache.hadoop.yarn.server.api.records.MasterKey;
import org.apache.hadoop.yarn.server.api.records.impl.pb.MasterKeyPBImpl;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ResourceMappings;
import org.apache.hadoop.yarn.server.records.Version;
import org.apache.hadoop.yarn.server.records.impl.pb.VersionPBImpl;
import org.apache.hadoop.yarn.server.utils.LeveldbIterator;
@@ -145,6 +147,9 @@
private static final String AMRMPROXY_KEY_PREFIX = "AMRMProxy/";
+ private static final String CONTAINER_ASSIGNED_RESOURCES_KEY_SUFFIX =
+ "/assignedResources_";
+
private static final byte[] EMPTY_VALUE = new byte[0];
private DB db;
@@ -285,6 +290,13 @@ private RecoveredContainerState loadContainerState(ContainerId containerId,
rcs.setWorkDir(asString(entry.getValue()));
} else if (suffix.equals(CONTAINER_LOG_DIR_KEY_SUFFIX)) {
rcs.setLogDir(asString(entry.getValue()));
+ } else if (suffix.startsWith(CONTAINER_ASSIGNED_RESOURCES_KEY_SUFFIX)) {
+ String resourceType = suffix.substring(
+ CONTAINER_ASSIGNED_RESOURCES_KEY_SUFFIX.length());
+ ResourceMappings.AssignedResources assignedResources =
+ ResourceMappings.AssignedResources.fromBytes(entry.getValue());
+ rcs.getResourceMappings().addAssignedResources(resourceType,
+ assignedResources);
} else {
LOG.warn("the container " + containerId
+ " will be killed because of the unknown key " + key
@@ -1084,6 +1096,33 @@ public void removeLogDeleter(ApplicationId appId) throws IOException {
}
}
+ @Override
+ public void storeAssignedResources(ContainerId containerId,
+ String resourceType, List assignedResources) throws IOException {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("storeAssignedResources: containerId=" + containerId
+ + ", assignedResources=" + StringUtils.join(",", assignedResources));
+ }
+
+ String keyResChng = CONTAINERS_KEY_PREFIX + containerId.toString()
+ + CONTAINER_ASSIGNED_RESOURCES_KEY_SUFFIX + resourceType;
+ try {
+ WriteBatch batch = db.createWriteBatch();
+ try {
+ ResourceMappings.AssignedResources res = new ResourceMappings.AssignedResources();
+ res.updateAssignedResources(assignedResources);
+
+ // New value will overwrite old values for the same key
+ batch.put(bytes(keyResChng), res.toBytes());
+ db.write(batch);
+ } finally {
+ batch.close();
+ }
+ } catch (DBException e) {
+ throw new IOException(e);
+ }
+ }
+
@SuppressWarnings("deprecation")
private void cleanupDeprecatedFinishedApps() {
try {
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMNullStateStoreService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMNullStateStoreService.java
index 96c3f9e5c6f..09df3e50009 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMNullStateStoreService.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMNullStateStoreService.java
@@ -258,6 +258,11 @@ public void removeAMRMProxyAppContext(ApplicationAttemptId attempt)
}
@Override
+ public void storeAssignedResources(ContainerId containerId,
+ String resourceType, List assignedResources) throws IOException {
+ }
+
+ @Override
protected void initStorage(Configuration conf) throws IOException {
}
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMStateStoreService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMStateStoreService.java
index 9f87279fb25..83c49a7bf22 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMStateStoreService.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMStateStoreService.java
@@ -42,6 +42,7 @@
import org.apache.hadoop.yarn.proto.YarnServerNodemanagerRecoveryProtos.LocalizedResourceProto;
import org.apache.hadoop.yarn.proto.YarnServerNodemanagerRecoveryProtos.LogDeleterProto;
import org.apache.hadoop.yarn.server.api.records.MasterKey;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ResourceMappings;
@Private
@Unstable
@@ -87,6 +88,7 @@ public NMStateStoreService(String name) {
int version;
private RecoveredContainerType recoveryType =
RecoveredContainerType.RECOVER;
+ ResourceMappings resMappings = new ResourceMappings();
public RecoveredContainerStatus getStatus() {
return status;
@@ -162,6 +164,10 @@ public RecoveredContainerType getRecoveryType() {
public void setRecoveryType(RecoveredContainerType recoveryType) {
this.recoveryType = recoveryType;
}
+
+ public ResourceMappings getResourceMappings() {
+ return resMappings;
+ }
}
public static class LocalResourceTrackerState {
@@ -635,7 +641,7 @@ public abstract void storeLogDeleter(ApplicationId appId,
*/
public abstract void removeLogDeleter(ApplicationId appId)
throws IOException;
-
+
/**
* Load the state of AMRMProxy.
* @return recovered state of AMRMProxy
@@ -688,6 +694,9 @@ public abstract void removeAMRMProxyAppContextEntry(
public abstract void removeAMRMProxyAppContext(ApplicationAttemptId attempt)
throws IOException;
+ public abstract void storeAssignedResources(ContainerId containerId,
+ String resourceType, List assignedResources) throws IOException;
+
protected abstract void initStorage(Configuration conf) throws IOException;
protected abstract void startStorage() throws IOException;
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/scheduler/allocators/LocalResourceAllocators.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/scheduler/allocators/LocalResourceAllocators.java
new file mode 100644
index 00000000000..3da3515a9d5
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/scheduler/allocators/LocalResourceAllocators.java
@@ -0,0 +1,36 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.nodemanager.scheduler.allocators;
+
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.resources.GpuResourceAllocator;
+
+/**
+ * Manages local resource allocators such as GPU resource allocator.
+ */
+public class LocalResourceAllocators {
+ private static GpuResourceAllocator gpuResourceAllocator;
+
+ public static GpuResourceAllocator getGpuResourceAllocator() {
+ return gpuResourceAllocator;
+ }
+
+ public static void setGpuResourceAllocator(GpuResourceAllocator allocator) {
+ gpuResourceAllocator = allocator;
+ }
+}
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/TestContainerManagerRecovery.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/TestContainerManagerRecovery.java
index d2bd79ced22..f54a77056c7 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/TestContainerManagerRecovery.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/TestContainerManagerRecovery.java
@@ -108,6 +108,7 @@
import org.apache.hadoop.yarn.server.nodemanager.timelineservice.NMTimelinePublisher;
import org.apache.hadoop.yarn.server.security.ApplicationACLsManager;
import org.apache.hadoop.yarn.util.timeline.TimelineUtils;
+import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
@@ -393,23 +394,8 @@ public void testNMRecoveryForAppFinishedWithLogAggregationFailure()
cm.stop();
}
- @Test
- public void testContainerResizeRecovery() throws Exception {
- conf.setBoolean(YarnConfiguration.NM_RECOVERY_ENABLED, true);
- conf.setBoolean(YarnConfiguration.NM_RECOVERY_SUPERVISED, true);
- NMStateStoreService stateStore = new NMMemoryStateStoreService();
- stateStore.init(conf);
- stateStore.start();
- Context context = createContext(conf, stateStore);
- ContainerManagerImpl cm = createContainerManager(context, delSrvc);
- cm.dispatcher.disableExitOnDispatchException();
- cm.init(conf);
- cm.start();
- // add an application by starting a container
- ApplicationId appId = ApplicationId.newInstance(0, 1);
- ApplicationAttemptId attemptId =
- ApplicationAttemptId.newInstance(appId, 1);
- ContainerId cid = ContainerId.newContainerId(attemptId, 1);
+ private void commonLaunchContainer(ApplicationId appId, ContainerId cid,
+ ContainerManagerImpl cm) throws Exception {
Map containerEnv = new HashMap<>();
setFlowContext(containerEnv, "app_name1", appId);
Map serviceData = Collections.emptyMap();
@@ -453,12 +439,35 @@ public void testContainerResizeRecovery() throws Exception {
context, cm, cid, clc, null);
assertTrue(startResponse.getFailedRequests().isEmpty());
assertEquals(1, context.getApplications().size());
- Application app = context.getApplications().get(appId);
- assertNotNull(app);
// make sure the container reaches RUNNING state
waitForNMContainerState(cm, cid,
org.apache.hadoop.yarn.server.nodemanager
.containermanager.container.ContainerState.RUNNING);
+
+ }
+
+ @Test
+ public void testContainerResizeRecovery() throws Exception {
+ conf.setBoolean(YarnConfiguration.NM_RECOVERY_ENABLED, true);
+ conf.setBoolean(YarnConfiguration.NM_RECOVERY_SUPERVISED, true);
+ NMStateStoreService stateStore = new NMMemoryStateStoreService();
+ stateStore.init(conf);
+ stateStore.start();
+ context = createContext(conf, stateStore);
+ ContainerManagerImpl cm = createContainerManager(context, delSrvc);
+ cm.init(conf);
+ cm.start();
+ // add an application by starting a container
+ ApplicationId appId = ApplicationId.newInstance(0, 1);
+ ApplicationAttemptId attemptId =
+ ApplicationAttemptId.newInstance(appId, 1);
+ ContainerId cid = ContainerId.newContainerId(attemptId, 1);
+
+ commonLaunchContainer(appId, cid, cm);
+
+ Application app = context.getApplications().get(appId);
+ assertNotNull(app);
+
Resource targetResource = Resource.newInstance(2048, 2);
ContainerUpdateResponse updateResponse =
updateContainers(context, cm, cid, targetResource);
@@ -481,6 +490,52 @@ public void testContainerResizeRecovery() throws Exception {
}
@Test
+ public void testResourceMappingRecoveryForContainer() throws Exception {
+ conf.setBoolean(YarnConfiguration.NM_RECOVERY_ENABLED, true);
+ conf.setBoolean(YarnConfiguration.NM_RECOVERY_SUPERVISED, true);
+ NMStateStoreService stateStore = new NMMemoryStateStoreService();
+ stateStore.init(conf);
+ stateStore.start();
+ context = createContext(conf, stateStore);
+ ContainerManagerImpl cm = createContainerManager(context, delSrvc);
+ cm.init(conf);
+ cm.start();
+
+ // add an application by starting a container
+ ApplicationId appId = ApplicationId.newInstance(0, 1);
+ ApplicationAttemptId attemptId =
+ ApplicationAttemptId.newInstance(appId, 1);
+ ContainerId cid = ContainerId.newContainerId(attemptId, 1);
+
+ commonLaunchContainer(appId, cid, cm);
+
+ Application app = context.getApplications().get(appId);
+ assertNotNull(app);
+
+ // store resource mapping of the container
+ stateStore.storeAssignedResources(cid, "gpu",
+ Arrays.asList("1", "2", "3"));
+
+ cm.stop();
+ context = createContext(conf, stateStore);
+ cm = createContainerManager(context);
+ cm.init(conf);
+ cm.start();
+ assertEquals(1, context.getApplications().size());
+ app = context.getApplications().get(appId);
+ assertNotNull(app);
+
+ Container nmContainer = context.getContainers().get(cid);
+ Assert.assertNotNull(nmContainer);
+ List assignedResource =
+ nmContainer.getResourceMappings().getAssignedResources("gpu");
+ Assert.assertNotNull(assignedResource);
+ Assert.assertEquals(3, assignedResource.size());
+ Assert.assertTrue(
+ assignedResource.containsAll(Arrays.asList("1", "2", "3")));
+ }
+
+ @Test
public void testContainerCleanupOnShutdown() throws Exception {
ApplicationId appId = ApplicationId.newInstance(0, 1);
ApplicationAttemptId attemptId =
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/linux/resources/TestGpuResourceHandler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/linux/resources/TestGpuResourceHandler.java
new file mode 100644
index 00000000000..c83000305fb
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/linux/resources/TestGpuResourceHandler.java
@@ -0,0 +1,305 @@
+package org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.resources;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.util.StringUtils;
+import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.server.nodemanager.Context;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ResourceMappings;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.privileged.PrivilegedOperation;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.privileged.PrivilegedOperationException;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.privileged.PrivilegedOperationExecutor;
+import org.apache.hadoop.yarn.server.nodemanager.recovery.NMStateStoreService;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+
+import static org.mockito.Matchers.anyList;
+import static org.mockito.Matchers.eq;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.never;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+public class TestGpuResourceHandler {
+ private CGroupsHandler mockCGroupsHandler;
+ private PrivilegedOperationExecutor mockPrivilegedExecutor;
+ private GpuResourceHandlerImpl gpuResourceHandler;
+ private NMStateStoreService mockNMStateStore;
+ private ConcurrentHashMap runningContainersMap;
+
+ @Before
+ public void setup() {
+ mockCGroupsHandler = mock(CGroupsHandler.class);
+ mockPrivilegedExecutor = mock(PrivilegedOperationExecutor.class);
+ mockNMStateStore = mock(NMStateStoreService.class);
+
+ Context nmctx = mock(Context.class);
+ when(nmctx.getNMStateStore()).thenReturn(mockNMStateStore);
+ runningContainersMap = new ConcurrentHashMap<>();
+ when(nmctx.getContainers()).thenReturn(runningContainersMap);
+ GpuResourceAllocator gpuResourceAllocator = new GpuResourceAllocator(nmctx);
+
+ gpuResourceHandler = new GpuResourceHandlerImpl(mockCGroupsHandler,
+ mockPrivilegedExecutor, gpuResourceAllocator);
+ }
+
+ @Test
+ public void testBootStrap() throws ResourceHandlerException {
+ Configuration conf = new YarnConfiguration();
+
+ gpuResourceHandler.bootstrap(conf);
+ verify(mockCGroupsHandler, times(1)).initializeCGroupController(
+ CGroupsHandler.CGroupController.DEVICES);
+ }
+
+ private static ContainerId getContainerId(int id) {
+ return ContainerId.newContainerId(ApplicationAttemptId
+ .newInstance(ApplicationId.newInstance(1234L, 1), 1), id);
+ }
+
+ private static Container mockContainerWithGpuRequest(int id, int numGpuRequest) {
+ Container c = mock(Container.class);
+ when(c.getContainerId()).thenReturn(getContainerId(id));
+ ContainerLaunchContext clc = mock(ContainerLaunchContext.class);
+ Map envs = new HashMap<>();
+ when(clc.getEnvironment()).thenReturn(envs);
+ envs.put("REQUESTED_GPU_NUM", String.valueOf(numGpuRequest));
+ when(c.getLaunchContext()).thenReturn(clc);
+ return c;
+ }
+
+ private void verifyDeniedDevices(ContainerId containerId, List deniedDevices)
+ throws ResourceHandlerException, PrivilegedOperationException {
+ verify(mockCGroupsHandler, times(1)).createCGroup(
+ CGroupsHandler.CGroupController.DEVICES, containerId.toString());
+
+ if (null != deniedDevices && !deniedDevices.isEmpty()) {
+ verify(mockPrivilegedExecutor, times(1)).executePrivilegedOperation(
+ new PrivilegedOperation(PrivilegedOperation.OperationType.GPU, Arrays
+ .asList(GpuResourceHandlerImpl.CONTAINER_ID_CLI_OPTION,
+ containerId.toString(),
+ GpuResourceHandlerImpl.EXCLUDED_GPUS_CLI_OPTION,
+ StringUtils.join(",", deniedDevices))), true);
+ }
+ }
+
+ @Test
+ public void testAllocation() throws Exception {
+ Configuration conf = new YarnConfiguration();
+ conf.set(YarnConfiguration.NM_GPU_ALLOWED_DEVICES, "0,1,3,4");
+ conf.setBoolean(YarnConfiguration.NM_GPU_RESOURCE_ENABLED, true);
+
+ gpuResourceHandler.bootstrap(conf);
+ Assert.assertEquals(4,
+ gpuResourceHandler.getGpuAllocator().getAvailableGpus());
+
+ /* Start container 1, asks 3 containers */
+ gpuResourceHandler.preStart(mockContainerWithGpuRequest(1, 3));
+
+ // Only device=4 will be blocked.
+ verifyDeniedDevices(getContainerId(1), Arrays.asList(4));
+
+ /* Start container 2, asks 2 containers. Excepted to fail */
+ boolean failedToAllocate = false;
+ try {
+ gpuResourceHandler.preStart(mockContainerWithGpuRequest(2, 2));
+ } catch (ResourceHandlerException e) {
+ failedToAllocate = true;
+ }
+ Assert.assertTrue(failedToAllocate);
+
+ /* Start container 3, ask 1 container, succeeded */
+ gpuResourceHandler.preStart(mockContainerWithGpuRequest(3, 1));
+
+ // devices = 0/1/3 will be blocked
+ verifyDeniedDevices(getContainerId(3), Arrays.asList(0, 1, 3));
+
+ /* Start container 4, ask 0 container, succeeded */
+ gpuResourceHandler.preStart(mockContainerWithGpuRequest(4, 0));
+
+ // All devices will be blocked
+ verifyDeniedDevices(getContainerId(4), Arrays.asList(0, 1, 3, 4));
+
+ /* Release container-1, expect cgroups deleted */
+ gpuResourceHandler.postComplete(getContainerId(1));
+
+ verify(mockCGroupsHandler, times(1)).createCGroup(
+ CGroupsHandler.CGroupController.DEVICES, getContainerId(1).toString());
+ Assert.assertEquals(3,
+ gpuResourceHandler.getGpuAllocator().getAvailableGpus());
+
+ /* Release container-3, expect cgroups deleted */
+ gpuResourceHandler.postComplete(getContainerId(3));
+
+ verify(mockCGroupsHandler, times(1)).createCGroup(
+ CGroupsHandler.CGroupController.DEVICES, getContainerId(3).toString());
+ Assert.assertEquals(4,
+ gpuResourceHandler.getGpuAllocator().getAvailableGpus());
+ }
+
+ @Test
+ public void testAllocationWithoutAllowedGpus() throws Exception {
+ Configuration conf = new YarnConfiguration();
+ conf.set(YarnConfiguration.NM_GPU_ALLOWED_DEVICES, "");
+ conf.setBoolean(YarnConfiguration.NM_GPU_RESOURCE_ENABLED, true);
+
+ gpuResourceHandler.bootstrap(conf);
+ Assert.assertEquals(0,
+ gpuResourceHandler.getGpuAllocator().getAvailableGpus());
+
+ /* Start container 1, asks 0 containers */
+ gpuResourceHandler.preStart(mockContainerWithGpuRequest(1, 0));
+ verifyDeniedDevices(getContainerId(1), Collections.emptyList());
+
+ /* Start container 2, asks 1 containers. Excepted to fail */
+ boolean failedToAllocate = false;
+ try {
+ gpuResourceHandler.preStart(mockContainerWithGpuRequest(2, 1));
+ } catch (ResourceHandlerException e) {
+ failedToAllocate = true;
+ }
+ Assert.assertTrue(failedToAllocate);
+
+ /* Release container 1, expect cgroups deleted */
+ gpuResourceHandler.postComplete(getContainerId(1));
+
+ verify(mockCGroupsHandler, times(1)).createCGroup(
+ CGroupsHandler.CGroupController.DEVICES, getContainerId(1).toString());
+ Assert.assertEquals(0,
+ gpuResourceHandler.getGpuAllocator().getAvailableGpus());
+ }
+
+ @Test
+ public void testAllocationStored() throws Exception {
+ Configuration conf = new YarnConfiguration();
+ conf.set(YarnConfiguration.NM_GPU_ALLOWED_DEVICES, "0,1,3,4");
+ conf.setBoolean(YarnConfiguration.NM_GPU_RESOURCE_ENABLED, true);
+
+ gpuResourceHandler.bootstrap(conf);
+ Assert.assertEquals(4,
+ gpuResourceHandler.getGpuAllocator().getAvailableGpus());
+
+ /* Start container 1, asks 3 containers */
+ gpuResourceHandler.preStart(mockContainerWithGpuRequest(1, 3));
+
+ verify(mockNMStateStore).storeAssignedResources(getContainerId(1), "gpu",
+ Arrays.asList("0", "1", "3"));
+
+ // Only device=4 will be blocked.
+ verifyDeniedDevices(getContainerId(1), Arrays.asList(4));
+
+ /* Start container 2, ask 0 container, succeeded */
+ gpuResourceHandler.preStart(mockContainerWithGpuRequest(2, 0));
+
+ verifyDeniedDevices(getContainerId(2), Arrays.asList(0, 1, 3, 4));
+
+ // Store assigned resource will not be invoked.
+ verify(mockNMStateStore, never()).storeAssignedResources(
+ eq(getContainerId(2)), eq("gpu"), anyList());
+ }
+
+ @Test
+ public void testRecoverResourceAllocation() throws Exception {
+ Configuration conf = new YarnConfiguration();
+ conf.set(YarnConfiguration.NM_GPU_ALLOWED_DEVICES, "0,1,3,4");
+ conf.setBoolean(YarnConfiguration.NM_GPU_RESOURCE_ENABLED, true);
+
+ gpuResourceHandler.bootstrap(conf);
+ Assert.assertEquals(4,
+ gpuResourceHandler.getGpuAllocator().getAvailableGpus());
+
+ Container nmContainer = mock(Container.class);
+ ResourceMappings rmap = new ResourceMappings();
+ ResourceMappings.AssignedResources ar =
+ new ResourceMappings.AssignedResources();
+ ar.updateAssignedResources(Arrays.asList("1", "3"));
+ rmap.addAssignedResources("gpu", ar);
+ when(nmContainer.getResourceMappings()).thenReturn(rmap);
+
+ runningContainersMap.put(getContainerId(1), nmContainer);
+
+ // TEST CASE
+ // Reacquire container restore state of GPU Resource Allocator.
+ gpuResourceHandler.reacquireContainer(getContainerId(1));
+
+ Map deviceAllocationMapping =
+ gpuResourceHandler.getGpuAllocator().getDeviceAllocationMapping();
+ Assert.assertEquals(2, deviceAllocationMapping.size());
+ Assert.assertTrue(
+ deviceAllocationMapping.keySet().containsAll(Arrays.asList(1, 3)));
+ Assert.assertEquals(deviceAllocationMapping.get(1), getContainerId(1));
+
+ // TEST CASE
+ // Try to reacquire a container but requested device is not in allowed list.
+ nmContainer = mock(Container.class);
+ rmap = new ResourceMappings();
+ ar = new ResourceMappings.AssignedResources();
+ // id=5 is not in allowed list.
+ ar.updateAssignedResources(Arrays.asList("4", "5"));
+ rmap.addAssignedResources("gpu", ar);
+ when(nmContainer.getResourceMappings()).thenReturn(rmap);
+
+ runningContainersMap.put(getContainerId(2), nmContainer);
+
+ boolean caughtException = false;
+ try {
+ gpuResourceHandler.reacquireContainer(getContainerId(1));
+ } catch (ResourceHandlerException e) {
+ caughtException = true;
+ }
+ Assert.assertTrue(
+ "Should fail since requested device Id is not in allowed list",
+ caughtException);
+
+ // Make sure internal state not changed.
+ deviceAllocationMapping =
+ gpuResourceHandler.getGpuAllocator().getDeviceAllocationMapping();
+ Assert.assertEquals(2, deviceAllocationMapping.size());
+ Assert.assertTrue(
+ deviceAllocationMapping.keySet().containsAll(Arrays.asList(1, 3)));
+ Assert.assertEquals(deviceAllocationMapping.get(1), getContainerId(1));
+
+ // TEST CASE
+ // Try to reacquire a container but requested device is already assigned.
+ nmContainer = mock(Container.class);
+ rmap = new ResourceMappings();
+ ar = new ResourceMappings.AssignedResources();
+ // id=3 is already assigned
+ ar.updateAssignedResources(Arrays.asList("4", "3"));
+ rmap.addAssignedResources("gpu", ar);
+ when(nmContainer.getResourceMappings()).thenReturn(rmap);
+
+ runningContainersMap.put(getContainerId(2), nmContainer);
+
+ caughtException = false;
+ try {
+ gpuResourceHandler.reacquireContainer(getContainerId(1));
+ } catch (ResourceHandlerException e) {
+ caughtException = true;
+ }
+ Assert.assertTrue(
+ "Should fail since requested device Id is not in allowed list",
+ caughtException);
+
+ // Make sure internal state not changed.
+ deviceAllocationMapping =
+ gpuResourceHandler.getGpuAllocator().getDeviceAllocationMapping();
+ Assert.assertEquals(2, deviceAllocationMapping.size());
+ Assert.assertTrue(
+ deviceAllocationMapping.keySet().containsAll(Arrays.asList(1, 3)));
+ Assert.assertEquals(deviceAllocationMapping.get(1), getContainerId(1));
+ }
+}
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMMemoryStateStoreService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMMemoryStateStoreService.java
index 0e039947c49..55c7757eb7b 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMMemoryStateStoreService.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMMemoryStateStoreService.java
@@ -40,6 +40,7 @@
import org.apache.hadoop.yarn.proto.YarnServerNodemanagerRecoveryProtos.LogDeleterProto;
import org.apache.hadoop.yarn.server.api.records.MasterKey;
import org.apache.hadoop.yarn.server.api.records.impl.pb.MasterKeyPBImpl;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ResourceMappings;
public class NMMemoryStateStoreService extends NMStateStoreService {
private Map apps;
@@ -119,6 +120,7 @@ public synchronized void removeApplication(ApplicationId appId)
rcsCopy.setRemainingRetryAttempts(rcs.getRemainingRetryAttempts());
rcsCopy.setWorkDir(rcs.getWorkDir());
rcsCopy.setLogDir(rcs.getLogDir());
+ rcsCopy.resMappings = rcs.getResourceMappings();
result.add(rcsCopy);
}
return result;
@@ -478,6 +480,15 @@ public synchronized void removeAMRMProxyAppContext(
amrmProxyState.getAppContexts().remove(attempt);
}
+ public void storeAssignedResources(ContainerId containerId,
+ String resourceType, List assignedResources) throws IOException {
+ ResourceMappings.AssignedResources ar =
+ new ResourceMappings.AssignedResources();
+ ar.updateAssignedResources(assignedResources);
+ containerStates.get(containerId).getResourceMappings().addAssignedResources(
+ resourceType, ar);
+ }
+
private static class TrackerState {
Map inProgressMap =
new HashMap();
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/recovery/TestNMLeveldbStateStoreService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/recovery/TestNMLeveldbStateStoreService.java
index 01331560e6c..ef07972a833 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/recovery/TestNMLeveldbStateStoreService.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/recovery/TestNMLeveldbStateStoreService.java
@@ -39,6 +39,7 @@
import java.util.List;
import java.util.Map;
+import com.sun.tools.javadoc.Start;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileUtil;
import org.apache.hadoop.fs.Path;
@@ -952,18 +953,8 @@ protected DB openDatabase(Configuration conf) {
store.close();
}
- @Test
- public void testUnexpectedKeyDoesntThrowException() throws IOException {
- // test empty when no state
- List recoveredContainers = stateStore
- .loadContainersState();
- assertTrue(recoveredContainers.isEmpty());
-
+ private StartContainerRequest storeMockContainer(ContainerId containerId) throws IOException {
// create a container request
- ApplicationId appId = ApplicationId.newInstance(1234, 3);
- ApplicationAttemptId appAttemptId = ApplicationAttemptId.newInstance(appId,
- 4);
- ContainerId containerId = ContainerId.newContainerId(appAttemptId, 5);
LocalResource lrsrc = LocalResource.newInstance(
URL.newInstance("hdfs", "somehost", 12345, "/some/path/to/rsrc"),
LocalResourceType.FILE, LocalResourceVisibility.APPLICATION, 123L,
@@ -997,8 +988,23 @@ public void testUnexpectedKeyDoesntThrowException() throws IOException {
"tokenservice");
StartContainerRequest containerReq = StartContainerRequest.newInstance(clc,
containerToken);
-
stateStore.storeContainer(containerId, 0, containerReq);
+ return containerReq;
+ }
+
+ @Test
+ public void testUnexpectedKeyDoesntThrowException() throws IOException {
+ // test empty when no state
+ List recoveredContainers = stateStore
+ .loadContainersState();
+ assertTrue(recoveredContainers.isEmpty());
+
+ ApplicationId appId = ApplicationId.newInstance(1234, 3);
+ ApplicationAttemptId appAttemptId = ApplicationAttemptId.newInstance(appId,
+ 4);
+ ContainerId containerId = ContainerId.newContainerId(appAttemptId, 5);
+
+ StartContainerRequest startContainerRequest = storeMockContainer(containerId);
// add a invalid key
byte[] invalidKey = ("ContainerManager/containers/"
@@ -1011,7 +1017,7 @@ public void testUnexpectedKeyDoesntThrowException() throws IOException {
assertEquals(RecoveredContainerStatus.REQUESTED, rcs.getStatus());
assertEquals(ContainerExitStatus.INVALID, rcs.getExitCode());
assertEquals(false, rcs.getKilled());
- assertEquals(containerReq, rcs.getStartRequest());
+ assertEquals(startContainerRequest, rcs.getStartRequest());
assertTrue(rcs.getDiagnostics().isEmpty());
assertEquals(RecoveredContainerType.KILL, rcs.getRecoveryType());
// assert unknown keys are cleaned up finally
@@ -1119,6 +1125,43 @@ public void testAMRMProxyStorage() throws IOException {
}
}
+ public void testStateStoreForResourceMapping() throws IOException {
+ // test empty when no state
+ List recoveredContainers = stateStore
+ .loadContainersState();
+ assertTrue(recoveredContainers.isEmpty());
+
+ ApplicationId appId = ApplicationId.newInstance(1234, 3);
+ ApplicationAttemptId appAttemptId = ApplicationAttemptId.newInstance(appId,
+ 4);
+ ContainerId containerId = ContainerId.newContainerId(appAttemptId, 5);
+ storeMockContainer(containerId);
+
+ // Store ResourceMapping
+ stateStore.storeAssignedResources(containerId, "gpu",
+ Arrays.asList("1", "2", "3"));
+ // This will overwrite above
+ stateStore.storeAssignedResources(containerId, "gpu",
+ Arrays.asList("1", "2", "4"));
+ stateStore.storeAssignedResources(containerId, "fpga",
+ Arrays.asList("3", "4", "5", "6"));
+
+ // add a invalid key
+ restartStateStore();
+ recoveredContainers = stateStore.loadContainersState();
+ assertEquals(1, recoveredContainers.size());
+ RecoveredContainerState rcs = recoveredContainers.get(0);
+ List res = rcs.getResourceMappings().getAssignedResources("gpu");
+ Assert.assertNotNull(res);
+ Assert.assertEquals(3, res.size());
+ Assert.assertTrue(res.containsAll(Arrays.asList("1", "2", "4")));
+
+ res = rcs.getResourceMappings().getAssignedResources("fpga");
+ Assert.assertNotNull(res);
+ Assert.assertEquals(4, res.size());
+ Assert.assertTrue(res.containsAll(Arrays.asList("3", "4", "5", "6")));
+ }
+
private static class NMTokenSecretManagerForTest extends
BaseNMTokenSecretManager {
public MasterKey generateKey() {
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/webapp/MockContainer.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/webapp/MockContainer.java
index 022baeac492..cc94288595e 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/webapp/MockContainer.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/webapp/MockContainer.java
@@ -37,6 +37,7 @@
import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerEvent;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerState;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ResourceMappings;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.ResourceSet;
import org.apache.hadoop.yarn.server.utils.BuilderUtils;
@@ -235,4 +236,9 @@ public void sendKillEvent(int exitStatus, String description) {
public boolean isRecovering() {
return false;
}
+
+ @Override
+ public ResourceMappings getResourceMappings() {
+ return null;
+ }
}