diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/internals/Partitioner.java b/clients/src/main/java/org/apache/kafka/clients/producer/internals/Partitioner.java
index fbb732a..2d5733d 100644
--- a/clients/src/main/java/org/apache/kafka/clients/producer/internals/Partitioner.java
+++ b/clients/src/main/java/org/apache/kafka/clients/producer/internals/Partitioner.java
@@ -41,7 +41,7 @@ public class Partitioner {
      * Compute the partition for the given record.
      * 
      * @param record The record being sent
-     * @param numPartitions The total number of partitions for the given topic
+     * @param cluster The current cluster metadata
      */
     public int partition(ProducerRecord record, Cluster cluster) {
         List<PartitionInfo> partitions = cluster.partitionsFor(record.topic());
diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java b/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java
index 2d7e52d..2e48e19 100644
--- a/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java
+++ b/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java
@@ -13,15 +13,13 @@ package org.apache.kafka.clients.producer.internals;
 
 import java.nio.ByteBuffer;
 
-import java.util.ArrayDeque;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.Deque;
-import java.util.List;
-import java.util.Map;
+import java.util.*;
 import java.util.concurrent.ConcurrentMap;
 
 import org.apache.kafka.clients.producer.Callback;
+import org.apache.kafka.common.Cluster;
+import org.apache.kafka.common.Node;
+import org.apache.kafka.common.PartitionInfo;
 import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.metrics.Measurable;
 import org.apache.kafka.common.metrics.MetricConfig;
@@ -48,6 +46,7 @@ public final class RecordAccumulator {
     private static final Logger log = LoggerFactory.getLogger(RecordAccumulator.class);
 
     private volatile boolean closed;
+    private volatile int readyPartitions;
     private int drainIndex;
     private final int batchSize;
     private final long lingerMs;
@@ -79,6 +78,7 @@ public final class RecordAccumulator {
                              Metrics metrics,
                              Time time) {
         this.drainIndex = 0;
+        this.readyPartitions = 0;
         this.closed = false;
         this.batchSize = batchSize;
         this.lingerMs = lingerMs;
@@ -113,7 +113,7 @@ public final class RecordAccumulator {
         });
         metrics.addMetric("ready-partitions", "The number of topic-partitions with buffered data ready to be sent.", new Measurable() {
             public double measure(MetricConfig config, long nowMs) {
-                return ready(nowMs).size();
+                return readyPartitions;
             }
         });
     }
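Reviewer note: the "ready-partitions" gauge above now reports a counter that the ready() pass refreshes, instead of re-running the whole readiness scan inside the metric callback. The sketch below is a minimal, standalone illustration of that volatile-counter gauge pattern; the class and method names are hypothetical stand-ins and this is not the Kafka Measurable API.

// Minimal sketch, assuming only plain Java: the hot path publishes a count into a
// volatile field and the metrics thread just reads it back, so polling the gauge
// stays cheap and lock-free.
public class ReadyPartitionsGauge {
    private volatile int readyPartitions = 0;

    // called from the accumulator/sender path after each readiness scan
    public void record(int count) {
        this.readyPartitions = count;
    }

    // called from the metrics reporter thread
    public double measure() {
        return readyPartitions;
    }

    public static void main(String[] args) {
        ReadyPartitionsGauge gauge = new ReadyPartitionsGauge();
        gauge.record(3);
        System.out.println("ready-partitions = " + gauge.measure());
    }
}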

@@ -182,33 +182,66 @@ public final class RecordAccumulator {
     /**
      * Get a list of topic-partitions which are ready to be sent.
      * <p>
-     * A partition is ready if ANY of the following are true:
+     * A partition is ready if it is not backing off the send and ANY of the following are true:
      * <ol>
      * <li>The record set is full
      * <li>The record set has sat in the accumulator for at least lingerMs milliseconds
      * <li>The accumulator is out of memory and threads are blocking waiting for data (in this case all partitions are
      * immediately considered ready).
+     * <li>Some other partition sharing the same leader broker is already ready
      * <li>The accumulator has been closed
      * </ol>
      */
-    public List<TopicPartition> ready(long nowMs) {
-        List<TopicPartition> ready = new ArrayList<TopicPartition>();
+    public List<Node> ready(Cluster cluster, long nowMs) {
+        List<Node> readyNodes = new ArrayList<Node>();
+        int readyPartitions = 0;
         boolean exhausted = this.free.queued() > 0;
+        // check which nodes are ready
+        for (Node node : cluster.nodes()) {
+            List<PartitionInfo> parts = cluster.partitionsFor(node.id());
+            for (PartitionInfo part : parts) {
+                Deque<RecordBatch> deque = this.batches.get(new TopicPartition(part.topic(), part.partition()));
+                if (deque != null) {
+                    synchronized (deque) {
+                        RecordBatch batch = deque.peekFirst();
+                        if (batch != null) {
+                            boolean backingOff = batch.attempts > 0 && batch.lastAttemptMs + retryBackoffMs > nowMs;
+                            boolean full = deque.size() > 1 || batch.records.isFull();
+                            boolean expired = nowMs - batch.createdMs >= lingerMs;
+                            boolean sendable = full || expired || exhausted || closed;
+                            if (sendable && !backingOff) {
+                                readyNodes.add(node);
+                                readyPartitions += parts.size();
+                                break;
+                            }
+                        }
+                    }
+                }
+            }
+        }
+        this.readyPartitions = readyPartitions;
+
+        // check if any partition with an unknown leader is ready, and hence needs to force a metadata update
         for (Map.Entry<TopicPartition, Deque<RecordBatch>> entry : this.batches.entrySet()) {
-            Deque<RecordBatch> deque = entry.getValue();
-            synchronized (deque) {
-                RecordBatch batch = deque.peekFirst();
-                if (batch != null) {
-                    boolean backingOff = batch.attempts > 0 && batch.lastAttemptMs + retryBackoffMs > nowMs;
-                    boolean full = deque.size() > 1 || batch.records.isFull();
-                    boolean expired = nowMs - batch.createdMs >= lingerMs;
-                    boolean sendable = full || expired || exhausted || closed;
-                    if (sendable && !backingOff)
-                        ready.add(batch.topicPartition);
+            TopicPartition part = entry.getKey();
+            if (cluster.leaderFor(part) == null) {
+                Deque<RecordBatch> deque = entry.getValue();
+                synchronized (deque) {
+                    RecordBatch batch = deque.peekFirst();
+                    if (batch != null) {
+                        boolean full = deque.size() > 1 || batch.records.isFull();
+                        boolean expired = nowMs - batch.createdMs >= lingerMs;
+                        boolean sendable = full || expired || exhausted || closed;
+                        if (sendable) {
+                            readyNodes.add(Node.Unknown);
+                            break;
+                        }
+                    }
                 }
             }
         }
-        return ready;
+
+        return readyNodes;
     }
 
     /**
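Reviewer note: the readiness test above boils down to one predicate per partition: the head batch is sendable if it is full, has lingered past lingerMs, the buffer pool is exhausted, or the accumulator is closed, and it must additionally not be inside its retry backoff window. The following self-contained sketch restates that predicate with plain parameters so the conditions can be exercised in isolation; the names mirror the diff but this is not the real RecordBatch.

// Illustrative only: same boolean logic as the ready() loop above, lifted out of
// the accumulator so it can be tested on its own.
public class ReadinessCheck {

    static boolean ready(boolean batchFull, long createdMs, long lingerMs,
                         int attempts, long lastAttemptMs, long retryBackoffMs,
                         boolean exhausted, boolean closed, long nowMs) {
        boolean backingOff = attempts > 0 && lastAttemptMs + retryBackoffMs > nowMs;
        boolean expired = nowMs - createdMs >= lingerMs;
        boolean sendable = batchFull || expired || exhausted || closed;
        return sendable && !backingOff;
    }

    public static void main(String[] args) {
        long now = 1000L;
        // a half-full batch that has lingered past lingerMs and is not retrying
        System.out.println(ready(false, 980L, 10L, 0, 0L, 100L, false, false, now));   // true
        // the same batch while backing off a failed send attempt
        System.out.println(ready(false, 980L, 10L, 1, 950L, 100L, false, false, now)); // false
    }
}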
@@ -226,45 +259,55 @@ public final class RecordAccumulator {
     }
 
     /**
-     * Drain all the data for the given topic-partitions that will fit within the specified size. This method attempts
-     * to avoid choosing the same topic-partitions over and over.
+     * Drain all the data for the given nodes and collate them into a list of
+     * batches that will fit within the specified size on a per-node basis.
+     * This method attempts to avoid choosing the same topic-partitions over and over for any given node.
      * 
-     * @param partitions The list of partitions to drain
+     * @param cluster The current cluster metadata
+     * @param nodes The list of nodes to drain
      * @param maxSize The maximum number of bytes to drain
      * @param nowMs The current unix time in milliseconds
-     * @return A list of {@link RecordBatch} for partitions specified with total size less than the requested maxSize.
+     * @return A list of {@link RecordBatch} for each node specified with total size less than the requested maxSize.
     * TODO: There may be a starvation issue due to iteration order
     */
-    public List<RecordBatch> drain(List<TopicPartition> partitions, int maxSize, long nowMs) {
-        if (partitions.isEmpty())
-            return Collections.emptyList();
-        int size = 0;
-        List<RecordBatch> ready = new ArrayList<RecordBatch>();
-        /* to make starvation less likely this loop doesn't start at 0 */
-        int start = drainIndex = drainIndex % partitions.size();
-        do {
-            TopicPartition tp = partitions.get(drainIndex);
-            Deque<RecordBatch> deque = dequeFor(tp);
-            if (deque != null) {
-                synchronized (deque) {
-                    RecordBatch first = deque.peekFirst();
-                    if (size + first.records.sizeInBytes() > maxSize && !ready.isEmpty()) {
-                        // there is a rare case that a single batch size is larger than the request size due
-                        // to compression; in this case we will still eventually send this batch in a single
-                        // request
-                        return ready;
-                    } else {
-                        RecordBatch batch = deque.pollFirst();
-                        batch.records.close();
-                        size += batch.records.sizeInBytes();
-                        ready.add(batch);
-                        batch.drainedMs = nowMs;
+    public Map<Integer, List<RecordBatch>> drain(Cluster cluster, List<Node> nodes, int maxSize, long nowMs) {
+        if (nodes.isEmpty())
+            return Collections.emptyMap();
+
+        Map<Integer, List<RecordBatch>> batches = new HashMap<Integer, List<RecordBatch>>();
+        for (Node node : nodes) {
+            int size = 0;
+            List<PartitionInfo> parts = cluster.partitionsFor(node.id());
+            List<RecordBatch> ready = new ArrayList<RecordBatch>();
+            /* to make starvation less likely this loop doesn't start at 0 */
+            int start = drainIndex = drainIndex % parts.size();
+            do {
+                PartitionInfo part = parts.get(drainIndex);
+                Deque<RecordBatch> deque = dequeFor(new TopicPartition(part.topic(), part.partition()));
+                if (deque != null) {
+                    synchronized (deque) {
+                        RecordBatch first = deque.peekFirst();
+                        if (first != null) {
+                            if (size + first.records.sizeInBytes() > maxSize && !ready.isEmpty()) {
+                                // there is a rare case that a single batch size is larger than the request size due
+                                // to compression; in this case we will still eventually send this batch in a single
+                                // request
+                                break;
+                            } else {
+                                RecordBatch batch = deque.pollFirst();
+                                batch.records.close();
+                                size += batch.records.sizeInBytes();
+                                ready.add(batch);
+                                batch.drainedMs = nowMs;
+                            }
+                        }
                     }
                 }
-            }
-            this.drainIndex = (this.drainIndex + 1) % partitions.size();
-        } while (start != drainIndex);
-        return ready;
+                this.drainIndex = (this.drainIndex + 1) % parts.size();
+            } while (start != drainIndex);
+            batches.put(node.id(), ready);
+        }
+        return batches;
     }
 
     /**
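Reviewer note: the new drain() walks a node's partitions round-robin starting at drainIndex and stops once the next batch would exceed the per-request size budget, unless nothing has been taken yet (the oversized-compressed-batch corner case). Below is a simplified model of that control flow, with pending batches reduced to plain integer sizes; it is a sketch, not the real RecordAccumulator.

import java.util.*;

// Illustrative model: batchSizes holds the size of the head batch for each
// partition of a single node; drainIndex rotates across calls so the same
// partitions are not always drained first.
public class DrainLoopSketch {
    private static int drainIndex = 0;

    static List<Integer> drain(List<Integer> batchSizes, int maxSize) {
        List<Integer> taken = new ArrayList<Integer>();
        if (batchSizes.isEmpty())
            return taken;
        int size = 0;
        int start = drainIndex = drainIndex % batchSizes.size();
        do {
            int next = batchSizes.get(drainIndex);
            // an oversized first batch is still taken, mirroring the compression corner case
            if (size + next > maxSize && !taken.isEmpty())
                break;
            size += next;
            taken.add(next);
            drainIndex = (drainIndex + 1) % batchSizes.size();
        } while (start != drainIndex);
        return taken;
    }

    public static void main(String[] args) {
        List<Integer> sizes = Arrays.asList(400, 300, 500);
        System.out.println(drain(sizes, 1024)); // [400, 300]
        System.out.println(drain(sizes, 1024)); // [500, 400] -- picks up where the last drain stopped
    }
}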
diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java b/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java
index f0152fa..c2eec19 100644
--- a/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java
+++ b/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java
@@ -192,18 +192,18 @@
     public void run(long nowMs) {
         Cluster cluster = metadata.fetch();
         // get the list of partitions with data ready to send
-        List<TopicPartition> ready = this.accumulator.ready(nowMs);
+        List<Node> ready = this.accumulator.ready(cluster, nowMs);
 
         // should we update our metadata?
         List sends = new ArrayList();
         maybeUpdateMetadata(cluster, sends, nowMs);
 
         // prune the list of ready topics to eliminate any that we aren't ready to send yet
-        List<TopicPartition> sendable = processReadyPartitions(cluster, ready, nowMs);
+        List<Node> sendable = processReadyNode(ready, nowMs);
 
         // create produce requests
-        List<RecordBatch> batches = this.accumulator.drain(sendable, this.maxRequestSize, nowMs);
-        List requests = collate(cluster, batches, nowMs);
+        Map<Integer, List<RecordBatch>> batches = this.accumulator.drain(cluster, sendable, this.maxRequestSize, nowMs);
+        List requests = collate(batches, nowMs);
         sensors.updateProduceRequestMetrics(requests);
 
         if (ready.size() > 0) {
@@ -310,19 +310,18 @@
     }
 
     /**
-     * Process the set of topic-partitions with data ready to send. If we have a connection to the appropriate node, add
+     * Process the set of destination nodes with data ready to send. If we have a connection to the appropriate node, add
      * it to the returned set. For any partitions we have no connection to either make one, fetch the appropriate
      * metadata to be able to do so
      */
-    private List<TopicPartition> processReadyPartitions(Cluster cluster, List<TopicPartition> ready, long nowMs) {
-        List<TopicPartition> sendable = new ArrayList<TopicPartition>(ready.size());
-        for (TopicPartition tp : ready) {
-            Node node = cluster.leaderFor(tp);
-            if (node == null) {
+    private List<Node> processReadyNode(List<Node> ready, long nowMs) {
+        List<Node> sendable = new ArrayList<Node>(ready.size());
+        for (Node node : ready) {
+            if (node.equals(Node.Unknown)) {
                 // we don't know about this topic/partition or it has no leader, re-fetch metadata
                 metadata.forceUpdate();
             } else if (nodeStates.isConnected(node.id()) && inFlightRequests.canSendMore(node.id())) {
-                sendable.add(tp);
+                sendable.add(node);
             } else if (nodeStates.canConnect(node.id(), nowMs)) {
                 // we don't have a connection to this node right now, make one
                 initiateConnect(node, nowMs);
@@ -520,19 +519,9 @@
     }
 
     /**
-     * Collate the record batches into a list of produce requests on a per-node basis
+     * Transfer the record batches into a list of produce requests on a per-node basis
      */
-    private List collate(Cluster cluster, List<RecordBatch> batches, long nowMs) {
-        Map<Integer, List<RecordBatch>> collated = new HashMap<Integer, List<RecordBatch>>();
-        for (RecordBatch batch : batches) {
-            Node node = cluster.leaderFor(batch.topicPartition);
-            List<RecordBatch> found = collated.get(node.id());
-            if (found == null) {
-                found = new ArrayList<RecordBatch>();
-                collated.put(node.id(), found);
-            }
-            found.add(batch);
-        }
+    private List collate(Map<Integer, List<RecordBatch>> collated, long nowMs) {
         List requests = new ArrayList(collated.size());
         for (Map.Entry<Integer, List<RecordBatch>> entry : collated.entrySet())
             requests.add(produceRequest(nowMs, entry.getKey(), acks, requestTimeout, entry.getValue()));
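Reviewer note: because drain() already returns batches keyed by node id, collate() above loses its leader lookup, and processReadyNode() reduces to a three-way decision per destination node. The sketch below models that decision with booleans standing in for the NodeStates and InFlightRequests checks; it is illustrative only, not the Sender implementation.

// Illustrative only: the same branch structure as processReadyNode(), with the
// connection-state queries replaced by plain boolean inputs.
public class ReadyNodeFilter {
    enum Action { FORCE_METADATA_UPDATE, SEND, CONNECT, SKIP }

    static Action decide(boolean unknownLeader, boolean connected,
                         boolean canSendMore, boolean canConnect) {
        if (unknownLeader)
            return Action.FORCE_METADATA_UPDATE; // metadata.forceUpdate()
        else if (connected && canSendMore)
            return Action.SEND;                  // node goes into the sendable list
        else if (canConnect)
            return Action.CONNECT;               // initiateConnect(node, nowMs)
        else
            return Action.SKIP;                  // wait until the node becomes usable
    }

    public static void main(String[] args) {
        System.out.println(decide(true, false, false, false)); // FORCE_METADATA_UPDATE
        System.out.println(decide(false, true, true, false));  // SEND
        System.out.println(decide(false, false, false, true)); // CONNECT
    }
}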
diff --git a/clients/src/main/java/org/apache/kafka/common/Cluster.java b/clients/src/main/java/org/apache/kafka/common/Cluster.java
index 426bd1e..bf9d3bc 100644
--- a/clients/src/main/java/org/apache/kafka/common/Cluster.java
+++ b/clients/src/main/java/org/apache/kafka/common/Cluster.java
@@ -28,6 +28,7 @@ public final class Cluster {
     private final List<Node> nodes;
     private final Map<TopicPartition, PartitionInfo> partitionsByTopicPartition;
     private final Map<String, List<PartitionInfo>> partitionsByTopic;
+    private final Map<Integer, List<PartitionInfo>> partitionsByNode;
 
     /**
      * Create a new cluster with the given nodes and partitions
@@ -45,18 +46,34 @@ public final class Cluster {
         for (PartitionInfo p : partitions)
             this.partitionsByTopicPartition.put(new TopicPartition(p.topic(), p.partition()), p);
 
-        // index the partitions by topic and make the lists unmodifiable so we can handle them out in
-        // user-facing apis without risk of the client modifying the contents
-        HashMap<String, List<PartitionInfo>> parts = new HashMap<String, List<PartitionInfo>>();
+        // index the partitions by topic and node respectively, and make the lists
+        // unmodifiable so we can hand them out in user-facing apis without risk
+        // of the client modifying the contents
+        HashMap<String, List<PartitionInfo>> partsForTopic = new HashMap<String, List<PartitionInfo>>();
+        HashMap<Integer, List<PartitionInfo>> partsForNode = new HashMap<Integer, List<PartitionInfo>>();
+        for (Node n : this.nodes) {
+            partsForNode.put(n.id(), new ArrayList<PartitionInfo>());
+        }
         for (PartitionInfo p : partitions) {
-            if (!parts.containsKey(p.topic()))
-                parts.put(p.topic(), new ArrayList<PartitionInfo>());
-            List<PartitionInfo> ps = parts.get(p.topic());
-            ps.add(p);
+            if (!partsForTopic.containsKey(p.topic()))
+                partsForTopic.put(p.topic(), new ArrayList<PartitionInfo>());
+            List<PartitionInfo> psTopic = partsForTopic.get(p.topic());
+            psTopic.add(p);
+
+            if (p.leader() != null) {
+                if (!partsForNode.containsKey(p.leader().id()))
+                    throw new IllegalStateException("Metadata contains some partition with unknown leader broker id");
+                List<PartitionInfo> psNode = partsForNode.get(p.leader().id());
+                psNode.add(p);
+            }
         }
-        this.partitionsByTopic = new HashMap<String, List<PartitionInfo>>(parts.size());
-        for (Map.Entry<String, List<PartitionInfo>> entry : parts.entrySet())
+        this.partitionsByTopic = new HashMap<String, List<PartitionInfo>>(partsForTopic.size());
+        for (Map.Entry<String, List<PartitionInfo>> entry : partsForTopic.entrySet())
             this.partitionsByTopic.put(entry.getKey(), Collections.unmodifiableList(entry.getValue()));
+        this.partitionsByNode = new HashMap<Integer, List<PartitionInfo>>(partsForNode.size());
+        for (Map.Entry<Integer, List<PartitionInfo>> entry : partsForNode.entrySet())
+            this.partitionsByNode.put(entry.getKey(), Collections.unmodifiableList(entry.getValue()));
+
     }
 
     /**
@@ -117,6 +134,15 @@ public final class Cluster {
         return this.partitionsByTopic.get(topic);
     }
 
+    /**
+     * Get the list of partitions whose leader is this node
+     * @param nodeId The node id
+     * @return A list of partitions
+     */
+    public List<PartitionInfo> partitionsFor(int nodeId) {
+        return this.partitionsByNode.get(nodeId);
+    }
+
     @Override
     public String toString() {
         return "Cluster(nodes = " + this.nodes + ", partitions = " + this.partitionsByTopicPartition.values() + ")";
diff --git a/clients/src/main/java/org/apache/kafka/common/Node.java b/clients/src/main/java/org/apache/kafka/common/Node.java
index 0e47ff3..d21aee4 100644
--- a/clients/src/main/java/org/apache/kafka/common/Node.java
+++ b/clients/src/main/java/org/apache/kafka/common/Node.java
@@ -28,6 +28,8 @@ public class Node {
         this.port = port;
     }
 
+    public static final Node Unknown = new Node(-1, null, -1);
+
     /**
      * The node id of this node
      */
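Reviewer note: Cluster now builds a leader-id index once at construction time and hands out unmodifiable lists, while leaderless partitions are skipped and surface upstream through the Node.Unknown sentinel. Below is a standalone sketch of that indexing step, using a hypothetical PartInfo stand-in rather than the real PartitionInfo.

import java.util.*;

// Illustrative only: group partitions by their leader's node id, fail fast on a
// leader id that is not a known node, and freeze the per-node lists.
public class NodeIndexSketch {
    static class PartInfo {
        final String topic;
        final int partition;
        final Integer leaderId; // null when the partition currently has no leader
        PartInfo(String topic, int partition, Integer leaderId) {
            this.topic = topic;
            this.partition = partition;
            this.leaderId = leaderId;
        }
    }

    static Map<Integer, List<PartInfo>> indexByLeader(List<Integer> nodeIds, List<PartInfo> parts) {
        Map<Integer, List<PartInfo>> byNode = new HashMap<Integer, List<PartInfo>>();
        for (Integer id : nodeIds)
            byNode.put(id, new ArrayList<PartInfo>());
        for (PartInfo p : parts) {
            if (p.leaderId == null)
                continue; // leaderless partitions are not indexed by node
            List<PartInfo> list = byNode.get(p.leaderId);
            if (list == null)
                throw new IllegalStateException("partition with unknown leader broker id " + p.leaderId);
            list.add(p);
        }
        for (Map.Entry<Integer, List<PartInfo>> e : byNode.entrySet())
            e.setValue(Collections.unmodifiableList(e.getValue()));
        return byNode;
    }

    public static void main(String[] args) {
        List<PartInfo> parts = Arrays.asList(new PartInfo("test", 0, 0), new PartInfo("test", 1, 0));
        System.out.println(indexByLeader(Arrays.asList(0), parts).get(0).size()); // 2
    }
}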
TopicPartition("test", 0); + private String topic = "test"; + private int partition1 = 0; + private int partition2 = 1; + private Node node = new Node(0, "localhost", 1111); + private TopicPartition tp1 = new TopicPartition(topic, partition1); + private TopicPartition tp2 = new TopicPartition(topic, partition2); + private PartitionInfo part1 = new PartitionInfo(topic, partition1, node, null, null); + private PartitionInfo part2 = new PartitionInfo(topic, partition2, node, null, null); private MockTime time = new MockTime(); private byte[] key = "key".getBytes(); private byte[] value = "value".getBytes(); private int msgSize = Records.LOG_OVERHEAD + Record.recordSize(key, value); + private Cluster cluster = new Cluster(Collections.singleton(node), Arrays.asList(part1, part2)); private Metrics metrics = new Metrics(time); @Test @@ -47,12 +56,12 @@ public class RecordAccumulatorTest { RecordAccumulator accum = new RecordAccumulator(1024, 10 * 1024, 10L, 100L, false, metrics, time); int appends = 1024 / msgSize; for (int i = 0; i < appends; i++) { - accum.append(tp, key, value, CompressionType.NONE, null); - assertEquals("No partitions should be ready.", 0, accum.ready(now).size()); + accum.append(tp1, key, value, CompressionType.NONE, null); + assertEquals("No partitions should be ready.", 0, accum.ready(cluster, now).size()); } - accum.append(tp, key, value, CompressionType.NONE, null); - assertEquals("Our partition should be ready", asList(tp), accum.ready(time.milliseconds())); - List batches = accum.drain(asList(tp), Integer.MAX_VALUE, 0); + accum.append(tp1, key, value, CompressionType.NONE, null); + assertEquals("Our partition's leader should be ready", asList(node), accum.ready(cluster, time.milliseconds())); + List batches = accum.drain(cluster, asList(node), Integer.MAX_VALUE, 0).get(node.id()); assertEquals(1, batches.size()); RecordBatch batch = batches.get(0); Iterator iter = batch.records.iterator(); @@ -68,19 +77,19 @@ public class RecordAccumulatorTest { public void testAppendLarge() throws Exception { int batchSize = 512; RecordAccumulator accum = new RecordAccumulator(batchSize, 10 * 1024, 0L, 100L, false, metrics, time); - accum.append(tp, key, new byte[2 * batchSize], CompressionType.NONE, null); - assertEquals("Our partition should be ready", asList(tp), accum.ready(time.milliseconds())); + accum.append(tp1, key, new byte[2 * batchSize], CompressionType.NONE, null); + assertEquals("Our partition's leader should be ready", asList(node), accum.ready(cluster, time.milliseconds())); } @Test public void testLinger() throws Exception { long lingerMs = 10L; RecordAccumulator accum = new RecordAccumulator(1024, 10 * 1024, lingerMs, 100L, false, metrics, time); - accum.append(tp, key, value, CompressionType.NONE, null); - assertEquals("No partitions should be ready", 0, accum.ready(time.milliseconds()).size()); + accum.append(tp1, key, value, CompressionType.NONE, null); + assertEquals("No partitions should be ready", 0, accum.ready(cluster, time.milliseconds()).size()); time.sleep(10); - assertEquals("Our partition should be ready", asList(tp), accum.ready(time.milliseconds())); - List batches = accum.drain(asList(tp), Integer.MAX_VALUE, 0); + assertEquals("Our partition's leader should be ready", asList(node), accum.ready(cluster, time.milliseconds())); + List batches = accum.drain(cluster, asList(node), Integer.MAX_VALUE, 0).get(node.id()); assertEquals(1, batches.size()); RecordBatch batch = batches.get(0); Iterator iter = batch.records.iterator(); @@ -94,14 +103,14 @@ public 
@@ -94,14 +103,14 @@ public class RecordAccumulatorTest {
     public void testPartialDrain() throws Exception {
         RecordAccumulator accum = new RecordAccumulator(1024, 10 * 1024, 10L, 100L, false, metrics, time);
         int appends = 1024 / msgSize + 1;
-        List<TopicPartition> partitions = asList(new TopicPartition("test", 0), new TopicPartition("test", 1));
+        List<TopicPartition> partitions = asList(tp1, tp2);
         for (TopicPartition tp : partitions) {
             for (int i = 0; i < appends; i++)
                 accum.append(tp, key, value, CompressionType.NONE, null);
         }
-        assertEquals("Both partitions should be ready", 2, accum.ready(time.milliseconds()).size());
+        assertEquals("Partition's leader should be ready", asList(node), accum.ready(cluster, time.milliseconds()));
 
-        List<RecordBatch> batches = accum.drain(partitions, 1024, 0);
+        List<RecordBatch> batches = accum.drain(cluster, asList(node), 1024, 0).get(node.id());
         assertEquals("But due to size bound only one partition should have been retrieved", 1, batches.size());
     }
 
@@ -109,7 +118,7 @@ public class RecordAccumulatorTest {
     public void testStressfulSituation() throws Exception {
         final int numThreads = 5;
         final int msgs = 10000;
-        final int numParts = 10;
+        final int numParts = 2;
         final RecordAccumulator accum = new RecordAccumulator(1024, 10 * 1024, 0L, 100L, true, metrics, time);
         List<Thread> threads = new ArrayList<Thread>();
         for (int i = 0; i < numThreads; i++) {
@@ -117,7 +126,7 @@ public class RecordAccumulatorTest {
                 public void run() {
                     for (int i = 0; i < msgs; i++) {
                         try {
-                            accum.append(new TopicPartition("test", i % numParts), key, value, CompressionType.NONE, null);
+                            accum.append(new TopicPartition(topic, i % numParts), key, value, CompressionType.NONE, null);
                         } catch (Exception e) {
                             e.printStackTrace();
                         }
@@ -130,12 +139,14 @@ public class RecordAccumulatorTest {
         int read = 0;
         long now = time.milliseconds();
         while (read < numThreads * msgs) {
-            List<TopicPartition> tps = accum.ready(now);
-            List<RecordBatch> batches = accum.drain(tps, 5 * 1024, 0);
-            for (RecordBatch batch : batches) {
-                for (LogEntry entry : batch.records)
-                    read++;
-                accum.deallocate(batch);
+            List<Node> nodes = accum.ready(cluster, now);
+            List<RecordBatch> batches = accum.drain(cluster, nodes, 5 * 1024, 0).get(node.id());
+            if (batches != null) {
+                for (RecordBatch batch : batches) {
+                    for (LogEntry entry : batch.records)
+                        read++;
+                    accum.deallocate(batch);
+                }
             }
         }
 
diff --git a/clients/src/test/java/org/apache/kafka/common/utils/AbstractIteratorTest.java b/clients/src/test/java/org/apache/kafka/common/utils/AbstractIteratorTest.java
index 1df2266..c788e66 100644
--- a/clients/src/test/java/org/apache/kafka/common/utils/AbstractIteratorTest.java
+++ b/clients/src/test/java/org/apache/kafka/common/utils/AbstractIteratorTest.java
@@ -20,13 +20,8 @@ import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertTrue;
 
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Iterator;
-import java.util.List;
-import java.util.NoSuchElementException;
+import java.util.*;
 
-import org.apache.kafka.common.utils.AbstractIterator;
 import org.junit.Test;
 
 public class AbstractIteratorTest {
@@ -49,7 +44,7 @@ public class AbstractIteratorTest {
 
     @Test(expected = NoSuchElementException.class)
     public void testEmptyIterator() {
-        Iterator iter = new ListIterator(Arrays.asList());
+        Iterator iter = new ListIterator(Collections.emptyList());
         iter.next();
     }
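Reviewer note: taken end to end, the send path after this patch is ready-per-node, drain-per-node, then one produce request per node id, which is what the updated tests above exercise. The closing sketch below traces that shape with simplified stand-in types; it is not the Sender code.

import java.util.*;

// Illustrative only: the three steps of the new send path with plain collections.
public class SendLoopSketch {
    static class Batch {
        final String topicPartition;
        Batch(String topicPartition) { this.topicPartition = topicPartition; }
    }

    public static void main(String[] args) {
        // 1. ready(cluster, now): nodes that have at least one sendable batch
        List<Integer> readyNodeIds = Arrays.asList(0, 1);

        // 2. drain(cluster, nodes, maxSize, now): batches already grouped by node id
        Map<Integer, List<Batch>> drained = new HashMap<Integer, List<Batch>>();
        drained.put(0, Arrays.asList(new Batch("test-0"), new Batch("test-1")));
        drained.put(1, Collections.singletonList(new Batch("other-0")));

        // 3. collate(drained, now): one produce request per node, no re-grouping needed
        for (Integer nodeId : readyNodeIds) {
            List<Batch> forNode = drained.get(nodeId);
            if (forNode != null && !forNode.isEmpty()) // a ready node may still drain nothing
                System.out.println("ProduceRequest -> node " + nodeId + " with " + forNode.size() + " batch(es)");
        }
    }
}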