From 1814e7c904072f0f67c5128d53a20d26ebb56b1a Mon Sep 17 00:00:00 2001 From: Jun Rao Date: Tue, 12 May 2015 15:37:21 -0700 Subject: [PATCH 1/2] synchronize on getting size from watchers --- core/src/main/scala/kafka/server/RequestPurgatory.scala | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/core/src/main/scala/kafka/server/RequestPurgatory.scala b/core/src/main/scala/kafka/server/RequestPurgatory.scala index 87ee3be..098679c 100644 --- a/core/src/main/scala/kafka/server/RequestPurgatory.scala +++ b/core/src/main/scala/kafka/server/RequestPurgatory.scala @@ -196,7 +196,11 @@ abstract class RequestPurgatory[T <: DelayedRequest](brokerId: Int = 0, purgeInt private val requests = new util.LinkedList[T] // return the size of the watch list - def watched() = requests.size() + def watched() = { + synchronized { + requests.size() + } + } // add the element to the watcher list if it's not already satisfied def addIfNotSatisfied(t: T): Boolean = { -- 1.8.5.2 (Apple Git-48) From ef381eaefea768eb95da279d1bf197aeab27a66b Mon Sep 17 00:00:00 2001 From: Jun Rao Date: Thu, 14 May 2015 09:04:38 -0700 Subject: [PATCH 2/2] add instrumentation --- core/src/main/scala/kafka/server/FetchRequestPurgatory.scala | 2 +- core/src/main/scala/kafka/server/ProducerRequestPurgatory.scala | 2 +- core/src/main/scala/kafka/server/RequestPurgatory.scala | 6 +++++- 3 files changed, 7 insertions(+), 3 deletions(-) diff --git a/core/src/main/scala/kafka/server/FetchRequestPurgatory.scala b/core/src/main/scala/kafka/server/FetchRequestPurgatory.scala index ed13188..5a20233 100644 --- a/core/src/main/scala/kafka/server/FetchRequestPurgatory.scala +++ b/core/src/main/scala/kafka/server/FetchRequestPurgatory.scala @@ -27,7 +27,7 @@ import java.util.concurrent.TimeUnit * The purgatory holding delayed fetch requests */ class FetchRequestPurgatory(replicaManager: ReplicaManager, requestChannel: RequestChannel) - extends RequestPurgatory[DelayedFetch](replicaManager.config.brokerId, replicaManager.config.fetchPurgatoryPurgeIntervalRequests) { + extends RequestPurgatory[DelayedFetch](replicaManager.config.brokerId, replicaManager.config.fetchPurgatoryPurgeIntervalRequests, "FetchPurgatory") { this.logIdent = "[FetchRequestPurgatory-%d] ".format(replicaManager.config.brokerId) private class DelayedFetchRequestMetrics(forFollower: Boolean) extends KafkaMetricsGroup { diff --git a/core/src/main/scala/kafka/server/ProducerRequestPurgatory.scala b/core/src/main/scala/kafka/server/ProducerRequestPurgatory.scala index e7ff411..852841a 100644 --- a/core/src/main/scala/kafka/server/ProducerRequestPurgatory.scala +++ b/core/src/main/scala/kafka/server/ProducerRequestPurgatory.scala @@ -28,7 +28,7 @@ import java.util.concurrent.TimeUnit * The purgatory holding delayed producer requests */ class ProducerRequestPurgatory(replicaManager: ReplicaManager, offsetManager: OffsetManager, requestChannel: RequestChannel) - extends RequestPurgatory[DelayedProduce](replicaManager.config.brokerId, replicaManager.config.producerPurgatoryPurgeIntervalRequests) { + extends RequestPurgatory[DelayedProduce](replicaManager.config.brokerId, replicaManager.config.producerPurgatoryPurgeIntervalRequests, "ProducePurgatory") { this.logIdent = "[ProducerRequestPurgatory-%d] ".format(replicaManager.config.brokerId) private class DelayedProducerRequestMetrics(metricId: Option[TopicAndPartition]) extends KafkaMetricsGroup { diff --git a/core/src/main/scala/kafka/server/RequestPurgatory.scala b/core/src/main/scala/kafka/server/RequestPurgatory.scala index 098679c..701583e 100644 --- a/core/src/main/scala/kafka/server/RequestPurgatory.scala +++ b/core/src/main/scala/kafka/server/RequestPurgatory.scala @@ -65,7 +65,7 @@ class DelayedRequest(val keys: Seq[Any], val request: RequestChannel.Request, de * this function handles delayed requests that have hit their time limit without being satisfied. * */ -abstract class RequestPurgatory[T <: DelayedRequest](brokerId: Int = 0, purgeInterval: Int = 1000) +abstract class RequestPurgatory[T <: DelayedRequest](brokerId: Int = 0, purgeInterval: Int = 1000, purgatoryName: String = "") extends Logging with KafkaMetricsGroup { /* a list of requests watching each key */ @@ -280,6 +280,10 @@ abstract class RequestPurgatory[T <: DelayedRequest](brokerId: Int = 0, purgeInt expire(curr) } } + + debug("Checking for purging in %s; watched items: %d, purge threshold: %d". + format(purgatoryName, RequestPurgatory.this.watched(), purgeInterval)) + // see if we need to purge the watch lists if (RequestPurgatory.this.watched() >= purgeInterval) { debug("Begin purging watch lists") -- 1.8.5.2 (Apple Git-48)