diff --git a/oak-benchmarks-elastic/src/main/java/org/apache/jackrabbit/oak/benchmark/ElasticFullTextNotGlobalSearchTest.java b/oak-benchmarks-elastic/src/main/java/org/apache/jackrabbit/oak/benchmark/ElasticFullTextNotGlobalSearchTest.java index b095c4bb6b..6e4ac4f6f5 100644 --- a/oak-benchmarks-elastic/src/main/java/org/apache/jackrabbit/oak/benchmark/ElasticFullTextNotGlobalSearchTest.java +++ b/oak-benchmarks-elastic/src/main/java/org/apache/jackrabbit/oak/benchmark/ElasticFullTextNotGlobalSearchTest.java @@ -26,7 +26,7 @@ import org.apache.jackrabbit.oak.fixture.OakRepositoryFixture; import org.apache.jackrabbit.oak.fixture.RepositoryFixture; import org.apache.jackrabbit.oak.jcr.Jcr; import org.apache.jackrabbit.oak.plugins.index.elasticsearch.ElasticsearchConnection; -import org.apache.jackrabbit.oak.plugins.index.elasticsearch.ElasticsearchIndexConstants; +import org.apache.jackrabbit.oak.plugins.index.elasticsearch.ElasticsearchIndexDefinition; import org.apache.jackrabbit.oak.plugins.index.elasticsearch.index.ElasticsearchIndexEditorProvider; import org.apache.jackrabbit.oak.plugins.index.elasticsearch.query.ElasticsearchIndexProvider; import org.apache.jackrabbit.oak.plugins.index.nodetype.NodeTypeIndexProvider; @@ -61,7 +61,7 @@ public class ElasticFullTextNotGlobalSearchTest extends SearchTest { .with(new PropertyIndexEditorProvider()) .with(new NodeTypeIndexProvider()) .with(new PropertyFullTextTest.FullTextPropertyInitialiser("elasticText", of("text"), - ElasticsearchIndexConstants.TYPE_ELASTICSEARCH).nodeScope().analyzed()); + ElasticsearchIndexDefinition.TYPE_ELASTICSEARCH).nodeScope().analyzed()); return new Jcr(oak); } }); diff --git a/oak-benchmarks-elastic/src/main/java/org/apache/jackrabbit/oak/benchmark/ElasticPropertyFullTextSeparated.java b/oak-benchmarks-elastic/src/main/java/org/apache/jackrabbit/oak/benchmark/ElasticPropertyFullTextSeparated.java index ce804acd3f..aa0f4e2ad6 100644 --- 
a/oak-benchmarks-elastic/src/main/java/org/apache/jackrabbit/oak/benchmark/ElasticPropertyFullTextSeparated.java +++ b/oak-benchmarks-elastic/src/main/java/org/apache/jackrabbit/oak/benchmark/ElasticPropertyFullTextSeparated.java @@ -27,12 +27,10 @@ import org.apache.jackrabbit.oak.fixture.OakRepositoryFixture; import org.apache.jackrabbit.oak.fixture.RepositoryFixture; import org.apache.jackrabbit.oak.jcr.Jcr; import org.apache.jackrabbit.oak.plugins.index.elasticsearch.ElasticsearchConnection; -import org.apache.jackrabbit.oak.plugins.index.elasticsearch.ElasticsearchIndexConstants; +import org.apache.jackrabbit.oak.plugins.index.elasticsearch.ElasticsearchIndexDefinition; import org.apache.jackrabbit.oak.plugins.index.elasticsearch.index.ElasticsearchIndexEditorProvider; import org.apache.jackrabbit.oak.plugins.index.elasticsearch.query.ElasticsearchIndexProvider; import org.apache.jackrabbit.oak.plugins.index.search.ExtractedTextCache; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; import javax.jcr.Repository; import java.io.File; @@ -82,7 +80,7 @@ public class ElasticPropertyFullTextSeparated extends PropertyFullTextTest { .with((new ElasticGlobalInitializer(ELASTIC_GLOBAL_INDEX, storageEnabled)).async("fulltext-async")) // the WikipediaImporter set a property `title` .with(new FullTextPropertyInitialiser("elasticTitle", of("title"), - ElasticsearchIndexConstants.TYPE_ELASTICSEARCH).async()) + ElasticsearchIndexDefinition.TYPE_ELASTICSEARCH).async()) .withAsyncIndexing("async", 5) .withAsyncIndexing("fulltext-async", 5); return new Jcr(oak); diff --git a/oak-benchmarks-elastic/src/main/java/org/apache/jackrabbit/oak/benchmark/ElasticPropertyFullTextTest.java b/oak-benchmarks-elastic/src/main/java/org/apache/jackrabbit/oak/benchmark/ElasticPropertyFullTextTest.java index a9e2645e97..8c84fc6094 100644 --- a/oak-benchmarks-elastic/src/main/java/org/apache/jackrabbit/oak/benchmark/ElasticPropertyFullTextTest.java +++ 
b/oak-benchmarks-elastic/src/main/java/org/apache/jackrabbit/oak/benchmark/ElasticPropertyFullTextTest.java @@ -26,7 +26,7 @@ import org.apache.jackrabbit.oak.fixture.OakRepositoryFixture; import org.apache.jackrabbit.oak.fixture.RepositoryFixture; import org.apache.jackrabbit.oak.jcr.Jcr; import org.apache.jackrabbit.oak.plugins.index.elasticsearch.ElasticsearchConnection; -import org.apache.jackrabbit.oak.plugins.index.elasticsearch.ElasticsearchIndexConstants; +import org.apache.jackrabbit.oak.plugins.index.elasticsearch.ElasticsearchIndexDefinition; import org.apache.jackrabbit.oak.plugins.index.elasticsearch.index.ElasticsearchIndexEditorProvider; import org.apache.jackrabbit.oak.plugins.index.elasticsearch.query.ElasticsearchIndexProvider; import org.apache.jackrabbit.oak.plugins.index.search.ExtractedTextCache; @@ -106,7 +106,7 @@ public class ElasticPropertyFullTextTest extends PropertyFullTextTest { .with((new ElasticGlobalInitializer(ELASTIC_GLOBAL_INDEX, storageEnabled)).async()) // the WikipediaImporter set a property `title` .with(new FullTextPropertyInitialiser("elasticTitle", of("title"), - ElasticsearchIndexConstants.TYPE_ELASTICSEARCH).async()) + ElasticsearchIndexDefinition.TYPE_ELASTICSEARCH).async()) .withAsyncIndexing("async", 5); return new Jcr(oak); } diff --git a/oak-benchmarks-elastic/src/main/java/org/apache/jackrabbit/oak/benchmark/ElasticPropertyTextSearchTest.java b/oak-benchmarks-elastic/src/main/java/org/apache/jackrabbit/oak/benchmark/ElasticPropertyTextSearchTest.java index 5512ba055d..f466b85ecb 100644 --- a/oak-benchmarks-elastic/src/main/java/org/apache/jackrabbit/oak/benchmark/ElasticPropertyTextSearchTest.java +++ b/oak-benchmarks-elastic/src/main/java/org/apache/jackrabbit/oak/benchmark/ElasticPropertyTextSearchTest.java @@ -26,7 +26,7 @@ import org.apache.jackrabbit.oak.fixture.OakRepositoryFixture; import org.apache.jackrabbit.oak.fixture.RepositoryFixture; import org.apache.jackrabbit.oak.jcr.Jcr; import 
org.apache.jackrabbit.oak.plugins.index.elasticsearch.ElasticsearchConnection; -import org.apache.jackrabbit.oak.plugins.index.elasticsearch.ElasticsearchIndexConstants; +import org.apache.jackrabbit.oak.plugins.index.elasticsearch.ElasticsearchIndexDefinition; import org.apache.jackrabbit.oak.plugins.index.elasticsearch.index.ElasticsearchIndexEditorProvider; import org.apache.jackrabbit.oak.plugins.index.elasticsearch.query.ElasticsearchIndexProvider; import org.apache.jackrabbit.oak.plugins.index.nodetype.NodeTypeIndexProvider; @@ -77,7 +77,7 @@ public class ElasticPropertyTextSearchTest extends SearchTest { .with(new PropertyIndexEditorProvider()) .with(new NodeTypeIndexProvider()) .with(new PropertyFullTextTest.FullTextPropertyInitialiser("elasticText", of("text"), - ElasticsearchIndexConstants.TYPE_ELASTICSEARCH)); + ElasticsearchIndexDefinition.TYPE_ELASTICSEARCH)); return new Jcr(oak); } }); diff --git a/oak-benchmarks-elastic/src/main/java/org/apache/jackrabbit/oak/benchmark/util/ElasticGlobalInitializer.java b/oak-benchmarks-elastic/src/main/java/org/apache/jackrabbit/oak/benchmark/util/ElasticGlobalInitializer.java index 2924cf0cb3..499fdf5db7 100644 --- a/oak-benchmarks-elastic/src/main/java/org/apache/jackrabbit/oak/benchmark/util/ElasticGlobalInitializer.java +++ b/oak-benchmarks-elastic/src/main/java/org/apache/jackrabbit/oak/benchmark/util/ElasticGlobalInitializer.java @@ -20,7 +20,7 @@ package org.apache.jackrabbit.oak.benchmark.util; import org.apache.jackrabbit.oak.plugins.index.IndexUtils; -import org.apache.jackrabbit.oak.plugins.index.elasticsearch.ElasticsearchIndexConstants; +import org.apache.jackrabbit.oak.plugins.index.elasticsearch.ElasticsearchIndexDefinition; import org.apache.jackrabbit.oak.plugins.index.search.util.IndexHelper; import org.apache.jackrabbit.oak.spi.lifecycle.RepositoryInitializer; import org.apache.jackrabbit.oak.spi.state.NodeBuilder; @@ -105,7 +105,7 @@ public class ElasticGlobalInitializer implements 
RepositoryInitializer { // do nothing } else { IndexHelper.newFTIndexDefinition(IndexUtils.getOrCreateOakIndex(builder), - this.name, ElasticsearchIndexConstants.TYPE_ELASTICSEARCH, + this.name, ElasticsearchIndexDefinition.TYPE_ELASTICSEARCH, propertyTypes, excludes, async, storageEnabled); } } diff --git a/oak-search-elastic/pom.xml b/oak-search-elastic/pom.xml index 87dec1ad61..b09c0a17cd 100644 --- a/oak-search-elastic/pom.xml +++ b/oak-search-elastic/pom.xml @@ -187,11 +187,21 @@ + + ch.qos.logback + logback-classic + test + junit junit test + + org.mockito + mockito-core + test + org.apache.jackrabbit oak-core diff --git a/oak-search-elastic/src/main/java/org/apache/jackrabbit/oak/plugins/index/elasticsearch/ElasticsearchIndexConstants.java b/oak-search-elastic/src/main/java/org/apache/jackrabbit/oak/plugins/index/elasticsearch/ElasticsearchIndexConstants.java deleted file mode 100644 index 8e0c5aa3d0..0000000000 --- a/oak-search-elastic/src/main/java/org/apache/jackrabbit/oak/plugins/index/elasticsearch/ElasticsearchIndexConstants.java +++ /dev/null @@ -1,21 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package org.apache.jackrabbit.oak.plugins.index.elasticsearch; - -public class ElasticsearchIndexConstants { - public static final String TYPE_ELASTICSEARCH = "elasticsearch"; -} diff --git a/oak-search-elastic/src/main/java/org/apache/jackrabbit/oak/plugins/index/elasticsearch/ElasticsearchIndexDefinition.java b/oak-search-elastic/src/main/java/org/apache/jackrabbit/oak/plugins/index/elasticsearch/ElasticsearchIndexDefinition.java index 1eeade244f..525d058abc 100644 --- a/oak-search-elastic/src/main/java/org/apache/jackrabbit/oak/plugins/index/elasticsearch/ElasticsearchIndexDefinition.java +++ b/oak-search-elastic/src/main/java/org/apache/jackrabbit/oak/plugins/index/elasticsearch/ElasticsearchIndexDefinition.java @@ -18,11 +18,126 @@ */ package org.apache.jackrabbit.oak.plugins.index.elasticsearch; +import org.apache.jackrabbit.oak.commons.PathUtils; import org.apache.jackrabbit.oak.plugins.index.search.IndexDefinition; import org.apache.jackrabbit.oak.spi.state.NodeState; +import java.util.regex.Pattern; +import java.util.stream.Collectors; +import java.util.stream.StreamSupport; + +import static org.apache.jackrabbit.oak.plugins.index.search.util.ConfigUtil.getOptionalValue; +import static org.elasticsearch.common.Strings.INVALID_FILENAME_CHARS; + public class ElasticsearchIndexDefinition extends IndexDefinition { + + public static final String TYPE_ELASTICSEARCH = "elasticsearch"; + + public static final String BULK_ACTIONS = "bulkActions"; + public static final int BULK_ACTIONS_DEFAULT = 250; + + public static final String BULK_SIZE_BYTES = "bulkSizeBytes"; + public static final long BULK_SIZE_BYTES_DEFAULT = 2 * 1024 * 1024; // 2MB + + public static final String BULK_FLUSH_INTERVAL_MS = "bulkFlushIntervalMs"; + public static final long BULK_FLUSH_INTERVAL_MS_DEFAULT = 3000; + + public static final String BULK_RETRIES = "bulkRetries"; + public static final int BULK_RETRIES_DEFAULT = 3; + + public static final String BULK_RETRIES_BACKOFF = 
"bulkRetriesBackoff"; + public static final long BULK_RETRIES_BACKOFF_DEFAULT = 200; + + private static final int MAX_NAME_LENGTH = 255; + + private static final String INVALID_CHARS_REGEX = Pattern.quote(INVALID_FILENAME_CHARS + .stream() + .map(Object::toString) + .collect(Collectors.joining(""))); + + private final String remoteIndexName; + + public final int bulkActions; + public final long bulkSizeBytes; + public final long bulkFlushIntervalMs; + public final int bulkRetries; + public final long bulkRetriesBackoff; + public ElasticsearchIndexDefinition(NodeState root, NodeState defn, String indexPath) { super(root, getIndexDefinitionState(defn), determineIndexFormatVersion(defn), determineUniqueId(defn), indexPath); + this.remoteIndexName = setupIndexName(); + + this.bulkActions = getOptionalValue(defn, BULK_ACTIONS, BULK_ACTIONS_DEFAULT); + this.bulkSizeBytes = getOptionalValue(defn, BULK_SIZE_BYTES, BULK_SIZE_BYTES_DEFAULT); + this.bulkFlushIntervalMs = getOptionalValue(defn, BULK_FLUSH_INTERVAL_MS, BULK_FLUSH_INTERVAL_MS_DEFAULT); + this.bulkRetries = getOptionalValue(defn, BULK_RETRIES, BULK_RETRIES_DEFAULT); + this.bulkRetriesBackoff = getOptionalValue(defn, BULK_RETRIES_BACKOFF, BULK_RETRIES_BACKOFF_DEFAULT); + } + + /** + * Returns the index identifier on the Elasticsearch cluster. Notice this can be different from the value returned + * from {@code getIndexName}. + * @return the Elasticsearch index identifier + */ + public String getRemoteIndexName() { + return remoteIndexName; + } + + private String setupIndexName() { + // TODO: implement advanced remote index name strategy that takes into account multiple tenants and re-index process + return getESSafeIndexName(getIndexPath() + "-" + getReindexCount()); + } + + /** + * + *

+ * The resulting index name would be truncated to MAX_NAME_LENGTH + */ + private static String getESSafeIndexName(String indexPath) { + String name = StreamSupport + .stream(PathUtils.elements(indexPath).spliterator(), false) + .limit(3) //Max 3 nodeNames including oak:index which is the immediate parent for any indexPath + .filter(p -> !"oak:index".equals(p)) + .map(ElasticsearchIndexDefinition::getESSafeName) + .collect(Collectors.joining("_")); + + if (name.length() > MAX_NAME_LENGTH) { + name = name.substring(0, MAX_NAME_LENGTH); + } + return name; + } + + /** + * Convert {@code suggestedIndexName} to an Elasticsearch safe index name: every character listed in {@code INVALID_FILENAME_CHARS} is removed and the result is lower-cased with {@code Locale.ROOT} so it does not depend on the JVM default locale. {@code INVALID_CHARS_REGEX} is a {@code Pattern.quote}d literal of all invalid characters joined together, so it is wrapped in a character class here to match each invalid character individually instead of only the whole sequence. + * Ref: https://www.elastic.co/guide/en/elasticsearch/reference/current/indices-create-index.html + */ + private static String getESSafeName(String suggestedIndexName) { + return suggestedIndexName.replaceAll("[" + INVALID_CHARS_REGEX + "]", "").toLowerCase(java.util.Locale.ROOT); + } + + /** + * Class to help with {@link ElasticsearchIndexDefinition} creation. + * The built object represents the index definition only without the node structure. 
+ */ + public static class Builder extends IndexDefinition.Builder { + @Override + public ElasticsearchIndexDefinition build() { + return (ElasticsearchIndexDefinition) super.build(); + } + + @Override + public ElasticsearchIndexDefinition.Builder reindex() { + super.reindex(); + return this; + } + + @Override + protected IndexDefinition createInstance(NodeState indexDefnStateToUse) { + return new ElasticsearchIndexDefinition(root, indexDefnStateToUse, indexPath); + } } } diff --git a/oak-search-elastic/src/main/java/org/apache/jackrabbit/oak/plugins/index/elasticsearch/ElasticsearchIndexDescriptor.java b/oak-search-elastic/src/main/java/org/apache/jackrabbit/oak/plugins/index/elasticsearch/ElasticsearchIndexDescriptor.java deleted file mode 100644 index cac4015d62..0000000000 --- a/oak-search-elastic/src/main/java/org/apache/jackrabbit/oak/plugins/index/elasticsearch/ElasticsearchIndexDescriptor.java +++ /dev/null @@ -1,111 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package org.apache.jackrabbit.oak.plugins.index.elasticsearch; - -import org.apache.jackrabbit.oak.commons.PathUtils; -import org.apache.jackrabbit.oak.plugins.index.search.IndexDefinition; -import org.elasticsearch.client.RestHighLevelClient; -import org.jetbrains.annotations.NotNull; - -import java.util.Objects; -import java.util.regex.Pattern; -import java.util.stream.Collectors; -import java.util.stream.StreamSupport; - -import static org.elasticsearch.common.Strings.INVALID_FILENAME_CHARS; - -public class ElasticsearchIndexDescriptor { - - private static final int MAX_NAME_LENGTH = 255; - - private static final String INVALID_CHARS_REGEX = Pattern.quote(INVALID_FILENAME_CHARS - .stream() - .map(Object::toString) - .collect(Collectors.joining(""))); - - private final ElasticsearchConnection connection; - private final String indexName; - - public ElasticsearchIndexDescriptor(@NotNull ElasticsearchConnection connection, IndexDefinition indexDefinition) { - this.connection = connection; - this.indexName = getRemoteIndexName(indexDefinition); - } - - public RestHighLevelClient getClient() { - return connection.getClient(); - } - - public String getIndexName() { - return indexName; - } - - @Override - public int hashCode() { - return Objects.hash(connection, indexName); - } - - @Override - public boolean equals(Object o) { - if (this == o) return true; - if (o == null || getClass() != o.getClass()) return false; - ElasticsearchIndexDescriptor that = (ElasticsearchIndexDescriptor) o; - return connection.equals(that.connection) && - indexName.equals(that.indexName); - } - - private String getRemoteIndexName(IndexDefinition definition) { - String suffix = definition.getUniqueId(); - - if (suffix == null) { - suffix = String.valueOf(definition.getReindexCount()); - } - - return getESSafeIndexName(definition.getIndexPath() + "-" + suffix); - } - - /** - *

- *

- * The resulting file name would be truncated to MAX_NAME_LENGTH - */ - private static String getESSafeIndexName(String indexPath) { - String name = StreamSupport - .stream(PathUtils.elements(indexPath).spliterator(), false) - .limit(3) //Max 3 nodeNames including oak:index which is the immediate parent for any indexPath - .filter(p -> !"oak:index".equals(p)) - .map(ElasticsearchIndexDescriptor::getESSafeName) - .collect(Collectors.joining("_")); - - if (name.length() > MAX_NAME_LENGTH) { - name = name.substring(0, MAX_NAME_LENGTH); - } - return name; - } - - /** - * Convert {@code e} to Elasticsearch safe index name. - * Ref: https://www.elastic.co/guide/en/elasticsearch/reference/current/indices-create-index.html - */ - private static String getESSafeName(String suggestedIndexName) { - return suggestedIndexName.replaceAll(INVALID_CHARS_REGEX, "").toLowerCase(); - } - -} diff --git a/oak-search-elastic/src/main/java/org/apache/jackrabbit/oak/plugins/index/elasticsearch/ElasticsearchIndexProviderService.java b/oak-search-elastic/src/main/java/org/apache/jackrabbit/oak/plugins/index/elasticsearch/ElasticsearchIndexProviderService.java index 7d3789fca7..1870699b67 100644 --- a/oak-search-elastic/src/main/java/org/apache/jackrabbit/oak/plugins/index/elasticsearch/ElasticsearchIndexProviderService.java +++ b/oak-search-elastic/src/main/java/org/apache/jackrabbit/oak/plugins/index/elasticsearch/ElasticsearchIndexProviderService.java @@ -166,7 +166,7 @@ public class ElasticsearchIndexProviderService { ElasticsearchIndexProvider indexProvider = new ElasticsearchIndexProvider(elasticsearchConnection); Dictionary props = new Hashtable<>(); - props.put("type", ElasticsearchIndexConstants.TYPE_ELASTICSEARCH); + props.put("type", ElasticsearchIndexDefinition.TYPE_ELASTICSEARCH); regs.add(bundleContext.registerService(QueryIndexProvider.class.getName(), indexProvider, props)); } @@ -174,7 +174,7 @@ public class ElasticsearchIndexProviderService { 
ElasticsearchIndexEditorProvider editorProvider = new ElasticsearchIndexEditorProvider(elasticsearchConnection, extractedTextCache); Dictionary props = new Hashtable<>(); - props.put("type", ElasticsearchIndexConstants.TYPE_ELASTICSEARCH); + props.put("type", ElasticsearchIndexDefinition.TYPE_ELASTICSEARCH); regs.add(bundleContext.registerService(IndexEditorProvider.class.getName(), editorProvider, props)); // oakRegs.add(registerMBean(whiteboard, // TextExtractionStatsMBean.class, diff --git a/oak-search-elastic/src/main/java/org/apache/jackrabbit/oak/plugins/index/elasticsearch/index/ElasticsearchDocument.java b/oak-search-elastic/src/main/java/org/apache/jackrabbit/oak/plugins/index/elasticsearch/index/ElasticsearchDocument.java index 524d19911b..939aaabd7a 100644 --- a/oak-search-elastic/src/main/java/org/apache/jackrabbit/oak/plugins/index/elasticsearch/index/ElasticsearchDocument.java +++ b/oak-search-elastic/src/main/java/org/apache/jackrabbit/oak/plugins/index/elasticsearch/index/ElasticsearchDocument.java @@ -32,7 +32,7 @@ import java.util.HashMap; import java.util.List; import java.util.Map; -public class ElasticsearchDocument { +class ElasticsearchDocument { private static final Logger LOG = LoggerFactory.getLogger(ElasticsearchDocument.class); // id should only be useful for logging (at least as of now) diff --git a/oak-search-elastic/src/main/java/org/apache/jackrabbit/oak/plugins/index/elasticsearch/index/ElasticsearchDocumentMaker.java b/oak-search-elastic/src/main/java/org/apache/jackrabbit/oak/plugins/index/elasticsearch/index/ElasticsearchDocumentMaker.java index b1b14406a6..6210a37d74 100644 --- a/oak-search-elastic/src/main/java/org/apache/jackrabbit/oak/plugins/index/elasticsearch/index/ElasticsearchDocumentMaker.java +++ b/oak-search-elastic/src/main/java/org/apache/jackrabbit/oak/plugins/index/elasticsearch/index/ElasticsearchDocumentMaker.java @@ -34,7 +34,7 @@ import org.jetbrains.annotations.Nullable; import java.io.IOException; import 
java.util.List; -public class ElasticsearchDocumentMaker extends FulltextDocumentMaker { +class ElasticsearchDocumentMaker extends FulltextDocumentMaker { ElasticsearchDocumentMaker(@Nullable FulltextBinaryTextExtractor textExtractor, @NotNull IndexDefinition definition, diff --git a/oak-search-elastic/src/main/java/org/apache/jackrabbit/oak/plugins/index/elasticsearch/index/ElasticsearchIndexEditor.java b/oak-search-elastic/src/main/java/org/apache/jackrabbit/oak/plugins/index/elasticsearch/index/ElasticsearchIndexEditor.java index 9ff586e787..fd5ca91849 100644 --- a/oak-search-elastic/src/main/java/org/apache/jackrabbit/oak/plugins/index/elasticsearch/index/ElasticsearchIndexEditor.java +++ b/oak-search-elastic/src/main/java/org/apache/jackrabbit/oak/plugins/index/elasticsearch/index/ElasticsearchIndexEditor.java @@ -25,7 +25,7 @@ import org.apache.jackrabbit.oak.plugins.index.search.spi.editor.FulltextIndexEd * {@link IndexEditor} implementation that is responsible for keeping the * corresponding Elasticsearch index up to date */ -public class ElasticsearchIndexEditor extends FulltextIndexEditor { +class ElasticsearchIndexEditor extends FulltextIndexEditor { ElasticsearchIndexEditor(FulltextIndexEditorContext context) throws CommitFailedException { super(context); } diff --git a/oak-search-elastic/src/main/java/org/apache/jackrabbit/oak/plugins/index/elasticsearch/index/ElasticsearchIndexEditorContext.java b/oak-search-elastic/src/main/java/org/apache/jackrabbit/oak/plugins/index/elasticsearch/index/ElasticsearchIndexEditorContext.java index b3a8a2bb4a..b215301510 100644 --- a/oak-search-elastic/src/main/java/org/apache/jackrabbit/oak/plugins/index/elasticsearch/index/ElasticsearchIndexEditorContext.java +++ b/oak-search-elastic/src/main/java/org/apache/jackrabbit/oak/plugins/index/elasticsearch/index/ElasticsearchIndexEditorContext.java @@ -18,6 +18,7 @@ package org.apache.jackrabbit.oak.plugins.index.elasticsearch.index; import 
org.apache.jackrabbit.oak.plugins.index.IndexUpdateCallback; import org.apache.jackrabbit.oak.plugins.index.IndexingContext; +import org.apache.jackrabbit.oak.plugins.index.elasticsearch.ElasticsearchIndexDefinition; import org.apache.jackrabbit.oak.plugins.index.search.ExtractedTextCache; import org.apache.jackrabbit.oak.plugins.index.search.IndexDefinition; import org.apache.jackrabbit.oak.plugins.index.search.spi.editor.DocumentMaker; @@ -28,7 +29,8 @@ import org.jetbrains.annotations.Nullable; import java.io.IOException; -public class ElasticsearchIndexEditorContext extends FulltextIndexEditorContext { +class ElasticsearchIndexEditorContext extends FulltextIndexEditorContext { + ElasticsearchIndexEditorContext(NodeState root, NodeBuilder definition, @Nullable IndexDefinition indexDefinition, IndexUpdateCallback updateCallback, @@ -41,7 +43,7 @@ public class ElasticsearchIndexEditorContext extends FulltextIndexEditorContext< @Override public IndexDefinition.Builder newDefinitionBuilder() { - return new IndexDefinition.Builder(); + return new ElasticsearchIndexDefinition.Builder(); } @Override diff --git a/oak-search-elastic/src/main/java/org/apache/jackrabbit/oak/plugins/index/elasticsearch/index/ElasticsearchIndexEditorProvider.java b/oak-search-elastic/src/main/java/org/apache/jackrabbit/oak/plugins/index/elasticsearch/index/ElasticsearchIndexEditorProvider.java index 4a55b83662..777722ead6 100644 --- a/oak-search-elastic/src/main/java/org/apache/jackrabbit/oak/plugins/index/elasticsearch/index/ElasticsearchIndexEditorProvider.java +++ b/oak-search-elastic/src/main/java/org/apache/jackrabbit/oak/plugins/index/elasticsearch/index/ElasticsearchIndexEditorProvider.java @@ -30,7 +30,7 @@ import org.apache.jackrabbit.oak.spi.state.NodeState; import org.jetbrains.annotations.NotNull; import org.jetbrains.annotations.Nullable; -import static org.apache.jackrabbit.oak.plugins.index.elasticsearch.ElasticsearchIndexConstants.TYPE_ELASTICSEARCH; +import static 
org.apache.jackrabbit.oak.plugins.index.elasticsearch.ElasticsearchIndexDefinition.TYPE_ELASTICSEARCH; public class ElasticsearchIndexEditorProvider implements IndexEditorProvider { diff --git a/oak-search-elastic/src/main/java/org/apache/jackrabbit/oak/plugins/index/elasticsearch/index/ElasticsearchIndexWriter.java b/oak-search-elastic/src/main/java/org/apache/jackrabbit/oak/plugins/index/elasticsearch/index/ElasticsearchIndexWriter.java index 78b739264a..69312ea0a1 100644 --- a/oak-search-elastic/src/main/java/org/apache/jackrabbit/oak/plugins/index/elasticsearch/index/ElasticsearchIndexWriter.java +++ b/oak-search-elastic/src/main/java/org/apache/jackrabbit/oak/plugins/index/elasticsearch/index/ElasticsearchIndexWriter.java @@ -17,76 +17,89 @@ package org.apache.jackrabbit.oak.plugins.index.elasticsearch.index; import org.apache.jackrabbit.oak.plugins.index.elasticsearch.ElasticsearchConnection; -import org.apache.jackrabbit.oak.plugins.index.elasticsearch.ElasticsearchIndexDescriptor; +import org.apache.jackrabbit.oak.plugins.index.elasticsearch.ElasticsearchIndexDefinition; import org.apache.jackrabbit.oak.plugins.index.search.FieldNames; -import org.apache.jackrabbit.oak.plugins.index.search.IndexDefinition; import org.apache.jackrabbit.oak.plugins.index.search.spi.editor.FulltextIndexWriter; +import org.elasticsearch.action.DocWriteRequest; +import org.elasticsearch.action.bulk.BackoffPolicy; +import org.elasticsearch.action.bulk.BulkItemResponse; +import org.elasticsearch.action.bulk.BulkProcessor; +import org.elasticsearch.action.bulk.BulkRequest; +import org.elasticsearch.action.bulk.BulkResponse; import org.elasticsearch.action.delete.DeleteRequest; -import org.elasticsearch.action.delete.DeleteResponse; import org.elasticsearch.action.index.IndexRequest; -import org.elasticsearch.action.index.IndexResponse; import org.elasticsearch.client.RequestOptions; import org.elasticsearch.client.indices.CreateIndexRequest; import 
org.elasticsearch.client.indices.CreateIndexResponse; import org.elasticsearch.common.Strings; import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.common.unit.ByteSizeValue; +import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.common.xcontent.XContentBuilder; import org.elasticsearch.common.xcontent.XContentFactory; import org.elasticsearch.common.xcontent.XContentType; import org.jetbrains.annotations.NotNull; +import org.jetbrains.annotations.TestOnly; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.io.IOException; +import java.util.stream.Collectors; import static org.apache.jackrabbit.oak.plugins.index.elasticsearch.index.ElasticsearchDocument.pathToId; -import static org.elasticsearch.action.support.WriteRequest.RefreshPolicy.IMMEDIATE; -import static org.elasticsearch.action.support.WriteRequest.RefreshPolicy.NONE; import static org.elasticsearch.common.xcontent.ToXContent.EMPTY_PARAMS; import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder; -public class ElasticsearchIndexWriter implements FulltextIndexWriter { +class ElasticsearchIndexWriter implements FulltextIndexWriter { private static final Logger LOG = LoggerFactory.getLogger(ElasticsearchIndexWriter.class); - private final ElasticsearchIndexDescriptor indexDescriptor; + private final ElasticsearchConnection elasticsearchConnection; + private final ElasticsearchIndexDefinition indexDefinition; - private final boolean isAsync; + private final BulkProcessor bulkProcessor; - // TODO: use bulk API - https://www.elastic.co/guide/en/elasticsearch/client/java-api/current/java-docs-bulk-processor.html - ElasticsearchIndexWriter(@NotNull IndexDefinition indexDefinition, - @NotNull ElasticsearchConnection elasticsearchConnection) { - indexDescriptor = new ElasticsearchIndexDescriptor(elasticsearchConnection, indexDefinition); + ElasticsearchIndexWriter(@NotNull ElasticsearchConnection elasticsearchConnection, + @NotNull 
ElasticsearchIndexDefinition indexDefinition) { + this.elasticsearchConnection = elasticsearchConnection; + this.indexDefinition = indexDefinition; + bulkProcessor = initBulkProcessor(); + } + + @TestOnly + ElasticsearchIndexWriter(@NotNull ElasticsearchConnection elasticsearchConnection, + @NotNull ElasticsearchIndexDefinition indexDefinition, + @NotNull BulkProcessor bulkProcessor) { + this.elasticsearchConnection = elasticsearchConnection; + this.indexDefinition = indexDefinition; + this.bulkProcessor = bulkProcessor; + } - // TODO: ES indexing put another bit delay before docs appear in search. - // For test without "async" indexing, we can use following hack BUT those where we - // would setup async, we'd need to find another way. - isAsync = indexDefinition.getDefinitionNodeState().getProperty("async") != null; + private BulkProcessor initBulkProcessor() { + return BulkProcessor.builder((request, bulkListener) -> + elasticsearchConnection.getClient().bulkAsync(request, RequestOptions.DEFAULT, bulkListener), + new OakBulkProcessorLister()) + .setBulkActions(indexDefinition.bulkActions) + .setBulkSize(new ByteSizeValue(indexDefinition.bulkSizeBytes)) + .setFlushInterval(TimeValue.timeValueMillis(indexDefinition.bulkFlushIntervalMs)) + .setBackoffPolicy(BackoffPolicy.exponentialBackoff( + TimeValue.timeValueMillis(indexDefinition.bulkRetriesBackoff), indexDefinition.bulkRetries) + ) + .build(); } @Override public void updateDocument(String path, ElasticsearchDocument doc) throws IOException { - IndexRequest request = new IndexRequest(indexDescriptor.getIndexName()) + IndexRequest request = new IndexRequest(indexDefinition.getRemoteIndexName()) .id(pathToId(path)) - // immediate refresh would slow indexing response such that next - // search would see the effect of this indexed doc. Must only get - // enabled in tests (hopefully there are no non-async indexes in real life) - .setRefreshPolicy(isAsync ? 
NONE : IMMEDIATE) .source(doc.build(), XContentType.JSON); - IndexResponse response = indexDescriptor.getClient().index(request, RequestOptions.DEFAULT); - LOG.trace("update {} - {}. Response: {}", path, doc, response); + bulkProcessor.add(request); } @Override public void deleteDocuments(String path) throws IOException { - DeleteRequest request = new DeleteRequest(indexDescriptor.getIndexName()) - .id(pathToId(path)) - // immediate refresh would slow indexing response such that next - // search would see the effect of this indexed doc. Must only get - // enabled in tests (hopefully there are no non-async indexes in real life) - .setRefreshPolicy(isAsync ? NONE : IMMEDIATE); - DeleteResponse response = indexDescriptor.getClient().delete(request, RequestOptions.DEFAULT); - LOG.trace("delete {}. Response: {}", path, response); - + DeleteRequest request = new DeleteRequest(indexDefinition.getRemoteIndexName()) + .id(pathToId(path)); + bulkProcessor.add(request); } @Override @@ -99,7 +112,7 @@ public class ElasticsearchIndexWriter implements FulltextIndexWriter {}", executionId, bulkRequest.getDescription()); + if (LOG.isTraceEnabled()) { + LOG.trace("Bulk Requests: \n{}", bulkRequest.requests() + .stream() + .map(DocWriteRequest::toString) + .collect(Collectors.joining("\n")) + ); + } + } - LOG.info("Updated settings {}. 
Response acknowledged: {}", requestMsg, response.isAcknowledged()); + @Override + public void afterBulk(long executionId, BulkRequest bulkRequest, BulkResponse bulkResponse) { + LOG.info("Bulk with id {} processed with status {} in {}", executionId, bulkResponse.status(), bulkResponse.getTook()); + if (LOG.isTraceEnabled()) { + try { + LOG.trace(Strings.toString(bulkResponse.toXContent(jsonBuilder(), EMPTY_PARAMS))); + } catch (IOException e) { + LOG.error("Error decoding bulk response", e); + } + } + if (bulkResponse.hasFailures()) { // check if some operations failed to execute + for (BulkItemResponse bulkItemResponse : bulkResponse) { + if (bulkItemResponse.isFailed()) { + BulkItemResponse.Failure failure = bulkItemResponse.getFailure(); + LOG.error("Bulk item with id {} failed", failure.getId(), failure.getCause()); + } + } + } + } + + @Override + public void afterBulk(long executionId, BulkRequest bulkRequest, Throwable throwable) { + LOG.error("Bulk with id {} thrown an error", executionId, throwable); + } } } diff --git a/oak-search-elastic/src/main/java/org/apache/jackrabbit/oak/plugins/index/elasticsearch/index/ElasticsearchIndexWriterFactory.java b/oak-search-elastic/src/main/java/org/apache/jackrabbit/oak/plugins/index/elasticsearch/index/ElasticsearchIndexWriterFactory.java index bac172c64d..e1540c1e2e 100644 --- a/oak-search-elastic/src/main/java/org/apache/jackrabbit/oak/plugins/index/elasticsearch/index/ElasticsearchIndexWriterFactory.java +++ b/oak-search-elastic/src/main/java/org/apache/jackrabbit/oak/plugins/index/elasticsearch/index/ElasticsearchIndexWriterFactory.java @@ -17,12 +17,13 @@ package org.apache.jackrabbit.oak.plugins.index.elasticsearch.index; import org.apache.jackrabbit.oak.plugins.index.elasticsearch.ElasticsearchConnection; +import org.apache.jackrabbit.oak.plugins.index.elasticsearch.ElasticsearchIndexDefinition; import org.apache.jackrabbit.oak.plugins.index.search.IndexDefinition; import 
org.apache.jackrabbit.oak.plugins.index.search.spi.editor.FulltextIndexWriterFactory; import org.apache.jackrabbit.oak.spi.state.NodeBuilder; import org.jetbrains.annotations.NotNull; -public class ElasticsearchIndexWriterFactory implements FulltextIndexWriterFactory { +class ElasticsearchIndexWriterFactory implements FulltextIndexWriterFactory { private final ElasticsearchConnection elasticsearchConnection; ElasticsearchIndexWriterFactory(@NotNull ElasticsearchConnection elasticsearchConnection) { @@ -31,6 +32,10 @@ public class ElasticsearchIndexWriterFactory implements FulltextIndexWriterFacto @Override public ElasticsearchIndexWriter newInstance(IndexDefinition definition, NodeBuilder definitionBuilder, boolean reindex) { - return new ElasticsearchIndexWriter(definition, elasticsearchConnection); + if (!(definition instanceof ElasticsearchIndexDefinition)) { + throw new IllegalArgumentException("IndexDefinition must be of type ElasticsearchIndexDefinition " + + "instead of " + definition.getClass().getName()); + } + return new ElasticsearchIndexWriter(elasticsearchConnection, (ElasticsearchIndexDefinition) definition); } } diff --git a/oak-search-elastic/src/main/java/org/apache/jackrabbit/oak/plugins/index/elasticsearch/query/ElasticsearchIndex.java b/oak-search-elastic/src/main/java/org/apache/jackrabbit/oak/plugins/index/elasticsearch/query/ElasticsearchIndex.java index 4e103a5252..61da4b83a0 100644 --- a/oak-search-elastic/src/main/java/org/apache/jackrabbit/oak/plugins/index/elasticsearch/query/ElasticsearchIndex.java +++ b/oak-search-elastic/src/main/java/org/apache/jackrabbit/oak/plugins/index/elasticsearch/query/ElasticsearchIndex.java @@ -35,9 +35,9 @@ import java.util.WeakHashMap; import java.util.function.Predicate; import static org.apache.jackrabbit.oak.plugins.index.IndexConstants.TYPE_PROPERTY_NAME; -import static org.apache.jackrabbit.oak.plugins.index.elasticsearch.ElasticsearchIndexConstants.TYPE_ELASTICSEARCH; +import static 
org.apache.jackrabbit.oak.plugins.index.elasticsearch.ElasticsearchIndexDefinition.TYPE_ELASTICSEARCH; -public class ElasticsearchIndex extends FulltextIndex { +class ElasticsearchIndex extends FulltextIndex { private static final Predicate ELASTICSEARCH_INDEX_DEFINITION_PREDICATE = state -> TYPE_ELASTICSEARCH.equals(state.getString(TYPE_PROPERTY_NAME)); private static final Map estimators = new WeakHashMap<>(); diff --git a/oak-search-elastic/src/main/java/org/apache/jackrabbit/oak/plugins/index/elasticsearch/query/ElasticsearchIndexNode.java b/oak-search-elastic/src/main/java/org/apache/jackrabbit/oak/plugins/index/elasticsearch/query/ElasticsearchIndexNode.java index 11d589ccf2..e87647e53a 100644 --- a/oak-search-elastic/src/main/java/org/apache/jackrabbit/oak/plugins/index/elasticsearch/query/ElasticsearchIndexNode.java +++ b/oak-search-elastic/src/main/java/org/apache/jackrabbit/oak/plugins/index/elasticsearch/query/ElasticsearchIndexNode.java @@ -18,7 +18,6 @@ package org.apache.jackrabbit.oak.plugins.index.elasticsearch.query; import org.apache.jackrabbit.oak.plugins.index.elasticsearch.ElasticsearchConnection; import org.apache.jackrabbit.oak.plugins.index.elasticsearch.ElasticsearchIndexDefinition; -import org.apache.jackrabbit.oak.plugins.index.elasticsearch.ElasticsearchIndexDescriptor; import org.apache.jackrabbit.oak.plugins.index.search.IndexNode; import org.apache.jackrabbit.oak.plugins.index.search.IndexStatistics; import org.apache.jackrabbit.oak.spi.state.NodeState; @@ -26,16 +25,16 @@ import org.apache.jackrabbit.oak.spi.state.NodeStateUtils; import org.jetbrains.annotations.NotNull; import org.jetbrains.annotations.Nullable; -public class ElasticsearchIndexNode implements IndexNode { +class ElasticsearchIndexNode implements IndexNode { + private final ElasticsearchConnection elasticsearchConnection; private final ElasticsearchIndexDefinition indexDefinition; - private final ElasticsearchIndexDescriptor indexDescriptor; - protected 
ElasticsearchIndexNode(@NotNull NodeState root, @NotNull String indexPath, + ElasticsearchIndexNode(@NotNull NodeState root, @NotNull String indexPath, @NotNull ElasticsearchConnection elasticsearchConnection) { final NodeState indexNS = NodeStateUtils.getNode(root, indexPath); this.indexDefinition = new ElasticsearchIndexDefinition(root, indexNS, indexPath); - this.indexDescriptor = new ElasticsearchIndexDescriptor(elasticsearchConnection, indexDefinition); + this.elasticsearchConnection = elasticsearchConnection; } @Override @@ -48,8 +47,8 @@ public class ElasticsearchIndexNode implements IndexNode { return indexDefinition; } - public ElasticsearchIndexDescriptor getIndexDescriptor() { - return indexDescriptor; + public ElasticsearchConnection getConnection() { + return elasticsearchConnection; } @Override @@ -61,6 +60,6 @@ public class ElasticsearchIndexNode implements IndexNode { @Override public @Nullable IndexStatistics getIndexStatistics() { - return new ElasticsearchIndexStatistics(indexDescriptor); + return new ElasticsearchIndexStatistics(elasticsearchConnection); } } diff --git a/oak-search-elastic/src/main/java/org/apache/jackrabbit/oak/plugins/index/elasticsearch/query/ElasticsearchIndexStatistics.java b/oak-search-elastic/src/main/java/org/apache/jackrabbit/oak/plugins/index/elasticsearch/query/ElasticsearchIndexStatistics.java index caf1eef994..a898044ecd 100644 --- a/oak-search-elastic/src/main/java/org/apache/jackrabbit/oak/plugins/index/elasticsearch/query/ElasticsearchIndexStatistics.java +++ b/oak-search-elastic/src/main/java/org/apache/jackrabbit/oak/plugins/index/elasticsearch/query/ElasticsearchIndexStatistics.java @@ -16,20 +16,21 @@ */ package org.apache.jackrabbit.oak.plugins.index.elasticsearch.query; -import org.apache.jackrabbit.oak.plugins.index.elasticsearch.ElasticsearchIndexDescriptor; +import org.apache.jackrabbit.oak.plugins.index.elasticsearch.ElasticsearchConnection; import 
org.apache.jackrabbit.oak.plugins.index.search.IndexStatistics; import org.elasticsearch.client.RequestOptions; import org.elasticsearch.client.core.CountRequest; import org.elasticsearch.client.core.CountResponse; import org.elasticsearch.index.query.QueryBuilders; +import org.jetbrains.annotations.NotNull; import java.io.IOException; -public class ElasticsearchIndexStatistics implements IndexStatistics { - private final ElasticsearchIndexDescriptor elasticsearchIndexDescriptor; +class ElasticsearchIndexStatistics implements IndexStatistics { + private final ElasticsearchConnection elasticsearchConnection; - ElasticsearchIndexStatistics(ElasticsearchIndexDescriptor elasticsearchIndexDescriptor) { - this.elasticsearchIndexDescriptor = elasticsearchIndexDescriptor; + ElasticsearchIndexStatistics(@NotNull ElasticsearchConnection elasticsearchConnection) { + this.elasticsearchConnection = elasticsearchConnection; } @Override @@ -37,7 +38,7 @@ public class ElasticsearchIndexStatistics implements IndexStatistics { CountRequest countRequest = new CountRequest(); countRequest.query(QueryBuilders.matchAllQuery()); try { - CountResponse count = elasticsearchIndexDescriptor.getClient().count(countRequest, RequestOptions.DEFAULT); + CountResponse count = elasticsearchConnection.getClient().count(countRequest, RequestOptions.DEFAULT); return (int) count.getCount(); } catch (IOException e) { // ignore failure @@ -50,7 +51,7 @@ public class ElasticsearchIndexStatistics implements IndexStatistics { CountRequest countRequest = new CountRequest(); countRequest.query(QueryBuilders.existsQuery(key)); try { - CountResponse count = elasticsearchIndexDescriptor.getClient().count(countRequest, RequestOptions.DEFAULT); + CountResponse count = elasticsearchConnection.getClient().count(countRequest, RequestOptions.DEFAULT); return (int) count.getCount(); } catch (IOException e) { // ignore failure diff --git 
a/oak-search-elastic/src/main/java/org/apache/jackrabbit/oak/plugins/index/elasticsearch/query/ElasticsearchResultRowIterator.java b/oak-search-elastic/src/main/java/org/apache/jackrabbit/oak/plugins/index/elasticsearch/query/ElasticsearchResultRowIterator.java index ddc5a53e98..fb3c5bd98a 100644 --- a/oak-search-elastic/src/main/java/org/apache/jackrabbit/oak/plugins/index/elasticsearch/query/ElasticsearchResultRowIterator.java +++ b/oak-search-elastic/src/main/java/org/apache/jackrabbit/oak/plugins/index/elasticsearch/query/ElasticsearchResultRowIterator.java @@ -83,7 +83,7 @@ import static org.elasticsearch.index.query.QueryBuilders.matchAllQuery; import static org.elasticsearch.index.query.QueryBuilders.matchQuery; import static org.elasticsearch.index.query.QueryBuilders.termQuery; -public class ElasticsearchResultRowIterator implements Iterator { +class ElasticsearchResultRowIterator implements Iterator { private static final Logger LOG = LoggerFactory .getLogger(ElasticsearchResultRowIterator.class); private static final PerfLogger PERF_LOGGER = diff --git a/oak-search-elastic/src/main/java/org/apache/jackrabbit/oak/plugins/index/elasticsearch/query/ElasticsearchSearcher.java b/oak-search-elastic/src/main/java/org/apache/jackrabbit/oak/plugins/index/elasticsearch/query/ElasticsearchSearcher.java index c69e954b4e..cb05399d3a 100644 --- a/oak-search-elastic/src/main/java/org/apache/jackrabbit/oak/plugins/index/elasticsearch/query/ElasticsearchSearcher.java +++ b/oak-search-elastic/src/main/java/org/apache/jackrabbit/oak/plugins/index/elasticsearch/query/ElasticsearchSearcher.java @@ -16,7 +16,6 @@ */ package org.apache.jackrabbit.oak.plugins.index.elasticsearch.query; -import org.apache.jackrabbit.oak.plugins.index.elasticsearch.ElasticsearchIndexDescriptor; import org.apache.jackrabbit.oak.plugins.index.search.FieldNames; import org.elasticsearch.action.search.SearchRequest; import org.elasticsearch.action.search.SearchResponse; @@ -27,11 +26,11 @@ import 
org.jetbrains.annotations.NotNull; import java.io.IOException; -public class ElasticsearchSearcher { - private final ElasticsearchIndexDescriptor indexDescriptor; +class ElasticsearchSearcher { + private final ElasticsearchIndexNode indexNode; ElasticsearchSearcher(@NotNull ElasticsearchIndexNode indexNode) { - indexDescriptor = indexNode.getIndexDescriptor(); + this.indexNode = indexNode; } public SearchResponse search(QueryBuilder query, int batchSize) throws IOException { @@ -41,9 +40,9 @@ public class ElasticsearchSearcher { .storedField(FieldNames.PATH) .size(batchSize); - SearchRequest request = new SearchRequest(indexDescriptor.getIndexName()) + SearchRequest request = new SearchRequest(indexNode.getDefinition().getRemoteIndexName()) .source(searchSourceBuilder); - return indexDescriptor.getClient().search(request, RequestOptions.DEFAULT); + return indexNode.getConnection().getClient().search(request, RequestOptions.DEFAULT); } } diff --git a/oak-search-elastic/src/main/java/org/apache/jackrabbit/oak/plugins/index/elasticsearch/util/ElasticsearchIndexDefinitionBuilder.java b/oak-search-elastic/src/main/java/org/apache/jackrabbit/oak/plugins/index/elasticsearch/util/ElasticsearchIndexDefinitionBuilder.java index d41b766fb3..389dc1c653 100644 --- a/oak-search-elastic/src/main/java/org/apache/jackrabbit/oak/plugins/index/elasticsearch/util/ElasticsearchIndexDefinitionBuilder.java +++ b/oak-search-elastic/src/main/java/org/apache/jackrabbit/oak/plugins/index/elasticsearch/util/ElasticsearchIndexDefinitionBuilder.java @@ -18,8 +18,11 @@ package org.apache.jackrabbit.oak.plugins.index.elasticsearch.util; import org.apache.jackrabbit.oak.plugins.index.search.util.IndexDefinitionBuilder; -import static org.apache.jackrabbit.oak.plugins.index.elasticsearch.ElasticsearchIndexConstants.TYPE_ELASTICSEARCH; +import static org.apache.jackrabbit.oak.plugins.index.elasticsearch.ElasticsearchIndexDefinition.TYPE_ELASTICSEARCH; +/** + * Utility class to create Elasticsearch 
Index Definitions along with the relevant node structure. + */ public class ElasticsearchIndexDefinitionBuilder extends IndexDefinitionBuilder { @Override protected String getIndexType() { diff --git a/oak-search-elastic/src/test/java/org/apache/jackrabbit/oak/plugins/index/elasticsearch/ElasticsearchPropertyIndexTest.java b/oak-search-elastic/src/test/java/org/apache/jackrabbit/oak/plugins/index/elasticsearch/ElasticsearchPropertyIndexTest.java index d72ff7a54e..8145e8db5b 100644 --- a/oak-search-elastic/src/test/java/org/apache/jackrabbit/oak/plugins/index/elasticsearch/ElasticsearchPropertyIndexTest.java +++ b/oak-search-elastic/src/test/java/org/apache/jackrabbit/oak/plugins/index/elasticsearch/ElasticsearchPropertyIndexTest.java @@ -35,7 +35,6 @@ import org.apache.jackrabbit.oak.spi.security.OpenSecurityProvider; import org.apache.jackrabbit.oak.spi.state.NodeBuilder; import org.apache.jackrabbit.oak.spi.state.NodeStore; import org.elasticsearch.Version; -import org.junit.Assert; import org.junit.BeforeClass; import org.junit.Rule; import org.junit.Test; @@ -48,6 +47,7 @@ import java.util.Arrays; import static java.util.Collections.singletonList; import static org.apache.jackrabbit.oak.plugins.index.IndexConstants.INDEX_DEFINITIONS_NAME; +import static org.apache.jackrabbit.oak.plugins.index.elasticsearch.ElasticsearchIndexDefinition.BULK_FLUSH_INTERVAL_MS_DEFAULT; import static org.apache.jackrabbit.oak.plugins.index.search.FulltextIndexConstants.PROPDEF_PROP_NODE_NAME; import static org.hamcrest.CoreMatchers.containsString; import static org.hamcrest.MatcherAssert.assertThat; @@ -118,12 +118,15 @@ public class ElasticsearchPropertyIndexTest extends AbstractQueryTest { root.commit(); String propaQuery = "select [jcr:path] from [nt:base] where [propa] = 'foo'"; + + assertEventually(() -> { assertThat(explain(propaQuery), containsString("elasticsearch:test1")); assertThat(explain("select [jcr:path] from [nt:base] where [propc] = 'foo'"), 
containsString("elasticsearch:test2")); assertQuery(propaQuery, Arrays.asList("/test/a", "/test/b")); assertQuery("select [jcr:path] from [nt:base] where [propa] = 'foo2'", singletonList("/test/c")); assertQuery("select [jcr:path] from [nt:base] where [propc] = 'foo'", singletonList("/test/d")); + }); } //OAK-3825 @@ -148,10 +151,13 @@ public class ElasticsearchPropertyIndexTest extends AbstractQueryTest { String queryPrefix = "select [jcr:path] from [nt:base] where ISDESCENDANTNODE('/test') AND "; //test String propabQuery = queryPrefix + "LOCALNAME() = 'foo'"; + + assertEventually(() -> { String explanation = explain(propabQuery); - Assert.assertThat(explanation, containsString("elasticsearch:test1(/oak:index/test1) ")); - Assert.assertThat(explanation, containsString("{\"term\":{\":nodeName\":{\"value\":\"foo\",")); + assertThat(explanation, containsString("elasticsearch:test1(/oak:index/test1) ")); + assertThat(explanation, containsString("{\"term\":{\":nodeName\":{\"value\":\"foo\",")); assertQuery(propabQuery, singletonList("/test/foo")); + assertQuery(queryPrefix + "LOCALNAME() = 'bar'", singletonList("/test/sc/bar")); assertQuery(queryPrefix + "LOCALNAME() LIKE 'foo'", singletonList("/test/foo")); assertQuery(queryPrefix + "LOCALNAME() LIKE 'camel%'", singletonList("/test/camelCase")); @@ -159,6 +165,7 @@ public class ElasticsearchPropertyIndexTest extends AbstractQueryTest { assertQuery(queryPrefix + "NAME() = 'bar'", singletonList("/test/sc/bar")); assertQuery(queryPrefix + "NAME() LIKE 'foo'", singletonList("/test/foo")); assertQuery(queryPrefix + "NAME() LIKE 'camel%'", singletonList("/test/camelCase")); + }); } @Test @@ -171,7 +178,8 @@ public class ElasticsearchPropertyIndexTest extends AbstractQueryTest { test.addChild("b"); root.commit(); - Assert.assertThat(explain("select [jcr:path] from [nt:base] where [propa] = 'foo'"), containsString("elasticsearch:test1")); + assertEventually(() -> assertThat(explain("select [jcr:path] from [nt:base] where 
[propa] = 'foo'"), + containsString("elasticsearch:test1"))); } @Test @@ -184,7 +192,8 @@ public class ElasticsearchPropertyIndexTest extends AbstractQueryTest { test.addChild("c").setProperty("propb", "e"); root.commit(); - assertQuery("select [jcr:path] from [nt:base] where propa is not null", Arrays.asList("/test/a", "/test/b")); + assertEventually(() -> assertQuery("select [jcr:path] from [nt:base] where propa is not null", + Arrays.asList("/test/a", "/test/b"))); } private static IndexDefinitionBuilder createIndex(String... propNames) { @@ -204,4 +213,39 @@ public class ElasticsearchPropertyIndexTest extends AbstractQueryTest { String explain = "explain " + query; return executeQuery(explain, "JCR-SQL2").get(0); } + + /** Retries {@code r} until it completes without throwing, giving up after three default bulk flush intervals. */ + private static void assertEventually(Runnable r) { + assertEventually(r, BULK_FLUSH_INTERVAL_MS_DEFAULT * 3); + } + + /** Retries {@code r} every 100 ms until it completes without throwing; throws an AssertionError when a failed attempt started {@code timeoutMillis} or more after the first call, or as soon as the waiting thread is interrupted (the interrupt flag is restored first). */ + private static void assertEventually(Runnable r, long timeoutMillis) { + final long start = System.currentTimeMillis(); + long lastAttempt = 0; + int attempts = 0; + + while (true) { + try { + attempts++; + lastAttempt = System.currentTimeMillis(); + r.run(); + return; + } catch (Throwable e) { + long elapsedTime = lastAttempt - start; + if (elapsedTime >= timeoutMillis) { + String msg = String.format("Condition not satisfied after %1.2f seconds and %d attempts", + elapsedTime / 1000d, attempts); + throw new AssertionError(msg, e); + } + try { + Thread.sleep(100); + } catch (InterruptedException ex) { + Thread.currentThread().interrupt(); + throw new AssertionError("Interrupted while waiting for condition", ex); + } + + } + } + } } diff --git a/oak-search-elastic/src/test/java/org/apache/jackrabbit/oak/plugins/index/elasticsearch/index/ElasticsearchIndexWriterTest.java b/oak-search-elastic/src/test/java/org/apache/jackrabbit/oak/plugins/index/elasticsearch/index/ElasticsearchIndexWriterTest.java new file mode 100644 index 0000000000..bab1a211e6 --- /dev/null +++ b/oak-search-elastic/src/test/java/org/apache/jackrabbit/oak/plugins/index/elasticsearch/index/ElasticsearchIndexWriterTest.java @@ -0,0 +1,95 @@ +/* + * 
Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.jackrabbit.oak.plugins.index.elasticsearch.index; + +import org.apache.jackrabbit.oak.plugins.index.elasticsearch.ElasticsearchConnection; +import org.apache.jackrabbit.oak.plugins.index.elasticsearch.ElasticsearchIndexDefinition; +import org.elasticsearch.action.bulk.BulkProcessor; +import org.elasticsearch.action.delete.DeleteRequest; +import org.elasticsearch.action.index.IndexRequest; +import org.junit.Before; +import org.junit.Test; +import org.mockito.ArgumentCaptor; +import org.mockito.Mock; +import org.mockito.MockitoAnnotations; + +import java.io.IOException; +import java.net.URLEncoder; + +import static org.junit.Assert.assertEquals; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; + +public class ElasticsearchIndexWriterTest { + + @Mock + private ElasticsearchConnection elasticsearchConnectionMock; + + @Mock + private ElasticsearchIndexDefinition indexDefinitionMock; + + @Mock + private BulkProcessor bulkProcessorMock; + + private ElasticsearchIndexWriter indexWriter; + + @Before + public void setUp() { + MockitoAnnotations.initMocks(this); + 
when(indexDefinitionMock.getRemoteIndexName()).thenReturn("test-index"); + indexWriter = new ElasticsearchIndexWriter(elasticsearchConnectionMock, indexDefinitionMock, bulkProcessorMock); + } + + @Test + public void singleUpdateDocument() throws IOException { + indexWriter.updateDocument("/foo", new ElasticsearchDocument("/foo")); + + ArgumentCaptor<IndexRequest> acIndexRequest = ArgumentCaptor.forClass(IndexRequest.class); + verify(bulkProcessorMock).add(acIndexRequest.capture()); + + IndexRequest request = acIndexRequest.getValue(); + assertEquals("test-index", request.index()); + assertEquals(URLEncoder.encode("/foo", "UTF-8"), request.id()); + } + + @Test + public void singleDeleteDocument() throws IOException { + indexWriter.deleteDocuments("/bar"); + + ArgumentCaptor<DeleteRequest> acDeleteRequest = ArgumentCaptor.forClass(DeleteRequest.class); + verify(bulkProcessorMock).add(acDeleteRequest.capture()); + + DeleteRequest request = acDeleteRequest.getValue(); + assertEquals("test-index", request.index()); + assertEquals(URLEncoder.encode("/bar", "UTF-8"), request.id()); + } + + @Test + public void multiRequests() throws IOException { + indexWriter.updateDocument("/foo", new ElasticsearchDocument("/foo")); + indexWriter.updateDocument("/bar", new ElasticsearchDocument("/bar")); + indexWriter.deleteDocuments("/foo"); + indexWriter.deleteDocuments("/bar"); + + ArgumentCaptor<IndexRequest> acIndexRequest = ArgumentCaptor.forClass(IndexRequest.class); + verify(bulkProcessorMock, times(2)).add(acIndexRequest.capture()); + ArgumentCaptor<DeleteRequest> acDeleteRequest = ArgumentCaptor.forClass(DeleteRequest.class); + verify(bulkProcessorMock, times(2)).add(acDeleteRequest.capture()); + } + +} diff --git a/oak-search-elastic/src/test/resources/logback-test.xml b/oak-search-elastic/src/test/resources/logback-test.xml new file mode 100644 index 0000000000..e92401c93e --- /dev/null +++ b/oak-search-elastic/src/test/resources/logback-test.xml @@ -0,0 +1,39 @@ + + + + + + %date{HH:mm:ss.SSS} %-5level %-40([%thread] %F:%L) %msg%n 
+ + + + + target/unit-tests.log + + %date{HH:mm:ss.SSS} %-5level %-40([%thread] %F:%L) %msg%n + + + + + + + + +