diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/timelineservice/NMTimelinePublisher.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/timelineservice/NMTimelinePublisher.java index 13d5c67ba84..b29420674f8 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/timelineservice/NMTimelinePublisher.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/timelineservice/NMTimelinePublisher.java @@ -23,7 +23,12 @@ import java.util.HashMap; import java.util.Map; import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.Executors; +import java.util.concurrent.Future; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; +import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.webapp.util.WebAppUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -88,6 +93,9 @@ private final Map appToClientMap; + private long timelineClientLingerPeriod; + private ScheduledExecutorService scheduler; + public NMTimelinePublisher(Context context) { super(NMTimelinePublisher.class.getName()); this.context = context; @@ -110,6 +118,11 @@ protected void serviceInit(Configuration conf) throws Exception { if (webAppURLWithoutScheme.contains(":")) { httpPort = webAppURLWithoutScheme.split(":")[1]; } + + timelineClientLingerPeriod = + conf.getLong(YarnConfiguration.ATS_APP_COLLECTOR_LINGER_PERIOD_IN_MS, + YarnConfiguration.DEFAULT_ATS_APP_COLLECTOR_LINGER_PERIOD_IN_MS); + scheduler = Executors.newSingleThreadScheduledExecutor(); super.serviceInit(conf); } @@ -127,6 +140,12 @@ protected void serviceStop() throws Exception { for(ApplicationId app : appToClientMap.keySet()) { stopTimelineClient(app); } + scheduler.shutdown(); + if (!scheduler + .awaitTermination(timelineClientLingerPeriod, TimeUnit.MILLISECONDS)) { + LOG.warn( + "Publisher terminated before removing the timeline clients"); + } super.serviceStop(); } @@ -434,10 +453,7 @@ public TimelineV2Client run() throws Exception { } public void stopTimelineClient(ApplicationId appId) { - TimelineV2Client client = appToClientMap.remove(appId); - if (client != null) { - client.stop(); - } + removeTimelineClients(appId); } public void setTimelineServiceAddress(ApplicationId appId, @@ -451,4 +467,15 @@ public void setTimelineServiceAddress(ApplicationId appId, private TimelineV2Client getTimelineClient(ApplicationId appId) { return appToClientMap.get(appId); } + + protected Future removeTimelineClients(final ApplicationId appId) { + return scheduler.schedule(new Runnable() { + public void run() { + TimelineV2Client client = appToClientMap.remove(appId); + if (client != null) { + client.stop(); + } + } + }, timelineClientLingerPeriod, TimeUnit.MILLISECONDS); + } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/timelineservice/TestNMTimelinePublisher.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/timelineservice/TestNMTimelinePublisher.java index 43196c7d658..14e0eac6f48 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/timelineservice/TestNMTimelinePublisher.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/timelineservice/TestNMTimelinePublisher.java @@ -57,6 +57,8 @@ public void testContainerResourceUsage() { Configuration conf = new Configuration(); conf.setBoolean(YarnConfiguration.TIMELINE_SERVICE_ENABLED, true); conf.setFloat(YarnConfiguration.TIMELINE_SERVICE_VERSION, 2.0f); + conf.setLong(YarnConfiguration.ATS_APP_COLLECTOR_LINGER_PERIOD_IN_MS, + 1000L); NMTimelinePublisher publisher = new NMTimelinePublisher(context) { public void createTimelineClient(ApplicationId appId) {