diff --git a/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/persistentCache/NodeCache.java b/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/persistentCache/NodeCache.java index f6bdeb2..6fb4e96 100644 --- a/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/persistentCache/NodeCache.java +++ b/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/persistentCache/NodeCache.java @@ -16,14 +16,10 @@ */ package org.apache.jackrabbit.oak.plugins.document.persistentCache; -import static com.google.common.cache.RemovalCause.COLLECTED; -import static com.google.common.cache.RemovalCause.EXPIRED; -import static com.google.common.cache.RemovalCause.SIZE; import static java.util.Collections.singleton; import java.nio.ByteBuffer; import java.util.Map; -import java.util.Set; import java.util.concurrent.Callable; import java.util.concurrent.ConcurrentMap; import java.util.concurrent.ExecutionException; @@ -44,13 +40,9 @@ import org.h2.mvstore.type.DataType; import com.google.common.base.Function; import com.google.common.cache.Cache; import com.google.common.cache.CacheStats; -import com.google.common.cache.RemovalCause; import com.google.common.collect.ImmutableMap; -import com.google.common.collect.ImmutableSet; -class NodeCache implements Cache, GenerationCache, EvictionListener { - - private static final Set EVICTION_CAUSES = ImmutableSet.of(COLLECTED, EXPIRED, SIZE); +class NodeCache implements Cache, GenerationCache { private final PersistentCache cache; private final PersistentCacheStats stats; @@ -132,6 +124,19 @@ class NodeCache implements Cache, GenerationCache, EvictionListener< }); } + private void write(final K key, final V value) { + if (value == null) { + writerQueue.addInvalidate(singleton(key)); + } else { + writerQueue.addPut(key, value); + } + long memory = 0L; + memory += (key == null ? 0L: keyType.getMemory(key)); + memory += (value == null ? 0L: valueType.getMemory(value)); + stats.markBytesWritten(memory); + stats.markPut(); + } + @SuppressWarnings("unchecked") @Override @Nullable @@ -166,6 +171,7 @@ class NodeCache implements Cache, GenerationCache, EvictionListener< try { value = memCache.get(key, valueLoader); ctx.stop(); + write((K) key, value); broadcast(key, value); return value; } catch (ExecutionException e) { @@ -183,6 +189,7 @@ class NodeCache implements Cache, GenerationCache, EvictionListener< @Override public void put(K key, V value) { memCache.put(key, value); + write((K) key, value); broadcast(key, value); } @@ -190,7 +197,7 @@ class NodeCache implements Cache, GenerationCache, EvictionListener< @Override public void invalidate(Object key) { memCache.invalidate(key); - writerQueue.addInvalidate(singleton((K) key)); + write((K) key, null); broadcast((K) key, null); stats.markInvalidateOne(); } @@ -245,23 +252,7 @@ class NodeCache implements Cache, GenerationCache, EvictionListener< memCache.put(key, value); } stats.markRecvBroadcast(); - } - - /** - * Invoked on the eviction from the {@link #memCache} - */ - @Override - public void evicted(K key, V value, RemovalCause cause) { - if (EVICTION_CAUSES.contains(cause) && value != null) { - // invalidations are handled separately - writerQueue.addPut(key, value); - - long memory = 0L; - memory += (key == null ? 0L: keyType.getMemory(key)); - memory += (value == null ? 0L: valueType.getMemory(value)); - stats.markBytesWritten(memory); - stats.markPut(); - } + write(key, value); } public PersistentCacheStats getPersistentCacheStats() { diff --git a/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/persistentCache/async/CacheActionDispatcher.java b/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/persistentCache/async/CacheActionDispatcher.java index c6ce237..935f939 100644 --- a/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/persistentCache/async/CacheActionDispatcher.java +++ b/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/persistentCache/async/CacheActionDispatcher.java @@ -16,13 +16,6 @@ */ package org.apache.jackrabbit.oak.plugins.document.persistentCache.async; -import static com.google.common.collect.Multimaps.index; - -import java.util.ArrayList; -import java.util.Collection; -import java.util.List; -import java.util.Map; -import java.util.Map.Entry; import java.util.concurrent.ArrayBlockingQueue; import java.util.concurrent.BlockingQueue; import java.util.concurrent.TimeUnit; @@ -30,29 +23,14 @@ import java.util.concurrent.TimeUnit; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import com.google.common.base.Function; -import com.google.common.collect.Iterables; - /** - * An asynchronous buffer of the CacheAction objects. The buffer removes - * {@link #ACTIONS_TO_REMOVE} oldest entries if the queue length is larger than - * {@link #MAX_SIZE}. + * An asynchronous buffer of the CacheAction objects. */ public class CacheActionDispatcher implements Runnable { private static final Logger LOG = LoggerFactory.getLogger(CacheActionDispatcher.class); - /** - * What's the length of the queue. - */ - static final int MAX_SIZE = 1024; - - /** - * How many actions remove once the queue is longer than {@link #MAX_SIZE}. - */ - static final int ACTIONS_TO_REMOVE = 256; - - final BlockingQueue> queue = new ArrayBlockingQueue>(MAX_SIZE * 2); + final BlockingQueue> queue = new ArrayBlockingQueue>(4096); private volatile boolean isRunning = true; @@ -79,80 +57,16 @@ public class CacheActionDispatcher implements Runnable { } /** - * Adds the new action and cleans the queue if necessary. + * Adds the new action. This is a blocking operation. * * @param action to be added */ - synchronized void add(CacheAction action) { - if (queue.size() >= MAX_SIZE) { - cleanTheQueue(); - } - queue.offer(action); - } - - /** - * Clean the queue and add a single invalidate action for all the removed entries. - */ - @SuppressWarnings({ "rawtypes", "unchecked" }) - private void cleanTheQueue() { - List removed = removeOldest(); - for (Entry> e : groupByOwner(removed).entrySet()) { - CacheWriteQueue owner = e.getKey(); - Collection actions = e.getValue(); - List affectedKeys = cancelAll(actions); - owner.addInvalidate(affectedKeys); - } - } - - /** - * Remove {@link #ACTIONS_TO_REMOVE} oldest actions. - * - * @return A list of removed items. - */ - @SuppressWarnings("rawtypes") - private List removeOldest() { - List removed = new ArrayList(); - while (queue.size() > MAX_SIZE - ACTIONS_TO_REMOVE) { - CacheAction toBeCanceled = queue.poll(); - if (toBeCanceled == null) { - break; - } else { - removed.add(toBeCanceled); - } - } - return removed; - } - - /** - * Group passed actions by their owners. - * - * @param actions to be grouped - * @return map in which owner is the key and assigned action list is the value - */ - @SuppressWarnings("rawtypes") - private static Map> groupByOwner(List actions) { - return index(actions, new Function() { - @Override - public CacheWriteQueue apply(CacheAction input) { - return input.getOwner(); - } - }).asMap(); - } - - /** - * Cancel all passed actions. - * - * @param actions to cancel - * @return list of affected keys - */ - @SuppressWarnings({ "unchecked", "rawtypes" }) - private static List cancelAll(Collection actions) { - List cancelledKeys = new ArrayList(); - for (CacheAction action : actions) { - action.cancel(); - Iterables.addAll(cancelledKeys, action.getAffectedKeys()); + void add(CacheAction action) { + try { + queue.put(action); + } catch (InterruptedException e) { + LOG.error("Can't put item", e); } - return cancelledKeys; } @SuppressWarnings("rawtypes") diff --git a/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/persistentCache/async/CacheActionDispatcherTest.java b/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/persistentCache/async/CacheActionDispatcherTest.java index d35f5c6..3451d3b 100644 --- a/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/persistentCache/async/CacheActionDispatcherTest.java +++ b/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/persistentCache/async/CacheActionDispatcherTest.java @@ -19,18 +19,12 @@ package org.apache.jackrabbit.oak.plugins.document.persistentCache.async; import static com.google.common.collect.ImmutableSet.of; -import static com.google.common.collect.Iterables.size; -import static java.lang.String.valueOf; import static java.lang.System.currentTimeMillis; import static java.lang.Thread.sleep; -import static org.apache.jackrabbit.oak.plugins.document.persistentCache.async.CacheActionDispatcher.ACTIONS_TO_REMOVE; -import static org.apache.jackrabbit.oak.plugins.document.persistentCache.async.CacheActionDispatcher.MAX_SIZE; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; -import static org.junit.Assert.assertNotNull; import static org.junit.Assert.assertTrue; import static org.junit.Assert.fail; -import static org.mockito.Mockito.mock; import java.util.ArrayList; import java.util.Collections; @@ -51,28 +45,6 @@ import org.mockito.Mockito; public class CacheActionDispatcherTest { @Test - public void testMaxQueueSize() { - CacheActionDispatcher dispatcher = new CacheActionDispatcher(); - @SuppressWarnings({ "unchecked", "rawtypes" }) - CacheWriteQueue queue = new CacheWriteQueue(dispatcher, mock(PersistentCache.class), null); - - for (int i = 0; i < MAX_SIZE + 10; i++) { - dispatcher.add(createWriteAction(valueOf(i), queue)); - } - assertEquals(MAX_SIZE - ACTIONS_TO_REMOVE + 10 + 1, dispatcher.queue.size()); - assertEquals(valueOf(ACTIONS_TO_REMOVE), dispatcher.queue.peek().toString()); - - InvalidateCacheAction invalidateAction = null; - for (CacheAction action : dispatcher.queue) { - if (action instanceof InvalidateCacheAction) { - invalidateAction = (InvalidateCacheAction) action; - } - } - assertNotNull(invalidateAction); - assertEquals(ACTIONS_TO_REMOVE, size(invalidateAction.getAffectedKeys())); - } - - @Test public void testQueue() throws InterruptedException { int threads = 5; int actionsPerThread = 100; @@ -153,10 +125,6 @@ public class CacheActionDispatcherTest { assertTrue(cacheMap.isEmpty()); } - private DummyCacheWriteAction createWriteAction(String id, CacheWriteQueue queue) { - return new DummyCacheWriteAction(id, queue); - } - private class DummyCacheWriteAction implements CacheAction { private final CacheWriteQueue queue;