Index: oak-store-document/src/main/java/org/apache/jackrabbit/oak/plugins/document/Collision.java =================================================================== --- oak-store-document/src/main/java/org/apache/jackrabbit/oak/plugins/document/Collision.java (revision 1867455) +++ oak-store-document/src/main/java/org/apache/jackrabbit/oak/plugins/document/Collision.java (working copy) @@ -52,17 +52,20 @@ private final UpdateOp ourOp; private final Revision ourRev; private final RevisionContext context; + private final RevisionVector startRevisions; Collision(@NotNull NodeDocument document, @NotNull Revision theirRev, @NotNull UpdateOp ourOp, @NotNull Revision ourRev, - @NotNull RevisionContext context) { + @NotNull RevisionContext context, + @NotNull RevisionVector startRevisions) { this.document = checkNotNull(document); this.theirRev = checkNotNull(theirRev); this.ourOp = checkNotNull(ourOp); this.ourRev = checkNotNull(ourRev); this.context = checkNotNull(context); + this.startRevisions = checkNotNull(startRevisions); } /** @@ -101,9 +104,18 @@ * * @return {@code true} if this is a conflicting collision, {@code false} * otherwise. - * @throws DocumentStoreException + * @throws DocumentStoreException if an operation on the document store + * fails. */ boolean isConflicting() throws DocumentStoreException { + // their revision is not conflicting when it is identified as branch + // commit that cannot be merged (orphaned branch commit, theirRev is + // garbage). + if (document.getLocalBranchCommits().contains(theirRev) + && !startRevisions.isRevisionNewer(theirRev)) { + return false; + } + // did their revision create or delete the node? if (document.getDeleted().containsKey(theirRev)) { return true; Index: oak-store-document/src/main/java/org/apache/jackrabbit/oak/plugins/document/Commit.java =================================================================== --- oak-store-document/src/main/java/org/apache/jackrabbit/oak/plugins/document/Commit.java (revision 1867455) +++ oak-store-document/src/main/java/org/apache/jackrabbit/oak/plugins/document/Commit.java (working copy) @@ -59,6 +59,7 @@ protected final DocumentNodeStore nodeStore; private final RevisionVector baseRevision; + private final RevisionVector startRevisions; private final Revision revision; private final HashMap operations = new LinkedHashMap<>(); private final Set collisions = new LinkedHashSet(); @@ -85,24 +86,29 @@ * @param revision the revision for this commit. * @param baseRevision the base revision for this commit or {@code null} if * there is none. + * @param startRevisions the revisions for each cluster node corresponding + * to the start time of the cluster nodes. */ Commit(@NotNull DocumentNodeStore nodeStore, @NotNull Revision revision, - @Nullable RevisionVector baseRevision) { + @Nullable RevisionVector baseRevision, + @NotNull RevisionVector startRevisions) { this.nodeStore = checkNotNull(nodeStore); this.revision = checkNotNull(revision); this.baseRevision = baseRevision; + this.startRevisions = startRevisions; } Commit(@NotNull DocumentNodeStore nodeStore, @NotNull Revision revision, @Nullable RevisionVector baseRevision, + @NotNull RevisionVector startRevisions, @NotNull Map operations, @NotNull Set addedNodes, @NotNull Set removedNodes, @NotNull Set nodesWithBinaries, @NotNull Map bundledNodes) { - this(nodeStore, revision, baseRevision); + this(nodeStore, revision, baseRevision, startRevisions); this.operations.putAll(operations); this.addedNodes.addAll(addedNodes); this.removedNodes.addAll(removedNodes); @@ -595,7 +601,7 @@ // TODO: unify above conflict detection and isConflicting() boolean allowConflictingDeleteChange = allowConcurrentAddRemove(before, op); for (Revision r : collisions) { - Collision c = new Collision(before, r, op, revision, nodeStore); + Collision c = new Collision(before, r, op, revision, nodeStore, startRevisions); if (c.isConflicting() && !allowConflictingDeleteChange) { // mark collisions on commit root if (c.mark(store).equals(revision)) { @@ -807,6 +813,12 @@ } } + + @NotNull + RevisionVector getStartRevisions() { + return startRevisions; + } + /** * Apply the changes of a node to the cache. * Index: oak-store-document/src/main/java/org/apache/jackrabbit/oak/plugins/document/CommitBuilder.java =================================================================== --- oak-store-document/src/main/java/org/apache/jackrabbit/oak/plugins/document/CommitBuilder.java (revision 1867455) +++ oak-store-document/src/main/java/org/apache/jackrabbit/oak/plugins/document/CommitBuilder.java (working copy) @@ -30,7 +30,6 @@ import org.jetbrains.annotations.NotNull; import org.jetbrains.annotations.Nullable; -import static com.google.common.base.Preconditions.checkArgument; import static com.google.common.base.Preconditions.checkNotNull; /** @@ -44,6 +43,7 @@ private final DocumentNodeStore nodeStore; private final Revision revision; private final RevisionVector baseRevision; + private RevisionVector startRevisions = new RevisionVector(); private final Map operations = new LinkedHashMap<>(); private final Set addedNodes = new HashSet<>(); @@ -221,6 +221,19 @@ } /** + * Sets the start revisions of known clusterIds on this commit builder. + * + * @param startRevisions the start revisions derived from the start time + * in the clusterNodes entries. + * @return {@code this} builder. + */ + @NotNull + CommitBuilder withStartRevisions(@NotNull RevisionVector startRevisions) { + this.startRevisions = checkNotNull(startRevisions); + return this; + } + + /** * Builds the commit with the modifications. * * @return {@code this} builder. @@ -234,8 +247,9 @@ String msg = "Cannot build a commit with a pseudo commit revision"; throw new IllegalStateException(msg); } - return new Commit(nodeStore, revision, baseRevision, operations, - addedNodes, removedNodes, nodesWithBinaries, bundledNodes); + return new Commit(nodeStore, revision, baseRevision, startRevisions, + operations, addedNodes, removedNodes, nodesWithBinaries, + bundledNodes); } /** @@ -251,8 +265,9 @@ Revision from = this.revision; Map operations = Maps.transformValues( this.operations, op -> rewrite(op, from, revision)); - return new Commit(nodeStore, revision, baseRevision, operations, - addedNodes, removedNodes, nodesWithBinaries, bundledNodes); + return new Commit(nodeStore, revision, baseRevision, startRevisions, + operations, addedNodes, removedNodes, nodesWithBinaries, + bundledNodes); } /** Index: oak-store-document/src/main/java/org/apache/jackrabbit/oak/plugins/document/DocumentNodeStore.java =================================================================== --- oak-store-document/src/main/java/org/apache/jackrabbit/oak/plugins/document/DocumentNodeStore.java (revision 1867455) +++ oak-store-document/src/main/java/org/apache/jackrabbit/oak/plugins/document/DocumentNodeStore.java (working copy) @@ -19,7 +19,6 @@ import static com.google.common.base.Preconditions.checkArgument; import static com.google.common.base.Preconditions.checkNotNull; import static com.google.common.base.Preconditions.checkState; -import static com.google.common.collect.Iterables.filter; import static com.google.common.collect.Iterables.partition; import static com.google.common.collect.Iterables.transform; import static com.google.common.collect.Lists.reverse; @@ -2244,19 +2243,7 @@ */ @NotNull private RevisionVector getMinExternalRevisions() { - return new RevisionVector(transform(filter(clusterNodes.values(), - new Predicate() { - @Override - public boolean apply(ClusterNodeInfoDocument input) { - return input.getClusterId() != getClusterId(); - } - }), - new Function() { - @Override - public Revision apply(ClusterNodeInfoDocument input) { - return new Revision(input.getStartTime(), 0, input.getClusterId()); - } - })); + return Utils.getStartRevisions(clusterNodes.values()).remove(getClusterId()); } /** @@ -2758,8 +2745,10 @@ checkArgument(!checkNotNull(base).isBranch(), "base must not be a branch revision: " + base); + RevisionVector startRevs = Utils.getStartRevisions(clusterNodes.values()); // build commit before revision is created by the commit queue (OAK-7869) - CommitBuilder commitBuilder = new CommitBuilder(this, base); + CommitBuilder commitBuilder = new CommitBuilder(this, base) + .withStartRevisions(startRevs); changes.with(commitBuilder); boolean success = false; Index: oak-store-document/src/main/java/org/apache/jackrabbit/oak/plugins/document/MergeCommit.java =================================================================== --- oak-store-document/src/main/java/org/apache/jackrabbit/oak/plugins/document/MergeCommit.java (revision 1867455) +++ oak-store-document/src/main/java/org/apache/jackrabbit/oak/plugins/document/MergeCommit.java (working copy) @@ -35,7 +35,7 @@ MergeCommit(DocumentNodeStore nodeStore, RevisionVector baseRevision, SortedSet revisions) { - super(nodeStore, revisions.last(), baseRevision); + super(nodeStore, revisions.last(), baseRevision, new RevisionVector()); this.mergeRevs = revisions; } Index: oak-store-document/src/main/java/org/apache/jackrabbit/oak/plugins/document/util/Utils.java =================================================================== --- oak-store-document/src/main/java/org/apache/jackrabbit/oak/plugins/document/util/Utils.java (revision 1867455) +++ oak-store-document/src/main/java/org/apache/jackrabbit/oak/plugins/document/util/Utils.java (working copy) @@ -22,6 +22,7 @@ import java.security.MessageDigest; import java.security.NoSuchAlgorithmException; import java.sql.Timestamp; +import java.util.ArrayList; import java.util.Comparator; import java.util.Date; import java.util.Iterator; @@ -39,6 +40,7 @@ import org.apache.jackrabbit.oak.commons.PathUtils; import org.apache.jackrabbit.oak.commons.StringUtils; import org.apache.jackrabbit.oak.plugins.document.ClusterNodeInfo; +import org.apache.jackrabbit.oak.plugins.document.ClusterNodeInfoDocument; import org.apache.jackrabbit.oak.plugins.document.Collection; import org.apache.jackrabbit.oak.plugins.document.DocumentStore; import org.apache.jackrabbit.oak.plugins.document.DocumentStoreException; @@ -815,6 +817,24 @@ } /** + * Returns a revision vector that contains a revision for each of the passed + * cluster nodes with a revision timestamp that corresponds to the last + * known time when the cluster node was started. + * + * @param clusterNodes the cluster node information. + * @return revision vector representing the last known time when the cluster + * nodes were started. + */ + @NotNull + public static RevisionVector getStartRevisions(@NotNull Iterable clusterNodes) { + List revs = new ArrayList<>(); + for (ClusterNodeInfoDocument doc : clusterNodes) { + revs.add(new Revision(doc.getStartTime(), 0, doc.getClusterId())); + } + return new RevisionVector(revs); + } + + /** * Returns the minimum timestamp to use for a query for child documents that * have been modified between {@code fromRev} and {@code toRev}. * Index: oak-store-document/src/test/java/org/apache/jackrabbit/oak/plugins/document/CollisionTest.java =================================================================== --- oak-store-document/src/test/java/org/apache/jackrabbit/oak/plugins/document/CollisionTest.java (revision 1867455) +++ oak-store-document/src/test/java/org/apache/jackrabbit/oak/plugins/document/CollisionTest.java (working copy) @@ -25,7 +25,6 @@ import org.apache.jackrabbit.oak.spi.commit.EmptyHook; import org.apache.jackrabbit.oak.spi.state.NodeBuilder; import org.jetbrains.annotations.NotNull; -import org.junit.Ignore; import org.junit.Rule; import org.junit.Test; @@ -163,10 +162,10 @@ @NotNull UpdateOp ourOp, @NotNull Revision ourRev, @NotNull RevisionContext context) { - return new Collision(document, theirRev, ourOp, ourRev, context); + return new Collision(document, theirRev, ourOp, ourRev, context, + RevisionVector.fromString("")); } - @Ignore @Test public void collisionOnOrphanedBranch() throws Exception { DocumentStore store = new MemoryDocumentStore(); @@ -189,6 +188,7 @@ ns = builderProvider.newBuilder() .setDocumentStore(store).setAsyncDelay(0) .setUpdateLimit(10).build(); + ns.updateClusterState(); root = store.find(NODES, Utils.getIdFromPath(ROOT)); assertNotNull(root); @@ -211,7 +211,6 @@ assertNoCollisions(store, ROOT); } - @Ignore @Test public void collisionOnForeignOrphanedBranch() throws Exception { DocumentStore store = new MemoryDocumentStore(); @@ -228,6 +227,7 @@ builder.child("n-" + i).setProperty("p", "v"); } ns1.dispose(); + ns2.updateClusterState(); NodeDocument root = store.find(NODES, Utils.getIdFromPath(ROOT)); assertNotNull(root); @@ -243,12 +243,13 @@ builder.child("n-0"); merge(ns2, builder); - // must not create a collision marker for a branch commit - // from a clusterId that is inactive - assertNoCollisions(store, ROOT); + // must create a collision marker for a branch commit because + // it is not known when ns1 was stopped + root = store.find(NODES, Utils.getIdFromPath(ROOT)); + assertNotNull(root); + assertThat(root.getLocalMap(COLLISIONS).keySet(), not(empty())); } - @Ignore @Test public void collisionOnForeignOrphanedBranchAfterRestart() throws Exception { DocumentStore store = new MemoryDocumentStore(); @@ -287,6 +288,8 @@ assertNotNull(doc); assertThat(doc.getLocalBranchCommits(), not(empty())); + ns2.updateClusterState(); + builder = ns2.getRoot().builder(); builder.child("n-0"); merge(ns2, builder); Index: oak-store-document/src/test/java/org/apache/jackrabbit/oak/plugins/document/CollisionWithSplitTest.java =================================================================== --- oak-store-document/src/test/java/org/apache/jackrabbit/oak/plugins/document/CollisionWithSplitTest.java (revision 1867455) +++ oak-store-document/src/test/java/org/apache/jackrabbit/oak/plugins/document/CollisionWithSplitTest.java (working copy) @@ -115,7 +115,7 @@ // committed revision on ns2 doc = ns2.getDocumentStore().find(NODES, id); assertTrue(doc.getLocalCommitRoot().containsKey(conflictRev)); - Collision c = new Collision(doc, conflictRev, op, ourRev, ns2); + Collision c = new Collision(doc, conflictRev, op, ourRev, ns2, RevisionVector.fromString("")); assertEquals("Collision must match our revision (" + ourRev + "). " + "The conflict revision " + conflictRev + " is already committed.", ourRev, c.mark(ns2.getDocumentStore())); Index: oak-store-document/src/test/java/org/apache/jackrabbit/oak/plugins/document/CommitBuilderTest.java =================================================================== --- oak-store-document/src/test/java/org/apache/jackrabbit/oak/plugins/document/CommitBuilderTest.java (revision 1867455) +++ oak-store-document/src/test/java/org/apache/jackrabbit/oak/plugins/document/CommitBuilderTest.java (working copy) @@ -266,6 +266,34 @@ } } + @Test + public void withStartRevisionsNull() { + CommitBuilder builder = new CommitBuilder(ns, null); + try { + builder.withStartRevisions(null); + expectNPE(); + } catch (NullPointerException e) { + // expected + } + } + + @Test + public void withStartRevisions() { + RevisionVector head = ns.getHeadRevision(); + CommitBuilder builder = new CommitBuilder(ns, head) + .withStartRevisions(head); + Commit c = builder.build(ns.newRevision()); + assertEquals(head, c.getStartRevisions()); + } + + @Test + public void defaultStartRevisions() { + RevisionVector head = ns.getHeadRevision(); + CommitBuilder builder = new CommitBuilder(ns, head); + Commit c = builder.build(ns.newRevision()); + assertEquals(new RevisionVector(), c.getStartRevisions()); + } + private static void expectNPE() { fail("NullPointerException expected"); } Index: oak-store-document/src/test/java/org/apache/jackrabbit/oak/plugins/document/util/UtilsTest.java =================================================================== --- oak-store-document/src/test/java/org/apache/jackrabbit/oak/plugins/document/util/UtilsTest.java (revision 1867455) +++ oak-store-document/src/test/java/org/apache/jackrabbit/oak/plugins/document/util/UtilsTest.java (working copy) @@ -16,6 +16,7 @@ */ package org.apache.jackrabbit.oak.plugins.document.util; +import java.util.Arrays; import java.util.Collections; import java.util.List; import java.util.Random; @@ -29,6 +30,7 @@ import org.apache.jackrabbit.oak.api.CommitFailedException; import org.apache.jackrabbit.oak.commons.PathUtils; import org.apache.jackrabbit.oak.plugins.document.ClusterNodeInfo; +import org.apache.jackrabbit.oak.plugins.document.ClusterNodeInfoDocument; import org.apache.jackrabbit.oak.plugins.document.Collection; import org.apache.jackrabbit.oak.plugins.document.DocumentMK; import org.apache.jackrabbit.oak.plugins.document.DocumentNodeStore; @@ -47,6 +49,7 @@ import org.apache.jackrabbit.oak.stats.Clock; import org.junit.Ignore; import org.junit.Test; +import org.mockito.Mockito; import static org.hamcrest.CoreMatchers.containsString; import static org.hamcrest.CoreMatchers.is; @@ -403,4 +406,48 @@ assertThat(e.getMessage(), containsString("newer than current time")); } } + + @Test + public void getStartRevisionsEmpty() { + RevisionVector rv = Utils.getStartRevisions(Collections.emptyList()); + assertEquals(0, rv.getDimensions()); + } + + @Test + public void getStartRevisionsSingleNode() { + int clusterId = 1; + long now = System.currentTimeMillis(); + ClusterNodeInfoDocument info = mockedClusterNodeInfo(clusterId, now); + RevisionVector rv = Utils.getStartRevisions(Collections.singleton(info)); + assertEquals(1, rv.getDimensions()); + Revision r = rv.getRevision(clusterId); + assertNotNull(r); + assertEquals(now, r.getTimestamp()); + } + + @Test + public void getStartRevisionsMultipleNodes() { + int clusterId1 = 1; + int clusterId2 = 2; + long startTime1 = System.currentTimeMillis(); + long startTime2 = startTime1 + 1000; + ClusterNodeInfoDocument info1 = mockedClusterNodeInfo(clusterId1, startTime1); + ClusterNodeInfoDocument info2 = mockedClusterNodeInfo(clusterId2, startTime2); + RevisionVector rv = Utils.getStartRevisions(Arrays.asList(info1, info2)); + assertEquals(2, rv.getDimensions()); + Revision r1 = rv.getRevision(clusterId1); + assertNotNull(r1); + Revision r2 = rv.getRevision(clusterId2); + assertNotNull(r2); + assertEquals(startTime1, r1.getTimestamp()); + assertEquals(startTime2, r2.getTimestamp()); + } + + private static ClusterNodeInfoDocument mockedClusterNodeInfo(int clusterId, + long startTime) { + ClusterNodeInfoDocument info = Mockito.mock(ClusterNodeInfoDocument.class); + Mockito.when(info.getClusterId()).thenReturn(clusterId); + Mockito.when(info.getStartTime()).thenReturn(startTime); + return info; + } }