diff --git a/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/DocumentMK.java b/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/DocumentMK.java index 61ee864..53ab69e 100644 --- a/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/DocumentMK.java +++ b/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/DocumentMK.java @@ -17,7 +17,9 @@ package org.apache.jackrabbit.oak.plugins.document; import java.io.InputStream; +import java.util.EnumSet; import java.util.Set; +import java.util.concurrent.CopyOnWriteArraySet; import java.util.concurrent.Executor; import java.util.concurrent.TimeUnit; @@ -27,6 +29,9 @@ import javax.sql.DataSource; import com.google.common.cache.Cache; import com.google.common.cache.CacheBuilder; +import com.google.common.cache.RemovalCause; +import com.google.common.cache.RemovalListener; +import com.google.common.cache.RemovalNotification; import com.google.common.cache.Weigher; import com.google.common.collect.Sets; import com.google.common.util.concurrent.MoreExecutors; @@ -36,6 +41,7 @@ import org.apache.jackrabbit.mk.api.MicroKernel; import org.apache.jackrabbit.mk.api.MicroKernelException; import org.apache.jackrabbit.oak.api.CommitFailedException; import org.apache.jackrabbit.oak.cache.CacheLIRS; +import org.apache.jackrabbit.oak.cache.CacheLIRS.EvictionCallback; import org.apache.jackrabbit.oak.cache.CacheStats; import org.apache.jackrabbit.oak.cache.CacheValue; import org.apache.jackrabbit.oak.cache.EmpiricalWeigher; @@ -51,6 +57,7 @@ import org.apache.jackrabbit.oak.plugins.document.mongo.MongoBlobStore; import org.apache.jackrabbit.oak.plugins.document.mongo.MongoDocumentStore; import org.apache.jackrabbit.oak.plugins.document.mongo.MongoVersionGCSupport; import org.apache.jackrabbit.oak.plugins.document.persistentCache.CacheType; +import org.apache.jackrabbit.oak.plugins.document.persistentCache.EvictionListener; import org.apache.jackrabbit.oak.plugins.document.persistentCache.PersistentCache; import org.apache.jackrabbit.oak.plugins.document.rdb.RDBBlobStore; import org.apache.jackrabbit.oak.plugins.document.rdb.RDBDocumentStore; @@ -66,6 +73,9 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; import static com.google.common.base.Preconditions.checkArgument; +import static com.google.common.cache.RemovalCause.COLLECTED; +import static com.google.common.cache.RemovalCause.EXPIRED; +import static com.google.common.cache.RemovalCause.SIZE; /** * A MicroKernel implementation that stores the data in a {@link DocumentStore}. @@ -913,19 +923,24 @@ public class DocumentMK implements MicroKernel { return new NodeDocumentCache(cache, cacheStats, locks); } + @SuppressWarnings("unchecked") private Cache buildCache( CacheType cacheType, long maxWeight, DocumentNodeStore docNodeStore, DocumentStore docStore ) { - Cache cache = buildCache(maxWeight); + Set> listeners = new CopyOnWriteArraySet>(); + Cache cache = buildCache(maxWeight, listeners); PersistentCache p = getPersistentCache(); if (p != null) { if (docNodeStore != null) { docNodeStore.setPersistentCache(p); } cache = p.wrap(docNodeStore, docStore, cache, cacheType); + if (cache instanceof EvictionListener) { + listeners.add((EvictionListener) cache); + } } return cache; } @@ -946,7 +961,8 @@ public class DocumentMK implements MicroKernel { } private Cache buildCache( - long maxWeight) { + long maxWeight, + final Set> listeners) { // by default, use the LIRS cache when using the persistent cache, // but don't use it otherwise boolean useLirs = persistentCacheURI != null; @@ -962,6 +978,14 @@ public class DocumentMK implements MicroKernel { segmentCount(cacheSegmentCount). stackMoveDistance(cacheStackMoveDistance). recordStats(). + evictionCallback(new EvictionCallback() { + @Override + public void evicted(K key, V value, RemovalCause cause) { + for (EvictionListener l : listeners) { + l.evicted(key, value, cause); + } + } + }). build(); } return CacheBuilder.newBuilder(). @@ -969,9 +993,16 @@ public class DocumentMK implements MicroKernel { weigher(weigher). maximumWeight(maxWeight). recordStats(). + removalListener(new RemovalListener() { + @Override + public void onRemoval(RemovalNotification notification) { + for (EvictionListener l : listeners) { + l.evicted(notification.getKey(), notification.getValue(), notification.getCause()); + } + } + }). build(); } - } } diff --git a/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/persistentCache/EvictionListener.java b/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/persistentCache/EvictionListener.java new file mode 100644 index 0000000..9d37f6a --- /dev/null +++ b/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/persistentCache/EvictionListener.java @@ -0,0 +1,25 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.jackrabbit.oak.plugins.document.persistentCache; + +import com.google.common.cache.RemovalCause; + +public interface EvictionListener { + + void evicted(K key, V value, RemovalCause cause); + +} diff --git a/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/persistentCache/NodeCache.java b/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/persistentCache/NodeCache.java index aafaa37..abb9298 100644 --- a/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/persistentCache/NodeCache.java +++ b/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/persistentCache/NodeCache.java @@ -16,7 +16,13 @@ */ package org.apache.jackrabbit.oak.plugins.document.persistentCache; +import static com.google.common.cache.RemovalCause.COLLECTED; +import static com.google.common.cache.RemovalCause.EXPIRED; +import static com.google.common.cache.RemovalCause.SIZE; +import static java.util.Collections.singleton; + import java.util.Map; +import java.util.Set; import java.util.concurrent.Callable; import java.util.concurrent.ConcurrentMap; import java.util.concurrent.ExecutionException; @@ -26,15 +32,21 @@ import javax.annotation.Nullable; import org.apache.jackrabbit.oak.plugins.document.DocumentNodeStore; import org.apache.jackrabbit.oak.plugins.document.DocumentStore; import org.apache.jackrabbit.oak.plugins.document.persistentCache.PersistentCache.GenerationCache; +import org.apache.jackrabbit.oak.plugins.document.persistentCache.async.CacheActionDispatcher; +import org.apache.jackrabbit.oak.plugins.document.persistentCache.async.CacheWriteQueue; import org.h2.mvstore.MVMap; import org.h2.mvstore.type.DataType; import com.google.common.cache.Cache; import com.google.common.cache.CacheStats; +import com.google.common.cache.RemovalCause; import com.google.common.collect.ImmutableMap; +import com.google.common.collect.ImmutableSet; + +class NodeCache implements Cache, GenerationCache, EvictionListener { + + private static final Set EVICTION_CAUSES = ImmutableSet.of(COLLECTED, EXPIRED, SIZE); -class NodeCache implements Cache, GenerationCache { - private final PersistentCache cache; private final Cache memCache; private final MultiGenerationMap map; @@ -42,11 +54,15 @@ class NodeCache implements Cache, GenerationCache { private final DocumentNodeStore docNodeStore; private final DocumentStore docStore; + private final CacheWriteQueue writerQueue; + NodeCache( PersistentCache cache, Cache memCache, DocumentNodeStore docNodeStore, - DocumentStore docStore, CacheType type) { + DocumentStore docStore, + CacheType type, + CacheActionDispatcher dispatcher) { this.cache = cache; this.memCache = memCache; this.type = type; @@ -54,6 +70,7 @@ class NodeCache implements Cache, GenerationCache { this.docStore = docStore; PersistentCache.LOG.info("wrapping map " + this.type); map = new MultiGenerationMap(); + this.writerQueue = new CacheWriteQueue(dispatcher, cache, map); } @Override @@ -76,20 +93,14 @@ class NodeCache implements Cache, GenerationCache { } private V readIfPresent(K key) { + if (writerQueue.waitsForInvalidation(key)) { + return null; + } cache.switchGenerationIfNeeded(); V v = map.get(key); return v; } - public void write(K key, V value) { - cache.switchGenerationIfNeeded(); - if (value == null) { - map.remove(key); - } else { - map.put(key, value); - } - } - @SuppressWarnings("unchecked") @Override @Nullable @@ -114,7 +125,7 @@ class NodeCache implements Cache, GenerationCache { return value; } value = memCache.get(key, valueLoader); - write(key, value); + writerQueue.addPut(key, value); return value; } @@ -127,14 +138,14 @@ class NodeCache implements Cache, GenerationCache { @Override public void put(K key, V value) { memCache.put(key, value); - write(key, value); + writerQueue.addPut(key, value); } @SuppressWarnings("unchecked") @Override public void invalidate(Object key) { memCache.invalidate(key); - write((K) key, (V) null); + writerQueue.addInvalidate(singleton((K) key)); } @Override @@ -173,4 +184,11 @@ class NodeCache implements Cache, GenerationCache { memCache.cleanUp(); } + @Override + public void evicted(K key, V value, RemovalCause cause) { + if (EVICTION_CAUSES.contains(cause) && value != null) { // invalidations are handled separately + writerQueue.addPut(key, value); + } + } + } \ No newline at end of file diff --git a/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/persistentCache/PersistentCache.java b/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/persistentCache/PersistentCache.java index bc74ce9..f6d0346 100644 --- a/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/persistentCache/PersistentCache.java +++ b/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/persistentCache/PersistentCache.java @@ -24,6 +24,7 @@ import java.util.concurrent.atomic.AtomicInteger; import org.apache.jackrabbit.oak.plugins.document.DocumentNodeStore; import org.apache.jackrabbit.oak.plugins.document.DocumentStore; +import org.apache.jackrabbit.oak.plugins.document.persistentCache.async.CacheActionDispatcher; import org.apache.jackrabbit.oak.spi.blob.GarbageCollectableBlobStore; import org.h2.mvstore.FileStore; import org.h2.mvstore.MVMap; @@ -70,6 +71,9 @@ public class PersistentCache { private int exceptionCount; + private CacheActionDispatcher writeDispatcher; + private Thread writeDispatcherThread; + public PersistentCache(String url) { LOG.info("start version 1"); String[] parts = url.split(","); @@ -163,6 +167,11 @@ public class PersistentCache { readStore = createMapFactory(readGeneration, true); } writeStore = createMapFactory(writeGeneration, false); + + writeDispatcher = new CacheActionDispatcher(); + writeDispatcherThread = new Thread(writeDispatcher, "Oak CacheWriteQueue"); + writeDispatcherThread.setDaemon(true); + writeDispatcherThread.start(); } private String getFileName(int generation) { @@ -289,6 +298,13 @@ public class PersistentCache { } public void close() { + writeDispatcher.stop(); + try { + writeDispatcherThread.join(); + } catch (InterruptedException e) { + LOG.error("Can't join the {}", writeDispatcherThread.getName(), e); + } + if (writeStore != null) { writeStore.closeStore(); } @@ -333,7 +349,7 @@ public class PersistentCache { break; } if (wrap) { - NodeCache c = new NodeCache(this, base, docNodeStore, docStore, type); + NodeCache c = new NodeCache(this, base, docNodeStore, docStore, type, writeDispatcher); initGenerationCache(c); return c; } diff --git a/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/persistentCache/async/CacheAction.java b/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/persistentCache/async/CacheAction.java new file mode 100644 index 0000000..26078e1 --- /dev/null +++ b/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/persistentCache/async/CacheAction.java @@ -0,0 +1,51 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.jackrabbit.oak.plugins.document.persistentCache.async; + +/** + * Object represents an action on the cache (eg. put or invalidate). + * + * @param key type + * @param value type + */ +interface CacheAction { + + /** + * Execute the action + */ + void execute(); + + /** + * Cancel the action without executing it + */ + void cancel(); + + /** + * Return the keys affected by this action + * + * @return keys affected by this action + */ + Iterable getAffectedKeys(); + + /** + * Return the owner of this action + * + * @return {@link CacheWriteQueue} executing this action + */ + CacheWriteQueue getOwner(); + +} diff --git a/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/persistentCache/async/CacheActionDispatcher.java b/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/persistentCache/async/CacheActionDispatcher.java new file mode 100644 index 0000000..49897e4 --- /dev/null +++ b/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/persistentCache/async/CacheActionDispatcher.java @@ -0,0 +1,169 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.jackrabbit.oak.plugins.document.persistentCache.async; + +import static com.google.common.collect.Multimaps.index; + +import java.util.ArrayList; +import java.util.Collection; +import java.util.List; +import java.util.Map; +import java.util.Map.Entry; +import java.util.concurrent.ArrayBlockingQueue; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.TimeUnit; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.google.common.base.Function; +import com.google.common.collect.Iterables; + +/** + * An asynchronous buffer of the CacheAction objects. The buffer removes + * {@link #ACTIONS_TO_REMOVE} oldest entries if the queue length is larger than + * {@link #MAX_SIZE}. + */ +public class CacheActionDispatcher implements Runnable { + + private static final Logger LOG = LoggerFactory.getLogger(CacheActionDispatcher.class); + + /** + * What's the length of the queue. + */ + static final int MAX_SIZE = 1024; + + /** + * How many actions remove once the queue is longer than {@link #MAX_SIZE}. + */ + static final int ACTIONS_TO_REMOVE = 256; + + final BlockingQueue> queue = new ArrayBlockingQueue>(MAX_SIZE * 2); + + private volatile boolean isRunning = true; + + @Override + public void run() { + while (isRunning) { + try { + CacheAction action = queue.poll(10, TimeUnit.MILLISECONDS); + if (action != null && isRunning) { + action.execute(); + } + } catch (InterruptedException e) { + LOG.debug("Interrupted the queue.poll()", e); + } + } + applyInvalidateActions(); + } + + /** + * Stop the processing. + */ + public void stop() { + isRunning = false; + } + + /** + * Adds the new action and cleans the queue if necessary. + * + * @param action to be added + */ + synchronized void add(CacheAction action) { + if (queue.size() >= MAX_SIZE) { + cleanTheQueue(); + } + queue.offer(action); + } + + /** + * Clean the queue and add a single invalidate action for all the removed entries. + */ + @SuppressWarnings({ "rawtypes", "unchecked" }) + private void cleanTheQueue() { + List removed = removeOldest(); + for (Entry> e : groupByOwner(removed).entrySet()) { + CacheWriteQueue owner = e.getKey(); + Collection actions = e.getValue(); + List affectedKeys = cancelAll(actions); + owner.addInvalidate(affectedKeys); + } + } + + /** + * Remove {@link #ACTIONS_TO_REMOVE} oldest actions. + * + * @return A list of removed items. + */ + @SuppressWarnings("rawtypes") + private List removeOldest() { + List removed = new ArrayList(); + while (queue.size() > MAX_SIZE - ACTIONS_TO_REMOVE) { + CacheAction toBeCanceled = queue.poll(); + if (toBeCanceled == null) { + break; + } else { + removed.add(toBeCanceled); + } + } + return removed; + } + + /** + * Group passed actions by their owners. + * + * @param actions to be grouped + * @return map in which owner is the key and assigned action list is the value + */ + @SuppressWarnings("rawtypes") + private Map> groupByOwner(List actions) { + return index(actions, new Function() { + @Override + public CacheWriteQueue apply(CacheAction input) { + return input.getOwner(); + } + }).asMap(); + } + + /** + * Cancel all passed actions. + * + * @param actions to cancel + * @return list of affected keys + */ + @SuppressWarnings({ "unchecked", "rawtypes" }) + private List cancelAll(Collection actions) { + List cancelledKeys = new ArrayList(); + for (CacheAction action : actions) { + action.cancel(); + Iterables.addAll(cancelledKeys, action.getAffectedKeys()); + } + return cancelledKeys; + } + + @SuppressWarnings("rawtypes") + private void applyInvalidateActions() { + CacheAction action; + do { + action = queue.poll(); + if (action instanceof InvalidateCacheAction) { + action.execute(); + } + } while (action != null); + } + +} \ No newline at end of file diff --git a/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/persistentCache/async/CacheWriteQueue.java b/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/persistentCache/async/CacheWriteQueue.java new file mode 100644 index 0000000..01a0a27 --- /dev/null +++ b/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/persistentCache/async/CacheWriteQueue.java @@ -0,0 +1,110 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.jackrabbit.oak.plugins.document.persistentCache.async; + +import java.util.HashSet; +import java.util.Map; +import java.util.Set; + +import org.apache.jackrabbit.oak.plugins.document.persistentCache.PersistentCache; + +import com.google.common.collect.HashMultiset; +import com.google.common.collect.Multiset; + +/** + * A fronted for the {@link CacheActionDispatcher} creating actions and maintaining their state. + * + * @param key type + * @param value type + */ +public class CacheWriteQueue { + + private final CacheActionDispatcher dispatcher; + + private final PersistentCache cache; + + private final Map map; + + final Multiset queuedKeys = HashMultiset.create(); + + final Set waitsForInvalidation = new HashSet(); + + public CacheWriteQueue(CacheActionDispatcher dispatcher, PersistentCache cache, Map map) { + this.dispatcher = dispatcher; + this.cache = cache; + this.map = map; + } + + /** + * Add new invalidate action. + * + * @param keys to be invalidated + */ + public void addInvalidate(Iterable keys) { + synchronized(this) { + for (K key : keys) { + queuedKeys.add(key); + waitsForInvalidation.add(key); + } + } + dispatcher.add(new InvalidateCacheAction(this, keys)); + } + + /** + * Add new put action + * + * @param key to be put to cache + * @param value to be put to cache + */ + public void addPut(K key, V value) { + synchronized(this) { + queuedKeys.add(key); + waitsForInvalidation.remove(key); + } + dispatcher.add(new PutToCacheAction(this, key, value)); + } + + /** + * Check if the last action added for this key was invalidate + * + * @param key to check + * @return {@code true} if the last added action was invalidate + */ + public synchronized boolean waitsForInvalidation(K key) { + return waitsForInvalidation.contains(key); + } + + /** + * Remove the action state when it's finished or cancelled. + * + * @param key to be removed + */ + synchronized void remove(K key) { + queuedKeys.remove(key); + if (!queuedKeys.contains(key)) { + waitsForInvalidation.remove(key); + } + } + + PersistentCache getCache() { + return cache; + } + + Map getMap() { + return map; + } +} \ No newline at end of file diff --git a/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/persistentCache/async/InvalidateCacheAction.java b/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/persistentCache/async/InvalidateCacheAction.java new file mode 100644 index 0000000..e4d145c --- /dev/null +++ b/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/persistentCache/async/InvalidateCacheAction.java @@ -0,0 +1,80 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.jackrabbit.oak.plugins.document.persistentCache.async; + +import java.util.Map; + +import org.apache.jackrabbit.oak.plugins.document.persistentCache.PersistentCache; + +/** + * An invalidate cache action. + * + * @param key type + * @param value type + */ +class InvalidateCacheAction implements CacheAction { + + private final PersistentCache cache; + + private final Map map; + + private final CacheWriteQueue owner; + + private final Iterable keys; + + InvalidateCacheAction(CacheWriteQueue cacheWriteQueue, Iterable keys) { + this.owner = cacheWriteQueue; + this.keys = keys; + this.cache = cacheWriteQueue.getCache(); + this.map = cacheWriteQueue.getMap(); + } + + @Override + public void execute() { + try { + if (map != null) { + for (K key : keys) { + cache.switchGenerationIfNeeded(); + map.remove(key); + } + } + } finally { + decrement(); + } + } + + @Override + public void cancel() { + decrement(); + } + + @Override + public CacheWriteQueue getOwner() { + return owner; + } + + @Override + public Iterable getAffectedKeys() { + return keys; + } + + private void decrement() { + for (K key : keys) { + owner.remove(key); + } + } +} \ No newline at end of file diff --git a/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/persistentCache/async/PutToCacheAction.java b/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/persistentCache/async/PutToCacheAction.java new file mode 100644 index 0000000..586693e --- /dev/null +++ b/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/persistentCache/async/PutToCacheAction.java @@ -0,0 +1,81 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.jackrabbit.oak.plugins.document.persistentCache.async; + +import static java.util.Collections.singleton; + +import java.util.Map; + +import org.apache.jackrabbit.oak.plugins.document.persistentCache.PersistentCache; + +/** + * Put to cache action + * + * @param key type + * @param value type + */ +class PutToCacheAction implements CacheAction { + + private final PersistentCache cache; + + private final Map map; + + private final CacheWriteQueue owner; + + private final K key; + + private final V value; + + PutToCacheAction(CacheWriteQueue cacheWriteQueue, K key, V value) { + this.owner = cacheWriteQueue; + this.key = key; + this.value = value; + this.cache = cacheWriteQueue.getCache(); + this.map = cacheWriteQueue.getMap(); + } + + @Override + public void execute() { + try { + if (map != null) { + cache.switchGenerationIfNeeded(); + map.put(key, value); + } + } finally { + decrement(); + } + } + + @Override + public void cancel() { + decrement(); + } + + @Override + public CacheWriteQueue getOwner() { + return owner; + } + + @Override + public Iterable getAffectedKeys() { + return singleton(key); + } + + private void decrement() { + owner.remove(key); + } +} \ No newline at end of file diff --git a/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/persistentCache/async/CacheActionDispatcherTest.java b/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/persistentCache/async/CacheActionDispatcherTest.java new file mode 100644 index 0000000..d35f5c6 --- /dev/null +++ b/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/persistentCache/async/CacheActionDispatcherTest.java @@ -0,0 +1,209 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.jackrabbit.oak.plugins.document.persistentCache.async; + +import static com.google.common.collect.ImmutableSet.of; +import static com.google.common.collect.Iterables.size; +import static java.lang.String.valueOf; +import static java.lang.System.currentTimeMillis; +import static java.lang.Thread.sleep; +import static org.apache.jackrabbit.oak.plugins.document.persistentCache.async.CacheActionDispatcher.ACTIONS_TO_REMOVE; +import static org.apache.jackrabbit.oak.plugins.document.persistentCache.async.CacheActionDispatcher.MAX_SIZE; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; +import static org.mockito.Mockito.mock; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.Random; + +import org.apache.jackrabbit.oak.plugins.document.persistentCache.PersistentCache; +import org.apache.jackrabbit.oak.plugins.document.persistentCache.async.CacheAction; +import org.apache.jackrabbit.oak.plugins.document.persistentCache.async.CacheActionDispatcher; +import org.apache.jackrabbit.oak.plugins.document.persistentCache.async.CacheWriteQueue; +import org.apache.jackrabbit.oak.plugins.document.persistentCache.async.InvalidateCacheAction; +import org.junit.Test; +import org.mockito.Mockito; + +public class CacheActionDispatcherTest { + + @Test + public void testMaxQueueSize() { + CacheActionDispatcher dispatcher = new CacheActionDispatcher(); + @SuppressWarnings({ "unchecked", "rawtypes" }) + CacheWriteQueue queue = new CacheWriteQueue(dispatcher, mock(PersistentCache.class), null); + + for (int i = 0; i < MAX_SIZE + 10; i++) { + dispatcher.add(createWriteAction(valueOf(i), queue)); + } + assertEquals(MAX_SIZE - ACTIONS_TO_REMOVE + 10 + 1, dispatcher.queue.size()); + assertEquals(valueOf(ACTIONS_TO_REMOVE), dispatcher.queue.peek().toString()); + + InvalidateCacheAction invalidateAction = null; + for (CacheAction action : dispatcher.queue) { + if (action instanceof InvalidateCacheAction) { + invalidateAction = (InvalidateCacheAction) action; + } + } + assertNotNull(invalidateAction); + assertEquals(ACTIONS_TO_REMOVE, size(invalidateAction.getAffectedKeys())); + } + + @Test + public void testQueue() throws InterruptedException { + int threads = 5; + int actionsPerThread = 100; + + @SuppressWarnings("unchecked") + CacheWriteQueue queue = Mockito.mock(CacheWriteQueue.class); + final CacheActionDispatcher dispatcher = new CacheActionDispatcher(); + Thread queueThread = new Thread(dispatcher); + queueThread.start(); + + List allActions = new ArrayList(); + List producerThreads = new ArrayList(); + for (int i = 0; i < threads; i++) { + final List threadActions = new ArrayList(); + for (int j = 0; j < actionsPerThread; j++) { + DummyCacheWriteAction action = new DummyCacheWriteAction(String.format("%d_%d", i, j), queue); + threadActions.add(action); + allActions.add(action); + } + Thread t = new Thread(new Runnable() { + @Override + public void run() { + for (DummyCacheWriteAction a : threadActions) { + dispatcher.add(a); + } + } + }); + producerThreads.add(t); + } + + for (Thread t : producerThreads) { + t.start(); + } + for (Thread t : producerThreads) { + t.join(); + } + + long start = currentTimeMillis(); + while (!allActions.isEmpty()) { + Iterator it = allActions.iterator(); + while (it.hasNext()) { + if (it.next().finished) { + it.remove(); + } + } + if (currentTimeMillis() - start > 10000) { + fail("Following actions hasn't been executed: " + allActions); + } + } + + dispatcher.stop(); + queueThread.join(); + assertFalse(queueThread.isAlive()); + } + + @Test + public void testExecuteInvalidatesOnShutdown() throws InterruptedException { + Map cacheMap = new HashMap(); + CacheActionDispatcher dispatcher = new CacheActionDispatcher(); + CacheWriteQueue queue = new CacheWriteQueue(dispatcher, + Mockito.mock(PersistentCache.class), cacheMap); + Thread queueThread = new Thread(dispatcher); + queueThread.start(); + + cacheMap.put("2", new Object()); + cacheMap.put("3", new Object()); + cacheMap.put("4", new Object()); + dispatcher.add(new DummyCacheWriteAction("1", queue, 100)); + dispatcher.add(new InvalidateCacheAction(queue, Collections.singleton("2"))); + dispatcher.add(new InvalidateCacheAction(queue, Collections.singleton("3"))); + dispatcher.add(new InvalidateCacheAction(queue, Collections.singleton("4"))); + Thread.sleep(10); // make sure the first action started + + dispatcher.stop(); + assertEquals(of("2", "3", "4"), cacheMap.keySet()); + + queueThread.join(); + assertTrue(cacheMap.isEmpty()); + } + + private DummyCacheWriteAction createWriteAction(String id, CacheWriteQueue queue) { + return new DummyCacheWriteAction(id, queue); + } + + private class DummyCacheWriteAction implements CacheAction { + + private final CacheWriteQueue queue; + + private final String id; + + private final long delay; + + private volatile boolean finished; + + private DummyCacheWriteAction(String id, CacheWriteQueue queue) { + this(id, queue, new Random().nextInt(10)); + } + + private DummyCacheWriteAction(String id, CacheWriteQueue queue, long delay) { + this.queue = queue; + this.id = id; + this.delay = delay; + } + + @Override + public void execute() { + try { + sleep(delay); + } catch (InterruptedException e) { + fail("Interrupted"); + } + finished = true; + } + + @Override + public void cancel() { + } + + @Override + public String toString() { + return id; + } + + @Override + public Iterable getAffectedKeys() { + return Collections.singleton(id); + } + + @Override + public CacheWriteQueue getOwner() { + return queue; + } + } +} diff --git a/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/persistentCache/async/CacheWriteQueueTest.java b/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/persistentCache/async/CacheWriteQueueTest.java new file mode 100644 index 0000000..34a9dd0 --- /dev/null +++ b/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/persistentCache/async/CacheWriteQueueTest.java @@ -0,0 +1,140 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.jackrabbit.oak.plugins.document.persistentCache.async; + +import static java.util.Collections.singleton; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Random; +import java.util.concurrent.atomic.AtomicInteger; + +import org.apache.jackrabbit.oak.plugins.document.persistentCache.PersistentCache; +import org.apache.jackrabbit.oak.plugins.document.persistentCache.async.CacheAction; +import org.apache.jackrabbit.oak.plugins.document.persistentCache.async.CacheActionDispatcher; +import org.apache.jackrabbit.oak.plugins.document.persistentCache.async.CacheWriteQueue; +import org.junit.Before; +import org.junit.Test; +import org.mockito.Mockito; + +public class CacheWriteQueueTest { + + private CacheWriteQueue queue; + + @SuppressWarnings("rawtypes") + private List actions = Collections.synchronizedList(new ArrayList()); + + @Before + public void initQueue() { + actions.clear(); + + CacheActionDispatcher dispatcher = new CacheActionDispatcher() { + public void add(CacheAction action) { + actions.add(action); + } + }; + + PersistentCache cache = Mockito.mock(PersistentCache.class); + queue = new CacheWriteQueue(dispatcher, cache, null); + } + + @Test + public void testCounters() throws InterruptedException { + final int threadCount = 10; + final int actionsPerThread = 50; + + final Map counters = new HashMap(); + for (int i = 0; i < 10; i++) { + String key = "key_" + i; + counters.put(key, new AtomicInteger()); + } + + final Random random = new Random(); + List threads = new ArrayList(); + for (int i = 0; i < threadCount; i++) { + Thread t = new Thread(new Runnable() { + @Override + public void run() { + for (int j = 0; j < actionsPerThread; j++) { + for (String key : counters.keySet()) { + if (random.nextBoolean()) { + queue.addPut(key, null); + } else { + queue.addPut(key, new Object()); + } + counters.get(key).incrementAndGet(); + } + } + } + }); + threads.add(t); + } + + for (Thread t : threads) { + t.start(); + } + for (Thread t : threads) { + t.join(); + } + for (String key : counters.keySet()) { + assertEquals(queue.queuedKeys.count(key), counters.get(key).get()); + } + + for (CacheAction action : actions) { + if (random.nextBoolean()) { + action.execute(); + } else { + action.cancel(); + } + } + + assertTrue(queue.queuedKeys.isEmpty()); + assertTrue(queue.waitsForInvalidation.isEmpty()); + } + + @Test + public void testWaitsForInvalidation() { + assertFalse(queue.waitsForInvalidation("key")); + + queue.addInvalidate(singleton("key")); + assertTrue(queue.waitsForInvalidation("key")); + + queue.addPut("key", new Object()); + assertFalse(queue.waitsForInvalidation("key")); + + queue.addInvalidate(singleton("key")); + assertTrue(queue.waitsForInvalidation("key")); + + int i; + for (i = 0; i < actions.size() - 1; i++) { + actions.get(i).execute(); + assertTrue(queue.waitsForInvalidation("key")); + } + + actions.get(i).execute(); + assertFalse(queue.waitsForInvalidation("key")); + } + +}