Flink / FLINK-5353

Elasticsearch Sink loses well-formed documents when there are malformed documents

    Details

    • Type: Bug
    • Status: Resolved
    • Priority: Major
    • Resolution: Fixed
    • Affects Version/s: 1.1.3
    • Fix Version/s: 1.3.0
    • Component/s: Streaming Connectors
    • Labels:
      None

      Issue Links

        Activity

        f.pompermaier Flavio Pompermaier added a comment -

        The ES connector should be made tolerant to malformed documents and, thus, the close() method should not throw a RuntimeException (if configured to be lenient)

        rmetzger Robert Metzger added a comment -

        Do you have any information on why ES is losing those messages?

        f.pompermaier Flavio Pompermaier added a comment -

        I just remember that in afterBulk the error was thrown somehow, but I don't remember if it was a typed or a generic Exception. You can try to customize a mapping for an index (e.g. set a field of type int) and then index a document that doesn't respect that mapping.
        We shared a simple admin client to do that at https://github.com/okkam-it/flink-examples/blob/master/src/main/java/it/okkam/datalinks/batch/flink/elasticsearch/ElasticsearchHelper.java
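
        The reproduction described above can be sketched against the Elasticsearch REST API as follows (index, type, and field names are illustrative, and the exact mapping syntax varies across ES versions): define a mapping with an integer field, then index a document whose value can't be parsed as an integer.

```json
PUT /my-index
{
  "mappings": {
    "my-type": {
      "properties": {
        "count": { "type": "integer" }
      }
    }
  }
}

POST /my-index/my-type
{ "count": "not-a-number" }
```

        The second request is rejected by Elasticsearch as a malformed document, which is what triggers the bulk failure discussed here.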

        rmetzger Robert Metzger added a comment -

        Okay, I will do some experiments to see what's going on

        tzulitai Tzu-Li (Gordon) Tai added a comment - - edited

        Hi Flavio Pompermaier, first some clarification will be helpful:

        1. Was the error somehow thrown in afterBulk, causing the sink to fail immediately? Previously we only kept the reported error and only rethrew it on close(), so what your original description suggests seems a bit odd to me.

        2. Which Elasticsearch version are you using? We're using the Elasticsearch BulkProcessor internally, and the expected behaviour is that one or more failures in the bulk should not prevent the other good documents from being properly requested. So it might be that an older version of Elasticsearch's BulkProcessor is not following this expected behaviour.

        3. Could you confirm if the exception was ElasticsearchParseException? I'm trying to find the exception type for malformed documents.

        4. Have you tried setting the ignore_malformed config for your indices / fields? https://www.elastic.co/guide/en/elasticsearch/reference/current/ignore-malformed.html.
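
        For reference, ignore_malformed is set per field in the index mapping; a minimal sketch (index, type, and field names are illustrative, and the exact syntax varies by ES version):

```json
PUT /my-index
{
  "mappings": {
    "my-type": {
      "properties": {
        "count": { "type": "integer", "ignore_malformed": true }
      }
    }
  }
}
```

        With this flag, a document whose count value isn't a valid integer is still indexed, but the malformed field itself is not.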

        Now, about the possible approach:
        With all the different types of exceptions that may occur at Elasticsearch, instead of handling them case by case in the connector code, I think it'll be reasonable to have a user-provided handler / callback for these failed documents, and let the user decide how to handle them: either drop them, or re-process and re-add them to the RequestIndexer.

        What do you think about this? It would be great to hear from actual ES users so that we can find an optimal, future-proof solution here.

        f.pompermaier Flavio Pompermaier added a comment -

        My previous PR was https://github.com/apache/flink/pull/2790.

        The error was thrown in afterBulk and hasFailure was set to true. Then close() was throwing a RuntimeException, so the entire job used to fail even for a single malformed document.
        Maybe that PR could be useful to test the error. Unfortunately, right now I don't have much time to help on this.
        I'd like to avoid setting ignore_malformed in my indices. Maybe it could also be useful to add an accumulator that keeps track of documents that weren't indexed because they were malformed.
        I like the idea of a FailedActionRequestHandler as suggested in FLINK-5122, as long as it addresses this issue properly.

        tzulitai Tzu-Li (Gordon) Tai added a comment - - edited

        Ok, then I think the actual cause of the missing well-formed documents isn't that the BulkProcessor is dropping them because of the malformed documents, but simply that at-least-once isn't properly supported in the ES connector yet (see FLINK-5487). Any documents that were buffered in the BulkProcessor at the time of failure simply could not be recovered.

        Thank you for the feedback. With the suggested FailedActionRequestHandler, you can simply drop the malformed document, or re-process it if you want to.

        githubbot ASF GitHub Bot added a comment -

        GitHub user tzulitai opened a pull request:

        https://github.com/apache/flink/pull/3246

        FLINK-5353 [elasticsearch] User-provided failure handler for ElasticsearchSink

        Only the last commit is relevant. This PR is based on #3112 so that the functionality is added for all Elasticsearch versions.

        It is also based on the work of @static-max in #2861, but with improvements for a more general approach that solves both FLINK-5353 (https://issues.apache.org/jira/browse/FLINK-5353) and FLINK-5122 (https://issues.apache.org/jira/browse/FLINK-5122). The PR is more of a preview of the functionality for our Elasticsearch users; proper testing for the expected behaviours and Javadoc updates are still pending.

        With this PR, users can now provide an `ActionRequestFailureHandler` that controls how to deal with a failed Elasticsearch request.

        Example:

        ```
        private static class ExampleActionRequestFailureHandler implements ActionRequestFailureHandler {
            @Override
            public boolean onFailure(ActionRequest action, Throwable failure, RequestIndexer indexer) {
                if (failure instanceof EsRejectedExecutionException) {
                    // the ES node's bulk queue was saturated; re-add the request for retry
                    indexer.add(action);
                    return false;
                } else if (failure instanceof ElasticsearchParseException) {
                    // malformed document; simply drop the request without failing the sink
                    return false;
                } else {
                    // for all other failures, fail the sink
                    return true;
                }
            }
        }
        ```

        The above example will let the sink re-add requests that failed due to queue capacity saturation and drop requests with malformed documents, without failing the sink. For all other failures, the sink will fail. The handler is provided to the constructor of `ElasticsearchSink`.

        Note that the `onFailure` method is called only after the internal `BulkProcessor` finishes all backoff retry attempts for temporary `EsRejectedExecutionException`s (saturated ES node queue capacity).

        Alternatives:

        1. Currently, all failures reported in the `afterBulk` callback will be used to invoke `onFailure` of the handler. We can perhaps just pass some specific exceptions for the user to decide on how to handle them.

        2. The original `ElasticsearchSinkFunction` and new `ActionRequestFailureHandler` interface could perhaps be integrated into one.

        You can merge this pull request into a Git repository by running:

        $ git pull https://github.com/tzulitai/flink FLINK-5353

        Alternatively you can review and apply these changes as the patch at:

        https://github.com/apache/flink/pull/3246.patch

        To close this pull request, make a commit to your master/trunk branch
        with (at least) the following in the commit message:

        This closes #3246


        commit bf84c0aa91924aca779189b628a656d9b54e36db
        Author: Mike Dias <mike.rodrigues.dias@gmail.com>
        Date: 2016-11-07T20:09:48Z

        FLINK-4988 Elasticsearch 5.x support

        commit 4efb2d497759b3688fe80261df19bb1e1c3f1c21
        Author: Tzu-Li (Gordon) Tai <tzulitai@apache.org>
        Date: 2017-01-12T13:21:56Z

        FLINK-4988 [elasticsearch] Restructure Elasticsearch connectors

        commit be35862383b69c0d65fefd2c48c772a81fceb8d5
        Author: Max Kuklinski <max.kuklinski@live.de>
        Date: 2016-11-23T16:54:11Z

        FLINK-5122 [elasticsearch] Retry temporary Elasticsearch request errors.

        Covered exceptions are: Timeouts, No Master, UnavailableShardsException, bulk queue on node full

        commit fa67e8be5ca8e90d47ad12e947eac7b695e8fcca
        Author: Tzu-Li (Gordon) Tai <tzulitai@apache.org>
        Date: 2017-01-30T05:55:26Z

        FLINK-5353 [elasticsearch] User-provided failure handler for ElasticsearchSink

        This commit fixes both FLINK-5353 and FLINK-5122. It allows users to implement a
        failure handler to control how failed action requests are dealt with.

        The commit also includes general improvements to FLINK-5122:
        1. Use the built-in backoff functionality in the Elasticsearch BulkProcessor (not
        available for Elasticsearch 1.x)
        2. Migrate the `checkErrorAndRetryBulk` functionality to the new failure handler


        githubbot ASF GitHub Bot added a comment -

        Github user tzulitai commented on the issue:

        https://github.com/apache/flink/pull/3246

        @fpompermaier @static-max tagging you so that you're aware of this PR. Will be great to hear feedback from you!

        githubbot ASF GitHub Bot added a comment -

        Github user fpompermaier commented on the issue:

        https://github.com/apache/flink/pull/3246

        I think that's great!

        githubbot ASF GitHub Bot added a comment -

        Github user static-max commented on the issue:

        https://github.com/apache/flink/pull/3246

        Looks great!
        One note: in your example ExampleActionRequestFailureHandler you have to unwrap the Exception, as it typically looks like:

        `RemoteTransportException[[Richard Rider][127.0.0.1:9301][indices:data/write/bulk[s]]]; nested: RemoteTransportException[[Richard Rider][127.0.0.1:9301][indices:data/write/bulk[s][p]]]; nested: EsRejectedExecutionException[rejected execution of org.elasticsearch.transport.TransportService$4@e5c47a1 on EsThreadPoolExecutor[bulk, queue capacity = 1, org.elasticsearch.common.util.concurrent.EsThreadPoolExecutor@7e2d5cc5[Running, pool size = 8, active threads = 8, queued tasks = 1, completed tasks = 119]]];`

        In my implementation I use Apache commons:
        `ExceptionUtils.indexOfThrowable(throwable, EsRejectedExecutionException.class) >= 0`
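
The unwrapping static-max describes can also be sketched with the plain JDK by walking the cause chain, which is equivalent in spirit to commons-lang's `ExceptionUtils.indexOfThrowable(...) >= 0`. In this hedged sketch, the exception types are stand-ins: IllegalStateException plays the role of EsRejectedExecutionException and RuntimeException plays the wrapping RemoteTransportExceptions, since the ES classes aren't assumed on the classpath.

```java
// Sketch: detect a target exception type anywhere in a nested cause chain.
public class CauseChainDemo {

    // Walk getCause() links until the target type is found or the chain ends.
    static boolean containsThrowable(Throwable t, Class<? extends Throwable> type) {
        while (t != null) {
            if (type.isInstance(t)) {
                return true;
            }
            t = t.getCause();
        }
        return false;
    }

    public static void main(String[] args) {
        // Two layers of wrapping around the "rejected execution" failure,
        // mimicking the nested RemoteTransportExceptions in the comment above.
        Throwable nested = new RuntimeException("remote transport",
                new RuntimeException("remote transport",
                        new IllegalStateException("rejected execution")));
        System.out.println(containsThrowable(nested, IllegalStateException.class)); // true
        System.out.println(containsThrowable(nested, NullPointerException.class));  // false
    }
}
```

A handler would call such a check instead of a bare `instanceof` on the top-level failure.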

        githubbot ASF GitHub Bot added a comment -

        Github user tzulitai commented on the issue:

        https://github.com/apache/flink/pull/3246

        Thanks for the feedback. I'll rebase this PR soon.

        @static-max thanks for the tip. I'll keep that in mind when updating the docs for this feature!

        githubbot ASF GitHub Bot added a comment -

        Github user fpompermaier commented on the issue:

        https://github.com/apache/flink/pull/3246

        Any news on this?

        githubbot ASF GitHub Bot added a comment -

        Github user tzulitai commented on the issue:

        https://github.com/apache/flink/pull/3246

        Hi @fpompermaier, sorry I was busy with other stuff over the last week.
        I hope to work towards merging this by the end of next week!

        githubbot ASF GitHub Bot added a comment -

        Github user tzulitai commented on the issue:

        https://github.com/apache/flink/pull/3246

        Manually merged with 3743e898104d79a9813d444d38fa9f86617bb5ef. Review happened in #3358.

        githubbot ASF GitHub Bot added a comment -

        Github user tzulitai closed the pull request at:

        https://github.com/apache/flink/pull/3246

        githubbot ASF GitHub Bot added a comment -

        Github user fpompermaier commented on the issue:

        https://github.com/apache/flink/pull/3246

        Awesome news! Thanks a lot @tzulitai for the great work!

        tzulitai Tzu-Li (Gordon) Tai added a comment -

        Resolved in master via http://git-wip-us.apache.org/repos/asf/flink/3743e89

          People

          • Assignee:
            tzulitai Tzu-Li (Gordon) Tai
            Reporter:
            f.pompermaier Flavio Pompermaier
          • Votes:
            0 Vote for this issue
            Watchers:
            5 Start watching this issue

            Dates

            • Created:
              Updated:
              Resolved:

              Development