diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/collector/AppLevelTimelineCollector.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/collector/AppLevelTimelineCollector.java index eb05262..bd8593a 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/collector/AppLevelTimelineCollector.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/collector/AppLevelTimelineCollector.java @@ -34,6 +34,7 @@ import com.google.common.base.Preconditions; import java.util.HashSet; +import java.util.Map; import java.util.Set; import java.util.concurrent.ScheduledThreadPoolExecutor; import java.util.concurrent.TimeUnit; @@ -58,6 +59,8 @@ private final TimelineCollectorContext context; private ScheduledThreadPoolExecutor appAggregationExecutor; + private volatile boolean readyToAggregate = false; + public AppLevelTimelineCollector(ApplicationId appId) { super(AppLevelTimelineCollector.class.getName() + " - " + appId.toString()); Preconditions.checkNotNull(appId, "AppId shouldn't be null"); @@ -93,7 +96,8 @@ protected void serviceStart() throws Exception { new ThreadFactoryBuilder() .setNameFormat("TimelineCollector Aggregation thread #%d") .build()); - appAggregationExecutor.scheduleAtFixedRate(new AppLevelAggregator(), 0, + appAggregationExecutor.scheduleAtFixedRate(new AppLevelAggregator(), + AppLevelTimelineCollector.AGGREGATION_EXECUTOR_EXEC_INTERVAL_SECS, AppLevelTimelineCollector.AGGREGATION_EXECUTOR_EXEC_INTERVAL_SECS, TimeUnit.SECONDS); super.serviceStart(); @@ -119,6 +123,11 @@ public TimelineCollectorContext getTimelineEntityContext() { return entityTypesSkipAggregation; } + @Override + protected void postPut() { + readyToAggregate = true; + } + private class AppLevelAggregator implements Runnable { @Override @@ -126,10 +135,20 @@ public void run() { if (LOG.isDebugEnabled()) { LOG.debug("App-level real-time aggregating"); } + if (!readyToAggregate) { + LOG.warn("App-level collector is not ready, skip aggregation. "); + return; + } try { TimelineCollectorContext currContext = getTimelineEntityContext(); + Map aggregationGroups + = getAggregationGroups(); + if (aggregationGroups != null + && aggregationGroups.isEmpty()) { + LOG.debug("App-level collector is empty, skip aggregation. "); + } TimelineEntity resultEntity = TimelineCollector.aggregateWithoutGroupId( - getAggregationGroups(), currContext.getAppId(), + aggregationGroups, currContext.getAppId(), TimelineEntityType.YARN_APPLICATION.toString()); TimelineEntities entities = new TimelineEntities(); entities.addEntity(resultEntity); diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/collector/TimelineCollector.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/collector/TimelineCollector.java index 8cd645c..2412515 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/collector/TimelineCollector.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/collector/TimelineCollector.java @@ -92,6 +92,13 @@ protected TimelineWriter getWriter() { } /** + * Method triggered when this collector has been successfully put into the + * collector map of the collector. Semantically, this means the collector is + * loaded with all required data and is ready to publish data. + */ + protected void postPut() {} + + /** * Method to decide the set of timeline entity types the collector should * skip on aggregations. Subclasses may want to override this method to * customize their own behaviors. @@ -258,7 +265,7 @@ static TimelineEntity aggregateWithoutGroupId( // Note: In memory aggregation is performed in an eventually consistent // fashion. - private static class AggregationStatusTable { + protected static class AggregationStatusTable { // On aggregation, for each metric, aggregate all per-entity accumulated // metrics. We only use the id and type for TimelineMetrics in the key set // of this table. diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/collector/TimelineCollectorManager.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/collector/TimelineCollectorManager.java index 8f74ffb..5aadd24 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/collector/TimelineCollectorManager.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/collector/TimelineCollectorManager.java @@ -126,6 +126,7 @@ public TimelineCollector putIfAbsent(ApplicationId appId, LOG.info("the collector for " + appId + " was added"); collectorInTable = collector; postPut(appId, collectorInTable); + collector.postPut(); } catch (Exception e) { throw new YarnRuntimeException(e); }