Index: src/main/java/org/apache/jackrabbit/mongomk/impl/command/CommitCommand.java =================================================================== --- src/main/java/org/apache/jackrabbit/mongomk/impl/command/CommitCommand.java (revision 1439323) +++ src/main/java/org/apache/jackrabbit/mongomk/impl/command/CommitCommand.java (working copy) @@ -97,10 +97,9 @@ prepareCommit(); readAndMergeExistingNodes(); prepareMongoNodes(); - new SaveNodesAction(nodeStore, nodes.values()).execute(); - new SaveCommitAction(nodeStore, commit).execute(); - success = saveAndSetHeadRevision(); + success = saveNodesAndCommits(); if (success) { + saveAndSetHeadRevision(); cacheNodes(); } else { retries++; @@ -109,12 +108,23 @@ String msg = "Commit @{}: success"; if (retries > 0) { - msg += " with {} retries."; + msg += " with {} internal retries."; } logger.debug(msg, revisionId, retries); return revisionId; } + private boolean saveNodesAndCommits() throws Exception { + long headRevisionId = new FetchHeadRevisionIdAction(nodeStore, branchId).execute(); + if (branchId == null && headRevisionId != mongoSync.getHeadRevisionId()) { + // Head revision moved on in trunk in the meantime, no need to save + return false; + } + new SaveNodesAction(nodeStore, nodes.values()).execute(); + new SaveCommitAction(nodeStore, commit).execute(); + return true; + } + @Override public int getNumOfRetries() { return 100; @@ -184,7 +194,6 @@ } } - // private void readExistingNodes() { // FetchNodesAction action = new FetchNodesAction(nodeStore, affectedPaths, // mongoSync.getHeadRevisionId()); @@ -309,36 +318,23 @@ * @return True if the operation was successful. * @throws Exception If an exception happens. */ - protected boolean saveAndSetHeadRevision() throws Exception { + protected void saveAndSetHeadRevision() throws Exception { // Don't update the head revision id for branches. if (branchId != null) { - return true; + return; } long assumedHeadRevision = this.mongoSync.getHeadRevisionId(); MongoSync mongoSync = new SaveAndSetHeadRevisionAction(nodeStore, assumedHeadRevision, revisionId).execute(); if (mongoSync == null) { - // There have been commit(s) in the meantime. If it's a conflicting - // update, retry the whole operation and count against number of retries. - // If not, need to retry again (in order to write commits and nodes properly) - // but don't count these retries against number of retries. - if (conflictingCommitsExist(assumedHeadRevision)) { - String message = String.format("Commit @%s: failed due to a conflicting commit." - + " Affected paths: %s", revisionId, commit.getAffectedPaths()); - logger.warn(message); - markAsFailed(); - throw new ConflictingCommitException(message); - } else { - logger.info("Commit @{}: failed due to a concurrent commit." + " Affected paths: {}", revisionId, commit.getAffectedPaths()); - markAsFailed(); - return false; - } + logger.debug("Commit @{} encountered a concurrent commit", revisionId); + checkConflictingCommit(assumedHeadRevision); } - return true; } - private boolean conflictingCommitsExist(long baseRevisionId) { + private void checkConflictingCommit(long baseRevisionId) throws Exception { + // Find the commit that's based on the same revision. QueryBuilder queryBuilder = QueryBuilder.start(MongoCommit.KEY_FAILED).notEquals(Boolean.TRUE) .and(MongoCommit.KEY_BASE_REVISION_ID).is(baseRevisionId) .and(MongoCommit.KEY_REVISION_ID).greaterThan(0L) @@ -346,12 +342,47 @@ DBObject query = queryBuilder.get(); DBCollection collection = nodeStore.getCommitCollection(); MongoCommit conflictingCommit = (MongoCommit)collection.findOne(query); + long conflictingRevision = conflictingCommit.getRevisionId(); + + logger.debug("Commit @{} checking against concurrent Commit @{}", revisionId, conflictingRevision); for (String affectedPath : conflictingCommit.getAffectedPaths()) { if (affectedPaths.contains(affectedPath)) { - return true; + String message = String.format("Commit @%s failed due to concurrent conflicting Commit @%s", + revisionId, conflictingRevision); + logger.warn(message); + logger.debug("Commit @{} affected paths: {}. Commit @{} affected paths: ", + new Object[]{revisionId, commit.getAffectedPaths(), conflictingRevision, + conflictingCommit.getAffectedPaths()}); + markAsFailed(); + throw new ConflictingCommitException(message); } } - return false; + + logger.debug("Commit @{} and concurrent Commit @{} are not conflicting", + revisionId, conflictingRevision); + //adjustBaseRevisionId(conflictingCommit, baseRevisionId, revisionId); + if (revisionId < conflictingRevision) { + adjustBaseRevisionId(conflictingCommit, baseRevisionId, revisionId); + } else { + adjustBaseRevisionId(commit, baseRevisionId, conflictingRevision); + } + } + + private void adjustBaseRevisionId(MongoCommit commit, long oldBaseRevId, + long newBaseRevId) throws Exception { + DBCollection commitCollection = nodeStore.getCommitCollection(); + DBObject query = QueryBuilder.start("_id").is(commit.getObjectId("_id")).get(); + DBObject update = new BasicDBObject("$set", new BasicDBObject(MongoCommit.KEY_BASE_REVISION_ID, + newBaseRevId)); + WriteResult writeResult = commitCollection.update(query, update, + false /*upsert*/, false /*multi*/, WriteConcern.SAFE); + logger.debug("Adjusting Commit @{}'s base revision id from {} to {}", + new Object[] {commit.getRevisionId(), oldBaseRevId, newBaseRevId}); + nodeStore.evict(commit); + if (writeResult.getError() != null) { + // FIXME This is potentially a bug that we need to handle. + throw new Exception(String.format("Update wasn't successful: %s", writeResult)); + } } private void markAsFailed() throws Exception { Index: src/test/java/org/apache/jackrabbit/mongomk/impl/command/ConcurrentCommitCommandTest.java =================================================================== --- src/test/java/org/apache/jackrabbit/mongomk/impl/command/ConcurrentCommitCommandTest.java (revision 1439293) +++ src/test/java/org/apache/jackrabbit/mongomk/impl/command/ConcurrentCommitCommandTest.java (working copy) @@ -52,16 +52,14 @@ "This is a concurrent commit"); CommitCommand command = new CommitCommand(getNodeStore(), commit) { @Override - protected boolean saveAndSetHeadRevision() throws Exception { + protected void saveAndSetHeadRevision() throws Exception { try { synchronized (waitLock) { waitLock.wait(); } - - return super.saveAndSetHeadRevision(); + super.saveAndSetHeadRevision(); } catch (InterruptedException e) { e.printStackTrace(); - return false; } }; }; Index: src/test/java/org/apache/jackrabbit/mongomk/impl/command/ConcurrentConflictingCommitCommandTest.java =================================================================== --- src/test/java/org/apache/jackrabbit/mongomk/impl/command/ConcurrentConflictingCommitCommandTest.java (revision 1439293) +++ src/test/java/org/apache/jackrabbit/mongomk/impl/command/ConcurrentConflictingCommitCommandTest.java (working copy) @@ -205,13 +205,12 @@ } @Override - protected boolean saveAndSetHeadRevision() throws Exception { + protected void saveAndSetHeadRevision() throws Exception { try { latch.await(); - return super.saveAndSetHeadRevision(); + super.saveAndSetHeadRevision(); } catch (InterruptedException e) { e.printStackTrace(); - return false; } } } @@ -230,13 +229,11 @@ } @Override - protected boolean saveAndSetHeadRevision() throws Exception { + protected void saveAndSetHeadRevision() throws Exception { try { - boolean result = super.saveAndSetHeadRevision(); - return result; + super.saveAndSetHeadRevision(); } catch (InterruptedException e) { e.printStackTrace(); - return false; } finally { latch.countDown(); }