Index: oak-mongomk/src/main/java/org/apache/jackrabbit/mongomk/impl/command/CommitCommand.java =================================================================== --- oak-mongomk/src/main/java/org/apache/jackrabbit/mongomk/impl/command/CommitCommand.java (revision 1439852) +++ oak-mongomk/src/main/java/org/apache/jackrabbit/mongomk/impl/command/CommitCommand.java (working copy) @@ -17,6 +17,7 @@ package org.apache.jackrabbit.mongomk.impl.command; import java.util.ArrayList; +import java.util.Collection; import java.util.Collections; import java.util.HashMap; import java.util.HashSet; @@ -39,11 +40,13 @@ import org.apache.jackrabbit.mongomk.impl.model.MongoCommit; import org.apache.jackrabbit.mongomk.impl.model.MongoNode; import org.apache.jackrabbit.mongomk.impl.model.MongoSync; +import org.bson.types.ObjectId; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import com.mongodb.BasicDBObject; import com.mongodb.DBCollection; +import com.mongodb.DBCursor; import com.mongodb.DBObject; import com.mongodb.QueryBuilder; import com.mongodb.WriteConcern; @@ -326,45 +329,81 @@ return true; } - long assumedHeadRevision = this.mongoSync.getHeadRevisionId(); + long assumedHeadRevision = mongoSync.getHeadRevisionId(); MongoSync mongoSync = new SaveAndSetHeadRevisionAction(nodeStore, assumedHeadRevision, revisionId).execute(); - if (mongoSync == null) { - // There have been commit(s) in the meantime. If it's a conflicting - // update, retry the whole operation and count against number of retries. - // If not, need to retry again (in order to write commits and nodes properly) - // but don't count these retries against number of retries. - if (conflictingCommitsExist(assumedHeadRevision)) { - String message = String.format("Commit @%s: failed due to a conflicting commit." - + " Affected paths: %s", revisionId, commit.getAffectedPaths()); - logger.warn(message); - markAsFailed(); - throw new ConflictingCommitException(message); - } else { - logger.info("Commit @{}: failed due to a concurrent commit." + " Affected paths: {}", revisionId, commit.getAffectedPaths()); - markAsFailed(); - return false; - } + if (mongoSync == null) { // There have been concurrent commit(s). + handleConflictingCommits(assumedHeadRevision); + return false; } return true; } - private boolean conflictingCommitsExist(long baseRevisionId) { + private void handleConflictingCommits(long baseRevisionId) throws Exception { QueryBuilder queryBuilder = QueryBuilder.start(MongoCommit.KEY_FAILED).notEquals(Boolean.TRUE) .and(MongoCommit.KEY_BASE_REVISION_ID).is(baseRevisionId) .and(MongoCommit.KEY_REVISION_ID).greaterThan(0L) .and(MongoCommit.KEY_REVISION_ID).notEquals(revisionId); DBObject query = queryBuilder.get(); DBCollection collection = nodeStore.getCommitCollection(); - MongoCommit conflictingCommit = (MongoCommit)collection.findOne(query); - for (String affectedPath : conflictingCommit.getAffectedPaths()) { - if (affectedPaths.contains(affectedPath)) { - return true; + DBCursor dbCursor = collection.find(query); + while (dbCursor.hasNext()) { + MongoCommit conflictingCommit = (MongoCommit)dbCursor.next(); + for (String affectedPath : conflictingCommit.getAffectedPaths()) { + if (affectedPaths.contains(affectedPath)) { + // Conflicting concurrent commit. Retry the commit and count + // against number of retries. + String message = String.format("Commit @%s failed due to a concurrent conflicting Commit @%s" + + " Affected paths: %s", revisionId, conflictingCommit.getRevisionId(), commit.getAffectedPaths()); + logger.warn(message); + logger.debug("Commit @{} affected paths: {}. Commit @{} affected paths: {}", + new Object[]{revisionId, affectedPaths, conflictingCommit.getRevisionId(), conflictingCommit.getAffectedPaths()}); + cleanupCommitAndNodes(); //markAsFailed(); + throw new ConflictingCommitException(message); + } } } - return false; + // Non-conflicting concurrent commit. Retry the commit in order to write + // commit and nodes with proper revision/baseRevision ids but don't count + // against the number of retries. + logger.info("Commit @{} failed due to a concurrent but non-conflicting commit." + + " Affected paths: {}", revisionId, commit.getAffectedPaths()); + cleanupCommitAndNodes(); //markAsFailed(); } + private void cleanupCommitAndNodes() throws Exception { + + logger.debug("Cleaning up commit and nodes related to Commit @{}", revisionId); + + // Clean up the commit. + DBCollection collection = nodeStore.getCommitCollection(); + DBObject query = QueryBuilder.start("_id").is(commit.getObjectId("_id")).get(); + + WriteResult writeResult = collection.remove(query, WriteConcern.SAFE); + if (writeResult.getError() != null) { + throw new Exception(String.format("Remove wasn't successful: %s", writeResult)); + } + + // Collect ids for the nodes + Collection nodesCollection = nodes.values(); + int nodesSize = nodesCollection.size(); + DBObject[] nodesArray = nodesCollection.toArray(new DBObject[nodesSize]); + ObjectId[] objectIds = new ObjectId[nodesSize]; + for (int i = 0; i < nodesSize; i++) { + objectIds[i] = (ObjectId)nodesArray[i].get("_id"); + } + + // Clean up nodes. + collection = nodeStore.getNodeCollection(); + query = QueryBuilder.start("_id").in(objectIds).get(); + + writeResult = collection.remove(query, WriteConcern.SAFE); + if (writeResult.getError() != null) { + throw new Exception(String.format("Remove wasn't successful: %s", writeResult)); + } + } + + // FIXME - Remove if not used private void markAsFailed() throws Exception { DBCollection commitCollection = nodeStore.getCommitCollection(); DBObject query = QueryBuilder.start("_id").is(commit.getObjectId("_id")).get();