Details
-
Improvement
-
Status: Closed
-
Minor
-
Resolution: Duplicate
-
1.11.0
-
None
-
None
-
Patch
Description
hello,
Flink ElasticSearch Connector use Java High Level REST Client to process request for index, delete, get, update, etc. but some ES clusters (version 6 and higher) require security credentials to connect, So it can be considered to add username and password option to build security credentials, then connect to this ES cluster.
for example:
//代码占位符 org.apache.flink.streaming.connectors.elasticsearch6.Elasticsearch6UpsertTableSink @Override protected SinkFunction<Tuple2<Boolean, Row>> createSinkFunction( List<Host> hosts, ActionRequestFailureHandler failureHandler, Map<SinkOption, String> sinkOptions, ElasticsearchUpsertSinkFunction upsertSinkFunction) { ...... builder.setRestClientFactory( new DefaultRestClientFactory( Optional.ofNullable(sinkOptions.get(REST_MAX_RETRY_TIMEOUT)) .map(Integer::valueOf) .orElse(null), sinkOptions.get(REST_PATH_PREFIX), sinkOptions.get(USERNAME), sinkOptions.get(PASSWORD))); ...... } @VisibleForTesting static class DefaultRestClientFactory implements RestClientFactory { private Integer maxRetryTimeout; private String pathPrefix; private String username; private String password; public DefaultRestClientFactory(@Nullable Integer maxRetryTimeout, @Nullable String pathPrefix,@Nullable String username, @Nullable String password) { this.maxRetryTimeout = maxRetryTimeout; this.pathPrefix = pathPrefix; this.username = username; this.password = password; } @Override public void configureRestClientBuilder(RestClientBuilder restClientBuilder) { if (maxRetryTimeout != null) { restClientBuilder.setMaxRetryTimeoutMillis(maxRetryTimeout); } if (pathPrefix != null) { restClientBuilder.setPathPrefix(pathPrefix); } // build credentialsProvider if (username != null && password != null) { final CredentialsProvider credentialsProvider = new BasicCredentialsProvider(); credentialsProvider.setCredentials(AuthScope.ANY, new UsernamePasswordCredentials(username, password)); restClientBuilder.setHttpClientConfigCallback(httpClientBuilder -> httpClientBuilder.setDefaultCredentialsProvider(credentialsProvider) ); } } @Override public boolean equals(Object o) { if (this == o) { return true; } if (o == null || getClass() != o.getClass()) { return false; } DefaultRestClientFactory that = (DefaultRestClientFactory) o; return Objects.equals(maxRetryTimeout, that.maxRetryTimeout) && Objects.equals(pathPrefix, that.pathPrefix) && Objects.equals(username, that.username) && Objects.equals(password, that.password); } @Override public int hashCode() { return Objects.hash( maxRetryTimeout, pathPrefix, username, password); } }
Attachments
Issue Links
- is duplicated by
-
FLINK-18361 Support username and password options for new Elasticsearch connector
- Closed