diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java index ce38d2762b2..ae0fa347f39 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java @@ -1605,6 +1605,27 @@ public static boolean isAclEnabled(Configuration conf) { public static final String NM_RESOURCE_PLUGINS = NM_PREFIX + "resource-plugins"; + /** + * This setting controls if pluggable device plugin framework is enabled. + * */ + @Private + public static final String NM_PLUGGABLE_DEVICE_FRAMEWORK_ENABLED = + NM_PREFIX + "pluggable-device-framework.enabled"; + + /** + * The pluggable device plugin framework is disabled by default + * */ + @Private + public static final boolean DEFAULT_NM_PLUGGABLE_DEVICE_FRAMEWORK_ENABLED = false; + + /** + * This settings contains vendor plugin class names for + * device plugin framework to load. Split by comma + * */ + @Private + public static final String NM_PLUGGABLE_DEVICE_FRAMEWORK_DEVICE_CLASSES = + NM_PREFIX + "pluggable-device-framework.device-classes"; + /** * Prefix for gpu configurations. Work in progress: This configuration * parameter may be changed/removed in the future. diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml index 1360e73f7fd..1671dcad970 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml @@ -3770,6 +3770,25 @@ false + + + This settings controls if pluggable device framework is enabled. 
+ Disabled by default + + yarn.nodemanager.pluggable-device-framework.enabled + false + + + + + Configure vendor device plugin class name here. Comma separated. + The class must be found in CLASSPATH. The pluggable device framework will + load these classes. + + yarn.nodemanager.pluggable-device-framework.device-classes + + + When yarn.nodemanager.resource.gpu.allowed-gpu-devices=auto specified, diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/api/deviceplugin/Device.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/api/deviceplugin/Device.java new file mode 100644 index 00000000000..dfec70f5608 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/api/deviceplugin/Device.java @@ -0,0 +1,224 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hadoop.yarn.server.nodemanager.api.deviceplugin; + +import java.io.Serializable; +import java.util.Objects; +import java.util.Set; +import java.util.TreeSet; + +public class Device implements Serializable, Comparable { + + private static final long serialVersionUID = 1L; + + /** + * Required fields: + * ID: an plugin specified index number + * majorNumber: major device number + * minorNumber: minor device number + * isHealthy: true or false indicating device health status + * + * Optional fields: + * devPath: device file like /dev/devicename + * busID: PCI Bus ID in format [[[[]:]]:][][.[]]. Can get from "lspci -D" + * topology: describe connection to other devices + * Status: For future use + * */ + private final Integer ID; + private final String devPath; + private final Integer majorNumber; + private final Integer minorNumber; + private final String busID; + private boolean isHealthy; + + /** + * Optional fields + * */ + private String status; + + /** + * A {@code Device} has a topology which + * represent the links to other {@code Device}s + * */ + private final Set topology; + + private Device(Builder builder) { + this.ID = Objects.requireNonNull(builder.ID); + this.devPath = builder.devPath; + this.majorNumber = Objects.requireNonNull(builder.majorNumber); + this.minorNumber = Objects.requireNonNull(builder.minorNumber); + this.busID = builder.busID; + this.isHealthy = Objects.requireNonNull(builder.isHealthy); + this.topology = builder.topology; + this.status = builder.status; + } + + public Integer getID() { + return ID; + } + + public String getDevPath() { + return devPath; + } + + public Integer getMajorNumber() { + return majorNumber; + } + + public Integer getMinorNumber() { + return minorNumber; + } + + public String getBusID() { + return busID; + } + + public boolean isHealthy() { + return isHealthy; + } + + public String getStatus() { + return status; + } + + public Set getTopology() { + return topology; + } + + @Override + 
public boolean equals(Object o) { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()){ + return false; + } + Device device = (Device) o; + return Objects.equals(ID, device.getID()) && + Objects.equals(devPath, device.getDevPath()) && + Objects.equals(majorNumber, device.getMajorNumber()) && + Objects.equals(minorNumber, device.getMinorNumber()) && + Objects.equals(busID, device.getBusID()); + } + + @Override + public int hashCode() { + return Objects.hash(ID, devPath, majorNumber, minorNumber, busID); + } + + @Override + public int compareTo(Object o) { + if (o == null || (!(o instanceof Device))) { + return -1; + } + + Device other = (Device) o; + + int result = Integer.compare(ID, other.getID()); + if (0 != result) { + return result; + } + + result = Integer.compare(majorNumber, other.getMajorNumber()); + if (0 != result) { + return result; + } + + result = Integer.compare(minorNumber, other.getMinorNumber()); + if (0 != result) { + return result; + } + + result = devPath.compareTo(other.getDevPath()); + if (0 != result) { + return result; + } + + return busID.compareTo(other.getBusID()); + } + + @Override + public String toString() { + return "(" + getID() + ", " + getDevPath() + ", " + getMajorNumber() + ":" + getMinorNumber() + ")"; + } + + public static class Builder { + private Integer ID; + private String devPath = ""; + private Integer majorNumber; + private Integer minorNumber; + private String busID = ""; + private boolean isHealthy; + private String status = ""; + private Set topology; + + private Builder() { + topology = new TreeSet<>(); + } + + public static Builder newInstance() { + return new Builder(); + } + + public Device build(){ + return new Device(this); + } + + public Builder setID(Integer ID) { + this.ID = ID; + return this; + } + + public Builder setDevPath(String devPath) { + this.devPath = devPath; + return this; + } + + public Builder setMajorNumber(Integer majorNumber) { + this.majorNumber = majorNumber; 
+ return this; + } + + public Builder setMinorNumber(Integer minorNumber) { + this.minorNumber = minorNumber; + return this; + } + + public Builder setBusID(String busID) { + this.busID = busID; + return this; + } + + public Builder setHealthy(boolean healthy) { + isHealthy = healthy; + return this; + } + + public Builder setStatus(String status) { + this.status = status; + return this; + } + + public Builder addDeviceLink(DeviceLink link) { + this.topology.add(link); + return this; + } + + } +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/api/deviceplugin/DeviceLink.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/api/deviceplugin/DeviceLink.java new file mode 100644 index 00000000000..47444eac194 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/api/deviceplugin/DeviceLink.java @@ -0,0 +1,95 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.server.nodemanager.api.deviceplugin; + +import java.io.Serializable; +import java.util.Objects; + +public class DeviceLink implements Serializable, Comparable { + + private static final long serialVersionUID = 1L; + + String busId; + DeviceLinkType linkType; + + public String getBusId() { + return busId; + } + + public DeviceLinkType getLinkType() { + return linkType; + } + + private DeviceLink(Builder builder) { + this.busId = builder.busId; + this.linkType = builder.linkType; + } + + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()){ + return false; + } + DeviceLink other = (DeviceLink) o; + return Objects.equals(other.getBusId(), busId) && + Objects.equals(other.getLinkType(), linkType); + } + + @Override + public int hashCode() { + return Objects.hash(busId, linkType); + } + + @Override + public int compareTo(Object o) { + if (o == null || (!(o instanceof DeviceLink))) { + return -1; + } + DeviceLink other = (DeviceLink) o; + + return other.linkType.compareTo(getLinkType()); + } + + public static class Builder { + String busId; + DeviceLinkType linkType; + + private Builder(){} + public static Builder newInstance() { + return new Builder(); + } + + public DeviceLink build() { + return new DeviceLink(this); + } + + public Builder setBusId(String busId) { + this.busId = busId; + return this; + } + + public Builder setLinkType(DeviceLinkType linkType) { + this.linkType = linkType; + return this; + } + } +} diff --git 
/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

/**
 * Type of the interconnect between two devices.
 * A higher link level means faster communication between the peers.
 */
public enum DeviceLinkType {
  /**
   * For Nvidia GPU NVLink.
   */
  P2PLinkNVLink(4),

  /**
   * Connected to same CPU (same NUMA node).
   */
  P2PLinkSameCPU(3),

  /**
   * Cross CPU through socket-level link (e.g. QPI).
   * Usually cross NUMA node.
   */
  P2PLinkCrossCPU(2),

  /**
   * Just need to traverse one PCIe switch to talk.
   */
  P2PLinkSingleSwitch(1),

  /**
   * Need to traverse multiple PCIe switches to talk.
   */
  P2PLinkMultiSwitch(0);

  // A higher link level means faster communication.
  private final int linkLevel;

  public int getLinkLevel() {
    return linkLevel;
  }

