From d1ed9c1464828ff9329e928c31f8253c01875eb1 Mon Sep 17 00:00:00 2001 From: asingh Date: Tue, 21 Jul 2015 23:32:33 -0700 Subject: [PATCH 1/2] KAFKA-2275: Add a ListTopics() API to the new consumer --- .../org/apache/kafka/clients/ClientRequest.java | 29 ++++++++++++-- .../org/apache/kafka/clients/NetworkClient.java | 5 ++- .../apache/kafka/clients/consumer/Consumer.java | 5 +++ .../kafka/clients/consumer/KafkaConsumer.java | 16 ++++++++ .../kafka/clients/consumer/MockConsumer.java | 6 +++ .../consumer/internals/ConsumerNetworkClient.java | 44 ++++++++++++++++++++++ .../scala/integration/kafka/api/ConsumerTest.scala | 18 +++++++++ 7 files changed, 118 insertions(+), 5 deletions(-) diff --git a/clients/src/main/java/org/apache/kafka/clients/ClientRequest.java b/clients/src/main/java/org/apache/kafka/clients/ClientRequest.java index ed4c0d9..3990879 100644 --- a/clients/src/main/java/org/apache/kafka/clients/ClientRequest.java +++ b/clients/src/main/java/org/apache/kafka/clients/ClientRequest.java @@ -23,6 +23,7 @@ public final class ClientRequest { private final boolean expectResponse; private final RequestSend request; private final RequestCompletionHandler callback; + private final boolean isInitiatedByNetworkClient; /** * @param createdMs The unix timestamp in milliseconds for the time at which this request was created. @@ -30,17 +31,35 @@ public final class ClientRequest { * @param request The request * @param callback A callback to execute when the response has been received (or null if no callback is necessary) */ - public ClientRequest(long createdMs, boolean expectResponse, RequestSend request, RequestCompletionHandler callback) { + public ClientRequest(long createdMs, boolean expectResponse, RequestSend request, + RequestCompletionHandler callback) { + this(createdMs, expectResponse, request, callback, false); + } + + /** + * @param createdMs The unix timestamp in milliseconds for the time at which this request was created. 
+ * @param expectResponse Should we expect a response message or is this request complete once it is sent? + * @param request The request + * @param callback A callback to execute when the response has been received (or null if no callback is necessary) + * @param isInitiatedByNetworkClient Is request initiated by network client, if yes, its + * response will be consumed by network client + */ + public ClientRequest(long createdMs, boolean expectResponse, RequestSend request, + RequestCompletionHandler callback, boolean isInitiatedByNetworkClient) { this.createdMs = createdMs; this.callback = callback; this.request = request; this.expectResponse = expectResponse; + this.isInitiatedByNetworkClient = isInitiatedByNetworkClient; } @Override public String toString() { - return "ClientRequest(expectResponse=" + expectResponse + ", callback=" + callback + ", request=" + request - + ")"; + return "ClientRequest(expectResponse=" + expectResponse + + ", callback=" + callback + + ", request=" + request + + (isInitiatedByNetworkClient ? 
", isInitiatedByNetworkClient": "") + + ")"; } public boolean expectResponse() { @@ -63,4 +82,8 @@ public final class ClientRequest { return createdMs; } + public boolean isInitiatedByNetworkClient() { + return isInitiatedByNetworkClient; + } + } \ No newline at end of file diff --git a/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java b/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java index 48fe796..114bc36 100644 --- a/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java +++ b/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java @@ -15,6 +15,7 @@ package org.apache.kafka.clients; import java.io.IOException; import java.net.InetSocketAddress; import java.util.ArrayList; +import java.util.Collections; import java.util.List; import java.util.Random; import java.util.Set; @@ -378,7 +379,7 @@ public class NetworkClient implements KafkaClient { short apiKey = req.request().header().apiKey(); Struct body = (Struct) ProtoUtils.currentResponseSchema(apiKey).read(receive.payload()); correlate(req.request().header(), header); - if (apiKey == ApiKeys.METADATA.id) { + if (apiKey == ApiKeys.METADATA.id && req.isInitiatedByNetworkClient()) { handleMetadataResponse(req.request().header(), body, now); } else { // need to add body/header to response here @@ -454,7 +455,7 @@ public class NetworkClient implements KafkaClient { private ClientRequest metadataRequest(long now, String node, Set topics) { MetadataRequest metadata = new MetadataRequest(new ArrayList(topics)); RequestSend send = new RequestSend(node, nextRequestHeader(ApiKeys.METADATA), metadata.toStruct()); - return new ClientRequest(now, true, send, null); + return new ClientRequest(now, true, send, null, true); } /** diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/Consumer.java b/clients/src/main/java/org/apache/kafka/clients/consumer/Consumer.java index 252b759..23e410b 100644 --- 
a/clients/src/main/java/org/apache/kafka/clients/consumer/Consumer.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/Consumer.java @@ -114,6 +114,11 @@ public interface Consumer extends Closeable { public List partitionsFor(String topic); /** + * @see KafkaConsumer#listTopics() + */ + public Map> listTopics(); + + /** * @see KafkaConsumer#close() */ public void close(); diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java b/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java index bea3d73..c40dd4e 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java @@ -1024,6 +1024,22 @@ public class KafkaConsumer implements Consumer { } } + /** + * Get metadata about partitions for all topics. This method will issue a remote call to the + * server. + * + * @return The map of topics and its partitions + */ + @Override + public Map> listTopics() { + acquire(); + try { + return client.getAllTopics(requestTimeoutMs); + } finally { + release(); + } + } + @Override public void close() { acquire(); diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/MockConsumer.java b/clients/src/main/java/org/apache/kafka/clients/consumer/MockConsumer.java index c14eed1..5b22fa0 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/MockConsumer.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/MockConsumer.java @@ -177,6 +177,12 @@ public class MockConsumer implements Consumer { return parts; } + @Override + public Map> listTopics() { + ensureNotClosed(); + return partitions; + } + public synchronized void updatePartitions(String topic, List partitions) { ensureNotClosed(); this.partitions.put(topic, partitions); diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkClient.java 
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkClient.java index 9517d9d..28ce373 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkClient.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkClient.java @@ -19,15 +19,20 @@ import org.apache.kafka.clients.Metadata; import org.apache.kafka.clients.RequestCompletionHandler; import org.apache.kafka.clients.consumer.ConsumerWakeupException; import org.apache.kafka.common.Node; +import org.apache.kafka.common.PartitionInfo; import org.apache.kafka.common.protocol.ApiKeys; import org.apache.kafka.common.requests.AbstractRequest; +import org.apache.kafka.common.requests.MetadataRequest; +import org.apache.kafka.common.requests.MetadataResponse; import org.apache.kafka.common.requests.RequestHeader; import org.apache.kafka.common.requests.RequestSend; import org.apache.kafka.common.utils.Time; +import org.apache.kafka.common.utils.Utils; import java.io.Closeable; import java.io.IOException; import java.util.ArrayList; +import java.util.Collections; import java.util.HashMap; import java.util.Iterator; import java.util.List; @@ -284,6 +289,45 @@ public class ConsumerNetworkClient implements Closeable { client.close(); } + /** + * Get metadata for all topics present in Kafka cluster + * + * @param timeout time for which getting all topics is attempted + * @return The map of topics and its partitions + */ + public Map> getAllTopics(long timeout) { + final HashMap> topicsPartitionInfos = new HashMap<>(); + long startTime = time.milliseconds(); + + while (time.milliseconds() - startTime < timeout) { + final Node node = client.leastLoadedNode(time.milliseconds()); + if (node != null) { + MetadataRequest metadataRequest = new MetadataRequest(Collections.emptyList()); + final RequestFuture requestFuture = send(node, ApiKeys.METADATA, metadataRequest); + + poll(requestFuture); + + if (requestFuture.succeeded()) { + 
MetadataResponse response = new MetadataResponse(requestFuture.value().responseBody()); + + for (String topic : response.cluster().topics()) + if (!topic.equals("__consumer_offsets")) // do not include offsets topic + topicsPartitionInfos.put(topic, response.cluster().availablePartitionsForTopic(topic)); + + return topicsPartitionInfos; + } + + if (!requestFuture.isRetriable()) { + throw requestFuture.exception(); + } + } + + Utils.sleep(retryBackoffMs); + } + + return topicsPartitionInfos; + } + public static class RequestFutureCompletionHandler extends RequestFuture implements RequestCompletionHandler { diff --git a/core/src/test/scala/integration/kafka/api/ConsumerTest.scala b/core/src/test/scala/integration/kafka/api/ConsumerTest.scala index 3eb5f95..e12fd25 100644 --- a/core/src/test/scala/integration/kafka/api/ConsumerTest.scala +++ b/core/src/test/scala/integration/kafka/api/ConsumerTest.scala @@ -186,6 +186,24 @@ class ConsumerTest extends IntegrationTestHarness with Logging { assertNull(this.consumers(0).partitionsFor("non-exist-topic")) } + def testListTopics() { + val numParts = 2 + val topic1: String = "part-test-topic-1" + val topic2: String = "part-test-topic-2" + val topic3: String = "part-test-topic-3" + TestUtils.createTopic(this.zkClient, topic1, numParts, 1, this.servers) + TestUtils.createTopic(this.zkClient, topic2, numParts, 1, this.servers) + TestUtils.createTopic(this.zkClient, topic3, numParts, 1, this.servers) + + val mapTopicParts = this.consumers.head.listTopics() + assertNotNull(mapTopicParts) + assertEquals(4, mapTopicParts.size()) + assertEquals(4, mapTopicParts.keySet().size()) + assertEquals(2, mapTopicParts.get(topic1).length) + assertEquals(2, mapTopicParts.get(topic2).length) + assertEquals(2, mapTopicParts.get(topic3).length) + } + def testPartitionReassignmentCallback() { val callback = new TestConsumerReassignmentCallback() this.consumerConfig.setProperty(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "100"); // timeout quickly to 
avoid slow test -- 2.3.2 (Apple Git-55) From 8ae94c6d627212e92dc790a6df6842534c1355cb Mon Sep 17 00:00:00 2001 From: asingh Date: Wed, 22 Jul 2015 16:09:16 -0700 Subject: [PATCH 2/2] Move getAllTopics from ConsumerNetworkClient to Fetcher. Add a unit test for Fetcher.getAllTopics. Do not ignore __consumer_offsets topic while getting all topics. Remove braces for single line if --- .../org/apache/kafka/clients/ClientRequest.java | 2 +- .../org/apache/kafka/clients/NetworkClient.java | 1 - .../kafka/clients/consumer/KafkaConsumer.java | 2 +- .../consumer/internals/ConsumerNetworkClient.java | 44 ---------------------- .../kafka/clients/consumer/internals/Fetcher.java | 44 ++++++++++++++++++++++ .../clients/consumer/internals/FetcherTest.java | 34 +++++++++++++++++ .../scala/integration/kafka/api/ConsumerTest.scala | 4 +- 7 files changed, 82 insertions(+), 49 deletions(-) diff --git a/clients/src/main/java/org/apache/kafka/clients/ClientRequest.java b/clients/src/main/java/org/apache/kafka/clients/ClientRequest.java index 3990879..dc8f0f1 100644 --- a/clients/src/main/java/org/apache/kafka/clients/ClientRequest.java +++ b/clients/src/main/java/org/apache/kafka/clients/ClientRequest.java @@ -58,7 +58,7 @@ public final class ClientRequest { return "ClientRequest(expectResponse=" + expectResponse + ", callback=" + callback + ", request=" + request + - (isInitiatedByNetworkClient ? ", isInitiatedByNetworkClient": "") + + (isInitiatedByNetworkClient ? 
", isInitiatedByNetworkClient" : "") + ")"; } diff --git a/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java b/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java index 114bc36..0e51d7b 100644 --- a/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java +++ b/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java @@ -15,7 +15,6 @@ package org.apache.kafka.clients; import java.io.IOException; import java.net.InetSocketAddress; import java.util.ArrayList; -import java.util.Collections; import java.util.List; import java.util.Random; import java.util.Set; diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java b/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java index c40dd4e..923ff99 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java @@ -1034,7 +1034,7 @@ public class KafkaConsumer implements Consumer { public Map> listTopics() { acquire(); try { - return client.getAllTopics(requestTimeoutMs); + return fetcher.getAllTopics(requestTimeoutMs); } finally { release(); } diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkClient.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkClient.java index 28ce373..9517d9d 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkClient.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkClient.java @@ -19,20 +19,15 @@ import org.apache.kafka.clients.Metadata; import org.apache.kafka.clients.RequestCompletionHandler; import org.apache.kafka.clients.consumer.ConsumerWakeupException; import org.apache.kafka.common.Node; -import org.apache.kafka.common.PartitionInfo; import org.apache.kafka.common.protocol.ApiKeys; import 
org.apache.kafka.common.requests.AbstractRequest; -import org.apache.kafka.common.requests.MetadataRequest; -import org.apache.kafka.common.requests.MetadataResponse; import org.apache.kafka.common.requests.RequestHeader; import org.apache.kafka.common.requests.RequestSend; import org.apache.kafka.common.utils.Time; -import org.apache.kafka.common.utils.Utils; import java.io.Closeable; import java.io.IOException; import java.util.ArrayList; -import java.util.Collections; import java.util.HashMap; import java.util.Iterator; import java.util.List; @@ -289,45 +284,6 @@ public class ConsumerNetworkClient implements Closeable { client.close(); } - /** - * Get metadata for all topics present in Kafka cluster - * - * @param timeout time for which getting all topics is attempted - * @return The map of topics and its partitions - */ - public Map> getAllTopics(long timeout) { - final HashMap> topicsPartitionInfos = new HashMap<>(); - long startTime = time.milliseconds(); - - while (time.milliseconds() - startTime < timeout) { - final Node node = client.leastLoadedNode(time.milliseconds()); - if (node != null) { - MetadataRequest metadataRequest = new MetadataRequest(Collections.emptyList()); - final RequestFuture requestFuture = send(node, ApiKeys.METADATA, metadataRequest); - - poll(requestFuture); - - if (requestFuture.succeeded()) { - MetadataResponse response = new MetadataResponse(requestFuture.value().responseBody()); - - for (String topic : response.cluster().topics()) - if (!topic.equals("__consumer_offsets")) // do not include offsets topic - topicsPartitionInfos.put(topic, response.cluster().availablePartitionsForTopic(topic)); - - return topicsPartitionInfos; - } - - if (!requestFuture.isRetriable()) { - throw requestFuture.exception(); - } - } - - Utils.sleep(retryBackoffMs); - } - - return topicsPartitionInfos; - } - public static class RequestFutureCompletionHandler extends RequestFuture implements RequestCompletionHandler { diff --git 
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java index d595c1c..8931e63 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java @@ -39,6 +39,8 @@ import org.apache.kafka.common.requests.FetchRequest; import org.apache.kafka.common.requests.FetchResponse; import org.apache.kafka.common.requests.ListOffsetRequest; import org.apache.kafka.common.requests.ListOffsetResponse; +import org.apache.kafka.common.requests.MetadataRequest; +import org.apache.kafka.common.requests.MetadataResponse; import org.apache.kafka.common.serialization.Deserializer; import org.apache.kafka.common.utils.Time; import org.apache.kafka.common.utils.Utils; @@ -160,6 +162,48 @@ public class Fetcher { } } + + + /** + * Get metadata for all topics present in Kafka cluster + * + * @param timeout time for which getting all topics is attempted + * @return The map of topics and its partitions + */ + public Map> getAllTopics(long timeout) { + final HashMap> topicsPartitionInfos = new HashMap<>(); + long startTime = time.milliseconds(); + + while (time.milliseconds() - startTime < timeout) { + final Node node = client.leastLoadedNode(); + if (node != null) { + MetadataRequest metadataRequest = new MetadataRequest(Collections.emptyList()); + final RequestFuture requestFuture = + client.send(node, ApiKeys.METADATA, metadataRequest); + + client.poll(requestFuture); + + if (requestFuture.succeeded()) { + MetadataResponse response = + new MetadataResponse(requestFuture.value().responseBody()); + + for (String topic : response.cluster().topics()) + topicsPartitionInfos.put( + topic, response.cluster().availablePartitionsForTopic(topic)); + + return topicsPartitionInfos; + } + + if (!requestFuture.isRetriable()) + throw requestFuture.exception(); + } + + 
Utils.sleep(retryBackoffMs); + } + + return topicsPartitionInfos; + } + /** * Reset offsets for the given partition using the offset reset strategy. * diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java index 7a4e586..28801ec 100644 --- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java @@ -22,6 +22,7 @@ import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.clients.consumer.OffsetResetStrategy; import org.apache.kafka.common.Cluster; import org.apache.kafka.common.Node; +import org.apache.kafka.common.PartitionInfo; import org.apache.kafka.common.TopicPartition; import org.apache.kafka.common.metrics.Metrics; import org.apache.kafka.common.protocol.Errors; @@ -29,6 +30,7 @@ import org.apache.kafka.common.protocol.types.Struct; import org.apache.kafka.common.record.CompressionType; import org.apache.kafka.common.record.MemoryRecords; import org.apache.kafka.common.requests.FetchResponse; +import org.apache.kafka.common.requests.MetadataResponse; import org.apache.kafka.common.serialization.ByteArrayDeserializer; import org.apache.kafka.common.utils.MockTime; import org.apache.kafka.test.TestUtils; @@ -37,6 +39,7 @@ import org.junit.Test; import java.nio.ByteBuffer; import java.util.Collections; +import java.util.HashMap; import java.util.LinkedHashMap; import java.util.List; import java.util.Map; @@ -163,6 +166,37 @@ public class FetcherTest { assertEquals(null, subscriptions.consumed(tp)); } + @Test + public void testGetAllTopics() throws InterruptedException { + final Map> allTopics = new HashMap<>(); + Runnable runnable = new TopicsFetchThread(allTopics, fetcher); + final Thread topicsFetcher = new Thread(runnable); + topicsFetcher.start(); + + client.respond( + new 
MetadataResponse(cluster, Collections.emptyMap()).toStruct()); + topicsFetcher.join(); + + assertEquals(cluster.topics().size(), allTopics.size()); + } + + private class TopicsFetchThread implements Runnable { + private final Fetcher fetcher; + private final Map> allTopics; + + public TopicsFetchThread( + Map> allTopics, Fetcher fetcher) { + this.allTopics = allTopics; + this.fetcher = fetcher; + } + + @Override + public void run() { + final Map> allTopics = fetcher.getAllTopics(5000L); + this.allTopics.putAll(allTopics); + } + } + private Struct fetchResponse(ByteBuffer buffer, short error, long hw) { FetchResponse response = new FetchResponse(Collections.singletonMap(tp, new FetchResponse.PartitionData(error, hw, buffer))); return response.toStruct(); diff --git a/core/src/test/scala/integration/kafka/api/ConsumerTest.scala b/core/src/test/scala/integration/kafka/api/ConsumerTest.scala index e12fd25..392529a 100644 --- a/core/src/test/scala/integration/kafka/api/ConsumerTest.scala +++ b/core/src/test/scala/integration/kafka/api/ConsumerTest.scala @@ -197,8 +197,8 @@ class ConsumerTest extends IntegrationTestHarness with Logging { val mapTopicParts = this.consumers.head.listTopics() assertNotNull(mapTopicParts) - assertEquals(4, mapTopicParts.size()) - assertEquals(4, mapTopicParts.keySet().size()) + assertEquals(5, mapTopicParts.size()) + assertEquals(5, mapTopicParts.keySet().size()) assertEquals(2, mapTopicParts.get(topic1).length) assertEquals(2, mapTopicParts.get(topic2).length) assertEquals(2, mapTopicParts.get(topic3).length) -- 2.3.2 (Apple Git-55)