diff --git a/core/src/main/scala/kafka/server/KafkaApis.scala b/core/src/main/scala/kafka/server/KafkaApis.scala index cfabfc1..35dbc60 100644 --- a/core/src/main/scala/kafka/server/KafkaApis.scala +++ b/core/src/main/scala/kafka/server/KafkaApis.scala @@ -197,10 +197,18 @@ class KafkaApis(val requestChannel: RequestChannel, .format(messages.size, topicAndPartition.topic, topicAndPartition.partition, start, end)) ProduceResult(topicAndPartition, start, end) } catch { + // NOTE: Failed produce requests is not incremented here since failed produce requests metric is supposed + // to indicate failure of a broker in handling a produce request for a partition it is the leader for case e: KafkaStorageException => fatal("Halting due to unrecoverable I/O error while handling produce request: ", e) Runtime.getRuntime.halt(1) null + case utpe: UnknownTopicOrPartitionException => + warn(utpe.getMessage) + new ProduceResult(topicAndPartition, utpe) + case nle: NotLeaderForPartitionException => + warn(nle.getMessage) + new ProduceResult(topicAndPartition, nle) case e => BrokerTopicStats.getBrokerTopicStats(topicAndPartition.topic).failedProduceRequestRate.mark() BrokerTopicStats.getBrokerAllTopicsStats.failedProduceRequestRate.mark() @@ -278,7 +286,15 @@ class KafkaApis(val requestChannel: RequestChannel, new FetchResponsePartitionData(ErrorMapping.NoError, highWatermark, messages) } } catch { - case t: Throwable => + // NOTE: Failed fetch requests is not incremented here since failed fetch requests metric is supposed + // to indicate failure of a broker in handling a fetch request for a partition it is the leader for + case utpe: UnknownTopicOrPartitionException => + warn(utpe.getMessage) + new FetchResponsePartitionData(ErrorMapping.codeFor(utpe.getClass.asInstanceOf[Class[Throwable]]), -1L, MessageSet.Empty) + case nle: NotLeaderForPartitionException => + warn(nle.getMessage) + new FetchResponsePartitionData(ErrorMapping.codeFor(nle.getClass.asInstanceOf[Class[Throwable]]), -1L, MessageSet.Empty) + case t => BrokerTopicStats.getBrokerTopicStats(topic).failedFetchRequestRate.mark() BrokerTopicStats.getBrokerAllTopicsStats.failedFetchRequestRate.mark() error("error when processing request " + (topic, partition, offset, fetchSize), t) @@ -344,6 +360,14 @@ class KafkaApis(val requestChannel: RequestChannel, } (topicAndPartition, PartitionOffsetsResponse(ErrorMapping.NoError, offsets)) } catch { + // NOTE: UnknownTopicOrPartitionException and NotLeaderForPartitionException are special cased since these error messages + // are typically transient and there is no value in logging the entire stack trace for the same + case utpe: UnknownTopicOrPartitionException => + warn(utpe.getMessage) + (topicAndPartition, PartitionOffsetsResponse(ErrorMapping.codeFor(utpe.getClass.asInstanceOf[Class[Throwable]]), Nil) ) + case nle: NotLeaderForPartitionException => + warn(nle.getMessage) + (topicAndPartition, PartitionOffsetsResponse(ErrorMapping.codeFor(nle.getClass.asInstanceOf[Class[Throwable]]), Nil) ) case e => warn("Error while responding to offset request", e) (topicAndPartition, PartitionOffsetsResponse(ErrorMapping.codeFor(e.getClass.asInstanceOf[Class[Throwable]]), Nil) )