Index: oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/DocumentNodeStore.java
===================================================================
--- oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/DocumentNodeStore.java	(revision 1799341)
+++ oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/DocumentNodeStore.java	(working copy)
@@ -52,10 +52,12 @@
 import java.util.Set;
 import java.util.SortedSet;
 import java.util.concurrent.Callable;
+import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.Executor;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.locks.ReadWriteLock;
@@ -3152,6 +3154,128 @@
     public String getInstanceId() {
         return String.valueOf(getClusterId());
     }
+
+    /**
+     * Returns the current root revision vector, in string form, as the
+     * visibility token.
+     *
+     * @return the token, or an empty string if the root state is not set
+     */
+    @Override
+    public String getVisibilityToken() {
+        final DocumentNodeState theRoot = root;
+        if (theRoot == null) {
+            // unlikely but for paranoia reasons...
+            return "";
+        }
+        return theRoot.getRootRevision().asString();
+    }
+
+    /**
+     * Checks whether the current root revision compares greater than or
+     * equal to the given revision vector.
+     *
+     * @param rv the revision vector parsed from a visibility token
+     * @return {@code true} if the root revision is at or beyond {@code rv};
+     *         {@code false} otherwise, or if the root state is not set
+     */
+    private boolean isVisible(RevisionVector rv) {
+        final DocumentNodeState theRoot = root;
+        if (theRoot == null) {
+            // unlikely but for paranoia reasons...
+            return false;
+        }
+        // NOTE(review): this assumes RevisionVector.compareTo() >= 0 implies
+        // that the head includes all changes of rv - confirm that this holds
+        // when the two vectors cover different sets of cluster nodes
+        return theRoot.getRootRevision().compareTo(rv) >= 0;
+    }
+
+    /**
+     * Checks whether the changes represented by the given visibility token
+     * are visible on this store, optionally waiting for them to arrive.
+     *
+     * @param visibilityToken a revision vector in string form
+     * @param maxWaitMillis negative: do not wait; 0: wait without a time
+     *            limit; positive: wait at most this many milliseconds
+     * @return {@code true} if the token is visible, {@code false} if it is
+     *         still not visible after the wait (or without waiting)
+     * @throws IllegalArgumentException if the token is null or empty
+     * @throws InterruptedException if interrupted while waiting
+     */
+    @Override
+    public boolean isVisible(String visibilityToken, long maxWaitMillis) throws InterruptedException {
+        if (visibilityToken == null || visibilityToken.isEmpty()) {
+            // the parameter is declared @Nonnull, hence reject it explicitly
+            throw new IllegalArgumentException("visibilityToken must not be null or empty");
+        }
+        // 'fromString' throws a RuntimeException if it can't parse the
+        // token; that exception simply propagates to the caller
+        final RevisionVector visibilityTokenRv = RevisionVector.fromString(visibilityToken);
+        final boolean visibleNow = isVisible(visibilityTokenRv);
+        if (visibleNow || maxWaitMillis < 0 /*==no-wait*/) {
+            // the simple case
+            // (avoid adding/removing an observer in this most frequent case)
+            return visibleNow;
+        }
+        // otherwise we have to wait for at most maxWaitMillis, but ideally
+        // we react quickly - so use a temporary observer for that
+        final CompletableFuture<Boolean> isVisibleResult = new CompletableFuture<>();
+        final Observer observer = new Observer() {
+
+            @Override
+            public void contentChanged(NodeState root, CommitInfo info) {
+                if (isVisible(visibilityTokenRv)) {
+                    // trigger the future result
+                    isVisibleResult.complete(true);
+                } // else keep waiting
+            }
+        };
+        // since dispatcher.addObserver can be lengthy the wait starts
+        // counting here already; nanoTime is used as it measures elapsed
+        // time independently of wall-clock adjustments
+        final long waitStartNanos = System.nanoTime();
+        final Closeable observerClosable = dispatcher.addObserver(observer);
+        try {
+            // again, as dispatcher.addObserver can be lengthy we might
+            // just have missed one contentChanged call and, while that's
+            // not extremely likely, it could have been the last one for a
+            // few seconds - so check again before waiting on the callback
+            if (isVisible(visibilityTokenRv)) {
+                return true;
+            }
+            // if it's not visible now then we indeed wait for the
+            // contentChanged callback...
+            if (maxWaitMillis == 0) {
+                // wait forever...
+                return isVisibleResult.get();
+            }
+            // deduct the time already spent in addObserver; toNanos
+            // saturates rather than overflows for very large waits
+            final long remainingNanos = TimeUnit.MILLISECONDS.toNanos(maxWaitMillis)
+                    - (System.nanoTime() - waitStartNanos);
+            // if remainingNanos <= 0 this throws a TimeoutException
+            // right away (which is fine)
+            return isVisibleResult.get(remainingNanos, TimeUnit.NANOSECONDS);
+        } catch (ExecutionException e) {
+            // this should really not occur
+            // hence translate into a RuntimeException
+            throw new RuntimeException(e);
+        } catch (TimeoutException e) {
+            // upon timeout we can make 1 final check
+            return isVisible(visibilityTokenRv);
+        } finally {
+            if (observerClosable != null) {
+                try {
+                    observerClosable.close();
+                } catch (IOException e) {
+                    // this should not happen
+                    // hence translate into a RuntimeException
+                    throw new RuntimeException(e);
+                }
+            }
+        }
+    }
 
     public DocumentNodeStoreStatsCollector getStatsCollector() {
         return nodeStoreStatsCollector;
Index: oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/atomic/AtomicCounterEditorTest.java
===================================================================
--- oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/atomic/AtomicCounterEditorTest.java	(revision 1799341)
+++ oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/atomic/AtomicCounterEditorTest.java	(working copy)
@@ -108,12 +108,32 @@
         public String getInstanceId() {
             return "1";
         }
+
+        @Override
+        public String getVisibilityToken() {
+            return "";
+        }
+
+        @Override
+        public boolean isVisible(String visibilityToken, long maxWaitMillis) throws InterruptedException {
+            return true;
+        }
     };
     private static final Clusterable CLUSTER_2 = new Clusterable() {
         @Override
         public String getInstanceId() {
             return "2";
         }
+
+        @Override
+        public String getVisibilityToken() {
+            return "";
+        }
+
+        @Override
+        public boolean isVisible(String visibilityToken, long maxWaitMillis) throws InterruptedException {
+            return true;
+        }
     };
     private static final EditorHook HOOK_NO_CLUSTER = new EditorHook(
         new TestableACEProvider(null, null, null, null));
