diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStore.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStore.java index de273c4..8548cc0 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStore.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStore.java @@ -660,7 +660,7 @@ public void setRMDispatcher(Dispatcher dispatcher) { @Override protected void serviceInit(Configuration conf) throws Exception{ // create async handler - dispatcher = new AsyncDispatcher(); + dispatcher = createDispatcher(); dispatcher.init(conf); dispatcher.register(RMStateStoreEventType.class, new ForwardingEventHandler()); @@ -668,6 +668,10 @@ protected void serviceInit(Configuration conf) throws Exception{ initInternal(conf); } + protected AsyncDispatcher createDispatcher() { + return new AsyncDispatcher(); + } + @Override protected void serviceStart() throws Exception { dispatcher.start(); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/TestAMRestart.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/TestAMRestart.java index 03a9645..e814345 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/TestAMRestart.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/TestAMRestart.java @@ -37,6 +37,8 @@ import org.apache.hadoop.yarn.api.records.NMToken; import org.apache.hadoop.yarn.api.records.ResourceRequest; import org.apache.hadoop.yarn.conf.YarnConfiguration; +import org.apache.hadoop.yarn.event.AsyncDispatcher; +import org.apache.hadoop.yarn.event.DrainDispatcher; import org.apache.hadoop.yarn.server.api.protocolrecords.NMContainerStatus; import org.apache.hadoop.yarn.server.resourcemanager.MockAM; import org.apache.hadoop.yarn.server.resourcemanager.MockNM; @@ -622,6 +624,20 @@ public void testRMRestartOrFailoverNotCountedForAMFailures() rm2.stop(); } + class MockMemoryRMStateStore extends MemoryRMStateStore { + private DrainDispatcher drainDispatcher; + + @Override + protected AsyncDispatcher createDispatcher() { + drainDispatcher = new DrainDispatcher(); + return drainDispatcher; + } + + public DrainDispatcher getDrainDispatcher() { + return drainDispatcher; + } + } + @Test (timeout = 120000) public void testRMAppAttemptFailuresValidityInterval() throws Exception { YarnConfiguration conf = new YarnConfiguration(); @@ -633,8 +649,9 @@ public void testRMAppAttemptFailuresValidityInterval() throws Exception { conf.set(YarnConfiguration.RM_STORE, MemoryRMStateStore.class.getName()); // explicitly set max-am-retry count as 2. conf.setInt(YarnConfiguration.RM_AM_MAX_ATTEMPTS, 2); - MemoryRMStateStore memStore = new MemoryRMStateStore(); + MockMemoryRMStateStore memStore = new MockMemoryRMStateStore(); memStore.init(conf); + DrainDispatcher stateStoreDispatcher = memStore.getDrainDispatcher(); MockRM rm1 = new MockRM(conf, memStore); rm1.start(); @@ -675,9 +692,6 @@ public void testRMAppAttemptFailuresValidityInterval() throws Exception { nm1.nodeHeartbeat(am1.getApplicationAttemptId(), 1, ContainerState.COMPLETE); rm1.waitForState(am1.getApplicationAttemptId(), RMAppAttemptState.FAILED); - //Wait to make sure attempt1 be removed in State Store - //TODO explore a better way than sleeping for a while (YARN-4929) - Thread.sleep(15 * 1000); // launch the second attempt rm1.waitForState(app1.getApplicationId(), RMAppState.ACCEPTED); @@ -711,6 +725,14 @@ public void testRMAppAttemptFailuresValidityInterval() throws Exception { memStore.getState().getApplicationState().get(app1.getApplicationId()); Assert.assertEquals(1, app1State.getFirstAttemptId()); + // Set recovered app1's systemClock + RMApp recoveredApp = + rm2.getRMContext().getRMApps().get(app1.getApplicationId()); + ((RMAppImpl)recoveredApp).setSystemClock(clock); + + // wait for 10 seconds + clock.setTime(System.currentTimeMillis() + 10*1000); + // re-register the NM nm1.setResourceTrackerService(rm2.getResourceTrackerService()); NMContainerStatus status = Records.newRecord(NMContainerStatus.class); @@ -722,9 +744,8 @@ public void testRMAppAttemptFailuresValidityInterval() throws Exception { nm1.registerNode(Collections.singletonList(status), null); rm2.waitForState(attempt3.getAppAttemptId(), RMAppAttemptState.FAILED); - //Wait to make sure attempt3 be removed in State Store - //TODO explore a better way than sleeping for a while (YARN-4929) - Thread.sleep(15 * 1000); + // Wait to make sure attempt1 be removed in State Store + waitForEventsProcessed(rm2, stateStoreDispatcher); Assert.assertEquals(2, app1State.getAttemptCount()); rm2.waitForState(app1.getApplicationId(), RMAppState.ACCEPTED); @@ -739,6 +760,7 @@ public void testRMAppAttemptFailuresValidityInterval() throws Exception { nm1 .nodeHeartbeat(am4.getApplicationAttemptId(), 1, ContainerState.COMPLETE); rm2.waitForState(am4.getApplicationAttemptId(), RMAppAttemptState.FAILED); + waitForEventsProcessed(rm2, stateStoreDispatcher); Assert.assertEquals(2, app1State.getAttemptCount()); // can launch the 5th attempt successfully @@ -746,13 +768,13 @@ public void testRMAppAttemptFailuresValidityInterval() throws Exception { MockAM am5 = rm2.waitForNewAMToLaunchAndRegister(app1.getApplicationId(), 5, nm1); - clock.reset(); rm2.waitForState(am5.getApplicationAttemptId(), RMAppAttemptState.RUNNING); // Fail attempt5 normally nm1 .nodeHeartbeat(am5.getApplicationAttemptId(), 1, ContainerState.COMPLETE); rm2.waitForState(am5.getApplicationAttemptId(), RMAppAttemptState.FAILED); + waitForEventsProcessed(rm2, stateStoreDispatcher); Assert.assertEquals(2, app1State.getAttemptCount()); rm2.waitForState(app1.getApplicationId(), RMAppState.FAILED); @@ -760,6 +782,12 @@ public void testRMAppAttemptFailuresValidityInterval() throws Exception { rm2.stop(); } + private void waitForEventsProcessed(MockRM rm, + DrainDispatcher stateStoreDispatcher) { + rm.drainEvents(); + stateStoreDispatcher.await(); + } + private boolean isContainerIdInContainerStatus( List containerStatuses, ContainerId containerId) { for (ContainerStatus status : containerStatuses) {