Kafka / KAFKA-256

Bug in the consumer rebalancing logic leads to the consumer not pulling data from some partitions

    Details

    • Type: Bug
    • Status: Resolved
    • Priority: Critical
    • Resolution: Fixed
    • Affects Version/s: 0.7
    • Fix Version/s: 0.7.1
    • Component/s: core
    • Labels: None

      Description

      There is a bug in the consumer rebalancing logic that can leave a consumer not pulling data from some partitions of a topic. It recovers only after the consumer group is restarted and the rebalance does not hit this bug again.

      Here is the observed behavior of the consumer when it hits the bug -

      1. Consumer is consuming 2 topics with 1 partition each on 2 brokers
      2. Broker 2 is bounced
      3. A rebalancing operation triggers for topic_2, and the consumer decides to now consume data only from broker 1 for topic_2
      4. During the rebalancing operation, ZK has not yet deleted /brokers/topics/topic_1/broker_2, so the consumer still decides to consume from both brokers for topic_1
      5. While restarting the fetchers, it tries to restart the fetcher for broker 2, which throws a RuntimeException. Before this, it has successfully started the fetcher for broker 1 and is consuming data from broker_1
      6. This exception trickles all the way up to the syncedRebalance API, and oldPartitionsPerTopicMap does not get updated to reflect that for topic_2 the consumer has now seen only broker_1. It still points to topic_2 -> broker_1, broker_2
      7. Next rebalancing attempt gets triggered
      8. By now, broker 2 is restarted and registered in zookeeper
      9. For topic_2, the consumer checks whether rebalancing needs to be done. Since it doesn't see a change in the cached topic-partition map, it decides there is no need to rebalance (see the sketch after this list).
      10. It continues fetching only from broker_1
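
      For illustration, here is a minimal sketch of the stale-cache check behind steps 7-10. The name oldPartitionsPerTopicMap comes from the description above; readPartitionsFromZk and the map contents are hypothetical stand-ins, not the actual consumer code.

        // Sketch only: shows why a stale cache makes the consumer skip a needed rebalance.
        import scala.collection.mutable

        object StaleCacheSketch {
          // Cached view from the last (failed) rebalance attempt: topic -> partitions.
          val oldPartitionsPerTopicMap = mutable.Map("topic_2" -> List("broker_1-0", "broker_2-0"))

          // Hypothetical stand-in for reading the current partition list from ZooKeeper.
          def readPartitionsFromZk(topic: String): List[String] =
            List("broker_1-0", "broker_2-0") // broker 2 is back, so ZK again lists both brokers

          def rebalanceNeeded(topic: String): Boolean = {
            val current = readPartitionsFromZk(topic)
            // The cache was never cleared after the failed attempt, so it still lists both
            // brokers, the comparison sees "no change", and the rebalance is wrongly skipped
            // even though the fetcher for broker 2 was never restarted.
            oldPartitionsPerTopicMap.get(topic) match {
              case Some(cached) => cached != current
              case None         => true
            }
          }

          def main(args: Array[String]): Unit =
            println("rebalance needed for topic_2: " + rebalanceNeeded("topic_2")) // prints false
        }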

      Attachments

      1. kafka-256-v3.patch (49 kB) - Neha Narkhede
      2. kafka-256-v2.patch (49 kB) - Neha Narkhede

        Activity

        Neha Narkhede added a comment -

        Changes include -

        1. The bug was caused by a stale cache problem

        2. This patch fixes the bug by clearing the cache on every unsuccessful rebalancing attempt. This includes an exception during rebalancing or a failure to own one or more partitions

        3. A tool to verify if a consumer group successfully completed a rebalancing operation.

        4. To facilitate such a tool, partition ownership is split into 3 steps -

        4.1 the consumer records its decision to own partitions that it has selected
        4.2 the fetchers are started to start pulling data from the selected partitions
        4.3 the above partition ownership decision is written to zookeeper

        5. Cleanup to remove unneeded imports
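
        For illustration, a hedged sketch of points 2 and 4 above; none of the names below are from the actual patch, and syncedRebalance here is only a stand-in loop that clears the cached map on any unsuccessful attempt and splits partition ownership into the three steps described.

          // Illustrative sketch, not the patch: the cache is cleared on every unsuccessful
          // rebalance, and partition ownership is split into the three steps listed above.
          import scala.collection.mutable

          class RebalanceSketch {
            private val cachedPartitionsPerTopic = mutable.Map.empty[String, List[String]]

            // 4.1 Record the ownership decision the consumer has selected (placeholder decision).
            private def decidePartitionOwnership(): Map[String, List[String]] =
              Map("topic_2" -> List("broker_1-0"))

            // 4.2 Start fetchers for the selected partitions (stub).
            private def startFetchers(ownership: Map[String, List[String]]): Unit = ()

            // 4.3 Write the ownership decision to ZooKeeper; may fail if another consumer
            // already owns one of the partitions (this stub always succeeds).
            private def writeOwnershipToZk(ownership: Map[String, List[String]]): Boolean = true

            def syncedRebalance(maxRetries: Int): Boolean = {
              var attempt = 0
              while (attempt < maxRetries) {
                try {
                  val ownership = decidePartitionOwnership()
                  startFetchers(ownership)
                  if (writeOwnershipToZk(ownership))
                    return true
                  // Failed to own one or more partitions: drop the cache so the next attempt
                  // re-reads the real state instead of trusting a stale view.
                  cachedPartitionsPerTopic.clear()
                } catch {
                  case _: RuntimeException =>
                    // Exception during rebalancing: also clear the cache before retrying.
                    cachedPartitionsPerTopic.clear()
                }
                attempt += 1
              }
              false
            }
          }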

        Neha Narkhede added a comment -

        This patch applies cleanly to trunk

        Neha Narkhede added a comment -

        A slight change here -

        The fetchers are updated only after the partition ownership is reflected in ZooKeeper. This reduces the possibility of consuming duplicate data.
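
        As a sketch of that reordering (same illustrative names as the sketch above, not the actual code), the ZooKeeper write now comes before the fetchers are updated:

          // Illustrative only: record ownership in ZooKeeper first, then start pulling data.
          object ReorderedRebalanceSketch {
            type Ownership = Map[String, List[String]]

            def decidePartitionOwnership(): Ownership = Map("topic_2" -> List("broker_1-0")) // placeholder
            def writeOwnershipToZk(ownership: Ownership): Boolean = true                     // stub
            def startFetchers(ownership: Ownership): Unit = ()                               // stub

            def rebalanceOnce(): Boolean = {
              val ownership = decidePartitionOwnership()       // decide which partitions to own
              if (!writeOwnershipToZk(ownership)) return false // reflect the decision in ZooKeeper first
              startFetchers(ownership)                         // only then update the fetchers
              true
            }
          }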

        Joel Koshy added a comment -

        +1 for v3.

        I like the idea of having a tool to check if a consumer is correctly balanced.
        A more general comment/question on the kafka.tools package: I thought the tools
        package is meant for stand-alone tools that people can run on the command line,
        whose output can be piped for further processing if desired. If so, it would
        be better not to use logging for the tool's output and to simply use println.
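
        As a sketch of that convention (the object name and output format below are made up, not the tool in this patch), the verdict goes to stdout via println so it can be piped, while debugging detail stays on a log4j logger:

          // Illustrative command-line tool: verdict on stdout, debug detail on log4j.
          import org.apache.log4j.Logger

          object RebalanceCheckSketch {
            private val logger = Logger.getLogger(getClass)

            def main(args: Array[String]): Unit = {
              val group = if (args.nonEmpty) args(0) else "test-group"
              logger.debug("checking partition ownership in ZK for group " + group) // debugging info only
              val rebalanceSucceeded = true // placeholder for the real ZooKeeper ownership check
              // The verdict itself is plain stdout, so system tests can grep or pipe it.
              if (rebalanceSucceeded)
                println("Rebalance for group " + group + " completed successfully")
              else
                println("Rebalance for group " + group + " failed")
            }
          }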

        Neha Narkhede added a comment -

        You have a good point about the tools package. This tool is meant for use in the system tests (KAFKA-227) to verify the correctness of Kafka. So, at the very least, the output saying whether the rebalancing attempt was successful can go through println, and perhaps the other information useful for debugging can stay on log4j? If that makes sense, I'll make that change before committing this patch.

        Thanks for reviewing this patch and catching the possible duplication issue in v2.

        Neha Narkhede added a comment -

        Committed this patch to trunk. Will fix KAFKA-262 separately.

        Neha Narkhede added a comment -

        I found a bug in the v3 patch: the reflectPartitionOwnershipDecision API doesn't set the value of partitionOwnershipSuccessful correctly. Will fix this as part of KAFKA-262.

        Jun Rao added a comment -

        Some comments:
        1. In ZookeeperConsumerConnector.reflectPartitionOwnershipDecision, the following code seems incorrect:
             val success = partitionOwnershipSuccessful.foldLeft(0)((sum, decision) => if(decision) 0 else 1)
           The function in foldLeft should check both sum and decision. Also, the local variable success should be renamed to something like hasFailure.
        2. In ZookeeperConsumerConnector.syncedRebalance, done = false in the catch clause is not necessary. If we hit an exception, done will be left with its initial value, which is false. The else after the if (done) { return } is also not necessary. Also, it seems there is no need to call commitOffsets before closeFetchersForQueues, since the latter commits offsets already.
        3. It seems that we don't need to check the ownership registry in ZK in processPartition. The same check will be done later in reflectPartitionOwnershipDecision.
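
        For reference, a hedged sketch of how the fold in point 1 could be corrected so that a single failed ownership decision is not lost; partitionOwnershipSuccessful is taken from the comment above as a sequence of per-partition booleans, and everything else is illustrative:

          // Sketch of the corrected check; each element of partitionOwnershipSuccessful says
          // whether one partition was successfully owned.
          object OwnershipFoldSketch {
            def hasOwnershipFailure(partitionOwnershipSuccessful: Seq[Boolean]): Boolean = {
              // Accumulate the failure count instead of discarding the accumulator, which is
              // what made the original fold reflect only the last decision.
              val failureCount = partitionOwnershipSuccessful.foldLeft(0) {
                (sum, decision) => if (decision) sum else sum + 1
              }
              // Equivalent and simpler: partitionOwnershipSuccessful.exists(success => !success)
              failureCount > 0
            }

            def main(args: Array[String]): Unit = {
              println(hasOwnershipFailure(List(true, false, true))) // true: one partition was not owned
              println(hasOwnershipFailure(List(true, true)))        // false: all partitions owned
            }
          }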

        Neha Narkhede added a comment -

        1. That is the bug I was referring to in my previous comment.

        2. done = false in the catch clause is there to prevent a bug in case the code elsewhere in the rebalance API changes in the future. These bugs are very hard to spot and time-consuming to debug, and this one-liner seems harmless since it could potentially save a lot of time.
        That said, commitOffsets() can be skipped before closing the fetchers.

        3. This is also a good point; the extra check seems like an over-optimization. Will get rid of it as part of KAFKA-262.
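
        As a small sketch of the defensive pattern discussed in point 2 (the loop shape and names are illustrative, not the actual syncedRebalance): the assignment in the catch clause is redundant today, but keeps the loop retrying even if later changes set done before the call that can throw.

          // Illustrative retry loop for the 'done = false' discussion above.
          object RetryLoopSketch {
            // Fails transiently on the first two attempts, then succeeds.
            def riskyRebalance(attempt: Int): Boolean =
              if (attempt < 2) throw new RuntimeException("transient failure") else true

            def rebalanceWithRetries(maxRetries: Int): Boolean = {
              var attempt = 0
              var done = false
              while (!done && attempt < maxRetries) {
                try {
                  done = riskyRebalance(attempt)
                } catch {
                  case _: RuntimeException =>
                    done = false // defensive: guarantees a retry even if the code above changes later
                }
                attempt += 1
              }
              done
            }

            def main(args: Array[String]): Unit =
              println(rebalanceWithRetries(5)) // true, after two transient failures
          }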


          People

          • Assignee: Neha Narkhede
          • Reporter: Neha Narkhede
          • Votes: 0
          • Watchers: 0

            Dates

            • Created:
            • Updated:
            • Resolved:
