Index: oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/Branch.java =================================================================== --- oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/Branch.java (revision 1584000) +++ oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/Branch.java (working copy) @@ -183,7 +183,7 @@ * @param mergeCommit the revision of the merge commit. */ public void applyTo(@Nonnull UnsavedModifications trunk, - @Nonnull Revision mergeCommit) { + @Nonnull Revision mergeCommit) { checkNotNull(trunk); for (BranchCommit c : commits.values()) { c.getModifications().applyTo(trunk, mergeCommit); @@ -217,6 +217,17 @@ } /** + * @param rev the revision to check. + * @return {@code true} if the given revision is the head of this branch, + * {@code false} otherwise. + */ + public boolean isHead(@Nonnull Revision rev) { + checkArgument(checkNotNull(rev).isBranch(), + "Not a branch revision: %s", rev); + return checkNotNull(rev).equals(commits.lastKey()); + } + + /** * Information about a commit within a branch. */ private static final class BranchCommit { Index: oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/DocumentNodeState.java =================================================================== --- oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/DocumentNodeState.java (revision 1584000) +++ oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/DocumentNodeState.java (working copy) @@ -79,11 +79,6 @@ private final DocumentNodeStore store; - /** - * TODO: OAK-1056 - */ - private boolean isBranch; - DocumentNodeState(@Nonnull DocumentNodeStore store, @Nonnull String path, @Nonnull Revision rev) { this(store, path, rev, false); @@ -101,15 +96,6 @@ return rev; } - DocumentNodeState setBranch() { - isBranch = true; - return this; - } - - boolean isBranch() { - return isBranch; - } - //--------------------------< NodeState >----------------------------------- @Override @@ -217,10 +203,21 @@ @Nonnull @Override public NodeBuilder builder() { - if (isBranch) { - return new MemoryNodeBuilder(this); - } else if ("/".equals(getPath())) { - return new DocumentRootBuilder(this, store); + if ("/".equals(getPath())) { + if (rev.isBranch()) { + // check if this node state is head of a branch + Branch b = store.getBranches().getBranch(rev); + if (b == null) { + throw new IllegalStateException("No branch for revision: " + rev); + } + if (b.isHead(rev)) { + return new DocumentRootBuilder(this, store); + } else { + return new MemoryNodeBuilder(this); + } + } else { + return new DocumentRootBuilder(this, store); + } } else { return new MemoryNodeBuilder(this); } Index: oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/DocumentNodeStore.java =================================================================== --- oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/DocumentNodeStore.java (revision 1584000) +++ oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/DocumentNodeStore.java (working copy) @@ -986,6 +986,10 @@ @Nonnull DocumentNodeStoreBranch createBranch(DocumentNodeState base) { + DocumentNodeStoreBranch b = DocumentNodeStoreBranch.getCurrentBranch(); + if (b != null) { + return b; + } return new DocumentNodeStoreBranch(this, base, mergeLock); } Index: oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/DocumentNodeStoreBranch.java =================================================================== --- oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/DocumentNodeStoreBranch.java (revision 1584000) +++ oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/DocumentNodeStoreBranch.java (working copy) @@ -16,8 +16,10 @@ */ package org.apache.jackrabbit.oak.plugins.document; +import java.util.concurrent.Callable; import java.util.concurrent.locks.ReadWriteLock; +import javax.annotation.CheckForNull; import javax.annotation.Nonnull; import org.apache.jackrabbit.oak.api.CommitFailedException; @@ -33,18 +35,17 @@ /** * Implementation of a DocumentMK based node store branch. */ -public class DocumentNodeStoreBranch +class DocumentNodeStoreBranch extends AbstractNodeStoreBranch { /** Lock for coordinating concurrent merge operations */ private final ReadWriteLock mergeLock; - public DocumentNodeStoreBranch(DocumentNodeStore store, - DocumentNodeState base, - ReadWriteLock mergeLock) { - // maximum back off is twice the async delay, but at least 2 seconds. + DocumentNodeStoreBranch(DocumentNodeStore store, + DocumentNodeState base, + ReadWriteLock mergeLock) { super(store, new ChangeDispatcher(store.getRoot()), mergeLock.readLock(), - base, Math.max(store.getAsyncDelay(), 1000) * 2); + base, null, getMaxBackoffMillis(store)); this.mergeLock = mergeLock; } @@ -55,13 +56,13 @@ @Override protected DocumentNodeState createBranch(DocumentNodeState state) { - return store.getRoot(state.getRevision().asBranchRevision()).setBranch(); + return store.getRoot(state.getRevision().asBranchRevision()); } @Override protected DocumentNodeState rebase(DocumentNodeState branchHead, DocumentNodeState base) { - return store.getRoot(store.rebase(branchHead.getRevision(), base.getRevision())).setBranch(); + return store.getRoot(store.rebase(branchHead.getRevision(), base.getRevision())); } @Override @@ -74,25 +75,20 @@ @Override protected DocumentNodeState reset(@Nonnull DocumentNodeState branchHead, @Nonnull DocumentNodeState ancestor) { - return store.getRoot(store.reset(branchHead.getRevision(), - ancestor.getRevision())).setBranch(); + return store.getRoot(store.reset(branchHead.getRevision(), ancestor.getRevision())); } @Override protected DocumentNodeState persist(final NodeState toPersist, final DocumentNodeState base, final CommitInfo info) { - DocumentNodeState state = persist(new Changes() { + return persist(new Changes() { @Override public void with(Commit c) { toPersist.compareAgainstBaseState(base, new CommitDiff(store, c, store.getBlobSerializer())); } }, base, info); - if (base.isBranch()) { - state.setBranch(); - } - return state; } @Override @@ -146,8 +142,13 @@ } } -//------------------------------< internal >-------------------------------- + //------------------------------< internal >-------------------------------- + private static long getMaxBackoffMillis(DocumentNodeStore store) { + // maximum back off is twice the async delay, but at least 2 seconds. + return Math.max(store.getAsyncDelay(), 1000) * 2; + } + /** * Persist some changes on top of the given base state. * @@ -185,4 +186,21 @@ void with(Commit c); } + + /** + * Returns the branch instance in use by the current thread or + * null if there is none. + *

+ * See also {@link AbstractNodeStoreBranch#withCurrentBranch(Callable)}. + * + * @return + */ + @CheckForNull + static DocumentNodeStoreBranch getCurrentBranch() { + AbstractNodeStoreBranch b = BRANCHES.get(Thread.currentThread()); + if (b instanceof DocumentNodeStoreBranch) { + return (DocumentNodeStoreBranch) b; + } + return null; + } } Index: oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/DocumentRootBuilder.java =================================================================== --- oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/DocumentRootBuilder.java (revision 1584000) +++ oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/DocumentRootBuilder.java (working copy) @@ -41,7 +41,7 @@ * Number of content updates that need to happen before the updates * are automatically purged to the private branch. */ - private static final int UPDATE_LIMIT = Integer.getInteger("update.limit", 1000); + static final int UPDATE_LIMIT = Integer.getInteger("update.limit", 1000); /** * The underlying store @@ -67,7 +67,8 @@ */ private int updates; - DocumentRootBuilder(DocumentNodeState base, DocumentNodeStore store) { + DocumentRootBuilder(@Nonnull DocumentNodeState base, + @Nonnull DocumentNodeStore store) { super(checkNotNull(base)); this.store = checkNotNull(store); this.base = base; @@ -100,7 +101,14 @@ } } + @Nonnull @Override + public NodeState getNodeState() { + purge(); + return branch.getHead(); + } + + @Override public Blob createBlob(InputStream stream) throws IOException { return store.createBlob(stream); } @@ -111,7 +119,7 @@ * Rebase this builder on top of the head of the underlying store */ NodeState rebase() { - NodeState head = getNodeState(); + NodeState head = super.getNodeState(); NodeState inMemBase = super.getBaseState(); // Rebase branch @@ -123,7 +131,7 @@ // Set new base and return rebased head base = branch.getBase(); - return getNodeState(); + return super.getNodeState(); } /** @@ -185,7 +193,7 @@ } private void purge() { - branch.setRoot(getNodeState()); + branch.setRoot(super.getNodeState()); super.reset(branch.getHead()); updates = 0; } Index: oak-core/src/main/java/org/apache/jackrabbit/oak/spi/state/AbstractNodeStoreBranch.java =================================================================== --- oak-core/src/main/java/org/apache/jackrabbit/oak/spi/state/AbstractNodeStoreBranch.java (revision 1584000) +++ oak-core/src/main/java/org/apache/jackrabbit/oak/spi/state/AbstractNodeStoreBranch.java (working copy) @@ -17,6 +17,8 @@ package org.apache.jackrabbit.oak.spi.state; import java.util.Random; +import java.util.concurrent.Callable; +import java.util.concurrent.ConcurrentMap; import java.util.concurrent.locks.Lock; import static com.google.common.base.Preconditions.checkNotNull; @@ -36,6 +38,8 @@ import org.apache.jackrabbit.oak.spi.commit.CommitHook; import org.apache.jackrabbit.oak.spi.commit.CommitInfo; +import com.google.common.collect.Maps; + /** * A base implementation of a node store branch, which supports partially * persisted branches. @@ -48,8 +52,10 @@ private static final Random RANDOM = new Random(); - private static final long MIN_BACKOFF = 100; + private static final long MIN_BACKOFF = 50; + protected static final ConcurrentMap BRANCHES = Maps.newConcurrentMap(); + /** The underlying store to which this branch belongs */ protected final S store; @@ -72,7 +78,7 @@ ChangeDispatcher dispatcher, Lock mergeLock, N base) { - this(kernelNodeStore, dispatcher, mergeLock, base, + this(kernelNodeStore, dispatcher, mergeLock, base, null, MILLISECONDS.convert(10, SECONDS)); } @@ -80,11 +86,16 @@ ChangeDispatcher dispatcher, Lock mergeLock, N base, + N head, long maximumBackoff) { this.store = checkNotNull(kernelNodeStore); this.dispatcher = dispatcher; this.mergeLock = checkNotNull(mergeLock); - branchState = new Unmodified(checkNotNull(base)); + if (head == null) { + this.branchState = new Unmodified(checkNotNull(base)); + } else { + this.branchState = new Persisted(checkNotNull(base), head); + } this.maximumBackoff = Math.max(maximumBackoff, MIN_BACKOFF); } @@ -336,7 +347,8 @@ * Persist this branch to an underlying branch in the {@code MicroKernel}. */ Persisted persist() { - Persisted p = new Persisted(base, getHead()); + Persisted p = new Persisted(base); + p.persistTransientHead(getHead()); branchState = p; return p; } @@ -519,12 +531,17 @@ return "Persisted[" + base + ", " + head + ']'; } - Persisted(N base, NodeState head) { + Persisted(N base) { super(base); this.head = createBranch(base); - persistTransientHead(head); } + Persisted(N base, N head) { + super(base); + createBranch(base); + this.head = head; + } + void move(String source, String target) { head = AbstractNodeStoreBranch.this.move(source, target, head); } @@ -564,32 +581,38 @@ @Override @Nonnull - NodeState merge(@Nonnull CommitHook hook, @Nonnull CommitInfo info) + NodeState merge(@Nonnull final CommitHook hook, + @Nonnull final CommitInfo info) throws CommitFailedException { + boolean success = false; + N previousHead = head; try { rebase(); + previousHead = head; dispatcher.contentChanged(base, null); - NodeState toCommit = checkNotNull(hook).processCommit(base, head, info); - N newRoot = AbstractNodeStoreBranch.this.persist(toCommit, head, info); - boolean success = false; - try { - newRoot = AbstractNodeStoreBranch.this.merge(newRoot, info); - success = true; - } finally { - if (!success) { - try { - AbstractNodeStoreBranch.this.reset(newRoot, head); - } catch (Exception e) { - CommitFailedException ex = new CommitFailedException( - OAK, 100, "Branch reset failed", e); - branchState = new ResetFailed(base, ex); - } + N newRoot = withCurrentBranch(new Callable() { + @Override + public N call() throws Exception { + NodeState toCommit = checkNotNull(hook).processCommit(base, head, info); + head = AbstractNodeStoreBranch.this.persist(toCommit, head, info); + return AbstractNodeStoreBranch.this.merge(head, info); } - } + }); branchState = new Merged(base); + success = true; dispatcher.contentChanged(newRoot, info); return newRoot; + } catch (Exception e) { + if (e instanceof CommitFailedException) { + throw (CommitFailedException) e; + } else { + throw new CommitFailedException(MERGE, 1, + "Failed to merge changes to the underlying store", e); + } } finally { + if (!success) { + resetBranch(head, previousHead); + } dispatcher.contentChanged(getRoot(), null); } } @@ -597,6 +620,16 @@ private void persistTransientHead(NodeState newHead) { head = AbstractNodeStoreBranch.this.persist(newHead, head, null); } + + private void resetBranch(N branchHead, N ancestor) { + try { + head = AbstractNodeStoreBranch.this.reset(branchHead, ancestor); + } catch (Exception e) { + CommitFailedException ex = new CommitFailedException( + OAK, 100, "Branch reset failed", e); + branchState = new ResetFailed(base, ex); + } + } } /** @@ -686,4 +719,15 @@ } } + private T withCurrentBranch(Callable callable) throws Exception { + Thread t = Thread.currentThread(); + Object previous = BRANCHES.putIfAbsent(t, this); + try { + return callable.call(); + } finally { + if (previous == null) { + BRANCHES.remove(t, this); + } + } + } } Index: oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/DocumentNodeStoreTest.java =================================================================== --- oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/DocumentNodeStoreTest.java (revision 1584000) +++ oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/DocumentNodeStoreTest.java (working copy) @@ -17,6 +17,7 @@ package org.apache.jackrabbit.oak.plugins.document; import java.util.ArrayList; +import java.util.Arrays; import java.util.Collections; import java.util.HashMap; import java.util.List; @@ -26,28 +27,38 @@ import java.util.concurrent.Semaphore; import java.util.concurrent.atomic.AtomicInteger; +import javax.annotation.CheckForNull; import javax.annotation.Nonnull; import org.apache.jackrabbit.mk.api.MicroKernelException; +import org.apache.jackrabbit.oak.api.CommitFailedException; +import org.apache.jackrabbit.oak.api.PropertyState; +import org.apache.jackrabbit.oak.api.Type; import org.apache.jackrabbit.oak.kernel.KernelNodeState; import org.apache.jackrabbit.oak.plugins.document.memory.MemoryDocumentStore; import org.apache.jackrabbit.oak.plugins.document.util.TimingDocumentStoreWrapper; import org.apache.jackrabbit.oak.plugins.document.util.Utils; import org.apache.jackrabbit.oak.spi.commit.CommitHook; import org.apache.jackrabbit.oak.spi.commit.CommitInfo; +import org.apache.jackrabbit.oak.spi.commit.CompositeHook; +import org.apache.jackrabbit.oak.spi.commit.DefaultEditor; +import org.apache.jackrabbit.oak.spi.commit.Editor; +import org.apache.jackrabbit.oak.spi.commit.EditorHook; +import org.apache.jackrabbit.oak.spi.commit.EditorProvider; import org.apache.jackrabbit.oak.spi.commit.EmptyHook; import org.apache.jackrabbit.oak.spi.state.ChildNodeEntry; import org.apache.jackrabbit.oak.spi.state.NodeBuilder; import org.apache.jackrabbit.oak.spi.state.NodeState; -import org.junit.Ignore; import org.junit.Test; import com.google.common.collect.Iterables; +import static org.apache.jackrabbit.oak.api.CommitFailedException.CONSTRAINT; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertNotNull; import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; public class DocumentNodeStoreTest { @@ -267,4 +278,115 @@ ns1.dispose(); ns2.dispose(); } + + @Test + public void commitHookChangesOnBranch() throws Exception { + final int NUM_NODES = DocumentRootBuilder.UPDATE_LIMIT / 2; + final int NUM_PROPS = 10; + DocumentNodeStore ns = new DocumentMK.Builder().getNodeStore(); + NodeBuilder builder = ns.getRoot().builder(); + for (int i = 0; i < NUM_NODES; i++) { + NodeBuilder c = builder.child("n" + i); + for (int j = 0; j < NUM_PROPS; j++) { + c.setProperty("q" + j, "value"); + c.setProperty("p" + j, "value"); + } + } + try { + ns.merge(builder, CompositeHook.compose( + Arrays.asList(new TestHook("p"), new TestHook("q"), FAILING_HOOK)), + CommitInfo.EMPTY); + fail("merge must fail and reset changes done by commit hooks"); + } catch (CommitFailedException e) { + // expected + } + for (int i = 0; i < NUM_NODES; i++) { + NodeBuilder c = builder.getChildNode("n" + i); + assertTrue(c.exists()); + for (int j = 0; j < NUM_PROPS; j++) { + PropertyState p = c.getProperty("p" + j); + assertNotNull(p); + // must still see initial values before failed merge + assertEquals("value", p.getValue(Type.STRING)); + // same for property 'qX' + p = c.getProperty("q" + j); + assertNotNull(p); + // must still see initial values before failed merge + assertEquals("value", p.getValue(Type.STRING)); + } + } + ns.merge(builder, CompositeHook.compose( + Arrays.asList(new TestHook("p"), new TestHook("q"))), + CommitInfo.EMPTY); + + builder = ns.getRoot().builder(); + // must see properties changed by commit hook + for (int i = 0; i < NUM_NODES; i++) { + NodeBuilder c = builder.getChildNode("n" + i); + assertTrue(c.exists()); + for (int j = 0; j < NUM_PROPS; j++) { + PropertyState p = c.getProperty("p" + j); + assertNotNull(p); + assertEquals("test", p.getValue(Type.STRING)); + p = c.getProperty("q" + j); + assertNotNull(p); + assertEquals("test", p.getValue(Type.STRING)); + } + } + + ns.dispose(); + } + + private static class TestHook extends EditorHook { + + TestHook(final String prefix) { + super(new EditorProvider() { + @CheckForNull + @Override + public Editor getRootEditor(NodeState before, + NodeState after, + NodeBuilder builder, + CommitInfo info) + throws CommitFailedException { + return new TestEditor(builder, prefix); + } + }); + } + } + + private static final CommitHook FAILING_HOOK = new CommitHook() { + @Nonnull + @Override + public NodeState processCommit(NodeState before, + NodeState after, + CommitInfo info) + throws CommitFailedException { + throw new CommitFailedException(CONSTRAINT, 0, "fail"); + } + }; + + private static class TestEditor extends DefaultEditor { + + private final NodeBuilder builder; + private final String prefix; + + TestEditor(NodeBuilder builder, String prefix) { + this.builder = builder; + this.prefix = prefix; + } + + @Override + public Editor childNodeAdded(String name, NodeState after) + throws CommitFailedException { + return new TestEditor(builder.child(name), prefix); + } + + @Override + public void propertyAdded(PropertyState after) + throws CommitFailedException { + if (after.getName().startsWith(prefix)) { + builder.setProperty(after.getName(), "test"); + } + } + } }