From ad28fc887dd0f83521c968da4f1b0a9c83684daa Mon Sep 17 00:00:00 2001 From: Dong Lin Date: Mon, 6 Jul 2015 14:52:47 -0700 Subject: [PATCH] KAFKA-2306; New producer should emit metrics for buffer exhaustion --- .../kafka/clients/producer/KafkaProducer.java | 10 ++++++++++ .../kafka/clients/producer/internals/Sender.java | 10 ++++++++++ 2 files changed, 20 insertions(+) diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java b/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java index 5671a3f..bcc99f6 100644 --- a/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java +++ b/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java @@ -133,6 +133,7 @@ public class KafkaProducer implements Producer { private final Thread ioThread; private final CompressionType compressionType; private final Sensor errors; + private final Sensor bufferExhaustedRecords; private final Time time; private final Serializer keySerializer; private final Serializer valueSerializer; @@ -250,6 +251,7 @@ public class KafkaProducer implements Producer { this.ioThread.start(); this.errors = this.metrics.sensor("errors"); + this.bufferExhaustedRecords = this.metrics.sensor("bufferExhaustedRecords"); if (keySerializer == null) { this.keySerializer = config.getConfiguredInstance(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, @@ -403,6 +405,14 @@ public class KafkaProducer implements Producer { } catch (InterruptedException e) { this.errors.record(); throw new InterruptException(e); + } catch (BufferExhaustedException e) { + this.errors.record(); + this.bufferExhaustedRecords.record(); + String topicBufferExhaustedName = "topic." + record.topic() + ".record-buffer-exhausted"; + Sensor topicBufferExhaustedSensor = this.metrics.getSensor(topicBufferExhaustedName); + if (topicBufferExhaustedSensor != null) + topicBufferExhaustedSensor.record(); + throw e; } catch (KafkaException e) { this.errors.record(); throw e; diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java b/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java index 0baf16e..ff8a4fc 100644 --- a/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java +++ b/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java @@ -346,6 +346,7 @@ public class Sender implements Runnable { private final Metrics metrics; public final Sensor retrySensor; public final Sensor errorSensor; + public final Sensor bufferExhaustedRecordSensor; public final Sensor queueTimeSensor; public final Sensor requestTimeSensor; public final Sensor recordsPerRequestSensor; @@ -395,6 +396,10 @@ public class Sender implements Runnable { m = new MetricName("record-error-rate", metricGrpName, "The average per-second number of record sends that resulted in errors", metricTags); this.errorSensor.add(m, new Rate()); + this.bufferExhaustedRecordSensor = metrics.sensor("bufferExhaustedRecords"); + m = new MetricName("record-buffer-exhausted-rate", metricGrpName, "The average per-second number of record sends that are dropped due to buffer exhaustion", metricTags); + this.bufferExhaustedRecordSensor.add(m, new Rate()); + this.maxRecordSizeSensor = metrics.sensor("record-size-max"); m = new MetricName("record-size-max", metricGrpName, "The maximum record size", metricTags); this.maxRecordSizeSensor.add(m, new Max()); @@ -449,6 +454,11 @@ public class Sender implements Runnable { Sensor topicErrorSensor = this.metrics.sensor(topicErrorName); m = new MetricName("record-error-rate", metricGrpName , metricTags); topicErrorSensor.add(m, new Rate()); + + String topicBufferExhaustedName = "topic." + topic + ".record-buffer-exhausted"; + Sensor topicBufferExhaustedSensor = this.metrics.sensor(topicBufferExhaustedName); + m = new MetricName("record-buffer-exhausted-rate", metricGrpName , metricTags); + topicBufferExhaustedSensor.add(m, new Rate()); } } -- 1.7.9.5