diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java index 05c6cbf..8dc3850 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java @@ -508,6 +508,15 @@ private static void addDeprecatedKeys() { public static final String DEFAULT_FS_RM_STATE_STORE_RETRY_POLICY_SPEC = "2000, 500"; + public static final String FS_RM_STATE_STORE_NUM_RETRIES = RM_PREFIX + + "fs.state-store.num-retries"; + public static final int DEFAULT_FS_RM_STATE_STORE_NUM_RETRIES = 0; + + public static final String FS_RM_STATE_STORE_RETRY_INTERVAL_MS = RM_PREFIX + + "fs.state-store.retry-interval-ms"; + public static final long DEFAULT_FS_RM_STATE_STORE_RETRY_INTERVAL_MS = + 1000L; + public static final String RM_LEVELDB_STORE_PATH = RM_PREFIX + "leveldb-state-store.path"; diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml index a7958a5..df730d5 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml @@ -420,6 +420,21 @@ + the number of retries to recover from IOException in + FileSystemRMStateStore. + + yarn.resourcemanager.fs.state-store.num-retries + 0 + + + + Retry interval in milliseconds in FileSystemRMStateStore. + + yarn.resourcemanager.fs.state-store.retry-interval-ms + 1000 + + + Local path where the RM state will be stored when using org.apache.hadoop.yarn.server.resourcemanager.recovery.LeveldbRMStateStore as the value for yarn.resourcemanager.store.class diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/FileSystemRMStateStore.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/FileSystemRMStateStore.java index 6e830a0..4e71fed 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/FileSystemRMStateStore.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/FileSystemRMStateStore.java @@ -92,6 +92,8 @@ Path rmDTSecretManagerRoot; private Path rmAppRoot; private Path dtSequenceNumberPath = null; + private int fsNumRetries; + private long fsRetryInterval; @VisibleForTesting Path fsWorkingPath; @@ -106,6 +108,12 @@ public synchronized void initInternal(Configuration conf) rmAppRoot = new Path(rootDirPath, RM_APP_ROOT); amrmTokenSecretManagerRoot = new Path(rootDirPath, AMRMTOKEN_SECRET_MANAGER_ROOT); + fsNumRetries = + conf.getInt(YarnConfiguration.FS_RM_STATE_STORE_NUM_RETRIES, + YarnConfiguration.DEFAULT_FS_RM_STATE_STORE_NUM_RETRIES); + fsRetryInterval = + conf.getLong(YarnConfiguration.FS_RM_STATE_STORE_RETRY_INTERVAL_MS, + YarnConfiguration.DEFAULT_FS_RM_STATE_STORE_RETRY_INTERVAL_MS); } @Override @@ -157,7 +165,7 @@ protected synchronized void storeVersion() throws Exception { if (fs.exists(versionNodePath)) { updateFile(versionNodePath, data); } else { - writeFile(versionNodePath, data); + writeFileWithRetries(versionNodePath, data); } } @@ -179,7 +187,7 @@ public synchronized long getAndIncrementEpoch() throws Exception { // initialize epoch file with 1 for the next time. byte[] storeData = Epoch.newInstance(currentEpoch + 1).getProto() .toByteArray(); - writeFile(epochNodePath, storeData); + writeFileWithRetries(epochNodePath, storeData); } return currentEpoch; } @@ -361,7 +369,7 @@ private void loadRMDTSecretManagerState(RMState rmState) throws Exception { public synchronized void storeApplicationStateInternal(ApplicationId appId, ApplicationStateData appStateDataPB) throws Exception { Path appDirPath = getAppDir(rmAppRoot, appId); - fs.mkdirs(appDirPath); + mkdirsWithRetries(appDirPath); Path nodeCreatePath = getNodePath(appDirPath, appId.toString()); LOG.info("Storing info for app: " + appId + " at: " + nodeCreatePath); @@ -369,7 +377,7 @@ public synchronized void storeApplicationStateInternal(ApplicationId appId, try { // currently throw all exceptions. May need to respond differently for HA // based on whether we have lost the right to write to FS - writeFile(nodeCreatePath, appStateData); + writeFileWithRetries(nodeCreatePath, appStateData); } catch (Exception e) { LOG.info("Error storing info for app: " + appId, e); throw e; @@ -408,7 +416,7 @@ public synchronized void storeApplicationAttemptStateInternal( try { // currently throw all exceptions. May need to respond differently for HA // based on whether we have lost the right to write to FS - writeFile(nodeCreatePath, attemptStateData); + writeFileWithRetries(nodeCreatePath, attemptStateData); } catch (Exception e) { LOG.info("Error storing info for attempt: " + appAttemptId, e); throw e; @@ -483,7 +491,7 @@ private void storeOrUpdateRMDelegationTokenState( updateFile(nodeCreatePath, identifierData.toByteArray()); } else { LOG.info("Storing RMDelegationToken_" + identifier.getSequenceNumber()); - writeFile(nodeCreatePath, identifierData.toByteArray()); + writeFileWithRetries(nodeCreatePath, identifierData.toByteArray()); // store sequence number Path latestSequenceNumberPath = getNodePath(rmDTSecretManagerRoot, @@ -513,7 +521,7 @@ public synchronized void storeRMDTMasterKeyState(DelegationKey masterKey) DataOutputStream fsOut = new DataOutputStream(os); LOG.info("Storing RMDelegationKey_" + masterKey.getKeyId()); masterKey.write(fsOut); - writeFile(nodeCreatePath, os.toByteArray()); + writeFileWithRetries(nodeCreatePath, os.toByteArray()); fsOut.close(); } @@ -533,6 +541,43 @@ public synchronized void deleteStore() throws IOException { } } + private void mkdirsWithRetries(Path appDirPath) throws Exception { + int retry = 0; + while (true) { + try { + fs.mkdirs(appDirPath); + return; + } catch (IOException e) { + LOG.info("Exception from mkdirs.", e); + if (++retry > fsNumRetries) { + LOG.info("Maxed out mkdirs retries. Giving up!"); + throw e; + } + LOG.info("Retrying mkdirs. Retry no. " + retry); + Thread.sleep(fsRetryInterval); + } + } + } + + private void writeFileWithRetries(Path outputPath, byte[] data) + throws Exception { + int retry = 0; + while (true) { + try { + writeFile(outputPath, data); + return; + } catch (IOException e) { + LOG.info("Exception from writeFile.", e); + if (++retry > fsNumRetries) { + LOG.info("Maxed out writeFile retries. Giving up!"); + throw e; + } + LOG.info("Retrying writeFile. Retry no. " + retry); + Thread.sleep(fsRetryInterval); + } + } + } + private Path getAppDir(Path root, ApplicationId appId) { return getNodePath(root, appId.toString()); } @@ -595,8 +640,8 @@ private void writeFile(Path outputPath, byte[] data) throws Exception { */ protected void updateFile(Path outputPath, byte[] data) throws Exception { Path newPath = new Path(outputPath.getParent(), outputPath.getName() + ".new"); - // use writeFile to make sure .new file is created atomically - writeFile(newPath, data); + // use writeFileWithRetries to make sure .new file is created atomically + writeFileWithRetries(newPath, data); replaceFile(newPath, outputPath); } @@ -637,8 +682,17 @@ public synchronized void storeOrUpdateAMRMTokenSecretManagerState( if (isUpdate) { updateFile(nodeCreatePath, stateData); } else { - writeFile(nodeCreatePath, stateData); + writeFileWithRetries(nodeCreatePath, stateData); } } + @VisibleForTesting + public int getNumRetries() { + return fsNumRetries; + } + + @VisibleForTesting + public long getRetryInterval() { + return fsRetryInterval; + } } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/TestFSRMStateStore.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/TestFSRMStateStore.java index d0d19e3..3cedd4f 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/TestFSRMStateStore.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/TestFSRMStateStore.java @@ -100,7 +100,12 @@ public RMStateStore getRMStateStore() throws Exception { workingDirPathURI.toString()); conf.set(YarnConfiguration.FS_RM_STATE_STORE_RETRY_POLICY_SPEC, "100,6000"); + conf.setInt(YarnConfiguration.FS_RM_STATE_STORE_NUM_RETRIES, 1); + conf.setLong(YarnConfiguration.FS_RM_STATE_STORE_RETRY_INTERVAL_MS, + 900L); this.store = new TestFileSystemRMStore(conf); + Assert.assertEquals(store.getNumRetries(), 1); + Assert.assertEquals(store.getRetryInterval(), 900L); return store; }