Flume / FLUME-2222

Duplicate entries in Elasticsearch when using Flume elasticsearch-sink

    Details

    • Type: Bug
    • Status: Resolved
    • Priority: Major
    • Resolution: Not A Problem
    • Affects Version/s: v1.4.0
    • Fix Version/s: None
    • Component/s: Sinks+Sources
    • Environment:

      CentOS 6

      Description

      Hello,

      I'm using the Flume elasticsearch-sink to transfer logs from EC2 instances to Elasticsearch, and I get duplicate entries for numerous documents.

      I noticed this issue when I sent a specific number of log lines to Elasticsearch using Flume and then counted them in Kibana to verify that all of them had arrived. Most of the time, especially when multiple Flume instances were used, I got duplicate entries, e.g. instead of receiving 10000 documents from an instance, I received 10060.

      The level of duplication seems to be proportional to the number of instances sending log data simultaneously, e.g. with 3 Flume instances I get 10060, and with 50 Flume instances I get 10300.

      Is duplication something I should expect when using the Flume elasticsearch-sink?
      There is a doRollback() method that is called on transaction failure, but I think it only updates the local Flume channel, not Elasticsearch.

      Any info/suggestions would be appreciated.

      Regards,
      Nick
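
      For reference, a minimal elasticsearch-sink configuration of the kind described above might look like the sketch below; the agent, channel, host, and index names are illustrative placeholders rather than the reporter's actual settings.

      agent.channels = mem-channel
      agent.sinks = es-sink
      agent.sinks.es-sink.type = org.apache.flume.sink.elasticsearch.ElasticSearchSink
      agent.sinks.es-sink.channel = mem-channel
      # Comma-separated hostname:port pairs for the Elasticsearch transport client (default port 9300)
      agent.sinks.es-sink.hostNames = es-node-1:9300,es-node-2:9300
      agent.sinks.es-sink.indexName = logs
      agent.sinks.es-sink.indexType = log
      agent.sinks.es-sink.clusterName = elasticsearch
      agent.sinks.es-sink.batchSize = 100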

        Activity

        Nikolaos Tsipas added a comment -

        I did some more investigation on this issue, and it looks like the duplicated documents are produced when Flume has to roll back a transaction for some reason.
        Below you will find an actual example of document duplication.

        [screenshot: duplicate message in Kibana]

        flume.log on the instance from which the above log line came:

        29 Oct 2013 12:05:40,112 ERROR [SinkRunner-PollingRunner-DefaultSinkProcessor] (org.apache.flume.sink.elasticsearch.ElasticSearchSink.process:217)  - Failed to commit transaction. Transaction rolled back.
        org.elasticsearch.client.transport.NoNodeAvailableException: No node available
                at org.elasticsearch.client.transport.TransportClientNodesService$RetryListener.onFailure(TransportClientNodesService.java:249)
                at org.elasticsearch.action.TransportActionNodeProxy$1.handleException(TransportActionNodeProxy.java:84)
                at org.elasticsearch.transport.TransportService$Adapter$2$1.run(TransportService.java:311)
                at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
                at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
                at java.lang.Thread.run(Thread.java:744)
        29 Oct 2013 12:05:40,113 ERROR [SinkRunner-PollingRunner-DefaultSinkProcessor] (org.apache.flume.SinkRunner$PollingRunner.run:160)  - Unable to deliver event. Exception follows.
        org.elasticsearch.client.transport.NoNodeAvailableException: No node available
                at org.elasticsearch.client.transport.TransportClientNodesService$RetryListener.onFailure(TransportClientNodesService.java:249)
                at org.elasticsearch.action.TransportActionNodeProxy$1.handleException(TransportActionNodeProxy.java:84)
                at org.elasticsearch.transport.TransportService$Adapter$2$1.run(TransportService.java:311)
                
        

        It looks like the log message was indexed by Elasticsearch, but Flume wasn't aware of this because of a connection error. So it rolled back the transaction, and the same log line was sent twice.
        Does this make sense? I think it does, but I'd like to read your thoughts on this issue.

        Regards,
        Nick

        Ashish Paliwal added a comment -

        This is the expected behavior. In case of failure, the transaction is rolled back and the batch is sent again, which means some entries might already have been indexed by the time the failure happened. So your analysis is correct.
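
        For context, a simplified sketch of the generic Flume sink transaction pattern (not the actual ElasticSearchSink code; the class and helper names here are hypothetical): events are removed from the channel only when the transaction commits, so a rollback after a partial write leaves them queued and the whole batch is re-delivered.

        // Sketch of a generic Flume sink process() loop. Illustrates why a rollback
        // after a partial write to Elasticsearch leads to duplicate documents.
        import org.apache.flume.Channel;
        import org.apache.flume.Event;
        import org.apache.flume.EventDeliveryException;
        import org.apache.flume.Transaction;
        import org.apache.flume.sink.AbstractSink;

        public class ExampleEsSink extends AbstractSink {
            private static final int BATCH_SIZE = 100;

            @Override
            public Status process() throws EventDeliveryException {
                Channel channel = getChannel();
                Transaction txn = channel.getTransaction();
                txn.begin();
                try {
                    for (int i = 0; i < BATCH_SIZE; i++) {
                        Event event = channel.take();
                        if (event == null) {
                            break;
                        }
                        // May succeed for some events before a later failure
                        // (e.g. NoNodeAvailableException) aborts the batch.
                        indexIntoElasticsearch(event);
                    }
                    // Only a successful commit removes the events from the channel.
                    txn.commit();
                    return Status.READY;
                } catch (Throwable t) {
                    // Rollback affects only the Flume channel: the events stay queued
                    // and are delivered again, duplicating anything Elasticsearch
                    // already indexed before the failure.
                    txn.rollback();
                    return Status.BACKOFF;
                } finally {
                    txn.close();
                }
            }

            private void indexIntoElasticsearch(Event event) {
                // Placeholder for the bulk-index call to Elasticsearch.
            }
        }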

        Nikolaos Tsipas added a comment -

        Thanks for your message. I guess you can resolve this ticket.

        Ashish Paliwal added a comment -

        Not a problem, it's expected behaviour.

        Edward Sargisson added a comment -

        Nikolaos Tsipas BTW, the way we deal with that behaviour is to set an ID as early in the pipeline as we can manage and use that ID when writing to ES. ES will overwrite records with the same ID.
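
        A minimal sketch of that approach, using the older Elasticsearch Java TransportClient API (the 0.90/1.x era that the Flume 1.4 sink targets); the host, index, type, and ID derivation are illustrative assumptions, not part of this issue. Indexing the same _id twice overwrites the document instead of creating a duplicate.

        // Sketch only: shows that Elasticsearch overwrites a document when the same
        // _id is indexed twice, which makes re-sent Flume batches effectively idempotent.
        import org.elasticsearch.client.Client;
        import org.elasticsearch.client.transport.TransportClient;
        import org.elasticsearch.common.transport.InetSocketTransportAddress;

        public class SameIdOverwriteExample {
            public static void main(String[] args) {
                Client client = new TransportClient()
                        .addTransportAddress(new InetSocketTransportAddress("es-node-1", 9300));

                // Deterministic ID assigned as early in the pipeline as possible,
                // e.g. derived from host + file + offset of the log line (assumed scheme).
                String docId = "web-01:/var/log/app.log:12345";

                // First delivery of the event.
                client.prepareIndex("logs", "log", docId)
                        .setSource("{\"message\":\"original log line\"}")
                        .execute().actionGet();

                // Re-delivery after a rolled-back Flume transaction: same _id, so the
                // document is overwritten (version bumped) rather than duplicated.
                client.prepareIndex("logs", "log", docId)
                        .setSource("{\"message\":\"original log line\"}")
                        .execute().actionGet();

                client.close();
            }
        }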


          People

          • Assignee:
            Ashish Paliwal
          • Reporter:
            Nikolaos Tsipas
          • Votes:
            0
          • Watchers:
            4
