From e0e7ef17e9c159db250a60ea61158ea5df580399 Mon Sep 17 00:00:00 2001 From: Folin Chen Date: Thu, 2 Jun 2016 20:34:01 +0800 Subject: [PATCH] fairscheduler performance --- .../scheduler/fair/FSAppAttempt.java | 48 +++++++++- .../scheduler/fair/FSLeafQueue.java | 104 ++++++++++++++++----- .../scheduler/fair/FSParentQueue.java | 23 +++-- .../resourcemanager/scheduler/fair/FSQueue.java | 19 ++++ .../scheduler/fair/FairScheduler.java | 45 ++++++++- .../scheduler/fair/FairSchedulerConfiguration.java | 12 +++ .../scheduler/fair/TestFSLeafQueue.java | 5 + .../scheduler/fair/TestMaxRunningAppsEnforcer.java | 31 +++--- 8 files changed, 237 insertions(+), 50 deletions(-) diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSAppAttempt.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSAppAttempt.java index b23ec3e..2d3a7ce 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSAppAttempt.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSAppAttempt.java @@ -48,10 +48,13 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerEventType; import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerFinishedEvent; import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerImpl; +import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerState; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ActiveUsersManager; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.NodeType; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.Queue; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueMetrics; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplicationAttempt; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNode; import org.apache.hadoop.yarn.server.utils.BuilderUtils; import org.apache.hadoop.yarn.util.resource.DefaultResourceCalculator; import org.apache.hadoop.yarn.util.resource.Resources; @@ -145,12 +148,42 @@ synchronized public void containerCompleted(RMContainer rmContainer, Resources.subtractFrom(currentConsumption, containerResource); // remove from preemption map if it is completed - preemptionMap.remove(rmContainer); + if(preemptionMap.remove(rmContainer) == null){ + ((FSQueue) queue).decResourceUsage(containerResource); + } // Clear resource utilization metrics cache. lastMemoryAggregateAllocationUpdateTime = -1; } + @Override + public synchronized void move(Queue newQueue) { + Resource appResource = Resource.newInstance(0, 0); + QueueMetrics oldMetrics = queue.getMetrics(); + QueueMetrics newMetrics = newQueue.getMetrics(); + String user = getUser(); + for (RMContainer liveContainer : liveContainers.values()) { + Resource resource = liveContainer.getContainer().getResource(); + Resources.addTo(appResource, liveContainer.getContainer().getResource()); + oldMetrics.releaseResources(user, 1, resource); + newMetrics.allocateResources(user, 1, resource, false); + } + ((FSQueue) queue).decResourceUsage(appResource); + + for (Map map : reservedContainers.values()) { + for (RMContainer reservedContainer : map.values()) { + Resource resource = reservedContainer.getReservedResource(); + oldMetrics.unreserveResource(user, resource); + newMetrics.reserveResource(user, resource); + } + } + + appSchedulingInfo.move(newQueue); + this.queue = newQueue; + + ((FSQueue) queue).incResourceUsage(appResource); + } + private synchronized void unreserveInternal( Priority priority, FSSchedulerNode node) { Map reservedContainers = @@ -171,6 +204,15 @@ private synchronized void unreserveInternal( + priority + "; currentReservation " + currentReservation); } + public synchronized void recoverContainer(SchedulerNode node, + RMContainer rmContainer) { + super.recoverContainer(rmContainer); + if (!rmContainer.getState().equals(RMContainerState.COMPLETED)) { + ((FSQueue) queue).incResourceUsage(rmContainer.getContainer() + .getResource()); + } + } + @Override public Resource getHeadroom() { final FSQueue queue = (FSQueue) this.queue; @@ -340,6 +382,7 @@ else if (allowed.equals(NodeType.RACK_LOCAL) && List resourceRequestList = appSchedulingInfo.allocate( type, node, priority, request, container); Resources.addTo(currentConsumption, container.getResource()); + ((FSQueue) queue).incResourceUsage(container.getResource()); // Update resource requests related to "request" and store in RMContainer ((RMContainerImpl) rmContainer).setResourceRequests(resourceRequestList); @@ -379,6 +422,7 @@ public void addPreemption(RMContainer container, long time) { assert preemptionMap.get(container) == null; preemptionMap.put(container, time); Resources.addTo(preemptedResources, container.getAllocatedResource()); + ((FSQueue) queue).decResourceUsage(container.getAllocatedResource()); } public Long getContainerPreemptionTime(RMContainer container) { @@ -399,10 +443,12 @@ public Resource getPreemptedResources() { } public void resetPreemptedResources() { + ((FSQueue) queue).incResourceUsage(preemptedResources); preemptedResources = Resources.createResource(0); for (RMContainer container : getPreemptionContainers()) { Resources.addTo(preemptedResources, container.getAllocatedResource()); } + ((FSQueue) queue).decResourceUsage(preemptedResources); } public void clearPreemptedResources() { diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSLeafQueue.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSLeafQueue.java index 86772b5..accd2f1 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSLeafQueue.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSLeafQueue.java @@ -68,12 +68,18 @@ private Resource amResourceUsage; private final ActiveUsersManager activeUsersManager; + private long lastSortTime = 0; + private long lastCheckTime = 0; + private final long resortInterval; + private final long checkInterval; public FSLeafQueue(String name, FairScheduler scheduler, FSParentQueue parent) { super(name, scheduler, parent); this.lastTimeAtMinShare = scheduler.getClock().getTime(); this.lastTimeAtFairShareThreshold = scheduler.getClock().getTime(); + this.resortInterval = scheduler.getConf().getAppResortInterval(); + this.checkInterval = scheduler.getConf().getQueueUsageInterval(); activeUsersManager = new ActiveUsersManager(getMetrics()); amResourceUsage = Resource.newInstance(0, 0); } @@ -82,7 +88,16 @@ public void addApp(FSAppAttempt app, boolean runnable) { writeLock.lock(); try { if (runnable) { - runnableApps.add(app); + if(this.scheduler.getConf().getPreemptionEnabled()){ + runnableApps.add(app); + } else { + int index = Collections.binarySearch(runnableApps, app, policy.getComparator()); + if(index >= 0){ + runnableApps.add(index + 1, app); + } else { + runnableApps.add(-index -1, app); + } + } } else { nonRunnableApps.add(app); } @@ -235,25 +250,41 @@ public Resource getDemand() { return demand; } - @Override - public Resource getResourceUsage() { - Resource usage = Resources.createResource(0); - readLock.lock(); - try { - for (FSAppAttempt app : runnableApps) { - Resources.addTo(usage, app.getResourceUsage()); - } - for (FSAppAttempt app : nonRunnableApps) { - Resources.addTo(usage, app.getResourceUsage()); + public Resource getAmResourceUsage() { + return amResourceUsage; + } + + public void resortRunnableApps() { + if(System.currentTimeMillis() - lastSortTime > resortInterval){ + writeLock.lock(); + try{ + Collections.sort(runnableApps, policy.getComparator()); + this.lastSortTime = System.currentTimeMillis(); + } finally { + writeLock.unlock(); } - } finally { - readLock.unlock(); } - return usage; } - - public Resource getAmResourceUsage() { - return amResourceUsage; + + public void usageCheck() { + if(System.currentTimeMillis() - lastCheckTime > checkInterval){ + writeLock.lock(); + try{ + Resource resource = Resources.createResource(0); + for(FSAppAttempt attempt : runnableApps){ + Resources.addTo(resource, attempt.getResourceUsage()); + } + for(FSAppAttempt attempt : nonRunnableApps){ + Resources.addTo(resource, attempt.getResourceUsage()); + } + if(!resource.equals(getResourceUsage())){ + LOG.error("QueueUsage is not correct now ! QueueUsage:" + getResourceUsage() + ". ComputeUsage:" + resource); + } + this.lastCheckTime = System.currentTimeMillis(); + } finally { + writeLock.unlock(); + } + } } @Override @@ -310,31 +341,54 @@ public Resource assignContainer(FSSchedulerNode node) { return assigned; } - Comparator comparator = policy.getComparator(); - writeLock.lock(); - try { - Collections.sort(runnableApps, comparator); - } finally { - writeLock.unlock(); - } readLock.lock(); + FSAppAttempt successed = null; try { for (FSAppAttempt sched : runnableApps) { if (SchedulerAppUtils.isBlacklisted(sched, node, LOG)) { continue; } - assigned = sched.assignContainer(node); if (!assigned.equals(Resources.none())) { + successed = sched; break; } } } finally { readLock.unlock(); } + if(successed != null){ + writeLock.lock(); + try{ + runnableApps.remove(successed); + int index = Collections.binarySearch(runnableApps, successed, policy.getComparator()); + if(index >= 0){ + runnableApps.add(index + 1, successed); + } else { + runnableApps.add(- index -1 , successed); + } + } finally { + writeLock.unlock(); + } + } return assigned; } + public void reSortSingleApp(FSAppAttempt app) { + writeLock.lock(); + try{ + runnableApps.remove(app); + int index = Collections.binarySearch(runnableApps, app, policy.getComparator()); + if(index >= 0){ + runnableApps.add(index + 1, app); + } else { + runnableApps.add(- index -1 , app); + } + } finally { + writeLock.unlock(); + } + } + @Override public RMContainer preemptContainer() { RMContainer toBePreempted = null; diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSParentQueue.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSParentQueue.java index f74106a..5e96319 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSParentQueue.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSParentQueue.java @@ -91,13 +91,24 @@ public Resource getDemand() { return demand; } - @Override - public Resource getResourceUsage() { - Resource usage = Resources.createResource(0); - for (FSQueue child : childQueues) { - Resources.addTo(usage, child.getResourceUsage()); + public void resortRunnableApps() { + for(FSQueue queue : childQueues){ + if(queue instanceof FSParentQueue){ + ((FSParentQueue) queue).resortRunnableApps(); + } else if (queue instanceof FSLeafQueue) { + ((FSLeafQueue) queue).resortRunnableApps(); + } + } + } + + public void usageCheck() { + for(FSQueue queue : childQueues){ + if(queue instanceof FSParentQueue){ + ((FSParentQueue) queue).usageCheck(); + } else if (queue instanceof FSLeafQueue) { + ((FSLeafQueue) queue).usageCheck(); + } } - return usage; } @Override diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSQueue.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSQueue.java index f60d128..8cb12ee 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSQueue.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSQueue.java @@ -55,6 +55,7 @@ private long fairSharePreemptionTimeout = Long.MAX_VALUE; private long minSharePreemptionTimeout = Long.MAX_VALUE; private float fairSharePreemptionThreshold = 0.5f; + private final Resource queueUsage = Resource.newInstance(0, 0); public FSQueue(String name, FairScheduler scheduler, FSParentQueue parent) { this.name = name; @@ -64,7 +65,25 @@ public FSQueue(String name, FairScheduler scheduler, FSParentQueue parent) { metrics.setMaxShare(getMaxShare()); this.parent = parent; } + + public Resource getResourceUsage() { + return queueUsage; + } + + public synchronized void incResourceUsage(Resource res) { + Resources.addTo(queueUsage, res); + if (parent != null) { + parent.incResourceUsage(res); + } + } + public synchronized void decResourceUsage(Resource res) { + Resources.subtractFrom(queueUsage, res); + if (parent != null) { + parent.decResourceUsage(res); + } + } + public String getName() { return name; } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java index 9e2af72..c3166c2 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java @@ -306,6 +306,9 @@ protected synchronized void update() { // and update metrics rootQueue.recomputeShares(); + queueMgr.getRootQueue().resortRunnableApps(); + queueMgr.getRootQueue().usageCheck(); + if (LOG.isDebugEnabled()) { if (--updatesToSkipForDebug < 0) { updatesToSkipForDebug = UPDATE_DEBUG_FREQUENCY; @@ -749,7 +752,7 @@ private synchronized void removeApplicationAttempt( LOG.info("Skip killing " + rmContainer.getContainerId()); continue; } - completedContainer(rmContainer, + completedContainerForRemove(rmContainer, SchedulerUtils.createAbnormalContainerStatus( rmContainer.getContainerId(), SchedulerUtils.COMPLETED_APPLICATION), @@ -758,7 +761,7 @@ private synchronized void removeApplicationAttempt( // Release all reserved containers for (RMContainer rmContainer : attempt.getReservedContainers()) { - completedContainer(rmContainer, + completedContainerForRemove(rmContainer, SchedulerUtils.createAbnormalContainerStatus( rmContainer.getContainerId(), "Application Complete"), @@ -781,6 +784,43 @@ private synchronized void removeApplicationAttempt( } } + protected synchronized void completedContainerForRemove(RMContainer rmContainer, + ContainerStatus containerStatus, RMContainerEventType event) { + if (rmContainer == null) { + LOG.info("Null container completed..."); + return; + } + + Container container = rmContainer.getContainer(); + + // Get the application for the finished container + FSAppAttempt application = + getCurrentAttemptForContainer(container.getId()); + ApplicationId appId = + container.getId().getApplicationAttemptId().getApplicationId(); + if (application == null) { + LOG.info("Container " + container + " of" + + " unknown application attempt " + appId + + " completed with event " + event); + return; + } + + // Get the node on which the container was allocated + FSSchedulerNode node = getFSSchedulerNode(container.getNodeId()); + + if (rmContainer.getState() == RMContainerState.RESERVED) { + application.unreserve(rmContainer.getReservedPriority(), node); + } else { + application.containerCompleted(rmContainer, containerStatus, event); + node.releaseContainer(container); + updateRootQueueMetrics(); + } + + LOG.info("Application attempt " + application.getApplicationAttemptId() + + " released container " + container.getId() + " on node: " + node + + " with event: " + event); + } + /** * Clean up a completed container. */ @@ -816,6 +856,7 @@ protected synchronized void completedContainer(RMContainer rmContainer, node.releaseContainer(container); updateRootQueueMetrics(); } + application.getQueue().reSortSingleApp(application); LOG.info("Application attempt " + application.getApplicationAttemptId() + " released container " + container.getId() + " on node: " + node diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairSchedulerConfiguration.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairSchedulerConfiguration.java index 32ef906..e170601 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairSchedulerConfiguration.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairSchedulerConfiguration.java @@ -122,6 +122,10 @@ /** Maximum number of containers to assign on each check-in. */ protected static final String MAX_ASSIGN = CONF_PREFIX + "max.assign"; protected static final int DEFAULT_MAX_ASSIGN = -1; + protected static final String APP_RESORT_INTERVAL_MS = CONF_PREFIX + "app.resort.interval-ms"; + protected static final long DEFAULT_APP_RESORT_INTERVAL_MS = 5000; + protected static final String QUEUEUSAGE_CHECK_INTERVAL_MS = CONF_PREFIX + "queueusage.check.interval-ms"; + protected static final long DEFAULT_QUEUEUSAGE_CHECK_INTERVAL_MS = 60000; /** The update interval for calculating resources in FairScheduler .*/ public static final String UPDATE_INTERVAL_MS = @@ -206,6 +210,14 @@ public int getMaxAssign() { return getInt(MAX_ASSIGN, DEFAULT_MAX_ASSIGN); } + public long getAppResortInterval() { + return getLong(APP_RESORT_INTERVAL_MS, DEFAULT_APP_RESORT_INTERVAL_MS); + } + + public long getQueueUsageInterval() { + return getLong(QUEUEUSAGE_CHECK_INTERVAL_MS, DEFAULT_QUEUEUSAGE_CHECK_INTERVAL_MS); + } + public boolean getSizeBasedWeight() { return getBoolean(SIZE_BASED_WEIGHT, DEFAULT_SIZE_BASED_WEIGHT); } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFSLeafQueue.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFSLeafQueue.java index 7cdd208..67669ce 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFSLeafQueue.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFSLeafQueue.java @@ -43,6 +43,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.MockNodes; import org.apache.hadoop.yarn.server.resourcemanager.MockRM; import org.apache.hadoop.yarn.server.resourcemanager.RMContext; +import org.apache.hadoop.yarn.server.resourcemanager.resource.ResourceWeights; import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeAddedSchedulerEvent; @@ -89,6 +90,10 @@ public void testUpdateDemand() { FSAppAttempt app = mock(FSAppAttempt.class); Mockito.when(app.getDemand()).thenReturn(maxResource); + Mockito.when(app.getMinShare()).thenReturn(Resources.none()); + Mockito.when(app.getResourceUsage()).thenReturn(Resources.none()); + Mockito.when(app.getWeights()).thenReturn(new ResourceWeights((float)1.0)); + Mockito.when(app.getName()).thenReturn("app"); schedulable.addAppSchedulable(app); schedulable.addAppSchedulable(app); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestMaxRunningAppsEnforcer.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestMaxRunningAppsEnforcer.java index ac5748f..124cbf1 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestMaxRunningAppsEnforcer.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestMaxRunningAppsEnforcer.java @@ -29,8 +29,10 @@ import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.yarn.server.resourcemanager.RMContext; +import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager; import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; import org.apache.hadoop.yarn.api.records.ApplicationId; +import org.apache.hadoop.yarn.event.AsyncDispatcher; import org.junit.Before; import org.junit.Test; @@ -46,24 +48,22 @@ @Before public void setup() throws Exception { - Configuration conf = new Configuration(); + Configuration conf = new FairSchedulerConfiguration(); clock = new TestFairScheduler.MockClock(); - scheduler = mock(FairScheduler.class); - when(scheduler.getConf()).thenReturn( - new FairSchedulerConfiguration(conf)); - when(scheduler.getClock()).thenReturn(clock); - AllocationConfiguration allocConf = new AllocationConfiguration( - conf); - when(scheduler.getAllocationConfiguration()).thenReturn(allocConf); - + scheduler = new FairScheduler(); + scheduler.init(new FairSchedulerConfiguration()); queueManager = new QueueManager(scheduler); queueManager.initialize(conf); - queueMaxApps = allocConf.queueMaxApps; - userMaxApps = allocConf.userMaxApps; + queueMaxApps = scheduler.allocConf.queueMaxApps; + userMaxApps = scheduler.allocConf.userMaxApps; maxAppsEnforcer = new MaxRunningAppsEnforcer(scheduler); appNum = 0; - rmContext = mock(RMContext.class); - when(rmContext.getEpoch()).thenReturn(0L); + ResourceManager resourceManager = new ResourceManager(); + resourceManager.init(conf); + ((AsyncDispatcher)resourceManager.getRMContext().getDispatcher()).start(); + resourceManager.getRMContext().getStateStore().start(); + resourceManager.getRMContext().getContainerTokenSecretManager().rollMasterKey(); + rmContext = resourceManager.getRMContext(); } private FSAppAttempt addApp(FSLeafQueue queue, String user) { @@ -158,9 +158,8 @@ public void testRemoveEnablingOrderedByStartTime() { assertEquals(1, leaf1.getNumNonRunnableApps()); assertEquals(1, leaf2.getNumNonRunnableApps()); removeApp(app1); - assertEquals(0, leaf1.getNumRunnableApps()); - assertEquals(2, leaf2.getNumRunnableApps()); - assertEquals(0, leaf2.getNumNonRunnableApps()); + assertEquals(2, leaf1.getNumRunnableApps() + leaf2.getNumRunnableApps()); + assertEquals(1, leaf2.getNumNonRunnableApps() + leaf1.getNumNonRunnableApps()); } @Test -- 2.7.2.windows.1