diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java index d392410..19419a0 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java @@ -817,6 +817,21 @@ public void handle(RMAppAttemptEvent event) { LOG.error("Error in handling event type " + event.getType() + " for applicationAttempt " + appAttemptId, t); } + } else { + RMAppAttempt dummyAppAttempt = rmApp.getDummyAppAttempt(); + if (dummyAppAttempt != null) { + try { + LOG.info("Event handled by dummyAppAttempt: " + event); + dummyAppAttempt.handle(event); + } catch (Throwable t) { + LOG.error("Error in handling event type " + event.getType() + + " for applicationAttempt " + appAttemptId + + " with dummyAttempt", t); + } + } else { + LOG.error("Could not handle event " + event.getType() + + " for applicationAttempt " + appAttemptId); + } } } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceTrackerService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceTrackerService.java index bd24b25..902244b 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceTrackerService.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceTrackerService.java @@ -230,6 +230,11 @@ void handleNMContainerStatus(NMContainerStatus containerStatus, NodeId nodeId) { } RMAppAttempt rmAppAttempt = rmApp.getRMAppAttempt(appAttemptId); + if (rmAppAttempt == null) { + LOG.info("Ignoring not found attempt " + appAttemptId); + return; + } + Container masterContainer = rmAppAttempt.getMasterContainer(); if (masterContainer.getId().equals(containerStatus.getContainerId()) && containerStatus.getContainerState() == ContainerState.COMPLETE) { diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/FileSystemRMStateStore.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/FileSystemRMStateStore.java index a1cebf5..021ca36 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/FileSystemRMStateStore.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/FileSystemRMStateStore.java @@ -482,6 +482,18 @@ public synchronized void updateApplicationAttemptStateInternal( } @Override + public synchronized void removeApplicationAttemptInternal( + ApplicationAttemptId appAttemptId) + throws Exception { + Path appDirPath = + getAppDir(rmAppRoot, appAttemptId.getApplicationId()); + Path nodeRemovePath = getNodePath(appDirPath, appAttemptId.toString()); + LOG.info("Removing info for attempt: " + appAttemptId + " at: " + + nodeRemovePath); + deleteFileWithRetries(nodeRemovePath); + } + + @Override public synchronized void removeApplicationStateInternal( ApplicationStateData appState) throws Exception { diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/LeveldbRMStateStore.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/LeveldbRMStateStore.java index afc6721..045e258 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/LeveldbRMStateStore.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/LeveldbRMStateStore.java @@ -575,6 +575,28 @@ protected void updateApplicationAttemptStateInternal( } @Override + public synchronized void removeApplicationAttemptInternal( + ApplicationAttemptId attemptId) + throws IOException { + String attemptKey = getApplicationAttemptNodeKey(attemptId); + try { + WriteBatch batch = db.createWriteBatch(); + try { + batch.delete(bytes(attemptKey)); + if (LOG.isDebugEnabled()) { + LOG.debug("Removing state for attempt " + attemptId + " at " + + attemptKey); + } + db.write(batch); + } finally { + batch.close(); + } + } catch (DBException e) { + throw new IOException(e); + } + } + + @Override protected void removeApplicationStateInternal(ApplicationStateData appState) throws IOException { ApplicationId appId = diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/MemoryRMStateStore.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/MemoryRMStateStore.java index ce6addb..caaea7e 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/MemoryRMStateStore.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/MemoryRMStateStore.java @@ -142,6 +142,19 @@ public synchronized void updateApplicationAttemptStateInternal( } @Override + public synchronized void removeApplicationAttemptInternal( + ApplicationAttemptId appAttemptId) throws Exception { + ApplicationStateData appState = + state.getApplicationState().get(appAttemptId.getApplicationId()); + ApplicationAttemptStateData attemptState = + appState.attempts.remove(appAttemptId); + LOG.info("Removing state for attempt: " + appAttemptId); + if (attemptState == null) { + throw new YarnRuntimeException("Application doesn't exist"); + } + } + + @Override public synchronized void removeApplicationStateInternal( ApplicationStateData appState) throws Exception { ApplicationId appId = diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/NullRMStateStore.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/NullRMStateStore.java index 96f77f5..f6fd6fe 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/NullRMStateStore.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/NullRMStateStore.java @@ -132,6 +132,12 @@ protected void updateApplicationAttemptStateInternal(ApplicationAttemptId attemp } @Override + public synchronized void removeApplicationAttemptInternal( + ApplicationAttemptId attemptId) throws Exception { + // Do nothing + } + + @Override public void checkVersion() throws Exception { // Do nothing } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStore.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStore.java index ec42cbe..ae17aaa 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStore.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStore.java @@ -137,6 +137,10 @@ new UpdateAppAttemptTransition()) .addTransition(RMStateStoreState.ACTIVE, EnumSet.of(RMStateStoreState.ACTIVE, RMStateStoreState.FENCED), + RMStateStoreEventType.REMOVE_APP_ATTEMPT, + new RemoveAppAttemptTransition()) + .addTransition(RMStateStoreState.ACTIVE, + EnumSet.of(RMStateStoreState.ACTIVE, RMStateStoreState.FENCED), RMStateStoreEventType.STORE_MASTERKEY, new StoreRMDTMasterKeyTransition()) .addTransition(RMStateStoreState.ACTIVE, @@ -552,6 +556,32 @@ private static RMStateStoreState finalState(boolean isFenced) { return isFenced ? RMStateStoreState.FENCED : RMStateStoreState.ACTIVE; } + private static class RemoveAppAttemptTransition implements + MultipleArcTransition { + @Override + public RMStateStoreState transition(RMStateStore store, + RMStateStoreEvent event) { + if (!(event instanceof RMStateStoreRemoveAppAttemptEvent)) { + // should never happen + LOG.error("Illegal event type: " + event.getClass()); + return RMStateStoreState.ACTIVE; + } + boolean isFenced = false; + ApplicationAttemptId attemptId = + ((RMStateStoreRemoveAppAttemptEvent) event).getApplicationAttemptId(); + ApplicationId appId = attemptId.getApplicationId(); + LOG.info("Removing attempt " + attemptId + " from app: " + appId); + try { + store.removeApplicationAttemptInternal(attemptId); + } catch (Exception e) { + LOG.error("Error removing attempt: " + attemptId, e); + isFenced = store.notifyStoreOperationFailedInternal(e); + } + return finalState(isFenced); + } + } + public RMStateStore() { super(RMStateStore.class.getName()); ReentrantReadWriteLock lock = new ReentrantReadWriteLock(); @@ -983,6 +1013,29 @@ public void removeApplication(RMApp app) { protected abstract void removeApplicationStateInternal( ApplicationStateData appState) throws Exception; + /** + * Non-blocking API + * ResourceManager services call this to remove an attempt from the state + * store + * This does not block the dispatcher threads + * There is no notification of completion for this operation. + */ + @SuppressWarnings("unchecked") + public synchronized void removeApplicationAttempt( + ApplicationAttemptId applicationAttemptId) { + dispatcher.getEventHandler().handle( + new RMStateStoreRemoveAppAttemptEvent(applicationAttemptId)); + } + + /** + * Blocking API + * Derived classes must implement this method to remove the state of specified + * attempt. + */ + protected abstract void removeApplicationAttemptInternal( + ApplicationAttemptId attemptId) throws Exception; + + // TODO: This should eventually become cluster-Id + "AM_RM_TOKEN_SERVICE". See // YARN-1779 public static final Text AM_RM_TOKEN_SERVICE = new Text( diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStoreEventType.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStoreEventType.java index 492826d..b34634d 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStoreEventType.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStoreEventType.java @@ -24,6 +24,7 @@ UPDATE_APP, UPDATE_APP_ATTEMPT, REMOVE_APP, + REMOVE_APP_ATTEMPT, FENCED, // Below events should be called synchronously diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStoreEventType.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStoreRemoveAppAttemptEvent.java similarity index 62% copy from hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStoreEventType.java copy to hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStoreRemoveAppAttemptEvent.java index 492826d..7455c39 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStoreEventType.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStoreRemoveAppAttemptEvent.java @@ -18,21 +18,20 @@ package org.apache.hadoop.yarn.server.resourcemanager.recovery; -public enum RMStateStoreEventType { - STORE_APP_ATTEMPT, - STORE_APP, - UPDATE_APP, - UPDATE_APP_ATTEMPT, - REMOVE_APP, - FENCED, +import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; - // Below events should be called synchronously - STORE_MASTERKEY, - REMOVE_MASTERKEY, - STORE_DELEGATION_TOKEN, - REMOVE_DELEGATION_TOKEN, - UPDATE_DELEGATION_TOKEN, - UPDATE_AMRM_TOKEN, - STORE_RESERVATION, - REMOVE_RESERVATION, +/** + * A event used to remove an attempt. + */ +public class RMStateStoreRemoveAppAttemptEvent extends RMStateStoreEvent { + private ApplicationAttemptId applicationAttemptId; + + RMStateStoreRemoveAppAttemptEvent(ApplicationAttemptId applicationAttemptId) { + super(RMStateStoreEventType.REMOVE_APP_ATTEMPT); + this.applicationAttemptId = applicationAttemptId; + } + + public ApplicationAttemptId getApplicationAttemptId() { + return applicationAttemptId; + } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/ZKRMStateStore.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/ZKRMStateStore.java index ca0f4ac..ddb8a0b 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/ZKRMStateStore.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/ZKRMStateStore.java @@ -659,6 +659,22 @@ public synchronized void updateApplicationAttemptStateInternal( } @Override + public synchronized void removeApplicationAttemptInternal( + ApplicationAttemptId appAttemptId) + throws Exception { + String appId = appAttemptId.getApplicationId().toString(); + String appIdRemovePath = getNodePath(rmAppRoot, appId); + String attemptIdRemovePath = getNodePath(appIdRemovePath, + appAttemptId.toString()); + + if (LOG.isDebugEnabled()) { + LOG.debug("Removing info for attempt: " + appAttemptId + " at: " + + attemptIdRemovePath); + } + safeDelete(attemptIdRemovePath); + } + + @Override public synchronized void removeApplicationStateInternal( ApplicationStateData appState) throws Exception { diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/records/ApplicationStateData.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/records/ApplicationStateData.java index 1d199ed..5137bdc 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/records/ApplicationStateData.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/records/ApplicationStateData.java @@ -80,6 +80,16 @@ public ApplicationAttemptStateData getAttempt( return attempts.get(attemptId); } + public int getAttemptStartId() { + int min = Integer.MAX_VALUE; + for(ApplicationAttemptId attemptId : attempts.keySet()) { + if (attemptId.getAttemptId() < min) { + min = attemptId.getAttemptId(); + } + } + return min == Integer.MAX_VALUE ? 0 : min - 1; + } + public abstract ApplicationStateDataProto getProto(); /** diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMApp.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMApp.java index bb0fc34..88ac051 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMApp.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMApp.java @@ -115,6 +115,12 @@ RMAppAttempt getCurrentAppAttempt(); /** + * A dummy appAttempt used to handle event for those removed from attempts. + * @return a dummy appAttempt to handle event for those removed attempts. + */ + RMAppAttempt getDummyAppAttempt(); + + /** * {@link RMApp} can have multiple application attempts {@link RMAppAttempt}. * This method returns the all {@link RMAppAttempt}s for the RMApp. * @return all {@link RMAppAttempt}s for the RMApp. diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java index 7a2b717..cbdcb12 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java @@ -149,8 +149,16 @@ private long startTime; private long finishTime = 0; private long storedFinishTime = 0; + private int currentAttemptId = 0; // This field isn't protected by readlock now. private volatile RMAppAttempt currentAttempt; + /** + * For work-preserving AM restart, failed attempt are still capturing + * CONTAINER_FINISHED event and record the finished containers for the + * use by the next new attempt. It is used to handle CONTAINER_FINISHED + * event for those removed attempts. + */ + private RMAppAttempt dummyAttempt; private String queue; private EventHandler handler; private static final AppFinishedTransition FINISHED_TRANSITION = @@ -557,6 +565,11 @@ public RMAppAttempt getCurrentAppAttempt() { } @Override + public RMAppAttempt getDummyAppAttempt() { + return this.dummyAttempt; + } + + @Override public Map getAppAttempts() { this.readLock.lock(); @@ -803,6 +816,7 @@ public void recover(RMState state) { this.storedFinishTime = appState.getFinishTime(); this.startTime = appState.getStartTime(); this.callerContext = appState.getCallerContext(); + this.currentAttemptId = appState.getAttemptStartId(); // send the ATS create Event sendATSCreateEvent(this, this.startTime); @@ -810,13 +824,16 @@ public void recover(RMState state) { for(int i=0; i= app.maxAppAttempts) { @@ -1334,6 +1355,38 @@ public RMAppState transition(RMAppImpl app, RMAppEvent event) { return RMAppState.FINAL_SAVING; } } + + private void removeAttemptsBeyondValidityInterval(RMAppImpl app) { + if (app.attemptFailuresValidityInterval <= 0) { + return; + } + + while (!app.attempts.isEmpty()) { + // attempts' first element is oldest attempt because it is a + // LinkedHashMap + RMAppAttempt oldestAttempt = app.attempts.values().iterator().next(); + LOG.info("************** " + oldestAttempt.getAppAttemptId() + " finished at " + oldestAttempt.getFinishTime()); + LOG.info("sysclock time: " + app.systemClock.getTime()); + LOG.info("interval: " + app.attemptFailuresValidityInterval); + // If oldest attempt is not beyond validity interval, just break + if (oldestAttempt.getFinishTime() > app.systemClock.getTime() + - app.attemptFailuresValidityInterval) { + break; + } + removeAppAttempt(app, oldestAttempt); + } + } + + // Remove the attempt in app.attempts, and set dummyAttempt if needed. + private void removeAppAttempt(RMAppImpl app, RMAppAttempt appAttempt) { + ApplicationAttemptId attemptId = appAttempt.getAppAttemptId(); + LOG.info("Remove attempt from state store : " + attemptId); + if (app.dummyAttempt == null) { + app.dummyAttempt = app.attempts.get(attemptId); + } + app.attempts.remove(attemptId); + app.rmContext.getStateStore().removeApplicationAttempt(attemptId); + } } @Override diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/MockAsm.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/MockAsm.java index 560a305..27a00f4 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/MockAsm.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/MockAsm.java @@ -103,6 +103,10 @@ public RMAppAttempt getCurrentAppAttempt() { throw new UnsupportedOperationException("Not supported yet."); } @Override + public RMAppAttempt getDummyAppAttempt() { + throw new UnsupportedOperationException("Not supported yet."); + } + @Override public Map getAppAttempts() { throw new UnsupportedOperationException("Not supported yet."); } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/TestAMRestart.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/TestAMRestart.java index acacc40..818ecd2 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/TestAMRestart.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/TestAMRestart.java @@ -847,7 +847,11 @@ public void testRMAppAttemptFailuresValidityInterval() throws Exception { // can launch the third attempt successfully rm1.waitForState(app1.getApplicationId(), RMAppState.ACCEPTED); - Assert.assertEquals(3, app1.getAppAttempts().size()); + // attempt 1 has been deleted from attempts because its finish time is + // beyond validity interval + Assert.assertEquals(2, app1.getAppAttempts().size()); + Assert.assertEquals(1, memStore.getState().getApplicationState().get( + app1.getApplicationId()).getAttemptCount()); RMAppAttempt attempt3 = app1.getCurrentAppAttempt(); clock.reset(); MockAM am3 = MockRM.launchAndRegisterAM(app1, rm1, nm1); @@ -858,6 +862,10 @@ public void testRMAppAttemptFailuresValidityInterval() throws Exception { MockRM rm2 = new MockRM(conf, memStore); rm2.start(); + app1 = (RMAppImpl)rm2.getRMContext().getRMApps() + .get(app1.getApplicationId()); + app1.setSystemClock(clock); + // re-register the NM nm1.setResourceTrackerService(rm2.getResourceTrackerService()); NMContainerStatus status = Records.newRecord(NMContainerStatus.class); @@ -874,7 +882,7 @@ public void testRMAppAttemptFailuresValidityInterval() throws Exception { // Lauch Attempt 4 MockAM am4 = - rm2.waitForNewAMToLaunchAndRegister(app1.getApplicationId(), 4, nm1); + rm2.waitForNewAMToLaunchAndRegister(app1.getApplicationId(), 3, nm1); // wait for 10 seconds clock.setTime(System.currentTimeMillis() + 10*1000); @@ -887,7 +895,7 @@ public void testRMAppAttemptFailuresValidityInterval() throws Exception { rm2.waitForState(app1.getApplicationId(), RMAppState.ACCEPTED); MockAM am5 = - rm2.waitForNewAMToLaunchAndRegister(app1.getApplicationId(), 5, nm1); + rm2.waitForNewAMToLaunchAndRegister(app1.getApplicationId(), 2, nm1); clock.reset(); am5.waitForState(RMAppAttemptState.RUNNING); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/MockRMApp.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/MockRMApp.java index 35fd0c3..e4adfe2 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/MockRMApp.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/MockRMApp.java @@ -151,6 +151,11 @@ public RMAppAttempt getCurrentAppAttempt() { return attempt; } + @Override + public RMAppAttempt getDummyAppAttempt() { + throw new UnsupportedOperationException("Not supported yet."); + } + public void setCurrentAppAttempt(RMAppAttempt attempt) { this.attempt = attempt; }