Flume / FLUME-2390

Flume-ElasticSearch: Data gets posted multiple times when one of the events fails validation at the ElasticSearch sink for JSON data

    Details

    • Type: Bug
    • Status: Open
    • Priority: Major
    • Resolution: Unresolved
    • Affects Version/s: v1.4.0
    • Fix Version/s: None
    • Component/s: Sinks+Sources
    • Labels:
      None
    • Environment:

      CDH4.5

      Description

      Hi,

      I am using the ElasticSearch sink to post JSON data. I used the temporary fix mentioned in https://issues.apache.org/jira/browse/FLUME-2126 to get JSON data posted to ElasticSearch. When one of the messages fails validation against the ElasticSearch mapping for JSON data (for example, an empty message), Flume seems to post the entire batch again and again until I restart Flume. Because of that, the number of events went from an average of 100 to an average of 2000 per 10 minutes. As a temporary fix I set a header in my Flume HTTP source for invalid JSON and used an interceptor to send the data to multiple ES sinks with different index names.
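      The sketch below illustrates the kind of interceptor that workaround implies: it tags each event with a header saying whether the body parses as JSON, so a multiplexing channel selector can route invalid events to a separate ES sink and index. It is a minimal sketch assuming the standard org.apache.flume.interceptor.Interceptor API and Gson for parsing; the class name (JsonValidityInterceptor) and header key (jsonValid) are illustrative, not part of Flume or this ticket.

{code:java}
// A minimal sketch, not Flume code: tag events with a "jsonValid" header so a
// multiplexing channel selector can route malformed bodies to a different ES sink.
import java.nio.charset.Charset;
import java.util.List;
import java.util.Map;

import org.apache.flume.Context;
import org.apache.flume.Event;
import org.apache.flume.interceptor.Interceptor;

import com.google.gson.JsonParser;
import com.google.gson.JsonSyntaxException;

public class JsonValidityInterceptor implements Interceptor {

  // Hypothetical header key; use whatever name your channel selector config expects.
  static final String VALID_HEADER = "jsonValid";
  private static final Charset UTF8 = Charset.forName("UTF-8");

  @Override
  public void initialize() {
    // no state to set up
  }

  @Override
  public Event intercept(Event event) {
    Map<String, String> headers = event.getHeaders();
    try {
      // Gson's parser is fairly lenient; a stricter validator could be swapped in here.
      new JsonParser().parse(new String(event.getBody(), UTF8));
      headers.put(VALID_HEADER, "true");
    } catch (JsonSyntaxException e) {
      headers.put(VALID_HEADER, "false");
    }
    return event;
  }

  @Override
  public List<Event> intercept(List<Event> events) {
    for (Event event : events) {
      intercept(event);
    }
    return events;
  }

  @Override
  public void close() {
    // nothing to release
  }

  public static class Builder implements Interceptor.Builder {
    @Override
    public Interceptor build() {
      return new JsonValidityInterceptor();
    }

    @Override
    public void configure(Context context) {
      // no options in this sketch
    }
  }
}
{code}

      A multiplexing selector keyed on that header can then point "false" events at a sink writing to a quarantine index, which is essentially the routing the reporter describes.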

        Activity

        Hari Shreedharan added a comment -

        If you are using the file channel, the file channel integrity tool will let you take a channel offline, validate the data, and remove invalid data. This is in trunk and is coming in 1.6.

        Edward Sargisson added a comment -

        Benjamin Fiorini, your analysis is correct but I would disagree with the solution.
        We use the Rotem Hermon solution ourselves and generate an ID as early in the pipeline as we can manage. This means that our system can use Last Write Wins and overwrite any record with that ID.

        As for #2: I think that if you've got a mapping problem then you need to fix the mapping problem. Sadly, Flume has the head-of-line blocking problem (aka poison pill), so everything blocks. A more general solution is to have a dead letter queue.

        In dev we simply delete the queue. In production-like environments, when we want to practice keeping the data, we:
        0. back up the file channel directories
        1. set the batch size to 1 and let all the events through until it blocks (otherwise you may have hundreds of good events ahead of the poison pill).
        2. use the FileRollSink to flush the queue out to a file (remember to turn headers on)
        3. then we can look at the first event and figure out why it blocked
        4. then we can either fix the problem in ES's mappings, restore the file channel and let it run, or, in theory, take the poison pill out and re-ingest the events.

        Yes, all of that is a complete pain!
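        A minimal sketch of the "generate an ID as early in the pipeline as we can manage" idea, using the standard Flume Interceptor API: stamp every event once with a UUID header that a downstream serializer can use as the Elasticsearch _id, so a retried batch overwrites the existing documents (Last Write Wins) instead of duplicating them. The class name and header key are illustrative; this is not the gigya serializer referenced elsewhere in this ticket.

{code:java}
// A minimal sketch, not Flume code: assign a stable per-event id header early in the
// pipeline so downstream retries re-index the same document instead of creating new ones.
import java.util.List;
import java.util.Map;
import java.util.UUID;

import org.apache.flume.Context;
import org.apache.flume.Event;
import org.apache.flume.interceptor.Interceptor;

public class EventIdInterceptor implements Interceptor {

  // Hypothetical header name; the ES serializer has to be taught to use it as the _id.
  static final String ID_HEADER = "docId";

  @Override
  public void initialize() {
    // no state to set up
  }

  @Override
  public Event intercept(Event event) {
    Map<String, String> headers = event.getHeaders();
    // Assign the id only once so a replayed event keeps its original id.
    if (!headers.containsKey(ID_HEADER)) {
      headers.put(ID_HEADER, UUID.randomUUID().toString());
    }
    return event;
  }

  @Override
  public List<Event> intercept(List<Event> events) {
    for (Event event : events) {
      intercept(event);
    }
    return events;
  }

  @Override
  public void close() {
    // nothing to release
  }

  public static class Builder implements Interceptor.Builder {
    @Override
    public Interceptor build() {
      return new EventIdInterceptor();
    }

    @Override
    public void configure(Context context) {
      // no options in this sketch
    }
  }
}
{code}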

        Benjamin Fiorini added a comment -

        Hi Deepak Subhramanian, Edward Sargisson,

        I believe there are 2 problems here:

        1. The Flume ElasticSearch sink not indexing with a specific ID => this duplicates the data; see Rotem Hermon's solution.
        2. The Flume ElasticSearch sink not handling mapping discrepancies => this means that a bad message will be stuck and fill up your queue... That's cool from the Flume point of view but bad for ElasticSearch: there is no easy way to fix this on the ES side, and you'd need to empty the entire channel because of a single bad message. Not ideal if you don't want to lose (too much) data.
          Maybe a solution is to make it possible to ignore the MapperParsingException. I can provide a patch if this sounds sensible.

        Cheers,
        Benjamin

        Deepak Subhramanian added a comment -

        Hi Edward,

        It is not related to FLUME-2649. It is a duplicate of https://issues.apache.org/jira/browse/FLUME-2254. So you can keep one of the tickets.

        Thanks, Deepak

        Edward Sargisson added a comment -

        Deepak Subhramanian
        Could you please take a look at FLUME-2649 and see if that will solve your problem? Perhaps we can close this work item.

        Rotem Hermon added a comment -

        The problem of data getting posted multiple times is that when Flume retries a batch, all the documents get indexed again in Elasticsearch. And since no specific document ID is provided for the documents, Elasticsearch treats them as new documents and indexes them again.
        The correct solution for this specific problem is to generate the document _id for each indexed document, so that even if it gets indexed again it will re-index the same document and not create a new one.
        We apply such a method in our extended serializer; you can see it here: https://github.com/gigya/flume-ng-elasticsearch-ser-ex#generating-document-ids-for-events
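        For reference, the reason an explicit _id makes retries safe is that Elasticsearch overwrites any existing document with that id, so replaying the same event just rewrites the same document. A hedged sketch against the Elasticsearch 1.x Java client API that the ES sink of this era is built on; the index, type, and variable names are placeholders.

{code:java}
// Sketch only: index a Flume event body under an explicit _id so that re-sending the
// same event overwrites the existing document instead of creating a duplicate.
import org.elasticsearch.action.index.IndexResponse;
import org.elasticsearch.client.Client;

public class IdempotentIndexExample {

  // "docId" is assumed to have been stamped on the event earlier in the pipeline
  // (for example by an id-generating interceptor or an extended serializer).
  public static IndexResponse indexWithId(Client client, String index, String type,
                                          String docId, String jsonBody) {
    return client.prepareIndex(index, type, docId)  // explicit _id => last write wins
        .setSource(jsonBody)
        .execute()
        .actionGet();
  }
}
{code}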

        Benjamin Fiorini added a comment - - edited

        Duplicate of FLUME-2254, which has some hints to solve this in the ElasticSearch sink.
        We had to implement something like this, because if for some reason a malformed message (from the ES point of view) gets into the queue, it will get stuck in the channel forever. The only solution then is to delete the entire channel and lose A LOT of data.

        Ashish Paliwal added a comment -

        If I understand correctly, it's more an issue of ES rejecting data that is well formed (from Flume's perspective) due to constraints like schema validation, etc. Maybe we need to add a check in the ES client on the return status code and attach a handler to it. We could have a policy in the handler, like dropping such messages, writing them to a log, or something else.
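        A rough sketch of that check, assuming the Elasticsearch 1.x Java client the sink uses: inspect the per-item results of the bulk response and apply a policy (here, log and drop) instead of failing the whole batch. The class and method names are made up; wiring this into the sink's channel transaction handling is not shown.

{code:java}
// Sketch only: execute a bulk request and drop/log individual rejects (for example
// mapping failures) rather than retrying the whole batch forever.
import org.elasticsearch.action.bulk.BulkItemResponse;
import org.elasticsearch.action.bulk.BulkRequestBuilder;
import org.elasticsearch.action.bulk.BulkResponse;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class BulkRejectPolicy {

  private static final Logger logger = LoggerFactory.getLogger(BulkRejectPolicy.class);

  /** Returns true if the batch can be committed despite per-document rejections. */
  public static boolean executeDroppingRejects(BulkRequestBuilder bulkRequest) {
    BulkResponse response = bulkRequest.execute().actionGet();
    if (response.hasFailures()) {
      for (BulkItemResponse item : response) {
        if (item.isFailed()) {
          // e.g. MapperParsingException from a schema/mapping mismatch
          logger.warn("Dropping event rejected by ES: index={}, id={}, reason={}",
              item.getIndex(), item.getId(), item.getFailureMessage());
        }
      }
    }
    // Commit anyway so a single rejected event cannot block the channel.
    return true;
  }
}
{code}

        Whether dropping is acceptable is exactly the policy question debated in this thread; a dead-letter-queue variant would write the rejected events somewhere instead of only logging them.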

        Edward Sargisson added a comment -

        Deepak Subhramanian, would you be able to provide an example of the exact data that reproduces this problem for you?

        I'd like to make sure it's in a test.

        Otis Gospodnetic added a comment -

        Is this maybe related to the JSON ==> ES problem described in FLUME-2476?

        Xuri Nagarin added a comment -

        Looking to submit a patch to fix both issues (once we finish some testing).

        Edward Sargisson added a comment -

        Xuri Nagarin Err... No. If the sink can't figure out a way to write the data then it will block and wait for you to fix it. There is currently no poison pill handling.

        Xuri Nagarin added a comment -

        Just ran into the related issue of the Flume ES sink feeding ElasticSearch an object reference instead of the JSON content. If I understand the issue correctly, the ES sink should discard invalid JSON, log an error, and move on?

        Edward Sargisson added a comment -

        Yes - but what would we do about that?
        Our attitude with Flume is to not lose data but to complain bitterly and fill up the queue. We don't have a dead letter queue feature. So, yes, if you poison-pill your queue it's going to sit there and keep retrying.

        I'm inclined to close this work item as Working as Designed but feel free to argue for a better approach.


          People

          • Assignee:
            Unassigned
          • Reporter:
            Deepak Subhramanian
          • Votes:
            0
          • Watchers:
            8
