From 5248201b31583adeb1a4a24fb9d6ffde382e0073 Mon Sep 17 00:00:00 2001
From: Vikas Saurabh <vsaurabh@adobe.com>
Date: Sat, 7 Jul 2018 19:42:15 +0530
Subject: [PATCH 2/3] OAK-7495: async,sync index not synchronous

Add a test verifying that changes committed to an async,sync index are immediately queryable via the sync (hybrid/NRT) part without waiting for an async indexing cycle
---
 .../index/lucene/LucenePropertyIndexTest.java | 136 +++++++++++++++++-
 1 file changed, 133 insertions(+), 3 deletions(-)

diff --git a/oak-lucene/src/test/java/org/apache/jackrabbit/oak/plugins/index/lucene/LucenePropertyIndexTest.java b/oak-lucene/src/test/java/org/apache/jackrabbit/oak/plugins/index/lucene/LucenePropertyIndexTest.java
index 8728ab664a..99e18ff47c 100644
--- a/oak-lucene/src/test/java/org/apache/jackrabbit/oak/plugins/index/lucene/LucenePropertyIndexTest.java
+++ b/oak-lucene/src/test/java/org/apache/jackrabbit/oak/plugins/index/lucene/LucenePropertyIndexTest.java
@@ -36,8 +36,10 @@ import java.util.List;
 import java.util.Map;
 import java.util.Random;
 import java.util.Set;
+import java.util.concurrent.ConcurrentLinkedQueue;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
+import java.util.concurrent.atomic.AtomicBoolean;
 
 import javax.annotation.Nonnull;
 import javax.jcr.PropertyType;
@@ -58,9 +60,11 @@ import org.apache.jackrabbit.oak.Oak;
 import org.apache.jackrabbit.oak.api.Blob;
 import org.apache.jackrabbit.oak.api.CommitFailedException;
 import org.apache.jackrabbit.oak.api.ContentRepository;
+import org.apache.jackrabbit.oak.api.ContentSession;
 import org.apache.jackrabbit.oak.api.PropertyValue;
 import org.apache.jackrabbit.oak.api.Result;
 import org.apache.jackrabbit.oak.api.ResultRow;
+import org.apache.jackrabbit.oak.api.Root;
 import org.apache.jackrabbit.oak.api.Tree;
 import org.apache.jackrabbit.oak.api.Type;
 import org.apache.jackrabbit.oak.commons.PathUtils;
@@ -73,10 +77,15 @@ import org.apache.jackrabbit.oak.plugins.index.fulltext.ExtractedText;
 import org.apache.jackrabbit.oak.plugins.index.fulltext.ExtractedText.ExtractionResult;
 import org.apache.jackrabbit.oak.plugins.index.fulltext.PreExtractedTextProvider;
 import org.apache.jackrabbit.oak.plugins.index.lucene.directory.CopyOnReadDirectory;
+import org.apache.jackrabbit.oak.plugins.index.lucene.hybrid.DocumentQueue;
+import org.apache.jackrabbit.oak.plugins.index.lucene.hybrid.LocalIndexObserver;
+import org.apache.jackrabbit.oak.plugins.index.lucene.hybrid.NRTIndexFactory;
+import org.apache.jackrabbit.oak.plugins.index.lucene.reader.DefaultIndexReaderFactory;
 import org.apache.jackrabbit.oak.plugins.index.lucene.util.IndexDefinitionBuilder;
 import org.apache.jackrabbit.oak.plugins.index.lucene.util.fv.SimSearchUtils;
 import org.apache.jackrabbit.oak.plugins.index.nodetype.NodeTypeIndexProvider;
 import org.apache.jackrabbit.oak.plugins.index.property.PropertyIndexEditorProvider;
+import org.apache.jackrabbit.oak.plugins.index.reference.ReferenceEditorProvider;
 import org.apache.jackrabbit.oak.plugins.memory.ArrayBasedBlob;
 import org.apache.jackrabbit.oak.plugins.memory.MemoryNodeStore;
 import org.apache.jackrabbit.oak.plugins.memory.PropertyStates;
