diff --git a/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/Branch.java b/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/Branch.java
index 6e7e940..3f2b58d 100644
--- a/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/Branch.java
+++ b/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/Branch.java
@@ -35,10 +35,13 @@ import javax.annotation.CheckForNull;
 import javax.annotation.Nonnull;
 import javax.annotation.Nullable;
 
+import com.google.common.base.Charsets;
 import com.google.common.base.Function;
 import com.google.common.base.Predicate;
 import com.google.common.collect.Iterables;
 import com.google.common.collect.Sets;
+import com.google.common.hash.BloomFilter;
+import com.google.common.hash.Funnels;
 
 /**
  * Contains commit information about a branch and its base revision.
@@ -61,6 +64,11 @@ class Branch {
     private final BranchReference ref;
 
     /**
+     * Bloom filter containing paths affected by revisions in this branch.
+     */
+    private final BloomFilter<CharSequence> affectedPaths;
+
+    /**
      * Create a new branch instance with an initial set of commits and a given
      * base revision. The life time of this branch can be controlled with
      * the {@code guard} parameter. Once the {@code guard} object becomes weakly
@@ -91,6 +99,7 @@ class Branch {
         } else {
             this.ref = null;
         }
+        this.affectedPaths = BloomFilter.create(Funnels.stringFunnel(Charsets.UTF_8), 1000); // about 1 kB at the default 3% fpp
     }
 
     /**
@@ -259,6 +268,31 @@ class Branch {
         return checkNotNull(rev).equals(commits.lastKey());
     }
 
+    /**
+     * Records the given paths as affected by a commit on this branch.
+     *
+     * @param paths the paths to add to the Bloom filter of this branch
+     */
+    void addAffectedPaths(Iterable<String> paths) {
+        for (String path : paths) {
+            affectedPaths.put(path);
+        }
+    }
+
+    /**
+     * Checks whether the given path may have been affected by this branch.
+     * The check is backed by a Bloom filter: false positives are possible,
+     * false negatives are not.
+     *
+     * @param path the path to check
+     * @return {@code false} if the path definitely wasn't passed to
+     *         {@link #addAffectedPaths(Iterable)}, {@code true} if it
+     *         might have been
+     */
+    boolean mightBeAffectedPath(String path) {
+        return affectedPaths.mightContain(path);
+    }
+
     /**
      * Returns the modified paths since the base revision of this branch until
      * the given branch revision {@code r} (inclusive).
diff --git a/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/Commit.java b/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/Commit.java index fc7cd5a..19e4fac 100644 --- a/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/Commit.java +++ b/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/Commit.java @@ -203,6 +203,9 @@ public class Commit { } else { b.addCommit(rev); } + b.addAffectedPaths(addedNodes); + b.addAffectedPaths(modifiedNodes); + b.addAffectedPaths(removedNodes); try { // prepare commit prepare(baseRev); diff --git a/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/DocumentMK.java b/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/DocumentMK.java index 63e6433..129fa75 100644 --- a/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/DocumentMK.java +++ b/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/DocumentMK.java @@ -488,6 +488,7 @@ public class DocumentMK { public static final int DEFAULT_CACHE_STACK_MOVE_DISTANCE = 16; private DocumentNodeStore nodeStore; private DocumentStore documentStore; + private String mongoSecondaryCredentials; private DiffCache diffCache; private BlobStore blobStore; private int clusterId = Integer.getInteger("oak.documentMK.clusterId", 0); @@ -581,6 +582,31 @@ public class DocumentMK { } /** + * Set credentials to the secondary instances in the Mongo replica set. + * + * @param credentials in the format: {@code username:password} + * @return this + * @throws IllegalStateException if the setMongoDB() has been already invoked. + */ + public Builder setMongoSecondaryCredentials(String credentials) { + if (documentStore != null) { + throw new IllegalStateException("This method must be invoked before setMongoDB()"); + } + this.mongoSecondaryCredentials = credentials; + return this; + } + + /** + * Returns the credentials for the secondary instances in the Mongo replica set. 
+ * + * @return Credentials in the format: {@code username:password} or null if credentials + * hasn't been set. + */ + public String getMongoSecondaryCredentials() { + return mongoSecondaryCredentials; + } + + /** * Sets a {@link DataSource} to use for the RDB document and blob * stores. * diff --git a/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/DocumentNodeStore.java b/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/DocumentNodeStore.java index 3192f88..bd281a6 100644 --- a/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/DocumentNodeStore.java +++ b/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/DocumentNodeStore.java @@ -459,6 +459,9 @@ public final class DocumentNodeStore } }; + if (store instanceof UnmergedBranchesAware) { + ((UnmergedBranchesAware) store).setUnmergedBranches(branches); + } //TODO Make stats collection configurable as it add slight overhead nodeCache = builder.buildNodeCache(this); diff --git a/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/DocumentNodeStoreService.java b/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/DocumentNodeStoreService.java index 3169381..8ed583f 100644 --- a/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/DocumentNodeStoreService.java +++ b/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/DocumentNodeStoreService.java @@ -219,6 +219,11 @@ public class DocumentNodeStoreService { ) private static final String PROP_JOURNAL_GC_MAX_AGE_MILLIS = "journalGCMaxAge"; + @Property(label = "Secondary Mongo instance credentials", + description = "Credentials to the secondary instances in the Mongo replica set" + ) + private static final String PROP_SECONDARY_CREDENTIALS = "secondaryCredentials"; + private static final long MB = 1024 * 1024; private static enum DocumentStoreType { @@ -426,6 +431,8 @@ public class DocumentNodeStoreService { log.info("Mongo Connection details {}", 
MongoConnection.toString(mongoURI.getOptions())); } + String mongoSecondaryCredentials = PropertiesUtil.toString(prop(PROP_SECONDARY_CREDENTIALS), null); + mkBuilder.setMongoSecondaryCredentials(mongoSecondaryCredentials); mkBuilder.setMaxReplicationLag(maxReplicationLagInSecs, TimeUnit.SECONDS); mkBuilder.setMongoDB(uri, db, blobCacheSize); diff --git a/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/Revision.java b/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/Revision.java index 86eefcf..aa3dfdf 100644 --- a/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/Revision.java +++ b/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/Revision.java @@ -439,7 +439,7 @@ public class Revision { */ private final int currentClusterNodeId; - RevisionComparator(int currentClusterNodId) { + public RevisionComparator(int currentClusterNodId) { this.currentClusterNodeId = currentClusterNodId; } diff --git a/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/UnmergedBranches.java b/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/UnmergedBranches.java index 1464b59..27fff30 100644 --- a/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/UnmergedBranches.java +++ b/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/UnmergedBranches.java @@ -41,7 +41,7 @@ import static com.google.common.base.Preconditions.checkNotNull; * UnmergedBranches contains all un-merged branches of a DocumentMK * instance. */ -class UnmergedBranches { +public class UnmergedBranches { private final Logger log = LoggerFactory.getLogger(getClass()); @@ -191,4 +191,19 @@ class UnmergedBranches { } return null; } + + /** + * Checks if there's a chance that this branch revisions affect the path. 
+ * + * @param path + * @return {@code true} if this branch may affect the given path, {@code false} otherwise + */ + public boolean mightAffectPath(String path) { + for (Branch branch : branches) { + if (branch.mightBeAffectedPath(path)) { + return true; + } + } + return false; + } } diff --git a/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/UnmergedBranchesAware.java b/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/UnmergedBranchesAware.java new file mode 100644 index 0000000..10cb821 --- /dev/null +++ b/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/UnmergedBranchesAware.java @@ -0,0 +1,23 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.jackrabbit.oak.plugins.document; + +public interface UnmergedBranchesAware extends DocumentStore { + + void setUnmergedBranches(UnmergedBranches unmergedBranches); + +} diff --git a/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/mongo/MongoCredentialProvider.java b/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/mongo/MongoCredentialProvider.java new file mode 100644 index 0000000..d96f77b --- /dev/null +++ b/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/mongo/MongoCredentialProvider.java @@ -0,0 +1,25 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.jackrabbit.oak.plugins.document.mongo; + +import com.mongodb.MongoCredential; + +public interface MongoCredentialProvider { + + MongoCredential getCredential(String hostName); + +} diff --git a/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/mongo/MongoDocumentStore.java b/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/mongo/MongoDocumentStore.java index e6cf30a..8a0fa6e 100644 --- a/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/mongo/MongoDocumentStore.java +++ b/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/mongo/MongoDocumentStore.java @@ -57,6 +57,8 @@ import org.apache.jackrabbit.oak.plugins.document.JournalEntry; import org.apache.jackrabbit.oak.plugins.document.NodeDocument; import org.apache.jackrabbit.oak.plugins.document.Revision; import org.apache.jackrabbit.oak.plugins.document.StableRevisionComparator; +import org.apache.jackrabbit.oak.plugins.document.UnmergedBranches; +import org.apache.jackrabbit.oak.plugins.document.UnmergedBranchesAware; import org.apache.jackrabbit.oak.plugins.document.UpdateOp; import org.apache.jackrabbit.oak.plugins.document.UpdateOp.Condition; import org.apache.jackrabbit.oak.plugins.document.UpdateOp.Key; @@ -87,7 +89,7 @@ import static com.google.common.base.Preconditions.checkArgument; /** * A document store that uses MongoDB as the backend. 
*/ -public class MongoDocumentStore implements DocumentStore { +public class MongoDocumentStore implements DocumentStore, UnmergedBranchesAware { private static final Logger LOG = LoggerFactory.getLogger(MongoDocumentStore.class); private static final PerfLogger PERFLOG = new PerfLogger( @@ -118,7 +120,9 @@ public class MongoDocumentStore implements DocumentStore { private Clock clock = Clock.SIMPLE; - private final long maxReplicationLagMillis; + private UnmergedBranches unmergedBranches; + + private final ReplicaSetInfo replicaInfo; /** * Duration in seconds under which queries would use index on _modified field @@ -159,6 +163,15 @@ public class MongoDocumentStore implements DocumentStore { private long maxLockedQueryTimeMS = Long.getLong("oak.mongo.maxLockedQueryTimeMS", TimeUnit.SECONDS.toMillis(3)); + /** + * How often in milliseconds the MongoDocumentStore should estimate the + * replication lag. + *

+ * Default is 60'000 (one minute). + */ + private long estimationPullFrequencyMS = + Long.getLong("oak.mongo.estimationPullFrequencyMS", TimeUnit.SECONDS.toMillis(60)); + private String lastReadWriteMode; private final Map metadata; @@ -176,7 +189,9 @@ public class MongoDocumentStore implements DocumentStore { settings = db.getCollection(Collection.SETTINGS.toString()); journal = db.getCollection(Collection.JOURNAL.toString()); - maxReplicationLagMillis = builder.getMaxReplicationLagMillis(); + long maxReplicationLagMillis = builder.getMaxReplicationLagMillis(); + replicaInfo = new ReplicaSetInfo(db, builder.getMongoSecondaryCredentials(), builder.getClusterId(), maxReplicationLagMillis, estimationPullFrequencyMS); + new Thread(replicaInfo, "MongoDocumentStore replica set info provider (" + builder.getClusterId() + ")").start(); // indexes: // the _id field is the primary key, so we don't need to define it @@ -453,7 +468,7 @@ public class MongoDocumentStore implements DocumentStore { final long start = PERFLOG.start(); boolean isSlaveOk = false; try { - ReadPreference readPreference = getMongoReadPreference(collection, Utils.getParentId(key), docReadPref); + ReadPreference readPreference = getMongoReadPreference(collection, key, Utils.getParentId(key), docReadPref); if(readPreference.isSlaveOk()){ LOG.trace("Routing call to secondary for fetching [{}]", key); @@ -579,7 +594,7 @@ public class MongoDocumentStore implements DocumentStore { cursor.maxTime(maxQueryTime, TimeUnit.MILLISECONDS); } ReadPreference readPreference = - getMongoReadPreference(collection, parentId, getDefaultReadPreference(collection)); + getMongoReadPreference(collection, null, parentId, getDefaultReadPreference(collection)); if(readPreference.isSlaveOk()){ LOG.trace("Routing call to secondary for fetching children from [{}] to [{}]", fromKey, toKey); @@ -924,7 +939,7 @@ public class MongoDocumentStore implements DocumentStore { } DocumentReadPreference getReadPreference(int maxCacheAge){ - 
if(maxCacheAge >= 0 && maxCacheAge < maxReplicationLagMillis) { + if(maxCacheAge >= 0 && !replicaInfo.isSecondarySafe(maxCacheAge, getTime())) { return DocumentReadPreference.PRIMARY; } else if(maxCacheAge == Integer.MAX_VALUE){ return DocumentReadPreference.PREFER_SECONDARY; @@ -938,6 +953,7 @@ public class MongoDocumentStore implements DocumentStore { } ReadPreference getMongoReadPreference(Collection collection, + String documentId, String parentId, DocumentReadPreference preference) { switch(preference){ @@ -955,17 +971,9 @@ public class MongoDocumentStore implements DocumentStore { // read from primary unless parent has not been modified // within replication lag period ReadPreference readPreference = ReadPreference.primary(); - if (parentId != null) { - long replicationSafeLimit = getTime() - maxReplicationLagMillis; + if (!belongsToBranch(documentId) && parentId != null) { NodeDocument cachedDoc = nodesCache.getIfPresent(parentId); - // FIXME: this is not quite accurate, because ancestors - // are updated in a background thread (_lastRev). We - // will need to revise this for low maxReplicationLagMillis - // values - if (cachedDoc != null && !cachedDoc.hasBeenModifiedSince(replicationSafeLimit)) { - - //If parent has been modified loooong time back then there children - //would also have not be modified. 
In that case we can read from secondary
+                    if (replicaInfo.isSecondarySafe(cachedDoc, getTime())) {
                         readPreference = getConfiguredReadPreference(collection);
                     }
                 }
@@ -1040,6 +1048,7 @@ public class MongoDocumentStore implements DocumentStore {
 
     @Override
     public void dispose() {
+        replicaInfo.stop();
         nodes.getDB().getMongo().close();
         try {
             nodesCache.close();
@@ -1264,6 +1273,25 @@ public class MongoDocumentStore implements DocumentStore {
         return diff;
     }
 
+    /**
+     * @param documentId the id of the document to check, may be {@code null}
+     * @return {@code true} if it's possible that this document belongs to a branch
+     */
+    boolean belongsToBranch(@CheckForNull String documentId) {
+        if (unmergedBranches == null) {
+            return true;
+        }
+        if (documentId == null) {
+            return false;
+        }
+        return unmergedBranches.mightAffectPath(Utils.getPathFromId(documentId));
+    }
+
+    @Override
+    public void setUnmergedBranches(UnmergedBranches unmergedBranches) {
+        this.unmergedBranches = unmergedBranches;
+    }
+
     private static class InvalidationResult implements CacheInvalidationStats {
         int invalidationCount;
         int upToDateCount;
diff --git a/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/mongo/ReplicaSetInfo.java b/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/mongo/ReplicaSetInfo.java
new file mode 100644
index 0000000..9bb52ed
--- /dev/null
+++ b/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/mongo/ReplicaSetInfo.java
@@ -0,0 +1,435 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.jackrabbit.oak.plugins.document.mongo;
+
+import static com.google.common.collect.Sets.difference;
+import static java.lang.Math.min;
+
+import java.net.UnknownHostException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Set;
+
+import javax.annotation.Nullable;
+
+import org.apache.jackrabbit.oak.plugins.document.Collection;
+import org.apache.jackrabbit.oak.plugins.document.Document;
+import org.apache.jackrabbit.oak.plugins.document.NodeDocument;
+import org.apache.jackrabbit.oak.plugins.document.Revision;
+import org.apache.jackrabbit.oak.plugins.document.Revision.RevisionComparator;
+import org.bson.BasicBSONObject;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.mongodb.BasicDBObject;
+import com.mongodb.CommandResult;
+import com.mongodb.DB;
+import com.mongodb.DBCollection;
+import com.mongodb.DBObject;
+import com.mongodb.MongoClient;
+import com.mongodb.MongoClientException;
+import com.mongodb.MongoClientURI;
+import com.mongodb.ReadPreference;
+
+/**
+ * Background task that estimates how far the secondaries of a MongoDB replica
+ * set lag behind. It polls {@code replSetGetStatus} and reads the
+ * {@code _lastRev} of the root document from every secondary.
+ * <p>
+ * The fields {@link #rootRevisions} and {@link #minRootTimestamp} are written
+ * while holding the monitor of this object and are reset to {@code null}
+ * ("unknown") whenever the state of the replica set can't be determined.
+ */
+public class ReplicaSetInfo implements Runnable {
+
+    private static final Logger LOG = LoggerFactory.getLogger(ReplicaSetInfo.class);
+
+    /** Node collections of the connected secondaries, keyed by host name. */
+    private final Map<String, DBCollection> collections = new HashMap<String, DBCollection>();
+
+    private final DB adminDb;
+
+    private final String dbName;
+
+    private final long pullFrequencyMillis;
+
+    private final long maxReplicationLagMillis;
+
+    private final RevisionComparator comparator;
+
+    /** Credentials prepended to the host name in the connection URI, may be {@code null}. */
+    private final String credentials;
+
+    /** Lowest root {@code _lastRev} per cluster id over all secondaries, {@code null} if unknown. */
+    Map<Integer, Revision> rootRevisions;
+
+    /** Lowest timestamp among {@link #rootRevisions}, {@code null} if unknown. */
+    Long minRootTimestamp;
+
+    private volatile boolean stop;
+
+    private final Object stopMonitor = new Object();
+
+    public ReplicaSetInfo(DB db, String credentials, int localClusterId, long maxReplicationLagMillis, long pullFrequencyMillis) {
+        this.adminDb = db.getSisterDB("admin");
+        this.dbName = db.getName();
+        this.pullFrequencyMillis = pullFrequencyMillis;
+        this.maxReplicationLagMillis = maxReplicationLagMillis;
+        this.comparator = new RevisionComparator(localClusterId);
+        this.credentials = credentials;
+    }
+
+    /**
+     * Checks whether a read that tolerates data as old as
+     * {@code maxDocumentAge} milliseconds may be served by a secondary.
+     *
+     * @param maxDocumentAge the tolerated age in milliseconds
+     * @param currentTime the current time in milliseconds
+     * @return {@code true} if the estimated lag (capped by the configured
+     *         maximum replication lag) doesn't exceed {@code maxDocumentAge}
+     */
+    public synchronized boolean isSecondarySafe(long maxDocumentAge, long currentTime) {
+        if (minRootTimestamp == null) {
+            // lag unknown: fall back to the configured maximum
+            return maxReplicationLagMillis <= maxDocumentAge;
+        }
+        if (currentTime < minRootTimestamp) {
+            return false;
+        }
+        return min(currentTime - minRootTimestamp, maxReplicationLagMillis) <= maxDocumentAge;
+    }
+
+    /**
+     * Checks whether the secondaries are safe to use, judging by the given cached document.
+     *
+     * @param doc the cached document, may be {@code null}
+     * @param currentTime the current time in milliseconds
+     * @return {@code true} if the secondaries are considered safe
+     */
+    public boolean isSecondarySafe(NodeDocument doc, long currentTime) {
+        if (doc == null) {
+            return false;
+        }
+        if (!doc.hasBeenModifiedSince(currentTime - maxReplicationLagMillis)) {
+            // If the parent was modified a long time ago, then its children
+            // haven't been modified either. In that case we can read from
+            // secondary.
+            return true;
+        }
+        return isSecondarySafe(doc.getLastRev());
+    }
+
+    /**
+     * Checks whether every revision in {@code lastRev} is already visible on
+     * all secondaries.
+     *
+     * @param lastRev the {@code _lastRev} revisions to check
+     * @return {@code false} if the state of the secondaries is unknown, or if
+     *         {@code lastRev} contains a branch revision or a revision newer
+     *         than the one recorded for its cluster id; {@code true} otherwise
+     */
+    boolean isSecondarySafe(Map<Integer, Revision> lastRev) {
+        Map<Integer, Revision> revisions;
+        synchronized (this) {
+            revisions = rootRevisions;
+        }
+        if (revisions == null) {
+            return false;
+        }
+        for (Revision r : lastRev.values()) {
+            if (r.isBranch()) {
+                return false;
+            }
+            Revision secondaryRev = revisions.get(r.getClusterId());
+            if (secondaryRev == null || comparator.compare(r, secondaryRev) > 0) {
+                return false;
+            }
+        }
+        return true;
+    }
+
+    /**
+     * @return the lowest root revision per cluster id seen on the secondaries,
+     *         or {@code null} if the state of the replica set is unknown
+     */
+    @Nullable
+    public synchronized Map<Integer, Revision> getMinimumRootRevisions() {
+        return rootRevisions;
+    }
+
+    /**
+     * Asks the polling loop in {@link #run()} to terminate and wakes it up if
+     * it is waiting for the next poll.
+     */
+    public void stop() {
+        synchronized (stopMonitor) {
+            stop = true;
+            stopMonitor.notify();
+        }
+    }
+
+    /**
+     * Polls the replica set status until {@link #stop()} is called or the
+     * thread is interrupted, then closes the connections to the secondaries.
+     */
+    @Override
+    public void run() {
+        try {
+            while (!stop) {
+                try {
+                    updateRevisions(getReplicaSetMembers());
+                } catch (RuntimeException e) {
+                    // A failed poll must not terminate the loop: mark the
+                    // state as unknown and try again after the usual delay.
+                    LOG.warn("Can't update the replica set info", e);
+                    resetRevisions();
+                }
+                synchronized (stopMonitor) {
+                    if (!stop) {
+                        stopMonitor.wait(pullFrequencyMillis);
+                    }
+                }
+            }
+        } catch (InterruptedException e) {
+            // keep the interrupt visible to the owner of the thread
+            Thread.currentThread().interrupt();
+        } finally {
+            LOG.debug("Stopping the replica set info");
+            closeConnections(collections.keySet());
+            collections.clear();
+        }
+    }
+
+    /**
+     * Runs {@code replSetGetStatus} with the primary read preference.
+     *
+     * @return the {@code members} of the command result, empty if absent
+     */
+    @SuppressWarnings("unchecked")
+    private Iterable<BasicBSONObject> getReplicaSetMembers() {
+        CommandResult result = adminDb.command("replSetGetStatus", ReadPreference.primary());
+        Iterable<BasicBSONObject> members = (Iterable<BasicBSONObject>) result.get("members");
+        if (members == null) {
+            return Collections.emptyList();
+        }
+        return members;
+    }
+
+    /**
+     * Recomputes the minimum root revisions from the given replica set
+     * members. The result is only published if every member is a primary,
+     * secondary or arbiter, at least one secondary exists and every
+     * secondary could be read; otherwise the state is reset to unknown.
+     *
+     * @param members the {@code members} entries of {@code replSetGetStatus}
+     */
+    void updateRevisions(Iterable<BasicBSONObject> members) {
+        Set<String> secondaries = new HashSet<String>();
+        boolean unknownState = false;
+        for (BasicBSONObject member : members) {
+            ReplicaSetMemberState state = ReplicaSetMemberState.parse(member.getString("stateStr"));
+            String name = member.getString("name");
+
+            switch (state) {
+            case PRIMARY:
+            case ARBITER:
+                // nothing to read from these members
+                break;
+
+            case SECONDARY:
+                secondaries.add(name);
+                break;
+
+            default:
+                LOG.debug("Invalid state {} for instance {}", state, name);
+                unknownState = true;
+                break;
+            }
+        }
+
+        if (secondaries.isEmpty()) {
+            LOG.debug("No secondaries found");
+            unknownState = true;
+        }
+
+        Map<Integer, Revision> minRevisions = unknownState ? null : getMinimumRootRevisions(secondaries);
+        if (minRevisions == null) {
+            resetRevisions();
+        } else {
+            Long minTimestamp = null;
+            for (Revision r : minRevisions.values()) {
+                long timestamp = r.getTimestamp();
+                if (minTimestamp == null || minTimestamp > timestamp) {
+                    minTimestamp = timestamp;
+                }
+            }
+            synchronized (this) {
+                rootRevisions = minRevisions;
+                minRootTimestamp = minTimestamp;
+            }
+            LOG.debug("Minimum revisions: {}", minRevisions);
+            LOG.debug("Minimum root timestamp: {}", minTimestamp);
+        }
+
+        closeConnections(difference(collections.keySet(), secondaries));
+    }
+
+    /** Marks the state of the secondaries as unknown. */
+    private synchronized void resetRevisions() {
+        rootRevisions = null;
+        minRootTimestamp = null;
+    }
+
+    /**
+     * Reads the root revisions of all given secondaries and keeps the lowest
+     * one for every cluster id.
+     *
+     * @param secondaries the host names of the secondaries
+     * @return the lowest revisions keyed by cluster id, or {@code null} if
+     *         any of the secondaries couldn't be read
+     */
+    @Nullable
+    private Map<Integer, Revision> getMinimumRootRevisions(Set<String> secondaries) {
+        Map<Integer, Revision> minimumRevisions = new HashMap<Integer, Revision>();
+        for (String name : secondaries) {
+            List<Revision> rootRevs;
+            try {
+                rootRevs = getRootRevisions(name);
+            } catch (UnknownHostException e) {
+                LOG.error("Can't connect to {}", name, e);
+                return null;
+            }
+            if (rootRevs == null) {
+                return null;
+            }
+            for (Revision r : rootRevs) {
+                int clusterId = r.getClusterId();
+                Revision current = minimumRevisions.get(clusterId);
+                if (current == null || comparator.compare(r, current) < 0) {
+                    minimumRevisions.put(clusterId, r);
+                }
+            }
+        }
+        return minimumRevisions;
+    }
+
+    /**
+     * Reads the {@code _lastRev} entries of the root document from the given
+     * replica set member.
+     *
+     * @param hostName the name of the member to read from
+     * @return the root revisions, or {@code null} if the member has no root
+     *         document or the document has no {@code _lastRev}
+     * @throws UnknownHostException if thrown while connecting to the member
+     */
+    @Nullable
+    protected List<Revision> getRootRevisions(String hostName) throws UnknownHostException {
+        DBCollection collection = getNodeCollection(hostName);
+        DBObject root = collection.findOne(new BasicDBObject(Document.ID, "0:/"));
+        DBObject lastRev = root == null ? null : (DBObject) root.get("_lastRev");
+        if (lastRev == null) {
+            LOG.debug("No /_lastRev found on {}", hostName);
+            return null;
+        }
+        List<Revision> revisions = new ArrayList<Revision>();
+        for (String clusterId : lastRev.keySet()) {
+            String rev = (String) lastRev.get(clusterId);
+            revisions.add(Revision.fromString(rev));
+        }
+        LOG.debug("Got /_lastRev from {}: {}", hostName, lastRev);
+        return revisions;
+    }
+
+    /**
+     * Closes and forgets the cached connections to the given hosts. A
+     * connection that fails to close stays in the cache.
+     *
+     * @param hostNames the names of the hosts to disconnect from
+     */
+    private void closeConnections(Set<String> hostNames) {
+        Iterator<Entry<String, DBCollection>> it = collections.entrySet().iterator();
+        while (it.hasNext()) {
+            Entry<String, DBCollection> entry = it.next();
+            if (hostNames.contains(entry.getKey())) {
+                try {
+                    entry.getValue().getDB().getMongo().close();
+                    it.remove();
+                } catch (MongoClientException e) {
+                    LOG.error("Can't close Mongo client", e);
+                }
+            }
+        }
+    }
+
+    /**
+     * Returns the node collection of the given replica set member, opening
+     * and caching a dedicated connection on first use.
+     *
+     * @param hostName the name of the member to connect to
+     * @return the node collection of database {@code dbName} on that member
+     * @throws UnknownHostException declared by the {@link MongoClient} constructor
+     */
+    @SuppressWarnings("deprecation")
+    private DBCollection getNodeCollection(String hostName) throws UnknownHostException {
+        DBCollection cached = collections.get(hostName);
+        if (cached != null) {
+            return cached;
+        }
+
+        StringBuilder uriBuilder = new StringBuilder("mongodb://");
+        if (credentials != null) {
+            uriBuilder.append(credentials).append('@');
+        }
+        uriBuilder.append(hostName);
+
+        MongoClientURI uri = new MongoClientURI(uriBuilder.toString());
+        MongoClient client = new MongoClient(uri);
+
+        DB db = client.getDB(dbName);
+        db.getMongo().slaveOk();
+        DBCollection collection = db.getCollection(Collection.NODES.toString());
+        collections.put(hostName, collection);
+        return collection;
+    }
+
+    enum ReplicaSetMemberState {
+        STARTUP, PRIMARY, SECONDARY, RECOVERING, STARTUP2, UNKNOWN, ARBITER, DOWN, ROLLBACK, REMOVED;
+
+        /**
+         * @param stateStr the {@code stateStr} of a replica set member, may be {@code null}
+         * @return the matching state, or {@link #UNKNOWN} if there is none
+         */
+        static ReplicaSetMemberState parse(String stateStr) {
+            if (stateStr == null) {
+                // valueOf(null) would throw a NullPointerException
+                return UNKNOWN;
+            }
+            try {
+                return valueOf(stateStr);
+            } catch (IllegalArgumentException e) {
+                return UNKNOWN;
+            }
+        }
+    }
+
+}
diff --git a/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/mongo/ReadPreferenceIT.java b/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/mongo/ReadPreferenceIT.java
index 7841107..fc1cae4 100644
--- a/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/mongo/ReadPreferenceIT.java
+++ b/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/mongo/ReadPreferenceIT.java
@@ -26,6 +26,10 @@ import com.mongodb.ReadPreference;
 import org.apache.jackrabbit.oak.plugins.document.AbstractMongoConnectionTest;
 import org.apache.jackrabbit.oak.plugins.document.DocumentMK;
 import org.apache.jackrabbit.oak.plugins.document.Revision;
+ +import org.apache.jackrabbit.oak.plugins.document.mongo.MongoDocumentStore; +import org.apache.jackrabbit.oak.plugins.document.mongo.MongoDocumentStore.DocumentReadPreference; + import org.apache.jackrabbit.oak.plugins.document.util.Utils; import org.apache.jackrabbit.oak.spi.commit.CommitInfo; import org.apache.jackrabbit.oak.spi.commit.EmptyHook; @@ -36,12 +40,14 @@ import org.junit.Test; import static org.apache.jackrabbit.oak.plugins.document.Collection.NODES; import static org.apache.jackrabbit.oak.plugins.document.Collection.SETTINGS; -import static org.apache.jackrabbit.oak.plugins.document.mongo.MongoDocumentStore.DocumentReadPreference; import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertTrue; public class ReadPreferenceIT extends AbstractMongoConnectionTest { + private static final int BRANCH_LIMIT = 10000; + private MongoDocumentStore mongoDS; private Clock clock; @@ -83,27 +89,27 @@ public class ReadPreferenceIT extends AbstractMongoConnectionTest { @Test public void testMongoReadPreferencesDefault() throws Exception{ assertEquals(ReadPreference.primary(), - mongoDS.getMongoReadPreference(NODES,"foo", DocumentReadPreference.PRIMARY)); + mongoDS.getMongoReadPreference(NODES, "/foo", "/foo/bar", DocumentReadPreference.PRIMARY)); assertEquals(ReadPreference.primaryPreferred(), - mongoDS.getMongoReadPreference(NODES,"foo", DocumentReadPreference.PREFER_PRIMARY)); + mongoDS.getMongoReadPreference(NODES, "/foo", "/foo/bar", DocumentReadPreference.PREFER_PRIMARY)); //By default Mongo read preference is primary assertEquals(ReadPreference.primary(), - mongoDS.getMongoReadPreference(NODES,"foo", DocumentReadPreference.PREFER_SECONDARY)); + mongoDS.getMongoReadPreference(NODES, "/foo", "/foo/bar", DocumentReadPreference.PREFER_SECONDARY)); //Change the default and assert again mongoDS.getDBCollection(NODES).getDB().setReadPreference(ReadPreference.secondary()); 
assertEquals(ReadPreference.secondary(), - mongoDS.getMongoReadPreference(NODES,"foo", DocumentReadPreference.PREFER_SECONDARY)); + mongoDS.getMongoReadPreference(NODES, "/foo", "/foo/bar", DocumentReadPreference.PREFER_SECONDARY)); //for case where parent age cannot be determined the preference should be primary assertEquals(ReadPreference.primary(), - mongoDS.getMongoReadPreference(NODES,"foo", DocumentReadPreference.PREFER_SECONDARY_IF_OLD_ENOUGH)); + mongoDS.getMongoReadPreference(NODES, "/foo", "/foo/bar", DocumentReadPreference.PREFER_SECONDARY_IF_OLD_ENOUGH)); //For collection other than NODES always primary assertEquals(ReadPreference.primary(), - mongoDS.getMongoReadPreference(SETTINGS,"foo", DocumentReadPreference.PREFER_SECONDARY_IF_OLD_ENOUGH)); + mongoDS.getMongoReadPreference(SETTINGS, "/foo", "/foo/bar", DocumentReadPreference.PREFER_SECONDARY_IF_OLD_ENOUGH)); } @@ -124,7 +130,7 @@ public class ReadPreferenceIT extends AbstractMongoConnectionTest { //For modifiedTime < replicationLag primary must be used assertEquals(ReadPreference.primary(), - mongoDS.getMongoReadPreference(NODES,parentId, DocumentReadPreference.PREFER_SECONDARY_IF_OLD_ENOUGH)); + mongoDS.getMongoReadPreference(NODES, id, parentId, DocumentReadPreference.PREFER_SECONDARY_IF_OLD_ENOUGH)); //Going into future to make parent /x old enough clock.waitUntil(Revision.getCurrentTimestamp() + replicationLag); @@ -132,7 +138,40 @@ public class ReadPreferenceIT extends AbstractMongoConnectionTest { //For old modified nodes secondaries should be preferred assertEquals(testPref, - mongoDS.getMongoReadPreference(NODES, parentId, DocumentReadPreference.PREFER_SECONDARY_IF_OLD_ENOUGH)); + mongoDS.getMongoReadPreference(NODES, id, parentId, DocumentReadPreference.PREFER_SECONDARY_IF_OLD_ENOUGH)); + } + + @Test + public void testMongoReadPreferencesWhenNodeBelongsToBranch() throws Exception{ + //Change the default + ReadPreference testPref = ReadPreference.secondary(); + 
mongoDS.getDBCollection(NODES).getDB().setReadPreference(testPref); + + NodeBuilder b1 = mk.getNodeStore().getRoot().builder(); + NodeBuilder x = b1.child("x"); + mk.getNodeStore().merge(b1, EmptyHook.INSTANCE, CommitInfo.EMPTY); + + for (int i = 0; i <= BRANCH_LIMIT; i++) { + NodeBuilder y = x.child("y_" + i); + y.setProperty("p1", "v1"); + y.setProperty("p2", "v2"); + } + String id = Utils.getIdFromPath("/x/y_0"); + String parentId = Utils.getParentId(id); + + //Going into future to make parent /x old enough + clock.waitUntil(Revision.getCurrentTimestamp() + replicationLag); + mongoDS.setClock(clock); + + //The node /x/y_1 belongs to branch. Primary should be used. + assertTrue(mongoDS.belongsToBranch(id)); + assertEquals(ReadPreference.primary(), + mongoDS.getMongoReadPreference(NODES, id, parentId, DocumentReadPreference.PREFER_SECONDARY_IF_OLD_ENOUGH)); + + mk.getNodeStore().merge(b1, EmptyHook.INSTANCE, CommitInfo.EMPTY); + assertFalse(mongoDS.belongsToBranch(id)); + assertEquals(ReadPreference.secondary(), + mongoDS.getMongoReadPreference(NODES, id, parentId, DocumentReadPreference.PREFER_SECONDARY_IF_OLD_ENOUGH)); } @Test diff --git a/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/mongo/ReplicaSetInfoTest.java b/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/mongo/ReplicaSetInfoTest.java new file mode 100644 index 0000000..1815bcc --- /dev/null +++ b/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/mongo/ReplicaSetInfoTest.java @@ -0,0 +1,210 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.jackrabbit.oak.plugins.document.mongo; + +import static org.apache.jackrabbit.oak.plugins.document.mongo.ReplicaSetInfo.ReplicaSetMemberState.PRIMARY; +import static org.apache.jackrabbit.oak.plugins.document.mongo.ReplicaSetInfo.ReplicaSetMemberState.RECOVERING; +import static org.apache.jackrabbit.oak.plugins.document.mongo.ReplicaSetInfo.ReplicaSetMemberState.SECONDARY; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertNull; +import static org.junit.Assert.assertTrue; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + +import java.net.UnknownHostException; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.concurrent.TimeUnit; + +import org.apache.jackrabbit.oak.plugins.document.Revision; +import org.apache.jackrabbit.oak.plugins.document.mongo.ReplicaSetInfo.ReplicaSetMemberState; +import org.bson.BasicBSONObject; +import org.junit.Before; +import org.junit.Test; +import org.mockito.Mockito; + +import com.mongodb.DB; + +public class ReplicaSetInfoTest { + + private static final long MAX_REPLICATION_LAG = TimeUnit.HOURS.toMillis(6); + + private ReplicaSetInfo replica; + + private ReplicationSetStatusMock replicationSet; + + @Before + public void resetEstimator() { + DB db = mock(DB.class); + when(db.getName()).thenReturn("oak-db"); + when(db.getSisterDB(Mockito.anyString())).thenReturn(db); + replica = new ReplicaSetInfo(db, null, 0, 
MAX_REPLICATION_LAG, 0l) { + @Override + protected List getRootRevisions(String hostName) throws UnknownHostException { + return replicationSet.revisions(hostName); + } + }; + } + + @Test + public void testMinimumRevision() { + addInstance(SECONDARY, "m1").addRevisions(20, 18, 3); + addInstance(SECONDARY, "m2").addRevisions(20, 1, 19); + updateRevisions(); + + assertEquals(20, replica.rootRevisions.get(0).getTimestamp()); + assertEquals( 1, replica.rootRevisions.get(1).getTimestamp()); + assertEquals( 3, replica.rootRevisions.get(2).getTimestamp()); + assertEquals( 1, (long) replica.minRootTimestamp); + } + + @Test + public void testIsSafeRevision() { + addInstance(SECONDARY, "m1").addRevisions(10, 21, 11); + addInstance(SECONDARY, "m2").addRevisions(15, 14, 13); + addInstance(SECONDARY, "m3").addRevisions(14, 13, 22); + updateRevisions(); + + assertTrue(replica.isSecondarySafe(lastRev(9, 13, 10))); + assertFalse(replica.isSecondarySafe(lastRev(11, 14, 10))); + } + + @Test + public void testIsSafeAge() { + addInstance(SECONDARY, "m1").addRevisions(10, 21, 11); + addInstance(SECONDARY, "m2").addRevisions(15, 14, 13); + addInstance(SECONDARY, "m3").addRevisions(14, 13, 22); + updateRevisions(); + + assertTrue (replica.isSecondarySafe(21, 30)); + assertFalse(replica.isSecondarySafe(10, 30)); + assertFalse(replica.isSecondarySafe(99, 9)); // current time is before the minimum update + } + + @Test + public void testBranchRevisionIsNotSafe() { + addInstance(SECONDARY, "m1").addRevisions(10, 21, 11); + addInstance(SECONDARY, "m2").addRevisions(15, 14, 13); + addInstance(SECONDARY, "m3").addRevisions(14, 13, 22); + updateRevisions(); + + assertTrue(replica.isSecondarySafe(lastRev(1, 1, 1))); + RevisionBuilder builder = new RevisionBuilder() + .addRevision(1, 0, 0, false) + .addRevision(1, 0, 1, false) + .addRevision(1, 0, 2, true); + assertFalse(replica.isSecondarySafe(builder.revsMap)); + } + + @Test + public void testUnknownStateIsNotSafe() { + addInstance(SECONDARY, 
"m1").addRevisions(10, 21, 11); + addInstance(RECOVERING, "m2"); + updateRevisions(); + + assertNull(replica.minRootTimestamp); + assertNull(replica.rootRevisions); + assertFalse(replica.isSecondarySafe(lastRev(1, 1, 1))); + assertFalse(replica.isSecondarySafe(30, 30)); + } + + @Test + public void testEmptyIsNotSafe() { + addInstance(PRIMARY, "m1"); + updateRevisions(); + + assertNull(replica.minRootTimestamp); + assertNull(replica.rootRevisions); + assertFalse(replica.isSecondarySafe(lastRev(1, 1, 1))); + assertFalse(replica.isSecondarySafe(30, 30)); + } + + @Test + public void testFallbackToTheMaximumLag() { + addInstance(RECOVERING, "m1"); + updateRevisions(); + assertTrue(replica.isSecondarySafe(MAX_REPLICATION_LAG, System.currentTimeMillis())); + } + + private RevisionBuilder addInstance(ReplicaSetMemberState state, String name) { + if (replicationSet == null) { + replicationSet = new ReplicationSetStatusMock(); + } + return replicationSet.addInstance(state, name); + } + + private void updateRevisions() { + replica.updateRevisions(replicationSet.getMembers()); + replicationSet = null; + } + + private static Map lastRev(int... 
timestamps) { + return new RevisionBuilder().addRevisions(timestamps).revsMap; + } + + private class ReplicationSetStatusMock { + + private List<BasicBSONObject> members = new ArrayList<BasicBSONObject>(); + + private Map<String, List<Revision>> memberRevisions = new HashMap<String, List<Revision>>(); + + private RevisionBuilder addInstance(ReplicaSetMemberState state, String name) { + BasicBSONObject member = new BasicBSONObject(); + member.put("stateStr", state.name()); + member.put("name", name); + members.add(member); + + RevisionBuilder builder = new RevisionBuilder(); + memberRevisions.put(name, builder.revs); + return builder; + } + + private List<BasicBSONObject> getMembers() { + return members; + } + + private List<Revision> revisions(String name) { + return memberRevisions.get(name); + } + } + + private static class RevisionBuilder { + + private final List<Revision> revs = new ArrayList<Revision>(); + + private final Map<Integer, Revision> revsMap = new HashMap<Integer, Revision>(); + + private RevisionBuilder addRevisions(int... timestamps) { + for (int i = 0; i < timestamps.length; i++) { + addRevision(timestamps[i], 0, i, false); + } + return this; + } + + private RevisionBuilder addRevision(int timestamp, int counter, int clusterId, boolean branch) { + Revision rev = new Revision(timestamp, counter, clusterId, branch); + revs.add(rev); + revsMap.put(clusterId, rev); + return this; + } + } +}