diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java index 8aa136d..4357a42 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java @@ -188,6 +188,9 @@ private static void addDeprecatedKeys() { public static final String RM_EPOCH = RM_PREFIX + "epoch"; public static final long DEFAULT_RM_EPOCH = 0L; + public static final String RM_EPOCH_RANGE = RM_EPOCH + ".range"; + public static final long DEFAULT_RM_EPOCH_RANGE = Integer.MAX_VALUE; + /** The address of the applications manager interface in the RM.*/ public static final String RM_ADDRESS = RM_PREFIX + "address"; diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml index 85915c2..af7b131 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml @@ -677,6 +677,13 @@ </property> <property> + <description>The range of values above base epoch that the RM will use before + wrapping around</description> + <name>yarn.resourcemanager.epoch.range</name> + <value>2147483647</value> + </property> + + <property> <description>The list of RM nodes in the cluster when HA is enabled. See description of yarn.resourcemanager.ha .enabled for full details on how this is used.</description>
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/FileSystemRMStateStore.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/FileSystemRMStateStore.java index 19297bc..b797283 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/FileSystemRMStateStore.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/FileSystemRMStateStore.java @@ -205,12 +205,12 @@ public synchronized long getAndIncrementEpoch() throws Exception { Epoch epoch = new EpochPBImpl(EpochProto.parseFrom(data)); currentEpoch = epoch.getEpoch(); // increment epoch and store it - byte[] storeData = Epoch.newInstance(currentEpoch + 1).getProto() + byte[] storeData = Epoch.newInstance(nextEpoch(currentEpoch)).getProto() .toByteArray(); updateFile(epochNodePath, storeData, false); } else { // initialize epoch file with 1 for the next time. 
- byte[] storeData = Epoch.newInstance(currentEpoch + 1).getProto() + byte[] storeData = Epoch.newInstance(nextEpoch(currentEpoch)).getProto() .toByteArray(); writeFileWithRetries(epochNodePath, storeData, false); } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/LeveldbRMStateStore.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/LeveldbRMStateStore.java index 36a8dfa..e7fb02f 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/LeveldbRMStateStore.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/LeveldbRMStateStore.java @@ -259,7 +259,7 @@ public synchronized long getAndIncrementEpoch() throws Exception { if (data != null) { currentEpoch = EpochProto.parseFrom(data).getEpoch(); } - EpochProto proto = Epoch.newInstance(currentEpoch + 1).getProto(); + EpochProto proto = Epoch.newInstance(nextEpoch(currentEpoch)).getProto(); db.put(dbKeyBytes, proto.toByteArray()); } catch (DBException e) { throw new IOException(e); diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/MemoryRMStateStore.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/MemoryRMStateStore.java index 5041000..219e10a 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/MemoryRMStateStore.java +++ 
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/MemoryRMStateStore.java @@ -59,7 +59,7 @@ public void checkVersion() throws Exception { @Override public synchronized long getAndIncrementEpoch() throws Exception { long currentEpoch = epoch; - epoch = epoch + 1; + epoch = nextEpoch(epoch); return currentEpoch; } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStore.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStore.java index b4dd378..fd30363 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStore.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStore.java @@ -104,6 +104,7 @@ protected static final String VERSION_NODE = "RMVersionNode"; protected static final String EPOCH_NODE = "EpochNode"; protected long baseEpoch; + private long epochRange; protected ResourceManager resourceManager; private final ReadLock readLock; private final WriteLock writeLock; @@ -732,6 +733,8 @@ protected void serviceInit(Configuration conf) throws Exception{ // read the base epoch value from conf baseEpoch = conf.getLong(YarnConfiguration.RM_EPOCH, YarnConfiguration.DEFAULT_RM_EPOCH); + epochRange = conf.getLong(YarnConfiguration.RM_EPOCH_RANGE, + YarnConfiguration.DEFAULT_RM_EPOCH_RANGE); initInternal(conf); } @@ -818,7 +821,16 @@ public void checkVersion() throws Exception { * Get the current epoch of RM and increment the value. 
*/ public abstract long getAndIncrementEpoch() throws Exception; - + + /** + * Compute the next epoch value by incrementing by one, wrapping around + * once the epoch range is exceeded; a non-positive range disables wrapping. + */ + protected long nextEpoch(long epoch){ + long epochVal = epoch - baseEpoch + 1; + return baseEpoch + (epochRange > 0 ? epochVal % epochRange : epochVal); + } + /** * Blocking API * The derived class must recover state from the store and return a new diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/ZKRMStateStore.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/ZKRMStateStore.java index 9073910..de1f1ad 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/ZKRMStateStore.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/ZKRMStateStore.java @@ -491,13 +491,13 @@ public synchronized long getAndIncrementEpoch() throws Exception { Epoch epoch = new EpochPBImpl(EpochProto.parseFrom(data)); currentEpoch = epoch.getEpoch(); // increment epoch and store it - byte[] storeData = Epoch.newInstance(currentEpoch + 1).getProto() + byte[] storeData = Epoch.newInstance(nextEpoch(currentEpoch)).getProto() .toByteArray(); zkManager.safeSetData(epochNodePath, storeData, -1, zkAcl, fencingNodePath); } else { // initialize epoch node with 1 for the next time. 
- byte[] storeData = Epoch.newInstance(currentEpoch + 1).getProto() + byte[] storeData = Epoch.newInstance(nextEpoch(currentEpoch)).getProto() .toByteArray(); zkManager.safeCreate(epochNodePath, storeData, zkAcl, CreateMode.PERSISTENT, zkAcl, fencingNodePath); diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStoreTestBase.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStoreTestBase.java index 957d4ce..3454d72 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStoreTestBase.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStoreTestBase.java @@ -94,6 +94,8 @@ protected final long epoch = 10L; + private final long epochRange = 10L; + static class TestDispatcher implements Dispatcher, EventHandler { ApplicationAttemptId attemptId; @@ -141,6 +143,10 @@ void afterStoreAppAttempt(RMStateStore store, ApplicationAttemptId boolean attemptExists(RMAppAttempt attempt) throws Exception; } + public long getEpochRange() { + return epochRange; + } + void waitNotify(TestDispatcher dispatcher) { long startTime = System.currentTimeMillis(); while(!dispatcher.notified) { @@ -576,6 +582,14 @@ public void testEpoch(RMStateStoreHelper stateStoreHelper) long thirdTimeEpoch = store.getAndIncrementEpoch(); Assert.assertEquals(epoch + 2, thirdTimeEpoch); + + for (int i = 0; i < epochRange; ++i) { + store.getAndIncrementEpoch(); + } + long wrappedEpoch = store.getAndIncrementEpoch(); + // Epoch should have wrapped around and then incremented once for a total + // of + 3 + Assert.assertEquals(epoch + 3, 
wrappedEpoch); } public void testAppDeletion(RMStateStoreHelper stateStoreHelper) diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/TestFSRMStateStore.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/TestFSRMStateStore.java index fe4a701..14f5404 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/TestFSRMStateStore.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/TestFSRMStateStore.java @@ -118,6 +118,7 @@ public RMStateStore getRMStateStore() throws Exception { conf.setLong(YarnConfiguration.FS_RM_STATE_STORE_RETRY_INTERVAL_MS, 900L); conf.setLong(YarnConfiguration.RM_EPOCH, epoch); + conf.setLong(YarnConfiguration.RM_EPOCH_RANGE, getEpochRange()); if (adminCheckEnable) { conf.setBoolean( YarnConfiguration.YARN_INTERMEDIATE_DATA_ENCRYPTION, true); diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/TestLeveldbRMStateStore.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/TestLeveldbRMStateStore.java index afd0c77..576ee7f 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/TestLeveldbRMStateStore.java +++ 
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/TestLeveldbRMStateStore.java @@ -83,6 +83,7 @@ public void testVersion() throws Exception { @Test(timeout = 60000) public void testEpoch() throws Exception { conf.setLong(YarnConfiguration.RM_EPOCH, epoch); + conf.setLong(YarnConfiguration.RM_EPOCH_RANGE, getEpochRange()); LeveldbStateStoreTester tester = new LeveldbStateStoreTester(); testEpoch(tester); } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/TestZKRMStateStore.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/TestZKRMStateStore.java index d8718e0..4cba266 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/TestZKRMStateStore.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/TestZKRMStateStore.java @@ -210,6 +210,7 @@ private RMStateStore createStore(Configuration conf) throws Exception { curatorTestingServer.getConnectString()); conf.set(YarnConfiguration.ZK_RM_STATE_STORE_PARENT_PATH, workingZnode); conf.setLong(YarnConfiguration.RM_EPOCH, epoch); + conf.setLong(YarnConfiguration.RM_EPOCH_RANGE, getEpochRange()); this.store = new TestZKRMStateStoreInternal(conf, workingZnode); return this.store; }