Index: oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/CommitQueue.java =================================================================== --- oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/CommitQueue.java (revision 1800855) +++ oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/CommitQueue.java (working copy) @@ -112,15 +112,38 @@ *
  • the thread is interrupted
  • * * - * @param conflictRevisions the revisions to become visible. + * @param revisions the revisions to become visible. */ - void suspendUntilAll(@Nonnull Set conflictRevisions) { + void suspendUntilAll(@Nonnull Set revisions) { + try { + suspendUntilAll(revisions, suspendTimeout); + } catch (InterruptedException e) { + LOG.debug("The suspended thread has been interrupted", e); + } + } + + /** + * Suspends until for each of given revisions one of the following happens: + * + * + * @param revisions the revisions to become visible. + * @param suspendTimeoutMillis how long to suspend at max + * @throws InterruptedException thrown when this thread was interrupted + * while waiting + */ + void suspendUntilAll(@Nonnull Set revisions, long suspendTimeoutMillis) + throws InterruptedException { Semaphore s; int addedRevisions; synchronized (suspendedCommits) { RevisionVector headRevision = context.getHeadRevision(); - Set afterHead = new HashSet(conflictRevisions.size()); - for (Revision r : conflictRevisions) { + Set afterHead = new HashSet(revisions.size()); + for (Revision r : revisions) { if (headRevision.isRevisionNewer(r)) { afterHead.add(r); } @@ -131,9 +154,7 @@ addedRevisions = afterHead.size(); } try { - s.tryAcquire(addedRevisions, suspendTimeout, TimeUnit.MILLISECONDS); - } catch (InterruptedException e) { - LOG.debug("The suspended thread has been interrupted", e); + s.tryAcquire(addedRevisions, suspendTimeoutMillis, TimeUnit.MILLISECONDS); } finally { synchronized (suspendedCommits) { suspendedCommits.remove(s); Index: oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/DocumentNodeStore.java =================================================================== --- oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/DocumentNodeStore.java (revision 1800855) +++ oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/DocumentNodeStore.java (working copy) @@ -3171,7 +3171,53 @@ public String getInstanceId() { return 
String.valueOf(getClusterId()); } + + @Override + public String getVisibilityToken() { + final DocumentNodeState theRoot = root; + if (theRoot == null) { + // unlikely but for paranoia reasons... + return ""; + } + return theRoot.getRootRevision().asString(); + } + + private boolean isVisible(RevisionVector rv) { + // do not synchronize, take a local copy instead + final DocumentNodeState localRoot = root; + if (localRoot == null) { + // unlikely but for paranoia reasons... + return false; + } + return Utils.isGreaterOrEquals(localRoot.getRootRevision(), rv); + } + + @Override + public boolean isVisible(String visibilityToken, long maxWaitMillis) throws InterruptedException { + if (visibilityToken == null || visibilityToken.isEmpty()) { + // we've asked for @Nonnull.. + // hence throwing an exception + throw new IllegalArgumentException("visibilityToken must not be null or empty"); + } + // 'fromString' would throw a RuntimeException if it can't parse + // that would be re thrown automatically + final RevisionVector visibilityTokenRv = RevisionVector.fromString(visibilityToken); + if (isVisible(visibilityTokenRv)) { + // the simple case + return true; + } + + // otherwise wait until the visibility token's revisions all become visible + // (or maxWaitMillis has passed) + commitQueue.suspendUntilAll(Sets.newHashSet(visibilityTokenRv), maxWaitMillis); + + // if we got interrupted above would throw InterruptedException + // otherwise, we don't know why suspendUntilAll returned, so + // check the final isVisible state and return it + return isVisible(visibilityTokenRv); + } + public DocumentNodeStoreStatsCollector getStatsCollector() { return nodeStoreStatsCollector; } Index: oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/atomic/AtomicCounterEditorTest.java =================================================================== --- oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/atomic/AtomicCounterEditorTest.java (revision 1800855) +++ 
oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/atomic/AtomicCounterEditorTest.java (working copy) @@ -108,12 +108,32 @@ public String getInstanceId() { return "1"; } + + @Override + public String getVisibilityToken() { + return ""; + } + + @Override + public boolean isVisible(String visibilityToken, long maxWaitMillis) throws InterruptedException { + return true; + } }; private static final Clusterable CLUSTER_2 = new Clusterable() { @Override public String getInstanceId() { return "2"; } + + @Override + public String getVisibilityToken() { + return ""; + } + + @Override + public boolean isVisible(String visibilityToken, long maxWaitMillis) throws InterruptedException { + return true; + } }; private static final EditorHook HOOK_NO_CLUSTER = new EditorHook( new TestableACEProvider(null, null, null, null)); Index: oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/DocumentNodeStoreTest.java =================================================================== --- oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/DocumentNodeStoreTest.java (revision 1800855) +++ oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/DocumentNodeStoreTest.java (working copy) @@ -59,10 +59,14 @@ import java.util.UUID; import java.util.concurrent.ArrayBlockingQueue; import java.util.concurrent.BlockingQueue; +import java.util.concurrent.Callable; import java.util.concurrent.ConcurrentSkipListMap; import java.util.concurrent.CountDownLatch; +import java.util.concurrent.Executors; +import java.util.concurrent.Future; import java.util.concurrent.Semaphore; import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicLong; @@ -3192,7 +3196,130 @@ assertThat(updates, everyItem(is(30))); assertEquals(1, updates.size()); } + + // OAK-6276 + @Test + public void visibilityToken() 
throws Exception { + DocumentStore docStore = new MemoryDocumentStore(); + DocumentNodeStore ns1 = builderProvider.newBuilder() + .setDocumentStore(docStore).setAsyncDelay(0) + .setClusterId(1).getNodeStore(); + ns1.getRoot(); + ns1.runBackgroundOperations(); + DocumentNodeStore ns2 = builderProvider.newBuilder() + .setDocumentStore(docStore).setAsyncDelay(0) + .setClusterId(2).getNodeStore(); + ns2.getRoot(); + + String vt1 = ns1.getVisibilityToken(); + String vt2 = ns2.getVisibilityToken(); + + assertTrue(ns1.isVisible(vt1, -1)); + assertTrue(ns1.isVisible(vt1, 1)); + assertTrue(ns1.isVisible(vt1, 100000000)); + assertTrue(ns2.isVisible(vt2, -1)); + assertTrue(ns2.isVisible(vt2, 1)); + assertTrue(ns2.isVisible(vt2, 100000000)); + assertFalse(ns1.isVisible(vt2, -1)); + assertFalse(ns1.isVisible(vt2, 1)); + assertTrue(ns2.isVisible(vt1, -1)); + ns2.runBackgroundOperations(); + ns1.runBackgroundOperations(); + assertTrue(ns1.isVisible(vt2, -1)); + assertTrue(ns2.isVisible(vt1, -1)); + + vt1 = ns1.getVisibilityToken(); + vt2 = ns2.getVisibilityToken(); + assertTrue(ns1.isVisible(vt1, -1)); + assertTrue(ns2.isVisible(vt2, -1)); + assertTrue(ns1.isVisible(vt2, -1)); + assertTrue(ns2.isVisible(vt1, -1)); + assertTrue(ns1.isVisible(vt1, 100000000)); + assertTrue(ns2.isVisible(vt2, 100000000)); + assertTrue(ns1.isVisible(vt2, 100000000)); + assertTrue(ns2.isVisible(vt1, 100000000)); + + NodeBuilder b1 = ns1.getRoot().builder(); + b1.setProperty("p1", "1"); + ns1.merge(b1, EmptyHook.INSTANCE, CommitInfo.EMPTY); + + NodeBuilder b2 = ns2.getRoot().builder(); + b2.setProperty("p2", "2"); + ns2.merge(b2, EmptyHook.INSTANCE, CommitInfo.EMPTY); + + assertTrue(ns1.isVisible(vt1, -1)); + assertTrue(ns2.isVisible(vt2, -1)); + assertTrue(ns1.isVisible(vt2, -1)); + assertTrue(ns2.isVisible(vt1, -1)); + + vt1 = ns1.getVisibilityToken(); + vt2 = ns2.getVisibilityToken(); + assertTrue(ns1.isVisible(vt1, -1)); + assertTrue(ns2.isVisible(vt2, -1)); + assertFalse(ns1.isVisible(vt2, -1)); + 
assertFalse(ns2.isVisible(vt1, -1)); + assertFalse(ns1.isVisible(vt2, 1)); + assertFalse(ns2.isVisible(vt1, 1)); + + ns1.runBackgroundOperations(); + assertTrue(ns1.isVisible(vt1, -1)); + assertTrue(ns2.isVisible(vt2, -1)); + assertFalse(ns1.isVisible(vt2, -1)); + assertFalse(ns2.isVisible(vt1, -1)); + assertFalse(ns1.isVisible(vt2, 1)); + assertFalse(ns2.isVisible(vt1, 1)); + + ns2.runBackgroundOperations(); + assertTrue(ns1.isVisible(vt1, -1)); + assertTrue(ns2.isVisible(vt2, -1)); + assertFalse(ns1.isVisible(vt2, -1)); + assertFalse(ns1.isVisible(vt2, 1)); + assertTrue(ns2.isVisible(vt1, -1)); + + ns1.runBackgroundOperations(); + assertTrue(ns1.isVisible(vt1, -1)); + assertTrue(ns2.isVisible(vt2, -1)); + assertTrue(ns1.isVisible(vt2, -1)); + assertTrue(ns2.isVisible(vt1, -1)); + + vt1 = ns1.getVisibilityToken(); + vt2 = ns2.getVisibilityToken(); + assertTrue(ns1.isVisible(vt2, -1)); + assertTrue(ns2.isVisible(vt1, -1)); + + b1 = ns1.getRoot().builder(); + b1.setProperty("p1", "1b"); + ns1.merge(b1, EmptyHook.INSTANCE, CommitInfo.EMPTY); + + vt1 = ns1.getVisibilityToken(); + assertFalse(ns2.isVisible(vt1, -1)); + final String finalVt1 = vt1; + Future asyncResult = Executors.newFixedThreadPool(1).submit(new Callable() { + @Override + public Void call() throws Exception { + assertTrue(ns2.isVisible(finalVt1, 10000)); + return null; + } + }); + try{ + asyncResult.get(500, TimeUnit.MILLISECONDS); + fail("should have thrown a timeout exception"); + } catch(TimeoutException te) { + // ok + } + ns1.runBackgroundOperations(); + try{ + asyncResult.get(500, TimeUnit.MILLISECONDS); + fail("should have thrown a timeout exception"); + } catch(TimeoutException te) { + // ok + } + ns2.runBackgroundOperations(); + asyncResult.get(6000, TimeUnit.MILLISECONDS); + } + + private static class WriteCountingStore extends MemoryDocumentStore { private final ThreadLocal createMulti = new ThreadLocal<>(); int count; Index: 
oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/index/AsyncIndexerServiceTest.java =================================================================== --- oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/index/AsyncIndexerServiceTest.java (revision 1800855) +++ oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/index/AsyncIndexerServiceTest.java (working copy) @@ -181,5 +181,15 @@ public String getInstanceId() { return "foo"; } + + @Override + public String getVisibilityToken() { + return ""; + } + + @Override + public boolean isVisible(String visibilityToken, long maxWaitMillis) throws InterruptedException { + return true; + } } } \ No newline at end of file Index: oak-store-spi/src/main/java/org/apache/jackrabbit/oak/spi/state/Clusterable.java =================================================================== --- oak-store-spi/src/main/java/org/apache/jackrabbit/oak/spi/state/Clusterable.java (revision 1800855) +++ oak-store-spi/src/main/java/org/apache/jackrabbit/oak/spi/state/Clusterable.java (working copy) @@ -17,6 +17,7 @@ package org.apache.jackrabbit.oak.spi.state; import javax.annotation.Nonnull; +import javax.annotation.Nullable; /** * Interface for bearing cluster node specific information. @@ -32,4 +33,58 @@ */ @Nonnull String getInstanceId(); -} + + /** + * Returns the visibility token of the underlying NodeStore. A 'visibility + * token' is an opaque String that can be used to verify if changes done on + * one NodeStore are visible on another NodeStore of the same cluster. This + * can be achieved by generating such a visibility token on the source + * NodeStore, passing it on to the target NodeStore (by whatever means) and + * checking for visibility on that target NodeStore. + *

    + * The visibility check returns true if the target NodeStore sees at least + * all the changes that the source NodeStore saw at the time of visibility token + * generation. Once a visibility token is visible on a particular NodeStore, + * the check keeps returning true for that token from then on. This also implies that the + * visibility check can only state whether at least all source changes are + * visible on the target and that it is independent of any further + * modifications. + *

    + * When source and target NodeStore are identical, the visibility check is + * expected to return true, immediately. This is based on the assumption + * that with a session.refresh() on that NodeStore you'll always get the + * latest changes applied by any other session locally. + *

    + * Visibility tokens are meant to be lightweight and are not expected to be + * persisted by the implementor. Nevertheless they should remain valid + * across crashes of the source and/or the target instance. + */ + @Nullable + String getVisibilityToken(); + + /** + * Checks if the underlying NodeStore sees at least the changes that were + * visible at the time the visibility token was created, potentially on + * another instance in a clustered NodeStore setup. + *

    + * If the visibility token was created on the underlying NodeStore this + * check always returns true, immediately. + * + * @param visibilityToken + * the visibility token that was created on another instance in a + * clustered NodeStore setup. Providing null is not supported and + * might throw a RuntimeException + * @param maxWaitMillis + * if &gt; 0, waits at most this many milliseconds until + * the underlying NodeStore sees at least the changes + * represented by the provided visibility token; if &lt;= 0 + * the method does not wait and only reports the current state + * @return true if the underlying NodeStore sees at least the changes that + * were visible at the time the visibility token was created + * @throws InterruptedException + * (optionally) thrown if interrupted while waiting + * @see VisibilityTokenProvider VisibilityTokenProvider for a definition and + * usage of visibility tokens + */ + boolean isVisible(@Nonnull String visibilityToken, long maxWaitMillis) throws InterruptedException; +} \ No newline at end of file