Kafka / KAFKA-154

ZK consumer may lose a chunk's worth of messages during rebalance in some rare cases

    Details

    • Type: Bug
    • Status: Resolved
    • Priority: Major
    • Resolution: Fixed
    • Affects Version/s: 0.7
    • Fix Version/s: 0.7
    • Component/s: core
    • Labels: None

      Description

      Occasionally, we have seen errors with the following message in the consumer log after a rebalance happens:
      consumed offset: xxx doesn't match fetch offset: yyy for topicz

      The consumer offset xxx should always match the fetch offset yyy.

        Activity

        Transition                    Time In Source Status   Execution Times   Last Executer   Last Execution Date
        Open → Patch Available        14m 38s                 1                 Jun Rao         11/Oct/11 17:48
        Patch Available → Resolved    1h 23m                  1                 Jun Rao         11/Oct/11 19:12
        Tony Stevenson made changes -
        Workflow: Apache Kafka Workflow [ 13052454 ] → no-reopen-closed, patch-avail [ 13055182 ]
        Tony Stevenson made changes -
        Workflow: no-reopen-closed, patch-avail [ 12637308 ] → Apache Kafka Workflow [ 13052454 ]
        Joel Koshy added a comment -

        Ok, never mind - it's late. I see that the queues are cleared out.

        Joel Koshy added a comment -

        While looking at this area of the code, I was wondering about this patch: wouldn't it permit the mirror issue of enqueuing a chunk without advancing the offset, since the interrupt could occur just before the fetch offset update? So the new fetcher may fetch the same offset again. It seems to me that the interrupt and PartitionTopicInfo's enqueue method itself should be mutually exclusive - or perhaps the consumer iterator should provide suitable handling for chunks with the same fetch offset. Or I must be missing something obvious.
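        As a minimal sketch of the second alternative mentioned above (handling chunks with the same fetch offset in the consumer iterator): if a chunk can occasionally be re-fetched because the enqueue succeeded but the offset update never ran, the iterator could tolerate the duplicate by skipping messages at or below the offset it has already handed out. The Scala types and names below are hypothetical, not the Kafka 0.7 ConsumerIterator, and the sketch assumes offsets within a chunk are increasing.

        // Hypothetical message/chunk types for this sketch only.
        case class MessageAndOffset(offset: Long, payload: Array[Byte])
        case class Chunk(messages: Seq[MessageAndOffset])

        // Iterator that drops any message already returned, so a re-fetched
        // chunk with the same starting offset yields no duplicates downstream.
        class DedupingIterator(chunks: Iterator[Chunk]) extends Iterator[MessageAndOffset] {
          private var consumedOffset = -1L
          private var current: Iterator[MessageAndOffset] = Iterator.empty

          def hasNext: Boolean = {
            // Advance to the next chunk, filtering out already-consumed offsets.
            while (!current.hasNext && chunks.hasNext)
              current = chunks.next().messages.iterator.filter(_.offset > consumedOffset)
            current.hasNext
          }

          def next(): MessageAndOffset = {
            if (!hasNext) throw new NoSuchElementException("no more messages")
            val m = current.next()
            consumedOffset = m.offset
            m
          }
        }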

        Jun Rao made changes -
        Status: Patch Available [ 10002 ] → Resolved [ 5 ]
        Fix Version/s: 0.7 [ 12317243 ]
        Resolution: Fixed [ 1 ]
        Jun Rao added a comment -

        Thanks for the review. Committed the patch.

        Neha Narkhede added a comment -

        +1. Good catch!

        Jun Rao made changes -
        Status: Open [ 1 ] → Patch Available [ 10002 ]
        Jun Rao added a comment -

        Patch is ready for review.

        We just need to make sure that we enqueue first and then advance the fetch offset. This way, if the enqueue operation gets interrupted, the fetch offset is not advanced. If the enqueue operation succeeds, the offset is guaranteed to be advanced since it can't be interrupted.
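        A minimal Scala sketch of that ordering, with illustrative names rather than the actual Kafka 0.7 PartitionTopicInfo code: the blocking put() is the only interruptible step, and it runs before the offset store.

        import java.util.concurrent.BlockingQueue
        import java.util.concurrent.atomic.AtomicLong

        // Illustrative chunk type for the sketch; not the actual Kafka 0.7 class.
        case class FetchedChunk(startOffset: Long, nextOffset: Long)

        class PartitionTopicInfoFixedSketch(queue: BlockingQueue[FetchedChunk]) {
          private val fetchedOffset = new AtomicLong(0L)

          // Enqueue first, then advance. An InterruptedException from put() leaves
          // fetchedOffset untouched, so a replacement fetcher re-fetches the same
          // chunk instead of skipping it.
          def enqueue(chunk: FetchedChunk): Unit = {
            queue.put(chunk)                    // interruptible step happens first
            fetchedOffset.set(chunk.nextOffset) // plain store; not interruptible
          }

          def getFetchOffset: Long = fetchedOffset.get
        }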

        Jun Rao made changes -
        Attachment: KAFKA-154.patch [ 12498631 ]
        Jun Rao added a comment -

        This problem is caused by a very subtle bug. When a fetcher calls PartitionTopicInfo.enqueue, we first advance the fetch offset and then enqueue the fetched chunk into a blocking queue. When the fetcher thread is interrupted (because we are shutting down the fetcher after a rebalance), it can happen that we just advanced the fetch offset to xxx, but got interrupted while trying to add the fetched chunk into the queue (so the chunk is not added to the queue). Then a new fetcher gets created to start fetching from xxx. This causes a chunk worth of data just before xxx to be lost to the consumer.
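        To make the race concrete, here is a minimal Scala sketch of the pre-patch ordering described above. The class, field, and method names are illustrative only, not the actual Kafka 0.7 PartitionTopicInfo source; the point is just that the offset store happens before the interruptible put().

        import java.util.concurrent.BlockingQueue
        import java.util.concurrent.atomic.AtomicLong

        // Illustrative chunk type for the sketch; not the actual Kafka 0.7 class.
        case class FetchedChunk(startOffset: Long, nextOffset: Long)

        class PartitionTopicInfoBuggySketch(queue: BlockingQueue[FetchedChunk]) {
          private val fetchedOffset = new AtomicLong(0L)

          // Advance first, then enqueue. If the fetcher thread is interrupted while
          // blocked in put(), the chunk is dropped but fetchedOffset already points
          // past it, so the next fetcher starts at the new offset and the dropped
          // chunk is never delivered to the consumer.
          def enqueue(chunk: FetchedChunk): Unit = {
            fetchedOffset.set(chunk.nextOffset) // offset advanced before delivery
            queue.put(chunk)                    // InterruptedException here loses the chunk
          }

          def getFetchOffset: Long = fetchedOffset.get
        }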

        Jun Rao created issue -

          People

          • Assignee: Jun Rao
          • Reporter: Jun Rao
          • Votes: 0
          • Watchers: 0

            Dates

            • Created:
            • Updated:
            • Resolved:
