diff --git a/core/src/main/scala/kafka/producer/async/DefaultEventHandler.scala b/core/src/main/scala/kafka/producer/async/DefaultEventHandler.scala
index 4cbb52c..de8d950 100644
--- a/core/src/main/scala/kafka/producer/async/DefaultEventHandler.scala
+++ b/core/src/main/scala/kafka/producer/async/DefaultEventHandler.scala
@@ -55,7 +55,8 @@ class DefaultEventHandler[K,V](config: ProducerConfig,
       // back off and update the topic metadata cache before attempting another send operation
       Thread.sleep(config.producerRetryBackoffMs)
       // get topics of the outstanding produce requests and refresh metadata for those
-      Utils.swallowError(brokerPartitionInfo.updateInfo(outstandingProduceRequests.map(_.getTopic)))
+      val uniqueTopics = outstandingProduceRequests.map(_.getTopic).toSet.toSeq
+      Utils.swallowError(brokerPartitionInfo.updateInfo(uniqueTopics))
       remainingRetries -= 1
       ProducerStats.resendRate.mark()
     }
diff --git a/core/src/main/scala/kafka/server/KafkaApis.scala b/core/src/main/scala/kafka/server/KafkaApis.scala
index 737e6fb..1f74282 100644
--- a/core/src/main/scala/kafka/server/KafkaApis.scala
+++ b/core/src/main/scala/kafka/server/KafkaApis.scala
@@ -388,7 +388,8 @@ class KafkaApis(val requestChannel: RequestChannel,
     val topicsMetadata = new mutable.ArrayBuffer[TopicMetadata]()
     val config = replicaManager.config
-    val topicMetadataList = AdminUtils.getTopicMetaDataFromZK(metadataRequest.topics, zkClient)
-    metadataRequest.topics.zip(topicMetadataList).foreach(
+    val uniqueTopics = metadataRequest.topics.toSet.toSeq
+    val topicMetadataList = AdminUtils.getTopicMetaDataFromZK(uniqueTopics, zkClient)
+    uniqueTopics.zip(topicMetadataList).foreach(
       topicAndMetadata => {
         val topic = topicAndMetadata._1