Index: oak-core/pom.xml
===================================================================
--- oak-core/pom.xml (revision 1787146)
+++ oak-core/pom.xml (working copy)
@@ -38,6 +38,10 @@
maven-bundle-plugin
+
+ com.mongodb*;resolution:=optional,
+ *
+
org.apache.jackrabbit.oak,
org.apache.jackrabbit.oak.api,
@@ -346,5 +350,11 @@
1.1.1
test
+
+ com.github.fakemongo
+ fongo
+ 1.6.8
+ test
+
Index: oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/mongo/MongoDocumentStore.java
===================================================================
--- oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/mongo/MongoDocumentStore.java (revision 1787146)
+++ oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/mongo/MongoDocumentStore.java (working copy)
@@ -43,6 +43,7 @@
import com.google.common.base.Stopwatch;
import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.Iterables;
import com.google.common.collect.Iterators;
import com.google.common.collect.Lists;
import com.google.common.util.concurrent.UncheckedExecutionException;
@@ -786,7 +787,7 @@
}
return oldDoc;
} catch (Exception e) {
- throw DocumentStoreException.convert(e);
+ throw handleException(e, collection, updateOp.getId());
} finally {
if (lock != null) {
lock.unlock();
@@ -886,6 +887,14 @@
results.put(op, oldDoc);
}
}
+ } catch (MongoException e) {
+ throw handleException(e, collection, Iterables.transform(updateOps,
+ new Function() {
+ @Override
+ public String apply(UpdateOp input) {
+ return input.getId();
+ }
+ }));
} finally {
stats.doneCreateOrUpdate(watch.elapsed(TimeUnit.NANOSECONDS),
collection, Lists.transform(updateOps, new Function() {
@@ -1169,12 +1178,7 @@
}
}
} catch (MongoException e) {
- // some documents may still have been updated
- // invalidate all documents affected by this update call
- for (String k : keys) {
- nodesCache.invalidate(k);
- }
- throw DocumentStoreException.convert(e);
+ throw handleException(e, collection, keys);
}
} finally {
stats.doneUpdate(watch.elapsed(TimeUnit.NANOSECONDS), collection, keys.size());
@@ -1582,6 +1586,23 @@
return diff;
}
+ private DocumentStoreException handleException(Exception ex,
+ Collection collection,
+ Iterable ids) {
+ if (collection == Collection.NODES) {
+ for (String id : ids) {
+ invalidateCache(collection, id);
+ }
+ }
+ return DocumentStoreException.convert(ex);
+ }
+
+ private DocumentStoreException handleException(Exception ex,
+ Collection collection,
+ String id) {
+ return handleException(ex, collection, Collections.singleton(id));
+ }
+
private static class BulkUpdateResult {
private final Set failedUpdates;
Index: oak-core/src/test/java/com/mongodb/OakFongo.java
===================================================================
--- oak-core/src/test/java/com/mongodb/OakFongo.java (nonexistent)
+++ oak-core/src/test/java/com/mongodb/OakFongo.java (working copy)
@@ -0,0 +1,223 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.mongodb;
+
+import java.lang.reflect.Field;
+import java.util.List;
+import java.util.Map;
+
+import com.github.fakemongo.Fongo;
+
+public class OakFongo extends Fongo {
+
+ private final Map dbMap;
+
+ public OakFongo(String name) throws Exception {
+ super(name);
+ this.dbMap = getDBMap();
+ }
+
+ @Override
+ public FongoDB getDB(String dbname) {
+ synchronized (dbMap) {
+ FongoDB fongoDb = dbMap.get(dbname);
+ if (fongoDb == null) {
+ try {
+ fongoDb = new OakFongoDB(this, dbname);
+ } catch (Exception e) {
+ throw new MongoException(e.getMessage(), e);
+ }
+ dbMap.put(dbname, fongoDb);
+ }
+ return fongoDb;
+ }
+ }
+
+ @SuppressWarnings("unchecked")
+ private Map getDBMap() throws Exception {
+ Field f = Fongo.class.getDeclaredField("dbMap");
+ f.setAccessible(true);
+ return (Map) f.get(this);
+ }
+
+ protected void beforeInsert(List extends DBObject> documents,
+ InsertOptions insertOptions) {}
+
+ protected void afterInsert(WriteResult result) {}
+
+ protected void beforeFindAndModify(DBObject query,
+ DBObject fields,
+ DBObject sort,
+ boolean remove,
+ DBObject update,
+ boolean returnNew,
+ boolean upsert) {}
+
+ protected void afterFindAndModify(DBObject result) {}
+
+ protected void beforeUpdate(DBObject q,
+ DBObject o,
+ boolean upsert,
+ boolean multi,
+ WriteConcern concern,
+ DBEncoder encoder) {}
+
+ protected void afterUpdate(WriteResult result) {}
+
+ protected void beforeRemove(DBObject query, WriteConcern writeConcern) {}
+
+ protected void afterRemove(WriteResult result) {}
+
+ protected void beforeExecuteBulkWriteOperation(boolean ordered,
+ Boolean bypassDocumentValidation,
+ List> writeRequests,
+ WriteConcern aWriteConcern) {}
+
+ protected void afterExecuteBulkWriteOperation(BulkWriteResult result) {}
+ private class OakFongoDB extends FongoDB {
+
+ private final Map collMap;
+
+ public OakFongoDB(Fongo fongo, String name) throws Exception {
+ super(fongo, name);
+ this.collMap = getCollMap();
+ }
+
+ @SuppressWarnings("unchecked")
+ private Map getCollMap() throws Exception {
+ Field f = FongoDB.class.getDeclaredField("collMap");
+ f.setAccessible(true);
+ return (Map) f.get(this);
+ }
+
+ @Override
+ public CommandResult command(DBObject cmd,
+ int options,
+ ReadPreference readPrefs) {
+ if (cmd.containsField("buildInfo")) {
+ CommandResult commandResult = okResult();
+ commandResult.append("version", "2.6.0");
+ return commandResult;
+ } else {
+ return super.command(cmd, options, readPrefs);
+ }
+ }
+
+ @Override
+ public CommandResult command(DBObject cmd,
+ ReadPreference readPreference,
+ DBEncoder encoder) {
+ if (cmd.containsField("serverStatus")) {
+ CommandResult commandResult = okResult();
+ commandResult.append("version", "2.6.0");
+ return commandResult;
+ } else {
+ return super.command(cmd, readPreference, encoder);
+ }
+ }
+
+ @Override
+ public synchronized FongoDBCollection doGetCollection(String name,
+ boolean idIsNotUniq) {
+ if (name.startsWith("system.")) {
+ return super.doGetCollection(name, idIsNotUniq);
+ }
+ FongoDBCollection coll = collMap.get(name);
+ if (coll == null) {
+ coll = new OakFongoDBCollection(this, name, idIsNotUniq);
+ collMap.put(name, coll);
+ }
+ return coll;
+ }
+
+ private String asString(ServerVersion serverVersion) {
+ StringBuilder sb = new StringBuilder();
+ for (int i : serverVersion.getVersionList()) {
+ if (sb.length() != 0) {
+ sb.append('.');
+ }
+ sb.append(String.valueOf(i));
+ }
+ return sb.toString();
+ }
+ }
+
+ private class OakFongoDBCollection extends FongoDBCollection {
+
+ public OakFongoDBCollection(FongoDB db,
+ String name,
+ boolean idIsNotUniq) {
+ super(db, name, idIsNotUniq);
+ }
+
+ @Override
+ public WriteResult insert(List documents,
+ WriteConcern concern,
+ DBEncoder encoder) {
+ beforeInsert(documents, null);
+ WriteResult result = super.insert(documents, concern, encoder);
+ afterInsert(result);
+ return result;
+ }
+
+ @Override
+ public WriteResult remove(DBObject query, WriteConcern writeConcern) {
+ beforeRemove(query, writeConcern);
+ WriteResult result = super.remove(query, writeConcern);
+ afterRemove(result);
+ return result;
+ }
+
+ @Override
+ public WriteResult update(DBObject q,
+ DBObject o,
+ boolean upsert,
+ boolean multi,
+ WriteConcern concern,
+ DBEncoder encoder) {
+ beforeUpdate(q, o, upsert, multi, concern, encoder);
+ WriteResult result = super.update(q, o, upsert, multi, concern, encoder);
+ afterUpdate(result);
+ return result;
+ }
+
+ @Override
+ public DBObject findAndModify(DBObject query,
+ DBObject fields,
+ DBObject sort,
+ boolean remove,
+ DBObject update,
+ boolean returnNew,
+ boolean upsert) {
+ beforeFindAndModify(query, fields, sort, remove, update, returnNew, upsert);
+ DBObject result = super.findAndModify(query, fields, sort, remove, update, returnNew, upsert);
+ afterFindAndModify(result);
+ return result;
+ }
+
+ @Override
+ BulkWriteResult executeBulkWriteOperation(boolean ordered,
+ Boolean bypassDocumentValidation,
+ List writeRequests,
+ WriteConcern aWriteConcern) {
+ beforeExecuteBulkWriteOperation(ordered, bypassDocumentValidation, writeRequests, aWriteConcern);
+ BulkWriteResult result = super.executeBulkWriteOperation(ordered, bypassDocumentValidation, writeRequests, aWriteConcern);
+ afterExecuteBulkWriteOperation(result);
+ return result;
+ }
+ }
+}
Property changes on: oak-core/src/test/java/com/mongodb/OakFongo.java
___________________________________________________________________
Added: svn:eol-style
## -0,0 +1 ##
+native
\ No newline at end of property
Index: oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/mongo/MongoCacheConsistencyTest.java
===================================================================
--- oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/mongo/MongoCacheConsistencyTest.java (nonexistent)
+++ oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/mongo/MongoCacheConsistencyTest.java (working copy)
@@ -0,0 +1,112 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.jackrabbit.oak.plugins.document.mongo;
+
+import java.util.List;
+
+import com.github.fakemongo.Fongo;
+import com.mongodb.BulkWriteResult;
+import com.mongodb.DBObject;
+import com.mongodb.MongoException;
+import com.mongodb.OakFongo;
+import com.mongodb.WriteConcern;
+import com.mongodb.WriteResult;
+
+import org.apache.jackrabbit.oak.plugins.document.CacheConsistencyTestBase;
+import org.apache.jackrabbit.oak.plugins.document.DocumentMK;
+import org.apache.jackrabbit.oak.plugins.document.DocumentMKBuilderProvider;
+import org.apache.jackrabbit.oak.plugins.document.DocumentStore;
+import org.apache.jackrabbit.oak.plugins.document.DocumentStoreFixture;
+import org.junit.Rule;
+
+public class MongoCacheConsistencyTest extends CacheConsistencyTestBase {
+
+ @Rule
+ public DocumentMKBuilderProvider provider = new DocumentMKBuilderProvider();
+
+ private String exceptionMsg;
+
+ @Override
+ public DocumentStoreFixture getFixture() throws Exception {
+ Fongo fongo = new OakFongo("fongo") {
+
+ private String suppressedEx = null;
+
+ @Override
+ protected void afterInsert(WriteResult result) {
+ maybeThrow();
+ }
+
+ @Override
+ protected void afterFindAndModify(DBObject result) {
+ maybeThrow();
+ }
+
+ @Override
+ protected void afterUpdate(WriteResult result) {
+ maybeThrow();
+ }
+
+ @Override
+ protected void afterRemove(WriteResult result) {
+ maybeThrow();
+ }
+
+ @Override
+ protected void beforeExecuteBulkWriteOperation(boolean ordered,
+ Boolean bypassDocumentValidation,
+ List> writeRequests,
+ WriteConcern aWriteConcern) {
+ // suppress potentially set exception message because
+ // fongo bulk writes call other update methods
+ suppressedEx = exceptionMsg;
+ exceptionMsg = null;
+ }
+
+ @Override
+ protected void afterExecuteBulkWriteOperation(BulkWriteResult result) {
+ exceptionMsg = suppressedEx;
+ suppressedEx = null;
+ maybeThrow();
+ }
+
+ private void maybeThrow() {
+ if (exceptionMsg != null) {
+ throw new MongoException(exceptionMsg);
+ }
+ }
+ };
+ DocumentMK.Builder builder = provider.newBuilder().setAsyncDelay(0);
+ final DocumentStore store = new MongoDocumentStore(fongo.getDB("oak"), builder);
+ return new DocumentStoreFixture() {
+ @Override
+ public String getName() {
+ return "MongoDB";
+ }
+
+ @Override
+ public DocumentStore createDocumentStore(int clusterId) {
+ return store;
+ }
+ };
+ }
+
+ @Override
+ public void setTemporaryUpdateException(String msg) {
+ this.exceptionMsg = msg;
+ }
+}
Property changes on: oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/mongo/MongoCacheConsistencyTest.java
___________________________________________________________________
Added: svn:eol-style
## -0,0 +1 ##
+native
\ No newline at end of property
Index: .
===================================================================
--- . (revision 1787146)
+++ . (working copy)
Property changes on: .
___________________________________________________________________
Modified: svn:mergeinfo
## -0,0 +0,1 ##
Merged /jackrabbit/oak/trunk:r1760709,1761866