diff --git a/oak-store-document/src/main/java/org/apache/jackrabbit/oak/plugins/document/Collision.java b/oak-store-document/src/main/java/org/apache/jackrabbit/oak/plugins/document/Collision.java index 786f64a594..78d79ee8ad 100644 --- a/oak-store-document/src/main/java/org/apache/jackrabbit/oak/plugins/document/Collision.java +++ b/oak-store-document/src/main/java/org/apache/jackrabbit/oak/plugins/document/Collision.java @@ -52,17 +52,20 @@ private final UpdateOp ourOp; private final Revision ourRev; private final RevisionContext context; + private final RevisionVector startRevisions; Collision(@NotNull NodeDocument document, @NotNull Revision theirRev, @NotNull UpdateOp ourOp, @NotNull Revision ourRev, - @NotNull RevisionContext context) { + @NotNull RevisionContext context, + @NotNull RevisionVector startRevisions) { this.document = checkNotNull(document); this.theirRev = checkNotNull(theirRev); this.ourOp = checkNotNull(ourOp); this.ourRev = checkNotNull(ourRev); this.context = checkNotNull(context); + this.startRevisions = checkNotNull(startRevisions); } /** @@ -101,9 +104,18 @@ Revision mark(DocumentStore store) throws DocumentStoreException { * * @return {@code true} if this is a conflicting collision, {@code false} * otherwise. - * @throws DocumentStoreException + * @throws DocumentStoreException if an operation on the document store + * fails. */ boolean isConflicting() throws DocumentStoreException { + // their revision is not conflicting when it is identified as branch + // commit that cannot be merged (orphaned branch commit, theirRev is + // garbage). + if (document.getLocalBranchCommits().contains(theirRev) + && !startRevisions.isRevisionNewer(theirRev)) { + return false; + } + // did their revision create or delete the node? if (document.getDeleted().containsKey(theirRev)) { return true; diff --git a/oak-store-document/src/main/java/org/apache/jackrabbit/oak/plugins/document/Commit.java b/oak-store-document/src/main/java/org/apache/jackrabbit/oak/plugins/document/Commit.java index d4d12cd34c..169768c9be 100644 --- a/oak-store-document/src/main/java/org/apache/jackrabbit/oak/plugins/document/Commit.java +++ b/oak-store-document/src/main/java/org/apache/jackrabbit/oak/plugins/document/Commit.java @@ -59,6 +59,7 @@ protected final DocumentNodeStore nodeStore; private final RevisionVector baseRevision; + private final RevisionVector startRevisions; private final Revision revision; private final HashMap operations = new LinkedHashMap<>(); private final Set collisions = new LinkedHashSet(); @@ -85,24 +86,29 @@ * @param revision the revision for this commit. * @param baseRevision the base revision for this commit or {@code null} if * there is none. + * @param startRevisions the revisions for each cluster node corresponding + * to the start time of the cluster nodes. */ Commit(@NotNull DocumentNodeStore nodeStore, @NotNull Revision revision, - @Nullable RevisionVector baseRevision) { + @Nullable RevisionVector baseRevision, + @NotNull RevisionVector startRevisions) { this.nodeStore = checkNotNull(nodeStore); this.revision = checkNotNull(revision); this.baseRevision = baseRevision; + this.startRevisions = startRevisions; } Commit(@NotNull DocumentNodeStore nodeStore, @NotNull Revision revision, @Nullable RevisionVector baseRevision, + @NotNull RevisionVector startRevisions, @NotNull Map operations, @NotNull Set addedNodes, @NotNull Set removedNodes, @NotNull Set nodesWithBinaries, @NotNull Map bundledNodes) { - this(nodeStore, revision, baseRevision); + this(nodeStore, revision, baseRevision, startRevisions); this.operations.putAll(operations); this.addedNodes.addAll(addedNodes); this.removedNodes.addAll(removedNodes); @@ -595,7 +601,7 @@ private void checkConflicts(@NotNull UpdateOp op, // TODO: unify above conflict detection and isConflicting() boolean allowConflictingDeleteChange = allowConcurrentAddRemove(before, op); for (Revision r : collisions) { - Collision c = new Collision(before, r, op, revision, nodeStore); + Collision c = new Collision(before, r, op, revision, nodeStore, startRevisions); if (c.isConflicting() && !allowConflictingDeleteChange) { // mark collisions on commit root if (c.mark(store).equals(revision)) { @@ -807,6 +813,12 @@ void markChanged(Path path) { } } + + @NotNull + RevisionVector getStartRevisions() { + return startRevisions; + } + /** * Apply the changes of a node to the cache. * diff --git a/oak-store-document/src/main/java/org/apache/jackrabbit/oak/plugins/document/CommitBuilder.java b/oak-store-document/src/main/java/org/apache/jackrabbit/oak/plugins/document/CommitBuilder.java index 1f4019b826..bd4f20b0ab 100644 --- a/oak-store-document/src/main/java/org/apache/jackrabbit/oak/plugins/document/CommitBuilder.java +++ b/oak-store-document/src/main/java/org/apache/jackrabbit/oak/plugins/document/CommitBuilder.java @@ -30,7 +30,6 @@ import org.jetbrains.annotations.NotNull; import org.jetbrains.annotations.Nullable; -import static com.google.common.base.Preconditions.checkArgument; import static com.google.common.base.Preconditions.checkNotNull; /** @@ -44,6 +43,7 @@ private final DocumentNodeStore nodeStore; private final Revision revision; private final RevisionVector baseRevision; + private RevisionVector startRevisions = new RevisionVector(); private final Map operations = new LinkedHashMap<>(); private final Set addedNodes = new HashSet<>(); @@ -220,6 +220,19 @@ CommitBuilder markNodeHavingBinary(@NotNull Path path) { return this; } + /** + * Sets the start revisions of known clusterIds on this commit builder. + * + * @param startRevisions the start revisions derived from the start time + * in the clusterNodes entries. + * @return {@code this} builder. + */ + @NotNull + CommitBuilder withStartRevisions(@NotNull RevisionVector startRevisions) { + this.startRevisions = checkNotNull(startRevisions); + return this; + } + /** * Builds the commit with the modifications. * @@ -234,8 +247,9 @@ Commit build() { String msg = "Cannot build a commit with a pseudo commit revision"; throw new IllegalStateException(msg); } - return new Commit(nodeStore, revision, baseRevision, operations, - addedNodes, removedNodes, nodesWithBinaries, bundledNodes); + return new Commit(nodeStore, revision, baseRevision, startRevisions, + operations, addedNodes, removedNodes, nodesWithBinaries, + bundledNodes); } /** @@ -251,8 +265,9 @@ Commit build(@NotNull Revision revision) { Revision from = this.revision; Map operations = Maps.transformValues( this.operations, op -> rewrite(op, from, revision)); - return new Commit(nodeStore, revision, baseRevision, operations, - addedNodes, removedNodes, nodesWithBinaries, bundledNodes); + return new Commit(nodeStore, revision, baseRevision, startRevisions, + operations, addedNodes, removedNodes, nodesWithBinaries, + bundledNodes); } /** diff --git a/oak-store-document/src/main/java/org/apache/jackrabbit/oak/plugins/document/DocumentNodeStore.java b/oak-store-document/src/main/java/org/apache/jackrabbit/oak/plugins/document/DocumentNodeStore.java index a82e25ff41..7ef4700794 100644 --- a/oak-store-document/src/main/java/org/apache/jackrabbit/oak/plugins/document/DocumentNodeStore.java +++ b/oak-store-document/src/main/java/org/apache/jackrabbit/oak/plugins/document/DocumentNodeStore.java @@ -19,7 +19,6 @@ import static com.google.common.base.Preconditions.checkArgument; import static com.google.common.base.Preconditions.checkNotNull; import static com.google.common.base.Preconditions.checkState; -import static com.google.common.collect.Iterables.filter; import static com.google.common.collect.Iterables.partition; import static com.google.common.collect.Iterables.transform; import static com.google.common.collect.Lists.reverse; @@ -2244,19 +2243,7 @@ boolean updateClusterState() { */ @NotNull private RevisionVector getMinExternalRevisions() { - return new RevisionVector(transform(filter(clusterNodes.values(), - new Predicate() { - @Override - public boolean apply(ClusterNodeInfoDocument input) { - return input.getClusterId() != getClusterId(); - } - }), - new Function() { - @Override - public Revision apply(ClusterNodeInfoDocument input) { - return new Revision(input.getStartTime(), 0, input.getClusterId()); - } - })); + return Utils.getStartRevisions(clusterNodes.values()).remove(getClusterId()); } /** @@ -2759,7 +2746,7 @@ private Commit newTrunkCommit(@NotNull Changes changes, "base must not be a branch revision: " + base); // build commit before revision is created by the commit queue (OAK-7869) - CommitBuilder commitBuilder = new CommitBuilder(this, base); + CommitBuilder commitBuilder = newCommitBuilder(base, null); changes.with(commitBuilder); boolean success = false; @@ -2787,7 +2774,7 @@ private Commit newBranchCommit(@NotNull Changes changes, checkOpen(); Revision commitRevision = newRevision(); - CommitBuilder commitBuilder = new CommitBuilder(this, commitRevision, base); + CommitBuilder commitBuilder = newCommitBuilder(base, commitRevision); changes.with(commitBuilder); if (isDisableBranches()) { // Regular branch commits do not need to acquire the background @@ -2814,6 +2801,19 @@ private Commit newBranchCommit(@NotNull Changes changes, return commitBuilder.build(); } + @NotNull + private CommitBuilder newCommitBuilder(@NotNull RevisionVector base, + @Nullable Revision commitRevision) { + CommitBuilder cb; + if (commitRevision != null) { + cb = new CommitBuilder(this, commitRevision, base); + } else { + cb = new CommitBuilder(this, base); + } + RevisionVector startRevs = Utils.getStartRevisions(clusterNodes.values()); + return cb.withStartRevisions(startRevs); + } + /** * Checks if this store is still open and throws an * {@link IllegalStateException} if it is already disposed (or a dispose diff --git a/oak-store-document/src/main/java/org/apache/jackrabbit/oak/plugins/document/MergeCommit.java b/oak-store-document/src/main/java/org/apache/jackrabbit/oak/plugins/document/MergeCommit.java index 2bb924f974..7e16805c76 100644 --- a/oak-store-document/src/main/java/org/apache/jackrabbit/oak/plugins/document/MergeCommit.java +++ b/oak-store-document/src/main/java/org/apache/jackrabbit/oak/plugins/document/MergeCommit.java @@ -35,7 +35,7 @@ MergeCommit(DocumentNodeStore nodeStore, RevisionVector baseRevision, SortedSet revisions) { - super(nodeStore, revisions.last(), baseRevision); + super(nodeStore, revisions.last(), baseRevision, new RevisionVector()); this.mergeRevs = revisions; } diff --git a/oak-store-document/src/main/java/org/apache/jackrabbit/oak/plugins/document/util/Utils.java b/oak-store-document/src/main/java/org/apache/jackrabbit/oak/plugins/document/util/Utils.java index f73299df33..f81bb7c463 100644 --- a/oak-store-document/src/main/java/org/apache/jackrabbit/oak/plugins/document/util/Utils.java +++ b/oak-store-document/src/main/java/org/apache/jackrabbit/oak/plugins/document/util/Utils.java @@ -22,6 +22,7 @@ import java.security.MessageDigest; import java.security.NoSuchAlgorithmException; import java.sql.Timestamp; +import java.util.ArrayList; import java.util.Comparator; import java.util.Date; import java.util.Iterator; @@ -39,6 +40,7 @@ import org.apache.jackrabbit.oak.commons.PathUtils; import org.apache.jackrabbit.oak.commons.StringUtils; import org.apache.jackrabbit.oak.plugins.document.ClusterNodeInfo; +import org.apache.jackrabbit.oak.plugins.document.ClusterNodeInfoDocument; import org.apache.jackrabbit.oak.plugins.document.Collection; import org.apache.jackrabbit.oak.plugins.document.DocumentStore; import org.apache.jackrabbit.oak.plugins.document.DocumentStoreException; @@ -814,6 +816,24 @@ public static Long asLong(@Nullable Number n) { } } + /** + * Returns a revision vector that contains a revision for each of the passed + * cluster nodes with a revision timestamp that corresponds to the last + * known time when the cluster node was started. + * + * @param clusterNodes the cluster node information. + * @return revision vector representing the last known time when the cluster + * nodes were started. + */ + @NotNull + public static RevisionVector getStartRevisions(@NotNull Iterable clusterNodes) { + List revs = new ArrayList<>(); + for (ClusterNodeInfoDocument doc : clusterNodes) { + revs.add(new Revision(doc.getStartTime(), 0, doc.getClusterId())); + } + return new RevisionVector(revs); + } + /** * Returns the minimum timestamp to use for a query for child documents that * have been modified between {@code fromRev} and {@code toRev}. diff --git a/oak-store-document/src/test/java/org/apache/jackrabbit/oak/plugins/document/CollisionTest.java b/oak-store-document/src/test/java/org/apache/jackrabbit/oak/plugins/document/CollisionTest.java index 38297fbe29..ea74b230cb 100644 --- a/oak-store-document/src/test/java/org/apache/jackrabbit/oak/plugins/document/CollisionTest.java +++ b/oak-store-document/src/test/java/org/apache/jackrabbit/oak/plugins/document/CollisionTest.java @@ -25,7 +25,6 @@ import org.apache.jackrabbit.oak.spi.commit.EmptyHook; import org.apache.jackrabbit.oak.spi.state.NodeBuilder; import org.jetbrains.annotations.NotNull; -import org.junit.Ignore; import org.junit.Rule; import org.junit.Test; @@ -163,10 +162,10 @@ private static Collision newCollision(@NotNull NodeDocument document, @NotNull UpdateOp ourOp, @NotNull Revision ourRev, @NotNull RevisionContext context) { - return new Collision(document, theirRev, ourOp, ourRev, context); + return new Collision(document, theirRev, ourOp, ourRev, context, + RevisionVector.fromString("")); } - @Ignore @Test public void collisionOnOrphanedBranch() throws Exception { DocumentStore store = new MemoryDocumentStore(); @@ -189,6 +188,7 @@ public void collisionOnOrphanedBranch() throws Exception { ns = builderProvider.newBuilder() .setDocumentStore(store).setAsyncDelay(0) .setUpdateLimit(10).build(); + ns.updateClusterState(); root = store.find(NODES, Utils.getIdFromPath(ROOT)); assertNotNull(root); @@ -211,7 +211,6 @@ public void collisionOnOrphanedBranch() throws Exception { assertNoCollisions(store, ROOT); } - @Ignore @Test public void collisionOnForeignOrphanedBranch() throws Exception { DocumentStore store = new MemoryDocumentStore(); @@ -228,6 +227,7 @@ public void collisionOnForeignOrphanedBranch() throws Exception { builder.child("n-" + i).setProperty("p", "v"); } ns1.dispose(); + ns2.updateClusterState(); NodeDocument root = store.find(NODES, Utils.getIdFromPath(ROOT)); assertNotNull(root); @@ -243,12 +243,13 @@ public void collisionOnForeignOrphanedBranch() throws Exception { builder.child("n-0"); merge(ns2, builder); - // must not create a collision marker for a branch commit - // from a clusterId that is inactive - assertNoCollisions(store, ROOT); + // must create a collision marker for a branch commit because + // it is not known when ns1 was stopped + root = store.find(NODES, Utils.getIdFromPath(ROOT)); + assertNotNull(root); + assertThat(root.getLocalMap(COLLISIONS).keySet(), not(empty())); } - @Ignore @Test public void collisionOnForeignOrphanedBranchAfterRestart() throws Exception { DocumentStore store = new MemoryDocumentStore(); @@ -287,6 +288,8 @@ public void collisionOnForeignOrphanedBranchAfterRestart() throws Exception { assertNotNull(doc); assertThat(doc.getLocalBranchCommits(), not(empty())); + ns2.updateClusterState(); + builder = ns2.getRoot().builder(); builder.child("n-0"); merge(ns2, builder); @@ -296,6 +299,60 @@ public void collisionOnForeignOrphanedBranchAfterRestart() throws Exception { assertNoCollisions(store, ROOT); } + @Test + public void collisionWithBranchOnForeignOrphanedBranchAfterRestart() throws Exception { + int updateLimit = 10; + DocumentStore store = new MemoryDocumentStore(); + DocumentNodeStore ns1 = builderProvider.newBuilder() + .setDocumentStore(store).setAsyncDelay(0) + .setUpdateLimit(updateLimit).setClusterId(1).build(); + DocumentNodeStore ns2 = builderProvider.newBuilder() + .setDocumentStore(store).setAsyncDelay(0) + .setUpdateLimit(updateLimit).setClusterId(2).build(); + + NodeBuilder builder = ns1.getRoot().builder(); + // force a branch commit + for (int i = 0; i < updateLimit * 2; i++) { + builder.child("n-" + i).setProperty("p", "v"); + } + ns1.dispose(); + + NodeDocument root = store.find(NODES, Utils.getIdFromPath(ROOT)); + assertNotNull(root); + assertThat(root.getLocalBranchCommits(), not(empty())); + + // start it up again + ns1 = builderProvider.newBuilder() + .setDocumentStore(store).setAsyncDelay(0) + .setUpdateLimit(updateLimit).setClusterId(1).build(); + + root = store.find(NODES, Utils.getIdFromPath(ROOT)); + assertNotNull(root); + // on init the DocumentNodeStore removes orphaned + // branch commit entries on the root document + assertThat(root.getLocalBranchCommits(), empty()); + + // but some changes are still there + Path p = Path.fromString("/n-0"); + NodeDocument doc = store.find(NODES, Utils.getIdFromPath(p)); + assertNotNull(doc); + assertThat(doc.getLocalBranchCommits(), not(empty())); + + ns2.updateClusterState(); + + builder = ns2.getRoot().builder(); + builder.child("n-0"); + // force a branch commit + for (int i = 0; i < updateLimit * 2; i++) { + builder.child("test").child("c-" + i); + } + merge(ns2, builder); + + // must not create a collision marker for a branch commit + // from a clusterId that is inactive + assertNoCollisions(store, ROOT); + } + private static void assertNoCollisions(DocumentStore store, Path p) { NodeDocument doc = store.find(NODES, Utils.getIdFromPath(p)); assertNotNull(doc); diff --git a/oak-store-document/src/test/java/org/apache/jackrabbit/oak/plugins/document/CollisionWithSplitTest.java b/oak-store-document/src/test/java/org/apache/jackrabbit/oak/plugins/document/CollisionWithSplitTest.java index 085fcc6ece..431da8f4d1 100644 --- a/oak-store-document/src/test/java/org/apache/jackrabbit/oak/plugins/document/CollisionWithSplitTest.java +++ b/oak-store-document/src/test/java/org/apache/jackrabbit/oak/plugins/document/CollisionWithSplitTest.java @@ -115,7 +115,7 @@ public void collisionAfterSplit() throws Exception { // committed revision on ns2 doc = ns2.getDocumentStore().find(NODES, id); assertTrue(doc.getLocalCommitRoot().containsKey(conflictRev)); - Collision c = new Collision(doc, conflictRev, op, ourRev, ns2); + Collision c = new Collision(doc, conflictRev, op, ourRev, ns2, RevisionVector.fromString("")); assertEquals("Collision must match our revision (" + ourRev + "). " + "The conflict revision " + conflictRev + " is already committed.", ourRev, c.mark(ns2.getDocumentStore())); diff --git a/oak-store-document/src/test/java/org/apache/jackrabbit/oak/plugins/document/CommitBuilderTest.java b/oak-store-document/src/test/java/org/apache/jackrabbit/oak/plugins/document/CommitBuilderTest.java index c6428ffd1e..d5e7e5e9b3 100644 --- a/oak-store-document/src/test/java/org/apache/jackrabbit/oak/plugins/document/CommitBuilderTest.java +++ b/oak-store-document/src/test/java/org/apache/jackrabbit/oak/plugins/document/CommitBuilderTest.java @@ -266,6 +266,34 @@ public void updatePropertyPropertyNull() { } } + @Test + public void withStartRevisionsNull() { + CommitBuilder builder = new CommitBuilder(ns, null); + try { + builder.withStartRevisions(null); + expectNPE(); + } catch (NullPointerException e) { + // expected + } + } + + @Test + public void withStartRevisions() { + RevisionVector head = ns.getHeadRevision(); + CommitBuilder builder = new CommitBuilder(ns, head) + .withStartRevisions(head); + Commit c = builder.build(ns.newRevision()); + assertEquals(head, c.getStartRevisions()); + } + + @Test + public void defaultStartRevisions() { + RevisionVector head = ns.getHeadRevision(); + CommitBuilder builder = new CommitBuilder(ns, head); + Commit c = builder.build(ns.newRevision()); + assertEquals(new RevisionVector(), c.getStartRevisions()); + } + private static void expectNPE() { fail("NullPointerException expected"); } diff --git a/oak-store-document/src/test/java/org/apache/jackrabbit/oak/plugins/document/util/UtilsTest.java b/oak-store-document/src/test/java/org/apache/jackrabbit/oak/plugins/document/util/UtilsTest.java index 01a01868bd..31f43134af 100644 --- a/oak-store-document/src/test/java/org/apache/jackrabbit/oak/plugins/document/util/UtilsTest.java +++ b/oak-store-document/src/test/java/org/apache/jackrabbit/oak/plugins/document/util/UtilsTest.java @@ -16,6 +16,7 @@ */ package org.apache.jackrabbit.oak.plugins.document.util; +import java.util.Arrays; import java.util.Collections; import java.util.List; import java.util.Random; @@ -29,6 +30,7 @@ import org.apache.jackrabbit.oak.api.CommitFailedException; import org.apache.jackrabbit.oak.commons.PathUtils; import org.apache.jackrabbit.oak.plugins.document.ClusterNodeInfo; +import org.apache.jackrabbit.oak.plugins.document.ClusterNodeInfoDocument; import org.apache.jackrabbit.oak.plugins.document.Collection; import org.apache.jackrabbit.oak.plugins.document.DocumentMK; import org.apache.jackrabbit.oak.plugins.document.DocumentNodeStore; @@ -47,6 +49,7 @@ import org.apache.jackrabbit.oak.stats.Clock; import org.junit.Ignore; import org.junit.Test; +import org.mockito.Mockito; import static org.hamcrest.CoreMatchers.containsString; import static org.hamcrest.CoreMatchers.is; @@ -403,4 +406,48 @@ public void checkRevisionAge() throws Exception { assertThat(e.getMessage(), containsString("newer than current time")); } } + + @Test + public void getStartRevisionsEmpty() { + RevisionVector rv = Utils.getStartRevisions(Collections.emptyList()); + assertEquals(0, rv.getDimensions()); + } + + @Test + public void getStartRevisionsSingleNode() { + int clusterId = 1; + long now = System.currentTimeMillis(); + ClusterNodeInfoDocument info = mockedClusterNodeInfo(clusterId, now); + RevisionVector rv = Utils.getStartRevisions(Collections.singleton(info)); + assertEquals(1, rv.getDimensions()); + Revision r = rv.getRevision(clusterId); + assertNotNull(r); + assertEquals(now, r.getTimestamp()); + } + + @Test + public void getStartRevisionsMultipleNodes() { + int clusterId1 = 1; + int clusterId2 = 2; + long startTime1 = System.currentTimeMillis(); + long startTime2 = startTime1 + 1000; + ClusterNodeInfoDocument info1 = mockedClusterNodeInfo(clusterId1, startTime1); + ClusterNodeInfoDocument info2 = mockedClusterNodeInfo(clusterId2, startTime2); + RevisionVector rv = Utils.getStartRevisions(Arrays.asList(info1, info2)); + assertEquals(2, rv.getDimensions()); + Revision r1 = rv.getRevision(clusterId1); + assertNotNull(r1); + Revision r2 = rv.getRevision(clusterId2); + assertNotNull(r2); + assertEquals(startTime1, r1.getTimestamp()); + assertEquals(startTime2, r2.getTimestamp()); + } + + private static ClusterNodeInfoDocument mockedClusterNodeInfo(int clusterId, + long startTime) { + ClusterNodeInfoDocument info = Mockito.mock(ClusterNodeInfoDocument.class); + Mockito.when(info.getClusterId()).thenReturn(clusterId); + Mockito.when(info.getStartTime()).thenReturn(startTime); + return info; + } }