Index: oak-mongomk/src/main/java/org/apache/jackrabbit/mongomk/impl/command/CommitCommand.java =================================================================== --- oak-mongomk/src/main/java/org/apache/jackrabbit/mongomk/impl/command/CommitCommand.java (revision 1439798) +++ oak-mongomk/src/main/java/org/apache/jackrabbit/mongomk/impl/command/CommitCommand.java (working copy) @@ -99,12 +99,10 @@ prepareMongoNodes(); success = saveNodesAndCommits(); if (success) { - success = saveAndSetHeadRevision(); - if (success) { - cacheNodes(); - } else { - retries++; - } + saveAndSetHeadRevision(); + cacheNodes(); + } else { + retries++; } } while (!success); @@ -320,49 +318,97 @@ * @return True if the operation was successful. * @throws Exception If an exception happens. */ - protected boolean saveAndSetHeadRevision() throws Exception { + protected void saveAndSetHeadRevision() throws Exception { // Don't update the head revision id for branches. if (branchId != null) { - return true; + return; } long assumedHeadRevision = this.mongoSync.getHeadRevisionId(); MongoSync mongoSync = new SaveAndSetHeadRevisionAction(nodeStore, assumedHeadRevision, revisionId).execute(); if (mongoSync == null) { - // There have been commit(s) in the meantime. If it's a conflicting - // update, retry the whole operation and count against number of retries. - // If not, need to retry again (in order to write commits and nodes properly) - // but don't count these retries against number of retries. - if (conflictingCommitsExist(assumedHeadRevision)) { - String message = String.format("Commit @%s: failed due to a conflicting commit." - + " Affected paths: %s", revisionId, commit.getAffectedPaths()); - logger.warn(message); - markAsFailed(); - throw new ConflictingCommitException(message); - } else { - logger.info("Commit @{}: failed due to a concurrent commit." + " Affected paths: {}", revisionId, commit.getAffectedPaths()); - markAsFailed(); - return false; - } + logger.debug("Commit @{} encountered a concurrent commit", revisionId); + checkConflictingCommit(assumedHeadRevision); } - return true; } - private boolean conflictingCommitsExist(long baseRevisionId) { + private void checkConflictingCommit(long baseRevisionId) throws Exception { + // Find the commit that's based on the same revision. QueryBuilder queryBuilder = QueryBuilder.start(MongoCommit.KEY_FAILED).notEquals(Boolean.TRUE) .and(MongoCommit.KEY_BASE_REVISION_ID).is(baseRevisionId) .and(MongoCommit.KEY_REVISION_ID).greaterThan(0L) .and(MongoCommit.KEY_REVISION_ID).notEquals(revisionId); DBObject query = queryBuilder.get(); DBCollection collection = nodeStore.getCommitCollection(); + // FIXME - OAK-585: There could be multiple commits with the same base revision + // In that case, adjusting base revision ids get more complicated. MongoCommit conflictingCommit = (MongoCommit)collection.findOne(query); + long conflictingRevision = conflictingCommit.getRevisionId(); + + logger.debug("Commit @{} checking whether it conflicts with concurrent Commit @{}", revisionId, conflictingRevision); + logger.debug("Commit @{} affected paths: {}. Commit @{} affected paths: ", + new Object[]{revisionId, commit.getAffectedPaths(), conflictingRevision, + conflictingCommit.getAffectedPaths()}); + for (String affectedPath : conflictingCommit.getAffectedPaths()) { if (affectedPaths.contains(affectedPath)) { - return true; + String message = String.format("Commit @%s conflicts with concurrent Commit @%s at path %s", + revisionId, conflictingRevision, affectedPath); + logger.warn(message); + markAsFailed(); + throw new ConflictingCommitException(message); } } - return false; + + // FIXME - OAK-585: What if these commits do not conflict but a subsequent + // commit based on conflictingRevision does actually conflict with revisionId? + logger.debug("Commit @{} and concurrent Commit @{} do not conflict. Adjusting base revisions.", + revisionId, conflictingRevision); + + if (revisionId < conflictingRevision) { + adjustBaseRevisionId(conflictingCommit, commit, baseRevisionId, revisionId); + } else { + adjustBaseRevisionId(commit, conflictingCommit, baseRevisionId, conflictingRevision); + } + } + + private void adjustBaseRevisionId(MongoCommit highRevCommit, MongoCommit lowRevCommit, + long oldBaseRevId, long newBaseRevId) throws Exception { + + // Adjust baseRevId of commit based on lowRevCommit + QueryBuilder queryBuilder = QueryBuilder.start(MongoCommit.KEY_FAILED).notEquals(Boolean.TRUE) + .and(MongoCommit.KEY_BASE_REVISION_ID).is(lowRevCommit.getRevisionId()); + DBObject query = queryBuilder.get(); + DBObject update = new BasicDBObject("$set", new BasicDBObject(MongoCommit.KEY_BASE_REVISION_ID, + highRevCommit.getRevisionId())); + DBCollection collection = nodeStore.getCommitCollection(); + // FIXME - OAK-585: The problem here is that there might be a commit in the middle + // of being committed with base revision of lowRevCommit.getRevisionId but + // this findAndModify is not going to detect that right now and later cause + // issues when nodes are retrieved. + MongoCommit commit = (MongoCommit)collection.findAndModify(query, null/*fields*/, + null/*sort*/, false/*remove*/, update, false/*returnNew*/, false/*upsert*/); + if (commit != null) { + logger.debug("Adjusted1 Commit @{}'s base revision from {} to {}", + new Object[] {commit.getRevisionId(), commit.getBaseRevisionId(), + highRevCommit.getRevisionId()}); + } + + // Adjust highRevCommit's base revision from old to newBaseRevId + query = QueryBuilder.start("_id").is(highRevCommit.getObjectId("_id")).get(); + update = new BasicDBObject("$set", new BasicDBObject(MongoCommit.KEY_BASE_REVISION_ID, + newBaseRevId)); + WriteResult writeResult = collection.update(query, update, + false /*upsert*/, false /*multi*/, WriteConcern.SAFE); + logger.debug("Adjusted Commit @{}'s base revision id from {} to {}", + new Object[] {highRevCommit.getRevisionId(), oldBaseRevId, newBaseRevId}); + nodeStore.evict(highRevCommit); + if (writeResult.getError() != null) { + // FIXME This is potentially a bug that we need to handle. + throw new Exception(String.format("Update wasn't successful: %s", writeResult)); + } + } private void markAsFailed() throws Exception { Index: oak-mongomk/src/test/java/org/apache/jackrabbit/mongomk/impl/command/ConcurrentConflictingCommitCommandTest.java =================================================================== --- oak-mongomk/src/test/java/org/apache/jackrabbit/mongomk/impl/command/ConcurrentConflictingCommitCommandTest.java (revision 1439293) +++ oak-mongomk/src/test/java/org/apache/jackrabbit/mongomk/impl/command/ConcurrentConflictingCommitCommandTest.java (working copy) @@ -205,13 +205,12 @@ } @Override - protected boolean saveAndSetHeadRevision() throws Exception { + protected void saveAndSetHeadRevision() throws Exception { try { latch.await(); - return super.saveAndSetHeadRevision(); + super.saveAndSetHeadRevision(); } catch (InterruptedException e) { e.printStackTrace(); - return false; } } } @@ -230,13 +229,11 @@ } @Override - protected boolean saveAndSetHeadRevision() throws Exception { + protected void saveAndSetHeadRevision() throws Exception { try { - boolean result = super.saveAndSetHeadRevision(); - return result; + super.saveAndSetHeadRevision(); } catch (InterruptedException e) { e.printStackTrace(); - return false; } finally { latch.countDown(); } Index: oak-mongomk/src/test/java/org/apache/jackrabbit/mongomk/impl/command/ConcurrentCommitCommandTest.java =================================================================== --- oak-mongomk/src/test/java/org/apache/jackrabbit/mongomk/impl/command/ConcurrentCommitCommandTest.java (revision 1439293) +++ oak-mongomk/src/test/java/org/apache/jackrabbit/mongomk/impl/command/ConcurrentCommitCommandTest.java (working copy) @@ -52,16 +52,14 @@ "This is a concurrent commit"); CommitCommand command = new CommitCommand(getNodeStore(), commit) { @Override - protected boolean saveAndSetHeadRevision() throws Exception { + protected void saveAndSetHeadRevision() throws Exception { try { synchronized (waitLock) { waitLock.wait(); } - - return super.saveAndSetHeadRevision(); + super.saveAndSetHeadRevision(); } catch (InterruptedException e) { e.printStackTrace(); - return false; } }; };