Index: src/main/java/org/apache/jackrabbit/oak/segment/CacheWeights.java =================================================================== --- src/main/java/org/apache/jackrabbit/oak/segment/CacheWeights.java (revision 1814560) +++ src/main/java/org/apache/jackrabbit/oak/segment/CacheWeights.java (working copy) @@ -67,14 +67,15 @@ return (Weigher) NOOP_WEIGHER; } + static int segmentWeight(Segment segment) { + return SEGMENT_CACHE_OVERHEAD + segment.estimateMemoryUsage(); + } + public static class SegmentCacheWeigher implements Weigher { @Override public int weigh(@Nonnull SegmentId id, @Nonnull Segment segment) { - int size = SEGMENT_CACHE_OVERHEAD; - // segmentId weight estimation is included in segment - size += segment.estimateMemoryUsage(); - return size; + return segmentWeight(segment); } } Index: src/main/java/org/apache/jackrabbit/oak/segment/SegmentCache.java =================================================================== --- src/main/java/org/apache/jackrabbit/oak/segment/SegmentCache.java (revision 1814560) +++ src/main/java/org/apache/jackrabbit/oak/segment/SegmentCache.java (working copy) @@ -19,6 +19,8 @@ package org.apache.jackrabbit.oak.segment; +import static org.apache.jackrabbit.oak.segment.CacheWeights.segmentWeight; + import java.util.concurrent.Callable; import java.util.concurrent.ExecutionException; import java.util.concurrent.atomic.AtomicInteger; @@ -28,132 +30,136 @@ import com.google.common.cache.Cache; import com.google.common.cache.CacheBuilder; +import com.google.common.cache.CacheStats; import com.google.common.cache.RemovalNotification; -import com.google.common.cache.Weigher; import org.apache.jackrabbit.oak.cache.AbstractCacheStats; import org.apache.jackrabbit.oak.segment.CacheWeights.SegmentCacheWeigher; /** - * A cache for {@link SegmentId#isDataSegmentId() data} {@link Segment} instances by their - * {@link SegmentId}. This cache ignores {@link SegmentId#isBulkSegmentId() bulk} segments. + * A cache for {@link SegmentId#isDataSegmentId() data} {@link Segment} + * instances by their {@link SegmentId}. This cache ignores {@link + * SegmentId#isBulkSegmentId() bulk} segments. *

- * Conceptually this cache serves as a 2nd level cache for segments. The 1st level cache is - * implemented by memoising the segment in its id (see {@link SegmentId#segment}. Every time - * an segment is evicted from this cache the memoised segment is discarded (see - * {@link SegmentId#unloaded()}) and {@link SegmentId#onAccess}. + * Conceptually this cache serves as a 2nd level cache for segments. The 1st + * level cache is implemented by memoising the segment in its id (see {@link + * SegmentId#segment}. Every time an segment is evicted from this cache the + * memoised segment is discarded (see {@link + * SegmentId#onAccess}. */ public class SegmentCache { - /** Default maximum weight of this cache in MB */ - public static final int DEFAULT_SEGMENT_CACHE_MB = 256; - /** Weigher to determine the current weight of all items in this cache */ - private final Weigher weigher = new SegmentCacheWeigher(); + /** + * Default maximum weight of this cache in MB + */ + public static final int DEFAULT_SEGMENT_CACHE_MB = 256; - /** Maximum weight of the items in this cache */ + /** + * Maximum weight of the items in this cache + */ private final long maximumWeight; - /** Cache of recently accessed segments */ + /** + * Cache of recently accessed segments + */ @Nonnull private final Cache cache; /** - * Statistics of this cache. Do to the special access patter (see class comment), we cannot - * rely on {@link Cache#stats()}. + * Statistics of this cache. Do to the special access patter (see class + * comment), we cannot rely on {@link Cache#stats()}. */ @Nonnull private final Stats stats = new Stats("Segment Cache"); /** * Create a new segment cache of the given size. - * @param cacheSizeMB size of the cache in megabytes. + * + * @param cacheSizeMB size of the cache in megabytes. */ public SegmentCache(long cacheSizeMB) { this.maximumWeight = cacheSizeMB * 1024 * 1024; this.cache = CacheBuilder.newBuilder() - .concurrencyLevel(16) - .maximumWeight(maximumWeight) - .weigher(weigher) - .removalListener(this::onRemove) - .build(); + .concurrencyLevel(16) + .maximumWeight(maximumWeight) + .weigher(new SegmentCacheWeigher()) + .removalListener(this::onRemove) + .build(); } /** - * Create a new segment cache with the {@link #DEFAULT_SEGMENT_CACHE_MB default size}. + * Removal handler called whenever an item is evicted from the cache. */ - public SegmentCache() { - this(DEFAULT_SEGMENT_CACHE_MB); + private void onRemove(@Nonnull RemovalNotification notification) { + stats.currentWeight.addAndGet(-segmentWeight(notification.getValue())); + stats.evictionCount.incrementAndGet(); } /** - * Removal handler called whenever an item is evicted from the cache. Propagates - * to {@link SegmentId#unloaded()}. + * Retrieve an segment from the cache or load it and cache it if not yet in + * the cache. + * + * @param id the id of the segment + * @param loader the loader to load the segment if not yet in the cache + * @return the segment identified by {@code id} + * @throws ExecutionException when {@code loader} failed to load an segment */ - private void onRemove(@Nonnull RemovalNotification notification) { - SegmentId id = notification.getKey(); - if (id != null) { - Segment segment = notification.getValue(); - if (segment != null) { - stats.currentWeight.addAndGet(-weigher.weigh(id, segment)); + @Nonnull + public Segment getSegment(@Nonnull final SegmentId id, @Nonnull final Callable loader) throws ExecutionException { + Segment segment; + + if (id.isDataSegmentId()) { + segment = cache.get(id, newStatsUpdateLoader(loader)); + } else { + try { + segment = loader.call(); + } catch (Exception e) { + throw new ExecutionException(e); } - stats.evictionCount.incrementAndGet(); - id.unloaded(); } - } - /** Unconditionally put an item in the cache */ - private Segment put(@Nonnull SegmentId id, @Nonnull Segment segment) { - // Call loaded *before* putting the segment into the cache as the latter - // might cause it to get evicted right away again. id.loaded(segment); - cache.put(id, segment); - stats.currentWeight.addAndGet(weigher.weigh(id, segment)); return segment; } - /** - * Retrieve an segment from the cache or load it and cache it if not yet in the cache. - * @param id the id of the segment - * @param loader the loader to load the segment if not yet in the cache - * @return the segment identified by {@code id} - * @throws ExecutionException when {@code loader} failed to load an segment - */ - @Nonnull - public Segment getSegment(@Nonnull final SegmentId id, @Nonnull final Callable loader) - throws ExecutionException { - // Load bulk segment directly without putting it in cache - try { - if (id.isBulkSegmentId()) { - return loader.call(); + private Callable newStatsUpdateLoader(Callable loader) { + return () -> { + try { + long t0 = System.nanoTime(); + Segment segment = loader.call(); + stats.loadSuccessCount.incrementAndGet(); + stats.loadTime.addAndGet(System.nanoTime() - t0); + stats.missCount.incrementAndGet(); + stats.currentWeight.addAndGet(segmentWeight(segment)); + return segment; + } catch (Exception e) { + stats.loadExceptionCount.incrementAndGet(); + throw e; } - } catch (Exception e) { - throw new ExecutionException(e); - } - - // Load data segment and put it in the cache - try { - long t0 = System.nanoTime(); - Segment segment = loader.call(); - stats.loadSuccessCount.incrementAndGet(); - stats.loadTime.addAndGet(System.nanoTime() - t0); - stats.missCount.incrementAndGet(); - - return put(id, segment); - } catch (Exception e) { - stats.loadExceptionCount.incrementAndGet(); - throw new ExecutionException(e); - } + }; } /** - * Put a segment into the cache. This method does nothing for - * {@link SegmentId#isBulkSegmentId() bulk} segments. - * @param segment the segment to cache + * Put a segment into the cache. This method does nothing for {@link + * SegmentId#isBulkSegmentId() bulk} segments. + * + * @param segment the segment to cache */ public void putSegment(@Nonnull Segment segment) { SegmentId id = segment.getSegmentId(); - if (!id.isBulkSegmentId()) { - put(id, segment); + + Segment internal; + + if (id.isDataSegmentId()) { + try { + internal = cache.get(id, newStatsUpdateLoader(() -> segment)); + } catch (ExecutionException e) { + throw new IllegalStateException("unable to store the segment", e.getCause()); + } + } else { + internal = segment; } + + id.loaded(internal); } /** @@ -164,7 +170,7 @@ } /** - * @return Statistics for this cache. + * @return Statistics for this cache. */ @Nonnull public AbstractCacheStats getCacheStats() { @@ -173,16 +179,20 @@ /** * Record a hit in this cache's underlying statistics. + * * @see SegmentId#onAccess */ public void recordHit() { stats.hitCount.incrementAndGet(); } - /** We cannot rely on the statistics of the underlying Guava cache as all cache hits - * are taken by {@link SegmentId#getSegment()} and thus never seen by the cache. + /** + * We cannot rely on the statistics of the underlying Guava cache as all + * cache hits are taken by {@link SegmentId#getSegment()} and thus never + * seen by the cache. */ private class Stats extends AbstractCacheStats { + @Nonnull final AtomicLong currentWeight = new AtomicLong(); @@ -209,14 +219,15 @@ } @Override - protected com.google.common.cache.CacheStats getCurrentStats() { - return new com.google.common.cache.CacheStats( - hitCount.get(), - missCount.get(), - loadSuccessCount.get(), - loadExceptionCount.get(), - loadTime.get(), - evictionCount.get()); + protected CacheStats getCurrentStats() { + return new CacheStats( + hitCount.get(), + missCount.get(), + loadSuccessCount.get(), + loadExceptionCount.get(), + loadTime.get(), + evictionCount.get() + ); } @Override @@ -233,6 +244,7 @@ public long estimateCurrentWeight() { return currentWeight.get(); } + } -} \ No newline at end of file +} Index: src/main/java/org/apache/jackrabbit/oak/segment/SegmentId.java =================================================================== --- src/main/java/org/apache/jackrabbit/oak/segment/SegmentId.java (revision 1814560) +++ src/main/java/org/apache/jackrabbit/oak/segment/SegmentId.java (working copy) @@ -141,7 +141,6 @@ * {@link #segment} field. * @return the segment identified by this instance. * @see #loaded(Segment) - * @see #unloaded() */ @Nonnull public Segment getSegment() { @@ -198,7 +197,6 @@ * passed {@code segment} has been loaded and should be memoised. * @param segment segment with this id. If the id doesn't match the behaviour is undefined. * @see #getSegment() - * @see #unloaded() */ void loaded(@Nonnull Segment segment) { this.segment = segment; @@ -206,16 +204,6 @@ } /** - * This method should only be called from lower level caches to notify this instance that the - * passed {@code segment} has been unloaded and should no longer be memoised. - * @see #getSegment() - * @see #loaded(Segment) - */ - void unloaded() { - this.segment = null; - } - - /** * Determine whether this instance belongs to the passed {@code store} * @param store * @return {@code true} iff this instance belongs to {@code store} Index: src/main/java/org/apache/jackrabbit/oak/segment/file/AbstractFileStore.java =================================================================== --- src/main/java/org/apache/jackrabbit/oak/segment/file/AbstractFileStore.java (revision 1814560) +++ src/main/java/org/apache/jackrabbit/oak/segment/file/AbstractFileStore.java (working copy) @@ -29,6 +29,7 @@ import java.util.Set; import java.util.UUID; import java.util.concurrent.ExecutionException; +import java.util.concurrent.atomic.AtomicInteger; import javax.annotation.CheckForNull; import javax.annotation.Nonnull; @@ -129,11 +130,14 @@ protected final IOMonitor ioMonitor; + static final AtomicInteger segmentIdAllocations = new AtomicInteger(); + AbstractFileStore(final FileStoreBuilder builder) { this.directory = builder.getDirectory(); this.tracker = new SegmentTracker(new SegmentIdFactory() { @Override @Nonnull public SegmentId newSegmentId(long msb, long lsb) { + segmentIdAllocations.incrementAndGet(); return new SegmentId(AbstractFileStore.this, msb, lsb, segmentCache::recordHit); } }); @@ -144,7 +148,7 @@ this.ioMonitor = builder.getIOMonitor(); } - static SegmentNotFoundException asSegmentNotFoundException(ExecutionException e, SegmentId id) { + static SegmentNotFoundException asSegmentNotFoundException(Exception e, SegmentId id) { if (e.getCause() instanceof SegmentNotFoundException) { return (SegmentNotFoundException) e.getCause(); } Index: src/main/java/org/apache/jackrabbit/oak/segment/file/FileStore.java =================================================================== --- src/main/java/org/apache/jackrabbit/oak/segment/file/FileStore.java (revision 1814560) +++ src/main/java/org/apache/jackrabbit/oak/segment/file/FileStore.java (working copy) @@ -59,6 +59,7 @@ import java.util.concurrent.ExecutionException; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicLong; import java.util.function.Consumer; @@ -72,6 +73,7 @@ import com.google.common.base.Stopwatch; import com.google.common.base.Supplier; import com.google.common.io.Closer; +import com.google.common.util.concurrent.UncheckedExecutionException; import org.apache.jackrabbit.oak.segment.Compactor; import org.apache.jackrabbit.oak.segment.RecordId; import org.apache.jackrabbit.oak.segment.Segment; @@ -164,6 +166,8 @@ @Nonnull private final SegmentNotFoundExceptionListener snfeListener; + private static final AtomicInteger uncachedSegmentReads = new AtomicInteger(); + FileStore(final FileStoreBuilder builder) throws InvalidFileStoreVersionException, IOException { super(builder); @@ -477,6 +481,8 @@ fileReaper.reap(); log.info("TarMK closed: {}", directory); + log.info("TarMK segment ID allocations: {}", segmentIdAllocations); + log.info("TarMK uncached segment reads: {}", uncachedSegmentReads); } @Override @@ -490,8 +496,11 @@ @Nonnull public Segment readSegment(final SegmentId id) { try (ShutDownCloser ignored = shutDown.keepAlive()) { - return segmentCache.getSegment(id, () -> readSegmentUncached(tarFiles, id)); - } catch (ExecutionException e) { + return segmentCache.getSegment(id, () -> { + uncachedSegmentReads.incrementAndGet(); + return readSegmentUncached(tarFiles, id); + }); + } catch (ExecutionException | UncheckedExecutionException e) { SegmentNotFoundException snfe = asSegmentNotFoundException(e, id); snfeListener.notify(id, snfe); throw snfe; Index: src/test/java/org/apache/jackrabbit/oak/segment/SegmentCacheTest.java =================================================================== --- src/test/java/org/apache/jackrabbit/oak/segment/SegmentCacheTest.java (revision 1814560) +++ src/test/java/org/apache/jackrabbit/oak/segment/SegmentCacheTest.java (working copy) @@ -31,11 +31,11 @@ import org.junit.Test; public class SegmentCacheTest { - private final SegmentCache cache = new SegmentCache(); + private final SegmentCache cache = new SegmentCache(SegmentCache.DEFAULT_SEGMENT_CACHE_MB); - private final SegmentId id1 = new SegmentId(EMPTY_STORE, -1, -1, cache::recordHit); + private final SegmentId id1 = new SegmentId(EMPTY_STORE, 0x0000000000000001L, 0xa000000000000001L, cache::recordHit); private final Segment segment1 = mock(Segment.class); - private final SegmentId id2 = new SegmentId(EMPTY_STORE, -1, -2, cache::recordHit); + private final SegmentId id2 = new SegmentId(EMPTY_STORE, 0x0000000000000002L, 0xa000000000000002L, cache::recordHit); private final Segment segment2 = mock(Segment.class); { @@ -111,4 +111,4 @@ assertEquals(0, stats.getRequestCount()); } -} \ No newline at end of file +}