From 163ad2b662f1d8747924ac293a52e7acd8305986 Mon Sep 17 00:00:00 2001 From: Dong Lin Date: Mon, 29 Jun 2015 21:01:23 -0700 Subject: [PATCH] KAFKA-2306; New producer should emit metrics for buffer exhaustion --- .../kafka/clients/producer/KafkaProducer.java | 11 +++++++++++ .../kafka/clients/producer/internals/Sender.java | 10 ++++++++++ 2 files changed, 21 insertions(+) diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java b/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java index 5671a3f..a701799 100644 --- a/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java +++ b/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java @@ -133,6 +133,7 @@ public class KafkaProducer implements Producer { private final Thread ioThread; private final CompressionType compressionType; private final Sensor errors; + private final Sensor droppedRecords; private final Time time; private final Serializer keySerializer; private final Serializer valueSerializer; @@ -250,6 +251,7 @@ public class KafkaProducer implements Producer { this.ioThread.start(); this.errors = this.metrics.sensor("errors"); + this.droppedRecords = this.metrics.sensor("droppedRecords"); if (keySerializer == null) { this.keySerializer = config.getConfiguredInstance(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, @@ -403,6 +405,15 @@ public class KafkaProducer implements Producer { } catch (InterruptedException e) { this.errors.record(); throw new InterruptException(e); + } catch (BufferExhaustedException e) { + this.errors.record(); + this.droppedRecords.record(); + + String topicDropName = "topic." + record.topic() + ".record-drops"; + Sensor topicDropSensor = this.metrics.getSensor(topicDropName); + if (topicDropSensor != null) + topicDropSensor.record(); + throw e; } catch (KafkaException e) { this.errors.record(); throw e; diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java b/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java index 0baf16e..dcd184c 100644 --- a/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java +++ b/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java @@ -346,6 +346,7 @@ public class Sender implements Runnable { private final Metrics metrics; public final Sensor retrySensor; public final Sensor errorSensor; + public final Sensor droppedRecordSensor; public final Sensor queueTimeSensor; public final Sensor requestTimeSensor; public final Sensor recordsPerRequestSensor; @@ -395,6 +396,10 @@ public class Sender implements Runnable { m = new MetricName("record-error-rate", metricGrpName, "The average per-second number of record sends that resulted in errors", metricTags); this.errorSensor.add(m, new Rate()); + this.droppedRecordSensor = metrics.sensor("droppedRecords"); + m = new MetricName("record-drop-rate", metricGrpName, "The average per-second number of record sends that are dropped due to buffer exhaustion", metricTags); + this.droppedRecordSensor.add(m, new Rate()); + this.maxRecordSizeSensor = metrics.sensor("record-size-max"); m = new MetricName("record-size-max", metricGrpName, "The maximum record size", metricTags); this.maxRecordSizeSensor.add(m, new Max()); @@ -449,6 +454,11 @@ public class Sender implements Runnable { Sensor topicErrorSensor = this.metrics.sensor(topicErrorName); m = new MetricName("record-error-rate", metricGrpName , metricTags); topicErrorSensor.add(m, new Rate()); + + String topicDropName = "topic." + topic + ".record-drops"; + Sensor topicDropSensor = this.metrics.sensor(topicDropName); + m = new MetricName("record-drop-rate", metricGrpName , metricTags); + topicDropSensor.add(m, new Rate()); } } -- 1.7.9.5