diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java index 075a39c..dfce04e 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java @@ -623,6 +623,11 @@ public class YarnConfiguration extends Configuration { public static final int DEFAULT_RM_STATE_STORE_MAX_COMPLETED_APPLICATIONS = DEFAULT_RM_MAX_COMPLETED_APPLICATIONS; + public static final String RM_STATE_STORE_MAX_COMPLETED_ATTEMPTS_PER_APP = + RM_PREFIX + "state-store.max-completed-attempts-per-app"; + public static final int DEFAULT_RM_STATE_STORE_MAX_COMPLETED_ATTEMPTS_PER_APP = + 1000; + /** Default application name */ public static final String DEFAULT_APPLICATION_NAME = "N/A"; diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStore.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStore.java index 5e2ff6c..8de127a 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStore.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStore.java @@ -1006,6 +1006,10 @@ public abstract class RMStateStore extends AbstractService { this.resourceManager = rm; } + public ResourceManager getResourceManager() { + return this.resourceManager; + } + private class 
StandByTransitionThread implements Runnable { @Override public void run() { diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/ZKRMStateStore.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/ZKRMStateStore.java index be211d3..8a2af7a 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/ZKRMStateStore.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/ZKRMStateStore.java @@ -105,6 +105,7 @@ public class ZKRMStateStore extends RMStateStore { private static final String RM_DT_MASTER_KEYS_ROOT_ZNODE_NAME = "RMDTMasterKeysRoot"; private int numRetries; + private int maxCompletedAttemptsPerAppInStateStore; private int serializeHeadAndTailSize = 0; private int serializeDeleteFixSize = 0; private long deleteMultiOpSizeLimit = 0; @@ -240,6 +241,11 @@ public class ZKRMStateStore extends RMStateStore { numRetries = conf.getInt(YarnConfiguration.RM_ZK_NUM_RETRIES, YarnConfiguration.DEFAULT_ZK_RM_NUM_RETRIES); + + maxCompletedAttemptsPerAppInStateStore = + conf.getInt(YarnConfiguration.RM_STATE_STORE_MAX_COMPLETED_ATTEMPTS_PER_APP, + YarnConfiguration.DEFAULT_RM_STATE_STORE_MAX_COMPLETED_ATTEMPTS_PER_APP); + serializeHeadAndTailSize = conf.getInt(YarnConfiguration.RM_ZK_SERIALIZE_HEADEANDTAIL_SIZE, YarnConfiguration.DEFAULT_RM_ZK_SERIALIZE_HEADEANDTAIL_SIZE); serializeDeleteFixSize = conf.getInt(YarnConfiguration.RM_ZK_SERIALIZE_DELETEFIX_SIZE, @@ -680,6 +686,7 @@ public class ZKRMStateStore extends RMStateStore { throws Exception { String appDirPath = getNodePath(rmAppRoot, appAttemptId.getApplicationId().toString()); + 
String nodeCreatePath = getNodePath(appDirPath, appAttemptId.toString()); if (LOG.isDebugEnabled()) { @@ -689,6 +696,29 @@ public class ZKRMStateStore extends RMStateStore { byte[] attemptStateData = attemptStateDataPB.getProto().toByteArray(); createWithRetries(nodeCreatePath, attemptStateData, zkAcl, CreateMode.PERSISTENT); + + // delete the earliest attempt from ZooKeeper statestore and memory + List<String> attempts = getChildrenWithRetries(appDirPath, false); + if (attempts.size() > maxCompletedAttemptsPerAppInStateStore) { + String attemptToDeleted = ""; + int attemptID = Integer.MAX_VALUE; + for (String attempt : attempts){ + int tmpID = Integer.parseInt(attempt.substring(attempt.lastIndexOf('_') + 1)); + if (attemptID > tmpID){ + attemptID = tmpID; + attemptToDeleted = attempt; + } + } + if (!"".equals(attemptToDeleted)) { + String attemptPath = getNodePath(appDirPath, attemptToDeleted); + deleteWithRetries(attemptPath, false); + ApplicationAttemptId appAttemptIdToDeleted = ApplicationAttemptId.newInstance( + appAttemptId.getApplicationId(), attemptID); + this.getResourceManager().getRMContext().getRMApps().get( + appAttemptId.getApplicationId()).getAppAttempts().remove(appAttemptIdToDeleted); + LOG.info("Delete " + attemptPath + " from ZooKeeper and memory"); + } + } } @Override diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java index 17f103b..e66c371 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java +++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java @@ -535,7 +535,7 @@ public class RMAppImpl implements RMApp, Recoverable { this.readLock.lock(); try { - return Collections.unmodifiableMap(this.attempts); + return this.attempts; } finally { this.readLock.unlock(); } @@ -771,9 +771,9 @@ public class RMAppImpl implements RMApp, Recoverable { this.storedFinishTime = appState.getFinishTime(); this.startTime = appState.getStartTime(); - for(int i=0; i