Kafka / KAFKA-984

Avoid a full rebalance when a new topic is discovered but the consumer/broker set stays the same

    Details

    • Type: Bug
    • Status: Patch Available
    • Priority: Major
    • Resolution: Unresolved
    • Affects Version/s: None
    • Fix Version/s: 0.9.0
    • Component/s: None
    • Labels: None

      Description

      Currently, a full rebalance is triggered on high-level consumers even when just a new topic is added to ZK. It would be better to avoid this and rebalance only for the newly added topic.

      Attachments

      1. KAFKA-984.v1.patch
        8 kB
        Guozhang Wang
      2. KAFKA-984.v2.patch
        14 kB
        Guozhang Wang
      3. KAFKA-984.v2.patch
        12 kB
        Guozhang Wang

        Activity

        Neha Narkhede made changes -
        Fix Version/s 0.9.0 [ 12323928 ]
        Fix Version/s 0.8.1 [ 12322960 ]
        Jun Rao made changes -
        Fix Version/s 0.8.1 [ 12322960 ]
        Fix Version/s 0.8 [ 12317244 ]
        Jun Rao added a comment -

        Since the changes are relatively large and subtle, we should probably revisit this post 0.8. Moving it to 0.8.1.

        Sriram Subramanian added a comment -

        Agreed. I share the same thought and have pushed back on this change. We can work around this for now by partitioning the topics between the different instances.

        Jay Kreps added a comment -

        Guys, I don't think we should be doing massive surgery on 0.8.

        Joel Koshy added a comment -

        Thanks for the patch - this will help a lot, especially for mirroring. However, I share Jun's concern about making such a non-trivial change to 0.8. In any event, here are some comments on scala.kafka.consumer.ZookeeperConsumerConnector:

        • We should definitely abstract out the common code in syncedPartialRebalance and WildcardStreamsHandler. I think with some thought we can refactor it; otherwise we end up with copies of relatively complex code.
        • The filters on lines 432/433 will not have any effect (I think) since the maps are immutable. You should probably apply the filter to the assignments on lines 428/429. As it stands, metadata for other topics will be fetched unnecessarily, and fetchers for other topics may be stopped unnecessarily (see the sketch after this list).
        • Also, there are topic variables inside the method that shadow the parameter, which makes it harder to determine which variable is in effect in which scope.
        • Logging can be improved/made more concise: there are a few typos and inconsistencies in capitalization.
        • Why do this only if the number of added topics == 1? It could accept a list of topics to rebalance instead, right? I do see your note on Sriram's comments, but I don't see those comments in this JIRA. Can you include them?
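
        As a minimal illustration of the immutable-map point above (hypothetical data and names, not the patch code): filter on an immutable Scala Map returns a new map, so calling it without capturing the result changes nothing.

            object ImmutableFilterExample extends App {
              val assignments = Map("topicA" -> Set("thread-1"), "topicB" -> Set("thread-2"))

              assignments.filter { case (topic, _) => topic == "topicA" }  // result discarded: no effect
              println(assignments.size)  // still 2, both topics remain

              // The filtered view has to be captured and used downstream instead.
              val relevant = assignments.filter { case (topic, _) => topic == "topicA" }
              println(relevant.keys.mkString(", "))  // prints: topicA
            }
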
        Jun Rao added a comment -

        Thanks for patch v2. Some comments:

        20. It seems that we should be able to deal with the addition of multiple topics, not just a single one.

        21. There is code in handleAddNewTopicForFilteredStreams very similar to what's in reinitializeConsumer (the part that generates the (topic,threadId)->queue mapping). It would be good if we can avoid the duplication.

        22. rebalance() currently closes all fetchers at the beginning. For syncedPartialRebalance() to work, we have to prevent closing the fetchers for existing partitions (see the sketch after this comment).

        The fix seems bigger than I expected. So, I am not sure if we should fix it in 0.8 or trunk. Could you make another pass? We can make the call then. Finally, in the future, it would be clearer to upload new patches with a different version. If a version needs to be replaced, you can delete the old one.
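
        A minimal, hypothetical sketch of the constraint in point 22; the names and types are illustrative stand-ins, not the actual patch code. The idea is to hand the fetcher-shutdown step only the topics being rebalanced, so fetchers for all other, already-owned topics keep running.

            object PartialCloseSketch {
              // Simplified stand-in for relevantTopicThreadIdsMap: topic -> consumer thread ids.
              type TopicThreadIds = Map[String, Set[String]]

              // Illustrative stub for closeFetchers: stops fetchers only for the topics it is given.
              private def closeFetchers(relevant: TopicThreadIds): Unit =
                relevant.keys.foreach(topic => println(s"stopping fetchers for $topic"))

              // During a partial rebalance, restrict the map to the topics being rebalanced
              // so that fetchers for every other topic are left untouched.
              def closeFetchersForTopics(all: TopicThreadIds, topics: Set[String]): Unit =
                closeFetchers(all.filter { case (topic, _) => topics.contains(topic) })
            }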

        Guozhang Wang made changes -
        Attachment KAFKA-984.v2.patch [ 12594079 ]
        Guozhang Wang added a comment -

        Following Swapnil's comments, I collapsed rebalance and rebalancePartial to reduce code duplication.

        Swapnil Ghike added a comment -

        Hey Guozhang, this looks like a reasonable approach. But we should probably do some code de-dup on this; let's talk tomorrow in person.

        Guozhang Wang made changes -
        Attachment KAFKA-984.v2.patch [ 12593814 ]
        Guozhang Wang added a comment -

        Added one more unit test in ZookeeperConsumerConnectorTest for wildcard consumer.

        Guozhang Wang made changes -
        Attachment KAFKA-984.v1.patch [ 12593739 ]
        Guozhang Wang made changes -
        Field Original Value New Value
        Status Open [ 1 ] Patch Available [ 10002 ]
        Guozhang Wang added a comment -

        Following Sriram's comments, I am proposing a simpler solution for the special case where the event handler reports only adding ONE new topic.

        1. Add a new function handleAddNewTopicForFilteredStreams, which updates topicThreadIdAndQueues and topicStreamsMap and directly calls syncedPartialRebalance(topic: String) of ZKRebalancerListener.

        2. syncedPartialRebalance will use the same rebalanceLock as syncedRebalance.

        2.1 For the added topic, first check whether its ownership has already been claimed in ZK. If it is already claimed (i.e., ZK has a znode for this topic under owners), release those partitions first.

        2.2 Read the number of consumers in the group from ZK (we only do this once for all topics) and the number of partitions of this topic, then assign the partitions to consumers using the same deterministic algorithm.

        2.3 Try writing to ZK for the added topic. If this succeeds, update the fetchers (start new threads) and return; otherwise also return, since a syncedRebalance will happen next (a sketch of this flow follows the comment).

        Considerations (to keep things simple when a topic change and a consumer/broker change happen at the same time):

        1. At the beginning of syncedPartialRebalance, check whether isWatcherTriggered is true; if so, return directly.
        2. During syncedPartialRebalance, if the new topic has already been claimed (i.e., there are already some znodes under owners), call syncedRebalance directly.

        The correctness of this is based on:

        1. If syncedRebalance is triggered after the syncedPartialRebalance, it will see the new topic.
        2. If syncedRebalance is triggered before the syncedPartialRebalance and does not see this new topic, a later syncedRebalance can still successfully do the rebalance.
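
        A minimal, hypothetical sketch of the single-topic flow described in this comment. All helper names (ownershipClaimedInZk, releasePartitionOwnership, computeAssignment, tryClaimPartitionsInZk, updateFetchers) are illustrative stubs, not the actual patch code.

            object SyncedPartialRebalanceSketch {
              private val rebalanceLock = new Object
              @volatile private var isWatcherTriggered = false

              // Illustrative stubs standing in for the real ZK and fetcher interactions.
              private def ownershipClaimedInZk(topic: String): Boolean = false
              private def releasePartitionOwnership(topic: String): Unit = ()
              private def computeAssignment(topic: String): Map[Int, String] =
                Map(0 -> "consumer-1", 1 -> "consumer-2")  // partition -> consumer thread
              private def tryClaimPartitionsInZk(assignment: Map[Int, String]): Boolean = true
              private def updateFetchers(assignment: Map[Int, String]): Unit = ()

              def syncedPartialRebalance(topic: String): Unit = rebalanceLock.synchronized {
                // If a full rebalance is already pending, let it pick up the new topic (consideration 1).
                if (!isWatcherTriggered) {
                  // 2.1: if ownership of this topic is already claimed in ZK, release it first.
                  if (ownershipClaimedInZk(topic)) releasePartitionOwnership(topic)

                  // 2.2: assign the topic's partitions to the group's consumers deterministically.
                  val assignment = computeAssignment(topic)

                  // 2.3: try to claim the partitions in ZK; on success start the new fetchers,
                  // on failure just fall through, since a full syncedRebalance will happen next.
                  if (tryClaimPartitionsInZk(assignment)) updateFetchers(assignment)
                }
              }
            }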

        Guozhang Wang added a comment -

        Approach:

        1. In ZookeeperConsumerConnector.reinitializeConsumer, add three additional arguments:

        partial: Boolean

        addedTopics: Seq[String]

        deletedTopics: Seq[String]

        2. In WildcardStreamsHandler.handleTopicEvent, call reinitializeConsumer with partial = true and with addedTopics and deletedTopics set.

        3. In ZookeeperConsumerConnector.reinitializeConsumer, if partial == true, branch out of the code starting from

            // map of {topic -> Set(thread-1, thread-2, ...)}
            val consumerThreadIdsPerTopic: Map[String, Set[String]] =
              topicCount.getConsumerThreadIdsPerTopic

        to

            val groupedByTopic = threadQueueStreamPairs.groupBy(_._1._1)
            groupedByTopic.foreach(e => {
              val topic = e._1
              val streams = e._2.map(_._2._2).toList
              topicStreamsMap += (topic -> streams)
              debug("adding topic %s and %d streams to map.".format(topic, streams.size))
            })

        and instead just update topicThreadIdAndQueues and topicStreamsMap (see the sketch after this comment).

        • Note that we currently do not handle deleted topics, and this issue will not be fixed in this JIRA.

        4. Add another function syncedPartialRebalance in ZKRebalancerListener, and make reinitializeConsumer call this function if partial == true. syncedPartialRebalance will use the same rebalanceLock as syncedRebalance.

        5. ZKRebalancerListener keeps an in-memory list of the topics it is currently consuming from.

        6. In syncedPartialRebalance, which calls rebalanceForTopic, first check whether there are any topic changes by reading ZK and comparing with the in-memory list. If no new or deleted topics are found, return directly.

        6.1 For each deleted topic, simply call closeFetchers, and delete the ownership/offsets of the topic in ZK.

        6.2 For each added topic, read the number of consumers in the group from ZK (we only do this once for all topics) and the number of partitions of the topic, then assign the partitions to consumers using the same deterministic algorithm.

        6.3 Try writing to ZK for all added topics. If this succeeds, update the fetchers (start new threads) and return true; otherwise return false.

        Considerations:

        1. If a topic change and a consumer/broker change happen at the same time, two consumers could trigger the corresponding syncedRebalance and syncedPartialRebalance in different orders. In this case we would prefer to make syncedPartialRebalance fail fast and have everyone enter the syncedRebalance phase. So one possible optimization is to check isWatcherTriggered at the beginning of syncedPartialRebalance; if it is set, return false directly. Also, do not retry in syncedPartialRebalance.

        2. Stopping fetchers for only certain topics, i.e., sending a partial threadIdMap to closeFetchers(cluster: Cluster, messageStreams: Map[String, List[KafkaStream[_,_]]], relevantTopicThreadIdsMap: Map[String, Set[String]]), has not been done before. It is not clear whether this works well.
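
        A minimal, hypothetical sketch of the partial path through reinitializeConsumer described in this comment; the types and helpers are simplified placeholders, not the real ZookeeperConsumerConnector code.

            object ReinitializeConsumerSketch {
              // Simplified stand-in for topicStreamsMap: topic -> streams (represented as strings here).
              private var topicStreamsMap = Map.empty[String, List[String]]

              // Illustrative stubs for the full and partial rebalance entry points.
              private def syncedRebalance(): Unit = ()
              private def syncedPartialRebalance(topics: Seq[String]): Unit = ()

              def reinitializeConsumer(partial: Boolean,
                                       addedTopics: Seq[String],
                                       deletedTopics: Seq[String]): Unit = {
                if (partial) {
                  // Register streams only for the newly added topics, then run a partial
                  // rebalance restricted to those topics. Deleted topics are intentionally
                  // not handled here, per the note above.
                  addedTopics.foreach(topic => topicStreamsMap += (topic -> List("stream-" + topic)))
                  syncedPartialRebalance(addedTopics)
                } else {
                  // Existing behavior: rebuild all the mappings and trigger a full rebalance.
                  syncedRebalance()
                }
              }
            }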

        Guozhang Wang created issue -

          People

          • Assignee: Guozhang Wang
          • Reporter: Guozhang Wang
          • Votes: 0
          • Watchers: 6
