diff --git a/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/DocumentMK.java b/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/DocumentMK.java index 475d4f4..fd37d78 100644 --- a/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/DocumentMK.java +++ b/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/DocumentMK.java @@ -499,6 +499,7 @@ public class DocumentMK { public static final int DEFAULT_CACHE_STACK_MOVE_DISTANCE = 16; private DocumentNodeStore nodeStore; private DocumentStore documentStore; + private String mongoUri; private DiffCache diffCache; private BlobStore blobStore; private int clusterId = Integer.getInteger("oak.documentMK.clusterId", 0); @@ -554,6 +555,8 @@ public class DocumentMK { @Nonnull String name, int blobCacheSizeMB) throws UnknownHostException { + this.mongoUri = uri; + DB db = new MongoConnection(uri).getDB(name); if (!MongoConnection.hasWriteConcern(uri)) { db.setWriteConcern(MongoConnection.getDefaultWriteConcern(db)); @@ -601,6 +604,16 @@ public class DocumentMK { } /** + * Returns the Mongo URI used in the {@link #setMongoDB(String, String, int)} method. + * + * @return the Mongo URI or null if the {@link #setMongoDB(String, String, int)} method hasn't + * been called. + */ + public String getMongoUri() { + return mongoUri; + } + + /** * Sets a {@link DataSource} to use for the RDB document and blob * stores. 
* diff --git a/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/NodeDocument.java b/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/NodeDocument.java index d83de50..75eec81 100644 --- a/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/NodeDocument.java +++ b/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/NodeDocument.java @@ -1326,10 +1326,12 @@ public final class NodeDocument extends Document implements CachedNodeDocument{ } NodeDocument getPreviousDocument(String prevId){ - //Use the maxAge variant such that in case of Mongo call for - //previous doc are directed towards replicas first LOG.trace("get previous document {}", prevId); - return store.find(Collection.NODES, prevId, Integer.MAX_VALUE); + NodeDocument doc = store.find(Collection.NODES, prevId); + if (doc == null) { + doc = store.find(Collection.NODES, prevId, 0); // force the primary + } + return doc; } @Nonnull diff --git a/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/mongo/MongoBlobStore.java b/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/mongo/MongoBlobStore.java index 9861039..69f73b7 100644 --- a/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/mongo/MongoBlobStore.java +++ b/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/mongo/MongoBlobStore.java @@ -23,6 +23,8 @@ import java.util.concurrent.TimeUnit; import org.apache.jackrabbit.oak.commons.StringUtils; import org.apache.jackrabbit.oak.plugins.blob.CachingBlobStore; +import org.apache.jackrabbit.oak.plugins.document.Document; +import org.apache.jackrabbit.oak.plugins.document.NodeDocument; import org.slf4j.Logger; import org.slf4j.LoggerFactory; diff --git a/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/mongo/MongoDocumentStore.java b/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/mongo/MongoDocumentStore.java index f5bb6ac..163cda0 100644 --- 
a/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/mongo/MongoDocumentStore.java +++ b/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/mongo/MongoDocumentStore.java @@ -61,6 +61,7 @@ import org.apache.jackrabbit.oak.plugins.document.DocumentStoreStatsCollector; import org.apache.jackrabbit.oak.plugins.document.JournalEntry; import org.apache.jackrabbit.oak.plugins.document.NodeDocument; import org.apache.jackrabbit.oak.plugins.document.Revision; +import org.apache.jackrabbit.oak.plugins.document.RevisionVector; import org.apache.jackrabbit.oak.plugins.document.StableRevisionComparator; import org.apache.jackrabbit.oak.plugins.document.UpdateOp; import org.apache.jackrabbit.oak.plugins.document.UpdateOp.Condition; @@ -70,6 +71,8 @@ import org.apache.jackrabbit.oak.plugins.document.UpdateUtils; import org.apache.jackrabbit.oak.plugins.document.cache.CacheChangesTracker; import org.apache.jackrabbit.oak.plugins.document.cache.CacheInvalidationStats; import org.apache.jackrabbit.oak.plugins.document.cache.NodeDocumentCache; +import org.apache.jackrabbit.oak.plugins.document.mongo.replica.LocalChanges; +import org.apache.jackrabbit.oak.plugins.document.mongo.replica.ReplicaSetInfo; import org.apache.jackrabbit.oak.plugins.document.locks.NodeDocumentLocks; import org.apache.jackrabbit.oak.plugins.document.locks.StripedNodeDocumentLocks; import org.apache.jackrabbit.oak.plugins.document.util.Utils; @@ -140,6 +143,12 @@ public class MongoDocumentStore implements DocumentStore { private Clock clock = Clock.SIMPLE; + private final ReplicaSetInfo replicaInfo; + + private RevisionVector mostRecentAccessedRevisions; + + private LocalChanges localChanges; + private final long maxReplicationLagMillis; /** @@ -171,6 +180,15 @@ public class MongoDocumentStore implements DocumentStore { Long.getLong("oak.mongo.maxQueryTimeMS", TimeUnit.MINUTES.toMillis(1)); /** + * How often in milliseconds the MongoDocumentStore should estimate the + * replication 
lag. + *

 + * Default is 5'000 (five seconds). + */ + private long estimationPullFrequencyMS = + Long.getLong("oak.mongo.estimationPullFrequencyMS", TimeUnit.SECONDS.toMillis(5)); + + /** * The number of documents to put into one bulk update. *

* Default is 30. @@ -211,6 +229,11 @@ public class MongoDocumentStore implements DocumentStore { maxReplicationLagMillis = builder.getMaxReplicationLagMillis(); + replicaInfo = new ReplicaSetInfo(clock, db, builder.getMongoUri(), estimationPullFrequencyMS, maxReplicationLagMillis); + new Thread(replicaInfo, "MongoDocumentStore replica set info provider (" + builder.getClusterId() + ")").start(); + localChanges = new LocalChanges(); + replicaInfo.addListener(localChanges); + // indexes: // the _id field is the primary key, so we don't need to define it @@ -471,7 +494,7 @@ public class MongoDocumentStore implements DocumentStore { boolean isSlaveOk = false; boolean docFound = true; try { - ReadPreference readPreference = getMongoReadPreference(collection, Utils.getParentId(key), docReadPref); + ReadPreference readPreference = getMongoReadPreference(collection, null, key, docReadPref); if(readPreference.isSlaveOk()){ LOG.trace("Routing call to secondary for fetching [{}]", key); @@ -480,17 +503,6 @@ public class MongoDocumentStore implements DocumentStore { DBObject obj = dbCollection.findOne(getByKeyQuery(key).get(), null, null, readPreference); - if (obj == null - && readPreference.isSlaveOk()) { - //In case secondary read preference is used and node is not found - //then check with primary again as it might happen that node document has not been - //replicated. This is required for case like SplitDocument where the SplitDoc is fetched with - //maxCacheAge == Integer.MAX_VALUE which results in readPreference of secondary. - //In such a case we know that document with such an id must exist - //but possibly dut to replication lag it has not reached to secondary. 
So in that case read again - //from primary - obj = dbCollection.findOne(getByKeyQuery(key).get(), null, null, ReadPreference.primary()); - } if(obj == null){ docFound = false; return null; @@ -499,6 +511,7 @@ public class MongoDocumentStore implements DocumentStore { if (doc != null) { doc.seal(); } + updateLatestAccessedRevs(doc); return doc; } finally { stats.doneFindUncached(watch.elapsed(TimeUnit.NANOSECONDS), collection, key, docFound, isSlaveOk); @@ -583,7 +596,7 @@ public class MongoDocumentStore implements DocumentStore { cursor.maxTime(maxQueryTime, TimeUnit.MILLISECONDS); } ReadPreference readPreference = - getMongoReadPreference(collection, parentId, getDefaultReadPreference(collection)); + getMongoReadPreference(collection, parentId, null, getDefaultReadPreference(collection)); if(readPreference.isSlaveOk()){ isSlaveOk = true; @@ -598,6 +611,7 @@ public class MongoDocumentStore implements DocumentStore { for (int i = 0; i < limit && cursor.hasNext(); i++) { DBObject o = cursor.next(); T doc = convertFromDBObject(collection, o); + updateLatestAccessedRevs(doc); list.add(doc); } resultSize = list.size(); @@ -770,6 +784,7 @@ public class MongoDocumentStore implements DocumentStore { if (collection == Collection.NODES) { NodeDocument newDoc = (NodeDocument) applyChanges(collection, oldDoc, updateOp); nodesCache.put(newDoc); + localChanges.add(newDoc.getId(), newDoc.getLastRev().values()); } oldDoc.seal(); } else if (upsert) { @@ -777,6 +792,7 @@ public class MongoDocumentStore implements DocumentStore { NodeDocument doc = (NodeDocument) collection.newDocument(this); UpdateUtils.applyChanges(doc, updateOp); nodesCache.putIfAbsent(doc); + localChanges.add(doc.getId(), doc.getLastRev().values()); } } else { // updateOp without conditions and not an upsert @@ -939,6 +955,11 @@ public class MongoDocumentStore implements DocumentStore { docsToCache.add(newDoc); } } + + for (NodeDocument doc : docsToCache) { + localChanges.add(doc.getId(), 
doc.getLastRev().values()); + } + nodesCache.putNonConflictingDocs(tracker, docsToCache); } oldDocs.keySet().removeAll(bulkResult.failedUpdates); @@ -1104,6 +1125,7 @@ public class MongoDocumentStore implements DocumentStore { if (collection == Collection.NODES) { for (T doc : docs) { nodesCache.putIfAbsent((NodeDocument) doc); + localChanges.add(doc.getId(), ((NodeDocument) doc).getLastRev().values()); } } insertSuccess = true; @@ -1208,7 +1230,7 @@ public class MongoDocumentStore implements DocumentStore { } DocumentReadPreference getReadPreference(int maxCacheAge){ - if(maxCacheAge >= 0 && maxCacheAge < maxReplicationLagMillis) { + if(maxCacheAge >= 0 && maxCacheAge < replicaInfo.getLag()) { return DocumentReadPreference.PRIMARY; } else if(maxCacheAge == Integer.MAX_VALUE){ return DocumentReadPreference.PREFER_SECONDARY; @@ -1221,9 +1243,10 @@ public class MongoDocumentStore implements DocumentStore { return col == Collection.NODES ? DocumentReadPreference.PREFER_SECONDARY_IF_OLD_ENOUGH : DocumentReadPreference.PRIMARY; } - ReadPreference getMongoReadPreference(Collection collection, - String parentId, - DocumentReadPreference preference) { + ReadPreference getMongoReadPreference(@Nonnull Collection collection, + @Nullable String parentId, + @Nullable String documentId, + @Nonnull DocumentReadPreference preference) { switch(preference){ case PRIMARY: return ReadPreference.primary(); @@ -1236,23 +1259,19 @@ public class MongoDocumentStore implements DocumentStore { return ReadPreference.primary(); } - // read from primary unless parent has not been modified - // within replication lag period - ReadPreference readPreference = ReadPreference.primary(); - if (parentId != null) { - long replicationSafeLimit = getTime() - maxReplicationLagMillis; - NodeDocument cachedDoc = nodesCache.getIfPresent(parentId); - // FIXME: this is not quite accurate, because ancestors - // are updated in a background thread (_lastRev). 
We - // will need to revise this for low maxReplicationLagMillis - // values - if (cachedDoc != null && !cachedDoc.hasBeenModifiedSince(replicationSafeLimit)) { - - //If parent has been modified loooong time back then there children - //would also have not be modified. In that case we can read from secondary - readPreference = getConfiguredReadPreference(collection); - } + boolean secondarySafe = true; + secondarySafe &= collection == Collection.NODES; + secondarySafe &= documentId == null || !localChanges.mayContain(documentId); + secondarySafe &= parentId == null || !localChanges.mayContainChildrenOf(parentId); + secondarySafe &= mostRecentAccessedRevisions == null || replicaInfo.isMoreRecentThan(mostRecentAccessedRevisions); + + ReadPreference readPreference; + if (secondarySafe) { + readPreference = getConfiguredReadPreference(collection); + } else { + readPreference = ReadPreference.primary(); } + return readPreference; default: throw new IllegalArgumentException("Unsupported usage " + preference); @@ -1560,6 +1579,21 @@ public class MongoDocumentStore implements DocumentStore { return diff; } + private synchronized void updateLatestAccessedRevs(T doc) { + if (doc instanceof NodeDocument) { + RevisionVector accessedRevs = new RevisionVector(((NodeDocument) doc).getLastRev().values()); + RevisionVector previousValue = mostRecentAccessedRevisions; + if (mostRecentAccessedRevisions == null) { + mostRecentAccessedRevisions = accessedRevs; + } else { + mostRecentAccessedRevisions = mostRecentAccessedRevisions.pmax(accessedRevs); + } + if (LOG.isDebugEnabled() && !mostRecentAccessedRevisions.equals(previousValue)) { + LOG.debug("Most recent accessed revisions: {}", mostRecentAccessedRevisions); + } + } + } + private static class BulkUpdateResult { private final Set failedUpdates; diff --git a/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/mongo/replica/GetRootRevisionsCallable.java 
b/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/mongo/replica/GetRootRevisionsCallable.java new file mode 100644 index 0000000..b3251bd --- /dev/null +++ b/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/mongo/replica/GetRootRevisionsCallable.java @@ -0,0 +1,73 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.jackrabbit.oak.plugins.document.mongo.replica; + +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.Callable; + +import org.apache.jackrabbit.oak.plugins.document.Document; +import org.apache.jackrabbit.oak.plugins.document.Revision; +import org.apache.jackrabbit.oak.plugins.document.RevisionVector; +import org.apache.jackrabbit.oak.stats.Clock; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.mongodb.BasicDBObject; +import com.mongodb.DBCollection; +import com.mongodb.DBObject; + +public class GetRootRevisionsCallable implements Callable> { + + private static final Logger LOG = LoggerFactory.getLogger(GetRootRevisionsCallable.class); + + private final String hostName; + + private final NodeCollectionProvider nodeCollections; + + private final Clock clock; + + public GetRootRevisionsCallable(Clock clock, String hostName, NodeCollectionProvider nodeCollections) { + this.hostName = hostName; + this.nodeCollections = nodeCollections; + this.clock = clock; + } + + @Override + public Timestamped call() throws Exception { + List revisions = new ArrayList(); + DBCollection collection = nodeCollections.get(hostName); + + long start = clock.getTime(); + DBObject root = collection.findOne(new BasicDBObject(Document.ID, "0:/")); + long end = clock.getTime(); + long mid = (start + end) / 2; + + if (root == null) { + LOG.warn("Can't get the root document on {}", hostName); + return null; + } + + DBObject lastRev = (DBObject) root.get("_lastRev"); + for (String clusterId : lastRev.keySet()) { + String rev = (String) lastRev.get(clusterId); + revisions.add(Revision.fromString(rev)); + } + LOG.debug("Got /_lastRev from {}: {}", hostName, lastRev); + return new Timestamped(new RevisionVector(revisions), mid); + } +} \ No newline at end of file diff --git a/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/mongo/replica/LocalChanges.java 
b/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/mongo/replica/LocalChanges.java new file mode 100644 index 0000000..7113e07 --- /dev/null +++ b/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/mongo/replica/LocalChanges.java @@ -0,0 +1,170 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.jackrabbit.oak.plugins.document.mongo.replica; + +import java.util.Collection; +import java.util.HashMap; +import java.util.Iterator; +import java.util.Map; + +import org.apache.jackrabbit.oak.plugins.document.Revision; +import org.apache.jackrabbit.oak.plugins.document.RevisionVector; +import org.apache.jackrabbit.oak.plugins.document.util.Utils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import static org.apache.jackrabbit.oak.plugins.document.util.Utils.isGreaterOrEquals; + +/** + * This class maintains a list of local changes (paths+revisions), which + * shouldn't be read from the secondary Mongo, as we are not sure if they have + * been already replicated from primary. Once we get this confidence, the entry + * will be removed from the map. 
+ */ +public class LocalChanges implements ReplicaSetInfoListener { + + private static final Logger LOG = LoggerFactory.getLogger(LocalChanges.class); + + /** + * How many paths should be stored in the {@link #localChanges} map. If + * there's more paths added (and not removed in the + * {@link #gotRootRevisions(RevisionVector)}), only the latest changed + * revision will be remembered. + */ + private static final int SIZE_LIMIT = 100; + + /** + * This map contains document paths and revisions in which they have been + * changed. Paths in this collection hasn't been replicated to secondary + * instances yet. + */ + final Map localChanges = new HashMap(); + + /** + * If there's more than {@link #SIZE_LIMIT} paths in the + * {@link #localChanges}, the class will clear the above map and update this + * variable with the latest changed revision. {@code true} will be returned + * for all {@link #mayContainChildrenOf(String)} and {@link #mayContain(String)} + * invocations until this revision is replicated to all secondary instances. + *

+ * This is a safety mechanism, so the {@link #localChanges} won't grow too much. + */ + private volatile RevisionVector latestChange; + + /** + * True if the current Mongo installation is an working replica. Otherwise + * there's no need to store the local changes. + */ + private volatile boolean replicaActive; + + private volatile RevisionVector rootRevision; + + public void add(String id, Collection revs) { + RevisionVector revsV = new RevisionVector(revs); + RevisionVector localRootRev = rootRevision; + if (localRootRev != null && isGreaterOrEquals(localRootRev, revsV)) { + return; + } + + synchronized (this) { + if (latestChange != null && isGreaterOrEquals(latestChange, revsV)) { + return; + } + + if (replicaActive) { + localChanges.put(id, revsV); + if (localChanges.size() >= SIZE_LIMIT) { + localChanges.clear(); + latestChange = revsV; + LOG.debug( + "The local changes count == {}. Clearing the list and switching to the 'latest change' mode: {}", + SIZE_LIMIT, latestChange); + } + } else { + latestChange = revsV; + } + } + } + + /** + * Check if it's possible that the given document hasn't been replicated to + * the secondary yet. + * + * @param documentId + * @return {@code true} if it's possible that the document is still in the + * Mongo replication queue + */ + public boolean mayContain(String documentId) { + if (!replicaActive || latestChange != null) { + return true; + } + + synchronized (this) { + return localChanges.containsKey(documentId); + } + } + + /** + * Check if it's possible that the children of the given document hasn't + * been replicated to the secondary yet. 
+ * + * @param documentId + * @return {@code true} if it's possible that the children of given document + * are still in the Mongo replication queue + */ + public boolean mayContainChildrenOf(String parentId) { + if (!replicaActive || latestChange != null) { + return true; + } + + synchronized (this) { + for (String key : localChanges.keySet()) { + if (parentId.equals(Utils.getParentId(key))) { + return true; + } + } + return false; + } + } + + @Override + public void gotRootRevisions(RevisionVector rootRevision) { + if (rootRevision == null) { + return; + } + + this.rootRevision = rootRevision; + + if (!replicaActive) { + replicaActive = true; + LOG.info("Replica set became active"); + } + + synchronized (this) { + if (latestChange != null && latestChange.compareTo(rootRevision) <= 0) { + latestChange = null; + } + + Iterator it = localChanges.values().iterator(); + while (it.hasNext()) { + if (it.next().compareTo(rootRevision) <= 0) { + it.remove(); + } + } + } + } +} diff --git a/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/mongo/replica/NodeCollectionProvider.java b/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/mongo/replica/NodeCollectionProvider.java new file mode 100644 index 0000000..b4b0f5b --- /dev/null +++ b/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/mongo/replica/NodeCollectionProvider.java @@ -0,0 +1,124 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.jackrabbit.oak.plugins.document.mongo.replica; + +import static com.google.common.collect.Sets.difference; + +import java.net.UnknownHostException; +import java.util.ArrayList; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.Map.Entry; +import java.util.concurrent.ConcurrentHashMap; + +import com.mongodb.MongoCredential; +import com.mongodb.ServerAddress; +import org.apache.jackrabbit.oak.plugins.document.Collection; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.mongodb.DB; +import com.mongodb.DBCollection; +import com.mongodb.MongoClient; +import com.mongodb.MongoClientException; +import com.mongodb.MongoClientURI; + +/** + * This class connects to Mongo instances and returns the NODES collection. + * Connections are cached. 
+ */ +public class NodeCollectionProvider { + + private static final Logger LOG = LoggerFactory.getLogger(NodeCollectionProvider.class); + + private final Map collections = new ConcurrentHashMap(); + + private final String originalMongoUri; + + private final String dbName; + + public NodeCollectionProvider(String originalMongoUri, String dbName) { + this.originalMongoUri = originalMongoUri; + this.dbName = dbName; + } + + public void retain(Set hostNames) { + close(difference(collections.keySet(), hostNames)); + } + + public void close() { + close(collections.keySet()); + } + + private void close(Set hostNames) { + Iterator> it = collections.entrySet().iterator(); + while (it.hasNext()) { + Entry entry = it.next(); + if (hostNames.contains(entry.getKey())) { + try { + entry.getValue().getDB().getMongo().close(); + it.remove(); + } catch (MongoClientException e) { + LOG.error("Can't close Mongo client", e); + } + } + } + } + + @SuppressWarnings("deprecation") + public DBCollection get(String hostname) throws UnknownHostException { + if (collections.containsKey(hostname)) { + return collections.get(hostname); + } + + MongoClient client; + if (originalMongoUri == null) { + MongoClientURI uri = new MongoClientURI("mongodb://" + hostname); + client = new MongoClient(uri); + } else { + client = prepareClientForHostname(hostname); + } + + DB db = client.getDB(dbName); + db.getMongo().slaveOk(); + DBCollection collection = db.getCollection(Collection.NODES.toString()); + collections.put(hostname, collection); + return collection; + } + + private MongoClient prepareClientForHostname(String hostname) throws UnknownHostException { + ServerAddress address; + if (hostname.contains(":")) { + String[] hostSplit = hostname.split(":"); + if (hostSplit.length != 2) { + throw new IllegalArgumentException("Not a valid hostname: " + hostname); + } + address = new ServerAddress(hostSplit[0], Integer.parseInt(hostSplit[1])); + } else { + address = new ServerAddress(hostname); + } + + 
MongoClientURI originalUri = new MongoClientURI(originalMongoUri); + List credentialList = new ArrayList(1); + if (originalUri.getCredentials() != null) { + credentialList.add(originalUri.getCredentials()); + } + return new MongoClient(address, credentialList, originalUri.getOptions()); + } +} \ No newline at end of file diff --git a/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/mongo/replica/ReplicaSetInfo.java b/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/mongo/replica/ReplicaSetInfo.java new file mode 100644 index 0000000..0b09498 --- /dev/null +++ b/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/mongo/replica/ReplicaSetInfo.java @@ -0,0 +1,363 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.jackrabbit.oak.plugins.document.mongo.replica; + +import static com.google.common.base.Predicates.in; +import static com.google.common.collect.ImmutableSet.of; +import static com.google.common.collect.Iterables.isEmpty; +import static com.google.common.collect.Iterables.transform; +import static com.google.common.collect.Maps.filterKeys; +import static com.google.common.collect.Sets.union; +import static org.apache.jackrabbit.oak.plugins.document.util.Utils.isGreaterOrEquals; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Map.Entry; +import java.util.concurrent.CopyOnWriteArrayList; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.Future; +import java.util.Set; + +import javax.annotation.Nullable; + +import org.apache.jackrabbit.oak.plugins.document.Revision; +import org.apache.jackrabbit.oak.plugins.document.RevisionVector; +import org.apache.jackrabbit.oak.stats.Clock; +import org.bson.BasicBSONObject; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.mongodb.BasicDBObject; +import com.mongodb.DB; +import com.mongodb.MongoException; +import com.mongodb.ReadPreference; + +/** + * This class analyses the replica set info provided by MongoDB to find out two + * what's the current synchronization state of secondary instances in terms of + * revision values and timestamp. 
+ */ +public class ReplicaSetInfo implements Runnable { + + private static final Logger LOG = LoggerFactory.getLogger(ReplicaSetInfo.class); + + private final DB adminDb; + + private final long pullFrequencyMillis; + + private final long maxReplicationLagMillis; + + private final ExecutorService executors = Executors.newFixedThreadPool(5); + + private final NodeCollectionProvider nodeCollections; + + private final Clock clock; + + private final Object stopMonitor = new Object(); + + private final List listeners = new CopyOnWriteArrayList(); + + private volatile RevisionVector rootRevisions; + + volatile long secondariesSafeTimestamp; + + List hiddenMembers; + + private volatile boolean stop; + + public ReplicaSetInfo(Clock clock, DB db, String originalMongoUri, long pullFrequencyMillis, long maxReplicationLagMillis) { + this.clock = clock; + this.adminDb = db.getSisterDB("admin"); + this.pullFrequencyMillis = pullFrequencyMillis; + this.maxReplicationLagMillis = maxReplicationLagMillis; + this.nodeCollections = new NodeCollectionProvider(originalMongoUri, db.getName()); + } + + public void addListener(ReplicaSetInfoListener listener) { + listeners.add(listener); + } + + public boolean isMoreRecentThan(RevisionVector revisions) { + RevisionVector localRootRevisions = rootRevisions; + if (localRootRevisions == null) { + return false; + } else { + return isGreaterOrEquals(localRootRevisions, revisions); + } + } + + public long getLag() { + long localTS = secondariesSafeTimestamp; + if (localTS == 0) { + return maxReplicationLagMillis; + } else { + return clock.getTime() - localTS; + } + } + + @Nullable + public RevisionVector getMinimumRootRevisions() { + return rootRevisions; + } + + public void stop() { + synchronized (stopMonitor) { + stop = true; + stopMonitor.notify(); + executors.shutdown(); + } + } + + @Override + public void run() { + try { + updateLoop(); + } catch (Exception e) { + LOG.error("Exception in the ReplicaSetInfo thread", e); + } + } + + private 
void updateLoop() { + while (!stop) { + if (hiddenMembers == null) { + hiddenMembers = getHiddenMembers(); + } else { + updateReplicaStatus(); + + for (ReplicaSetInfoListener listener : listeners) { + listener.gotRootRevisions(rootRevisions); + } + } + + synchronized (stopMonitor) { + try { + if (!stop) { + stopMonitor.wait(pullFrequencyMillis); + } + } catch (InterruptedException e) { + break; + } + } + } + LOG.debug("Stopping the replica set info"); + nodeCollections.close(); + } + + void updateReplicaStatus() { + BasicDBObject result; + try { + result = getReplicaStatus(); + } catch (MongoException e) { + LOG.error("Can't get replica status", e); + rootRevisions = null; + secondariesSafeTimestamp = 0; + return; + } + + @SuppressWarnings("unchecked") + Iterable members = (Iterable) result.get("members"); + if (members == null) { + members = Collections.emptyList(); + } + updateRevisions(members); + } + + List getHiddenMembers() { + BasicDBObject result; + try { + result = getReplicaConfig(); + } catch (MongoException e) { + LOG.error("Can't get replica configuration", e); + return null; + } + + @SuppressWarnings("unchecked") + Iterable members = (Iterable) result.get("members"); + if (members == null) { + members = Collections.emptyList(); + } + + List hiddenMembers = new ArrayList(); + for (BasicBSONObject member : members) { + if (member.getBoolean("hidden")) { + hiddenMembers.add(member.getString("host")); + } + } + return hiddenMembers; + } + + protected BasicDBObject getReplicaConfig() { + return adminDb.command("replSetGetConfig", ReadPreference.primary()); + } + + protected BasicDBObject getReplicaStatus() { + return adminDb.command("replSetGetStatus", ReadPreference.primary()); + } + + private void updateRevisions(Iterable members) { + Set secondaries = new HashSet(); + boolean unknownState = false; + String primary = null; + + for (BasicBSONObject member : members) { + MemberState state; + try { + state = 
MemberState.valueOf(member.getString("stateStr")); + } catch (IllegalArgumentException e) { + state = MemberState.UNKNOWN; + } + String name = member.getString("name"); + if (hiddenMembers.contains(name)) { + continue; + } + + switch (state) { + case PRIMARY: + primary = name; + continue; + + case SECONDARY: + secondaries.add(name); + break; + + case ARBITER: + continue; + + default: + LOG.debug("Invalid state {} for instance {}", state, name); + unknownState = true; + break; + } + } + + if (secondaries.isEmpty()) { + LOG.debug("No secondaries found: {}", members); + unknownState = true; + } + + if (primary == null) { + LOG.debug("No primary found: {}", members); + unknownState = true; + } + + Map> vectors = null; + if (!unknownState) { + vectors = getRootRevisions(union(secondaries, of(primary))); + if (vectors.containsValue(null)) { + unknownState = true; + } + } + + if (unknownState) { + rootRevisions = null; + secondariesSafeTimestamp = 0; + } else { + Timestamped primaryRevision = vectors.get(primary); + Iterable> secondaryRevisions = filterKeys(vectors, in(secondaries)).values(); + + rootRevisions = pmin(transform(secondaryRevisions, Timestamped.getExtractFunction())); + if (rootRevisions == null || primaryRevision == null || isEmpty(secondaryRevisions)) { + secondariesSafeTimestamp = 0; + } else { + secondariesSafeTimestamp = getSecondariesSafeTimestamp(primaryRevision, secondaryRevisions); + } + } + + LOG.debug("Minimum root revisions: {}. Current lag: {}", rootRevisions, getLag()); + nodeCollections.retain(secondaries); + } + + /** + * Find the oldest revision which hasn't been replicated from primary to + * secondary yet and return its timestamp. If all revisions has been already + * replicated, return the date of the measurement. 
+ * + * @return the point in time to which the secondary instances has been synchronized + */ + private long getSecondariesSafeTimestamp(Timestamped primary, Iterable> secondaries) { + final RevisionVector priRev = primary.getValue(); + Long oldestNotReplicated = null; + for (Timestamped v : secondaries) { + RevisionVector secRev = v.getValue(); + if (secRev.equals(priRev)) { + continue; + } + + for (Revision pr : priRev) { + Revision sr = secRev.getRevision(pr.getClusterId()); + if (pr.equals(sr)) { + continue; + } + if (oldestNotReplicated == null || oldestNotReplicated > pr.getTimestamp()) { + oldestNotReplicated = pr.getTimestamp(); + } + } + } + + if (oldestNotReplicated == null) { + long minOpTimestamp = primary.getOperationTimestamp(); + for (Timestamped v : secondaries) { + if (v.getOperationTimestamp() < minOpTimestamp) { + minOpTimestamp = v.getOperationTimestamp(); + } + } + return minOpTimestamp; + } else { + return oldestNotReplicated; + } + } + + protected Map> getRootRevisions(Iterable hosts) { + Map>> futures = new HashMap>>(); + for (final String hostName : hosts) { + futures.put(hostName, executors.submit(new GetRootRevisionsCallable(clock, hostName, nodeCollections))); + } + + Map> result = new HashMap>(); + for (Entry>> entry : futures.entrySet()) { + try { + result.put(entry.getKey(), entry.getValue().get()); + } catch (Exception e) { + LOG.error("Can't connect to the Mongo instance", e); + } + } + return result; + } + + private static RevisionVector pmin(Iterable vectors) { + RevisionVector minimum = null; + for (RevisionVector v : vectors) { + if (v == null) { + return null; + } else if (minimum == null) { + minimum = v; + } else { + minimum = minimum.pmin(v); + } + } + return minimum; + } + + enum MemberState { + STARTUP, PRIMARY, SECONDARY, RECOVERING, STARTUP2, UNKNOWN, ARBITER, DOWN, ROLLBACK, REMOVED + } +} diff --git a/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/mongo/replica/ReplicaSetInfoListener.java 
b/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/mongo/replica/ReplicaSetInfoListener.java new file mode 100644 index 0000000..98764a6 --- /dev/null +++ b/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/mongo/replica/ReplicaSetInfoListener.java @@ -0,0 +1,29 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.jackrabbit.oak.plugins.document.mongo.replica; + +import org.apache.jackrabbit.oak.plugins.document.RevisionVector; + +/** + * Classes implementing this interface will be informed about the current root + * revision states on secondary instances by {@link ReplicaSetInfo}. 
+ */ +public interface ReplicaSetInfoListener { + + void gotRootRevisions(RevisionVector rootRevision); + +} \ No newline at end of file diff --git a/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/mongo/replica/Timestamped.java b/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/mongo/replica/Timestamped.java new file mode 100644 index 0000000..8153829 --- /dev/null +++ b/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/mongo/replica/Timestamped.java @@ -0,0 +1,63 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.jackrabbit.oak.plugins.document.mongo.replica; + +import com.google.common.base.Function; + +/** + * A value with a timestamp. 
+ * + * @param the value type + */ +public class Timestamped { + + private final T value; + + private final long operationTimestamp; + + public Timestamped(T value, long operationTimestamp) { + this.value = value; + this.operationTimestamp = operationTimestamp; + } + + public T getValue() { + return value; + } + + public long getOperationTimestamp() { + return operationTimestamp; + } + + public static Function, T> getExtractFunction() { + return new Function, T>() { + @Override + public T apply(Timestamped input) { + if (input == null) { + return null; + } else { + return input.value; + } + } + }; + } + + @Override + public String toString() { + return new StringBuilder("Timestamped[").append(value).append('(').append(operationTimestamp).append(")]") + .toString(); + } +} \ No newline at end of file diff --git a/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/util/Utils.java b/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/util/Utils.java index 67fa3c1..250e1cb 100644 --- a/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/util/Utils.java +++ b/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/util/Utils.java @@ -759,6 +759,21 @@ public class Utils { } /** + * Returns true if all the revisions in the {@code a} greater or equals + * to their counterparts in {@code b}. If {@code b} contains revisions + * for cluster nodes that are not present in {@code a}, return false. + * + * @param a + * @param b + * @return true if all the revisions in the {@code a} are at least + * as recent as their counterparts in the {@code b} + */ + public static boolean isGreaterOrEquals(@Nonnull RevisionVector a, + @Nonnull RevisionVector b) { + return a.pmax(b).equals(a); + } + + /** * Wraps the given iterable and aborts iteration over elements when the * predicate on an element evaluates to {@code false}. 
* diff --git a/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/mongo/ReadPreferenceIT.java b/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/mongo/ReadPreferenceIT.java index 7841107..65af0f8 100644 --- a/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/mongo/ReadPreferenceIT.java +++ b/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/mongo/ReadPreferenceIT.java @@ -54,7 +54,6 @@ public class ReadPreferenceIT extends AbstractMongoConnectionTest { replicationLag = TimeUnit.SECONDS.toMillis(10); mongoConnection = connectionFactory.getConnection(); mk = new DocumentMK.Builder() - .setMaxReplicationLag(replicationLag, TimeUnit.MILLISECONDS) .setMongoDB(mongoConnection.getDB()) .setClusterId(1) .setLeaseCheck(false) @@ -83,27 +82,27 @@ public class ReadPreferenceIT extends AbstractMongoConnectionTest { @Test public void testMongoReadPreferencesDefault() throws Exception{ assertEquals(ReadPreference.primary(), - mongoDS.getMongoReadPreference(NODES,"foo", DocumentReadPreference.PRIMARY)); + mongoDS.getMongoReadPreference(NODES,"foo", null, DocumentReadPreference.PRIMARY)); assertEquals(ReadPreference.primaryPreferred(), - mongoDS.getMongoReadPreference(NODES,"foo", DocumentReadPreference.PREFER_PRIMARY)); + mongoDS.getMongoReadPreference(NODES,"foo", null, DocumentReadPreference.PREFER_PRIMARY)); //By default Mongo read preference is primary assertEquals(ReadPreference.primary(), - mongoDS.getMongoReadPreference(NODES,"foo", DocumentReadPreference.PREFER_SECONDARY)); + mongoDS.getMongoReadPreference(NODES,"foo", null, DocumentReadPreference.PREFER_SECONDARY)); //Change the default and assert again mongoDS.getDBCollection(NODES).getDB().setReadPreference(ReadPreference.secondary()); assertEquals(ReadPreference.secondary(), - mongoDS.getMongoReadPreference(NODES,"foo", DocumentReadPreference.PREFER_SECONDARY)); + mongoDS.getMongoReadPreference(NODES,"foo", null, DocumentReadPreference.PREFER_SECONDARY)); 
//for case where parent age cannot be determined the preference should be primary assertEquals(ReadPreference.primary(), - mongoDS.getMongoReadPreference(NODES,"foo", DocumentReadPreference.PREFER_SECONDARY_IF_OLD_ENOUGH)); + mongoDS.getMongoReadPreference(NODES,"foo", null, DocumentReadPreference.PREFER_SECONDARY_IF_OLD_ENOUGH)); //For collection other than NODES always primary assertEquals(ReadPreference.primary(), - mongoDS.getMongoReadPreference(SETTINGS,"foo", DocumentReadPreference.PREFER_SECONDARY_IF_OLD_ENOUGH)); + mongoDS.getMongoReadPreference(SETTINGS,"foo", null, DocumentReadPreference.PREFER_SECONDARY_IF_OLD_ENOUGH)); } @@ -124,7 +123,7 @@ public class ReadPreferenceIT extends AbstractMongoConnectionTest { //For modifiedTime < replicationLag primary must be used assertEquals(ReadPreference.primary(), - mongoDS.getMongoReadPreference(NODES,parentId, DocumentReadPreference.PREFER_SECONDARY_IF_OLD_ENOUGH)); + mongoDS.getMongoReadPreference(NODES,parentId, null, DocumentReadPreference.PREFER_SECONDARY_IF_OLD_ENOUGH)); //Going into future to make parent /x old enough clock.waitUntil(Revision.getCurrentTimestamp() + replicationLag); @@ -132,7 +131,7 @@ public class ReadPreferenceIT extends AbstractMongoConnectionTest { //For old modified nodes secondaries should be preferred assertEquals(testPref, - mongoDS.getMongoReadPreference(NODES, parentId, DocumentReadPreference.PREFER_SECONDARY_IF_OLD_ENOUGH)); + mongoDS.getMongoReadPreference(NODES, parentId, null, DocumentReadPreference.PREFER_SECONDARY_IF_OLD_ENOUGH)); } @Test diff --git a/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/mongo/replica/LocalChangesTest.java b/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/mongo/replica/LocalChangesTest.java new file mode 100644 index 0000000..6068944 --- /dev/null +++ b/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/mongo/replica/LocalChangesTest.java @@ -0,0 +1,129 @@ +/* + * Licensed to the Apache Software 
Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.jackrabbit.oak.plugins.document.mongo.replica; + +import static java.util.Collections.singleton; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; + +import java.util.ArrayList; +import java.util.Collection; +import java.util.List; + +import org.apache.jackrabbit.oak.plugins.document.Revision; +import org.apache.jackrabbit.oak.plugins.document.RevisionVector; +import org.junit.Test; + +public class LocalChangesTest { + + @Test + public void testReplicaInactive() { + LocalChanges l = new LocalChanges(); + assertTrue(l.mayContain("2:/xyz/123")); + assertTrue(l.mayContainChildrenOf("2:/xyz/123")); + + l.add("2:/xyz/123", revs(2, 2, 2)); // don't remember the path, only the (2,2,2) revision + l.gotRootRevisions(revsV(1, 1, 1)); + assertTrue(l.mayContain("2:/xyz/123")); + assertTrue(l.mayContain("2:/abc/567")); // we only remembered the timestamp, not the path + + l.gotRootRevisions(revsV(3, 3, 3)); // the new revision >= the remembered (2,2,2) + assertFalse(l.mayContain("2:/xyz/123")); + assertFalse(l.mayContain("2:/abc/567")); + } + + @Test + public void testMayContain() { + LocalChanges l = new 
LocalChanges(); + l.add("2:/xyz/123", revs(2, 2, 2)); + + l.gotRootRevisions(revsV(1, 1, 1)); + assertTrue(l.mayContain("2:/xyz/123")); + assertTrue(l.mayContainChildrenOf("1:/xyz")); + + l.gotRootRevisions(revsV(2, 2, 2)); + assertFalse(l.mayContain("2:/xyz/123")); + assertFalse(l.mayContainChildrenOf("1:/xyz")); + } + + @Test + public void testGotRootRevisions() { + LocalChanges l = new LocalChanges(); + l.add("2:/xyz/123", revs(2, 3, 4)); + + l.gotRootRevisions(revsV(1, 1, 1)); + assertTrue(l.mayContain("2:/xyz/123")); + + l.gotRootRevisions(revsV(2, 2, 2)); + assertTrue(l.mayContain("2:/xyz/123")); + + l.gotRootRevisions(revsV(2, 3, 3)); + assertTrue(l.mayContain("2:/xyz/123")); + + l.gotRootRevisions(revsV(2, 3, 4)); + assertFalse(l.mayContain("2:/xyz/123")); + } + + @Test + public void testLimit() { + LocalChanges l = new LocalChanges(); + l.gotRootRevisions(revsV(1)); // make the class active + + for (int i = 1; i <= 99; i++) { + l.add("2:/xyz/" + i, revs(i + 100)); + assertTrue(l.mayContain("2:/xyz/" + i)); + assertFalse(l.mayContain("2:/abc/" + i)); + } + l.add("2:/xyz/100", revs(200)); // the list should be cleared right now + l.add("2:/abc/123", revs(300)); // this is added to the new list + l.add("2:/abc/456", revs(100)); // this shouldn't be added to the new list (as it's old) + + // now the list should be cleared and we should got true for all documents + assertTrue(l.mayContain("2:/abc/999")); + assertEquals(singleton("2:/abc/123"), l.localChanges.keySet()); + + l.gotRootRevisions(revsV(200)); // invalidate + assertFalse(l.mayContain("2:/xyz/0")); + assertFalse(l.mayContain("2:/xyz/99")); + + assertTrue(l.mayContain("2:/abc/123")); + assertFalse(l.mayContain("2:/abc/456")); + } + + @Test + public void dontAddOldRevisions() { + LocalChanges l = new LocalChanges(); + l.gotRootRevisions(revsV(10)); + l.add("2:/xyz/1", revs(5)); + assertFalse(l.mayContain("2:/xyz/1")); + } + + private Collection revs(int... 
timestamps) { + List revs = new ArrayList(); + for (int i = 0; i < timestamps.length; i++) { + revs.add(new Revision(timestamps[i], 0, i, false)); + } + return revs; + } + + private RevisionVector revsV(int... timestamps) { + return new RevisionVector(revs(timestamps)); + } +} diff --git a/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/mongo/replica/ReplicaSetInfoTest.java b/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/mongo/replica/ReplicaSetInfoTest.java new file mode 100644 index 0000000..c5ab6f6 --- /dev/null +++ b/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/mongo/replica/ReplicaSetInfoTest.java @@ -0,0 +1,231 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.jackrabbit.oak.plugins.document.mongo.replica; + +import static com.google.common.collect.Maps.transformValues; +import static org.apache.jackrabbit.oak.plugins.document.mongo.replica.ReplicaSetInfo.MemberState.PRIMARY; +import static org.apache.jackrabbit.oak.plugins.document.mongo.replica.ReplicaSetInfo.MemberState.RECOVERING; +import static org.apache.jackrabbit.oak.plugins.document.mongo.replica.ReplicaSetInfo.MemberState.SECONDARY; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertNull; +import static org.junit.Assert.assertTrue; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +import org.apache.jackrabbit.oak.plugins.document.Revision; +import org.apache.jackrabbit.oak.plugins.document.RevisionVector; +import org.apache.jackrabbit.oak.plugins.document.mongo.replica.ReplicaSetInfo.MemberState; +import org.apache.jackrabbit.oak.stats.Clock; +import org.bson.BasicBSONObject; +import org.junit.Before; +import org.junit.Test; +import org.mockito.Mockito; + +import com.google.common.base.Function; +import com.mongodb.BasicDBObject; +import com.mongodb.DB; + +public class ReplicaSetInfoTest { + + private ReplicaSetInfo replica; + + private ReplicationSetStatusMock replicationSet; + + private Clock.Virtual clock; + + private Clock.Virtual mongoClock; + + @Before + public void resetEstimator() { + clock = mongoClock = new Clock.Virtual(); + + DB db = mock(DB.class); + when(db.getName()).thenReturn("oak-db"); + when(db.getSisterDB(Mockito.anyString())).thenReturn(db); + replica = new ReplicaSetInfo(clock, db, null, 0l, 0l) { + @Override + protected BasicDBObject getReplicaStatus() { + BasicDBObject obj = new BasicDBObject(); + obj.put("date", mongoClock.getDate()); + obj.put("members", 
replicationSet.members); + return obj; + } + + @Override + protected Map> getRootRevisions(Iterable hosts) { + return transformValues(replicationSet.memberRevisions, + new Function>() { + @Override + public Timestamped apply(RevisionBuilder input) { + return new Timestamped(input.revs, clock.getTime()); + } + }); + } + }; + replica.hiddenMembers = Collections.emptyList(); + } + + @Test + public void testMinimumRevision() { + addInstance(PRIMARY, "mp").addRevisions(20, 18, 19); + addInstance(SECONDARY, "m1").addRevisions(20, 18, 3); + addInstance(SECONDARY, "m2").addRevisions(20, 1, 17); + updateRevisions(); + + assertEquals(20, replica.getMinimumRootRevisions().getRevision(0).getTimestamp()); + assertEquals( 1, replica.getMinimumRootRevisions().getRevision(1).getTimestamp()); + assertEquals( 3, replica.getMinimumRootRevisions().getRevision(2).getTimestamp()); + } + + @Test + public void testIsMoreRecentThan() { + addInstance(PRIMARY, "mp").addRevisions(15, 21, 22); + addInstance(SECONDARY, "m1").addRevisions(10, 21, 11); + addInstance(SECONDARY, "m2").addRevisions(15, 14, 13); + addInstance(SECONDARY, "m3").addRevisions(14, 13, 22); + updateRevisions(); + + assertTrue(replica.isMoreRecentThan(lastRev(9, 13, 10))); + assertFalse(replica.isMoreRecentThan(lastRev(11, 14, 10))); + } + + @Test + public void testUnknownStateIsNotSafe() { + addInstance(PRIMARY, "mp"); + addInstance(SECONDARY, "m1").addRevisions(10, 21, 11); + addInstance(RECOVERING, "m2"); + updateRevisions(); + + assertNull(replica.getMinimumRootRevisions()); + assertFalse(replica.isMoreRecentThan(lastRev(1, 1, 1))); + } + + @Test + public void testEmptyIsNotSafe() { + addInstance(PRIMARY, "m1"); + updateRevisions(); + + assertNull(replica.getMinimumRootRevisions()); + assertFalse(replica.isMoreRecentThan(lastRev(1, 1, 1))); + } + + @Test + public void testOldestNotReplicated() { + addInstance(PRIMARY, "mp").addRevisions(10, 30, 30); + addInstance(SECONDARY, "m1").addRevisions(10, 5, 30); + 
addInstance(SECONDARY, "m2").addRevisions(2, 30, 30); + updateRevisions(); + + assertEquals(10, replica.secondariesSafeTimestamp); + } + + @Test + public void testAllSecondariesUpToDate() { + addInstance(PRIMARY, "mp").addRevisions(10, 30, 30); + addInstance(SECONDARY, "m1").addRevisions(10, 30, 30); + addInstance(SECONDARY, "m2").addRevisions(10, 30, 30); + + long before = clock.getTime(); + updateRevisions(); + long after = clock.getTime(); + + assertBetween(before, after, replica.secondariesSafeTimestamp); + } + + @Test + public void testAllSecondariesUpToDateWithTimediff() { + addInstance(PRIMARY, "mp").addRevisions(10, 30, 30); + addInstance(SECONDARY, "m1").addRevisions(10, 30, 30); + addInstance(SECONDARY, "m2").addRevisions(10, 30, 30); + + mongoClock = new Clock.Virtual(); + mongoClock.waitUntil(100); + + long before = clock.getTime(); + updateRevisions(); + long after = clock.getTime(); + + assertBetween(before, after, replica.secondariesSafeTimestamp); + } + + private RevisionBuilder addInstance(MemberState state, String name) { + if (replicationSet == null) { + replicationSet = new ReplicationSetStatusMock(); + } + return replicationSet.addInstance(state, name); + } + + private void updateRevisions() { + replica.updateReplicaStatus(); + replicationSet = null; + } + + private static RevisionVector lastRev(int... 
timestamps) { + return new RevisionBuilder().addRevisions(timestamps).revs; + } + + private static void assertBetween(long from, long to, long actual) { + final String msg = String.format("%d <= %d <= %d", from, actual, to); + assertTrue(msg, from <= actual); + assertTrue(msg, actual <= to); + } + + private class ReplicationSetStatusMock { + + private List members = new ArrayList(); + + private Map memberRevisions = new HashMap(); + + private RevisionBuilder addInstance(MemberState state, String name) { + BasicBSONObject member = new BasicBSONObject(); + member.put("stateStr", state.name()); + member.put("name", name); + members.add(member); + + RevisionBuilder builder = new RevisionBuilder(); + memberRevisions.put(name, builder); + return builder; + } + } + + private static class RevisionBuilder { + + private RevisionVector revs = new RevisionVector(); + + private RevisionBuilder addRevisions(int... timestamps) { + for (int i = 0; i < timestamps.length; i++) { + addRevision(timestamps[i], 0, i, false); + } + return this; + } + + private RevisionBuilder addRevision(int timestamp, int counter, int clusterId, boolean branch) { + Revision rev = new Revision(timestamp, counter, clusterId, branch); + revs = revs.update(rev); + return this; + } + } +} \ No newline at end of file