Index: oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/CommitQueue.java
===================================================================
--- oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/CommitQueue.java (revision 1800855)
+++ oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/CommitQueue.java (working copy)
@@ -112,15 +112,38 @@
*
the thread is interrupted
*
*
- * @param conflictRevisions the revisions to become visible.
+ * @param revisions the revisions to become visible.
*/
- void suspendUntilAll(@Nonnull Set conflictRevisions) {
+ void suspendUntilAll(@Nonnull Set revisions) {
+ try {
+ suspendUntilAll(revisions, suspendTimeout); // delegate, using this queue's suspendTimeout as the limit
+ } catch (InterruptedException e) {
+ LOG.debug("The suspended thread has been interrupted", e); // NOTE(review): only logged; the interrupt status is not restored
+ }
+ }
+
+ /**
+ * Suspends until, for each of the given revisions, one of the following happens:
+ * <ul>
+ * <li>the given revision is visible from the current headRevision</li>
+ * <li>the given revision is canceled from the commit queue</li>
+ * <li>the given {@code suspendTimeoutMillis} has elapsed</li>
+ * <li>the thread is interrupted</li>
+ * </ul>
+ *
+ * @param revisions the revisions to become visible.
+ * @param suspendTimeoutMillis maximum time to suspend, in milliseconds; zero or negative values do not wait
+ * @throws InterruptedException thrown when this thread was interrupted
+ * while waiting
+ */
+ void suspendUntilAll(@Nonnull Set revisions, long suspendTimeoutMillis)
+ throws InterruptedException {
Semaphore s;
int addedRevisions;
synchronized (suspendedCommits) {
RevisionVector headRevision = context.getHeadRevision();
- Set afterHead = new HashSet(conflictRevisions.size());
- for (Revision r : conflictRevisions) {
+ Set afterHead = new HashSet(revisions.size());
+ for (Revision r : revisions) {
if (headRevision.isRevisionNewer(r)) {
afterHead.add(r);
}
@@ -131,9 +154,7 @@
addedRevisions = afterHead.size();
}
try {
- s.tryAcquire(addedRevisions, suspendTimeout, TimeUnit.MILLISECONDS);
- } catch (InterruptedException e) {
- LOG.debug("The suspended thread has been interrupted", e);
+ s.tryAcquire(addedRevisions, suspendTimeoutMillis, TimeUnit.MILLISECONDS);
} finally {
synchronized (suspendedCommits) {
suspendedCommits.remove(s);
Index: oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/DocumentNodeStore.java
===================================================================
--- oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/DocumentNodeStore.java (revision 1800855)
+++ oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/DocumentNodeStore.java (working copy)
@@ -3171,7 +3171,53 @@
public String getInstanceId() {
return String.valueOf(getClusterId());
}
+
+ @Override
+ public String getVisibilityToken() {
+     // read the field once so the null check and the dereference use the same object
+     final DocumentNodeState current = root;
+     // a missing root is not expected; answer with an empty token in that case
+     return current == null
+             ? ""
+             : current.getRootRevision().asString();
+ }
+
+ private boolean isVisible(RevisionVector rv) {
+     // no synchronization: work on a local snapshot of the root reference instead
+     final DocumentNodeState snapshot = root;
+     if (snapshot != null) {
+         return Utils.isGreaterOrEquals(snapshot.getRootRevision(), rv);
+     }
+     // not expected to happen; without a root nothing is reported as visible
+     return false;
+ }
+
+ @Override
+ public boolean isVisible(String visibilityToken, long maxWaitMillis) throws InterruptedException {
+     if (visibilityToken == null || visibilityToken.isEmpty()) {
+         // declared @Nonnull on the interface, hence reject instead of guessing
+         throw new IllegalArgumentException("visibilityToken must not be null or empty");
+     }
+     // 'fromString' throws a RuntimeException on an unparseable token; let it propagate
+     final RevisionVector visibilityTokenRv = RevisionVector.fromString(visibilityToken);
+     if (isVisible(visibilityTokenRv)) {
+         // the simple case
+         return true;
+     }
+     if (maxWaitMillis < 0) {
+         // Clusterable contract: a negative value means "do not wait"
+         return false;
+     }
+     // Clusterable contract: 0 means "wait forever"; Semaphore.tryAcquire treats a
+     // timeout <= 0 as "do not wait", so map 0 to the largest possible timeout
+     final long timeoutMillis = maxWaitMillis == 0 ? Long.MAX_VALUE : maxWaitMillis;
+     commitQueue.suspendUntilAll(Sets.newHashSet(visibilityTokenRv), timeoutMillis);
+     // an interrupt surfaces as InterruptedException above; otherwise the reason
+     // for returning is unknown, so report the final visibility state
+     return isVisible(visibilityTokenRv);
+ }
+
public DocumentNodeStoreStatsCollector getStatsCollector() {
return nodeStoreStatsCollector;
}
Index: oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/atomic/AtomicCounterEditorTest.java
===================================================================
--- oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/atomic/AtomicCounterEditorTest.java (revision 1800855)
+++ oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/atomic/AtomicCounterEditorTest.java (working copy)
@@ -108,12 +108,32 @@
public String getInstanceId() {
return "1";
}
+
+ @Override
+ public String getVisibilityToken() {
+ return ""; // test stub: constant empty token
+ }
+
+ @Override
+ public boolean isVisible(String visibilityToken, long maxWaitMillis) throws InterruptedException {
+ return true; // test stub: reports visible right away, both arguments are ignored
+ }
};
private static final Clusterable CLUSTER_2 = new Clusterable() {
@Override
public String getInstanceId() {
return "2";
}
+
+ @Override
+ public String getVisibilityToken() {
+ return ""; // test stub: constant empty token
+ }
+
+ @Override
+ public boolean isVisible(String visibilityToken, long maxWaitMillis) throws InterruptedException {
+ return true; // test stub: reports visible right away, both arguments are ignored
+ }
};
private static final EditorHook HOOK_NO_CLUSTER = new EditorHook(
new TestableACEProvider(null, null, null, null));
Index: oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/DocumentNodeStoreTest.java
===================================================================
--- oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/DocumentNodeStoreTest.java (revision 1800855)
+++ oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/DocumentNodeStoreTest.java (working copy)
@@ -59,10 +59,14 @@
import java.util.UUID;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentSkipListMap;
import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
@@ -3192,7 +3196,130 @@
assertThat(updates, everyItem(is(30)));
assertEquals(1, updates.size());
}
+
+ // OAK-6276
+ @Test
+ public void visibilityToken() throws Exception {
+     DocumentStore docStore = new MemoryDocumentStore();
+     DocumentNodeStore ns1 = builderProvider.newBuilder()
+             .setDocumentStore(docStore).setAsyncDelay(0).setClusterId(1).getNodeStore();
+     ns1.getRoot();
+     ns1.runBackgroundOperations();
+     final DocumentNodeStore ns2 = builderProvider.newBuilder()
+             .setDocumentStore(docStore).setAsyncDelay(0).setClusterId(2).getNodeStore();
+     ns2.getRoot();
+     // tokens taken right after startup: own token is visible, ns1 lacks vt2 until background ops ran
+     String vt1 = ns1.getVisibilityToken();
+     String vt2 = ns2.getVisibilityToken();
+     assertTrue(ns1.isVisible(vt1, -1));
+     assertTrue(ns1.isVisible(vt1, 1));
+     assertTrue(ns1.isVisible(vt1, 100000000));
+     assertTrue(ns2.isVisible(vt2, -1));
+     assertTrue(ns2.isVisible(vt2, 1));
+     assertTrue(ns2.isVisible(vt2, 100000000));
+     assertFalse(ns1.isVisible(vt2, -1));
+     assertFalse(ns1.isVisible(vt2, 1));
+     assertTrue(ns2.isVisible(vt1, -1));
+     ns2.runBackgroundOperations();
+     ns1.runBackgroundOperations();
+     assertTrue(ns1.isVisible(vt2, -1));
+     assertTrue(ns2.isVisible(vt1, -1));
+     // tokens taken after both stores ran background operations are visible everywhere
+     vt1 = ns1.getVisibilityToken();
+     vt2 = ns2.getVisibilityToken();
+     assertTrue(ns1.isVisible(vt1, -1));
+     assertTrue(ns2.isVisible(vt2, -1));
+     assertTrue(ns1.isVisible(vt2, -1));
+     assertTrue(ns2.isVisible(vt1, -1));
+     assertTrue(ns1.isVisible(vt1, 100000000));
+     assertTrue(ns2.isVisible(vt2, 100000000));
+     assertTrue(ns1.isVisible(vt2, 100000000));
+     assertTrue(ns2.isVisible(vt1, 100000000));
+     // commit one change on each store
+     NodeBuilder b1 = ns1.getRoot().builder();
+     b1.setProperty("p1", "1");
+     ns1.merge(b1, EmptyHook.INSTANCE, CommitInfo.EMPTY);
+
+     NodeBuilder b2 = ns2.getRoot().builder();
+     b2.setProperty("p2", "2");
+     ns2.merge(b2, EmptyHook.INSTANCE, CommitInfo.EMPTY);
+     // the tokens taken before the commits stay visible on both stores
+     assertTrue(ns1.isVisible(vt1, -1));
+     assertTrue(ns2.isVisible(vt2, -1));
+     assertTrue(ns1.isVisible(vt2, -1));
+     assertTrue(ns2.isVisible(vt1, -1));
+     // tokens taken after the commits are visible only on the store that issued them
+     vt1 = ns1.getVisibilityToken();
+     vt2 = ns2.getVisibilityToken();
+     assertTrue(ns1.isVisible(vt1, -1));
+     assertTrue(ns2.isVisible(vt2, -1));
+     assertFalse(ns1.isVisible(vt2, -1));
+     assertFalse(ns2.isVisible(vt1, -1));
+     assertFalse(ns1.isVisible(vt2, 1));
+     assertFalse(ns2.isVisible(vt1, 1));
+     // background operations on ns1 alone do not change that
+     ns1.runBackgroundOperations();
+     assertTrue(ns1.isVisible(vt1, -1));
+     assertTrue(ns2.isVisible(vt2, -1));
+     assertFalse(ns1.isVisible(vt2, -1));
+     assertFalse(ns2.isVisible(vt1, -1));
+     assertFalse(ns1.isVisible(vt2, 1));
+     assertFalse(ns2.isVisible(vt1, 1));
+     // after ns2's background operations ns2 sees vt1, while ns1 still does not see vt2
+     ns2.runBackgroundOperations();
+     assertTrue(ns1.isVisible(vt1, -1));
+     assertTrue(ns2.isVisible(vt2, -1));
+     assertFalse(ns1.isVisible(vt2, -1));
+     assertFalse(ns1.isVisible(vt2, 1));
+     assertTrue(ns2.isVisible(vt1, -1));
+     // another ns1 background run makes vt2 visible on ns1 as well
+     ns1.runBackgroundOperations();
+     assertTrue(ns1.isVisible(vt1, -1));
+     assertTrue(ns2.isVisible(vt2, -1));
+     assertTrue(ns1.isVisible(vt2, -1));
+     assertTrue(ns2.isVisible(vt1, -1));
+
+     vt1 = ns1.getVisibilityToken();
+     vt2 = ns2.getVisibilityToken();
+     assertTrue(ns1.isVisible(vt2, -1));
+     assertTrue(ns2.isVisible(vt1, -1));
+
+     b1 = ns1.getRoot().builder();
+     b1.setProperty("p1", "1b");
+     ns1.merge(b1, EmptyHook.INSTANCE, CommitInfo.EMPTY);
+     // ns2 has not yet seen the next ns1 commit, so a waiting isVisible call must block
+     vt1 = ns1.getVisibilityToken();
+     assertFalse(ns2.isVisible(vt1, -1));
+     final String finalVt1 = vt1;
+     final java.util.concurrent.ExecutorService executor = Executors.newFixedThreadPool(1);
+     try {
+         Future<Void> asyncResult = executor.submit(new Callable<Void>() {
+             @Override
+             public Void call() throws Exception {
+                 assertTrue(ns2.isVisible(finalVt1, 10000));
+                 return null;
+             }
+         });
+         try {
+             asyncResult.get(500, TimeUnit.MILLISECONDS);
+             fail("should have thrown a timeout exception");
+         } catch (TimeoutException expected) { // ok: ns1 has not run its background operations yet
+         }
+         ns1.runBackgroundOperations();
+         try {
+             asyncResult.get(500, TimeUnit.MILLISECONDS);
+             fail("should have thrown a timeout exception");
+         } catch (TimeoutException expected) { // ok: ns2 has not run its background operations yet
+         }
+         ns2.runBackgroundOperations();
+         asyncResult.get(6000, TimeUnit.MILLISECONDS);
+     } finally {
+         executor.shutdownNow(); // always stop the pool so its worker thread does not outlive the test
+     }
+ }
+
+
private static class WriteCountingStore extends MemoryDocumentStore {
private final ThreadLocal createMulti = new ThreadLocal<>();
int count;
Index: oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/index/AsyncIndexerServiceTest.java
===================================================================
--- oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/index/AsyncIndexerServiceTest.java (revision 1800855)
+++ oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/index/AsyncIndexerServiceTest.java (working copy)
@@ -181,5 +181,15 @@
public String getInstanceId() {
return "foo";
}
+
+ @Override
+ public String getVisibilityToken() {
+ return ""; // test stub: constant empty token
+ }
+
+ @Override
+ public boolean isVisible(String visibilityToken, long maxWaitMillis) throws InterruptedException {
+ return true; // test stub: reports visible right away, both arguments are ignored
+ }
}
}
\ No newline at end of file
Index: oak-store-spi/src/main/java/org/apache/jackrabbit/oak/spi/state/Clusterable.java
===================================================================
--- oak-store-spi/src/main/java/org/apache/jackrabbit/oak/spi/state/Clusterable.java (revision 1800855)
+++ oak-store-spi/src/main/java/org/apache/jackrabbit/oak/spi/state/Clusterable.java (working copy)
@@ -17,6 +17,7 @@
package org.apache.jackrabbit.oak.spi.state;
import javax.annotation.Nonnull;
+import javax.annotation.Nullable;
/**
* Interface for bearing cluster node specific information.
@@ -32,4 +33,58 @@
*/
@Nonnull
String getInstanceId();
-}
+
+ /**
+ * Returns the visibility token of the underlying NodeStore. A 'visibility
+ * token' is an opaque String that can be used to verify if changes done on
+ * one NodeStore are visible on another NodeStore of the same cluster. This
+ * can be achieved by generating such a visibility token on the source
+ * NodeStore, passing it on to the target NodeStore (by whatever means) and
+ * checking for visibility on that target NodeStore.
+ * <p>
+ * The visibility check returns true if the target NodeStore sees at least
+ * all the changes that the source NodeStore saw at time of visibility token
+ * generation. Once a visibility token is visible on a particular NodeStore
+ * it will always return true ever after. This also implies that the
+ * visibility check can only state whether at least all source changes are
+ * visible on the target and that it is independent of any further
+ * modifications.
+ * <p>
+ * When source and target NodeStore are identical, the visibility check is
+ * expected to return true, immediately. This is based on the assumption
+ * that with a session.refresh() on that NodeStore you'll always get the
+ * latest changes applied by any other session locally.
+ * <p>
+ * Visibility tokens are meant to be lightweight and are not expected to be
+ * persisted by the implementor. Nevertheless they should retain their
+ * validity in the case of crashes of the source and/or the target instance.
+ */
+ @Nullable
+ String getVisibilityToken();
+
+ /**
+ * Checks if the underlying NodeStore sees at least the changes that were
+ * visible at the time the visibility token was created on potentially
+ * another instance if in a clustered NodeStore setup.
+ * <p>
+ * If the visibility token was created on the underlying NodeStore this
+ * check always returns true, immediately.
+ *
+ * @param visibilityToken
+ * the visibility token that was created on another instance in a
+ * clustered NodeStore setup. Providing null is not supported and
+ * might throw a RuntimeException
+ * @param maxWaitMillis
+ * if &gt;-1 waits (at max this many milliseconds if &gt;0,
+ * forever if ==0) until the underlying NodeStore sees at least
+ * the changes represented by the provided visibility token. if
+ * &lt; 0 the method does not wait
+ * @return true if the underlying NodeStore sees at least the changes that
+ * were visible at the time the visibility token was created
+ * @throws InterruptedException
+ * (optionally) thrown if interrupted while waiting
+ * @see VisibilityTokenProvider VisibilityTokenProvider for a definition and
+ * usage of visibility tokens
+ */
+ boolean isVisible(@Nonnull String visibilityToken, long maxWaitMillis) throws InterruptedException;
+}
\ No newline at end of file