diff --git a/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/mongo/MongoDocumentStore.java b/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/mongo/MongoDocumentStore.java
index 8b4d9b2..d0586f9 100644
--- a/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/mongo/MongoDocumentStore.java
+++ b/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/mongo/MongoDocumentStore.java
@@ -21,10 +21,14 @@ import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.Date;
+import java.util.HashMap;
+import java.util.HashSet;
import java.util.Iterator;
+import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
+import java.util.Set;
import java.util.TreeMap;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
@@ -39,6 +43,7 @@ import javax.annotation.Nullable;
import com.google.common.base.Stopwatch;
import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.Iterators;
import com.google.common.collect.Lists;
import com.google.common.util.concurrent.UncheckedExecutionException;
import com.mongodb.MongoClientURI;
@@ -72,8 +77,14 @@ import org.apache.jackrabbit.oak.util.PerfLogger;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import com.google.common.base.Function;
import com.google.common.collect.Maps;
import com.mongodb.BasicDBObject;
+import com.mongodb.BulkWriteError;
+import com.mongodb.BulkWriteException;
+import com.mongodb.BulkWriteOperation;
+import com.mongodb.BulkWriteResult;
+import com.mongodb.BulkWriteUpsert;
import com.mongodb.DB;
import com.mongodb.DBCollection;
import com.mongodb.DBCursor;
@@ -84,8 +95,11 @@ import com.mongodb.WriteConcern;
import com.mongodb.WriteResult;
import static com.google.common.base.Preconditions.checkArgument;
+import static com.google.common.base.Predicates.in;
import static com.google.common.base.Predicates.notNull;
+import static com.google.common.collect.Maps.filterKeys;
import static com.google.common.collect.Maps.filterValues;
+import static com.google.common.collect.Sets.difference;
/**
* A document store that uses MongoDB as the backend.
@@ -162,6 +176,14 @@ public class MongoDocumentStore implements DocumentStore {
private long maxLockedQueryTimeMS =
Long.getLong("oak.mongo.maxLockedQueryTimeMS", TimeUnit.SECONDS.toMillis(3));
+ /**
+ * The number of documents to put into one bulk update.
+ *
+ * Default is 30.
+ */
+ private int bulkSize =
+ Integer.getInteger("oak.mongo.bulkSize", 30);
+
private String lastReadWriteMode;
private final Map<String, String> metadata;
@@ -706,7 +728,7 @@ public class MongoDocumentStore implements DocumentStore {
DBCollection dbCollection = getDBCollection(collection);
// make sure we don't modify the original updateOp
updateOp = updateOp.copy();
- DBObject update = createUpdate(updateOp);
+ DBObject update = createUpdate(updateOp, false);
Lock lock = null;
if (collection == Collection.NODES) {
@@ -797,13 +819,213 @@ public class MongoDocumentStore implements DocumentStore {
return doc;
}
+ /**
+ * Try to apply all the {@link UpdateOp}s with as few MongoDB requests as
+ * possible. The return value is the list of the old documents (before
+ * applying changes). The mechanism is as follows:
+ *
+ * <ol>
+ * <li>For each UpdateOp try to read the assigned document from the cache.
+ *     Add them to {@code oldDocs}.</li>
+ * <li>Prepare a list of all UpdateOps that don't have their documents yet and
+ *     read them in one find() call. Add the results to {@code oldDocs}.</li>
+ * <li>Prepare a bulk update. For each remaining UpdateOp add the following
+ *     operation:
+ *     <ul>
+ *     <li>Find the document with the same id and the same mod_count as in
+ *         {@code oldDocs}.</li>
+ *     <li>Apply the changes from the UpdateOp.</li>
+ *     </ul></li>
+ * <li>Execute the bulk update.</li>
+ * </ol>
+ *
+ * If another process modifies the target documents between points 2 and 3,
+ * their mod_count is increased as well and the bulk update fails for the
+ * concurrently modified docs. The method then removes the failed documents
+ * from {@code oldDocs} and restarts the process from point 2. It stops after
+ * the 3rd iteration.
+ */
+ @SuppressWarnings("unchecked")
+ @CheckForNull
@Override
public <T extends Document> List<T> createOrUpdate(Collection<T> collection, List<UpdateOp> updateOps) {
- List<T> result = new ArrayList<T>(updateOps.size());
- for (UpdateOp update : updateOps) {
- result.add(createOrUpdate(collection, update));
+ log("createOrUpdate", updateOps);
+
+ Map<String, UpdateOp> operationsToCover = new LinkedHashMap<String, UpdateOp>();
+ List<UpdateOp> duplicates = new ArrayList<UpdateOp>();
+ Map<UpdateOp, T> results = new LinkedHashMap<UpdateOp, T>();
+
+ final long start = PERFLOG.start();
+ try {
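+ // Dedupe by id: only the first UpdateOp for a given id goes into the bulk
+ // path; duplicates are applied one by one via createOrUpdate() at the end.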
+ for (UpdateOp updateOp : updateOps) {
+ UpdateUtils.assertUnconditional(updateOp);
+ UpdateOp clone = updateOp.copy();
+ if (operationsToCover.containsKey(updateOp.getId())) {
+ duplicates.add(clone);
+ } else {
+ operationsToCover.put(updateOp.getId(), clone);
+ }
+ results.put(clone, null);
+ }
+
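+ // For the NODES collection, seed oldDocs from the cache so cached documents
+ // don't have to be re-read from MongoDB.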
+ Map<String, T> oldDocs = new HashMap<String, T>();
+ if (collection == Collection.NODES) {
+ oldDocs.putAll((Map<String, T>) getCachedNodes(operationsToCover.keySet()));
+ }
+
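+ // Up to three rounds: each round sends the remaining operations in partitions
+ // of bulkSize. Operations that failed because of a concurrent modification stay
+ // in operationsToCover and are retried with freshly read documents.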
+ for (int i = 0; i < 3; i++) {
+ if (operationsToCover.size() <= 2) {
+ break; // bulkUpdate() method invokes Mongo twice, so sending 2 updates
+ // in bulk mode wouldn't result in any performance gain
+ }
+ for (List<UpdateOp> partition : Lists.partition(Lists.newArrayList(operationsToCover.values()), bulkSize)) {
+ Map<UpdateOp, T> successfulUpdates = bulkUpdate(collection, partition, oldDocs);
+ results.putAll(successfulUpdates);
+ operationsToCover.values().removeAll(successfulUpdates.keySet());
+ }
+ }
+
+ // if there are some changes left, we'll apply them one after another
+ Iterator<UpdateOp> it = Iterators.concat(operationsToCover.values().iterator(), duplicates.iterator());
+ while (it.hasNext()) {
+ UpdateOp op = it.next();
+ it.remove();
+ T oldDoc = createOrUpdate(collection, op);
+ if (oldDoc != null) {
+ results.put(op, oldDoc);
+ }
+ }
+ } finally {
+ PERFLOG.end(start, 1, "createOrUpdate [{}]");
}
- return result;
+ List<T> resultList = new ArrayList<T>(results.values());
+ log("createOrUpdate returns", resultList);
+ return resultList;
+ }
+
+ private Map<String, NodeDocument> getCachedNodes(Set<String> keys) {
+ Map<String, NodeDocument> nodes = new HashMap<String, NodeDocument>();
+ for (String key : keys) {
+ NodeDocument cached = nodesCache.getIfPresent(key);
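+ // NodeDocument.NULL marks a cached "document does not exist" entry; treat it as a miss here.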
+ if (cached != null && cached != NodeDocument.NULL) {
+ nodes.put(key, cached);
+ }
+ }
+ return nodes;
+ }
+
+ private <T extends Document> Map<UpdateOp, T> bulkUpdate(Collection<T> collection, List<UpdateOp> updateOperations, Map<String, T> oldDocs) {
+ Map<String, UpdateOp> bulkOperations = createMap(updateOperations);
+ Set<String> lackingDocs = difference(bulkOperations.keySet(), oldDocs.keySet());
+ oldDocs.putAll(findDocuments(collection, lackingDocs));
+
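+ // For the NODES collection, lock all affected ids while the bulk update and the
+ // cache refresh below run, so they don't interleave with concurrent writers.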
+ Lock lock = null;
+ if (collection == Collection.NODES) {
+ lock = nodeLocks.acquire(bulkOperations.keySet());
+ }
+
+ try {
+ BulkUpdateResult bulkResult = sendBulkUpdate(collection, bulkOperations.values(), oldDocs);
+
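+ // Refresh the nodes cache: upserted ids get a document built from the UpdateOp
+ // alone, successfully updated ids get the changes applied on top of the old document.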
+ if (collection == Collection.NODES) {
+ for (UpdateOp op : filterKeys(bulkOperations, in(bulkResult.upserts)).values()) {
+ NodeDocument doc = Collection.NODES.newDocument(this);
+ UpdateUtils.applyChanges(doc, op);
+ nodesCache.put(doc);
+ }
+
+ for (String key : difference(bulkOperations.keySet(), bulkResult.failedUpdates)) {
+ T oldDoc = oldDocs.get(key);
+ if (oldDoc != null) {
+ NodeDocument newDoc = (NodeDocument) applyChanges(collection, oldDoc, bulkOperations.get(key));
+ nodesCache.put(newDoc);
+ oldDoc.seal();
+ }
+ }
+ }
+ oldDocs.keySet().removeAll(bulkResult.failedUpdates);
+
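+ // Map each UpdateOp to its previous document: null for upserts, the old document
+ // for successful updates; failed ids are left out so the caller can retry them.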
+ Map<UpdateOp, T> result = new HashMap<UpdateOp, T>();
+ for (Entry<String, UpdateOp> entry : bulkOperations.entrySet()) {
+ if (bulkResult.failedUpdates.contains(entry.getKey())) {
+ continue;
+ } else if (bulkResult.upserts.contains(entry.getKey())) {
+ result.put(entry.getValue(), null);
+ } else {
+ result.put(entry.getValue(), oldDocs.get(entry.getKey()));
+ }
+ }
+ return result;
+ } finally {
+ if (lock != null) {
+ lock.unlock();
+ }
+ }
+ }
+
+ private static Map<String, UpdateOp> createMap(List<UpdateOp> updateOps) {
+ return Maps.uniqueIndex(updateOps, new Function<UpdateOp, String>() {
+ @Override
+ public String apply(UpdateOp input) {
+ return input.getId();
+ }
+ });
+ }
+
+ private <T extends Document> Map<String, T> findDocuments(Collection<T> collection, Set<String> keys) {
+ Map<String, T> docs = new HashMap<String, T>();
+ if (!keys.isEmpty()) {
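+ // Read all missing documents with a single find(), using an OR over the per-id queries.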
+ DBObject[] conditions = new DBObject[keys.size()];
+ int i = 0;
+ for (String key : keys) {
+ conditions[i++] = getByKeyQuery(key).get();
+ }
+
+ QueryBuilder builder = new QueryBuilder();
+ builder.or(conditions);
+ DBCursor cursor = getDBCollection(collection).find(builder.get());
+ while (cursor.hasNext()) {
+ T foundDoc = convertFromDBObject(collection, cursor.next());
+ docs.put(foundDoc.getId(), foundDoc);
+ }
+ }
+ return docs;
+ }
+
+ private <T extends Document> BulkUpdateResult sendBulkUpdate(Collection<T> collection,
+ java.util.Collection<UpdateOp> updateOps, Map<String, T> oldDocs) {
+ DBCollection dbCollection = getDBCollection(collection);
+ BulkWriteOperation bulk = dbCollection.initializeUnorderedBulkOperation();
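+ // Remember which id was submitted at each position so the indexes reported by
+ // BulkWriteError and BulkWriteUpsert can be mapped back to document ids.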
+ String[] bulkIds = new String[updateOps.size()];
+ int i = 0;
+ for (UpdateOp updateOp : updateOps) {
+ String id = updateOp.getId();
+ QueryBuilder query = createQueryForUpdate(id, updateOp.getConditions());
+ T oldDoc = oldDocs.get(id);
+ DBObject update;
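+ // No cached old document: only match a document without a mod_count, so with
+ // upsert() this effectively creates the document if it does not exist yet.
+ // Otherwise require the mod_count we read earlier (optimistic locking): a
+ // concurrent change makes this update fail instead of being overwritten.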
+ if (oldDoc == null) {
+ query.not().exists(Document.MOD_COUNT);
+ update = createUpdate(updateOp, true);
+ } else {
+ query.and(Document.MOD_COUNT).is(oldDoc.getModCount());
+ update = createUpdate(updateOp, false);
+ }
+ bulk.find(query.get()).upsert().update(update);
+ bulkIds[i++] = id;
+ }
+
+ BulkWriteResult bulkResult;
+ Set<String> failedUpdates = new HashSet<String>();
+ Set<String> upserts = new HashSet<String>();
+ try {
+ bulkResult = bulk.execute();
+ } catch (BulkWriteException e) {
+ bulkResult = e.getWriteResult();
+ for (BulkWriteError err : e.getWriteErrors()) {
+ failedUpdates.add(bulkIds[err.getIndex()]);
+ }
+ }
+ for (BulkWriteUpsert upsert : bulkResult.getUpserts()) {
+ upserts.add(bulkIds[upsert.getIndex()]);
+ }
+ return new BulkUpdateResult(failedUpdates, upserts);
}
@Override
@@ -903,7 +1125,7 @@ public class MongoDocumentStore implements DocumentStore {
QueryBuilder query = QueryBuilder.start(Document.ID).in(keys);
// make sure we don't modify the original updateOp
updateOp = updateOp.copy();
- DBObject update = createUpdate(updateOp);
+ DBObject update = createUpdate(updateOp, false);
final Stopwatch watch = startWatch();
try {
Map<String, NodeDocument> cachedDocs = Collections.emptyMap();
@@ -1144,7 +1366,11 @@ public class MongoDocumentStore implements DocumentStore {
}
@SuppressWarnings("unchecked")
T doc = (T) nodesCache.getIfPresent(key);
- return doc;
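+ // NodeDocument.NULL is a negative cache entry (document does not exist);
+ // never return it to callers.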
+ if (doc == NodeDocument.NULL) {
+ return null;
+ } else {
+ return doc;
+ }
}
@Nonnull
@@ -1175,10 +1401,11 @@ public class MongoDocumentStore implements DocumentStore {
* Creates a MongoDB update object from the given UpdateOp.
*
* @param updateOp the update op.
+ * @param includeId whether to include the SET operation for the id (_id) field
* @return the DBObject.
*/
@Nonnull
- private static DBObject createUpdate(UpdateOp updateOp) {
+ private static DBObject createUpdate(UpdateOp updateOp, boolean includeId) {
BasicDBObject setUpdates = new BasicDBObject();
BasicDBObject maxUpdates = new BasicDBObject();
BasicDBObject incUpdates = new BasicDBObject();
@@ -1190,7 +1417,7 @@ public class MongoDocumentStore implements DocumentStore {
// other updates
for (Entry<Key, Operation> entry : updateOp.getChanges().entrySet()) {
Key k = entry.getKey();
- if (k.getName().equals(Document.ID)) {
+ if (!includeId && k.getName().equals(Document.ID)) {
// avoid exception "Mod on _id not allowed"
continue;
}
@@ -1334,6 +1561,18 @@ public class MongoDocumentStore implements DocumentStore {
return diff;
}
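+ /**
+ * Outcome of one bulk call: the ids that failed because their mod_count
+ * changed concurrently, and the ids that were created (upserted).
+ */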
+ private static class BulkUpdateResult {
+
+ private final Set<String> failedUpdates;
+
+ private final Set<String> upserts;
+
+ private BulkUpdateResult(Set<String> failedUpdates, Set<String> upserts) {
+ this.failedUpdates = failedUpdates;
+ this.upserts = upserts;
+ }
+ }
+
private static class InvalidationResult implements CacheInvalidationStats {
int invalidationCount;
int upToDateCount;