diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/ClusterNodeTracker.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/ClusterNodeTracker.java index 9e54ac6298a..94312102a49 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/ClusterNodeTracker.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/ClusterNodeTracker.java @@ -29,6 +29,7 @@ import org.apache.hadoop.yarn.util.resource.Resources; import java.util.ArrayList; +import java.util.Arrays; import java.util.Collections; import java.util.Comparator; import java.util.HashMap; @@ -37,6 +38,8 @@ import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReadWriteLock; import java.util.concurrent.locks.ReentrantReadWriteLock; +import org.apache.hadoop.yarn.api.records.ResourceInformation; +import org.apache.hadoop.yarn.util.resource.ResourceUtils; /** * Helper library that: @@ -60,11 +63,16 @@ Resources.clone(Resources.none()); // Max allocation - private long maxNodeMemory = -1; - private int maxNodeVCores = -1; + private final long[] maxResources; private Resource configuredMaxAllocation; private boolean forceConfiguredMaxAllocation = true; private long configuredMaxAllocationWaitTime; + private boolean reportedMaxAllocation = false; + + public ClusterNodeTracker() { + maxResources = new long[ResourceUtils.getNumberOfKnownResourceTypes()]; + Arrays.fill(maxResources, -1); + } public void addNode(N node) { writeLock.lock(); @@ -208,17 +216,19 @@ public Resource getMaxAllowedAllocation() { forceConfiguredMaxAllocation = false; } - if (forceConfiguredMaxAllocation - || maxNodeMemory == -1 || maxNodeVCores == -1) { + if (forceConfiguredMaxAllocation || !reportedMaxAllocation) { return configuredMaxAllocation; } Resource ret = Resources.clone(configuredMaxAllocation); - if (ret.getMemorySize() > maxNodeMemory) { - ret.setMemorySize(maxNodeMemory); - } - if (ret.getVirtualCores() > maxNodeVCores) { - ret.setVirtualCores(maxNodeVCores); + + for (int i = 0; i < maxResources.length; i++) { + ResourceInformation info = ret.getResourceInformation(i); + long value = info.getValue(); + + if (value > maxResources[i]) { + info.setValue(maxResources[i]); + } } return ret; @@ -229,31 +239,41 @@ public Resource getMaxAllowedAllocation() { private void updateMaxResources(SchedulerNode node, boolean add) { Resource totalResource = node.getTotalResource(); + writeLock.lock(); + try { if (add) { // added node - long nodeMemory = totalResource.getMemorySize(); - if (nodeMemory > maxNodeMemory) { - maxNodeMemory = nodeMemory; - } - int nodeVCores = totalResource.getVirtualCores(); - if (nodeVCores > maxNodeVCores) { - maxNodeVCores = nodeVCores; + // If we add a node, we must have a max allocation for all resource + // types + reportedMaxAllocation = true; + + for (int i = 0; i < maxResources.length; i++) { + long value = totalResource.getResourceInformation(i).getValue(); + + if (value > maxResources[i]) { + maxResources[i] = value; + } } } else { // removed node - if (maxNodeMemory == totalResource.getMemorySize()) { - maxNodeMemory = -1; - } - if (maxNodeVCores == totalResource.getVirtualCores()) { - maxNodeVCores = -1; + boolean recalculate = false; + + for (int i = 0; i < maxResources.length; i++) { + long value = totalResource.getResourceInformation(i).getValue(); + + if (value == maxResources[i]) { + // No need to set reportedMaxAllocation to false here because we + // will recalculate before we release the lock. + maxResources[i] = -1; + recalculate = true; + } } + // We only have to iterate through the nodes if the current max memory // or vcores was equal to the removed node's - if (maxNodeMemory == -1 || maxNodeVCores == -1) { + if (recalculate) { // Treat it like an empty cluster and add nodes - for (N n : nodes.values()) { - updateMaxResources(n, true); - } + nodes.values().forEach(n -> updateMaxResources(n, true)); } } } finally {