diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttemptImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttemptImpl.java index 8aefe9f..d96e955 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttemptImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttemptImpl.java @@ -1129,10 +1129,6 @@ public RMAppAttemptState transition(RMAppAttemptImpl appAttempt, // state but application is not in final state. if (rmApp.getCurrentAppAttempt() == appAttempt && !RMAppImpl.isAppInFinalState(rmApp)) { - // Add the previous finished attempt to scheduler synchronously so - // that scheduler knows the previous attempt. 
- appAttempt.scheduler.handle(new AppAttemptAddedSchedulerEvent( - appAttempt.getAppAttemptId(), false, true)); (new BaseFinalTransition(appAttempt.recoveredFinalState)).transition( appAttempt, event); } @@ -1346,7 +1342,8 @@ public void transition(RMAppAttemptImpl appAttempt, { appAttempt.invalidateAMHostAndPort(); - if (appAttempt.submissionContext + if (appAttempt.recoveredFinalState == null + && appAttempt.submissionContext .getKeepContainersAcrossApplicationAttempts() && !appAttempt.submissionContext.getUnmanagedAM()) { // See if we should retain containers for non-unmanaged applications @@ -1375,8 +1372,10 @@ public void transition(RMAppAttemptImpl appAttempt, } appAttempt.eventHandler.handle(appEvent); - appAttempt.eventHandler.handle(new AppAttemptRemovedSchedulerEvent( - appAttemptId, finalAttemptState, keepContainersAcrossAppAttempts)); + if (appAttempt.recoveredFinalState == null) { + appAttempt.eventHandler.handle(new AppAttemptRemovedSchedulerEvent( + appAttemptId, finalAttemptState, keepContainersAcrossAppAttempts)); + } appAttempt.removeCredentials(appAttempt); appAttempt.rmContext.getRMApplicationHistoryWriter() diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerApplicationAttempt.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerApplicationAttempt.java index 4d81350..e8d32e3 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerApplicationAttempt.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerApplicationAttempt.java @@ -109,6 +109,7 @@ private 
LogAggregationContext logAggregationContext; private volatile Priority appPriority = null; + private boolean isAttemptRecovering; protected ResourceUsage attemptResourceUsage = new ResourceUsage(); private AtomicLong firstAllocationRequestSentTime = new AtomicLong(0); @@ -960,6 +961,14 @@ protected void getActivedAppDiagnosticMessage( // queue's resource usage for specific partition } + public boolean isAttemptRecovering() { + return isAttemptRecovering; + } + + protected void setAttemptRecovering(boolean isAttemptRecovering) { + this.isAttemptRecovering = isAttemptRecovering; + } + public static enum AMState { UNMANAGED("User launched the Application Master, since it's unmanaged. "), INACTIVATED("Application is added to the scheduler and is not yet activated. "), diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java index 159c7a5..0a4ff54 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java @@ -783,7 +783,7 @@ private synchronized void addApplicationAttempt( FiCaSchedulerApp attempt = new FiCaSchedulerApp(applicationAttemptId, application.getUser(), queue, queue.getActiveUsersManager(), rmContext, - application.getPriority()); + application.getPriority(), isAttemptRecovering); if (transferStateFromPreviousAttempt) { attempt.transferStateFromPreviousAttempt( application.getCurrentAppAttempt()); diff 
--git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java index 5c3f4b9..e9a223d 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java @@ -95,6 +95,8 @@ private OrderingPolicy pendingOrderingPolicy = null; + private OrderingPolicy pendingOrderingPolicyRecovery = null; + private volatile float minimumAllocationFactor; private Map users = new HashMap(); @@ -156,6 +158,8 @@ protected synchronized void setupQueueConfigs(Resource clusterResource) setOrderingPolicy(conf.getOrderingPolicy(getQueuePath())); setPendingAppsOrderingPolicy(conf . getOrderingPolicy(getQueuePath())); + setPendingAppsOrderingPolicyRecovery(conf + . 
getOrderingPolicy(getQueuePath())); userLimit = conf.getUserLimit(getQueuePath()); userLimitFactor = conf.getUserLimitFactor(getQueuePath()); @@ -320,7 +324,8 @@ public synchronized int getNumApplications() { } public synchronized int getNumPendingApplications() { - return pendingOrderingPolicy.getNumSchedulableEntities(); + return pendingOrderingPolicy.getNumSchedulableEntities() + + pendingOrderingPolicyRecovery.getNumSchedulableEntities(); } public synchronized int getNumActiveApplications() { @@ -599,9 +604,19 @@ private synchronized void activateApplications() { Map userAmPartitionLimit = new HashMap(); - for (Iterator i = getPendingAppsOrderingPolicy() - .getAssignmentIterator(); i.hasNext();) { - FiCaSchedulerApp application = i.next(); + activateApplications(getPendingAppsOrderingPolicyRecovery() + .getAssignmentIterator(), amPartitionLimit, userAmPartitionLimit); + + activateApplications( + getPendingAppsOrderingPolicy().getAssignmentIterator(), + amPartitionLimit, userAmPartitionLimit); + } + + private synchronized void activateApplications( + Iterator fsApp, Map amPartitionLimit, + Map userAmPartitionLimit) { + while (fsApp.hasNext()) { + FiCaSchedulerApp application = fsApp.next(); ApplicationId applicationId = application.getApplicationId(); // Get the am-node-partition associated with each application @@ -692,7 +707,7 @@ private synchronized void activateApplications() { metrics.incAMUsed(application.getUser(), application.getAMResource(partitionName)); metrics.setAMResouceLimitForUser(application.getUser(), userAMLimit); - i.remove(); + fsApp.remove(); LOG.info("Application " + applicationId + " from user: " + application.getUser() + " activated in queue: " + getQueueName()); } @@ -702,7 +717,11 @@ private synchronized void addApplicationAttempt(FiCaSchedulerApp application, User user) { // Accept user.submitApplication(); - getPendingAppsOrderingPolicy().addSchedulableEntity(application); + if (application.isAttemptRecovering()) { + 
getPendingAppsOrderingPolicyRecovery().addSchedulableEntity(application); + } else { + getPendingAppsOrderingPolicy().addSchedulableEntity(application); + } applicationAttemptMap.put(application.getApplicationAttemptId(), application); // Activate applications @@ -742,7 +761,11 @@ public synchronized void removeApplicationAttempt( boolean wasActive = orderingPolicy.removeSchedulableEntity(application); if (!wasActive) { - pendingOrderingPolicy.removeSchedulableEntity(application); + if (application.isAttemptRecovering()) { + pendingOrderingPolicyRecovery.removeSchedulableEntity(application); + } else { + pendingOrderingPolicy.removeSchedulableEntity(application); + } } else { queueUsage.decAMUsed(partitionName, application.getAMResource(partitionName)); @@ -1491,7 +1514,14 @@ public void recoverContainer(Resource clusterResource, * Obtain (read-only) collection of pending applications. */ public Collection getPendingApplications() { - return pendingOrderingPolicy.getSchedulableEntities(); + // Return a fresh snapshot: calling addAll() on the collection handed out by + // an OrderingPolicy would modify (or fail on) a collection this method does + // not own. Recovering attempts are listed first, matching activation order. + Collection<FiCaSchedulerApp> pendingApps = + new java.util.ArrayList<FiCaSchedulerApp>( + pendingOrderingPolicyRecovery.getSchedulableEntities()); + pendingApps.addAll(pendingOrderingPolicy.getSchedulableEntities()); + return pendingApps; } /** @@ -1539,6 +1569,10 @@ public synchronized void collectSchedulerApplications( .getSchedulableEntities()) { apps.add(pendingApp.getApplicationAttemptId()); } + for (FiCaSchedulerApp pendingApp : pendingOrderingPolicyRecovery + .getSchedulableEntities()) { + apps.add(pendingApp.getApplicationAttemptId()); + } for (FiCaSchedulerApp app : orderingPolicy.getSchedulableEntities()) { apps.add(app.getApplicationAttemptId()); @@ -1670,6 +1704,20 @@ public synchronized void setPendingAppsOrderingPolicy( this.pendingOrderingPolicy = pendingOrderingPolicy; } + public synchronized OrderingPolicy getPendingAppsOrderingPolicyRecovery() { + return pendingOrderingPolicyRecovery; + } + + public synchronized void setPendingAppsOrderingPolicyRecovery( + OrderingPolicy pendingOrderingPolicyRecovery) { + if (null != 
this.pendingOrderingPolicyRecovery) { + pendingOrderingPolicyRecovery + .addAllSchedulableEntities(this.pendingOrderingPolicyRecovery + .getSchedulableEntities()); + } + this.pendingOrderingPolicyRecovery = pendingOrderingPolicyRecovery; + } + /* * Holds shared values used by all applications in * the queue to calculate headroom on demand diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerApp.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerApp.java index c9c792e..4b88415 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerApp.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerApp.java @@ -99,12 +99,12 @@ public FiCaSchedulerApp(ApplicationAttemptId applicationAttemptId, String user, Queue queue, ActiveUsersManager activeUsersManager, RMContext rmContext) { this(applicationAttemptId, user, queue, activeUsersManager, rmContext, - Priority.newInstance(0)); + Priority.newInstance(0), false); } public FiCaSchedulerApp(ApplicationAttemptId applicationAttemptId, String user, Queue queue, ActiveUsersManager activeUsersManager, - RMContext rmContext, Priority appPriority) { + RMContext rmContext, Priority appPriority, boolean isAttemptRecovering) { super(applicationAttemptId, user, queue, activeUsersManager, rmContext); RMApp rmApp = rmContext.getRMApps().get(getApplicationId()); @@ -129,6 +129,7 @@ public FiCaSchedulerApp(ApplicationAttemptId applicationAttemptId, setAppAMNodePartitionName(partition); 
setAMResource(partition, amResource); setPriority(appPriority); + setAttemptRecovering(isAttemptRecovering); scheduler = rmContext.getScheduler(); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestApplicationPriority.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestApplicationPriority.java index 169e9f6..b6c54f4 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestApplicationPriority.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestApplicationPriority.java @@ -34,6 +34,8 @@ import org.apache.hadoop.yarn.api.records.ContainerId; import org.apache.hadoop.yarn.api.records.Priority; import org.apache.hadoop.yarn.conf.YarnConfiguration; +import org.apache.hadoop.yarn.event.Dispatcher; +import org.apache.hadoop.yarn.event.DrainDispatcher; import org.apache.hadoop.yarn.server.resourcemanager.MockAM; import org.apache.hadoop.yarn.server.resourcemanager.MockNM; import org.apache.hadoop.yarn.server.resourcemanager.MockNodes; @@ -567,4 +569,140 @@ public void testApplicationPriorityAllocationWithChangeInPriority() Assert.assertEquals(6, schedulerAppAttemptApp1.getLiveContainers().size()); rm.stop(); } + + @Test + public void testOrderOfActivatingThePriorityApplicationOnRMRestart() + throws Exception { + conf.setBoolean(YarnConfiguration.RECOVERY_ENABLED, true); + conf.setBoolean(YarnConfiguration.RM_WORK_PRESERVING_RECOVERY_ENABLED, true); + conf.set(YarnConfiguration.RM_STORE, MemoryRMStateStore.class.getName()); + conf.setInt(YarnConfiguration.RM_AM_MAX_ATTEMPTS, + 
YarnConfiguration.DEFAULT_RM_AM_MAX_ATTEMPTS); + conf.setInt(YarnConfiguration.MAX_CLUSTER_LEVEL_APPLICATION_PRIORITY, 10); + final DrainDispatcher dispatcher = new DrainDispatcher(); + + MemoryRMStateStore memStore = new MemoryRMStateStore(); + memStore.init(conf); + + // PHASE 1: create state in an RM + + // start RM + MockRM rm1 = new MockRM(conf, memStore) { + @Override + protected Dispatcher createDispatcher() { + return dispatcher; + } + }; + rm1.start(); + + MockNM nm1 = + new MockNM("127.0.0.1:1234", 16384, rm1.getResourceTrackerService()); + nm1.registerNode(); + + dispatcher.await(); + + ResourceScheduler scheduler = rm1.getRMContext().getScheduler(); + LeafQueue defaultQueue = + (LeafQueue) ((CapacityScheduler) scheduler).getQueue("default"); + int memory = defaultQueue.getAMResourceLimit().getMemory() / 2; + + Priority appPriority1 = Priority.newInstance(5); + RMApp app1 = rm1.submitApp(memory, appPriority1); + MockAM am1 = MockRM.launchAM(app1, rm1, nm1); + am1.registerAppAttempt(); + + Priority appPriority2 = Priority.newInstance(6); + RMApp app2 = rm1.submitApp(memory, appPriority2); + MockAM am2 = MockRM.launchAM(app2, rm1, nm1); + am2.registerAppAttempt(); + + dispatcher.await(); + Assert.assertEquals(2, defaultQueue.getNumActiveApplications()); + Assert.assertEquals(0, defaultQueue.getNumPendingApplications()); + + Priority appPriority3 = Priority.newInstance(7); + RMApp app3 = rm1.submitApp(memory, appPriority3); + + dispatcher.await(); + Assert.assertEquals(2, defaultQueue.getNumActiveApplications()); + Assert.assertEquals(1, defaultQueue.getNumPendingApplications()); + + Iterator iterator = + defaultQueue.getOrderingPolicy().getSchedulableEntities().iterator(); + FiCaSchedulerApp fcApp2 = iterator.next(); + Assert.assertEquals(app2.getCurrentAppAttempt().getAppAttemptId(), + fcApp2.getApplicationAttemptId()); + + FiCaSchedulerApp fcApp1 = iterator.next(); + Assert.assertEquals(app1.getCurrentAppAttempt().getAppAttemptId(), + 
fcApp1.getApplicationAttemptId()); + + iterator = defaultQueue.getPendingApplications().iterator(); + FiCaSchedulerApp fcApp3 = iterator.next(); + Assert.assertEquals(app3.getCurrentAppAttempt().getAppAttemptId(), + fcApp3.getApplicationAttemptId()); + + final DrainDispatcher dispatcher1 = new DrainDispatcher(); + // create new RM to represent restart and recover state + MockRM rm2 = new MockRM(conf, memStore) { + @Override + protected Dispatcher createDispatcher() { + return dispatcher1; + } + }; + + // start new RM + rm2.start(); + // change NM to point to new RM + nm1.setResourceTrackerService(rm2.getResourceTrackerService()); + + // Verify RM Apps after this restart + Assert.assertEquals(3, rm2.getRMContext().getRMApps().size()); + + dispatcher1.await(); + scheduler = rm2.getRMContext().getScheduler(); + defaultQueue = + (LeafQueue) ((CapacityScheduler) scheduler).getQueue("default"); + + int count = 5; + while (count-- > 0) { + if ((defaultQueue.getNumActiveApplications() + defaultQueue + .getNumPendingApplications()) == 3) { + break; + } + Thread.sleep(500); + } + + Assert.assertEquals(1, defaultQueue.getNumActiveApplications()); + Assert.assertEquals(2, defaultQueue.getNumPendingApplications()); + + nm1.registerNode(); + dispatcher1.await(); + + count = 5; + while (count-- > 0) { + if (defaultQueue.getOrderingPolicy().getSchedulableEntities().size() == 2) { + break; + } + Thread.sleep(500); + } + + iterator = + defaultQueue.getOrderingPolicy().getSchedulableEntities().iterator(); + fcApp2 = iterator.next(); + Assert.assertEquals(app2.getCurrentAppAttempt().getAppAttemptId(), + fcApp2.getApplicationAttemptId()); + + fcApp1 = iterator.next(); + Assert.assertEquals(app1.getCurrentAppAttempt().getAppAttemptId(), + fcApp1.getApplicationAttemptId()); + + iterator = defaultQueue.getPendingApplications().iterator(); + fcApp3 = iterator.next(); + Assert.assertEquals(app3.getCurrentAppAttempt().getAppAttemptId(), + fcApp3.getApplicationAttemptId()); + + rm2.stop(); + 
rm1.stop(); + } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestLeafQueue.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestLeafQueue.java index 479e25a..d4b8dcd 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestLeafQueue.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestLeafQueue.java @@ -2413,14 +2413,16 @@ public void testFifoAssignment() throws Exception { TestUtils.getMockApplicationAttemptId(0, 0); FiCaSchedulerApp app_0 = spy(new FiCaSchedulerApp(appAttemptId_0, user_0, a, - mock(ActiveUsersManager.class), spyRMContext, Priority.newInstance(3))); + mock(ActiveUsersManager.class), spyRMContext, + Priority.newInstance(3), false)); a.submitApplicationAttempt(app_0, user_0); final ApplicationAttemptId appAttemptId_1 = TestUtils.getMockApplicationAttemptId(1, 0); FiCaSchedulerApp app_1 = spy(new FiCaSchedulerApp(appAttemptId_1, user_0, a, - mock(ActiveUsersManager.class), spyRMContext, Priority.newInstance(5))); + mock(ActiveUsersManager.class), spyRMContext, + Priority.newInstance(5), false)); a.submitApplicationAttempt(app_1, user_0); Priority priority = TestUtils.createMockPriority(1); -- 1.9.2.msysgit.0