diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/timelineservice/RMTimelineCollectorManager.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/timelineservice/RMTimelineCollectorManager.java index a4f1084..64c3749 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/timelineservice/RMTimelineCollectorManager.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/timelineservice/RMTimelineCollectorManager.java @@ -49,7 +49,7 @@ public RMTimelineCollectorManager(RMContext rmContext) { } @Override - public void postPut(ApplicationId appId, TimelineCollector collector) { + protected void doPostPut(ApplicationId appId, TimelineCollector collector) { RMApp app = rmContext.getRMApps().get(appId); if (app == null) { throw new YarnRuntimeException( diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/collector/AppLevelTimelineCollector.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/collector/AppLevelTimelineCollector.java index eb05262..d276269 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/collector/AppLevelTimelineCollector.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/collector/AppLevelTimelineCollector.java @@ -34,6 +34,7 @@ import com.google.common.base.Preconditions; import java.util.HashSet; +import java.util.Map; import java.util.Set; import java.util.concurrent.ScheduledThreadPoolExecutor; import java.util.concurrent.TimeUnit; @@ -93,7 +94,8 @@ protected void serviceStart() throws Exception { new ThreadFactoryBuilder() .setNameFormat("TimelineCollector Aggregation thread #%d") .build()); - appAggregationExecutor.scheduleAtFixedRate(new AppLevelAggregator(), 0, + appAggregationExecutor.scheduleAtFixedRate(new AppLevelAggregator(), + AppLevelTimelineCollector.AGGREGATION_EXECUTOR_EXEC_INTERVAL_SECS, AppLevelTimelineCollector.AGGREGATION_EXECUTOR_EXEC_INTERVAL_SECS, TimeUnit.SECONDS); super.serviceStart(); @@ -126,10 +128,21 @@ public void run() { if (LOG.isDebugEnabled()) { LOG.debug("App-level real-time aggregating"); } + if (!isReadyToAggregate()) { + LOG.warn("App-level collector is not ready, skip aggregation. "); + return; + } try { TimelineCollectorContext currContext = getTimelineEntityContext(); + Map aggregationGroups + = getAggregationGroups(); + if (aggregationGroups == null + || aggregationGroups.isEmpty()) { + LOG.debug("App-level collector is empty, skip aggregation. "); + return; + } TimelineEntity resultEntity = TimelineCollector.aggregateWithoutGroupId( - getAggregationGroups(), currContext.getAppId(), + aggregationGroups, currContext.getAppId(), TimelineEntityType.YARN_APPLICATION.toString()); TimelineEntities entities = new TimelineEntities(); entities.addEntity(resultEntity); diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/collector/NodeTimelineCollectorManager.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/collector/NodeTimelineCollectorManager.java index 75557a8..0323d7b 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/collector/NodeTimelineCollectorManager.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/collector/NodeTimelineCollectorManager.java @@ -87,7 +87,7 @@ protected void serviceStop() throws Exception { } @Override - public void postPut(ApplicationId appId, TimelineCollector collector) { + protected void doPostPut(ApplicationId appId, TimelineCollector collector) { try { // Get context info from NM updateTimelineCollectorContext(appId, collector); diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/collector/TimelineCollector.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/collector/TimelineCollector.java index 8cd645c..2fc3033 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/collector/TimelineCollector.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/collector/TimelineCollector.java @@ -60,6 +60,8 @@ private static Set entityTypesSkipAggregation = new HashSet<>(); + private volatile boolean readyToAggregate = false; + public TimelineCollector(String name) { super(name); } @@ -91,6 +93,14 @@ protected TimelineWriter getWriter() { return aggregationGroups; } + protected void setReadyToAggregate() { + readyToAggregate = true; + } + + protected boolean isReadyToAggregate() { + return readyToAggregate; + } + /** * Method to decide the set of timeline entity types the collector should * skip on aggregations. Subclasses may want to override this method to @@ -258,7 +268,7 @@ static TimelineEntity aggregateWithoutGroupId( // Note: In memory aggregation is performed in an eventually consistent // fashion. - private static class AggregationStatusTable { + protected static class AggregationStatusTable { // On aggregation, for each metric, aggregate all per-entity accumulated // metrics. We only use the id and type for TimelineMetrics in the key set // of this table. diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/collector/TimelineCollectorManager.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/collector/TimelineCollectorManager.java index 8f74ffb..a8f88e5 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/collector/TimelineCollectorManager.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/collector/TimelineCollectorManager.java @@ -136,8 +136,24 @@ public TimelineCollector putIfAbsent(ApplicationId appId, return collectorInTable; } - protected void postPut(ApplicationId appId, TimelineCollector collector) { + /** + * Callback handler for the timeline collector manager when a collector has + * been added into the collector map. + * @param appId Application id of the collector. + * @param collector The actual timeline collector that has been added. + */ + public void postPut(ApplicationId appId, TimelineCollector collector) { + doPostPut(appId, collector); + collector.setReadyToAggregate(); + } + /** + * A template method that will be called by + * {@link #postPut(ApplicationId, TimelineCollector)}. + * @param appId Application id of the collector. + * @param collector The actual timeline collector that has been added. + */ + protected void doPostPut(ApplicationId appId, TimelineCollector collector) { } /**