diff --git a/core/src/main/scala/kafka/producer/async/DefaultEventHandler.scala b/core/src/main/scala/kafka/producer/async/DefaultEventHandler.scala index 9a4e4bc..74c1b75 100644 --- a/core/src/main/scala/kafka/producer/async/DefaultEventHandler.scala +++ b/core/src/main/scala/kafka/producer/async/DefaultEventHandler.scala @@ -146,8 +146,7 @@ class DefaultEventHandler[K,V](config: ProducerConfig, try { for (message <- messages) { val topicPartitionsList = getPartitionListForTopic(message) - val partitionIndex = getPartition(message.key, topicPartitionsList) - val brokerPartition = topicPartitionsList(partitionIndex) + val brokerPartition = getPartition(message.key, topicPartitionsList) // postpone the failure until the send operation, so that requests for other brokers are handled correctly val leaderBrokerId = brokerPartition.leaderBrokerIdOpt.getOrElse(-1) @@ -197,26 +196,26 @@ class DefaultEventHandler[K,V](config: ProducerConfig, * @param topicPartitionList the list of available partitions * @return the partition id */ - private def getPartition(key: K, topicPartitionList: Seq[PartitionAndLeader]): Int = { + private def getPartition(key: K, topicPartitionList: Seq[PartitionAndLeader]): PartitionAndLeader = { val numPartitions = topicPartitionList.size if(numPartitions <= 0) throw new UnknownTopicOrPartitionException("Invalid number of partitions: " + numPartitions + "\n Valid values are > 0") - val partition = - if(key == null) { - // If the key is null, we don't really need a partitioner so we just send to the next - // available partition - val availablePartitions = topicPartitionList.filter(_.leaderBrokerIdOpt.isDefined) - if (availablePartitions.isEmpty) - throw new LeaderNotAvailableException("No leader for any partition") - val index = Utils.abs(partitionCounter.getAndIncrement()) % availablePartitions.size - availablePartitions(index).partitionId - } else - partitioner.partition(key, numPartitions) - if(partition < 0 || partition >= numPartitions) - throw new UnknownTopicOrPartitionException("Invalid partition id : " + partition + - "\n Valid values are in the range inclusive [0, " + (numPartitions-1) + "]") - partition + if(key == null) { + // If the key is null, we don't really need a partitioner so we just send to the next + // available partition + val availablePartitions = topicPartitionList.filter(_.leaderBrokerIdOpt.isDefined) + if (availablePartitions.isEmpty) + throw new LeaderNotAvailableException("No leader for any partition") + val index = Utils.abs(partitionCounter.getAndIncrement()) % availablePartitions.size + availablePartitions(index) + } else { + val partitionIndex = partitioner.partition(key, numPartitions) + if(partitionIndex < 0 || partitionIndex >= numPartitions) + throw new UnknownTopicOrPartitionException("Invalid partition id : " + partitionIndex + + "\n Valid values are in the range inclusive [0, " + (numPartitions-1) + "]") + topicPartitionList(partitionIndex) + } } /**