Index: oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/index/AsyncIndexUpdate.java =================================================================== --- oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/index/AsyncIndexUpdate.java (revision 1755975) +++ oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/index/AsyncIndexUpdate.java (working copy) @@ -201,9 +201,7 @@ private final NodeStore store; /** The base checkpoint */ - private final String checkpoint; - - private final String afterCheckpoint; + private String checkpoint; /** * Property name which stores the temporary checkpoint that need to be released on the next run @@ -223,21 +221,25 @@ /** Expiration time of the last lease we committed */ private long lease; + private boolean hasLease = false; + public AsyncUpdateCallback(NodeStore store, String name, - long leaseTimeOut, String checkpoint, String afterCheckpoint, + long leaseTimeOut, String checkpoint, AsyncIndexStats indexStats, AtomicBoolean forcedStop) { this.store = store; this.name = name; this.forcedStop = forcedStop; this.leaseTimeOut = leaseTimeOut; this.checkpoint = checkpoint; - this.afterCheckpoint = afterCheckpoint; this.tempCpName = getTempCpName(name); this.indexStats = indexStats; this.leaseName = leasify(name); } - protected void prepare() throws CommitFailedException { + protected void initLease() throws CommitFailedException { + if (hasLease) { + return; + } long now = System.currentTimeMillis(); this.lease = now + 2 * leaseTimeOut; @@ -251,6 +253,18 @@ NodeBuilder async = builder.child(ASYNC); async.setProperty(leaseName, lease); mergeWithConcurrencyCheck(store, builder, checkpoint, beforeLease, name); + hasLease = true; + } + + protected void prepare(String afterCheckpoint) + throws CommitFailedException { + if (!hasLease) { + initLease(); + } + NodeState root = store.getRoot(); + NodeBuilder builder = root.builder(); + NodeBuilder async = builder.child(ASYNC); + updateTempCheckpoints(async, checkpoint, afterCheckpoint); mergeWithConcurrencyCheck(store, builder, checkpoint, lease, name); @@ -311,6 +325,10 @@ } } } + + public void setCheckpoint(String checkpoint) { + this.checkpoint = checkpoint; + } } @Override @@ -390,9 +408,29 @@ // find the last indexed state, and check if there are recent changes NodeState before; String beforeCheckpoint = async.getString(name); + AsyncUpdateCallback callback = newAsyncUpdateCallback(store, + name, leaseTimeOut, beforeCheckpoint, indexStats, + forcedStopFlag); if (beforeCheckpoint != null) { NodeState state = store.retrieve(beforeCheckpoint); if (state == null) { + // to make sure we're not reading a stale root rev, we're + // attempting a write+read via the lease-grab mechanics + try { + callback.initLease(); + } catch (CommitFailedException e) { + indexStats.failed(e); + return; + } + root = store.getRoot(); + beforeCheckpoint = root.getChildNode(ASYNC).getString(name); + if (beforeCheckpoint != null) { + state = store.retrieve(beforeCheckpoint); + callback.setCheckpoint(beforeCheckpoint); + } + } + + if (state == null) { log.warn( "[{}] Failed to retrieve previously indexed checkpoint {}; re-running the initial index update", name, beforeCheckpoint); @@ -436,8 +474,8 @@ log.trace("Switching thread name to {}", newThreadName); threadNameChanged = true; Thread.currentThread().setName(newThreadName); - updatePostRunStatus = updateIndex(before, beforeCheckpoint, - after, afterCheckpoint, afterTime); + updatePostRunStatus = updateIndex(before, beforeCheckpoint, after, + afterCheckpoint, afterTime, callback); // the update succeeded, i.e. it no longer fails if (indexStats.isFailing()) { @@ -479,23 +517,21 @@ protected AsyncUpdateCallback newAsyncUpdateCallback(NodeStore store, String name, long leaseTimeOut, String beforeCheckpoint, - String afterCheckpoint, AsyncIndexStats indexStats, + AsyncIndexStats indexStats, AtomicBoolean stopFlag) { return new AsyncUpdateCallback(store, name, leaseTimeOut, - beforeCheckpoint, afterCheckpoint, indexStats, stopFlag); + beforeCheckpoint, indexStats, stopFlag); } - private boolean updateIndex(NodeState before, String beforeCheckpoint, - NodeState after, String afterCheckpoint, String afterTime) - throws CommitFailedException { + protected boolean updateIndex(NodeState before, String beforeCheckpoint, + NodeState after, String afterCheckpoint, String afterTime, + AsyncUpdateCallback callback) throws CommitFailedException { Stopwatch watch = Stopwatch.createStarted(); boolean updatePostRunStatus = true; boolean progressLogged = false; - // create an update callback for tracking index updates + // prepare the update callback for tracking index updates // and maintaining the update lease - AsyncUpdateCallback callback = newAsyncUpdateCallback(store, name, - leaseTimeOut, beforeCheckpoint, afterCheckpoint, indexStats, forcedStopFlag); - callback.prepare(); + callback.prepare(afterCheckpoint); // check for index tasks split requests, if a split happened, make // sure to not delete the reference checkpoint, as the other index Index: oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/index/AsyncIndexUpdateTest.java =================================================================== --- oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/index/AsyncIndexUpdateTest.java (revision 1755975) +++ oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/index/AsyncIndexUpdateTest.java (working copy) @@ -1067,13 +1067,13 @@ final AsyncIndexUpdate async = new AsyncIndexUpdate("async", store, provider) { @Override protected AsyncUpdateCallback newAsyncUpdateCallback(NodeStore store, String name, long leaseTimeOut, - String beforeCheckpoint, String afterCheckpoint, + String beforeCheckpoint, AsyncIndexStats indexStats, AtomicBoolean stopFlag) { try { asyncLock.acquire(); } catch (InterruptedException ignore) { } - return super.newAsyncUpdateCallback(store, name, leaseTimeOut, beforeCheckpoint, afterCheckpoint, + return super.newAsyncUpdateCallback(store, name, leaseTimeOut, beforeCheckpoint, indexStats, stopFlag); } }; @@ -1137,13 +1137,13 @@ final AsyncIndexUpdate async = new AsyncIndexUpdate("async", store, provider) { @Override protected AsyncUpdateCallback newAsyncUpdateCallback(NodeStore store, String name, long leaseTimeOut, - String beforeCheckpoint, String afterCheckpoint, + String beforeCheckpoint, AsyncIndexStats indexStats, AtomicBoolean stopFlag) { try { asyncLock.acquire(); } catch (InterruptedException ignore) { } - return super.newAsyncUpdateCallback(store, name, leaseTimeOut, beforeCheckpoint, afterCheckpoint, + return super.newAsyncUpdateCallback(store, name, leaseTimeOut, beforeCheckpoint, indexStats, stopFlag); } }; @@ -1211,9 +1211,9 @@ final AsyncIndexUpdate async = new AsyncIndexUpdate("async", store, provider) { @Override protected AsyncUpdateCallback newAsyncUpdateCallback(NodeStore store, String name, long leaseTimeOut, - String beforeCheckpoint, String afterCheckpoint, + String beforeCheckpoint, AsyncIndexStats indexStats, AtomicBoolean stopFlag) { - return new AsyncUpdateCallback(store, name, leaseTimeOut, beforeCheckpoint, afterCheckpoint, + return new AsyncUpdateCallback(store, name, leaseTimeOut, beforeCheckpoint, indexStats, stopFlag){ @Override @@ -1310,4 +1310,56 @@ fail("RetryLoop failed, condition is false after " + timeoutSeconds + " seconds: "); } + @Test + public void greedyLeaseReindex() throws Exception { + + MemoryNodeStore store = new MemoryNodeStore(); + IndexEditorProvider provider = new PropertyIndexEditorProvider(); + NodeBuilder builder = store.getRoot().builder(); + createIndexDefinition(builder.child(INDEX_DEFINITIONS_NAME), + "rootIndex", true, false, ImmutableSet.of("foo"), null) + .setProperty(ASYNC_PROPERTY_NAME, "async"); + builder.child("testRoot").setProperty("foo", "abc"); + store.merge(builder, EmptyHook.INSTANCE, CommitInfo.EMPTY); + + AsyncIndexUpdate pre = new AsyncIndexUpdate("async", store, provider); + pre.run(); + pre.close(); + + // rm all cps to simulate 'missing cp scenario' + for (String cp : store.listCheckpoints()) { + store.release(cp); + } + + final AtomicBoolean greedyLease = new AtomicBoolean(false); + final AsyncIndexUpdate async = new AsyncIndexUpdate("async", store, + provider) { + @Override + protected AsyncUpdateCallback newAsyncUpdateCallback( + NodeStore store, String name, long leaseTimeOut, + String beforeCheckpoint, AsyncIndexStats indexStats, + AtomicBoolean stopFlag) { + return new AsyncUpdateCallback(store, name, leaseTimeOut, + beforeCheckpoint, indexStats, stopFlag) { + + @Override + protected void initLease() throws CommitFailedException { + greedyLease.set(true); + super.initLease(); + } + + @Override + protected void prepare(String afterCheckpoint) + throws CommitFailedException { + assertTrue(greedyLease.get()); + super.prepare(afterCheckpoint); + } + }; + } + }; + async.run(); + async.close(); + assertTrue(greedyLease.get()); + } + } Index: oak-it/src/test/java/org/apache/jackrabbit/oak/plugins/index/AsyncIndexUpdateClusterTest.java =================================================================== --- oak-it/src/test/java/org/apache/jackrabbit/oak/plugins/index/AsyncIndexUpdateClusterTest.java (revision 0) +++ oak-it/src/test/java/org/apache/jackrabbit/oak/plugins/index/AsyncIndexUpdateClusterTest.java (revision 0) @@ -0,0 +1,324 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.jackrabbit.oak.plugins.index; + +import static org.apache.jackrabbit.oak.plugins.index.IndexConstants.ASYNC_PROPERTY_NAME; +import static org.apache.jackrabbit.oak.plugins.index.IndexConstants.INDEX_DEFINITIONS_NAME; +import static org.apache.jackrabbit.oak.plugins.memory.EmptyNodeState.MISSING_NODE; +import static org.junit.Assert.assertFalse; + +import java.io.IOException; +import java.util.List; +import java.util.Random; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; + +import javax.annotation.Nonnull; + +import org.apache.jackrabbit.oak.api.CommitFailedException; +import org.apache.jackrabbit.oak.api.PropertyState; +import org.apache.jackrabbit.oak.commons.concurrent.ExecutorCloser; +import org.apache.jackrabbit.oak.plugins.document.DocumentMK; +import org.apache.jackrabbit.oak.plugins.document.DocumentNodeStore; +import org.apache.jackrabbit.oak.plugins.document.memory.MemoryDocumentStore; +import org.apache.jackrabbit.oak.plugins.index.property.PropertyIndexEditorProvider; +import org.apache.jackrabbit.oak.spi.blob.MemoryBlobStore; +import org.apache.jackrabbit.oak.spi.commit.CommitInfo; +import org.apache.jackrabbit.oak.spi.commit.Editor; +import org.apache.jackrabbit.oak.spi.commit.EmptyHook; +import org.apache.jackrabbit.oak.spi.state.NodeBuilder; +import org.apache.jackrabbit.oak.spi.state.NodeState; +import org.apache.jackrabbit.oak.spi.state.NodeStore; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; + +import com.google.common.collect.ImmutableList; +import com.google.common.collect.ImmutableSet; +import com.google.common.io.Closer; + +public class AsyncIndexUpdateClusterTest { + + private DocumentNodeStore ns1; + private DocumentNodeStore ns2; + + private MemoryDocumentStore ds; + private MemoryBlobStore bs; + + private Random random = new Random(); + private final List values = ImmutableList.of("a", "b", "c", "d", + "e"); + + private Closer closer = Closer.create(); + private final AtomicBoolean illegalReindex = new AtomicBoolean(false); + + @Before + public void before() throws Exception { + ns1 = create(0); + ns2 = create(1); + } + + @After + public void after() { + shutdown(); + ns1.dispose(); + ns2.dispose(); + assertFalse("Reindexing should not happen", illegalReindex.get()); + } + + private void shutdown() { + try { + closer.close(); + } catch (IOException e) { + e.printStackTrace(); + } + } + + @Test + public void missingCheckpointDueToEventualConsistency() throws Exception { + IndexStatusListener l = new IndexStatusListener(); + + AsyncIndexUpdate async1 = createAsync(ns1, l); + closer.register(async1); + AsyncIndexUpdate async2 = createAsync(ns2, l); + closer.register(async2); + + // Phase 1 - Base setup - Index definition creation and + // performing initial indexing + // Create index definition on NS1 + NodeBuilder b1 = ns1.getRoot().builder(); + createIndexDefinition(b1); + ns1.merge(b1, EmptyHook.INSTANCE, CommitInfo.EMPTY); + + // Trigger indexing on NS1 + async1.run(); + l.initDone(); + + ScheduledExecutorService executorService = Executors + .newScheduledThreadPool(5); + closer.register(new ExecutorCloser(executorService)); + + executorService.scheduleWithFixedDelay(async1, 1, 3, TimeUnit.SECONDS); + executorService.scheduleWithFixedDelay(async2, 1, 2, TimeUnit.SECONDS); + executorService.scheduleWithFixedDelay( + new PropertyMutator(ns1, "node1"), 500, 500, + TimeUnit.MILLISECONDS); + executorService.scheduleWithFixedDelay( + new PropertyMutator(ns2, "node2"), 500, 500, + TimeUnit.MILLISECONDS); + + for (int i = 0; i < 4 && !illegalReindex.get(); i++) { + TimeUnit.SECONDS.sleep(5); + } + shutdown(); + } + + private static AsyncIndexUpdate createAsync(DocumentNodeStore ns, + final IndexStatusListener l) { + IndexEditorProvider p = new TestEditorProvider( + new PropertyIndexEditorProvider(), l); + AsyncIndexUpdate aiu = new AsyncIndexUpdate("async", ns, p) { + protected boolean updateIndex(NodeState before, + String beforeCheckpoint, NodeState after, + String afterCheckpoint, String afterTime, + AsyncUpdateCallback callback) throws CommitFailedException { + if (MISSING_NODE == before) { + l.reindexing(); + } + return super.updateIndex(before, beforeCheckpoint, after, + afterCheckpoint, afterTime, callback); + } + }; + aiu.setCloseTimeOut(1); + return aiu; + } + + private static void createIndexDefinition(NodeBuilder builder) { + IndexUtils.createIndexDefinition(builder.child(INDEX_DEFINITIONS_NAME), + "rootIndex", true, false, ImmutableSet.of("foo"), null) + .setProperty(ASYNC_PROPERTY_NAME, "async"); + } + + private DocumentNodeStore create(int clusterId) { + DocumentMK.Builder builder = new DocumentMK.Builder(); + if (ds == null) { + ds = new MemoryDocumentStore(); + } + if (bs == null) { + bs = new MemoryBlobStore(); + } + builder.setDocumentStore(ds).setBlobStore(bs); + + DocumentNodeStore store = builder.setClusterId(++clusterId) + .setLeaseCheck(false).open().getNodeStore(); + return store; + } + + private class PropertyMutator implements Runnable { + private final NodeStore nodeStore; + private final String nodeName; + + public PropertyMutator(NodeStore nodeStore, String nodeName) { + this.nodeStore = nodeStore; + this.nodeName = nodeName; + } + + @Override + public void run() { + NodeBuilder b = nodeStore.getRoot().builder(); + b.child(nodeName).setProperty("foo", + values.get(random.nextInt(values.size()))); + try { + nodeStore.merge(b, EmptyHook.INSTANCE, CommitInfo.EMPTY); + } catch (CommitFailedException e) { + e.printStackTrace(); + } + } + } + + private class IndexStatusListener { + + boolean reindexOk = true; + + public void reindexing() { + if (!reindexOk) { + illegalReindex.set(true); + shutdown(); + } + } + + public void initDone() { + reindexOk = false; + } + + public void waitRandomly() { + try { + TimeUnit.SECONDS.sleep(random.nextInt(1)); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + } + } + } + + private static class TestEditorProvider implements IndexEditorProvider { + private final IndexEditorProvider delegate; + private final IndexStatusListener listener; + + private TestEditorProvider(IndexEditorProvider delegate, + IndexStatusListener listener) { + this.delegate = delegate; + this.listener = listener; + } + + @Override + public Editor getIndexEditor(@Nonnull String type, + @Nonnull NodeBuilder definition, @Nonnull NodeState root, + @Nonnull IndexUpdateCallback callback) + throws CommitFailedException { + Editor e = delegate + .getIndexEditor(type, definition, root, callback); + if (e != null) { + e = new TestEditor(e, listener); + } + return e; + } + } + + private static class TestEditor implements Editor { + private final Editor editor; + private final TestEditor parent; + private final IndexStatusListener listener; + + TestEditor(Editor editor, IndexStatusListener listener) { + this(editor, listener, null); + } + + TestEditor(Editor editor, IndexStatusListener listener, + TestEditor parent) { + this.editor = editor; + this.listener = listener; + this.parent = parent; + } + + @Override + public void enter(NodeState before, NodeState after) + throws CommitFailedException { + if (MISSING_NODE == before && parent == null) { + listener.reindexing(); + } + editor.enter(before, after); + } + + @Override + public void leave(NodeState before, NodeState after) + throws CommitFailedException { + listener.waitRandomly(); + editor.leave(before, after); + } + + @Override + public void propertyAdded(PropertyState after) + throws CommitFailedException { + editor.propertyAdded(after); + } + + @Override + public void propertyChanged(PropertyState before, PropertyState after) + throws CommitFailedException { + editor.propertyChanged(before, after); + } + + @Override + public void propertyDeleted(PropertyState before) + throws CommitFailedException { + editor.propertyDeleted(before); + } + + @Override + public Editor childNodeAdded(String name, NodeState after) + throws CommitFailedException { + return createChildEditor(editor.childNodeAdded(name, after), name); + } + + @Override + public Editor childNodeChanged(String name, NodeState before, + NodeState after) throws CommitFailedException { + return createChildEditor( + editor.childNodeChanged(name, before, after), name); + } + + @Override + public Editor childNodeDeleted(String name, NodeState before) + throws CommitFailedException { + return createChildEditor(editor.childNodeDeleted(name, before), + name); + } + + private TestEditor createChildEditor(Editor editor, String name) { + if (editor == null) { + return null; + } else { + return new TestEditor(editor, listener, this); + } + } + } +} \ No newline at end of file Index: oak-it/src/test/java/org/apache/jackrabbit/oak/plugins/index/AsyncIndexUpdateLeaseTest.java =================================================================== --- oak-it/src/test/java/org/apache/jackrabbit/oak/plugins/index/AsyncIndexUpdateLeaseTest.java (revision 1755975) +++ oak-it/src/test/java/org/apache/jackrabbit/oak/plugins/index/AsyncIndexUpdateLeaseTest.java (working copy) @@ -400,10 +400,10 @@ @Override protected AsyncUpdateCallback newAsyncUpdateCallback(NodeStore store, String name, long leaseTimeOut, String checkpoint, - String afterCheckpoint, AsyncIndexStats indexStats, + AsyncIndexStats indexStats, AtomicBoolean stopFlag) { return new SpecialAsyncUpdateCallback(store, name, leaseTimeOut, - checkpoint, afterCheckpoint, indexStats, stopFlag, listener); + checkpoint, indexStats, stopFlag, listener); } } @@ -412,16 +412,16 @@ private IndexStatusListener listener; public SpecialAsyncUpdateCallback(NodeStore store, String name, - long leaseTimeOut, String checkpoint, String afterCheckpoint, + long leaseTimeOut, String checkpoint, AsyncIndexStats indexStats, AtomicBoolean stopFlag, IndexStatusListener listener) { - super(store, name, leaseTimeOut, checkpoint, afterCheckpoint, indexStats, stopFlag); + super(store, name, leaseTimeOut, checkpoint, indexStats, stopFlag); this.listener = listener; } @Override - protected void prepare() throws CommitFailedException { + protected void prepare(String afterCheckpoint) throws CommitFailedException { listener.prePrepare(); - super.prepare(); + super.prepare(afterCheckpoint); listener.postPrepare(); }