Index: oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/index/AsyncIndexUpdate.java =================================================================== --- oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/index/AsyncIndexUpdate.java (revision 1718290) +++ oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/index/AsyncIndexUpdate.java (working copy) @@ -95,7 +95,7 @@ * timed out. Another node in cluster would wait for timeout before * taking over a running job */ - private static final long ASYNC_TIMEOUT; + private static final long DEFAULT_ASYNC_TIMEOUT; static { int value = 15; @@ -105,7 +105,7 @@ } catch (NumberFormatException e) { // use default } - ASYNC_TIMEOUT = TimeUnit.MINUTES.toMillis(value); + DEFAULT_ASYNC_TIMEOUT = TimeUnit.MINUTES.toMillis(value); } private final String name; @@ -137,15 +137,12 @@ private final MissingIndexProviderStrategy missingStrategy = new DefaultMissingIndexProviderStrategy(); - /** - * Property name which stores the temporary checkpoint that need to be released on the next run - */ - private final String tempCpName; - private final IndexTaskSpliter taskSplitter = new IndexTaskSpliter(); private IndexMBeanRegistration mbeanRegistration; + private long leaseTimeOut; + /** * Controls the length of the interval (in minutes) at which an indexing * error is logged as 'warning'. for the rest of the indexing cycles errors @@ -158,10 +155,10 @@ @Nonnull IndexEditorProvider provider, boolean switchOnSync) { this.name = checkNotNull(name); this.lastIndexedTo = name + "-LastIndexedTo"; - this.tempCpName = name + "-temp"; this.store = checkNotNull(store); this.provider = checkNotNull(provider); this.switchOnSync = switchOnSync; + this.leaseTimeOut = DEFAULT_ASYNC_TIMEOUT; } public AsyncIndexUpdate(@Nonnull String name, @Nonnull NodeStore store, @@ -175,22 +172,47 @@ * * @see OAK-1292 */ - private class AsyncUpdateCallback implements IndexUpdateCallback { + protected static class AsyncUpdateCallback implements IndexUpdateCallback { + + private final NodeStore store; /** The base checkpoint */ private final String checkpoint; + private final String afterCheckpoint; + + /** + * Property name which stores the temporary checkpoint that need to be released on the next run + */ + private final String tempCpName; + + private final long leaseTimeOut; + + private final String name; + + private final String leaseName; + + private final AsyncIndexStats indexStats; + /** Expiration time of the last lease we committed */ private long lease; - private final String leaseName; + public AsyncUpdateCallback(NodeStore store, String name, + long leaseTimeOut, String checkpoint, String afterCheckpoint, + AsyncIndexStats indexStats) { + this.store = store; + this.name = name; + this.leaseTimeOut = leaseTimeOut; + this.checkpoint = checkpoint; + this.afterCheckpoint = afterCheckpoint; + this.tempCpName = getTempCpName(name); + this.indexStats = indexStats; + this.leaseName = leasify(name); + } - public AsyncUpdateCallback(String checkpoint, String afterCheckpoint) - throws CommitFailedException { + protected void prepare() throws CommitFailedException { long now = System.currentTimeMillis(); - this.checkpoint = checkpoint; - this.lease = now + 2 * ASYNC_TIMEOUT; - this.leaseName = name + "-lease"; + this.lease = now + 2 * leaseTimeOut; NodeState root = store.getRoot(); long beforeLease = root.getChildNode(ASYNC).getLong(leaseName); @@ -202,7 +224,7 @@ NodeBuilder async = builder.child(ASYNC); async.setProperty(leaseName, lease); updateTempCheckpoints(async, checkpoint, afterCheckpoint); - mergeWithConcurrencyCheck(builder, checkpoint, beforeLease); + mergeWithConcurrencyCheck(store, builder, checkpoint, beforeLease, name); // reset updates counter indexStats.resetUpdates(); @@ -240,18 +262,18 @@ NodeBuilder builder = store.getRoot().builder(); NodeBuilder async = builder.child(ASYNC); async.removeProperty(leaseName); - mergeWithConcurrencyCheck(builder, async.getString(name), lease); + mergeWithConcurrencyCheck(store, builder, async.getString(name), lease, name); } @Override public void indexUpdate() throws CommitFailedException { if (indexStats.incUpdates() % 100 == 0) { long now = System.currentTimeMillis(); - if (now + ASYNC_TIMEOUT > lease) { - long newLease = now + 2 * ASYNC_TIMEOUT; + if (now + leaseTimeOut > lease) { + long newLease = now + 2 * leaseTimeOut; NodeBuilder builder = store.getRoot().builder(); builder.child(ASYNC).setProperty(leaseName, newLease); - mergeWithConcurrencyCheck(builder, checkpoint, lease); + mergeWithConcurrencyCheck(store, builder, checkpoint, lease, name); lease = newLease; } } @@ -270,12 +292,13 @@ // check for concurrent updates NodeState async = root.getChildNode(ASYNC); - long leaseEndTime = async.getLong(name + "-lease"); + long leaseEndTime = async.getLong(leasify(name)); long currentTime = System.currentTimeMillis(); if (leaseEndTime > currentTime) { - log.debug( - "[{}] Another copy of the index update is already running; skipping this update. Time left for lease to expire {}s", - name, (leaseEndTime - currentTime) / 1000); + long leaseExpMsg = (leaseEndTime - currentTime) / 1000; + String err = "Another copy of the index update is already running; skipping this update. Time left for lease to expire " + + leaseExpMsg + "s"; + indexStats.failed(new Exception(err, CONCURRENT_UPDATE)); return; } @@ -331,7 +354,6 @@ log.trace("Switching thread name to {}", newThreadName); threadNameChanged = true; Thread.currentThread().setName(newThreadName); - updatePostRunStatus = updateIndex(before, beforeCheckpoint, after, afterCheckpoint, afterTime); @@ -373,8 +395,14 @@ } } - private boolean updateIndex( - NodeState before, String beforeCheckpoint, + protected AsyncUpdateCallback newAsyncUpdateCallback(NodeStore store, + String name, long leaseTimeOut, String beforeCheckpoint, + String afterCheckpoint, AsyncIndexStats indexStats) { + return new AsyncUpdateCallback(store, name, leaseTimeOut, + beforeCheckpoint, afterCheckpoint, indexStats); + } + + private boolean updateIndex(NodeState before, String beforeCheckpoint, NodeState after, String afterCheckpoint, String afterTime) throws CommitFailedException { Stopwatch watch = Stopwatch.createStarted(); @@ -382,8 +410,9 @@ boolean progressLogged = false; // create an update callback for tracking index updates // and maintaining the update lease - AsyncUpdateCallback callback = - new AsyncUpdateCallback(beforeCheckpoint, afterCheckpoint); + AsyncUpdateCallback callback = newAsyncUpdateCallback(store, name, + leaseTimeOut, beforeCheckpoint, afterCheckpoint, indexStats); + callback.prepare(); // check for index tasks split requests, if a split happened, make // sure to not delete the reference checkpoint, as the other index @@ -431,7 +460,8 @@ } updatePostRunStatus = true; } - mergeWithConcurrencyCheck(builder, beforeCheckpoint, callback.lease); + mergeWithConcurrencyCheck(store, builder, beforeCheckpoint, + callback.lease, name); if (indexUpdate.isReindexingPerformed()) { log.info("[{}] Reindexing completed for indexes: {} in {}", name, indexUpdate.getReindexStats(), watch); @@ -454,9 +484,17 @@ return updatePostRunStatus; } - private void mergeWithConcurrencyCheck( - NodeBuilder builder, final String checkpoint, final long lease) - throws CommitFailedException { + private static String leasify(String name) { + return name + "-lease"; + } + + private static String getTempCpName(String name) { + return name + "-temp"; + } + + private static void mergeWithConcurrencyCheck(final NodeStore store, + NodeBuilder builder, final String checkpoint, final long lease, + final String name) throws CommitFailedException { CommitHook concurrentUpdateCheck = new CommitHook() { @Override @Nonnull public NodeState processCommit( @@ -465,7 +503,7 @@ // check for concurrent updates by this async task NodeState async = before.getChildNode(ASYNC); if (checkpoint == null || Objects.equal(checkpoint, async.getString(name)) - && lease == async.getLong(name + "-lease")) { + && lease == async.getLong(leasify(name))) { return after; } else { throw CONCURRENT_UPDATE; @@ -488,6 +526,14 @@ } } + /** + * Milliseconds for the timeout + */ + protected AsyncIndexUpdate setLeaseTimeOut(long leaseTimeOut) { + this.leaseTimeOut = leaseTimeOut; + return this; + } + private static void preAsyncRunStatsStats(AsyncIndexStats stats) { stats.start(now()); } @@ -919,6 +965,7 @@ private void split(@CheckForNull String refCheckpoint, long lease) throws CommitFailedException { NodeBuilder builder = store.getRoot().builder(); if (refCheckpoint != null) { + String tempCpName = getTempCpName(name); NodeBuilder async = builder.child(ASYNC); // add new reference async.setProperty(newIndexTaskName, refCheckpoint); @@ -948,7 +995,7 @@ } if (!updated.isEmpty()) { - mergeWithConcurrencyCheck(builder, refCheckpoint, lease); + mergeWithConcurrencyCheck(store, builder, refCheckpoint, lease, name); log.info( "[{}] Successfully split index definitions {} to async task named {} with referenced checkpoint {}.", name, updated, newIndexTaskName, refCheckpoint); Index: oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/index/AsyncIndexUpdateLeaseTest.java =================================================================== --- oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/index/AsyncIndexUpdateLeaseTest.java (revision 0) +++ oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/index/AsyncIndexUpdateLeaseTest.java (revision 0) @@ -0,0 +1,343 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.jackrabbit.oak.plugins.index; + +import static org.apache.jackrabbit.oak.plugins.index.IndexConstants.ASYNC_PROPERTY_NAME; +import static org.apache.jackrabbit.oak.plugins.index.IndexConstants.INDEX_DEFINITIONS_NAME; +import static org.apache.jackrabbit.oak.plugins.index.IndexUtils.createIndexDefinition; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertTrue; + +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; + +import org.apache.jackrabbit.oak.api.CommitFailedException; +import org.apache.jackrabbit.oak.plugins.index.AsyncIndexUpdate.AsyncIndexStats; +import org.apache.jackrabbit.oak.plugins.index.AsyncIndexUpdate.AsyncUpdateCallback; +import org.apache.jackrabbit.oak.plugins.index.property.PropertyIndexEditorProvider; +import org.apache.jackrabbit.oak.plugins.memory.MemoryNodeStore; +import org.apache.jackrabbit.oak.spi.commit.CommitInfo; +import org.apache.jackrabbit.oak.spi.commit.EmptyHook; +import org.apache.jackrabbit.oak.spi.state.NodeBuilder; +import org.apache.jackrabbit.oak.spi.state.NodeStore; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; + +import com.google.common.collect.ImmutableSet; + +public class AsyncIndexUpdateLeaseTest { + + // TODO scenarios + // * rendexing + // * more lease expired tests ? + + private final String name = "async"; + private MemoryNodeStore store; + private IndexEditorProvider provider; + + private final AtomicBoolean executed = new AtomicBoolean(false); + + @Before + public void setup() throws Exception { + store = new MemoryNodeStore(); + provider = new PropertyIndexEditorProvider(); + + NodeBuilder builder = store.getRoot().builder(); + createIndexDefinition(builder.child(INDEX_DEFINITIONS_NAME), + "rootIndex", true, false, ImmutableSet.of("foo"), null) + .setProperty(ASYNC_PROPERTY_NAME, name); + builder.child("testRoot").setProperty("foo", "abc"); + store.merge(builder, EmptyHook.INSTANCE, CommitInfo.EMPTY); + executed.set(false); + } + + @After + public void cleanup() throws Exception { + assertTrue("Test method was not executed", executed.get()); + String referenced = getReferenceCp(store, name); + assertNotNull("Reference checkpoint doesn't exist", referenced); + assertNotNull( + "Failed indexer must not clean successful indexer's checkpoint", + store.retrieve(referenced)); + } + + @Test + public void testPrePrepare() throws Exception { + // take care of initial reindex before + new AsyncIndexUpdate(name, store, provider).run(); + + final IndexStatusListener l1 = new IndexStatusListener() { + + @Override + protected void prePrepare() { + executed.set(true); + assertRunOk(new AsyncIndexUpdate(name, store, provider)); + } + }; + assertRunKo(new SpecialAsyncIndexUpdate(name, store, provider, l1)); + } + + @Test + public void testPostPrepare() { + // take care of initial reindex before + new AsyncIndexUpdate(name, store, provider).run(); + + final IndexStatusListener l1 = new IndexStatusListener() { + + @Override + protected void postPrepare() { + executed.set(true); + // lease must prevent this run + assertRunKo(new AsyncIndexUpdate(name, store, provider)); + } + }; + assertRunOk(new SpecialAsyncIndexUpdate(name, store, provider, l1)); + } + + @Test + public void testPreIndexUpdate() throws Exception { + // take care of initial reindex before + new AsyncIndexUpdate(name, store, provider).run(); + + testContent(store); + final IndexStatusListener l1 = new IndexStatusListener() { + + @Override + protected void preIndexUpdate() { + executed.set(true); + assertRunKo(new AsyncIndexUpdate(name, store, provider)); + } + }; + assertRunOk(new SpecialAsyncIndexUpdate(name, store, provider, l1)); + } + + @Test + public void testPostIndexUpdate() throws Exception { + // take care of initial reindex before + new AsyncIndexUpdate(name, store, provider).run(); + + testContent(store); + final IndexStatusListener l1 = new IndexStatusListener() { + + @Override + protected void postIndexUpdate() { + executed.set(true); + assertRunKo(new AsyncIndexUpdate(name, store, provider)); + } + }; + assertRunOk(new SpecialAsyncIndexUpdate(name, store, provider, l1)); + } + + @Test + public void testPreClose() throws Exception { + // take care of initial reindex before + new AsyncIndexUpdate(name, store, provider).run(); + + testContent(store); + final IndexStatusListener l1 = new IndexStatusListener() { + + @Override + protected void preClose() { + executed.set(true); + assertRunKo(new AsyncIndexUpdate(name, store, provider)); + } + }; + assertRunOk(new SpecialAsyncIndexUpdate(name, store, provider, l1)); + } + + @Test + public void testPreIndexUpdateLeaseExpired() throws Exception { + // take care of initial reindex before + new AsyncIndexUpdate(name, store, provider).run(); + + // add extra indexed content + testContent(store); + + final long lease = 50; + final IndexStatusListener l1 = new IndexStatusListener() { + + @Override + protected void preIndexUpdate() { + executed.set(true); + try { + TimeUnit.MILLISECONDS.sleep(lease * 3); + } catch (InterruptedException e) { + // + } + assertRunOk(new AsyncIndexUpdate(name, store, provider)); + } + }; + + assertRunKo(new SpecialAsyncIndexUpdate(name, store, provider, l1) + .setLeaseTimeOut(lease)); + } + + @Test + public void testPostIndexUpdateLeaseExpired() throws Exception { + // take care of initial reindex before + new AsyncIndexUpdate(name, store, provider).run(); + + // add extra indexed content + testContent(store); + + final long lease = 50; + final IndexStatusListener l1 = new IndexStatusListener() { + + @Override + protected void postIndexUpdate() { + executed.set(true); + try { + TimeUnit.MILLISECONDS.sleep(lease * 3); + } catch (InterruptedException e) { + // + } + assertRunOk(new AsyncIndexUpdate(name, store, provider)); + } + }; + + assertRunKo(new SpecialAsyncIndexUpdate(name, store, provider, l1) + .setLeaseTimeOut(lease)); + } + + @Test + public void testPrePrepareRexinde() throws Exception { + + final IndexStatusListener l1 = new IndexStatusListener() { + + @Override + protected void prePrepare() { + executed.set(true); + assertRunOk(new AsyncIndexUpdate(name, store, provider)); + } + }; + assertRunKo(new SpecialAsyncIndexUpdate(name, store, provider, l1)); + } + + private static String getReferenceCp(NodeStore store, String name) { + return store.getRoot().getChildNode(AsyncIndexUpdate.ASYNC) + .getString(name); + } + + private void assertRunOk(AsyncIndexUpdate a) { + assertRun(a, false); + } + + private void assertRunKo(AsyncIndexUpdate a) { + assertRun(a, true); + assertConcurrentUpdate(a.getIndexStats()); + } + + private void assertRun(AsyncIndexUpdate a, boolean status) { + a.run(); + assertEquals("Unexpected failiure flag", status, a.isFailing()); + } + + private void assertConcurrentUpdate(AsyncIndexStats stats) { + assertTrue("Error must be of type 'Concurrent update'", stats + .getLatestError().contains("Concurrent update detected")); + } + + private static void testContent(NodeStore store) throws Exception { + NodeBuilder builder = store.getRoot().builder(); + builder.child("testRoot").setProperty("foo", + "abc " + System.currentTimeMillis()); + store.merge(builder, EmptyHook.INSTANCE, CommitInfo.EMPTY); + } + + private static class SpecialAsyncIndexUpdate extends AsyncIndexUpdate { + + private final IndexStatusListener listener; + + public SpecialAsyncIndexUpdate(String name, NodeStore store, + IndexEditorProvider provider, IndexStatusListener listener) { + super(name, store, provider); + this.listener = listener; + } + + @Override + public synchronized void run() { + super.run(); + } + + @Override + protected AsyncUpdateCallback newAsyncUpdateCallback(NodeStore store, + String name, long leaseTimeOut, String checkpoint, + String afterCheckpoint, AsyncIndexStats indexStats) { + return new SpecialAsyncUpdateCallback(store, name, leaseTimeOut, + checkpoint, afterCheckpoint, indexStats, listener); + } + } + + private static class SpecialAsyncUpdateCallback extends AsyncUpdateCallback { + + private IndexStatusListener listener; + + public SpecialAsyncUpdateCallback(NodeStore store, String name, + long leaseTimeOut, String checkpoint, String afterCheckpoint, + AsyncIndexStats indexStats, IndexStatusListener listener) { + super(store, name, leaseTimeOut, checkpoint, afterCheckpoint, + indexStats); + this.listener = listener; + } + + @Override + protected void prepare() throws CommitFailedException { + listener.prePrepare(); + super.prepare(); + listener.postPrepare(); + } + + @Override + public void indexUpdate() throws CommitFailedException { + listener.preIndexUpdate(); + super.indexUpdate(); + listener.postIndexUpdate(); + } + + @Override + void close() throws CommitFailedException { + listener.preClose(); + super.close(); + listener.postClose(); + } + + } + + private abstract static class IndexStatusListener { + + protected void prePrepare() { + } + + protected void postPrepare() { + } + + protected void preIndexUpdate() { + } + + protected void postIndexUpdate() { + } + + protected void preClose() { + } + + protected void postClose() { + } + } + +}