From 993037c9b54bd7bd3e7bcabf253ad68c272af0ca Mon Sep 17 00:00:00 2001 From: Prabhu Joseph Date: Sat, 6 Apr 2019 00:14:28 +0530 Subject: [PATCH] YARN-9080 --- .../timeline/EntityGroupFSTimelineStore.java | 65 +++++++++++++++++----- .../timeline/TestEntityGroupFSTimelineStore.java | 53 +++++++++++++++++- 2 files changed, 100 insertions(+), 18 deletions(-) diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timeline-pluginstorage/src/main/java/org/apache/hadoop/yarn/server/timeline/EntityGroupFSTimelineStore.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timeline-pluginstorage/src/main/java/org/apache/hadoop/yarn/server/timeline/EntityGroupFSTimelineStore.java index 80baf89..498230a 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timeline-pluginstorage/src/main/java/org/apache/hadoop/yarn/server/timeline/EntityGroupFSTimelineStore.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timeline-pluginstorage/src/main/java/org/apache/hadoop/yarn/server/timeline/EntityGroupFSTimelineStore.java @@ -24,6 +24,8 @@ import com.fasterxml.jackson.module.jaxb.JaxbAnnotationIntrospector; import com.google.common.annotations.VisibleForTesting; import com.google.common.util.concurrent.ThreadFactoryBuilder; +import org.apache.commons.lang3.mutable.MutableBoolean; +import org.apache.commons.lang3.StringUtils; import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileStatus; @@ -456,43 +458,76 @@ private AppLogs getAndSetAppLogs(ApplicationId applicationId) * dirpath should be a directory that contains a set of * application log directories. The cleaner method will not * work if the given dirpath itself is an application log dir. - * @param fs * @param retainMillis * @throws IOException */ @InterfaceAudience.Private @VisibleForTesting - void cleanLogs(Path dirpath, FileSystem fs, long retainMillis) + void cleanLogs(Path dirpath, long retainMillis) throws IOException { long now = Time.now(); + RemoteIterator iter = list(dirpath); + while (iter.hasNext()) { + FileStatus stat = iter.next(); + Path clusterTimeStampPath = stat.getPath(); + if (isValidClusterTimeStampDir(clusterTimeStampPath)) { + MutableBoolean appLogDirPresent = new MutableBoolean(false); + cleanAppLogDir(clusterTimeStampPath, retainMillis, appLogDirPresent); + if (appLogDirPresent.isFalse() && + (now - stat.getModificationTime() > retainMillis)) { + deleteDir(clusterTimeStampPath); + } + } + } + } + + + private void cleanAppLogDir(Path dirpath, long retainMillis, + MutableBoolean appLogDirPresent) throws IOException { + long now = Time.now(); // Depth first search from root directory for all application log dirs RemoteIterator iter = list(dirpath); while (iter.hasNext()) { FileStatus stat = iter.next(); + Path childPath = stat.getPath(); if (stat.isDirectory()) { // If current is an application log dir, decide if we need to remove it // and remove if necessary. // Otherwise, keep iterating into it. - ApplicationId appId = parseApplicationId(dirpath.getName()); + ApplicationId appId = parseApplicationId(childPath.getName()); if (appId != null) { // Application log dir - if (shouldCleanAppLogDir(dirpath, now, fs, retainMillis)) { - try { - LOG.info("Deleting {}", dirpath); - if (!fs.delete(dirpath, true)) { - LOG.error("Unable to remove " + dirpath); - } - metrics.incrLogsDirsCleaned(); - } catch (IOException e) { - LOG.error("Unable to remove " + dirpath, e); - } + appLogDirPresent.setTrue(); + if (shouldCleanAppLogDir(childPath, now, fs, retainMillis)) { + deleteDir(childPath); } } else { // Keep cleaning inside - cleanLogs(stat.getPath(), fs, retainMillis); + cleanAppLogDir(childPath, retainMillis, appLogDirPresent); } } } } + private void deleteDir(Path path) { + try { + LOG.info("Deleting {}", path); + if (fs.delete(path, true)) { + metrics.incrLogsDirsCleaned(); + } else { + LOG.error("Unable to remove {}", path); + } + } catch (IOException e) { + LOG.error("Unable to remove {}", path, e); + } + } + + private boolean isValidClusterTimeStampDir(Path clusterTimeStampPath) + throws IOException { + FileStatus stat = fs.getFileStatus(clusterTimeStampPath); + return stat.isDirectory() && + StringUtils.isNumeric(clusterTimeStampPath.getName()); + } + + private static boolean shouldCleanAppLogDir(Path appLogPath, long now, FileSystem fs, long logRetainMillis) throws IOException { RemoteIterator iter = fs.listStatusIterator(appLogPath); @@ -908,7 +943,7 @@ public void run() { LOG.debug("Cleaner starting"); long startTime = Time.monotonicNow(); try { - cleanLogs(doneRootPath, fs, logRetainMillis); + cleanLogs(doneRootPath, logRetainMillis); } catch (Exception e) { Throwable t = extract(e); if (t instanceof InterruptedException) { diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timeline-pluginstorage/src/test/java/org/apache/hadoop/yarn/server/timeline/TestEntityGroupFSTimelineStore.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timeline-pluginstorage/src/test/java/org/apache/hadoop/yarn/server/timeline/TestEntityGroupFSTimelineStore.java index 61da3c8..c2d5d6e 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timeline-pluginstorage/src/test/java/org/apache/hadoop/yarn/server/timeline/TestEntityGroupFSTimelineStore.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timeline-pluginstorage/src/test/java/org/apache/hadoop/yarn/server/timeline/TestEntityGroupFSTimelineStore.java @@ -267,7 +267,8 @@ public void testCleanLogs() throws Exception { Path irrelevantDirPath = new Path(testDoneDirPath, "irrelevant"); fs.mkdirs(irrelevantDirPath); - Path doneAppHomeDir = new Path(new Path(testDoneDirPath, "0000"), "001"); + Path doneAppHomeDir = new Path(new Path(new Path(testDoneDirPath, + Long.toString(mainTestAppId.getClusterTimestamp())), "0000"), "001"); // First application, untouched after creation Path appDirClean = new Path(doneAppHomeDir, appDirName); Path attemptDirClean = new Path(appDirClean, attemptDirName); @@ -299,7 +300,7 @@ public void testCleanLogs() throws Exception { // Should retain all logs after this run MutableCounterLong dirsCleaned = store.metrics.getLogsDirsCleaned(); long before = dirsCleaned.value(); - store.cleanLogs(testDoneDirPath, fs, 10000); + store.cleanLogs(testDoneDirPath, 10000); assertTrue(fs.exists(irrelevantDirPath)); assertTrue(fs.exists(irrelevantFilePath)); assertTrue(fs.exists(filePath)); @@ -316,7 +317,7 @@ public void testCleanLogs() throws Exception { // Touch the third application by creating a new dir fs.mkdirs(new Path(dirPathHold, "holdByMe")); - store.cleanLogs(testDoneDirPath, fs, 1000); + store.cleanLogs(testDoneDirPath, 1000); // Verification after the second cleaner call assertTrue(fs.exists(irrelevantDirPath)); @@ -332,6 +333,52 @@ public void testCleanLogs() throws Exception { } @Test + public void testCleanBuckets() throws Exception { + // ClusterTimeStampDir with App Log Dirs + Path clusterTimeStampDir1 = new Path(testDoneDirPath, + Long.toString(sampleAppIds.get(0).getClusterTimestamp())); + Path appDir1 = new Path(new Path(new Path( + clusterTimeStampDir1, "0000"), "000"), sampleAppIds.get(0).toString()); + Path appDir2 = new Path(new Path(new Path( + clusterTimeStampDir1, "0000"), "001"), sampleAppIds.get(1).toString()); + Path appDir3 = new Path(new Path(new Path( + clusterTimeStampDir1, "0000"), "002"), sampleAppIds.get(2).toString()); + Path appDir4 = new Path(new Path(new Path( + clusterTimeStampDir1, "0001"), "000"), sampleAppIds.get(3).toString()); + + // ClusterTimeStampDir with no App Log Dirs + Path clusterTimeStampDir2 = new Path(testDoneDirPath, "1235"); + + // Irrevelant ClusterTimeStampDir + Path clusterTimeStampDir3 = new Path(testDoneDirPath, "irrevelant"); + Path appDir5 = new Path(new Path(new Path( + clusterTimeStampDir3, "0000"), "000"), sampleAppIds.get(4).toString()); + + fs.mkdirs(appDir1); + fs.mkdirs(appDir2); + fs.mkdirs(appDir3); + fs.mkdirs(appDir4); + fs.mkdirs(clusterTimeStampDir2); + fs.mkdirs(appDir5); + + Thread.sleep(2000); + + store.cleanLogs(testDoneDirPath, 1000); + + // ClusterTimeStampDir will be removed only if no App Log Dir Present + assertTrue(fs.exists(clusterTimeStampDir1)); + assertFalse(fs.exists(appDir1)); + assertFalse(fs.exists(appDir2)); + assertFalse(fs.exists(appDir3)); + assertFalse(fs.exists(appDir4)); + assertFalse(fs.exists(clusterTimeStampDir2)); + assertTrue(fs.exists(appDir5)); + + store.cleanLogs(testDoneDirPath, 1000); + assertFalse(fs.exists(clusterTimeStampDir1)); + } + + @Test public void testPluginRead() throws Exception { // Verify precondition assertEquals(EntityGroupPlugInForTest.class.getName(), -- 2.7.4 (Apple Git-66)