diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java b/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java index 9b1f565..8194e48 100644 --- a/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java +++ b/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java @@ -817,6 +817,10 @@ public class Sender implements Runnable { String topicByteRateName = "topic." + topic + ".bytes"; Sensor topicByteRate = this.metrics.sensor(topicByteRateName); topicByteRate.add("topic." + topic + ".byte-rate", new Rate()); + + String topicCompressionRateName = "topic." + topic + ".compression-rate"; + Sensor topicCompressionRate = this.metrics.sensor(topicCompressionRateName); + topicCompressionRate.add("topic." + topic + ".compression-rate", new Avg()); String topicRetryName = "topic." + topic + ".record-retries"; Sensor topicRetrySensor = this.metrics.sensor(topicRetryName); @@ -851,6 +855,11 @@ public class Sender implements Runnable { Sensor topicByteRate = Utils.notNull(this.metrics.getSensor(topicByteRateName)); topicByteRate.record(batch.records.sizeInBytes()); + // per-topic compression rate + String topicCompressionRateName = "topic." + topic + ".compression-rate"; + Sensor topicCompressionRate = Utils.notNull(this.metrics.getSensor(topicCompressionRateName)); + topicCompressionRate.record(batch.records.compressionRate()); + // global metrics this.batchSizeSensor.record(batch.records.sizeInBytes(), nowMs); this.queueTimeSensor.record(batch.drainedMs - batch.createdMs, nowMs); diff --git a/clients/src/main/java/org/apache/kafka/common/record/Compressor.java b/clients/src/main/java/org/apache/kafka/common/record/Compressor.java index 0fa6dd2..0323f5f 100644 --- a/clients/src/main/java/org/apache/kafka/common/record/Compressor.java +++ b/clients/src/main/java/org/apache/kafka/common/record/Compressor.java @@ -80,6 +80,14 @@ public class Compressor { public ByteBuffer buffer() { return bufferStream.buffer(); } + + public double compressionRate() { + ByteBuffer buffer = bufferStream.buffer(); + if (this.writtenUncompressed == 0) + return 1.0; + else + return (double) buffer.position() / this.writtenUncompressed; + } public void close() { try { diff --git a/clients/src/main/java/org/apache/kafka/common/record/MemoryRecords.java b/clients/src/main/java/org/apache/kafka/common/record/MemoryRecords.java index 428968c..15c9577 100644 --- a/clients/src/main/java/org/apache/kafka/common/record/MemoryRecords.java +++ b/clients/src/main/java/org/apache/kafka/common/record/MemoryRecords.java @@ -122,6 +122,16 @@ public class MemoryRecords implements Records { public int sizeInBytes() { return compressor.buffer().position(); } + + /** + * The compression rate of this record set + */ + public double compressionRate() { + if (compressor == null) + return 1.0; + else + return compressor.compressionRate(); + } /** * Return the capacity of the buffer