diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/timelineservice/NMTimelinePublisher.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/timelineservice/NMTimelinePublisher.java
index 13d5c67ba84..e9de8809500 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/timelineservice/NMTimelinePublisher.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/timelineservice/NMTimelinePublisher.java
@@ -23,6 +23,12 @@
 import java.util.HashMap;
 import java.util.Map;
 import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
 
 import org.apache.hadoop.yarn.webapp.util.WebAppUtils;
 import org.slf4j.Logger;
@@ -88,6 +94,9 @@
   private final Map<ApplicationId, TimelineV2Client> appToClientMap;
 
+  private long timelineClientLingerPeriod;
+  private ScheduledExecutorService scheduler;
+
   public NMTimelinePublisher(Context context) {
     super(NMTimelinePublisher.class.getName());
     this.context = context;
@@ -110,6 +119,11 @@ protected void serviceInit(Configuration conf) throws Exception {
     if (webAppURLWithoutScheme.contains(":")) {
       httpPort = webAppURLWithoutScheme.split(":")[1];
     }
+
+    timelineClientLingerPeriod =
+        conf.getLong(YarnConfiguration.ATS_APP_COLLECTOR_LINGER_PERIOD_IN_MS,
+            YarnConfiguration.DEFAULT_ATS_APP_COLLECTOR_LINGER_PERIOD_IN_MS);
+    scheduler = Executors.newSingleThreadScheduledExecutor();
     super.serviceInit(conf);
   }
 
@@ -127,6 +141,12 @@ protected void serviceStop() throws Exception {
     for(ApplicationId app : appToClientMap.keySet()) {
       stopTimelineClient(app);
     }
+
+    scheduler.shutdown();
+    if (!scheduler
+        .awaitTermination(timelineClientLingerPeriod, TimeUnit.MILLISECONDS)) {
+      LOG.warn("Publisher terminated before removing the timeline clients");
+    }
     super.serviceStop();
   }
 
@@ -433,11 +453,15 @@ public TimelineV2Client run() throws Exception {
     }
   }
 
-  public void stopTimelineClient(ApplicationId appId) {
-    TimelineV2Client client = appToClientMap.remove(appId);
-    if (client != null) {
-      client.stop();
-    }
+  public Future stopTimelineClient(ApplicationId appId) {
+    return scheduler.schedule(new Runnable() {
+      public void run() {
+        TimelineV2Client client = appToClientMap.remove(appId);
+        if (client != null) {
+          client.stop();
+        }
+      }
+    }, timelineClientLingerPeriod, TimeUnit.MILLISECONDS);
   }
 
   public void setTimelineServiceAddress(ApplicationId appId,
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/timelineservice/TestNMTimelinePublisher.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/timelineservice/TestNMTimelinePublisher.java
index 43196c7d658..cb34a9cacb0 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/timelineservice/TestNMTimelinePublisher.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/timelineservice/TestNMTimelinePublisher.java
@@ -1,20 +1,20 @@
 /**
-* Licensed to the Apache Software Foundation (ASF) under one
-* or more contributor license agreements. See the NOTICE file
-* distributed with this work for additional information
-* regarding copyright ownership. The ASF licenses this file
-* to you under the Apache License, Version 2.0 (the
-* "License"); you may not use this file except in compliance
-* with the License. You may obtain a copy of the License at
-*
-* http://www.apache.org/licenses/LICENSE-2.0
-*
-* Unless required by applicable law or agreed to in writing, software
-* distributed under the License is distributed on an "AS IS" BASIS,
-* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-* See the License for the specific language governing permissions and
-* limitations under the License.
-*/
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
 
 package org.apache.hadoop.yarn.server.nodemanager.timelineservice;
 
@@ -26,39 +26,48 @@
 import java.io.IOException;
 import java.util.Iterator;
 import java.util.Map.Entry;
+import java.util.concurrent.Future;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
 import org.apache.hadoop.yarn.api.records.ApplicationId;
 import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.api.records.ContainerStatus;
 import org.apache.hadoop.yarn.api.records.NodeId;
+import org.apache.hadoop.yarn.api.records.timelineservice.ContainerEntity;
 import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity;
 import org.apache.hadoop.yarn.api.records.timelineservice.TimelineMetric;
 import org.apache.hadoop.yarn.client.api.impl.TimelineV2ClientImpl;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
 import org.apache.hadoop.yarn.exceptions.YarnException;
+import org.apache.hadoop.yarn.server.metrics.ContainerMetricsConstants;
 import org.apache.hadoop.yarn.server.nodemanager.Context;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.ApplicationContainerFinishedEvent;
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container;
 import org.apache.hadoop.yarn.util.ResourceCalculatorProcessTree;
+import org.junit.After;
 import org.junit.Assert;
+import org.junit.Before;
 import org.junit.Test;
 
 public class TestNMTimelinePublisher {
   private static final String MEMORY_ID = "MEMORY";
   private static final String CPU_ID = "CPU";
 
-  @Test
-  public void testContainerResourceUsage() {
-    Context context = mock(Context.class);
-    @SuppressWarnings("unchecked")
-    final DummyTimelineClient timelineClient = new DummyTimelineClient(null);
-    when(context.getNodeId()).thenReturn(NodeId.newInstance("localhost", 0));
+  private NMTimelinePublisher publisher;
+  private DummyTimelineClient timelineClient;
+  private Configuration conf;
 
-    Configuration conf = new Configuration();
+  @Before public void setup() throws Exception {
+    conf = new Configuration();
     conf.setBoolean(YarnConfiguration.TIMELINE_SERVICE_ENABLED, true);
     conf.setFloat(YarnConfiguration.TIMELINE_SERVICE_VERSION, 2.0f);
+    conf.setLong(YarnConfiguration.ATS_APP_COLLECTOR_LINGER_PERIOD_IN_MS,
+        3000L);
+    timelineClient = new DummyTimelineClient(null);
+    Context context = createMockContext();
 
-    NMTimelinePublisher publisher = new NMTimelinePublisher(context) {
+    publisher = new NMTimelinePublisher(context) {
       public void createTimelineClient(ApplicationId appId) {
         if (!getAppToClientMap().containsKey(appId)) {
           timelineClient.init(getConfig());
@@ -69,12 +78,71 @@ public void createTimelineClient(ApplicationId appId) {
     };
     publisher.init(conf);
     publisher.start();
+  }
+
+  private Context createMockContext() {
+    Context context = mock(Context.class);
+    when(context.getNodeId()).thenReturn(NodeId.newInstance("localhost", 0));
+    return context;
+  }
+
+  @After public void tearDown() throws Exception {
+    if (publisher != null) {
+      publisher.stop();
+    }
+    if (timelineClient != null) {
+      timelineClient.stop();
+    }
+  }
+
+  @Test public void testPublishContainerFinish() throws Exception {
+    ApplicationId appId = ApplicationId.newInstance(0, 2);
+    ApplicationAttemptId appAttemptId =
+        ApplicationAttemptId.newInstance(appId, 1);
+    ContainerId cId = ContainerId.newContainerId(appAttemptId, 1);
+
+    String diag = "test-diagnostics";
+    int exitStatus = 0;
+    ContainerStatus cStatus = mock(ContainerStatus.class);
+    when(cStatus.getContainerId()).thenReturn(cId);
+    when(cStatus.getDiagnostics()).thenReturn(diag);
+    when(cStatus.getExitStatus()).thenReturn(exitStatus);
+    long timeStamp = System.currentTimeMillis();
+
+    ApplicationContainerFinishedEvent finishedEvent =
+        new ApplicationContainerFinishedEvent(cStatus, timeStamp);
+
+    publisher.createTimelineClient(appId);
+    publisher.publishApplicationEvent(finishedEvent);
+    Future future = publisher.stopTimelineClient(appId);
+
+    try {
+      future.get();
+    } catch (Exception e) {
+      Assert.fail("Exception thrown while removing the timeline client");
+    }
+
+    ContainerEntity cEntity = new ContainerEntity();
+    cEntity.setId(cId.toString());
+    TimelineEntity[] lastPublishedEntities =
+        timelineClient.getLastPublishedEntities();
+
+    Assert.assertNotNull(lastPublishedEntities);
+    Assert.assertEquals(1, lastPublishedEntities.length);
+    TimelineEntity entity = lastPublishedEntities[0];
+    Assert.assertTrue(cEntity.equals(entity));
+    Assert.assertEquals(diag,
+        entity.getInfo().get(ContainerMetricsConstants.DIAGNOSTICS_INFO));
+    Assert.assertEquals(exitStatus,
+        entity.getInfo().get(ContainerMetricsConstants.EXIT_STATUS_INFO));
+  }
+
+  @Test public void testContainerResourceUsage() {
     ApplicationId appId = ApplicationId.newInstance(0, 1);
     publisher.createTimelineClient(appId);
     Container aContainer = mock(Container.class);
-    when(aContainer.getContainerId()).thenReturn(ContainerId.newContainerId(
-        ApplicationAttemptId.newInstance(appId, 1),
-        0L));
+    when(aContainer.getContainerId()).thenReturn(ContainerId
+        .newContainerId(ApplicationAttemptId.newInstance(appId, 1), 0L));
     publisher.reportContainerResourceUsage(aContainer, 1024L, 8F);
     verifyPublishedResourceUsageMetrics(timelineClient, 1024L, 8);
     timelineClient.reset();
@@ -91,7 +159,6 @@ public void createTimelineClient(ApplicationId appId) {
         (float) ResourceCalculatorProcessTree.UNAVAILABLE);
     verifyPublishedResourceUsageMetrics(timelineClient, 1024L,
         ResourceCalculatorProcessTree.UNAVAILABLE);
-    publisher.stop();
   }
 
   private void verifyPublishedResourceUsageMetrics(
@@ -151,8 +218,12 @@ public DummyTimelineClient(ApplicationId appId) {
 
     private TimelineEntity[] lastPublishedEntities;
 
-    @Override
-    public void putEntitiesAsync(TimelineEntity... entities)
+    @Override public void putEntitiesAsync(TimelineEntity... entities)
+        throws IOException, YarnException {
+      this.lastPublishedEntities = entities;
+    }
+
+    @Override public void putEntities(TimelineEntity... entities)
        throws IOException, YarnException {
       this.lastPublishedEntities = entities;
     }
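
Note for readers (not part of the patch): a minimal, self-contained sketch of the linger-and-stop pattern the change introduces. stopTimelineClient() now schedules the client teardown on a single-threaded ScheduledExecutorService after a configurable linger period and returns the Future, so callers (like the new test) can block until the deferred stop has actually run, and serviceStop() waits for pending tasks before shutting the scheduler down. The class and member names below are illustrative assumptions only; they do not appear in the patch.

  import java.util.Map;
  import java.util.concurrent.ConcurrentHashMap;
  import java.util.concurrent.Executors;
  import java.util.concurrent.Future;
  import java.util.concurrent.ScheduledExecutorService;
  import java.util.concurrent.TimeUnit;

  // Illustrative stand-in for NMTimelinePublisher's deferred client shutdown.
  public class LingeringStopSketch {

    private final Map<String, Runnable> clients = new ConcurrentHashMap<>();
    private final ScheduledExecutorService scheduler =
        Executors.newSingleThreadScheduledExecutor();
    private final long lingerPeriodMs;

    public LingeringStopSketch(long lingerPeriodMs) {
      this.lingerPeriodMs = lingerPeriodMs;
    }

    public void register(String appId, Runnable stopAction) {
      clients.put(appId, stopAction);
    }

    // Mirrors the new stopTimelineClient(): remove and stop the client only
    // after the linger period, and expose the Future so callers can wait.
    public Future<?> stopLater(String appId) {
      return scheduler.schedule(new Runnable() {
        @Override
        public void run() {
          Runnable stop = clients.remove(appId);
          if (stop != null) {
            stop.run();
          }
        }
      }, lingerPeriodMs, TimeUnit.MILLISECONDS);
    }

    // Mirrors serviceStop(): give pending deferred stops a chance to finish.
    public void shutdown() throws InterruptedException {
      scheduler.shutdown();
      if (!scheduler.awaitTermination(lingerPeriodMs, TimeUnit.MILLISECONDS)) {
        scheduler.shutdownNow();
      }
    }

    public static void main(String[] args) throws Exception {
      LingeringStopSketch sketch = new LingeringStopSketch(100L);
      sketch.register("app_1", () -> System.out.println("client stopped"));
      Future<?> pending = sketch.stopLater("app_1");
      pending.get();  // blocks until the deferred stop has run, like the test
      sketch.shutdown();
    }
  }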