diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timeline-pluginstorage/src/main/java/org/apache/hadoop/yarn/server/timeline/EntityCacheItem.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timeline-pluginstorage/src/main/java/org/apache/hadoop/yarn/server/timeline/EntityCacheItem.java
index 7eec7c3..6b10881 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timeline-pluginstorage/src/main/java/org/apache/hadoop/yarn/server/timeline/EntityCacheItem.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timeline-pluginstorage/src/main/java/org/apache/hadoop/yarn/server/timeline/EntityCacheItem.java
@@ -84,14 +84,17 @@ public synchronized TimelineStore getStore() {
    * @param aclManager
    * @param jsonFactory
    * @param objMapper
+   * @param metrics
    * @return a {@link org.apache.hadoop.yarn.server.timeline.TimelineStore}
    *         object filled with all entities in the group.
    * @throws IOException
    */
   public synchronized TimelineStore refreshCache(TimelineEntityGroupId groupId,
       TimelineACLsManager aclManager, JsonFactory jsonFactory,
-      ObjectMapper objMapper) throws IOException {
+      ObjectMapper objMapper, EntityGroupFSTimelineStoreMetrics metrics)
+      throws IOException {
     if (needRefresh()) {
+      long startTime = Time.monotonicNow();
       // If an application is not finished, we only update summary logs (and put
       // new entities into summary storage).
       // Otherwise, since the application is done, we can update detail logs.
@@ -108,10 +111,11 @@ public synchronized TimelineStore refreshCache(TimelineEntityGroupId groupId,
         store.start();
       }
       List<LogInfo> removeList = new ArrayList<>();
-      try(TimelineDataManager tdm =
+      try (TimelineDataManager tdm =
           new TimelineDataManager(store, aclManager)) {
         tdm.init(config);
         tdm.start();
+        metrics.incrCacheRefreshOps();
         for (LogInfo log : appLogs.getDetailLogs()) {
           LOG.debug("Try refresh logs for {}", log.getFilename());
           // Only refresh the log that matches the cache id
@@ -133,6 +137,7 @@ public synchronized TimelineStore refreshCache(TimelineEntityGroupId groupId,
         appLogs.getDetailLogs().removeAll(removeList);
       }
       updateRefreshTimeToNow();
+      metrics.addCacheRefreshTime(Time.monotonicNow() - startTime);
     } else {
       LOG.debug("Cache new enough, skip refreshing");
     }
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timeline-pluginstorage/src/main/java/org/apache/hadoop/yarn/server/timeline/EntityGroupFSTimelineStore.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timeline-pluginstorage/src/main/java/org/apache/hadoop/yarn/server/timeline/EntityGroupFSTimelineStore.java
index 34a2072..bd95f6d 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timeline-pluginstorage/src/main/java/org/apache/hadoop/yarn/server/timeline/EntityGroupFSTimelineStore.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timeline-pluginstorage/src/main/java/org/apache/hadoop/yarn/server/timeline/EntityGroupFSTimelineStore.java
@@ -125,12 +125,15 @@
   private List<TimelineEntityGroupPlugin> cacheIdPlugins;
   private Map<TimelineEntityGroupId, EntityCacheItem> cachedLogs;
 
+  private EntityGroupFSTimelineStoreMetrics metrics;
+
   public EntityGroupFSTimelineStore() {
     super(EntityGroupFSTimelineStore.class.getSimpleName());
   }
 
   @Override
   protected void serviceInit(Configuration conf) throws Exception {
+    metrics = EntityGroupFSTimelineStoreMetrics.create();
     summaryStore = createSummaryStore();
     addService(summaryStore);
 
@@ -310,6 +313,7 @@ protected void serviceStop() throws Exception {
   @InterfaceAudience.Private
   @VisibleForTesting
   int scanActiveLogs() throws IOException {
+    long startTime = Time.monotonicNow();
     RemoteIterator<FileStatus> iter = list(activeRootPath);
     int logsToScanCount = 0;
     while (iter.hasNext()) {
@@ -325,6 +329,7 @@ int scanActiveLogs() throws IOException {
         LOG.debug("Unable to parse entry {}", name);
       }
     }
+    metrics.addLogScanTime(Time.monotonicNow() - startTime);
     return logsToScanCount;
   }
 
@@ -417,6 +422,7 @@ void cleanLogs(Path dirpath, FileSystem fs, long retainMillis)
         if (!fs.delete(dirpath, true)) {
           LOG.error("Unable to remove " + dirpath);
         }
+        metrics.incrLogsDirsCleaned();
       } catch (IOException e) {
         LOG.error("Unable to remove " + dirpath, e);
       }
@@ -582,6 +588,7 @@ public synchronized void parseSummaryLogs() throws IOException {
     @VisibleForTesting
     synchronized void parseSummaryLogs(TimelineDataManager tdm)
         throws IOException {
+      long startTime = Time.monotonicNow();
       if (!isDone()) {
         LOG.debug("Try to parse summary log for log {} in {}",
             appId, appDirPath);
@@ -599,8 +606,10 @@ synchronized void parseSummaryLogs(TimelineDataManager tdm)
       List<LogInfo> removeList = new ArrayList<LogInfo>();
       for (LogInfo log : summaryLogs) {
         if (fs.exists(log.getPath(appDirPath))) {
-          log.parseForStore(tdm, appDirPath, isDone(), jsonFactory,
+          long summaryEntityParsed
+              = log.parseForStore(tdm, appDirPath, isDone(), jsonFactory,
               objMapper, fs);
+          metrics.incrEntitiesScannedToSummary(summaryEntityParsed);
         } else {
           // The log may have been removed, remove the log
           removeList.add(log);
@@ -609,6 +618,7 @@ synchronized void parseSummaryLogs(TimelineDataManager tdm)
         }
       }
       summaryLogs.removeAll(removeList);
+      metrics.addSummaryDataReadTime(Time.monotonicNow() - startTime);
     }
 
     // scans for new logs and returns the modification timestamp of the
@@ -781,6 +791,7 @@ public void run() {
     @Override
     public void run() {
       LOG.debug("Cleaner starting");
+      long startTime = Time.monotonicNow();
       try {
         cleanLogs(doneRootPath, fs, logRetainMillis);
       } catch (Exception e) {
@@ -790,6 +801,8 @@ public void run() {
         } else {
           LOG.error("Error cleaning files", e);
         }
+      } finally {
+        metrics.addLogCleanTime(Time.monotonicNow() - startTime);
       }
       LOG.debug("Cleaner finished");
     }
@@ -818,11 +831,13 @@ void setCachedLogs(TimelineEntityGroupId groupId, EntityCacheItem cacheItem) {
       if (storeForId != null) {
         LOG.debug("Adding {} as a store for the query", storeForId.getName());
         stores.add(storeForId);
+        metrics.incrGetEntityToDetailOps();
       }
     }
     if (stores.size() == 0) {
       LOG.debug("Using summary store for {}", entityType);
       stores.add(this.summaryStore);
+      metrics.incrGetEntityToSummaryOps();
     }
     return stores;
   }
@@ -892,7 +907,7 @@ private TimelineStore getCachedStore(TimelineEntityGroupId groupId)
       AppLogs appLogs = cacheItem.getAppLogs();
       LOG.debug("try refresh cache {} {}", groupId, appLogs.getAppId());
       store = cacheItem.refreshCache(groupId, aclManager, jsonFactory,
-          objMapper);
+          objMapper, metrics);
     } else {
       LOG.warn("AppLogs for group id {} is null", groupId);
     }
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timeline-pluginstorage/src/main/java/org/apache/hadoop/yarn/server/timeline/EntityGroupFSTimelineStoreMetrics.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timeline-pluginstorage/src/main/java/org/apache/hadoop/yarn/server/timeline/EntityGroupFSTimelineStoreMetrics.java
new file mode 100644
index 0000000..83c38ef
--- /dev/null
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timeline-pluginstorage/src/main/java/org/apache/hadoop/yarn/server/timeline/EntityGroupFSTimelineStoreMetrics.java
@@ -0,0 +1,106 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.yarn.server.timeline;
+
+import org.apache.hadoop.metrics2.MetricsSystem;
+import org.apache.hadoop.metrics2.annotation.Metric;
+import org.apache.hadoop.metrics2.annotation.Metrics;
+import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem;
+import org.apache.hadoop.metrics2.lib.MutableCounterLong;
+import org.apache.hadoop.metrics2.lib.MutableRate;
+
+/** This class tracks metrics for the EntityGroupFSTimelineStore. */
+@Metrics(about="Metrics for EntityGroupFSTimelineStore", context="yarn")
+public class EntityGroupFSTimelineStoreMetrics {
+  @Metric("entities scanned into the summary storage")
+  MutableCounterLong entitiesScannedToSummary;
+
+  @Metric("log cleaner dirs purged")
+  MutableCounterLong logsDirsCleaned;
+
+  @Metric("getEntity calls to summary storage")
+  MutableCounterLong getEntityToSummaryOps;
+
+  @Metric("getEntity calls to detail storage")
+  MutableCounterLong getEntityToDetailOps;
+
+  @Metric("cache storage refresh calls")
+  MutableCounterLong cacheRefreshOps;
+
+  @Metric("cache storage refresh time")
+  MutableRate cacheRefreshTime;
+
+  @Metric("summary data read time")
+  MutableRate summaryDataReadTime;
+
+  @Metric("active log scan time")
+  MutableRate logScanTime;
+
+  @Metric("log cleaner purging time")
+  MutableRate logCleanTime;
+
+  private static EntityGroupFSTimelineStoreMetrics instance = null;
+
+  EntityGroupFSTimelineStoreMetrics() {
+  }
+
+  public static synchronized EntityGroupFSTimelineStoreMetrics create() {
+    if (instance == null) {
+      MetricsSystem ms = DefaultMetricsSystem.instance();
+      instance = ms.register(new EntityGroupFSTimelineStoreMetrics());
+    }
+    return instance;
+  }
+
+  public void incrEntitiesScannedToSummary(long delta) {
+    entitiesScannedToSummary.incr(delta);
+  }
+
+  public void incrLogsDirsCleaned() {
+    logsDirsCleaned.incr();
+  }
+
+  public void incrGetEntityToSummaryOps() {
+    getEntityToSummaryOps.incr();
+  }
+
+  public void incrGetEntityToDetailOps() {
+    getEntityToDetailOps.incr();
+  }
+
+  public void incrCacheRefreshOps() {
+    cacheRefreshOps.incr();
+  }
+
+  public void addCacheRefreshTime(long msec) {
+    cacheRefreshTime.add(msec);
+  }
+
+  public void addSummaryDataReadTime(long msec) {
+    summaryDataReadTime.add(msec);
+  }
+
+  public void addLogScanTime(long msec) {
+    logScanTime.add(msec);
+  }
+
+  public void addLogCleanTime(long msec) {
+    logCleanTime.add(msec);
+  }
+}
+
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timeline-pluginstorage/src/main/java/org/apache/hadoop/yarn/server/timeline/LogInfo.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timeline-pluginstorage/src/main/java/org/apache/hadoop/yarn/server/timeline/LogInfo.java
index bc80175..96bca8d 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timeline-pluginstorage/src/main/java/org/apache/hadoop/yarn/server/timeline/LogInfo.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timeline-pluginstorage/src/main/java/org/apache/hadoop/yarn/server/timeline/LogInfo.java
@@ -98,13 +98,14 @@ boolean matchesGroupId(String groupId){
         ));
   }
 
-  public void parseForStore(TimelineDataManager tdm, Path appDirPath,
+  public long parseForStore(TimelineDataManager tdm, Path appDirPath,
       boolean appCompleted, JsonFactory jsonFactory, ObjectMapper objMapper,
       FileSystem fs) throws IOException {
     LOG.debug("Parsing for log dir {} on attempt {}", appDirPath,
         attemptDirName);
     Path logPath = getPath(appDirPath);
     FileStatus status = fs.getFileStatus(logPath);
+    long numParsed = 0;
     if (status != null) {
       long startTime = Time.monotonicNow();
       try {
@@ -113,6 +114,7 @@ public void parseForStore(TimelineDataManager tdm, Path appDirPath,
             objMapper, fs);
         LOG.info("Parsed {} entities from {} in {} msec",
             count, logPath, Time.monotonicNow() - startTime);
+        numParsed += count;
       } catch (RuntimeException e) {
         // If AppLogs cannot parse this log, it may be corrupted or just empty
         if (e.getCause() instanceof JsonParseException &&
@@ -125,6 +127,7 @@ public void parseForStore(TimelineDataManager tdm, Path appDirPath,
     } else {
       LOG.warn("{} no longer exists. Skip for scanning. ", logPath);
     }
+    return numParsed;
   }
 
   private long parsePath(TimelineDataManager tdm, Path logPath,
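
Reviewer note, not part of the patch: a minimal sketch of how the new singleton metrics source is expected to be used. The harness class below and its name are illustrative assumptions made for this note; only the EntityGroupFSTimelineStoreMetrics calls themselves come from the patch.

import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem;
import org.apache.hadoop.util.Time;
import org.apache.hadoop.yarn.server.timeline.EntityGroupFSTimelineStoreMetrics;

public class EntityGroupFSTimelineStoreMetricsSketch {
  public static void main(String[] args) {
    // In the timeline server the metrics system is already initialized;
    // we do it by hand here only so the sketch is self-contained.
    DefaultMetricsSystem.initialize("EntityGroupFSTimelineStoreSketch");

    // create() registers the source on first use and returns the same
    // instance on every later call, so all stores in a JVM share counters.
    EntityGroupFSTimelineStoreMetrics metrics =
        EntityGroupFSTimelineStoreMetrics.create();

    // Mirror the pattern EntityCacheItem.refreshCache() now follows:
    // count the operation, then record its wall-clock duration.
    long start = Time.monotonicNow();
    metrics.incrCacheRefreshOps();
    // ... cache refresh work would happen here ...
    metrics.addCacheRefreshTime(Time.monotonicNow() - start);
  }
}

Since each MutableRate tracks both the number of samples and their average, the addCacheRefreshTime/addSummaryDataReadTime/addLogScanTime/addLogCleanTime calls surface as average milliseconds per operation alongside the raw counters.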