Index: oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/index/AsyncIndexerService.java =================================================================== --- oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/index/AsyncIndexerService.java (revision 1777538) +++ oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/index/AsyncIndexerService.java (working copy) @@ -19,6 +19,7 @@ package org.apache.jackrabbit.oak.plugins.index; +import java.io.Closeable; import java.io.IOException; import java.util.Arrays; import java.util.Collections; @@ -34,13 +35,19 @@ import org.apache.felix.scr.annotations.Deactivate; import org.apache.felix.scr.annotations.Property; import org.apache.felix.scr.annotations.Reference; +import org.apache.jackrabbit.oak.api.jmx.IndexStatsMBean; import org.apache.jackrabbit.oak.commons.PropertiesUtil; import org.apache.jackrabbit.oak.osgi.OsgiWhiteboard; +import org.apache.jackrabbit.oak.plugins.index.property.jmx.PropertyIndexAsyncReindex; +import org.apache.jackrabbit.oak.plugins.index.property.jmx.PropertyIndexAsyncReindexMBean; import org.apache.jackrabbit.oak.plugins.observation.ChangeCollectorProvider; import org.apache.jackrabbit.oak.spi.commit.ValidatorProvider; import org.apache.jackrabbit.oak.spi.state.Clusterable; import org.apache.jackrabbit.oak.spi.state.NodeStore; +import org.apache.jackrabbit.oak.spi.whiteboard.CompositeRegistration; +import org.apache.jackrabbit.oak.spi.whiteboard.Registration; import org.apache.jackrabbit.oak.spi.whiteboard.Whiteboard; +import org.apache.jackrabbit.oak.spi.whiteboard.WhiteboardExecutor; import org.apache.jackrabbit.oak.spi.whiteboard.WhiteboardIndexEditorProvider; import org.apache.jackrabbit.oak.stats.StatisticsProvider; import org.osgi.framework.BundleContext; @@ -48,6 +55,7 @@ import org.slf4j.LoggerFactory; import static com.google.common.base.Preconditions.checkArgument; +import static org.apache.jackrabbit.oak.spi.whiteboard.WhiteboardUtils.registerMBean; @Component( policy = ConfigurationPolicy.REQUIRE, @@ -110,6 +118,8 @@ private final Closer closer = Closer.create(); + private WhiteboardExecutor executor; + @Activate public void activate(BundleContext bundleContext, Map config) { List asyncIndexerConfig = getAsyncConfig(PropertiesUtil.toStringArray(config.get @@ -117,6 +127,8 @@ Whiteboard whiteboard = new OsgiWhiteboard(bundleContext); indexRegistration = new IndexMBeanRegistration(whiteboard); indexEditorProvider.start(whiteboard); + executor = new WhiteboardExecutor(); + executor.start(whiteboard); long leaseTimeOutMin = PropertiesUtil.toInteger(config.get(PROP_LEASE_TIME_OUT), PROP_LEASE_TIMEOUT_DEFAULT); @@ -137,15 +149,38 @@ indexRegistration.registerAsyncIndexer(task, c.timeIntervalInSecs); closer.register(task); } + registerAsyncReindexSupport(whiteboard); log.info("Configured async indexers {} ", asyncIndexerConfig); log.info("Lease time: {} mins and AsyncIndexUpdate configured with {}", leaseTimeOutMin, validatorProvider.getClass().getName()); } + private void registerAsyncReindexSupport(Whiteboard whiteboard) { + // async reindex + String name = IndexConstants.ASYNC_REINDEX_VALUE; + AsyncIndexUpdate task = new AsyncIndexUpdate(name, nodeStore, indexEditorProvider, statisticsProvider, true); + PropertyIndexAsyncReindex asyncPI = new PropertyIndexAsyncReindex(task, executor); + + final Registration reg = new CompositeRegistration( + registerMBean(whiteboard, PropertyIndexAsyncReindexMBean.class, asyncPI, + PropertyIndexAsyncReindexMBean.TYPE, "async"), + registerMBean(whiteboard, IndexStatsMBean.class, task.getIndexStats(), IndexStatsMBean.TYPE, name)); + closer.register(new Closeable() { + @Override + public void close() throws IOException { + reg.unregister(); + } + }); + } + @Deactivate public void deactivate() throws IOException { if (indexRegistration != null) { indexRegistration.unregister(); } + if (executor != null) { + executor.stop(); + executor = null; + } //Close the task *after* unregistering the jobs closer.close();