diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java index dbd3663..6b3ae0a 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java @@ -1316,6 +1316,34 @@ public static boolean isAclEnabled(Configuration conf) { public static final String NM_NETWORK_RESOURCE_OUTBOUND_BANDWIDTH_YARN_MBIT = NM_NETWORK_RESOURCE_PREFIX + "outbound-bandwidth-yarn-mbit"; + /** + * Prefix for FPGA configurations. Work in progress: This configuration + * parameter may be changed/removed in the future. + */ + @Private + public static final String NM_FPGA_RESOURCE_PREFIX = NM_PREFIX + + "resource.fpga."; + + /** + * This setting controls if resource handling for FPGA operations is enabled. + */ + @Private + public static final String NM_FPGA_RESOURCE_ENABLED = + NM_FPGA_RESOURCE_PREFIX + "enabled"; + + /** + * Settings for fpga vendor plugin + */ + @Private + public static final String NM_FPGA_PLUGIN_CLASS = + NM_PREFIX + "fpga.plugin.class"; + + /** + * FPGA as a resource is disabled by default. + **/ + @Private + public static final boolean DEFAULT_NM_FPGA_RESOURCE_ENABLED = false; + /** NM Webapp address.**/ public static final String NM_WEBAPP_ADDRESS = NM_PREFIX + "webapp.address"; public static final int DEFAULT_NM_WEBAPP_PORT = 8042; diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/resource/ResourceUtils.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/resource/ResourceUtils.java index 86cf872..2994c3b 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/resource/ResourceUtils.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/resource/ResourceUtils.java @@ -392,6 +392,28 @@ public static String getUnits(String resourceValue) { return readOnlyNodeResources; } + /** + * Function to get the device allowed infomation. The value format should be comma separated majorNumber:minorNumber + * + * @param conf + * @return a map of resource type and allowed value string + */ + public static Map getResourceTypeAllowedValue(Configuration conf) { + Map allowedDevices = new HashMap<>(); + for (Map.Entry entry : conf) { + String key = entry.getKey(); + String value = entry.getValue(); + if (key.startsWith(YarnConfiguration.NM_RESOURCES_PREFIX)) { + String[] parts = key.split("\\."); + LOG.info("Found allowed device resource entry " + key); + if (parts.length == 5 && parts[4].equalsIgnoreCase("allowed")) { + allowedDevices.put(parts[3], value); + } + } + } + return allowedDevices; + } + private static Map initializeNodeResourceInformation(Configuration conf) { Map nodeResources = new HashMap<>(); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/linux/resources/CGroupsHandler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/linux/resources/CGroupsHandler.java index 8fc35a8..92cd6b1 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/linux/resources/CGroupsHandler.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/linux/resources/CGroupsHandler.java @@ -69,6 +69,7 @@ String getName() { String CGROUP_CPU_QUOTA_US = "cfs_quota_us"; String CGROUP_CPU_SHARES = "shares"; + String CGROUP_PARAM_DEVICE_DENY = "deny"; /** * Mounts or initializes a cgroup controller. * @param controller - the controller being initialized diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/linux/resources/Fpga/AbstractFpgaPlugin.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/linux/resources/Fpga/AbstractFpgaPlugin.java new file mode 100644 index 0000000..2c64690 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/linux/resources/Fpga/AbstractFpgaPlugin.java @@ -0,0 +1,51 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + + +package org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.resources.Fpga; + +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.classification.InterfaceStability; + +import java.util.List; + + +/** + * FPGA plugin interface for vendor to implement + * + * */ + +@InterfaceAudience.Private +@InterfaceStability.Unstable +public interface AbstractFpgaPlugin { + + boolean initPlugin(); + + String getExistingIPID(int major, int minor); + + String getFpgaType(); + + /** + * the vendor should check if the IP file has already been downloaded + * */ + String downloadIP(String id, String dstDir); + + boolean configureIP(String ipPath, List addresses); + + boolean cleanupFpgas(List address); +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/linux/resources/Fpga/FpgaPluginChain.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/linux/resources/Fpga/FpgaPluginChain.java new file mode 100644 index 0000000..758abd4 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/linux/resources/Fpga/FpgaPluginChain.java @@ -0,0 +1,61 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + + +package org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.resources.Fpga; + +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; + +public class FpgaPluginChain{ + + private HashMap plugins; + + FpgaPluginChain(){} + + FpgaPluginChain(HashMap plugins) { + this.plugins = plugins; + } + + + public void addPlugin(AbstractFpgaPlugin plugin) { + if (null == plugins) { + plugins = new HashMap<>(); + } + plugins.put(plugin.getFpgaType(), plugin); + } + + public AbstractFpgaPlugin getPlugin(String type) { + return plugins.get(type); + } + + public List getPlugins() { + return new ArrayList<>(plugins.values()); + } + + public boolean initPlugin() { + for (AbstractFpgaPlugin plugin : plugins.values()) { + if (!plugin.initPlugin()) { + return false; + } + } + return true; + } + +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/linux/resources/Fpga/FpgaResourceAllocator.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/linux/resources/Fpga/FpgaResourceAllocator.java new file mode 100644 index 0000000..cc8f5a3 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/linux/resources/Fpga/FpgaResourceAllocator.java @@ -0,0 +1,247 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + + +package org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.resources.Fpga; + + +import com.google.common.annotations.VisibleForTesting; +import com.google.common.collect.ImmutableList; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; + +import java.util.*; + +public class FpgaResourceAllocator { + + static final Log LOG = LogFactory.getLog(FpgaResourceAllocator.class); + + //key is resource type + private LinkedHashMap> availableFpga = new LinkedHashMap<>(); + + //key is requetor + private LinkedHashMap> usedFpgaByRequestor = new LinkedHashMap<>(); + + @VisibleForTesting + public HashMap> getAvailableFpga() { + return availableFpga; + } + + @VisibleForTesting + public int getAvailableFpgaCount() { + int count = 0; + for (List l : availableFpga.values()) { + count += l.size(); + } + return count; + } + @VisibleForTesting + public HashMap> getUsedFpga() { + return usedFpgaByRequestor; + } + + @VisibleForTesting + public int getUsedFpgaCount() { + int count = 0; + for (List l : usedFpgaByRequestor.values()) { + count += l.size(); + } + return count; + } + public static class FpgaAllocation { + + private List allowed = Collections.emptyList(); + + private List denied = Collections.emptyList(); + + FpgaAllocation(List allowed, List denied) { + if (allowed != null) { + this.allowed = ImmutableList.copyOf(allowed); + } + if (denied != null) { + this.denied = ImmutableList.copyOf(denied); + } + } + + public List getAllowed() { + return allowed; + } + + public List getDenied() { + return denied; + } + + @Override + public String toString() { + StringBuilder sb = new StringBuilder(); + sb.append("\nFpgaAllocation\n\tAllowed:\n"); + for (FpgaDevice device : allowed) { + sb.append("\t\t"); + sb.append(device + "\n"); + } + sb.append("\tDenied\n"); + for (FpgaDevice device : denied) { + sb.append("\t\t"); + sb.append(device + "\n"); + } + return sb.toString(); + } + } + + static class FpgaDevice implements Comparable{ + + public String getType() { + return type; + } + + public Integer getMajor() { + return major; + } + + public Integer getMinor() { + return minor; + } + + public String getIPID() { + return IPID; + } + + public void setIPID(String IPID) { + this.IPID = IPID; + } + + private String type; + private Integer major; + private Integer minor; + private String IPID; + + FpgaDevice(String type, Integer major, Integer minor, String IPID) { + this.type = type; + this.major = major; + this.minor = minor; + this.IPID = IPID; + } + + @Override + public boolean equals(Object obj) { + if (this == obj) { + return true; + } + if (obj == null) { + return false; + } + if (!(obj instanceof FpgaDevice)) { + return false; + } + FpgaDevice other = (FpgaDevice)obj; + if (other.getType() == this.type && + other.getMajor() == this.major && + other.getMinor() == this.minor) { + return true; + } + return false; + } + + @Override + public int compareTo(FpgaDevice o) { + return 0; + } + + @Override + public String toString() { + return "FPGAdevice:(Type: " + this.type + ",Major: " + this.major + ", Minor: " + this.minor + ",IPID:" + this.IPID + ")"; + } + } + + public synchronized void addFpga(String type, int majorNumber, int minorNumber, String IPID) { + FpgaDevice newDevice = new FpgaDevice(type, majorNumber, minorNumber, IPID); + if (availableFpga.get(type) == null) { + List list = new LinkedList<>(); + list.add(newDevice); + availableFpga.put(type,list); + } else { + availableFpga.get(type).add(newDevice); + } + LOG.info("Add a FPGADevice: " + newDevice); + } + + public synchronized void updateFpga(String requestor, FpgaAllocation allocation, String newIPID) { + List usedFpgas = usedFpgaByRequestor.get(requestor); + List reconfiguredFpgas = allocation.getAllowed(); + int index; + for (FpgaDevice device : reconfiguredFpgas) { + index = findMatchedFpga(usedFpgas, device); + if (-1 != index) { + usedFpgas.get(index).setIPID(newIPID); + } else { + LOG.warn("unknown reason that no record for this allocated device:" + device); + } + } + } + + private synchronized int findMatchedFpga(List devices, FpgaDevice item) { + int i = 0; + for (; i < devices.size(); i++) { + if (devices.get(i) == item) { + return i; + } + } + return -1; + } + + public synchronized FpgaAllocation assignFpga(String type, long count, String requestor, String preference) { + List currentAvailableFpga = availableFpga.get(type); + if (null == currentAvailableFpga) { + LOG.warn("No such type of resource available: " + type); + } + if (count <= 0 || count > currentAvailableFpga.size()) { + LOG.warn("Invalid request count or no enough FPGA:" + count + ", available:" + getAvailableFpgaCount()); + return null; + } + List assignedFpgas = new ArrayList<>(); + int matchIPCount = 0; + for (FpgaDevice device : currentAvailableFpga) { + if (device.getIPID() == preference) { + assignedFpgas.add(device); + currentAvailableFpga.remove(device); + matchIPCount++; + } + } + int remaining = (int)count - matchIPCount; + while (remaining > 0) { + assignedFpgas.add(currentAvailableFpga.remove(0)); + remaining--; + } + if (null == usedFpgaByRequestor.get(requestor)) { + usedFpgaByRequestor.put(requestor, assignedFpgas); + } else { + usedFpgaByRequestor.get(requestor).addAll(assignedFpgas); + } + + return new FpgaAllocation(assignedFpgas, currentAvailableFpga); + } + + public synchronized void cleanupAssignFpgas(String requestor) { + List usedFpgas = usedFpgaByRequestor.get(requestor); + for (FpgaDevice device : usedFpgas) { + availableFpga.get(device.getType()).add(device);//add back to availableFpga + } + usedFpgaByRequestor.remove(requestor); + } + +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/linux/resources/Fpga/FpgaResourceHandler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/linux/resources/Fpga/FpgaResourceHandler.java new file mode 100644 index 0000000..a150fee --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/linux/resources/Fpga/FpgaResourceHandler.java @@ -0,0 +1,32 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + + +package org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.resources.Fpga; + +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.classification.InterfaceStability; +import org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.resources.ResourceHandler; + +/** + * Resource handler for cpu resources. + */ +@InterfaceAudience.Private +@InterfaceStability.Unstable +public interface FpgaResourceHandler extends ResourceHandler { +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/linux/resources/Fpga/FpgaResourceHandlerImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/linux/resources/Fpga/FpgaResourceHandlerImpl.java new file mode 100644 index 0000000..f714936 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/linux/resources/Fpga/FpgaResourceHandlerImpl.java @@ -0,0 +1,230 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + + + +package org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.resources.Fpga; + +import com.google.common.annotations.VisibleForTesting; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.classification.InterfaceStability; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.yarn.api.records.ContainerId; +import org.apache.hadoop.yarn.api.records.ResourceInformation; +import org.apache.hadoop.yarn.conf.YarnConfiguration; +import org.apache.hadoop.yarn.exceptions.YarnRuntimeException; +import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container; +import org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.privileged.PrivilegedOperation; +import org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.resources.CGroupsHandler; +import org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.resources.ResourceHandlerException; +import org.apache.hadoop.yarn.util.resource.ResourceUtils; + +import java.lang.reflect.Constructor; +import java.lang.reflect.InvocationTargetException; +import java.util.ArrayList; +import java.util.List; +import java.util.Map; + +@InterfaceStability.Unstable +@InterfaceAudience.Private +public class FpgaResourceHandlerImpl implements FpgaResourceHandler { + + static final Log LOG = LogFactory.getLog(FpgaResourceHandlerImpl.class); + + private final String REQUEST_FPGA_IP_ID_KEY = "REQUESTED_FPGA_IP_ID"; + + private FpgaPluginChain pluginChain; + + private FpgaResourceAllocator allocator; + + private CGroupsHandler cGroupsHandler; + + public FpgaResourceHandlerImpl(CGroupsHandler cGroupsHandler, Configuration conf) { + + LOG.info("FPGA Plugin Chain init."); + + allocator = new FpgaResourceAllocator(); + //init all plugins based on configurations or hardcode + pluginChain = new FpgaPluginChain(); + + String[] fpgaPluginClassStrs = conf.getStrings(YarnConfiguration.NM_FPGA_PLUGIN_CLASS); + if(fpgaPluginClassStrs == null) { + LOG.info("No FPGA plugin can be loaded."); + } else { + + for (String fpgaPluginClass : fpgaPluginClassStrs) { + LOG.info("FPGA Plugin Class " + fpgaPluginClass); + try { + Constructor constructor = Class.forName(fpgaPluginClass).getConstructor(); + AbstractFpgaPlugin fpgaPlugin = (AbstractFpgaPlugin) constructor.newInstance(); + pluginChain.addPlugin(fpgaPlugin); + LOG.info(fpgaPluginClass + " loaded"); + } catch (NoSuchMethodException | ClassNotFoundException | IllegalAccessException | InstantiationException | InvocationTargetException e) { + throw new YarnRuntimeException(e); + } + } + } + this.cGroupsHandler = cGroupsHandler; + } + + @VisibleForTesting + public void addFpgaPlugin(AbstractFpgaPlugin plugin) { + pluginChain.addPlugin(plugin); + } + + @VisibleForTesting + public static String getDeviceDeniedValue(int deviceMajorNumber, int deviceMinorNumber) { + String val = String.format("c %d:%d rwm", deviceMajorNumber, deviceMinorNumber); + LOG.info("Add denied devices to cgroups:" + val); + return val; + } + + @VisibleForTesting + public FpgaResourceAllocator getFpgaAllocator() { + return allocator; + } + + public String getRequestedIPID(Container container) { + Map envs = container.getLaunchContext().getEnvironment(); + return envs.get(REQUEST_FPGA_IP_ID_KEY); + } + + @Override + public List bootstrap(Configuration configuration) throws ResourceHandlerException { + // get vendor plugin type, major and minor number from configuration + // add FPGA devices to allocator + if (!pluginChain.initPlugin()){ + throw new ResourceHandlerException("Fpga plugin initialization failed", null); + } + //get major number and minor number from configuration node-resource.xml + Map allowed = ResourceUtils.getResourceTypeAllowedValue(configuration); + for (AbstractFpgaPlugin plugin : pluginChain.getPlugins()) { + if (allowed.keySet().contains(plugin.getFpgaType())) { + String temp = allowed.get(plugin.getFpgaType()); + String[] parts = temp.split(","); + for (String deviceNumber : parts) { + String[] majorAndMinor = deviceNumber.split(":"); + if (majorAndMinor.length == 2) { + int major = Integer.parseInt(majorAndMinor[0]); + int minor = Integer.parseInt(majorAndMinor[1]); + allocator.addFpga(plugin.getFpgaType(), major, minor, + plugin.getExistingIPID(major, minor)); + } else { + LOG.warn("wrong format of allowed device value:" + temp); + } + } + } else { + LOG.warn("no allowed device number configured for FPGA plugin:" + plugin.getFpgaType()); + } + } + return null; + } + + @Override + public List preStart(Container container) throws ResourceHandlerException { + //1. get requested FPGA type and count, choose corresponding FPGA vendor plugin + //2. use allocator.assignFpga(type, count) to get FPGAAllocation + //3. downloadIP and configureIP + //4. isolate device + List ret = new ArrayList<>(); + String containerIdStr = container.getContainerId().toString(); + Map requestedResources = container.getResource().getResources(); + + // Create device cgroups for the container + cGroupsHandler.createCGroup(CGroupsHandler.CGroupController.DEVICES, + containerIdStr); + + AbstractFpgaPlugin tempPlugin; + long deviceCount; + String resourceName; + String ipFilePath; + for (Map.Entry entry : requestedResources.entrySet()) { + resourceName = entry.getKey(); + //find the proper plugin to handle the matching resource name + tempPlugin = pluginChain.getPlugin(resourceName); + if (tempPlugin != null) { + //we have plugin that can handle this FPGA device request, allocate real device + deviceCount = entry.getValue().getValue(); + FpgaResourceAllocator.FpgaAllocation allocation = allocator.assignFpga( + resourceName, deviceCount, containerIdStr, getRequestedIPID(container)); + LOG.info("FpgaAllocation:" + allocation); + if (null == allocation) { + LOG.warn("null allocation for FPGA type: " + resourceName + ", requestor:" + containerIdStr); + throw new ResourceHandlerException("Not enough devices! request:" + deviceCount + ",available:" + allocator.getAvailableFpgaCount()); + } + try { + //update cgroup device param + for (FpgaResourceAllocator.FpgaDevice device : allocation.getDenied()) { + cGroupsHandler.updateCGroupParam( + CGroupsHandler.CGroupController.DEVICES, containerIdStr, + CGroupsHandler.CGROUP_PARAM_DEVICE_DENY, + getDeviceDeniedValue(device.getMajor(), device.getMinor())); + } + //downloadIp and configure IP + ipFilePath = tempPlugin.downloadIP(getRequestedIPID(container), container.getWorkDir()); + if (null == ipFilePath) { + throw new ResourceHandlerException("Fpga plugin failed to download IP", null); + } + List allowed = allocation.getAllowed(); + List addresses = new ArrayList<>(); + for(int i = 0; i < allowed.size(); i++) { + addresses.add(allowed.get(i).getMajor() + ":" + allowed.get(i).getMinor()); + } + + if (!tempPlugin.configureIP(ipFilePath, addresses)) { + throw new ResourceHandlerException("Fpga plugin failed to configure IP", null); + } + //update the allocator that we update an IP of a device + allocator.updateFpga(containerIdStr, allocation, getRequestedIPID(container)); + //TODO: update the node constraint label + } catch (ResourceHandlerException re) { + cGroupsHandler.deleteCGroup(CGroupsHandler.CGroupController.DEVICES, + containerIdStr); + throw re; + } + //isolation operation + ret.add(new PrivilegedOperation( + PrivilegedOperation.OperationType.ADD_PID_TO_CGROUP, + PrivilegedOperation.CGROUP_ARG_PREFIX + + cGroupsHandler.getPathForCGroupTasks( + CGroupsHandler.CGroupController.DEVICES, containerIdStr))); + }//end if + }//end for + return ret; + } + + @Override + public List reacquireContainer(ContainerId containerId) throws ResourceHandlerException { + return null; + } + + @Override + public List postComplete(ContainerId containerId) throws ResourceHandlerException { + allocator.cleanupAssignFpgas(containerId.toString()); + cGroupsHandler.deleteCGroup(CGroupsHandler.CGroupController.DEVICES, + containerId.toString()); + return null; + } + + @Override + public List teardown() throws ResourceHandlerException { + return null; + } +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/linux/resources/ResourceHandlerModule.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/linux/resources/ResourceHandlerModule.java index 7fc04bd..fb9e361 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/linux/resources/ResourceHandlerModule.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/linux/resources/ResourceHandlerModule.java @@ -28,6 +28,7 @@ import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.privileged.PrivilegedOperationExecutor; +import org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.resources.Fpga.FpgaResourceHandlerImpl; import org.apache.hadoop.yarn.server.nodemanager.util.CgroupsLCEResourcesHandler; import org.apache.hadoop.yarn.server.nodemanager.util.DefaultLCEResourcesHandler; @@ -87,6 +88,18 @@ public static CGroupsHandler getCGroupsHandler() { return cGroupsHandler; } + private static FpgaResourceHandlerImpl getFpgaResourceHandler( + Configuration conf) throws ResourceHandlerException { + boolean fpgaEnabled = conf.getBoolean( + YarnConfiguration.NM_FPGA_RESOURCE_ENABLED, + YarnConfiguration.DEFAULT_NM_FPGA_RESOURCE_ENABLED); + if (fpgaEnabled) { + return new FpgaResourceHandlerImpl( + getInitializedCGroupsHandler(conf), conf); + } + return null; + } + private static CGroupsCpuResourceHandlerImpl getCGroupsCpuResourceHandler( Configuration conf) throws ResourceHandlerException { boolean cgroupsCpuEnabled = @@ -205,6 +218,7 @@ private static void initializeConfiguredResourceHandlerChain( addHandlerIfNotNull(handlerList, getDiskResourceHandler(conf)); addHandlerIfNotNull(handlerList, getMemoryResourceHandler(conf)); addHandlerIfNotNull(handlerList, getCGroupsCpuResourceHandler(conf)); + addHandlerIfNotNull(handlerList, getFpgaResourceHandler(conf)); resourceHandlerChain = new ResourceHandlerChain(handlerList); } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/linux/resources/TestFpgaResourceHandler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/linux/resources/TestFpgaResourceHandler.java new file mode 100644 index 0000000..3fbbe22 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/linux/resources/TestFpgaResourceHandler.java @@ -0,0 +1,163 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.resources; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.yarn.api.records.*; +import org.apache.hadoop.yarn.conf.YarnConfiguration; +import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container; +import org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.resources.Fpga.AbstractFpgaPlugin; +import org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.resources.Fpga.FpgaResourceHandlerImpl; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; +import org.mockito.Mockito; + + +import java.util.HashMap; +import java.util.Map; + +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; + + +public class TestFpgaResourceHandler { + private FpgaResourceHandlerImpl fpgaResourceHandler; + private Configuration configuration; + + /** + * it's better to define allowed devices in the node-resource.xml: + * + * yarn.nodemanager.resource-types.MCP + * 2 + * + * + * yarn.nodemanager.resource-types.MCP.allowed + * 244:0,245:1 + * + * + * yarn.nodemanager.resource-types.DCP + * 2 + * + * + * yarn.nodemanager.resource-types.DCP.allowed + * 100:0,100:1 + * + */ + @Before + public void setup() { + configuration = new YarnConfiguration(); + fpgaResourceHandler = new FpgaResourceHandlerImpl(mock(CGroupsHandler.class), configuration); + configuration.set(YarnConfiguration.NM_RESOURCES_PREFIX + "MCP.allowed", "244:0,245:1"); + } + + @Test + public void testBootstrap() throws ResourceHandlerException { + fpgaResourceHandler.bootstrap(configuration); + Assert.assertEquals(1, + fpgaResourceHandler.getFpgaAllocator().getAvailableFpga().keySet().size()); + Assert.assertEquals(2, + fpgaResourceHandler.getFpgaAllocator().getAvailableFpgaCount()); + + } + + @Test + public void testPreStartWithOnePlugin() throws ResourceHandlerException { + fpgaResourceHandler.bootstrap(configuration); + //the id-0 container request 1 FPGA of MCP type and GEMM IP + fpgaResourceHandler.preStart(mockContainer(0,"MCP",1,"GEMM")); + Assert.assertEquals(1, fpgaResourceHandler.getFpgaAllocator().getUsedFpgaCount()); + //the id-1 container request 2 FPGA of MCP and GEMM IP. this should failed + try{ + fpgaResourceHandler.preStart(mockContainer(1,"MCP",2,"GEMM")); + } catch (ResourceHandlerException e) { + Assert.assertTrue(true); + } + //release the id-0 container + fpgaResourceHandler.postComplete(getContainerId(0)); + Assert.assertEquals(0, fpgaResourceHandler.getFpgaAllocator().getUsedFpgaCount()); + Assert.assertEquals(2, fpgaResourceHandler.getFpgaAllocator().getAvailableFpgaCount()); + //re-allocate for the id-1 container + fpgaResourceHandler.preStart(mockContainer(1,"MCP",2,"GEMM")); + Assert.assertEquals(2, fpgaResourceHandler.getFpgaAllocator().getUsedFpgaCount()); + Assert.assertEquals(0, fpgaResourceHandler.getFpgaAllocator().getAvailableFpgaCount()); + //release container id-1 + fpgaResourceHandler.postComplete(getContainerId(1)); + //id-2 and id-3 + fpgaResourceHandler.preStart(mockContainer(2,"MCP",1,"GEMM")); + fpgaResourceHandler.preStart(mockContainer(3,"MCP",1,"GEMM")); + Assert.assertEquals(2, fpgaResourceHandler.getFpgaAllocator().getUsedFpgaCount()); + Assert.assertEquals(0, fpgaResourceHandler.getFpgaAllocator().getAvailableFpgaCount()); + fpgaResourceHandler.postComplete(getContainerId(2)); + fpgaResourceHandler.postComplete(getContainerId(3)); + + } + + @Test + public void testPreStartWithMultiplePlugins() throws ResourceHandlerException { + fpgaResourceHandler.addFpgaPlugin(mockPlugin("DCP")); + configuration.set(YarnConfiguration.NM_RESOURCES_PREFIX + "DCP.allowed", "100:0,100:1"); + fpgaResourceHandler.bootstrap(configuration); + Assert.assertEquals(2, + fpgaResourceHandler.getFpgaAllocator().getAvailableFpga().keySet().size()); + Assert.assertEquals(4, + fpgaResourceHandler.getFpgaAllocator().getAvailableFpgaCount()); + fpgaResourceHandler.preStart(mockContainer(0,"MCP",2,"GEMM")); + fpgaResourceHandler.preStart(mockContainer(1,"DCP",1,"LZO")); + Assert.assertEquals(0, fpgaResourceHandler.getFpgaAllocator().getAvailableFpga().get("MCP").size()); + Assert.assertEquals(2, fpgaResourceHandler.getFpgaAllocator().getUsedFpga().get(getContainerId(0).toString()).size()); + Assert.assertEquals(1, fpgaResourceHandler.getFpgaAllocator().getAvailableFpga().get("DCP").size()); + Assert.assertEquals(1, fpgaResourceHandler.getFpgaAllocator().getUsedFpga().get(getContainerId(1).toString()).size()); + } + + private static AbstractFpgaPlugin mockPlugin(String type) { + AbstractFpgaPlugin plugin = mock(AbstractFpgaPlugin.class); + when(plugin.initPlugin()).thenReturn(true); + when(plugin.getFpgaType()).thenReturn(type); + when(plugin.getExistingIPID(Mockito.anyInt(), Mockito.anyInt())).thenReturn("LZO"); + when(plugin.cleanupFpgas(Mockito.anyObject())).thenReturn(true); + when(plugin.downloadIP(Mockito.anyString(), Mockito.anyString())).thenReturn("/tmp"); + when(plugin.configureIP(Mockito.anyString(), Mockito.anyObject())).thenReturn(true); + return plugin; + } + + private static Container mockContainer(int id, String type, int numFpga, String IPID) { + Container c = mock(Container.class); + ResourceInformation resourceInformation = ResourceInformation.newInstance(type,numFpga); + Map map = new HashMap<>(); + map.put(type,resourceInformation); + Resource r = mock(Resource.class); + when(c.getResource()).thenReturn(r); + when(r.getResources()).thenReturn(map); + when(c.getContainerId()).thenReturn(getContainerId(id)); + ContainerLaunchContext clc = mock(ContainerLaunchContext.class); + Map envs = new HashMap<>(); + envs.put("REQUESTED_FPGA_IP_ID", IPID); + when(c.getLaunchContext()).thenReturn(clc); + when(clc.getEnvironment()).thenReturn(envs); + when(c.getWorkDir()).thenReturn("/tmp"); + return c; + } + + private static ContainerId getContainerId(int id) { + return ContainerId.newContainerId(ApplicationAttemptId + .newInstance(ApplicationId.newInstance(1234L, 1), 1), id); + } +}