diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java index d2a71bc..6ed378a 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java @@ -188,6 +188,9 @@ private static void addDeprecatedKeys() { public static final String RM_EPOCH = RM_PREFIX + "epoch"; public static final long DEFAULT_RM_EPOCH = 0L; + public static final String RM_EPOCH_RANGE = RM_PREFIX + "epoch"; + public static final long DEFAULT_RM_EPOCH_RANGE = Integer.MAX_VALUE; + /** The address of the applications manager interface in the RM.*/ public static final String RM_ADDRESS = RM_PREFIX + "address"; diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/FileSystemRMStateStore.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/FileSystemRMStateStore.java index 19297bc..b797283 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/FileSystemRMStateStore.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/FileSystemRMStateStore.java @@ -205,12 +205,12 @@ public synchronized long getAndIncrementEpoch() throws Exception { Epoch epoch = new EpochPBImpl(EpochProto.parseFrom(data)); currentEpoch = epoch.getEpoch(); // increment epoch and store it - byte[] storeData = Epoch.newInstance(currentEpoch + 1).getProto() + byte[] storeData = Epoch.newInstance(nextEpoch(currentEpoch)).getProto() .toByteArray(); updateFile(epochNodePath, storeData, false); } else { // initialize epoch file with 1 for the next time. - byte[] storeData = Epoch.newInstance(currentEpoch + 1).getProto() + byte[] storeData = Epoch.newInstance(nextEpoch(currentEpoch)).getProto() .toByteArray(); writeFileWithRetries(epochNodePath, storeData, false); } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/LeveldbRMStateStore.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/LeveldbRMStateStore.java index 36a8dfa..e7fb02f 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/LeveldbRMStateStore.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/LeveldbRMStateStore.java @@ -259,7 +259,7 @@ public synchronized long getAndIncrementEpoch() throws Exception { if (data != null) { currentEpoch = EpochProto.parseFrom(data).getEpoch(); } - EpochProto proto = Epoch.newInstance(currentEpoch + 1).getProto(); + EpochProto proto = Epoch.newInstance(nextEpoch(currentEpoch)).getProto(); db.put(dbKeyBytes, proto.toByteArray()); } catch (DBException e) { throw new IOException(e); diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/MemoryRMStateStore.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/MemoryRMStateStore.java index 5041000..219e10a 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/MemoryRMStateStore.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/MemoryRMStateStore.java @@ -59,7 +59,7 @@ public void checkVersion() throws Exception { @Override public synchronized long getAndIncrementEpoch() throws Exception { long currentEpoch = epoch; - epoch = epoch + 1; + epoch = nextEpoch(epoch); return currentEpoch; } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStore.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStore.java index b4dd378..b02f51d 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStore.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStore.java @@ -104,6 +104,7 @@ protected static final String VERSION_NODE = "RMVersionNode"; protected static final String EPOCH_NODE = "EpochNode"; protected long baseEpoch; + protected long epochRange; protected ResourceManager resourceManager; private final ReadLock readLock; private final WriteLock writeLock; @@ -732,6 +733,8 @@ protected void serviceInit(Configuration conf) throws Exception{ // read the base epoch value from conf baseEpoch = conf.getLong(YarnConfiguration.RM_EPOCH, YarnConfiguration.DEFAULT_RM_EPOCH); + epochRange = conf.getLong(YarnConfiguration.RM_EPOCH_RANGE, + YarnConfiguration.DEFAULT_RM_EPOCH_RANGE); initInternal(conf); } @@ -818,7 +821,14 @@ public void checkVersion() throws Exception { * Get the current epoch of RM and increment the value. */ public abstract long getAndIncrementEpoch() throws Exception; - + + /** + * Compute the next epoch value + */ + protected long nextEpoch(long epoch){ + return (epoch - baseEpoch + 1) % epochRange + baseEpoch; + } + /** * Blocking API * The derived class must recover state from the store and return a new diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/ZKRMStateStore.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/ZKRMStateStore.java index 9073910..de1f1ad 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/ZKRMStateStore.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/ZKRMStateStore.java @@ -491,13 +491,13 @@ public synchronized long getAndIncrementEpoch() throws Exception { Epoch epoch = new EpochPBImpl(EpochProto.parseFrom(data)); currentEpoch = epoch.getEpoch(); // increment epoch and store it - byte[] storeData = Epoch.newInstance(currentEpoch + 1).getProto() + byte[] storeData = Epoch.newInstance(nextEpoch(currentEpoch)).getProto() .toByteArray(); zkManager.safeSetData(epochNodePath, storeData, -1, zkAcl, fencingNodePath); } else { // initialize epoch node with 1 for the next time. - byte[] storeData = Epoch.newInstance(currentEpoch + 1).getProto() + byte[] storeData = Epoch.newInstance(nextEpoch(currentEpoch)).getProto() .toByteArray(); zkManager.safeCreate(epochNodePath, storeData, zkAcl, CreateMode.PERSISTENT, zkAcl, fencingNodePath);