Kafka
  1. Kafka
  2. KAFKA-956

High-level consumer fails to check topic metadata response for errors

    Details

    • Type: Bug Bug
    • Status: Closed
    • Priority: Blocker Blocker
    • Resolution: Fixed
    • Affects Version/s: 0.8.0
    • Fix Version/s: 0.8.0
    • Component/s: consumer
    • Labels:
      None

      Description

      In our environment we noticed that consumers would sometimes hang when started too close to starting the Kafka server. I tracked this down and it seems to be related to some code in rebalance (ZookeeperConsumerConnector.scala). In particular the following code seems problematic:

      val topicsMetadata = ClientUtils.fetchTopicMetadata(myTopicThreadIdsMap.keySet,
      brokers,
      config.clientId,
      config.socketTimeoutMs,
      correlationId.getAndIncrement).topicsMetadata
      val partitionsPerTopicMap = new mutable.HashMap[String, Seq[Int]]
      topicsMetadata.foreach(m =>

      { val topic = m.topic val partitions = m.partitionsMetadata.map(m1 => m1.partitionId) partitionsPerTopicMap.put(topic, partitions) }

      )

      The response is never checked for error, so may not actually contain any partition info! Rebalance goes its merry way, but doesn't know about any partitions so never assigns them...

        Activity

          People

          • Assignee:
            Neha Narkhede
            Reporter:
            Sam Meder
          • Votes:
            0 Vote for this issue
            Watchers:
            4 Start watching this issue

            Dates

            • Created:
              Updated:
              Resolved:

              Development