topology;
+
+ private Builder() {
+ topology = new TreeSet<>();
+ }
+
+ public static Builder newInstance() {
+ return new Builder();
+ }
+
+ public Device build(){
+ return new Device(this);
+ }
+
+ public Builder setID(Integer ID) {
+ this.ID = ID;
+ return this;
+ }
+
+ public Builder setDevPath(String devPath) {
+ this.devPath = devPath;
+ return this;
+ }
+
+ public Builder setMajorNumber(Integer majorNumber) {
+ this.majorNumber = majorNumber;
+ return this;
+ }
+
+ public Builder setMinorNumber(Integer minorNumber) {
+ this.minorNumber = minorNumber;
+ return this;
+ }
+
+ public Builder setBusID(String busID) {
+ this.busID = busID;
+ return this;
+ }
+
+ public Builder setHealthy(boolean healthy) {
+ isHealthy = healthy;
+ return this;
+ }
+
+ public Builder setStatus(String status) {
+ this.status = status;
+ return this;
+ }
+
+ public Builder addDeviceLink(DeviceLink link) {
+ this.topology.add(link);
+ return this;
+ }
+
+ }
+}
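
For orientation, here is a minimal sketch of how a vendor plugin might construct a Device with this builder. All values below are hypothetical, and the DeviceLink and DeviceLinkType types used here are defined later in this patch:

  Device device = Device.Builder.newInstance()
      .setID(0)
      .setDevPath("/dev/hypothetical0")   // hypothetical device file
      .setMajorNumber(243)
      .setMinorNumber(0)
      .setBusID("0000:65:00.0")
      .setHealthy(true)
      .setStatus("free")
      .addDeviceLink(DeviceLink.Builder.newInstance()
          .setBusId("0000:66:00.0")
          .setLinkType(DeviceLinkType.P2PLinkSameCPU)
          .build())
      .build();
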
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/api/deviceplugin/DeviceLink.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/api/deviceplugin/DeviceLink.java
new file mode 100644
index 00000000000..47444eac194
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/api/deviceplugin/DeviceLink.java
@@ -0,0 +1,95 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.nodemanager.api.deviceplugin;
+
+import java.io.Serializable;
+import java.util.Objects;
+
+public class DeviceLink implements Serializable, Comparable {
+
+ private static final long serialVersionUID = 1L;
+
+ String busId;
+ DeviceLinkType linkType;
+
+ public String getBusId() {
+ return busId;
+ }
+
+ public DeviceLinkType getLinkType() {
+ return linkType;
+ }
+
+ private DeviceLink(Builder builder) {
+ this.busId = builder.busId;
+ this.linkType = builder.linkType;
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) {
+ return true;
+ }
+ if (o == null || getClass() != o.getClass()){
+ return false;
+ }
+ DeviceLink other = (DeviceLink) o;
+ return Objects.equals(other.getBusId(), busId) &&
+ Objects.equals(other.getLinkType(), linkType);
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hash(busId, linkType);
+ }
+
+ @Override
+ public int compareTo(Object o) {
+ if (o == null || (!(o instanceof DeviceLink))) {
+ return -1;
+ }
+ DeviceLink other = (DeviceLink) o;
+
+ return other.linkType.compareTo(getLinkType());
+ }
+
+ public static class Builder {
+ String busId;
+ DeviceLinkType linkType;
+
+ private Builder(){}
+ public static Builder newInstance() {
+ return new Builder();
+ }
+
+ public DeviceLink build() {
+ return new DeviceLink(this);
+ }
+
+ public Builder setBusId(String busId) {
+ this.busId = busId;
+ return this;
+ }
+
+ public Builder setLinkType(DeviceLinkType linkType) {
+ this.linkType = linkType;
+ return this;
+ }
+ }
+}
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/api/deviceplugin/DeviceLinkType.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/api/deviceplugin/DeviceLinkType.java
new file mode 100644
index 00000000000..ccbd6f5c8ff
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/api/deviceplugin/DeviceLinkType.java
@@ -0,0 +1,40 @@
+package org.apache.hadoop.yarn.server.nodemanager.api.deviceplugin;
+
+public enum DeviceLinkType {
+ /**
+ * For Nvidia GPU NVLink
+ * */
+ P2PLinkNVLink(4),
+
+ /**
+ * Connected to the same CPU (same NUMA node)
+ * */
+ P2PLinkSameCPU(3),
+
+ /**
+ * Across CPUs through a socket-level link (e.g. QPI).
+ * Usually across NUMA nodes
+ * */
+ P2PLinkCrossCPU(2),
+
+ /**
+ * Only needs to traverse one PCIe switch to communicate
+ * */
+ P2PLinkSingleSwitch(1),
+
+ /**
+ * Needs to traverse multiple PCIe switches to communicate
+ * */
+ P2PLinkMultiSwitch(0);
+
+ // A higher link level means faster communication
+ private int linkLevel;
+
+ public int getLinkLevel() {
+ return linkLevel;
+ }
+
+ DeviceLinkType(int linkLevel) {
+ this.linkLevel = linkLevel;
+ }
+}
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/api/deviceplugin/DevicePlugin.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/api/deviceplugin/DevicePlugin.java
new file mode 100644
index 00000000000..a4898417f05
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/api/deviceplugin/DevicePlugin.java
@@ -0,0 +1,60 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.nodemanager.api.deviceplugin;
+
+import java.util.Set;
+
+/**
+ * The interface that a vendor device plugin must implement.
+ * */
+public interface DevicePlugin {
+ /**
+ * Called first when the device plugin framework registers the plugin
+ * @return a {@link DeviceRegisterRequest} describing the resource to register
+ * */
+ DeviceRegisterRequest getRegisterRequestInfo();
+
+ /**
+ * Called when the node resource is updated
+ * @return a set of {@link Device}, {@link java.util.TreeSet} recommended
+ * */
+ Set<Device> getDevices();
+
+ /**
+ * Asks how the allocated devices should be prepared and used
+ * before and during container launch. A plugin can perform these tasks
+ * on its own or describe them in a {@link DeviceRuntimeSpec} so that
+ * the framework does them.
+ * For instance, define a {@code VolumeSpec} to let the
+ * framework create a volume before running the container.
+ *
+ * @param allocatedDevices A set of allocated {@link Device}.
+ * @param yarnRuntime Indicates which runtime YARN will use.
+ * Could be {@code docker} or {@code default}
+ * in {@link DeviceRuntimeSpec} constants
+ * @return a {@link DeviceRuntimeSpec} describing the environment,
+ * {@link VolumeSpec}, {@link MountVolumeSpec}, etc.
+ * */
+ DeviceRuntimeSpec onDevicesAllocated(Set<Device> allocatedDevices,
+ String yarnRuntime);
+
+ /**
+ * Called after the devices are released.
+ * */
+ void onDevicesReleased(Set<Device> releasedDevices);
+}
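
Below is a minimal sketch of a vendor plugin implementing this interface. The class name, resource name and device details are hypothetical, and onDevicesAllocated only illustrates the Docker path by mounting a fixed device file:

  import java.util.Set;
  import java.util.TreeSet;

  import org.apache.hadoop.yarn.server.nodemanager.api.deviceplugin.Device;
  import org.apache.hadoop.yarn.server.nodemanager.api.deviceplugin.DevicePlugin;
  import org.apache.hadoop.yarn.server.nodemanager.api.deviceplugin.DeviceRegisterRequest;
  import org.apache.hadoop.yarn.server.nodemanager.api.deviceplugin.DeviceRuntimeSpec;
  import org.apache.hadoop.yarn.server.nodemanager.api.deviceplugin.MountDeviceSpec;

  public class HypotheticalDevicePlugin implements DevicePlugin {

    @Override
    public DeviceRegisterRequest getRegisterRequestInfo() {
      // Register a hypothetical resource type; it must also be declared
      // in resource-types.xml (see the ResourcePluginManager changes below).
      return DeviceRegisterRequest.Builder.newInstance()
          .setResourceName("hypothetical.com/hdevice")
          .setPluginVersion("0.1")
          .build();
    }

    @Override
    public Set<Device> getDevices() {
      // A real plugin would probe the host; here we report one static device.
      Set<Device> devices = new TreeSet<>();
      devices.add(Device.Builder.newInstance()
          .setID(0)
          .setDevPath("/dev/hdevice0")
          .setMajorNumber(243)
          .setMinorNumber(0)
          .setHealthy(true)
          .build());
      return devices;
    }

    @Override
    public DeviceRuntimeSpec onDevicesAllocated(Set<Device> allocatedDevices,
        String yarnRuntime) {
      // Only the Docker runtime needs extra information in this sketch.
      if (!DeviceRuntimeSpec.RUNTIME_DOCKER.equals(yarnRuntime)) {
        return DeviceRuntimeSpec.Builder.newInstance().build();
      }
      return DeviceRuntimeSpec.Builder.newInstance()
          .addMountDeviceSpec(MountDeviceSpec.Builder.newInstance()
              .setDevicePathInHost("/dev/hdevice0")
              .setDevicePathInContainer("/dev/hdevice0")
              .setDevicePermission(MountDeviceSpec.RW)
              .build())
          .build();
    }

    @Override
    public void onDevicesReleased(Set<Device> releasedDevices) {
      // Nothing to clean up in this sketch.
    }
  }
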
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/api/deviceplugin/DevicePluginScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/api/deviceplugin/DevicePluginScheduler.java
new file mode 100644
index 00000000000..414cc4ace86
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/api/deviceplugin/DevicePluginScheduler.java
@@ -0,0 +1,37 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.nodemanager.api.deviceplugin;
+
+import java.util.Set;
+
+/**
+ * An optional interface to implement if custom device scheduling is needed.
+ * If this is not implemented, the device framework uses its default scheduler.
+ * */
+public interface DevicePluginScheduler {
+ /**
+ * Called when allocating devices. The framework does all device bookkeeping
+ * and failure recovery, so this hook should only do scheduling based on the
+ * available devices passed in. This method could be invoked multiple times.
+ * @param availableDevices Devices allowed to be chosen from.
+ * @param count Number of devices to be allocated.
+ * @return a set of allocated {@link Device}
+ * */
+ Set<Device> allocateDevices(Set<Device> availableDevices, Integer count);
+}
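
As a sketch of a plugin-supplied scheduler, the hypothetical class below simply picks the first count available devices; a real implementation would use topology information such as DeviceLink. In this patch the same class typically implements both DevicePlugin and DevicePluginScheduler, which is how ResourcePluginManager detects it:

  import java.util.Set;
  import java.util.TreeSet;

  import org.apache.hadoop.yarn.server.nodemanager.api.deviceplugin.Device;
  import org.apache.hadoop.yarn.server.nodemanager.api.deviceplugin.DevicePluginScheduler;

  public class HypotheticalDeviceScheduler implements DevicePluginScheduler {

    @Override
    public Set<Device> allocateDevices(Set<Device> availableDevices,
        Integer count) {
      // Naively take the first "count" devices. A smarter scheduler could
      // prefer devices that share a CPU or a PCIe switch based on DeviceLink.
      Set<Device> allocated = new TreeSet<>();
      for (Device device : availableDevices) {
        if (allocated.size() >= count) {
          break;
        }
        allocated.add(device);
      }
      return allocated;
    }
  }
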
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/api/deviceplugin/DeviceRegisterRequest.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/api/deviceplugin/DeviceRegisterRequest.java
new file mode 100644
index 00000000000..7753ad1efdf
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/api/deviceplugin/DeviceRegisterRequest.java
@@ -0,0 +1,72 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.nodemanager.api.deviceplugin;
+
+import java.util.Objects;
+
+/**
+ * A vendor device plugin uses this object to register
+ * with the NM device plugin framework
+ * */
+public class DeviceRegisterRequest {
+
+ // plugin's own version
+ private final String pluginVersion;
+ private final String resourceName;
+
+ public DeviceRegisterRequest(Builder builder) {
+ this.resourceName = Objects.requireNonNull(builder.resourceName);
+ this.pluginVersion = builder.pluginVersion;
+ }
+
+ public String getResourceName() {
+ return resourceName;
+ }
+
+ public String getPluginVersion() {
+ return pluginVersion;
+ }
+
+
+ public static class Builder {
+ private String pluginVersion;
+ private String resourceName;
+
+ private Builder() {}
+
+ public static Builder newInstance() {
+ return new Builder();
+ }
+
+ public DeviceRegisterRequest build() {
+ return new DeviceRegisterRequest(this);
+ }
+
+ public Builder setResourceName(String resourceName) {
+ this.resourceName = resourceName;
+ return this;
+ }
+
+ public Builder setPluginVersion(String pluginVersion) {
+ this.pluginVersion = pluginVersion;
+ return this;
+ }
+
+ }
+}
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/api/deviceplugin/DeviceRuntimeSpec.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/api/deviceplugin/DeviceRuntimeSpec.java
new file mode 100644
index 00000000000..f7aa80e0161
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/api/deviceplugin/DeviceRuntimeSpec.java
@@ -0,0 +1,126 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.nodemanager.api.deviceplugin;
+
+import java.util.*;
+
+public class DeviceRuntimeSpec {
+
+ /**
+ * The containerRuntime gives the device framework a hint (not a requirement)
+ * about which container runtime can use this spec
+ * (if empty, the default "runc" is used).
+ * For instance, it could be "nvidia" for Nvidia GPU Docker v2.
+ * The "nvidia" will be passed as a parameter to docker run
+ * with --runtime "nvidia".
+ *
+ * For the cgroups runtime, the fields below can be left empty if there is
+ * no special requirement, since the framework already knows the major and
+ * minor device numbers from {@link Device}.
+ * For the Docker runtime, the fields below should be populated as needed.
+ */
+ private final String containerRuntime;
+ private final Map<String, String> envs;
+ private final Set<MountVolumeSpec> volumeMounts;
+ private final Set<MountDeviceSpec> deviceMounts;
+ private final Set<VolumeSpec> volumeClaims;
+
+ public final static String RUNTIME_CGROUPS = "cgroups";
+ public final static String RUNTIME_DOCKER = "docker";
+
+ private DeviceRuntimeSpec(Builder builder) {
+ this.containerRuntime = builder.containerRuntime;
+ this.deviceMounts = builder.deviceMounts;
+ this.envs = builder.envs;
+ this.volumeClaims = builder.volumeClaims;
+ this.volumeMounts = builder.volumeMounts;
+ }
+
+ public String getContainerRuntime() {
+ return containerRuntime;
+ }
+
+ public Map<String, String> getEnvs() {
+ return envs;
+ }
+
+ public Set<MountVolumeSpec> getVolumeMounts() {
+ return volumeMounts;
+ }
+
+ public Set<MountDeviceSpec> getDeviceMounts() {
+ return deviceMounts;
+ }
+
+ public Set<VolumeSpec> getVolumeClaims() {
+ return volumeClaims;
+ }
+ public static class Builder {
+
+ private String containerRuntime;
+ private Map<String, String> envs;
+ private Set<MountVolumeSpec> volumeMounts;
+ private Set<MountDeviceSpec> deviceMounts;
+ private Set<VolumeSpec> volumeClaims;
+
+ private Builder() {
+ containerRuntime = DeviceRuntimeSpec.RUNTIME_DOCKER;
+ envs = new HashMap<>();
+ volumeClaims = new TreeSet<>();
+ deviceMounts = new TreeSet<>();
+ volumeMounts = new TreeSet<>();
+ }
+
+ public static Builder newInstance() {
+ return new Builder();
+ }
+
+ public DeviceRuntimeSpec build() {
+ return new DeviceRuntimeSpec(this);
+ }
+
+ public Builder setContainerRuntime(String containerRuntime) {
+ this.containerRuntime = containerRuntime;
+ return this;
+ }
+
+ public Builder addVolumeSpec(VolumeSpec spec) {
+ this.volumeClaims.add(spec);
+ return this;
+ }
+
+ public Builder addMountVolumeSpec(MountVolumeSpec spec) {
+ this.volumeMounts.add(spec);
+ return this;
+ }
+
+ public Builder addMountDeviceSpec(MountDeviceSpec spec) {
+ this.deviceMounts.add(spec);
+ return this;
+ }
+
+ public Builder addEnv(String key, String value) {
+ this.envs.put(Objects.requireNonNull(key),
+ Objects.requireNonNull(value));
+ return this;
+ }
+
+ }
+
+}
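
To illustrate how these fields fit together, a plugin's onDevicesAllocated could return a spec like the following for the Docker runtime. The runtime name, driver, volume name and paths are all hypothetical, and the MountVolumeSpec, MountDeviceSpec and VolumeSpec classes used here are defined later in this patch:

  DeviceRuntimeSpec spec = DeviceRuntimeSpec.Builder.newInstance()
      .setContainerRuntime("hypothetical-runtime")   // maps to docker run --runtime
      .addEnv("HDEVICE_VISIBLE_DEVICES", "0,1")      // env passed to the container
      .addVolumeSpec(VolumeSpec.Builder.newInstance()        // ask the framework to create a volume
          .setVolumeDriver("hypothetical-driver")
          .setVolumeName("hdevice_driver_1.0")
          .setVolumeOperation(VolumeSpec.CREATE)
          .build())
      .addMountVolumeSpec(MountVolumeSpec.Builder.newInstance()  // mount that volume read-only
          .setHostPath("hdevice_driver_1.0")
          .setMountPath("/usr/local/hdevice")
          .setReadOnly(true)
          .build())
      .addMountDeviceSpec(MountDeviceSpec.Builder.newInstance()  // expose the device file
          .setDevicePathInHost("/dev/hdevice0")
          .setDevicePathInContainer("/dev/hdevice0")
          .setDevicePermission(MountDeviceSpec.RW)
          .build())
      .build();

DeviceResourceDockerRuntimePluginImpl, added later in this patch, translates such a spec into docker volume and docker run command changes.
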
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/api/deviceplugin/MountDeviceSpec.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/api/deviceplugin/MountDeviceSpec.java
new file mode 100644
index 00000000000..8393558d627
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/api/deviceplugin/MountDeviceSpec.java
@@ -0,0 +1,109 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.nodemanager.api.deviceplugin;
+
+import java.io.Serializable;
+import java.util.Objects;
+
+public class MountDeviceSpec implements Serializable, Comparable {
+
+ private final String devicePathInHost;
+ private final String devicePathInContainer;
+
+ // "r" for read-only, "rw" for read and write
+ private final String devicePermission;
+
+ public final static String RO = "r";
+ public final static String RW = "rw";
+
+ private MountDeviceSpec(Builder builder) {
+ this.devicePathInContainer = builder.devicePathInContainer;
+ this.devicePathInHost = builder.devicePathInHost;
+ this.devicePermission = builder.devicePermission;
+ }
+
+ public String getDevicePathInHost() {
+ return devicePathInHost;
+ }
+
+ public String getDevicePathInContainer() {
+ return devicePathInContainer;
+ }
+
+ public String getDevicePermission() {
+ return devicePermission;
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) {
+ return true;
+ }
+ if (o == null || getClass() != o.getClass()){
+ return false;
+ }
+ MountDeviceSpec other = (MountDeviceSpec) o;
+ return Objects.equals(devicePathInHost, other.getDevicePathInHost()) &&
+ Objects.equals(devicePathInContainer, other.getDevicePathInContainer()) &&
+ Objects.equals(devicePermission, other.getDevicePermission());
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hash(devicePathInContainer,
+ devicePathInHost, devicePermission);
+ }
+
+ @Override
+ public int compareTo(Object o) {
+ if (o == null || !(o instanceof MountDeviceSpec)) {
+ return -1;
+ }
+ MountDeviceSpec other = (MountDeviceSpec) o;
+ // Compare field by field so a TreeSet keeps distinct device mounts
+ int result = devicePathInHost.compareTo(other.devicePathInHost);
+ if (result == 0) {
+ result = devicePathInContainer.compareTo(other.devicePathInContainer);
+ }
+ return result;
+ }
+
+ public static class Builder {
+ private String devicePathInHost;
+ private String devicePathInContainer;
+ private String devicePermission;
+
+ private Builder() {}
+
+ public static Builder newInstance() {
+ return new Builder();
+ }
+
+ public MountDeviceSpec build() {
+ return new MountDeviceSpec(this);
+ }
+
+ public Builder setDevicePermission(String permission) {
+ this.devicePermission = permission;
+ return this;
+ }
+
+ public Builder setDevicePathInContainer(String devicePathInContainer) {
+ this.devicePathInContainer = devicePathInContainer;
+ return this;
+ }
+
+ public Builder setDevicePathInHost(String devicePathInHost) {
+ this.devicePathInHost = devicePathInHost;
+ return this;
+ }
+ }
+
+}
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/api/deviceplugin/MountVolumeSpec.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/api/deviceplugin/MountVolumeSpec.java
new file mode 100644
index 00000000000..b19228212d4
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/api/deviceplugin/MountVolumeSpec.java
@@ -0,0 +1,112 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.nodemanager.api.deviceplugin;
+
+import java.io.Serializable;
+import java.util.Objects;
+
+public class MountVolumeSpec implements Serializable, Comparable {
+
+ private static final long serialVersionUID = 1L;
+
+ // host path or volume name
+ private final String hostPath;
+
+ // path in the container
+ private final String mountPath;
+
+ // if true, data in mountPath can only be read
+ // "-v hostPath:mountPath:ro"
+ private final Boolean isReadOnly;
+
+ public final static String READONLYOPTION = "ro";
+
+ private MountVolumeSpec (Builder builder) {
+ this.hostPath = builder.hostPath;
+ this.mountPath = builder.mountPath;
+ this.isReadOnly = builder.isReadOnly;
+ }
+
+ public String getHostPath() {
+ return hostPath;
+ }
+
+ public String getMountPath() {
+ return mountPath;
+ }
+
+ public Boolean getReadOnly() {
+ return isReadOnly;
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) {
+ return true;
+ }
+ if (o == null || getClass() != o.getClass()){
+ return false;
+ }
+ MountVolumeSpec other = (MountVolumeSpec) o;
+ return Objects.equals(hostPath, other.getHostPath()) &&
+ Objects.equals(mountPath, other.getMountPath()) &&
+ Objects.equals(isReadOnly, other.getReadOnly());
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hash(hostPath, mountPath, isReadOnly);
+ }
+
+ @Override
+ public int compareTo(Object o) {
+ if (o == null || !(o instanceof MountVolumeSpec)) {
+ return -1;
+ }
+ MountVolumeSpec other = (MountVolumeSpec) o;
+ // Compare field by field so a TreeSet keeps distinct volume mounts
+ int result = hostPath.compareTo(other.hostPath);
+ if (result == 0) {
+ result = mountPath.compareTo(other.mountPath);
+ }
+ return result;
+ }
+
+ public static class Builder {
+ private String hostPath;
+ private String mountPath;
+ private Boolean isReadOnly;
+
+ private Builder() {}
+
+ public static Builder newInstance() {
+ return new Builder();
+ }
+
+ public MountVolumeSpec build() {
+ return new MountVolumeSpec(this);
+ }
+
+ public Builder setHostPath(String hostPath) {
+ this.hostPath = hostPath;
+ return this;
+ }
+
+ public Builder setMountPath(String mountPath) {
+ this.mountPath = mountPath;
+ return this;
+ }
+
+ public Builder setReadOnly(Boolean readOnly) {
+ isReadOnly = readOnly;
+ return this;
+ }
+ }
+}
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/api/deviceplugin/VolumeSpec.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/api/deviceplugin/VolumeSpec.java
new file mode 100644
index 00000000000..32097b9e46b
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/api/deviceplugin/VolumeSpec.java
@@ -0,0 +1,108 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.nodemanager.api.deviceplugin;
+
+import java.io.Serializable;
+import java.util.Objects;
+
+public class VolumeSpec implements Serializable, Comparable {
+
+ private static final long serialVersionUID = 1L;
+
+ private final String volumeDriver;
+ private final String volumeName;
+ private final String volumeOperation;
+
+ public final static String CREATE = "create";
+ public final static String DELETE = "delete";
+
+ private VolumeSpec(Builder builder) {
+ this.volumeDriver = builder.volumeDriver;
+ this.volumeName = builder.volumeName;
+ this.volumeOperation = builder.volumeOperation;
+ }
+
+ public String getVolumeDriver() {
+ return volumeDriver;
+ }
+
+ public String getVolumeName() {
+ return volumeName;
+ }
+
+ public String getVolumeOperation() {
+ return volumeOperation;
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) {
+ return true;
+ }
+ if (o == null || getClass() != o.getClass()){
+ return false;
+ }
+ VolumeSpec other = (VolumeSpec) o;
+ return Objects.equals(volumeDriver, other.getVolumeDriver()) &&
+ Objects.equals(volumeName, other.getVolumeName()) &&
+ Objects.equals(volumeOperation, other.getVolumeOperation());
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hash(volumeDriver, volumeName, volumeOperation);
+ }
+
+ @Override
+ public int compareTo(Object o) {
+ if (o == null || !(o instanceof VolumeSpec)) {
+ return -1;
+ }
+ VolumeSpec other = (VolumeSpec) o;
+ // Compare field by field so a TreeSet keeps distinct volume claims
+ int result = volumeName.compareTo(other.volumeName);
+ if (result == 0) {
+ result = volumeOperation.compareTo(other.volumeOperation);
+ }
+ return result;
+ }
+
+ public static class Builder {
+ private String volumeDriver;
+ private String volumeName;
+ private String volumeOperation;
+
+ private Builder(){}
+
+ public static Builder newInstance () {
+ return new Builder();
+ }
+
+ public VolumeSpec build() {
+ return new VolumeSpec(this);
+ }
+
+ public Builder setVolumeDriver(String volumeDriver) {
+ this.volumeDriver = volumeDriver;
+ return this;
+ }
+
+ public Builder setVolumeName(String volumeName) {
+ this.volumeName = volumeName;
+ return this;
+ }
+
+ public Builder setVolumeOperation(String volumeOperation) {
+ this.volumeOperation = volumeOperation;
+ return this;
+ }
+
+ }
+}
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/linux/runtime/DockerLinuxContainerRuntime.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/linux/runtime/DockerLinuxContainerRuntime.java
index 2cfa9c51dca..7a7d28a4048 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/linux/runtime/DockerLinuxContainerRuntime.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/linux/runtime/DockerLinuxContainerRuntime.java
@@ -439,32 +439,6 @@ private String runDockerVolumeCommand(DockerVolumeCommand dockerVolumeCommand,
@Override
public void prepareContainer(ContainerRuntimeContext ctx)
throws ContainerExecutionException {
- Container container = ctx.getContainer();
-
- // Create volumes when needed.
- if (nmContext != null
- && nmContext.getResourcePluginManager().getNameToPlugins() != null) {
- for (ResourcePlugin plugin : nmContext.getResourcePluginManager()
- .getNameToPlugins().values()) {
- DockerCommandPlugin dockerCommandPlugin =
- plugin.getDockerCommandPluginInstance();
- if (dockerCommandPlugin != null) {
- DockerVolumeCommand dockerVolumeCommand =
- dockerCommandPlugin.getCreateDockerVolumeCommand(
- ctx.getContainer());
- if (dockerVolumeCommand != null) {
- runDockerVolumeCommand(dockerVolumeCommand, container);
-
- // After volume created, run inspect to make sure volume properly
- // created.
- if (dockerVolumeCommand.getSubCommand().equals(
- DockerVolumeCommand.VOLUME_CREATE_SUB_COMMAND)) {
- checkDockerVolumeCreated(dockerVolumeCommand, container);
- }
- }
- }
- }
- }
}
private void checkDockerVolumeCreated(
@@ -1005,14 +979,30 @@ public void launchContainer(ContainerRuntimeContext ctx)
}
}
- // use plugins to update docker run command.
+ // use plugins to create volumes and update the docker run command.
if (nmContext != null
&& nmContext.getResourcePluginManager().getNameToPlugins() != null) {
for (ResourcePlugin plugin : nmContext.getResourcePluginManager()
.getNameToPlugins().values()) {
DockerCommandPlugin dockerCommandPlugin =
plugin.getDockerCommandPluginInstance();
+
if (dockerCommandPlugin != null) {
+ // Create volumes when needed.
+ DockerVolumeCommand dockerVolumeCommand =
+ dockerCommandPlugin.getCreateDockerVolumeCommand(
+ ctx.getContainer());
+ if (dockerVolumeCommand != null) {
+ runDockerVolumeCommand(dockerVolumeCommand, container);
+
+ // After the volume is created, run inspect to make sure it was
+ // properly created.
+ if (dockerVolumeCommand.getSubCommand().equals(
+ DockerVolumeCommand.VOLUME_CREATE_SUB_COMMAND)) {
+ checkDockerVolumeCreated(dockerVolumeCommand, container);
+ }
+ }
+ // Update cmd
dockerCommandPlugin.updateDockerRunCommand(runCommand, container);
}
}
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/resourceplugin/ResourcePluginManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/resourceplugin/ResourcePluginManager.java
index f28aad206a6..499a05fd916 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/resourceplugin/ResourcePluginManager.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/resourceplugin/ResourcePluginManager.java
@@ -18,17 +18,29 @@
package org.apache.hadoop.yarn.server.nodemanager.containermanager.resourceplugin;
+import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.ImmutableSet;
import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.util.ReflectionUtils;
+import org.apache.hadoop.yarn.api.records.ResourceInformation;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.exceptions.YarnException;
+import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
import org.apache.hadoop.yarn.server.nodemanager.Context;
+import org.apache.hadoop.yarn.server.nodemanager.api.deviceplugin.Device;
+import org.apache.hadoop.yarn.server.nodemanager.api.deviceplugin.DevicePlugin;
+import org.apache.hadoop.yarn.server.nodemanager.api.deviceplugin.DevicePluginScheduler;
+import org.apache.hadoop.yarn.server.nodemanager.api.deviceplugin.DeviceRegisterRequest;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.resourceplugin.deviceframework.DeviceSchedulerManager;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.resourceplugin.deviceframework.DevicePluginAdapter;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.resourceplugin.fpga.FpgaResourcePlugin;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.resourceplugin.gpu.GpuResourcePlugin;
+import org.apache.hadoop.yarn.util.resource.ResourceUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import java.lang.reflect.Method;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
@@ -49,14 +61,15 @@
private Map<String, ResourcePlugin> configuredPlugins =
Collections.emptyMap();
+ private DeviceSchedulerManager deviceSchedulerManager = null;
+
public synchronized void initialize(Context context)
- throws YarnException {
+ throws YarnException, ClassNotFoundException {
Configuration conf = context.getConf();
- String[] plugins = conf.getStrings(YarnConfiguration.NM_RESOURCE_PLUGINS);
+ Map<String, ResourcePlugin> pluginMap = new HashMap<>();
+ String[] plugins = conf.getStrings(YarnConfiguration.NM_RESOURCE_PLUGINS);
if (plugins != null) {
- Map pluginMap = new HashMap<>();
-
// Initialize each plugins
for (String resourceName : plugins) {
resourceName = resourceName.trim();
@@ -92,9 +105,148 @@ public synchronized void initialize(Context context)
plugin.initialize(context);
pluginMap.put(resourceName, plugin);
}
+ }
+ // Try to load pluggable device plugins
+ boolean pluggableDeviceFrameworkEnabled = conf.getBoolean(
+ YarnConfiguration.NM_PLUGGABLE_DEVICE_FRAMEWORK_ENABLED,
+ YarnConfiguration.DEFAULT_NM_PLUGGABLE_DEVICE_FRAMEWORK_ENABLED);
+
+ if (pluggableDeviceFrameworkEnabled) {
+ initializePluggableDevicePlugins(context, conf, pluginMap);
+ } else {
+ LOG.info("The pluggable device framework is not enabled. If you want, please set true to {}",
+ YarnConfiguration.NM_PLUGGABLE_DEVICE_FRAMEWORK_ENABLED);
+ }
+ configuredPlugins = Collections.unmodifiableMap(pluginMap);
+ }
+
+ public void initializePluggableDevicePlugins(Context context,
+ Configuration configuration,
+ Map<String, ResourcePlugin> pluginMap)
+ throws YarnRuntimeException, ClassNotFoundException {
+ LOG.info("The pluggable device framework enabled, trying to load the vendor plugins");
+ deviceSchedulerManager = new DeviceSchedulerManager(context);
+ String[] pluginClassNames = configuration.getStrings(
+ YarnConfiguration.NM_PLUGGABLE_DEVICE_FRAMEWORK_DEVICE_CLASSES);
+ if (null == pluginClassNames) {
+ throw new YarnRuntimeException("Null value found in configuration: " +
+ YarnConfiguration.NM_PLUGGABLE_DEVICE_FRAMEWORK_DEVICE_CLASSES);
+ }
+ // Check if the framework prefers a customized device scheduler
+ Boolean ifPrefer = configuration.getBoolean(
+ YarnConfiguration.NM_PLUGGABLE_DEVICE_FRAMEWORK_PREFER_CUSTOMIZED_SCHEDULER,
+ YarnConfiguration.DEFAULT_NM_PLUGGABLE_DEVICE_FRAMEWORK_PREFER_CUSTOMIZED_SCHEDULER
+ );
+ deviceSchedulerManager.setPreferCustomizedScheduler(ifPrefer);
+ LOG.info("Whether the device plugin framework prefer customized device scheduler: {}",
+ ifPrefer);
+ for (String pluginClassName : pluginClassNames) {
+ Class<?> pluginClazz = Class.forName(pluginClassName);
+ if (!DevicePlugin.class.isAssignableFrom(pluginClazz)) {
+ throw new YarnRuntimeException("Class: " + pluginClassName
+ + " not instance of " + DevicePlugin.class.getCanonicalName());
+ }
+ // sanity-check before initialization
+ checkInterfaceCompatibility(DevicePlugin.class, pluginClazz);
+
+ DevicePlugin dpInstance = (DevicePlugin) ReflectionUtils.newInstance(pluginClazz,
+ configuration);
+
+ // Try to register plugin
+ // TODO: handle the plugin method timeout issue
+ DeviceRegisterRequest request = dpInstance.getRegisterRequestInfo();
+ String resourceName = request.getResourceName();
+ // check if someone has already registered this resource type name
+ if (pluginMap.containsKey(resourceName)) {
+ throw new YarnRuntimeException(resourceName +
+ " has already been registered! Please change a resource type name");
+ }
+ // check resource name is valid and configured in resource-types.xml
+ if (!isValidAndConfiguredResourceName(resourceName)) {
+ throw new YarnRuntimeException(resourceName
+ + " is not configured inside "
+ + YarnConfiguration.RESOURCE_TYPES_CONFIGURATION_FILE +
+ " , please configure it first");
+ }
+ LOG.info("New resource type: {} registered successfully by {}",
+ resourceName,
+ pluginClassName);
+ DevicePluginAdapter pluginAdapter = new DevicePluginAdapter(
+ resourceName, dpInstance, deviceSchedulerManager);
+ LOG.info("Adapter of {} created. Initializing..", pluginClassName);
+ try {
+ pluginAdapter.initialize(context);
+ } catch (YarnException e) {
+ throw new YarnRuntimeException("Adapter of " + pluginClassName + " init failed!");
+ }
+ LOG.info("Adapter of {} init success!", pluginClassName);
+ // Store plugin as adapter instance
+ pluginMap.put(request.getResourceName(), pluginAdapter);
+ // If the device plugin implements DevicePluginScheduler interface
+ if (dpInstance instanceof DevicePluginScheduler) {
+ // check DevicePluginScheduler interface compatibility
+ checkInterfaceCompatibility(DevicePluginScheduler.class, pluginClazz);
+ LOG.info("{} can schedule {} devices. Added as preferred device plugin scheduler",
+ pluginClassName,
+ resourceName);
+ deviceSchedulerManager.addDevicePluginScheduler(
+ resourceName,
+ (DevicePluginScheduler)dpInstance);
+ }
+ } // end for
+ }
+
+ // Check if the implemented interfaces' signature is compatible
+ private void checkInterfaceCompatibility(Class<?> expectedClass, Class<?> actualClass)
+ throws YarnRuntimeException {
+ LOG.debug("Checking implemented interface's compatibility: \"{}\"",
+ expectedClass.getSimpleName());
+ Method[] expectedDevicePluginMethods = expectedClass.getMethods();
- configuredPlugins = Collections.unmodifiableMap(pluginMap);
+ // Check method compatibility
+ boolean found;
+ for (Method method: expectedDevicePluginMethods) {
+ found = false;
+ LOG.debug("Try to find method: \"{}\"",
+ method.getName());
+ for (Method m : actualClass.getDeclaredMethods()) {
+ if (m.getName().equals(method.getName())) {
+ LOG.debug("Method \"{}\" found in class \"{}\"",
+ m.getName(),
+ actualClass.getSimpleName());
+ found = true;
+ break;
+ }
+ }
+ if (!found) {
+ LOG.info("Method \"{}\" is not found in plugin",
+ method.getName()
+ );
+ throw new YarnRuntimeException(
+ "Method \"" + method.getName() +
+ "\" is expected but not implemented in " +
+ actualClass.getCanonicalName()
+ );
+ }
+ }// end for
+ LOG.debug("\"{}\" compatibility is ok..",
+ expectedClass.getSimpleName());
+ }
+
+ @VisibleForTesting
+ public boolean isValidAndConfiguredResourceName(String resourceName) {
+ // check pattern match
+ // check configured
+ Map<String, ResourceInformation> configuredResourceTypes =
+ ResourceUtils.getResourceTypes();
+ if (!configuredResourceTypes.containsKey(resourceName)) {
+ return false;
}
+ return true;
+ }
+
+ public DeviceSchedulerManager getDeviceSchedulerManager() {
+ return deviceSchedulerManager;
}
public synchronized void cleanup() throws YarnException {
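
Putting the new configuration keys together, enabling the framework amounts to something like the sketch below. It is shown programmatically with a hypothetical plugin class; the same values would normally be set in yarn-site.xml, and the resource name the plugin registers must also be declared in resource-types.xml, as isValidAndConfiguredResourceName requires:

  Configuration conf = new YarnConfiguration();
  // Turn on the pluggable device framework in the NM.
  conf.setBoolean(YarnConfiguration.NM_PLUGGABLE_DEVICE_FRAMEWORK_ENABLED, true);
  // Comma-separated list of vendor plugin classes to load.
  conf.setStrings(YarnConfiguration.NM_PLUGGABLE_DEVICE_FRAMEWORK_DEVICE_CLASSES,
      "com.hypothetical.HypotheticalDevicePlugin");
  // Optionally prefer the plugin's own DevicePluginScheduler when it provides one.
  conf.setBoolean(
      YarnConfiguration.NM_PLUGGABLE_DEVICE_FRAMEWORK_PREFER_CUSTOMIZED_SCHEDULER,
      true);
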
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/resourceplugin/deviceframework/AssignedDevice.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/resourceplugin/deviceframework/AssignedDevice.java
new file mode 100644
index 00000000000..caf57928dde
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/resourceplugin/deviceframework/AssignedDevice.java
@@ -0,0 +1,76 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.nodemanager.containermanager.resourceplugin.deviceframework;
+
+import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.server.nodemanager.api.deviceplugin.Device;
+
+import java.io.Serializable;
+import java.util.Objects;
+
+public class AssignedDevice implements Serializable, Comparable {
+
+ private static final long serialVersionUID = 1L;
+
+
+ Device device;
+ String containerId;
+
+ public AssignedDevice(ContainerId containerId, Device device) {
+ this.device = device;
+ this.containerId = containerId.toString();
+ }
+
+ public Device getDevice() {
+ return device;
+ }
+
+ public String getContainerId() {
+ return containerId;
+ }
+
+ @Override
+ public int compareTo(Object o) {
+ if (o == null || !(o instanceof AssignedDevice)) {
+ return -1;
+ }
+ AssignedDevice other = (AssignedDevice) o;
+ int result = getDevice().compareTo(other.getDevice());
+ if (0 != result) {
+ return result;
+ }
+ return getContainerId().compareTo(other.getContainerId());
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (o == null || !(o instanceof AssignedDevice)) {
+ return false;
+ }
+ AssignedDevice other = (AssignedDevice) o;
+ return getDevice().equals(other.getDevice()) &&
+ getContainerId().equals(other.getContainerId());
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hash(getDevice(),getContainerId());
+ }
+
+}
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/resourceplugin/deviceframework/DevicePluginAdapter.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/resourceplugin/deviceframework/DevicePluginAdapter.java
new file mode 100644
index 00000000000..3b2e1ef8501
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/resourceplugin/deviceframework/DevicePluginAdapter.java
@@ -0,0 +1,123 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.nodemanager.containermanager.resourceplugin.deviceframework;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.exceptions.YarnException;
+import org.apache.hadoop.yarn.server.nodemanager.Context;
+import org.apache.hadoop.yarn.server.nodemanager.api.deviceplugin.Device;
+import org.apache.hadoop.yarn.server.nodemanager.api.deviceplugin.DevicePlugin;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.privileged.PrivilegedOperationExecutor;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.resources.CGroupsHandler;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.resources.ResourceHandler;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.resourceplugin.DockerCommandPlugin;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.resourceplugin.NodeResourceUpdaterPlugin;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.resourceplugin.ResourcePlugin;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.resourceplugin.ResourcePluginManager;
+import org.apache.hadoop.yarn.server.nodemanager.webapp.dao.NMDeviceResourceInfo;
+import org.apache.hadoop.yarn.server.nodemanager.webapp.dao.NMResourceInfo;
+
+import java.util.*;
+
+
+/**
+ * The {@link DevicePluginAdapter} adapts the existing resource plugin hooks
+ * to the vendor plugin's logic, decoupling the vendor plugin from YARN's
+ * internal device framework.
+ *
+ * */
+public class DevicePluginAdapter implements ResourcePlugin {
+ final static Log LOG = LogFactory.getLog(DevicePluginAdapter.class);
+
+ private String resourceName;
+ private DevicePlugin devicePlugin;
+
+ private DeviceSchedulerManager deviceSchedulerManager;
+
+ private DeviceResourceDockerRuntimePluginImpl deviceDockerCommandPlugin;
+
+ private DeviceResourceHandlerImpl deviceResourceHandler;
+ private DeviceResourceUpdaterImpl deviceResourceUpdater;
+
+ public DevicePluginAdapter(String name, DevicePlugin dp,
+ DeviceSchedulerManager dsm) {
+ deviceSchedulerManager = dsm;
+ resourceName = name;
+ devicePlugin = dp;
+ }
+
+ public DeviceSchedulerManager getDeviceSchedulerManager() {
+ return deviceSchedulerManager;
+ }
+
+ @Override
+ public void initialize(Context context) throws YarnException {
+ deviceDockerCommandPlugin = new DeviceResourceDockerRuntimePluginImpl(
+ resourceName,
+ devicePlugin, this);
+ deviceResourceUpdater = new DeviceResourceUpdaterImpl(
+ resourceName, devicePlugin);
+ LOG.info(resourceName + " plugin adapter initialized");
+ return;
+ }
+
+ @Override
+ public ResourceHandler createResourceHandler(Context nmContext, CGroupsHandler cGroupsHandler,
+ PrivilegedOperationExecutor privilegedOperationExecutor) {
+ this.deviceResourceHandler = new DeviceResourceHandlerImpl(resourceName,
+ devicePlugin, this, deviceSchedulerManager,
+ cGroupsHandler, privilegedOperationExecutor);
+ return deviceResourceHandler;
+ }
+
+ @Override
+ public NodeResourceUpdaterPlugin getNodeResourceHandlerInstance() {
+ return deviceResourceUpdater;
+ }
+
+ @Override
+ public void cleanup() {
+
+ }
+
+ @Override
+ public DockerCommandPlugin getDockerCommandPluginInstance() {
+ return deviceDockerCommandPlugin;
+ }
+
+ @Override
+ public NMResourceInfo getNMResourceInfo() throws YarnException {
+ List<Device> allowed = new ArrayList<>(
+ deviceSchedulerManager.getAllAllowedDevices().get(resourceName));
+ List<AssignedDevice> assigned = new ArrayList<>();
+ Map<Device, ContainerId> assignedMap =
+ deviceSchedulerManager.getAllUsedDevices().get(resourceName);
+ for (Map.Entry<Device, ContainerId> entry : assignedMap.entrySet()) {
+ assigned.add(new AssignedDevice(entry.getValue(),
+ entry.getKey()));
+ }
+ return new NMDeviceResourceInfo(allowed, assigned);
+ }
+
+ public DeviceResourceHandlerImpl getDeviceResourceHandler() {
+ return deviceResourceHandler;
+ }
+
+}
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/resourceplugin/deviceframework/DeviceResourceDockerRuntimePluginImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/resourceplugin/deviceframework/DeviceResourceDockerRuntimePluginImpl.java
new file mode 100644
index 00000000000..5edd8835c15
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/resourceplugin/deviceframework/DeviceResourceDockerRuntimePluginImpl.java
@@ -0,0 +1,170 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.nodemanager.containermanager.resourceplugin.deviceframework;
+
+import com.google.common.annotations.VisibleForTesting;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.server.nodemanager.api.deviceplugin.*;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.runtime.docker.DockerRunCommand;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.runtime.docker.DockerVolumeCommand;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.resourceplugin.DockerCommandPlugin;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.runtime.ContainerExecutionException;
+
+import java.util.Map;
+import java.util.Set;
+import java.util.TreeSet;
+import java.util.concurrent.ConcurrentHashMap;
+
+public class DeviceResourceDockerRuntimePluginImpl
+ implements DockerCommandPlugin {
+
+ final static Log LOG = LogFactory.getLog(
+ DeviceResourceDockerRuntimePluginImpl.class);
+
+ private String resourceName;
+ private DevicePlugin devicePlugin;
+ private DevicePluginAdapter devicePluginAdapter;
+
+ private Map<ContainerId, Set<Device>> cachedAllocation =
+ new ConcurrentHashMap<>();
+
+ private Map<ContainerId, DeviceRuntimeSpec> cachedSpec =
+ new ConcurrentHashMap<>();
+
+ public DeviceResourceDockerRuntimePluginImpl(String resourceName,
+ DevicePlugin devicePlugin, DevicePluginAdapter devicePluginAdapter) {
+ this.resourceName = resourceName;
+ this.devicePlugin = devicePlugin;
+ this.devicePluginAdapter = devicePluginAdapter;
+ }
+
+ @Override
+ public void updateDockerRunCommand(DockerRunCommand dockerRunCommand,
+ Container container) throws ContainerExecutionException {
+ if(!requestsDevice(resourceName, container)) {
+ return;
+ }
+ DeviceRuntimeSpec deviceRuntimeSpec = getRuntimeSpec(container);
+ // handle device mounts
+ Set<MountDeviceSpec> deviceMounts = deviceRuntimeSpec.getDeviceMounts();
+ for (MountDeviceSpec mountDeviceSpec : deviceMounts) {
+ dockerRunCommand.addDevice(
+ mountDeviceSpec.getDevicePathInHost(),
+ mountDeviceSpec.getDevicePathInContainer());
+ }
+ // handle volume mounts
+ Set<MountVolumeSpec> mountVolumeSpecs = deviceRuntimeSpec.getVolumeMounts();
+ for (MountVolumeSpec mountVolumeSpec : mountVolumeSpecs) {
+ if (mountVolumeSpec.getReadOnly()) {
+ dockerRunCommand.addReadOnlyMountLocation(
+ mountVolumeSpec.getHostPath(),
+ mountVolumeSpec.getMountPath());
+ } else {
+ dockerRunCommand.addReadWriteMountLocation(
+ mountVolumeSpec.getHostPath(),
+ mountVolumeSpec.getMountPath());
+ }
+ }
+ // handle envs
+ dockerRunCommand.addEnv(deviceRuntimeSpec.getEnvs());
+
+ }
+
+ @Override
+ public DockerVolumeCommand getCreateDockerVolumeCommand(Container container) throws ContainerExecutionException {
+ if(!requestsDevice(resourceName, container)) {
+ return null;
+ }
+ DeviceRuntimeSpec deviceRuntimeSpec = getRuntimeSpec(container);
+ Set<VolumeSpec> volumeClaims = deviceRuntimeSpec.getVolumeClaims();
+ for (VolumeSpec volumeSpec : volumeClaims) {
+ if (volumeSpec.getVolumeOperation().equals(VolumeSpec.CREATE)) {
+ DockerVolumeCommand command = new DockerVolumeCommand(
+ DockerVolumeCommand.VOLUME_CREATE_SUB_COMMAND);
+ command.setDriverName(volumeSpec.getVolumeDriver());
+ command.setVolumeName(volumeSpec.getVolumeName());
+ // TODO: support more volume creation and with
+ return command;
+ }
+ }
+ return null;
+ }
+
+ @Override
+ public DockerVolumeCommand getCleanupDockerVolumesCommand(Container container) throws ContainerExecutionException {
+
+ if(!requestsDevice(resourceName, container)) {
+ return null;
+ }
+ Set<Device> allocated = new TreeSet<>();
+ getAllocatedDevices(container, allocated);
+ devicePlugin.onDevicesReleased(allocated);
+
+ // remove cache
+ ContainerId containerId = container.getContainerId();
+ cachedAllocation.remove(containerId);
+ cachedSpec.remove(containerId);
+ return null;
+ }
+
+ @VisibleForTesting
+ protected boolean requestsDevice(String resourceName, Container container) {
+ return DeviceSchedulerManager.
+ getRequestedDeviceCount(resourceName, container.getResource()) > 0;
+ }
+
+ private void getAllocatedDevices(Container container, Set<Device> allocated) {
+ // get allocated devices for this container, using the cache when possible
+ ContainerId containerId = container.getContainerId();
+ Set<Device> cached = cachedAllocation.get(containerId);
+ if (null != cached) {
+ // Fill the caller's set instead of reassigning the parameter,
+ // otherwise the caller would see an empty set
+ allocated.addAll(cached);
+ return;
+ }
+ Map<Device, ContainerId> assignedDevice = devicePluginAdapter
+ .getDeviceSchedulerManager()
+ .getAllUsedDevices().get(resourceName);
+ for (Map.Entry<Device, ContainerId> entry : assignedDevice.entrySet()) {
+ if (entry.getValue().equals(containerId)) {
+ allocated.add(entry.getKey());
+ }
+ }
+ cachedAllocation.put(containerId, allocated);
+ }
+
+ public synchronized DeviceRuntimeSpec getRuntimeSpec(Container container) {
+ ContainerId containerId = container.getContainerId();
+ DeviceRuntimeSpec deviceRuntimeSpec = cachedSpec.get(containerId);
+ if (deviceRuntimeSpec == null) {
+ Set<Device> allocated = new TreeSet<>();
+ getAllocatedDevices(container, allocated);
+ deviceRuntimeSpec = devicePlugin.onDevicesAllocated(allocated,
+ DeviceRuntimeSpec.RUNTIME_DOCKER);
+ if (null == deviceRuntimeSpec) {
+ LOG.error("Null DeviceRuntimeSpec value got, please check plugin logic");
+ return null;
+ }
+ cachedSpec.put(containerId, deviceRuntimeSpec);
+ }
+ return deviceRuntimeSpec;
+ }
+
+}
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/resourceplugin/deviceframework/DeviceResourceHandlerImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/resourceplugin/deviceframework/DeviceResourceHandlerImpl.java
new file mode 100644
index 00000000000..4d07f64f9a0
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/resourceplugin/deviceframework/DeviceResourceHandlerImpl.java
@@ -0,0 +1,121 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.nodemanager.containermanager.resourceplugin.deviceframework;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.server.nodemanager.api.deviceplugin.Device;
+import org.apache.hadoop.yarn.server.nodemanager.api.deviceplugin.DevicePlugin;
+import org.apache.hadoop.yarn.server.nodemanager.api.deviceplugin.DeviceRuntimeSpec;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.privileged.PrivilegedOperation;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.privileged.PrivilegedOperationExecutor;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.resources.CGroupsHandler;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.resources.ResourceHandler;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.resources.ResourceHandlerException;
+
+import java.util.List;
+import java.util.Set;
+
+public class DeviceResourceHandlerImpl implements ResourceHandler {
+
+ final static Log LOG = LogFactory.getLog(DeviceResourceHandlerImpl.class);
+
+ private String resourceName;
+ private DevicePlugin devicePlugin;
+ private DeviceSchedulerManager deviceSchedulerManager;
+ private CGroupsHandler cGroupsHandler;
+ private PrivilegedOperationExecutor privilegedOperationExecutor;
+ private DevicePluginAdapter devicePluginAdapter;
+
+ public DeviceResourceHandlerImpl(String resourceName,
+ DevicePlugin devicePlugin,
+ DevicePluginAdapter devicePluginAdapter,
+ DeviceSchedulerManager deviceSchedulerManager,
+ CGroupsHandler cGroupsHandler,
+ PrivilegedOperationExecutor privilegedOperation) {
+ this.devicePluginAdapter = devicePluginAdapter;
+ this.resourceName = resourceName;
+ this.devicePlugin = devicePlugin;
+ this.cGroupsHandler = cGroupsHandler;
+ this.privilegedOperationExecutor = privilegedOperation;
+ this.deviceSchedulerManager = deviceSchedulerManager;
+ }
+
+ @Override
+ public List<PrivilegedOperation> bootstrap(Configuration configuration) throws ResourceHandlerException {
+ Set<Device> availableDevices = devicePlugin.getDevices();
+ /**
+ * We won't fail the NM if the plugin returns an invalid value here.
+ * TODO: we should update RM's resource count if something goes wrong
+ * */
+ if (availableDevices == null) {
+ LOG.error("Bootstrap " + resourceName
+ + " failed. Got null value from plugin's getDevices method");
+ return null;
+ }
+ // Add device set. Here we trust the plugin's return value
+ deviceSchedulerManager.addDeviceSet(resourceName, availableDevices);
+ // TODO: Init cgroups
+
+ return null;
+ }
+
+ @Override
+ public List<PrivilegedOperation> preStart(Container container) throws ResourceHandlerException {
+ String containerIdStr = container.getContainerId().toString();
+ DeviceSchedulerManager.DeviceAllocation allocation = deviceSchedulerManager.assignDevices(
+ resourceName, container);
+ LOG.debug("Allocated to " +
+ containerIdStr + ": " + allocation );
+
+ DeviceRuntimeSpec deviceRuntimeSpec = devicePlugin.onDevicesAllocated(
+ allocation.getAllowed(), DeviceRuntimeSpec.RUNTIME_CGROUPS);
+
+ // cgroups operation based on allocation
+ /**
+ * TODO: implement a general container-executor device module to do the isolation
+ * */
+
+ return null;
+ }
+
+ @Override
+ public List<PrivilegedOperation> reacquireContainer(ContainerId containerId) throws ResourceHandlerException {
+ deviceSchedulerManager.recoverAssignedDevices(resourceName, containerId);
+ return null;
+ }
+
+ @Override
+ public List<PrivilegedOperation> updateContainer(Container container) throws ResourceHandlerException {
+ return null;
+ }
+
+ @Override
+ public List<PrivilegedOperation> postComplete(ContainerId containerId) throws ResourceHandlerException {
+ deviceSchedulerManager.cleanupAssignedDevices(resourceName, containerId);
+ return null;
+ }
+
+ @Override
+ public List<PrivilegedOperation> teardown() throws ResourceHandlerException {
+ return null;
+ }
+}
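DeviceResourceHandlerImpl is driven by the NM resource handler chain; a sketch of the expected call order (hypothetical driver code, grounded in the methods above and the adapter tests later in this patch):

    // Sketch only: the order in which the chain exercises the handler.
    handler.bootstrap(conf);                 // discover devices via plugin.getDevices()
    handler.preStart(container);             // assign devices, notify plugin.onDevicesAllocated
    handler.reacquireContainer(containerId); // NM restart: restore assignments from the state store
    handler.postComplete(containerId);       // release the container's devices
    handler.teardown();                      // currently a no-op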
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/resourceplugin/deviceframework/DeviceResourceUpdaterImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/resourceplugin/deviceframework/DeviceResourceUpdaterImpl.java
new file mode 100644
index 00000000000..4fbb42d03f2
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/resourceplugin/deviceframework/DeviceResourceUpdaterImpl.java
@@ -0,0 +1,54 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.nodemanager.containermanager.resourceplugin.deviceframework;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.exceptions.YarnException;
+import org.apache.hadoop.yarn.server.nodemanager.api.deviceplugin.Device;
+import org.apache.hadoop.yarn.server.nodemanager.api.deviceplugin.DevicePlugin;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.resourceplugin.NodeResourceUpdaterPlugin;
+
+import java.util.Set;
+
+public class DeviceResourceUpdaterImpl extends NodeResourceUpdaterPlugin {
+
+ final static Log LOG = LogFactory.getLog(DeviceResourceUpdaterImpl.class);
+
+ private String resourceName;
+ private DevicePlugin devicePlugin;
+
+ public DeviceResourceUpdaterImpl(String resourceName,
+ DevicePlugin devicePlugin) {
+ this.devicePlugin = devicePlugin;
+ this.resourceName = resourceName;
+ }
+
+ @Override
+ public void updateConfiguredResource(Resource res) throws YarnException {
+ LOG.info(resourceName + " plugin update resource ");
+ Set devices = devicePlugin.getDevices();
+ if (devices == null) {
+ LOG.warn(resourceName + " plugin failed to discover resource ( null value got)." );
+ return;
+ }
+ res.setResourceValue(resourceName, devices.size());
+ }
+}
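The updater overrides whatever count was statically configured for the resource type with the number of devices the plugin actually reports. A rough usage sketch (assumes the resource type is already registered through resource-types.xml, as the tests in this patch arrange):

    // Sketch only: FakeDevicePlugin reports one device, so the node advertises
    // cmp.com/cmp = 1 regardless of the statically configured value.
    Resource nodeResource = Resource.newInstance(4096, 8); // memory MB, vcores
    DeviceResourceUpdaterImpl updater =
        new DeviceResourceUpdaterImpl("cmp.com/cmp", new FakeDevicePlugin());
    updater.updateConfiguredResource(nodeResource);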
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/resourceplugin/deviceframework/DeviceSchedulerManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/resourceplugin/deviceframework/DeviceSchedulerManager.java
new file mode 100644
index 00000000000..a6375396c70
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/resourceplugin/deviceframework/DeviceSchedulerManager.java
@@ -0,0 +1,353 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.nodemanager.containermanager.resourceplugin.deviceframework;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.collect.ImmutableSet;
+import com.google.common.collect.Sets;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.util.StringUtils;
+import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.exceptions.ResourceNotFoundException;
+import org.apache.hadoop.yarn.server.nodemanager.Context;
+import org.apache.hadoop.yarn.server.nodemanager.api.deviceplugin.Device;
+import org.apache.hadoop.yarn.server.nodemanager.api.deviceplugin.DevicePluginScheduler;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.resources.ResourceHandlerException;
+
+import java.io.IOException;
+import java.io.Serializable;
+import java.util.*;
+import java.util.concurrent.ConcurrentHashMap;
+
+/**
+ * Schedules device resources based on requirements and does the bookkeeping.
+ * It holds all device-type resources and can schedule them with a default scheduler.
+ * If a resource type brings its own scheduler, that scheduler is used instead.
+ * */
+public class DeviceSchedulerManager {
+ final static Log LOG = LogFactory.getLog(DeviceSchedulerManager.class);
+
+ private Context nmContext;
+ private static final int WAIT_MS_PER_LOOP = 1000;
+
+ // Holds vendor-implemented schedulers
+ private Map<String, DevicePluginScheduler> devicePluginSchedulers =
+ new ConcurrentHashMap<>();
+
+ private boolean preferCustomizedScheduler = false;
+
+ /**
+ * Holds all types of devices.
+ * key is the device resource name
+ * value is a sorted set of {@link Device}
+ * */
+ private Map<String, Set<Device>> allAllowedDevices = new ConcurrentHashMap<>();
+
+ /**
+ * Holds used devices.
+ * key is the device resource name
+ * value is a sorted map of {@link Device} and {@link ContainerId} pairs
+ * */
+ private Map<String, Map<Device, ContainerId>> allUsedDevices = new ConcurrentHashMap<>();
+
+ public DeviceSchedulerManager(Context context) {
+ nmContext = context;
+ }
+
+ @VisibleForTesting
+ public Map<String, Set<Device>> getAllAllowedDevices() {
+ return allAllowedDevices;
+ }
+
+ @VisibleForTesting
+ public Map<String, Map<Device, ContainerId>> getAllUsedDevices() {
+ return allUsedDevices;
+ }
+
+ public synchronized void addDeviceSet(String resourceName, Set<Device> deviceSet) {
+ LOG.info("Adding new resource: " + "type:" + resourceName + "," + deviceSet);
+ allAllowedDevices.put(resourceName, new TreeSet<>(deviceSet));
+ allUsedDevices.put(resourceName, new TreeMap<>());
+ }
+
+ public synchronized DeviceAllocation assignDevices(String resourceName, Container container)
+ throws ResourceHandlerException {
+ DeviceAllocation allocation = internalAssignDevices(resourceName, container);
+ // Wait for a maximum of 120 seconds if no available Devices are there which
+ // are yet to be released.
+ final int timeoutMsecs = 120 * WAIT_MS_PER_LOOP;
+ int timeWaiting = 0;
+ while (allocation == null) {
+ if (timeWaiting >= timeoutMsecs) {
+ break;
+ }
+
+ // Sleep for 1 sec to ensure there are some free devices which are
+ // getting released.
+ try {
+ LOG.info("Container : " + container.getContainerId()
+ + " is waiting for free " + resourceName + " devices.");
+ Thread.sleep(WAIT_MS_PER_LOOP);
+ timeWaiting += WAIT_MS_PER_LOOP;
+ allocation = internalAssignDevices(resourceName, container);
+ } catch (InterruptedException e) {
+ // On any interrupt, break the loop and continue execution.
+ break;
+ }
+ }
+
+ if (allocation == null) {
+ String message = "Could not get valid " + resourceName + " device for container '"
+ + container.getContainerId()
+ + "' as some other containers might not be releasing them.";
+ LOG.warn(message);
+ throw new ResourceHandlerException(message);
+ }
+ return allocation;
+ }
+
+ public synchronized DeviceAllocation internalAssignDevices(String resourceName, Container container)
+ throws ResourceHandlerException {
+ Resource requestedResource = container.getResource();
+ ContainerId containerId = container.getContainerId();
+ int requestedDeviceCount = getRequestedDeviceCount(resourceName, requestedResource);
+ // Assign devices to container if requested some.
+ if (requestedDeviceCount > 0) {
+ if (requestedDeviceCount > getAvailableDevices(resourceName)) {
+ // If there are some devices which are getting released, wait for few
+ // seconds to get it.
+ if (requestedDeviceCount <= getReleasingDevices(resourceName) +
+ getAvailableDevices(resourceName)) {
+ return null;
+ }
+ }
+
+ int availableDeviceCount = getAvailableDevices(resourceName);
+ if (requestedDeviceCount > availableDeviceCount) {
+ throw new ResourceHandlerException(
+ "Failed to find enough " + resourceName + ", requestor=" + containerId
+ + ", #Requested=" + requestedDeviceCount + ", #available="
+ + availableDeviceCount);
+ }
+
+ Set<Device> assignedDevices = new TreeSet<>();
+ Map<Device, ContainerId> usedDevices = allUsedDevices.get(resourceName);
+ Set<Device> allowedDevices = allAllowedDevices.get(resourceName);
+
+ DevicePluginScheduler dps = devicePluginSchedulers.get(resourceName);
+ // Use the default scheduling logic if the customized scheduler is not
+ // preferred or the plugin doesn't provide one.
+ if (!isPreferCustomizedScheduler() || null == dps) {
+ if (!isPreferCustomizedScheduler()) {
+ LOG.debug("Customized device plugin scheduler is not preferred, use default logic");
+ } else {
+ LOG.debug("Customized device plugin scheduler is preferred but not implemented, use default logic");
+ }
+
+ for (Device device : allowedDevices) {
+ if (!usedDevices.containsKey(device)) {
+ usedDevices.put(device, containerId);
+ assignedDevices.add(device);
+ if (assignedDevices.size() == requestedDeviceCount) {
+ break;
+ }
+ }
+ }
+ } else {
+ // Use the customized device scheduler
+ LOG.debug("Customized device plugin scheduler is preferred and implemented, "
+ + "use customized logic");
+ LOG.debug("Try to schedule " + requestedDeviceCount
+ + " (" + resourceName + ") using " + dps.getClass());
+ Set<Device> dpsAllocated = dps.allocateDevices(
+ Sets.difference(allowedDevices, usedDevices.keySet()),
+ requestedDeviceCount);
+ // TODO: should check if customized scheduler return values are real
+ if (dpsAllocated.size() != requestedDeviceCount) {
+ throw new ResourceHandlerException(dps.getClass() + " should allocate "
+ + requestedDeviceCount
+ + " of " + resourceName + ", but actually allocated: "
+ + dpsAllocated.size());
+ // TODO: fall back to default schedule logic?
+ }
+ // copy
+ assignedDevices.addAll(dpsAllocated);
+ // Store assigned devices into usedDevices
+ for (Device device : assignedDevices) {
+ usedDevices.put(device, containerId);
+ }
+ }
+ // Record in state store if we allocated anything
+ if (!assignedDevices.isEmpty()) {
+ try {
+ // Update state store.
+ nmContext.getNMStateStore().storeAssignedResources(container, resourceName,
+ new ArrayList<>(assignedDevices));
+ } catch (IOException e) {
+ cleanupAssignedDevices(resourceName, containerId);
+ throw new ResourceHandlerException(e);
+ }
+ }
+
+ return new DeviceAllocation(resourceName, assignedDevices,
+ Sets.difference(allowedDevices, assignedDevices));
+ }
+ return new DeviceAllocation(resourceName, null,
+ allAllowedDevices.get(resourceName));
+ }
+
+ public synchronized void recoverAssignedDevices(String resourceName, ContainerId containerId)
+ throws ResourceHandlerException {
+ Container c = nmContext.getContainers().get(containerId);
+ Map<Device, ContainerId> usedDevices = allUsedDevices.get(resourceName);
+ Set<Device> allowedDevices = allAllowedDevices.get(resourceName);
+ if (null == c) {
+ throw new ResourceHandlerException(
+ "This shouldn't happen, cannot find container with id="
+ + containerId);
+ }
+
+ for (Serializable deviceSerializable : c.getResourceMappings()
+ .getAssignedResources(resourceName)) {
+ if (!(deviceSerializable instanceof Device)) {
+ throw new ResourceHandlerException(
+ "Trying to recover device id, however it"
+ + " is not Device instance, this shouldn't happen");
+ }
+
+ Device device = (Device) deviceSerializable;
+
+ // Make sure it is in allowed device.
+ if (!allowedDevices.contains(device)) {
+ throw new ResourceHandlerException(
+ "Try to recover device = " + device
+ + " however it is not in allowed device list:" + StringUtils
+ .join(",", allowedDevices));
+ }
+
+ // Make sure it is not occupied by anybody else
+ if (usedDevices.containsKey(device)) {
+ throw new ResourceHandlerException(
+ "Try to recover device id = " + device
+ + " however it is already assigned to container="
+ + usedDevices.get(device)
+ + ", please double check what happened.");
+ }
+
+ usedDevices.put(device, containerId);
+ }
+ }
+
+ public synchronized void cleanupAssignedDevices(String resourceName, ContainerId containerId) {
+ Iterator<Map.Entry<Device, ContainerId>> iter =
+ allUsedDevices.get(resourceName).entrySet().iterator();
+ while (iter.hasNext()) {
+ if (iter.next().getValue().equals(containerId)) {
+ iter.remove();
+ }
+ }
+ }
+
+ public static int getRequestedDeviceCount(String resourceName, Resource requestedResource) {
+ try {
+ return Long.valueOf(requestedResource.getResourceValue(
+ resourceName)).intValue();
+ } catch (ResourceNotFoundException e) {
+ return 0;
+ }
+ }
+
+ public synchronized int getAvailableDevices(String resourceName) {
+ return allAllowedDevices.get(resourceName).size() -
+ allUsedDevices.get(resourceName).size();
+ }
+
+ private synchronized long getReleasingDevices(String resourceName) {
+ long releasingDevices = 0;
+ Map<Device, ContainerId> used = allUsedDevices.get(resourceName);
+ Iterator<Map.Entry<Device, ContainerId>> iter = used.entrySet()
+ .iterator();
+ while (iter.hasNext()) {
+ ContainerId containerId = iter.next().getValue();
+ Container container;
+ if ((container = nmContext.getContainers().get(containerId)) != null) {
+ if (container.isContainerInFinalStates()) {
+ releasingDevices = releasingDevices + container.getResource()
+ .getResourceInformation(resourceName).getValue();
+ }
+ }
+ }
+ return releasingDevices;
+ }
+
+ public boolean isPreferCustomizedScheduler() {
+ return preferCustomizedScheduler;
+ }
+
+ public void setPreferCustomizedScheduler(boolean preferCustomizedScheduler) {
+ this.preferCustomizedScheduler = preferCustomizedScheduler;
+ }
+
+ static class DeviceAllocation {
+ private String resourceName;
+
+
+ private Set<Device> allowed = Collections.emptySet();
+ private Set<Device> denied = Collections.emptySet();
+
+ DeviceAllocation(String resourceName, Set<Device> allowed, Set<Device> denied) {
+ this.resourceName = resourceName;
+ if (allowed != null) {
+ this.allowed = ImmutableSet.copyOf(allowed);
+ }
+ if (denied != null) {
+ this.denied = ImmutableSet.copyOf(denied);
+ }
+ }
+
+
+ public Set<Device> getAllowed() {
+ return allowed;
+ }
+
+ @Override
+ public String toString() {
+ return "ResourceType: " + resourceName +
+ ", Allowed Devices: " + allowed +
+ ", Denied Devices: " + denied;
+ }
+
+ }
+
+ @VisibleForTesting
+ public void addDevicePluginScheduler(String resourceName,
+ DevicePluginScheduler s) {
+ this.devicePluginSchedulers.put(resourceName,
+ Objects.requireNonNull(s));
+ }
+
+}
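When a plugin also implements DevicePluginScheduler and the customized scheduler is preferred, assignDevices delegates the device pick to the plugin instead of the default first-fit walk over the allowed set. A wiring sketch, mirroring what the adapter tests later in this patch do (nmContext is assumed to be an NM Context instance):

    // Sketch only: prefer the vendor-provided scheduler for its resource type.
    DeviceSchedulerManager dsm = new DeviceSchedulerManager(nmContext);
    FakeDevicePlugin plugin = new FakeDevicePlugin(); // also implements DevicePluginScheduler
    dsm.setPreferCustomizedScheduler(true);
    dsm.addDevicePluginScheduler(FakeDevicePlugin.resourceName, plugin);
    // assignDevices(...) now calls plugin.allocateDevices(freeDevices, count) and
    // throws if the plugin returns the wrong number of devices.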
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/resourceplugin/deviceframework/examples/FakeDevicePlugin.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/resourceplugin/deviceframework/examples/FakeDevicePlugin.java
new file mode 100644
index 00000000000..99c920dd773
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/resourceplugin/deviceframework/examples/FakeDevicePlugin.java
@@ -0,0 +1,73 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.nodemanager.containermanager.resourceplugin.deviceframework.examples;
+
+import org.apache.hadoop.yarn.server.nodemanager.api.deviceplugin.*;
+
+import java.util.Set;
+import java.util.TreeSet;
+
+public class FakeDevicePlugin
+ implements DevicePlugin, DevicePluginScheduler {
+ public final static String resourceName = "cmp.com/cmp";
+ @Override
+ public DeviceRegisterRequest getRegisterRequestInfo() {
+ return DeviceRegisterRequest.Builder.newInstance()
+ .setResourceName(resourceName)
+ .setPluginVersion("v1.0").build();
+ }
+
+ @Override
+ public Set<Device> getDevices() {
+ TreeSet<Device> r = new TreeSet<>();
+ r.add(Device.Builder.newInstance()
+ .setID(0)
+ .setDevPath("/dev/cmp0")
+ .setMajorNumber(243)
+ .setMinorNumber(0)
+ .setBusID("0000:65:00.0")
+ .setHealthy(true)
+ .build());
+ return r;
+ }
+
+ @Override
+ public DeviceRuntimeSpec onDevicesAllocated(Set<Device> allocatedDevices, String runtime) {
+ return null;
+ }
+
+ @Override
+ public void onDevicesReleased(Set<Device> allocatedDevices) {
+
+ }
+
+ @Override
+ public Set<Device> allocateDevices(Set<Device> availableDevices, Integer count) {
+ Set<Device> allocated = new TreeSet<>();
+ int number = 0;
+ for (Device d : availableDevices) {
+ allocated.add(d);
+ number++;
+ if (number == count) {
+ break;
+ }
+ }
+ return allocated;
+ }
+}
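FakeDevicePlugin also shows what a vendor has to wire up on the NM side: the framework must be switched on and the plugin class listed, as the tests below do. A configuration sketch (constants taken from the tests; the concrete property key strings live in YarnConfiguration):

    // Sketch only: enable the pluggable device framework and load this plugin.
    YarnConfiguration conf = new YarnConfiguration();
    conf.setBoolean(YarnConfiguration.NM_PLUGGABLE_DEVICE_FRAMEWORK_ENABLED, true);
    conf.setStrings(YarnConfiguration.NM_PLUGGABLE_DEVICE_FRAMEWORK_DEVICE_CLASSES,
        FakeDevicePlugin.class.getCanonicalName());
    // On NM start, ResourcePluginManager instantiates the class and registers it
    // under the resource name from getRegisterRequestInfo() ("cmp.com/cmp").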
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/webapp/dao/NMDeviceResourceInfo.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/webapp/dao/NMDeviceResourceInfo.java
new file mode 100644
index 00000000000..3a0ba757cae
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/webapp/dao/NMDeviceResourceInfo.java
@@ -0,0 +1,52 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.nodemanager.webapp.dao;
+
+import org.apache.hadoop.yarn.server.nodemanager.api.deviceplugin.Device;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.resourceplugin.deviceframework.AssignedDevice;
+
+import java.util.List;
+
+public class NMDeviceResourceInfo extends NMResourceInfo {
+
+ private List<Device> totalDevices;
+ private List<AssignedDevice> assignedDevices;
+
+ public NMDeviceResourceInfo(List<Device> totalDevices, List<AssignedDevice> assignedDevices) {
+ this.assignedDevices = assignedDevices;
+ this.totalDevices = totalDevices;
+ }
+
+ public List<Device> getTotalDevices() {
+ return totalDevices;
+ }
+
+ public void setTotalDevices(List<Device> totalDevices) {
+ this.totalDevices = totalDevices;
+ }
+
+ public List<AssignedDevice> getAssignedDevices() {
+ return assignedDevices;
+ }
+
+ public void setAssignedDevices(
+ List<AssignedDevice> assignedDevices) {
+ this.assignedDevices = assignedDevices;
+ }
+}
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/resourceplugin/TestResourcePluginManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/resourceplugin/TestResourcePluginManager.java
index 6ed7c568899..1f748826ffd 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/resourceplugin/TestResourcePluginManager.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/resourceplugin/TestResourcePluginManager.java
@@ -24,6 +24,7 @@
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.event.Dispatcher;
+import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
import org.apache.hadoop.yarn.server.nodemanager.ContainerExecutor;
import org.apache.hadoop.yarn.server.nodemanager.Context;
import org.apache.hadoop.yarn.server.nodemanager.DeletionService;
@@ -33,6 +34,7 @@
import org.apache.hadoop.yarn.server.nodemanager.NodeManager;
import org.apache.hadoop.yarn.server.nodemanager.NodeManagerTestBase;
import org.apache.hadoop.yarn.server.nodemanager.NodeStatusUpdater;
+import org.apache.hadoop.yarn.server.nodemanager.api.deviceplugin.*;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.ContainerManagerImpl;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.privileged.PrivilegedOperation;
@@ -41,27 +43,41 @@
import org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.resources.ResourceHandler;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.resources.ResourceHandlerChain;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.resources.ResourceHandlerException;
-import org.apache.hadoop.yarn.server.nodemanager.containermanager.resourceplugin.NodeResourceUpdaterPlugin;
-import org.apache.hadoop.yarn.server.nodemanager.containermanager.resourceplugin.ResourcePlugin;
-import org.apache.hadoop.yarn.server.nodemanager.containermanager.resourceplugin.ResourcePluginManager;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.resourceplugin.deviceframework.DevicePluginAdapter;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.resourceplugin.deviceframework.FakeTestDevicePlugin1;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.resourceplugin.deviceframework.FakeTestDevicePlugin2;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.resourceplugin.deviceframework.FakeTestDevicePlugin3;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.resourceplugin.deviceframework.examples.FakeDevicePlugin;
import org.apache.hadoop.yarn.server.security.ApplicationACLsManager;
+import org.apache.hadoop.yarn.util.resource.ResourceUtils;
+import org.apache.hadoop.yarn.util.resource.TestResourceUtils;
import org.junit.After;
import org.junit.Assert;
+import org.junit.Before;
import org.junit.Test;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
+import java.io.File;
+import java.util.*;
import static org.mockito.Matchers.any;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.times;
-import static org.mockito.Mockito.verify;
-import static org.mockito.Mockito.when;
+import static org.mockito.Mockito.*;
public class TestResourcePluginManager extends NodeManagerTestBase {
private NodeManager nm;
+ private YarnConfiguration conf;
+
+ private String tempResourceTypesFile;
+
+ @Before
+ public void setup() throws Exception {
+ this.conf = createNMConfig();
+ // setup resource-types.xml
+ ResourceUtils.resetResourceTypes();
+ String resourceTypesFile = "resource-types-pluggable-devices.xml";
+ this.tempResourceTypesFile = TestResourceUtils.setupResourceTypes(this.conf, resourceTypesFile);
+ }
+
ResourcePluginManager stubResourcePluginmanager() {
// Stub ResourcePluginManager
final ResourcePluginManager rpm = mock(ResourcePluginManager.class);
@@ -94,6 +110,11 @@ public void tearDown() {
// ignore
}
}
+ // cleanup resource-types.xml
+ File dest = new File(this.tempResourceTypesFile);
+ if (dest.exists()) {
+ dest.delete();
+ }
}
private class CustomizedResourceHandler implements ResourceHandler {
@@ -145,7 +166,7 @@ public MyMockNM(ResourcePluginManager rpm) {
@Override
protected NodeStatusUpdater createNodeStatusUpdater(Context context,
Dispatcher dispatcher, NodeHealthCheckerService healthChecker) {
- ((NodeManager.NMContext)context).setResourcePluginManager(rpm);
+ ((NodeManager.NMContext) context).setResourcePluginManager(rpm);
return new BaseNodeStatusUpdaterForTest(context, dispatcher, healthChecker,
metrics, new BaseResourceTrackerForTest());
}
@@ -157,7 +178,7 @@ protected ContainerManagerImpl createContainerManager(Context context,
ApplicationACLsManager aclsManager,
LocalDirsHandlerService diskhandler) {
return new MyContainerManager(context, exec, del, nodeStatusUpdater,
- metrics, diskhandler);
+ metrics, diskhandler);
}
@Override
@@ -222,7 +243,7 @@ public void testLinuxContainerExecutorWithResourcePluginsEnabled() throws Except
@Override
protected NodeStatusUpdater createNodeStatusUpdater(Context context,
Dispatcher dispatcher, NodeHealthCheckerService healthChecker) {
- ((NMContext)context).setResourcePluginManager(rpm);
+ ((NMContext) context).setResourcePluginManager(rpm);
return new BaseNodeStatusUpdaterForTest(context, dispatcher, healthChecker,
metrics, new BaseResourceTrackerForTest());
}
@@ -239,7 +260,7 @@ protected ContainerManagerImpl createContainerManager(Context context,
@Override
protected ContainerExecutor createContainerExecutor(Configuration conf) {
- ((NMContext)this.getNMContext()).setResourcePluginManager(rpm);
+ ((NMContext) this.getNMContext()).setResourcePluginManager(rpm);
lce.setConf(conf);
return lce;
}
@@ -257,6 +278,9 @@ protected ContainerExecutor createContainerExecutor(Configuration conf) {
boolean newHandlerAdded = false;
for (ResourceHandler h : ((ResourceHandlerChain) handler)
.getResourceHandlerList()) {
+ if (h instanceof DevicePluginAdapter) {
+ Assert.fail("Should not find DevicePluginAdapter in the handler chain");
+ }
if (h instanceof CustomizedResourceHandler) {
newHandlerAdded = true;
break;
@@ -264,4 +288,175 @@ protected ContainerExecutor createContainerExecutor(Configuration conf) {
}
Assert.assertTrue("New ResourceHandler should be added", newHandlerAdded);
}
+
+ // Disabled pluggable framework.
+ // We use a spy object of the real rpm to verify "initializePluggableDevicePlugins"
+ // because using a mock rpm will not work
+ @Test(timeout = 30000)
+ public void testInitializationWithPluggableDeviceFrameworkDisabled() throws Exception {
+ ResourcePluginManager rpm = new ResourcePluginManager();
+
+ ResourcePluginManager rpmSpy = spy(rpm);
+ nm = new MyMockNM(rpmSpy);
+
+ conf.setBoolean(YarnConfiguration.NM_PLUGGABLE_DEVICE_FRAMEWORK_ENABLED,
+ false);
+ nm.init(conf);
+ nm.start();
+ verify(rpmSpy, times(1)).initialize(
+ any(Context.class));
+ verify(rpmSpy, times(0)).initializePluggableDevicePlugins(
+ any(Context.class), any(Configuration.class), any(Map.class));
+ }
+
+ // no configuration set.
+ @Test(timeout = 30000)
+ public void testInitializationWithPluggableDeviceFrameworkDisabled2() throws Exception {
+ ResourcePluginManager rpm = new ResourcePluginManager();
+
+ ResourcePluginManager rpmSpy = spy(rpm);
+ nm = new MyMockNM(rpmSpy);
+
+ nm.init(conf);
+ nm.start();
+ verify(rpmSpy, times(1)).initialize(
+ any(Context.class));
+ verify(rpmSpy, times(0)).initializePluggableDevicePlugins(
+ any(Context.class), any(Configuration.class), any(Map.class));
+ }
+
+ // Enable framework and configure pluggable device classes
+ @Test(timeout = 30000)
+ public void testInitializationWithPluggableDeviceFrameworkEnabled() throws Exception {
+ ResourcePluginManager rpm = new ResourcePluginManager();
+
+ ResourcePluginManager rpmSpy = spy(rpm);
+ nm = new MyMockNM(rpmSpy);
+
+ conf.setBoolean(YarnConfiguration.NM_PLUGGABLE_DEVICE_FRAMEWORK_ENABLED,
+ true);
+ conf.setStrings(YarnConfiguration.NM_PLUGGABLE_DEVICE_FRAMEWORK_DEVICE_CLASSES,
+ FakeDevicePlugin.class.getCanonicalName());
+ nm.init(conf);
+ nm.start();
+ verify(rpmSpy, times(1)).initialize(
+ any(Context.class));
+ verify(rpmSpy, times(1)).initializePluggableDevicePlugins(
+ any(Context.class), any(Configuration.class), any(Map.class));
+ }
+
+ // Enable pluggable framework, but leave device classes un-configured.
+ // initializePluggableDevicePlugins is invoked but should throw an exception
+ @Test(timeout = 30000)
+ public void testInitializationWithPluggableDeviceFrameworkEnabled2()
+ throws ClassNotFoundException{
+ ResourcePluginManager rpm = new ResourcePluginManager();
+
+ ResourcePluginManager rpmSpy = spy(rpm);
+ nm = new MyMockNM(rpmSpy);
+ boolean fail = false;
+ try {
+ YarnConfiguration conf = createNMConfig();
+ conf.setBoolean(YarnConfiguration.NM_PLUGGABLE_DEVICE_FRAMEWORK_ENABLED,
+ true);
+
+ nm.init(conf);
+ nm.start();
+ } catch (YarnRuntimeException e) {
+ fail = true;
+ } catch (Exception e) {
+ // ignore other exceptions
+ }
+ verify(rpmSpy, times(1)).initializePluggableDevicePlugins(
+ any(Context.class), any(Configuration.class), any(Map.class));
+ Assert.assertTrue(fail);
+ }
+
+ @Test(timeout = 30000)
+ public void testNormalInitializationOfPluggableDeviceClasses()
+ throws Exception{
+
+ ResourcePluginManager rpm = new ResourcePluginManager();
+
+ ResourcePluginManager rpmSpy = spy(rpm);
+ nm = new MyMockNM(rpmSpy);
+
+ conf.setBoolean(YarnConfiguration.NM_PLUGGABLE_DEVICE_FRAMEWORK_ENABLED,
+ true);
+ conf.setStrings(YarnConfiguration.NM_PLUGGABLE_DEVICE_FRAMEWORK_DEVICE_CLASSES,
+ FakeTestDevicePlugin1.class.getCanonicalName() + "," +
+ FakeDevicePlugin.class.getCanonicalName());
+ nm.init(conf);
+ nm.start();
+ Map<String, ResourcePlugin> pluginMap = rpmSpy.getNameToPlugins();
+ Assert.assertEquals(2, pluginMap.size());
+ ResourcePlugin rp = pluginMap.get("cmp.com/cmp");
+ if (!(rp instanceof DevicePluginAdapter)) {
+ Assert.fail("Plugin registered for cmp.com/cmp should be a DevicePluginAdapter");
+ }
+ }
+
+ // Fail to load a class which doesn't implement interface DevicePlugin
+ @Test(timeout = 30000)
+ public void testLoadInvalidPluggableDeviceClasses()
+ throws Exception{
+ ResourcePluginManager rpm = new ResourcePluginManager();
+
+ ResourcePluginManager rpmSpy = spy(rpm);
+ nm = new MyMockNM(rpmSpy);
+
+ conf.setBoolean(YarnConfiguration.NM_PLUGGABLE_DEVICE_FRAMEWORK_ENABLED,
+ true);
+ conf.setStrings(YarnConfiguration.NM_PLUGGABLE_DEVICE_FRAMEWORK_DEVICE_CLASSES,
+ FakeTestDevicePlugin2.class.getCanonicalName());
+
+ String expectedMessage = "Class: " + FakeTestDevicePlugin2.class.getCanonicalName()
+ + " not instance of " + DevicePlugin.class.getCanonicalName();
+ String actualMessage = "";
+ try {
+ nm.init(conf);
+ nm.start();
+ } catch (YarnRuntimeException e) {
+ actualMessage = e.getMessage();
+ }
+ Assert.assertEquals(expectedMessage, actualMessage);
+ }
+
+ @Test(timeout = 30000)
+ public void testLoadDuplicateResourceNameDevicePlugin()
+ throws Exception{
+ ResourcePluginManager rpm = new ResourcePluginManager();
+
+ ResourcePluginManager rpmSpy = spy(rpm);
+ nm = new MyMockNM(rpmSpy);
+
+ conf.setBoolean(YarnConfiguration.NM_PLUGGABLE_DEVICE_FRAMEWORK_ENABLED,
+ true);
+ conf.setStrings(YarnConfiguration.NM_PLUGGABLE_DEVICE_FRAMEWORK_DEVICE_CLASSES,
+ FakeTestDevicePlugin1.class.getCanonicalName() + "," +
+ FakeTestDevicePlugin3.class.getCanonicalName());
+
+ String expectedMessage = "cmpA.com/hdwA" +
+ " has already been registered! Please change a resource type name";
+ String actualMessage = "";
+ try {
+ nm.init(conf);
+ nm.start();
+ } catch (YarnRuntimeException e) {
+ actualMessage = e.getMessage();
+ }
+ Assert.assertEquals(expectedMessage, actualMessage);
+ }
+
+ @Test(timeout = 30000)
+ public void testRequestedResourceNameIsConfigured()
+ throws Exception{
+ ResourcePluginManager rpm = new ResourcePluginManager();
+ String resourceName = "a.com/a";
+ Assert.assertFalse(rpm.isValidAndConfiguredResourceName(resourceName));
+ resourceName = "cmp.com/cmp";
+ Assert.assertTrue(rpm.isValidAndConfiguredResourceName(resourceName));
+ }
+
+
}
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/resourceplugin/deviceframework/FakeTestDevicePlugin1.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/resourceplugin/deviceframework/FakeTestDevicePlugin1.java
new file mode 100644
index 00000000000..ec1c2aaca99
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/resourceplugin/deviceframework/FakeTestDevicePlugin1.java
@@ -0,0 +1,61 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.nodemanager.containermanager.resourceplugin.deviceframework;
+
+import org.apache.hadoop.yarn.server.nodemanager.api.deviceplugin.*;
+
+import java.util.Set;
+import java.util.TreeSet;
+
+/**
+ * Used only for testing.
+ * A fake, normally-behaving vendor plugin.
+ * */
+public class FakeTestDevicePlugin1 implements DevicePlugin {
+ @Override
+ public DeviceRegisterRequest getRegisterRequestInfo() {
+ return DeviceRegisterRequest.Builder.newInstance()
+ .setResourceName("cmpA.com/hdwA").build();
+ }
+
+ @Override
+ public Set getDevices() {
+ TreeSet r = new TreeSet<>();
+ r.add(Device.Builder.newInstance()
+ .setID(0)
+ .setDevPath("/dev/hdwA0")
+ .setMajorNumber(243)
+ .setMinorNumber(0)
+ .setBusID("0000:65:00.0")
+ .setHealthy(true)
+ .build());
+ return r;
+ }
+
+ @Override
+ public DeviceRuntimeSpec onDevicesAllocated(Set allocatedDevices, String runtime) {
+ return null;
+ }
+
+
+ @Override
+ public void onDevicesReleased(Set allocatedDevices) {
+
+ }
+}
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/resourceplugin/deviceframework/FakeTestDevicePlugin2.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/resourceplugin/deviceframework/FakeTestDevicePlugin2.java
new file mode 100644
index 00000000000..8c1a61dc21c
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/resourceplugin/deviceframework/FakeTestDevicePlugin2.java
@@ -0,0 +1,27 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.nodemanager.containermanager.resourceplugin.deviceframework;
+
+/**
+ * Only used for testing.
+ * This is not an implementation of DevicePlugin.
+ * */
+public class FakeTestDevicePlugin2 {
+
+}
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/resourceplugin/deviceframework/FakeTestDevicePlugin3.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/resourceplugin/deviceframework/FakeTestDevicePlugin3.java
new file mode 100644
index 00000000000..f7677314db6
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/resourceplugin/deviceframework/FakeTestDevicePlugin3.java
@@ -0,0 +1,60 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.nodemanager.containermanager.resourceplugin.deviceframework;
+
+import org.apache.hadoop.yarn.server.nodemanager.api.deviceplugin.*;
+
+import java.util.Set;
+import java.util.TreeSet;
+
+/**
+ * Only used for testing.
+ * This plugin registers the same resource name as FakeTestDevicePlugin1.
+ * */
+public class FakeTestDevicePlugin3 implements DevicePlugin {
+ @Override
+ public DeviceRegisterRequest getRegisterRequestInfo() {
+ return DeviceRegisterRequest.Builder.newInstance()
+ .setResourceName("cmpA.com/hdwA").build();
+ }
+
+ @Override
+ public Set getDevices() {
+ TreeSet r = new TreeSet<>();
+ r.add(Device.Builder.newInstance()
+ .setID(0)
+ .setDevPath("/dev/hdwA0")
+ .setMajorNumber(243)
+ .setMinorNumber(0)
+ .setBusID("0000:65:00.0")
+ .setHealthy(true)
+ .build());
+ return r;
+ }
+
+ @Override
+ public DeviceRuntimeSpec onDevicesAllocated(Set allocatedDevices, String runtime) {
+ return null;
+ }
+
+ @Override
+ public void onDevicesReleased(Set allocatedDevices) {
+
+ }
+}
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/resourceplugin/deviceframework/TestDevicePluginAdapter.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/resourceplugin/deviceframework/TestDevicePluginAdapter.java
new file mode 100644
index 00000000000..9e21087251b
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/resourceplugin/deviceframework/TestDevicePluginAdapter.java
@@ -0,0 +1,524 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.nodemanager.containermanager.resourceplugin.deviceframework;
+
+
+import org.apache.hadoop.service.ServiceOperations;
+import org.apache.hadoop.yarn.api.records.*;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.exceptions.YarnException;
+import org.apache.hadoop.yarn.server.nodemanager.*;
+import org.apache.hadoop.yarn.server.nodemanager.api.deviceplugin.Device;
+import org.apache.hadoop.yarn.server.nodemanager.api.deviceplugin.DevicePlugin;
+import org.apache.hadoop.yarn.server.nodemanager.api.deviceplugin.DeviceRegisterRequest;
+import org.apache.hadoop.yarn.server.nodemanager.api.deviceplugin.DeviceRuntimeSpec;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ResourceMappings;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.privileged.PrivilegedOperationExecutor;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.resources.CGroupsHandler;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.resources.ResourceHandlerException;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.resourceplugin.ResourcePluginManager;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.resourceplugin.deviceframework.examples.FakeDevicePlugin;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.runtime.ContainerRuntimeConstants;
+import org.apache.hadoop.yarn.server.nodemanager.recovery.NMMemoryStateStoreService;
+import org.apache.hadoop.yarn.server.nodemanager.recovery.NMStateStoreService;
+import org.apache.hadoop.yarn.util.resource.ResourceUtils;
+import org.apache.hadoop.yarn.util.resource.TestResourceUtils;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.io.IOException;
+import java.io.Serializable;
+import java.util.*;
+import java.util.concurrent.ConcurrentHashMap;
+
+import static org.mockito.Matchers.isA;
+import static org.mockito.Mockito.*;
+import static org.mockito.Mockito.verify;
+
+public class TestDevicePluginAdapter {
+
+ protected static final Logger LOG =
+ LoggerFactory.getLogger(TestDevicePluginAdapter.class);
+
+ private YarnConfiguration conf;
+ private String tempResourceTypesFile;
+ private CGroupsHandler mockCGroupsHandler;
+ private PrivilegedOperationExecutor mockPrivilegedExecutor;
+ private NodeManager nm;
+
+ @Before
+ public void setup() throws Exception {
+ this.conf = new YarnConfiguration();
+ // setup resource-types.xml
+ ResourceUtils.resetResourceTypes();
+ String resourceTypesFile = "resource-types-pluggable-devices.xml";
+ this.tempResourceTypesFile = TestResourceUtils.setupResourceTypes(this.conf, resourceTypesFile);
+ mockCGroupsHandler = mock(CGroupsHandler.class);
+ mockPrivilegedExecutor = mock(PrivilegedOperationExecutor.class);
+ }
+
+ @After
+ public void tearDown() throws IOException {
+ // cleanup resource-types.xml
+ File dest = new File(this.tempResourceTypesFile);
+ if (dest.exists()) {
+ dest.delete();
+ }
+ if (nm != null) {
+ try {
+ ServiceOperations.stop(nm);
+ } catch (Throwable t) {
+ // ignore
+ }
+ }
+ }
+
+
+ /**
+ * Use MyPlugin, which doesn't implement the scheduler interface.
+ * The plugin's initialization is tested in
+ * {@link org.apache.hadoop.yarn.server.nodemanager.containermanager.resourceplugin.TestResourcePluginManager}
+ * */
+ @Test
+ public void testBasicWorkflow()
+ throws YarnException, IOException {
+ NodeManager.NMContext context = mock(NodeManager.NMContext.class);
+ NMStateStoreService storeService = mock(NMStateStoreService.class);
+ when(context.getNMStateStore()).thenReturn(storeService);
+ doNothing().when(storeService).storeAssignedResources(isA(Container.class),
+ isA(String.class),
+ isA(ArrayList.class));
+
+ // Init scheduler manager
+ DeviceSchedulerManager dsm = new DeviceSchedulerManager(context);
+
+ ResourcePluginManager rpm = mock(ResourcePluginManager.class);
+ when(rpm.getDeviceSchedulerManager()).thenReturn(dsm);
+
+ // Init a plugin
+ MyPlugin plugin = new MyPlugin();
+ MyPlugin spyPlugin = spy(plugin);
+ String resourceName = MyPlugin.resourceName;
+ // Init an adapter for the plugin
+ DevicePluginAdapter adapter = new DevicePluginAdapter(
+ resourceName,
+ spyPlugin, dsm);
+ // Bootstrap, adding device
+ adapter.initialize(context);
+ adapter.createResourceHandler(context,
+ mockCGroupsHandler, mockPrivilegedExecutor);
+ adapter.getDeviceResourceHandler().bootstrap(conf);
+ int size = dsm.getAvailableDevices(resourceName);
+ Assert.assertEquals(3, size);
+
+ // A container c1 requests 1 device
+ Container c1 = mockContainerWithDeviceRequest(0,
+ resourceName,
+ 1,false);
+ // preStart
+ adapter.getDeviceResourceHandler().preStart(c1);
+ // check book keeping
+ Assert.assertEquals(2,
+ dsm.getAvailableDevices(resourceName));
+ Assert.assertEquals(1,
+ dsm.getAllUsedDevices().get(resourceName).size());
+ Assert.assertEquals(3,
+ dsm.getAllAllowedDevices().get(resourceName).size());
+ // postComplete
+ adapter.getDeviceResourceHandler().postComplete(getContainerId(0));
+ Assert.assertEquals(3,
+ dsm.getAvailableDevices(resourceName));
+ Assert.assertEquals(0,
+ dsm.getAllUsedDevices().get(resourceName).size());
+ Assert.assertEquals(3,
+ dsm.getAllAllowedDevices().get(resourceName).size());
+
+ // A container c2 requests 3 device
+ Container c2 = mockContainerWithDeviceRequest(1,
+ resourceName,
+ 3,false);
+ // preStart
+ adapter.getDeviceResourceHandler().preStart(c2);
+ // check book keeping
+ Assert.assertEquals(0,
+ dsm.getAvailableDevices(resourceName));
+ Assert.assertEquals(3,
+ dsm.getAllUsedDevices().get(resourceName).size());
+ Assert.assertEquals(3,
+ dsm.getAllAllowedDevices().get(resourceName).size());
+ // postComplete
+ adapter.getDeviceResourceHandler().postComplete(getContainerId(1));
+ Assert.assertEquals(3,
+ dsm.getAvailableDevices(resourceName));
+ Assert.assertEquals(0,
+ dsm.getAllUsedDevices().get(resourceName).size());
+ Assert.assertEquals(3,
+ dsm.getAllAllowedDevices().get(resourceName).size());
+ }
+
+ /**
+ * Use {@link FakeDevicePlugin} which implements
+ * {@link org.apache.hadoop.yarn.server.nodemanager.api.deviceplugin.DevicePluginScheduler}
+ * interface.
+ *
+ * */
+ @Test
+ public void testBasicWorkflowWithPluginAndPluginScheduler()
+ throws YarnException, IOException {
+ NodeManager.NMContext context = mock(NodeManager.NMContext.class);
+ NMStateStoreService storeService = mock(NMStateStoreService.class);
+ when(context.getNMStateStore()).thenReturn(storeService);
+ doNothing().when(storeService).storeAssignedResources(isA(Container.class),
+ isA(String.class),
+ isA(ArrayList.class));
+ // Init scheduler manager
+ DeviceSchedulerManager dsm = new DeviceSchedulerManager(context);
+
+ ResourcePluginManager rpm = mock(ResourcePluginManager.class);
+ when(rpm.getDeviceSchedulerManager()).thenReturn(dsm);
+
+ // Init a plugin
+ FakeDevicePlugin plugin = new FakeDevicePlugin();
+ FakeDevicePlugin spyPlugin = spy(plugin);
+ String resourceName = FakeDevicePlugin.resourceName;
+ // Add customized device plugin scheduler
+ dsm.setPreferCustomizedScheduler(true);
+ dsm.addDevicePluginScheduler(resourceName,spyPlugin);
+ // Init an adapter for the plugin
+ DevicePluginAdapter adapter = new DevicePluginAdapter(
+ resourceName,
+ spyPlugin, dsm);
+ // Bootstrap, adding device
+ adapter.initialize(context);
+ adapter.createResourceHandler(context,
+ mockCGroupsHandler, mockPrivilegedExecutor);
+ adapter.getDeviceResourceHandler().bootstrap(conf);
+ int size = dsm.getAvailableDevices(resourceName);
+ Assert.assertEquals(1, size);
+
+ // A container requests 1 device
+ Container c1 = mockContainerWithDeviceRequest(0,
+ resourceName,
+ 1,false);
+ // preStart
+ adapter.getDeviceResourceHandler().preStart(c1);
+ // check if the plugin's own scheduler works
+ verify(spyPlugin, times(1))
+ .allocateDevices(isA(Set.class),isA(Integer.class));
+ // check book keeping
+ Assert.assertEquals(0,
+ dsm.getAvailableDevices(resourceName));
+ Assert.assertEquals(1,
+ dsm.getAllUsedDevices().get(resourceName).size());
+ Assert.assertEquals(1,
+ dsm.getAllAllowedDevices().get(resourceName).size());
+ // postComplete
+ adapter.getDeviceResourceHandler().postComplete(getContainerId(0));
+ Assert.assertEquals(1,
+ dsm.getAvailableDevices(resourceName));
+ Assert.assertEquals(0,
+ dsm.getAllUsedDevices().get(resourceName).size());
+ Assert.assertEquals(1,
+ dsm.getAllAllowedDevices().get(resourceName).size());
+ }
+
+ @Test
+ public void testStoreDeviceSchedulerManagerState()
+ throws IOException, YarnException {
+ NodeManager.NMContext context = mock(NodeManager.NMContext.class);
+ NMStateStoreService realStoreService = new NMMemoryStateStoreService();
+ NMStateStoreService storeService = spy(realStoreService);
+ when(context.getNMStateStore()).thenReturn(storeService);
+ doNothing().when(storeService).storeAssignedResources(isA(Container.class),
+ isA(String.class),
+ isA(ArrayList.class));
+
+ // Init scheduler manager
+ DeviceSchedulerManager dsm = new DeviceSchedulerManager(context);
+
+ ResourcePluginManager rpm = mock(ResourcePluginManager.class);
+ when(rpm.getDeviceSchedulerManager()).thenReturn(dsm);
+
+ // Init a plugin
+ MyPlugin plugin = new MyPlugin();
+ MyPlugin spyPlugin = spy(plugin);
+ String resourceName = MyPlugin.resourceName;
+ // Init an adapter for the plugin
+ DevicePluginAdapter adapter = new DevicePluginAdapter(
+ resourceName,
+ spyPlugin, dsm);
+ // Bootstrap, adding device
+ adapter.initialize(context);
+ adapter.createResourceHandler(context,
+ mockCGroupsHandler, mockPrivilegedExecutor);
+ adapter.getDeviceResourceHandler().bootstrap(conf);
+
+ // A container c0 requests 1 device
+ Container c0 = mockContainerWithDeviceRequest(0,
+ resourceName,
+ 1,false);
+ // preStart
+ adapter.getDeviceResourceHandler().preStart(c0);
+ // ensure c0's assigned resources are persisted
+ verify(storeService).storeAssignedResources(c0, resourceName,
+ Arrays.asList(Device.Builder.newInstance()
+ .setID(0)
+ .setDevPath("/dev/hdwA0")
+ .setMajorNumber(256)
+ .setMinorNumber(0)
+ .setBusID("0000:80:00.0")
+ .setHealthy(true)
+ .build()));
+ }
+
+ @Test
+ public void testRecoverDeviceSchedulerManagerState() throws IOException, YarnException {
+ NodeManager.NMContext context = mock(NodeManager.NMContext.class);
+ NMStateStoreService realStoreService = new NMMemoryStateStoreService();
+ NMStateStoreService storeService = spy(realStoreService);
+ when(context.getNMStateStore()).thenReturn(storeService);
+ doNothing().when(storeService).storeAssignedResources(isA(Container.class),
+ isA(String.class),
+ isA(ArrayList.class));
+
+ // Init scheduler manager
+ DeviceSchedulerManager dsm = new DeviceSchedulerManager(context);
+
+ ResourcePluginManager rpm = mock(ResourcePluginManager.class);
+ when(rpm.getDeviceSchedulerManager()).thenReturn(dsm);
+
+    // Init a plugin
+ MyPlugin plugin = new MyPlugin();
+ MyPlugin spyPlugin = spy(plugin);
+ String resourceName = MyPlugin.resourceName;
+ // Init an adapter for the plugin
+ DevicePluginAdapter adapter = new DevicePluginAdapter(
+ resourceName,
+ spyPlugin, dsm);
+ // Bootstrap, adding device
+ adapter.initialize(context);
+ adapter.createResourceHandler(context,
+ mockCGroupsHandler, mockPrivilegedExecutor);
+ adapter.getDeviceResourceHandler().bootstrap(conf);
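+    // MyPlugin reports three devices; all of them should be registered
+    // as allowed devices after bootstrap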
+ Assert.assertEquals(3,
+ dsm.getAllAllowedDevices().get(resourceName).size());
+    // Mock the NM state store content: this device was assigned to c0
+    // before the NM restart
+ Device storedDevice = Device.Builder.newInstance()
+ .setID(0)
+ .setDevPath("/dev/hdwA0")
+ .setMajorNumber(256)
+ .setMinorNumber(0)
+ .setBusID("0000:80:00.0")
+ .setHealthy(true)
+ .build();
+    ConcurrentHashMap<ContainerId, Container> runningContainersMap
+        = new ConcurrentHashMap<>();
+ Container nmContainer = mock(Container.class);
+ ResourceMappings rmap = new ResourceMappings();
+ ResourceMappings.AssignedResources ar =
+ new ResourceMappings.AssignedResources();
+ ar.updateAssignedResources(
+ Arrays.asList(storedDevice));
+ rmap.addAssignedResources(resourceName, ar);
+ when(nmContainer.getResourceMappings()).thenReturn(rmap);
+ when(context.getContainers()).thenReturn(runningContainersMap);
+
+ // Test case 1. c0 get recovered. scheduler state restored
+ runningContainersMap.put(getContainerId(0), nmContainer);
+ adapter.getDeviceResourceHandler().reacquireContainer(
+ getContainerId(0));
+ Assert.assertEquals(3,
+ dsm.getAllAllowedDevices().get(resourceName).size());
+ Assert.assertEquals(1,
+ dsm.getAllUsedDevices().get(resourceName).size());
+ Assert.assertEquals(2,
+ dsm.getAvailableDevices(resourceName));
+    Map<Device, ContainerId> used =
+        dsm.getAllUsedDevices().get(resourceName);
+ Assert.assertTrue(used.keySet().contains(storedDevice));
+
+    // Test case 2. c1 wants to be recovered, but the stored device
+    // is already allocated to c0
+ nmContainer = mock(Container.class);
+ rmap = new ResourceMappings();
+ ar = new ResourceMappings.AssignedResources();
+ ar.updateAssignedResources(
+ Arrays.asList(storedDevice));
+ rmap.addAssignedResources(resourceName, ar);
+    when(nmContainer.getResourceMappings()).thenReturn(rmap);
+    // c1's recovered mapping claims the device already held by c0
+    runningContainersMap.put(getContainerId(1), nmContainer);
+ boolean caughtException = false;
+ try {
+ adapter.getDeviceResourceHandler().reacquireContainer(getContainerId(1));
+ } catch (ResourceHandlerException e) {
+ caughtException = true;
+ }
+ Assert.assertTrue(
+ "Should fail since requested device is assigned already",
+ caughtException);
+ // don't affect c0 allocation state
+ Assert.assertEquals(3,
+ dsm.getAllAllowedDevices().get(resourceName).size());
+ Assert.assertEquals(1,
+ dsm.getAllUsedDevices().get(resourceName).size());
+ Assert.assertEquals(2,
+ dsm.getAvailableDevices(resourceName));
+ used = dsm.getAllUsedDevices().get(resourceName);
+ Assert.assertTrue(used.keySet().contains(storedDevice));
+ }
+
+ @Test
+  public void testAssignedDeviceCleanupWhenStoreOpFails()
+      throws IOException, YarnException {
+ NodeManager.NMContext context = mock(NodeManager.NMContext.class);
+ NMStateStoreService realStoreService = new NMMemoryStateStoreService();
+ NMStateStoreService storeService = spy(realStoreService);
+ when(context.getNMStateStore()).thenReturn(storeService);
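+    // Simulate a state-store failure on every attempt to persist
+    // assigned resources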
+    doThrow(new IOException("Exception ...")).when(storeService)
+        .storeAssignedResources(isA(Container.class),
+            isA(String.class),
+            isA(ArrayList.class));
+
+ // Init scheduler manager
+ DeviceSchedulerManager dsm = new DeviceSchedulerManager(context);
+
+ ResourcePluginManager rpm = mock(ResourcePluginManager.class);
+ when(rpm.getDeviceSchedulerManager()).thenReturn(dsm);
+
+    // Init a plugin
+ MyPlugin plugin = new MyPlugin();
+ MyPlugin spyPlugin = spy(plugin);
+ String resourceName = MyPlugin.resourceName;
+ // Init an adapter for the plugin
+ DevicePluginAdapter adapter = new DevicePluginAdapter(
+ resourceName,
+ spyPlugin, dsm);
+ // Bootstrap, adding device
+ adapter.initialize(context);
+ adapter.createResourceHandler(context,
+ mockCGroupsHandler, mockPrivilegedExecutor);
+ adapter.getDeviceResourceHandler().bootstrap(conf);
+
+ // A container c0 requests 1 device
+ Container c0 = mockContainerWithDeviceRequest(0,
+ resourceName,
+        1, false);
+ // preStart
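+    // The store failure should surface as a ResourceHandlerException
+    // and the booked device should be released again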
+ boolean exception = false;
+ try {
+ adapter.getDeviceResourceHandler().preStart(c0);
+ } catch (ResourceHandlerException e) {
+ exception = true;
+ }
+ Assert.assertTrue("Should throw exception in preStart", exception);
+ // no device assigned
+ Assert.assertEquals(3,
+ dsm.getAllAllowedDevices().get(resourceName).size());
+ Assert.assertEquals(0,
+ dsm.getAllUsedDevices().get(resourceName).size());
+ Assert.assertEquals(3,
+ dsm.getAvailableDevices(resourceName));
+
+ }
+
+ private static Container mockContainerWithDeviceRequest(int id,
+ String resourceName,
+ int numDeviceRequest,
+ boolean dockerContainerEnabled) {
+ Container c = mock(Container.class);
+ when(c.getContainerId()).thenReturn(getContainerId(id));
+
+ Resource res = Resource.newInstance(1024, 1);
+ ResourceMappings resMapping = new ResourceMappings();
+
+ res.setResourceValue(resourceName, numDeviceRequest);
+ when(c.getResource()).thenReturn(res);
+ when(c.getResourceMappings()).thenReturn(resMapping);
+
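+    // Mock the launch context; when dockerContainerEnabled is set, mark
+    // the container as a Docker container via its environment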
+ ContainerLaunchContext clc = mock(ContainerLaunchContext.class);
+    Map<String, String> env = new HashMap<>();
+ if (dockerContainerEnabled) {
+ env.put(ContainerRuntimeConstants.ENV_CONTAINER_TYPE,
+ ContainerRuntimeConstants.CONTAINER_RUNTIME_DOCKER);
+ }
+ when(clc.getEnvironment()).thenReturn(env);
+ when(c.getLaunchContext()).thenReturn(clc);
+ return c;
+ }
+
+ private static ContainerId getContainerId(int id) {
+ return ContainerId.newContainerId(ApplicationAttemptId
+ .newInstance(ApplicationId.newInstance(1234L, 1), 1), id);
+ }
+
+
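+  // A minimal fake vendor plugin that reports three static devices
+  // used by the tests above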
+ private class MyPlugin implements DevicePlugin {
+    private static final String resourceName = "cmpA.com/hdwA";
+ @Override
+ public DeviceRegisterRequest getRegisterRequestInfo() {
+ return DeviceRegisterRequest.Builder.newInstance()
+ .setResourceName(resourceName)
+ .setPluginVersion("v1.0").build();
+ }
+
+ @Override
+    public Set<Device> getDevices() {
+      TreeSet<Device> r = new TreeSet<>();
+ r.add(Device.Builder.newInstance()
+ .setID(0)
+ .setDevPath("/dev/hdwA0")
+ .setMajorNumber(256)
+ .setMinorNumber(0)
+ .setBusID("0000:80:00.0")
+ .setHealthy(true)
+ .build());
+ r.add(Device.Builder.newInstance()
+ .setID(1)
+ .setDevPath("/dev/hdwA1")
+ .setMajorNumber(256)
+ .setMinorNumber(0)
+ .setBusID("0000:80:01.0")
+ .setHealthy(true)
+ .build());
+ r.add(Device.Builder.newInstance()
+ .setID(2)
+ .setDevPath("/dev/hdwA2")
+ .setMajorNumber(256)
+ .setMinorNumber(0)
+ .setBusID("0000:80:02.0")
+ .setHealthy(true)
+ .build());
+ return r;
+ }
+
+ @Override
+    public DeviceRuntimeSpec onDevicesAllocated(Set<Device> allocatedDevices,
+        String runtime) {
+ return null;
+ }
+
+ @Override
+    public void onDevicesReleased(Set<Device> releasedDevices) {
+
+ }
+ } // MyPlugin
+
+}
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/resources/resource-types-pluggable-devices.xml b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/resources/resource-types-pluggable-devices.xml
new file mode 100644
index 00000000000..65b8e752f1c
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/resources/resource-types-pluggable-devices.xml
@@ -0,0 +1,7 @@
+<?xml version="1.0"?>
+<configuration>
+  <property>
+    <name>yarn.resource-types</name>
+    <value>cmp.com/cmp,cmpA.com/hdwA</value>
+  </property>
+</configuration>
\ No newline at end of file