diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java b/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java index 855ae84..9f2b2e9 100644 --- a/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java +++ b/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java @@ -774,27 +774,35 @@ public class Sender implements Runnable { public SenderMetrics(Metrics metrics) { this.metrics = metrics; + this.batchSizeSensor = metrics.sensor("batch-size"); + this.batchSizeSensor.add("batch-size-avg", "The average number of bytes sent per partition per-request.", new Avg()); + this.queueTimeSensor = metrics.sensor("queue-time"); + this.queueTimeSensor.add("record-queue-time-avg", + "The average time in ms record batches spent in the record accumulator.", + new Avg()); + this.queueTimeSensor.add("record-queue-time-max", + "The maximum time in ms record batches spent in the record accumulator.", + new Max()); + this.requestTimeSensor = metrics.sensor("request-time"); - this.recordsPerRequestSensor = metrics.sensor("records-per-request"); - this.retrySensor = metrics.sensor("record-retries"); - this.errorSensor = metrics.sensor("errors"); - this.maxRecordSizeSensor = metrics.sensor("record-size-max"); - this.batchSizeSensor.add("batch-size-avg", "The average number of bytes sent per partition per-request.", new Avg()); - this.retrySensor.add("record-retry-rate", "The average per-second number of retried record sends", new Rate()); this.requestTimeSensor.add("request-latency-avg", "The average request latency in ms", new Avg()); this.requestTimeSensor.add("request-latency-max", "The maximum request latency in ms", new Max()); - this.queueTimeSensor.add("record-queue-time-avg", - "The average time in ms record batches spent in the record accumulator.", - new Avg()); - this.queueTimeSensor.add("record-queue-time-max", - "The maximum time in ms record batches spent in the record accumulator.", - new Max()); - this.errorSensor.add("record-error-rate", "The average per-second number of record sends that resulted in errors", new Rate()); + + this.recordsPerRequestSensor = metrics.sensor("records-per-request"); this.recordsPerRequestSensor.add("record-send-rate", "The average number of records sent per second.", new Rate()); this.recordsPerRequestSensor.add("records-per-request-avg", "The average number of records per request.", new Avg()); + + this.retrySensor = metrics.sensor("record-retries"); + this.retrySensor.add("record-retry-rate", "The average per-second number of retried record sends", new Rate()); + + this.errorSensor = metrics.sensor("errors"); + this.errorSensor.add("record-error-rate", "The average per-second number of record sends that resulted in errors", new Rate()); + + this.maxRecordSizeSensor = metrics.sensor("record-size-max"); this.maxRecordSizeSensor.add("record-size-max", "The maximum record size", new Max()); + this.metrics.addMetric("requests-in-flight", "The current number of in-flight requests awaiting a response.", new Measurable() { public double measure(MetricConfig config, long now) { return inFlightRequests.totalInFlightRequests(); @@ -807,32 +815,53 @@ public class Sender implements Runnable { }); } + public void maybeRegisterTopicMetrics(String topic) { + // if one sensor of the metrics has been registered for the topic, + // then all other sensors should have been registered; and vice versa + String topicRecordsCountName = "topic." + topic + ".records-per-batch"; + Sensor topicRecordCount = this.metrics.getSensor(topicRecordsCountName); + if (topicRecordCount == null) { + topicRecordCount = this.metrics.sensor(topicRecordsCountName); + topicRecordCount.add("topic." + topic + ".record-send-rate", new Rate()); + + String topicByteRateName = "topic." + topic + ".bytes"; + Sensor topicByteRate = this.metrics.sensor(topicByteRateName); + topicByteRate.add("topic." + topic + ".byte-rate", new Rate()); + + String topicRetryName = "topic." + topic + ".record-retries"; + Sensor topicRetrySensor = this.metrics.sensor(topicRetryName); + topicRetrySensor.add("topic." + topic + ".record-retry-rate", new Rate()); + + String topicErrorName = "topic." + topic + ".record-errors"; + Sensor topicErrorSensor = this.metrics.sensor(topicErrorName); + topicErrorSensor.add("topic." + topic + ".record-error-rate", new Rate()); + } + } + public void updateProduceRequestMetrics(List requests) { long ns = time.nanoseconds(); for (int i = 0; i < requests.size(); i++) { InFlightRequest request = requests.get(i); int records = 0; + if (request.batches != null) { for (RecordBatch batch : request.batches.values()) { - // per-topic record count - String topicRecordsCountName = "topic." + batch.topicPartition.topic() + ".records-per-batch"; - Sensor topicRecordCount = this.metrics.getSensor(topicRecordsCountName); - if (topicRecordCount == null) { - topicRecordCount = this.metrics.sensor(topicRecordsCountName); - topicRecordCount.add("topic." + batch.topicPartition.topic() + ".record-send-rate", new Rate()); - } + // register all per-topic metrics at once + String topic = batch.topicPartition.topic(); + maybeRegisterTopicMetrics(topic); + + // per-topic record send rate + String topicRecordsCountName = "topic." + topic + ".records-per-batch"; + Sensor topicRecordCount = Utils.notNull(this.metrics.getSensor(topicRecordsCountName)); topicRecordCount.record(batch.recordCount); - // per-topic bytes-per-second - String topicByteRateName = "topic." + batch.topicPartition.topic() + ".bytes"; - Sensor topicByteRate = this.metrics.getSensor(topicByteRateName); - if (topicByteRate == null) { - topicByteRate = this.metrics.sensor(topicByteRateName); - topicByteRate.add("topic." + batch.topicPartition.topic() + ".byte-rate", new Rate()); - } + // per-topic bytes send rate + String topicByteRateName = "topic." + topic + ".bytes"; + Sensor topicByteRate = Utils.notNull(this.metrics.getSensor(topicByteRateName)); topicByteRate.record(batch.records.sizeInBytes()); + // global metrics this.batchSizeSensor.record(batch.records.sizeInBytes(), ns); this.queueTimeSensor.record(batch.drained - batch.created, ns); this.maxRecordSizeSensor.record(batch.maxRecordSize, ns); @@ -847,35 +876,22 @@ public class Sender implements Runnable { this.retrySensor.record(count); String topicRetryName = "topic." + topic + ".record-retries"; Sensor topicRetrySensor = this.metrics.getSensor(topicRetryName); - if (topicRetrySensor == null) { - topicRetrySensor = this.metrics.sensor(topicRetryName); - topicRetrySensor.add("topic." + topic + ".record-retry-rate", new Rate()); - } - topicRetrySensor.record(count); + if (topicRetrySensor != null) topicRetrySensor.record(count); } public void recordErrors(String topic, int count) { this.errorSensor.record(count); String topicErrorName = "topic." + topic + ".record-errors"; Sensor topicErrorSensor = this.metrics.getSensor(topicErrorName); - if (topicErrorSensor == null) { - topicErrorSensor = this.metrics.sensor(topicErrorName); - topicErrorSensor.add("topic." + topic + ".record-error-rate", new Rate()); - } - topicErrorSensor.record(count); + if (topicErrorSensor != null) topicErrorSensor.record(count); } public void recordLatency(int node, long latency, long nowNs) { this.requestTimeSensor.record(latency, nowNs); if (node >= 0) { - String nodeTimeName = "server." + node + ".latency"; + String nodeTimeName = "node-" + node + ".latency"; Sensor nodeRequestTime = this.metrics.getSensor(nodeTimeName); - if (nodeRequestTime == null) { - nodeRequestTime = this.metrics.sensor(nodeTimeName); - nodeRequestTime.add("node-" + node + ".latency-avg", new Avg()); - nodeRequestTime.add("node-" + node + ".latency-max", new Max()); - } - nodeRequestTime.record(latency, nowNs); + if (nodeRequestTime != null) nodeRequestTime.record(latency, nowNs); } } } diff --git a/clients/src/main/java/org/apache/kafka/common/network/Selector.java b/clients/src/main/java/org/apache/kafka/common/network/Selector.java index 558f8b4..6027cb2 100644 --- a/clients/src/main/java/org/apache/kafka/common/network/Selector.java +++ b/clients/src/main/java/org/apache/kafka/common/network/Selector.java @@ -221,6 +221,10 @@ public class Selector implements Selectable { Transmissions transmissions = transmissions(key); SocketChannel channel = channel(key); + + // register all per-broker metrics at once + sensors.maybeRegisterNodeMetrics(transmissions.id); + try { /* complete any connections that have finished their handshake */ if (key.isConnectable()) { @@ -401,33 +405,41 @@ public class Selector implements Selectable { public SelectorMetrics(Metrics metrics) { this.metrics = metrics; + this.connectionClosed = this.metrics.sensor("connections-closed"); + this.connectionClosed.add("connection-close-rate", "Connections closed per second in the window.", new Rate()); + this.connectionCreated = this.metrics.sensor("connections-created"); + this.connectionCreated.add("connection-creation-rate", "New connections established per second in the window.", new Rate()); + this.bytesTransferred = this.metrics.sensor("bytes-sent-received"); - this.bytesSent = this.metrics.sensor("bytes-sent", bytesTransferred); - this.bytesReceived = this.metrics.sensor("bytes-received", bytesTransferred); - this.selectTime = this.metrics.sensor("select-time"); - this.ioTime = this.metrics.sensor("io-time"); bytesTransferred.add("network-io-rate", - "The average number of network operations (reads or writes) on all connections per second.", - new Rate(new Count())); + "The average number of network operations (reads or writes) on all connections per second.", + new Rate(new Count())); + + this.bytesSent = this.metrics.sensor("bytes-sent", bytesTransferred); this.bytesSent.add("outgoing-byte-rate", "The average number of outgoing bytes sent per second to all servers.", new Rate()); this.bytesSent.add("request-rate", "The average number of requests sent per second.", new Rate(new Count())); this.bytesSent.add("request-size-avg", "The average size of all requests in the window..", new Avg()); this.bytesSent.add("request-size-max", "The maximum size of any request sent in the window.", new Max()); + + this.bytesReceived = this.metrics.sensor("bytes-received", bytesTransferred); this.bytesReceived.add("incoming-byte-rate", "Bytes/second read off all sockets", new Rate()); this.bytesReceived.add("response-rate", "Responses received sent per second.", new Rate(new Count())); - this.connectionCreated.add("connection-creation-rate", "New connections established per second in the window.", new Rate()); - this.connectionClosed.add("connection-close-rate", "Connections closed per second in the window.", new Rate()); + + this.selectTime = this.metrics.sensor("select-time"); this.selectTime.add("select-rate", - "Number of times the I/O layer checked for new I/O to perform per second", - new Rate(new Count())); + "Number of times the I/O layer checked for new I/O to perform per second", + new Rate(new Count())); this.selectTime.add("io-wait-time-ns-avg", - "The average length of time the I/O thread speant waiting for a socket ready for reads or writes in nanoseconds.", - new Avg()); + "The average length of time the I/O thread spent waiting for a socket ready for reads or writes in nanoseconds.", + new Avg()); this.selectTime.add("io-wait-ratio", "The fraction of time the I/O thread spent waiting.", new Rate(TimeUnit.NANOSECONDS)); + + this.ioTime = this.metrics.sensor("io-time"); this.ioTime.add("io-time-ns-avg", "The average length of time for I/O per select call in nanoseconds.", new Avg()); this.ioTime.add("io-ratio", "The fraction of time the I/O thread spent doing I/O", new Rate(TimeUnit.NANOSECONDS)); + this.metrics.addMetric("connection-count", "The current number of active connections.", new Measurable() { public double measure(MetricConfig config, long now) { return keys.size(); @@ -435,35 +447,49 @@ public class Selector implements Selectable { }); } + public void maybeRegisterNodeMetrics(int node) { + if (node >= 0) { + // if one sensor of the metrics has been registered for the node, + // then all other sensors should have been registered; and vice versa + String nodeRequestName = "node-" + node + ".bytes-sent"; + Sensor nodeRequest = this.metrics.getSensor(nodeRequestName); + if (nodeRequest == null) { + nodeRequest = this.metrics.sensor(nodeRequestName); + nodeRequest.add("node-" + node + ".outgoing-byte-rate", new Rate()); + nodeRequest.add("node-" + node + ".request-rate", "The average number of requests sent per second.", new Rate(new Count())); + nodeRequest.add("node-" + node + ".request-size-avg", "The average size of all requests in the window..", new Avg()); + nodeRequest.add("node-" + node + ".request-size-max", "The maximum size of any request sent in the window.", new Max()); + + String nodeResponseName = "node-" + node + ".bytes-received"; + Sensor nodeResponse = this.metrics.sensor(nodeResponseName); + nodeResponse.add("node-" + node + ".incoming-byte-rate", new Rate()); + nodeResponse.add("node-" + node + ".response-rate", + "The average number of responses received per second.", + new Rate(new Count())); + + String nodeTimeName = "node-" + node + ".latency"; + Sensor nodeRequestTime = this.metrics.sensor(nodeTimeName); + nodeRequestTime.add("node-" + node + ".request-latency-avg", new Avg()); + nodeRequestTime.add("node-" + node + ".request-latency-max", new Max()); + } + } + } + public void recordBytesSent(int node, int bytes) { this.bytesSent.record(bytes); if (node >= 0) { - String name = "node-" + node + ".bytes-sent"; - Sensor sensor = this.metrics.getSensor(name); - if (sensor == null) { - sensor = this.metrics.sensor(name); - sensor.add("node-" + node + ".outgoing-byte-rate", new Rate()); - sensor.add("node-" + node + ".request-rate", "The average number of requests sent per second.", new Rate(new Count())); - sensor.add("node-" + node + ".request-size-avg", "The average size of all requests in the window..", new Avg()); - sensor.add("node-" + node + ".request-size-max", "The maximum size of any request sent in the window.", new Max()); - } - sensor.record(bytes); + String nodeRequestName = "node-" + node + ".bytes-sent"; + Sensor nodeRequest = this.metrics.getSensor(nodeRequestName); + if (nodeRequest != null) nodeRequest.record(bytes); } } public void recordBytesReceived(int node, int bytes) { this.bytesReceived.record(bytes); if (node >= 0) { - String name = "node-" + node + ".bytes-received"; - Sensor sensor = this.metrics.getSensor(name); - if (sensor == null) { - sensor = this.metrics.sensor(name); - sensor.add("node-" + node + ".incoming-byte-rate", new Rate()); - sensor.add("node-" + node + ".response-rate", - "The average number of responses received per second.", - new Rate(new Count())); - } - sensor.record(bytes); + String nodeRequestName = "node-" + node + ".bytes-received"; + Sensor nodeRequest = this.metrics.getSensor(nodeRequestName); + if (nodeRequest != null) nodeRequest.record(bytes); } } }