diff --git a/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/persistentCache/CacheMetadata.java b/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/persistentCache/CacheMetadata.java new file mode 100644 index 0000000..3f7e349 --- /dev/null +++ b/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/persistentCache/CacheMetadata.java @@ -0,0 +1,137 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.jackrabbit.oak.plugins.document.persistentCache; + +import java.util.concurrent.ConcurrentMap; +import java.util.concurrent.atomic.AtomicLong; + +import static com.google.common.collect.Maps.newConcurrentMap; + +public class CacheMetadata { + + private final ConcurrentMap metadataMap = newConcurrentMap(); + + private boolean enabled = true; + + boolean isEnabled() { + return enabled; + } + + void disable() { + this.enabled = false; + } + + void put(K key) { + if (!enabled) { + return; + } + getOrCreate(key, false); + } + + void putFromPersistenceAndIncrement(K key) { + if (!enabled) { + return; + } + getOrCreate(key, true).incrementCount(); + } + + void increment(K key) { + if (!enabled) { + return; + } + getOrCreate(key, false).incrementCount(); + } + + MetadataEntry remove(Object key) { + if (!enabled) { + return null; + } + return metadataMap.remove(key); + } + + void putAll(Iterable keys) { + if (!enabled) { + return; + } + for (Object k : keys) { + getOrCreate((K) k, false); + } + } + + void incrementAll(Iterable keys) { + if (!enabled) { + return; + } + for (Object k : keys) { + getOrCreate((K) k, false).incrementCount(); + } + } + + void removeAll(Iterable keys) { + if (!enabled) { + return; + } + for (Object k : keys) { + metadataMap.remove(k); + } + } + + void clear() { + if (!enabled) { + return; + } + metadataMap.clear(); + } + + private MetadataEntry getOrCreate(K key, boolean readFromPersistentCache) { + if (!enabled) { + return null; + } + MetadataEntry metadata = metadataMap.get(key); + if (metadata == null) { + MetadataEntry newEntry = new MetadataEntry(readFromPersistentCache); + MetadataEntry oldEntry = metadataMap.putIfAbsent(key, newEntry); + metadata = oldEntry == null ? newEntry : oldEntry; + } + return metadata; + } + + + static class MetadataEntry { + + private final AtomicLong accessCount = new AtomicLong(); + + private final boolean readFromPersistentCache; + + private MetadataEntry(boolean readFromPersistentCache) { + this.readFromPersistentCache = readFromPersistentCache; + } + + void incrementCount() { + accessCount.incrementAndGet(); + } + + long getAccessCount() { + return accessCount.get(); + } + + boolean isReadFromPersistentCache() { + return readFromPersistentCache; + } + } + +} diff --git a/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/persistentCache/NodeCache.java b/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/persistentCache/NodeCache.java index 8efadca..5dffef3 100644 --- a/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/persistentCache/NodeCache.java +++ b/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/persistentCache/NodeCache.java @@ -70,6 +70,7 @@ class NodeCache implements Cache, GenerationCache, EvictionListener< private final DataType keyType; private final DataType valueType; private final CacheWriteQueue writerQueue; + private final CacheMetadata memCacheMetadata; NodeCache( PersistentCache cache, @@ -86,12 +87,14 @@ class NodeCache implements Cache, GenerationCache, EvictionListener< map = new MultiGenerationMap(); keyType = new KeyDataType(type); valueType = new ValueDataType(docNodeStore, docStore, type); + this.memCacheMetadata = new CacheMetadata(); if (ASYNC_CACHE) { this.writerQueue = new CacheWriteQueue(dispatcher, cache, map); - LOG.info("The persistent cache writes will be asynchronous"); + LOG.info("The persistent cache {} writes will be asynchronous", type); } else { this.writerQueue = null; - LOG.info("The persistent cache writes will be synchronous"); + this.memCacheMetadata.disable(); + LOG.info("The persistent cache {} writes will be synchronous", type); } this.stats = new PersistentCacheStats(type, statisticsProvider); } @@ -121,9 +124,6 @@ class NodeCache implements Cache, GenerationCache, EvictionListener< } private V readIfPresent(K key) { - if (ASYNC_CACHE && writerQueue.waitsForInvalidation(key)) { - return null; - } cache.switchGenerationIfNeeded(); TimerStats.Context ctx = stats.startReadTimer(); V v = map.get(key); @@ -169,6 +169,7 @@ class NodeCache implements Cache, GenerationCache, EvictionListener< public V getIfPresent(Object key) { V value = memCache.getIfPresent(key); if (value != null) { + memCacheMetadata.increment((K) key); return value; } stats.markRequest(); @@ -176,6 +177,7 @@ class NodeCache implements Cache, GenerationCache, EvictionListener< value = readIfPresent((K) key); if (value != null) { memCache.put((K) key, value); + memCacheMetadata.putFromPersistenceAndIncrement((K) key); stats.markHit(); } return value; @@ -196,6 +198,7 @@ class NodeCache implements Cache, GenerationCache, EvictionListener< TimerStats.Context ctx = stats.startLoaderTimer(); try { value = memCache.get(key, valueLoader); + memCacheMetadata.increment(key); ctx.stop(); if (!ASYNC_CACHE) { write((K) key, value); @@ -211,12 +214,15 @@ class NodeCache implements Cache, GenerationCache, EvictionListener< @Override public ImmutableMap getAllPresent( Iterable keys) { - return memCache.getAllPresent(keys); + ImmutableMap result = memCache.getAllPresent(keys); + memCacheMetadata.incrementAll(keys); + return result; } @Override public void put(K key, V value) { memCache.put(key, value); + memCacheMetadata.put(key); if (!ASYNC_CACHE) { write((K) key, value); } @@ -227,6 +233,7 @@ class NodeCache implements Cache, GenerationCache, EvictionListener< @Override public void invalidate(Object key) { memCache.invalidate(key); + memCacheMetadata.remove(key); if (ASYNC_CACHE) { writerQueue.addInvalidate(singleton((K) key)); } else { @@ -239,16 +246,19 @@ class NodeCache implements Cache, GenerationCache, EvictionListener< @Override public void putAll(Map m) { memCache.putAll(m); + memCacheMetadata.putAll(m.keySet()); } @Override public void invalidateAll(Iterable keys) { memCache.invalidateAll(keys); + memCacheMetadata.removeAll(keys); } @Override public void invalidateAll() { memCache.invalidateAll(); + memCacheMetadata.clear(); map.clear(); stats.markInvalidateAll(); } @@ -271,6 +281,7 @@ class NodeCache implements Cache, GenerationCache, EvictionListener< @Override public void cleanUp() { memCache.cleanUp(); + memCacheMetadata.clear(); } @Override @@ -281,9 +292,11 @@ class NodeCache implements Cache, GenerationCache, EvictionListener< if (buff.get() == 0) { value = null; memCache.invalidate(key); + memCacheMetadata.remove(key); } else { value = (V) valueType.read(buff); memCache.put(key, value); + memCacheMetadata.put(key); } stats.markRecvBroadcast(); if (!ASYNC_CACHE) { @@ -298,7 +311,9 @@ class NodeCache implements Cache, GenerationCache, EvictionListener< public void evicted(K key, V value, RemovalCause cause) { if (ASYNC_CACHE && EVICTION_CAUSES.contains(cause) && value != null) { // invalidations are handled separately - writerQueue.addPut(key, value); + if (qualifiesToPersist(memCacheMetadata.remove(key))) { + writerQueue.addPut(key, value); + } long memory = 0L; memory += (key == null ? 0L: keyType.getMemory(key)); @@ -308,6 +323,10 @@ class NodeCache implements Cache, GenerationCache, EvictionListener< } } + private boolean qualifiesToPersist(CacheMetadata.MetadataEntry metadata) { + return metadata == null || (!metadata.isReadFromPersistentCache() && metadata.getAccessCount() > 0); + } + public PersistentCacheStats getPersistentCacheStats() { return stats; } diff --git a/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/persistentCache/async/CacheAction.java b/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/persistentCache/async/CacheAction.java index 26078e1..0decbbe 100644 --- a/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/persistentCache/async/CacheAction.java +++ b/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/persistentCache/async/CacheAction.java @@ -29,23 +29,4 @@ interface CacheAction { */ void execute(); - /** - * Cancel the action without executing it - */ - void cancel(); - - /** - * Return the keys affected by this action - * - * @return keys affected by this action - */ - Iterable getAffectedKeys(); - - /** - * Return the owner of this action - * - * @return {@link CacheWriteQueue} executing this action - */ - CacheWriteQueue getOwner(); - } diff --git a/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/persistentCache/async/CacheActionDispatcher.java b/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/persistentCache/async/CacheActionDispatcher.java index c6ce237..27b66ed 100644 --- a/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/persistentCache/async/CacheActionDispatcher.java +++ b/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/persistentCache/async/CacheActionDispatcher.java @@ -34,9 +34,8 @@ import com.google.common.base.Function; import com.google.common.collect.Iterables; /** - * An asynchronous buffer of the CacheAction objects. The buffer removes - * {@link #ACTIONS_TO_REMOVE} oldest entries if the queue length is larger than - * {@link #MAX_SIZE}. + * An asynchronous buffer of the CacheAction objects. The buffer only accepts + * {@link #MAX_SIZE} number of elements. */ public class CacheActionDispatcher implements Runnable { @@ -45,14 +44,9 @@ public class CacheActionDispatcher implements Runnable { /** * What's the length of the queue. */ - static final int MAX_SIZE = 1024; + static final int MAX_SIZE = 2048; - /** - * How many actions remove once the queue is longer than {@link #MAX_SIZE}. - */ - static final int ACTIONS_TO_REMOVE = 256; - - final BlockingQueue> queue = new ArrayBlockingQueue>(MAX_SIZE * 2); + final BlockingQueue> queue = new ArrayBlockingQueue>(MAX_SIZE); private volatile boolean isRunning = true; @@ -68,7 +62,6 @@ public class CacheActionDispatcher implements Runnable { LOG.debug("Interrupted the queue.poll()", e); } } - applyInvalidateActions(); } /** @@ -79,91 +72,11 @@ public class CacheActionDispatcher implements Runnable { } /** - * Adds the new action and cleans the queue if necessary. + * Tries to add new action. * * @param action to be added */ - synchronized void add(CacheAction action) { - if (queue.size() >= MAX_SIZE) { - cleanTheQueue(); - } + void add(CacheAction action) { queue.offer(action); } - - /** - * Clean the queue and add a single invalidate action for all the removed entries. - */ - @SuppressWarnings({ "rawtypes", "unchecked" }) - private void cleanTheQueue() { - List removed = removeOldest(); - for (Entry> e : groupByOwner(removed).entrySet()) { - CacheWriteQueue owner = e.getKey(); - Collection actions = e.getValue(); - List affectedKeys = cancelAll(actions); - owner.addInvalidate(affectedKeys); - } - } - - /** - * Remove {@link #ACTIONS_TO_REMOVE} oldest actions. - * - * @return A list of removed items. - */ - @SuppressWarnings("rawtypes") - private List removeOldest() { - List removed = new ArrayList(); - while (queue.size() > MAX_SIZE - ACTIONS_TO_REMOVE) { - CacheAction toBeCanceled = queue.poll(); - if (toBeCanceled == null) { - break; - } else { - removed.add(toBeCanceled); - } - } - return removed; - } - - /** - * Group passed actions by their owners. - * - * @param actions to be grouped - * @return map in which owner is the key and assigned action list is the value - */ - @SuppressWarnings("rawtypes") - private static Map> groupByOwner(List actions) { - return index(actions, new Function() { - @Override - public CacheWriteQueue apply(CacheAction input) { - return input.getOwner(); - } - }).asMap(); - } - - /** - * Cancel all passed actions. - * - * @param actions to cancel - * @return list of affected keys - */ - @SuppressWarnings({ "unchecked", "rawtypes" }) - private static List cancelAll(Collection actions) { - List cancelledKeys = new ArrayList(); - for (CacheAction action : actions) { - action.cancel(); - Iterables.addAll(cancelledKeys, action.getAffectedKeys()); - } - return cancelledKeys; - } - - @SuppressWarnings("rawtypes") - private void applyInvalidateActions() { - CacheAction action; - do { - action = queue.poll(); - if (action instanceof InvalidateCacheAction) { - action.execute(); - } - } while (action != null); - } - } \ No newline at end of file diff --git a/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/persistentCache/async/CacheWriteQueue.java b/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/persistentCache/async/CacheWriteQueue.java index 01a0a27..2e0843e 100644 --- a/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/persistentCache/async/CacheWriteQueue.java +++ b/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/persistentCache/async/CacheWriteQueue.java @@ -16,21 +16,10 @@ */ package org.apache.jackrabbit.oak.plugins.document.persistentCache.async; -import java.util.HashSet; -import java.util.Map; -import java.util.Set; - import org.apache.jackrabbit.oak.plugins.document.persistentCache.PersistentCache; -import com.google.common.collect.HashMultiset; -import com.google.common.collect.Multiset; +import java.util.Map; -/** - * A fronted for the {@link CacheActionDispatcher} creating actions and maintaining their state. - * - * @param key type - * @param value type - */ public class CacheWriteQueue { private final CacheActionDispatcher dispatcher; @@ -39,65 +28,18 @@ public class CacheWriteQueue { private final Map map; - final Multiset queuedKeys = HashMultiset.create(); - - final Set waitsForInvalidation = new HashSet(); - public CacheWriteQueue(CacheActionDispatcher dispatcher, PersistentCache cache, Map map) { this.dispatcher = dispatcher; this.cache = cache; this.map = map; } - /** - * Add new invalidate action. - * - * @param keys to be invalidated - */ - public void addInvalidate(Iterable keys) { - synchronized(this) { - for (K key : keys) { - queuedKeys.add(key); - waitsForInvalidation.add(key); - } - } - dispatcher.add(new InvalidateCacheAction(this, keys)); - } - - /** - * Add new put action - * - * @param key to be put to cache - * @param value to be put to cache - */ public void addPut(K key, V value) { - synchronized(this) { - queuedKeys.add(key); - waitsForInvalidation.remove(key); - } - dispatcher.add(new PutToCacheAction(this, key, value)); + dispatcher.add(new PutToCacheAction(key, value, this)); } - /** - * Check if the last action added for this key was invalidate - * - * @param key to check - * @return {@code true} if the last added action was invalidate - */ - public synchronized boolean waitsForInvalidation(K key) { - return waitsForInvalidation.contains(key); - } - - /** - * Remove the action state when it's finished or cancelled. - * - * @param key to be removed - */ - synchronized void remove(K key) { - queuedKeys.remove(key); - if (!queuedKeys.contains(key)) { - waitsForInvalidation.remove(key); - } + public void addInvalidate(Iterable keys) { + dispatcher.add(new InvalidateCacheAction(keys, this)); } PersistentCache getCache() { @@ -107,4 +49,4 @@ public class CacheWriteQueue { Map getMap() { return map; } -} \ No newline at end of file +} diff --git a/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/persistentCache/async/InvalidateCacheAction.java b/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/persistentCache/async/InvalidateCacheAction.java index e4d145c..38e1a60 100644 --- a/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/persistentCache/async/InvalidateCacheAction.java +++ b/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/persistentCache/async/InvalidateCacheAction.java @@ -32,49 +32,22 @@ class InvalidateCacheAction implements CacheAction { private final Map map; - private final CacheWriteQueue owner; - private final Iterable keys; - InvalidateCacheAction(CacheWriteQueue cacheWriteQueue, Iterable keys) { - this.owner = cacheWriteQueue; + InvalidateCacheAction(Iterable keys, CacheWriteQueue queue) { this.keys = keys; - this.cache = cacheWriteQueue.getCache(); - this.map = cacheWriteQueue.getMap(); + this.cache = queue.getCache(); + this.map = queue.getMap(); } @Override public void execute() { - try { - if (map != null) { - for (K key : keys) { - cache.switchGenerationIfNeeded(); - map.remove(key); - } + if (map != null) { + for (K key : keys) { + cache.switchGenerationIfNeeded(); + map.remove(key); } - } finally { - decrement(); } } - @Override - public void cancel() { - decrement(); - } - - @Override - public CacheWriteQueue getOwner() { - return owner; - } - - @Override - public Iterable getAffectedKeys() { - return keys; - } - - private void decrement() { - for (K key : keys) { - owner.remove(key); - } - } } \ No newline at end of file diff --git a/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/persistentCache/async/PutToCacheAction.java b/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/persistentCache/async/PutToCacheAction.java index 586693e..73fdf27 100644 --- a/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/persistentCache/async/PutToCacheAction.java +++ b/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/persistentCache/async/PutToCacheAction.java @@ -34,48 +34,22 @@ class PutToCacheAction implements CacheAction { private final Map map; - private final CacheWriteQueue owner; - private final K key; private final V value; - PutToCacheAction(CacheWriteQueue cacheWriteQueue, K key, V value) { - this.owner = cacheWriteQueue; + PutToCacheAction(K key, V value, CacheWriteQueue queue) { this.key = key; this.value = value; - this.cache = cacheWriteQueue.getCache(); - this.map = cacheWriteQueue.getMap(); + this.cache = queue.getCache(); + this.map = queue.getMap(); } @Override public void execute() { - try { - if (map != null) { - cache.switchGenerationIfNeeded(); - map.put(key, value); - } - } finally { - decrement(); + if (map != null) { + cache.switchGenerationIfNeeded(); + map.put(key, value); } } - - @Override - public void cancel() { - decrement(); - } - - @Override - public CacheWriteQueue getOwner() { - return owner; - } - - @Override - public Iterable getAffectedKeys() { - return singleton(key); - } - - private void decrement() { - owner.remove(key); - } } \ No newline at end of file diff --git a/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/persistentCache/async/CacheActionDispatcherTest.java b/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/persistentCache/async/CacheActionDispatcherTest.java index d35f5c6..4f4ce9f 100644 --- a/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/persistentCache/async/CacheActionDispatcherTest.java +++ b/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/persistentCache/async/CacheActionDispatcherTest.java @@ -18,33 +18,21 @@ */ package org.apache.jackrabbit.oak.plugins.document.persistentCache.async; -import static com.google.common.collect.ImmutableSet.of; -import static com.google.common.collect.Iterables.size; import static java.lang.String.valueOf; import static java.lang.System.currentTimeMillis; import static java.lang.Thread.sleep; -import static org.apache.jackrabbit.oak.plugins.document.persistentCache.async.CacheActionDispatcher.ACTIONS_TO_REMOVE; import static org.apache.jackrabbit.oak.plugins.document.persistentCache.async.CacheActionDispatcher.MAX_SIZE; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; -import static org.junit.Assert.assertNotNull; -import static org.junit.Assert.assertTrue; import static org.junit.Assert.fail; import static org.mockito.Mockito.mock; import java.util.ArrayList; -import java.util.Collections; -import java.util.HashMap; import java.util.Iterator; import java.util.List; -import java.util.Map; import java.util.Random; import org.apache.jackrabbit.oak.plugins.document.persistentCache.PersistentCache; -import org.apache.jackrabbit.oak.plugins.document.persistentCache.async.CacheAction; -import org.apache.jackrabbit.oak.plugins.document.persistentCache.async.CacheActionDispatcher; -import org.apache.jackrabbit.oak.plugins.document.persistentCache.async.CacheWriteQueue; -import org.apache.jackrabbit.oak.plugins.document.persistentCache.async.InvalidateCacheAction; import org.junit.Test; import org.mockito.Mockito; @@ -59,17 +47,8 @@ public class CacheActionDispatcherTest { for (int i = 0; i < MAX_SIZE + 10; i++) { dispatcher.add(createWriteAction(valueOf(i), queue)); } - assertEquals(MAX_SIZE - ACTIONS_TO_REMOVE + 10 + 1, dispatcher.queue.size()); - assertEquals(valueOf(ACTIONS_TO_REMOVE), dispatcher.queue.peek().toString()); - - InvalidateCacheAction invalidateAction = null; - for (CacheAction action : dispatcher.queue) { - if (action instanceof InvalidateCacheAction) { - invalidateAction = (InvalidateCacheAction) action; - } - } - assertNotNull(invalidateAction); - assertEquals(ACTIONS_TO_REMOVE, size(invalidateAction.getAffectedKeys())); + assertEquals(MAX_SIZE, dispatcher.queue.size()); + assertEquals("0", dispatcher.queue.peek().toString()); } @Test @@ -128,31 +107,6 @@ public class CacheActionDispatcherTest { assertFalse(queueThread.isAlive()); } - @Test - public void testExecuteInvalidatesOnShutdown() throws InterruptedException { - Map cacheMap = new HashMap(); - CacheActionDispatcher dispatcher = new CacheActionDispatcher(); - CacheWriteQueue queue = new CacheWriteQueue(dispatcher, - Mockito.mock(PersistentCache.class), cacheMap); - Thread queueThread = new Thread(dispatcher); - queueThread.start(); - - cacheMap.put("2", new Object()); - cacheMap.put("3", new Object()); - cacheMap.put("4", new Object()); - dispatcher.add(new DummyCacheWriteAction("1", queue, 100)); - dispatcher.add(new InvalidateCacheAction(queue, Collections.singleton("2"))); - dispatcher.add(new InvalidateCacheAction(queue, Collections.singleton("3"))); - dispatcher.add(new InvalidateCacheAction(queue, Collections.singleton("4"))); - Thread.sleep(10); // make sure the first action started - - dispatcher.stop(); - assertEquals(of("2", "3", "4"), cacheMap.keySet()); - - queueThread.join(); - assertTrue(cacheMap.isEmpty()); - } - private DummyCacheWriteAction createWriteAction(String id, CacheWriteQueue queue) { return new DummyCacheWriteAction(id, queue); } @@ -188,22 +142,9 @@ public class CacheActionDispatcherTest { } @Override - public void cancel() { - } - - @Override public String toString() { return id; } - @Override - public Iterable getAffectedKeys() { - return Collections.singleton(id); - } - - @Override - public CacheWriteQueue getOwner() { - return queue; - } } } diff --git a/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/persistentCache/async/CacheWriteQueueTest.java b/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/persistentCache/async/CacheWriteQueueTest.java deleted file mode 100644 index 05b9eb2..0000000 --- a/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/persistentCache/async/CacheWriteQueueTest.java +++ /dev/null @@ -1,140 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.jackrabbit.oak.plugins.document.persistentCache.async; - -import static java.util.Collections.singleton; -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertFalse; -import static org.junit.Assert.assertTrue; - -import java.util.ArrayList; -import java.util.Collections; -import java.util.HashMap; -import java.util.List; -import java.util.Map; -import java.util.Random; -import java.util.concurrent.atomic.AtomicInteger; - -import org.apache.jackrabbit.oak.plugins.document.persistentCache.PersistentCache; -import org.apache.jackrabbit.oak.plugins.document.persistentCache.async.CacheAction; -import org.apache.jackrabbit.oak.plugins.document.persistentCache.async.CacheActionDispatcher; -import org.apache.jackrabbit.oak.plugins.document.persistentCache.async.CacheWriteQueue; -import org.junit.Before; -import org.junit.Test; -import org.mockito.Mockito; - -public class CacheWriteQueueTest { - - private CacheWriteQueue queue; - - @SuppressWarnings("rawtypes") - private List actions = Collections.synchronizedList(new ArrayList()); - - @Before - public void initQueue() { - actions.clear(); - - CacheActionDispatcher dispatcher = new CacheActionDispatcher() { - public void add(CacheAction action) { - actions.add(action); - } - }; - - PersistentCache cache = Mockito.mock(PersistentCache.class); - queue = new CacheWriteQueue(dispatcher, cache, null); - } - - @Test - public void testCounters() throws InterruptedException { - final int threadCount = 10; - final int actionsPerThread = 50; - - final Map counters = new HashMap(); - for (int i = 0; i < 10; i++) { - String key = "key_" + i; - counters.put(key, new AtomicInteger()); - } - - final Random random = new Random(); - List threads = new ArrayList(); - for (int i = 0; i < threadCount; i++) { - Thread t = new Thread(new Runnable() { - @Override - public void run() { - for (int j = 0; j < actionsPerThread; j++) { - for (String key : counters.keySet()) { - if (random.nextBoolean()) { - queue.addPut(key, null); - } else { - queue.addPut(key, new Object()); - } - counters.get(key).incrementAndGet(); - } - } - } - }); - threads.add(t); - } - - for (Thread t : threads) { - t.start(); - } - for (Thread t : threads) { - t.join(); - } - for (String key : counters.keySet()) { - assertEquals(queue.queuedKeys.count(key), counters.get(key).get()); - } - - for (CacheAction action : actions) { - if (random.nextBoolean()) { - action.execute(); - } else { - action.cancel(); - } - } - - assertTrue(queue.queuedKeys.isEmpty()); - assertTrue(queue.waitsForInvalidation.isEmpty()); - } - - @Test - public void testWaitsForInvalidation() { - assertFalse(queue.waitsForInvalidation("key")); - - queue.addInvalidate(singleton("key")); - assertTrue(queue.waitsForInvalidation("key")); - - queue.addPut("key", new Object()); - assertFalse(queue.waitsForInvalidation("key")); - - queue.addInvalidate(singleton("key")); - assertTrue(queue.waitsForInvalidation("key")); - - int i; - for (i = 0; i < actions.size() - 1; i++) { - actions.get(i).execute(); - assertTrue(queue.waitsForInvalidation("key")); - } - - actions.get(i).execute(); - assertFalse(queue.waitsForInvalidation("key")); - } - -} \ No newline at end of file