diff --git a/oak-lucene/src/main/java/org/apache/jackrabbit/oak/plugins/index/lucene/HybridIndexPlan.java b/oak-lucene/src/main/java/org/apache/jackrabbit/oak/plugins/index/lucene/HybridIndexPlan.java new file mode 100644 index 0000000..aa3446f --- /dev/null +++ b/oak-lucene/src/main/java/org/apache/jackrabbit/oak/plugins/index/lucene/HybridIndexPlan.java @@ -0,0 +1,129 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.jackrabbit.oak.plugins.index.lucene; + +import org.apache.jackrabbit.oak.spi.query.Filter; +import org.apache.jackrabbit.oak.spi.query.QueryIndex; +import org.apache.jackrabbit.oak.spi.query.QueryIndex.IndexPlan; +import org.apache.jackrabbit.oak.spi.state.NodeState; + +import javax.annotation.Nonnull; +import java.util.List; + +public class HybridIndexPlan implements IndexPlan { + + private final IndexPlan asyncLucenePlan; + + private final IndexPlan memoryLucenePlan; + + private Filter filter; + + public HybridIndexPlan(@Nonnull IndexPlan asyncLucenePlan, @Nonnull IndexPlan memoryLucenePlan) { + this.asyncLucenePlan = asyncLucenePlan; + this.memoryLucenePlan = memoryLucenePlan; + this.filter = asyncLucenePlan.getFilter(); + } + + @Override + public double getCostPerExecution() { + return asyncLucenePlan.getCostPerExecution(); + } + + @Override + public double getCostPerEntry() { + return asyncLucenePlan.getCostPerEntry(); + } + + @Override + public long getEstimatedEntryCount() { + return asyncLucenePlan.getEstimatedEntryCount() + memoryLucenePlan.getEstimatedEntryCount(); + } + + @Override + public Filter getFilter() { + return filter; + } + + @Override + public void setFilter(Filter filter) { + this.filter = filter; + } + + @Override + public boolean isDelayed() { + return false; + } + + @Override + public boolean isFulltextIndex() { + return asyncLucenePlan.isFulltextIndex(); + } + + @Override + public boolean includesNodeData() { + return asyncLucenePlan.includesNodeData() && memoryLucenePlan.includesNodeData(); + } + + @Override + public List getSortOrder() { + return asyncLucenePlan.getSortOrder(); + } + + @Override + public NodeState getDefinition() { + return asyncLucenePlan.getDefinition(); + } + + @Override + public String getPathPrefix() { + return asyncLucenePlan.getPathPrefix(); + } + + @Override + public Filter.PropertyRestriction getPropertyRestriction() { + return asyncLucenePlan.getPropertyRestriction(); + } + + @Override + public IndexPlan copy() { + try { + return (IndexPlan) super.clone(); + } catch (CloneNotSupportedException e) { + throw new IllegalStateException(e); + } + } + + @Override + public Object getAttribute(String name) { + return asyncLucenePlan.getAttribute(name); + } + + @Override + public String getPlanName() { + return asyncLucenePlan.getPlanName() + "," + memoryLucenePlan.getPlanName(); + } + + IndexPlan getAsyncLucenePlan() { + return asyncLucenePlan; + } + + IndexPlan getMemoryLucenePlan() { + return memoryLucenePlan; + } +} diff --git a/oak-lucene/src/main/java/org/apache/jackrabbit/oak/plugins/index/lucene/IndexDefinition.java b/oak-lucene/src/main/java/org/apache/jackrabbit/oak/plugins/index/lucene/IndexDefinition.java index 2f5462b..f5443bf 100644 --- a/oak-lucene/src/main/java/org/apache/jackrabbit/oak/plugins/index/lucene/IndexDefinition.java +++ b/oak-lucene/src/main/java/org/apache/jackrabbit/oak/plugins/index/lucene/IndexDefinition.java @@ -234,6 +234,8 @@ public final class IndexDefinition implements Aggregate.AggregateMapper { @Nullable private final String uid; + private final boolean isHybridIndex; + public IndexDefinition(NodeState root, NodeBuilder defn) { this(root, defn.getBaseState(), defn); } @@ -303,6 +305,8 @@ public final class IndexDefinition implements Aggregate.AggregateMapper { this.secureFacets = defn.hasChildNode(FACETS) && getOptionalValue(defn.getChildNode(FACETS), PROP_SECURE_FACETS, true); this.suggestEnabled = evaluateSuggestionEnabled(); this.spellcheckEnabled = evaluateSpellcheckEnabled(); + + this.isHybridIndex = getOptionalValue(defn, LuceneIndexConstants.PROP_HYBRID_INDEX, false); } public NodeState getDefinitionNodeState() { @@ -437,6 +441,10 @@ public final class IndexDefinition implements Aggregate.AggregateMapper { return uid; } + public boolean isHybridIndex() { + return isHybridIndex; + } + @Override public String toString() { return "Lucene Index : " + indexName; diff --git a/oak-lucene/src/main/java/org/apache/jackrabbit/oak/plugins/index/lucene/IndexNode.java b/oak-lucene/src/main/java/org/apache/jackrabbit/oak/plugins/index/lucene/IndexNode.java index 12d0457..3e58206 100644 --- a/oak-lucene/src/main/java/org/apache/jackrabbit/oak/plugins/index/lucene/IndexNode.java +++ b/oak-lucene/src/main/java/org/apache/jackrabbit/oak/plugins/index/lucene/IndexNode.java @@ -42,12 +42,17 @@ import org.apache.lucene.store.FSDirectory; class IndexNode { - static IndexNode open(String indexPath, NodeState root, NodeState defnNodeState, @Nullable IndexCopier cloner) + static IndexNode open(String indexPath, NodeState root, NodeState defnNodeState, @Nullable IndexCopier cloner, + @Nullable MemoryDirectoryStorage directoryStorage, boolean inMemory) throws IOException { Directory directory = null; IndexDefinition definition = new IndexDefinition(root, defnNodeState); NodeState data = defnNodeState.getChildNode(INDEX_DATA_CHILD_NAME); - if (data.exists()) { + if (inMemory) { + if (directoryStorage != null) { + directory = directoryStorage.getDirectory(indexPath); + } + } else if (data.exists()) { directory = new OakDirectory(new ReadOnlyBuilder(defnNodeState), definition, true); if (cloner != null) { directory = cloner.wrapForRead(indexPath, definition, directory); @@ -66,7 +71,7 @@ class IndexNode { suggestDirectory = new OakDirectory(defnNodeState.builder(), ":suggest-data", definition, false); } - IndexNode index = new IndexNode(PathUtils.getName(indexPath), definition, directory, suggestDirectory); + IndexNode index = new IndexNode(PathUtils.getName(indexPath), definition, directory, suggestDirectory, inMemory); directory = null; // closed in Index.close() return index; } finally { @@ -95,9 +100,11 @@ class IndexNode { private final AnalyzingInfixSuggester lookup; + private final boolean inMemory; + private boolean closed = false; - IndexNode(String name, IndexDefinition definition, Directory directory, final OakDirectory suggestDirectory) + IndexNode(String name, IndexDefinition definition, Directory directory, final OakDirectory suggestDirectory, boolean inMemory) throws IOException { this.name = name; this.definition = definition; @@ -110,6 +117,7 @@ class IndexNode { } else { this.lookup = null; } + this.inMemory = inMemory; } String getName() { @@ -132,6 +140,10 @@ class IndexNode { return lookup; } + boolean isInMemory() { + return inMemory; + } + boolean acquire() { lock.readLock().lock(); if (closed) { diff --git a/oak-lucene/src/main/java/org/apache/jackrabbit/oak/plugins/index/lucene/IndexPlanner.java b/oak-lucene/src/main/java/org/apache/jackrabbit/oak/plugins/index/lucene/IndexPlanner.java index d9d72db..ee3f40b 100644 --- a/oak-lucene/src/main/java/org/apache/jackrabbit/oak/plugins/index/lucene/IndexPlanner.java +++ b/oak-lucene/src/main/java/org/apache/jackrabbit/oak/plugins/index/lucene/IndexPlanner.java @@ -53,6 +53,7 @@ import static org.apache.jackrabbit.JcrConstants.JCR_SCORE; import static org.apache.jackrabbit.oak.commons.PathUtils.getAncestorPath; import static org.apache.jackrabbit.oak.commons.PathUtils.getDepth; import static org.apache.jackrabbit.oak.commons.PathUtils.getParentPath; +import static org.apache.jackrabbit.oak.plugins.index.lucene.LucenePropertyIndex.ATTR_IN_MEMORY; import static org.apache.jackrabbit.oak.spi.query.Filter.PropertyRestriction; import static org.apache.jackrabbit.oak.spi.query.QueryIndex.IndexPlan; import static org.apache.jackrabbit.oak.spi.query.QueryIndex.OrderEntry; @@ -94,7 +95,12 @@ class IndexPlanner { } } - return builder != null ? builder.build() : null; + if (builder == null) { + return null; + } else { + builder.setAttribute(ATTR_IN_MEMORY, indexNode.isInMemory()); + return builder.build(); + } } @Override diff --git a/oak-lucene/src/main/java/org/apache/jackrabbit/oak/plugins/index/lucene/IndexTracker.java b/oak-lucene/src/main/java/org/apache/jackrabbit/oak/plugins/index/lucene/IndexTracker.java index b0a0e06..8f1c011 100644 --- a/oak-lucene/src/main/java/org/apache/jackrabbit/oak/plugins/index/lucene/IndexTracker.java +++ b/oak-lucene/src/main/java/org/apache/jackrabbit/oak/plugins/index/lucene/IndexTracker.java @@ -25,11 +25,13 @@ import static com.google.common.collect.Maps.filterKeys; import static com.google.common.collect.Maps.filterValues; import static com.google.common.collect.Maps.newHashMap; import static java.util.Collections.emptyMap; +import static org.apache.jackrabbit.oak.plugins.index.lucene.LuceneIndexConstants.PROP_HYBRID_INDEX; import static org.apache.jackrabbit.oak.plugins.index.lucene.LuceneIndexConstants.TYPE_LUCENE; import static org.apache.jackrabbit.oak.plugins.index.lucene.util.LuceneIndexHelper.isLuceneIndexNode; import static org.apache.jackrabbit.oak.plugins.memory.EmptyNodeState.EMPTY_NODE; import java.io.IOException; +import java.util.HashSet; import java.util.List; import java.util.Map; import java.util.Set; @@ -55,20 +57,36 @@ class IndexTracker { private static final PerfLogger PERF_LOGGER = new PerfLogger(LoggerFactory.getLogger(IndexTracker.class.getName() + ".perf")); + private final MemoryDirectoryStorage directoryStorage; + private final IndexCopier cloner; + private final MonitoringBackgroundObserver memoryIndexObserver; + private NodeState root = EMPTY_NODE; private volatile Map indices = emptyMap(); + private volatile Map memoryIndices = emptyMap(); + private volatile boolean refresh; IndexTracker() { - this(null); + this(null, null, null); + } + + IndexTracker(MemoryDirectoryStorage directoryStorage) { + this(directoryStorage, null, null); } - IndexTracker(IndexCopier cloner){ + IndexTracker(IndexCopier cloner) { + this(null, cloner, null); + } + + IndexTracker(MemoryDirectoryStorage directoryStorage, IndexCopier cloner, MonitoringBackgroundObserver memoryIndexObserver) { + this.directoryStorage = directoryStorage; this.cloner = cloner; + this.memoryIndexObserver = memoryIndexObserver; } synchronized void close() { @@ -91,12 +109,13 @@ class IndexTracker { refresh = false; log.info("Refreshed the opened indexes"); } else { - diffAndUpdate(root); + diffAndUpdate(this.root, root, false); + this.root = root; } } - private synchronized void diffAndUpdate(final NodeState root) { - Map original = indices; + void diffAndUpdate(final NodeState before, final NodeState after, final boolean inMemory) { + final Map original = getIndices(inMemory); final Map updates = newHashMap(); List editors = newArrayListWithCapacity(original.size()); @@ -108,7 +127,7 @@ class IndexTracker { public void leave(NodeState before, NodeState after) { try { long start = PERF_LOGGER.start(); - IndexNode index = IndexNode.open(path, root, after, cloner); + IndexNode index = IndexNode.open(path, root, after, cloner, directoryStorage, inMemory); PERF_LOGGER.end(start, -1, "[{}] Index found to be updated. Reopening the IndexNode", path); updates.put(path, index); // index can be null } catch (IOException e) { @@ -118,15 +137,32 @@ class IndexTracker { }, Iterables.toArray(PathUtils.elements(path), String.class))); } - EditorDiff.process(CompositeEditor.compose(editors), this.root, root); - this.root = root; + EditorDiff.process(CompositeEditor.compose(editors), before, after); if (!updates.isEmpty()) { - indices = ImmutableMap.builder() + Set purged = new HashSet(); + for (String path : updates.keySet()) { + IndexNode index = original.get(path); + + if (!inMemory && index.getDefinition().isHybridIndex()) { + purgeMemoryIndex(path); + purged.add(path); + } + } + + Map result = ImmutableMap.builder() .putAll(filterKeys(original, not(in(updates.keySet())))) .putAll(filterValues(updates, notNull())) .build(); + setIndices(result, inMemory); + + if (!purged.isEmpty()) { + memoryIndices = ImmutableMap.builder() + .putAll(filterKeys(memoryIndices, not(in(purged)))) + .build(); + } + //This might take some time as close need to acquire the //write lock which might be held by current running searches //Given that Tracker is now invoked from a BackgroundObserver @@ -147,11 +183,23 @@ class IndexTracker { } IndexNode acquireIndexNode(String path) { - IndexNode index = indices.get(path); + return acquireIndexNode(path, false); + } + + + IndexNode acquireIndexNode(String path, boolean inMemory) { + if (inMemory && memoryIndexObserver != null) { + try { + memoryIndexObserver.waitUntilProcessingIsFinished(); + } catch (InterruptedException e) { + log.error("The memory index may not contain the recent changes", e); + } + } + IndexNode index = getIndices(inMemory).get(path); if (index != null && index.acquire()) { return index; } else { - return findIndexNode(path); + return findIndexNode(path, inMemory); } } @@ -159,11 +207,11 @@ class IndexTracker { return indices.keySet(); } - private synchronized IndexNode findIndexNode(String path) { + private synchronized IndexNode findIndexNode(String path, boolean inMemory) { // Retry the lookup from acquireIndexNode now that we're // synchronized. The acquire() call is guaranteed to succeed // since the close() method is also synchronized. - IndexNode index = indices.get(path); + IndexNode index = getIndices(inMemory).get(path); if (index != null) { checkState(index.acquire()); return index; @@ -175,14 +223,11 @@ class IndexTracker { } try { - if (isLuceneIndexNode(node)) { - index = IndexNode.open(path, root, node, cloner); + if (isLuceneIndexNode(node) && (!inMemory || node.getBoolean(PROP_HYBRID_INDEX))) { + index = IndexNode.open(path, root, node, cloner, directoryStorage, inMemory); if (index != null) { checkState(index.acquire()); - indices = ImmutableMap.builder() - .putAll(indices) - .put(path, index) - .build(); + putIndex(path, index, inMemory); return index; } } else if (node.exists()) { @@ -194,4 +239,36 @@ class IndexTracker { return null; } + + private void purgeMemoryIndex(String indexPath) { + directoryStorage.purge(indexPath); + IndexNode node = memoryIndices.get(indexPath); + try { + if (node != null) { + node.close(); + } + } catch (IOException e) { + log.error("Can't close node", e); + } + } + + private void setIndices(Map indices, boolean inMemory) { + if (inMemory) { + this.memoryIndices = indices; + } else { + this.indices = indices; + } + } + + private Map getIndices(boolean inMemory) { + return inMemory ? memoryIndices : indices; + } + + private void putIndex(String path, IndexNode index, boolean inMemory) { + setIndices(ImmutableMap.builder() + .putAll(getIndices(inMemory)) + .put(path, index) + .build(), inMemory); + } + } diff --git a/oak-lucene/src/main/java/org/apache/jackrabbit/oak/plugins/index/lucene/LuceneIndexConstants.java b/oak-lucene/src/main/java/org/apache/jackrabbit/oak/plugins/index/lucene/LuceneIndexConstants.java index 3ad7512..75afd38 100644 --- a/oak-lucene/src/main/java/org/apache/jackrabbit/oak/plugins/index/lucene/LuceneIndexConstants.java +++ b/oak-lucene/src/main/java/org/apache/jackrabbit/oak/plugins/index/lucene/LuceneIndexConstants.java @@ -16,9 +16,7 @@ */ package org.apache.jackrabbit.oak.plugins.index.lucene; -import org.apache.jackrabbit.oak.api.PropertyState; import org.apache.lucene.analysis.Analyzer; -import org.apache.lucene.analysis.standard.StandardAnalyzer; import org.apache.lucene.analysis.util.AbstractAnalysisFactory; import org.apache.lucene.util.Version; @@ -141,6 +139,8 @@ public interface LuceneIndexConstants { String PROP_SCORER_PROVIDER = "scorerProviderName"; + String PROP_HYBRID_INDEX = "hybridIndex"; + /** * Integer property indicating that LuceneIndex should be * used in compat mode to specific version diff --git a/oak-lucene/src/main/java/org/apache/jackrabbit/oak/plugins/index/lucene/LuceneIndexEditor.java b/oak-lucene/src/main/java/org/apache/jackrabbit/oak/plugins/index/lucene/LuceneIndexEditor.java index d347559..c738d90 100644 --- a/oak-lucene/src/main/java/org/apache/jackrabbit/oak/plugins/index/lucene/LuceneIndexEditor.java +++ b/oak-lucene/src/main/java/org/apache/jackrabbit/oak/plugins/index/lucene/LuceneIndexEditor.java @@ -45,7 +45,6 @@ import org.apache.jackrabbit.oak.plugins.index.lucene.Aggregate.Matcher; import org.apache.jackrabbit.oak.plugins.memory.EmptyNodeState; import org.apache.jackrabbit.oak.plugins.memory.StringPropertyState; import org.apache.jackrabbit.oak.plugins.tree.TreeFactory; -import org.apache.jackrabbit.oak.query.QueryImpl; import org.apache.jackrabbit.oak.spi.commit.Editor; import org.apache.jackrabbit.oak.spi.state.NodeBuilder; import org.apache.jackrabbit.oak.spi.state.NodeState; @@ -122,15 +121,17 @@ public class LuceneIndexEditor implements IndexEditor, Aggregate.AggregateRoot { private final PathFilter.Result pathFilterResult; LuceneIndexEditor(NodeState root, NodeBuilder definition, - IndexUpdateCallback updateCallback, - @Nullable IndexCopier indexCopier, - ExtractedTextCache extractedTextCache, - IndexAugmentorFactory augmentorFactory) throws CommitFailedException { + IndexUpdateCallback updateCallback, + @Nullable IndexCopier indexCopier, + @Nullable MemoryDirectoryStorage directoryStorage, + ExtractedTextCache extractedTextCache, + IndexAugmentorFactory augmentorFactory, + boolean inMemory) throws CommitFailedException { this.parent = null; this.name = null; this.path = "/"; this.context = new LuceneIndexEditorContext(root, definition, - updateCallback, indexCopier, extractedTextCache, augmentorFactory); + updateCallback, indexCopier, directoryStorage, extractedTextCache, augmentorFactory, inMemory); this.root = root; this.isDeleted = false; this.matcherState = MatcherState.NONE; diff --git a/oak-lucene/src/main/java/org/apache/jackrabbit/oak/plugins/index/lucene/LuceneIndexEditorContext.java b/oak-lucene/src/main/java/org/apache/jackrabbit/oak/plugins/index/lucene/LuceneIndexEditorContext.java index 7944034..dade16f 100644 --- a/oak-lucene/src/main/java/org/apache/jackrabbit/oak/plugins/index/lucene/LuceneIndexEditorContext.java +++ b/oak-lucene/src/main/java/org/apache/jackrabbit/oak/plugins/index/lucene/LuceneIndexEditorContext.java @@ -145,6 +145,8 @@ public class LuceneIndexEditorContext { private Directory directory; + private MemoryDirectoryStorage directoryStorage; + private final TextExtractionStats textExtractionStats = new TextExtractionStats(); private final ExtractedTextCache extractedTextCache; @@ -152,6 +154,9 @@ public class LuceneIndexEditorContext { private final IndexAugmentorFactory augmentorFactory; private final NodeState root; + + private final boolean inMemory; + /** * The media types supported by the parser used. */ @@ -162,13 +167,15 @@ public class LuceneIndexEditorContext { private static Clock clock = Clock.SIMPLE; LuceneIndexEditorContext(NodeState root, NodeBuilder definition, IndexUpdateCallback updateCallback, - @Nullable IndexCopier indexCopier, ExtractedTextCache extractedTextCache, - IndexAugmentorFactory augmentorFactory) { + @Nullable IndexCopier indexCopier, @Nullable MemoryDirectoryStorage directoryStorage, + ExtractedTextCache extractedTextCache, IndexAugmentorFactory augmentorFactory, + boolean inMemory) { configureUniqueId(definition); this.root = root; this.definitionBuilder = definition; this.indexCopier = indexCopier; this.definition = new IndexDefinition(root, definition); + this.directoryStorage = directoryStorage; this.indexedNodes = 0; this.updateCallback = updateCallback; this.extractedTextCache = extractedTextCache; @@ -177,6 +184,7 @@ public class LuceneIndexEditorContext { IndexDefinition.updateDefinition(definition); } this.facetsConfig = FacetHelper.getFacetsConfig(definition); + this.inMemory = inMemory; } Parser getParser() { @@ -189,7 +197,11 @@ public class LuceneIndexEditorContext { IndexWriter getWriter() throws IOException { if (writer == null) { final long start = PERF_LOGGER.start(); - directory = newIndexDirectory(definition, definitionBuilder); + if (inMemory) { + directory = directoryStorage.getOrCreateDirectory(this.definition.getIndexPathFromConfig()); + } else { + directory = newIndexDirectory(definition, definitionBuilder); + } IndexWriterConfig config; if (indexCopier != null){ directory = indexCopier.wrapForWrite(definition, directory, reindex); diff --git a/oak-lucene/src/main/java/org/apache/jackrabbit/oak/plugins/index/lucene/LuceneIndexEditorProvider.java b/oak-lucene/src/main/java/org/apache/jackrabbit/oak/plugins/index/lucene/LuceneIndexEditorProvider.java index 24a8c2c..0d5ebcc 100644 --- a/oak-lucene/src/main/java/org/apache/jackrabbit/oak/plugins/index/lucene/LuceneIndexEditorProvider.java +++ b/oak-lucene/src/main/java/org/apache/jackrabbit/oak/plugins/index/lucene/LuceneIndexEditorProvider.java @@ -69,7 +69,7 @@ public class LuceneIndexEditorProvider implements IndexEditorProvider { @Nonnull IndexUpdateCallback callback) throws CommitFailedException { if (TYPE_LUCENE.equals(type)) { - return new LuceneIndexEditor(root, definition, callback, indexCopier, extractedTextCache, augmentorFactory); + return new LuceneIndexEditor(root, definition, callback, indexCopier, null, extractedTextCache, augmentorFactory, false); } return null; } diff --git a/oak-lucene/src/main/java/org/apache/jackrabbit/oak/plugins/index/lucene/LuceneIndexProvider.java b/oak-lucene/src/main/java/org/apache/jackrabbit/oak/plugins/index/lucene/LuceneIndexProvider.java index 438900c..9b4e717 100644 --- a/oak-lucene/src/main/java/org/apache/jackrabbit/oak/plugins/index/lucene/LuceneIndexProvider.java +++ b/oak-lucene/src/main/java/org/apache/jackrabbit/oak/plugins/index/lucene/LuceneIndexProvider.java @@ -48,11 +48,19 @@ public class LuceneIndexProvider implements QueryIndexProvider, Observer, Closea IndexAugmentorFactory augmentorFactory; public LuceneIndexProvider() { - this(new IndexTracker()); + this(new MemoryDirectoryStorage()); + } + + public LuceneIndexProvider(MemoryDirectoryStorage directoryStorage) { + this(directoryStorage, null); } public LuceneIndexProvider(IndexCopier indexCopier) { - this(new IndexTracker(indexCopier)); + this(null, indexCopier); + } + + public LuceneIndexProvider(MemoryDirectoryStorage directoryStorage, IndexCopier indexCopier) { + this(new IndexTracker(directoryStorage, indexCopier, null)); } public LuceneIndexProvider(IndexTracker tracker) { diff --git a/oak-lucene/src/main/java/org/apache/jackrabbit/oak/plugins/index/lucene/LuceneIndexProviderService.java b/oak-lucene/src/main/java/org/apache/jackrabbit/oak/plugins/index/lucene/LuceneIndexProviderService.java index 19989ab..ee88c11 100644 --- a/oak-lucene/src/main/java/org/apache/jackrabbit/oak/plugins/index/lucene/LuceneIndexProviderService.java +++ b/oak-lucene/src/main/java/org/apache/jackrabbit/oak/plugins/index/lucene/LuceneIndexProviderService.java @@ -59,6 +59,7 @@ import org.apache.jackrabbit.oak.spi.gc.GCMonitor; import org.apache.jackrabbit.oak.spi.query.QueryIndexProvider; import org.apache.jackrabbit.oak.spi.whiteboard.Registration; import org.apache.jackrabbit.oak.spi.whiteboard.Whiteboard; +import org.apache.jackrabbit.oak.spi.whiteboard.WhiteboardUtils; import org.apache.lucene.analysis.util.CharFilterFactory; import org.apache.lucene.analysis.util.TokenFilterFactory; import org.apache.lucene.analysis.util.TokenizerFactory; @@ -110,6 +111,12 @@ public class LuceneIndexProviderService { @Property( boolValue = true, + label = "Enable Hybrid Index", + description = "Adds support for the " + LuceneIndexConstants.PROP_HYBRID_INDEX + " property") + private static final String PROP_HYBRID = "enableHybridIndexSupport"; + + @Property( + boolValue = true, label = "Enable CopyOnRead", description = "Enable copying of Lucene index to local file system to improve query performance" ) @@ -231,7 +238,22 @@ public class LuceneIndexProviderService { whiteboard = new OsgiWhiteboard(bundleContext); threadPoolSize = PropertiesUtil.toInteger(config.get(PROP_THREAD_POOL_SIZE), PROP_THREAD_POOL_SIZE_DEFAULT); initializeExtractedTextCache(bundleContext, config); - indexProvider = new LuceneIndexProvider(createTracker(bundleContext, config), scorerFactory, augmentorFactory); + + IndexTracker tracker; + + boolean hybridSupportEnabled = PropertiesUtil.toBoolean(config.get(PROP_HYBRID), true); + if (hybridSupportEnabled) { + MemoryDirectoryStorage directoryStorage = new MemoryDirectoryStorage(); + MonitoringBackgroundObserver monitoringMemoryObserver = new MonitoringBackgroundObserver(getExecutorService()); + tracker = createTracker(bundleContext, config, directoryStorage, monitoringMemoryObserver); + LuceneMemoryUpdater memoryObserver = new LuceneMemoryUpdater(directoryStorage, tracker); + monitoringMemoryObserver.addObserver(memoryObserver); + oakRegs.add(WhiteboardUtils.registerObserver(whiteboard, monitoringMemoryObserver)); + } else { + tracker = createTracker(bundleContext, config, null, null); + } + + indexProvider = new LuceneIndexProvider(tracker, scorerFactory, augmentorFactory); initializeLogging(config); initialize(); @@ -326,15 +348,15 @@ public class LuceneIndexProviderService { "TextExtraction statistics")); } - private IndexTracker createTracker(BundleContext bundleContext, Map config) throws IOException { + private IndexTracker createTracker(BundleContext bundleContext, Map config, MemoryDirectoryStorage directoryStorage, MonitoringBackgroundObserver memoryIndexObserver) throws IOException { boolean enableCopyOnRead = PropertiesUtil.toBoolean(config.get(PROP_COPY_ON_READ), true); if (enableCopyOnRead){ initializeIndexCopier(bundleContext, config); log.info("Enabling CopyOnRead support. Index files would be copied under {}", indexDir.getAbsolutePath()); - return new IndexTracker(indexCopier); + return new IndexTracker(directoryStorage, indexCopier, memoryIndexObserver); } - return new IndexTracker(); + return new IndexTracker(directoryStorage, null, memoryIndexObserver); } private void initializeIndexCopier(BundleContext bundleContext, Map config) throws IOException { diff --git a/oak-lucene/src/main/java/org/apache/jackrabbit/oak/plugins/index/lucene/LuceneMemoryIndexEditorProvider.java b/oak-lucene/src/main/java/org/apache/jackrabbit/oak/plugins/index/lucene/LuceneMemoryIndexEditorProvider.java new file mode 100644 index 0000000..4ac2f3b --- /dev/null +++ b/oak-lucene/src/main/java/org/apache/jackrabbit/oak/plugins/index/lucene/LuceneMemoryIndexEditorProvider.java @@ -0,0 +1,73 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.jackrabbit.oak.plugins.index.lucene; + +import org.apache.jackrabbit.oak.api.CommitFailedException; +import org.apache.jackrabbit.oak.plugins.index.IndexEditor; +import org.apache.jackrabbit.oak.plugins.index.IndexEditorProvider; +import org.apache.jackrabbit.oak.plugins.index.IndexUpdateCallback; +import org.apache.jackrabbit.oak.spi.commit.Editor; +import org.apache.jackrabbit.oak.spi.state.NodeBuilder; +import org.apache.jackrabbit.oak.spi.state.NodeState; + +import javax.annotation.Nonnull; + +import static org.apache.jackrabbit.oak.plugins.index.lucene.LuceneIndexConstants.PROP_HYBRID_INDEX; +import static org.apache.jackrabbit.oak.plugins.index.lucene.LuceneIndexConstants.TYPE_LUCENE; + +/** + * Service that provides Lucene based {@link IndexEditor}s + * + * @see LuceneIndexEditor + * @see IndexEditorProvider + * + */ +public class LuceneMemoryIndexEditorProvider implements IndexEditorProvider { + private final ExtractedTextCache extractedTextCache; + private final IndexAugmentorFactory augmentorFactory; + + private final MemoryDirectoryStorage directoryStorage; + + public LuceneMemoryIndexEditorProvider(MemoryDirectoryStorage directoryStorage) { + //Disable the cache by default in ExtractedTextCache + this(directoryStorage, new ExtractedTextCache(0, 0)); + } + + public LuceneMemoryIndexEditorProvider(MemoryDirectoryStorage directoryStorage, + ExtractedTextCache extractedTextCache) { + this(directoryStorage, extractedTextCache, null); + } + + public LuceneMemoryIndexEditorProvider(MemoryDirectoryStorage directoryStorage, + ExtractedTextCache extractedTextCache, + IndexAugmentorFactory augmentorFactory) { + this.directoryStorage = directoryStorage; + this.extractedTextCache = extractedTextCache; + this.augmentorFactory = augmentorFactory; + } + + @Override + public Editor getIndexEditor( + @Nonnull String type, @Nonnull NodeBuilder definition, @Nonnull NodeState root, + @Nonnull IndexUpdateCallback callback) + throws CommitFailedException { + if (TYPE_LUCENE.equals(type) && definition.getBoolean(PROP_HYBRID_INDEX)) { + return new LuceneIndexEditor(root, definition, callback, null, directoryStorage, extractedTextCache, augmentorFactory, true); + } + return null; + } +} diff --git a/oak-lucene/src/main/java/org/apache/jackrabbit/oak/plugins/index/lucene/LuceneMemoryUpdater.java b/oak-lucene/src/main/java/org/apache/jackrabbit/oak/plugins/index/lucene/LuceneMemoryUpdater.java new file mode 100644 index 0000000..4f48000 --- /dev/null +++ b/oak-lucene/src/main/java/org/apache/jackrabbit/oak/plugins/index/lucene/LuceneMemoryUpdater.java @@ -0,0 +1,166 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.jackrabbit.oak.plugins.index.lucene; + +import org.apache.jackrabbit.oak.api.CommitFailedException; +import org.apache.jackrabbit.oak.plugins.index.IndexUpdate; +import org.apache.jackrabbit.oak.plugins.index.IndexUpdateCallback; +import org.apache.jackrabbit.oak.spi.commit.CommitInfo; +import org.apache.jackrabbit.oak.spi.commit.EditorDiff; +import org.apache.jackrabbit.oak.spi.commit.Observer; +import org.apache.jackrabbit.oak.spi.state.ChildNodeEntry; +import org.apache.jackrabbit.oak.spi.state.NodeBuilder; +import org.apache.jackrabbit.oak.spi.state.NodeState; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import javax.annotation.Nonnull; +import javax.annotation.Nullable; + +import static org.apache.jackrabbit.oak.plugins.index.IndexConstants.ASYNC_PROPERTY_NAME; +import static org.apache.jackrabbit.oak.plugins.index.IndexConstants.INDEX_DEFINITIONS_NAME; +import static org.apache.jackrabbit.oak.plugins.index.lucene.LuceneIndexConstants.TYPE_LUCENE; +import static org.apache.jackrabbit.oak.plugins.memory.EmptyNodeState.EMPTY_NODE; +import static org.apache.jackrabbit.oak.spi.state.EqualsDiff.modified; + +/** + * Update the lucene-memory indexes. + */ +public class LuceneMemoryUpdater implements Observer { + + private static final Logger log = LoggerFactory.getLogger(LuceneMemoryUpdater.class); + + private static final IndexUpdateCallback NOOP_CALLBACK = + new IndexUpdateCallback() { + @Override + public void indexUpdate() { + // do nothing + } + }; + + private final LuceneMemoryIndexEditorProvider memoryEditorProvider; + + private final IndexTracker tracker; + + private final LocalBranch indexDefBranch; + + private NodeState before; + + public LuceneMemoryUpdater(MemoryDirectoryStorage directoryStorage, IndexTracker tracker) { + this.memoryEditorProvider = new LuceneMemoryIndexEditorProvider(directoryStorage); + this.tracker = tracker; + this.indexDefBranch = new LocalBranch(); + } + + @Override + public void contentChanged(@Nonnull final NodeState root, @Nullable CommitInfo info) { + if (before != null) { + process(root); + } else { + before = indexDefBranch.rebase(root); + } + } + + private void process(NodeState root) { + NodeState after = indexDefBranch.rebase(root); + NodeBuilder rootBuilder = after.builder(); + + // update the memory indices + IndexUpdate updatingEditor = new IndexUpdate(memoryEditorProvider, null, after, rootBuilder, NOOP_CALLBACK).withMissingProviderStrategy(new IndexUpdate.MissingIndexProviderStrategy() { + @Override + public void onMissingIndex(String type, NodeBuilder definition, String indexPath) throws CommitFailedException { + if (TYPE_LUCENE.equals(type)) { // ignore index types different than lucene + super.onMissingIndex(type, definition, indexPath); + } + } + }); + EditorDiff.process(updatingEditor, before, after); + NodeState newState = rootBuilder.getNodeState(); + + // merge the definition changes made by the update process into the branch + indexDefBranch.merge(newState); + + // refresh the index nodes in the tracker + tracker.diffAndUpdate(after, newState, true); + before = newState; + } + + + /** + * Updating an index involves modifying its definition in /oak:index. We don't want to do this for the lucene-memory + * indexes, which are stored locally and not shared across the cluster. That's why we have a local branch, containing + * the lucene-memory definition modifications. + */ + private static class LocalBranch { + + private NodeState branchMemoryIndexDefs; + + private NodeState baseMemoryIndexDefs; + + /** + * Rebase the index definition modifications from this branch onto the root. If there's a conflict caused by the + * external change of the definitions, the branch will be reset. + * + * @param root the node state root to rebase the changes onto + * @return rebased branch root + */ + public NodeState rebase(NodeState root) { + NodeState newIndexDefs = extractHybridDefinitions(root); + if (baseMemoryIndexDefs == null || modified(baseMemoryIndexDefs, newIndexDefs)) { + baseMemoryIndexDefs = newIndexDefs; + branchMemoryIndexDefs = transformDefinitions(newIndexDefs); + log.debug("The memory index definition has been modified externally. Resetting the observer branch."); + } + + NodeBuilder rootBuilder = root.builder(); + NodeBuilder indexDefs = rootBuilder.getChildNode(INDEX_DEFINITIONS_NAME); + for (ChildNodeEntry memoryIndexDef : branchMemoryIndexDefs.getChildNodeEntries()) { + indexDefs.setChildNode(memoryIndexDef.getName(), memoryIndexDef.getNodeState()); + } + return rootBuilder.getNodeState(); + } + + /** + * Merge the definitions changes. + * + * @param root the new root of the branch. + */ + public void merge(NodeState root) { + branchMemoryIndexDefs = extractHybridDefinitions(root); + } + + private static NodeState extractHybridDefinitions(NodeState root) { + NodeBuilder builder = EMPTY_NODE.builder(); + NodeState indexDefs = root.getChildNode(INDEX_DEFINITIONS_NAME); + for (ChildNodeEntry entry : indexDefs.getChildNodeEntries()) { + if (entry.getNodeState().getBoolean(LuceneIndexConstants.PROP_HYBRID_INDEX)) { + builder.setChildNode(entry.getName(), entry.getNodeState()); + } + } + return builder.getNodeState(); + } + + private NodeState transformDefinitions(NodeState indexDefs) { + NodeBuilder builder = indexDefs.builder(); + for (String name : builder.getChildNodeNames()) { + NodeBuilder def = builder.getChildNode(name); + def.removeProperty(ASYNC_PROPERTY_NAME); + } + return builder.getNodeState(); + } + } +} \ No newline at end of file diff --git a/oak-lucene/src/main/java/org/apache/jackrabbit/oak/plugins/index/lucene/LucenePropertyIndex.java b/oak-lucene/src/main/java/org/apache/jackrabbit/oak/plugins/index/lucene/LucenePropertyIndex.java index 1ca79b8..4c3618c 100644 --- a/oak-lucene/src/main/java/org/apache/jackrabbit/oak/plugins/index/lucene/LucenePropertyIndex.java +++ b/oak-lucene/src/main/java/org/apache/jackrabbit/oak/plugins/index/lucene/LucenePropertyIndex.java @@ -35,6 +35,7 @@ import java.util.Set; import java.util.concurrent.atomic.AtomicReference; import com.google.common.collect.AbstractIterator; +import com.google.common.collect.Iterators; import com.google.common.collect.Lists; import com.google.common.collect.Queues; import com.google.common.collect.Sets; @@ -189,6 +190,8 @@ public class LucenePropertyIndex implements AdvancedQueryIndex, QueryIndex, Nati static final String ATTR_PLAN_RESULT = "oak.lucene.planResult"; + static final String ATTR_IN_MEMORY = "oak.lucene.inMemory"; + /** * Batch size for fetching results from Lucene queries. */ @@ -235,12 +238,22 @@ public class LucenePropertyIndex implements AdvancedQueryIndex, QueryIndex, Nati List plans = Lists.newArrayListWithCapacity(indexPaths.size()); for (String path : indexPaths) { IndexNode indexNode = null; + IndexNode memoryIndexNode = null; try { indexNode = tracker.acquireIndexNode(path); if (indexNode != null) { IndexPlan plan = new IndexPlanner(indexNode, path, filter, sortOrder).getPlan(); if (plan != null) { + if (indexNode.getDefinition().isHybridIndex()) { + memoryIndexNode = tracker.acquireIndexNode(path, true); + if (memoryIndexNode != null) { + IndexPlan memoryPlan = new IndexPlanner(memoryIndexNode, path, filter, sortOrder).getPlan(); + if (memoryPlan != null) { + plan = enhanceWithMemoryIndexPlan(plan, memoryPlan); + } + } + } plans.add(plan); } } @@ -248,11 +261,18 @@ public class LucenePropertyIndex implements AdvancedQueryIndex, QueryIndex, Nati if (indexNode != null) { indexNode.release(); } + if (memoryIndexNode != null) { + memoryIndexNode.release(); + } } } return plans; } + private IndexPlan enhanceWithMemoryIndexPlan(IndexPlan plan, IndexPlan memoryPlan) { + return new HybridIndexPlan(plan, memoryPlan); + } + @Override public double getCost(Filter filter, NodeState root) { throw new UnsupportedOperationException("Not supported as implementing AdvancedQueryIndex"); @@ -271,11 +291,18 @@ public class LucenePropertyIndex implements AdvancedQueryIndex, QueryIndex, Nati try { FullTextExpression ft = filter.getFullTextConstraint(); StringBuilder sb = new StringBuilder("lucene:"); - String path = getPlanResult(plan).indexPath; - sb.append(getIndexName(plan)) - .append("(") - .append(path) - .append(") "); + if (plan instanceof HybridIndexPlan) { + HybridIndexPlan hybridPlan = (HybridIndexPlan) plan; + IndexPlan asyncPlan = hybridPlan.getAsyncLucenePlan(); + IndexPlan memoryPlan = hybridPlan.getMemoryLucenePlan(); + sb.append("hybrid:[") + .append(getBasicPlanDescription(plan)) + .append(":").append(asyncPlan.getEstimatedEntryCount()) + .append(",").append(memoryPlan.getEstimatedEntryCount()) + .append("] "); + } else { + sb.append(getBasicPlanDescription(plan)).append(" "); + } sb.append(getLuceneRequest(plan, augmentorFactory, null)); if (plan.getSortOrder() != null && !plan.getSortOrder().isEmpty()) { sb.append(" ordering:").append(plan.getSortOrder()); @@ -289,6 +316,16 @@ public class LucenePropertyIndex implements AdvancedQueryIndex, QueryIndex, Nati } } + private static String getBasicPlanDescription(IndexPlan plan) { + StringBuilder sb = new StringBuilder(); + String path = getPlanResult(plan).indexPath; + sb.append(getIndexName(plan)) + .append("(") + .append(path) + .append(")"); + return sb.toString(); + } + @Override public Cursor query(final Filter filter, final NodeState root) { throw new UnsupportedOperationException("Not supported as implementing AdvancedQueryIndex"); @@ -296,11 +333,43 @@ public class LucenePropertyIndex implements AdvancedQueryIndex, QueryIndex, Nati @Override public Cursor query(final IndexPlan plan, NodeState rootState) { + Filter filter = plan.getFilter(); + QueryEngineSettings settings = filter.getQueryEngineSettings(); + + Iterator itr; + SizeEstimator sizeEstimator; + if (plan instanceof HybridIndexPlan) { + HybridIndexPlan hybridPlan = (HybridIndexPlan) plan; + Iterator luceneIterator = getResultRowIterator(hybridPlan.getAsyncLucenePlan()); + Iterator memoryIterator = getResultRowIterator(hybridPlan.getMemoryLucenePlan()); + itr = Iterators.concat(luceneIterator, memoryIterator); + + final SizeEstimator asyncSizeEstimator = getSizeEstimator(hybridPlan.getAsyncLucenePlan()); + final SizeEstimator memorySizeEstimator = getSizeEstimator(hybridPlan.getMemoryLucenePlan()); + sizeEstimator = new SizeEstimator() { + @Override + public long getSize() { + long asyncSize = asyncSizeEstimator.getSize(); + long memorySize = memorySizeEstimator.getSize(); + if (asyncSize == -1 || memorySize == 1) { + return -1; + } else { + return asyncSize + memorySize; + } + } + }; + } else { + itr = getResultRowIterator(plan); + sizeEstimator = getSizeEstimator(plan); + } + return new LucenePathCursor(itr, plan, settings, sizeEstimator); + } + + private Iterator getResultRowIterator(final IndexPlan plan) { final Filter filter = plan.getFilter(); final Sort sort = getSort(plan); final PlanResult pr = getPlanResult(plan); - QueryEngineSettings settings = filter.getQueryEngineSettings(); - Iterator itr = new AbstractIterator() { + return new AbstractIterator() { private final Deque queue = Queues.newArrayDeque(); private final Set seenPaths = Sets.newHashSet(); private ScoreDoc lastDoc; @@ -353,6 +422,7 @@ public class LucenePropertyIndex implements AdvancedQueryIndex, QueryIndex, Nati /** * Loads the lucene documents in batches + * * @return true if any document is loaded */ private boolean loadDocs() { @@ -364,7 +434,9 @@ public class LucenePropertyIndex implements AdvancedQueryIndex, QueryIndex, Nati ScoreDoc lastDocToRecord = null; final IndexNode indexNode = acquireIndexNode(plan); - checkState(indexNode != null); + if (indexNode == null) { + return false; + } try { IndexSearcher searcher = indexNode.getSearcher(); LuceneRequestFacade luceneRequestFacade = getLuceneRequest(plan, augmentorFactory, searcher.getIndexReader()); @@ -532,11 +604,16 @@ public class LucenePropertyIndex implements AdvancedQueryIndex, QueryIndex, Nati this.lastSearchIndexerVersion = currentVersion; } }; - SizeEstimator sizeEstimator = new SizeEstimator() { + } + + private SizeEstimator getSizeEstimator(final IndexPlan plan) { + return new SizeEstimator() { @Override public long getSize() { IndexNode indexNode = acquireIndexNode(plan); - checkState(indexNode != null); + if (indexNode == null) { + return 0; + } try { IndexSearcher searcher = indexNode.getSearcher(); LuceneRequestFacade luceneRequestFacade = getLuceneRequest(plan, augmentorFactory, searcher.getIndexReader()); @@ -557,7 +634,6 @@ public class LucenePropertyIndex implements AdvancedQueryIndex, QueryIndex, Nati return -1; } }; - return new LucenePathCursor(itr, plan, settings, sizeEstimator); } private static Query addDescendantClauseIfRequired(Query query, IndexPlan plan) { @@ -667,7 +743,13 @@ public class LucenePropertyIndex implements AdvancedQueryIndex, QueryIndex, Nati } private IndexNode acquireIndexNode(IndexPlan plan) { - return tracker.acquireIndexNode(getPlanResult(plan).indexPath); + String indexPath = getPlanResult(plan).indexPath; + boolean inMemory = Boolean.TRUE.equals(plan.getAttribute(ATTR_IN_MEMORY)); + IndexNode node = tracker.acquireIndexNode(indexPath, inMemory); + if (!inMemory) { + checkState(node != null); + } + return node; } private static Sort getSort(IndexPlan plan) { diff --git a/oak-lucene/src/main/java/org/apache/jackrabbit/oak/plugins/index/lucene/MemoryDirectoryStorage.java b/oak-lucene/src/main/java/org/apache/jackrabbit/oak/plugins/index/lucene/MemoryDirectoryStorage.java new file mode 100644 index 0000000..7197943 --- /dev/null +++ b/oak-lucene/src/main/java/org/apache/jackrabbit/oak/plugins/index/lucene/MemoryDirectoryStorage.java @@ -0,0 +1,168 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional debugrmation regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.jackrabbit.oak.plugins.index.lucene; + +import org.apache.lucene.store.Directory; +import org.apache.lucene.store.IOContext; +import org.apache.lucene.store.IndexInput; +import org.apache.lucene.store.IndexOutput; +import org.apache.lucene.store.Lock; +import org.apache.lucene.store.LockFactory; +import org.apache.lucene.store.RAMDirectory; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import javax.annotation.CheckForNull; +import javax.annotation.Nonnull; +import java.io.IOException; +import java.util.Collection; +import java.util.HashMap; +import java.util.Map; + +import static com.google.common.base.Preconditions.checkNotNull; + +public class MemoryDirectoryStorage { + + private static final Logger log = LoggerFactory.getLogger(IndexTracker.class); + + final Map directories; + + public MemoryDirectoryStorage() { + this.directories = new HashMap(); + } + + @Nonnull + public Directory getOrCreateDirectory(@Nonnull final String indexPath) { + checkNotNull(indexPath); + + log.debug("getOrCreate {}", indexPath); + final RAMDirectory directory; + synchronized (this) { + if (directories.containsKey(indexPath)) { + directory = directories.get(indexPath); + log.debug("returned {} directory: {}", indexPath, directory); + } else { + directory = new RAMDirectory(); + log.debug("created {} directory: {}", indexPath, directory); + } + } + return new UnclosableDirectory(directory) { + @Override + public void close() throws IOException { + log.debug("stored {} directory on close(): {}", indexPath, directory); + directories.put(indexPath, directory); + } + }; + } + + @CheckForNull + public Directory getDirectory(String indexPath) { + log.debug("get {}", indexPath); + + synchronized (this) { + RAMDirectory directory = directories.get(indexPath); + if (directory == null) { + log.debug("there's no {} directory", indexPath); + return null; + } else { + log.debug("returned {} directory: {}", indexPath, directory); + return new UnclosableDirectory(directory); + } + } + } + + public void purge(String indexPath) { + log.debug("purge {}", indexPath); + + RAMDirectory directory; + synchronized (this) { + directory = directories.remove(indexPath); + } + if (directory != null) { + directory.close(); + } + } + + private static class UnclosableDirectory extends Directory { + + private final Directory directory; + + public UnclosableDirectory(Directory directory) { + this.directory = directory; + } + + @Override + public String[] listAll() throws IOException { + return directory.listAll(); + } + + @Override + public boolean fileExists(String name) throws IOException { + return directory.fileExists(name); + } + + @Override + public void deleteFile(String name) throws IOException { + directory.deleteFile(name); + } + + @Override + public long fileLength(String name) throws IOException { + return directory.fileLength(name); + } + + @Override + public IndexOutput createOutput(String name, IOContext context) throws IOException { + return directory.createOutput(name, context); + } + + @Override + public void sync(Collection names) throws IOException { + directory.sync(names); + } + + @Override + public IndexInput openInput(String name, IOContext context) throws IOException { + return directory.openInput(name, context); + } + + @Override + public Lock makeLock(String name) { + return directory.makeLock(name); + } + + @Override + public void clearLock(String name) throws IOException { + directory.clearLock(name); + } + + @Override + public void close() throws IOException { + // do nothing + } + + @Override + public void setLockFactory(LockFactory lockFactory) throws IOException { + directory.setLockFactory(lockFactory); + } + + @Override + public LockFactory getLockFactory() { + return directory.getLockFactory(); + } + } +} \ No newline at end of file diff --git a/oak-lucene/src/main/java/org/apache/jackrabbit/oak/plugins/index/lucene/MonitoringBackgroundObserver.java b/oak-lucene/src/main/java/org/apache/jackrabbit/oak/plugins/index/lucene/MonitoringBackgroundObserver.java new file mode 100644 index 0000000..d1cbac6 --- /dev/null +++ b/oak-lucene/src/main/java/org/apache/jackrabbit/oak/plugins/index/lucene/MonitoringBackgroundObserver.java @@ -0,0 +1,210 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.jackrabbit.oak.plugins.index.lucene; + +import org.apache.jackrabbit.oak.spi.commit.CommitInfo; +import org.apache.jackrabbit.oak.spi.commit.Observer; +import org.apache.jackrabbit.oak.spi.state.NodeState; +import org.apache.jackrabbit.oak.stats.Clock; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import javax.annotation.Nonnull; +import javax.annotation.Nullable; +import java.util.ArrayDeque; +import java.util.Deque; +import java.util.List; +import java.util.concurrent.CopyOnWriteArrayList; +import java.util.concurrent.Executor; + +/** + * This observer wrapper runs processes the incoming content changes in the + * background. There's no queue - if there are many content changes, only + * the latest snapshot is passed to the configured observers. + *

+ * The distinctive feature of this class is that it allows to wait until + * the incoming changes are processed. One can call {@link #waitUntilProcessingIsFinished()} + * method and it'll block until the underlying observers are up-to-date + * with the repository state from the moment of calling this method. Any changes + * introduced to the repository after the {@link #waitUntilProcessingIsFinished()} + * is called will be ignored - in other words, the method is not interested + * in processing the future repository updates. + */ +public class MonitoringBackgroundObserver implements Observer { + + private static final Logger log = LoggerFactory.getLogger(MonitoringBackgroundObserver.class); + + private final Executor executor; + + private final List observers = new CopyOnWriteArrayList(); + + private final Deque processingCallbacks = new ArrayDeque(); + + private final Clock clock = Clock.SIMPLE; + + private boolean updateInProgress; + + private ObservationEvent waitingEvent; + + public MonitoringBackgroundObserver(Executor executor) { + this.executor = executor; + } + + @Override + public synchronized void contentChanged(@Nonnull final NodeState root, @Nullable CommitInfo info) { + final ObservationEvent event = new ObservationEvent(root, info, startProcessing()); + if (updateInProgress) { + waitingEvent = event; + } else { + updateInProgress = true; + executor.execute(new Runnable() { + @Override + public void run() { + processChange(event); + } + }); + } + } + + public void addObserver(Observer observer) { + observers.add(observer); + } + + private void processChange(ObservationEvent event) { + ObservationEvent nextEvent = event; + + while (nextEvent != null) { + for (Observer o : observers) { + o.contentChanged(nextEvent.root, nextEvent.commitInfo); + } + processingFinished(nextEvent.processingState); + + synchronized (this) { + nextEvent = waitingEvent; + waitingEvent = null; + + if (nextEvent == null) { + updateInProgress = false; + } + } + } + } + + private ProcessingState startProcessing() { + synchronized (processingCallbacks) { + ProcessingState newProcess = new ProcessingState(); + processingCallbacks.addLast(newProcess); + return newProcess; + } + } + + private void processingFinished(ProcessingState process) { + synchronized (processingCallbacks) { + while (!processingCallbacks.isEmpty() && process.compareTo(processingCallbacks.peekFirst()) >= 0) { + processingCallbacks.removeFirst().markAsDone(); + } + } + } + + public void waitUntilProcessingIsFinished() throws InterruptedException { + ProcessingState monitor = null; + synchronized (processingCallbacks) { + if (!processingCallbacks.isEmpty()) { + monitor = processingCallbacks.getLast(); + } + } + if (monitor != null) { + monitor.waitUntilDone(); + } + } + + private static class ObservationEvent { + + private final NodeState root; + + private final CommitInfo commitInfo; + + private final ProcessingState processingState; + + private ObservationEvent(NodeState root, CommitInfo commitInfo, ProcessingState processingState) { + this.root = root; + this.commitInfo = commitInfo; + this.processingState = processingState; + } + } + + /** + * This class represents processing a single event. + */ + private class ProcessingState implements Comparable { + + /** + * When the processing started (this value is unique and always increasing) + */ + private final long startTime; + + /** + * Whether this or any younger process has been finished. + */ + private volatile boolean isDone; + + private ProcessingState() { + long time; + try { + time = clock.getTimeIncreasing(); + } catch (InterruptedException e) { + log.error("Can't get time from the clock", e); + time = System.currentTimeMillis(); + } + startTime = time; + } + + @Override + public int compareTo(ProcessingState o) { + return Long.compare(startTime, o.startTime); + } + + /** + * Informs all the clients waiting in the {@link #waitUntilDone()} method + * that the changes are already processed. + */ + private void markAsDone() { + synchronized (this) { + isDone = true; + notifyAll(); + } + } + + /** + * Wait until all changes are applied. + * + * @throws InterruptedException + */ + private void waitUntilDone() throws InterruptedException { + if (isDone) { + return; + } + synchronized (this) { + while (!isDone) { + wait(); + } + } + } + } +} \ No newline at end of file diff --git a/oak-lucene/src/test/java/org/apache/jackrabbit/oak/plugins/index/lucene/IndexPlannerTest.java b/oak-lucene/src/test/java/org/apache/jackrabbit/oak/plugins/index/lucene/IndexPlannerTest.java index a213a55..85b4909 100644 --- a/oak-lucene/src/test/java/org/apache/jackrabbit/oak/plugins/index/lucene/IndexPlannerTest.java +++ b/oak-lucene/src/test/java/org/apache/jackrabbit/oak/plugins/index/lucene/IndexPlannerTest.java @@ -586,11 +586,11 @@ public class IndexPlannerTest { //------ END - Suggestion/spellcheck plan tests private IndexNode createIndexNode(IndexDefinition defn, long numOfDocs) throws IOException { - return new IndexNode("foo", defn, createSampleDirectory(numOfDocs), null); + return new IndexNode("foo", defn, createSampleDirectory(numOfDocs), null, false); } private IndexNode createIndexNode(IndexDefinition defn) throws IOException { - return new IndexNode("foo", defn, createSampleDirectory(), null); + return new IndexNode("foo", defn, createSampleDirectory(), null, false); } private FilterImpl createFilter(String nodeTypeName) { diff --git a/oak-lucene/src/test/java/org/apache/jackrabbit/oak/plugins/index/lucene/LuceneHybridTest.java b/oak-lucene/src/test/java/org/apache/jackrabbit/oak/plugins/index/lucene/LuceneHybridTest.java new file mode 100644 index 0000000..915b24e --- /dev/null +++ b/oak-lucene/src/test/java/org/apache/jackrabbit/oak/plugins/index/lucene/LuceneHybridTest.java @@ -0,0 +1,324 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.jackrabbit.oak.plugins.index.lucene; + +import com.google.common.base.Function; +import org.apache.commons.io.FileUtils; +import org.apache.jackrabbit.oak.Oak; +import org.apache.jackrabbit.oak.api.ContentRepository; +import org.apache.jackrabbit.oak.api.ContentSession; +import org.apache.jackrabbit.oak.api.Root; +import org.apache.jackrabbit.oak.api.Tree; +import org.apache.jackrabbit.oak.api.Type; +import org.apache.jackrabbit.oak.plugins.document.DocumentMK; +import org.apache.jackrabbit.oak.plugins.document.DocumentStore; +import org.apache.jackrabbit.oak.plugins.document.memory.MemoryDocumentStore; +import org.apache.jackrabbit.oak.plugins.nodetype.write.InitialContent; +import org.apache.jackrabbit.oak.query.AbstractQueryTest; +import org.apache.jackrabbit.oak.spi.blob.BlobStore; +import org.apache.jackrabbit.oak.spi.blob.MemoryBlobStore; +import org.apache.jackrabbit.oak.spi.commit.BackgroundObserver; +import org.apache.jackrabbit.oak.spi.commit.Observer; +import org.apache.jackrabbit.oak.spi.lifecycle.RepositoryInitializer; +import org.apache.jackrabbit.oak.spi.query.QueryIndexProvider; +import org.apache.jackrabbit.oak.spi.security.OpenSecurityProvider; +import org.apache.jackrabbit.oak.spi.state.NodeBuilder; +import org.apache.jackrabbit.oak.spi.state.NodeStore; +import org.junit.Before; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.TemporaryFolder; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import javax.annotation.Nonnull; +import javax.annotation.Nullable; +import java.util.Collections; +import java.util.List; +import java.util.Set; +import java.util.concurrent.Executors; + +import static com.google.common.collect.ImmutableSet.of; +import static com.google.common.collect.Lists.transform; +import static java.util.Arrays.asList; +import static java.util.Collections.singletonList; +import static java.util.concurrent.Executors.newSingleThreadExecutor; +import static org.apache.commons.lang3.StringUtils.substringAfterLast; +import static org.apache.jackrabbit.JcrConstants.JCR_PRIMARYTYPE; +import static org.apache.jackrabbit.oak.api.Type.NAME; +import static org.apache.jackrabbit.oak.api.Type.STRINGS; +import static org.apache.jackrabbit.oak.plugins.index.IndexConstants.ASYNC_PROPERTY_NAME; +import static org.apache.jackrabbit.oak.plugins.index.IndexConstants.INDEX_DEFINITIONS_NAME; +import static org.apache.jackrabbit.oak.plugins.index.IndexConstants.INDEX_DEFINITIONS_NODE_TYPE; +import static org.apache.jackrabbit.oak.plugins.index.IndexConstants.REINDEX_PROPERTY_NAME; +import static org.apache.jackrabbit.oak.plugins.index.IndexConstants.TYPE_PROPERTY_NAME; +import static org.apache.jackrabbit.oak.plugins.index.lucene.LuceneIndexConstants.FULL_TEXT_ENABLED; +import static org.apache.jackrabbit.oak.plugins.index.lucene.LuceneIndexConstants.INCLUDE_PROPERTY_NAMES; +import static org.apache.jackrabbit.oak.plugins.index.lucene.LuceneIndexConstants.INDEX_PATH; +import static org.apache.jackrabbit.oak.plugins.index.lucene.LuceneIndexConstants.PROP_HYBRID_INDEX; +import static org.apache.jackrabbit.oak.plugins.index.lucene.LuceneIndexConstants.PROP_NAME; +import static org.apache.jackrabbit.oak.plugins.index.lucene.LuceneIndexConstants.TYPE_LUCENE; +import static org.apache.jackrabbit.oak.plugins.memory.PropertyStates.createProperty; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.fail; + +public class LuceneHybridTest extends AbstractQueryTest { + + private static final Logger log = LoggerFactory.getLogger(LuceneHybridTest.class); + + private static final int ASYNC_LUCENE_DELAY = 2; + + @Rule + public TemporaryFolder temporaryFolder = new TemporaryFolder(); + + private DocumentStore documentStore; + + private BlobStore blobStore; + + private boolean initialized; + + private int clusterId; + + private MemoryDirectoryStorage directoryStorage; + + @Before + @Override + public void before() throws Exception { + documentStore = new MemoryDocumentStore(); + blobStore = new MemoryBlobStore(); + initialized = false; + clusterId = 1; + + session = createRepository().login(null, null); + root = session.getLatestRoot(); + qe = root.getQueryEngine(); + createTestIndexNode(); + } + + @Override + protected ContentRepository createRepository() { + MemoryDirectoryStorage directoryStorage = new MemoryDirectoryStorage(); + LuceneIndexEditorProvider editorProvider = new LuceneIndexEditorProvider(null, new ExtractedTextCache(10 * FileUtils.ONE_MB, 100), null); + + MonitoringBackgroundObserver monitoringMemoryObserver = new MonitoringBackgroundObserver(Executors.newSingleThreadExecutor()); + IndexTracker tracker = new IndexTracker(directoryStorage, null, monitoringMemoryObserver); + LuceneMemoryUpdater memoryObserver = new LuceneMemoryUpdater(directoryStorage, tracker); + monitoringMemoryObserver.addObserver(memoryObserver); + + LuceneIndexProvider provider = new LuceneIndexProvider(tracker); + + if (this.directoryStorage == null) { + this.directoryStorage = directoryStorage; + } + + DocumentMK.Builder builder = new DocumentMK.Builder(); + builder.setDocumentStore(documentStore); + builder.setClusterId(clusterId++); + builder.setLeaseCheck(false); + builder.setBlobStore(blobStore); + + NodeStore ns = builder.getNodeStore(); + + Oak oak = new Oak(ns); + if (!initialized) { + initialized = true; + oak.with(new InitialContent()); + oak.with(new RepositoryInitializer() { + @Override + public void initialize(@Nonnull NodeBuilder builder) { + createIndexes(builder); + } + + }); + oak.withAsyncIndexing("async-lucene", ASYNC_LUCENE_DELAY); + } else { + oak.withAsyncIndexing("async-lucene", Integer.MAX_VALUE); + } + + return oak + .with(new OpenSecurityProvider()) + .with((QueryIndexProvider) provider) + .with((Observer) provider) + .with(editorProvider) + .with(monitoringMemoryObserver) + .createContentRepository(); + } + + private void createIndexes(NodeBuilder root) { + NodeBuilder asyncDef = createIndex(root, "test-index-async", of("myprop"), TYPE_LUCENE, "async-lucene"); + asyncDef.setProperty(PROP_HYBRID_INDEX, true); + } + + @Test + public void testHybridIndex() throws Exception { + ContentRepository extRepo = createRepository(); + ContentSession extSession = extRepo.login(null, null); + + Tree localTest = root.getTree("/").addChild("test-local"); + localTest.addChild("z").setProperty("myprop", "xyz", Type.STRING); + root.commit(); + + waitUntilTraverseIndexed("The initial node hasn't been indexed", 1); + waitUntilHybridIndexed("The initial node hasn't been indexed", 1, 1, 1); + + localTest = root.getTree("/").addChild("test2"); + localTest.addChild("a").setProperty("myprop", "xyz", Type.STRING); + localTest.addChild("b").setProperty("myprop", "xyz", Type.STRING); + localTest.addChild("c").setProperty("myprop", "xyz", Type.STRING); + root.commit(); + + waitUntilHybridIndexed("The memory index hasn't been used for local changes", 4, 1, 4); + waitUntilAsyncIndexed("The memory index hasn't been purged for local changes", 4); + + Root extRoot = extSession.getLatestRoot(); + Tree test = extRoot.getTree("/").addChild("test-remote"); + test.addChild("a").setProperty("myprop", "xyz", Type.STRING); + test.addChild("b").setProperty("myprop", "xyz", Type.STRING); + test.addChild("c").setProperty("myprop", "xyz", Type.STRING); + extRoot.commit(); + + waitUntilHybridIndexed("The memory index hasn't been used for remote changes", 7, 4, 3); + waitUntilAsyncIndexed("The memory index hasn't been purged for remote changes", 7); + } + + @Test + public void testResultsOrder() throws Exception { + Tree localTest = root.getTree("/").addChild("test-local"); + localTest.addChild("fourth").setProperty("myprop", "aaa004", Type.STRING); + root.commit(); + waitUntilTraverseIndexed("The initial node hasn't been indexed", 1); + waitUntilHybridIndexed("The initial node hasn't been indexed", 1, 1, 1); + + localTest.addChild("fourth").setProperty("myprop", "aaa004", Type.STRING); + localTest.addChild("second").setProperty("myprop", "aaa002", Type.STRING); + root.commit(); + waitUntilAsyncIndexed("Asynchronous nodes hasn't been indexed", 2); + + localTest.addChild("third").setProperty("myprop", "aaa003", Type.STRING); + localTest.addChild("first").setProperty("myprop", "aaa001", Type.STRING); + root.commit(); + + List paths = waitUntilIndexed("Hybrid index hasn't been used", 4, "lucene:hybrid:", 2, 2, "order by s.myprop"); + assertEquals(asList("first", "second", "third", "fourth"), transform(paths, new Function() { + @Nullable + @Override + public String apply(@Nullable String input) { + return substringAfterLast(input, "/"); + } + })); + } + + @Test + public void testChangesVisibility() throws Exception { + Tree localTest = root.getTree("/").addChild("test-local"); + localTest.addChild("z").setProperty("myprop", "xyz", Type.STRING); + root.commit(); + + waitUntilTraverseIndexed("The initial node hasn't been indexed", 1); + waitUntilHybridIndexed("The initial node hasn't been indexed", 1, 1, 1); + + localTest = root.getTree("/").addChild("test2"); + for (int i = 0; i < 500; i++) { + String query = "select [jcr:path] from [nt:base] as s where s.[myprop] = 'xyz" + i + "'"; + + localTest.addChild("c-" + i).setProperty("myprop", "xyz" + i, Type.STRING); + root.commit(); + root.refresh(); + List result = executeQuery(query, "JCR-SQL2"); + if (!singletonList("/test2/c-" + i).equals(result)) { + fail("The changes are not visible immediately"); + } + } + } + + + private List waitUntilTraverseIndexed(String failMessage, int howManyResults) throws InterruptedException { + return waitUntilIndexed(failMessage, howManyResults, "traverse", -1, -1, null); + } + + private List waitUntilAsyncIndexed(String failMessage, int howManyResults) throws InterruptedException { + return waitUntilIndexed(failMessage, howManyResults, "lucene:test-index-async", -1, -1, null); + } + + private List waitUntilHybridIndexed(String failMessage, int howManyResults, int asyncCount, int memoryCount) throws InterruptedException { + return waitUntilIndexed(failMessage, howManyResults, "lucene:hybrid:", asyncCount, memoryCount, null); + } + + private List waitUntilIndexed(String failMessage, int howManyResults, String indexType, int asyncCount, int memoryCount, String order) throws InterruptedException { + StringBuilder builder = new StringBuilder("select [jcr:path] from [nt:base] as s where s.[myprop] IS NOT NULL"); + if (order != null) { + builder.append(" ").append(order); + } + String query = builder.toString(); + + long start = System.currentTimeMillis(); + + String description = null; + List paths = Collections.emptyList(); + + while (System.currentTimeMillis() - start < ASYNC_LUCENE_DELAY * 1000 + 500) { + root.refresh(); + + description = explain(query); + paths = executeQuery(query, "JCR-SQL2"); + + boolean valid = true; + valid &= paths.size() == howManyResults; + valid &= description.contains(indexType); + if (asyncCount > -1 && memoryCount > -1) { + valid &= description.contains(String.format("):%d,%d]", asyncCount, memoryCount)); + } + + if (valid) { + log.info("Valid results: {}", paths); + log.info("Valid explanation: {}", description); + return paths; + } + + Thread.sleep(100); + } + + log.info(description); + log.info(paths.toString()); + + fail(failMessage); + return paths; + } + + private String explain(String query) { + String explain = "explain " + query; + return executeQuery(explain, "JCR-SQL2").get(0); + } + + private static NodeBuilder createIndex(NodeBuilder root, String name, Set includes, String type, String async) { + NodeBuilder defParent = root.getChildNode(INDEX_DEFINITIONS_NAME); + NodeBuilder def = defParent.child(name); + def.setProperty(JCR_PRIMARYTYPE, INDEX_DEFINITIONS_NODE_TYPE, NAME); + def.setProperty(PROP_NAME, name); + def.setProperty(TYPE_PROPERTY_NAME, type); + def.setProperty(REINDEX_PROPERTY_NAME, true); + def.setProperty(FULL_TEXT_ENABLED, false); + def.setProperty(createProperty(INCLUDE_PROPERTY_NAMES, includes, STRINGS)); + def.setProperty(INDEX_PATH, String.format("/%s/%s", INDEX_DEFINITIONS_NAME, name)); + if (async != null) { + def.setProperty(ASYNC_PROPERTY_NAME, async); + } + NodeBuilder updatedDef = IndexDefinition.updateDefinition(def.getNodeState().builder()); + defParent.setChildNode(name, updatedDef.getNodeState()); + return defParent.getChildNode(name); + } +} \ No newline at end of file diff --git a/oak-lucene/src/test/java/org/apache/jackrabbit/oak/plugins/index/lucene/LuceneIndexProviderServiceTest.java b/oak-lucene/src/test/java/org/apache/jackrabbit/oak/plugins/index/lucene/LuceneIndexProviderServiceTest.java index 72edb58..8ae6685 100644 --- a/oak-lucene/src/test/java/org/apache/jackrabbit/oak/plugins/index/lucene/LuceneIndexProviderServiceTest.java +++ b/oak-lucene/src/test/java/org/apache/jackrabbit/oak/plugins/index/lucene/LuceneIndexProviderServiceTest.java @@ -81,7 +81,9 @@ public class LuceneIndexProviderServiceTest { assertNotNull("CopyOnRead should be enabled by default", context.getService(CopyOnReadStatsMBean.class)); assertNotNull(context.getService(CacheStatsMBean.class)); - assertTrue(context.getService(Observer.class) instanceof BackgroundObserver); + Observer[] observers = context.getServices(Observer.class, null); + assertTrue(observers[0] instanceof MonitoringBackgroundObserver); + assertTrue(observers[1] instanceof BackgroundObserver); assertEquals(InfoStream.NO_OUTPUT, InfoStream.getDefault()); assertEquals(1024, BooleanQuery.getMaxClauseCount()); @@ -95,7 +97,9 @@ public class LuceneIndexProviderServiceTest { config.put("enableOpenIndexAsync", false); MockOsgi.activate(service, context.bundleContext(), config); - assertTrue(context.getService(Observer.class) instanceof LuceneIndexProvider); + Observer[] observers = context.getServices(Observer.class, null); + assertTrue(observers[0] instanceof MonitoringBackgroundObserver); + assertTrue(observers[1] instanceof LuceneIndexProvider); MockOsgi.deactivate(service); } diff --git a/oak-lucene/src/test/java/org/apache/jackrabbit/oak/plugins/index/lucene/MonitoringBackgroundObserverTest.java b/oak-lucene/src/test/java/org/apache/jackrabbit/oak/plugins/index/lucene/MonitoringBackgroundObserverTest.java new file mode 100644 index 0000000..7a27dcb --- /dev/null +++ b/oak-lucene/src/test/java/org/apache/jackrabbit/oak/plugins/index/lucene/MonitoringBackgroundObserverTest.java @@ -0,0 +1,124 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.jackrabbit.oak.plugins.index.lucene; + +import org.apache.jackrabbit.oak.spi.commit.CommitInfo; +import org.apache.jackrabbit.oak.spi.commit.Observer; +import org.apache.jackrabbit.oak.spi.state.NodeState; +import org.junit.Before; +import org.junit.Test; + +import javax.annotation.Nonnull; +import javax.annotation.Nullable; +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.Executors; +import java.util.concurrent.atomic.AtomicBoolean; + +import static java.util.Arrays.asList; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; + +public class MonitoringBackgroundObserverTest { + + private MonitoringBackgroundObserver mbo; + + @Before + public void createObserver() { + mbo = new MonitoringBackgroundObserver(Executors.newSingleThreadExecutor()); + } + + @Test(timeout = 10) + public void testNoProcessing() throws InterruptedException { + mbo.waitUntilProcessingIsFinished(); + } + + @Test + public void testWaitForFinish() throws InterruptedException { + final AtomicBoolean observerProcessed = new AtomicBoolean(); + mbo.addObserver(new Observer() { + @Override + public void contentChanged(@Nonnull NodeState root, @Nullable CommitInfo info) { + sleep(50); + observerProcessed.set(true); + } + }); + mbo.contentChanged(null, null); + + mbo.waitUntilProcessingIsFinished(); + assertTrue(observerProcessed.get()); + } + + @Test + public void testProcessOnlyLatest() throws InterruptedException { + final List processed = new ArrayList(); + mbo.addObserver(new Observer() { + @Override + public void contentChanged(@Nonnull NodeState root, @Nullable CommitInfo info) { + sleep(50); + processed.add(info.getSessionId()); + } + }); + mbo.contentChanged(null, new CommitInfo("c1", "")); + mbo.contentChanged(null, new CommitInfo("c2", "")); // this should be ignored + mbo.contentChanged(null, new CommitInfo("c3", "")); // this will be processed + mbo.waitUntilProcessingIsFinished(); + + assertEquals(asList("c1", "c3"), processed); + } + + @Test + public void testXyz() throws InterruptedException { + final List processed = new ArrayList(); + mbo.addObserver(new Observer() { + @Override + public void contentChanged(@Nonnull NodeState root, @Nullable CommitInfo info) { + sleep(100); + processed.add(info.getSessionId()); + } + }); + mbo.contentChanged(null, new CommitInfo("c1", "")); + mbo.contentChanged(null, new CommitInfo("c2", "")); // this should be ignored + + // add a new content after 50 millis, while we'll be waiting + new Thread(new Runnable() { + @Override + public void run() { + sleep(50); + mbo.contentChanged(null, new CommitInfo("c3", "")); // this will be ignored + mbo.contentChanged(null, new CommitInfo("c4", "")); // this will be processed + } + }).start(); + + // wait for the c2 processing to be finished - in the meantime, the c3 and c4 + // will be added + mbo.waitUntilProcessingIsFinished(); + + assertEquals(asList("c1", "c4"), processed); + } + + private static void sleep(long timeout) { + try { + Thread.sleep(timeout); + } catch (InterruptedException e) { + throw new RuntimeException(e); + } + } + +}