Index: oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/Collision.java =================================================================== --- oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/Collision.java (revision 1701032) +++ oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/Collision.java (working copy) @@ -72,7 +72,7 @@ @Nonnull Revision mark(DocumentStore store) throws DocumentStoreException { // first try to mark their revision - if (markCommitRoot(document, theirRev, store)) { + if (markCommitRoot(document, theirRev, ourRev, store)) { return theirRev; } // their commit wins, we have to mark ourRev @@ -79,7 +79,7 @@ NodeDocument newDoc = Collection.NODES.newDocument(store); document.deepCopy(newDoc); UpdateUtils.applyChanges(newDoc, ourOp, context.getRevisionComparator()); - if (!markCommitRoot(newDoc, ourRev, store)) { + if (!markCommitRoot(newDoc, ourRev, theirRev, store)) { throw new IllegalStateException("Unable to annotate our revision " + "with collision marker. Our revision: " + ourRev + ", document:\n" + newDoc.format()); @@ -94,6 +94,7 @@ * @param document the document. * @param revision the revision of the commit to annotated with a collision * marker. + * @param other the revision which detected the collision. * @param store the document store. * @return true if the commit for the given revision was marked * successfully; false otherwise. @@ -100,6 +101,7 @@ */ private static boolean markCommitRoot(@Nonnull NodeDocument document, @Nonnull Revision revision, + @Nonnull Revision other, @Nonnull DocumentStore store) { String p = document.getPath(); String commitRootPath; @@ -131,7 +133,7 @@ // already marked return true; } - NodeDocument.addCollision(op, revision); + NodeDocument.addCollision(op, revision, other); String commitValue = commitRoot.getLocalRevisions().get(revision); if (commitValue == null) { // no revision entry yet Index: oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/Commit.java =================================================================== --- oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/Commit.java (revision 1701032) +++ oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/Commit.java (working copy) @@ -17,6 +17,7 @@ package org.apache.jackrabbit.oak.plugins.document; import java.util.ArrayList; +import java.util.Collections; import java.util.HashMap; import java.util.HashSet; import java.util.LinkedHashMap; @@ -393,7 +394,16 @@ if (before == null) { String msg = "Conflicting concurrent change. " + "Update operation failed: " + commitRoot; - throw new DocumentStoreException(msg); + NodeDocument commitRootDoc = store.find(NODES, commitRoot.getId()); + DocumentStoreException dse; + if (commitRootDoc == null) { + dse = new DocumentStoreException(msg); + } else { + dse = new ConflictException(msg, + commitRootDoc.getMostRecentConflictFor( + Collections.singleton(revision), nodeStore)); + } + throw dse; } else { success = true; // if we get here the commit was successful and @@ -490,9 +500,12 @@ * @param op the update operation. * @param before how the document looked before the update was applied or * {@code null} if it didn't exist before. + * @throws ConflictException if there was a conflict introduced by the + * given update operation. */ private void checkConflicts(@Nonnull UpdateOp op, - @Nullable NodeDocument before) { + @Nullable NodeDocument before) + throws ConflictException { DocumentStore store = nodeStore.getDocumentStore(); collisions.clear(); if (baseRevision != null) { @@ -507,10 +520,14 @@ }); } String conflictMessage = null; + Revision conflictRevision = newestRev; if (newestRev == null) { if ((op.isDelete() || !op.isNew()) && isConflicting(before, op)) { conflictMessage = "The node " + op.getId() + " does not exist or is already deleted"; + if (before != null && !before.getLocalDeleted().isEmpty()) { + conflictRevision = before.getLocalDeleted().firstKey(); + } } } else { if (op.isNew() && isConflicting(before, op)) { @@ -545,6 +562,7 @@ op.getId() + " was changed in revision\n" + r + ", which was applied after the base revision\n" + baseRevision; + conflictRevision = r; } } } @@ -558,7 +576,7 @@ ",\nrevision order:\n" + nodeStore.getRevisionComparator()); } - throw new DocumentStoreException(conflictMessage); + throw new ConflictException(conflictMessage, conflictRevision); } } } Index: oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/CommitQueue.java =================================================================== --- oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/CommitQueue.java (revision 1701032) +++ oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/CommitQueue.java (working copy) @@ -19,14 +19,21 @@ import static com.google.common.base.Preconditions.checkArgument; import static com.google.common.base.Preconditions.checkNotNull; +import java.util.Comparator; +import java.util.Iterator; +import java.util.Map; import java.util.SortedMap; import java.util.SortedSet; import java.util.TreeMap; import java.util.TreeSet; import java.util.concurrent.CountDownLatch; +import java.util.concurrent.Semaphore; +import java.util.concurrent.TimeUnit; import javax.annotation.Nonnull; +import com.google.common.collect.Maps; + import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -34,14 +41,23 @@ * CommitQueue ensures a sequence of commits consistent with the * commit revision even if commits did not complete in this sequence. */ -abstract class CommitQueue { +final class CommitQueue { static final Logger LOG = LoggerFactory.getLogger(CommitQueue.class); private final SortedMap commits = new TreeMap(StableRevisionComparator.INSTANCE); - protected abstract Revision newRevision(); + /** + * Map of currently suspended commits until a given Revision is visible. + */ + private final Map suspendedCommits = Maps.newIdentityHashMap(); + private final RevisionContext context; + + CommitQueue(@Nonnull RevisionContext context) { + this.context = checkNotNull(context); + } + @Nonnull Revision createRevision() { return createRevisions(1).first(); @@ -54,7 +70,7 @@ Revision rev = null; synchronized (this) { for (int i = 0; i < num; i++) { - rev = newRevision(); + rev = context.newRevision(); revs.add(rev); } commits.put(rev, new Entry(rev)); @@ -70,6 +86,7 @@ void canceled(@Nonnull Revision rev) { removeCommit(rev); + notifySuspendedCommits(rev); } boolean contains(@Nonnull Revision revision) { @@ -78,6 +95,44 @@ } } + /** + * Suspends until the given revision is visible from the current + * headRevision or the given revision is canceled from the commit queue. + * + * @param r the revision to become visible. + */ + void suspendUntil(@Nonnull Revision r) { + Comparator comparator = context.getRevisionComparator(); + Semaphore s = null; + synchronized (suspendedCommits) { + Revision headRevision = context.getHeadRevision(); + if (comparator.compare(r, headRevision) > 0) { + s = new Semaphore(0); + suspendedCommits.put(s, r); + } + } + if (s != null) { + s.acquireUninterruptibly(); + } + } + + /** + * Called when the head revision accessible via the {@link RevisionContext} + * passed to constructor changed. + */ + void headRevisionChanged() { + notifySuspendedCommits(); + } + + /** + * @return the number of suspended threads on this commit queue. + */ + int numSuspendedThreads() { + synchronized (suspendedCommits) { + return suspendedCommits.size(); + } + } + interface Callback { void headOfQueue(@Nonnull Revision revision); @@ -85,6 +140,43 @@ //------------------------< internal >-------------------------------------- + private void notifySuspendedCommits() { + synchronized (suspendedCommits) { + if (suspendedCommits.isEmpty()) { + return; + } + Comparator comparator = context.getRevisionComparator(); + Revision headRevision = context.getHeadRevision(); + Iterator> it = suspendedCommits.entrySet().iterator(); + while (it.hasNext()) { + Map.Entry entry = it.next(); + if (comparator.compare(entry.getValue(), headRevision) <= 0) { + Semaphore s = entry.getKey(); + it.remove(); + s.release(); + } + } + } + } + + private void notifySuspendedCommits(@Nonnull Revision revision) { + checkNotNull(revision); + synchronized (suspendedCommits) { + if (suspendedCommits.isEmpty()) { + return; + } + Iterator> it = suspendedCommits.entrySet().iterator(); + while (it.hasNext()) { + Map.Entry entry = it.next(); + if (revision.equals(entry.getValue())) { + Semaphore s = entry.getKey(); + it.remove(); + s.release(); + } + } + } + } + private void removeCommit(@Nonnull Revision rev) { // simply remove and notify next head if any synchronized (this) { Index: oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/ConflictException.java =================================================================== --- oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/ConflictException.java (revision 0) +++ oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/ConflictException.java (working copy) @@ -0,0 +1,65 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.jackrabbit.oak.plugins.document; + +import javax.annotation.Nonnull; +import javax.annotation.Nullable; + +import org.apache.jackrabbit.oak.api.CommitFailedException; + +import static com.google.common.base.Preconditions.checkNotNull; +import static org.apache.jackrabbit.oak.api.CommitFailedException.MERGE; + +/** + * A document store exception with an optional conflict revision. The + * DocumentNodeStore implementation will throw this exception when a commit + * or merge fails with a conflict. + */ +class ConflictException extends DocumentStoreException { + + private static final long serialVersionUID = 1418838561903727231L; + + /** + * Optional conflict revision. + */ + private final Revision conflictRevision; + + /** + * @param message the exception / conflict message. + * @param conflictRevision the conflict revision or {@code null} if unknown. + */ + ConflictException(@Nonnull String message, + @Nullable Revision conflictRevision) { + super(checkNotNull(message)); + this.conflictRevision = conflictRevision; + } + + /** + * Convert this exception into a {@link CommitFailedException}. This + * exception will be set as the cause of the returned exception. + * + * @return a {@link CommitFailedException}. + */ + CommitFailedException asCommitFailedException() { + if (conflictRevision != null) { + return new FailedWithConflictException(conflictRevision, getMessage(), this); + } else { + return new CommitFailedException(MERGE, 1, + "Failed to merge changes to the underlying store", this); + } + } +} Property changes on: oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/ConflictException.java ___________________________________________________________________ Added: svn:eol-style ## -0,0 +1 ## +native \ No newline at end of property Index: oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/DocumentNodeStore.java =================================================================== --- oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/DocumentNodeStore.java (revision 1701032) +++ oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/DocumentNodeStore.java (working copy) @@ -22,7 +22,6 @@ import static com.google.common.collect.Iterables.toArray; import static com.google.common.collect.Iterables.transform; import static java.util.Collections.singletonList; -import static org.apache.jackrabbit.oak.api.CommitFailedException.MERGE; import static org.apache.jackrabbit.oak.commons.PathUtils.concat; import static org.apache.jackrabbit.oak.plugins.document.Collection.JOURNAL; import static org.apache.jackrabbit.oak.plugins.document.Collection.NODES; @@ -510,12 +509,7 @@ getRevisionComparator().add(headRevision, Revision.newRevision(0)); dispatcher = new ChangeDispatcher(getRoot()); - commitQueue = new CommitQueue() { - @Override - protected Revision newRevision() { - return DocumentNodeStore.this.newRevision(); - } - }; + commitQueue = new CommitQueue(this); String threadNamePostfix = "(" + clusterId + ")"; batchCommitQueue = new BatchCommitQueue(store, revisionComparator); backgroundReadThread = new Thread( @@ -617,19 +611,6 @@ } /** - * Create a new revision. - * - * @return the revision - */ - @Nonnull - Revision newRevision() { - if (simpleRevisionCounter != null) { - return new Revision(simpleRevisionCounter.getAndIncrement(), 0, clusterId); - } - return Revision.newRevision(clusterId); - } - - /** * Creates a new commit. The caller must acknowledge the commit either with * {@link #done(Commit, boolean, CommitInfo)} or {@link #canceled(Commit)}, * depending on the result of the commit. @@ -700,6 +681,7 @@ changes.modified(c.getModifiedPaths()); // update head revision setHeadRevision(c.getRevision()); + commitQueue.headRevisionChanged(); dispatcher.contentChanged(getRoot(), info); } }); @@ -1382,8 +1364,10 @@ b.applyTo(getPendingModifications(), commit.getRevision()); getBranches().remove(b); } else { - throw new CommitFailedException(MERGE, 2, - "Conflicting concurrent change. Update operation failed: " + op); + NodeDocument root = Utils.getRootDocument(store); + Revision conflictRev = root.getMostRecentConflictFor(b.getCommits(), this); + String msg = "Conflicting concurrent change. Update operation failed: " + op; + throw new ConflictException(msg, conflictRev).asCommitFailedException(); } } else { // no commits in this branch -> do nothing @@ -1503,6 +1487,21 @@ } } + /** + * Suspends until the given revision is visible from the current + * headRevision or the given revision is canceled from the commit queue. + * + * @param r the revision to become visible. + */ + void suspendUntil(@Nonnull Revision r) { + // do not suspend if revision is from another cluster node + // and background read is disabled + if (r.getClusterId() != getClusterId() && getAsyncDelay() == 0) { + return; + } + commitQueue.suspendUntil(r); + } + //------------------------< Observable >------------------------------------ @Override @@ -1647,6 +1646,14 @@ return headRevision; } + @Nonnull + public Revision newRevision() { + if (simpleRevisionCounter != null) { + return new Revision(simpleRevisionCounter.getAndIncrement(), 0, clusterId); + } + return Revision.newRevision(clusterId); + } + //----------------------< background operations >--------------------------- /** Used for testing only */ @@ -1904,6 +1911,7 @@ // the new head revision is after other revisions setHeadRevision(newRevision()); if (dispatchChange) { + commitQueue.headRevisionChanged(); time = clock.getTime(); if (externalSort != null) { // then there were external changes and reading them @@ -1911,7 +1919,7 @@ try { JournalEntry.applyTo(externalSort, diffCache, oldHead, headRevision); } catch (Exception e1) { - LOG.error("backgroundRead: Exception while processing external changes from journal: "+e1, e1); + LOG.error("backgroundRead: Exception while processing external changes from journal: " + e1, e1); } } stats.populateDiffCache = clock.getTime() - time; Index: oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/DocumentNodeStoreBranch.java =================================================================== --- oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/DocumentNodeStoreBranch.java (revision 1701032) +++ oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/DocumentNodeStoreBranch.java (working copy) @@ -113,6 +113,12 @@ throws CommitFailedException { try { return merge0(hook, info, false); + } catch (FailedWithConflictException e) { + // suspend until conflicting revision is visible + LOG.debug("Suspending until {} is visible. Current head {}.", + e.getConflictRevision(), store.getHeadRevision()); + store.suspendUntil(e.getConflictRevision()); + LOG.debug("Resumed. Current head {}.", store.getHeadRevision()); } catch (CommitFailedException e) { if (!e.isOfType(MERGE)) { throw e; @@ -166,6 +172,9 @@ try { return branchState.merge(checkNotNull(hook), checkNotNull(info), exclusive); + } catch (FailedWithConflictException e) { + // let caller decide what to do with conflicting revision + throw e; } catch (CommitFailedException e) { LOG.trace("Merge Error", e); ex = e; @@ -481,6 +490,8 @@ NodeState newHead = DocumentNodeStoreBranch.this.persist(toCommit, base, info); branchState = new Merged(base); return newHead; + } catch (ConflictException e) { + throw e.asCommitFailedException(); } catch(DocumentStoreException e) { throw new CommitFailedException(MERGE, 1, "Failed to merge changes to the underlying store", e); Index: oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/FailedWithConflictException.java =================================================================== --- oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/FailedWithConflictException.java (revision 0) +++ oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/FailedWithConflictException.java (working copy) @@ -0,0 +1,48 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.jackrabbit.oak.plugins.document; + +import javax.annotation.Nonnull; + +import org.apache.jackrabbit.oak.api.CommitFailedException; + +import static com.google.common.base.Preconditions.checkNotNull; + +/** + * A {@link CommitFailedException} with a conflict revision. + */ +class FailedWithConflictException extends CommitFailedException { + + private static final long serialVersionUID = 2716279884065949789L; + + private final Revision conflictRevision; + + FailedWithConflictException(@Nonnull Revision conflictRevision, + @Nonnull String message, + @Nonnull Throwable cause) { + super(OAK, MERGE, 4, checkNotNull(message), checkNotNull(cause)); + this.conflictRevision = checkNotNull(conflictRevision); + } + + /** + * @return the revision of another commit which caused a conflict. + */ + @Nonnull + Revision getConflictRevision() { + return conflictRevision; + } +} Property changes on: oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/FailedWithConflictException.java ___________________________________________________________________ Added: svn:eol-style ## -0,0 +1 ## +native \ No newline at end of property Index: oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/NodeDocument.java =================================================================== --- oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/NodeDocument.java (revision 1701032) +++ oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/NodeDocument.java (working copy) @@ -18,6 +18,7 @@ import java.util.ArrayList; import java.util.Collections; +import java.util.Comparator; import java.util.Iterator; import java.util.Map; import java.util.Map.Entry; @@ -654,6 +655,45 @@ } /** + * Returns the most recent conflict on the given {@code branchCommits} if + * there are any. The returned revision is the commit, which created the + * collision marker for one of the {@code branchCommits}. + * + * @param branchCommits the branch commits to check. + * @param context a revision context. + * @return the conflict revision or {@code null} if there aren't any or + * the collision marker does not have a revision value. + */ + @CheckForNull + Revision getMostRecentConflictFor(@Nonnull Iterable branchCommits, + @Nonnull RevisionContext context) { + checkNotNull(branchCommits); + checkNotNull(context); + + Comparator comparator = context.getRevisionComparator(); + Revision conflict = null; + + Map collisions = getLocalMap(COLLISIONS); + for (Revision r : branchCommits) { + String value = collisions.get(r.asTrunkRevision()); + if (value == null) { + continue; + } + Revision c; + try { + c = Revision.fromString(value); + } catch (IllegalArgumentException e) { + // backward compatibility: collision marker with value 'true' + continue; + } + if (conflict == null || comparator.compare(conflict, c) < 0) { + conflict = c; + } + } + return conflict; + } + + /** * Returns the commit root path for the given revision or * null if this document does not have a commit root entry for * the given revision. @@ -1480,10 +1520,18 @@ checkNotNull(op).removeMapEntry(REVISIONS, checkNotNull(revision)); } + /** + * Add a collision marker for the given {@code revision}. + * + * @param op the update operation. + * @param revision the commit for which a collision was detected. + * @param other the revision for the commit, which detected the collision. + */ public static void addCollision(@Nonnull UpdateOp op, - @Nonnull Revision revision) { + @Nonnull Revision revision, + @Nonnull Revision other) { checkNotNull(op).setMapEntry(COLLISIONS, checkNotNull(revision), - String.valueOf(true)); + other.toString()); } public static void removeCollision(@Nonnull UpdateOp op, Index: oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/RevisionContext.java =================================================================== --- oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/RevisionContext.java (revision 1701032) +++ oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/RevisionContext.java (working copy) @@ -51,4 +51,10 @@ */ @Nonnull Revision getHeadRevision(); + + /** + * @return a new revision for the local document node store instance. + */ + @Nonnull + Revision newRevision(); } Index: oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/ClusterConflictTest.java =================================================================== --- oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/ClusterConflictTest.java (revision 0) +++ oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/ClusterConflictTest.java (working copy) @@ -0,0 +1,185 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.jackrabbit.oak.plugins.document; + +import java.util.List; + +import javax.annotation.CheckForNull; + +import com.google.common.collect.Lists; + +import org.apache.jackrabbit.oak.api.CommitFailedException; +import org.apache.jackrabbit.oak.api.Type; +import org.apache.jackrabbit.oak.plugins.document.memory.MemoryDocumentStore; +import org.apache.jackrabbit.oak.spi.commit.CommitInfo; +import org.apache.jackrabbit.oak.spi.commit.DefaultEditor; +import org.apache.jackrabbit.oak.spi.commit.Editor; +import org.apache.jackrabbit.oak.spi.commit.EditorHook; +import org.apache.jackrabbit.oak.spi.commit.EditorProvider; +import org.apache.jackrabbit.oak.spi.commit.EmptyHook; +import org.apache.jackrabbit.oak.spi.state.NodeBuilder; +import org.apache.jackrabbit.oak.spi.state.NodeState; +import org.apache.jackrabbit.oak.spi.state.NodeStore; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import static org.apache.jackrabbit.oak.spi.commit.CommitInfo.EMPTY; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; + +public class ClusterConflictTest { + + private static final Logger LOG = LoggerFactory.getLogger(ClusterConflictTest.class); + + private DocumentNodeStore ns1; + private DocumentNodeStore ns2; + + @Before + public void setUp() { + MemoryDocumentStore store = new MemoryDocumentStore(); + ns1 = newDocumentNodeStore(store); + ns2 = newDocumentNodeStore(store); + } + + private static DocumentNodeStore newDocumentNodeStore(DocumentStore store) { + // use high async delay and run background ops manually + // asyncDelay set to zero prevents commits from suspending + return new DocumentMK.Builder() + .setAsyncDelay(60000) + .setDocumentStore(store) + .setLeaseCheck(false) // disabled for debugging purposes + .getNodeStore(); + } + + @After + public void tearDown() { + ns1.dispose(); + ns2.dispose(); + } + + @Test + public void suspendUntilVisible() throws Exception { + suspendUntilVisible(false); + } + + @Test + public void suspendUntilVisibleWithBranch() throws Exception { + suspendUntilVisible(true); + } + + private void suspendUntilVisible(boolean withBranch) throws Exception { + NodeBuilder b1 = ns1.getRoot().builder(); + b1.child("counter").setProperty("value", 0); + merge(ns1, b1); + ns1.runBackgroundOperations(); + ns2.runBackgroundOperations(); + + b1 = ns1.getRoot().builder(); + b1.child("foo"); + ns1.merge(b1, new TestHook(), EMPTY); + + final List exceptions = Lists.newArrayList(); + final NodeBuilder b2 = ns2.getRoot().builder(); + b2.child("bar"); + if (withBranch) { + purge(b2); + } + b2.child("baz"); + + Thread t = new Thread(new Runnable() { + @Override + public void run() { + try { + LOG.info("initiating merge"); + ns2.merge(b2, new TestHook(), EMPTY); + LOG.info("merge succeeded"); + } catch (CommitFailedException e) { + exceptions.add(e); + } + } + }); + t.start(); + + // wait until t is suspended + for (int i = 0; i < 100; i++) { + if (ns2.commitQueue.numSuspendedThreads() > 0) { + break; + } + Thread.sleep(10); + } + assertEquals(1, ns2.commitQueue.numSuspendedThreads()); + LOG.info("commit suspended"); + + ns1.runBackgroundOperations(); + LOG.info("ran background ops on ns1"); + ns2.runBackgroundOperations(); + LOG.info("ran background ops on ns2"); + assertEquals(0, ns2.commitQueue.numSuspendedThreads()); + + t.join(3000); + assertFalse("Commit did not succeed within 3 seconds", t.isAlive()); + + for (Exception e : exceptions) { + throw e; + } + } + + private static class TestHook extends EditorHook { + + TestHook() { + super(new EditorProvider() { + @CheckForNull + @Override + public Editor getRootEditor(NodeState before, + NodeState after, + NodeBuilder builder, + CommitInfo info) + throws CommitFailedException { + return new TestEditor(builder.child("counter")); + } + }); + } + } + + private static class TestEditor extends DefaultEditor { + + private NodeBuilder counter; + + TestEditor(NodeBuilder counter) { + this.counter = counter; + } + + @Override + public Editor childNodeAdded(String name, NodeState after) + throws CommitFailedException { + counter.setProperty("value", counter.getProperty("value").getValue(Type.LONG) + 1); + return super.childNodeAdded(name, after); + } + } + + private static NodeState merge(NodeStore store, NodeBuilder root) + throws CommitFailedException { + return store.merge(root, EmptyHook.INSTANCE, EMPTY); + } + + private static void purge(NodeBuilder builder) { + ((DocumentRootBuilder) builder).purge(); + } +} Property changes on: oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/ClusterConflictTest.java ___________________________________________________________________ Added: svn:eol-style ## -0,0 +1 ## +native \ No newline at end of property Index: oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/CommitQueueTest.java =================================================================== --- oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/CommitQueueTest.java (revision 1701032) +++ oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/CommitQueueTest.java (working copy) @@ -21,6 +21,7 @@ import java.util.List; import java.util.Random; import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicReference; import javax.annotation.Nonnull; import javax.annotation.Nullable; @@ -36,6 +37,7 @@ import org.slf4j.LoggerFactory; import static java.util.Collections.synchronizedList; +import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; /** @@ -116,12 +118,7 @@ @Test public void concurrentCommits2() throws Exception { - final CommitQueue queue = new CommitQueue() { - @Override - protected Revision newRevision() { - return Revision.newRevision(1); - } - }; + final CommitQueue queue = new CommitQueue(DummyRevisionContext.INSTANCE); final CommitQueue.Callback c = new CommitQueue.Callback() { private Revision before = Revision.newRevision(1); @@ -205,6 +202,47 @@ assertNoExceptions(); } + @Test + public void suspendUntil() throws Exception { + final AtomicReference headRevision = new AtomicReference(); + RevisionContext context = new DummyRevisionContext() { + @Nonnull + @Override + public Revision getHeadRevision() { + return headRevision.get(); + } + }; + headRevision.set(context.newRevision()); + final CommitQueue queue = new CommitQueue(context); + + final Revision r = context.newRevision(); + Thread t = new Thread(new Runnable() { + @Override + public void run() { + queue.suspendUntil(r); + } + }); + t.start(); + + // wait until t is suspended + for (int i = 0; i < 100; i++) { + if (queue.numSuspendedThreads() > 0) { + break; + } + Thread.sleep(10); + } + assertEquals(1, queue.numSuspendedThreads()); + + queue.headRevisionChanged(); + // must still be suspended + assertEquals(1, queue.numSuspendedThreads()); + + headRevision.set(r); + queue.headRevisionChanged(); + // must not be suspended anymore + assertEquals(0, queue.numSuspendedThreads()); + } + private void assertNoExceptions() throws Exception { if (!exceptions.isEmpty()) { throw exceptions.get(0); Index: oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/ConflictExceptionTest.java =================================================================== --- oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/ConflictExceptionTest.java (revision 0) +++ oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/ConflictExceptionTest.java (working copy) @@ -0,0 +1,52 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.jackrabbit.oak.plugins.document; + +import org.apache.jackrabbit.oak.api.CommitFailedException; +import org.junit.Test; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertSame; +import static org.junit.Assert.assertTrue; + +public class ConflictExceptionTest { + + @Test + public void type() { + ConflictException e = new ConflictException("conflict", null); + CommitFailedException cfe = e.asCommitFailedException(); + assertEquals(CommitFailedException.MERGE, cfe.getType()); + } + + @Test + public void cause() { + ConflictException e = new ConflictException("conflict", null); + CommitFailedException cfe = e.asCommitFailedException(); + assertSame(e, cfe.getCause()); + } + + @Test + public void asCommitFailedException() { + Revision r = Revision.newRevision(1); + ConflictException e = new ConflictException("conflict", r); + CommitFailedException cfe = e.asCommitFailedException(); + assertTrue(cfe instanceof FailedWithConflictException); + FailedWithConflictException fwce = (FailedWithConflictException) cfe; + assertEquals(CommitFailedException.MERGE, fwce.getType()); + assertEquals(r, fwce.getConflictRevision()); + } +} Property changes on: oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/ConflictExceptionTest.java ___________________________________________________________________ Added: svn:eol-style ## -0,0 +1 ## +native \ No newline at end of property Index: oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/DocumentSplitTest.java =================================================================== --- oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/DocumentSplitTest.java (revision 1701032) +++ oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/DocumentSplitTest.java (working copy) @@ -886,6 +886,12 @@ } return rc.getHeadRevision(); } + + @Nonnull + @Override + public Revision newRevision() { + return rc.newRevision(); + } } private static NodeState merge(NodeStore store, NodeBuilder root) Index: oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/DummyRevisionContext.java =================================================================== --- oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/DummyRevisionContext.java (revision 1701032) +++ oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/DummyRevisionContext.java (working copy) @@ -55,4 +55,10 @@ public Revision getHeadRevision() { return Revision.newRevision(1); } + + @Nonnull + @Override + public Revision newRevision() { + return Revision.newRevision(1); + } } Index: oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/NodeDocumentTest.java =================================================================== --- oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/NodeDocumentTest.java (revision 1701032) +++ oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/NodeDocumentTest.java (working copy) @@ -39,11 +39,13 @@ import org.junit.Test; import static org.apache.jackrabbit.oak.plugins.document.Collection.NODES; +import static org.apache.jackrabbit.oak.plugins.document.NodeDocument.COLLISIONS; import static org.apache.jackrabbit.oak.plugins.document.NodeDocument.revisionAreAmbiguous; import static org.apache.jackrabbit.oak.plugins.document.Revision.RevisionComparator; import static org.apache.jackrabbit.oak.plugins.document.util.Utils.getRootDocument; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertNull; import static org.junit.Assert.assertTrue; /** @@ -61,7 +63,7 @@ for (int i = 0; i < NodeDocument.NUM_REVS_THRESHOLD + 1; i++) { Revision r = Revision.newRevision(1); NodeDocument.setRevision(op, r, "c"); - NodeDocument.addCollision(op, r); + NodeDocument.addCollision(op, r, Revision.newRevision(1)); } UpdateUtils.applyChanges(doc, op, StableRevisionComparator.INSTANCE); Revision head = DummyRevisionContext.INSTANCE.getHeadRevision(); @@ -106,6 +108,53 @@ } @Test + public void getMostRecentConflictFor() { + RevisionContext context = DummyRevisionContext.INSTANCE; + MemoryDocumentStore docStore = new MemoryDocumentStore(); + String id = Utils.getPathFromId("/"); + NodeDocument doc = new NodeDocument(docStore); + doc.put(Document.ID, id); + + Iterable branchCommits = Collections.emptyList(); + Revision conflict = doc.getMostRecentConflictFor(branchCommits, context); + assertNull(conflict); + + // add some collisions + UpdateOp op = new UpdateOp(id, false); + Revision r0 = Revision.newRevision(1); + Revision r1 = Revision.newRevision(1); + Revision c1 = Revision.newRevision(1); + Revision r2 = Revision.newRevision(1); + Revision c2 = Revision.newRevision(1); + // backward compatibility test + op.setMapEntry(COLLISIONS, r0, String.valueOf(true)); + // regular collision entries + NodeDocument.addCollision(op, r1, c1); + NodeDocument.addCollision(op, r2, c2); + UpdateUtils.applyChanges(doc, op, StableRevisionComparator.INSTANCE); + + branchCommits = Collections.singleton(r0); + conflict = doc.getMostRecentConflictFor(branchCommits, context); + assertNull(conflict); + + branchCommits = Collections.singleton(r1.asBranchRevision()); + conflict = doc.getMostRecentConflictFor(branchCommits, context); + assertEquals(c1, conflict); + + branchCommits = Collections.singleton(r2.asBranchRevision()); + conflict = doc.getMostRecentConflictFor(branchCommits, context); + assertEquals(c2, conflict); + + branchCommits = Lists.newArrayList(r1.asBranchRevision(), r2.asBranchRevision()); + conflict = doc.getMostRecentConflictFor(branchCommits, context); + assertEquals(c2, conflict); + + branchCommits = Lists.newArrayList(r2.asBranchRevision(), r1.asBranchRevision()); + conflict = doc.getMostRecentConflictFor(branchCommits, context); + assertEquals(c2, conflict); + } + + @Test public void getAllChanges() throws Exception { final int NUM_CHANGES = 200; DocumentNodeStore ns = createTestStore(NUM_CHANGES); Index: oak-jcr/src/test/java/org/apache/jackrabbit/oak/jcr/ConcurrentAddNodesClusterIT.java =================================================================== --- oak-jcr/src/test/java/org/apache/jackrabbit/oak/jcr/ConcurrentAddNodesClusterIT.java (revision 1701032) +++ oak-jcr/src/test/java/org/apache/jackrabbit/oak/jcr/ConcurrentAddNodesClusterIT.java (working copy) @@ -26,10 +26,7 @@ import java.util.HashMap; import java.util.List; import java.util.Map; -import java.util.concurrent.Callable; import java.util.concurrent.CountDownLatch; -import java.util.concurrent.Executors; -import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; @@ -62,7 +59,6 @@ private static final int LOOP_COUNT = 10; private static final int WORKER_COUNT = 20; private static final String PROP_NAME = "testcount"; - private static final ScheduledExecutorService EXECUTOR = Executors.newSingleThreadScheduledExecutor(); private List repos = new ArrayList(); private List mks = new ArrayList(); @@ -81,12 +77,15 @@ @After public void after() throws Exception { + workers.clear(); for (Repository repo : repos) { dispose(repo); } + repos.clear(); for (DocumentMK mk : mks) { mk.dispose(); } + mks.clear(); dropDB(); } @@ -211,7 +210,6 @@ for (int i = 0; i < 2; i++) { DocumentMK mk = new DocumentMK.Builder() .setMongoDB(createConnection().getDB()) - .setAsyncDelay(0) .setClusterId(i + 1).open(); mks.add(mk); } @@ -226,13 +224,11 @@ Session s2 = r2.login(new SimpleCredentials("admin", "admin".toCharArray())); ensureIndex(s1.getRootNode(), PROP_NAME); - syncMKs(1); ensureIndex(s2.getRootNode(), PROP_NAME); Map exceptions = Collections.synchronizedMap( new HashMap()); createNodes(s1, "testroot-1", 1, 1, exceptions); - syncMKs(1); createNodes(s2, "testroot-2", 1, 1, exceptions); for (Map.Entry entry : exceptions.entrySet()) { @@ -264,9 +260,9 @@ Session s3 = r3.login(new SimpleCredentials("admin", "admin".toCharArray())); ensureIndex(s1.getRootNode(), PROP_NAME); - syncMKs(1); - ensureIndex(s2.getRootNode(), PROP_NAME); - ensureIndex(s3.getRootNode(), PROP_NAME); + runBackgroundOps(mk1); + runBackgroundOps(mk2); + runBackgroundOps(mk3); // begin test @@ -357,18 +353,6 @@ assertTrue(s1.getRootNode().hasNode("session-2/nodes")); } - private void syncMKs(int delay) { - EXECUTOR.schedule(new Callable() { - @Override - public Object call() throws Exception { - for (DocumentMK mk : mks) { - runBackgroundOps(mk); - } - return null; - } - }, delay, TimeUnit.SECONDS); - } - private static MongoConnection createConnection() throws Exception { return OakMongoNSRepositoryStub.createConnection( ConcurrentAddNodesClusterIT.class.getSimpleName());