KAFKA-154

ZK consumer may lose a chunk's worth of messages during rebalance in some rare cases

    Details

    • Type: Bug
    • Status: Resolved
    • Priority: Major
    • Resolution: Fixed
    • Affects Version/s: 0.7
    • Fix Version/s: 0.7
    • Component/s: core
    • Labels: None

      Description

      Occasionally, we have seen errors with the following message in the consumer log after a rebalance happens:
      consumed offset: xxx doesn't match fetch offset: yyy for topicz

      The consumer offset xxx should always match the fetch offset yyy.
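
      For context, here is a minimal Scala sketch of the kind of consistency check that would emit this message. The names (checkConsumedOffset, the FetchedDataChunk fields) are illustrative assumptions, not necessarily the actual 0.7 consumer-iterator code:

          // Each chunk handed over by the fetcher carries the offset it was fetched from;
          // the iterator tracks how far the application has actually consumed. If the two
          // differ, a chunk of data was skipped somewhere in between.
          case class FetchedDataChunk(payload: Array[Byte], fetchOffset: Long)

          object OffsetCheck {
            def checkConsumedOffset(consumedOffset: Long, chunk: FetchedDataChunk, topic: String): Unit = {
              if (consumedOffset != chunk.fetchOffset)
                // in the real consumer this is written to the consumer log as an error
                System.err.println("consumed offset: " + consumedOffset +
                  " doesn't match fetch offset: " + chunk.fetchOffset + " for " + topic)
            }
          }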

        Activity

        Joel Koshy added a comment -

        Ok, never mind - it's late. I see that the queues are cleared out.

        Joel Koshy added a comment -

        While looking at this area of the code, I was wondering about this patch: wouldn't it permit the mirror image of this issue, enqueueing without advancing the offset, since the interrupt could occur just before the fetch offset update? The new fetcher may then fetch the same offset again. It seems to me that the interrupt and PartitionTopicInfo's enqueue method itself should be mutually exclusive - or perhaps the consumer iterator should provide suitable handling for chunks with the same fetch offset. Or I must be missing something obvious.
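
        To make this concrete, a hedged sketch of why the mirror case ends up benign, per the comment above that the queues are cleared out during a rebalance (the names and structure here are illustrative, not the actual rebalance code):

            import java.util.concurrent.BlockingQueue

            case class FetchedDataChunk(payload: Array[Byte], fetchOffset: Long)

            object RebalanceShutdown {
              // With the patched ordering, an interrupt can land after put() but before the
              // fetch-offset update, so the chunk sits in the queue while the offset still
              // points at it, and the new fetcher re-fetches the same offset. Because the
              // rebalance clears the per-partition queues before the new fetcher starts,
              // the already-enqueued copy is discarded and no duplicate reaches the iterator.
              def shutdownFetchers(queues: Iterable[BlockingQueue[FetchedDataChunk]],
                                   interruptFetchers: () => Unit): Unit = {
                interruptFetchers()        // interrupts the old fetcher threads
                queues.foreach(_.clear())  // drops any chunk left over from the race window
              }
            }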

        Jun Rao added a comment -

        Thanks for the review. Committed the patch.

        Neha Narkhede added a comment -

        +1. Good catch!

        Jun Rao added a comment -

        Patch is ready for review.

        We just need to make sure that we enqueue first and then advance the fetch offset. This way, if the enqueue operation gets interrupted, the fetch offset is not advanced. If the enqueue operation succeeds, the offset is guaranteed to be advanced, since the offset update itself can't be interrupted.
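
        A hedged sketch of the ordering the patch describes (simplified names, not the committed code): the enqueue happens first, so an interrupt leaves the fetch offset untouched and the chunk is simply fetched again by the next fetcher.

            import java.util.concurrent.BlockingQueue
            import java.util.concurrent.atomic.AtomicLong

            case class FetchedDataChunk(payload: Array[Byte], fetchOffset: Long, nextOffset: Long)

            // Simplified stand-in for PartitionTopicInfo with the patched ordering.
            class PartitionTopicInfoFixed(chunkQueue: BlockingQueue[FetchedDataChunk]) {
              val fetchedOffset = new AtomicLong(0)

              def enqueue(chunk: FetchedDataChunk): Unit = {
                chunkQueue.put(chunk)                // may throw InterruptedException; offset stays put
                fetchedOffset.set(chunk.nextOffset)  // plain atomic set, not interruptible
              }
            }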

        Jun Rao added a comment -

        This problem is caused by a very subtle bug. When a fetcher calls PartitionTopicInfo.enqueue, we first advance the fetch offset and then enqueue the fetched chunk into a blocking queue. When the fetcher thread is interrupted (because we are shutting down the fetcher after a rebalance), it can happen that we just advanced the fetch offset to xxx, but got interrupted while trying to add the fetched chunk into the queue (so the chunk is not added to the queue). Then a new fetcher gets created to start fetching from xxx. This causes a chunk worth of data just before xxx to be lost to the consumer.
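
        A hedged sketch of that ordering (simplified names and types, not the actual 0.7 PartitionTopicInfo code), marking the window in which the interrupt loses the chunk:

            import java.util.concurrent.BlockingQueue
            import java.util.concurrent.atomic.AtomicLong

            case class FetchedDataChunk(payload: Array[Byte], fetchOffset: Long, nextOffset: Long)

            // Simplified stand-in for PartitionTopicInfo with the pre-patch ordering.
            class PartitionTopicInfoBuggy(chunkQueue: BlockingQueue[FetchedDataChunk]) {
              val fetchedOffset = new AtomicLong(0)

              def enqueue(chunk: FetchedDataChunk): Unit = {
                fetchedOffset.set(chunk.nextOffset)  // offset already points past the chunk...
                // <-- an interrupt here (fetcher shutdown on rebalance) makes put() throw
                //     InterruptedException, so the chunk never reaches the queue, yet a new
                //     fetcher will start from fetchedOffset and skip it: one chunk is lost.
                chunkQueue.put(chunk)
              }
            }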


          People

          • Assignee:
            Jun Rao
          • Reporter:
            Jun Rao
          • Votes:
            0
          • Watchers:
            0
