From ef9a709960ee6e665af3994cf50407a15b390238 Mon Sep 17 00:00:00 2001
From: jqin
Date: Tue, 21 Apr 2015 15:50:41 -0700
Subject: [PATCH 1/2] Patch for KAFKA-2138 honor retry backoff in KafkaProducer

---
 .../producer/internals/RecordAccumulator.java      | 26 +++++++++-------
 .../producer/internals/RecordAccumulatorTest.java  | 36 ++++++++++++++++++++++
 2 files changed, 51 insertions(+), 11 deletions(-)

diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java b/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java
index 0e7ab29..49a9883 100644
--- a/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java
+++ b/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java
@@ -292,17 +292,21 @@ public final class RecordAccumulator {
                     synchronized (deque) {
                         RecordBatch first = deque.peekFirst();
                         if (first != null) {
-                            if (size + first.records.sizeInBytes() > maxSize && !ready.isEmpty()) {
-                                // there is a rare case that a single batch size is larger than the request size due
-                                // to compression; in this case we will still eventually send this batch in a single
-                                // request
-                                break;
-                            } else {
-                                RecordBatch batch = deque.pollFirst();
-                                batch.records.close();
-                                size += batch.records.sizeInBytes();
-                                ready.add(batch);
-                                batch.drainedMs = now;
+                            boolean backoff = first.attempts > 0 && first.lastAttemptMs + retryBackoffMs > now;
+                            // Only drain the batch if it is not during backoff period.
+                            if (!backoff) {
+                                if (size + first.records.sizeInBytes() > maxSize && !ready.isEmpty()) {
+                                    // there is a rare case that a single batch size is larger than the request size due
+                                    // to compression; in this case we will still eventually send this batch in a single
+                                    // request
+                                    break;
+                                } else {
+                                    RecordBatch batch = deque.pollFirst();
+                                    batch.records.close();
+                                    size += batch.records.sizeInBytes();
+                                    ready.add(batch);
+                                    batch.drainedMs = now;
+                                }
                             }
                         }
                     }
diff --git a/clients/src/test/java/org/apache/kafka/clients/producer/internals/RecordAccumulatorTest.java b/clients/src/test/java/org/apache/kafka/clients/producer/internals/RecordAccumulatorTest.java
index 05e2929..0c610be 100644
--- a/clients/src/test/java/org/apache/kafka/clients/producer/internals/RecordAccumulatorTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/producer/internals/RecordAccumulatorTest.java
@@ -203,6 +203,42 @@ public class RecordAccumulatorTest {
         // but have leaders with other sendable data.
assertTrue("Next check time should be defined by node2, at most linger time", result.nextReadyCheckDelayMs <= lingerMs); } + + @Test + public void testRertryBackoff() throws Exception { + long lingerMs = Long.MAX_VALUE / 4; + long retryBackoffMs = Long.MAX_VALUE / 2; + final RecordAccumulator accum = new RecordAccumulator(1024, 10 * 1024, CompressionType.NONE, lingerMs, retryBackoffMs, false, metrics, time, metricTags); + + long now = time.milliseconds(); + accum.append(tp1, key, value, null); + RecordAccumulator.ReadyCheckResult result = accum.ready(cluster, now + lingerMs + 1); + assertEquals("Node1 should be ready", Collections.singleton(node1), result.readyNodes); + Map> batches = accum.drain(cluster, result.readyNodes, Integer.MAX_VALUE, now + lingerMs + 1); + assertEquals("Node1 should only have one partition ready.", 1, batches.size()); + assertEquals("Partition 0 should only have one batche drained.", 1, batches.get(0).size()); + + // Reenqueue the batch + now = time.milliseconds(); + accum.reenqueue(batches.get(0).get(0), now); + + // Put message for paritition 1 into accumulator + accum.append(tp2, key, value, null); + result = accum.ready(cluster, now + lingerMs + 1); + assertEquals("Node1 should be ready", Collections.singleton(node1), result.readyNodes); + + // Partition 0 should backoff while partition 1 should not + batches = accum.drain(cluster, result.readyNodes, Integer.MAX_VALUE, now + lingerMs + 1); + assertEquals("Node1 should be the only ready node.", 1, batches.size()); + assertEquals("Node1 should only have one batch drained.", 1, batches.get(0).size()); + assertEquals("Node1 should only have one batch for partition 1.", tp2, batches.get(0).get(0).topicPartition); + + // Partition 0 now can be drained. + batches = accum.drain(cluster, result.readyNodes, Integer.MAX_VALUE, now + retryBackoffMs + 1); + assertEquals("Node1 should only have one partition ready.", 1, batches.size()); + assertEquals("Node1 should only have one batch drained.", 1, batches.get(0).size()); + assertEquals("Node1 should only have one batch for partition 0.", tp1, batches.get(0).get(0).topicPartition); + } @Test public void testFlush() throws Exception { -- 1.8.3.4 (Apple Git-47) From a370cb8792b7189b25edc84aa4fb6e6dc0dfe284 Mon Sep 17 00:00:00 2001 From: jqin Date: Wed, 22 Apr 2015 17:18:58 -0700 Subject: [PATCH 2/2] Incorporated Joel's comments. 
---
 .../clients/producer/internals/RecordAccumulatorTest.java | 12 ++++++------
 1 file changed, 6 insertions(+), 6 deletions(-)

diff --git a/clients/src/test/java/org/apache/kafka/clients/producer/internals/RecordAccumulatorTest.java b/clients/src/test/java/org/apache/kafka/clients/producer/internals/RecordAccumulatorTest.java
index 0c610be..6f73cd4 100644
--- a/clients/src/test/java/org/apache/kafka/clients/producer/internals/RecordAccumulatorTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/producer/internals/RecordAccumulatorTest.java
@@ -205,7 +205,7 @@ public class RecordAccumulatorTest {
     }
 
     @Test
-    public void testRertryBackoff() throws Exception {
+    public void testRetryBackoff() throws Exception {
         long lingerMs = Long.MAX_VALUE / 4;
         long retryBackoffMs = Long.MAX_VALUE / 2;
         final RecordAccumulator accum = new RecordAccumulator(1024, 10 * 1024, CompressionType.NONE, lingerMs, retryBackoffMs, false, metrics, time, metricTags);
@@ -215,19 +215,19 @@ public class RecordAccumulatorTest {
         RecordAccumulator.ReadyCheckResult result = accum.ready(cluster, now + lingerMs + 1);
         assertEquals("Node1 should be ready", Collections.singleton(node1), result.readyNodes);
         Map<Integer, List<RecordBatch>> batches = accum.drain(cluster, result.readyNodes, Integer.MAX_VALUE, now + lingerMs + 1);
-        assertEquals("Node1 should only have one partition ready.", 1, batches.size());
-        assertEquals("Partition 0 should only have one batche drained.", 1, batches.get(0).size());
+        assertEquals("Node1 should be the only ready node.", 1, batches.size());
+        assertEquals("Partition 0 should only have one batch drained.", 1, batches.get(0).size());
 
         // Reenqueue the batch
         now = time.milliseconds();
         accum.reenqueue(batches.get(0).get(0), now);
 
-        // Put message for paritition 1 into accumulator
+        // Put message for partition 1 into accumulator
         accum.append(tp2, key, value, null);
         result = accum.ready(cluster, now + lingerMs + 1);
         assertEquals("Node1 should be ready", Collections.singleton(node1), result.readyNodes);
 
-        // Partition 0 should backoff while partition 1 should not
+        // tp1 should backoff while tp2 should not
         batches = accum.drain(cluster, result.readyNodes, Integer.MAX_VALUE, now + lingerMs + 1);
         assertEquals("Node1 should be the only ready node.", 1, batches.size());
         assertEquals("Node1 should only have one batch drained.", 1, batches.get(0).size());
@@ -235,7 +235,7 @@ public class RecordAccumulatorTest {
 
         // Partition 0 now can be drained.
         batches = accum.drain(cluster, result.readyNodes, Integer.MAX_VALUE, now + retryBackoffMs + 1);
-        assertEquals("Node1 should only have one partition ready.", 1, batches.size());
+        assertEquals("Node1 should be the only ready node.", 1, batches.size());
         assertEquals("Node1 should only have one batch drained.", 1, batches.get(0).size());
         assertEquals("Node1 should only have one batch for partition 0.", tp1, batches.get(0).get(0).topicPartition);
     }
-- 
1.8.3.4 (Apple Git-47)