diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/collector/PerNodeTimelineCollectorsAuxService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/collector/PerNodeTimelineCollectorsAuxService.java
index 66f9aab034b..2201b1be9cf 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/collector/PerNodeTimelineCollectorsAuxService.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/collector/PerNodeTimelineCollectorsAuxService.java
@@ -19,6 +19,10 @@
 package org.apache.hadoop.yarn.server.timelineservice.collector;
 
 import java.nio.ByteBuffer;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.Executors;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.TimeUnit;
@@ -31,6 +35,7 @@
 import org.apache.hadoop.util.StringUtils;
 import org.apache.hadoop.yarn.YarnUncaughtExceptionHandler;
 import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.ContainerId;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
 import org.apache.hadoop.yarn.exceptions.YarnException;
 import org.apache.hadoop.yarn.server.api.ApplicationInitializationContext;
@@ -59,6 +64,10 @@
   private final NodeTimelineCollectorManager collectorManager;
   private long collectorLingerPeriod;
   private ScheduledExecutorService scheduler;
+  // AM containers currently tracked per application on this node; guarded by
+  // synchronizing on the map itself.
+  private final Map<ApplicationId, Set<ContainerId>> appIdToContainerId =
+      new ConcurrentHashMap<>();
 
   public PerNodeTimelineCollectorsAuxService() {
     this(new NodeTimelineCollectorManager(true));
@@ -148,6 +157,16 @@ public void initializeContainer(ContainerInitializationContext context) {
     if (context.getContainerType() == ContainerType.APPLICATION_MASTER) {
       ApplicationId appId = context.getContainerId().
           getApplicationAttemptId().getApplicationId();
+      // Look up, create and update the set under one lock so the delayed
+      // cleanup task in stopContainer() cannot discard it in between.
+      synchronized (appIdToContainerId) {
+        Set<ContainerId> masterContainers = appIdToContainerId.get(appId);
+        if (masterContainers == null) {
+          masterContainers = new HashSet<>();
+          appIdToContainerId.put(appId, masterContainers);
+        }
+        masterContainers.add(context.getContainerId());
+      }
       addApplication(appId, context.getUser());
     }
   }
@@ -162,11 +181,27 @@ public void stopContainer(ContainerTerminationContext context) {
     // intercept the event of the AM container being stopped and remove the app
     // level collector service
     if (context.getContainerType() == ContainerType.APPLICATION_MASTER) {
+      final ContainerId containerId = context.getContainerId();
       final ApplicationId appId =
-          context.getContainerId().getApplicationAttemptId().getApplicationId();
+          containerId.getApplicationAttemptId().getApplicationId();
       scheduler.schedule(new Runnable() {
         public void run() {
-          removeApplication(appId);
+          // Hold the same lock as initializeContainer() because this task
+          // runs on the scheduler thread.
+          synchronized (appIdToContainerId) {
+            Set<ContainerId> masterContainers = appIdToContainerId.get(appId);
+            if (masterContainers == null) {
+              LOG.info("Stop container for " + containerId
+                  + " is called before initializing container.");
+              return;
+            }
+            masterContainers.remove(containerId);
+            if (masterContainers.isEmpty()) {
+              // remove only if it is last master container
+              removeApplication(appId);
+              appIdToContainerId.remove(appId);
+            }
+          }
         }
       }, collectorLingerPeriod, TimeUnit.MILLISECONDS);
     }
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/collector/TestPerNodeTimelineCollectorsAuxService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/collector/TestPerNodeTimelineCollectorsAuxService.java
index cb9ced09309..ca82f1c3ac9 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/collector/TestPerNodeTimelineCollectorsAuxService.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/collector/TestPerNodeTimelineCollectorsAuxService.java
@@ -53,10 +53,10 @@
   private ApplicationAttemptId appAttemptId;
   private PerNodeTimelineCollectorsAuxService auxService;
   private Configuration conf;
+  private ApplicationId appId;
 
   public TestPerNodeTimelineCollectorsAuxService() {
-    ApplicationId appId =
-        ApplicationId.newInstance(System.currentTimeMillis(), 1);
+    appId = ApplicationId.newInstance(System.currentTimeMillis(), 1);
     appAttemptId = ApplicationAttemptId.newInstance(appId, 1);
     conf = new YarnConfiguration();
     // enable timeline service v.2
@@ -123,6 +123,53 @@ public void testRemoveApplication() throws Exception {
   }
 
   @Test
+  public void testRemoveApplicationWhenSecondAttemptAMCotainerIsLaunchedSameNode() throws Exception {
+    // add first attempt collector
+    auxService = createCollectorAndAddApplication();
+    // auxService should have a single app
+    assertTrue(auxService.hasApplication(appAttemptId.getApplicationId()));
+
+    // add second attempt collector before first attempt master container stop
+    ContainerInitializationContext containerInitalizationContext =
+        createContainerInitalizationContext(2);
+    auxService.initializeContainer(containerInitalizationContext);
+
+    assertTrue(auxService.hasApplication(appAttemptId.getApplicationId()));
+
+    // first attempt stop container
+    ContainerTerminationContext context = createContainerTerminationContext(1);
+    auxService.stopContainer(context);
+
+    // auxService should have the app's collector and need to remove only after
+    // a configured period
+    assertTrue(auxService.hasApplication(appAttemptId.getApplicationId()));
+    for (int i = 0; i < 4; i++) {
+      Thread.sleep(500L);
+      if (!auxService.hasApplication(appAttemptId.getApplicationId())) {
+        break;
+      }
+    }
+    // collector should not be removed since 2nd attempt is running this node
+    assertTrue(auxService.hasApplication(appAttemptId.getApplicationId()));
+
+    // second attempt stop container
+    context = createContainerTerminationContext(2);
+    auxService.stopContainer(context);
+
+    assertTrue(auxService.hasApplication(appAttemptId.getApplicationId()));
+    for (int i = 0; i < 4; i++) {
+      Thread.sleep(500L);
+      if (!auxService.hasApplication(appAttemptId.getApplicationId())) {
+        break;
+      }
+    }
+
+    // auxService should not have that app
+    assertFalse(auxService.hasApplication(appAttemptId.getApplicationId()));
+    auxService.close();
+  }
+
+  @Test
   public void testRemoveApplicationNonAMContainer() throws Exception {
     auxService = createCollectorAndAddApplication();
     // auxService should have a single app
@@ -152,18 +199,37 @@ public void testLaunch() throws Exception {
     }
   }
 
-  private PerNodeTimelineCollectorsAuxService
-      createCollectorAndAddApplication() {
+  private PerNodeTimelineCollectorsAuxService createCollectorAndAddApplication() {
     PerNodeTimelineCollectorsAuxService service = createCollector();
+    ContainerInitializationContext context =
+        createContainerInitalizationContext(1);
+    service.initializeContainer(context);
+    return service;
+  }
+
+  ContainerInitializationContext createContainerInitalizationContext(
+      int attempt) {
+    appAttemptId = ApplicationAttemptId.newInstance(appId, attempt);
     // create an AM container
     ContainerId containerId = getAMContainerId();
     ContainerInitializationContext context =
         mock(ContainerInitializationContext.class);
     when(context.getContainerId()).thenReturn(containerId);
-    when(context.getContainerType()).thenReturn(
-        ContainerType.APPLICATION_MASTER);
-    service.initializeContainer(context);
-    return service;
+    when(context.getContainerType())
+        .thenReturn(ContainerType.APPLICATION_MASTER);
+    return context;
+  }
+
+  ContainerTerminationContext createContainerTerminationContext(int attempt) {
+    appAttemptId = ApplicationAttemptId.newInstance(appId, attempt);
+    // create an AM container
+    ContainerId containerId = getAMContainerId();
+    ContainerTerminationContext context =
+        mock(ContainerTerminationContext.class);
+    when(context.getContainerId()).thenReturn(containerId);
+    when(context.getContainerType())
+        .thenReturn(ContainerType.APPLICATION_MASTER);
+    return context;
   }
 
   private PerNodeTimelineCollectorsAuxService createCollector() {