From d606ed2476e7e9ea38058389ab96b4a1f280fe46 Mon Sep 17 00:00:00 2001 From: Craig Condit Date: Mon, 20 May 2019 18:13:25 -0500 Subject: [PATCH] YARN-9071 RM state store purging should be configurable if log aggregation does not terminate --- .../hadoop/yarn/conf/YarnConfiguration.java | 16 +++++++++ .../server/resourcemanager/RMAppManager.java | 15 ++++++-- .../resourcemanager/TestAppManager.java | 36 +++++++++++++++++++ 3 files changed, 64 insertions(+), 3 deletions(-) diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java index b4ed2b00dae..b7d4df8d4b1 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java @@ -1018,6 +1018,22 @@ public static boolean isAclEnabled(Configuration conf) { public static final int DEFAULT_RM_STATE_STORE_MAX_COMPLETED_APPLICATIONS = DEFAULT_RM_MAX_COMPLETED_APPLICATIONS; + /** + * The maximum time in seconds between application termination and purging + * the application from the RM state, regardless of log aggregation status. + * Setting the value to zero will force purging to wait for log aggregation + * to reach a terminal state. + */ + public static final String RM_COMPLETED_APPLICATION_TTL_SECS = + RM_PREFIX + "completed-application-ttl-secs"; + + /** + * The default value for + * {@code yarn.resourcemanager.completed-application-ttl-secs}. + */ + public static final int DEFAULT_RM_COMPLETED_APPLICATION_TTL_SECS = 0; + + /** Default application name */ public static final String DEFAULT_APPLICATION_NAME = "N/A"; diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMAppManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMAppManager.java index bdc68acf3a8..e9537ca2ec8 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMAppManager.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMAppManager.java @@ -90,6 +90,7 @@ private int maxCompletedAppsInMemory; private int maxCompletedAppsInStateStore; protected int completedAppsInStateStore = 0; + private long completedApplicationTtlMs = 0; protected LinkedList completedApps = new LinkedList<>(); private final RMContext rmContext; @@ -119,6 +120,9 @@ public RMAppManager(RMContext context, if (this.maxCompletedAppsInStateStore > this.maxCompletedAppsInMemory) { this.maxCompletedAppsInStateStore = this.maxCompletedAppsInMemory; } + this.completedApplicationTtlMs = 1000L * conf.getLong( + YarnConfiguration.RM_COMPLETED_APPLICATION_TTL_SECS, + YarnConfiguration.DEFAULT_RM_COMPLETED_APPLICATION_TTL_SECS); this.authorizer = YarnAuthorizationProvider.getInstance(conf); this.timelineServiceV2Enabled = YarnConfiguration. timelineServiceV2Enabled(conf); @@ -357,9 +361,14 @@ private void removeCompletedAppsFromMemory() { } } - private boolean shouldDeleteApp(RMApp app) { - return !app.isLogAggregationEnabled() - || app.isLogAggregationFinished(); + @VisibleForTesting + boolean shouldDeleteApp(RMApp app) { + long now = System.currentTimeMillis(); + boolean ttlExpired = (completedApplicationTtlMs > 0L) && + now > (app.getFinishTime() + completedApplicationTtlMs); + boolean aggregationDone = + !app.isLogAggregationEnabled() || app.isLogAggregationFinished(); + return ttlExpired || aggregationDone; } @SuppressWarnings("unchecked") diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestAppManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestAppManager.java index adb638619b4..527766674d9 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestAppManager.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestAppManager.java @@ -713,6 +713,42 @@ public void testStateStoreAppLimitGreaterThanMemoryAppLimit() { assertTrue(Collections.disjoint(completedApps, appsShouldBeRemoved)); } + @Test + public void testPurgeOfAppWhenAggregationIsNotFinishedButExpired() { + long now = System.currentTimeMillis(); + MockRMApp rmApp = new MockRMApp(1, now - 60000, RMAppState.FINISHED); + rmApp.setLogAggregationEnabled(true); + rmApp.setLogAggregationFinished(false); + rmApp.setFinishTime(now - 40000); + + final int allApps = 10; + RMContext rmContext = + mockRMContextWithMixedLogAggregationStatus(allApps, now - 60000); + Configuration conf = new YarnConfiguration(); + conf.setLong(YarnConfiguration.RM_COMPLETED_APPLICATION_TTL_SECS, 30L); + TestRMAppManager appMonitor = new TestRMAppManager(rmContext, conf); + + assertTrue(appMonitor.shouldDeleteApp(rmApp)); + } + + @Test + public void testPurgeOfAppWhenAggregationIsNotFinishedAndNotExpired() { + long now = System.currentTimeMillis(); + MockRMApp rmApp = new MockRMApp(1, now - 60000, RMAppState.FINISHED); + rmApp.setLogAggregationEnabled(true); + rmApp.setLogAggregationFinished(false); + rmApp.setFinishTime(now - 20000); + + final int allApps = 10; + RMContext rmContext = + mockRMContextWithMixedLogAggregationStatus(allApps, now - 60000); + Configuration conf = new YarnConfiguration(); + conf.setLong(YarnConfiguration.RM_COMPLETED_APPLICATION_TTL_SECS, 30L); + TestRMAppManager appMonitor = new TestRMAppManager(rmContext, conf); + + Assert.assertFalse(appMonitor.shouldDeleteApp(rmApp)); + } + @Test public void testStateStoreAppLimitSomeAppsHaveNotFinishedLogAggregation() { long now = System.currentTimeMillis(); -- 2.20.1