From 1bd1874eec5bfe91dc2ff660faf422d034477cfe Mon Sep 17 00:00:00 2001
From: Parth Brahmbhatt
Date: Mon, 22 Dec 2014 18:48:55 -0800
Subject: [PATCH] KAFKA-1788: Add batch.expiration.ms configuration to ensure
 that batches for partitions that have no leader do not stay in the
 accumulator forever.

---
 .../kafka/clients/producer/KafkaProducer.java      |  1 +
 .../kafka/clients/producer/ProducerConfig.java     | 10 +++++++++-
 .../producer/internals/RecordAccumulator.java      | 14 ++++++++++++++
 .../clients/producer/internals/RecordBatch.java    |  5 +++++
 .../clients/producer/RecordAccumulatorTest.java    | 23 +++++++++++++++++------
 .../apache/kafka/clients/producer/SenderTest.java  |  2 +-
 6 files changed, 47 insertions(+), 8 deletions(-)

diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java b/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
index f61efb3..bb388e2 100644
--- a/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
+++ b/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
@@ -150,6 +150,7 @@ public class KafkaProducer implements Producer {
                 this.totalMemorySize,
                 config.getLong(ProducerConfig.LINGER_MS_CONFIG),
                 retryBackoffMs,
+                config.getLong(ProducerConfig.BATCH_EXPIRATION_MS_CONFIG),
                 config.getBoolean(ProducerConfig.BLOCK_ON_BUFFER_FULL_CONFIG),
                 metrics,
                 time);
diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/ProducerConfig.java b/clients/src/main/java/org/apache/kafka/clients/producer/ProducerConfig.java
index a893d88..c88f8a5 100644
--- a/clients/src/main/java/org/apache/kafka/clients/producer/ProducerConfig.java
+++ b/clients/src/main/java/org/apache/kafka/clients/producer/ProducerConfig.java
@@ -182,6 +182,13 @@ public class ProducerConfig extends AbstractConfig {
     public static final String VALUE_SERIALIZER_CLASS_CONFIG = "value.serializer";
     private static final String VALUE_SERIALIZER_CLASS_DOC = "Serializer class for value that implements the Serializer interface.";
 
+    /** batch.expiration.ms */
+    public static final String BATCH_EXPIRATION_MS_CONFIG = "batch.expiration.ms";
+    private static final String BATCH_EXPIRATION_MS_DOC = "The producer keeps records that are waiting to be sent to a partition in memory in the form of record batches."
+            + " In the rare case that every broker hosting a partition is unavailable, such a batch cannot be sent anywhere."
+ + " To avoid consuming memory on producer client side forever the batch will be discarded when batch.expiration.ms " + + " milliseconds has elapsed since the last retry attempt and still no broker is available."; + static { config = new ConfigDef().define(BOOTSTRAP_SERVERS_CONFIG, Type.LIST, Importance.HIGH, BOOSTRAP_SERVERS_DOC) .define(BUFFER_MEMORY_CONFIG, Type.LONG, 32 * 1024 * 1024L, atLeast(0L), Importance.HIGH, BUFFER_MEMORY_DOC) @@ -230,7 +237,8 @@ public class ProducerConfig extends AbstractConfig { Importance.LOW, MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION_DOC) .define(KEY_SERIALIZER_CLASS_CONFIG, Type.CLASS, "org.apache.kafka.clients.producer.ByteArraySerializer", Importance.HIGH, KEY_SERIALIZER_CLASS_DOC) - .define(VALUE_SERIALIZER_CLASS_CONFIG, Type.CLASS, "org.apache.kafka.clients.producer.ByteArraySerializer", Importance.HIGH, VALUE_SERIALIZER_CLASS_DOC); + .define(VALUE_SERIALIZER_CLASS_CONFIG, Type.CLASS, "org.apache.kafka.clients.producer.ByteArraySerializer", Importance.HIGH, VALUE_SERIALIZER_CLASS_DOC) + .define(BATCH_EXPIRATION_MS_CONFIG, Type.LONG, 60 * 60 * 1000, atLeast(0L), Importance.MEDIUM, BATCH_EXPIRATION_MS_CONFIG); } ProducerConfig(Map props) { diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java b/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java index c15485d..b647b3a 100644 --- a/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java +++ b/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java @@ -61,6 +61,7 @@ public final class RecordAccumulator { private final BufferPool free; private final Time time; private final ConcurrentMap> batches; + private long batchExpirationMs; /** * Create a new record accumulator @@ -72,6 +73,8 @@ public final class RecordAccumulator { * latency for potentially better throughput due to more batching (and hence fewer, larger requests). * @param retryBackoffMs An artificial delay time to retry the produce request upon receiving an error. This avoids * exhausting all retries in a short period of time. + * @param batchExpirationMs when this much time has been elapsed since last retry attempt and still no leader node + * can be found for a batch, the batch will be discarded. * @param blockOnBufferFull If true block when we are out of memory; if false throw an exception when we are out of * memory * @param metrics The metrics @@ -81,6 +84,7 @@ public final class RecordAccumulator { long totalSize, long lingerMs, long retryBackoffMs, + long batchExpirationMs, boolean blockOnBufferFull, Metrics metrics, Time time) { @@ -89,6 +93,7 @@ public final class RecordAccumulator { this.batchSize = batchSize; this.lingerMs = lingerMs; this.retryBackoffMs = retryBackoffMs; + this.batchExpirationMs = batchExpirationMs; this.batches = new CopyOnWriteMap>(); this.free = new BufferPool(totalSize, batchSize, blockOnBufferFull, metrics, time); this.time = time; @@ -210,6 +215,15 @@ public final class RecordAccumulator { Node leader = cluster.leaderFor(part); if (leader == null) { unknownLeadersExist = true; + RecordBatch recordBatch = deque.peekFirst(); + final long MAX_EXPIRATION_MS = 60 * 60 * 1000;//probably needs a configuration, but setting it to 1 hour for starters. 
+                if (recordBatch != null && recordBatch.isExpired(nowMs, this.batchExpirationMs)) {
+                    recordBatch = deque.pollFirst();
+                    recordBatch.records.close();
+                    recordBatch.done(-1L, new RuntimeException("No leader node found even after waiting for "
+                            + this.batchExpirationMs + " milliseconds."));
+                    deallocate(recordBatch);
+                }
             } else if (!readyNodes.contains(leader)) {
                 synchronized (deque) {
                     RecordBatch batch = deque.peekFirst();
diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordBatch.java b/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordBatch.java
index dd0af8a..81632d9 100644
--- a/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordBatch.java
+++ b/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordBatch.java
@@ -19,6 +19,7 @@ import org.apache.kafka.clients.producer.Callback;
 import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.record.MemoryRecords;
 import org.apache.kafka.common.record.Record;
+import org.apache.kafka.common.utils.Time;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -96,6 +97,10 @@ public final class RecordBatch {
         }
     }
 
+    public boolean isExpired(long now, long maxExpirationMs) {
+        return now >= (lastAttemptMs + maxExpirationMs);
+    }
+
     /**
      * A callback and the associated FutureRecordMetadata argument to pass to it.
      */
diff --git a/clients/src/test/java/org/apache/kafka/clients/producer/RecordAccumulatorTest.java b/clients/src/test/java/org/apache/kafka/clients/producer/RecordAccumulatorTest.java
index 2c99324..30f3282 100644
--- a/clients/src/test/java/org/apache/kafka/clients/producer/RecordAccumulatorTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/producer/RecordAccumulatorTest.java
@@ -59,11 +59,12 @@ public class RecordAccumulatorTest {
     private int msgSize = Records.LOG_OVERHEAD + Record.recordSize(key, value);
     private Cluster cluster = new Cluster(Arrays.asList(node1, node2), Arrays.asList(part1, part2, part3));
     private Metrics metrics = new Metrics(time);
+    private long batchExpirationMs = 60L * 60L * 1000L;
 
     @Test
     public void testFull() throws Exception {
         long now = time.milliseconds();
-        RecordAccumulator accum = new RecordAccumulator(1024, 10 * 1024, 10L, 100L, false, metrics, time);
+        RecordAccumulator accum = new RecordAccumulator(1024, 10 * 1024, 10L, 100L, batchExpirationMs, false, metrics, time);
         int appends = 1024 / msgSize;
         for (int i = 0; i < appends; i++) {
             accum.append(tp1, key, value, CompressionType.NONE, null);
@@ -86,7 +87,7 @@ public class RecordAccumulatorTest {
     @Test
     public void testAppendLarge() throws Exception {
         int batchSize = 512;
-        RecordAccumulator accum = new RecordAccumulator(batchSize, 10 * 1024, 0L, 100L, false, metrics, time);
+        RecordAccumulator accum = new RecordAccumulator(batchSize, 10 * 1024, 0L, 100L, batchExpirationMs, false, metrics, time);
         accum.append(tp1, key, new byte[2 * batchSize], CompressionType.NONE, null);
         assertEquals("Our partition's leader should be ready", Collections.singleton(node1), accum.ready(cluster, time.milliseconds()).readyNodes);
     }
@@ -94,7 +95,7 @@
     @Test
     public void testLinger() throws Exception {
         long lingerMs = 10L;
-        RecordAccumulator accum = new RecordAccumulator(1024, 10 * 1024, lingerMs, 100L, false, metrics, time);
+        RecordAccumulator accum = new RecordAccumulator(1024, 10 * 1024, lingerMs, 100L, batchExpirationMs, false, metrics, time);
         accum.append(tp1, key, value, CompressionType.NONE, null);
         assertEquals("No partitions should be ready", 0, accum.ready(cluster, time.milliseconds()).readyNodes.size());
         time.sleep(10);
@@ -110,8 +111,18 @@
     }
 
     @Test
+    public void testExpiration() throws Exception {
+        RecordAccumulator accum = new RecordAccumulator(1024, 10 * 1024, 10L, 100L, batchExpirationMs, false, metrics, time);
+        accum.append(new TopicPartition("invalid", 1), key, value, CompressionType.NONE, null);
+        assertTrue("A record batch should have been appended.", accum.hasUnsent());
+        time.sleep(batchExpirationMs);
+        assertTrue("No leader should exist for the invalid topic.", accum.ready(cluster, time.milliseconds()).unknownLeadersExist);
+        assertFalse("The expired batch should have been discarded.", accum.hasUnsent());
+    }
+
+    @Test
     public void testPartialDrain() throws Exception {
-        RecordAccumulator accum = new RecordAccumulator(1024, 10 * 1024, 10L, 100L, false, metrics, time);
+        RecordAccumulator accum = new RecordAccumulator(1024, 10 * 1024, 10L, 100L, batchExpirationMs, false, metrics, time);
         int appends = 1024 / msgSize + 1;
         List partitions = asList(tp1, tp2);
         for (TopicPartition tp : partitions) {
@@ -129,7 +140,7 @@ public class RecordAccumulatorTest {
         final int numThreads = 5;
         final int msgs = 10000;
         final int numParts = 2;
-        final RecordAccumulator accum = new RecordAccumulator(1024, 10 * 1024, 0L, 100L, true, metrics, time);
+        final RecordAccumulator accum = new RecordAccumulator(1024, 10 * 1024, 0L, 100L, batchExpirationMs, true, metrics, time);
         List threads = new ArrayList();
         for (int i = 0; i < numThreads; i++) {
             threads.add(new Thread() {
@@ -169,7 +180,7 @@
     public void testNextReadyCheckDelay() throws Exception {
         // Next check time will use lingerMs since this test won't trigger any retries/backoff
         long lingerMs = 10L;
-        RecordAccumulator accum = new RecordAccumulator(1024, 10 * 1024, lingerMs, 100L, false, metrics, time);
+        RecordAccumulator accum = new RecordAccumulator(1024, 10 * 1024, lingerMs, 100L, batchExpirationMs, false, metrics, time);
 
         // Just short of going over the limit so we trigger linger time
         int appends = 1024 / msgSize;
diff --git a/clients/src/test/java/org/apache/kafka/clients/producer/SenderTest.java b/clients/src/test/java/org/apache/kafka/clients/producer/SenderTest.java
index ef2ca65..ff69bcb 100644
--- a/clients/src/test/java/org/apache/kafka/clients/producer/SenderTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/producer/SenderTest.java
@@ -50,7 +50,7 @@ public class SenderTest {
     private Metadata metadata = new Metadata(0, Long.MAX_VALUE);
     private Cluster cluster = TestUtils.singletonCluster("test", 1);
     private Metrics metrics = new Metrics(time);
-    private RecordAccumulator accumulator = new RecordAccumulator(batchSize, 1024 * 1024, 0L, 0L, false, metrics, time);
+    private RecordAccumulator accumulator = new RecordAccumulator(batchSize, 1024 * 1024, 0L, 0L, 60 * 60 * 1000, false, metrics, time);
     private Sender sender = new Sender(client,
                                        metadata,
                                        this.accumulator,
-- 
1.9.3 (Apple Git-50)
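
Usage sketch (not part of the patch): a minimal illustration of how a client could set the new
batch.expiration.ms option once this change is applied. The bootstrap address and the five-minute
value below are made-up examples, not values taken from the patch; the patch's own default is one hour.

    import java.util.Properties;

    import org.apache.kafka.clients.producer.KafkaProducer;
    import org.apache.kafka.clients.producer.ProducerConfig;

    public class BatchExpirationExample {
        public static void main(String[] args) {
            Properties props = new Properties();
            // Made-up broker address; replace with a real bootstrap server.
            props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
            // With this patch, a batch whose partition has had no leader for
            // batch.expiration.ms milliseconds is discarded instead of being held forever.
            // 300000 (5 minutes) is purely illustrative.
            props.put(ProducerConfig.BATCH_EXPIRATION_MS_CONFIG, "300000");
            KafkaProducer producer = new KafkaProducer(props);
            // ... send records as usual ...
            producer.close();
        }
    }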