FLINK-5122: Elasticsearch Sink loses documents when cluster has high load

    Details

    • Type: Bug
    • Status: Resolved
    • Priority: Major
    • Resolution: Fixed
    • Affects Version/s: 1.2.0
    • Fix Version/s: 1.3.0
    • Component/s: Streaming Connectors
    • Labels:
      None

      Description

      My cluster had high load and documents were not getting indexed. This violates the "at least once" semantics of the ES connector.

      I put pressure on my cluster to test Flink, causing new indices to be created and shards to be rebalanced. On these errors the bulk request should be retried instead of being discarded.

      Primary shard not active because ES decided to rebalance the index:
      2016-11-15 15:35:16,123 ERROR org.apache.flink.streaming.connectors.elasticsearch2.ElasticsearchSink - Failed to index document in Elasticsearch: UnavailableShardsException[[index-name][3] primary shard is not active Timeout: [1m], request: [BulkShardRequest to [index-name] containing [20] requests]]

      Bulk queue on node full (I set queue to a low value to reproduce error):
      22:37:57,702 ERROR org.apache.flink.streaming.connectors.elasticsearch2.ElasticsearchSink - Failed to index document in Elasticsearch: RemoteTransportException[[node1][192.168.1.240:9300][indices:data/write/bulk[s][p]]]; nested: EsRejectedExecutionException[rejected execution of org.elasticsearch.transport.TransportService$4@727e677c on EsThreadPoolExecutor[bulk, queue capacity = 1, org.elasticsearch.common.util.concurrent.EsThreadPoolExecutor@51322d37[Running, pool size = 2, active threads = 2, queued tasks = 1, completed tasks = 2939]]];

      I can try to propose a PR for this.

          Activity

          fhueske Fabian Hueske added a comment -

          Thanks for reporting this issue, static-max.

          Would be great if you'd provide a fix. I gave you contributor permissions for JIRA, so you can assign this issue to yourself if you want to work on it.
          Thanks, Fabian

          f.pompermaier Flavio Pompermaier added a comment -

          Can you try and see if our pull request fixes your issue (https://github.com/apache/flink/pull/2790/)? We also had this problem of missing documents, and the cause was the RuntimeException thrown by close().

          melmoth static-max added a comment -

          Your PR does not handle single failed bulk requests, as far as I can see.
          But your PR makes me think about the whole error-handling concept in the ES sink. hasFailure is only checked in close(), which does not make sense to me. Would it be smarter to throw the exception immediately when a document cannot be indexed? That way Flink would restart the job from a checkpoint and try again.
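
          For illustration, failing fast could look roughly like the sketch below. It only assumes an AtomicReference that the bulk listener already fills; the class and method names are made up for this example and are not the connector's actual API.

          import java.util.concurrent.atomic.AtomicReference;

          // Sketch of the "fail fast" idea: remember the first bulk failure and rethrow it
          // on the next record instead of waiting for close().
          public class FailFastSketch {
              private final AtomicReference<Throwable> failureThrowable = new AtomicReference<>();

              // Called from the BulkProcessor listener when an item cannot be indexed.
              void recordFailure(String failureMessage) {
                  failureThrowable.compareAndSet(null, new RuntimeException(failureMessage));
              }

              // Called at the start of invoke(); a pending failure fails the sink,
              // so Flink restarts the job from the last checkpoint and retries.
              void checkErrorAndRethrow() {
                  Throwable cause = failureThrowable.get();
                  if (cause != null) {
                      throw new RuntimeException("An error occurred in ElasticsearchSink.", cause);
                  }
              }
          }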

          f.pompermaier Flavio Pompermaier added a comment -

          Actually, the current ES sink implementation checks in afterBulk whether any error occurred. If so, it keeps track of that and then throws an Exception in close(). Whether that is right or wrong depends on the indexing policy you chose: if there's a bad document and you want the whole indexing to abort, it is right; if you would rather keep going, it is wrong. It could probably be useful to retry indexing the failed bulk when the error indicates cluster congestion (as in your use case), but I don't know whether this is possible or not.
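
          As a point of reference, the flow described above can be sketched like this (field and method names follow the discussion, not necessarily the exact source):

          import java.util.concurrent.atomic.AtomicBoolean;
          import java.util.concurrent.atomic.AtomicReference;

          // Sketch of the current behaviour: afterBulk() only records the failure,
          // and close() surfaces it much later.
          public class RecordThenThrowOnClose {
              private final AtomicBoolean hasFailure = new AtomicBoolean(false);
              private final AtomicReference<Throwable> failureThrowable = new AtomicReference<>();

              // BulkProcessor.Listener#afterBulk: remember that something went wrong.
              void onItemFailed(String failureMessage) {
                  failureThrowable.compareAndSet(null, new RuntimeException(failureMessage));
                  hasFailure.set(true);
              }

              // SinkFunction#close: only now is the recorded failure thrown.
              public void close() {
                  if (hasFailure.get()) {
                      throw new RuntimeException("An error occurred in ElasticsearchSink.", failureThrowable.get());
                  }
              }
          }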

          githubbot ASF GitHub Bot added a comment -

          GitHub user static-max opened a pull request:

          https://github.com/apache/flink/pull/2861

          FLINK-5122 Index requests will be retried if the error is only temp…

          This PR will re-add index requests to the BulkProcessor if the error is temporary, such as:

          • General timeout errors
          • No master
          • UnavailableShardsException (Rebalancing, Node down)
          • Bulk queue on node full

          You can merge this pull request into a Git repository by running:

          $ git pull https://github.com/static-max/flink flink-connector-elasticsearch2-robust

          Alternatively you can review and apply these changes as the patch at:

          https://github.com/apache/flink/pull/2861.patch

          To close this pull request, make a commit to your master/trunk branch
          with (at least) the following in the commit message:

          This closes #2861


          commit 2ea8bd099100203d73af9b3a5e616e6d6d1cd50d
          Author: Max Kuklinski <max.kuklinski@live.de>
          Date: 2016-11-23T16:54:11Z

          FLINK-5122 Index requests will be retried if the error is only temporary on Elasticsearch side. Covered are: Timeouts, No Master, UnavailableShardsException, bulk queue on node full


          f.pompermaier Flavio Pompermaier added a comment -

          Hi @static-max,
          we've closed our PR adding the possibility to configure the number of shards and replicas of an ES index (FLINK-4491), moving that helper class out into a public repository (https://github.com/okkam-it/flink-examples/blob/master/src/main/java/it/okkam/datalinks/batch/flink/elasticsearch/ElasticsearchHelper.java) to help anyone needing such a feature (as suggested by Fabian Hueske).

          However, since in that PR we suggested removing the RuntimeException in close() so as not to lose any bulk (which is more related to this ticket), I suggest checking whether that fix is also needed in your PR (and making this lenient/strict behaviour configurable, as suggested by Robert Metzger).

          Best,
          Flavio

          githubbot ASF GitHub Bot added a comment -

          Github user rmetzger commented on a diff in the pull request:

          https://github.com/apache/flink/pull/2861#discussion_r90916759

          — Diff: flink-streaming-connectors/flink-connector-elasticsearch2/src/main/java/org/apache/flink/streaming/connectors/elasticsearch2/ElasticsearchSink.java —
          @@ -186,22 +191,44 @@ public void beforeBulk(long executionId, BulkRequest request) {

          @Override
          public void afterBulk(long executionId, BulkRequest request, BulkResponse response) {
          + boolean allRequestsRepeatable = true;
          if (response.hasFailures()) {
          for (BulkItemResponse itemResp : response.getItems()) {
          if (itemResp.isFailed()) {

          - LOG.error("Failed to index document in Elasticsearch: " + itemResp.getFailureMessage());
          - failureThrowable.compareAndSet(null, new RuntimeException(itemResp.getFailureMessage()));
            + // Check if index request can be retried
            + String failureMessageLowercase = itemResp.getFailureMessage().toLowerCase();
            + if (failureMessageLowercase.contains("timeout") || failureMessageLowercase.contains("timed out") // Generic timeout errors
            — End diff –

          This string-based error matching seems to be a pretty unstable mechanism.
          Can you add a flag to control whether the mechanism is enabled, and disable it by default (but document it on the ES connector page)?
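
          For example, such a flag could be read from the user config map that is already passed to the sink constructor. A self-contained sketch (the key name "bulk.flush.check-error-and-retry" is made up for this example and is not an existing connector option):

          import java.util.HashMap;
          import java.util.Map;

          public class RetryFlagExample {
              // Illustrative key name only.
              static final String KEY_RETRY_TEMPORARY_ERRORS = "bulk.flush.check-error-and-retry";

              public static void main(String[] args) {
                  Map<String, String> userConfig = new HashMap<>();
                  userConfig.put(KEY_RETRY_TEMPORARY_ERRORS, "true"); // user opts in explicitly

                  String value = userConfig.get(KEY_RETRY_TEMPORARY_ERRORS);
                  boolean checkErrorAndRetryBulk = value != null && Boolean.parseBoolean(value);

                  System.out.println("retry temporary errors: " + checkErrorAndRetryBulk); // stays false unless set
              }
          }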

          githubbot ASF GitHub Bot added a comment -

          Github user rmetzger commented on a diff in the pull request:

          https://github.com/apache/flink/pull/2861#discussion_r90916177

          — Diff: flink-streaming-connectors/flink-connector-elasticsearch2/src/main/java/org/apache/flink/streaming/connectors/elasticsearch2/ElasticsearchSink.java —
          @@ -186,22 +191,44 @@ public void beforeBulk(long executionId, BulkRequest request) {

          @Override
          public void afterBulk(long executionId, BulkRequest request, BulkResponse response) {
          + boolean allRequestsRepeatable = true;
          if (response.hasFailures()) {
          for (BulkItemResponse itemResp : response.getItems()) {
          if (itemResp.isFailed()) {

          - LOG.error("Failed to index document in Elasticsearch: " + itemResp.getFailureMessage());
          - failureThrowable.compareAndSet(null, new RuntimeException(itemResp.getFailureMessage()));
            + // Check if index request can be retried
            + String failureMessageLowercase = itemResp.getFailureMessage().toLowerCase();
            + if (failureMessageLowercase.contains("timeout") || failureMessageLowercase.contains("timed out") // Generic timeout errors
            + || failureMessageLowercase.contains("UnavailableShardsException".toLowerCase()) // Shard not available due to rebalancing or node down
            + || (failureMessageLowercase.contains("data/write/bulk") && failureMessageLowercase.contains("bulk")) // Bulk index queue on node full
            + ) {
            + LOG.debug("Retry batch: " + itemResp.getFailureMessage());
            — End diff –

          I would log here at a higher logging level.
          Also, could you avoid string concatenation here and use the LOG.debug("Retry batch: {}", itemResp.getFailureMessage()) pattern?

          githubbot ASF GitHub Bot added a comment -

          Github user rmetzger commented on a diff in the pull request:

          https://github.com/apache/flink/pull/2861#discussion_r90917039

          — Diff: flink-streaming-connectors/flink-connector-elasticsearch2/src/main/java/org/apache/flink/streaming/connectors/elasticsearch2/ElasticsearchSink.java —
          @@ -186,22 +191,44 @@ public void beforeBulk(long executionId, BulkRequest request) {

          @Override
          public void afterBulk(long executionId, BulkRequest request, BulkResponse response) {
          + boolean allRequestsRepeatable = true;
          if (response.hasFailures()) {
          for (BulkItemResponse itemResp : response.getItems()) {
          if (itemResp.isFailed()) {

          - LOG.error("Failed to index document in Elasticsearch: " + itemResp.getFailureMessage());
          - failureThrowable.compareAndSet(null, new RuntimeException(itemResp.getFailureMessage()));
          + // Check if index request can be retried
          + String failureMessageLowercase = itemResp.getFailureMessage().toLowerCase();
          + if (failureMessageLowercase.contains("timeout") || failureMessageLowercase.contains("timed out") // Generic timeout errors
          + || failureMessageLowercase.contains("UnavailableShardsException".toLowerCase()) // Shard not available due to rebalancing or node down
          + || (failureMessageLowercase.contains("data/write/bulk") && failureMessageLowercase.contains("bulk")) // Bulk index queue on node full
          + ) {
          + LOG.debug("Retry batch: " + itemResp.getFailureMessage());
          + reAddBulkRequest(request);
          + } else { // Cannot retry action
          + allRequestsRepeatable = false;
          + LOG.error("Failed to index document in Elasticsearch: " + itemResp.getFailureMessage());
          + failureThrowable.compareAndSet(null, new RuntimeException(itemResp.getFailureMessage()));
          + }
          }
          }
          - hasFailure.set(true);
          + if (!allRequestsRepeatable) {
          + hasFailure.set(true);
          + }
          }
          }

          @Override
          public void afterBulk(long executionId, BulkRequest request, Throwable failure) {
          - LOG.error(failure.getMessage());
          - failureThrowable.compareAndSet(null, failure);
          - hasFailure.set(true);
          + if (failure instanceof ClusterBlockException // Examples: "no master"
          + || failure instanceof ElasticsearchTimeoutException // ElasticsearchTimeoutException sounded good, not seen in stress tests yet
          + ) {
          + LOG.debug("Retry batch on throwable: " + failure.getMessage());
            — End diff –

          String concat

          githubbot ASF GitHub Bot added a comment -

          Github user rmetzger commented on a diff in the pull request:

          https://github.com/apache/flink/pull/2861#discussion_r90916979

          — Diff: flink-streaming-connectors/flink-connector-elasticsearch2/src/main/java/org/apache/flink/streaming/connectors/elasticsearch2/ElasticsearchSink.java —
          @@ -186,22 +191,44 @@ public void beforeBulk(long executionId, BulkRequest request) {

          @Override
          public void afterBulk(long executionId, BulkRequest request, BulkResponse response) {
          + boolean allRequestsRepeatable = true;
          if (response.hasFailures()) {
          for (BulkItemResponse itemResp : response.getItems()) {
          if (itemResp.isFailed()) {

          - LOG.error("Failed to index document in Elasticsearch: " + itemResp.getFailureMessage());
          - failureThrowable.compareAndSet(null, new RuntimeException(itemResp.getFailureMessage()));
          + // Check if index request can be retried
          + String failureMessageLowercase = itemResp.getFailureMessage().toLowerCase();
          + if (failureMessageLowercase.contains("timeout") || failureMessageLowercase.contains("timed out") // Generic timeout errors
          + || failureMessageLowercase.contains("UnavailableShardsException".toLowerCase()) // Shard not available due to rebalancing or node down
          + || (failureMessageLowercase.contains("data/write/bulk") && failureMessageLowercase.contains("bulk")) // Bulk index queue on node full
          + ) {
          + LOG.debug("Retry batch: " + itemResp.getFailureMessage());
          + reAddBulkRequest(request);
          + } else { // Cannot retry action
          + allRequestsRepeatable = false;
          + LOG.error("Failed to index document in Elasticsearch: " + itemResp.getFailureMessage());
            — End diff –

          {} instead of string concat.

          githubbot ASF GitHub Bot added a comment -

          Github user rmetzger commented on a diff in the pull request:

          https://github.com/apache/flink/pull/2861#discussion_r90917049

          — Diff: flink-streaming-connectors/flink-connector-elasticsearch2/src/main/java/org/apache/flink/streaming/connectors/elasticsearch2/ElasticsearchSink.java —
          @@ -186,22 +191,44 @@ public void beforeBulk(long executionId, BulkRequest request) {

          @Override
          public void afterBulk(long executionId, BulkRequest request, BulkResponse response) {
          + boolean allRequestsRepeatable = true;
          if (response.hasFailures()) {
          for (BulkItemResponse itemResp : response.getItems()) {
          if (itemResp.isFailed()) {

          - LOG.error("Failed to index document in Elasticsearch: " + itemResp.getFailureMessage());
          - failureThrowable.compareAndSet(null, new RuntimeException(itemResp.getFailureMessage()));
          + // Check if index request can be retried
          + String failureMessageLowercase = itemResp.getFailureMessage().toLowerCase();
          + if (failureMessageLowercase.contains("timeout") || failureMessageLowercase.contains("timed out") // Generic timeout errors
          + || failureMessageLowercase.contains("UnavailableShardsException".toLowerCase()) // Shard not available due to rebalancing or node down
          + || (failureMessageLowercase.contains("data/write/bulk") && failureMessageLowercase.contains("bulk")) // Bulk index queue on node full
          + ) {
          + LOG.debug("Retry batch: " + itemResp.getFailureMessage());
          + reAddBulkRequest(request);
          + } else { // Cannot retry action
          + allRequestsRepeatable = false;
          + LOG.error("Failed to index document in Elasticsearch: " + itemResp.getFailureMessage());
          + failureThrowable.compareAndSet(null, new RuntimeException(itemResp.getFailureMessage()));
          + }
          }
          }
          - hasFailure.set(true);
          + if (!allRequestsRepeatable) {
          + hasFailure.set(true);
          + }
          }
          }

          @Override
          public void afterBulk(long executionId, BulkRequest request, Throwable failure) {
          - LOG.error(failure.getMessage());
          - failureThrowable.compareAndSet(null, failure);
          - hasFailure.set(true);
          + if (failure instanceof ClusterBlockException // Examples: "no master"
          + || failure instanceof ElasticsearchTimeoutException // ElasticsearchTimeoutException sounded good, not seen in stress tests yet
          + ) {
          + LOG.debug("Retry batch on throwable: " + failure.getMessage());
          + reAddBulkRequest(request);
          + } else {
          + LOG.error("Failed to index bulk in Elasticsearch. " + failure.getMessage());
            — End diff –

          String concat

          githubbot ASF GitHub Bot added a comment -

          Github user rmetzger commented on a diff in the pull request:

          https://github.com/apache/flink/pull/2861#discussion_r90917303

          — Diff: flink-streaming-connectors/flink-connector-elasticsearch2/src/main/java/org/apache/flink/streaming/connectors/elasticsearch2/ElasticsearchSink.java —
          @@ -227,6 +254,21 @@ public void afterBulk(long executionId, BulkRequest request, Throwable failure)
          requestIndexer = new BulkProcessorIndexer(bulkProcessor);
          }

          + /**
          + * Adds all requests of the bulk to the BulkProcessor. Used when trying again.
          + * @param bulkRequest
          + */
          + public void reAddBulkRequest(BulkRequest bulkRequest) {
          + //TODO Check what happens when bulk contains a DeleteAction and IndexActions and the DeleteAction fails because the document already has been deleted. This may not happen in typical Flink jobs.
          — End diff –

          So what about this TODO? Can we somehow filter these requests?
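
          One option worth considering, sketched below: keep only the plain index requests when re-adding a failed bulk, so a retried DeleteRequest cannot fail on an already-deleted document. Whether silently dropping deletes is acceptable depends on the job; this is an illustration, not the PR's code.

          import org.elasticsearch.action.IndicesRequest;
          import org.elasticsearch.action.bulk.BulkProcessor;
          import org.elasticsearch.action.bulk.BulkRequest;
          import org.elasticsearch.action.index.IndexRequest;

          public class RetryOnlyIndexRequests {
              private final BulkProcessor bulkProcessor;

              public RetryOnlyIndexRequests(BulkProcessor bulkProcessor) {
                  this.bulkProcessor = bulkProcessor;
              }

              // Re-add only IndexRequests; skip delete/update actions on retry.
              public void reAddIndexRequestsOnly(BulkRequest bulkRequest) {
                  for (IndicesRequest req : bulkRequest.subRequests()) {
                      if (req instanceof IndexRequest) {
                          bulkProcessor.add((IndexRequest) req);
                      }
                  }
              }
          }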

          githubbot ASF GitHub Bot added a comment -

          Github user static-max commented on a diff in the pull request:

          https://github.com/apache/flink/pull/2861#discussion_r91963737

          — Diff: flink-streaming-connectors/flink-connector-elasticsearch2/src/main/java/org/apache/flink/streaming/connectors/elasticsearch2/ElasticsearchSink.java —
          @@ -186,22 +191,44 @@ public void beforeBulk(long executionId, BulkRequest request) {

          @Override
          public void afterBulk(long executionId, BulkRequest request, BulkResponse response) {
          + boolean allRequestsRepeatable = true;
          if (response.hasFailures()) {
          for (BulkItemResponse itemResp : response.getItems()) {
          if (itemResp.isFailed()) {

          - LOG.error("Failed to index document in Elasticsearch: " + itemResp.getFailureMessage());
          - failureThrowable.compareAndSet(null, new RuntimeException(itemResp.getFailureMessage()));
            + // Check if index request can be retried
            + String failureMessageLowercase = itemResp.getFailureMessage().toLowerCase();
            + if (failureMessageLowercase.contains("timeout") || failureMessageLowercase.contains("timed out") // Generic timeout errors
            — End diff –

          I didn't find an alternative to checking strings. I will add a flag and disable it by default.

          githubbot ASF GitHub Bot added a comment -

          Github user static-max commented on a diff in the pull request:

          https://github.com/apache/flink/pull/2861#discussion_r91964180

          — Diff: flink-streaming-connectors/flink-connector-elasticsearch2/src/main/java/org/apache/flink/streaming/connectors/elasticsearch2/ElasticsearchSink.java —
          @@ -186,22 +191,44 @@ public void beforeBulk(long executionId, BulkRequest request) {

          @Override
          public void afterBulk(long executionId, BulkRequest request, BulkResponse response) {
          + boolean allRequestsRepeatable = true;
          if (response.hasFailures()) {
          for (BulkItemResponse itemResp : response.getItems()) {
          if (itemResp.isFailed()) {

          - LOG.error("Failed to index document in Elasticsearch: " + itemResp.getFailureMessage());
          - failureThrowable.compareAndSet(null, new RuntimeException(itemResp.getFailureMessage()));
            + // Check if index request can be retried
            + String failureMessageLowercase = itemResp.getFailureMessage().toLowerCase();
            + if (failureMessageLowercase.contains("timeout") || failureMessageLowercase.contains("timed out") // Generic timeout errors
            + || failureMessageLowercase.contains("UnavailableShardsException".toLowerCase()) // Shard not available due to rebalancing or node down
            + || (failureMessageLowercase.contains("data/write/bulk") && failureMessageLowercase.contains("bulk")) // Bulk index queue on node full
            + ) {
            + LOG.debug("Retry batch: " + itemResp.getFailureMessage());
            — End diff –

          What level do you suggest? Personally I don't care about retried batches as long as the data gets into my ES cluster. If this is logged at INFO or WARN, the logfile will get pretty messy on a cluster with high traffic.

          githubbot ASF GitHub Bot added a comment -

          Github user static-max commented on a diff in the pull request:

          https://github.com/apache/flink/pull/2861#discussion_r91991599

          — Diff: flink-streaming-connectors/flink-connector-elasticsearch2/src/main/java/org/apache/flink/streaming/connectors/elasticsearch2/ElasticsearchSink.java —
          @@ -227,6 +254,21 @@ public void afterBulk(long executionId, BulkRequest request, Throwable failure)
          requestIndexer = new BulkProcessorIndexer(bulkProcessor);
          }

          + /**
          + * Adds all requests of the bulk to the BulkProcessor. Used when trying again.
          + * @param bulkRequest
          + */
          + public void reAddBulkRequest(BulkRequest bulkRequest) {
          + //TODO Check what happens when bulk contains a DeleteAction and IndexActions and the DeleteAction fails because the document already has been deleted. This may not happen in typical Flink jobs.
          — End diff –

          Currently I'm not aware of a way to filter these requests.

          githubbot ASF GitHub Bot added a comment -

          Github user rmetzger commented on the issue:

          https://github.com/apache/flink/pull/2861

          I checked the Elasticsearch documentation and some ES user forums, and indeed it seems that they do not include any retry logic in their clients.

          githubbot ASF GitHub Bot added a comment -

          Github user rmetzger commented on a diff in the pull request:

          https://github.com/apache/flink/pull/2861#discussion_r93043759

          — Diff: flink-streaming-connectors/flink-connector-elasticsearch2/src/main/java/org/apache/flink/streaming/connectors/elasticsearch2/ElasticsearchSink.java —
          @@ -186,22 +191,44 @@ public void beforeBulk(long executionId, BulkRequest request) {

          @Override
          public void afterBulk(long executionId, BulkRequest request, BulkResponse response) {
          + boolean allRequestsRepeatable = true;
          if (response.hasFailures()) {
          for (BulkItemResponse itemResp : response.getItems()) {
          if (itemResp.isFailed()) {

          - LOG.error("Failed to index document in Elasticsearch: " + itemResp.getFailureMessage());
          - failureThrowable.compareAndSet(null, new RuntimeException(itemResp.getFailureMessage()));
            + // Check if index request can be retried
            + String failureMessageLowercase = itemResp.getFailureMessage().toLowerCase();
            + if (failureMessageLowercase.contains("timeout") || failureMessageLowercase.contains("timed out") // Generic timeout errors
            — End diff –

          Okay. I agree that there seems to be no better way to handle this.

          githubbot ASF GitHub Bot added a comment -

          Github user rmetzger commented on a diff in the pull request:

          https://github.com/apache/flink/pull/2861#discussion_r93045771

          — Diff: flink-streaming-connectors/flink-connector-elasticsearch2/src/main/java/org/apache/flink/streaming/connectors/elasticsearch2/ElasticsearchSink.java —
          @@ -186,22 +191,44 @@ public void beforeBulk(long executionId, BulkRequest request) {

          @Override
          public void afterBulk(long executionId, BulkRequest request, BulkResponse response) {
          + boolean allRequestsRepeatable = true;
          if (response.hasFailures()) {
          for (BulkItemResponse itemResp : response.getItems()) {
          if (itemResp.isFailed()) {

          - LOG.error("Failed to index document in Elasticsearch: " + itemResp.getFailureMessage());
          - failureThrowable.compareAndSet(null, new RuntimeException(itemResp.getFailureMessage()));
            + // Check if index request can be retried
            + String failureMessageLowercase = itemResp.getFailureMessage().toLowerCase();
            + if (failureMessageLowercase.contains("timeout") || failureMessageLowercase.contains("timed out") // Generic timeout errors
            + || failureMessageLowercase.contains("UnavailableShardsException".toLowerCase()) // Shard not available due to rebalancing or node down
            + || (failureMessageLowercase.contains("data/write/bulk") && failureMessageLowercase.contains("bulk")) // Bulk index queue on node full
            + ) {
            + LOG.debug("Retry batch: " + itemResp.getFailureMessage());
            — End diff –

          I cannot assess how often retries are needed.
          Users can also manually increase the log level if needed, so we can leave it as is.
          However, I'm wondering whether we want to include a metric that counts the number of retries that occurred.
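
          A retry counter could be registered through Flink's metric system, roughly like the sketch below (the metric name "numBulkRetries" is made up for the example):

          import org.apache.flink.configuration.Configuration;
          import org.apache.flink.metrics.Counter;
          import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;

          // Sketch: expose the number of re-added bulks as a metric instead of only a log line.
          public abstract class SinkWithRetryCounter<T> extends RichSinkFunction<T> {
              private transient Counter numBulkRetries;

              @Override
              public void open(Configuration parameters) throws Exception {
                  numBulkRetries = getRuntimeContext().getMetricGroup().counter("numBulkRetries");
              }

              // Call this wherever reAddBulkRequest(request) is invoked.
              protected void reportRetry() {
                  numBulkRetries.inc();
              }
          }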

          githubbot ASF GitHub Bot added a comment -

          Github user rmetzger commented on the issue:

          https://github.com/apache/flink/pull/2861

          @fpompermaier Why does throwing exceptions in close() cause document loss? As far as I can see, ES flushes all outstanding batches on close().

          githubbot ASF GitHub Bot added a comment -

          Github user fpompermaier commented on the issue:

          https://github.com/apache/flink/pull/2861

          @rmetzger Are you sure that Logstash doesn't perform any retry logic? For example, https://logstash.jira.com/browse/LOGSTASH-720 seems to implement that.
          You can test document loss in the close() method by trying to index a malformed document... if you have 10 M documents and one of them is malformed, it is very probable that a lot of them will not be indexed (I opened a ticket just for that: https://issues.apache.org/jira/browse/FLINK-5353).

          githubbot ASF GitHub Bot added a comment -

          Github user rmetzger commented on a diff in the pull request:

          https://github.com/apache/flink/pull/2861#discussion_r93049169

          — Diff: flink-streaming-connectors/flink-connector-elasticsearch2/src/main/java/org/apache/flink/streaming/connectors/elasticsearch2/ElasticsearchSink.java —
          @@ -227,6 +264,37 @@ public void afterBulk(long executionId, BulkRequest request, Throwable failure)
          requestIndexer = new BulkProcessorIndexer(bulkProcessor);
          }

          + /**
          + * Adds all requests of the bulk to the BulkProcessor. Used when trying again.
          + * @param bulkRequest
          + */
          + public void reAddBulkRequest(BulkRequest bulkRequest) {
          + //TODO Check what happens when bulk contains a DeleteAction and IndexActions and the DeleteAction fails because the document already has been deleted. This may not happen in typical Flink jobs.
          +
          + for (IndicesRequest req : bulkRequest.subRequests()) {
          + if (req instanceof ActionRequest) {
          + // There is no waiting time between index requests, so this may produce additional pressure on cluster
          + bulkProcessor.add((ActionRequest<?>) req);
          — End diff –

          Do you know if the BulkProcessor is thread safe? I assume multiple threads will add bulks concurrently (because of the calls from the callbacks)
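
          If in doubt, one defensive option is to funnel all re-adds through a single lock. This is only a sketch of that idea, not something from the PR:

          import org.elasticsearch.action.ActionRequest;
          import org.elasticsearch.action.IndicesRequest;
          import org.elasticsearch.action.bulk.BulkProcessor;
          import org.elasticsearch.action.bulk.BulkRequest;

          public class SynchronizedReAdd {
              private final BulkProcessor bulkProcessor;
              private final Object retryLock = new Object();

              public SynchronizedReAdd(BulkProcessor bulkProcessor) {
                  this.bulkProcessor = bulkProcessor;
              }

              // Serialize concurrent re-adds coming from the BulkProcessor callbacks.
              public void reAddBulkRequest(BulkRequest bulkRequest) {
                  synchronized (retryLock) {
                      for (IndicesRequest req : bulkRequest.subRequests()) {
                          if (req instanceof ActionRequest) {
                              bulkProcessor.add((ActionRequest<?>) req);
                          }
                      }
                  }
              }
          }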

          githubbot ASF GitHub Bot added a comment -

          Github user rmetzger commented on a diff in the pull request:

          https://github.com/apache/flink/pull/2861#discussion_r93048695

          — Diff: flink-streaming-connectors/flink-connector-elasticsearch2/src/main/java/org/apache/flink/streaming/connectors/elasticsearch2/ElasticsearchSink.java —
          @@ -186,22 +198,47 @@ public void beforeBulk(long executionId, BulkRequest request) {

          @Override
          public void afterBulk(long executionId, BulkRequest request, BulkResponse response) {
          + boolean allRequestsRepeatable = true;
          if (response.hasFailures()) {
          for (BulkItemResponse itemResp : response.getItems()) {
          if (itemResp.isFailed()) {

          - LOG.error("Failed to index document in Elasticsearch: " + itemResp.getFailureMessage());
          - failureThrowable.compareAndSet(null, new RuntimeException(itemResp.getFailureMessage()));
            + // Check if index request can be retried
            + String failureMessageLowercase = itemResp.getFailureMessage().toLowerCase();
            + if (checkErrorAndRetryBulk && (
            + failureMessageLowercase.contains("timeout") || failureMessageLowercase.contains("timed out") // Generic timeout errors
            + || failureMessageLowercase.contains("UnavailableShardsException".toLowerCase()) // Shard not available due to rebalancing or node down
            + || (failureMessageLowercase.contains("data/write/bulk") && failureMessageLowercase.contains("bulk")) // Bulk index queue on node full
            + )
            + ) {
            + LOG.debug("Retry bulk: {}", itemResp.getFailureMessage());
            + reAddBulkRequest(request);
            — End diff –

          Are you sure that the `BulkRequest` is only added once even if it contains multiple failed `BulkItemResponse`s?

          githubbot ASF GitHub Bot added a comment -

          Github user rmetzger commented on the issue:

          https://github.com/apache/flink/pull/2861

          @fpompermaier I guess Logstash is just a client to ES that implements its own retry logic (similar to Flink).
          I'll check out the JIRA.

          githubbot ASF GitHub Bot added a comment -

          Github user fpompermaier commented on the issue:

          https://github.com/apache/flink/pull/2861

          Indeed, my goal is to completely replace Logstash with Flink because Flink is much faster (Logstash is the official Elasticsearch indexer). On 10 M docs Flink takes 10 min while Logstash takes 30 min (in our preliminary tests).

          githubbot ASF GitHub Bot added a comment -

          Github user static-max commented on a diff in the pull request:

          https://github.com/apache/flink/pull/2861#discussion_r94382504

          — Diff: flink-streaming-connectors/flink-connector-elasticsearch2/src/main/java/org/apache/flink/streaming/connectors/elasticsearch2/ElasticsearchSink.java —
          @@ -186,22 +198,47 @@ public void beforeBulk(long executionId, BulkRequest request) {

           	@Override
           	public void afterBulk(long executionId, BulkRequest request, BulkResponse response) {
          +		boolean allRequestsRepeatable = true;
           		if (response.hasFailures()) {
           			for (BulkItemResponse itemResp : response.getItems()) {
           				if (itemResp.isFailed()) {
          -					LOG.error("Failed to index document in Elasticsearch: " + itemResp.getFailureMessage());
          -					failureThrowable.compareAndSet(null, new RuntimeException(itemResp.getFailureMessage()));
          +					// Check if index request can be retried
          +					String failureMessageLowercase = itemResp.getFailureMessage().toLowerCase();
          +					if (checkErrorAndRetryBulk && (
          +						failureMessageLowercase.contains("timeout") || failureMessageLowercase.contains("timed out") // Generic timeout errors
          +						|| failureMessageLowercase.contains("UnavailableShardsException".toLowerCase()) // Shard not available due to rebalancing or node down
          +						|| (failureMessageLowercase.contains("data/write/bulk") && failureMessageLowercase.contains("bulk")) // Bulk index queue on node full
          +						)
          +					) {
          +						LOG.debug("Retry bulk: {}", itemResp.getFailureMessage());
          +						reAddBulkRequest(request);
          End diff –

          You're right, it gets added multiple times. I'll fix that.

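          For reference, a minimal sketch of one way to re-add the bulk only once per afterBulk() call; this assumes a hypothetical isTemporaryFailure(...) helper and the PR's reAddBulkRequest(...) method, and is only an illustration of the flag-based fix discussed here:

          boolean bulkReAdded = false;
          for (BulkItemResponse itemResp : response.getItems()) {
              // isTemporaryFailure(...) is a hypothetical helper deciding whether the item failure is retriable
              if (itemResp.isFailed() && isTemporaryFailure(itemResp) && !bulkReAdded) {
                  reAddBulkRequest(request);  // re-add the whole BulkRequest only once, not once per failed item
                  bulkReAdded = true;
              }
          }

          Note that re-adding the whole BulkRequest also re-sends the items that succeeded; that is acceptable for at-least-once semantics but can produce duplicates.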
          githubbot ASF GitHub Bot added a comment -

          Github user static-max commented on the issue:

          https://github.com/apache/flink/pull/2861

          The path of the connector has changed to "flink/flink-connectors/flink-connector-elasticsearch2/". How should I handle the conflict? Open a new PR?

          githubbot ASF GitHub Bot added a comment -

          Github user tzulitai commented on the issue:

          https://github.com/apache/flink/pull/2861

          Hi @static-max, thank you for working on this, it'll be an important fix for proper at least once support for the ES connector.

          Recently, the community has agreed to first restructure the multiple ES connector versions, so that important fixes like this one can be done once and for all across all versions (1.x, 2.x, and 5.x, which is currently pending). Here's the discussion: http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/DISCUSS-ElasticSearch-in-Flink-Strategy-td15049.html.

          Could we wait just a little bit on this PR, and once the ES connector refactoring is complete, come back and rebase this PR on it? You can follow the progress here: #2767. I'm trying to come up with the restructure PR within the next day.
          Very sorry for the extra wait needed on this, but it'll be good for the long run. Hope you can understand.

          githubbot ASF GitHub Bot added a comment -

          Github user static-max commented on the issue:

          https://github.com/apache/flink/pull/2861

          Ok, let's wait for the restructure and rebase this PR to support at least once.
          BTW, we're using my PR in production and haven't lost a single document since then.

          githubbot ASF GitHub Bot added a comment -

          Github user tzulitai commented on the issue:

          https://github.com/apache/flink/pull/2861

          Hi @static-max! Will you be ok with me opening a PR for FLINK-5487 (https://issues.apache.org/jira/browse/FLINK-5487) by basing it on this PR? Your contribution will be included in that PR. The main thing is that FLINK-5487 won't be complete without retry for temporary ES errors, and since you've already solved that in this PR, it'll be a good idea to use it.

          githubbot ASF GitHub Bot added a comment -

          Github user static-max commented on the issue:

          https://github.com/apache/flink/pull/2861

          Hi @tzulitai, sure, go ahead

          githubbot ASF GitHub Bot added a comment -

          Github user tzulitai commented on the issue:

          https://github.com/apache/flink/pull/2861

          Small note (would be great if @static-max can clarify this also) on matching the exceptions:
          We can actually use `BulkItemResponse.getFailure().getCause()` to get a `Throwable` that we can use to match the exceptions, instead of unstable String matching.

          After surfing the Javadocs a bit, I think the below exceptions refer to what you were trying to catch before:

          • `EsRejectedExecutionException`: the queue on ES node is temporarily full
          • `ElasticsearchTimeoutException`: timeout from Elasticsearch
          • `ClusterBlockException`: no master
          • `UnavailableShardsException`: currently no shards available

          @static-max do you think the above exception types are reasonable to be considered "temporary" (have you seen them before in logs)? I'm personally a bit unsure of the last two. I don't have that much experience with operating an Elasticsearch cluster, and their Javadocs really don't say much, so some input from you will be helpful

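          A rough sketch of what such cause-based matching could look like against the Elasticsearch 2.x client (illustrative only; as the next comment shows, the cause is usually wrapped in a RemoteTransportException, so checking only the top-level cause may not be enough):

          import org.elasticsearch.ElasticsearchTimeoutException;
          import org.elasticsearch.action.UnavailableShardsException;
          import org.elasticsearch.cluster.block.ClusterBlockException;
          import org.elasticsearch.common.util.concurrent.EsRejectedExecutionException;

          // Inside afterBulk(), for a failed item:
          Throwable cause = itemResp.getFailure().getCause();
          boolean temporary = cause instanceof EsRejectedExecutionException
                  || cause instanceof ElasticsearchTimeoutException
                  || cause instanceof ClusterBlockException
                  || cause instanceof UnavailableShardsException;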
          githubbot ASF GitHub Bot added a comment -

          Github user static-max commented on the issue:

          https://github.com/apache/flink/pull/2861

          In my tests, `BulkItemResponse.getFailure().getCause()` returns a `RemoteTransportException` like this:
          `RemoteTransportException[[Harrier][127.0.0.1:9302][indices:data/write/bulk[s]]]; nested: RemoteTransportException[[Harrier][127.0.0.1:9302][indices:data/write/bulk[s][p]]]; nested: EsRejectedExecutionException[rejected execution of org.elasticsearch.transport.TransportService$4@3a0f3a6e on EsThreadPoolExecutor[bulk, queue capacity = 2, org.elasticsearch.common.util.concurrent.EsThreadPoolExecutor@5ac266bd[Running, pool size = 8, active threads = 8, queued tasks = 2, completed tasks = 206]]];`

          So the nested Exception needs to be checked. That's possible, I will implement that change.

          The last two exceptions are common when a new index gets created (if you have a new index per day, for example), or when a node leaves the cluster and no master can be elected (no quorum).

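          A minimal sketch of such a nested check, walking the cause chain so that an EsRejectedExecutionException wrapped in RemoteTransportExceptions is still recognized (the helper name is illustrative, not the actual implementation):

          private static boolean containsTemporaryEsFailure(Throwable failure) {
              Throwable current = failure;
              while (current != null) {
                  if (current instanceof EsRejectedExecutionException
                          || current instanceof ElasticsearchTimeoutException
                          || current instanceof ClusterBlockException
                          || current instanceof UnavailableShardsException) {
                      return true;
                  }
                  current = current.getCause();  // unwrap RemoteTransportException and other wrappers
              }
              return false;
          }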
          githubbot ASF GitHub Bot added a comment -

          Github user tzulitai commented on the issue:

          https://github.com/apache/flink/pull/2861

          Thank you for the detailed information! I've actually already started some work on top of your commits in this PR, so I think it'll be easier to proceed to implement the nested exception checks on my side.

          githubbot ASF GitHub Bot added a comment -

          Github user static-max commented on the issue:

          https://github.com/apache/flink/pull/2861

          @tzulitai OK, if you need any help feel free to ask.
          Are there plans to switch from the TransportClient to a pure HTTP client? That would reduce the Elasticsearch dependencies and would decouple the cluster's version from the TransportClient version used by Flink. In that case we wouldn't get a Throwable anymore.

          githubbot ASF GitHub Bot added a comment -

          Github user tzulitai commented on the issue:

          https://github.com/apache/flink/pull/2861

          Thanks! There wasn't any discussion on switching to a pure HTTP client, but we would need to weigh the effort it would take against the benefits it would offer if we were to go for that.

          Regarding handling the `Throwable`: I'm currently trying out `BulkItemResponse.getFailure().getStatus()`, which returns a `RestStatus`. From the Javadocs, it seems like we can just target some of the status codes: https://www.javadoc.io/doc/org.elasticsearch/elasticsearch/2.3.5. I'm currently thinking about just handling `TOO_MANY_REQUESTS` (e.g. `EsRejectedExecutionException` is one of these) and `INTERNAL_SERVER_ERROR` (the timeout exception has this code). What do you think?

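          For illustration, a status-based check against the 2.x client could look roughly like this (the helper name is only an example):

          import org.elasticsearch.action.bulk.BulkItemResponse;
          import org.elasticsearch.rest.RestStatus;

          private static boolean isTemporaryEsError(BulkItemResponse itemResp) {
              RestStatus status = itemResp.getFailure().getStatus();
              return status == RestStatus.TOO_MANY_REQUESTS          // e.g. bulk queue full (EsRejectedExecutionException)
                      || status == RestStatus.INTERNAL_SERVER_ERROR; // e.g. timeouts inside Elasticsearch
          }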
          tzulitai Tzu-Li (Gordon) Tai added a comment - edited

          I would like to handle this issue together with FLINK-5353 using a different approach: let the user provide a FailedActionRequestHandler that implements how to deal with an action request that failed, e.g. drop it or re-add it to the BulkProcessor.

          The reason for this is that there is quite a variety of reasons an action request can fail, and different cases can be treated as "temporary" in different ways. For example, in FLINK-5353, malformed documents can somewhat be "temporary" if the erroneous field is reprocessed. Instead of handling these case by case, I propose to let the user implement the logic for them.

          The handler will look something like this:

          public interface FailedActionRequestHandler {
              boolean onFailure(ActionRequest originalRequest, Throwable failure, RequestIndexer indexer);
          }
          

          The ElasticsearchSink will still try to retry a bulk request (with backoff) for obvious temporary errors like EsRejectedExecutionException, and will only call onFailure after the retries. There the user can decide whether they want to re-add the request through the RequestIndexer or just drop it. The method should return true / false depending on whether they'd like to fail the sink because of that failure.

          What do you think? Sorry for being picky about how to resolve this. I think it'll be best to find a good long-term solution, as from the current state of the ES issues I have a feeling that things will start to get unmaintainable once new exception handling cases pop up, so it'll be helpful to know what actual ES Flink users think of the idea.

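          To make the idea concrete, a hypothetical user implementation of the proposed interface could re-add throttling failures and fail the sink on everything else (class name and policy are only an example, not part of the proposal; Flink imports for FailedActionRequestHandler and RequestIndexer omitted):

          import org.elasticsearch.action.ActionRequest;
          import org.elasticsearch.common.util.concurrent.EsRejectedExecutionException;

          public class RetryOnRejectionHandler implements FailedActionRequestHandler {

              @Override
              public boolean onFailure(ActionRequest originalRequest, Throwable failure, RequestIndexer indexer) {
                  if (failure instanceof EsRejectedExecutionException) {
                      indexer.add(originalRequest);  // bulk queue on the node was full: re-add for the next bulk
                      return false;                  // temporary error, do not fail the sink
                  }
                  return true;                       // anything else: fail the sink and restart from a checkpoint
              }
          }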
          githubbot ASF GitHub Bot added a comment -

          Github user tzulitai commented on the issue:

          https://github.com/apache/flink/pull/2861

          Hi @static-max!
          Your contribution has been merged with aaac7c2 and 3743e89.
          Thanks a lot for your work! Could you please manually close this PR?

          tzulitai Tzu-Li (Gordon) Tai added a comment -

          Resolved in master via http://git-wip-us.apache.org/repos/asf/flink/3743e89
          githubbot ASF GitHub Bot added a comment -

          Github user static-max closed the pull request at:

          https://github.com/apache/flink/pull/2861


            People

              Assignee: melmoth static-max
              Reporter: melmoth static-max
              Votes: 1
              Watchers: 6

            Dates

              Created:
              Updated:
              Resolved: