  1. Flink
  2. FLINK-20224

Add username and password options to provide credentials for the ES REST client

Details

    • Type: Improvement
    • Status: Closed
    • Priority: Minor
    • Resolution: Duplicate
    • 1.11.0
    • None
    • None
    • Patch

    Description

      Hello,

      The Flink Elasticsearch connector uses the Java High Level REST Client to process requests such as index, delete, get, and update. However, some ES clusters (version 6 and higher) require security credentials to connect. It would therefore be worth adding username and password options so that the connector can build security credentials and connect to such a cluster.

      For example:

      // In org.apache.flink.streaming.connectors.elasticsearch6.Elasticsearch6UpsertTableSink
      
      @Override
      protected SinkFunction<Tuple2<Boolean, Row>> createSinkFunction(
            List<Host> hosts,
            ActionRequestFailureHandler failureHandler,
            Map<SinkOption, String> sinkOptions,
            ElasticsearchUpsertSinkFunction upsertSinkFunction) {
        ......
        builder.setRestClientFactory(
         new DefaultRestClientFactory(
            Optional.ofNullable(sinkOptions.get(REST_MAX_RETRY_TIMEOUT))
               .map(Integer::valueOf)
               .orElse(null),
            sinkOptions.get(REST_PATH_PREFIX),
            sinkOptions.get(USERNAME),
            sinkOptions.get(PASSWORD)));
        ......
      }
      
      @VisibleForTesting
      static class DefaultRestClientFactory implements RestClientFactory {
      
         private final Integer maxRetryTimeout;
         private final String pathPrefix;
         private final String username;
         private final String password;
      
         public DefaultRestClientFactory(
               @Nullable Integer maxRetryTimeout,
               @Nullable String pathPrefix,
               @Nullable String username,
               @Nullable String password) {
            this.maxRetryTimeout = maxRetryTimeout;
            this.pathPrefix = pathPrefix;
            this.username = username;
            this.password = password;
         }
      
         @Override
         public void configureRestClientBuilder(RestClientBuilder restClientBuilder) {
            if (maxRetryTimeout != null) {
               restClientBuilder.setMaxRetryTimeoutMillis(maxRetryTimeout);
            }
            if (pathPrefix != null) {
               restClientBuilder.setPathPrefix(pathPrefix);
            }
            // Build a CredentialsProvider only when both username and password are set
            if (username != null && password != null) {
               final CredentialsProvider credentialsProvider = new BasicCredentialsProvider();
               credentialsProvider.setCredentials(AuthScope.ANY,
                  new UsernamePasswordCredentials(username, password));
               restClientBuilder.setHttpClientConfigCallback(httpClientBuilder ->
                  httpClientBuilder.setDefaultCredentialsProvider(credentialsProvider)
               );
            }
         }
      
         @Override
         public boolean equals(Object o) {
            if (this == o) {
               return true;
            }
            if (o == null || getClass() != o.getClass()) {
               return false;
            }
            DefaultRestClientFactory that = (DefaultRestClientFactory) o;
            return Objects.equals(maxRetryTimeout, that.maxRetryTimeout) &&
               Objects.equals(pathPrefix, that.pathPrefix) &&
               Objects.equals(username, that.username) &&
               Objects.equals(password, that.password);
         }
      
         @Override
         public int hashCode() {
            return Objects.hash(
               maxRetryTimeout,
               pathPrefix,
               username,
               password);
         }
      }
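
To illustrate what the username and password amount to on the wire, here is a minimal sketch that uses only the JDK. It assumes the cluster accepts HTTP Basic authentication, in which case the credentials are sent as a Base64-encoded `username:password` pair in the `Authorization` header. The class `BasicAuthHeaderSketch`, the method `basicAuthHeader`, and the sample credentials are hypothetical names for illustration; they are not part of Flink or the Elasticsearch client.

```java
import java.nio.charset.StandardCharsets;
import java.util.Base64;
import java.util.Optional;

// Hypothetical helper for illustration only; not part of Flink or the ES client.
public class BasicAuthHeaderSketch {

    /**
     * Builds the value of an HTTP Basic "Authorization" header.
     * Returns empty when either credential is missing, mirroring the
     * "username != null && password != null" check in the snippet above.
     */
    public static Optional<String> basicAuthHeader(String username, String password) {
        if (username == null || password == null) {
            return Optional.empty();
        }
        String token = username + ":" + password;
        String encoded = Base64.getEncoder()
                .encodeToString(token.getBytes(StandardCharsets.UTF_8));
        return Optional.of("Basic " + encoded);
    }

    public static void main(String[] args) {
        // Placeholder credentials, not taken from the issue.
        System.out.println(
                basicAuthHeader("elastic", "changeme").orElse("<no credentials>"));
        // A missing password yields no header at all.
        System.out.println(
                basicAuthHeader("elastic", null).orElse("<no credentials>"));
    }
}
```

Because Base64 is an encoding and not encryption, credentials passed this way are only protected when the connection to the cluster uses HTTPS.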

       

            People

              Assignee: Unassigned
              Reporter: Ruguo Yu (jiang7chengzitc)