diff --git hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerAllocator.java hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerAllocator.java index 8cdcaa8..c825871 100644 --- hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerAllocator.java +++ hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerAllocator.java @@ -1229,7 +1229,7 @@ private void assignMapsWithLocality(List allocatedContainers) { Container allocated = it.next(); Priority priority = allocated.getPriority(); assert PRIORITY_MAP.equals(priority); - // "if (maps.containsKey(tId))" below should be almost always true. + // "if (maps.containsTimelineCollector(tId))" below should be almost always true. // hence this while loop would almost always have O(1) complexity String host = allocated.getNodeId().getHost(); LinkedList list = mapsHostMapping.get(host); @@ -1261,7 +1261,7 @@ private void assignMapsWithLocality(List allocatedContainers) { Container allocated = it.next(); Priority priority = allocated.getPriority(); assert PRIORITY_MAP.equals(priority); - // "if (maps.containsKey(tId))" below should be almost always true. + // "if (maps.containsTimelineCollector(tId))" below should be almost always true. // hence this while loop would almost always have O(1) complexity String host = allocated.getNodeId().getHost(); String rack = RackResolver.resolve(host).getNetworkLocation(); diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMActiveServiceContext.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMActiveServiceContext.java index cbb0a8b..9011500 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMActiveServiceContext.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMActiveServiceContext.java @@ -47,7 +47,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.security.NMTokenSecretManagerInRM; import org.apache.hadoop.yarn.server.resourcemanager.security.RMContainerTokenSecretManager; import org.apache.hadoop.yarn.server.resourcemanager.security.RMDelegationTokenSecretManager; -import org.apache.hadoop.yarn.server.resourcemanager.timelineservice.RMTimelineCollector; +import org.apache.hadoop.yarn.server.resourcemanager.timelineservice.RMTimelineCollectorManager; import org.apache.hadoop.yarn.util.Clock; import org.apache.hadoop.yarn.util.SystemClock; @@ -95,7 +95,7 @@ private ApplicationMasterService applicationMasterService; private RMApplicationHistoryWriter rmApplicationHistoryWriter; private SystemMetricsPublisher systemMetricsPublisher; - private RMTimelineCollector timelineCollector; + private RMTimelineCollectorManager timelineCollectorManager; private RMNodeLabelsManager nodeLabelManager; private long epoch; @@ -379,14 +379,15 @@ public RMApplicationHistoryWriter getRMApplicationHistoryWriter() { @Private @Unstable - public RMTimelineCollector getRMTimelineCollector() { - return timelineCollector; + public RMTimelineCollectorManager getRMTimelineCollectorManager() { + return timelineCollectorManager; } @Private @Unstable - public void setRMTimelineCollector(RMTimelineCollector timelineCollector) { - this.timelineCollector = timelineCollector; + public void setRMTimelineCollectorManager( + RMTimelineCollectorManager timelineCollectorManager) { + this.timelineCollectorManager = timelineCollectorManager; } @Private diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMAppManager.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMAppManager.java index 9197630..9df6819 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMAppManager.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMAppManager.java @@ -252,6 +252,8 @@ protected synchronized void checkAppNumCompletedLimit() { ApplicationId removeId = completedApps.get(completedApps.size() - completedAppsInStateStore); RMApp removeApp = rmContext.getRMApps().get(removeId); + // Stop the timeline collector before removing the app + ((RMAppImpl) removeApp).stopTimelineCollector(); LOG.info("Max number of completed apps kept in state store met:" + " maxCompletedAppsInStateStore = " + maxCompletedAppsInStateStore + ", removing app " + removeApp.getApplicationId() @@ -334,7 +336,6 @@ private RMAppImpl createAndPopulateNewRMApp( submissionContext, this.scheduler, this.masterService, submitTime, submissionContext.getApplicationType(), submissionContext.getApplicationTags(), amReq); - // Concurrent app submissions with same applicationId will fail here // Concurrent app submissions with different applicationIds will not // influence each other @@ -345,6 +346,8 @@ private RMAppImpl createAndPopulateNewRMApp( LOG.warn(message); throw RPCUtil.getRemoteException(message); } + // Start timeline collector for the submitted app + application.startTimelineCollector(); // Inform the ACLs Manager this.applicationACLsManager.addApplication(applicationId, submissionContext.getAMContainerSpec().getApplicationACLs()); diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMContext.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMContext.java index b96601c..6f36835 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMContext.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMContext.java @@ -43,7 +43,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.security.NMTokenSecretManagerInRM; import org.apache.hadoop.yarn.server.resourcemanager.security.RMContainerTokenSecretManager; import org.apache.hadoop.yarn.server.resourcemanager.security.RMDelegationTokenSecretManager; -import org.apache.hadoop.yarn.server.resourcemanager.timelineservice.RMTimelineCollector; +import org.apache.hadoop.yarn.server.resourcemanager.timelineservice.RMTimelineCollectorManager; /** * Context of the ResourceManager. @@ -110,9 +110,10 @@ void setRMApplicationHistoryWriter( SystemMetricsPublisher getSystemMetricsPublisher(); - void setRMTimelineCollector(RMTimelineCollector timelineCollector); + void setRMTimelineCollectorManager( + RMTimelineCollectorManager timelineCollectorManager); - RMTimelineCollector getRMTimelineCollector(); + RMTimelineCollectorManager getRMTimelineCollectorManager(); ConfigurationProvider getConfigurationProvider(); diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMContextImpl.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMContextImpl.java index 531d4c5..85ecbf9 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMContextImpl.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMContextImpl.java @@ -47,7 +47,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.security.NMTokenSecretManagerInRM; import org.apache.hadoop.yarn.server.resourcemanager.security.RMContainerTokenSecretManager; import org.apache.hadoop.yarn.server.resourcemanager.security.RMDelegationTokenSecretManager; -import org.apache.hadoop.yarn.server.resourcemanager.timelineservice.RMTimelineCollector; +import org.apache.hadoop.yarn.server.resourcemanager.timelineservice.RMTimelineCollectorManager; import org.apache.hadoop.yarn.util.Clock; import com.google.common.annotations.VisibleForTesting; @@ -355,14 +355,14 @@ public RMApplicationHistoryWriter getRMApplicationHistoryWriter() { } @Override - public void setRMTimelineCollector( - RMTimelineCollector timelineCollector) { - activeServiceContext.setRMTimelineCollector(timelineCollector); + public void setRMTimelineCollectorManager( + RMTimelineCollectorManager timelineCollectorManager) { + activeServiceContext.setRMTimelineCollectorManager(timelineCollectorManager); } @Override - public RMTimelineCollector getRMTimelineCollector() { - return activeServiceContext.getRMTimelineCollector(); + public RMTimelineCollectorManager getRMTimelineCollectorManager() { + return activeServiceContext.getRMTimelineCollectorManager(); } @Override diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java index b993ede..059bc07 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java @@ -97,11 +97,11 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.SchedulerEventType; import org.apache.hadoop.yarn.server.resourcemanager.security.DelegationTokenRenewer; import org.apache.hadoop.yarn.server.resourcemanager.security.QueueACLsManager; -import org.apache.hadoop.yarn.server.resourcemanager.timelineservice.RMTimelineCollector; import org.apache.hadoop.yarn.server.resourcemanager.webapp.RMWebApp; import org.apache.hadoop.yarn.server.security.ApplicationACLsManager; import org.apache.hadoop.yarn.server.security.http.RMAuthenticationFilter; import org.apache.hadoop.yarn.server.security.http.RMAuthenticationFilterInitializer; +import org.apache.hadoop.yarn.server.resourcemanager.timelineservice.RMTimelineCollectorManager; import org.apache.hadoop.yarn.server.webproxy.AppReportFetcher; import org.apache.hadoop.yarn.server.webproxy.ProxyUriUtils; import org.apache.hadoop.yarn.server.webproxy.WebAppProxy; @@ -356,8 +356,8 @@ protected RMApplicationHistoryWriter createRMApplicationHistoryWriter() { return new RMApplicationHistoryWriter(); } - private RMTimelineCollector createRMTimelineCollector() { - return new RMTimelineCollector(); + private RMTimelineCollectorManager createRMTimelineCollectorManager() { + return new RMTimelineCollectorManager(rmContext); } protected SystemMetricsPublisher createSystemMetricsPublisher() { @@ -482,10 +482,10 @@ protected void serviceInit(Configuration configuration) throws Exception { addService(systemMetricsPublisher); rmContext.setSystemMetricsPublisher(systemMetricsPublisher); - RMTimelineCollector timelineCollector = - createRMTimelineCollector(); - addService(timelineCollector); - rmContext.setRMTimelineCollector(timelineCollector); + RMTimelineCollectorManager timelineCollectorManager = + createRMTimelineCollectorManager(); + addService(timelineCollectorManager); + rmContext.setRMTimelineCollectorManager(timelineCollectorManager); // Register event handler for NodesListManager nodesListManager = new NodesListManager(rmContext); diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java index 61c5748..c86e69f 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java @@ -83,6 +83,8 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.YarnScheduler; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppAddedSchedulerEvent; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppRemovedSchedulerEvent; +import org.apache.hadoop.yarn.server.timelineservice.collector.AppLevelTimelineCollector; +import org.apache.hadoop.yarn.server.timelineservice.collector.TimelineCollector; import org.apache.hadoop.yarn.server.utils.BuilderUtils; import org.apache.hadoop.yarn.server.webproxy.ProxyUriUtils; import org.apache.hadoop.yarn.state.InvalidStateTransitonException; @@ -432,6 +434,17 @@ public RMAppImpl(ApplicationId applicationId, RMContext rmContext, rmContext.getSystemMetricsPublisher().appCreated(this, startTime); } + public void startTimelineCollector() { + AppLevelTimelineCollector collector = + new AppLevelTimelineCollector(applicationId); + rmContext.getRMTimelineCollectorManager().putIfAbsent( + applicationId, collector); + } + + public void stopTimelineCollector() { + rmContext.getRMTimelineCollectorManager().remove(applicationId); + } + @Override public ApplicationId getApplicationId() { return this.applicationId; diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/timelineservice/RMTimelineCollector.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/timelineservice/RMTimelineCollector.java deleted file mode 100644 index 4ea7a03..0000000 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/timelineservice/RMTimelineCollector.java +++ /dev/null @@ -1,111 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.hadoop.yarn.server.resourcemanager.timelineservice; - -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; -import org.apache.hadoop.classification.InterfaceAudience.Private; -import org.apache.hadoop.classification.InterfaceStability.Unstable; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.yarn.conf.YarnConfiguration; -import org.apache.hadoop.yarn.event.AsyncDispatcher; -import org.apache.hadoop.yarn.event.Dispatcher; -import org.apache.hadoop.yarn.event.EventHandler; -import org.apache.hadoop.yarn.server.resourcemanager.metrics.SystemMetricsEvent; -import org.apache.hadoop.yarn.server.resourcemanager.metrics.SystemMetricsEventType; -import org.apache.hadoop.yarn.server.timelineservice.collector.TimelineCollector; -import org.apache.hadoop.yarn.server.timelineservice.collector.TimelineCollectorContext; - -/** - * This class is responsible for posting application and appattempt lifecycle - * related events to timeline service V2 - */ -@Private -@Unstable -public class RMTimelineCollector extends TimelineCollector { - private static final Log LOG = LogFactory.getLog(RMTimelineCollector.class); - - public RMTimelineCollector() { - super("Resource Manager TimelineCollector"); - } - - private Dispatcher dispatcher; - - private boolean publishSystemMetricsForV2; - - @Override - protected void serviceInit(Configuration conf) throws Exception { - publishSystemMetricsForV2 = - conf.getBoolean(YarnConfiguration.TIMELINE_SERVICE_ENABLED, - YarnConfiguration.DEFAULT_TIMELINE_SERVICE_ENABLED) - && conf.getBoolean( - YarnConfiguration.SYSTEM_METRICS_PUBLISHER_ENABLED, - YarnConfiguration.DEFAULT_SYSTEM_METRICS_PUBLISHER_ENABLED); - - if (publishSystemMetricsForV2) { - // having separate dispatcher to avoid load on RMDispatcher - LOG.info("RMTimelineCollector has been configured to publish" - + " System Metrics in ATS V2"); - dispatcher = new AsyncDispatcher(); - dispatcher.register(SystemMetricsEventType.class, - new ForwardingEventHandler()); - } else { - LOG.warn("RMTimelineCollector has not been configured to publish" - + " System Metrics in ATS V2"); - } - super.serviceInit(conf); - } - - @Override - protected void serviceStart() throws Exception { - super.serviceStart(); - } - - @Override - protected void serviceStop() throws Exception { - super.serviceStop(); - } - - protected void handleSystemMetricsEvent(SystemMetricsEvent event) { - switch (event.getType()) { - default: - LOG.error("Unknown SystemMetricsEvent type: " + event.getType()); - } - } - - @Override - protected TimelineCollectorContext getTimelineEntityContext() { - // TODO address in YARN-3390. - return null; - } - - /** - * EventHandler implementation which forward events to SystemMetricsPublisher. - * Making use of it, SystemMetricsPublisher can avoid to have a public handle - * method. - */ - private final class ForwardingEventHandler implements - EventHandler { - - @Override - public void handle(SystemMetricsEvent event) { - handleSystemMetricsEvent(event); - } - } -} diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/timelineservice/RMTimelineCollectorManager.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/timelineservice/RMTimelineCollectorManager.java new file mode 100644 index 0000000..16db1e3 --- /dev/null +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/timelineservice/RMTimelineCollectorManager.java @@ -0,0 +1,86 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.server.resourcemanager.timelineservice; + +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.classification.InterfaceStability; +import org.apache.hadoop.yarn.api.records.ApplicationId; +import org.apache.hadoop.yarn.exceptions.YarnRuntimeException; +import org.apache.hadoop.yarn.server.resourcemanager.RMContext; +import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp; +import org.apache.hadoop.yarn.server.timelineservice.collector.TimelineCollector; +import org.apache.hadoop.yarn.server.timelineservice.collector.TimelineCollectorManager; +import org.apache.hadoop.yarn.util.timeline.TimelineUtils; + + +@InterfaceAudience.Private +@InterfaceStability.Unstable +public class RMTimelineCollectorManager extends TimelineCollectorManager { + private RMContext rmContext; + + public RMTimelineCollectorManager(RMContext rmContext) { + super(RMTimelineCollectorManager.class.getName()); + this.rmContext = rmContext; + } + + @Override + public TimelineCollector putIfAbsent(ApplicationId appId, + TimelineCollector collector) { + TimelineCollector collectorInTable = super.putIfAbsent(appId, collector); + if (collectorInTable == collector) { + updateTimelineCollectorContext(appId, collector); + } + return collectorInTable; + } + + private void updateTimelineCollectorContext( + ApplicationId appId, TimelineCollector collector) { + RMApp app = rmContext.getRMApps().get(appId); + if (app == null) { + throw new YarnRuntimeException( + "Unable to get the timeline collector context info for a non-existing app " + + appId); + } + String userId = app.getUser(); + if (userId != null && !userId.isEmpty()) { + collector.getTimelineEntityContext().setUserId(userId); + } + for (String tag : app.getApplicationTags()) { + String value = null; + if ((value = getFlowContext(TimelineUtils.FLOW_NAME_TAG_PREFIX, tag)) != null + && !value.isEmpty()) { + collector.getTimelineEntityContext().setFlowName(value); + } else if ((value = getFlowContext(TimelineUtils.FLOW_VERSION_TAG_PREFIX, tag)) != null + && !value.isEmpty()) { + collector.getTimelineEntityContext().setFlowVersion(value); + } else if ((value = getFlowContext(TimelineUtils.FLOW_RUN_ID_TAG_PREFIX, tag)) != null + && !value.isEmpty()) { + collector.getTimelineEntityContext().setFlowRunId(Long.valueOf(value)); + } + } + } + + private static String getFlowContext(String tagPrefix, String tag) { + if (tag.startsWith(tagPrefix + ":") || + tag.startsWith(tagPrefix.toLowerCase() + ":")) { + return tag.substring(tagPrefix.length() + 1); + } + return null; + } +} diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-tests/src/test/java/org/apache/hadoop/yarn/server/timelineservice/TestTimelineServiceClientIntegration.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-tests/src/test/java/org/apache/hadoop/yarn/server/timelineservice/TestTimelineServiceClientIntegration.java index 54c806c..429bcab 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-tests/src/test/java/org/apache/hadoop/yarn/server/timelineservice/TestTimelineServiceClientIntegration.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-tests/src/test/java/org/apache/hadoop/yarn/server/timelineservice/TestTimelineServiceClientIntegration.java @@ -34,7 +34,7 @@ import org.apache.hadoop.yarn.server.api.protocolrecords.GetTimelineCollectorContextRequest; import org.apache.hadoop.yarn.server.api.protocolrecords.GetTimelineCollectorContextResponse; import org.apache.hadoop.yarn.server.timelineservice.collector.PerNodeTimelineCollectorsAuxService; -import org.apache.hadoop.yarn.server.timelineservice.collector.TimelineCollectorManager; +import org.apache.hadoop.yarn.server.timelineservice.collector.NMTimelineCollectorManager; import org.junit.AfterClass; import org.junit.BeforeClass; import org.junit.Test; @@ -42,13 +42,13 @@ import java.io.IOException; public class TestTimelineServiceClientIntegration { - private static TimelineCollectorManager collectorManager; + private static NMTimelineCollectorManager collectorManager; private static PerNodeTimelineCollectorsAuxService auxService; @BeforeClass public static void setupClass() throws Exception { try { - collectorManager = new MyTimelineCollectorManager(); + collectorManager = new MyNMTimelineCollectorManager(); auxService = PerNodeTimelineCollectorsAuxService.launchServer(new String[0], collectorManager); @@ -85,9 +85,9 @@ public void testPutEntities() throws Exception { } } - private static class MyTimelineCollectorManager extends - TimelineCollectorManager { - public MyTimelineCollectorManager() { + private static class MyNMTimelineCollectorManager extends + NMTimelineCollectorManager { + public MyNMTimelineCollectorManager() { super(); } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/collector/AppLevelTimelineCollector.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/collector/AppLevelTimelineCollector.java index 5bc70e3..fa32211 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/collector/AppLevelTimelineCollector.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/collector/AppLevelTimelineCollector.java @@ -75,7 +75,7 @@ protected void serviceStop() throws Exception { } @Override - protected TimelineCollectorContext getTimelineEntityContext() { + public TimelineCollectorContext getTimelineEntityContext() { return context; } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/collector/NMTimelineCollectorManager.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/collector/NMTimelineCollectorManager.java new file mode 100644 index 0000000..43151b6 --- /dev/null +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/collector/NMTimelineCollectorManager.java @@ -0,0 +1,227 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.server.timelineservice.collector; + +import static org.apache.hadoop.fs.CommonConfigurationKeys.DEFAULT_HADOOP_HTTP_STATIC_USER; +import static org.apache.hadoop.fs.CommonConfigurationKeys.HADOOP_HTTP_STATIC_USER; + +import java.io.IOException; +import java.net.InetSocketAddress; +import java.net.URI; +import java.util.HashMap; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.classification.InterfaceAudience.Private; +import org.apache.hadoop.classification.InterfaceStability.Unstable; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.http.HttpServer2; +import org.apache.hadoop.http.lib.StaticUserWebFilter; +import org.apache.hadoop.yarn.api.records.ApplicationId; +import org.apache.hadoop.yarn.conf.YarnConfiguration; +import org.apache.hadoop.yarn.exceptions.YarnException; +import org.apache.hadoop.yarn.exceptions.YarnRuntimeException; +import org.apache.hadoop.yarn.ipc.YarnRPC; +import org.apache.hadoop.yarn.server.api.CollectorNodemanagerProtocol; +import org.apache.hadoop.yarn.server.api.protocolrecords.GetTimelineCollectorContextRequest; +import org.apache.hadoop.yarn.server.api.protocolrecords.GetTimelineCollectorContextResponse; +import org.apache.hadoop.yarn.server.api.protocolrecords.ReportNewCollectorInfoRequest; +import org.apache.hadoop.yarn.webapp.GenericExceptionHandler; +import org.apache.hadoop.yarn.webapp.YarnJacksonJaxbJsonProvider; +import org.apache.hadoop.yarn.webapp.util.WebAppUtils; + +import com.google.common.annotations.VisibleForTesting; + + +/** + * + * It is a singleton, and instances should be obtained via + * {@link #getInstance()}. + * + */ +@Private +@Unstable +public class NMTimelineCollectorManager extends TimelineCollectorManager { + private static final Log LOG = + LogFactory.getLog(NMTimelineCollectorManager.class); + private static final NMTimelineCollectorManager INSTANCE = + new NMTimelineCollectorManager(); + + + // REST server for this collector manager + private HttpServer2 timelineRestServer; + + private String timelineRestServerBindAddress; + + private CollectorNodemanagerProtocol nmCollectorService; + + private InetSocketAddress nmCollectorServiceAddress; + + static final String COLLECTOR_MANAGER_ATTR_KEY = "collector.manager"; + + static NMTimelineCollectorManager getInstance() { + return INSTANCE; + } + + @VisibleForTesting + protected NMTimelineCollectorManager() { + super(NMTimelineCollectorManager.class.getName()); + } + + @Override + public void serviceInit(Configuration conf) throws Exception { + this.nmCollectorServiceAddress = conf.getSocketAddr( + YarnConfiguration.NM_BIND_HOST, + YarnConfiguration.NM_COLLECTOR_SERVICE_ADDRESS, + YarnConfiguration.DEFAULT_NM_COLLECTOR_SERVICE_ADDRESS, + YarnConfiguration.DEFAULT_NM_COLLECTOR_SERVICE_PORT); + super.serviceInit(conf); + } + + @Override + protected void serviceStart() throws Exception { + nmCollectorService = getNMCollectorService(); + startWebApp(); + super.serviceStart(); + } + + @Override + protected void serviceStop() throws Exception { + if (timelineRestServer != null) { + timelineRestServer.stop(); + } + super.serviceStop(); + } + + @Override + public TimelineCollector putIfAbsent(ApplicationId appId, + TimelineCollector collector) { + TimelineCollector collectorInTable = super.putIfAbsent(appId, collector); + if (collectorInTable == collector) { + try { + updateTimelineCollectorContext(appId, collector); + // Report to NM if a new collector is added. + reportNewCollectorToNM(appId); + } catch (YarnException | IOException e) { + // throw exception here as it cannot be used if failed communicate with NM + LOG.error("Failed to communicate with NM Collector Service for " + appId); + throw new YarnRuntimeException(e); + } + } + return collectorInTable; + } + + /** + * Launch the REST web server for this collector manager + */ + private void startWebApp() { + Configuration conf = getConfig(); + String bindAddress = conf.get(YarnConfiguration.TIMELINE_SERVICE_BIND_HOST, + YarnConfiguration.DEFAULT_TIMELINE_SERVICE_BIND_HOST) + ":0"; + try { + Configuration confForInfoServer = new Configuration(conf); + confForInfoServer.setInt(HttpServer2.HTTP_MAX_THREADS, 10); + HttpServer2.Builder builder = new HttpServer2.Builder() + .setName("timeline") + .setConf(conf) + .addEndpoint(URI.create( + (YarnConfiguration.useHttps(conf) ? "https://" : "http://") + + bindAddress)); + timelineRestServer = builder.build(); + // TODO: replace this by an authentication filter in future. + HashMap options = new HashMap<>(); + String username = conf.get(HADOOP_HTTP_STATIC_USER, + DEFAULT_HADOOP_HTTP_STATIC_USER); + options.put(HADOOP_HTTP_STATIC_USER, username); + HttpServer2.defineFilter(timelineRestServer.getWebAppContext(), + "static_user_filter_timeline", + StaticUserWebFilter.StaticUserFilter.class.getName(), + options, new String[] {"/*"}); + + timelineRestServer.addJerseyResourcePackage( + TimelineCollectorWebService.class.getPackage().getName() + ";" + + GenericExceptionHandler.class.getPackage().getName() + ";" + + YarnJacksonJaxbJsonProvider.class.getPackage().getName(), + "/*"); + timelineRestServer.setAttribute(COLLECTOR_MANAGER_ATTR_KEY, this); + timelineRestServer.start(); + } catch (Exception e) { + String msg = "The per-node collector webapp failed to start."; + LOG.error(msg, e); + throw new YarnRuntimeException(msg, e); + } + //TODO: We need to think of the case of multiple interfaces + this.timelineRestServerBindAddress = WebAppUtils.getResolvedAddress( + timelineRestServer.getConnectorAddress(0)); + LOG.info("Instantiated the per-node collector webapp at " + + timelineRestServerBindAddress); + } + + private void reportNewCollectorToNM(ApplicationId appId) + throws YarnException, IOException { + ReportNewCollectorInfoRequest request = + ReportNewCollectorInfoRequest.newInstance(appId, + this.timelineRestServerBindAddress); + LOG.info("Report a new collector for application: " + appId + + " to the NM Collector Service."); + nmCollectorService.reportNewCollectorInfo(request); + } + + private void updateTimelineCollectorContext( + ApplicationId appId, TimelineCollector collector) + throws YarnException, IOException { + GetTimelineCollectorContextRequest request = + GetTimelineCollectorContextRequest.newInstance(appId); + LOG.info("Get timeline collector context for " + appId); + GetTimelineCollectorContextResponse response = + nmCollectorService.getTimelineCollectorContext(request); + String userId = response.getUserId(); + if (userId != null && !userId.isEmpty()) { + collector.getTimelineEntityContext().setUserId(userId); + } + String flowName = response.getFlowName(); + if (flowName != null && !flowName.isEmpty()) { + collector.getTimelineEntityContext().setFlowName(flowName); + } + String flowVersion = response.getFlowVersion(); + if (flowVersion != null && !flowVersion.isEmpty()) { + collector.getTimelineEntityContext().setFlowVersion(flowVersion); + } + long flowRunId = response.getFlowRunId(); + if (flowRunId != 0L) { + collector.getTimelineEntityContext().setFlowRunId(flowRunId); + } + } + + @VisibleForTesting + protected CollectorNodemanagerProtocol getNMCollectorService() { + Configuration conf = getConfig(); + final YarnRPC rpc = YarnRPC.create(conf); + + // TODO Security settings. + return (CollectorNodemanagerProtocol) rpc.getProxy( + CollectorNodemanagerProtocol.class, + nmCollectorServiceAddress, conf); + } + + @VisibleForTesting + public String getRestServerBindAddress() { + return timelineRestServerBindAddress; + } +} diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/collector/PerNodeTimelineCollectorsAuxService.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/collector/PerNodeTimelineCollectorsAuxService.java index 2017d01..9e4c70a 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/collector/PerNodeTimelineCollectorsAuxService.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/collector/PerNodeTimelineCollectorsAuxService.java @@ -53,15 +53,15 @@ LogFactory.getLog(PerNodeTimelineCollectorsAuxService.class); private static final int SHUTDOWN_HOOK_PRIORITY = 30; - private final TimelineCollectorManager collectorManager; + private final NMTimelineCollectorManager collectorManager; public PerNodeTimelineCollectorsAuxService() { // use the same singleton - this(TimelineCollectorManager.getInstance()); + this(NMTimelineCollectorManager.getInstance()); } @VisibleForTesting PerNodeTimelineCollectorsAuxService( - TimelineCollectorManager collectorsManager) { + NMTimelineCollectorManager collectorsManager) { super("timeline_collector"); this.collectorManager = collectorsManager; } @@ -108,8 +108,7 @@ public boolean addApplication(ApplicationId appId) { * @return whether it was removed successfully */ public boolean removeApplication(ApplicationId appId) { - String appIdString = appId.toString(); - return collectorManager.remove(appIdString); + return collectorManager.remove(appId); } /** @@ -153,8 +152,8 @@ private boolean isApplicationMaster(ContainerContext context) { } @VisibleForTesting - boolean hasApplication(String appId) { - return collectorManager.containsKey(appId); + boolean hasApplication(ApplicationId appId) { + return collectorManager.containsTimelineCollector(appId); } @Override @@ -174,7 +173,7 @@ public ByteBuffer getMetaData() { @VisibleForTesting public static PerNodeTimelineCollectorsAuxService - launchServer(String[] args, TimelineCollectorManager collectorManager) { + launchServer(String[] args, NMTimelineCollectorManager collectorManager) { Thread .setDefaultUncaughtExceptionHandler(new YarnUncaughtExceptionHandler()); StringUtils.startupShutdownMessage( diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/collector/TimelineCollector.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/collector/TimelineCollector.java index f1d3d72..4eced5b 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/collector/TimelineCollector.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/collector/TimelineCollector.java @@ -124,6 +124,6 @@ public void putEntitiesAsync(TimelineEntities entities, } } - protected abstract TimelineCollectorContext getTimelineEntityContext(); + public abstract TimelineCollectorContext getTimelineEntityContext(); } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/collector/TimelineCollectorManager.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/collector/TimelineCollectorManager.java index 9a566a2..2c003bc 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/collector/TimelineCollectorManager.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/collector/TimelineCollectorManager.java @@ -18,150 +18,69 @@ package org.apache.hadoop.yarn.server.timelineservice.collector; -import static org.apache.hadoop.fs.CommonConfigurationKeys.DEFAULT_HADOOP_HTTP_STATIC_USER; -import static org.apache.hadoop.fs.CommonConfigurationKeys.HADOOP_HTTP_STATIC_USER; - -import java.io.IOException; -import java.net.InetSocketAddress; -import java.net.URI; -import java.util.Collections; -import java.util.HashMap; -import java.util.Map; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; -import org.apache.hadoop.classification.InterfaceAudience.Private; -import org.apache.hadoop.classification.InterfaceStability.Unstable; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.http.HttpServer2; -import org.apache.hadoop.http.lib.StaticUserWebFilter; -import org.apache.hadoop.service.CompositeService; +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.classification.InterfaceStability; +import org.apache.hadoop.service.AbstractService; import org.apache.hadoop.yarn.api.records.ApplicationId; -import org.apache.hadoop.yarn.conf.YarnConfiguration; -import org.apache.hadoop.yarn.exceptions.YarnException; import org.apache.hadoop.yarn.exceptions.YarnRuntimeException; -import org.apache.hadoop.yarn.ipc.YarnRPC; -import org.apache.hadoop.yarn.server.api.CollectorNodemanagerProtocol; -import org.apache.hadoop.yarn.server.api.protocolrecords.GetTimelineCollectorContextRequest; -import org.apache.hadoop.yarn.server.api.protocolrecords.GetTimelineCollectorContextResponse; -import org.apache.hadoop.yarn.server.api.protocolrecords.ReportNewCollectorInfoRequest; -import org.apache.hadoop.yarn.webapp.GenericExceptionHandler; -import org.apache.hadoop.yarn.webapp.YarnJacksonJaxbJsonProvider; -import org.apache.hadoop.yarn.webapp.util.WebAppUtils; -import com.google.common.annotations.VisibleForTesting; +import java.util.Collections; +import java.util.HashMap; +import java.util.Map; /** * Class that manages adding and removing collectors and their lifecycle. It * provides thread safety access to the collectors inside. * - * It is a singleton, and instances should be obtained via - * {@link #getInstance()}. */ -@Private -@Unstable -public class TimelineCollectorManager extends CompositeService { +@InterfaceAudience.Private +@InterfaceStability.Unstable +public abstract class TimelineCollectorManager extends AbstractService { private static final Log LOG = LogFactory.getLog(TimelineCollectorManager.class); - private static final TimelineCollectorManager INSTANCE = - new TimelineCollectorManager(); // access to this map is synchronized with the map itself - private final Map collectors = + private final Map collectors = Collections.synchronizedMap( - new HashMap()); - - // REST server for this collector manager - private HttpServer2 timelineRestServer; - - private String timelineRestServerBindAddress; - - private CollectorNodemanagerProtocol nmCollectorService; - - private InetSocketAddress nmCollectorServiceAddress; + new HashMap()); - static final String COLLECTOR_MANAGER_ATTR_KEY = "collector.manager"; - - static TimelineCollectorManager getInstance() { - return INSTANCE; - } - - @VisibleForTesting - protected TimelineCollectorManager() { - super(TimelineCollectorManager.class.getName()); - } - - @Override - public void serviceInit(Configuration conf) throws Exception { - this.nmCollectorServiceAddress = conf.getSocketAddr( - YarnConfiguration.NM_BIND_HOST, - YarnConfiguration.NM_COLLECTOR_SERVICE_ADDRESS, - YarnConfiguration.DEFAULT_NM_COLLECTOR_SERVICE_ADDRESS, - YarnConfiguration.DEFAULT_NM_COLLECTOR_SERVICE_PORT); - - } - - @Override - protected void serviceStart() throws Exception { - nmCollectorService = getNMCollectorService(); - startWebApp(); - super.serviceStart(); - } - - @Override - protected void serviceStop() throws Exception { - if (timelineRestServer != null) { - timelineRestServer.stop(); - } - super.serviceStop(); + protected TimelineCollectorManager(String name) { + super(name); } /** * Put the collector into the collection if an collector mapped by id does * not exist. * - * @throws YarnRuntimeException if there was any exception in initializing and - * starting the app level service + * @throws YarnRuntimeException if there was any exception in initializing + * and starting the app level service * @return the collector associated with id after the potential put. */ public TimelineCollector putIfAbsent(ApplicationId appId, TimelineCollector collector) { - String id = appId.toString(); TimelineCollector collectorInTable; - boolean collectorIsNew = false; synchronized (collectors) { - collectorInTable = collectors.get(id); + collectorInTable = collectors.get(appId); if (collectorInTable == null) { try { // initialize, start, and add it to the collection so it can be // cleaned up when the parent shuts down collector.init(getConfig()); collector.start(); - collectors.put(id, collector); - LOG.info("the collector for " + id + " was added"); + collectors.put(appId, collector); + LOG.info("the collector for " + appId + " was added"); collectorInTable = collector; - collectorIsNew = true; } catch (Exception e) { throw new YarnRuntimeException(e); } } else { - String msg = "the collector for " + id + " already exists!"; + String msg = "the collector for " + appId + " already exists!"; LOG.error(msg); } - - } - // Report to NM if a new collector is added. - if (collectorIsNew) { - try { - updateTimelineCollectorContext(appId, collector); - reportNewCollectorToNM(appId); - } catch (Exception e) { - // throw exception here as it cannot be used if failed communicate with NM - LOG.error("Failed to communicate with NM Collector Service for " + appId); - throw new YarnRuntimeException(e); - } } - return collectorInTable; } @@ -171,17 +90,17 @@ public TimelineCollector putIfAbsent(ApplicationId appId, * * @return whether it was removed successfully */ - public boolean remove(String id) { + public boolean remove(ApplicationId appId) { synchronized (collectors) { - TimelineCollector collector = collectors.remove(id); + TimelineCollector collector = collectors.remove(appId); if (collector == null) { - String msg = "the collector for " + id + " does not exist!"; + String msg = "the collector for " + appId + " does not exist!"; LOG.error(msg); return false; } else { // stop the service to do clean up collector.stop(); - LOG.info("the collector service for " + id + " was removed"); + LOG.info("the collector service for " + appId + " was removed"); return true; } } @@ -192,113 +111,16 @@ public boolean remove(String id) { * * @return the collector or null if it does not exist */ - public TimelineCollector get(String id) { - return collectors.get(id); + public TimelineCollector get(ApplicationId appId) { + return collectors.get(appId); } /** * Returns whether the collector for the specified id exists in this * collection. */ - public boolean containsKey(String id) { - return collectors.containsKey(id); - } - - /** - * Launch the REST web server for this collector manager - */ - private void startWebApp() { - Configuration conf = getConfig(); - String bindAddress = conf.get(YarnConfiguration.TIMELINE_SERVICE_BIND_HOST, - YarnConfiguration.DEFAULT_TIMELINE_SERVICE_BIND_HOST) + ":0"; - try { - Configuration confForInfoServer = new Configuration(conf); - confForInfoServer.setInt(HttpServer2.HTTP_MAX_THREADS, 10); - HttpServer2.Builder builder = new HttpServer2.Builder() - .setName("timeline") - .setConf(conf) - .addEndpoint(URI.create( - (YarnConfiguration.useHttps(conf) ? "https://" : "http://") + - bindAddress)); - timelineRestServer = builder.build(); - // TODO: replace this by an authentication filter in future. - HashMap options = new HashMap<>(); - String username = conf.get(HADOOP_HTTP_STATIC_USER, - DEFAULT_HADOOP_HTTP_STATIC_USER); - options.put(HADOOP_HTTP_STATIC_USER, username); - HttpServer2.defineFilter(timelineRestServer.getWebAppContext(), - "static_user_filter_timeline", - StaticUserWebFilter.StaticUserFilter.class.getName(), - options, new String[] {"/*"}); - - timelineRestServer.addJerseyResourcePackage( - TimelineCollectorWebService.class.getPackage().getName() + ";" - + GenericExceptionHandler.class.getPackage().getName() + ";" - + YarnJacksonJaxbJsonProvider.class.getPackage().getName(), - "/*"); - timelineRestServer.setAttribute(COLLECTOR_MANAGER_ATTR_KEY, this); - timelineRestServer.start(); - } catch (Exception e) { - String msg = "The per-node collector webapp failed to start."; - LOG.error(msg, e); - throw new YarnRuntimeException(msg, e); - } - //TODO: We need to think of the case of multiple interfaces - this.timelineRestServerBindAddress = WebAppUtils.getResolvedAddress( - timelineRestServer.getConnectorAddress(0)); - LOG.info("Instantiated the per-node collector webapp at " + - timelineRestServerBindAddress); - } - - private void reportNewCollectorToNM(ApplicationId appId) - throws YarnException, IOException { - ReportNewCollectorInfoRequest request = - ReportNewCollectorInfoRequest.newInstance(appId, - this.timelineRestServerBindAddress); - LOG.info("Report a new collector for application: " + appId + - " to the NM Collector Service."); - nmCollectorService.reportNewCollectorInfo(request); + public boolean containsTimelineCollector(ApplicationId appId) { + return collectors.containsKey(appId); } - private void updateTimelineCollectorContext( - ApplicationId appId, TimelineCollector collector) - throws YarnException, IOException { - GetTimelineCollectorContextRequest request = - GetTimelineCollectorContextRequest.newInstance(appId); - LOG.info("Get timeline collector context for " + appId); - GetTimelineCollectorContextResponse response = - nmCollectorService.getTimelineCollectorContext(request); - String userId = response.getUserId(); - if (userId != null && !userId.isEmpty()) { - collector.getTimelineEntityContext().setUserId(userId); - } - String flowName = response.getFlowName(); - if (flowName != null && !flowName.isEmpty()) { - collector.getTimelineEntityContext().setFlowName(flowName); - } - String flowVersion = response.getFlowVersion(); - if (flowVersion != null && !flowVersion.isEmpty()) { - collector.getTimelineEntityContext().setFlowVersion(flowVersion); - } - long flowRunId = response.getFlowRunId(); - if (flowRunId != 0L) { - collector.getTimelineEntityContext().setFlowRunId(flowRunId); - } - } - - @VisibleForTesting - protected CollectorNodemanagerProtocol getNMCollectorService() { - Configuration conf = getConfig(); - final YarnRPC rpc = YarnRPC.create(conf); - - // TODO Security settings. - return (CollectorNodemanagerProtocol) rpc.getProxy( - CollectorNodemanagerProtocol.class, - nmCollectorServiceAddress, conf); - } - - @VisibleForTesting - public String getRestServerBindAddress() { - return timelineRestServerBindAddress; - } } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/collector/TimelineCollectorWebService.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/collector/TimelineCollectorWebService.java index edec0d3..7343c64 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/collector/TimelineCollectorWebService.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/collector/TimelineCollectorWebService.java @@ -42,6 +42,7 @@ import org.apache.hadoop.classification.InterfaceAudience.Public; import org.apache.hadoop.classification.InterfaceStability.Unstable; import org.apache.hadoop.security.UserGroupInformation; +import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntities; import org.apache.hadoop.yarn.util.ConverterUtils; import org.apache.hadoop.yarn.webapp.ForbiddenException; @@ -129,11 +130,14 @@ public Response putEntities( boolean isAsync = async != null && async.trim().equalsIgnoreCase("true"); try { - appId = parseApplicationId(appId); - if (appId == null) { + ApplicationId appID = parseApplicationId(appId); + if (appID == null) { return Response.status(Response.Status.BAD_REQUEST).build(); } - TimelineCollector collector = getCollector(req, appId); + NMTimelineCollectorManager collectorManager = + (NMTimelineCollectorManager) context.getAttribute( + NMTimelineCollectorManager.COLLECTOR_MANAGER_ATTR_KEY); + TimelineCollector collector = collectorManager.get(appID); if (collector == null) { LOG.error("Application: "+ appId + " is not found"); throw new NotFoundException(); // different exception? @@ -147,10 +151,10 @@ public Response putEntities( } } - private String parseApplicationId(String appId) { + private ApplicationId parseApplicationId(String appId) { try { if (appId != null) { - return ConverterUtils.toApplicationId(appId.trim()).toString(); + return ConverterUtils.toApplicationId(appId.trim()); } else { return null; } @@ -159,15 +163,6 @@ private String parseApplicationId(String appId) { } } - private TimelineCollector - getCollector(HttpServletRequest req, String appIdToParse) { - String appIdString = parseApplicationId(appIdToParse); - final TimelineCollectorManager collectorManager = - (TimelineCollectorManager) context.getAttribute( - TimelineCollectorManager.COLLECTOR_MANAGER_ATTR_KEY); - return collectorManager.get(appIdString); - } - private void init(HttpServletResponse response) { response.setContentType(null); } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/collector/TestNMTimelineCollectorManager.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/collector/TestNMTimelineCollectorManager.java new file mode 100644 index 0000000..cbc72fa --- /dev/null +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/collector/TestNMTimelineCollectorManager.java @@ -0,0 +1,160 @@ + +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.server.timelineservice.collector; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; +import static org.mockito.Mockito.any; +import static org.mockito.Mockito.doReturn; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.spy; +import static org.mockito.Mockito.when; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.Callable; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.Future; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.yarn.api.records.ApplicationId; +import org.apache.hadoop.yarn.conf.YarnConfiguration; +import org.apache.hadoop.yarn.exceptions.YarnException; +import org.apache.hadoop.yarn.server.api.CollectorNodemanagerProtocol; +import org.apache.hadoop.yarn.server.api.protocolrecords.GetTimelineCollectorContextRequest; +import org.apache.hadoop.yarn.server.api.protocolrecords.GetTimelineCollectorContextResponse; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; + +public class TestNMTimelineCollectorManager { + private NMTimelineCollectorManager collectorManager; + + @Before + public void setup() throws Exception { + collectorManager = createCollectorManager(); + collectorManager.init(new YarnConfiguration()); + collectorManager.start(); + } + + @After + public void tearDown() throws Exception { + if (collectorManager != null) { + collectorManager.stop(); + } + } + + @Test + public void testStartWebApp() throws Exception { + assertNotNull(collectorManager.getRestServerBindAddress()); + String address = collectorManager.getRestServerBindAddress(); + String[] parts = address.split(":"); + assertEquals(2, parts.length); + assertNotNull(parts[0]); + assertTrue(Integer.valueOf(parts[1]) > 0); + } + + @Test(timeout=60000) + public void testMultithreadedAdd() throws Exception { + final int NUM_APPS = 5; + List> tasks = new ArrayList>(); + for (int i = 0; i < NUM_APPS; i++) { + final ApplicationId appId = ApplicationId.newInstance(0L, i); + Callable task = new Callable() { + public Boolean call() { + AppLevelTimelineCollector collector = + new AppLevelTimelineCollector(appId); + return (collectorManager.putIfAbsent(appId, collector) == collector); + } + }; + tasks.add(task); + } + ExecutorService executor = Executors.newFixedThreadPool(NUM_APPS); + try { + List> futures = executor.invokeAll(tasks); + for (Future future: futures) { + assertTrue(future.get()); + } + } finally { + executor.shutdownNow(); + } + // check the keys + for (int i = 0; i < NUM_APPS; i++) { + final ApplicationId appId = ApplicationId.newInstance(0L, i); + assertTrue(collectorManager.containsTimelineCollector(appId)); + } + } + + @Test + public void testMultithreadedAddAndRemove() throws Exception { + final int NUM_APPS = 5; + List> tasks = new ArrayList>(); + for (int i = 0; i < NUM_APPS; i++) { + final ApplicationId appId = ApplicationId.newInstance(0L, i); + Callable task = new Callable() { + public Boolean call() { + AppLevelTimelineCollector collector = + new AppLevelTimelineCollector(appId); + boolean successPut = + (collectorManager.putIfAbsent(appId, collector) == collector); + return successPut && collectorManager.remove(appId); + } + }; + tasks.add(task); + } + ExecutorService executor = Executors.newFixedThreadPool(NUM_APPS); + try { + List> futures = executor.invokeAll(tasks); + for (Future future: futures) { + assertTrue(future.get()); + } + } finally { + executor.shutdownNow(); + } + // check the keys + for (int i = 0; i < NUM_APPS; i++) { + final ApplicationId appId = ApplicationId.newInstance(0L, i); + assertFalse(collectorManager.containsTimelineCollector(appId)); + } + } + + private NMTimelineCollectorManager createCollectorManager() { + final NMTimelineCollectorManager collectorManager = + spy(new NMTimelineCollectorManager()); + doReturn(new Configuration()).when(collectorManager).getConfig(); + CollectorNodemanagerProtocol nmCollectorService = + mock(CollectorNodemanagerProtocol.class); + GetTimelineCollectorContextResponse response = + GetTimelineCollectorContextResponse.newInstance(null, null, null, 0L); + try { + when(nmCollectorService.getTimelineCollectorContext(any( + GetTimelineCollectorContextRequest.class))).thenReturn(response); + } catch (YarnException | IOException e) { + fail(); + } + doReturn(nmCollectorService).when(collectorManager).getNMCollectorService(); + return collectorManager; + } +} diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/collector/TestPerNodeTimelineCollectorsAuxService.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/collector/TestPerNodeTimelineCollectorsAuxService.java index abbe13a..43584cc 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/collector/TestPerNodeTimelineCollectorsAuxService.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/collector/TestPerNodeTimelineCollectorsAuxService.java @@ -67,8 +67,7 @@ public void tearDown() throws Shell.ExitCodeException { public void testAddApplication() throws Exception { auxService = createCollectorAndAddApplication(); // auxService should have a single app - assertTrue(auxService.hasApplication( - appAttemptId.getApplicationId().toString())); + assertTrue(auxService.hasApplication(appAttemptId.getApplicationId())); auxService.close(); } @@ -82,16 +81,14 @@ public void testAddApplicationNonAMContainer() throws Exception { when(context.getContainerId()).thenReturn(containerId); auxService.initializeContainer(context); // auxService should not have that app - assertFalse(auxService.hasApplication( - appAttemptId.getApplicationId().toString())); + assertFalse(auxService.hasApplication(appAttemptId.getApplicationId())); } @Test public void testRemoveApplication() throws Exception { auxService = createCollectorAndAddApplication(); // auxService should have a single app - String appIdStr = appAttemptId.getApplicationId().toString(); - assertTrue(auxService.hasApplication(appIdStr)); + assertTrue(auxService.hasApplication(appAttemptId.getApplicationId())); ContainerId containerId = getAMContainerId(); ContainerTerminationContext context = @@ -99,7 +96,7 @@ public void testRemoveApplication() throws Exception { when(context.getContainerId()).thenReturn(containerId); auxService.stopContainer(context); // auxService should not have that app - assertFalse(auxService.hasApplication(appIdStr)); + assertFalse(auxService.hasApplication(appAttemptId.getApplicationId())); auxService.close(); } @@ -107,8 +104,7 @@ public void testRemoveApplication() throws Exception { public void testRemoveApplicationNonAMContainer() throws Exception { auxService = createCollectorAndAddApplication(); // auxService should have a single app - String appIdStr = appAttemptId.getApplicationId().toString(); - assertTrue(auxService.hasApplication(appIdStr)); + assertTrue(auxService.hasApplication(appAttemptId.getApplicationId())); ContainerId containerId = getContainerId(2L); // not an AM ContainerTerminationContext context = @@ -116,7 +112,7 @@ public void testRemoveApplicationNonAMContainer() throws Exception { when(context.getContainerId()).thenReturn(containerId); auxService.stopContainer(context); // auxService should still have that app - assertTrue(auxService.hasApplication(appIdStr)); + assertTrue(auxService.hasApplication(appAttemptId.getApplicationId())); auxService.close(); } @@ -147,7 +143,7 @@ public void testLaunch() throws Exception { } private PerNodeTimelineCollectorsAuxService createCollector() { - TimelineCollectorManager collectorManager = createCollectorManager(); + NMTimelineCollectorManager collectorManager = createCollectorManager(); PerNodeTimelineCollectorsAuxService auxService = spy(new PerNodeTimelineCollectorsAuxService(collectorManager)); auxService.init(new YarnConfiguration()); @@ -155,9 +151,9 @@ private PerNodeTimelineCollectorsAuxService createCollector() { return auxService; } - private TimelineCollectorManager createCollectorManager() { - TimelineCollectorManager collectorManager = - spy(new TimelineCollectorManager()); + private NMTimelineCollectorManager createCollectorManager() { + NMTimelineCollectorManager collectorManager = + spy(new NMTimelineCollectorManager()); doReturn(new Configuration()).when(collectorManager).getConfig(); CollectorNodemanagerProtocol nmCollectorService = mock(CollectorNodemanagerProtocol.class); diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/collector/TestTimelineCollectorManager.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/collector/TestTimelineCollectorManager.java deleted file mode 100644 index c662998..0000000 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/collector/TestTimelineCollectorManager.java +++ /dev/null @@ -1,160 +0,0 @@ - -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.hadoop.yarn.server.timelineservice.collector; - -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertFalse; -import static org.junit.Assert.assertNotNull; -import static org.junit.Assert.assertTrue; -import static org.junit.Assert.fail; -import static org.mockito.Mockito.any; -import static org.mockito.Mockito.doReturn; -import static org.mockito.Mockito.mock; -import static org.mockito.Mockito.spy; -import static org.mockito.Mockito.when; - -import java.io.IOException; -import java.util.ArrayList; -import java.util.List; -import java.util.concurrent.Callable; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; -import java.util.concurrent.Future; - -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.yarn.api.records.ApplicationId; -import org.apache.hadoop.yarn.conf.YarnConfiguration; -import org.apache.hadoop.yarn.exceptions.YarnException; -import org.apache.hadoop.yarn.server.api.CollectorNodemanagerProtocol; -import org.apache.hadoop.yarn.server.api.protocolrecords.GetTimelineCollectorContextRequest; -import org.apache.hadoop.yarn.server.api.protocolrecords.GetTimelineCollectorContextResponse; -import org.junit.After; -import org.junit.Before; -import org.junit.Test; - -public class TestTimelineCollectorManager { - private TimelineCollectorManager collectorManager; - - @Before - public void setup() throws Exception { - collectorManager = createCollectorManager(); - collectorManager.init(new YarnConfiguration()); - collectorManager.start(); - } - - @After - public void tearDown() throws Exception { - if (collectorManager != null) { - collectorManager.stop(); - } - } - - @Test - public void testStartWebApp() throws Exception { - assertNotNull(collectorManager.getRestServerBindAddress()); - String address = collectorManager.getRestServerBindAddress(); - String[] parts = address.split(":"); - assertEquals(2, parts.length); - assertNotNull(parts[0]); - assertTrue(Integer.valueOf(parts[1]) > 0); - } - - @Test(timeout=60000) - public void testMultithreadedAdd() throws Exception { - final int NUM_APPS = 5; - List> tasks = new ArrayList>(); - for (int i = 0; i < NUM_APPS; i++) { - final ApplicationId appId = ApplicationId.newInstance(0L, i); - Callable task = new Callable() { - public Boolean call() { - AppLevelTimelineCollector collector = - new AppLevelTimelineCollector(appId); - return (collectorManager.putIfAbsent(appId, collector) == collector); - } - }; - tasks.add(task); - } - ExecutorService executor = Executors.newFixedThreadPool(NUM_APPS); - try { - List> futures = executor.invokeAll(tasks); - for (Future future: futures) { - assertTrue(future.get()); - } - } finally { - executor.shutdownNow(); - } - // check the keys - for (int i = 0; i < NUM_APPS; i++) { - final ApplicationId appId = ApplicationId.newInstance(0L, i); - assertTrue(collectorManager.containsKey(appId.toString())); - } - } - - @Test - public void testMultithreadedAddAndRemove() throws Exception { - final int NUM_APPS = 5; - List> tasks = new ArrayList>(); - for (int i = 0; i < NUM_APPS; i++) { - final ApplicationId appId = ApplicationId.newInstance(0L, i); - Callable task = new Callable() { - public Boolean call() { - AppLevelTimelineCollector collector = - new AppLevelTimelineCollector(appId); - boolean successPut = - (collectorManager.putIfAbsent(appId, collector) == collector); - return successPut && collectorManager.remove(appId.toString()); - } - }; - tasks.add(task); - } - ExecutorService executor = Executors.newFixedThreadPool(NUM_APPS); - try { - List> futures = executor.invokeAll(tasks); - for (Future future: futures) { - assertTrue(future.get()); - } - } finally { - executor.shutdownNow(); - } - // check the keys - for (int i = 0; i < NUM_APPS; i++) { - final ApplicationId appId = ApplicationId.newInstance(0L, i); - assertFalse(collectorManager.containsKey(appId.toString())); - } - } - - private TimelineCollectorManager createCollectorManager() { - final TimelineCollectorManager collectorManager = - spy(new TimelineCollectorManager()); - doReturn(new Configuration()).when(collectorManager).getConfig(); - CollectorNodemanagerProtocol nmCollectorService = - mock(CollectorNodemanagerProtocol.class); - GetTimelineCollectorContextResponse response = - GetTimelineCollectorContextResponse.newInstance(null, null, null, 0L); - try { - when(nmCollectorService.getTimelineCollectorContext(any( - GetTimelineCollectorContextRequest.class))).thenReturn(response); - } catch (YarnException | IOException e) { - fail(); - } - doReturn(nmCollectorService).when(collectorManager).getNMCollectorService(); - return collectorManager; - } -}