volumeClaims;
+
+ private Builder() {
+ runtime = DeviceRuntimeSpec.RUNTIME_DOCKER;
+ envs = new HashMap<>();
+ volumeClaims = new TreeSet<>();
+ deviceMounts = new TreeSet<>();
+ volumeMounts = new TreeSet<>();
+ }
+
+ public static Builder newInstance() {
+ return new Builder();
+ }
+
+ public DeviceRuntimeSpec build() {
+ return new DeviceRuntimeSpec(this);
+ }
+
+ public Builder setRuntime(String runtime) {
+ this.runtime = runtime;
+ return this;
+ }
+
+ public Builder addVolumeSpec(VolumeSpec spec) {
+ this.volumeClaims.add(spec);
+ return this;
+ }
+
+ public Builder addMountVolumeSpec(MountVolumeSpec spec) {
+ this.volumeMounts.add(spec);
+ return this;
+ }
+
+ public Builder addMountDeviceSpec(MountDeviceSpec spec) {
+ this.deviceMounts.add(spec);
+ return this;
+ }
+
+ public Builder addEnv(String key, String value) {
+ this.envs.put(Objects.requireNonNull(key),
+ Objects.requireNonNull(value));
+ return this;
+ }
+
+ }
+
+}
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/api/deviceplugin/MountDeviceSpec.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/api/deviceplugin/MountDeviceSpec.java
new file mode 100644
index 00000000000..c4a616e78c2
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/api/deviceplugin/MountDeviceSpec.java
@@ -0,0 +1,103 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.nodemanager.api.deviceplugin;
+
+import java.io.Serializable;
+import java.util.Objects;
+
+public class MountDeviceSpec implements Serializable, Comparable {
+
+ private final String devicePathInHost;
+ private final String devicePathInContainer;
+
+ // r for only read, rw can do read and write
+ private final String devicePermission;
+
+ public final static String RO = "r";
+ public final static String RW = "rw";
+
+ private MountDeviceSpec(Builder builder) {
+ this.devicePathInContainer = builder.devicePathInContainer;
+ this.devicePathInHost = builder.devicePathInHost;
+ this.devicePermission = builder.devicePermission;
+ }
+
+ public String getDevicePathInHost() {
+ return devicePathInHost;
+ }
+
+ public String getDevicePathInContainer() {
+ return devicePathInContainer;
+ }
+
+ public String getDevicePermission() {
+ return devicePermission;
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) {
+ return true;
+ }
+ if (o == null || getClass() != o.getClass()){
+ return false;
+ }
+ MountDeviceSpec other = (MountDeviceSpec) o;
+ return Objects.equals(devicePathInHost, other.devicePathInHost) &&
+ Objects.equals(devicePathInContainer, other.devicePathInContainer) &&
+ Objects.equals(devicePermission, other.devicePermission);
+ }
+
+ @Override
+ public int compareTo(Object o) {
+ return 0;
+ }
+
+ public static class Builder {
+ private String devicePathInHost;
+ private String devicePathInContainer;
+ private String devicePermission;
+
+ private Builder() {}
+
+ public static Builder newInstance() {
+ return new Builder();
+ }
+
+ public MountDeviceSpec build() {
+ return new MountDeviceSpec(this);
+ }
+
+ public Builder setDevicePermission(String permission) {
+ this.devicePermission = permission;
+ return this;
+ }
+
+ public Builder setDevicePathInContainer(String devicePathInContainer) {
+ this.devicePathInContainer = devicePathInContainer;
+ return this;
+ }
+
+ public Builder setDevicePathInHost(String devicePathInHost) {
+ this.devicePathInHost = devicePathInHost;
+ return this;
+ }
+ }
+
+}
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/api/deviceplugin/MountVolumeSpec.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/api/deviceplugin/MountVolumeSpec.java
new file mode 100644
index 00000000000..4612a2115aa
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/api/deviceplugin/MountVolumeSpec.java
@@ -0,0 +1,107 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.nodemanager.api.deviceplugin;
+
+import java.io.Serializable;
+import java.util.Objects;
+
+public class MountVolumeSpec implements Serializable, Comparable{
+
+ private static final long serialVersionUID = 1L;
+
+ // host path or volume name
+ private final String hostPath;
+
+ // path in the container
+ private final String mountPath;
+
+ // if true, data in mountPath can only be read
+ // "-v hostPath:mountPath:ro"
+ private final Boolean isReadOnly;
+
+ public final static String READONLYOPTION = "ro";
+
+ private MountVolumeSpec (Builder builder) {
+ this.hostPath = builder.hostPath;
+ this.mountPath = builder.mountPath;
+ this.isReadOnly = builder.isReadOnly;
+ }
+
+ public String getHostPath() {
+ return hostPath;
+ }
+
+ public String getMountPath() {
+ return mountPath;
+ }
+
+ public Boolean getReadOnly() {
+ return isReadOnly;
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) {
+ return true;
+ }
+ if (o == null || getClass() != o.getClass()){
+ return false;
+ }
+ MountVolumeSpec other = (MountVolumeSpec) o;
+ return Objects.equals(hostPath, other.hostPath) &&
+ Objects.equals(mountPath, other.mountPath) &&
+ Objects.equals(isReadOnly, other.isReadOnly);
+ }
+
+ @Override
+ public int compareTo(Object o) {
+ return 0;
+ }
+
+ public static class Builder {
+ private String hostPath;
+ private String mountPath;
+ private Boolean isReadOnly;
+
+ private Builder() {}
+
+ public static Builder newInstance() {
+ return new Builder();
+ }
+
+ public MountVolumeSpec build() {
+ return new MountVolumeSpec(this);
+ }
+
+ public Builder setHostPath(String hostPath) {
+ this.hostPath = hostPath;
+ return this;
+ }
+
+ public Builder setMountPath(String mountPath) {
+ this.mountPath = mountPath;
+ return this;
+ }
+
+ public Builder setReadOnly(Boolean readOnly) {
+ isReadOnly = readOnly;
+ return this;
+ }
+ }
+}
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/api/deviceplugin/VolumeSpec.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/api/deviceplugin/VolumeSpec.java
new file mode 100644
index 00000000000..e221326a731
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/api/deviceplugin/VolumeSpec.java
@@ -0,0 +1,103 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.nodemanager.api.deviceplugin;
+
+import java.io.Serializable;
+import java.util.Objects;
+
+public class VolumeSpec implements Serializable, Comparable {
+
+ private static final long serialVersionUID = 1L;
+
+ private final String volumeDriver;
+ private final String volumeName;
+ private final String volumeOperation;
+
+ public final static String CREATE = "create";
+ public final static String DELETE = "delete";
+
+ private VolumeSpec(Builder builder) {
+ this.volumeDriver = builder.volumeDriver;
+ this.volumeName = builder.volumeName;
+ this.volumeOperation = builder.volumeOperation;
+ }
+
+ public String getVolumeDriver() {
+ return volumeDriver;
+ }
+
+ public String getVolumeName() {
+ return volumeName;
+ }
+
+ public String getVolumeOperation() {
+ return volumeOperation;
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) {
+ return true;
+ }
+ if (o == null || getClass() != o.getClass()){
+ return false;
+ }
+ VolumeSpec other = (VolumeSpec) o;
+ return Objects.equals(volumeDriver, other.volumeDriver) &&
+ Objects.equals(volumeName, other.volumeName) &&
+ Objects.equals(volumeOperation, other.volumeOperation);
+ }
+
+ @Override
+ public int compareTo(Object o) {
+ return 0;
+ }
+
+ public static class Builder {
+ private String volumeDriver;
+ private String volumeName;
+ private String volumeOperation;
+
+ private Builder(){}
+
+ public static Builder newInstance () {
+ return new Builder();
+ }
+
+ public VolumeSpec build() {
+ return new VolumeSpec(this);
+ }
+
+ public Builder setVolumeDriver(String volumeDriver) {
+ this.volumeDriver = volumeDriver;
+ return this;
+ }
+
+ public Builder setVolumeName(String volumeName) {
+ this.volumeName = volumeName;
+ return this;
+ }
+
+ public Builder setVolumeOperation(String volumeOperation) {
+ this.volumeOperation = volumeOperation;
+ return this;
+ }
+
+ }
+}
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/resourceplugin/ResourcePluginManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/resourceplugin/ResourcePluginManager.java
index f28aad206a6..de692918635 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/resourceplugin/ResourcePluginManager.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/resourceplugin/ResourcePluginManager.java
@@ -18,14 +18,24 @@
package org.apache.hadoop.yarn.server.nodemanager.containermanager.resourceplugin;
+import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.ImmutableSet;
import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.util.ReflectionUtils;
+import org.apache.hadoop.yarn.api.records.ResourceInformation;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.exceptions.YarnException;
+import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
import org.apache.hadoop.yarn.server.nodemanager.Context;
+import org.apache.hadoop.yarn.server.nodemanager.api.deviceplugin.DevicePlugin;
+import org.apache.hadoop.yarn.server.nodemanager.api.deviceplugin.DevicePluginScheduler;
+import org.apache.hadoop.yarn.server.nodemanager.api.deviceplugin.DeviceRegisterRequest;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.resourceplugin.deviceframework.DeviceSchedulerManager;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.resourceplugin.deviceframework.DevicePluginAdapter;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.resourceplugin.fpga.FpgaResourcePlugin;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.resourceplugin.gpu.GpuResourcePlugin;
+import org.apache.hadoop.yarn.util.resource.ResourceUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -49,14 +59,15 @@
private Map configuredPlugins =
Collections.emptyMap();
+ private DeviceSchedulerManager deviceSchedulerManager = null;
+
public synchronized void initialize(Context context)
- throws YarnException {
+ throws YarnException, ClassNotFoundException {
Configuration conf = context.getConf();
- String[] plugins = conf.getStrings(YarnConfiguration.NM_RESOURCE_PLUGINS);
+ Map pluginMap = new HashMap<>();
+ String[] plugins = conf.getStrings(YarnConfiguration.NM_RESOURCE_PLUGINS);
if (plugins != null) {
- Map pluginMap = new HashMap<>();
-
// Initialize each plugins
for (String resourceName : plugins) {
resourceName = resourceName.trim();
@@ -92,9 +103,101 @@ public synchronized void initialize(Context context)
plugin.initialize(context);
pluginMap.put(resourceName, plugin);
}
+ }
+ // Try to load pluggable device plugins
+ boolean puggableDeviceFrameworkEnabled = conf.getBoolean(
+ YarnConfiguration.NM_PLUGGABLE_DEVICE_FRAMEWORK_ENABLED,
+ YarnConfiguration.DEFAULT_NM_PLUGGABLE_DEVICE_FRAMEWORK_ENABLED);
+ if (puggableDeviceFrameworkEnabled) {
+ LOG.info("The pluggable device framework is not enabled. If you want, set true to " +
+ YarnConfiguration.NM_PLUGGABLE_DEVICE_FRAMEWORK_ENABLED);
+ initializePluggableDevicePlugins(context, conf, pluginMap);
+ }
+ configuredPlugins = Collections.unmodifiableMap(pluginMap);
+ }
- configuredPlugins = Collections.unmodifiableMap(pluginMap);
+ public void initializePluggableDevicePlugins(Context context,
+ Configuration configuration,
+ Map pluginMap)
+ throws YarnRuntimeException, ClassNotFoundException {
+ LOG.info("The pluggable device framework enabled, trying to load the vendor plugins");
+ deviceSchedulerManager = new DeviceSchedulerManager(context);
+ String[] pluginClassNames = configuration.getStrings(
+ YarnConfiguration.NM_PLUGGABLE_DEVICE_FRAMEWORK_DEVICE_CLASSES);
+ if (null == pluginClassNames) {
+ throw new YarnRuntimeException("Null value found in configuration: " +
+ YarnConfiguration.NM_PLUGGABLE_DEVICE_FRAMEWORK_DEVICE_CLASSES);
}
+ // Check if framework prefer customized device scheduler
+ Boolean ifPrefer = configuration.getBoolean(
+ YarnConfiguration.NM_PLUGGABLE_DEVICE_FRAMEWORK_PREFER_CUSTOMIZED_SCHEDULER,
+ YarnConfiguration.DEFAULT_NM_PLUGGABLE_DEVICE_FRAMEWORK_PREFER_CUSTOMIZED_SCHEDULER
+ );
+ deviceSchedulerManager.setPreferCustomizedScheduler(ifPrefer);
+ LOG.info("If the device plugin framework prefer customized device scheduler:" +
+ ifPrefer);
+ for (String pluginClassName : pluginClassNames) {
+ Class> pluginClazz = Class.forName(pluginClassName);
+ if (!DevicePlugin.class.isAssignableFrom(pluginClazz)) {
+ throw new YarnRuntimeException("Class: " + pluginClassName
+ + " not instance of " + DevicePlugin.class.getCanonicalName());
+ }
+ DevicePlugin dpInstance = (DevicePlugin) ReflectionUtils.newInstance(pluginClazz,
+ configuration);
+ // Try to register plugin
+ // TODO: handle the plugin method timeout issue
+ DeviceRegisterRequest request = dpInstance.register();
+ String resourceName = request.getResourceName();
+ // check if someone has already registered this resource type name
+ if (pluginMap.containsKey(resourceName)) {
+ throw new YarnRuntimeException(resourceName +
+ " has already been registered! Please change a resource type name");
+ }
+ // check resource name is valid and configured in resource-types.xml
+ if (!isValidAndConfiguredResourceName(resourceName)) {
+ throw new YarnRuntimeException(resourceName
+ + " is not configured inside "
+ + YarnConfiguration.RESOURCE_TYPES_CONFIGURATION_FILE +
+ " , please configure it first");
+ }
+ LOG.info("New resource type: " + resourceName +
+ " registered successfully by " + pluginClassName);
+ DevicePluginAdapter pluginAdapter = new DevicePluginAdapter(this,
+ resourceName, dpInstance);
+ LOG.info("Adapter of " + pluginClassName + " created. Initializing..");
+ try {
+ pluginAdapter.initialize(context);
+ } catch (YarnException e) {
+ throw new YarnRuntimeException("Adapter of " + pluginClassName + " init failed!");
+ }
+ LOG.info("Adapter of " + pluginClassName + " init success!");
+ // Store plugin as adapter instance
+ pluginMap.put(request.getResourceName(), pluginAdapter);
+ // If the device plugin implements DevicePluginScheduler interface
+ if (dpInstance instanceof DevicePluginScheduler) {
+ LOG.info(pluginClassName +
+ " can schedule " + resourceName + " devices. Added as preferred device plugin scheduler");
+ deviceSchedulerManager.addDevicePluginScheduler(
+ resourceName,
+ (DevicePluginScheduler)dpInstance);
+ }
+ } // end for
+ }
+
+ @VisibleForTesting
+ public boolean isValidAndConfiguredResourceName(String resourceName) {
+ // check pattern match
+ // check configured
+ Map configuredResourceTypes =
+ ResourceUtils.getResourceTypes();
+ if (!configuredResourceTypes.containsKey(resourceName)) {
+ return false;
+ }
+ return true;
+ }
+
+ public DeviceSchedulerManager getDeviceSchedulerManager() {
+ return deviceSchedulerManager;
}
public synchronized void cleanup() throws YarnException {
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/resourceplugin/deviceframework/DevicePluginAdapter.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/resourceplugin/deviceframework/DevicePluginAdapter.java
new file mode 100644
index 00000000000..0c38f9d324d
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/resourceplugin/deviceframework/DevicePluginAdapter.java
@@ -0,0 +1,207 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.nodemanager.containermanager.resourceplugin.deviceframework;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.exceptions.YarnException;
+import org.apache.hadoop.yarn.server.nodemanager.Context;
+import org.apache.hadoop.yarn.server.nodemanager.api.deviceplugin.Device;
+import org.apache.hadoop.yarn.server.nodemanager.api.deviceplugin.DevicePlugin;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.privileged.PrivilegedOperation;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.privileged.PrivilegedOperationExecutor;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.resources.CGroupsHandler;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.resources.ResourceHandler;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.resources.ResourceHandlerException;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.runtime.docker.DockerRunCommand;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.runtime.docker.DockerVolumeCommand;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.resourceplugin.DockerCommandPlugin;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.resourceplugin.NodeResourceUpdaterPlugin;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.resourceplugin.ResourcePlugin;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.resourceplugin.ResourcePluginManager;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.runtime.ContainerExecutionException;
+import org.apache.hadoop.yarn.server.nodemanager.webapp.dao.NMResourceInfo;
+
+import java.util.List;
+import java.util.Set;
+import java.util.TreeSet;
+
+
+/**
+ * The {@link DevicePluginAdapter} will adapt existing hooks into vendor plugin's logic.
+ * It decouples the vendor plugin from YARN's device framework
+ *
+ * */
+public class DevicePluginAdapter extends NodeResourceUpdaterPlugin
+ implements ResourcePlugin, DockerCommandPlugin, ResourceHandler{
+ final static Log LOG = LogFactory.getLog(DevicePluginAdapter.class);
+
+ private ResourcePluginManager devicePluginManager;
+ private String resourceName;
+ private DevicePlugin devicePlugin;
+ private DeviceSchedulerManager deviceSchedulerManager;
+ private CGroupsHandler cGroupsHandler;
+ private PrivilegedOperationExecutor privilegedOperationExecutor;
+
+ public DevicePluginAdapter(ResourcePluginManager pluginManager, String name, DevicePlugin dp) {
+ devicePluginManager = pluginManager;
+ deviceSchedulerManager = pluginManager.getDeviceSchedulerManager();
+ resourceName = name;
+ devicePlugin = dp;
+ }
+
+ /**
+ * Act as a {@link NodeResourceUpdaterPlugin} to update the {@link Resource}
+ *
+ * */
+ @Override
+ public void updateConfiguredResource(Resource res) throws YarnException {
+ LOG.info(resourceName + " plugin update resource ");
+ Set devices = devicePlugin.getDevices();
+ if (devices == null) {
+ LOG.warn(resourceName + " plugin failed to discover resource ( null value got)." );
+ return;
+ }
+ res.setResourceValue(resourceName, devices.size());
+ }
+
+ /**
+ * Act as a {@link ResourcePlugin}
+ * */
+ @Override
+ public void initialize(Context context) throws YarnException {
+ LOG.info(resourceName + " plugin adapter initialized");
+ return;
+ }
+
+ @Override
+ public ResourceHandler createResourceHandler(Context nmContext, CGroupsHandler cGroupsHandler,
+ PrivilegedOperationExecutor privilegedOperationExecutor) {
+ this.cGroupsHandler = cGroupsHandler;
+ this.privilegedOperationExecutor = privilegedOperationExecutor;
+ return this;
+ }
+
+ @Override
+ public NodeResourceUpdaterPlugin getNodeResourceHandlerInstance() {
+ return this;
+ }
+
+ @Override
+ public void cleanup() throws YarnException {
+
+ }
+
+ @Override
+ public DockerCommandPlugin getDockerCommandPluginInstance() {
+ return this;
+ }
+
+ @Override
+ public NMResourceInfo getNMResourceInfo() throws YarnException {
+ return null;
+ }
+
+ /**
+ * Act as a {@link DockerCommandPlugin} to hook the
+ * {@link org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.runtime.DockerLinuxContainerRuntime}
+ * */
+ @Override
+ public void updateDockerRunCommand(DockerRunCommand dockerRunCommand, Container container)
+ throws ContainerExecutionException {
+
+ }
+
+ @Override
+ public DockerVolumeCommand getCreateDockerVolumeCommand(Container container)
+ throws ContainerExecutionException {
+ return null;
+ }
+
+ @Override
+ public DockerVolumeCommand getCleanupDockerVolumesCommand(Container container)
+ throws ContainerExecutionException {
+ return null;
+ }
+
+ /**
+ * Act as a {@link ResourceHandler}
+ * */
+ @Override
+ public List bootstrap(Configuration configuration) throws ResourceHandlerException {
+ Set availableDevices = devicePlugin.getDevices();
+
+ /**
+ * We won't fail the NM if plugin returns invalid value here.
+ * // TODO: we should update RM's resource count if something wrong
+ * */
+ if (availableDevices == null) {
+ LOG.error("Bootstrap " + resourceName + " failed. Null value got from plugin's getDevices method");
+ return null;
+ }
+ // Add device set. Here we trust the plugin's return value
+ deviceSchedulerManager.addDeviceSet(resourceName, availableDevices);
+ // TODO: Init cgroups
+
+ return null;
+ }
+
+ @Override
+ public List preStart(Container container)
+ throws ResourceHandlerException {
+ String containerIdStr = container.getContainerId().toString();
+ DeviceSchedulerManager.DeviceAllocation allocation = deviceSchedulerManager.assignDevices(
+ resourceName, container);
+ LOG.debug("Allocated to " +
+ containerIdStr + ": " + allocation );
+ /**
+ * TODO: implement a general container-executor device module to do isolation
+ * */
+ return null;
+ }
+
+ @Override
+ public List reacquireContainer(ContainerId containerId)
+ throws ResourceHandlerException {
+ deviceSchedulerManager.recoverAssignedDevices(resourceName, containerId);
+ return null;
+ }
+
+ @Override
+ public List updateContainer(Container container)
+ throws ResourceHandlerException {
+ return null;
+ }
+
+ @Override
+ public List postComplete(ContainerId containerId)
+ throws ResourceHandlerException {
+ deviceSchedulerManager.cleanupAssignedDevices(resourceName, containerId);
+ return null;
+ }
+
+ @Override
+ public List teardown() throws ResourceHandlerException {
+ return null;
+ }
+}
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/resourceplugin/deviceframework/DeviceSchedulerManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/resourceplugin/deviceframework/DeviceSchedulerManager.java
new file mode 100644
index 00000000000..0d9c2996a61
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/resourceplugin/deviceframework/DeviceSchedulerManager.java
@@ -0,0 +1,344 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.nodemanager.containermanager.resourceplugin.deviceframework;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.collect.ImmutableSet;
+import com.google.common.collect.Sets;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.util.StringUtils;
+import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.exceptions.ResourceNotFoundException;
+import org.apache.hadoop.yarn.server.nodemanager.Context;
+import org.apache.hadoop.yarn.server.nodemanager.api.deviceplugin.Device;
+import org.apache.hadoop.yarn.server.nodemanager.api.deviceplugin.DevicePluginScheduler;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.resources.ResourceHandlerException;
+
+import java.io.IOException;
+import java.io.Serializable;
+import java.util.*;
+
+/**
+ * Schedule device resource based on requirements and do book keeping
+ * It holds all device type resource and can do scheduling as a default scheduler
+ * If one resource type brings its own scheduler. That will be used.
+ * */
+public class DeviceSchedulerManager {
+ final static Log LOG = LogFactory.getLog(DeviceSchedulerManager.class);
+
+ private Context nmContext;
+ private static final int WAIT_MS_PER_LOOP = 1000;
+
+ // Holds vendor implemented scheduler
+ private Map devicePluginSchedulers =
+ new HashMap<>();
+
+ private boolean preferCustomizedScheduler = false;
+
+ @VisibleForTesting
+ public Map> getAllAllowedDevices() {
+ return allAllowedDevices;
+ }
+
+ @VisibleForTesting
+ public Map> getAllUsedDevices() {
+ return allUsedDevices;
+ }
+
+ /**
+ * Hold all type of devices
+ * key is the device resource name
+ * value is a sorted set of {@link Device}
+ * */
+ private Map> allAllowedDevices = new HashMap<>();
+
+ /**
+ * Hold used devices
+ * key is the device resource name
+ * value is a sorted map of {@link Device} and {@link ContainerId} pairs
+ * */
+ private Map> allUsedDevices = new HashMap<>();
+
+ public DeviceSchedulerManager(Context context) {
+ nmContext = context;
+ }
+
+ public synchronized void addDeviceSet(String resourceName, Set deviceSet) {
+ LOG.info("Adding new resource: " + "type:" + resourceName + "," + deviceSet);
+ allAllowedDevices.put(resourceName, new TreeSet<>(deviceSet));
+ allUsedDevices.put(resourceName, new TreeMap<>());
+ }
+
+ public synchronized DeviceAllocation assignDevices(String resourceName, Container container)
+ throws ResourceHandlerException {
+ DeviceAllocation allocation = internalAssignDevices(resourceName, container);
+ // Wait for a maximum of 120 seconds if no available Devices are there which
+ // are yet to be released.
+ final int timeoutMsecs = 120 * WAIT_MS_PER_LOOP;
+ int timeWaiting = 0;
+ while (allocation == null) {
+ if (timeWaiting >= timeoutMsecs) {
+ break;
+ }
+
+ // Sleep for 1 sec to ensure there are some free devices which are
+ // getting released.
+ try {
+ LOG.info("Container : " + container.getContainerId()
+ + " is waiting for free " + resourceName + " devices.");
+ Thread.sleep(WAIT_MS_PER_LOOP);
+ timeWaiting += WAIT_MS_PER_LOOP;
+ allocation = internalAssignDevices(resourceName, container);
+ } catch (InterruptedException e) {
+ // On any interrupt, break the loop and continue execution.
+ break;
+ }
+ }
+
+ if(allocation == null) {
+ String message = "Could not get valid " + resourceName + " device for container '" +
+ container.getContainerId()
+ + "' as some other containers might not releasing them.";
+ LOG.warn(message);
+ throw new ResourceHandlerException(message);
+ }
+ return allocation;
+ }
+
+ public synchronized DeviceAllocation internalAssignDevices(String resourceName, Container container)
+ throws ResourceHandlerException {
+ Resource requestedResource = container.getResource();
+ ContainerId containerId = container.getContainerId();
+ int requestedDeviceCount = getRequestedDeviceCount(resourceName, requestedResource);
+ // Assign devices to container if requested some.
+ if (requestedDeviceCount > 0) {
+ if (requestedDeviceCount > getAvailableDevices(resourceName)) {
+ // If there are some devices which are getting released, wait for few
+ // seconds to get it.
+ if (requestedDeviceCount <= getReleasingDevices(resourceName) +
+ getAvailableDevices(resourceName)) {
+ return null;
+ }
+ }
+
+ int availableDeviceCount = getAvailableDevices(resourceName);
+ if (requestedDeviceCount > availableDeviceCount) {
+ throw new ResourceHandlerException(
+ "Failed to find enough " + resourceName + ", requestor=" + containerId
+ + ", #Requested=" + requestedDeviceCount + ", #available="
+ + availableDeviceCount);
+ }
+
+ Set assignedDevices = new TreeSet<>();
+ Map usedDevices = allUsedDevices.get(resourceName);
+ Set allowedDevices = allAllowedDevices.get(resourceName);
+
+ DevicePluginScheduler dps = devicePluginSchedulers.get(resourceName);
+ // Use default schedule logic if not prefer or dps not exists or
+ // (prefer but null instance)
+ if ((!isPreferCustomizedScheduler() || null == dps)
+ || (isPreferCustomizedScheduler() && null == dps)) {
+ if (!isPreferCustomizedScheduler()) {
+ LOG.debug("Customized device plugin scheduler is not preferred, use default logic");
+ }
+
+ if (isPreferCustomizedScheduler() && null == dps) {
+ LOG.debug("Customized device plugin scheduler is preferred but not implemented, use default logic");
+ }
+
+ for (Device device : allowedDevices) {
+ if (!usedDevices.containsKey(device)) {
+ usedDevices.put(device, containerId);
+ assignedDevices.add(device);
+ if (assignedDevices.size() == requestedDeviceCount) {
+ break;
+ }
+ }
+ }
+ } else {
+ if(isPreferCustomizedScheduler() && null != dps) {
+ LOG.debug("LOG.debug(\"Customized device plugin scheduler is preferred and implemented," +
+ "use customized logic\");");
+ }
+ // Use customized device scheduler
+ LOG.debug("Try to schedule " + requestedDeviceCount
+ + "(" + resourceName + ") using " + dps.getClass());
+ Set dpsAllocated = dps.allocateDevices(
+ Sets.difference(allowedDevices, usedDevices.keySet()),
+ requestedDeviceCount);
+ if (dpsAllocated.size() != requestedDeviceCount) {
+ throw new ResourceHandlerException(dps.getClass() + " should allocate "
+ + requestedDeviceCount
+ + " of " + resourceName + ", but actual: "
+ + assignedDevices.size());
+ // TODO: fall back to default schedule logic?
+ }
+ // copy
+ assignedDevices.addAll(dpsAllocated);
+ // Store assigned devices into usedDevices
+ for (Device device : assignedDevices) {
+ usedDevices.put(device, containerId);
+ }
+ }
+ // Record in state store if we allocated anything
+ if (!assignedDevices.isEmpty()) {
+ try {
+ // Update state store.
+ nmContext.getNMStateStore().storeAssignedResources(container, resourceName,
+ new ArrayList<>(assignedDevices));
+ } catch (IOException e) {
+ cleanupAssignedDevices(resourceName, containerId);
+ throw new ResourceHandlerException(e);
+ }
+ }
+
+ return new DeviceAllocation(resourceName, assignedDevices,
+ Sets.difference(allowedDevices, assignedDevices));
+ }
+ return new DeviceAllocation(resourceName, null,
+ allAllowedDevices.get(resourceName));
+ }
+
+ public synchronized void recoverAssignedDevices(String resourceName, ContainerId containerId)
+ throws ResourceHandlerException{
+ Container c = nmContext.getContainers().get(containerId);
+ Map usedDevices = allUsedDevices.get(resourceName);
+ Set allowedDevices = allAllowedDevices.get(resourceName);
+ if (null == c) {
+ throw new ResourceHandlerException(
+ "This shouldn't happen, cannot find container with id="
+ + containerId);
+ }
+
+ for (Serializable deviceSerializable : c.getResourceMappings()
+ .getAssignedResources(resourceName)) {
+ if (!(deviceSerializable instanceof Device)) {
+ throw new ResourceHandlerException(
+ "Trying to recover device id, however it"
+ + " is not Device instance, this shouldn't happen");
+ }
+
+ Device device = (Device) deviceSerializable;
+
+ // Make sure it is in allowed device.
+ if (!allowedDevices.contains(device)) {
+ throw new ResourceHandlerException(
+ "Try to recover device = " + device
+ + " however it is not in allowed device list:" + StringUtils
+ .join(",", allowedDevices));
+ }
+
+ // Make sure it is not occupied by anybody else
+ if (usedDevices.containsKey(device)) {
+ throw new ResourceHandlerException(
+ "Try to recover device id = " + device
+ + " however it is already assigned to container="
+ + usedDevices.get(device)
+ + ", please double check what happened.");
+ }
+
+ usedDevices.put(device, containerId);
+ }
+ }
+
+ public synchronized void cleanupAssignedDevices(String resourceName, ContainerId containerId) {
+ Iterator> iter =
+ allUsedDevices.get(resourceName).entrySet().iterator();
+ while (iter.hasNext()) {
+ if (iter.next().getValue().equals(containerId)) {
+ iter.remove();
+ }
+ }
+ }
+
+ public synchronized int getRequestedDeviceCount(String resourceName, Resource requestedResource) {
+ try {
+ return Long.valueOf(requestedResource.getResourceValue(
+ resourceName)).intValue();
+ } catch (ResourceNotFoundException e) {
+ return 0;
+ }
+ }
+
+ public synchronized int getAvailableDevices(String resourceName) {
+ return allAllowedDevices.get(resourceName).size() -
+ allUsedDevices.get(resourceName).size();
+ }
+
+ private synchronized long getReleasingDevices(String resourceName) {
+ long releasingDevices = 0;
+ Map used = allUsedDevices.get(resourceName);
+ Iterator> iter = used.entrySet()
+ .iterator();
+ while (iter.hasNext()) {
+ ContainerId containerId = iter.next().getValue();
+ Container container;
+ if ((container = nmContext.getContainers().get(containerId)) != null) {
+ if (container.isContainerInFinalStates()) {
+ releasingDevices = releasingDevices + container.getResource()
+ .getResourceInformation(resourceName).getValue();
+ }
+ }
+ }
+ return releasingDevices;
+ }
+
+ public boolean isPreferCustomizedScheduler() {
+ return preferCustomizedScheduler;
+ }
+
+ public void setPreferCustomizedScheduler(boolean preferCustomizedScheduler) {
+ this.preferCustomizedScheduler = preferCustomizedScheduler;
+ }
+
+ static class DeviceAllocation {
+ private String resourceName;
+ private Set allowed = Collections.emptySet();
+ private Set denied = Collections.emptySet();
+
+ DeviceAllocation(String resourceName, Set allowed, Setdenied) {
+ this.resourceName = resourceName;
+ if (allowed != null) {
+ this.allowed = ImmutableSet.copyOf(allowed);
+ }
+ if (denied != null) {
+ this.denied = ImmutableSet.copyOf(denied);
+ }
+ }
+
+ @Override
+ public String toString() {
+ return "ResourceType: " + resourceName +
+ ", Allowed Devices: " + allowed +
+ ", Denied Devices: " + denied;
+ }
+
+ }
+
+ @VisibleForTesting
+ public void addDevicePluginScheduler(String resourceName,
+ DevicePluginScheduler s) {
+ this.devicePluginSchedulers.put(resourceName,
+ Objects.requireNonNull(s));
+ }
+
+}
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/resourceplugin/deviceframework/examples/FakeDevicePlugin.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/resourceplugin/deviceframework/examples/FakeDevicePlugin.java
new file mode 100644
index 00000000000..f4e47dc1ceb
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/resourceplugin/deviceframework/examples/FakeDevicePlugin.java
@@ -0,0 +1,73 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.nodemanager.containermanager.resourceplugin.deviceframework.examples;
+
+import org.apache.hadoop.yarn.server.nodemanager.api.deviceplugin.*;
+
+import java.util.Set;
+import java.util.TreeSet;
+
+public class FakeDevicePlugin
+ implements DevicePlugin,DevicePluginScheduler {
+ public final static String resourceName = "cmp.com/cmp";
+ @Override
+ public DeviceRegisterRequest register() {
+ return DeviceRegisterRequest.Builder.newInstance()
+ .setResourceName(resourceName)
+ .setPluginVersion("v1.0").build();
+ }
+
+ @Override
+ public Set getDevices() {
+ TreeSet r = new TreeSet<>();
+ r.add(Device.Builder.newInstance()
+ .setID(0)
+ .setDevPath("/dev/cmp0")
+ .setMajorNumber(243)
+ .setMinorNumber(0)
+ .setBusID("0000:65:00.0")
+ .setHealthy(true)
+ .build());
+ return r;
+ }
+
+ @Override
+ public DeviceRuntimeSpec OnDevicesAllocated(Set allocatedDevices) {
+ return null;
+ }
+
+ @Override
+ public void OnDevicesReleased(Set allocatedDevices) {
+
+ }
+
+ @Override
+ public Set allocateDevices(Set availableDevices, Integer count) {
+ Set allocated = new TreeSet<>();
+ int number = 0;
+ for (Device d : availableDevices) {
+ allocated.add(d);
+ number++;
+ if (number == count) {
+ break;
+ }
+ }
+ return allocated;
+ }
+}
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/resourceplugin/TestResourcePluginManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/resourceplugin/TestResourcePluginManager.java
index 6ed7c568899..1f748826ffd 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/resourceplugin/TestResourcePluginManager.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/resourceplugin/TestResourcePluginManager.java
@@ -24,6 +24,7 @@
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.event.Dispatcher;
+import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
import org.apache.hadoop.yarn.server.nodemanager.ContainerExecutor;
import org.apache.hadoop.yarn.server.nodemanager.Context;
import org.apache.hadoop.yarn.server.nodemanager.DeletionService;
@@ -33,6 +34,7 @@
import org.apache.hadoop.yarn.server.nodemanager.NodeManager;
import org.apache.hadoop.yarn.server.nodemanager.NodeManagerTestBase;
import org.apache.hadoop.yarn.server.nodemanager.NodeStatusUpdater;
+import org.apache.hadoop.yarn.server.nodemanager.api.deviceplugin.*;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.ContainerManagerImpl;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.privileged.PrivilegedOperation;
@@ -41,27 +43,41 @@
import org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.resources.ResourceHandler;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.resources.ResourceHandlerChain;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.resources.ResourceHandlerException;
-import org.apache.hadoop.yarn.server.nodemanager.containermanager.resourceplugin.NodeResourceUpdaterPlugin;
-import org.apache.hadoop.yarn.server.nodemanager.containermanager.resourceplugin.ResourcePlugin;
-import org.apache.hadoop.yarn.server.nodemanager.containermanager.resourceplugin.ResourcePluginManager;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.resourceplugin.deviceframework.DevicePluginAdapter;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.resourceplugin.deviceframework.FakeTestDevicePlugin1;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.resourceplugin.deviceframework.FakeTestDevicePlugin2;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.resourceplugin.deviceframework.FakeTestDevicePlugin3;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.resourceplugin.deviceframework.examples.FakeDevicePlugin;
import org.apache.hadoop.yarn.server.security.ApplicationACLsManager;
+import org.apache.hadoop.yarn.util.resource.ResourceUtils;
+import org.apache.hadoop.yarn.util.resource.TestResourceUtils;
import org.junit.After;
import org.junit.Assert;
+import org.junit.Before;
import org.junit.Test;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
+import java.io.File;
+import java.util.*;
import static org.mockito.Matchers.any;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.times;
-import static org.mockito.Mockito.verify;
-import static org.mockito.Mockito.when;
+import static org.mockito.Mockito.*;
public class TestResourcePluginManager extends NodeManagerTestBase {
private NodeManager nm;
+ private YarnConfiguration conf;
+
+ private String tempResourceTypesFile;
+
+ @Before
+ public void setup() throws Exception {
+ this.conf = createNMConfig();
+ // setup resource-types.xml
+ ResourceUtils.resetResourceTypes();
+ String resourceTypesFile = "resource-types-pluggable-devices.xml";
+ this.tempResourceTypesFile = TestResourceUtils.setupResourceTypes(this.conf, resourceTypesFile);
+ }
+
ResourcePluginManager stubResourcePluginmanager() {
// Stub ResourcePluginManager
final ResourcePluginManager rpm = mock(ResourcePluginManager.class);
@@ -94,6 +110,11 @@ public void tearDown() {
// ignore
}
}
+ // cleanup resource-types.xml
+ File dest = new File(this.tempResourceTypesFile);
+ if (dest.exists()) {
+ dest.delete();
+ }
}
private class CustomizedResourceHandler implements ResourceHandler {
@@ -145,7 +166,7 @@ public MyMockNM(ResourcePluginManager rpm) {
@Override
protected NodeStatusUpdater createNodeStatusUpdater(Context context,
Dispatcher dispatcher, NodeHealthCheckerService healthChecker) {
- ((NodeManager.NMContext)context).setResourcePluginManager(rpm);
+ ((NodeManager.NMContext) context).setResourcePluginManager(rpm);
return new BaseNodeStatusUpdaterForTest(context, dispatcher, healthChecker,
metrics, new BaseResourceTrackerForTest());
}
@@ -157,7 +178,7 @@ protected ContainerManagerImpl createContainerManager(Context context,
ApplicationACLsManager aclsManager,
LocalDirsHandlerService diskhandler) {
return new MyContainerManager(context, exec, del, nodeStatusUpdater,
- metrics, diskhandler);
+ metrics, diskhandler);
}
@Override
@@ -222,7 +243,7 @@ public void testLinuxContainerExecutorWithResourcePluginsEnabled() throws Except
@Override
protected NodeStatusUpdater createNodeStatusUpdater(Context context,
Dispatcher dispatcher, NodeHealthCheckerService healthChecker) {
- ((NMContext)context).setResourcePluginManager(rpm);
+ ((NMContext) context).setResourcePluginManager(rpm);
return new BaseNodeStatusUpdaterForTest(context, dispatcher, healthChecker,
metrics, new BaseResourceTrackerForTest());
}
@@ -239,7 +260,7 @@ protected ContainerManagerImpl createContainerManager(Context context,
@Override
protected ContainerExecutor createContainerExecutor(Configuration conf) {
- ((NMContext)this.getNMContext()).setResourcePluginManager(rpm);
+ ((NMContext) this.getNMContext()).setResourcePluginManager(rpm);
lce.setConf(conf);
return lce;
}
@@ -257,6 +278,9 @@ protected ContainerExecutor createContainerExecutor(Configuration conf) {
boolean newHandlerAdded = false;
for (ResourceHandler h : ((ResourceHandlerChain) handler)
.getResourceHandlerList()) {
+ if (h instanceof DevicePluginAdapter) {
+ Assert.assertTrue(false);
+ }
if (h instanceof CustomizedResourceHandler) {
newHandlerAdded = true;
break;
@@ -264,4 +288,175 @@ protected ContainerExecutor createContainerExecutor(Configuration conf) {
}
Assert.assertTrue("New ResourceHandler should be added", newHandlerAdded);
}
+
+ // Disabled pluggable framework.
+ // We use spy object of real rpm to verify "initializePluggableDevicePlugins"
+ // because use mock rpm will not working
+ @Test(timeout = 30000)
+ public void testInitializationWithPluggableDeviceFrameworkDisabled() throws Exception {
+ ResourcePluginManager rpm = new ResourcePluginManager();
+
+ ResourcePluginManager rpmSpy = spy(rpm);
+ nm = new MyMockNM(rpmSpy);
+
+ conf.setBoolean(YarnConfiguration.NM_PLUGGABLE_DEVICE_FRAMEWORK_ENABLED,
+ false);
+ nm.init(conf);
+ nm.start();
+ verify(rpmSpy, times(1)).initialize(
+ any(Context.class));
+ verify(rpmSpy, times(0)).initializePluggableDevicePlugins(
+ any(Context.class), any(Configuration.class), any(Map.class));
+ }
+
+ // no configuration set.
+ @Test(timeout = 30000)
+ public void testInitializationWithPluggableDeviceFrameworkDisabled2() throws Exception {
+ ResourcePluginManager rpm = new ResourcePluginManager();
+
+ ResourcePluginManager rpmSpy = spy(rpm);
+ nm = new MyMockNM(rpmSpy);
+
+ nm.init(conf);
+ nm.start();
+ verify(rpmSpy, times(1)).initialize(
+ any(Context.class));
+ verify(rpmSpy, times(0)).initializePluggableDevicePlugins(
+ any(Context.class), any(Configuration.class), any(Map.class));
+ }
+
+ // Enable framework and configure pluggable device classes
+ @Test(timeout = 30000)
+ public void testInitializationWithPluggableDeviceFrameworkEnabled() throws Exception {
+ ResourcePluginManager rpm = new ResourcePluginManager();
+
+ ResourcePluginManager rpmSpy = spy(rpm);
+ nm = new MyMockNM(rpmSpy);
+
+ conf.setBoolean(YarnConfiguration.NM_PLUGGABLE_DEVICE_FRAMEWORK_ENABLED,
+ true);
+ conf.setStrings(YarnConfiguration.NM_PLUGGABLE_DEVICE_FRAMEWORK_DEVICE_CLASSES,
+ FakeDevicePlugin.class.getCanonicalName());
+ nm.init(conf);
+ nm.start();
+ verify(rpmSpy, times(1)).initialize(
+ any(Context.class));
+ verify(rpmSpy, times(1)).initializePluggableDevicePlugins(
+ any(Context.class), any(Configuration.class), any(Map.class));
+ }
+
+ // Enable pluggable framework, but leave device classes un-configured
+ // initializePluggableDevicePlugins invoked but it should throw an exception
+ @Test(timeout = 30000)
+ public void testInitializationWithPluggableDeviceFrameworkEnabled2()
+ throws ClassNotFoundException{
+ ResourcePluginManager rpm = new ResourcePluginManager();
+
+ ResourcePluginManager rpmSpy = spy(rpm);
+ nm = new MyMockNM(rpmSpy);
+ Boolean fail = false;
+ try {
+ YarnConfiguration conf = createNMConfig();
+ conf.setBoolean(YarnConfiguration.NM_PLUGGABLE_DEVICE_FRAMEWORK_ENABLED,
+ true);
+
+ nm.init(conf);
+ nm.start();
+ } catch (YarnRuntimeException e) {
+ fail = true;
+ } catch (Exception e) {
+
+ }
+ verify(rpmSpy, times(1)).initializePluggableDevicePlugins(
+ any(Context.class), any(Configuration.class), any(Map.class));
+ Assert.assertTrue(fail);
+ }
+
+ @Test(timeout = 30000)
+ public void testNormalInitializationOfPluggableDeviceClasses()
+ throws Exception{
+
+ ResourcePluginManager rpm = new ResourcePluginManager();
+
+ ResourcePluginManager rpmSpy = spy(rpm);
+ nm = new MyMockNM(rpmSpy);
+
+ conf.setBoolean(YarnConfiguration.NM_PLUGGABLE_DEVICE_FRAMEWORK_ENABLED,
+ true);
+ conf.setStrings(YarnConfiguration.NM_PLUGGABLE_DEVICE_FRAMEWORK_DEVICE_CLASSES,
+ FakeTestDevicePlugin1.class.getCanonicalName() + "," +
+ FakeDevicePlugin.class.getCanonicalName());
+ nm.init(conf);
+ nm.start();
+ Map pluginMap = rpmSpy.getNameToPlugins();
+ Assert.assertEquals(2,pluginMap.size());
+ ResourcePlugin rp = pluginMap.get("cmp.com/cmp");
+ if (! (rp instanceof DevicePluginAdapter)) {
+ Assert.assertTrue(false);
+ }
+ }
+
+ // Fail to load a class which doesn't implement interface DevicePlugin
+ @Test(timeout = 30000)
+ public void testLoadInvalidPluggableDeviceClasses()
+ throws Exception{
+ ResourcePluginManager rpm = new ResourcePluginManager();
+
+ ResourcePluginManager rpmSpy = spy(rpm);
+ nm = new MyMockNM(rpmSpy);
+
+ conf.setBoolean(YarnConfiguration.NM_PLUGGABLE_DEVICE_FRAMEWORK_ENABLED,
+ true);
+ conf.setStrings(YarnConfiguration.NM_PLUGGABLE_DEVICE_FRAMEWORK_DEVICE_CLASSES,
+ FakeTestDevicePlugin2.class.getCanonicalName());
+
+ String expectedMessage = "Class: " + FakeTestDevicePlugin2.class.getCanonicalName()
+ + " not instance of " + DevicePlugin.class.getCanonicalName();
+ String actualMessage = "";
+ try {
+ nm.init(conf);
+ nm.start();
+ } catch (YarnRuntimeException e) {
+ actualMessage = e.getMessage();
+ }
+ Assert.assertEquals(expectedMessage, actualMessage);
+ }
+
+ @Test(timeout = 30000)
+ public void testLoadDuplicateResourceNameDevicePlugin()
+ throws Exception{
+ ResourcePluginManager rpm = new ResourcePluginManager();
+
+ ResourcePluginManager rpmSpy = spy(rpm);
+ nm = new MyMockNM(rpmSpy);
+
+ conf.setBoolean(YarnConfiguration.NM_PLUGGABLE_DEVICE_FRAMEWORK_ENABLED,
+ true);
+ conf.setStrings(YarnConfiguration.NM_PLUGGABLE_DEVICE_FRAMEWORK_DEVICE_CLASSES,
+ FakeTestDevicePlugin1.class.getCanonicalName() + "," +
+ FakeTestDevicePlugin3.class.getCanonicalName());
+
+ String expectedMessage = "cmpA.com/hdwA" +
+ " has already been registered! Please change a resource type name";
+ String actualMessage = "";
+ try {
+ nm.init(conf);
+ nm.start();
+ } catch (YarnRuntimeException e) {
+ actualMessage = e.getMessage();
+ }
+ Assert.assertEquals(expectedMessage, actualMessage);
+ }
+
+ @Test(timeout = 30000)
+ public void testRequestedResourceNameIsConfigured()
+ throws Exception{
+ ResourcePluginManager rpm = new ResourcePluginManager();
+ String resourceName = "a.com/a";
+ Assert.assertFalse(rpm.isValidAndConfiguredResourceName(resourceName));
+ resourceName = "cmp.com/cmp";
+ Assert.assertTrue(rpm.isValidAndConfiguredResourceName(resourceName));
+ }
+
+
}
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/resourceplugin/deviceframework/FakeTestDevicePlugin1.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/resourceplugin/deviceframework/FakeTestDevicePlugin1.java
new file mode 100644
index 00000000000..68f28c95b2c
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/resourceplugin/deviceframework/FakeTestDevicePlugin1.java
@@ -0,0 +1,60 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.nodemanager.containermanager.resourceplugin.deviceframework;
+
+import org.apache.hadoop.yarn.server.nodemanager.api.deviceplugin.*;
+
+import java.util.Set;
+import java.util.TreeSet;
+
+/**
+ * Used only for testing.
+ * A fake normal vendor plugin
+ * */
+public class FakeTestDevicePlugin1 implements DevicePlugin {
+ @Override
+ public DeviceRegisterRequest register() {
+ return DeviceRegisterRequest.Builder.newInstance()
+ .setResourceName("cmpA.com/hdwA").build();
+ }
+
+ @Override
+ public Set getDevices() {
+ TreeSet r = new TreeSet<>();
+ r.add(Device.Builder.newInstance()
+ .setID(0)
+ .setDevPath("/dev/hdwA0")
+ .setMajorNumber(243)
+ .setMinorNumber(0)
+ .setBusID("0000:65:00.0")
+ .setHealthy(true)
+ .build());
+ return r;
+ }
+
+ @Override
+ public DeviceRuntimeSpec OnDevicesAllocated(Set allocatedDevices) {
+ return null;
+ }
+
+ @Override
+ public void OnDevicesReleased(Set allocatedDevices) {
+
+ }
+}
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/resourceplugin/deviceframework/FakeTestDevicePlugin2.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/resourceplugin/deviceframework/FakeTestDevicePlugin2.java
new file mode 100644
index 00000000000..8c1a61dc21c
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/resourceplugin/deviceframework/FakeTestDevicePlugin2.java
@@ -0,0 +1,27 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.nodemanager.containermanager.resourceplugin.deviceframework;
+
+/**
+ * Only used for testing.
+ * This isn't a implementation of DevicePlugin
+ * */
+public class FakeTestDevicePlugin2 {
+
+}
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/resourceplugin/deviceframework/FakeTestDevicePlugin3.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/resourceplugin/deviceframework/FakeTestDevicePlugin3.java
new file mode 100644
index 00000000000..b457c2e5b0d
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/resourceplugin/deviceframework/FakeTestDevicePlugin3.java
@@ -0,0 +1,60 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.nodemanager.containermanager.resourceplugin.deviceframework;
+
+import org.apache.hadoop.yarn.server.nodemanager.api.deviceplugin.*;
+
+import java.util.Set;
+import java.util.TreeSet;
+
+/**
+ * Only used for testing
+ * This plugin register a same name with FakeTestDevicePlugin1
+ * */
+public class FakeTestDevicePlugin3 implements DevicePlugin {
+ @Override
+ public DeviceRegisterRequest register() {
+ return DeviceRegisterRequest.Builder.newInstance()
+ .setResourceName("cmpA.com/hdwA").build();
+ }
+
+ @Override
+ public Set getDevices() {
+ TreeSet r = new TreeSet<>();
+ r.add(Device.Builder.newInstance()
+ .setID(0)
+ .setDevPath("/dev/hdwA0")
+ .setMajorNumber(243)
+ .setMinorNumber(0)
+ .setBusID("0000:65:00.0")
+ .setHealthy(true)
+ .build());
+ return r;
+ }
+
+ @Override
+ public DeviceRuntimeSpec OnDevicesAllocated(Set allocatedDevices) {
+ return null;
+ }
+
+ @Override
+ public void OnDevicesReleased(Set allocatedDevices) {
+
+ }
+}
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/resourceplugin/deviceframework/TestDevicePluginAdapter.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/resourceplugin/deviceframework/TestDevicePluginAdapter.java
new file mode 100644
index 00000000000..d20b8887312
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/resourceplugin/deviceframework/TestDevicePluginAdapter.java
@@ -0,0 +1,300 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.nodemanager.containermanager.resourceplugin.deviceframework;
+
+import org.apache.hadoop.yarn.api.records.*;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.server.nodemanager.*;
+import org.apache.hadoop.yarn.server.nodemanager.api.deviceplugin.Device;
+import org.apache.hadoop.yarn.server.nodemanager.api.deviceplugin.DevicePlugin;
+import org.apache.hadoop.yarn.server.nodemanager.api.deviceplugin.DeviceRegisterRequest;
+import org.apache.hadoop.yarn.server.nodemanager.api.deviceplugin.DeviceRuntimeSpec;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ResourceMappings;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.resources.ResourceHandlerException;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.resourceplugin.ResourcePluginManager;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.resourceplugin.deviceframework.examples.FakeDevicePlugin;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.runtime.ContainerRuntimeConstants;
+import org.apache.hadoop.yarn.server.nodemanager.recovery.NMStateStoreService;
+import org.apache.hadoop.yarn.util.resource.ResourceUtils;
+import org.apache.hadoop.yarn.util.resource.TestResourceUtils;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.*;
+
+import static org.mockito.Matchers.isA;
+import static org.mockito.Mockito.*;
+import static org.mockito.Mockito.verify;
+
+public class TestDevicePluginAdapter {
+
+ private YarnConfiguration conf;
+ private String tempResourceTypesFile;
+
+ @Before
+ public void setup() throws Exception {
+ this.conf = new YarnConfiguration();
+ // setup resource-types.xml
+ ResourceUtils.resetResourceTypes();
+ String resourceTypesFile = "resource-types-pluggable-devices.xml";
+ this.tempResourceTypesFile = TestResourceUtils.setupResourceTypes(this.conf, resourceTypesFile);
+ }
+
+ @After
+ public void tearDown() {
+ // cleanup resource-types.xml
+ File dest = new File(this.tempResourceTypesFile);
+ if (dest.exists()) {
+ dest.delete();
+ }
+ }
+
+
+ /**
+ * Use the MyPlugin which doesn't implement scheduler interfaces
+ * Test Adapter's "Bootstrap", "preStart" and "postComplete"
+ * Test plugin's "getDevices".
+ * Plugin's initialization and "register" invocation is tested in
+ * {@link org.apache.hadoop.yarn.server.nodemanager.containermanager.resourceplugin.TestResourcePluginManager}
+ *
+ * */
+ @Test
+ public void testBasicWorkflow()
+ throws ResourceHandlerException, IOException {
+ NodeManager.NMContext context = mock(NodeManager.NMContext.class);
+ NMStateStoreService storeService = mock(NMStateStoreService.class);
+ when(context.getNMStateStore()).thenReturn(storeService);
+ doNothing().when(storeService).storeAssignedResources(isA(Container.class),
+ isA(String.class),
+ isA(ArrayList.class));
+ // Init scheduler manager
+ DeviceSchedulerManager dsm = new DeviceSchedulerManager(context);
+
+ ResourcePluginManager rpm = mock(ResourcePluginManager.class);
+ when(rpm.getDeviceSchedulerManager()).thenReturn(dsm);
+
+ // Init an plugin
+ MyPlugin plugin = new MyPlugin();
+ MyPlugin spyPlugin = spy(plugin);
+ String resourceName = MyPlugin.resourceName;
+ // Init an adapter for the plugin
+ DevicePluginAdapter adapter = new DevicePluginAdapter(rpm,
+ resourceName,
+ spyPlugin);
+ // Bootstrap, adding device
+ adapter.bootstrap(conf);
+ int size = dsm.getAvailableDevices(resourceName);
+ Assert.assertEquals(3, size);
+
+ // A container c1 requests 1 device
+ Container c1 = mockContainerWithDeviceRequest(0,
+ resourceName,
+ 1,false);
+ // preStart
+ adapter.preStart(c1);
+ // check book keeping
+ Assert.assertEquals(2,
+ dsm.getAvailableDevices(resourceName));
+ Assert.assertEquals(1,
+ dsm.getAllUsedDevices().get(resourceName).size());
+ Assert.assertEquals(3,
+ dsm.getAllAllowedDevices().get(resourceName).size());
+ // postComplete
+ adapter.postComplete(getContainerId(0));
+ Assert.assertEquals(3,
+ dsm.getAvailableDevices(resourceName));
+ Assert.assertEquals(0,
+ dsm.getAllUsedDevices().get(resourceName).size());
+ Assert.assertEquals(3,
+ dsm.getAllAllowedDevices().get(resourceName).size());
+
+ // A container c2 requests 3 device
+ Container c2 = mockContainerWithDeviceRequest(1,
+ resourceName,
+ 3,false);
+ // preStart
+ adapter.preStart(c2);
+ // check book keeping
+ Assert.assertEquals(0,
+ dsm.getAvailableDevices(resourceName));
+ Assert.assertEquals(3,
+ dsm.getAllUsedDevices().get(resourceName).size());
+ Assert.assertEquals(3,
+ dsm.getAllAllowedDevices().get(resourceName).size());
+ // postComplete
+ adapter.postComplete(getContainerId(1));
+ Assert.assertEquals(3,
+ dsm.getAvailableDevices(resourceName));
+ Assert.assertEquals(0,
+ dsm.getAllUsedDevices().get(resourceName).size());
+ Assert.assertEquals(3,
+ dsm.getAllAllowedDevices().get(resourceName).size());
+ }
+
+ /**
+ * Use {@link FakeDevicePlugin} which implements
+ * {@link org.apache.hadoop.yarn.server.nodemanager.api.deviceplugin.DevicePluginScheduler}
+ * interface.
+ * Test Adapter's "Bootstrap", "preStart" and "postComplete"
+ * Test plugin's "getDevices", "allocateDevices"
+ *
+ * */
+ @Test
+ public void testBasicWorkflowWithPluginAndPluginScheduler()
+ throws ResourceHandlerException, IOException {
+ NodeManager.NMContext context = mock(NodeManager.NMContext.class);
+ NMStateStoreService storeService = mock(NMStateStoreService.class);
+ when(context.getNMStateStore()).thenReturn(storeService);
+ doNothing().when(storeService).storeAssignedResources(isA(Container.class),
+ isA(String.class),
+ isA(ArrayList.class));
+ // Init scheduler manager
+ DeviceSchedulerManager dsm = new DeviceSchedulerManager(context);
+
+ ResourcePluginManager rpm = mock(ResourcePluginManager.class);
+ when(rpm.getDeviceSchedulerManager()).thenReturn(dsm);
+
+ // Init an plugin
+ FakeDevicePlugin plugin = new FakeDevicePlugin();
+ FakeDevicePlugin spyPlugin = spy(plugin);
+ String resourceName = FakeDevicePlugin.resourceName;
+ // Add customized device plugin scheduler
+ dsm.setPreferCustomizedScheduler(true);
+ dsm.addDevicePluginScheduler(resourceName,spyPlugin);
+ // Init an adapter for the plugin
+ DevicePluginAdapter adapter = new DevicePluginAdapter(rpm,
+ resourceName,
+ spyPlugin);
+ // Bootstrap, adding device
+ adapter.bootstrap(conf);
+ int size = dsm.getAvailableDevices(resourceName);
+ Assert.assertEquals(1, size);
+
+ // A container requests 1 device
+ Container c1 = mockContainerWithDeviceRequest(0,
+ resourceName,
+ 1,false);
+ // preStart
+ adapter.preStart(c1);
+ // check if the plugin's own scheduler works
+ verify(spyPlugin, times(1))
+ .allocateDevices(isA(Set.class),isA(Integer.class));
+ // check book keeping
+ Assert.assertEquals(0,
+ dsm.getAvailableDevices(resourceName));
+ Assert.assertEquals(1,
+ dsm.getAllUsedDevices().get(resourceName).size());
+ Assert.assertEquals(1,
+ dsm.getAllAllowedDevices().get(resourceName).size());
+ // postComplete
+ adapter.postComplete(getContainerId(0));
+ Assert.assertEquals(1,
+ dsm.getAvailableDevices(resourceName));
+ Assert.assertEquals(0,
+ dsm.getAllUsedDevices().get(resourceName).size());
+ Assert.assertEquals(1,
+ dsm.getAllAllowedDevices().get(resourceName).size());
+ }
+
+ private static Container mockContainerWithDeviceRequest(int id,
+ String resourceName,
+ int numDeviceRequest,
+ boolean dockerContainerEnabled) {
+ Container c = mock(Container.class);
+ when(c.getContainerId()).thenReturn(getContainerId(id));
+
+ Resource res = Resource.newInstance(1024, 1);
+ ResourceMappings resMapping = new ResourceMappings();
+
+ res.setResourceValue(resourceName, numDeviceRequest);
+ when(c.getResource()).thenReturn(res);
+ when(c.getResourceMappings()).thenReturn(resMapping);
+
+ ContainerLaunchContext clc = mock(ContainerLaunchContext.class);
+ Map env = new HashMap<>();
+ if (dockerContainerEnabled) {
+ env.put(ContainerRuntimeConstants.ENV_CONTAINER_TYPE,
+ ContainerRuntimeConstants.CONTAINER_RUNTIME_DOCKER);
+ }
+ when(clc.getEnvironment()).thenReturn(env);
+ when(c.getLaunchContext()).thenReturn(clc);
+ return c;
+ }
+
+ private static ContainerId getContainerId(int id) {
+ return ContainerId.newContainerId(ApplicationAttemptId
+ .newInstance(ApplicationId.newInstance(1234L, 1), 1), id);
+ }
+
+
+ private class MyPlugin implements DevicePlugin {
+ private final static String resourceName = "cmpA.com/hdwA";
+ @Override
+ public DeviceRegisterRequest register() {
+ return DeviceRegisterRequest.Builder.newInstance()
+ .setResourceName(resourceName)
+ .setPluginVersion("v1.0").build();
+ }
+
+ @Override
+ public Set getDevices() {
+ TreeSet r = new TreeSet<>();
+ r.add(Device.Builder.newInstance()
+ .setID(0)
+ .setDevPath("/dev/hdwA0")
+ .setMajorNumber(256)
+ .setMinorNumber(0)
+ .setBusID("0000:80:00.0")
+ .setHealthy(true)
+ .build());
+ r.add(Device.Builder.newInstance()
+ .setID(1)
+ .setDevPath("/dev/hdwA1")
+ .setMajorNumber(256)
+ .setMinorNumber(0)
+ .setBusID("0000:80:01.0")
+ .setHealthy(true)
+ .build());
+ r.add(Device.Builder.newInstance()
+ .setID(2)
+ .setDevPath("/dev/hdwA2")
+ .setMajorNumber(256)
+ .setMinorNumber(0)
+ .setBusID("0000:80:02.0")
+ .setHealthy(true)
+ .build());
+ return r;
+ }
+
+ @Override
+ public DeviceRuntimeSpec OnDevicesAllocated(Set allocatedDevices) {
+ return null;
+ }
+
+ @Override
+ public void OnDevicesReleased(Set releasedDevices) {
+
+ }
+ }
+}
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/resources/resource-types-pluggable-devices.xml b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/resources/resource-types-pluggable-devices.xml
new file mode 100644
index 00000000000..65b8e752f1c
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/resources/resource-types-pluggable-devices.xml
@@ -0,0 +1,7 @@
+
+
+
+ yarn.resource-types
+ cmp.com/cmp,cmpA.com/hdwA
+
+
\ No newline at end of file