diff --git oak-benchmarks-elastic/src/main/java/org/apache/jackrabbit/oak/benchmark/ElasticBenchmarkRunner.java oak-benchmarks-elastic/src/main/java/org/apache/jackrabbit/oak/benchmark/ElasticBenchmarkRunner.java index 2fea1c9413..29855a589d 100644 --- oak-benchmarks-elastic/src/main/java/org/apache/jackrabbit/oak/benchmark/ElasticBenchmarkRunner.java +++ oak-benchmarks-elastic/src/main/java/org/apache/jackrabbit/oak/benchmark/ElasticBenchmarkRunner.java @@ -24,7 +24,7 @@ import java.util.Arrays; public class ElasticBenchmarkRunner extends BenchmarkRunner { - private static ElasticsearchConnection coordinate; + private static ElasticsearchConnection connection; public static void main(String[] args) throws Exception { initOptionSet(args); @@ -36,8 +36,17 @@ public class ElasticBenchmarkRunner extends BenchmarkRunner { // we have orphaned HttpClient's I/O disp threads that don't let the process exit. try { - coordinate = new ElasticsearchConnection(benchmarkOptions.getElasticScheme().value(options), - benchmarkOptions.getElasticHost().value(options), benchmarkOptions.getElasticPort().value(options), "Benchmark"); + connection = ElasticsearchConnection.newBuilder() + .withIndexPrefix("benchmark") + .withConnectionParameters( + benchmarkOptions.getElasticScheme().value(options), + benchmarkOptions.getElasticHost().value(options), + benchmarkOptions.getElasticPort().value(options) + ) + .withApiKeys( + benchmarkOptions.getElasticApiKeyId().value(options), + benchmarkOptions.getElasticApiKeySecret().value(options) + ).build(); BenchmarkRunner.addToBenchMarkList( Arrays.asList( @@ -45,32 +54,32 @@ public class ElasticBenchmarkRunner extends BenchmarkRunner { benchmarkOptions.getFlatStructure().value(options), benchmarkOptions.getReport().value(options), benchmarkOptions.getWithStorage().value(options), - coordinate), + connection), new ElasticPropertyFTIndexedContentAvailability(benchmarkOptions.getWikipedia().value(options), benchmarkOptions.getFlatStructure().value(options), benchmarkOptions.getReport().value(options), benchmarkOptions.getWithStorage().value(options), - coordinate), + connection), new ElasticPropertyFTSeparatedIndexedContentAvailability(benchmarkOptions.getWikipedia().value(options), benchmarkOptions.getFlatStructure().value(options), benchmarkOptions.getReport().value(options), benchmarkOptions.getWithStorage().value(options), - coordinate), + connection), new ElasticFullTextWithoutGlobalIndexSearchTest(benchmarkOptions.getWikipedia().value(options), benchmarkOptions.getFlatStructure().value(options), benchmarkOptions.getReport().value(options), benchmarkOptions.getWithStorage().value(options), - coordinate), + connection), new ElasticPropertyTextSearchTest(benchmarkOptions.getWikipedia().value(options), benchmarkOptions.getFlatStructure().value(options), benchmarkOptions.getReport().value(options), benchmarkOptions.getWithStorage().value(options), - coordinate) + connection) ) ); BenchmarkRunner.main(args); } finally { - coordinate.close(); + connection.close(); } } diff --git oak-benchmarks/src/main/java/org/apache/jackrabbit/oak/benchmark/BenchmarkOptions.java oak-benchmarks/src/main/java/org/apache/jackrabbit/oak/benchmark/BenchmarkOptions.java index 89cd84decc..e5f96fc0fc 100644 --- oak-benchmarks/src/main/java/org/apache/jackrabbit/oak/benchmark/BenchmarkOptions.java +++ oak-benchmarks/src/main/java/org/apache/jackrabbit/oak/benchmark/BenchmarkOptions.java @@ -97,6 +97,16 @@ public class BenchmarkOptions { private final OptionSpec elasticHost; private final OptionSpec elasticScheme; private final OptionSpec elasticPort; + private final OptionSpec elasticApiKeyId; + private final OptionSpec elasticApiKeySecret; + + public OptionSpec getElasticApiKeyId() { + return elasticApiKeyId; + } + + public OptionSpec getElasticApiKeySecret() { + return elasticApiKeySecret; + } public OptionSpec getElasticHost() { return elasticHost; @@ -502,6 +512,10 @@ public class BenchmarkOptions { .ofType(String.class); elasticPort = parser.accepts("elasticPort", "Elastic scheme").withOptionalArg() .ofType(Integer.class); + elasticApiKeyId = parser.accepts("elasticApiKeyId", "Elastic unique id of the API key").withOptionalArg() + .ofType(String.class); + elasticApiKeySecret = parser.accepts("elasticApiKeySecret", "Elastic generated API secret").withOptionalArg() + .ofType(String.class); } } diff --git oak-search-elastic/src/main/java/org/apache/jackrabbit/oak/plugins/index/elasticsearch/ElasticsearchConnection.java oak-search-elastic/src/main/java/org/apache/jackrabbit/oak/plugins/index/elasticsearch/ElasticsearchConnection.java index a698ad7a9a..27c2461836 100644 --- oak-search-elastic/src/main/java/org/apache/jackrabbit/oak/plugins/index/elasticsearch/ElasticsearchConnection.java +++ oak-search-elastic/src/main/java/org/apache/jackrabbit/oak/plugins/index/elasticsearch/ElasticsearchConnection.java @@ -16,55 +16,78 @@ */ package org.apache.jackrabbit.oak.plugins.index.elasticsearch; +import org.apache.http.Header; import org.apache.http.HttpHost; +import org.apache.http.message.BasicHeader; import org.elasticsearch.client.RestClient; +import org.elasticsearch.client.RestClientBuilder; import org.elasticsearch.client.RestHighLevelClient; +import org.jetbrains.annotations.NotNull; import java.io.Closeable; import java.io.IOException; +import java.nio.charset.StandardCharsets; +import java.util.Base64; import java.util.Objects; import java.util.concurrent.atomic.AtomicBoolean; -import java.util.function.Supplier; /** * This class represents an Elasticsearch Connection with the related RestHighLevelClient. * As per Elasticsearch documentation: the client is thread-safe, there should be one instance per application and it * must be closed when it is not needed anymore. * https://www.elastic.co/guide/en/elasticsearch/client/java-rest/current/_changing_the_client_8217_s_initialization_code.html - * + *

* The getClient() initializes the rest client on the first call. * Once close() is invoked this instance cannot be used anymore. */ public class ElasticsearchConnection implements Closeable { - static final String SCHEME_PROP = "elasticsearch.scheme"; - static final String DEFAULT_SCHEME = "http"; - static final String HOST_PROP = "elasticsearch.host"; - static final String DEFAULT_HOST = "127.0.0.1"; - static final String PORT_PROP = "elasticsearch.port"; - static final int DEFAULT_PORT = 9200; - - static final Supplier defaultConnection = () -> - new ElasticsearchConnection(DEFAULT_SCHEME, DEFAULT_HOST, DEFAULT_PORT, "elastic"); + protected static final String SCHEME_PROP = "elasticsearch.scheme"; + protected static final String DEFAULT_SCHEME = "http"; + protected static final String HOST_PROP = "elasticsearch.host"; + protected static final String DEFAULT_HOST = "127.0.0.1"; + protected static final String PORT_PROP = "elasticsearch.port"; + protected static final int DEFAULT_PORT = 9200; + protected static final String API_KEY_ID_PROP = "elasticsearch.apiKeyId"; + protected static final String DEFAULT_API_KEY_ID = ""; + protected static final String API_KEY_SECRET_PROP = "elasticsearch.apiKeySecret"; + protected static final String DEFAULT_API_KEY_SECRET = ""; - private String scheme; - private String host; - private int port; private final String indexPrefix; + private final String scheme; + private final String host; + private final int port; + + // API key credentials + private final String apiKeyId; + private final String apiKeySecret; private volatile RestHighLevelClient client; - private AtomicBoolean isClosed = new AtomicBoolean(false); + private final AtomicBoolean isClosed = new AtomicBoolean(false); - public ElasticsearchConnection(String scheme, String host, Integer port, String indexPrefix) { - Objects.requireNonNull(scheme, "Scheme is null"); - Objects.requireNonNull(host, "Host is null"); - Objects.requireNonNull(port, "Port is null"); - Objects.requireNonNull(indexPrefix, "Index prefix is null"); + /** + * Creates an {@code ElasticsearchConnection} instance with the given scheme, host address and port that support API + * key-based authentication. + * + * @param indexPrefix the prefix to be used for index creation + * @param scheme the name {@code HttpHost.scheme} name + * @param host the hostname (IP or DNS name) + * @param port the Elasticsearch port for incoming HTTP requests (transport client not supported) + * @param apiKeyId the unique id of the API key + * @param apiKeySecret the generated API secret + * @see + * Elasticsearch Security API Keys + * + */ + private ElasticsearchConnection(@NotNull String indexPrefix, @NotNull String scheme, @NotNull String host, + @NotNull Integer port, String apiKeyId, String apiKeySecret) { + this.indexPrefix = indexPrefix; this.scheme = scheme; this.host = host; this.port = port; - this.indexPrefix = indexPrefix; + this.apiKeyId = apiKeyId; + this.apiKeySecret = apiKeySecret; } public RestHighLevelClient getClient() { @@ -76,23 +99,20 @@ public class ElasticsearchConnection implements Closeable { if (client == null) { synchronized (this) { if (client == null) { - client = new RestHighLevelClient(RestClient.builder(new HttpHost(host, port, scheme))); - } + RestClientBuilder builder = RestClient.builder(new HttpHost(host, port, scheme)); + if (apiKeyId != null && !apiKeyId.isEmpty() && + apiKeySecret != null && !apiKeySecret.isEmpty()) { + String apiKeyAuth = Base64.getEncoder().encodeToString( + (apiKeyId + ":" + apiKeySecret).getBytes(StandardCharsets.UTF_8) + ); + Header[] headers = new Header[]{new BasicHeader("Authorization", "ApiKey " + apiKeyAuth)}; + builder.setDefaultHeaders(headers); } + client = new RestHighLevelClient(builder); } - return client; - } - - public String getScheme() { - return scheme; } - - public String getHost() { - return host; } - - public int getPort() { - return port; + return client; } public String getIndexPrefix() { @@ -119,12 +139,112 @@ public class ElasticsearchConnection implements Closeable { @Override public int hashCode() { - return Objects.hash(getScheme(), getHost(), getPort()); + return Objects.hash(indexPrefix, scheme, host, port); } @Override public String toString() { - return getScheme() + "://" + getHost() + ":" + getPort(); + return scheme + "://" + host + ":" + port + "/" + indexPrefix; + } + + /** + * Returns a new {@code Builder.IndexPrefixStep} instance to allow a step by step construction of a + * {@link ElasticsearchConnection} object. + */ + public static Builder.IndexPrefixStep newBuilder() { + return new Builder.Steps(); + } + + // Step Builder pattern + // https://github.com/iluwatar/java-design-patterns/tree/master/step-builder + public static final class Builder { + + private Builder() { + } + + /** + * First Builder Step in charge of the mandatory indexPrefix. Next Step: {@link BasicConnectionStep}. + */ + public interface IndexPrefixStep { + BasicConnectionStep withIndexPrefix(String indexPrefix); + } + + /** + * Step in charge of handling connection parameters (with default option). Next step: {@link BuildStep}. + */ + public interface BasicConnectionStep { + BuildStep withConnectionParameters( + @NotNull String scheme, + @NotNull String host, + @NotNull Integer port + ); + + BuildStep withDefaultConnectionParameters(); + } + + /** + * Step in charge of optional steps. Next step: {@link BuildStep}. + */ + public interface OptionalSteps { + BuildStep withApiKeys(String id, String secret); + } + + /** + * This is the final step in charge of building the {@link ElasticsearchConnection}. + * Validation should be here. + * + * It adds support for {@link OptionalSteps}. + */ + public interface BuildStep extends OptionalSteps { + ElasticsearchConnection build(); + } + + private static class Steps implements IndexPrefixStep, BasicConnectionStep, OptionalSteps, BuildStep { + + private String indexPrefix; + + private String scheme; + private String host; + private Integer port; + + private String apiKeyId; + private String apiKeySecret; + + @Override + public BasicConnectionStep withIndexPrefix(@NotNull String indexPrefix) { + this.indexPrefix = indexPrefix; + return this; } + @Override + public BuildStep withConnectionParameters(@NotNull String scheme, @NotNull String host, @NotNull Integer port) { + this.scheme = scheme; + this.host = host; + this.port = port; + return this; + } + + @Override + public BuildStep withDefaultConnectionParameters() { + return withConnectionParameters(ElasticsearchConnection.DEFAULT_SCHEME, ElasticsearchConnection.DEFAULT_HOST, ElasticsearchConnection.DEFAULT_PORT); + } + + @Override + public BuildStep withApiKeys(@NotNull String id, @NotNull String secret) { + this.apiKeyId = id; + this.apiKeySecret = secret; + return this; + } + + @Override + public ElasticsearchConnection build() { + return new ElasticsearchConnection( + Objects.requireNonNull(indexPrefix, "indexPrefix must be not null"), + Objects.requireNonNull(scheme, "scheme must be not null"), + Objects.requireNonNull(host, "host must be not null"), + Objects.requireNonNull(port, "port must be not null"), + apiKeyId, apiKeySecret); + } + } + } } diff --git oak-search-elastic/src/main/java/org/apache/jackrabbit/oak/plugins/index/elasticsearch/ElasticsearchIndexProviderService.java oak-search-elastic/src/main/java/org/apache/jackrabbit/oak/plugins/index/elasticsearch/ElasticsearchIndexProviderService.java index feab286e64..a77f8e93b8 100644 --- oak-search-elastic/src/main/java/org/apache/jackrabbit/oak/plugins/index/elasticsearch/ElasticsearchIndexProviderService.java +++ oak-search-elastic/src/main/java/org/apache/jackrabbit/oak/plugins/index/elasticsearch/ElasticsearchIndexProviderService.java @@ -39,7 +39,6 @@ import org.apache.jackrabbit.oak.spi.query.QueryIndexProvider; import org.apache.jackrabbit.oak.spi.whiteboard.Registration; import org.apache.jackrabbit.oak.spi.whiteboard.Whiteboard; import org.apache.jackrabbit.oak.stats.StatisticsProvider; -import org.jetbrains.annotations.NotNull; import org.osgi.framework.BundleContext; import org.osgi.framework.ServiceRegistration; import org.slf4j.Logger; @@ -51,7 +50,6 @@ import java.util.Dictionary; import java.util.Hashtable; import java.util.List; import java.util.Map; -import java.util.stream.Collectors; import static org.apache.commons.io.FileUtils.ONE_MB; import static org.apache.jackrabbit.oak.spi.whiteboard.WhiteboardUtils.registerMBean; @@ -115,6 +113,18 @@ public class ElasticsearchIndexProviderService { ) private static final String PROP_ELASTICSEARCH_PORT = ElasticsearchConnection.PORT_PROP; + @Property( + label = "Elasticsearch API key ID", + value = ElasticsearchConnection.DEFAULT_API_KEY_ID + ) + private static final String PROP_ELASTICSEARCH_API_KEY_ID = ElasticsearchConnection.API_KEY_ID_PROP; + + @Property( + label = "Elasticsearch API key secret", + passwordValue = ElasticsearchConnection.DEFAULT_API_KEY_SECRET + ) + private static final String PROP_ELASTICSEARCH_API_KEY_SECRET = ElasticsearchConnection.API_KEY_SECRET_PROP; + @Property( label = "Local text extraction cache path", description = "Local file system path where text extraction cache stores/load entries to recover from timed out operation" @@ -141,13 +151,15 @@ public class ElasticsearchIndexProviderService { private ElasticsearchConnection elasticsearchConnection; @Activate - private void activate(BundleContext bundleContext, Map config) { + private void activate(BundleContext bundleContext, Map config) { whiteboard = new OsgiWhiteboard(bundleContext); //initializeTextExtractionDir(bundleContext, config); //initializeExtractedTextCache(config, statisticsProvider); - elasticsearchConnection = getElasticsearchCoordinate(config); + elasticsearchConnection = getElasticsearchConnection(config); + + LOG.info("Registering Index and Editor providers with connection {}", elasticsearchConnection); registerIndexProvider(bundleContext); registerIndexEditor(bundleContext); @@ -249,35 +261,27 @@ public class ElasticsearchIndexProviderService { } } - private ElasticsearchConnection getElasticsearchCoordinate(Map contextConfig) { - // system properties have priority - ElasticsearchConnection connection = build(System.getProperties().entrySet() - .stream() - .collect(Collectors.toMap( - e -> String.valueOf(e.getKey()), - e -> String.valueOf(e.getValue())) - ) - ); - - if (connection == null) { - connection = build(contextConfig); - } - - return connection != null ? connection : ElasticsearchConnection.defaultConnection.get(); - } - - private ElasticsearchConnection build(@NotNull Map config) { - ElasticsearchConnection coordinate = null; - Object p = config.get(PROP_ELASTICSEARCH_PORT); - if (p != null) { - try { - Integer port = Integer.parseInt(p.toString()); - coordinate = new ElasticsearchConnection((String) config.get(PROP_ELASTICSEARCH_SCHEME), - (String) config.get(PROP_ELASTICSEARCH_HOST), port, (String) config.get(PROP_INDEX_PREFIX)); - } catch (NumberFormatException nfe) { - LOG.warn("{} value ({}) cannot be parsed to a valid number", PROP_ELASTICSEARCH_PORT, p); - } - } - return coordinate; + private ElasticsearchConnection getElasticsearchConnection(Map contextConfig) { + // system properties have priority, get mandatory params first + final String indexPrefix = System.getProperty(PROP_INDEX_PREFIX, + (String) contextConfig.getOrDefault(PROP_INDEX_PREFIX, "oak-elastic")); + final String scheme = System.getProperty(PROP_ELASTICSEARCH_SCHEME, + (String) contextConfig.getOrDefault(PROP_ELASTICSEARCH_SCHEME, ElasticsearchConnection.DEFAULT_SCHEME)); + final String host = System.getProperty(PROP_ELASTICSEARCH_HOST, + (String) contextConfig.getOrDefault(PROP_ELASTICSEARCH_HOST, ElasticsearchConnection.DEFAULT_HOST)); + final int port = Integer.getInteger(PROP_ELASTICSEARCH_PORT, + (int) contextConfig.getOrDefault(PROP_ELASTICSEARCH_PORT, ElasticsearchConnection.DEFAULT_PORT)); + + // optional params + final String apiKeyId = System.getProperty(PROP_ELASTICSEARCH_API_KEY_ID, + (String) contextConfig.get(PROP_ELASTICSEARCH_API_KEY_ID)); + final String apiSecretId = System.getProperty(PROP_ELASTICSEARCH_API_KEY_SECRET, + (String) contextConfig.get(PROP_ELASTICSEARCH_API_KEY_SECRET)); + + return ElasticsearchConnection.newBuilder() + .withIndexPrefix(indexPrefix) + .withConnectionParameters(scheme, host, port) + .withApiKeys(apiKeyId, apiSecretId) + .build(); } } diff --git oak-search-elastic/src/test/java/org/apache/jackrabbit/oak/plugins/index/elasticsearch/ElasticIndexAggregationNtFileTest.java oak-search-elastic/src/test/java/org/apache/jackrabbit/oak/plugins/index/elasticsearch/ElasticIndexAggregationNtFileTest.java index 8d7f2148f0..4399e4d8de 100644 --- oak-search-elastic/src/test/java/org/apache/jackrabbit/oak/plugins/index/elasticsearch/ElasticIndexAggregationNtFileTest.java +++ oak-search-elastic/src/test/java/org/apache/jackrabbit/oak/plugins/index/elasticsearch/ElasticIndexAggregationNtFileTest.java @@ -93,15 +93,17 @@ public class ElasticIndexAggregationNtFileTest extends AbstractQueryTest { @Override protected ContentRepository createRepository() { - ElasticsearchConnection coordinate = new ElasticsearchConnection( + ElasticsearchConnection connection = ElasticsearchConnection.newBuilder() + .withIndexPrefix("" + System.nanoTime()) + .withConnectionParameters( ElasticsearchConnection.DEFAULT_SCHEME, elastic.getContainerIpAddress(), - elastic.getMappedPort(ElasticsearchConnection.DEFAULT_PORT), - "" + System.nanoTime()); + elastic.getMappedPort(ElasticsearchConnection.DEFAULT_PORT) + ).build(); - ElasticsearchIndexEditorProvider editorProvider = new ElasticsearchIndexEditorProvider(coordinate, + ElasticsearchIndexEditorProvider editorProvider = new ElasticsearchIndexEditorProvider(connection, new ExtractedTextCache(10 * FileUtils.ONE_MB, 100)); - ElasticsearchIndexProvider provider = new ElasticsearchIndexProvider(coordinate); + ElasticsearchIndexProvider provider = new ElasticsearchIndexProvider(connection); return new Oak() .with(new InitialContent() { diff --git oak-search-elastic/src/test/java/org/apache/jackrabbit/oak/plugins/index/elasticsearch/ElasticsearchConnectionTest.java oak-search-elastic/src/test/java/org/apache/jackrabbit/oak/plugins/index/elasticsearch/ElasticsearchConnectionTest.java index 22badb0ca2..2078079a4e 100644 --- oak-search-elastic/src/test/java/org/apache/jackrabbit/oak/plugins/index/elasticsearch/ElasticsearchConnectionTest.java +++ oak-search-elastic/src/test/java/org/apache/jackrabbit/oak/plugins/index/elasticsearch/ElasticsearchConnectionTest.java @@ -27,7 +27,10 @@ public class ElasticsearchConnectionTest { @Test public void uniqueClient() throws IOException { - ElasticsearchConnection connection = ElasticsearchConnection.defaultConnection.get(); + ElasticsearchConnection connection = ElasticsearchConnection.newBuilder() + .withIndexPrefix("test") + .withDefaultConnectionParameters() + .build(); RestHighLevelClient client1 = connection.getClient(); RestHighLevelClient client2 = connection.getClient(); @@ -39,7 +42,11 @@ public class ElasticsearchConnectionTest { @Test(expected = IllegalStateException.class) public void alreadyClosedConnection() throws IOException { - ElasticsearchConnection connection = ElasticsearchConnection.defaultConnection.get(); + ElasticsearchConnection connection = ElasticsearchConnection.newBuilder() + .withIndexPrefix("test") + .withDefaultConnectionParameters() + .build(); + connection.close(); connection.getClient(); diff --git oak-search-elastic/src/test/java/org/apache/jackrabbit/oak/plugins/index/elasticsearch/ElasticsearchPropertyIndexTest.java oak-search-elastic/src/test/java/org/apache/jackrabbit/oak/plugins/index/elasticsearch/ElasticsearchPropertyIndexTest.java index 6b49ee1911..592456934c 100644 --- oak-search-elastic/src/test/java/org/apache/jackrabbit/oak/plugins/index/elasticsearch/ElasticsearchPropertyIndexTest.java +++ oak-search-elastic/src/test/java/org/apache/jackrabbit/oak/plugins/index/elasticsearch/ElasticsearchPropertyIndexTest.java @@ -74,15 +74,16 @@ public class ElasticsearchPropertyIndexTest extends AbstractQueryTest { @Override protected ContentRepository createRepository() { - ElasticsearchConnection coordinate = new ElasticsearchConnection( + ElasticsearchConnection connection = ElasticsearchConnection.newBuilder() + .withIndexPrefix("" + System.nanoTime()) + .withConnectionParameters( ElasticsearchConnection.DEFAULT_SCHEME, elastic.getContainerIpAddress(), - elastic.getMappedPort(ElasticsearchConnection.DEFAULT_PORT), - "" + System.currentTimeMillis() - ); - ElasticsearchIndexEditorProvider editorProvider = new ElasticsearchIndexEditorProvider(coordinate, + elastic.getMappedPort(ElasticsearchConnection.DEFAULT_PORT) + ).build(); + ElasticsearchIndexEditorProvider editorProvider = new ElasticsearchIndexEditorProvider(connection, new ExtractedTextCache(10 * FileUtils.ONE_MB, 100)); - ElasticsearchIndexProvider indexProvider = new ElasticsearchIndexProvider(coordinate); + ElasticsearchIndexProvider indexProvider = new ElasticsearchIndexProvider(connection); // remove all indexes to avoid cost competition (essentially a TODO for fixing cost ES cost estimation) NodeBuilder builder = InitialContentHelper.INITIAL_CONTENT.builder();