Uploaded image for project: 'Kafka'
  1. Kafka
  2. KAFKA-956

High-level consumer fails to check topic metadata response for errors

    XMLWordPrintableJSON

Details

    • Bug
    • Status: Closed
    • Blocker
    • Resolution: Fixed
    • 0.8.0
    • 0.8.0
    • consumer
    • None

    Description

      In our environment we noticed that consumers would sometimes hang when started too close to starting the Kafka server. I tracked this down and it seems to be related to some code in rebalance (ZookeeperConsumerConnector.scala). In particular the following code seems problematic:

      val topicsMetadata = ClientUtils.fetchTopicMetadata(myTopicThreadIdsMap.keySet,
      brokers,
      config.clientId,
      config.socketTimeoutMs,
      correlationId.getAndIncrement).topicsMetadata
      val partitionsPerTopicMap = new mutable.HashMap[String, Seq[Int]]
      topicsMetadata.foreach(m =>

      { val topic = m.topic val partitions = m.partitionsMetadata.map(m1 => m1.partitionId) partitionsPerTopicMap.put(topic, partitions) }

      )

      The response is never checked for error, so may not actually contain any partition info! Rebalance goes its merry way, but doesn't know about any partitions so never assigns them...

      Attachments

        Activity

          People

            nehanarkhede Neha Narkhede
            smeder Sam Meder
            Votes:
            0 Vote for this issue
            Watchers:
            4 Start watching this issue

            Dates

              Created:
              Updated:
              Resolved: