Index: oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/Commit.java =================================================================== --- oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/Commit.java (revision 1704823) +++ oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/Commit.java (working copy) @@ -511,13 +511,8 @@ if (baseRevision != null) { Revision newestRev = null; if (before != null) { - newestRev = before.getNewestRevision(nodeStore, revision, - new CollisionHandler() { - @Override - void concurrentModification(Revision other) { - collisions.add(other); - } - }); + newestRev = before.getNewestRevision( + nodeStore, baseRevision, revision, collisions); } String conflictMessage = null; Revision conflictRevision = newestRev; Index: oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/DocumentNodeStore.java =================================================================== --- oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/DocumentNodeStore.java (revision 1704823) +++ oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/DocumentNodeStore.java (working copy) @@ -41,6 +41,7 @@ import java.lang.ref.WeakReference; import java.text.SimpleDateFormat; import java.util.Collections; +import java.util.Date; import java.util.HashMap; import java.util.Iterator; import java.util.List; @@ -1839,6 +1840,29 @@ return stats; } Map lastRevMap = doc.getLastRev(); + try { + long externalTime = Utils.getMaxExternalTimestamp(lastRevMap.values(), clusterId); + long localTime = clock.getTime(); + if (localTime < externalTime) { + LOG.warn("Detected clock differences. Local time is '{}', " + + "while most recent external time is '{}'. " + + "Current _lastRev entries: {}", + new Date(localTime), new Date(externalTime), lastRevMap.values()); + double delay = ((double) externalTime - localTime) / 1000d; + String msg = String.format("Background read will be delayed by %.1f seconds. " + + "Please check system time on cluster nodes.", delay); + LOG.warn(msg); + clock.waitUntil(externalTime + 1); + } else if (localTime == externalTime) { + // make sure local time is past external time + // but only log at debug + LOG.debug("Local and external time are equal. Waiting until local" + + "time is more recent than external reported time."); + clock.waitUntil(externalTime + 1); + } + } catch (InterruptedException e) { + throw new RuntimeException("Background read interrupted", e); + } Revision.RevisionComparator revisionComparator = getRevisionComparator(); // the (old) head occurred first Index: oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/NodeDocument.java =================================================================== --- oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/NodeDocument.java (revision 1704823) +++ oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/NodeDocument.java (working copy) @@ -24,6 +24,7 @@ import java.util.Map.Entry; import java.util.NavigableMap; import java.util.Queue; +import java.util.Set; import java.util.SortedMap; import java.util.SortedSet; import java.util.TreeMap; @@ -56,6 +57,7 @@ import com.google.common.collect.Sets; import static com.google.common.base.Preconditions.checkNotNull; +import static com.google.common.collect.Iterables.concat; import static com.google.common.collect.Iterables.filter; import static com.google.common.collect.Iterables.transform; import static org.apache.jackrabbit.oak.plugins.document.Collection.NODES; @@ -708,18 +710,31 @@ } /** - * Get the revision of the latest change made to this node. + * Get the revision of the latest change made to this node, visible from the + * given {@code changeRev}. Other changes that are not visible from + * {@code changeRev} are added to the {@code collisions} set. This includes + * the following cases: + * * - * @param context the revision context - * @param changeRev the revision of the current change - * @param handler the conflict handler, which is called for concurrent changes - * preceding changeRev. + * @param context the revision context. + * @param baseRev the base revision of the current change. + * @param changeRev the revision of the current change. + * @param collisions changes not visible from {@code changeRev} are added + * to this collisions set. * @return the revision, or null if deleted */ @CheckForNull public Revision getNewestRevision(final RevisionContext context, + final Revision baseRev, final Revision changeRev, - final CollisionHandler handler) { + final Set collisions) { final Map validRevisions = Maps.newHashMap(); Predicate predicate = new Predicate() { @Override @@ -730,15 +745,25 @@ if (isValidRevision(context, input, null, changeRev, validRevisions)) { return true; } - handler.concurrentModification(input); + collisions.add(input); return false; } }; - Revision newestRev = null; // check local commits first SortedMap revisions = getLocalRevisions(); SortedMap commitRoots = getLocalCommitRoot(); + // consider all external revisions newer than changeRev + // as concurrent modification + int localClusterId = context.getClusterId(); + for (Revision r : concat(revisions.keySet(), commitRoots.keySet())) { + if (r.getClusterId() != localClusterId + && isRevisionNewer(context, r, baseRev)) { + collisions.add(r); + } + } + // find newest revision + Revision newestRev = null; Iterator it = filter(Iterables.mergeSorted( ImmutableList.of(revisions.keySet(), commitRoots.keySet()), revisions.comparator()), predicate).iterator(); Index: oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/Revision.java =================================================================== --- oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/Revision.java (revision 1704823) +++ oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/Revision.java (working copy) @@ -607,7 +607,7 @@ if (comp != 0) { return comp; } - return Integer.signum(o1.getClusterId() - o2.getClusterId()); + return o1.compareTo(o2); } /** Index: oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/util/Utils.java =================================================================== --- oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/util/Utils.java (revision 1704823) +++ oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/util/Utils.java (working copy) @@ -640,4 +640,26 @@ } }); } + + /** + * Returns the highest timestamp of all the passed external revisions. + * A revision is considered external if the clusterId is different from the + * passed {@code localClusterId}. + * + * @param revisions the revisions to consider. + * @param localClusterId the id of the local cluster node. + * @return the highest timestamp or {@link Long#MIN_VALUE} if none of the + * revisions is external. + */ + public static long getMaxExternalTimestamp(Iterable revisions, + int localClusterId) { + long maxTime = Long.MIN_VALUE; + for (Revision r : revisions) { + if (r.getClusterId() == localClusterId) { + continue; + } + maxTime = Math.max(maxTime, r.getTimestamp()); + } + return maxTime; + } } Index: oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/DocumentNodeStoreTest.java =================================================================== --- oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/DocumentNodeStoreTest.java (revision 1704823) +++ oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/DocumentNodeStoreTest.java (working copy) @@ -92,7 +92,6 @@ import org.apache.jackrabbit.oak.spi.state.NodeStore; import org.apache.jackrabbit.oak.stats.Clock; import org.junit.After; -import org.junit.Ignore; import org.junit.Rule; import org.junit.Test; import org.slf4j.Logger; @@ -1097,7 +1096,6 @@ } // OAK-2929 - @Ignore @Test public void conflictDetectionWithClockDifference() throws Exception { MemoryDocumentStore store = new MemoryDocumentStore(); @@ -1155,7 +1153,6 @@ } // OAK-2929 - @Ignore @Test public void parentWithUnseenChildrenMustNotBeDeleted() throws Exception { final MemoryDocumentStore docStore = new MemoryDocumentStore(); @@ -1318,13 +1315,8 @@ //root would hold reference to store2 root state after initial repo initialization root = store2.getRoot(); - //The hidden node itself should be creatable across cluster concurrently + //The hidden node and children should be creatable across cluster concurrently builder = root.builder(); - builder.child(":dynHidden"); - merge(store2, builder); - - //Children of hidden node should be creatable across cluster concurrently - builder = root.builder(); builder.child(":hidden").child("b"); builder.child(":dynHidden").child("c"); merge(store2, builder); @@ -1499,7 +1491,6 @@ } // OAK-3388 - @Ignore @Test public void clusterWithClockDifferences() throws Exception { MemoryDocumentStore store = new MemoryDocumentStore(); @@ -1551,7 +1542,6 @@ } // OAK-3388 - @Ignore @Test public void clusterWithClockDifferences2() throws Exception { MemoryDocumentStore store = new MemoryDocumentStore(); Index: oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/NodeDocumentTest.java =================================================================== --- oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/NodeDocumentTest.java (revision 1704823) +++ oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/NodeDocumentTest.java (working copy) @@ -18,6 +18,7 @@ import java.util.Collections; import java.util.Comparator; +import java.util.HashSet; import java.util.Iterator; import java.util.List; import java.util.Random; @@ -366,15 +367,11 @@ // simulate a change revision within the range of // the most recent previous document Iterable changes = prev.getAllChanges(); - Revision changeRev = new Revision(Iterables.getLast(changes).getTimestamp(), 1000, ns.getClusterId()); + Revision baseRev = Iterables.getLast(changes); + Revision changeRev = new Revision(baseRev.getTimestamp(), 1000, ns.getClusterId()); // reset calls to previous documents prevDocCalls.clear(); - doc.getNewestRevision(ns, changeRev, new CollisionHandler() { - @Override - void concurrentModification(Revision other) { - // ignore - } - }); + doc.getNewestRevision(ns, baseRev, changeRev, new HashSet()); // must not read all previous docs assertTrue("too many calls for previous documents: " + prevDocCalls, prevDocCalls.size() <= 4); Index: oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/RevisionTest.java =================================================================== --- oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/RevisionTest.java (revision 1704823) +++ oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/RevisionTest.java (working copy) @@ -129,6 +129,22 @@ } @Test + public void compare2() { + RevisionComparator comp = new RevisionComparator(1); + + Revision r2 = new Revision(7, 0, 2); + Revision r3 = new Revision(5, 0, 3); + + Revision seenAt = new Revision(8, 0, 0); + comp.add(r2, seenAt); + comp.add(r3, seenAt); + + // both revisions have same seenAt revision, must use + // revision timestamp for comparison + assertTrue(comp.compare(r2, r3) > 0); + } + + @Test public void revisionComparatorSimple() { RevisionComparator comp = new RevisionComparator(0); Revision r1 = Revision.newRevision(0); @@ -254,12 +270,12 @@ comp.add(c1sync, Revision.fromString("r2-0-0")); Revision c2sync = Revision.fromString("r4-1-2"); comp.add(c2sync, Revision.fromString("r2-1-0")); - Revision c3sync = Revision.fromString("r2-0-3"); + Revision c3sync = Revision.fromString("r5-0-3"); comp.add(c3sync, Revision.fromString("r2-1-0")); assertTrue(comp.compare(r1, r2) < 0); assertTrue(comp.compare(r2, c2sync) < 0); - // same seen-at revision, but clusterId 2 < 3 + // same seen-at revision, but rev timestamp c2sync < c3sync assertTrue(comp.compare(c2sync, c3sync) < 0); // this means, c3sync must be after r1 and r2 Index: oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/util/UtilsTest.java =================================================================== --- oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/util/UtilsTest.java (revision 1704823) +++ oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/util/UtilsTest.java (working copy) @@ -16,7 +16,10 @@ */ package org.apache.jackrabbit.oak.plugins.document.util; +import java.util.List; + import com.google.common.base.Strings; +import com.google.common.collect.ImmutableList; import com.google.common.collect.Iterables; import org.apache.jackrabbit.oak.api.CommitFailedException; @@ -138,4 +141,42 @@ store.dispose(); } } + + @Test + public void getMaxExternalRevisionTime() { + int localClusterId = 1; + List revs = ImmutableList.of(); + long revTime = Utils.getMaxExternalTimestamp(revs, localClusterId); + assertEquals(Long.MIN_VALUE, revTime); + + revs = ImmutableList.of(Revision.fromString("r1-0-1")); + revTime = Utils.getMaxExternalTimestamp(revs, localClusterId); + assertEquals(Long.MIN_VALUE, revTime); + + revs = ImmutableList.of( + Revision.fromString("r1-0-1"), + Revision.fromString("r2-0-2")); + revTime = Utils.getMaxExternalTimestamp(revs, localClusterId); + assertEquals(2, revTime); + + revs = ImmutableList.of( + Revision.fromString("r3-0-1"), + Revision.fromString("r2-0-2")); + revTime = Utils.getMaxExternalTimestamp(revs, localClusterId); + assertEquals(2, revTime); + + revs = ImmutableList.of( + Revision.fromString("r1-0-1"), + Revision.fromString("r2-0-2"), + Revision.fromString("r2-0-3")); + revTime = Utils.getMaxExternalTimestamp(revs, localClusterId); + assertEquals(2, revTime); + + revs = ImmutableList.of( + Revision.fromString("r1-0-1"), + Revision.fromString("r3-0-2"), + Revision.fromString("r2-0-3")); + revTime = Utils.getMaxExternalTimestamp(revs, localClusterId); + assertEquals(3, revTime); + } }