diff --git a/core/src/main/scala/kafka/api/FetchRequest.scala b/core/src/main/scala/kafka/api/FetchRequest.scala index ac74931..19c961e 100644 --- a/core/src/main/scala/kafka/api/FetchRequest.scala +++ b/core/src/main/scala/kafka/api/FetchRequest.scala @@ -201,5 +201,9 @@ class FetchRequestBuilder() { this } - def build() = FetchRequest(versionId, correlationId.getAndIncrement, clientId, replicaId, maxWait, minBytes, requestMap.toMap) + def build() = { + val fetchRequest = FetchRequest(versionId, correlationId.getAndIncrement, clientId, replicaId, maxWait, minBytes, requestMap.toMap) + requestMap.clear() + fetchRequest + } } diff --git a/core/src/main/scala/kafka/server/AbstractFetcherThread.scala b/core/src/main/scala/kafka/server/AbstractFetcherThread.scala index 0b286f0..1ccf578 100644 --- a/core/src/main/scala/kafka/server/AbstractFetcherThread.scala +++ b/core/src/main/scala/kafka/server/AbstractFetcherThread.scala @@ -46,6 +46,11 @@ abstract class AbstractFetcherThread(name: String, clientId: String, sourceBroke private val metricId = new ClientIdAndBroker(clientId, brokerInfo) val fetcherStats = new FetcherStats(metricId) val fetcherLagStats = new FetcherLagStats(metricId) + val fetchRequestBuilder = new FetchRequestBuilder(). + clientId(clientId). + replicaId(fetcherBrokerId). + maxWait(maxWait). + minBytes(minBytes) /* callbacks to be defined in subclass */ @@ -65,12 +70,6 @@ abstract class AbstractFetcherThread(name: String, clientId: String, sourceBroke } override def doWork() { - val fetchRequestBuilder = new FetchRequestBuilder(). - clientId(clientId). - replicaId(fetcherBrokerId). - maxWait(maxWait). - minBytes(minBytes) - partitionMapLock.lock() try { while (partitionMap.isEmpty)