KAFKA-262

Bug in the consumer rebalancing logic causes one consumer to release partitions that it does not own

    Details

    • Type: Bug
    • Status: Resolved
    • Priority: Major
    • Resolution: Fixed
    • Affects Version/s: 0.7
    • Fix Version/s: 0.7.1
    • Component/s: core
    • Labels: None

      Description

      The consumer maintains a cache of the topics and partitions it owns, along with the corresponding fetcher queues. But while releasing partition ownership, this cache is not cleared. This leads the consumer to release a partition that it no longer owns. It can also lead the consumer to commit offsets for partitions that it no longer consumes from.

      The rebalance operation goes through the following steps:

      1. Close fetchers.
      2. Commit offsets.
      3. Release partition ownership.
      4. Rebalance: add the topic, partition and fetcher queues to the topic registry, for all topics that the consumer process currently wants to own.
      5. If the consumer runs into a conflict for a topic or partition, the rebalancing attempt fails and it goes back to step 1.
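      The steps above can be sketched as a simple retry loop. This is a hypothetical, minimal model; all class, method and field names here (RebalanceLoop, claimDesiredPartitions, etc.) are illustrative stand-ins, not Kafka's actual consumer code.

```java
// Minimal sketch of the rebalance attempt loop described above.
class RebalanceLoop {
    int attemptsUsed = 0;
    final int failUntilAttempt; // simulate step-5 conflicts on early attempts

    RebalanceLoop(int failUntilAttempt) { this.failUntilAttempt = failUntilAttempt; }

    boolean rebalance(int maxAttempts) {
        for (int attempt = 0; attempt < maxAttempts; attempt++) {
            closeFetchers();              // step 1
            commitOffsets();              // step 2
            releasePartitionOwnership();  // step 3
            attemptsUsed++;
            if (claimDesiredPartitions()) // step 4: fills the topic registry
                return true;
            // step 5: ownership conflict for some topic/partition -> retry from step 1
        }
        return false;
    }

    // Stubs standing in for the real consumer operations.
    void closeFetchers() {}
    void commitOffsets() {}
    void releasePartitionOwnership() {}
    boolean claimDesiredPartitions() { return attemptsUsed > failUntilAttempt; }
}
```

      The bug below arises because, in the real code, the cached state consulted in step 3 was not kept in sync across failed iterations of this loop.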

      Say there are two consumers in a group, c1 and c2, both consuming topic1, which has partitions 0-0, 0-1 and 1-0. Say c1 owns 0-0 and 0-1, and c2 owns 1-0.

      1. Broker 1 goes down. This triggers a rebalancing attempt in c1 and c2.
      2. c1 releases partition ownership and then, during step 4 above, fails to rebalance.
      3. Meanwhile, c2 completes rebalancing successfully, owns partition 0-1 and starts consuming data.
      4. c1 starts the next rebalancing attempt and, during step 3 above, releases partition 0-1. During step 4, it owns partition 0-0 again and starts consuming data.
      5. Effectively, rebalancing has completed successfully, but there is no owner for partition 0-1 registered in ZooKeeper.
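      This scenario can be reproduced in a toy model. Here zkOwners stands in for the ZooKeeper partition-ownership nodes and c1Registry for c1's cached topic registry; all names are illustrative, not Kafka's actual code.

```java
import java.util.*;

// Toy reproduction of the orphaned-partition scenario above.
class StaleRegistryBug {
    // Returns the final ownership map after the buggy sequence of events.
    static Map<String, String> run() {
        Map<String, String> zkOwners = new HashMap<>();
        Set<String> c1Registry = new HashSet<>();

        // Initial state: c1 owns 0-0 and 0-1, c2 owns 1-0.
        zkOwners.put("0-0", "c1");
        zkOwners.put("0-1", "c1");
        zkOwners.put("1-0", "c2");
        c1Registry.addAll(Arrays.asList("0-0", "0-1"));

        // c1's first rebalance attempt releases ownership in ZooKeeper but
        // (the bug) never clears its cached registry, then fails at step 4.
        for (String p : c1Registry) zkOwners.remove(p);

        // Meanwhile c2 rebalances successfully and claims partition 0-1.
        zkOwners.put("0-1", "c2");

        // c1's next attempt, step 3: it releases everything in its STALE
        // registry, deleting c2's ownership node for 0-1.
        for (String p : c1Registry) zkOwners.remove(p);

        // c1 then re-claims only 0-0; nobody owns 0-1 any more.
        zkOwners.put("0-0", "c1");
        return zkOwners;
    }

    public static void main(String[] args) {
        // 0-1 has no owner even though every consumer finished rebalancing.
        System.out.println(run().containsKey("0-1")); // prints false
    }
}
```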

        Attachments

      1. kafka-262-v3.patch
        24 kB
        Neha Narkhede
      2. kafka-262.patch
        25 kB
        Neha Narkhede

        Activity

        Neha Narkhede added a comment -

        This patch removes the cache used by each consumer to decide whether or not it should trigger a rebalance operation. The reason is that it is very tricky to keep that cache updated in each participating consumer, which leads to incorrect partition ownership decisions.

        This patch also changes the code to use the topicRegistry correctly. It is used to keep track of the fetcher queues for every topic and partition the consumer owns. Hence, whenever partition ownership is released, the corresponding entries need to be deleted from the topic registry.
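        Under the same toy model as above (zkOwners standing in for the ZooKeeper ownership nodes; all names illustrative, not the actual ZookeeperConsumerConnector code), the fixed release path might look like:

```java
import java.util.*;

// Sketch of the fix: release ownership strictly according to topicRegistry,
// then clear it, so a later rebalance attempt cannot release a partition that
// another consumer has claimed in the meantime.
class ReleaseFromRegistry {
    final String consumerId;
    // topic-partition -> fetcher queue (queues elided; Object as a placeholder)
    final Map<String, Object> topicRegistry = new HashMap<>();
    final Map<String, String> zkOwners; // stands in for ZooKeeper ownership paths

    ReleaseFromRegistry(String consumerId, Map<String, String> zkOwners) {
        this.consumerId = consumerId;
        this.zkOwners = zkOwners;
    }

    void releasePartitionOwnership() {
        // Delete the owner node only for partitions this consumer actually owns
        // (conditional remove: only if still mapped to this consumer's id)...
        for (String partition : topicRegistry.keySet())
            zkOwners.remove(partition, consumerId);
        // ...and clear the registry so the next attempt starts from a clean slate.
        topicRegistry.clear();
    }
}
```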

        Neha Narkhede added a comment -

        Includes some cleanup. Removing oldPartitionsPerTopicMap and oldConsumersPerTopicMap.

        Jun Rao added a comment -

        Some comments:

        1. In ZookeeperConsumerConnector.reflectPartitionOwnershipDecision, the local variable success is not intuitive. It should be renamed to something like hasFailure.

        2. In ZookeeperConsumerConnector.releasePartitionOwnership, it's not clear to me why this method has to take an input parameter. Wouldn't it be simpler to always release partition ownership according to topicRegistry?

        Neha Narkhede added a comment -

        1. Will change that before committing the patch.
        2. Maybe. This sounds like a doable optimization. However, the code is very complex, and each optimization needs to be thought through deeply and come with extensive testing. I'd like to commit this patch as-is, since it blocks KAFKA-253 and the release, and it fixes the bug. I can open a JIRA to address that optimization, or make it part of KAFKA-265.

        Jun Rao added a comment -

        2. If releasePartitionOwnership always just checks topicRegistry, the code and the logic will be a bit simpler. Could we run the system test and see if there is any issue with the simplification?

        Neha Narkhede added a comment -

        1. Changed releasePartitionOwnership to not take in a map.
        2. Changed the name of the variable success to hasPartitionOwnershipFailed.

        Jun Rao added a comment -

        In releasePartitionOwnership(), topicAndPartitionsToBeReleased is no longer used and should be removed. Otherwise, the patch looks good.


          People

          • Assignee: Neha Narkhede
          • Reporter: Neha Narkhede
          • Votes: 0
          • Watchers: 0


          Time Tracking

          • Estimated: 24h
          • Remaining: 24h
          • Logged: Not Specified
