Index: oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/index/IndexConstants.java =================================================================== --- oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/index/IndexConstants.java (revision 1859435) +++ oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/index/IndexConstants.java (working copy) @@ -32,6 +32,12 @@ String REINDEX_PROPERTY_NAME = "reindex"; String REINDEX_COUNT = "reindexCount"; + + /** + * If set, queries will wait until the index is ready (until this property + * is removed or the reindex flag is false). + */ + String WAIT_FOR_INDEX = "waitForIndex"; String REINDEX_ASYNC_PROPERTY_NAME = "reindex-async"; Index: oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/index/IndexUpdate.java =================================================================== --- oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/index/IndexUpdate.java (revision 1859435) +++ oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/index/IndexUpdate.java (working copy) @@ -297,6 +297,9 @@ definition.setProperty(ASYNC_PROPERTY_NAME, ASYNC_REINDEX_VALUE); } else { + if (definition.hasProperty(IndexConstants.WAIT_FOR_INDEX)) { + definition.removeProperty(IndexConstants.WAIT_FOR_INDEX); + } definition.setProperty(REINDEX_PROPERTY_NAME, false); incrementReIndexCount(definition); removeIndexState(definition); Index: oak-lucene/src/main/java/org/apache/jackrabbit/oak/plugins/index/lucene/IndexTracker.java =================================================================== --- oak-lucene/src/main/java/org/apache/jackrabbit/oak/plugins/index/lucene/IndexTracker.java (revision 1859435) +++ oak-lucene/src/main/java/org/apache/jackrabbit/oak/plugins/index/lucene/IndexTracker.java (working copy) @@ -28,6 +28,8 @@ import org.apache.jackrabbit.oak.commons.PathUtils; import org.apache.jackrabbit.oak.commons.PerfLogger; import org.apache.jackrabbit.oak.plugins.index.AsyncIndexInfoService; +import org.apache.jackrabbit.oak.plugins.index.AsyncIndexUpdate; +import org.apache.jackrabbit.oak.plugins.index.IndexConstants; import org.apache.jackrabbit.oak.plugins.index.lucene.hybrid.NRTIndexFactory; import org.apache.jackrabbit.oak.plugins.index.lucene.reader.DefaultIndexReaderFactory; import org.apache.jackrabbit.oak.plugins.index.lucene.reader.LuceneIndexReaderFactory; @@ -81,6 +83,8 @@ private volatile boolean refresh; + private boolean waitForIndexTimeout; + public IndexTracker() { this((IndexCopier)null); } @@ -262,7 +266,12 @@ } @Nullable - public LuceneIndexDefinition getIndexDefinition(String indexPath){ + public LuceneIndexDefinition getIndexDefinition(String indexPath) { + return getIndexDefinition(indexPath, false); + } + + @Nullable + public LuceneIndexDefinition getIndexDefinition(String indexPath, boolean waitForIndex) { LuceneIndexNodeManager indexNodeManager = indices.get(indexPath); if (indexNodeManager != null) { // Accessing the definition should not require @@ -271,20 +280,74 @@ } // fallback - create definition from scratch NodeState node = NodeStateUtils.getNode(root, indexPath); - if (!node.exists()) { + if (!isLuceneIndexNode(node)) { return null; } + if (waitForIndex) { + node = waitForIndex(node, indexPath); + if (node == null) { + return null; + } + } + // only if there exists a stored index definition if (!node.hasChildNode(INDEX_DEFINITION_NODE)) { return null; } - if (!isLuceneIndexNode(node)) { - return null; - } // this will internally use the stored index definition return new LuceneIndexDefinition(root, node, indexPath); } + /** + * If needed, wait until the index is available, that is, as long as the the + * property "waitForIndex" is set to "true" and at the same time "reindex" + * is set to "true". The "waitForIndex" property is removed once indexing + * has completed. It can also be removed or disabled manually. + * + * @param node the node state of the index definition + * @param indexPath the index path + * @return the (possibly new) node state of the index definition + */ + private NodeState waitForIndex(NodeState node, String indexPath) { + if (!node.hasProperty(IndexConstants.WAIT_FOR_INDEX)) { + // the normal case + return node; + } + if (waitForIndexTimeout) { + return node; + } + if (!root.hasChildNode(":async")) { + return node; + } + long waitUntil = System.currentTimeMillis() + 120 * 1000; + while (node.hasProperty(IndexConstants.WAIT_FOR_INDEX) && + node.getBoolean(IndexConstants.WAIT_FOR_INDEX) && + node.hasProperty(IndexConstants.REINDEX_PROPERTY_NAME) && + node.getBoolean(IndexConstants.REINDEX_PROPERTY_NAME) + ) { + log.info("Waiting for the index " + indexPath + " to be indexed (as long as the properties " + + IndexConstants.WAIT_FOR_INDEX + " and " + + IndexConstants.REINDEX_PROPERTY_NAME + " are true)."); + try { + // wait one second + Thread.sleep(1000); + } catch (InterruptedException e) { + // ignore + } + if (System.currentTimeMillis() > waitUntil) { + log.info("Waiting for the index " + indexPath + " stopped after 2 minutes"); + waitForIndexTimeout = true; + break; + } + // the root may have changed, so we need to get the node state again + node = NodeStateUtils.getNode(root, indexPath); + if (!isLuceneIndexNode(node)) { + return null; + } + } + return node; + } + public Set getIndexNodePaths(){ return indices.keySet(); } Index: oak-lucene/src/main/java/org/apache/jackrabbit/oak/plugins/index/lucene/LucenePropertyIndex.java =================================================================== --- oak-lucene/src/main/java/org/apache/jackrabbit/oak/plugins/index/lucene/LucenePropertyIndex.java (revision 1859435) +++ oak-lucene/src/main/java/org/apache/jackrabbit/oak/plugins/index/lucene/LucenePropertyIndex.java (working copy) @@ -1612,7 +1612,7 @@ @Override public LuceneIndexDefinition getDefinition() { checkNotReleased(); - return tracker.getIndexDefinition(indexPath); + return tracker.getIndexDefinition(indexPath, true); } private LuceneIndexNode getIndexNode() { Index: oak-lucene/src/main/java/org/apache/jackrabbit/oak/plugins/index/lucene/util/LuceneIndexHelper.java =================================================================== --- oak-lucene/src/main/java/org/apache/jackrabbit/oak/plugins/index/lucene/util/LuceneIndexHelper.java (revision 1859435) +++ oak-lucene/src/main/java/org/apache/jackrabbit/oak/plugins/index/lucene/util/LuceneIndexHelper.java (working copy) @@ -137,6 +137,6 @@ } public static boolean isLuceneIndexNode(NodeState node){ - return IndexHelper.isIndexNodeOfType(node, TYPE_LUCENE); + return node.exists() && IndexHelper.isIndexNodeOfType(node, TYPE_LUCENE); } } Index: oak-lucene/src/test/java/org/apache/jackrabbit/oak/plugins/index/lucene/WaitForLuceneIndexTest.java =================================================================== --- oak-lucene/src/test/java/org/apache/jackrabbit/oak/plugins/index/lucene/WaitForLuceneIndexTest.java (nonexistent) +++ oak-lucene/src/test/java/org/apache/jackrabbit/oak/plugins/index/lucene/WaitForLuceneIndexTest.java (working copy) @@ -0,0 +1,196 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.jackrabbit.oak.plugins.index.lucene; + +import static org.apache.jackrabbit.oak.InitialContentHelper.INITIAL_CONTENT; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; + +import java.io.File; +import java.io.IOException; +import java.util.Iterator; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; + +import javax.jcr.RepositoryException; +import javax.jcr.query.Query; +import javax.security.auth.login.LoginException; + +import org.apache.jackrabbit.oak.Oak; +import org.apache.jackrabbit.oak.api.CommitFailedException; +import org.apache.jackrabbit.oak.api.QueryEngine; +import org.apache.jackrabbit.oak.api.Result; +import org.apache.jackrabbit.oak.api.ResultRow; +import org.apache.jackrabbit.oak.api.Root; +import org.apache.jackrabbit.oak.api.Tree; +import org.apache.jackrabbit.oak.commons.concurrent.ExecutorCloser; +import org.apache.jackrabbit.oak.commons.junit.TemporarySystemProperty; +import org.apache.jackrabbit.oak.plugins.index.AsyncIndexUpdate; +import org.apache.jackrabbit.oak.plugins.index.IndexConstants; +import org.apache.jackrabbit.oak.plugins.index.counter.NodeCounterEditorProvider; +import org.apache.jackrabbit.oak.plugins.index.lucene.util.IndexDefinitionBuilder; +import org.apache.jackrabbit.oak.plugins.index.nodetype.NodeTypeIndexProvider; +import org.apache.jackrabbit.oak.plugins.index.property.PropertyIndexEditorProvider; +import org.apache.jackrabbit.oak.plugins.index.search.IndexDefinition; +import org.apache.jackrabbit.oak.plugins.memory.MemoryNodeStore; +import org.apache.jackrabbit.oak.query.QueryEngineSettings; +import org.apache.jackrabbit.oak.spi.commit.Observer; +import org.apache.jackrabbit.oak.spi.query.QueryIndexProvider; +import org.apache.jackrabbit.oak.spi.security.OpenSecurityProvider; +import org.apache.jackrabbit.oak.spi.state.NodeStore; +import org.apache.jackrabbit.oak.spi.whiteboard.Whiteboard; +import org.apache.jackrabbit.oak.spi.whiteboard.WhiteboardUtils; +import org.junit.After; +import org.junit.Before; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.ExpectedException; +import org.junit.rules.TemporaryFolder; + +import com.google.common.io.Closer; + +/** + * Test the "waitForIndex" feature. + */ +public class WaitForLuceneIndexTest { + @Rule + public TemporaryFolder temporaryFolder = new TemporaryFolder(new File("target")); + + @Rule + public TemporarySystemProperty systemProperty = new TemporarySystemProperty(); + + @Rule + public ExpectedException thrown = ExpectedException.none(); + + private Closer closer; + + private Root root; + private QueryEngine qe; + private Whiteboard wb; + + private void createRepository() throws RepositoryException, IOException, LoginException { + ExecutorService executorService = Executors.newFixedThreadPool(2); + closer.register(new ExecutorCloser(executorService)); + IndexCopier copier = new IndexCopier(executorService, temporaryFolder.getRoot()); + LuceneIndexEditorProvider editorProvider = new LuceneIndexEditorProvider(copier); + LuceneIndexProvider queryIndexProvider = new LuceneIndexProvider(copier); + + QueryEngineSettings qeSettings = new QueryEngineSettings(); + + NodeStore nodeStore = new MemoryNodeStore(INITIAL_CONTENT); + Oak oak = new Oak(nodeStore) + .with(new OpenSecurityProvider()) + .with((QueryIndexProvider) queryIndexProvider) + .with((Observer) queryIndexProvider) + .with(new PropertyIndexEditorProvider()) + .with(new NodeTypeIndexProvider()) + .with(new NodeCounterEditorProvider()) + .with(editorProvider) + .with(qeSettings) + //Effectively disable async indexing auto run + //such that we can control run timing as per test requirement + .withAsyncIndexing("async", TimeUnit.DAYS.toSeconds(1)); + + wb = oak.getWhiteboard(); + root = oak.createContentRepository().login(null, null).getLatestRoot(); + qe = root.getQueryEngine(); + } + + @Before + public void setup() throws Exception { + closer = Closer.create(); + createRepository(); + createData(); + runAsyncIndex(); + createIndex(); + } + + @After + public void after() throws IOException { + closer.close(); + IndexDefinition.setDisableStoredIndexDefinition(false); + } + + private void createIndex() throws CommitFailedException { + IndexDefinitionBuilder idxBuilder = new IndexDefinitionBuilder(); + idxBuilder + .async("async") + .indexRule("nt:base") + .property("cons").propertyIndex(); + Tree oi = root.getTree("/oak:index"); + Tree t = idxBuilder.getBuilderTree(); + t.setProperty(LuceneIndexConstants.COMPAT_MODE, 2); + t.setProperty(IndexConstants.WAIT_FOR_INDEX, true); + idxBuilder.build(oi.addChild("testIndex")); + root.commit(); + } + + private void runAsyncIndex() { + AsyncIndexUpdate async = (AsyncIndexUpdate) WhiteboardUtils.getService(wb, + Runnable.class, input -> input instanceof AsyncIndexUpdate); + assertNotNull(async); + async.run(); + if (async.isFailing()) { + fail("AsyncIndexUpdate failed"); + } + root.refresh(); + } + + private void createData() throws CommitFailedException { + Tree par = root.getTree("/").addChild("parent"); + for (int i = 0; i < 10; i++) { + par.addChild("c" + i).setProperty("cons", "val"); + } + root.commit(); + } + + @Test + public void waitForIndex() throws Exception { + createData(); + String query = "SELECT * FROM [nt:base] WHERE [cons] = 'val' option(index name testIndex)"; + final AtomicBoolean done = new AtomicBoolean(); + Thread t = new Thread() { + public void run() { + try { + Thread.sleep(100); + } catch (InterruptedException e) { + // ignore + } + done.set(true); + runAsyncIndex(); + } + }; + t.start(); + Result result = qe.executeQuery(query, Query.JCR_SQL2, QueryEngine.NO_BINDINGS, QueryEngine.NO_MAPPINGS); + // this is only enabled after the 100 ms delay - + // so we are sure the query was waiting for at least one second + assertTrue(done.get()); + t.join(); + Iterator rows = result.getRows().iterator(); + // get few rows to open the cursor + for (int i = 0; i < 10; i++) { + assertTrue("Insufficient result rows. Current iteration count: " + i, rows.hasNext()); + rows.next(); + } + } +}