Index: core/src/main/scala/kafka/consumer/FetcherRunnable.scala =================================================================== --- core/src/main/scala/kafka/consumer/FetcherRunnable.scala (revision 1227405) +++ core/src/main/scala/kafka/consumer/FetcherRunnable.scala (working copy) @@ -95,8 +95,8 @@ trace("fetched bytes: " + read) if(read == 0) { - debug("backing off " + config.backoffIncrementMs + " ms") - Thread.sleep(config.backoffIncrementMs) + debug("backing off " + config.fetcherBackoffMs + " ms") + Thread.sleep(config.fetcherBackoffMs) } } } Index: core/src/main/scala/kafka/consumer/ConsumerConfig.scala =================================================================== --- core/src/main/scala/kafka/consumer/ConsumerConfig.scala (revision 1227405) +++ core/src/main/scala/kafka/consumer/ConsumerConfig.scala (working copy) @@ -26,7 +26,7 @@ val SocketBufferSize = 64*1024 val FetchSize = 300 * 1024 val MaxFetchSize = 10*FetchSize - val BackoffIncrementMs = 1000 + val DefaultFetcherBackoffMs = 1000 val AutoCommit = true val AutoCommitInterval = 10 * 1000 val MaxQueuedChunks = 100 @@ -67,7 +67,7 @@ /** to avoid repeatedly polling a broker node which has no new data we will backoff every time we get an empty set from the broker*/ - val backoffIncrementMs: Long = Utils.getInt(props, "backoff.increment.ms", BackoffIncrementMs) + val fetcherBackoffMs: Long = Utils.getInt(props, "fetcher.backoff.ms", DefaultFetcherBackoffMs) /** if true, periodically commit to zookeeper the offset of messages already fetched by the consumer */ val autoCommit = Utils.getBoolean(props, "autocommit.enable", AutoCommit) @@ -81,6 +81,9 @@ /** max number of retries during rebalance */ val maxRebalanceRetries = Utils.getInt(props, "rebalance.retries.max", MaxRebalanceRetries) + /** backoff time between retries during rebalance */ + val rebalanceBackoffMs = Utils.getInt(props, "rebalance.backoff.ms", zkSyncTimeMs) + /* what to do if an offset is out of range. smallest : automatically reset the offset to the smallest offset largest : automatically reset the offset to the largest offset Index: core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala =================================================================== --- core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala (revision 1227405) +++ core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala (working copy) @@ -442,7 +442,7 @@ // release all partitions, reset state and retry releasePartitionOwnership() resetState() - Thread.sleep(config.zkSyncTimeMs) + Thread.sleep(config.rebalanceBackoffMs) } }