From e0c05d6bfa13da22d16861482d1bf1cc7f63e37f Mon Sep 17 00:00:00 2001 From: asingh Date: Fri, 17 Jul 2015 16:56:47 -0700 Subject: [PATCH] KAFKA-2275: Add a "Map> partitionsFor(String... topics)" API to the new consumer --- .../apache/kafka/clients/consumer/Consumer.java | 5 +++ .../kafka/clients/consumer/KafkaConsumer.java | 45 +++++++++++++++++++++- .../kafka/clients/consumer/MockConsumer.java | 17 ++++++++ .../scala/integration/kafka/api/ConsumerTest.scala | 15 ++++++++ 4 files changed, 81 insertions(+), 1 deletion(-) diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/Consumer.java b/clients/src/main/java/org/apache/kafka/clients/consumer/Consumer.java index 252b759..bf65989 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/Consumer.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/Consumer.java @@ -114,6 +114,11 @@ public interface Consumer extends Closeable { public List partitionsFor(String topic); /** + * @see KafkaConsumer#partitionsFor(String...) + */ + public Map> partitionsFor(String... topics); + + /** * @see KafkaConsumer#close() */ public void close(); diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java b/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java index bea3d73..617538d 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java @@ -39,6 +39,7 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.net.InetSocketAddress; +import java.util.ArrayList; import java.util.Arrays; import java.util.Collection; import java.util.Collections; @@ -1003,7 +1004,7 @@ public class KafkaConsumer implements Consumer { /** * Get metadata about the partitions for a given topic. This method will issue a remote call to the server if it * does not already have any metadata about the given topic. - * + * * @param topic The topic to get partition metadata for * @return The list of partitions */ @@ -1024,6 +1025,48 @@ public class KafkaConsumer implements Consumer { } } + /** + * Get metadata about the partitions for a given list of topics. This method will issue a + * remote call to the server if it does not already have any metadata about the given topic. + * + * @param topics The list of topics to get partition metadata for + * @return The map of topic and its list of partitions pair + */ + @Override + public Map> partitionsFor(String... topics) { + acquire(); + try { + Map> topicAndPartitionInfoMap + = new HashMap>(); + List missingTopics = new ArrayList(); + + // add metadata for topics that are available and add the remaining to missing topics + for (String topic: topics) { + final Cluster cluster = this.metadata.fetch(); + List parts = cluster.partitionsForTopic(topic); + if (parts == null) { + missingTopics.add(topic); + metadata.add(topic); + } else { + topicAndPartitionInfoMap.put(topic, cluster.partitionsForTopic(topic)); + } + } + + // get metadata for topics that are missing metadata + client.awaitMetadataUpdate(); + + // add metadata for topics that were missing metadata before metadata update + final Cluster cluster = this.metadata.fetch(); + for (String topic: missingTopics) { + topicAndPartitionInfoMap.put(topic, cluster.partitionsForTopic(topic)); + } + + return topicAndPartitionInfoMap; + } finally { + release(); + } + } + @Override public void close() { acquire(); diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/MockConsumer.java b/clients/src/main/java/org/apache/kafka/clients/consumer/MockConsumer.java index c14eed1..4015cac 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/MockConsumer.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/MockConsumer.java @@ -177,6 +177,23 @@ public class MockConsumer implements Consumer { return parts; } + @Override + public synchronized Map> partitionsFor(String... topics) { + ensureNotClosed(); + Map> map = new HashMap>(); + + for (String topic: topics) { + List parts = this.partitions.get(topic); + if (parts == null) { + map.put(topic, Collections.emptyList()); + } else { + map.put(topic, parts); + } + } + + return map; + } + public synchronized void updatePartitions(String topic, List partitions) { ensureNotClosed(); this.partitions.put(topic, partitions); diff --git a/core/src/test/scala/integration/kafka/api/ConsumerTest.scala b/core/src/test/scala/integration/kafka/api/ConsumerTest.scala index 3eb5f95..22be9fd 100644 --- a/core/src/test/scala/integration/kafka/api/ConsumerTest.scala +++ b/core/src/test/scala/integration/kafka/api/ConsumerTest.scala @@ -186,6 +186,21 @@ class ConsumerTest extends IntegrationTestHarness with Logging { assertNull(this.consumers(0).partitionsFor("non-exist-topic")) } + def testPartitionsForListOfTopics() { + val numParts = 2 + val topic1: String = "part-test-topic-1" + val topic2: String = "part-test-topic-2" + TestUtils.createTopic(this.zkClient, topic1, numParts, 1, this.servers) + TestUtils.createTopic(this.zkClient, topic2, numParts, 1, this.servers) + val mapTopicParts = this.consumers(0).partitionsFor(topic1, topic2) + assertNotNull(mapTopicParts) + assertEquals(2, mapTopicParts.size()) + assertEquals(2, mapTopicParts.keySet().size()) + assertEquals(2, mapTopicParts.get(topic1).length) + assertEquals(2, mapTopicParts.get(topic2).length) + assertNull(this.consumers(0).partitionsFor("non-exist-topic")) + } + def testPartitionReassignmentCallback() { val callback = new TestConsumerReassignmentCallback() this.consumerConfig.setProperty(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "100"); // timeout quickly to avoid slow test -- 2.3.2 (Apple Git-55)