Index: oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/index/AsyncIndexUpdate.java
===================================================================
--- oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/index/AsyncIndexUpdate.java (revision 1762606)
+++ oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/index/AsyncIndexUpdate.java (working copy)
@@ -30,6 +30,7 @@
 import java.io.Closeable;
 import java.util.Calendar;
 import java.util.HashSet;
+import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.Semaphore;
 import java.util.concurrent.TimeUnit;
@@ -175,6 +176,28 @@
 
     private boolean closed;
 
+    /**
+     * The checkpoint cleanup interval in minutes. Defaults to 5 minutes.
+     * Setting it to a negative value disables automatic cleanup. See OAK-4826.
+     */
+    private final int cleanupIntervalMinutes = getCleanupIntervalMinutes();
+
+    private static int getCleanupIntervalMinutes() {
+        int value = 5;
+        try {
+            value = Integer.parseInt(System.getProperty(
+                    "oak.async.checkpointCleanupIntervalMinutes", String.valueOf(value)));
+        } catch (NumberFormatException e) {
+            // use default
+        }
+        return value;
+    }
+
+    /**
+     * The time in minutes since the epoch when the last checkpoint cleanup ran.
+     */
+    private long lastCheckpointCleanUpTime;
+
     public AsyncIndexUpdate(@Nonnull String name, @Nonnull NodeStore store,
             @Nonnull IndexEditorProvider provider, boolean switchOnSync) {
         this.name = checkNotNull(name);
@@ -457,6 +480,7 @@
         boolean threadNameChanged = false;
         String afterCheckpoint = store.checkpoint(lifetime, ImmutableMap.of(
                 "creator", AsyncIndexUpdate.class.getSimpleName(),
+                "created", afterTime,
                 "thread", oldThreadName,
                 "name", name));
         NodeState after = store.retrieve(afterCheckpoint);
@@ -509,6 +533,7 @@
                             checkpointToRelease);
                 }
             }
+            maybeCleanUpCheckpoints();
 
             if (updatePostRunStatus) {
                 postAsyncRunStatsStatus(indexStats);
@@ -516,6 +541,71 @@
         }
     }
 
+    /**
+     * Runs {@link #cleanUpCheckpoints()} when indexStats is not failing,
+     * cleanup is enabled and more than cleanupIntervalMinutes have passed
+     * since the last attempt. Failures are logged and not propagated.
+     */
+    private void maybeCleanUpCheckpoints() {
+        // rate limit: at most one cleanup attempt per configured interval
+        long currentMinutes = TimeUnit.MILLISECONDS.toMinutes(
+                System.currentTimeMillis());
+        if (!indexStats.isFailing()
+                && cleanupIntervalMinutes > -1
+                && lastCheckpointCleanUpTime + cleanupIntervalMinutes < currentMinutes) {
+            try {
+                cleanUpCheckpoints();
+            } catch (Throwable e) {
+                log.warn("Checkpoint clean up failed", e);
+            }
+            lastCheckpointCleanUpTime = currentMinutes;
+        }
+    }
+
+    /**
+     * Releases orphaned checkpoints of this async indexer. The reference
+     * checkpoint and the checkpoints in indexStats.tempCps are kept. Any
+     * other checkpoint with this indexer's name and creator is released
+     * when it has no 'created' info or was created before the reference
+     * checkpoint. Does nothing when there is no reference checkpoint or
+     * the reference checkpoint has no 'created' info.
+     */
+    void cleanUpCheckpoints() {
+        log.debug("Cleaning up orphaned checkpoints");
+        Set<String> keep = newHashSet();
+        String cp = indexStats.getReferenceCheckpoint();
+        if (cp == null) {
+            log.warn("No reference checkpoint set in index stats");
+            return;
+        }
+        keep.add(cp);
+        keep.addAll(indexStats.tempCps);
+        Map<String, String> info = store.checkpointInfo(cp);
+        String value = info.get("created");
+        if (value != null) {
+            // remove unreferenced AsyncIndexUpdate checkpoints:
+            // - without 'created' info (checkpoint created before OAK-4826)
+            // or
+            // - 'created' value older than the current reference
+            long current = ISO8601.parse(value).getTimeInMillis();
+            for (String checkpoint : store.checkpoints()) {
+                info = store.checkpointInfo(checkpoint);
+                String creator = info.get("creator");
+                String created = info.get("created");
+                String name = info.get("name");
+                if (!keep.contains(checkpoint)
+                        && this.name.equals(name)
+                        && AsyncIndexUpdate.class.getSimpleName().equals(creator)
+                        && (created == null || ISO8601.parse(created).getTimeInMillis() < current)) {
+                    if (store.release(checkpoint)) {
+                        log.info("Removed orphaned checkpoint '{}' {}",
+                                checkpoint, info);
+                    }
+                }
+            }
+        }
+    }
+
     protected AsyncUpdateCallback newAsyncUpdateCallback(NodeStore store,
             String name, long leaseTimeOut, String beforeCheckpoint,
             AsyncIndexStats indexStats,
Index: oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/index/AsyncIndexUpdateTest.java
===================================================================
--- oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/index/AsyncIndexUpdateTest.java (revision 1762606)
+++ oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/index/AsyncIndexUpdateTest.java (working copy)
@@ -73,6 +73,7 @@
 import org.apache.jackrabbit.oak.spi.state.NodeState;
 import org.apache.jackrabbit.oak.spi.state.NodeStore;
 import org.apache.jackrabbit.oak.spi.state.ProxyNodeStore;
+import org.apache.jackrabbit.oak.stats.Clock;
 import org.junit.Test;
 
 import ch.qos.logback.classic.Level;
@@ -678,6 +679,77 @@
         }
     }
 
