diff --git a/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/DocumentMK.java b/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/DocumentMK.java index 9c95287..94b982c 100644 --- a/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/DocumentMK.java +++ b/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/DocumentMK.java @@ -17,11 +17,17 @@ package org.apache.jackrabbit.oak.plugins.document; import static com.google.common.base.Preconditions.checkArgument; +import static com.google.common.cache.RemovalCause.COLLECTED; +import static com.google.common.cache.RemovalCause.EXPIRED; +import static com.google.common.cache.RemovalCause.SIZE; import java.io.InputStream; +import java.lang.ref.Reference; import java.net.UnknownHostException; +import java.util.EnumSet; import java.util.Iterator; import java.util.Set; +import java.util.concurrent.CopyOnWriteArraySet; import java.util.concurrent.Executor; import java.util.concurrent.TimeUnit; @@ -32,12 +38,16 @@ import javax.sql.DataSource; import com.google.common.cache.Cache; import com.google.common.cache.CacheBuilder; +import com.google.common.cache.RemovalCause; +import com.google.common.cache.RemovalListener; +import com.google.common.cache.RemovalNotification; import com.google.common.cache.Weigher; import com.google.common.collect.Sets; import com.google.common.util.concurrent.MoreExecutors; import com.mongodb.DB; import org.apache.jackrabbit.oak.api.CommitFailedException; import org.apache.jackrabbit.oak.cache.CacheLIRS; +import org.apache.jackrabbit.oak.cache.CacheLIRS.EvictionCallback; import org.apache.jackrabbit.oak.cache.CacheStats; import org.apache.jackrabbit.oak.cache.CacheValue; import org.apache.jackrabbit.oak.cache.EmpiricalWeigher; @@ -59,6 +69,7 @@ import org.apache.jackrabbit.oak.plugins.document.mongo.MongoDocumentStore; import org.apache.jackrabbit.oak.plugins.document.mongo.MongoMissingLastRevSeeker; import org.apache.jackrabbit.oak.plugins.document.mongo.MongoVersionGCSupport; import org.apache.jackrabbit.oak.plugins.document.persistentCache.CacheType; +import org.apache.jackrabbit.oak.plugins.document.persistentCache.EvictionListener; import org.apache.jackrabbit.oak.plugins.document.persistentCache.PersistentCache; import org.apache.jackrabbit.oak.plugins.document.rdb.RDBBlobStore; import org.apache.jackrabbit.oak.plugins.document.rdb.RDBDocumentStore; @@ -1008,19 +1019,24 @@ public class DocumentMK { return new NodeDocumentCache(nodeDocumentsCache, nodeDocumentsCacheStats, prevDocumentsCache, prevDocumentsCacheStats, locks); } + @SuppressWarnings("unchecked") private Cache buildCache( CacheType cacheType, long maxWeight, DocumentNodeStore docNodeStore, DocumentStore docStore ) { - Cache cache = buildCache(cacheType.name(), maxWeight); + Set> listeners = new CopyOnWriteArraySet>(); + Cache cache = buildCache(cacheType.name(), maxWeight, listeners); PersistentCache p = getPersistentCache(); if (p != null) { if (docNodeStore != null) { docNodeStore.setPersistentCache(p); } cache = p.wrap(docNodeStore, docStore, cache, cacheType); + if (cache instanceof EvictionListener) { + listeners.add((EvictionListener) cache); + } } return cache; } @@ -1042,7 +1058,8 @@ public class DocumentMK { private Cache buildCache( String module, - long maxWeight) { + long maxWeight, + final Set> listeners) { // by default, use the LIRS cache when using the persistent cache, // but don't use it otherwise boolean useLirs = persistentCacheURI != null; @@ -1064,6 +1081,14 @@ public class DocumentMK { segmentCount(cacheSegmentCount). stackMoveDistance(cacheStackMoveDistance). recordStats(). + evictionCallback(new EvictionCallback() { + @Override + public void evicted(K key, V value, RemovalCause cause) { + for (EvictionListener l : listeners) { + l.evicted(key, value, cause); + } + } + }). build(); } return CacheBuilder.newBuilder(). @@ -1071,6 +1096,14 @@ public class DocumentMK { weigher(weigher). maximumWeight(maxWeight). recordStats(). + removalListener(new RemovalListener() { + @Override + public void onRemoval(RemovalNotification notification) { + for (EvictionListener l : listeners) { + l.evicted(notification.getKey(), notification.getValue(), notification.getCause()); + } + } + }). build(); } diff --git a/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/persistentCache/EvictionListener.java b/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/persistentCache/EvictionListener.java new file mode 100644 index 0000000..fa162ed --- /dev/null +++ b/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/persistentCache/EvictionListener.java @@ -0,0 +1,25 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.jackrabbit.oak.plugins.document.persistentCache; + +import com.google.common.cache.RemovalCause; + +public interface EvictionListener { + + void evicted(K key, V value, RemovalCause removalCause); + +} diff --git a/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/persistentCache/NodeCache.java b/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/persistentCache/NodeCache.java index 8903471..0759771 100644 --- a/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/persistentCache/NodeCache.java +++ b/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/persistentCache/NodeCache.java @@ -16,8 +16,14 @@ */ package org.apache.jackrabbit.oak.plugins.document.persistentCache; +import static com.google.common.cache.RemovalCause.COLLECTED; +import static com.google.common.cache.RemovalCause.EXPIRED; +import static com.google.common.cache.RemovalCause.SIZE; +import static java.util.Collections.singleton; + import java.nio.ByteBuffer; import java.util.Map; +import java.util.Set; import java.util.concurrent.Callable; import java.util.concurrent.ConcurrentMap; import java.util.concurrent.ExecutionException; @@ -27,6 +33,8 @@ import javax.annotation.Nullable; import org.apache.jackrabbit.oak.plugins.document.DocumentNodeStore; import org.apache.jackrabbit.oak.plugins.document.DocumentStore; import org.apache.jackrabbit.oak.plugins.document.persistentCache.PersistentCache.GenerationCache; +import org.apache.jackrabbit.oak.plugins.document.persistentCache.async.CacheActionDispatcher; +import org.apache.jackrabbit.oak.plugins.document.persistentCache.async.CacheWriteQueue; import org.h2.mvstore.MVMap; import org.h2.mvstore.WriteBuffer; import org.h2.mvstore.type.DataType; @@ -34,22 +42,29 @@ import org.h2.mvstore.type.DataType; import com.google.common.base.Function; import com.google.common.cache.Cache; import com.google.common.cache.CacheStats; +import com.google.common.cache.RemovalCause; import com.google.common.collect.ImmutableMap; +import com.google.common.collect.ImmutableSet; + +class NodeCache implements Cache, GenerationCache, EvictionListener { + + private static final Set EVICTION_CAUSES = ImmutableSet.of(COLLECTED, EXPIRED, SIZE); -class NodeCache implements Cache, GenerationCache { - private final PersistentCache cache; private final Cache memCache; private final MultiGenerationMap map; private final CacheType type; private final DataType keyType; private final DataType valueType; - + private final CacheWriteQueue writerQueue; + NodeCache( PersistentCache cache, Cache memCache, DocumentNodeStore docNodeStore, - DocumentStore docStore, CacheType type) { + DocumentStore docStore, + CacheType type, + CacheActionDispatcher dispatcher) { this.cache = cache; this.memCache = memCache; this.type = type; @@ -57,6 +72,7 @@ class NodeCache implements Cache, GenerationCache { map = new MultiGenerationMap(); keyType = new KeyDataType(type); valueType = new ValueDataType(docNodeStore, docStore, type); + this.writerQueue = new CacheWriteQueue(dispatcher, cache, map); } @Override @@ -82,47 +98,31 @@ class NodeCache implements Cache, GenerationCache { } private V readIfPresent(K key) { + if (writerQueue.waitsForInvalidation(key)) { + return null; + } cache.switchGenerationIfNeeded(); V v = map.get(key); return v; } - - private void write(final K key, final V value) { - write(key, value, true); - } - - private void writeWithoutBroadcast(final K key, final V value) { - write(key, value, false); - } - private void write(final K key, final V value, boolean broadcast) { - cache.switchGenerationIfNeeded(); - if (broadcast) { - cache.broadcast(type, new Function() { - @Override - @Nullable - public Void apply(@Nullable WriteBuffer buffer) { - keyType.write(buffer, key); - if (value == null) { - buffer.put((byte) 0); - } else { - buffer.put((byte) 1); - valueType.write(buffer, value); - } - return null; + private void broadcast(final K key, final V value) { + cache.broadcast(type, new Function() { + @Override + @Nullable + public Void apply(@Nullable WriteBuffer buffer) { + keyType.write(buffer, key); + if (value == null) { + buffer.put((byte) 0); + } else { + buffer.put((byte) 1); + valueType.write(buffer, value); } - }); - } - MultiGenerationMap m = map; - if (m != null) { - if (value == null) { - m.remove(key); - } else { - m.put(key, value); + return null; } - } + }); } - + @SuppressWarnings("unchecked") @Override @Nullable @@ -147,7 +147,7 @@ class NodeCache implements Cache, GenerationCache { return value; } value = memCache.get(key, valueLoader); - write(key, value); + broadcast(key, value); return value; } @@ -160,14 +160,15 @@ class NodeCache implements Cache, GenerationCache { @Override public void put(K key, V value) { memCache.put(key, value); - write(key, value); + broadcast(key, value); } @SuppressWarnings("unchecked") @Override public void invalidate(Object key) { memCache.invalidate(key); - write((K) key, (V) null); + writerQueue.addInvalidate(singleton((K) key)); + broadcast((K) key, null); } @Override @@ -218,7 +219,16 @@ class NodeCache implements Cache, GenerationCache { value = (V) valueType.read(buff); memCache.put(key, value); } - writeWithoutBroadcast(key, value); + } + + /** + * Invoked on the eviction from the {@link #memCache} + */ + @Override + public void evicted(K key, V value, RemovalCause cause) { + if (EVICTION_CAUSES.contains(cause) && value != null) { // invalidations are handled separately + writerQueue.addPut(key, value); + } } } \ No newline at end of file diff --git a/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/persistentCache/PersistentCache.java b/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/persistentCache/PersistentCache.java index aa0f7ce..1086b68 100644 --- a/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/persistentCache/PersistentCache.java +++ b/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/persistentCache/PersistentCache.java @@ -28,6 +28,7 @@ import java.util.concurrent.atomic.AtomicInteger; import org.apache.jackrabbit.oak.plugins.document.DocumentNodeStore; import org.apache.jackrabbit.oak.plugins.document.DocumentStore; import org.apache.jackrabbit.oak.plugins.document.persistentCache.broadcast.DynamicBroadcastConfig; +import org.apache.jackrabbit.oak.plugins.document.persistentCache.async.CacheActionDispatcher; import org.apache.jackrabbit.oak.plugins.document.persistentCache.broadcast.Broadcaster; import org.apache.jackrabbit.oak.plugins.document.persistentCache.broadcast.InMemoryBroadcaster; import org.apache.jackrabbit.oak.plugins.document.persistentCache.broadcast.TCPBroadcaster; @@ -82,6 +83,8 @@ public class PersistentCache implements Broadcaster.Listener { private ThreadLocal writeBuffer = new ThreadLocal(); private final byte[] broadcastId; private DynamicBroadcastConfig broadcastConfig; + private CacheActionDispatcher writeDispatcher; + private Thread writeDispatcherThread; { ByteBuffer bb = ByteBuffer.wrap(new byte[16]); @@ -192,6 +195,11 @@ public class PersistentCache implements Broadcaster.Listener { } writeStore = createMapFactory(writeGeneration, false); initBroadcast(broadcast); + + writeDispatcher = new CacheActionDispatcher(); + writeDispatcherThread = new Thread(writeDispatcher, "Oak CacheWriteQueue"); + writeDispatcherThread.setDaemon(true); + writeDispatcherThread.start(); } private void initBroadcast(String broadcast) { @@ -338,6 +346,13 @@ public class PersistentCache implements Broadcaster.Listener { } public void close() { + writeDispatcher.stop(); + try { + writeDispatcherThread.join(); + } catch (InterruptedException e) { + LOG.error("Can't join the {}", writeDispatcherThread.getName(), e); + } + if (writeStore != null) { writeStore.closeStore(); } @@ -395,7 +410,7 @@ public class PersistentCache implements Broadcaster.Listener { } if (wrap) { NodeCache c = new NodeCache(this, - base, docNodeStore, docStore, type); + base, docNodeStore, docStore, type, writeDispatcher); initGenerationCache(c); return c; } diff --git a/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/persistentCache/async/CacheAction.java b/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/persistentCache/async/CacheAction.java new file mode 100644 index 0000000..26078e1 --- /dev/null +++ b/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/persistentCache/async/CacheAction.java @@ -0,0 +1,51 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.jackrabbit.oak.plugins.document.persistentCache.async; + +/** + * Object represents an action on the cache (eg. put or invalidate). + * + * @param key type + * @param value type + */ +interface CacheAction { + + /** + * Execute the action + */ + void execute(); + + /** + * Cancel the action without executing it + */ + void cancel(); + + /** + * Return the keys affected by this action + * + * @return keys affected by this action + */ + Iterable getAffectedKeys(); + + /** + * Return the owner of this action + * + * @return {@link CacheWriteQueue} executing this action + */ + CacheWriteQueue getOwner(); + +} diff --git a/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/persistentCache/async/CacheActionDispatcher.java b/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/persistentCache/async/CacheActionDispatcher.java new file mode 100644 index 0000000..49897e4 --- /dev/null +++ b/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/persistentCache/async/CacheActionDispatcher.java @@ -0,0 +1,169 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.jackrabbit.oak.plugins.document.persistentCache.async; + +import static com.google.common.collect.Multimaps.index; + +import java.util.ArrayList; +import java.util.Collection; +import java.util.List; +import java.util.Map; +import java.util.Map.Entry; +import java.util.concurrent.ArrayBlockingQueue; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.TimeUnit; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.google.common.base.Function; +import com.google.common.collect.Iterables; + +/** + * An asynchronous buffer of the CacheAction objects. The buffer removes + * {@link #ACTIONS_TO_REMOVE} oldest entries if the queue length is larger than + * {@link #MAX_SIZE}. + */ +public class CacheActionDispatcher implements Runnable { + + private static final Logger LOG = LoggerFactory.getLogger(CacheActionDispatcher.class); + + /** + * What's the length of the queue. + */ + static final int MAX_SIZE = 1024; + + /** + * How many actions remove once the queue is longer than {@link #MAX_SIZE}. + */ + static final int ACTIONS_TO_REMOVE = 256; + + final BlockingQueue> queue = new ArrayBlockingQueue>(MAX_SIZE * 2); + + private volatile boolean isRunning = true; + + @Override + public void run() { + while (isRunning) { + try { + CacheAction action = queue.poll(10, TimeUnit.MILLISECONDS); + if (action != null && isRunning) { + action.execute(); + } + } catch (InterruptedException e) { + LOG.debug("Interrupted the queue.poll()", e); + } + } + applyInvalidateActions(); + } + + /** + * Stop the processing. + */ + public void stop() { + isRunning = false; + } + + /** + * Adds the new action and cleans the queue if necessary. + * + * @param action to be added + */ + synchronized void add(CacheAction action) { + if (queue.size() >= MAX_SIZE) { + cleanTheQueue(); + } + queue.offer(action); + } + + /** + * Clean the queue and add a single invalidate action for all the removed entries. + */ + @SuppressWarnings({ "rawtypes", "unchecked" }) + private void cleanTheQueue() { + List removed = removeOldest(); + for (Entry> e : groupByOwner(removed).entrySet()) { + CacheWriteQueue owner = e.getKey(); + Collection actions = e.getValue(); + List affectedKeys = cancelAll(actions); + owner.addInvalidate(affectedKeys); + } + } + + /** + * Remove {@link #ACTIONS_TO_REMOVE} oldest actions. + * + * @return A list of removed items. + */ + @SuppressWarnings("rawtypes") + private List removeOldest() { + List removed = new ArrayList(); + while (queue.size() > MAX_SIZE - ACTIONS_TO_REMOVE) { + CacheAction toBeCanceled = queue.poll(); + if (toBeCanceled == null) { + break; + } else { + removed.add(toBeCanceled); + } + } + return removed; + } + + /** + * Group passed actions by their owners. + * + * @param actions to be grouped + * @return map in which owner is the key and assigned action list is the value + */ + @SuppressWarnings("rawtypes") + private Map> groupByOwner(List actions) { + return index(actions, new Function() { + @Override + public CacheWriteQueue apply(CacheAction input) { + return input.getOwner(); + } + }).asMap(); + } + + /** + * Cancel all passed actions. + * + * @param actions to cancel + * @return list of affected keys + */ + @SuppressWarnings({ "unchecked", "rawtypes" }) + private List cancelAll(Collection actions) { + List cancelledKeys = new ArrayList(); + for (CacheAction action : actions) { + action.cancel(); + Iterables.addAll(cancelledKeys, action.getAffectedKeys()); + } + return cancelledKeys; + } + + @SuppressWarnings("rawtypes") + private void applyInvalidateActions() { + CacheAction action; + do { + action = queue.poll(); + if (action instanceof InvalidateCacheAction) { + action.execute(); + } + } while (action != null); + } + +} \ No newline at end of file diff --git a/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/persistentCache/async/CacheWriteQueue.java b/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/persistentCache/async/CacheWriteQueue.java new file mode 100644 index 0000000..01a0a27 --- /dev/null +++ b/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/persistentCache/async/CacheWriteQueue.java @@ -0,0 +1,110 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.jackrabbit.oak.plugins.document.persistentCache.async; + +import java.util.HashSet; +import java.util.Map; +import java.util.Set; + +import org.apache.jackrabbit.oak.plugins.document.persistentCache.PersistentCache; + +import com.google.common.collect.HashMultiset; +import com.google.common.collect.Multiset; + +/** + * A fronted for the {@link CacheActionDispatcher} creating actions and maintaining their state. + * + * @param key type + * @param value type + */ +public class CacheWriteQueue { + + private final CacheActionDispatcher dispatcher; + + private final PersistentCache cache; + + private final Map map; + + final Multiset queuedKeys = HashMultiset.create(); + + final Set waitsForInvalidation = new HashSet(); + + public CacheWriteQueue(CacheActionDispatcher dispatcher, PersistentCache cache, Map map) { + this.dispatcher = dispatcher; + this.cache = cache; + this.map = map; + } + + /** + * Add new invalidate action. + * + * @param keys to be invalidated + */ + public void addInvalidate(Iterable keys) { + synchronized(this) { + for (K key : keys) { + queuedKeys.add(key); + waitsForInvalidation.add(key); + } + } + dispatcher.add(new InvalidateCacheAction(this, keys)); + } + + /** + * Add new put action + * + * @param key to be put to cache + * @param value to be put to cache + */ + public void addPut(K key, V value) { + synchronized(this) { + queuedKeys.add(key); + waitsForInvalidation.remove(key); + } + dispatcher.add(new PutToCacheAction(this, key, value)); + } + + /** + * Check if the last action added for this key was invalidate + * + * @param key to check + * @return {@code true} if the last added action was invalidate + */ + public synchronized boolean waitsForInvalidation(K key) { + return waitsForInvalidation.contains(key); + } + + /** + * Remove the action state when it's finished or cancelled. + * + * @param key to be removed + */ + synchronized void remove(K key) { + queuedKeys.remove(key); + if (!queuedKeys.contains(key)) { + waitsForInvalidation.remove(key); + } + } + + PersistentCache getCache() { + return cache; + } + + Map getMap() { + return map; + } +} \ No newline at end of file diff --git a/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/persistentCache/async/InvalidateCacheAction.java b/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/persistentCache/async/InvalidateCacheAction.java new file mode 100644 index 0000000..e4d145c --- /dev/null +++ b/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/persistentCache/async/InvalidateCacheAction.java @@ -0,0 +1,80 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.jackrabbit.oak.plugins.document.persistentCache.async; + +import java.util.Map; + +import org.apache.jackrabbit.oak.plugins.document.persistentCache.PersistentCache; + +/** + * An invalidate cache action. + * + * @param key type + * @param value type + */ +class InvalidateCacheAction implements CacheAction { + + private final PersistentCache cache; + + private final Map map; + + private final CacheWriteQueue owner; + + private final Iterable keys; + + InvalidateCacheAction(CacheWriteQueue cacheWriteQueue, Iterable keys) { + this.owner = cacheWriteQueue; + this.keys = keys; + this.cache = cacheWriteQueue.getCache(); + this.map = cacheWriteQueue.getMap(); + } + + @Override + public void execute() { + try { + if (map != null) { + for (K key : keys) { + cache.switchGenerationIfNeeded(); + map.remove(key); + } + } + } finally { + decrement(); + } + } + + @Override + public void cancel() { + decrement(); + } + + @Override + public CacheWriteQueue getOwner() { + return owner; + } + + @Override + public Iterable getAffectedKeys() { + return keys; + } + + private void decrement() { + for (K key : keys) { + owner.remove(key); + } + } +} \ No newline at end of file diff --git a/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/persistentCache/async/PutToCacheAction.java b/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/persistentCache/async/PutToCacheAction.java new file mode 100644 index 0000000..586693e --- /dev/null +++ b/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/persistentCache/async/PutToCacheAction.java @@ -0,0 +1,81 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.jackrabbit.oak.plugins.document.persistentCache.async; + +import static java.util.Collections.singleton; + +import java.util.Map; + +import org.apache.jackrabbit.oak.plugins.document.persistentCache.PersistentCache; + +/** + * Put to cache action + * + * @param key type + * @param value type + */ +class PutToCacheAction implements CacheAction { + + private final PersistentCache cache; + + private final Map map; + + private final CacheWriteQueue owner; + + private final K key; + + private final V value; + + PutToCacheAction(CacheWriteQueue cacheWriteQueue, K key, V value) { + this.owner = cacheWriteQueue; + this.key = key; + this.value = value; + this.cache = cacheWriteQueue.getCache(); + this.map = cacheWriteQueue.getMap(); + } + + @Override + public void execute() { + try { + if (map != null) { + cache.switchGenerationIfNeeded(); + map.put(key, value); + } + } finally { + decrement(); + } + } + + @Override + public void cancel() { + decrement(); + } + + @Override + public CacheWriteQueue getOwner() { + return owner; + } + + @Override + public Iterable getAffectedKeys() { + return singleton(key); + } + + private void decrement() { + owner.remove(key); + } +} \ No newline at end of file diff --git a/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/AsyncCacheTest.java b/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/AsyncCacheTest.java new file mode 100644 index 0000000..f40ec74 --- /dev/null +++ b/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/AsyncCacheTest.java @@ -0,0 +1,51 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.jackrabbit.oak.plugins.document; + +import java.io.File; + +import com.google.common.cache.Cache; + +import org.apache.commons.io.FileUtils; +import org.junit.Test; + +import static org.junit.Assert.assertNull; + +public class AsyncCacheTest { + + @Test + public void invalidateWhileInQueue() throws Exception { + FileUtils.deleteDirectory(new File("target/cacheTest")); + DocumentMK.Builder builder = new DocumentMK.Builder(); + builder.setPersistentCache("target/cacheTest"); + Cache cache = builder.buildChildrenCache(); + DocumentNodeState.Children c = new DocumentNodeState.Children(); + for (int i = 0; i < 100; i++) { + c.children.add("node-" + i); + } + PathRev key = null; + for (int i = 0; i < 1000; i++) { + key = new PathRev("/foo/bar", new RevisionVector(new Revision(i, 0, 1))); + cache.put(key, c); + } + cache.invalidate(key); + // give the write queue some time to write back entries + Thread.sleep(200); + assertNull(cache.getIfPresent(key)); + builder.getPersistentCache().close(); + } +} diff --git a/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/persistentCache/async/CacheActionDispatcherTest.java b/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/persistentCache/async/CacheActionDispatcherTest.java new file mode 100644 index 0000000..d35f5c6 --- /dev/null +++ b/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/persistentCache/async/CacheActionDispatcherTest.java @@ -0,0 +1,209 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.jackrabbit.oak.plugins.document.persistentCache.async; + +import static com.google.common.collect.ImmutableSet.of; +import static com.google.common.collect.Iterables.size; +import static java.lang.String.valueOf; +import static java.lang.System.currentTimeMillis; +import static java.lang.Thread.sleep; +import static org.apache.jackrabbit.oak.plugins.document.persistentCache.async.CacheActionDispatcher.ACTIONS_TO_REMOVE; +import static org.apache.jackrabbit.oak.plugins.document.persistentCache.async.CacheActionDispatcher.MAX_SIZE; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; +import static org.mockito.Mockito.mock; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.Random; + +import org.apache.jackrabbit.oak.plugins.document.persistentCache.PersistentCache; +import org.apache.jackrabbit.oak.plugins.document.persistentCache.async.CacheAction; +import org.apache.jackrabbit.oak.plugins.document.persistentCache.async.CacheActionDispatcher; +import org.apache.jackrabbit.oak.plugins.document.persistentCache.async.CacheWriteQueue; +import org.apache.jackrabbit.oak.plugins.document.persistentCache.async.InvalidateCacheAction; +import org.junit.Test; +import org.mockito.Mockito; + +public class CacheActionDispatcherTest { + + @Test + public void testMaxQueueSize() { + CacheActionDispatcher dispatcher = new CacheActionDispatcher(); + @SuppressWarnings({ "unchecked", "rawtypes" }) + CacheWriteQueue queue = new CacheWriteQueue(dispatcher, mock(PersistentCache.class), null); + + for (int i = 0; i < MAX_SIZE + 10; i++) { + dispatcher.add(createWriteAction(valueOf(i), queue)); + } + assertEquals(MAX_SIZE - ACTIONS_TO_REMOVE + 10 + 1, dispatcher.queue.size()); + assertEquals(valueOf(ACTIONS_TO_REMOVE), dispatcher.queue.peek().toString()); + + InvalidateCacheAction invalidateAction = null; + for (CacheAction action : dispatcher.queue) { + if (action instanceof InvalidateCacheAction) { + invalidateAction = (InvalidateCacheAction) action; + } + } + assertNotNull(invalidateAction); + assertEquals(ACTIONS_TO_REMOVE, size(invalidateAction.getAffectedKeys())); + } + + @Test + public void testQueue() throws InterruptedException { + int threads = 5; + int actionsPerThread = 100; + + @SuppressWarnings("unchecked") + CacheWriteQueue queue = Mockito.mock(CacheWriteQueue.class); + final CacheActionDispatcher dispatcher = new CacheActionDispatcher(); + Thread queueThread = new Thread(dispatcher); + queueThread.start(); + + List allActions = new ArrayList(); + List producerThreads = new ArrayList(); + for (int i = 0; i < threads; i++) { + final List threadActions = new ArrayList(); + for (int j = 0; j < actionsPerThread; j++) { + DummyCacheWriteAction action = new DummyCacheWriteAction(String.format("%d_%d", i, j), queue); + threadActions.add(action); + allActions.add(action); + } + Thread t = new Thread(new Runnable() { + @Override + public void run() { + for (DummyCacheWriteAction a : threadActions) { + dispatcher.add(a); + } + } + }); + producerThreads.add(t); + } + + for (Thread t : producerThreads) { + t.start(); + } + for (Thread t : producerThreads) { + t.join(); + } + + long start = currentTimeMillis(); + while (!allActions.isEmpty()) { + Iterator it = allActions.iterator(); + while (it.hasNext()) { + if (it.next().finished) { + it.remove(); + } + } + if (currentTimeMillis() - start > 10000) { + fail("Following actions hasn't been executed: " + allActions); + } + } + + dispatcher.stop(); + queueThread.join(); + assertFalse(queueThread.isAlive()); + } + + @Test + public void testExecuteInvalidatesOnShutdown() throws InterruptedException { + Map cacheMap = new HashMap(); + CacheActionDispatcher dispatcher = new CacheActionDispatcher(); + CacheWriteQueue queue = new CacheWriteQueue(dispatcher, + Mockito.mock(PersistentCache.class), cacheMap); + Thread queueThread = new Thread(dispatcher); + queueThread.start(); + + cacheMap.put("2", new Object()); + cacheMap.put("3", new Object()); + cacheMap.put("4", new Object()); + dispatcher.add(new DummyCacheWriteAction("1", queue, 100)); + dispatcher.add(new InvalidateCacheAction(queue, Collections.singleton("2"))); + dispatcher.add(new InvalidateCacheAction(queue, Collections.singleton("3"))); + dispatcher.add(new InvalidateCacheAction(queue, Collections.singleton("4"))); + Thread.sleep(10); // make sure the first action started + + dispatcher.stop(); + assertEquals(of("2", "3", "4"), cacheMap.keySet()); + + queueThread.join(); + assertTrue(cacheMap.isEmpty()); + } + + private DummyCacheWriteAction createWriteAction(String id, CacheWriteQueue queue) { + return new DummyCacheWriteAction(id, queue); + } + + private class DummyCacheWriteAction implements CacheAction { + + private final CacheWriteQueue queue; + + private final String id; + + private final long delay; + + private volatile boolean finished; + + private DummyCacheWriteAction(String id, CacheWriteQueue queue) { + this(id, queue, new Random().nextInt(10)); + } + + private DummyCacheWriteAction(String id, CacheWriteQueue queue, long delay) { + this.queue = queue; + this.id = id; + this.delay = delay; + } + + @Override + public void execute() { + try { + sleep(delay); + } catch (InterruptedException e) { + fail("Interrupted"); + } + finished = true; + } + + @Override + public void cancel() { + } + + @Override + public String toString() { + return id; + } + + @Override + public Iterable getAffectedKeys() { + return Collections.singleton(id); + } + + @Override + public CacheWriteQueue getOwner() { + return queue; + } + } +} diff --git a/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/persistentCache/async/CacheWriteQueueTest.java b/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/persistentCache/async/CacheWriteQueueTest.java new file mode 100644 index 0000000..34a9dd0 --- /dev/null +++ b/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/persistentCache/async/CacheWriteQueueTest.java @@ -0,0 +1,140 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.jackrabbit.oak.plugins.document.persistentCache.async; + +import static java.util.Collections.singleton; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Random; +import java.util.concurrent.atomic.AtomicInteger; + +import org.apache.jackrabbit.oak.plugins.document.persistentCache.PersistentCache; +import org.apache.jackrabbit.oak.plugins.document.persistentCache.async.CacheAction; +import org.apache.jackrabbit.oak.plugins.document.persistentCache.async.CacheActionDispatcher; +import org.apache.jackrabbit.oak.plugins.document.persistentCache.async.CacheWriteQueue; +import org.junit.Before; +import org.junit.Test; +import org.mockito.Mockito; + +public class CacheWriteQueueTest { + + private CacheWriteQueue queue; + + @SuppressWarnings("rawtypes") + private List actions = Collections.synchronizedList(new ArrayList()); + + @Before + public void initQueue() { + actions.clear(); + + CacheActionDispatcher dispatcher = new CacheActionDispatcher() { + public void add(CacheAction action) { + actions.add(action); + } + }; + + PersistentCache cache = Mockito.mock(PersistentCache.class); + queue = new CacheWriteQueue(dispatcher, cache, null); + } + + @Test + public void testCounters() throws InterruptedException { + final int threadCount = 10; + final int actionsPerThread = 50; + + final Map counters = new HashMap(); + for (int i = 0; i < 10; i++) { + String key = "key_" + i; + counters.put(key, new AtomicInteger()); + } + + final Random random = new Random(); + List threads = new ArrayList(); + for (int i = 0; i < threadCount; i++) { + Thread t = new Thread(new Runnable() { + @Override + public void run() { + for (int j = 0; j < actionsPerThread; j++) { + for (String key : counters.keySet()) { + if (random.nextBoolean()) { + queue.addPut(key, null); + } else { + queue.addPut(key, new Object()); + } + counters.get(key).incrementAndGet(); + } + } + } + }); + threads.add(t); + } + + for (Thread t : threads) { + t.start(); + } + for (Thread t : threads) { + t.join(); + } + for (String key : counters.keySet()) { + assertEquals(queue.queuedKeys.count(key), counters.get(key).get()); + } + + for (CacheAction action : actions) { + if (random.nextBoolean()) { + action.execute(); + } else { + action.cancel(); + } + } + + assertTrue(queue.queuedKeys.isEmpty()); + assertTrue(queue.waitsForInvalidation.isEmpty()); + } + + @Test + public void testWaitsForInvalidation() { + assertFalse(queue.waitsForInvalidation("key")); + + queue.addInvalidate(singleton("key")); + assertTrue(queue.waitsForInvalidation("key")); + + queue.addPut("key", new Object()); + assertFalse(queue.waitsForInvalidation("key")); + + queue.addInvalidate(singleton("key")); + assertTrue(queue.waitsForInvalidation("key")); + + int i; + for (i = 0; i < actions.size() - 1; i++) { + actions.get(i).execute(); + assertTrue(queue.waitsForInvalidation("key")); + } + + actions.get(i).execute(); + assertFalse(queue.waitsForInvalidation("key")); + } + +}