diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/timelineservice/ContainerEntity.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/timelineservice/ContainerEntity.java index cde6040..74914df 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/timelineservice/ContainerEntity.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/timelineservice/ContainerEntity.java @@ -24,6 +24,10 @@ import javax.xml.bind.annotation.XmlAccessorType; import javax.xml.bind.annotation.XmlRootElement; +/** + * These Entities are published by the NM and it represents the NM side + * container life cycle events + */ @XmlRootElement(name = "container") @XmlAccessorType(XmlAccessType.NONE) @InterfaceAudience.Public diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/timelineservice/RMContainerEntity.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/timelineservice/RMContainerEntity.java new file mode 100644 index 0000000..09be277 --- /dev/null +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/timelineservice/RMContainerEntity.java @@ -0,0 +1,39 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.yarn.api.records.timelineservice; + +import javax.xml.bind.annotation.XmlAccessType; +import javax.xml.bind.annotation.XmlAccessorType; +import javax.xml.bind.annotation.XmlRootElement; + +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.classification.InterfaceStability; + +/** + * These Entities are published by the NM and it represents the NM side + * container life cycle events + */ +@XmlRootElement(name = "rmcontainer") +@XmlAccessorType(XmlAccessType.NONE) +@InterfaceAudience.Public +@InterfaceStability.Unstable +public class RMContainerEntity extends HierarchicalTimelineEntity { + public RMContainerEntity() { + super(TimelineEntityType.YARN_RM_CONTAINER.toString()); + } +} diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/timelineservice/TimelineEntity.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/timelineservice/TimelineEntity.java index 1afb564..a92f7e9 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/timelineservice/TimelineEntity.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/timelineservice/TimelineEntity.java @@ -45,6 +45,11 @@ public Identifier() { } + public Identifier(String type, String id) { + this.type = type; + this.id = id; + } + @XmlElement(name = "type") public String getType() { return type; @@ -62,6 +67,10 @@ public String getId() { public void setId(String id) { this.id = id; } + + public String toString() { + return "[" + type + "," + id + "]"; + } } private Identifier identifier; @@ -296,5 +305,7 @@ public void setModifiedTime(long modifiedTime) { this.modifiedTime = modifiedTime; } - + public String toString() { + return identifier.toString(); + } } \ No newline at end of file diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/timelineservice/TimelineEntityType.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/timelineservice/TimelineEntityType.java index 6062fe1..25dbe1b 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/timelineservice/TimelineEntityType.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/timelineservice/TimelineEntityType.java @@ -27,7 +27,8 @@ YARN_FLOW, YARN_APPLICATION, YARN_APPLICATION_ATTEMPT, - YARN_CONTAINER, + YARN_CONTAINER, /* CONTAINER entities published by NM */ + YARN_RM_CONTAINER, /* CONTAINER entities published by RM */ YARN_USER, YARN_QUEUE; @@ -42,6 +43,7 @@ public boolean isParent(TimelineEntityType type) { case YARN_APPLICATION_ATTEMPT: return YARN_APPLICATION == type; case YARN_CONTAINER: + case YARN_RM_CONTAINER: return YARN_APPLICATION_ATTEMPT == type; case YARN_QUEUE: return YARN_QUEUE == type; @@ -61,6 +63,7 @@ public boolean isChild(TimelineEntityType type) { case YARN_APPLICATION_ATTEMPT: return YARN_CONTAINER == type; case YARN_CONTAINER: + case YARN_RM_CONTAINER: return false; case YARN_QUEUE: return YARN_QUEUE == type; diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java index e819d1d..ed9ef33 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java @@ -366,6 +366,16 @@ private static void addDeprecatedKeys() { + "system-metrics-publisher.enabled"; public static final boolean DEFAULT_SYSTEM_METRICS_PUBLISHER_ENABLED = false; + /** + * The setting that controls whether yarn container metrics is published to the + * timeline server or not by RM. This configuration setting is for ATS + * V2 + */ + public static final String RM_PUBLISH_CONTAINER_METRICS_ENABLED = YARN_PREFIX + + "system-metrics-publisher.rm.publish.container-metrics"; + public static final boolean DEFAULT_RM_PUBLISH_CONTAINER_METRICS_ENABLED = + false; + public static final String RM_SYSTEM_METRICS_PUBLISHER_DISPATCHER_POOL_SIZE = RM_PREFIX + "system-metrics-publisher.dispatcher.pool-size"; public static final int DEFAULT_RM_SYSTEM_METRICS_PUBLISHER_DISPATCHER_POOL_SIZE = diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMActiveServiceContext.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMActiveServiceContext.java index cbb0a8b..681944d 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMActiveServiceContext.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMActiveServiceContext.java @@ -47,7 +47,6 @@ import org.apache.hadoop.yarn.server.resourcemanager.security.NMTokenSecretManagerInRM; import org.apache.hadoop.yarn.server.resourcemanager.security.RMContainerTokenSecretManager; import org.apache.hadoop.yarn.server.resourcemanager.security.RMDelegationTokenSecretManager; -import org.apache.hadoop.yarn.server.resourcemanager.timelineservice.RMTimelineCollector; import org.apache.hadoop.yarn.util.Clock; import org.apache.hadoop.yarn.util.SystemClock; @@ -95,7 +94,6 @@ private ApplicationMasterService applicationMasterService; private RMApplicationHistoryWriter rmApplicationHistoryWriter; private SystemMetricsPublisher systemMetricsPublisher; - private RMTimelineCollector timelineCollector; private RMNodeLabelsManager nodeLabelManager; private long epoch; @@ -379,18 +377,6 @@ public RMApplicationHistoryWriter getRMApplicationHistoryWriter() { @Private @Unstable - public RMTimelineCollector getRMTimelineCollector() { - return timelineCollector; - } - - @Private - @Unstable - public void setRMTimelineCollector(RMTimelineCollector timelineCollector) { - this.timelineCollector = timelineCollector; - } - - @Private - @Unstable public void setSystemMetricsPublisher( SystemMetricsPublisher systemMetricsPublisher) { this.systemMetricsPublisher = systemMetricsPublisher; diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMContext.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMContext.java index b96601c..ecf6166 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMContext.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMContext.java @@ -43,7 +43,6 @@ import org.apache.hadoop.yarn.server.resourcemanager.security.NMTokenSecretManagerInRM; import org.apache.hadoop.yarn.server.resourcemanager.security.RMContainerTokenSecretManager; import org.apache.hadoop.yarn.server.resourcemanager.security.RMDelegationTokenSecretManager; -import org.apache.hadoop.yarn.server.resourcemanager.timelineservice.RMTimelineCollector; /** * Context of the ResourceManager. @@ -109,10 +108,6 @@ void setRMApplicationHistoryWriter( void setSystemMetricsPublisher(SystemMetricsPublisher systemMetricsPublisher); SystemMetricsPublisher getSystemMetricsPublisher(); - - void setRMTimelineCollector(RMTimelineCollector timelineCollector); - - RMTimelineCollector getRMTimelineCollector(); ConfigurationProvider getConfigurationProvider(); diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMContextImpl.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMContextImpl.java index 531d4c5..38c5722 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMContextImpl.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMContextImpl.java @@ -47,7 +47,6 @@ import org.apache.hadoop.yarn.server.resourcemanager.security.NMTokenSecretManagerInRM; import org.apache.hadoop.yarn.server.resourcemanager.security.RMContainerTokenSecretManager; import org.apache.hadoop.yarn.server.resourcemanager.security.RMDelegationTokenSecretManager; -import org.apache.hadoop.yarn.server.resourcemanager.timelineservice.RMTimelineCollector; import org.apache.hadoop.yarn.util.Clock; import com.google.common.annotations.VisibleForTesting; @@ -355,17 +354,6 @@ public RMApplicationHistoryWriter getRMApplicationHistoryWriter() { } @Override - public void setRMTimelineCollector( - RMTimelineCollector timelineCollector) { - activeServiceContext.setRMTimelineCollector(timelineCollector); - } - - @Override - public RMTimelineCollector getRMTimelineCollector() { - return activeServiceContext.getRMTimelineCollector(); - } - - @Override public void setSystemMetricsPublisher( SystemMetricsPublisher systemMetricsPublisher) { activeServiceContext.setSystemMetricsPublisher(systemMetricsPublisher); diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java index b993ede..cf98bd0 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java @@ -97,7 +97,6 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.SchedulerEventType; import org.apache.hadoop.yarn.server.resourcemanager.security.DelegationTokenRenewer; import org.apache.hadoop.yarn.server.resourcemanager.security.QueueACLsManager; -import org.apache.hadoop.yarn.server.resourcemanager.timelineservice.RMTimelineCollector; import org.apache.hadoop.yarn.server.resourcemanager.webapp.RMWebApp; import org.apache.hadoop.yarn.server.security.ApplicationACLsManager; import org.apache.hadoop.yarn.server.security.http.RMAuthenticationFilter; @@ -356,10 +355,6 @@ protected RMApplicationHistoryWriter createRMApplicationHistoryWriter() { return new RMApplicationHistoryWriter(); } - private RMTimelineCollector createRMTimelineCollector() { - return new RMTimelineCollector(); - } - protected SystemMetricsPublisher createSystemMetricsPublisher() { return new SystemMetricsPublisher(); } @@ -482,11 +477,6 @@ protected void serviceInit(Configuration configuration) throws Exception { addService(systemMetricsPublisher); rmContext.setSystemMetricsPublisher(systemMetricsPublisher); - RMTimelineCollector timelineCollector = - createRMTimelineCollector(); - addService(timelineCollector); - rmContext.setRMTimelineCollector(timelineCollector); - // Register event handler for NodesListManager nodesListManager = new NodesListManager(rmContext); rmDispatcher.register(NodesListManagerEventType.class, nodesListManager); diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/metrics/ApplicationCreatedEvent.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/metrics/ApplicationCreatedEvent.java index 2373b3b..7a0a334 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/metrics/ApplicationCreatedEvent.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/metrics/ApplicationCreatedEvent.java @@ -18,6 +18,8 @@ package org.apache.hadoop.yarn.server.resourcemanager.metrics; +import java.util.Map; + import org.apache.hadoop.yarn.api.records.ApplicationId; public class ApplicationCreatedEvent extends @@ -29,6 +31,7 @@ private String user; private String queue; private long submittedTime; + private Map config; public ApplicationCreatedEvent(ApplicationId appId, String name, @@ -37,6 +40,17 @@ public ApplicationCreatedEvent(ApplicationId appId, String queue, long submittedTime, long createdTime) { + this(appId, name, type, user, queue, submittedTime, createdTime, null); + } + + public ApplicationCreatedEvent(ApplicationId appId, + String name, + String type, + String user, + String queue, + long submittedTime, + long createdTime, + Map config) { super(SystemMetricsEventType.APP_CREATED, createdTime); this.appId = appId; this.name = name; @@ -44,6 +58,7 @@ public ApplicationCreatedEvent(ApplicationId appId, this.user = user; this.queue = queue; this.submittedTime = submittedTime; + this.config = config; } @Override @@ -75,4 +90,7 @@ public long getSubmittedTime() { return submittedTime; } + public Map getConfig() { + return config; + } } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/metrics/SystemMetricsPublisher.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/metrics/SystemMetricsPublisher.java index 2828aec..54215d5 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/metrics/SystemMetricsPublisher.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/metrics/SystemMetricsPublisher.java @@ -19,9 +19,7 @@ package org.apache.hadoop.yarn.server.resourcemanager.metrics; import java.util.ArrayList; -import java.util.HashMap; import java.util.List; -import java.util.Map; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; @@ -29,31 +27,24 @@ import org.apache.hadoop.classification.InterfaceStability.Unstable; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.service.CompositeService; -import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; -import org.apache.hadoop.yarn.api.records.ApplicationId; -import org.apache.hadoop.yarn.api.records.ContainerId; -import org.apache.hadoop.yarn.api.records.timeline.TimelineEntity; -import org.apache.hadoop.yarn.api.records.timeline.TimelineEvent; -import org.apache.hadoop.yarn.client.api.TimelineClient; +import org.apache.hadoop.service.Service; import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.event.AsyncDispatcher; import org.apache.hadoop.yarn.event.Dispatcher; import org.apache.hadoop.yarn.event.Event; import org.apache.hadoop.yarn.event.EventHandler; -import org.apache.hadoop.yarn.server.metrics.AppAttemptMetricsConstants; -import org.apache.hadoop.yarn.server.metrics.ApplicationMetricsConstants; -import org.apache.hadoop.yarn.server.metrics.ContainerMetricsConstants; import org.apache.hadoop.yarn.server.resourcemanager.RMServerUtils; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp; -import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppMetrics; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppState; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptState; import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer; -import org.apache.hadoop.yarn.util.timeline.TimelineUtils; +import org.apache.hadoop.yarn.server.resourcemanager.timelineservice.RMTimelineCollector; + +import com.google.common.annotations.VisibleForTesting; /** - * The class that helps RM publish metrics to the timeline server V1. RM will + * The class that helps RM publish metrics to the timeline server. RM will * always invoke the methods of this class regardless the service is enabled or * not. If it is disabled, publishing requests will be ignored silently. */ @@ -65,8 +56,8 @@ .getLog(SystemMetricsPublisher.class); private Dispatcher dispatcher; - private TimelineClient client; - private boolean publishSystemMetricsToATSv1; + private boolean publishSystemMetrics; + private boolean publishContainerMetrics; public SystemMetricsPublisher() { super(SystemMetricsPublisher.class.getName()); @@ -74,21 +65,27 @@ public SystemMetricsPublisher() { @Override protected void serviceInit(Configuration conf) throws Exception { - publishSystemMetricsToATSv1 = + publishSystemMetrics = conf.getBoolean(YarnConfiguration.TIMELINE_SERVICE_ENABLED, - YarnConfiguration.DEFAULT_TIMELINE_SERVICE_ENABLED) - && conf.getBoolean( - YarnConfiguration.SYSTEM_METRICS_PUBLISHER_ENABLED, - YarnConfiguration.DEFAULT_SYSTEM_METRICS_PUBLISHER_ENABLED); - - if (publishSystemMetricsToATSv1) { - client = TimelineClient.createTimelineClient(); - addIfService(client); - - dispatcher = createDispatcher(conf); - dispatcher.register(SystemMetricsEventType.class, - new ForwardingEventHandler()); - addIfService(dispatcher); + YarnConfiguration.DEFAULT_TIMELINE_SERVICE_ENABLED); + if (publishSystemMetrics) { + TimelineServicePublisher timelineServicePublisher = + getTimelineServicePublisher(conf); + if (timelineServicePublisher != null) { + addService(timelineServicePublisher); + // init required to be called so that other methods of + // TimelineServicePublisher can be utilized + timelineServicePublisher.init(conf); + dispatcher = timelineServicePublisher.getDispatcher(); + publishContainerMetrics = + timelineServicePublisher.publishRMContainerMetrics(); + dispatcher.register(SystemMetricsEventType.class, + timelineServicePublisher.getEventHandler()); + addIfService(dispatcher); + } else { + LOG.info("TimelineServicePublisher is not configured"); + publishSystemMetrics = false; + } LOG.info("YARN system metrics publishing service is enabled"); } else { LOG.info("YARN system metrics publishing service is not enabled"); @@ -96,9 +93,22 @@ protected void serviceInit(Configuration conf) throws Exception { super.serviceInit(conf); } + public TimelineServicePublisher getTimelineServicePublisher(Configuration conf) { + if(conf.getBoolean( + YarnConfiguration.RM_SYSTEM_METRICS_PUBLISHER_ENABLED, + YarnConfiguration.DEFAULT_SYSTEM_METRICS_PUBLISHER_ENABLED)) { + return new TimelineServiceV1Publisher(); + } else if (conf.getBoolean( + YarnConfiguration.SYSTEM_METRICS_PUBLISHER_ENABLED, + YarnConfiguration.DEFAULT_SYSTEM_METRICS_PUBLISHER_ENABLED)) { + return new RMTimelineCollector();// AKA TimelineServiceV2Publisher + } + return null; + } + @SuppressWarnings("unchecked") public void appCreated(RMApp app, long createdTime) { - if (publishSystemMetricsToATSv1) { + if (publishSystemMetrics) { dispatcher.getEventHandler().handle( new ApplicationCreatedEvent( app.getApplicationId(), @@ -113,7 +123,7 @@ public void appCreated(RMApp app, long createdTime) { @SuppressWarnings("unchecked") public void appFinished(RMApp app, RMAppState state, long finishedTime) { - if (publishSystemMetricsToATSv1) { + if (publishSystemMetrics) { dispatcher.getEventHandler().handle( new ApplicationFinishedEvent( app.getApplicationId(), @@ -130,7 +140,7 @@ public void appFinished(RMApp app, RMAppState state, long finishedTime) { @SuppressWarnings("unchecked") public void appACLsUpdated(RMApp app, String appViewACLs, long updatedTime) { - if (publishSystemMetricsToATSv1) { + if (publishSystemMetrics) { dispatcher.getEventHandler().handle( new ApplicationACLsUpdatedEvent( app.getApplicationId(), @@ -142,7 +152,7 @@ public void appACLsUpdated(RMApp app, String appViewACLs, @SuppressWarnings("unchecked") public void appAttemptRegistered(RMAppAttempt appAttempt, long registeredTime) { - if (publishSystemMetricsToATSv1) { + if (publishSystemMetrics) { dispatcher.getEventHandler().handle( new AppAttemptRegisteredEvent( appAttempt.getAppAttemptId(), @@ -158,7 +168,7 @@ public void appAttemptRegistered(RMAppAttempt appAttempt, @SuppressWarnings("unchecked") public void appAttemptFinished(RMAppAttempt appAttempt, RMAppAttemptState appAttemtpState, RMApp app, long finishedTime) { - if (publishSystemMetricsToATSv1) { + if (publishSystemMetrics) { dispatcher.getEventHandler().handle( new AppAttemptFinishedEvent( appAttempt.getAppAttemptId(), @@ -175,7 +185,7 @@ public void appAttemptFinished(RMAppAttempt appAttempt, @SuppressWarnings("unchecked") public void containerCreated(RMContainer container, long createdTime) { - if (publishSystemMetricsToATSv1) { + if (publishContainerMetrics) { dispatcher.getEventHandler().handle( new ContainerCreatedEvent( container.getContainerId(), @@ -188,7 +198,7 @@ public void containerCreated(RMContainer container, long createdTime) { @SuppressWarnings("unchecked") public void containerFinished(RMContainer container, long finishedTime) { - if (publishSystemMetricsToATSv1) { + if (publishContainerMetrics) { dispatcher.getEventHandler().handle( new ContainerFinishedEvent( container.getContainerId(), @@ -199,265 +209,18 @@ public void containerFinished(RMContainer container, long finishedTime) { } } - protected Dispatcher createDispatcher(Configuration conf) { - MultiThreadedDispatcher dispatcher = - new MultiThreadedDispatcher( - conf.getInt( - YarnConfiguration.RM_SYSTEM_METRICS_PUBLISHER_DISPATCHER_POOL_SIZE, - YarnConfiguration.DEFAULT_RM_SYSTEM_METRICS_PUBLISHER_DISPATCHER_POOL_SIZE)); - dispatcher.setDrainEventsOnStop(); - return dispatcher; - } - - protected void handleSystemMetricsEvent( - SystemMetricsEvent event) { - switch (event.getType()) { - case APP_CREATED: - publishApplicationCreatedEvent((ApplicationCreatedEvent) event); - break; - case APP_FINISHED: - publishApplicationFinishedEvent((ApplicationFinishedEvent) event); - break; - case APP_ACLS_UPDATED: - publishApplicationACLsUpdatedEvent((ApplicationACLsUpdatedEvent) event); - break; - case APP_ATTEMPT_REGISTERED: - publishAppAttemptRegisteredEvent((AppAttemptRegisteredEvent) event); - break; - case APP_ATTEMPT_FINISHED: - publishAppAttemptFinishedEvent((AppAttemptFinishedEvent) event); - break; - case CONTAINER_CREATED: - publishContainerCreatedEvent((ContainerCreatedEvent) event); - break; - case CONTAINER_FINISHED: - publishContainerFinishedEvent((ContainerFinishedEvent) event); - break; - default: - LOG.error("Unknown SystemMetricsEvent type: " + event.getType()); - } - } - - private void publishApplicationCreatedEvent(ApplicationCreatedEvent event) { - TimelineEntity entity = - createApplicationEntity(event.getApplicationId()); - Map entityInfo = new HashMap(); - entityInfo.put(ApplicationMetricsConstants.NAME_ENTITY_INFO, - event.getApplicationName()); - entityInfo.put(ApplicationMetricsConstants.TYPE_ENTITY_INFO, - event.getApplicationType()); - entityInfo.put(ApplicationMetricsConstants.USER_ENTITY_INFO, - event.getUser()); - entityInfo.put(ApplicationMetricsConstants.QUEUE_ENTITY_INFO, - event.getQueue()); - entityInfo.put(ApplicationMetricsConstants.SUBMITTED_TIME_ENTITY_INFO, - event.getSubmittedTime()); - entity.setOtherInfo(entityInfo); - TimelineEvent tEvent = new TimelineEvent(); - tEvent.setEventType( - ApplicationMetricsConstants.CREATED_EVENT_TYPE); - tEvent.setTimestamp(event.getTimestamp()); - entity.addEvent(tEvent); - putEntity(entity); - } - - private void publishApplicationFinishedEvent(ApplicationFinishedEvent event) { - TimelineEntity entity = - createApplicationEntity(event.getApplicationId()); - TimelineEvent tEvent = new TimelineEvent(); - tEvent.setEventType( - ApplicationMetricsConstants.FINISHED_EVENT_TYPE); - tEvent.setTimestamp(event.getTimestamp()); - Map eventInfo = new HashMap(); - eventInfo.put(ApplicationMetricsConstants.DIAGNOSTICS_INFO_EVENT_INFO, - event.getDiagnosticsInfo()); - eventInfo.put(ApplicationMetricsConstants.FINAL_STATUS_EVENT_INFO, - event.getFinalApplicationStatus().toString()); - eventInfo.put(ApplicationMetricsConstants.STATE_EVENT_INFO, - event.getYarnApplicationState().toString()); - if (event.getLatestApplicationAttemptId() != null) { - eventInfo.put(ApplicationMetricsConstants.LATEST_APP_ATTEMPT_EVENT_INFO, - event.getLatestApplicationAttemptId().toString()); - } - RMAppMetrics appMetrics = event.getAppMetrics(); - entity.addOtherInfo(ApplicationMetricsConstants.APP_CPU_METRICS, - appMetrics.getVcoreSeconds()); - entity.addOtherInfo(ApplicationMetricsConstants.APP_MEM_METRICS, - appMetrics.getMemorySeconds()); - - tEvent.setEventInfo(eventInfo); - entity.addEvent(tEvent); - putEntity(entity); - } - - private void publishApplicationACLsUpdatedEvent( - ApplicationACLsUpdatedEvent event) { - TimelineEntity entity = - createApplicationEntity(event.getApplicationId()); - TimelineEvent tEvent = new TimelineEvent(); - Map entityInfo = new HashMap(); - entityInfo.put(ApplicationMetricsConstants.APP_VIEW_ACLS_ENTITY_INFO, - event.getViewAppACLs()); - entity.setOtherInfo(entityInfo); - tEvent.setEventType( - ApplicationMetricsConstants.ACLS_UPDATED_EVENT_TYPE); - tEvent.setTimestamp(event.getTimestamp()); - entity.addEvent(tEvent); - putEntity(entity); - } - - private static TimelineEntity createApplicationEntity( - ApplicationId applicationId) { - TimelineEntity entity = new TimelineEntity(); - entity.setEntityType(ApplicationMetricsConstants.ENTITY_TYPE); - entity.setEntityId(applicationId.toString()); - return entity; - } - - private void - publishAppAttemptRegisteredEvent(AppAttemptRegisteredEvent event) { - TimelineEntity entity = - createAppAttemptEntity(event.getApplicationAttemptId()); - TimelineEvent tEvent = new TimelineEvent(); - tEvent.setEventType( - AppAttemptMetricsConstants.REGISTERED_EVENT_TYPE); - tEvent.setTimestamp(event.getTimestamp()); - Map eventInfo = new HashMap(); - eventInfo.put( - AppAttemptMetricsConstants.TRACKING_URL_EVENT_INFO, - event.getTrackingUrl()); - eventInfo.put( - AppAttemptMetricsConstants.ORIGINAL_TRACKING_URL_EVENT_INFO, - event.getOriginalTrackingURL()); - eventInfo.put(AppAttemptMetricsConstants.HOST_EVENT_INFO, - event.getHost()); - eventInfo.put(AppAttemptMetricsConstants.RPC_PORT_EVENT_INFO, - event.getRpcPort()); - eventInfo.put( - AppAttemptMetricsConstants.MASTER_CONTAINER_EVENT_INFO, - event.getMasterContainerId().toString()); - tEvent.setEventInfo(eventInfo); - entity.addEvent(tEvent); - putEntity(entity); + @VisibleForTesting + boolean isPublishContainerMetrics() { + return publishContainerMetrics; } - private void publishAppAttemptFinishedEvent(AppAttemptFinishedEvent event) { - TimelineEntity entity = - createAppAttemptEntity(event.getApplicationAttemptId()); - TimelineEvent tEvent = new TimelineEvent(); - tEvent.setEventType(AppAttemptMetricsConstants.FINISHED_EVENT_TYPE); - tEvent.setTimestamp(event.getTimestamp()); - Map eventInfo = new HashMap(); - eventInfo.put( - AppAttemptMetricsConstants.TRACKING_URL_EVENT_INFO, - event.getTrackingUrl()); - eventInfo.put( - AppAttemptMetricsConstants.ORIGINAL_TRACKING_URL_EVENT_INFO, - event.getOriginalTrackingURL()); - eventInfo.put(AppAttemptMetricsConstants.DIAGNOSTICS_INFO_EVENT_INFO, - event.getDiagnosticsInfo()); - eventInfo.put(AppAttemptMetricsConstants.FINAL_STATUS_EVENT_INFO, - event.getFinalApplicationStatus().toString()); - eventInfo.put(AppAttemptMetricsConstants.STATE_EVENT_INFO, - event.getYarnApplicationAttemptState().toString()); - tEvent.setEventInfo(eventInfo); - entity.addEvent(tEvent); - putEntity(entity); - } - - private static TimelineEntity createAppAttemptEntity( - ApplicationAttemptId appAttemptId) { - TimelineEntity entity = new TimelineEntity(); - entity.setEntityType( - AppAttemptMetricsConstants.ENTITY_TYPE); - entity.setEntityId(appAttemptId.toString()); - entity.addPrimaryFilter(AppAttemptMetricsConstants.PARENT_PRIMARY_FILTER, - appAttemptId.getApplicationId().toString()); - return entity; - } - - private void publishContainerCreatedEvent(ContainerCreatedEvent event) { - TimelineEntity entity = createContainerEntity(event.getContainerId()); - Map entityInfo = new HashMap(); - entityInfo.put(ContainerMetricsConstants.ALLOCATED_MEMORY_ENTITY_INFO, - event.getAllocatedResource().getMemory()); - entityInfo.put(ContainerMetricsConstants.ALLOCATED_VCORE_ENTITY_INFO, - event.getAllocatedResource().getVirtualCores()); - entityInfo.put(ContainerMetricsConstants.ALLOCATED_HOST_ENTITY_INFO, - event.getAllocatedNode().getHost()); - entityInfo.put(ContainerMetricsConstants.ALLOCATED_PORT_ENTITY_INFO, - event.getAllocatedNode().getPort()); - entityInfo.put(ContainerMetricsConstants.ALLOCATED_PRIORITY_ENTITY_INFO, - event.getAllocatedPriority().getPriority()); - entityInfo.put( - ContainerMetricsConstants.ALLOCATED_HOST_HTTP_ADDRESS_ENTITY_INFO, - event.getNodeHttpAddress()); - entity.setOtherInfo(entityInfo); - TimelineEvent tEvent = new TimelineEvent(); - tEvent.setEventType(ContainerMetricsConstants.CREATED_EVENT_TYPE); - tEvent.setTimestamp(event.getTimestamp()); - entity.addEvent(tEvent); - putEntity(entity); - } - - private void publishContainerFinishedEvent(ContainerFinishedEvent event) { - TimelineEntity entity = createContainerEntity(event.getContainerId()); - TimelineEvent tEvent = new TimelineEvent(); - tEvent.setEventType(ContainerMetricsConstants.FINISHED_EVENT_TYPE); - tEvent.setTimestamp(event.getTimestamp()); - Map eventInfo = new HashMap(); - eventInfo.put(ContainerMetricsConstants.DIAGNOSTICS_INFO_EVENT_INFO, - event.getDiagnosticsInfo()); - eventInfo.put(ContainerMetricsConstants.EXIT_STATUS_EVENT_INFO, - event.getContainerExitStatus()); - eventInfo.put(ContainerMetricsConstants.STATE_EVENT_INFO, - event.getContainerState().toString()); - tEvent.setEventInfo(eventInfo); - entity.addEvent(tEvent); - putEntity(entity); - } - - private static TimelineEntity createContainerEntity( - ContainerId containerId) { - TimelineEntity entity = new TimelineEntity(); - entity.setEntityType( - ContainerMetricsConstants.ENTITY_TYPE); - entity.setEntityId(containerId.toString()); - entity.addPrimaryFilter(ContainerMetricsConstants.PARENT_PRIMARIY_FILTER, - containerId.getApplicationAttemptId().toString()); - return entity; - } - - private void putEntity(TimelineEntity entity) { - try { - if (LOG.isDebugEnabled()) { - LOG.debug("Publishing the entity " + entity.getEntityId() + - ", JSON-style content: " + TimelineUtils.dumpTimelineRecordtoJSON(entity)); - } - client.putEntities(entity); - } catch (Exception e) { - LOG.error("Error when publishing entity [" + entity.getEntityType() + "," - + entity.getEntityId() + "]", e); - } - } - - /** - * EventHandler implementation which forward events to SystemMetricsPublisher. - * Making use of it, SystemMetricsPublisher can avoid to have a public handle - * method. - */ - private final class ForwardingEventHandler implements - EventHandler { - - @Override - public void handle(SystemMetricsEvent event) { - handleSystemMetricsEvent(event); - } - + @VisibleForTesting + Dispatcher getDispatcher() { + return dispatcher; } - + @SuppressWarnings({ "rawtypes", "unchecked" }) - protected static class MultiThreadedDispatcher extends CompositeService + public static class MultiThreadedDispatcher extends CompositeService implements Dispatcher { private List dispatchers = @@ -507,7 +270,23 @@ public void handle(Event event) { protected AsyncDispatcher createDispatcher() { return new AsyncDispatcher(); } - } + public interface TimelineServicePublisher extends Service { + /** + * @return the Dispatcher which needs to be used to dispatch events + */ + public Dispatcher getDispatcher(); + + /** + * @return true if RMContainerMetricsNeeds to be sent + */ + public boolean publishRMContainerMetrics(); + + /** + * @return EventHandler which needs to be registered to the dispatcher to + * handle the SystemMetricsEvent + */ + public EventHandler getEventHandler(); + } } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/metrics/TimelineServiceV1Publisher.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/metrics/TimelineServiceV1Publisher.java new file mode 100644 index 0000000..bf42b70 --- /dev/null +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/metrics/TimelineServiceV1Publisher.java @@ -0,0 +1,318 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.server.resourcemanager.metrics; + +import java.util.HashMap; +import java.util.Map; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.service.CompositeService; +import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; +import org.apache.hadoop.yarn.api.records.ApplicationId; +import org.apache.hadoop.yarn.api.records.ContainerId; +import org.apache.hadoop.yarn.api.records.timeline.TimelineEntity; +import org.apache.hadoop.yarn.api.records.timeline.TimelineEvent; +import org.apache.hadoop.yarn.client.api.TimelineClient; +import org.apache.hadoop.yarn.conf.YarnConfiguration; +import org.apache.hadoop.yarn.event.Dispatcher; +import org.apache.hadoop.yarn.event.EventHandler; +import org.apache.hadoop.yarn.server.metrics.AppAttemptMetricsConstants; +import org.apache.hadoop.yarn.server.metrics.ApplicationMetricsConstants; +import org.apache.hadoop.yarn.server.metrics.ContainerMetricsConstants; +import org.apache.hadoop.yarn.server.resourcemanager.metrics.SystemMetricsPublisher.MultiThreadedDispatcher; +import org.apache.hadoop.yarn.server.resourcemanager.metrics.SystemMetricsPublisher.TimelineServicePublisher; +import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppMetrics; +import org.apache.hadoop.yarn.util.timeline.TimelineUtils; + +public class TimelineServiceV1Publisher extends CompositeService +implements TimelineServicePublisher, EventHandler { + + private static final Log LOG = LogFactory + .getLog(TimelineServiceV1Publisher.class); + + public TimelineServiceV1Publisher() { + super("TimelineserviceV1Publisher"); + } + + private Configuration conf; + private TimelineClient client; + + @Override + public void serviceInit(Configuration conf) throws Exception { + this.conf = conf; + client = TimelineClient.createTimelineClient(); + addIfService(client); + super.serviceInit(conf); + } + + @Override + public Dispatcher getDispatcher() { + MultiThreadedDispatcher dispatcher = + new MultiThreadedDispatcher( + conf.getInt( + YarnConfiguration.RM_SYSTEM_METRICS_PUBLISHER_DISPATCHER_POOL_SIZE, + YarnConfiguration.DEFAULT_RM_SYSTEM_METRICS_PUBLISHER_DISPATCHER_POOL_SIZE)); + dispatcher.setDrainEventsOnStop(); + return dispatcher; + } + + @Override + public boolean publishRMContainerMetrics() { + return true; + } + + @Override + public EventHandler getEventHandler() { + return this; + } + + @Override + public void handle(SystemMetricsEvent event) { + switch (event.getType()) { + case APP_CREATED: + publishApplicationCreatedEvent((ApplicationCreatedEvent) event); + break; + case APP_FINISHED: + publishApplicationFinishedEvent((ApplicationFinishedEvent) event); + break; + case APP_ACLS_UPDATED: + publishApplicationACLsUpdatedEvent((ApplicationACLsUpdatedEvent) event); + break; + case APP_ATTEMPT_REGISTERED: + publishAppAttemptRegisteredEvent((AppAttemptRegisteredEvent) event); + break; + case APP_ATTEMPT_FINISHED: + publishAppAttemptFinishedEvent((AppAttemptFinishedEvent) event); + break; + case CONTAINER_CREATED: + publishContainerCreatedEvent((ContainerCreatedEvent) event); + break; + case CONTAINER_FINISHED: + publishContainerFinishedEvent((ContainerFinishedEvent) event); + break; + default: + LOG.error("Unknown SystemMetricsEvent type: " + event.getType()); + } +} + + private void publishApplicationCreatedEvent(ApplicationCreatedEvent event) { + TimelineEntity entity = + createApplicationEntity(event.getApplicationId()); + Map entityInfo = new HashMap(); + entityInfo.put(ApplicationMetricsConstants.NAME_ENTITY_INFO, + event.getApplicationName()); + entityInfo.put(ApplicationMetricsConstants.TYPE_ENTITY_INFO, + event.getApplicationType()); + entityInfo.put(ApplicationMetricsConstants.USER_ENTITY_INFO, + event.getUser()); + entityInfo.put(ApplicationMetricsConstants.QUEUE_ENTITY_INFO, + event.getQueue()); + entityInfo.put(ApplicationMetricsConstants.SUBMITTED_TIME_ENTITY_INFO, + event.getSubmittedTime()); + entity.setOtherInfo(entityInfo); + TimelineEvent tEvent = new TimelineEvent(); + tEvent.setEventType( + ApplicationMetricsConstants.CREATED_EVENT_TYPE); + tEvent.setTimestamp(event.getTimestamp()); + entity.addEvent(tEvent); + putEntity(entity); + } + + private void publishApplicationFinishedEvent(ApplicationFinishedEvent event) { + TimelineEntity entity = + createApplicationEntity(event.getApplicationId()); + TimelineEvent tEvent = new TimelineEvent(); + tEvent.setEventType( + ApplicationMetricsConstants.FINISHED_EVENT_TYPE); + tEvent.setTimestamp(event.getTimestamp()); + Map eventInfo = new HashMap(); + eventInfo.put(ApplicationMetricsConstants.DIAGNOSTICS_INFO_EVENT_INFO, + event.getDiagnosticsInfo()); + eventInfo.put(ApplicationMetricsConstants.FINAL_STATUS_EVENT_INFO, + event.getFinalApplicationStatus().toString()); + eventInfo.put(ApplicationMetricsConstants.STATE_EVENT_INFO, + event.getYarnApplicationState().toString()); + if (event.getLatestApplicationAttemptId() != null) { + eventInfo.put(ApplicationMetricsConstants.LATEST_APP_ATTEMPT_EVENT_INFO, + event.getLatestApplicationAttemptId().toString()); + } + RMAppMetrics appMetrics = event.getAppMetrics(); + entity.addOtherInfo(ApplicationMetricsConstants.APP_CPU_METRICS, + appMetrics.getVcoreSeconds()); + entity.addOtherInfo(ApplicationMetricsConstants.APP_MEM_METRICS, + appMetrics.getMemorySeconds()); + + tEvent.setEventInfo(eventInfo); + entity.addEvent(tEvent); + putEntity(entity); + } + + private void publishApplicationACLsUpdatedEvent( + ApplicationACLsUpdatedEvent event) { + TimelineEntity entity = + createApplicationEntity(event.getApplicationId()); + TimelineEvent tEvent = new TimelineEvent(); + Map entityInfo = new HashMap(); + entityInfo.put(ApplicationMetricsConstants.APP_VIEW_ACLS_ENTITY_INFO, + event.getViewAppACLs()); + entity.setOtherInfo(entityInfo); + tEvent.setEventType( + ApplicationMetricsConstants.ACLS_UPDATED_EVENT_TYPE); + tEvent.setTimestamp(event.getTimestamp()); + entity.addEvent(tEvent); + putEntity(entity); + } + + private static TimelineEntity createApplicationEntity( + ApplicationId applicationId) { + TimelineEntity entity = new TimelineEntity(); + entity.setEntityType(ApplicationMetricsConstants.ENTITY_TYPE); + entity.setEntityId(applicationId.toString()); + return entity; + } + + private void + publishAppAttemptRegisteredEvent(AppAttemptRegisteredEvent event) { + TimelineEntity entity = + createAppAttemptEntity(event.getApplicationAttemptId()); + TimelineEvent tEvent = new TimelineEvent(); + tEvent.setEventType( + AppAttemptMetricsConstants.REGISTERED_EVENT_TYPE); + tEvent.setTimestamp(event.getTimestamp()); + Map eventInfo = new HashMap(); + eventInfo.put( + AppAttemptMetricsConstants.TRACKING_URL_EVENT_INFO, + event.getTrackingUrl()); + eventInfo.put( + AppAttemptMetricsConstants.ORIGINAL_TRACKING_URL_EVENT_INFO, + event.getOriginalTrackingURL()); + eventInfo.put(AppAttemptMetricsConstants.HOST_EVENT_INFO, + event.getHost()); + eventInfo.put(AppAttemptMetricsConstants.RPC_PORT_EVENT_INFO, + event.getRpcPort()); + eventInfo.put( + AppAttemptMetricsConstants.MASTER_CONTAINER_EVENT_INFO, + event.getMasterContainerId().toString()); + tEvent.setEventInfo(eventInfo); + entity.addEvent(tEvent); + putEntity(entity); + } + + private void publishAppAttemptFinishedEvent(AppAttemptFinishedEvent event) { + TimelineEntity entity = + createAppAttemptEntity(event.getApplicationAttemptId()); + TimelineEvent tEvent = new TimelineEvent(); + tEvent.setEventType(AppAttemptMetricsConstants.FINISHED_EVENT_TYPE); + tEvent.setTimestamp(event.getTimestamp()); + Map eventInfo = new HashMap(); + eventInfo.put( + AppAttemptMetricsConstants.TRACKING_URL_EVENT_INFO, + event.getTrackingUrl()); + eventInfo.put( + AppAttemptMetricsConstants.ORIGINAL_TRACKING_URL_EVENT_INFO, + event.getOriginalTrackingURL()); + eventInfo.put(AppAttemptMetricsConstants.DIAGNOSTICS_INFO_EVENT_INFO, + event.getDiagnosticsInfo()); + eventInfo.put(AppAttemptMetricsConstants.FINAL_STATUS_EVENT_INFO, + event.getFinalApplicationStatus().toString()); + eventInfo.put(AppAttemptMetricsConstants.STATE_EVENT_INFO, + event.getYarnApplicationAttemptState().toString()); + tEvent.setEventInfo(eventInfo); + entity.addEvent(tEvent); + putEntity(entity); + } + + private static TimelineEntity createAppAttemptEntity( + ApplicationAttemptId appAttemptId) { + TimelineEntity entity = new TimelineEntity(); + entity.setEntityType( + AppAttemptMetricsConstants.ENTITY_TYPE); + entity.setEntityId(appAttemptId.toString()); + entity.addPrimaryFilter(AppAttemptMetricsConstants.PARENT_PRIMARY_FILTER, + appAttemptId.getApplicationId().toString()); + return entity; + } + + private void publishContainerCreatedEvent(ContainerCreatedEvent event) { + TimelineEntity entity = createContainerEntity(event.getContainerId()); + Map entityInfo = new HashMap(); + entityInfo.put(ContainerMetricsConstants.ALLOCATED_MEMORY_ENTITY_INFO, + event.getAllocatedResource().getMemory()); + entityInfo.put(ContainerMetricsConstants.ALLOCATED_VCORE_ENTITY_INFO, + event.getAllocatedResource().getVirtualCores()); + entityInfo.put(ContainerMetricsConstants.ALLOCATED_HOST_ENTITY_INFO, + event.getAllocatedNode().getHost()); + entityInfo.put(ContainerMetricsConstants.ALLOCATED_PORT_ENTITY_INFO, + event.getAllocatedNode().getPort()); + entityInfo.put(ContainerMetricsConstants.ALLOCATED_PRIORITY_ENTITY_INFO, + event.getAllocatedPriority().getPriority()); + entityInfo.put( + ContainerMetricsConstants.ALLOCATED_HOST_HTTP_ADDRESS_ENTITY_INFO, + event.getNodeHttpAddress()); + entity.setOtherInfo(entityInfo); + TimelineEvent tEvent = new TimelineEvent(); + tEvent.setEventType(ContainerMetricsConstants.CREATED_EVENT_TYPE); + tEvent.setTimestamp(event.getTimestamp()); + entity.addEvent(tEvent); + putEntity(entity); + } + + private void publishContainerFinishedEvent(ContainerFinishedEvent event) { + TimelineEntity entity = createContainerEntity(event.getContainerId()); + TimelineEvent tEvent = new TimelineEvent(); + tEvent.setEventType(ContainerMetricsConstants.FINISHED_EVENT_TYPE); + tEvent.setTimestamp(event.getTimestamp()); + Map eventInfo = new HashMap(); + eventInfo.put(ContainerMetricsConstants.DIAGNOSTICS_INFO_EVENT_INFO, + event.getDiagnosticsInfo()); + eventInfo.put(ContainerMetricsConstants.EXIT_STATUS_EVENT_INFO, + event.getContainerExitStatus()); + eventInfo.put(ContainerMetricsConstants.STATE_EVENT_INFO, + event.getContainerState().toString()); + tEvent.setEventInfo(eventInfo); + entity.addEvent(tEvent); + putEntity(entity); + } + + private static TimelineEntity createContainerEntity( + ContainerId containerId) { + TimelineEntity entity = new TimelineEntity(); + entity.setEntityType( + ContainerMetricsConstants.ENTITY_TYPE); + entity.setEntityId(containerId.toString()); + entity.addPrimaryFilter(ContainerMetricsConstants.PARENT_PRIMARIY_FILTER, + containerId.getApplicationAttemptId().toString()); + return entity; + } + + private void putEntity(TimelineEntity entity) { + try { + if (LOG.isDebugEnabled()) { + LOG.debug("Publishing the entity " + entity.getEntityId() + + ", JSON-style content: " + TimelineUtils.dumpTimelineRecordtoJSON(entity)); + } + client.putEntities(entity); + } catch (Exception e) { + LOG.error("Error when publishing entity [" + entity.getEntityType() + "," + + entity.getEntityId() + "]", e); + } + } +} diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/timelineservice/RMTimelineCollector.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/timelineservice/RMTimelineCollector.java index 4ea7a03..73e8c01 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/timelineservice/RMTimelineCollector.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/timelineservice/RMTimelineCollector.java @@ -18,57 +18,75 @@ package org.apache.hadoop.yarn.server.resourcemanager.timelineservice; +import java.util.HashMap; +import java.util.Iterator; +import java.util.Map; +import java.util.Map.Entry; + import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.classification.InterfaceAudience.Private; import org.apache.hadoop.classification.InterfaceStability.Unstable; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; +import org.apache.hadoop.yarn.api.records.ApplicationId; +import org.apache.hadoop.yarn.api.records.ContainerId; +import org.apache.hadoop.yarn.api.records.timelineservice.ApplicationAttemptEntity; +import org.apache.hadoop.yarn.api.records.timelineservice.ApplicationEntity; +import org.apache.hadoop.yarn.api.records.timelineservice.RMContainerEntity; +import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntities; +import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity; +import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity.Identifier; +import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntityType; +import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEvent; import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.event.AsyncDispatcher; import org.apache.hadoop.yarn.event.Dispatcher; import org.apache.hadoop.yarn.event.EventHandler; +import org.apache.hadoop.yarn.server.metrics.AppAttemptMetricsConstants; +import org.apache.hadoop.yarn.server.metrics.ApplicationMetricsConstants; +import org.apache.hadoop.yarn.server.metrics.ContainerMetricsConstants; +import org.apache.hadoop.yarn.server.resourcemanager.metrics.AppAttemptFinishedEvent; +import org.apache.hadoop.yarn.server.resourcemanager.metrics.AppAttemptRegisteredEvent; +import org.apache.hadoop.yarn.server.resourcemanager.metrics.ApplicationACLsUpdatedEvent; +import org.apache.hadoop.yarn.server.resourcemanager.metrics.ApplicationCreatedEvent; +import org.apache.hadoop.yarn.server.resourcemanager.metrics.ApplicationFinishedEvent; +import org.apache.hadoop.yarn.server.resourcemanager.metrics.ContainerCreatedEvent; +import org.apache.hadoop.yarn.server.resourcemanager.metrics.ContainerFinishedEvent; import org.apache.hadoop.yarn.server.resourcemanager.metrics.SystemMetricsEvent; -import org.apache.hadoop.yarn.server.resourcemanager.metrics.SystemMetricsEventType; +import org.apache.hadoop.yarn.server.resourcemanager.metrics.SystemMetricsPublisher.MultiThreadedDispatcher; +import org.apache.hadoop.yarn.server.resourcemanager.metrics.SystemMetricsPublisher.TimelineServicePublisher; +import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppMetrics; import org.apache.hadoop.yarn.server.timelineservice.collector.TimelineCollector; import org.apache.hadoop.yarn.server.timelineservice.collector.TimelineCollectorContext; +import org.apache.hadoop.yarn.util.timeline.TimelineUtils; /** - * This class is responsible for posting application and appattempt lifecycle - * related events to timeline service V2 + * This class is responsible for posting application, appattempt & Container + * lifecycle related events to timeline service V2 */ @Private @Unstable -public class RMTimelineCollector extends TimelineCollector { +public class RMTimelineCollector extends TimelineCollector implements + TimelineServicePublisher, EventHandler { private static final Log LOG = LogFactory.getLog(RMTimelineCollector.class); public RMTimelineCollector() { super("Resource Manager TimelineCollector"); } - private Dispatcher dispatcher; - - private boolean publishSystemMetricsForV2; + private Configuration conf; + private boolean publishContainerMetrics; @Override protected void serviceInit(Configuration conf) throws Exception { - publishSystemMetricsForV2 = - conf.getBoolean(YarnConfiguration.TIMELINE_SERVICE_ENABLED, - YarnConfiguration.DEFAULT_TIMELINE_SERVICE_ENABLED) - && conf.getBoolean( - YarnConfiguration.SYSTEM_METRICS_PUBLISHER_ENABLED, - YarnConfiguration.DEFAULT_SYSTEM_METRICS_PUBLISHER_ENABLED); - - if (publishSystemMetricsForV2) { - // having separate dispatcher to avoid load on RMDispatcher - LOG.info("RMTimelineCollector has been configured to publish" - + " System Metrics in ATS V2"); - dispatcher = new AsyncDispatcher(); - dispatcher.register(SystemMetricsEventType.class, - new ForwardingEventHandler()); - } else { - LOG.warn("RMTimelineCollector has not been configured to publish" - + " System Metrics in ATS V2"); - } + LOG.info("RMTimelineCollector has been configured to publish" + + " System Metrics in ATS V2"); + this.conf = conf; + + publishContainerMetrics = + conf.getBoolean(YarnConfiguration.RM_PUBLISH_CONTAINER_METRICS_ENABLED, + YarnConfiguration.DEFAULT_RM_PUBLISH_CONTAINER_METRICS_ENABLED); super.serviceInit(conf); } @@ -82,30 +100,263 @@ protected void serviceStop() throws Exception { super.serviceStop(); } - protected void handleSystemMetricsEvent(SystemMetricsEvent event) { + public void handle(SystemMetricsEvent event) { switch (event.getType()) { + case APP_CREATED: + publishApplicationCreatedEvent((ApplicationCreatedEvent) event); + break; + case APP_FINISHED: + publishApplicationFinishedEvent((ApplicationFinishedEvent) event); + break; + case APP_ACLS_UPDATED: + publishApplicationACLsUpdatedEvent((ApplicationACLsUpdatedEvent) event); + break; + case APP_ATTEMPT_REGISTERED: + publishAppAttemptRegisteredEvent((AppAttemptRegisteredEvent) event); + break; + case APP_ATTEMPT_FINISHED: + publishAppAttemptFinishedEvent((AppAttemptFinishedEvent) event); + break; + case CONTAINER_CREATED: + publishContainerCreatedEvent((ContainerCreatedEvent) event); + break; + case CONTAINER_FINISHED: + publishContainerFinishedEvent((ContainerFinishedEvent) event); + break; default: LOG.error("Unknown SystemMetricsEvent type: " + event.getType()); } } - + + private void publishApplicationCreatedEvent(ApplicationCreatedEvent event) { + ApplicationEntity entity = + createApplicationEntity(event.getApplicationId()); + Map entityInfo = new HashMap(); + entityInfo.put(ApplicationMetricsConstants.NAME_ENTITY_INFO, + event.getApplicationName()); + entityInfo.put(ApplicationMetricsConstants.TYPE_ENTITY_INFO, + event.getApplicationType()); + entityInfo.put(ApplicationMetricsConstants.USER_ENTITY_INFO, + event.getUser()); + entityInfo.put(ApplicationMetricsConstants.QUEUE_ENTITY_INFO, + event.getQueue()); + entityInfo.put(ApplicationMetricsConstants.SUBMITTED_TIME_ENTITY_INFO, + event.getSubmittedTime()); + entity.setInfo(entityInfo); + if (event.getConfig() != null) { + Iterator> iterator = + event.getConfig().entrySet().iterator(); + while (iterator.hasNext()) { + Entry config = iterator.next(); + entity.addConfig(config.getKey(), config.getValue()); + } + } + TimelineEvent tEvent = new TimelineEvent(); + tEvent.setId(ApplicationMetricsConstants.CREATED_EVENT_TYPE); + tEvent.setTimestamp(event.getTimestamp()); + entity.addEvent(tEvent); + putEntity(entity); + } + + private void publishApplicationFinishedEvent(ApplicationFinishedEvent event) { + ApplicationEntity entity = + createApplicationEntity(event.getApplicationId()); + TimelineEvent tEvent = new TimelineEvent(); + tEvent.setId(ApplicationMetricsConstants.FINISHED_EVENT_TYPE); + tEvent.setTimestamp(event.getTimestamp()); + Map eventInfo = new HashMap(); + eventInfo.put(ApplicationMetricsConstants.DIAGNOSTICS_INFO_EVENT_INFO, + event.getDiagnosticsInfo()); + eventInfo.put(ApplicationMetricsConstants.FINAL_STATUS_EVENT_INFO, event + .getFinalApplicationStatus().toString()); + eventInfo.put(ApplicationMetricsConstants.STATE_EVENT_INFO, event + .getYarnApplicationState().toString()); + if (event.getLatestApplicationAttemptId() != null) { + eventInfo.put(ApplicationMetricsConstants.LATEST_APP_ATTEMPT_EVENT_INFO, + event.getLatestApplicationAttemptId().toString()); + } + RMAppMetrics appMetrics = event.getAppMetrics(); + entity.addInfo(ApplicationMetricsConstants.APP_CPU_METRICS, + appMetrics.getVcoreSeconds()); + entity.addInfo(ApplicationMetricsConstants.APP_MEM_METRICS, + appMetrics.getMemorySeconds()); + + tEvent.setInfo(eventInfo); + entity.addEvent(tEvent); + putEntity(entity); + } + + private void publishApplicationACLsUpdatedEvent( + ApplicationACLsUpdatedEvent event) { + ApplicationEntity entity = + createApplicationEntity(event.getApplicationId()); + TimelineEvent tEvent = new TimelineEvent(); + Map entityInfo = new HashMap(); + entityInfo.put(ApplicationMetricsConstants.APP_VIEW_ACLS_ENTITY_INFO, + event.getViewAppACLs()); + entity.setInfo(entityInfo); + tEvent.setId(ApplicationMetricsConstants.ACLS_UPDATED_EVENT_TYPE); + tEvent.setTimestamp(event.getTimestamp()); + entity.addEvent(tEvent); + putEntity(entity); + } + + private static ApplicationEntity createApplicationEntity( + ApplicationId applicationId) { + ApplicationEntity entity = new ApplicationEntity(); + entity.setId(applicationId.toString()); + return entity; + } + + private void publishAppAttemptRegisteredEvent(AppAttemptRegisteredEvent event) { + TimelineEntity entity = + createAppAttemptEntity(event.getApplicationAttemptId()); + TimelineEvent tEvent = new TimelineEvent(); + tEvent.setId(AppAttemptMetricsConstants.REGISTERED_EVENT_TYPE); + tEvent.setTimestamp(event.getTimestamp()); + Map eventInfo = new HashMap(); + eventInfo.put(AppAttemptMetricsConstants.TRACKING_URL_EVENT_INFO, + event.getTrackingUrl()); + eventInfo.put(AppAttemptMetricsConstants.ORIGINAL_TRACKING_URL_EVENT_INFO, + event.getOriginalTrackingURL()); + eventInfo.put(AppAttemptMetricsConstants.HOST_EVENT_INFO, event.getHost()); + eventInfo.put(AppAttemptMetricsConstants.RPC_PORT_EVENT_INFO, + event.getRpcPort()); + eventInfo.put(AppAttemptMetricsConstants.MASTER_CONTAINER_EVENT_INFO, event + .getMasterContainerId().toString()); + tEvent.setInfo(eventInfo); + entity.addEvent(tEvent); + putEntity(entity); + } + + private void publishAppAttemptFinishedEvent(AppAttemptFinishedEvent event) { + ApplicationAttemptEntity entity = + createAppAttemptEntity(event.getApplicationAttemptId()); + TimelineEvent tEvent = new TimelineEvent(); + tEvent.setId(AppAttemptMetricsConstants.FINISHED_EVENT_TYPE); + tEvent.setTimestamp(event.getTimestamp()); + Map eventInfo = new HashMap(); + eventInfo.put(AppAttemptMetricsConstants.TRACKING_URL_EVENT_INFO, + event.getTrackingUrl()); + eventInfo.put(AppAttemptMetricsConstants.ORIGINAL_TRACKING_URL_EVENT_INFO, + event.getOriginalTrackingURL()); + eventInfo.put(AppAttemptMetricsConstants.DIAGNOSTICS_INFO_EVENT_INFO, + event.getDiagnosticsInfo()); + eventInfo.put(AppAttemptMetricsConstants.FINAL_STATUS_EVENT_INFO, event + .getFinalApplicationStatus().toString()); + eventInfo.put(AppAttemptMetricsConstants.STATE_EVENT_INFO, event + .getYarnApplicationAttemptState().toString()); + tEvent.setInfo(eventInfo); + entity.addEvent(tEvent); + putEntity(entity); + } + + private void publishContainerCreatedEvent(ContainerCreatedEvent event) { + TimelineEntity entity = createContainerEntity(event.getContainerId()); + Map entityInfo = new HashMap(); + entityInfo.put(ContainerMetricsConstants.ALLOCATED_MEMORY_ENTITY_INFO, + event.getAllocatedResource().getMemory()); + entityInfo.put(ContainerMetricsConstants.ALLOCATED_VCORE_ENTITY_INFO, event + .getAllocatedResource().getVirtualCores()); + entityInfo.put(ContainerMetricsConstants.ALLOCATED_HOST_ENTITY_INFO, event + .getAllocatedNode().getHost()); + entityInfo.put(ContainerMetricsConstants.ALLOCATED_PORT_ENTITY_INFO, event + .getAllocatedNode().getPort()); + entityInfo.put(ContainerMetricsConstants.ALLOCATED_PRIORITY_ENTITY_INFO, + event.getAllocatedPriority().getPriority()); + entityInfo.put( + ContainerMetricsConstants.ALLOCATED_HOST_HTTP_ADDRESS_ENTITY_INFO, + event.getNodeHttpAddress()); + entity.setInfo(entityInfo); + TimelineEvent tEvent = new TimelineEvent(); + tEvent.setId(ContainerMetricsConstants.CREATED_EVENT_TYPE); + tEvent.setTimestamp(event.getTimestamp()); + entity.addEvent(tEvent); + putEntity(entity); + } + + private void publishContainerFinishedEvent(ContainerFinishedEvent event) { + TimelineEntity entity = createContainerEntity(event.getContainerId()); + TimelineEvent tEvent = new TimelineEvent(); + tEvent.setTimestamp(event.getTimestamp()); + Map eventInfo = new HashMap(); + eventInfo.put(ContainerMetricsConstants.DIAGNOSTICS_INFO_EVENT_INFO, + event.getDiagnosticsInfo()); + eventInfo.put(ContainerMetricsConstants.EXIT_STATUS_EVENT_INFO, + event.getContainerExitStatus()); + eventInfo.put(ContainerMetricsConstants.STATE_EVENT_INFO, event + .getContainerState().toString()); + tEvent.setInfo(eventInfo); + entity.addEvent(tEvent); + putEntity(entity); + } + + private static RMContainerEntity createContainerEntity(ContainerId containerId) { + RMContainerEntity entity = new RMContainerEntity(); + entity.setId(containerId.toString()); + entity.setParent(new Identifier()); + entity.setParent(new Identifier(TimelineEntityType.YARN_APPLICATION_ATTEMPT + .name(), containerId.getApplicationAttemptId().toString())); + return entity; + } + + private void putEntity(TimelineEntity entity) { + try { + if (LOG.isDebugEnabled()) { + LOG.debug("Publishing the entity " + entity + ", JSON-style content: " + + TimelineUtils.dumpTimelineRecordtoJSON(entity)); + } + TimelineEntities entities = new TimelineEntities(); + entities.addEntity(entity); + // TODO need to take care of security : + // UserGroupInformation.getCurrentUser() + putEntities(entities, null); + } catch (Exception e) { + LOG.error("Error when publishing entity " + entity, e); + } + } + + private static ApplicationAttemptEntity createAppAttemptEntity( + ApplicationAttemptId appAttemptId) { + ApplicationAttemptEntity entity = new ApplicationAttemptEntity(); + entity.setId(appAttemptId.toString()); + entity.setParent(new Identifier(TimelineEntityType.YARN_APPLICATION.name(), + appAttemptId.getApplicationId().toString())); + return entity; + } + @Override protected TimelineCollectorContext getTimelineEntityContext() { // TODO address in YARN-3390. return null; } - /** - * EventHandler implementation which forward events to SystemMetricsPublisher. - * Making use of it, SystemMetricsPublisher can avoid to have a public handle - * method. - */ - private final class ForwardingEventHandler implements - EventHandler { + @Override + public Dispatcher getDispatcher() { + Dispatcher dispatcher = null; - @Override - public void handle(SystemMetricsEvent event) { - handleSystemMetricsEvent(event); + if (publishContainerMetrics) { + int poolSize = + conf.getInt( + YarnConfiguration.RM_SYSTEM_METRICS_PUBLISHER_DISPATCHER_POOL_SIZE, + YarnConfiguration.DEFAULT_RM_SYSTEM_METRICS_PUBLISHER_DISPATCHER_POOL_SIZE); + dispatcher = new MultiThreadedDispatcher(poolSize); + ((MultiThreadedDispatcher) dispatcher).setDrainEventsOnStop(); + } else { + // Normal dispatcher is sufficient if container metrics are not required + // to be published + dispatcher = new AsyncDispatcher(); } + return dispatcher; + } + + @Override + public boolean publishRMContainerMetrics() { + return publishContainerMetrics; + } + + @Override + public EventHandler getEventHandler() { + return this; } } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/metrics/TestRMTimelineCollector.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/metrics/TestRMTimelineCollector.java new file mode 100644 index 0000000..90a4882 --- /dev/null +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/metrics/TestRMTimelineCollector.java @@ -0,0 +1,367 @@ +package org.apache.hadoop.yarn.server.resourcemanager.metrics; + +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.yarn.conf.YarnConfiguration; +import org.apache.hadoop.yarn.event.AsyncDispatcher; +import org.apache.hadoop.yarn.server.resourcemanager.metrics.SystemMetricsPublisher.MultiThreadedDispatcher; +import org.junit.Test; + +public class TestRMTimelineCollector { + + private static Configuration getTimelineV2Conf() { + YarnConfiguration conf = new YarnConfiguration(); + conf.setBoolean(YarnConfiguration.TIMELINE_SERVICE_ENABLED, true); + conf.setBoolean(YarnConfiguration.SYSTEM_METRICS_PUBLISHER_ENABLED, true); + conf.setInt( + YarnConfiguration.RM_SYSTEM_METRICS_PUBLISHER_DISPATCHER_POOL_SIZE, + 2); + return conf; + } + + @Test + public void testSystemMetricPublisherInitialization() { + SystemMetricsPublisher metricsPublisher = new SystemMetricsPublisher(); + Configuration conf = getTimelineV2Conf(); + metricsPublisher.init(conf); + assertFalse( + "Default configuration should not publish container Metrics from RM", + metricsPublisher.isPublishContainerMetrics()); + assertTrue( + "Simple AsyncDispatcher expected when container Metrics is not published", + metricsPublisher.getDispatcher() instanceof AsyncDispatcher); + + metricsPublisher.stop(); + + metricsPublisher = new SystemMetricsPublisher(); + conf = getTimelineV2Conf(); + conf.setBoolean(YarnConfiguration.RM_PUBLISH_CONTAINER_METRICS_ENABLED, + true); + metricsPublisher.init(conf); + assertTrue( + "Default configuration should not publish container Metrics from RM", + metricsPublisher.isPublishContainerMetrics()); + assertTrue( + "Simple AsyncDispatcher expected when container Metrics is not published", + metricsPublisher.getDispatcher() instanceof MultiThreadedDispatcher); + + metricsPublisher.stop(); + + } + /* + private static SystemMetricsPublisher metricsPublisher; + + @BeforeClass + public static void setup() throws Exception { + metricsPublisher = new SystemMetricsPublisher(); + metricsPublisher.init(getTimelineV2Conf()); + metricsPublisher.start(); + } + + + + @AfterClass + public static void tearDown() throws Exception { + if (metricsPublisher != null) { + metricsPublisher.stop(); + } + } + @Test(timeout = 10000) + public void testPublishApplicationMetrics() throws Exception { + for (int i = 1; i <= 2; ++i) { + ApplicationId appId = ApplicationId.newInstance(0, i); + RMApp app = createRMApp(appId); + metricsPublisher.appCreated(app, app.getStartTime()); + metricsPublisher.appFinished(app, RMAppState.FINISHED, app.getFinishTime()); + if (i == 1) { + metricsPublisher.appACLsUpdated(app, "uers1,user2", 4L); + } else { + // in case user doesn't specify the ACLs + metricsPublisher.appACLsUpdated(app, null, 4L); + } + TimelineEntity entity = null; + do { + entity = + store.getEntity(appId.toString(), + ApplicationMetricsConstants.ENTITY_TYPE, + EnumSet.allOf(Field.class)); + // ensure three events are both published before leaving the loop + } while (entity == null || entity.getEvents().size() < 3); + // verify all the fields + Assert.assertEquals(ApplicationMetricsConstants.ENTITY_TYPE, + entity.getEntityType()); + Assert + .assertEquals(app.getApplicationId().toString(), entity.getEntityId()); + Assert + .assertEquals( + app.getName(), + entity.getOtherInfo().get( + ApplicationMetricsConstants.NAME_ENTITY_INFO)); + Assert.assertEquals(app.getQueue(), + entity.getOtherInfo() + .get(ApplicationMetricsConstants.QUEUE_ENTITY_INFO)); + Assert + .assertEquals( + app.getUser(), + entity.getOtherInfo().get( + ApplicationMetricsConstants.USER_ENTITY_INFO)); + Assert + .assertEquals( + app.getApplicationType(), + entity.getOtherInfo().get( + ApplicationMetricsConstants.TYPE_ENTITY_INFO)); + Assert.assertEquals(app.getSubmitTime(), + entity.getOtherInfo().get( + ApplicationMetricsConstants.SUBMITTED_TIME_ENTITY_INFO)); + if (i == 1) { + Assert.assertEquals("uers1,user2", + entity.getOtherInfo().get( + ApplicationMetricsConstants.APP_VIEW_ACLS_ENTITY_INFO)); + } else { + Assert.assertEquals( + "", + entity.getOtherInfo().get( + ApplicationMetricsConstants.APP_VIEW_ACLS_ENTITY_INFO)); + Assert.assertEquals( + app.getRMAppMetrics().getMemorySeconds(), + Long.parseLong(entity.getOtherInfo() + .get(ApplicationMetricsConstants.APP_MEM_METRICS).toString())); + Assert.assertEquals( + app.getRMAppMetrics().getVcoreSeconds(), + Long.parseLong(entity.getOtherInfo() + .get(ApplicationMetricsConstants.APP_CPU_METRICS).toString())); + } + boolean hasCreatedEvent = false; + boolean hasFinishedEvent = false; + boolean hasACLsUpdatedEvent = false; + for (TimelineEvent event : entity.getEvents()) { + if (event.getEventType().equals( + ApplicationMetricsConstants.CREATED_EVENT_TYPE)) { + hasCreatedEvent = true; + Assert.assertEquals(app.getStartTime(), event.getTimestamp()); + } else if (event.getEventType().equals( + ApplicationMetricsConstants.FINISHED_EVENT_TYPE)) { + hasFinishedEvent = true; + Assert.assertEquals(app.getFinishTime(), event.getTimestamp()); + Assert.assertEquals( + app.getDiagnostics().toString(), + event.getEventInfo().get( + ApplicationMetricsConstants.DIAGNOSTICS_INFO_EVENT_INFO)); + Assert.assertEquals( + app.getFinalApplicationStatus().toString(), + event.getEventInfo().get( + ApplicationMetricsConstants.FINAL_STATUS_EVENT_INFO)); + Assert.assertEquals(YarnApplicationState.FINISHED.toString(), event + .getEventInfo().get(ApplicationMetricsConstants.STATE_EVENT_INFO)); + } else if (event.getEventType().equals( + ApplicationMetricsConstants.ACLS_UPDATED_EVENT_TYPE)) { + hasACLsUpdatedEvent = true; + Assert.assertEquals(4L, event.getTimestamp()); + } + } + Assert.assertTrue(hasCreatedEvent && hasFinishedEvent && hasACLsUpdatedEvent); + } + } + + @Test(timeout = 10000) + public void testPublishAppAttemptMetrics() throws Exception { + ApplicationAttemptId appAttemptId = + ApplicationAttemptId.newInstance(ApplicationId.newInstance(0, 1), 1); + RMAppAttempt appAttempt = createRMAppAttempt(appAttemptId); + metricsPublisher.appAttemptRegistered(appAttempt, Integer.MAX_VALUE + 1L); + RMApp app = mock(RMApp.class); + when(app.getFinalApplicationStatus()).thenReturn(FinalApplicationStatus.UNDEFINED); + metricsPublisher.appAttemptFinished(appAttempt, RMAppAttemptState.FINISHED, app, + Integer.MAX_VALUE + 2L); + TimelineEntity entity = null; + do { + entity = + store.getEntity(appAttemptId.toString(), + AppAttemptMetricsConstants.ENTITY_TYPE, + EnumSet.allOf(Field.class)); + // ensure two events are both published before leaving the loop + } while (entity == null || entity.getEvents().size() < 2); + // verify all the fields + Assert.assertEquals(AppAttemptMetricsConstants.ENTITY_TYPE, + entity.getEntityType()); + Assert.assertEquals(appAttemptId.toString(), entity.getEntityId()); + Assert.assertEquals( + appAttemptId.getApplicationId().toString(), + entity.getPrimaryFilters() + .get(AppAttemptMetricsConstants.PARENT_PRIMARY_FILTER).iterator() + .next()); + boolean hasRegisteredEvent = false; + boolean hasFinishedEvent = false; + for (TimelineEvent event : entity.getEvents()) { + if (event.getEventType().equals( + AppAttemptMetricsConstants.REGISTERED_EVENT_TYPE)) { + hasRegisteredEvent = true; + Assert.assertEquals(appAttempt.getHost(), + event.getEventInfo() + .get(AppAttemptMetricsConstants.HOST_EVENT_INFO)); + Assert + .assertEquals(appAttempt.getRpcPort(), + event.getEventInfo().get( + AppAttemptMetricsConstants.RPC_PORT_EVENT_INFO)); + Assert.assertEquals( + appAttempt.getMasterContainer().getId().toString(), + event.getEventInfo().get( + AppAttemptMetricsConstants.MASTER_CONTAINER_EVENT_INFO)); + } else if (event.getEventType().equals( + AppAttemptMetricsConstants.FINISHED_EVENT_TYPE)) { + hasFinishedEvent = true; + Assert.assertEquals(appAttempt.getDiagnostics(), event.getEventInfo() + .get(AppAttemptMetricsConstants.DIAGNOSTICS_INFO_EVENT_INFO)); + Assert.assertEquals(appAttempt.getTrackingUrl(), event.getEventInfo() + .get(AppAttemptMetricsConstants.TRACKING_URL_EVENT_INFO)); + Assert.assertEquals( + appAttempt.getOriginalTrackingUrl(), + event.getEventInfo().get( + AppAttemptMetricsConstants.ORIGINAL_TRACKING_URL_EVENT_INFO)); + Assert.assertEquals( + FinalApplicationStatus.UNDEFINED.toString(), + event.getEventInfo().get( + AppAttemptMetricsConstants.FINAL_STATUS_EVENT_INFO)); + Assert.assertEquals( + YarnApplicationAttemptState.FINISHED.toString(), + event.getEventInfo().get( + AppAttemptMetricsConstants.STATE_EVENT_INFO)); + } + } + Assert.assertTrue(hasRegisteredEvent && hasFinishedEvent); + } + + @Test(timeout = 10000) + public void testPublishContainerMetrics() throws Exception { + ContainerId containerId = + ContainerId.newContainerId(ApplicationAttemptId.newInstance( + ApplicationId.newInstance(0, 1), 1), 1); + RMContainer container = createRMContainer(containerId); + metricsPublisher.containerCreated(container, container.getCreationTime()); + metricsPublisher.containerFinished(container, container.getFinishTime()); + TimelineEntity entity = null; + do { + entity = + store.getEntity(containerId.toString(), + ContainerMetricsConstants.ENTITY_TYPE, + EnumSet.allOf(Field.class)); + // ensure two events are both published before leaving the loop + } while (entity == null || entity.getEvents().size() < 2); + // verify all the fields + Assert.assertEquals(ContainerMetricsConstants.ENTITY_TYPE, + entity.getEntityType()); + Assert.assertEquals(containerId.toString(), entity.getEntityId()); + Assert.assertEquals( + containerId.getApplicationAttemptId().toString(), + entity.getPrimaryFilters() + .get(ContainerMetricsConstants.PARENT_PRIMARIY_FILTER).iterator() + .next()); + Assert.assertEquals( + container.getAllocatedNode().getHost(), + entity.getOtherInfo().get( + ContainerMetricsConstants.ALLOCATED_HOST_ENTITY_INFO)); + Assert.assertEquals( + container.getAllocatedNode().getPort(), + entity.getOtherInfo().get( + ContainerMetricsConstants.ALLOCATED_PORT_ENTITY_INFO)); + Assert.assertEquals( + container.getAllocatedResource().getMemory(), + entity.getOtherInfo().get( + ContainerMetricsConstants.ALLOCATED_MEMORY_ENTITY_INFO)); + Assert.assertEquals( + container.getAllocatedResource().getVirtualCores(), + entity.getOtherInfo().get( + ContainerMetricsConstants.ALLOCATED_VCORE_ENTITY_INFO)); + Assert.assertEquals( + container.getAllocatedPriority().getPriority(), + entity.getOtherInfo().get( + ContainerMetricsConstants.ALLOCATED_PRIORITY_ENTITY_INFO)); + boolean hasCreatedEvent = false; + boolean hasFinishedEvent = false; + for (TimelineEvent event : entity.getEvents()) { + if (event.getEventType().equals( + ContainerMetricsConstants.CREATED_EVENT_TYPE)) { + hasCreatedEvent = true; + Assert.assertEquals(container.getCreationTime(), event.getTimestamp()); + } else if (event.getEventType().equals( + ContainerMetricsConstants.FINISHED_EVENT_TYPE)) { + hasFinishedEvent = true; + Assert.assertEquals(container.getFinishTime(), event.getTimestamp()); + Assert.assertEquals( + container.getDiagnosticsInfo(), + event.getEventInfo().get( + ContainerMetricsConstants.DIAGNOSTICS_INFO_EVENT_INFO)); + Assert.assertEquals( + container.getContainerExitStatus(), + event.getEventInfo().get( + ContainerMetricsConstants.EXIT_STATUS_EVENT_INFO)); + Assert.assertEquals(container.getContainerState().toString(), event + .getEventInfo().get(ContainerMetricsConstants.STATE_EVENT_INFO)); + } + } + Assert.assertTrue(hasCreatedEvent && hasFinishedEvent); + } + + private static RMApp createRMApp(ApplicationId appId) { + RMApp app = mock(RMApp.class); + when(app.getApplicationId()).thenReturn(appId); + when(app.getName()).thenReturn("test app"); + when(app.getApplicationType()).thenReturn("test app type"); + when(app.getUser()).thenReturn("test user"); + when(app.getQueue()).thenReturn("test queue"); + when(app.getSubmitTime()).thenReturn(Integer.MAX_VALUE + 1L); + when(app.getStartTime()).thenReturn(Integer.MAX_VALUE + 2L); + when(app.getFinishTime()).thenReturn(Integer.MAX_VALUE + 3L); + when(app.getDiagnostics()).thenReturn( + new StringBuilder("test diagnostics info")); + RMAppAttempt appAttempt = mock(RMAppAttempt.class); + when(appAttempt.getAppAttemptId()).thenReturn( + ApplicationAttemptId.newInstance(appId, 1)); + when(app.getCurrentAppAttempt()).thenReturn(appAttempt); + when(app.getFinalApplicationStatus()).thenReturn( + FinalApplicationStatus.UNDEFINED); + when(app.getRMAppMetrics()).thenReturn( + new RMAppMetrics(null, 0, 0, Integer.MAX_VALUE, Long.MAX_VALUE)); + return app; + } + + private static RMAppAttempt createRMAppAttempt( + ApplicationAttemptId appAttemptId) { + RMAppAttempt appAttempt = mock(RMAppAttempt.class); + when(appAttempt.getAppAttemptId()).thenReturn(appAttemptId); + when(appAttempt.getHost()).thenReturn("test host"); + when(appAttempt.getRpcPort()).thenReturn(-100); + Container container = mock(Container.class); + when(container.getId()) + .thenReturn(ContainerId.newContainerId(appAttemptId, 1)); + when(appAttempt.getMasterContainer()).thenReturn(container); + when(appAttempt.getDiagnostics()).thenReturn("test diagnostics info"); + when(appAttempt.getTrackingUrl()).thenReturn("test tracking url"); + when(appAttempt.getOriginalTrackingUrl()).thenReturn( + "test original tracking url"); + return appAttempt; + } + + private static RMContainer createRMContainer(ContainerId containerId) { + RMContainer container = mock(RMContainer.class); + when(container.getContainerId()).thenReturn(containerId); + when(container.getAllocatedNode()).thenReturn( + NodeId.newInstance("test host", -100)); + when(container.getAllocatedResource()).thenReturn( + Resource.newInstance(-1, -1)); + when(container.getAllocatedPriority()).thenReturn(Priority.UNDEFINED); + when(container.getCreationTime()).thenReturn(Integer.MAX_VALUE + 1L); + when(container.getFinishTime()).thenReturn(Integer.MAX_VALUE + 2L); + when(container.getDiagnosticsInfo()).thenReturn("test diagnostics info"); + when(container.getContainerExitStatus()).thenReturn(-1); + when(container.getContainerState()).thenReturn(ContainerState.COMPLETE); + Container mockContainer = mock(Container.class); + when(container.getContainer()).thenReturn(mockContainer); + when(mockContainer.getNodeHttpAddress()) + .thenReturn("http://localhost:1234"); + return container; + } + */ +}