diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
index 015baa1..15446ec 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
@@ -506,6 +506,15 @@ private static void addDeprecatedKeys() {
public static final String DEFAULT_FS_RM_STATE_STORE_RETRY_POLICY_SPEC =
"2000, 500";
+ public static final String FS_RM_STATE_STORE_NUM_RETRIES = RM_PREFIX
+ + "fs.state-store.num-retries";
+ public static final int DEFAULT_FS_RM_STATE_STORE_NUM_RETRIES = 0;
+
+ public static final String FS_RM_STATE_STORE_RETRY_INTERVAL_MS = RM_PREFIX
+ + "fs.state-store.retry-interval-ms";
+ public static final long DEFAULT_FS_RM_STATE_STORE_RETRY_INTERVAL_MS =
+ 1000L;
+
public static final String RM_LEVELDB_STORE_PATH = RM_PREFIX
+ "leveldb-state-store.path";
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml
index 349e57b..f574388 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml
@@ -420,6 +420,21 @@
+ The number of retries to recover from IOException in
+ FileSystemRMStateStore.
+
+ yarn.resourcemanager.fs.state-store.num-retries
+ 0
+
+
+
+ Retry interval in milliseconds in FileSystemRMStateStore.
+
+ yarn.resourcemanager.fs.state-store.retry-interval-ms
+ 1000
+
+
+
Local path where the RM state will be stored when using
org.apache.hadoop.yarn.server.resourcemanager.recovery.LeveldbRMStateStore
as the value for yarn.resourcemanager.store.class
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/FileSystemRMStateStore.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/FileSystemRMStateStore.java
index 6e830a0..4e71fed 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/FileSystemRMStateStore.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/FileSystemRMStateStore.java
@@ -92,6 +92,8 @@
Path rmDTSecretManagerRoot;
private Path rmAppRoot;
private Path dtSequenceNumberPath = null;
+ private int fsNumRetries;
+ private long fsRetryInterval;
@VisibleForTesting
Path fsWorkingPath;
@@ -106,6 +108,12 @@ public synchronized void initInternal(Configuration conf)
rmAppRoot = new Path(rootDirPath, RM_APP_ROOT);
amrmTokenSecretManagerRoot =
new Path(rootDirPath, AMRMTOKEN_SECRET_MANAGER_ROOT);
+ fsNumRetries =
+ conf.getInt(YarnConfiguration.FS_RM_STATE_STORE_NUM_RETRIES,
+ YarnConfiguration.DEFAULT_FS_RM_STATE_STORE_NUM_RETRIES);
+ fsRetryInterval =
+ conf.getLong(YarnConfiguration.FS_RM_STATE_STORE_RETRY_INTERVAL_MS,
+ YarnConfiguration.DEFAULT_FS_RM_STATE_STORE_RETRY_INTERVAL_MS);
}
@Override
@@ -157,7 +165,7 @@ protected synchronized void storeVersion() throws Exception {
if (fs.exists(versionNodePath)) {
updateFile(versionNodePath, data);
} else {
- writeFile(versionNodePath, data);
+ writeFileWithRetries(versionNodePath, data);
}
}
@@ -179,7 +187,7 @@ public synchronized long getAndIncrementEpoch() throws Exception {
// initialize epoch file with 1 for the next time.
byte[] storeData = Epoch.newInstance(currentEpoch + 1).getProto()
.toByteArray();
- writeFile(epochNodePath, storeData);
+ writeFileWithRetries(epochNodePath, storeData);
}
return currentEpoch;
}
@@ -361,7 +369,7 @@ private void loadRMDTSecretManagerState(RMState rmState) throws Exception {
public synchronized void storeApplicationStateInternal(ApplicationId appId,
ApplicationStateData appStateDataPB) throws Exception {
Path appDirPath = getAppDir(rmAppRoot, appId);
- fs.mkdirs(appDirPath);
+ mkdirsWithRetries(appDirPath);
Path nodeCreatePath = getNodePath(appDirPath, appId.toString());
LOG.info("Storing info for app: " + appId + " at: " + nodeCreatePath);
@@ -369,7 +377,7 @@ public synchronized void storeApplicationStateInternal(ApplicationId appId,
try {
// currently throw all exceptions. May need to respond differently for HA
// based on whether we have lost the right to write to FS
- writeFile(nodeCreatePath, appStateData);
+ writeFileWithRetries(nodeCreatePath, appStateData);
} catch (Exception e) {
LOG.info("Error storing info for app: " + appId, e);
throw e;
@@ -408,7 +416,7 @@ public synchronized void storeApplicationAttemptStateInternal(
try {
// currently throw all exceptions. May need to respond differently for HA
// based on whether we have lost the right to write to FS
- writeFile(nodeCreatePath, attemptStateData);
+ writeFileWithRetries(nodeCreatePath, attemptStateData);
} catch (Exception e) {
LOG.info("Error storing info for attempt: " + appAttemptId, e);
throw e;
@@ -483,7 +491,7 @@ private void storeOrUpdateRMDelegationTokenState(
updateFile(nodeCreatePath, identifierData.toByteArray());
} else {
LOG.info("Storing RMDelegationToken_" + identifier.getSequenceNumber());
- writeFile(nodeCreatePath, identifierData.toByteArray());
+ writeFileWithRetries(nodeCreatePath, identifierData.toByteArray());
// store sequence number
Path latestSequenceNumberPath = getNodePath(rmDTSecretManagerRoot,
@@ -513,7 +521,7 @@ public synchronized void storeRMDTMasterKeyState(DelegationKey masterKey)
DataOutputStream fsOut = new DataOutputStream(os);
LOG.info("Storing RMDelegationKey_" + masterKey.getKeyId());
masterKey.write(fsOut);
- writeFile(nodeCreatePath, os.toByteArray());
+ writeFileWithRetries(nodeCreatePath, os.toByteArray());
fsOut.close();
}
@@ -533,6 +541,43 @@ public synchronized void deleteStore() throws IOException {
}
}
+ private void mkdirsWithRetries(Path appDirPath) throws Exception {
+   // attempt is 1-based; the Nth failure is retried only if N <= fsNumRetries
+   for (int attempt = 1; ; attempt++) {
+     try {
+       fs.mkdirs(appDirPath);
+       return;
+     } catch (IOException ioe) {
+       LOG.info("Exception from mkdirs.", ioe);
+       if (attempt > fsNumRetries) {
+         LOG.info("Maxed out mkdirs retries. Giving up!");
+         throw ioe;
+       }
+       LOG.info("Retrying mkdirs. Retry no. " + attempt);
+       Thread.sleep(fsRetryInterval);
+     }
+   }
+ }
+
+ private void writeFileWithRetries(Path outputPath, byte[] data)
+     throws Exception {
+   // attempt is 1-based; the Nth failure is retried only if N <= fsNumRetries
+   for (int attempt = 1; ; attempt++) {
+     try {
+       writeFile(outputPath, data);
+       return;
+     } catch (IOException ioe) {
+       LOG.info("Exception from writeFile.", ioe);
+       if (attempt > fsNumRetries) {
+         LOG.info("Maxed out writeFile retries. Giving up!");
+         throw ioe;
+       }
+       LOG.info("Retrying writeFile. Retry no. " + attempt);
+       Thread.sleep(fsRetryInterval);
+     }
+   }
+ }
+
private Path getAppDir(Path root, ApplicationId appId) {
return getNodePath(root, appId.toString());
}
@@ -595,8 +640,8 @@ private void writeFile(Path outputPath, byte[] data) throws Exception {
*/
protected void updateFile(Path outputPath, byte[] data) throws Exception {
Path newPath = new Path(outputPath.getParent(), outputPath.getName() + ".new");
- // use writeFile to make sure .new file is created atomically
- writeFile(newPath, data);
+ // use writeFileWithRetries (retrying writeFile) so .new file is created atomically
+ writeFileWithRetries(newPath, data);
replaceFile(newPath, outputPath);
}
@@ -637,8 +682,17 @@ public synchronized void storeOrUpdateAMRMTokenSecretManagerState(
if (isUpdate) {
updateFile(nodeCreatePath, stateData);
} else {
- writeFile(nodeCreatePath, stateData);
+ writeFileWithRetries(nodeCreatePath, stateData);
}
}
+ @VisibleForTesting
+ public int getNumRetries() {
+   return this.fsNumRetries; // loaded from configuration in initInternal
+ }
+
+ @VisibleForTesting
+ public long getRetryInterval() {
+   return this.fsRetryInterval; // loaded from configuration in initInternal
+ }
}
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/TestFSRMStateStore.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/TestFSRMStateStore.java
index d0d19e3..3cedd4f 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/TestFSRMStateStore.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/TestFSRMStateStore.java
@@ -100,7 +100,12 @@ public RMStateStore getRMStateStore() throws Exception {
workingDirPathURI.toString());
conf.set(YarnConfiguration.FS_RM_STATE_STORE_RETRY_POLICY_SPEC,
"100,6000");
+ conf.setInt(YarnConfiguration.FS_RM_STATE_STORE_NUM_RETRIES, 1);
+ conf.setLong(YarnConfiguration.FS_RM_STATE_STORE_RETRY_INTERVAL_MS,
+ 900L);
this.store = new TestFileSystemRMStore(conf);
+ Assert.assertEquals(store.getNumRetries(), 1);
+ Assert.assertEquals(store.getRetryInterval(), 900L);
return store;
}