diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttemptImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttemptImpl.java
index 8aefe9f..3aeedaf 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttemptImpl.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttemptImpl.java
@@ -1131,8 +1131,14 @@ public RMAppAttemptState transition(RMAppAttemptImpl appAttempt,
           && !RMAppImpl.isAppInFinalState(rmApp)) {
         // Add the previous finished attempt to scheduler synchronously so
         // that scheduler knows the previous attempt.
-        appAttempt.scheduler.handle(new AppAttemptAddedSchedulerEvent(
-            appAttempt.getAppAttemptId(), false, true));
+        if (EnumSet.of(RMAppAttemptState.RUNNING, RMAppAttemptState.LAUNCHED)
+            .contains(appAttempt.recoveredFinalState)) {
+          appAttempt.scheduler.handle(new AppAttemptAddedSchedulerEvent(
+              appAttempt.getAppAttemptId(), false, true, true));
+        } else {
+          appAttempt.scheduler.handle(new AppAttemptAddedSchedulerEvent(
+              appAttempt.getAppAttemptId(), false, true));
+        }
         (new BaseFinalTransition(appAttempt.recoveredFinalState)).transition(
             appAttempt, event);
       }
@@ -1164,7 +1170,7 @@ public RMAppAttemptState transition(RMAppAttemptImpl appAttempt,
       // Add attempt to scheduler synchronously to guarantee scheduler
       // knows attempts before AM or NM re-registers.
       appAttempt.scheduler.handle(new AppAttemptAddedSchedulerEvent(
-          appAttempt.getAppAttemptId(), false, true));
+          appAttempt.getAppAttemptId(), false, true, true));
     }

   /*
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java
index 159c7a5..e518db1 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java
@@ -771,7 +771,8 @@ private synchronized void addApplication(ApplicationId applicationId,
   private synchronized void addApplicationAttempt(
       ApplicationAttemptId applicationAttemptId,
       boolean transferStateFromPreviousAttempt,
-      boolean isAttemptRecovering) {
+      boolean isAttemptRecovering,
+      boolean wasAttemptRunning) {
     SchedulerApplication<FiCaSchedulerApp> application =
         applications.get(applicationAttemptId.getApplicationId());
     if (application == null) {
@@ -783,7 +784,7 @@ private synchronized void addApplicationAttempt(
     FiCaSchedulerApp attempt = new FiCaSchedulerApp(applicationAttemptId,
         application.getUser(), queue, queue.getActiveUsersManager(), rmContext,
-        application.getPriority());
+        application.getPriority(), wasAttemptRunning);
     if (transferStateFromPreviousAttempt) {
       attempt.transferStateFromPreviousAttempt(
           application.getCurrentAppAttempt());
@@ -1352,7 +1353,8 @@ public void handle(SchedulerEvent event) {
           (AppAttemptAddedSchedulerEvent) event;
       addApplicationAttempt(appAttemptAddedEvent.getApplicationAttemptId(),
           appAttemptAddedEvent.getTransferStateFromPreviousAttempt(),
-          appAttemptAddedEvent.getIsAttemptRecovering());
+          appAttemptAddedEvent.getIsAttemptRecovering(),
+          appAttemptAddedEvent.getWasAttemptRunning());
     }
     break;
   case APP_ATTEMPT_REMOVED:
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java
index be1ba89..59dcd17 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java
@@ -94,7 +94,9 @@
   private Priority defaultAppPriorityPerQueue;

   private OrderingPolicy<FiCaSchedulerApp> pendingOrderingPolicy = null;
-
+
+  private OrderingPolicy<FiCaSchedulerApp> pendingOrderingPolicyRecovery = null;
+
   private volatile float minimumAllocationFactor;

   private Map<String, User> users = new HashMap<String, User>();
@@ -156,6 +158,8 @@ protected synchronized void setupQueueConfigs(Resource clusterResource)
     setOrderingPolicy(conf.<FiCaSchedulerApp> getOrderingPolicy(getQueuePath()));
     setPendingAppsOrderingPolicy(conf
        .<FiCaSchedulerApp> getOrderingPolicy(getQueuePath()));
+    setPendingAppsOrderingPolicyRecovery(conf
+        .<FiCaSchedulerApp> getOrderingPolicy(getQueuePath()));
     userLimit = conf.getUserLimit(getQueuePath());
     userLimitFactor = conf.getUserLimitFactor(getQueuePath());
@@ -320,7 +324,8 @@ public synchronized int getNumApplications() {
   }

   public synchronized int getNumPendingApplications() {
-    return pendingOrderingPolicy.getNumSchedulableEntities();
+    return pendingOrderingPolicy.getNumSchedulableEntities()
+        + pendingOrderingPolicyRecovery.getNumSchedulableEntities();
   }

   public synchronized int getNumActiveApplications() {
@@ -607,9 +612,24 @@ private synchronized void activateApplications() {
     Map<String, Resource> userAmPartitionLimit =
         new HashMap<String, Resource>();

-    for (Iterator<FiCaSchedulerApp> i = getPendingAppsOrderingPolicy()
-        .getAssignmentIterator(); i.hasNext();) {
-      FiCaSchedulerApp application = i.next();
+    for (Iterator<FiCaSchedulerApp> i =
+        getPendingAppsOrderingPolicyRecovery().getAssignmentIterator(); i
+        .hasNext();) {
+      activateApplications(i, amPartitionLimit, userAmPartitionLimit);
+    }
+
+    for (Iterator<FiCaSchedulerApp> i =
+        getPendingAppsOrderingPolicy().getAssignmentIterator(); i.hasNext();) {
+      activateApplications(i, amPartitionLimit, userAmPartitionLimit);
+    }
+  }
+
+  private synchronized void activateApplications(
+      Iterator<FiCaSchedulerApp> fsApp,
+      Map<String, Resource> amPartitionLimit,
+      Map<String, Resource> userAmPartitionLimit) {
+    while (fsApp.hasNext()) {
+      FiCaSchedulerApp application = fsApp.next();
       ApplicationId applicationId = application.getApplicationId();
       // Get the am-node-partition associated with each application
@@ -623,12 +643,12 @@ private synchronized void activateApplications() {
         amPartitionLimit.put(partitionName, amLimit);
       }
       // Check am resource limit.
-      Resource amIfStarted = Resources.add(
-          application.getAMResource(partitionName),
-          queueUsage.getAMUsed(partitionName));
+      Resource amIfStarted =
+          Resources.add(application.getAMResource(partitionName),
+              queueUsage.getAMUsed(partitionName));

       if (LOG.isDebugEnabled()) {
-        LOG.debug("application "+application.getId() +" AMResource "
+        LOG.debug("application " + application.getId() + " AMResource "
             + application.getAMResource(partitionName)
             + " maxAMResourcePerQueuePercent " + maxAMResourcePerQueuePercent
            + " amLimit " + amLimit + " lastClusterResource "
@@ -647,8 +667,10 @@ private synchronized void activateApplications() {
              + " skipping enforcement to allow at least one application"
              + " to start");
        } else {
-          application.updateAMContainerDiagnostics(AMState.INACTIVATED,
-              CSAMContainerLaunchDiagnosticsConstants.QUEUE_AM_RESOURCE_LIMIT_EXCEED);
+          application
+              .updateAMContainerDiagnostics(
+                  AMState.INACTIVATED,
+                  CSAMContainerLaunchDiagnosticsConstants.QUEUE_AM_RESOURCE_LIMIT_EXCEED);
           LOG.info("Not activating application " + applicationId
               + " as amIfStarted: " + amIfStarted + " exceeds amLimit: "
               + amLimit);
@@ -666,9 +688,9 @@ private synchronized void activateApplications() {
          userAmPartitionLimit.put(partitionName, userAMLimit);
        }

-        Resource userAmIfStarted = Resources.add(
-            application.getAMResource(partitionName),
-            user.getConsumedAMResources(partitionName));
+        Resource userAmIfStarted =
+            Resources.add(application.getAMResource(partitionName),
+                user.getConsumedAMResources(partitionName));

        if (!Resources.lessThanOrEqual(resourceCalculator, lastClusterResource,
            userAmIfStarted, userAMLimit)) {
@@ -681,8 +703,10 @@ private synchronized void activateApplications() {
                + " low. skipping enforcement to allow at least one application"
                + " to start");
          } else {
-            application.updateAMContainerDiagnostics(AMState.INACTIVATED,
-                CSAMContainerLaunchDiagnosticsConstants.USER_AM_RESOURCE_LIMIT_EXCEED);
+            application
+                .updateAMContainerDiagnostics(
+                    AMState.INACTIVATED,
+                    CSAMContainerLaunchDiagnosticsConstants.USER_AM_RESOURCE_LIMIT_EXCEED);
             LOG.info("Not activating application " + applicationId
                 + " for user: " + user + " as userAmIfStarted: "
                 + userAmIfStarted + " exceeds userAmLimit: " + userAMLimit);
@@ -700,17 +724,21 @@ private synchronized void activateApplications() {
       metrics.incAMUsed(application.getUser(),
           application.getAMResource(partitionName));
       metrics.setAMResouceLimitForUser(application.getUser(), userAMLimit);
-      i.remove();
+      fsApp.remove();
       LOG.info("Application " + applicationId + " from user: "
           + application.getUser() + " activated in queue: " + getQueueName());
     }
   }
-
+
   private synchronized void addApplicationAttempt(FiCaSchedulerApp application,
       User user) {
     // Accept
     user.submitApplication();
-    getPendingAppsOrderingPolicy().addSchedulableEntity(application);
+    if (application.wasAppRunningDuringRecovery()) {
+      getPendingAppsOrderingPolicyRecovery().addSchedulableEntity(application);
+    } else {
+      getPendingAppsOrderingPolicy().addSchedulableEntity(application);
+    }
     applicationAttemptMap.put(application.getApplicationAttemptId(),
         application);

     // Activate applications
@@ -750,7 +778,11 @@ public synchronized void removeApplicationAttempt(
     boolean wasActive =
         orderingPolicy.removeSchedulableEntity(application);
     if (!wasActive) {
-      pendingOrderingPolicy.removeSchedulableEntity(application);
+      if (application.wasAppRunningDuringRecovery()) {
+        pendingOrderingPolicyRecovery.removeSchedulableEntity(application);
+      } else {
+        pendingOrderingPolicy.removeSchedulableEntity(application);
+      }
     } else {
       queueUsage.decAMUsed(partitionName,
           application.getAMResource(partitionName));
@@ -1499,7 +1531,10 @@ public void recoverContainer(Resource clusterResource,
    * Obtain (read-only) collection of pending applications.
    */
   public Collection<FiCaSchedulerApp> getPendingApplications() {
-    return pendingOrderingPolicy.getSchedulableEntities();
+    Collection<FiCaSchedulerApp> pendingApps =
+        pendingOrderingPolicy.getSchedulableEntities();
+    pendingApps.addAll(pendingOrderingPolicyRecovery.getSchedulableEntities());
+    return pendingApps;
   }

   /**
@@ -1547,6 +1582,10 @@ public synchronized void collectSchedulerApplications(
         .getSchedulableEntities()) {
       apps.add(pendingApp.getApplicationAttemptId());
     }
+    for (FiCaSchedulerApp pendingApp : pendingOrderingPolicyRecovery
+        .getSchedulableEntities()) {
+      apps.add(pendingApp.getApplicationAttemptId());
+    }
     for (FiCaSchedulerApp app : orderingPolicy.getSchedulableEntities()) {
       apps.add(app.getApplicationAttemptId());
@@ -1678,6 +1717,21 @@ public synchronized void setPendingAppsOrderingPolicy(
     this.pendingOrderingPolicy = pendingOrderingPolicy;
   }

+  public synchronized OrderingPolicy<FiCaSchedulerApp>
+      getPendingAppsOrderingPolicyRecovery() {
+    return pendingOrderingPolicyRecovery;
+  }
+
+  public synchronized void setPendingAppsOrderingPolicyRecovery(
+      OrderingPolicy<FiCaSchedulerApp> pendingOrderingPolicyRecovery) {
+    if (null != this.pendingOrderingPolicyRecovery) {
+      pendingOrderingPolicyRecovery
+          .addAllSchedulableEntities(this.pendingOrderingPolicyRecovery
+              .getSchedulableEntities());
+    }
+    this.pendingOrderingPolicyRecovery = pendingOrderingPolicyRecovery;
+  }
+
   /*
    * Holds shared values used by all applications in
    * the queue to calculate headroom on demand
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerApp.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerApp.java
index c9c792e..bb123ac 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerApp.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerApp.java
@@ -95,17 +95,21 @@
    */
   private String appSkipNodeDiagnostics;

+  private boolean wasAppRunningDuringRecovery = false;
+
   public FiCaSchedulerApp(ApplicationAttemptId applicationAttemptId,
       String user, Queue queue, ActiveUsersManager activeUsersManager,
       RMContext rmContext) {
     this(applicationAttemptId, user, queue, activeUsersManager, rmContext,
-        Priority.newInstance(0));
+        Priority.newInstance(0), false);
   }

   public FiCaSchedulerApp(ApplicationAttemptId applicationAttemptId,
       String user, Queue queue, ActiveUsersManager activeUsersManager,
-      RMContext rmContext, Priority appPriority) {
+      RMContext rmContext, Priority appPriority,
+      boolean wasAppRunningDuringRecovery) {
     super(applicationAttemptId, user, queue, activeUsersManager, rmContext);
+    this.wasAppRunningDuringRecovery = wasAppRunningDuringRecovery;

     RMApp rmApp = rmContext.getRMApps().get(getApplicationId());
@@ -479,7 +483,11 @@ public RMContainer findNodeToUnreserve(Resource clusterResource,
   public LeafQueue getCSLeafQueue() {
     return (LeafQueue)queue;
   }
-
+
+  public boolean wasAppRunningDuringRecovery() {
+    return this.wasAppRunningDuringRecovery;
+  }
+
   public CSAssignment assignContainers(Resource clusterResource,
       FiCaSchedulerNode node, ResourceLimits currentResourceLimits,
       SchedulingMode schedulingMode, RMContainer reservedContainer) {
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/event/AppAttemptAddedSchedulerEvent.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/event/AppAttemptAddedSchedulerEvent.java
index 6e66d2a..c8ff368 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/event/AppAttemptAddedSchedulerEvent.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/event/AppAttemptAddedSchedulerEvent.java
@@ -25,21 +25,30 @@
   private final ApplicationAttemptId applicationAttemptId;
   private final boolean transferStateFromPreviousAttempt;
   private final boolean isAttemptRecovering;
+  private final boolean wasAttemptRunning;

   public AppAttemptAddedSchedulerEvent(
       ApplicationAttemptId applicationAttemptId,
       boolean transferStateFromPreviousAttempt) {
-    this(applicationAttemptId, transferStateFromPreviousAttempt, false);
+    this(applicationAttemptId, transferStateFromPreviousAttempt, false, false);
   }

   public AppAttemptAddedSchedulerEvent(
       ApplicationAttemptId applicationAttemptId,
-      boolean transferStateFromPreviousAttempt,
-      boolean isAttemptRecovering) {
+      boolean transferStateFromPreviousAttempt, boolean isAttemptRecovering) {
+    this(applicationAttemptId, transferStateFromPreviousAttempt,
+        isAttemptRecovering, false);
+  }
+
+  public AppAttemptAddedSchedulerEvent(
+      ApplicationAttemptId applicationAttemptId,
+      boolean transferStateFromPreviousAttempt, boolean isAttemptRecovering,
+      boolean wasAttemptRunning) {
     super(SchedulerEventType.APP_ATTEMPT_ADDED);
     this.applicationAttemptId = applicationAttemptId;
     this.transferStateFromPreviousAttempt = transferStateFromPreviousAttempt;
     this.isAttemptRecovering = isAttemptRecovering;
+    this.wasAttemptRunning = wasAttemptRunning;
   }

   public ApplicationAttemptId getApplicationAttemptId() {
@@ -53,4 +62,8 @@ public boolean getTransferStateFromPreviousAttempt() {
   public boolean getIsAttemptRecovering() {
     return isAttemptRecovering;
   }
+
+  public boolean getWasAttemptRunning() {
+    return wasAttemptRunning;
+  }
 }
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestApplicationPriority.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestApplicationPriority.java
index 169e9f6..c9c3c1f 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestApplicationPriority.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestApplicationPriority.java
@@ -23,6 +23,7 @@
 import static org.mockito.Mockito.when;

 import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
@@ -32,17 +33,23 @@
 import org.apache.hadoop.yarn.api.records.ApplicationId;
 import org.apache.hadoop.yarn.api.records.Container;
 import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.api.records.ContainerState;
 import org.apache.hadoop.yarn.api.records.Priority;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.event.Dispatcher;
+import org.apache.hadoop.yarn.event.DrainDispatcher;
+import org.apache.hadoop.yarn.server.api.protocolrecords.NMContainerStatus;
 import org.apache.hadoop.yarn.server.resourcemanager.MockAM;
 import org.apache.hadoop.yarn.server.resourcemanager.MockNM;
 import org.apache.hadoop.yarn.server.resourcemanager.MockNodes;
 import org.apache.hadoop.yarn.server.resourcemanager.MockRM;
+import org.apache.hadoop.yarn.server.resourcemanager.TestRMRestart;
 import org.apache.hadoop.yarn.server.resourcemanager.recovery.MemoryRMStateStore;
 import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore.RMState;
 import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.ApplicationStateData;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppImpl;
+import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppState;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptImpl;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptMetrics;
 import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
@@ -567,4 +574,144 @@ public void testApplicationPriorityAllocationWithChangeInPriority()
     Assert.assertEquals(6, schedulerAppAttemptApp1.getLiveContainers().size());
     rm.stop();
   }
+
+  // Test the scenario: submit applications with increasing priorities, with
+  // the highest-priority one left pending on the AM resource limit, then
+  // restart the RM and verify the previously running applications are
+  // activated ahead of the pending one despite its higher priority.
+  @Test
+  public void testOrderOfActivatingThePriorityApplicationOnRMRestart()
+      throws Exception {
+    conf.setBoolean(YarnConfiguration.RECOVERY_ENABLED, true);
+    conf.setBoolean(YarnConfiguration.RM_WORK_PRESERVING_RECOVERY_ENABLED,
+        true);
+    conf.set(YarnConfiguration.RM_STORE, MemoryRMStateStore.class.getName());
+    conf.setInt(YarnConfiguration.RM_AM_MAX_ATTEMPTS,
+        YarnConfiguration.DEFAULT_RM_AM_MAX_ATTEMPTS);
+    conf.setInt(YarnConfiguration.MAX_CLUSTER_LEVEL_APPLICATION_PRIORITY, 10);
+    final DrainDispatcher dispatcher = new DrainDispatcher();
+
+    MemoryRMStateStore memStore = new MemoryRMStateStore();
+    memStore.init(conf);
+
+    // PHASE 1: create state in an RM
+
+    // start RM
+    MockRM rm1 = new MockRM(conf, memStore) {
+      @Override
+      protected Dispatcher createDispatcher() {
+        return dispatcher;
+      }
+    };
+    rm1.start();
+
+    MockNM nm1 =
+        new MockNM("127.0.0.1:1234", 16384, rm1.getResourceTrackerService());
+    nm1.registerNode();
+
+    dispatcher.await();
+
+    ResourceScheduler scheduler = rm1.getRMContext().getScheduler();
+    LeafQueue defaultQueue =
+        (LeafQueue) ((CapacityScheduler) scheduler).getQueue("default");
+    int memory = defaultQueue.getAMResourceLimit().getMemory() / 2;
+
+    Priority appPriority1 = Priority.newInstance(5);
+    RMApp app1 = rm1.submitApp(memory, appPriority1);
+    MockAM am1 = MockRM.launchAM(app1, rm1, nm1);
+    am1.registerAppAttempt();
+
+    Priority appPriority2 = Priority.newInstance(6);
+    RMApp app2 = rm1.submitApp(memory, appPriority2);
+    MockAM am2 = MockRM.launchAM(app2, rm1, nm1);
+    am2.registerAppAttempt();
+
+    dispatcher.await();
+    Assert.assertEquals(2, defaultQueue.getNumActiveApplications());
+    Assert.assertEquals(0, defaultQueue.getNumPendingApplications());
+
+    Priority appPriority3 = Priority.newInstance(7);
+    RMApp app3 = rm1.submitApp(memory, appPriority3);
+
+    dispatcher.await();
+    Assert.assertEquals(2, defaultQueue.getNumActiveApplications());
+    Assert.assertEquals(1, defaultQueue.getNumPendingApplications());
+
+    Iterator<FiCaSchedulerApp> iterator =
+        defaultQueue.getOrderingPolicy().getSchedulableEntities().iterator();
+    FiCaSchedulerApp fcApp2 = iterator.next();
+    Assert.assertEquals(app2.getCurrentAppAttempt().getAppAttemptId(),
+        fcApp2.getApplicationAttemptId());
+
+    FiCaSchedulerApp fcApp1 = iterator.next();
+    Assert.assertEquals(app1.getCurrentAppAttempt().getAppAttemptId(),
+        fcApp1.getApplicationAttemptId());
+
+    iterator = defaultQueue.getPendingApplications().iterator();
+    FiCaSchedulerApp fcApp3 = iterator.next();
+    Assert.assertEquals(app3.getCurrentAppAttempt().getAppAttemptId(),
+        fcApp3.getApplicationAttemptId());
+
+    final DrainDispatcher dispatcher1 = new DrainDispatcher();
+    // create new RM to represent restart and recover state
+    MockRM rm2 = new MockRM(conf, memStore) {
+      @Override
+      protected Dispatcher createDispatcher() {
+        return dispatcher1;
+      }
+    };
+
+    // start new RM
+    rm2.start();
+    // change NM to point to new RM
+    nm1.setResourceTrackerService(rm2.getResourceTrackerService());
+
+    // Verify RM Apps after this restart
+    Assert.assertEquals(3, rm2.getRMContext().getRMApps().size());
+
+    dispatcher1.await();
+    scheduler = rm2.getRMContext().getScheduler();
+    defaultQueue =
+        (LeafQueue) ((CapacityScheduler) scheduler).getQueue("default");
+
+    int count = 5;
+    while (count-- > 0) {
+      if ((defaultQueue.getNumActiveApplications() + defaultQueue
+          .getNumPendingApplications()) == 3) {
+        break;
+      }
+      Thread.sleep(500);
+    }
+
+    Assert.assertEquals(1, defaultQueue.getNumActiveApplications());
+    Assert.assertEquals(2, defaultQueue.getNumPendingApplications());
+
+    nm1.registerNode();
+    dispatcher1.await();
+
+    count = 5;
+    while (count-- > 0) {
+      if (defaultQueue.getOrderingPolicy()
+          .getSchedulableEntities().size() == 2) {
+        break;
+      }
+      Thread.sleep(500);
+    }
+
+    iterator =
+        defaultQueue.getOrderingPolicy().getSchedulableEntities().iterator();
+    fcApp2 = iterator.next();
+    Assert.assertEquals(app2.getCurrentAppAttempt().getAppAttemptId(),
+        fcApp2.getApplicationAttemptId());
+
+    fcApp1 = iterator.next();
+    Assert.assertEquals(app1.getCurrentAppAttempt().getAppAttemptId(),
+        fcApp1.getApplicationAttemptId());
+
+
+    iterator = defaultQueue.getPendingApplications().iterator();
+    fcApp3 = iterator.next();
+    Assert.assertEquals(app3.getCurrentAppAttempt().getAppAttemptId(),
+        fcApp3.getApplicationAttemptId());
+
+    rm2.stop();
+    rm1.stop();
+  }
 }
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestLeafQueue.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestLeafQueue.java
index 479e25a..01e9a0d 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestLeafQueue.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestLeafQueue.java
@@ -2411,16 +2411,16 @@ public void testFifoAssignment() throws Exception {

     final ApplicationAttemptId appAttemptId_0 =
         TestUtils.getMockApplicationAttemptId(0, 0);
-    FiCaSchedulerApp app_0 =
-        spy(new FiCaSchedulerApp(appAttemptId_0, user_0, a,
-            mock(ActiveUsersManager.class), spyRMContext, Priority.newInstance(3)));
+    FiCaSchedulerApp app_0 = spy(new FiCaSchedulerApp(appAttemptId_0, user_0,
+        a, mock(ActiveUsersManager.class), spyRMContext,
+        Priority.newInstance(3), false));
     a.submitApplicationAttempt(app_0, user_0);

     final ApplicationAttemptId appAttemptId_1 =
         TestUtils.getMockApplicationAttemptId(1, 0);
-    FiCaSchedulerApp app_1 =
-        spy(new FiCaSchedulerApp(appAttemptId_1, user_0, a,
-            mock(ActiveUsersManager.class), spyRMContext, Priority.newInstance(5)));
+    FiCaSchedulerApp app_1 = spy(new FiCaSchedulerApp(appAttemptId_1, user_0,
+        a, mock(ActiveUsersManager.class), spyRMContext,
+        Priority.newInstance(5), false));
     a.submitApplicationAttempt(app_1, user_0);

     Priority priority = TestUtils.createMockPriority(1);
-- 
1.9.2.msysgit.0
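
Note on the approach: the patch threads a single flag (wasAttemptRunning) from attempt recovery through AppAttemptAddedSchedulerEvent into FiCaSchedulerApp, and LeafQueue keeps a second pending OrderingPolicy (pendingOrderingPolicyRecovery) that activateApplications() drains before the regular pending policy. The following is a minimal, self-contained sketch of that activation-order idea only; the class and field names below are invented for illustration and are not the actual YARN types.

import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

public class RecoveryFirstActivation {
  // Stand-ins for the two pending OrderingPolicy instances in LeafQueue.
  private final List<String> pendingRecovery = new ArrayList<String>();
  private final List<String> pending = new ArrayList<String>();
  private final List<String> active = new ArrayList<String>();

  void addAttempt(String app, boolean wasRunningDuringRecovery) {
    // Mirrors LeafQueue#addApplicationAttempt: attempts that were already
    // running before the restart go to the recovery list, all others to
    // the normal pending list.
    (wasRunningDuringRecovery ? pendingRecovery : pending).add(app);
  }

  void activateApplications(int amHeadroom) {
    // Mirrors the reordered LeafQueue#activateApplications: drain the
    // recovery list first, so previously running AMs reclaim their AM
    // share before any not-yet-started application is activated.
    for (Iterator<String> i = pendingRecovery.iterator();
        i.hasNext() && active.size() < amHeadroom;) {
      active.add(i.next());
      i.remove();
    }
    for (Iterator<String> i = pending.iterator();
        i.hasNext() && active.size() < amHeadroom;) {
      active.add(i.next());
      i.remove();
    }
  }

  public static void main(String[] args) {
    RecoveryFirstActivation q = new RecoveryFirstActivation();
    q.addAttempt("app2(pri=6, was running)", true);
    q.addAttempt("app1(pri=5, was running)", true);
    q.addAttempt("app3(pri=7, was pending)", false);
    q.activateApplications(2); // AM resource limit only admits two apps
    // app2 and app1 are activated; app3 stays pending even though it has
    // the highest priority, because it was never running before restart.
    System.out.println("active=" + q.active + " pending=" + q.pending);
  }
}

Under these assumptions the sketch reproduces the end state asserted in testOrderOfActivatingThePriorityApplicationOnRMRestart: the two recovered applications become active and the higher-priority app3 remains pending.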