diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java index 5292a250533..d6e79f74927 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java @@ -3698,6 +3698,13 @@ public static boolean areNodeLabelsEnabled( public static final String DEFAULT_NM_NUMA_AWARENESS_NUMACTL_CMD = "/usr/bin/numactl"; + public static final String NODE_HEARTBEAT_PLIUGIN_CLASS = + NM_PREFIX + "node-heartbeat-plugin.class"; + public static final String DEFAULT_NODE_HEARTBEAT_PLIUGIN_CLASS = + "org.apache.hadoop.yarn.server.resourcemanager.api.NodeHeartBeatPluginImpl"; + public static final String NULL_NODE_HEARTBEAT_PLIUGIN_CLASS = + "org.apache.hadoop.yarn.server.resourcemanager.api.NullNodeHeartBeatPluginImpl"; + public YarnConfiguration() { super(); } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/api/BaseNodeHeartBeatPluginImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/api/BaseNodeHeartBeatPluginImpl.java index e69de29bb2d..cb9725987f5 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/api/BaseNodeHeartBeatPluginImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/api/BaseNodeHeartBeatPluginImpl.java @@ -0,0 +1,80 @@ +package org.apache.hadoop.yarn.server.resourcemanager.api; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.util.ReflectionUtils; +import org.apache.hadoop.yarn.conf.YarnConfiguration; +import org.apache.hadoop.yarn.exceptions.YarnRuntimeException; +import org.apache.hadoop.yarn.server.resourcemanager.RMContext; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNode; + +public abstract class BaseNodeHeartBeatPluginImpl + implements NodeHeartBeatPlugin { + + protected static final Log LOG = + LogFactory.getLog(BaseNodeHeartBeatPluginImpl.class); + + protected boolean initialized = false; + protected Configuration conf; + + public BaseNodeHeartBeatPluginImpl() { + + } + + @Override + public boolean init(Configuration conf) { + initialized = true; + this.conf = conf; + return initialized; + } + + @Override + public void stop() { + if (!initialized) { + LOG.error("Plugin is not initialized."); + } + } + + @Override + public void refreshConfigs(Configuration newConf) { + this.conf = newConf; + } + + @Override + public abstract void onNodeHeartBeat(RMContext rmContext, + SchedulerNode schedulerNode); + + public static NodeHeartBeatPlugin getNodeHeartBeatPluginImpl( + Configuration conf) { + + NodeHeartBeatPlugin plugin = null; + String nodeHeartBeatPluginClass = + conf.get(YarnConfiguration.NODE_HEARTBEAT_PLIUGIN_CLASS, + YarnConfiguration.DEFAULT_NODE_HEARTBEAT_PLIUGIN_CLASS); + LOG.info("Using Node HeartBeat plugin: " + nodeHeartBeatPluginClass); + try { + Class clazz = Class.forName(nodeHeartBeatPluginClass); + if (NodeHeartBeatPlugin.class.isAssignableFrom(clazz)) { + plugin = (NodeHeartBeatPlugin) ReflectionUtils.newInstance(clazz, conf); + + if (plugin == null) { + plugin = (NodeHeartBeatPlugin) ReflectionUtils.newInstance( + Class.forName( + YarnConfiguration.NULL_NODE_HEARTBEAT_PLIUGIN_CLASS), + conf); + } + } else { + throw new YarnRuntimeException( + "Class: " + nodeHeartBeatPluginClass + " not instance of " + + NodeHeartBeatPlugin.class.getCanonicalName()); + } + } catch (ClassNotFoundException e) { + throw new YarnRuntimeException( + "Could not instantiate Node HeartBeat plugin: " + + nodeHeartBeatPluginClass, + e); + } + return plugin; + } +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/api/NodeHeartBeatPlugin.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/api/NodeHeartBeatPlugin.java index e69de29bb2d..f5a4b6a1cab 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/api/NodeHeartBeatPlugin.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/api/NodeHeartBeatPlugin.java @@ -0,0 +1,21 @@ +package org.apache.hadoop.yarn.server.resourcemanager.api; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.yarn.server.resourcemanager.RMContext; +import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNode; + +public interface NodeHeartBeatPlugin { + + boolean init(Configuration conf); + + void stop(); + + void onNodeHeartBeat(RMContext rmContext, SchedulerNode schedulerNode); + + void addNode(RMNode rmNode); + + void removeNode(RMNode rmNode); + + void refreshConfigs(Configuration conf); +} \ No newline at end of file diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/api/NodeHeartBeatPluginImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/api/NodeHeartBeatPluginImpl.java index e69de29bb2d..11844c010ef 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/api/NodeHeartBeatPluginImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/api/NodeHeartBeatPluginImpl.java @@ -0,0 +1,33 @@ +package org.apache.hadoop.yarn.server.resourcemanager.api; + +import org.apache.hadoop.yarn.server.resourcemanager.RMContext; +import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNode; + +public class NodeHeartBeatPluginImpl extends BaseNodeHeartBeatPluginImpl { + + public NodeHeartBeatPluginImpl() { + } + + @Override + public void onNodeHeartBeat(RMContext rmContext, + SchedulerNode schedulerNode) { + updateUnallocatedResource(schedulerNode); + } + + private void updateUnallocatedResource(SchedulerNode schedulerNode) { + + // calculate UnallocatedResource based on different properties (for + // example, isSchedulerExternal()) and update. + // schedulerNode may need to expose corresponding setter method to set + // the value + } + + @Override + public void addNode(RMNode rmNode) { + } + + @Override + public void removeNode(RMNode rmNode) { + } +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/api/NullNodeHeartBeatPluginImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/api/NullNodeHeartBeatPluginImpl.java index e69de29bb2d..4c06c51a279 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/api/NullNodeHeartBeatPluginImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/api/NullNodeHeartBeatPluginImpl.java @@ -0,0 +1,27 @@ +package org.apache.hadoop.yarn.server.resourcemanager.api; + +import org.apache.hadoop.yarn.server.resourcemanager.RMContext; +import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNode; + +public class NullNodeHeartBeatPluginImpl extends BaseNodeHeartBeatPluginImpl { + + public NullNodeHeartBeatPluginImpl() { + } + + @Override + public void onNodeHeartBeat(RMContext rmContext, + SchedulerNode schedulerNode) { + // do nothing + } + + @Override + public void addNode(RMNode rmNode) { + // do nothing + } + + @Override + public void removeNode(RMNode rmNode) { + // do nothing + } +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AbstractYarnScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AbstractYarnScheduler.java index d2e81a50d94..2ac38c7d30b 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AbstractYarnScheduler.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AbstractYarnScheduler.java @@ -65,6 +65,8 @@ import org.apache.hadoop.yarn.server.resourcemanager.RMAppManagerEventType; import org.apache.hadoop.yarn.server.resourcemanager.RMAuditLogger; import org.apache.hadoop.yarn.server.resourcemanager.RMAuditLogger.AuditConstants; +import org.apache.hadoop.yarn.server.resourcemanager.api.BaseNodeHeartBeatPluginImpl; +import org.apache.hadoop.yarn.server.resourcemanager.api.NodeHeartBeatPlugin; import org.apache.hadoop.yarn.server.resourcemanager.RMContext; import org.apache.hadoop.yarn.server.resourcemanager.RMCriticalThreadUncaughtExceptionHandler; import org.apache.hadoop.yarn.server.resourcemanager.RMServerUtils; @@ -1139,6 +1141,9 @@ protected void nodeUpdate(RMNode nm) { updateNodeResourceUtilization(nm, schedulerNode); } + this.rmContext.getNodeHeartBeatPlugin().onNodeHeartBeat(rmContext, + schedulerNode); + // Now node data structures are up-to-date and ready for scheduling. if(LOG.isDebugEnabled()) { LOG.debug( @@ -1452,6 +1457,23 @@ public void reinitialize(Configuration conf, RMContext rmContext) try { LOG.info("Reinitializing SchedulingMonitorManager ..."); schedulingMonitorManager.reinitialize(rmContext, conf); + + String nodeHeartBeatPluginClass = + conf.get(YarnConfiguration.NODE_HEARTBEAT_PLIUGIN_CLASS, + YarnConfiguration.DEFAULT_NODE_HEARTBEAT_PLIUGIN_CLASS); + + if (!rmContext.getNodeHeartBeatPlugin().getClass().getCanonicalName() + .equals(nodeHeartBeatPluginClass)) { + LOG.info("Reinitializing Node HeartPluginImpl Instance..."); + NodeHeartBeatPlugin nodeHeartBeatPlugin = + BaseNodeHeartBeatPluginImpl.getNodeHeartBeatPluginImpl(conf); + nodeHeartBeatPlugin.init(conf); + rmContext.setNodeHeartBeatPlugin(nodeHeartBeatPlugin); + } else { + LOG.info("Refreshing configs of Node HeartPluginImpl Instance..."); + rmContext.getNodeHeartBeatPlugin().refreshConfigs(conf); + } + } catch (YarnException e) { throw new IOException(e); } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java index 50ab70d03ec..04f38ef6052 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java @@ -1884,6 +1884,8 @@ private void addNode(RMNode nodeManager) { t.beginSchedule(); } } + + this.rmContext.getNodeHeartBeatPlugin().addNode(nodeManager); } finally { writeLock.unlock(); } @@ -1936,6 +1938,9 @@ private void removeNode(RMNode nodeInfo) { LOG.info( "Removed node " + nodeInfo.getNodeAddress() + " clusterResource: " + getClusterResource()); + + this.rmContext.getNodeHeartBeatPlugin().removeNode(nodeInfo); + } finally { writeLock.unlock(); } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java index eb9f6af7101..12e24a9402a 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java @@ -784,6 +784,7 @@ private void addNode(List containerReports, recoverContainersOnNode(containerReports, node); updateRootQueueMetrics(); + this.rmContext.getNodeHeartBeatPlugin().addNode(node); } finally { writeLock.unlock(); } @@ -825,6 +826,8 @@ private void removeNode(RMNode rmNode) { LOG.info("Removed node " + rmNode.getNodeAddress() + " cluster capacity: " + clusterResource); + + this.rmContext.getNodeHeartBeatPlugin().removeNode(rmNode); } finally { writeLock.unlock(); } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/FifoScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/FifoScheduler.java index 8396db54ad8..34fcdb5603a 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/FifoScheduler.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/FifoScheduler.java @@ -891,6 +891,7 @@ private synchronized void removeNode(RMNode nodeInfo) { RMContainerEventType.KILL); } nodeTracker.removeNode(nodeInfo.getNodeID()); + this.rmContext.getNodeHeartBeatPlugin().removeNode(nodeInfo); } @Override @@ -913,6 +914,7 @@ private synchronized void addNode(RMNode nodeManager) { FiCaSchedulerNode schedulerNode = new FiCaSchedulerNode(nodeManager, usePortForNodeName); nodeTracker.addNode(schedulerNode); + this.rmContext.getNodeHeartBeatPlugin().addNode(nodeManager); } @Override