Index: oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/index/AsyncIndexUpdate.java =================================================================== --- oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/index/AsyncIndexUpdate.java (revision 1755975) +++ oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/index/AsyncIndexUpdate.java (working copy) @@ -201,9 +201,7 @@ private final NodeStore store; /** The base checkpoint */ - private final String checkpoint; - - private final String afterCheckpoint; + private String checkpoint; /** * Property name which stores the temporary checkpoint that need to be released on the next run @@ -223,21 +221,25 @@ /** Expiration time of the last lease we committed */ private long lease; + private boolean hasLease = false; + public AsyncUpdateCallback(NodeStore store, String name, - long leaseTimeOut, String checkpoint, String afterCheckpoint, + long leaseTimeOut, String checkpoint, AsyncIndexStats indexStats, AtomicBoolean forcedStop) { this.store = store; this.name = name; this.forcedStop = forcedStop; this.leaseTimeOut = leaseTimeOut; this.checkpoint = checkpoint; - this.afterCheckpoint = afterCheckpoint; this.tempCpName = getTempCpName(name); this.indexStats = indexStats; this.leaseName = leasify(name); } - protected void prepare() throws CommitFailedException { + protected void initLease() throws CommitFailedException { + if (hasLease) { + return; + } long now = System.currentTimeMillis(); this.lease = now + 2 * leaseTimeOut; @@ -251,6 +253,18 @@ NodeBuilder async = builder.child(ASYNC); async.setProperty(leaseName, lease); mergeWithConcurrencyCheck(store, builder, checkpoint, beforeLease, name); + hasLease = true; + } + + protected void prepare(String afterCheckpoint) + throws CommitFailedException { + if (!hasLease) { + initLease(); + } + NodeState root = store.getRoot(); + NodeBuilder builder = root.builder(); + NodeBuilder async = builder.child(ASYNC); + updateTempCheckpoints(async, checkpoint, afterCheckpoint); mergeWithConcurrencyCheck(store, builder, checkpoint, lease, name); @@ -311,6 +325,10 @@ } } } + + public void setCheckpoint(String checkpoint) { + this.checkpoint = checkpoint; + } } @Override @@ -390,9 +408,29 @@ // find the last indexed state, and check if there are recent changes NodeState before; String beforeCheckpoint = async.getString(name); + AsyncUpdateCallback callback = newAsyncUpdateCallback(store, + name, leaseTimeOut, beforeCheckpoint, indexStats, + forcedStopFlag); if (beforeCheckpoint != null) { NodeState state = store.retrieve(beforeCheckpoint); if (state == null) { + // to make sure we're not reading a stale root rev, we're + // attempting a write+read via the lease-grab mechanics + try { + callback.initLease(); + } catch (CommitFailedException e) { + indexStats.failed(e); + return; + } + root = store.getRoot(); + beforeCheckpoint = root.getChildNode(ASYNC).getString(name); + if (beforeCheckpoint != null) { + state = store.retrieve(beforeCheckpoint); + callback.setCheckpoint(beforeCheckpoint); + } + } + + if (state == null) { log.warn( "[{}] Failed to retrieve previously indexed checkpoint {}; re-running the initial index update", name, beforeCheckpoint); @@ -436,8 +474,8 @@ log.trace("Switching thread name to {}", newThreadName); threadNameChanged = true; Thread.currentThread().setName(newThreadName); - updatePostRunStatus = updateIndex(before, beforeCheckpoint, - after, afterCheckpoint, afterTime); + updatePostRunStatus = updateIndex(before, beforeCheckpoint, after, + afterCheckpoint, afterTime, callback); // the update succeeded, i.e. it no longer fails if (indexStats.isFailing()) { @@ -479,23 +517,21 @@ protected AsyncUpdateCallback newAsyncUpdateCallback(NodeStore store, String name, long leaseTimeOut, String beforeCheckpoint, - String afterCheckpoint, AsyncIndexStats indexStats, + AsyncIndexStats indexStats, AtomicBoolean stopFlag) { return new AsyncUpdateCallback(store, name, leaseTimeOut, - beforeCheckpoint, afterCheckpoint, indexStats, stopFlag); + beforeCheckpoint, indexStats, stopFlag); } private boolean updateIndex(NodeState before, String beforeCheckpoint, - NodeState after, String afterCheckpoint, String afterTime) - throws CommitFailedException { + NodeState after, String afterCheckpoint, String afterTime, + AsyncUpdateCallback callback) throws CommitFailedException { Stopwatch watch = Stopwatch.createStarted(); boolean updatePostRunStatus = true; boolean progressLogged = false; - // create an update callback for tracking index updates + // prepare the update callback for tracking index updates // and maintaining the update lease - AsyncUpdateCallback callback = newAsyncUpdateCallback(store, name, - leaseTimeOut, beforeCheckpoint, afterCheckpoint, indexStats, forcedStopFlag); - callback.prepare(); + callback.prepare(afterCheckpoint); // check for index tasks split requests, if a split happened, make // sure to not delete the reference checkpoint, as the other index Index: oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/index/AsyncIndexUpdateTest.java =================================================================== --- oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/index/AsyncIndexUpdateTest.java (revision 1755975) +++ oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/index/AsyncIndexUpdateTest.java (working copy) @@ -1067,13 +1067,13 @@ final AsyncIndexUpdate async = new AsyncIndexUpdate("async", store, provider) { @Override protected AsyncUpdateCallback newAsyncUpdateCallback(NodeStore store, String name, long leaseTimeOut, - String beforeCheckpoint, String afterCheckpoint, + String beforeCheckpoint, AsyncIndexStats indexStats, AtomicBoolean stopFlag) { try { asyncLock.acquire(); } catch (InterruptedException ignore) { } - return super.newAsyncUpdateCallback(store, name, leaseTimeOut, beforeCheckpoint, afterCheckpoint, + return super.newAsyncUpdateCallback(store, name, leaseTimeOut, beforeCheckpoint, indexStats, stopFlag); } }; @@ -1137,13 +1137,13 @@ final AsyncIndexUpdate async = new AsyncIndexUpdate("async", store, provider) { @Override protected AsyncUpdateCallback newAsyncUpdateCallback(NodeStore store, String name, long leaseTimeOut, - String beforeCheckpoint, String afterCheckpoint, + String beforeCheckpoint, AsyncIndexStats indexStats, AtomicBoolean stopFlag) { try { asyncLock.acquire(); } catch (InterruptedException ignore) { } - return super.newAsyncUpdateCallback(store, name, leaseTimeOut, beforeCheckpoint, afterCheckpoint, + return super.newAsyncUpdateCallback(store, name, leaseTimeOut, beforeCheckpoint, indexStats, stopFlag); } }; @@ -1211,9 +1211,9 @@ final AsyncIndexUpdate async = new AsyncIndexUpdate("async", store, provider) { @Override protected AsyncUpdateCallback newAsyncUpdateCallback(NodeStore store, String name, long leaseTimeOut, - String beforeCheckpoint, String afterCheckpoint, + String beforeCheckpoint, AsyncIndexStats indexStats, AtomicBoolean stopFlag) { - return new AsyncUpdateCallback(store, name, leaseTimeOut, beforeCheckpoint, afterCheckpoint, + return new AsyncUpdateCallback(store, name, leaseTimeOut, beforeCheckpoint, indexStats, stopFlag){ @Override @@ -1310,4 +1310,56 @@ fail("RetryLoop failed, condition is false after " + timeoutSeconds + " seconds: "); } + @Test + public void greedyLeaseReindex() throws Exception { + + MemoryNodeStore store = new MemoryNodeStore(); + IndexEditorProvider provider = new PropertyIndexEditorProvider(); + NodeBuilder builder = store.getRoot().builder(); + createIndexDefinition(builder.child(INDEX_DEFINITIONS_NAME), + "rootIndex", true, false, ImmutableSet.of("foo"), null) + .setProperty(ASYNC_PROPERTY_NAME, "async"); + builder.child("testRoot").setProperty("foo", "abc"); + store.merge(builder, EmptyHook.INSTANCE, CommitInfo.EMPTY); + + AsyncIndexUpdate pre = new AsyncIndexUpdate("async", store, provider); + pre.run(); + pre.close(); + + // rm all cps to simulate 'missing cp scenario' + for (String cp : store.listCheckpoints()) { + store.release(cp); + } + + final AtomicBoolean greedyLease = new AtomicBoolean(false); + final AsyncIndexUpdate async = new AsyncIndexUpdate("async", store, + provider) { + @Override + protected AsyncUpdateCallback newAsyncUpdateCallback( + NodeStore store, String name, long leaseTimeOut, + String beforeCheckpoint, AsyncIndexStats indexStats, + AtomicBoolean stopFlag) { + return new AsyncUpdateCallback(store, name, leaseTimeOut, + beforeCheckpoint, indexStats, stopFlag) { + + @Override + protected void initLease() throws CommitFailedException { + greedyLease.set(true); + super.initLease(); + } + + @Override + protected void prepare(String afterCheckpoint) + throws CommitFailedException { + assertTrue(greedyLease.get()); + super.prepare(afterCheckpoint); + } + }; + } + }; + async.run(); + async.close(); + assertTrue(greedyLease.get()); + } + }