diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java index 6488ebfc4ec..8a6d6990e9f 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java @@ -3810,6 +3810,11 @@ public static boolean areNodeLabelsEnabled( public static final String DEFAULT_NM_NUMA_AWARENESS_NUMACTL_CMD = "/usr/bin/numactl"; + public static final String NODE_HEARTBEAT_PLIUGIN_CLASS = + NM_PREFIX + "node-heartbeat-plugin.class"; + public static final String DEFAULT_NODE_HEARTBEAT_PLIUGIN_CLASS = + "org.apache.hadoop.yarn.server.resourcemanager.api.NullNodeHeartBeatPluginImpl"; + public YarnConfiguration() { super(); } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AbstractYarnScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AbstractYarnScheduler.java index 0acfca79110..c2b82f9b079 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AbstractYarnScheduler.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AbstractYarnScheduler.java @@ -38,6 +38,7 @@ import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.service.AbstractService; +import org.apache.hadoop.util.ReflectionUtils; import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.api.records.ApplicationResourceUsageReport; @@ -59,6 +60,7 @@ import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.exceptions.InvalidResourceRequestException; import org.apache.hadoop.yarn.exceptions.YarnException; +import org.apache.hadoop.yarn.exceptions.YarnRuntimeException; import org.apache.hadoop.yarn.proto.YarnServiceProtos.SchedulerResourceTypes; import org.apache.hadoop.yarn.server.api.protocolrecords.NMContainerStatus; import org.apache.hadoop.yarn.server.resourcemanager.RMAppManagerEvent; @@ -88,6 +90,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmnode.UpdatedContainerInfo; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.activities.ActivitiesManager; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.ContainerRequest; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.NodeHeartBeatPlugin; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.QueueEntitlement; @@ -174,6 +177,7 @@ protected SchedulingMonitorManager schedulingMonitorManager = new SchedulingMonitorManager(); + private NodeHeartBeatPlugin nodeHeartBeatPlugin; /** * Construct the service. * @@ -213,6 +217,9 @@ public void serviceInit(Configuration conf) throws Exception { new RMCriticalThreadUncaughtExceptionHandler(rmContext)); updateThread.setDaemon(true); } + + this.nodeHeartBeatPlugin = createNodeHeartBeatPlugin(conf); + super.serviceInit(conf); } @@ -1149,6 +1156,9 @@ protected void nodeUpdate(RMNode nm) { updateNodeResourceUtilization(nm, schedulerNode); } + getNodeHeartBeatPlugin().onNodeHeartBeat(rmContext, + schedulerNode); + // Now node data structures are up-to-date and ready for scheduling. if(LOG.isDebugEnabled()) { LOG.debug( @@ -1475,11 +1485,64 @@ public void reinitialize(Configuration conf, RMContext rmContext) try { LOG.info("Reinitializing SchedulingMonitorManager ..."); schedulingMonitorManager.reinitialize(rmContext, conf); + + String nodeHeartBeatPluginClass = + conf.get(YarnConfiguration.NODE_HEARTBEAT_PLIUGIN_CLASS, + YarnConfiguration.DEFAULT_NODE_HEARTBEAT_PLIUGIN_CLASS); + + if (!getNodeHeartBeatPlugin().getClass().getCanonicalName() + .equals(nodeHeartBeatPluginClass)) { + LOG.info("Reinitializing Node HeartPluginImpl Instance..."); + this.nodeHeartBeatPlugin = createNodeHeartBeatPlugin(conf); + } else { + LOG.info("Refreshing configs of Node HeartPluginImpl Instance..."); + getNodeHeartBeatPlugin().refreshConfigs(conf); + } + } catch (YarnException e) { throw new IOException(e); } } + public static NodeHeartBeatPlugin createNodeHeartBeatPlugin( + Configuration conf) { + + NodeHeartBeatPlugin plugin = null; + String nodeHeartBeatPluginClass = + conf.get(YarnConfiguration.NODE_HEARTBEAT_PLIUGIN_CLASS, + YarnConfiguration.DEFAULT_NODE_HEARTBEAT_PLIUGIN_CLASS); + LOG.info("Using Node HeartBeat plugin: " + nodeHeartBeatPluginClass); + try { + Class clazz = Class.forName(nodeHeartBeatPluginClass); + if (NodeHeartBeatPlugin.class.isAssignableFrom(clazz)) { + plugin = (NodeHeartBeatPlugin) ReflectionUtils.newInstance(clazz, conf); + + if (plugin == null) { + plugin = + (NodeHeartBeatPlugin) ReflectionUtils.newInstance( + Class.forName( + YarnConfiguration.DEFAULT_NODE_HEARTBEAT_PLIUGIN_CLASS), + conf); + } + plugin.init(conf); + } else { + throw new YarnRuntimeException( + "Class: " + nodeHeartBeatPluginClass + " not instance of " + + NodeHeartBeatPlugin.class.getCanonicalName()); + } + } catch (ClassNotFoundException e) { + throw new YarnRuntimeException( + "Could not instantiate Node HeartBeat plugin: " + + nodeHeartBeatPluginClass, + e); + } + return plugin; + } + + public NodeHeartBeatPlugin getNodeHeartBeatPlugin() { + return this.nodeHeartBeatPlugin; + } + /** * Default implementation. Always returns false. * @param appAttempt ApplicationAttempt. diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java index 75d61442577..5df48f01132 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java @@ -2010,6 +2010,8 @@ private void addNode(RMNode nodeManager) { t.beginSchedule(); } } + + getNodeHeartBeatPlugin().addNode(nodeManager); } finally { writeLock.unlock(); } @@ -2062,6 +2064,9 @@ private void removeNode(RMNode nodeInfo) { LOG.info( "Removed node " + nodeInfo.getNodeAddress() + " clusterResource: " + getClusterResource()); + + getNodeHeartBeatPlugin().removeNode(nodeInfo); + } finally { writeLock.unlock(); } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/NodeHeartBeatPlugin.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/NodeHeartBeatPlugin.java index e69de29bb2d..d86d7989edb 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/NodeHeartBeatPlugin.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/NodeHeartBeatPlugin.java @@ -0,0 +1,30 @@ +package org.apache.hadoop.yarn.server.resourcemanager.scheduler.common; + +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.classification.InterfaceStability; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.yarn.server.resourcemanager.RMContext; +import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNode; + +/** + * Interface to define the extra behaviour processing required for node + * heartbeats + * + */ +@InterfaceStability.Unstable +@InterfaceAudience.Private +public interface NodeHeartBeatPlugin { + + boolean init(Configuration conf); + + void stop(); + + void onNodeHeartBeat(RMContext rmContext, SchedulerNode schedulerNode); + + void addNode(RMNode rmNode); + + void removeNode(RMNode rmNode); + + void refreshConfigs(Configuration conf); +} \ No newline at end of file diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/NullNodeHeartBeatPluginImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/NullNodeHeartBeatPluginImpl.java index e69de29bb2d..ef4ca38cb03 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/NullNodeHeartBeatPluginImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/NullNodeHeartBeatPluginImpl.java @@ -0,0 +1,48 @@ +package org.apache.hadoop.yarn.server.resourcemanager.scheduler.common; + +import org.apache.hadoop.classification.InterfaceAudience.Private; +import org.apache.hadoop.classification.InterfaceStability.Unstable; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.yarn.server.resourcemanager.RMContext; +import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNode; + +/** + * Dummy implementation of {@link NodeHeartBeatPlugin}. If this implementation + * is used, no extra behaviour processing would be done with respect to node + * heartbeats. + * + */ +@Unstable +@Private +public class NullNodeHeartBeatPluginImpl implements NodeHeartBeatPlugin { + + public NullNodeHeartBeatPluginImpl() { + } + + @Override + public boolean init(Configuration conf) { + return false; + } + + @Override + public void stop() { + } + + @Override + public void refreshConfigs(Configuration conf) { + } + + @Override + public void onNodeHeartBeat(RMContext rmContext, + SchedulerNode schedulerNode) { + } + + @Override + public void addNode(RMNode rmNode) { + } + + @Override + public void removeNode(RMNode rmNode) { + } +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java index da5e4c9347e..77f331d97b2 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java @@ -787,6 +787,7 @@ private void addNode(List containerReports, recoverContainersOnNode(containerReports, node); updateRootQueueMetrics(); + getNodeHeartBeatPlugin().addNode(node); } finally { writeLock.unlock(); } @@ -828,6 +829,8 @@ private void removeNode(RMNode rmNode) { LOG.info("Removed node " + rmNode.getNodeAddress() + " cluster capacity: " + clusterResource); + + getNodeHeartBeatPlugin().removeNode(rmNode); } finally { writeLock.unlock(); } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/FifoScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/FifoScheduler.java index 8396db54ad8..79539d32039 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/FifoScheduler.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/FifoScheduler.java @@ -891,6 +891,7 @@ private synchronized void removeNode(RMNode nodeInfo) { RMContainerEventType.KILL); } nodeTracker.removeNode(nodeInfo.getNodeID()); + getNodeHeartBeatPlugin().removeNode(nodeInfo); } @Override @@ -913,6 +914,7 @@ private synchronized void addNode(RMNode nodeManager) { FiCaSchedulerNode schedulerNode = new FiCaSchedulerNode(nodeManager, usePortForNodeName); nodeTracker.addNode(schedulerNode); + getNodeHeartBeatPlugin().addNode(nodeManager); } @Override