KAFKA-956

High-level consumer fails to check topic metadata response for errors

    Details

    • Type: Bug
    • Status: Closed
    • Priority: Blocker
    • Resolution: Fixed
    • Affects Version/s: 0.8.0
    • Fix Version/s: 0.8.0
    • Component/s: consumer
    • Labels: None

      Description

      In our environment we noticed that consumers would sometimes hang when started too soon after the Kafka server. I tracked this down to some code in the rebalance logic (ZookeeperConsumerConnector.scala). In particular, the following code seems problematic:

      val topicsMetadata = ClientUtils.fetchTopicMetadata(myTopicThreadIdsMap.keySet,
                                                          brokers,
                                                          config.clientId,
                                                          config.socketTimeoutMs,
                                                          correlationId.getAndIncrement).topicsMetadata
      val partitionsPerTopicMap = new mutable.HashMap[String, Seq[Int]]
      topicsMetadata.foreach(m => {
        val topic = m.topic
        val partitions = m.partitionsMetadata.map(m1 => m1.partitionId)
        partitionsPerTopicMap.put(topic, partitions)
      })

      The response is never checked for errors, so it may not actually contain any partition info! Rebalance goes on its merry way, but doesn't know about any partitions, so it never assigns them...
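      For illustration, a minimal sketch of the missing check, assuming the 0.8-era TopicMetadata.errorCode field and the kafka.common.ErrorMapping constants; the fix actually committed for this issue may differ:

      import kafka.common.ErrorMapping

      topicsMetadata.foreach(m => {
        if (m.errorCode != ErrorMapping.NoError) {
          // The broker reported a problem for this topic (e.g. leader not
          // available), so the partition list cannot be trusted; surface the
          // error instead of silently recording nothing.
          warn("Metadata for topic %s returned error code %d".format(m.topic, m.errorCode))
        }
        partitionsPerTopicMap.put(m.topic, m.partitionsMetadata.map(_.partitionId))
      })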

        Activity

        Sam Meder added a comment -

        I can confirm that this issue was fixed by KAFKA-1030

        Neha Narkhede added a comment -

        Sam Meder, KAFKA-1030 is now checked in, and the problem reported in this JIRA should be fixed. Can you confirm that and close this JIRA?

        Jun Rao added a comment -

        This can probably be fixed as part of KAFKA-1030.

        Neha Narkhede added a comment -

        The root cause of this issue is that a broker that is not really ready to serve requests ends up serving requests and misleading clients. When a broker starts up, it expects to receive an UpdateMetadata request from the controller. Until this happens, the broker should explicitly return a BrokerNotReady error code. Upon receiving this error code, the client should try to send the request to another broker. In the specific example of rebalance, the consumer would get the BrokerNotReady error code and would try fetching metadata from all the brokers at least once before giving up.

        A similar problem exists on the producer side. If you rolling-bounce a Kafka cluster while several thousand producer clients are connected to it, and auto-creation of topics is turned on, it creates a storm of topic metadata requests that turn into create-topic requests to the brokers. The brokers spend a lot of time trying to create topics because they don't yet know that the topics already exist.

        You could argue that a broker that is not ready should not accept connections, and probably should not even start the socket server until it is ready to serve requests. But since the broker currently uses the same socket server to communicate with the controller, this is not an easy fix to put into 0.8.
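        To make the proposed client-side behavior concrete, a hypothetical sketch; BrokerNotReady is the error code proposed in this comment and does not exist in the 0.8 protocol, and none of the names below come from the Kafka codebase:

        // Illustrative control flow only: try each broker in turn and accept
        // the first metadata response that does not signal "broker not ready".
        case class TopicMetadata(topic: String, errorCode: Short)

        val BrokerNotReady: Short = 100 // proposed code, not part of 0.8

        def fetchFromBroker(broker: String, topics: Set[String]): Seq[TopicMetadata] =
          sys.error("stand-in for a real TopicMetadataRequest to " + broker)

        def fetchMetadataWithRetry(brokers: Seq[String], topics: Set[String]): Seq[TopicMetadata] = {
          for (broker <- brokers) {
            try {
              val metadata = fetchFromBroker(broker, topics)
              // Only accept the response if no topic came back as BrokerNotReady;
              // otherwise this broker has not seen UpdateMetadata yet.
              if (metadata.forall(_.errorCode != BrokerNotReady))
                return metadata
            } catch {
              case _: Exception => // socket-level failure: try the next broker
            }
          }
          throw new RuntimeException("no broker returned usable metadata for " + topics.mkString(","))
        }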

        Sam Meder added a comment -

        I don't think it has anything to do with topic creation; for us the problem occurs with existing topics as well. It occurs when the consumer is started too close to the Kafka server's startup...

        Jun Rao added a comment -

        This problem seems to only occur if a consumer is started at the same time when the topic is created. Do you have lots of dynamically created topics? A workaround could be just creating those topics before the consumers are started.

        Sam Meder added a comment -

        I guess we can carry my patch locally for now (we don't use more than one topic per consumer right now), but it doesn't seem great to have a corner case that can basically deadlock the consumer. I did look into whether it would be possible to scope rebalancing to just a subset of the topics for the consumer, but it looks like that would require quite a bit of detangling of the listener from the zookeeper consumer class. Not something I would put in 0.8 right now, but is that something your intern would tackle as part of the offset work? Seems worthwhile...

        Jun Rao added a comment -

        One possible solution is to let the consumer read the partition data from ZK directly. This way, if a consumer finds out that a topic doesn't exist, a ZK watcher is guaranteed to be triggered when the topic is created later. The only problem is that if there are many topics, reading them one at a time from ZK can be slow. ZK 3.4.x has multi-API support, and we do plan to upgrade to that version. Perhaps we can revisit this issue at that point?
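        For illustration, a minimal sketch of that approach using the ZkClient library that 0.8 already depends on; the paths follow the 0.8 /brokers/topics layout, but the rebalance hook is a stand-in:

        import org.I0Itec.zkclient.{IZkChildListener, ZkClient}
        import scala.collection.JavaConverters._

        val zkClient = new ZkClient("localhost:2181", 6000, 6000)

        // Read the partition ids for a topic straight from ZooKeeper.
        def partitionsForTopic(topic: String): Seq[Int] = {
          val path = "/brokers/topics/" + topic + "/partitions"
          if (zkClient.exists(path))
            zkClient.getChildren(path).asScala.map(_.toInt)
          else
            Seq.empty // topic not created yet; the watcher below fires later
        }

        // When topics appear or disappear under /brokers/topics, a child
        // watcher fires; that is where a rebalance would be triggered.
        zkClient.subscribeChildChanges("/brokers/topics", new IZkChildListener {
          override def handleChildChange(parent: String, children: java.util.List[String]): Unit =
            println("topics changed under " + parent + "; would trigger rebalance")
        })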

        Sam Meder added a comment -

        Seems like we need to allow for rebalancing only a subset of topics and keep trying on the rest?

        Jun Rao added a comment -

        Got it. The weird part is that if a topic exists, why would the metadata request return an empty list of partitions (with the LeaderNotAvailableCode)? One case I can think of is that the metadata may not have been propagated to all brokers yet. Since metadata propagation is generally quick, if we retry rebalance (which will happen with your patch), chances are that we will pick up the updated metadata.

        However, there is an issue. If a topic doesn't exist (and auto topic creation is turned off), throwing a KafkaException when there is no partition will prevent the consumer from starting. Instead, we should let rebalance proceed. When the topic is created in the future, a ZK watcher will be fired and a rebalance will be triggered. I am not sure what the best way to deal with both cases is. Need to think about this a bit more.

        Sam Meder added a comment -

        Basically the scenario is (roughly):

        1. Start consumer(s)
        2. Start Kafka server with pre-existing topic

        Consumer notices the server is up, but gets an error response on the topic metadata, with an error code mapping to the LeaderNotAvailable exception (this is the error on the topic metadata response, not on the individual partition metadata items).

        Sam Meder added a comment -

        Right, but the topic already exists at the time of failure, so no watcher is going to fire. The error I am seeing in this case indicates that no leaders exist across any of the partitions.

        Jun Rao added a comment -

        We register a ZK watcher on the subscribed topics. If a topic is created subsequently, a ZK watcher will be fired and we will call the rebalance logic to pick up the new partitions.

        Sam Meder added a comment -

        Regarding 1. - how does the consumer recover from having no partition information for a topic? That seems to me the crux of the problem, which is why I was throwing an exception.

        Jun Rao added a comment -

        Thanks for the patch. A couple of comments:

        1. When there is an error in the metadata response, we shouldn't throw an exception. A consumer may be consuming multiple topics, and we should let topics with no error in the metadata response proceed. So, instead, we could add the topic and an empty sequence to partitionsPerTopicMap. The rest of the logic in rebalance seems to be able to handle the case when there is no partition in a topic. See the sketch after this list.

        2. In the following statement, we should probably log that the string is for a metadata response.
        debug(m.string)
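        For illustration, a sketch of what suggestion 1 (plus the labeled debug line from suggestion 2) could look like inside the rebalance code; this illustrates the review comment, not the committed patch, and assumes the 0.8-era ErrorMapping constants:

        topicsMetadata.foreach(m => {
          // Suggestion 2: make it clear what is being logged.
          debug("Topic metadata response: " + m.toString)
          if (m.errorCode != ErrorMapping.NoError) {
            // Suggestion 1: don't throw. Record an empty partition list so
            // other topics in this rebalance can proceed; a later ZK-triggered
            // rebalance can pick this topic up once it has partitions.
            warn("Metadata for topic %s had error code %d; proceeding with no partitions"
              .format(m.topic, m.errorCode))
            partitionsPerTopicMap.put(m.topic, Seq.empty)
          } else {
            partitionsPerTopicMap.put(m.topic, m.partitionsMetadata.map(_.partitionId))
          }
        })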

        Sam Meder added a comment -

        Patch to throw an exception if no partition info is available.


          People

          • Assignee: Neha Narkhede
          • Reporter: Sam Meder
          • Votes: 0
          • Watchers: 4
