diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
index 8aa136d..4357a42 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
@@ -188,6 +188,9 @@ private static void addDeprecatedKeys() {
public static final String RM_EPOCH = RM_PREFIX + "epoch";
public static final long DEFAULT_RM_EPOCH = 0L;
+ /** Config key for the range of epoch values above the base epoch
+ * ({@link #RM_EPOCH}) that the RM uses before wrapping around. */
+ public static final String RM_EPOCH_RANGE = RM_EPOCH + ".range";
+ /** Default value used when {@link #RM_EPOCH_RANGE} is not configured. */
+ public static final long DEFAULT_RM_EPOCH_RANGE = Integer.MAX_VALUE;
+
/** The address of the applications manager interface in the RM.*/
public static final String RM_ADDRESS =
RM_PREFIX + "address";
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml
index 85915c2..af7b131 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml
@@ -677,6 +677,13 @@
+ The number of epoch values, starting at the base epoch, that the RM
+ cycles through before wrapping back to the base epoch.
+ yarn.resourcemanager.epoch.range
+ 2147483647
+
+
+
The list of RM nodes in the cluster when HA is
enabled. See description of yarn.resourcemanager.ha
.enabled for full details on how this is used.
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/FileSystemRMStateStore.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/FileSystemRMStateStore.java
index 19297bc..b797283 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/FileSystemRMStateStore.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/FileSystemRMStateStore.java
@@ -205,12 +205,12 @@ public synchronized long getAndIncrementEpoch() throws Exception {
Epoch epoch = new EpochPBImpl(EpochProto.parseFrom(data));
currentEpoch = epoch.getEpoch();
// increment epoch and store it
- byte[] storeData = Epoch.newInstance(currentEpoch + 1).getProto()
+ byte[] storeData = Epoch.newInstance(nextEpoch(currentEpoch)).getProto()
.toByteArray();
updateFile(epochNodePath, storeData, false);
} else {
// initialize epoch file with 1 for the next time.
- byte[] storeData = Epoch.newInstance(currentEpoch + 1).getProto()
+ byte[] storeData = Epoch.newInstance(nextEpoch(currentEpoch)).getProto()
.toByteArray();
writeFileWithRetries(epochNodePath, storeData, false);
}
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/LeveldbRMStateStore.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/LeveldbRMStateStore.java
index 36a8dfa..e7fb02f 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/LeveldbRMStateStore.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/LeveldbRMStateStore.java
@@ -259,7 +259,7 @@ public synchronized long getAndIncrementEpoch() throws Exception {
if (data != null) {
currentEpoch = EpochProto.parseFrom(data).getEpoch();
}
- EpochProto proto = Epoch.newInstance(currentEpoch + 1).getProto();
+ EpochProto proto = Epoch.newInstance(nextEpoch(currentEpoch)).getProto();
db.put(dbKeyBytes, proto.toByteArray());
} catch (DBException e) {
throw new IOException(e);
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/MemoryRMStateStore.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/MemoryRMStateStore.java
index 5041000..219e10a 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/MemoryRMStateStore.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/MemoryRMStateStore.java
@@ -59,7 +59,7 @@ public void checkVersion() throws Exception {
@Override
public synchronized long getAndIncrementEpoch() throws Exception {
long currentEpoch = epoch;
- epoch = epoch + 1;
+ epoch = nextEpoch(epoch);
return currentEpoch;
}
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStore.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStore.java
index b4dd378..fd30363 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStore.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStore.java
@@ -104,6 +104,7 @@
protected static final String VERSION_NODE = "RMVersionNode";
protected static final String EPOCH_NODE = "EpochNode";
protected long baseEpoch;
+ private long epochRange;
protected ResourceManager resourceManager;
private final ReadLock readLock;
private final WriteLock writeLock;
@@ -732,6 +733,8 @@ protected void serviceInit(Configuration conf) throws Exception{
// read the base epoch value from conf
baseEpoch = conf.getLong(YarnConfiguration.RM_EPOCH,
YarnConfiguration.DEFAULT_RM_EPOCH);
+ epochRange = conf.getLong(YarnConfiguration.RM_EPOCH_RANGE,
+ YarnConfiguration.DEFAULT_RM_EPOCH_RANGE);
initInternal(conf);
}
@@ -818,7 +821,16 @@ public void checkVersion() throws Exception {
* Get the current epoch of RM and increment the value.
*/
public abstract long getAndIncrementEpoch() throws Exception;
-
+
+ /**
+ * Compute the next epoch value by incrementing by one.
+ * Wraps around to the base epoch once the epoch range is exceeded. A
+ * non-positive epoch range disables wrap-around instead of failing with
+ * an ArithmeticException (integer remainder by zero).
+ *
+ * @param epoch the current epoch value
+ * @return the epoch value that follows {@code epoch}
+ */
+ protected long nextEpoch(long epoch){
+ long epochVal = epoch - baseEpoch + 1;
+ // Only wrap for a usable range; a range of 0 would divide by zero.
+ if (epochRange > 0) {
+ epochVal %= epochRange;
+ }
+ return epochVal + baseEpoch;
+ }
+
/**
* Blocking API
* The derived class must recover state from the store and return a new
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/ZKRMStateStore.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/ZKRMStateStore.java
index 9073910..de1f1ad 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/ZKRMStateStore.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/ZKRMStateStore.java
@@ -491,13 +491,13 @@ public synchronized long getAndIncrementEpoch() throws Exception {
Epoch epoch = new EpochPBImpl(EpochProto.parseFrom(data));
currentEpoch = epoch.getEpoch();
// increment epoch and store it
- byte[] storeData = Epoch.newInstance(currentEpoch + 1).getProto()
+ byte[] storeData = Epoch.newInstance(nextEpoch(currentEpoch)).getProto()
.toByteArray();
zkManager.safeSetData(epochNodePath, storeData, -1, zkAcl,
fencingNodePath);
} else {
// initialize epoch node with 1 for the next time.
- byte[] storeData = Epoch.newInstance(currentEpoch + 1).getProto()
+ byte[] storeData = Epoch.newInstance(nextEpoch(currentEpoch)).getProto()
.toByteArray();
zkManager.safeCreate(epochNodePath, storeData, zkAcl,
CreateMode.PERSISTENT, zkAcl, fencingNodePath);
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStoreTestBase.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStoreTestBase.java
index 957d4ce..3454d72 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStoreTestBase.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStoreTestBase.java
@@ -94,6 +94,8 @@
protected final long epoch = 10L;
+ private final long epochRange = 10L;
+
static class TestDispatcher implements Dispatcher, EventHandler {
ApplicationAttemptId attemptId;
@@ -141,6 +143,10 @@ void afterStoreAppAttempt(RMStateStore store, ApplicationAttemptId
boolean attemptExists(RMAppAttempt attempt) throws Exception;
}
+ /**
+ * @return the epoch range that the state store tests write to
+ * {@code YarnConfiguration.RM_EPOCH_RANGE}
+ */
+ public long getEpochRange() {
+ return this.epochRange;
+ }
+
void waitNotify(TestDispatcher dispatcher) {
long startTime = System.currentTimeMillis();
while(!dispatcher.notified) {
@@ -576,6 +582,14 @@ public void testEpoch(RMStateStoreHelper stateStoreHelper)
long thirdTimeEpoch = store.getAndIncrementEpoch();
Assert.assertEquals(epoch + 2, thirdTimeEpoch);
+
+ for (int i = 0; i < epochRange; ++i) {
+ store.getAndIncrementEpoch();
+ }
+ long wrappedEpoch = store.getAndIncrementEpoch();
+ // The epochRange calls above cycle the stored epoch through the whole
+ // range, wrapping it back to the value it had before the loop: epoch + 3
+ Assert.assertEquals(epoch + 3, wrappedEpoch);
}
public void testAppDeletion(RMStateStoreHelper stateStoreHelper)
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/TestFSRMStateStore.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/TestFSRMStateStore.java
index fe4a701..14f5404 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/TestFSRMStateStore.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/TestFSRMStateStore.java
@@ -118,6 +118,7 @@ public RMStateStore getRMStateStore() throws Exception {
conf.setLong(YarnConfiguration.FS_RM_STATE_STORE_RETRY_INTERVAL_MS,
900L);
conf.setLong(YarnConfiguration.RM_EPOCH, epoch);
+ conf.setLong(YarnConfiguration.RM_EPOCH_RANGE, getEpochRange());
if (adminCheckEnable) {
conf.setBoolean(
YarnConfiguration.YARN_INTERMEDIATE_DATA_ENCRYPTION, true);
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/TestLeveldbRMStateStore.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/TestLeveldbRMStateStore.java
index afd0c77..576ee7f 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/TestLeveldbRMStateStore.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/TestLeveldbRMStateStore.java
@@ -83,6 +83,7 @@ public void testVersion() throws Exception {
@Test(timeout = 60000)
public void testEpoch() throws Exception {
conf.setLong(YarnConfiguration.RM_EPOCH, epoch);
+ conf.setLong(YarnConfiguration.RM_EPOCH_RANGE, getEpochRange());
LeveldbStateStoreTester tester = new LeveldbStateStoreTester();
testEpoch(tester);
}
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/TestZKRMStateStore.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/TestZKRMStateStore.java
index d8718e0..4cba266 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/TestZKRMStateStore.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/TestZKRMStateStore.java
@@ -210,6 +210,7 @@ private RMStateStore createStore(Configuration conf) throws Exception {
curatorTestingServer.getConnectString());
conf.set(YarnConfiguration.ZK_RM_STATE_STORE_PARENT_PATH, workingZnode);
conf.setLong(YarnConfiguration.RM_EPOCH, epoch);
+ conf.setLong(YarnConfiguration.RM_EPOCH_RANGE, getEpochRange());
this.store = new TestZKRMStateStoreInternal(conf, workingZnode);
return this.store;
}