diff --git a/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/cache/CacheChangesTracker.java b/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/cache/CacheChangesTracker.java
new file mode 100644
index 0000000..291f1e0
--- /dev/null
+++ b/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/cache/CacheChangesTracker.java
@@ -0,0 +1,101 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.jackrabbit.oak.plugins.document.cache;
+
+import com.google.common.base.Predicate;
+import com.google.common.hash.BloomFilter;
+import com.google.common.hash.Funnel;
+import com.google.common.hash.PrimitiveSink;
+
+import java.util.List;
+
+public class CacheChangesTracker {
+
+    private final List<CacheChangesTracker> changeTrackers;
+
+    private final Predicate<String> keyFilter;
+
+    private final LazyBloomFilter lazyBloomFilter;
+
+    CacheChangesTracker(Predicate<String> keyFilter, List<CacheChangesTracker> changeTrackers) {
+        this.changeTrackers = changeTrackers;
+        this.keyFilter = keyFilter;
+        this.lazyBloomFilter = new LazyBloomFilter();
+        // register this tracker, so the cache starts reporting changes to it;
+        // close() removes it again
+        changeTrackers.add(this);
+    }
+
+    public void putDocument(String key) {
+        if (keyFilter.apply(key)) {
+            lazyBloomFilter.put(key);
+        }
+    }
+
+    public void invalidateDocument(String key) {
+        if (keyFilter.apply(key)) {
+            lazyBloomFilter.put(key);
+        }
+    }
+
+    public boolean mightBeenAffected(String key) {
+        return keyFilter.apply(key) && lazyBloomFilter.mightContain(key);
+    }
+
+    public void close() {
+        changeTrackers.remove(this);
+    }
+
+    public static class LazyBloomFilter {
+
+        private static final double FPP = 0.01d;
+
+        private static final int ENTRIES = 1000;
+
+        private volatile BloomFilter<String> filter;
+
+        public void put(String entry) {
+            getFilter().put(entry);
+        }
+
+        public boolean mightContain(String entry) {
+            if (filter == null) {
+                return false;
+            } else {
+                return filter.mightContain(entry);
+            }
+        }
+
+        // double-checked lazy initialization: the filter is only allocated
+        // once the first change is recorded
+        private BloomFilter<String> getFilter() {
+            if (filter == null) {
+                synchronized (this) {
+                    if (filter == null) {
+                        filter = BloomFilter.create(new Funnel<String>() {
+                            private static final long serialVersionUID = -7114267990225941161L;
+
+                            @Override
+                            public void funnel(String from, PrimitiveSink into) {
+                                into.putUnencodedChars(from);
+                            }
+                        }, ENTRIES, FPP);
+                    }
+                }
+            }
+            return filter;
+        }
+
+    }
+}
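In isolation, the tracker behaves as follows. This is a minimal sketch, not part of the patch; it assumes in-package access (the constructor is package-private) and uses Guava's Predicates.alwaysTrue() as the key filter. A Bloom filter can return false positives but never false negatives, so mightBeenAffected() only ever errs on the safe side:

    List<CacheChangesTracker> trackers = new CopyOnWriteArrayList<CacheChangesTracker>();
    CacheChangesTracker tracker = new CacheChangesTracker(
            Predicates.<String>alwaysTrue(), trackers);  // tracks every key
    try {
        tracker.invalidateDocument("1:/foo");   // recorded in the Bloom filter
        tracker.mightBeenAffected("1:/foo");    // true
        tracker.mightBeenAffected("1:/bar");    // false, barring a false positive
    } finally {
        tracker.close();                        // removes the tracker from the list
    }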
diff --git a/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/cache/NodeDocumentCache.java b/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/cache/NodeDocumentCache.java
index d5d5ed6..8f89eb3 100644
--- a/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/cache/NodeDocumentCache.java
+++ b/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/cache/NodeDocumentCache.java
@@ -18,16 +18,23 @@ package org.apache.jackrabbit.oak.plugins.document.cache;
 
 import java.io.Closeable;
 import java.io.IOException;
+import java.util.HashSet;
+import java.util.List;
 import java.util.Map;
 import java.util.Map.Entry;
+import java.util.Set;
 import java.util.concurrent.Callable;
+import java.util.concurrent.CopyOnWriteArrayList;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.locks.Lock;
 
 import javax.annotation.CheckForNull;
 import javax.annotation.Nonnegative;
 import javax.annotation.Nonnull;
+import javax.annotation.Nullable;
 
+import com.google.common.base.Predicate;
+import com.google.common.collect.Collections2;
 import com.google.common.collect.Iterables;
 import com.google.common.collect.Lists;
 import org.apache.jackrabbit.oak.cache.CacheStats;
@@ -36,10 +43,12 @@ import org.apache.jackrabbit.oak.plugins.document.Document;
 import org.apache.jackrabbit.oak.plugins.document.NodeDocument;
 import org.apache.jackrabbit.oak.plugins.document.locks.NodeDocumentLocks;
 import org.apache.jackrabbit.oak.plugins.document.util.StringValue;
+import org.apache.jackrabbit.oak.plugins.document.util.Utils;
 
 import com.google.common.base.Objects;
 import com.google.common.cache.Cache;
 
+import static java.util.Collections.synchronizedSet;
 import static org.apache.jackrabbit.oak.plugins.document.util.Utils.isLeafPreviousDocId;
 
 /**
@@ -61,6 +70,8 @@ public class NodeDocumentCache implements Closeable {
 
     private final NodeDocumentLocks locks;
 
+    private final List<CacheChangesTracker> changeTrackers;
+
     public NodeDocumentCache(@Nonnull Cache<CacheValue, NodeDocument> nodeDocumentsCache,
                              @Nonnull CacheStats nodeDocumentsCacheStats,
                              @Nonnull Cache<CacheValue, NodeDocument> prevDocumentsCache,
@@ -71,6 +82,7 @@ public class NodeDocumentCache implements Closeable {
         this.prevDocumentsCache = prevDocumentsCache;
         this.prevDocumentsCacheStats = prevDocumentsCacheStats;
         this.locks = locks;
+        this.changeTrackers = new CopyOnWriteArrayList<CacheChangesTracker>();
     }
 
     /**
@@ -86,6 +98,10 @@ public class NodeDocumentCache implements Closeable {
             } else {
                 nodeDocumentsCache.invalidate(new StringValue(key));
             }
+
+            for (CacheChangesTracker tracker : changeTrackers) {
+                tracker.invalidateDocument(key);
+            }
         } finally {
             lock.unlock();
         }
@@ -319,6 +335,79 @@ public class NodeDocumentCache implements Closeable {
         }
     }
 
+    /**
+     * Registers a new CacheChangesTracker that records all puts and
+     * invalidations related to children of the given parent.
+     *
+     * @param parentId children of this parent will be tracked
+     * @return new tracker
+     */
+    public CacheChangesTracker registerTracker(final String parentId) {
+        return new CacheChangesTracker(new Predicate<String>() {
+            @Override
+            public boolean apply(@Nullable String input) {
+                return input != null && parentId.equals(Utils.getParentId(input));
+            }
+        }, changeTrackers);
+    }
+
+    /**
+     * Registers a new CacheChangesTracker that records all puts and
+     * invalidations related to the given documents.
+     *
+     * @param keys these documents will be tracked
+     * @return new tracker
+     */
+    public CacheChangesTracker registerTracker(final Set<String> keys) {
+        return new CacheChangesTracker(new Predicate<String>() {
+            @Override
+            public boolean apply(@Nullable String input) {
+                return input != null && keys.contains(input);
+            }
+        }, changeTrackers);
+    }
+
+    /**
+     * Updates the cache with all the documents that:
+     *
+     * (1) currently have their older versions in the cache, or
+     * (2) have been neither put nor invalidated during the tracker's lifetime.
+     *
+     * We can't cache documents that have been invalidated during the tracker's
+     * lifetime, as it's possible that the invalidated version was newer than
+     * the one passed in the docs parameter.
+     *
+     * If a document has been added during the tracker's lifetime, but is no
+     * longer present in the cache, it may have been evicted, so we can't
+     * re-add it for the same reason as above.
+     *
+     * @param tracker used to decide whether the docs should be put into the cache
+     * @param docs documents to put into the cache
+     */
+    public void putNonConflictingDocs(CacheChangesTracker tracker, List<NodeDocument> docs) {
+        for (NodeDocument d : docs) {
+            if (d == null || d == NodeDocument.NULL) {
+                continue;
+            }
+            String id = d.getId();
+            Lock lock = locks.acquire(id);
+            try {
+                // if an old version of the document is present in the cache,
+                // we can simply update it
+                if (getIfPresent(id) != null) {
+                    putIfNewer(d);
+                // if the document has been neither invalidated nor added during
+                // the tracker's lifetime, we can put it as well
+                } else if (!tracker.mightBeenAffected(id)) {
+                    putIfNewer(d);
+                }
+            } finally {
+                lock.unlock();
+            }
+        }
+    }
+
 //----------------------------< internal >----------------------------------
 
     /**
@@ -332,5 +421,8 @@ public class NodeDocumentCache implements Closeable {
         } else {
             nodeDocumentsCache.put(new StringValue(doc.getId()), doc);
         }
+        for (CacheChangesTracker tracker : changeTrackers) {
+            tracker.putDocument(doc.getId());
+        }
     }
 }
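A sketch of the intended read pattern, mirroring the MongoDocumentStore change below (hypothetical helper method; the parent id and query bounds are illustrative values):

    void readChildren(NodeDocumentCache cache, DocumentStore store) {
        CacheChangesTracker tracker = cache.registerTracker("1:/parent");
        try {
            List<NodeDocument> docs = store.query(Collection.NODES,
                    Utils.getKeyLowerLimit("/parent"),
                    Utils.getKeyUpperLimit("/parent"), Integer.MAX_VALUE);
            // a concurrent writer may invalidate one of these children at this
            // point; the tracker records such keys, so putNonConflictingDocs
            // skips the potentially stale copies instead of caching them
            cache.putNonConflictingDocs(tracker, docs);
        } finally {
            tracker.close();
        }
    }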
diff --git a/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/locks/BulkReadWriteLock.java b/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/locks/BulkReadWriteLock.java
deleted file mode 100644
index eb496ce..0000000
--- a/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/locks/BulkReadWriteLock.java
+++ /dev/null
@@ -1,56 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.jackrabbit.oak.plugins.document.locks;
-
-import java.util.concurrent.locks.Lock;
-import java.util.concurrent.locks.ReadWriteLock;
-
-import com.google.common.base.Function;
-import com.google.common.collect.Iterables;
-
-/**
- * This class exposes a list of ReadWriteLocks as a single Lock instance.
- */
-class BulkReadWriteLock implements ReadWriteLock {
-
-    private Iterable<ReadWriteLock> locks;
-
-    public BulkReadWriteLock(Iterable<ReadWriteLock> locks) {
-        this.locks = locks;
-    }
-
-    @Override
-    public Lock readLock() {
-        return new BulkLock(Iterables.transform(locks, new Function<ReadWriteLock, Lock>() {
-            @Override
-            public Lock apply(ReadWriteLock input) {
-                return input.readLock();
-            }
-        }));
-    }
-
-    @Override
-    public Lock writeLock() {
-        return new BulkLock(Iterables.transform(locks, new Function<ReadWriteLock, Lock>() {
-            @Override
-            public Lock apply(ReadWriteLock input) {
-                return input.writeLock();
-            }
-        }));
-    }
-
-}
diff --git a/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/locks/StripedNodeDocumentLocks.java b/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/locks/StripedNodeDocumentLocks.java
index a051edf..4e521c3 100644
--- a/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/locks/StripedNodeDocumentLocks.java
+++ b/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/locks/StripedNodeDocumentLocks.java
@@ -23,6 +23,7 @@ import static com.google.common.collect.Iterables.filter;
 
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.List;
+import java.util.concurrent.atomic.AtomicLong;
 import java.util.concurrent.locks.Lock;
 
 import org.apache.jackrabbit.oak.plugins.document.util.Utils;
@@ -39,6 +40,14 @@ public class StripedNodeDocumentLocks implements NodeDocumentLocks {
 
     private final Striped<Lock> locks = Striped.lock(4096);
 
     private final Lock rootLock = Striped.lock(1).get(ROOT);
 
+    /**
+     * Counts how many times locks were acquired.
+     */
+    private volatile AtomicLong lockAcquisitionCounter;
+
     @Override
     public Lock acquire(String key) {
+        if (lockAcquisitionCounter != null) {
+            lockAcquisitionCounter.incrementAndGet();
+        }
         Lock lock = ROOT.equals(key) ? rootLock : locks.get(key);
@@ -59,4 +68,15 @@ public class StripedNodeDocumentLocks implements NodeDocumentLocks {
         lock.lock();
         return lock;
     }
+
+    public void resetLockAcquisitionCount() {
+        lockAcquisitionCounter = new AtomicLong();
+    }
+
+    public long getLockAcquisitionCount() {
+        if (lockAcquisitionCounter == null) {
+            throw new IllegalStateException("The counter hasn't been initialized");
+        }
+        return lockAcquisitionCounter.get();
+    }
 }
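Since StripedNodeDocumentLocks is now the only lock implementation, it is worth recalling how Guava's lock striping behaves; a short illustration (not part of the patch, imports elided):

    Striped<Lock> striped = Striped.lock(4096);  // fixed pool of 4096 locks

    Lock a = striped.get("1:/foo");
    Lock b = striped.get("1:/foo");   // equal keys map to the same stripe, so a == b

    Lock c = striped.get("1:/bar");   // distinct keys may share a stripe; that is
                                      // safe, merely coarser than per-key locking

This bounded-memory trade-off is what lets the cache guard an unbounded key space with a fixed number of locks.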
diff --git a/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/locks/TreeNodeDocumentLocks.java b/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/locks/TreeNodeDocumentLocks.java
deleted file mode 100644
index 5d16a50..0000000
--- a/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/locks/TreeNodeDocumentLocks.java
+++ /dev/null
@@ -1,206 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.jackrabbit.oak.plugins.document.locks;
-
-import static com.google.common.base.Preconditions.checkNotNull;
-
-import java.util.Collection;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicLong;
-import java.util.concurrent.locks.Condition;
-import java.util.concurrent.locks.Lock;
-import java.util.concurrent.locks.ReadWriteLock;
-
-import javax.annotation.Nonnull;
-
-import org.apache.jackrabbit.oak.plugins.document.util.Utils;
-
-import com.google.common.base.Function;
-import com.google.common.collect.Iterables;
-import com.google.common.util.concurrent.Striped;
-
-public class TreeNodeDocumentLocks implements NodeDocumentLocks {
-
-    /**
-     * Locks to ensure cache consistency on reads, writes and invalidation.
-     */
-    private final Striped<Lock> locks = Striped.lock(4096);
-
-    /**
-     * ReadWriteLocks to synchronize cache access when child documents are
-     * requested from MongoDB and put into the cache. Accessing a single
-     * document in the cache will acquire a read (shared) lock for the parent
-     * key in addition to the lock (from {@link #locks}) for the individual
-     * document. Reading multiple sibling documents will acquire a write
-     * (exclusive) lock for the parent key. See OAK-1897.
-     */
-    private final Striped<ReadWriteLock> parentLocks = Striped.readWriteLock(2048);
-
-    /**
-     * Counts how many times {@link TreeLock}s were acquired.
-     */
-    private volatile AtomicLong lockAcquisitionCounter;
-
-    /**
-     * Acquires a lock for the given key. The returned tree lock will also hold
-     * a shared lock on the parent key.
-     *
-     * @param key a key.
-     * @return the acquired lock for the given key.
-     */
-    @Override
-    public TreeLock acquire(String key) {
-        if (lockAcquisitionCounter != null) {
-            lockAcquisitionCounter.incrementAndGet();
-        }
-        TreeLock lock = TreeLock.shared(parentLocks.get(getParentId(key)), locks.get(key));
-        lock.lock();
-        return lock;
-    }
-
-    /**
-     * This implementation creates two sequences of locks (for the keys and for
-     * the their parents) using {@link #locks} and {@link #parentLocks}. Then
-     * all parent locks are acquired first and in a second step the locks for
-     * the actual keys.
-     * <p>
-     * Since we only acquire a parentLock.read, there's no danger of
-     * deadlock caused by interleaving locks from two different stripes by two
-     * threads. The only place where the parentLock.write is acquired is the
-     * {@link #acquireExclusive(String)} and that method doesn't acquire locks in bulk.
-     */
-    @Override
-    public Lock acquire(Collection<String> keys) {
-        if (lockAcquisitionCounter != null) {
-            lockAcquisitionCounter.addAndGet(keys.size());
-        }
-
-        Iterable<String> parentKeys = Iterables.transform(keys, new Function<String, String>() {
-            @Override
-            public String apply(String keys) {
-                return getParentId(keys);
-            }
-        });
-
-        ReadWriteLock bulkParentLock = new BulkReadWriteLock(parentLocks.bulkGet(parentKeys));
-        Lock bulkChildrenLock = new BulkLock(locks.bulkGet(keys));
-
-        Lock lock = TreeLock.shared(bulkParentLock, bulkChildrenLock);
-        lock.lock();
-        return lock;
-    }
-
-    /**
-     * Acquires an exclusive lock on the given parent key. Use this method to
-     * block cache access for child keys of the given parent key.
-     *
-     * @param parentKey the parent key.
-     * @return the acquired lock for the given parent key.
-     */
-    public TreeLock acquireExclusive(String parentKey) {
-        if (lockAcquisitionCounter != null) {
-            lockAcquisitionCounter.incrementAndGet();
-        }
-        TreeLock lock = TreeLock.exclusive(parentLocks.get(parentKey));
-        lock.lock();
-        return lock;
-    }
-
-    /**
-     * Returns the parent id for the given id. An empty String is returned if
-     * the given value is the id of the root document or the id for a long path.
-     *
-     * @param id an id for a document.
-     * @return the id of the parent document or the empty String.
-     */
-    @Nonnull
-    private static String getParentId(@Nonnull String id) {
-        String parentId = Utils.getParentId(checkNotNull(id));
-        if (parentId == null) {
-            parentId = "";
-        }
-        return parentId;
-    }
-
-    public void resetLockAcquisitionCount() {
-        lockAcquisitionCounter = new AtomicLong();
-    }
-
-    public long getLockAcquisitionCount() {
-        if (lockAcquisitionCounter == null) {
-            throw new IllegalStateException("The counter hasn't been initialized");
-        }
-        return lockAcquisitionCounter.get();
-    }
-
-    private final static class TreeLock implements Lock {
-
-        private final Lock parentLock;
-
-        private final Lock lock;
-
-        private TreeLock(Lock parentLock, Lock lock) {
-            this.parentLock = parentLock;
-            this.lock = lock;
-        }
-
-        private static TreeLock shared(ReadWriteLock parentLock, Lock lock) {
-            return new TreeLock(parentLock.readLock(), lock);
-        }
-
-        private static TreeLock exclusive(ReadWriteLock parentLock) {
-            return new TreeLock(parentLock.writeLock(), null);
-        }
-
-        @Override
-        public void lock() {
-            parentLock.lock();
-            if (lock != null) {
-                lock.lock();
-            }
-        }
-
-        @Override
-        public void unlock() {
-            if (lock != null) {
-                lock.unlock();
-            }
-            parentLock.unlock();
-        }
-
-        @Override
-        public void lockInterruptibly() throws InterruptedException {
-            throw new UnsupportedOperationException();
-        }
-
-        @Override
-        public boolean tryLock() {
-            throw new UnsupportedOperationException();
-        }
-
-        @Override
-        public boolean tryLock(long time, TimeUnit unit) throws InterruptedException {
-            throw new UnsupportedOperationException();
-        }
-
-        @Override
-        public Condition newCondition() {
-            throw new UnsupportedOperationException();
-        }
-    }
-
-}
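The javadoc of the deleted bulk acquire(Collection) argues that taking only the shared (read) side of the parent locks rules out deadlock between threads that interleave stripes. For illustration only (a hypothetical snippet, not from the codebase), this is the cycle that exclusive locks acquired in opposite order can form:

    final Lock l1 = new ReentrantLock();
    final Lock l2 = new ReentrantLock();

    new Thread(new Runnable() {
        @Override
        public void run() {
            l1.lock();   // thread 1 holds l1...
            l2.lock();   // ...and waits for l2
            l2.unlock();
            l1.unlock();
        }
    }).start();

    new Thread(new Runnable() {
        @Override
        public void run() {
            l2.lock();   // thread 2 holds l2...
            l1.lock();   // ...and waits for l1: a cycle, both threads hang
            l1.unlock();
            l2.unlock();
        }
    }).start();

Read locks of a ReentrantReadWriteLock do not block other readers, so shared bulk acquisition cannot create such a cycle; only acquireExclusive() took the write side, and never in bulk.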
diff --git a/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/mongo/MongoDocumentStore.java b/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/mongo/MongoDocumentStore.java
index 7e1e5dc..4b085ae 100644
--- a/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/mongo/MongoDocumentStore.java
+++ b/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/mongo/MongoDocumentStore.java
@@ -47,7 +47,6 @@ import com.google.common.collect.Iterators;
 import com.google.common.collect.Lists;
 import com.google.common.util.concurrent.UncheckedExecutionException;
 import com.mongodb.MongoClientURI;
-import com.mongodb.MongoExecutionTimeoutException;
 import com.mongodb.QueryOperators;
 import com.mongodb.ReadPreference;
 
@@ -68,9 +67,10 @@ import org.apache.jackrabbit.oak.plugins.document.UpdateOp.Condition;
 import org.apache.jackrabbit.oak.plugins.document.UpdateOp.Key;
 import org.apache.jackrabbit.oak.plugins.document.UpdateOp.Operation;
 import org.apache.jackrabbit.oak.plugins.document.UpdateUtils;
+import org.apache.jackrabbit.oak.plugins.document.cache.CacheChangesTracker;
 import org.apache.jackrabbit.oak.plugins.document.cache.CacheInvalidationStats;
 import org.apache.jackrabbit.oak.plugins.document.cache.NodeDocumentCache;
-import org.apache.jackrabbit.oak.plugins.document.locks.TreeNodeDocumentLocks;
+import org.apache.jackrabbit.oak.plugins.document.locks.StripedNodeDocumentLocks;
 import org.apache.jackrabbit.oak.plugins.document.util.Utils;
 import org.apache.jackrabbit.oak.stats.Clock;
 import org.apache.jackrabbit.oak.util.PerfLogger;
@@ -135,7 +135,7 @@ public class MongoDocumentStore implements DocumentStore {
 
     private final NodeDocumentCache nodesCache;
 
-    private final TreeNodeDocumentLocks nodeLocks;
+    private final StripedNodeDocumentLocks nodeLocks;
 
     private Clock clock = Clock.SIMPLE;
 
@@ -170,17 +170,6 @@ public class MongoDocumentStore implements DocumentStore {
             Long.getLong("oak.mongo.maxQueryTimeMS", TimeUnit.MINUTES.toMillis(1));
 
     /**
-     * Duration in milliseconds after a mongo query with an additional
-     * constraint (e.g. _modified) on the NODES collection times out and is
-     * executed again without holding a {@link TreeNodeDocumentLocks.TreeLock}
-     * and without updating the cache with data retrieved from MongoDB.
-     * <p>
-     * Default is 3000 (three seconds).
-     */
-    private long maxLockedQueryTimeMS =
-            Long.getLong("oak.mongo.maxLockedQueryTimeMS", TimeUnit.SECONDS.toMillis(3));
-
-    /**
      * The number of documents to put into one bulk update.
      * <p>
      * Default is 30.
@@ -240,7 +229,7 @@ public class MongoDocumentStore implements DocumentStore {
         // index on _modified for journal entries
         createIndex(journal, JournalEntry.MODIFIED, true, false, false);
 
-        this.nodeLocks = new TreeNodeDocumentLocks();
+        this.nodeLocks = new StripedNodeDocumentLocks();
         this.nodesCache = builder.buildNodeDocumentCache(this, nodeLocks);
 
         LOG.info("Configuration maxReplicationLagMillis {}, " +
@@ -523,27 +512,11 @@ public class MongoDocumentStore implements DocumentStore {
                               String indexedProperty,
                               long startValue,
                               int limit) {
-        boolean withLock = true;
-        if (collection == Collection.NODES && indexedProperty != null) {
-            long maxQueryTime;
-            if (maxQueryTimeMS > 0) {
-                maxQueryTime = Math.min(maxQueryTimeMS, maxLockedQueryTimeMS);
-            } else {
-                maxQueryTime = maxLockedQueryTimeMS;
-            }
-            try {
-                return queryInternal(collection, fromKey, toKey, indexedProperty,
-                        startValue, limit, maxQueryTime, true);
-            } catch (MongoExecutionTimeoutException e) {
-                LOG.info("query timed out after {} milliseconds and will be retried without lock {}",
-                        maxQueryTime, Lists.newArrayList(fromKey, toKey, indexedProperty, startValue, limit));
-                withLock = false;
-            }
-        }
         return queryInternal(collection, fromKey, toKey, indexedProperty,
-                startValue, limit, maxQueryTimeMS, withLock);
+                startValue, limit, maxQueryTimeMS);
     }
 
+    @SuppressWarnings("unchecked")
     @Nonnull
     <T extends Document> List<T> queryInternal(Collection<T> collection,
                                                String fromKey,
@@ -551,8 +524,7 @@ public class MongoDocumentStore implements DocumentStore {
                                                String indexedProperty,
                                                long startValue,
                                                int limit,
-                                               long maxQueryTime,
-                                               boolean withLock) {
+                                               long maxQueryTime) {
         log("query", fromKey, toKey, indexedProperty, startValue, limit);
         DBCollection dbCollection = getDBCollection(collection);
         QueryBuilder queryBuilder = QueryBuilder.start(Document.ID);
@@ -584,11 +556,14 @@ public class MongoDocumentStore implements DocumentStore {
         String parentId = Utils.getParentIdFromLowerLimit(fromKey);
         long lockTime = -1;
         final Stopwatch watch = startWatch();
-        Lock lock = withLock ? nodeLocks.acquireExclusive(parentId != null ? parentId : "") : null;
+
         boolean isSlaveOk = false;
         int resultSize = 0;
+        CacheChangesTracker cacheChangesTracker = null;
+        if (parentId != null && collection == Collection.NODES) {
+            cacheChangesTracker = nodesCache.registerTracker(parentId);
+        }
         try {
-            lockTime = withLock ? watch.elapsed(TimeUnit.MILLISECONDS) : -1;
             DBCursor cursor = dbCollection.find(query).sort(BY_ID_ASC);
             if (!disableIndexHint && !hasModifiedIdCompoundIndex) {
                 cursor.hint(hint);
@@ -613,21 +588,21 @@ public class MongoDocumentStore implements DocumentStore {
                 for (int i = 0; i < limit && cursor.hasNext(); i++) {
                     DBObject o = cursor.next();
                     T doc = convertFromDBObject(collection, o);
-                    if (collection == Collection.NODES
-                            && doc != null
-                            && lock != null) {
-                        nodesCache.putIfNewer((NodeDocument) doc);
-                    }
                     list.add(doc);
                 }
                 resultSize = list.size();
             } finally {
                 cursor.close();
             }
+
+            if (cacheChangesTracker != null) {
+                nodesCache.putNonConflictingDocs(cacheChangesTracker, (List<NodeDocument>) list);
+            }
+
            return list;
        } finally {
-            if (lock != null) {
-                lock.unlock();
+            if (cacheChangesTracker != null) {
+                cacheChangesTracker.close();
            }
            stats.doneQuery(watch.elapsed(TimeUnit.NANOSECONDS), collection, fromKey, toKey,
                    indexedProperty != null , resultSize, lockTime, isSlaveOk);
@@ -1525,10 +1500,6 @@ public class MongoDocumentStore implements DocumentStore {
         this.clock = clock;
     }
 
-    void setMaxLockedQueryTimeMS(long maxLockedQueryTimeMS) {
-        this.maxLockedQueryTimeMS = maxLockedQueryTimeMS;
-    }
-
     void resetLockAcquisitionCount() {
         nodeLocks.resetLockAcquisitionCount();
     }
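The lock-acquisition counter on StripedNodeDocumentLocks exists for tests like the ones below. A minimal sketch of its use (hypothetical JUnit code, assuming org.junit.Assert.assertEquals is imported and that acquire() bumps the counter, as in the former TreeNodeDocumentLocks):

    StripedNodeDocumentLocks locks = new StripedNodeDocumentLocks();
    locks.resetLockAcquisitionCount();   // installs a fresh counter
    Lock lock = locks.acquire("1:/foo");
    try {
        // cache work guarded by the per-document lock
    } finally {
        lock.unlock();
    }
    assertEquals(1, locks.getLockAcquisitionCount());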
diff --git a/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/locks/NodeDocumentLocksTest.java b/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/locks/NodeDocumentLocksTest.java
index 409b3ca..625f7d8 100644
--- a/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/locks/NodeDocumentLocksTest.java
+++ b/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/locks/NodeDocumentLocksTest.java
@@ -19,30 +19,14 @@ package org.apache.jackrabbit.oak.plugins.document.locks;
 
 import static org.junit.Assert.fail;
 
 import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Collection;
 import java.util.List;
 import java.util.concurrent.locks.Lock;
 
 import org.junit.Test;
-import org.junit.runner.RunWith;
-import org.junit.runners.Parameterized;
-import org.junit.runners.Parameterized.Parameters;
 
-@RunWith(Parameterized.class)
 public class NodeDocumentLocksTest {
 
-    @Parameters(name = "{1}")
-    public static Collection<Object[]> data() {
-        return Arrays.asList(new Object[][] { { new StripedNodeDocumentLocks(), "StripedNodeDocumentLocks" },
-                { new TreeNodeDocumentLocks(), "TreeNodeDocumentLocks" } });
-    }
-
-    private final NodeDocumentLocks locks;
-
-    public NodeDocumentLocksTest(NodeDocumentLocks locks, String name) {
-        this.locks = locks;
-    }
+    private final NodeDocumentLocks locks = new StripedNodeDocumentLocks();
 
     @Test
     public void testBulkAcquireNonConflicting() throws InterruptedException {
diff --git a/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/locks/TreeNodeDocumentsLocksTest.java b/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/locks/TreeNodeDocumentsLocksTest.java
deleted file mode 100644
index 194076c..0000000
--- a/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/locks/TreeNodeDocumentsLocksTest.java
+++ /dev/null
@@ -1,95 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.jackrabbit.oak.plugins.document.locks;
-
-import static org.junit.Assert.fail;
-
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.List;
-import java.util.concurrent.locks.Lock;
-
-import org.junit.Test;
-
-public class TreeNodeDocumentsLocksTest {
-
-    /**
-     * Test for the OAK-3949. It uses multiple threads to acquire a shared lock
-     * collection. There's also another set of threads that acquires parent
-     * locks exclusively. Such combination can't lead to a deadlock.
-     */
-    @Test
-    public void testBulkAcquire() throws InterruptedException {
-        int threadCount = 10;
-        int locksCount = 100;
-
-        final TreeNodeDocumentLocks locks = new TreeNodeDocumentLocks();
-        List<Thread> threads = new ArrayList<Thread>();
-        for (int i = 0; i < threadCount; i++) {
-            final List<String> keys = new ArrayList<String>(locksCount);
-            for (int j = 0; j < locksCount; j++) {
-                keys.add(String.format("2:/parent_%d/lock_%d", j, j));
-            }
-            threads.add(new Thread(new Runnable() {
-                @Override
-                public void run() {
-                    Lock lock = locks.acquire(keys);
-                    try {
-                        Thread.sleep(100);
-                    } catch (InterruptedException e) {
-                    } finally {
-                        lock.unlock();
-                    }
-                }
-            }));
-        }
-
-        for (int j = 0; j < threadCount; j++) {
-            final int from = j * 10;
-            final int to = (j + 1) * 10;
-            threads.add(new Thread(new Runnable() {
-                @Override
-                public void run() {
-                    try {
-                        for (int i = from; i < to; i++) {
-                            Lock parentLock = locks.acquireExclusive(String.format("1:/parent_%d", i));
-                            Thread.sleep(100);
-                            Lock childLock = locks.acquire(String.format("2:/parent_%d/lock_%d", i, i));
-                            Thread.sleep(100);
-                            childLock.unlock();
-                            Thread.sleep(100);
-                            parentLock.unlock();
-                        }
-                    } catch (InterruptedException e) {
-                    }
-                }
-            }));
-        }
-
-        Collections.shuffle(threads);
-        for (Thread t : threads) {
-            t.start();
-        }
-
-        for (Thread t : threads) {
-            t.join(10000);
-            if (t.isAlive()) {
-                fail("Thread hasn't stopped in 10s");
-            }
-        }
-    }
-}
\ No newline at end of file
diff --git a/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/mongo/MongoDocumentStoreTest.java b/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/mongo/MongoDocumentStoreTest.java
index e1d9d6e..b3d817f 100644
--- a/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/mongo/MongoDocumentStoreTest.java
+++ b/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/mongo/MongoDocumentStoreTest.java
@@ -16,11 +16,6 @@
  */
 package org.apache.jackrabbit.oak.plugins.document.mongo;
 
-import java.util.List;
-import java.util.concurrent.atomic.AtomicInteger;
-
-import javax.annotation.Nonnull;
-
 import com.mongodb.DB;
 
 import org.apache.jackrabbit.oak.plugins.document.AbstractMongoConnectionTest;
@@ -30,14 +25,11 @@ import org.apache.jackrabbit.oak.plugins.document.DocumentMK;
 import org.apache.jackrabbit.oak.plugins.document.JournalEntry;
 import org.apache.jackrabbit.oak.plugins.document.MongoUtils;
 import org.apache.jackrabbit.oak.plugins.document.NodeDocument;
-import org.apache.jackrabbit.oak.plugins.document.util.Utils;
 import org.junit.Test;
 
 import static org.apache.jackrabbit.oak.plugins.document.mongo.MongoUtils.hasIndex;
-import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertTrue;
-import static org.junit.Assert.fail;
 
 /**
  * MongoDocumentStoreTest...
@@ -57,32 +49,6 @@ public class MongoDocumentStoreTest extends AbstractMongoConnectionTest {
     }
 
     @Test
-    public void timeoutQuery() {
-        String fromId = Utils.getKeyLowerLimit("/");
-        String toId = Utils.getKeyUpperLimit("/");
-        store.setMaxLockedQueryTimeMS(1);
-        long index = 0;
-        for (int i = 0; i < 100; i++) {
-            // keep adding nodes until the query runs into the timeout
-            StringBuilder sb = new StringBuilder();
-            for (int j = 0; j < 1000; j++) {
-                sb.append("+\"node-").append(index++).append("\":{}");
-            }
-            mk.commit("/", sb.toString(), null, null);
-            store.queriesWithoutLock.set(0);
-            store.resetLockAcquisitionCount();
-            List<NodeDocument> docs = store.query(Collection.NODES, fromId, toId,
-                    "foo", System.currentTimeMillis(), Integer.MAX_VALUE);
-            assertTrue(docs.isEmpty());
-            if (store.queriesWithoutLock.get() > 0) {
-                assertEquals(1, store.getLockAcquisitionCount());
-                return;
-            }
-        }
-        fail("No query timeout triggered even after adding " + index + " nodes");
-    }
-
-    @Test
     public void defaultIndexes() {
         assertTrue(hasIndex(store.getDBCollection(Collection.NODES), Document.ID));
         assertTrue(hasIndex(store.getDBCollection(Collection.NODES), NodeDocument.SD_TYPE));
@@ -94,28 +60,8 @@ public class MongoDocumentStoreTest extends AbstractMongoConnectionTest {
     }
 
     static final class TestStore extends MongoDocumentStore {
-
-        AtomicInteger queriesWithoutLock = new AtomicInteger();
-
         TestStore(DB db, DocumentMK.Builder builder) {
             super(db, builder);
         }
-
-        @Nonnull
-        @Override
-        <T extends Document> List<T> queryInternal(Collection<T> collection,
-                                                   String fromKey,
-                                                   String toKey,
-                                                   String indexedProperty,
-                                                   long startValue,
-                                                   int limit,
-                                                   long maxQueryTime,
-                                                   boolean withLock) {
-            if (collection == Collection.NODES && !withLock) {
-                queriesWithoutLock.incrementAndGet();
-            }
-            return super.queryInternal(collection, fromKey, toKey,
-                    indexedProperty, startValue, limit, maxQueryTime, withLock);
-        }
     }
 }