From eabb1cb4c05fdfcbdf2d9ef3b69cc46a6f4bacbf Mon Sep 17 00:00:00 2001 From: Guozhang Wang Date: Thu, 28 Aug 2014 09:50:24 -0700 Subject: [PATCH] dummy --- core/src/main/scala/kafka/server/RequestPurgatory.scala | 13 ++----------- 1 file changed, 2 insertions(+), 11 deletions(-) diff --git a/core/src/main/scala/kafka/server/RequestPurgatory.scala b/core/src/main/scala/kafka/server/RequestPurgatory.scala index ce06d2c..673ca6d 100644 --- a/core/src/main/scala/kafka/server/RequestPurgatory.scala +++ b/core/src/main/scala/kafka/server/RequestPurgatory.scala @@ -88,7 +88,7 @@ abstract class RequestPurgatory[T <: DelayedRequest](brokerId: Int = 0, purgeInt newGauge( "NumDelayedRequests", new Gauge[Int] { - def value = expiredRequestReaper.unsatisfied.get() + def value = expiredRequestReaper.numRequests() } ) @@ -210,7 +210,6 @@ abstract class RequestPurgatory[T <: DelayedRequest](brokerId: Int = 0, purgeInt val updated = curr.satisfied.compareAndSet(false, true) if(updated == true) { response += curr - expiredRequestReaper.satisfyRequest() } } } @@ -230,10 +229,7 @@ abstract class RequestPurgatory[T <: DelayedRequest](brokerId: Int = 0, purgeInt private val running = new AtomicBoolean(true) private val shutdownLatch = new CountDownLatch(1) - /* The count of elements in the delay queue that are unsatisfied */ - private [kafka] val unsatisfied = new AtomicInteger(0) - - def numRequests = delayed.size() + def numRequests() = delayed.size() /** Main loop for the expiry thread */ def run() { @@ -263,7 +259,6 @@ abstract class RequestPurgatory[T <: DelayedRequest](brokerId: Int = 0, purgeInt /** Add a request to be expired */ def enqueue(t: T) { delayed.add(t) - unsatisfied.incrementAndGet() } /** Shutdown the expiry thread*/ @@ -274,9 +269,6 @@ abstract class RequestPurgatory[T <: DelayedRequest](brokerId: Int = 0, purgeInt debug("Shut down complete.") } - /** Record the fact that we satisfied a request in the stats for the expiry queue */ - def satisfyRequest(): Unit = unsatisfied.getAndDecrement() - /** * Get the next expired event */ @@ -287,7 +279,6 @@ abstract class RequestPurgatory[T <: DelayedRequest](brokerId: Int = 0, purgeInt return null.asInstanceOf[T] val updated = curr.satisfied.compareAndSet(false, true) if(updated) { - unsatisfied.getAndDecrement() return curr } } -- 1.7.12.4