diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java index ce38d2762b2..ed8aebfecc0 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java @@ -1605,6 +1605,34 @@ public static boolean isAclEnabled(Configuration conf) { public static final String NM_RESOURCE_PLUGINS = NM_PREFIX + "resource-plugins"; + /** + * This setting controls if pluggable device plugin framework is enabled. + * */ + @Private + public static final String NM_PLUGGABLE_DEVICE_FRAMEWORK_ENABLED = + NM_PREFIX + "pluggable-device-framework.enabled"; + + /** + * The pluggable device plugin framework is disabled by default + * */ + @Private + public static final boolean DEFAULT_NM_PLUGGABLE_DEVICE_FRAMEWORK_ENABLED = false; + + /** + * This settings contains vendor plugin class names for device plugin framework to load. + * Split by comma + * */ + @Private + public static final String NM_PLUGGABLE_DEVICE_FRAMEWORK_DEVICE_CLASSES = + NM_PREFIX + "pluggable-device-framework.device-classes"; + + @Private + public static final String NM_PLUGGABLE_DEVICE_FRAMEWORK_PREFER_CUSTOMIZED_SCHEDULER = + NM_PREFIX + "pluggable-device-framework.prefer-customized-scheduler"; + + @Private + public static final Boolean DEFAULT_NM_PLUGGABLE_DEVICE_FRAMEWORK_PREFER_CUSTOMIZED_SCHEDULER = + false; /** * Prefix for gpu configurations. Work in progress: This configuration * parameter may be changed/removed in the future. diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml index 1360e73f7fd..1b50b6ae565 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml @@ -3770,6 +3770,35 @@ false + + + This settings controls if pluggable device framework is enabled. + Disabled by default + + yarn.nodemanager.pluggable-device-framework.enabled + false + + + + + This settings controls if pluggable device framework prefers customized scheduler + implemented in the plugin. + Disabled by default + + yarn.nodemanager.pluggable-device-framework.prefer-customized-scheduler + false + + + + + Configure vendor device plugin class name here. Comma separated. + The class must be found in CLASSPATH. The pluggable device framework will + load these classes. + + yarn.nodemanager.pluggable-device-framework.device-classes + + + When yarn.nodemanager.resource.gpu.allowed-gpu-devices=auto specified, diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/api/deviceplugin/Device.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/api/deviceplugin/Device.java new file mode 100644 index 00000000000..8f4e527d013 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/api/deviceplugin/Device.java @@ -0,0 +1,200 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.server.nodemanager.api.deviceplugin; + +import java.io.Serializable; +import java.util.Objects; + +public class Device implements Serializable, Comparable { + + private static final long serialVersionUID = 1L; + + /** + * Required fields: + * ID: an plugin specified index number + * devPath: device file like /dev/devicename + * majorNumber: major device number + * minorNumber: minor device number + * busID: PCI Bus ID in format [[[[]:]]:][][.[]]. Can get from "lspci -D" + * isHealthy: true or false indicating device health status + * */ + private final Integer ID; + private final String devPath; + private final Integer majorNumber; + private final Integer minorNumber; + private final String busID; + private boolean isHealthy; + + /** + * Optional fields + * */ + private String status; + // TODO: topology and attributes + + private Device(Builder builder) { + this.ID = Objects.requireNonNull(builder.ID); + this.devPath = Objects.requireNonNull(builder.devPath); + this.majorNumber = Objects.requireNonNull(builder.majorNumber); + this.minorNumber = Objects.requireNonNull(builder.minorNumber); + this.busID = Objects.requireNonNull(builder.busID); + this.isHealthy = Objects.requireNonNull(builder.isHealthy); + } + + public Integer getID() { + return ID; + } + + public String getDevPath() { + return devPath; + } + + public Integer getMajorNumber() { + return majorNumber; + } + + public Integer getMinorNumber() { + return minorNumber; + } + + public String getBusID() { + return busID; + } + + public boolean isHealthy() { + return isHealthy; + } + + public String getStatus() { + return status; + } + + + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()){ + return false; + } + Device device = (Device) o; + return Objects.equals(ID, device.ID) && + Objects.equals(devPath, device.devPath) && + Objects.equals(majorNumber, device.majorNumber) && + Objects.equals(minorNumber, device.minorNumber) && + Objects.equals(busID, device.busID); + } + + @Override + public int hashCode() { + return Objects.hash(ID, devPath, majorNumber, minorNumber, busID); + } + + @Override + public int compareTo(Object o) { + if (o == null || (!(o instanceof Device))) { + return -1; + } + + Device other = (Device) o; + + int result = Integer.compare(ID, other.getID()); + if (0 != result) { + return result; + } + + result = Integer.compare(majorNumber, other.majorNumber); + if (0 != result) { + return result; + } + + result = Integer.compare(minorNumber, other.minorNumber); + if (0 != result) { + return result; + } + + result = devPath.compareTo(other.devPath); + if (0 != result) { + return result; + } + + return busID.compareTo(other.busID); + } + + @Override + public String toString() { + return "(" + getDevPath() + ", " + getID() + ", " + getMajorNumber() + ":" + getMinorNumber() + ")"; + } + + public static class Builder { + private Integer ID; + private String devPath; + private Integer majorNumber; + private Integer minorNumber; + private String busID; + private boolean isHealthy; + private String status; + + private Builder() {} + + public static Builder newInstance() { + return new Builder(); + } + + public Device build(){ + return new Device(this); + } + + public Builder setID(Integer ID) { + this.ID = ID; + return this; + } + + public Builder setDevPath(String devPath) { + this.devPath = devPath; + return this; + } + + public Builder setMajorNumber(Integer majorNumber) { + this.majorNumber = majorNumber; + return this; + } + + public Builder setMinorNumber(Integer minorNumber) { + this.minorNumber = minorNumber; + return this; + } + + public Builder setBusID(String busID) { + this.busID = busID; + return this; + } + + public Builder setHealthy(boolean healthy) { + isHealthy = healthy; + return this; + } + + public Builder setStatus(String status) { + this.status = status; + return this; + } + + } +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/api/deviceplugin/DevicePlugin.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/api/deviceplugin/DevicePlugin.java new file mode 100644 index 00000000000..7b18db74958 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/api/deviceplugin/DevicePlugin.java @@ -0,0 +1,51 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.server.nodemanager.api.deviceplugin; + +import java.util.Set; + +/** + * A must interface for vendor plugin to implement. + * */ +public interface DevicePlugin { + /** + * Called first when device plugin framework wants to register + * @return DeviceRegisterRequest {@link DeviceRegisterRequest} + * */ + DeviceRegisterRequest register(); + + /** + * Called when update node resource + * @return a set of {@link Device}, {@link java.util.TreeSet} recommended + * */ + Set getDevices(); + + /** + * Called after device allocated (before container launch). + * @return a {@link DeviceRuntimeSpec} description about environment, + * {@link VolumeSpec}, {@link MountVolumeSpec}. etc + * on how these devices should be used when container launch + * */ + DeviceRuntimeSpec OnDevicesAllocated(Set allocatedDevices); + + /** + * Called after device released. + * */ + void OnDevicesReleased(Set releasedDevices); +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/api/deviceplugin/DevicePluginScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/api/deviceplugin/DevicePluginScheduler.java new file mode 100644 index 00000000000..414cc4ace86 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/api/deviceplugin/DevicePluginScheduler.java @@ -0,0 +1,37 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.server.nodemanager.api.deviceplugin; + +import java.util.Set; + +/** + * An optional interface to implement if custom device scheduling is needed. + * If this is not implemented, the device framework will do. + * */ +public interface DevicePluginScheduler { + /** + * Called when allocating devices. The framework will do all device book keeping + * and fail recovery. So this hook should only do scheduling based on available devices + * passed in. This method could be invoked multiple times. + * @param availableDevices Devices allowed to be chosen from. + * @param count Number of device to be allocated. + * @return a set of {@link Device} + * */ + Set allocateDevices(Set availableDevices, Integer count); +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/api/deviceplugin/DeviceRegisterRequest.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/api/deviceplugin/DeviceRegisterRequest.java new file mode 100644 index 00000000000..7753ad1efdf --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/api/deviceplugin/DeviceRegisterRequest.java @@ -0,0 +1,72 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.server.nodemanager.api.deviceplugin; + +import java.util.Objects; + +/** + * A vendor device plugin use this object to register + * to NM device plugin framework + * */ +public class DeviceRegisterRequest { + + // plugin's own version + private final String pluginVersion; + private final String resourceName; + + public DeviceRegisterRequest(Builder builder) { + this.resourceName = Objects.requireNonNull(builder.resourceName); + this.pluginVersion = builder.pluginVersion; + } + + public String getResourceName() { + return resourceName; + } + + public String getPluginVersion() { + return pluginVersion; + } + + + public static class Builder { + private String pluginVersion; + private String resourceName; + + private Builder() {} + + public static Builder newInstance() { + return new Builder(); + } + + public DeviceRegisterRequest build() { + return new DeviceRegisterRequest(this); + } + + public Builder setResourceName(String resourceName) { + this.resourceName = resourceName; + return this; + } + + public Builder setPluginVersion(String pluginVersion) { + this.pluginVersion = pluginVersion; + return this; + } + + } +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/api/deviceplugin/DeviceRuntimeSpec.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/api/deviceplugin/DeviceRuntimeSpec.java new file mode 100644 index 00000000000..e34c64fc474 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/api/deviceplugin/DeviceRuntimeSpec.java @@ -0,0 +1,101 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.server.nodemanager.api.deviceplugin; + +import java.util.*; + +public class DeviceRuntimeSpec { + /** + * The runtime gives device framework a hint (not forced to) on which container + * runtime can use this Spec. + * If cgroups, these fields could be empty if no special requirement + * since the framework already knows major and minor device number + * in {@link Device}. + * If docker, these fields below should be populated as needed + */ + private final String runtime; + private final Map envs; + private final Set volumeMounts; + private final Set deviceMounts; + private final Set volumeClaims; + + public final static String RUNTIME_CGROUPS = "cgroups"; + public final static String RUNTIME_DOCKER = "docker"; + + private DeviceRuntimeSpec(Builder builder) { + this.runtime = builder.runtime; + this.deviceMounts = builder.deviceMounts; + this.envs = builder.envs; + this.volumeClaims = builder.volumeClaims; + this.volumeMounts = builder.volumeMounts; + } + + public static class Builder { + + private String runtime; + private Map envs; + private Set volumeMounts; + private Set deviceMounts; + private Set volumeClaims; + + private Builder() { + runtime = DeviceRuntimeSpec.RUNTIME_DOCKER; + envs = new HashMap<>(); + volumeClaims = new TreeSet<>(); + deviceMounts = new TreeSet<>(); + volumeMounts = new TreeSet<>(); + } + + public static Builder newInstance() { + return new Builder(); + } + + public DeviceRuntimeSpec build() { + return new DeviceRuntimeSpec(this); + } + + public Builder setRuntime(String runtime) { + this.runtime = runtime; + return this; + } + + public Builder addVolumeSpec(VolumeSpec spec) { + this.volumeClaims.add(spec); + return this; + } + + public Builder addMountVolumeSpec(MountVolumeSpec spec) { + this.volumeMounts.add(spec); + return this; + } + + public Builder addMountDeviceSpec(MountDeviceSpec spec) { + this.deviceMounts.add(spec); + return this; + } + + public Builder addEnv(String key, String value) { + this.envs.put(Objects.requireNonNull(key), + Objects.requireNonNull(value)); + return this; + } + + } + +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/api/deviceplugin/MountDeviceSpec.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/api/deviceplugin/MountDeviceSpec.java new file mode 100644 index 00000000000..c4a616e78c2 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/api/deviceplugin/MountDeviceSpec.java @@ -0,0 +1,103 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.server.nodemanager.api.deviceplugin; + +import java.io.Serializable; +import java.util.Objects; + +public class MountDeviceSpec implements Serializable, Comparable { + + private final String devicePathInHost; + private final String devicePathInContainer; + + // r for only read, rw can do read and write + private final String devicePermission; + + public final static String RO = "r"; + public final static String RW = "rw"; + + private MountDeviceSpec(Builder builder) { + this.devicePathInContainer = builder.devicePathInContainer; + this.devicePathInHost = builder.devicePathInHost; + this.devicePermission = builder.devicePermission; + } + + public String getDevicePathInHost() { + return devicePathInHost; + } + + public String getDevicePathInContainer() { + return devicePathInContainer; + } + + public String getDevicePermission() { + return devicePermission; + } + + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()){ + return false; + } + MountDeviceSpec other = (MountDeviceSpec) o; + return Objects.equals(devicePathInHost, other.devicePathInHost) && + Objects.equals(devicePathInContainer, other.devicePathInContainer) && + Objects.equals(devicePermission, other.devicePermission); + } + + @Override + public int compareTo(Object o) { + return 0; + } + + public static class Builder { + private String devicePathInHost; + private String devicePathInContainer; + private String devicePermission; + + private Builder() {} + + public static Builder newInstance() { + return new Builder(); + } + + public MountDeviceSpec build() { + return new MountDeviceSpec(this); + } + + public Builder setDevicePermission(String permission) { + this.devicePermission = permission; + return this; + } + + public Builder setDevicePathInContainer(String devicePathInContainer) { + this.devicePathInContainer = devicePathInContainer; + return this; + } + + public Builder setDevicePathInHost(String devicePathInHost) { + this.devicePathInHost = devicePathInHost; + return this; + } + } + +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/api/deviceplugin/MountVolumeSpec.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/api/deviceplugin/MountVolumeSpec.java new file mode 100644 index 00000000000..4612a2115aa --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/api/deviceplugin/MountVolumeSpec.java @@ -0,0 +1,107 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.server.nodemanager.api.deviceplugin; + +import java.io.Serializable; +import java.util.Objects; + +public class MountVolumeSpec implements Serializable, Comparable{ + + private static final long serialVersionUID = 1L; + + // host path or volume name + private final String hostPath; + + // path in the container + private final String mountPath; + + // if true, data in mountPath can only be read + // "-v hostPath:mountPath:ro" + private final Boolean isReadOnly; + + public final static String READONLYOPTION = "ro"; + + private MountVolumeSpec (Builder builder) { + this.hostPath = builder.hostPath; + this.mountPath = builder.mountPath; + this.isReadOnly = builder.isReadOnly; + } + + public String getHostPath() { + return hostPath; + } + + public String getMountPath() { + return mountPath; + } + + public Boolean getReadOnly() { + return isReadOnly; + } + + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()){ + return false; + } + MountVolumeSpec other = (MountVolumeSpec) o; + return Objects.equals(hostPath, other.hostPath) && + Objects.equals(mountPath, other.mountPath) && + Objects.equals(isReadOnly, other.isReadOnly); + } + + @Override + public int compareTo(Object o) { + return 0; + } + + public static class Builder { + private String hostPath; + private String mountPath; + private Boolean isReadOnly; + + private Builder() {} + + public static Builder newInstance() { + return new Builder(); + } + + public MountVolumeSpec build() { + return new MountVolumeSpec(this); + } + + public Builder setHostPath(String hostPath) { + this.hostPath = hostPath; + return this; + } + + public Builder setMountPath(String mountPath) { + this.mountPath = mountPath; + return this; + } + + public Builder setReadOnly(Boolean readOnly) { + isReadOnly = readOnly; + return this; + } + } +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/api/deviceplugin/VolumeSpec.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/api/deviceplugin/VolumeSpec.java new file mode 100644 index 00000000000..e221326a731 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/api/deviceplugin/VolumeSpec.java @@ -0,0 +1,103 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.server.nodemanager.api.deviceplugin; + +import java.io.Serializable; +import java.util.Objects; + +public class VolumeSpec implements Serializable, Comparable { + + private static final long serialVersionUID = 1L; + + private final String volumeDriver; + private final String volumeName; + private final String volumeOperation; + + public final static String CREATE = "create"; + public final static String DELETE = "delete"; + + private VolumeSpec(Builder builder) { + this.volumeDriver = builder.volumeDriver; + this.volumeName = builder.volumeName; + this.volumeOperation = builder.volumeOperation; + } + + public String getVolumeDriver() { + return volumeDriver; + } + + public String getVolumeName() { + return volumeName; + } + + public String getVolumeOperation() { + return volumeOperation; + } + + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()){ + return false; + } + VolumeSpec other = (VolumeSpec) o; + return Objects.equals(volumeDriver, other.volumeDriver) && + Objects.equals(volumeName, other.volumeName) && + Objects.equals(volumeOperation, other.volumeOperation); + } + + @Override + public int compareTo(Object o) { + return 0; + } + + public static class Builder { + private String volumeDriver; + private String volumeName; + private String volumeOperation; + + private Builder(){} + + public static Builder newInstance () { + return new Builder(); + } + + public VolumeSpec build() { + return new VolumeSpec(this); + } + + public Builder setVolumeDriver(String volumeDriver) { + this.volumeDriver = volumeDriver; + return this; + } + + public Builder setVolumeName(String volumeName) { + this.volumeName = volumeName; + return this; + } + + public Builder setVolumeOperation(String volumeOperation) { + this.volumeOperation = volumeOperation; + return this; + } + + } +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/resourceplugin/ResourcePluginManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/resourceplugin/ResourcePluginManager.java index f28aad206a6..de692918635 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/resourceplugin/ResourcePluginManager.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/resourceplugin/ResourcePluginManager.java @@ -18,14 +18,24 @@ package org.apache.hadoop.yarn.server.nodemanager.containermanager.resourceplugin; +import com.google.common.annotations.VisibleForTesting; import com.google.common.collect.ImmutableSet; import org.apache.commons.lang3.StringUtils; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.util.ReflectionUtils; +import org.apache.hadoop.yarn.api.records.ResourceInformation; import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.exceptions.YarnException; +import org.apache.hadoop.yarn.exceptions.YarnRuntimeException; import org.apache.hadoop.yarn.server.nodemanager.Context; +import org.apache.hadoop.yarn.server.nodemanager.api.deviceplugin.DevicePlugin; +import org.apache.hadoop.yarn.server.nodemanager.api.deviceplugin.DevicePluginScheduler; +import org.apache.hadoop.yarn.server.nodemanager.api.deviceplugin.DeviceRegisterRequest; +import org.apache.hadoop.yarn.server.nodemanager.containermanager.resourceplugin.deviceframework.DeviceSchedulerManager; +import org.apache.hadoop.yarn.server.nodemanager.containermanager.resourceplugin.deviceframework.DevicePluginAdapter; import org.apache.hadoop.yarn.server.nodemanager.containermanager.resourceplugin.fpga.FpgaResourcePlugin; import org.apache.hadoop.yarn.server.nodemanager.containermanager.resourceplugin.gpu.GpuResourcePlugin; +import org.apache.hadoop.yarn.util.resource.ResourceUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -49,14 +59,15 @@ private Map configuredPlugins = Collections.emptyMap(); + private DeviceSchedulerManager deviceSchedulerManager = null; + public synchronized void initialize(Context context) - throws YarnException { + throws YarnException, ClassNotFoundException { Configuration conf = context.getConf(); - String[] plugins = conf.getStrings(YarnConfiguration.NM_RESOURCE_PLUGINS); + Map pluginMap = new HashMap<>(); + String[] plugins = conf.getStrings(YarnConfiguration.NM_RESOURCE_PLUGINS); if (plugins != null) { - Map pluginMap = new HashMap<>(); - // Initialize each plugins for (String resourceName : plugins) { resourceName = resourceName.trim(); @@ -92,9 +103,101 @@ public synchronized void initialize(Context context) plugin.initialize(context); pluginMap.put(resourceName, plugin); } + } + // Try to load pluggable device plugins + boolean puggableDeviceFrameworkEnabled = conf.getBoolean( + YarnConfiguration.NM_PLUGGABLE_DEVICE_FRAMEWORK_ENABLED, + YarnConfiguration.DEFAULT_NM_PLUGGABLE_DEVICE_FRAMEWORK_ENABLED); + if (puggableDeviceFrameworkEnabled) { + LOG.info("The pluggable device framework is not enabled. If you want, set true to " + + YarnConfiguration.NM_PLUGGABLE_DEVICE_FRAMEWORK_ENABLED); + initializePluggableDevicePlugins(context, conf, pluginMap); + } + configuredPlugins = Collections.unmodifiableMap(pluginMap); + } - configuredPlugins = Collections.unmodifiableMap(pluginMap); + public void initializePluggableDevicePlugins(Context context, + Configuration configuration, + Map pluginMap) + throws YarnRuntimeException, ClassNotFoundException { + LOG.info("The pluggable device framework enabled, trying to load the vendor plugins"); + deviceSchedulerManager = new DeviceSchedulerManager(context); + String[] pluginClassNames = configuration.getStrings( + YarnConfiguration.NM_PLUGGABLE_DEVICE_FRAMEWORK_DEVICE_CLASSES); + if (null == pluginClassNames) { + throw new YarnRuntimeException("Null value found in configuration: " + + YarnConfiguration.NM_PLUGGABLE_DEVICE_FRAMEWORK_DEVICE_CLASSES); } + // Check if framework prefer customized device scheduler + Boolean ifPrefer = configuration.getBoolean( + YarnConfiguration.NM_PLUGGABLE_DEVICE_FRAMEWORK_PREFER_CUSTOMIZED_SCHEDULER, + YarnConfiguration.DEFAULT_NM_PLUGGABLE_DEVICE_FRAMEWORK_PREFER_CUSTOMIZED_SCHEDULER + ); + deviceSchedulerManager.setPreferCustomizedScheduler(ifPrefer); + LOG.info("If the device plugin framework prefer customized device scheduler:" + + ifPrefer); + for (String pluginClassName : pluginClassNames) { + Class pluginClazz = Class.forName(pluginClassName); + if (!DevicePlugin.class.isAssignableFrom(pluginClazz)) { + throw new YarnRuntimeException("Class: " + pluginClassName + + " not instance of " + DevicePlugin.class.getCanonicalName()); + } + DevicePlugin dpInstance = (DevicePlugin) ReflectionUtils.newInstance(pluginClazz, + configuration); + // Try to register plugin + // TODO: handle the plugin method timeout issue + DeviceRegisterRequest request = dpInstance.register(); + String resourceName = request.getResourceName(); + // check if someone has already registered this resource type name + if (pluginMap.containsKey(resourceName)) { + throw new YarnRuntimeException(resourceName + + " has already been registered! Please change a resource type name"); + } + // check resource name is valid and configured in resource-types.xml + if (!isValidAndConfiguredResourceName(resourceName)) { + throw new YarnRuntimeException(resourceName + + " is not configured inside " + + YarnConfiguration.RESOURCE_TYPES_CONFIGURATION_FILE + + " , please configure it first"); + } + LOG.info("New resource type: " + resourceName + + " registered successfully by " + pluginClassName); + DevicePluginAdapter pluginAdapter = new DevicePluginAdapter(this, + resourceName, dpInstance); + LOG.info("Adapter of " + pluginClassName + " created. Initializing.."); + try { + pluginAdapter.initialize(context); + } catch (YarnException e) { + throw new YarnRuntimeException("Adapter of " + pluginClassName + " init failed!"); + } + LOG.info("Adapter of " + pluginClassName + " init success!"); + // Store plugin as adapter instance + pluginMap.put(request.getResourceName(), pluginAdapter); + // If the device plugin implements DevicePluginScheduler interface + if (dpInstance instanceof DevicePluginScheduler) { + LOG.info(pluginClassName + + " can schedule " + resourceName + " devices. Added as preferred device plugin scheduler"); + deviceSchedulerManager.addDevicePluginScheduler( + resourceName, + (DevicePluginScheduler)dpInstance); + } + } // end for + } + + @VisibleForTesting + public boolean isValidAndConfiguredResourceName(String resourceName) { + // check pattern match + // check configured + Map configuredResourceTypes = + ResourceUtils.getResourceTypes(); + if (!configuredResourceTypes.containsKey(resourceName)) { + return false; + } + return true; + } + + public DeviceSchedulerManager getDeviceSchedulerManager() { + return deviceSchedulerManager; } public synchronized void cleanup() throws YarnException { diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/resourceplugin/deviceframework/DevicePluginAdapter.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/resourceplugin/deviceframework/DevicePluginAdapter.java new file mode 100644 index 00000000000..0c38f9d324d --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/resourceplugin/deviceframework/DevicePluginAdapter.java @@ -0,0 +1,207 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.server.nodemanager.containermanager.resourceplugin.deviceframework; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.yarn.api.records.ContainerId; +import org.apache.hadoop.yarn.api.records.Resource; +import org.apache.hadoop.yarn.exceptions.YarnException; +import org.apache.hadoop.yarn.server.nodemanager.Context; +import org.apache.hadoop.yarn.server.nodemanager.api.deviceplugin.Device; +import org.apache.hadoop.yarn.server.nodemanager.api.deviceplugin.DevicePlugin; +import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container; +import org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.privileged.PrivilegedOperation; +import org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.privileged.PrivilegedOperationExecutor; +import org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.resources.CGroupsHandler; +import org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.resources.ResourceHandler; +import org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.resources.ResourceHandlerException; +import org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.runtime.docker.DockerRunCommand; +import org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.runtime.docker.DockerVolumeCommand; +import org.apache.hadoop.yarn.server.nodemanager.containermanager.resourceplugin.DockerCommandPlugin; +import org.apache.hadoop.yarn.server.nodemanager.containermanager.resourceplugin.NodeResourceUpdaterPlugin; +import org.apache.hadoop.yarn.server.nodemanager.containermanager.resourceplugin.ResourcePlugin; +import org.apache.hadoop.yarn.server.nodemanager.containermanager.resourceplugin.ResourcePluginManager; +import org.apache.hadoop.yarn.server.nodemanager.containermanager.runtime.ContainerExecutionException; +import org.apache.hadoop.yarn.server.nodemanager.webapp.dao.NMResourceInfo; + +import java.util.List; +import java.util.Set; +import java.util.TreeSet; + + +/** + * The {@link DevicePluginAdapter} will adapt existing hooks into vendor plugin's logic. + * It decouples the vendor plugin from YARN's device framework + * + * */ +public class DevicePluginAdapter extends NodeResourceUpdaterPlugin + implements ResourcePlugin, DockerCommandPlugin, ResourceHandler{ + final static Log LOG = LogFactory.getLog(DevicePluginAdapter.class); + + private ResourcePluginManager devicePluginManager; + private String resourceName; + private DevicePlugin devicePlugin; + private DeviceSchedulerManager deviceSchedulerManager; + private CGroupsHandler cGroupsHandler; + private PrivilegedOperationExecutor privilegedOperationExecutor; + + public DevicePluginAdapter(ResourcePluginManager pluginManager, String name, DevicePlugin dp) { + devicePluginManager = pluginManager; + deviceSchedulerManager = pluginManager.getDeviceSchedulerManager(); + resourceName = name; + devicePlugin = dp; + } + + /** + * Act as a {@link NodeResourceUpdaterPlugin} to update the {@link Resource} + * + * */ + @Override + public void updateConfiguredResource(Resource res) throws YarnException { + LOG.info(resourceName + " plugin update resource "); + Set devices = devicePlugin.getDevices(); + if (devices == null) { + LOG.warn(resourceName + " plugin failed to discover resource ( null value got)." ); + return; + } + res.setResourceValue(resourceName, devices.size()); + } + + /** + * Act as a {@link ResourcePlugin} + * */ + @Override + public void initialize(Context context) throws YarnException { + LOG.info(resourceName + " plugin adapter initialized"); + return; + } + + @Override + public ResourceHandler createResourceHandler(Context nmContext, CGroupsHandler cGroupsHandler, + PrivilegedOperationExecutor privilegedOperationExecutor) { + this.cGroupsHandler = cGroupsHandler; + this.privilegedOperationExecutor = privilegedOperationExecutor; + return this; + } + + @Override + public NodeResourceUpdaterPlugin getNodeResourceHandlerInstance() { + return this; + } + + @Override + public void cleanup() throws YarnException { + + } + + @Override + public DockerCommandPlugin getDockerCommandPluginInstance() { + return this; + } + + @Override + public NMResourceInfo getNMResourceInfo() throws YarnException { + return null; + } + + /** + * Act as a {@link DockerCommandPlugin} to hook the + * {@link org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.runtime.DockerLinuxContainerRuntime} + * */ + @Override + public void updateDockerRunCommand(DockerRunCommand dockerRunCommand, Container container) + throws ContainerExecutionException { + + } + + @Override + public DockerVolumeCommand getCreateDockerVolumeCommand(Container container) + throws ContainerExecutionException { + return null; + } + + @Override + public DockerVolumeCommand getCleanupDockerVolumesCommand(Container container) + throws ContainerExecutionException { + return null; + } + + /** + * Act as a {@link ResourceHandler} + * */ + @Override + public List bootstrap(Configuration configuration) throws ResourceHandlerException { + Set availableDevices = devicePlugin.getDevices(); + + /** + * We won't fail the NM if plugin returns invalid value here. + * // TODO: we should update RM's resource count if something wrong + * */ + if (availableDevices == null) { + LOG.error("Bootstrap " + resourceName + " failed. Null value got from plugin's getDevices method"); + return null; + } + // Add device set. Here we trust the plugin's return value + deviceSchedulerManager.addDeviceSet(resourceName, availableDevices); + // TODO: Init cgroups + + return null; + } + + @Override + public List preStart(Container container) + throws ResourceHandlerException { + String containerIdStr = container.getContainerId().toString(); + DeviceSchedulerManager.DeviceAllocation allocation = deviceSchedulerManager.assignDevices( + resourceName, container); + LOG.debug("Allocated to " + + containerIdStr + ": " + allocation ); + /** + * TODO: implement a general container-executor device module to do isolation + * */ + return null; + } + + @Override + public List reacquireContainer(ContainerId containerId) + throws ResourceHandlerException { + deviceSchedulerManager.recoverAssignedDevices(resourceName, containerId); + return null; + } + + @Override + public List updateContainer(Container container) + throws ResourceHandlerException { + return null; + } + + @Override + public List postComplete(ContainerId containerId) + throws ResourceHandlerException { + deviceSchedulerManager.cleanupAssignedDevices(resourceName, containerId); + return null; + } + + @Override + public List teardown() throws ResourceHandlerException { + return null; + } +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/resourceplugin/deviceframework/DeviceSchedulerManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/resourceplugin/deviceframework/DeviceSchedulerManager.java new file mode 100644 index 00000000000..5652798ca34 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/resourceplugin/deviceframework/DeviceSchedulerManager.java @@ -0,0 +1,334 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.server.nodemanager.containermanager.resourceplugin.deviceframework; + +import com.google.common.annotations.VisibleForTesting; +import com.google.common.collect.ImmutableSet; +import com.google.common.collect.Sets; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.util.StringUtils; +import org.apache.hadoop.yarn.api.records.ContainerId; +import org.apache.hadoop.yarn.api.records.Resource; +import org.apache.hadoop.yarn.exceptions.ResourceNotFoundException; +import org.apache.hadoop.yarn.server.nodemanager.Context; +import org.apache.hadoop.yarn.server.nodemanager.api.deviceplugin.Device; +import org.apache.hadoop.yarn.server.nodemanager.api.deviceplugin.DevicePluginScheduler; +import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container; +import org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.resources.ResourceHandlerException; + +import java.io.IOException; +import java.io.Serializable; +import java.util.*; + +/** + * Schedule device resource based on requirements and do book keeping + * It holds all device type resource and can do scheduling as a default scheduler + * If one resource type brings its own scheduler. That will be used. + * */ +public class DeviceSchedulerManager { + final static Log LOG = LogFactory.getLog(DeviceSchedulerManager.class); + + private Context nmContext; + private static final int WAIT_MS_PER_LOOP = 1000; + + // Holds vendor implemented scheduler + private Map devicePluginSchedulers = + new HashMap<>(); + + private boolean preferCustomizedScheduler = false; + + @VisibleForTesting + public Map> getAllAllowedDevices() { + return allAllowedDevices; + } + + @VisibleForTesting + public Map> getAllUsedDevices() { + return allUsedDevices; + } + + /** + * Hold all type of devices + * key is the device resource name + * value is a sorted set of {@link Device} + * */ + private Map> allAllowedDevices = new HashMap<>(); + + /** + * Hold used devices + * key is the device resource name + * value is a sorted map of {@link Device} and {@link ContainerId} pairs + * */ + private Map> allUsedDevices = new HashMap<>(); + + public DeviceSchedulerManager(Context context) { + nmContext = context; + } + + public synchronized void addDeviceSet(String resourceName, Set deviceSet) { + LOG.info("Adding new resource: " + "type:" + resourceName + "," + deviceSet); + allAllowedDevices.put(resourceName, new TreeSet<>(deviceSet)); + allUsedDevices.put(resourceName, new TreeMap<>()); + } + + public synchronized DeviceAllocation assignDevices(String resourceName, Container container) + throws ResourceHandlerException { + DeviceAllocation allocation = internalAssignDevices(resourceName, container); + // Wait for a maximum of 120 seconds if no available Devices are there which + // are yet to be released. + final int timeoutMsecs = 120 * WAIT_MS_PER_LOOP; + int timeWaiting = 0; + while (allocation == null) { + if (timeWaiting >= timeoutMsecs) { + break; + } + + // Sleep for 1 sec to ensure there are some free devices which are + // getting released. + try { + LOG.info("Container : " + container.getContainerId() + + " is waiting for free " + resourceName + " devices."); + Thread.sleep(WAIT_MS_PER_LOOP); + timeWaiting += WAIT_MS_PER_LOOP; + allocation = internalAssignDevices(resourceName, container); + } catch (InterruptedException e) { + // On any interrupt, break the loop and continue execution. + break; + } + } + + if(allocation == null) { + String message = "Could not get valid " + resourceName + " device for container '" + + container.getContainerId() + + "' as some other containers might not releasing them."; + LOG.warn(message); + throw new ResourceHandlerException(message); + } + return allocation; + } + + public synchronized DeviceAllocation internalAssignDevices(String resourceName, Container container) + throws ResourceHandlerException { + Resource requestedResource = container.getResource(); + ContainerId containerId = container.getContainerId(); + int requestedDeviceCount = getRequestedDeviceCount(resourceName, requestedResource); + // Assign devices to container if requested some. + if (requestedDeviceCount > 0) { + if (requestedDeviceCount > getAvailableDevices(resourceName)) { + // If there are some devices which are getting released, wait for few + // seconds to get it. + if (requestedDeviceCount <= getReleasingDevices(resourceName) + + getAvailableDevices(resourceName)) { + return null; + } + } + + int availableDeviceCount = getAvailableDevices(resourceName); + if (requestedDeviceCount > availableDeviceCount) { + throw new ResourceHandlerException( + "Failed to find enough " + resourceName + ", requestor=" + containerId + + ", #Requested=" + requestedDeviceCount + ", #available=" + + availableDeviceCount); + } + + Set assignedDevices = new TreeSet<>(); + Map usedDevices = allUsedDevices.get(resourceName); + Set allowedDevices = allAllowedDevices.get(resourceName); + + // Use default schedule logic if not prefer + if (!isPreferCustomizedScheduler()) { + LOG.debug("Customized device plugin scheduler is not preferred, use default logic"); + for (Device device : allowedDevices) { + if (!usedDevices.containsKey(device)) { + usedDevices.put(device, containerId); + assignedDevices.add(device); + if (assignedDevices.size() == requestedDeviceCount) { + break; + } + } + } + } else { + // Use customized device scheduler if prefer it + DevicePluginScheduler dps = devicePluginSchedulers.get(resourceName); + if (dps != null) { + // we'll prefer vendor provided scheduler + LOG.debug("Try to schedule " + requestedDeviceCount + + "(" + resourceName + ") using " + dps.getClass()); + Set dpsAllocated = dps.allocateDevices( + Sets.difference(allowedDevices, usedDevices.keySet()), + requestedDeviceCount); + if (dpsAllocated.size() != requestedDeviceCount) { + throw new ResourceHandlerException(dps.getClass() + " should allocate " + + requestedDeviceCount + + " of " + resourceName + ", but actual: " + + assignedDevices.size()); + // TODO: fall back to default schedule logic? + } + // copy + assignedDevices.addAll(dpsAllocated); + // Store assigned devices into usedDevices + for (Device device : assignedDevices) { + usedDevices.put(device, containerId); + } + } + } + // Record in state store if we allocated anything + if (!assignedDevices.isEmpty()) { + try { + // Update state store. + nmContext.getNMStateStore().storeAssignedResources(container, resourceName, + new ArrayList<>(assignedDevices)); + } catch (IOException e) { + cleanupAssignedDevices(resourceName, containerId); + throw new ResourceHandlerException(e); + } + } + + return new DeviceAllocation(resourceName, assignedDevices, + Sets.difference(allowedDevices, assignedDevices)); + } + return new DeviceAllocation(resourceName, null, + allAllowedDevices.get(resourceName)); + } + + public synchronized void recoverAssignedDevices(String resourceName, ContainerId containerId) + throws ResourceHandlerException{ + Container c = nmContext.getContainers().get(containerId); + Map usedDevices = allUsedDevices.get(resourceName); + Set allowedDevices = allAllowedDevices.get(resourceName); + if (null == c) { + throw new ResourceHandlerException( + "This shouldn't happen, cannot find container with id=" + + containerId); + } + + for (Serializable deviceSerializable : c.getResourceMappings() + .getAssignedResources(resourceName)) { + if (!(deviceSerializable instanceof Device)) { + throw new ResourceHandlerException( + "Trying to recover device id, however it" + + " is not Device instance, this shouldn't happen"); + } + + Device device = (Device) deviceSerializable; + + // Make sure it is in allowed device. + if (!allowedDevices.contains(device)) { + throw new ResourceHandlerException( + "Try to recover device = " + device + + " however it is not in allowed device list:" + StringUtils + .join(",", allowedDevices)); + } + + // Make sure it is not occupied by anybody else + if (usedDevices.containsKey(device)) { + throw new ResourceHandlerException( + "Try to recover device id = " + device + + " however it is already assigned to container=" + + usedDevices.get(device) + + ", please double check what happened."); + } + + usedDevices.put(device, containerId); + } + } + + public synchronized void cleanupAssignedDevices(String resourceName, ContainerId containerId) { + Iterator> iter = + allUsedDevices.get(resourceName).entrySet().iterator(); + while (iter.hasNext()) { + if (iter.next().getValue().equals(containerId)) { + iter.remove(); + } + } + } + + public synchronized int getRequestedDeviceCount(String resourceName, Resource requestedResource) { + try { + return Long.valueOf(requestedResource.getResourceValue( + resourceName)).intValue(); + } catch (ResourceNotFoundException e) { + return 0; + } + } + + public synchronized int getAvailableDevices(String resourceName) { + return allAllowedDevices.get(resourceName).size() - + allUsedDevices.get(resourceName).size(); + } + + private synchronized long getReleasingDevices(String resourceName) { + long releasingDevices = 0; + Map used = allUsedDevices.get(resourceName); + Iterator> iter = used.entrySet() + .iterator(); + while (iter.hasNext()) { + ContainerId containerId = iter.next().getValue(); + Container container; + if ((container = nmContext.getContainers().get(containerId)) != null) { + if (container.isContainerInFinalStates()) { + releasingDevices = releasingDevices + container.getResource() + .getResourceInformation(resourceName).getValue(); + } + } + } + return releasingDevices; + } + + public boolean isPreferCustomizedScheduler() { + return preferCustomizedScheduler; + } + + public void setPreferCustomizedScheduler(boolean preferCustomizedScheduler) { + this.preferCustomizedScheduler = preferCustomizedScheduler; + } + + static class DeviceAllocation { + private String resourceName; + private Set allowed = Collections.emptySet(); + private Set denied = Collections.emptySet(); + + DeviceAllocation(String resourceName, Set allowed, Setdenied) { + this.resourceName = resourceName; + if (allowed != null) { + this.allowed = ImmutableSet.copyOf(allowed); + } + if (denied != null) { + this.denied = ImmutableSet.copyOf(denied); + } + } + + @Override + public String toString() { + return "ResourceType: " + resourceName + + ", Allowed Devices: " + allowed + + ", Denied Devices: " + denied; + } + + } + + @VisibleForTesting + public void addDevicePluginScheduler(String resourceName, + DevicePluginScheduler s) { + this.devicePluginSchedulers.put(resourceName, + Objects.requireNonNull(s)); + } + +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/resourceplugin/deviceframework/examples/FakeDevicePlugin.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/resourceplugin/deviceframework/examples/FakeDevicePlugin.java new file mode 100644 index 00000000000..f4e47dc1ceb --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/resourceplugin/deviceframework/examples/FakeDevicePlugin.java @@ -0,0 +1,73 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.server.nodemanager.containermanager.resourceplugin.deviceframework.examples; + +import org.apache.hadoop.yarn.server.nodemanager.api.deviceplugin.*; + +import java.util.Set; +import java.util.TreeSet; + +public class FakeDevicePlugin + implements DevicePlugin,DevicePluginScheduler { + public final static String resourceName = "cmp.com/cmp"; + @Override + public DeviceRegisterRequest register() { + return DeviceRegisterRequest.Builder.newInstance() + .setResourceName(resourceName) + .setPluginVersion("v1.0").build(); + } + + @Override + public Set getDevices() { + TreeSet r = new TreeSet<>(); + r.add(Device.Builder.newInstance() + .setID(0) + .setDevPath("/dev/cmp0") + .setMajorNumber(243) + .setMinorNumber(0) + .setBusID("0000:65:00.0") + .setHealthy(true) + .build()); + return r; + } + + @Override + public DeviceRuntimeSpec OnDevicesAllocated(Set allocatedDevices) { + return null; + } + + @Override + public void OnDevicesReleased(Set allocatedDevices) { + + } + + @Override + public Set allocateDevices(Set availableDevices, Integer count) { + Set allocated = new TreeSet<>(); + int number = 0; + for (Device d : availableDevices) { + allocated.add(d); + number++; + if (number == count) { + break; + } + } + return allocated; + } +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/resourceplugin/TestResourcePluginManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/resourceplugin/TestResourcePluginManager.java index 6ed7c568899..1f748826ffd 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/resourceplugin/TestResourcePluginManager.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/resourceplugin/TestResourcePluginManager.java @@ -24,6 +24,7 @@ import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.event.Dispatcher; +import org.apache.hadoop.yarn.exceptions.YarnRuntimeException; import org.apache.hadoop.yarn.server.nodemanager.ContainerExecutor; import org.apache.hadoop.yarn.server.nodemanager.Context; import org.apache.hadoop.yarn.server.nodemanager.DeletionService; @@ -33,6 +34,7 @@ import org.apache.hadoop.yarn.server.nodemanager.NodeManager; import org.apache.hadoop.yarn.server.nodemanager.NodeManagerTestBase; import org.apache.hadoop.yarn.server.nodemanager.NodeStatusUpdater; +import org.apache.hadoop.yarn.server.nodemanager.api.deviceplugin.*; import org.apache.hadoop.yarn.server.nodemanager.containermanager.ContainerManagerImpl; import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container; import org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.privileged.PrivilegedOperation; @@ -41,27 +43,41 @@ import org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.resources.ResourceHandler; import org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.resources.ResourceHandlerChain; import org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.resources.ResourceHandlerException; -import org.apache.hadoop.yarn.server.nodemanager.containermanager.resourceplugin.NodeResourceUpdaterPlugin; -import org.apache.hadoop.yarn.server.nodemanager.containermanager.resourceplugin.ResourcePlugin; -import org.apache.hadoop.yarn.server.nodemanager.containermanager.resourceplugin.ResourcePluginManager; +import org.apache.hadoop.yarn.server.nodemanager.containermanager.resourceplugin.deviceframework.DevicePluginAdapter; +import org.apache.hadoop.yarn.server.nodemanager.containermanager.resourceplugin.deviceframework.FakeTestDevicePlugin1; +import org.apache.hadoop.yarn.server.nodemanager.containermanager.resourceplugin.deviceframework.FakeTestDevicePlugin2; +import org.apache.hadoop.yarn.server.nodemanager.containermanager.resourceplugin.deviceframework.FakeTestDevicePlugin3; +import org.apache.hadoop.yarn.server.nodemanager.containermanager.resourceplugin.deviceframework.examples.FakeDevicePlugin; import org.apache.hadoop.yarn.server.security.ApplicationACLsManager; +import org.apache.hadoop.yarn.util.resource.ResourceUtils; +import org.apache.hadoop.yarn.util.resource.TestResourceUtils; import org.junit.After; import org.junit.Assert; +import org.junit.Before; import org.junit.Test; -import java.util.HashMap; -import java.util.List; -import java.util.Map; +import java.io.File; +import java.util.*; import static org.mockito.Matchers.any; -import static org.mockito.Mockito.mock; -import static org.mockito.Mockito.times; -import static org.mockito.Mockito.verify; -import static org.mockito.Mockito.when; +import static org.mockito.Mockito.*; public class TestResourcePluginManager extends NodeManagerTestBase { private NodeManager nm; + private YarnConfiguration conf; + + private String tempResourceTypesFile; + + @Before + public void setup() throws Exception { + this.conf = createNMConfig(); + // setup resource-types.xml + ResourceUtils.resetResourceTypes(); + String resourceTypesFile = "resource-types-pluggable-devices.xml"; + this.tempResourceTypesFile = TestResourceUtils.setupResourceTypes(this.conf, resourceTypesFile); + } + ResourcePluginManager stubResourcePluginmanager() { // Stub ResourcePluginManager final ResourcePluginManager rpm = mock(ResourcePluginManager.class); @@ -94,6 +110,11 @@ public void tearDown() { // ignore } } + // cleanup resource-types.xml + File dest = new File(this.tempResourceTypesFile); + if (dest.exists()) { + dest.delete(); + } } private class CustomizedResourceHandler implements ResourceHandler { @@ -145,7 +166,7 @@ public MyMockNM(ResourcePluginManager rpm) { @Override protected NodeStatusUpdater createNodeStatusUpdater(Context context, Dispatcher dispatcher, NodeHealthCheckerService healthChecker) { - ((NodeManager.NMContext)context).setResourcePluginManager(rpm); + ((NodeManager.NMContext) context).setResourcePluginManager(rpm); return new BaseNodeStatusUpdaterForTest(context, dispatcher, healthChecker, metrics, new BaseResourceTrackerForTest()); } @@ -157,7 +178,7 @@ protected ContainerManagerImpl createContainerManager(Context context, ApplicationACLsManager aclsManager, LocalDirsHandlerService diskhandler) { return new MyContainerManager(context, exec, del, nodeStatusUpdater, - metrics, diskhandler); + metrics, diskhandler); } @Override @@ -222,7 +243,7 @@ public void testLinuxContainerExecutorWithResourcePluginsEnabled() throws Except @Override protected NodeStatusUpdater createNodeStatusUpdater(Context context, Dispatcher dispatcher, NodeHealthCheckerService healthChecker) { - ((NMContext)context).setResourcePluginManager(rpm); + ((NMContext) context).setResourcePluginManager(rpm); return new BaseNodeStatusUpdaterForTest(context, dispatcher, healthChecker, metrics, new BaseResourceTrackerForTest()); } @@ -239,7 +260,7 @@ protected ContainerManagerImpl createContainerManager(Context context, @Override protected ContainerExecutor createContainerExecutor(Configuration conf) { - ((NMContext)this.getNMContext()).setResourcePluginManager(rpm); + ((NMContext) this.getNMContext()).setResourcePluginManager(rpm); lce.setConf(conf); return lce; } @@ -257,6 +278,9 @@ protected ContainerExecutor createContainerExecutor(Configuration conf) { boolean newHandlerAdded = false; for (ResourceHandler h : ((ResourceHandlerChain) handler) .getResourceHandlerList()) { + if (h instanceof DevicePluginAdapter) { + Assert.assertTrue(false); + } if (h instanceof CustomizedResourceHandler) { newHandlerAdded = true; break; @@ -264,4 +288,175 @@ protected ContainerExecutor createContainerExecutor(Configuration conf) { } Assert.assertTrue("New ResourceHandler should be added", newHandlerAdded); } + + // Disabled pluggable framework. + // We use spy object of real rpm to verify "initializePluggableDevicePlugins" + // because use mock rpm will not working + @Test(timeout = 30000) + public void testInitializationWithPluggableDeviceFrameworkDisabled() throws Exception { + ResourcePluginManager rpm = new ResourcePluginManager(); + + ResourcePluginManager rpmSpy = spy(rpm); + nm = new MyMockNM(rpmSpy); + + conf.setBoolean(YarnConfiguration.NM_PLUGGABLE_DEVICE_FRAMEWORK_ENABLED, + false); + nm.init(conf); + nm.start(); + verify(rpmSpy, times(1)).initialize( + any(Context.class)); + verify(rpmSpy, times(0)).initializePluggableDevicePlugins( + any(Context.class), any(Configuration.class), any(Map.class)); + } + + // no configuration set. + @Test(timeout = 30000) + public void testInitializationWithPluggableDeviceFrameworkDisabled2() throws Exception { + ResourcePluginManager rpm = new ResourcePluginManager(); + + ResourcePluginManager rpmSpy = spy(rpm); + nm = new MyMockNM(rpmSpy); + + nm.init(conf); + nm.start(); + verify(rpmSpy, times(1)).initialize( + any(Context.class)); + verify(rpmSpy, times(0)).initializePluggableDevicePlugins( + any(Context.class), any(Configuration.class), any(Map.class)); + } + + // Enable framework and configure pluggable device classes + @Test(timeout = 30000) + public void testInitializationWithPluggableDeviceFrameworkEnabled() throws Exception { + ResourcePluginManager rpm = new ResourcePluginManager(); + + ResourcePluginManager rpmSpy = spy(rpm); + nm = new MyMockNM(rpmSpy); + + conf.setBoolean(YarnConfiguration.NM_PLUGGABLE_DEVICE_FRAMEWORK_ENABLED, + true); + conf.setStrings(YarnConfiguration.NM_PLUGGABLE_DEVICE_FRAMEWORK_DEVICE_CLASSES, + FakeDevicePlugin.class.getCanonicalName()); + nm.init(conf); + nm.start(); + verify(rpmSpy, times(1)).initialize( + any(Context.class)); + verify(rpmSpy, times(1)).initializePluggableDevicePlugins( + any(Context.class), any(Configuration.class), any(Map.class)); + } + + // Enable pluggable framework, but leave device classes un-configured + // initializePluggableDevicePlugins invoked but it should throw an exception + @Test(timeout = 30000) + public void testInitializationWithPluggableDeviceFrameworkEnabled2() + throws ClassNotFoundException{ + ResourcePluginManager rpm = new ResourcePluginManager(); + + ResourcePluginManager rpmSpy = spy(rpm); + nm = new MyMockNM(rpmSpy); + Boolean fail = false; + try { + YarnConfiguration conf = createNMConfig(); + conf.setBoolean(YarnConfiguration.NM_PLUGGABLE_DEVICE_FRAMEWORK_ENABLED, + true); + + nm.init(conf); + nm.start(); + } catch (YarnRuntimeException e) { + fail = true; + } catch (Exception e) { + + } + verify(rpmSpy, times(1)).initializePluggableDevicePlugins( + any(Context.class), any(Configuration.class), any(Map.class)); + Assert.assertTrue(fail); + } + + @Test(timeout = 30000) + public void testNormalInitializationOfPluggableDeviceClasses() + throws Exception{ + + ResourcePluginManager rpm = new ResourcePluginManager(); + + ResourcePluginManager rpmSpy = spy(rpm); + nm = new MyMockNM(rpmSpy); + + conf.setBoolean(YarnConfiguration.NM_PLUGGABLE_DEVICE_FRAMEWORK_ENABLED, + true); + conf.setStrings(YarnConfiguration.NM_PLUGGABLE_DEVICE_FRAMEWORK_DEVICE_CLASSES, + FakeTestDevicePlugin1.class.getCanonicalName() + "," + + FakeDevicePlugin.class.getCanonicalName()); + nm.init(conf); + nm.start(); + Map pluginMap = rpmSpy.getNameToPlugins(); + Assert.assertEquals(2,pluginMap.size()); + ResourcePlugin rp = pluginMap.get("cmp.com/cmp"); + if (! (rp instanceof DevicePluginAdapter)) { + Assert.assertTrue(false); + } + } + + // Fail to load a class which doesn't implement interface DevicePlugin + @Test(timeout = 30000) + public void testLoadInvalidPluggableDeviceClasses() + throws Exception{ + ResourcePluginManager rpm = new ResourcePluginManager(); + + ResourcePluginManager rpmSpy = spy(rpm); + nm = new MyMockNM(rpmSpy); + + conf.setBoolean(YarnConfiguration.NM_PLUGGABLE_DEVICE_FRAMEWORK_ENABLED, + true); + conf.setStrings(YarnConfiguration.NM_PLUGGABLE_DEVICE_FRAMEWORK_DEVICE_CLASSES, + FakeTestDevicePlugin2.class.getCanonicalName()); + + String expectedMessage = "Class: " + FakeTestDevicePlugin2.class.getCanonicalName() + + " not instance of " + DevicePlugin.class.getCanonicalName(); + String actualMessage = ""; + try { + nm.init(conf); + nm.start(); + } catch (YarnRuntimeException e) { + actualMessage = e.getMessage(); + } + Assert.assertEquals(expectedMessage, actualMessage); + } + + @Test(timeout = 30000) + public void testLoadDuplicateResourceNameDevicePlugin() + throws Exception{ + ResourcePluginManager rpm = new ResourcePluginManager(); + + ResourcePluginManager rpmSpy = spy(rpm); + nm = new MyMockNM(rpmSpy); + + conf.setBoolean(YarnConfiguration.NM_PLUGGABLE_DEVICE_FRAMEWORK_ENABLED, + true); + conf.setStrings(YarnConfiguration.NM_PLUGGABLE_DEVICE_FRAMEWORK_DEVICE_CLASSES, + FakeTestDevicePlugin1.class.getCanonicalName() + "," + + FakeTestDevicePlugin3.class.getCanonicalName()); + + String expectedMessage = "cmpA.com/hdwA" + + " has already been registered! Please change a resource type name"; + String actualMessage = ""; + try { + nm.init(conf); + nm.start(); + } catch (YarnRuntimeException e) { + actualMessage = e.getMessage(); + } + Assert.assertEquals(expectedMessage, actualMessage); + } + + @Test(timeout = 30000) + public void testRequestedResourceNameIsConfigured() + throws Exception{ + ResourcePluginManager rpm = new ResourcePluginManager(); + String resourceName = "a.com/a"; + Assert.assertFalse(rpm.isValidAndConfiguredResourceName(resourceName)); + resourceName = "cmp.com/cmp"; + Assert.assertTrue(rpm.isValidAndConfiguredResourceName(resourceName)); + } + + } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/resourceplugin/deviceframework/FakeTestDevicePlugin1.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/resourceplugin/deviceframework/FakeTestDevicePlugin1.java new file mode 100644 index 00000000000..68f28c95b2c --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/resourceplugin/deviceframework/FakeTestDevicePlugin1.java @@ -0,0 +1,60 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.server.nodemanager.containermanager.resourceplugin.deviceframework; + +import org.apache.hadoop.yarn.server.nodemanager.api.deviceplugin.*; + +import java.util.Set; +import java.util.TreeSet; + +/** + * Used only for testing. + * A fake normal vendor plugin + * */ +public class FakeTestDevicePlugin1 implements DevicePlugin { + @Override + public DeviceRegisterRequest register() { + return DeviceRegisterRequest.Builder.newInstance() + .setResourceName("cmpA.com/hdwA").build(); + } + + @Override + public Set getDevices() { + TreeSet r = new TreeSet<>(); + r.add(Device.Builder.newInstance() + .setID(0) + .setDevPath("/dev/hdwA0") + .setMajorNumber(243) + .setMinorNumber(0) + .setBusID("0000:65:00.0") + .setHealthy(true) + .build()); + return r; + } + + @Override + public DeviceRuntimeSpec OnDevicesAllocated(Set allocatedDevices) { + return null; + } + + @Override + public void OnDevicesReleased(Set allocatedDevices) { + + } +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/resourceplugin/deviceframework/FakeTestDevicePlugin2.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/resourceplugin/deviceframework/FakeTestDevicePlugin2.java new file mode 100644 index 00000000000..8c1a61dc21c --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/resourceplugin/deviceframework/FakeTestDevicePlugin2.java @@ -0,0 +1,27 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.server.nodemanager.containermanager.resourceplugin.deviceframework; + +/** + * Only used for testing. + * This isn't a implementation of DevicePlugin + * */ +public class FakeTestDevicePlugin2 { + +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/resourceplugin/deviceframework/FakeTestDevicePlugin3.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/resourceplugin/deviceframework/FakeTestDevicePlugin3.java new file mode 100644 index 00000000000..b457c2e5b0d --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/resourceplugin/deviceframework/FakeTestDevicePlugin3.java @@ -0,0 +1,60 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.server.nodemanager.containermanager.resourceplugin.deviceframework; + +import org.apache.hadoop.yarn.server.nodemanager.api.deviceplugin.*; + +import java.util.Set; +import java.util.TreeSet; + +/** + * Only used for testing + * This plugin register a same name with FakeTestDevicePlugin1 + * */ +public class FakeTestDevicePlugin3 implements DevicePlugin { + @Override + public DeviceRegisterRequest register() { + return DeviceRegisterRequest.Builder.newInstance() + .setResourceName("cmpA.com/hdwA").build(); + } + + @Override + public Set getDevices() { + TreeSet r = new TreeSet<>(); + r.add(Device.Builder.newInstance() + .setID(0) + .setDevPath("/dev/hdwA0") + .setMajorNumber(243) + .setMinorNumber(0) + .setBusID("0000:65:00.0") + .setHealthy(true) + .build()); + return r; + } + + @Override + public DeviceRuntimeSpec OnDevicesAllocated(Set allocatedDevices) { + return null; + } + + @Override + public void OnDevicesReleased(Set allocatedDevices) { + + } +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/resourceplugin/deviceframework/TestDevicePluginAdapter.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/resourceplugin/deviceframework/TestDevicePluginAdapter.java new file mode 100644 index 00000000000..d20b8887312 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/resourceplugin/deviceframework/TestDevicePluginAdapter.java @@ -0,0 +1,300 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.server.nodemanager.containermanager.resourceplugin.deviceframework; + +import org.apache.hadoop.yarn.api.records.*; +import org.apache.hadoop.yarn.conf.YarnConfiguration; +import org.apache.hadoop.yarn.server.nodemanager.*; +import org.apache.hadoop.yarn.server.nodemanager.api.deviceplugin.Device; +import org.apache.hadoop.yarn.server.nodemanager.api.deviceplugin.DevicePlugin; +import org.apache.hadoop.yarn.server.nodemanager.api.deviceplugin.DeviceRegisterRequest; +import org.apache.hadoop.yarn.server.nodemanager.api.deviceplugin.DeviceRuntimeSpec; +import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container; +import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ResourceMappings; +import org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.resources.ResourceHandlerException; +import org.apache.hadoop.yarn.server.nodemanager.containermanager.resourceplugin.ResourcePluginManager; +import org.apache.hadoop.yarn.server.nodemanager.containermanager.resourceplugin.deviceframework.examples.FakeDevicePlugin; +import org.apache.hadoop.yarn.server.nodemanager.containermanager.runtime.ContainerRuntimeConstants; +import org.apache.hadoop.yarn.server.nodemanager.recovery.NMStateStoreService; +import org.apache.hadoop.yarn.util.resource.ResourceUtils; +import org.apache.hadoop.yarn.util.resource.TestResourceUtils; +import org.junit.After; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; + +import java.io.File; +import java.io.IOException; +import java.util.*; + +import static org.mockito.Matchers.isA; +import static org.mockito.Mockito.*; +import static org.mockito.Mockito.verify; + +public class TestDevicePluginAdapter { + + private YarnConfiguration conf; + private String tempResourceTypesFile; + + @Before + public void setup() throws Exception { + this.conf = new YarnConfiguration(); + // setup resource-types.xml + ResourceUtils.resetResourceTypes(); + String resourceTypesFile = "resource-types-pluggable-devices.xml"; + this.tempResourceTypesFile = TestResourceUtils.setupResourceTypes(this.conf, resourceTypesFile); + } + + @After + public void tearDown() { + // cleanup resource-types.xml + File dest = new File(this.tempResourceTypesFile); + if (dest.exists()) { + dest.delete(); + } + } + + + /** + * Use the MyPlugin which doesn't implement scheduler interfaces + * Test Adapter's "Bootstrap", "preStart" and "postComplete" + * Test plugin's "getDevices". + * Plugin's initialization and "register" invocation is tested in + * {@link org.apache.hadoop.yarn.server.nodemanager.containermanager.resourceplugin.TestResourcePluginManager} + * + * */ + @Test + public void testBasicWorkflow() + throws ResourceHandlerException, IOException { + NodeManager.NMContext context = mock(NodeManager.NMContext.class); + NMStateStoreService storeService = mock(NMStateStoreService.class); + when(context.getNMStateStore()).thenReturn(storeService); + doNothing().when(storeService).storeAssignedResources(isA(Container.class), + isA(String.class), + isA(ArrayList.class)); + // Init scheduler manager + DeviceSchedulerManager dsm = new DeviceSchedulerManager(context); + + ResourcePluginManager rpm = mock(ResourcePluginManager.class); + when(rpm.getDeviceSchedulerManager()).thenReturn(dsm); + + // Init an plugin + MyPlugin plugin = new MyPlugin(); + MyPlugin spyPlugin = spy(plugin); + String resourceName = MyPlugin.resourceName; + // Init an adapter for the plugin + DevicePluginAdapter adapter = new DevicePluginAdapter(rpm, + resourceName, + spyPlugin); + // Bootstrap, adding device + adapter.bootstrap(conf); + int size = dsm.getAvailableDevices(resourceName); + Assert.assertEquals(3, size); + + // A container c1 requests 1 device + Container c1 = mockContainerWithDeviceRequest(0, + resourceName, + 1,false); + // preStart + adapter.preStart(c1); + // check book keeping + Assert.assertEquals(2, + dsm.getAvailableDevices(resourceName)); + Assert.assertEquals(1, + dsm.getAllUsedDevices().get(resourceName).size()); + Assert.assertEquals(3, + dsm.getAllAllowedDevices().get(resourceName).size()); + // postComplete + adapter.postComplete(getContainerId(0)); + Assert.assertEquals(3, + dsm.getAvailableDevices(resourceName)); + Assert.assertEquals(0, + dsm.getAllUsedDevices().get(resourceName).size()); + Assert.assertEquals(3, + dsm.getAllAllowedDevices().get(resourceName).size()); + + // A container c2 requests 3 device + Container c2 = mockContainerWithDeviceRequest(1, + resourceName, + 3,false); + // preStart + adapter.preStart(c2); + // check book keeping + Assert.assertEquals(0, + dsm.getAvailableDevices(resourceName)); + Assert.assertEquals(3, + dsm.getAllUsedDevices().get(resourceName).size()); + Assert.assertEquals(3, + dsm.getAllAllowedDevices().get(resourceName).size()); + // postComplete + adapter.postComplete(getContainerId(1)); + Assert.assertEquals(3, + dsm.getAvailableDevices(resourceName)); + Assert.assertEquals(0, + dsm.getAllUsedDevices().get(resourceName).size()); + Assert.assertEquals(3, + dsm.getAllAllowedDevices().get(resourceName).size()); + } + + /** + * Use {@link FakeDevicePlugin} which implements + * {@link org.apache.hadoop.yarn.server.nodemanager.api.deviceplugin.DevicePluginScheduler} + * interface. + * Test Adapter's "Bootstrap", "preStart" and "postComplete" + * Test plugin's "getDevices", "allocateDevices" + * + * */ + @Test + public void testBasicWorkflowWithPluginAndPluginScheduler() + throws ResourceHandlerException, IOException { + NodeManager.NMContext context = mock(NodeManager.NMContext.class); + NMStateStoreService storeService = mock(NMStateStoreService.class); + when(context.getNMStateStore()).thenReturn(storeService); + doNothing().when(storeService).storeAssignedResources(isA(Container.class), + isA(String.class), + isA(ArrayList.class)); + // Init scheduler manager + DeviceSchedulerManager dsm = new DeviceSchedulerManager(context); + + ResourcePluginManager rpm = mock(ResourcePluginManager.class); + when(rpm.getDeviceSchedulerManager()).thenReturn(dsm); + + // Init an plugin + FakeDevicePlugin plugin = new FakeDevicePlugin(); + FakeDevicePlugin spyPlugin = spy(plugin); + String resourceName = FakeDevicePlugin.resourceName; + // Add customized device plugin scheduler + dsm.setPreferCustomizedScheduler(true); + dsm.addDevicePluginScheduler(resourceName,spyPlugin); + // Init an adapter for the plugin + DevicePluginAdapter adapter = new DevicePluginAdapter(rpm, + resourceName, + spyPlugin); + // Bootstrap, adding device + adapter.bootstrap(conf); + int size = dsm.getAvailableDevices(resourceName); + Assert.assertEquals(1, size); + + // A container requests 1 device + Container c1 = mockContainerWithDeviceRequest(0, + resourceName, + 1,false); + // preStart + adapter.preStart(c1); + // check if the plugin's own scheduler works + verify(spyPlugin, times(1)) + .allocateDevices(isA(Set.class),isA(Integer.class)); + // check book keeping + Assert.assertEquals(0, + dsm.getAvailableDevices(resourceName)); + Assert.assertEquals(1, + dsm.getAllUsedDevices().get(resourceName).size()); + Assert.assertEquals(1, + dsm.getAllAllowedDevices().get(resourceName).size()); + // postComplete + adapter.postComplete(getContainerId(0)); + Assert.assertEquals(1, + dsm.getAvailableDevices(resourceName)); + Assert.assertEquals(0, + dsm.getAllUsedDevices().get(resourceName).size()); + Assert.assertEquals(1, + dsm.getAllAllowedDevices().get(resourceName).size()); + } + + private static Container mockContainerWithDeviceRequest(int id, + String resourceName, + int numDeviceRequest, + boolean dockerContainerEnabled) { + Container c = mock(Container.class); + when(c.getContainerId()).thenReturn(getContainerId(id)); + + Resource res = Resource.newInstance(1024, 1); + ResourceMappings resMapping = new ResourceMappings(); + + res.setResourceValue(resourceName, numDeviceRequest); + when(c.getResource()).thenReturn(res); + when(c.getResourceMappings()).thenReturn(resMapping); + + ContainerLaunchContext clc = mock(ContainerLaunchContext.class); + Map env = new HashMap<>(); + if (dockerContainerEnabled) { + env.put(ContainerRuntimeConstants.ENV_CONTAINER_TYPE, + ContainerRuntimeConstants.CONTAINER_RUNTIME_DOCKER); + } + when(clc.getEnvironment()).thenReturn(env); + when(c.getLaunchContext()).thenReturn(clc); + return c; + } + + private static ContainerId getContainerId(int id) { + return ContainerId.newContainerId(ApplicationAttemptId + .newInstance(ApplicationId.newInstance(1234L, 1), 1), id); + } + + + private class MyPlugin implements DevicePlugin { + private final static String resourceName = "cmpA.com/hdwA"; + @Override + public DeviceRegisterRequest register() { + return DeviceRegisterRequest.Builder.newInstance() + .setResourceName(resourceName) + .setPluginVersion("v1.0").build(); + } + + @Override + public Set getDevices() { + TreeSet r = new TreeSet<>(); + r.add(Device.Builder.newInstance() + .setID(0) + .setDevPath("/dev/hdwA0") + .setMajorNumber(256) + .setMinorNumber(0) + .setBusID("0000:80:00.0") + .setHealthy(true) + .build()); + r.add(Device.Builder.newInstance() + .setID(1) + .setDevPath("/dev/hdwA1") + .setMajorNumber(256) + .setMinorNumber(0) + .setBusID("0000:80:01.0") + .setHealthy(true) + .build()); + r.add(Device.Builder.newInstance() + .setID(2) + .setDevPath("/dev/hdwA2") + .setMajorNumber(256) + .setMinorNumber(0) + .setBusID("0000:80:02.0") + .setHealthy(true) + .build()); + return r; + } + + @Override + public DeviceRuntimeSpec OnDevicesAllocated(Set allocatedDevices) { + return null; + } + + @Override + public void OnDevicesReleased(Set releasedDevices) { + + } + } +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/resources/resource-types-pluggable-devices.xml b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/resources/resource-types-pluggable-devices.xml new file mode 100644 index 00000000000..65b8e752f1c --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/resources/resource-types-pluggable-devices.xml @@ -0,0 +1,7 @@ + + + + yarn.resource-types + cmp.com/cmp,cmpA.com/hdwA + + \ No newline at end of file