diff --git a/core/src/main/scala/kafka/api/FetchRequest.scala b/core/src/main/scala/kafka/api/FetchRequest.scala
index ac74931..d3c9a5b 100644
--- a/core/src/main/scala/kafka/api/FetchRequest.scala
+++ b/core/src/main/scala/kafka/api/FetchRequest.scala
@@ -165,8 +165,8 @@ case class FetchRequest private[kafka] (versionId: Short = FetchRequest.CurrentV
 
 @nonthreadsafe
 class FetchRequestBuilder() {
-  private val correlationId = new AtomicInteger(0)
   private val versionId = FetchRequest.CurrentVersion
+  private var correlationId = 0
   private var clientId = ConsumerConfig.DefaultClientId
   private var replicaId = Request.OrdinaryConsumerId
   private var maxWait = FetchRequest.DefaultMaxWait
@@ -183,6 +183,11 @@ class FetchRequestBuilder() {
     this
   }
 
+  def correlationId(correlationId: Int): FetchRequestBuilder = {
+    this.correlationId = correlationId
+    this
+  }
+
   /**
    * Only for internal use. Clients shouldn't set replicaId.
    */
@@ -201,5 +206,5 @@ class FetchRequestBuilder() {
     this
   }
 
-  def build() = FetchRequest(versionId, correlationId.getAndIncrement, clientId, replicaId, maxWait, minBytes, requestMap.toMap)
+  def build() = FetchRequest(versionId, correlationId, clientId, replicaId, maxWait, minBytes, requestMap.toMap)
 }
diff --git a/core/src/main/scala/kafka/producer/async/DefaultEventHandler.scala b/core/src/main/scala/kafka/producer/async/DefaultEventHandler.scala
index 374cd6b..8820a0b 100644
--- a/core/src/main/scala/kafka/producer/async/DefaultEventHandler.scala
+++ b/core/src/main/scala/kafka/producer/async/DefaultEventHandler.scala
@@ -65,7 +65,7 @@ class DefaultEventHandler[K,V](config: ProducerConfig,
       topicMetadataToRefresh ++= outstandingProduceRequests.map(_.topic)
       if (topicMetadataRefreshInterval >= 0 &&
           SystemTime.milliseconds - lastTopicMetadataRefreshTime > topicMetadataRefreshInterval) {
-        Utils.swallowError(brokerPartitionInfo.updateInfo(topicMetadataToRefresh.toSet, correlationId.getAndIncrement))
+        Utils.swallowError(brokerPartitionInfo.updateInfo(topicMetadataToRefresh.toSet, Utils.getNextNonNegativeInt(correlationId)))
         topicMetadataToRefresh.clear
         lastTopicMetadataRefreshTime = SystemTime.milliseconds
       }
@@ -74,7 +74,7 @@ class DefaultEventHandler[K,V](config: ProducerConfig,
         // back off and update the topic metadata cache before attempting another send operation
         Thread.sleep(config.retryBackoffMs)
         // get topics of the outstanding produce requests and refresh metadata for those
-        Utils.swallowError(brokerPartitionInfo.updateInfo(outstandingProduceRequests.map(_.topic).toSet, correlationId.getAndIncrement))
+        Utils.swallowError(brokerPartitionInfo.updateInfo(outstandingProduceRequests.map(_.topic).toSet, Utils.getNextNonNegativeInt(correlationId)))
         remainingRetries -= 1
         producerStats.resendRate.mark()
       }
@@ -181,7 +181,7 @@ class DefaultEventHandler[K,V](config: ProducerConfig,
   }
 
   private def getPartitionListForTopic(m: KeyedMessage[K,Message]): Seq[PartitionAndLeader] = {
-    val topicPartitionsList = brokerPartitionInfo.getBrokerPartitionInfo(m.topic, correlationId.getAndIncrement)
+    val topicPartitionsList = brokerPartitionInfo.getBrokerPartitionInfo(m.topic, Utils.getNextNonNegativeInt(correlationId))
     debug("Broker partitions registered for topic: %s are %s"
       .format(m.topic, topicPartitionsList.map(p => p.partitionId).mkString(",")))
     val totalNumPartitions = topicPartitionsList.length
@@ -232,7 +232,7 @@ class DefaultEventHandler[K,V](config: ProducerConfig,
           messagesPerTopic.map(_._1.toString).mkString(",")))
       messagesPerTopic.keys.toSeq
     } else if(messagesPerTopic.size > 0) {
-      val currentCorrelationId = correlationId.getAndIncrement
+      val currentCorrelationId = Utils.getNextNonNegativeInt(correlationId)
       val producerRequest = new ProducerRequest(currentCorrelationId, config.clientId, config.requestRequiredAcks,
         config.requestTimeoutMs, messagesPerTopic)
       var failedTopicPartitions = Seq.empty[TopicAndPartition]
diff --git a/core/src/main/scala/kafka/server/AbstractFetcherThread.scala b/core/src/main/scala/kafka/server/AbstractFetcherThread.scala
index 0b286f0..56724c7 100644
--- a/core/src/main/scala/kafka/server/AbstractFetcherThread.scala
+++ b/core/src/main/scala/kafka/server/AbstractFetcherThread.scala
@@ -25,11 +25,11 @@ import kafka.message.MessageAndOffset
 import kafka.api.{FetchResponse, FetchResponsePartitionData, FetchRequestBuilder}
 import kafka.metrics.KafkaMetricsGroup
 import com.yammer.metrics.core.Gauge
-import java.util.concurrent.atomic.AtomicLong
-import kafka.utils.{Pool, ShutdownableThread}
 import java.util.concurrent.TimeUnit
 import java.util.concurrent.locks.ReentrantLock
 import kafka.consumer.{PartitionTopicInfo, SimpleConsumer}
+import java.util.concurrent.atomic.{AtomicInteger, AtomicLong}
+import kafka.utils.{Utils, Pool, ShutdownableThread}
 
 
 /**
@@ -46,6 +46,7 @@ abstract class AbstractFetcherThread(name: String, clientId: String, sourceBroke
   private val metricId = new ClientIdAndBroker(clientId, brokerInfo)
   val fetcherStats = new FetcherStats(metricId)
   val fetcherLagStats = new FetcherLagStats(metricId)
+  val correlationId = new AtomicInteger(0)
 
   /* callbacks to be defined in subclass */
 
@@ -66,6 +67,7 @@ abstract class AbstractFetcherThread(name: String, clientId: String, sourceBroke
 
   override def doWork() {
     val fetchRequestBuilder = new FetchRequestBuilder().
+            correlationId(Utils.getNextNonNegativeInt(correlationId)).
             clientId(clientId).
             replicaId(fetcherBrokerId).
             maxWait(maxWait).
diff --git a/core/src/main/scala/kafka/utils/Utils.scala b/core/src/main/scala/kafka/utils/Utils.scala
index 0185c14..4a3f28c 100644
--- a/core/src/main/scala/kafka/utils/Utils.scala
+++ b/core/src/main/scala/kafka/utils/Utils.scala
@@ -28,6 +28,7 @@ import scala.collection._
 import scala.collection.mutable
 import java.util.Properties
 import kafka.common.KafkaException
+import java.util.concurrent.atomic.AtomicInteger
 
 
 /**
@@ -548,5 +549,10 @@ object Utils extends Logging {
   * This is different from java.lang.Math.abs or scala.math.abs in that they return Int.MinValue (!).
   */
  def abs(n: Int) = n & 0x7fffffff
-
+
+  def getNextNonNegativeInt(counter: AtomicInteger): Int = {
+    if (counter.get == Integer.MAX_VALUE)
+      counter.set(0)
+    counter.getAndIncrement()
+  }
 }
\ No newline at end of file