Index: oak-core/pom.xml =================================================================== --- oak-core/pom.xml (revision 1787146) +++ oak-core/pom.xml (working copy) @@ -38,6 +38,10 @@ maven-bundle-plugin + + com.mongodb*;resolution:=optional, + * + org.apache.jackrabbit.oak, org.apache.jackrabbit.oak.api, @@ -346,5 +350,11 @@ 1.1.1 test + + com.github.fakemongo + fongo + 1.6.8 + test + Index: oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/mongo/MongoDocumentStore.java =================================================================== --- oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/mongo/MongoDocumentStore.java (revision 1787146) +++ oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/mongo/MongoDocumentStore.java (working copy) @@ -43,6 +43,7 @@ import com.google.common.base.Stopwatch; import com.google.common.collect.ImmutableMap; +import com.google.common.collect.Iterables; import com.google.common.collect.Iterators; import com.google.common.collect.Lists; import com.google.common.util.concurrent.UncheckedExecutionException; @@ -786,7 +787,7 @@ } return oldDoc; } catch (Exception e) { - throw DocumentStoreException.convert(e); + throw handleException(e, collection, updateOp.getId()); } finally { if (lock != null) { lock.unlock(); @@ -886,6 +887,14 @@ results.put(op, oldDoc); } } + } catch (MongoException e) { + throw handleException(e, collection, Iterables.transform(updateOps, + new Function() { + @Override + public String apply(UpdateOp input) { + return input.getId(); + } + })); } finally { stats.doneCreateOrUpdate(watch.elapsed(TimeUnit.NANOSECONDS), collection, Lists.transform(updateOps, new Function() { @@ -1169,12 +1178,7 @@ } } } catch (MongoException e) { - // some documents may still have been updated - // invalidate all documents affected by this update call - for (String k : keys) { - nodesCache.invalidate(k); - } - throw DocumentStoreException.convert(e); + throw handleException(e, collection, keys); } } finally { stats.doneUpdate(watch.elapsed(TimeUnit.NANOSECONDS), collection, keys.size()); @@ -1582,6 +1586,23 @@ return diff; } + private DocumentStoreException handleException(Exception ex, + Collection collection, + Iterable ids) { + if (collection == Collection.NODES) { + for (String id : ids) { + invalidateCache(collection, id); + } + } + return DocumentStoreException.convert(ex); + } + + private DocumentStoreException handleException(Exception ex, + Collection collection, + String id) { + return handleException(ex, collection, Collections.singleton(id)); + } + private static class BulkUpdateResult { private final Set failedUpdates; Index: oak-core/src/test/java/com/mongodb/OakFongo.java =================================================================== --- oak-core/src/test/java/com/mongodb/OakFongo.java (nonexistent) +++ oak-core/src/test/java/com/mongodb/OakFongo.java (working copy) @@ -0,0 +1,223 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.mongodb; + +import java.lang.reflect.Field; +import java.util.List; +import java.util.Map; + +import com.github.fakemongo.Fongo; + +public class OakFongo extends Fongo { + + private final Map dbMap; + + public OakFongo(String name) throws Exception { + super(name); + this.dbMap = getDBMap(); + } + + @Override + public FongoDB getDB(String dbname) { + synchronized (dbMap) { + FongoDB fongoDb = dbMap.get(dbname); + if (fongoDb == null) { + try { + fongoDb = new OakFongoDB(this, dbname); + } catch (Exception e) { + throw new MongoException(e.getMessage(), e); + } + dbMap.put(dbname, fongoDb); + } + return fongoDb; + } + } + + @SuppressWarnings("unchecked") + private Map getDBMap() throws Exception { + Field f = Fongo.class.getDeclaredField("dbMap"); + f.setAccessible(true); + return (Map) f.get(this); + } + + protected void beforeInsert(List documents, + InsertOptions insertOptions) {} + + protected void afterInsert(WriteResult result) {} + + protected void beforeFindAndModify(DBObject query, + DBObject fields, + DBObject sort, + boolean remove, + DBObject update, + boolean returnNew, + boolean upsert) {} + + protected void afterFindAndModify(DBObject result) {} + + protected void beforeUpdate(DBObject q, + DBObject o, + boolean upsert, + boolean multi, + WriteConcern concern, + DBEncoder encoder) {} + + protected void afterUpdate(WriteResult result) {} + + protected void beforeRemove(DBObject query, WriteConcern writeConcern) {} + + protected void afterRemove(WriteResult result) {} + + protected void beforeExecuteBulkWriteOperation(boolean ordered, + Boolean bypassDocumentValidation, + List writeRequests, + WriteConcern aWriteConcern) {} + + protected void afterExecuteBulkWriteOperation(BulkWriteResult result) {} + private class OakFongoDB extends FongoDB { + + private final Map collMap; + + public OakFongoDB(Fongo fongo, String name) throws Exception { + super(fongo, name); + this.collMap = getCollMap(); + } + + @SuppressWarnings("unchecked") + private Map getCollMap() throws Exception { + Field f = FongoDB.class.getDeclaredField("collMap"); + f.setAccessible(true); + return (Map) f.get(this); + } + + @Override + public CommandResult command(DBObject cmd, + int options, + ReadPreference readPrefs) { + if (cmd.containsField("buildInfo")) { + CommandResult commandResult = okResult(); + commandResult.append("version", "2.6.0"); + return commandResult; + } else { + return super.command(cmd, options, readPrefs); + } + } + + @Override + public CommandResult command(DBObject cmd, + ReadPreference readPreference, + DBEncoder encoder) { + if (cmd.containsField("serverStatus")) { + CommandResult commandResult = okResult(); + commandResult.append("version", "2.6.0"); + return commandResult; + } else { + return super.command(cmd, readPreference, encoder); + } + } + + @Override + public synchronized FongoDBCollection doGetCollection(String name, + boolean idIsNotUniq) { + if (name.startsWith("system.")) { + return super.doGetCollection(name, idIsNotUniq); + } + FongoDBCollection coll = collMap.get(name); + if (coll == null) { + coll = new OakFongoDBCollection(this, name, idIsNotUniq); + collMap.put(name, coll); + } + return coll; + } + + private String asString(ServerVersion serverVersion) { + StringBuilder sb = new StringBuilder(); + for (int i : serverVersion.getVersionList()) { + if (sb.length() != 0) { + sb.append('.'); + } + sb.append(String.valueOf(i)); + } + return sb.toString(); + } + } + + private class OakFongoDBCollection extends FongoDBCollection { + + public OakFongoDBCollection(FongoDB db, + String name, + boolean idIsNotUniq) { + super(db, name, idIsNotUniq); + } + + @Override + public WriteResult insert(List documents, + WriteConcern concern, + DBEncoder encoder) { + beforeInsert(documents, null); + WriteResult result = super.insert(documents, concern, encoder); + afterInsert(result); + return result; + } + + @Override + public WriteResult remove(DBObject query, WriteConcern writeConcern) { + beforeRemove(query, writeConcern); + WriteResult result = super.remove(query, writeConcern); + afterRemove(result); + return result; + } + + @Override + public WriteResult update(DBObject q, + DBObject o, + boolean upsert, + boolean multi, + WriteConcern concern, + DBEncoder encoder) { + beforeUpdate(q, o, upsert, multi, concern, encoder); + WriteResult result = super.update(q, o, upsert, multi, concern, encoder); + afterUpdate(result); + return result; + } + + @Override + public DBObject findAndModify(DBObject query, + DBObject fields, + DBObject sort, + boolean remove, + DBObject update, + boolean returnNew, + boolean upsert) { + beforeFindAndModify(query, fields, sort, remove, update, returnNew, upsert); + DBObject result = super.findAndModify(query, fields, sort, remove, update, returnNew, upsert); + afterFindAndModify(result); + return result; + } + + @Override + BulkWriteResult executeBulkWriteOperation(boolean ordered, + Boolean bypassDocumentValidation, + List writeRequests, + WriteConcern aWriteConcern) { + beforeExecuteBulkWriteOperation(ordered, bypassDocumentValidation, writeRequests, aWriteConcern); + BulkWriteResult result = super.executeBulkWriteOperation(ordered, bypassDocumentValidation, writeRequests, aWriteConcern); + afterExecuteBulkWriteOperation(result); + return result; + } + } +} Property changes on: oak-core/src/test/java/com/mongodb/OakFongo.java ___________________________________________________________________ Added: svn:eol-style ## -0,0 +1 ## +native \ No newline at end of property Index: oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/mongo/MongoCacheConsistencyTest.java =================================================================== --- oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/mongo/MongoCacheConsistencyTest.java (nonexistent) +++ oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/mongo/MongoCacheConsistencyTest.java (working copy) @@ -0,0 +1,112 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.jackrabbit.oak.plugins.document.mongo; + +import java.util.List; + +import com.github.fakemongo.Fongo; +import com.mongodb.BulkWriteResult; +import com.mongodb.DBObject; +import com.mongodb.MongoException; +import com.mongodb.OakFongo; +import com.mongodb.WriteConcern; +import com.mongodb.WriteResult; + +import org.apache.jackrabbit.oak.plugins.document.CacheConsistencyTestBase; +import org.apache.jackrabbit.oak.plugins.document.DocumentMK; +import org.apache.jackrabbit.oak.plugins.document.DocumentMKBuilderProvider; +import org.apache.jackrabbit.oak.plugins.document.DocumentStore; +import org.apache.jackrabbit.oak.plugins.document.DocumentStoreFixture; +import org.junit.Rule; + +public class MongoCacheConsistencyTest extends CacheConsistencyTestBase { + + @Rule + public DocumentMKBuilderProvider provider = new DocumentMKBuilderProvider(); + + private String exceptionMsg; + + @Override + public DocumentStoreFixture getFixture() throws Exception { + Fongo fongo = new OakFongo("fongo") { + + private String suppressedEx = null; + + @Override + protected void afterInsert(WriteResult result) { + maybeThrow(); + } + + @Override + protected void afterFindAndModify(DBObject result) { + maybeThrow(); + } + + @Override + protected void afterUpdate(WriteResult result) { + maybeThrow(); + } + + @Override + protected void afterRemove(WriteResult result) { + maybeThrow(); + } + + @Override + protected void beforeExecuteBulkWriteOperation(boolean ordered, + Boolean bypassDocumentValidation, + List writeRequests, + WriteConcern aWriteConcern) { + // suppress potentially set exception message because + // fongo bulk writes call other update methods + suppressedEx = exceptionMsg; + exceptionMsg = null; + } + + @Override + protected void afterExecuteBulkWriteOperation(BulkWriteResult result) { + exceptionMsg = suppressedEx; + suppressedEx = null; + maybeThrow(); + } + + private void maybeThrow() { + if (exceptionMsg != null) { + throw new MongoException(exceptionMsg); + } + } + }; + DocumentMK.Builder builder = provider.newBuilder().setAsyncDelay(0); + final DocumentStore store = new MongoDocumentStore(fongo.getDB("oak"), builder); + return new DocumentStoreFixture() { + @Override + public String getName() { + return "MongoDB"; + } + + @Override + public DocumentStore createDocumentStore(int clusterId) { + return store; + } + }; + } + + @Override + public void setTemporaryUpdateException(String msg) { + this.exceptionMsg = msg; + } +} Property changes on: oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/mongo/MongoCacheConsistencyTest.java ___________________________________________________________________ Added: svn:eol-style ## -0,0 +1 ## +native \ No newline at end of property Index: . =================================================================== --- . (revision 1787146) +++ . (working copy) Property changes on: . ___________________________________________________________________ Modified: svn:mergeinfo ## -0,0 +0,1 ## Merged /jackrabbit/oak/trunk:r1760709,1761866