From bad9bbbb1e25466b633d6a9e269931f1a3f0d508 Mon Sep 17 00:00:00 2001 From: Marcel Reutegger Date: Thu, 5 Apr 2018 09:23:53 +0200 Subject: [PATCH 01/14] OAK-6087: Avoid reads from MongoDB primary Work in progress --- .../document/mongo/MongoBlobReferenceIterator.java | 3 +- .../mongo/MongoDocumentNodeStoreBuilderBase.java | 4 +- .../plugins/document/mongo/MongoDocumentStore.java | 246 ++++++++++++++++----- .../document/mongo/MongoMissingLastRevSeeker.java | 3 +- .../document/mongo/MongoSessionFactory.java | 114 ++++++++++ .../oak/plugins/document/mongo/MongoStatus.java | 35 ++- .../src/test/java/com/mongodb/OakFongo.java | 22 ++ 7 files changed, 356 insertions(+), 71 deletions(-) create mode 100644 oak-store-document/src/main/java/org/apache/jackrabbit/oak/plugins/document/mongo/MongoSessionFactory.java diff --git a/oak-store-document/src/main/java/org/apache/jackrabbit/oak/plugins/document/mongo/MongoBlobReferenceIterator.java b/oak-store-document/src/main/java/org/apache/jackrabbit/oak/plugins/document/mongo/MongoBlobReferenceIterator.java index 5aee5fb8d9..ff4800d13a 100644 --- a/oak-store-document/src/main/java/org/apache/jackrabbit/oak/plugins/document/mongo/MongoBlobReferenceIterator.java +++ b/oak-store-document/src/main/java/org/apache/jackrabbit/oak/plugins/document/mongo/MongoBlobReferenceIterator.java @@ -38,7 +38,8 @@ private final MongoDocumentStore documentStore; - public MongoBlobReferenceIterator(DocumentNodeStore nodeStore, MongoDocumentStore documentStore) { + public MongoBlobReferenceIterator(DocumentNodeStore nodeStore, + MongoDocumentStore documentStore) { super(nodeStore); this.documentStore = documentStore; } diff --git a/oak-store-document/src/main/java/org/apache/jackrabbit/oak/plugins/document/mongo/MongoDocumentNodeStoreBuilderBase.java b/oak-store-document/src/main/java/org/apache/jackrabbit/oak/plugins/document/mongo/MongoDocumentNodeStoreBuilderBase.java index 2f18604e92..d76bc9b77b 100644 --- a/oak-store-document/src/main/java/org/apache/jackrabbit/oak/plugins/document/mongo/MongoDocumentNodeStoreBuilderBase.java +++ b/oak-store-document/src/main/java/org/apache/jackrabbit/oak/plugins/document/mongo/MongoDocumentNodeStoreBuilderBase.java @@ -79,12 +79,10 @@ public T setMongoDB(@Nonnull String uri, throws UnknownHostException { this.mongoUri = uri; - MongoClusterListener listener = new MongoClusterListener(); MongoClientOptions.Builder options = MongoConnection.getDefaultBuilder(); - options.addClusterListener(listener); options.socketKeepAlive(socketKeepAlive); MongoClient client = new MongoClient(new MongoClientURI(uri, options)); - MongoStatus status = new MongoStatus(client, name, listener); + MongoStatus status = new MongoStatus(client, name); MongoDatabase db = client.getDatabase(name); if (!MongoConnection.hasWriteConcern(uri)) { db = db.withWriteConcern(MongoConnection.getDefaultWriteConcern(client)); diff --git a/oak-store-document/src/main/java/org/apache/jackrabbit/oak/plugins/document/mongo/MongoDocumentStore.java b/oak-store-document/src/main/java/org/apache/jackrabbit/oak/plugins/document/mongo/MongoDocumentStore.java index f23d7aa426..864e893149 100644 --- a/oak-store-document/src/main/java/org/apache/jackrabbit/oak/plugins/document/mongo/MongoDocumentStore.java +++ b/oak-store-document/src/main/java/org/apache/jackrabbit/oak/plugins/document/mongo/MongoDocumentStore.java @@ -105,7 +105,9 @@ import com.mongodb.client.model.UpdateOneModel; import com.mongodb.client.model.UpdateOptions; import com.mongodb.client.model.WriteModel; +import 
com.mongodb.client.result.DeleteResult; import com.mongodb.client.result.UpdateResult; +import com.mongodb.session.ClientSession; import static com.google.common.base.Predicates.in; import static com.google.common.base.Predicates.not; @@ -150,6 +152,8 @@ private final MongoCollection journal; private final MongoClient client; + private final MongoStatus status; + private final MongoSessionFactory sessionFactory; private final MongoDatabase db; private final NodeDocumentCache nodesCache; @@ -236,6 +240,12 @@ private final int queryRetries = Integer.getInteger("oak.mongo.queryRetries", 2); + /** + * Feature flag for use of MongoDB client sessions. + */ + private final boolean useClientSessions = Boolean.parseBoolean( + System.getProperty("oak.mongo.clientSessions", "true")); + private String lastReadWriteMode; private final Map metadata; @@ -262,6 +272,8 @@ public MongoDocumentStore(MongoClient client, String dbName, .build(); this.client = client; + this.status = mongoStatus; + this.sessionFactory = new MongoSessionFactory(client); this.db = client.getDatabase(dbName); stats = builder.getDocumentStoreStatsCollector(); nodes = db.getCollection(Collection.NODES.toString(), BasicDBObject.class); @@ -346,9 +358,10 @@ public MongoDocumentStore(MongoClient client, String dbName, LOG.info("Connected to MongoDB {} with maxReplicationLagMillis {}, " + "maxDeltaForModTimeIdxSecs {}, disableIndexHint {}, " + - "{}, serverStatus {}", - mongoStatus.getVersion(), maxReplicationLagMillis, maxDeltaForModTimeIdxSecs, - disableIndexHint, db.getWriteConcern(), + "withClientSession {}, {}, serverStatus {}", + mongoStatus.getVersion(), maxReplicationLagMillis, + maxDeltaForModTimeIdxSecs, disableIndexHint, + status.isClientSessionSupported(), db.getWriteConcern(), mongoStatus.getServerDetails()); } @@ -551,12 +564,12 @@ public NodeDocument call() throws Exception { @CheckForNull protected T findUncached(Collection collection, String key, DocumentReadPreference docReadPref) { log("findUncached", key, docReadPref); - MongoCollection dbCollection = getDBCollection(collection); final Stopwatch watch = startWatch(); boolean isSlaveOk = false; boolean docFound = true; try { ReadPreference readPreference = getMongoReadPreference(collection, null, key, docReadPref); + MongoCollection dbCollection = getDBCollection(collection, readPreference); if(readPreference.isSlaveOk()){ LOG.trace("Routing call to secondary for fetching [{}]", key); @@ -564,7 +577,14 @@ public NodeDocument call() throws Exception { } List result = new ArrayList<>(1); - dbCollection.withReadPreference(readPreference).find(getByKeyQuery(key)).into(result); + execute(session -> { + if (session != null) { + dbCollection.find(session, getByKeyQuery(key)).into(result); + } else { + dbCollection.find(getByKeyQuery(key)).into(result); + } + return null; + }); if(result.isEmpty()) { docFound = false; @@ -644,13 +664,18 @@ public NodeDocument call() throws Exception { int limit, long maxQueryTime) { log("query", fromKey, toKey, indexedProperty, startValue, limit); - MongoCollection dbCollection = getDBCollection(collection); List clauses = new ArrayList<>(); clauses.add(Filters.gt(Document.ID, fromKey)); clauses.add(Filters.lt(Document.ID, toKey)); - Bson hint = new BasicDBObject(NodeDocument.ID, 1); + Bson hint; + if (NodeDocument.MODIFIED_IN_SECS.equals(indexedProperty) + && canUseModifiedTimeIdx(startValue)) { + hint = new BasicDBObject(NodeDocument.MODIFIED_IN_SECS, -1); + } else { + hint = new BasicDBObject(NodeDocument.ID, 1); + } if (indexedProperty != 
null) { if (NodeDocument.DELETED_ONCE.equals(indexedProperty)) { @@ -662,17 +687,12 @@ public NodeDocument call() throws Exception { clauses.add(Filters.eq(indexedProperty, true)); } else { clauses.add(Filters.gte(indexedProperty, startValue)); - - if (NodeDocument.MODIFIED_IN_SECS.equals(indexedProperty) - && canUseModifiedTimeIdx(startValue)) { - hint = new BasicDBObject(NodeDocument.MODIFIED_IN_SECS, -1); - } } } Bson query = Filters.and(clauses); String parentId = Utils.getParentIdFromLowerLimit(fromKey); long lockTime = -1; - final Stopwatch watch = startWatch(); + final Stopwatch watch = startWatch(); boolean isSlaveOk = false; int resultSize = 0; @@ -688,29 +708,38 @@ public NodeDocument call() throws Exception { isSlaveOk = true; LOG.trace("Routing call to secondary for fetching children from [{}] to [{}]", fromKey, toKey); } - FindIterable result = dbCollection - .withReadPreference(readPreference).find(query).sort(BY_ID_ASC); - if (limit >= 0) { - result.limit(limit); - } - if (!disableIndexHint && !hasModifiedIdCompoundIndex) { - result.modifiers(new BasicDBObject("$hint", hint)); - } - if (maxQueryTime > 0) { - // OAK-2614: set maxTime if maxQueryTimeMS > 0 - result.maxTime(maxQueryTime, TimeUnit.MILLISECONDS); - } - List list; - try (MongoCursor cursor = result.iterator()) { - list = new ArrayList(); - for (int i = 0; i < limit && cursor.hasNext(); i++) { - BasicDBObject o = cursor.next(); - T doc = convertFromDBObject(collection, o); - list.add(doc); + List list = new ArrayList(); + MongoCollection dbCollection = getDBCollection(collection, readPreference); + execute(session -> { + FindIterable result; + if (session != null) { + result = dbCollection.find(session, query); + } else { + result = dbCollection.find(query); } - resultSize = list.size(); - } + result.sort(BY_ID_ASC); + if (limit >= 0) { + result.limit(limit); + } + if (!disableIndexHint && !hasModifiedIdCompoundIndex) { + result.modifiers(new BasicDBObject("$hint", hint)); + } + if (maxQueryTime > 0) { + // OAK-2614: set maxTime if maxQueryTimeMS > 0 + result.maxTime(maxQueryTime, TimeUnit.MILLISECONDS); + } + + try (MongoCursor cursor = result.iterator()) { + for (int i = 0; i < limit && cursor.hasNext(); i++) { + BasicDBObject o = cursor.next(); + T doc = convertFromDBObject(collection, o); + list.add(doc); + } + } + return null; + }); + resultSize = list.size(); if (cacheChangesTracker != null) { nodesCache.putNonConflictingDocs(cacheChangesTracker, (List) list); @@ -739,7 +768,15 @@ boolean canUseModifiedTimeIdx(long modifiedTimeInSecs) { MongoCollection dbCollection = getDBCollection(collection); Stopwatch watch = startWatch(); try { - dbCollection.deleteOne(getByKeyQuery(key)); + execute(session -> { + Bson filter = getByKeyQuery(key); + if (session != null) { + dbCollection.deleteOne(session, filter); + } else { + dbCollection.deleteOne(filter); + } + return null; + }); } catch (Exception e) { throw DocumentStoreException.convert(e, "Remove failed for " + key); } finally { @@ -757,7 +794,14 @@ boolean canUseModifiedTimeIdx(long modifiedTimeInSecs) { for(List keyBatch : Lists.partition(keys, IN_CLAUSE_BATCH_SIZE)){ Bson query = Filters.in(Document.ID, keyBatch); try { - dbCollection.deleteMany(query); + execute(session -> { + if (session != null) { + dbCollection.deleteMany(session, query); + } else { + dbCollection.deleteMany(query); + } + return null; + }); } catch (Exception e) { throw DocumentStoreException.convert(e, "Remove failed for " + keyBatch); } finally { @@ -793,7 +837,15 @@ boolean 
canUseModifiedTimeIdx(long modifiedTimeInSecs) { if (!it.hasNext() || batch.size() == IN_CLAUSE_BATCH_SIZE) { Bson query = Filters.or(batch); try { - num += dbCollection.deleteMany(query).getDeletedCount(); + num += execute(session -> { + DeleteResult result; + if (session != null) { + result = dbCollection.deleteMany(session, query); + } else { + result = dbCollection.deleteMany(query); + } + return result.getDeletedCount(); + }); } catch (Exception e) { throw DocumentStoreException.convert(e, "Remove failed for " + batch); } finally { @@ -825,7 +877,15 @@ boolean canUseModifiedTimeIdx(long modifiedTimeInSecs) { Filters.lt(indexedProperty, endValue) ); try { - num = (int) Math.min(dbCollection.deleteMany(query).getDeletedCount(), Integer.MAX_VALUE); + num = (int) Math.min(execute((DocumentStoreCallable) session -> { + DeleteResult result; + if (session != null) { + result = dbCollection.deleteMany(session, query); + } else { + result = dbCollection.deleteMany(query); + } + return result.getDeletedCount(); + }), Integer.MAX_VALUE); } catch (Exception e) { throw DocumentStoreException.convert(e, "Remove failed for " + collection + ": " + indexedProperty + " in (" + startValue + ", " + endValue + ")"); @@ -880,16 +940,23 @@ boolean canUseModifiedTimeIdx(long modifiedTimeInSecs) { // no conditions and the check is OK. this avoid an // unnecessary call when the conditions do not match if (!checkConditions || UpdateUtils.checkConditions(cachedDoc, updateOp.getConditions())) { - Bson query = createQueryForUpdate(updateOp.getId(), - updateOp.getConditions()); // below condition may overwrite a user supplied condition // on _modCount. This is fine, because the conditions were // already checked against the cached document with the // matching _modCount value. 
There is no need to check the // user supplied condition on _modCount again on the server - query = Filters.and(query, Filters.eq(Document.MOD_COUNT, modCount)); - - UpdateResult result = dbCollection.updateOne(query, update); + Bson query = Filters.and( + createQueryForUpdate(updateOp.getId(), updateOp.getConditions()), + Filters.eq(Document.MOD_COUNT, modCount) + ); + + UpdateResult result = execute(session -> { + if (session != null) { + return dbCollection.updateOne(session, query, update); + } else { + return dbCollection.updateOne(query, update); + } + }); if (result.getModifiedCount() > 0) { // success, update cached document if (collection == Collection.NODES) { @@ -907,7 +974,13 @@ boolean canUseModifiedTimeIdx(long modifiedTimeInSecs) { Bson query = createQueryForUpdate(updateOp.getId(), updateOp.getConditions()); FindOneAndUpdateOptions options = new FindOneAndUpdateOptions() .returnDocument(ReturnDocument.BEFORE).upsert(upsert); - BasicDBObject oldNode = dbCollection.findOneAndUpdate(query, update, options); + BasicDBObject oldNode = execute(session -> { + if (session != null) { + return dbCollection.findOneAndUpdate(session, query, update, options); + } else { + return dbCollection.findOneAndUpdate(query, update, options); + } + }); if (oldNode == null){ newEntry = true; @@ -1143,13 +1216,20 @@ public String apply(UpdateOp input) { for (String key : keys) { conditions.add(getByKeyQuery(key)); } - - FindIterable cursor = getDBCollection(collection) - .find(Filters.or(conditions)); - for (BasicDBObject doc : cursor) { - T foundDoc = convertFromDBObject(collection, doc); - docs.put(foundDoc.getId(), foundDoc); - } + MongoCollection dbCollection = getDBCollection(collection); + execute(session -> { + FindIterable cursor; + if (session != null) { + cursor = dbCollection.find(session, Filters.or(conditions)); + } else { + cursor = dbCollection.find(Filters.or(conditions)); + } + for (BasicDBObject doc : cursor) { + T foundDoc = convertFromDBObject(collection, doc); + docs.put(foundDoc.getId(), foundDoc); + } + return null; + }); } return docs; } @@ -1177,9 +1257,15 @@ public String apply(UpdateOp input) { BulkWriteResult bulkResult; Set failedUpdates = new HashSet(); Set upserts = new HashSet(); + BulkWriteOptions options = new BulkWriteOptions().ordered(false); try { - bulkResult = dbCollection.bulkWrite(writes, - new BulkWriteOptions().ordered(false)); + bulkResult = execute(session -> { + if (session != null) { + return dbCollection.bulkWrite(session, writes, options); + } else { + return dbCollection.bulkWrite(writes, options); + } + }); } catch (MongoBulkWriteException e) { bulkResult = e.getWriteResult(); for (BulkWriteError err : e.getWriteErrors()) { @@ -1257,7 +1343,14 @@ public String apply(UpdateOp input) { boolean insertSuccess = false; try { try { - dbCollection.insertMany(inserts); + execute(session -> { + if (session != null) { + dbCollection.insertMany(session, inserts); + } else { + dbCollection.insertMany(inserts); + } + return null; + }); if (collection == Collection.NODES) { for (T doc : docs) { nodesCache.putIfAbsent((NodeDocument) doc); @@ -1323,8 +1416,14 @@ DocumentReadPreference getReadPreference(int maxCacheAge){ } } - DocumentReadPreference getDefaultReadPreference(Collection col){ - return col == Collection.NODES ? 
DocumentReadPreference.PREFER_SECONDARY_IF_OLD_ENOUGH : DocumentReadPreference.PRIMARY; + DocumentReadPreference getDefaultReadPreference(Collection col) { + DocumentReadPreference preference = DocumentReadPreference.PRIMARY; + if (status.isClientSessionSupported()) { + preference = DocumentReadPreference.PREFER_SECONDARY; + } else if (col == Collection.NODES) { + preference = DocumentReadPreference.PREFER_SECONDARY_IF_OLD_ENOUGH; + } + return preference; } ReadPreference getMongoReadPreference(@Nonnull Collection collection, @@ -1444,6 +1543,11 @@ DocumentReadPreference getDefaultReadPreference(Collection col){ } } + MongoCollection getDBCollection(Collection collection, + ReadPreference readPreference) { + return getDBCollection(collection).withReadPreference(readPreference); + } + MongoDatabase getDatabase() { return db; } @@ -1790,6 +1894,36 @@ private static void toMapBuilder(ImmutableMap.Builder builder, }); } + /** + * Execute a callable with an optional {@link ClientSession}. A client + * session is passed to {@link DocumentStoreCallable#call(ClientSession)} if + * the connected MongoDB servers support client sessions, otherwise the + * session is {@code null}. The client session must only be used within + * the scope of the {@link DocumentStoreCallable#call(ClientSession)}. + * + * @param callable the callable. + * @param the return type of the callable. + * @return the result of the callable. + * @throws DocumentStoreException if the callable throws an exception. + */ + private T execute(DocumentStoreCallable callable) + throws DocumentStoreException { + T result; + if (status.isClientSessionSupported() && useClientSessions) { + try (ClientSession session = sessionFactory.createClientSession()) { + result = callable.call(session); + } + } else { + result = callable.call(null); + } + return result; + } + + interface DocumentStoreCallable { + + T call(@Nullable ClientSession session) throws DocumentStoreException; + } + private static class BulkUpdateResult { private final Set failedUpdates; diff --git a/oak-store-document/src/main/java/org/apache/jackrabbit/oak/plugins/document/mongo/MongoMissingLastRevSeeker.java b/oak-store-document/src/main/java/org/apache/jackrabbit/oak/plugins/document/mongo/MongoMissingLastRevSeeker.java index 8be743b892..0731db4f08 100644 --- a/oak-store-document/src/main/java/org/apache/jackrabbit/oak/plugins/document/mongo/MongoMissingLastRevSeeker.java +++ b/oak-store-document/src/main/java/org/apache/jackrabbit/oak/plugins/document/mongo/MongoMissingLastRevSeeker.java @@ -59,7 +59,6 @@ public MongoMissingLastRevSeeker(MongoDocumentStore store, Clock clock) { Bson sortFields = new BasicDBObject(NodeDocument.MODIFIED_IN_SECS, 1); FindIterable cursor = getNodeCollection() - .withReadPreference(ReadPreference.primary()) .find(query).sort(sortFields); return CloseableIterable.wrap(transform(cursor, input -> store.convertFromDBObject(NODES, input))); @@ -76,7 +75,7 @@ public boolean isRecoveryNeeded() { } private MongoCollection getNodeCollection() { - return store.getDBCollection(NODES); + return store.getDBCollection(NODES, ReadPreference.primary()); } private MongoCollection getClusterNodeCollection() { diff --git a/oak-store-document/src/main/java/org/apache/jackrabbit/oak/plugins/document/mongo/MongoSessionFactory.java b/oak-store-document/src/main/java/org/apache/jackrabbit/oak/plugins/document/mongo/MongoSessionFactory.java new file mode 100644 index 0000000000..0a8ec280e0 --- /dev/null +++ 
b/oak-store-document/src/main/java/org/apache/jackrabbit/oak/plugins/document/mongo/MongoSessionFactory.java @@ -0,0 +1,114 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.jackrabbit.oak.plugins.document.mongo; + +import com.mongodb.ClientSessionOptions; +import com.mongodb.MongoClient; +import com.mongodb.session.ClientSession; +import com.mongodb.session.ServerSession; + +import org.bson.BsonDocument; +import org.bson.BsonTimestamp; + +/** + * Factory for {@link ClientSession}s. + */ +class MongoSessionFactory { + + private final MongoClient client; + + private final ClientSessionOptions options; + + private BsonDocument clusterTime; + + private BsonTimestamp operationTime; + + MongoSessionFactory(MongoClient client) { + this.client = client; + this.options = ClientSessionOptions.builder() + .causallyConsistent(true).build(); + } + + ClientSession createClientSession() { + ClientSession s = client.startSession(options); + synchronized (this) { + s.advanceClusterTime(clusterTime); + s.advanceOperationTime(operationTime); + } + return new TrackingClientSession(s); + } + + private class TrackingClientSession implements ClientSession { + + private final ClientSession session; + + TrackingClientSession(ClientSession session) { + this.session = session; + } + + @Override + public ClientSessionOptions getOptions() { + return session.getOptions(); + } + + @Override + public boolean isCausallyConsistent() { + return session.isCausallyConsistent(); + } + + @Override + public Object getOriginator() { + return session.getOriginator(); + } + + @Override + public ServerSession getServerSession() { + return session.getServerSession(); + } + + @Override + public BsonTimestamp getOperationTime() { + return session.getOperationTime(); + } + + @Override + public void advanceOperationTime(BsonTimestamp operationTime) { + session.advanceOperationTime(operationTime); + } + + @Override + public void advanceClusterTime(BsonDocument clusterTime) { + session.advanceClusterTime(clusterTime); + } + + @Override + public BsonDocument getClusterTime() { + return session.getClusterTime(); + } + + @Override + public void close() { + synchronized (this) { + session.advanceClusterTime(clusterTime); + clusterTime = session.getClusterTime(); + session.advanceOperationTime(operationTime); + operationTime = session.getOperationTime(); + } + session.close(); + } + } +} diff --git a/oak-store-document/src/main/java/org/apache/jackrabbit/oak/plugins/document/mongo/MongoStatus.java b/oak-store-document/src/main/java/org/apache/jackrabbit/oak/plugins/document/mongo/MongoStatus.java index fe90bddb1f..6c44f545d5 100644 --- a/oak-store-document/src/main/java/org/apache/jackrabbit/oak/plugins/document/mongo/MongoStatus.java +++ 
b/oak-store-document/src/main/java/org/apache/jackrabbit/oak/plugins/document/mongo/MongoStatus.java @@ -19,11 +19,14 @@ import com.google.common.collect.ImmutableSet; import com.google.common.collect.Maps; import com.mongodb.BasicDBObject; +import com.mongodb.ClientSessionOptions; import com.mongodb.MongoClient; +import com.mongodb.MongoClientException; import com.mongodb.MongoQueryException; import com.mongodb.ReadConcern; import com.mongodb.client.MongoCollection; import com.mongodb.client.MongoCursor; +import com.mongodb.session.ClientSession; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -46,8 +49,6 @@ private final String dbName; - private final ClusterDescriptionProvider descriptionProvider; - private BasicDBObject serverStatus; private BasicDBObject buildInfo; @@ -58,17 +59,12 @@ private Boolean majorityReadConcernEnabled; - public MongoStatus(@Nonnull MongoClient client, - @Nonnull String dbName) { - this(client, dbName, () -> null); - } + private Boolean clientSessionSupported; public MongoStatus(@Nonnull MongoClient client, - @Nonnull String dbName, - @Nonnull ClusterDescriptionProvider descriptionProvider) { + @Nonnull String dbName) { this.client = client; this.dbName = dbName; - this.descriptionProvider = descriptionProvider; } public void checkVersion() { @@ -172,6 +168,27 @@ boolean isVersion(int requiredMajor, int requiredMinor) { } } + /** + * @return {@code true} if client sessions are supported. + */ + boolean isClientSessionSupported() { + if (clientSessionSupported == null) { + // must be at least 3.6 + if (isVersion(3, 6)) { + ClientSessionOptions options = ClientSessionOptions.builder() + .causallyConsistent(true).build(); + try (ClientSession ignored = client.startSession(options)) { + clientSessionSupported = true; + } catch (MongoClientException e) { + clientSessionSupported = false; + } + } else { + clientSessionSupported = false; + } + } + return clientSessionSupported; + } + private BasicDBObject getServerStatus() { if (serverStatus == null) { serverStatus = client.getDatabase(dbName).runCommand( diff --git a/oak-store-document/src/test/java/com/mongodb/OakFongo.java b/oak-store-document/src/test/java/com/mongodb/OakFongo.java index 3c9566586b..f8649bdbc4 100644 --- a/oak-store-document/src/test/java/com/mongodb/OakFongo.java +++ b/oak-store-document/src/test/java/com/mongodb/OakFongo.java @@ -17,6 +17,7 @@ package com.mongodb; import java.lang.reflect.Field; +import java.util.Collections; import java.util.List; import java.util.Map; @@ -32,6 +33,10 @@ import com.mongodb.client.model.WriteModel; import com.mongodb.client.result.DeleteResult; import com.mongodb.client.result.UpdateResult; +import com.mongodb.connection.Cluster; +import com.mongodb.connection.ClusterConnectionMode; +import com.mongodb.connection.ClusterDescription; +import com.mongodb.connection.ClusterType; import com.mongodb.connection.ServerVersion; import com.mongodb.session.ClientSession; @@ -42,6 +47,7 @@ import static java.util.stream.Collectors.toList; import static org.bson.codecs.configuration.CodecRegistries.fromRegistries; +import static org.mockito.Mockito.mock; import static org.mockito.Mockito.spy; import static org.mockito.Mockito.when; @@ -99,6 +105,22 @@ private MongoClient createClientProxy() { for (String dbName : new String[]{MongoUtils.DB, "oak"}) { when(c.getDatabase(dbName)).thenReturn(new OakFongoMongoDatabase(dbName, this)); } + try { + Field credentialsList = Mongo.class.getDeclaredField("credentialsList"); + credentialsList.setAccessible(true); + 
credentialsList.set(c, Collections.emptyList()); + + ClusterDescription cd = new ClusterDescription(ClusterConnectionMode.SINGLE, + ClusterType.STANDALONE, Collections.emptyList()); + Cluster cl = mock(Cluster.class); + when(cl.getDescription()).thenReturn(cd); + + Field cluster = Mongo.class.getDeclaredField("cluster"); + cluster.setAccessible(true); + cluster.set(c, cl); + } catch (Exception e) { + throw new RuntimeException(e); + } return c; } From a86d5b20ffc875a9af666386eb075fc88919fd2f Mon Sep 17 00:00:00 2001 From: Marcel Reutegger Date: Tue, 24 Apr 2018 14:43:41 +0200 Subject: [PATCH 02/14] OAK-6087: Avoid reads from MongoDB primary Work in progress --- .travis.yml | 1 + oak-store-document/pom.xml | 9 ++++++ .../mongo/MongoDocumentNodeStoreBuilderBase.java | 27 +++++++++++++---- .../plugins/document/mongo/MongoDocumentStore.java | 34 +++++++++++++++------- .../oak/plugins/document/DocumentStoreStatsIT.java | 7 +++-- .../mongo/MongoDocumentNodeStoreBuilderTest.java | 7 +++++ .../plugins/document/mongo/ReadPreferenceIT.java | 6 ++++ 7 files changed, 72 insertions(+), 19 deletions(-) diff --git a/.travis.yml b/.travis.yml index 131cf22d50..e04ae287df 100644 --- a/.travis.yml +++ b/.travis.yml @@ -21,6 +21,7 @@ env: - MODULE=oak-jcr PROFILE="-PintegrationTesting" UT="-Dsurefire.skip.ut=true" MONGODB_MODE="--single" - MODULE=oak-jcr PROFILE="" UT="" MONGODB_MODE="--single" - MODULE=oak-store-document PROFILE="-PintegrationTesting" UT="" MONGODB_MODE="--single" + - MODULE=oak-store-document PROFILE="-PintegrationTesting -Preplicaset" UT="" MONGODB_MODE="--replicaset" - MODULE=oak-it PROFILE="-PintegrationTesting" UT="" MONGODB_MODE="--single" - MODULE=oak-lucene PROFILE="-PintegrationTesting" UT="" MONGODB_MODE="--single" - MODULE=oak-run PROFILE="-PintegrationTesting" UT="" MONGODB_MODE="--single" diff --git a/oak-store-document/pom.xml b/oak-store-document/pom.xml index 6626781429..ada30e330d 100644 --- a/oak-store-document/pom.xml +++ b/oak-store-document/pom.xml @@ -63,6 +63,15 @@ + + + replicaset + + mongodb://localhost:27017,localhost:27018,localhost:27019/MongoMKDB?readPreference=secondaryPreferred + + + + diff --git a/oak-store-document/src/main/java/org/apache/jackrabbit/oak/plugins/document/mongo/MongoDocumentNodeStoreBuilderBase.java b/oak-store-document/src/main/java/org/apache/jackrabbit/oak/plugins/document/mongo/MongoDocumentNodeStoreBuilderBase.java index d76bc9b77b..a0c7f19663 100644 --- a/oak-store-document/src/main/java/org/apache/jackrabbit/oak/plugins/document/mongo/MongoDocumentNodeStoreBuilderBase.java +++ b/oak-store-document/src/main/java/org/apache/jackrabbit/oak/plugins/document/mongo/MongoDocumentNodeStoreBuilderBase.java @@ -16,7 +16,6 @@ */ package org.apache.jackrabbit.oak.plugins.document.mongo; -import java.net.UnknownHostException; import java.util.concurrent.TimeUnit; import javax.annotation.Nonnull; @@ -56,6 +55,7 @@ private boolean socketKeepAlive = true; private MongoStatus mongoStatus; private long maxReplicationLagMillis = TimeUnit.HOURS.toMillis(6); + private boolean clientSessionDisabled = false; /** * Uses the given information to connect to to MongoDB as backend @@ -70,13 +70,10 @@ * any database name given in the {@code uri}. * @param blobCacheSizeMB the blob cache size in MB. * @return this - * @throws UnknownHostException if one of the hosts given in the URI - * is unknown. 
*/ public T setMongoDB(@Nonnull String uri, @Nonnull String name, - int blobCacheSizeMB) - throws UnknownHostException { + int blobCacheSizeMB) { this.mongoUri = uri; MongoClientOptions.Builder options = MongoConnection.getDefaultBuilder(); @@ -165,6 +162,26 @@ public boolean isSocketKeepAlive() { return socketKeepAlive; } + /** + * Disables the use of a client session available with MongoDB 3.6 and + * newer. By default the MongoDocumentStore will use a client session if + * available. That is, when connected to MongoDB 3.6 and newer. + * + * @param b whether to disable the use of a client session. + * @return this + */ + public T setClientSessionDisabled(boolean b) { + this.clientSessionDisabled = b; + return thisBuilder(); + } + + /** + * @return whether the use of a client session is disabled. + */ + boolean isClientSessionDisabled() { + return clientSessionDisabled; + } + public T setMaxReplicationLag(long duration, TimeUnit unit){ maxReplicationLagMillis = unit.toMillis(duration); return thisBuilder(); diff --git a/oak-store-document/src/main/java/org/apache/jackrabbit/oak/plugins/document/mongo/MongoDocumentStore.java b/oak-store-document/src/main/java/org/apache/jackrabbit/oak/plugins/document/mongo/MongoDocumentStore.java index 864e893149..753a1e13ed 100644 --- a/oak-store-document/src/main/java/org/apache/jackrabbit/oak/plugins/document/mongo/MongoDocumentStore.java +++ b/oak-store-document/src/main/java/org/apache/jackrabbit/oak/plugins/document/mongo/MongoDocumentStore.java @@ -243,8 +243,7 @@ /** * Feature flag for use of MongoDB client sessions. */ - private final boolean useClientSessions = Boolean.parseBoolean( - System.getProperty("oak.mongo.clientSessions", "true")); + private final boolean useClientSession; private String lastReadWriteMode; @@ -292,6 +291,8 @@ public MongoDocumentStore(MongoClient client, String dbName, replicaInfoThread.setDaemon(true); replicaInfoThread.start(); } + useClientSession = !builder.isClientSessionDisabled() + && Boolean.parseBoolean(System.getProperty("oak.mongo.clientSession", "true")); // indexes: // the _id field is the primary key, so we don't need to define it @@ -358,11 +359,11 @@ public MongoDocumentStore(MongoClient client, String dbName, LOG.info("Connected to MongoDB {} with maxReplicationLagMillis {}, " + "maxDeltaForModTimeIdxSecs {}, disableIndexHint {}, " + - "withClientSession {}, {}, serverStatus {}", + "clientSessionSupported {}, clientSessionInUse {}, serverStatus {}", mongoStatus.getVersion(), maxReplicationLagMillis, maxDeltaForModTimeIdxSecs, disableIndexHint, - status.isClientSessionSupported(), db.getWriteConcern(), - mongoStatus.getServerDetails()); + status.isClientSessionSupported(), useClientSession, + db.getWriteConcern(), mongoStatus.getServerDetails()); } public boolean isReadOnly() { @@ -459,8 +460,11 @@ public CacheInvalidationStats invalidateCache(Iterable keys) { boolean preferCached, final int maxCacheAge) { if (collection != Collection.NODES) { - return findUncachedWithRetry(collection, key, - DocumentReadPreference.PRIMARY); + DocumentReadPreference readPref = DocumentReadPreference.PRIMARY; + if (withClientSession()) { + readPref = getDefaultReadPreference(collection); + } + return findUncachedWithRetry(collection, key, readPref); } NodeDocument doc; if (maxCacheAge > 0 || preferCached) { @@ -1407,7 +1411,9 @@ public String apply(UpdateOp input) { DocumentReadPreference getReadPreference(int maxCacheAge){ long lag = fallbackSecondaryStrategy ? 
maxReplicationLagMillis : replicaInfo.getLag(); - if(maxCacheAge >= 0 && maxCacheAge < lag) { + if (withClientSession()) { + return DocumentReadPreference.PREFER_SECONDARY; + } else if(maxCacheAge >= 0 && maxCacheAge < lag) { return DocumentReadPreference.PRIMARY; } else if(maxCacheAge == Integer.MAX_VALUE){ return DocumentReadPreference.PREFER_SECONDARY; @@ -1418,7 +1424,7 @@ DocumentReadPreference getReadPreference(int maxCacheAge){ DocumentReadPreference getDefaultReadPreference(Collection col) { DocumentReadPreference preference = DocumentReadPreference.PRIMARY; - if (status.isClientSessionSupported()) { + if (withClientSession()) { preference = DocumentReadPreference.PREFER_SECONDARY; } else if (col == Collection.NODES) { preference = DocumentReadPreference.PREFER_SECONDARY_IF_OLD_ENOUGH; @@ -1443,7 +1449,9 @@ DocumentReadPreference getDefaultReadPreference(Collection col) { } boolean secondarySafe; - if (fallbackSecondaryStrategy) { + if (withClientSession()) { + secondarySafe = true; + } else if (fallbackSecondaryStrategy) { // This is not quite accurate, because ancestors // are updated in a background thread (_lastRev). We // will need to revise this for low maxReplicationLagMillis @@ -1894,6 +1902,10 @@ private static void toMapBuilder(ImmutableMap.Builder builder, }); } + private boolean withClientSession() { + return status.isClientSessionSupported() && useClientSession; + } + /** * Execute a callable with an optional {@link ClientSession}. A client * session is passed to {@link DocumentStoreCallable#call(ClientSession)} if @@ -1909,7 +1921,7 @@ private static void toMapBuilder(ImmutableMap.Builder builder, private T execute(DocumentStoreCallable callable) throws DocumentStoreException { T result; - if (status.isClientSessionSupported() && useClientSessions) { + if (withClientSession()) { try (ClientSession session = sessionFactory.createClientSession()) { result = callable.call(session); } diff --git a/oak-store-document/src/test/java/org/apache/jackrabbit/oak/plugins/document/DocumentStoreStatsIT.java b/oak-store-document/src/test/java/org/apache/jackrabbit/oak/plugins/document/DocumentStoreStatsIT.java index 381acd3782..d37e4bc219 100644 --- a/oak-store-document/src/test/java/org/apache/jackrabbit/oak/plugins/document/DocumentStoreStatsIT.java +++ b/oak-store-document/src/test/java/org/apache/jackrabbit/oak/plugins/document/DocumentStoreStatsIT.java @@ -39,6 +39,7 @@ import static org.apache.jackrabbit.oak.plugins.document.NodeDocument.getModifiedInSecs; import static org.junit.Assert.assertTrue; import static org.junit.Assume.assumeFalse; +import static org.mockito.Matchers.anyBoolean; import static org.mockito.Matchers.anyInt; import static org.mockito.Matchers.anyLong; import static org.mockito.Matchers.eq; @@ -85,7 +86,7 @@ public void findCached_Uncached() throws Exception{ ds.invalidateCache(); ds.find(Collection.NODES, id); - verify(stats).doneFindUncached(anyLong(), eq(Collection.NODES), eq(id), eq(true), eq(false)); + verify(stats).doneFindUncached(anyLong(), eq(Collection.NODES), eq(id), eq(true), anyBoolean()); } @Test @@ -93,7 +94,7 @@ public void findMissing() throws Exception { String id = testName.getMethodName(); ds.find(Collection.NODES, id); - verify(stats).doneFindUncached(anyLong(), eq(Collection.NODES), eq(id), eq(false), eq(false)); + verify(stats).doneFindUncached(anyLong(), eq(Collection.NODES), eq(id), eq(false), anyBoolean()); } @Test @@ -113,7 +114,7 @@ public void query() throws Exception{ eq(false), //indexedProperty eq(5) , // resultSize 
anyLong(), //lockTime - eq(false) //isSlaveOk + anyBoolean() //isSlaveOk ); } diff --git a/oak-store-document/src/test/java/org/apache/jackrabbit/oak/plugins/document/mongo/MongoDocumentNodeStoreBuilderTest.java b/oak-store-document/src/test/java/org/apache/jackrabbit/oak/plugins/document/mongo/MongoDocumentNodeStoreBuilderTest.java index 27f65ca69d..48e549519a 100644 --- a/oak-store-document/src/test/java/org/apache/jackrabbit/oak/plugins/document/mongo/MongoDocumentNodeStoreBuilderTest.java +++ b/oak-store-document/src/test/java/org/apache/jackrabbit/oak/plugins/document/mongo/MongoDocumentNodeStoreBuilderTest.java @@ -18,6 +18,7 @@ import org.junit.Test; +import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertTrue; public class MongoDocumentNodeStoreBuilderTest { @@ -27,4 +28,10 @@ public void socketKeepAlive() { MongoDocumentNodeStoreBuilder builder = new MongoDocumentNodeStoreBuilder(); assertTrue(builder.isSocketKeepAlive()); } + + @Test + public void clientSessionDisabled() { + MongoDocumentNodeStoreBuilder builder = new MongoDocumentNodeStoreBuilder(); + assertFalse(builder.isClientSessionDisabled()); + } } diff --git a/oak-store-document/src/test/java/org/apache/jackrabbit/oak/plugins/document/mongo/ReadPreferenceIT.java b/oak-store-document/src/test/java/org/apache/jackrabbit/oak/plugins/document/mongo/ReadPreferenceIT.java index 769f186637..ed8c5e13de 100644 --- a/oak-store-document/src/test/java/org/apache/jackrabbit/oak/plugins/document/mongo/ReadPreferenceIT.java +++ b/oak-store-document/src/test/java/org/apache/jackrabbit/oak/plugins/document/mongo/ReadPreferenceIT.java @@ -70,6 +70,7 @@ public void setUpConnection() throws Exception { mk = new DocumentMK.Builder() .clock(clock) .setClusterId(1) + .setClientSessionDisabled(true) .setMongoDB(mongoConnection.getMongoClient(), mongoConnection.getDBName()) .setLeaseCheck(false) .open(); @@ -80,6 +81,7 @@ public void setUpConnection() throws Exception { mk2 = new DocumentMK.Builder() .clock(clock) .setClusterId(2) + .setClientSessionDisabled(true) .setMongoDB(mongoConnection2.getMongoClient(), mongoConnection2.getDBName()) .setLeaseCheck(false) .open(); @@ -127,6 +129,9 @@ public void testPreferenceConversion() throws Exception{ @Test public void testMongoReadPreferencesDefault() throws Exception{ + // start with read preference set to primary + mongoDS.setReadWriteMode(rwMode(ReadPreference.primary())); + assertEquals(ReadPreference.primary(), mongoDS.getMongoReadPreference(NODES,"foo", null, DocumentReadPreference.PRIMARY)); @@ -224,6 +229,7 @@ public void testMongoReadPreferencesForLocalChanges() throws Exception { @Test public void testReadWriteMode() throws Exception{ + mongoDS.setReadWriteMode(rwMode(ReadPreference.primary())); assertEquals(ReadPreference.primary(), mongoDS.getConfiguredReadPreference(NODES)); mongoDS.setReadWriteMode("readPreference=secondary&w=2&safe=true&j=true"); From 947630fcd173e8fe824b691471903889e531072a Mon Sep 17 00:00:00 2001 From: Marcel Reutegger Date: Tue, 24 Apr 2018 17:46:44 +0200 Subject: [PATCH 03/14] OAK-6087: Avoid reads from MongoDB primary Work in progress --- .../document/mongo/MongoBlobReferenceIterator.java | 14 ++++++++++---- .../oak/plugins/document/mongo/MongoBlobStore.java | 6 +++++- .../oak/plugins/document/mongo/MongoDocumentStore.java | 2 +- .../plugins/document/AbstractMultiDocumentStoreTest.java | 16 ++++++++++++++++ 4 files changed, 32 insertions(+), 6 deletions(-) diff --git 
a/oak-store-document/src/main/java/org/apache/jackrabbit/oak/plugins/document/mongo/MongoBlobReferenceIterator.java b/oak-store-document/src/main/java/org/apache/jackrabbit/oak/plugins/document/mongo/MongoBlobReferenceIterator.java index ff4800d13a..8ec3e2aaa1 100644 --- a/oak-store-document/src/main/java/org/apache/jackrabbit/oak/plugins/document/mongo/MongoBlobReferenceIterator.java +++ b/oak-store-document/src/main/java/org/apache/jackrabbit/oak/plugins/document/mongo/MongoBlobReferenceIterator.java @@ -31,6 +31,7 @@ import org.bson.conversions.Bson; import com.mongodb.BasicDBObject; +import com.mongodb.client.MongoCollection; import com.mongodb.client.MongoCursor; import com.mongodb.client.model.Filters; @@ -47,10 +48,15 @@ public MongoBlobReferenceIterator(DocumentNodeStore nodeStore, @Override public Iterator getIteratorOverDocsWithBinaries() { Bson query = Filters.eq(NodeDocument.HAS_BINARY_FLAG, NodeDocument.HAS_BINARY_VAL); - // TODO It currently prefers secondary. Would that be Ok? - MongoCursor cursor = documentStore.getDBCollection(NODES) - .withReadPreference(documentStore.getConfiguredReadPreference(NODES)) - .find(query).iterator(); + MongoCollection nodes = documentStore.getDBCollection(NODES) + .withReadPreference(documentStore.getConfiguredReadPreference(NODES)); + MongoCursor cursor = documentStore.execute(session -> { + if (session != null) { + return nodes.find(session, query).iterator(); + } else { + return nodes.find(query).iterator(); + } + }); return CloseableIterator.wrap(transform(cursor, input -> documentStore.convertFromDBObject(NODES, input)), diff --git a/oak-store-document/src/main/java/org/apache/jackrabbit/oak/plugins/document/mongo/MongoBlobStore.java b/oak-store-document/src/main/java/org/apache/jackrabbit/oak/plugins/document/mongo/MongoBlobStore.java index 548e01b9d8..46b1483907 100644 --- a/oak-store-document/src/main/java/org/apache/jackrabbit/oak/plugins/document/mongo/MongoBlobStore.java +++ b/oak-store-document/src/main/java/org/apache/jackrabbit/oak/plugins/document/mongo/MongoBlobStore.java @@ -184,8 +184,12 @@ public int sweep() throws IOException { .noneMatch(COLLECTION_BLOBS::equals)) { db.createCollection(COLLECTION_BLOBS); } + // override the read preference configured with the MongoDB URI + // and use the primary as default. Reading a blob will still + // try a secondary first and then fallback to the primary. return db.getCollection(COLLECTION_BLOBS, MongoBlob.class) - .withCodecRegistry(CODEC_REGISTRY); + .withCodecRegistry(CODEC_REGISTRY) + .withReadPreference(primary()); } private MongoCollection getBlobCollection() { diff --git a/oak-store-document/src/main/java/org/apache/jackrabbit/oak/plugins/document/mongo/MongoDocumentStore.java b/oak-store-document/src/main/java/org/apache/jackrabbit/oak/plugins/document/mongo/MongoDocumentStore.java index 753a1e13ed..dcec433990 100644 --- a/oak-store-document/src/main/java/org/apache/jackrabbit/oak/plugins/document/mongo/MongoDocumentStore.java +++ b/oak-store-document/src/main/java/org/apache/jackrabbit/oak/plugins/document/mongo/MongoDocumentStore.java @@ -1918,7 +1918,7 @@ private boolean withClientSession() { * @return the result of the callable. * @throws DocumentStoreException if the callable throws an exception. 
*/ - private T execute(DocumentStoreCallable callable) + T execute(DocumentStoreCallable callable) throws DocumentStoreException { T result; if (withClientSession()) { diff --git a/oak-store-document/src/test/java/org/apache/jackrabbit/oak/plugins/document/AbstractMultiDocumentStoreTest.java b/oak-store-document/src/test/java/org/apache/jackrabbit/oak/plugins/document/AbstractMultiDocumentStoreTest.java index 5aa52fefd8..f352b2c4a6 100644 --- a/oak-store-document/src/test/java/org/apache/jackrabbit/oak/plugins/document/AbstractMultiDocumentStoreTest.java +++ b/oak-store-document/src/test/java/org/apache/jackrabbit/oak/plugins/document/AbstractMultiDocumentStoreTest.java @@ -18,6 +18,8 @@ import java.util.Collection; +import org.junit.AfterClass; +import org.junit.BeforeClass; import org.junit.runners.Parameterized; public abstract class AbstractMultiDocumentStoreTest extends AbstractDocumentStoreTest { @@ -30,6 +32,20 @@ public AbstractMultiDocumentStoreTest(DocumentStoreFixture dsf) { this.ds2 = dsf.createDocumentStore(2); } + @BeforeClass + public static void disableClientSession() { + // Disable the use of client session for this kind of tests. + // Most of these tests assume causal consistency across multiple + // DocumentStore instances, which is not the case when the test + // runs on a replica set and a client session is used. + System.setProperty("oak.mongo.clientSession", "false"); + } + + @AfterClass + public static void resetSystemProperty() { + System.clearProperty("oak.mongo.clientSession"); + } + @Override public void cleanUp() throws Exception { super.cleanUp(); From 7d51ee12b6fb1ff94541df486e71e378615f6203 Mon Sep 17 00:00:00 2001 From: Marcel Reutegger Date: Wed, 25 Apr 2018 10:10:35 +0200 Subject: [PATCH 04/14] OAK-6087: Avoid reads from MongoDB primary Work in progress --- .../oak/plugins/document/MongoBlobGCTest.java | 32 ++++++++++++++++++++-- 1 file changed, 30 insertions(+), 2 deletions(-) diff --git a/oak-store-document/src/test/java/org/apache/jackrabbit/oak/plugins/document/MongoBlobGCTest.java b/oak-store-document/src/test/java/org/apache/jackrabbit/oak/plugins/document/MongoBlobGCTest.java index 955ceabadc..840b2d9754 100644 --- a/oak-store-document/src/test/java/org/apache/jackrabbit/oak/plugins/document/MongoBlobGCTest.java +++ b/oak-store-document/src/test/java/org/apache/jackrabbit/oak/plugins/document/MongoBlobGCTest.java @@ -43,6 +43,8 @@ import com.google.common.io.Closeables; import com.mongodb.BasicDBObject; import com.mongodb.DBCollection; +import com.mongodb.ReadPreference; + import junit.framework.Assert; import org.apache.commons.io.filefilter.FileFilterUtils; import org.apache.jackrabbit.oak.api.Blob; @@ -56,6 +58,7 @@ import org.apache.jackrabbit.oak.plugins.blob.datastore.SharedDataStoreUtils; import org.apache.jackrabbit.oak.plugins.document.VersionGarbageCollector.VersionGCStats; import org.apache.jackrabbit.oak.plugins.document.mongo.MongoBlobReferenceIterator; +import org.apache.jackrabbit.oak.plugins.document.mongo.MongoDocumentStore; import org.apache.jackrabbit.oak.spi.cluster.ClusterRepositoryInfo; import org.apache.jackrabbit.oak.spi.blob.GarbageCollectableBlobStore; import org.apache.jackrabbit.oak.spi.commit.CommitInfo; @@ -72,6 +75,7 @@ .SharedStoreRecordType.REPOSITORY; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; /** * Tests for MongoMK GC @@ -87,8 +91,21 @@ public DataStoreState setUp(boolean deleteDirect) throws Exception { return setUp(deleteDirect, 
10); } + @Override + protected DocumentMK.Builder addToBuilder(DocumentMK.Builder mk) { + // Disable client session because this test modifies + // data directly in MongoDB. + // Disable lease check wrapper to get access to the MongoDocumentStore + return super.addToBuilder(mk) + .setClientSessionDisabled(true) + .setLeaseCheck(false); + } + public DataStoreState setUp(boolean deleteDirect, int count) throws Exception { DocumentNodeStore s = mk.getNodeStore(); + // ensure primary read preference for this test because we modify data + // directly in MongoDB without going through the MongoDocumentStore + setReadPreference(s, ReadPreference.primary()); NodeBuilder a = s.getRoot().builder(); int number = count; @@ -145,7 +162,15 @@ public DataStoreState setUp(boolean deleteDirect, int count) throws Exception { return state; } - + + private void setReadPreference(DocumentNodeStore s, ReadPreference pref) { + if (!(s.getDocumentStore() instanceof MongoDocumentStore)) { + fail("Not a MongoDocumentStore " + s.getDocumentStore()); + } + MongoDocumentStore store = (MongoDocumentStore) s.getDocumentStore(); + store.setReadWriteMode("readPreference=" + pref); + } + private class DataStoreState { Set blobsAdded = Sets.newHashSet(); Set blobsPresent = Sets.newHashSet(); @@ -284,7 +309,10 @@ public void consistencyCheckInit() throws Exception { public void consistencyCheckWithGc() throws Exception { DataStoreState state = setUp(true); Set existingAfterGC = gc(0); - assertTrue(Sets.symmetricDifference(state.blobsPresent, existingAfterGC).isEmpty()); + assertTrue("blobsAdded: " + state.blobsAdded + + ", blobsPresent: " + state.blobsPresent + + ", existingAfterGC: " + existingAfterGC, + Sets.symmetricDifference(state.blobsPresent, existingAfterGC).isEmpty()); ThreadPoolExecutor executor = (ThreadPoolExecutor) Executors.newFixedThreadPool(10); MarkSweepGarbageCollector gcObj = init(86400, executor); From abfcdb3d7924b85b374cd7d6985c8ac1375862a1 Mon Sep 17 00:00:00 2001 From: Marcel Reutegger Date: Wed, 25 Apr 2018 12:09:51 +0200 Subject: [PATCH 05/14] OAK-6087: Avoid reads from MongoDB primary Work in progress --- .../document/mongo/MongoBlobReferenceIterator.java | 13 +++-------- .../plugins/document/mongo/MongoDocumentStore.java | 2 +- .../document/mongo/MongoVersionGCSupport.java | 6 +---- .../document/blob/cloud/MongoCloudBlobGCTest.java | 26 +++++++++++----------- .../document/blob/ds/MongoDataStoreBlobGCTest.java | 18 +++++---------- 5 files changed, 24 insertions(+), 41 deletions(-) diff --git a/oak-store-document/src/main/java/org/apache/jackrabbit/oak/plugins/document/mongo/MongoBlobReferenceIterator.java b/oak-store-document/src/main/java/org/apache/jackrabbit/oak/plugins/document/mongo/MongoBlobReferenceIterator.java index 8ec3e2aaa1..2e8ad894ea 100644 --- a/oak-store-document/src/main/java/org/apache/jackrabbit/oak/plugins/document/mongo/MongoBlobReferenceIterator.java +++ b/oak-store-document/src/main/java/org/apache/jackrabbit/oak/plugins/document/mongo/MongoBlobReferenceIterator.java @@ -31,7 +31,6 @@ import org.bson.conversions.Bson; import com.mongodb.BasicDBObject; -import com.mongodb.client.MongoCollection; import com.mongodb.client.MongoCursor; import com.mongodb.client.model.Filters; @@ -48,15 +47,9 @@ public MongoBlobReferenceIterator(DocumentNodeStore nodeStore, @Override public Iterator getIteratorOverDocsWithBinaries() { Bson query = Filters.eq(NodeDocument.HAS_BINARY_FLAG, NodeDocument.HAS_BINARY_VAL); - MongoCollection nodes = documentStore.getDBCollection(NODES) - 
.withReadPreference(documentStore.getConfiguredReadPreference(NODES)); - MongoCursor cursor = documentStore.execute(session -> { - if (session != null) { - return nodes.find(session, query).iterator(); - } else { - return nodes.find(query).iterator(); - } - }); + // TODO It currently prefers secondary. Would that be Ok? + MongoCursor cursor = documentStore.getDBCollection(NODES) + .find(query).iterator(); return CloseableIterator.wrap(transform(cursor, input -> documentStore.convertFromDBObject(NODES, input)), diff --git a/oak-store-document/src/main/java/org/apache/jackrabbit/oak/plugins/document/mongo/MongoDocumentStore.java b/oak-store-document/src/main/java/org/apache/jackrabbit/oak/plugins/document/mongo/MongoDocumentStore.java index dcec433990..753a1e13ed 100644 --- a/oak-store-document/src/main/java/org/apache/jackrabbit/oak/plugins/document/mongo/MongoDocumentStore.java +++ b/oak-store-document/src/main/java/org/apache/jackrabbit/oak/plugins/document/mongo/MongoDocumentStore.java @@ -1918,7 +1918,7 @@ private boolean withClientSession() { * @return the result of the callable. * @throws DocumentStoreException if the callable throws an exception. */ - T execute(DocumentStoreCallable callable) + private T execute(DocumentStoreCallable callable) throws DocumentStoreException { T result; if (withClientSession()) { diff --git a/oak-store-document/src/main/java/org/apache/jackrabbit/oak/plugins/document/mongo/MongoVersionGCSupport.java b/oak-store-document/src/main/java/org/apache/jackrabbit/oak/plugins/document/mongo/MongoVersionGCSupport.java index 70ba9f6b49..1daa1a7d69 100644 --- a/oak-store-document/src/main/java/org/apache/jackrabbit/oak/plugins/document/mongo/MongoVersionGCSupport.java +++ b/oak-store-document/src/main/java/org/apache/jackrabbit/oak/plugins/document/mongo/MongoVersionGCSupport.java @@ -34,7 +34,6 @@ import com.google.common.collect.Lists; import com.mongodb.BasicDBObject; import com.mongodb.Block; -import com.mongodb.ReadPreference; import com.mongodb.client.FindIterable; import com.mongodb.client.MongoCollection; import com.mongodb.client.model.Filters; @@ -100,7 +99,6 @@ public MongoVersionGCSupport(MongoDocumentStore store) { Filters.lt(MODIFIED_IN_SECS, getModifiedInSecs(toModified)) ); FindIterable cursor = getNodeCollection() - .withReadPreference(ReadPreference.secondaryPreferred()) .find(query).batchSize(batchSize); return CloseableIterable.wrap(transform(cursor, @@ -110,9 +108,7 @@ public MongoVersionGCSupport(MongoDocumentStore store) { @Override public long getDeletedOnceCount() { Bson query = Filters.eq(DELETED_ONCE, Boolean.TRUE); - return getNodeCollection() - .withReadPreference(ReadPreference.secondaryPreferred()) - .count(query); + return getNodeCollection().count(query); } @Override diff --git a/oak-store-document/src/test/java/org/apache/jackrabbit/oak/plugins/document/blob/cloud/MongoCloudBlobGCTest.java b/oak-store-document/src/test/java/org/apache/jackrabbit/oak/plugins/document/blob/cloud/MongoCloudBlobGCTest.java index 082da235b9..5a507c4ba3 100644 --- a/oak-store-document/src/test/java/org/apache/jackrabbit/oak/plugins/document/blob/cloud/MongoCloudBlobGCTest.java +++ b/oak-store-document/src/test/java/org/apache/jackrabbit/oak/plugins/document/blob/cloud/MongoCloudBlobGCTest.java @@ -19,7 +19,7 @@ import org.apache.jackrabbit.oak.plugins.blob.cloud.CloudBlobStore; import org.apache.jackrabbit.oak.plugins.document.DocumentMK; import org.apache.jackrabbit.oak.plugins.document.MongoBlobGCTest; -import 
org.apache.jackrabbit.oak.plugins.document.MongoUtils; +import org.apache.jackrabbit.oak.spi.blob.BlobStore; import org.junit.After; import org.junit.Assume; import org.junit.Before; @@ -30,8 +30,11 @@ * */ public class MongoCloudBlobGCTest extends MongoBlobGCTest { + + private BlobStore blobStore; + @BeforeClass - public static void setUpBeforeClass() throws Exception { + public static void setUpBeforeClass() { try { Assume.assumeNotNull(CloudStoreUtils.getBlobStore()); } catch (Exception e) { @@ -40,20 +43,17 @@ public static void setUpBeforeClass() throws Exception { } @Before - @Override - public void setUpConnection() throws Exception { - mongoConnection = connectionFactory.getConnection(); - MongoUtils.dropCollections(mongoConnection.getDBName()); - mk = new DocumentMK.Builder() - .setMongoDB(mongoConnection.getMongoClient(), mongoConnection.getDBName()) - .setBlobStore(CloudStoreUtils.getBlobStore()).open(); + public void setUpBlobStore() throws Exception { + blobStore = CloudStoreUtils.getBlobStore(); } @After - @Override - public void tearDownConnection() throws Exception { + public void deleteBucket() { ((CloudBlobStore) mk.getNodeStore().getBlobStore()).deleteBucket(); - mk.dispose(); - MongoUtils.dropCollections(connectionFactory.getConnection().getDB()); + } + + @Override + protected DocumentMK.Builder addToBuilder(DocumentMK.Builder mk) { + return super.addToBuilder(mk).setBlobStore(blobStore); } } diff --git a/oak-store-document/src/test/java/org/apache/jackrabbit/oak/plugins/document/blob/ds/MongoDataStoreBlobGCTest.java b/oak-store-document/src/test/java/org/apache/jackrabbit/oak/plugins/document/blob/ds/MongoDataStoreBlobGCTest.java index 31fdd3ccd0..fafa72e57d 100644 --- a/oak-store-document/src/test/java/org/apache/jackrabbit/oak/plugins/document/blob/ds/MongoDataStoreBlobGCTest.java +++ b/oak-store-document/src/test/java/org/apache/jackrabbit/oak/plugins/document/blob/ds/MongoDataStoreBlobGCTest.java @@ -45,22 +45,16 @@ public static void setUpBeforeClass() throws Exception { } } + @Override + protected DocumentMK.Builder addToBuilder(DocumentMK.Builder mk) { + return super.addToBuilder(mk).setBlobStore(blobStore); + } + @Before @Override public void setUpConnection() throws Exception { startDate = new Date(); - mongoConnection = connectionFactory.getConnection(); - MongoUtils.dropCollections(mongoConnection.getDBName()); blobStore = DataStoreUtils.getBlobStore(folder.newFolder()); - mk = new DocumentMK.Builder().clock(getTestClock()) - .setMongoDB(mongoConnection.getMongoClient(), mongoConnection.getDBName()) - .setBlobStore(blobStore).open(); - } - - @After - @Override - public void tearDownConnection() throws Exception { - mk.dispose(); - MongoUtils.dropCollections(connectionFactory.getConnection().getDB()); + super.setUpConnection(); } } From 26ff8a215fe0ff5f491e969a7efb4d6d75044fd8 Mon Sep 17 00:00:00 2001 From: Marcel Reutegger Date: Wed, 25 Apr 2018 13:56:02 +0200 Subject: [PATCH 06/14] OAK-6087: Avoid reads from MongoDB primary Work in progress --- .../plugins/document/mongo/MongoBlobReferenceIterator.java | 3 ++- .../oak/plugins/document/BlobReferenceIteratorTest.java | 13 +++++++++++-- 2 files changed, 13 insertions(+), 3 deletions(-) diff --git a/oak-store-document/src/main/java/org/apache/jackrabbit/oak/plugins/document/mongo/MongoBlobReferenceIterator.java b/oak-store-document/src/main/java/org/apache/jackrabbit/oak/plugins/document/mongo/MongoBlobReferenceIterator.java index 2e8ad894ea..a1e4ff706a 100644 --- 
a/oak-store-document/src/main/java/org/apache/jackrabbit/oak/plugins/document/mongo/MongoBlobReferenceIterator.java +++ b/oak-store-document/src/main/java/org/apache/jackrabbit/oak/plugins/document/mongo/MongoBlobReferenceIterator.java @@ -31,6 +31,7 @@ import org.bson.conversions.Bson; import com.mongodb.BasicDBObject; +import com.mongodb.client.MongoCollection; import com.mongodb.client.MongoCursor; import com.mongodb.client.model.Filters; @@ -47,7 +48,7 @@ public MongoBlobReferenceIterator(DocumentNodeStore nodeStore, @Override public Iterator getIteratorOverDocsWithBinaries() { Bson query = Filters.eq(NodeDocument.HAS_BINARY_FLAG, NodeDocument.HAS_BINARY_VAL); - // TODO It currently prefers secondary. Would that be Ok? + // TODO It currently uses the configured read preference. Would that be Ok? MongoCursor cursor = documentStore.getDBCollection(NODES) .find(query).iterator(); diff --git a/oak-store-document/src/test/java/org/apache/jackrabbit/oak/plugins/document/BlobReferenceIteratorTest.java b/oak-store-document/src/test/java/org/apache/jackrabbit/oak/plugins/document/BlobReferenceIteratorTest.java index 70764bc333..f2334f1b72 100644 --- a/oak-store-document/src/test/java/org/apache/jackrabbit/oak/plugins/document/BlobReferenceIteratorTest.java +++ b/oak-store-document/src/test/java/org/apache/jackrabbit/oak/plugins/document/BlobReferenceIteratorTest.java @@ -25,6 +25,8 @@ import com.google.common.collect.ImmutableList; import com.google.common.collect.Lists; +import com.mongodb.ReadPreference; + import org.apache.jackrabbit.oak.api.Blob; import org.apache.jackrabbit.oak.plugins.blob.ReferencedBlob; import org.apache.jackrabbit.oak.spi.commit.CommitInfo; @@ -67,11 +69,12 @@ public BlobReferenceIteratorTest(DocumentStoreFixture fixture) { } @Before - public void setUp() throws InterruptedException { + public void setUp() { store = new DocumentMK.Builder() .setDocumentStore(fixture.createDocumentStore()) .setAsyncDelay(0) .getNodeStore(); + setReadPreference(store.getDocumentStore()); } @After @@ -95,6 +98,12 @@ public void testBlobIterator() throws Exception { List collectedBlobs = ImmutableList.copyOf(store.getReferencedBlobsIterator()); assertEquals(blobs.size(), collectedBlobs.size()); - assertEquals(new HashSet(blobs), new HashSet(collectedBlobs)); + assertEquals(new HashSet<>(blobs), new HashSet<>(collectedBlobs)); + } + + private void setReadPreference(DocumentStore store) { + // enforce primary read preference, otherwise test fails on a replica + // set with a read preference configured to secondary. 
+ store.setReadWriteMode("readPreference=" + ReadPreference.primary()); } } From 0dd328546b81b90e55640b1d96a2d3ca6f9d8c6c Mon Sep 17 00:00:00 2001 From: Marcel Reutegger Date: Wed, 25 Apr 2018 14:20:42 +0200 Subject: [PATCH 07/14] OAK-6087: Avoid reads from MongoDB primary Work in progress --- .../apache/jackrabbit/oak/plugins/document/MongoBlobGCTest.java | 9 +-------- 1 file changed, 1 insertion(+), 8 deletions(-) diff --git a/oak-store-document/src/test/java/org/apache/jackrabbit/oak/plugins/document/MongoBlobGCTest.java b/oak-store-document/src/test/java/org/apache/jackrabbit/oak/plugins/document/MongoBlobGCTest.java index 840b2d9754..7bb9cd3732 100644 --- a/oak-store-document/src/test/java/org/apache/jackrabbit/oak/plugins/document/MongoBlobGCTest.java +++ b/oak-store-document/src/test/java/org/apache/jackrabbit/oak/plugins/document/MongoBlobGCTest.java @@ -58,7 +58,6 @@ import org.apache.jackrabbit.oak.plugins.blob.datastore.SharedDataStoreUtils; import org.apache.jackrabbit.oak.plugins.document.VersionGarbageCollector.VersionGCStats; import org.apache.jackrabbit.oak.plugins.document.mongo.MongoBlobReferenceIterator; -import org.apache.jackrabbit.oak.plugins.document.mongo.MongoDocumentStore; import org.apache.jackrabbit.oak.spi.cluster.ClusterRepositoryInfo; import org.apache.jackrabbit.oak.spi.blob.GarbageCollectableBlobStore; import org.apache.jackrabbit.oak.spi.commit.CommitInfo; @@ -75,7 +74,6 @@ .SharedStoreRecordType.REPOSITORY; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertTrue; -import static org.junit.Assert.fail; /** * Tests for MongoMK GC @@ -95,7 +93,6 @@ public DataStoreState setUp(boolean deleteDirect) throws Exception { protected DocumentMK.Builder addToBuilder(DocumentMK.Builder mk) { // Disable client session because this test modifies // data directly in MongoDB. 
- // Disable lease check wrapper to get access to the MongoDocumentStore return super.addToBuilder(mk) .setClientSessionDisabled(true) .setLeaseCheck(false); @@ -164,11 +161,7 @@ public DataStoreState setUp(boolean deleteDirect, int count) throws Exception { } private void setReadPreference(DocumentNodeStore s, ReadPreference pref) { - if (!(s.getDocumentStore() instanceof MongoDocumentStore)) { - fail("Not a MongoDocumentStore " + s.getDocumentStore()); - } - MongoDocumentStore store = (MongoDocumentStore) s.getDocumentStore(); - store.setReadWriteMode("readPreference=" + pref); + s.getDocumentStore().setReadWriteMode("readPreference=" + pref); } private class DataStoreState { From a6086319b3f286473041cdf9c389192bdd0cc5a6 Mon Sep 17 00:00:00 2001 From: Marcel Reutegger Date: Wed, 25 Apr 2018 14:54:59 +0200 Subject: [PATCH 08/14] OAK-6087: Avoid reads from MongoDB primary Work in progress --- .../document/BlobReferenceIteratorTest.java | 13 ++++---- .../plugins/document/DocumentNodeStoreSweepIT.java | 8 +++++ .../oak/plugins/document/MongoBlobGCTest.java | 7 ++--- .../oak/plugins/document/mongo/MongoTestUtils.java | 35 ++++++++++++++++++++++ 4 files changed, 50 insertions(+), 13 deletions(-) create mode 100644 oak-store-document/src/test/java/org/apache/jackrabbit/oak/plugins/document/mongo/MongoTestUtils.java diff --git a/oak-store-document/src/test/java/org/apache/jackrabbit/oak/plugins/document/BlobReferenceIteratorTest.java b/oak-store-document/src/test/java/org/apache/jackrabbit/oak/plugins/document/BlobReferenceIteratorTest.java index f2334f1b72..20d0b39c42 100644 --- a/oak-store-document/src/test/java/org/apache/jackrabbit/oak/plugins/document/BlobReferenceIteratorTest.java +++ b/oak-store-document/src/test/java/org/apache/jackrabbit/oak/plugins/document/BlobReferenceIteratorTest.java @@ -29,6 +29,7 @@ import org.apache.jackrabbit.oak.api.Blob; import org.apache.jackrabbit.oak.plugins.blob.ReferencedBlob; +import org.apache.jackrabbit.oak.plugins.document.mongo.MongoTestUtils; import org.apache.jackrabbit.oak.spi.commit.CommitInfo; import org.apache.jackrabbit.oak.spi.commit.EmptyHook; import org.apache.jackrabbit.oak.spi.state.NodeBuilder; @@ -52,7 +53,7 @@ public BlobReferenceIteratorTest(DocumentStoreFixture fixture) { } @Parameterized.Parameters(name="{0}") - public static java.util.Collection fixtures() throws IOException { + public static java.util.Collection fixtures() { List fixtures = Lists.newArrayList(); fixtures.add(new Object[] { new DocumentStoreFixture.MemoryFixture() }); @@ -74,7 +75,9 @@ public void setUp() { .setDocumentStore(fixture.createDocumentStore()) .setAsyncDelay(0) .getNodeStore(); - setReadPreference(store.getDocumentStore()); + // enforce primary read preference, otherwise test fails on a replica + // set with a read preference configured to secondary. + MongoTestUtils.setReadPreference(store, ReadPreference.primary()); } @After @@ -100,10 +103,4 @@ public void testBlobIterator() throws Exception { assertEquals(blobs.size(), collectedBlobs.size()); assertEquals(new HashSet<>(blobs), new HashSet<>(collectedBlobs)); } - - private void setReadPreference(DocumentStore store) { - // enforce primary read preference, otherwise test fails on a replica - // set with a read preference configured to secondary. 
- store.setReadWriteMode("readPreference=" + ReadPreference.primary()); - } } diff --git a/oak-store-document/src/test/java/org/apache/jackrabbit/oak/plugins/document/DocumentNodeStoreSweepIT.java b/oak-store-document/src/test/java/org/apache/jackrabbit/oak/plugins/document/DocumentNodeStoreSweepIT.java index 6860b66cb8..81b652964e 100644 --- a/oak-store-document/src/test/java/org/apache/jackrabbit/oak/plugins/document/DocumentNodeStoreSweepIT.java +++ b/oak-store-document/src/test/java/org/apache/jackrabbit/oak/plugins/document/DocumentNodeStoreSweepIT.java @@ -18,7 +18,10 @@ import java.util.SortedMap; +import com.mongodb.ReadPreference; + import org.apache.jackrabbit.oak.api.CommitFailedException; +import org.apache.jackrabbit.oak.plugins.document.mongo.MongoTestUtils; import org.apache.jackrabbit.oak.plugins.document.util.Utils; import org.apache.jackrabbit.oak.spi.blob.BlobStore; import org.apache.jackrabbit.oak.spi.state.NodeBuilder; @@ -49,6 +52,11 @@ public DocumentNodeStoreSweepIT(DocumentStoreFixture fixture) { @Override protected DocumentStore customize(DocumentStore store) { + // Enforce primary read preference because this test assumes causal + // consistent reads across multiple document stores. Otherwise this + // test fails on a replica set with secondary read preference + MongoTestUtils.setReadPreference(store, ReadPreference.primary()); + DocumentStore s; if (store1 == null) { store1 = new FailingDocumentStore(store, 42); diff --git a/oak-store-document/src/test/java/org/apache/jackrabbit/oak/plugins/document/MongoBlobGCTest.java b/oak-store-document/src/test/java/org/apache/jackrabbit/oak/plugins/document/MongoBlobGCTest.java index 7bb9cd3732..56701e4f6c 100644 --- a/oak-store-document/src/test/java/org/apache/jackrabbit/oak/plugins/document/MongoBlobGCTest.java +++ b/oak-store-document/src/test/java/org/apache/jackrabbit/oak/plugins/document/MongoBlobGCTest.java @@ -58,6 +58,7 @@ import org.apache.jackrabbit.oak.plugins.blob.datastore.SharedDataStoreUtils; import org.apache.jackrabbit.oak.plugins.document.VersionGarbageCollector.VersionGCStats; import org.apache.jackrabbit.oak.plugins.document.mongo.MongoBlobReferenceIterator; +import org.apache.jackrabbit.oak.plugins.document.mongo.MongoTestUtils; import org.apache.jackrabbit.oak.spi.cluster.ClusterRepositoryInfo; import org.apache.jackrabbit.oak.spi.blob.GarbageCollectableBlobStore; import org.apache.jackrabbit.oak.spi.commit.CommitInfo; @@ -102,7 +103,7 @@ public DataStoreState setUp(boolean deleteDirect, int count) throws Exception { DocumentNodeStore s = mk.getNodeStore(); // ensure primary read preference for this test because we modify data // directly in MongoDB without going through the MongoDocumentStore - setReadPreference(s, ReadPreference.primary()); + MongoTestUtils.setReadPreference(s, ReadPreference.primary()); NodeBuilder a = s.getRoot().builder(); int number = count; @@ -160,10 +161,6 @@ public DataStoreState setUp(boolean deleteDirect, int count) throws Exception { return state; } - private void setReadPreference(DocumentNodeStore s, ReadPreference pref) { - s.getDocumentStore().setReadWriteMode("readPreference=" + pref); - } - private class DataStoreState { Set blobsAdded = Sets.newHashSet(); Set blobsPresent = Sets.newHashSet(); diff --git a/oak-store-document/src/test/java/org/apache/jackrabbit/oak/plugins/document/mongo/MongoTestUtils.java b/oak-store-document/src/test/java/org/apache/jackrabbit/oak/plugins/document/mongo/MongoTestUtils.java new file mode 100644 index 0000000000..8347019172 --- 
/dev/null +++ b/oak-store-document/src/test/java/org/apache/jackrabbit/oak/plugins/document/mongo/MongoTestUtils.java @@ -0,0 +1,35 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.jackrabbit.oak.plugins.document.mongo; + +import com.mongodb.ReadPreference; + +import org.apache.jackrabbit.oak.plugins.document.DocumentNodeStore; +import org.apache.jackrabbit.oak.plugins.document.DocumentStore; + +public final class MongoTestUtils { + + public static void setReadPreference(DocumentNodeStore ns, + ReadPreference preference) { + setReadPreference(ns.getDocumentStore(), preference); + } + + public static void setReadPreference(DocumentStore store, + ReadPreference preference) { + store.setReadWriteMode("readPreference=" + preference.getName()); + } +} From 9cb635ff6b3226c24897cc55ae7d8fa2075ac42b Mon Sep 17 00:00:00 2001 From: Marcel Reutegger Date: Wed, 25 Apr 2018 14:59:20 +0200 Subject: [PATCH 09/14] OAK-6087: Avoid reads from MongoDB primary Work in progress --- .travis.yml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/.travis.yml b/.travis.yml index e04ae287df..b41ae4ed7a 100644 --- a/.travis.yml +++ b/.travis.yml @@ -21,7 +21,7 @@ env: - MODULE=oak-jcr PROFILE="-PintegrationTesting" UT="-Dsurefire.skip.ut=true" MONGODB_MODE="--single" - MODULE=oak-jcr PROFILE="" UT="" MONGODB_MODE="--single" - MODULE=oak-store-document PROFILE="-PintegrationTesting" UT="" MONGODB_MODE="--single" - - MODULE=oak-store-document PROFILE="-PintegrationTesting -Preplicaset" UT="" MONGODB_MODE="--replicaset" + - MODULE=oak-store-document PROFILE="-PintegrationTesting,replicaset" UT="" MONGODB_MODE="--replicaset" - MODULE=oak-it PROFILE="-PintegrationTesting" UT="" MONGODB_MODE="--single" - MODULE=oak-lucene PROFILE="-PintegrationTesting" UT="" MONGODB_MODE="--single" - MODULE=oak-run PROFILE="-PintegrationTesting" UT="" MONGODB_MODE="--single" From 6b5f24f50255a2e89ff490a3a73677e3799d867a Mon Sep 17 00:00:00 2001 From: Marcel Reutegger Date: Wed, 25 Apr 2018 16:14:39 +0200 Subject: [PATCH 10/14] OAK-6087: Avoid reads from MongoDB primary Work in progress --- .../jackrabbit/oak/plugins/document/mongo/JournalIT.java | 11 +++++++++-- 1 file changed, 9 insertions(+), 2 deletions(-) diff --git a/oak-store-document/src/test/java/org/apache/jackrabbit/oak/plugins/document/mongo/JournalIT.java b/oak-store-document/src/test/java/org/apache/jackrabbit/oak/plugins/document/mongo/JournalIT.java index 5b797689ee..6cf8f75575 100644 --- a/oak-store-document/src/test/java/org/apache/jackrabbit/oak/plugins/document/mongo/JournalIT.java +++ b/oak-store-document/src/test/java/org/apache/jackrabbit/oak/plugins/document/mongo/JournalIT.java @@ -19,6 +19,8 @@ import java.util.List; import java.util.Set; +import com.mongodb.ReadPreference; + 
import org.apache.jackrabbit.oak.cache.CacheStats; import org.apache.jackrabbit.oak.plugins.document.AbstractJournalTest; import org.apache.jackrabbit.oak.plugins.document.DocumentMK; @@ -210,8 +212,13 @@ private void doLargeCleanupTest(int offset, int size) throws Exception { protected DocumentMK createMK(int clusterId, int asyncDelay) { MongoConnection c = connectionFactory.getConnection(); builder = newDocumentMKBuilder(); - return register(builder.setMongoDB(c.getMongoClient(), c.getDBName()) - .setClusterId(clusterId).setAsyncDelay(asyncDelay).setBundlingDisabled(true).open()); + DocumentMK mk = builder.setMongoDB(c.getMongoClient(), c.getDBName()) + .setClusterId(clusterId).setAsyncDelay(asyncDelay) + .setBundlingDisabled(true).open(); + // enforce primary read preference, otherwise test fails on a replica + // set with a read preference configured to secondary. + MongoTestUtils.setReadPreference(mk.getDocumentStore(), ReadPreference.primary()); + return register(mk); } private static long getCacheElementCount(DocumentStore ds) { From 82400695fc13cbb04cd819971f588b8bc4b174cb Mon Sep 17 00:00:00 2001 From: Marcel Reutegger Date: Wed, 25 Apr 2018 16:26:06 +0200 Subject: [PATCH 11/14] OAK-6087: Avoid reads from MongoDB primary Work in progress --- .../jackrabbit/oak/plugins/document/AbstractTwoNodeTest.java | 6 ++++++ .../jackrabbit/oak/plugins/document/DocumentNodeStoreSweepIT.java | 8 -------- 2 files changed, 6 insertions(+), 8 deletions(-) diff --git a/oak-store-document/src/test/java/org/apache/jackrabbit/oak/plugins/document/AbstractTwoNodeTest.java b/oak-store-document/src/test/java/org/apache/jackrabbit/oak/plugins/document/AbstractTwoNodeTest.java index 8722a1fa9d..70658941e8 100644 --- a/oak-store-document/src/test/java/org/apache/jackrabbit/oak/plugins/document/AbstractTwoNodeTest.java +++ b/oak-store-document/src/test/java/org/apache/jackrabbit/oak/plugins/document/AbstractTwoNodeTest.java @@ -22,7 +22,9 @@ import java.util.List; import com.google.common.collect.Lists; +import com.mongodb.ReadPreference; +import org.apache.jackrabbit.oak.plugins.document.mongo.MongoTestUtils; import org.apache.jackrabbit.oak.stats.Clock; import org.junit.After; import org.junit.Before; @@ -127,6 +129,10 @@ public void tearDown() throws Exception { } private static DocumentStore wrap(DocumentStore ds) { + // Enforce primary read preference because this test assumes causal + // consistent reads across multiple document stores. 
Otherwise this + // test fails on a replica set with secondary read preference + MongoTestUtils.setReadPreference(ds, ReadPreference.primary()); return new DocumentStoreTestWrapper(ds); } diff --git a/oak-store-document/src/test/java/org/apache/jackrabbit/oak/plugins/document/DocumentNodeStoreSweepIT.java b/oak-store-document/src/test/java/org/apache/jackrabbit/oak/plugins/document/DocumentNodeStoreSweepIT.java index 81b652964e..6860b66cb8 100644 --- a/oak-store-document/src/test/java/org/apache/jackrabbit/oak/plugins/document/DocumentNodeStoreSweepIT.java +++ b/oak-store-document/src/test/java/org/apache/jackrabbit/oak/plugins/document/DocumentNodeStoreSweepIT.java @@ -18,10 +18,7 @@ import java.util.SortedMap; -import com.mongodb.ReadPreference; - import org.apache.jackrabbit.oak.api.CommitFailedException; -import org.apache.jackrabbit.oak.plugins.document.mongo.MongoTestUtils; import org.apache.jackrabbit.oak.plugins.document.util.Utils; import org.apache.jackrabbit.oak.spi.blob.BlobStore; import org.apache.jackrabbit.oak.spi.state.NodeBuilder; @@ -52,11 +49,6 @@ public DocumentNodeStoreSweepIT(DocumentStoreFixture fixture) { @Override protected DocumentStore customize(DocumentStore store) { - // Enforce primary read preference because this test assumes causal - // consistent reads across multiple document stores. Otherwise this - // test fails on a replica set with secondary read preference - MongoTestUtils.setReadPreference(store, ReadPreference.primary()); - DocumentStore s; if (store1 == null) { store1 = new FailingDocumentStore(store, 42); From 9ac7b05e669a89f7e8b8bb49dfbd70d8c662c6c4 Mon Sep 17 00:00:00 2001 From: Marcel Reutegger Date: Wed, 25 Apr 2018 16:36:26 +0200 Subject: [PATCH 12/14] OAK-6087: Avoid reads from MongoDB primary Work in progress --- .../oak/plugins/document/mongo/CacheInvalidationIT.java | 8 +++++++- 1 file changed, 7 insertions(+), 1 deletion(-) diff --git a/oak-store-document/src/test/java/org/apache/jackrabbit/oak/plugins/document/mongo/CacheInvalidationIT.java b/oak-store-document/src/test/java/org/apache/jackrabbit/oak/plugins/document/mongo/CacheInvalidationIT.java index 1151dfac93..83b86da340 100644 --- a/oak-store-document/src/test/java/org/apache/jackrabbit/oak/plugins/document/mongo/CacheInvalidationIT.java +++ b/oak-store-document/src/test/java/org/apache/jackrabbit/oak/plugins/document/mongo/CacheInvalidationIT.java @@ -19,6 +19,8 @@ package org.apache.jackrabbit.oak.plugins.document.mongo; +import com.mongodb.ReadPreference; + import org.apache.jackrabbit.oak.api.CommitFailedException; import org.apache.jackrabbit.oak.commons.PathUtils; import org.apache.jackrabbit.oak.plugins.document.AbstractMongoConnectionTest; @@ -195,7 +197,7 @@ private static void runBgOps(DocumentNodeStore... stores) { private DocumentNodeStore createNS(int clusterId) throws Exception { MongoConnection mc = connectionFactory.getConnection(); - return new DocumentMK.Builder() + DocumentNodeStore ns = new DocumentMK.Builder() .setMongoDB(mc.getMongoClient(), mc.getDBName()) .setClusterId(clusterId) //Set delay to 0 so that effect of changes are immediately reflected @@ -203,6 +205,10 @@ private DocumentNodeStore createNS(int clusterId) throws Exception { .setBundlingDisabled(true) .setLeaseCheck(false) .getNodeStore(); + // enforce primary read preference, otherwise test fails on a replica + // set with a read preference configured to secondary. 
+ MongoTestUtils.setReadPreference(ns, ReadPreference.primary()); + return ns; } } From 904cfc0c44920c3156eccd4b9628400c56966d7e Mon Sep 17 00:00:00 2001 From: Marcel Reutegger Date: Wed, 25 Apr 2018 17:06:36 +0200 Subject: [PATCH 13/14] OAK-6087: Avoid reads from MongoDB primary Work in progress --- .../jackrabbit/oak/plugins/document/DocumentStoreFixture.java | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/oak-store-document/src/test/java/org/apache/jackrabbit/oak/plugins/document/DocumentStoreFixture.java b/oak-store-document/src/test/java/org/apache/jackrabbit/oak/plugins/document/DocumentStoreFixture.java index eee2eef85e..d907f59cfb 100644 --- a/oak-store-document/src/test/java/org/apache/jackrabbit/oak/plugins/document/DocumentStoreFixture.java +++ b/oak-store-document/src/test/java/org/apache/jackrabbit/oak/plugins/document/DocumentStoreFixture.java @@ -232,7 +232,11 @@ public void dispose() { } catch (Exception ignore) { } for (MongoConnection c : connections) { - c.close(); + try { + c.close(); + } catch (IllegalStateException e) { + // may happen when connection is already closed (OAK-7447) + } } connections.clear(); } From 8ab9997775b9f26a3877e188c34cde33b126cf1a Mon Sep 17 00:00:00 2001 From: Marcel Reutegger Date: Wed, 25 Apr 2018 17:38:36 +0200 Subject: [PATCH 14/14] OAK-6087: Avoid reads from MongoDB primary Work in progress --- .travis.yml | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/.travis.yml b/.travis.yml index b41ae4ed7a..48a0e72e2e 100644 --- a/.travis.yml +++ b/.travis.yml @@ -22,12 +22,12 @@ env: - MODULE=oak-jcr PROFILE="" UT="" MONGODB_MODE="--single" - MODULE=oak-store-document PROFILE="-PintegrationTesting" UT="" MONGODB_MODE="--single" - MODULE=oak-store-document PROFILE="-PintegrationTesting,replicaset" UT="" MONGODB_MODE="--replicaset" - - MODULE=oak-it PROFILE="-PintegrationTesting" UT="" MONGODB_MODE="--single" - MODULE=oak-lucene PROFILE="-PintegrationTesting" UT="" MONGODB_MODE="--single" + - MODULE=oak-it PROFILE="-PintegrationTesting" UT="" MONGODB_MODE="--single" - MODULE=oak-run PROFILE="-PintegrationTesting" UT="" MONGODB_MODE="--single" - - MODULE=oak-it-osgi PROFILE="-PintegrationTesting" UT="" MONGODB_MODE="--single" - - MODULE=oak-pojosr PROFILE="-PintegrationTesting" UT="" MONGODB_MODE="--single" - MODULE=oak-upgrade PROFILE="-PintegrationTesting" UT="" MONGODB_MODE="--single" + - MODULE=oak-pojosr PROFILE="-PintegrationTesting" UT="" MONGODB_MODE="--single" + - MODULE=oak-it-osgi PROFILE="-PintegrationTesting" UT="" MONGODB_MODE="--single" install: - wget -N http://fastdl.mongodb.org/linux/mongodb-linux-x86_64-${MONGODB}.tgz -P $HOME/.mongodb - tar --skip-old-files -C $HOME/.mongodb -xf $HOME/.mongodb/mongodb-linux-x86_64-${MONGODB}.tgz
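
For reference, a minimal sketch, not part of the patch series above: it shows how a test could force primary reads by combining the MongoTestUtils helper introduced in PATCH 08 with the DocumentMK builder calls already used in these patches. The class and method names below (ReadPreferenceExample, createPrimaryReadStore) are made up for illustration only.

    import com.mongodb.MongoClient;
    import com.mongodb.ReadPreference;

    import org.apache.jackrabbit.oak.plugins.document.DocumentMK;
    import org.apache.jackrabbit.oak.plugins.document.DocumentNodeStore;
    import org.apache.jackrabbit.oak.plugins.document.mongo.MongoTestUtils;

    public class ReadPreferenceExample {

        // Hypothetical test helper: builds a DocumentNodeStore and pins reads to
        // the primary, mirroring what JournalIT and CacheInvalidationIT do in the
        // patches above.
        static DocumentNodeStore createPrimaryReadStore(MongoClient client, String dbName) {
            DocumentNodeStore ns = new DocumentMK.Builder()
                    .setMongoDB(client, dbName)
                    .setAsyncDelay(0)
                    .getNodeStore();
            // Without this, tests that expect up-to-date reads can fail on a
            // replica set configured with a secondary read preference.
            MongoTestUtils.setReadPreference(ns, ReadPreference.primary());
            return ns;
        }
    }

Tests that additionally modify data directly in MongoDB, bypassing the MongoDocumentStore (for example MongoBlobGCTest), also call setClientSessionDisabled(true) on the builder, as noted in the comment added by PATCH 07.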