  DeviceLinkType(int linkLevel) {
    this.linkLevel = linkLevel;
  }
}
Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.server.nodemanager.api.deviceplugin; + +import java.util.Set; + +/** + * A must interface for vendor plugin to implement. + * */ +public interface DevicePlugin { + /** + * Called first when device plugin framework wants to register + * @return DeviceRegisterRequest {@link DeviceRegisterRequest} + * */ + DeviceRegisterRequest getRegisterRequestInfo(); + + /** + * Called when update node resource + * @return a set of {@link Device}, {@link java.util.TreeSet} recommended + * */ + Set getDevices(); + + /** + * Asking how these devices should be prepared/used + * before/when container launch. A plugin can do some tasks in its own or + * define it in DeviceRuntimeSpec to let the framework do it. + * For instance, define {@code VolumeSpec} to let the + * framework to create volume before running container. + * + * @param allocatedDevices A set of allocated {@link Device}. + * @param yarnRuntime Indicate which runtime YARN will use + * Could be {@code docker} or {@code default} + * in {@link DeviceRuntimeSpec} constants + * @return a {@link DeviceRuntimeSpec} description about environment, + * {@link VolumeSpec}, {@link MountVolumeSpec}. 
etc + * */ + DeviceRuntimeSpec onDevicesAllocated(Set allocatedDevices, + String yarnRuntime); + + /** + * Called after device released. + * */ + void onDevicesReleased(Set releasedDevices); +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/api/deviceplugin/DevicePluginScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/api/deviceplugin/DevicePluginScheduler.java new file mode 100644 index 00000000000..414cc4ace86 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/api/deviceplugin/DevicePluginScheduler.java @@ -0,0 +1,37 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.server.nodemanager.api.deviceplugin; + +import java.util.Set; + +/** + * An optional interface to implement if custom device scheduling is needed. + * If this is not implemented, the device framework will do. + * */ +public interface DevicePluginScheduler { + /** + * Called when allocating devices. 
The framework will do all device book keeping + * and fail recovery. So this hook should only do scheduling based on available devices + * passed in. This method could be invoked multiple times. + * @param availableDevices Devices allowed to be chosen from. + * @param count Number of device to be allocated. + * @return a set of {@link Device} + * */ + Set allocateDevices(Set availableDevices, Integer count); +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/api/deviceplugin/DeviceRegisterRequest.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/api/deviceplugin/DeviceRegisterRequest.java new file mode 100644 index 00000000000..7753ad1efdf --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/api/deviceplugin/DeviceRegisterRequest.java @@ -0,0 +1,72 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
/**
 * A vendor device plugin uses this object to register with the NM device
 * plugin framework. Immutable; created through {@link Builder}.
 */
public class DeviceRegisterRequest {

  /** The plugin's own version string. Optional. */
  private final String pluginVersion;
  /** The resource name this plugin serves. Required. */
  private final String resourceName;

  // Private: instances must come from the Builder. The original public
  // constructor leaked the builder-pattern internals to callers.
  private DeviceRegisterRequest(Builder builder) {
    this.resourceName = Objects.requireNonNull(builder.resourceName,
        "resourceName must be set");
    this.pluginVersion = builder.pluginVersion;
  }

  public String getResourceName() {
    return resourceName;
  }

  public String getPluginVersion() {
    return pluginVersion;
  }

  /** Builder for {@link DeviceRegisterRequest}. */
  public static class Builder {
    private String pluginVersion;
    private String resourceName;

    private Builder() {
    }

    public static Builder newInstance() {
      return new Builder();
    }

    public DeviceRegisterRequest build() {
      return new DeviceRegisterRequest(this);
    }

    public Builder setResourceName(String resourceName) {
      this.resourceName = resourceName;
      return this;
    }

    public Builder setPluginVersion(String pluginVersion) {
      this.pluginVersion = pluginVersion;
      return this;
    }

  }
}
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.server.nodemanager.api.deviceplugin; + +import java.util.*; + +public class DeviceRuntimeSpec { + + /** + * The containerRuntime gives device framework a hint (not forced to) + * on which container containerRuntime can use this Spec + * (if empty then default "runc" is used). + * For instance, it could be "nvidia" in Nvidia GPU Docker v2. + * The "nvidia" will be passed as a parameter to docker run + * with --runtime "nvidia" + * + * If cgroups, these fields could be empty if no special requirement + * since the framework already knows major and minor device number + * in {@link Device}. 
+ * If docker, these fields below should be populated as needed + */ + private final String containerRuntime; + private final Map envs; + private final Set volumeMounts; + private final Set deviceMounts; + private final Set volumeClaims; + + public final static String RUNTIME_CGROUPS = "cgroups"; + public final static String RUNTIME_DOCKER = "docker"; + + private DeviceRuntimeSpec(Builder builder) { + this.containerRuntime = builder.containerRuntime; + this.deviceMounts = builder.deviceMounts; + this.envs = builder.envs; + this.volumeClaims = builder.volumeClaims; + this.volumeMounts = builder.volumeMounts; + } + + public String getContainerRuntime() { + return containerRuntime; + } + + public Map getEnvs() { + return envs; + } + + public Set getVolumeMounts() { + return volumeMounts; + } + + public Set getDeviceMounts() { + return deviceMounts; + } + + public Set getVolumeClaims() { + return volumeClaims; + } + public static class Builder { + + private String containerRuntime; + private Map envs; + private Set volumeMounts; + private Set deviceMounts; + private Set volumeClaims; + + private Builder() { + containerRuntime = DeviceRuntimeSpec.RUNTIME_DOCKER; + envs = new HashMap<>(); + volumeClaims = new TreeSet<>(); + deviceMounts = new TreeSet<>(); + volumeMounts = new TreeSet<>(); + } + + public static Builder newInstance() { + return new Builder(); + } + + public DeviceRuntimeSpec build() { + return new DeviceRuntimeSpec(this); + } + + public Builder setContainerRuntime(String containerRuntime) { + this.containerRuntime = containerRuntime; + return this; + } + + public Builder addVolumeSpec(VolumeSpec spec) { + this.volumeClaims.add(spec); + return this; + } + + public Builder addMountVolumeSpec(MountVolumeSpec spec) { + this.volumeMounts.add(spec); + return this; + } + + public Builder addMountDeviceSpec(MountDeviceSpec spec) { + this.deviceMounts.add(spec); + return this; + } + + public Builder addEnv(String key, String value) { + 
this.envs.put(Objects.requireNonNull(key), + Objects.requireNonNull(value)); + return this; + } + + } + +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/api/deviceplugin/MountDeviceSpec.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/api/deviceplugin/MountDeviceSpec.java new file mode 100644 index 00000000000..8393558d627 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/api/deviceplugin/MountDeviceSpec.java @@ -0,0 +1,109 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.server.nodemanager.api.deviceplugin; + +import java.io.Serializable; +import java.util.Objects; + +public class MountDeviceSpec implements Serializable, Comparable { + + private final String devicePathInHost; + private final String devicePathInContainer; + + // r for only read, rw can do read and write + private final String devicePermission; + + public final static String RO = "r"; + public final static String RW = "rw"; + + private MountDeviceSpec(Builder builder) { + this.devicePathInContainer = builder.devicePathInContainer; + this.devicePathInHost = builder.devicePathInHost; + this.devicePermission = builder.devicePermission; + } + + public String getDevicePathInHost() { + return devicePathInHost; + } + + public String getDevicePathInContainer() { + return devicePathInContainer; + } + + public String getDevicePermission() { + return devicePermission; + } + + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()){ + return false; + } + MountDeviceSpec other = (MountDeviceSpec) o; + return Objects.equals(devicePathInHost, other.getDevicePathInHost()) && + Objects.equals(devicePathInContainer, other.getDevicePathInContainer()) && + Objects.equals(devicePermission, other.getDevicePermission()); + } + + @Override + public int hashCode() { + return Objects.hash(devicePathInContainer, + devicePathInHost, devicePermission); + } + + @Override + public int compareTo(Object o) { + return 0; + } + + public static class Builder { + private String devicePathInHost; + private String devicePathInContainer; + private String devicePermission; + 
+ private Builder() {} + + public static Builder newInstance() { + return new Builder(); + } + + public MountDeviceSpec build() { + return new MountDeviceSpec(this); + } + + public Builder setDevicePermission(String permission) { + this.devicePermission = permission; + return this; + } + + public Builder setDevicePathInContainer(String devicePathInContainer) { + this.devicePathInContainer = devicePathInContainer; + return this; + } + + public Builder setDevicePathInHost(String devicePathInHost) { + this.devicePathInHost = devicePathInHost; + return this; + } + } + +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/api/deviceplugin/MountVolumeSpec.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/api/deviceplugin/MountVolumeSpec.java new file mode 100644 index 00000000000..b19228212d4 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/api/deviceplugin/MountVolumeSpec.java @@ -0,0 +1,112 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.server.nodemanager.api.deviceplugin; + +import java.io.Serializable; +import java.util.Objects; + +public class MountVolumeSpec implements Serializable, Comparable{ + + private static final long serialVersionUID = 1L; + + // host path or volume name + private final String hostPath; + + // path in the container + private final String mountPath; + + // if true, data in mountPath can only be read + // "-v hostPath:mountPath:ro" + private final Boolean isReadOnly; + + public final static String READONLYOPTION = "ro"; + + private MountVolumeSpec (Builder builder) { + this.hostPath = builder.hostPath; + this.mountPath = builder.mountPath; + this.isReadOnly = builder.isReadOnly; + } + + public String getHostPath() { + return hostPath; + } + + public String getMountPath() { + return mountPath; + } + + public Boolean getReadOnly() { + return isReadOnly; + } + + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()){ + return false; + } + MountVolumeSpec other = (MountVolumeSpec) o; + return Objects.equals(hostPath, other.getHostPath()) && + Objects.equals(mountPath, other.getMountPath()) && + Objects.equals(isReadOnly, other.getReadOnly()); + } + + @Override + public int hashCode() { + return Objects.hash(hostPath, mountPath, isReadOnly); + } + + @Override + public int compareTo(Object o) { + return 0; + } + + public static class Builder { + private String hostPath; + private String mountPath; + private Boolean isReadOnly; + + private Builder() {} + + public static Builder newInstance() { + return new Builder(); + } + + public 
MountVolumeSpec build() { + return new MountVolumeSpec(this); + } + + public Builder setHostPath(String hostPath) { + this.hostPath = hostPath; + return this; + } + + public Builder setMountPath(String mountPath) { + this.mountPath = mountPath; + return this; + } + + public Builder setReadOnly(Boolean readOnly) { + isReadOnly = readOnly; + return this; + } + } +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/api/deviceplugin/VolumeSpec.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/api/deviceplugin/VolumeSpec.java new file mode 100644 index 00000000000..32097b9e46b --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/api/deviceplugin/VolumeSpec.java @@ -0,0 +1,108 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.server.nodemanager.api.deviceplugin; + +import java.io.Serializable; +import java.util.Objects; + +public class VolumeSpec implements Serializable, Comparable { + + private static final long serialVersionUID = 1L; + + private final String volumeDriver; + private final String volumeName; + private final String volumeOperation; + + public final static String CREATE = "create"; + public final static String DELETE = "delete"; + + private VolumeSpec(Builder builder) { + this.volumeDriver = builder.volumeDriver; + this.volumeName = builder.volumeName; + this.volumeOperation = builder.volumeOperation; + } + + public String getVolumeDriver() { + return volumeDriver; + } + + public String getVolumeName() { + return volumeName; + } + + public String getVolumeOperation() { + return volumeOperation; + } + + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()){ + return false; + } + VolumeSpec other = (VolumeSpec) o; + return Objects.equals(volumeDriver, other.getVolumeDriver()) && + Objects.equals(volumeName, other.getVolumeName()) && + Objects.equals(volumeOperation, other.getVolumeOperation()); + } + + @Override + public int hashCode() { + return Objects.hash(volumeDriver, volumeName,volumeOperation); + } + + @Override + public int compareTo(Object o) { + return 0; + } + + public static class Builder { + private String volumeDriver; + private String volumeName; + private String volumeOperation; + + private Builder(){} + + public static Builder newInstance () { + return new Builder(); + } + + public VolumeSpec build() { + return new 
VolumeSpec(this); + } + + public Builder setVolumeDriver(String volumeDriver) { + this.volumeDriver = volumeDriver; + return this; + } + + public Builder setVolumeName(String volumeName) { + this.volumeName = volumeName; + return this; + } + + public Builder setVolumeOperation(String volumeOperation) { + this.volumeOperation = volumeOperation; + return this; + } + + } +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/linux/runtime/DockerLinuxContainerRuntime.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/linux/runtime/DockerLinuxContainerRuntime.java index 2cfa9c51dca..7a7d28a4048 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/linux/runtime/DockerLinuxContainerRuntime.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/linux/runtime/DockerLinuxContainerRuntime.java @@ -439,32 +439,6 @@ private String runDockerVolumeCommand(DockerVolumeCommand dockerVolumeCommand, @Override public void prepareContainer(ContainerRuntimeContext ctx) throws ContainerExecutionException { - Container container = ctx.getContainer(); - - // Create volumes when needed. 
- if (nmContext != null - && nmContext.getResourcePluginManager().getNameToPlugins() != null) { - for (ResourcePlugin plugin : nmContext.getResourcePluginManager() - .getNameToPlugins().values()) { - DockerCommandPlugin dockerCommandPlugin = - plugin.getDockerCommandPluginInstance(); - if (dockerCommandPlugin != null) { - DockerVolumeCommand dockerVolumeCommand = - dockerCommandPlugin.getCreateDockerVolumeCommand( - ctx.getContainer()); - if (dockerVolumeCommand != null) { - runDockerVolumeCommand(dockerVolumeCommand, container); - - // After volume created, run inspect to make sure volume properly - // created. - if (dockerVolumeCommand.getSubCommand().equals( - DockerVolumeCommand.VOLUME_CREATE_SUB_COMMAND)) { - checkDockerVolumeCreated(dockerVolumeCommand, container); - } - } - } - } - } } private void checkDockerVolumeCreated( @@ -1005,14 +979,30 @@ public void launchContainer(ContainerRuntimeContext ctx) } } - // use plugins to update docker run command. + // use plugins to create volume and update docker run command. if (nmContext != null && nmContext.getResourcePluginManager().getNameToPlugins() != null) { for (ResourcePlugin plugin : nmContext.getResourcePluginManager() .getNameToPlugins().values()) { DockerCommandPlugin dockerCommandPlugin = plugin.getDockerCommandPluginInstance(); + if (dockerCommandPlugin != null) { + // Create volumes when needed. + DockerVolumeCommand dockerVolumeCommand = + dockerCommandPlugin.getCreateDockerVolumeCommand( + ctx.getContainer()); + if (dockerVolumeCommand != null) { + runDockerVolumeCommand(dockerVolumeCommand, container); + + // After volume created, run inspect to make sure volume properly + // created. 
+ if (dockerVolumeCommand.getSubCommand().equals( + DockerVolumeCommand.VOLUME_CREATE_SUB_COMMAND)) { + checkDockerVolumeCreated(dockerVolumeCommand, container); + } + } + // Update cmd dockerCommandPlugin.updateDockerRunCommand(runCommand, container); } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/resourceplugin/ResourcePluginManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/resourceplugin/ResourcePluginManager.java index f28aad206a6..c720bbe762e 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/resourceplugin/ResourcePluginManager.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/resourceplugin/ResourcePluginManager.java @@ -18,17 +18,29 @@ package org.apache.hadoop.yarn.server.nodemanager.containermanager.resourceplugin; +import com.google.common.annotations.VisibleForTesting; import com.google.common.collect.ImmutableSet; import org.apache.commons.lang3.StringUtils; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.util.ReflectionUtils; +import org.apache.hadoop.yarn.api.records.ResourceInformation; import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.exceptions.YarnException; +import org.apache.hadoop.yarn.exceptions.YarnRuntimeException; import org.apache.hadoop.yarn.server.nodemanager.Context; +import org.apache.hadoop.yarn.server.nodemanager.api.deviceplugin.Device; +import org.apache.hadoop.yarn.server.nodemanager.api.deviceplugin.DevicePlugin; +import org.apache.hadoop.yarn.server.nodemanager.api.deviceplugin.DevicePluginScheduler; 
+import org.apache.hadoop.yarn.server.nodemanager.api.deviceplugin.DeviceRegisterRequest; +import org.apache.hadoop.yarn.server.nodemanager.containermanager.resourceplugin.deviceframework.DeviceSchedulerManager; +import org.apache.hadoop.yarn.server.nodemanager.containermanager.resourceplugin.deviceframework.DevicePluginAdapter; import org.apache.hadoop.yarn.server.nodemanager.containermanager.resourceplugin.fpga.FpgaResourcePlugin; import org.apache.hadoop.yarn.server.nodemanager.containermanager.resourceplugin.gpu.GpuResourcePlugin; +import org.apache.hadoop.yarn.util.resource.ResourceUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.lang.reflect.Method; import java.util.Collections; import java.util.HashMap; import java.util.Map; @@ -49,14 +61,15 @@ private Map configuredPlugins = Collections.emptyMap(); + private DeviceSchedulerManager deviceSchedulerManager = null; + public synchronized void initialize(Context context) - throws YarnException { + throws YarnException, ClassNotFoundException { Configuration conf = context.getConf(); - String[] plugins = conf.getStrings(YarnConfiguration.NM_RESOURCE_PLUGINS); + Map pluginMap = new HashMap<>(); + String[] plugins = conf.getStrings(YarnConfiguration.NM_RESOURCE_PLUGINS); if (plugins != null) { - Map pluginMap = new HashMap<>(); - // Initialize each plugins for (String resourceName : plugins) { resourceName = resourceName.trim(); @@ -92,9 +105,141 @@ public synchronized void initialize(Context context) plugin.initialize(context); pluginMap.put(resourceName, plugin); } + } + // Try to load pluggable device plugins + boolean puggableDeviceFrameworkEnabled = conf.getBoolean( + YarnConfiguration.NM_PLUGGABLE_DEVICE_FRAMEWORK_ENABLED, + YarnConfiguration.DEFAULT_NM_PLUGGABLE_DEVICE_FRAMEWORK_ENABLED); + + if (puggableDeviceFrameworkEnabled) { + initializePluggableDevicePlugins(context, conf, pluginMap); + } else { + LOG.info("The pluggable device framework is not enabled. 
If you want, please set true to {}", + YarnConfiguration.NM_PLUGGABLE_DEVICE_FRAMEWORK_ENABLED); + } + configuredPlugins = Collections.unmodifiableMap(pluginMap); + } - configuredPlugins = Collections.unmodifiableMap(pluginMap); + public void initializePluggableDevicePlugins(Context context, + Configuration configuration, + Map pluginMap) + throws YarnRuntimeException, ClassNotFoundException { + LOG.info("The pluggable device framework enabled, trying to load the vendor plugins"); + deviceSchedulerManager = new DeviceSchedulerManager(context); + String[] pluginClassNames = configuration.getStrings( + YarnConfiguration.NM_PLUGGABLE_DEVICE_FRAMEWORK_DEVICE_CLASSES); + if (null == pluginClassNames) { + throw new YarnRuntimeException("Null value found in configuration: " + + YarnConfiguration.NM_PLUGGABLE_DEVICE_FRAMEWORK_DEVICE_CLASSES); } + + for (String pluginClassName : pluginClassNames) { + Class pluginClazz = Class.forName(pluginClassName); + if (!DevicePlugin.class.isAssignableFrom(pluginClazz)) { + throw new YarnRuntimeException("Class: " + pluginClassName + + " not instance of " + DevicePlugin.class.getCanonicalName()); + } + // sanity-check before initialization + checkInterfaceCompatibility(DevicePlugin.class, pluginClazz); + + DevicePlugin dpInstance = (DevicePlugin) ReflectionUtils.newInstance(pluginClazz, + configuration); + + // Try to register plugin + // TODO: handle the plugin method timeout issue + DeviceRegisterRequest request = dpInstance.getRegisterRequestInfo(); + String resourceName = request.getResourceName(); + // check if someone has already registered this resource type name + if (pluginMap.containsKey(resourceName)) { + throw new YarnRuntimeException(resourceName + + " has already been registered! 
Please change a resource type name"); + } + // check resource name is valid and configured in resource-types.xml + if (!isValidAndConfiguredResourceName(resourceName)) { + throw new YarnRuntimeException(resourceName + + " is not configured inside " + + YarnConfiguration.RESOURCE_TYPES_CONFIGURATION_FILE + + " , please configure it first"); + } + LOG.info("New resource type: {} registered successfully by {}", + resourceName, + pluginClassName); + DevicePluginAdapter pluginAdapter = new DevicePluginAdapter( + resourceName, dpInstance, deviceSchedulerManager); + LOG.info("Adapter of {} created. Initializing..", pluginClassName); + try { + pluginAdapter.initialize(context); + } catch (YarnException e) { + throw new YarnRuntimeException("Adapter of " + pluginClassName + " init failed!"); + } + LOG.info("Adapter of {} init success!", pluginClassName); + // Store plugin as adapter instance + pluginMap.put(request.getResourceName(), pluginAdapter); + // If the device plugin implements DevicePluginScheduler interface + if (dpInstance instanceof DevicePluginScheduler) { + // check DevicePluginScheduler interface compatibility + checkInterfaceCompatibility(DevicePluginScheduler.class, pluginClazz); + LOG.info("{} can schedule {} devices. 
Added as preferred device plugin scheduler", + pluginClassName, + resourceName); + deviceSchedulerManager.addDevicePluginScheduler( + resourceName, + (DevicePluginScheduler)dpInstance); + } + } // end for + } + + // Check if the implemented interfaces' signature is compatible + private void checkInterfaceCompatibility(Class expectedClass, Class actualClass) + throws YarnRuntimeException{ + LOG.debug("Checking implemented interface's compatibility: \"{}\"", + expectedClass.getSimpleName()); + Method[] expectedDevicePluginMethods = expectedClass.getMethods(); + + // Check method compatibility + boolean found; + for (Method method: expectedDevicePluginMethods) { + found = false; + LOG.debug("Try to find method: \"{}\"", + method.getName()); + for (Method m : actualClass.getDeclaredMethods()) { + if (m.getName().equals(method.getName())) { + LOG.debug("Method \"{}\" found in class \"{}\"", + actualClass.getSimpleName(), + m.getName()); + found = true; + break; + } + } + if (!found) { + LOG.info("Method \"{}\" is not found in plugin", + method.getName() + ); + throw new YarnRuntimeException( + "Method \"" + method.getName() + + "\" is expected but not implemented in " + + actualClass.getCanonicalName() + ); + } + }// end for + LOG.debug("\"{}\" compatibility is ok..", + expectedClass.getSimpleName()); + } + + @VisibleForTesting + public boolean isValidAndConfiguredResourceName(String resourceName) { + // check pattern match + // check configured + Map configuredResourceTypes = + ResourceUtils.getResourceTypes(); + if (!configuredResourceTypes.containsKey(resourceName)) { + return false; + } + return true; + } + + public DeviceSchedulerManager getDeviceSchedulerManager() { + return deviceSchedulerManager; } public synchronized void cleanup() throws YarnException { diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/resourceplugin/deviceframework/AssignedDevice.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/resourceplugin/deviceframework/AssignedDevice.java new file mode 100644 index 00000000000..caf57928dde --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/resourceplugin/deviceframework/AssignedDevice.java @@ -0,0 +1,76 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.hadoop.yarn.server.nodemanager.containermanager.resourceplugin.deviceframework;

import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.server.nodemanager.api.deviceplugin.Device;

import java.io.Serializable;
import java.util.Objects;

/**
 * Immutable pairing of a {@link Device} with the container it is assigned
 * to. Used for assignment bookkeeping and NM web UI reporting.
 */
public class AssignedDevice implements Serializable, Comparable {

  private static final long serialVersionUID = 1L;

  // Made private final; the original exposed package-private mutable fields
  // alongside the getters.
  private final Device device;
  private final String containerId;

  public AssignedDevice(ContainerId containerId, Device device) {
    this.device = device;
    this.containerId = containerId.toString();
  }

  public Device getDevice() {
    return device;
  }

  public String getContainerId() {
    return containerId;
  }

  /**
   * Orders by device first, then container id. A non-AssignedDevice (or
   * null) argument sorts first, preserving the original's behavior (note
   * this deviates from the strict Comparable contract, which suggests
   * throwing for incomparable arguments).
   */
  @Override
  public int compareTo(Object o) {
    if (!(o instanceof AssignedDevice)) {
      return -1;
    }
    AssignedDevice other = (AssignedDevice) o;
    int result = getDevice().compareTo(other.getDevice());
    if (0 != result) {
      return result;
    }
    return getContainerId().compareTo(other.getContainerId());
  }

  @Override
  public boolean equals(Object o) {
    // instanceof is already false for null, so no separate null check needed
    if (!(o instanceof AssignedDevice)) {
      return false;
    }
    AssignedDevice other = (AssignedDevice) o;
    return getDevice().equals(other.getDevice())
        && getContainerId().equals(other.getContainerId());
  }

  @Override
  public int hashCode() {
    return Objects.hash(getDevice(), getContainerId());
  }
}
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/resourceplugin/deviceframework/DevicePluginAdapter.java new file mode 100644 index 00000000000..3b2e1ef8501 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/resourceplugin/deviceframework/DevicePluginAdapter.java @@ -0,0 +1,123 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hadoop.yarn.server.nodemanager.containermanager.resourceplugin.deviceframework; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.yarn.api.records.ContainerId; +import org.apache.hadoop.yarn.exceptions.YarnException; +import org.apache.hadoop.yarn.server.nodemanager.Context; +import org.apache.hadoop.yarn.server.nodemanager.api.deviceplugin.Device; +import org.apache.hadoop.yarn.server.nodemanager.api.deviceplugin.DevicePlugin; +import org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.privileged.PrivilegedOperationExecutor; +import org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.resources.CGroupsHandler; +import org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.resources.ResourceHandler; +import org.apache.hadoop.yarn.server.nodemanager.containermanager.resourceplugin.DockerCommandPlugin; +import org.apache.hadoop.yarn.server.nodemanager.containermanager.resourceplugin.NodeResourceUpdaterPlugin; +import org.apache.hadoop.yarn.server.nodemanager.containermanager.resourceplugin.ResourcePlugin; +import org.apache.hadoop.yarn.server.nodemanager.containermanager.resourceplugin.ResourcePluginManager; +import org.apache.hadoop.yarn.server.nodemanager.webapp.dao.NMDeviceResourceInfo; +import org.apache.hadoop.yarn.server.nodemanager.webapp.dao.NMResourceInfo; + +import java.util.*; + + +/** + * The {@link DevicePluginAdapter} will adapt existing hooks into vendor plugin's logic. 
+ * It decouples the vendor plugin from YARN's device framework + * + * */ +public class DevicePluginAdapter implements ResourcePlugin { + final static Log LOG = LogFactory.getLog(DevicePluginAdapter.class); + + private String resourceName; + private DevicePlugin devicePlugin; + + private DeviceSchedulerManager deviceSchedulerManager; + + private DeviceResourceDockerRuntimePluginImpl deviceDockerCommandPlugin; + + private DeviceResourceHandlerImpl deviceResourceHandler; + private DeviceResourceUpdaterImpl deviceResourceUpdater; + + public DevicePluginAdapter(String name, DevicePlugin dp, + DeviceSchedulerManager dsm) { + deviceSchedulerManager = dsm; + resourceName = name; + devicePlugin = dp; + } + + public DeviceSchedulerManager getDeviceSchedulerManager() { + return deviceSchedulerManager; + } + + @Override + public void initialize(Context context) throws YarnException { + deviceDockerCommandPlugin = new DeviceResourceDockerRuntimePluginImpl( + resourceName, + devicePlugin, this); + deviceResourceUpdater = new DeviceResourceUpdaterImpl( + resourceName, devicePlugin); + LOG.info(resourceName + " plugin adapter initialized"); + return; + } + + @Override + public ResourceHandler createResourceHandler(Context nmContext, CGroupsHandler cGroupsHandler, + PrivilegedOperationExecutor privilegedOperationExecutor) { + this.deviceResourceHandler = new DeviceResourceHandlerImpl(resourceName, + devicePlugin, this, deviceSchedulerManager, + cGroupsHandler, privilegedOperationExecutor); + return deviceResourceHandler; + } + + @Override + public NodeResourceUpdaterPlugin getNodeResourceHandlerInstance() { + return deviceResourceUpdater; + } + + @Override + public void cleanup() { + + } + + @Override + public DockerCommandPlugin getDockerCommandPluginInstance() { + return deviceDockerCommandPlugin; + } + + @Override + public NMResourceInfo getNMResourceInfo() throws YarnException { + List allowed = new ArrayList<>( + 
deviceSchedulerManager.getAllAllowedDevices().get(resourceName)); + List assigned = new ArrayList<>(); + Map assignedMap = + deviceSchedulerManager.getAllUsedDevices().get(resourceName); + for (Map.Entry entry : assignedMap.entrySet()) { + assigned.add(new AssignedDevice(entry.getValue(), + entry.getKey())); + } + return new NMDeviceResourceInfo(allowed, assigned); + } + + public DeviceResourceHandlerImpl getDeviceResourceHandler() { + return deviceResourceHandler; + } + +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/resourceplugin/deviceframework/DeviceResourceDockerRuntimePluginImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/resourceplugin/deviceframework/DeviceResourceDockerRuntimePluginImpl.java new file mode 100644 index 00000000000..4a064bce709 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/resourceplugin/deviceframework/DeviceResourceDockerRuntimePluginImpl.java @@ -0,0 +1,170 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.hadoop.yarn.server.nodemanager.containermanager.resourceplugin.deviceframework;

import com.google.common.annotations.VisibleForTesting;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.server.nodemanager.api.deviceplugin.Device;
import org.apache.hadoop.yarn.server.nodemanager.api.deviceplugin.DevicePlugin;
import org.apache.hadoop.yarn.server.nodemanager.api.deviceplugin.DeviceRuntimeSpec;
import org.apache.hadoop.yarn.server.nodemanager.api.deviceplugin.MountDeviceSpec;
import org.apache.hadoop.yarn.server.nodemanager.api.deviceplugin.MountVolumeSpec;
import org.apache.hadoop.yarn.server.nodemanager.api.deviceplugin.VolumeSpec;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.runtime.docker.DockerRunCommand;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.runtime.docker.DockerVolumeCommand;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.resourceplugin.DockerCommandPlugin;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.runtime.ContainerExecutionException;

import java.util.Map;
import java.util.Set;
import java.util.TreeSet;
import java.util.concurrent.ConcurrentHashMap;

/**
 * Bridges a vendor {@link DevicePlugin} into the Docker runtime: asks the
 * plugin for a {@link DeviceRuntimeSpec} and translates it into device
 * mounts, volume mounts, environment variables and Docker volume commands.
 */
public class DeviceResourceDockerRuntimePluginImpl
    implements DockerCommandPlugin {

  final static Log LOG = LogFactory.getLog(
      DeviceResourceDockerRuntimePluginImpl.class);

  private final String resourceName;
  private final DevicePlugin devicePlugin;
  private final DevicePluginAdapter devicePluginAdapter;

  // Per-container caches so the plugin is consulted only once per container.
  private final Map<ContainerId, Set<Device>> cachedAllocation =
      new ConcurrentHashMap<>();
  private final Map<ContainerId, DeviceRuntimeSpec> cachedSpec =
      new ConcurrentHashMap<>();

  public DeviceResourceDockerRuntimePluginImpl(String resourceName,
      DevicePlugin devicePlugin, DevicePluginAdapter devicePluginAdapter) {
    this.resourceName = resourceName;
    this.devicePlugin = devicePlugin;
    this.devicePluginAdapter = devicePluginAdapter;
  }

  @Override
  public void updateDockerRunCommand(DockerRunCommand dockerRunCommand,
      Container container) throws ContainerExecutionException {
    if (!requestsDevice(resourceName, container)) {
      return;
    }
    DeviceRuntimeSpec deviceRuntimeSpec = getRuntimeSpec(container);
    if (deviceRuntimeSpec == null) {
      // getRuntimeSpec already logged the plugin failure; original code
      // would NPE here.
      return;
    }
    // handle device mounts
    for (MountDeviceSpec mountDeviceSpec
        : deviceRuntimeSpec.getDeviceMounts()) {
      dockerRunCommand.addDevice(
          mountDeviceSpec.getDevicePathInHost(),
          mountDeviceSpec.getDevicePathInContainer());
    }
    // handle volume mounts; Boolean.TRUE.equals avoids an unboxing NPE if
    // the plugin left the flag null
    for (MountVolumeSpec mountVolumeSpec
        : deviceRuntimeSpec.getVolumeMounts()) {
      if (Boolean.TRUE.equals(mountVolumeSpec.getReadOnly())) {
        dockerRunCommand.addReadOnlyMountLocation(
            mountVolumeSpec.getHostPath(),
            mountVolumeSpec.getMountPath());
      } else {
        dockerRunCommand.addReadWriteMountLocation(
            mountVolumeSpec.getHostPath(),
            mountVolumeSpec.getMountPath());
      }
    }
    // handle envs
    dockerRunCommand.addEnv(deviceRuntimeSpec.getEnvs());
  }

  @Override
  public DockerVolumeCommand getCreateDockerVolumeCommand(Container container)
      throws ContainerExecutionException {
    if (!requestsDevice(resourceName, container)) {
      return null;
    }
    DeviceRuntimeSpec deviceRuntimeSpec = getRuntimeSpec(container);
    if (deviceRuntimeSpec == null) {
      return null;
    }
    for (VolumeSpec volumeSpec : deviceRuntimeSpec.getVolumeClaims()) {
      if (volumeSpec.getVolumeOperation().equals(VolumeSpec.CREATE)) {
        DockerVolumeCommand command = new DockerVolumeCommand(
            DockerVolumeCommand.VOLUME_CREATE_SUB_COMMAND);
        command.setDriverName(volumeSpec.getVolumeDriver());
        command.setVolumeName(volumeSpec.getVolumeName());
        // TODO: support additional volume creation options
        return command;
      }
    }
    return null;
  }

  @Override
  public DockerVolumeCommand getCleanupDockerVolumesCommand(Container container)
      throws ContainerExecutionException {
    if (!requestsDevice(resourceName, container)) {
      return null;
    }
    // Bug fix: the original passed a fresh TreeSet into an out-parameter
    // style helper that reassigned its local reference, so the plugin was
    // always notified with an EMPTY set. The helper now returns the
    // allocation instead.
    Set<Device> allocated = getAllocatedDevices(container);
    devicePlugin.onDevicesReleased(allocated);

    // remove cache
    ContainerId containerId = container.getContainerId();
    cachedAllocation.remove(containerId);
    cachedSpec.remove(containerId);
    return null;
  }

  @VisibleForTesting
  protected boolean requestsDevice(String resourceName, Container container) {
    return DeviceSchedulerManager.
        getRequestedDeviceCount(resourceName, container.getResource()) > 0;
  }

  /**
   * Returns the devices assigned to this container for this resource type,
   * computing them from the scheduler's bookkeeping on the first call and
   * caching the result afterwards.
   */
  private Set<Device> getAllocatedDevices(Container container) {
    ContainerId containerId = container.getContainerId();
    Set<Device> allocated = cachedAllocation.get(containerId);
    if (allocated != null) {
      return allocated;
    }
    allocated = new TreeSet<>();
    Map<Device, ContainerId> assignedDevice = devicePluginAdapter
        .getDeviceSchedulerManager()
        .getAllUsedDevices().get(resourceName);
    for (Map.Entry<Device, ContainerId> entry : assignedDevice.entrySet()) {
      if (entry.getValue().equals(containerId)) {
        allocated.add(entry.getKey());
      }
    }
    cachedAllocation.put(containerId, allocated);
    return allocated;
  }

  /**
   * Asks the plugin (once per container) for the Docker runtime spec of the
   * container's device allocation. Returns null if the plugin misbehaves.
   */
  public synchronized DeviceRuntimeSpec getRuntimeSpec(Container container) {
    ContainerId containerId = container.getContainerId();
    DeviceRuntimeSpec deviceRuntimeSpec = cachedSpec.get(containerId);
    if (deviceRuntimeSpec == null) {
      Set<Device> allocated = getAllocatedDevices(container);
      deviceRuntimeSpec = devicePlugin.onDevicesAllocated(allocated,
          DeviceRuntimeSpec.RUNTIME_DOCKER);
      if (null == deviceRuntimeSpec) {
        LOG.error("Null DeviceRuntimeSpec value got, please check plugin logic");
        return null;
      }
      cachedSpec.put(containerId, deviceRuntimeSpec);
    }
    return deviceRuntimeSpec;
  }
}
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/resourceplugin/deviceframework/DeviceResourceHandlerImpl.java new file mode 100644 index 00000000000..76482a8e6cb --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/resourceplugin/deviceframework/DeviceResourceHandlerImpl.java @@ -0,0 +1,121 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.hadoop.yarn.server.nodemanager.containermanager.resourceplugin.deviceframework;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.server.nodemanager.api.deviceplugin.Device;
import org.apache.hadoop.yarn.server.nodemanager.api.deviceplugin.DevicePlugin;
import org.apache.hadoop.yarn.server.nodemanager.api.deviceplugin.DeviceRuntimeSpec;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.privileged.PrivilegedOperation;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.privileged.PrivilegedOperationExecutor;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.resources.CGroupsHandler;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.resources.ResourceHandler;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.resources.ResourceHandlerException;

import java.util.List;
import java.util.Set;

/**
 * {@link ResourceHandler} that backs a pluggable device resource type:
 * discovers the plugin's devices at bootstrap, assigns/releases devices
 * around the container lifecycle and recovers assignments on NM restart.
 */
public class DeviceResourceHandlerImpl implements ResourceHandler {

  final static Log LOG = LogFactory.getLog(DeviceResourceHandlerImpl.class);

  // All collaborators are fixed at construction time.
  private final String resourceName;
  private final DevicePlugin devicePlugin;
  private final DeviceSchedulerManager deviceSchedulerManager;
  private final CGroupsHandler cGroupsHandler;
  private final PrivilegedOperationExecutor privilegedOperationExecutor;
  private final DevicePluginAdapter devicePluginAdapter;

  public DeviceResourceHandlerImpl(String resourceName,
      DevicePlugin devicePlugin,
      DevicePluginAdapter devicePluginAdapter,
      DeviceSchedulerManager deviceSchedulerManager,
      CGroupsHandler cGroupsHandler,
      PrivilegedOperationExecutor privilegedOperation) {
    this.devicePluginAdapter = devicePluginAdapter;
    this.resourceName = resourceName;
    this.devicePlugin = devicePlugin;
    this.cGroupsHandler = cGroupsHandler;
    this.privilegedOperationExecutor = privilegedOperation;
    this.deviceSchedulerManager = deviceSchedulerManager;
  }

  /**
   * Queries the plugin for its device set and registers it with the
   * scheduler. A null return from the plugin is logged but does not fail
   * the NM.
   */
  @Override
  public List<PrivilegedOperation> bootstrap(Configuration configuration)
      throws ResourceHandlerException {
    Set<Device> availableDevices = devicePlugin.getDevices();
    // We won't fail the NM if the plugin returns an invalid value here.
    // TODO: update the RM's resource count if something goes wrong
    if (availableDevices == null) {
      LOG.error("Bootstrap " + resourceName
          + " failed. Null value got from plugin's getDevices method");
      return null;
    }
    // Add device set. Here we trust the plugin's return value
    deviceSchedulerManager.addDeviceSet(resourceName, availableDevices);
    // TODO: Init cgroups

    return null;
  }

  @Override
  public List<PrivilegedOperation> preStart(Container container)
      throws ResourceHandlerException {
    String containerIdStr = container.getContainerId().toString();
    DeviceSchedulerManager.DeviceAllocation allocation =
        deviceSchedulerManager.assignDevices(resourceName, container);
    LOG.debug("Allocated to " + containerIdStr + ": " + allocation);

    // Let the plugin prepare the allocated devices before container start.
    devicePlugin.onDevicesAllocated(
        allocation.getAllowed(), DeviceRuntimeSpec.RUNTIME_CGROUPS);

    // cgroups operation based on allocation
    // TODO: implement a general container-executor device module to do the
    // isolation
    return null;
  }

  @Override
  public List<PrivilegedOperation> reacquireContainer(ContainerId containerId)
      throws ResourceHandlerException {
    deviceSchedulerManager.recoverAssignedDevices(resourceName, containerId);
    return null;
  }

  @Override
  public List<PrivilegedOperation> updateContainer(Container container)
      throws ResourceHandlerException {
    return null;
  }

  @Override
  public List<PrivilegedOperation> postComplete(ContainerId containerId)
      throws ResourceHandlerException {
    deviceSchedulerManager.cleanupAssignedDevices(resourceName, containerId);
    return null;
  }

  @Override
  public List<PrivilegedOperation> teardown()
      throws ResourceHandlerException {
    return null;
  }
}
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/resourceplugin/deviceframework/DeviceResourceUpdaterImpl.java @@ -0,0 +1,54 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.server.nodemanager.containermanager.resourceplugin.deviceframework; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.yarn.api.records.Resource; +import org.apache.hadoop.yarn.exceptions.YarnException; +import org.apache.hadoop.yarn.server.nodemanager.api.deviceplugin.Device; +import org.apache.hadoop.yarn.server.nodemanager.api.deviceplugin.DevicePlugin; +import org.apache.hadoop.yarn.server.nodemanager.containermanager.resourceplugin.NodeResourceUpdaterPlugin; + +import java.util.Set; + +public class DeviceResourceUpdaterImpl extends NodeResourceUpdaterPlugin { + + final static Log LOG = LogFactory.getLog(DeviceResourceUpdaterImpl.class); + + private String resourceName; + private DevicePlugin devicePlugin; + + public DeviceResourceUpdaterImpl(String resourceName, + DevicePlugin devicePlugin) { + this.devicePlugin = devicePlugin; + this.resourceName = resourceName; + } + + @Override + public void updateConfiguredResource(Resource res) throws YarnException { + LOG.info(resourceName + " plugin update resource "); + Set devices = devicePlugin.getDevices(); + if (devices == null) { + LOG.warn(resourceName + " plugin failed to discover resource ( null value got)." 
); + return; + } + res.setResourceValue(resourceName, devices.size()); + } +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/resourceplugin/deviceframework/DeviceSchedulerManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/resourceplugin/deviceframework/DeviceSchedulerManager.java new file mode 100644 index 00000000000..1e79c42207c --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/resourceplugin/deviceframework/DeviceSchedulerManager.java @@ -0,0 +1,341 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hadoop.yarn.server.nodemanager.containermanager.resourceplugin.deviceframework; + +import com.google.common.annotations.VisibleForTesting; +import com.google.common.collect.ImmutableSet; +import com.google.common.collect.Sets; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.util.StringUtils; +import org.apache.hadoop.yarn.api.records.ContainerId; +import org.apache.hadoop.yarn.api.records.Resource; +import org.apache.hadoop.yarn.exceptions.ResourceNotFoundException; +import org.apache.hadoop.yarn.server.nodemanager.Context; +import org.apache.hadoop.yarn.server.nodemanager.api.deviceplugin.Device; +import org.apache.hadoop.yarn.server.nodemanager.api.deviceplugin.DevicePluginScheduler; +import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container; +import org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.resources.ResourceHandlerException; + +import java.io.IOException; +import java.io.Serializable; +import java.util.*; +import java.util.concurrent.ConcurrentHashMap; + +/** + * Schedule device resource based on requirements and do book keeping + * It holds all device type resource and can do scheduling as a default scheduler + * If one resource type brings its own scheduler. That will be used. 
+ * */ +public class DeviceSchedulerManager { + final static Log LOG = LogFactory.getLog(DeviceSchedulerManager.class); + + private Context nmContext; + private static final int WAIT_MS_PER_LOOP = 1000; + + // Holds vendor implemented scheduler + private Map devicePluginSchedulers = + new ConcurrentHashMap<>(); + + /** + * Hold all type of devices + * key is the device resource name + * value is a sorted set of {@link Device} + * */ + private Map> allAllowedDevices = new ConcurrentHashMap<>(); + + /** + * Hold used devices + * key is the device resource name + * value is a sorted map of {@link Device} and {@link ContainerId} pairs + * */ + private Map> allUsedDevices = new ConcurrentHashMap<>(); + + public DeviceSchedulerManager(Context context) { + nmContext = context; + } + + @VisibleForTesting + public Map> getAllAllowedDevices() { + return allAllowedDevices; + } + + @VisibleForTesting + public Map> getAllUsedDevices() { + return allUsedDevices; + } + + public synchronized void addDeviceSet(String resourceName, Set deviceSet) { + LOG.info("Adding new resource: " + "type:" + resourceName + "," + deviceSet); + allAllowedDevices.put(resourceName, new TreeSet<>(deviceSet)); + allUsedDevices.put(resourceName, new TreeMap<>()); + } + + public DeviceAllocation assignDevices(String resourceName, Container container) + throws ResourceHandlerException { + DeviceAllocation allocation = internalAssignDevices(resourceName, container); + // Wait for a maximum of 120 seconds if no available Devices are there which + // are yet to be released. + final int timeoutMsecs = 120 * WAIT_MS_PER_LOOP; + int timeWaiting = 0; + while (allocation == null) { + if (timeWaiting >= timeoutMsecs) { + break; + } + + // Sleep for 1 sec to ensure there are some free devices which are + // getting released. 
+ try { + LOG.info("Container : " + container.getContainerId() + + " is waiting for free " + resourceName + " devices."); + Thread.sleep(WAIT_MS_PER_LOOP); + timeWaiting += WAIT_MS_PER_LOOP; + allocation = internalAssignDevices(resourceName, container); + } catch (InterruptedException e) { + // On any interrupt, break the loop and continue execution. + break; + } + } + + if(allocation == null) { + String message = "Could not get valid " + resourceName + " device for container '" + + container.getContainerId() + + "' as some other containers might not releasing them."; + LOG.warn(message); + throw new ResourceHandlerException(message); + } + return allocation; + } + + private synchronized DeviceAllocation internalAssignDevices(String resourceName, Container container) + throws ResourceHandlerException { + Resource requestedResource = container.getResource(); + ContainerId containerId = container.getContainerId(); + int requestedDeviceCount = getRequestedDeviceCount(resourceName, requestedResource); + // Assign devices to container if requested some. + if (requestedDeviceCount > 0) { + if (requestedDeviceCount > getAvailableDevices(resourceName)) { + // If there are some devices which are getting released, wait for few + // seconds to get it. 
+ if (requestedDeviceCount <= getReleasingDevices(resourceName) + + getAvailableDevices(resourceName)) { + return null; + } + } + + int availableDeviceCount = getAvailableDevices(resourceName); + if (requestedDeviceCount > availableDeviceCount) { + throw new ResourceHandlerException( + "Failed to find enough " + resourceName + ", requestor=" + containerId + + ", #Requested=" + requestedDeviceCount + ", #available=" + + availableDeviceCount); + } + + Set assignedDevices = new TreeSet<>(); + Map usedDevices = allUsedDevices.get(resourceName); + Set allowedDevices = allAllowedDevices.get(resourceName); + + DevicePluginScheduler dps = devicePluginSchedulers.get(resourceName); + // Prefer DevicePluginScheduler logic + if (null == dps) { + LOG.debug("Customized device plugin scheduler is preferred " + + "but not implemented, use default logic"); + defaultScheduleAction(allowedDevices, usedDevices, + assignedDevices, containerId, requestedDeviceCount); + } else { + LOG.debug("Customized device plugin implemented," + + "use customized logic"); + // Use customized device scheduler + LOG.debug("Try to schedule " + requestedDeviceCount + + "(" + resourceName + ") using " + dps.getClass()); + Set dpsAllocated = dps.allocateDevices( + Sets.difference(allowedDevices, usedDevices.keySet()), + requestedDeviceCount); + // TODO: should check if customized scheduler return values are real + if (dpsAllocated.size() != requestedDeviceCount) { + throw new ResourceHandlerException(dps.getClass() + + " should allocate " + requestedDeviceCount + + " of " + resourceName + ", but actual: " + + assignedDevices.size()); + // TODO: fall back to default schedule logic + } + // copy + assignedDevices.addAll(dpsAllocated); + // Store assigned devices into usedDevices + for (Device device : assignedDevices) { + usedDevices.put(device, containerId); + } + } + // Record in state store if we allocated anything + if (!assignedDevices.isEmpty()) { + try { + // Update state store. 
+ nmContext.getNMStateStore().storeAssignedResources(container, resourceName, + new ArrayList<>(assignedDevices)); + } catch (IOException e) { + cleanupAssignedDevices(resourceName, containerId); + throw new ResourceHandlerException(e); + } + } + + return new DeviceAllocation(resourceName, assignedDevices, + Sets.difference(allowedDevices, assignedDevices)); + } + return new DeviceAllocation(resourceName, null, + allAllowedDevices.get(resourceName)); + } + + public synchronized void recoverAssignedDevices(String resourceName, ContainerId containerId) + throws ResourceHandlerException{ + Container c = nmContext.getContainers().get(containerId); + Map usedDevices = allUsedDevices.get(resourceName); + Set allowedDevices = allAllowedDevices.get(resourceName); + if (null == c) { + throw new ResourceHandlerException( + "This shouldn't happen, cannot find container with id=" + + containerId); + } + + for (Serializable deviceSerializable : c.getResourceMappings() + .getAssignedResources(resourceName)) { + if (!(deviceSerializable instanceof Device)) { + throw new ResourceHandlerException( + "Trying to recover device id, however it" + + " is not Device instance, this shouldn't happen"); + } + + Device device = (Device) deviceSerializable; + + // Make sure it is in allowed device. 
+ if (!allowedDevices.contains(device)) { + throw new ResourceHandlerException( + "Try to recover device = " + device + + " however it is not in allowed device list:" + StringUtils + .join(",", allowedDevices)); + } + + // Make sure it is not occupied by anybody else + if (usedDevices.containsKey(device)) { + throw new ResourceHandlerException( + "Try to recover device id = " + device + + " however it is already assigned to container=" + + usedDevices.get(device) + + ", please double check what happened."); + } + + usedDevices.put(device, containerId); + } + } + + public synchronized void cleanupAssignedDevices(String resourceName, ContainerId containerId) { + Iterator> iter = + allUsedDevices.get(resourceName).entrySet().iterator(); + while (iter.hasNext()) { + if (iter.next().getValue().equals(containerId)) { + iter.remove(); + } + } + } + + public static int getRequestedDeviceCount(String resourceName, Resource requestedResource) { + try { + return Long.valueOf(requestedResource.getResourceValue( + resourceName)).intValue(); + } catch (ResourceNotFoundException e) { + return 0; + } + } + + public synchronized int getAvailableDevices(String resourceName) { + return allAllowedDevices.get(resourceName).size() - + allUsedDevices.get(resourceName).size(); + } + + private synchronized long getReleasingDevices(String resourceName) { + long releasingDevices = 0; + Map used = allUsedDevices.get(resourceName); + Iterator> iter = used.entrySet() + .iterator(); + while (iter.hasNext()) { + ContainerId containerId = iter.next().getValue(); + Container container; + if ((container = nmContext.getContainers().get(containerId)) != null) { + if (container.isContainerInFinalStates()) { + releasingDevices = releasingDevices + container.getResource() + .getResourceInformation(resourceName).getValue(); + } + } + } + return releasingDevices; + } + + // default scheduling logic + private void defaultScheduleAction(Set allowed, + Map used, Set assigned, + ContainerId containerId, int 
count) { + for (Device device : allowed) { + if (!used.containsKey(device)) { + used.put(device, containerId); + assigned.add(device); + if (assigned.size() == count) { + return; + } + } + } // end for + } + + static class DeviceAllocation { + private String resourceName; + + + private Set allowed = Collections.emptySet(); + private Set denied = Collections.emptySet(); + + DeviceAllocation(String resourceName, Set allowed, Setdenied) { + this.resourceName = resourceName; + if (allowed != null) { + this.allowed = ImmutableSet.copyOf(allowed); + } + if (denied != null) { + this.denied = ImmutableSet.copyOf(denied); + } + } + + + public Set getAllowed() { + return allowed; + } + + @Override + public String toString() { + return "ResourceType: " + resourceName + + ", Allowed Devices: " + allowed + + ", Denied Devices: " + denied; + } + + } + + @VisibleForTesting + public void addDevicePluginScheduler(String resourceName, + DevicePluginScheduler s) { + this.devicePluginSchedulers.put(resourceName, + Objects.requireNonNull(s)); + } + +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/resourceplugin/deviceframework/examples/FakeDevicePlugin.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/resourceplugin/deviceframework/examples/FakeDevicePlugin.java new file mode 100644 index 00000000000..99c920dd773 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/resourceplugin/deviceframework/examples/FakeDevicePlugin.java @@ -0,0 +1,73 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.server.nodemanager.containermanager.resourceplugin.deviceframework.examples; + +import org.apache.hadoop.yarn.server.nodemanager.api.deviceplugin.*; + +import java.util.Set; +import java.util.TreeSet; + +public class FakeDevicePlugin + implements DevicePlugin,DevicePluginScheduler { + public final static String resourceName = "cmp.com/cmp"; + @Override + public DeviceRegisterRequest getRegisterRequestInfo() { + return DeviceRegisterRequest.Builder.newInstance() + .setResourceName(resourceName) + .setPluginVersion("v1.0").build(); + } + + @Override + public Set getDevices() { + TreeSet r = new TreeSet<>(); + r.add(Device.Builder.newInstance() + .setID(0) + .setDevPath("/dev/cmp0") + .setMajorNumber(243) + .setMinorNumber(0) + .setBusID("0000:65:00.0") + .setHealthy(true) + .build()); + return r; + } + + @Override + public DeviceRuntimeSpec onDevicesAllocated(Set allocatedDevices, String runtime) { + return null; + } + + @Override + public void onDevicesReleased(Set allocatedDevices) { + + } + + @Override + public Set allocateDevices(Set availableDevices, Integer count) { + Set allocated = new TreeSet<>(); + int number = 0; + for (Device d : availableDevices) { + allocated.add(d); + number++; + if (number == count) { + break; + } + } + return 
allocated; + } +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/webapp/dao/NMDeviceResourceInfo.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/webapp/dao/NMDeviceResourceInfo.java new file mode 100644 index 00000000000..3a0ba757cae --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/webapp/dao/NMDeviceResourceInfo.java @@ -0,0 +1,52 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.server.nodemanager.webapp.dao; + +import org.apache.hadoop.yarn.server.nodemanager.api.deviceplugin.Device; +import org.apache.hadoop.yarn.server.nodemanager.containermanager.resourceplugin.deviceframework.AssignedDevice; + +import java.util.List; + +public class NMDeviceResourceInfo extends NMResourceInfo { + + List totalDevices; + List assignedDevices; + + public NMDeviceResourceInfo(List totalDevices, List assignedDevices) { + this.assignedDevices = assignedDevices; + this.totalDevices = totalDevices; + } + + public List getTotalDevices() { + return totalDevices; + } + + public void setTotalDevices(List totalDevices) { + this.totalDevices = totalDevices; + } + + public List getAssignedDevices() { + return assignedDevices; + } + + public void setAssignedDevices( + List assignedDevices) { + this.assignedDevices = assignedDevices; + } +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/linux/runtime/TestDockerContainerRuntime.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/linux/runtime/TestDockerContainerRuntime.java index 40ac61851fb..0b5ce3ac374 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/linux/runtime/TestDockerContainerRuntime.java +++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/linux/runtime/TestDockerContainerRuntime.java @@ -1958,16 +1958,16 @@ private void checkVolumeCreateCommand() ArgumentCaptor opCaptor = ArgumentCaptor.forClass( PrivilegedOperation.class); - //single invocation expected + //Three invocations expected (volume creation, volume check, run container) //due to type erasure + mocking, this verification requires a suppress // warning annotation on the entire method - verify(mockExecutor, times(2)) + verify(mockExecutor, times(3)) .executePrivilegedOperation(anyList(), opCaptor.capture(), any( File.class), anyMap(), anyBoolean(), anyBoolean()); //verification completed. we need to isolate specific invications. // hence, reset mock here - Mockito.reset(mockExecutor); + //Mockito.reset(mockExecutor); List allCaptures = opCaptor.getAllValues(); @@ -2070,10 +2070,8 @@ private void testDockerCommandPluginWithVolumesOutput( try { runtime.prepareContainer(containerRuntimeContext); - - checkVolumeCreateCommand(); - runtime.launchContainer(containerRuntimeContext); + checkVolumeCreateCommand(); } catch (ContainerExecutionException e) { if (expectFail) { // Expected @@ -2166,10 +2164,11 @@ public void testDockerCommandPlugin() throws Exception { ContainerRuntimeContext containerRuntimeContext = builder.build(); runtime.prepareContainer(containerRuntimeContext); - checkVolumeCreateCommand(); runtime.launchContainer(containerRuntimeContext); - List dockerCommands = readDockerCommands(); + checkVolumeCreateCommand(); + + List dockerCommands = readDockerCommands(3); int expected = 14; int counter = 0; diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/resourceplugin/TestResourcePluginManager.java 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/resourceplugin/TestResourcePluginManager.java index 6ed7c568899..1f748826ffd 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/resourceplugin/TestResourcePluginManager.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/resourceplugin/TestResourcePluginManager.java @@ -24,6 +24,7 @@ import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.event.Dispatcher; +import org.apache.hadoop.yarn.exceptions.YarnRuntimeException; import org.apache.hadoop.yarn.server.nodemanager.ContainerExecutor; import org.apache.hadoop.yarn.server.nodemanager.Context; import org.apache.hadoop.yarn.server.nodemanager.DeletionService; @@ -33,6 +34,7 @@ import org.apache.hadoop.yarn.server.nodemanager.NodeManager; import org.apache.hadoop.yarn.server.nodemanager.NodeManagerTestBase; import org.apache.hadoop.yarn.server.nodemanager.NodeStatusUpdater; +import org.apache.hadoop.yarn.server.nodemanager.api.deviceplugin.*; import org.apache.hadoop.yarn.server.nodemanager.containermanager.ContainerManagerImpl; import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container; import org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.privileged.PrivilegedOperation; @@ -41,27 +43,41 @@ import org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.resources.ResourceHandler; import org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.resources.ResourceHandlerChain; import org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.resources.ResourceHandlerException; -import 
org.apache.hadoop.yarn.server.nodemanager.containermanager.resourceplugin.NodeResourceUpdaterPlugin; -import org.apache.hadoop.yarn.server.nodemanager.containermanager.resourceplugin.ResourcePlugin; -import org.apache.hadoop.yarn.server.nodemanager.containermanager.resourceplugin.ResourcePluginManager; +import org.apache.hadoop.yarn.server.nodemanager.containermanager.resourceplugin.deviceframework.DevicePluginAdapter; +import org.apache.hadoop.yarn.server.nodemanager.containermanager.resourceplugin.deviceframework.FakeTestDevicePlugin1; +import org.apache.hadoop.yarn.server.nodemanager.containermanager.resourceplugin.deviceframework.FakeTestDevicePlugin2; +import org.apache.hadoop.yarn.server.nodemanager.containermanager.resourceplugin.deviceframework.FakeTestDevicePlugin3; +import org.apache.hadoop.yarn.server.nodemanager.containermanager.resourceplugin.deviceframework.examples.FakeDevicePlugin; import org.apache.hadoop.yarn.server.security.ApplicationACLsManager; +import org.apache.hadoop.yarn.util.resource.ResourceUtils; +import org.apache.hadoop.yarn.util.resource.TestResourceUtils; import org.junit.After; import org.junit.Assert; +import org.junit.Before; import org.junit.Test; -import java.util.HashMap; -import java.util.List; -import java.util.Map; +import java.io.File; +import java.util.*; import static org.mockito.Matchers.any; -import static org.mockito.Mockito.mock; -import static org.mockito.Mockito.times; -import static org.mockito.Mockito.verify; -import static org.mockito.Mockito.when; +import static org.mockito.Mockito.*; public class TestResourcePluginManager extends NodeManagerTestBase { private NodeManager nm; + private YarnConfiguration conf; + + private String tempResourceTypesFile; + + @Before + public void setup() throws Exception { + this.conf = createNMConfig(); + // setup resource-types.xml + ResourceUtils.resetResourceTypes(); + String resourceTypesFile = "resource-types-pluggable-devices.xml"; + this.tempResourceTypesFile = 
TestResourceUtils.setupResourceTypes(this.conf, resourceTypesFile); + } + ResourcePluginManager stubResourcePluginmanager() { // Stub ResourcePluginManager final ResourcePluginManager rpm = mock(ResourcePluginManager.class); @@ -94,6 +110,11 @@ public void tearDown() { // ignore } } + // cleanup resource-types.xml + File dest = new File(this.tempResourceTypesFile); + if (dest.exists()) { + dest.delete(); + } } private class CustomizedResourceHandler implements ResourceHandler { @@ -145,7 +166,7 @@ public MyMockNM(ResourcePluginManager rpm) { @Override protected NodeStatusUpdater createNodeStatusUpdater(Context context, Dispatcher dispatcher, NodeHealthCheckerService healthChecker) { - ((NodeManager.NMContext)context).setResourcePluginManager(rpm); + ((NodeManager.NMContext) context).setResourcePluginManager(rpm); return new BaseNodeStatusUpdaterForTest(context, dispatcher, healthChecker, metrics, new BaseResourceTrackerForTest()); } @@ -157,7 +178,7 @@ protected ContainerManagerImpl createContainerManager(Context context, ApplicationACLsManager aclsManager, LocalDirsHandlerService diskhandler) { return new MyContainerManager(context, exec, del, nodeStatusUpdater, - metrics, diskhandler); + metrics, diskhandler); } @Override @@ -222,7 +243,7 @@ public void testLinuxContainerExecutorWithResourcePluginsEnabled() throws Except @Override protected NodeStatusUpdater createNodeStatusUpdater(Context context, Dispatcher dispatcher, NodeHealthCheckerService healthChecker) { - ((NMContext)context).setResourcePluginManager(rpm); + ((NMContext) context).setResourcePluginManager(rpm); return new BaseNodeStatusUpdaterForTest(context, dispatcher, healthChecker, metrics, new BaseResourceTrackerForTest()); } @@ -239,7 +260,7 @@ protected ContainerManagerImpl createContainerManager(Context context, @Override protected ContainerExecutor createContainerExecutor(Configuration conf) { - ((NMContext)this.getNMContext()).setResourcePluginManager(rpm); + ((NMContext) 
this.getNMContext()).setResourcePluginManager(rpm); lce.setConf(conf); return lce; } @@ -257,6 +278,9 @@ protected ContainerExecutor createContainerExecutor(Configuration conf) { boolean newHandlerAdded = false; for (ResourceHandler h : ((ResourceHandlerChain) handler) .getResourceHandlerList()) { + if (h instanceof DevicePluginAdapter) { + Assert.assertTrue(false); + } if (h instanceof CustomizedResourceHandler) { newHandlerAdded = true; break; @@ -264,4 +288,175 @@ protected ContainerExecutor createContainerExecutor(Configuration conf) { } Assert.assertTrue("New ResourceHandler should be added", newHandlerAdded); } + + // Disabled pluggable framework. + // We use spy object of real rpm to verify "initializePluggableDevicePlugins" + // because use mock rpm will not working + @Test(timeout = 30000) + public void testInitializationWithPluggableDeviceFrameworkDisabled() throws Exception { + ResourcePluginManager rpm = new ResourcePluginManager(); + + ResourcePluginManager rpmSpy = spy(rpm); + nm = new MyMockNM(rpmSpy); + + conf.setBoolean(YarnConfiguration.NM_PLUGGABLE_DEVICE_FRAMEWORK_ENABLED, + false); + nm.init(conf); + nm.start(); + verify(rpmSpy, times(1)).initialize( + any(Context.class)); + verify(rpmSpy, times(0)).initializePluggableDevicePlugins( + any(Context.class), any(Configuration.class), any(Map.class)); + } + + // no configuration set. 
+ @Test(timeout = 30000) + public void testInitializationWithPluggableDeviceFrameworkDisabled2() throws Exception { + ResourcePluginManager rpm = new ResourcePluginManager(); + + ResourcePluginManager rpmSpy = spy(rpm); + nm = new MyMockNM(rpmSpy); + + nm.init(conf); + nm.start(); + verify(rpmSpy, times(1)).initialize( + any(Context.class)); + verify(rpmSpy, times(0)).initializePluggableDevicePlugins( + any(Context.class), any(Configuration.class), any(Map.class)); + } + + // Enable framework and configure pluggable device classes + @Test(timeout = 30000) + public void testInitializationWithPluggableDeviceFrameworkEnabled() throws Exception { + ResourcePluginManager rpm = new ResourcePluginManager(); + + ResourcePluginManager rpmSpy = spy(rpm); + nm = new MyMockNM(rpmSpy); + + conf.setBoolean(YarnConfiguration.NM_PLUGGABLE_DEVICE_FRAMEWORK_ENABLED, + true); + conf.setStrings(YarnConfiguration.NM_PLUGGABLE_DEVICE_FRAMEWORK_DEVICE_CLASSES, + FakeDevicePlugin.class.getCanonicalName()); + nm.init(conf); + nm.start(); + verify(rpmSpy, times(1)).initialize( + any(Context.class)); + verify(rpmSpy, times(1)).initializePluggableDevicePlugins( + any(Context.class), any(Configuration.class), any(Map.class)); + } + + // Enable pluggable framework, but leave device classes un-configured + // initializePluggableDevicePlugins invoked but it should throw an exception + @Test(timeout = 30000) + public void testInitializationWithPluggableDeviceFrameworkEnabled2() + throws ClassNotFoundException{ + ResourcePluginManager rpm = new ResourcePluginManager(); + + ResourcePluginManager rpmSpy = spy(rpm); + nm = new MyMockNM(rpmSpy); + Boolean fail = false; + try { + YarnConfiguration conf = createNMConfig(); + conf.setBoolean(YarnConfiguration.NM_PLUGGABLE_DEVICE_FRAMEWORK_ENABLED, + true); + + nm.init(conf); + nm.start(); + } catch (YarnRuntimeException e) { + fail = true; + } catch (Exception e) { + + } + verify(rpmSpy, times(1)).initializePluggableDevicePlugins( + any(Context.class), 
any(Configuration.class), any(Map.class)); + Assert.assertTrue(fail); + } + + @Test(timeout = 30000) + public void testNormalInitializationOfPluggableDeviceClasses() + throws Exception{ + + ResourcePluginManager rpm = new ResourcePluginManager(); + + ResourcePluginManager rpmSpy = spy(rpm); + nm = new MyMockNM(rpmSpy); + + conf.setBoolean(YarnConfiguration.NM_PLUGGABLE_DEVICE_FRAMEWORK_ENABLED, + true); + conf.setStrings(YarnConfiguration.NM_PLUGGABLE_DEVICE_FRAMEWORK_DEVICE_CLASSES, + FakeTestDevicePlugin1.class.getCanonicalName() + "," + + FakeDevicePlugin.class.getCanonicalName()); + nm.init(conf); + nm.start(); + Map pluginMap = rpmSpy.getNameToPlugins(); + Assert.assertEquals(2,pluginMap.size()); + ResourcePlugin rp = pluginMap.get("cmp.com/cmp"); + if (! (rp instanceof DevicePluginAdapter)) { + Assert.assertTrue(false); + } + } + + // Fail to load a class which doesn't implement interface DevicePlugin + @Test(timeout = 30000) + public void testLoadInvalidPluggableDeviceClasses() + throws Exception{ + ResourcePluginManager rpm = new ResourcePluginManager(); + + ResourcePluginManager rpmSpy = spy(rpm); + nm = new MyMockNM(rpmSpy); + + conf.setBoolean(YarnConfiguration.NM_PLUGGABLE_DEVICE_FRAMEWORK_ENABLED, + true); + conf.setStrings(YarnConfiguration.NM_PLUGGABLE_DEVICE_FRAMEWORK_DEVICE_CLASSES, + FakeTestDevicePlugin2.class.getCanonicalName()); + + String expectedMessage = "Class: " + FakeTestDevicePlugin2.class.getCanonicalName() + + " not instance of " + DevicePlugin.class.getCanonicalName(); + String actualMessage = ""; + try { + nm.init(conf); + nm.start(); + } catch (YarnRuntimeException e) { + actualMessage = e.getMessage(); + } + Assert.assertEquals(expectedMessage, actualMessage); + } + + @Test(timeout = 30000) + public void testLoadDuplicateResourceNameDevicePlugin() + throws Exception{ + ResourcePluginManager rpm = new ResourcePluginManager(); + + ResourcePluginManager rpmSpy = spy(rpm); + nm = new MyMockNM(rpmSpy); + + 
conf.setBoolean(YarnConfiguration.NM_PLUGGABLE_DEVICE_FRAMEWORK_ENABLED, + true); + conf.setStrings(YarnConfiguration.NM_PLUGGABLE_DEVICE_FRAMEWORK_DEVICE_CLASSES, + FakeTestDevicePlugin1.class.getCanonicalName() + "," + + FakeTestDevicePlugin3.class.getCanonicalName()); + + String expectedMessage = "cmpA.com/hdwA" + + " has already been registered! Please change a resource type name"; + String actualMessage = ""; + try { + nm.init(conf); + nm.start(); + } catch (YarnRuntimeException e) { + actualMessage = e.getMessage(); + } + Assert.assertEquals(expectedMessage, actualMessage); + } + + @Test(timeout = 30000) + public void testRequestedResourceNameIsConfigured() + throws Exception{ + ResourcePluginManager rpm = new ResourcePluginManager(); + String resourceName = "a.com/a"; + Assert.assertFalse(rpm.isValidAndConfiguredResourceName(resourceName)); + resourceName = "cmp.com/cmp"; + Assert.assertTrue(rpm.isValidAndConfiguredResourceName(resourceName)); + } + + } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/resourceplugin/deviceframework/FakeTestDevicePlugin1.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/resourceplugin/deviceframework/FakeTestDevicePlugin1.java new file mode 100644 index 00000000000..ec1c2aaca99 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/resourceplugin/deviceframework/FakeTestDevicePlugin1.java @@ -0,0 +1,61 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.server.nodemanager.containermanager.resourceplugin.deviceframework; + +import org.apache.hadoop.yarn.server.nodemanager.api.deviceplugin.*; + +import java.util.Set; +import java.util.TreeSet; + +/** + * Used only for testing. + * A fake normal vendor plugin + * */ +public class FakeTestDevicePlugin1 implements DevicePlugin { + @Override + public DeviceRegisterRequest getRegisterRequestInfo() { + return DeviceRegisterRequest.Builder.newInstance() + .setResourceName("cmpA.com/hdwA").build(); + } + + @Override + public Set getDevices() { + TreeSet r = new TreeSet<>(); + r.add(Device.Builder.newInstance() + .setID(0) + .setDevPath("/dev/hdwA0") + .setMajorNumber(243) + .setMinorNumber(0) + .setBusID("0000:65:00.0") + .setHealthy(true) + .build()); + return r; + } + + @Override + public DeviceRuntimeSpec onDevicesAllocated(Set allocatedDevices, String runtime) { + return null; + } + + + @Override + public void onDevicesReleased(Set allocatedDevices) { + + } +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/resourceplugin/deviceframework/FakeTestDevicePlugin2.java 
/**
 * Used only for testing.
 * Deliberately does NOT implement {@code DevicePlugin}: configuring this
 * class as a device plugin must make the pluggable device framework fail
 * with a descriptive exception.
 */
public class FakeTestDevicePlugin2 {

}
+ */ + +package org.apache.hadoop.yarn.server.nodemanager.containermanager.resourceplugin.deviceframework; + +import org.apache.hadoop.yarn.server.nodemanager.api.deviceplugin.*; + +import java.util.Set; +import java.util.TreeSet; + +/** + * Only used for testing + * This plugin register a same name with FakeTestDevicePlugin1 + * */ +public class FakeTestDevicePlugin3 implements DevicePlugin { + @Override + public DeviceRegisterRequest getRegisterRequestInfo() { + return DeviceRegisterRequest.Builder.newInstance() + .setResourceName("cmpA.com/hdwA").build(); + } + + @Override + public Set getDevices() { + TreeSet r = new TreeSet<>(); + r.add(Device.Builder.newInstance() + .setID(0) + .setDevPath("/dev/hdwA0") + .setMajorNumber(243) + .setMinorNumber(0) + .setBusID("0000:65:00.0") + .setHealthy(true) + .build()); + return r; + } + + @Override + public DeviceRuntimeSpec onDevicesAllocated(Set allocatedDevices, String runtime) { + return null; + } + + @Override + public void onDevicesReleased(Set allocatedDevices) { + + } +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/resourceplugin/deviceframework/TestDevicePluginAdapter.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/resourceplugin/deviceframework/TestDevicePluginAdapter.java new file mode 100644 index 00000000000..fe74c0fd1bd --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/resourceplugin/deviceframework/TestDevicePluginAdapter.java @@ -0,0 +1,523 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.hadoop.yarn.server.nodemanager.containermanager.resourceplugin.deviceframework;

import org.apache.hadoop.service.ServiceOperations;
import org.apache.hadoop.yarn.api.records.*;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.server.nodemanager.*;
import org.apache.hadoop.yarn.server.nodemanager.api.deviceplugin.Device;
import org.apache.hadoop.yarn.server.nodemanager.api.deviceplugin.DevicePlugin;
import org.apache.hadoop.yarn.server.nodemanager.api.deviceplugin.DeviceRegisterRequest;
import org.apache.hadoop.yarn.server.nodemanager.api.deviceplugin.DeviceRuntimeSpec;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ResourceMappings;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.privileged.PrivilegedOperationExecutor;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.resources.CGroupsHandler;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.resources.ResourceHandlerException;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.resourceplugin.ResourcePluginManager;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.resourceplugin.deviceframework.examples.FakeDevicePlugin;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.runtime.ContainerRuntimeConstants;
import org.apache.hadoop.yarn.server.nodemanager.recovery.NMMemoryStateStoreService;
import org.apache.hadoop.yarn.server.nodemanager.recovery.NMStateStoreService;
import org.apache.hadoop.yarn.util.resource.ResourceUtils;
import org.apache.hadoop.yarn.util.resource.TestResourceUtils;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.File;
import java.io.IOException;
import java.util.*;
import java.util.concurrent.ConcurrentHashMap;

import static org.mockito.Matchers.isA;
import static org.mockito.Mockito.*;

/**
 * Tests for {@link DevicePluginAdapter}: device bookkeeping through
 * preStart/postComplete, delegation to a plugin-supplied scheduler,
 * NM state-store persistence, and recovery/cleanup paths.
 */
public class TestDevicePluginAdapter {

  protected static final Logger LOG =
      LoggerFactory.getLogger(TestDevicePluginAdapter.class);

  private YarnConfiguration conf;
  // Path of the generated resource-types.xml, removed again in tearDown()
  private String tempResourceTypesFile;
  private CGroupsHandler mockCGroupsHandler;
  private PrivilegedOperationExecutor mockPrivilegedExecutor;
  private NodeManager nm;

  @Before
  public void setup() throws Exception {
    this.conf = new YarnConfiguration();
    // setup resource-types.xml so the custom device resource types exist
    ResourceUtils.resetResourceTypes();
    String resourceTypesFile = "resource-types-pluggable-devices.xml";
    this.tempResourceTypesFile =
        TestResourceUtils.setupResourceTypes(this.conf, resourceTypesFile);
    mockCGroupsHandler = mock(CGroupsHandler.class);
    mockPrivilegedExecutor = mock(PrivilegedOperationExecutor.class);
  }

  @After
  public void tearDown() throws IOException {
    // cleanup the generated resource-types.xml
    File dest = new File(this.tempResourceTypesFile);
    if (dest.exists()) {
      dest.delete();
    }
    if (nm != null) {
      try {
        ServiceOperations.stop(nm);
      } catch (Throwable t) {
        // best-effort shutdown; failures here must not fail the test
      }
    }
  }

  /**
   * Use MyPlugin, which doesn't implement any scheduler interface, so the
   * framework's default device scheduler is exercised.
   * Plugin initialization itself is tested in
   * {@link org.apache.hadoop.yarn.server.nodemanager.containermanager.resourceplugin.TestResourcePluginManager}.
   */
  @Test
  public void testBasicWorkflow()
      throws YarnException, IOException {
    NodeManager.NMContext context = mock(NodeManager.NMContext.class);
    NMStateStoreService storeService = mock(NMStateStoreService.class);
    when(context.getNMStateStore()).thenReturn(storeService);
    doNothing().when(storeService).storeAssignedResources(isA(Container.class),
        isA(String.class),
        isA(ArrayList.class));

    // Init scheduler manager
    DeviceSchedulerManager dsm = new DeviceSchedulerManager(context);

    ResourcePluginManager rpm = mock(ResourcePluginManager.class);
    when(rpm.getDeviceSchedulerManager()).thenReturn(dsm);

    // Init a plugin
    MyPlugin plugin = new MyPlugin();
    MyPlugin spyPlugin = spy(plugin);
    String resourceName = MyPlugin.resourceName;
    // Init an adapter for the plugin
    DevicePluginAdapter adapter = new DevicePluginAdapter(
        resourceName, spyPlugin, dsm);
    // Bootstrap, adding the plugin's 3 devices
    adapter.initialize(context);
    adapter.createResourceHandler(context,
        mockCGroupsHandler, mockPrivilegedExecutor);
    adapter.getDeviceResourceHandler().bootstrap(conf);
    int size = dsm.getAvailableDevices(resourceName);
    Assert.assertEquals(3, size);

    // A container c1 requests 1 device
    Container c1 = mockContainerWithDeviceRequest(0, resourceName, 1, false);
    // preStart allocates; check bookkeeping: 2 free / 1 used / 3 total
    adapter.getDeviceResourceHandler().preStart(c1);
    Assert.assertEquals(2,
        dsm.getAvailableDevices(resourceName));
    Assert.assertEquals(1,
        dsm.getAllUsedDevices().get(resourceName).size());
    Assert.assertEquals(3,
        dsm.getAllAllowedDevices().get(resourceName).size());
    // postComplete releases everything again
    adapter.getDeviceResourceHandler().postComplete(getContainerId(0));
    Assert.assertEquals(3,
        dsm.getAvailableDevices(resourceName));
    Assert.assertEquals(0,
        dsm.getAllUsedDevices().get(resourceName).size());
    Assert.assertEquals(3,
        dsm.getAllAllowedDevices().get(resourceName).size());

    // A container c2 requests all 3 devices
    Container c2 = mockContainerWithDeviceRequest(1, resourceName, 3, false);
    // preStart; bookkeeping: 0 free / 3 used / 3 total
    adapter.getDeviceResourceHandler().preStart(c2);
    Assert.assertEquals(0,
        dsm.getAvailableDevices(resourceName));
    Assert.assertEquals(3,
        dsm.getAllUsedDevices().get(resourceName).size());
    Assert.assertEquals(3,
        dsm.getAllAllowedDevices().get(resourceName).size());
    // postComplete releases everything again
    adapter.getDeviceResourceHandler().postComplete(getContainerId(1));
    Assert.assertEquals(3,
        dsm.getAvailableDevices(resourceName));
    Assert.assertEquals(0,
        dsm.getAllUsedDevices().get(resourceName).size());
    Assert.assertEquals(3,
        dsm.getAllAllowedDevices().get(resourceName).size());
  }

  /**
   * Use {@link FakeDevicePlugin}, which implements
   * {@link org.apache.hadoop.yarn.server.nodemanager.api.deviceplugin.DevicePluginScheduler},
   * and verify the plugin's own scheduler is consulted for allocation.
   */
  @Test
  public void testBasicWorkflowWithPluginAndPluginScheduler()
      throws YarnException, IOException {
    NodeManager.NMContext context = mock(NodeManager.NMContext.class);
    NMStateStoreService storeService = mock(NMStateStoreService.class);
    when(context.getNMStateStore()).thenReturn(storeService);
    doNothing().when(storeService).storeAssignedResources(isA(Container.class),
        isA(String.class),
        isA(ArrayList.class));
    // Init scheduler manager
    DeviceSchedulerManager dsm = new DeviceSchedulerManager(context);

    ResourcePluginManager rpm = mock(ResourcePluginManager.class);
    when(rpm.getDeviceSchedulerManager()).thenReturn(dsm);

    // Init a plugin
    FakeDevicePlugin plugin = new FakeDevicePlugin();
    FakeDevicePlugin spyPlugin = spy(plugin);
    String resourceName = FakeDevicePlugin.resourceName;
    // Register the plugin's own device scheduler
    dsm.addDevicePluginScheduler(resourceName, spyPlugin);
    // Init an adapter for the plugin
    DevicePluginAdapter adapter = new DevicePluginAdapter(
        resourceName, spyPlugin, dsm);
    // Bootstrap, adding the plugin's single device
    adapter.initialize(context);
    adapter.createResourceHandler(context,
        mockCGroupsHandler, mockPrivilegedExecutor);
    adapter.getDeviceResourceHandler().bootstrap(conf);
    int size = dsm.getAvailableDevices(resourceName);
    Assert.assertEquals(1, size);

    // A container requests 1 device
    Container c1 = mockContainerWithDeviceRequest(0, resourceName, 1, false);
    adapter.getDeviceResourceHandler().preStart(c1);
    // the plugin's own scheduler must have been used exactly once
    verify(spyPlugin, times(1))
        .allocateDevices(isA(Set.class), isA(Integer.class));
    // check bookkeeping: 0 free / 1 used / 1 total
    Assert.assertEquals(0,
        dsm.getAvailableDevices(resourceName));
    Assert.assertEquals(1,
        dsm.getAllUsedDevices().get(resourceName).size());
    Assert.assertEquals(1,
        dsm.getAllAllowedDevices().get(resourceName).size());
    // postComplete releases the device
    adapter.getDeviceResourceHandler().postComplete(getContainerId(0));
    Assert.assertEquals(1,
        dsm.getAvailableDevices(resourceName));
    Assert.assertEquals(0,
        dsm.getAllUsedDevices().get(resourceName).size());
    Assert.assertEquals(1,
        dsm.getAllAllowedDevices().get(resourceName).size());
  }

  /**
   * preStart must persist the assigned device into the NM state store.
   */
  @Test
  public void testStoreDeviceSchedulerManagerState()
      throws IOException, YarnException {
    NodeManager.NMContext context = mock(NodeManager.NMContext.class);
    NMStateStoreService realStoreService = new NMMemoryStateStoreService();
    NMStateStoreService storeService = spy(realStoreService);
    when(context.getNMStateStore()).thenReturn(storeService);
    doNothing().when(storeService).storeAssignedResources(isA(Container.class),
        isA(String.class),
        isA(ArrayList.class));

    // Init scheduler manager
    DeviceSchedulerManager dsm = new DeviceSchedulerManager(context);

    ResourcePluginManager rpm = mock(ResourcePluginManager.class);
    when(rpm.getDeviceSchedulerManager()).thenReturn(dsm);

    // Init a plugin
    MyPlugin plugin = new MyPlugin();
    MyPlugin spyPlugin = spy(plugin);
    String resourceName = MyPlugin.resourceName;
    // Init an adapter for the plugin
    DevicePluginAdapter adapter = new DevicePluginAdapter(
        resourceName, spyPlugin, dsm);
    // Bootstrap, adding devices
    adapter.initialize(context);
    adapter.createResourceHandler(context,
        mockCGroupsHandler, mockPrivilegedExecutor);
    adapter.getDeviceResourceHandler().bootstrap(conf);

    // A container c0 requests 1 device
    Container c0 = mockContainerWithDeviceRequest(0, resourceName, 1, false);
    adapter.getDeviceResourceHandler().preStart(c0);
    // ensure c0's assigned device is persisted
    verify(storeService).storeAssignedResources(c0, resourceName,
        Arrays.asList(Device.Builder.newInstance()
            .setID(0)
            .setDevPath("/dev/hdwA0")
            .setMajorNumber(256)
            .setMinorNumber(0)
            .setBusID("0000:80:00.0")
            .setHealthy(true)
            .build()));
  }

  /**
   * reacquireContainer must restore scheduler state from the store and must
   * fail when the stored device is already assigned to another container.
   */
  @Test
  public void testRecoverDeviceSchedulerManagerState()
      throws IOException, YarnException {
    NodeManager.NMContext context = mock(NodeManager.NMContext.class);
    NMStateStoreService realStoreService = new NMMemoryStateStoreService();
    NMStateStoreService storeService = spy(realStoreService);
    when(context.getNMStateStore()).thenReturn(storeService);
    doNothing().when(storeService).storeAssignedResources(isA(Container.class),
        isA(String.class),
        isA(ArrayList.class));

    // Init scheduler manager
    DeviceSchedulerManager dsm = new DeviceSchedulerManager(context);

    ResourcePluginManager rpm = mock(ResourcePluginManager.class);
    when(rpm.getDeviceSchedulerManager()).thenReturn(dsm);

    // Init a plugin
    MyPlugin plugin = new MyPlugin();
    MyPlugin spyPlugin = spy(plugin);
    String resourceName = MyPlugin.resourceName;
    // Init an adapter for the plugin
    DevicePluginAdapter adapter = new DevicePluginAdapter(
        resourceName, spyPlugin, dsm);
    // Bootstrap, adding devices
    adapter.initialize(context);
    adapter.createResourceHandler(context,
        mockCGroupsHandler, mockPrivilegedExecutor);
    adapter.getDeviceResourceHandler().bootstrap(conf);
    Assert.assertEquals(3,
        dsm.getAllAllowedDevices().get(resourceName).size());
    // mock NMStateStore content: one previously assigned device
    Device storedDevice = Device.Builder.newInstance()
        .setID(0)
        .setDevPath("/dev/hdwA0")
        .setMajorNumber(256)
        .setMinorNumber(0)
        .setBusID("0000:80:00.0")
        .setHealthy(true)
        .build();
    ConcurrentHashMap<ContainerId, Container> runningContainersMap =
        new ConcurrentHashMap<>();
    Container nmContainer = mock(Container.class);
    ResourceMappings rmap = new ResourceMappings();
    ResourceMappings.AssignedResources ar =
        new ResourceMappings.AssignedResources();
    ar.updateAssignedResources(Arrays.asList(storedDevice));
    rmap.addAssignedResources(resourceName, ar);
    when(nmContainer.getResourceMappings()).thenReturn(rmap);
    when(context.getContainers()).thenReturn(runningContainersMap);

    // Test case 1. c0 gets recovered; scheduler state restored
    runningContainersMap.put(getContainerId(0), nmContainer);
    adapter.getDeviceResourceHandler().reacquireContainer(
        getContainerId(0));
    Assert.assertEquals(3,
        dsm.getAllAllowedDevices().get(resourceName).size());
    Assert.assertEquals(1,
        dsm.getAllUsedDevices().get(resourceName).size());
    Assert.assertEquals(2,
        dsm.getAvailableDevices(resourceName));
    // NOTE(review): value type assumed to be ContainerId — confirm against
    // DeviceSchedulerManager.getAllUsedDevices().
    Map<Device, ContainerId> used =
        dsm.getAllUsedDevices().get(resourceName);
    Assert.assertTrue(used.keySet().contains(storedDevice));

    // Test case 2. c1 wants to get recovered,
    // but the stored device is already allocated to c2
    nmContainer = mock(Container.class);
    rmap = new ResourceMappings();
    ar = new ResourceMappings.AssignedResources();
    ar.updateAssignedResources(Arrays.asList(storedDevice));
    rmap.addAssignedResources(resourceName, ar);
    // already assigned to another container
    runningContainersMap.put(getContainerId(2), nmContainer);
    boolean caughtException = false;
    try {
      adapter.getDeviceResourceHandler().reacquireContainer(getContainerId(1));
    } catch (ResourceHandlerException e) {
      caughtException = true;
    }
    Assert.assertTrue(
        "Should fail since requested device is assigned already",
        caughtException);
    // must not affect c0's allocation state
    Assert.assertEquals(3,
        dsm.getAllAllowedDevices().get(resourceName).size());
    Assert.assertEquals(1,
        dsm.getAllUsedDevices().get(resourceName).size());
    Assert.assertEquals(2,
        dsm.getAvailableDevices(resourceName));
    used = dsm.getAllUsedDevices().get(resourceName);
    Assert.assertTrue(used.keySet().contains(storedDevice));
  }

  /**
   * If persisting assigned resources fails, preStart must throw and the
   * tentative device assignment must be rolled back.
   */
  @Test
  public void testAssignedDeviceCleanupWhenStoreOpFails()
      throws IOException, YarnException {
    NodeManager.NMContext context = mock(NodeManager.NMContext.class);
    NMStateStoreService realStoreService = new NMMemoryStateStoreService();
    NMStateStoreService storeService = spy(realStoreService);
    when(context.getNMStateStore()).thenReturn(storeService);
    // Force the store operation to fail
    doThrow(new IOException("Exception ..."))
        .when(storeService).storeAssignedResources(isA(Container.class),
            isA(String.class),
            isA(ArrayList.class));

    // Init scheduler manager
    DeviceSchedulerManager dsm = new DeviceSchedulerManager(context);

    ResourcePluginManager rpm = mock(ResourcePluginManager.class);
    when(rpm.getDeviceSchedulerManager()).thenReturn(dsm);

    // Init a plugin
    MyPlugin plugin = new MyPlugin();
    MyPlugin spyPlugin = spy(plugin);
    String resourceName = MyPlugin.resourceName;
    // Init an adapter for the plugin
    DevicePluginAdapter adapter = new DevicePluginAdapter(
        resourceName, spyPlugin, dsm);
    // Bootstrap, adding devices
    adapter.initialize(context);
    adapter.createResourceHandler(context,
        mockCGroupsHandler, mockPrivilegedExecutor);
    adapter.getDeviceResourceHandler().bootstrap(conf);

    // A container c0 requests 1 device
    Container c0 = mockContainerWithDeviceRequest(0, resourceName, 1, false);
    boolean exception = false;
    try {
      adapter.getDeviceResourceHandler().preStart(c0);
    } catch (ResourceHandlerException e) {
      exception = true;
    }
    Assert.assertTrue("Should throw exception in preStart", exception);
    // no device may remain assigned after the failed store
    Assert.assertEquals(3,
        dsm.getAllAllowedDevices().get(resourceName).size());
    Assert.assertEquals(0,
        dsm.getAllUsedDevices().get(resourceName).size());
    Assert.assertEquals(3,
        dsm.getAvailableDevices(resourceName));
  }

  /**
   * Builds a mock Container requesting {@code numDeviceRequest} units of
   * {@code resourceName}; optionally marks it as a Docker container.
   */
  private static Container mockContainerWithDeviceRequest(int id,
      String resourceName, int numDeviceRequest,
      boolean dockerContainerEnabled) {
    Container c = mock(Container.class);
    when(c.getContainerId()).thenReturn(getContainerId(id));

    Resource res = Resource.newInstance(1024, 1);
    ResourceMappings resMapping = new ResourceMappings();

    res.setResourceValue(resourceName, numDeviceRequest);
    when(c.getResource()).thenReturn(res);
    when(c.getResourceMappings()).thenReturn(resMapping);

    ContainerLaunchContext clc = mock(ContainerLaunchContext.class);
    Map<String, String> env = new HashMap<>();
    if (dockerContainerEnabled) {
      env.put(ContainerRuntimeConstants.ENV_CONTAINER_TYPE,
          ContainerRuntimeConstants.CONTAINER_RUNTIME_DOCKER);
    }
    when(clc.getEnvironment()).thenReturn(env);
    when(c.getLaunchContext()).thenReturn(clc);
    return c;
  }

  /** Deterministic ContainerId for container number {@code id}. */
  private static ContainerId getContainerId(int id) {
    return ContainerId.newContainerId(ApplicationAttemptId
        .newInstance(ApplicationId.newInstance(1234L, 1), 1), id);
  }

  /**
   * A fake plugin WITHOUT a custom scheduler interface, exposing three
   * healthy devices under resource name "cmpA.com/hdwA".
   */
  private class MyPlugin implements DevicePlugin {
    private static final String resourceName = "cmpA.com/hdwA";

    @Override
    public DeviceRegisterRequest getRegisterRequestInfo() {
      return DeviceRegisterRequest.Builder.newInstance()
          .setResourceName(resourceName)
          .setPluginVersion("v1.0").build();
    }

    @Override
    public Set<Device> getDevices() {
      TreeSet<Device> r = new TreeSet<>();
      r.add(Device.Builder.newInstance()
          .setID(0)
          .setDevPath("/dev/hdwA0")
          .setMajorNumber(256)
          .setMinorNumber(0)
          .setBusID("0000:80:00.0")
          .setHealthy(true)
          .build());
      r.add(Device.Builder.newInstance()
          .setID(1)
          .setDevPath("/dev/hdwA1")
          .setMajorNumber(256)
          .setMinorNumber(0)
          .setBusID("0000:80:01.0")
          .setHealthy(true)
          .build());
      r.add(Device.Builder.newInstance()
          .setID(2)
          .setDevPath("/dev/hdwA2")
          .setMajorNumber(256)
          .setMinorNumber(0)
          .setBusID("0000:80:02.0")
          .setHealthy(true)
          .build());
      return r;
    }

    @Override
    public DeviceRuntimeSpec onDevicesAllocated(Set<Device> allocatedDevices,
        String runtime) {
      // No special runtime requirements
      return null;
    }

    @Override
    public void onDevicesReleased(Set<Device> releasedDevices) {
      // Nothing to clean up
    }
  } // MyPlugin

}