diff --git a/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/index/AsyncIndexUpdate.java b/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/index/AsyncIndexUpdate.java index 9f8dbb6..7952cb3 100644 --- a/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/index/AsyncIndexUpdate.java +++ b/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/index/AsyncIndexUpdate.java @@ -247,8 +247,11 @@ public class AsyncIndexUpdate implements Runnable, Closeable { private List validatorProviders = Collections.emptyList(); - /** Expiration time of the last lease we committed */ - private long lease; + /** + * Expiration time of the last lease we committed, null if lease is + * disabled + */ + private Long lease = null; private boolean hasLease = false; @@ -269,19 +272,29 @@ public class AsyncIndexUpdate implements Runnable, Closeable { if (hasLease) { return; } - long now = System.currentTimeMillis(); - this.lease = now + 2 * leaseTimeOut; - NodeState root = store.getRoot(); - long beforeLease = root.getChildNode(ASYNC).getLong(leaseName); - if (beforeLease > now) { - throw CONCURRENT_UPDATE; - } + NodeState async = root.getChildNode(ASYNC); + if(isLeaseCheckEnabled(leaseTimeOut)) { + long now = System.currentTimeMillis(); + this.lease = now + 2 * leaseTimeOut; + long beforeLease = async.getLong(leaseName); + if (beforeLease > now) { + throw CONCURRENT_UPDATE; + } - NodeBuilder builder = root.builder(); - NodeBuilder async = builder.child(ASYNC); - async.setProperty(leaseName, lease); - mergeWithConcurrencyCheck(store, validatorProviders, builder, checkpoint, beforeLease, name); + NodeBuilder builder = root.builder(); + builder.child(ASYNC).setProperty(leaseName, lease); + mergeWithConcurrencyCheck(store, validatorProviders, builder, checkpoint, beforeLease, name); + } else { + lease = null; + // remove stale lease info if needed + if (async.hasProperty(leaseName)) { + NodeBuilder builder = root.builder(); + builder.child(ASYNC).removeProperty(leaseName); + mergeWithConcurrencyCheck(store, validatorProviders, + builder, checkpoint, null, name); + } + } hasLease = true; } @@ -330,10 +343,13 @@ public class AsyncIndexUpdate implements Runnable, Closeable { } void close() throws CommitFailedException { - NodeBuilder builder = store.getRoot().builder(); - NodeBuilder async = builder.child(ASYNC); - async.removeProperty(leaseName); - mergeWithConcurrencyCheck(store, validatorProviders, builder, async.getString(name), lease, name); + if (isLeaseCheckEnabled(leaseTimeOut)) { + NodeBuilder builder = store.getRoot().builder(); + NodeBuilder async = builder.child(ASYNC); + async.removeProperty(leaseName); + mergeWithConcurrencyCheck(store, validatorProviders, builder, + async.getString(name), lease, name); + } } @Override @@ -343,7 +359,7 @@ public class AsyncIndexUpdate implements Runnable, Closeable { throw INTERRUPTED; } - if (indexStats.incUpdates() % 100 == 0) { + if (indexStats.incUpdates() % 100 == 0 && isLeaseCheckEnabled(leaseTimeOut)) { long now = System.currentTimeMillis(); if (now + leaseTimeOut > lease) { long newLease = now + 2 * leaseTimeOut; @@ -422,17 +438,19 @@ public class AsyncIndexUpdate implements Runnable, Closeable { log.debug("[{}] Running background index task", name); NodeState root = store.getRoot(); - - // check for concurrent updates NodeState async = root.getChildNode(ASYNC); - long leaseEndTime = async.getLong(leasify(name)); - long currentTime = System.currentTimeMillis(); - if (leaseEndTime > currentTime) { - long leaseExpMsg = (leaseEndTime - currentTime) / 1000; - String err = String.format("Another copy of the index update is already running; skipping this update. " + - "Time left for lease to expire %d s. Indexing can resume by %tT", leaseExpMsg, leaseEndTime); - indexStats.failed(new Exception(err, CONCURRENT_UPDATE)); - return; + + if (isLeaseCheckEnabled(leaseTimeOut)) { + // check for concurrent updates + long leaseEndTime = async.getLong(leasify(name)); + long currentTime = System.currentTimeMillis(); + if (leaseEndTime > currentTime) { + long leaseExpMsg = (leaseEndTime - currentTime) / 1000; + String err = String.format("Another copy of the index update is already running; skipping this update. " + + "Time left for lease to expire %d s. Indexing can resume by %tT", leaseExpMsg, leaseEndTime); + indexStats.failed(new Exception(err, CONCURRENT_UPDATE)); + return; + } } // start collecting runtime statistics @@ -702,8 +720,12 @@ public class AsyncIndexUpdate implements Runnable, Closeable { return name + "-temp"; } + private static boolean isLeaseCheckEnabled(long leaseTimeOut) { + return leaseTimeOut > 0; + } + private static void mergeWithConcurrencyCheck(final NodeStore store, List validatorProviders, - NodeBuilder builder, final String checkpoint, final long lease, + NodeBuilder builder, final String checkpoint, final Long lease, final String name) throws CommitFailedException { CommitHook concurrentUpdateCheck = new CommitHook() { @Override @Nonnull @@ -712,9 +734,9 @@ public class AsyncIndexUpdate implements Runnable, Closeable { throws CommitFailedException { // check for concurrent updates by this async task NodeState async = before.getChildNode(ASYNC); - if ((checkpoint == null || Objects.equal(checkpoint, - async.getString(name))) - && lease == async.getLong(leasify(name))) { + if ((checkpoint == null || Objects.equal(checkpoint, async.getString(name))) + && + (lease == null || lease == async.getLong(leasify(name)))) { return after; } else { throw CONCURRENT_UPDATE; @@ -1217,7 +1239,7 @@ public class AsyncIndexUpdate implements Runnable, Closeable { this.newIndexTaskName = newIndexTaskName; } - void maybeSplit(@CheckForNull String refCheckpoint, long lease) + void maybeSplit(@CheckForNull String refCheckpoint, Long lease) throws CommitFailedException { if (paths == null) { return; @@ -1225,7 +1247,7 @@ public class AsyncIndexUpdate implements Runnable, Closeable { split(refCheckpoint, lease); } - private void split(@CheckForNull String refCheckpoint, long lease) throws CommitFailedException { + private void split(@CheckForNull String refCheckpoint, Long lease) throws CommitFailedException { NodeBuilder builder = store.getRoot().builder(); if (refCheckpoint != null) { String tempCpName = getTempCpName(name); diff --git a/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/index/AsyncIndexerService.java b/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/index/AsyncIndexerService.java index cedcb43..f6d9a2a 100644 --- a/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/index/AsyncIndexerService.java +++ b/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/index/AsyncIndexerService.java @@ -36,6 +36,7 @@ import org.apache.jackrabbit.oak.commons.PropertiesUtil; import org.apache.jackrabbit.oak.osgi.OsgiWhiteboard; import org.apache.jackrabbit.oak.plugins.observation.ChangeCollectorProvider; import org.apache.jackrabbit.oak.spi.commit.ValidatorProvider; +import org.apache.jackrabbit.oak.spi.state.Clusterable; import org.apache.jackrabbit.oak.spi.state.NodeStore; import org.apache.jackrabbit.oak.spi.whiteboard.Whiteboard; import org.apache.jackrabbit.oak.spi.whiteboard.WhiteboardIndexEditorProvider; @@ -97,6 +98,11 @@ public class AsyncIndexerService { long leaseTimeOutMin = PropertiesUtil.toInteger(config.get(PROP_LEASE_TIME_OUT), PROP_LEASE_TIMEOUT_DEFAULT); + if (!(nodeStore instanceof Clusterable)){ + leaseTimeOutMin = 0; + log.info("Detected non clusterable setup. Lease checking would be disabled for async indexing"); + } + for (AsyncConfig c : asyncIndexerConfig) { AsyncIndexUpdate task = new AsyncIndexUpdate(c.name, nodeStore, indexEditorProvider, statisticsProvider, false); diff --git a/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/index/AsyncIndexerServiceTest.java b/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/index/AsyncIndexerServiceTest.java index 1773983..00499a0 100644 --- a/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/index/AsyncIndexerServiceTest.java +++ b/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/index/AsyncIndexerServiceTest.java @@ -23,6 +23,8 @@ import java.util.List; import java.util.Map; import java.util.concurrent.TimeUnit; +import javax.annotation.Nonnull; + import com.google.common.collect.ImmutableMap; import com.google.common.collect.ImmutableSet; import org.apache.jackrabbit.oak.plugins.index.AsyncIndexUpdateTest.CommitInfoCollector; @@ -35,6 +37,7 @@ import org.apache.jackrabbit.oak.spi.commit.CommitContext; import org.apache.jackrabbit.oak.spi.commit.CommitInfo; import org.apache.jackrabbit.oak.spi.commit.EmptyHook; import org.apache.jackrabbit.oak.spi.commit.ValidatorProvider; +import org.apache.jackrabbit.oak.spi.state.Clusterable; import org.apache.jackrabbit.oak.spi.state.NodeBuilder; import org.apache.jackrabbit.oak.spi.state.NodeStore; import org.apache.jackrabbit.oak.stats.StatisticsProvider; @@ -53,19 +56,13 @@ public class AsyncIndexerServiceTest { @Rule public final OsgiContext context = new OsgiContext(); - private MemoryNodeStore nodeStore = new MemoryNodeStore(); + private MemoryNodeStore nodeStore = new FakeClusterableMemoryNodeStore(); private AsyncIndexerService service = new AsyncIndexerService(); - @Before - public void setUp() { - context.registerService(StatisticsProvider.class, StatisticsProvider.NOOP); - context.registerService(NodeStore.class, nodeStore); - context.registerService(ValidatorProvider.class, new ChangeCollectorProvider()); - MockOsgi.injectServices(service, context.bundleContext()); - } @Test public void asyncReg() throws Exception{ + injectDefaultServices(); Map config = ImmutableMap.of( "asyncConfigs", new String[] {"async:5"} ); @@ -79,6 +76,7 @@ public class AsyncIndexerServiceTest { @Test public void leaseTimeout() throws Exception{ + injectDefaultServices(); Map config = ImmutableMap.of( "asyncConfigs", new String[] {"async:5"}, "leaseTimeOutMinutes" , "20" @@ -90,6 +88,7 @@ public class AsyncIndexerServiceTest { @Test public void changeCollectionEnabled() throws Exception{ + injectDefaultServices(); Map config = ImmutableMap.of( "asyncConfigs", new String[] {"async:5"} ); @@ -117,8 +116,18 @@ public class AsyncIndexerServiceTest { assertNotNull(changeSet); } - private AsyncIndexUpdate getIndexUpdate(String name) { - return (AsyncIndexUpdate) context.getServices(Runnable.class, "(oak.async="+name+")")[0]; + @Test + public void nonClusterableNodeStoreAndLeaseTimeout() throws Exception{ + nodeStore = new MemoryNodeStore(); + injectDefaultServices(); + + Map config = ImmutableMap.of( + "asyncConfigs", new String[] {"async:5"}, + "leaseTimeOutMinutes" , "20" + ); + MockOsgi.activate(service, context.bundleContext(), config); + AsyncIndexUpdate indexUpdate = getIndexUpdate("async"); + assertEquals(0, indexUpdate.getLeaseTimeOut()); } @Test @@ -133,4 +142,23 @@ public class AsyncIndexerServiceTest { assertEquals("foo", configs.get(1).name); assertEquals(23, configs.get(1).timeIntervalInSecs); } + + private void injectDefaultServices() { + context.registerService(StatisticsProvider.class, StatisticsProvider.NOOP); + context.registerService(NodeStore.class, nodeStore); + context.registerService(ValidatorProvider.class, new ChangeCollectorProvider()); + MockOsgi.injectServices(service, context.bundleContext()); + } + + private AsyncIndexUpdate getIndexUpdate(String name) { + return (AsyncIndexUpdate) context.getServices(Runnable.class, "(oak.async="+name+")")[0]; + } + + private static class FakeClusterableMemoryNodeStore extends MemoryNodeStore implements Clusterable { + @Nonnull + @Override + public String getInstanceId() { + return "foo"; + } + } } \ No newline at end of file diff --git a/oak-it/src/test/java/org/apache/jackrabbit/oak/plugins/index/AsyncIndexUpdateLeaseTest.java b/oak-it/src/test/java/org/apache/jackrabbit/oak/plugins/index/AsyncIndexUpdateLeaseTest.java index df72e53..b656155 100644 --- a/oak-it/src/test/java/org/apache/jackrabbit/oak/plugins/index/AsyncIndexUpdateLeaseTest.java +++ b/oak-it/src/test/java/org/apache/jackrabbit/oak/plugins/index/AsyncIndexUpdateLeaseTest.java @@ -22,6 +22,7 @@ import static org.apache.jackrabbit.oak.plugins.index.IndexUtils.createIndexDefi import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertNotNull; import static org.junit.Assert.assertTrue; +import static org.junit.Assert.assertFalse; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; @@ -349,6 +350,52 @@ public class AsyncIndexUpdateLeaseTest extends OakBaseTest { .setLeaseTimeOut(lease)); } + + @Test + public void testLeaseDisabled() throws Exception { + // take care of initial reindex before + AsyncIndexUpdate async = new AsyncIndexUpdate(name, store, provider).setLeaseTimeOut(0); + async.run(); + + testContent(store); + assertRunOk(async); + + testContent(store); + assertRunOk(async); + + executed.set(true); + } + + @Test + public void testLeaseExpiredToDisabled() throws Exception { + // take care of initial reindex before + new AsyncIndexUpdate(name, store, provider).run(); + + // add extra indexed content + testContent(store); + + // make it look like lease got stuck due to force shutdown + NodeBuilder builder = store.getRoot().builder(); + builder.getChildNode(AsyncIndexUpdate.ASYNC).setProperty( + AsyncIndexUpdate.leasify(name), + System.currentTimeMillis() + 500000); + store.merge(builder, EmptyHook.INSTANCE, CommitInfo.EMPTY); + + final IndexStatusListener l1 = new IndexStatusListener() { + + @Override + protected void postIndexUpdate() { + executed.set(true); + } + }; + assertRunOk(new SpecialAsyncIndexUpdate(name, store, provider, l1) + .setLeaseTimeOut(0)); + + assertFalse("Stale lease info must be cleaned", + store.getRoot().getChildNode(AsyncIndexUpdate.ASYNC) + .hasProperty(AsyncIndexUpdate.leasify(name))); + } + // ------------------------------------------------------------------- private static String getReferenceCp(NodeStore store, String name) {