diff --git a/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/mongo/MongoDocumentStore.java b/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/mongo/MongoDocumentStore.java
index 8b4d9b2..d0586f9 100644
--- a/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/mongo/MongoDocumentStore.java
+++ b/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/mongo/MongoDocumentStore.java
@@ -21,10 +21,14 @@ import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collections;
 import java.util.Date;
+import java.util.HashMap;
+import java.util.HashSet;
 import java.util.Iterator;
+import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Map.Entry;
+import java.util.Set;
 import java.util.TreeMap;
 import java.util.concurrent.Callable;
 import java.util.concurrent.ExecutionException;
@@ -39,6 +43,7 @@ import javax.annotation.Nullable;
 
 import com.google.common.base.Stopwatch;
 import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.Iterators;
 import com.google.common.collect.Lists;
 import com.google.common.util.concurrent.UncheckedExecutionException;
 import com.mongodb.MongoClientURI;
@@ -72,8 +77,14 @@ import org.apache.jackrabbit.oak.util.PerfLogger;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import com.google.common.base.Function;
 import com.google.common.collect.Maps;
 import com.mongodb.BasicDBObject;
+import com.mongodb.BulkWriteError;
+import com.mongodb.BulkWriteException;
+import com.mongodb.BulkWriteOperation;
+import com.mongodb.BulkWriteResult;
+import com.mongodb.BulkWriteUpsert;
 import com.mongodb.DB;
 import com.mongodb.DBCollection;
 import com.mongodb.DBCursor;
@@ -84,8 +95,11 @@ import com.mongodb.WriteConcern;
 import com.mongodb.WriteResult;
 
 import static com.google.common.base.Preconditions.checkArgument;
+import static com.google.common.base.Predicates.in;
 import static com.google.common.base.Predicates.notNull;
+import static com.google.common.collect.Maps.filterKeys;
 import static com.google.common.collect.Maps.filterValues;
+import static com.google.common.collect.Sets.difference;
 
 /**
  * A document store that uses MongoDB as the backend.
@@ -162,6 +176,14 @@ public class MongoDocumentStore implements DocumentStore {
     private long maxLockedQueryTimeMS =
             Long.getLong("oak.mongo.maxLockedQueryTimeMS", TimeUnit.SECONDS.toMillis(3));
 
+    /**
+     * The number of documents to put into one bulk update.
+     * <p>
+     * Default is 30.
+     */
+    private int bulkSize =
+            Integer.getInteger("oak.mongo.bulkSize", 30);
+
     private String lastReadWriteMode;
 
     private final Map<String, String> metadata;
 
@@ -706,7 +728,7 @@ public class MongoDocumentStore implements DocumentStore {
         DBCollection dbCollection = getDBCollection(collection);
         // make sure we don't modify the original updateOp
         updateOp = updateOp.copy();
-        DBObject update = createUpdate(updateOp);
+        DBObject update = createUpdate(updateOp, false);
 
         Lock lock = null;
         if (collection == Collection.NODES) {
@@ -797,13 +819,213 @@ public class MongoDocumentStore implements DocumentStore {
         return doc;
     }
 
+    /**
+     * Try to apply all the {@link UpdateOp}s with as few MongoDB requests as
+     * possible. The return value is the list of the old documents (before
+     * applying changes). The mechanism is as follows:
+     *
+     * <ol>
+     * <li>For each UpdateOp try to read the assigned document from the cache.
+     *     Add them to {@code oldDocs}.</li>
+     * <li>Prepare a list of all UpdateOps that don't have their documents and
+     *     read them in one find() call. Add results to {@code oldDocs}.</li>
+     * <li>Prepare a bulk update. For each remaining UpdateOp add the following
+     *     operation:
+     *     <ul>
+     *     <li>Find the document with the same id and the same mod_count as in
+     *         {@code oldDocs}.</li>
+     *     <li>Apply the changes from the UpdateOp.</li>
+     *     </ul>
+     * </li>
+     * <li>Execute the bulk update.</li>
+     * </ol>
+     *
+     * If some other process modifies the target documents between points 2
+     * and 3, the mod_count will be increased as well and the bulk update will
+     * fail for the concurrently modified docs. The method will then remove
+     * the failed documents from {@code oldDocs} and restart the process from
+     * point 2. It will stop after the 3rd iteration.
+     */
+    @SuppressWarnings("unchecked")
+    @CheckForNull
     @Override
     public <T extends Document> List<T> createOrUpdate(Collection<T> collection, List<UpdateOp> updateOps) {
-        List<T> result = new ArrayList<T>(updateOps.size());
-        for (UpdateOp update : updateOps) {
-            result.add(createOrUpdate(collection, update));
+        log("createOrUpdate", updateOps);
+
+        Map<String, UpdateOp> operationsToCover = new LinkedHashMap<String, UpdateOp>();
+        List<UpdateOp> duplicates = new ArrayList<UpdateOp>();
+        Map<UpdateOp, T> results = new LinkedHashMap<UpdateOp, T>();
+
+        final long start = PERFLOG.start();
+        try {
+            for (UpdateOp updateOp : updateOps) {
+                UpdateUtils.assertUnconditional(updateOp);
+                UpdateOp clone = updateOp.copy();
+                if (operationsToCover.containsKey(updateOp.getId())) {
+                    duplicates.add(clone);
+                } else {
+                    operationsToCover.put(updateOp.getId(), clone);
+                }
+                results.put(clone, null);
+            }
+
+            Map<String, T> oldDocs = new HashMap<String, T>();
+            if (collection == Collection.NODES) {
+                oldDocs.putAll((Map) getCachedNodes(operationsToCover.keySet()));
+            }
+
+            for (int i = 0; i < 3; i++) {
+                if (operationsToCover.size() <= 2) {
+                    break; // bulkUpdate() method invokes Mongo twice, so sending 2 updates
+                           // in bulk mode wouldn't result in any performance gain
+                }
+                for (List<UpdateOp> partition : Lists.partition(Lists.newArrayList(operationsToCover.values()), bulkSize)) {
+                    Map<UpdateOp, T> successfulUpdates = bulkUpdate(collection, partition, oldDocs);
+                    results.putAll(successfulUpdates);
+                    operationsToCover.values().removeAll(successfulUpdates.keySet());
+                }
+            }
+
+            // if there are some changes left, we'll apply them one after another
+            Iterator<UpdateOp> it = Iterators.concat(operationsToCover.values().iterator(), duplicates.iterator());
+            while (it.hasNext()) {
+                UpdateOp op = it.next();
+                it.remove();
+                T oldDoc = createOrUpdate(collection, op);
+                if (oldDoc != null) {
+                    results.put(op, oldDoc);
+                }
+            }
+        } finally {
+            PERFLOG.end(start, 1, "createOrUpdate [{}]");
         }
-        return result;
+        List<T> resultList = new ArrayList<T>(results.values());
+        log("createOrUpdate returns", resultList);
+        return resultList;
+    }
+
+    private Map<String, NodeDocument> getCachedNodes(Set<String> keys) {
+        Map<String, NodeDocument> nodes = new HashMap<String, NodeDocument>();
+        for (String key : keys) {
+            NodeDocument cached = nodesCache.getIfPresent(key);
+            if (cached != null && cached != NodeDocument.NULL) {
+                nodes.put(key, cached);
+            }
+        }
+        return nodes;
+    }
+
+    private <T extends Document> Map<UpdateOp, T> bulkUpdate(Collection<T> collection, List<UpdateOp> updateOperations, Map<String, T> oldDocs) {
+        Map<String, UpdateOp> bulkOperations = createMap(updateOperations);
+        Set<String> lackingDocs = difference(bulkOperations.keySet(), oldDocs.keySet());
+        oldDocs.putAll(findDocuments(collection, lackingDocs));
+
+        Lock lock = null;
+        if (collection == Collection.NODES) {
+            lock = nodeLocks.acquire(bulkOperations.keySet());
+        }
+
+        try {
+            BulkUpdateResult bulkResult = sendBulkUpdate(collection, bulkOperations.values(), oldDocs);
+
+            if (collection == Collection.NODES) {
+                for (UpdateOp op : filterKeys(bulkOperations, in(bulkResult.upserts)).values()) {
+                    NodeDocument doc = Collection.NODES.newDocument(this);
+                    UpdateUtils.applyChanges(doc, op);
+                    nodesCache.put(doc);
+                }
+
+                for (String key : difference(bulkOperations.keySet(), bulkResult.failedUpdates)) {
+                    T oldDoc = oldDocs.get(key);
+                    if (oldDoc != null) {
+                        NodeDocument newDoc = (NodeDocument) applyChanges(collection, oldDoc, bulkOperations.get(key));
+                        nodesCache.put(newDoc);
+                        oldDoc.seal();
+                    }
+                }
+            }
+            oldDocs.keySet().removeAll(bulkResult.failedUpdates);
+
+            Map<UpdateOp, T> result = new HashMap<UpdateOp, T>();
+            for (Entry<String, UpdateOp> entry : bulkOperations.entrySet()) {
+                if (bulkResult.failedUpdates.contains(entry.getKey())) {
+                    continue;
+                } else if (bulkResult.upserts.contains(entry.getKey())) {
+                    result.put(entry.getValue(), null);
+                } else {
+                    result.put(entry.getValue(), oldDocs.get(entry.getKey()));
+                }
+            }
+            return result;
+        } finally {
+            if (lock != null) {
+                lock.unlock();
+            }
+        }
+    }
+
+    private static Map<String, UpdateOp> createMap(List<UpdateOp> updateOps) {
+        return Maps.uniqueIndex(updateOps, new Function<UpdateOp, String>() {
+            @Override
+            public String apply(UpdateOp input) {
+                return input.getId();
+            }
+        });
+    }
+
+    private <T extends Document> Map<String, T> findDocuments(Collection<T> collection, Set<String> keys) {
+        Map<String, T> docs = new HashMap<String, T>();
+        if (!keys.isEmpty()) {
+            DBObject[] conditions = new DBObject[keys.size()];
+            int i = 0;
+            for (String key : keys) {
+                conditions[i++] = getByKeyQuery(key).get();
+            }
+
+            QueryBuilder builder = new QueryBuilder();
+            builder.or(conditions);
+            DBCursor cursor = getDBCollection(collection).find(builder.get());
+            while (cursor.hasNext()) {
+                T foundDoc = convertFromDBObject(collection, cursor.next());
+                docs.put(foundDoc.getId(), foundDoc);
+            }
+        }
+        return docs;
+    }
+
+    private <T extends Document> BulkUpdateResult sendBulkUpdate(Collection<T> collection,
+            java.util.Collection<UpdateOp> updateOps, Map<String, T> oldDocs) {
+        DBCollection dbCollection = getDBCollection(collection);
+        BulkWriteOperation bulk = dbCollection.initializeUnorderedBulkOperation();
+        String[] bulkIds = new String[updateOps.size()];
+        int i = 0;
+        for (UpdateOp updateOp : updateOps) {
+            String id = updateOp.getId();
+            QueryBuilder query = createQueryForUpdate(id, updateOp.getConditions());
+            T oldDoc = oldDocs.get(id);
+            DBObject update;
+            if (oldDoc == null) {
+                query.not().exists(Document.MOD_COUNT);
+                update = createUpdate(updateOp, true);
+            } else {
+                query.and(Document.MOD_COUNT).is(oldDoc.getModCount());
+                update = createUpdate(updateOp, false);
+            }
+            bulk.find(query.get()).upsert().update(update);
+            bulkIds[i++] = id;
+        }
+
+        BulkWriteResult bulkResult;
+        Set<String> failedUpdates = new HashSet<String>();
+        Set<String> upserts = new HashSet<String>();
+        try {
+            bulkResult = bulk.execute();
+        } catch (BulkWriteException e) {
+            bulkResult = e.getWriteResult();
+            for (BulkWriteError err : e.getWriteErrors()) {
+                failedUpdates.add(bulkIds[err.getIndex()]);
+            }
+        }
+        for (BulkWriteUpsert upsert : bulkResult.getUpserts()) {
+            upserts.add(bulkIds[upsert.getIndex()]);
+        }
+        return new BulkUpdateResult(failedUpdates, upserts);
     }
 
     @Override
@@ -903,7 +1125,7 @@ public class MongoDocumentStore implements DocumentStore {
         QueryBuilder query = QueryBuilder.start(Document.ID).in(keys);
         // make sure we don't modify the original updateOp
         updateOp = updateOp.copy();
-        DBObject update = createUpdate(updateOp);
+        DBObject update = createUpdate(updateOp, false);
         final Stopwatch watch = startWatch();
         try {
             Map<String, NodeDocument> cachedDocs = Collections.emptyMap();
@@ -1144,7 +1366,11 @@ public class MongoDocumentStore implements DocumentStore {
         }
         @SuppressWarnings("unchecked")
         T doc = (T) nodesCache.getIfPresent(key);
-        return doc;
+        if (doc == NodeDocument.NULL) {
+            return null;
+        } else {
+            return doc;
+        }
     }
 
     @Nonnull
@@ -1175,10 +1401,11 @@ public class MongoDocumentStore implements DocumentStore {
      * Creates a MongoDB update object from the given UpdateOp.
      *
      * @param updateOp the update op.
+     * @param includeId whether to include the SET id operation
      * @return the DBObject.
      */
     @Nonnull
-    private static DBObject createUpdate(UpdateOp updateOp) {
+    private static DBObject createUpdate(UpdateOp updateOp, boolean includeId) {
         BasicDBObject setUpdates = new BasicDBObject();
         BasicDBObject maxUpdates = new BasicDBObject();
         BasicDBObject incUpdates = new BasicDBObject();
@@ -1190,7 +1417,7 @@ public class MongoDocumentStore implements DocumentStore {
         // other updates
         for (Entry<Key, Operation> entry : updateOp.getChanges().entrySet()) {
             Key k = entry.getKey();
-            if (k.getName().equals(Document.ID)) {
+            if (!includeId && k.getName().equals(Document.ID)) {
                 // avoid exception "Mod on _id not allowed"
                 continue;
             }
@@ -1334,6 +1561,18 @@ public class MongoDocumentStore implements DocumentStore {
         return diff;
     }
 
+    private static class BulkUpdateResult {
+
+        private final Set<String> failedUpdates;
+
+        private final Set<String> upserts;
+
+        private BulkUpdateResult(Set<String> failedUpdates, Set<String> upserts) {
+            this.failedUpdates = failedUpdates;
+            this.upserts = upserts;
+        }
+    }
+
     private static class InvalidationResult implements CacheInvalidationStats {
         int invalidationCount;
         int upToDateCount;
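
For reference, the core idea in sendBulkUpdate() is per-document optimistic locking: every entry of the unordered bulk matches on _id plus the _modCount the caller last read, so a concurrent writer (which bumps _modCount) makes the entry miss its document and, because the entry is an upsert, the driver then attempts an insert that collides on the _id index and is reported as a BulkWriteError whose index maps back to the affected id. The sketch below shows only that pattern with the same legacy MongoDB Java driver API; the collection, field names and the expectedModCounts map are illustrative assumptions, not part of this patch.

// Hypothetical, self-contained illustration of the mod_count check used above; not Oak code.
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

import com.mongodb.BasicDBObject;
import com.mongodb.BulkWriteError;
import com.mongodb.BulkWriteException;
import com.mongodb.BulkWriteOperation;
import com.mongodb.DBCollection;
import com.mongodb.QueryBuilder;

public class OptimisticBulkUpdateSketch {

    /**
     * Marks each document as touched and bumps its _modCount, but only if it
     * still carries the _modCount the caller read earlier. Returns the ids
     * whose update was rejected because of a concurrent modification.
     */
    public static Set<String> update(DBCollection collection,
                                     Map<String, Long> expectedModCounts) {
        BulkWriteOperation bulk = collection.initializeUnorderedBulkOperation();
        List<String> ids = new ArrayList<String>(expectedModCounts.keySet());

        for (String id : ids) {
            // match only the unchanged revision of the document
            QueryBuilder query = QueryBuilder.start("_id").is(id)
                    .and("_modCount").is(expectedModCounts.get(id));
            BasicDBObject update = new BasicDBObject()
                    .append("$inc", new BasicDBObject("_modCount", 1L))
                    .append("$set", new BasicDBObject("touched", true));
            // upsert: a concurrently modified document no longer matches the
            // query, so the driver tries to insert a duplicate _id and the
            // entry fails loudly instead of being a silent no-op
            bulk.find(query.get()).upsert().update(update);
        }

        Set<String> failed = new HashSet<String>();
        try {
            bulk.execute();
        } catch (BulkWriteException e) {
            for (BulkWriteError error : e.getWriteErrors()) {
                failed.add(ids.get(error.getIndex())); // error index -> id
            }
        }
        return failed;
    }
}

The patch wraps this primitive in up to three rounds: ids reported as failed are re-read, their UpdateOps retried in the next bulk, and anything still unresolved (plus duplicated ids) falls back to the single-document createOrUpdate() path.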