[FLINK-5487] Proper at-least-once support for ElasticsearchSink

    Details

    • Type: Bug
    • Status: Resolved
    • Priority: Critical
    • Resolution: Fixed
    • Affects Version/s: None
    • Fix Version/s: 1.3.0
    • Component/s: Streaming Connectors
    • Labels: None

      Description

      Discussion in ML: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Fault-tolerance-guarantees-of-Elasticsearch-sink-in-flink-elasticsearch2-td10982.html

      Currently, the Elasticsearch Sink does not actually offer any message delivery guarantees.

      For proper support of at-least-once, the sink will need to participate in Flink's checkpointing: when snapshotting is triggered at the ElasticsearchSink, we need to synchronize on the pending ES requests by flushing the internal bulk processor. For temporary ES failures (see FLINK-5122) that may happen on the flush, we should retry them before returning from snapshotting and acking the checkpoint. If there are non-temporary ES failures on the flush, the current snapshot should fail.
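
      To make this concrete, here is a minimal sketch of flush-on-checkpoint, assuming a `CheckpointedFunction`-style sink with an internal `BulkProcessor`, a `numPendingRequests` counter, and a `checkErrorAndRethrow()` helper (all names are illustrative assumptions, not necessarily the eventual implementation):

```java
// Hedged sketch: synchronize a checkpoint with pending Elasticsearch requests.
// Assumes the sink keeps an AtomicLong numPendingRequests and a
// checkErrorAndRethrow() helper that rethrows any failure recorded by the
// asynchronous bulk callbacks.
@Override
public void snapshotState(FunctionSnapshotContext context) throws Exception {
    // push out everything currently buffered in the BulkProcessor
    bulkProcessor.flush();

    // wait until all in-flight requests (including ones re-added by a
    // failure handler) are acknowledged; non-temporary errors fail the
    // snapshot via checkErrorAndRethrow(), so the checkpoint is not acked
    while (numPendingRequests.get() != 0) {
        bulkProcessor.flush();
        checkErrorAndRethrow();
        Thread.sleep(10);
    }

    checkErrorAndRethrow();
}
```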


          Activity

          Tzu-Li (Gordon) Tai added a comment (edited)

          We should block this issue until FLINK-4988, which restructures the ES connectors as part of its fix, is resolved, so that at-least-once support can be included in all ES versions simultaneously.

          ASF GitHub Bot added a comment

          GitHub user tzulitai opened a pull request:

          https://github.com/apache/flink/pull/3358

          FLINK-5487 [elasticsearch] At-least-once ElasticsearchSink

          This PR adds proper support for an at-least-once `ElasticsearchSink`. This is based on the pluggable error handling strategy functionality added in #3426, so only the last commit is relevant.

          As with the Kafka producer, the way it works is that pending requests not yet acknowledged by Elasticsearch need to be flushed before proceeding with the next record from upstream.
          A slight difference is that for the `ElasticsearchSink`, since we allow re-adding failed requests back to the internal `BulkProcessor` (as part of #3426), we also need to wait for the re-added requests. The docs warn that re-added requests may lead to longer checkpoints, since we need to wait for those too.

          Flushing is enabled by default, but we provide a `disableFlushOnCheckpoint` method to switch it off. The docs and the Javadoc of the method warn the user about how this affects at-least-once delivery.
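
          A hedged usage sketch of the opt-out (the config values, `transportAddresses`, and `MySinkFunction` are placeholders, not from the PR):

```java
// Sketch: building the sink and opting out of flush-on-checkpoint.
Map<String, String> config = new HashMap<>();
config.put("cluster.name", "my-cluster");        // placeholder value
config.put("bulk.flush.max.actions", "1000");

// transportAddresses and MySinkFunction are placeholders for brevity
ElasticsearchSink<String> sink =
    new ElasticsearchSink<>(config, transportAddresses, new MySinkFunction());

// default: a checkpoint waits for all pending requests (at-least-once);
// opting out shortens checkpoints but forfeits the guarantee
sink.disableFlushOnCheckpoint();

input.addSink(sink);
```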

          You can merge this pull request into a Git repository by running:

          $ git pull https://github.com/tzulitai/flink FLINK-5487

          Alternatively you can review and apply these changes as the patch at:

          https://github.com/apache/flink/pull/3358.patch

          To close this pull request, make a commit to your master/trunk branch
          with (at least) the following in the commit message:

          This closes #3358


          commit 6a826b8eb7a98e3d159999bc44d827df54c94fdd
          Author: Max Kuklinski <max.kuklinski@live.de>
          Date: 2016-11-23T16:54:11Z

          FLINK-5122 [elasticsearch] Retry temporary Elasticsearch request errors.

          Covered exceptions are: Timeouts, No Master, UnavailableShardsException, bulk queue on node full

          commit 9cb60c263fb0df9a8ccd82b33070e22085b5ab23
          Author: Tzu-Li (Gordon) Tai <tzulitai@apache.org>
          Date: 2017-01-30T05:55:26Z

          FLINK-5353 [elasticsearch] User-provided failure handler for ElasticsearchSink

          This commit fixes both FLINK-5353 and FLINK-5122. It allows users to implement a
          failure handler to control how failed action requests are dealt with.

          The commit also includes general improvements to FLINK-5122:
          1. Use the built-in backoff functionality in the Elasticsearch BulkProcessor (not
          available for Elasticsearch 1.x)
          2. Migrate the `checkErrorAndRetryBulk` functionality to the new failure handler

          commit 1c448e3177c65ebc627bdd4ecfff76bbf209ddde
          Author: Tzu-Li (Gordon) Tai <tzulitai@apache.org>
          Date: 2017-02-20T08:50:19Z

          FLINK-5487 [elasticsearch] At-least-once Elasticsearch Sink


          ASF GitHub Bot added a comment

          Github user rmetzger commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3358#discussion_r102177573

          — Diff: flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/ActionRequestFailureHandler.java —
          @@ -0,0 +1,72 @@
          +/*
          + * Licensed to the Apache Software Foundation (ASF) under one or more
          + * contributor license agreements. See the NOTICE file distributed with
          + * this work for additional information regarding copyright ownership.
          + * The ASF licenses this file to You under the Apache License, Version 2.0
          + * (the "License"); you may not use this file except in compliance with
          + * the License. You may obtain a copy of the License at
          + *
          + * http://www.apache.org/licenses/LICENSE-2.0
          + *
          + * Unless required by applicable law or agreed to in writing, software
          + * distributed under the License is distributed on an "AS IS" BASIS,
          + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
          + * See the License for the specific language governing permissions and
          + * limitations under the License.
          + */
          +
          +package org.apache.flink.streaming.connectors.elasticsearch;
          +
          +import org.elasticsearch.action.ActionRequest;
          +
          +import java.io.Serializable;
          +
          +/**
          + * An implementation of {@link ActionRequestFailureHandler} is provided by the user to define how failed
          + * {@link ActionRequest ActionRequests} should be handled, ex. dropping them, reprocessing malformed documents, or
          — End diff –

          I'm not sure if the "ex." is correct here: http://english.stackexchange.com/questions/16197/whats-the-difference-between-e-g-and-ex

          ASF GitHub Bot added a comment

          Github user rmetzger commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3358#discussion_r102180098

          — Diff: flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/ElasticsearchSinkBase.java —
          @@ -165,20 +286,36 @@ public void beforeBulk(long executionId, BulkRequest request) { }
          @Override
          public void afterBulk(long executionId, BulkRequest request, BulkResponse response) {
          if (response.hasFailures()) {

          -                for (BulkItemResponse itemResp : response.getItems()) {
          -                    Throwable failure = callBridge.extractFailureCauseFromBulkItemResponse(itemResp);
          +                BulkItemResponse itemResponse;
          +                Throwable failure;
          +
          +                for (int i = 0; i < response.getItems().length; i++) {
          +                    itemResponse = response.getItems()[i];
          +                    failure = callBridge.extractFailureCauseFromBulkItemResponse(itemResponse);
                               if (failure != null) {
          -                        LOG.error("Failed Elasticsearch item request: {}", failure.getMessage(), failure);
          -                        failureThrowable.compareAndSet(null, failure);
          +                        LOG.error("Failed Elasticsearch item request: {}", itemResponse.getFailureMessage(), failure);
          +
          +                        if (failureHandler.onFailure(request.requests().get(i), failure, requestIndexer)) {
          — End diff –

          I wonder if it would be better if the `onFailure` method would not return a boolean but throw a Throwable?
          This way users have more flexibility in implementing their failure handler.

          For example, if a failure handler is doing three retries and fails afterwards, the original exception will be thrown. If the `onFailure()` method can throw its own exception, you can throw a custom exception that tells the user about the three retries.

          We can definitely discuss this, because this change is annoying to do (docs & Javadocs need to be updated).
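
          For comparison, a sketch of the throwing variant proposed here (illustrative only; this is not the interface as opened in the PR):

```java
// Sketch of a throwing failure handler: rethrowing (possibly a custom
// exception, e.g. one describing exhausted retries) fails the sink, while
// returning normally means the failure was handled.
public interface ActionRequestFailureHandler extends Serializable {
    void onFailure(ActionRequest action, Throwable failure, RequestIndexer indexer) throws Throwable;
}
```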

          ASF GitHub Bot added a comment

          Github user rmetzger commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3358#discussion_r102177099

          — Diff: docs/dev/connectors/elasticsearch.md —
          @@ -272,9 +308,115 @@ input.addSink(new ElasticsearchSink(config, new ElasticsearchSinkFunction[String
          The difference is that now we do not need to provide a list of addresses
          of Elasticsearch nodes.

          +### Handling Failing Elasticsearch Requests
          +
          +Elasticsearch action requests may fail due to a variety of reasons, including
          +temporarily saturated node queue capacity or malformed documents to be indexed.
          +The Flink Elasticsearch Sink allows the user to specify how request
          +failures are handled, by simply implementing an `ActionRequestFailureHandler` and
          +providing it to the constructor.
          +
          +Below is an example:
          +
          +<div class="codetabs" markdown="1">
          +<div data-lang="java" markdown="1">
          +{% highlight java %}
          +DataStream<String> input = ...;
          +
          +input.addSink(new ElasticsearchSink<>(
          +    config, transportAddresses,
          +    new ElasticsearchSinkFunction<String>() {...},
          +    new ActionRequestFailureHandler() {
          +        @Override
          +        public boolean onFailure(ActionRequest action, Throwable failure, RequestIndexer indexer) {
          +            // this example uses Apache Commons to search for nested exceptions
          +
          +            if (ExceptionUtils.indexOfThrowable(failure, EsRejectedExecutionException.class) >= 0) {
          +                // full queue; re-add document for indexing
          +                indexer.add(action);
          +                return false;
          +            } else if (ExceptionUtils.indexOfThrowable(failure, ElasticsearchParseException.class) >= 0) {
          +                // malformed document; simply drop request without failing sink
          +                return false;
          +            } else {
          +                // for all other failures, fail the sink
          +                return true;
          +            }
          +        }
          +    }));
          +{% endhighlight %}
          +</div>
          +<div data-lang="scala" markdown="1">
          +{% highlight scala %}
          +val input: DataStream[String] = ...
          +
          +input.addSink(new ElasticsearchSink(
          +    config, transportAddresses,
          +    new ElasticsearchSinkFunction[String] {...},
          +    new ActionRequestFailureHandler {
          +        override def onFailure(action: ActionRequest, failure: Throwable, indexer: RequestIndexer): Boolean = {
          +            // this example uses Apache Commons to search for nested exceptions
          +
          +            if (ExceptionUtils.indexOfThrowable(failure, classOf[EsRejectedExecutionException]) >= 0) {
          +                // full queue; re-add document for indexing
          +                indexer.add(action)
          +                false
          +            } else if (ExceptionUtils.indexOfThrowable(failure, classOf[ElasticsearchParseException]) >= 0) {
          +                // malformed document; simply drop request without failing sink
          +                false
          +            } else {
          +                // for all other failures, fail the sink
          +                true
          +            }
          +        }
          +    }))
          +{% endhighlight %}

          +</div>
          +</div>
          +
          +The above example will let the sink re-add requests that failed due to
          +queue capacity saturation and drop requests with malformed documents, without
          +failing the sink. For all other failures, the sink will fail. If an `ActionRequestFailureHandler`
          +is not provided to the constructor, the sink will fail for any kind of error.
          +
          +Note that `onFailure` is called only for failures that still occur after the
          +`BulkProcessor` internally finishes all backoff retry attempts.
          +By default, the `BulkProcessor` retries up to a maximum of 8 attempts with
          +an exponential backoff. For more information on the behaviour of the
          +internal `BulkProcessor` and how to configure it, please see the following section.
          +
          +<p style="border-radius: 5px; padding: 5px" class="bg-danger">
          +<b>IMPORTANT</b>: Re-adding requests back to the internal <b>BulkProcessor</b>
          +on failures will lead to longer checkpoints, as the sink will also
          +need to wait for the re-added requests to be flushed when checkpointing.
          +This also means that if re-added requests never succeed, the checkpoint will
          +never finish.
          +</p>
          +
          +
          +### Configuring the Internal Bulk Processor
          +
          +The internal `BulkProcessor` can be further configured for its behaviour
          +on how buffered action requests are flushed, by setting the following values in
          +the provided `Map<String, String>`:
          +
          + * *bulk.flush.max.actions*: Maximum amount of actions to buffer before flushing.
          + * *bulk.flush.max.size.mb*: Maximum size of data (in megabytes) to buffer before flushing.
          + * *bulk.flush.interval.ms*: Interval at which to flush regardless of the amount or size of buffered actions.
          +
          +For versions 2.x and above, configuring how temporary request errors are
          +retried is also supported:
          +
          + * *bulk.flush.backoff.enable*: Whether or not to perform retries with backoff delay for a flush
          + if one or more of its actions failed due to a temporary `EsRejectedExecutionException`.
          + * *bulk.flush.backoff.type*: The type of backoff delay, either `CONSTANT` or `EXPONENTIAL`
          + * *bulk.flush.backoff.delay*: The amount of delay for backoff. For constant backoff, this
          + is simply the delay between each retry. For exponential backoff, this is the initial base delay.
          + * *bulk.flush.backoff.retries*: The amount of backoff retries to attempt.
          +
          More information about Elasticsearch can be found [here](https://elastic.co).

           Packaging the Elasticsearch Connector into an Uber-Jar
          +## Packaging the Elasticsearch Connector into an Uber-Jar
          — End diff –

          I like the reworked documentation page a lot!
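
          As an aside, a hedged sketch of how the bulk flush options documented in the diff above would be set in practice (values are illustrative):

```java
// Sketch: configuring the internal BulkProcessor through the userConfig map.
Map<String, String> config = new HashMap<>();
config.put("bulk.flush.max.actions", "500");    // flush after 500 buffered actions
config.put("bulk.flush.max.size.mb", "5");      // ...or after 5 MB of data
config.put("bulk.flush.interval.ms", "10000");  // ...or every 10 seconds

// Elasticsearch 2.x and above: backoff retries for temporary bulk rejections
config.put("bulk.flush.backoff.enable", "true");
config.put("bulk.flush.backoff.type", "EXPONENTIAL");
config.put("bulk.flush.backoff.delay", "50");   // initial base delay in ms
config.put("bulk.flush.backoff.retries", "8");
```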

          ASF GitHub Bot added a comment

          Github user rmetzger commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3358#discussion_r102178460

          — Diff: flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/ElasticsearchSinkBase.java —
          @@ -67,10 +73,56 @@
          public static final String CONFIG_KEY_BULK_FLUSH_MAX_ACTIONS = "bulk.flush.max.actions";
          public static final String CONFIG_KEY_BULK_FLUSH_MAX_SIZE_MB = "bulk.flush.max.size.mb";
          public static final String CONFIG_KEY_BULK_FLUSH_INTERVAL_MS = "bulk.flush.interval.ms";
          + public static final String CONFIG_KEY_BULK_FLUSH_BACKOFF_ENABLE = "bulk.flush.backoff.enable";
          + public static final String CONFIG_KEY_BULK_FLUSH_BACKOFF_TYPE = "bulk.flush.backoff.type";
          + public static final String CONFIG_KEY_BULK_FLUSH_BACKOFF_RETRIES = "bulk.flush.backoff.retries";
          + public static final String CONFIG_KEY_BULK_FLUSH_BACKOFF_DELAY = "bulk.flush.backoff.delay";
          +
          +    public enum FlushBackoffType {
          +        CONSTANT,
          +        EXPONENTIAL
          +    }
          +
          +    public class BulkFlushBackoffPolicy implements Serializable {
          +
          +        private static final long serialVersionUID = -6022851996101826049L;
          +
          +        // the default values follow the Elasticsearch default settings for BulkProcessor
          +        private FlushBackoffType backoffType = FlushBackoffType.EXPONENTIAL;
          +        private int maxRetryCount = 8;
          +        private long delayMillis = 50;
          +
          +        public FlushBackoffType getBackoffType() {
          +            return backoffType;
          +        }
          +
          +        public int getMaxRetryCount() {
          +            return maxRetryCount;
          +        }
          +
          +        public long getDelayMillis() {
          +            return delayMillis;
          +        }
          +
          +        public void setBackoffType(FlushBackoffType backoffType) {
          +            this.backoffType = checkNotNull(backoffType);
          +        }
          +
          +        public void setMaxRetryCount(int maxRetryCount) {
          +            checkArgument(maxRetryCount > 0);
          — End diff –

          Isn't 0 also an acceptable value here? If users want to disable retries entirely?
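
          A minimal sketch of the relaxed check being suggested (assuming Flink's `Preconditions.checkArgument`):

```java
public void setMaxRetryCount(int maxRetryCount) {
    // >= 0 instead of > 0, so that 0 can disable retries entirely
    checkArgument(maxRetryCount >= 0);
    this.maxRetryCount = maxRetryCount;
}
```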

          ASF GitHub Bot added a comment

          Github user rmetzger commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3358#discussion_r102179118

          — Diff: flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/ElasticsearchSinkBase.java —
          @@ -122,10 +198,19 @@ public ElasticsearchSinkBase(
          "The object probably contains or references non serializable fields.");
          }

          -        checkNotNull(userConfig);
          +        try {
          +            InstantiationUtil.serializeObject(failureHandler);
          +        } catch (Exception e) {
          +            throw new IllegalArgumentException(
          +                "The implementation of the provided ActionRequestFailureHandler is not serializable. " +
          +                "The object probably contains or references non serializable fields.");
          +        }
          — End diff –

          This looks a bit like duplicate code. I think adding a utility into the `InstantiationUtil` that is called `isSerializable()` would be cleaner and save some LOC.
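
          A sketch of the suggested utility (hypothetical at the time of this review), sitting next to the existing `serializeObject` in `InstantiationUtil`:

```java
// Sketch: a reusable serializability probe for InstantiationUtil.
public static boolean isSerializable(Object o) {
    try {
        serializeObject(o);
    } catch (Exception e) {
        return false;
    }
    return true;
}
```

          The constructor check above would then collapse to a one-line `checkArgument(InstantiationUtil.isSerializable(failureHandler), ...)`.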

          ASF GitHub Bot added a comment

          Github user rmetzger commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3358#discussion_r102178621

          — Diff: flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/ElasticsearchSinkBase.java —
          @@ -67,10 +73,56 @@
          public static final String CONFIG_KEY_BULK_FLUSH_MAX_ACTIONS = "bulk.flush.max.actions";
          public static final String CONFIG_KEY_BULK_FLUSH_MAX_SIZE_MB = "bulk.flush.max.size.mb";
          public static final String CONFIG_KEY_BULK_FLUSH_INTERVAL_MS = "bulk.flush.interval.ms";
          + public static final String CONFIG_KEY_BULK_FLUSH_BACKOFF_ENABLE = "bulk.flush.backoff.enable";
          + public static final String CONFIG_KEY_BULK_FLUSH_BACKOFF_TYPE = "bulk.flush.backoff.type";
          + public static final String CONFIG_KEY_BULK_FLUSH_BACKOFF_RETRIES = "bulk.flush.backoff.retries";
          + public static final String CONFIG_KEY_BULK_FLUSH_BACKOFF_DELAY = "bulk.flush.backoff.delay";
          +
          +    public enum FlushBackoffType {
          +        CONSTANT,
          +        EXPONENTIAL
          +    }
          +
          +    public class BulkFlushBackoffPolicy implements Serializable {
          +
          +        private static final long serialVersionUID = -6022851996101826049L;
          +
          +        // the default values follow the Elasticsearch default settings for BulkProcessor
          +        private FlushBackoffType backoffType = FlushBackoffType.EXPONENTIAL;
          +        private int maxRetryCount = 8;
          +        private long delayMillis = 50;
          +
          +        public FlushBackoffType getBackoffType() {
          +            return backoffType;
          +        }
          +
          +        public int getMaxRetryCount() {
          +            return maxRetryCount;
          +        }
          +
          +        public long getDelayMillis() {
          +            return delayMillis;
          +        }
          +
          +        public void setBackoffType(FlushBackoffType backoffType) {
          +            this.backoffType = checkNotNull(backoffType);
          +        }
          +
          +        public void setMaxRetryCount(int maxRetryCount) {
          +            checkArgument(maxRetryCount > 0);
          +            this.maxRetryCount = maxRetryCount;
          +        }
          +
          +        public void setDelayMillis(long delayMillis) {
          +            checkArgument(delayMillis > 0);
          — End diff –

          We should accept 0 here as well, in case users want to retry immediately (for whatever reason).

          ASF GitHub Bot added a comment

          Github user rmetzger commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3358#discussion_r102177012

          — Diff: docs/dev/connectors/elasticsearch.md —
          @@ -272,9 +308,115 @@ input.addSink(new ElasticsearchSink(config, new ElasticsearchSinkFunction[String
          The difference is that now we do not need to provide a list of addresses
          of Elasticsearch nodes.

          +### Handling Failing Elasticsearch Requests
          +
          +Elasticsearch action requests may fail due to a variety of reasons, including
          +temporarily saturated node queue capacity or malformed documents to be indexed.
          +The Flink Elasticsearch Sink allows the user to specify how request
          +failures are handled, by simply implementing an `ActionRequestFailureHandler` and
          +providing it to the constructor.
          +
          +Below is an example:
          +
          +<div class="codetabs" markdown="1">
          +<div data-lang="java" markdown="1">
          +{% highlight java %}
          +DataStream<String> input = ...;
          +
          +input.addSink(new ElasticsearchSink<>(
          +    config, transportAddresses,
          +    new ElasticsearchSinkFunction<String>() {...},
          +    new ActionRequestFailureHandler() {
          +        @Override
          +        public boolean onFailure(ActionRequest action, Throwable failure, RequestIndexer indexer) {
          +            // this example uses Apache Commons to search for nested exceptions
          +
          +            if (ExceptionUtils.indexOfThrowable(failure, EsRejectedExecutionException.class) >= 0) {
          +                // full queue; re-add document for indexing
          +                indexer.add(action);
          +                return false;
          +            } else if (ExceptionUtils.indexOfThrowable(failure, ElasticsearchParseException.class) >= 0) {
          +                // malformed document; simply drop request without failing sink
          +                return false;
          +            } else {
          +                // for all other failures, fail the sink
          +                return true;
          +            }
          +        }
          +    }));
          +{% endhighlight %}
          +</div>
          +<div data-lang="scala" markdown="1">
          +{% highlight scala %}
          +val input: DataStream[String] = ...
          +
          +input.addSink(new ElasticsearchSink(
          +    config, transportAddresses,
          +    new ElasticsearchSinkFunction[String] {...},
          +    new ActionRequestFailureHandler {
          +        override def onFailure(action: ActionRequest, failure: Throwable, indexer: RequestIndexer): Boolean = {
          +            // this example uses Apache Commons to search for nested exceptions
          +
          +            if (ExceptionUtils.indexOfThrowable(failure, classOf[EsRejectedExecutionException]) >= 0) {
          +                // full queue; re-add document for indexing
          +                indexer.add(action)
          +                false
          +            } else if (ExceptionUtils.indexOfThrowable(failure, classOf[ElasticsearchParseException]) >= 0) {
          +                // malformed document; simply drop request without failing sink
          +                false
          +            } else {
          +                // for all other failures, fail the sink
          +                true
          +            }
          +        }
          +    }))
          +{% endhighlight %}

          +</div>
          +</div>
          +
          +The above example will let the sink re-add requests that failed due to
          +queue capacity saturation and drop requests with malformed documents, without
          +failing the sink. For all other failures, the sink will fail. If an `ActionRequestFailureHandler`
          +is not provided to the constructor, the sink will fail for any kind of error.
          +
          +Note that `onFailure` is called only for failures that still occur after the
          +`BulkProcessor` internally finishes all backoff retry attempts.
          +By default, the `BulkProcessor` retries up to a maximum of 8 attempts with
          +an exponential backoff. For more information on the behaviour of the
          +internal `BulkProcessor` and how to configure it, please see the following section.
          +
          +<p style="border-radius: 5px; padding: 5px" class="bg-danger">
          +<b>IMPORTANT</b>: Re-adding requests back to the internal <b>BulkProcessor</b>
          +on failures will lead to longer checkpoints, as the sink will also
          +need to wait for the re-added requests to be flushed when checkpointing.
          +This also means that if re-added requests never succeed, the checkpoint will
          +never finish.
          +</p>
          — End diff –

          I think the docs should mention the `NoOpActionRequestFailureHandler`.

          Also I wonder if we should offer a default `RetryActionRequestFailureHandler`. I suspect that many users will need that. What do you think?
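
          A hedged sketch of what such a default handler could look like against the boolean-returning `onFailure` in this PR (the class name and exact matching logic are illustrative):

```java
import org.apache.commons.lang3.exception.ExceptionUtils;
import org.elasticsearch.action.ActionRequest;
import org.elasticsearch.common.util.concurrent.EsRejectedExecutionException;

// Sketch: a reusable handler that retries only temporarily rejected actions
// (full bulk queue) and fails the sink on anything else.
public class RetryRejectedExecutionFailureHandler implements ActionRequestFailureHandler {
    @Override
    public boolean onFailure(ActionRequest action, Throwable failure, RequestIndexer indexer) {
        if (ExceptionUtils.indexOfThrowable(failure, EsRejectedExecutionException.class) >= 0) {
            indexer.add(action); // re-queue for the next bulk request
            return false;        // handled; do not fail the sink
        }
        return true;             // unknown failure; fail the sink
    }
}
```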

          ASF GitHub Bot added a comment

          Github user rmetzger commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3358#discussion_r102181691

          — Diff: flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/ElasticsearchSinkBase.java —
          @@ -165,20 +286,36 @@ public void beforeBulk(long executionId, BulkRequest request) { }
          @Override
          public void afterBulk(long executionId, BulkRequest request, BulkResponse response) {
          if (response.hasFailures()) {

          -                for (BulkItemResponse itemResp : response.getItems()) {
          -                    Throwable failure = callBridge.extractFailureCauseFromBulkItemResponse(itemResp);
          +                BulkItemResponse itemResponse;
          +                Throwable failure;
          +
          +                for (int i = 0; i < response.getItems().length; i++) {
          +                    itemResponse = response.getItems()[i];
          +                    failure = callBridge.extractFailureCauseFromBulkItemResponse(itemResponse);
                               if (failure != null) {
          -                        LOG.error("Failed Elasticsearch item request: {}", failure.getMessage(), failure);
          -                        failureThrowable.compareAndSet(null, failure);
          +                        LOG.error("Failed Elasticsearch item request: {}", itemResponse.getFailureMessage(), failure);
          +
          +                        if (failureHandler.onFailure(request.requests().get(i), failure, requestIndexer)) {
          +                            failureThrowable.compareAndSet(null, failure);
          +                        }
                               }
                           }
                       }
          +
          +            numPendingRequests.getAndAdd(-request.numberOfActions());
                   }

                   @Override
                   public void afterBulk(long executionId, BulkRequest request, Throwable failure) {
          -            LOG.error("Failed Elasticsearch bulk request: {}", failure.getMessage(), failure);
          -            failureThrowable.compareAndSet(null, failure);
          +            LOG.error("Failed Elasticsearch bulk request: {}", failure.getMessage(), failure.getCause());
          +
          +            // whole bulk request failures are usually just temporary timeouts on
          +            // the Elasticsearch side; simply retry all action requests in the bulk
          +            for (ActionRequest action : request.requests()) {
          +                requestIndexer.add(action);
          +            }
          +
          +            numPendingRequests.getAndAdd(-request.numberOfActions());
          — End diff –

          Let's say a bulk with 500 actions fails, so we re-add the bulk again, but subtract 500 actions from the pending requests.

          Now the bulk succeeds and we subtract 500 actions again. Wouldn't that make the number of pending requests negative, and void the at-least-once guarantees?

          Am I overlooking something here?
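
          One way the accounting could stay balanced (a sketch of a possible fix, not necessarily what was committed): have the `RequestIndexer` used for re-adding increment the pending count, so the unconditional subtraction in `afterBulk` nets out to zero for retried actions:

```java
// Sketch: every (re-)added request bumps numPendingRequests, so subtracting
// request.numberOfActions() in afterBulk() cannot drive the counter negative.
private class BulkProcessorIndexer implements RequestIndexer {
    @Override
    public void add(ActionRequest... actionRequests) {
        for (ActionRequest actionRequest : actionRequests) {
            numPendingRequests.getAndIncrement();
            bulkProcessor.add(actionRequest);
        }
    }
}
```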

          ASF GitHub Bot added a comment

          Github user rmetzger commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3358#discussion_r102181294

          — Diff: flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/ElasticsearchSinkBase.java —
          @@ -165,20 +286,36 @@ public void beforeBulk(long executionId, BulkRequest request) { }
          @Override
          public void afterBulk(long executionId, BulkRequest request, BulkResponse response) {
          if (response.hasFailures()) {

          -                for (BulkItemResponse itemResp : response.getItems()) {
          -                    Throwable failure = callBridge.extractFailureCauseFromBulkItemResponse(itemResp);
          +                BulkItemResponse itemResponse;
          +                Throwable failure;
          +
          +                for (int i = 0; i < response.getItems().length; i++) {
          +                    itemResponse = response.getItems()[i];
          +                    failure = callBridge.extractFailureCauseFromBulkItemResponse(itemResponse);
                               if (failure != null) {
          -                        LOG.error("Failed Elasticsearch item request: {}", failure.getMessage(), failure);
          -                        failureThrowable.compareAndSet(null, failure);
          +                        LOG.error("Failed Elasticsearch item request: {}", itemResponse.getFailureMessage(), failure);
          +
          +                        if (failureHandler.onFailure(request.requests().get(i), failure, requestIndexer)) {
          +                            failureThrowable.compareAndSet(null, failure);
          +                        }
                               }
                           }
                       }
          +
          +            numPendingRequests.getAndAdd(-request.numberOfActions());
                   }

                   @Override
                   public void afterBulk(long executionId, BulkRequest request, Throwable failure) {
          -            LOG.error("Failed Elasticsearch bulk request: {}", failure.getMessage(), failure);
          -            failureThrowable.compareAndSet(null, failure);
          +            LOG.error("Failed Elasticsearch bulk request: {}", failure.getMessage(), failure.getCause());
          +
          +            // whole bulk request failures are usually just temporary timeouts on
          +            // the Elasticsearch side; simply retry all action requests in the bulk
          +            for (ActionRequest action : request.requests()) {
          +                requestIndexer.add(action);
          +            }
          — End diff –

          I wonder if we should provide a custom, pluggable retry logic here as well. If you are sure that only connection issues cause this, we can leave it as is.

          ASF GitHub Bot added a comment

          Github user tzulitai commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3358#discussion_r102183423

          — Diff: docs/dev/connectors/elasticsearch.md —
@@ -272,9 +308,115 @@ input.addSink(new ElasticsearchSink(config, new ElasticsearchSinkFunction[String
 The difference is that now we do not need to provide a list of addresses
 of Elasticsearch nodes.

+### Handling Failing Elasticsearch Requests
+
+Elasticsearch action requests may fail due to a variety of reasons, including
+temporarily saturated node queue capacity or malformed documents to be indexed.
+The Flink Elasticsearch Sink allows the user to specify how request
+failures are handled, by simply implementing an `ActionRequestFailureHandler` and
+providing it to the constructor.
+
+Below is an example:
+
+<div class="codetabs" markdown="1">
+<div data-lang="java" markdown="1">
+{% highlight java %}
+DataStream<String> input = ...;
+
+input.addSink(new ElasticsearchSink<>(
+    config, transportAddresses,
+    new ElasticsearchSinkFunction<String>() {...},
+    new ActionRequestFailureHandler() {
+        @Override
+        public boolean onFailure(ActionRequest action, Throwable failure, RequestIndexer indexer) {
+            // this example uses Apache Commons to search for nested exceptions
+            if (ExceptionUtils.indexOfThrowable(failure, EsRejectedExecutionException.class) >= 0) {
+                // full queue; re-add document for indexing
+                indexer.add(action);
+                return false;
+            } else if (ExceptionUtils.indexOfThrowable(failure, ElasticsearchParseException.class) >= 0) {
+                // malformed document; simply drop request without failing sink
+                return false;
+            } else {
+                // for all other failures, fail the sink
+                return true;
+            }
+        }
+}));
+{% endhighlight %}
+</div>
+<div data-lang="scala" markdown="1">
+{% highlight scala %}
+val input: DataStream[String] = ...
+
+input.addSink(new ElasticsearchSink(
+    config, transportAddresses,
+    new ElasticsearchSinkFunction[String] {...},
+    new ActionRequestFailureHandler {
+        override def onFailure(action: ActionRequest, failure: Throwable, indexer: RequestIndexer): Boolean = {
+            // this example uses Apache Commons to search for nested exceptions
+            if (ExceptionUtils.indexOfThrowable(failure, classOf[EsRejectedExecutionException]) >= 0) {
+                // full queue; re-add document for indexing
+                indexer.add(action)
+                false
+            } else if (ExceptionUtils.indexOfThrowable(failure, classOf[ElasticsearchParseException]) >= 0) {
+                // malformed document; simply drop request without failing sink
+                false
+            } else {
+                // for all other failures, fail the sink
+                true
+            }
+        }
+}))
+{% endhighlight %}
+</div>
+</div>
+
+The above example will let the sink re-add requests that failed due to
+queue capacity saturation and drop requests with malformed documents, without
+failing the sink. For all other failures, the sink will fail. If an `ActionRequestFailureHandler`
+is not provided to the constructor, the sink will fail for any kind of error.
+
+Note that `onFailure` is called only for failures that still occur after the
+`BulkProcessor` internally finishes all backoff retry attempts.
+By default, the `BulkProcessor` retries up to a maximum of 8 attempts with
+an exponential backoff. For more information on the behaviour of the
+internal `BulkProcessor` and how to configure it, please see the following section.
+
+<p style="border-radius: 5px; padding: 5px" class="bg-danger">
+<b>IMPORTANT</b>: Re-adding requests back to the internal <b>BulkProcessor</b>
+on failures will lead to longer checkpoints, as the sink will also
+need to wait for the re-added requests to be flushed when checkpointing.
+This also means that if re-added requests never succeed, the checkpoint will
+never finish.
+</p>
— End diff –

Good point. I'll add a mention that, by default, the sink uses the `NoOpActionRequestFailureHandler`.
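(A minimal sketch of what that default handler could look like, assuming the boolean-returning `onFailure` signature from the diff above; returning true fails the sink for any kind of error, matching the documented default behavior.)

import org.elasticsearch.action.ActionRequest;

public class NoOpActionRequestFailureHandler implements ActionRequestFailureHandler {

    private static final long serialVersionUID = 1L;

    @Override
    public boolean onFailure(ActionRequest action, Throwable failure, RequestIndexer indexer) {
        // no special handling: any request failure fails the sink
        return true;
    }
}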

          Hide
          githubbot ASF GitHub Bot added a comment -

          Github user tzulitai commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3358#discussion_r102183643

          — Diff: docs/dev/connectors/elasticsearch.md —
@@ -272,9 +308,115 @@ input.addSink(new ElasticsearchSink(config, new ElasticsearchSinkFunction[String
 The difference is that now we do not need to provide a list of addresses
 of Elasticsearch nodes.

+### Handling Failing Elasticsearch Requests
+
+Elasticsearch action requests may fail due to a variety of reasons, including
+temporarily saturated node queue capacity or malformed documents to be indexed.
+The Flink Elasticsearch Sink allows the user to specify how request
+failures are handled, by simply implementing an `ActionRequestFailureHandler` and
+providing it to the constructor.
+
+Below is an example:
+
+<div class="codetabs" markdown="1">
+<div data-lang="java" markdown="1">
+{% highlight java %}
+DataStream<String> input = ...;
+
+input.addSink(new ElasticsearchSink<>(
+    config, transportAddresses,
+    new ElasticsearchSinkFunction<String>() {...},
+    new ActionRequestFailureHandler() {
+        @Override
+        public boolean onFailure(ActionRequest action, Throwable failure, RequestIndexer indexer) {
+            // this example uses Apache Commons to search for nested exceptions
+            if (ExceptionUtils.indexOfThrowable(failure, EsRejectedExecutionException.class) >= 0) {
+                // full queue; re-add document for indexing
+                indexer.add(action);
+                return false;
+            } else if (ExceptionUtils.indexOfThrowable(failure, ElasticsearchParseException.class) >= 0) {
+                // malformed document; simply drop request without failing sink
+                return false;
+            } else {
+                // for all other failures, fail the sink
+                return true;
+            }
+        }
+}));
+{% endhighlight %}
+</div>
+<div data-lang="scala" markdown="1">
+{% highlight scala %}
+val input: DataStream[String] = ...
+
+input.addSink(new ElasticsearchSink(
+    config, transportAddresses,
+    new ElasticsearchSinkFunction[String] {...},
+    new ActionRequestFailureHandler {
+        override def onFailure(action: ActionRequest, failure: Throwable, indexer: RequestIndexer): Boolean = {
+            // this example uses Apache Commons to search for nested exceptions
+            if (ExceptionUtils.indexOfThrowable(failure, classOf[EsRejectedExecutionException]) >= 0) {
+                // full queue; re-add document for indexing
+                indexer.add(action)
+                false
+            } else if (ExceptionUtils.indexOfThrowable(failure, classOf[ElasticsearchParseException]) >= 0) {
+                // malformed document; simply drop request without failing sink
+                false
+            } else {
+                // for all other failures, fail the sink
+                true
+            }
+        }
+}))
+{% endhighlight %}
+</div>
+</div>
+
+The above example will let the sink re-add requests that failed due to
+queue capacity saturation and drop requests with malformed documents, without
+failing the sink. For all other failures, the sink will fail. If an `ActionRequestFailureHandler`
+is not provided to the constructor, the sink will fail for any kind of error.
+
+Note that `onFailure` is called only for failures that still occur after the
+`BulkProcessor` internally finishes all backoff retry attempts.
+By default, the `BulkProcessor` retries up to a maximum of 8 attempts with
+an exponential backoff. For more information on the behaviour of the
+internal `BulkProcessor` and how to configure it, please see the following section.
+
+<p style="border-radius: 5px; padding: 5px" class="bg-danger">
+<b>IMPORTANT</b>: Re-adding requests back to the internal <b>BulkProcessor</b>
+on failures will lead to longer checkpoints, as the sink will also
+need to wait for the re-added requests to be flushed when checkpointing.
+This also means that if re-added requests never succeed, the checkpoint will
+never finish.
+</p>
— End diff –

I think a pre-implemented `ActionRequestFailureHandler` that re-adds requests for full-queue exceptions would be nice, and useful out of the box for a large portion of users. Great idea!
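(A sketch of such a handler under the same assumed boolean-returning `onFailure` signature; the class name is illustrative only.)

import org.apache.commons.lang3.exception.ExceptionUtils;
import org.elasticsearch.action.ActionRequest;
import org.elasticsearch.common.util.concurrent.EsRejectedExecutionException;

public class RetryRejectedExecutionFailureHandler implements ActionRequestFailureHandler {

    private static final long serialVersionUID = 1L;

    @Override
    public boolean onFailure(ActionRequest action, Throwable failure, RequestIndexer indexer) {
        if (ExceptionUtils.indexOfThrowable(failure, EsRejectedExecutionException.class) >= 0) {
            // full queue; re-add the request so it is retried with a later bulk
            indexer.add(action);
            return false;
        }
        // other failures may not be safe to blindly retry; fail the sink
        return true;
    }
}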

          Hide
          githubbot ASF GitHub Bot added a comment -

          Github user tzulitai commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3358#discussion_r102183754

          — Diff: flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/ElasticsearchSinkBase.java —
@@ -67,10 +73,56 @@
 public static final String CONFIG_KEY_BULK_FLUSH_MAX_ACTIONS = "bulk.flush.max.actions";
 public static final String CONFIG_KEY_BULK_FLUSH_MAX_SIZE_MB = "bulk.flush.max.size.mb";
 public static final String CONFIG_KEY_BULK_FLUSH_INTERVAL_MS = "bulk.flush.interval.ms";
+ public static final String CONFIG_KEY_BULK_FLUSH_BACKOFF_ENABLE = "bulk.flush.backoff.enable";
+ public static final String CONFIG_KEY_BULK_FLUSH_BACKOFF_TYPE = "bulk.flush.backoff.type";
+ public static final String CONFIG_KEY_BULK_FLUSH_BACKOFF_RETRIES = "bulk.flush.backoff.retries";
+ public static final String CONFIG_KEY_BULK_FLUSH_BACKOFF_DELAY = "bulk.flush.backoff.delay";
+
+ public enum FlushBackoffType {
+     CONSTANT,
+     EXPONENTIAL
+ }
+
+ public class BulkFlushBackoffPolicy implements Serializable {
+
+     private static final long serialVersionUID = -6022851996101826049L;
+
+     // the default values follow the Elasticsearch default settings for BulkProcessor
+     private FlushBackoffType backoffType = FlushBackoffType.EXPONENTIAL;
+     private int maxRetryCount = 8;
+     private long delayMillis = 50;
+
+     public FlushBackoffType getBackoffType() {
+         return backoffType;
+     }
+
+     public int getMaxRetryCount() {
+         return maxRetryCount;
+     }
+
+     public long getDelayMillis() {
+         return delayMillis;
+     }
+
+     public void setBackoffType(FlushBackoffType backoffType) {
+         this.backoffType = checkNotNull(backoffType);
+     }
+
+     public void setMaxRetryCount(int maxRetryCount) {
+         checkArgument(maxRetryCount > 0);
— End diff –

          Yup, 0 should be acceptable.

          Hide
          githubbot ASF GitHub Bot added a comment -

          Github user tzulitai commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3358#discussion_r102183824

          — Diff: flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/ElasticsearchSinkBase.java —
@@ -67,10 +73,56 @@
 public static final String CONFIG_KEY_BULK_FLUSH_MAX_ACTIONS = "bulk.flush.max.actions";
 public static final String CONFIG_KEY_BULK_FLUSH_MAX_SIZE_MB = "bulk.flush.max.size.mb";
 public static final String CONFIG_KEY_BULK_FLUSH_INTERVAL_MS = "bulk.flush.interval.ms";
+ public static final String CONFIG_KEY_BULK_FLUSH_BACKOFF_ENABLE = "bulk.flush.backoff.enable";
+ public static final String CONFIG_KEY_BULK_FLUSH_BACKOFF_TYPE = "bulk.flush.backoff.type";
+ public static final String CONFIG_KEY_BULK_FLUSH_BACKOFF_RETRIES = "bulk.flush.backoff.retries";
+ public static final String CONFIG_KEY_BULK_FLUSH_BACKOFF_DELAY = "bulk.flush.backoff.delay";
+
+ public enum FlushBackoffType {
+     CONSTANT,
+     EXPONENTIAL
+ }
+
+ public class BulkFlushBackoffPolicy implements Serializable {
+
+     private static final long serialVersionUID = -6022851996101826049L;
+
+     // the default values follow the Elasticsearch default settings for BulkProcessor
+     private FlushBackoffType backoffType = FlushBackoffType.EXPONENTIAL;
+     private int maxRetryCount = 8;
+     private long delayMillis = 50;
+
+     public FlushBackoffType getBackoffType() {
+         return backoffType;
+     }
+
+     public int getMaxRetryCount() {
+         return maxRetryCount;
+     }
+
+     public long getDelayMillis() {
+         return delayMillis;
+     }
+
+     public void setBackoffType(FlushBackoffType backoffType) {
+         this.backoffType = checkNotNull(backoffType);
+     }
+
+     public void setMaxRetryCount(int maxRetryCount) {
+         checkArgument(maxRetryCount > 0);
+         this.maxRetryCount = maxRetryCount;
+     }
+
+     public void setDelayMillis(long delayMillis) {
+         checkArgument(delayMillis > 0);
— End diff –

          True, 0 should be acceptable. Nice catches.
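(Taken together, the relaxed preconditions in `BulkFlushBackoffPolicy` would presumably look like this sketch, using Flink's `Preconditions.checkArgument`.)

// inside BulkFlushBackoffPolicy
public void setMaxRetryCount(int maxRetryCount) {
    // 0 is acceptable: it effectively disables backoff retries
    checkArgument(maxRetryCount >= 0, "maxRetryCount must not be negative");
    this.maxRetryCount = maxRetryCount;
}

public void setDelayMillis(long delayMillis) {
    // 0 is acceptable: retry immediately, without delay
    checkArgument(delayMillis >= 0, "delayMillis must not be negative");
    this.delayMillis = delayMillis;
}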

          Hide
          githubbot ASF GitHub Bot added a comment -

          Github user tzulitai commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3358#discussion_r102184175

          — Diff: flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/ElasticsearchSinkBase.java —
@@ -165,20 +286,36 @@ public void beforeBulk(long executionId, BulkRequest request) { }
 @Override
 public void afterBulk(long executionId, BulkRequest request, BulkResponse response) {
     if (response.hasFailures()) {
-        for (BulkItemResponse itemResp : response.getItems()) {
-            Throwable failure = callBridge.extractFailureCauseFromBulkItemResponse(itemResp);
+        BulkItemResponse itemResponse;
+        Throwable failure;
+
+        for (int i = 0; i < response.getItems().length; i++) {
+            itemResponse = response.getItems()[i];
+            failure = callBridge.extractFailureCauseFromBulkItemResponse(itemResponse);
             if (failure != null) {
-                LOG.error("Failed Elasticsearch item request: {}", failure.getMessage(), failure);
-                failureThrowable.compareAndSet(null, failure);
+                LOG.error("Failed Elasticsearch item request: {}", itemResponse.getFailureMessage(), failure);
+
+                if (failureHandler.onFailure(request.requests().get(i), failure, requestIndexer)) {
+                    failureThrowable.compareAndSet(null, failure);
+                }
             }
         }
     }
+
+    numPendingRequests.getAndAdd(-request.numberOfActions());
 }

 @Override
 public void afterBulk(long executionId, BulkRequest request, Throwable failure) {
-    LOG.error("Failed Elasticsearch bulk request: {}", failure.getMessage(), failure);
-    failureThrowable.compareAndSet(null, failure);
+    LOG.error("Failed Elasticsearch bulk request: {}", failure.getMessage(), failure.getCause());
+
+    // whole bulk request failures are usually just temporary timeouts on
+    // the Elasticsearch side; simply retry all action requests in the bulk
+    for (ActionRequest action : request.requests()) {
+        requestIndexer.add(action);
+    }
— End diff –

I'll need to double-check this. The ES docs don't say much about it.

          Hide
          githubbot ASF GitHub Bot added a comment -

          Github user tzulitai commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3358#discussion_r102184622

          — Diff: flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/ElasticsearchSinkBase.java —
@@ -165,20 +286,36 @@ public void beforeBulk(long executionId, BulkRequest request) { }
 @Override
 public void afterBulk(long executionId, BulkRequest request, BulkResponse response) {
     if (response.hasFailures()) {
-        for (BulkItemResponse itemResp : response.getItems()) {
-            Throwable failure = callBridge.extractFailureCauseFromBulkItemResponse(itemResp);
+        BulkItemResponse itemResponse;
+        Throwable failure;
+
+        for (int i = 0; i < response.getItems().length; i++) {
+            itemResponse = response.getItems()[i];
+            failure = callBridge.extractFailureCauseFromBulkItemResponse(itemResponse);
             if (failure != null) {
-                LOG.error("Failed Elasticsearch item request: {}", failure.getMessage(), failure);
-                failureThrowable.compareAndSet(null, failure);
+                LOG.error("Failed Elasticsearch item request: {}", itemResponse.getFailureMessage(), failure);
+
+                if (failureHandler.onFailure(request.requests().get(i), failure, requestIndexer)) {
+                    failureThrowable.compareAndSet(null, failure);
+                }
             }
         }
     }
+
+    numPendingRequests.getAndAdd(-request.numberOfActions());
 }

 @Override
 public void afterBulk(long executionId, BulkRequest request, Throwable failure) {
-    LOG.error("Failed Elasticsearch bulk request: {}", failure.getMessage(), failure);
-    failureThrowable.compareAndSet(null, failure);
+    LOG.error("Failed Elasticsearch bulk request: {}", failure.getMessage(), failure.getCause());
+
+    // whole bulk request failures are usually just temporary timeouts on
+    // the Elasticsearch side; simply retry all action requests in the bulk
+    for (ActionRequest action : request.requests()) {
+        requestIndexer.add(action);
+    }
+
+    numPendingRequests.getAndAdd(-request.numberOfActions());
— End diff –

          The `BulkProcessorIndexer` will increment `numPendingRequests` whenever the user calls `add(ActionRequest)`. So, in your description, when the user re-adds the 500 requests, `numPendingRequests` first becomes `500+500=1000`. Then, we consider the failed 500 requests to have completed when this line is reached, so `numPendingRequests` becomes `1000-500=500`.
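(A sketch of the accounting described above, with assumed shapes for `RequestIndexer` and the surrounding sink: every `add` increments the counter before handing the request to the `BulkProcessor`, and `afterBulk` later subtracts the bulk's action count, so re-added requests stay counted until they complete.)

import java.util.concurrent.atomic.AtomicLong;

import org.elasticsearch.action.ActionRequest;
import org.elasticsearch.action.bulk.BulkProcessor;

class BulkProcessorIndexer implements RequestIndexer {

    private final BulkProcessor bulkProcessor;
    private final AtomicLong numPendingRequests;

    BulkProcessorIndexer(BulkProcessor bulkProcessor, AtomicLong numPendingRequests) {
        this.bulkProcessor = bulkProcessor;
        this.numPendingRequests = numPendingRequests;
    }

    @Override
    public void add(ActionRequest... requests) {
        for (ActionRequest request : requests) {
            // count the request as pending before it is handed to the processor
            numPendingRequests.incrementAndGet();
            bulkProcessor.add(request);
        }
    }
}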

          Hide
          githubbot ASF GitHub Bot added a comment -

          Github user tzulitai commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3358#discussion_r102185551

          — Diff: flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/ElasticsearchSinkBase.java —
@@ -165,20 +286,36 @@ public void beforeBulk(long executionId, BulkRequest request) { }
 @Override
 public void afterBulk(long executionId, BulkRequest request, BulkResponse response) {
     if (response.hasFailures()) {
-        for (BulkItemResponse itemResp : response.getItems()) {
-            Throwable failure = callBridge.extractFailureCauseFromBulkItemResponse(itemResp);
+        BulkItemResponse itemResponse;
+        Throwable failure;
+
+        for (int i = 0; i < response.getItems().length; i++) {
+            itemResponse = response.getItems()[i];
+            failure = callBridge.extractFailureCauseFromBulkItemResponse(itemResponse);
             if (failure != null) {
-                LOG.error("Failed Elasticsearch item request: {}", failure.getMessage(), failure);
-                failureThrowable.compareAndSet(null, failure);
+                LOG.error("Failed Elasticsearch item request: {}", itemResponse.getFailureMessage(), failure);
+
+                if (failureHandler.onFailure(request.requests().get(i), failure, requestIndexer)) {
— End diff –

That's actually a good idea; I didn't think of it that way. I would like to change it to throw a `Throwable` instead.
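(Under that change, the handler interface would presumably look like this sketch: instead of returning a boolean, the implementation re-throws whenever the sink should fail.)

import java.io.Serializable;
import org.elasticsearch.action.ActionRequest;

public interface ActionRequestFailureHandler extends Serializable {

    // Re-add the action through the indexer to retry it, return normally to
    // drop it, or throw to fail the sink with the given failure.
    void onFailure(ActionRequest action, Throwable failure, RequestIndexer indexer) throws Throwable;
}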

          Hide
          githubbot ASF GitHub Bot added a comment -

          Github user tzulitai commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3358#discussion_r102185790

          — Diff: flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/ElasticsearchSinkBase.java —
@@ -165,20 +286,36 @@ public void beforeBulk(long executionId, BulkRequest request) { }
 @Override
 public void afterBulk(long executionId, BulkRequest request, BulkResponse response) {
     if (response.hasFailures()) {
-        for (BulkItemResponse itemResp : response.getItems()) {
-            Throwable failure = callBridge.extractFailureCauseFromBulkItemResponse(itemResp);
+        BulkItemResponse itemResponse;
+        Throwable failure;
+
+        for (int i = 0; i < response.getItems().length; i++) {
+            itemResponse = response.getItems()[i];
+            failure = callBridge.extractFailureCauseFromBulkItemResponse(itemResponse);
             if (failure != null) {
-                LOG.error("Failed Elasticsearch item request: {}", failure.getMessage(), failure);
-                failureThrowable.compareAndSet(null, failure);
+                LOG.error("Failed Elasticsearch item request: {}", itemResponse.getFailureMessage(), failure);
+
+                if (failureHandler.onFailure(request.requests().get(i), failure, requestIndexer)) {
— End diff –

          For the use case you mentioned, that would mean the user implements a stateful `ActionRequestFailureHandler`, with its state being the number of failures so far, correct?

I didn't think about this too much, but I don't see a problem with it.
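(For example, a stateful handler along these lines; illustrative only. The failure count lives in a plain field, so it is kept locally and is lost on restart, as discussed later in the thread.)

import org.elasticsearch.action.ActionRequest;

public class BoundedRetryFailureHandler implements ActionRequestFailureHandler {

    private static final long serialVersionUID = 1L;

    private final int maxFailures;
    private int failureCount; // local, non-checkpointed state

    public BoundedRetryFailureHandler(int maxFailures) {
        this.maxFailures = maxFailures;
    }

    @Override
    public boolean onFailure(ActionRequest action, Throwable failure, RequestIndexer indexer) {
        failureCount++;
        if (failureCount <= maxFailures) {
            indexer.add(action); // retry until the failure budget is used up
            return false;
        }
        return true; // too many failures; fail the sink
    }
}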

          Hide
          githubbot ASF GitHub Bot added a comment -

          Github user tzulitai commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3358#discussion_r102186137

          — Diff: docs/dev/connectors/elasticsearch.md —
@@ -272,9 +308,115 @@ input.addSink(new ElasticsearchSink(config, new ElasticsearchSinkFunction[String
 The difference is that now we do not need to provide a list of addresses
 of Elasticsearch nodes.

+### Handling Failing Elasticsearch Requests
+
+Elasticsearch action requests may fail due to a variety of reasons, including
+temporarily saturated node queue capacity or malformed documents to be indexed.
+The Flink Elasticsearch Sink allows the user to specify how request
+failures are handled, by simply implementing an `ActionRequestFailureHandler` and
+providing it to the constructor.
+
+Below is an example:
+
+<div class="codetabs" markdown="1">
+<div data-lang="java" markdown="1">
+{% highlight java %}
+DataStream<String> input = ...;
+
+input.addSink(new ElasticsearchSink<>(
+    config, transportAddresses,
+    new ElasticsearchSinkFunction<String>() {...},
+    new ActionRequestFailureHandler() {
+        @Override
+        public boolean onFailure(ActionRequest action, Throwable failure, RequestIndexer indexer) {
+            // this example uses Apache Commons to search for nested exceptions
+            if (ExceptionUtils.indexOfThrowable(failure, EsRejectedExecutionException.class) >= 0) {
+                // full queue; re-add document for indexing
+                indexer.add(action);
+                return false;
+            } else if (ExceptionUtils.indexOfThrowable(failure, ElasticsearchParseException.class) >= 0) {
+                // malformed document; simply drop request without failing sink
+                return false;
+            } else {
+                // for all other failures, fail the sink
+                return true;
+            }
+        }
+}));
+{% endhighlight %}
+</div>
+<div data-lang="scala" markdown="1">
+{% highlight scala %}
+val input: DataStream[String] = ...
+
+input.addSink(new ElasticsearchSink(
+    config, transportAddresses,
+    new ElasticsearchSinkFunction[String] {...},
+    new ActionRequestFailureHandler {
+        override def onFailure(action: ActionRequest, failure: Throwable, indexer: RequestIndexer): Boolean = {
+            // this example uses Apache Commons to search for nested exceptions
+            if (ExceptionUtils.indexOfThrowable(failure, classOf[EsRejectedExecutionException]) >= 0) {
+                // full queue; re-add document for indexing
+                indexer.add(action)
+                false
+            } else if (ExceptionUtils.indexOfThrowable(failure, classOf[ElasticsearchParseException]) >= 0) {
+                // malformed document; simply drop request without failing sink
+                false
+            } else {
+                // for all other failures, fail the sink
+                true
+            }
+        }
+}))
+{% endhighlight %}
+</div>
+</div>
+
+The above example will let the sink re-add requests that failed due to
+queue capacity saturation and drop requests with malformed documents, without
+failing the sink. For all other failures, the sink will fail. If an `ActionRequestFailureHandler`
+is not provided to the constructor, the sink will fail for any kind of error.
+
+Note that `onFailure` is called only for failures that still occur after the
+`BulkProcessor` internally finishes all backoff retry attempts.
+By default, the `BulkProcessor` retries up to a maximum of 8 attempts with
+an exponential backoff. For more information on the behaviour of the
+internal `BulkProcessor` and how to configure it, please see the following section.
+
+<p style="border-radius: 5px; padding: 5px" class="bg-danger">
+<b>IMPORTANT</b>: Re-adding requests back to the internal <b>BulkProcessor</b>
+on failures will lead to longer checkpoints, as the sink will also
+need to wait for the re-added requests to be flushed when checkpointing.
+This also means that if re-added requests never succeed, the checkpoint will
+never finish.
+</p>
— End diff –

I wouldn't suggest adding an `ActionRequestFailureHandler` that retries all exceptions out of the box, though. That could let users easily overlook exceptions that simply cannot be retried without custom logic (for example, malformed documents with wrong field types).

          Hide
          githubbot ASF GitHub Bot added a comment -

          Github user rmetzger commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3358#discussion_r102203605

          — Diff: docs/dev/connectors/elasticsearch.md —
@@ -272,9 +308,115 @@ input.addSink(new ElasticsearchSink(config, new ElasticsearchSinkFunction[String
 The difference is that now we do not need to provide a list of addresses
 of Elasticsearch nodes.

+### Handling Failing Elasticsearch Requests
+
+Elasticsearch action requests may fail due to a variety of reasons, including
+temporarily saturated node queue capacity or malformed documents to be indexed.
+The Flink Elasticsearch Sink allows the user to specify how request
+failures are handled, by simply implementing an `ActionRequestFailureHandler` and
+providing it to the constructor.
+
+Below is an example:
+
+<div class="codetabs" markdown="1">
+<div data-lang="java" markdown="1">
+{% highlight java %}
+DataStream<String> input = ...;
+
+input.addSink(new ElasticsearchSink<>(
+    config, transportAddresses,
+    new ElasticsearchSinkFunction<String>() {...},
+    new ActionRequestFailureHandler() {
+        @Override
+        public boolean onFailure(ActionRequest action, Throwable failure, RequestIndexer indexer) {
+            // this example uses Apache Commons to search for nested exceptions
+            if (ExceptionUtils.indexOfThrowable(failure, EsRejectedExecutionException.class) >= 0) {
+                // full queue; re-add document for indexing
+                indexer.add(action);
+                return false;
+            } else if (ExceptionUtils.indexOfThrowable(failure, ElasticsearchParseException.class) >= 0) {
+                // malformed document; simply drop request without failing sink
+                return false;
+            } else {
+                // for all other failures, fail the sink
+                return true;
+            }
+        }
+}));
+{% endhighlight %}
+</div>
+<div data-lang="scala" markdown="1">
+{% highlight scala %}
+val input: DataStream[String] = ...
+
+input.addSink(new ElasticsearchSink(
+    config, transportAddresses,
+    new ElasticsearchSinkFunction[String] {...},
+    new ActionRequestFailureHandler {
+        override def onFailure(action: ActionRequest, failure: Throwable, indexer: RequestIndexer): Boolean = {
+            // this example uses Apache Commons to search for nested exceptions
+            if (ExceptionUtils.indexOfThrowable(failure, classOf[EsRejectedExecutionException]) >= 0) {
+                // full queue; re-add document for indexing
+                indexer.add(action)
+                false
+            } else if (ExceptionUtils.indexOfThrowable(failure, classOf[ElasticsearchParseException]) >= 0) {
+                // malformed document; simply drop request without failing sink
+                false
+            } else {
+                // for all other failures, fail the sink
+                true
+            }
+        }
+}))
+{% endhighlight %}
+</div>
+</div>
+
+The above example will let the sink re-add requests that failed due to
+queue capacity saturation and drop requests with malformed documents, without
+failing the sink. For all other failures, the sink will fail. If an `ActionRequestFailureHandler`
+is not provided to the constructor, the sink will fail for any kind of error.
+
+Note that `onFailure` is called only for failures that still occur after the
+`BulkProcessor` internally finishes all backoff retry attempts.
+By default, the `BulkProcessor` retries up to a maximum of 8 attempts with
+an exponential backoff. For more information on the behaviour of the
+internal `BulkProcessor` and how to configure it, please see the following section.
+
+<p style="border-radius: 5px; padding: 5px" class="bg-danger">
+<b>IMPORTANT</b>: Re-adding requests back to the internal <b>BulkProcessor</b>
+on failures will lead to longer checkpoints, as the sink will also
+need to wait for the re-added requests to be flushed when checkpointing.
+This also means that if re-added requests never succeed, the checkpoint will
+never finish.
+</p>
— End diff –

          I agree that we should provide a reasonable default behavior, instead of just retrying.

          Hide
          githubbot ASF GitHub Bot added a comment -

          Github user rmetzger commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3358#discussion_r102204385

— Diff: flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/ElasticsearchSinkBase.java —
@@ -165,20 +286,36 @@ public void beforeBulk(long executionId, BulkRequest request) { }

    @Override
    public void afterBulk(long executionId, BulkRequest request, BulkResponse response) {
        if (response.hasFailures()) {
-           for (BulkItemResponse itemResp : response.getItems()) {
-               Throwable failure = callBridge.extractFailureCauseFromBulkItemResponse(itemResp);
+           BulkItemResponse itemResponse;
+           Throwable failure;
+
+           for (int i = 0; i < response.getItems().length; i++) {
+               itemResponse = response.getItems()[i];
+               failure = callBridge.extractFailureCauseFromBulkItemResponse(itemResponse);
                if (failure != null) {
-                   LOG.error("Failed Elasticsearch item request: {}", failure.getMessage(), failure);
-                   failureThrowable.compareAndSet(null, failure);
+                   LOG.error("Failed Elasticsearch item request: {}", itemResponse.getFailureMessage(), failure);
+
+                   if (failureHandler.onFailure(request.requests().get(i), failure, requestIndexer)) {
— End diff —

Mh. I don't know if the use case I've mentioned makes a lot of sense. Probably most users just want custom logic to decide how to do the retries / discards.

I think we shouldn't do complicated things like checkpointing the state of the failure handler. It's good enough if the user keeps it locally (and loses it on failure).
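
A minimal sketch of a handler with purely local, non-checkpointed state, as suggested above (the class name and retry threshold are hypothetical, not part of the PR):

    import org.apache.flink.streaming.connectors.elasticsearch.ActionRequestFailureHandler;
    import org.apache.flink.streaming.connectors.elasticsearch.RequestIndexer;
    import org.elasticsearch.action.ActionRequest;

    // keeps a local retry budget across all requests; the counter is not
    // checkpointed, so it starts from zero again after a failure/restore cycle
    public class MaxRetryFailureHandler implements ActionRequestFailureHandler {

        private final long maxRetries; // user-supplied threshold
        private transient long retriesSoFar; // local state only; lost on recovery

        public MaxRetryFailureHandler(long maxRetries) {
            this.maxRetries = maxRetries;
        }

        @Override
        public boolean onFailure(ActionRequest action, Throwable failure, RequestIndexer indexer) {
            if (retriesSoFar < maxRetries) {
                retriesSoFar++;
                indexer.add(action); // re-add the failed request for another attempt
                return false; // do not fail the sink yet
            }
            return true; // retry budget exhausted; fail the sink
        }
    }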

          githubbot ASF GitHub Bot added a comment -

          Github user rmetzger commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3358#discussion_r102204579

— Diff: flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/ElasticsearchSinkBase.java —
@@ -165,20 +286,36 @@ public void beforeBulk(long executionId, BulkRequest request) { }

    @Override
    public void afterBulk(long executionId, BulkRequest request, BulkResponse response) {
        if (response.hasFailures()) {
-           for (BulkItemResponse itemResp : response.getItems()) {
-               Throwable failure = callBridge.extractFailureCauseFromBulkItemResponse(itemResp);
+           BulkItemResponse itemResponse;
+           Throwable failure;
+
+           for (int i = 0; i < response.getItems().length; i++) {
+               itemResponse = response.getItems()[i];
+               failure = callBridge.extractFailureCauseFromBulkItemResponse(itemResponse);
                if (failure != null) {
-                   LOG.error("Failed Elasticsearch item request: {}", failure.getMessage(), failure);
-                   failureThrowable.compareAndSet(null, failure);
+                   LOG.error("Failed Elasticsearch item request: {}", itemResponse.getFailureMessage(), failure);
+
+                   if (failureHandler.onFailure(request.requests().get(i), failure, requestIndexer)) {
+                       failureThrowable.compareAndSet(null, failure);
+                   }
                }
            }
        }
+
+       numPendingRequests.getAndAdd(-request.numberOfActions());
    }

    @Override
    public void afterBulk(long executionId, BulkRequest request, Throwable failure) {
-       LOG.error("Failed Elasticsearch bulk request: {}", failure.getMessage(), failure);
-       failureThrowable.compareAndSet(null, failure);
+       LOG.error("Failed Elasticsearch bulk request: {}", failure.getMessage(), failure.getCause());
+
+       // whole bulk request failures are usually just temporary timeouts on
+       // the Elasticsearch side; simply retry all action requests in the bulk
+       for (ActionRequest action : request.requests()) {
+           requestIndexer.add(action);
+       }
+
+       numPendingRequests.getAndAdd(-request.numberOfActions());
— End diff —

Puh, that's good! Thanks for the explanation.
I didn't look closely enough at your changes.

          githubbot ASF GitHub Bot added a comment -

          Github user rmetzger commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3358#discussion_r102204839

— Diff: flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/ElasticsearchSinkBase.java —
@@ -165,20 +286,36 @@ public void beforeBulk(long executionId, BulkRequest request) { }

    @Override
    public void afterBulk(long executionId, BulkRequest request, BulkResponse response) {
        if (response.hasFailures()) {
-           for (BulkItemResponse itemResp : response.getItems()) {
-               Throwable failure = callBridge.extractFailureCauseFromBulkItemResponse(itemResp);
+           BulkItemResponse itemResponse;
+           Throwable failure;
+
+           for (int i = 0; i < response.getItems().length; i++) {
+               itemResponse = response.getItems()[i];
+               failure = callBridge.extractFailureCauseFromBulkItemResponse(itemResponse);
                if (failure != null) {
-                   LOG.error("Failed Elasticsearch item request: {}", failure.getMessage(), failure);
-                   failureThrowable.compareAndSet(null, failure);
+                   LOG.error("Failed Elasticsearch item request: {}", itemResponse.getFailureMessage(), failure);
+
+                   if (failureHandler.onFailure(request.requests().get(i), failure, requestIndexer)) {
+                       failureThrowable.compareAndSet(null, failure);
+                   }
                }
            }
        }
+
+       numPendingRequests.getAndAdd(-request.numberOfActions());
    }

    @Override
    public void afterBulk(long executionId, BulkRequest request, Throwable failure) {
-       LOG.error("Failed Elasticsearch bulk request: {}", failure.getMessage(), failure);
-       failureThrowable.compareAndSet(null, failure);
+       LOG.error("Failed Elasticsearch bulk request: {}", failure.getMessage(), failure.getCause());
+
+       // whole bulk request failures are usually just temporary timeouts on
+       // the Elasticsearch side; simply retry all action requests in the bulk
+       for (ActionRequest action : request.requests()) {
+           requestIndexer.add(action);
+       }
— End diff —

          Thank you. I'm undecided if we want to add this here or not.
Just based on my experience with the Kafka connector, at some point there will be a user who wants a very specific custom behavior. But we can also keep it as-is and fix it if a user needs it (worst case: they have to override our implementation).

          githubbot ASF GitHub Bot added a comment -

          Github user rmetzger commented on the issue:

          https://github.com/apache/flink/pull/3358

          I finished checking the code.
          The only thing I'm missing from the change is a test case ensuring that the implementation works.

I think we can build a test similar to what we did for Kafka (with a mock producer).

          githubbot ASF GitHub Bot added a comment -

          Github user tzulitai commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3358#discussion_r102233292

— Diff: flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/ElasticsearchSinkBase.java —
@@ -165,20 +286,36 @@ public void beforeBulk(long executionId, BulkRequest request) { }

    @Override
    public void afterBulk(long executionId, BulkRequest request, BulkResponse response) {
        if (response.hasFailures()) {
-           for (BulkItemResponse itemResp : response.getItems()) {
-               Throwable failure = callBridge.extractFailureCauseFromBulkItemResponse(itemResp);
+           BulkItemResponse itemResponse;
+           Throwable failure;
+
+           for (int i = 0; i < response.getItems().length; i++) {
+               itemResponse = response.getItems()[i];
+               failure = callBridge.extractFailureCauseFromBulkItemResponse(itemResponse);
                if (failure != null) {
-                   LOG.error("Failed Elasticsearch item request: {}", failure.getMessage(), failure);
-                   failureThrowable.compareAndSet(null, failure);
+                   LOG.error("Failed Elasticsearch item request: {}", itemResponse.getFailureMessage(), failure);
+
+                   if (failureHandler.onFailure(request.requests().get(i), failure, requestIndexer)) {
— End diff —

Alright, then I'll simply change the `boolean` return value to throwing a `Throwable`, and add some Javadoc noting that any state in the failure handler is volatile (i.e., not checkpointed).
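
A sketch of what the revised contract could look like (the final interface in this PR additionally gained a REST status code parameter, as noted further below):

    import java.io.Serializable;

    import org.elasticsearch.action.ActionRequest;

    public interface ActionRequestFailureHandler extends Serializable {

        /**
         * @param action the request that failed
         * @param failure the cause of the failure
         * @param indexer re-add requests to this indexer to retry them
         * @throws Throwable rethrow (or wrap) the failure to fail the sink
         */
        void onFailure(ActionRequest action, Throwable failure, RequestIndexer indexer) throws Throwable;
    }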

          githubbot ASF GitHub Bot added a comment -

          Github user tzulitai commented on the issue:

          https://github.com/apache/flink/pull/3358

          Yes, I overlooked adding tests for this.

Thanks a lot for the reviews @rmetzger! I'll address your comments and add tests for the additional features.
          Will ping you once it's ready for another review.

          githubbot ASF GitHub Bot added a comment -

          Github user tzulitai commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3358#discussion_r102400283

— Diff: flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/ElasticsearchSinkBase.java —
@@ -165,20 +286,36 @@ public void beforeBulk(long executionId, BulkRequest request) { }

    @Override
    public void afterBulk(long executionId, BulkRequest request, BulkResponse response) {
        if (response.hasFailures()) {
-           for (BulkItemResponse itemResp : response.getItems()) {
-               Throwable failure = callBridge.extractFailureCauseFromBulkItemResponse(itemResp);
+           BulkItemResponse itemResponse;
+           Throwable failure;
+
+           for (int i = 0; i < response.getItems().length; i++) {
+               itemResponse = response.getItems()[i];
+               failure = callBridge.extractFailureCauseFromBulkItemResponse(itemResponse);
                if (failure != null) {
-                   LOG.error("Failed Elasticsearch item request: {}", failure.getMessage(), failure);
-                   failureThrowable.compareAndSet(null, failure);
+                   LOG.error("Failed Elasticsearch item request: {}", itemResponse.getFailureMessage(), failure);
+
+                   if (failureHandler.onFailure(request.requests().get(i), failure, requestIndexer)) {
+                       failureThrowable.compareAndSet(null, failure);
+                   }
                }
            }
        }
+
+       numPendingRequests.getAndAdd(-request.numberOfActions());
    }

    @Override
    public void afterBulk(long executionId, BulkRequest request, Throwable failure) {
-       LOG.error("Failed Elasticsearch bulk request: {}", failure.getMessage(), failure);
-       failureThrowable.compareAndSet(null, failure);
+       LOG.error("Failed Elasticsearch bulk request: {}", failure.getMessage(), failure.getCause());
+
+       // whole bulk request failures are usually just temporary timeouts on
+       // the Elasticsearch side; simply retry all action requests in the bulk
+       for (ActionRequest action : request.requests()) {
+           requestIndexer.add(action);
+       }
— End diff —

It seems like we will need to use the failure handler here too.
Any exception that the Elasticsearch `Client` throws while issuing the bulk request can surface here as well. So errors like an unreachable node can pop up here too, and I don't think we should implicitly treat them as temporary.
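
A sketch of routing whole-bulk failures through the handler as suggested above, using the throwing `onFailure` variant and the field names from the diffs quoted in this thread (the exact signature still evolved in later commits):

    @Override
    public void afterBulk(long executionId, BulkRequest request, Throwable failure) {
        LOG.error("Failed Elasticsearch bulk request: {}", failure.getMessage(), failure);

        try {
            for (ActionRequest action : request.requests()) {
                // the handler decides per request: re-add via the indexer, drop, or rethrow
                failureHandler.onFailure(action, failure, requestIndexer);
            }
        } catch (Throwable t) {
            // the handler chose to fail the sink with this failure
            failureThrowable.compareAndSet(null, t);
        }

        numPendingRequests.getAndAdd(-request.numberOfActions());
    }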

          githubbot ASF GitHub Bot added a comment -

          Github user tzulitai commented on the issue:

          https://github.com/apache/flink/pull/3358

          @rmetzger All of your comments have been addressed, and tests for the new features have been added (in `ElasticsearchSinkBaseTest`). Can you take another look? Thanks a lot!

          Some notes on changes I made that weren't previously discussed:
          1. Renamed `NoOpActionRequestFailureHandler` to just `NoOpActionFailureHandler` - less of a mouthful
2. I exposed the response's REST status code through the failure handler's `onFailure(...)` callback. The reason for this is explained in the doc / Javadoc changes of the last follow-up commit (c594523).
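
For illustration, a hypothetical handler branching on the status code could look like this (429 signals a saturated node queue, 400 a malformed request; neither the class nor the chosen codes are prescribed by the PR):

    import org.elasticsearch.action.ActionRequest;

    public class StatusCodeAwareFailureHandler implements ActionRequestFailureHandler {

        @Override
        public void onFailure(ActionRequest action, Throwable failure, int restStatusCode, RequestIndexer indexer) throws Throwable {
            if (restStatusCode == 429) {
                indexer.add(action); // too many requests; worth retrying later
            } else if (restStatusCode == 400) {
                // malformed request; drop it without failing the sink
            } else {
                throw failure; // everything else fails the sink
            }
        }
    }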

          githubbot ASF GitHub Bot added a comment -

          Github user rmetzger commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3358#discussion_r102545769

— Diff: flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/ElasticsearchSinkBase.java —
@@ -211,6 +283,23 @@ public void invoke(T value) throws Exception {
    }

    @Override
+   public void initializeState(FunctionInitializationContext context) throws Exception {
+       // no initialization needed
+   }
+
+   @Override
+   public void snapshotState(FunctionSnapshotContext context) throws Exception {
+       checkErrorAndRethrow();
+
+       if (flushOnCheckpoint) {
+           do {
+               bulkProcessor.flush();
— End diff —

This flush() might be a no-op if bulkRequest.numberOfActions() == 0 in the bulkProcessor implementation.
If so, this loop turns into a busy loop wasting CPU cycles.
I wonder if we should wait on the numPendingRequests and notify on it once we update it?

(Sorry for bringing this up only in the second review.)
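
A sketch of the wait/notify idea (assuming `numPendingRequests` becomes a plain long guarded by a dedicated lock object rather than the AtomicLong from the diff, and that the `afterBulk` callbacks notify on that lock after decrementing):

    @Override
    public void snapshotState(FunctionSnapshotContext context) throws Exception {
        checkErrorAndRethrow();

        if (flushOnCheckpoint) {
            synchronized (pendingRequestsLock) {
                while (numPendingRequests > 0) {
                    bulkProcessor.flush(); // re-trigger sending of buffered requests
                    pendingRequestsLock.wait(100); // block until afterBulk() notifies; timeout as a safety net
                    checkErrorAndRethrow();
                }
            }
        }
    }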

          githubbot ASF GitHub Bot added a comment -

          Github user rmetzger commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3358#discussion_r102546521

          — Diff: flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/util/RetryRejectedExecutionFailureHandler.java —
          @@ -0,0 +1,46 @@
          +/*
          + * Licensed to the Apache Software Foundation (ASF) under one
          + * or more contributor license agreements. See the NOTICE file
          + * distributed with this work for additional information
          + * regarding copyright ownership. The ASF licenses this file
          + * to you under the Apache License, Version 2.0 (the
          + * "License"); you may not use this file except in compliance
          + * with the License. You may obtain a copy of the License at
          + *
          + * http://www.apache.org/licenses/LICENSE-2.0
          + *
          + * Unless required by applicable law or agreed to in writing, software
          + * distributed under the License is distributed on an "AS IS" BASIS,
          + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
          + * See the License for the specific language governing permissions and
          + * limitations under the License.
          + */
          +
          +package org.apache.flink.streaming.connectors.elasticsearch.util;
          +
          +import org.apache.flink.streaming.connectors.elasticsearch.ActionRequestFailureHandler;
          +import org.apache.flink.streaming.connectors.elasticsearch.RequestIndexer;
          +import org.apache.flink.util.ExceptionUtils;
          +import org.elasticsearch.action.ActionRequest;
          +import org.elasticsearch.common.util.concurrent.EsRejectedExecutionException;
          +
+/**
+ * An {@link ActionRequestFailureHandler} that re-adds requests that failed due to temporary
+ * {@link EsRejectedExecutionException}s (which means that Elasticsearch node queues are currently full),
+ * and fails for all other failures.
+ */
          +public class RetryRejectedExecutionFailureHandler implements ActionRequestFailureHandler {
          +
          + private static final long serialVersionUID = -7423562912824511906L;
          +
          + @Override
          + public void onFailure(ActionRequest action, Throwable failure, int restStatusCode, RequestIndexer indexer) throws Throwable {
          + if (ExceptionUtils.containsThrowable(failure, EsRejectedExecutionException.class)) {
          + indexer.add(action);
          — End diff –

Do you think this is worth a LOG.debug statement?
Or will it happen too often / be too uninformative?

I wonder if we could use the metrics system for exposing things like the error rate, retry rate, etc. (Maybe we should file a JIRA for the Elasticsearch connectors to "metricify" them.)
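
A sketch of what "metricifying" the sink could look like with Flink's standard metric API (the metric names here are hypothetical):

    import org.apache.flink.configuration.Configuration;
    import org.apache.flink.metrics.Counter;

    private transient Counter numFailedRequests;
    private transient Counter numRetriedRequests;

    @Override
    public void open(Configuration parameters) throws Exception {
        numFailedRequests = getRuntimeContext().getMetricGroup().counter("numFailedRequests");
        numRetriedRequests = getRuntimeContext().getMetricGroup().counter("numRetriedRequests");
        // ... existing client / bulk processor setup ...
    }

The `BulkProcessor.Listener` callbacks would then call `numFailedRequests.inc()` per failed item, and the `RequestIndexer` would call `numRetriedRequests.inc()` per re-added request.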

          githubbot ASF GitHub Bot added a comment -

          Github user rmetzger commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3358#discussion_r102544239

— Diff: flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/ElasticsearchSinkBase.java —
@@ -92,40 +152,59 @@
    /** Call bridge for different version-specfic */
    private final ElasticsearchApiCallBridge callBridge;

+   /**
+    * Number of pending action requests not yet acknowledged by Elasticsearch.
+    * This value is maintained only if {@link ElasticsearchSinkBase#flushOnCheckpoint} is {@code true}.
+    *
+    * This is incremented whenever the user adds (or re-adds through the {@link ActionRequestFailureHandler}) requests
+    * to the {@link RequestIndexer}. It is decremented for each completed request of a bulk request, in
+    * {@link BulkProcessor.Listener#afterBulk(long, BulkRequest, BulkResponse)} and
+    * {@link BulkProcessor.Listener#afterBulk(long, BulkRequest, Throwable)}.
+    */
+   private AtomicLong numPendingRequests = new AtomicLong(0);
+
    /** Elasticsearch client created using the call bridge. */
    private transient Client client;

    /** Bulk processor to buffer and send requests to Elasticsearch, created using the client. */
    private transient BulkProcessor bulkProcessor;

    /**
-    * This is set from inside the {@link BulkProcessor.Listener} if a {@link Throwable} was thrown in callbacks.
+    * This is set from inside the {@link BulkProcessor.Listener} if a {@link Throwable} was thrown in callbacks and
+    * the user considered it should fail the sink via the
+    * {@link ActionRequestFailureHandler#onFailure(ActionRequest, Throwable, int, RequestIndexer)} method.
+    *
+    * Errors will be checked and rethrown before processing each input element, and when the sink is closed.
     */
    private final AtomicReference<Throwable> failureThrowable = new AtomicReference<>();

    public ElasticsearchSinkBase(
        ElasticsearchApiCallBridge callBridge,
        Map<String, String> userConfig,
-       ElasticsearchSinkFunction<T> elasticsearchSinkFunction) {
+       ElasticsearchSinkFunction<T> elasticsearchSinkFunction,
+       ActionRequestFailureHandler failureHandler) {

        this.callBridge = checkNotNull(callBridge);
        this.elasticsearchSinkFunction = checkNotNull(elasticsearchSinkFunction);
+       this.failureHandler = checkNotNull(failureHandler);

-       // we eagerly check if the user-provided sink function is serializable;
-       // otherwise, if it isn't serializable, users will merely get a non-informative error message
+       // we eagerly check if the user-provided sink function and failure handler is serializable;
+       // otherwise, if they aren't serializable, users will merely get a non-informative error message
        // "ElasticsearchSinkBase is not serializable"
-       try {
-           InstantiationUtil.serializeObject(elasticsearchSinkFunction);
-       } catch (Exception e) {
-           throw new IllegalArgumentException(
-               "The implementation of the provided ElasticsearchSinkFunction is not serializable. " +
-               "The object probably contains or references non serializable fields.");
-       }
-       checkNotNull(userConfig);
+       checkArgument(InstantiationUtil.isSerializable(elasticsearchSinkFunction),
+           "The implementation of the provided ElasticsearchSinkFunction is not serializable. " +
+           "The object probably contains or references non-serializable fields.");
+
+       checkArgument(InstantiationUtil.isSerializable(failureHandler),
+           "The implementation of the provided ActionRequestFailureHandler is not serializable. " +
+           "The object probably contains or references non-serializable fields.");
— End diff —

That's so much nicer now!

          githubbot ASF GitHub Bot added a comment -

          Github user tzulitai commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3358#discussion_r102647565

          — Diff: flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/util/RetryRejectedExecutionFailureHandler.java —
          @@ -0,0 +1,46 @@
          +/*
          + * Licensed to the Apache Software Foundation (ASF) under one
          + * or more contributor license agreements. See the NOTICE file
          + * distributed with this work for additional information
          + * regarding copyright ownership. The ASF licenses this file
          + * to you under the Apache License, Version 2.0 (the
          + * "License"); you may not use this file except in compliance
          + * with the License. You may obtain a copy of the License at
          + *
          + * http://www.apache.org/licenses/LICENSE-2.0
          + *
          + * Unless required by applicable law or agreed to in writing, software
          + * distributed under the License is distributed on an "AS IS" BASIS,
          + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
          + * See the License for the specific language governing permissions and
          + * limitations under the License.
          + */
          +
          +package org.apache.flink.streaming.connectors.elasticsearch.util;
          +
          +import org.apache.flink.streaming.connectors.elasticsearch.ActionRequestFailureHandler;
          +import org.apache.flink.streaming.connectors.elasticsearch.RequestIndexer;
          +import org.apache.flink.util.ExceptionUtils;
          +import org.elasticsearch.action.ActionRequest;
          +import org.elasticsearch.common.util.concurrent.EsRejectedExecutionException;
          +
+/**
+ * An {@link ActionRequestFailureHandler} that re-adds requests that failed due to temporary
+ * {@link EsRejectedExecutionException}s (which means that Elasticsearch node queues are currently full),
+ * and fails for all other failures.
+ */
          +public class RetryRejectedExecutionFailureHandler implements ActionRequestFailureHandler {
          +
          + private static final long serialVersionUID = -7423562912824511906L;
          +
          + @Override
          + public void onFailure(ActionRequest action, Throwable failure, int restStatusCode, RequestIndexer indexer) throws Throwable {
          + if (ExceptionUtils.containsThrowable(failure, EsRejectedExecutionException.class)) {
          + indexer.add(action);
          — End diff –

          The `BulkProcessor` listener actually logs them as LOG.error before they are processed by the failure handler (line 171 and line 180). So, these failures are always logged regardless of whether the failure handler chooses to log them. Do you think that's ok?

          githubbot ASF GitHub Bot added a comment -

          Github user tzulitai commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3358#discussion_r102647903

          — Diff: flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/util/RetryRejectedExecutionFailureHandler.java —
          @@ -0,0 +1,46 @@
          +/*
          + * Licensed to the Apache Software Foundation (ASF) under one
          + * or more contributor license agreements. See the NOTICE file
          + * distributed with this work for additional information
          + * regarding copyright ownership. The ASF licenses this file
          + * to you under the Apache License, Version 2.0 (the
          + * "License"); you may not use this file except in compliance
          + * with the License. You may obtain a copy of the License at
          + *
          + * http://www.apache.org/licenses/LICENSE-2.0
          + *
          + * Unless required by applicable law or agreed to in writing, software
          + * distributed under the License is distributed on an "AS IS" BASIS,
          + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
          + * See the License for the specific language governing permissions and
          + * limitations under the License.
          + */
          +
          +package org.apache.flink.streaming.connectors.elasticsearch.util;
          +
          +import org.apache.flink.streaming.connectors.elasticsearch.ActionRequestFailureHandler;
          +import org.apache.flink.streaming.connectors.elasticsearch.RequestIndexer;
          +import org.apache.flink.util.ExceptionUtils;
          +import org.elasticsearch.action.ActionRequest;
          +import org.elasticsearch.common.util.concurrent.EsRejectedExecutionException;
          +
+/**
+ * An {@link ActionRequestFailureHandler} that re-adds requests that failed due to temporary
+ * {@link EsRejectedExecutionException}s (which means that Elasticsearch node queues are currently full),
+ * and fails for all other failures.
+ */
          +public class RetryRejectedExecutionFailureHandler implements ActionRequestFailureHandler {
          +
          + private static final long serialVersionUID = -7423562912824511906L;
          +
          + @Override
          + public void onFailure(ActionRequest action, Throwable failure, int restStatusCode, RequestIndexer indexer) throws Throwable {
          + if (ExceptionUtils.containsThrowable(failure, EsRejectedExecutionException.class)) {
          + indexer.add(action);
          — End diff –

Regarding the frequency of `EsRejectedExecutionException`: from my experience with ES, these pop up a lot on under-resourced or poorly configured ES clusters.

They can flood the logs if not treated accordingly, but not logging them can be bad too, because then you'll know nothing about it unless the sink eventually fails with it.

We could also remove the failure logging from the `ElasticsearchSinkBase` and let the user be responsible for that, but I'm a bit undecided here. Open to suggestions for this!
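
If the logging responsibility did move into the handlers, the retry case in `RetryRejectedExecutionFailureHandler` could be logged at debug level to keep the noise down, along these lines (a sketch; imports as in the class quoted above, plus slf4j):

    private static final Logger LOG = LoggerFactory.getLogger(RetryRejectedExecutionFailureHandler.class);

    @Override
    public void onFailure(ActionRequest action, Throwable failure, int restStatusCode, RequestIndexer indexer) throws Throwable {
        if (ExceptionUtils.containsThrowable(failure, EsRejectedExecutionException.class)) {
            LOG.debug("Re-adding request rejected by a full Elasticsearch node queue: {}", action);
            indexer.add(action);
        } else {
            throw failure; // all other failures still fail the sink
        }
    }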

          githubbot ASF GitHub Bot added a comment -

          Github user tzulitai commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3358#discussion_r102648005

          — Diff: flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/util/RetryRejectedExecutionFailureHandler.java —
          @@ -0,0 +1,46 @@
          +/*
          + * Licensed to the Apache Software Foundation (ASF) under one
          + * or more contributor license agreements. See the NOTICE file
          + * distributed with this work for additional information
          + * regarding copyright ownership. The ASF licenses this file
          + * to you under the Apache License, Version 2.0 (the
          + * "License"); you may not use this file except in compliance
          + * with the License. You may obtain a copy of the License at
          + *
          + * http://www.apache.org/licenses/LICENSE-2.0
          + *
          + * Unless required by applicable law or agreed to in writing, software
          + * distributed under the License is distributed on an "AS IS" BASIS,
          + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
          + * See the License for the specific language governing permissions and
          + * limitations under the License.
          + */
          +
          +package org.apache.flink.streaming.connectors.elasticsearch.util;
          +
          +import org.apache.flink.streaming.connectors.elasticsearch.ActionRequestFailureHandler;
          +import org.apache.flink.streaming.connectors.elasticsearch.RequestIndexer;
          +import org.apache.flink.util.ExceptionUtils;
          +import org.elasticsearch.action.ActionRequest;
          +import org.elasticsearch.common.util.concurrent.EsRejectedExecutionException;
          +
+/**
+ * An {@link ActionRequestFailureHandler} that re-adds requests that failed due to temporary
+ * {@link EsRejectedExecutionException}s (which means that Elasticsearch node queues are currently full),
+ * and fails for all other failures.
+ */
          +public class RetryRejectedExecutionFailureHandler implements ActionRequestFailureHandler {
          +
          + private static final long serialVersionUID = -7423562912824511906L;
          +
          + @Override
          + public void onFailure(ActionRequest action, Throwable failure, int restStatusCode, RequestIndexer indexer) throws Throwable {
          + if (ExceptionUtils.containsThrowable(failure, EsRejectedExecutionException.class)) {
          + indexer.add(action);
          — End diff –

Metricifying the ES connectors seems like a good idea, especially given their growing popularity. I'll think about it and file a JIRA with some initial proposals.

          githubbot ASF GitHub Bot added a comment -

          Github user tzulitai commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3358#discussion_r102654126

          — Diff: flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/ElasticsearchSinkBase.java —
          @@ -211,6 +283,23 @@ public void invoke(T value) throws Exception {
          }

@Override
+ public void initializeState(FunctionInitializationContext context) throws Exception {
+     // no initialization needed
+ }
+
+ @Override
+ public void snapshotState(FunctionSnapshotContext context) throws Exception {
+     checkErrorAndRethrow();
+
+     if (flushOnCheckpoint) {
+         do {
+             bulkProcessor.flush();
          — End diff –

          Ah, I see the problem here ...
          The bulk processor's internal `bulkRequest.numberOfActions() == 0` will become `true` as soon as it starts executing the flush, and not after `afterBulk` is invoked.

          So, since our `numPendingRequests` implementation relies on the `afterBulk` callback, we might have busy loops on `bulkProcessor.flush()` while we wait for `numPendingRequests` to become 0.

This is quite a nice catch, actually! So no worries about bringing it up now.
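To make the failure mode concrete, here is a minimal sketch of the counter bookkeeping under discussion (the class and method names are illustrative assumptions; only `numPendingRequests` and the `afterBulk` timing mirror the discussion):

```
import java.util.concurrent.atomic.AtomicLong;

// Sketch of why an early-returning flush() causes busy looping: the counter
// only drops in afterBulk, so a flush that returns beforehand leaves it > 0.
class PendingRequestCounterSketch {

    private final AtomicLong numPendingRequests = new AtomicLong();

    // Called when a request is buffered into the bulk processor.
    void requestAdded() {
        numPendingRequests.incrementAndGet();
    }

    // Called from the listener's afterBulk callback, once per completed item;
    // items re-added by the failure handler increment the counter again first.
    void requestCompleted() {
        numPendingRequests.decrementAndGet();
    }

    // The snapshot may only complete once this returns true.
    boolean allAcknowledged() {
        return numPendingRequests.get() == 0;
    }
}
```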

          Hide
          githubbot ASF GitHub Bot added a comment -

          Github user tzulitai commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3358#discussion_r102654252

          — Diff: flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/ElasticsearchSinkBase.java —
          @@ -211,6 +283,23 @@ public void invoke(T value) throws Exception {
          }

@Override
+ public void initializeState(FunctionInitializationContext context) throws Exception {
+     // no initialization needed
+ }
+
+ @Override
+ public void snapshotState(FunctionSnapshotContext context) throws Exception {
+     checkErrorAndRethrow();
+
+     if (flushOnCheckpoint) {
+         do {
+             bulkProcessor.flush();
          — End diff –

          Waiting on `numPendingRequests` makes sense, I'll try and see if it works out.

          Hide
          githubbot ASF GitHub Bot added a comment -

          Github user tzulitai commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3358#discussion_r102656903

          — Diff: flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/ElasticsearchSinkBase.java —
          @@ -211,6 +283,23 @@ public void invoke(T value) throws Exception {
          }

@Override
+ public void initializeState(FunctionInitializationContext context) throws Exception {
+     // no initialization needed
+ }
+
+ @Override
+ public void snapshotState(FunctionSnapshotContext context) throws Exception {
+     checkErrorAndRethrow();
+
+     if (flushOnCheckpoint) {
+         do {
+             bulkProcessor.flush();
          — End diff –

          On a second look, I think my previous statement is incorrect.

          To elaborate, this is the way the `BulkProcessor`'s `flush` is implemented:
```
if (this.bulkRequest.numberOfActions() > 0) {
    this.execute();
}
```

`execute()` doesn't return until `afterBulk` is called on the listener. Since we can re-add requests to the bulk processor within `afterBulk`, the `bulkRequest.numberOfActions() > 0` condition will become true again and the loop is re-entered.

Therefore, `bulkProcessor.flush()` actually only needs to be called once, and it will work with our failure-handler re-adding strategy so that the flush also waits for re-added requests before returning. We can then check `numPendingRequests` once after the flush to make sure it worked as expected.
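As a hedged sketch of the single-flush variant described in this comment (the field and method names are taken from the discussion and assumed to exist on the surrounding sink class; this is an illustration, not the merged code):

```
// Sketch: flush once, relying on flush() blocking until afterBulk has run
// for all requests, including the ones the failure handler re-added.
private void flushPendingRequests() throws Exception {
    if (flushOnCheckpoint) {
        bulkProcessor.flush();

        // Surface any error the listener recorded asynchronously.
        checkErrorAndRethrow();

        // Sanity check: if flush() behaved as described, nothing is pending.
        if (numPendingRequests.get() != 0) {
            throw new IllegalStateException(
                "Bulk flush returned with requests still pending.");
        }
    }
}
```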

          Hide
          githubbot ASF GitHub Bot added a comment -

          Github user tzulitai commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3358#discussion_r102662136

          — Diff: flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/ElasticsearchSinkBase.java —
          @@ -211,6 +283,23 @@ public void invoke(T value) throws Exception {
          }

@Override
+ public void initializeState(FunctionInitializationContext context) throws Exception {
+     // no initialization needed
+ }
+
+ @Override
+ public void snapshotState(FunctionSnapshotContext context) throws Exception {
+     checkErrorAndRethrow();
+
+     if (flushOnCheckpoint) {
+         do {
+             bulkProcessor.flush();
          — End diff –

Following my arguments above, I think the busy loop you mentioned shouldn't happen, because the bulk processor's internal `bulkRequest.numberOfActions()` should always be in sync with our `numPendingRequests` (i.e., it should not occur that `bulkRequest.numberOfActions() == 0` while our own `numPendingRequests != 0`).

So in that case, if `bulkRequest.numberOfActions() == 0`, my original loop implementation just falls back to a single pass with two condition checks.

To a certain extent, I think it might be better to stick with the original loop implementation, so that we're not locked in to how the `BulkProcessor`'s flush is implemented. As you can see from a commit I just pushed (2956f99), which modifies the mock bulk processor in the tests to correctly mimic the flushing behaviour described above, the loop implementation still passes the tests.
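For contrast, a sketch of the defensive loop variant argued for here (same assumed names as the sketch above); when `flush()` already blocks until `afterBulk`, this degrades to a single pass, but it stays correct even if a future `BulkProcessor` implementation returns early:

```
// Sketch: loop on our own counter instead of trusting flush() to block,
// so correctness is decoupled from BulkProcessor's flush implementation.
private void flushUntilNoPendingRequests() throws Exception {
    do {
        bulkProcessor.flush();
        checkErrorAndRethrow();
    } while (numPendingRequests.get() != 0);
}
```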

          Hide
          githubbot ASF GitHub Bot added a comment -

          Github user rmetzger commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3358#discussion_r102905100

          — Diff: flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/ElasticsearchSinkBase.java —
          @@ -211,6 +283,23 @@ public void invoke(T value) throws Exception {
          }

@Override
+ public void initializeState(FunctionInitializationContext context) throws Exception {
+     // no initialization needed
+ }
+
+ @Override
+ public void snapshotState(FunctionSnapshotContext context) throws Exception {
+     checkErrorAndRethrow();
+
+     if (flushOnCheckpoint) {
+         do {
+             bulkProcessor.flush();
          — End diff –

          Thanks a lot for looking into this in detail.
          I think calling flush() this way is okay then.

          Hide
          githubbot ASF GitHub Bot added a comment -

          Github user rmetzger commented on the issue:

          https://github.com/apache/flink/pull/3358

          +1 to merge.

Thank you for answering all my comments in such detail!

          Hide
          githubbot ASF GitHub Bot added a comment -

          Github user tzulitai commented on the issue:

          https://github.com/apache/flink/pull/3358

Merging this to `master` 🎉

          Hide
          githubbot ASF GitHub Bot added a comment -

          Github user asfgit closed the pull request at:

          https://github.com/apache/flink/pull/3358

Hide
tzulitai Tzu-Li (Gordon) Tai added a comment -

Resolved in master via http://git-wip-us.apache.org/repos/asf/flink/2437da6

            People

            • Assignee:
              tzulitai Tzu-Li (Gordon) Tai
              Reporter:
              tzulitai Tzu-Li (Gordon) Tai
            • Votes:
0
              Watchers:
3
