From 30e1e96cd61229ad83571e8dd9a4f69a70e14701 Mon Sep 17 00:00:00 2001 From: Adam Antal Date: Thu, 5 Dec 2019 18:28:35 +0100 Subject: [PATCH] YARN-6356. Allow different values of yarn.log-aggregation.retain-seconds for succeeded and failed jobs --- .../hadoop/yarn/conf/YarnConfiguration.java | 14 + .../AggregatedLogDeletionService.java | 77 ++- .../TestAggregatedLogDeletionService.java | 571 ++++++++++++------ 3 files changed, 465 insertions(+), 197 deletions(-) diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java index d23b6301efa..49d9fda19a7 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java @@ -1391,6 +1391,20 @@ public static boolean isAclEnabled(Configuration conf) { + "log-aggregation.retain-seconds"; public static final long DEFAULT_LOG_AGGREGATION_RETAIN_SECONDS = -1; + /** + * yarn.log-aggregation.retain-seconds for failed and killed + * applications. + */ + public static final String LOG_AGGREGATION_RETAIN_SECONDS_FOR_FAILED = + YARN_PREFIX + "log-aggregation.retain-seconds.failed"; + + /** + * yarn.log-aggregation.retain-seconds for succeeded + * applications. + */ + public static final String LOG_AGGREGATION_RETAIN_SECONDS_FOR_SUCCEEDED = + YARN_PREFIX + "log-aggregation.retain-seconds.succeeded"; + public static final String LOG_AGGREGATION_DEBUG_FILESIZE = YARN_PREFIX + "log-aggregation.debug.filesize"; public static final long DEFAULT_LOG_AGGREGATION_DEBUG_FILESIZE diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/AggregatedLogDeletionService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/AggregatedLogDeletionService.java index e0233b31647..dfbae61f7ea 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/AggregatedLogDeletionService.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/AggregatedLogDeletionService.java @@ -43,7 +43,6 @@ import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.exceptions.ApplicationNotFoundException; import org.apache.hadoop.yarn.exceptions.YarnException; -import org.apache.hadoop.yarn.util.ConverterUtils; import com.google.common.annotations.VisibleForTesting; @@ -62,6 +61,8 @@ static class LogDeletionTask extends TimerTask { private Configuration conf; private long retentionMillis; + private long retentionMillisForFailed; + private long retentionMillisForSucceeded; private String suffix = null; private Path remoteRootLogDir = null; private ApplicationClientProtocol rmClient = null; @@ -76,11 +77,24 @@ public LogDeletionTask(Configuration conf, long retentionSecs, ApplicationClient factory.getFileControllerForWrite(); this.remoteRootLogDir = fileController.getRemoteRootLogDir(); this.rmClient = rmClient; + this.retentionMillisForFailed = conf.getLong( + YarnConfiguration.LOG_AGGREGATION_RETAIN_SECONDS_FOR_FAILED, + YarnConfiguration.DEFAULT_LOG_AGGREGATION_RETAIN_SECONDS) * 1000; + this.retentionMillisForFailed = conf.getLong( + YarnConfiguration.LOG_AGGREGATION_RETAIN_SECONDS_FOR_SUCCEEDED, + YarnConfiguration.DEFAULT_LOG_AGGREGATION_RETAIN_SECONDS) * 1000; } @Override public void run() { long cutoffMillis = System.currentTimeMillis() - retentionMillis; + Long cutOffMillisForFailed, cutOffMillisForSucceeded; + if (retentionMillisForFailed > 0) { + cutOffMillisForFailed = System.currentTimeMillis() - retentionMillisForFailed; + } + if (retentionMillisForFailed > 0) { + cutOffMillisForSucceeded = System.currentTimeMillis() - retentionMillisForSucceeded; + } LOG.info("aggregated log deletion started."); try { FileSystem fs = remoteRootLogDir.getFileSystem(conf); @@ -93,6 +107,7 @@ public void run() { for (FileStatus bucketDir : fs.listStatus(suffixDirPath)) { if (bucketDir.isDirectory()) { deleteOldLogDirsFrom(bucketDir.getPath(), cutoffMillis, + cutOffMillisForFailed, cutOffMillisForSucceeded, fs, rmClient); } } @@ -107,7 +122,8 @@ public void run() { LOG.info("aggregated log deletion finished."); } - private static void deleteOldLogDirsFrom(Path dir, long cutoffMillis, + private static void deleteOldLogDirsFrom(Path dir, long cutoffMillis, + Long cutOffMillisForFailed, Long cutOffMillisForSucceeded, FileSystem fs, ApplicationClientProtocol rmClient) { FileStatus[] appDirs; try { @@ -117,20 +133,25 @@ private static void deleteOldLogDirsFrom(Path dir, long cutoffMillis, return; } for (FileStatus appDir : appDirs) { - deleteAppDirLogs(cutoffMillis, fs, rmClient, appDir); + deleteAppDirLogs(cutoffMillis, cutOffMillisForFailed, + cutOffMillisForSucceeded, fs, rmClient, appDir); } } - private static void deleteAppDirLogs(long cutoffMillis, FileSystem fs, - ApplicationClientProtocol rmClient, - FileStatus appDir) { + private static void deleteAppDirLogs(long cutoffMillis, + Long cutOffMillisForFailed, Long cutOffMillisForSucceeded, + FileSystem fs, ApplicationClientProtocol rmClient, FileStatus appDir) { try { + ApplicationId appId = ApplicationId.fromString( + appDir.getPath().getName()); + YarnApplicationState state = getApplicationState(appId, rmClient); + if (appDir.isDirectory() && appDir.getModificationTime() < cutoffMillis) { - ApplicationId appId = ApplicationId.fromString( - appDir.getPath().getName()); - boolean appTerminated = isApplicationTerminated(appId, rmClient); - if (!appTerminated) { + if (state != null && + state != YarnApplicationState.KILLED && + state != YarnApplicationState.FAILED && + state != YarnApplicationState.FINISHED) { // Application is still running FileStatus[] logFiles; try { @@ -149,13 +170,25 @@ private static void deleteAppDirLogs(long cutoffMillis, FileSystem fs, } } } - } else if (shouldDeleteLogDir(appDir, cutoffMillis, fs)) { - // Application is no longer running - try { - LOG.info("Deleting aggregated logs in " + appDir.getPath()); - fs.delete(appDir.getPath(), true); - } catch (IOException e) { - logException("Could not delete " + appDir.getPath(), e); + } else { + // Application is probably no longer running + long cutoffMillisToUse = cutoffMillis; + if (cutOffMillisForFailed != null && + (state == YarnApplicationState.FAILED || + state == YarnApplicationState.KILLED)) { + cutoffMillisToUse = cutOffMillisForFailed; + } + if (cutOffMillisForSucceeded != null && + state == YarnApplicationState.FINISHED) { + cutoffMillisToUse = cutOffMillisForSucceeded; + } + if (shouldDeleteLogDir(appDir, cutoffMillisToUse, fs)) { + try { + LOG.info("Deleting aggregated logs in " + appDir.getPath()); + fs.delete(appDir.getPath(), true); + } catch (IOException e) { + logException("Could not delete " + appDir.getPath(), e); + } } } } @@ -181,7 +214,7 @@ private static boolean shouldDeleteLogDir(FileStatus dir, long cutoffMillis, return shouldDelete; } - private static boolean isApplicationTerminated(ApplicationId appId, + private static YarnApplicationState getApplicationState(ApplicationId appId, ApplicationClientProtocol rmClient) throws IOException { ApplicationReport appReport = null; try { @@ -190,14 +223,14 @@ private static boolean isApplicationTerminated(ApplicationId appId, GetApplicationReportRequest.newInstance(appId)) .getApplicationReport(); } catch (ApplicationNotFoundException e) { - return true; + return null; } catch (YarnException e) { throw new IOException(e); } - YarnApplicationState currentState = appReport.getYarnApplicationState(); - return currentState == YarnApplicationState.FAILED + return appReport.getYarnApplicationState(); + /*return currentState == YarnApplicationState.FAILED || currentState == YarnApplicationState.KILLED - || currentState == YarnApplicationState.FINISHED; + || currentState == YarnApplicationState.FINISHED;*/ } public ApplicationClientProtocol getRMClient() { diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/logaggregation/TestAggregatedLogDeletionService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/logaggregation/TestAggregatedLogDeletionService.java index daa2fc6b01c..5c8c31e7c8b 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/logaggregation/TestAggregatedLogDeletionService.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/logaggregation/TestAggregatedLogDeletionService.java @@ -47,6 +47,13 @@ import static org.mockito.Mockito.*; public class TestAggregatedLogDeletionService { + + private static final String ROOT = "mockfs://foo/"; + private static final Path ROOT_PATH = new Path(ROOT); + private static final String REMOTE_ROOT_LOG_DIR = ROOT + "tmp/logs"; + + private static final String SUFFIX = "logs"; + private static final String NEW_SUFFIX = LogAggregationUtils.getBucketSuffix() + SUFFIX; @Before public void closeFilesystems() throws IOException { @@ -54,31 +61,32 @@ public void closeFilesystems() throws IOException { FileSystem.closeAll(); } + private Configuration createTestConf() { + Configuration conf = new Configuration(); + conf.setClass("fs.mockfs.impl", MockFileSystem.class, FileSystem.class); + conf.set(YarnConfiguration.LOG_AGGREGATION_ENABLED, "true"); + conf.set(YarnConfiguration.NM_REMOTE_APP_LOG_DIR, REMOTE_ROOT_LOG_DIR); + conf.set(YarnConfiguration.NM_REMOTE_APP_LOG_DIR_SUFFIX, SUFFIX); + + conf.set(YarnConfiguration.LOG_AGGREGATION_FILE_FORMATS, "TFile"); + conf.set(String.format(LOG_AGGREGATION_FILE_CONTROLLER_FMT, "TFile"), + LogAggregationTFileController.class.getName()); + + return conf; + } + @Test public void testDeletion() throws Exception { long now = System.currentTimeMillis(); long toDeleteTime = now - (2000*1000); long toKeepTime = now - (1500*1000); - String root = "mockfs://foo/"; - String remoteRootLogDir = root+"tmp/logs"; - String suffix = "logs"; - String newSuffix = LogAggregationUtils.getBucketSuffix() + suffix; - final Configuration conf = new Configuration(); - conf.setClass("fs.mockfs.impl", MockFileSystem.class, FileSystem.class); - conf.set(YarnConfiguration.LOG_AGGREGATION_ENABLED, "true"); - conf.set(YarnConfiguration.LOG_AGGREGATION_RETAIN_SECONDS, "1800"); - conf.set(YarnConfiguration.NM_REMOTE_APP_LOG_DIR, remoteRootLogDir); - conf.set(YarnConfiguration.NM_REMOTE_APP_LOG_DIR_SUFFIX, suffix); - conf.set(YarnConfiguration.LOG_AGGREGATION_FILE_FORMATS, "TFile"); - conf.set(String.format(LOG_AGGREGATION_FILE_CONTROLLER_FMT, "TFile"), - LogAggregationTFileController.class.getName()); + Configuration conf = createTestConf(); + conf.set(YarnConfiguration.LOG_AGGREGATION_RETAIN_SECONDS, "1800"); - Path rootPath = new Path(root); - FileSystem rootFs = rootPath.getFileSystem(conf); - FileSystem mockFs = ((FilterFileSystem)rootFs).getRawFileSystem(); + FileSystem mockFs = getMockFileSystem(conf); - Path remoteRootLogPath = new Path(remoteRootLogDir); + Path remoteRootLogPath = new Path(REMOTE_ROOT_LOG_DIR); Path userDir = new Path(remoteRootLogPath, "me"); FileStatus userDirStatus = new FileStatus(0, true, 0, 0, toKeepTime, userDir); @@ -88,36 +96,36 @@ public void testDeletion() throws Exception { ApplicationId appId1 = ApplicationId.newInstance(now, 1); - Path suffixDir = new Path(userDir, newSuffix); + Path suffixDir = new Path(userDir, NEW_SUFFIX); FileStatus suffixDirStatus = new FileStatus(0, true, 0, 0, toDeleteTime, suffixDir); Path bucketDir = LogAggregationUtils.getRemoteBucketDir( - remoteRootLogPath, "me", suffix, appId1); + remoteRootLogPath, "me", SUFFIX, appId1); FileStatus bucketDirStatus = new FileStatus(0, true, 0, 0, toDeleteTime, bucketDir); Path app1Dir = LogAggregationUtils.getRemoteAppLogDir( - remoteRootLogPath, appId1, "me", suffix); + remoteRootLogPath, appId1, "me", SUFFIX); FileStatus app1DirStatus = new FileStatus(0, true, 0, 0, toDeleteTime, app1Dir); ApplicationId appId2 = ApplicationId.newInstance(now, 2); Path app2Dir = LogAggregationUtils.getRemoteAppLogDir( - remoteRootLogPath, appId2, "me", suffix); + remoteRootLogPath, appId2, "me", SUFFIX); FileStatus app2DirStatus = new FileStatus(0, true, 0, 0, toDeleteTime, app2Dir); ApplicationId appId3 = ApplicationId.newInstance(now, 3); Path app3Dir = LogAggregationUtils.getRemoteAppLogDir( - remoteRootLogPath, appId3, "me", suffix); + remoteRootLogPath, appId3, "me", SUFFIX); FileStatus app3DirStatus = new FileStatus(0, true, 0, 0, toDeleteTime, app3Dir); ApplicationId appId4 = ApplicationId.newInstance(now, 4); Path app4Dir = LogAggregationUtils.getRemoteAppLogDir( - remoteRootLogPath, appId4, "me", suffix); + remoteRootLogPath, appId4, "me", SUFFIX); FileStatus app4DirStatus = new FileStatus(0, true, 0, 0, toDeleteTime, app4Dir); @@ -168,23 +176,11 @@ public void testDeletion() throws Exception { final List runningApplications = Collections.unmodifiableList(Arrays.asList(appId4)); + MockRMClientBuilder builder = new MockRMClientBuilder() + .withFinishedApplications(finishedApplications) + .withRunningApplications(runningApplications); AggregatedLogDeletionService deletionService = - new AggregatedLogDeletionService() { - @Override - protected ApplicationClientProtocol createRMClient() - throws IOException { - try { - return createMockRMClient(finishedApplications, - runningApplications); - } catch (Exception e) { - throw new IOException(e); - } - } - @Override - protected void stopRMClient() { - // DO NOTHING - } - }; + createMockLogDeletionService(conf, builder.build()); deletionService.init(conf); deletionService.start(); @@ -205,28 +201,15 @@ public void testRefreshLogRetentionSettings() throws Exception { long before2000Secs = now - (2000 * 1000); //time before 50 sec long before50Secs = now - (50 * 1000); - String root = "mockfs://foo/"; - String remoteRootLogDir = root + "tmp/logs"; - String suffix = "logs"; - String newSuffix = LogAggregationUtils.getBucketSuffix() + suffix; - final Configuration conf = new Configuration(); - conf.setClass("fs.mockfs.impl", MockFileSystem.class, FileSystem.class); - conf.set(YarnConfiguration.LOG_AGGREGATION_ENABLED, "true"); + + Configuration conf = createTestConf(); conf.set(YarnConfiguration.LOG_AGGREGATION_RETAIN_SECONDS, "1800"); conf.set(YarnConfiguration.LOG_AGGREGATION_RETAIN_CHECK_INTERVAL_SECONDS, "1"); - conf.set(YarnConfiguration.NM_REMOTE_APP_LOG_DIR, remoteRootLogDir); - conf.set(YarnConfiguration.NM_REMOTE_APP_LOG_DIR_SUFFIX, suffix); - conf.set(YarnConfiguration.LOG_AGGREGATION_FILE_FORMATS, "TFile"); - conf.set(String.format(LOG_AGGREGATION_FILE_CONTROLLER_FMT, "TFile"), - LogAggregationTFileController.class.getName()); - - Path rootPath = new Path(root); - FileSystem rootFs = rootPath.getFileSystem(conf); - FileSystem mockFs = ((FilterFileSystem) rootFs).getRawFileSystem(); + FileSystem mockFs = getMockFileSystem(conf); - Path remoteRootLogPath = new Path(remoteRootLogDir); + Path remoteRootLogPath = new Path(REMOTE_ROOT_LOG_DIR); Path userDir = new Path(remoteRootLogPath, "me"); FileStatus userDirStatus = new FileStatus(0, true, 0, 0, before50Secs, @@ -235,7 +218,7 @@ public void testRefreshLogRetentionSettings() throws Exception { when(mockFs.listStatus(remoteRootLogPath)).thenReturn( new FileStatus[] { userDirStatus }); - Path suffixDir = new Path(userDir, newSuffix); + Path suffixDir = new Path(userDir, NEW_SUFFIX); FileStatus suffixStatus = new FileStatus(0, true, 0, 0, before50Secs, suffixDir); @@ -243,9 +226,9 @@ public void testRefreshLogRetentionSettings() throws Exception { ApplicationId.newInstance(System.currentTimeMillis(), 1); //Set time last modified of app1Dir directory and its files to before2000Secs Path app1Dir = LogAggregationUtils.getRemoteAppLogDir( - remoteRootLogPath, appId1, "me", suffix); + remoteRootLogPath, appId1, "me", SUFFIX); Path bucketDir = LogAggregationUtils.getRemoteBucketDir( - remoteRootLogPath, "me", suffix, appId1); + remoteRootLogPath, "me", SUFFIX, appId1); FileStatus bucketDirStatus = new FileStatus(0, true, 0, 0, before50Secs, bucketDir); FileStatus app1DirStatus = new FileStatus(0, true, 0, 0, before2000Secs, @@ -255,7 +238,7 @@ public void testRefreshLogRetentionSettings() throws Exception { ApplicationId.newInstance(System.currentTimeMillis(), 2); //Set time last modified of app1Dir directory and its files to before50Secs Path app2Dir = LogAggregationUtils.getRemoteAppLogDir( - remoteRootLogPath, appId2, "me", suffix); + remoteRootLogPath, appId2, "me", SUFFIX); FileStatus app2DirStatus = new FileStatus(0, true, 0, 0, before50Secs, app2Dir); @@ -283,25 +266,10 @@ public void testRefreshLogRetentionSettings() throws Exception { final List finishedApplications = Collections.unmodifiableList(Arrays.asList(appId1, appId2)); - AggregatedLogDeletionService deletionSvc = new AggregatedLogDeletionService() { - @Override - protected Configuration createConf() { - return conf; - } - @Override - protected ApplicationClientProtocol createRMClient() - throws IOException { - try { - return createMockRMClient(finishedApplications, null); - } catch (Exception e) { - throw new IOException(e); - } - } - @Override - protected void stopRMClient() { - // DO NOTHING - } - }; + MockRMClientBuilder builder = new MockRMClientBuilder() + .withFinishedApplications(finishedApplications); + AggregatedLogDeletionService deletionSvc = createMockLogDeletionService( + conf, builder.build()); deletionSvc.init(conf); deletionSvc.start(); @@ -334,29 +302,13 @@ public void testCheckInterval() throws Exception { long now = System.currentTimeMillis(); long toDeleteTime = now - RETENTION_SECS*1000; - String root = "mockfs://foo/"; - String remoteRootLogDir = root+"tmp/logs"; - String suffix = "logs"; - String newSuffix = LogAggregationUtils.getBucketSuffix() + suffix; - Configuration conf = new Configuration(); - conf.setClass("fs.mockfs.impl", MockFileSystem.class, FileSystem.class); - conf.set(YarnConfiguration.LOG_AGGREGATION_ENABLED, "true"); + Configuration conf = createTestConf(); conf.set(YarnConfiguration.LOG_AGGREGATION_RETAIN_SECONDS, "864000"); conf.set(YarnConfiguration.LOG_AGGREGATION_RETAIN_CHECK_INTERVAL_SECONDS, "1"); - conf.set(YarnConfiguration.NM_REMOTE_APP_LOG_DIR, remoteRootLogDir); - conf.set(YarnConfiguration.NM_REMOTE_APP_LOG_DIR_SUFFIX, suffix); - conf.set(YarnConfiguration.LOG_AGGREGATION_FILE_FORMATS, "TFile"); - conf.set(String.format(LOG_AGGREGATION_FILE_CONTROLLER_FMT, "TFile"), - LogAggregationTFileController.class.getName()); + FileSystem mockFs = getMockFileSystem(conf); - // prevent us from picking up the same mockfs instance from another test - FileSystem.closeAll(); - Path rootPath = new Path(root); - FileSystem rootFs = rootPath.getFileSystem(conf); - FileSystem mockFs = ((FilterFileSystem)rootFs).getRawFileSystem(); - - Path remoteRootLogPath = new Path(remoteRootLogDir); + Path remoteRootLogPath = new Path(REMOTE_ROOT_LOG_DIR); Path userDir = new Path(remoteRootLogPath, "me"); FileStatus userDirStatus = new FileStatus(0, true, 0, 0, now, userDir); @@ -366,13 +318,13 @@ public void testCheckInterval() throws Exception { ApplicationId appId1 = ApplicationId.newInstance(System.currentTimeMillis(), 1); - Path suffixDir = new Path(userDir, newSuffix); + Path suffixDir = new Path(userDir, NEW_SUFFIX); FileStatus suffixDirStatus = new FileStatus(0, true, 0, 0, now, suffixDir); Path bucketDir = LogAggregationUtils.getRemoteBucketDir( - remoteRootLogPath, "me", suffix, appId1); + remoteRootLogPath, "me", SUFFIX, appId1); Path app1Dir = LogAggregationUtils.getRemoteAppLogDir( - remoteRootLogPath, appId1, "me", suffix); + remoteRootLogPath, appId1, "me", SUFFIX); FileStatus bucketDirStatus = new FileStatus(0, true, 0, 0, now, bucketDir); @@ -394,22 +346,10 @@ public void testCheckInterval() throws Exception { final List finishedApplications = Collections.unmodifiableList(Arrays.asList(appId1)); - AggregatedLogDeletionService deletionSvc = - new AggregatedLogDeletionService() { - @Override - protected ApplicationClientProtocol createRMClient() - throws IOException { - try { - return createMockRMClient(finishedApplications, null); - } catch (Exception e) { - throw new IOException(e); - } - } - @Override - protected void stopRMClient() { - // DO NOTHING - } - }; + MockRMClientBuilder builder = new MockRMClientBuilder() + .withFinishedApplications(finishedApplications); + AggregatedLogDeletionService deletionSvc = createMockLogDeletionService( + conf, builder.build()); deletionSvc.init(conf); deletionSvc.start(); @@ -438,33 +378,17 @@ protected void stopRMClient() { public void testRobustLogDeletion() throws Exception { final long RETENTION_SECS = 10 * 24 * 3600; - String root = "mockfs://foo/"; - String remoteRootLogDir = root+"tmp/logs"; - String suffix = "logs"; - String newSuffix = LogAggregationUtils.getBucketSuffix() + suffix; - Configuration conf = new Configuration(); - conf.setClass("fs.mockfs.impl", MockFileSystem.class, - FileSystem.class); - conf.set(YarnConfiguration.LOG_AGGREGATION_ENABLED, "true"); + Configuration conf = createTestConf(); conf.set(YarnConfiguration.LOG_AGGREGATION_RETAIN_SECONDS, "864000"); conf.set(YarnConfiguration.LOG_AGGREGATION_RETAIN_CHECK_INTERVAL_SECONDS, "1"); - conf.set(YarnConfiguration.NM_REMOTE_APP_LOG_DIR, remoteRootLogDir); - conf.set(YarnConfiguration.NM_REMOTE_APP_LOG_DIR_SUFFIX, suffix); - conf.set(YarnConfiguration.LOG_AGGREGATION_FILE_FORMATS, "TFile"); - conf.set(String.format(LOG_AGGREGATION_FILE_CONTROLLER_FMT, "TFile"), - LogAggregationTFileController.class.getName()); - // prevent us from picking up the same mockfs instance from another test - FileSystem.closeAll(); - Path rootPath = new Path(root); - FileSystem rootFs = rootPath.getFileSystem(conf); - FileSystem mockFs = ((FilterFileSystem)rootFs).getRawFileSystem(); + FileSystem mockFs = getMockFileSystem(conf); - Path remoteRootLogPath = new Path(remoteRootLogDir); + Path remoteRootLogPath = new Path(REMOTE_ROOT_LOG_DIR); Path userDir = new Path(remoteRootLogPath, "me"); - Path suffixDir = new Path(userDir, newSuffix); + Path suffixDir = new Path(userDir, NEW_SUFFIX); FileStatus userDirStatus = new FileStatus(0, true, 0, 0, 0, userDir); FileStatus suffixStatus = new FileStatus(0, true, 0, 0, 0, suffixDir); Path bucketDir = new Path(suffixDir, String.valueOf(0)); @@ -505,8 +429,10 @@ public void testRobustLogDeletion() throws Exception { final List finishedApplications = Collections.unmodifiableList(Arrays.asList(appId1, appId3)); - ApplicationClientProtocol rmClient = - createMockRMClient(finishedApplications, null); + MockRMClientBuilder builder = new MockRMClientBuilder() + .withFinishedApplications(finishedApplications); + ApplicationClientProtocol rmClient = builder.build(); + AggregatedLogDeletionService.LogDeletionTask deletionTask = new AggregatedLogDeletionService.LogDeletionTask(conf, RETENTION_SECS, @@ -515,6 +441,251 @@ public void testRobustLogDeletion() throws Exception { verify(mockFs).delete(app3Dir, true); } + @Test + public void testFailedAndSucceededAppLogDeletion() throws Exception { + long now = System.currentTimeMillis(); + long toDeleteTimeForFailed = now - (2500 * 1000); + long toKeepTimeForFailed = now - (2000 * 1000); + long toDeleteTimeForSucceeded = now - (2000 * 1000); + long toKeepTimeForSucceeded = now - (1500 * 1000); + long latestTime = toDeleteTimeForFailed; + + Configuration conf = createTestConf(); + conf.setLong(YarnConfiguration.LOG_AGGREGATION_RETAIN_SECONDS_FOR_FAILED, 2200); + conf.setLong(YarnConfiguration.LOG_AGGREGATION_RETAIN_SECONDS_FOR_SUCCEEDED, 1800); + + FileSystem mockFs = getMockFileSystem(conf); + + Path remoteRootLogPath = new Path(REMOTE_ROOT_LOG_DIR); + + Path userDir = new Path(remoteRootLogPath, "me"); + FileStatus userDirStatus = + new FileStatus(0, true, 0, 0, latestTime, userDir); + + when(mockFs.listStatus(remoteRootLogPath)).thenReturn( + new FileStatus[]{userDirStatus}); + + ApplicationId appId1 = + ApplicationId.newInstance(now, 1); + Path suffixDir = new Path(userDir, NEW_SUFFIX); + FileStatus suffixDirStatus = new FileStatus(0, true, + 0, 0, latestTime, suffixDir); + Path bucketDir = LogAggregationUtils.getRemoteBucketDir( + remoteRootLogPath, "me", SUFFIX, appId1); + FileStatus bucketDirStatus = + new FileStatus(0, true, 0, 0, latestTime, bucketDir); + + Path app1Dir = LogAggregationUtils.getRemoteAppLogDir( + remoteRootLogPath, appId1, "me", SUFFIX); + FileStatus app1DirStatus = + new FileStatus(0, true, 0, 0, latestTime, app1Dir); + + ApplicationId appId2 = + ApplicationId.newInstance(now, 2); + Path app2Dir = LogAggregationUtils.getRemoteAppLogDir( + remoteRootLogPath, appId2, "me", SUFFIX); + FileStatus app2DirStatus = + new FileStatus(0, true, 0, 0, latestTime, app2Dir); + + ApplicationId appId3 = + ApplicationId.newInstance(now, 3); + Path app3Dir = LogAggregationUtils.getRemoteAppLogDir( + remoteRootLogPath, appId3, "me", SUFFIX); + FileStatus app3DirStatus = + new FileStatus(0, true, 0, 0, latestTime, app3Dir); + + ApplicationId appId4 = + ApplicationId.newInstance(now, 4); + Path app4Dir = LogAggregationUtils.getRemoteAppLogDir( + remoteRootLogPath, appId4, "me", SUFFIX); + FileStatus app4DirStatus = + new FileStatus(0, true, 0, 0, latestTime, app4Dir); + + ApplicationId appId5 = + ApplicationId.newInstance(now, 5); + Path app5Dir = LogAggregationUtils.getRemoteAppLogDir( + remoteRootLogPath, appId5, "me", SUFFIX); + FileStatus app5DirStatus = + new FileStatus(0, true, 0, 0, latestTime, app5Dir); + + ApplicationId appId6 = + ApplicationId.newInstance(now, 6); + Path app6Dir = LogAggregationUtils.getRemoteAppLogDir( + remoteRootLogPath, appId6, "me", SUFFIX); + FileStatus app6DirStatus = + new FileStatus(0, true, 0, 0, latestTime, app6Dir); + + ApplicationId appId7 = + ApplicationId.newInstance(now, 7); + Path app7Dir = LogAggregationUtils.getRemoteAppLogDir( + remoteRootLogPath, appId7, "me", SUFFIX); + FileStatus app7DirStatus = + new FileStatus(0, true, 0, 0, latestTime, app7Dir); + + ApplicationId appId8 = + ApplicationId.newInstance(now, 8); + Path app8Dir = LogAggregationUtils.getRemoteAppLogDir( + remoteRootLogPath, appId8, "me", SUFFIX); + FileStatus app8DirStatus = + new FileStatus(0, true, 0, 0, latestTime, app8Dir); + + ApplicationId appId9 = + ApplicationId.newInstance(now, 9); + Path app9Dir = LogAggregationUtils.getRemoteAppLogDir( + remoteRootLogPath, appId9, "me", SUFFIX); + FileStatus app9DirStatus = + new FileStatus(0, true, 0, 0, latestTime, app9Dir); + + when(mockFs.listStatus(userDir)).thenReturn( + new FileStatus[] {suffixDirStatus}); + when(mockFs.listStatus(suffixDir)).thenReturn( + new FileStatus[] {bucketDirStatus}); + when(mockFs.listStatus(bucketDir)).thenReturn( + new FileStatus[] {app1DirStatus, app2DirStatus, app3DirStatus, + app4DirStatus, app5DirStatus, app6DirStatus, app7DirStatus, + app8DirStatus, app9DirStatus}); + + when(mockFs.listStatus(app1Dir)).thenReturn( + new FileStatus[]{}); + + + Path app2Log1 = new Path(app2Dir, "host1"); + FileStatus app2Log1Status = + new FileStatus(10, false, 1, 1, toDeleteTimeForSucceeded, app2Log1); + + Path app2Log2 = new Path(app2Dir, "host2"); + FileStatus app2Log2Status = + new FileStatus(10, false, 1, 1, toKeepTimeForSucceeded, app2Log2); + + when(mockFs.listStatus(app2Dir)).thenReturn( + new FileStatus[]{app2Log1Status, app2Log2Status}); + + + Path app3Log1 = new Path(app3Dir, "host1"); + FileStatus app3Log1Status = + new FileStatus(10, false, 1, 1, toDeleteTimeForSucceeded, app3Log1); + + Path app3Log2 = new Path(app3Dir, "host2"); + FileStatus app3Log2Status = + new FileStatus(10, false, 1, 1, toDeleteTimeForSucceeded, app3Log2); + + when(mockFs.listStatus(app3Dir)).thenReturn( + new FileStatus[]{app3Log1Status, app3Log2Status}); + + + Path app4Log1 = new Path(app4Dir, "host1"); + FileStatus app4Log1Status = + new FileStatus(10, false, 1, 1, toDeleteTimeForSucceeded, app4Log1); + + Path app4Log2 = new Path(app4Dir, "host2"); + FileStatus app4Log2Status = + new FileStatus(10, false, 1, 1, toKeepTimeForSucceeded, app4Log2); + + when(mockFs.listStatus(app4Dir)).thenReturn( + new FileStatus[]{app4Log1Status, app4Log2Status}); + + + Path app5Log1 = new Path(app5Dir, "host1"); + FileStatus app5Log1Status = + new FileStatus(10, false, 1, 1, toDeleteTimeForSucceeded, app5Log1); + + Path app5Log2 = new Path(app5Dir, "host2"); + FileStatus app5Log2Status = + new FileStatus(10, false, 1, 1, toDeleteTimeForSucceeded, app5Log2); + + when(mockFs.listStatus(app5Dir)).thenReturn( + new FileStatus[]{app5Log1Status, app5Log2Status}); + + + Path app6Log1 = new Path(app6Dir, "host1"); + FileStatus app6Log1Status = + new FileStatus(10, false, 1, 1, toKeepTimeForFailed, app6Log1); + + Path app6Log2 = new Path(app6Dir, "host2"); + FileStatus app6Log2Status = + new FileStatus(10, false, 1, 1, toDeleteTimeForFailed, app6Log2); + + when(mockFs.listStatus(app6Dir)).thenReturn( + new FileStatus[]{app6Log1Status, app6Log2Status}); + + + Path app7Log1 = new Path(app7Dir, "host1"); + FileStatus app7Log1Status = + new FileStatus(10, false, 1, 1, toDeleteTimeForFailed, app7Log1); + + Path app7Log2 = new Path(app7Dir, "host2"); + FileStatus app7Log2Status = + new FileStatus(10, false, 1, 1, toDeleteTimeForFailed, app7Log2); + + when(mockFs.listStatus(app7Dir)).thenReturn( + new FileStatus[]{app7Log1Status, app7Log2Status}); + + + Path app8Log1 = new Path(app8Dir, "host1"); + FileStatus app8Log1Status = + new FileStatus(10, false, 1, 1, toDeleteTimeForFailed, app8Log1); + + Path app8Log2 = new Path(app8Dir, "host2"); + FileStatus app8Log2Status = + new FileStatus(10, false, 1, 1, toKeepTimeForFailed, app8Log2); + + when(mockFs.listStatus(app8Dir)).thenReturn( + new FileStatus[]{app8Log1Status, app8Log2Status}); + + + Path app9Log1 = new Path(app9Dir, "host1"); + FileStatus app9Log1Status = + new FileStatus(10, false, 1, 1, toDeleteTimeForFailed, app9Log1); + + Path app9Log2 = new Path(app9Dir, "host2"); + FileStatus app9Log2Status = + new FileStatus(10, false, 1, 1, toDeleteTimeForFailed, app9Log2); + + when(mockFs.listStatus(app9Dir)).thenReturn( + new FileStatus[]{app9Log1Status, app9Log2Status}); + + final List finishedApplications = + Collections.unmodifiableList(Arrays.asList(appId1, appId2, appId3)); + final List runningApplications = + Collections.unmodifiableList(Arrays.asList(appId4, appId5)); + final List killedApplications = + Collections.unmodifiableList(Arrays.asList(appId6, appId7)); + final List failedApplications = + Collections.unmodifiableList(Arrays.asList(appId8, appId9)); + + MockRMClientBuilder builder = new MockRMClientBuilder() + .withFinishedApplications(finishedApplications) + .withRunningApplications(runningApplications) + .withKilledApplications(killedApplications) + .withFailedApplications(failedApplications); + AggregatedLogDeletionService deletionService = + createMockLogDeletionService(conf, builder.build()); + deletionService.init(conf); + deletionService.start(); + + verify(mockFs, timeout(2000)).delete(app1Dir, true); + + verify(mockFs, timeout(2000).times(0)).delete(app2Dir, true); + verify(mockFs, timeout(2000).times(0)).delete(app2Log1, true); + verify(mockFs, timeout(2000)).delete(app2Log2, true); + verify(mockFs, timeout(2000)).delete(app3Dir, true); + + verify(mockFs, timeout(2000).times(0)).delete(app4Dir, true); + verify(mockFs, timeout(2000).times(0)).delete(app5Dir, true); + + verify(mockFs, timeout(2000).times(0)).delete(app6Dir, true); + verify(mockFs, timeout(2000).times(0)).delete(app6Log1, true); + verify(mockFs, timeout(2000)).delete(app6Log2, true); + verify(mockFs, timeout(2000)).delete(app7Dir, true); + + verify(mockFs, timeout(2000).times(0)).delete(app8Dir, true); + verify(mockFs, timeout(2000)).delete(app8Log1, true); + verify(mockFs, timeout(2000).times(0)).delete(app8Log2, true); + verify(mockFs, timeout(2000)).delete(app9Dir, true); + + deletionService.stop(); + } + static class MockFileSystem extends FilterFileSystem { MockFileSystem() { super(mock(FileSystem.class)); @@ -522,53 +693,103 @@ public void testRobustLogDeletion() throws Exception { public void initialize(URI name, Configuration conf) throws IOException {} } - private static ApplicationClientProtocol createMockRMClient( - List finishedApplicaitons, - List runningApplications) throws Exception { - final ApplicationClientProtocol mockProtocol = - mock(ApplicationClientProtocol.class); - if (finishedApplicaitons != null && !finishedApplicaitons.isEmpty()) { - for (ApplicationId appId : finishedApplicaitons) { - GetApplicationReportRequest request = - GetApplicationReportRequest.newInstance(appId); - GetApplicationReportResponse response = - createApplicationReportWithFinishedApplication(); - when(mockProtocol.getApplicationReport(request)) - .thenReturn(response); - } + private FileSystem getMockFileSystem(Configuration conf) throws IOException { + FileSystem rootFs = ROOT_PATH.getFileSystem(conf); + return ((FilterFileSystem)rootFs).getRawFileSystem(); + } + + private static class MockRMClientBuilder { + private List runningApplications; + private List finishedApplications; + private List failedApplications; + private List killedApplications; + + public MockRMClientBuilder withRunningApplications( + List runningApplications) { + this.runningApplications = runningApplications; + return this; } - if (runningApplications != null && !runningApplications.isEmpty()) { - for (ApplicationId appId : runningApplications) { - GetApplicationReportRequest request = - GetApplicationReportRequest.newInstance(appId); - GetApplicationReportResponse response = - createApplicationReportWithRunningApplication(); - when(mockProtocol.getApplicationReport(request)) - .thenReturn(response); + + public MockRMClientBuilder withFinishedApplications( + List finishedApplications) { + this.finishedApplications = finishedApplications; + return this; + } + + public MockRMClientBuilder withFailedApplications( + List failedApplications) { + this.failedApplications = failedApplications; + return this; + } + + public MockRMClientBuilder withKilledApplications( + List killedApplications) { + this.killedApplications = killedApplications; + return this; + } + + private void populateMockReports(ApplicationClientProtocol mockProtocol, + List apps, YarnApplicationState state) + throws Exception { + if (apps != null && !apps.isEmpty()) { + for (ApplicationId appId : apps) { + GetApplicationReportRequest request = + GetApplicationReportRequest.newInstance(appId); + GetApplicationReportResponse response = + createApplicationReport(state); + when(mockProtocol.getApplicationReport(request)) + .thenReturn(response); + } } } - return mockProtocol; + + public ApplicationClientProtocol build() throws Exception { + final ApplicationClientProtocol mockProtocol = + mock(ApplicationClientProtocol.class); + + populateMockReports(mockProtocol, finishedApplications, + YarnApplicationState.FINISHED); + populateMockReports(mockProtocol, runningApplications, + YarnApplicationState.RUNNING); + populateMockReports(mockProtocol, killedApplications, + YarnApplicationState.KILLED); + populateMockReports(mockProtocol, failedApplications, + YarnApplicationState.FAILED); + + return mockProtocol; + } } - private static GetApplicationReportResponse - createApplicationReportWithRunningApplication() { + private static GetApplicationReportResponse createApplicationReport( + YarnApplicationState state) { ApplicationReport report = mock(ApplicationReport.class); - when(report.getYarnApplicationState()).thenReturn( - YarnApplicationState.RUNNING); + when(report.getYarnApplicationState()).thenReturn(state); GetApplicationReportResponse response = mock(GetApplicationReportResponse.class); when(response.getApplicationReport()).thenReturn(report); return response; } - private static GetApplicationReportResponse - createApplicationReportWithFinishedApplication() { - ApplicationReport report = mock(ApplicationReport.class); - when(report.getYarnApplicationState()).thenReturn( - YarnApplicationState.FINISHED); - GetApplicationReportResponse response = - mock(GetApplicationReportResponse.class); - when(response.getApplicationReport()).thenReturn(report); - return response; + private AggregatedLogDeletionService createMockLogDeletionService( + Configuration conf, ApplicationClientProtocol mockRMClient) { + return new AggregatedLogDeletionService() { + @Override + protected Configuration createConf() { + return conf; + } + @Override + protected ApplicationClientProtocol createRMClient() + throws IOException { + try { + return mockRMClient; + } catch (Exception e) { + throw new IOException(e); + } + } + @Override + protected void stopRMClient() { + // DO NOTHING + } + }; } } -- 2.21.0