@@ -86,11 +95,14 @@ import org.apache.jackrabbit.oak.query.AbstractQueryTest;
 import org.apache.jackrabbit.oak.spi.commit.CommitInfo;
 import org.apache.jackrabbit.oak.spi.commit.EmptyHook;
 import org.apache.jackrabbit.oak.spi.commit.Observer;
+import org.apache.jackrabbit.oak.spi.mount.MountInfoProvider;
+import org.apache.jackrabbit.oak.spi.mount.Mounts;
 import org.apache.jackrabbit.oak.spi.security.OpenSecurityProvider;
 import org.apache.jackrabbit.oak.spi.state.NodeBuilder;
 import org.apache.jackrabbit.oak.spi.state.NodeState;
 import org.apache.jackrabbit.oak.spi.state.NodeStateUtils;
 import org.apache.jackrabbit.oak.spi.state.NodeStore;
+import org.apache.jackrabbit.oak.stats.StatisticsProvider;
 import org.apache.jackrabbit.util.ISO8601;
 import org.apache.lucene.store.Directory;
 import org.apache.lucene.store.FSDirectory;
@@ -100,6 +112,8 @@ import org.junit.Ignore;
 import org.junit.Rule;
 import org.junit.Test;
 import org.junit.rules.TemporaryFolder;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import static com.google.common.collect.ImmutableSet.of;
 import static com.google.common.collect.Lists.newArrayList;
