diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/QueueMetrics.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/QueueMetrics.java index 0a01c60e538..d126f0980b9 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/QueueMetrics.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/QueueMetrics.java @@ -832,4 +832,8 @@ public long getAggegatedReleasedContainers() { public long getAggregatePreemptedContainers() { return aggregateContainersPreempted.value(); } + + public QueueMetricsForCustomResources getQueueMetricsForCustomResources() { + return queueMetricsForCustomResources; + } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/QueueMetricsForCustomResources.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/QueueMetricsForCustomResources.java index e8c88979209..34708580274 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/QueueMetricsForCustomResources.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/QueueMetricsForCustomResources.java @@ -101,4 +101,8 @@ public void increaseAggregatedPreemptedSeconds(Resource res, long seconds) { 
QueueMetricsCustomResource getAggregatePreemptedSeconds() { return aggregatePreemptedSeconds; } + + public QueueMetricsCustomResource getAvailable() { + return available; + } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSLeafQueue.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSLeafQueue.java index 3deddee5d6e..ba9fcb3bbd8 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSLeafQueue.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSLeafQueue.java @@ -23,6 +23,7 @@ import java.util.Collections; import java.util.HashSet; import java.util.List; +import java.util.Map; import java.util.Set; import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReadWriteLock; @@ -42,6 +43,8 @@ import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ActiveUsersManager; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueMetricsCustomResource; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueMetricsForCustomResources; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerAppUtils; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplicationAttempt; import org.apache.hadoop.yarn.util.resource.Resources; @@ -505,16 +508,35 @@ public ActiveUsersManager getAbstractUsersManager() { */ private Resource computeMaxAMResource() { Resource maxResource = 
Resources.clone(getFairShare()); + if (maxResource.getMemorySize() == 0) { - maxResource.setMemorySize( - Math.min(scheduler.getRootQueueMetrics().getAvailableMB(), - getMaxShare().getMemorySize())); + long value = Math.min(scheduler.getRootQueueMetrics().getAvailableMB(), + getMaxShare().getMemorySize()); + maxResource.setMemorySize(value); } if (maxResource.getVirtualCores() == 0) { - maxResource.setVirtualCores(Math.min( + int value = Math.min( scheduler.getRootQueueMetrics().getAvailableVirtualCores(), - getMaxShare().getVirtualCores())); + getMaxShare().getVirtualCores()); + maxResource.setVirtualCores(value); + } + + QueueMetricsForCustomResources metricsForCustomResources = + scheduler.getRootQueueMetrics().getQueueMetricsForCustomResources(); + + if (metricsForCustomResources != null) { + QueueMetricsCustomResource availableResources = + metricsForCustomResources.getAvailable(); + + for (Map.Entry<String, Long> availableEntry : availableResources + .getValues().entrySet()) { + String resourceName = availableEntry.getKey(); + Long availableValue = availableEntry.getValue(); + long value = Math.min(availableValue, + getMaxShare().getResourceValue(resourceName)); + maxResource.setResourceValue(resourceName, value); + } } // Round up to allow AM to run when there is only one vcore on the cluster @@ -525,8 +547,8 @@ private Resource computeMaxAMResource() { * Check whether this queue can run the Application Master under the * maxAMShare limit. * - * @param amResource resources required to run the AM - * @return true if this queue can run + * @param amResource Resources required to run the AM. + * @return true if this queue can run the AM. 
*/ public boolean canRunAppAM(Resource amResource) { if (Math.abs(maxAMShare - -1.0f) < 0.0001) { diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSQueueMetrics.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSQueueMetrics.java index 827c22129b5..04bf15ae566 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSQueueMetrics.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSQueueMetrics.java @@ -310,4 +310,8 @@ static FSQueueMetrics forQueue(MetricsSystem ms, String queueName, return (FSQueueMetrics)metrics; } + + FSQueueMetricsForCustomResources getCustomResources() { + return customResources; + } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java index 282367edbaa..54492572133 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java @@ -1203,9 +1203,9 @@ public ResourceCalculator getResourceCalculator() { * metrics will be consistent. 
*/ private void updateRootQueueMetrics() { - rootMetrics.setAvailableResourcesToQueue( - Resources.subtract( - getClusterResource(), rootMetrics.getAllocatedResources())); + Resource res = Resources.subtract(getClusterResource(), + rootMetrics.getAllocatedResources()); + rootMetrics.setAvailableResourcesToQueue(res); } /** diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFSLeafQueue.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFSLeafQueue.java index 4a738ca07fb..528416137ce 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFSLeafQueue.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFSLeafQueue.java @@ -18,22 +18,7 @@ package org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair; -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertTrue; -import static org.mockito.Mockito.mock; - -import java.io.File; -import java.io.FileWriter; -import java.io.IOException; -import java.io.PrintWriter; -import java.util.ArrayList; -import java.util.Collection; -import java.util.Collections; -import java.util.List; -import java.util.concurrent.CountDownLatch; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.TimeUnit; - +import com.google.common.collect.ImmutableMap; import org.apache.hadoop.util.concurrent.HadoopExecutors; import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; import org.apache.hadoop.yarn.api.records.Resource; @@ -42,19 +27,42 @@ import org.apache.hadoop.yarn.server.resourcemanager.MockRM; 
import org.apache.hadoop.yarn.server.resourcemanager.RMContext; import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueMetricsCustomResource; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeAddedSchedulerEvent; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeUpdateSchedulerEvent; +import org.apache.hadoop.yarn.util.resource.ResourceUtils; import org.apache.hadoop.yarn.util.resource.Resources; import org.junit.After; import org.junit.Before; import org.junit.Test; import org.mockito.Mockito; +import java.io.File; +import java.io.FileWriter; +import java.io.IOException; +import java.io.PrintWriter; +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.List; +import java.util.Map; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.TimeUnit; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertTrue; +import static org.mockito.Mockito.mock; + public class TestFSLeafQueue extends FairSchedulerTestBase { private final static String ALLOC_FILE = new File(TEST_DIR, TestFSLeafQueue.class.getName() + ".xml").getAbsolutePath(); private Resource maxResource = Resources.createResource(1024 * 8); + private static float MAX_AM_SHARE = 0.5f; + private static final String CUSTOM_RESOURCE = "test1"; @Before public void setup() throws IOException { @@ -105,6 +113,8 @@ public void test() throws Exception { PrintWriter out = new PrintWriter(new FileWriter(ALLOC_FILE)); out.println(""); out.println(""); + out.println("" + MAX_AM_SHARE + + ""); out.println(""); out.println(""); out.println(""); @@ -221,4 +231,128 @@ public void run() { 
assertTrue("Test failed with exception(s)" + exceptions, exceptions.isEmpty()); } + + @Test + public void testCanRunAppAMReturnsTrue() { + conf.set(YarnConfiguration.RESOURCE_TYPES, CUSTOM_RESOURCE); + ResourceUtils.resetResourceTypes(conf); + + resourceManager = new MockRM(conf); + resourceManager.start(); + scheduler = (FairScheduler) resourceManager.getResourceScheduler(); + + Resource maxShare = Resource.newInstance(1024 * 8, 4, + ImmutableMap.of(CUSTOM_RESOURCE, 10L)); + + // Add a node to increase available memory and vcores in scheduler's + // root queue metrics + addNodeToScheduler(Resource.newInstance(4096, 10, + ImmutableMap.of(CUSTOM_RESOURCE, 25L))); + + FSLeafQueue queue = setupQueue(maxShare); + + //Min(availableMemory, maxShareMemory (maxResourceOverridden)) + // --> Min(4096, 8192) = 4096 + //Min(availableVCores, maxShareVCores (maxResourceOverridden)) + // --> Min(10, 4) = 4 + //Min(available test1, maxShare test1 (maxResourceOverridden)) + // --> Min(25, 10) = 10 + //MaxAMResource: (4096 MB memory, 4 vcores, 10 test1) * MAX_AM_SHARE + // --> 2048 MB memory, 2 vcores, 5 test1 + Resource expectedAMShare = Resource.newInstance(2048, 2, + ImmutableMap.of(CUSTOM_RESOURCE, 5L)); + + Resource appAMResource = Resource.newInstance(2048, 2, + ImmutableMap.of(CUSTOM_RESOURCE, 3L)); + + Map<String, Long> customResourceValues = + verifyQueueMetricsForCustomResources(queue); + + boolean result = queue.canRunAppAM(appAMResource); + assertTrue("AM should have been allocated!", result); + + verifyAMShare(queue, expectedAMShare, customResourceValues); + } + + private FSLeafQueue setupQueue(Resource maxShare) { + String queueName = "root.queue1"; + FSLeafQueue schedulable = new FSLeafQueue(queueName, scheduler, null); + schedulable.setMaxShare(new ConfigurableResource(maxShare)); + schedulable.setMaxAMShare(MAX_AM_SHARE); + return schedulable; + } + + @Test + public void testCanRunAppAMReturnsFalse() { + conf.set(YarnConfiguration.RESOURCE_TYPES, CUSTOM_RESOURCE); + 
ResourceUtils.resetResourceTypes(conf); + + resourceManager = new MockRM(conf); + resourceManager.start(); + scheduler = (FairScheduler) resourceManager.getResourceScheduler(); + + Resource maxShare = Resource.newInstance(1024 * 8, 4, + ImmutableMap.of(CUSTOM_RESOURCE, 10L)); + + // Add a node to increase available memory and vcores in scheduler's + // root queue metrics + addNodeToScheduler(Resource.newInstance(4096, 10, + ImmutableMap.of(CUSTOM_RESOURCE, 25L))); + + FSLeafQueue queue = setupQueue(maxShare); + + //Min(availableMemory, maxShareMemory (maxResourceOverridden)) + // --> Min(4096, 8192) = 4096 + //Min(availableVCores, maxShareVCores (maxResourceOverridden)) + // --> Min(10, 4) = 4 + //Min(available test1, maxShare test1 (maxResourceOverridden)) + // --> Min(25, 10) = 10 + //MaxAMResource: (4096 MB memory, 4 vcores, 10 test1) * MAX_AM_SHARE + // --> 2048 MB memory, 2 vcores, 5 test1 + Resource expectedAMShare = Resource.newInstance(2048, 2, + ImmutableMap.of(CUSTOM_RESOURCE, 5L)); + + Resource appAMResource = Resource.newInstance(2048, 2, + ImmutableMap.of(CUSTOM_RESOURCE, 6L)); + + Map<String, Long> customResourceValues = + verifyQueueMetricsForCustomResources(queue); + + boolean result = queue.canRunAppAM(appAMResource); + assertFalse("AM should not have been allocated!", result); + + verifyAMShare(queue, expectedAMShare, customResourceValues); + } + + private void addNodeToScheduler(Resource node1Resource) { + RMNode node1 = MockNodes.newNodeInfo(0, node1Resource, 1, "127.0.0.2"); + scheduler.handle(new NodeAddedSchedulerEvent(node1)); + } + + private void verifyAMShare(FSLeafQueue schedulable, + Resource expectedAMShare, Map<String, Long> customResourceValues) { + Resource actualAMShare = Resource.newInstance( + schedulable.getMetrics().getMaxAMShareMB(), + schedulable.getMetrics().getMaxAMShareVCores(), customResourceValues); + long customResourceValue = + actualAMShare.getResourceValue(CUSTOM_RESOURCE); + + //make sure to verify custom resource value explicitly! 
+ assertEquals(5L, customResourceValue); + assertEquals("AM share is not the expected!", expectedAMShare, + actualAMShare); + } + + private Map<String, Long> verifyQueueMetricsForCustomResources( + FSLeafQueue schedulable) { + QueueMetricsCustomResource maxAMShareCustomResources = + schedulable.getMetrics().getCustomResources().getMaxAMShare(); + Map<String, Long> customResourceValues = maxAMShareCustomResources + .getValues(); + assertNotNull("Queue metrics for custom resources should not be null!", + maxAMShareCustomResources); + assertNotNull("Queue metrics for custom resources resource values " + + "should not be null!", customResourceValues); + return customResourceValues; + } }