diff --git a/core/src/main/scala/kafka/producer/BrokerPartitionInfo.scala b/core/src/main/scala/kafka/producer/BrokerPartitionInfo.scala index 82e6e4d..aad91c3 100644 --- a/core/src/main/scala/kafka/producer/BrokerPartitionInfo.scala +++ b/core/src/main/scala/kafka/producer/BrokerPartitionInfo.scala @@ -77,16 +77,10 @@ class BrokerPartitionInfo(producerConfig: ProducerConfig, // throw partition specific exception topicsMetadata.foreach(tmd =>{ trace("Metadata for topic %s is %s".format(tmd.topic, tmd)) - if(tmd.errorCode == ErrorMapping.NoError){ + if(tmd.errorCode == ErrorMapping.NoError) topicPartitionInfo.put(tmd.topic, tmd) - } else - warn("Error while fetching metadata [%s] for topic [%s]: %s ".format(tmd, tmd.topic, ErrorMapping.exceptionFor(tmd.errorCode).getClass)) - tmd.partitionsMetadata.foreach(pmd =>{ - if (pmd.errorCode != ErrorMapping.NoError && pmd.errorCode == ErrorMapping.LeaderNotAvailableCode) { - warn("Error while fetching metadata %s for topic partition [%s,%d]: [%s]".format(pmd, tmd.topic, pmd.partitionId, - ErrorMapping.exceptionFor(pmd.errorCode).getClass)) - } // any other error code (e.g. ReplicaNotAvailable) can be ignored since the producer does not need to access the replica and isr metadata - }) + else + warn("Error while fetching metadata [%s] for topic [%s] due to: %s ".format(tmd, tmd.topic, ErrorMapping.exceptionFor(tmd.errorCode).getClass)) }) producerPool.updateProducer(topicsMetadata) } diff --git a/core/src/main/scala/kafka/producer/async/DefaultEventHandler.scala b/core/src/main/scala/kafka/producer/async/DefaultEventHandler.scala index 89cb27d..a9365db 100644 --- a/core/src/main/scala/kafka/producer/async/DefaultEventHandler.scala +++ b/core/src/main/scala/kafka/producer/async/DefaultEventHandler.scala @@ -147,7 +147,7 @@ class DefaultEventHandler[K,V](config: ProducerConfig, try { for (message <- messages) { val topicPartitionsList = getPartitionListForTopic(message) - val partitionIndex = getPartition(message.key, topicPartitionsList) + val partitionIndex = getPartition(message.topic, message.key, topicPartitionsList) val brokerPartition = topicPartitionsList(partitionIndex) // postpone the failure until the send operation, so that requests for other brokers are handled correctly @@ -175,9 +175,9 @@ class DefaultEventHandler[K,V](config: ProducerConfig, } Some(ret) }catch { // Swallow recoverable exceptions and return None so that they can be retried. - case ute: UnknownTopicException => warn("Failed to collate messages by topic,partition due to", ute); None - case lnae: LeaderNotAvailableException => warn("Failed to collate messages by topic,partition due to", lnae); None - case oe => error("Failed to collate messages by topic, partition due to", oe); throw oe + case ute: UnknownTopicOrPartitionException => warn("Failed to collate messages due to: " + ute.getMessage); None + case lnae: LeaderNotAvailableException => warn("Failed to collate messages due to: " + lnae.getMessage); None + case oe => error("Failed to collate messages by topic, partition due to: ", oe); throw oe } } @@ -194,29 +194,29 @@ class DefaultEventHandler[K,V](config: ProducerConfig, /** * Retrieves the partition id and throws an UnknownTopicOrPartitionException if * the value of partition is not between 0 and numPartitions-1 + * @param topic the topic * @param key the partition key * @param topicPartitionList the list of available partitions * @return the partition id */ - private def getPartition(key: K, topicPartitionList: Seq[PartitionAndLeader]): Int = { + private def getPartition(topic: String, key: K, topicPartitionList: Seq[PartitionAndLeader]): Int = { val numPartitions = topicPartitionList.size if(numPartitions <= 0) - throw new UnknownTopicOrPartitionException("Invalid number of partitions: " + numPartitions + - "\n Valid values are > 0") + throw new UnknownTopicOrPartitionException("Topic " + topic + " doesn't exist") val partition = if(key == null) { // If the key is null, we don't really need a partitioner so we just send to the next // available partition val availablePartitions = topicPartitionList.filter(_.leaderBrokerIdOpt.isDefined) if (availablePartitions.isEmpty) - throw new LeaderNotAvailableException("No leader for any partition") + throw new LeaderNotAvailableException("No leader for any partition in topic " + topic) val index = Utils.abs(partitionCounter.getAndIncrement()) % availablePartitions.size availablePartitions(index).partitionId } else partitioner.partition(key, numPartitions) if(partition < 0 || partition >= numPartitions) - throw new UnknownTopicOrPartitionException("Invalid partition id : " + partition + - "\n Valid values are in the range inclusive [0, " + (numPartitions-1) + "]") + throw new UnknownTopicOrPartitionException("Invalid partition id : " + partition + " for topic " + topic + + "\n Valid values are in the inclusive range of [0, " + (numPartitions-1) + "]") partition }