diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/resourceplugin/ResourcePluginManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/resourceplugin/ResourcePluginManager.java index 13590f46598..c5e5643a1cd 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/resourceplugin/ResourcePluginManager.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/resourceplugin/ResourcePluginManager.java @@ -30,6 +30,7 @@ import org.apache.hadoop.yarn.server.nodemanager.Context; import org.apache.hadoop.yarn.server.nodemanager.api.deviceplugin.DevicePlugin; import org.apache.hadoop.yarn.server.nodemanager.api.deviceplugin.DeviceRegisterRequest; +import org.apache.hadoop.yarn.server.nodemanager.containermanager.resourceplugin.deviceframework.DeviceMappingManager; import org.apache.hadoop.yarn.server.nodemanager.containermanager.resourceplugin.deviceframework.DevicePluginAdapter; import org.apache.hadoop.yarn.server.nodemanager.containermanager.resourceplugin.fpga.FpgaResourcePlugin; import org.apache.hadoop.yarn.server.nodemanager.containermanager.resourceplugin.gpu.GpuResourcePlugin; @@ -52,12 +53,13 @@ public class ResourcePluginManager { private static final Logger LOG = LoggerFactory.getLogger(ResourcePluginManager.class); - private static final Set SUPPORTED_RESOURCE_PLUGINS = ImmutableSet.of( - GPU_URI, FPGA_URI); + private static final Set SUPPORTED_RESOURCE_PLUGINS = + ImmutableSet.of(GPU_URI, FPGA_URI); private Map configuredPlugins = Collections.emptyMap(); + private DeviceMappingManager deviceMappingManager = null; public synchronized void initialize(Context context) throws YarnException, ClassNotFoundException { @@ -123,7 +125,7 @@ public void initializePluggableDevicePlugins(Context context, throws YarnRuntimeException, ClassNotFoundException { LOG.info("The pluggable device framework enabled," + "trying to load the vendor plugins"); - + deviceMappingManager = new DeviceMappingManager(context); String[] pluginClassNames = configuration.getStrings( YarnConfiguration.NM_PLUGGABLE_DEVICE_FRAMEWORK_DEVICE_CLASSES); if (null == pluginClassNames) { @@ -171,7 +173,7 @@ public void initializePluggableDevicePlugins(Context context, resourceName, pluginClassName); DevicePluginAdapter pluginAdapter = new DevicePluginAdapter( - resourceName, dpInstance); + resourceName, dpInstance, deviceMappingManager); LOG.info("Adapter of {} created. Initializing..", pluginClassName); try { pluginAdapter.initialize(context); @@ -232,6 +234,10 @@ public boolean isConfiguredResourceName(String resourceName) { return true; } + public DeviceMappingManager getDeviceMappingManager() { + return deviceMappingManager; + } + public synchronized void cleanup() throws YarnException { for (ResourcePlugin plugin : configuredPlugins.values()) { plugin.cleanup(); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/resourceplugin/deviceframework/DeviceMappingManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/resourceplugin/deviceframework/DeviceMappingManager.java new file mode 100644 index 00000000000..34466ce4951 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/resourceplugin/deviceframework/DeviceMappingManager.java @@ -0,0 +1,320 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.server.nodemanager.containermanager.resourceplugin.deviceframework; + +import com.google.common.annotations.VisibleForTesting; +import com.google.common.collect.ImmutableSet; +import com.google.common.collect.Sets; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.util.StringUtils; +import org.apache.hadoop.yarn.api.records.ContainerId; +import org.apache.hadoop.yarn.api.records.Resource; +import org.apache.hadoop.yarn.exceptions.ResourceNotFoundException; +import org.apache.hadoop.yarn.server.nodemanager.Context; +import org.apache.hadoop.yarn.server.nodemanager.api.deviceplugin.Device; +import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container; +import org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.resources.ResourceHandlerException; + +import java.io.IOException; +import java.io.Serializable; +import java.util.Map; +import java.util.Set; +import java.util.TreeMap; +import java.util.TreeSet; +import java.util.ArrayList; +import java.util.Iterator; +import java.util.Collections; +import java.util.concurrent.ConcurrentHashMap; + +/** + * Schedule device resource based on requirements and do book keeping + * It holds all device type resource and can do scheduling as a default + * scheduler. + * */ +public class DeviceMappingManager { + static final Log LOG = LogFactory.getLog(DeviceMappingManager.class); + + private Context nmContext; + private static final int WAIT_MS_PER_LOOP = 1000; + + /** + * Hold all type of devices. + * key is the device resource name + * value is a sorted set of {@link Device} + * */ + private Map> allAllowedDevices = + new ConcurrentHashMap<>(); + + /** + * Hold used devices. + * key is the device resource name + * value is a sorted map of {@link Device} and {@link ContainerId} pairs + * */ + private Map> allUsedDevices = + new ConcurrentHashMap<>(); + + public DeviceMappingManager(Context context) { + nmContext = context; + } + + @VisibleForTesting + public Map> getAllAllowedDevices() { + return allAllowedDevices; + } + + @VisibleForTesting + public Map> getAllUsedDevices() { + return allUsedDevices; + } + + public synchronized void addDeviceSet(String resourceName, + Set deviceSet) { + LOG.info("Adding new resource: " + "type:" + + resourceName + "," + deviceSet); + allAllowedDevices.put(resourceName, new TreeSet<>(deviceSet)); + allUsedDevices.put(resourceName, new TreeMap<>()); + } + + public DeviceAllocation assignDevices(String resourceName, + Container container) + throws ResourceHandlerException { + DeviceAllocation allocation = internalAssignDevices(resourceName, + container); + // Wait for a maximum of 120 seconds if no available Devices are there + // which are yet to be released. + final int timeoutMsecs = 120 * WAIT_MS_PER_LOOP; + int timeWaiting = 0; + while (allocation == null) { + if (timeWaiting >= timeoutMsecs) { + break; + } + + // Sleep for 1 sec to ensure there are some free devices which are + // getting released. + try { + LOG.info("Container : " + container.getContainerId() + + " is waiting for free " + resourceName + " devices."); + Thread.sleep(WAIT_MS_PER_LOOP); + timeWaiting += WAIT_MS_PER_LOOP; + allocation = internalAssignDevices(resourceName, container); + } catch (InterruptedException e) { + // On any interrupt, break the loop and continue execution. + break; + } + } + + if (allocation == null) { + String message = "Could not get valid " + resourceName + + " device for container '" + container.getContainerId() + + "' as some other containers might not releasing them."; + LOG.warn(message); + throw new ResourceHandlerException(message); + } + return allocation; + } + + private synchronized DeviceAllocation internalAssignDevices( + String resourceName, Container container) + throws ResourceHandlerException { + Resource requestedResource = container.getResource(); + ContainerId containerId = container.getContainerId(); + int requestedDeviceCount = getRequestedDeviceCount(resourceName, + requestedResource); + // Assign devices to container if requested some. + if (requestedDeviceCount > 0) { + if (requestedDeviceCount > getAvailableDevices(resourceName)) { + // If there are some devices which are getting released, wait for few + // seconds to get it. + if (requestedDeviceCount <= getReleasingDevices(resourceName) + + getAvailableDevices(resourceName)) { + return null; + } + } + + int availableDeviceCount = getAvailableDevices(resourceName); + if (requestedDeviceCount > availableDeviceCount) { + throw new ResourceHandlerException("Failed to find enough " + + resourceName + + ", requestor=" + containerId + + ", #Requested=" + requestedDeviceCount + ", #available=" + + availableDeviceCount); + } + + Set assignedDevices = new TreeSet<>(); + Map usedDevices = allUsedDevices.get(resourceName); + Set allowedDevices = allAllowedDevices.get(resourceName); + + defaultScheduleAction(allowedDevices, usedDevices, + assignedDevices, containerId, requestedDeviceCount); + + // Record in state store if we allocated anything + if (!assignedDevices.isEmpty()) { + try { + // Update state store. + nmContext.getNMStateStore().storeAssignedResources(container, + resourceName, + new ArrayList<>(assignedDevices)); + } catch (IOException e) { + cleanupAssignedDevices(resourceName, containerId); + throw new ResourceHandlerException(e); + } + } + + return new DeviceAllocation(resourceName, assignedDevices, + Sets.difference(allowedDevices, assignedDevices)); + } + return new DeviceAllocation(resourceName, null, + allAllowedDevices.get(resourceName)); + } + + public synchronized void recoverAssignedDevices(String resourceName, + ContainerId containerId) + throws ResourceHandlerException { + Container c = nmContext.getContainers().get(containerId); + Map usedDevices = allUsedDevices.get(resourceName); + Set allowedDevices = allAllowedDevices.get(resourceName); + if (null == c) { + throw new ResourceHandlerException( + "This shouldn't happen, cannot find container with id=" + + containerId); + } + + for (Serializable deviceSerializable : c.getResourceMappings() + .getAssignedResources(resourceName)) { + if (!(deviceSerializable instanceof Device)) { + throw new ResourceHandlerException( + "Trying to recover device id, however it" + + " is not Device instance, this shouldn't happen"); + } + + Device device = (Device) deviceSerializable; + + // Make sure it is in allowed device. + if (!allowedDevices.contains(device)) { + throw new ResourceHandlerException( + "Try to recover device = " + device + + " however it is not in allowed device list:" + StringUtils + .join(",", allowedDevices)); + } + + // Make sure it is not occupied by anybody else + if (usedDevices.containsKey(device)) { + throw new ResourceHandlerException( + "Try to recover device id = " + device + + " however it is already assigned to container=" + + usedDevices.get(device) + + ", please double check what happened."); + } + + usedDevices.put(device, containerId); + } + } + + public synchronized void cleanupAssignedDevices(String resourceName, + ContainerId containerId) { + Iterator> iter = + allUsedDevices.get(resourceName).entrySet().iterator(); + while (iter.hasNext()) { + if (iter.next().getValue().equals(containerId)) { + iter.remove(); + } + } + } + + public static int getRequestedDeviceCount(String resourceName, + Resource requestedResource) { + try { + return Long.valueOf(requestedResource.getResourceValue( + resourceName)).intValue(); + } catch (ResourceNotFoundException e) { + return 0; + } + } + + public synchronized int getAvailableDevices(String resourceName) { + return allAllowedDevices.get(resourceName).size() + - allUsedDevices.get(resourceName).size(); + } + + private synchronized long getReleasingDevices(String resourceName) { + long releasingDevices = 0; + Map used = allUsedDevices.get(resourceName); + Iterator> iter = used.entrySet() + .iterator(); + while (iter.hasNext()) { + ContainerId containerId = iter.next().getValue(); + Container container = nmContext.getContainers().get(containerId); + if (container != null) { + if (container.isContainerInFinalStates()) { + releasingDevices = releasingDevices + container.getResource() + .getResourceInformation(resourceName).getValue(); + } + } + } + return releasingDevices; + } + + // default scheduling logic + private void defaultScheduleAction(Set allowed, + Map used, Set assigned, + ContainerId containerId, int count) { + for (Device device : allowed) { + if (!used.containsKey(device)) { + used.put(device, containerId); + assigned.add(device); + if (assigned.size() == count) { + return; + } + } + } // end for + } + + static class DeviceAllocation { + private String resourceName; + + private Set allowed = Collections.emptySet(); + private Set denied = Collections.emptySet(); + + DeviceAllocation(String resName, Set a, + Set d) { + this.resourceName = resName; + if (allowed != null) { + this.allowed = ImmutableSet.copyOf(a); + } + if (denied != null) { + this.denied = ImmutableSet.copyOf(d); + } + } + + + public Set getAllowed() { + return allowed; + } + + @Override + public String toString() { + return "ResourceType: " + resourceName + + ", Allowed Devices: " + allowed + + ", Denied Devices: " + denied; + } + + } + +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/resourceplugin/deviceframework/DevicePluginAdapter.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/resourceplugin/deviceframework/DevicePluginAdapter.java index 2db58be61a6..373f535a8dc 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/resourceplugin/deviceframework/DevicePluginAdapter.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/resourceplugin/deviceframework/DevicePluginAdapter.java @@ -33,23 +33,34 @@ /** - * The {@link DevicePluginAdapter} will adapt existing hooks + * The {@link DevicePluginAdapter} will adapt existing hooks. * into vendor plugin's logic. * It decouples the vendor plugin from YARN's device framework * * */ public class DevicePluginAdapter implements ResourcePlugin { - final static Log LOG = LogFactory.getLog(DevicePluginAdapter.class); + static final Log LOG = LogFactory.getLog(DevicePluginAdapter.class); private String resourceName; private DevicePlugin devicePlugin; + + private DeviceMappingManager deviceMappingManager; + + + private DeviceResourceHandlerImpl deviceResourceHandler; private DeviceResourceUpdaterImpl deviceResourceUpdater; - public DevicePluginAdapter(String name, DevicePlugin dp) { + public DevicePluginAdapter(String name, DevicePlugin dp, + DeviceMappingManager dmm) { + deviceMappingManager = dmm; resourceName = name; devicePlugin = dp; } + public DeviceMappingManager getDeviceMappingManager() { + return deviceMappingManager; + } + @Override public void initialize(Context context) throws YarnException { deviceResourceUpdater = new DeviceResourceUpdaterImpl( @@ -62,7 +73,10 @@ public void initialize(Context context) throws YarnException { public ResourceHandler createResourceHandler(Context nmContext, CGroupsHandler cGroupsHandler, PrivilegedOperationExecutor privilegedOperationExecutor) { - return null; + this.deviceResourceHandler = new DeviceResourceHandlerImpl(resourceName, + devicePlugin, this, deviceMappingManager, + cGroupsHandler, privilegedOperationExecutor); + return deviceResourceHandler; } @Override @@ -85,4 +99,7 @@ public NMResourceInfo getNMResourceInfo() throws YarnException { return null; } + public DeviceResourceHandlerImpl getDeviceResourceHandler() { + return deviceResourceHandler; + } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/resourceplugin/deviceframework/DeviceResourceHandlerImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/resourceplugin/deviceframework/DeviceResourceHandlerImpl.java new file mode 100644 index 00000000000..03725cb2b42 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/resourceplugin/deviceframework/DeviceResourceHandlerImpl.java @@ -0,0 +1,139 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.server.nodemanager.containermanager.resourceplugin.deviceframework; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.yarn.api.records.ContainerId; +import org.apache.hadoop.yarn.server.nodemanager.api.deviceplugin.Device; +import org.apache.hadoop.yarn.server.nodemanager.api.deviceplugin.DevicePlugin; +import org.apache.hadoop.yarn.server.nodemanager.api.deviceplugin.DeviceRuntimeSpec; +import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container; +import org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.privileged.PrivilegedOperation; +import org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.privileged.PrivilegedOperationExecutor; +import org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.resources.CGroupsHandler; +import org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.resources.ResourceHandler; +import org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.resources.ResourceHandlerException; + +import java.util.List; +import java.util.Set; + +public class DeviceResourceHandlerImpl implements ResourceHandler { + + static final Log LOG = LogFactory.getLog(DeviceResourceHandlerImpl.class); + + private String resourceName; + private DevicePlugin devicePlugin; + private DeviceMappingManager deviceMappingManager; + private CGroupsHandler cGroupsHandler; + private PrivilegedOperationExecutor privilegedOperationExecutor; + private DevicePluginAdapter devicePluginAdapter; + + public DeviceResourceHandlerImpl(String reseName, + DevicePlugin devPlugin, + DevicePluginAdapter devPluginAdapter, + DeviceMappingManager devMappingManager, + CGroupsHandler cgHandler, + PrivilegedOperationExecutor operation) { + this.devicePluginAdapter = devPluginAdapter; + this.resourceName = reseName; + this.devicePlugin = devPlugin; + this.cGroupsHandler = cgHandler; + this.privilegedOperationExecutor = operation; + this.deviceMappingManager = devMappingManager; + } + + @Override + public List bootstrap(Configuration configuration) + throws ResourceHandlerException { + Set availableDevices = null; + try { + availableDevices = devicePlugin.getDevices(); + } catch (Exception e) { + throw new ResourceHandlerException("Exception thrown from" + + " plugin's \"getDevices\"" + e.getMessage()); + } + /** + * We won't fail the NM if plugin returns invalid value here. + * TODO: we should update RM's resource count if something wrong + * */ + if (availableDevices == null) { + LOG.error("Bootstrap " + resourceName + + " failed. Null value got from plugin's getDevices method"); + return null; + } + // Add device set. Here we trust the plugin's return value + deviceMappingManager.addDeviceSet(resourceName, availableDevices); + // TODO: Init cgroups + + return null; + } + + @Override + public List preStart(Container container) + throws ResourceHandlerException { + String containerIdStr = container.getContainerId().toString(); + DeviceMappingManager.DeviceAllocation allocation = + deviceMappingManager.assignDevices(resourceName, container); + LOG.debug("Allocated to " + + containerIdStr + ": " + allocation); + + try { + devicePlugin.onDevicesAllocated( + allocation.getAllowed(), DeviceRuntimeSpec.RUNTIME_CGROUPS); + } catch (Exception e) { + throw new ResourceHandlerException("Exception thrown from" + + " plugin's \"onDeviceAllocated\"" + e.getMessage()); + } + + // cgroups operation based on allocation + /** + * TODO: implement a general container-executor device module + * */ + + return null; + } + + @Override + public List reacquireContainer(ContainerId containerId) + throws ResourceHandlerException { + deviceMappingManager.recoverAssignedDevices(resourceName, containerId); + return null; + } + + @Override + public List updateContainer(Container container) + throws ResourceHandlerException { + return null; + } + + @Override + public List postComplete(ContainerId containerId) + throws ResourceHandlerException { + deviceMappingManager.cleanupAssignedDevices(resourceName, containerId); + return null; + } + + @Override + public List teardown() + throws ResourceHandlerException { + return null; + } +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/resourceplugin/deviceframework/TestDevicePluginAdapter.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/resourceplugin/deviceframework/TestDevicePluginAdapter.java index 6d185412dc0..59a92c76c9e 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/resourceplugin/deviceframework/TestDevicePluginAdapter.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/resourceplugin/deviceframework/TestDevicePluginAdapter.java @@ -18,17 +18,34 @@ package org.apache.hadoop.yarn.server.nodemanager.containermanager.resourceplugin.deviceframework; -import org.apache.hadoop.yarn.api.records.*; +import org.apache.hadoop.service.ServiceOperations; + +import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; +import org.apache.hadoop.yarn.api.records.ApplicationId; +import org.apache.hadoop.yarn.api.records.ContainerId; +import org.apache.hadoop.yarn.api.records.ContainerLaunchContext; +import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.exceptions.YarnException; -import org.apache.hadoop.yarn.server.nodemanager.*; +import org.apache.hadoop.yarn.server.nodemanager.Context; +import org.apache.hadoop.yarn.server.nodemanager.NodeManager; import org.apache.hadoop.yarn.server.nodemanager.api.deviceplugin.Device; import org.apache.hadoop.yarn.server.nodemanager.api.deviceplugin.DevicePlugin; import org.apache.hadoop.yarn.server.nodemanager.api.deviceplugin.DeviceRegisterRequest; import org.apache.hadoop.yarn.server.nodemanager.api.deviceplugin.DeviceRuntimeSpec; +import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container; +import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ResourceMappings; +import org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.privileged.PrivilegedOperationExecutor; +import org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.resources.CGroupsHandler; +import org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.resources.ResourceHandlerException; +import org.apache.hadoop.yarn.server.nodemanager.containermanager.resourceplugin.ResourcePluginManager; +import org.apache.hadoop.yarn.server.nodemanager.containermanager.runtime.ContainerRuntimeConstants; +import org.apache.hadoop.yarn.server.nodemanager.recovery.NMMemoryStateStoreService; +import org.apache.hadoop.yarn.server.nodemanager.recovery.NMStateStoreService; import org.apache.hadoop.yarn.util.resource.ResourceUtils; import org.apache.hadoop.yarn.util.resource.TestResourceUtils; import org.junit.After; +import org.junit.Assert; import org.junit.Before; import org.junit.Test; import org.slf4j.Logger; @@ -36,13 +53,23 @@ import java.io.File; import java.io.IOException; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.HashMap; +import java.util.Map; import java.util.Set; import java.util.TreeSet; +import java.util.concurrent.ConcurrentHashMap; +import static org.mockito.Matchers.isA; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.spy; import static org.mockito.Mockito.verify; import static org.mockito.Mockito.times; +import static org.mockito.Mockito.when; +import static org.mockito.Mockito.doNothing; +import static org.mockito.Mockito.doThrow; /** * Unit tests for DevicePluginAdapter. @@ -55,6 +82,9 @@ private YarnConfiguration conf; private String tempResourceTypesFile; + private CGroupsHandler mockCGroupsHandler; + private PrivilegedOperationExecutor mockPrivilegedExecutor; + private NodeManager nm; @Before public void setup() throws Exception { @@ -64,6 +94,8 @@ public void setup() throws Exception { String resourceTypesFile = "resource-types-pluggable-devices.xml"; this.tempResourceTypesFile = TestResourceUtils.setupResourceTypes(this.conf, resourceTypesFile); + mockCGroupsHandler = mock(CGroupsHandler.class); + mockPrivilegedExecutor = mock(PrivilegedOperationExecutor.class); } @After @@ -73,19 +105,111 @@ public void tearDown() throws IOException { if (dest.exists()) { dest.delete(); } + if (nm != null) { + try { + ServiceOperations.stop(nm); + } catch (Throwable t) { + // ignore + } + } + } + + + /** + * Use the MyPlugin which doesn't implement scheduler interfaces + * Plugin's initialization is tested in TestResourcePluginManager + * + * */ + @Test + public void testBasicWorkflow() + throws YarnException, IOException { + NodeManager.NMContext context = mock(NodeManager.NMContext.class); + NMStateStoreService storeService = mock(NMStateStoreService.class); + when(context.getNMStateStore()).thenReturn(storeService); + doNothing().when(storeService).storeAssignedResources(isA(Container.class), + isA(String.class), + isA(ArrayList.class)); + + // Init scheduler manager + DeviceMappingManager dmm = new DeviceMappingManager(context); + + ResourcePluginManager rpm = mock(ResourcePluginManager.class); + when(rpm.getDeviceMappingManager()).thenReturn(dmm); + + // Init an plugin + MyPlugin plugin = new MyPlugin(); + MyPlugin spyPlugin = spy(plugin); + String resourceName = MyPlugin.RESOURCE_NAME; + // Init an adapter for the plugin + DevicePluginAdapter adapter = new DevicePluginAdapter( + resourceName, + spyPlugin, dmm); + // Bootstrap, adding device + adapter.initialize(context); + adapter.createResourceHandler(context, + mockCGroupsHandler, mockPrivilegedExecutor); + adapter.getDeviceResourceHandler().bootstrap(conf); + int size = dmm.getAvailableDevices(resourceName); + Assert.assertEquals(3, size); + + // A container c1 requests 1 device + Container c1 = mockContainerWithDeviceRequest(0, + resourceName, + 1,false); + // preStart + adapter.getDeviceResourceHandler().preStart(c1); + // check book keeping + Assert.assertEquals(2, + dmm.getAvailableDevices(resourceName)); + Assert.assertEquals(1, + dmm.getAllUsedDevices().get(resourceName).size()); + Assert.assertEquals(3, + dmm.getAllAllowedDevices().get(resourceName).size()); + // postComplete + adapter.getDeviceResourceHandler().postComplete(getContainerId(0)); + Assert.assertEquals(3, + dmm.getAvailableDevices(resourceName)); + Assert.assertEquals(0, + dmm.getAllUsedDevices().get(resourceName).size()); + Assert.assertEquals(3, + dmm.getAllAllowedDevices().get(resourceName).size()); + + // A container c2 requests 3 device + Container c2 = mockContainerWithDeviceRequest(1, + resourceName, + 3,false); + // preStart + adapter.getDeviceResourceHandler().preStart(c2); + // check book keeping + Assert.assertEquals(0, + dmm.getAvailableDevices(resourceName)); + Assert.assertEquals(3, + dmm.getAllUsedDevices().get(resourceName).size()); + Assert.assertEquals(3, + dmm.getAllAllowedDevices().get(resourceName).size()); + // postComplete + adapter.getDeviceResourceHandler().postComplete(getContainerId(1)); + Assert.assertEquals(3, + dmm.getAvailableDevices(resourceName)); + Assert.assertEquals(0, + dmm.getAllUsedDevices().get(resourceName).size()); + Assert.assertEquals(3, + dmm.getAllAllowedDevices().get(resourceName).size()); } @Test public void testDeviceResourceUpdaterImpl() throws YarnException { Resource nodeResource = mock(Resource.class); + NodeManager.NMContext context = mock(NodeManager.NMContext.class); // Init an plugin MyPlugin plugin = new MyPlugin(); MyPlugin spyPlugin = spy(plugin); String resourceName = MyPlugin.RESOURCE_NAME; + // Init scheduler manager + DeviceMappingManager dmm = new DeviceMappingManager(context); // Init an adapter for the plugin DevicePluginAdapter adapter = new DevicePluginAdapter( - resourceName, - spyPlugin); + resourceName, spyPlugin, dmm); adapter.initialize(mock(Context.class)); adapter.getNodeResourceHandlerInstance() .updateConfiguredResource(nodeResource); @@ -94,6 +218,234 @@ public void testDeviceResourceUpdaterImpl() throws YarnException { resourceName, 3); } + @Test + public void testStoreDeviceSchedulerManagerState() + throws IOException, YarnException { + NodeManager.NMContext context = mock(NodeManager.NMContext.class); + NMStateStoreService realStoreService = new NMMemoryStateStoreService(); + NMStateStoreService storeService = spy(realStoreService); + when(context.getNMStateStore()).thenReturn(storeService); + doNothing().when(storeService).storeAssignedResources(isA(Container.class), + isA(String.class), + isA(ArrayList.class)); + + // Init scheduler manager + DeviceMappingManager dmm = new DeviceMappingManager(context); + + ResourcePluginManager rpm = mock(ResourcePluginManager.class); + when(rpm.getDeviceMappingManager()).thenReturn(dmm); + + // Init an plugin + MyPlugin plugin = new MyPlugin(); + MyPlugin spyPlugin = spy(plugin); + String resourceName = MyPlugin.RESOURCE_NAME; + // Init an adapter for the plugin + DevicePluginAdapter adapter = new DevicePluginAdapter( + resourceName, + spyPlugin, dmm); + // Bootstrap, adding device + adapter.initialize(context); + adapter.createResourceHandler(context, + mockCGroupsHandler, mockPrivilegedExecutor); + adapter.getDeviceResourceHandler().bootstrap(conf); + + // A container c0 requests 1 device + org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container c0 = mockContainerWithDeviceRequest(0, + resourceName, + 1,false); + // preStart + adapter.getDeviceResourceHandler().preStart(c0); + // ensure container1's resource is persistent + verify(storeService).storeAssignedResources(c0, resourceName, + Arrays.asList(Device.Builder.newInstance() + .setId(0) + .setDevPath("/dev/hdwA0") + .setMajorNumber(256) + .setMinorNumber(0) + .setBusID("0000:80:00.0") + .setHealthy(true) + .build())); + } + + @Test + public void testRecoverDeviceSchedulerManagerState() throws IOException, YarnException { + NodeManager.NMContext context = mock(NodeManager.NMContext.class); + NMStateStoreService realStoreService = new NMMemoryStateStoreService(); + NMStateStoreService storeService = spy(realStoreService); + when(context.getNMStateStore()).thenReturn(storeService); + doNothing().when(storeService).storeAssignedResources(isA(Container.class), + isA(String.class), + isA(ArrayList.class)); + + // Init scheduler manager + DeviceMappingManager dmm = new DeviceMappingManager(context); + + ResourcePluginManager rpm = mock(ResourcePluginManager.class); + when(rpm.getDeviceMappingManager()).thenReturn(dmm); + + // Init an plugin + MyPlugin plugin = new MyPlugin(); + MyPlugin spyPlugin = spy(plugin); + String resourceName = MyPlugin.RESOURCE_NAME; + // Init an adapter for the plugin + DevicePluginAdapter adapter = new DevicePluginAdapter( + resourceName, + spyPlugin, dmm); + // Bootstrap, adding device + adapter.initialize(context); + adapter.createResourceHandler(context, + mockCGroupsHandler, mockPrivilegedExecutor); + adapter.getDeviceResourceHandler().bootstrap(conf); + Assert.assertEquals(3, + dmm.getAllAllowedDevices().get(resourceName).size()); + // mock NMStateStore + Device storedDevice = Device.Builder.newInstance() + .setId(0) + .setDevPath("/dev/hdwA0") + .setMajorNumber(256) + .setMinorNumber(0) + .setBusID("0000:80:00.0") + .setHealthy(true) + .build(); + ConcurrentHashMap runningContainersMap + = new ConcurrentHashMap<>(); + Container nmContainer = mock(Container.class); + ResourceMappings rmap = new ResourceMappings(); + ResourceMappings.AssignedResources ar = + new ResourceMappings.AssignedResources(); + ar.updateAssignedResources( + Arrays.asList(storedDevice)); + rmap.addAssignedResources(resourceName, ar); + when(nmContainer.getResourceMappings()).thenReturn(rmap); + when(context.getContainers()).thenReturn(runningContainersMap); + + // Test case 1. c0 get recovered. scheduler state restored + runningContainersMap.put(getContainerId(0), nmContainer); + adapter.getDeviceResourceHandler().reacquireContainer( + getContainerId(0)); + Assert.assertEquals(3, + dmm.getAllAllowedDevices().get(resourceName).size()); + Assert.assertEquals(1, + dmm.getAllUsedDevices().get(resourceName).size()); + Assert.assertEquals(2, + dmm.getAvailableDevices(resourceName)); + Map used = dmm.getAllUsedDevices().get(resourceName); + Assert.assertTrue(used.keySet().contains(storedDevice)); + + // Test case 2. c1 wants get recovered. + // But stored device is already allocated to c2 + nmContainer = mock(Container.class); + rmap = new ResourceMappings(); + ar = new ResourceMappings.AssignedResources(); + ar.updateAssignedResources( + Arrays.asList(storedDevice)); + rmap.addAssignedResources(resourceName, ar); + // already assigned to c1 + runningContainersMap.put(getContainerId(2), nmContainer); + boolean caughtException = false; + try { + adapter.getDeviceResourceHandler().reacquireContainer(getContainerId(1)); + } catch (ResourceHandlerException e) { + caughtException = true; + } + Assert.assertTrue( + "Should fail since requested device is assigned already", + caughtException); + // don't affect c0 allocation state + Assert.assertEquals(3, + dmm.getAllAllowedDevices().get(resourceName).size()); + Assert.assertEquals(1, + dmm.getAllUsedDevices().get(resourceName).size()); + Assert.assertEquals(2, + dmm.getAvailableDevices(resourceName)); + used = dmm.getAllUsedDevices().get(resourceName); + Assert.assertTrue(used.keySet().contains(storedDevice)); + } + + @Test + public void testAssignedDeviceCleanupWhenStoreOpFails() + throws IOException, YarnException { + NodeManager.NMContext context = mock(NodeManager.NMContext.class); + NMStateStoreService realStoreService = new NMMemoryStateStoreService(); + NMStateStoreService storeService = spy(realStoreService); + when(context.getNMStateStore()).thenReturn(storeService); + doThrow(new IOException("Exception ...")).when(storeService) + .storeAssignedResources(isA(Container.class), + isA(String.class), + isA(ArrayList.class)); + + // Init scheduler manager + DeviceMappingManager dmm = new DeviceMappingManager(context); + + ResourcePluginManager rpm = mock(ResourcePluginManager.class); + when(rpm.getDeviceMappingManager()).thenReturn(dmm); + + // Init an plugin + MyPlugin plugin = new MyPlugin(); + MyPlugin spyPlugin = spy(plugin); + String resourceName = MyPlugin.RESOURCE_NAME; + // Init an adapter for the plugin + DevicePluginAdapter adapter = new DevicePluginAdapter( + resourceName, + spyPlugin, dmm); + // Bootstrap, adding device + adapter.initialize(context); + adapter.createResourceHandler(context, + mockCGroupsHandler, mockPrivilegedExecutor); + adapter.getDeviceResourceHandler().bootstrap(conf); + + // A container c0 requests 1 device + Container c0 = mockContainerWithDeviceRequest(0, + resourceName, + 1,false); + // preStart + boolean exception = false; + try { + adapter.getDeviceResourceHandler().preStart(c0); + } catch (ResourceHandlerException e) { + exception = true; + } + Assert.assertTrue("Should throw exception in preStart", exception); + // no device assigned + Assert.assertEquals(3, + dmm.getAllAllowedDevices().get(resourceName).size()); + Assert.assertEquals(0, + dmm.getAllUsedDevices().get(resourceName).size()); + Assert.assertEquals(3, + dmm.getAvailableDevices(resourceName)); + + } + + private static Container mockContainerWithDeviceRequest(int id, + String resourceName, + int numDeviceRequest, + boolean dockerContainerEnabled) { + Container c = mock(Container.class); + when(c.getContainerId()).thenReturn(getContainerId(id)); + + Resource res = Resource.newInstance(1024, 1); + ResourceMappings resMapping = new ResourceMappings(); + + res.setResourceValue(resourceName, numDeviceRequest); + when(c.getResource()).thenReturn(res); + when(c.getResourceMappings()).thenReturn(resMapping); + + ContainerLaunchContext clc = mock(ContainerLaunchContext.class); + Map env = new HashMap<>(); + if (dockerContainerEnabled) { + env.put(ContainerRuntimeConstants.ENV_CONTAINER_TYPE, + ContainerRuntimeConstants.CONTAINER_RUNTIME_DOCKER); + } + when(clc.getEnvironment()).thenReturn(env); + when(c.getLaunchContext()).thenReturn(clc); + return c; + } + + private static ContainerId getContainerId(int id) { + return ContainerId.newContainerId(ApplicationAttemptId + .newInstance(ApplicationId.newInstance(1234L, 1), 1), id); + } + private class MyPlugin implements DevicePlugin { private final static String RESOURCE_NAME = "cmpA.com/hdwA"; @Override