From 9237909c94aa70ee87e6701fb7e3d0b09ac6b592 Mon Sep 17 00:00:00 2001
From: Evelina Stepanova <estepanova@griddynamics.com>
Date: Wed, 25 Dec 2013 18:06:06 +0400
Subject: [PATCH 1/2] KAFKA-1184 configurable fetcher threads number

---
 .../main/scala/kafka/consumer/ConsumerConfig.scala |    4 ++++
 .../kafka/consumer/ConsumerFetcherManager.scala    |    2 +-
 .../test/scala/unit/kafka/utils/TestUtils.scala    |    1 +
 3 files changed, 6 insertions(+), 1 deletions(-)

diff --git a/core/src/main/scala/kafka/consumer/ConsumerConfig.scala b/core/src/main/scala/kafka/consumer/ConsumerConfig.scala
index c8c4212..e6875d6 100644
--- a/core/src/main/scala/kafka/consumer/ConsumerConfig.scala
+++ b/core/src/main/scala/kafka/consumer/ConsumerConfig.scala
@@ -28,6 +28,7 @@ object ConsumerConfig extends Config {
   val SocketBufferSize = 64*1024
   val FetchSize = 1024 * 1024
   val MaxFetchSize = 10*FetchSize
+  val NumConsumerFetchers = 1
   val DefaultFetcherBackoffMs = 1000
   val AutoCommit = true
   val AutoCommitInterval = 60 * 1000
@@ -93,6 +94,9 @@ class ConsumerConfig private (val props: VerifiableProperties) extends ZKConfig(
   
   /** the number of byes of messages to attempt to fetch */
   val fetchMessageMaxBytes = props.getInt("fetch.message.max.bytes", FetchSize)
+
+  /** the number threads used to fetch data */
+  val numConsumerFetchers = props.getInt("num.consumer.fetchers", NumConsumerFetchers)
   
   /** if true, periodically commit to zookeeper the offset of messages already fetched by the consumer */
   val autoCommitEnable = props.getBoolean("auto.commit.enable", AutoCommit)
diff --git a/core/src/main/scala/kafka/consumer/ConsumerFetcherManager.scala b/core/src/main/scala/kafka/consumer/ConsumerFetcherManager.scala
index e4451bb..b9e2bea 100644
--- a/core/src/main/scala/kafka/consumer/ConsumerFetcherManager.scala
+++ b/core/src/main/scala/kafka/consumer/ConsumerFetcherManager.scala
@@ -41,7 +41,7 @@ class ConsumerFetcherManager(private val consumerIdString: String,
                              private val config: ConsumerConfig,
                              private val zkClient : ZkClient)
         extends AbstractFetcherManager("ConsumerFetcherManager-%d".format(SystemTime.milliseconds),
-                                       config.clientId, 1) {
+                                       config.clientId, config.numConsumerFetchers) {
   private var partitionMap: immutable.Map[TopicAndPartition, PartitionTopicInfo] = null
   private var cluster: Cluster = null
   private val noLeaderPartitionSet = new mutable.HashSet[TopicAndPartition]
diff --git a/core/src/test/scala/unit/kafka/utils/TestUtils.scala b/core/src/test/scala/unit/kafka/utils/TestUtils.scala
index d88b6c3..426b1a7 100644
--- a/core/src/test/scala/unit/kafka/utils/TestUtils.scala
+++ b/core/src/test/scala/unit/kafka/utils/TestUtils.scala
@@ -150,6 +150,7 @@ object TestUtils extends Logging {
     props.put("auto.commit.interval.ms", "1000")
     props.put("rebalance.max.retries", "4")
     props.put("auto.offset.reset", "smallest")
+    props.put("num.consumer.fetchers", "2")
 
     props
   }
-- 
1.7.0.4


From 54e946c052af2571efac01261ab85e7fc0cf3e5a Mon Sep 17 00:00:00 2001
From: Evelina Stepanova <estepanova@griddynamics.com>
Date: Fri, 10 Jan 2014 17:09:39 +0400
Subject: [PATCH 2/2] KAFKA-1184 add fetcher threads number option to consumer performance test

---
 .../scala/kafka/perf/ConsumerPerformance.scala     |    6 ++++++
 1 files changed, 6 insertions(+), 0 deletions(-)

diff --git a/perf/src/main/scala/kafka/perf/ConsumerPerformance.scala b/perf/src/main/scala/kafka/perf/ConsumerPerformance.scala
index ec3cd29..6d39c73 100644
--- a/perf/src/main/scala/kafka/perf/ConsumerPerformance.scala
+++ b/perf/src/main/scala/kafka/perf/ConsumerPerformance.scala
@@ -112,6 +112,11 @@ object ConsumerPerformance {
                            .describedAs("count")
                            .ofType(classOf[java.lang.Integer])
                            .defaultsTo(10)
+    val numFetchersOpt = parser.accepts("fetch-threads", "Number of fetching threads.")
+                               .withRequiredArg
+                               .describedAs("count")
+                               .ofType(classOf[java.lang.Integer])
+                               .defaultsTo(1)
 
     val options = parser.parse(args : _*)
 
@@ -130,6 +135,7 @@ object ConsumerPerformance {
     props.put("auto.offset.reset", if(options.has(resetBeginningOffsetOpt)) "largest" else "smallest")
     props.put("zookeeper.connect", options.valueOf(zkConnectOpt))
     props.put("consumer.timeout.ms", "5000")
+    props.put("num.consumer.fetchers", options.valueOf(numFetchersOpt).toString)
     val consumerConfig = new ConsumerConfig(props)
     val numThreads = options.valueOf(numThreadsOpt).intValue
     val topic = options.valueOf(topicOpt)
-- 
1.7.0.4

