From e0c05d6bfa13da22d16861482d1bf1cc7f63e37f Mon Sep 17 00:00:00 2001 From: asingh Date: Fri, 17 Jul 2015 16:56:47 -0700 Subject: [PATCH 1/2] KAFKA-2275: Add a "Map> partitionsFor(String... topics)" API to the new consumer --- .../apache/kafka/clients/consumer/Consumer.java | 5 +++ .../kafka/clients/consumer/KafkaConsumer.java | 45 +++++++++++++++++++++- .../kafka/clients/consumer/MockConsumer.java | 17 ++++++++ .../scala/integration/kafka/api/ConsumerTest.scala | 15 ++++++++ 4 files changed, 81 insertions(+), 1 deletion(-) diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/Consumer.java b/clients/src/main/java/org/apache/kafka/clients/consumer/Consumer.java index 252b759..bf65989 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/Consumer.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/Consumer.java @@ -114,6 +114,11 @@ public interface Consumer extends Closeable { public List partitionsFor(String topic); /** + * @see KafkaConsumer#partitionsFor(String...) + */ + public Map> partitionsFor(String... topics); + + /** * @see KafkaConsumer#close() */ public void close(); diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java b/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java index bea3d73..617538d 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java @@ -39,6 +39,7 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.net.InetSocketAddress; +import java.util.ArrayList; import java.util.Arrays; import java.util.Collection; import java.util.Collections; @@ -1003,7 +1004,7 @@ public class KafkaConsumer implements Consumer { /** * Get metadata about the partitions for a given topic. This method will issue a remote call to the server if it * does not already have any metadata about the given topic. - * + * * @param topic The topic to get partition metadata for * @return The list of partitions */ @@ -1024,6 +1025,48 @@ public class KafkaConsumer implements Consumer { } } + /** + * Get metadata about the partitions for a given list of topics. This method will issue a + * remote call to the server if it does not already have any metadata about the given topic. + * + * @param topics The list of topics to get partition metadata for + * @return The map of topic and its list of partitions pair + */ + @Override + public Map> partitionsFor(String... topics) { + acquire(); + try { + Map> topicAndPartitionInfoMap + = new HashMap>(); + List missingTopics = new ArrayList(); + + // add metadata for topics that are available and add the remaining to missing topics + for (String topic: topics) { + final Cluster cluster = this.metadata.fetch(); + List parts = cluster.partitionsForTopic(topic); + if (parts == null) { + missingTopics.add(topic); + metadata.add(topic); + } else { + topicAndPartitionInfoMap.put(topic, cluster.partitionsForTopic(topic)); + } + } + + // get metadata for topics that are missing metadata + client.awaitMetadataUpdate(); + + // add metadata for topics that were missing metadata before metadata update + final Cluster cluster = this.metadata.fetch(); + for (String topic: missingTopics) { + topicAndPartitionInfoMap.put(topic, cluster.partitionsForTopic(topic)); + } + + return topicAndPartitionInfoMap; + } finally { + release(); + } + } + @Override public void close() { acquire(); diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/MockConsumer.java b/clients/src/main/java/org/apache/kafka/clients/consumer/MockConsumer.java index c14eed1..4015cac 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/MockConsumer.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/MockConsumer.java @@ -177,6 +177,23 @@ public class MockConsumer implements Consumer { return parts; } + @Override + public synchronized Map> partitionsFor(String... topics) { + ensureNotClosed(); + Map> map = new HashMap>(); + + for (String topic: topics) { + List parts = this.partitions.get(topic); + if (parts == null) { + map.put(topic, Collections.emptyList()); + } else { + map.put(topic, parts); + } + } + + return map; + } + public synchronized void updatePartitions(String topic, List partitions) { ensureNotClosed(); this.partitions.put(topic, partitions); diff --git a/core/src/test/scala/integration/kafka/api/ConsumerTest.scala b/core/src/test/scala/integration/kafka/api/ConsumerTest.scala index 3eb5f95..22be9fd 100644 --- a/core/src/test/scala/integration/kafka/api/ConsumerTest.scala +++ b/core/src/test/scala/integration/kafka/api/ConsumerTest.scala @@ -186,6 +186,21 @@ class ConsumerTest extends IntegrationTestHarness with Logging { assertNull(this.consumers(0).partitionsFor("non-exist-topic")) } + def testPartitionsForListOfTopics() { + val numParts = 2 + val topic1: String = "part-test-topic-1" + val topic2: String = "part-test-topic-2" + TestUtils.createTopic(this.zkClient, topic1, numParts, 1, this.servers) + TestUtils.createTopic(this.zkClient, topic2, numParts, 1, this.servers) + val mapTopicParts = this.consumers(0).partitionsFor(topic1, topic2) + assertNotNull(mapTopicParts) + assertEquals(2, mapTopicParts.size()) + assertEquals(2, mapTopicParts.keySet().size()) + assertEquals(2, mapTopicParts.get(topic1).length) + assertEquals(2, mapTopicParts.get(topic2).length) + assertNull(this.consumers(0).partitionsFor("non-exist-topic")) + } + def testPartitionReassignmentCallback() { val callback = new TestConsumerReassignmentCallback() this.consumerConfig.setProperty(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "100"); // timeout quickly to avoid slow test -- 2.3.2 (Apple Git-55) From ef9f27fc168309f66138dfab52d8d357713a9783 Mon Sep 17 00:00:00 2001 From: asingh Date: Fri, 17 Jul 2015 21:38:53 -0700 Subject: [PATCH 2/2] Return metadata for all topics if empty list is passed to partitionsFor --- .../kafka/clients/consumer/KafkaConsumer.java | 55 ++++++++++++++-------- .../kafka/clients/consumer/MockConsumer.java | 19 +++++--- .../scala/integration/kafka/api/ConsumerTest.scala | 18 ++++++- 3 files changed, 65 insertions(+), 27 deletions(-) diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java b/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java index 617538d..0ae4bde 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java @@ -1026,10 +1026,13 @@ public class KafkaConsumer implements Consumer { } /** - * Get metadata about the partitions for a given list of topics. This method will issue a - * remote call to the server if it does not already have any metadata about the given topic. + * Get metadata about the partitions for a given list of topics. If given list of topics is + * empty, metadata about all the topics in the cluster will be returned. This method + * will issue a remote call to the server if it does not already have metadata about all + * of the topics in the provided list of topics or if the list is empty. * - * @param topics The list of topics to get partition metadata for + * @param topics The list of topics to get partition metadata for or an empty list to get + * metadata for all topics * @return The map of topic and its list of partitions pair */ @Override @@ -1038,27 +1041,39 @@ public class KafkaConsumer implements Consumer { try { Map> topicAndPartitionInfoMap = new HashMap>(); - List missingTopics = new ArrayList(); - // add metadata for topics that are available and add the remaining to missing topics - for (String topic: topics) { - final Cluster cluster = this.metadata.fetch(); - List parts = cluster.partitionsForTopic(topic); - if (parts == null) { - missingTopics.add(topic); - metadata.add(topic); - } else { - topicAndPartitionInfoMap.put(topic, cluster.partitionsForTopic(topic)); + if (topics.length > 0) { // return info about topics in the provided list + List missingTopics = new ArrayList(); + + // add metadata for topics that are available and add the remaining to missing topics + for (String topic : topics) { + final Cluster cluster = this.metadata.fetch(); + List parts = cluster.partitionsForTopic(topic); + if (parts == null) { + missingTopics.add(topic); + metadata.add(topic); + } else { + topicAndPartitionInfoMap.put(topic, cluster.partitionsForTopic(topic)); + } } - } - // get metadata for topics that are missing metadata - client.awaitMetadataUpdate(); + // get metadata for topics that are missing metadata + client.awaitMetadataUpdate(); - // add metadata for topics that were missing metadata before metadata update - final Cluster cluster = this.metadata.fetch(); - for (String topic: missingTopics) { - topicAndPartitionInfoMap.put(topic, cluster.partitionsForTopic(topic)); + // add metadata for topics that were missing metadata before metadata update + final Cluster cluster = this.metadata.fetch(); + for (String topic : missingTopics) { + topicAndPartitionInfoMap.put(topic, cluster.partitionsForTopic(topic)); + } + } else { // Return info about all topics in the system + client.awaitMetadataUpdate(); + final Cluster cluster = this.metadata.fetch(); + for (String topic: cluster.topics()) { + final String OffsetsTopicName = "__consumer_offsets"; + if (!topic.equals(OffsetsTopicName)) { + topicAndPartitionInfoMap.put(topic, cluster.partitionsForTopic(topic)); + } + } } return topicAndPartitionInfoMap; diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/MockConsumer.java b/clients/src/main/java/org/apache/kafka/clients/consumer/MockConsumer.java index 4015cac..7c29dae 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/MockConsumer.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/MockConsumer.java @@ -15,6 +15,7 @@ package org.apache.kafka.clients.consumer; import java.util.ArrayList; import java.util.Collections; import java.util.HashMap; +import java.util.Iterator; import java.util.List; import java.util.Map; import java.util.Map.Entry; @@ -182,12 +183,18 @@ public class MockConsumer implements Consumer { ensureNotClosed(); Map> map = new HashMap>(); - for (String topic: topics) { - List parts = this.partitions.get(topic); - if (parts == null) { - map.put(topic, Collections.emptyList()); - } else { - map.put(topic, parts); + if (topics.length > 0) { + for (String topic : topics) { + List parts = this.partitions.get(topic); + if (parts == null) { + map.put(topic, Collections.emptyList()); + } else { + map.put(topic, parts); + } + } + } else { + for (String key: this.partitions.keySet()) { + map.put(key, this.partitions.get(key)); } } diff --git a/core/src/test/scala/integration/kafka/api/ConsumerTest.scala b/core/src/test/scala/integration/kafka/api/ConsumerTest.scala index 22be9fd..c43fca0 100644 --- a/core/src/test/scala/integration/kafka/api/ConsumerTest.scala +++ b/core/src/test/scala/integration/kafka/api/ConsumerTest.scala @@ -198,7 +198,23 @@ class ConsumerTest extends IntegrationTestHarness with Logging { assertEquals(2, mapTopicParts.keySet().size()) assertEquals(2, mapTopicParts.get(topic1).length) assertEquals(2, mapTopicParts.get(topic2).length) - assertNull(this.consumers(0).partitionsFor("non-exist-topic")) + } + + def testPartitionsForAllTopics() { + val numParts = 2 + val topic1: String = "part-test-topic-1" + val topic2: String = "part-test-topic-2" + val topic3: String = "part-test-topic-3" + TestUtils.createTopic(this.zkClient, topic1, numParts, 1, this.servers) + TestUtils.createTopic(this.zkClient, topic2, numParts, 1, this.servers) + TestUtils.createTopic(this.zkClient, topic3, numParts, 1, this.servers) + val mapTopicParts = this.consumers(0).partitionsFor() + assertNotNull(mapTopicParts) + assertEquals(4, mapTopicParts.size()) + assertEquals(4, mapTopicParts.keySet().size()) + assertEquals(2, mapTopicParts.get(topic1).length) + assertEquals(2, mapTopicParts.get(topic2).length) + assertEquals(2, mapTopicParts.get(topic3).length) } def testPartitionReassignmentCallback() { -- 2.3.2 (Apple Git-55)