From 881945f29e01805b0ca96c63f48aed67cb584074 Mon Sep 17 00:00:00 2001 From: Sunil G Date: Tue, 31 Oct 2017 18:56:06 +0530 Subject: [PATCH] YARN-6909 --- .../apache/hadoop/yarn/api/records/Resource.java | 41 +++++-- .../yarn/api/records/impl/HeavyWeightResource.java | 123 +++++++++++++++++++++ .../yarn/api/records/impl/LightWeightResource.java | 21 +--- .../hadoop/yarn/util/resource/ResourceUtils.java | 1 + .../yarn/api/records/impl/pb/ResourcePBImpl.java | 71 +++++------- .../scheduler/ClusterNodeTracker.java | 2 +- 6 files changed, 185 insertions(+), 74 deletions(-) create mode 100644 hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/impl/HeavyWeightResource.java diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/Resource.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/Resource.java index 796b6662ddf..d30320f21a9 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/Resource.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/Resource.java @@ -28,9 +28,10 @@ import org.apache.hadoop.classification.InterfaceStability.Evolving; import org.apache.hadoop.classification.InterfaceStability.Stable; import org.apache.hadoop.yarn.api.ApplicationMasterProtocol; +import org.apache.hadoop.yarn.api.protocolrecords.ResourceTypes; +import org.apache.hadoop.yarn.api.records.impl.HeavyWeightResource; import org.apache.hadoop.yarn.api.records.impl.LightWeightResource; import org.apache.hadoop.yarn.exceptions.ResourceNotFoundException; -import org.apache.hadoop.yarn.util.Records; import org.apache.hadoop.yarn.util.resource.ResourceUtils; /** @@ -76,10 +77,7 @@ @Stable public static Resource newInstance(int memory, int vCores) { if (ResourceUtils.getNumberOfKnownResourceTypes() > 2) { - Resource ret = Records.newRecord(Resource.class); - ret.setMemorySize(memory); - ret.setVirtualCores(vCores); - return ret; + return new HeavyWeightResource(memory, vCores); } return new LightWeightResource(memory, vCores); } @@ -88,10 +86,7 @@ public static Resource newInstance(int memory, int vCores) { @Stable public static Resource newInstance(long memory, int vCores) { if (ResourceUtils.getNumberOfKnownResourceTypes() > 2) { - Resource ret = Records.newRecord(Resource.class); - ret.setMemorySize(memory); - ret.setVirtualCores(vCores); - return ret; + return new HeavyWeightResource(memory, vCores); } return new LightWeightResource(memory, vCores); } @@ -99,11 +94,14 @@ public static Resource newInstance(long memory, int vCores) { @InterfaceAudience.Private @InterfaceStability.Unstable public static Resource newInstance(Resource resource) { - Resource ret = Resource.newInstance(resource.getMemorySize(), - resource.getVirtualCores()); + Resource ret; if (ResourceUtils.getNumberOfKnownResourceTypes() > 2) { + ret = new HeavyWeightResource(); Resource.copy(resource, ret); + return ret; } + ret = Resource.newInstance(resource.getMemorySize(), + resource.getVirtualCores()); return ret; } @@ -364,7 +362,7 @@ public void setResourceValue(int index, long value) } } - private void throwExceptionWhenArrayOutOfBound(int index) { + protected void throwExceptionWhenArrayOutOfBound(int index) { String exceptionMsg = String.format( "Trying to access ResourceInformation for given index=%d. " + "Acceptable index range is [0,%d), please check double check " @@ -483,4 +481,23 @@ protected static int castToIntSafely(long value) { } return Long.valueOf(value).intValue(); } + + /** + * Create ResourceInformation with basic fields. + * @param name Resource Type Name + * @param unit Default unit of provided resource type + * @param value Value associated with giveb resource + * @return ResourceInformation object + */ + protected static ResourceInformation newDefaultInformation(String name, + String unit, long value) { + ResourceInformation ri = new ResourceInformation(); + ri.setName(name); + ri.setValue(value); + ri.setResourceType(ResourceTypes.COUNTABLE); + ri.setUnitsWithoutValidation(unit); + ri.setMinimumAllocation(0); + ri.setMaximumAllocation(Long.MAX_VALUE); + return ri; + } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/impl/HeavyWeightResource.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/impl/HeavyWeightResource.java new file mode 100644 index 00000000000..9edecba351c --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/impl/HeavyWeightResource.java @@ -0,0 +1,123 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.api.records.impl; + +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.classification.InterfaceStability.Unstable; +import org.apache.hadoop.yarn.api.records.Resource; +import org.apache.hadoop.yarn.api.records.ResourceInformation; +import org.apache.hadoop.yarn.util.resource.ResourceUtils; + +import static org.apache.hadoop.yarn.api.records.ResourceInformation.MEMORY_MB; +import static org.apache.hadoop.yarn.api.records.ResourceInformation.MEMORY_URI; +import static org.apache.hadoop.yarn.api.records.ResourceInformation.VCORES_URI; + +/** + *

+ * HeavyWeightResource extends Resource to handle all resources. + *

+ * + *

+ * Apart from memory and CPU, it models other resources also. + *

+ * + *

+ * The unit for memory is megabytes. CPU is modeled with virtual cores (vcores), + * a unit for expressing parallelism. A node's capacity should be configured + * with virtual cores equal to its number of physical cores. A container should + * be requested with the number of cores it can saturate, i.e. the average + * number of threads it expects to have runnable at a time. + *

+ * + *

+ * Virtual cores take integer values and thus currently CPU-scheduling is very + * coarse. A complementary axis for CPU requests that represents processing + * power will likely be added in the future to enable finer-grained resource + * configuration. + *

+ * + *

+ * For all other resources, information such as units, resource type, min/max + * values etc will be retrieved from 'resource-types.xml'. + *

+ * @see Resource + */ +@InterfaceAudience.Private +@Unstable +public class HeavyWeightResource extends Resource { + + public HeavyWeightResource(long memory, long vcores) { + ResourceInformation[] types = ResourceUtils.getResourceTypesArray(); + resources = new ResourceInformation[types.length]; + + resources[MEMORY_INDEX] = newDefaultInformation(MEMORY_URI, + MEMORY_MB.getUnits(), memory); + resources[VCORES_INDEX] = newDefaultInformation(VCORES_URI, "", vcores); + + int maxLength = ResourceUtils.getNumberOfKnownResourceTypes(); + for (int i = 2; i < maxLength; i++) { + resources[i] = new ResourceInformation(); + ResourceInformation.copy(types[i], resources[i]); + } + } + + public HeavyWeightResource() { + ResourceInformation[] types = ResourceUtils.getResourceTypesArray(); + int maxLength = ResourceUtils.getNumberOfKnownResourceTypes(); + resources = new ResourceInformation[maxLength]; + for (int i = 0; i < maxLength; i++) { + resources[i] = new ResourceInformation(); + ResourceInformation.copy(types[i], resources[i]); + } + } + + @Override + public int getMemory() { + // memory should always be present + return castToIntSafely(this.getMemorySize()); + } + + @Override + public long getMemorySize() { + // memory should always be present + ResourceInformation ri = resources[MEMORY_INDEX]; + return ri.getValue(); + } + + @Override + public void setMemory(int memory) { + resources[MEMORY_INDEX].setValue(memory); + } + + @Override + public void setMemorySize(long memory) { + resources[MEMORY_INDEX].setValue(memory); + } + + @Override + public int getVirtualCores() { + // vcores should always be present + return castToIntSafely(resources[VCORES_INDEX].getValue()); + } + + @Override + public void setVirtualCores(int vCores) { + resources[VCORES_INDEX].setValue(vCores); + } +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/impl/LightWeightResource.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/impl/LightWeightResource.java index a64d242a4ab..d557b003bf0 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/impl/LightWeightResource.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/impl/LightWeightResource.java @@ -30,7 +30,7 @@ /** *

- * LightResource extends Resource to handle base resources such + * LightWeightResource extends Resource to handle base resources such * as memory and CPU. * TODO: We have a long term plan to use AbstractResource when additional * resource types are to be handled as well. @@ -67,28 +67,15 @@ private ResourceInformation vcoresResInfo; public LightWeightResource(long memory, long vcores) { - this.memoryResInfo = LightWeightResource.newDefaultInformation(MEMORY_URI, - MEMORY_MB.getUnits(), memory); - this.vcoresResInfo = LightWeightResource.newDefaultInformation(VCORES_URI, - "", vcores); + this.memoryResInfo = newDefaultInformation(MEMORY_URI, MEMORY_MB.getUnits(), + memory); + this.vcoresResInfo = newDefaultInformation(VCORES_URI, "", vcores); resources = new ResourceInformation[NUM_MANDATORY_RESOURCES]; resources[MEMORY_INDEX] = memoryResInfo; resources[VCORES_INDEX] = vcoresResInfo; } - private static ResourceInformation newDefaultInformation(String name, - String unit, long value) { - ResourceInformation ri = new ResourceInformation(); - ri.setName(name); - ri.setValue(value); - ri.setResourceType(ResourceTypes.COUNTABLE); - ri.setUnitsWithoutValidation(unit); - ri.setMinimumAllocation(0); - ri.setMaximumAllocation(Long.MAX_VALUE); - return ri; - } - @Override @SuppressWarnings("deprecation") public int getMemory() { diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/util/resource/ResourceUtils.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/util/resource/ResourceUtils.java index 3a09de5d6ba..db6e7f10699 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/util/resource/ResourceUtils.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/util/resource/ResourceUtils.java @@ -262,6 +262,7 @@ public static void initializeResourcesFromResourceInformationMap( updateKnownResources(); updateResourceTypeIndex(); initializedResources = true; + numKnownResourceTypes = resourceTypes.size(); } private static void updateKnownResources() { diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/ResourcePBImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/ResourcePBImpl.java index 4ae64c2582a..6fbfcf83884 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/ResourcePBImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/ResourcePBImpl.java @@ -35,7 +35,6 @@ import java.util.Map; - @Private @Unstable public class ResourcePBImpl extends Resource { @@ -50,7 +49,7 @@ static ResourceProto getProto(Resource r) { final ResourcePBImpl pb; if (r instanceof ResourcePBImpl) { - pb = (ResourcePBImpl)r; + pb = (ResourcePBImpl) r; } else { pb = new ResourcePBImpl(); pb.setMemorySize(r.getMemorySize()); @@ -111,7 +110,7 @@ public void setMemory(int memory) { @Override public void setMemorySize(long memory) { maybeInitBuilder(); - getResourceInformation(ResourceInformation.MEMORY_URI).setValue(memory); + resources[MEMORY_INDEX].setValue(memory); } @Override @@ -123,7 +122,7 @@ public int getVirtualCores() { @Override public void setVirtualCores(int vCores) { maybeInitBuilder(); - getResourceInformation(ResourceInformation.VCORES_URI).setValue(vCores); + resources[VCORES_INDEX].setValue(vCores); } private void initResources() { @@ -131,31 +130,33 @@ private void initResources() { return; } ResourceProtoOrBuilder p = viaProto ? proto : builder; - initResourcesMap(); + ResourceInformation[] types = ResourceUtils.getResourceTypesArray(); Map indexMap = ResourceUtils.getResourceTypeIndex(); + resources = new ResourceInformation[types.length]; + for (ResourceInformationProto entry : p.getResourceValueMapList()) { - ResourceTypes type = - entry.hasType() ? ProtoUtils.convertFromProtoFormat(entry.getType()) : - ResourceTypes.COUNTABLE; - - // When unit not specified in proto, use the default unit. - String units = - entry.hasUnits() ? entry.getUnits() : ResourceUtils.getDefaultUnit( - entry.getKey()); - long value = entry.hasValue() ? entry.getValue() : 0L; - ResourceInformation ri = ResourceInformation - .newInstance(entry.getKey(), units, value, type, 0L, Long.MAX_VALUE); Integer index = indexMap.get(entry.getKey()); if (index == null) { - LOG.warn("Got unknown resource type: " + ri.getName() + "; skipping"); + LOG.warn("Got unknown resource type: " + entry.getKey() + "; skipping"); } else { - resources[index].setResourceType(ri.getResourceType()); - resources[index].setUnits(ri.getUnits()); - resources[index].setValue(value); + resources[index] = newDefaultInformation(types[index], entry); } } - this.setMemorySize(p.getMemory()); - this.setVirtualCores(p.getVirtualCores()); + } + + private static ResourceInformation newDefaultInformation( + ResourceInformation resourceInformation, ResourceInformationProto entry) { + ResourceInformation ri = new ResourceInformation(); + ri.setName(resourceInformation.getName()); + ri.setMinimumAllocation(resourceInformation.getMinimumAllocation()); + ri.setMaximumAllocation(resourceInformation.getMaximumAllocation()); + ri.setResourceType(entry.hasType() + ? ProtoUtils.convertFromProtoFormat(entry.getType()) + : ResourceTypes.COUNTABLE); + ri.setUnits( + entry.hasUnits() ? entry.getUnits() : resourceInformation.getUnits()); + ri.setValue(entry.hasValue() ? entry.getValue() : 0L); + return ri; } @Override @@ -166,10 +167,8 @@ public void setResourceInformation(String resource, throw new IllegalArgumentException( "resource and/or resourceInformation cannot be null"); } - if (!resource.equals(resourceInformation.getName())) { - resourceInformation.setName(resource); - } - ResourceInformation storedResourceInfo = getResourceInformation(resource); + ResourceInformation storedResourceInfo = super.getResourceInformation( + resource); ResourceInformation.copy(resourceInformation, storedResourceInfo); } @@ -195,25 +194,9 @@ public long getResourceValue(String resource) return super.getResourceValue(resource); } - private void initResourcesMap() { - if (resources == null) { - ResourceInformation[] types = ResourceUtils.getResourceTypesArray(); - if (types == null) { - throw new YarnRuntimeException( - "Got null return value from ResourceUtils.getResourceTypes()"); - } - - resources = new ResourceInformation[types.length]; - for (ResourceInformation entry : types) { - int index = ResourceUtils.getResourceTypeIndex().get(entry.getName()); - resources[index] = ResourceInformation.newInstance(entry); - } - } - } - synchronized private void mergeLocalToBuilder() { builder.clearResourceValueMap(); - if(resources != null && resources.length != 0) { + if (resources != null && resources.length != 0) { for (ResourceInformation resInfo : resources) { ResourceInformationProto.Builder e = ResourceInformationProto .newBuilder(); @@ -236,4 +219,4 @@ private void mergeLocalToProto() { proto = builder.build(); viaProto = true; } -} +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/ClusterNodeTracker.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/ClusterNodeTracker.java index ccec6bc6a89..86802c6b1b9 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/ClusterNodeTracker.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/ClusterNodeTracker.java @@ -55,7 +55,7 @@ private Map nodeNameToNodeMap = new HashMap<>(); private Map> nodesPerRack = new HashMap<>(); - private Resource clusterCapacity = Resources.clone(Resources.none()); + private Resource clusterCapacity = Resources.createResource(0, 0); private Resource staleClusterCapacity = null; // Max allocation -- 2.13.5 (Apple Git-94)