diff --git a/core/src/main/scala/kafka/client/ClientUtils.scala b/core/src/main/scala/kafka/client/ClientUtils.scala index 7b3f09d..025d3ab 100644 --- a/core/src/main/scala/kafka/client/ClientUtils.scala +++ b/core/src/main/scala/kafka/client/ClientUtils.scala @@ -7,6 +7,7 @@ import kafka.producer._ import kafka.common.KafkaException import kafka.utils.{Utils, Logging} import java.util.Properties +import util.Random /** * Helper functions common to clients (producer, consumer, or admin) @@ -26,9 +27,12 @@ object ClientUtils extends Logging{ val topicMetadataRequest = new TopicMetadataRequest(TopicMetadataRequest.CurrentVersion, correlationId, producerConfig.clientId, topics.toSeq) var topicMetadataResponse: TopicMetadataResponse = null var t: Throwable = null - while(i < brokers.size && !fetchMetaDataSucceeded) { - val producer: SyncProducer = ProducerPool.createSyncProducer(producerConfig, brokers(i)) - info("Fetching metadata with correlation id %d for %d topic(s) %s".format(correlationId, topics.size, topics)) + // shuffle the list of brokers before sending metadata requests so that most requests don't get routed to the + // same broker + val shuffledBrokers = Random.shuffle(brokers) + while(i < shuffledBrokers.size && !fetchMetaDataSucceeded) { + val producer: SyncProducer = ProducerPool.createSyncProducer(producerConfig, shuffledBrokers(i)) + info("Fetching metadata from broker %s with correlation id %d for %d topic(s) %s".format(shuffledBrokers(i), correlationId, topics.size, topics)) try { topicMetadataResponse = producer.send(topicMetadataRequest) fetchMetaDataSucceeded = true @@ -36,7 +40,7 @@ object ClientUtils extends Logging{ catch { case e => warn("Fetching topic metadata with correlation id %d for topics [%s] from broker [%s] failed" - .format(correlationId, topics, brokers(i).toString), e) + .format(correlationId, topics, shuffledBrokers(i).toString), e) t = e } finally { i = i + 1 @@ -44,7 +48,7 @@ object ClientUtils extends Logging{ } } if(!fetchMetaDataSucceeded){ - throw new KafkaException("fetching topic metadata for topics [%s] from broker [%s] failed".format(topics, brokers), t) + throw new KafkaException("fetching topic metadata for topics [%s] from broker [%s] failed".format(topics, shuffledBrokers), t) } else { debug("Successfully fetched metadata for %d topic(s) %s".format(topics.size, topics)) }