diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java b/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java index 9b1f565..8013dbd 100644 --- a/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java +++ b/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java @@ -3,9 +3,9 @@ * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the * License. You may obtain a copy of the License at - * + * * http://www.apache.org/licenses/LICENSE-2.0 - * + * * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the * specific language governing permissions and limitations under the License. @@ -179,7 +179,7 @@ public class Sender implements Runnable { /** * Run a single iteration of sending - * + * * @param nowMs The current POSIX time in milliseconds */ public void run(long nowMs) { @@ -715,7 +715,7 @@ public class Sender implements Runnable { /** * Can we send more requests to this node? - * + * * @param node Node in question * @return true iff we have no requests still being sent to the given node */ @@ -727,7 +727,7 @@ public class Sender implements Runnable { /** * Clear out all the in-flight requests for the given node and return them - * + * * @param node The node * @return All the in-flight requests for that node that have been removed */ @@ -760,6 +760,7 @@ public class Sender implements Runnable { public final Sensor requestTimeSensor; public final Sensor recordsPerRequestSensor; public final Sensor batchSizeSensor; + public final Sensor compressionRateSensor; public final Sensor maxRecordSizeSensor; public SenderMetrics(Metrics metrics) { @@ -768,6 +769,11 @@ public class Sender implements Runnable { this.batchSizeSensor = metrics.sensor("batch-size"); this.batchSizeSensor.add("batch-size-avg", "The average number of bytes sent per partition per-request.", new Avg()); + this.compressionRateSensor = metrics.sensor("compression-rate"); + this.compressionRateSensor.add("batch-compression-rate-avg", + "The average compression rate of record batches.", + new Avg()); + this.queueTimeSensor = metrics.sensor("queue-time"); this.queueTimeSensor.add("record-queue-time-avg", "The average time in ms record batches spent in the record accumulator.", @@ -818,6 +824,10 @@ public class Sender implements Runnable { Sensor topicByteRate = this.metrics.sensor(topicByteRateName); topicByteRate.add("topic." + topic + ".byte-rate", new Rate()); + String topicCompressionRateName = "topic." + topic + ".compression-rate"; + Sensor topicCompressionRate = this.metrics.sensor(topicCompressionRateName); + topicCompressionRate.add("topic." + topic + ".compression-rate", new Avg()); + String topicRetryName = "topic." + topic + ".record-retries"; Sensor topicRetrySensor = this.metrics.sensor(topicRetryName); topicRetrySensor.add("topic." + topic + ".record-retry-rate", new Rate()); @@ -851,9 +861,15 @@ public class Sender implements Runnable { Sensor topicByteRate = Utils.notNull(this.metrics.getSensor(topicByteRateName)); topicByteRate.record(batch.records.sizeInBytes()); + // per-topic compression rate + String topicCompressionRateName = "topic." + topic + ".compression-rate"; + Sensor topicCompressionRate = Utils.notNull(this.metrics.getSensor(topicCompressionRateName)); + topicCompressionRate.record(batch.records.compressionRate()); + // global metrics this.batchSizeSensor.record(batch.records.sizeInBytes(), nowMs); this.queueTimeSensor.record(batch.drainedMs - batch.createdMs, nowMs); + this.compressionRateSensor.record(batch.records.compressionRate()); this.maxRecordSizeSensor.record(batch.maxRecordSize, nowMs); records += batch.recordCount; } @@ -876,7 +892,7 @@ public class Sender implements Runnable { this.errorSensor.record(count, nowMs); String topicErrorName = "topic." + topic + ".record-errors"; Sensor topicErrorSensor = this.metrics.getSensor(topicErrorName); - if (topicErrorSensor != null) + if (topicErrorSensor != null) topicErrorSensor.record(count, nowMs); } diff --git a/clients/src/main/java/org/apache/kafka/common/record/Compressor.java b/clients/src/main/java/org/apache/kafka/common/record/Compressor.java index 0fa6dd2..0323f5f 100644 --- a/clients/src/main/java/org/apache/kafka/common/record/Compressor.java +++ b/clients/src/main/java/org/apache/kafka/common/record/Compressor.java @@ -80,6 +80,14 @@ public class Compressor { public ByteBuffer buffer() { return bufferStream.buffer(); } + + public double compressionRate() { + ByteBuffer buffer = bufferStream.buffer(); + if (this.writtenUncompressed == 0) + return 1.0; + else + return (double) buffer.position() / this.writtenUncompressed; + } public void close() { try { diff --git a/clients/src/main/java/org/apache/kafka/common/record/MemoryRecords.java b/clients/src/main/java/org/apache/kafka/common/record/MemoryRecords.java index 428968c..15c9577 100644 --- a/clients/src/main/java/org/apache/kafka/common/record/MemoryRecords.java +++ b/clients/src/main/java/org/apache/kafka/common/record/MemoryRecords.java @@ -122,6 +122,16 @@ public class MemoryRecords implements Records { public int sizeInBytes() { return compressor.buffer().position(); } + + /** + * The compression rate of this record set + */ + public double compressionRate() { + if (compressor == null) + return 1.0; + else + return compressor.compressionRate(); + } /** * Return the capacity of the buffer