diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/FileSystemRMStateStore.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/FileSystemRMStateStore.java
index 062f5cc..e85ba92 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/FileSystemRMStateStore.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/FileSystemRMStateStore.java
@@ -22,6 +22,7 @@
 import java.io.ByteArrayOutputStream;
 import java.io.DataInputStream;
 import java.io.DataOutputStream;
+import java.io.IOException;
 import java.util.ArrayList;
 import java.util.List;

@@ -118,6 +119,9 @@ private void loadRMAppState(RMState rmState) throws Exception {
       for (FileStatus childNodeStatus : fs.listStatus(appDir.getPath())) {
         assert childNodeStatus.isFile();
         String childNodeName = childNodeStatus.getPath().getName();
+        if (checkAndRemovePartialRecord(childNodeStatus.getPath())) {
+          continue;
+        }
         byte[] childData =
             readFile(childNodeStatus.getPath(), childNodeStatus.getLen());
         if (childNodeName.startsWith(ApplicationId.appIdStrPrefix)) {
@@ -178,12 +182,28 @@ private void loadRMAppState(RMState rmState) throws Exception {
     }
   }

+  private boolean checkAndRemovePartialRecord(Path record) throws IOException {
+    // A file name ending in .tmp means a previous attempt to save this
+    // record failed partway through. The partial record is deleted as
+    // part of this call.
+    if (record.getName().endsWith(".tmp")) {
+      LOG.error("Incomplete RM state store entry found: "
+          + record);
+      fs.delete(record, false);
+      return true;
+    }
+    return false;
+  }
+
   private void loadRMDTSecretManagerState(RMState rmState) throws Exception {
     FileStatus[] childNodes = fs.listStatus(rmDTSecretManagerRoot);
     for(FileStatus childNodeStatus : childNodes) {
       assert childNodeStatus.isFile();
       String childNodeName = childNodeStatus.getPath().getName();
+      if (checkAndRemovePartialRecord(childNodeStatus.getPath())) {
+        continue;
+      }
       if(childNodeName.startsWith(DELEGATION_TOKEN_SEQUENCE_NUMBER_PREFIX)) {
         rmState.rmSecretManagerState.dtSequenceNumber =
             Integer.parseInt(childNodeName.split("_")[1]);
@@ -344,10 +364,19 @@ private void deleteFile(Path deletePath) throws Exception {
     return data;
   }

+  /*
+   * To make this write atomic, the data is first written to a .tmp file
+   * and then renamed to the final path. This assumes that rename is
+   * atomic for the underlying file system.
+   */
   private void writeFile(Path outputPath, byte[] data) throws Exception {
-    FSDataOutputStream fsOut = fs.create(outputPath, false);
+    Path tempPath =
+        new Path(outputPath.getParent(), outputPath.getName() + ".tmp");
+    FSDataOutputStream fsOut = null;
+    fsOut = fs.create(tempPath, false);
     fsOut.write(data);
     fsOut.close();
+    fs.rename(tempPath, outputPath);
   }

   private boolean renameFile(Path src, Path dst) throws Exception {
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/TestRMStateStore.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/TestRMStateStore.java
index d75fc7d..56b31c2 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/TestRMStateStore.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/TestRMStateStore.java
@@ -39,6 +39,7 @@
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataOutputStream;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
@@ -332,6 +333,28 @@ void testRMAppStateStore(RMStateStoreHelper stateStoreHelper)
     attempts.put(attemptIdRemoved, mockRemovedAttempt);
     store.removeApplication(mockRemovedApp);

+    // If the state store is a FileSystemRMStateStore, add a corrupted entry;
+    // it should discard the entry and remove it from the file system.
+    if (store instanceof FileSystemRMStateStore) {
+      FSDataOutputStream fsOut = null;
+      FileSystemRMStateStore fileSystemRMStateStore =
+          (FileSystemRMStateStore) store;
+      String appAttemptIdStr3 = "appattempt_1352994193343_0001_000003";
+      ApplicationAttemptId attemptId3 =
+          ConverterUtils.toApplicationAttemptId(appAttemptIdStr3);
+      Path rootDir =
+          new Path(fileSystemRMStateStore.fsWorkingPath, "FSRMStateRoot");
+      Path appRootDir = new Path(rootDir, "RMAppRoot");
+      Path appDir =
+          new Path(appRootDir, attemptId3.getApplicationId().toString());
+      Path tempAppAttemptFile =
+          new Path(appDir, attemptId3.toString() + ".tmp");
+      fsOut = fileSystemRMStateStore.fs.create(tempAppAttemptFile, false);
+      fsOut.write("Some random data ".getBytes());
+      fsOut.close();
+
+    }
+
     // let things settle down
     Thread.sleep(1000);
     store.close();
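
Reviewer note (not part of the patch): the change pairs two ideas, write-to-temp-then-rename on the save path and skip-and-delete of leftover .tmp files on the recovery path. The sketch below restates that pattern in isolation against the Hadoop FileSystem API. It is a minimal illustration under stated assumptions, not the patch's code: the class name AtomicWriteSketch, the try/finally around close(), and the check on rename()'s return value are additions made here for clarity, while the patch itself simply relies on rename being atomic on the underlying file system.

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

/** Illustrative sketch of the temp-file write and recovery-cleanup pattern. */
public class AtomicWriteSketch {

  private final FileSystem fs;

  public AtomicWriteSketch(Configuration conf) throws IOException {
    this.fs = FileSystem.get(conf);
  }

  /** Write the data to a sibling .tmp file first, then rename it into place,
   *  so a reader never observes a partially written record. */
  public void atomicWrite(Path outputPath, byte[] data) throws IOException {
    Path tempPath =
        new Path(outputPath.getParent(), outputPath.getName() + ".tmp");
    FSDataOutputStream out = fs.create(tempPath, false);
    try {
      out.write(data);
    } finally {
      out.close();
    }
    // Rename is assumed to be atomic on the underlying file system (as on HDFS).
    if (!fs.rename(tempPath, outputPath)) {
      throw new IOException("Failed to rename " + tempPath + " to " + outputPath);
    }
  }

  /** On recovery, delete any leftover .tmp files instead of trying to load them. */
  public void cleanPartialRecords(Path dir) throws IOException {
    for (FileStatus status : fs.listStatus(dir)) {
      if (status.getPath().getName().endsWith(".tmp")) {
        fs.delete(status.getPath(), false);
      }
    }
  }
}

The recovery-side cleanup is still needed even though writes go through a temp file: a crash between create() and rename() leaves a .tmp file behind, which is exactly the corrupted entry the new test case stages for FileSystemRMStateStore.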