+    // OAK-4826
+    @Test
+    public void cpCleanupOrphaned() throws Exception {
+        Clock clock = Clock.SIMPLE;
+        MemoryNodeStore store = new MemoryNodeStore();
+        // prepare index and initial content
+        NodeBuilder builder = store.getRoot().builder();
+        createIndexDefinition(builder.child(INDEX_DEFINITIONS_NAME),
+                "rootIndex", true, false, ImmutableSet.of("foo"), null)
+                .setProperty(ASYNC_PROPERTY_NAME, "async");
+        builder.child("testRoot").setProperty("foo", "abc");
+        store.merge(builder, EmptyHook.INSTANCE, CommitInfo.EMPTY);
+
+        assertTrue("Expecting no checkpoints",
+                store.listCheckpoints().size() == 0);
+
+        IndexEditorProvider provider = new PropertyIndexEditorProvider();
+        AsyncIndexUpdate async = new AsyncIndexUpdate("async", store, provider);
+        async.run();
+        assertTrue("Expecting one checkpoint",
+                store.listCheckpoints().size() == 1);
+        String cp = store.listCheckpoints().iterator().next();
+        Map<String, String> info = store.checkpointInfo(cp);
+
+        builder = store.getRoot().builder();
+        builder.child("testRoot").setProperty("foo", "def");
+        store.merge(builder, EmptyHook.INSTANCE, CommitInfo.EMPTY);
+
+        // wait until currentTimeMillis() changes. this ensures
+        // the created value for the checkpoint is different
+        // from the previous checkpoint.
+        clock.waitUntil(clock.getTime() + 1);
+        async.run();
+        assertTrue("Expecting one checkpoint",
+                store.listCheckpoints().size() == 1);
+        cp = store.listCheckpoints().iterator().next();
+
+        // create a new checkpoint with the info from the first checkpoint
+        // this simulates an orphaned checkpoint that should be cleaned up
+        assertNotNull(store.checkpoint(TimeUnit.HOURS.toMillis(1), info));
+        assertTrue("Expecting two checkpoints",
+                store.listCheckpoints().size() == 2);
+
+        async.cleanUpCheckpoints();
+        assertTrue("Expecting one checkpoint",
+                store.listCheckpoints().size() == 1);
+        assertEquals(cp, store.listCheckpoints().iterator().next());
+    }
+
+    @Test
+    public void disableCheckpointCleanup() throws Exception {
+        String propertyName = "oak.async.checkpointCleanupIntervalMinutes";
+        MemoryNodeStore store = new MemoryNodeStore();
+        IndexEditorProvider provider = new PropertyIndexEditorProvider();
+        try {
+            System.setProperty(propertyName, "-1");
+            final AtomicBoolean cleaned = new AtomicBoolean();
+            AsyncIndexUpdate async = new AsyncIndexUpdate("async", store, provider) {
+                @Override
+                void cleanUpCheckpoints() {
+                    cleaned.set(true);
+                    super.cleanUpCheckpoints();
+                }
+            };
+            async.run();
+            assertFalse(cleaned.get());
+        } finally {
+            System.clearProperty(propertyName);
+        }
+    }
+
     /**
      * OAK-2203 Test reindex behavior on an async index when the index provider is missing
      * for a given type