diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java index 0480cab..839765c 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java @@ -342,7 +342,16 @@ public static final String RM_MAX_COMPLETED_APPLICATIONS = RM_PREFIX + "max-completed-applications"; public static final int DEFAULT_RM_MAX_COMPLETED_APPLICATIONS = 10000; - + + /** + * The maximum number of completed applications RM state store keeps, by + * default equals to DEFAULT_RM_MAX_COMPLETED_APPLICATIONS + */ + public static final String RM_STATE_STORE_MAX_COMPLETED_APPLICATIONS = + RM_PREFIX + "state-store.max-completed-applications"; + public static final int DEFAULT_RM_STATE_STORE_MAX_COMPLETED_APPLICATIONS = + DEFAULT_RM_MAX_COMPLETED_APPLICATIONS; + /** Default application name */ public static final String DEFAULT_APPLICATION_NAME = "N/A"; diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml index d798f4c..c43dc1a 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml @@ -276,6 +276,21 @@ + The maximum number of completed applications RM state + store keeps, less than or equals to ${yarn.resourcemanager.max-completed-applications}. + By default, it equals to ${yarn.resourcemanager.max-completed-applications}. + This ensures that the applications kept in the state store are consistent with + the applications remembered in RM memory. + Any values larger than ${yarn.resourcemanager.max-completed-applications} will + be reset to ${yarn.resourcemanager.max-completed-applications}. + Note that this value impacts the RM recovery performance.Typically, + a smaller value indicates better performance on RM recovery. + + yarn.resourcemanager.state-store.max-completed-applications + ${yarn.resourcemanager.max-completed-applications} + + + Host:Port of the ZooKeeper server where RM state will be stored. This must be supplied when using org.apache.hadoop.yarn.server.resourcemanager.recovery.ZKRMStateStore diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMAppManager.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMAppManager.java index 4dfa3ba..812ebeb 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMAppManager.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMAppManager.java @@ -65,7 +65,9 @@ private static final Log LOG = LogFactory.getLog(RMAppManager.class); - private int completedAppsMax = YarnConfiguration.DEFAULT_RM_MAX_COMPLETED_APPLICATIONS; + private int maxCompletedAppsInMemory; + private int maxCompletedAppsInStateStore; + protected int completedAppsInStateStore = 0; private LinkedList completedApps = new LinkedList(); private final RMContext rmContext; @@ -82,9 +84,16 @@ public RMAppManager(RMContext context, this.masterService = masterService; this.applicationACLsManager = applicationACLsManager; this.conf = conf; - setCompletedAppsMax(conf.getInt( + this.maxCompletedAppsInMemory = conf.getInt( YarnConfiguration.RM_MAX_COMPLETED_APPLICATIONS, - YarnConfiguration.DEFAULT_RM_MAX_COMPLETED_APPLICATIONS)); + YarnConfiguration.DEFAULT_RM_MAX_COMPLETED_APPLICATIONS); + this.maxCompletedAppsInStateStore = + conf.getInt( + YarnConfiguration.RM_STATE_STORE_MAX_COMPLETED_APPLICATIONS, + YarnConfiguration.DEFAULT_RM_STATE_STORE_MAX_COMPLETED_APPLICATIONS); + if (this.maxCompletedAppsInStateStore > this.maxCompletedAppsInMemory) { + this.maxCompletedAppsInStateStore = this.maxCompletedAppsInMemory; + } } /** @@ -173,10 +182,6 @@ public void logApplicationSummary(ApplicationId appId) { ApplicationSummary.logAppSummary(rmContext.getRMApps().get(appId)); } - protected synchronized void setCompletedAppsMax(int max) { - this.completedAppsMax = max; - } - protected synchronized int getCompletedAppsListSize() { return this.completedApps.size(); } @@ -190,7 +195,8 @@ protected synchronized void finishApplication(ApplicationId applicationId) { rmContext.getDelegationTokenRenewer().applicationFinished(applicationId); } - completedApps.add(applicationId); + completedApps.add(applicationId); + completedAppsInStateStore++; writeAuditLog(applicationId); } } @@ -229,15 +235,25 @@ protected void writeAuditLog(ApplicationId appId) { * check to see if hit the limit for max # completed apps kept */ protected synchronized void checkAppNumCompletedLimit() { - while (completedApps.size() > this.completedAppsMax) { - ApplicationId removeId = completedApps.remove(); + // check apps kept in state store. + while (completedAppsInStateStore > this.maxCompletedAppsInStateStore) { + ApplicationId removeId = + completedApps.get(completedApps.size() - completedAppsInStateStore); + RMApp removeApp = rmContext.getRMApps().get(removeId); + rmContext.getStateStore().removeApplication(removeApp); + completedAppsInStateStore--; + } + + // check apps kept in memorty. + while (completedApps.size() > this.maxCompletedAppsInMemory) { + ApplicationId removeId = completedApps.remove(); LOG.info("Application should be expired, max # apps" - + " met. Removing app: " + removeId); + + " met. Removing app: " + removeId); rmContext.getRMApps().remove(removeId); this.applicationACLsManager.removeApplication(removeId); } } - + @SuppressWarnings("unchecked") protected void submitApplication( ApplicationSubmissionContext submissionContext, long submitTime, @@ -380,8 +396,6 @@ public void recover(RMState state) throws Exception { Map appStates = state.getApplicationState(); LOG.info("Recovering " + appStates.size() + " applications"); for (ApplicationState appState : appStates.values()) { - LOG.info("Recovering application " + appState.getAppId()); - submitApplication(appState.getApplicationSubmissionContext(), appState.getSubmitTime(), appState.getUser(), true, state); } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/FileSystemRMStateStore.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/FileSystemRMStateStore.java index 23cefd3..88b1a90 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/FileSystemRMStateStore.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/FileSystemRMStateStore.java @@ -167,7 +167,9 @@ private void loadRMAppState(RMState rmState) throws Exception { readFile(childNodeStatus.getPath(), childNodeStatus.getLen()); if (childNodeName.startsWith(ApplicationId.appIdStrPrefix)) { // application - LOG.info("Loading application from node: " + childNodeName); + if (LOG.isDebugEnabled()) { + LOG.debug("Loading application from node: " + childNodeName); + } ApplicationId appId = ConverterUtils.toApplicationId(childNodeName); ApplicationStateDataPBImpl appStateData = new ApplicationStateDataPBImpl( @@ -185,7 +187,10 @@ private void loadRMAppState(RMState rmState) throws Exception { } else if (childNodeName .startsWith(ApplicationAttemptId.appAttemptIdStrPrefix)) { // attempt - LOG.info("Loading application attempt from node: " + childNodeName); + if (LOG.isDebugEnabled()) { + LOG.debug("Loading application attempt from node: " + + childNodeName); + } ApplicationAttemptId attemptId = ConverterUtils.toApplicationAttemptId(childNodeName); ApplicationAttemptStateDataPBImpl attemptStateData = @@ -225,6 +230,7 @@ private void loadRMAppState(RMState rmState) throws Exception { assert appState != null; appState.attempts.put(attemptState.getAttemptId(), attemptState); } + LOG.info("Done Loading applications from FS state store"); } catch (Exception e) { LOG.error("Failed to load state.", e); throw e; @@ -362,7 +368,7 @@ public synchronized void updateApplicationAttemptStateInternal( } @Override - public synchronized void removeApplicationState(ApplicationState appState) + public synchronized void removeApplicationStateInternal(ApplicationState appState) throws Exception { String appId = appState.getAppId().toString(); Path nodeRemovePath = getAppDir(rmAppRoot, appId); diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/MemoryRMStateStore.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/MemoryRMStateStore.java index d5ff5ed..961bec3 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/MemoryRMStateStore.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/MemoryRMStateStore.java @@ -171,8 +171,8 @@ public synchronized void updateApplicationAttemptStateInternal( } @Override - public synchronized void removeApplicationState(ApplicationState appState) - throws Exception { + public synchronized void removeApplicationStateInternal( + ApplicationState appState) throws Exception { ApplicationId appId = appState.getAppId(); ApplicationState removed = state.appState.remove(appId); if (removed == null) { diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/NullRMStateStore.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/NullRMStateStore.java index c212c1f..3098b26 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/NullRMStateStore.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/NullRMStateStore.java @@ -63,7 +63,7 @@ protected void storeApplicationAttemptStateInternal(String attemptId, } @Override - protected void removeApplicationState(ApplicationState appState) + protected void removeApplicationStateInternal(ApplicationState appState) throws Exception { // Do nothing } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStore.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStore.java index 5e0e944..a845264 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStore.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStore.java @@ -53,7 +53,6 @@ import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.impl.pb.ApplicationStateDataPBImpl; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppNewSavedEvent; -import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppRemovedEvent; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppState; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppUpdateSavedEvent; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt; @@ -519,6 +518,7 @@ protected abstract void removeRMDTMasterKeyState(DelegationKey delegationKey) * This does not block the dispatcher threads * There is no notification of completion for this operation. */ + @SuppressWarnings("unchecked") public synchronized void removeApplication(RMApp app) { ApplicationState appState = new ApplicationState( app.getSubmitTime(), app.getStartTime(), @@ -532,14 +532,6 @@ public synchronized void removeApplication(RMApp app) { appState.attempts.put(attemptState.getAttemptId(), attemptState); } - removeApplication(appState); - } - - @SuppressWarnings("unchecked") - /** - * Non-Blocking API - */ - public synchronized void removeApplication(ApplicationState appState) { dispatcher.getEventHandler().handle(new RMStateStoreRemoveAppEvent(appState)); } @@ -548,8 +540,8 @@ public synchronized void removeApplication(ApplicationState appState) { * Derived classes must implement this method to remove the state of an * application and its attempts */ - protected abstract void removeApplicationState(ApplicationState appState) - throws Exception; + protected abstract void removeApplicationStateInternal( + ApplicationState appState) throws Exception; // TODO: This should eventually become cluster-Id + "AM_RM_TOKEN_SERVICE". See // YARN-986 @@ -666,11 +658,9 @@ protected void handleStoreEvent(RMStateStoreEvent event) { ApplicationState appState = ((RMStateStoreRemoveAppEvent) event).getAppState(); ApplicationId appId = appState.getAppId(); - Exception removedException = null; LOG.info("Removing info for app: " + appId); try { - removeApplicationState(appState); - notifyDoneRemovingApplcation(appId, removedException); + removeApplicationStateInternal(appState); } catch (Exception e) { LOG.error("Error removing app: " + appId, e); notifyStoreOperationFailed(e); @@ -738,17 +728,6 @@ private void notifyDoneUpdatingApplicationAttempt(ApplicationAttemptId attemptId new RMAppAttemptUpdateSavedEvent(attemptId, updatedException)); } - @SuppressWarnings("unchecked") - /** - * This is to notify RMApp that this application has been removed from - * RMStateStore - */ - private void notifyDoneRemovingApplcation(ApplicationId appId, - Exception removedException) { - rmDispatcher.getEventHandler().handle( - new RMAppRemovedEvent(appId, removedException)); - } - /** * EventHandler implementation which forward events to the FSRMStateStore * This hides the EventHandle methods of the store from its public interface diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/ZKRMStateStore.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/ZKRMStateStore.java index f419ff0..8737781 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/ZKRMStateStore.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/ZKRMStateStore.java @@ -392,7 +392,9 @@ private synchronized void loadRMAppState(RMState rmState) throws Exception { byte[] childData = getDataWithRetries(childNodePath, true); if (childNodeName.startsWith(ApplicationId.appIdStrPrefix)) { // application - LOG.info("Loading application from znode: " + childNodeName); + if (LOG.isDebugEnabled()) { + LOG.debug("Loading application from znode: " + childNodeName); + } ApplicationId appId = ConverterUtils.toApplicationId(childNodeName); ApplicationStateDataPBImpl appStateData = new ApplicationStateDataPBImpl( @@ -412,7 +414,9 @@ private synchronized void loadRMAppState(RMState rmState) throws Exception { } else if (childNodeName .startsWith(ApplicationAttemptId.appAttemptIdStrPrefix)) { // attempt - LOG.info("Loading application attempt from znode: " + childNodeName); + if (LOG.isDebugEnabled()) { + LOG.debug("Loading application attempt from znode: " + childNodeName); + } ApplicationAttemptId attemptId = ConverterUtils.toApplicationAttemptId(childNodeName); ApplicationAttemptStateDataPBImpl attemptStateData = @@ -456,10 +460,10 @@ private synchronized void loadRMAppState(RMState rmState) throws Exception { LOG.info("Application node not found for attempt: " + attemptState.getAttemptId()); deleteWithRetries( - getNodePath(rmAppRoot, attemptState.getAttemptId().toString()), - 0); + getNodePath(rmAppRoot, attemptState.getAttemptId().toString()), -1); } } + LOG.info("Done Loading applications from ZK state store"); } @Override @@ -517,16 +521,16 @@ public synchronized void updateApplicationAttemptStateInternal( } @Override - public synchronized void removeApplicationState(ApplicationState appState) + public synchronized void removeApplicationStateInternal(ApplicationState appState) throws Exception { String appId = appState.getAppId().toString(); String nodeRemovePath = getNodePath(rmAppRoot, appId); ArrayList opList = new ArrayList(); - opList.add(Op.delete(nodeRemovePath, 0)); + opList.add(Op.delete(nodeRemovePath, -1)); for (ApplicationAttemptId attemptId : appState.attempts.keySet()) { String attemptRemovePath = getNodePath(rmAppRoot, attemptId.toString()); - opList.add(Op.delete(attemptRemovePath, 0)); + opList.add(Op.delete(attemptRemovePath, -1)); } if (LOG.isDebugEnabled()) { LOG.debug("Removing info for app: " + appId + " at: " + nodeRemovePath @@ -569,7 +573,7 @@ protected synchronized void storeRMDelegationTokenAndSequenceNumberState( } if (dtSequenceNumberPath != null) { - opList.add(Op.delete(dtSequenceNumberPath, 0)); + opList.add(Op.delete(dtSequenceNumberPath, -1)); } opList.add(Op.create(latestSequenceNumberPath, null, zkAcl, CreateMode.PERSISTENT)); @@ -587,7 +591,7 @@ protected synchronized void removeRMDelegationTokenState( LOG.debug("Removing RMDelegationToken_" + rmDTIdentifier.getSequenceNumber()); } - deleteWithRetries(nodeRemovePath, 0); + deleteWithRetries(nodeRemovePath, -1); } @Override @@ -619,7 +623,7 @@ protected synchronized void removeRMDTMasterKeyState( if (LOG.isDebugEnabled()) { LOG.debug("Removing RMDelegationKey_" + delegationKey.getKeyId()); } - deleteWithRetries(nodeRemovePath, 0); + deleteWithRetries(nodeRemovePath, -1); } // ZK related code diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java index 76d59ec..5a70cc2 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java @@ -660,32 +660,34 @@ public void transition(RMAppImpl app, RMAppEvent event) { @SuppressWarnings("unchecked") private static final class RMAppRecoveredTransition implements MultipleArcTransition { - + @Override public RMAppState transition(RMAppImpl app, RMAppEvent event) { + /* + * If last attempt recovered final state is null .. it means attempt was + * started but AM container may or may not have started / finished. + * Therefore we should wait for it to finish. + */ + for (RMAppAttempt attempt : app.getAppAttempts().values()) { + app.dispatcher.getEventHandler().handle( + new RMAppAttemptEvent(attempt.getAppAttemptId(), + RMAppAttemptEventType.RECOVER)); + } + + // The app has completed. + if (app.recoveredFinalState != null) { + FINAL_TRANSITION.transition(app, event); + return app.recoveredFinalState; + } + // No existent attempts means the attempt associated with this app was not + // started or started but not yet saved。 if (app.attempts.isEmpty()) { - // Saved application was not running any attempts. app.createNewAttempt(true); - return RMAppState.SUBMITTED; - } else { - /* - * If last attempt recovered final state is null .. it means attempt - * was started but AM container may or may not have started / finished. - * Therefore we should wait for it to finish. - */ - for (RMAppAttempt attempt : app.getAppAttempts().values()) { - app.dispatcher.getEventHandler().handle( - new RMAppAttemptEvent(attempt.getAppAttemptId(), - RMAppAttemptEventType.RECOVER)); - } - if (app.recoveredFinalState != null) { - FINAL_TRANSITION.transition(app, event); - return app.recoveredFinalState; - } else { - return RMAppState.RUNNING; - } + return RMAppState.SUBMITTED; } + + return RMAppState.RUNNING; } } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppRemovedEvent.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppRemovedEvent.java deleted file mode 100644 index a030dbe..0000000 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppRemovedEvent.java +++ /dev/null @@ -1,36 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.hadoop.yarn.server.resourcemanager.rmapp; - -import org.apache.hadoop.yarn.api.records.ApplicationId; - -public class RMAppRemovedEvent extends RMAppEvent { - - private final Exception removedException; - - public RMAppRemovedEvent(ApplicationId appId, Exception removedException) { - super(appId, RMAppEventType.APP_REMOVED); - this.removedException = removedException; - } - - public Exception getRemovedException() { - return removedException; - } - -} \ No newline at end of file diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttemptImpl.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttemptImpl.java index ffa021f..67a2e41 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttemptImpl.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttemptImpl.java @@ -76,14 +76,13 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppFinishedAttemptEvent; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppImpl; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppRejectedEvent; -import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppState; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event.RMAppAttemptContainerAcquiredEvent; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event.RMAppAttemptContainerFinishedEvent; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event.RMAppAttemptLaunchFailedEvent; +import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event.RMAppAttemptNewSavedEvent; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event.RMAppAttemptRegistrationEvent; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event.RMAppAttemptRejectedEvent; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event.RMAppAttemptStatusupdateEvent; -import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event.RMAppAttemptNewSavedEvent; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event.RMAppAttemptUnregistrationEvent; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event.RMAppAttemptUpdateSavedEvent; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.Allocation; @@ -675,9 +674,8 @@ public void recover(RMState state) throws Exception { ApplicationAttemptState attemptState = appState.getAttempt(getAppAttemptId()); assert attemptState != null; - LOG.info("Recovered attempt: AppId: " - + getAppAttemptId().getApplicationId() + " AttemptId: " - + getAppAttemptId() + " MasterContainer: " + masterContainer); + LOG.info("Recovering attempt: " + getAppAttemptId() + " with final state: " + + attemptState.getState()); diagnostics.append("Attempt recovered after RM restart"); diagnostics.append(attemptState.getDiagnostics()); setMasterContainer(attemptState.getMasterContainer()); @@ -856,8 +854,6 @@ public void transition(RMAppAttemptImpl appAttempt, @Override public RMAppAttemptState transition(RMAppAttemptImpl appAttempt, RMAppAttemptEvent event) { - LOG.info("Recovering attempt : recoverdFinalState :" - + appAttempt.recoveredFinalState); if (appAttempt.recoveredFinalState != null) { appAttempt.progress = 1.0f; RMApp rmApp =appAttempt.rmContext.getRMApps().get( diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestAppManager.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestAppManager.java index 8fe8de4..90277b6 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestAppManager.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestAppManager.java @@ -19,8 +19,12 @@ package org.apache.hadoop.yarn.server.resourcemanager; +import static org.mockito.Matchers.isA; import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.never; +import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; +import static org.mockito.Mockito.times; import java.util.HashMap; import java.util.List; @@ -43,6 +47,7 @@ import org.apache.hadoop.yarn.exceptions.YarnException; import org.apache.hadoop.yarn.factories.RecordFactory; import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider; +import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.MockRMApp; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppEvent; @@ -99,7 +104,7 @@ public static RMContext mockRMContext(int n, long time) { rmDispatcher); AMLivelinessMonitor amFinishingMonitor = new AMLivelinessMonitor( rmDispatcher); - return new RMContextImpl(rmDispatcher, + RMContext context = new RMContextImpl(rmDispatcher, containerAllocationExpirer, amLivelinessMonitor, amFinishingMonitor, null, null, null, null, null) { @Override @@ -107,6 +112,8 @@ public static RMContext mockRMContext(int n, long time) { return map; } }; + ((RMContextImpl)context).setStateStore(mock(RMStateStore.class)); + return context; } public class TestAppManagerDispatcher implements @@ -142,7 +149,6 @@ public void handle(RMAppEvent event) { public TestRMAppManager(RMContext context, Configuration conf) { super(context, null, null, new ApplicationACLsManager(conf), conf); - setCompletedAppsMax(YarnConfiguration.DEFAULT_RM_MAX_COMPLETED_APPLICATIONS); } public TestRMAppManager(RMContext context, @@ -150,7 +156,6 @@ public TestRMAppManager(RMContext context, YarnScheduler scheduler, ApplicationMasterService masterService, ApplicationACLsManager applicationACLsManager, Configuration conf) { super(context, scheduler, masterService, applicationACLsManager, conf); - setCompletedAppsMax(YarnConfiguration.DEFAULT_RM_MAX_COMPLETED_APPLICATIONS); } public void checkAppNumCompletedLimit() { @@ -164,9 +169,8 @@ public void finishApplication(ApplicationId appId) { public int getCompletedAppsListSize() { return super.getCompletedAppsListSize(); } - - public void setCompletedAppsMax(int max) { - super.setCompletedAppsMax(max); + public int getCompletedAppsInStateStore() { + return this.completedAppsInStateStore; } public void submitApplication( ApplicationSubmissionContext submissionContext, String user) @@ -227,9 +231,9 @@ public void testRMAppRetireNone() throws Exception { // Create such that none of the applications will retire since // haven't hit max # RMContext rmContext = mockRMContext(10, now - 10); - TestRMAppManager appMonitor = new TestRMAppManager(rmContext, new Configuration()); - - appMonitor.setCompletedAppsMax(10); + Configuration conf = new YarnConfiguration(); + conf.setInt(YarnConfiguration.RM_MAX_COMPLETED_APPLICATIONS, 10); + TestRMAppManager appMonitor = new TestRMAppManager(rmContext,conf); Assert.assertEquals("Number of apps incorrect before checkAppTimeLimit", 10, rmContext.getRMApps().size()); @@ -243,6 +247,8 @@ public void testRMAppRetireNone() throws Exception { rmContext.getRMApps().size()); Assert.assertEquals("Number of completed apps incorrect after check", 10, appMonitor.getCompletedAppsListSize()); + verify(rmContext.getStateStore(), never()).removeApplication( + isA(RMApp.class)); } @Test @@ -250,9 +256,10 @@ public void testRMAppRetireSome() throws Exception { long now = System.currentTimeMillis(); RMContext rmContext = mockRMContext(10, now - 20000); - TestRMAppManager appMonitor = new TestRMAppManager(rmContext, new Configuration()); - - appMonitor.setCompletedAppsMax(3); + Configuration conf = new YarnConfiguration(); + conf.setInt(YarnConfiguration.RM_STATE_STORE_MAX_COMPLETED_APPLICATIONS, 3); + conf.setInt(YarnConfiguration.RM_MAX_COMPLETED_APPLICATIONS, 3); + TestRMAppManager appMonitor = new TestRMAppManager(rmContext, conf); Assert.assertEquals("Number of apps incorrect before", 10, rmContext .getRMApps().size()); @@ -266,6 +273,8 @@ public void testRMAppRetireSome() throws Exception { rmContext.getRMApps().size()); Assert.assertEquals("Number of completed apps incorrect after check", 3, appMonitor.getCompletedAppsListSize()); + verify(rmContext.getStateStore(), times(7)).removeApplication( + isA(RMApp.class)); } @Test @@ -274,14 +283,17 @@ public void testRMAppRetireSomeDifferentStates() throws Exception { // these parameters don't matter, override applications below RMContext rmContext = mockRMContext(10, now - 20000); - TestRMAppManager appMonitor = new TestRMAppManager(rmContext, new Configuration()); + Configuration conf = new YarnConfiguration(); + conf.setInt(YarnConfiguration.RM_STATE_STORE_MAX_COMPLETED_APPLICATIONS, 2); + conf.setInt(YarnConfiguration.RM_MAX_COMPLETED_APPLICATIONS, 2); - appMonitor.setCompletedAppsMax(2); + TestRMAppManager appMonitor = new TestRMAppManager(rmContext, conf); // clear out applications map rmContext.getRMApps().clear(); Assert.assertEquals("map isn't empty", 0, rmContext.getRMApps().size()); + // 6 applications are in final state, 4 are not in final state. // / set with various finished states RMApp app = new MockRMApp(0, now - 20000, RMAppState.KILLED); rmContext.getRMApps().put(app.getApplicationId(), app); @@ -318,7 +330,9 @@ public void testRMAppRetireSomeDifferentStates() throws Exception { rmContext.getRMApps().size()); Assert.assertEquals("Number of completed apps incorrect after check", 2, appMonitor.getCompletedAppsListSize()); - + // 6 applications in final state, 4 of them are removed + verify(rmContext.getStateStore(), times(4)).removeApplication( + isA(RMApp.class)); } @Test @@ -342,14 +356,13 @@ public void testRMAppRetireZeroSetting() throws Exception { long now = System.currentTimeMillis(); RMContext rmContext = mockRMContext(10, now - 20000); - TestRMAppManager appMonitor = new TestRMAppManager(rmContext, new Configuration()); - + Configuration conf = new YarnConfiguration(); + conf.setInt(YarnConfiguration.RM_STATE_STORE_MAX_COMPLETED_APPLICATIONS, 0); + conf.setInt(YarnConfiguration.RM_MAX_COMPLETED_APPLICATIONS, 0); + TestRMAppManager appMonitor = new TestRMAppManager(rmContext, conf); Assert.assertEquals("Number of apps incorrect before", 10, rmContext .getRMApps().size()); - // test with 0 - appMonitor.setCompletedAppsMax(0); - addToCompletedApps(appMonitor, rmContext); Assert.assertEquals("Number of completed apps incorrect", 10, appMonitor.getCompletedAppsListSize()); @@ -360,6 +373,64 @@ public void testRMAppRetireZeroSetting() throws Exception { rmContext.getRMApps().size()); Assert.assertEquals("Number of completed apps incorrect after check", 0, appMonitor.getCompletedAppsListSize()); + verify(rmContext.getStateStore(), times(10)).removeApplication( + isA(RMApp.class)); + } + + @Test + public void testStateStoreAppLimitLessThanMemeoryAppLimit() { + long now = System.currentTimeMillis(); + RMContext rmContext = mockRMContext(10, now - 20000); + Configuration conf = new YarnConfiguration(); + int maxAppsInMemory = 8; + int maxAppsInStateStore = 4; + conf.setInt(YarnConfiguration.RM_MAX_COMPLETED_APPLICATIONS, maxAppsInMemory); + conf.setInt(YarnConfiguration.RM_STATE_STORE_MAX_COMPLETED_APPLICATIONS, + maxAppsInStateStore); + TestRMAppManager appMonitor = new TestRMAppManager(rmContext, conf); + + addToCompletedApps(appMonitor, rmContext); + Assert.assertEquals("Number of completed apps incorrect", 10, + appMonitor.getCompletedAppsListSize()); + appMonitor.checkAppNumCompletedLimit(); + + Assert.assertEquals("Number of apps incorrect after # completed check", + maxAppsInMemory, rmContext.getRMApps().size()); + Assert.assertEquals("Number of completed apps incorrect after check", + maxAppsInMemory, appMonitor.getCompletedAppsListSize()); + + int numRemoveAppsFromStateStore = 10 - maxAppsInStateStore; + verify(rmContext.getStateStore(), times(numRemoveAppsFromStateStore)) + .removeApplication(isA(RMApp.class)); + Assert.assertEquals(maxAppsInStateStore, + appMonitor.getCompletedAppsInStateStore()); + } + + @Test + public void testStateStoreAppLimitLargerThanMemoryAppLimit() { + long now = System.currentTimeMillis(); + RMContext rmContext = mockRMContext(10, now - 20000); + Configuration conf = new YarnConfiguration(); + int maxAppsInMemory = 8; + conf.setInt(YarnConfiguration.RM_MAX_COMPLETED_APPLICATIONS, maxAppsInMemory); + // larger than maxCompletedAppsInMemory, reset to RM_MAX_COMPLETED_APPLICATIONS. + conf.setInt(YarnConfiguration.RM_STATE_STORE_MAX_COMPLETED_APPLICATIONS, 1000); + TestRMAppManager appMonitor = new TestRMAppManager(rmContext, conf); + + addToCompletedApps(appMonitor, rmContext); + Assert.assertEquals("Number of completed apps incorrect", 10, + appMonitor.getCompletedAppsListSize()); + appMonitor.checkAppNumCompletedLimit(); + + int numRemoveApps = 10 - maxAppsInMemory; + Assert.assertEquals("Number of apps incorrect after # completed check", + maxAppsInMemory, rmContext.getRMApps().size()); + Assert.assertEquals("Number of completed apps incorrect after check", + maxAppsInMemory, appMonitor.getCompletedAppsListSize()); + verify(rmContext.getStateStore(), times(numRemoveApps)).removeApplication( + isA(RMApp.class)); + Assert.assertEquals(maxAppsInMemory, + appMonitor.getCompletedAppsInStateStore()); } protected void setupDispatcher(RMContext rmContext, Configuration conf) { diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMRestart.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMRestart.java index 38f8542..f1b258b 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMRestart.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMRestart.java @@ -423,6 +423,8 @@ public void testRMRestartAppRunningAMFailed() throws Exception { rm2.getRMContext().getRMApps().get(app0.getApplicationId()); Assert.assertEquals(RMAppAttemptState.FAILED, recoveredApp .getAppAttempts().get(am0.getApplicationAttemptId()).getAppAttemptState()); + rm1.stop(); + rm2.stop(); } @Test @@ -629,6 +631,8 @@ public void testRMRestartFailedApp() throws Exception { .contains("Failing the application.")); // failed diagnostics from attempt is lost because the diagnostics from // attempt is not yet available by the time app is saving the app state. + rm1.stop(); + rm2.stop(); } @Test @@ -675,6 +679,8 @@ public void testRMRestartKilledApp() throws Exception{ ApplicationReport appReport = verifyAppReportAfterRMRestart(app0, rm2); Assert.assertEquals(app0.getDiagnostics().toString(), appReport.getDiagnostics()); + rm1.stop(); + rm2.stop(); } @Test @@ -724,6 +730,9 @@ public void testRMRestartSucceededApp() throws Exception { Assert.assertEquals(FinalApplicationStatus.SUCCEEDED, appReport.getFinalApplicationStatus()); Assert.assertEquals("trackingUrl", appReport.getOriginalTrackingUrl()); + + rm1.stop(); + rm2.stop(); } @Test @@ -817,6 +826,9 @@ protected RMAppManager createRMAppManager() { // check application summary is logged for the completed apps after RM restart. verify(rm2.getRMAppManager(), times(3)).logApplicationSummary( isA(ApplicationId.class)); + + rm1.stop(); + rm2.stop(); } private MockAM launchAM(RMApp app, MockRM rm, MockNM nm) @@ -1378,6 +1390,52 @@ protected void handleStoreEvent(RMStateStoreEvent event) { Assert.assertTrue(rmAppState.size() == NUM_APPS); } + @Test + public void testFinishedAppRemovalAfterRMRestart() throws Exception { + MemoryRMStateStore memStore = new MemoryRMStateStore(); + conf.setInt(YarnConfiguration.RM_MAX_COMPLETED_APPLICATIONS, 1); + memStore.init(conf); + RMState rmState = memStore.getState(); + + // start RM + MockRM rm1 = new MockRM(conf, memStore); + rm1.start(); + MockNM nm1 = + new MockNM("127.0.0.1:1234", 15120, rm1.getResourceTrackerService()); + nm1.registerNode(); + + // create an app and finish the app. + RMApp app0 = rm1.submitApp(200); + MockAM am0 = launchAM(app0, rm1, nm1); + finishApplicationMaster(app0, rm1, nm1, am0); + + MockRM rm2 = new MockRM(conf, memStore); + rm2.start(); + nm1.setResourceTrackerService(rm2.getResourceTrackerService()); + nm1 = rm2.registerNode("127.0.0.1:1234", 15120); + + Map rmAppState = + rmState.getApplicationState(); + + // app0 exits in both state store and rmContext + Assert.assertEquals(RMAppState.FINISHED, + rmAppState.get(app0.getApplicationId()).getState()); + rm2.waitForState(app0.getApplicationId(), RMAppState.FINISHED); + + // create one more app and finish the app. + RMApp app1 = rm2.submitApp(200); + MockAM am1 = launchAM(app1, rm2, nm1); + finishApplicationMaster(app1, rm2, nm1, am1); + + // the first app0 get kicked out from both rmContext and state store + Assert.assertNull(rm2.getRMContext().getRMApps() + .get(app0.getApplicationId())); + Assert.assertNull(rmAppState.get(app0.getApplicationId())); + + rm1.stop(); + rm2.stop(); + } + public static class TestSecurityMockRM extends MockRM { public TestSecurityMockRM(Configuration conf, RMStateStore store) { diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStoreTestBase.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStoreTestBase.java index 417fdb1..ac77f6d 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStoreTestBase.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStoreTestBase.java @@ -26,6 +26,7 @@ import static org.mockito.Mockito.mock; import static org.mockito.Mockito.when; +import java.util.ArrayList; import java.util.HashMap; import java.util.HashSet; import java.util.Map; @@ -109,6 +110,7 @@ public EventHandler getEventHandler() { boolean isFinalStateValid() throws Exception; void writeVersion(RMStateVersion version) throws Exception; RMStateVersion getCurrentVersion() throws Exception; + boolean appExists(RMApp app) throws Exception; } void waitNotify(TestDispatcher dispatcher) { @@ -128,7 +130,7 @@ void waitNotify(TestDispatcher dispatcher) { dispatcher.notified = false; } - void storeApp(RMStateStore store, ApplicationId appId, long submitTime, + RMApp storeApp(RMStateStore store, ApplicationId appId, long submitTime, long startTime) throws Exception { ApplicationSubmissionContext context = new ApplicationSubmissionContextPBImpl(); @@ -141,6 +143,7 @@ void storeApp(RMStateStore store, ApplicationId appId, long submitTime, when(mockApp.getApplicationSubmissionContext()).thenReturn(context); when(mockApp.getUser()).thenReturn("test"); store.storeNewApplication(mockApp); + return mockApp; } ContainerId storeAttempt(RMStateStore store, ApplicationAttemptId attemptId, @@ -370,6 +373,7 @@ public void testRMDTSecretManagerStateStore( Assert.assertEquals(keySet, secretManagerState.getMasterKeyState()); Assert.assertEquals(sequenceNumber, secretManagerState.getDTSequenceNumber()); + store.close(); } private Token generateAMRMToken( @@ -415,4 +419,33 @@ public void testCheckVersion(RMStateStoreHelper stateStoreHelper) Assert.assertTrue(t instanceof RMStateVersionIncompatibleException); } } + + public void testAppDeletion(RMStateStoreHelper stateStoreHelper) + throws Exception { + RMStateStore store = stateStoreHelper.getRMStateStore(); + store.setRMDispatcher(new TestDispatcher()); + // create and store apps + ArrayList appList = new ArrayList(); + int NUM_APPS = 5; + for (int i = 0; i < NUM_APPS; i++) { + ApplicationId appId = ApplicationId.newInstance(1383183338, i); + RMApp app = storeApp(store, appId, 123456789, 987654321); + appList.add(app); + } + + Assert.assertEquals(NUM_APPS, appList.size()); + for (RMApp app : appList) { + // wait for app to be stored. + while (!stateStoreHelper.appExists(app)) + ; + } + + for (RMApp app : appList) { + // remove the app + store.removeApplication(app); + // wait for app to be removed. + while (stateStoreHelper.appExists(app)) + ; + } + } } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/TestFSRMStateStore.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/TestFSRMStateStore.java index 4df1c3b..27e8411 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/TestFSRMStateStore.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/TestFSRMStateStore.java @@ -20,6 +20,7 @@ import static org.junit.Assert.assertTrue; +import java.io.IOException; import java.util.concurrent.atomic.AtomicBoolean; import junit.framework.Assert; @@ -38,6 +39,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.RMStateVersion; import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.impl.pb.ApplicationStateDataPBImpl; import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.impl.pb.RMStateVersionPBImpl; +import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppState; import org.apache.hadoop.yarn.util.ConverterUtils; import org.junit.Test; @@ -69,6 +71,13 @@ public Path getVersionNode() { public RMStateVersion getCurrentVersion() { return CURRENT_VERSION_INFO; } + + public Path getAppDir(String appId) { + Path rootDir = new Path(workingDirPathURI, ROOT_DIR_NAME); + Path appRootDir = new Path(rootDir, RM_APP_ROOT); + Path appDir = new Path(appRootDir, appId); + return appDir; + } } public TestFSRMStateStoreTester(MiniDFSCluster cluster) throws Exception { @@ -109,9 +118,16 @@ public void writeVersion(RMStateVersion version) throws Exception { public RMStateVersion getCurrentVersion() throws Exception { return store.getCurrentVersion(); } + + public boolean appExists(RMApp app) throws IOException { + FileSystem fs = cluster.getFileSystem(); + Path nodePath = + store.getAppDir(app.getApplicationId().toString()); + return fs.exists(nodePath); + } } - @Test + @Test(timeout = 60000) public void testFSRMStateStore() throws Exception { HdfsConfiguration conf = new HdfsConfiguration(); MiniDFSCluster cluster = @@ -126,11 +142,8 @@ public void testFSRMStateStore() throws Exception { String appAttemptIdStr3 = "appattempt_1352994193343_0001_000003"; ApplicationAttemptId attemptId3 = ConverterUtils.toApplicationAttemptId(appAttemptIdStr3); - Path rootDir = - new Path(fileSystemRMStateStore.fsWorkingPath, "FSRMStateRoot"); - Path appRootDir = new Path(rootDir, "RMAppRoot"); Path appDir = - new Path(appRootDir, attemptId3.getApplicationId().toString()); + fsTester.store.getAppDir(attemptId3.getApplicationId().toString()); Path tempAppAttemptFile = new Path(appDir, attemptId3.toString() + ".tmp"); fsOut = fileSystemRMStateStore.fs.create(tempAppAttemptFile, false); @@ -138,10 +151,11 @@ public void testFSRMStateStore() throws Exception { fsOut.close(); testRMAppStateStore(fsTester); - Assert.assertFalse(fileSystemRMStateStore.fsWorkingPath + Assert.assertFalse(fsTester.workingDirPathURI .getFileSystem(conf).exists(tempAppAttemptFile)); testRMDTSecretManagerStateStore(fsTester); testCheckVersion(fsTester); + testAppDeletion(fsTester); } finally { cluster.shutdown(); } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/TestZKRMStateStore.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/TestZKRMStateStore.java index 19121d8..924a30f 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/TestZKRMStateStore.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/TestZKRMStateStore.java @@ -46,7 +46,9 @@ import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager; import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.RMStateVersion; import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.impl.pb.RMStateVersionPBImpl; +import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp; import org.apache.zookeeper.ZooKeeper; +import org.apache.zookeeper.data.Stat; import org.junit.Test; public class TestZKRMStateStore extends RMStateStoreTestBase { @@ -57,6 +59,7 @@ ZooKeeper client; TestZKRMStateStoreInternal store; + String workingZnode; class TestZKRMStateStoreInternal extends ZKRMStateStore { @@ -79,11 +82,16 @@ public String getVersionNode() { public RMStateVersion getCurrentVersion() { return CURRENT_VERSION_INFO; } + + public String getAppNode(String appId) { + return workingZnode + "/" + ROOT_ZNODE_NAME + "/" + RM_APP_ROOT + "/" + + appId; + } } public RMStateStore getRMStateStore() throws Exception { - String workingZnode = "/Test"; - Configuration conf = new YarnConfiguration(); + YarnConfiguration conf = new YarnConfiguration(); + workingZnode = "/Test"; conf.set(YarnConfiguration.ZK_RM_STATE_STORE_ADDRESS, hostPort); conf.set(YarnConfiguration.ZK_RM_STATE_STORE_PARENT_PATH, workingZnode); this.client = createClient(); @@ -107,14 +115,22 @@ public void writeVersion(RMStateVersion version) throws Exception { public RMStateVersion getCurrentVersion() throws Exception { return store.getCurrentVersion(); } + + public boolean appExists(RMApp app) throws Exception { + Stat node = + client.exists(store.getAppNode(app.getApplicationId().toString()), + false); + return node !=null; + } } - @Test + @Test (timeout = 60000) public void testZKRMStateStoreRealZK() throws Exception { TestZKRMStateStoreTester zkTester = new TestZKRMStateStoreTester(); testRMAppStateStore(zkTester); testRMDTSecretManagerStateStore(zkTester); testCheckVersion(zkTester); + testAppDeletion(zkTester); } private Configuration createHARMConf( diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/TestZKRMStateStoreZKClientConnections.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/TestZKRMStateStoreZKClientConnections.java index 3def83c..f271370 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/TestZKRMStateStoreZKClientConnections.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/TestZKRMStateStoreZKClientConnections.java @@ -120,7 +120,7 @@ public void testZKClientRetry() throws Exception { TestZKClient zkClientTester = new TestZKClient(); final String path = "/test"; YarnConfiguration conf = new YarnConfiguration(); - conf.setInt(YarnConfiguration.ZK_RM_STATE_STORE_TIMEOUT_MS, 100); + conf.setInt(YarnConfiguration.ZK_RM_STATE_STORE_TIMEOUT_MS, 1000); conf.setLong(YarnConfiguration.ZK_RM_STATE_STORE_RETRY_INTERVAL_MS, 100); final ZKRMStateStore store = (ZKRMStateStore) zkClientTester.getRMStateStore(conf);