diff --git a/core/src/main/scala/kafka/api/FetchRequest.scala b/core/src/main/scala/kafka/api/FetchRequest.scala index ac74931..d3c9a5b 100644 --- a/core/src/main/scala/kafka/api/FetchRequest.scala +++ b/core/src/main/scala/kafka/api/FetchRequest.scala @@ -165,8 +165,8 @@ case class FetchRequest private[kafka] (versionId: Short = FetchRequest.CurrentV @nonthreadsafe class FetchRequestBuilder() { - private val correlationId = new AtomicInteger(0) private val versionId = FetchRequest.CurrentVersion + private var correlationId = 0 private var clientId = ConsumerConfig.DefaultClientId private var replicaId = Request.OrdinaryConsumerId private var maxWait = FetchRequest.DefaultMaxWait @@ -183,6 +183,11 @@ class FetchRequestBuilder() { this } + def correlationId(correlationId: Int): FetchRequestBuilder = { + this.correlationId = correlationId + this + } + /** * Only for internal use. Clients shouldn't set replicaId. */ @@ -201,5 +206,5 @@ class FetchRequestBuilder() { this } - def build() = FetchRequest(versionId, correlationId.getAndIncrement, clientId, replicaId, maxWait, minBytes, requestMap.toMap) + def build() = FetchRequest(versionId, correlationId, clientId, replicaId, maxWait, minBytes, requestMap.toMap) } diff --git a/core/src/main/scala/kafka/server/AbstractFetcherThread.scala b/core/src/main/scala/kafka/server/AbstractFetcherThread.scala index 0b286f0..2629d54 100644 --- a/core/src/main/scala/kafka/server/AbstractFetcherThread.scala +++ b/core/src/main/scala/kafka/server/AbstractFetcherThread.scala @@ -25,11 +25,11 @@ import kafka.message.MessageAndOffset import kafka.api.{FetchResponse, FetchResponsePartitionData, FetchRequestBuilder} import kafka.metrics.KafkaMetricsGroup import com.yammer.metrics.core.Gauge -import java.util.concurrent.atomic.AtomicLong -import kafka.utils.{Pool, ShutdownableThread} import java.util.concurrent.TimeUnit import java.util.concurrent.locks.ReentrantLock import kafka.consumer.{PartitionTopicInfo, SimpleConsumer} +import java.util.concurrent.atomic.{AtomicInteger, AtomicLong} +import kafka.utils.{Pool, ShutdownableThread} /** @@ -46,6 +46,7 @@ abstract class AbstractFetcherThread(name: String, clientId: String, sourceBroke private val metricId = new ClientIdAndBroker(clientId, brokerInfo) val fetcherStats = new FetcherStats(metricId) val fetcherLagStats = new FetcherLagStats(metricId) + val correlationId = new AtomicInteger(0) /* callbacks to be defined in subclass */ @@ -66,6 +67,7 @@ abstract class AbstractFetcherThread(name: String, clientId: String, sourceBroke override def doWork() { val fetchRequestBuilder = new FetchRequestBuilder(). + correlationId(correlationId.getAndIncrement()). clientId(clientId). replicaId(fetcherBrokerId). maxWait(maxWait).