diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java index 05c6cbf..8dc3850 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java @@ -508,6 +508,15 @@ private static void addDeprecatedKeys() { public static final String DEFAULT_FS_RM_STATE_STORE_RETRY_POLICY_SPEC = "2000, 500"; + public static final String FS_RM_STATE_STORE_NUM_RETRIES = RM_PREFIX + + "fs.state-store.num-retries"; + public static final int DEFAULT_FS_RM_STATE_STORE_NUM_RETRIES = 0; + + public static final String FS_RM_STATE_STORE_RETRY_INTERVAL_MS = RM_PREFIX + + "fs.state-store.retry-interval-ms"; + public static final long DEFAULT_FS_RM_STATE_STORE_RETRY_INTERVAL_MS = + 1000L; + public static final String RM_LEVELDB_STORE_PATH = RM_PREFIX + "leveldb-state-store.path"; diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml index a7958a5..df730d5 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml @@ -420,6 +420,21 @@ + the number of retries to recover from IOException in + FileSystemRMStateStore. + + yarn.resourcemanager.fs.state-store.num-retries + 0 + + + + Retry interval in milliseconds in FileSystemRMStateStore. + + yarn.resourcemanager.fs.state-store.retry-interval-ms + 1000 + + + Local path where the RM state will be stored when using org.apache.hadoop.yarn.server.resourcemanager.recovery.LeveldbRMStateStore as the value for yarn.resourcemanager.store.class diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/FileSystemRMStateStore.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/FileSystemRMStateStore.java index 6e830a0..8fdbdb9 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/FileSystemRMStateStore.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/FileSystemRMStateStore.java @@ -92,6 +92,8 @@ Path rmDTSecretManagerRoot; private Path rmAppRoot; private Path dtSequenceNumberPath = null; + private int fsNumRetries; + private long fsRetryInterval; @VisibleForTesting Path fsWorkingPath; @@ -106,6 +108,12 @@ public synchronized void initInternal(Configuration conf) rmAppRoot = new Path(rootDirPath, RM_APP_ROOT); amrmTokenSecretManagerRoot = new Path(rootDirPath, AMRMTOKEN_SECRET_MANAGER_ROOT); + fsNumRetries = + conf.getInt(YarnConfiguration.FS_RM_STATE_STORE_NUM_RETRIES, + YarnConfiguration.DEFAULT_FS_RM_STATE_STORE_NUM_RETRIES); + fsRetryInterval = + conf.getLong(YarnConfiguration.FS_RM_STATE_STORE_RETRY_INTERVAL_MS, + YarnConfiguration.DEFAULT_FS_RM_STATE_STORE_RETRY_INTERVAL_MS); } @Override @@ -139,7 +147,7 @@ protected Version getCurrentVersion() { @Override protected synchronized Version loadVersion() throws Exception { Path versionNodePath = getNodePath(rootDirPath, VERSION_NODE); - FileStatus status = getFileStatus(versionNodePath); + FileStatus status = getFileStatusWithRetries(versionNodePath); if (status != null) { byte[] data = readFile(versionNodePath, status.getLen()); Version version = @@ -154,10 +162,10 @@ protected synchronized void storeVersion() throws Exception { Path versionNodePath = getNodePath(rootDirPath, VERSION_NODE); byte[] data = ((VersionPBImpl) CURRENT_VERSION_INFO).getProto().toByteArray(); - if (fs.exists(versionNodePath)) { + if (existsWithRetries(versionNodePath)) { updateFile(versionNodePath, data); } else { - writeFile(versionNodePath, data); + writeFileWithRetries(versionNodePath, data); } } @@ -165,7 +173,7 @@ protected synchronized void storeVersion() throws Exception { public synchronized long getAndIncrementEpoch() throws Exception { Path epochNodePath = getNodePath(rootDirPath, EPOCH_NODE); long currentEpoch = 0; - FileStatus status = getFileStatus(epochNodePath); + FileStatus status = getFileStatusWithRetries(epochNodePath); if (status != null) { // load current epoch byte[] data = readFile(epochNodePath, status.getLen()); @@ -179,7 +187,7 @@ public synchronized long getAndIncrementEpoch() throws Exception { // initialize epoch file with 1 for the next time. byte[] storeData = Epoch.newInstance(currentEpoch + 1).getProto() .toByteArray(); - writeFile(epochNodePath, storeData); + writeFileWithRetries(epochNodePath, storeData); } return currentEpoch; } @@ -201,7 +209,8 @@ private void loadAMRMTokenSecretManagerState(RMState rmState) checkAndResumeUpdateOperation(amrmTokenSecretManagerRoot); Path amrmTokenSecretManagerStateDataDir = new Path(amrmTokenSecretManagerRoot, AMRMTOKEN_SECRET_MANAGER_NODE); - FileStatus status = getFileStatus(amrmTokenSecretManagerStateDataDir); + FileStatus status = getFileStatusWithRetries( + amrmTokenSecretManagerStateDataDir); if (status == null) { return; } @@ -225,7 +234,8 @@ private void loadRMAppState(RMState rmState) throws Exception { for (FileStatus childNodeStatus : fs.listStatus(appDir.getPath())) { assert childNodeStatus.isFile(); String childNodeName = childNodeStatus.getPath().getName(); - if (checkAndRemovePartialRecord(childNodeStatus.getPath())) { + if (checkAndRemovePartialRecordWithRetries( + childNodeStatus.getPath())) { continue; } byte[] childData = @@ -315,7 +325,7 @@ private void loadRMDTSecretManagerState(RMState rmState) throws Exception { for(FileStatus childNodeStatus : childNodes) { assert childNodeStatus.isFile(); String childNodeName = childNodeStatus.getPath().getName(); - if (checkAndRemovePartialRecord(childNodeStatus.getPath())) { + if (checkAndRemovePartialRecordWithRetries(childNodeStatus.getPath())) { continue; } if(childNodeName.startsWith(DELEGATION_TOKEN_SEQUENCE_NUMBER_PREFIX)) { @@ -361,7 +371,7 @@ private void loadRMDTSecretManagerState(RMState rmState) throws Exception { public synchronized void storeApplicationStateInternal(ApplicationId appId, ApplicationStateData appStateDataPB) throws Exception { Path appDirPath = getAppDir(rmAppRoot, appId); - fs.mkdirs(appDirPath); + mkdirsWithRetries(appDirPath); Path nodeCreatePath = getNodePath(appDirPath, appId.toString()); LOG.info("Storing info for app: " + appId + " at: " + nodeCreatePath); @@ -369,7 +379,7 @@ public synchronized void storeApplicationStateInternal(ApplicationId appId, try { // currently throw all exceptions. May need to respond differently for HA // based on whether we have lost the right to write to FS - writeFile(nodeCreatePath, appStateData); + writeFileWithRetries(nodeCreatePath, appStateData); } catch (Exception e) { LOG.info("Error storing info for app: " + appId, e); throw e; @@ -408,7 +418,7 @@ public synchronized void storeApplicationAttemptStateInternal( try { // currently throw all exceptions. May need to respond differently for HA // based on whether we have lost the right to write to FS - writeFile(nodeCreatePath, attemptStateData); + writeFileWithRetries(nodeCreatePath, attemptStateData); } catch (Exception e) { LOG.info("Error storing info for attempt: " + appAttemptId, e); throw e; @@ -444,7 +454,7 @@ public synchronized void removeApplicationStateInternal( appState.getApplicationSubmissionContext().getApplicationId(); Path nodeRemovePath = getAppDir(rmAppRoot, appId); LOG.info("Removing info for app: " + appId + " at: " + nodeRemovePath); - deleteFile(nodeRemovePath); + deleteFileWithRetries(nodeRemovePath); } @Override @@ -460,7 +470,7 @@ public synchronized void removeRMDelegationTokenState( Path nodeCreatePath = getNodePath(rmDTSecretManagerRoot, DELEGATION_TOKEN_PREFIX + identifier.getSequenceNumber()); LOG.info("Removing RMDelegationToken_" + identifier.getSequenceNumber()); - deleteFile(nodeCreatePath); + deleteFileWithRetries(nodeCreatePath); } @Override @@ -483,7 +493,7 @@ private void storeOrUpdateRMDelegationTokenState( updateFile(nodeCreatePath, identifierData.toByteArray()); } else { LOG.info("Storing RMDelegationToken_" + identifier.getSequenceNumber()); - writeFile(nodeCreatePath, identifierData.toByteArray()); + writeFileWithRetries(nodeCreatePath, identifierData.toByteArray()); // store sequence number Path latestSequenceNumberPath = getNodePath(rmDTSecretManagerRoot, @@ -492,11 +502,12 @@ private void storeOrUpdateRMDelegationTokenState( LOG.info("Storing " + DELEGATION_TOKEN_SEQUENCE_NUMBER_PREFIX + identifier.getSequenceNumber()); if (dtSequenceNumberPath == null) { - if (!createFile(latestSequenceNumberPath)) { + if (!createFileWithRetries(latestSequenceNumberPath)) { throw new Exception("Failed to create " + latestSequenceNumberPath); } } else { - if (!renameFile(dtSequenceNumberPath, latestSequenceNumberPath)) { + if (!renameFileWithRetries(dtSequenceNumberPath, + latestSequenceNumberPath)) { throw new Exception("Failed to rename " + dtSequenceNumberPath); } } @@ -513,7 +524,7 @@ public synchronized void storeRMDTMasterKeyState(DelegationKey masterKey) DataOutputStream fsOut = new DataOutputStream(os); LOG.info("Storing RMDelegationKey_" + masterKey.getKeyId()); masterKey.write(fsOut); - writeFile(nodeCreatePath, os.toByteArray()); + writeFileWithRetries(nodeCreatePath, os.toByteArray()); fsOut.close(); } @@ -523,13 +534,13 @@ public synchronized void storeRMDTMasterKeyState(DelegationKey masterKey) Path nodeCreatePath = getNodePath(rmDTSecretManagerRoot, DELEGATION_KEY_PREFIX + masterKey.getKeyId()); LOG.info("Removing RMDelegationKey_"+ masterKey.getKeyId()); - deleteFile(nodeCreatePath); + deleteFileWithRetries(nodeCreatePath); } @Override - public synchronized void deleteStore() throws IOException { - if (fs.exists(rootDirPath)) { - fs.delete(rootDirPath, true); + public synchronized void deleteStore() throws Exception { + if (existsWithRetries(rootDirPath)) { + deleteFileWithRetries(rootDirPath); } } @@ -539,6 +550,148 @@ private Path getAppDir(Path root, ApplicationId appId) { // FileSystem related code + private boolean checkAndRemovePartialRecordWithRetries(Path record) + throws Exception { + int retry = 0; + while (true) { + try { + return checkAndRemovePartialRecord(record); + } catch (IOException e) { + LOG.info("Exception from checkAndRemovePartialRecord.", e); + if (++retry > fsNumRetries) { + LOG.info("Maxed out checkAndRemovePartialRecord retries." + + " Giving up!"); + throw e; + } + LOG.info("Retrying checkAndRemovePartialRecord. Retry no. " + retry); + Thread.sleep(fsRetryInterval); + } + } + } + + private void mkdirsWithRetries(Path appDirPath) throws Exception { + int retry = 0; + while (true) { + try { + fs.mkdirs(appDirPath); + return; + } catch (IOException e) { + LOG.info("Exception from mkdirs.", e); + if (++retry > fsNumRetries) { + LOG.info("Maxed out mkdirs retries. Giving up!"); + throw e; + } + LOG.info("Retrying mkdirs. Retry no. " + retry); + Thread.sleep(fsRetryInterval); + } + } + } + + private void writeFileWithRetries(Path outputPath, byte[] data) + throws Exception { + int retry = 0; + while (true) { + try { + writeFile(outputPath, data); + return; + } catch (IOException e) { + LOG.info("Exception from writeFile.", e); + if (++retry > fsNumRetries) { + LOG.info("Maxed out writeFile retries. Giving up!"); + throw e; + } + LOG.info("Retrying writeFile. Retry no. " + retry); + Thread.sleep(fsRetryInterval); + } + } + } + + private void deleteFileWithRetries(Path deletePath) throws Exception { + int retry = 0; + while (true) { + try { + deleteFile(deletePath); + return; + } catch (IOException e) { + LOG.info("Exception from deleteFile.", e); + if (++retry > fsNumRetries) { + LOG.info("Maxed out deleteFile retries. Giving up!"); + throw e; + } + LOG.info("Retrying deleteFile. Retry no. " + retry); + Thread.sleep(fsRetryInterval); + } + } + } + + private boolean renameFileWithRetries(Path src, Path dst) throws Exception { + int retry = 0; + while (true) { + try { + return renameFile(src, dst); + } catch (IOException e) { + LOG.info("Exception from renameFile.", e); + if (++retry > fsNumRetries) { + LOG.info("Maxed out renameFile retries. Giving up!"); + throw e; + } + LOG.info("Retrying renameFile. Retry no. " + retry); + Thread.sleep(fsRetryInterval); + } + } + } + + private boolean createFileWithRetries(Path newFile) throws Exception { + int retry = 0; + while (true) { + try { + return createFile(newFile); + } catch (IOException e) { + LOG.info("Exception from createFile.", e); + if (++retry > fsNumRetries) { + LOG.info("Maxed out createFile retries. Giving up!"); + throw e; + } + LOG.info("Retrying createFile. Retry no. " + retry); + Thread.sleep(fsRetryInterval); + } + } + } + + private FileStatus getFileStatusWithRetries(Path path) throws Exception { + int retry = 0; + while (true) { + try { + return getFileStatus(path); + } catch (IOException e) { + LOG.info("Exception from getFileStatus.", e); + if (++retry > fsNumRetries) { + LOG.info("Maxed out getFileStatus retries. Giving up!"); + throw e; + } + LOG.info("Retrying getFileStatus. Retry no. " + retry); + Thread.sleep(fsRetryInterval); + } + } + } + + private boolean existsWithRetries(Path path) throws Exception { + int retry = 0; + while (true) { + try { + return fs.exists(path); + } catch (IOException e) { + LOG.info("Exception from exists.", e); + if (++retry > fsNumRetries) { + LOG.info("Maxed out exists retries. Giving up!"); + throw e; + } + LOG.info("Retrying exists. Retry no. " + retry); + Thread.sleep(fsRetryInterval); + } + } + } + private void deleteFile(Path deletePath) throws Exception { if(!fs.delete(deletePath, true)) { throw new Exception("Failed to delete " + deletePath); @@ -595,18 +748,18 @@ private void writeFile(Path outputPath, byte[] data) throws Exception { */ protected void updateFile(Path outputPath, byte[] data) throws Exception { Path newPath = new Path(outputPath.getParent(), outputPath.getName() + ".new"); - // use writeFile to make sure .new file is created atomically - writeFile(newPath, data); + // use writeFileWithRetries to make sure .new file is created atomically + writeFileWithRetries(newPath, data); replaceFile(newPath, outputPath); } protected void replaceFile(Path srcPath, Path dstPath) throws Exception { - if (fs.exists(dstPath)) { - deleteFile(dstPath); + if (existsWithRetries(dstPath)) { + deleteFileWithRetries(dstPath); } else { LOG.info("File doesn't exist. Skip deleting the file " + dstPath); } - fs.rename(srcPath, dstPath); + renameFileWithRetries(srcPath, dstPath); } @Private @@ -637,8 +790,17 @@ public synchronized void storeOrUpdateAMRMTokenSecretManagerState( if (isUpdate) { updateFile(nodeCreatePath, stateData); } else { - writeFile(nodeCreatePath, stateData); + writeFileWithRetries(nodeCreatePath, stateData); } } + @VisibleForTesting + public int getNumRetries() { + return fsNumRetries; + } + + @VisibleForTesting + public long getRetryInterval() { + return fsRetryInterval; + } } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/TestFSRMStateStore.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/TestFSRMStateStore.java index d0d19e3..675d73c 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/TestFSRMStateStore.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/TestFSRMStateStore.java @@ -100,7 +100,12 @@ public RMStateStore getRMStateStore() throws Exception { workingDirPathURI.toString()); conf.set(YarnConfiguration.FS_RM_STATE_STORE_RETRY_POLICY_SPEC, "100,6000"); + conf.setInt(YarnConfiguration.FS_RM_STATE_STORE_NUM_RETRIES, 5); + conf.setLong(YarnConfiguration.FS_RM_STATE_STORE_RETRY_INTERVAL_MS, + 900L); this.store = new TestFileSystemRMStore(conf); + Assert.assertEquals(store.getNumRetries(), 5); + Assert.assertEquals(store.getRetryInterval(), 900L); return store; }