diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java index 59e108a..a38053b 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java @@ -325,6 +325,10 @@ public static final boolean DEFAULT_RM_WORK_PRESERVING_RECOVERY_ENABLED = false; + public static final String RM_SCHEDULER_RECOVERY_WAIT_MS = RM_PREFIX + + "scheduler.recovery-wait-ms"; + public static final long DEFAULT_RM_SCHEDULER_RECOVERY_WAIT_MS = 10000; + /** Zookeeper interaction configs */ public static final String RM_ZK_PREFIX = RM_PREFIX + "zk-"; diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml index 0c1628e..34f1bb8 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml @@ -278,6 +278,16 @@ </property> <property> + <description>Set the amount of time, in milliseconds, that the scheduler waits before allocating new + containers on work-preserving recovery. This wait period gives the RM a chance + to finish resyncing with the NMs in the cluster on recovery before assigning + new containers to applications. + </description> + <name>yarn.resourcemanager.scheduler.recovery-wait-ms</name> + <value>10000</value> + </property> + + <property> <description>The class to use as the persistent store. 
If org.apache.hadoop.yarn.server.resourcemanager.recovery.ZKRMStateStore diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMContext.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMContext.java index 01d5064..2086b50 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMContext.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMContext.java @@ -101,6 +101,8 @@ void setRMApplicationHistoryWriter( ConfigurationProvider getConfigurationProvider(); boolean isWorkPreservingRecoveryEnabled(); - + int getEpoch(); -} \ No newline at end of file + + /** Whether the scheduler may allocate new containers yet; the scheduler hunks of this patch skip allocation while this is false and work-preserving recovery is enabled. */ boolean isSchedulerReadyForAllocatingContainers(); +} diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMContextImpl.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMContextImpl.java index f72ef30..e729ac5 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMContextImpl.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMContextImpl.java @@ -83,6 +83,8 @@ private RMApplicationHistoryWriter rmApplicationHistoryWriter; private ConfigurationProvider configurationProvider; private int epoch; + /** System.currentTimeMillis() value captured when the scheduler recovery wait was started; 0 until then. */ private long schedulerRecoveryStartTime = 0; + /** Length of the scheduler recovery wait in milliseconds; 0 until set, so the readiness check passes immediately by default. */ private long schedulerRecoveryWaitTime = 0; /** * Default 
constructor. To be used in conjunction with setter methods for @@ -366,7 +368,20 @@ public int getEpoch() { return this.epoch; } - void setEpoch(int epoch) { + void setEpoch(int epoch) { this.epoch = epoch; } -} \ No newline at end of file + + /** Starts the scheduler recovery wait at the current wall-clock time and records its length in milliseconds. */ + public void setSchedulerRecoveryStartAndWaitTime(long waitTime) { + this.schedulerRecoveryStartTime = System.currentTimeMillis(); + this.schedulerRecoveryWaitTime = waitTime; + } + + /** Returns true once strictly more than the recorded wait time has elapsed since the recorded start time. NOTE(review): based on wall-clock time, so a clock adjustment can lengthen or shorten the wait. */ + @Override + public boolean isSchedulerReadyForAllocatingContainers() { + return (System.currentTimeMillis() - schedulerRecoveryStartTime) + > schedulerRecoveryWaitTime; + } +} diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java index c921ae9..cca8b90 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java @@ -1028,6 +1028,8 @@ public void recover(RMState state) throws Exception { // recover applications rmAppManager.recover(state); + + setSchedulerRecoveryStartAndWaitTime(state, conf); } public static void main(String argv[]) { @@ -1067,6 +1069,17 @@ private void resetDispatcher() { rmContext.setDispatcher(rmDispatcher); } + /** Starts the scheduler recovery wait, read from RM_SCHEDULER_RECOVERY_WAIT_MS, but only when the recovered application state is non-empty. */ + private void setSchedulerRecoveryStartAndWaitTime(RMState state, + Configuration conf) { + if (!state.getApplicationState().isEmpty()) { + long waitTime = + conf.getLong(YarnConfiguration.RM_SCHEDULER_RECOVERY_WAIT_MS, + YarnConfiguration.DEFAULT_RM_SCHEDULER_RECOVERY_WAIT_MS); + rmContext.setSchedulerRecoveryStartAndWaitTime(waitTime); + } + } + /** * Retrieve 
RM bind address from configuration * diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java index 92727e3..703b409 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java @@ -808,6 +808,13 @@ private synchronized void nodeUpdate(RMNode nm) { } private synchronized void allocateContainersToNode(FiCaSchedulerNode node) { + // Hold off on new allocations until the post-recovery wait has elapsed. + // Debug level on purpose: this line repeats on every skipped pass for the whole wait. + if (rmContext.isWorkPreservingRecoveryEnabled() + && !rmContext.isSchedulerReadyForAllocatingContainers()) { + LOG.debug("Skip allocating containers. Scheduler is waiting for recovery."); + return; + } // Assign new containers... // 1. 
Check for reserved applications diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java index 7a4e79c..1b78d07 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java @@ -1011,6 +1011,14 @@ public int compare(NodeId n1, NodeId n2) { } private synchronized void attemptScheduling(FSSchedulerNode node) { + // Hold off on new allocations until the post-recovery wait has elapsed. + // Debug level on purpose: this line repeats on every skipped pass for the whole wait. + if (rmContext.isWorkPreservingRecoveryEnabled() + && !rmContext.isSchedulerReadyForAllocatingContainers()) { + LOG.debug("Skip allocating containers. Scheduler is waiting for recovery."); + return; + } + // Assign new containers... // 1. Check for reserved applications // 2. 
Schedule if there are no reservations diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/FifoScheduler.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/FifoScheduler.java index b017db7..aa95523 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/FifoScheduler.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/FifoScheduler.java @@ -715,6 +715,16 @@ private synchronized void nodeUpdate(RMNode rmNode) { completedContainer, RMContainerEventType.FINISHED); } + + // Hold off on new allocations until the post-recovery wait has elapsed. + // Debug level on purpose: this line repeats on every skipped pass for the whole wait. + // NOTE(review): this return leaves nodeUpdate entirely, so anything after the allocation block is skipped during the wait too; confirm that is intended. + if (rmContext.isWorkPreservingRecoveryEnabled() + && !rmContext.isSchedulerReadyForAllocatingContainers()) { + LOG.debug("Skip allocating containers. 
Scheduler is waiting for recovery."); + return; + } + if (Resources.greaterThanOrEqual(resourceCalculator, clusterResource, node.getAvailableResource(),minimumAllocation)) { LOG.debug("Node heartbeat " + rmNode.getNodeID() + diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockAM.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockAM.java index cfd05f9..b1a7c12 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockAM.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockAM.java @@ -49,7 +49,7 @@ private volatile int responseId = 0; private final ApplicationAttemptId attemptId; - private final RMContext context; + private RMContext context; private ApplicationMasterProtocol amRMProtocol; private final List requests = new ArrayList(); @@ -66,6 +66,12 @@ void setAMRMProtocol(ApplicationMasterProtocol amRMProtocol) { this.amRMProtocol = amRMProtocol; } + /** Points this mock AM at a new AM-RM protocol endpoint and at the RMContext that waitForState should consult from now on. */ + void setAMRMProtocol(ApplicationMasterProtocol amRMProtocol, RMContext context) { + this.context = context; + this.amRMProtocol = amRMProtocol; + } + public void waitForState(RMAppAttemptState finalState) throws Exception { RMApp app = context.getRMApps().get(attemptId.getApplicationId()); RMAppAttempt attempt = app.getRMAppAttempt(attemptId); diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestWorkPreservingRMRestart.java 
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestWorkPreservingRMRestart.java index fb5c3a3..dcf9187 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestWorkPreservingRMRestart.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestWorkPreservingRMRestart.java @@ -34,8 +34,11 @@ import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; import org.apache.hadoop.yarn.api.records.ApplicationId; +import org.apache.hadoop.yarn.api.records.Container; +import org.apache.hadoop.yarn.api.records.ContainerId; import org.apache.hadoop.yarn.api.records.ContainerState; import org.apache.hadoop.yarn.api.records.Resource; +import org.apache.hadoop.yarn.api.records.ResourceRequest; import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.server.api.protocolrecords.NMContainerStatus; import org.apache.hadoop.yarn.server.resourcemanager.recovery.MemoryRMStateStore; @@ -559,8 +562,8 @@ public void testAppReregisterOnRMWorkPreservingRestart() throws Exception { rm2.waitForState(app0.getApplicationId(), RMAppState.ACCEPTED); rm2.waitForState(am0.getApplicationAttemptId(), RMAppAttemptState.LAUNCHED); - am0.setAMRMProtocol(rm2.getApplicationMasterService()); - am0.registerAppAttempt(false); + am0.setAMRMProtocol(rm2.getApplicationMasterService(), rm2.getRMContext()); + am0.registerAppAttempt(true); rm2.waitForState(app0.getApplicationId(), RMAppState.RUNNING); rm2.waitForState(am0.getApplicationAttemptId(), RMAppAttemptState.RUNNING); @@ -635,4 +638,51 @@ private void waitForNumContainersToRecover(int num, MockRM rm, Thread.sleep(200); } } + + /** + * Restarts the RM with a 4000 ms scheduler recovery wait and checks that the AM is given no container while less than 2000 ms have passed since polling began, then keeps polling until exactly one container has been allocated. + */ + @Test (timeout = 20000) + public void 
testNewContainersNotAllocatedDuringSchedulerRecovery() + throws Exception { + conf.setLong(YarnConfiguration.RM_SCHEDULER_RECOVERY_WAIT_MS, 4000); + MemoryRMStateStore memStore = new MemoryRMStateStore(); + memStore.init(conf); + rm1 = new MockRM(conf, memStore); + rm1.start(); + MockNM nm1 = + new MockNM("127.0.0.1:1234", 8192, rm1.getResourceTrackerService()); + nm1.registerNode(); + RMApp app1 = rm1.submitApp(200); + MockAM am1 = MockRM.launchAndRegisterAM(app1, rm1, nm1); + + // Restart RM + rm2 = new MockRM(conf, memStore); + rm2.start(); + nm1.setResourceTrackerService(rm2.getResourceTrackerService()); + nm1.registerNode(); + + am1.setAMRMProtocol(rm2.getApplicationMasterService(), rm2.getRMContext()); + am1.registerAppAttempt(true); + rm2.waitForState(app1.getApplicationId(), RMAppState.RUNNING); + + // AM request for containers + am1.allocate("127.0.0.1", 1000, 1, new ArrayList<ContainerId>()); + + List<Container> containers = new ArrayList<Container>(); + long startTime = System.currentTimeMillis(); + do { + nm1.nodeHeartbeat(true); + containers.addAll(am1.allocate(new ArrayList<ResourceRequest>(), + new ArrayList<ContainerId>()).getAllocatedContainers()); + Thread.sleep(200); + // New containers should not be allocated when scheduler is recovering. + if (System.currentTimeMillis() - startTime < 2000) { + Assert.assertTrue(containers.isEmpty()); + } + System.out.println("Elapsed time: " + + (System.currentTimeMillis() - startTime) + ", Container size : " + + containers.size()); + } while (containers.size() != 1); + } }