diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
index a7f485d..751c08c 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
@@ -872,6 +872,15 @@ private static void addDeprecatedKeys() {
public static final String DEFAULT_NM_WEBAPP_HTTPS_ADDRESS = "0.0.0.0:"
+ DEFAULT_NM_WEBAPP_HTTPS_PORT;
+  /** How often to monitor the node, in milliseconds.*/
+  public static final String NM_NODE_MON_INTERVAL_MS = NM_PREFIX +
+      "node-monitor.interval-ms";
+  /** Default node monitoring interval: 3000 milliseconds.*/
+  public final static int DEFAULT_NM_NODE_MON_INTERVAL_MS = 3000;
+
+  /** Class that calculates the node's current resource utilization.*/
+  public static final String NM_NODE_MON_RESOURCE_CALCULATOR =
+      NM_PREFIX + "node-monitor.resource-calculator.class";
+
/** How often to monitor containers.*/
public final static String NM_CONTAINER_MON_INTERVAL_MS =
NM_PREFIX + "container-monitor.interval-ms";
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_protos.proto b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_protos.proto
index 50d7165..c6debc7 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_protos.proto
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_protos.proto
@@ -58,6 +58,11 @@ message ResourceProto {
optional int32 virtual_cores = 2;
}
+// Resource utilization record: a memory amount plus a fractional number of
+// virtual cores (note the float, unlike ResourceProto's int32 virtual_cores).
+message ResourceUtilizationProto {
+  // Memory in use. NOTE(review): the node monitor added in this change
+  // converts bytes to MB before building the record - confirm MB is the
+  // intended unit for every producer.
+  optional int32 memory = 1;
+  // CPU in use, expressed in virtual cores; fractional values are allowed.
+  optional float virtual_cores = 2;
+}
+
message ResourceOptionProto {
optional ResourceProto resource = 1;
optional int32 over_commit_timeout = 2;
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/ResourceUtilizationPBImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/ResourceUtilizationPBImpl.java
index e646948..c2f5282 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/ResourceUtilizationPBImpl.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/ResourceUtilizationPBImpl.java
@@ -53,8 +53,7 @@ private void maybeInitBuilder() {
}
viaProto = false;
}
-
-
+
@Override
public int getMemory() {
ResourceUtilizationProtoOrBuilder p = viaProto ? proto : builder;
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/resource/ResourceUtilizationCalculator.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/resource/ResourceUtilizationCalculator.java
index c8d9a6a..6dbae2b 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/resource/ResourceUtilizationCalculator.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/resource/ResourceUtilizationCalculator.java
@@ -48,20 +48,20 @@ public static int roundDown(int a, int b) {
/**
* Compute the number of containers which can be allocated given
- * available and required resources.
+ * available and required resource utilizations.
*
- * @param available available resources
- * @param required required resources
+ * @param available available resource utilizations
+ * @param required required resource utilizations
* @return number of containers which can be allocated
*/
public abstract int computeAvailableContainers(
ResourceUtilization available, ResourceUtilization required);
/**
- * Multiply resource r by factor by
+   * Multiply resource utilization <code>r</code> by factor <code>by</code>
* and normalize up using step-factor stepFactor.
*
- * @param r resource to be multiplied
+ * @param r resource utilization to be multiplied
* @param by multiplier
* @param stepFactor factor by which to normalize up
* @return resulting normalized resource
@@ -70,10 +70,10 @@ public abstract ResourceUtilization multiplyAndNormalizeUp(
ResourceUtilization r, double by, ResourceUtilization stepFactor);
/**
- * Multiply resource r by factor by
+   * Multiply resource utilization <code>r</code> by factor <code>by</code>
* and normalize down using step-factor stepFactor.
*
- * @param r resource to be multiplied
+ * @param r resource utilization to be multiplied
* @param by multiplier
* @param stepFactor factor by which to normalize down
* @return resulting normalized resource
@@ -82,58 +82,69 @@ public abstract ResourceUtilization multiplyAndNormalizeDown(
ResourceUtilization r, double by, ResourceUtilization stepFactor);
/**
- * Normalize resource r given the base
+ * Normalize resource utilization r given the base
* minimumResourceUtilization and verify against max allowed
* maximumResourceUtilization
*
- * @param r resource
+ * @param r resource utilization
* @param minimumResourceUtilization step-factor
- * @param maximumResourceUtilization the upper bound of the resource to be allocated
+ * @param maximumResourceUtilization the upper bound of the resource to be
+ * allocated
* @return normalized resource
*/
- public ResourceUtilization normalize(ResourceUtilization r, ResourceUtilization minimumResourceUtilization,
+ public ResourceUtilization normalize(ResourceUtilization r,
+ ResourceUtilization minimumResourceUtilization,
ResourceUtilization maximumResourceUtilization) {
- return normalize(r, minimumResourceUtilization, maximumResourceUtilization, minimumResourceUtilization);
+ return normalize(r, minimumResourceUtilization, maximumResourceUtilization,
+ minimumResourceUtilization);
}
/**
- * Normalize resource r given the base
+ * Normalize resource utilization r given the base
* minimumResourceUtilization and verify against max allowed
- * maximumResourceUtilization using a step factor for hte normalization.
+ * maximumResourceUtilization using a step factor for the
+ * normalization.
*
- * @param r resource
+ * @param r resource utilization
* @param minimumResourceUtilization minimum value
- * @param maximumResourceUtilization the upper bound of the resource to be allocated
- * @param stepFactor the increment for resources to be allocated
+ * @param maximumResourceUtilization the upper bound of the resource to be
+ * allocated
+ * @param stepFactor the increment for resource utilizations to be allocated
* @return normalized resource
*/
- public abstract ResourceUtilization normalize(ResourceUtilization r, ResourceUtilization minimumResourceUtilization,
- ResourceUtilization maximumResourceUtilization,
- ResourceUtilization stepFactor);
+ public abstract ResourceUtilization normalize(ResourceUtilization r,
+ ResourceUtilization minimumResourceUtilization,
+ ResourceUtilization maximumResourceUtilization,
+ ResourceUtilization stepFactor);
/**
- * Round-up resource r given factor stepFactor.
+   * Round-up resource utilization <code>r</code> given factor
+   * <code>stepFactor</code>.
*
- * @param r resource
+ * @param r resource utilization
* @param stepFactor step-factor
* @return rounded resource
*/
- public abstract ResourceUtilization roundUp(ResourceUtilization r, ResourceUtilization stepFactor);
+ public abstract ResourceUtilization roundUp(ResourceUtilization r,
+ ResourceUtilization stepFactor);
/**
- * Round-down resource r given factor stepFactor.
+   * Round-down resource utilization <code>r</code> given factor
+   * <code>stepFactor</code>.
*
- * @param r resource
+ * @param r resource utilization
* @param stepFactor step-factor
* @return rounded resource
*/
- public abstract ResourceUtilization roundDown(ResourceUtilization r, ResourceUtilization stepFactor);
+ public abstract ResourceUtilization roundDown(ResourceUtilization r,
+ ResourceUtilization stepFactor);
/**
- * Divide resource numerator by resource denominator
- * using specified policy (domination, average, fairness etc.); hence overall
- * clusterResourceUtilization is provided for context.
+   * Divide resource utilization <code>numerator</code> by resource utilization
+   * <code>denominator</code> using specified policy (domination, average,
+   * fairness etc.); hence overall <code>clusterResourceUtilization</code> is
+   * provided for context.
*
* @param clusterResourceUtilization cluster resources
* @param numerator numerator
@@ -142,13 +153,14 @@ public abstract ResourceUtilization normalize(ResourceUtilization r, ResourceUti
* using specific policy
*/
public abstract float divide(
- ResourceUtilization clusterResourceUtilization, ResourceUtilization numerator, ResourceUtilization denominator);
+ ResourceUtilization clusterResourceUtilization,
+ ResourceUtilization numerator, ResourceUtilization denominator);
/**
- * Determine if a resource is not suitable for use as a divisor
+ * Determine if a resource utilization is not suitable for use as a divisor
* (will result in divide by 0, etc)
*
- * @param r resource
+ * @param r resource utilization
* @return true if divisor is invalid (should not be used), false else
*/
public abstract boolean isInvalidDivisor(ResourceUtilization r);
@@ -167,8 +179,9 @@ public abstract float divide(
*
* @param numerator numerator resource
* @param denominator denominator
- * @return resultant resource
+ * @return resultant resource utilization
*/
- public abstract ResourceUtilization divideAndCeil(ResourceUtilization numerator, int denominator);
+ public abstract ResourceUtilization divideAndCeil(
+ ResourceUtilization numerator, int denominator);
}
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/records/NodeHealthStatus.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/records/NodeHealthStatus.java
index b21b880..ba37166 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/records/NodeHealthStatus.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/records/NodeHealthStatus.java
@@ -23,6 +23,7 @@
import org.apache.hadoop.classification.InterfaceStability.Unstable;
import org.apache.hadoop.yarn.api.ApplicationClientProtocol;
import org.apache.hadoop.yarn.api.records.NodeReport;
+import org.apache.hadoop.yarn.api.records.ResourceUtilization;
import org.apache.hadoop.yarn.util.Records;
/**
@@ -90,4 +91,16 @@ public static NodeHealthStatus newInstance(boolean isNodeHealthy,
@Private
@Unstable
public abstract void setLastHealthReportTime(long lastHealthReport);
+
+  /**
+   * Get the resource utilization of the node.
+   *
+   * @return resource utilization of the node. NOTE(review): the
+   *         protobuf-backed implementation added alongside this method
+   *         returns <code>null</code> when no utilization has been set, so
+   *         callers should be prepared for that.
+   */
+  @Public
+  @Stable
+  public abstract ResourceUtilization getUtilization();
+
+  /**
+   * Set the resource utilization of the node.
+   *
+   * @param utilization resource utilization of the node. NOTE(review): the
+   *                    protobuf-backed implementation treats
+   *                    <code>null</code> as "clear the stored value".
+   */
+  @Private
+  @Unstable
+  public abstract void setUtilization(ResourceUtilization utilization);
}
\ No newline at end of file
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/records/impl/pb/NodeHealthStatusPBImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/records/impl/pb/NodeHealthStatusPBImpl.java
index 75aa3d1..9d89fc9 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/records/impl/pb/NodeHealthStatusPBImpl.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/records/impl/pb/NodeHealthStatusPBImpl.java
@@ -18,6 +18,9 @@
package org.apache.hadoop.yarn.server.api.records.impl.pb;
+import org.apache.hadoop.yarn.api.records.ResourceUtilization;
+import org.apache.hadoop.yarn.api.records.impl.pb.ResourceUtilizationPBImpl;
+import org.apache.hadoop.yarn.proto.YarnProtos.ResourceUtilizationProto;
import org.apache.hadoop.yarn.proto.YarnServerCommonProtos.NodeHealthStatusProto;
import org.apache.hadoop.yarn.proto.YarnServerCommonProtos.NodeHealthStatusProtoOrBuilder;
import org.apache.hadoop.yarn.server.api.records.NodeHealthStatus;
@@ -128,4 +131,31 @@ public void setLastHealthReportTime(long lastHealthReport) {
this.builder.setLastHealthReportTime((lastHealthReport));
}
+  /**
+   * Read the node utilization from whichever backing object is current.
+   *
+   * @return the stored utilization, or <code>null</code> if none is present
+   */
+  @Override
+  public ResourceUtilization getUtilization() {
+    final NodeHealthStatusProtoOrBuilder source;
+    if (this.viaProto) {
+      source = this.proto;
+    } else {
+      source = this.builder;
+    }
+    return source.hasUtilization()
+        ? convertFromProtoFormat(source.getUtilization())
+        : null;
+  }
+
+  /**
+   * Store the node utilization in the builder.
+   *
+   * @param utilization value to store; <code>null</code> clears the field
+   */
+  @Override
+  public void setUtilization(ResourceUtilization utilization) {
+    maybeInitBuilder();
+    if (utilization != null) {
+      this.builder.setUtilization(convertToProtoFormat(utilization));
+    } else {
+      this.builder.clearUtilization();
+    }
+  }
+
+ private ResourceUtilizationProto convertToProtoFormat(ResourceUtilization r) {
+ return ((ResourceUtilizationPBImpl) r).getProto();
+ }
+
+ private ResourceUtilizationPBImpl convertFromProtoFormat(ResourceUtilizationProto p) {
+ return new ResourceUtilizationPBImpl(p);
+ }
}
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/proto/yarn_server_common_protos.proto b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/proto/yarn_server_common_protos.proto
index 99149ac..503ad94 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/proto/yarn_server_common_protos.proto
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/proto/yarn_server_common_protos.proto
@@ -47,6 +47,7 @@ message NodeHealthStatusProto {
optional bool is_node_healthy = 1;
optional string health_report = 2;
optional int64 last_health_report_time = 3;
+ optional ResourceUtilizationProto utilization = 4;
}
message VersionProto {
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeManager.java
index 4a28c6f..e8d0929 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeManager.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeManager.java
@@ -121,7 +121,7 @@ protected NodeLabelsProvider createNodeLabelsProvider(
}
+  /**
+   * Create the node resource monitor service.
+   *
+   * @return a new {@link NodeResourceMonitorImpl} built with the
+   *         <code>context</code> visible in this class
+   */
   protected NodeResourceMonitor createNodeResourceMonitor() {
-    return new NodeResourceMonitorImpl();
+    return new NodeResourceMonitorImpl(context);
   }
protected ContainerManagerImpl createContainerManager(Context context,
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeResourceMonitor.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeResourceMonitor.java
index be13d22..bb9b3de 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeResourceMonitor.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeResourceMonitor.java
@@ -19,7 +19,8 @@
package org.apache.hadoop.yarn.server.nodemanager;
import org.apache.hadoop.service.Service;
+import org.apache.hadoop.yarn.api.records.ResourceUtilization;
+/**
+ * Service that exposes the resource utilization of the node.
+ */
public interface NodeResourceMonitor extends Service {
-
+  /**
+   * Get the resource utilization of the node.
+   *
+   * @return resource utilization of the node. NOTE(review): the
+   *         implementation in this change returns <code>null</code> until
+   *         its monitoring thread has produced a first sample.
+   */
+  public ResourceUtilization getUtilization();
 }
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeResourceMonitorImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeResourceMonitorImpl.java
index ea82546..7aa53f5a 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeResourceMonitorImpl.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeResourceMonitorImpl.java
@@ -18,13 +18,119 @@
package org.apache.hadoop.yarn.server.nodemanager;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.service.AbstractService;
+import org.apache.hadoop.yarn.api.records.ResourceUtilization;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.util.ResourceCalculatorPlugin;
+import org.apache.hadoop.yarn.util.resource.ResourceUtilizations;
public class NodeResourceMonitorImpl extends AbstractService implements
NodeResourceMonitor {
- public NodeResourceMonitorImpl() {
+ final static Log LOG = LogFactory
+ .getLog(NodeResourceMonitorImpl.class);
+
+ private long monitoringInterval;
+ private MonitoringThread monitoringThread;
+
+ private final Context context;
+ private ResourceCalculatorPlugin resourceCalculatorPlugin;
+ private Configuration conf;
+
+ private ResourceUtilization nodeUtilization;
+
+ public NodeResourceMonitorImpl(Context context) {
super(NodeResourceMonitorImpl.class.getName());
+
+ this.context = context;
+
+ this.monitoringThread = new MonitoringThread();
+ }
+
+ protected void serviceInit(Configuration conf) throws Exception {
+ this.conf = conf;
+
+ this.monitoringInterval =
+ conf.getLong(YarnConfiguration.NM_NODE_MON_INTERVAL_MS,
+ YarnConfiguration.DEFAULT_NM_NODE_MON_INTERVAL_MS);
+
+ Class extends ResourceCalculatorPlugin> clazz =
+ conf.getClass(YarnConfiguration.NM_NODE_MON_RESOURCE_CALCULATOR, null,
+ ResourceCalculatorPlugin.class);
+ this.resourceCalculatorPlugin =
+ ResourceCalculatorPlugin.getResourceCalculatorPlugin(clazz, conf);
+ LOG.info(" Using ResourceCalculatorPlugin : "
+ + this.resourceCalculatorPlugin);
+ }
+
+ private boolean isEnabled() {
+ if (resourceCalculatorPlugin == null) {
+ LOG.info("ResourceCalculatorPlugin is unavailable on this system. "
+ + this.getClass().getName() + " is disabled.");
+ return false;
+ }
+ return true;
}
+ @Override
+ protected void serviceStart() throws Exception {
+ if (this.isEnabled()) {
+ this.monitoringThread.start();
+ }
+ super.serviceStart();
+ }
+
+ @Override
+ protected void serviceStop() throws Exception {
+ if (this.isEnabled()) {
+ this.monitoringThread.interrupt();
+ try {
+ this.monitoringThread.join();
+ } catch (InterruptedException e) {
+ ;
+ }
+ }
+ super.serviceStop();
+ }
+
+ private class MonitoringThread extends Thread {
+ public MonitoringThread() {
+ super("Node Resource Monitor");
+ }
+
+ @Override
+ public void run() {
+ while (true) {
+ // Get node utilization and save it into the health status
+ long memory = resourceCalculatorPlugin.getPhysicalMemorySize() -
+ resourceCalculatorPlugin.getAvailablePhysicalMemorySize();
+ float cpu = resourceCalculatorPlugin.getCpuUsage();
+ nodeUtilization = ResourceUtilizations.createResourceUtilization(
+ (int)(memory >> 20), // B -> MB
+ cpu); // CPU% -> VCores (1CPU at 100% is 1.0)
+ // Store this info in the health status to send it to the RM
+ context.getNodeHealthStatus().setUtilization(nodeUtilization);
+
+ LOG.debug("Node utilization: " +
+ context.getNodeHealthStatus().getUtilization() + " CPU=" + cpu +
+ " Memory=" + memory);
+
+ try {
+ Thread.sleep(monitoringInterval);
+ } catch (InterruptedException e) {
+ LOG.warn(NodeResourceMonitorImpl.class.getName()
+ + " is interrupted. Exiting.");
+ break;
+ }
+ }
+ }
+ }
+
+ @Override
+ public ResourceUtilization getUtilization() {
+ return this.nodeUtilization;
+ }
}