From cf2d39b32332174cacdcc7bd9ca2adff7d6b3a84 Mon Sep 17 00:00:00 2001
From: Grant Henke
Date: Tue, 24 Mar 2015 10:50:37 -0500
Subject: [PATCH 1/2] CompressionType is passed in each RecordAccumulator append

---
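Notes:

The codec comes from the producer-level "compression.type" config, so it is
fixed for the lifetime of a producer. Passing it to every append() call only
repeated a value the accumulator can hold itself; this patch moves it to the
RecordAccumulator constructor. A minimal sketch of the resulting call sites
(argument values and the Metrics/SystemTime wiring are illustrative, mirroring
the test setup in this patch; RecordAccumulator is an internal class, so this
is not public API):

    import java.util.LinkedHashMap;
    import java.util.Map;

    import org.apache.kafka.clients.producer.internals.RecordAccumulator;
    import org.apache.kafka.common.TopicPartition;
    import org.apache.kafka.common.metrics.Metrics;
    import org.apache.kafka.common.record.CompressionType;
    import org.apache.kafka.common.utils.SystemTime;

    public class AppendSketch {
        public static void main(String[] args) throws InterruptedException {
            Map<String, String> metricTags = new LinkedHashMap<String, String>();
            metricTags.put("client-id", "sketch");
            // The codec is supplied once, at construction time...
            RecordAccumulator accum = new RecordAccumulator(1024, 10 * 1024,
                    CompressionType.GZIP, 10L, 100L, false, new Metrics(),
                    new SystemTime(), metricTags);
            // ...so append() no longer repeats it for every record.
            accum.append(new TopicPartition("topic", 0),
                    "key".getBytes(), "value".getBytes(), null);
        }
    }
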
 .../kafka/clients/producer/KafkaProducer.java      |  6 ++--
 .../producer/internals/RecordAccumulator.java      |  7 +++--
 .../producer/internals/RecordAccumulatorTest.java  | 34 +++++++++++-----------
 .../clients/producer/internals/SenderTest.java     |  8 ++---
 4 files changed, 29 insertions(+), 26 deletions(-)

diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java b/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
index ab26342..07ed531 100644
--- a/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
+++ b/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
@@ -131,7 +131,6 @@ public class KafkaProducer<K, V> implements Producer<K, V> {
     private final Sender sender;
     private final Metrics metrics;
     private final Thread ioThread;
-    private final CompressionType compressionType;
     private final Sensor errors;
     private final Time time;
     private final Serializer<K> keySerializer;
@@ -211,11 +210,12 @@ public class KafkaProducer<K, V> implements Producer<K, V> {
         this.metadata = new Metadata(retryBackoffMs, config.getLong(ProducerConfig.METADATA_MAX_AGE_CONFIG));
         this.maxRequestSize = config.getInt(ProducerConfig.MAX_REQUEST_SIZE_CONFIG);
         this.totalMemorySize = config.getLong(ProducerConfig.BUFFER_MEMORY_CONFIG);
-        this.compressionType = CompressionType.forName(config.getString(ProducerConfig.COMPRESSION_TYPE_CONFIG));
+        CompressionType compressionType = CompressionType.forName(config.getString(ProducerConfig.COMPRESSION_TYPE_CONFIG));
         Map<String, String> metricTags = new LinkedHashMap<String, String>();
         metricTags.put("client-id", clientId);
         this.accumulator = new RecordAccumulator(config.getInt(ProducerConfig.BATCH_SIZE_CONFIG),
                 this.totalMemorySize,
+                compressionType,
                 config.getLong(ProducerConfig.LINGER_MS_CONFIG),
                 retryBackoffMs,
                 config.getBoolean(ProducerConfig.BLOCK_ON_BUFFER_FULL_CONFIG),
@@ -376,7 +376,7 @@ public class KafkaProducer<K, V> implements Producer<K, V> {
             ensureValidRecordSize(serializedSize);
             TopicPartition tp = new TopicPartition(record.topic(), partition);
             log.trace("Sending record {} with callback {} to topic {} partition {}", record, callback, record.topic(), partition);
-            RecordAccumulator.RecordAppendResult result = accumulator.append(tp, serializedKey, serializedValue, compressionType, callback);
+            RecordAccumulator.RecordAppendResult result = accumulator.append(tp, serializedKey, serializedValue, callback);
             if (result.batchIsFull || result.newBatchCreated) {
                 log.trace("Waking up the sender since topic {} partition {} is either full or getting a new batch", record.topic(), partition);
                 this.sender.wakeup();
diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java b/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java
index 88b4e4f..0e7ab29 100644
--- a/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java
+++ b/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java
@@ -59,6 +59,7 @@ public final class RecordAccumulator {
     private volatile AtomicInteger flushesInProgress;
     private int drainIndex;
     private final int batchSize;
+    private final CompressionType compression;
     private final long lingerMs;
     private final long retryBackoffMs;
     private final BufferPool free;
@@ -71,6 +72,7 @@ public final class RecordAccumulator {
      *
      * @param batchSize The size to use when allocating {@link org.apache.kafka.common.record.MemoryRecords} instances
      * @param totalSize The maximum memory the record accumulator can use.
+     * @param compression The compression codec for the records
      * @param lingerMs An artificial delay time to add before declaring a records instance that isn't full ready for
      *        sending. This allows time for more records to arrive. Setting a non-zero lingerMs will trade off some
      *        latency for potentially better throughput due to more batching (and hence fewer, larger requests).
@@ -84,6 +86,7 @@ public final class RecordAccumulator {
      */
     public RecordAccumulator(int batchSize,
                              long totalSize,
+                             CompressionType compression,
                              long lingerMs,
                              long retryBackoffMs,
                              boolean blockOnBufferFull,
@@ -94,6 +97,7 @@ public final class RecordAccumulator {
         this.closed = false;
         this.flushesInProgress = new AtomicInteger(0);
         this.batchSize = batchSize;
+        this.compression = compression;
         this.lingerMs = lingerMs;
         this.retryBackoffMs = retryBackoffMs;
         this.batches = new CopyOnWriteMap<TopicPartition, Deque<RecordBatch>>();
@@ -139,10 +143,9 @@ public final class RecordAccumulator {
      * @param tp The topic/partition to which this record is being sent
      * @param key The key for the record
      * @param value The value for the record
-     * @param compression The compression codec for the record
      * @param callback The user-supplied callback to execute when the request is complete
      */
-    public RecordAppendResult append(TopicPartition tp, byte[] key, byte[] value, CompressionType compression, Callback callback) throws InterruptedException {
+    public RecordAppendResult append(TopicPartition tp, byte[] key, byte[] value, Callback callback) throws InterruptedException {
         if (closed)
             throw new IllegalStateException("Cannot send after the producer is closed.");
         // check if we have an in-progress batch
diff --git a/clients/src/test/java/org/apache/kafka/clients/producer/internals/RecordAccumulatorTest.java b/clients/src/test/java/org/apache/kafka/clients/producer/internals/RecordAccumulatorTest.java
index e379ac8..05e2929 100644
--- a/clients/src/test/java/org/apache/kafka/clients/producer/internals/RecordAccumulatorTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/producer/internals/RecordAccumulatorTest.java
@@ -64,13 +64,13 @@
     @Test
     public void testFull() throws Exception {
         long now = time.milliseconds();
-        RecordAccumulator accum = new RecordAccumulator(1024, 10 * 1024, 10L, 100L, false, metrics, time, metricTags);
+        RecordAccumulator accum = new RecordAccumulator(1024, 10 * 1024, CompressionType.NONE, 10L, 100L, false, metrics, time, metricTags);
         int appends = 1024 / msgSize;
         for (int i = 0; i < appends; i++) {
-            accum.append(tp1, key, value, CompressionType.NONE, null);
+            accum.append(tp1, key, value, null);
             assertEquals("No partitions should be ready.", 0, accum.ready(cluster, now).readyNodes.size());
         }
-        accum.append(tp1, key, value, CompressionType.NONE, null);
+        accum.append(tp1, key, value, null);
         assertEquals("Our partition's leader should be ready", Collections.singleton(node1), accum.ready(cluster, time.milliseconds()).readyNodes);
         List<RecordBatch> batches = accum.drain(cluster, Collections.singleton(node1), Integer.MAX_VALUE, 0).get(node1.id());
         assertEquals(1, batches.size());
@@ -88,16 +88,16 @@
     @Test
     public void testAppendLarge() throws Exception {
         int batchSize = 512;
-        RecordAccumulator accum = new RecordAccumulator(batchSize, 10 * 1024, 0L, 100L, false, metrics, time, metricTags);
-        accum.append(tp1, key, new byte[2 * batchSize], CompressionType.NONE, null);
+        RecordAccumulator accum = new RecordAccumulator(batchSize, 10 * 1024, CompressionType.NONE, 0L, 100L, false, metrics, time, metricTags);
+        accum.append(tp1, key, new byte[2 * batchSize], null);
         assertEquals("Our partition's leader should be ready", Collections.singleton(node1), accum.ready(cluster, time.milliseconds()).readyNodes);
     }

     @Test
     public void testLinger() throws Exception {
         long lingerMs = 10L;
-        RecordAccumulator accum = new RecordAccumulator(1024, 10 * 1024, lingerMs, 100L, false, metrics, time, metricTags);
-        accum.append(tp1, key, value, CompressionType.NONE, null);
+        RecordAccumulator accum = new RecordAccumulator(1024, 10 * 1024, CompressionType.NONE, lingerMs, 100L, false, metrics, time, metricTags);
+        accum.append(tp1, key, value, null);
         assertEquals("No partitions should be ready", 0, accum.ready(cluster, time.milliseconds()).readyNodes.size());
         time.sleep(10);
         assertEquals("Our partition's leader should be ready", Collections.singleton(node1), accum.ready(cluster, time.milliseconds()).readyNodes);
@@ -114,12 +114,12 @@

     @Test
     public void testPartialDrain() throws Exception {
-        RecordAccumulator accum = new RecordAccumulator(1024, 10 * 1024, 10L, 100L, false, metrics, time, metricTags);
+        RecordAccumulator accum = new RecordAccumulator(1024, 10 * 1024, CompressionType.NONE, 10L, 100L, false, metrics, time, metricTags);
         int appends = 1024 / msgSize + 1;
         List<TopicPartition> partitions = asList(tp1, tp2);
         for (TopicPartition tp : partitions) {
             for (int i = 0; i < appends; i++)
-                accum.append(tp, key, value, CompressionType.NONE, null);
+                accum.append(tp, key, value, null);
         }
         assertEquals("Partition's leader should be ready", Collections.singleton(node1), accum.ready(cluster, time.milliseconds()).readyNodes);
@@ -133,14 +133,14 @@
         final int numThreads = 5;
         final int msgs = 10000;
         final int numParts = 2;
-        final RecordAccumulator accum = new RecordAccumulator(1024, 10 * 1024, 0L, 100L, true, metrics, time, metricTags);
+        final RecordAccumulator accum = new RecordAccumulator(1024, 10 * 1024, CompressionType.NONE, 0L, 100L, true, metrics, time, metricTags);
         List<Thread> threads = new ArrayList<Thread>();
         for (int i = 0; i < numThreads; i++) {
             threads.add(new Thread() {
                 public void run() {
                     for (int i = 0; i < msgs; i++) {
                         try {
-                            accum.append(new TopicPartition(topic, i % numParts), key, value, CompressionType.NONE, null);
+                            accum.append(new TopicPartition(topic, i % numParts), key, value, null);
                         } catch (Exception e) {
                             e.printStackTrace();
                         }
@@ -174,13 +174,13 @@
     public void testNextReadyCheckDelay() throws Exception {
         // Next check time will use lingerMs since this test won't trigger any retries/backoff
         long lingerMs = 10L;
-        RecordAccumulator accum = new RecordAccumulator(1024, 10 * 1024, lingerMs, 100L, false, metrics, time, metricTags);
+        RecordAccumulator accum = new RecordAccumulator(1024, 10 * 1024, CompressionType.NONE, lingerMs, 100L, false, metrics, time, metricTags);
         // Just short of going over the limit so we trigger linger time
         int appends = 1024 / msgSize;

         // Partition on node1 only
         for (int i = 0; i < appends; i++)
-            accum.append(tp1, key, value, CompressionType.NONE, null);
+            accum.append(tp1, key, value, null);
         RecordAccumulator.ReadyCheckResult result = accum.ready(cluster, time.milliseconds());
         assertEquals("No nodes should be ready.", 0, result.readyNodes.size());
         assertEquals("Next check time should be the linger time", lingerMs, result.nextReadyCheckDelayMs);
@@ -189,14 +189,14 @@

         // Add partition on node2 only
         for (int i = 0; i < appends; i++)
-            accum.append(tp3, key, value, CompressionType.NONE, null);
+            accum.append(tp3, key, value, null);
         result = accum.ready(cluster, time.milliseconds());
         assertEquals("No nodes should be ready.", 0, result.readyNodes.size());
         assertEquals("Next check time should be defined by node1, half remaining linger time", lingerMs / 2, result.nextReadyCheckDelayMs);

         // Add data for another partition on node1, enough to make data sendable immediately
         for (int i = 0; i < appends + 1; i++)
-            accum.append(tp2, key, value, CompressionType.NONE, null);
+            accum.append(tp2, key, value, null);
         result = accum.ready(cluster, time.milliseconds());
         assertEquals("Node1 should be ready", Collections.singleton(node1), result.readyNodes);
         // Note this can actually be < linger time because it may use delays from partitions that aren't sendable
@@ -207,9 +207,9 @@
     @Test
     public void testFlush() throws Exception {
         long lingerMs = Long.MAX_VALUE;
-        final RecordAccumulator accum = new RecordAccumulator(4 * 1024, 64 * 1024, lingerMs, 100L, false, metrics, time, metricTags);
+        final RecordAccumulator accum = new RecordAccumulator(4 * 1024, 64 * 1024, CompressionType.NONE, lingerMs, 100L, false, metrics, time, metricTags);
         for (int i = 0; i < 100; i++)
-            accum.append(new TopicPartition(topic, i % 3), key, value, CompressionType.NONE, null);
+            accum.append(new TopicPartition(topic, i % 3), key, value, null);
         RecordAccumulator.ReadyCheckResult result = accum.ready(cluster, time.milliseconds());
         assertEquals("No nodes should be ready.", 0, result.readyNodes.size());
diff --git a/clients/src/test/java/org/apache/kafka/clients/producer/internals/SenderTest.java b/clients/src/test/java/org/apache/kafka/clients/producer/internals/SenderTest.java
index 24274a6..8b1805d 100644
--- a/clients/src/test/java/org/apache/kafka/clients/producer/internals/SenderTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/producer/internals/SenderTest.java
@@ -52,7 +52,7 @@ public class SenderTest {
     private Cluster cluster = TestUtils.singletonCluster("test", 1);
     private Metrics metrics = new Metrics(time);
     Map<String, String> metricTags = new LinkedHashMap<String, String>();
-    private RecordAccumulator accumulator = new RecordAccumulator(batchSize, 1024 * 1024, 0L, 0L, false, metrics, time, metricTags);
+    private RecordAccumulator accumulator = new RecordAccumulator(batchSize, 1024 * 1024, CompressionType.NONE, 0L, 0L, false, metrics, time, metricTags);
     private Sender sender = new Sender(client,
                                        metadata,
                                        this.accumulator,
@@ -72,7 +72,7 @@ public class SenderTest {
     @Test
     public void testSimple() throws Exception {
         long offset = 0;
-        Future<RecordMetadata> future = accumulator.append(tp, "key".getBytes(), "value".getBytes(), CompressionType.NONE, null).future;
+        Future<RecordMetadata> future = accumulator.append(tp, "key".getBytes(), "value".getBytes(), null).future;
         sender.run(time.milliseconds()); // connect
         sender.run(time.milliseconds()); // send produce request
         assertEquals("We should have a single produce request in flight.", 1, client.inFlightRequestCount());
@@ -99,7 +99,7 @@ public class SenderTest {
                                    time,
                                    "clientId");
         // do a successful retry
-        Future<RecordMetadata> future = accumulator.append(tp, "key".getBytes(), "value".getBytes(), CompressionType.NONE, null).future;
+        Future<RecordMetadata> future = accumulator.append(tp, "key".getBytes(), "value".getBytes(), null).future;
         sender.run(time.milliseconds()); // connect
         sender.run(time.milliseconds()); // send produce request
         assertEquals(1, client.inFlightRequestCount());
@@ -116,7 +116,7 @@ public class SenderTest {
         assertEquals(offset, future.get().offset());

         // do an unsuccessful retry
-        future = accumulator.append(tp, "key".getBytes(), "value".getBytes(), CompressionType.NONE, null).future;
+        future = accumulator.append(tp, "key".getBytes(), "value".getBytes(), null).future;
         sender.run(time.milliseconds()); // send produce request
         for (int i = 0; i < maxRetries + 1; i++) {
             client.disconnect(client.requests().peek().request().destination());
-- 
2.2.2

From 5ed7a212f1158495b4788c3fee66b72803be9aa5 Mon Sep 17 00:00:00 2001
From: Grant Henke
Date: Wed, 25 Mar 2015 13:19:40 -0500
Subject: [PATCH 2/2] Promote compressionType to class variable

---
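Notes:

Keeping the parsed codec in a private final field, rather than a constructor
local, makes the configured value available to the rest of the class without
re-reading the config. A minimal sketch of that trade-off (KafkaProducerLike
and the accessor are hypothetical names for this note, not part of the patch):

    import org.apache.kafka.common.record.CompressionType;

    public class KafkaProducerLike {
        private final CompressionType compressionType;

        public KafkaProducerLike(String configuredType) {
            // Parsed once in the constructor, as in the hunk below...
            this.compressionType = CompressionType.forName(configuredType);
        }

        // ...so later methods can reuse the value without touching the
        // config again (hypothetical accessor, shown for illustration).
        public CompressionType compressionType() {
            return compressionType;
        }
    }
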
 .../main/java/org/apache/kafka/clients/producer/KafkaProducer.java | 5 +++--
 1 file changed, 3 insertions(+), 2 deletions(-)

diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java b/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
index 07ed531..b91e2c5 100644
--- a/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
+++ b/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
@@ -131,6 +131,7 @@ public class KafkaProducer<K, V> implements Producer<K, V> {
     private final Sender sender;
     private final Metrics metrics;
     private final Thread ioThread;
+    private final CompressionType compressionType;
     private final Sensor errors;
     private final Time time;
     private final Serializer<K> keySerializer;
@@ -210,12 +211,12 @@ public class KafkaProducer<K, V> implements Producer<K, V> {
         this.metadata = new Metadata(retryBackoffMs, config.getLong(ProducerConfig.METADATA_MAX_AGE_CONFIG));
         this.maxRequestSize = config.getInt(ProducerConfig.MAX_REQUEST_SIZE_CONFIG);
         this.totalMemorySize = config.getLong(ProducerConfig.BUFFER_MEMORY_CONFIG);
-        CompressionType compressionType = CompressionType.forName(config.getString(ProducerConfig.COMPRESSION_TYPE_CONFIG));
+        this.compressionType = CompressionType.forName(config.getString(ProducerConfig.COMPRESSION_TYPE_CONFIG));
         Map<String, String> metricTags = new LinkedHashMap<String, String>();
         metricTags.put("client-id", clientId);
         this.accumulator = new RecordAccumulator(config.getInt(ProducerConfig.BATCH_SIZE_CONFIG),
                 this.totalMemorySize,
-                compressionType,
+                this.compressionType,
                 config.getLong(ProducerConfig.LINGER_MS_CONFIG),
                 retryBackoffMs,
                 config.getBoolean(ProducerConfig.BLOCK_ON_BUFFER_FULL_CONFIG),
-- 
2.2.2