@@ -153,6 +167,7 @@ import static org.junit.Assert.assertTrue;
 
 @SuppressWarnings("ArraysAsListWithZeroOrOneArgument")
 public class LucenePropertyIndexTest extends AbstractQueryTest {
+    private static final Logger LOG = LoggerFactory.getLogger(LucenePropertyIndexTest.class);
     /**
      * Set the size to twice the batch size to test the pagination with sorting
      */
@@ -176,6 +191,8 @@ public class LucenePropertyIndexTest extends AbstractQueryTest {
 
     private ResultCountingIndexProvider queryIndexProvider;
 
+    ContentRepository repository = null;
+
     @After
     public void after() {
         new ExecutorCloser(executorService).close();
@@ -190,20 +207,36 @@ public class LucenePropertyIndexTest extends AbstractQueryTest {
     @Override
     protected ContentRepository createRepository() {
         IndexCopier copier = createIndexCopier();
-        editorProvider = new LuceneIndexEditorProvider(copier, new ExtractedTextCache(10* FileUtils.ONE_MB, 100));
-        provider = new LuceneIndexProvider(copier);
+        MountInfoProvider mountInfoProvider = Mounts.defaultMountInfoProvider();
+        StatisticsProvider statsProvider = StatisticsProvider.NOOP;
+        NRTIndexFactory nrtFactory = new NRTIndexFactory(copier, statsProvider);
+        IndexTracker tracker = new IndexTracker(new DefaultIndexReaderFactory(mountInfoProvider, copier), nrtFactory);
+        ExtractedTextCache extractedTextCache = new ExtractedTextCache(10* FileUtils.ONE_MB, 100);
+
+        DocumentQueue queue = new DocumentQueue(1000, tracker, executorService, statsProvider);
+        LocalIndexObserver localIndexObserver = new LocalIndexObserver(queue, statsProvider);
+
+        editorProvider = new LuceneIndexEditorProvider(copier, tracker, extractedTextCache, null, mountInfoProvider);
+        editorProvider.setIndexingQueue(queue);
+
+        provider = new LuceneIndexProvider(tracker);
         queryIndexProvider = new ResultCountingIndexProvider(provider);
+
         nodeStore = new MemoryNodeStore();
-        return new Oak(nodeStore)
+        repository = new Oak(nodeStore)
                 .with(new InitialContent())
                 .with(new OpenSecurityProvider())
                 .with(queryIndexProvider)
+                .with(localIndexObserver)
                 .with((Observer) provider)
                 .with(editorProvider)
                 .with(optionalEditorProvider)
                 .with(new PropertyIndexEditorProvider())
+                .with(new ReferenceEditorProvider())
                 .with(new NodeTypeIndexProvider())
                 .createContentRepository();
+
+        return repository;
     }
 
     private IndexCopier createIndexCopier() {
@@ -256,6 +289,103 @@ public class LucenePropertyIndexTest extends AbstractQueryTest {
         executorService.shutdown();
     }
 
+    @Test
+    public void asyncSyncIndex() throws Exception {
+        Tree idx = root.getTree("/").addChild(INDEX_DEFINITIONS_NAME).addChild("jobIndex");
+        IndexDefinitionBuilder defBuilder = new IndexDefinitionBuilder();
+        defBuilder.indexRule("nt:base").property("job-id").propertyIndex();
+        defBuilder.build(idx);
+        idx.setProperty("async", of("async", "sync"), STRINGS);
+        root.commit();
+
+        root.getTree("/").addChild("test").setProperty("job-id", "bar");
+        root.commit();
+
+        // Make sure our index is being used
+        String query = "select * from [nt:base] where [job-id]='bar'";
+        String explanation = explain(query);
+        assertTrue("Incorrect index used. Plan: " + explanation, explanation.contains("lucene:jobIndex"));
+
+        // Make sure we are getting result without running async cycle
+        assertQuery(query, asList("/test"));
+
+        final ConcurrentLinkedQueue<String> jobIds = new ConcurrentLinkedQueue<>();
+        AtomicBoolean producerDone = new AtomicBoolean(false);
+        AtomicBoolean consumerDone = new AtomicBoolean(false);
+        final ConcurrentLinkedQueue<Throwable> exceptionList = new ConcurrentLinkedQueue<>();
+
+        Thread jobProducer = new Thread(() -> {
+            ContentSession pSession;
+            Root pRoot;
+            try {
+                pSession = repository.login(null, null);
+                pRoot = pSession.getLatestRoot();
+
+                for (int i = 0; i < 5000 && !consumerDone.get(); i++) {
+                    String jobId = "job" + i;
+                    pRoot.getTree("/").addChild("child-" + jobId).setProperty("job-id", jobId);
+                    pRoot.commit();
+                    LOG.info("Saved job: {}", jobId);
+
+                    jobIds.add(jobId);
+
+//                    //sleep a bit
+//                    try {
+//                        Thread.sleep(10);
+//                    } catch (Exception e) {
+//                        //ignored
+//                    }
+                }
+            } catch (Exception e) {
+                exceptionList.add(e);
+            } finally {
+                producerDone.set(true);
+            }
+        });
+        jobProducer.setDaemon(true);
+        jobProducer.setName("jobProducer");
+
+        Thread jobConsumer = new Thread(() -> {
+            try {
+                while (!producerDone.get() || !jobIds.isEmpty()) {
+                    if (!jobIds.isEmpty()) {
+                        String jobId = jobIds.remove();
+                        root = session.getLatestRoot();
+                        qe = root.getQueryEngine();
+                        String cQuery = "select * from [nt:base] where [job-id] = '" + jobId + "'";
+                        assertQuery(cQuery, asList("/child-" + jobId));
+//                    } else {
+//                        try {
+//                            Thread.sleep(20);
+//                        } catch (Exception e) {
+//                            //ignored
+//                        }
+                    }
+                }
+            } catch (AssertionError ae) {
+                exceptionList.add(ae);
+            } catch (Exception e) {
+                exceptionList.add(e);
+            } finally {
+                consumerDone.set(true);
+            }
+        });
+        jobConsumer.setDaemon(true);
+        jobConsumer.setName("jobConsumer");
+
+        jobConsumer.start();
+        jobProducer.start();
+
+        jobConsumer.join();
+        jobProducer.join();
+
+//        root = session.getLatestRoot();
+//        qe = root.getQueryEngine();
+//        List<String> allJobs = executeQuery("SELECT [jcr:path] FROM [nt:base] WHERE [job-id] IS NOT NULL", SQL2);
+//        System.out.println(allJobs);
+        assertEquals("Exceptions thrown: " + exceptionList, 0, exceptionList.size());
+    }
+
     @Test
     public void fulltextSearchWithCustomAnalyzer() throws Exception {
         Tree idx = createFulltextIndex(root.getTree("/"), "test");
-- 
2.17.1

