diff --git a/core/src/main/scala/kafka/server/KafkaApis.scala b/core/src/main/scala/kafka/server/KafkaApis.scala index cfabfc1..be42c08 100644 --- a/core/src/main/scala/kafka/server/KafkaApis.scala +++ b/core/src/main/scala/kafka/server/KafkaApis.scala @@ -201,6 +201,16 @@ class KafkaApis(val requestChannel: RequestChannel, fatal("Halting due to unrecoverable I/O error while handling produce request: ", e) Runtime.getRuntime.halt(1) null + case utpe: UnknownTopicOrPartitionException => + // NOTE: Failed produce requests is not incremented here since failed produce requests metric is supposed + // to indicate failure of a broker in handling a produce request for a partition it is the leader for + warn(utpe.getMessage) + new ProduceResult(topicAndPartition, utpe) + case nle: NotLeaderForPartitionException => + // NOTE: Failed produce requests is not incremented here since failed produce requests metric is supposed + // to indicate failure of a broker in handling a produce request for a partition it is the leader for + warn(nle.getMessage) + new ProduceResult(topicAndPartition, nle) case e => BrokerTopicStats.getBrokerTopicStats(topicAndPartition.topic).failedProduceRequestRate.mark() BrokerTopicStats.getBrokerAllTopicsStats.failedProduceRequestRate.mark() @@ -278,7 +288,17 @@ class KafkaApis(val requestChannel: RequestChannel, new FetchResponsePartitionData(ErrorMapping.NoError, highWatermark, messages) } } catch { - case t: Throwable => + case utpe: UnknownTopicOrPartitionException => + // NOTE: Failed fetch requests is not incremented here since failed fetch requests metric is supposed + // to indicate failure of a broker in handling a fetch request for a partition it is the leader for + warn(utpe.getMessage) + new FetchResponsePartitionData(ErrorMapping.codeFor(utpe.getClass.asInstanceOf[Class[Throwable]]), -1L, MessageSet.Empty) + case nle: NotLeaderForPartitionException => + // NOTE: Failed fetch requests is not incremented here since failed fetch requests metric is supposed + // to indicate failure of a broker in handling a fetch request for a partition it is the leader for + warn(nle.getMessage) + new FetchResponsePartitionData(ErrorMapping.codeFor(nle.getClass.asInstanceOf[Class[Throwable]]), -1L, MessageSet.Empty) + case t => BrokerTopicStats.getBrokerTopicStats(topic).failedFetchRequestRate.mark() BrokerTopicStats.getBrokerAllTopicsStats.failedFetchRequestRate.mark() error("error when processing request " + (topic, partition, offset, fetchSize), t) @@ -344,6 +364,12 @@ class KafkaApis(val requestChannel: RequestChannel, } (topicAndPartition, PartitionOffsetsResponse(ErrorMapping.NoError, offsets)) } catch { + case utpe: UnknownTopicOrPartitionException => + warn(utpe.getMessage) + (topicAndPartition, PartitionOffsetsResponse(ErrorMapping.codeFor(utpe.getClass.asInstanceOf[Class[Throwable]]), Nil) ) + case nle: NotLeaderForPartitionException => + warn(nle.getMessage) + (topicAndPartition, PartitionOffsetsResponse(ErrorMapping.codeFor(nle.getClass.asInstanceOf[Class[Throwable]]), Nil) ) case e => warn("Error while responding to offset request", e) (topicAndPartition, PartitionOffsetsResponse(ErrorMapping.codeFor(e.getClass.asInstanceOf[Class[Throwable]]), Nil) )