diff --git a/core/src/main/scala/kafka/common/UnknownTopicException.scala b/core/src/main/scala/kafka/common/UnknownTopicException.scala deleted file mode 100644 index 710d3bf..0000000 --- a/core/src/main/scala/kafka/common/UnknownTopicException.scala +++ /dev/null @@ -1,25 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package kafka.common - -/** - * Thrown when a request is made for a topic, that hasn't been created in a Kafka cluster - */ -class UnknownTopicException(message: String) extends RuntimeException(message) { - def this() = this(null) -} \ No newline at end of file diff --git a/core/src/main/scala/kafka/producer/async/DefaultEventHandler.scala b/core/src/main/scala/kafka/producer/async/DefaultEventHandler.scala index 1a74951..084b8e6 100644 --- a/core/src/main/scala/kafka/producer/async/DefaultEventHandler.scala +++ b/core/src/main/scala/kafka/producer/async/DefaultEventHandler.scala @@ -149,7 +149,7 @@ class DefaultEventHandler[K,V](config: ProducerConfig, try { for (message <- messages) { val topicPartitionsList = getPartitionListForTopic(message) - val partitionIndex = getPartition(message.key, topicPartitionsList) + val partitionIndex = getPartition(message.topic, message.key, topicPartitionsList) val brokerPartition = topicPartitionsList(partitionIndex) // postpone the failure until the send operation, so that requests for other brokers are handled correctly @@ -177,9 +177,9 @@ class DefaultEventHandler[K,V](config: ProducerConfig, } Some(ret) }catch { // Swallow recoverable exceptions and return None so that they can be retried. - case ute: UnknownTopicException => warn("Failed to collate messages by topic,partition due to", ute); None - case lnae: LeaderNotAvailableException => warn("Failed to collate messages by topic,partition due to", lnae); None - case oe => error("Failed to collate messages by topic, partition due to", oe); None + case ute: UnknownTopicOrPartitionException => warn("Failed to collate messages by topic,partition due to: " + ute.getMessage); None + case lnae: LeaderNotAvailableException => warn("Failed to collate messages by topic,partition due to: " + lnae.getMessage); None + case oe => error("Failed to collate messages by topic, partition due to: " + oe.getMessage); None } } @@ -200,25 +200,24 @@ class DefaultEventHandler[K,V](config: ProducerConfig, * @param topicPartitionList the list of available partitions * @return the partition id */ - private def getPartition(key: K, topicPartitionList: Seq[PartitionAndLeader]): Int = { + private def getPartition(topic: String, key: K, topicPartitionList: Seq[PartitionAndLeader]): Int = { val numPartitions = topicPartitionList.size if(numPartitions <= 0) - throw new UnknownTopicOrPartitionException("Invalid number of partitions: " + numPartitions + - "\n Valid values are > 0") + throw new UnknownTopicOrPartitionException("Topic " + topic + " doesn't exist") val partition = if(key == null) { // If the key is null, we don't really need a partitioner so we just send to the next // available partition val availablePartitions = topicPartitionList.filter(_.leaderBrokerIdOpt.isDefined) if (availablePartitions.isEmpty) - throw new LeaderNotAvailableException("No leader for any partition") + throw new LeaderNotAvailableException("No leader for any partition in topic " + topic) val index = Utils.abs(partitionCounter.getAndIncrement()) % availablePartitions.size availablePartitions(index).partitionId } else partitioner.partition(key, numPartitions) if(partition < 0 || partition >= numPartitions) - throw new UnknownTopicOrPartitionException("Invalid partition id : " + partition + - "\n Valid values are in the range inclusive [0, " + (numPartitions-1) + "]") + throw new UnknownTopicOrPartitionException("Invalid partition id: " + partition + " for topic " + topic + + "; Valid values are in the inclusive range of [0, " + (numPartitions-1) + "]") partition }