From 2de7cb709174f20f4f29a54aae824eac7be6e351 Mon Sep 17 00:00:00 2001 From: Jun Rao Date: Tue, 24 Feb 2015 09:39:04 -0800 Subject: [PATCH 1/2] kafka-1984 --- .../clients/producer/internals/Partitioner.java | 15 ++++++------ .../main/java/org/apache/kafka/common/Cluster.java | 24 +++++++++++++++++-- .../kafka/clients/producer/PartitionerTest.java | 28 +++++++++++----------- 3 files changed, 44 insertions(+), 23 deletions(-) diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/internals/Partitioner.java b/clients/src/main/java/org/apache/kafka/clients/producer/internals/Partitioner.java index 8112e6d..dfb936d 100644 --- a/clients/src/main/java/org/apache/kafka/clients/producer/internals/Partitioner.java +++ b/clients/src/main/java/org/apache/kafka/clients/producer/internals/Partitioner.java @@ -56,14 +56,15 @@ public class Partitioner { + "]."); return partition; } else if (key == null) { - // choose the next available node in a round-robin fashion - for (int i = 0; i < numPartitions; i++) { - int part = Utils.abs(counter.getAndIncrement()) % numPartitions; - if (partitions.get(part).leader() != null) - return part; + int nextValue = counter.getAndIncrement(); + List availablePartitions = cluster.availablePartitionsForTopic(topic); + if (availablePartitions.size() > 0) { + int part = Utils.abs(nextValue) % availablePartitions.size(); + return availablePartitions.get(part).partition(); + } else { + // no partitions are available, give a non-available partition + return Utils.abs(nextValue) % numPartitions; } - // no partitions are available, give a non-available partition - return Utils.abs(counter.getAndIncrement()) % numPartitions; } else { // hash the key to choose a partition return Utils.abs(Utils.murmur2(key)) % numPartitions; diff --git a/clients/src/main/java/org/apache/kafka/common/Cluster.java b/clients/src/main/java/org/apache/kafka/common/Cluster.java index 8fcd291..60594a7 100644 --- 
a/clients/src/main/java/org/apache/kafka/common/Cluster.java +++ b/clients/src/main/java/org/apache/kafka/common/Cluster.java @@ -25,6 +25,7 @@ public final class Cluster { private final List nodes; private final Map partitionsByTopicPartition; private final Map> partitionsByTopic; + private final Map> availablePartitionsByTopic; private final Map> partitionsByNode; private final Map nodesById; @@ -68,8 +69,18 @@ public final class Cluster { } } this.partitionsByTopic = new HashMap>(partsForTopic.size()); - for (Map.Entry> entry : partsForTopic.entrySet()) - this.partitionsByTopic.put(entry.getKey(), Collections.unmodifiableList(entry.getValue())); + this.availablePartitionsByTopic = new HashMap>(partsForTopic.size()); + for (Map.Entry> entry : partsForTopic.entrySet()) { + String topic = entry.getKey(); + List partitionList = entry.getValue(); + this.partitionsByTopic.put(topic, Collections.unmodifiableList(partitionList)); + List availablePartitions = new ArrayList(); + for (PartitionInfo part : partitionList) { + if (part.leader() != null) + availablePartitions.add(part); + } + this.availablePartitionsByTopic.put(topic, Collections.unmodifiableList(availablePartitions)); + } this.partitionsByNode = new HashMap>(partsForNode.size()); for (Map.Entry> entry : partsForNode.entrySet()) this.partitionsByNode.put(entry.getKey(), Collections.unmodifiableList(entry.getValue())); @@ -144,6 +155,15 @@ public final class Cluster { } /** + * Get the list of available partitions for this topic + * @param topic The topic name + * @return A list of partitions + */ + public List availablePartitionsForTopic(String topic) { + return this.availablePartitionsByTopic.get(topic); + } + + /** * Get the list of partitions whose leader is this node * @param nodeId The node id * @return A list of partitions diff --git a/clients/src/test/java/org/apache/kafka/clients/producer/PartitionerTest.java b/clients/src/test/java/org/apache/kafka/clients/producer/PartitionerTest.java index 
29c8417..38d6aa8 100644 --- a/clients/src/test/java/org/apache/kafka/clients/producer/PartitionerTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/producer/PartitionerTest.java @@ -33,9 +33,9 @@ public class PartitionerTest { private Node node2 = new Node(2, "localhost", 101); private Node[] nodes = new Node[] {node0, node1, node2}; private String topic = "test"; - private List partitions = asList(new PartitionInfo(topic, 0, node0, nodes, nodes), - new PartitionInfo(topic, 1, node1, nodes, nodes), - new PartitionInfo(topic, 2, null, nodes, nodes)); + private List partitions = asList(new PartitionInfo(topic, 1, null, nodes, nodes), + new PartitionInfo(topic, 2, node1, nodes, nodes), + new PartitionInfo(topic, 0, node0, nodes, nodes)); private Cluster cluster = new Cluster(asList(node0, node1, node2), partitions); @Test @@ -50,19 +50,19 @@ public class PartitionerTest { } @Test - public void testRoundRobinIsStable() { - int startPart = partitioner.partition("test", null, null, cluster); + public void testRoundRobinWithUnavailablePartitions() { + // When there are some unavailable partitions, we want to make sure that (1) we always pick an available partition, + // and (2) the available partitions are selected in a round robin way. 
+ int countForPart0 = 0; + int countForPart1 = 0; for (int i = 1; i <= 100; i++) { - int partition = partitioner.partition("test", null, null, cluster); - assertEquals("Should yield a different partition each call with round-robin partitioner", partition, (startPart + i) % 2); - } - } - - @Test - public void testRoundRobinWithDownNode() { - for (int i = 0; i < partitions.size(); i++) { int part = partitioner.partition("test", null, null, cluster); - assertTrue("We should never choose a leader-less node in round robin", part >= 0 && part < 2); + assertTrue("We should never choose a leader-less node in round robin", part == 0 || part == 2); + if (part == 0) + countForPart0++; + else + countForPart1++; } + assertEquals("The distribution between two available partitions should be even", countForPart0, countForPart1); } } -- 1.8.5.2 (Apple Git-48) From 540c3d9d0e5d3fe59c94bb50eb365bc6eeaf703b Mon Sep 17 00:00:00 2001 From: Jun Rao Date: Tue, 24 Feb 2015 09:42:34 -0800 Subject: [PATCH 2/2] minor renaming --- .../java/org/apache/kafka/clients/producer/PartitionerTest.java | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/clients/src/test/java/org/apache/kafka/clients/producer/PartitionerTest.java b/clients/src/test/java/org/apache/kafka/clients/producer/PartitionerTest.java index 38d6aa8..f0e0f0d 100644 --- a/clients/src/test/java/org/apache/kafka/clients/producer/PartitionerTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/producer/PartitionerTest.java @@ -54,15 +54,15 @@ public class PartitionerTest { // When there are some unavailable partitions, we want to make sure that (1) we always pick an available partition, // and (2) the available partitions are selected in a round robin way. 
int countForPart0 = 0; - int countForPart1 = 0; + int countForPart2 = 0; for (int i = 1; i <= 100; i++) { int part = partitioner.partition("test", null, null, cluster); assertTrue("We should never choose a leader-less node in round robin", part == 0 || part == 2); if (part == 0) countForPart0++; else - countForPart1++; + countForPart2++; } - assertEquals("The distribution between two available partitions should be even", countForPart0, countForPart1); + assertEquals("The distribution between two available partitions should be even", countForPart0, countForPart2); } } -- 1.8.5.2 (Apple Git-48)