From 5310789732532d7ed0de3856873e52eb439f49d6 Mon Sep 17 00:00:00 2001 From: Dong Lin Date: Mon, 6 Jul 2015 18:09:33 -0700 Subject: [PATCH] KAFKA-2306; New producer should emit metrics for buffer exhaustion --- .../kafka/clients/producer/KafkaProducer.java | 4 ++++ .../producer/internals/RecordAccumulator.java | 10 ++++++++-- 2 files changed, 12 insertions(+), 2 deletions(-) diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java b/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java index 5671a3f..33b5b3d 100644 --- a/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java +++ b/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java @@ -403,6 +403,10 @@ public class KafkaProducer implements Producer { } catch (InterruptedException e) { this.errors.record(); throw new InterruptException(e); + } catch (BufferExhaustedException e) { + this.errors.record(); + this.metrics.sensor("bufferExhaustedRecords").record(); + throw e; } catch (KafkaException e) { this.errors.record(); throw e; diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java b/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java index 87dbd64..0380f90 100644 --- a/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java +++ b/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java @@ -21,6 +21,8 @@ import org.apache.kafka.common.TopicPartition; import org.apache.kafka.common.metrics.Measurable; import org.apache.kafka.common.metrics.MetricConfig; import org.apache.kafka.common.metrics.Metrics; +import org.apache.kafka.common.metrics.Sensor; +import org.apache.kafka.common.metrics.stats.Rate; import org.apache.kafka.common.record.CompressionType; import org.apache.kafka.common.record.MemoryRecords; import org.apache.kafka.common.record.Record; @@ -112,7 +114,6 @@ public final class RecordAccumulator { } private void registerMetrics(Metrics metrics, String metricGrpName, Map metricTags) { - MetricName metricName = new MetricName("waiting-threads", metricGrpName, "The number of user threads blocked waiting for buffer memory to enqueue their records", metricTags); Measurable waitingThreads = new Measurable() { public double measure(MetricConfig config, long now) { @@ -120,7 +121,7 @@ public final class RecordAccumulator { } }; metrics.addMetric(metricName, waitingThreads); - + metricName = new MetricName("buffer-total-bytes", metricGrpName, "The maximum amount of buffer memory the client can use (whether or not it is currently used).", metricTags); Measurable totalBytes = new Measurable() { public double measure(MetricConfig config, long now) { @@ -128,6 +129,7 @@ public final class RecordAccumulator { } }; metrics.addMetric(metricName, totalBytes); + metricName = new MetricName("buffer-available-bytes", metricGrpName, "The total amount of buffer memory that is not being used (either unallocated or in the free list).", metricTags); Measurable availableBytes = new Measurable() { public double measure(MetricConfig config, long now) { @@ -135,6 +137,10 @@ public final class RecordAccumulator { } }; metrics.addMetric(metricName, availableBytes); + + Sensor bufferExhaustedRecordSensor = metrics.sensor("bufferExhaustedRecords"); + metricName = new MetricName("buffer-exhausted-rate", metricGrpName, "The average per-second number of record sends that are dropped due to buffer exhaustion", metricTags); + bufferExhaustedRecordSensor.add(metricName, new Rate()); } /** -- 1.7.9.5