diff --git a/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/CommitQueue.java b/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/CommitQueue.java index f548b07..1aa72ce 100644 --- a/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/CommitQueue.java +++ b/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/CommitQueue.java @@ -20,8 +20,11 @@ import static com.google.common.base.Preconditions.checkArgument; import static com.google.common.base.Preconditions.checkNotNull; import java.util.Comparator; +import java.util.HashMap; +import java.util.HashSet; import java.util.Iterator; import java.util.Map; +import java.util.Set; import java.util.SortedMap; import java.util.SortedSet; import java.util.TreeMap; @@ -32,8 +35,6 @@ import java.util.concurrent.TimeUnit; import javax.annotation.Nonnull; -import com.google.common.collect.Maps; - import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -55,7 +56,7 @@ final class CommitQueue { /** * Map of currently suspended commits until a given Revision is visible. */ - private final Map suspendedCommits = Maps.newIdentityHashMap(); + private final Map suspendedCommits = new HashMap(); private final RevisionContext context; @@ -103,7 +104,7 @@ final class CommitQueue { } /** - * Suspends until one of the following happens: + * Suspends until for each of given revisions one of the following happens: *
    *
  • the given revision is visible from the current headRevision
  • *
  • the given revision is canceled from the commit queue
  • @@ -111,25 +112,32 @@ final class CommitQueue { *
  • the thread is interrupted
  • *
* - * @param r the revision to become visible. + * @param conflictRevisions the revisions to become visible. */ - void suspendUntil(@Nonnull Revision r) { + void suspendUntilAll(@Nonnull Set conflictRevisions) { Comparator comparator = context.getRevisionComparator(); Semaphore s = null; + int addedRevisions; synchronized (suspendedCommits) { Revision headRevision = context.getHeadRevision(); - if (comparator.compare(r, headRevision) > 0) { - s = new Semaphore(0); - suspendedCommits.put(s, r); + Set afterHead = new HashSet(conflictRevisions.size()); + for (Revision r : conflictRevisions) { + if (comparator.compare(r, headRevision) > 0) { + afterHead.add(r); + } } + + s = new Semaphore(0); + suspendedCommits.put(s, new SuspendedCommit(s, afterHead)); + addedRevisions = afterHead.size(); } - if (s != null) { - try { - s.tryAcquire(suspendTimeout, TimeUnit.MILLISECONDS); - } catch (InterruptedException e) { - synchronized (suspendedCommits) { - suspendedCommits.remove(s); - } + try { + s.tryAcquire(addedRevisions, suspendTimeout, TimeUnit.MILLISECONDS); + } catch (InterruptedException e) { + LOG.debug("The suspended thread has been interrupted", e); + } finally { + synchronized (suspendedCommits) { + suspendedCommits.remove(s); } } } @@ -153,7 +161,7 @@ final class CommitQueue { /** * Sets the suspend timeout in milliseconds. - * See also {@link #suspendUntil(Revision)}. + * See also {@link #suspendUntilAll(Revision)}. * * @param timeout the timeout to set. */ @@ -173,15 +181,12 @@ final class CommitQueue { if (suspendedCommits.isEmpty()) { return; } - Comparator comparator = context.getRevisionComparator(); Revision headRevision = context.getHeadRevision(); - Iterator> it = suspendedCommits.entrySet().iterator(); + Iterator it = suspendedCommits.values().iterator(); while (it.hasNext()) { - Map.Entry entry = it.next(); - if (comparator.compare(entry.getValue(), headRevision) <= 0) { - Semaphore s = entry.getKey(); + SuspendedCommit suspended = it.next(); + if (suspended.removeRevisionsYoungerThan(headRevision) && suspended.revisions.isEmpty()) { it.remove(); - s.release(); } } } @@ -193,13 +198,11 @@ final class CommitQueue { if (suspendedCommits.isEmpty()) { return; } - Iterator> it = suspendedCommits.entrySet().iterator(); + Iterator it = suspendedCommits.values().iterator(); while (it.hasNext()) { - Map.Entry entry = it.next(); - if (revision.equals(entry.getValue())) { - Semaphore s = entry.getKey(); + SuspendedCommit suspended = it.next(); + if (suspended.removeRevision(revision) && suspended.revisions.isEmpty()) { it.remove(); - s.release(); } } } @@ -292,4 +295,39 @@ final class CommitQueue { } } } -} + + private class SuspendedCommit { + + private final Semaphore semaphore; + + private final Set revisions; + + private SuspendedCommit(Semaphore semaphore, Set revisions) { + this.semaphore = semaphore; + this.revisions = revisions; + } + + private boolean removeRevisionsYoungerThan(Revision revision) { + Comparator comparator = context.getRevisionComparator(); + Iterator it = revisions.iterator(); + boolean removed = false; + while (it.hasNext()) { + if (comparator.compare(it.next(), revision) <= 0) { + it.remove(); + semaphore.release(); + removed = true; + } + } + return removed; + } + + private boolean removeRevision(Revision r) { + if (revisions.remove(r)) { + semaphore.release(); + return true; + } else { + return false; + } + } + } +} \ No newline at end of file diff --git a/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/ConflictException.java b/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/ConflictException.java index 2a47e3c..8528d13 100644 --- a/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/ConflictException.java +++ b/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/ConflictException.java @@ -17,13 +17,15 @@ package org.apache.jackrabbit.oak.plugins.document; import javax.annotation.Nonnull; -import javax.annotation.Nullable; import org.apache.jackrabbit.oak.api.CommitFailedException; import static com.google.common.base.Preconditions.checkNotNull; import static org.apache.jackrabbit.oak.api.CommitFailedException.MERGE; +import java.util.Collections; +import java.util.Set; + /** * A document store exception with an optional conflict revision. The * DocumentNodeStore implementation will throw this exception when a commit @@ -34,18 +36,36 @@ class ConflictException extends DocumentStoreException { private static final long serialVersionUID = 1418838561903727231L; /** - * Optional conflict revision. + * Optional conflict revisions list. */ - private final Revision conflictRevision; + private final Set conflictRevisions; /** * @param message the exception / conflict message. - * @param conflictRevision the conflict revision or {@code null} if unknown. + * @param conflictRevision the conflict revision */ ConflictException(@Nonnull String message, - @Nullable Revision conflictRevision) { + @Nonnull Revision conflictRevision) { + super(checkNotNull(message)); + this.conflictRevisions = Collections.singleton(checkNotNull(conflictRevision)); + } + + /** + * @param message the exception / conflict message. + * @param conflictRevisions the conflict revision list + */ + ConflictException(@Nonnull String message, + @Nonnull Set conflictRevisions) { + super(checkNotNull(message)); + this.conflictRevisions = checkNotNull(conflictRevisions); + } + + /** + * @param message the exception / conflict message. + */ + ConflictException(@Nonnull String message) { super(checkNotNull(message)); - this.conflictRevision = conflictRevision; + this.conflictRevisions = Collections.emptySet(); } /** @@ -55,11 +75,20 @@ class ConflictException extends DocumentStoreException { * @return a {@link CommitFailedException}. */ CommitFailedException asCommitFailedException() { - if (conflictRevision != null) { - return new FailedWithConflictException(conflictRevision, getMessage(), this); + if (conflictRevisions != null) { + return new FailedWithConflictException(conflictRevisions, getMessage(), this); } else { return new CommitFailedException(MERGE, 1, "Failed to merge changes to the underlying store", this); } } -} + + /** + * List of conflict revisions. + * + * @return a list of conflict revisions (may be empty) + */ + @Nonnull Iterable getConflictRevisions() { + return conflictRevisions; + } +} \ No newline at end of file diff --git a/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/DocumentNodeStore.java b/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/DocumentNodeStore.java index 3192f88..adc9b33 100644 --- a/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/DocumentNodeStore.java +++ b/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/DocumentNodeStore.java @@ -43,6 +43,7 @@ import java.text.SimpleDateFormat; import java.util.Collections; import java.util.Date; import java.util.HashMap; +import java.util.HashSet; import java.util.Iterator; import java.util.List; import java.util.Map; @@ -1456,21 +1457,28 @@ public final class DocumentNodeStore } /** - * Suspends until the given revision is visible from the current - * headRevision or the given revision is canceled from the commit queue. + * Suspends until all given revisions are either visible from the current + * headRevision or canceled from the commit queue. * - * The thread will *not* be suspended if the given revision is from a - * foreign cluster node and async delay is set to zero. + * Only revisions from the local cluster node will be considered if the async + * delay is set to 0. * - * @param r the revision to become visible. + * @param conflictRevisions the revision to become visible. */ - void suspendUntil(@Nonnull Revision r) { + void suspendUntilAll(@Nonnull Set conflictRevisions) { // do not suspend if revision is from another cluster node // and background read is disabled - if (r.getClusterId() != getClusterId() && getAsyncDelay() == 0) { - return; + if (getAsyncDelay() == 0) { + Set onlyLocal = new HashSet(conflictRevisions.size()); + for (Revision r : conflictRevisions) { + if (r.getClusterId() == getClusterId()) { + onlyLocal.add(r); + } + } + commitQueue.suspendUntilAll(onlyLocal); + } else { + commitQueue.suspendUntilAll(conflictRevisions); } - commitQueue.suspendUntil(r); } //------------------------< Observable >------------------------------------ diff --git a/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/DocumentNodeStoreBranch.java b/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/DocumentNodeStoreBranch.java index 33d01a0..c9387b4 100644 --- a/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/DocumentNodeStoreBranch.java +++ b/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/DocumentNodeStoreBranch.java @@ -23,6 +23,7 @@ import static org.apache.jackrabbit.oak.api.CommitFailedException.OAK; import static org.apache.jackrabbit.oak.api.CommitFailedException.STATE; import static org.apache.jackrabbit.oak.plugins.document.NodeDocument.COLLISIONS; +import java.util.HashSet; import java.util.Random; import java.util.Set; import java.util.concurrent.Callable; @@ -149,7 +150,7 @@ class DocumentNodeStoreBranch implements NodeStoreBranch { boolean exclusive) throws CommitFailedException { CommitFailedException ex = null; - Revision conflictRevision = null; + Set conflictRevisions = new HashSet(); long time = System.currentTimeMillis(); int numRetries = 0; for (long backoff = MIN_BACKOFF; backoff <= maximumBackoff; backoff *= 2) { @@ -159,14 +160,13 @@ class DocumentNodeStoreBranch implements NodeStoreBranch { final long start = perfLogger.start(); // suspend until conflict revision is visible // or as a fallback sleep for a while - if (conflictRevision != null) { + if (!conflictRevisions.isEmpty()) { // suspend until conflicting revision is visible LOG.debug("Suspending until {} is visible. Current head {}.", - conflictRevision, store.getHeadRevision()); - store.suspendUntil(conflictRevision); + conflictRevisions, store.getHeadRevision()); + store.suspendUntilAll(conflictRevisions); + conflictRevisions.clear(); LOG.debug("Resumed. Current head {}.", store.getHeadRevision()); - // reset conflict revision - conflictRevision = null; } else { Thread.sleep(backoff + RANDOM.nextInt((int) Math.min(backoff, Integer.MAX_VALUE))); } @@ -181,7 +181,7 @@ class DocumentNodeStoreBranch implements NodeStoreBranch { checkNotNull(info), exclusive); } catch (FailedWithConflictException e) { ex = e; - conflictRevision = e.getConflictRevision(); + conflictRevisions.addAll(e.getConflictRevisions()); } catch (CommitFailedException e) { ex = e; } @@ -740,4 +740,4 @@ class DocumentNodeStoreBranch implements NodeStoreBranch { throw ex; } } -} +} \ No newline at end of file diff --git a/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/FailedWithConflictException.java b/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/FailedWithConflictException.java index ef51dee..c98bcd5 100644 --- a/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/FailedWithConflictException.java +++ b/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/FailedWithConflictException.java @@ -22,6 +22,8 @@ import org.apache.jackrabbit.oak.api.CommitFailedException; import static com.google.common.base.Preconditions.checkNotNull; +import java.util.Set; + /** * A {@link CommitFailedException} with a conflict revision. */ @@ -29,20 +31,20 @@ class FailedWithConflictException extends CommitFailedException { private static final long serialVersionUID = 2716279884065949789L; - private final Revision conflictRevision; + private final Set conflictRevisions; - FailedWithConflictException(@Nonnull Revision conflictRevision, + FailedWithConflictException(@Nonnull Set conflictRevisions, @Nonnull String message, @Nonnull Throwable cause) { super(OAK, MERGE, 4, checkNotNull(message), checkNotNull(cause)); - this.conflictRevision = checkNotNull(conflictRevision); + this.conflictRevisions = checkNotNull(conflictRevisions); } /** * @return the revision of another commit which caused a conflict. */ @Nonnull - Revision getConflictRevision() { - return conflictRevision; + Set getConflictRevisions() { + return conflictRevisions; } -} +} \ No newline at end of file diff --git a/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/CommitQueueTest.java b/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/CommitQueueTest.java index 803a384..87c86dd 100644 --- a/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/CommitQueueTest.java +++ b/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/CommitQueueTest.java @@ -18,8 +18,11 @@ package org.apache.jackrabbit.oak.plugins.document; import java.io.Closeable; import java.util.ArrayList; +import java.util.Collections; +import java.util.HashSet; import java.util.List; import java.util.Random; +import java.util.Set; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicReference; @@ -37,6 +40,8 @@ import org.junit.Test; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import static com.google.common.collect.ImmutableSet.of; +import static com.google.common.collect.Sets.union; import static java.util.Collections.synchronizedList; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; @@ -216,13 +221,15 @@ public class CommitQueueTest { } }; headRevision.set(context.newRevision()); + final CommitQueue queue = new CommitQueue(context); - final Revision r = context.newRevision(); + final Revision newHeadRev = context.newRevision(); + final Set revisions = queue.createRevisions(10); Thread t = new Thread(new Runnable() { @Override public void run() { - queue.suspendUntil(r); + queue.suspendUntilAll(union(of(newHeadRev), revisions)); } }); t.start(); @@ -240,8 +247,14 @@ public class CommitQueueTest { // must still be suspended assertEquals(1, queue.numSuspendedThreads()); - headRevision.set(r); + headRevision.set(newHeadRev); queue.headRevisionChanged(); + // must still be suspended + assertEquals(1, queue.numSuspendedThreads()); + + for (Revision rev : revisions) { + queue.canceled(rev); + } // must not be suspended anymore assertEquals(0, queue.numSuspendedThreads()); } @@ -264,7 +277,7 @@ public class CommitQueueTest { Thread t = new Thread(new Runnable() { @Override public void run() { - queue.suspendUntil(r); + queue.suspendUntilAll(of(r)); } }); t.start(); @@ -273,6 +286,66 @@ public class CommitQueueTest { assertFalse(t.isAlive()); } + @Test + public void concurrentSuspendUntil() throws Exception { + final AtomicReference headRevision = new AtomicReference(); + RevisionContext context = new DummyRevisionContext() { + @Nonnull + @Override + public Revision getHeadRevision() { + return headRevision.get(); + } + }; + headRevision.set(context.newRevision()); + + List threads = new ArrayList(); + List allRevisions = new ArrayList(); + + final CommitQueue queue = new CommitQueue(context); + for (int i = 0; i < 10; i++) { // threads count + final Set revisions = new HashSet(); + for (int j = 0; j < 10; j++) { // revisions per thread + Revision r = queue.createRevision(); + revisions.add(r); + allRevisions.add(r); + } + Thread t = new Thread(new Runnable() { + public void run() { + queue.suspendUntilAll(revisions); + } + }); + threads.add(t); + t.start(); + } + + for (int i = 0; i < 100; i++) { + if (queue.numSuspendedThreads() == 10) { + break; + } + Thread.sleep(10); + } + assertEquals(10, queue.numSuspendedThreads()); + + Collections.shuffle(allRevisions); + for (Revision r : allRevisions) { + queue.canceled(r); + Thread.sleep(10); + } + + for (int i = 0; i < 100; i++) { + if (queue.numSuspendedThreads() == 0) { + break; + } + Thread.sleep(10); + } + assertEquals(0, queue.numSuspendedThreads()); + + for (Thread t : threads) { + t.join(10); + assertFalse(t.isAlive()); + } + } + private void assertNoExceptions() throws Exception { if (!exceptions.isEmpty()) { throw exceptions.get(0); diff --git a/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/ConflictExceptionTest.java b/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/ConflictExceptionTest.java index 62544dc..f7b54a7 100644 --- a/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/ConflictExceptionTest.java +++ b/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/ConflictExceptionTest.java @@ -23,18 +23,20 @@ import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertSame; import static org.junit.Assert.assertTrue; +import java.util.Collections; + public class ConflictExceptionTest { @Test public void type() { - ConflictException e = new ConflictException("conflict", null); + ConflictException e = new ConflictException("conflict"); CommitFailedException cfe = e.asCommitFailedException(); assertEquals(CommitFailedException.MERGE, cfe.getType()); } @Test public void cause() { - ConflictException e = new ConflictException("conflict", null); + ConflictException e = new ConflictException("conflict"); CommitFailedException cfe = e.asCommitFailedException(); assertSame(e, cfe.getCause()); } @@ -47,6 +49,6 @@ public class ConflictExceptionTest { assertTrue(cfe instanceof FailedWithConflictException); FailedWithConflictException fwce = (FailedWithConflictException) cfe; assertEquals(CommitFailedException.MERGE, fwce.getType()); - assertEquals(r, fwce.getConflictRevision()); + assertEquals(Collections.singleton(r), fwce.getConflictRevisions()); } } diff --git a/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/DocumentNodeStoreTest.java b/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/DocumentNodeStoreTest.java index 89d4e90..66ee975 100644 --- a/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/DocumentNodeStoreTest.java +++ b/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/DocumentNodeStoreTest.java @@ -42,6 +42,7 @@ import java.util.Collections; import java.util.Comparator; import java.util.Date; import java.util.HashMap; +import java.util.HashSet; import java.util.List; import java.util.Map; import java.util.Random; @@ -2380,6 +2381,73 @@ public class DocumentNodeStoreTest { mergeAttempts.get() <= 1); } + // OAK-3586 + @Test + public void resolveMultipleConflictedRevisions() throws Exception { + MemoryDocumentStore store = new MemoryDocumentStore(); + final DocumentNodeStore ds = builderProvider.newBuilder() + .setDocumentStore(store) + .setAsyncDelay(0).getNodeStore(); + + DocumentNodeState root = ds.getRoot(); + final DocumentNodeStoreBranch b = ds.createBranch(root); + + NodeBuilder builder = root.builder(); + builder.child("foo"); + b.setRoot(builder.getNodeState()); + + final Set revisions = new HashSet(); + final List commits = new ArrayList(); + for (int i = 0; i < 10; i++) { + Revision revision = ds.newRevision(); + Commit commit = ds.newCommit(revision, ds.createBranch(root)); + commits.add(commit); + revisions.add(revision); + } + + final AtomicBoolean merged = new AtomicBoolean(); + Thread t = new Thread(new Runnable() { + public void run() { + try { + CommitFailedException exception = new ConflictException("Can't merge", revisions).asCommitFailedException(); + b.merge(new HookFailingOnce(exception), CommitInfo.EMPTY); + merged.set(true); + } catch (CommitFailedException e) { + LOG.error("Can't commit", e); + } + } + }); + t.start(); + + // 6 x done() + for (int i = 0; i < 6; i++) { + assertFalse("The branch can't be merged yet", merged.get()); + ds.done(commits.get(i), false, CommitInfo.EMPTY); + } + + // 2 x cancel() + for (int i = 6; i < 8; i++) { + assertFalse("The branch can't be merged yet", merged.get()); + ds.canceled(commits.get(i)); + } + + // 2 x branch done() + for (int i = 8; i < 10; i++) { + assertFalse("The branch can't be merged yet", merged.get()); + ds.done(commits.get(i), true, CommitInfo.EMPTY); + } + + for (int i = 0; i < 100; i++) { + if (merged.get()) { + break; + } + Thread.sleep(10); + } + assertTrue("The branch should be merged by now", merged.get()); + + t.join(); + } + // OAK-3411 @Test public void sameSeenAtRevision() throws Exception { @@ -2599,6 +2667,28 @@ public class DocumentNodeStoreTest { } }; + private static class HookFailingOnce implements CommitHook { + + private final AtomicBoolean failedAlready = new AtomicBoolean(); + + private final CommitFailedException exception; + + private HookFailingOnce(CommitFailedException exception) { + this.exception = exception; + } + + @Override + public NodeState processCommit(NodeState before, NodeState after, CommitInfo info) + throws CommitFailedException { + if (failedAlready.getAndSet(true)) { + return after; + } else { + throw exception; + } + } + + } + private static class TestEditor extends DefaultEditor { private final NodeBuilder builder;