diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/ResourceUtilization.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/ResourceUtilization.java new file mode 100644 index 0000000..8b8e76e --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/ResourceUtilization.java @@ -0,0 +1,132 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.api.records; + +import org.apache.hadoop.classification.InterfaceAudience.Public; +import org.apache.hadoop.classification.InterfaceStability.Evolving; +import org.apache.hadoop.classification.InterfaceStability.Stable; +import org.apache.hadoop.yarn.api.ApplicationMasterProtocol; +import org.apache.hadoop.yarn.util.Records; + +/** + *

ResourceUtilization models a set of computer resource utilizations + * in the cluster.

+ * + *

Currently it models both memory and CPU.

+ * + *

The unit for memory is megabytes. CPU is modeled with virtual cores + * (vcores), a unit for expressing parallelism. A node's capacity should + * be configured with virtual cores equal to its number of physical cores. A + * container should be requested with the number of cores it can saturate, i.e. + * the average number of threads it expects to have runnable at a time.

+ * + *

Typically, applications request ResourceUtilization of suitable + * capability to run their component tasks.

+ * + * @see ResourceRequest + * @see ApplicationMasterProtocol#allocate(org.apache.hadoop.yarn.api.protocolrecords.AllocateRequest) + */ +@Public +@Stable +public abstract class ResourceUtilization implements Comparable { + + @Public + @Stable + public static ResourceUtilization newInstance(int memory, float vCores) { + ResourceUtilization resourceUtilization = Records.newRecord(ResourceUtilization.class); + resourceUtilization.setMemory(memory); + resourceUtilization.setVirtualCores(vCores); + return resourceUtilization; + } + + /** + * Get memory of the resource utilization. + * @return memory of the resource utilization + */ + @Public + @Stable + public abstract int getMemory(); + + /** + * Set memory of the resource utilization. + * @param memory memory of the resource utilization + */ + @Public + @Stable + public abstract void setMemory(int memory); + + + /** + * Get number of virtual cpu cores of the resource utilization. + * + * Virtual cores are a unit for expressing CPU parallelism. A node's capacity + * should be configured with virtual cores equal to its number of physical cores. + * A container should be requested with the number of cores it can saturate, i.e. + * the average number of threads it expects to have runnable at a time. + * + * @return num of virtual cpu cores of the resource utilization + */ + @Public + @Evolving + public abstract float getVirtualCores(); + + /** + * Set number of virtual cpu cores of the resource utilization. + * + * Virtual cores are a unit for expressing CPU parallelism. A node's capacity + * should be configured with virtual cores equal to its number of physical cores. + * A container should be requested with the number of cores it can saturate, i.e. + * the average number of threads it expects to have runnable at a time. + * + * @param vCores number of virtual cpu cores of the resource utilization + */ + @Public + @Evolving + public abstract void setVirtualCores(float vCores); + + @Override + public int hashCode() { + final int prime = 263167; + int result = 3571; + result = 939769357 + getMemory(); // prime * result = 939769357 initially + result = prime * result + Float.valueOf(getVirtualCores()).hashCode(); + return result; + } + + @Override + public boolean equals(Object obj) { + if (this == obj) + return true; + if (obj == null) + return false; + if (!(obj instanceof ResourceUtilization)) + return false; + ResourceUtilization other = (ResourceUtilization) obj; + if (getMemory() != other.getMemory() || + getVirtualCores() != other.getVirtualCores()) { + return false; + } + return true; + } + + @Override + public String toString() { + return ""; + } +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java index a7f485d..751c08c 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java @@ -872,6 +872,15 @@ private static void addDeprecatedKeys() { public static final String DEFAULT_NM_WEBAPP_HTTPS_ADDRESS = "0.0.0.0:" + DEFAULT_NM_WEBAPP_HTTPS_PORT; + /** How often to monitor the node.*/ + public static final String NM_NODE_MON_INTERVAL_MS = NM_PREFIX + + "node-monitor.interval-ms"; + public final static int DEFAULT_NM_NODE_MON_INTERVAL_MS = 3000; + + /** Class that calculates nodes current resource utilization.*/ + public static final String NM_NODE_MON_RESOURCE_CALCULATOR = + NM_PREFIX + "node-monitor.resource-calculator.class"; + /** How often to monitor containers.*/ public final static String NM_CONTAINER_MON_INTERVAL_MS = NM_PREFIX + "container-monitor.interval-ms"; diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_protos.proto b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_protos.proto index c45081a..9b2321a 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_protos.proto +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_protos.proto @@ -58,6 +58,11 @@ message ResourceProto { optional int32 virtual_cores = 2; } +message ResourceUtilizationProto { + optional int32 memory = 1; + optional float virtual_cores = 2; +} + message ResourceOptionProto { optional ResourceProto resource = 1; optional int32 over_commit_timeout = 2; diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/ResourceUtilizationPBImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/ResourceUtilizationPBImpl.java new file mode 100644 index 0000000..c2f5282 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/ResourceUtilizationPBImpl.java @@ -0,0 +1,91 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.api.records.impl.pb; + + +import org.apache.hadoop.classification.InterfaceAudience.Private; +import org.apache.hadoop.classification.InterfaceStability.Unstable; +import org.apache.hadoop.yarn.api.records.ResourceUtilization; +import org.apache.hadoop.yarn.proto.YarnProtos.ResourceUtilizationProto; +import org.apache.hadoop.yarn.proto.YarnProtos.ResourceUtilizationProtoOrBuilder; + +@Private +@Unstable +public class ResourceUtilizationPBImpl extends ResourceUtilization { + ResourceUtilizationProto proto = ResourceUtilizationProto.getDefaultInstance(); + ResourceUtilizationProto.Builder builder = null; + boolean viaProto = false; + + public ResourceUtilizationPBImpl() { + builder = ResourceUtilizationProto.newBuilder(); + } + + public ResourceUtilizationPBImpl(ResourceUtilizationProto proto) { + this.proto = proto; + viaProto = true; + } + + public ResourceUtilizationProto getProto() { + proto = viaProto ? proto : builder.build(); + viaProto = true; + return proto; + } + + private void maybeInitBuilder() { + if (viaProto || builder == null) { + builder = ResourceUtilizationProto.newBuilder(proto); + } + viaProto = false; + } + + @Override + public int getMemory() { + ResourceUtilizationProtoOrBuilder p = viaProto ? proto : builder; + return (p.getMemory()); + } + + @Override + public void setMemory(int memory) { + maybeInitBuilder(); + builder.setMemory((memory)); + } + + @Override + public float getVirtualCores() { + ResourceUtilizationProtoOrBuilder p = viaProto ? proto : builder; + return (p.getVirtualCores()); + } + + @Override + public void setVirtualCores(float vCores) { + maybeInitBuilder(); + builder.setVirtualCores((vCores)); + } + + @Override + public int compareTo(ResourceUtilization other) { + int diff = this.getMemory() - other.getMemory(); + if (diff == 0) { + diff = Float.compare(this.getVirtualCores(), other.getVirtualCores()); + } + return diff; + } + + +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/resource/ResourceUtilizationCalculator.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/resource/ResourceUtilizationCalculator.java new file mode 100644 index 0000000..6dbae2b --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/resource/ResourceUtilizationCalculator.java @@ -0,0 +1,187 @@ +/** +* Licensed to the Apache Software Foundation (ASF) under one +* or more contributor license agreements. See the NOTICE file +* distributed with this work for additional information +* regarding copyright ownership. The ASF licenses this file +* to you under the Apache License, Version 2.0 (the +* "License"); you may not use this file except in compliance +* with the License. You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, software +* distributed under the License is distributed on an "AS IS" BASIS, +* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +* See the License for the specific language governing permissions and +* limitations under the License. +*/ +package org.apache.hadoop.yarn.util.resource; + +import org.apache.hadoop.classification.InterfaceAudience.Private; +import org.apache.hadoop.classification.InterfaceStability.Unstable; +import org.apache.hadoop.yarn.api.records.ResourceUtilization; + +/** + * A set of {@link ResourceUtilization} comparison and manipulation interfaces. + */ +@Private +@Unstable +public abstract class ResourceUtilizationCalculator { + + public abstract int + compare(ResourceUtilization clusterResourceUtilization, ResourceUtilization lhs, ResourceUtilization rhs); + + public static int divideAndCeil(int a, int b) { + if (b == 0) { + return 0; + } + return (a + (b - 1)) / b; + } + + public static int roundUp(int a, int b) { + return divideAndCeil(a, b) * b; + } + + public static int roundDown(int a, int b) { + return (a / b) * b; + } + + /** + * Compute the number of containers which can be allocated given + * available and required resource utilizations. + * + * @param available available resource utilizations + * @param required required resource utilizations + * @return number of containers which can be allocated + */ + public abstract int computeAvailableContainers( + ResourceUtilization available, ResourceUtilization required); + + /** + * Multiply resource utilization r by factor by + * and normalize up using step-factor stepFactor. + * + * @param r resource utilization to be multiplied + * @param by multiplier + * @param stepFactor factor by which to normalize up + * @return resulting normalized resource + */ + public abstract ResourceUtilization multiplyAndNormalizeUp( + ResourceUtilization r, double by, ResourceUtilization stepFactor); + + /** + * Multiply resource utilization r by factor by + * and normalize down using step-factor stepFactor. + * + * @param r resource utilization to be multiplied + * @param by multiplier + * @param stepFactor factor by which to normalize down + * @return resulting normalized resource + */ + public abstract ResourceUtilization multiplyAndNormalizeDown( + ResourceUtilization r, double by, ResourceUtilization stepFactor); + + /** + * Normalize resource utilization r given the base + * minimumResourceUtilization and verify against max allowed + * maximumResourceUtilization + * + * @param r resource utilization + * @param minimumResourceUtilization step-factor + * @param maximumResourceUtilization the upper bound of the resource to be + * allocated + * @return normalized resource + */ + public ResourceUtilization normalize(ResourceUtilization r, + ResourceUtilization minimumResourceUtilization, + ResourceUtilization maximumResourceUtilization) { + return normalize(r, minimumResourceUtilization, maximumResourceUtilization, + minimumResourceUtilization); + } + + /** + * Normalize resource utilization r given the base + * minimumResourceUtilization and verify against max allowed + * maximumResourceUtilization using a step factor for the + * normalization. + * + * @param r resource utilization + * @param minimumResourceUtilization minimum value + * @param maximumResourceUtilization the upper bound of the resource to be + * allocated + * @param stepFactor the increment for resource utilizations to be allocated + * @return normalized resource + */ + public abstract ResourceUtilization normalize(ResourceUtilization r, + ResourceUtilization minimumResourceUtilization, + ResourceUtilization maximumResourceUtilization, + ResourceUtilization stepFactor); + + + /** + * Round-up resource utilization r given factor + * stepFactor. + * + * @param r resource utilization + * @param stepFactor step-factor + * @return rounded resource + */ + public abstract ResourceUtilization roundUp(ResourceUtilization r, + ResourceUtilization stepFactor); + + /** + * Round-down resource utilization r given factor + * stepFactor. + * + * @param r resource utilization + * @param stepFactor step-factor + * @return rounded resource + */ + public abstract ResourceUtilization roundDown(ResourceUtilization r, + ResourceUtilization stepFactor); + + /** + * Divide resource utilization numerator by resource utilization + * denominator using specified policy (domination, average, + * fairness etc.); hence overall clusterResourceUtilization is + * provided for context. + * + * @param clusterResourceUtilization cluster resources + * @param numerator numerator + * @param denominator denominator + * @return numerator/denominator + * using specific policy + */ + public abstract float divide( + ResourceUtilization clusterResourceUtilization, + ResourceUtilization numerator, ResourceUtilization denominator); + + /** + * Determine if a resource utilization is not suitable for use as a divisor + * (will result in divide by 0, etc) + * + * @param r resource utilization + * @return true if divisor is invalid (should not be used), false else + */ + public abstract boolean isInvalidDivisor(ResourceUtilization r); + + /** + * Ratio of resource a to resource b. + * + * @param a resource + * @param b resource + * @return ratio of resource a to resource b + */ + public abstract float ratio(ResourceUtilization a, ResourceUtilization b); + + /** + * Divide-and-ceil numerator by denominator. + * + * @param numerator numerator resource + * @param denominator denominator + * @return resultant resource utilization + */ + public abstract ResourceUtilization divideAndCeil( + ResourceUtilization numerator, int denominator); + +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/resource/ResourceUtilizations.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/resource/ResourceUtilizations.java new file mode 100644 index 0000000..61b060a --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/resource/ResourceUtilizations.java @@ -0,0 +1,268 @@ +/** +* Licensed to the Apache Software Foundation (ASF) under one +* or more contributor license agreements. See the NOTICE file +* distributed with this work for additional information +* regarding copyright ownership. The ASF licenses this file +* to you under the Apache License, Version 2.0 (the +* "License"); you may not use this file except in compliance +* with the License. You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, software +* distributed under the License is distributed on an "AS IS" BASIS, +* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +* See the License for the specific language governing permissions and +* limitations under the License. +*/ + +package org.apache.hadoop.yarn.util.resource; + +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.classification.InterfaceStability.Unstable; +import org.apache.hadoop.yarn.api.records.Resource; +import org.apache.hadoop.yarn.api.records.ResourceUtilization; +import org.apache.hadoop.yarn.util.Records; + +@InterfaceAudience.LimitedPrivate({"YARN", "MapReduce"}) +@Unstable +public class ResourceUtilizations { + + // Java doesn't have const :( + private static final ResourceUtilization NONE = new ResourceUtilization() { + + @Override + public int getMemory() { + return 0; + } + + @Override + public void setMemory(int memory) { + throw new RuntimeException("NONE cannot be modified!"); + } + + @Override + public float getVirtualCores() { + return 0.0f; + } + + @Override + public void setVirtualCores(float cores) { + throw new RuntimeException("NONE cannot be modified!"); + } + + @Override + public int compareTo(ResourceUtilization o) { + int diff = 0 - o.getMemory(); + if (diff == 0) { + diff = Float.compare(0, o.getVirtualCores()); + } + return diff; + } + + }; + + private static final ResourceUtilization UNBOUNDED = new ResourceUtilization() { + + @Override + public int getMemory() { + return Integer.MAX_VALUE; + } + + @Override + public void setMemory(int memory) { + throw new RuntimeException("NONE cannot be modified!"); + } + + @Override + public float getVirtualCores() { + return Float.MAX_VALUE; + } + + @Override + public void setVirtualCores(float cores) { + throw new RuntimeException("NONE cannot be modified!"); + } + + @Override + public int compareTo(ResourceUtilization o) { + int diff = 0 - o.getMemory(); + if (diff == 0) { + diff = Float.compare(0, o.getVirtualCores()); + } + return diff; + } + + }; + + public static ResourceUtilization createResourceUtilization(int memory) { + return createResourceUtilization(memory, (memory > 0) ? 1 : 0); + } + + public static ResourceUtilization createResourceUtilization(int memory, float cores) { + ResourceUtilization resourceUtilization = Records.newRecord(ResourceUtilization.class); + resourceUtilization.setMemory(memory); + resourceUtilization.setVirtualCores(cores); + return resourceUtilization; + } + + public static ResourceUtilization fromResource(Resource resource) { + return ResourceUtilizations.createResourceUtilization(resource.getMemory(), 1.0f*resource.getVirtualCores()); + } + + public static ResourceUtilization none() { + return NONE; + } + + public static ResourceUtilization unbounded() { + return UNBOUNDED; + } + + public static ResourceUtilization clone(ResourceUtilization res) { + return createResourceUtilization(res.getMemory(), res.getVirtualCores()); + } + + public static ResourceUtilization addTo(ResourceUtilization lhs, ResourceUtilization rhs) { + lhs.setMemory(lhs.getMemory() + rhs.getMemory()); + lhs.setVirtualCores(lhs.getVirtualCores() + rhs.getVirtualCores()); + return lhs; + } + + public static ResourceUtilization add(ResourceUtilization lhs, ResourceUtilization rhs) { + return addTo(clone(lhs), rhs); + } + + public static ResourceUtilization subtractFrom(ResourceUtilization lhs, ResourceUtilization rhs) { + lhs.setMemory(lhs.getMemory() - rhs.getMemory()); + lhs.setVirtualCores(lhs.getVirtualCores() - rhs.getVirtualCores()); + return lhs; + } + + public static ResourceUtilization subtract(ResourceUtilization lhs, ResourceUtilization rhs) { + return subtractFrom(clone(lhs), rhs); + } + + public static ResourceUtilization negate(ResourceUtilization resource) { + return subtract(NONE, resource); + } + + public static ResourceUtilization multiplyTo(ResourceUtilization lhs, double by) { + lhs.setMemory((int)(lhs.getMemory() * by)); + lhs.setVirtualCores((float)(lhs.getVirtualCores() * by)); + return lhs; + } + + public static ResourceUtilization multiply(ResourceUtilization lhs, double by) { + return multiplyTo(clone(lhs), by); + } + + public static ResourceUtilization multiplyAndNormalizeUp( + ResourceUtilizationCalculator calculator, ResourceUtilization lhs, double by, ResourceUtilization factor) { + return calculator.multiplyAndNormalizeUp(lhs, by, factor); + } + + public static ResourceUtilization multiplyAndNormalizeDown( + ResourceUtilizationCalculator calculator, ResourceUtilization lhs, double by, ResourceUtilization factor) { + return calculator.multiplyAndNormalizeDown(lhs, by, factor); + } + + public static ResourceUtilization multiplyAndRoundDown(ResourceUtilization lhs, double by) { + ResourceUtilization out = clone(lhs); + out.setMemory((int)(lhs.getMemory() * by)); + out.setVirtualCores((float)(lhs.getVirtualCores() * by)); + return out; + } + + public static ResourceUtilization normalize( + ResourceUtilizationCalculator calculator, ResourceUtilization lhs, ResourceUtilization min, + ResourceUtilization max, ResourceUtilization increment) { + return calculator.normalize(lhs, min, max, increment); + } + + public static ResourceUtilization roundUp( + ResourceUtilizationCalculator calculator, ResourceUtilization lhs, ResourceUtilization factor) { + return calculator.roundUp(lhs, factor); + } + + public static ResourceUtilization roundDown( + ResourceUtilizationCalculator calculator, ResourceUtilization lhs, ResourceUtilization factor) { + return calculator.roundDown(lhs, factor); + } + + public static boolean isInvalidDivisor( + ResourceUtilizationCalculator resourceCalculator, ResourceUtilization divisor) { + return resourceCalculator.isInvalidDivisor(divisor); + } + + public static float ratio( + ResourceUtilizationCalculator resourceCalculator, ResourceUtilization lhs, ResourceUtilization rhs) { + return resourceCalculator.ratio(lhs, rhs); + } + + public static float divide( + ResourceUtilizationCalculator resourceCalculator, + ResourceUtilization clusterResourceUtilization, ResourceUtilization lhs, ResourceUtilization rhs) { + return resourceCalculator.divide(clusterResourceUtilization, lhs, rhs); + } + + public static ResourceUtilization divideAndCeil( + ResourceUtilizationCalculator resourceCalculator, ResourceUtilization lhs, int rhs) { + return resourceCalculator.divideAndCeil(lhs, rhs); + } + + public static boolean equals(ResourceUtilization lhs, ResourceUtilization rhs) { + return lhs.equals(rhs); + } + + public static boolean lessThan( + ResourceUtilizationCalculator resourceCalculator, + ResourceUtilization clusterResourceUtilization, + ResourceUtilization lhs, ResourceUtilization rhs) { + return (resourceCalculator.compare(clusterResourceUtilization, lhs, rhs) < 0); + } + + public static boolean lessThanOrEqual( + ResourceUtilizationCalculator resourceCalculator, + ResourceUtilization clusterResourceUtilization, + ResourceUtilization lhs, ResourceUtilization rhs) { + return (resourceCalculator.compare(clusterResourceUtilization, lhs, rhs) <= 0); + } + + public static boolean greaterThan( + ResourceUtilizationCalculator resourceCalculator, + ResourceUtilization clusterResourceUtilization, + ResourceUtilization lhs, ResourceUtilization rhs) { + return resourceCalculator.compare(clusterResourceUtilization, lhs, rhs) > 0; + } + + public static boolean greaterThanOrEqual( + ResourceUtilizationCalculator resourceCalculator, + ResourceUtilization clusterResourceUtilization, + ResourceUtilization lhs, ResourceUtilization rhs) { + return resourceCalculator.compare(clusterResourceUtilization, lhs, rhs) >= 0; + } + + public static ResourceUtilization min( + ResourceUtilizationCalculator resourceCalculator, + ResourceUtilization clusterResourceUtilization, + ResourceUtilization lhs, ResourceUtilization rhs) { + return resourceCalculator.compare(clusterResourceUtilization, lhs, rhs) <= 0 ? lhs : rhs; + } + + public static ResourceUtilization max( + ResourceUtilizationCalculator resourceCalculator, + ResourceUtilization clusterResourceUtilization, + ResourceUtilization lhs, ResourceUtilization rhs) { + return resourceCalculator.compare(clusterResourceUtilization, lhs, rhs) >= 0 ? lhs : rhs; + } + + public static boolean fitsIn(ResourceUtilization smaller, ResourceUtilization bigger) { + return smaller.getMemory() <= bigger.getMemory() && + smaller.getVirtualCores() <= bigger.getVirtualCores(); + } + + public static ResourceUtilization componentwiseMin(ResourceUtilization lhs, ResourceUtilization rhs) { + return createResourceUtilization(Math.min(lhs.getMemory(), rhs.getMemory()), + Math.min(lhs.getVirtualCores(), rhs.getVirtualCores())); + } +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/records/NodeHealthStatus.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/records/NodeHealthStatus.java index b21b880..ba37166 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/records/NodeHealthStatus.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/records/NodeHealthStatus.java @@ -23,6 +23,7 @@ import org.apache.hadoop.classification.InterfaceStability.Unstable; import org.apache.hadoop.yarn.api.ApplicationClientProtocol; import org.apache.hadoop.yarn.api.records.NodeReport; +import org.apache.hadoop.yarn.api.records.ResourceUtilization; import org.apache.hadoop.yarn.util.Records; /** @@ -90,4 +91,16 @@ public static NodeHealthStatus newInstance(boolean isNodeHealthy, @Private @Unstable public abstract void setLastHealthReportTime(long lastHealthReport); + + /** + * Get the resource utilization of the node. + * @return resource utilization of the node + */ + @Public + @Stable + public abstract ResourceUtilization getUtilization(); + + @Private + @Unstable + public abstract void setUtilization(ResourceUtilization utilization); } \ No newline at end of file diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/records/impl/pb/NodeHealthStatusPBImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/records/impl/pb/NodeHealthStatusPBImpl.java index 75aa3d1..9d89fc9 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/records/impl/pb/NodeHealthStatusPBImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/records/impl/pb/NodeHealthStatusPBImpl.java @@ -18,6 +18,9 @@ package org.apache.hadoop.yarn.server.api.records.impl.pb; +import org.apache.hadoop.yarn.api.records.ResourceUtilization; +import org.apache.hadoop.yarn.api.records.impl.pb.ResourceUtilizationPBImpl; +import org.apache.hadoop.yarn.proto.YarnProtos.ResourceUtilizationProto; import org.apache.hadoop.yarn.proto.YarnServerCommonProtos.NodeHealthStatusProto; import org.apache.hadoop.yarn.proto.YarnServerCommonProtos.NodeHealthStatusProtoOrBuilder; import org.apache.hadoop.yarn.server.api.records.NodeHealthStatus; @@ -128,4 +131,31 @@ public void setLastHealthReportTime(long lastHealthReport) { this.builder.setLastHealthReportTime((lastHealthReport)); } + @Override + public ResourceUtilization getUtilization() { + NodeHealthStatusProtoOrBuilder p = + this.viaProto ? this.proto : this.builder; + if (!p.hasUtilization()) { + return null; + } + return convertFromProtoFormat(p.getUtilization()); + } + + @Override + public void setUtilization(ResourceUtilization utilization) { + maybeInitBuilder(); + if (utilization == null) { + this.builder.clearUtilization(); + return; + } + this.builder.setUtilization(convertToProtoFormat(utilization)); + } + + private ResourceUtilizationProto convertToProtoFormat(ResourceUtilization r) { + return ((ResourceUtilizationPBImpl) r).getProto(); + } + + private ResourceUtilizationPBImpl convertFromProtoFormat(ResourceUtilizationProto p) { + return new ResourceUtilizationPBImpl(p); + } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/proto/yarn_server_common_protos.proto b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/proto/yarn_server_common_protos.proto index 99149ac..503ad94 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/proto/yarn_server_common_protos.proto +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/proto/yarn_server_common_protos.proto @@ -47,6 +47,7 @@ message NodeHealthStatusProto { optional bool is_node_healthy = 1; optional string health_report = 2; optional int64 last_health_report_time = 3; + optional ResourceUtilizationProto utilization = 4; } message VersionProto { diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeManager.java index 4a28c6f..e8d0929 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeManager.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeManager.java @@ -121,7 +121,7 @@ protected NodeLabelsProvider createNodeLabelsProvider( } protected NodeResourceMonitor createNodeResourceMonitor() { - return new NodeResourceMonitorImpl(); + return new NodeResourceMonitorImpl(context); } protected ContainerManagerImpl createContainerManager(Context context, diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeResourceMonitor.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeResourceMonitor.java index be13d22..bb9b3de 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeResourceMonitor.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeResourceMonitor.java @@ -19,7 +19,8 @@ package org.apache.hadoop.yarn.server.nodemanager; import org.apache.hadoop.service.Service; +import org.apache.hadoop.yarn.api.records.ResourceUtilization; public interface NodeResourceMonitor extends Service { - + public ResourceUtilization getUtilization(); } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeResourceMonitorImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeResourceMonitorImpl.java index ea82546..7aa53f5a 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeResourceMonitorImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeResourceMonitorImpl.java @@ -18,13 +18,119 @@ package org.apache.hadoop.yarn.server.nodemanager; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.service.AbstractService; +import org.apache.hadoop.yarn.api.records.ResourceUtilization; +import org.apache.hadoop.yarn.conf.YarnConfiguration; +import org.apache.hadoop.yarn.util.ResourceCalculatorPlugin; +import org.apache.hadoop.yarn.util.resource.ResourceUtilizations; public class NodeResourceMonitorImpl extends AbstractService implements NodeResourceMonitor { - public NodeResourceMonitorImpl() { + final static Log LOG = LogFactory + .getLog(NodeResourceMonitorImpl.class); + + private long monitoringInterval; + private MonitoringThread monitoringThread; + + private final Context context; + private ResourceCalculatorPlugin resourceCalculatorPlugin; + private Configuration conf; + + private ResourceUtilization nodeUtilization; + + public NodeResourceMonitorImpl(Context context) { super(NodeResourceMonitorImpl.class.getName()); + + this.context = context; + + this.monitoringThread = new MonitoringThread(); + } + + protected void serviceInit(Configuration conf) throws Exception { + this.conf = conf; + + this.monitoringInterval = + conf.getLong(YarnConfiguration.NM_NODE_MON_INTERVAL_MS, + YarnConfiguration.DEFAULT_NM_NODE_MON_INTERVAL_MS); + + Class clazz = + conf.getClass(YarnConfiguration.NM_NODE_MON_RESOURCE_CALCULATOR, null, + ResourceCalculatorPlugin.class); + this.resourceCalculatorPlugin = + ResourceCalculatorPlugin.getResourceCalculatorPlugin(clazz, conf); + LOG.info(" Using ResourceCalculatorPlugin : " + + this.resourceCalculatorPlugin); + } + + private boolean isEnabled() { + if (resourceCalculatorPlugin == null) { + LOG.info("ResourceCalculatorPlugin is unavailable on this system. " + + this.getClass().getName() + " is disabled."); + return false; + } + return true; } + @Override + protected void serviceStart() throws Exception { + if (this.isEnabled()) { + this.monitoringThread.start(); + } + super.serviceStart(); + } + + @Override + protected void serviceStop() throws Exception { + if (this.isEnabled()) { + this.monitoringThread.interrupt(); + try { + this.monitoringThread.join(); + } catch (InterruptedException e) { + ; + } + } + super.serviceStop(); + } + + private class MonitoringThread extends Thread { + public MonitoringThread() { + super("Node Resource Monitor"); + } + + @Override + public void run() { + while (true) { + // Get node utilization and save it into the health status + long memory = resourceCalculatorPlugin.getPhysicalMemorySize() - + resourceCalculatorPlugin.getAvailablePhysicalMemorySize(); + float cpu = resourceCalculatorPlugin.getCpuUsage(); + nodeUtilization = ResourceUtilizations.createResourceUtilization( + (int)(memory >> 20), // B -> MB + cpu); // CPU% -> VCores (1CPU at 100% is 1.0) + // Store this info in the health status to send it to the RM + context.getNodeHealthStatus().setUtilization(nodeUtilization); + + LOG.debug("Node utilization: " + + context.getNodeHealthStatus().getUtilization() + " CPU=" + cpu + + " Memory=" + memory); + + try { + Thread.sleep(monitoringInterval); + } catch (InterruptedException e) { + LOG.warn(NodeResourceMonitorImpl.class.getName() + + " is interrupted. Exiting."); + break; + } + } + } + } + + @Override + public ResourceUtilization getUtilization() { + return this.nodeUtilization; + } }