diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java index 70b87f3..37db6bb 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java @@ -902,14 +902,23 @@ private static void addDeprecatedKeys() { public static final String DEFAULT_NM_WEBAPP_HTTPS_ADDRESS = "0.0.0.0:" + DEFAULT_NM_WEBAPP_HTTPS_PORT; + /** How often to monitor the node.*/ + public static final String NM_NODE_MON_INTERVAL_MS = NM_PREFIX + + "node-monitor.interval-ms"; + public final static int DEFAULT_NM_NODE_MON_INTERVAL_MS = 3000; + + /** Class that calculates nodes current resource utilization.*/ + public static final String NM_NODE_MON_RESOURCE_CALCULATOR = + NM_PREFIX + "node-monitor.resource-calculator.class"; + /** How often to monitor containers.*/ - public final static String NM_CONTAINER_MON_INTERVAL_MS = + public static final String NM_CONTAINER_MON_INTERVAL_MS = NM_PREFIX + "container-monitor.interval-ms"; - public final static int DEFAULT_NM_CONTAINER_MON_INTERVAL_MS = 3000; + public static final int DEFAULT_NM_CONTAINER_MON_INTERVAL_MS = 3000; /** Class that calculates containers current resource utilization.*/ public static final String NM_CONTAINER_MON_RESOURCE_CALCULATOR = - NM_PREFIX + "container-monitor.resource-calculator.class"; + NM_PREFIX + "container-monitor.resource-calculator.class"; /** Class that calculates process tree resource utilization.*/ public static final String NM_CONTAINER_MON_PROCESS_TREE = NM_PREFIX + "container-monitor.process-tree.class"; diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeManager.java index 6718b53..e1a533d 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeManager.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeManager.java @@ -121,7 +121,7 @@ protected NodeLabelsProvider createNodeLabelsProvider( } protected NodeResourceMonitor createNodeResourceMonitor() { - return new NodeResourceMonitorImpl(); + return new NodeResourceMonitorImpl(context); } protected ContainerManagerImpl createContainerManager(Context context, diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeResourceMonitor.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeResourceMonitor.java index be13d22..306dbc8 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeResourceMonitor.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeResourceMonitor.java @@ -19,7 +19,15 @@ package org.apache.hadoop.yarn.server.nodemanager; import org.apache.hadoop.service.Service; +import org.apache.hadoop.yarn.api.records.Resource; +/** + * Interface for monitoring the resources of a node. + */ public interface NodeResourceMonitor extends Service { - + /** + * Get the resource utilization of the node. + * @return resource utilization of the node. + */ + public Resource getUtilization(); } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeResourceMonitorImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeResourceMonitorImpl.java index ea82546..27777b8 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeResourceMonitorImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeResourceMonitorImpl.java @@ -18,13 +18,153 @@ package org.apache.hadoop.yarn.server.nodemanager; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.service.AbstractService; +import org.apache.hadoop.yarn.api.records.Resource; +import org.apache.hadoop.yarn.conf.YarnConfiguration; +import org.apache.hadoop.yarn.util.ResourceCalculatorPlugin; +import org.apache.hadoop.yarn.util.resource.Resources; +/** + * Implementation of the node resource monitor. It periodically tracks the + * resource utilization of the node and reports it to the NM. + */ public class NodeResourceMonitorImpl extends AbstractService implements NodeResourceMonitor { - public NodeResourceMonitorImpl() { + /** Logging infrastructure. */ + final static Log LOG = LogFactory + .getLog(NodeResourceMonitorImpl.class); + + /** Interval to monitor the node resource utilization. */ + private long monitoringInterval; + /** Thread to monitor the node resource utilization. */ + private MonitoringThread monitoringThread; + + /** Node manager context. */ + private final Context context; + /** Resource calculator. */ + private ResourceCalculatorPlugin resourceCalculatorPlugin; + + /** Current resource utilization of the node. */ + private Resource nodeUtilization; + + /** + * Initialize the node resource monitor. + * @param context Node manager context. + */ + public NodeResourceMonitorImpl(Context context) { super(NodeResourceMonitorImpl.class.getName()); + + this.context = context; + + this.monitoringThread = new MonitoringThread(); + } + + /** + * Initialize the service with the proper parameters. + */ + @Override + protected void serviceInit(Configuration conf) throws Exception { + this.monitoringInterval = + conf.getLong(YarnConfiguration.NM_NODE_MON_INTERVAL_MS, + YarnConfiguration.DEFAULT_NM_NODE_MON_INTERVAL_MS); + + Class clazz = + conf.getClass(YarnConfiguration.NM_NODE_MON_RESOURCE_CALCULATOR, null, + ResourceCalculatorPlugin.class); + + this.resourceCalculatorPlugin = + ResourceCalculatorPlugin.getResourceCalculatorPlugin(clazz, conf); + + LOG.info(" Using ResourceCalculatorPlugin : " + + this.resourceCalculatorPlugin); } + /** + * Check if we should be monitoring. + * @return true if we can monitor the node resource utilization. + */ + private boolean isEnabled() { + if (resourceCalculatorPlugin == null) { + LOG.info("ResourceCalculatorPlugin is unavailable on this system. " + + this.getClass().getName() + " is disabled."); + return false; + } + return true; + } + + /** + * Start the thread that does the node resource utilization monitoring. + */ + @Override + protected void serviceStart() throws Exception { + if (this.isEnabled()) { + this.monitoringThread.start(); + } + super.serviceStart(); + } + + /** + * Stop the thread that does the node resource utilization monitoring. + */ + @Override + protected void serviceStop() throws Exception { + if (this.isEnabled()) { + this.monitoringThread.interrupt(); + try { + this.monitoringThread.join(); + } catch (InterruptedException e) { + ; + } + } + super.serviceStop(); + } + + /** + * Thread that monitors the resource utilization of this node. + */ + private class MonitoringThread extends Thread { + /** + * Initialize the node resource monitoring thread. + */ + public MonitoringThread() { + super("Node Resource Monitor"); + } + + /** + * Periodically monitor the resource utilization of the node. + */ + @Override + public void run() { + while (true) { + // Get node utilization and save it into the health status + long memory = resourceCalculatorPlugin.getPhysicalMemorySize() - + resourceCalculatorPlugin.getAvailablePhysicalMemorySize(); + float cpu = resourceCalculatorPlugin.getCpuUsage(); + nodeUtilization = Resources.createResource( + (int)(memory >> 20), // B -> MB + (int)Math.ceil(cpu)); // CPU% -> VCores (1CPU at 100% is 1) + + try { + Thread.sleep(monitoringInterval); + } catch (InterruptedException e) { + LOG.warn(NodeResourceMonitorImpl.class.getName() + + " is interrupted. Exiting."); + break; + } + } + } + } + + /** + * Get the resource utilization of the node. + * @return resource utilization of the node. + */ + @Override + public Resource getUtilization() { + return this.nodeUtilization; + } }