diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timeline-pluginstorage/src/main/java/org/apache/hadoop/yarn/server/timeline/EntityCacheItem.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timeline-pluginstorage/src/main/java/org/apache/hadoop/yarn/server/timeline/EntityCacheItem.java index dd2a27d..16399a8 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timeline-pluginstorage/src/main/java/org/apache/hadoop/yarn/server/timeline/EntityCacheItem.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timeline-pluginstorage/src/main/java/org/apache/hadoop/yarn/server/timeline/EntityCacheItem.java @@ -16,6 +16,8 @@ */ package org.apache.hadoop.yarn.server.timeline; +import com.google.common.annotations.VisibleForTesting; +import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; @@ -30,6 +32,7 @@ import java.io.IOException; import java.util.ArrayList; import java.util.List; +import java.util.concurrent.atomic.AtomicLong; /** * Cache item for timeline server v1.5 reader cache. Each cache item has a @@ -40,12 +43,17 @@ = LoggerFactory.getLogger(EntityCacheItem.class); private TimelineStore store; + private TimelineEntityGroupId groupId; private EntityGroupFSTimelineStore.AppLogs appLogs; private long lastRefresh; private Configuration config; private FileSystem fs; + private long refCount = 0L; + private static AtomicLong activeStores = new AtomicLong(0L); - public EntityCacheItem(Configuration config, FileSystem fs) { + public EntityCacheItem(TimelineEntityGroupId gId, Configuration config, + FileSystem fs) { + this.groupId = gId; this.config = config; this.fs = fs; } @@ -70,12 +78,20 @@ public synchronized void setAppLogs( /** * @return The timeline store, either loaded or unloaded, of this cache item. 
+ * This method will not prevent the store from being reclaimed. */ public synchronized TimelineStore getStore() { return store; } /** + * @return The number of currently active stores in all CacheItems. + */ + public static long getActiveStores() { + return activeStores.get(); + } + + /** * Refresh this cache item if it needs refresh. This will enforce an appLogs * rescan and then load new data. The refresh process is synchronized with * other operations on the same cache item. @@ -105,6 +121,7 @@ public synchronized TimelineStore refreshCache(TimelineEntityGroupId groupId, } if (!appLogs.getDetailLogs().isEmpty()) { if (store == null) { + activeStores.getAndIncrement(); store = new LevelDBCacheTimelineStore(groupId.toString(), "LeveldbCache." + groupId); store.init(config); @@ -148,11 +165,35 @@ public synchronized TimelineStore refreshCache(TimelineEntityGroupId groupId, } /** - * Release the cache item for the given group id. + * Increase the number of references to this cache item by 1. + */ + public synchronized void incrRefs() { + refCount++; + } + + /** + * Unregister a reader. Try to release the cache if the number of readers + * of the current cache drops to 0. * - * @param groupId the group id that the cache should release + * @return true if the cache has been released, otherwise false */ - public synchronized void releaseCache(TimelineEntityGroupId groupId) { + public synchronized boolean tryRelease() { + refCount--; + // Only reclaim the storage if there is no reader. + if (refCount > 0) { + LOG.debug("{} references left for cached group {}, skipping the release", + refCount, groupId); + return false; + } + forceRelease(); + return true; + } + + /** + * Force releasing the cache item of this group, even though there + * may be active references. 
+ */ + public synchronized void forceRelease() { try { if (store != null) { store.close(); @@ -161,12 +202,21 @@ public synchronized void releaseCache(TimelineEntityGroupId groupId) { LOG.warn("Error closing timeline store", e); } store = null; + activeStores.getAndDecrement(); + refCount = 0L; // reset offsets so next time logs are re-parsed for (LogInfo log : appLogs.getDetailLogs()) { if (log.getFilename().contains(groupId.toString())) { log.setOffset(0); } } + LOG.debug("Cache for group {} released. ", groupId); + } + + @InterfaceAudience.Private + @VisibleForTesting + synchronized long getRefCount() { + return refCount; } private boolean needRefresh() { diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timeline-pluginstorage/src/main/java/org/apache/hadoop/yarn/server/timeline/EntityGroupFSTimelineStore.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timeline-pluginstorage/src/main/java/org/apache/hadoop/yarn/server/timeline/EntityGroupFSTimelineStore.java index edd430c..bd58f1c 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timeline-pluginstorage/src/main/java/org/apache/hadoop/yarn/server/timeline/EntityGroupFSTimelineStore.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timeline-pluginstorage/src/main/java/org/apache/hadoop/yarn/server/timeline/EntityGroupFSTimelineStore.java @@ -172,7 +172,15 @@ protected boolean removeEldestEntry( TimelineEntityGroupId groupId = eldest.getKey(); LOG.debug("Evicting {} due to space limitations", groupId); EntityCacheItem cacheItem = eldest.getValue(); - cacheItem.releaseCache(groupId); + long activeStores = EntityCacheItem.getActiveStores(); + if (activeStores > appCacheMaxSize * 2) { + LOG.debug("Force release cache {} since {} stores are active", + groupId, activeStores); + cacheItem.forceRelease(); + } else { + LOG.debug("Try release cache {}", groupId); + cacheItem.tryRelease(); + } if 
(cacheItem.getAppLogs().isDone()) { appIdLogMap.remove(groupId.getApplicationId()); } @@ -826,17 +834,19 @@ void setFs(FileSystem incomingFs) { @InterfaceAudience.Private @VisibleForTesting void setCachedLogs(TimelineEntityGroupId groupId, EntityCacheItem cacheItem) { + cacheItem.incrRefs(); cachedLogs.put(groupId, cacheItem); } private List getTimelineStoresFromCacheIds( - Set groupIds, String entityType) + Set groupIds, String entityType, + List cacheItems) throws IOException { List stores = new LinkedList(); // For now we just handle one store in a context. We return the first // non-null storage for the group ids. for (TimelineEntityGroupId groupId : groupIds) { - TimelineStore storeForId = getCachedStore(groupId); + TimelineStore storeForId = getCachedStore(groupId, cacheItems); if (storeForId != null) { LOG.debug("Adding {} as a store for the query", storeForId.getName()); stores.add(storeForId); @@ -852,7 +862,8 @@ void setCachedLogs(TimelineEntityGroupId groupId, EntityCacheItem cacheItem) { } private List getTimelineStoresForRead(String entityId, - String entityType) throws IOException { + String entityType, List cacheItems) + throws IOException { Set groupIds = new HashSet(); for (TimelineEntityGroupPlugin cacheIdPlugin : cacheIdPlugins) { LOG.debug("Trying plugin {} for id {} and type {}", @@ -871,12 +882,12 @@ void setCachedLogs(TimelineEntityGroupId groupId, EntityCacheItem cacheItem) { cacheIdPlugin.getClass().getName()); } } - return getTimelineStoresFromCacheIds(groupIds, entityType); + return getTimelineStoresFromCacheIds(groupIds, entityType, cacheItems); } private List getTimelineStoresForRead(String entityType, - NameValuePair primaryFilter, Collection secondaryFilters) - throws IOException { + NameValuePair primaryFilter, Collection secondaryFilters, + List cacheItems) throws IOException { Set groupIds = new HashSet(); for (TimelineEntityGroupPlugin cacheIdPlugin : cacheIdPlugins) { Set idsFromPlugin = @@ -888,24 +899,26 @@ void 
setCachedLogs(TimelineEntityGroupId groupId, EntityCacheItem cacheItem) { groupIds.addAll(idsFromPlugin); } } - return getTimelineStoresFromCacheIds(groupIds, entityType); + return getTimelineStoresFromCacheIds(groupIds, entityType, cacheItems); } // find a cached timeline store or null if it cannot be located - private TimelineStore getCachedStore(TimelineEntityGroupId groupId) - throws IOException { + private TimelineStore getCachedStore(TimelineEntityGroupId groupId, + List cacheItems) throws IOException { EntityCacheItem cacheItem; synchronized (this.cachedLogs) { // Note that the content in the cache log storage may be stale. cacheItem = this.cachedLogs.get(groupId); if (cacheItem == null) { LOG.debug("Set up new cache item for id {}", groupId); - cacheItem = new EntityCacheItem(getConfig(), fs); + cacheItem = new EntityCacheItem(groupId, getConfig(), fs); AppLogs appLogs = getAndSetAppLogs(groupId.getApplicationId()); if (appLogs != null) { LOG.debug("Set applogs {} for group id {}", appLogs, groupId); cacheItem.setAppLogs(appLogs); this.cachedLogs.put(groupId, cacheItem); + // Add the reference by the cache + cacheItem.incrRefs(); } else { LOG.warn("AppLogs for groupId {} is set to null!", groupId); } @@ -915,6 +928,9 @@ private TimelineStore getCachedStore(TimelineEntityGroupId groupId) if (cacheItem.getAppLogs() != null) { AppLogs appLogs = cacheItem.getAppLogs(); LOG.debug("try refresh cache {} {}", groupId, appLogs.getAppId()); + // Add the reference by the store + cacheItem.incrRefs(); + cacheItems.add(cacheItem); store = cacheItem.refreshCache(groupId, aclManager, jsonFactory, objMapper, metrics); } else { @@ -923,22 +939,32 @@ private TimelineStore getCachedStore(TimelineEntityGroupId groupId) return store; } + private void tryReleaseCacheItems(List relatedCacheItems) { + for (EntityCacheItem item : relatedCacheItems) { + item.tryRelease(); + } + } + @Override public TimelineEntities getEntities(String entityType, Long limit, Long windowStart, Long 
windowEnd, String fromId, Long fromTs, NameValuePair primaryFilter, Collection secondaryFilters, EnumSet fieldsToRetrieve, CheckAcl checkAcl) throws IOException { LOG.debug("getEntities type={} primary={}", entityType, primaryFilter); + List relatedCacheItems = new ArrayList<>(); List stores = getTimelineStoresForRead(entityType, - primaryFilter, secondaryFilters); + primaryFilter, secondaryFilters, relatedCacheItems); TimelineEntities returnEntities = new TimelineEntities(); for (TimelineStore store : stores) { LOG.debug("Try timeline store {} for the request", store.getName()); - returnEntities.addEntities( - store.getEntities(entityType, limit, windowStart, windowEnd, fromId, - fromTs, primaryFilter, secondaryFilters, fieldsToRetrieve, - checkAcl).getEntities()); + TimelineEntities entities = store.getEntities(entityType, limit, + windowStart, windowEnd, fromId, fromTs, primaryFilter, + secondaryFilters, fieldsToRetrieve, checkAcl); + if (entities != null) { + returnEntities.addEntities(entities.getEntities()); + } } + tryReleaseCacheItems(relatedCacheItems); return returnEntities; } @@ -946,17 +972,21 @@ public TimelineEntities getEntities(String entityType, Long limit, public TimelineEntity getEntity(String entityId, String entityType, EnumSet fieldsToRetrieve) throws IOException { LOG.debug("getEntity type={} id={}", entityType, entityId); - List stores = getTimelineStoresForRead(entityId, entityType); + List relatedCacheItems = new ArrayList<>(); + List stores = getTimelineStoresForRead(entityId, entityType, + relatedCacheItems); for (TimelineStore store : stores) { LOG.debug("Try timeline store {}:{} for the request", store.getName(), store.toString()); TimelineEntity e = store.getEntity(entityId, entityType, fieldsToRetrieve); if (e != null) { + tryReleaseCacheItems(relatedCacheItems); return e; } } LOG.debug("getEntity: Found nothing"); + tryReleaseCacheItems(relatedCacheItems); return null; } @@ -966,10 +996,11 @@ public TimelineEvents 
getEntityTimelines(String entityType, Long windowEnd, Set eventTypes) throws IOException { LOG.debug("getEntityTimelines type={} ids={}", entityType, entityIds); TimelineEvents returnEvents = new TimelineEvents(); + List relatedCacheItems = new ArrayList<>(); for (String entityId : entityIds) { LOG.debug("getEntityTimeline type={} id={}", entityType, entityId); List stores - = getTimelineStoresForRead(entityId, entityType); + = getTimelineStoresForRead(entityId, entityType, relatedCacheItems); for (TimelineStore store : stores) { LOG.debug("Try timeline store {}:{} for the request", store.getName(), store.toString()); @@ -978,9 +1009,12 @@ public TimelineEvents getEntityTimelines(String entityType, TimelineEvents events = store.getEntityTimelines(entityType, entityIdSet, limit, windowStart, windowEnd, eventTypes); - returnEvents.addEvents(events.getAllEvents()); + if (events != null) { + returnEvents.addEvents(events.getAllEvents()); + } } } + tryReleaseCacheItems(relatedCacheItems); return returnEvents; } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timeline-pluginstorage/src/test/java/org/apache/hadoop/yarn/server/timeline/EntityGroupPlugInForTest.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timeline-pluginstorage/src/test/java/org/apache/hadoop/yarn/server/timeline/EntityGroupPlugInForTest.java index 71e26cb..d245ae5 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timeline-pluginstorage/src/test/java/org/apache/hadoop/yarn/server/timeline/EntityGroupPlugInForTest.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timeline-pluginstorage/src/test/java/org/apache/hadoop/yarn/server/timeline/EntityGroupPlugInForTest.java @@ -26,7 +26,7 @@ class EntityGroupPlugInForTest extends TimelineEntityGroupPlugin { - private static TimelineEntityGroupId timelineEntityGroupId + private static final TimelineEntityGroupId timelineEntityGroupId = 
TimelineEntityGroupId.newInstance( TestEntityGroupFSTimelineStore.TEST_APPLICATION_ID, "test"); diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timeline-pluginstorage/src/test/java/org/apache/hadoop/yarn/server/timeline/TestEntityGroupFSTimelineStore.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timeline-pluginstorage/src/test/java/org/apache/hadoop/yarn/server/timeline/TestEntityGroupFSTimelineStore.java index 7a8ff2f..0a16048 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timeline-pluginstorage/src/test/java/org/apache/hadoop/yarn/server/timeline/TestEntityGroupFSTimelineStore.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timeline-pluginstorage/src/test/java/org/apache/hadoop/yarn/server/timeline/TestEntityGroupFSTimelineStore.java @@ -276,7 +276,8 @@ public void testPluginRead() throws Exception { EntityGroupFSTimelineStore.AppLogs appLogs = store.new AppLogs(TEST_APPLICATION_ID, testAppDirPath, AppState.COMPLETED); - EntityCacheItem cacheItem = new EntityCacheItem(config, fs); + EntityCacheItem cacheItem = new EntityCacheItem( + EntityGroupPlugInForTest.getStandardTimelineGroupId(), config, fs); cacheItem.setAppLogs(appLogs); store.setCachedLogs( EntityGroupPlugInForTest.getStandardTimelineGroupId(), cacheItem); @@ -296,11 +297,13 @@ public void testPluginRead() throws Exception { UserGroupInformation.getLoginUser()); assertNotNull(entity3); assertEquals(entityNew.getStartTime(), entity3.getStartTime()); + assertEquals(1, cacheItem.getRefCount()); + assertEquals(1L, EntityCacheItem.getActiveStores()); // Verify multiple entities read TimelineEntities entities = tdm.getEntities("type_3", null, null, null, null, null, null, null, EnumSet.allOf(TimelineReader.Field.class), UserGroupInformation.getLoginUser()); - assertEquals(entities.getEntities().size(), 1); + assertEquals(1, entities.getEntities().size()); for (TimelineEntity entity : 
entities.getEntities()) { assertEquals(entityNew.getStartTime(), entity.getStartTime()); }