diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMAppManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMAppManager.java
index 7011aaa6ca8..c543d9ec2b6 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMAppManager.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMAppManager.java
@@ -284,31 +284,96 @@ protected void writeAuditLog(ApplicationId appId) {
    * check to see if hit the limit for max # completed apps kept
    */
   protected synchronized void checkAppNumCompletedLimit() {
-    // check apps kept in state store.
-    while (completedAppsInStateStore > this.maxCompletedAppsInStateStore) {
-      ApplicationId removeId =
-          completedApps.get(completedApps.size() - completedAppsInStateStore);
+    if (completedAppsInStateStore > maxCompletedAppsInStateStore) {
+      removeCompletedAppsFromStateStore();
+    }
+
+    if (completedApps.size() > maxCompletedAppsInMemory) {
+      removeCompletedAppsFromMemory();
+    }
+  }
+
+  /**
+   * Visits one completed app per app counted above
+   * maxCompletedAppsInStateStore and removes it from the state store;
+   * apps rejected by {@link #shouldDeleteApp(RMApp)} are only logged.
+   */
+  private void removeCompletedAppsFromStateStore() {
+    int numDelete = completedAppsInStateStore - maxCompletedAppsInStateStore;
+    // completedApps may still hold apps that were already removed from
+    // the state store but are kept in memory. Keep the offset the
+    // previous implementation used to reach the oldest stored app
+    // instead of starting at index 0.
+    // NOTE(review): once apps are skipped below, the stored apps may no
+    // longer be contiguous in completedApps; confirm this is acceptable.
+    int firstInStore =
+        Math.max(0, completedApps.size() - completedAppsInStateStore);
+    int end = Math.min(completedApps.size(), firstInStore + numDelete);
+    for (int i = firstInStore; i < end; i++) {
+      ApplicationId removeId = completedApps.get(i);
       RMApp removeApp = rmContext.getRMApps().get(removeId);
-      LOG.info("Max number of completed apps kept in state store met:"
-          + " maxCompletedAppsInStateStore = " + maxCompletedAppsInStateStore
-          + ", removing app " + removeApp.getApplicationId()
-          + " from state store.");
-      rmContext.getStateStore().removeApplication(removeApp);
-      completedAppsInStateStore--;
+      boolean deleteApp = shouldDeleteApp(removeApp);
+
+      if (deleteApp) {
+        LOG.info("Max number of completed apps kept in state store met:"
+            + " maxCompletedAppsInStateStore = "
+            + maxCompletedAppsInStateStore + ", removing app " + removeId
+            + " from state store.");
+        rmContext.getStateStore().removeApplication(removeApp);
+        completedAppsInStateStore--;
+      } else {
+        LOG.info("Max number of completed apps kept in state store met:"
+            + " maxCompletedAppsInStateStore = "
+            + maxCompletedAppsInStateStore + ", but not removing app "
+            + removeId
+            + " from state store as log aggregation have not finished yet.");
+      }
     }
+  }
 
-    // check apps kept in memorty.
-    while (completedApps.size() > this.maxCompletedAppsInMemory) {
-      ApplicationId removeId = completedApps.remove();
-      LOG.info("Application should be expired, max number of completed apps"
-          + " kept in memory met: maxCompletedAppsInMemory = "
-          + this.maxCompletedAppsInMemory + ", removing app " + removeId
-          + " from memory: ");
-      rmContext.getRMApps().remove(removeId);
-      this.applicationACLsManager.removeApplication(removeId);
+  /**
+   * Visits one completed app per entry above maxCompletedAppsInMemory and
+   * removes it from memory (completedApps, the RM app map and the ACLs);
+   * apps rejected by {@link #shouldDeleteApp(RMApp)} are only logged.
+   */
+  private void removeCompletedAppsFromMemory() {
+    int numDelete = completedApps.size() - maxCompletedAppsInMemory;
+    int offset = 0;
+    for (int i = 0; i < numDelete; i++) {
+      // each removal shifts the remaining entries one position to the left
+      int deletionIdx = i - offset;
+      ApplicationId removeId = completedApps.get(deletionIdx);
+      RMApp removeApp = rmContext.getRMApps().get(removeId);
+      boolean deleteApp = shouldDeleteApp(removeApp);
+
+      if (deleteApp) {
+        ++offset;
+        LOG.info("Application should be expired, max number of completed apps"
+            + " kept in memory met: maxCompletedAppsInMemory = "
+            + this.maxCompletedAppsInMemory + ", removing app " + removeId
+            + " from memory: ");
+        completedApps.remove(deletionIdx);
+        rmContext.getRMApps().remove(removeId);
+        this.applicationACLsManager.removeApplication(removeId);
+      } else {
+        LOG.info("Application should be expired, max number of completed apps"
+            + " kept in memory met: maxCompletedAppsInMemory = "
+            + this.maxCompletedAppsInMemory + ", but not removing app "
+            + removeId
+            + " from memory as log aggregation have not finished yet.");
+      }
     }
   }
 
+  /**
+   * Returns true unless log aggregation is enabled for the app and has
+   * not finished yet.
+   */
+  private boolean shouldDeleteApp(RMApp app) {
+    return !app.isLogAggregationEnabled()
+        || app.isLogAggregationFinished();
+  }
+
   @SuppressWarnings("unchecked")
   protected void submitApplication(
       ApplicationSubmissionContext submissionContext, long submitTime,
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMApp.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMApp.java
index 99cce87102d..535888cfcb4 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMApp.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMApp.java
@@ -242,7 +242,17 @@ ApplicationReport createAndGetApplicationReport(String clientUserName,
    * @return the number of max attempts of the application.
    */
   int getMaxAppAttempts();
-  
+
+  /**
+   * @return true if log aggregation is enabled for the application.
+   */
+  boolean isLogAggregationEnabled();
+
+  /**
+   * @return true if log aggregation for the application has finished.
+   */
+  boolean isLogAggregationFinished();
+
   /**
    * Returns the application type
    * @return the application type.
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java index 9f1ea4403f4..a4d6dfeb89d 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java @@ -903,7 +903,6 @@ public int getMaxAppAttempts() { @Override public void handle(RMAppEvent event) { - this.writeLock.lock(); try { @@ -1459,8 +1458,7 @@ public void transition(RMAppImpl app, RMAppEvent event) { } } - private static final class AppRejectedTransition extends - FinalTransition{ + private static final class AppRejectedTransition extends FinalTransition { public AppRejectedTransition() { super(RMAppState.FAILED); } @@ -1502,39 +1500,51 @@ static void appAdminClientCleanUp(RMAppImpl app) { private final RMAppState finalState; - public FinalTransition(RMAppState finalState) { + FinalTransition(RMAppState finalState) { this.finalState = finalState; } + @Override public void transition(RMAppImpl app, RMAppEvent event) { app.logAggregationStartTime = System.currentTimeMillis(); + cleanAppInRMNodes(app); + recordFinishTime(app); + removeAppFromScheduler(app); + sendEventToAppManager(app, RMAppManagerEventType.APP_COMPLETED); + handleAppFinishedWithRmContext(app); + app.clearUnusedFields(); + appAdminClientCleanUp(app); + } + + private void cleanAppInRMNodes(RMAppImpl app) { for (NodeId nodeId : app.getRanNodes()) { app.handler.handle( - new RMNodeCleanAppEvent(nodeId, app.applicationId)); + new RMNodeCleanAppEvent(nodeId, 
app.applicationId)); } + } + + private void recordFinishTime(RMAppImpl app) { app.finishTime = app.storedFinishTime; - if (app.finishTime == 0 ) { + if (app.finishTime == 0) { app.finishTime = app.systemClock.getTime(); } + } + + private void removeAppFromScheduler(RMAppImpl app) { // Recovered apps that are completed were not added to scheduler, so no // need to remove them from scheduler. if (app.recoveredFinalState == null) { app.handler.handle(new AppRemovedSchedulerEvent(app.applicationId, - finalState)); + finalState)); } - app.handler.handle( - new RMAppManagerEvent(app.applicationId, - RMAppManagerEventType.APP_COMPLETED)); + } + private void handleAppFinishedWithRmContext(RMAppImpl app) { app.rmContext.getRMApplicationHistoryWriter() - .applicationFinished(app, finalState); + .applicationFinished(app, finalState); app.rmContext.getSystemMetricsPublisher() - .appFinished(app, finalState, app.finishTime); - // set the memory free - app.clearUnusedFields(); - - appAdminClientCleanUp(app); - }; + .appFinished(app, finalState, app.finishTime); + } } public int getNumFailedAppAttempts() { @@ -1549,7 +1559,7 @@ public int getNumFailedAppAttempts() { return completedAttempts; } - private static final class AttemptFailedTransition implements + private static final class AttemptFailedTransition implements MultipleArcTransition { private final RMAppState initialState; @@ -1812,8 +1822,8 @@ public void aggregateLogReport(NodeId nodeId, LogAggregationReport report) { == LogAggregationStatus.TIME_OUT && report.getLogAggregationStatus() == LogAggregationStatus.RUNNING) { - // If the log aggregation status got from latest nm heartbeat - // is Running, and current log aggregation status is TimeOut, + // If the log aggregation status got from latest NM heartbeat + // is RUNNING, and current log aggregation status is TIME_OUT, // based on whether there are any failure messages for this NM, // we will reset the log aggregation status as RUNNING or // RUNNING_WITH_FAILURE @@ 
-1912,7 +1922,13 @@ public LogAggregationStatus getLogAggregationStatusForAppReport() { } } - private boolean isLogAggregationFinished() { + @Override + public boolean isLogAggregationEnabled() { + return logAggregationEnabled; + } + + @Override + public boolean isLogAggregationFinished() { return this.logAggregationStatusForAppReport .equals(LogAggregationStatus.SUCCEEDED) || this.logAggregationStatusForAppReport @@ -2131,4 +2147,10 @@ protected void onInvalidStateTransition(RMAppEventType rmAppEventType, RMAppState state){ /* TODO fail the application on the failed transition */ } + + private static void sendEventToAppManager(RMAppImpl app, + RMAppManagerEventType event) { + app.handler.handle( + new RMAppManagerEvent(app.applicationId, event)); + } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestAppManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestAppManager.java index 6a6f9cf0271..47f32dbb020 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestAppManager.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestAppManager.java @@ -19,28 +19,9 @@ package org.apache.hadoop.yarn.server.resourcemanager; -import static org.junit.Assert.assertTrue; -import static org.mockito.Matchers.any; -import static org.mockito.Matchers.anyLong; -import static org.mockito.Matchers.isA; -import static org.mockito.Matchers.matches; -import static org.mockito.Mockito.doAnswer; -import static org.mockito.Mockito.mock; -import static org.mockito.Mockito.never; -import static org.mockito.Mockito.times; -import static org.mockito.Mockito.verify; 
-import static org.mockito.Mockito.when; - -import java.io.IOException; -import java.nio.ByteBuffer; -import java.util.ArrayList; -import java.util.Collections; -import java.util.HashMap; -import java.util.HashSet; -import java.util.List; -import java.util.Map; -import java.util.concurrent.ConcurrentMap; - +import com.google.common.collect.Lists; +import com.google.common.collect.Maps; +import com.google.common.collect.Sets; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; @@ -88,28 +69,48 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.ContainerAllocationExpirer; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.YarnScheduler; -import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity - .CapacityScheduler; -import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity - .CapacitySchedulerConfiguration; -import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity - .ManagedParentQueue; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfiguration; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.ManagedParentQueue; import org.apache.hadoop.yarn.server.resourcemanager.security.ClientToAMTokenSecretManagerInRM; import org.apache.hadoop.yarn.server.resourcemanager.timelineservice.RMTimelineCollectorManager; import org.apache.hadoop.yarn.server.security.ApplicationACLsManager; import org.apache.hadoop.yarn.util.Records; import org.apache.hadoop.yarn.util.resource.ResourceCalculator; import org.apache.hadoop.yarn.util.resource.Resources; -import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfiguration.PREFIX; import org.junit.After; import 
org.junit.Assert; import org.junit.Before; import org.junit.Test; +import org.mockito.ArgumentCaptor; import org.mockito.invocation.InvocationOnMock; import org.mockito.stubbing.Answer; -import com.google.common.collect.Lists; -import com.google.common.collect.Maps; +import java.io.IOException; +import java.lang.reflect.Field; +import java.nio.ByteBuffer; +import java.util.ArrayList; +import java.util.Collections; +import java.util.Comparator; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.ConcurrentMap; + +import static java.util.stream.Collectors.toSet; +import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfiguration.PREFIX; +import static org.junit.Assert.assertTrue; +import static org.mockito.Matchers.any; +import static org.mockito.Matchers.isA; +import static org.mockito.Matchers.matches; +import static org.mockito.Mockito.doAnswer; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.never; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; /** * Testing applications being retired from RM. 
@@ -131,7 +132,7 @@ public synchronized void setAppEventType(RMAppEventType newType) { } - public static List newRMApps(int n, long time, RMAppState state) { + private static List newRMApps(int n, long time, RMAppState state) { List list = Lists.newArrayList(); for (int i = 0; i < n; ++i) { list.add(new MockRMApp(i, time, state)); @@ -139,23 +140,63 @@ public synchronized void setAppEventType(RMAppEventType newType) { return list; } + private static List newRMAppsMixedLogAggregationStatus(int n, + long time, RMAppState state) { + List list = Lists.newArrayList(); + for (int i = 0; i < n; ++i) { + MockRMApp rmApp = new MockRMApp(i, time, state); + rmApp.setLogAggregationEnabled(true); + rmApp.setLogAggregationFinished(i % 2 == 0); + list.add(rmApp); + } + return list; + } + public RMContext mockRMContext(int n, long time) { + final ConcurrentMap map = createRMAppsMap(n, time); + return createMockRMContextInternal(map); + } + + public RMContext mockRMContextWithMixedLogAggregationStatus(int n, + long time) { + final ConcurrentMap map = + createRMAppsMapMixedLogAggregationStatus(n, time); + return createMockRMContextInternal(map); + } + + private ConcurrentMap createRMAppsMap(int n, + long time) { final List apps = newRMApps(n, time, RMAppState.FINISHED); final ConcurrentMap map = Maps.newConcurrentMap(); for (RMApp app : apps) { map.put(app.getApplicationId(), app); } + return map; + } + + private ConcurrentMap createRMAppsMapMixedLogAggregationStatus( + int n, long time) { + final List apps = + newRMAppsMixedLogAggregationStatus(n, time, RMAppState.FINISHED); + final ConcurrentMap map = Maps.newConcurrentMap(); + for (RMApp app : apps) { + map.put(app.getApplicationId(), app); + } + return map; + } + + private RMContext createMockRMContextInternal(ConcurrentMap map) { Dispatcher rmDispatcher = new AsyncDispatcher(); ContainerAllocationExpirer containerAllocationExpirer = new ContainerAllocationExpirer( - rmDispatcher); + rmDispatcher); AMLivelinessMonitor 
amLivelinessMonitor = new AMLivelinessMonitor( - rmDispatcher); + rmDispatcher); AMLivelinessMonitor amFinishingMonitor = new AMLivelinessMonitor( - rmDispatcher); + rmDispatcher); RMApplicationHistoryWriter writer = mock(RMApplicationHistoryWriter.class); RMContext context = new RMContextImpl(rmDispatcher, - containerAllocationExpirer, amLivelinessMonitor, amFinishingMonitor, - null, null, null, null, null) { + containerAllocationExpirer, amLivelinessMonitor, amFinishingMonitor, + null, null, null, null, null) { @Override public ConcurrentMap getRMApps() { return map; @@ -198,9 +239,13 @@ public void handle(RMAppEvent event) { // Extend and make the functions we want to test public public class TestRMAppManager extends RMAppManager { + private final Field completedAppsField; + private final RMStateStore stateStore; public TestRMAppManager(RMContext context, Configuration conf) { super(context, null, null, new ApplicationACLsManager(conf), conf); + this.stateStore = context.getStateStore(); + this.completedAppsField = getCompletedAppsField(); } public TestRMAppManager(RMContext context, @@ -208,6 +253,28 @@ public TestRMAppManager(RMContext context, YarnScheduler scheduler, ApplicationMasterService masterService, ApplicationACLsManager applicationACLsManager, Configuration conf) { super(context, scheduler, masterService, applicationACLsManager, conf); + this.stateStore = context.getStateStore(); + this.completedAppsField = getCompletedAppsField(); + } + + private Field getCompletedAppsField() { + final String fieldName = "completedApps"; + try { + Class clazz = getClass(); + while (clazz != Object.class) { + Field[] fields = clazz.getDeclaredFields(); + for (Field f : fields) { + if (f.getName().equals(fieldName)) { + f.setAccessible(true); + return f; + } + } + clazz = clazz.getSuperclass(); + } + } catch (Exception e) { + throw new RuntimeException(e); + } + throw new RuntimeException("No field found with name: " + fieldName); } public void 
checkAppNumCompletedLimit() { @@ -222,10 +289,48 @@ public int getCompletedAppsListSize() { return super.getCompletedAppsListSize(); } - public int getCompletedAppsInStateStore() { + public int getNumberOfCompletedAppsInStateStore() { return this.completedAppsInStateStore; } + List getCompletedApps() { + try { + return (List) completedAppsField.get(this); + } catch (Exception e) { + throw new RuntimeException(e); + } + } + + Set getFirstNCompletedApps(int n) { + try { + List completedApps = + (List) completedAppsField.get(this); + return completedApps.stream().limit(n).collect(toSet()); + } catch (Exception e) { + throw new RuntimeException(e); + } + } + + Set getCompletedAppsWithEvenIdsInRange(int n) { + try { + List completedApps = + (List) completedAppsField.get(this); + return completedApps.stream().limit(n) + .filter(app -> app.getId() % 2 == 0).collect(toSet()); + } catch (Exception e) { + throw new RuntimeException(e); + } + } + + Set getRemovedAppsFromStateStore(int numRemoves) { + ArgumentCaptor argumentCaptor = + ArgumentCaptor.forClass(RMApp.class); + verify(stateStore, times(numRemoves)) + .removeApplication(argumentCaptor.capture()); + return argumentCaptor.getAllValues().stream().map(RMApp::getApplicationId) + .collect(toSet()); + } + public void submitApplication( ApplicationSubmissionContext submissionContext, String user) throws YarnException, IOException { @@ -234,10 +339,14 @@ public void submitApplication( } } - protected void addToCompletedApps(TestRMAppManager appMonitor, RMContext rmContext) { - for (RMApp app : rmContext.getRMApps().values()) { + private void addToCompletedApps(TestRMAppManager appMonitor, + RMContext rmContext) { + // ensure applications are finished in order by their IDs + List sortedApps = new ArrayList<>(rmContext.getRMApps().values()); + sortedApps.sort(Comparator.comparingInt(o -> o.getApplicationId().getId())); + for (RMApp app : sortedApps) { if (app.getState() == RMAppState.FINISHED - || app.getState() == 
RMAppState.KILLED + || app.getState() == RMAppState.KILLED || app.getState() == RMAppState.FAILED) { appMonitor.finishApplication(app.getApplicationId()); } @@ -631,7 +740,8 @@ public void testRMAppRetireZeroSetting() throws Exception { @Test public void testStateStoreAppLimitLessThanMemoryAppLimit() { long now = System.currentTimeMillis(); - RMContext rmContext = mockRMContext(10, now - 20000); + final int allApps = 10; + RMContext rmContext = mockRMContext(allApps, now - 20000); Configuration conf = new YarnConfiguration(); int maxAppsInMemory = 8; int maxAppsInStateStore = 4; @@ -641,39 +751,57 @@ public void testStateStoreAppLimitLessThanMemoryAppLimit() { TestRMAppManager appMonitor = new TestRMAppManager(rmContext, conf); addToCompletedApps(appMonitor, rmContext); - Assert.assertEquals("Number of completed apps incorrect", 10, + Assert.assertEquals("Number of completed apps incorrect", allApps, appMonitor.getCompletedAppsListSize()); + + int numRemoveAppsFromStateStore = allApps - maxAppsInStateStore; + Set appsShouldBeRemovedFromStateStore = appMonitor + .getFirstNCompletedApps(numRemoveAppsFromStateStore); appMonitor.checkAppNumCompletedLimit(); + Set removedAppsFromStateStore = appMonitor + .getRemovedAppsFromStateStore(numRemoveAppsFromStateStore); + Assert.assertEquals("Number of apps incorrect after # completed check", maxAppsInMemory, rmContext.getRMApps().size()); Assert.assertEquals("Number of completed apps incorrect after check", maxAppsInMemory, appMonitor.getCompletedAppsListSize()); - int numRemoveAppsFromStateStore = 10 - maxAppsInStateStore; verify(rmContext.getStateStore(), times(numRemoveAppsFromStateStore)) .removeApplication(isA(RMApp.class)); Assert.assertEquals(maxAppsInStateStore, - appMonitor.getCompletedAppsInStateStore()); + appMonitor.getNumberOfCompletedAppsInStateStore()); + + List completedApps = appMonitor.getCompletedApps(); + Assert.assertEquals(maxAppsInMemory, completedApps.size()); + 
Assert.assertEquals(numRemoveAppsFromStateStore, + removedAppsFromStateStore.size()); + Assert.assertEquals(numRemoveAppsFromStateStore, + Sets.intersection(appsShouldBeRemovedFromStateStore, + removedAppsFromStateStore).size()); } @Test - public void testStateStoreAppLimitLargerThanMemoryAppLimit() { + public void testStateStoreAppLimitGreaterThanMemoryAppLimit() { long now = System.currentTimeMillis(); - RMContext rmContext = mockRMContext(10, now - 20000); + final int allApps = 10; + RMContext rmContext = mockRMContext(allApps, now - 20000); Configuration conf = new YarnConfiguration(); int maxAppsInMemory = 8; conf.setInt(YarnConfiguration.RM_MAX_COMPLETED_APPLICATIONS, maxAppsInMemory); - // larger than maxCompletedAppsInMemory, reset to RM_MAX_COMPLETED_APPLICATIONS. + // greater than maxCompletedAppsInMemory, reset to RM_MAX_COMPLETED_APPLICATIONS. conf.setInt(YarnConfiguration.RM_STATE_STORE_MAX_COMPLETED_APPLICATIONS, 1000); TestRMAppManager appMonitor = new TestRMAppManager(rmContext, conf); addToCompletedApps(appMonitor, rmContext); - Assert.assertEquals("Number of completed apps incorrect", 10, + Assert.assertEquals("Number of completed apps incorrect", allApps, appMonitor.getCompletedAppsListSize()); - appMonitor.checkAppNumCompletedLimit(); - int numRemoveApps = 10 - maxAppsInMemory; + int numRemoveApps = allApps - maxAppsInMemory; + Set appsShouldBeRemoved = appMonitor + .getFirstNCompletedApps(numRemoveApps); + appMonitor.checkAppNumCompletedLimit(); + Assert.assertEquals("Number of apps incorrect after # completed check", maxAppsInMemory, rmContext.getRMApps().size()); Assert.assertEquals("Number of completed apps incorrect after check", @@ -681,7 +809,57 @@ public void testStateStoreAppLimitLargerThanMemoryAppLimit() { verify(rmContext.getStateStore(), times(numRemoveApps)).removeApplication( isA(RMApp.class)); Assert.assertEquals(maxAppsInMemory, - appMonitor.getCompletedAppsInStateStore()); + appMonitor.getNumberOfCompletedAppsInStateStore()); + 
+ List completedApps = appMonitor.getCompletedApps(); + Assert.assertEquals(maxAppsInMemory, completedApps.size()); + Assert.assertEquals(numRemoveApps, appsShouldBeRemoved.size()); + assertTrue(Collections.disjoint(completedApps, appsShouldBeRemoved)); + } + + @Test + public void testStateStoreAppLimitSomeAppsHaveNotFinishedLogAggregation() { + long now = System.currentTimeMillis(); + final int allApps = 10; + RMContext rmContext = + mockRMContextWithMixedLogAggregationStatus(allApps, now - 20000); + Configuration conf = new YarnConfiguration(); + int maxAppsInMemory = 2; + conf.setInt(YarnConfiguration.RM_MAX_COMPLETED_APPLICATIONS, + maxAppsInMemory); + // greater than maxCompletedAppsInMemory, reset to + // RM_MAX_COMPLETED_APPLICATIONS. + conf.setInt(YarnConfiguration.RM_STATE_STORE_MAX_COMPLETED_APPLICATIONS, + 1000); + TestRMAppManager appMonitor = new TestRMAppManager(rmContext, conf); + + addToCompletedApps(appMonitor, rmContext); + Assert.assertEquals("Number of completed apps incorrect", allApps, + appMonitor.getCompletedAppsListSize()); + + int numRemoveApps = allApps - maxAppsInMemory; + int effectiveNumRemoveApps = numRemoveApps / 2; + //only apps with even ID would be deleted due to log aggregation status + int expectedNumberOfAppsInMemory = maxAppsInMemory + effectiveNumRemoveApps; + + Set appsShouldBeRemoved = appMonitor + .getCompletedAppsWithEvenIdsInRange(numRemoveApps); + appMonitor.checkAppNumCompletedLimit(); + + Assert.assertEquals("Number of apps incorrect after # completed check", + expectedNumberOfAppsInMemory, rmContext.getRMApps().size()); + Assert.assertEquals("Number of completed apps incorrect after check", + expectedNumberOfAppsInMemory, appMonitor.getCompletedAppsListSize()); + verify(rmContext.getStateStore(), times(effectiveNumRemoveApps)) + .removeApplication(isA(RMApp.class)); + Assert.assertEquals(expectedNumberOfAppsInMemory, + appMonitor.getNumberOfCompletedAppsInStateStore()); + + List completedApps = 
appMonitor.getCompletedApps(); + + Assert.assertEquals(expectedNumberOfAppsInMemory, completedApps.size()); + Assert.assertEquals(effectiveNumRemoveApps, appsShouldBeRemoved.size()); + assertTrue(Collections.disjoint(completedApps, appsShouldBeRemoved)); } protected void setupDispatcher(RMContext rmContext, Configuration conf) { diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/MockAsm.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/MockAsm.java index 6c6c4b4e803..342dab8d8a7 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/MockAsm.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/MockAsm.java @@ -146,6 +146,17 @@ public String getOriginalTrackingUrl() { public int getMaxAppAttempts() { throw new UnsupportedOperationException("Not supported yet."); } + + @Override + public boolean isLogAggregationEnabled() { + throw new UnsupportedOperationException("Not supported yet."); + } + + @Override + public boolean isLogAggregationFinished() { + throw new UnsupportedOperationException("Not supported yet."); + } + @Override public ApplicationReport createAndGetApplicationReport( String clientUserName,boolean allowAccess) { diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/MockRMApp.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/MockRMApp.java index 
ad29d274a4f..32ece346c21 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/MockRMApp.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/MockRMApp.java
@@ -70,6 +70,8 @@
   int maxAppAttempts = 1;
   List amReqs;
   private Set applicationTags = null;
+  private boolean logAggregationEnabled;
+  private boolean logAggregationFinished;
 
   public MockRMApp(int newid, long time, RMAppState newState) {
     finish = time;
@@ -236,6 +238,24 @@ public int getMaxAppAttempts() {
     return maxAppAttempts;
   }
 
+  @Override
+  public boolean isLogAggregationEnabled() {
+    return logAggregationEnabled;
+  }
+
+  @Override
+  public boolean isLogAggregationFinished() {
+    return logAggregationFinished;
+  }
+
+  public void setLogAggregationEnabled(boolean enabled) {
+    this.logAggregationEnabled = enabled;
+  }
+
+  public void setLogAggregationFinished(boolean finished) {
+    this.logAggregationFinished = finished;
+  }
+
   public void setNumMaxRetries(int maxAppAttempts) {
     this.maxAppAttempts = maxAppAttempts;
   }
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImplTest.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImplTest.java
new file mode 100644
index 00000000000..d12f00f059f
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImplTest.java
@@ -0,0 +1,342 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.resourcemanager.rmapp;
+
+import com.google.common.collect.Lists;
+import com.google.common.collect.Sets;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
+import org.apache.hadoop.yarn.api.records.LogAggregationStatus;
+import org.apache.hadoop.yarn.api.records.NodeId;
+import org.apache.hadoop.yarn.api.records.ResourceRequest;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.event.AbstractEvent;
+import org.apache.hadoop.yarn.event.Dispatcher;
+import org.apache.hadoop.yarn.event.EventHandler;
+import org.apache.hadoop.yarn.server.api.protocolrecords.LogAggregationReport;
+import org.apache.hadoop.yarn.server.resourcemanager.ApplicationMasterService;
+import org.apache.hadoop.yarn.server.resourcemanager.RMAppManagerEvent;
+import org.apache.hadoop.yarn.server.resourcemanager.RMAppManagerEventType;
+import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
+import org.apache.hadoop.yarn.server.resourcemanager.ahs.RMApplicationHistoryWriter;
+import org.apache.hadoop.yarn.server.resourcemanager.metrics.SystemMetricsPublisher;
+import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.YarnScheduler;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppRemovedSchedulerEvent;
+import org.apache.hadoop.yarn.state.StateMachine;
+import org.junit.Before;
+import org.junit.Test;
+import org.mockito.ArgumentCaptor;
+
+import java.lang.reflect.Field;
+import java.util.List;
+import java.util.Set;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.never;
+import static org.mockito.Mockito.reset;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.verifyZeroInteractions;
+import static org.mockito.Mockito.when;
+
+/**
+ * Unit tests for {@link RMAppImpl} covering the ATTEMPT_FINISHED transition
+ * out of FINISHING and the handling of log aggregation reports, using a mocked
+ * {@link RMContext} whose event handler records every dispatched event.
+ */
+public class RMAppImplTest {
+  private EventHandler eventHandler;
+  private RMContext rmContext;
+  private long testCaseStartTime;
+  private ApplicationId appId;
+  private int appIdentifier = 1111;
+
+  /**
+   * Forces {@code rmApp} into {@code state} by overwriting, via reflection,
+   * the {@code currentState} field of its {@code stateMachine}.
+   */
+  private static void setRMAppCurrentState(RMApp rmApp, RMAppState state) {
+    try {
+      Field stateMachineField = RMAppImpl.class.getDeclaredField(
+          "stateMachine");
+      // The field is only read, so setAccessible(true) suffices; rewriting
+      // Field.modifiers is unnecessary and fails on JDK 12 and later.
+      stateMachineField.setAccessible(true);
+      StateMachine stateMachine = (StateMachine) stateMachineField.get(rmApp);
+
+      Field currentStateField = stateMachine.getClass().getDeclaredField(
+          "currentState");
+      currentStateField.setAccessible(true);
+      currentStateField.set(stateMachine, state);
+    } catch (Exception e) {
+      throw new RuntimeException(e);
+    }
+  }
+
+  /** Reflectively reads field {@code fieldName} of {@code rmApp}. */
+  private Object getFieldValue(RMApp rmApp, String fieldName) {
+    try {
+      Field f = rmApp.getClass().getDeclaredField(fieldName);
+      f.setAccessible(true);
+      return f.get(rmApp);
+    } catch (Exception e) {
+      throw new RuntimeException(e);
+    }
+  }
+
+  /** Reflectively writes {@code value} into field {@code fieldName}. */
+  private void setFieldValue(RMApp rmApp, String fieldName, Object value) {
+    try {
+      Field f = rmApp.getClass().getDeclaredField(fieldName);
+      f.setAccessible(true);
+      f.set(rmApp, value);
+    } catch (Exception e) {
+      throw new RuntimeException(e);
+    }
+  }
+
+  private void recordTestCaseStartTime() {
+    testCaseStartTime = System.currentTimeMillis();
+  }
+
+  /**
+   * Verifies that the event handler was invoked exactly {@code expected}
+   * times and returns the events it was given, in invocation order.
+   */
+  private List<AbstractEvent> captureEventHandlerArguments(int expected) {
+    ArgumentCaptor<AbstractEvent> argumentCaptor = ArgumentCaptor.forClass(
+        AbstractEvent.class);
+    verify(eventHandler, times(expected)).handle(argumentCaptor.capture());
+    return argumentCaptor.getAllValues();
+  }
+
+  private void verifyAppRemovedEvent(AbstractEvent e) {
+    if (!(e instanceof AppRemovedSchedulerEvent)) {
+      fail("First captured event " + e + " should be an instance of "
+          + "AppRemovedSchedulerEvent");
+    }
+    AppRemovedSchedulerEvent event = (AppRemovedSchedulerEvent) e;
+    assertEquals(appId, event.getApplicationID());
+    assertEquals(RMAppState.FINISHED, event.getFinalState());
+  }
+
+  private void verifyAppManagerEvent(AbstractEvent e) {
+    if (!(e instanceof RMAppManagerEvent)) {
+      fail("Captured event " + e + " should be an instance of "
+          + "RMAppManagerEvent");
+    }
+    RMAppManagerEvent event = (RMAppManagerEvent) e;
+    assertEquals(appId, event.getApplicationId());
+    assertEquals(RMAppManagerEventType.APP_COMPLETED, event.getType());
+  }
+
+  /** Creates a mock dispatcher with a fresh mock {@link #eventHandler}. */
+  private Dispatcher createDispatcher() {
+    eventHandler = mock(EventHandler.class);
+
+    Dispatcher dispatcher = mock(Dispatcher.class);
+    when(dispatcher.getEventHandler()).thenReturn(eventHandler);
+    return dispatcher;
+  }
+
+  private RMContext createRmContext(Dispatcher dispatcher) {
+    // Named "context" so the local does not shadow the rmContext field.
+    final RMContext context = mock(RMContext.class);
+    RMApplicationHistoryWriter mockAppHistoryWriter = mock(
+        RMApplicationHistoryWriter.class);
+    SystemMetricsPublisher mockSystemMetricsPublisher = mock(
+        SystemMetricsPublisher.class);
+
+    when(context.getDispatcher()).thenReturn(dispatcher);
+    when(context.getRMApplicationHistoryWriter()).thenReturn(
+        mockAppHistoryWriter);
+    when(context.getSystemMetricsPublisher()).thenReturn(
+        mockSystemMetricsPublisher);
+    return context;
+  }
+
+  private Configuration createConfigWithLogAggregationDisabled() {
+    Configuration conf = new Configuration();
+    conf.setBoolean(YarnConfiguration.LOG_AGGREGATION_ENABLED, false);
+    return conf;
+  }
+
+  private Configuration createConfigWithLogAggregationEnabled() {
+    Configuration conf = new Configuration();
+    conf.setBoolean(YarnConfiguration.LOG_AGGREGATION_ENABLED, true);
+    return conf;
+  }
+
+  /**
+   * Builds an {@link RMAppImpl} with a fresh {@link #appId} and forces it into
+   * {@code state}; the scheduler and application master service are null.
+   */
+  private RMAppImpl createRmAppWithState(Configuration conf, RMAppState state) {
+    long clusterTimestamp = 1234L;
+    appId = ApplicationId.newInstance(clusterTimestamp, appIdentifier++);
+    String rmAppName = "app1";
+    String user = "testUser";
+    String queue = "testQueue";
+    ApplicationSubmissionContext mockAppSubmissionContext = mock(
+        ApplicationSubmissionContext.class);
+    YarnScheduler scheduler = null;
+    ApplicationMasterService appMasterService = null;
+    long submitTime = 12345L;
+    String appType = "testApplication";
+    Set<String> appTags = Sets.newHashSet();
+    List<ResourceRequest> amReqs = Lists.newArrayList();
+    RMAppImpl rmApp = new RMAppImpl(appId, this.rmContext, conf, rmAppName,
+        user, queue, mockAppSubmissionContext, scheduler,
+        appMasterService, submitTime, appType, appTags, amReqs);
+    setRMAppCurrentState(rmApp, state);
+
+    return rmApp;
+  }
+
+  private void aggregateLogReportWithAggregationStatus(RMAppImpl rmApp,
+      LogAggregationStatus status) {
+    rmApp.aggregateLogReport(NodeId.newInstance("host", 1111),
+        LogAggregationReport.newInstance(appId, status, "diagMessage"));
+  }
+
+  @Before
+  public void setUp() {
+    recordTestCaseStartTime();
+  }
+
+  @Test
+  public void testFinalTransitionIfLogAggregationDisabled() {
+    Dispatcher dispatcher = createDispatcher();
+    rmContext = createRmContext(dispatcher);
+
+    Configuration conf = createConfigWithLogAggregationDisabled();
+    RMAppImpl rmApp = createRmAppWithState(conf, RMAppState.FINISHING);
+
+    verifyAppBeforeFinishEvent(rmApp);
+
+    RMAppEvent event = new RMAppEvent(appId, RMAppEventType.ATTEMPT_FINISHED);
+    rmApp.handle(event);
+
+    verifyAppAfterFinishEvent(rmApp, 2);
+  }
+
+  @Test
+  public void testFinalTransitionIfLogAggregationEnabledAndFinished() {
+    Dispatcher dispatcher = createDispatcher();
+    rmContext = createRmContext(dispatcher);
+
+    Configuration conf = createConfigWithLogAggregationEnabled();
+    RMAppImpl rmApp = createRmAppWithState(conf, RMAppState.FINISHING);
+    setFieldValue(rmApp, "currentAttempt", mock(RMAppAttempt.class));
+
+    verifyAppBeforeFinishEvent(rmApp);
+
+    RMAppEvent event = new RMAppEvent(appId, RMAppEventType.ATTEMPT_FINISHED);
+    rmApp.handle(event);
+    aggregateLogReportWithAggregationStatus(rmApp,
+        LogAggregationStatus.SUCCEEDED);
+
+    verifyAppAfterFinishEvent(rmApp, 2);
+  }
+
+  @Test
+  public void testFinalTransitionIfLogAggregationEnabledAndNotFinished() {
+    // createDispatcher() installs the mock eventHandler used below.
+    Dispatcher dispatcher = createDispatcher();
+    rmContext = createRmContext(dispatcher);
+
+    Configuration conf = createConfigWithLogAggregationEnabled();
+    RMAppImpl rmApp = createRmAppWithState(conf, RMAppState.FINISHING);
+
+    verifyAppBeforeFinishEvent(rmApp);
+
+    RMAppEvent event = new RMAppEvent(appId, RMAppEventType.ATTEMPT_FINISHED);
+    rmApp.handle(event);
+
+    verifyAppAfterFinishEvent(rmApp, 2);
+  }
+
+  @Test
+  public void testLogAggregationDoesNotSendAppCompletedEventIfStateIsNotFinal() {
+    Dispatcher dispatcher = createDispatcher();
+    rmContext = createRmContext(dispatcher);
+
+    verifyLogAggregationDoesNotSendAppCompletedEvent(RMAppState.NEW);
+    verifyLogAggregationDoesNotSendAppCompletedEvent(RMAppState.NEW_SAVING);
+    verifyLogAggregationDoesNotSendAppCompletedEvent(RMAppState.SUBMITTED);
+    verifyLogAggregationDoesNotSendAppCompletedEvent(RMAppState.ACCEPTED);
+    verifyLogAggregationDoesNotSendAppCompletedEvent(RMAppState.RUNNING);
+    verifyLogAggregationDoesNotSendAppCompletedEvent(RMAppState.FINAL_SAVING);
+    verifyLogAggregationDoesNotSendAppCompletedEvent(RMAppState.FINISHING);
+    verifyLogAggregationDoesNotSendAppCompletedEvent(RMAppState.KILLING);
+  }
+
+  @Test
+  public void testLogAggregationDoesNotSendAppCompletedEventIfStateIsFinal() {
+    Dispatcher dispatcher = createDispatcher();
+    rmContext = createRmContext(dispatcher);
+
+    verifyLogAggregationDoesNotSendAppCompletedEvent(RMAppState.FINISHED);
+    verifyLogAggregationDoesNotSendAppCompletedEvent(RMAppState.KILLED);
+    verifyLogAggregationDoesNotSendAppCompletedEvent(RMAppState.FAILED);
+  }
+
+  private void verifyLogAggregationDoesNotSendAppCompletedEvent(
+      RMAppState rmAppState) {
+    Configuration conf = createConfigWithLogAggregationEnabled();
+    RMAppImpl rmApp = createRmAppWithState(conf, rmAppState);
+    verifyAppBeforeFinishEvent(rmApp);
+    aggregateLogReportWithAggregationStatus(rmApp,
+        LogAggregationStatus.SUCCEEDED);
+    captureEventHandlerArguments(0);
+  }
+
+  private void verifyAppBeforeFinishEvent(RMApp rmApp) {
+    assertEquals(0L, getFieldValue(rmApp, "logAggregationStartTime"));
+    assertEquals(0L, getFieldValue(rmApp, "finishTime"));
+
+    verifyZeroInteractions(eventHandler);
+    verify(rmContext, never()).getRMApplicationHistoryWriter();
+    verify(rmContext, never()).getSystemMetricsPublisher();
+  }
+
+  /**
+   * Asserts that both timestamps were set no earlier than the test start and
+   * that {@code expectedEvents} events reached the handler.
+   */
+  private void verifyAppAfterFinishEvent(RMAppImpl rmApp, int expectedEvents) {
+    // "<=" rather than "<": the transition can complete within the same
+    // millisecond in which the test start time was recorded.
+    assertTrue(testCaseStartTime <= (long) getFieldValue(rmApp,
+        "logAggregationStartTime"));
+    assertTrue(testCaseStartTime <= (long) getFieldValue(rmApp, "finishTime"));
+
+    List<AbstractEvent> capturedEvents = captureEventHandlerArguments(
+        expectedEvents);
+    assertEquals(expectedEvents, capturedEvents.size());
+    if (expectedEvents == 1) {
+      verifyAppRemovedEvent(capturedEvents.get(0));
+    } else if (expectedEvents > 1) {
+      verifyAppManagerEvent(capturedEvents.get(1));
+    }
+  }
+}