Index: core/src/main/scala/kafka/consumer/ConsumerConfig.scala =================================================================== --- core/src/main/scala/kafka/consumer/ConsumerConfig.scala (revision 1205689) +++ core/src/main/scala/kafka/consumer/ConsumerConfig.scala (working copy) @@ -30,6 +30,7 @@ val AutoCommit = true val AutoCommitInterval = 10 * 1000 val MaxQueuedChunks = 100 + val MaxRebalanceRetries = 4 val AutoOffsetReset = OffsetRequest.SmallestTimeString val ConsumerTimeoutMs = -1 val MirrorTopicsWhitelist = "" @@ -77,6 +78,9 @@ /** max number of messages buffered for consumption */ val maxQueuedChunks = Utils.getInt(props, "queuedchunks.max", MaxQueuedChunks) + /** max number of retries during rebalance */ + val maxRebalanceRetries = Utils.getInt(props, "rebalance,retries.max", MaxRebalanceRetries) + /* what to do if an offset is out of range. smallest : automatically reset the offset to the smallest offset largest : automatically reset the offset to the largest offset Index: core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala =================================================================== --- core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala (revision 1205689) +++ core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala (working copy) @@ -67,7 +67,6 @@ * */ private[kafka] object ZookeeperConsumerConnector { - val MAX_N_RETRIES = 4 val shutdownCommand: FetchedDataChunk = new FetchedDataChunk(null, null, -1L) } @@ -424,7 +423,7 @@ def syncedRebalance() { rebalanceLock synchronized { - for (i <- 0 until ZookeeperConsumerConnector.MAX_N_RETRIES) { + for (i <- 0 until config.maxRebalanceRetries) { info("begin rebalancing consumer " + consumerIdString + " try #" + i) var done = false try { @@ -447,7 +446,7 @@ } } - throw new RuntimeException(consumerIdString + " can't rebalance after " + ZookeeperConsumerConnector.MAX_N_RETRIES +" retires") + throw new RuntimeException(consumerIdString + " can't rebalance after " + config.maxRebalanceRetries +" retires") } private def rebalance(): Boolean = {