KAFKA-286: Consumer sometimes doesn't release partition ownership properly in ZK during rebalance

    Details

    • Type: Bug
    • Status: Resolved
    • Priority: Major
    • Resolution: Fixed
    • Affects Version/s: None
    • Fix Version/s: 0.7.1
    • Component/s: core
    • Labels: None
      Attachments

    1. kafka-286_v2.patch (4 kB, Jun Rao)
    2. kafka-286.patch (3 kB, Jun Rao)

      Activity

      Jun Rao added a comment -

      Patch attached.

      Neha Narkhede added a comment -

      Wondering if the patch can get the consumer into the following state -

      Say, there are 2 consumers in a group, c1 and c2. Both are consuming topic1 with partitions 0-0, 0-1 and 1-0. Say c1 owns 0-0 and 0-1 and c2 owns 1-0.

      1. Broker 1 goes down. This triggers rebalancing attempt in c1 and c2.
      2. c1 releases partition ownership, but fails to rebalance.
      3. Meanwhile, c2 completes rebalancing successfully, and owns partition 0-1 and starts consuming data.
      4. c1 starts next rebalancing attempt and it releases partition 0-1 (since 0-1 is still part of topicRegistry). It owns partition 0-0 again, and starts consuming data.
      5. Effectively, rebalancing has completed successfully, but there is no owner for partition 0-1 registered in Zookeeper.

      I think using the topicRegistry cache is dangerous, since it has to be kept in sync with the ownership information in ZooKeeper. How about reading the ownership information from ZK along with the other data and only releasing that?
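
      A minimal sketch of that suggestion (illustration only, not either patch): it assumes a ZkClient handle, the /consumers/[group]/owners/[topic]/[partition] owner-path layout, and ZkUtils-style readDataMaybeNull/deletePath helpers; the method name is hypothetical.

      import org.I0Itec.zkclient.ZkClient
      import kafka.utils.ZkUtils

      // Hypothetical sketch: delete an owner node in ZK only if this consumer is
      // still recorded as its owner, instead of trusting the topicRegistry cache.
      def releaseOwnedPartitions(zkClient: ZkClient, group: String, consumerId: String,
                                 partitions: Seq[(String, String)]) {
        for ((topic, partition) <- partitions) {
          val ownerPath = "/consumers/" + group + "/owners/" + topic + "/" + partition
          val owner = ZkUtils.readDataMaybeNull(zkClient, ownerPath)
          if (owner != null && owner == consumerId)  // only release what we actually own in ZK
            ZkUtils.deletePath(zkClient, ownerPath)
        }
      }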

      Jun Rao added a comment -

      That's a good point. What can happen is that we may delete ZK paths that c1 didn't successfully own in step 2, if the rebalance fails. Added patch v2, which deletes all temporarily owned ZK paths in reflectPartitionOwnershipDecision if we can't own everything. I think this fix addresses the issue.
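
      Not the patch itself, just a sketch of that all-or-nothing idea (createEphemeralPathExpectConflict and deletePath are assumed ZkUtils-style helpers; the method name is illustrative):

      import org.I0Itec.zkclient.ZkClient
      import kafka.utils.ZkUtils
      import scala.collection.mutable.ListBuffer

      // Illustrative sketch: claim every owner path for this rebalance attempt; if any
      // claim fails, delete the paths claimed so far so no partial ownership is left in ZK.
      def tryClaimAllOrRollback(zkClient: ZkClient, consumerId: String,
                                ownerPaths: Seq[String]): Boolean = {
        val claimed = new ListBuffer[String]
        // forall short-circuits on the first failed claim
        val ownedAll = ownerPaths.forall { path =>
          try {
            ZkUtils.createEphemeralPathExpectConflict(zkClient, path, consumerId)
            claimed += path
            true
          } catch {
            case _: Throwable => false  // another consumer owns it, or a transient ZK error
          }
        }
        if (!ownedAll)
          claimed.foreach(p => ZkUtils.deletePath(zkClient, p))  // roll back partial claims
        ownedAll
      }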

      Neha Narkhede added a comment -

      That's a good change and will handle the majority of failure cases. There is another failure case that, I think, still needs to be fixed in the rebalancing logic -

      Say, for the above-mentioned scenario, c1 fails to rebalance due to some error/exception that exercises this code path -

      try {
        done = rebalance(cluster)
      }
      catch {
        case e =>
          /** occasionally, we may hit a ZK exception because the ZK state is changing while we are iterating.
            * For example, a ZK node can disappear between the time we get all children and the time we try to get
            * the value of a child. Just let this go since another rebalance will be triggered. **/
          info("exception during rebalance ", e)
      }

      After this, c1 only closes its fetcher queues and backs off (0-0 and 0-1 are already released), while c2 owns 0-1.
      Then, during step 4 above, c1 releases things from its topic registry again, which still contains 0-0 and 0-1. So it releases 0-1, which it does not own anymore.

      Jun Rao added a comment -

      Will this happen? It doesn't seem possible to me. In step 2, when we release 0-0 and 0-1 during rebalance, we clear topicRegistry. Since this rebalance fails, topicRegistry will not be populated. So, in step 4, there is nothing to release for c1.
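
      A simplified illustration of that point (not the actual method body; topicRegistry, group and zkClient are assumed fields of the consumer connector, and topicRegistry is assumed to map each topic to its owned partitions):

      // Releasing ownership also clears the local cache, so a failed rebalance
      // attempt leaves nothing stale to release the next time around.
      def releasePartitionOwnership() {
        for ((topic, partitions) <- topicRegistry; partition <- partitions.keys) {
          val ownerPath = "/consumers/" + group + "/owners/" + topic + "/" + partition
          ZkUtils.deletePath(zkClient, ownerPath)  // drop the owner node in ZK
        }
        topicRegistry.clear()                      // and forget the partitions locally, too
      }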

      Neha Narkhede added a comment -

      Yes, I missed the fact that after releasing the partitions, they also get deleted from the passed-in topic registry. Looks good now; we will have to be careful about keeping this topic registry cache in sync at all times, though. I like the idea of refreshing the cache from ZK during each rebalance attempt, but we can look into that later.

      +1 on v2.

      Jun Rao added a comment -

      Thanks for the review. Just committed this to trunk.


        People

        • Assignee: Jun Rao
        • Reporter: Jun Rao
        • Votes: 0
        • Watchers: 0
