From 8cf9728f5f00d15cf997d76bbddde9d1a25e3274 Mon Sep 17 00:00:00 2001 From: Sam Meder Date: Fri, 16 Aug 2013 18:26:52 +0200 Subject: [PATCH] KAFKA-1010 Fix concurrency issue in getCluster() by moving it into retry loop --- core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala | 3 ++- core/src/main/scala/kafka/tools/ImportZkOffsets.scala | 3 --- 2 files changed, 2 insertions(+), 4 deletions(-) diff --git a/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala b/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala index 17977e7..c2b9b9a 100644 --- a/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala +++ b/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala @@ -399,8 +399,9 @@ private[kafka] class ZookeeperConsumerConnector(val config: ConsumerConfig, for (i <- 0 until config.rebalanceMaxRetries) { info("begin rebalancing consumer " + consumerIdString + " try #" + i) var done = false - val cluster = getCluster(zkClient) + var cluster: Cluster = null try { + cluster = getCluster(zkClient) done = rebalance(cluster) } catch { case e => diff --git a/core/src/main/scala/kafka/tools/ImportZkOffsets.scala b/core/src/main/scala/kafka/tools/ImportZkOffsets.scala index 63519e1..55709b5 100644 --- a/core/src/main/scala/kafka/tools/ImportZkOffsets.scala +++ b/core/src/main/scala/kafka/tools/ImportZkOffsets.scala @@ -96,9 +96,6 @@ object ImportZkOffsets extends Logging { } private def updateZkOffsets(zkClient: ZkClient, partitionOffsets: Map[String,String]): Unit = { - val cluster = ZkUtils.getCluster(zkClient) - var partitions: List[String] = Nil - for ((partition, offset) <- partitionOffsets) { debug("updating [" + partition + "] with offset [" + offset + "]") -- 1.8.3.1