diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java index a7f485d..751c08c 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java @@ -872,6 +872,15 @@ private static void addDeprecatedKeys() { public static final String DEFAULT_NM_WEBAPP_HTTPS_ADDRESS = "0.0.0.0:" + DEFAULT_NM_WEBAPP_HTTPS_PORT; + /** How often to monitor the node.*/ + public static final String NM_NODE_MON_INTERVAL_MS = NM_PREFIX + + "node-monitor.interval-ms"; + public final static int DEFAULT_NM_NODE_MON_INTERVAL_MS = 3000; + + /** Class that calculates nodes current resource utilization.*/ + public static final String NM_NODE_MON_RESOURCE_CALCULATOR = + NM_PREFIX + "node-monitor.resource-calculator.class"; + /** How often to monitor containers.*/ public final static String NM_CONTAINER_MON_INTERVAL_MS = NM_PREFIX + "container-monitor.interval-ms"; diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_protos.proto b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_protos.proto index 50d7165..c6debc7 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_protos.proto +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_protos.proto @@ -58,6 +58,11 @@ message ResourceProto { optional int32 virtual_cores = 2; } +message ResourceUtilizationProto { + optional int32 memory = 1; + optional float virtual_cores = 2; +} + message ResourceOptionProto { optional ResourceProto resource = 1; optional int32 over_commit_timeout = 2; diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/ResourceUtilizationPBImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/ResourceUtilizationPBImpl.java index e646948..c2f5282 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/ResourceUtilizationPBImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/ResourceUtilizationPBImpl.java @@ -53,8 +53,7 @@ private void maybeInitBuilder() { } viaProto = false; } - - + @Override public int getMemory() { ResourceUtilizationProtoOrBuilder p = viaProto ? proto : builder; diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/resource/ResourceUtilizationCalculator.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/resource/ResourceUtilizationCalculator.java index c8d9a6a..6dbae2b 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/resource/ResourceUtilizationCalculator.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/resource/ResourceUtilizationCalculator.java @@ -48,20 +48,20 @@ public static int roundDown(int a, int b) { /** * Compute the number of containers which can be allocated given - * available and required resources. + * available and required resource utilizations. 
* - * @param available available resources - * @param required required resources + * @param available available resource utilizations + * @param required required resource utilizations * @return number of containers which can be allocated */ public abstract int computeAvailableContainers( ResourceUtilization available, ResourceUtilization required); /** - * Multiply resource r by factor by + * Multiply resource utilization r by factor by * and normalize up using step-factor stepFactor. * - * @param r resource to be multiplied + * @param r resource utilization to be multiplied * @param by multiplier * @param stepFactor factor by which to normalize up * @return resulting normalized resource @@ -70,10 +70,10 @@ public abstract ResourceUtilization multiplyAndNormalizeUp( ResourceUtilization r, double by, ResourceUtilization stepFactor); /** - * Multiply resource r by factor by + * Multiply resource utilization r by factor by * and normalize down using step-factor stepFactor. * - * @param r resource to be multiplied + * @param r resource utilization to be multiplied * @param by multiplier * @param stepFactor factor by which to normalize down * @return resulting normalized resource @@ -82,58 +82,69 @@ public abstract ResourceUtilization multiplyAndNormalizeDown( ResourceUtilization r, double by, ResourceUtilization stepFactor); /** - * Normalize resource r given the base + * Normalize resource utilization r given the base * minimumResourceUtilization and verify against max allowed * maximumResourceUtilization * - * @param r resource + * @param r resource utilization * @param minimumResourceUtilization step-factor - * @param maximumResourceUtilization the upper bound of the resource to be allocated + * @param maximumResourceUtilization the upper bound of the resource to be + * allocated * @return normalized resource */ - public ResourceUtilization normalize(ResourceUtilization r, ResourceUtilization minimumResourceUtilization, + public ResourceUtilization 
normalize(ResourceUtilization r, + ResourceUtilization minimumResourceUtilization, ResourceUtilization maximumResourceUtilization) { - return normalize(r, minimumResourceUtilization, maximumResourceUtilization, minimumResourceUtilization); + return normalize(r, minimumResourceUtilization, maximumResourceUtilization, + minimumResourceUtilization); } /** - * Normalize resource r given the base + * Normalize resource utilization r given the base * minimumResourceUtilization and verify against max allowed - * maximumResourceUtilization using a step factor for hte normalization. + * maximumResourceUtilization using a step factor for the + * normalization. * - * @param r resource + * @param r resource utilization * @param minimumResourceUtilization minimum value - * @param maximumResourceUtilization the upper bound of the resource to be allocated - * @param stepFactor the increment for resources to be allocated + * @param maximumResourceUtilization the upper bound of the resource to be + * allocated + * @param stepFactor the increment for resource utilizations to be allocated * @return normalized resource */ - public abstract ResourceUtilization normalize(ResourceUtilization r, ResourceUtilization minimumResourceUtilization, - ResourceUtilization maximumResourceUtilization, - ResourceUtilization stepFactor); + public abstract ResourceUtilization normalize(ResourceUtilization r, + ResourceUtilization minimumResourceUtilization, + ResourceUtilization maximumResourceUtilization, + ResourceUtilization stepFactor); /** - * Round-up resource r given factor stepFactor. + * Round-up resource utilization r given factor + * stepFactor. 
* - * @param r resource + * @param r resource utilization * @param stepFactor step-factor * @return rounded resource */ - public abstract ResourceUtilization roundUp(ResourceUtilization r, ResourceUtilization stepFactor); + public abstract ResourceUtilization roundUp(ResourceUtilization r, + ResourceUtilization stepFactor); /** - * Round-down resource r given factor stepFactor. + * Round-down resource utilization r given factor + * stepFactor. * - * @param r resource + * @param r resource utilization * @param stepFactor step-factor * @return rounded resource */ - public abstract ResourceUtilization roundDown(ResourceUtilization r, ResourceUtilization stepFactor); + public abstract ResourceUtilization roundDown(ResourceUtilization r, + ResourceUtilization stepFactor); /** - * Divide resource numerator by resource denominator - * using specified policy (domination, average, fairness etc.); hence overall - * clusterResourceUtilization is provided for context. + * Divide resource utilization numerator by resource utilization + * denominator using specified policy (domination, average, + * fairness etc.); hence overall clusterResourceUtilization is + * provided for context. 
* * @param clusterResourceUtilization cluster resources * @param numerator numerator @@ -142,13 +153,14 @@ public abstract ResourceUtilization normalize(ResourceUtilization r, ResourceUti * using specific policy */ public abstract float divide( - ResourceUtilization clusterResourceUtilization, ResourceUtilization numerator, ResourceUtilization denominator); + ResourceUtilization clusterResourceUtilization, + ResourceUtilization numerator, ResourceUtilization denominator); /** - * Determine if a resource is not suitable for use as a divisor + * Determine if a resource utilization is not suitable for use as a divisor * (will result in divide by 0, etc) * - * @param r resource + * @param r resource utilization * @return true if divisor is invalid (should not be used), false else */ public abstract boolean isInvalidDivisor(ResourceUtilization r); @@ -167,8 +179,9 @@ public abstract float divide( * * @param numerator numerator resource * @param denominator denominator - * @return resultant resource + * @return resultant resource utilization */ - public abstract ResourceUtilization divideAndCeil(ResourceUtilization numerator, int denominator); + public abstract ResourceUtilization divideAndCeil( + ResourceUtilization numerator, int denominator); } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/records/NodeHealthStatus.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/records/NodeHealthStatus.java index b21b880..ba37166 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/records/NodeHealthStatus.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/records/NodeHealthStatus.java @@ -23,6 +23,7 @@ import 
org.apache.hadoop.classification.InterfaceStability.Unstable; import org.apache.hadoop.yarn.api.ApplicationClientProtocol; import org.apache.hadoop.yarn.api.records.NodeReport; +import org.apache.hadoop.yarn.api.records.ResourceUtilization; import org.apache.hadoop.yarn.util.Records; /** @@ -90,4 +91,16 @@ public static NodeHealthStatus newInstance(boolean isNodeHealthy, @Private @Unstable public abstract void setLastHealthReportTime(long lastHealthReport); + + /** + * Get the resource utilization of the node. + * @return resource utilization of the node + */ + @Public + @Stable + public abstract ResourceUtilization getUtilization(); + + @Private + @Unstable + public abstract void setUtilization(ResourceUtilization utilization); } \ No newline at end of file diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/records/impl/pb/NodeHealthStatusPBImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/records/impl/pb/NodeHealthStatusPBImpl.java index 75aa3d1..9d89fc9 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/records/impl/pb/NodeHealthStatusPBImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/records/impl/pb/NodeHealthStatusPBImpl.java @@ -18,6 +18,9 @@ package org.apache.hadoop.yarn.server.api.records.impl.pb; +import org.apache.hadoop.yarn.api.records.ResourceUtilization; +import org.apache.hadoop.yarn.api.records.impl.pb.ResourceUtilizationPBImpl; +import org.apache.hadoop.yarn.proto.YarnProtos.ResourceUtilizationProto; import org.apache.hadoop.yarn.proto.YarnServerCommonProtos.NodeHealthStatusProto; import org.apache.hadoop.yarn.proto.YarnServerCommonProtos.NodeHealthStatusProtoOrBuilder; import 
org.apache.hadoop.yarn.server.api.records.NodeHealthStatus; @@ -128,4 +131,31 @@ public void setLastHealthReportTime(long lastHealthReport) { this.builder.setLastHealthReportTime((lastHealthReport)); } + @Override + public ResourceUtilization getUtilization() { + NodeHealthStatusProtoOrBuilder p = + this.viaProto ? this.proto : this.builder; + if (!p.hasUtilization()) { + return null; + } + return convertFromProtoFormat(p.getUtilization()); + } + + @Override + public void setUtilization(ResourceUtilization utilization) { + maybeInitBuilder(); + if (utilization == null) { + this.builder.clearUtilization(); + return; + } + this.builder.setUtilization(convertToProtoFormat(utilization)); + } + + private ResourceUtilizationProto convertToProtoFormat(ResourceUtilization r) { + return ((ResourceUtilizationPBImpl) r).getProto(); + } + + private ResourceUtilizationPBImpl convertFromProtoFormat(ResourceUtilizationProto p) { + return new ResourceUtilizationPBImpl(p); + } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/proto/yarn_server_common_protos.proto b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/proto/yarn_server_common_protos.proto index 99149ac..503ad94 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/proto/yarn_server_common_protos.proto +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/proto/yarn_server_common_protos.proto @@ -47,6 +47,7 @@ message NodeHealthStatusProto { optional bool is_node_healthy = 1; optional string health_report = 2; optional int64 last_health_report_time = 3; + optional ResourceUtilizationProto utilization = 4; } message VersionProto { diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeManager.java 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeManager.java index 4a28c6f..e8d0929 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeManager.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeManager.java @@ -121,7 +121,7 @@ protected NodeLabelsProvider createNodeLabelsProvider( } protected NodeResourceMonitor createNodeResourceMonitor() { - return new NodeResourceMonitorImpl(); + return new NodeResourceMonitorImpl(context); } protected ContainerManagerImpl createContainerManager(Context context, diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeResourceMonitor.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeResourceMonitor.java index be13d22..bb9b3de 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeResourceMonitor.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeResourceMonitor.java @@ -19,7 +19,8 @@ package org.apache.hadoop.yarn.server.nodemanager; import org.apache.hadoop.service.Service; +import org.apache.hadoop.yarn.api.records.ResourceUtilization; public interface NodeResourceMonitor extends Service { - + public ResourceUtilization getUtilization(); } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeResourceMonitorImpl.java 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeResourceMonitorImpl.java
index ea82546..7aa53f5a 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeResourceMonitorImpl.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeResourceMonitorImpl.java
@@ -18,13 +18,153 @@
 
 package org.apache.hadoop.yarn.server.nodemanager;
 
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.service.AbstractService;
+import org.apache.hadoop.yarn.api.records.ResourceUtilization;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.util.ResourceCalculatorPlugin;
+import org.apache.hadoop.yarn.util.resource.ResourceUtilizations;
 
+/**
+ * Samples the node's memory and CPU usage at a fixed interval and stores the
+ * latest sample in the node health status held by the {@link Context}.
+ */
 public class NodeResourceMonitorImpl extends AbstractService implements
     NodeResourceMonitor {
 
+  final static Log LOG = LogFactory
+      .getLog(NodeResourceMonitorImpl.class);
+
+  /** Pause between two samples, in milliseconds. */
+  private long monitoringInterval;
+  private final MonitoringThread monitoringThread;
+
+  private final Context context;
+  /** Null when no plugin is available, which disables monitoring. */
+  private ResourceCalculatorPlugin resourceCalculatorPlugin;
+
+  /**
+   * Latest sample; written by the monitoring thread and read by callers of
+   * {@link #getUtilization()}, hence volatile.
+   */
+  private volatile ResourceUtilization nodeUtilization;
+
+  /**
+   * @param context node manager context whose health status receives the
+   *          utilization samples
+   */
-  public NodeResourceMonitorImpl() {
+  public NodeResourceMonitorImpl(Context context) {
     super(NodeResourceMonitorImpl.class.getName());
+
+    this.context = context;
+
+    this.monitoringThread = new MonitoringThread();
   }
 
+  /**
+   * Reads the sampling interval and instantiates the resource calculator
+   * plugin from the configuration.
+   */
+  @Override
+  protected void serviceInit(Configuration conf) throws Exception {
+    this.monitoringInterval =
+        conf.getLong(YarnConfiguration.NM_NODE_MON_INTERVAL_MS,
+            YarnConfiguration.DEFAULT_NM_NODE_MON_INTERVAL_MS);
+
+    Class<? extends ResourceCalculatorPlugin> clazz =
+        conf.getClass(YarnConfiguration.NM_NODE_MON_RESOURCE_CALCULATOR, null,
+            ResourceCalculatorPlugin.class);
+    this.resourceCalculatorPlugin =
+        ResourceCalculatorPlugin.getResourceCalculatorPlugin(clazz, conf);
+    LOG.info(" Using ResourceCalculatorPlugin : "
+        + this.resourceCalculatorPlugin);
+
+    super.serviceInit(conf);
+  }
+
+  /**
+   * @return false (after logging why) when no resource calculator plugin is
+   *         available, true otherwise
+   */
+  private boolean isEnabled() {
+    if (resourceCalculatorPlugin == null) {
+      LOG.info("ResourceCalculatorPlugin is unavailable on this system. "
+          + this.getClass().getName() + " is disabled.");
+      return false;
+    }
+    return true;
+  }
+
+  /** Starts the monitoring thread when monitoring is enabled. */
+  @Override
+  protected void serviceStart() throws Exception {
+    if (this.isEnabled()) {
+      this.monitoringThread.start();
+    }
+    super.serviceStart();
+  }
+
+  /** Interrupts the monitoring thread and waits for it to exit. */
+  @Override
+  protected void serviceStop() throws Exception {
+    if (this.isEnabled()) {
+      this.monitoringThread.interrupt();
+      try {
+        this.monitoringThread.join();
+      } catch (InterruptedException e) {
+        LOG.warn("Interrupted while waiting for "
+            + this.monitoringThread.getName() + " to exit");
+        // Restore the flag so the caller driving the shutdown still sees it.
+        Thread.currentThread().interrupt();
+      }
+    }
+    super.serviceStop();
+  }
+
+  /** Sampling loop; runs until the thread is interrupted. */
+  private class MonitoringThread extends Thread {
+    public MonitoringThread() {
+      super("Node Resource Monitor");
+    }
+
+    @Override
+    public void run() {
+      while (true) {
+        // Used memory = total physical memory - available physical memory.
+        long memory = resourceCalculatorPlugin.getPhysicalMemorySize() -
+            resourceCalculatorPlugin.getAvailablePhysicalMemorySize();
+        float cpu = resourceCalculatorPlugin.getCpuUsage();
+        // NOTE(review): cpu is passed through unscaled; confirm that
+        // getCpuUsage() already reports the unit expected here.
+        ResourceUtilization sample =
+            ResourceUtilizations.createResourceUtilization(
+                (int) (memory >> 20), // B -> MB
+                cpu);
+        nodeUtilization = sample;
+        // Store this info in the health status to send it to the RM
+        context.getNodeHealthStatus().setUtilization(sample);
+
+        if (LOG.isDebugEnabled()) {
+          LOG.debug("Node utilization: " + sample + " CPU=" + cpu
+              + " Memory=" + memory);
+        }
+
+        try {
+          Thread.sleep(monitoringInterval);
+        } catch (InterruptedException e) {
+          LOG.warn(NodeResourceMonitorImpl.class.getName()
+              + " is interrupted. Exiting.");
+          break;
+        }
+      }
+    }
+  }
+
+  /** @return the most recent sample, or null if none has been taken yet. */
+  @Override
+  public ResourceUtilization getUtilization() {
+    return this.nodeUtilization;
+  }
 }