diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AbstractYarnScheduler.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AbstractYarnScheduler.java index 5d4d7e2..f756cee 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AbstractYarnScheduler.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AbstractYarnScheduler.java @@ -22,6 +22,8 @@ import java.util.*; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.locks.ReentrantReadWriteLock; +import java.util.concurrent.locks.ReentrantReadWriteLock.ReadLock; +import java.util.concurrent.locks.ReentrantReadWriteLock.WriteLock; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; @@ -82,8 +84,9 @@ private Resource configuredMaximumAllocation; private int maxNodeMemory = -1; private int maxNodeVCores = -1; - private ReentrantReadWriteLock maximumAllocationLock = - new ReentrantReadWriteLock(); + private final ReadLock maxAllocReadLock; + private final WriteLock maxAllocWriteLock; + private boolean useConfiguredMaximumAllocationOnly = true; private long configuredMaximumAllocationWaitTime; @@ -103,6 +106,9 @@ */ public AbstractYarnScheduler(String name) { super(name); + ReentrantReadWriteLock lock = new ReentrantReadWriteLock(); + this.maxAllocReadLock = lock.readLock(); + this.maxAllocWriteLock = lock.writeLock(); } @Override @@ -157,8 +163,7 @@ public Resource getMinimumResourceCapability() { @Override public Resource getMaximumResourceCapability() { Resource maxResource; - ReentrantReadWriteLock.ReadLock readLock = maximumAllocationLock.readLock(); - readLock.lock(); + maxAllocReadLock.lock(); try { if (useConfiguredMaximumAllocationOnly) { if (System.currentTimeMillis() - ResourceManager.getClusterTimeStamp() @@ -170,22 +175,20 @@ public Resource getMaximumResourceCapability() { maxResource = Resources.clone(maximumAllocation); } } finally { - readLock.unlock(); + maxAllocReadLock.unlock(); } return maxResource; } protected void initMaximumResourceCapability(Resource maximumAllocation) { - ReentrantReadWriteLock.WriteLock writeLock = - maximumAllocationLock.writeLock(); - writeLock.lock(); + maxAllocWriteLock.lock(); try { if (this.configuredMaximumAllocation == null) { this.configuredMaximumAllocation = Resources.clone(maximumAllocation); this.maximumAllocation = Resources.clone(maximumAllocation); } } finally { - writeLock.unlock(); + maxAllocWriteLock.unlock(); } } @@ -541,7 +544,7 @@ public synchronized void updateNodeResource(RMNode nm, Resource oldResource = node.getTotalResource(); if(!oldResource.equals(newResource)) { // Log resource change - LOG.info("Update resource on node: " + node.getNodeName() + LOG.info("Update resource on node: " + node.getNodeName() + " from: " + oldResource + ", to: " + newResource); @@ -551,6 +554,48 @@ public synchronized void updateNodeResource(RMNode nm, // update resource to clusterResource Resources.subtractFrom(clusterResource, oldResource); Resources.addTo(clusterResource, newResource); + + // update maximumAllocation after node resource was changed + maxAllocWriteLock.lock(); + try { + int oldNodeMemory = oldResource.getMemory(); + int newNodeMemory = newResource.getMemory(); + if (oldNodeMemory < newNodeMemory) { + // change maxNodeMemory if new memory is bigger than old memory and + // new memory is bigger than old maxNodeMemory + if (newNodeMemory > maxNodeMemory) { + maxNodeMemory = newNodeMemory; + maximumAllocation.setMemory(Math.min( + configuredMaximumAllocation.getMemory(), maxNodeMemory)); + } + } else if ((oldNodeMemory == maxNodeMemory) && + (oldNodeMemory > newNodeMemory)) { + // set maxNodeMemory invalid if old memory is equal to maxNodeMemory + // and old memory is bigger than new memory + maxNodeMemory = -1; + } + + int oldNodeVCores = oldResource.getVirtualCores(); + int newNodeVCores = newResource.getVirtualCores(); + if (oldNodeVCores < newNodeVCores) { + // change maxNodeVCores if new VCores is bigger than old VCores and + // new VCores is bigger than old maxNodeVCores + if (newNodeVCores > maxNodeVCores) { + maxNodeVCores = newNodeVCores; + maximumAllocation.setVirtualCores(Math.min( + configuredMaximumAllocation.getVirtualCores(), maxNodeVCores)); + } + } else if ((oldNodeVCores == maxNodeVCores) && + (oldNodeVCores > newNodeVCores)) { + // set maxNodeVCores invalid if old VCores is equal to maxNodeVCores + // and old VCores is bigger than new VCores + maxNodeVCores = -1; + } + + updateMaximumAllocation(); + } finally { + maxAllocWriteLock.unlock(); + } } else { // Log resource change LOG.warn("Update resource on node: " + node.getNodeName() @@ -571,9 +616,7 @@ public synchronized void updateNodeResource(RMNode nm, } protected void updateMaximumAllocation(SchedulerNode node, boolean add) { - ReentrantReadWriteLock.WriteLock writeLock = - maximumAllocationLock.writeLock(); - writeLock.lock(); + maxAllocWriteLock.lock(); try { if (add) { // added node int nodeMemory = node.getTotalResource().getMemory(); @@ -597,35 +640,39 @@ protected void updateMaximumAllocation(SchedulerNode node, boolean add) { } // We only have to iterate through the nodes if the current max memory // or vcores was equal to the removed node's - if (maxNodeMemory == -1 || maxNodeVCores == -1) { - for (Map.Entry nodeEntry : nodes.entrySet()) { - int nodeMemory = - nodeEntry.getValue().getTotalResource().getMemory(); - if (nodeMemory > maxNodeMemory) { - maxNodeMemory = nodeMemory; - } - int nodeVCores = - nodeEntry.getValue().getTotalResource().getVirtualCores(); - if (nodeVCores > maxNodeVCores) { - maxNodeVCores = nodeVCores; - } - } - if (maxNodeMemory == -1) { // no nodes - maximumAllocation.setMemory(configuredMaximumAllocation.getMemory()); - } else { - maximumAllocation.setMemory( - Math.min(configuredMaximumAllocation.getMemory(), maxNodeMemory)); - } - if (maxNodeVCores == -1) { // no nodes - maximumAllocation.setVirtualCores(configuredMaximumAllocation.getVirtualCores()); - } else { - maximumAllocation.setVirtualCores( - Math.min(configuredMaximumAllocation.getVirtualCores(), maxNodeVCores)); - } - } + updateMaximumAllocation(); } } finally { - writeLock.unlock(); + maxAllocWriteLock.unlock(); + } + } + + private void updateMaximumAllocation() { + if (maxNodeMemory == -1 || maxNodeVCores == -1) { + for (Map.Entry nodeEntry : nodes.entrySet()) { + int nodeMemory = + nodeEntry.getValue().getTotalResource().getMemory(); + if (nodeMemory > maxNodeMemory) { + maxNodeMemory = nodeMemory; + } + int nodeVCores = + nodeEntry.getValue().getTotalResource().getVirtualCores(); + if (nodeVCores > maxNodeVCores) { + maxNodeVCores = nodeVCores; + } + } + if (maxNodeMemory == -1) { // no nodes + maximumAllocation.setMemory(configuredMaximumAllocation.getMemory()); + } else { + maximumAllocation.setMemory( + Math.min(configuredMaximumAllocation.getMemory(), maxNodeMemory)); + } + if (maxNodeVCores == -1) { // no nodes + maximumAllocation.setVirtualCores(configuredMaximumAllocation.getVirtualCores()); + } else { + maximumAllocation.setVirtualCores( + Math.min(configuredMaximumAllocation.getVirtualCores(), maxNodeVCores)); + } } } } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/TestAbstractYarnScheduler.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/TestAbstractYarnScheduler.java index 27b20d4..a0ad740 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/TestAbstractYarnScheduler.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/TestAbstractYarnScheduler.java @@ -20,6 +20,7 @@ import org.apache.hadoop.yarn.api.records.NodeId; import org.apache.hadoop.yarn.api.records.Resource; +import org.apache.hadoop.yarn.api.records.ResourceOption; import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.server.resourcemanager.MockNodes; import org.apache.hadoop.yarn.server.resourcemanager.MockRM; @@ -279,6 +280,54 @@ public void testUpdateMaxAllocationUsesTotal() throws IOException { } } + @Test + public void testMaxAllocationAfterUpdateNodeResource() throws IOException { + final int configuredMaxVCores = 20; + final int configuredMaxMemory = 10 * 1024; + Resource configuredMaximumResource = Resource.newInstance + (configuredMaxMemory, configuredMaxVCores); + + configureScheduler(); + YarnConfiguration conf = getConf(); + conf.setInt(YarnConfiguration.RM_SCHEDULER_MAXIMUM_ALLOCATION_VCORES, + configuredMaxVCores); + conf.setInt(YarnConfiguration.RM_SCHEDULER_MAXIMUM_ALLOCATION_MB, + configuredMaxMemory); + conf.setLong( + YarnConfiguration.RM_WORK_PRESERVING_RECOVERY_SCHEDULING_WAIT_MS, + 0); + + MockRM rm = new MockRM(conf); + try { + rm.start(); + AbstractYarnScheduler scheduler = (AbstractYarnScheduler) rm + .getResourceScheduler(); + verifyMaximumResourceCapability(configuredMaximumResource, scheduler); + + Resource resource1 = Resource.newInstance(1024, 5); + Resource resource2 = Resource.newInstance(2048, 10); + RMNode node1 = MockNodes.newNodeInfo( + 0, resource1, 1, "127.0.0.2"); + scheduler.handle(new NodeAddedSchedulerEvent(node1)); + RMNode node2 = MockNodes.newNodeInfo( + 0, Resources.createResource(512, 1), 2, "127.0.0.3"); + scheduler.handle(new NodeAddedSchedulerEvent(node2)); + verifyMaximumResourceCapability(resource1, scheduler); + + // increase node resource + scheduler.updateNodeResource(node1, ResourceOption.newInstance( + resource2, 0)); + verifyMaximumResourceCapability(resource2, scheduler); + + // decrease node resource + scheduler.updateNodeResource(node1, ResourceOption.newInstance( + resource1, 0)); + verifyMaximumResourceCapability(resource1, scheduler); + } finally { + rm.stop(); + } + } + private void verifyMaximumResourceCapability( Resource expectedMaximumResource, AbstractYarnScheduler scheduler) {