Index: oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/Commit.java =================================================================== --- oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/Commit.java (revision 1727429) +++ oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/Commit.java (working copy) @@ -30,6 +30,7 @@ import javax.annotation.Nullable; import com.google.common.base.Function; +import com.google.common.collect.Iterables; import com.google.common.collect.Sets; import org.apache.jackrabbit.oak.api.PropertyState; @@ -279,7 +280,6 @@ // branch commits always use root node as commit root commitRootPath = "/"; } - ArrayList newNodes = new ArrayList(); ArrayList changedNodes = new ArrayList(); // operations are added to this list before they are executed, // so that all operations can be rolled back if there is a conflict @@ -312,114 +312,70 @@ int commitRootDepth = PathUtils.getDepth(commitRootPath); // check if there are real changes on the commit root boolean commitRootHasChanges = operations.containsKey(commitRootPath); + for (UpdateOp op : operations.values()) { + NodeDocument.setCommitRoot(op, revision, commitRootDepth); + changedNodes.add(op); + } // create a "root of the commit" if there is none UpdateOp commitRoot = getUpdateOperationForNode(commitRootPath); - for (String p : operations.keySet()) { - UpdateOp op = operations.get(p); - if (op.isNew()) { - NodeDocument.setDeleted(op, revision, false); - } - if (op == commitRoot) { - if (!op.isNew() && commitRootHasChanges) { - // commit root already exists and this is an update - changedNodes.add(op); - } - } else { - NodeDocument.setCommitRoot(op, revision, commitRootDepth); - if (op.isNew()) { - newNodes.add(op); - } else { - changedNodes.add(op); - } - } - } - if (changedNodes.size() == 0 && commitRoot.isNew()) { - // no updates and root of commit is also new. that is, - // it is the root of a subtree added in a commit. - // so we try to add the root like all other nodes - NodeDocument.setRevision(commitRoot, revision, commitValue); - newNodes.add(commitRoot); - } + boolean success = false; try { - if (newNodes.size() > 0) { - // set commit root on new nodes - if (!store.create(NODES, newNodes)) { - // some of the documents already exist: - // try to apply all changes one by one - for (UpdateOp op : newNodes) { - if (op == commitRoot) { - // don't write the commit root just yet - // (because there might be a conflict) - NodeDocument.unsetRevision(commitRoot, revision); - } - changedNodes.add(op); - } - newNodes.clear(); - } + opLog.addAll(changedNodes); + List oldDocs = store.createOrUpdate(NODES, changedNodes); + checkConflicts(oldDocs, changedNodes); + checkSplitCandidate(oldDocs); + + // finally write the commit root (the commit root might be written + // twice, first to check if there was a conflict, and only then to + // commit the revision, with the revision property set) + NodeDocument.setRevision(commitRoot, revision, commitValue); + if (commitRootHasChanges) { + // remove previously added commit root + NodeDocument.removeCommitRoot(commitRoot, revision); } - for (UpdateOp op : changedNodes) { - // set commit root on changed nodes. this may even apply - // to the commit root. the _commitRoot entry is removed - // again when the _revisions entry is set at the end - NodeDocument.setCommitRoot(op, revision, commitRootDepth); - opLog.add(op); - createOrUpdateNode(store, op); - } - // finally write the commit root, unless it was already written - // with added nodes (the commit root might be written twice, - // first to check if there was a conflict, and only then to commit - // the revision, with the revision property set) - if (changedNodes.size() > 0 || !commitRoot.isNew()) { - // set revision to committed - NodeDocument.setRevision(commitRoot, revision, commitValue); - if (commitRootHasChanges) { - // remove previously added commit root - NodeDocument.removeCommitRoot(commitRoot, revision); - } - opLog.add(commitRoot); - if (baseBranchRevision == null) { - // create a clone of the commitRoot in order - // to set isNew to false. If we get here the - // commitRoot document already exists and - // only needs an update - UpdateOp commit = commitRoot.copy(); - commit.setNew(false); - // only set revision on commit root when there is - // no collision for this commit revision - commit.containsMapEntry(COLLISIONS, revision, false); - NodeDocument before = nodeStore.updateCommitRoot(commit, revision); - if (before == null) { - String msg = "Conflicting concurrent change. " + - "Update operation failed: " + commitRoot; - NodeDocument commitRootDoc = store.find(NODES, commitRoot.getId()); - DocumentStoreException dse; - if (commitRootDoc == null) { - dse = new DocumentStoreException(msg); - } else { - dse = new ConflictException(msg, - commitRootDoc.getConflictsFor( + opLog.add(commitRoot); + if (baseBranchRevision == null) { + // create a clone of the commitRoot in order + // to set isNew to false. If we get here the + // commitRoot document already exists and + // only needs an update + UpdateOp commit = commitRoot.copy(); + commit.setNew(false); + // only set revision on commit root when there is + // no collision for this commit revision + commit.containsMapEntry(COLLISIONS, revision, false); + NodeDocument before = nodeStore.updateCommitRoot(commit, revision); + if (before == null) { + String msg = "Conflicting concurrent change. " + + "Update operation failed: " + commitRoot; + NodeDocument commitRootDoc = store.find(NODES, commitRoot.getId()); + DocumentStoreException dse; + if (commitRootDoc == null) { + dse = new DocumentStoreException(msg); + } else { + dse = new ConflictException(msg, + commitRootDoc.getConflictsFor( Collections.singleton(revision))); - } - throw dse; - } else { - success = true; - // if we get here the commit was successful and - // the commit revision is set on the commitRoot - // document for this commit. - // now check for conflicts/collisions by other commits. - // use original commitRoot operation with - // correct isNew flag. - checkConflicts(commitRoot, before); - checkSplitCandidate(before); } + throw dse; } else { - // this is a branch commit, do not fail on collisions now - // trying to merge the branch will fail later - createOrUpdateNode(store, commitRoot); + success = true; + // if we get here the commit was successful and + // the commit revision is set on the commitRoot + // document for this commit. + // now check for conflicts/collisions by other commits. + // use original commitRoot operation with + // correct isNew flag. + checkConflicts(commitRoot, before); + checkSplitCandidate(before); } - operations.put(commitRootPath, commitRoot); + } else { + // this is a branch commit, do not fail on collisions now + // trying to merge the branch will fail later + createOrUpdateNode(store, commitRoot); } + operations.put(commitRootPath, commitRoot); } catch (DocumentStoreException e) { // OAK-3084 do not roll back if already committed if (success) { @@ -426,7 +382,7 @@ LOG.error("Exception occurred after commit. Rollback will be suppressed.", e); } else { try { - rollback(newNodes, opLog, commitRoot); + rollback(opLog, commitRoot); } catch (Exception ex) { // catch any exception caused by the rollback, log it // and throw the original exception @@ -456,8 +412,7 @@ } } - private void rollback(List newDocuments, - List changed, + private void rollback(List changed, UpdateOp commitRoot) { DocumentStore store = nodeStore.getDocumentStore(); for (UpdateOp op : changed) { @@ -467,11 +422,6 @@ } store.findAndUpdate(NODES, reverse); } - for (UpdateOp op : newDocuments) { - UpdateOp reverse = op.getReverseOperation(); - NodeDocument.setDeletedOnce(reverse); - store.findAndUpdate(NODES, reverse); - } UpdateOp removeCollision = new UpdateOp(commitRoot.getId(), false); NodeDocument.removeCollision(removeCollision, revision); store.findAndUpdate(NODES, removeCollision); @@ -490,6 +440,12 @@ checkSplitCandidate(doc); } + private void checkSplitCandidate(Iterable docs) { + for (NodeDocument doc : docs) { + checkSplitCandidate(doc); + } + } + private void checkSplitCandidate(@Nullable NodeDocument doc) { if (doc != null && doc.getMemory() > SPLIT_CANDIDATE_THRESHOLD) { nodeStore.addSplitCandidate(doc.getId()); @@ -587,6 +543,25 @@ } } + private void checkConflicts(List oldDocs, + List updates) { + int i = 0; + List exceptions = new ArrayList(); + Set revisions = new HashSet(); + for (NodeDocument doc : oldDocs) { + UpdateOp op = updates.get(i++); + try { + checkConflicts(op, doc); + } catch (ConflictException e) { + exceptions.add(e); + Iterables.addAll(revisions, e.getConflictRevisions()); + } + } + if (!exceptions.isEmpty()) { + throw new ConflictException("Following exceptions occurred during the bulk update operations: " + exceptions, revisions); + } + } + private String formatConflictRevision(Revision r) { if (nodeStore.getHeadRevision().isRevisionNewer(r)) { return r + " (not yet visible)"; Index: oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/DocumentNodeStoreTest.java =================================================================== --- oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/DocumentNodeStoreTest.java (revision 1727429) +++ oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/DocumentNodeStoreTest.java (working copy) @@ -239,10 +239,10 @@ final Semaphore created = new Semaphore(0); DocumentStore docStore = new MemoryDocumentStore() { @Override - public boolean create(Collection collection, - List updateOps) { + public List createOrUpdate(Collection collection, + List updateOps) { Semaphore semaphore = locks.get(Thread.currentThread()); - boolean result = super.create(collection, updateOps); + List result = super.createOrUpdate(collection, updateOps); if (semaphore != null) { created.release(); semaphore.acquireUninterruptibly();