diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStore.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStore.java index de273c4..5d22d82 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStore.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStore.java @@ -1192,4 +1192,11 @@ public RMStateStoreState getRMStateStoreState() { this.readLock.unlock(); } } + + /** Test hook: installs {@code asyncDispatcher} as this store's dispatcher and registers a new {@code ForwardingEventHandler} on it for {@code RMStateStoreEventType} events. The previously held dispatcher is not stopped or otherwise touched by this method. @param asyncDispatcher the dispatcher to install; must be non-null because {@code register} is invoked on it immediately. */ @VisibleForTesting + public void resetDispatcher(AsyncDispatcher asyncDispatcher) { + dispatcher = asyncDispatcher; + dispatcher.register(RMStateStoreEventType.class, + new ForwardingEventHandler()); + } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/TestAMRestart.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/TestAMRestart.java index 03a9645..858c90f 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/TestAMRestart.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/TestAMRestart.java @@ -37,6 +37,7 @@ import org.apache.hadoop.yarn.api.records.NMToken; import org.apache.hadoop.yarn.api.records.ResourceRequest; import 
org.apache.hadoop.yarn.conf.YarnConfiguration; +import org.apache.hadoop.yarn.event.DrainDispatcher; import org.apache.hadoop.yarn.server.api.protocolrecords.NMContainerStatus; import org.apache.hadoop.yarn.server.resourcemanager.MockAM; import org.apache.hadoop.yarn.server.resourcemanager.MockNM; @@ -635,6 +636,9 @@ public void testRMAppAttemptFailuresValidityInterval() throws Exception { conf.setInt(YarnConfiguration.RM_AM_MAX_ATTEMPTS, 2); MemoryRMStateStore memStore = new MemoryRMStateStore(); memStore.init(conf); + DrainDispatcher stateStoreDispatcher = new DrainDispatcher(); + stateStoreDispatcher.init(conf); + memStore.resetDispatcher(stateStoreDispatcher); MockRM rm1 = new MockRM(conf, memStore); rm1.start(); @@ -675,9 +679,6 @@ public void testRMAppAttemptFailuresValidityInterval() throws Exception { nm1.nodeHeartbeat(am1.getApplicationAttemptId(), 1, ContainerState.COMPLETE); rm1.waitForState(am1.getApplicationAttemptId(), RMAppAttemptState.FAILED); - //Wait to make sure attempt1 be removed in State Store - //TODO explore a better way than sleeping for a while (YARN-4929) - Thread.sleep(15 * 1000); // launch the second attempt rm1.waitForState(app1.getApplicationId(), RMAppState.ACCEPTED); @@ -711,6 +712,14 @@ public void testRMAppAttemptFailuresValidityInterval() throws Exception { memStore.getState().getApplicationState().get(app1.getApplicationId()); Assert.assertEquals(1, app1State.getFirstAttemptId()); + // Set recovered app1's systemClock + RMApp recoveredApp = + rm2.getRMContext().getRMApps().get(app1.getApplicationId()); + ((RMAppImpl)recoveredApp).setSystemClock(clock); + + // wait for 10 seconds + clock.setTime(System.currentTimeMillis() + 10*1000); + // re-register the NM nm1.setResourceTrackerService(rm2.getResourceTrackerService()); NMContainerStatus status = Records.newRecord(NMContainerStatus.class); @@ -722,9 +731,8 @@ public void testRMAppAttemptFailuresValidityInterval() throws Exception { 
nm1.registerNode(Collections.singletonList(status), null); rm2.waitForState(attempt3.getAppAttemptId(), RMAppAttemptState.FAILED); - //Wait to make sure attempt3 be removed in State Store - //TODO explore a better way than sleeping for a while (YARN-4929) - Thread.sleep(15 * 1000); + // Wait to make sure attempt1 be removed in State Store + waitForEventsProcessed(rm2, stateStoreDispatcher); Assert.assertEquals(2, app1State.getAttemptCount()); rm2.waitForState(app1.getApplicationId(), RMAppState.ACCEPTED); @@ -739,6 +747,7 @@ public void testRMAppAttemptFailuresValidityInterval() throws Exception { nm1 .nodeHeartbeat(am4.getApplicationAttemptId(), 1, ContainerState.COMPLETE); rm2.waitForState(am4.getApplicationAttemptId(), RMAppAttemptState.FAILED); + waitForEventsProcessed(rm2, stateStoreDispatcher); Assert.assertEquals(2, app1State.getAttemptCount()); // can launch the 5th attempt successfully @@ -746,13 +755,13 @@ public void testRMAppAttemptFailuresValidityInterval() throws Exception { MockAM am5 = rm2.waitForNewAMToLaunchAndRegister(app1.getApplicationId(), 5, nm1); - clock.reset(); rm2.waitForState(am5.getApplicationAttemptId(), RMAppAttemptState.RUNNING); // Fail attempt5 normally nm1 .nodeHeartbeat(am5.getApplicationAttemptId(), 1, ContainerState.COMPLETE); rm2.waitForState(am5.getApplicationAttemptId(), RMAppAttemptState.FAILED); + waitForEventsProcessed(rm2, stateStoreDispatcher); Assert.assertEquals(2, app1State.getAttemptCount()); rm2.waitForState(app1.getApplicationId(), RMAppState.FAILED); @@ -760,6 +769,12 @@ public void testRMAppAttemptFailuresValidityInterval() throws Exception { rm2.stop(); } + /** Calls {@code rm.drainEvents()} and then {@code stateStoreDispatcher.await()}, in that order. The test invokes this before asserting on {@code app1State.getAttemptCount()}. NOTE(review): presumably both calls block until their respective event queues are empty; confirm against MockRM and DrainDispatcher. @param rm the MockRM whose events are drained first. @param stateStoreDispatcher the dispatcher awaited second. */ private void waitForEventsProcessed(MockRM rm, + DrainDispatcher stateStoreDispatcher) { + rm.drainEvents(); + stateStoreDispatcher.await(); + } + private boolean isContainerIdInContainerStatus( List containerStatuses, ContainerId containerId) { for (ContainerStatus status : containerStatuses) {