diff --git a/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/locks/BulkLock.java b/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/locks/BulkLock.java new file mode 100644 index 0000000..7fbcf74 --- /dev/null +++ b/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/locks/BulkLock.java @@ -0,0 +1,69 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.jackrabbit.oak.plugins.document.locks; + +import static com.google.common.collect.Lists.reverse; + +import java.util.List; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.locks.Condition; +import java.util.concurrent.locks.Lock; + +/** + * This class exposes a list of locks as a single Lock instance. + */ +class BulkLock implements Lock { + + private final List locks; + + public BulkLock(List locks) { + this.locks = locks; + } + + @Override + public void lock() { + for (Lock l : locks) { + l.lock(); + } + } + + public void unlock() { + for (Lock l : reverse(locks)) { + l.unlock(); + } + } + + @Override + public void lockInterruptibly() throws InterruptedException { + throw new UnsupportedOperationException(); + } + + @Override + public boolean tryLock() { + throw new UnsupportedOperationException(); + } + + @Override + public boolean tryLock(long time, TimeUnit unit) throws InterruptedException { + throw new UnsupportedOperationException(); + } + + @Override + public Condition newCondition() { + throw new UnsupportedOperationException(); + } +} diff --git a/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/locks/NodeDocumentLocks.java b/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/locks/NodeDocumentLocks.java index 26ce4c6..751a88d 100644 --- a/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/locks/NodeDocumentLocks.java +++ b/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/locks/NodeDocumentLocks.java @@ -16,16 +16,25 @@ */ package org.apache.jackrabbit.oak.plugins.document.locks; +import java.util.Collection; import java.util.concurrent.locks.Lock; public interface NodeDocumentLocks { /** - * Acquires a log for the given key. + * Acquires a lock for the given key. * * @param key a key. * @return the acquired lock for the given key. */ Lock acquire(String key); + /** + * Acquires locks for the given keys. Locks are sorted before the operation + * to avoid deadlocks. + * + * @param keys keys + * @return the object wrapping the acquired locks + */ + Lock acquire(Collection keys); } diff --git a/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/locks/StripedNodeDocumentLocks.java b/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/locks/StripedNodeDocumentLocks.java index 850e3bd..a051edf 100644 --- a/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/locks/StripedNodeDocumentLocks.java +++ b/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/locks/StripedNodeDocumentLocks.java @@ -16,21 +16,46 @@ */ package org.apache.jackrabbit.oak.plugins.document.locks; +import static com.google.common.base.Predicates.equalTo; +import static com.google.common.base.Predicates.not; +import static com.google.common.collect.Iterables.filter; + +import java.util.ArrayList; +import java.util.Collection; +import java.util.List; import java.util.concurrent.locks.Lock; +import org.apache.jackrabbit.oak.plugins.document.util.Utils; + import com.google.common.util.concurrent.Striped; public class StripedNodeDocumentLocks implements NodeDocumentLocks { + private static final String ROOT = Utils.getIdFromPath("/"); + /** * Locks to ensure cache consistency on reads, writes and invalidation. */ private final Striped locks = Striped.lock(4096); - private final Lock rootLock = Striped.lock(1).get("0:/"); + private final Lock rootLock = Striped.lock(1).get(ROOT); @Override public Lock acquire(String key) { - Lock lock = "0:/".equals(key) ? rootLock : locks.get(key); + Lock lock = ROOT.equals(key) ? rootLock : locks.get(key); + lock.lock(); + return lock; + } + + @Override + public Lock acquire(Collection keys) { + List lockList = new ArrayList(); + if (keys.contains(ROOT)) { + lockList.add(rootLock); + } + for (Lock l : locks.bulkGet(filter(keys, not(equalTo(ROOT))))) { + lockList.add(l); + } + Lock lock = new BulkLock(lockList); lock.lock(); return lock; } diff --git a/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/locks/TreeNodeDocumentLocks.java b/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/locks/TreeNodeDocumentLocks.java index fa7cc12..3a75f7b 100644 --- a/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/locks/TreeNodeDocumentLocks.java +++ b/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/locks/TreeNodeDocumentLocks.java @@ -18,6 +18,10 @@ package org.apache.jackrabbit.oak.plugins.document.locks; import static com.google.common.base.Preconditions.checkNotNull; +import java.util.ArrayList; +import java.util.Collection; +import java.util.Iterator; +import java.util.List; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.locks.Condition; @@ -28,6 +32,8 @@ import javax.annotation.Nonnull; import org.apache.jackrabbit.oak.plugins.document.util.Utils; +import com.google.common.base.Function; +import com.google.common.collect.Iterables; import com.google.common.util.concurrent.Striped; public class TreeNodeDocumentLocks implements NodeDocumentLocks { @@ -53,7 +59,7 @@ public class TreeNodeDocumentLocks implements NodeDocumentLocks { private volatile AtomicLong lockAcquisitionCounter; /** - * Acquires a log for the given key. The returned tree lock will also hold + * Acquires a lock for the given key. The returned tree lock will also hold * a shared lock on the parent key. * * @param key a key. @@ -70,6 +76,42 @@ public class TreeNodeDocumentLocks implements NodeDocumentLocks { } /** + * This implementation creates two sequences of locks (for the keys and for + * the their parents) using {@link #locks} and {@link #parentLocks}. Then + * both sequences are zipped into pairs (parentLock, lock) and passed to the + * {@link TreeLock#shared(ReadWriteLock, Lock)}. After that all tree locks + * are acquired. + *

+ * Since we only acquire a parentLock.read, there's no danger of + * deadlock caused by interleaving locks from two different stripes by two + * threads. The only place where the parentLock.write is acquired is the + * {@link #acquireExclusive(String)} and that method doesn't acquire locks in bulk. + */ + @Override + public Lock acquire(Collection keys) { + if (lockAcquisitionCounter != null) { + lockAcquisitionCounter.addAndGet(keys.size()); + } + + Iterable parentKeys = Iterables.transform(keys, new Function() { + @Override + public String apply(String keys) { + return getParentId(keys); + } + }); + Iterator lockIt = locks.bulkGet(keys).iterator(); + Iterator parentLockIt = parentLocks.bulkGet(parentKeys).iterator(); + + List acquired = new ArrayList(keys.size()); + while (lockIt.hasNext()) { + acquired.add(TreeLock.shared(parentLockIt.next(), lockIt.next())); + } + Lock lock = new BulkLock(acquired); + lock.lock(); + return lock; + } + + /** * Acquires an exclusive lock on the given parent key. Use this method to * block cache access for child keys of the given parent key. * diff --git a/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/locks/NodeDocumentLocksTest.java b/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/locks/NodeDocumentLocksTest.java new file mode 100644 index 0000000..409b3ca --- /dev/null +++ b/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/locks/NodeDocumentLocksTest.java @@ -0,0 +1,93 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.jackrabbit.oak.plugins.document.locks; + +import static org.junit.Assert.fail; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collection; +import java.util.List; +import java.util.concurrent.locks.Lock; + +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; +import org.junit.runners.Parameterized.Parameters; + +@RunWith(Parameterized.class) +public class NodeDocumentLocksTest { + + @Parameters(name = "{1}") + public static Collection data() { + return Arrays.asList(new Object[][] { { new StripedNodeDocumentLocks(), "StripedNodeDocumentLocks" }, + { new TreeNodeDocumentLocks(), "TreeNodeDocumentLocks" } }); + } + + private final NodeDocumentLocks locks; + + public NodeDocumentLocksTest(NodeDocumentLocks locks, String name) { + this.locks = locks; + } + + @Test + public void testBulkAcquireNonConflicting() throws InterruptedException { + testBulkAcquire(false); + } + + @Test + public void testBulkAcquireConflicting() throws InterruptedException { + testBulkAcquire(true); + } + + private void testBulkAcquire(boolean conflicting) throws InterruptedException { + int threadCount = 10; + int locksPerThread = 100; + + List threads = new ArrayList(); + String keyName = conflicting ? "lock_%d" : "lock_%d_%d"; + for (int i = 0; i < threadCount; i++) { + final List keys = new ArrayList(locksPerThread); + for (int j = 0; j < locksPerThread; j++) { + keys.add(String.format(keyName, j, i)); + } + threads.add(new Thread(new Runnable() { + @Override + public void run() { + Lock lock = locks.acquire(keys); + try { + Thread.sleep(100); + } catch (InterruptedException e) { + } finally { + lock.unlock(); + } + } + })); + } + + for (Thread t : threads) { + t.start(); + } + for (Thread t : threads) { + t.join(10000); + if (t.isAlive()) { + fail("Thread hasn't stopped in 10s"); + } + } + } + +}