diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/ResourceUtilization.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/ResourceUtilization.java new file mode 100644 index 0000000..b684097 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/ResourceUtilization.java @@ -0,0 +1,131 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.api.records; + +import org.apache.hadoop.classification.InterfaceAudience.Public; +import org.apache.hadoop.classification.InterfaceStability.Evolving; +import org.apache.hadoop.classification.InterfaceStability.Stable; +import org.apache.hadoop.yarn.api.ApplicationMasterProtocol; +import org.apache.hadoop.yarn.util.Records; + +/** + *
+ * <p>{@code ResourceUtilization} models a set of computer resource
+ * utilizations in the cluster. Currently it models both memory and CPU.</p>
+ *
+ * <p>The unit for memory is megabytes. CPU is modeled with virtual cores
+ * (vcores), a unit for expressing parallelism. A node's capacity should
+ * be configured with virtual cores equal to its number of physical cores. A
+ * container should be requested with the number of cores it can saturate, i.e.
+ * the average number of threads it expects to have runnable at a time.</p>
+ *
+ * <p>Typically, applications request {@code ResourceUtilization} of suitable
+ * capability to run their component tasks.</p>
+ /**
+ * Compute the number of containers which can be allocated given the
+ * {@code available} and {@code required} resource utilizations.
+ *
+ * @param available available resource utilizations
+ * @param required required resource utilizations
+ * @return number of containers which can be allocated
+ */
+ public abstract int computeAvailableContainers(
+ ResourceUtilization available, ResourceUtilization required);
+
+ /**
+ * Multiply resource utilization {@code r} by the factor {@code by}
+ * and normalize the result up using the step factor {@code stepFactor}.
+ *
+ * @param r resource utilization to be multiplied
+ * @param by multiplier
+ * @param stepFactor factor by which to normalize up
+ * @return resulting normalized resource utilization
+ */
+ public abstract ResourceUtilization multiplyAndNormalizeUp(
+ ResourceUtilization r, double by, ResourceUtilization stepFactor);
+
+ /**
+ * Multiply resource utilization {@code r} by the factor {@code by}
+ * and normalize the result down using the step factor {@code stepFactor}.
+ *
+ * @param r resource utilization to be multiplied
+ * @param by multiplier
+ * @param stepFactor factor by which to normalize down
+ * @return resulting normalized resource utilization
+ */
+ public abstract ResourceUtilization multiplyAndNormalizeDown(
+ ResourceUtilization r, double by, ResourceUtilization stepFactor);
+
+ /**
+  * Normalize resource utilization {@code r} against the base
+  * {@code minimumResourceUtilization}, verified against the maximum allowed
+  * {@code maximumResourceUtilization}.
+  *
+  * <p>This overload forwards to the four-argument {@code normalize},
+  * passing the minimum as the step factor as well.</p>
+  *
+  * @param r resource utilization
+  * @param minimumResourceUtilization minimum value, also used as step-factor
+  * @param maximumResourceUtilization the upper bound of the resource to be
+  *          allocated
+  * @return normalized resource
+  */
+ public ResourceUtilization normalize(ResourceUtilization r,
+     ResourceUtilization minimumResourceUtilization,
+     ResourceUtilization maximumResourceUtilization) {
+   // No separate increment was supplied, so step in units of the minimum.
+   final ResourceUtilization stepFactor = minimumResourceUtilization;
+   return normalize(r, minimumResourceUtilization,
+       maximumResourceUtilization, stepFactor);
+ }
+
+ /**
+ * Normalize resource utilization {@code r} given the base
+ * {@code minimumResourceUtilization} and verify against the max allowed
+ * {@code maximumResourceUtilization}, using {@code stepFactor} as the
+ * increment for the normalization.
+ *
+ * @param r resource utilization
+ * @param minimumResourceUtilization minimum value
+ * @param maximumResourceUtilization the upper bound of the resource to be
+ * allocated
+ * @param stepFactor the increment for resource utilizations to be allocated
+ * @return normalized resource
+ */
+ public abstract ResourceUtilization normalize(ResourceUtilization r,
+ ResourceUtilization minimumResourceUtilization,
+ ResourceUtilization maximumResourceUtilization,
+ ResourceUtilization stepFactor);
+
+ /**
+ * Round up resource utilization {@code r} given the factor
+ * {@code stepFactor}.
+ *
+ * @param r resource utilization
+ * @param stepFactor step-factor
+ * @return rounded resource
+ */
+ public abstract ResourceUtilization roundUp(ResourceUtilization r,
+ ResourceUtilization stepFactor);
+
+ /**
+ * Round down resource utilization {@code r} given the factor
+ * {@code stepFactor}.
+ *
+ * @param r resource utilization
+ * @param stepFactor step-factor
+ * @return rounded resource
+ */
+ public abstract ResourceUtilization roundDown(ResourceUtilization r,
+ ResourceUtilization stepFactor);
+
+ /**
+ * Divide resource utilization {@code numerator} by resource utilization
+ * {@code denominator} using the implementation's policy (domination,
+ * average, fairness etc.); the overall {@code clusterResourceUtilization}
+ * is provided as context for that policy.
+ *
+ * @param clusterResourceUtilization cluster resources
+ * @param numerator numerator
+ * @param denominator denominator
+ * @return {@code numerator}/{@code denominator}
+ * using specific policy
+ */
+ public abstract float divide(
+ ResourceUtilization clusterResourceUtilization,
+ ResourceUtilization numerator, ResourceUtilization denominator);
+
+ /**
+ * Determine if a resource utilization is not suitable for use as a divisor
+ * (i.e. using it would result in a divide by 0, etc).
+ *
+ * @param r resource utilization
+ * @return {@code true} if the divisor is invalid (should not be used),
+ * {@code false} otherwise
+ */
+ public abstract boolean isInvalidDivisor(ResourceUtilization r);
+
+ /**
+ * Ratio of resource utilization {@code a} to resource utilization
+ * {@code b}.
+ *
+ * @param a resource
+ * @param b resource
+ * @return ratio of resource {@code a} to resource {@code b}
+ */
+ public abstract float ratio(ResourceUtilization a, ResourceUtilization b);
+
+ /**
+ * Divide-and-ceil {@code numerator} by {@code denominator}.
+ *
+ * @param numerator numerator resource
+ * @param denominator denominator
+ * @return resultant resource utilization
+ */
+ public abstract ResourceUtilization divideAndCeil(
+ ResourceUtilization numerator, int denominator);
+}
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/resource/ResourceUtilizations.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/resource/ResourceUtilizations.java
new file mode 100644
index 0000000..3fa33eb
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/resource/ResourceUtilizations.java
@@ -0,0 +1,266 @@
+/**
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements. See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership. The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License. You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package org.apache.hadoop.yarn.util.resource;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability.Unstable;
+import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.api.records.ResourceUtilization;
+import org.apache.hadoop.yarn.util.Records;
+
+@InterfaceAudience.LimitedPrivate({"YARN", "MapReduce"})
+@Unstable
+public class ResourceUtilizations {
+
+ // Java doesn't have const :(
+ private static final ResourceUtilization NONE = new ResourceUtilization() {
+
+ @Override
+ public int getMemory() {
+ return 0;
+ }
+
+ @Override
+ public void setMemory(int memory) {
+ throw new RuntimeException("NONE cannot be modified!");
+ }
+
+ @Override
+ public float getVirtualCores() {
+ return 0.0f;
+ }
+
+ @Override
+ public void setVirtualCores(float cores) {
+ throw new RuntimeException("NONE cannot be modified!");
+ }
+
+ @Override
+ public int compareTo(ResourceUtilization o) {
+ int diff = 0 - o.getMemory();
+ if (diff == 0) {
+ diff = Float.compare(0, o.getVirtualCores());
+ }
+ return diff;
+ }
+ };
+
+ private static final ResourceUtilization UNBOUNDED = new ResourceUtilization() {
+
+ @Override
+ public int getMemory() {
+ return Integer.MAX_VALUE;
+ }
+
+ @Override
+ public void setMemory(int memory) {
+ throw new RuntimeException("NONE cannot be modified!");
+ }
+
+ @Override
+ public float getVirtualCores() {
+ return Float.MAX_VALUE;
+ }
+
+ @Override
+ public void setVirtualCores(float cores) {
+ throw new RuntimeException("NONE cannot be modified!");
+ }
+
+ @Override
+ public int compareTo(ResourceUtilization o) {
+ int diff = 0 - o.getMemory();
+ if (diff == 0) {
+ diff = Float.compare(0, o.getVirtualCores());
+ }
+ return diff;
+ }
+ };
+
+ public static ResourceUtilization createResourceUtilization(int memory) {
+ return createResourceUtilization(memory, (memory > 0) ? 1 : 0);
+ }
+
+ public static ResourceUtilization createResourceUtilization(int memory, float cores) {
+ ResourceUtilization resourceUtilization = Records.newRecord(ResourceUtilization.class);
+ resourceUtilization.setMemory(memory);
+ resourceUtilization.setVirtualCores(cores);
+ return resourceUtilization;
+ }
+
+ public static ResourceUtilization fromResource(Resource resource) {
+ return ResourceUtilizations.createResourceUtilization(resource.getMemory(), 1.0f*resource.getVirtualCores());
+ }
+
+ public static ResourceUtilization none() {
+ return NONE;
+ }
+
+ public static ResourceUtilization unbounded() {
+ return UNBOUNDED;
+ }
+
+ public static ResourceUtilization clone(ResourceUtilization res) {
+ return createResourceUtilization(res.getMemory(), res.getVirtualCores());
+ }
+
+ public static ResourceUtilization addTo(ResourceUtilization lhs, ResourceUtilization rhs) {
+ lhs.setMemory(lhs.getMemory() + rhs.getMemory());
+ lhs.setVirtualCores(lhs.getVirtualCores() + rhs.getVirtualCores());
+ return lhs;
+ }
+
+ public static ResourceUtilization add(ResourceUtilization lhs, ResourceUtilization rhs) {
+ return addTo(clone(lhs), rhs);
+ }
+
+ public static ResourceUtilization subtractFrom(ResourceUtilization lhs, ResourceUtilization rhs) {
+ lhs.setMemory(lhs.getMemory() - rhs.getMemory());
+ lhs.setVirtualCores(lhs.getVirtualCores() - rhs.getVirtualCores());
+ return lhs;
+ }
+
+ public static ResourceUtilization subtract(ResourceUtilization lhs, ResourceUtilization rhs) {
+ return subtractFrom(clone(lhs), rhs);
+ }
+
+ public static ResourceUtilization negate(ResourceUtilization resource) {
+ return subtract(NONE, resource);
+ }
+
+ public static ResourceUtilization multiplyTo(ResourceUtilization lhs, double by) {
+ lhs.setMemory((int)(lhs.getMemory() * by));
+ lhs.setVirtualCores((float)(lhs.getVirtualCores() * by));
+ return lhs;
+ }
+
+ public static ResourceUtilization multiply(ResourceUtilization lhs, double by) {
+ return multiplyTo(clone(lhs), by);
+ }
+
+ public static ResourceUtilization multiplyAndNormalizeUp(
+ ResourceUtilizationCalculator calculator, ResourceUtilization lhs, double by, ResourceUtilization factor) {
+ return calculator.multiplyAndNormalizeUp(lhs, by, factor);
+ }
+
+ public static ResourceUtilization multiplyAndNormalizeDown(
+ ResourceUtilizationCalculator calculator, ResourceUtilization lhs, double by, ResourceUtilization factor) {
+ return calculator.multiplyAndNormalizeDown(lhs, by, factor);
+ }
+
+ public static ResourceUtilization multiplyAndRoundDown(ResourceUtilization lhs, double by) {
+ ResourceUtilization out = clone(lhs);
+ out.setMemory((int)(lhs.getMemory() * by));
+ out.setVirtualCores((float)(lhs.getVirtualCores() * by));
+ return out;
+ }
+
+ public static ResourceUtilization normalize(
+ ResourceUtilizationCalculator calculator, ResourceUtilization lhs, ResourceUtilization min,
+ ResourceUtilization max, ResourceUtilization increment) {
+ return calculator.normalize(lhs, min, max, increment);
+ }
+
+ public static ResourceUtilization roundUp(
+ ResourceUtilizationCalculator calculator, ResourceUtilization lhs, ResourceUtilization factor) {
+ return calculator.roundUp(lhs, factor);
+ }
+
+ public static ResourceUtilization roundDown(
+ ResourceUtilizationCalculator calculator, ResourceUtilization lhs, ResourceUtilization factor) {
+ return calculator.roundDown(lhs, factor);
+ }
+
+ public static boolean isInvalidDivisor(
+ ResourceUtilizationCalculator resourceCalculator, ResourceUtilization divisor) {
+ return resourceCalculator.isInvalidDivisor(divisor);
+ }
+
+ public static float ratio(
+ ResourceUtilizationCalculator resourceCalculator, ResourceUtilization lhs, ResourceUtilization rhs) {
+ return resourceCalculator.ratio(lhs, rhs);
+ }
+
+ public static float divide(
+ ResourceUtilizationCalculator resourceCalculator,
+ ResourceUtilization clusterResourceUtilization, ResourceUtilization lhs, ResourceUtilization rhs) {
+ return resourceCalculator.divide(clusterResourceUtilization, lhs, rhs);
+ }
+
+ public static ResourceUtilization divideAndCeil(
+ ResourceUtilizationCalculator resourceCalculator, ResourceUtilization lhs, int rhs) {
+ return resourceCalculator.divideAndCeil(lhs, rhs);
+ }
+
+ public static boolean equals(ResourceUtilization lhs, ResourceUtilization rhs) {
+ return lhs.equals(rhs);
+ }
+
+ public static boolean lessThan(
+ ResourceUtilizationCalculator resourceCalculator,
+ ResourceUtilization clusterResourceUtilization,
+ ResourceUtilization lhs, ResourceUtilization rhs) {
+ return (resourceCalculator.compare(clusterResourceUtilization, lhs, rhs) < 0);
+ }
+
+ public static boolean lessThanOrEqual(
+ ResourceUtilizationCalculator resourceCalculator,
+ ResourceUtilization clusterResourceUtilization,
+ ResourceUtilization lhs, ResourceUtilization rhs) {
+ return (resourceCalculator.compare(clusterResourceUtilization, lhs, rhs) <= 0);
+ }
+
+ public static boolean greaterThan(
+ ResourceUtilizationCalculator resourceCalculator,
+ ResourceUtilization clusterResourceUtilization,
+ ResourceUtilization lhs, ResourceUtilization rhs) {
+ return resourceCalculator.compare(clusterResourceUtilization, lhs, rhs) > 0;
+ }
+
+ public static boolean greaterThanOrEqual(
+ ResourceUtilizationCalculator resourceCalculator,
+ ResourceUtilization clusterResourceUtilization,
+ ResourceUtilization lhs, ResourceUtilization rhs) {
+ return resourceCalculator.compare(clusterResourceUtilization, lhs, rhs) >= 0;
+ }
+
+ public static ResourceUtilization min(
+ ResourceUtilizationCalculator resourceCalculator,
+ ResourceUtilization clusterResourceUtilization,
+ ResourceUtilization lhs, ResourceUtilization rhs) {
+ return resourceCalculator.compare(clusterResourceUtilization, lhs, rhs) <= 0 ? lhs : rhs;
+ }
+
+ public static ResourceUtilization max(
+ ResourceUtilizationCalculator resourceCalculator,
+ ResourceUtilization clusterResourceUtilization,
+ ResourceUtilization lhs, ResourceUtilization rhs) {
+ return resourceCalculator.compare(clusterResourceUtilization, lhs, rhs) >= 0 ? lhs : rhs;
+ }
+
+ public static boolean fitsIn(ResourceUtilization smaller, ResourceUtilization bigger) {
+ return smaller.getMemory() <= bigger.getMemory() &&
+ smaller.getVirtualCores() <= bigger.getVirtualCores();
+ }
+
+ public static ResourceUtilization componentwiseMin(ResourceUtilization lhs, ResourceUtilization rhs) {
+ return createResourceUtilization(Math.min(lhs.getMemory(), rhs.getMemory()),
+ Math.min(lhs.getVirtualCores(), rhs.getVirtualCores()));
+ }
+}
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/records/NodeHealthStatus.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/records/NodeHealthStatus.java
index b21b880..1e99be8 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/records/NodeHealthStatus.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/records/NodeHealthStatus.java
@@ -23,6 +23,7 @@
import org.apache.hadoop.classification.InterfaceStability.Unstable;
import org.apache.hadoop.yarn.api.ApplicationClientProtocol;
import org.apache.hadoop.yarn.api.records.NodeReport;
+import org.apache.hadoop.yarn.api.records.ResourceUtilization;
import org.apache.hadoop.yarn.util.Records;
/**
@@ -90,4 +91,16 @@ public static NodeHealthStatus newInstance(boolean isNodeHealthy,
@Private
@Unstable
public abstract void setLastHealthReportTime(long lastHealthReport);
+
+ /**
+ * Get the resource utilization of the node.
+ * Callers should be prepared for {@code null}: the protobuf-backed
+ * implementation in this change returns {@code null} when no utilization
+ * has been set.
+ * @return resource utilization of the node
+ */
+ @Public
+ @Stable
+ public abstract ResourceUtilization getUtilization();
+
+ /**
+ * Set the resource utilization of the node.
+ * @param utilization resource utilization of the node; the
+ * protobuf-backed implementation in this change treats {@code null} as
+ * "clear the field"
+ */
+ @Private
+ @Unstable
+ public abstract void setUtilization(ResourceUtilization utilization);
}
\ No newline at end of file
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/records/impl/pb/NodeHealthStatusPBImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/records/impl/pb/NodeHealthStatusPBImpl.java
index 75aa3d1..fd044af 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/records/impl/pb/NodeHealthStatusPBImpl.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/records/impl/pb/NodeHealthStatusPBImpl.java
@@ -18,6 +18,9 @@
package org.apache.hadoop.yarn.server.api.records.impl.pb;
+import org.apache.hadoop.yarn.api.records.ResourceUtilization;
+import org.apache.hadoop.yarn.api.records.impl.pb.ResourceUtilizationPBImpl;
+import org.apache.hadoop.yarn.proto.YarnProtos.ResourceUtilizationProto;
import org.apache.hadoop.yarn.proto.YarnServerCommonProtos.NodeHealthStatusProto;
import org.apache.hadoop.yarn.proto.YarnServerCommonProtos.NodeHealthStatusProtoOrBuilder;
import org.apache.hadoop.yarn.server.api.records.NodeHealthStatus;
@@ -128,4 +131,31 @@ public void setLastHealthReportTime(long lastHealthReport) {
this.builder.setLastHealthReportTime((lastHealthReport));
}
+ /**
+  * Read the utilization from the proto or the builder, whichever is
+  * current.
+  *
+  * @return the stored utilization wrapped in a PB record, or {@code null}
+  *         when the field is not set
+  */
+ @Override
+ public ResourceUtilization getUtilization() {
+   final NodeHealthStatusProtoOrBuilder source;
+   if (this.viaProto) {
+     source = this.proto;
+   } else {
+     source = this.builder;
+   }
+   return source.hasUtilization()
+       ? convertFromProtoFormat(source.getUtilization())
+       : null;
+ }
+
+ /**
+  * Store the utilization in the builder; {@code null} clears the field.
+  *
+  * @param utilization value to store, or {@code null} to clear
+  */
+ @Override
+ public void setUtilization(ResourceUtilization utilization) {
+   maybeInitBuilder();
+   if (utilization != null) {
+     this.builder.setUtilization(convertToProtoFormat(utilization));
+   } else {
+     this.builder.clearUtilization();
+   }
+ }
+
+ private ResourceUtilizationProto convertToProtoFormat(ResourceUtilization r) {
+ return ((ResourceUtilizationPBImpl) r).getProto();
+ }
+
+ private ResourceUtilizationPBImpl convertFromProtoFormat(ResourceUtilizationProto p) {
+ return new ResourceUtilizationPBImpl(p);
+ }
}
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/proto/yarn_server_common_protos.proto b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/proto/yarn_server_common_protos.proto
index 99149ac..503ad94 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/proto/yarn_server_common_protos.proto
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/proto/yarn_server_common_protos.proto
@@ -47,6 +47,7 @@ message NodeHealthStatusProto {
optional bool is_node_healthy = 1;
optional string health_report = 2;
optional int64 last_health_report_time = 3;
+ optional ResourceUtilizationProto utilization = 4;
}
message VersionProto {
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeManager.java
index 4a28c6f..e8d0929 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeManager.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeManager.java
@@ -121,7 +121,7 @@ protected NodeLabelsProvider createNodeLabelsProvider(
}
protected NodeResourceMonitor createNodeResourceMonitor() {
- return new NodeResourceMonitorImpl();
+ return new NodeResourceMonitorImpl(context);
}
protected ContainerManagerImpl createContainerManager(Context context,
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeResourceMonitor.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeResourceMonitor.java
index be13d22..bb9b3de 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeResourceMonitor.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeResourceMonitor.java
@@ -19,7 +19,8 @@
package org.apache.hadoop.yarn.server.nodemanager;
import org.apache.hadoop.service.Service;
+import org.apache.hadoop.yarn.api.records.ResourceUtilization;
public interface NodeResourceMonitor extends Service {
-
+ /**
+ * Get the resource utilization of the node.
+ * @return resource utilization of the node
+ */
+ public ResourceUtilization getUtilization();
}
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeResourceMonitorImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeResourceMonitorImpl.java
index ea82546..9f7ecaa 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeResourceMonitorImpl.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeResourceMonitorImpl.java
@@ -18,13 +18,119 @@
package org.apache.hadoop.yarn.server.nodemanager;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.service.AbstractService;
+import org.apache.hadoop.yarn.api.records.ResourceUtilization;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.util.ResourceCalculatorPlugin;
+import org.apache.hadoop.yarn.util.resource.ResourceUtilizations;
public class NodeResourceMonitorImpl extends AbstractService implements
NodeResourceMonitor {
- public NodeResourceMonitorImpl() {
+ final static Log LOG = LogFactory
+     .getLog(NodeResourceMonitorImpl.class);
+
+ /** Pause between two samples, in milliseconds (passed to Thread.sleep). */
+ private long monitoringInterval;
+ /** Background thread that samples the node utilization. */
+ private MonitoringThread monitoringThread;
+
+ /** Node manager context; receives each sample via its health status. */
+ private final Context context;
+ /** Source of memory/CPU readings; null disables monitoring. */
+ private ResourceCalculatorPlugin resourceCalculatorPlugin;
+ /** Configuration handed to serviceInit. */
+ private Configuration conf;
+
+ /**
+  * Latest sample. Written by the monitoring thread and read by callers of
+  * getUtilization() on other threads, hence volatile so a reader always
+  * sees the most recently published reference.
+  */
+ private volatile ResourceUtilization nodeUtilization;
+
+ /**
+ * Create the monitor and its (not yet started) monitoring thread.
+ * @param context node manager context; each sample is stored into the
+ * health status obtained from it
+ */
+ public NodeResourceMonitorImpl(Context context) {
 super(NodeResourceMonitorImpl.class.getName());
+
+ this.context = context;
+
+ this.monitoringThread = new MonitoringThread();
+ }
+
+ /**
+  * Read the monitoring interval and instantiate the resource calculator
+  * plugin named in the configuration.
+  *
+  * @param conf configuration to read
+  *          {@code NM_NODE_MON_INTERVAL_MS} and
+  *          {@code NM_NODE_MON_RESOURCE_CALCULATOR} from
+  * @throws Exception declared by the service contract
+  */
+ @Override
+ protected void serviceInit(Configuration conf) throws Exception {
+   this.conf = conf;
+
+   this.monitoringInterval =
+       conf.getLong(YarnConfiguration.NM_NODE_MON_INTERVAL_MS,
+           YarnConfiguration.DEFAULT_NM_NODE_MON_INTERVAL_MS);
+
+   // No default class is supplied, so clazz is null when the key is unset;
+   // the plugin factory decides what to return in that case.
+   Class<? extends ResourceCalculatorPlugin> clazz =
+       conf.getClass(YarnConfiguration.NM_NODE_MON_RESOURCE_CALCULATOR, null,
+           ResourceCalculatorPlugin.class);
+   this.resourceCalculatorPlugin =
+       ResourceCalculatorPlugin.getResourceCalculatorPlugin(clazz, conf);
+   LOG.info(" Using ResourceCalculatorPlugin : "
+       + this.resourceCalculatorPlugin);
+ }
+
+ private boolean isEnabled() {
+ if (resourceCalculatorPlugin == null) {
+ LOG.info("ResourceCalculatorPlugin is unavailable on this system. "
+ + this.getClass().getName() + " is disabled.");
+ return false;
+ }
+ return true;
+ }
+
+ @Override
+ protected void serviceStart() throws Exception {
+ if (this.isEnabled()) {
+ this.monitoringThread.start();
+ }
+ super.serviceStart();
}
+ /**
+  * Interrupt the monitoring thread and wait for it to exit.
+  *
+  * @throws Exception declared by the service contract
+  */
+ @Override
+ protected void serviceStop() throws Exception {
+   if (this.isEnabled()) {
+     this.monitoringThread.interrupt();
+     try {
+       this.monitoringThread.join();
+     } catch (InterruptedException e) {
+       // Stop waiting, but keep the interrupt visible to our caller instead
+       // of silently discarding it.
+       LOG.warn("Interrupted while waiting for "
+           + this.monitoringThread.getName() + " to exit.");
+       Thread.currentThread().interrupt();
+     }
+   }
+   super.serviceStop();
+ }
+
+ /**
+  * Thread that periodically samples node memory and CPU usage, keeps the
+  * latest sample in {@code nodeUtilization} and stores it into the node
+  * health status. It exits when interrupted while sleeping.
+  */
+ private class MonitoringThread extends Thread {
+   public MonitoringThread() {
+     super("Node Resource Monitor");
+   }
+
+   @Override
+   public void run() {
+     while (true) {
+       // Get node utilization and save it into the health status
+       long memory = resourceCalculatorPlugin.getPhysicalMemorySize() -
+           resourceCalculatorPlugin.getAvailablePhysicalMemorySize();
+       float cpu = resourceCalculatorPlugin.getCpuUsage();
+       // NOTE(review): cpu is stored exactly as returned by getCpuUsage();
+       // no scaling to virtual cores happens here. Confirm the plugin's
+       // unit matches what consumers of getVirtualCores() expect.
+       nodeUtilization = ResourceUtilizations.createResourceUtilization(
+           (int) (memory >> 20), // B -> MB
+           cpu);
+       // Store this info in the health status to send it to the RM
+       context.getNodeHealthStatus().setUtilization(nodeUtilization);
+
+       // Guarded: building the message re-reads the health status and
+       // concatenates strings, which is wasted work when debug is off.
+       if (LOG.isDebugEnabled()) {
+         LOG.debug("Node utilization: " +
+             context.getNodeHealthStatus().getUtilization() + " CPU=" + cpu +
+             " Memory=" + memory);
+       }
+
+       try {
+         Thread.sleep(monitoringInterval);
+       } catch (InterruptedException e) {
+         LOG.warn(NodeResourceMonitorImpl.class.getName()
+             + " is interrupted. Exiting.");
+         break;
+       }
+     }
+   }
+ }
+
+ /**
+  * @return the most recent sample stored by the monitoring thread, or
+  *         {@code null} if none has been stored yet
+  */
+ @Override
+ public ResourceUtilization getUtilization() {
+   final ResourceUtilization latest = this.nodeUtilization;
+   return latest;
+ }
}