diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java b/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java index a6423f4..9670a59 100644 --- a/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java +++ b/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java @@ -22,16 +22,8 @@ import java.util.concurrent.ExecutionException; import java.util.concurrent.Future; import java.util.concurrent.TimeUnit; -import org.apache.kafka.clients.producer.internals.FutureRecordMetadata; -import org.apache.kafka.clients.producer.internals.Metadata; -import org.apache.kafka.clients.producer.internals.Partitioner; -import org.apache.kafka.clients.producer.internals.RecordAccumulator; -import org.apache.kafka.clients.producer.internals.Sender; -import org.apache.kafka.common.Cluster; -import org.apache.kafka.common.KafkaException; -import org.apache.kafka.common.Metric; -import org.apache.kafka.common.PartitionInfo; -import org.apache.kafka.common.TopicPartition; +import org.apache.kafka.clients.producer.internals.*; +import org.apache.kafka.common.*; import org.apache.kafka.common.config.ConfigException; import org.apache.kafka.common.errors.ApiException; import org.apache.kafka.common.errors.RecordTooLargeException; @@ -47,6 +39,7 @@ import org.apache.kafka.common.record.Records; import org.apache.kafka.common.utils.KafkaThread; import org.apache.kafka.common.utils.SystemTime; import org.apache.kafka.common.utils.Time; +import org.apache.kafka.common.utils.Utils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -136,7 +129,9 @@ public class KafkaProducer implements Producer { this.ioThread = new KafkaThread("kafka-producer-network-thread", this.sender, true); this.ioThread.start(); - this.errors = this.metrics.sensor("errors"); + // register global sensors and metrics + ProducerMetrics.registerGlobalMetrics(this.metrics); + this.errors = 
Utils.notNull(this.metrics.getSensor("errors")); config.logUnused(); log.debug("Kafka producer started"); diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java b/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java index ffd13ff..98f7648 100644 --- a/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java +++ b/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java @@ -89,6 +89,7 @@ public final class RecordAccumulator { registerMetrics(metrics); } + // add additional metrics that are exposed only in accumulator private void registerMetrics(Metrics metrics) { metrics.addMetric("waiting-threads", "The number of user threads blocked waiting for buffer memory to enqueue their records", diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java b/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java index 855ae84..9bd8be6 100644 --- a/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java +++ b/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java @@ -28,6 +28,7 @@ import java.util.concurrent.TimeUnit; import org.apache.kafka.common.Cluster; import org.apache.kafka.common.Node; +import org.apache.kafka.common.ProducerMetrics; import org.apache.kafka.common.TopicPartition; import org.apache.kafka.common.errors.InvalidMetadataException; import org.apache.kafka.common.errors.RetriableException; @@ -35,9 +36,6 @@ import org.apache.kafka.common.metrics.Measurable; import org.apache.kafka.common.metrics.MetricConfig; import org.apache.kafka.common.metrics.Metrics; import org.apache.kafka.common.metrics.Sensor; -import org.apache.kafka.common.metrics.stats.Avg; -import org.apache.kafka.common.metrics.stats.Max; -import org.apache.kafka.common.metrics.stats.Rate; import org.apache.kafka.common.network.NetworkReceive; import 
org.apache.kafka.common.network.NetworkSend; import org.apache.kafka.common.network.Selectable; @@ -774,27 +772,15 @@ public class Sender implements Runnable { public SenderMetrics(Metrics metrics) { this.metrics = metrics; - this.batchSizeSensor = metrics.sensor("batch-size"); - this.queueTimeSensor = metrics.sensor("queue-time"); - this.requestTimeSensor = metrics.sensor("request-time"); - this.recordsPerRequestSensor = metrics.sensor("records-per-request"); - this.retrySensor = metrics.sensor("record-retries"); - this.errorSensor = metrics.sensor("errors"); - this.maxRecordSizeSensor = metrics.sensor("record-size-max"); - this.batchSizeSensor.add("batch-size-avg", "The average number of bytes sent per partition per-request.", new Avg()); - this.retrySensor.add("record-retry-rate", "The average per-second number of retried record sends", new Rate()); - this.requestTimeSensor.add("request-latency-avg", "The average request latency in ms", new Avg()); - this.requestTimeSensor.add("request-latency-max", "The maximum request latency in ms", new Max()); - this.queueTimeSensor.add("record-queue-time-avg", - "The average time in ms record batches spent in the record accumulator.", - new Avg()); - this.queueTimeSensor.add("record-queue-time-max", - "The maximum time in ms record batches spent in the record accumulator.", - new Max()); - this.errorSensor.add("record-error-rate", "The average per-second number of record sends that resulted in errors", new Rate()); - this.recordsPerRequestSensor.add("record-send-rate", "The average number of records sent per second.", new Rate()); - this.recordsPerRequestSensor.add("records-per-request-avg", "The average number of records per request.", new Avg()); - this.maxRecordSizeSensor.add("record-size-max", "The maximum record size", new Max()); + this.batchSizeSensor = Utils.notNull(metrics.sensor(ProducerMetrics.BATCH_SIZE_SENSOR)); + this.queueTimeSensor = Utils.notNull(metrics.sensor(ProducerMetrics.QUEUE_TIME_SENSOR)); + 
this.requestTimeSensor = Utils.notNull(metrics.sensor(ProducerMetrics.REQUEST_TIME_SENSOR)); + this.recordsPerRequestSensor = Utils.notNull(metrics.sensor(ProducerMetrics.RECORDS_PER_REQUEST_SENSOR)); + this.retrySensor = Utils.notNull(metrics.sensor(ProducerMetrics.RETRY_SENSOR)); + this.errorSensor = Utils.notNull(metrics.sensor(ProducerMetrics.GLOBAL_ERROR_SENSOR)); + this.maxRecordSizeSensor = Utils.notNull(metrics.sensor(ProducerMetrics.MAX_RECORD_SIZE_SENSOR)); + + // add additional metrics that are exposed only in sender this.metrics.addMetric("requests-in-flight", "The current number of in-flight requests awaiting a response.", new Measurable() { public double measure(MetricConfig config, long now) { return inFlightRequests.totalInFlightRequests(); @@ -812,27 +798,25 @@ public class Sender implements Runnable { for (int i = 0; i < requests.size(); i++) { InFlightRequest request = requests.get(i); int records = 0; + if (request.batches != null) { for (RecordBatch batch : request.batches.values()) { - // per-topic record count - String topicRecordsCountName = "topic." + batch.topicPartition.topic() + ".records-per-batch"; - Sensor topicRecordCount = this.metrics.getSensor(topicRecordsCountName); - if (topicRecordCount == null) { - topicRecordCount = this.metrics.sensor(topicRecordsCountName); - topicRecordCount.add("topic." + batch.topicPartition.topic() + ".record-send-rate", new Rate()); - } + // register all per-topic metrics at once + String topic = batch.topicPartition.topic(); + ProducerMetrics.maybeRegisterTopicMetrics(this.metrics, topic); + + // per-topic record send rate + String topicRecordsCountName = ProducerMetrics.TOPIC_PREFIX + topic + "." + ProducerMetrics.RECORDS_PER_BATCH_SENSOR; + Sensor topicRecordCount = Utils.notNull(this.metrics.getSensor(topicRecordsCountName)); topicRecordCount.record(batch.recordCount); - // per-topic bytes-per-second - String topicByteRateName = "topic." 
+ batch.topicPartition.topic() + ".bytes"; - Sensor topicByteRate = this.metrics.getSensor(topicByteRateName); - if (topicByteRate == null) { - topicByteRate = this.metrics.sensor(topicByteRateName); - topicByteRate.add("topic." + batch.topicPartition.topic() + ".byte-rate", new Rate()); - } + // per-topic bytes send rate + String topicByteRateName = ProducerMetrics.TOPIC_PREFIX + topic + "." + ProducerMetrics.BYTES_PER_BATCH_SENSOR; + Sensor topicByteRate = Utils.notNull(this.metrics.getSensor(topicByteRateName)); topicByteRate.record(batch.records.sizeInBytes()); + // global metrics this.batchSizeSensor.record(batch.records.sizeInBytes(), ns); this.queueTimeSensor.record(batch.drained - batch.created, ns); this.maxRecordSizeSensor.record(batch.maxRecordSize, ns); @@ -845,36 +829,23 @@ public class Sender implements Runnable { public void recordRetries(String topic, int count) { this.retrySensor.record(count); - String topicRetryName = "topic." + topic + ".record-retries"; - Sensor topicRetrySensor = this.metrics.getSensor(topicRetryName); - if (topicRetrySensor == null) { - topicRetrySensor = this.metrics.sensor(topicRetryName); - topicRetrySensor.add("topic." + topic + ".record-retry-rate", new Rate()); - } + String topicRetryName = ProducerMetrics.TOPIC_PREFIX + topic + "." + ProducerMetrics.RETRY_SENSOR; + Sensor topicRetrySensor = Utils.notNull(this.metrics.getSensor(topicRetryName)); topicRetrySensor.record(count); } public void recordErrors(String topic, int count) { this.errorSensor.record(count); - String topicErrorName = "topic." + topic + ".record-errors"; - Sensor topicErrorSensor = this.metrics.getSensor(topicErrorName); - if (topicErrorSensor == null) { - topicErrorSensor = this.metrics.sensor(topicErrorName); - topicErrorSensor.add("topic." + topic + ".record-error-rate", new Rate()); - } + String topicErrorName = ProducerMetrics.TOPIC_PREFIX + topic + "." 
+ ProducerMetrics.TOPIC_ERROR_SENSOR; + Sensor topicErrorSensor = Utils.notNull(this.metrics.getSensor(topicErrorName)); topicErrorSensor.record(count); } public void recordLatency(int node, long latency, long nowNs) { this.requestTimeSensor.record(latency, nowNs); if (node >= 0) { - String nodeTimeName = "server." + node + ".latency"; - Sensor nodeRequestTime = this.metrics.getSensor(nodeTimeName); - if (nodeRequestTime == null) { - nodeRequestTime = this.metrics.sensor(nodeTimeName); - nodeRequestTime.add("node-" + node + ".latency-avg", new Avg()); - nodeRequestTime.add("node-" + node + ".latency-max", new Max()); - } + String nodeTimeName = ProducerMetrics.BROKER_PREFIX + node + "." + ProducerMetrics.REQUEST_LATENCY_SENSOR; + Sensor nodeRequestTime = Utils.notNull(this.metrics.getSensor(nodeTimeName)); nodeRequestTime.record(latency, nowNs); } } diff --git a/clients/src/main/java/org/apache/kafka/common/ProducerMetrics.java b/clients/src/main/java/org/apache/kafka/common/ProducerMetrics.java new file mode 100644 index 0000000..f989f03 --- /dev/null +++ b/clients/src/main/java/org/apache/kafka/common/ProducerMetrics.java @@ -0,0 +1,252 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.kafka.common; + +import org.apache.kafka.common.metrics.Metrics; +import org.apache.kafka.common.metrics.Sensor; +import org.apache.kafka.common.metrics.stats.Avg; +import org.apache.kafka.common.metrics.stats.Count; +import org.apache.kafka.common.metrics.stats.Max; +import org.apache.kafka.common.metrics.stats.Rate; + +import java.util.concurrent.TimeUnit; + +/** + * A collection of sensors for the producer + */ +public class ProducerMetrics { + + static public final String TOPIC_PREFIX = "topic."; + + static public final String BROKER_PREFIX = "node-"; + + // sensor names + static public final String RETRY_SENSOR = "record-retries"; + + static public final String GLOBAL_ERROR_SENSOR = "errors"; + + static public final String TOPIC_ERROR_SENSOR = "record-errors"; + + static public final String QUEUE_TIME_SENSOR = "queue-time"; + + static public final String REQUEST_TIME_SENSOR = "request-time"; + + static public final String REQUEST_LATENCY_SENSOR = "latency"; + + static public final String RECORDS_PER_REQUEST_SENSOR = "records-per-request"; + + static public final String RECORDS_PER_BATCH_SENSOR = "records-per-batch"; + + static public final String BYTES_PER_BATCH_SENSOR = "bytes"; + + static public final String BATCH_SIZE_SENSOR = "batch-size"; + + static public final String MAX_RECORD_SIZE_SENSOR = "record-size-max"; + + static public final String CONNECTION_CLOSED_SENSOR = "connections-closed"; + + static public final String CONNECTION_CREATED_SENSOR = "connections-created"; + + static public final String BYTES_TRANSFERRED_SENSOR = "bytes-sent-received"; + + static public final String BYTES_SENT_SENSOR = "bytes-sent"; + + static public final String BYTES_RECEIVED_SENSOR = "bytes-received"; + + static public final String SELECT_TIME_SENSOR = "select-time"; + + static public final String IO_TIME_SENSOR = "io-time"; + + // sensor attribute names + static public final String BATCH_SIZE_AVG_ATTR = "batch-size-avg"; + static public final 
String BATCH_SIZE_AVG_ATTR_DOC = "The average number of bytes sent per partition per-request."; + + static public final String RETRY_RATE_ATTR = "record-retry-rate"; + static public final String RETRY_RATE_ATTR_DOC = "The average per-second number of retried record sends"; + + static public final String ERROR_RATE_ATTR = "record-error-rate"; + static public final String ERROR_RATE_ATTR_DOC = "The average per-second number of record sends that resulted in errors"; + + static public final String REQUEST_LATENCY_AVG_ATTR = "request-latency-avg"; + static public final String REQUEST_LATENCY_AVG_ATTR_DOC = "The average request latency in ms"; + + static public final String REQUEST_LATENCY_MAX_ATTR = "request-latency-max"; + static public final String REQUEST_LATENCY_MAX_ATTR_DOC = "The maximum request latency in ms"; + + static public final String QUEUE_TIME_AVG_ATTR = "record-queue-time-avg"; + static public final String QUEUE_TIME_AVG_ATTR_DOC = "The average time in ms record batches spent in the record accumulator."; + + static public final String QUEUE_TIME_MAX_ATTR = "record-queue-time-max"; + static public final String QUEUE_TIME_MAX_ATTR_DOC = "The maximum time in ms record batches spent in the record accumulator."; + + static public final String RECORD_SEND_RATE_ATTR = "record-send-rate"; + static public final String RECORD_SEND_RATE_ATTR_DOC = "The average number of records sent per second."; + + static public final String BYTE_SEND_RATE_ATTR = "byte-rate"; + static public final String BYTE_SEND_RATE_ATTR_DOC = "The average number of bytes sent per second."; + + static public final String RECORD_PER_REQUEST_AVG_ATTR = "records-per-request-avg"; + static public final String RECORD_PER_REQUEST_AVG_ATTR_DOC = "The average number of records per request."; + + static public final String RECORD_SIZE_MAX_ATTR = "record-size-max"; + static public final String RECORD_SIZE_MAX_ATTR_DOC = "The maximum record size"; + + static public final String NETWORK_IO_RATE_ATTR = 
"network-io-rate"; + static public final String NETWORK_IO_RATE_ATTR_DOC = "The average number of network operations (reads or writes) on all connections per second."; + + static public final String OUTGOING_BYTE_RATE_ATTR = "outgoing-byte-rate"; + static public final String OUTGOING_BYTE_RATE_ATTR_DOC = "Number of outgoing bytes sent per second to all servers."; + + static public final String REQUEST_RATE_ATTR = "request-rate"; + static public final String REQUEST_RATE_ATTR_DOC = "Number of requests sent per second."; + + static public final String REQUEST_SIZE_AVG_ATTR = "request-size-avg"; + static public final String REQUEST_SIZE_AVG_ATTR_DOC = "The average size of all requests sent in the window."; + + static public final String REQUEST_SIZE_MAX_ATTR = "request-size-max"; + static public final String REQUEST_SIZE_MAX_ATTR_DOC = "The maximum size of any requests sent in the window."; + + static public final String INCOMING_BYTE_RATE_ATTR = "incoming-byte-rate"; + static public final String INCOMING_BYTE_RATE_ATTR_DOC = "Number of incoming bytes read per second from all sockets"; + + static public final String RESPONSE_RATE_ATTR = "response-rate"; + static public final String RESPONSE_RATE_ATTR_DOC = "Number of responses received per second."; + + static public final String CONNECTION_CREATION_RATE_ATTR = "connection-creation-rate"; + static public final String CONNECTION_CREATION_RATE_ATTR_DOC = "New connections established per second in the window."; + + static public final String CONNECTION_CLOSE_RATE_ATTR = "connection-close-rate"; + static public final String CONNECTION_CLOSE_RATE_ATTR_DOC = "Connections closed per second in the window."; + + static public final String SELECT_RATE_ATTR = "select-rate"; + static public final String SELECT_RATE_ATTR_DOC = "Number of times the I/O layer checked for new I/O to perform per second"; + + static public final String IO_WAIT_TIME_AVG_ATTR = "io-wait-time-ns-avg"; + static public final String IO_WAIT_TIME_AVG_ATTR_DOC 
= "The average time the I/O thread spent waiting for a socket ready for reads or writes in nanoseconds."; + + static public final String IO_WAIT_RATIO_ATTR = "io-wait-ratio"; + static public final String IO_WAIT_RATIO_ATTR_DOC = "The fraction of time the I/O thread spent waiting."; + + static public final String IO_TIME_AVT_ATTR = "io-time-ns-avg"; + static public final String IO_TIME_AVT_ATTR_DOC = "The average time for I/O per select call in nanoseconds."; + + static public final String IO_RATIO_ATTR = "io-ratio"; + static public final String IO_RATIO_ATTR_DOC = "The fraction of time the I/O thread spent doing I/O"; + + static public void registerGlobalMetrics(Metrics metrics) { + Sensor retrySensor = metrics.sensor(RETRY_SENSOR); + retrySensor.add(RETRY_RATE_ATTR, RETRY_RATE_ATTR_DOC, new Rate()); + + Sensor errorSensor = metrics.sensor(GLOBAL_ERROR_SENSOR); + errorSensor.add(ERROR_RATE_ATTR, ERROR_RATE_ATTR_DOC, new Rate()); + + Sensor queueTimeSensor = metrics.sensor(QUEUE_TIME_SENSOR); + queueTimeSensor.add(QUEUE_TIME_AVG_ATTR, QUEUE_TIME_AVG_ATTR_DOC, new Avg()); + queueTimeSensor.add(QUEUE_TIME_MAX_ATTR, QUEUE_TIME_MAX_ATTR_DOC, new Max()); + + Sensor requestTimeSensor = metrics.sensor(REQUEST_TIME_SENSOR); + requestTimeSensor.add(REQUEST_LATENCY_AVG_ATTR, REQUEST_LATENCY_AVG_ATTR_DOC, new Avg()); + requestTimeSensor.add(REQUEST_LATENCY_MAX_ATTR, REQUEST_LATENCY_MAX_ATTR_DOC, new Max()); + + Sensor recordsPerRequestSensor = metrics.sensor(RECORDS_PER_REQUEST_SENSOR); + recordsPerRequestSensor.add(RECORD_SEND_RATE_ATTR, RECORD_SEND_RATE_ATTR_DOC, new Rate()); + recordsPerRequestSensor.add(RECORD_PER_REQUEST_AVG_ATTR, RECORD_PER_REQUEST_AVG_ATTR_DOC, new Avg()); + + Sensor batchSizeSensor = metrics.sensor(BATCH_SIZE_SENSOR); + batchSizeSensor.add(BATCH_SIZE_AVG_ATTR, BATCH_SIZE_AVG_ATTR_DOC, new Avg()); + + Sensor maxRecordSizeSensor = metrics.sensor(MAX_RECORD_SIZE_SENSOR); + maxRecordSizeSensor.add(RECORD_SIZE_MAX_ATTR, RECORD_SIZE_MAX_ATTR_DOC, new Max()); 
+ + Sensor connectionCreated = metrics.sensor(CONNECTION_CREATED_SENSOR); + connectionCreated.add(CONNECTION_CREATION_RATE_ATTR, CONNECTION_CREATION_RATE_ATTR_DOC, new Rate()); + + Sensor connectionClosed = metrics.sensor(CONNECTION_CLOSED_SENSOR); + connectionClosed.add(CONNECTION_CLOSE_RATE_ATTR, CONNECTION_CLOSE_RATE_ATTR_DOC, new Rate()); + + Sensor bytesTransferred = metrics.sensor(BYTES_TRANSFERRED_SENSOR); + bytesTransferred.add(NETWORK_IO_RATE_ATTR, NETWORK_IO_RATE_ATTR_DOC, new Rate(new Count())); + + Sensor bytesSent = metrics.sensor(BYTES_SENT_SENSOR, bytesTransferred); + bytesSent.add(OUTGOING_BYTE_RATE_ATTR, OUTGOING_BYTE_RATE_ATTR_DOC, new Rate()); + bytesSent.add(REQUEST_RATE_ATTR, REQUEST_RATE_ATTR_DOC, new Rate(new Count())); + bytesSent.add(REQUEST_SIZE_AVG_ATTR, REQUEST_SIZE_AVG_ATTR_DOC, new Avg()); + bytesSent.add(REQUEST_SIZE_MAX_ATTR, REQUEST_SIZE_MAX_ATTR_DOC, new Max()); + + Sensor bytesReceived = metrics.sensor(BYTES_RECEIVED_SENSOR, bytesTransferred); + bytesReceived.add(INCOMING_BYTE_RATE_ATTR, INCOMING_BYTE_RATE_ATTR_DOC, new Rate()); + bytesReceived.add(RESPONSE_RATE_ATTR, RESPONSE_RATE_ATTR_DOC, new Rate(new Count())); + + Sensor selectTime = metrics.sensor(SELECT_TIME_SENSOR); + selectTime.add(SELECT_RATE_ATTR, SELECT_RATE_ATTR_DOC, new Rate(new Count())); + selectTime.add(IO_WAIT_TIME_AVG_ATTR, IO_WAIT_TIME_AVG_ATTR_DOC, new Avg()); + selectTime.add(IO_WAIT_RATIO_ATTR, IO_WAIT_RATIO_ATTR_DOC, new Rate(TimeUnit.NANOSECONDS)); + + Sensor ioTime = metrics.sensor(IO_TIME_SENSOR); + ioTime.add(IO_TIME_AVT_ATTR, IO_TIME_AVT_ATTR_DOC, new Avg()); + ioTime.add(IO_RATIO_ATTR, IO_RATIO_ATTR_DOC, new Rate(TimeUnit.NANOSECONDS)); + } + + static public void maybeRegisterTopicMetrics(Metrics metrics, String topic) { + // if one sensor of the metrics has been registered for the topic, + // then all other sensors should have been registered; and vice versa + String topicRecordsCountName = TOPIC_PREFIX + topic + "." 
+ RECORDS_PER_BATCH_SENSOR; + Sensor topicRecordCount = metrics.getSensor(topicRecordsCountName); + if (topicRecordCount == null) { + topicRecordCount = metrics.sensor(topicRecordsCountName); + topicRecordCount.add(TOPIC_PREFIX + topic + "." + RECORD_SEND_RATE_ATTR, new Rate()); + + String topicByteRateName = TOPIC_PREFIX + topic + "." + BYTES_PER_BATCH_SENSOR; + Sensor topicByteRate = metrics.sensor(topicByteRateName); + topicByteRate.add(TOPIC_PREFIX + topic + "." + BYTE_SEND_RATE_ATTR, new Rate()); + + String topicRetryName = TOPIC_PREFIX + topic + "." + RETRY_SENSOR; + Sensor topicRetrySensor = metrics.sensor(topicRetryName); + topicRetrySensor.add(TOPIC_PREFIX + topic + "." + RETRY_RATE_ATTR, new Rate()); + + String topicErrorName = TOPIC_PREFIX + topic + "." + TOPIC_ERROR_SENSOR; + Sensor topicErrorSensor = metrics.sensor(topicErrorName); + topicErrorSensor.add(TOPIC_PREFIX + topic + "." + ERROR_RATE_ATTR, new Rate()); + } + } + + static public void maybeRegisterNodeMetrics(Metrics metrics, int node) { + if (node >= 0) { + // if one sensor of the metrics has been registered for the node, + // then all other sensors should have been registered; and vice versa + String nodeRequestName = BROKER_PREFIX + node + "." + BYTES_SENT_SENSOR; + Sensor nodeRequest = metrics.getSensor(nodeRequestName); + if (nodeRequest == null) { + nodeRequest = metrics.sensor(nodeRequestName); + nodeRequest.add(BROKER_PREFIX + node + "." + OUTGOING_BYTE_RATE_ATTR, OUTGOING_BYTE_RATE_ATTR_DOC, new Rate()); + nodeRequest.add(BROKER_PREFIX + node + "." + REQUEST_RATE_ATTR, REQUEST_RATE_ATTR_DOC, new Rate(new Count())); + nodeRequest.add(BROKER_PREFIX + node + "." + REQUEST_SIZE_AVG_ATTR, REQUEST_SIZE_AVG_ATTR_DOC, new Avg()); + nodeRequest.add(BROKER_PREFIX + node + "." + REQUEST_SIZE_MAX_ATTR, REQUEST_SIZE_MAX_ATTR_DOC, new Max()); + + String nodeResponseName = BROKER_PREFIX + node + "." 
+ BYTES_RECEIVED_SENSOR; + Sensor nodeResponse = metrics.sensor(nodeResponseName); + nodeResponse.add(BROKER_PREFIX + node + "." + INCOMING_BYTE_RATE_ATTR, INCOMING_BYTE_RATE_ATTR_DOC, new Rate()); + nodeResponse.add(BROKER_PREFIX + node + "." + RESPONSE_RATE_ATTR, RESPONSE_RATE_ATTR_DOC, new Rate(new Count())); + + String nodeTimeName = BROKER_PREFIX + node + "." + REQUEST_LATENCY_SENSOR; + Sensor nodeRequestTime = metrics.sensor(nodeTimeName); + nodeRequestTime.add(BROKER_PREFIX + node + "." + REQUEST_LATENCY_AVG_ATTR, REQUEST_LATENCY_AVG_ATTR_DOC, new Avg()); + nodeRequestTime.add(BROKER_PREFIX + node + "." + REQUEST_LATENCY_MAX_ATTR, REQUEST_LATENCY_MAX_ATTR_DOC, new Max()); + } + } + } +} diff --git a/clients/src/main/java/org/apache/kafka/common/network/Selector.java b/clients/src/main/java/org/apache/kafka/common/network/Selector.java index 558f8b4..0df4a5d 100644 --- a/clients/src/main/java/org/apache/kafka/common/network/Selector.java +++ b/clients/src/main/java/org/apache/kafka/common/network/Selector.java @@ -25,18 +25,15 @@ import java.util.Iterator; import java.util.List; import java.util.Map; import java.util.Set; -import java.util.concurrent.TimeUnit; import org.apache.kafka.common.KafkaException; +import org.apache.kafka.common.ProducerMetrics; import org.apache.kafka.common.metrics.Measurable; import org.apache.kafka.common.metrics.MetricConfig; import org.apache.kafka.common.metrics.Metrics; import org.apache.kafka.common.metrics.Sensor; -import org.apache.kafka.common.metrics.stats.Avg; -import org.apache.kafka.common.metrics.stats.Count; -import org.apache.kafka.common.metrics.stats.Max; -import org.apache.kafka.common.metrics.stats.Rate; import org.apache.kafka.common.utils.Time; +import org.apache.kafka.common.utils.Utils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -80,6 +77,7 @@ public class Selector implements Selectable { private final List connected; private final Time time; private final SelectorMetrics sensors; + private 
final Metrics metrics; /** * Create a new selector @@ -96,6 +94,7 @@ public class Selector implements Selectable { this.completedReceives = new ArrayList(); this.connected = new ArrayList(); this.disconnected = new ArrayList(); + this.metrics = metrics; this.sensors = new SelectorMetrics(metrics); } @@ -221,6 +220,10 @@ public class Selector implements Selectable { Transmissions transmissions = transmissions(key); SocketChannel channel = channel(key); + + // register all per-broker metrics at once + ProducerMetrics.maybeRegisterNodeMetrics(this.metrics, transmissions.id); + try { /* complete any connections that have finished their handshake */ if (key.isConnectable()) { @@ -401,33 +404,15 @@ public class Selector implements Selectable { public SelectorMetrics(Metrics metrics) { this.metrics = metrics; - this.connectionClosed = this.metrics.sensor("connections-closed"); - this.connectionCreated = this.metrics.sensor("connections-created"); - this.bytesTransferred = this.metrics.sensor("bytes-sent-received"); - this.bytesSent = this.metrics.sensor("bytes-sent", bytesTransferred); - this.bytesReceived = this.metrics.sensor("bytes-received", bytesTransferred); - this.selectTime = this.metrics.sensor("select-time"); - this.ioTime = this.metrics.sensor("io-time"); - bytesTransferred.add("network-io-rate", - "The average number of network operations (reads or writes) on all connections per second.", - new Rate(new Count())); - this.bytesSent.add("outgoing-byte-rate", "The average number of outgoing bytes sent per second to all servers.", new Rate()); - this.bytesSent.add("request-rate", "The average number of requests sent per second.", new Rate(new Count())); - this.bytesSent.add("request-size-avg", "The average size of all requests in the window..", new Avg()); - this.bytesSent.add("request-size-max", "The maximum size of any request sent in the window.", new Max()); - this.bytesReceived.add("incoming-byte-rate", "Bytes/second read off all sockets", new Rate()); - 
this.bytesReceived.add("response-rate", "Responses received sent per second.", new Rate(new Count())); - this.connectionCreated.add("connection-creation-rate", "New connections established per second in the window.", new Rate()); - this.connectionClosed.add("connection-close-rate", "Connections closed per second in the window.", new Rate()); - this.selectTime.add("select-rate", - "Number of times the I/O layer checked for new I/O to perform per second", - new Rate(new Count())); - this.selectTime.add("io-wait-time-ns-avg", - "The average length of time the I/O thread speant waiting for a socket ready for reads or writes in nanoseconds.", - new Avg()); - this.selectTime.add("io-wait-ratio", "The fraction of time the I/O thread spent waiting.", new Rate(TimeUnit.NANOSECONDS)); - this.ioTime.add("io-time-ns-avg", "The average length of time for I/O per select call in nanoseconds.", new Avg()); - this.ioTime.add("io-ratio", "The fraction of time the I/O thread spent doing I/O", new Rate(TimeUnit.NANOSECONDS)); + this.connectionClosed = Utils.notNull(this.metrics.getSensor(ProducerMetrics.CONNECTION_CLOSED_SENSOR)); + this.connectionCreated = Utils.notNull(this.metrics.getSensor(ProducerMetrics.CONNECTION_CREATED_SENSOR)); + this.bytesTransferred = Utils.notNull(this.metrics.getSensor(ProducerMetrics.BYTES_TRANSFERRED_SENSOR)); + this.bytesSent = Utils.notNull(this.metrics.getSensor(ProducerMetrics.BYTES_SENT_SENSOR)); + this.bytesReceived = Utils.notNull(this.metrics.getSensor(ProducerMetrics.BYTES_RECEIVED_SENSOR)); + this.selectTime = Utils.notNull(this.metrics.getSensor(ProducerMetrics.SELECT_TIME_SENSOR)); + this.ioTime = Utils.notNull(this.metrics.getSensor(ProducerMetrics.IO_TIME_SENSOR)); + + // add additional metrics that are exposed only in selector this.metrics.addMetric("connection-count", "The current number of active connections.", new Measurable() { public double measure(MetricConfig config, long now) { return keys.size(); @@ -438,32 +423,18 @@ public 
class Selector implements Selectable { public void recordBytesSent(int node, int bytes) { this.bytesSent.record(bytes); if (node >= 0) { - String name = "node-" + node + ".bytes-sent"; - Sensor sensor = this.metrics.getSensor(name); - if (sensor == null) { - sensor = this.metrics.sensor(name); - sensor.add("node-" + node + ".outgoing-byte-rate", new Rate()); - sensor.add("node-" + node + ".request-rate", "The average number of requests sent per second.", new Rate(new Count())); - sensor.add("node-" + node + ".request-size-avg", "The average size of all requests in the window..", new Avg()); - sensor.add("node-" + node + ".request-size-max", "The maximum size of any request sent in the window.", new Max()); - } - sensor.record(bytes); + String nodeRequestName = ProducerMetrics.BROKER_PREFIX + node + "." + ProducerMetrics.BYTES_SENT_SENSOR; + Sensor nodeRequest = Utils.notNull(this.metrics.getSensor(nodeRequestName)); + nodeRequest.record(bytes); } } public void recordBytesReceived(int node, int bytes) { this.bytesReceived.record(bytes); if (node >= 0) { - String name = "node-" + node + ".bytes-received"; - Sensor sensor = this.metrics.getSensor(name); - if (sensor == null) { - sensor = this.metrics.sensor(name); - sensor.add("node-" + node + ".incoming-byte-rate", new Rate()); - sensor.add("node-" + node + ".response-rate", - "The average number of responses received per second.", - new Rate(new Count())); - } - sensor.record(bytes); + String nodeRequestName = ProducerMetrics.BROKER_PREFIX + node + "." 
+ ProducerMetrics.BYTES_RECEIVED_SENSOR; + Sensor nodeRequest = Utils.notNull(this.metrics.getSensor(nodeRequestName)); + nodeRequest.record(bytes); } } } diff --git a/clients/src/test/java/org/apache/kafka/clients/producer/SenderTest.java b/clients/src/test/java/org/apache/kafka/clients/producer/SenderTest.java index a2b7722..786f119 100644 --- a/clients/src/test/java/org/apache/kafka/clients/producer/SenderTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/producer/SenderTest.java @@ -24,6 +24,7 @@ import org.apache.kafka.clients.producer.internals.Metadata; import org.apache.kafka.clients.producer.internals.RecordAccumulator; import org.apache.kafka.clients.producer.internals.Sender; import org.apache.kafka.common.Cluster; +import org.apache.kafka.common.ProducerMetrics; import org.apache.kafka.common.TopicPartition; import org.apache.kafka.common.metrics.Metrics; import org.apache.kafka.common.network.NetworkReceive; @@ -51,31 +52,40 @@ public class SenderTest { private static final int SEND_BUFFER_SIZE = 64 * 1024; private static final int RECEIVE_BUFFER_SIZE = 64 * 1024; - private TopicPartition tp = new TopicPartition("test", 0); - private MockTime time = new MockTime(); - private MockSelector selector = new MockSelector(time); private int batchSize = 16 * 1024; + private TopicPartition tp = new TopicPartition("test", 0); private Metadata metadata = new Metadata(0, Long.MAX_VALUE); private Cluster cluster = TestUtils.singletonCluster("test", 1); - private Metrics metrics = new Metrics(time); - private RecordAccumulator accumulator = new RecordAccumulator(batchSize, 1024 * 1024, 0L, 0L, false, metrics, time); - private Sender sender = new Sender(selector, - metadata, - this.accumulator, - CLIENT_ID, - MAX_REQUEST_SIZE, - RECONNECT_BACKOFF_MS, - ACKS_ALL, - MAX_RETRIES, - REQUEST_TIMEOUT_MS, - SEND_BUFFER_SIZE, - RECEIVE_BUFFER_SIZE, - metrics, - time); + + private MockTime time; + private Metrics metrics; + private MockSelector selector; + private 
RecordAccumulator accumulator; + private Sender sender; @Before public void setup() { + this.time = new MockTime(); + this.metrics = new Metrics(time); + this.selector = new MockSelector(time); + this.accumulator = new RecordAccumulator(batchSize, 1024 * 1024, 0L, 0L, false, metrics, time); + this.sender = new Sender(selector, + metadata, + this.accumulator, + CLIENT_ID, + MAX_REQUEST_SIZE, + RECONNECT_BACKOFF_MS, + ACKS_ALL, + MAX_RETRIES, + REQUEST_TIMEOUT_MS, + SEND_BUFFER_SIZE, + RECEIVE_BUFFER_SIZE, + metrics, + time); + metadata.update(cluster, time.milliseconds()); + ProducerMetrics.registerGlobalMetrics(metrics); + ProducerMetrics.maybeRegisterNodeMetrics(metrics, 0); } @Test @@ -104,22 +114,32 @@ public class SenderTest { public void testRetries() throws Exception { // create a sender with retries = 1 int maxRetries = 1; + Metrics metrics = new Metrics(time); + //MockSelector selector = new MockSelector(metrics, time); + RecordAccumulator accumulator = new RecordAccumulator(batchSize, 1024 * 1024, 0L, 0L, false, metrics, time); + Sender sender = new Sender(selector, - metadata, - this.accumulator, - CLIENT_ID, - MAX_REQUEST_SIZE, - RECONNECT_BACKOFF_MS, - ACKS_ALL, - maxRetries, - REQUEST_TIMEOUT_MS, - SEND_BUFFER_SIZE, - RECEIVE_BUFFER_SIZE, - new Metrics(), - time); + metadata, + accumulator, + CLIENT_ID, + MAX_REQUEST_SIZE, + RECONNECT_BACKOFF_MS, + ACKS_ALL, + maxRetries, + REQUEST_TIMEOUT_MS, + SEND_BUFFER_SIZE, + RECEIVE_BUFFER_SIZE, + metrics, + time); + + ProducerMetrics.registerGlobalMetrics(metrics); + ProducerMetrics.maybeRegisterNodeMetrics(metrics, 0); + Future future = accumulator.append(tp, "key".getBytes(), "value".getBytes(), CompressionType.NONE, null); + sender.run(time.milliseconds()); RequestSend request1 = completeSend(sender); selector.clear(); + sender.run(time.milliseconds()); selector.completeReceive(produceResponse(request1.header().correlationId(), cluster.leaderFor(tp).id(), tp.topic(), diff --git 
a/clients/src/test/java/org/apache/kafka/common/network/SelectorTest.java b/clients/src/test/java/org/apache/kafka/common/network/SelectorTest.java index 5c5e3d4..9adf5da 100644 --- a/clients/src/test/java/org/apache/kafka/common/network/SelectorTest.java +++ b/clients/src/test/java/org/apache/kafka/common/network/SelectorTest.java @@ -27,6 +27,7 @@ import java.util.ArrayList; import java.util.Collections; import java.util.List; +import org.apache.kafka.common.ProducerMetrics; import org.apache.kafka.common.metrics.Metrics; import org.apache.kafka.common.utils.MockTime; import org.apache.kafka.common.utils.Utils; @@ -45,12 +46,15 @@ public class SelectorTest { private EchoServer server; private Selectable selector; + private Metrics metrics; @Before public void setup() throws Exception { this.server = new EchoServer(); this.server.start(); - this.selector = new Selector(new Metrics(), new MockTime()); + this.metrics = new Metrics(); + ProducerMetrics.registerGlobalMetrics(metrics); + this.selector = new Selector(metrics, new MockTime()); } @After