  Kafka / KAFKA-956

High-level consumer fails to check topic metadata response for errors

    Details

    • Type: Bug
    • Status: Closed
    • Priority: Blocker
    • Resolution: Fixed
    • Affects Version/s: 0.8.0
    • Fix Version/s: 0.8.0
    • Component/s: consumer
    • Labels: None

      Description

      In our environment we noticed that consumers would sometimes hang when started too soon after the Kafka server was started. I tracked this down and it seems to be related to some code in rebalance (ZookeeperConsumerConnector.scala). In particular, the following code seems problematic:

      val topicsMetadata = ClientUtils.fetchTopicMetadata(myTopicThreadIdsMap.keySet,
                                                          brokers,
                                                          config.clientId,
                                                          config.socketTimeoutMs,
                                                          correlationId.getAndIncrement).topicsMetadata
      val partitionsPerTopicMap = new mutable.HashMap[String, Seq[Int]]
      topicsMetadata.foreach(m => {
        val topic = m.topic
        val partitions = m.partitionsMetadata.map(m1 => m1.partitionId)
        partitionsPerTopicMap.put(topic, partitions)
      })

      The response is never checked for errors, so it may not actually contain any partition info! Rebalance goes its merry way, but doesn't know about any partitions, so it never assigns them...
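
One possible shape for the missing check, as a minimal sketch rather than the actual fix: the case classes below are hypothetical stand-ins for the Kafka metadata types so that the example compiles on its own, and the `errorCode` field, the `NoError` value of 0, and the `partitionsPerTopic` helper are assumptions made for illustration. The idea is to refuse to build the partition map when any topic reports an error, so the caller can retry the rebalance instead of proceeding with no partitions.

```scala
// Hypothetical stand-ins for the Kafka metadata types (assumed shape, not the real classes).
final case class PartitionMetadata(partitionId: Int)
final case class TopicMetadata(topic: String,
                               partitionsMetadata: Seq[PartitionMetadata],
                               errorCode: Short)

object MetadataCheck {
  // Assumed "no error" code for this sketch.
  val NoError: Short = 0

  // Returns Right(topic -> partition ids) only if every topic came back without an error.
  // Otherwise returns Left with the names of the topics whose metadata reported an error,
  // so the caller can back off and retry instead of rebalancing with missing partitions.
  def partitionsPerTopic(topicsMetadata: Seq[TopicMetadata]): Either[Seq[String], Map[String, Seq[Int]]] = {
    val failed = topicsMetadata.filter(_.errorCode != NoError).map(_.topic)
    if (failed.nonEmpty) Left(failed)
    else Right(topicsMetadata.map(m => m.topic -> m.partitionsMetadata.map(_.partitionId)).toMap)
  }
}
```

With something like this, rebalance could treat a `Left` as a failed attempt and go through its usual retry path, rather than silently assigning nothing.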

            People

            • Assignee: Neha Narkhede (nehanarkhede)
            • Reporter: Sam Meder (smeder)
            • Votes: 0
            • Watchers: 4
