org.apache.jackrabbit
oak-core
diff --git a/oak-search-elastic/src/main/java/org/apache/jackrabbit/oak/plugins/index/elasticsearch/ElasticsearchIndexConstants.java b/oak-search-elastic/src/main/java/org/apache/jackrabbit/oak/plugins/index/elasticsearch/ElasticsearchIndexConstants.java
deleted file mode 100644
index 8e0c5aa3d0..0000000000
--- a/oak-search-elastic/src/main/java/org/apache/jackrabbit/oak/plugins/index/elasticsearch/ElasticsearchIndexConstants.java
+++ /dev/null
@@ -1,21 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.jackrabbit.oak.plugins.index.elasticsearch;
-
-public class ElasticsearchIndexConstants {
- public static final String TYPE_ELASTICSEARCH = "elasticsearch";
-}
diff --git a/oak-search-elastic/src/main/java/org/apache/jackrabbit/oak/plugins/index/elasticsearch/ElasticsearchIndexDefinition.java b/oak-search-elastic/src/main/java/org/apache/jackrabbit/oak/plugins/index/elasticsearch/ElasticsearchIndexDefinition.java
index 1eeade244f..525d058abc 100644
--- a/oak-search-elastic/src/main/java/org/apache/jackrabbit/oak/plugins/index/elasticsearch/ElasticsearchIndexDefinition.java
+++ b/oak-search-elastic/src/main/java/org/apache/jackrabbit/oak/plugins/index/elasticsearch/ElasticsearchIndexDefinition.java
@@ -18,11 +18,126 @@
*/
package org.apache.jackrabbit.oak.plugins.index.elasticsearch;
+import org.apache.jackrabbit.oak.commons.PathUtils;
import org.apache.jackrabbit.oak.plugins.index.search.IndexDefinition;
import org.apache.jackrabbit.oak.spi.state.NodeState;
+import java.util.regex.Pattern;
+import java.util.stream.Collectors;
+import java.util.stream.StreamSupport;
+
+import static org.apache.jackrabbit.oak.plugins.index.search.util.ConfigUtil.getOptionalValue;
+import static org.elasticsearch.common.Strings.INVALID_FILENAME_CHARS;
+
public class ElasticsearchIndexDefinition extends IndexDefinition {
+
+ public static final String TYPE_ELASTICSEARCH = "elasticsearch";
+
+ public static final String BULK_ACTIONS = "bulkActions";
+ public static final int BULK_ACTIONS_DEFAULT = 250;
+
+ public static final String BULK_SIZE_BYTES = "bulkSizeBytes";
+ public static final long BULK_SIZE_BYTES_DEFAULT = 2 * 1024 * 1024; // 2MB
+
+ public static final String BULK_FLUSH_INTERVAL_MS = "bulkFlushIntervalMs";
+ public static final long BULK_FLUSH_INTERVAL_MS_DEFAULT = 3000;
+
+ public static final String BULK_RETRIES = "bulkRetries";
+ public static final int BULK_RETRIES_DEFAULT = 3;
+
+ public static final String BULK_RETRIES_BACKOFF = "bulkRetriesBackoff";
+ public static final long BULK_RETRIES_BACKOFF_DEFAULT = 200;
+
+ private static final int MAX_NAME_LENGTH = 255;
+
+ private static final String INVALID_CHARS_REGEX = Pattern.quote(INVALID_FILENAME_CHARS
+ .stream()
+ .map(Object::toString)
+ .collect(Collectors.joining("")));
+
+ private final String remoteIndexName;
+
+ public final int bulkActions;
+ public final long bulkSizeBytes;
+ public final long bulkFlushIntervalMs;
+ public final int bulkRetries;
+ public final long bulkRetriesBackoff;
+
public ElasticsearchIndexDefinition(NodeState root, NodeState defn, String indexPath) {
super(root, getIndexDefinitionState(defn), determineIndexFormatVersion(defn), determineUniqueId(defn), indexPath);
+ this.remoteIndexName = setupIndexName();
+
+ this.bulkActions = getOptionalValue(defn, BULK_ACTIONS, BULK_ACTIONS_DEFAULT);
+ this.bulkSizeBytes = getOptionalValue(defn, BULK_SIZE_BYTES, BULK_SIZE_BYTES_DEFAULT);
+ this.bulkFlushIntervalMs = getOptionalValue(defn, BULK_FLUSH_INTERVAL_MS, BULK_FLUSH_INTERVAL_MS_DEFAULT);
+ this.bulkRetries = getOptionalValue(defn, BULK_RETRIES, BULK_RETRIES_DEFAULT);
+ this.bulkRetriesBackoff = getOptionalValue(defn, BULK_RETRIES_BACKOFF, BULK_RETRIES_BACKOFF_DEFAULT);
+ }
+
+ /**
+ * Returns the index identifier on the Elasticsearch cluster. Notice this can be different from the value returned
+ * from {@code getIndexName}.
+ * @return the Elasticsearch index identifier
+ */
+ public String getRemoteIndexName() {
+ return remoteIndexName;
+ }
+
+ private String setupIndexName() {
+ // TODO: implement advanced remote index name strategy that takes into account multiple tenants and re-index process
+ return getESSafeIndexName(getIndexPath() + "-" + getReindexCount());
+ }
+
+ /**
+ *
+ * - abc -> abc
+ * - xy:abc -> xyabc
+ * - /oak:index/abc -> abc
+ *
+ *
+ * The resulting file name would be truncated to MAX_NAME_LENGTH
+ */
+ private static String getESSafeIndexName(String indexPath) {
+ String name = StreamSupport
+ .stream(PathUtils.elements(indexPath).spliterator(), false)
+ .limit(3) //Max 3 nodeNames including oak:index which is the immediate parent for any indexPath
+ .filter(p -> !"oak:index".equals(p))
+ .map(ElasticsearchIndexDefinition::getESSafeName)
+ .collect(Collectors.joining("_"));
+
+ if (name.length() > MAX_NAME_LENGTH) {
+ name = name.substring(0, MAX_NAME_LENGTH);
+ }
+ return name;
+ }
+
+ /**
+ * Convert {@code e} to Elasticsearch safe index name.
+ * Ref: https://www.elastic.co/guide/en/elasticsearch/reference/current/indices-create-index.html
+ */
+ private static String getESSafeName(String suggestedIndexName) {
+ return suggestedIndexName.replaceAll(INVALID_CHARS_REGEX, "").toLowerCase();
+ }
+
+ /**
+ * Class to help with {@link ElasticsearchIndexDefinition} creation.
+ * The built object represents the index definition only without the node structure.
+ */
+ public static class Builder extends IndexDefinition.Builder {
+ @Override
+ public ElasticsearchIndexDefinition build() {
+ return (ElasticsearchIndexDefinition) super.build();
+ }
+
+ @Override
+ public ElasticsearchIndexDefinition.Builder reindex() {
+ super.reindex();
+ return this;
+ }
+
+ @Override
+ protected IndexDefinition createInstance(NodeState indexDefnStateToUse) {
+ return new ElasticsearchIndexDefinition(root, indexDefnStateToUse, indexPath);
+ }
}
}
diff --git a/oak-search-elastic/src/main/java/org/apache/jackrabbit/oak/plugins/index/elasticsearch/ElasticsearchIndexDescriptor.java b/oak-search-elastic/src/main/java/org/apache/jackrabbit/oak/plugins/index/elasticsearch/ElasticsearchIndexDescriptor.java
deleted file mode 100644
index cac4015d62..0000000000
--- a/oak-search-elastic/src/main/java/org/apache/jackrabbit/oak/plugins/index/elasticsearch/ElasticsearchIndexDescriptor.java
+++ /dev/null
@@ -1,111 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.jackrabbit.oak.plugins.index.elasticsearch;
-
-import org.apache.jackrabbit.oak.commons.PathUtils;
-import org.apache.jackrabbit.oak.plugins.index.search.IndexDefinition;
-import org.elasticsearch.client.RestHighLevelClient;
-import org.jetbrains.annotations.NotNull;
-
-import java.util.Objects;
-import java.util.regex.Pattern;
-import java.util.stream.Collectors;
-import java.util.stream.StreamSupport;
-
-import static org.elasticsearch.common.Strings.INVALID_FILENAME_CHARS;
-
-public class ElasticsearchIndexDescriptor {
-
- private static final int MAX_NAME_LENGTH = 255;
-
- private static final String INVALID_CHARS_REGEX = Pattern.quote(INVALID_FILENAME_CHARS
- .stream()
- .map(Object::toString)
- .collect(Collectors.joining("")));
-
- private final ElasticsearchConnection connection;
- private final String indexName;
-
- public ElasticsearchIndexDescriptor(@NotNull ElasticsearchConnection connection, IndexDefinition indexDefinition) {
- this.connection = connection;
- this.indexName = getRemoteIndexName(indexDefinition);
- }
-
- public RestHighLevelClient getClient() {
- return connection.getClient();
- }
-
- public String getIndexName() {
- return indexName;
- }
-
- @Override
- public int hashCode() {
- return Objects.hash(connection, indexName);
- }
-
- @Override
- public boolean equals(Object o) {
- if (this == o) return true;
- if (o == null || getClass() != o.getClass()) return false;
- ElasticsearchIndexDescriptor that = (ElasticsearchIndexDescriptor) o;
- return connection.equals(that.connection) &&
- indexName.equals(that.indexName);
- }
-
- private String getRemoteIndexName(IndexDefinition definition) {
- String suffix = definition.getUniqueId();
-
- if (suffix == null) {
- suffix = String.valueOf(definition.getReindexCount());
- }
-
- return getESSafeIndexName(definition.getIndexPath() + "-" + suffix);
- }
-
- /**
- *
- * - abc -> abc
- * - xy:abc -> xyabc
- * - /oak:index/abc -> abc
- *
- *
- * The resulting file name would be truncated to MAX_NAME_LENGTH
- */
- private static String getESSafeIndexName(String indexPath) {
- String name = StreamSupport
- .stream(PathUtils.elements(indexPath).spliterator(), false)
- .limit(3) //Max 3 nodeNames including oak:index which is the immediate parent for any indexPath
- .filter(p -> !"oak:index".equals(p))
- .map(ElasticsearchIndexDescriptor::getESSafeName)
- .collect(Collectors.joining("_"));
-
- if (name.length() > MAX_NAME_LENGTH) {
- name = name.substring(0, MAX_NAME_LENGTH);
- }
- return name;
- }
-
- /**
- * Convert {@code e} to Elasticsearch safe index name.
- * Ref: https://www.elastic.co/guide/en/elasticsearch/reference/current/indices-create-index.html
- */
- private static String getESSafeName(String suggestedIndexName) {
- return suggestedIndexName.replaceAll(INVALID_CHARS_REGEX, "").toLowerCase();
- }
-
-}
diff --git a/oak-search-elastic/src/main/java/org/apache/jackrabbit/oak/plugins/index/elasticsearch/ElasticsearchIndexProviderService.java b/oak-search-elastic/src/main/java/org/apache/jackrabbit/oak/plugins/index/elasticsearch/ElasticsearchIndexProviderService.java
index 7d3789fca7..1870699b67 100644
--- a/oak-search-elastic/src/main/java/org/apache/jackrabbit/oak/plugins/index/elasticsearch/ElasticsearchIndexProviderService.java
+++ b/oak-search-elastic/src/main/java/org/apache/jackrabbit/oak/plugins/index/elasticsearch/ElasticsearchIndexProviderService.java
@@ -166,7 +166,7 @@ public class ElasticsearchIndexProviderService {
ElasticsearchIndexProvider indexProvider = new ElasticsearchIndexProvider(elasticsearchConnection);
Dictionary props = new Hashtable<>();
- props.put("type", ElasticsearchIndexConstants.TYPE_ELASTICSEARCH);
+ props.put("type", ElasticsearchIndexDefinition.TYPE_ELASTICSEARCH);
regs.add(bundleContext.registerService(QueryIndexProvider.class.getName(), indexProvider, props));
}
@@ -174,7 +174,7 @@ public class ElasticsearchIndexProviderService {
ElasticsearchIndexEditorProvider editorProvider = new ElasticsearchIndexEditorProvider(elasticsearchConnection, extractedTextCache);
Dictionary props = new Hashtable<>();
- props.put("type", ElasticsearchIndexConstants.TYPE_ELASTICSEARCH);
+ props.put("type", ElasticsearchIndexDefinition.TYPE_ELASTICSEARCH);
regs.add(bundleContext.registerService(IndexEditorProvider.class.getName(), editorProvider, props));
// oakRegs.add(registerMBean(whiteboard,
// TextExtractionStatsMBean.class,
diff --git a/oak-search-elastic/src/main/java/org/apache/jackrabbit/oak/plugins/index/elasticsearch/index/ElasticsearchDocument.java b/oak-search-elastic/src/main/java/org/apache/jackrabbit/oak/plugins/index/elasticsearch/index/ElasticsearchDocument.java
index 524d19911b..939aaabd7a 100644
--- a/oak-search-elastic/src/main/java/org/apache/jackrabbit/oak/plugins/index/elasticsearch/index/ElasticsearchDocument.java
+++ b/oak-search-elastic/src/main/java/org/apache/jackrabbit/oak/plugins/index/elasticsearch/index/ElasticsearchDocument.java
@@ -32,7 +32,7 @@ import java.util.HashMap;
import java.util.List;
import java.util.Map;
-public class ElasticsearchDocument {
+class ElasticsearchDocument {
private static final Logger LOG = LoggerFactory.getLogger(ElasticsearchDocument.class);
// id should only be useful for logging (at least as of now)
diff --git a/oak-search-elastic/src/main/java/org/apache/jackrabbit/oak/plugins/index/elasticsearch/index/ElasticsearchDocumentMaker.java b/oak-search-elastic/src/main/java/org/apache/jackrabbit/oak/plugins/index/elasticsearch/index/ElasticsearchDocumentMaker.java
index b1b14406a6..6210a37d74 100644
--- a/oak-search-elastic/src/main/java/org/apache/jackrabbit/oak/plugins/index/elasticsearch/index/ElasticsearchDocumentMaker.java
+++ b/oak-search-elastic/src/main/java/org/apache/jackrabbit/oak/plugins/index/elasticsearch/index/ElasticsearchDocumentMaker.java
@@ -34,7 +34,7 @@ import org.jetbrains.annotations.Nullable;
import java.io.IOException;
import java.util.List;
-public class ElasticsearchDocumentMaker extends FulltextDocumentMaker {
+class ElasticsearchDocumentMaker extends FulltextDocumentMaker {
ElasticsearchDocumentMaker(@Nullable FulltextBinaryTextExtractor textExtractor,
@NotNull IndexDefinition definition,
diff --git a/oak-search-elastic/src/main/java/org/apache/jackrabbit/oak/plugins/index/elasticsearch/index/ElasticsearchIndexEditor.java b/oak-search-elastic/src/main/java/org/apache/jackrabbit/oak/plugins/index/elasticsearch/index/ElasticsearchIndexEditor.java
index 9ff586e787..fd5ca91849 100644
--- a/oak-search-elastic/src/main/java/org/apache/jackrabbit/oak/plugins/index/elasticsearch/index/ElasticsearchIndexEditor.java
+++ b/oak-search-elastic/src/main/java/org/apache/jackrabbit/oak/plugins/index/elasticsearch/index/ElasticsearchIndexEditor.java
@@ -25,7 +25,7 @@ import org.apache.jackrabbit.oak.plugins.index.search.spi.editor.FulltextIndexEd
* {@link IndexEditor} implementation that is responsible for keeping the
* corresponding Elasticsearch index up to date
*/
-public class ElasticsearchIndexEditor extends FulltextIndexEditor {
+class ElasticsearchIndexEditor extends FulltextIndexEditor {
ElasticsearchIndexEditor(FulltextIndexEditorContext context) throws CommitFailedException {
super(context);
}
diff --git a/oak-search-elastic/src/main/java/org/apache/jackrabbit/oak/plugins/index/elasticsearch/index/ElasticsearchIndexEditorContext.java b/oak-search-elastic/src/main/java/org/apache/jackrabbit/oak/plugins/index/elasticsearch/index/ElasticsearchIndexEditorContext.java
index b3a8a2bb4a..b215301510 100644
--- a/oak-search-elastic/src/main/java/org/apache/jackrabbit/oak/plugins/index/elasticsearch/index/ElasticsearchIndexEditorContext.java
+++ b/oak-search-elastic/src/main/java/org/apache/jackrabbit/oak/plugins/index/elasticsearch/index/ElasticsearchIndexEditorContext.java
@@ -18,6 +18,7 @@ package org.apache.jackrabbit.oak.plugins.index.elasticsearch.index;
import org.apache.jackrabbit.oak.plugins.index.IndexUpdateCallback;
import org.apache.jackrabbit.oak.plugins.index.IndexingContext;
+import org.apache.jackrabbit.oak.plugins.index.elasticsearch.ElasticsearchIndexDefinition;
import org.apache.jackrabbit.oak.plugins.index.search.ExtractedTextCache;
import org.apache.jackrabbit.oak.plugins.index.search.IndexDefinition;
import org.apache.jackrabbit.oak.plugins.index.search.spi.editor.DocumentMaker;
@@ -28,7 +29,8 @@ import org.jetbrains.annotations.Nullable;
import java.io.IOException;
-public class ElasticsearchIndexEditorContext extends FulltextIndexEditorContext {
+class ElasticsearchIndexEditorContext extends FulltextIndexEditorContext {
+
ElasticsearchIndexEditorContext(NodeState root,
NodeBuilder definition, @Nullable IndexDefinition indexDefinition,
IndexUpdateCallback updateCallback,
@@ -41,7 +43,7 @@ public class ElasticsearchIndexEditorContext extends FulltextIndexEditorContext<
@Override
public IndexDefinition.Builder newDefinitionBuilder() {
- return new IndexDefinition.Builder();
+ return new ElasticsearchIndexDefinition.Builder();
}
@Override
diff --git a/oak-search-elastic/src/main/java/org/apache/jackrabbit/oak/plugins/index/elasticsearch/index/ElasticsearchIndexEditorProvider.java b/oak-search-elastic/src/main/java/org/apache/jackrabbit/oak/plugins/index/elasticsearch/index/ElasticsearchIndexEditorProvider.java
index 4a55b83662..777722ead6 100644
--- a/oak-search-elastic/src/main/java/org/apache/jackrabbit/oak/plugins/index/elasticsearch/index/ElasticsearchIndexEditorProvider.java
+++ b/oak-search-elastic/src/main/java/org/apache/jackrabbit/oak/plugins/index/elasticsearch/index/ElasticsearchIndexEditorProvider.java
@@ -30,7 +30,7 @@ import org.apache.jackrabbit.oak.spi.state.NodeState;
import org.jetbrains.annotations.NotNull;
import org.jetbrains.annotations.Nullable;
-import static org.apache.jackrabbit.oak.plugins.index.elasticsearch.ElasticsearchIndexConstants.TYPE_ELASTICSEARCH;
+import static org.apache.jackrabbit.oak.plugins.index.elasticsearch.ElasticsearchIndexDefinition.TYPE_ELASTICSEARCH;
public class ElasticsearchIndexEditorProvider implements IndexEditorProvider {
diff --git a/oak-search-elastic/src/main/java/org/apache/jackrabbit/oak/plugins/index/elasticsearch/index/ElasticsearchIndexWriter.java b/oak-search-elastic/src/main/java/org/apache/jackrabbit/oak/plugins/index/elasticsearch/index/ElasticsearchIndexWriter.java
index 78b739264a..69312ea0a1 100644
--- a/oak-search-elastic/src/main/java/org/apache/jackrabbit/oak/plugins/index/elasticsearch/index/ElasticsearchIndexWriter.java
+++ b/oak-search-elastic/src/main/java/org/apache/jackrabbit/oak/plugins/index/elasticsearch/index/ElasticsearchIndexWriter.java
@@ -17,76 +17,89 @@
package org.apache.jackrabbit.oak.plugins.index.elasticsearch.index;
import org.apache.jackrabbit.oak.plugins.index.elasticsearch.ElasticsearchConnection;
-import org.apache.jackrabbit.oak.plugins.index.elasticsearch.ElasticsearchIndexDescriptor;
+import org.apache.jackrabbit.oak.plugins.index.elasticsearch.ElasticsearchIndexDefinition;
import org.apache.jackrabbit.oak.plugins.index.search.FieldNames;
-import org.apache.jackrabbit.oak.plugins.index.search.IndexDefinition;
import org.apache.jackrabbit.oak.plugins.index.search.spi.editor.FulltextIndexWriter;
+import org.elasticsearch.action.DocWriteRequest;
+import org.elasticsearch.action.bulk.BackoffPolicy;
+import org.elasticsearch.action.bulk.BulkItemResponse;
+import org.elasticsearch.action.bulk.BulkProcessor;
+import org.elasticsearch.action.bulk.BulkRequest;
+import org.elasticsearch.action.bulk.BulkResponse;
import org.elasticsearch.action.delete.DeleteRequest;
-import org.elasticsearch.action.delete.DeleteResponse;
import org.elasticsearch.action.index.IndexRequest;
-import org.elasticsearch.action.index.IndexResponse;
import org.elasticsearch.client.RequestOptions;
import org.elasticsearch.client.indices.CreateIndexRequest;
import org.elasticsearch.client.indices.CreateIndexResponse;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.settings.Settings;
+import org.elasticsearch.common.unit.ByteSizeValue;
+import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentFactory;
import org.elasticsearch.common.xcontent.XContentType;
import org.jetbrains.annotations.NotNull;
+import org.jetbrains.annotations.TestOnly;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
+import java.util.stream.Collectors;
import static org.apache.jackrabbit.oak.plugins.index.elasticsearch.index.ElasticsearchDocument.pathToId;
-import static org.elasticsearch.action.support.WriteRequest.RefreshPolicy.IMMEDIATE;
-import static org.elasticsearch.action.support.WriteRequest.RefreshPolicy.NONE;
import static org.elasticsearch.common.xcontent.ToXContent.EMPTY_PARAMS;
import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder;
-public class ElasticsearchIndexWriter implements FulltextIndexWriter {
+class ElasticsearchIndexWriter implements FulltextIndexWriter {
private static final Logger LOG = LoggerFactory.getLogger(ElasticsearchIndexWriter.class);
- private final ElasticsearchIndexDescriptor indexDescriptor;
+ private final ElasticsearchConnection elasticsearchConnection;
+ private final ElasticsearchIndexDefinition indexDefinition;
- private final boolean isAsync;
+ private final BulkProcessor bulkProcessor;
- // TODO: use bulk API - https://www.elastic.co/guide/en/elasticsearch/client/java-api/current/java-docs-bulk-processor.html
- ElasticsearchIndexWriter(@NotNull IndexDefinition indexDefinition,
- @NotNull ElasticsearchConnection elasticsearchConnection) {
- indexDescriptor = new ElasticsearchIndexDescriptor(elasticsearchConnection, indexDefinition);
+ ElasticsearchIndexWriter(@NotNull ElasticsearchConnection elasticsearchConnection,
+ @NotNull ElasticsearchIndexDefinition indexDefinition) {
+ this.elasticsearchConnection = elasticsearchConnection;
+ this.indexDefinition = indexDefinition;
+ bulkProcessor = initBulkProcessor();
+ }
+
+ @TestOnly
+ ElasticsearchIndexWriter(@NotNull ElasticsearchConnection elasticsearchConnection,
+ @NotNull ElasticsearchIndexDefinition indexDefinition,
+ @NotNull BulkProcessor bulkProcessor) {
+ this.elasticsearchConnection = elasticsearchConnection;
+ this.indexDefinition = indexDefinition;
+ this.bulkProcessor = bulkProcessor;
+ }
- // TODO: ES indexing put another bit delay before docs appear in search.
- // For test without "async" indexing, we can use following hack BUT those where we
- // would setup async, we'd need to find another way.
- isAsync = indexDefinition.getDefinitionNodeState().getProperty("async") != null;
+ private BulkProcessor initBulkProcessor() {
+ return BulkProcessor.builder((request, bulkListener) ->
+ elasticsearchConnection.getClient().bulkAsync(request, RequestOptions.DEFAULT, bulkListener),
+ new OakBulkProcessorLister())
+ .setBulkActions(indexDefinition.bulkActions)
+ .setBulkSize(new ByteSizeValue(indexDefinition.bulkSizeBytes))
+ .setFlushInterval(TimeValue.timeValueMillis(indexDefinition.bulkFlushIntervalMs))
+ .setBackoffPolicy(BackoffPolicy.exponentialBackoff(
+ TimeValue.timeValueMillis(indexDefinition.bulkRetriesBackoff), indexDefinition.bulkRetries)
+ )
+ .build();
}
@Override
public void updateDocument(String path, ElasticsearchDocument doc) throws IOException {
- IndexRequest request = new IndexRequest(indexDescriptor.getIndexName())
+ IndexRequest request = new IndexRequest(indexDefinition.getRemoteIndexName())
.id(pathToId(path))
- // immediate refresh would slow indexing response such that next
- // search would see the effect of this indexed doc. Must only get
- // enabled in tests (hopefully there are no non-async indexes in real life)
- .setRefreshPolicy(isAsync ? NONE : IMMEDIATE)
.source(doc.build(), XContentType.JSON);
- IndexResponse response = indexDescriptor.getClient().index(request, RequestOptions.DEFAULT);
- LOG.trace("update {} - {}. Response: {}", path, doc, response);
+ bulkProcessor.add(request);
}
@Override
public void deleteDocuments(String path) throws IOException {
- DeleteRequest request = new DeleteRequest(indexDescriptor.getIndexName())
- .id(pathToId(path))
- // immediate refresh would slow indexing response such that next
- // search would see the effect of this indexed doc. Must only get
- // enabled in tests (hopefully there are no non-async indexes in real life)
- .setRefreshPolicy(isAsync ? NONE : IMMEDIATE);
- DeleteResponse response = indexDescriptor.getClient().delete(request, RequestOptions.DEFAULT);
- LOG.trace("delete {}. Response: {}", path, response);
-
+ DeleteRequest request = new DeleteRequest(indexDefinition.getRemoteIndexName())
+ .id(pathToId(path));
+ bulkProcessor.add(request);
}
@Override
@@ -99,7 +112,7 @@ public class ElasticsearchIndexWriter implements FulltextIndexWriter {}", executionId, bulkRequest.getDescription());
+ if (LOG.isTraceEnabled()) {
+ LOG.trace("Bulk Requests: \n{}", bulkRequest.requests()
+ .stream()
+ .map(DocWriteRequest::toString)
+ .collect(Collectors.joining("\n"))
+ );
+ }
+ }
- LOG.info("Updated settings {}. Response acknowledged: {}", requestMsg, response.isAcknowledged());
+ @Override
+ public void afterBulk(long executionId, BulkRequest bulkRequest, BulkResponse bulkResponse) {
+ LOG.info("Bulk with id {} processed with status {} in {}", executionId, bulkResponse.status(), bulkResponse.getTook());
+ if (LOG.isTraceEnabled()) {
+ try {
+ LOG.trace(Strings.toString(bulkResponse.toXContent(jsonBuilder(), EMPTY_PARAMS)));
+ } catch (IOException e) {
+ LOG.error("Error decoding bulk response", e);
+ }
+ }
+ if (bulkResponse.hasFailures()) { // check if some operations failed to execute
+ for (BulkItemResponse bulkItemResponse : bulkResponse) {
+ if (bulkItemResponse.isFailed()) {
+ BulkItemResponse.Failure failure = bulkItemResponse.getFailure();
+ LOG.error("Bulk item with id {} failed", failure.getId(), failure.getCause());
+ }
+ }
+ }
+ }
+
+ @Override
+ public void afterBulk(long executionId, BulkRequest bulkRequest, Throwable throwable) {
+ LOG.error("Bulk with id {} thrown an error", executionId, throwable);
+ }
}
}
diff --git a/oak-search-elastic/src/main/java/org/apache/jackrabbit/oak/plugins/index/elasticsearch/index/ElasticsearchIndexWriterFactory.java b/oak-search-elastic/src/main/java/org/apache/jackrabbit/oak/plugins/index/elasticsearch/index/ElasticsearchIndexWriterFactory.java
index bac172c64d..e1540c1e2e 100644
--- a/oak-search-elastic/src/main/java/org/apache/jackrabbit/oak/plugins/index/elasticsearch/index/ElasticsearchIndexWriterFactory.java
+++ b/oak-search-elastic/src/main/java/org/apache/jackrabbit/oak/plugins/index/elasticsearch/index/ElasticsearchIndexWriterFactory.java
@@ -17,12 +17,13 @@
package org.apache.jackrabbit.oak.plugins.index.elasticsearch.index;
import org.apache.jackrabbit.oak.plugins.index.elasticsearch.ElasticsearchConnection;
+import org.apache.jackrabbit.oak.plugins.index.elasticsearch.ElasticsearchIndexDefinition;
import org.apache.jackrabbit.oak.plugins.index.search.IndexDefinition;
import org.apache.jackrabbit.oak.plugins.index.search.spi.editor.FulltextIndexWriterFactory;
import org.apache.jackrabbit.oak.spi.state.NodeBuilder;
import org.jetbrains.annotations.NotNull;
-public class ElasticsearchIndexWriterFactory implements FulltextIndexWriterFactory {
+class ElasticsearchIndexWriterFactory implements FulltextIndexWriterFactory {
private final ElasticsearchConnection elasticsearchConnection;
ElasticsearchIndexWriterFactory(@NotNull ElasticsearchConnection elasticsearchConnection) {
@@ -31,6 +32,10 @@ public class ElasticsearchIndexWriterFactory implements FulltextIndexWriterFacto
@Override
public ElasticsearchIndexWriter newInstance(IndexDefinition definition, NodeBuilder definitionBuilder, boolean reindex) {
- return new ElasticsearchIndexWriter(definition, elasticsearchConnection);
+ if (!(definition instanceof ElasticsearchIndexDefinition)) {
+ throw new IllegalArgumentException("IndexDefinition must be of type ElasticsearchIndexDefinition " +
+ "instead of " + definition.getClass().getName());
+ }
+ return new ElasticsearchIndexWriter(elasticsearchConnection, (ElasticsearchIndexDefinition) definition);
}
}
diff --git a/oak-search-elastic/src/main/java/org/apache/jackrabbit/oak/plugins/index/elasticsearch/query/ElasticsearchIndex.java b/oak-search-elastic/src/main/java/org/apache/jackrabbit/oak/plugins/index/elasticsearch/query/ElasticsearchIndex.java
index 4e103a5252..61da4b83a0 100644
--- a/oak-search-elastic/src/main/java/org/apache/jackrabbit/oak/plugins/index/elasticsearch/query/ElasticsearchIndex.java
+++ b/oak-search-elastic/src/main/java/org/apache/jackrabbit/oak/plugins/index/elasticsearch/query/ElasticsearchIndex.java
@@ -35,9 +35,9 @@ import java.util.WeakHashMap;
import java.util.function.Predicate;
import static org.apache.jackrabbit.oak.plugins.index.IndexConstants.TYPE_PROPERTY_NAME;
-import static org.apache.jackrabbit.oak.plugins.index.elasticsearch.ElasticsearchIndexConstants.TYPE_ELASTICSEARCH;
+import static org.apache.jackrabbit.oak.plugins.index.elasticsearch.ElasticsearchIndexDefinition.TYPE_ELASTICSEARCH;
-public class ElasticsearchIndex extends FulltextIndex {
+class ElasticsearchIndex extends FulltextIndex {
private static final Predicate ELASTICSEARCH_INDEX_DEFINITION_PREDICATE =
state -> TYPE_ELASTICSEARCH.equals(state.getString(TYPE_PROPERTY_NAME));
private static final Map estimators = new WeakHashMap<>();
diff --git a/oak-search-elastic/src/main/java/org/apache/jackrabbit/oak/plugins/index/elasticsearch/query/ElasticsearchIndexNode.java b/oak-search-elastic/src/main/java/org/apache/jackrabbit/oak/plugins/index/elasticsearch/query/ElasticsearchIndexNode.java
index 11d589ccf2..e87647e53a 100644
--- a/oak-search-elastic/src/main/java/org/apache/jackrabbit/oak/plugins/index/elasticsearch/query/ElasticsearchIndexNode.java
+++ b/oak-search-elastic/src/main/java/org/apache/jackrabbit/oak/plugins/index/elasticsearch/query/ElasticsearchIndexNode.java
@@ -18,7 +18,6 @@ package org.apache.jackrabbit.oak.plugins.index.elasticsearch.query;
import org.apache.jackrabbit.oak.plugins.index.elasticsearch.ElasticsearchConnection;
import org.apache.jackrabbit.oak.plugins.index.elasticsearch.ElasticsearchIndexDefinition;
-import org.apache.jackrabbit.oak.plugins.index.elasticsearch.ElasticsearchIndexDescriptor;
import org.apache.jackrabbit.oak.plugins.index.search.IndexNode;
import org.apache.jackrabbit.oak.plugins.index.search.IndexStatistics;
import org.apache.jackrabbit.oak.spi.state.NodeState;
@@ -26,16 +25,16 @@ import org.apache.jackrabbit.oak.spi.state.NodeStateUtils;
import org.jetbrains.annotations.NotNull;
import org.jetbrains.annotations.Nullable;
-public class ElasticsearchIndexNode implements IndexNode {
+class ElasticsearchIndexNode implements IndexNode {
+ private final ElasticsearchConnection elasticsearchConnection;
private final ElasticsearchIndexDefinition indexDefinition;
- private final ElasticsearchIndexDescriptor indexDescriptor;
- protected ElasticsearchIndexNode(@NotNull NodeState root, @NotNull String indexPath,
+ ElasticsearchIndexNode(@NotNull NodeState root, @NotNull String indexPath,
@NotNull ElasticsearchConnection elasticsearchConnection) {
final NodeState indexNS = NodeStateUtils.getNode(root, indexPath);
this.indexDefinition = new ElasticsearchIndexDefinition(root, indexNS, indexPath);
- this.indexDescriptor = new ElasticsearchIndexDescriptor(elasticsearchConnection, indexDefinition);
+ this.elasticsearchConnection = elasticsearchConnection;
}
@Override
@@ -48,8 +47,8 @@ public class ElasticsearchIndexNode implements IndexNode {
return indexDefinition;
}
- public ElasticsearchIndexDescriptor getIndexDescriptor() {
- return indexDescriptor;
+ public ElasticsearchConnection getConnection() {
+ return elasticsearchConnection;
}
@Override
@@ -61,6 +60,6 @@ public class ElasticsearchIndexNode implements IndexNode {
@Override
public @Nullable IndexStatistics getIndexStatistics() {
- return new ElasticsearchIndexStatistics(indexDescriptor);
+ return new ElasticsearchIndexStatistics(elasticsearchConnection);
}
}
diff --git a/oak-search-elastic/src/main/java/org/apache/jackrabbit/oak/plugins/index/elasticsearch/query/ElasticsearchIndexStatistics.java b/oak-search-elastic/src/main/java/org/apache/jackrabbit/oak/plugins/index/elasticsearch/query/ElasticsearchIndexStatistics.java
index caf1eef994..a898044ecd 100644
--- a/oak-search-elastic/src/main/java/org/apache/jackrabbit/oak/plugins/index/elasticsearch/query/ElasticsearchIndexStatistics.java
+++ b/oak-search-elastic/src/main/java/org/apache/jackrabbit/oak/plugins/index/elasticsearch/query/ElasticsearchIndexStatistics.java
@@ -16,20 +16,21 @@
*/
package org.apache.jackrabbit.oak.plugins.index.elasticsearch.query;
-import org.apache.jackrabbit.oak.plugins.index.elasticsearch.ElasticsearchIndexDescriptor;
+import org.apache.jackrabbit.oak.plugins.index.elasticsearch.ElasticsearchConnection;
import org.apache.jackrabbit.oak.plugins.index.search.IndexStatistics;
import org.elasticsearch.client.RequestOptions;
import org.elasticsearch.client.core.CountRequest;
import org.elasticsearch.client.core.CountResponse;
import org.elasticsearch.index.query.QueryBuilders;
+import org.jetbrains.annotations.NotNull;
import java.io.IOException;
-public class ElasticsearchIndexStatistics implements IndexStatistics {
- private final ElasticsearchIndexDescriptor elasticsearchIndexDescriptor;
+class ElasticsearchIndexStatistics implements IndexStatistics {
+ private final ElasticsearchConnection elasticsearchConnection;
- ElasticsearchIndexStatistics(ElasticsearchIndexDescriptor elasticsearchIndexDescriptor) {
- this.elasticsearchIndexDescriptor = elasticsearchIndexDescriptor;
+ ElasticsearchIndexStatistics(@NotNull ElasticsearchConnection elasticsearchConnection) {
+ this.elasticsearchConnection = elasticsearchConnection;
}
@Override
@@ -37,7 +38,7 @@ public class ElasticsearchIndexStatistics implements IndexStatistics {
CountRequest countRequest = new CountRequest();
countRequest.query(QueryBuilders.matchAllQuery());
try {
- CountResponse count = elasticsearchIndexDescriptor.getClient().count(countRequest, RequestOptions.DEFAULT);
+ CountResponse count = elasticsearchConnection.getClient().count(countRequest, RequestOptions.DEFAULT);
return (int) count.getCount();
} catch (IOException e) {
// ignore failure
@@ -50,7 +51,7 @@ public class ElasticsearchIndexStatistics implements IndexStatistics {
CountRequest countRequest = new CountRequest();
countRequest.query(QueryBuilders.existsQuery(key));
try {
- CountResponse count = elasticsearchIndexDescriptor.getClient().count(countRequest, RequestOptions.DEFAULT);
+ CountResponse count = elasticsearchConnection.getClient().count(countRequest, RequestOptions.DEFAULT);
return (int) count.getCount();
} catch (IOException e) {
// ignore failure
diff --git a/oak-search-elastic/src/main/java/org/apache/jackrabbit/oak/plugins/index/elasticsearch/query/ElasticsearchResultRowIterator.java b/oak-search-elastic/src/main/java/org/apache/jackrabbit/oak/plugins/index/elasticsearch/query/ElasticsearchResultRowIterator.java
index ddc5a53e98..fb3c5bd98a 100644
--- a/oak-search-elastic/src/main/java/org/apache/jackrabbit/oak/plugins/index/elasticsearch/query/ElasticsearchResultRowIterator.java
+++ b/oak-search-elastic/src/main/java/org/apache/jackrabbit/oak/plugins/index/elasticsearch/query/ElasticsearchResultRowIterator.java
@@ -83,7 +83,7 @@ import static org.elasticsearch.index.query.QueryBuilders.matchAllQuery;
import static org.elasticsearch.index.query.QueryBuilders.matchQuery;
import static org.elasticsearch.index.query.QueryBuilders.termQuery;
-public class ElasticsearchResultRowIterator implements Iterator {
+class ElasticsearchResultRowIterator implements Iterator {
private static final Logger LOG = LoggerFactory
.getLogger(ElasticsearchResultRowIterator.class);
private static final PerfLogger PERF_LOGGER =
diff --git a/oak-search-elastic/src/main/java/org/apache/jackrabbit/oak/plugins/index/elasticsearch/query/ElasticsearchSearcher.java b/oak-search-elastic/src/main/java/org/apache/jackrabbit/oak/plugins/index/elasticsearch/query/ElasticsearchSearcher.java
index c69e954b4e..cb05399d3a 100644
--- a/oak-search-elastic/src/main/java/org/apache/jackrabbit/oak/plugins/index/elasticsearch/query/ElasticsearchSearcher.java
+++ b/oak-search-elastic/src/main/java/org/apache/jackrabbit/oak/plugins/index/elasticsearch/query/ElasticsearchSearcher.java
@@ -16,7 +16,6 @@
*/
package org.apache.jackrabbit.oak.plugins.index.elasticsearch.query;
-import org.apache.jackrabbit.oak.plugins.index.elasticsearch.ElasticsearchIndexDescriptor;
import org.apache.jackrabbit.oak.plugins.index.search.FieldNames;
import org.elasticsearch.action.search.SearchRequest;
import org.elasticsearch.action.search.SearchResponse;
@@ -27,11 +26,11 @@ import org.jetbrains.annotations.NotNull;
import java.io.IOException;
-public class ElasticsearchSearcher {
- private final ElasticsearchIndexDescriptor indexDescriptor;
+class ElasticsearchSearcher {
+ private final ElasticsearchIndexNode indexNode;
ElasticsearchSearcher(@NotNull ElasticsearchIndexNode indexNode) {
- indexDescriptor = indexNode.getIndexDescriptor();
+ this.indexNode = indexNode;
}
public SearchResponse search(QueryBuilder query, int batchSize) throws IOException {
@@ -41,9 +40,9 @@ public class ElasticsearchSearcher {
.storedField(FieldNames.PATH)
.size(batchSize);
- SearchRequest request = new SearchRequest(indexDescriptor.getIndexName())
+ SearchRequest request = new SearchRequest(indexNode.getDefinition().getRemoteIndexName())
.source(searchSourceBuilder);
- return indexDescriptor.getClient().search(request, RequestOptions.DEFAULT);
+ return indexNode.getConnection().getClient().search(request, RequestOptions.DEFAULT);
}
}
diff --git a/oak-search-elastic/src/main/java/org/apache/jackrabbit/oak/plugins/index/elasticsearch/util/ElasticsearchIndexDefinitionBuilder.java b/oak-search-elastic/src/main/java/org/apache/jackrabbit/oak/plugins/index/elasticsearch/util/ElasticsearchIndexDefinitionBuilder.java
index d41b766fb3..389dc1c653 100644
--- a/oak-search-elastic/src/main/java/org/apache/jackrabbit/oak/plugins/index/elasticsearch/util/ElasticsearchIndexDefinitionBuilder.java
+++ b/oak-search-elastic/src/main/java/org/apache/jackrabbit/oak/plugins/index/elasticsearch/util/ElasticsearchIndexDefinitionBuilder.java
@@ -18,8 +18,11 @@ package org.apache.jackrabbit.oak.plugins.index.elasticsearch.util;
import org.apache.jackrabbit.oak.plugins.index.search.util.IndexDefinitionBuilder;
-import static org.apache.jackrabbit.oak.plugins.index.elasticsearch.ElasticsearchIndexConstants.TYPE_ELASTICSEARCH;
+import static org.apache.jackrabbit.oak.plugins.index.elasticsearch.ElasticsearchIndexDefinition.TYPE_ELASTICSEARCH;
+/**
+ * Utility class to create Elasticsearch Index Definitions along with the relevant node structure.
+ */
public class ElasticsearchIndexDefinitionBuilder extends IndexDefinitionBuilder {
@Override
protected String getIndexType() {
diff --git a/oak-search-elastic/src/test/java/org/apache/jackrabbit/oak/plugins/index/elasticsearch/ElasticsearchPropertyIndexTest.java b/oak-search-elastic/src/test/java/org/apache/jackrabbit/oak/plugins/index/elasticsearch/ElasticsearchPropertyIndexTest.java
index d72ff7a54e..8145e8db5b 100644
--- a/oak-search-elastic/src/test/java/org/apache/jackrabbit/oak/plugins/index/elasticsearch/ElasticsearchPropertyIndexTest.java
+++ b/oak-search-elastic/src/test/java/org/apache/jackrabbit/oak/plugins/index/elasticsearch/ElasticsearchPropertyIndexTest.java
@@ -35,7 +35,6 @@ import org.apache.jackrabbit.oak.spi.security.OpenSecurityProvider;
import org.apache.jackrabbit.oak.spi.state.NodeBuilder;
import org.apache.jackrabbit.oak.spi.state.NodeStore;
import org.elasticsearch.Version;
-import org.junit.Assert;
import org.junit.BeforeClass;
import org.junit.Rule;
import org.junit.Test;
@@ -48,6 +47,7 @@ import java.util.Arrays;
import static java.util.Collections.singletonList;
import static org.apache.jackrabbit.oak.plugins.index.IndexConstants.INDEX_DEFINITIONS_NAME;
+import static org.apache.jackrabbit.oak.plugins.index.elasticsearch.ElasticsearchIndexDefinition.BULK_FLUSH_INTERVAL_MS_DEFAULT;
import static org.apache.jackrabbit.oak.plugins.index.search.FulltextIndexConstants.PROPDEF_PROP_NODE_NAME;
import static org.hamcrest.CoreMatchers.containsString;
import static org.hamcrest.MatcherAssert.assertThat;
@@ -118,12 +118,15 @@ public class ElasticsearchPropertyIndexTest extends AbstractQueryTest {
root.commit();
String propaQuery = "select [jcr:path] from [nt:base] where [propa] = 'foo'";
+
+ assertEventually(() -> {
assertThat(explain(propaQuery), containsString("elasticsearch:test1"));
assertThat(explain("select [jcr:path] from [nt:base] where [propc] = 'foo'"), containsString("elasticsearch:test2"));
assertQuery(propaQuery, Arrays.asList("/test/a", "/test/b"));
assertQuery("select [jcr:path] from [nt:base] where [propa] = 'foo2'", singletonList("/test/c"));
assertQuery("select [jcr:path] from [nt:base] where [propc] = 'foo'", singletonList("/test/d"));
+ });
}
//OAK-3825
@@ -148,10 +151,13 @@ public class ElasticsearchPropertyIndexTest extends AbstractQueryTest {
String queryPrefix = "select [jcr:path] from [nt:base] where ISDESCENDANTNODE('/test') AND ";
//test
String propabQuery = queryPrefix + "LOCALNAME() = 'foo'";
+
+ assertEventually(() -> {
String explanation = explain(propabQuery);
- Assert.assertThat(explanation, containsString("elasticsearch:test1(/oak:index/test1) "));
- Assert.assertThat(explanation, containsString("{\"term\":{\":nodeName\":{\"value\":\"foo\","));
+ assertThat(explanation, containsString("elasticsearch:test1(/oak:index/test1) "));
+ assertThat(explanation, containsString("{\"term\":{\":nodeName\":{\"value\":\"foo\","));
assertQuery(propabQuery, singletonList("/test/foo"));
+
assertQuery(queryPrefix + "LOCALNAME() = 'bar'", singletonList("/test/sc/bar"));
assertQuery(queryPrefix + "LOCALNAME() LIKE 'foo'", singletonList("/test/foo"));
assertQuery(queryPrefix + "LOCALNAME() LIKE 'camel%'", singletonList("/test/camelCase"));
@@ -159,6 +165,7 @@ public class ElasticsearchPropertyIndexTest extends AbstractQueryTest {
assertQuery(queryPrefix + "NAME() = 'bar'", singletonList("/test/sc/bar"));
assertQuery(queryPrefix + "NAME() LIKE 'foo'", singletonList("/test/foo"));
assertQuery(queryPrefix + "NAME() LIKE 'camel%'", singletonList("/test/camelCase"));
+ });
}
@Test
@@ -171,7 +178,8 @@ public class ElasticsearchPropertyIndexTest extends AbstractQueryTest {
test.addChild("b");
root.commit();
- Assert.assertThat(explain("select [jcr:path] from [nt:base] where [propa] = 'foo'"), containsString("elasticsearch:test1"));
+ assertEventually(() -> assertThat(explain("select [jcr:path] from [nt:base] where [propa] = 'foo'"),
+ containsString("elasticsearch:test1")));
}
@Test
@@ -184,7 +192,8 @@ public class ElasticsearchPropertyIndexTest extends AbstractQueryTest {
test.addChild("c").setProperty("propb", "e");
root.commit();
- assertQuery("select [jcr:path] from [nt:base] where propa is not null", Arrays.asList("/test/a", "/test/b"));
+ assertEventually(() -> assertQuery("select [jcr:path] from [nt:base] where propa is not null",
+ Arrays.asList("/test/a", "/test/b")));
}
private static IndexDefinitionBuilder createIndex(String... propNames) {
@@ -204,4 +213,36 @@ public class ElasticsearchPropertyIndexTest extends AbstractQueryTest {
String explain = "explain " + query;
return executeQuery(explain, "JCR-SQL2").get(0);
}
+
+ private static void assertEventually(Runnable r) {
+ assertEventually(r, BULK_FLUSH_INTERVAL_MS_DEFAULT * 3);
+ }
+
+ private static void assertEventually(Runnable r, long timeoutMillis) {
+ final long start = System.currentTimeMillis();
+ long lastAttempt = 0;
+ int attempts = 0;
+
+ while (true) {
+ try {
+ attempts++;
+ lastAttempt = System.currentTimeMillis();
+ r.run();
+ return;
+ } catch (Throwable e) {
+ long elapsedTime = lastAttempt - start;
+ if (elapsedTime >= timeoutMillis) {
+ String msg = String.format("Condition not satisfied after %1.2f seconds and %d attempts",
+ elapsedTime / 1000d, attempts);
+ throw new AssertionError(msg, e);
+ }
+ try {
+ Thread.sleep(100);
+ } catch (InterruptedException ex) {
+ ex.printStackTrace();
+ }
+
+ }
+ }
+ }
}
diff --git a/oak-search-elastic/src/test/java/org/apache/jackrabbit/oak/plugins/index/elasticsearch/index/ElasticsearchIndexWriterTest.java b/oak-search-elastic/src/test/java/org/apache/jackrabbit/oak/plugins/index/elasticsearch/index/ElasticsearchIndexWriterTest.java
new file mode 100644
index 0000000000..bab1a211e6
--- /dev/null
+++ b/oak-search-elastic/src/test/java/org/apache/jackrabbit/oak/plugins/index/elasticsearch/index/ElasticsearchIndexWriterTest.java
@@ -0,0 +1,95 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.jackrabbit.oak.plugins.index.elasticsearch.index;
+
+import org.apache.jackrabbit.oak.plugins.index.elasticsearch.ElasticsearchConnection;
+import org.apache.jackrabbit.oak.plugins.index.elasticsearch.ElasticsearchIndexDefinition;
+import org.elasticsearch.action.bulk.BulkProcessor;
+import org.elasticsearch.action.delete.DeleteRequest;
+import org.elasticsearch.action.index.IndexRequest;
+import org.junit.Before;
+import org.junit.Test;
+import org.mockito.ArgumentCaptor;
+import org.mockito.Mock;
+import org.mockito.MockitoAnnotations;
+
+import java.io.IOException;
+import java.net.URLEncoder;
+
+import static org.junit.Assert.assertEquals;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+public class ElasticsearchIndexWriterTest {
+
+ @Mock
+ private ElasticsearchConnection elasticsearchConnectionMock;
+
+ @Mock
+ private ElasticsearchIndexDefinition indexDefinitionMock;
+
+ @Mock
+ private BulkProcessor bulkProcessorMock;
+
+ private ElasticsearchIndexWriter indexWriter;
+
+ @Before
+ public void setUp() {
+ MockitoAnnotations.initMocks(this);
+ when(indexDefinitionMock.getRemoteIndexName()).thenReturn("test-index");
+ indexWriter = new ElasticsearchIndexWriter(elasticsearchConnectionMock, indexDefinitionMock, bulkProcessorMock);
+ }
+
+ @Test
+ public void singleUpdateDocument() throws IOException {
+ indexWriter.updateDocument("/foo", new ElasticsearchDocument("/foo"));
+
+ ArgumentCaptor acIndexRequest = ArgumentCaptor.forClass(IndexRequest.class);
+ verify(bulkProcessorMock).add(acIndexRequest.capture());
+
+ IndexRequest request = acIndexRequest.getValue();
+ assertEquals(request.index(), "test-index");
+ assertEquals(request.id(), URLEncoder.encode("/foo", "UTF-8"));
+ }
+
+ @Test
+ public void singleDeleteDocument() throws IOException {
+ indexWriter.deleteDocuments("/bar");
+
+ ArgumentCaptor acDeleteRequest = ArgumentCaptor.forClass(DeleteRequest.class);
+ verify(bulkProcessorMock).add(acDeleteRequest.capture());
+
+ DeleteRequest request = acDeleteRequest.getValue();
+ assertEquals(request.index(), "test-index");
+ assertEquals(request.id(), URLEncoder.encode("/bar", "UTF-8"));
+ }
+
+ @Test
+ public void multiRequests() throws IOException {
+ indexWriter.updateDocument("/foo", new ElasticsearchDocument("/foo"));
+ indexWriter.updateDocument("/bar", new ElasticsearchDocument("/bar"));
+ indexWriter.deleteDocuments("/foo");
+ indexWriter.deleteDocuments("/bar");
+
+ ArgumentCaptor acIndexRequest = ArgumentCaptor.forClass(IndexRequest.class);
+ verify(bulkProcessorMock, times(2)).add(acIndexRequest.capture());
+ ArgumentCaptor acDeleteRequest = ArgumentCaptor.forClass(DeleteRequest.class);
+ verify(bulkProcessorMock, times(2)).add(acDeleteRequest.capture());
+ }
+
+}
diff --git a/oak-search-elastic/src/test/resources/logback-test.xml b/oak-search-elastic/src/test/resources/logback-test.xml
new file mode 100644
index 0000000000..e92401c93e
--- /dev/null
+++ b/oak-search-elastic/src/test/resources/logback-test.xml
@@ -0,0 +1,39 @@
+
+
+
+
+
+ %date{HH:mm:ss.SSS} %-5level %-40([%thread] %F:%L) %msg%n
+
+
+
+
+ target/unit-tests.log
+
+ %date{HH:mm:ss.SSS} %-5level %-40([%thread] %F:%L) %msg%n
+
+
+
+
+
+
+
+
+