Index: oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/index/AsyncIndexUpdate.java =================================================================== --- oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/index/AsyncIndexUpdate.java (revision 1762606) +++ oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/index/AsyncIndexUpdate.java (working copy) @@ -30,6 +30,7 @@ import java.io.Closeable; import java.util.Calendar; import java.util.HashSet; +import java.util.Map; import java.util.Set; import java.util.concurrent.Semaphore; import java.util.concurrent.TimeUnit; @@ -175,6 +176,11 @@ private boolean closed; + /** + * The time in minutes since the epoch when the last checkpoint cleanup ran. + */ + private long lastCheckpointCleanUpTime; + public AsyncIndexUpdate(@Nonnull String name, @Nonnull NodeStore store, @Nonnull IndexEditorProvider provider, boolean switchOnSync) { this.name = checkNotNull(name); @@ -457,6 +463,7 @@ boolean threadNameChanged = false; String afterCheckpoint = store.checkpoint(lifetime, ImmutableMap.of( "creator", AsyncIndexUpdate.class.getSimpleName(), + "created", afterTime, "thread", oldThreadName, "name", name)); NodeState after = store.retrieve(afterCheckpoint); @@ -509,6 +516,7 @@ checkpointToRelease); } } + maybeCleanUpCheckpoints(); if (updatePostRunStatus) { postAsyncRunStatsStatus(indexStats); @@ -516,6 +524,57 @@ } } + private void maybeCleanUpCheckpoints() { + // clean up every five minutes + long currentMinutes = TimeUnit.MILLISECONDS.toMinutes( + System.currentTimeMillis()); + if (!indexStats.isFailing() + && lastCheckpointCleanUpTime + 5 < currentMinutes) { + try { + cleanUpCheckpoints(); + } catch (Throwable e) { + log.warn("Checkpoint clean up failed", e); + } + lastCheckpointCleanUpTime = currentMinutes; + } + } + + void cleanUpCheckpoints() { + log.debug("Cleaning up orphaned checkpoints"); + Set keep = newHashSet(); + String cp = indexStats.getReferenceCheckpoint(); + if (cp == null) { + log.warn("No reference checkpoint set in index stats"); + return; + } + keep.add(cp); + keep.addAll(indexStats.tempCps); + Map info = store.checkpointInfo(cp); + String value = info.get("created"); + if (value != null) { + // remove unreferenced AsyncIndexUpdate checkpoints: + // - without 'created' info (checkpoint created before OAK-4826) + // or + // - 'created' value older than the current reference + long current = ISO8601.parse(value).getTimeInMillis(); + for (String checkpoint : store.checkpoints()) { + info = store.checkpointInfo(checkpoint); + String creator = info.get("creator"); + String created = info.get("created"); + String name = info.get("name"); + if (!keep.contains(checkpoint) + && this.name.equals(name) + && AsyncIndexUpdate.class.getSimpleName().equals(creator) + && (created == null || ISO8601.parse(created).getTimeInMillis() < current)) { + if (store.release(checkpoint)) { + log.info("Removed orphaned checkpoint '{}' {}", + checkpoint, info); + } + } + } + } + } + protected AsyncUpdateCallback newAsyncUpdateCallback(NodeStore store, String name, long leaseTimeOut, String beforeCheckpoint, AsyncIndexStats indexStats, Index: oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/index/AsyncIndexUpdateTest.java =================================================================== --- oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/index/AsyncIndexUpdateTest.java (revision 1762606) +++ oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/index/AsyncIndexUpdateTest.java (working copy) @@ -73,6 +73,7 @@ import org.apache.jackrabbit.oak.spi.state.NodeState; import org.apache.jackrabbit.oak.spi.state.NodeStore; import org.apache.jackrabbit.oak.spi.state.ProxyNodeStore; +import org.apache.jackrabbit.oak.stats.Clock; import org.junit.Test; import ch.qos.logback.classic.Level; @@ -678,6 +679,55 @@ } } + // OAK-4826 + @Test + public void cpCleanupOrphaned() throws Exception { + Clock clock = Clock.SIMPLE; + final MemoryNodeStore store = new MemoryNodeStore(); + // prepare index and initial content + NodeBuilder builder = store.getRoot().builder(); + createIndexDefinition(builder.child(INDEX_DEFINITIONS_NAME), + "rootIndex", true, false, ImmutableSet.of("foo"), null) + .setProperty(ASYNC_PROPERTY_NAME, "async"); + builder.child("testRoot").setProperty("foo", "abc"); + store.merge(builder, EmptyHook.INSTANCE, CommitInfo.EMPTY); + + assertTrue("Expecting no checkpoints", + store.listCheckpoints().size() == 0); + + IndexEditorProvider provider = new PropertyIndexEditorProvider(); + AsyncIndexUpdate async = new AsyncIndexUpdate("async", store, provider); + async.run(); + assertTrue("Expecting one checkpoint", + store.listCheckpoints().size() == 1); + String cp = store.listCheckpoints().iterator().next(); + Map info = store.checkpointInfo(cp); + + builder = store.getRoot().builder(); + builder.child("testRoot").setProperty("foo", "def"); + store.merge(builder, EmptyHook.INSTANCE, CommitInfo.EMPTY); + + // wait until currentTimeMillis() changes. this ensures + // the created value for the checkpoint is different + // from the previous checkpoint. + clock.waitUntil(clock.getTime() + 1); + async.run(); + assertTrue("Expecting one checkpoint", + store.listCheckpoints().size() == 1); + cp = store.listCheckpoints().iterator().next(); + + // create a new checkpoint with the info from the first checkpoint + // this simulates an orphaned checkpoint that should be cleaned up + assertNotNull(store.checkpoint(TimeUnit.HOURS.toMillis(1), info)); + assertTrue("Expecting two checkpoints", + store.listCheckpoints().size() == 2); + + async.cleanUpCheckpoints(); + assertTrue("Expecting one checkpoint", + store.listCheckpoints().size() == 1); + assertEquals(cp, store.listCheckpoints().iterator().next()); + } + /** * OAK-2203 Test reindex behavior on an async index when the index provider is missing * for a given type