diff --git a/core/src/main/scala/kafka/client/ClientUtils.scala b/core/src/main/scala/kafka/client/ClientUtils.scala index af5d231..a3d88ea 100644 --- a/core/src/main/scala/kafka/client/ClientUtils.scala +++ b/core/src/main/scala/kafka/client/ClientUtils.scala @@ -58,10 +58,11 @@ object ClientUtils extends Logging{ * @param clientId The client's identifier * @return topic metadata response */ - def fetchTopicMetadata(topics: Set[String], brokers: Seq[Broker], clientId: String): TopicMetadataResponse = { + def fetchTopicMetadata(topics: Set[String], brokers: Seq[Broker], clientId: String, timeoutMs: Int): TopicMetadataResponse = { val props = new Properties() props.put("broker.list", brokers.map(_.getConnectionString()).mkString(",")) props.put("client.id", clientId) + props.put("request.timeout.ms", timeoutMs.toString) val producerConfig = new ProducerConfig(props) fetchTopicMetadata(topics, brokers, producerConfig, 0) } diff --git a/core/src/main/scala/kafka/consumer/ConsoleConsumer.scala b/core/src/main/scala/kafka/consumer/ConsoleConsumer.scala index 5dffa7e..7e84043 100644 --- a/core/src/main/scala/kafka/consumer/ConsoleConsumer.scala +++ b/core/src/main/scala/kafka/consumer/ConsoleConsumer.scala @@ -79,6 +79,11 @@ object ConsoleConsumer extends Logging { .describedAs("size") .ofType(classOf[java.lang.Integer]) .defaultsTo(2 * 1024 * 1024) + val socketTimeoutMsOpt = parser.accepts("socket-timeout-ms", "The socket timeout used for the connection to the broker") + .withRequiredArg + .describedAs("ms") + .ofType(classOf[java.lang.Integer]) + .defaultsTo(ConsumerConfig.SocketTimeout) val consumerTimeoutMsOpt = parser.accepts("consumer-timeout-ms", "consumer throws timeout exception after waiting this much " + "of time without incoming messages") .withRequiredArg @@ -146,6 +151,7 @@ object ConsoleConsumer extends Logging { val props = new Properties() props.put("group.id", options.valueOf(groupIdOpt)) props.put("socket.receive.buffer.bytes", options.valueOf(socketBufferSizeOpt).toString) + props.put("socket.timeout.ms", options.valueOf(socketTimeoutMsOpt).toString) props.put("fetch.message.max.bytes", options.valueOf(fetchSizeOpt).toString) props.put("fetch.min.bytes", options.valueOf(minFetchBytesOpt).toString) props.put("fetch.wait.max.ms", options.valueOf(maxWaitMsOpt).toString) diff --git a/core/src/main/scala/kafka/consumer/ConsumerFetcherManager.scala b/core/src/main/scala/kafka/consumer/ConsumerFetcherManager.scala index 69c6b3e..3aa7b08 100644 --- a/core/src/main/scala/kafka/consumer/ConsumerFetcherManager.scala +++ b/core/src/main/scala/kafka/consumer/ConsumerFetcherManager.scala @@ -54,7 +54,10 @@ class ConsumerFetcherManager(private val consumerIdString: String, try { trace("Partitions without leader %s".format(noLeaderPartitionSet)) val brokers = getAllBrokersInCluster(zkClient) - val topicsMetadata = ClientUtils.fetchTopicMetadata(noLeaderPartitionSet.map(m => m.topic).toSet, brokers, config.clientId).topicsMetadata + val topicsMetadata = ClientUtils.fetchTopicMetadata(noLeaderPartitionSet.map(m => m.topic).toSet, + brokers, + config.clientId, + config.socketTimeoutMs).topicsMetadata val leaderForPartitionsMap = new HashMap[TopicAndPartition, Broker] topicsMetadata.foreach( tmd => { diff --git a/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala b/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala index c1f8513..a780a41 100644 --- a/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala +++ b/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala @@ -400,7 +400,10 @@ private[kafka] class ZookeeperConsumerConnector(val config: ConsumerConfig, val myTopicThreadIdsMap = TopicCount.constructTopicCount(group, consumerIdString, zkClient).getConsumerThreadIdsPerTopic val consumersPerTopicMap = getConsumersPerTopic(zkClient, group) val brokers = getAllBrokersInCluster(zkClient) - val topicsMetadata = ClientUtils.fetchTopicMetadata(myTopicThreadIdsMap.keySet, brokers, config.clientId).topicsMetadata + val topicsMetadata = ClientUtils.fetchTopicMetadata(myTopicThreadIdsMap.keySet, + brokers, + config.clientId, + config.socketTimeoutMs).topicsMetadata val partitionsPerTopicMap = new mutable.HashMap[String, Seq[Int]] topicsMetadata.foreach(m =>{ val topic = m.topic diff --git a/core/src/main/scala/kafka/tools/SimpleConsumerShell.scala b/core/src/main/scala/kafka/tools/SimpleConsumerShell.scala index 1842c03..d8127a8 100644 --- a/core/src/main/scala/kafka/tools/SimpleConsumerShell.scala +++ b/core/src/main/scala/kafka/tools/SimpleConsumerShell.scala @@ -125,7 +125,7 @@ object SimpleConsumerShell extends Logging { // getting topic metadata info("Getting topic metatdata...") val metadataTargetBrokers = ClientUtils.parseBrokerList(options.valueOf(brokerListOpt)) - val topicsMetadata = ClientUtils.fetchTopicMetadata(Set(topic), metadataTargetBrokers, clientId).topicsMetadata + val topicsMetadata = ClientUtils.fetchTopicMetadata(Set(topic), metadataTargetBrokers, clientId, maxWaitMs).topicsMetadata if(topicsMetadata.size != 1 || !topicsMetadata(0).topic.equals(topic)) { System.err.println(("Error: no valid topic metadata for topic: %s, " + "what we get from server is only: %s").format(topic, topicsMetadata)) System.exit(1)