From eabb1cb4c05fdfcbdf2d9ef3b69cc46a6f4bacbf Mon Sep 17 00:00:00 2001 From: Guozhang Wang Date: Thu, 28 Aug 2014 09:50:24 -0700 Subject: [PATCH 1/2] dummy --- core/src/main/scala/kafka/server/RequestPurgatory.scala | 13 ++----------- 1 file changed, 2 insertions(+), 11 deletions(-) diff --git a/core/src/main/scala/kafka/server/RequestPurgatory.scala b/core/src/main/scala/kafka/server/RequestPurgatory.scala index ce06d2c..673ca6d 100644 --- a/core/src/main/scala/kafka/server/RequestPurgatory.scala +++ b/core/src/main/scala/kafka/server/RequestPurgatory.scala @@ -88,7 +88,7 @@ abstract class RequestPurgatory[T <: DelayedRequest](brokerId: Int = 0, purgeInt newGauge( "NumDelayedRequests", new Gauge[Int] { - def value = expiredRequestReaper.unsatisfied.get() + def value = expiredRequestReaper.numRequests() } ) @@ -210,7 +210,6 @@ abstract class RequestPurgatory[T <: DelayedRequest](brokerId: Int = 0, purgeInt val updated = curr.satisfied.compareAndSet(false, true) if(updated == true) { response += curr - expiredRequestReaper.satisfyRequest() } } } @@ -230,10 +229,7 @@ abstract class RequestPurgatory[T <: DelayedRequest](brokerId: Int = 0, purgeInt private val running = new AtomicBoolean(true) private val shutdownLatch = new CountDownLatch(1) - /* The count of elements in the delay queue that are unsatisfied */ - private [kafka] val unsatisfied = new AtomicInteger(0) - - def numRequests = delayed.size() + def numRequests() = delayed.size() /** Main loop for the expiry thread */ def run() { @@ -263,7 +259,6 @@ abstract class RequestPurgatory[T <: DelayedRequest](brokerId: Int = 0, purgeInt /** Add a request to be expired */ def enqueue(t: T) { delayed.add(t) - unsatisfied.incrementAndGet() } /** Shutdown the expiry thread*/ @@ -274,9 +269,6 @@ abstract class RequestPurgatory[T <: DelayedRequest](brokerId: Int = 0, purgeInt debug("Shut down complete.") } - /** Record the fact that we satisfied a request in the stats for the expiry queue */ - def satisfyRequest(): Unit = unsatisfied.getAndDecrement() - /** * Get the next expired event */ @@ -287,7 +279,6 @@ abstract class RequestPurgatory[T <: DelayedRequest](brokerId: Int = 0, purgeInt return null.asInstanceOf[T] val updated = curr.satisfied.compareAndSet(false, true) if(updated) { - unsatisfied.getAndDecrement() return curr } } -- 1.7.12.4 From ceb90aa20dea57fc91e85e54dc2cdc1ca47e8408 Mon Sep 17 00:00:00 2001 From: Guozhang Wang Date: Thu, 28 Aug 2014 10:12:02 -0700 Subject: [PATCH 2/2] v1 --- .../main/scala/kafka/server/RequestPurgatory.scala | 21 ++++++++++++++------- 1 file changed, 14 insertions(+), 7 deletions(-) diff --git a/core/src/main/scala/kafka/server/RequestPurgatory.scala b/core/src/main/scala/kafka/server/RequestPurgatory.scala index 673ca6d..66eed23 100644 --- a/core/src/main/scala/kafka/server/RequestPurgatory.scala +++ b/core/src/main/scala/kafka/server/RequestPurgatory.scala @@ -81,14 +81,14 @@ abstract class RequestPurgatory[T <: DelayedRequest](brokerId: Int = 0, purgeInt newGauge( "PurgatorySize", new Gauge[Int] { - def value = watched.get() + expiredRequestReaper.numRequests + def value = size() } ) newGauge( "NumDelayedRequests", new Gauge[Int] { - def value = expiredRequestReaper.numRequests() + def value = expiredRequestReaper.enqueued() } ) @@ -130,6 +130,13 @@ abstract class RequestPurgatory[T <: DelayedRequest](brokerId: Int = 0, purgeInt w.collectSatisfiedRequests() } + /* + * Return the size of the purgatory, which is size of watch lists. Since an operation may + * still be in the watch lists even when it has been completed, this number may be larger + * than the number of real operations watched + */ + protected def size() = watchersForKey.values.map(_.watched).sum + private def watchersFor(key: Any) = watchersForKey.getAndMaybePut(key) /** @@ -156,6 +163,9 @@ abstract class RequestPurgatory[T <: DelayedRequest](brokerId: Int = 0, purgeInt private class Watchers { private val requests = new util.ArrayList[T] + // return the size of the watch list + def watched() = requests.size() + // potentially add the element to watch if it is not satisfied yet def checkAndMaybeAdd(t: T): Boolean = { synchronized { @@ -168,7 +178,6 @@ abstract class RequestPurgatory[T <: DelayedRequest](brokerId: Int = 0, purgeInt return false } requests.add(t) - watched.getAndIncrement() return true } } @@ -182,7 +191,6 @@ abstract class RequestPurgatory[T <: DelayedRequest](brokerId: Int = 0, purgeInt val curr = iter.next if(curr.satisfied.get()) { iter.remove() - watched.getAndDecrement() purged += 1 } } @@ -206,7 +214,6 @@ abstract class RequestPurgatory[T <: DelayedRequest](brokerId: Int = 0, purgeInt val satisfied = curr synchronized checkSatisfied(curr) if(satisfied) { iter.remove() - watched.getAndDecrement() val updated = curr.satisfied.compareAndSet(false, true) if(updated == true) { response += curr @@ -229,7 +236,7 @@ abstract class RequestPurgatory[T <: DelayedRequest](brokerId: Int = 0, purgeInt private val running = new AtomicBoolean(true) private val shutdownLatch = new CountDownLatch(1) - def numRequests() = delayed.size() + def enqueued() = delayed.size() /** Main loop for the expiry thread */ def run() { @@ -241,7 +248,7 @@ abstract class RequestPurgatory[T <: DelayedRequest](brokerId: Int = 0, purgeInt expire(curr) } } - if (watched.get + numRequests >= purgeInterval) { // see if we need to force a full purge + if (size >= purgeInterval) { // see if we need to force a full purge debug("Beginning purgatory purge") val purged = purgeSatisfied() debug("Purged %d requests from delay queue.".format(purged)) -- 1.7.12.4