diff --git a/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/NodeDocument.java b/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/NodeDocument.java index 051eeb1..b8f674a 100644 --- a/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/NodeDocument.java +++ b/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/NodeDocument.java @@ -1048,7 +1048,7 @@ public final class NodeDocument extends Document implements CachedNodeDocument{ } if (store instanceof RevisionListener) { - ((RevisionListener) store).updateAccessedRevision(lastRevision); + ((RevisionListener) store).updateAccessedRevision(lastRevision, nodeStore.getClusterId()); } return new DocumentNodeState(nodeStore, path, readRevision, props, hasChildren(), lastRevision); diff --git a/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/RevisionListener.java b/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/RevisionListener.java index bd0ad10..7bef9b3 100644 --- a/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/RevisionListener.java +++ b/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/RevisionListener.java @@ -22,6 +22,6 @@ package org.apache.jackrabbit.oak.plugins.document; */ public interface RevisionListener { - void updateAccessedRevision(RevisionVector revision); + void updateAccessedRevision(RevisionVector revision, int currentClusterId); } diff --git a/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/mongo/MongoDocumentStore.java b/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/mongo/MongoDocumentStore.java index 043a3dc..ab1836b 100644 --- a/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/mongo/MongoDocumentStore.java +++ b/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/mongo/MongoDocumentStore.java @@ -149,7 +149,7 @@ public class MongoDocumentStore implements DocumentStore, RevisionListener { private RevisionVector mostRecentAccessedRevisions; - final LocalChanges localChanges; + LocalChanges localChanges; private final long maxReplicationLagMillis; @@ -249,11 +249,9 @@ public class MongoDocumentStore implements DocumentStore, RevisionListener { localChanges = null; } else { replicaInfo = new ReplicaSetInfo(clock, db, builder.getMongoUri(), estimationPullFrequencyMS, maxReplicationLagMillis, builder.getExecutor()); - Thread replicaInfoThread = new Thread(replicaInfo, "MongoDocumentStore replica set info provider (" + builder.getClusterId() + ")"); + Thread replicaInfoThread = new Thread(replicaInfo, "MongoDocumentStore replica set info provider"); replicaInfoThread.setDaemon(true); replicaInfoThread.start(); - localChanges = new LocalChanges(builder.getClusterId()); - replicaInfo.addListener(localChanges); } // indexes: @@ -1325,12 +1323,14 @@ public class MongoDocumentStore implements DocumentStore, RevisionListener { NodeDocument cachedDoc = nodesCache.getIfPresent(parentId); secondarySafe = cachedDoc != null && !cachedDoc.hasBeenModifiedSince(replicationSafeLimit); } - } else { + } else if (localChanges != null) { secondarySafe = true; secondarySafe &= collection == Collection.NODES; secondarySafe &= documentId == null || !localChanges.mayContain(documentId); secondarySafe &= parentId == null || !localChanges.mayContainChildrenOf(parentId); secondarySafe &= mostRecentAccessedRevisions == null || replicaInfo.isMoreRecentThan(mostRecentAccessedRevisions); + } else { // localChanges not initialized yet + secondarySafe = false; } ReadPreference readPreference; @@ -1666,7 +1666,12 @@ public class MongoDocumentStore implements DocumentStore, RevisionListener { } @Override - public synchronized void updateAccessedRevision(RevisionVector revisions) { + public synchronized void updateAccessedRevision(RevisionVector revisions, int clusterId) { + if (localChanges == null && replicaInfo != null) { + localChanges = new LocalChanges(clusterId); + replicaInfo.addListener(localChanges); + } + RevisionVector previousValue = mostRecentAccessedRevisions; if (mostRecentAccessedRevisions == null) { mostRecentAccessedRevisions = revisions; diff --git a/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/util/LeaseCheckDocumentStoreWrapper.java b/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/util/LeaseCheckDocumentStoreWrapper.java index d43c7e6..b7a3e01 100644 --- a/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/util/LeaseCheckDocumentStoreWrapper.java +++ b/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/util/LeaseCheckDocumentStoreWrapper.java @@ -215,9 +215,9 @@ public final class LeaseCheckDocumentStoreWrapper implements DocumentStore, Revi } @Override - public void updateAccessedRevision(RevisionVector revision) { + public void updateAccessedRevision(RevisionVector revision, int currentClusterId) { if (delegate instanceof RevisionListener) { - ((RevisionListener) delegate).updateAccessedRevision(revision); + ((RevisionListener) delegate).updateAccessedRevision(revision, currentClusterId); } } diff --git a/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/util/LoggingDocumentStoreWrapper.java b/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/util/LoggingDocumentStoreWrapper.java index 5550b7f..fead88b 100644 --- a/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/util/LoggingDocumentStoreWrapper.java +++ b/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/util/LoggingDocumentStoreWrapper.java @@ -430,10 +430,10 @@ public class LoggingDocumentStoreWrapper implements DocumentStore, RevisionListe } @Override - public void updateAccessedRevision(RevisionVector revision) { + public void updateAccessedRevision(RevisionVector revision, int currentClusterId) { logMethod("updateAccessedRevision", revision); if (store instanceof RevisionListener) { - ((RevisionListener) store).updateAccessedRevision(revision); + ((RevisionListener) store).updateAccessedRevision(revision, currentClusterId); } } } diff --git a/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/util/SynchronizingDocumentStoreWrapper.java b/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/util/SynchronizingDocumentStoreWrapper.java index 73d3e68..355a292 100644 --- a/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/util/SynchronizingDocumentStoreWrapper.java +++ b/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/util/SynchronizingDocumentStoreWrapper.java @@ -162,9 +162,9 @@ public class SynchronizingDocumentStoreWrapper implements DocumentStore, Revisio } @Override - public synchronized void updateAccessedRevision(RevisionVector revision) { + public synchronized void updateAccessedRevision(RevisionVector revision, int currentClusterId) { if (store instanceof RevisionListener) { - ((RevisionListener) store).updateAccessedRevision(revision); + ((RevisionListener) store).updateAccessedRevision(revision, currentClusterId); } } } diff --git a/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/util/TimingDocumentStoreWrapper.java b/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/util/TimingDocumentStoreWrapper.java index 022fe0f..be76ab9 100644 --- a/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/util/TimingDocumentStoreWrapper.java +++ b/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/util/TimingDocumentStoreWrapper.java @@ -412,11 +412,11 @@ public class TimingDocumentStoreWrapper implements DocumentStore, RevisionListen } @Override - public void updateAccessedRevision(RevisionVector revision) { + public void updateAccessedRevision(RevisionVector revision, int currentClusterId) { try { long start = now(); if (base instanceof RevisionListener) { - ((RevisionListener) base).updateAccessedRevision(revision); + ((RevisionListener) base).updateAccessedRevision(revision, currentClusterId); } updateAndLogTimes("updateAccessedRevision", start, 0, 0); } catch (Exception e) { diff --git a/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/CountingDocumentStore.java b/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/CountingDocumentStore.java index 39ab41e..b0b2596 100644 --- a/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/CountingDocumentStore.java +++ b/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/CountingDocumentStore.java @@ -240,9 +240,9 @@ public class CountingDocumentStore implements DocumentStore, RevisionListener { } @Override - public void updateAccessedRevision(RevisionVector revision) { + public void updateAccessedRevision(RevisionVector revision, int currentClusterId) { if (delegate instanceof RevisionListener) { - ((RevisionListener) delegate).updateAccessedRevision(revision); + ((RevisionListener) delegate).updateAccessedRevision(revision, currentClusterId); } } } diff --git a/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/DocumentStoreWrapper.java b/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/DocumentStoreWrapper.java index cf58f79..1e070a1 100644 --- a/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/DocumentStoreWrapper.java +++ b/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/DocumentStoreWrapper.java @@ -173,9 +173,9 @@ public class DocumentStoreWrapper implements DocumentStore, RevisionListener { } @Override - public void updateAccessedRevision(RevisionVector revision) { + public void updateAccessedRevision(RevisionVector revision, int currentClusterId) { if (store instanceof RevisionListener) { - ((RevisionListener) store).updateAccessedRevision(revision); + ((RevisionListener) store).updateAccessedRevision(revision, currentClusterId); } } }