Index: oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/index/AsyncIndexerServiceTest.java
===================================================================
--- oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/index/AsyncIndexerServiceTest.java	(revision 1799341)
+++ oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/index/AsyncIndexerServiceTest.java	(working copy)
@@ -181,5 +181,15 @@
         public String getInstanceId() {
             return "foo";
         }
+
+        @Override
+        public String getVisibilityToken() {
+            return "";
+        }
+
+        @Override
+        public boolean isVisible(String visibilityToken, long maxWaitMillis) throws InterruptedException {
+            return true;
+        }
     }
 }
\ No newline at end of file
Index: oak-store-spi/src/main/java/org/apache/jackrabbit/oak/spi/state/Clusterable.java
===================================================================
--- oak-store-spi/src/main/java/org/apache/jackrabbit/oak/spi/state/Clusterable.java	(revision 1799341)
+++ oak-store-spi/src/main/java/org/apache/jackrabbit/oak/spi/state/Clusterable.java	(working copy)
@@ -17,6 +17,7 @@
 package org.apache.jackrabbit.oak.spi.state;
 
 import javax.annotation.Nonnull;
+import javax.annotation.Nullable;
 
 /**
  * Interface for bearing cluster node specific information.
@@ -32,4 +33,60 @@
      */
     @Nonnull
     String getInstanceId();
-}
+
+    /**
+     * Returns the visibility token of the underlying NodeStore. A 'visibility
+     * token' is an opaque String that can be used to verify if changes done on
+     * one NodeStore are visible on another NodeStore of the same cluster. This
+     * can be achieved by generating such a visibility token on the source
+     * NodeStore, passing it on to the target NodeStore (by whatever means) and
+     * checking for visibility on that target NodeStore.
+     * <p>
+     * The visibility check returns true if the target NodeStore sees at least
+     * all the changes that the source NodeStore saw at time of visibility token
+     * generation. Once a visibility token is visible on a particular NodeStore
+     * the check will return true ever after. This also implies that the
+     * visibility check can only state whether at least all source changes are
+     * visible on the target and that it is independent of any further
+     * modifications.
+     * <p>
+     * When source and target NodeStore are identical, the visibility check is
+     * expected to return true, immediately. This is based on the assumption
+     * that with a session.refresh() on that NodeStore you'll always get the
+     * latest changes applied by any other session locally.
+     * <p>
+     * Visibility tokens are meant to be lightweight and are not expected to be
+     * persisted by the implementor. Nevertheless they should retain their
+     * validity in the case of crashes of the source and/or the target instance.
+     *
+     * @return the visibility token; may be {@code null}
+     */
+    @Nullable
+    String getVisibilityToken();
+
+    /**
+     * Checks if the underlying NodeStore sees at least the changes that were
+     * visible at the time the visibility token was created on potentially
+     * another instance if in a clustered NodeStore setup.
+     * <p>
+     * If the visibility token was created on the underlying NodeStore this
+     * check always returns true, immediately.
+     *
+     * @param visibilityToken
+     *            the visibility token that was created on another instance in a
+     *            clustered NodeStore setup. Providing null is not supported and
+     *            might throw a RuntimeException
+     * @param maxWaitMillis
+     *            if negative the method does not wait; if {@code 0} it waits
+     *            without a time limit; if positive it waits at most this many
+     *            milliseconds until the underlying NodeStore sees at least the
+     *            changes represented by the provided visibility token
+     * @return true if the underlying NodeStore sees at least the changes that
+     *         were visible at the time the visibility token was created
+     * @throws InterruptedException
+     *             thrown if interrupted while waiting
+     * @see VisibilityTokenProvider VisibilityTokenProvider for a definition and
+     *      usage of visibility tokens
+     */
+    boolean isVisible(@Nonnull String visibilityToken, long maxWaitMillis) throws InterruptedException;
+}
\ No newline at end of file