diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java b/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java
index 2d7e52d..3281136 100644
--- a/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java
+++ b/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java
@@ -22,6 +22,8 @@ import java.util.Map;
import java.util.concurrent.ConcurrentMap;
import org.apache.kafka.clients.producer.Callback;
+import org.apache.kafka.common.Cluster;
+import org.apache.kafka.common.Node;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.metrics.Measurable;
import org.apache.kafka.common.metrics.MetricConfig;
@@ -49,6 +51,7 @@ public final class RecordAccumulator {
private volatile boolean closed;
private int drainIndex;
+ private int readyPartitions;
private final int batchSize;
private final long lingerMs;
private final long retryBackoffMs;
@@ -79,6 +82,7 @@ public final class RecordAccumulator {
Metrics metrics,
Time time) {
this.drainIndex = 0;
+ this.readyPartitions = 0;
this.closed = false;
this.batchSize = batchSize;
this.lingerMs = lingerMs;
@@ -113,7 +117,7 @@ public final class RecordAccumulator {
});
metrics.addMetric("ready-partitions", "The number of topic-partitions with buffered data ready to be sent.", new Measurable() {
public double measure(MetricConfig config, long nowMs) {
- return ready(nowMs).size();
+ return readyPartitions;
}
});
}
@@ -182,32 +186,40 @@ public final class RecordAccumulator {
/**
* Get a list of topic-partitions which are ready to be sent.
*
- * A partition is ready if ANY of the following are true:
+ * A partition is ready if it is not backing off the send and ANY of the following are true:
 * <ol>
 * <li>The record set is full</li>
 * <li>The record set has sat in the accumulator for at least lingerMs milliseconds</li>
 * <li>The accumulator is out of memory and threads are blocking waiting for data (in this case all partitions are
 * immediately considered ready).</li>
+ * <li>Another partition sharing the same leader broker is already ready</li>
 * <li>The accumulator has been closed</li>
 * </ol>
 */
- public List ready(long nowMs) {
+ public List ready(Cluster cluster, long nowMs) {
List ready = new ArrayList();
+ List destinations = new ArrayList();
boolean exhausted = this.free.queued() > 0;
for (Map.Entry> entry : this.batches.entrySet()) {
Deque deque = entry.getValue();
synchronized (deque) {
RecordBatch batch = deque.peekFirst();
if (batch != null) {
+ Node leader = (cluster == null) ? null : cluster.leaderFor(batch.topicPartition);
boolean backingOff = batch.attempts > 0 && batch.lastAttemptMs + retryBackoffMs > nowMs;
boolean full = deque.size() > 1 || batch.records.isFull();
boolean expired = nowMs - batch.createdMs >= lingerMs;
- boolean sendable = full || expired || exhausted || closed;
- if (sendable && !backingOff)
+ boolean carpooled = leader != null && destinations.contains(leader);
+ boolean sendable = full || expired || exhausted || closed || carpooled;
+ if (sendable && !backingOff) {
ready.add(batch.topicPartition);
+ if (leader != null)
+ destinations.add(leader);
+ }
}
}
}
+ this.readyPartitions = ready.size();
return ready;
}
diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java b/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java
index f0152fa..8f8c18f 100644
--- a/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java
+++ b/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java
@@ -192,7 +192,7 @@ public class Sender implements Runnable {
public void run(long nowMs) {
Cluster cluster = metadata.fetch();
// get the list of partitions with data ready to send
- List ready = this.accumulator.ready(nowMs);
+ List ready = this.accumulator.ready(cluster, nowMs);
// should we update our metadata?
List sends = new ArrayList();
diff --git a/clients/src/test/java/org/apache/kafka/clients/producer/RecordAccumulatorTest.java b/clients/src/test/java/org/apache/kafka/clients/producer/RecordAccumulatorTest.java
index f37ab77..622b6af 100644
--- a/clients/src/test/java/org/apache/kafka/clients/producer/RecordAccumulatorTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/producer/RecordAccumulatorTest.java
@@ -48,10 +48,10 @@ public class RecordAccumulatorTest {
int appends = 1024 / msgSize;
for (int i = 0; i < appends; i++) {
accum.append(tp, key, value, CompressionType.NONE, null);
- assertEquals("No partitions should be ready.", 0, accum.ready(now).size());
+ assertEquals("No partitions should be ready.", 0, accum.ready(null, now).size());
}
accum.append(tp, key, value, CompressionType.NONE, null);
- assertEquals("Our partition should be ready", asList(tp), accum.ready(time.milliseconds()));
+ assertEquals("Our partition should be ready", asList(tp), accum.ready(null, time.milliseconds()));
List batches = accum.drain(asList(tp), Integer.MAX_VALUE, 0);
assertEquals(1, batches.size());
RecordBatch batch = batches.get(0);
@@ -69,7 +69,7 @@ public class RecordAccumulatorTest {
int batchSize = 512;
RecordAccumulator accum = new RecordAccumulator(batchSize, 10 * 1024, 0L, 100L, false, metrics, time);
accum.append(tp, key, new byte[2 * batchSize], CompressionType.NONE, null);
- assertEquals("Our partition should be ready", asList(tp), accum.ready(time.milliseconds()));
+ assertEquals("Our partition should be ready", asList(tp), accum.ready(null, time.milliseconds()));
}
@Test
@@ -77,9 +77,9 @@ public class RecordAccumulatorTest {
long lingerMs = 10L;
RecordAccumulator accum = new RecordAccumulator(1024, 10 * 1024, lingerMs, 100L, false, metrics, time);
accum.append(tp, key, value, CompressionType.NONE, null);
- assertEquals("No partitions should be ready", 0, accum.ready(time.milliseconds()).size());
+ assertEquals("No partitions should be ready", 0, accum.ready(null, time.milliseconds()).size());
time.sleep(10);
- assertEquals("Our partition should be ready", asList(tp), accum.ready(time.milliseconds()));
+ assertEquals("Our partition should be ready", asList(tp), accum.ready(null, time.milliseconds()));
List batches = accum.drain(asList(tp), Integer.MAX_VALUE, 0);
assertEquals(1, batches.size());
RecordBatch batch = batches.get(0);
@@ -99,7 +99,7 @@ public class RecordAccumulatorTest {
for (int i = 0; i < appends; i++)
accum.append(tp, key, value, CompressionType.NONE, null);
}
- assertEquals("Both partitions should be ready", 2, accum.ready(time.milliseconds()).size());
+ assertEquals("Both partitions should be ready", 2, accum.ready(null, time.milliseconds()).size());
List batches = accum.drain(partitions, 1024, 0);
assertEquals("But due to size bound only one partition should have been retrieved", 1, batches.size());
@@ -130,7 +130,7 @@ public class RecordAccumulatorTest {
int read = 0;
long now = time.milliseconds();
while (read < numThreads * msgs) {
- List tps = accum.ready(now);
+ List tps = accum.ready(null, now);
List batches = accum.drain(tps, 5 * 1024, 0);
for (RecordBatch batch : batches) {
for (LogEntry entry : batch.records)
diff --git a/clients/src/test/java/org/apache/kafka/common/utils/AbstractIteratorTest.java b/clients/src/test/java/org/apache/kafka/common/utils/AbstractIteratorTest.java
index 1df2266..c788e66 100644
--- a/clients/src/test/java/org/apache/kafka/common/utils/AbstractIteratorTest.java
+++ b/clients/src/test/java/org/apache/kafka/common/utils/AbstractIteratorTest.java
@@ -20,13 +20,8 @@ import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Iterator;
-import java.util.List;
-import java.util.NoSuchElementException;
+import java.util.*;
-import org.apache.kafka.common.utils.AbstractIterator;
import org.junit.Test;
public class AbstractIteratorTest {
@@ -49,7 +44,7 @@ public class AbstractIteratorTest {
@Test(expected = NoSuchElementException.class)
public void testEmptyIterator() {
- Iterator