Index: oak-store-document/src/main/java/org/apache/jackrabbit/oak/plugins/document/Collision.java =================================================================== --- oak-store-document/src/main/java/org/apache/jackrabbit/oak/plugins/document/Collision.java (revision 1810337) +++ oak-store-document/src/main/java/org/apache/jackrabbit/oak/plugins/document/Collision.java (working copy) @@ -90,7 +90,8 @@ if (!markCommitRoot(newDoc, ourRev, theirRev, store, context)) { throw new IllegalStateException("Unable to annotate our revision " + "with collision marker. Our revision: " + ourRev - + ", document:\n" + newDoc.format()); + + ", their revision: " + theirRev + ", document:\n" + + newDoc.format()); } return ourRev; } Index: oak-store-document/src/main/java/org/apache/jackrabbit/oak/plugins/document/Commit.java =================================================================== --- oak-store-document/src/main/java/org/apache/jackrabbit/oak/plugins/document/Commit.java (revision 1810337) +++ oak-store-document/src/main/java/org/apache/jackrabbit/oak/plugins/document/Commit.java (working copy) @@ -50,6 +50,7 @@ import static org.apache.jackrabbit.oak.commons.PathUtils.denotesRoot; import static org.apache.jackrabbit.oak.plugins.document.Collection.JOURNAL; import static org.apache.jackrabbit.oak.plugins.document.Collection.NODES; +import static org.apache.jackrabbit.oak.plugins.document.Document.MOD_COUNT; import static org.apache.jackrabbit.oak.plugins.document.NodeDocument.COLLISIONS; import static org.apache.jackrabbit.oak.plugins.document.NodeDocument.SPLIT_CANDIDATE_THRESHOLD; @@ -337,60 +338,64 @@ boolean success = false; try { opLog.addAll(changedNodes); - List oldDocs = store.createOrUpdate(NODES, changedNodes); - checkConflicts(oldDocs, changedNodes); - checkSplitCandidate(oldDocs); - // finally write the commit root (the commit root might be written - // twice, first to check if there was a conflict, and only then to - // commit the revision, with the revision property set) - NodeDocument.setRevision(commitRoot, revision, commitValue); - if (commitRootHasChanges) { - // remove previously added commit root - NodeDocument.removeCommitRoot(commitRoot, revision); - } - opLog.add(commitRoot); - if (baseBranchRevision == null) { - // create a clone of the commitRoot in order - // to set isNew to false. If we get here the - // commitRoot document already exists and - // only needs an update - UpdateOp commit = commitRoot.copy(); - commit.setNew(false); - // only set revision on commit root when there is - // no collision for this commit revision - commit.containsMapEntry(COLLISIONS, revision, false); - NodeDocument before = nodeStore.updateCommitRoot(commit, revision); - if (before == null) { - String msg = "Conflicting concurrent change. " + - "Update operation failed: " + commitRoot; - NodeDocument commitRootDoc = store.find(NODES, commitRoot.getId()); - DocumentStoreException dse; - if (commitRootDoc == null) { - dse = new DocumentStoreException(msg); + if (conditionalCommit(changedNodes, commitValue)) { + success = true; + } else { + List oldDocs = store.createOrUpdate(NODES, changedNodes); + checkConflicts(oldDocs, changedNodes); + checkSplitCandidate(oldDocs); + + // finally write the commit root (the commit root might be written + // twice, first to check if there was a conflict, and only then to + // commit the revision, with the revision property set) + NodeDocument.setRevision(commitRoot, revision, commitValue); + if (commitRootHasChanges) { + // remove previously added commit root + NodeDocument.removeCommitRoot(commitRoot, revision); + } + opLog.add(commitRoot); + if (baseBranchRevision == null) { + // create a clone of the commitRoot in order + // to set isNew to false. If we get here the + // commitRoot document already exists and + // only needs an update + UpdateOp commit = commitRoot.copy(); + commit.setNew(false); + // only set revision on commit root when there is + // no collision for this commit revision + commit.containsMapEntry(COLLISIONS, revision, false); + NodeDocument before = nodeStore.updateCommitRoot(commit, revision); + if (before == null) { + String msg = "Conflicting concurrent change. " + + "Update operation failed: " + commitRoot; + NodeDocument commitRootDoc = store.find(NODES, commitRoot.getId()); + DocumentStoreException dse; + if (commitRootDoc == null) { + dse = new DocumentStoreException(msg); + } else { + dse = new ConflictException(msg, + commitRootDoc.getConflictsFor( + Collections.singleton(revision))); + } + throw dse; } else { - dse = new ConflictException(msg, - commitRootDoc.getConflictsFor( - Collections.singleton(revision))); + success = true; + // if we get here the commit was successful and + // the commit revision is set on the commitRoot + // document for this commit. + // now check for conflicts/collisions by other commits. + // use original commitRoot operation with + // correct isNew flag. + checkConflicts(commitRoot, before); + checkSplitCandidate(before); } - throw dse; } else { - success = true; - // if we get here the commit was successful and - // the commit revision is set on the commitRoot - // document for this commit. - // now check for conflicts/collisions by other commits. - // use original commitRoot operation with - // correct isNew flag. - checkConflicts(commitRoot, before); - checkSplitCandidate(before); + // this is a branch commit, do not fail on collisions now + // trying to merge the branch will fail later + createOrUpdateNode(store, commitRoot); } - } else { - // this is a branch commit, do not fail on collisions now - // trying to merge the branch will fail later - createOrUpdateNode(store, commitRoot); } - operations.put(commitRootPath, commitRoot); } catch (DocumentStoreException e) { // OAK-3084 do not roll back if already committed if (success) { @@ -413,6 +418,49 @@ } } + private boolean conditionalCommit(List changedNodes, + String commitValue) + throws DocumentStoreException { + // conditional commit is only possible when not on a branch + // and commit root is on the same document as the changes + if (!Utils.isCommitted(commitValue) || changedNodes.size() != 1) { + return false; + } + UpdateOp op = changedNodes.get(0); + DocumentStore store = nodeStore.getDocumentStore(); + NodeDocument doc = store.getIfCached(NODES, op.getId()); + if (doc == null || doc.getModCount() == null) { + // document not in cache or store does not maintain modCount + return false; + } + try { + checkConflicts(op, doc); + } catch (ConflictException e) { + // remove collision marker again + removeCollisionMarker(op.getId()); + return false; + } + // if we get here, update based on current doc does not conflict + // create a new commit update operation, setting the revisions + // commit entry together with the other changes + UpdateOp commit = op.copy(); + NodeDocument.unsetCommitRoot(commit, revision); + NodeDocument.setRevision(commit, revision, commitValue); + // make the update conditional on the modCount + commit.equals(MOD_COUNT, doc.getModCount()); + NodeDocument before = store.findAndUpdate(NODES, commit); + if (before != null) { + checkSplitCandidate(before); + } + return before != null; + } + + private void removeCollisionMarker(String id) { + UpdateOp removeCollision = new UpdateOp(id, false); + NodeDocument.removeCollision(removeCollision, revision); + nodeStore.getDocumentStore().findAndUpdate(NODES, removeCollision); + } + private void updateParentChildStatus() { final Set processedParents = Sets.newHashSet(); for (String path : addedNodes) { @@ -447,9 +495,7 @@ } store.findAndUpdate(NODES, reverse); } - UpdateOp removeCollision = new UpdateOp(commitRoot.getId(), false); - NodeDocument.removeCollision(removeCollision, revision); - store.findAndUpdate(NODES, removeCollision); + removeCollisionMarker(commitRoot.getId()); } /** Index: oak-store-document/src/main/java/org/apache/jackrabbit/oak/plugins/document/NodeDocument.java =================================================================== --- oak-store-document/src/main/java/org/apache/jackrabbit/oak/plugins/document/NodeDocument.java (revision 1810337) +++ oak-store-document/src/main/java/org/apache/jackrabbit/oak/plugins/document/NodeDocument.java (working copy) @@ -1816,6 +1816,11 @@ checkNotNull(op).removeMapEntry(COMMIT_ROOT, revision); } + public static void unsetCommitRoot(@Nonnull UpdateOp op, + @Nonnull Revision revision) { + checkNotNull(op).unsetMapEntry(COMMIT_ROOT, revision); + } + public static void setDeleted(@Nonnull UpdateOp op, @Nonnull Revision revision, boolean deleted) { Index: oak-store-document/src/test/java/org/apache/jackrabbit/oak/plugins/document/DocumentNodeStoreTest.java =================================================================== --- oak-store-document/src/test/java/org/apache/jackrabbit/oak/plugins/document/DocumentNodeStoreTest.java (revision 1810337) +++ oak-store-document/src/test/java/org/apache/jackrabbit/oak/plugins/document/DocumentNodeStoreTest.java (working copy) @@ -17,6 +17,7 @@ package org.apache.jackrabbit.oak.plugins.document; import static java.util.Collections.emptyList; +import static java.util.Collections.synchronizedList; import static java.util.concurrent.TimeUnit.SECONDS; import static org.apache.jackrabbit.oak.api.CommitFailedException.CONSTRAINT; import static org.apache.jackrabbit.oak.plugins.document.Collection.JOURNAL; @@ -77,6 +78,7 @@ import javax.annotation.CheckForNull; import javax.annotation.Nonnull; +import javax.jcr.InvalidItemStateException; import com.google.common.base.Throwables; import com.google.common.collect.Iterables; @@ -2904,10 +2906,9 @@ assertTrue("Two added paths should have forced flush", numChangedPaths == 0); } - @Ignore("OAK-5788") @Test public void commitRootSameAsModifiedPath() throws Exception{ - WriteCountingStore ws = new WriteCountingStore(); + WriteCountingStore ws = new WriteCountingStore(true); DocumentNodeStore ns = builderProvider.newBuilder().setAsyncDelay(0).setDocumentStore(ws).getNodeStore(); NodeBuilder builder = ns.getRoot().builder(); @@ -2923,6 +2924,80 @@ assertEquals(1, ws.count); } + @Test + public void commitRootSameAsModifiedPathWithConflicts() throws Exception{ + MemoryDocumentStore store = new MemoryDocumentStore(true); + final DocumentNodeStore ns = builderProvider.newBuilder().setAsyncDelay(0) + .setDocumentStore(store).getNodeStore(); + NodeBuilder builder = ns.getRoot().builder(); + builder.child("a").child("b").setProperty("p", 0L); + merge(ns, builder); + + final List exceptions = synchronizedList(Lists.newArrayList()); + + Runnable task = new Runnable() { + + CommitHook hook = new CompositeHook( + new ConflictHook(new AnnotatingConflictHandler()), + new EditorHook(new ConflictValidatorProvider()) + ); + + @Override + public void run() { + try { + for (int i = 0; i < 100; i++) { + NodeBuilder builder = ns.getRoot().builder(); + NodeBuilder b = builder.child("a").child("b"); + PropertyState p = b.getProperty("p"); + assertNotNull(p); + long value = p.getValue(Type.LONG) + 1; + b.setProperty(p.getName(), value); + try { + ns.merge(builder, hook, CommitInfo.EMPTY); + } catch (CommitFailedException e) { + if (e.asRepositoryException() instanceof InvalidItemStateException) { + // this is fine and may happen from time to + // time because the test updates the same + // property concurrently + } else { + // anything else is unexpected + exceptions.add(e); + } + } + } + } catch (AssertionError e) { + exceptions.add(e); + } + } + }; + + List threads = Lists.newArrayList(); + for (int i = 0; i < 4; i++) { + threads.add(new Thread(task)); + } + for (Thread t : threads) { + t.start(); + } + for (Thread t : threads) { + t.join(); + } + + // check updates are consecutive + NodeDocument doc = store.find(NODES, Utils.getIdFromPath("/a/b")); + assertNotNull(doc); + long previousValue = -1; + List values = Lists.newArrayList(doc.getLocalMap("p").values()); + for (String v : Lists.reverse(values)) { + long currentValue = Long.parseLong(v); + assertEquals(previousValue + 1, currentValue); + previousValue = currentValue; + } + + for (Throwable e : exceptions) { + fail(e.toString()); + } + } + @Ignore("OAK-5791") @Test public void createChildNodeAndCheckNoOfCalls() throws Exception{ @@ -3477,6 +3552,13 @@ private final ThreadLocal createMulti = new ThreadLocal<>(); int count; + WriteCountingStore() { + } + + WriteCountingStore(boolean maintainModCount) { + super(maintainModCount); + } + @Override public T createOrUpdate(Collection collection, UpdateOp update) { if (createMulti.get() == null) {