From 20208b243c3b307148670de934ae6eee4f73f440 Mon Sep 17 00:00:00 2001 From: Sunil G Date: Fri, 21 Jul 2017 21:16:14 +0530 Subject: [PATCH] YARN-6788-YARN-3926 --- .../hadoop/yarn/api/records/ProfileCapability.java | 8 +- .../apache/hadoop/yarn/api/records/Resource.java | 242 ++++++++++----------- .../yarn/api/records/ResourceInformation.java | 13 +- .../hadoop/yarn/api/records/impl/BaseResource.java | 133 +++++++++++ .../hadoop/yarn/api/records/impl/package-info.java | 20 ++ .../hadoop/yarn/util/UnitsConversionUtil.java | 8 +- .../hadoop/yarn/util/resource/ResourceUtils.java | 114 ++++++++-- .../yarn/api/records/impl/pb/ProtoUtils.java | 5 +- .../yarn/api/records/impl/pb/ResourcePBImpl.java | 113 +++++----- .../util/resource/DominantResourceCalculator.java | 47 ++-- .../hadoop/yarn/util/resource/Resources.java | 153 ++++++------- .../yarn/util/resource/TestResourceUtils.java | 14 +- .../hadoop/yarn/util/resource/TestResources.java | 11 +- .../resource/ResourceProfilesManagerImpl.java | 8 +- .../rmapp/attempt/RMAppAttemptMetrics.java | 11 +- .../scheduler/SchedulerApplicationAttempt.java | 9 +- .../scheduler/activities/ActivitiesLogger.java | 33 +-- .../scheduler/capacity/LeafQueue.java | 83 +++++-- .../resourcemanager/webapp/dao/SchedulerInfo.java | 3 +- .../scheduler/capacity/TestCapacityScheduler.java | 146 ++++++++++++- .../scheduler/capacity/TestLeafQueue.java | 10 +- 21 files changed, 781 insertions(+), 403 deletions(-) create mode 100644 hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/impl/BaseResource.java create mode 100644 hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/impl/package-info.java rename hadoop-yarn-project/hadoop-yarn/{hadoop-yarn-common => hadoop-yarn-api}/src/main/java/org/apache/hadoop/yarn/util/resource/ResourceUtils.java (84%) diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/ProfileCapability.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/ProfileCapability.java index 1a8d1c3..2cb4670 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/ProfileCapability.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/ProfileCapability.java @@ -162,10 +162,10 @@ public static Resource toResource(ProfileCapability capability, if (capability.getProfileCapabilityOverride() != null && !capability.getProfileCapabilityOverride().equals(none)) { - for (Map.Entry entry : capability - .getProfileCapabilityOverride().getResources().entrySet()) { - if (entry.getValue() != null && entry.getValue().getValue() >= 0) { - resource.setResourceInformation(entry.getKey(), entry.getValue()); + for (ResourceInformation entry : capability + .getProfileCapabilityOverride().getResources()) { + if (entry != null && entry.getValue() >= 0) { + resource.setResourceInformation(entry.getName(), entry); } } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/Resource.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/Resource.java index 9a8e2ec..c61594b 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/Resource.java +++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/Resource.java @@ -18,6 +18,8 @@ package org.apache.hadoop.yarn.api.records; +import java.util.Arrays; + import org.apache.commons.lang.NotImplementedException; import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.classification.InterfaceAudience.Public; @@ -25,13 +27,11 @@ import org.apache.hadoop.classification.InterfaceStability.Evolving; import org.apache.hadoop.classification.InterfaceStability.Stable; import org.apache.hadoop.yarn.api.ApplicationMasterProtocol; +import org.apache.hadoop.yarn.api.records.impl.BaseResource; import org.apache.hadoop.yarn.exceptions.ResourceNotFoundException; import org.apache.hadoop.yarn.exceptions.YarnException; import org.apache.hadoop.yarn.util.Records; - -import java.util.Collections; -import java.util.HashMap; -import java.util.Map; +import org.apache.hadoop.yarn.util.resource.ResourceUtils; /** *
Resource models a set of computer resources in the @@ -60,97 +60,49 @@ @Stable public abstract class Resource implements Comparable { - private static Resource tmpResource = Records.newRecord(Resource.class); - - private static class SimpleResource extends Resource { - private long memory; - private long vcores; - private Map resourceInformationMap; - - SimpleResource(long memory, long vcores) { - this.memory = memory; - this.vcores = vcores; - - } - @Override - public int getMemory() { - return (int)memory; - } - @Override - public void setMemory(int memory) { - this.memory = memory; - } - @Override - public long getMemorySize() { - return memory; - } - @Override - public void setMemorySize(long memory) { - this.memory = memory; - } - @Override - public int getVirtualCores() { - return (int)vcores; - } - @Override - public void setVirtualCores(int vcores) { - this.vcores = vcores; - } - @Override - public Map getResources() { - if (resourceInformationMap == null) { - resourceInformationMap = new HashMap<>(); - resourceInformationMap.put(ResourceInformation.MEMORY_MB.getName(), - ResourceInformation.newInstance(ResourceInformation.MEMORY_MB)); - resourceInformationMap.put(ResourceInformation.VCORES.getName(), - ResourceInformation.newInstance(ResourceInformation.VCORES)); - } - resourceInformationMap.get(ResourceInformation.MEMORY_MB.getName()) - .setValue(this.memory); - resourceInformationMap.get(ResourceInformation.VCORES.getName()) - .setValue(this.vcores); - return Collections.unmodifiableMap(resourceInformationMap); - } - } + protected static final String MEMORY = ResourceInformation.MEMORY_MB.getName(); + protected static final String VCORES = ResourceInformation.VCORES.getName(); @Public @Stable public static Resource newInstance(int memory, int vCores) { - if (tmpResource.getResources().size() > 2) { + if (ResourceUtils.getResourceTypesArray().length > 2) { Resource ret = Records.newRecord(Resource.class); ret.setMemorySize(memory); ret.setVirtualCores(vCores); return ret; } - return new SimpleResource(memory, vCores); + return new BaseResource(memory, vCores); } @Public @Stable public static Resource newInstance(long memory, int vCores) { - if (tmpResource.getResources().size() > 2) { + if (ResourceUtils.getResourceTypesArray().length > 2) { Resource ret = Records.newRecord(Resource.class); ret.setMemorySize(memory); ret.setVirtualCores(vCores); return ret; } - return new SimpleResource(memory, vCores); + return new BaseResource(memory, vCores); } @InterfaceAudience.Private @InterfaceStability.Unstable public static Resource newInstance(Resource resource) { - Resource ret = Resource.newInstance(0, 0); - Resource.copy(resource, ret); + Resource ret = Resource.newInstance(resource.getMemorySize(), + resource.getVirtualCores()); + if (ResourceUtils.getResourceTypesArray().length > 2) { + Resource.copy(resource, ret); + } return ret; } @InterfaceAudience.Private @InterfaceStability.Unstable public static void copy(Resource source, Resource dest) { - for (Map.Entry entry : source.getResources() - .entrySet()) { - dest.setResourceInformation(entry.getKey(), entry.getValue()); + for (ResourceInformation entry : source.getResources()) { + dest.setResourceInformation(entry.getName(), entry); } } @@ -251,7 +203,7 @@ public void setMemorySize(long memory) { */ @Public @Evolving - public abstract Map getResources(); + public abstract ResourceInformation[] getResources(); /** * Get ResourceInformation for a specified resource. 
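A minimal usage sketch of the array-based getResources() API introduced in the hunk above; the class name and the values are hypothetical and only illustrate how a caller can walk every resource type of a Resource without building a map:

    import org.apache.hadoop.yarn.api.records.Resource;
    import org.apache.hadoop.yarn.api.records.ResourceInformation;

    public class ResourceArrayApiSketch {
      public static void main(String[] args) {
        // 4096 MB and 2 vcores; any extra configured types also show up
        // in the returned ResourceInformation[].
        Resource allocated = Resource.newInstance(4096, 2);
        for (ResourceInformation info : allocated.getResources()) {
          System.out.println(info.getName() + " = "
              + info.getValue() + info.getUnits());
        }
      }
    }
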
@@ -264,12 +216,13 @@ public void setMemorySize(long memory) { @Evolving public ResourceInformation getResourceInformation(String resource) throws YarnException { - if (getResources().containsKey(resource)) { - return getResources().get(resource); + Integer index = ResourceUtils.getResourceTypeIndex().get(resource); + ResourceInformation[] resources = getResources(); + if (index != null) { + return resources[index]; } - throw new YarnException( - "Unknown resource '" + resource + "'. Known resources are " - + getResources().keySet()); + throw new YarnException("Unknown resource '" + resource + + "'. Known resources are " + Arrays.toString(resources)); } /** @@ -282,13 +235,14 @@ public ResourceInformation getResourceInformation(String resource) */ @Public @Evolving - public Long getResourceValue(String resource) throws YarnException { - if (getResources().containsKey(resource)) { - return getResources().get(resource).getValue(); + public long getResourceValue(String resource) throws YarnException { + Integer index = ResourceUtils.getResourceTypeIndex().get(resource); + ResourceInformation[] resources = getResources(); + if (index != null) { + return resources[index].getValue(); } - throw new YarnException( - "Unknown resource '" + resource + "'. Known resources are " - + getResources().keySet()); + throw new YarnException("Unknown resource '" + resource + + "'. Known resources are " + Arrays.toString(resources)); } /** @@ -301,23 +255,24 @@ public Long getResourceValue(String resource) throws YarnException { @Public @Evolving public void setResourceInformation(String resource, - ResourceInformation resourceInformation) throws ResourceNotFoundException { - if (resource.equals(ResourceInformation.MEMORY_MB.getName())) { + ResourceInformation resourceInformation) + throws ResourceNotFoundException { + if (resource.equals(MEMORY)) { this.setMemorySize(resourceInformation.getValue()); return; } - if (resource.equals(ResourceInformation.VCORES.getName())) { + if (resource.equals(VCORES)) { this.setVirtualCores((int) resourceInformation.getValue()); return; } - if (getResources().containsKey(resource)) { - ResourceInformation - .copy(resourceInformation, getResources().get(resource)); + Integer index = ResourceUtils.getResourceTypeIndex().get(resource); + ResourceInformation[] resources = getResources(); + if (index != null) { + ResourceInformation.copy(resourceInformation, resources[index]); return; } - throw new ResourceNotFoundException( - "Unknown resource '" + resource + "'. Known resources are " - + getResources().keySet()); + throw new ResourceNotFoundException("Unknown resource '" + resource + + "'. Known resources are " + Arrays.toString(resources)); } /** @@ -332,21 +287,23 @@ public void setResourceInformation(String resource, @Evolving public void setResourceValue(String resource, Long value) throws ResourceNotFoundException { - if (resource.equals(ResourceInformation.MEMORY_MB.getName())) { + if (resource.equals(MEMORY)) { this.setMemorySize(value); return; } - if (resource.equals(ResourceInformation.VCORES.getName())) { + if (resource.equals(VCORES)) { this.setVirtualCores(value.intValue()); return; } - if (getResources().containsKey(resource)) { - getResources().get(resource).setValue(value); + + Integer index = ResourceUtils.getResourceTypeIndex().get(resource); + ResourceInformation[] resources = getResources(); + if (index != null) { + resources[index].setValue(value); return; } - throw new ResourceNotFoundException( - "Unknown resource '" + resource + "'. 
Known resources are " - + getResources().keySet()); + throw new ResourceNotFoundException("Unknown resource '" + resource + + "'. Known resources are " + Arrays.toString(resources)); } @Override @@ -356,13 +313,12 @@ public int hashCode() { int result = (int) (939769357 + getMemorySize()); // prime * result = 939769357 initially result = prime * result + getVirtualCores(); - for (Map.Entry entry : getResources() - .entrySet()) { - if (entry.getKey().equals(ResourceInformation.MEMORY_MB.getName()) - || entry.getKey().equals(ResourceInformation.VCORES.getName())) { + for (ResourceInformation entry : getResources()) { + if (entry.getName().equals(MEMORY) + || entry.getName().equals(VCORES)) { continue; } - result = prime * result + entry.getValue().hashCode(); + result = prime * result + entry.hashCode(); } return result; } @@ -379,11 +335,32 @@ public boolean equals(Object obj) { return false; } Resource other = (Resource) obj; - if (getMemorySize() != other.getMemorySize() || getVirtualCores() != other - .getVirtualCores()) { + if (getMemorySize() != other.getMemorySize() + || getVirtualCores() != other.getVirtualCores()) { + return false; + } + + ResourceInformation[] myVectors = getResources(); + ResourceInformation[] otherVectors = other.getResources(); + + if (myVectors.length != otherVectors.length) { return false; } - return this.getResources().equals(other.getResources()); + + for (int i = 0; i < myVectors.length; i++) { + ResourceInformation a = myVectors[i]; + ResourceInformation b = otherVectors[i]; + if (a == b || (a == null && b == null)) { + continue; + } else if ((a == null && b != null) || (a != null && b == null)) { + return false; + } else { + if (!a.equals(b)) { + return false; + } + } + } + return true; } @Override @@ -391,21 +368,20 @@ public String toString() { StringBuilder sb = new StringBuilder(); sb.append(" entry : getResources() - .entrySet()) { - if (entry.getKey().equals(ResourceInformation.MEMORY_MB.getName()) - && entry.getValue().getUnits() + for (ResourceInformation entry : getResources()) { + if (entry.getName().equals(MEMORY) + && entry.getUnits() .equals(ResourceInformation.MEMORY_MB.getUnits())) { continue; } - if (entry.getKey().equals(ResourceInformation.VCORES.getName()) - && entry.getValue().getUnits() + if (entry.getName().equals(VCORES) + && entry.getUnits() .equals(ResourceInformation.VCORES.getUnits())) { continue; } - sb.append(", ").append(entry.getKey()).append(": ") - .append(entry.getValue().getValue()) - .append(entry.getValue().getUnits()); + sb.append(", ").append(entry.getName()).append(": ") + .append(entry.getValue()) + .append(entry.getUnits()); } sb.append(">"); return sb.toString(); @@ -413,30 +389,30 @@ public String toString() { @Override public int compareTo(Resource other) { - Map thisResources, otherResources; - thisResources = this.getResources(); - otherResources = other.getResources(); - long diff = thisResources.size() - otherResources.size(); + ResourceInformation[] thisResources = this.getResources(); + ResourceInformation[] otherResources = other.getResources(); + + // compare memory and vcores first(in that order) to preserve + // existing behaviour + long diff = this.getMemorySize() - other.getMemorySize(); if (diff == 0) { - // compare memory and vcores first(in that order) to preserve - // existing behaviour - if (thisResources.keySet().equals(otherResources.keySet())) { - diff = this.getMemorySize() - other.getMemorySize(); - if (diff == 0) { - diff = this.getVirtualCores() - other.getVirtualCores(); - } - if (diff 
== 0) { - for (Map.Entry entry : thisResources - .entrySet()) { - if (entry.getKey().equals(ResourceInformation.MEMORY_MB.getName()) - || entry.getKey() - .equals(ResourceInformation.VCORES.getName())) { - continue; - } - diff = - entry.getValue().compareTo(otherResources.get(entry.getKey())); - if (diff != 0) { - break; + diff = this.getVirtualCores() - other.getVirtualCores(); + } + if (diff == 0) { + diff = thisResources.length - otherResources.length; + if (diff == 0) { + for (ResourceInformation entry : thisResources) { + if (entry.getName().equals(ResourceInformation.MEMORY_MB.getName()) + || entry.getName().equals(ResourceInformation.VCORES.getName())) { + continue; + } + + for (ResourceInformation otherEntry : otherResources) { + if (entry.getName().equals(otherEntry.getName())) { + diff = entry.compareTo(otherEntry); + if (diff != 0) { + break; + } } } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/ResourceInformation.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/ResourceInformation.java index d75b441..3ab7ccd 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/ResourceInformation.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/ResourceInformation.java @@ -242,10 +242,15 @@ public boolean equals(Object obj) { return false; } ResourceInformation r = (ResourceInformation) obj; - int cmp = - UnitsConversionUtil.compare(this.units, this.value, r.units, r.value); - return this.name.equals(r.getName()) && this.resourceType - .equals(r.getResourceType()) && (cmp == 0); + if (!this.name.equals(r.getName()) + || !this.resourceType.equals(r.getResourceType())) { + return false; + } + if (this.units.equals(r.units)) { + return this.value == r.value; + } + return (UnitsConversionUtil.compare(this.units, this.value, r.units, + r.value) == 0); } @Override diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/impl/BaseResource.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/impl/BaseResource.java new file mode 100644 index 0000000..1d3f216 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/impl/BaseResource.java @@ -0,0 +1,133 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hadoop.yarn.api.records.impl; + +import org.apache.hadoop.classification.InterfaceAudience.Public; +import org.apache.hadoop.classification.InterfaceStability.Unstable; +import org.apache.hadoop.yarn.api.records.Resource; +import org.apache.hadoop.yarn.api.records.ResourceInformation; + +import java.util.Arrays; + +/** + *
+ * BaseResource extends Resource to handle base resources such
+ * as memory and CPU.
+ * TODO: We have a long term plan to use AbstractResource when additional
+ * resource types are to be handled as well.
+ *
+ * Currently it models both memory and CPU.
+ *
+ * The unit for memory is megabytes. CPU is modeled with virtual cores (vcores),
+ * a unit for expressing parallelism. A node's capacity should be configured
+ * with virtual cores equal to its number of physical cores. A container should
+ * be requested with the number of cores it can saturate, i.e. the average
+ * number of threads it expects to have runnable at a time.
+ *
+ * Virtual cores take integer values and thus currently CPU-scheduling is very
+ * coarse. A complementary axis for CPU requests that represents processing
+ * power will likely be added in the future to enable finer-grained resource
+ * configuration.
+ * + * @see Resource + */ +@Public +@Unstable +public class BaseResource extends Resource { + + private ResourceInformation memoryResInfo; + private ResourceInformation vcoresResInfo; + protected ResourceInformation[] resources = null; + protected ResourceInformation[] readOnlyResources = null; + + protected enum MandatoryResources { + MEMORY(0), VCORES(1); + + private final int id; + + MandatoryResources(int id) { + this.id = id; + } + + public int getId() { + return this.id; + } + } + + public BaseResource() { + // Base constructor. + } + + public BaseResource(long memory, long vcores) { + this.memoryResInfo = ResourceInformation.newInstance(MEMORY, + ResourceInformation.MEMORY_MB.getUnits(), memory); + this.vcoresResInfo = ResourceInformation.newInstance(VCORES, "", vcores); + + resources = new ResourceInformation[MandatoryResources.values().length]; + readOnlyResources = new ResourceInformation[MandatoryResources + .values().length]; + resources[MandatoryResources.MEMORY.id] = memoryResInfo; + resources[MandatoryResources.VCORES.id] = vcoresResInfo; + Arrays.copyOf(readOnlyResources, resources.length); + } + + @Override + @SuppressWarnings("deprecation") + public int getMemory() { + return (int) memoryResInfo.getValue(); + } + + @Override + @SuppressWarnings("deprecation") + public void setMemory(int memory) { + this.memoryResInfo.setValue(memory); + } + + @Override + public long getMemorySize() { + return memoryResInfo.getValue(); + } + + @Override + public void setMemorySize(long memory) { + this.memoryResInfo.setValue(memory); + } + + @Override + public int getVirtualCores() { + return (int) vcoresResInfo.getValue(); + } + + @Override + public void setVirtualCores(int vcores) { + this.vcoresResInfo.setValue(vcores); + } + + @Override + public ResourceInformation[] getResources() { + return resources; + } +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/impl/package-info.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/impl/package-info.java new file mode 100644 index 0000000..a43b3cb --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/impl/package-info.java @@ -0,0 +1,20 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +@InterfaceAudience.Public +package org.apache.hadoop.yarn.api.records.impl; +import org.apache.hadoop.classification.InterfaceAudience; \ No newline at end of file diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/util/UnitsConversionUtil.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/util/UnitsConversionUtil.java index c7663de..7b737bc 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/util/UnitsConversionUtil.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/util/UnitsConversionUtil.java @@ -186,11 +186,11 @@ public static int compare(String unitA, long valueA, String unitB, if (!KNOWN_UNITS.contains(unitB)) { throw new IllegalArgumentException("Unknown unit '" + unitB + "'"); } - Converter unitAC = getConverter(unitA); - Converter unitBC = getConverter(unitB); if (unitA.equals(unitB)) { - return Long.valueOf(valueA).compareTo(valueB); + return Long.compare(valueA, valueB); } + Converter unitAC = getConverter(unitA); + Converter unitBC = getConverter(unitB); int unitAPos = SORTED_UNITS.indexOf(unitA); int unitBPos = SORTED_UNITS.indexOf(unitB); try { @@ -201,7 +201,7 @@ public static int compare(String unitA, long valueA, String unitB, } else { tmpA = convert(unitA, unitB, valueA); } - return Long.valueOf(tmpA).compareTo(tmpB); + return Long.compare(tmpA, tmpB); } catch (IllegalArgumentException ie) { BigInteger tmpA = BigInteger.valueOf(valueA); BigInteger tmpB = BigInteger.valueOf(valueB); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/resource/ResourceUtils.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/util/resource/ResourceUtils.java similarity index 84% rename from hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/resource/ResourceUtils.java rename to hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/util/resource/ResourceUtils.java index 86cf872..9924475 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/resource/ResourceUtils.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/util/resource/ResourceUtils.java @@ -37,11 +37,13 @@ import java.io.FileNotFoundException; import java.io.IOException; import java.io.InputStream; +import java.util.Arrays; import java.util.Collections; import java.util.HashMap; import java.util.HashSet; import java.util.Map; import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; /** * Helper class to read the resource-types to be supported by the system. 
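A minimal sketch of how a client might list the resource types that ResourceUtils discovers; the class name is hypothetical, memory-mb and vcores are always present, and any additional types are assumed to come from a resource-types.xml on the classpath:

    import java.util.Map;
    import org.apache.hadoop.yarn.api.records.ResourceInformation;
    import org.apache.hadoop.yarn.util.resource.ResourceUtils;

    public class ResourceTypesSketch {
      public static void main(String[] args) {
        // Discovers and caches the supported resource types.
        Map<String, ResourceInformation> types = ResourceUtils.getResourceTypes();
        for (ResourceInformation type : types.values()) {
          System.out.println(type.getName()
              + " (units: '" + type.getUnits() + "')");
        }
      }
    }
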
@@ -66,7 +68,11 @@ } private static volatile Object lock; + private static final Map indexForResourceInformation = + new ConcurrentHashMap(); private static Map readOnlyResources; + private static String[] resourceNamesArray; + private static ResourceInformation[] readOnlyResourcesArray; private static volatile Object nodeLock; private static Map readOnlyNodeResources; @@ -252,6 +258,48 @@ static void initializeResourcesMap(Configuration conf, setMinimumAllocationForMandatoryResources(resourceInformationMap, conf); setMaximumAllocationForMandatoryResources(resourceInformationMap, conf); readOnlyResources = Collections.unmodifiableMap(resourceInformationMap); + updateReadOnlyResources(); + updateResourceTypeIndex(); + } + + private static void updateReadOnlyResources() { + readOnlyResourcesArray = new ResourceInformation[readOnlyResources.size()]; + readOnlyResourcesArray[0] = ResourceInformation + .newInstance(readOnlyResources.get(MEMORY)); + readOnlyResourcesArray[1] = ResourceInformation + .newInstance(readOnlyResources.get(VCORES)); + + if (readOnlyResources.size() > 2) { + int index = 2; + for (ResourceInformation resInfo : readOnlyResources.values()) { + if (resInfo.getName().equals(MEMORY) + || resInfo.getName().equals(VCORES)) { + continue; + } + readOnlyResourcesArray[index] = ResourceInformation + .newInstance(resInfo); + index++; + } + } + } + + private static void updateResourceTypeIndex() { + indexForResourceInformation.clear(); + + int index = 0; + for (ResourceInformation resInfo : readOnlyResourcesArray) { + indexForResourceInformation.put(resInfo.getName(), index); + index++; + } + } + + /** + * Get associate index of resource types such memory, cpu etc. + * This could help to access each resource types in a resource faster. + * @return Index map for all Resource Types. + */ + public static Map getResourceTypeIndex() { + return indexForResourceInformation; } /** @@ -264,6 +312,21 @@ static void initializeResourcesMap(Configuration conf, YarnConfiguration.RESOURCE_TYPES_CONFIGURATION_FILE); } + /** + * Get resource names array, this is mostly for performance perspective. Never + * modify returned array. + * + * @return resourceNamesArray + */ + public static String[] getResourceNamesArray() { + return resourceNamesArray; + } + + public static ResourceInformation[] getResourceTypesArray() { + getResourceTypes(null, YarnConfiguration.RESOURCE_TYPES_CONFIGURATION_FILE); + return readOnlyResourcesArray; + } + private static Map getResourceTypes( Configuration conf) { return getResourceTypes(conf, @@ -286,11 +349,21 @@ static void initializeResourcesMap(Configuration conf, initializeResourcesMap(conf, resources); lock = new Object(); } catch (FileNotFoundException fe) { - LOG.info("Unable to find '" + resourceFile - + "'. Falling back to memory and vcores as resources", fe); + LOG.info( + "Unable to find '" + resourceFile + + "'. Falling back to memory and vcores as resources", + fe); initializeResourcesMap(conf, resources); lock = new Object(); } + + // Update resource names. 
+ resourceNamesArray = new String[resources.size()]; + int i = 0; + for (String s : readOnlyResources.keySet()) { + resourceNamesArray[i] = s; + i++; + } } } } @@ -383,6 +456,8 @@ public static String getUnits(String resourceValue) { initializeNodeResourceInformation(conf); addManadtoryResources(nodeResources); checkMandatatoryResources(nodeResources); + setMinimumAllocationForMandatoryResources(nodeResources, conf); + setMaximumAllocationForMandatoryResources(nodeResources, conf); readOnlyNodeResources = Collections.unmodifiableMap(nodeResources); nodeLock = new Object(); } @@ -437,24 +512,20 @@ synchronized public static void resetNodeResources() { } public static Resource getResourceTypesMinimumAllocation() { - Map resourceTypes = getResourceTypes(); Resource ret = Resource.newInstance(0, 0); - for (Map.Entry entry : resourceTypes - .entrySet()) { - String name = entry.getKey(); + for (ResourceInformation entry : readOnlyResourcesArray) { + String name = entry.getName(); if (name.equals(ResourceInformation.MEMORY_MB.getName())) { - ret.setMemorySize(entry.getValue().getMinimumAllocation()); - continue; - } - if (name.equals(ResourceInformation.VCORES.getName())) { - Long tmp = entry.getValue().getMinimumAllocation(); + ret.setMemorySize(entry.getMinimumAllocation()); + } else if (name.equals(ResourceInformation.VCORES.getName())) { + Long tmp = entry.getMinimumAllocation(); if (tmp > Integer.MAX_VALUE) { tmp = (long) Integer.MAX_VALUE; } ret.setVirtualCores(tmp.intValue()); - continue; + } else { + ret.setResourceValue(name, entry.getMinimumAllocation()); } - ret.setResourceValue(name, entry.getValue().getMinimumAllocation()); } return ret; } @@ -464,24 +535,21 @@ public static Resource getResourceTypesMinimumAllocation() { * @return a Resource object with the maximum allocation for the scheduler */ public static Resource getResourceTypesMaximumAllocation() { - Map resourceTypes = getResourceTypes(); Resource ret = Resource.newInstance(0, 0); - for (Map.Entry entry : resourceTypes - .entrySet()) { - String name = entry.getKey(); + for (ResourceInformation entry : readOnlyResourcesArray) { + String name = entry.getName(); if (name.equals(ResourceInformation.MEMORY_MB.getName())) { - ret.setMemorySize(entry.getValue().getMaximumAllocation()); - continue; - } - if (name.equals(ResourceInformation.VCORES.getName())) { - Long tmp = entry.getValue().getMaximumAllocation(); + ret.setMemorySize(entry.getMaximumAllocation()); + } else if (name.equals(ResourceInformation.VCORES.getName())) { + Long tmp = entry.getMaximumAllocation(); if (tmp > Integer.MAX_VALUE) { tmp = (long) Integer.MAX_VALUE; } ret.setVirtualCores(tmp.intValue()); continue; + } else { + ret.setResourceValue(name, entry.getMaximumAllocation()); } - ret.setResourceValue(name, entry.getValue().getMaximumAllocation()); } return ret; } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/ProtoUtils.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/ProtoUtils.java index 626ff9b..158c2ae 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/ProtoUtils.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/ProtoUtils.java @@ -456,9 +456,8 @@ public static ResourceTypes convertFromProtoFormat(ResourceTypesProto e) { List pList) { Resource tmp = Resource.newInstance(0, 0); Map 
ret = new HashMap<>(); - for (Map.Entry entry : tmp.getResources() - .entrySet()) { - ret.put(entry.getKey(), 0L); + for (ResourceInformation entry : tmp.getResources()) { + ret.put(entry.getName(), 0L); } if (pList != null) { for (YarnProtos.StringLongMapProto p : pList) { diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/ResourcePBImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/ResourcePBImpl.java index 7bc7f5f..2dbaa87 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/ResourcePBImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/ResourcePBImpl.java @@ -25,7 +25,9 @@ import org.apache.hadoop.yarn.api.protocolrecords.ResourceTypes; import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.api.records.ResourceInformation; +import org.apache.hadoop.yarn.api.records.impl.BaseResource; import org.apache.hadoop.yarn.exceptions.ResourceNotFoundException; +import org.apache.hadoop.yarn.exceptions.YarnException; import org.apache.hadoop.yarn.exceptions.YarnRuntimeException; import org.apache.hadoop.yarn.proto.YarnProtos.ResourceProto; import org.apache.hadoop.yarn.proto.YarnProtos.ResourceProtoOrBuilder; @@ -33,14 +35,12 @@ import org.apache.hadoop.yarn.util.resource.ResourceUtils; import org.apache.hadoop.yarn.util.UnitsConversionUtil; -import java.util.HashMap; import java.util.Map; -import java.util.Collections; @Private @Unstable -public class ResourcePBImpl extends Resource { +public class ResourcePBImpl extends BaseResource { private static final Log LOG = LogFactory.getLog(ResourcePBImpl.class); @@ -48,10 +48,6 @@ ResourceProto.Builder builder = null; boolean viaProto = false; - private Map resources; - private Map readOnlyResources; - - // call via ProtoUtils.convertToProtoFormat(Resource) static ResourceProto getProto(Resource r) { final ResourcePBImpl pb; @@ -72,8 +68,6 @@ public ResourcePBImpl() { public ResourcePBImpl(ResourceProto proto) { this.proto = proto; viaProto = true; - this.readOnlyResources = null; - this.resources = null; initResources(); } @@ -101,11 +95,13 @@ public int getMemory() { public long getMemorySize() { // memory should always be present initResources(); - ResourceInformation ri = - this.getResourceInformation(ResourceInformation.MEMORY_MB.getName()); - return UnitsConversionUtil - .convert(ri.getUnits(), ResourceInformation.MEMORY_MB.getUnits(), - ri.getValue()); + ResourceInformation ri = resources[MandatoryResources.MEMORY.getId()]; + + if (ri.getUnits().isEmpty()) { + return ri.getValue(); + } + return UnitsConversionUtil.convert(ri.getUnits(), + ResourceInformation.MEMORY_MB.getUnits(), ri.getValue()); } @Override @@ -117,23 +113,28 @@ public void setMemory(int memory) { @Override public void setMemorySize(long memory) { maybeInitBuilder(); - getResourceInformation(ResourceInformation.MEMORY_MB.getName()) - .setValue(memory); + try { + getResourceInformation(MEMORY).setValue(memory); + } catch (YarnException e) { + throw new ResourceNotFoundException(e); + } } @Override public int getVirtualCores() { // vcores should always be present initResources(); - return this.getResourceValue(ResourceInformation.VCORES.getName()) - .intValue(); + return (int) resources[MandatoryResources.VCORES.getId()].getValue(); } @Override public void setVirtualCores(int 
vCores) { maybeInitBuilder(); - getResourceInformation(ResourceInformation.VCORES.getName()) - .setValue(vCores); + try { + getResourceInformation(VCORES).setValue(vCores); + } catch (YarnException e) { + throw new ResourceNotFoundException(e); + } } private void initResources() { @@ -142,6 +143,7 @@ private void initResources() { } ResourceProtoOrBuilder p = viaProto ? proto : builder; initResourcesMap(); + Map indexMap = ResourceUtils.getResourceTypeIndex(); for (ResourceInformationProto entry : p.getResourceValueMapList()) { ResourceTypes type = entry.hasType() ? ProtoUtils.convertFromProtoFormat(entry.getType()) : @@ -150,12 +152,13 @@ private void initResources() { long value = entry.hasValue() ? entry.getValue() : 0L; ResourceInformation ri = ResourceInformation .newInstance(entry.getKey(), units, value, type, 0L, Long.MAX_VALUE); - if (resources.containsKey(ri.getName())) { - resources.get(ri.getName()).setResourceType(ri.getResourceType()); - resources.get(ri.getName()).setUnits(ri.getUnits()); - resources.get(ri.getName()).setValue(value); - } else { + Integer index = indexMap.get(entry.getKey()); + if(index == null) { LOG.warn("Got unknown resource type: " + ri.getName() + "; skipping"); + } else { + resources[index].setResourceType(ri.getResourceType()); + resources[index].setUnits(ri.getUnits()); + resources[index].setValue(value); } } this.setMemorySize(p.getMemory()); @@ -174,8 +177,9 @@ public void setResourceInformation(String resource, resourceInformation.setName(resource); } initResources(); - if (resources.containsKey(resource)) { - ResourceInformation.copy(resourceInformation, resources.get(resource)); + Integer index = ResourceUtils.getResourceTypeIndex().get(resource); + if(index != null) { + ResourceInformation.copy(resourceInformation, resources[index]); } } @@ -187,65 +191,60 @@ public void setResourceValue(String resource, Long value) if (resource == null) { throw new IllegalArgumentException("resource type object cannot be null"); } - if (resources == null || (!resources.containsKey(resource))) { + + Integer index = ResourceUtils.getResourceTypeIndex().get(resource); + if (index == null) { throw new ResourceNotFoundException( "Resource " + resource + " not found"); } - resources.get(resource).setValue(value); + resources[index].setValue(value); } @Override - public Map getResources() { + public ResourceInformation[] getResources() { initResources(); - return readOnlyResources; + return super.getResources(); } @Override - public ResourceInformation getResourceInformation(String resource) { + public ResourceInformation getResourceInformation(String resource) + throws YarnException { initResources(); - if (this.resources.containsKey(resource)) { - return this.resources.get(resource); - } - throw new ResourceNotFoundException("Could not find entry for " + resource); + return super.getResourceInformation(resource); } @Override - public Long getResourceValue(String resource) { + public long getResourceValue(String resource) throws YarnException { initResources(); - if (this.resources.containsKey(resource)) { - return this.resources.get(resource).getValue(); - } - throw new ResourceNotFoundException("Could not find entry for " + resource); + return super.getResourceValue(resource); } private void initResourcesMap() { if (resources == null) { - resources = new HashMap<>(); - Map types = ResourceUtils.getResourceTypes(); + ResourceInformation[] types = ResourceUtils.getResourceTypesArray(); if (types == null) { throw new YarnRuntimeException( "Got null return value from 
ResourceUtils.getResourceTypes()"); } - for (Map.Entry entry : types.entrySet()) { - resources.put(entry.getKey(), - ResourceInformation.newInstance(entry.getValue())); + + resources = new ResourceInformation[types.length]; + for (ResourceInformation entry : types) { + int index = ResourceUtils.getResourceTypeIndex().get(entry.getName()); + resources[index] = ResourceInformation.newInstance(entry); } - readOnlyResources = Collections.unmodifiableMap(resources); } } synchronized private void mergeLocalToBuilder() { builder.clearResourceValueMap(); - if (resources != null && !resources.isEmpty()) { - for (Map.Entry entry : - resources.entrySet()) { - ResourceInformationProto.Builder e = - ResourceInformationProto.newBuilder(); - e.setKey(entry.getKey()); - e.setUnits(entry.getValue().getUnits()); - e.setType( - ProtoUtils.converToProtoFormat(entry.getValue().getResourceType())); - e.setValue(entry.getValue().getValue()); + if(resources != null && resources.length != 0) { + for (ResourceInformation resInfo : resources) { + ResourceInformationProto.Builder e = ResourceInformationProto + .newBuilder(); + e.setKey(resInfo.getName()); + e.setUnits(resInfo.getUnits()); + e.setType(ProtoUtils.converToProtoFormat(resInfo.getResourceType())); + e.setValue(resInfo.getValue()); builder.addResourceValueMap(e); } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/resource/DominantResourceCalculator.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/resource/DominantResourceCalculator.java index 79bb03d..02db115 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/resource/DominantResourceCalculator.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/resource/DominantResourceCalculator.java @@ -56,10 +56,11 @@ LogFactory.getLog(DominantResourceCalculator.class); - private Set resourceNames; + private String[] resourceNames; public DominantResourceCalculator() { - resourceNames = ResourceUtils.getResourceTypes().keySet(); + ResourceUtils.getResourceTypes(); + resourceNames = ResourceUtils.getResourceNamesArray(); } /** @@ -206,14 +207,9 @@ public float divide(Resource clusterResource, @Override public boolean isInvalidDivisor(Resource r) { - for (String resource : resourceNames) { - try { - if (r.getResourceValue(resource).equals(0L)) { - return true; - } - } catch (YarnException ye) { - throw new IllegalArgumentException( - "Error getting resource value for " + resource, ye); + for (ResourceInformation res : r.getResources()) { + if (res.getValue() == 0L) { + return true; } } return false; @@ -372,25 +368,26 @@ private Resource multiplyAndNormalize(Resource r, double by, Resource ret = Resource.newInstance(r); for (String resource : resourceNames) { try { - ResourceInformation rResourceInformation = - r.getResourceInformation(resource); - ResourceInformation stepFactorResourceInformation = - stepFactor.getResourceInformation(resource); + ResourceInformation rResourceInformation = r + .getResourceInformation(resource); + ResourceInformation stepFactorResourceInformation = stepFactor + .getResourceInformation(resource); ResourceInformation tmp = ret.getResourceInformation(resource); - Long rValue = rResourceInformation.getValue(); - Long stepFactorValue = UnitsConversionUtil - .convert(stepFactorResourceInformation.getUnits(), - rResourceInformation.getUnits(), - 
stepFactorResourceInformation.getValue()); - Long value; + long rValue = rResourceInformation.getValue(); + long stepFactorValue = UnitsConversionUtil.convert( + stepFactorResourceInformation.getUnits(), + rResourceInformation.getUnits(), + stepFactorResourceInformation.getValue()); + long value; if (stepFactorValue != 0) { - value = roundUp ? - roundUp((long) Math.ceil(rValue * by), stepFactorValue) : - roundDown((long) (rValue * by), stepFactorValue); + value = roundUp + ? roundUp((long) Math.ceil(rValue * by), stepFactorValue) + : roundDown((long) (rValue * by), stepFactorValue); } else { - value = - roundUp ? (long) Math.ceil(rValue * by) : (long) (rValue * by); + value = roundUp + ? (long) Math.ceil(rValue * by) + : (long) (rValue * by); } tmp.setValue(value); } catch (YarnException ye) { diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/resource/Resources.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/resource/Resources.java index d143e93..15fdd35 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/resource/Resources.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/resource/Resources.java @@ -22,12 +22,11 @@ import org.apache.hadoop.classification.InterfaceStability.Unstable; import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.api.records.ResourceInformation; +import org.apache.hadoop.yarn.api.records.impl.BaseResource; import org.apache.hadoop.yarn.exceptions.ResourceNotFoundException; import org.apache.hadoop.yarn.exceptions.YarnException; -import org.apache.hadoop.yarn.util.Records; import org.apache.hadoop.yarn.util.UnitsConversionUtil; -import java.util.Collections; import java.util.HashMap; import java.util.Map; @@ -39,9 +38,9 @@ * Helper class to create a resource with a fixed value for all resource * types. For example, a NONE resource which returns 0 for any resource type. 
*/ - static class FixedValueResource extends Resource { + static class FixedValueResource extends BaseResource { - private Map resources; + private Map resourcesMap; private long resourceValue; private String name; @@ -53,7 +52,9 @@ FixedValueResource(String rName, long value) { this.resourceValue = value; this.name = rName; - resources = initResourceMap(); + resourcesMap = initResourceMap(); + resources = (ResourceInformation[]) this.resourcesMap.values() + .toArray(new ResourceInformation[this.resourcesMap.size()]); } private int resourceValueToInt() { @@ -96,31 +97,6 @@ public void setVirtualCores(int virtualCores) { } @Override - public Map getResources() { - return Collections.unmodifiableMap(this.resources); - } - - @Override - public ResourceInformation getResourceInformation(String resource) - throws YarnException { - if (resources.containsKey(resource)) { - ResourceInformation value = this.resources.get(resource); - ResourceInformation ret = ResourceInformation.newInstance(value); - ret.setValue(resourceValue); - return ret; - } - throw new YarnException("" + resource + " not found"); - } - - @Override - public Long getResourceValue(String resource) throws YarnException { - if (resources.containsKey(resource)) { - return resourceValue; - } - throw new YarnException("" + resource + " not found"); - } - - @Override public void setResourceInformation(String resource, ResourceInformation resourceInformation) throws ResourceNotFoundException { @@ -135,12 +111,12 @@ public void setResourceValue(String resource, Long value) private Map initResourceMap() { Map tmp = new HashMap<>(); - Map types = ResourceUtils.getResourceTypes(); + ResourceInformation[] types = ResourceUtils.getResourceTypesArray(); if (types != null) { - for (Map.Entry entry : types.entrySet()) { - tmp.put(entry.getKey(), - ResourceInformation.newInstance(entry.getValue())); - tmp.get(entry.getKey()).setValue(resourceValue); + for (ResourceInformation entry : types) { + tmp.put(entry.getName(), + ResourceInformation.newInstance(entry)); + tmp.get(entry.getName()).setValue(resourceValue); } } // this is a fix for getVirtualCores returning an int @@ -150,7 +126,6 @@ public void setResourceValue(String resource, Long value) } return tmp; } - } public static Resource createResource(int memory) { @@ -197,15 +172,16 @@ public static Resource clone(Resource res) { } public static Resource addTo(Resource lhs, Resource rhs) { - for (Map.Entry entry : lhs.getResources() - .entrySet()) { - String name = entry.getKey(); + for (ResourceInformation entry : lhs.getResources()) { + String name = entry.getName(); try { ResourceInformation rhsValue = rhs.getResourceInformation(name); - ResourceInformation lhsValue = entry.getValue(); - long convertedRhs = UnitsConversionUtil - .convert(rhsValue.getUnits(), lhsValue.getUnits(), - rhsValue.getValue()); + ResourceInformation lhsValue = entry; + + long convertedRhs = (rhsValue.getUnits().equals(lhsValue.getUnits())) + ? 
rhsValue.getValue() + : UnitsConversionUtil.convert(rhsValue.getUnits(), + lhsValue.getUnits(), rhsValue.getValue()); lhs.setResourceValue(name, lhsValue.getValue() + convertedRhs); } catch (YarnException ye) { continue; @@ -219,15 +195,16 @@ public static Resource add(Resource lhs, Resource rhs) { } public static Resource subtractFrom(Resource lhs, Resource rhs) { - for (Map.Entry entry : lhs.getResources() - .entrySet()) { - String name = entry.getKey(); + for (ResourceInformation entry : lhs.getResources()) { + String name = entry.getName(); try { ResourceInformation rhsValue = rhs.getResourceInformation(name); - ResourceInformation lhsValue = entry.getValue(); - long convertedRhs = UnitsConversionUtil - .convert(rhsValue.getUnits(), lhsValue.getUnits(), - rhsValue.getValue()); + ResourceInformation lhsValue = entry; + + long convertedRhs = (rhsValue.getUnits().equals(lhsValue.getUnits())) + ? rhsValue.getValue() + : UnitsConversionUtil.convert(rhsValue.getUnits(), + lhsValue.getUnits(), rhsValue.getValue()); lhs.setResourceValue(name, lhsValue.getValue() - convertedRhs); } catch (YarnException ye) { continue; @@ -263,10 +240,9 @@ public static Resource negate(Resource resource) { } public static Resource multiplyTo(Resource lhs, double by) { - for (Map.Entry entry : lhs.getResources() - .entrySet()) { - String name = entry.getKey(); - ResourceInformation lhsValue = entry.getValue(); + for (ResourceInformation entry : lhs.getResources()) { + String name = entry.getName(); + ResourceInformation lhsValue = entry; lhs.setResourceValue(name, (long) (lhsValue.getValue() * by)); } return lhs; @@ -282,15 +258,18 @@ public static Resource multiply(Resource lhs, double by) { */ public static Resource multiplyAndAddTo( Resource lhs, Resource rhs, double by) { - for (Map.Entry entry : lhs.getResources() - .entrySet()) { - String name = entry.getKey(); + for (ResourceInformation entry : lhs.getResources()) { + String name = entry.getName(); try { ResourceInformation rhsValue = rhs.getResourceInformation(name); - ResourceInformation lhsValue = entry.getValue(); - long convertedRhs = (long) (UnitsConversionUtil - .convert(rhsValue.getUnits(), lhsValue.getUnits(), - rhsValue.getValue()) * by); + ResourceInformation lhsValue = entry; + + long convertedRhs = (long) (((rhsValue.getUnits() + .equals(lhsValue.getUnits())) + ? 
rhsValue.getValue() + : UnitsConversionUtil.convert(rhsValue.getUnits(), + lhsValue.getUnits(), rhsValue.getValue())) + * by); lhs.setResourceValue(name, lhsValue.getValue() + convertedRhs); } catch (YarnException ye) { continue; @@ -311,10 +290,9 @@ public static Resource multiplyAndNormalizeDown( public static Resource multiplyAndRoundDown(Resource lhs, double by) { Resource out = clone(lhs); - for (Map.Entry entry : out.getResources() - .entrySet()) { - String name = entry.getKey(); - ResourceInformation lhsValue = entry.getValue(); + for (ResourceInformation entry : out.getResources()) { + String name = entry.getName(); + ResourceInformation lhsValue = entry; out.setResourceValue(name, (long) (lhsValue.getValue() * by)); } return out; @@ -416,15 +394,16 @@ public static Resource max( } public static boolean fitsIn(Resource smaller, Resource bigger) { - for (Map.Entry entry : smaller.getResources() - .entrySet()) { - String name = entry.getKey(); + for (ResourceInformation entry : smaller.getResources()) { + String name = entry.getName(); try { ResourceInformation rhsValue = bigger.getResourceInformation(name); - ResourceInformation lhsValue = entry.getValue(); - long convertedRhs = UnitsConversionUtil - .convert(rhsValue.getUnits(), lhsValue.getUnits(), - rhsValue.getValue()); + ResourceInformation lhsValue = entry; + + long convertedRhs = (rhsValue.getUnits().equals(lhsValue.getUnits())) + ? rhsValue.getValue() + : UnitsConversionUtil.convert(rhsValue.getUnits(), + lhsValue.getUnits(), rhsValue.getValue()); if(lhsValue.getValue() > convertedRhs) { return false; } @@ -442,15 +421,16 @@ public static boolean fitsIn(ResourceCalculator rc, Resource cluster, public static Resource componentwiseMin(Resource lhs, Resource rhs) { Resource ret = createResource(0); - for (Map.Entry entry : lhs.getResources() - .entrySet()) { - String name = entry.getKey(); + for (ResourceInformation entry : lhs.getResources()) { + String name = entry.getName(); try { ResourceInformation rhsValue = rhs.getResourceInformation(name); - ResourceInformation lhsValue = entry.getValue(); - long convertedRhs = UnitsConversionUtil - .convert(rhsValue.getUnits(), lhsValue.getUnits(), - rhsValue.getValue()); + ResourceInformation lhsValue = entry; + + long convertedRhs = (rhsValue.getUnits().equals(lhsValue.getUnits())) + ? rhsValue.getValue() + : UnitsConversionUtil.convert(rhsValue.getUnits(), + lhsValue.getUnits(), rhsValue.getValue()); ResourceInformation outInfo = lhsValue.getValue() < convertedRhs ? lhsValue : rhsValue; ret.setResourceInformation(name, outInfo); @@ -463,15 +443,16 @@ public static Resource componentwiseMin(Resource lhs, Resource rhs) { public static Resource componentwiseMax(Resource lhs, Resource rhs) { Resource ret = createResource(0); - for (Map.Entry entry : lhs.getResources() - .entrySet()) { - String name = entry.getKey(); + for (ResourceInformation entry : lhs.getResources()) { + String name = entry.getName(); try { ResourceInformation rhsValue = rhs.getResourceInformation(name); - ResourceInformation lhsValue = entry.getValue(); - long convertedRhs = UnitsConversionUtil - .convert(rhsValue.getUnits(), lhsValue.getUnits(), - rhsValue.getValue()); + ResourceInformation lhsValue = entry; + + long convertedRhs = (rhsValue.getUnits().equals(lhsValue.getUnits())) + ? rhsValue.getValue() + : UnitsConversionUtil.convert(rhsValue.getUnits(), + lhsValue.getUnits(), rhsValue.getValue()); ResourceInformation outInfo = lhsValue.getValue() > convertedRhs ? 
lhsValue : rhsValue; ret.setResourceInformation(name, outInfo); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/util/resource/TestResourceUtils.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/util/resource/TestResourceUtils.java index 38554b6..b530150 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/util/resource/TestResourceUtils.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/util/resource/TestResourceUtils.java @@ -276,13 +276,17 @@ public void testGetResourceInformation() throws Exception { String resourceFile = entry.getKey(); ResourceUtils.resetNodeResources(); File dest; - File source = - new File(conf.getClassLoader().getResource(resourceFile).getFile()); + File source = new File( + conf.getClassLoader().getResource(resourceFile).getFile()); dest = new File(source.getParent(), "node-resources.xml"); FileUtils.copyFile(source, dest); - Map actual = - ResourceUtils.getNodeResourceInformation(conf); - Assert.assertEquals(entry.getValue().getResources(), actual); + Map actual = ResourceUtils + .getNodeResourceInformation(conf); + Assert.assertEquals(actual.size(), + entry.getValue().getResources().length); + for (ResourceInformation resInfo : entry.getValue().getResources()) { + Assert.assertEquals(resInfo, actual.get(resInfo.getName())); + } dest.delete(); } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/util/resource/TestResources.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/util/resource/TestResources.java index 1555e55..8ea3192 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/util/resource/TestResources.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/util/resource/TestResources.java @@ -22,6 +22,8 @@ import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.api.records.ResourceInformation; import org.apache.hadoop.yarn.conf.YarnConfiguration; +import org.apache.hadoop.yarn.util.resource.TestResourceUtils; +import org.apache.hadoop.yarn.util.resource.TestResourceUtils.ResourceFileInformation; import org.junit.After; import org.junit.Before; import org.junit.Test; @@ -57,8 +59,11 @@ public static Resource none() { private void setupExtraResourceType() throws Exception { Configuration conf = new YarnConfiguration(); + ResourceFileInformation testFile = + new ResourceFileInformation("resource-types-3.xml", 3); + testFile.resourceNameUnitsMap.put(EXTRA_RESOURCE_TYPE, ""); resourceTypesFile = - TestResourceUtils.setupResourceTypes(conf, "resource-types-3.xml"); + TestResourceUtils.setupResourceTypes(conf, testFile.filename); } private void unsetExtraResourceType() { @@ -246,7 +251,9 @@ public void testMultiplyAndRoundDown() { } @Test - public void testMultiplyAndAddTo() { + public void testMultiplyAndAddTo() throws Exception { + unsetExtraResourceType(); + setupExtraResourceType(); assertEquals(createResource(6, 4), multiplyAndAddTo(createResource(3, 1), createResource(2, 2), 1.5)); assertEquals(createResource(6, 4, 0), diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/resource/ResourceProfilesManagerImpl.java 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/resource/ResourceProfilesManagerImpl.java index 7987ded..ab33336 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/resource/ResourceProfilesManagerImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/resource/ResourceProfilesManagerImpl.java @@ -119,13 +119,13 @@ private void loadProfiles() throws IOException { private Resource parseResource(String key, Map value) throws IOException { Resource resource = Resource.newInstance(0, 0); Iterator iterator = value.entrySet().iterator(); - Map resourceTypes = - ResourceUtils.getResourceTypes(); + Map resourceTypes = ResourceUtils + .getResourceTypes(); while (iterator.hasNext()) { Map.Entry resourceEntry = (Map.Entry) iterator.next(); String resourceName = resourceEntry.getKey().toString(); - ResourceInformation resourceValue = - fromString(resourceName, resourceEntry.getValue().toString()); + ResourceInformation resourceValue = fromString(resourceName, + resourceEntry.getValue().toString()); if (resourceName.equals(MEMORY)) { resource.setMemorySize(resourceValue.getValue()); continue; diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttemptMetrics.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttemptMetrics.java index ff18223..c514cb3 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttemptMetrics.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttemptMetrics.java @@ -176,16 +176,15 @@ public void updateAggregatePreemptedAppResourceUsage( private void updateUsageMap(Resource allocated, long deltaUsedMillis, Map targetMap) { - for (Map.Entry entry : allocated.getResources() - .entrySet()) { + for (ResourceInformation entry : allocated.getResources()) { AtomicLong resourceUsed; - if (!targetMap.containsKey(entry.getKey())) { + if (!targetMap.containsKey(entry.getName())) { resourceUsed = new AtomicLong(0); - targetMap.put(entry.getKey(), resourceUsed); + targetMap.put(entry.getName(), resourceUsed); } - resourceUsed = targetMap.get(entry.getKey()); - resourceUsed.addAndGet((entry.getValue().getValue() * deltaUsedMillis) + resourceUsed = targetMap.get(entry.getName()); + resourceUsed.addAndGet((entry.getValue() * deltaUsedMillis) / DateUtils.MILLIS_PER_SECOND); } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerApplicationAttempt.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerApplicationAttempt.java index 811d9dc..b571322 100644 --- 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerApplicationAttempt.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerApplicationAttempt.java @@ -984,13 +984,12 @@ private AggregateAppResourceUsage getRunningAggregateAppResourceUsage() { for (RMContainer rmContainer : this.liveContainers.values()) { long usedMillis = currentTimeMillis - rmContainer.getCreationTime(); Resource resource = rmContainer.getContainer().getResource(); - for (Map.Entry entry : resource - .getResources().entrySet()) { + for (ResourceInformation entry : resource.getResources()) { long value = RMServerUtils - .getOrDefault(resourceSecondsMap, entry.getKey(), 0L); - value += entry.getValue().getValue() * usedMillis + .getOrDefault(resourceSecondsMap, entry.getName(), 0L); + value += entry.getValue() * usedMillis / DateUtils.MILLIS_PER_SECOND; - resourceSecondsMap.put(entry.getKey(), value); + resourceSecondsMap.put(entry.getName(), value); } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/activities/ActivitiesLogger.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/activities/ActivitiesLogger.java index 3f8ed55..12aff02 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/activities/ActivitiesLogger.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/activities/ActivitiesLogger.java @@ -63,9 +63,14 @@ public static void recordRejectedAppActivityFromLeafQueue( SchedulerApplicationAttempt application, Priority priority, String diagnostic) { String type = "app"; - recordActivity(activitiesManager, node, application.getQueueName(), - application.getApplicationId().toString(), priority, - ActivityState.REJECTED, diagnostic, type); + if (activitiesManager == null) { + return; + } + if (activitiesManager.shouldRecordThisNode(node.getNodeID())) { + recordActivity(activitiesManager, node, application.getQueueName(), + application.getApplicationId().toString(), priority, + ActivityState.REJECTED, diagnostic, type); + } finishSkippedAppAllocationRecording(activitiesManager, application.getApplicationId(), ActivityState.REJECTED, diagnostic); } @@ -203,8 +208,13 @@ public static void finishSkippedAppAllocationRecording( public static void recordQueueActivity(ActivitiesManager activitiesManager, SchedulerNode node, String parentQueueName, String queueName, ActivityState state, String diagnostic) { - recordActivity(activitiesManager, node, parentQueueName, queueName, null, - state, diagnostic, null); + if (activitiesManager == null) { + return; + } + if (activitiesManager.shouldRecordThisNode(node.getNodeID())) { + recordActivity(activitiesManager, node, parentQueueName, queueName, + null, state, diagnostic, null); + } } } @@ -266,13 +276,10 @@ public static void startNodeUpdateRecording( private static void recordActivity(ActivitiesManager activitiesManager, SchedulerNode node, String parentName, String childName, Priority priority, 
ActivityState state, String diagnostic, String type) { - if (activitiesManager == null) { - return; - } - if (activitiesManager.shouldRecordThisNode(node.getNodeID())) { - activitiesManager.addSchedulingActivityForNode(node.getNodeID(), - parentName, childName, priority != null ? priority.toString() : null, - state, diagnostic, type); - } + + activitiesManager.addSchedulingActivityForNode(node.getNodeID(), parentName, + childName, priority != null ? priority.toString() : null, state, + diagnostic, type); + } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java index 013a5ac..2e502b7 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java @@ -1026,6 +1026,8 @@ public CSAssignment assignContainers(Resource clusterResource, return CSAssignment.NULL_ASSIGNMENT; } + Map userLimits = new HashMap<>(); + boolean needAssignToQueueCheck = true; for (Iterator assignmentIterator = orderingPolicy.getAssignmentIterator(); assignmentIterator.hasNext(); ) { @@ -1035,24 +1037,50 @@ public CSAssignment assignContainers(Resource clusterResource, node.getNodeID(), SystemClock.getInstance().getTime(), application); // Check queue max-capacity limit - if (!super.canAssignToThisQueue(clusterResource, ps.getPartition(), - currentResourceLimits, application.getCurrentReservation(), - schedulingMode)) { - ActivitiesLogger.APP.recordRejectedAppActivityFromLeafQueue( - activitiesManager, node, application, application.getPriority(), - ActivityDiagnosticConstant.QUEUE_MAX_CAPACITY_LIMIT); - ActivitiesLogger.QUEUE.recordQueueActivity(activitiesManager, node, - getParent().getQueueName(), getQueueName(), ActivityState.SKIPPED, - ActivityDiagnosticConstant.EMPTY); - return CSAssignment.NULL_ASSIGNMENT; + Resource appReserved = application.getCurrentReservation(); + if (needAssignToQueueCheck) { + if (!super.canAssignToThisQueue(clusterResource, node.getPartition(), + currentResourceLimits, appReserved, schedulingMode)) { + ActivitiesLogger.APP.recordRejectedAppActivityFromLeafQueue( + activitiesManager, node, application, application.getPriority(), + ActivityDiagnosticConstant.QUEUE_MAX_CAPACITY_LIMIT); + ActivitiesLogger.QUEUE.recordQueueActivity(activitiesManager, node, + getParent().getQueueName(), getQueueName(), ActivityState.SKIPPED, + ActivityDiagnosticConstant.EMPTY); + return CSAssignment.NULL_ASSIGNMENT; + } + // If there was no reservation and canAssignToThisQueue returned + // true, there is no reason to check further. 
+ if (!this.reservationsContinueLooking + || appReserved.equals(Resources.none())) { + needAssignToQueueCheck = false; + } } + CachedUserLimit cul = userLimits.get(application.getUser()); + Resource cachedUserLimit = null; + if (cul != null) { + cachedUserLimit = cul.userLimit; + } Resource userLimit = computeUserLimitAndSetHeadroom(application, - clusterResource, ps.getPartition(), schedulingMode); - + clusterResource, ps.getPartition(), schedulingMode, cachedUserLimit); + if (cul == null) { + cul = new CachedUserLimit(userLimit); + userLimits.put(application.getUser(), cul); + } // Check user limit - if (!canAssignToUser(clusterResource, application.getUser(), userLimit, - application, ps.getPartition(), currentResourceLimits)) { + boolean userAssignable = true; + if (!cul.canAssign && Resources.fitsIn(appReserved, cul.reservation)) { + userAssignable = false; + } else { + userAssignable = canAssignToUser(clusterResource, application.getUser(), + userLimit, application, node.getPartition(), currentResourceLimits); + if (!userAssignable && Resources.fitsIn(cul.reservation, appReserved)) { + cul.canAssign = false; + cul.reservation = appReserved; + } + } + if (!userAssignable) { application.updateAMContainerDiagnostics(AMState.ACTIVATED, "User capacity has reached its maximum limit."); ActivitiesLogger.APP.recordRejectedAppActivityFromLeafQueue( @@ -1127,7 +1155,7 @@ public boolean accept(Resource cluster, // check user-limit Resource userLimit = computeUserLimitAndSetHeadroom(app, cluster, p, - allocation.getSchedulingMode()); + allocation.getSchedulingMode(), null); // Deduct resources that we can release Resource usedResource = Resources.clone(getUser(username).getUsed(p)); @@ -1332,19 +1360,20 @@ private void setQueueResourceLimitsInfo( @Lock({LeafQueue.class}) Resource computeUserLimitAndSetHeadroom(FiCaSchedulerApp application, Resource clusterResource, String nodePartition, - SchedulingMode schedulingMode) { + SchedulingMode schedulingMode, Resource userLimit) { String user = application.getUser(); User queueUser = getUser(user); // Compute user limit respect requested labels, // TODO, need consider headroom respect labels also - Resource userLimit = - getResourceLimitForActiveUsers(application.getUser(), clusterResource, - nodePartition, schedulingMode); - + if (userLimit == null) { + userLimit = getResourceLimitForActiveUsers(application.getUser(), + clusterResource, nodePartition, schedulingMode); + } setQueueResourceLimitsInfo(clusterResource); Resource headroom = + metrics.getUserMetrics(user) == null ? 
Resources.none() : getHeadroom(queueUser, cachedResourceLimitsForHeadroom.getLimit(), clusterResource, userLimit, nodePartition); @@ -1352,7 +1381,7 @@ Resource computeUserLimitAndSetHeadroom(FiCaSchedulerApp application, LOG.debug("Headroom calculation for user " + user + ": " + " userLimit=" + userLimit + " queueMaxAvailRes=" + cachedResourceLimitsForHeadroom.getLimit() + " consumed=" - + queueUser.getUsed() + " headroom=" + headroom + " partition=" + + queueUser.getUsed() + " partition=" + nodePartition); } @@ -1713,7 +1742,7 @@ public void updateClusterResource(Resource clusterResource, .getSchedulableEntities()) { computeUserLimitAndSetHeadroom(application, clusterResource, RMNodeLabelsManager.NO_LABEL, - SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY); + SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY, null); } } finally { writeLock.unlock(); @@ -2052,4 +2081,14 @@ public void stopQueue() { public Set getAllUsers() { return this.getUsersManager().getUsers().keySet(); } + + static class CachedUserLimit { + final Resource userLimit; + boolean canAssign = true; + Resource reservation = Resources.none(); + + CachedUserLimit(Resource userLimit) { + this.userLimit = userLimit; + } + } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/dao/SchedulerInfo.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/dao/SchedulerInfo.java index 887b854..81491b1 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/dao/SchedulerInfo.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/dao/SchedulerInfo.java @@ -18,6 +18,7 @@ package org.apache.hadoop.yarn.server.resourcemanager.webapp.dao; +import java.util.Arrays; import java.util.EnumSet; import javax.xml.bind.annotation.XmlRootElement; @@ -73,7 +74,7 @@ public ResourceInfo getMaxAllocation() { } public String getSchedulerResourceTypes() { - return minAllocResource.getResource().getResources().keySet().toString(); + return Arrays.toString(minAllocResource.getResource().getResources()); } public int getMaxClusterLevelAppPriority() { diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacityScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacityScheduler.java index 3c6e6df..143d5b0 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacityScheduler.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacityScheduler.java @@ -33,9 +33,11 @@ import java.util.Arrays; import java.util.Collection; import java.util.Collections; +import java.util.Enumeration; import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.PriorityQueue; import java.util.Set; import 
java.util.concurrent.BrokenBarrierException; import java.util.concurrent.CyclicBarrier; @@ -50,6 +52,7 @@ import org.apache.hadoop.security.token.Token; import org.apache.hadoop.security.token.TokenIdentifier; import org.apache.hadoop.test.GenericTestUtils; +import org.apache.hadoop.util.Time; import org.apache.hadoop.yarn.LocalConfigurationProvider; import org.apache.hadoop.yarn.api.ApplicationMasterProtocol; import org.apache.hadoop.yarn.api.protocolrecords.AllocateRequest; @@ -90,7 +93,6 @@ import org.apache.hadoop.yarn.server.api.protocolrecords.UpdateNodeResourceRequest; import org.apache.hadoop.yarn.server.resourcemanager.AdminService; import org.apache.hadoop.yarn.server.resourcemanager.Application; -import org.apache.hadoop.yarn.server.resourcemanager.ApplicationMasterService; import org.apache.hadoop.yarn.server.resourcemanager.MockAM; import org.apache.hadoop.yarn.server.resourcemanager.MockNM; import org.apache.hadoop.yarn.server.resourcemanager.MockNodes; @@ -156,8 +158,12 @@ import org.apache.hadoop.yarn.util.resource.DefaultResourceCalculator; import org.apache.hadoop.yarn.util.resource.DominantResourceCalculator; import org.apache.hadoop.yarn.util.resource.Resources; +import org.apache.log4j.Level; +import org.apache.log4j.LogManager; +import org.apache.log4j.Logger; import org.junit.After; import org.junit.Assert; +import org.junit.Assume; import org.junit.Before; import org.junit.Test; import org.mockito.Mockito; @@ -3492,6 +3498,7 @@ public void testApplicationHeadRoom() throws Exception { rm.stop(); } + @Test public void testHeadRoomCalculationWithDRC() throws Exception { // test with total cluster resource of 20GB memory and 20 vcores. @@ -4074,6 +4081,143 @@ public void testCSReservationWithRootUnblocked() throws Exception { rm.stop(); } + @Test (timeout = 300000) + public void testUserLimitThroughput() throws Exception { + // Since this is more of a performance unit test, only run if + // RunUserLimitThroughput is set (-DRunUserLimitThroughput=true) + Assume.assumeTrue(Boolean.valueOf( + System.getProperty("RunUserLimitThroughput"))); + + CapacitySchedulerConfiguration csconf = + new CapacitySchedulerConfiguration(); + csconf.setMaximumApplicationMasterResourcePerQueuePercent("root", 100.0f); + csconf.setMaximumAMResourcePercentPerPartition("root", "", 100.0f); + csconf.setMaximumApplicationMasterResourcePerQueuePercent("root.default", + 100.0f); + csconf.setMaximumAMResourcePercentPerPartition("root.default", "", 100.0f); + csconf.setResourceComparator(DominantResourceCalculator.class); + + YarnConfiguration conf = new YarnConfiguration(csconf); + conf.setClass(YarnConfiguration.RM_SCHEDULER, CapacityScheduler.class, + ResourceScheduler.class); + + MockRM rm = new MockRM(conf); + rm.start(); + + CapacityScheduler cs = (CapacityScheduler) rm.getResourceScheduler(); + LeafQueue qb = (LeafQueue)cs.getQueue("default"); + + // For now make user limit large so we can activate all applications + qb.setUserLimitFactor((float)100.0); + qb.setupConfigurableCapacities(); + + SchedulerEvent addAppEvent; + SchedulerEvent addAttemptEvent; + Container container = mock(Container.class); + ApplicationSubmissionContext submissionContext = + mock(ApplicationSubmissionContext.class); + + final int appCount = 100; + ApplicationId[] appids = new ApplicationId[appCount]; + RMAppAttemptImpl[] attempts = new RMAppAttemptImpl[appCount]; + ApplicationAttemptId[] appAttemptIds = new ApplicationAttemptId[appCount]; + RMAppImpl[] apps = new RMAppImpl[appCount]; + RMAppAttemptMetrics[] 
attemptMetrics = new RMAppAttemptMetrics[appCount]; + for (int i=0; i loggers=LogManager.getCurrentLoggers(); + loggers.hasMoreElements(); ) { + Logger logger = (Logger) loggers.nextElement(); + logger.setLevel(Level.WARN); + } + final int topn = 20; + final int iterations = 2000000; + final int printInterval = 20000; + final float numerator = 1000.0f * printInterval; + PriorityQueue queue = new PriorityQueue<>(topn, + Collections.reverseOrder()); + + long n = Time.monotonicNow(); + long timespent = 0; + for (int i = 0; i < iterations; i+=2) { + if (i > 0 && i % printInterval == 0){ + long ts = (Time.monotonicNow() - n); + if (queue.size() < topn) { + queue.offer(ts); + } else { + Long last = queue.peek(); + if (last > ts) { + queue.poll(); + queue.offer(ts); + } + } + System.out.println(i + " " + (numerator / ts)); + n= Time.monotonicNow(); + } + cs.handle(new NodeUpdateSchedulerEvent(node)); + cs.handle(new NodeUpdateSchedulerEvent(node2)); + } + timespent=0; + int entries = queue.size(); + while(queue.size() > 0){ + long l = queue.poll(); + timespent += l; + } + System.out.println("Avg of fastest " + entries + ": " + + numerator / (timespent / entries)); + rm.stop(); + } + @Test public void testCSQueueBlocked() throws Exception { CapacitySchedulerConfiguration conf = new CapacitySchedulerConfiguration(); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestLeafQueue.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestLeafQueue.java index 4417132..2864d7f 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestLeafQueue.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestLeafQueue.java @@ -1146,7 +1146,7 @@ public void testComputeUserLimitAndSetHeadroom() throws IOException { new ResourceLimits(clusterResource), SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY), qb, nodes, apps); qb.computeUserLimitAndSetHeadroom(app_0, clusterResource, - "", SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY); + "", SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY, null); //maxqueue 16G, userlimit 13G, - 4G used = 9G assertEquals(9*GB,app_0.getHeadroom().getMemorySize()); @@ -1169,7 +1169,7 @@ public void testComputeUserLimitAndSetHeadroom() throws IOException { new ResourceLimits(clusterResource), SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY), qb, nodes, apps); qb.computeUserLimitAndSetHeadroom(app_0, clusterResource, - "", SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY); + "", SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY, null); assertEquals(8*GB, qb.getUsedResources().getMemorySize()); assertEquals(4*GB, app_0.getCurrentConsumption().getMemorySize()); @@ -1219,7 +1219,7 @@ public void testComputeUserLimitAndSetHeadroom() throws IOException { new ResourceLimits(clusterResource), SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY), qb, nodes, apps); qb.computeUserLimitAndSetHeadroom(app_3, clusterResource, - "", SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY); + "", SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY, null); assertEquals(4*GB, qb.getUsedResources().getMemorySize()); //maxqueue 16G, userlimit 7G, used (by each user) 2G, 
headroom 5G (both)
 assertEquals(5*GB, app_3.getHeadroom().getMemorySize());
@@ -1240,9 +1240,9 @@ public void testComputeUserLimitAndSetHeadroom() throws IOException {
 new ResourceLimits(clusterResource),
 SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY), qb, nodes, apps);
 qb.computeUserLimitAndSetHeadroom(app_4, clusterResource,
- "", SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY);
+ "", SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY, null);
 qb.computeUserLimitAndSetHeadroom(app_3, clusterResource,
- "", SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY);
+ "", SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY, null);
 //app3 is user1, active from last test case
-- 
2.10.1 (Apple Git-78)
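Note on the Resources hunks above: fitsIn, componentwiseMin, componentwiseMax and multiplyAndAddTo now guard the unit conversion with an equality check, so the common same-unit case never enters UnitsConversionUtil.convert(). A minimal sketch of that fast path, using only the getUnits()/getValue()/convert() calls that already appear in this patch; the class and method names are illustrative, not part of the change:

    import org.apache.hadoop.yarn.api.records.ResourceInformation;
    import org.apache.hadoop.yarn.util.UnitsConversionUtil;

    final class UnitFastPathSketch {
      private UnitFastPathSketch() {
      }

      // Returns rhs's value expressed in lhs's units, converting only when
      // the two entries actually disagree on units.
      static long valueInLhsUnits(ResourceInformation lhs,
          ResourceInformation rhs) {
        return lhs.getUnits().equals(rhs.getUnits())
            ? rhs.getValue()
            : UnitsConversionUtil.convert(rhs.getUnits(), lhs.getUnits(),
                rhs.getValue());
      }
    }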
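The callers rewritten in RMAppAttemptMetrics, SchedulerApplicationAttempt and SchedulerInfo all take the same shape once Resource#getResources() returns a ResourceInformation[] rather than a Map: iterate the array and key any bookkeeping off ResourceInformation#getName(). A small compilable sketch of that caller pattern; the accumulate() helper, its map and the usedMillis parameter are illustrative only:

    import java.util.HashMap;
    import java.util.Map;

    import org.apache.hadoop.yarn.api.records.Resource;
    import org.apache.hadoop.yarn.api.records.ResourceInformation;

    final class ResourceArrayIterationSketch {
      private ResourceArrayIterationSketch() {
      }

      // Accumulates per-resource usage keyed by resource name, mirroring the
      // loops this patch rewrites in RMAppAttemptMetrics#updateUsageMap and
      // SchedulerApplicationAttempt#getRunningAggregateAppResourceUsage.
      static Map<String, Long> accumulate(Resource allocated, long usedMillis) {
        Map<String, Long> secondsPerResource = new HashMap<>();
        for (ResourceInformation entry : allocated.getResources()) {
          long previous = secondsPerResource.getOrDefault(entry.getName(), 0L);
          secondsPerResource.put(entry.getName(),
              previous + entry.getValue() * usedMillis / 1000L);
        }
        return secondsPerResource;
      }
    }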
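The LeafQueue#assignContainers change computes the user limit at most once per user per scheduling pass and stores it in the new CachedUserLimit holder, whose canAssign/reservation pair lets later applications of the same user skip canAssignToUser() when an equal-or-larger reservation already failed. The caching idea, reduced to a generic per-pass memo under illustrative names (the patch itself uses a plain HashMap of user name to CachedUserLimit created at the top of assignContainers):

    import java.util.HashMap;
    import java.util.Map;
    import java.util.function.Function;

    // One instance is created per scheduling pass, so cached values never
    // outlive the pass that computed them -- the same lifetime the patch
    // gives its userLimits map inside assignContainers().
    final class PerPassMemo<K, V> {
      private final Map<K, V> computed = new HashMap<>();

      V get(K key, Function<K, V> compute) {
        return computed.computeIfAbsent(key, compute);
      }
    }

In the patch the key is the user name and the value is the CachedUserLimit built from computeUserLimitAndSetHeadroom(), which now accepts a previously computed limit and only recomputes when passed null.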
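testUserLimitThroughput keeps only the fastest printInterval timings: a PriorityQueue built with Collections.reverseOrder() acts as a max-heap of size topn, so the slowest retained sample sits at the head and is evicted whenever a faster interval arrives, and the average of what remains is reported at the end. That bookkeeping in isolation, with illustrative class and method names:

    import java.util.Collections;
    import java.util.PriorityQueue;

    final class FastestIntervalsSketch {
      private final int capacity;
      private final PriorityQueue<Long> heap;

      FastestIntervalsSketch(int capacity) {
        this.capacity = capacity;
        // reverseOrder() turns the min-heap into a max-heap, so peek()
        // always returns the slowest sample currently retained.
        this.heap = new PriorityQueue<>(capacity, Collections.reverseOrder());
      }

      void offer(long elapsedMillis) {
        if (heap.size() < capacity) {
          heap.offer(elapsedMillis);
        } else if (heap.peek() > elapsedMillis) {
          heap.poll();
          heap.offer(elapsedMillis);
        }
      }

      double average() {
        return heap.stream().mapToLong(Long::longValue).average().orElse(0);
      }
    }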