Index: oak-core/src/main/java/org/apache/jackrabbit/oak/kernel/KernelNodeStore.java =================================================================== --- oak-core/src/main/java/org/apache/jackrabbit/oak/kernel/KernelNodeStore.java (revision 1597948) +++ oak-core/src/main/java/org/apache/jackrabbit/oak/kernel/KernelNodeStore.java (working copy) @@ -227,6 +227,11 @@ } } + @Override + public void release(String checkpoint) { + // TODO + } + public CacheStats getCacheStats() { return cacheStats; } Index: oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/DocumentNodeStore.java =================================================================== --- oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/DocumentNodeStore.java (revision 1597948) +++ oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/DocumentNodeStore.java (working copy) @@ -1336,6 +1336,11 @@ return getRoot(Revision.fromString(checkpoint)); } + @Override + public void release(@Nonnull String checkpoint) { + // TODO + } + //------------------------< RevisionContext >------------------------------- @Override Index: oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/index/AsyncIndexUpdate.java =================================================================== --- oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/index/AsyncIndexUpdate.java (revision 1597948) +++ oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/index/AsyncIndexUpdate.java (working copy) @@ -155,11 +155,13 @@ NodeBuilder builder = after.builder(); NodeBuilder async = builder.child(ASYNC); + String refCheckpoint = null; NodeState before = null; final PropertyState state = async.getProperty(name); if (state != null && state.getType() == STRING) { - before = store.retrieve(state.getValue(STRING)); + refCheckpoint = state.getValue(STRING); + before = store.retrieve(refCheckpoint); } if (before == null) { before = MISSING_NODE; @@ -216,6 +218,15 @@ } postAsyncRunStatsStatus(indexStats); + // checkpoints cleanup + if (refCheckpoint != null) { + store.release(refCheckpoint); + } + if (exception != null || (exception == null && !callback.dirty)) { + // error? cleanup current cp too + store.release(checkpoint); + } + if (exception != null) { if (!failing) { log.warn("Index update {} failed", name, exception); Index: oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/memory/MemoryNodeStore.java =================================================================== --- oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/memory/MemoryNodeStore.java (revision 1597948) +++ oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/memory/MemoryNodeStore.java (working copy) @@ -27,6 +27,7 @@ import java.io.IOException; import java.io.InputStream; import java.util.Map; +import java.util.Set; import java.util.concurrent.atomic.AtomicReference; import javax.annotation.CheckForNull; @@ -199,6 +200,16 @@ return checkpoints.get(checkNotNull(checkpoint)); } + @Override + public synchronized void release(String checkpoint) { + checkpoints.remove(checkpoint); + } + + /** test purpose only! */ + public synchronized Set listCheckpoints() { + return checkpoints.keySet(); + } + //------------------------------------------------------------< private >--- private static class MemoryNodeStoreBranch implements NodeStoreBranch { Index: oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/SegmentNodeStore.java =================================================================== --- oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/SegmentNodeStore.java (revision 1597948) +++ oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/SegmentNodeStore.java (working copy) @@ -244,6 +244,7 @@ @Override @CheckForNull public NodeState retrieve(@Nonnull String checkpoint) { + checkNotNull(checkpoint); NodeState cp = head.get() .getChildNode("checkpoints") .getChildNode(checkpoint) @@ -254,6 +255,39 @@ return null; } + @Override + public void release(@Nonnull String checkpoint) { + checkNotNull(checkpoint); + + // try 5 times + for (int i = 0; i < 5; i++) { + if (commitSemaphore.tryAcquire()) { + try { + refreshHead(); + store.getTracker().getWriter().flush(); + + SegmentNodeState state = head.get(); + SegmentNodeBuilder builder = state.builder(); + + NodeBuilder cp = builder.child("checkpoints").child( + checkpoint); + if (cp.exists()) { + cp.remove(); + } + SegmentNodeState newState = builder.getNodeState(); + store.getTracker().getWriter().flush(); + if (store.setHead(state, newState)) { + refreshHead(); + return; + } + + } finally { + commitSemaphore.release(); + } + } + } + } + private class Commit { private final Random random = new Random(); Index: oak-core/src/main/java/org/apache/jackrabbit/oak/spi/state/NodeStore.java =================================================================== --- oak-core/src/main/java/org/apache/jackrabbit/oak/spi/state/NodeStore.java (revision 1597948) +++ oak-core/src/main/java/org/apache/jackrabbit/oak/spi/state/NodeStore.java (working copy) @@ -125,4 +125,7 @@ */ @CheckForNull NodeState retrieve(@Nonnull String checkpoint); + + void release(@Nonnull String checkpoint); + } Index: oak-core/src/main/java/org/apache/jackrabbit/oak/spi/state/ProxyNodeStore.java =================================================================== --- oak-core/src/main/java/org/apache/jackrabbit/oak/spi/state/ProxyNodeStore.java (revision 1597948) +++ oak-core/src/main/java/org/apache/jackrabbit/oak/spi/state/ProxyNodeStore.java (working copy) @@ -72,4 +72,9 @@ return getNodeStore().retrieve(checkpoint); } + @Override + public void release(String checkpoint) { + getNodeStore().release(checkpoint); + } + } Index: oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/index/AsyncIndexUpdateTest.java =================================================================== --- oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/index/AsyncIndexUpdateTest.java (revision 1597948) +++ oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/index/AsyncIndexUpdateTest.java (working copy) @@ -40,12 +40,14 @@ import com.google.common.collect.Sets; import org.apache.jackrabbit.oak.api.CommitFailedException; +import org.apache.jackrabbit.oak.api.PropertyState; import org.apache.jackrabbit.oak.plugins.index.property.PropertyIndexEditorProvider; import org.apache.jackrabbit.oak.plugins.index.property.PropertyIndexLookup; import org.apache.jackrabbit.oak.plugins.memory.MemoryNodeStore; import org.apache.jackrabbit.oak.query.index.FilterImpl; import org.apache.jackrabbit.oak.spi.commit.CommitHook; import org.apache.jackrabbit.oak.spi.commit.CommitInfo; +import org.apache.jackrabbit.oak.spi.commit.Editor; import org.apache.jackrabbit.oak.spi.commit.EmptyHook; import org.apache.jackrabbit.oak.spi.query.PropertyValues; import org.apache.jackrabbit.oak.spi.state.ChildNodeEntry; @@ -81,7 +83,7 @@ *
  • Add some content
  • *
  • Search & verify
  • * - * + * */ @Test public void testAsync() throws Exception { @@ -118,7 +120,7 @@ *
  • Add some content
  • *
  • Search & verify
  • * - * + * */ @Test public void testAsyncDouble() throws Exception { @@ -152,10 +154,10 @@ PropertyIndexLookup lookup = new PropertyIndexLookup(root); assertEquals(ImmutableSet.of("testRoot"), find(lookup, "foo", "abc")); - assertEquals(ImmutableSet.of(), find(lookup, "foo", "def")); - assertEquals(ImmutableSet.of(), find(lookup, "foo", "ghi")); + assertEquals(ImmutableSet. of(), find(lookup, "foo", "def")); + assertEquals(ImmutableSet. of(), find(lookup, "foo", "ghi")); - assertEquals(ImmutableSet.of(), find(lookup, "bar", "abc")); + assertEquals(ImmutableSet. of(), find(lookup, "bar", "abc")); assertEquals(ImmutableSet.of("testRoot"), find(lookup, "bar", "def")); assertEquals(ImmutableSet.of("testSecond"), find(lookup, "bar", "ghi")); @@ -168,7 +170,7 @@ *
  • Add some content
  • *
  • Search & verify
  • * - * + * */ @Test public void testAsyncDoubleSubtree() throws Exception { @@ -182,8 +184,8 @@ createIndexDefinition( builder.child("newchild").child("other") .child(INDEX_DEFINITIONS_NAME), "subIndex", true, - false, ImmutableSet.of("foo"), null) - .setProperty(ASYNC_PROPERTY_NAME, "async"); + false, ImmutableSet.of("foo"), null).setProperty( + ASYNC_PROPERTY_NAME, "async"); builder.child("testRoot").setProperty("foo", "abc"); builder.child("newchild").child("other").child("testChild") @@ -209,7 +211,8 @@ .getChildNode("newchild").getChildNode("other")); assertEquals(ImmutableSet.of("testChild"), find(lookupChild, "foo", "xyz")); - assertEquals(ImmutableSet.of(), find(lookupChild, "foo", "abc")); + assertEquals(ImmutableSet. of(), + find(lookupChild, "foo", "abc")); } // OAK-1749 @@ -242,14 +245,17 @@ IndexEditorProvider provider = new PropertyIndexEditorProvider(); NodeBuilder builder = store.getRoot().builder(); - createIndexDefinition(builder.child(INDEX_DEFINITIONS_NAME), "foo", false, ImmutableSet.of("foo"), null, TYPE, Collections.singletonMap(ASYNC_PROPERTY_NAME, "async")); + createIndexDefinition(builder.child(INDEX_DEFINITIONS_NAME), "foo", + false, ImmutableSet.of("foo"), null, TYPE, + Collections.singletonMap(ASYNC_PROPERTY_NAME, "async")); builder.child("test").setProperty("foo", "a"); builder.child("child"); store.merge(builder, EmptyHook.INSTANCE, CommitInfo.EMPTY); - final AsyncIndexUpdate async = new AsyncIndexUpdate("async", store, provider); + final AsyncIndexUpdate async = new AsyncIndexUpdate("async", store, + provider); async.run(); builder = store.getRoot().builder(); @@ -290,8 +296,7 @@ @Nonnull @Override public NodeState merge(@Nonnull NodeBuilder builder, - @Nonnull CommitHook commitHook, - @Nullable CommitInfo info) + @Nonnull CommitHook commitHook, @Nullable CommitInfo info) throws CommitFailedException { Semaphore s = locks.get(Thread.currentThread()); if (s != null) { @@ -311,7 +316,8 @@ store.merge(builder, EmptyHook.INSTANCE, CommitInfo.EMPTY); - final AsyncIndexUpdate async = new AsyncIndexUpdate("async", store, provider); + final AsyncIndexUpdate async = new AsyncIndexUpdate("async", store, + provider); async.run(); builder = store.getRoot().builder(); @@ -353,4 +359,145 @@ assertNoConflictMarker(builder.getChildNode(name)); } } + + @Test + public void cpCleanupNoChanges() throws Exception { + MemoryNodeStore store = new MemoryNodeStore(); + IndexEditorProvider provider = new PropertyIndexEditorProvider(); + + assertTrue("Expecting no checkpoints", + store.listCheckpoints().size() == 0); + + // no changes on diff, no checkpoints left behind + AsyncIndexUpdate async = new AsyncIndexUpdate("async", store, provider); + async.run(); + assertTrue("Expecting no checkpoints", + store.listCheckpoints().size() == 0); + } + + @Test + public void cpCleanupWChanges() throws Exception { + MemoryNodeStore store = new MemoryNodeStore(); + IndexEditorProvider provider = new PropertyIndexEditorProvider(); + + NodeBuilder builder = store.getRoot().builder(); + createIndexDefinition(builder.child(INDEX_DEFINITIONS_NAME), + "rootIndex", true, false, ImmutableSet.of("foo"), null) + .setProperty(ASYNC_PROPERTY_NAME, "async"); + builder.child("testRoot").setProperty("foo", "abc"); + store.merge(builder, EmptyHook.INSTANCE, CommitInfo.EMPTY); + + assertTrue("Expecting no checkpoints", + store.listCheckpoints().size() == 0); + + AsyncIndexUpdate async = new AsyncIndexUpdate("async", store, provider); + async.run(); + assertTrue("Expecting one checkpoint", + store.listCheckpoints().size() == 1); + String firstCp = store.listCheckpoints().iterator().next(); + + builder = store.getRoot().builder(); + builder.child("testRoot").setProperty("foo", "def"); + store.merge(builder, EmptyHook.INSTANCE, CommitInfo.EMPTY); + + async.run(); + assertTrue("Expecting one checkpoint", + store.listCheckpoints().size() == 1); + String secondCp = store.listCheckpoints().iterator().next(); + assertFalse("Store should keep only second checkpoint", + secondCp.equals(firstCp)); + } + + @Test + public void cpCleanupWErrors() throws Exception { + MemoryNodeStore store = new MemoryNodeStore(); + FaultyIndexEditorProvder provider = new FaultyIndexEditorProvder(); + + NodeBuilder builder = store.getRoot().builder(); + createIndexDefinition(builder.child(INDEX_DEFINITIONS_NAME), + "rootIndex", true, false, ImmutableSet.of("foo"), null) + .setProperty(ASYNC_PROPERTY_NAME, "async"); + builder.child("testRoot").setProperty("foo", "abc"); + store.merge(builder, EmptyHook.INSTANCE, CommitInfo.EMPTY); + + assertTrue("Expecting no checkpoints", + store.listCheckpoints().size() == 0); + + AsyncIndexUpdate async = new AsyncIndexUpdate("async", store, provider); + async.run(); + assertTrue("Error should have been triggered by the commit", + provider.isFailed()); + assertTrue("Expecting no checkpoints", + store.listCheckpoints().size() == 0); + } + + private static class FaultyIndexEditorProvder implements + IndexEditorProvider { + + private final FaultyIndexEditor faulty = new FaultyIndexEditor(); + + @Override + public Editor getIndexEditor(String type, NodeBuilder definition, + NodeState root, IndexUpdateCallback callback) + throws CommitFailedException { + return faulty; + } + + public boolean isFailed() { + return faulty.failed; + } + + } + + private static class FaultyIndexEditor implements IndexEditor { + + private boolean failed = false; + + @Override + public void enter(NodeState before, NodeState after) + throws CommitFailedException { + failed = true; + throw new CommitFailedException("test", -1, "Testing failures"); + } + + @Override + public void leave(NodeState before, NodeState after) + throws CommitFailedException { + } + + @Override + public void propertyAdded(PropertyState after) + throws CommitFailedException { + } + + @Override + public void propertyChanged(PropertyState before, PropertyState after) + throws CommitFailedException { + } + + @Override + public void propertyDeleted(PropertyState before) + throws CommitFailedException { + } + + @Override + public Editor childNodeAdded(String name, NodeState after) + throws CommitFailedException { + return null; + } + + @Override + public Editor childNodeChanged(String name, NodeState before, + NodeState after) throws CommitFailedException { + return null; + } + + @Override + public Editor childNodeDeleted(String name, NodeState before) + throws CommitFailedException { + return null; + } + + } + } Index: oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/segment/CheckpointTest.java =================================================================== --- oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/segment/CheckpointTest.java (revision 1597948) +++ oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/segment/CheckpointTest.java (working copy) @@ -18,6 +18,7 @@ import static junit.framework.Assert.assertFalse; import static junit.framework.Assert.assertNotNull; +import static junit.framework.Assert.assertNull; import static junit.framework.Assert.assertTrue; import java.util.concurrent.TimeUnit; @@ -45,7 +46,18 @@ store.retrieve("missing-checkpoint"); } - private static void verifyNS(SegmentNodeStore store, boolean exists) { + @Test + public void testRelease() throws CommitFailedException { + SegmentNodeStore store = new SegmentNodeStore(new MemoryStore()); + addTestNode(store, "test-checkpoint"); + String cp = verifyNS(store, true); + + store.release(cp); + assertNull(store.retrieve(cp)); + + } + + private static String verifyNS(SegmentNodeStore store, boolean exists) { String cp = store.checkpoint(TimeUnit.HOURS.toMillis(1)); assertNotNull("Checkpoint must not be null", cp); @@ -58,6 +70,7 @@ assertFalse("Node shouldn't exist in checkpoint", cpns .getChildNode("test-checkpoint").exists()); } + return cp; } private static void addTestNode(NodeStore store, String name)