diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java b/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java index 2d7e52d..3281136 100644 --- a/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java +++ b/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java @@ -22,6 +22,8 @@ import java.util.Map; import java.util.concurrent.ConcurrentMap; import org.apache.kafka.clients.producer.Callback; +import org.apache.kafka.common.Cluster; +import org.apache.kafka.common.Node; import org.apache.kafka.common.TopicPartition; import org.apache.kafka.common.metrics.Measurable; import org.apache.kafka.common.metrics.MetricConfig; @@ -49,6 +51,7 @@ public final class RecordAccumulator { private volatile boolean closed; private int drainIndex; + private int readyPartitions; private final int batchSize; private final long lingerMs; private final long retryBackoffMs; @@ -79,6 +82,7 @@ public final class RecordAccumulator { Metrics metrics, Time time) { this.drainIndex = 0; + this.readyPartitions = 0; this.closed = false; this.batchSize = batchSize; this.lingerMs = lingerMs; @@ -113,7 +117,7 @@ public final class RecordAccumulator { }); metrics.addMetric("ready-partitions", "The number of topic-partitions with buffered data ready to be sent.", new Measurable() { public double measure(MetricConfig config, long nowMs) { - return ready(nowMs).size(); + return readyPartitions; } }); } @@ -182,32 +186,40 @@ public final class RecordAccumulator { /** * Get a list of topic-partitions which are ready to be sent. *

- * A partition is ready if ANY of the following are true: + * A partition is ready if it is not backing off the send and ANY of the following are true: *

    *
  1. The record set is full *
  2. The record set has sat in the accumulator for at least lingerMs milliseconds *
  3. The accumulator is out of memory and threads are blocking waiting for data (in this case all partitions are * immediately considered ready). + *
  4. Some other partition sharing the same leader broker is already ready + *
  5. The accumulator has been closed *
*/ - public List ready(long nowMs) { + public List ready(Cluster cluster, long nowMs) { List ready = new ArrayList(); + List destinations = new ArrayList(); boolean exhausted = this.free.queued() > 0; for (Map.Entry> entry : this.batches.entrySet()) { Deque deque = entry.getValue(); synchronized (deque) { RecordBatch batch = deque.peekFirst(); if (batch != null) { + Node leader = (cluster == null) ? null : cluster.leaderFor(batch.topicPartition); boolean backingOff = batch.attempts > 0 && batch.lastAttemptMs + retryBackoffMs > nowMs; boolean full = deque.size() > 1 || batch.records.isFull(); boolean expired = nowMs - batch.createdMs >= lingerMs; - boolean sendable = full || expired || exhausted || closed; - if (sendable && !backingOff) + boolean carpooled = leader != null && destinations.contains(leader); + boolean sendable = full || expired || exhausted || closed || carpooled; + if (sendable && !backingOff) { ready.add(batch.topicPartition); + if (leader != null) + destinations.add(leader); + } } } } + this.readyPartitions = ready.size(); return ready; } diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java b/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java index f0152fa..8f8c18f 100644 --- a/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java +++ b/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java @@ -192,7 +192,7 @@ public class Sender implements Runnable { public void run(long nowMs) { Cluster cluster = metadata.fetch(); // get the list of partitions with data ready to send - List ready = this.accumulator.ready(nowMs); + List ready = this.accumulator.ready(cluster, nowMs); // should we update our metadata? 
List sends = new ArrayList(); diff --git a/clients/src/test/java/org/apache/kafka/clients/producer/RecordAccumulatorTest.java b/clients/src/test/java/org/apache/kafka/clients/producer/RecordAccumulatorTest.java index f37ab77..622b6af 100644 --- a/clients/src/test/java/org/apache/kafka/clients/producer/RecordAccumulatorTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/producer/RecordAccumulatorTest.java @@ -48,10 +48,10 @@ public class RecordAccumulatorTest { int appends = 1024 / msgSize; for (int i = 0; i < appends; i++) { accum.append(tp, key, value, CompressionType.NONE, null); - assertEquals("No partitions should be ready.", 0, accum.ready(now).size()); + assertEquals("No partitions should be ready.", 0, accum.ready(null, now).size()); } accum.append(tp, key, value, CompressionType.NONE, null); - assertEquals("Our partition should be ready", asList(tp), accum.ready(time.milliseconds())); + assertEquals("Our partition should be ready", asList(tp), accum.ready(null, time.milliseconds())); List batches = accum.drain(asList(tp), Integer.MAX_VALUE, 0); assertEquals(1, batches.size()); RecordBatch batch = batches.get(0); @@ -69,7 +69,7 @@ public class RecordAccumulatorTest { int batchSize = 512; RecordAccumulator accum = new RecordAccumulator(batchSize, 10 * 1024, 0L, 100L, false, metrics, time); accum.append(tp, key, new byte[2 * batchSize], CompressionType.NONE, null); - assertEquals("Our partition should be ready", asList(tp), accum.ready(time.milliseconds())); + assertEquals("Our partition should be ready", asList(tp), accum.ready(null, time.milliseconds())); } @Test @@ -77,9 +77,9 @@ public class RecordAccumulatorTest { long lingerMs = 10L; RecordAccumulator accum = new RecordAccumulator(1024, 10 * 1024, lingerMs, 100L, false, metrics, time); accum.append(tp, key, value, CompressionType.NONE, null); - assertEquals("No partitions should be ready", 0, accum.ready(time.milliseconds()).size()); + assertEquals("No partitions should be ready", 0, 
accum.ready(null, time.milliseconds()).size()); time.sleep(10); - assertEquals("Our partition should be ready", asList(tp), accum.ready(time.milliseconds())); + assertEquals("Our partition should be ready", asList(tp), accum.ready(null, time.milliseconds())); List batches = accum.drain(asList(tp), Integer.MAX_VALUE, 0); assertEquals(1, batches.size()); RecordBatch batch = batches.get(0); @@ -99,7 +99,7 @@ public class RecordAccumulatorTest { for (int i = 0; i < appends; i++) accum.append(tp, key, value, CompressionType.NONE, null); } - assertEquals("Both partitions should be ready", 2, accum.ready(time.milliseconds()).size()); + assertEquals("Both partitions should be ready", 2, accum.ready(null, time.milliseconds()).size()); List batches = accum.drain(partitions, 1024, 0); assertEquals("But due to size bound only one partition should have been retrieved", 1, batches.size()); @@ -130,7 +130,7 @@ public class RecordAccumulatorTest { int read = 0; long now = time.milliseconds(); while (read < numThreads * msgs) { - List tps = accum.ready(now); + List tps = accum.ready(null, now); List batches = accum.drain(tps, 5 * 1024, 0); for (RecordBatch batch : batches) { for (LogEntry entry : batch.records) diff --git a/clients/src/test/java/org/apache/kafka/common/utils/AbstractIteratorTest.java b/clients/src/test/java/org/apache/kafka/common/utils/AbstractIteratorTest.java index 1df2266..c788e66 100644 --- a/clients/src/test/java/org/apache/kafka/common/utils/AbstractIteratorTest.java +++ b/clients/src/test/java/org/apache/kafka/common/utils/AbstractIteratorTest.java @@ -20,13 +20,8 @@ import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertTrue; -import java.util.ArrayList; -import java.util.Arrays; -import java.util.Iterator; -import java.util.List; -import java.util.NoSuchElementException; +import java.util.*; -import org.apache.kafka.common.utils.AbstractIterator; import org.junit.Test; public class 
AbstractIteratorTest { @@ -49,7 +44,7 @@ public class AbstractIteratorTest { @Test(expected = NoSuchElementException.class) public void testEmptyIterator() { - Iterator iter = new ListIterator(Arrays.asList()); + Iterator iter = new ListIterator(Collections.emptyList()); iter.next(); }