From e0c05d6bfa13da22d16861482d1bf1cc7f63e37f Mon Sep 17 00:00:00 2001 From: asingh Date: Fri, 17 Jul 2015 16:56:47 -0700 Subject: [PATCH 1/3] KAFKA-2275: Add a "Map> partitionsFor(String... topics)" API to the new consumer --- .../apache/kafka/clients/consumer/Consumer.java | 5 +++ .../kafka/clients/consumer/KafkaConsumer.java | 45 +++++++++++++++++++++- .../kafka/clients/consumer/MockConsumer.java | 17 ++++++++ .../scala/integration/kafka/api/ConsumerTest.scala | 15 ++++++++ 4 files changed, 81 insertions(+), 1 deletion(-) diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/Consumer.java b/clients/src/main/java/org/apache/kafka/clients/consumer/Consumer.java index 252b759..bf65989 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/Consumer.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/Consumer.java @@ -114,6 +114,11 @@ public interface Consumer extends Closeable { public List partitionsFor(String topic); /** + * @see KafkaConsumer#partitionsFor(String...) + */ + public Map> partitionsFor(String... topics); + + /** * @see KafkaConsumer#close() */ public void close(); diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java b/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java index bea3d73..617538d 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java @@ -39,6 +39,7 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.net.InetSocketAddress; +import java.util.ArrayList; import java.util.Arrays; import java.util.Collection; import java.util.Collections; @@ -1003,7 +1004,7 @@ public class KafkaConsumer implements Consumer { /** * Get metadata about the partitions for a given topic. This method will issue a remote call to the server if it * does not already have any metadata about the given topic. - * + * * @param topic The topic to get partition metadata for * @return The list of partitions */ @@ -1024,6 +1025,48 @@ public class KafkaConsumer implements Consumer { } } + /** + * Get metadata about the partitions for a given list of topics. This method will issue a + * remote call to the server if it does not already have any metadata about the given topic. + * + * @param topics The list of topics to get partition metadata for + * @return The map of topic and its list of partitions pair + */ + @Override + public Map> partitionsFor(String... topics) { + acquire(); + try { + Map> topicAndPartitionInfoMap + = new HashMap>(); + List missingTopics = new ArrayList(); + + // add metadata for topics that are available and add the remaining to missing topics + for (String topic: topics) { + final Cluster cluster = this.metadata.fetch(); + List parts = cluster.partitionsForTopic(topic); + if (parts == null) { + missingTopics.add(topic); + metadata.add(topic); + } else { + topicAndPartitionInfoMap.put(topic, cluster.partitionsForTopic(topic)); + } + } + + // get metadata for topics that are missing metadata + client.awaitMetadataUpdate(); + + // add metadata for topics that were missing metadata before metadata update + final Cluster cluster = this.metadata.fetch(); + for (String topic: missingTopics) { + topicAndPartitionInfoMap.put(topic, cluster.partitionsForTopic(topic)); + } + + return topicAndPartitionInfoMap; + } finally { + release(); + } + } + @Override public void close() { acquire(); diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/MockConsumer.java b/clients/src/main/java/org/apache/kafka/clients/consumer/MockConsumer.java index c14eed1..4015cac 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/MockConsumer.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/MockConsumer.java @@ -177,6 +177,23 @@ public class MockConsumer implements Consumer { return parts; } + @Override + public synchronized Map> partitionsFor(String... topics) { + ensureNotClosed(); + Map> map = new HashMap>(); + + for (String topic: topics) { + List parts = this.partitions.get(topic); + if (parts == null) { + map.put(topic, Collections.emptyList()); + } else { + map.put(topic, parts); + } + } + + return map; + } + public synchronized void updatePartitions(String topic, List partitions) { ensureNotClosed(); this.partitions.put(topic, partitions); diff --git a/core/src/test/scala/integration/kafka/api/ConsumerTest.scala b/core/src/test/scala/integration/kafka/api/ConsumerTest.scala index 3eb5f95..22be9fd 100644 --- a/core/src/test/scala/integration/kafka/api/ConsumerTest.scala +++ b/core/src/test/scala/integration/kafka/api/ConsumerTest.scala @@ -186,6 +186,21 @@ class ConsumerTest extends IntegrationTestHarness with Logging { assertNull(this.consumers(0).partitionsFor("non-exist-topic")) } + def testPartitionsForListOfTopics() { + val numParts = 2 + val topic1: String = "part-test-topic-1" + val topic2: String = "part-test-topic-2" + TestUtils.createTopic(this.zkClient, topic1, numParts, 1, this.servers) + TestUtils.createTopic(this.zkClient, topic2, numParts, 1, this.servers) + val mapTopicParts = this.consumers(0).partitionsFor(topic1, topic2) + assertNotNull(mapTopicParts) + assertEquals(2, mapTopicParts.size()) + assertEquals(2, mapTopicParts.keySet().size()) + assertEquals(2, mapTopicParts.get(topic1).length) + assertEquals(2, mapTopicParts.get(topic2).length) + assertNull(this.consumers(0).partitionsFor("non-exist-topic")) + } + def testPartitionReassignmentCallback() { val callback = new TestConsumerReassignmentCallback() this.consumerConfig.setProperty(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "100"); // timeout quickly to avoid slow test -- 2.3.2 (Apple Git-55) From ef9f27fc168309f66138dfab52d8d357713a9783 Mon Sep 17 00:00:00 2001 From: asingh Date: Fri, 17 Jul 2015 21:38:53 -0700 Subject: [PATCH 2/3] Return metadata for all topics if empty list is passed to partitionsFor --- .../kafka/clients/consumer/KafkaConsumer.java | 55 ++++++++++++++-------- .../kafka/clients/consumer/MockConsumer.java | 19 +++++--- .../scala/integration/kafka/api/ConsumerTest.scala | 18 ++++++- 3 files changed, 65 insertions(+), 27 deletions(-) diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java b/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java index 617538d..0ae4bde 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java @@ -1026,10 +1026,13 @@ public class KafkaConsumer implements Consumer { } /** - * Get metadata about the partitions for a given list of topics. This method will issue a - * remote call to the server if it does not already have any metadata about the given topic. + * Get metadata about the partitions for a given list of topics. If given list of topics is + * empty, metadata about all the topics in the cluster will be returned. This method + * will issue a remote call to the server if it does not already have metadata about all + * of the topics in the provided list of topics or if the list is empty. * - * @param topics The list of topics to get partition metadata for + * @param topics The list of topics to get partition metadata for or an empty list to get + * metadata for all topics * @return The map of topic and its list of partitions pair */ @Override @@ -1038,27 +1041,39 @@ public class KafkaConsumer implements Consumer { try { Map> topicAndPartitionInfoMap = new HashMap>(); - List missingTopics = new ArrayList(); - // add metadata for topics that are available and add the remaining to missing topics - for (String topic: topics) { - final Cluster cluster = this.metadata.fetch(); - List parts = cluster.partitionsForTopic(topic); - if (parts == null) { - missingTopics.add(topic); - metadata.add(topic); - } else { - topicAndPartitionInfoMap.put(topic, cluster.partitionsForTopic(topic)); + if (topics.length > 0) { // return info about topics in the provided list + List missingTopics = new ArrayList(); + + // add metadata for topics that are available and add the remaining to missing topics + for (String topic : topics) { + final Cluster cluster = this.metadata.fetch(); + List parts = cluster.partitionsForTopic(topic); + if (parts == null) { + missingTopics.add(topic); + metadata.add(topic); + } else { + topicAndPartitionInfoMap.put(topic, cluster.partitionsForTopic(topic)); + } } - } - // get metadata for topics that are missing metadata - client.awaitMetadataUpdate(); + // get metadata for topics that are missing metadata + client.awaitMetadataUpdate(); - // add metadata for topics that were missing metadata before metadata update - final Cluster cluster = this.metadata.fetch(); - for (String topic: missingTopics) { - topicAndPartitionInfoMap.put(topic, cluster.partitionsForTopic(topic)); + // add metadata for topics that were missing metadata before metadata update + final Cluster cluster = this.metadata.fetch(); + for (String topic : missingTopics) { + topicAndPartitionInfoMap.put(topic, cluster.partitionsForTopic(topic)); + } + } else { // Return info about all topics in the system + client.awaitMetadataUpdate(); + final Cluster cluster = this.metadata.fetch(); + for (String topic: cluster.topics()) { + final String OffsetsTopicName = "__consumer_offsets"; + if (!topic.equals(OffsetsTopicName)) { + topicAndPartitionInfoMap.put(topic, cluster.partitionsForTopic(topic)); + } + } } return topicAndPartitionInfoMap; diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/MockConsumer.java b/clients/src/main/java/org/apache/kafka/clients/consumer/MockConsumer.java index 4015cac..7c29dae 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/MockConsumer.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/MockConsumer.java @@ -15,6 +15,7 @@ package org.apache.kafka.clients.consumer; import java.util.ArrayList; import java.util.Collections; import java.util.HashMap; +import java.util.Iterator; import java.util.List; import java.util.Map; import java.util.Map.Entry; @@ -182,12 +183,18 @@ public class MockConsumer implements Consumer { ensureNotClosed(); Map> map = new HashMap>(); - for (String topic: topics) { - List parts = this.partitions.get(topic); - if (parts == null) { - map.put(topic, Collections.emptyList()); - } else { - map.put(topic, parts); + if (topics.length > 0) { + for (String topic : topics) { + List parts = this.partitions.get(topic); + if (parts == null) { + map.put(topic, Collections.emptyList()); + } else { + map.put(topic, parts); + } + } + } else { + for (String key: this.partitions.keySet()) { + map.put(key, this.partitions.get(key)); } } diff --git a/core/src/test/scala/integration/kafka/api/ConsumerTest.scala b/core/src/test/scala/integration/kafka/api/ConsumerTest.scala index 22be9fd..c43fca0 100644 --- a/core/src/test/scala/integration/kafka/api/ConsumerTest.scala +++ b/core/src/test/scala/integration/kafka/api/ConsumerTest.scala @@ -198,7 +198,23 @@ class ConsumerTest extends IntegrationTestHarness with Logging { assertEquals(2, mapTopicParts.keySet().size()) assertEquals(2, mapTopicParts.get(topic1).length) assertEquals(2, mapTopicParts.get(topic2).length) - assertNull(this.consumers(0).partitionsFor("non-exist-topic")) + } + + def testPartitionsForAllTopics() { + val numParts = 2 + val topic1: String = "part-test-topic-1" + val topic2: String = "part-test-topic-2" + val topic3: String = "part-test-topic-3" + TestUtils.createTopic(this.zkClient, topic1, numParts, 1, this.servers) + TestUtils.createTopic(this.zkClient, topic2, numParts, 1, this.servers) + TestUtils.createTopic(this.zkClient, topic3, numParts, 1, this.servers) + val mapTopicParts = this.consumers(0).partitionsFor() + assertNotNull(mapTopicParts) + assertEquals(4, mapTopicParts.size()) + assertEquals(4, mapTopicParts.keySet().size()) + assertEquals(2, mapTopicParts.get(topic1).length) + assertEquals(2, mapTopicParts.get(topic2).length) + assertEquals(2, mapTopicParts.get(topic3).length) } def testPartitionReassignmentCallback() { -- 2.3.2 (Apple Git-55) From 4614d2f6134e0b3c118529e0f5f15aa79c5c2432 Mon Sep 17 00:00:00 2001 From: asingh Date: Mon, 20 Jul 2015 10:43:17 -0700 Subject: [PATCH 3/3] Add logic to get all topics when needMetadataForAllTopics is set on metadata --- .../java/org/apache/kafka/clients/Metadata.java | 26 +++++++++++++++++++++- .../org/apache/kafka/clients/NetworkClient.java | 3 ++- .../kafka/clients/consumer/KafkaConsumer.java | 8 ++++--- .../consumer/internals/ConsumerNetworkClient.java | 8 +++++++ .../main/java/org/apache/kafka/common/Cluster.java | 18 ++++++++++++++- .../scala/integration/kafka/api/ConsumerTest.scala | 9 +++++++- 6 files changed, 65 insertions(+), 7 deletions(-) diff --git a/clients/src/main/java/org/apache/kafka/clients/Metadata.java b/clients/src/main/java/org/apache/kafka/clients/Metadata.java index 0387f26..e8d39ec 100644 --- a/clients/src/main/java/org/apache/kafka/clients/Metadata.java +++ b/clients/src/main/java/org/apache/kafka/clients/Metadata.java @@ -41,6 +41,16 @@ public final class Metadata { private boolean needUpdate; private final Set topics; + public boolean needMetadataForAllTopics() { + return needMetadataForAllTopics; + } + + public void setNeedMetadataForAllTopics(boolean needMetadataForAllTopics) { + this.needMetadataForAllTopics = needMetadataForAllTopics; + } + + private boolean needMetadataForAllTopics; + /** * Create a metadata instance with reasonable defaults */ @@ -63,12 +73,20 @@ public final class Metadata { this.cluster = Cluster.empty(); this.needUpdate = false; this.topics = new HashSet(); + this.needMetadataForAllTopics = false; } /** - * Get the current cluster info without blocking + * Get the current cluster info for topics that are added to metadata without blocking */ public synchronized Cluster fetch() { + return this.cluster.pruneCluster(topics); + } + + /** + * Get the current cluster info without blocking + */ + public synchronized Cluster fetchAll() { return this.cluster; } @@ -151,6 +169,12 @@ public final class Metadata { this.lastSuccessfulRefreshMs = now; this.version += 1; this.cluster = cluster; + + // turn off need to fetch metadata for all topics + if (needMetadataForAllTopics) { + needMetadataForAllTopics = false; + } + notifyAll(); log.debug("Updated cluster metadata version {} to {}", this.version, this.cluster); } diff --git a/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java b/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java index 48fe796..7b1269e 100644 --- a/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java +++ b/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java @@ -15,6 +15,7 @@ package org.apache.kafka.clients; import java.io.IOException; import java.net.InetSocketAddress; import java.util.ArrayList; +import java.util.Collections; import java.util.List; import java.util.Random; import java.util.Set; @@ -474,7 +475,7 @@ public class NetworkClient implements KafkaClient { if (connectionStates.isConnected(nodeConnectionId) && inFlightRequests.canSendMore(nodeConnectionId)) { - Set topics = metadata.topics(); + Set topics = metadata.needMetadataForAllTopics()? Collections.EMPTY_SET: metadata.topics(); this.metadataFetchInProgress = true; ClientRequest metadataRequest = metadataRequest(now, nodeConnectionId, topics); log.debug("Sending metadata request {} to node {}", metadataRequest, node.id()); diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java b/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java index 0ae4bde..88f573c 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java @@ -1058,7 +1058,9 @@ public class KafkaConsumer implements Consumer { } // get metadata for topics that are missing metadata - client.awaitMetadataUpdate(); + if (!missingTopics.isEmpty()) { + client.awaitMetadataUpdate(); + } // add metadata for topics that were missing metadata before metadata update final Cluster cluster = this.metadata.fetch(); @@ -1066,8 +1068,8 @@ public class KafkaConsumer implements Consumer { topicAndPartitionInfoMap.put(topic, cluster.partitionsForTopic(topic)); } } else { // Return info about all topics in the system - client.awaitMetadataUpdate(); - final Cluster cluster = this.metadata.fetch(); + client.awaitMetadataUpdateForAllTopicsInCluster(); + final Cluster cluster = this.metadata.fetchAll(); for (String topic: cluster.topics()) { final String OffsetsTopicName = "__consumer_offsets"; if (!topic.equals(OffsetsTopicName)) { diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkClient.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkClient.java index 9517d9d..76b13ee 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkClient.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkClient.java @@ -127,6 +127,14 @@ public class ConsumerNetworkClient implements Closeable { } /** + * Block until the metadata has been obtained for all topics in cluster. + */ + public void awaitMetadataUpdateForAllTopicsInCluster() { + this.metadata.setNeedMetadataForAllTopics(true); + awaitMetadataUpdate(); + } + + /** * Wakeup an active poll. This will cause the polling thread to throw an exception either * on the current poll if one is active, or the next poll. */ diff --git a/clients/src/main/java/org/apache/kafka/common/Cluster.java b/clients/src/main/java/org/apache/kafka/common/Cluster.java index 60594a7..db08cdc 100644 --- a/clients/src/main/java/org/apache/kafka/common/Cluster.java +++ b/clients/src/main/java/org/apache/kafka/common/Cluster.java @@ -39,7 +39,7 @@ public final class Cluster { List copy = new ArrayList(nodes); Collections.shuffle(copy); this.nodes = Collections.unmodifiableList(copy); - + this.nodesById = new HashMap(); for (Node node: nodes) this.nodesById.put(node.id(), node); @@ -185,4 +185,20 @@ public final class Cluster { return "Cluster(nodes = " + this.nodes + ", partitions = " + this.partitionsByTopicPartition.values() + ")"; } + public Cluster pruneCluster(Set topics) { + Set partitionInfos = new HashSet(); + for (String topic: topics) { + if (partitionsByTopic != null) { + final List partitions = partitionsByTopic.get(topic); + if (partitions != null) { + for (PartitionInfo partitionInfo : partitions) { + partitionInfos.add(partitionInfo); + } + } + } + } + + return new Cluster(nodes, partitionInfos); + } + } diff --git a/core/src/test/scala/integration/kafka/api/ConsumerTest.scala b/core/src/test/scala/integration/kafka/api/ConsumerTest.scala index c43fca0..d978179 100644 --- a/core/src/test/scala/integration/kafka/api/ConsumerTest.scala +++ b/core/src/test/scala/integration/kafka/api/ConsumerTest.scala @@ -208,7 +208,14 @@ class ConsumerTest extends IntegrationTestHarness with Logging { TestUtils.createTopic(this.zkClient, topic1, numParts, 1, this.servers) TestUtils.createTopic(this.zkClient, topic2, numParts, 1, this.servers) TestUtils.createTopic(this.zkClient, topic3, numParts, 1, this.servers) - val mapTopicParts = this.consumers(0).partitionsFor() + var mapTopicParts = this.consumers(0).partitionsFor(topic1, topic2) + assertNotNull(mapTopicParts) + assertEquals(2, mapTopicParts.size()) + assertEquals(2, mapTopicParts.keySet().size()) + assertEquals(2, mapTopicParts.get(topic1).length) + assertEquals(2, mapTopicParts.get(topic2).length) + + mapTopicParts = this.consumers(0).partitionsFor() assertNotNull(mapTopicParts) assertEquals(4, mapTopicParts.size()) assertEquals(4, mapTopicParts.keySet().size()) -- 2.3.2 (Apple Git-55)