Index: oak-core/src/main/java/org/apache/jackrabbit/oak/api/jmx/IndexStatsMBean.java =================================================================== --- oak-core/src/main/java/org/apache/jackrabbit/oak/api/jmx/IndexStatsMBean.java (revision 1625245) +++ oak-core/src/main/java/org/apache/jackrabbit/oak/api/jmx/IndexStatsMBean.java (working copy) @@ -84,4 +84,29 @@ */ long getUpdates(); + /** + * Returns the current reference checkpoint used by the async indexer + * + * @return the reference checkpoint + */ + String getReferenceCheckpoint(); + + /** + * Returns the processed checkpoint used by the async indexer. If this index + * round finishes successfully, the processed checkpoint will become the + * reference checkpoint, and the old reference checkpoint wil be released. + * + * @return the processed checkpoint + */ + String getProcessedCheckpoint(); + + /** + * Temporary checkpoints represent old checkpoints that have been processed + * but the cleanup was not successful of did not happen at all (like in the + * event the system was forcibly stopped). + * + * @return the already processed checkpoints + */ + String getTemporaryCheckpoints(); + } Index: oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/index/AsyncIndexUpdate.java =================================================================== --- oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/index/AsyncIndexUpdate.java (revision 1625245) +++ oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/index/AsyncIndexUpdate.java (working copy) @@ -19,11 +19,11 @@ package org.apache.jackrabbit.oak.plugins.index; import static com.google.common.base.Preconditions.checkNotNull; -import static org.apache.jackrabbit.oak.plugins.memory.EmptyNodeState.MISSING_NODE; import static org.apache.jackrabbit.oak.api.jmx.IndexStatsMBean.STATUS_DONE; import static org.apache.jackrabbit.oak.commons.PathUtils.elements; -import static org.apache.jackrabbit.oak.plugins.index.IndexConstants.REINDEX_PROPERTY_NAME; import static org.apache.jackrabbit.oak.plugins.index.IndexConstants.ASYNC_PROPERTY_NAME; +import static org.apache.jackrabbit.oak.plugins.index.IndexConstants.REINDEX_PROPERTY_NAME; +import static org.apache.jackrabbit.oak.plugins.memory.EmptyNodeState.MISSING_NODE; import java.util.Calendar; import java.util.HashSet; @@ -34,6 +34,7 @@ import org.apache.jackrabbit.oak.api.CommitFailedException; import org.apache.jackrabbit.oak.api.PropertyState; +import org.apache.jackrabbit.oak.api.Type; import org.apache.jackrabbit.oak.api.jmx.IndexStatsMBean; import org.apache.jackrabbit.oak.plugins.commit.AnnotatingConflictHandler; import org.apache.jackrabbit.oak.plugins.commit.ConflictHook; @@ -53,6 +54,7 @@ import org.slf4j.LoggerFactory; import com.google.common.base.Objects; +import com.google.common.collect.Sets; public class AsyncIndexUpdate implements Runnable { @@ -141,13 +143,17 @@ private long updates = 0; - public AsyncUpdateCallback(String checkpoint) + private final String leaseName; + private final String tempCpName; + + public AsyncUpdateCallback(String checkpoint, String afterCheckpoint) throws CommitFailedException { long now = System.currentTimeMillis(); this.checkpoint = checkpoint; this.lease = now + 2 * ASYNC_TIMEOUT; + this.leaseName = name + "-lease"; + this.tempCpName = name + "-temp"; - String leaseName = name + "-lease"; NodeState root = store.getRoot(); long beforeLease = root.getChildNode(ASYNC).getLong(leaseName); if (beforeLease > now) { @@ -155,13 +161,46 @@ } NodeBuilder builder = root.builder(); - builder.child(ASYNC).setProperty(leaseName, lease); + NodeBuilder async = builder.child(ASYNC); + async.setProperty(leaseName, lease); + updateTempCheckpoints(async, checkpoint, afterCheckpoint); mergeWithConcurrencyCheck(builder, checkpoint, beforeLease); - //reset updates counter + // reset updates counter indexStats.setUpdates(this.updates); } + private void updateTempCheckpoints(NodeBuilder async, + String checkpoint, String afterCheckpoint) { + + indexStats.setReferenceCheckpoint(checkpoint); + indexStats.setProcessedCheckpoint(afterCheckpoint); + + // try to drop temp cps, add 'currentCp' to the temp cps list + Set temps = Sets.newHashSet(); + for (String cp : getStrings(async, tempCpName)) { + if (cp.equals(checkpoint)) { + continue; + } + boolean released = store.release(cp); + log.debug("Releasing temporary checkpoint {}: {}", cp, released); + if (!released) { + temps.add(cp); + } + } + temps.add(afterCheckpoint); + async.setProperty(tempCpName, temps, Type.STRINGS); + indexStats.setTempCheckpoints(temps); + } + + private Iterable getStrings(NodeBuilder b, String p) { + PropertyState ps = b.getProperty(p); + if (ps != null) { + return ps.getValue(Type.STRINGS); + } + return Sets.newHashSet(); + } + boolean isDirty() { return updates > 0; } @@ -169,7 +208,7 @@ void close() throws CommitFailedException { NodeBuilder builder = store.getRoot().builder(); NodeBuilder async = builder.child(ASYNC); - async.removeProperty(name + "-lease"); + async.removeProperty(leaseName); mergeWithConcurrencyCheck(builder, async.getString(name), lease); } @@ -182,7 +221,7 @@ if (now + ASYNC_TIMEOUT > lease) { long newLease = now + 2 * ASYNC_TIMEOUT; NodeBuilder builder = store.getRoot().builder(); - builder.child(ASYNC).setProperty(name + "-lease", newLease); + builder.child(ASYNC).setProperty(leaseName, newLease); mergeWithConcurrencyCheck(builder, checkpoint, lease); lease = newLease; } @@ -254,6 +293,9 @@ // otherwise the new checkpoint associated with the failed update // will get released in the finally block checkpointToRelease = beforeCheckpoint; + indexStats.setReferenceCheckpoint(afterCheckpoint); + indexStats.setProcessedCheckpoint(""); + indexStats.releaseTempCheckpoint(afterCheckpoint); } catch (CommitFailedException e) { if (e == CONCURRENT_UPDATE) { @@ -267,7 +309,10 @@ } finally { if (checkpointToRelease != null) { // null during initial indexing - store.release(checkpointToRelease); + if (!store.release(checkpointToRelease)) { + log.debug("Unable to reelase checkpoint {}", + checkpointToRelease); + } } } } @@ -282,7 +327,7 @@ // create an update callback for tracking index updates // and maintaining the update lease AsyncUpdateCallback callback = - new AsyncUpdateCallback(beforeCheckpoint); + new AsyncUpdateCallback(beforeCheckpoint, afterCheckpoint); try { NodeBuilder builder = store.getRoot().builder(); @@ -356,7 +401,7 @@ stats.start(now()); } - private static void postAsyncRunStatsStatus(AsyncIndexStats stats) { + private static void postAsyncRunStatsStatus(AsyncIndexStats stats) { stats.done(now()); } @@ -377,6 +422,9 @@ private String start = ""; private String done = ""; private String status = STATUS_INIT; + private String referenceCp = ""; + private String processedCp = ""; + private Set tempCps = new HashSet(); private volatile boolean isPaused; private volatile long updates; @@ -433,11 +481,44 @@ return updates; } + void setReferenceCheckpoint(String checkpoint) { + this.referenceCp = checkpoint; + } + + @Override + public String getReferenceCheckpoint() { + return referenceCp; + } + + void setProcessedCheckpoint(String checkpoint) { + this.processedCp = checkpoint; + } + + @Override + public String getProcessedCheckpoint() { + return processedCp; + } + + void setTempCheckpoints(Set tempCheckpoints) { + this.tempCps = tempCheckpoints; + } + + void releaseTempCheckpoint(String tempCheckpoint) { + this.tempCps.remove(tempCheckpoint); + } + + @Override + public String getTemporaryCheckpoints() { + return tempCps.toString(); + } + @Override public String toString() { return "AsyncIndexStats [start=" + start + ", done=" + done + ", status=" + status + ", paused=" + isPaused - + ", updates=" + updates + "]"; + + ", updates=" + updates + ", referenceCheckpoint=" + + referenceCp + ", processedCheckpoint=" + processedCp + + " ,tempCheckpoints=" + tempCps + " ]"; } }