Uploaded image for project: 'Flink'
  1. Flink
  2. FLINK-25756

Dedicated Opensearch connectors

    XMLWordPrintableJSON

Details

    Description

      Since the time Opensearch got forked from Elasticsearch a few things got changed. The projects evolve in different directions, the Elasticsearch clients up to 7.13.x were able to connect to Opensearch clusters, but since 7.14 - not anymore [1] (Elastic continues to harden their clients to connect to Elasticsearch clusters only).

      For example, running current Flink master against Opensearch clusters using Elalsticsearch 7 connectors would fail with:

       

       Caused by: ElasticsearchException[Elasticsearch version 6 or more is required]
       at org.apache.flink.elasticsearch7.shaded.org.elasticsearch.client.RestHighLevelClient$4.onResponse(RestHighLevelClient.java:2056)
       at org.apache.flink.elasticsearch7.shaded.org.elasticsearch.client.RestHighLevelClient$4.onResponse(RestHighLevelClient.java:2043)
       at org.apache.flink.elasticsearch7.shaded.org.elasticsearch.common.util.concurrent.ListenableFuture.notifyListenerDirectly(ListenableFuture.java:113)
       at org.apache.flink.elasticsearch7.shaded.org.elasticsearch.common.util.concurrent.ListenableFuture.done(ListenableFuture.java:100)
       at org.apache.flink.elasticsearch7.shaded.org.elasticsearch.common.util.concurrent.BaseFuture.set(BaseFuture.java:133)
       at org.apache.flink.elasticsearch7.shaded.org.elasticsearch.common.util.concurrent.ListenableFuture.onResponse(ListenableFuture.java:139)
       at org.apache.flink.elasticsearch7.shaded.org.elasticsearch.client.RestHighLevelClient$5.onSuccess(RestHighLevelClient.java:2129)
       at org.apache.flink.elasticsearch7.shaded.org.elasticsearch.client.RestClient$FailureTrackingResponseListener.onSuccess(RestClient.java:636)
       at org.apache.flink.elasticsearch7.shaded.org.elasticsearch.client.RestClient$1.completed(RestClient.java:376)
       at org.apache.flink.elasticsearch7.shaded.org.elasticsearch.client.RestClient$1.completed(RestClient.java:370)
       at org.apache.flink.elasticsearch7.shaded.org.apache.http.concurrent.BasicFuture.completed(BasicFuture.java:122)
       at org.apache.flink.elasticsearch7.shaded.org.apache.http.impl.nio.client.DefaultClientExchangeHandlerImpl.responseCompleted(DefaultClientExchangeHandlerImpl.java:181)
       at org.apache.flink.elasticsearch7.shaded.org.apache.http.nio.protocol.HttpAsyncRequestExecutor.processResponse(HttpAsyncRequestExecutor.java:448)
       at org.apache.flink.elasticsearch7.shaded.org.apache.http.nio.protocol.HttpAsyncRequestExecutor.inputReady(HttpAsyncRequestExecutor.java:338)
       at org.apache.flink.elasticsearch7.shaded.org.apache.http.impl.nio.DefaultNHttpClientConnection.consumeInput(DefaultNHttpClientConnection.java:265)
       at org.apache.flink.elasticsearch7.shaded.org.apache.http.impl.nio.client.InternalIODispatch.onInputReady(InternalIODispatch.java:81)
       at org.apache.flink.elasticsearch7.shaded.org.apache.http.impl.nio.client.InternalIODispatch.onInputReady(InternalIODispatch.java:39)
       at org.apache.flink.elasticsearch7.shaded.org.apache.http.impl.nio.reactor.AbstractIODispatch.inputReady(AbstractIODispatch.java:114)
       at org.apache.flink.elasticsearch7.shaded.org.apache.http.impl.nio.reactor.BaseIOReactor.readable(BaseIOReactor.java:162)
       at org.apache.flink.elasticsearch7.shaded.org.apache.http.impl.nio.reactor.AbstractIOReactor.processEvent(AbstractIOReactor.java:337)
       at org.apache.flink.elasticsearch7.shaded.org.apache.http.impl.nio.reactor.AbstractIOReactor.processEvents(AbstractIOReactor.java:315)
       at org.apache.flink.elasticsearch7.shaded.org.apache.http.impl.nio.reactor.AbstractIOReactor.execute(AbstractIOReactor.java:276)
       at org.apache.flink.elasticsearch7.shaded.org.apache.http.impl.nio.reactor.BaseIOReactor.execute(BaseIOReactor.java:104)
       at org.apache.flink.elasticsearch7.shaded.org.apache.http.impl.nio.reactor.AbstractMultiworkerIOReactor$Worker.run(AbstractMultiworkerIOReactor.java:591)

       

      With the compatibility mode [2] turned on, still fails further the line:

      Caused by: ElasticsearchException[Invalid or missing tagline [The OpenSearch Project: https://opensearch.org/]]
       at org.apache.flink.elasticsearch7.shaded.org.elasticsearch.client.RestHighLevelClient$4.onResponse(RestHighLevelClient.java:2056)
       at org.apache.flink.elasticsearch7.shaded.org.elasticsearch.client.RestHighLevelClient$4.onResponse(RestHighLevelClient.java:2043)
       at org.apache.flink.elasticsearch7.shaded.org.elasticsearch.common.util.concurrent.ListenableFuture.notifyListenerDirectly(ListenableFuture.java:113)
       at org.apache.flink.elasticsearch7.shaded.org.elasticsearch.common.util.concurrent.ListenableFuture.done(ListenableFuture.java:100)
       at org.apache.flink.elasticsearch7.shaded.org.elasticsearch.common.util.concurrent.BaseFuture.set(BaseFuture.java:133)
       at org.apache.flink.elasticsearch7.shaded.org.elasticsearch.common.util.concurrent.ListenableFuture.onResponse(ListenableFuture.java:139)
       at org.apache.flink.elasticsearch7.shaded.org.elasticsearch.client.RestHighLevelClient$5.onSuccess(RestHighLevelClient.java:2129)
       at org.apache.flink.elasticsearch7.shaded.org.elasticsearch.client.RestClient$FailureTrackingResponseListener.onSuccess(RestClient.java:636)
       at org.apache.flink.elasticsearch7.shaded.org.elasticsearch.client.RestClient$1.completed(RestClient.java:376)
       at org.apache.flink.elasticsearch7.shaded.org.elasticsearch.client.RestClient$1.completed(RestClient.java:370)
       at org.apache.flink.elasticsearch7.shaded.org.apache.http.concurrent.BasicFuture.completed(BasicFuture.java:122)
       at org.apache.flink.elasticsearch7.shaded.org.apache.http.impl.nio.client.DefaultClientExchangeHandlerImpl.responseCompleted 

       

      Arguably, the best way to proceed in this situation is to provide dedicated Opensearch connectors, on par with Elasticsearch ones.

      Me (reta) and Sergey Nuyanzin would be more than happy to make the contribution (and maintain it as well), if there are no objections from committers / PMC members.

      [1] https://github.com/elastic/elasticsearch/pull/73910

      [2] https://opensearch.org/docs/latest/upgrade-to/upgrade-to/#upgrade-elasticsearch-oss

      Attachments

        Activity

          People

            reta Andriy Redko
            reta Andriy Redko
            Votes:
            1 Vote for this issue
            Watchers:
            3 Start watching this issue

            Dates

              Created:
              Updated:
              Resolved: