diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMContextImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMContextImpl.java index 2f9209c..88d1931 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMContextImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMContextImpl.java @@ -31,6 +31,7 @@ import org.apache.hadoop.yarn.api.records.NodeId; import org.apache.hadoop.yarn.conf.ConfigurationProvider; import org.apache.hadoop.yarn.event.Dispatcher; +import org.apache.hadoop.yarn.event.EventHandler; import org.apache.hadoop.yarn.server.resourcemanager.ahs.RMApplicationHistoryWriter; import org.apache.hadoop.yarn.server.resourcemanager.metrics.SystemMetricsPublisher; import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager; @@ -41,6 +42,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.ContainerAllocationExpirer; import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.SchedulerEvent; import org.apache.hadoop.yarn.server.resourcemanager.security.AMRMTokenSecretManager; import org.apache.hadoop.yarn.server.resourcemanager.security.ClientToAMTokenSecretManagerInRM; import org.apache.hadoop.yarn.server.resourcemanager.security.DelegationTokenRenewer; @@ -54,6 +56,7 @@ public class RMContextImpl implements RMContext { private Dispatcher rmDispatcher; + private EventHandler schedulerDispatcher; private boolean isHAEnabled; @@ -438,4 +441,15 @@ public Configuration getYarnConfiguration() { public void setYarnConfiguration(Configuration yarnConfiguration) { this.yarnConfiguration=yarnConfiguration; } + + public void setSchedulerDispatcher( + EventHandler schedulerDispatcher) { + this.schedulerDispatcher = schedulerDispatcher; + } + + @Private + @VisibleForTesting + public EventHandler getSchedulerDispatcher() { + return schedulerDispatcher; + } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java index 130cfd4..7c5124a 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java @@ -489,6 +489,7 @@ protected void serviceInit(Configuration configuration) throws Exception { schedulerDispatcher = createSchedulerEventDispatcher(); addIfService(schedulerDispatcher); rmDispatcher.register(SchedulerEventType.class, schedulerDispatcher); + rmContext.setSchedulerDispatcher(schedulerDispatcher); // Register event handler for RmAppEvents rmDispatcher.register(RMAppEventType.class, diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/RMContainerImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/RMContainerImpl.java index 316a450..fc5a826 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/RMContainerImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/RMContainerImpl.java @@ -47,6 +47,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event.RMAppAttemptContainerAllocatedEvent; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event.RMAppAttemptContainerFinishedEvent; import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeCleanContainerEvent; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.ContainerRescheduledEvent; import org.apache.hadoop.yarn.state.InvalidStateTransitonException; import org.apache.hadoop.yarn.state.MultipleArcTransition; import org.apache.hadoop.yarn.state.SingleArcTransition; @@ -94,7 +95,7 @@ .addTransition(RMContainerState.ALLOCATED, RMContainerState.EXPIRED, RMContainerEventType.EXPIRE, new FinishedTransition()) .addTransition(RMContainerState.ALLOCATED, RMContainerState.KILLED, - RMContainerEventType.KILL, new FinishedTransition()) + RMContainerEventType.KILL, new ContainerRescheduledTransition()) // Transitions from ACQUIRED state .addTransition(RMContainerState.ACQUIRED, RMContainerState.RUNNING, @@ -495,6 +496,16 @@ public void transition(RMContainerImpl container, RMContainerEvent event) { } } + private static final class ContainerRescheduledTransition extends BaseTransition { + + @Override + public void transition(RMContainerImpl container, RMContainerEvent event) { + // Tell scheduler to recover request of this container to app + container.eventHandler.handle(new ContainerRescheduledEvent(container)); + new FinishedTransition().transition(container, event); + } + } + private static class FinishedTransition extends BaseTransition { @Override diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java index 48c7f2f..6540b42 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java @@ -107,6 +107,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppAttemptRemovedSchedulerEvent; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppRemovedSchedulerEvent; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.ContainerExpiredSchedulerEvent; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.ContainerRescheduledEvent; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeAddedSchedulerEvent; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeLabelsUpdateSchedulerEvent; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeRemovedSchedulerEvent; @@ -1326,6 +1327,13 @@ public void handle(SchedulerEvent event) { RMContainerEventType.EXPIRE); } break; + case CONTAINER_RESCHEDULED: { + ContainerRescheduledEvent containerRescheduledEvent = + (ContainerRescheduledEvent) event; + RMContainer container = containerRescheduledEvent.getContainer(); + recoverResourceRequestForContainer(container); + } + break; default: LOG.error("Invalid eventtype " + event.getType() + ". Ignoring!"); } @@ -1499,7 +1507,6 @@ public void killContainer(RMContainer cont) { if (LOG.isDebugEnabled()) { LOG.debug("KILL_CONTAINER: container" + cont.toString()); } - recoverResourceRequestForContainer(cont); completedContainer(cont, SchedulerUtils.createPreemptedContainerStatus( cont.getContainerId(), SchedulerUtils.PREEMPTED_CONTAINER), RMContainerEventType.KILL); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/event/ContainerRescheduledEvent.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/event/ContainerRescheduledEvent.java new file mode 100644 index 0000000..6ae5be5 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/event/ContainerRescheduledEvent.java @@ -0,0 +1,34 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.server.resourcemanager.scheduler.event; + +import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer; + +public class ContainerRescheduledEvent extends SchedulerEvent { + + private RMContainer container; + public ContainerRescheduledEvent(RMContainer container) { + super(SchedulerEventType.CONTAINER_RESCHEDULED); + this.container = container; + } + + public RMContainer getContainer() { + return container; + } +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/event/SchedulerEventType.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/event/SchedulerEventType.java index 13aecb3..b2292e1 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/event/SchedulerEventType.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/event/SchedulerEventType.java @@ -36,5 +36,8 @@ APP_ATTEMPT_REMOVED, // Source: ContainerAllocationExpirer - CONTAINER_EXPIRED + CONTAINER_EXPIRED, + + // Source: RMContainer + CONTAINER_RESCHEDULED } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java index f481de5..2a46550 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java @@ -76,6 +76,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppAttemptRemovedSchedulerEvent; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppRemovedSchedulerEvent; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.ContainerExpiredSchedulerEvent; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.ContainerRescheduledEvent; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeAddedSchedulerEvent; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeRemovedSchedulerEvent; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeResourceUpdateSchedulerEvent; @@ -450,7 +451,6 @@ protected void warnOrKillContainer(RMContainer container) { SchedulerUtils.createPreemptedContainerStatus( container.getContainerId(), SchedulerUtils.PREEMPTED_CONTAINER); - recoverResourceRequestForContainer(container); // TODO: Not sure if this ever actually adds this to the list of cleanup // containers on the RMNode (see SchedulerNode.releaseContainer()). completedContainer(container, status, RMContainerEventType.KILL); @@ -1242,6 +1242,15 @@ public void handle(SchedulerEvent event) { SchedulerUtils.EXPIRED_CONTAINER), RMContainerEventType.EXPIRE); break; + case CONTAINER_RESCHEDULED: + if (!(event instanceof ContainerRescheduledEvent)) { + throw new RuntimeException("Unexpected event type: " + event); + } + ContainerRescheduledEvent containerRescheduledEvent = + (ContainerRescheduledEvent)event; + RMContainer container = containerRescheduledEvent.getContainer(); + recoverResourceRequestForContainer(container); + break; default: LOG.error("Unknown event arrived at FairScheduler: " + event.toString()); } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/FifoScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/FifoScheduler.java index b8c419c..df2b6b5 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/FifoScheduler.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/FifoScheduler.java @@ -85,6 +85,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppAttemptRemovedSchedulerEvent; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppRemovedSchedulerEvent; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.ContainerExpiredSchedulerEvent; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.ContainerRescheduledEvent; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeAddedSchedulerEvent; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeRemovedSchedulerEvent; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeResourceUpdateSchedulerEvent; @@ -846,6 +847,13 @@ public void handle(SchedulerEvent event) { RMContainerEventType.EXPIRE); } break; + case CONTAINER_RESCHEDULED: { + ContainerRescheduledEvent containerRescheduledEvent = + (ContainerRescheduledEvent) event; + RMContainer container = containerRescheduledEvent.getContainer(); + recoverResourceRequestForContainer(container); + } + break; default: LOG.error("Invalid eventtype " + event.getType() + ". Ignoring!"); } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/TestAMRestart.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/TestAMRestart.java index 7befba4..f97e584 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/TestAMRestart.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/TestAMRestart.java @@ -247,7 +247,7 @@ public void testAMRestartWithExistingContainers() throws Exception { private void waitForContainersToFinish(int expectedNum, RMAppAttempt attempt) throws InterruptedException { int count = 0; - while (attempt.getJustFinishedContainers().size() != expectedNum + while (attempt.getJustFinishedContainers().size() < expectedNum && count < 500) { Thread.sleep(100); count++; diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairScheduler.java index 98877e7..9c40dec 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairScheduler.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairScheduler.java @@ -72,6 +72,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.ApplicationMasterService; import org.apache.hadoop.yarn.server.resourcemanager.MockNodes; import org.apache.hadoop.yarn.server.resourcemanager.MockRM; +import org.apache.hadoop.yarn.server.resourcemanager.RMContextImpl; import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager; import org.apache.hadoop.yarn.server.resourcemanager.resource.ResourceType; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.MockRMApp; @@ -3811,12 +3812,14 @@ public void testMaxRunningAppsHierarchicalQueues() throws Exception { scheduler.reinitialize(conf, resourceManager.getRMContext()); // exceeds no limits - ApplicationAttemptId attId1 = createSchedulingRequest(1024, "queue1.sub1", "user1"); + ApplicationAttemptId attId1 = createSchedulingRequest(1024, "queue1.sub1", + "user1"); verifyAppRunnable(attId1, true); verifyQueueNumRunnable("queue1.sub1", 1, 0); clock.tick(10); // exceeds no limits - ApplicationAttemptId attId2 = createSchedulingRequest(1024, "queue1.sub3", "user1"); + ApplicationAttemptId attId2 = createSchedulingRequest(1024, "queue1.sub3", + "user1"); verifyAppRunnable(attId2, true); verifyQueueNumRunnable("queue1.sub3", 1, 0); clock.tick(10); @@ -3826,7 +3829,8 @@ public void testMaxRunningAppsHierarchicalQueues() throws Exception { verifyQueueNumRunnable("queue1.sub2", 1, 0); clock.tick(10); // exceeds queue1 limit - ApplicationAttemptId attId4 = createSchedulingRequest(1024, "queue1.sub2", "user1"); + ApplicationAttemptId attId4 = createSchedulingRequest(1024, "queue1.sub2", + "user1"); verifyAppRunnable(attId4, false); verifyQueueNumRunnable("queue1.sub2", 1, 1); clock.tick(10); @@ -3864,15 +3868,16 @@ public void testMaxRunningAppsHierarchicalQueues() throws Exception { @Test (timeout = 10000) public void testContinuousScheduling() throws Exception { - // set continuous scheduling enabled - scheduler = new FairScheduler(); + scheduler = (FairScheduler)resourceManager.getResourceScheduler(); + ((ResourceManager.SchedulerEventDispatcher) + ((RMContextImpl)resourceManager.getRMContext()) + .getSchedulerDispatcher()).start(); Configuration conf = createConfiguration(); + // set continuous scheduling enabled conf.setBoolean(FairSchedulerConfiguration.CONTINUOUS_SCHEDULING_ENABLED, - true); - scheduler.setRMContext(resourceManager.getRMContext()); - scheduler.init(conf); + true); + scheduler.serviceInit(conf); scheduler.start(); - scheduler.reinitialize(conf, resourceManager.getRMContext()); Assert.assertTrue("Continuous scheduling should be enabled.", scheduler.isContinuousSchedulingEnabled()); @@ -3974,6 +3979,56 @@ public void testContinuousSchedulingWithNodeRemoved() throws Exception { } } + @Test (timeout = 10000) + public void testContinuousSchedulingWithNodeReconnect() throws Exception { + scheduler = (FairScheduler)resourceManager.getResourceScheduler(); + ((ResourceManager.SchedulerEventDispatcher) + ((RMContextImpl)resourceManager.getRMContext()) + .getSchedulerDispatcher()).start(); + + // Add one node + RMNode node1 = + MockNodes.newNodeInfo(1, Resources.createResource(8 * 1024, 8), 1, + "127.0.0.1"); + NodeAddedSchedulerEvent nodeEvent1 = new NodeAddedSchedulerEvent(node1); + scheduler.handle(nodeEvent1); + + // Add application request + ApplicationAttemptId appAttemptId = createSchedulingRequest(1024, + 1, "queue11", "user1"); + scheduler.update(); + + // Invoke the continuous scheduling once + scheduler.continuousSchedulingAttempt(); + + FSAppAttempt app = scheduler.getSchedulerApp(appAttemptId); + + // check consumption after scheduling + Assert.assertEquals(1024, app.getCurrentConsumption().getMemory()); + Assert.assertEquals(1, app.getCurrentConsumption().getVirtualCores()); + + // Remove one node + NodeRemovedSchedulerEvent removeNode1 = new NodeRemovedSchedulerEvent(node1); + scheduler.handle(removeNode1); + + // check consumption after node removed + Assert.assertEquals(0, app.getCurrentConsumption().getMemory()); + Assert.assertEquals(0, app.getCurrentConsumption().getVirtualCores()); + + // Added one node + NodeAddedSchedulerEvent addedNode1 = new NodeAddedSchedulerEvent(node1); + scheduler.handle(addedNode1); + + while (app.getCurrentConsumption().equals(Resources.none())) { + scheduler.update(); + scheduler.continuousSchedulingAttempt(); + } + + // check consumption after re-schSchedulerEventDispatchereduling + Assert.assertEquals(1024, app.getCurrentConsumption().getMemory()); + Assert.assertEquals(1, app.getCurrentConsumption().getVirtualCores()); + } + @Test public void testDontAllowUndeclaredPools() throws Exception{ conf.setBoolean(FairSchedulerConfiguration.ALLOW_UNDECLARED_POOLS, false); @@ -4052,10 +4107,13 @@ public void testRecoverRequestAfterPreemption() throws Exception { conf.setLong(FairSchedulerConfiguration.WAIT_TIME_BEFORE_KILL, 10); MockClock clock = new MockClock(); + scheduler = (FairScheduler)resourceManager.getResourceScheduler(); + ((ResourceManager.SchedulerEventDispatcher) + ((RMContextImpl)resourceManager.getRMContext()) + .getSchedulerDispatcher()).start(); scheduler.setClock(clock); - scheduler.init(conf); + scheduler.serviceInit(conf); scheduler.start(); - scheduler.reinitialize(conf, resourceManager.getRMContext()); Priority priority = Priority.newInstance(20); String host = "127.0.0.1";