Index: oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/index/AsyncIndexUpdate.java =================================================================== --- oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/index/AsyncIndexUpdate.java (revision 1762289) +++ oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/index/AsyncIndexUpdate.java (working copy) @@ -30,6 +30,7 @@ import java.io.Closeable; import java.util.Calendar; import java.util.HashSet; +import java.util.Map; import java.util.Set; import java.util.concurrent.Semaphore; import java.util.concurrent.TimeUnit; @@ -76,6 +77,7 @@ import com.google.common.base.Splitter; import com.google.common.base.Stopwatch; import com.google.common.collect.ImmutableMap; +import com.google.common.collect.Sets; public class AsyncIndexUpdate implements Runnable, Closeable { @@ -457,6 +459,7 @@ boolean threadNameChanged = false; String afterCheckpoint = store.checkpoint(lifetime, ImmutableMap.of( "creator", AsyncIndexUpdate.class.getSimpleName(), + "created", afterTime, "thread", oldThreadName, "name", name)); NodeState after = store.retrieve(afterCheckpoint); @@ -509,6 +512,12 @@ checkpointToRelease); } } + // clean up every other minute + long now = System.currentTimeMillis(); + if (!indexStats.isFailing() + && TimeUnit.MILLISECONDS.toMinutes(now) % 2 == 0) { + cleanUpCheckpoints(); + } if (updatePostRunStatus) { postAsyncRunStatsStatus(indexStats); @@ -516,6 +525,38 @@ } } + void cleanUpCheckpoints() { + Set keep = Sets.newHashSet(); + String cp = indexStats.getReferenceCheckpoint(); + if (cp == null) { + log.warn("No reference checkpoint set in index stats"); + return; + } + keep.add(cp); + keep.addAll(indexStats.tempCps); + Map info = store.checkpointInfo(cp); + String value = info.get("created"); + if (value != null) { + // remove unreferenced AsyncIndexUpdate checkpoints: + // - without 'created' info (checkpoint created before OAK-4826) + // or + // - 'created' value older than the current reference + long current = ISO8601.parse(value).getTimeInMillis(); + for (String checkpoint : store.checkpoints()) { + info = store.checkpointInfo(checkpoint); + String creator = info.get("creator"); + String created = info.get("created"); + String name = info.get("name"); + if (!keep.contains(checkpoint) + && this.name.equals(name) + && AsyncIndexUpdate.class.getSimpleName().equals(creator) + && (created == null || ISO8601.parse(created).getTimeInMillis() < current)) { + store.release(checkpoint); + } + } + } + } + protected AsyncUpdateCallback newAsyncUpdateCallback(NodeStore store, String name, long leaseTimeOut, String beforeCheckpoint, AsyncIndexStats indexStats, Index: oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/index/AsyncIndexUpdateTest.java =================================================================== --- oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/index/AsyncIndexUpdateTest.java (revision 1762289) +++ oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/index/AsyncIndexUpdateTest.java (working copy) @@ -73,6 +73,7 @@ import org.apache.jackrabbit.oak.spi.state.NodeState; import org.apache.jackrabbit.oak.spi.state.NodeStore; import org.apache.jackrabbit.oak.spi.state.ProxyNodeStore; +import org.apache.jackrabbit.oak.stats.Clock; import org.junit.Test; import ch.qos.logback.classic.Level; @@ -678,6 +679,55 @@ } } + // OAK-4826 + @Test + public void cpCleanupOrphaned() throws Exception { + Clock clock = Clock.SIMPLE; + final MemoryNodeStore store = new MemoryNodeStore(); + // prepare index and initial content + NodeBuilder builder = store.getRoot().builder(); + createIndexDefinition(builder.child(INDEX_DEFINITIONS_NAME), + "rootIndex", true, false, ImmutableSet.of("foo"), null) + .setProperty(ASYNC_PROPERTY_NAME, "async"); + builder.child("testRoot").setProperty("foo", "abc"); + store.merge(builder, EmptyHook.INSTANCE, CommitInfo.EMPTY); + + assertTrue("Expecting no checkpoints", + store.listCheckpoints().size() == 0); + + IndexEditorProvider provider = new PropertyIndexEditorProvider(); + AsyncIndexUpdate async = new AsyncIndexUpdate("async", store, provider); + async.run(); + assertTrue("Expecting one checkpoint", + store.listCheckpoints().size() == 1); + String cp = store.listCheckpoints().iterator().next(); + Map info = store.checkpointInfo(cp); + + builder = store.getRoot().builder(); + builder.child("testRoot").setProperty("foo", "def"); + store.merge(builder, EmptyHook.INSTANCE, CommitInfo.EMPTY); + + // wait until currentTimeMillis() changes. this ensures + // the created value for the checkpoint is different + // from the previous checkpoint. + clock.waitUntil(clock.getTime() + 1); + async.run(); + assertTrue("Expecting one checkpoint", + store.listCheckpoints().size() == 1); + cp = store.listCheckpoints().iterator().next(); + + // create a new checkpoint with the info from the first checkpoint + // this simulates an orphaned checkpoint that should be cleaned up + assertNotNull(store.checkpoint(TimeUnit.HOURS.toMillis(1), info)); + assertTrue("Expecting two checkpoints", + store.listCheckpoints().size() == 2); + + async.cleanUpCheckpoints(); + assertTrue("Expecting one checkpoint", + store.listCheckpoints().size() == 1); + assertEquals(cp, store.listCheckpoints().iterator().next()); + } + /** * OAK-2203 Test reindex behavior on an async index when the index provider is missing * for a given type