diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java index 5292a250533..d6e79f74927 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java @@ -3698,6 +3698,13 @@ public static boolean areNodeLabelsEnabled( public static final String DEFAULT_NM_NUMA_AWARENESS_NUMACTL_CMD = "/usr/bin/numactl"; + public static final String NODE_HEARTBEAT_PLIUGIN_CLASS = + NM_PREFIX + "node-heartbeat-plugin.class"; + public static final String DEFAULT_NODE_HEARTBEAT_PLIUGIN_CLASS = + "org.apache.hadoop.yarn.server.resourcemanager.api.NodeHeartBeatPluginImpl"; + public static final String NULL_NODE_HEARTBEAT_PLIUGIN_CLASS = + "org.apache.hadoop.yarn.server.resourcemanager.api.NullNodeHeartBeatPluginImpl"; + public YarnConfiguration() { super(); } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMActiveServiceContext.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMActiveServiceContext.java index 66065e33bae..57af8906f82 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMActiveServiceContext.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMActiveServiceContext.java @@ -30,6 +30,7 @@ import org.apache.hadoop.yarn.api.records.NodeId; import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.event.Dispatcher; +import org.apache.hadoop.yarn.server.resourcemanager.api.NodeHeartBeatPlugin; import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMDelegatedNodeLabelsUpdater; import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager; import org.apache.hadoop.yarn.server.resourcemanager.placement.PlacementManager; @@ -113,6 +114,7 @@ private AllocationTagsManager allocationTagsManager; private PlacementConstraintManager placementConstraintManager; private ResourceProfilesManager resourceProfilesManager; + private NodeHeartBeatPlugin nodeHeartBeatPlugin; public RMActiveServiceContext() { queuePlacementManager = new PlacementManager(); @@ -524,4 +526,12 @@ public void setResourceProfilesManager( ResourceProfilesManager resourceProfilesManager) { this.resourceProfilesManager = resourceProfilesManager; } + + public void setNodeHeartBeatPlugin(NodeHeartBeatPlugin nodeHeartBeatPlugin) { + this.nodeHeartBeatPlugin = nodeHeartBeatPlugin; + } + + public NodeHeartBeatPlugin getNodeHeartBeatPlugin() { + return this.nodeHeartBeatPlugin; + } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMContext.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMContext.java index eb91a311a3a..5ad774b0fc1 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMContext.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMContext.java @@ -29,6 +29,7 @@ import org.apache.hadoop.yarn.conf.ConfigurationProvider; import org.apache.hadoop.yarn.event.Dispatcher; import org.apache.hadoop.yarn.server.resourcemanager.ahs.RMApplicationHistoryWriter; +import org.apache.hadoop.yarn.server.resourcemanager.api.NodeHeartBeatPlugin; import org.apache.hadoop.yarn.server.resourcemanager.metrics.SystemMetricsPublisher; import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager; import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMDelegatedNodeLabelsUpdater; @@ -177,4 +178,8 @@ void setRMDelegatedNodeLabelsUpdater( void setPlacementConstraintManager( PlacementConstraintManager placementConstraintManager); + + NodeHeartBeatPlugin getNodeHeartBeatPlugin(); + + void setNodeHeartBeatPlugin(NodeHeartBeatPlugin nodeHeartBeatPlugin); } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMContextImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMContextImpl.java index 84e0f6f6b58..852222e0e63 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMContextImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMContextImpl.java @@ -35,6 +35,7 @@ import org.apache.hadoop.yarn.conf.ConfigurationProvider; import org.apache.hadoop.yarn.event.Dispatcher; import org.apache.hadoop.yarn.server.resourcemanager.ahs.RMApplicationHistoryWriter; +import org.apache.hadoop.yarn.server.resourcemanager.api.NodeHeartBeatPlugin; import org.apache.hadoop.yarn.server.resourcemanager.metrics.SystemMetricsPublisher; import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager; import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMDelegatedNodeLabelsUpdater; @@ -619,5 +620,15 @@ public String getAppProxyUrl(Configuration conf, ApplicationId applicationId) public void setResourceProfilesManager(ResourceProfilesManager mgr) { this.activeServiceContext.setResourceProfilesManager(mgr); } + + @Override + public void setNodeHeartBeatPlugin(NodeHeartBeatPlugin nodeHeartBeatPlugin) { + this.activeServiceContext.setNodeHeartBeatPlugin(nodeHeartBeatPlugin); + } + + @Override + public NodeHeartBeatPlugin getNodeHeartBeatPlugin() { + return this.activeServiceContext.getNodeHeartBeatPlugin(); + } // Note: Read java doc before adding any services over here. } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java index c53311127c0..645f7737f68 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java @@ -64,6 +64,8 @@ import org.apache.hadoop.yarn.server.resourcemanager.ahs.RMApplicationHistoryWriter; import org.apache.hadoop.yarn.server.resourcemanager.amlauncher.AMLauncherEventType; import org.apache.hadoop.yarn.server.resourcemanager.amlauncher.ApplicationMasterLauncher; +import org.apache.hadoop.yarn.server.resourcemanager.api.BaseNodeHeartBeatPluginImpl; +import org.apache.hadoop.yarn.server.resourcemanager.api.NodeHeartBeatPlugin; import org.apache.hadoop.yarn.server.resourcemanager.federation.FederationStateStoreService; import org.apache.hadoop.yarn.server.resourcemanager.metrics.NoOpSystemMetricPublisher; import org.apache.hadoop.yarn.server.resourcemanager.metrics.SystemMetricsPublisher; @@ -333,6 +335,10 @@ protected void serviceInit(Configuration conf) throws Exception { addIfService(systemMetricsPublisher); rmContext.setSystemMetricsPublisher(systemMetricsPublisher); + NodeHeartBeatPlugin nodeHeartBeatPlugin = BaseNodeHeartBeatPluginImpl + .getNodeHeartBeatPluginImpl(this.conf); + rmContext.setNodeHeartBeatPlugin(nodeHeartBeatPlugin); + super.serviceInit(this.conf); } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/api/BaseNodeHeartBeatPluginImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/api/BaseNodeHeartBeatPluginImpl.java index e69de29bb2d..a9314728aab 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/api/BaseNodeHeartBeatPluginImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/api/BaseNodeHeartBeatPluginImpl.java @@ -0,0 +1,58 @@ +package org.apache.hadoop.yarn.server.resourcemanager.api; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.util.ReflectionUtils; +import org.apache.hadoop.yarn.conf.YarnConfiguration; +import org.apache.hadoop.yarn.exceptions.YarnRuntimeException; +import org.apache.hadoop.yarn.server.resourcemanager.RMContext; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNode; + +public abstract class BaseNodeHeartBeatPluginImpl + implements NodeHeartBeatPlugin { + + private static final Log LOG = + LogFactory.getLog(BaseNodeHeartBeatPluginImpl.class); + + public BaseNodeHeartBeatPluginImpl() { + + } + + @Override + public abstract void onNodeHeartBeat(RMContext rmContext, + SchedulerNode schedulerNode); + + public static NodeHeartBeatPlugin getNodeHeartBeatPluginImpl( + Configuration conf) { + + NodeHeartBeatPlugin plugin = null; + String nodeHeartBeatPluginClass = + conf.get(YarnConfiguration.NODE_HEARTBEAT_PLIUGIN_CLASS, + YarnConfiguration.DEFAULT_NODE_HEARTBEAT_PLIUGIN_CLASS); + LOG.info("Using Node HeartBeat plugin: " + nodeHeartBeatPluginClass); + try { + Class clazz = Class.forName(nodeHeartBeatPluginClass); + if (NodeHeartBeatPlugin.class.isAssignableFrom(clazz)) { + plugin = (NodeHeartBeatPlugin) ReflectionUtils.newInstance(clazz, conf); + + if (plugin == null) { + plugin = (NodeHeartBeatPlugin) ReflectionUtils.newInstance( + Class.forName( + YarnConfiguration.NULL_NODE_HEARTBEAT_PLIUGIN_CLASS), + conf); + } + } else { + throw new YarnRuntimeException( + "Class: " + nodeHeartBeatPluginClass + " not instance of " + + NodeHeartBeatPlugin.class.getCanonicalName()); + } + } catch (ClassNotFoundException e) { + throw new YarnRuntimeException( + "Could not instantiate Node HeartBeat plugin: " + + nodeHeartBeatPluginClass, + e); + } + return plugin; + } +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/api/NodeHeartBeatPlugin.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/api/NodeHeartBeatPlugin.java index e69de29bb2d..a8a428638ab 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/api/NodeHeartBeatPlugin.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/api/NodeHeartBeatPlugin.java @@ -0,0 +1,8 @@ +package org.apache.hadoop.yarn.server.resourcemanager.api; + +import org.apache.hadoop.yarn.server.resourcemanager.RMContext; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNode; + +public interface NodeHeartBeatPlugin { + void onNodeHeartBeat(RMContext rmContext, SchedulerNode schedulerNode); +} \ No newline at end of file diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/api/NodeHeartBeatPluginImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/api/NodeHeartBeatPluginImpl.java index e69de29bb2d..cdf3092cfe0 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/api/NodeHeartBeatPluginImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/api/NodeHeartBeatPluginImpl.java @@ -0,0 +1,24 @@ +package org.apache.hadoop.yarn.server.resourcemanager.api; + +import org.apache.hadoop.yarn.server.resourcemanager.RMContext; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNode; + +public class NodeHeartBeatPluginImpl extends BaseNodeHeartBeatPluginImpl { + + public NodeHeartBeatPluginImpl() { + } + + @Override + public void onNodeHeartBeat(RMContext rmContext, + SchedulerNode schedulerNode) { + updateUnallocatedResource(schedulerNode); + } + + private void updateUnallocatedResource(SchedulerNode schedulerNode) { + + // calculate UnallocatedResource based on different properties (for + // example, isSchedulerExternal()) and update. + // schedulerNode may need to expose corresponding setter method to set + // the value + } +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/api/NullNodeHeartBeatPluginImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/api/NullNodeHeartBeatPluginImpl.java index e69de29bb2d..725a4ff9460 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/api/NullNodeHeartBeatPluginImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/api/NullNodeHeartBeatPluginImpl.java @@ -0,0 +1,16 @@ +package org.apache.hadoop.yarn.server.resourcemanager.api; + +import org.apache.hadoop.yarn.server.resourcemanager.RMContext; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNode; + +public class NullNodeHeartBeatPluginImpl extends BaseNodeHeartBeatPluginImpl { + + public NullNodeHeartBeatPluginImpl() { + } + + @Override + public void onNodeHeartBeat(RMContext rmContext, + SchedulerNode schedulerNode) { + // do nothing + } +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AbstractYarnScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AbstractYarnScheduler.java index d2e81a50d94..239ce7f6d94 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AbstractYarnScheduler.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AbstractYarnScheduler.java @@ -65,6 +65,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.RMAppManagerEventType; import org.apache.hadoop.yarn.server.resourcemanager.RMAuditLogger; import org.apache.hadoop.yarn.server.resourcemanager.RMAuditLogger.AuditConstants; +import org.apache.hadoop.yarn.server.resourcemanager.api.BaseNodeHeartBeatPluginImpl; import org.apache.hadoop.yarn.server.resourcemanager.RMContext; import org.apache.hadoop.yarn.server.resourcemanager.RMCriticalThreadUncaughtExceptionHandler; import org.apache.hadoop.yarn.server.resourcemanager.RMServerUtils; @@ -1139,6 +1140,9 @@ protected void nodeUpdate(RMNode nm) { updateNodeResourceUtilization(nm, schedulerNode); } + this.rmContext.getNodeHeartBeatPlugin().onNodeHeartBeat(rmContext, + schedulerNode); + // Now node data structures are up-to-date and ready for scheduling. if(LOG.isDebugEnabled()) { LOG.debug( @@ -1452,6 +1456,10 @@ public void reinitialize(Configuration conf, RMContext rmContext) try { LOG.info("Reinitializing SchedulingMonitorManager ..."); schedulingMonitorManager.reinitialize(rmContext, conf); + + LOG.info("Reinitializing Node HeartPluginImpl Instance..."); + rmContext.setNodeHeartBeatPlugin( + BaseNodeHeartBeatPluginImpl.getNodeHeartBeatPluginImpl(conf)); } catch (YarnException e) { throw new IOException(e); }