diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java index 6488ebfc4ec..c3d41911363 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java @@ -1605,6 +1605,27 @@ public static boolean isAclEnabled(Configuration conf) { public static final String NM_RESOURCE_PLUGINS = NM_PREFIX + "resource-plugins"; + /** + * This setting controls if pluggable device plugin framework is enabled. + * */ + @Private + public static final String NM_RESOURCE_PLUGINS_ENABLE_PLUGGABLE_DEVICE_FRAMEWORK = + NM_RESOURCE_PLUGINS + ".pluggable-device-framework.enable"; + + /** + * The pluggable device plugin framework is disabled by default + * */ + @Private + public static final boolean DEFAULT_NM_RESOURCE_PLUGINS_ENABLE_PLUGGABLE_DEVICE_FRAMEWORK = false; + + /** + * This settings contains vendor plugin class names for device plugin framework to load. + * Split by comma + * */ + @Private + public static final String NM_RESOURCE_PLUGINS_PLUGGABLE_CLASS = + NM_RESOURCE_PLUGINS + ".pluggable-class"; + /** * Prefix for gpu configurations. Work in progress: This configuration * parameter may be changed/removed in the future. diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/api/deviceplugin/Device.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/api/deviceplugin/Device.java new file mode 100644 index 00000000000..295ee2fa701 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/api/deviceplugin/Device.java @@ -0,0 +1,184 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.server.nodemanager.api.deviceplugin; + +import java.io.Serializable; +import java.util.Objects; + +public class Device implements Serializable, Comparable { + + private static final long serialVersionUID = 1L; + + /** + * Required fields: + * ID: an plugin specified index number + * devPath: device file like /dev/devicename + * majorNumber: major device number + * minorNumber: minor device number + * busID: PCI Bus ID in format [[[[]:]]:][][.[]]. Can get from "lspci -D" + * isHealthy: true or false indicating device health status + * */ + private final Integer ID; + private final String devPath; + private final Integer majorNumber; + private final Integer minorNumber; + private final String busID; + private boolean isHealthy; + + /** + * Optional fields + * */ + private String status; + // TODO: topology and attributes + + private Device(Builder builder) { + this.ID = Objects.requireNonNull(builder.ID); + this.devPath = Objects.requireNonNull(builder.devPath); + this.majorNumber = Objects.requireNonNull(builder.majorNumber); + this.minorNumber = Objects.requireNonNull(builder.minorNumber); + this.busID = Objects.requireNonNull(builder.busID); + this.isHealthy = Objects.requireNonNull(builder.isHealthy); + } + + public Integer getID() { + return ID; + } + + public String getDevPath() { + return devPath; + } + + public Integer getMajorNumber() { + return majorNumber; + } + + public Integer getMinorNumber() { + return minorNumber; + } + + public String getBusID() { + return busID; + } + + public boolean isHealthy() { + return isHealthy; + } + + public String getStatus() { + return status; + } + + + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()){ + return false; + } + Device device = (Device) o; + return Objects.equals(ID, device.ID) && + Objects.equals(devPath, device.devPath) && + Objects.equals(majorNumber, device.majorNumber) && + Objects.equals(minorNumber, device.minorNumber) && + Objects.equals(busID, device.busID); + } + + @Override + public int hashCode() { + return Objects.hash(ID, devPath, majorNumber, minorNumber, busID); + } + + @Override + public int compareTo(Object o) { + if (o == null || (!(o instanceof Device))) { + return -1; + } + + Device other = (Device) o; + + int result = Integer.compare(ID, other.getID()); + if (0 != result) { + return result; + } + return Integer.compare(minorNumber, other.minorNumber); + } + + @Override + public String toString() { + return "(" + getDevPath() + ", " + getID() + ", " + getMajorNumber() + ":" + getMinorNumber() + ")"; + } + + public static class Builder { + private Integer ID; + private String devPath; + private Integer majorNumber; + private Integer minorNumber; + private String busID; + private boolean isHealthy; + private String status; + + private Builder() {} + + public static Builder newInstance() { + return new Builder(); + } + + public Device build(){ + return new Device(this); + } + + public Builder setID(Integer ID) { + this.ID = ID; + return this; + } + + public Builder setDevPath(String devPath) { + this.devPath = devPath; + return this; + } + + public Builder setMajorNumber(Integer majorNumber) { + this.majorNumber = majorNumber; + return this; + } + + public Builder setMinorNumber(Integer minorNumber) { + this.minorNumber = minorNumber; + return this; + } + + public Builder setBusID(String busID) { + this.busID = busID; + return this; + } + + public Builder setHealthy(boolean healthy) { + isHealthy = healthy; + return this; + } + + public Builder setStatus(String status) { + this.status = status; + return this; + } + + } +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/api/deviceplugin/DeviceConstants.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/api/deviceplugin/DeviceConstants.java new file mode 100644 index 00000000000..82d8343cca0 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/api/deviceplugin/DeviceConstants.java @@ -0,0 +1,26 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.server.nodemanager.api.deviceplugin; + +public class DeviceConstants { + /** + * Follows the "majorVersion.minorVersion.patchVersion" convention + * */ + public static final String version = "0.1.0"; +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/api/deviceplugin/DevicePlugin.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/api/deviceplugin/DevicePlugin.java new file mode 100644 index 00000000000..26c3c4699ad --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/api/deviceplugin/DevicePlugin.java @@ -0,0 +1,48 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.server.nodemanager.api.deviceplugin; + +import java.util.Set; + +/** + * Interface for vendor plugin to implement. + * */ +public interface DevicePlugin { + /** + * Called first when device plugin framework wants to register + * @return DeviceRegisterRequest {@link DeviceRegisterRequest} + * */ + DeviceRegisterRequest register(); + + /** + * Called when update node resource + * @return a set of {@link Device}, {@link java.util.TreeSet} recommended + * */ + Set getAndWatch(); + + /** + * Called before container launch. + * */ + DeviceRuntimeSpec preLaunchContainer(Set allocatedDevices); + + /** + * Called after container finish. + * */ + void postCompleteContainer(Set allocatedDevices); +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/api/deviceplugin/DeviceRegisterRequest.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/api/deviceplugin/DeviceRegisterRequest.java new file mode 100644 index 00000000000..fcd10dd7147 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/api/deviceplugin/DeviceRegisterRequest.java @@ -0,0 +1,71 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.server.nodemanager.api.deviceplugin; + +import java.util.Objects; + +/** + * A vendor device plugin use this object to register + * to NM device plugin framework + * */ +public class DeviceRegisterRequest { + + private final String version; + private final String resourceName; + + public DeviceRegisterRequest(Builder builder) { + this.version = Objects.requireNonNull(builder.version); + this.resourceName = Objects.requireNonNull(builder.resourceName); + } + + public String getResourceName() { + return resourceName; + } + + public String getVersion() { + return version; + } + + public static class Builder { + private String version; + private String resourceName; + + private Builder() {} + + public static Builder newInstance() { + return new Builder(); + } + + public DeviceRegisterRequest build() { + return new DeviceRegisterRequest(this); + } + + // TODO: add sematic versioning pattern check + public Builder setVersion(String version) { + this.version = version; + return this; + } + + public Builder setResourceName(String resourceName) { + this.resourceName = resourceName; + return this; + } + + } +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/api/deviceplugin/DeviceRuntimeSpec.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/api/deviceplugin/DeviceRuntimeSpec.java new file mode 100644 index 00000000000..dc2fdf19e3e --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/api/deviceplugin/DeviceRuntimeSpec.java @@ -0,0 +1,22 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.server.nodemanager.api.deviceplugin; + +public class DeviceRuntimeSpec { +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/resourceplugin/ResourcePluginManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/resourceplugin/ResourcePluginManager.java index f28aad206a6..0a3441656b9 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/resourceplugin/ResourcePluginManager.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/resourceplugin/ResourcePluginManager.java @@ -21,11 +21,20 @@ import com.google.common.collect.ImmutableSet; import org.apache.commons.lang3.StringUtils; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.util.ReflectionUtils; +import org.apache.hadoop.yarn.api.records.ResourceInformation; import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.exceptions.YarnException; +import org.apache.hadoop.yarn.exceptions.YarnRuntimeException; import org.apache.hadoop.yarn.server.nodemanager.Context; +import org.apache.hadoop.yarn.server.nodemanager.api.deviceplugin.DeviceConstants; +import org.apache.hadoop.yarn.server.nodemanager.api.deviceplugin.DevicePlugin; +import org.apache.hadoop.yarn.server.nodemanager.api.deviceplugin.DeviceRegisterRequest; +import org.apache.hadoop.yarn.server.nodemanager.containermanager.resourceplugin.deviceframework.DeviceLocalScheduler; +import org.apache.hadoop.yarn.server.nodemanager.containermanager.resourceplugin.deviceframework.DevicePluginAdapter; import org.apache.hadoop.yarn.server.nodemanager.containermanager.resourceplugin.fpga.FpgaResourcePlugin; import org.apache.hadoop.yarn.server.nodemanager.containermanager.resourceplugin.gpu.GpuResourcePlugin; +import org.apache.hadoop.yarn.util.resource.ResourceUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -49,14 +58,15 @@ private Map configuredPlugins = Collections.emptyMap(); + private DeviceLocalScheduler deviceLocalScheduler = null; + public synchronized void initialize(Context context) throws YarnException { Configuration conf = context.getConf(); - String[] plugins = conf.getStrings(YarnConfiguration.NM_RESOURCE_PLUGINS); + Map pluginMap = new HashMap<>(); + String[] plugins = conf.getStrings(YarnConfiguration.NM_RESOURCE_PLUGINS); if (plugins != null) { - Map pluginMap = new HashMap<>(); - // Initialize each plugins for (String resourceName : plugins) { resourceName = resourceName.trim(); @@ -92,9 +102,111 @@ public synchronized void initialize(Context context) plugin.initialize(context); pluginMap.put(resourceName, plugin); } + } + // Try to load pluggable device plugins + initializePluggableDevicePlugins(context, conf, pluginMap); + configuredPlugins = Collections.unmodifiableMap(pluginMap); + } - configuredPlugins = Collections.unmodifiableMap(pluginMap); + public void initializePluggableDevicePlugins(Context context, + Configuration configuration, + Map pluginMap) { + boolean enable = configuration.getBoolean( + YarnConfiguration.NM_RESOURCE_PLUGINS_ENABLE_PLUGGABLE_DEVICE_FRAMEWORK, + YarnConfiguration.DEFAULT_NM_RESOURCE_PLUGINS_ENABLE_PLUGGABLE_DEVICE_FRAMEWORK); + if (!enable) { + LOG.info("The pluggable device framework is not enabled. If you want, set true to " + + YarnConfiguration.NM_RESOURCE_PLUGINS_ENABLE_PLUGGABLE_DEVICE_FRAMEWORK); + return; } + LOG.info("The pluggable device framework enabled, trying to load the vendor plugins"); + deviceLocalScheduler = new DeviceLocalScheduler(context); + String[] pluginClassNames = configuration.getStrings( + YarnConfiguration.NM_RESOURCE_PLUGINS_PLUGGABLE_CLASS); + if (null == pluginClassNames) { + throw new YarnRuntimeException("Null value found in configuration: " + + YarnConfiguration.NM_RESOURCE_PLUGINS_PLUGGABLE_CLASS); + } + for (String pluginClassName : pluginClassNames) { + try { + Class pluginClazz = Class.forName(pluginClassName); + if (!DevicePlugin.class.isAssignableFrom(pluginClazz)) { + throw new YarnRuntimeException("Class: " + pluginClassName + + " not instance of " + DevicePlugin.class.getCanonicalName()); + } + DevicePlugin dpInstance = (DevicePlugin) ReflectionUtils.newInstance(pluginClazz, + configuration); + // Try to register plugin + // TODO: handle the plugin method timeout issue + DeviceRegisterRequest request = dpInstance.register(); + // Check version for compatibility + String pluginVersion = request.getVersion(); + if (!isVersionCompatible(pluginVersion)) { + LOG.error("Class: " + pluginClassName + " version: " + pluginVersion + + " is not compatible. Expected: " + DeviceConstants.version); + } + String resourceName = request.getResourceName(); + // check if someone has already registered this resource type name + if (pluginMap.containsKey(resourceName)) { + throw new YarnRuntimeException(resourceName + + " has already been registered! Please change a resource type name"); + } + // check resource name is valid and configured in resource-types.xml + if (!isValidAndConfiguredResourceName(resourceName)) { + throw new YarnRuntimeException(resourceName + + " is not configured inside " + + YarnConfiguration.RESOURCE_TYPES_CONFIGURATION_FILE + + " , please configure it first"); + } + LOG.info("New resource type: " + resourceName + + " registered successfully by " + pluginClassName); + DevicePluginAdapter pluginAdapter = new DevicePluginAdapter(this, + resourceName, dpInstance); + LOG.info("Adapter of " + pluginClassName + " created. Initializing.."); + try { + pluginAdapter.initialize(context); + } catch (YarnException e) { + throw new YarnRuntimeException("Adapter of " + pluginClassName + " init failed!"); + } + LOG.info("Adapter of " + pluginClassName + " init success!"); + // Store plugin as adapter instance + pluginMap.put(request.getResourceName(), pluginAdapter); + } catch (Exception e) { + throw new YarnRuntimeException("Could not instantiate pluggable vendor plugin: " + + pluginClassName, e); + } + } // end for + } + + // TODO: check resource name matching pattern + private boolean isValidAndConfiguredResourceName(String resourceName) { + // check pattern match + // check configured + Map configuredResourceTypes = + ResourceUtils.getResourceTypes(); + if (!configuredResourceTypes.containsKey(resourceName)) { + return false; + } + return true; + } + + private boolean isVersionCompatible(String pluginVersion) { + // semantic version + String[] svs = pluginVersion.split("\\."); + String[] currentsvs = DeviceConstants.version.split("\\."); + // should be same major version + if (Integer.valueOf(svs[0]) != Integer.valueOf(currentsvs[0])) { + return false; + } + // should be older minor version + if (Integer.valueOf(svs[1]) > Integer.valueOf(currentsvs[1])) { + return false; + } + return true; + } + + public DeviceLocalScheduler getDeviceLocalScheduler() { + return deviceLocalScheduler; } public synchronized void cleanup() throws YarnException { diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/resourceplugin/deviceframework/DeviceLocalScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/resourceplugin/deviceframework/DeviceLocalScheduler.java new file mode 100644 index 00000000000..3cc29f7dcb6 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/resourceplugin/deviceframework/DeviceLocalScheduler.java @@ -0,0 +1,263 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.server.nodemanager.containermanager.resourceplugin.deviceframework; + +import com.google.common.collect.ImmutableSet; +import com.google.common.collect.Sets; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.util.StringUtils; +import org.apache.hadoop.yarn.api.records.ContainerId; +import org.apache.hadoop.yarn.api.records.Resource; +import org.apache.hadoop.yarn.exceptions.ResourceNotFoundException; +import org.apache.hadoop.yarn.server.nodemanager.Context; +import org.apache.hadoop.yarn.server.nodemanager.api.deviceplugin.Device; +import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container; +import org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.resources.ResourceHandlerException; + +import java.io.IOException; +import java.io.Serializable; +import java.util.*; + +/** + * Schedule device resource based on requirements. It holds all device type resource + * */ +public class DeviceLocalScheduler { + final static Log LOG = LogFactory.getLog(DeviceLocalScheduler.class); + + private Context nmContext; + private static final int WAIT_MS_PER_LOOP = 1000; + + + /** + * Hold all type of devices + * key is the device resource name + * value is a sorted set of {@link Device} + * */ + private Map> allAllowedDevices = new HashMap<>(); + + /** + * Hold used devices + * key is the device resource name + * value is a sorted map of {@link Device} and {@link ContainerId} pairs + * */ + private Map> allUsedDevices = new HashMap<>(); + + public DeviceLocalScheduler(Context context) { + nmContext = context; + } + + public synchronized void addDeviceSet(String resourceName, Set deviceSet) { + LOG.info("Adding new resource: " + "type:" + resourceName + "," + deviceSet); + allAllowedDevices.put(resourceName, deviceSet); + allUsedDevices.put(resourceName, new TreeMap<>()); + } + + public synchronized DeviceAllocation assignDevices(String resourceName, Container container) + throws ResourceHandlerException { + DeviceAllocation allocation = internalAssignDevices(resourceName, container); + // Wait for a maximum of 120 seconds if no available Devices are there which + // are yet to be released. + final int timeoutMsecs = 120 * WAIT_MS_PER_LOOP; + int timeWaiting = 0; + while (allocation == null) { + if (timeWaiting >= timeoutMsecs) { + break; + } + + // Sleep for 1 sec to ensure there are some free devices which are + // getting released. + try { + LOG.info("Container : " + container.getContainerId() + + " is waiting for free " + resourceName + " devices."); + Thread.sleep(WAIT_MS_PER_LOOP); + timeWaiting += WAIT_MS_PER_LOOP; + allocation = internalAssignDevices(resourceName, container); + } catch (InterruptedException e) { + // On any interrupt, break the loop and continue execution. + break; + } + } + + if(allocation == null) { + String message = "Could not get valid " + resourceName + " device for container '" + + container.getContainerId() + + "' as some other containers might not releasing them."; + LOG.warn(message); + throw new ResourceHandlerException(message); + } + return allocation; + } + + public synchronized DeviceAllocation internalAssignDevices(String resourceName, Container container) + throws ResourceHandlerException { + Resource requestedResource = container.getResource(); + ContainerId containerId = container.getContainerId(); + int requestedDeviceCount = getRequestedDeviceCount(resourceName, requestedResource); + // Assign devices to container if requested some. + if (requestedDeviceCount > 0) { + if (requestedDeviceCount > getAvailableDevices(resourceName)) { + // If there are some devices which are getting released, wait for few + // seconds to get it. + if (requestedDeviceCount <= getReleasingDevices(resourceName) + + getAvailableDevices(resourceName)) { + return null; + } + } + + int availableDeviceCount = getAvailableDevices(resourceName); + if (requestedDeviceCount > availableDeviceCount) { + throw new ResourceHandlerException( + "Failed to find enough " + resourceName + ", requestor=" + containerId + + ", #Requested=" + requestedDeviceCount + ", #available=" + + availableDeviceCount); + } + + Set assignedDevices = new TreeSet<>(); + Map usedDevices = allUsedDevices.get(resourceName); + Set allowedDevices = allAllowedDevices.get(resourceName); + for (Device device : allowedDevices) { + if (!usedDevices.containsKey(device)) { + usedDevices.put(device, containerId); + assignedDevices.add(device); + if (assignedDevices.size() == requestedDeviceCount) { + break; + } + } + } + + // Record in state store if we allocated anything + if (!assignedDevices.isEmpty()) { + try { + // Update state store. + nmContext.getNMStateStore().storeAssignedResources(container, resourceName, + new ArrayList<>(assignedDevices)); + } catch (IOException e) { + cleanupAssignedDevices(resourceName, containerId); + throw new ResourceHandlerException(e); + } + } + + return new DeviceAllocation(resourceName, assignedDevices, + Sets.difference(allowedDevices, assignedDevices)); + } + return new DeviceAllocation(resourceName, null, + allAllowedDevices.get(resourceName)); + } + + public synchronized void recoverAssignedDevices(String resourceName, ContainerId containerId) + throws ResourceHandlerException{ + Container c = nmContext.getContainers().get(containerId); + Map usedDevices = allUsedDevices.get(resourceName); + Set allowedDevices = allAllowedDevices.get(resourceName); + if (null == c) { + throw new ResourceHandlerException( + "This shouldn't happen, cannot find container with id=" + + containerId); + } + + for (Serializable deviceSerializable : c.getResourceMappings() + .getAssignedResources(resourceName)) { + if (!(deviceSerializable instanceof Device)) { + throw new ResourceHandlerException( + "Trying to recover device id, however it" + + " is not Device instance, this shouldn't happen"); + } + + Device device = (Device) deviceSerializable; + + // Make sure it is in allowed device. + if (!allowedDevices.contains(device)) { + throw new ResourceHandlerException( + "Try to recover device = " + device + + " however it is not in allowed device list:" + StringUtils + .join(",", allowedDevices)); + } + + // Make sure it is not occupied by anybody else + if (usedDevices.containsKey(device)) { + throw new ResourceHandlerException( + "Try to recover device id = " + device + + " however it is already assigned to container=" + usedDevices + .get(device) + ", please double check what happened."); + } + + usedDevices.put(device, containerId); + } + } + + public synchronized void cleanupAssignedDevices(String resourceName, ContainerId containerId) { + Iterator> iter = + allUsedDevices.get(resourceName).entrySet().iterator(); + while (iter.hasNext()) { + if (iter.next().getValue().equals(containerId)) { + iter.remove(); + } + } + } + + public synchronized int getRequestedDeviceCount(String resourceName, Resource requestedResource) { + try { + return Long.valueOf(requestedResource.getResourceValue( + resourceName)).intValue(); + } catch (ResourceNotFoundException e) { + return 0; + } + } + + public synchronized int getAvailableDevices(String resourceName) { + return allAllowedDevices.get(resourceName).size() - + allUsedDevices.get(resourceName).size(); + } + + private synchronized long getReleasingDevices(String resourceName) { + long releasingDevices = 0; + Map used = allUsedDevices.get(resourceName); + Iterator> iter = used.entrySet() + .iterator(); + while (iter.hasNext()) { + ContainerId containerId = iter.next().getValue(); + Container container; + if ((container = nmContext.getContainers().get(containerId)) != null) { + if (container.isContainerInFinalStates()) { + releasingDevices = releasingDevices + container.getResource() + .getResourceInformation(resourceName).getValue(); + } + } + } + return releasingDevices; + } + + static class DeviceAllocation { + private String resourceName; + private Set allowed = Collections.emptySet(); + private Set denied = Collections.emptySet(); + + DeviceAllocation(String resourceName, Set allowed, Setdenied) { + this.resourceName = resourceName; + if (allowed != null) { + this.allowed = ImmutableSet.copyOf(allowed); + } + if (denied != null) { + this.denied = ImmutableSet.copyOf(denied); + } + } + } + +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/resourceplugin/deviceframework/DevicePluginAdapter.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/resourceplugin/deviceframework/DevicePluginAdapter.java new file mode 100644 index 00000000000..16e7977f221 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/resourceplugin/deviceframework/DevicePluginAdapter.java @@ -0,0 +1,204 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.server.nodemanager.containermanager.resourceplugin.deviceframework; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.yarn.api.records.ContainerId; +import org.apache.hadoop.yarn.api.records.Resource; +import org.apache.hadoop.yarn.exceptions.YarnException; +import org.apache.hadoop.yarn.server.nodemanager.Context; +import org.apache.hadoop.yarn.server.nodemanager.api.deviceplugin.Device; +import org.apache.hadoop.yarn.server.nodemanager.api.deviceplugin.DevicePlugin; +import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container; +import org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.privileged.PrivilegedOperation; +import org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.privileged.PrivilegedOperationExecutor; +import org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.resources.CGroupsHandler; +import org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.resources.ResourceHandler; +import org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.resources.ResourceHandlerException; +import org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.runtime.docker.DockerRunCommand; +import org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.runtime.docker.DockerVolumeCommand; +import org.apache.hadoop.yarn.server.nodemanager.containermanager.resourceplugin.DockerCommandPlugin; +import org.apache.hadoop.yarn.server.nodemanager.containermanager.resourceplugin.NodeResourceUpdaterPlugin; +import org.apache.hadoop.yarn.server.nodemanager.containermanager.resourceplugin.ResourcePlugin; +import org.apache.hadoop.yarn.server.nodemanager.containermanager.resourceplugin.ResourcePluginManager; +import org.apache.hadoop.yarn.server.nodemanager.containermanager.runtime.ContainerExecutionException; +import org.apache.hadoop.yarn.server.nodemanager.webapp.dao.NMResourceInfo; + +import java.util.List; +import java.util.Set; + + +/** + * The {@link DevicePluginAdapter} will adapt existing hooks into vendor plugin's logic. + * It decouples the vendor plugin from YARN's device framework + * + * */ +public class DevicePluginAdapter extends NodeResourceUpdaterPlugin + implements ResourcePlugin, DockerCommandPlugin, ResourceHandler{ + final static Log LOG = LogFactory.getLog(DevicePluginAdapter.class); + + private ResourcePluginManager devicePluginManager; + private String resourceName; + private DevicePlugin devicePlugin; + private DeviceLocalScheduler deviceLocalScheduler; + private CGroupsHandler cGroupsHandler; + private PrivilegedOperationExecutor privilegedOperationExecutor; + + public DevicePluginAdapter(ResourcePluginManager pluginManager, String name, DevicePlugin dp) { + devicePluginManager = pluginManager; + deviceLocalScheduler = pluginManager.getDeviceLocalScheduler(); + resourceName = name; + devicePlugin = dp; + } + + /** + * Act as a {@link NodeResourceUpdaterPlugin} to update the {@link Resource} + * + * */ + @Override + public void updateConfiguredResource(Resource res) throws YarnException { + LOG.info(resourceName + " plugin update resource "); + Set devices = devicePlugin.getAndWatch(); + if (devices == null) { + LOG.warn(resourceName + " plugin failed to discover resource ( null value got)." ); + return; + } + res.setResourceValue(resourceName, devices.size()); + } + + /** + * Act as a {@link ResourcePlugin} + * */ + @Override + public void initialize(Context context) throws YarnException { + LOG.info(resourceName + " plugin adapter initialized"); + return; + } + + @Override + public ResourceHandler createResourceHandler(Context nmContext, CGroupsHandler cGroupsHandler, + PrivilegedOperationExecutor privilegedOperationExecutor) { + this.cGroupsHandler = cGroupsHandler; + this.privilegedOperationExecutor = privilegedOperationExecutor; + return this; + } + + @Override + public NodeResourceUpdaterPlugin getNodeResourceHandlerInstance() { + return this; + } + + @Override + public void cleanup() throws YarnException { + + } + + @Override + public DockerCommandPlugin getDockerCommandPluginInstance() { + return this; + } + + @Override + public NMResourceInfo getNMResourceInfo() throws YarnException { + return null; + } + + /** + * Act as a {@link DockerCommandPlugin} to hook the + * {@link org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.runtime.DockerLinuxContainerRuntime} + * */ + @Override + public void updateDockerRunCommand(DockerRunCommand dockerRunCommand, Container container) + throws ContainerExecutionException { + + } + + @Override + public DockerVolumeCommand getCreateDockerVolumeCommand(Container container) + throws ContainerExecutionException { + return null; + } + + @Override + public DockerVolumeCommand getCleanupDockerVolumesCommand(Container container) + throws ContainerExecutionException { + return null; + } + + /** + * Act as a {@link ResourceHandler} + * */ + @Override + public List bootstrap(Configuration configuration) throws ResourceHandlerException { + Set availableDevices = devicePlugin.getAndWatch(); + + /** + * We won't fail the NM if plugin returns invalid value here. + * // TODO: we should update RM's resource count if something wrong + * */ + if (availableDevices == null) { + LOG.error("Bootstrap " + resourceName + " failed. Null value got from plugin's getAndWatch method"); + return null; + } + // Add device set. Here we trust the plugin's return value + deviceLocalScheduler.addDeviceSet(resourceName, availableDevices); + // TODO: Init cgroups + + return null; + } + + @Override + public List preStart(Container container) + throws ResourceHandlerException { + String containerIdStr = container.getContainerId().toString(); + DeviceLocalScheduler.DeviceAllocation allocation = deviceLocalScheduler.assignDevices( + resourceName, container); + /** + * TODO: implement a general container-executor device module to do isolation + * */ + return null; + } + + @Override + public List reacquireContainer(ContainerId containerId) + throws ResourceHandlerException { + deviceLocalScheduler.recoverAssignedDevices(resourceName, containerId); + return null; + } + + @Override + public List updateContainer(Container container) + throws ResourceHandlerException { + return null; + } + + @Override + public List postComplete(ContainerId containerId) + throws ResourceHandlerException { + deviceLocalScheduler.cleanupAssignedDevices(resourceName, containerId); + return null; + } + + @Override + public List teardown() throws ResourceHandlerException { + return null; + } +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/resourceplugin/deviceframework/examples/FakeDevicePlugin.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/resourceplugin/deviceframework/examples/FakeDevicePlugin.java new file mode 100644 index 00000000000..64a18939323 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/resourceplugin/deviceframework/examples/FakeDevicePlugin.java @@ -0,0 +1,56 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.server.nodemanager.containermanager.resourceplugin.deviceframework.examples; + +import org.apache.hadoop.yarn.server.nodemanager.api.deviceplugin.*; + +import java.util.Set; +import java.util.TreeSet; + +public class FakeDevicePlugin implements DevicePlugin { + @Override + public DeviceRegisterRequest register() { + return DeviceRegisterRequest.Builder.newInstance() + .setVersion(DeviceConstants.version).setResourceName("cmp.com/cmp").build(); + } + + @Override + public Set getAndWatch() { + TreeSet r = new TreeSet<>(); + r.add(Device.Builder.newInstance() + .setID(0) + .setDevPath("/dev/cmp0") + .setMajorNumber(243) + .setMinorNumber(0) + .setBusID("0000:65:00.0") + .setHealthy(true) + .build()); + return r; + } + + @Override + public DeviceRuntimeSpec preLaunchContainer(Set allocatedDevices) { + return null; + } + + @Override + public void postCompleteContainer(Set allocatedDevices) { + + } +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/resourceplugin/TestResourcePluginManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/resourceplugin/TestResourcePluginManager.java index 6ed7c568899..3e5dcfa9037 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/resourceplugin/TestResourcePluginManager.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/resourceplugin/TestResourcePluginManager.java @@ -264,4 +264,30 @@ protected ContainerExecutor createContainerExecutor(Configuration conf) { } Assert.assertTrue("New ResourceHandler should be added", newHandlerAdded); } + + @Test(timeout = 30000) + public void testPluggableDeviceFrameworkEnabled() { + + } + + @Test(timeout = 30000) + public void testPluggableDeviceFrameworkDisabled() { + + } + + @Test(timeout = 30000) + public void testDuplicatedDevicePluginRegistration() { + + } + + @Test(timeout = 30000) + public void testPluginVersionCompatible() { + + } + + @Test(timeout = 30000) + public void testPluginRegistrationNameValid() { + + } + } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/resourceplugin/deviceframework/TestDevicePluginAdapter.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/resourceplugin/deviceframework/TestDevicePluginAdapter.java new file mode 100644 index 00000000000..f9c32d9832e --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/resourceplugin/deviceframework/TestDevicePluginAdapter.java @@ -0,0 +1,44 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.server.nodemanager.containermanager.resourceplugin.deviceframework; + +import org.junit.Test; + +public class TestDevicePluginAdapter { + + @Test + public void testBootstrap() { + + } + + @Test + public void testAllocation() { + + } + + @Test + public void testRecoverAllocation() { + + } + + @Test + public void testStoreAllocation() { + + } +}