diff --git a/core/src/main/scala/kafka/api/FetchRequest.scala b/core/src/main/scala/kafka/api/FetchRequest.scala index ac74931..d3c9a5b 100644 --- a/core/src/main/scala/kafka/api/FetchRequest.scala +++ b/core/src/main/scala/kafka/api/FetchRequest.scala @@ -165,8 +165,8 @@ case class FetchRequest private[kafka] (versionId: Short = FetchRequest.CurrentV @nonthreadsafe class FetchRequestBuilder() { - private val correlationId = new AtomicInteger(0) private val versionId = FetchRequest.CurrentVersion + private var correlationId = 0 private var clientId = ConsumerConfig.DefaultClientId private var replicaId = Request.OrdinaryConsumerId private var maxWait = FetchRequest.DefaultMaxWait @@ -183,6 +183,11 @@ class FetchRequestBuilder() { this } + def correlationId(correlationId: Int): FetchRequestBuilder = { + this.correlationId = correlationId + this + } + /** * Only for internal use. Clients shouldn't set replicaId. */ @@ -201,5 +206,5 @@ class FetchRequestBuilder() { this } - def build() = FetchRequest(versionId, correlationId.getAndIncrement, clientId, replicaId, maxWait, minBytes, requestMap.toMap) + def build() = FetchRequest(versionId, correlationId, clientId, replicaId, maxWait, minBytes, requestMap.toMap) } diff --git a/core/src/main/scala/kafka/server/AbstractFetcherThread.scala b/core/src/main/scala/kafka/server/AbstractFetcherThread.scala index 0b286f0..2629d54 100644 --- a/core/src/main/scala/kafka/server/AbstractFetcherThread.scala +++ b/core/src/main/scala/kafka/server/AbstractFetcherThread.scala @@ -25,11 +25,11 @@ import kafka.message.MessageAndOffset import kafka.api.{FetchResponse, FetchResponsePartitionData, FetchRequestBuilder} import kafka.metrics.KafkaMetricsGroup import com.yammer.metrics.core.Gauge -import java.util.concurrent.atomic.AtomicLong -import kafka.utils.{Pool, ShutdownableThread} import java.util.concurrent.TimeUnit import java.util.concurrent.locks.ReentrantLock import kafka.consumer.{PartitionTopicInfo, SimpleConsumer} +import java.util.concurrent.atomic.{AtomicInteger, AtomicLong} +import kafka.utils.{Pool, ShutdownableThread} /** @@ -46,6 +46,7 @@ abstract class AbstractFetcherThread(name: String, clientId: String, sourceBroke private val metricId = new ClientIdAndBroker(clientId, brokerInfo) val fetcherStats = new FetcherStats(metricId) val fetcherLagStats = new FetcherLagStats(metricId) + val correlationId = new AtomicInteger(0) /* callbacks to be defined in subclass */ @@ -66,6 +67,7 @@ abstract class AbstractFetcherThread(name: String, clientId: String, sourceBroke override def doWork() { val fetchRequestBuilder = new FetchRequestBuilder(). + correlationId(correlationId.getAndIncrement()). clientId(clientId). replicaId(fetcherBrokerId). maxWait(maxWait). diff --git a/core/src/main/scala/kafka/tools/SimpleConsumerShell.scala b/core/src/main/scala/kafka/tools/SimpleConsumerShell.scala index d8127a8..9f942c3 100644 --- a/core/src/main/scala/kafka/tools/SimpleConsumerShell.scala +++ b/core/src/main/scala/kafka/tools/SimpleConsumerShell.scala @@ -24,6 +24,7 @@ import kafka.client.ClientUtils import kafka.api.{OffsetRequest, FetchRequestBuilder, Request} import kafka.cluster.Broker import scala.collection.JavaConversions._ +import java.util.concurrent.atomic.AtomicInteger /** * Command line program to dump out messages to standard out using the simple consumer @@ -107,6 +108,7 @@ object SimpleConsumerShell extends Logging { var startingOffset = options.valueOf(offsetOpt).longValue val fetchSize = options.valueOf(fetchSizeOpt).intValue val clientId = options.valueOf(clientIdOpt).toString + val correlationId = new AtomicInteger(0) val maxWaitMs = options.valueOf(maxWaitMsOpt).intValue() val skipMessageOnError = if (options.has(skipMessageOnErrorOpt)) true else false @@ -116,12 +118,6 @@ object SimpleConsumerShell extends Logging { val messageFormatterClass = Class.forName(options.valueOf(messageFormatterOpt)) val formatterArgs = MessageFormatter.tryParseFormatterArgs(options.valuesOf(messageFormatterArgOpt)) - val fetchRequestBuilder = new FetchRequestBuilder() - .clientId(clientId) - .replicaId(Request.DebuggingConsumerId) - .maxWait(maxWaitMs) - .minBytes(ConsumerConfig.MinFetchBytes) - // getting topic metadata info("Getting topic metatdata...") val metadataTargetBrokers = ClientUtils.parseBrokerList(options.valueOf(brokerListOpt)) @@ -179,9 +175,14 @@ object SimpleConsumerShell extends Logging { var offset = startingOffset try { while(true) { - val fetchRequest = fetchRequestBuilder - .addFetch(topic, partitionId, offset, fetchSize) - .build() + val fetchRequest = new FetchRequestBuilder() + .clientId(clientId) + .correlationId(correlationId.getAndIncrement()) + .replicaId(Request.DebuggingConsumerId) + .maxWait(maxWaitMs) + .minBytes(ConsumerConfig.MinFetchBytes) + .addFetch(topic, partitionId, offset, fetchSize) + .build() val fetchResponse = simpleConsumer.fetch(fetchRequest) val messageSet = fetchResponse.messageSet(topic, partitionId) if (messageSet.validBytes <= 0 && noWaitAtEndOfLog) {