Index: src/main/java/org/apache/jackrabbit/mongomk/impl/command/exception/ConcurrentCommitException.java =================================================================== --- src/main/java/org/apache/jackrabbit/mongomk/impl/command/exception/ConcurrentCommitException.java (revision 0) +++ src/main/java/org/apache/jackrabbit/mongomk/impl/command/exception/ConcurrentCommitException.java (working copy) @@ -0,0 +1,22 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.jackrabbit.mongomk.impl.command.exception; + +public class ConcurrentCommitException extends Exception { + + private static final long serialVersionUID = -2705411724908496520L; +} Property changes on: src/main/java/org/apache/jackrabbit/mongomk/impl/command/exception/ConcurrentCommitException.java ___________________________________________________________________ Added: svn:eol-style ## -0,0 +1 ## +native \ No newline at end of property Index: src/main/java/org/apache/jackrabbit/mongomk/impl/command/MergeCommand.java =================================================================== --- src/main/java/org/apache/jackrabbit/mongomk/impl/command/MergeCommand.java (revision 1439852) +++ src/main/java/org/apache/jackrabbit/mongomk/impl/command/MergeCommand.java (working copy) @@ -114,9 +114,7 @@ MongoUtil.fromMongoRepresentation(currentHead), message); } - Command command = new CommitCommand(nodeStore, newCommit); - long revision = command.execute(); - return MongoUtil.fromMongoRepresentation(revision); + return nodeStore.commit(newCommit); } /** Index: src/main/java/org/apache/jackrabbit/mongomk/impl/command/CommitCommand.java =================================================================== --- src/main/java/org/apache/jackrabbit/mongomk/impl/command/CommitCommand.java (revision 1439852) +++ src/main/java/org/apache/jackrabbit/mongomk/impl/command/CommitCommand.java (working copy) @@ -17,6 +17,7 @@ package org.apache.jackrabbit.mongomk.impl.command; import java.util.ArrayList; +import java.util.Collection; import java.util.Collections; import java.util.HashMap; import java.util.HashSet; @@ -34,15 +35,15 @@ import org.apache.jackrabbit.mongomk.impl.action.SaveAndSetHeadRevisionAction; import org.apache.jackrabbit.mongomk.impl.action.SaveCommitAction; import org.apache.jackrabbit.mongomk.impl.action.SaveNodesAction; -import org.apache.jackrabbit.mongomk.impl.command.exception.ConflictingCommitException; +import org.apache.jackrabbit.mongomk.impl.command.exception.ConcurrentCommitException; import org.apache.jackrabbit.mongomk.impl.instruction.CommitCommandInstructionVisitor; import org.apache.jackrabbit.mongomk.impl.model.MongoCommit; import org.apache.jackrabbit.mongomk.impl.model.MongoNode; import org.apache.jackrabbit.mongomk.impl.model.MongoSync; +import org.bson.types.ObjectId; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import com.mongodb.BasicDBObject; import com.mongodb.DBCollection; import com.mongodb.DBObject; import com.mongodb.QueryBuilder; @@ -66,6 +67,7 @@ private final Long initialBaseRevisionId; private Long baseRevisionId; private String branchId; + private int retries; /** * Constructs a new {@code CommitCommandMongo}. @@ -81,32 +83,28 @@ @Override public Long execute() throws Exception { - int retries = 0; - boolean success; - do { - mongoSync = new ReadAndIncHeadRevisionAction(nodeStore).execute(); - revisionId = mongoSync.getNextRevisionId() - 1; - if (initialBaseRevisionId != null) { - baseRevisionId = initialBaseRevisionId; - } else { - baseRevisionId = mongoSync.getHeadRevisionId(); - } - logger.debug("Committing @{} with diff: {}", revisionId, commit.getDiff()); - readBranchIdFromBaseCommit(); - createMongoNodes(); - prepareCommit(); - readAndMergeExistingNodes(); - prepareMongoNodes(); - success = saveNodesAndCommits(); - if (success) { - success = saveAndSetHeadRevision(); - if (success) { - cacheNodes(); - } else { - retries++; - } - } - } while (!success); + mongoSync = new ReadAndIncHeadRevisionAction(nodeStore).execute(); + revisionId = mongoSync.getNextRevisionId() - 1; + if (initialBaseRevisionId != null) { + baseRevisionId = initialBaseRevisionId; + } else { + baseRevisionId = mongoSync.getHeadRevisionId(); + } + logger.debug("Committing @{} with diff: {}", revisionId, commit.getDiff()); + readBranchIdFromBaseCommit(); + createMongoNodes(); + prepareCommit(); + readAndMergeExistingNodes(); + prepareMongoNodes(); + try { + saveNodesAndCommits(); + saveAndSetHeadRevision(); + cacheNodes(); + } catch (ConcurrentCommitException e) { + retries++; + logger.debug("Commit @{}: failure. Retries:" + retries); + throw e; + } String msg = "Commit @{}: success"; if (retries > 0) { @@ -116,17 +114,6 @@ return revisionId; } - private boolean saveNodesAndCommits() throws Exception { - long headRevisionId = new FetchHeadRevisionIdAction(nodeStore, branchId).execute(); - if (branchId == null && headRevisionId != mongoSync.getHeadRevisionId()) { - // Head revision moved on in trunk in the meantime, no need to save - return false; - } - new SaveNodesAction(nodeStore, nodes.values()).execute(); - new SaveCommitAction(nodeStore, commit).execute(); - return true; - } - @Override public int getNumOfRetries() { return 100; @@ -134,7 +121,7 @@ @Override public boolean needsRetry(Exception e) { - return e instanceof ConflictingCommitException; + return e instanceof ConcurrentCommitException; } private void readBranchIdFromBaseCommit() throws Exception { @@ -196,17 +183,10 @@ } } -// private void readExistingNodes() { -// FetchNodesAction action = new FetchNodesAction(nodeStore, affectedPaths, -// mongoSync.getHeadRevisionId()); -// action.setBranchId(branchId); -// existingNodes = action.execute(); -// } - - // FIXME - Performance, This seems to be faster for commits than the old method. private void readExistingNodes() throws Exception { if (affectedPaths == null || affectedPaths.isEmpty()) { existingNodes = Collections.emptyMap(); + return; } existingNodes = new HashMap(); @@ -228,7 +208,7 @@ for (MongoNode committingNode : nodes.values()) { MongoNode existingNode = existingNodes.get(committingNode.getPath()); if (existingNode != null) { - if(logger.isDebugEnabled()){ + if (logger.isDebugEnabled()){ logger.debug("Found existing node to merge: {}", existingNode.getPath()); logger.debug("Existing node: {}", existingNode); logger.debug("Committing node: {}", committingNode); @@ -314,68 +294,66 @@ } } + private void saveNodesAndCommits() throws Exception { + long headRevisionId = new FetchHeadRevisionIdAction(nodeStore, branchId).execute(); + if (branchId == null && headRevisionId != mongoSync.getHeadRevisionId()) { + // Head revision moved on in trunk in the meantime, no need to save + throw new ConcurrentCommitException(); + } + new SaveNodesAction(nodeStore, nodes.values()).execute(); + new SaveCommitAction(nodeStore, commit).execute(); + } + /** * Protected for testing purposed only. * * @return True if the operation was successful. * @throws Exception If an exception happens. */ - protected boolean saveAndSetHeadRevision() throws Exception { + protected void saveAndSetHeadRevision() throws Exception { // Don't update the head revision id for branches. if (branchId != null) { - return true; + return; } - long assumedHeadRevision = this.mongoSync.getHeadRevisionId(); + long assumedHeadRevision = mongoSync.getHeadRevisionId(); MongoSync mongoSync = new SaveAndSetHeadRevisionAction(nodeStore, assumedHeadRevision, revisionId).execute(); - if (mongoSync == null) { - // There have been commit(s) in the meantime. If it's a conflicting - // update, retry the whole operation and count against number of retries. - // If not, need to retry again (in order to write commits and nodes properly) - // but don't count these retries against number of retries. - if (conflictingCommitsExist(assumedHeadRevision)) { - String message = String.format("Commit @%s: failed due to a conflicting commit." - + " Affected paths: %s", revisionId, commit.getAffectedPaths()); - logger.warn(message); - markAsFailed(); - throw new ConflictingCommitException(message); - } else { - logger.info("Commit @{}: failed due to a concurrent commit." + " Affected paths: {}", revisionId, commit.getAffectedPaths()); - markAsFailed(); - return false; - } + if (mongoSync == null) { // There have been concurrent commit(s). + cleanupCommitAndNodes(); + throw new ConcurrentCommitException(); } - return true; } - private boolean conflictingCommitsExist(long baseRevisionId) { - QueryBuilder queryBuilder = QueryBuilder.start(MongoCommit.KEY_FAILED).notEquals(Boolean.TRUE) - .and(MongoCommit.KEY_BASE_REVISION_ID).is(baseRevisionId) - .and(MongoCommit.KEY_REVISION_ID).greaterThan(0L) - .and(MongoCommit.KEY_REVISION_ID).notEquals(revisionId); - DBObject query = queryBuilder.get(); + private void cleanupCommitAndNodes() throws Exception { + + logger.debug("Cleaning up commit and nodes related to Commit @{}", revisionId); + + // Clean up the commit. DBCollection collection = nodeStore.getCommitCollection(); - MongoCommit conflictingCommit = (MongoCommit)collection.findOne(query); - for (String affectedPath : conflictingCommit.getAffectedPaths()) { - if (affectedPaths.contains(affectedPath)) { - return true; - } + DBObject query = QueryBuilder.start("_id").is(commit.getObjectId("_id")).get(); + + WriteResult writeResult = collection.remove(query, WriteConcern.SAFE); + if (writeResult.getError() != null) { + throw new Exception(String.format("Remove wasn't successful: %s", writeResult)); } - return false; - } - private void markAsFailed() throws Exception { - DBCollection commitCollection = nodeStore.getCommitCollection(); - DBObject query = QueryBuilder.start("_id").is(commit.getObjectId("_id")).get(); - DBObject update = new BasicDBObject("$set", new BasicDBObject(MongoCommit.KEY_FAILED, Boolean.TRUE)); - WriteResult writeResult = commitCollection.update(query, update, - false /*upsert*/, false /*multi*/, WriteConcern.SAFE); - logger.debug("Marked @{} failed", revisionId); - nodeStore.evict(commit); + // Collect ids for the nodes + Collection nodesCollection = nodes.values(); + int nodesSize = nodesCollection.size(); + DBObject[] nodesArray = nodesCollection.toArray(new DBObject[nodesSize]); + ObjectId[] objectIds = new ObjectId[nodesSize]; + for (int i = 0; i < nodesSize; i++) { + objectIds[i] = (ObjectId)nodesArray[i].get("_id"); + } + + // Clean up nodes. + collection = nodeStore.getNodeCollection(); + query = QueryBuilder.start("_id").in(objectIds).get(); + + writeResult = collection.remove(query, WriteConcern.SAFE); if (writeResult.getError() != null) { - // FIXME This is potentially a bug that we need to handle. - throw new Exception(String.format("Update wasn't successful: %s", writeResult)); + throw new Exception(String.format("Remove wasn't successful: %s", writeResult)); } } Index: src/test/java/org/apache/jackrabbit/mongomk/impl/command/ConcurrentConflictingCommitCommandTest.java =================================================================== --- src/test/java/org/apache/jackrabbit/mongomk/impl/command/ConcurrentConflictingCommitCommandTest.java (revision 1440303) +++ src/test/java/org/apache/jackrabbit/mongomk/impl/command/ConcurrentConflictingCommitCommandTest.java (working copy) @@ -73,9 +73,9 @@ CommitBuilder.build("/", "+\"a1\" : {}", null), latch); ExecutorService executorService = Executors.newFixedThreadPool(n); - Future future1 = executorService.submit(new CommitExecutorCallable(cmd1)); + Future future1 = executorService.submit(new CommitCallable(cmd1)); Thread.sleep(1000); - Future future2 = executorService.submit(new CommitExecutorCallable(cmd2)); + Future future2 = executorService.submit(new CommitCallable(cmd2)); try { future1.get(); future2.get(); @@ -202,11 +202,11 @@ CommitBuilder.build("/", "+\"b\" : {}", null), latch); ExecutorService executorService = Executors.newFixedThreadPool(n); - Future future1 = executorService.submit(new CommitExecutorCallable(cmd1)); + Future future1 = executorService.submit(new CommitCallable(cmd1)); Thread.sleep(1000); // To make sure commit /a started waiting. - Future future2 = executorService.submit(new CommitExecutorCallable(cmd2)); + Future future2 = executorService.submit(new CommitCallable(cmd2)); Thread.sleep(1000); // To make sure commit /c/d incremented the head revision. - Future future3 = executorService.submit(new CommitExecutorCallable(cmd3)); + Future future3 = executorService.submit(new CommitCallable(cmd3)); try { future1.get(); future2.get(); @@ -230,13 +230,12 @@ } @Override - protected boolean saveAndSetHeadRevision() throws Exception { + protected void saveAndSetHeadRevision() throws Exception { try { latch.await(); - return super.saveAndSetHeadRevision(); + super.saveAndSetHeadRevision(); } catch (InterruptedException e) { e.printStackTrace(); - return false; } } } @@ -255,13 +254,11 @@ } @Override - protected boolean saveAndSetHeadRevision() throws Exception { + protected void saveAndSetHeadRevision() throws Exception { try { - boolean result = super.saveAndSetHeadRevision(); - return result; + super.saveAndSetHeadRevision(); } catch (InterruptedException e) { e.printStackTrace(); - return false; } finally { latch.countDown(); } @@ -281,24 +278,7 @@ @Override public Long call() throws Exception { - return command.execute(); - } - } - - /** - * A Callable that executes the command with the default command executor. - */ - private static class CommitExecutorCallable implements Callable { - - private final CommitCommand command; - - public CommitExecutorCallable(CommitCommand command) { - this.command = command; - } - - @Override - public Long call() throws Exception { return new DefaultCommandExecutor().execute(command); } } -} +} \ No newline at end of file Index: src/test/java/org/apache/jackrabbit/mongomk/impl/command/ConcurrentCommitCommandTest.java =================================================================== --- src/test/java/org/apache/jackrabbit/mongomk/impl/command/ConcurrentCommitCommandTest.java (revision 1439852) +++ src/test/java/org/apache/jackrabbit/mongomk/impl/command/ConcurrentCommitCommandTest.java (working copy) @@ -52,16 +52,15 @@ "This is a concurrent commit"); CommitCommand command = new CommitCommand(getNodeStore(), commit) { @Override - protected boolean saveAndSetHeadRevision() throws Exception { + protected void saveAndSetHeadRevision() throws Exception { try { synchronized (waitLock) { waitLock.wait(); } - return super.saveAndSetHeadRevision(); + super.saveAndSetHeadRevision(); } catch (InterruptedException e) { e.printStackTrace(); - return false; } }; };