From 68cd8675292a89f88d047355d0a3968822ae02c4 Mon Sep 17 00:00:00 2001 From: Guozhang Wang Date: Wed, 26 Nov 2014 12:18:48 -0800 Subject: [PATCH] KAFKA-1800.v1 --- .../apache/kafka/clients/producer/KafkaProducer.java | 17 ++++++++++++++--- .../apache/kafka/clients/producer/internals/Sender.java | 9 ++++++--- 2 files changed, 20 insertions(+), 6 deletions(-) diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java b/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java index 32f444e..0f32a72 100644 --- a/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java +++ b/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java @@ -246,15 +246,15 @@ public class KafkaProducer implements Producer { // for other exceptions throw directly } catch (ApiException e) { log.debug("Exception occurred during message send:", e); + recordError(record.topic()); if (callback != null) callback.onCompletion(null, e); - this.errors.record(); return new FutureFailure(e); } catch (InterruptedException e) { - this.errors.record(); + recordError(record.topic()); throw new KafkaException(e); } catch (KafkaException e) { - this.errors.record(); + recordError(record.topic()); throw e; } } @@ -285,6 +285,17 @@ public class KafkaProducer implements Producer { } /** + * Record one record send error for the given topic + */ + private void recordError(String topic) { + this.errors.record(); + String topicErrorName = "topic." + topic + ".record-errors"; + Sensor topicErrorSensor = this.metrics.getSensor(topicErrorName); + if (topicErrorSensor != null) + topicErrorSensor.record(); + } + + /** * Validate that the record size isn't too large */ private void ensureValidRecordSize(int size) { diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java b/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java index 84a7a07..42ef1b9 100644 --- a/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java +++ b/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java @@ -143,6 +143,12 @@ public class Sender implements Runnable { */ public void run(long now) { Cluster cluster = metadata.fetch(); + + // update per-topic metrics registry if possible + for (String topic : cluster.topics()) { + sensors.maybeRegisterTopicMetrics(topic); + } + // get the list of partitions with data ready to send RecordAccumulator.ReadyCheckResult result = this.accumulator.ready(cluster, now); @@ -406,10 +412,7 @@ public class Sender implements Runnable { if (request.attachment() != null) { Map responseBatches = (Map) request.attachment(); for (RecordBatch batch : responseBatches.values()) { - - // register all per-topic metrics at once String topic = batch.topicPartition.topic(); - maybeRegisterTopicMetrics(topic); // per-topic record send rate String topicRecordsCountName = "topic." + topic + ".records-per-batch"; -- 1.7.12.4