diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/FileSystemRMStateStore.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/FileSystemRMStateStore.java index 51e3916..9881f7b 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/FileSystemRMStateStore.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/FileSystemRMStateStore.java @@ -60,7 +60,6 @@ import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.impl.pb.ApplicationAttemptStateDataPBImpl; import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.impl.pb.ApplicationStateDataPBImpl; import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.impl.pb.EpochPBImpl; -import org.apache.hadoop.yarn.util.ConverterUtils; import com.google.common.annotations.VisibleForTesting; @@ -141,8 +140,8 @@ protected Version getCurrentVersion() { @Override protected synchronized Version loadVersion() throws Exception { Path versionNodePath = getNodePath(rootDirPath, VERSION_NODE); - if (fs.exists(versionNodePath)) { - FileStatus status = fs.getFileStatus(versionNodePath); + FileStatus status = getFileStatus(versionNodePath); + if (status != null) { byte[] data = readFile(versionNodePath, status.getLen()); Version version = new VersionPBImpl(VersionProto.parseFrom(data)); @@ -167,9 +166,9 @@ protected synchronized void storeVersion() throws Exception { public synchronized long getAndIncrementEpoch() throws Exception { Path epochNodePath = getNodePath(rootDirPath, EPOCH_NODE); long currentEpoch = 0; - if (fs.exists(epochNodePath)) { + FileStatus status = getFileStatus(epochNodePath); + if (status != null) { // load current epoch - FileStatus status = fs.getFileStatus(epochNodePath); byte[] data = readFile(epochNodePath, status.getLen()); Epoch epoch = new EpochPBImpl(EpochProto.parseFrom(data)); currentEpoch = epoch.getEpoch(); @@ -563,12 +562,25 @@ private void deleteFile(Path deletePath) throws Exception { } } + private FileStatus getFileStatus(Path path) throws Exception { + try { + return fs.getFileStatus(path); + } catch (FileNotFoundException e) { + return null; + } + } + + private void writeFile(Path outputPath, byte[] data) throws Exception { + writeFile(outputPath, data, false); + } + /* * In order to make this write atomic as a part of write we will first write * data to .tmp file and then rename it. Here we are assuming that rename is * atomic for underlying file system. */ - private void writeFile(Path outputPath, byte[] data) throws Exception { + private void writeFile(Path outputPath, byte[] data, boolean overwrite) + throws Exception { Path tempPath = new Path(outputPath.getParent(), outputPath.getName() + ".tmp"); FSDataOutputStream fsOut = null; @@ -579,22 +591,17 @@ private void writeFile(Path outputPath, byte[] data) throws Exception { fsOut.write(data); fsOut.close(); fsOut = null; + if (overwrite) { + fs.delete(outputPath, true); + } fs.rename(tempPath, outputPath); } finally { IOUtils.cleanup(LOG, fsOut); } } - /* - * In order to make this update atomic as a part of write we will first write - * data to .new file and then rename it. Here we are assuming that rename is - * atomic for underlying file system. - */ protected void updateFile(Path outputPath, byte[] data) throws Exception { - Path newPath = new Path(outputPath.getParent(), outputPath.getName() + ".new"); - // use writeFile to make sure .new file is created atomically - writeFile(newPath, data); - replaceFile(newPath, outputPath); + writeFile(outputPath, data, true); } protected void replaceFile(Path srcPath, Path dstPath) throws Exception {