diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
index 44a6fc3..acc4a05 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
@@ -353,6 +353,11 @@
public static final boolean DEFAULT_RM_WORK_PRESERVING_RECOVERY_ENABLED =
false;
+ public static final String RM_WORK_PRESERVING_RECOVERY_SCHEDULING_WAIT_MS =
+ RM_PREFIX + "work-preserving-recovery.scheduling-wait-ms";
+ public static final long DEFAULT_RM_WORK_PRESERVING_RECOVERY_SCHEDULING_WAIT_MS =
+ 10000;
+
/** Zookeeper interaction configs */
public static final String RM_ZK_PREFIX = RM_PREFIX + "zk-";
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml
index 3a7e94a..a2c3fd0 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml
@@ -298,6 +298,16 @@
+ <property>
+   <description>Set the amount of time RM waits before allocating new
+   containers on work-preserving-recovery. Such wait period gives RM a chance
+   to settle down resyncing with NMs in the cluster on recovery, before assigning
+   new containers to applications.
+   </description>
+   <name>yarn.resourcemanager.work-preserving-recovery.scheduling-wait-ms</name>
+   <value>10000</value>
+ </property>
+
The class to use as the persistent store.
If org.apache.hadoop.yarn.server.resourcemanager.recovery.ZKRMStateStore
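
The description above is the whole contract: after a work-preserving recovery the RM simply declines to hand out new containers for the configured number of milliseconds, giving NMs time to resync. A minimal sketch of overriding the default from code (for example in a test), using the constants added to YarnConfiguration above; the class name and the 2-second value are illustrative only:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.yarn.conf.YarnConfiguration;

public class RecoveryWaitConfigSketch {
  public static void main(String[] args) {
    Configuration conf = new YarnConfiguration();
    // Example only: shorten the post-recovery scheduling pause to 2 seconds.
    conf.setLong(
        YarnConfiguration.RM_WORK_PRESERVING_RECOVERY_SCHEDULING_WAIT_MS, 2000L);
    // The RM resolves the key the same way, falling back to the 10s default.
    long waitMs = conf.getLong(
        YarnConfiguration.RM_WORK_PRESERVING_RECOVERY_SCHEDULING_WAIT_MS,
        YarnConfiguration.DEFAULT_RM_WORK_PRESERVING_RECOVERY_SCHEDULING_WAIT_MS);
    System.out.println("scheduling-wait-ms = " + waitMs);
  }
}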
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMContext.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMContext.java
index 46ef432..60f88f6 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMContext.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMContext.java
@@ -108,4 +108,6 @@ void setRMApplicationHistoryWriter(
boolean isWorkPreservingRecoveryEnabled();
long getEpoch();
-}
\ No newline at end of file
+
+ boolean isSchedulerReadyForAllocatingContainers();
+}
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMContextImpl.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMContextImpl.java
index 8a9b51e..36eec04 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMContextImpl.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMContextImpl.java
@@ -21,6 +21,9 @@
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.classification.InterfaceAudience.Private;
import org.apache.hadoop.ha.HAServiceProtocol;
import org.apache.hadoop.ha.HAServiceProtocol.HAServiceState;
import org.apache.hadoop.yarn.LocalConfigurationProvider;
@@ -44,6 +47,8 @@
import org.apache.hadoop.yarn.server.resourcemanager.security.NMTokenSecretManagerInRM;
import org.apache.hadoop.yarn.server.resourcemanager.security.RMContainerTokenSecretManager;
import org.apache.hadoop.yarn.server.resourcemanager.security.RMDelegationTokenSecretManager;
+import org.apache.hadoop.yarn.util.Clock;
+import org.apache.hadoop.yarn.util.SystemClock;
import com.google.common.annotations.VisibleForTesting;
@@ -85,6 +90,13 @@
private SystemMetricsPublisher systemMetricsPublisher;
private ConfigurationProvider configurationProvider;
private long epoch;
+ private Clock systemClock = new SystemClock();
+ private long schedulerRecoveryStartTime = 0;
+ private long schedulerRecoveryWaitTime = 0;
+ private boolean printLog = true;
+ private boolean isSchedulerReady = false;
+
+ private static final Log LOG = LogFactory.getLog(RMContextImpl.class);
/**
* Default constructor. To be used in conjunction with setter methods for
@@ -379,7 +391,34 @@ public long getEpoch() {
return this.epoch;
}
- void setEpoch(long epoch) {
+ void setEpoch(long epoch) {
this.epoch = epoch;
}
-}
\ No newline at end of file
+
+ public void setSchedulerRecoveryStartAndWaitTime(long waitTime) {
+ this.schedulerRecoveryStartTime = systemClock.getTime();
+ this.schedulerRecoveryWaitTime = waitTime;
+ }
+
+ public boolean isSchedulerReadyForAllocatingContainers() {
+ if (isSchedulerReady) {
+ return isSchedulerReady;
+ }
+ isSchedulerReady = (systemClock.getTime() - schedulerRecoveryStartTime)
+ > schedulerRecoveryWaitTime;
+ if (!isSchedulerReady && printLog) {
+ LOG.info("Skip allocating containers. Scheduler is waiting for recovery.");
+ printLog = false;
+ }
+ if (isSchedulerReady) {
+ LOG.info("Scheduler recovery is done. Start allocating new containers.");
+ }
+ return isSchedulerReady;
+ }
+
+ @Private
+ @VisibleForTesting
+ public void setSystemClock(Clock clock) {
+ this.systemClock = clock;
+ }
+}
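
The readiness check above is plain elapsed wall-clock time against the recorded recovery start, latched once it becomes true. A standalone sketch (not part of the patch; the class name is made up) of that behaviour, driven through the same ControlledClock the new test below uses:

import org.apache.hadoop.yarn.server.resourcemanager.RMContextImpl;
import org.apache.hadoop.yarn.util.ControlledClock;
import org.apache.hadoop.yarn.util.SystemClock;

public class SchedulerReadinessSketch {
  public static void main(String[] args) {
    RMContextImpl context = new RMContextImpl();
    ControlledClock clock = new ControlledClock(new SystemClock());
    context.setSystemClock(clock);

    long start = clock.getTime();
    context.setSchedulerRecoveryStartAndWaitTime(10000);   // wait 10s from "now"

    // Still inside the wait window: not ready, so schedulers skip new allocations.
    System.out.println(context.isSchedulerReadyForAllocatingContainers()); // false

    clock.setTime(start + 11000);   // move the clock past the window
    // Past the window: ready, and the flag latches so later calls stay true.
    System.out.println(context.isSchedulerReadyForAllocatingContainers()); // true
  }
}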
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java
index 0def615..79af7a6 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java
@@ -1131,6 +1131,8 @@ public void recover(RMState state) throws Exception {
// recover applications
rmAppManager.recover(state);
+
+ setSchedulerRecoveryStartAndWaitTime(state, conf);
}
public static void main(String argv[]) {
@@ -1178,6 +1180,16 @@ private void resetDispatcher() {
rmContext.setDispatcher(rmDispatcher);
}
+ private void setSchedulerRecoveryStartAndWaitTime(RMState state,
+ Configuration conf) {
+ if (!state.getApplicationState().isEmpty()) {
+ long waitTime =
+ conf.getLong(YarnConfiguration.RM_WORK_PRESERVING_RECOVERY_SCHEDULING_WAIT_MS,
+ YarnConfiguration.DEFAULT_RM_WORK_PRESERVING_RECOVERY_SCHEDULING_WAIT_MS);
+ rmContext.setSchedulerRecoveryStartAndWaitTime(waitTime);
+ }
+ }
+
/**
* Retrieve RM bind address from configuration
*
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java
index 6b810d7..bdfc819 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java
@@ -902,6 +902,10 @@ private synchronized void updateNodeAndQueueResource(RMNode nm,
}
private synchronized void allocateContainersToNode(FiCaSchedulerNode node) {
+ if (rmContext.isWorkPreservingRecoveryEnabled()
+ && !rmContext.isSchedulerReadyForAllocatingContainers()) {
+ return;
+ }
// Assign new containers...
// 1. Check for reserved applications
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java
index 9c40d48..296d884 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java
@@ -1015,6 +1015,11 @@ public int compare(NodeId n1, NodeId n2) {
}
private synchronized void attemptScheduling(FSSchedulerNode node) {
+ if (rmContext.isWorkPreservingRecoveryEnabled()
+ && !rmContext.isSchedulerReadyForAllocatingContainers()) {
+ return;
+ }
+
// Assign new containers...
// 1. Check for reserved applications
// 2. Schedule if there are no reservations
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/FifoScheduler.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/FifoScheduler.java
index d72e796..ea21c2b 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/FifoScheduler.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/FifoScheduler.java
@@ -702,6 +702,12 @@ private synchronized void nodeUpdate(RMNode rmNode) {
completedContainer, RMContainerEventType.FINISHED);
}
+
+ if (rmContext.isWorkPreservingRecoveryEnabled()
+ && !rmContext.isSchedulerReadyForAllocatingContainers()) {
+ return;
+ }
+
if (Resources.greaterThanOrEqual(resourceCalculator, clusterResource,
node.getAvailableResource(),minimumAllocation)) {
LOG.debug("Node heartbeat " + rmNode.getNodeID() +
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestWorkPreservingRMRestart.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestWorkPreservingRMRestart.java
index 02983c2..f1b5f14 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestWorkPreservingRMRestart.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestWorkPreservingRMRestart.java
@@ -37,10 +37,12 @@
import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.Container;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.ContainerState;
import org.apache.hadoop.yarn.api.records.ContainerStatus;
import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.api.records.ResourceRequest;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.server.api.protocolrecords.NMContainerStatus;
import org.apache.hadoop.yarn.server.resourcemanager.recovery.MemoryRMStateStore;
@@ -62,6 +64,8 @@
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.ParentQueue;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.FairScheduler;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fifo.FifoScheduler;
+import org.apache.hadoop.yarn.util.ControlledClock;
+import org.apache.hadoop.yarn.util.SystemClock;
import org.apache.hadoop.yarn.util.resource.DominantResourceCalculator;
import org.apache.hadoop.yarn.util.resource.ResourceCalculator;
import org.apache.hadoop.yarn.util.resource.Resources;
@@ -479,6 +483,7 @@ private void checkParentQueue(ParentQueue parentQueue, int numContainers,
@Test(timeout = 20000)
public void testAMfailedBetweenRMRestart() throws Exception {
MemoryRMStateStore memStore = new MemoryRMStateStore();
+ conf.setLong(YarnConfiguration.RM_WORK_PRESERVING_RECOVERY_SCHEDULING_WAIT_MS, 0);
memStore.init(conf);
rm1 = new MockRM(conf, memStore);
rm1.start();
@@ -762,4 +767,55 @@ public static void waitForNumContainersToRecover(int num, MockRM rm,
Thread.sleep(200);
}
}
+
+ @Test (timeout = 20000)
+ public void testNewContainersNotAllocatedDuringSchedulerRecovery()
+ throws Exception {
+ conf.setLong(
+ YarnConfiguration.RM_WORK_PRESERVING_RECOVERY_SCHEDULING_WAIT_MS, 4000);
+ MemoryRMStateStore memStore = new MemoryRMStateStore();
+ memStore.init(conf);
+ rm1 = new MockRM(conf, memStore);
+ rm1.start();
+ MockNM nm1 =
+ new MockNM("127.0.0.1:1234", 8192, rm1.getResourceTrackerService());
+ nm1.registerNode();
+ RMApp app1 = rm1.submitApp(200);
+ MockAM am1 = MockRM.launchAndRegisterAM(app1, rm1, nm1);
+
+ // Restart RM
+ rm2 = new MockRM(conf, memStore);
+ rm2.start();
+ nm1.setResourceTrackerService(rm2.getResourceTrackerService());
+ nm1.registerNode();
+ ControlledClock clock = new ControlledClock(new SystemClock());
+ long startTime = System.currentTimeMillis();
+ ((RMContextImpl)rm2.getRMContext()).setSystemClock(clock);
+ am1.setAMRMProtocol(rm2.getApplicationMasterService(), rm2.getRMContext());
+ am1.registerAppAttempt(true);
+ rm2.waitForState(app1.getApplicationId(), RMAppState.RUNNING);
+
+ // AM request for new containers
+ am1.allocate("127.0.0.1", 1000, 1, new ArrayList());
+
+ List<Container> containers = new ArrayList<Container>();
+ clock.setTime(startTime + 2000);
+ nm1.nodeHeartbeat(true);
+
+ // sleep some time as allocation happens asynchronously.
+ Thread.sleep(3000);
+ containers.addAll(am1.allocate(new ArrayList<ResourceRequest>(),
+ new ArrayList<ContainerId>()).getAllocatedContainers());
+ // container is not allocated during scheduling recovery.
+ Assert.assertTrue(containers.isEmpty());
+
+ clock.setTime(startTime + 8000);
+ nm1.nodeHeartbeat(true);
+ // Container is created after recovery is done.
+ while (containers.isEmpty()) {
+ containers.addAll(am1.allocate(new ArrayList<ResourceRequest>(),
+ new ArrayList<ContainerId>()).getAllocatedContainers());
+ Thread.sleep(500);
+ }
+ }
}