diff --git a/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/rdb/RDBDocumentStore.java b/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/rdb/RDBDocumentStore.java
index cdaba3d..d0dbad1 100755
--- a/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/rdb/RDBDocumentStore.java
+++ b/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/rdb/RDBDocumentStore.java
@@ -76,6 +76,7 @@ import org.apache.jackrabbit.oak.plugins.document.UpdateOp.Condition;
 import org.apache.jackrabbit.oak.plugins.document.UpdateOp.Key;
 import org.apache.jackrabbit.oak.plugins.document.UpdateOp.Operation;
 import org.apache.jackrabbit.oak.plugins.document.UpdateUtils;
+import org.apache.jackrabbit.oak.plugins.document.cache.CacheChangesTracker;
 import org.apache.jackrabbit.oak.plugins.document.cache.CacheInvalidationStats;
 import org.apache.jackrabbit.oak.plugins.document.cache.NodeDocumentCache;
 import org.apache.jackrabbit.oak.plugins.document.locks.NodeDocumentLocks;
@@ -89,9 +90,6 @@ import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
 import com.google.common.collect.Sets;
-import com.google.common.hash.BloomFilter;
-import com.google.common.hash.Funnel;
-import com.google.common.hash.PrimitiveSink;
 
 /**
  * Implementation of {@link DocumentStore} for relational databases.
@@ -665,6 +663,15 @@ public class RDBDocumentStore implements DocumentStore {
         }
     }
 
+    private <T extends Document> T getIfCached(Collection<T> collection, String id, long modCount) {
+        T doc = getIfCached(collection, id);
+        if (doc != null && doc.getModCount() == modCount) {
+            return doc;
+        } else {
+            return null;
+        }
+    }
+
     @Override
     public Iterable<CacheStats> getCacheStats() {
         return nodesCache.getCacheStats();
@@ -1322,7 +1329,6 @@ public class RDBDocumentStore implements DocumentStore {
 
             for (List<String> chunkedIds : Lists.partition(ids, CHUNKSIZE)) {
 
-                Set<QueryContext> seenQueryContext = Collections.emptySet();
                 Map<String, NodeDocument> cachedDocs = Collections.emptyMap();
 
                 if (collection == Collection.NODES) {
@@ -1330,17 +1336,7 @@ public class RDBDocumentStore implements DocumentStore {
                     cachedDocs = new HashMap<String, NodeDocument>();
                     for (String key : chunkedIds) {
                         cachedDocs.put(key, nodesCache.getIfPresent(key));
-                    }
-
-                    // keep concurrently running queries from updating
-                    // the cache entry for this key
-                    seenQueryContext = new HashSet<QueryContext>();
-                    for (QueryContext qc : qmap.values()) {
-                        qc.addKeys(chunkedIds);
-                        seenQueryContext.add(qc);
-                    }
-                    for (String id : chunkedIds) {
-                        nodesCache.invalidate(id);
+                        nodesCache.invalidate(key);
                     }
                 }
 
@@ -1363,13 +1359,6 @@ public class RDBDocumentStore implements DocumentStore {
                 }
                 if (success) {
                     if (collection == Collection.NODES) {
-                        // keep concurrently running queries from updating
-                        // the cache entry for this key
-                        for (QueryContext qc : qmap.values()) {
-                            if (!seenQueryContext.contains(qc)) {
-                                qc.addKeys(chunkedIds);
-                            }
-                        }
                         for (String id : chunkedIds) {
                             nodesCache.invalidate(id);
                         }
@@ -1391,72 +1380,6 @@ public class RDBDocumentStore implements DocumentStore {
         }
     }
 
-    /**
-     * Class used to track which documents may have been updated since the start
-     * of the query and thus may not put into the cache.
-     */
-    private class QueryContext {
-
-        private static final double FPP = 0.01d;
-        private static final int ENTRIES_SCOPED = 1000;
-        private static final int ENTRIES_OPEN = 10000;
-
-        private final String fromKey, toKey;
-        private volatile BloomFilter<String> filter = null;
-
-        private BloomFilter<String> getFilter() {
-            if (filter == null) {
-                synchronized (this) {
-                    if (filter == null) {
-                        filter = BloomFilter.create(new Funnel<String>() {
-                            private static final long serialVersionUID = -7114267990225941161L;
-
-                            @Override
-                            public void funnel(String from, PrimitiveSink into) {
-                                into.putUnencodedChars(from);
-                            }
-                        }, toKey.equals(NodeDocument.MAX_ID_VALUE) ? ENTRIES_OPEN : ENTRIES_SCOPED, FPP);
-                    }
-                }
-            }
-            return filter;
-        }
-
-        public QueryContext(String fromKey, String toKey) {
-            this.fromKey = fromKey;
-            this.toKey = toKey;
-        }
-
-        public void addKey(String key) {
-            if (fromKey.compareTo(key) < 0 && toKey.compareTo(key) > 0) {
-                getFilter().put(key);
-            }
-        }
-
-        public void addKeys(List<String> keys) {
-            for (String key: keys) {
-                addKey(key);
-            }
-        }
-
-        public boolean mayUpdate(String key) {
-            return filter == null ? true : !getFilter().mightContain(key);
-        }
-
-        synchronized public void dispose() {
-            if (LOG.isDebugEnabled()) {
-                if (filter != null) {
-                    LOG.debug("Disposing QueryContext for range " + fromKey + "..." + toKey + " - filter fpp was: "
-                            + filter.expectedFpp());
-                } else {
-                    LOG.debug("Disposing QueryContext for range " + fromKey + "..." + toKey + " - no filter was needed");
-                }
-            }
-        }
-    }
-
-    private Map<Thread, QueryContext> qmap = new ConcurrentHashMap<Thread, QueryContext>();
-
     private <T extends Document> List<T> internalQuery(Collection<T> collection, String fromKey, String toKey,
             List<String> excludeKeyPatterns, List<QueryCondition> conditions, int limit) {
         Connection connection = null;
@@ -1472,12 +1395,11 @@ public class RDBDocumentStore implements DocumentStore {
         final Stopwatch watch = startWatch();
         int resultSize = 0;
+        CacheChangesTracker tracker = null;
         try {
             long now = System.currentTimeMillis();
-            QueryContext qp = null;
             if (collection == Collection.NODES) {
-                qp = new QueryContext(fromKey, toKey);
-                qmap.put(Thread.currentThread(), qp);
+                tracker = nodesCache.registerTracker(fromKey, toKey);
             }
             connection = this.ch.getROConnection();
             String from = collection == Collection.NODES && NodeDocument.MIN_ID_VALUE.equals(fromKey) ?
                     null : fromKey;
@@ -1489,19 +1411,24 @@ public class RDBDocumentStore implements DocumentStore {
             List<T> result = new ArrayList<T>(size);
             for (int i = 0; i < size; i++) {
                 RDBRow row = dbresult.set(i, null); // free RDBRow ASAP
-                T doc = runThroughCache(collection, row, now, qp);
+                T doc = getIfCached(collection, row.getId(), row.getModcount());
+                if (doc == null) {
+                    doc = convertFromDBObject(collection, row);
+                }
                 result.add(doc);
             }
-            resultSize = result.size();
-            if (qp != null) {
-                qp.dispose();
+            if (collection == Collection.NODES) {
+                nodesCache.putNonConflictingDocs(tracker, (List) result);
             }
+            resultSize = result.size();
             return result;
         } catch (Exception ex) {
             LOG.error("SQL exception on query", ex);
             throw new DocumentStoreException(ex);
         } finally {
-            qmap.remove(Thread.currentThread());
+            if (tracker != null) {
+                tracker.close();
+            }
             this.ch.closeConnection(connection);
             stats.doneQuery(watch.elapsed(TimeUnit.NANOSECONDS), collection, fromKey, toKey, !conditions.isEmpty(),
                     resultSize, -1, false);
@@ -1864,43 +1791,6 @@ public class RDBDocumentStore implements DocumentStore {
         return ser.fromRow(collection, row);
     }
 
-    private <T extends Document> T runThroughCache(Collection<T> collection, RDBRow row, long now, QueryContext qp) {
-
-        if (collection != Collection.NODES) {
-            // not in the cache anyway
-            return convertFromDBObject(collection, row);
-        }
-
-        String id = row.getId();
-        NodeDocument inCache = nodesCache.getIfPresent(id);
-        Long modCount = row.getModcount();
-
-        // do not overwrite document in cache if the
-        // existing one in the cache is newer
-        if (inCache != null && inCache != NodeDocument.NULL) {
-            // check mod count
-            Long cachedModCount = inCache.getModCount();
-            if (cachedModCount == null) {
-                throw new IllegalStateException("Missing " + Document.MOD_COUNT);
-            }
-            if (modCount <= cachedModCount) {
-                // we can use the cached document
-                inCache.markUpToDate(now);
-                return castAsT(inCache);
-            }
-        }
-
-        NodeDocument fresh = (NodeDocument) convertFromDBObject(collection, row);
-        fresh.seal();
-
-        if (!qp.mayUpdate(id)) {
-            return castAsT(fresh);
-        }
-
-        nodesCache.putIfNewer(fresh);
-        return castAsT(fresh);
-    }
-
     private static boolean hasChangesToCollisions(UpdateOp update) {
         if (!USECMODCOUNT) {
             return false;
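
The hunks above replace the per-thread QueryContext/BloomFilter bookkeeping with the tracker support of NodeDocumentCache. Condensed into one place, the new read path in internalQuery works roughly as in the following sketch. It is illustrative only and not part of the patch: the method name queryWithCache and the rows parameter are placeholders for the SQL fetch that is elided here, while nodesCache, registerTracker, putNonConflictingDocs, the three-argument getIfCached and convertFromDBObject are the members shown in the diff.

    private <T extends Document> List<T> queryWithCache(Collection<T> collection, String fromKey, String toKey,
            List<RDBRow> rows) {
        CacheChangesTracker tracker = null;
        try {
            if (collection == Collection.NODES) {
                // record which cache entries in the [fromKey, toKey] range change while the query runs
                tracker = nodesCache.registerTracker(fromKey, toKey);
            }
            List<T> result = new ArrayList<T>(rows.size());
            for (RDBRow row : rows) {
                // reuse the cached document only when its _modCount matches the row just read
                T doc = getIfCached(collection, row.getId(), row.getModcount());
                if (doc == null) {
                    doc = convertFromDBObject(collection, row);
                }
                result.add(doc);
            }
            if (collection == Collection.NODES) {
                // documents whose keys were invalidated or updated while the tracker was open
                // are skipped, so the query result cannot overwrite newer cache entries
                nodesCache.putNonConflictingDocs(tracker, (List) result);
            }
            return result;
        } finally {
            if (tracker != null) {
                tracker.close();
            }
        }
    }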