From ecdec685691f24089fb1250a9f5a090421dd7997 Mon Sep 17 00:00:00 2001 From: Guozhang Wang Date: Mon, 1 Sep 2014 13:04:30 -0700 Subject: [PATCH 1/5] dummy --- .../main/scala/kafka/server/RequestPurgatory.scala | 30 +++++++++----------- 1 file changed, 14 insertions(+), 16 deletions(-) diff --git a/core/src/main/scala/kafka/server/RequestPurgatory.scala b/core/src/main/scala/kafka/server/RequestPurgatory.scala index ce06d2c..66eed23 100644 --- a/core/src/main/scala/kafka/server/RequestPurgatory.scala +++ b/core/src/main/scala/kafka/server/RequestPurgatory.scala @@ -81,14 +81,14 @@ abstract class RequestPurgatory[T <: DelayedRequest](brokerId: Int = 0, purgeInt newGauge( "PurgatorySize", new Gauge[Int] { - def value = watched.get() + expiredRequestReaper.numRequests + def value = size() } ) newGauge( "NumDelayedRequests", new Gauge[Int] { - def value = expiredRequestReaper.unsatisfied.get() + def value = expiredRequestReaper.enqueued() } ) @@ -130,6 +130,13 @@ abstract class RequestPurgatory[T <: DelayedRequest](brokerId: Int = 0, purgeInt w.collectSatisfiedRequests() } + /* + * Return the size of the purgatory, which is size of watch lists. Since an operation may + * still be in the watch lists even when it has been completed, this number may be larger + * than the number of real operations watched + */ + protected def size() = watchersForKey.values.map(_.watched).sum + private def watchersFor(key: Any) = watchersForKey.getAndMaybePut(key) /** @@ -156,6 +163,9 @@ abstract class RequestPurgatory[T <: DelayedRequest](brokerId: Int = 0, purgeInt private class Watchers { private val requests = new util.ArrayList[T] + // return the size of the watch list + def watched() = requests.size() + // potentially add the element to watch if it is not satisfied yet def checkAndMaybeAdd(t: T): Boolean = { synchronized { @@ -168,7 +178,6 @@ abstract class RequestPurgatory[T <: DelayedRequest](brokerId: Int = 0, purgeInt return false } requests.add(t) - watched.getAndIncrement() return true } } @@ -182,7 +191,6 @@ abstract class RequestPurgatory[T <: DelayedRequest](brokerId: Int = 0, purgeInt val curr = iter.next if(curr.satisfied.get()) { iter.remove() - watched.getAndDecrement() purged += 1 } } @@ -206,11 +214,9 @@ abstract class RequestPurgatory[T <: DelayedRequest](brokerId: Int = 0, purgeInt val satisfied = curr synchronized checkSatisfied(curr) if(satisfied) { iter.remove() - watched.getAndDecrement() val updated = curr.satisfied.compareAndSet(false, true) if(updated == true) { response += curr - expiredRequestReaper.satisfyRequest() } } } @@ -230,10 +236,7 @@ abstract class RequestPurgatory[T <: DelayedRequest](brokerId: Int = 0, purgeInt private val running = new AtomicBoolean(true) private val shutdownLatch = new CountDownLatch(1) - /* The count of elements in the delay queue that are unsatisfied */ - private [kafka] val unsatisfied = new AtomicInteger(0) - - def numRequests = delayed.size() + def enqueued() = delayed.size() /** Main loop for the expiry thread */ def run() { @@ -245,7 +248,7 @@ abstract class RequestPurgatory[T <: DelayedRequest](brokerId: Int = 0, purgeInt expire(curr) } } - if (watched.get + numRequests >= purgeInterval) { // see if we need to force a full purge + if (size >= purgeInterval) { // see if we need to force a full purge debug("Beginning purgatory purge") val purged = purgeSatisfied() debug("Purged %d requests from delay queue.".format(purged)) @@ -263,7 +266,6 @@ abstract class RequestPurgatory[T <: DelayedRequest](brokerId: Int = 0, purgeInt /** Add a request to be expired */ def enqueue(t: T) { delayed.add(t) - unsatisfied.incrementAndGet() } /** Shutdown the expiry thread*/ @@ -274,9 +276,6 @@ abstract class RequestPurgatory[T <: DelayedRequest](brokerId: Int = 0, purgeInt debug("Shut down complete.") } - /** Record the fact that we satisfied a request in the stats for the expiry queue */ - def satisfyRequest(): Unit = unsatisfied.getAndDecrement() - /** * Get the next expired event */ @@ -287,7 +286,6 @@ abstract class RequestPurgatory[T <: DelayedRequest](brokerId: Int = 0, purgeInt return null.asInstanceOf[T] val updated = curr.satisfied.compareAndSet(false, true) if(updated) { - unsatisfied.getAndDecrement() return curr } } -- 1.7.10.2 (Apple Git-33) From c78dd4b5d872a4529d2ee779938e195a4dbf9777 Mon Sep 17 00:00:00 2001 From: Guozhang Wang Date: Mon, 1 Sep 2014 13:30:38 -0700 Subject: [PATCH 2/5] Incorporated Jun's comments --- .../main/scala/kafka/server/RequestPurgatory.scala | 19 ++++++++------ .../unit/kafka/server/RequestPurgatoryTest.scala | 27 ++++++++++++++++++-- 2 files changed, 36 insertions(+), 10 deletions(-) diff --git a/core/src/main/scala/kafka/server/RequestPurgatory.scala b/core/src/main/scala/kafka/server/RequestPurgatory.scala index 66eed23..7c51459 100644 --- a/core/src/main/scala/kafka/server/RequestPurgatory.scala +++ b/core/src/main/scala/kafka/server/RequestPurgatory.scala @@ -71,9 +71,6 @@ abstract class RequestPurgatory[T <: DelayedRequest](brokerId: Int = 0, purgeInt /* a list of requests watching each key */ private val watchersForKey = new Pool[Any, Watchers](Some((key: Any) => new Watchers)) - /* the number of requests being watched, duplicates added on different watchers are also counted */ - private val watched = new AtomicInteger(0) - /* background thread expiring requests that have been waiting too long */ private val expiredRequestReaper = new ExpiredRequestReaper private val expirationThread = Utils.newThread(name="request-expiration-task", runnable=expiredRequestReaper, daemon=false) @@ -88,7 +85,7 @@ abstract class RequestPurgatory[T <: DelayedRequest](brokerId: Int = 0, purgeInt newGauge( "NumDelayedRequests", new Gauge[Int] { - def value = expiredRequestReaper.enqueued() + def value = enqueued() } ) @@ -131,12 +128,20 @@ abstract class RequestPurgatory[T <: DelayedRequest](brokerId: Int = 0, purgeInt } /* - * Return the size of the purgatory, which is size of watch lists. Since an operation may + * Return the size of the purgatory, which is the size of watch lists. Since an operation may * still be in the watch lists even when it has been completed, this number may be larger * than the number of real operations watched */ protected def size() = watchersForKey.values.map(_.watched).sum + /* + * Return the number of requests in the expiry reaper's queue + */ + protected def enqueued() = expiredRequestReaper.delayed.size() + + /* + * Return the watch list for the given watch key + */ private def watchersFor(key: Any) = watchersForKey.getAndMaybePut(key) /** @@ -231,12 +236,10 @@ abstract class RequestPurgatory[T <: DelayedRequest](brokerId: Int = 0, purgeInt */ private class ExpiredRequestReaper extends Runnable with Logging { this.logIdent = "ExpiredRequestReaper-%d ".format(brokerId) - - private val delayed = new DelayQueue[T] private val running = new AtomicBoolean(true) private val shutdownLatch = new CountDownLatch(1) - def enqueued() = delayed.size() + val delayed = new DelayQueue[T] /** Main loop for the expiry thread */ def run() { diff --git a/core/src/test/scala/unit/kafka/server/RequestPurgatoryTest.scala b/core/src/test/scala/unit/kafka/server/RequestPurgatoryTest.scala index 168712d..be8fbb9 100644 --- a/core/src/test/scala/unit/kafka/server/RequestPurgatoryTest.scala +++ b/core/src/test/scala/unit/kafka/server/RequestPurgatoryTest.scala @@ -34,7 +34,7 @@ class RequestPurgatoryTest extends JUnit3Suite { override def setUp() { super.setUp() - purgatory = new MockRequestPurgatory() + purgatory = new MockRequestPurgatory(5) } override def tearDown() { @@ -73,8 +73,27 @@ class RequestPurgatoryTest extends JUnit3Suite { assertTrue("r2 hasn't expired", !purgatory.expired.contains(r2)) assertTrue("Time for expiration %d should at least %d".format(elapsed, expiration), elapsed >= expiration) } + + @Test + def testRequestPurge() { + val r1 = new DelayedRequest(Array("test1"), null, 100000L) + val r12 = new DelayedRequest(Array("test1", "test2"), null, 100000L) + val r23 = new DelayedRequest(Array("test2", "test3"), null, 100000L) + assertFalse("r1 not satisfied and hence watched", purgatory.checkAndMaybeWatch(r1)) + assertFalse("r12 not satisfied and hence watched", purgatory.checkAndMaybeWatch(r12)) + assertFalse("r23 not satisfied and hence watched", purgatory.checkAndMaybeWatch(r23)) + + assertEquals("Purgatory should have 5 watched elements", 5, purgatory.watched()) + assertEquals("Purgatory should have 3 total delayed requests", 3, purgatory.delayed()) + + // satisfy one of the request, it should be purged by the reaper + purgatory.satisfied += r12 + Thread.sleep(1000L) + assertEquals("Purgatory should have 3 watched elements", 3, purgatory.watched()) + assertEquals("Purgatory should have 2 total delayed requests", 2, purgatory.delayed()) + } - class MockRequestPurgatory extends RequestPurgatory[DelayedRequest] { + class MockRequestPurgatory(purge: Int) extends RequestPurgatory[DelayedRequest](purgeInterval = purge) { val satisfied = mutable.Set[DelayedRequest]() val expired = mutable.Set[DelayedRequest]() def awaitExpiration(delayed: DelayedRequest) = { @@ -89,6 +108,10 @@ class RequestPurgatoryTest extends JUnit3Suite { delayed.notify() } } + + def watched() = size() + + def delayed() = enqueued() } } \ No newline at end of file -- 1.7.10.2 (Apple Git-33) From 7c7e497bd85bd383e01e1406aec7b1dc75b69a67 Mon Sep 17 00:00:00 2001 From: Guozhang Wang Date: Mon, 1 Sep 2014 14:36:10 -0700 Subject: [PATCH 3/5] minor fix --- core/src/test/scala/unit/kafka/server/RequestPurgatoryTest.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/core/src/test/scala/unit/kafka/server/RequestPurgatoryTest.scala b/core/src/test/scala/unit/kafka/server/RequestPurgatoryTest.scala index be8fbb9..ab6393b 100644 --- a/core/src/test/scala/unit/kafka/server/RequestPurgatoryTest.scala +++ b/core/src/test/scala/unit/kafka/server/RequestPurgatoryTest.scala @@ -87,7 +87,7 @@ class RequestPurgatoryTest extends JUnit3Suite { assertEquals("Purgatory should have 3 total delayed requests", 3, purgatory.delayed()) // satisfy one of the request, it should be purged by the reaper - purgatory.satisfied += r12 + r12.satisfied.set(true) Thread.sleep(1000L) assertEquals("Purgatory should have 3 watched elements", 3, purgatory.watched()) assertEquals("Purgatory should have 2 total delayed requests", 2, purgatory.delayed()) -- 1.7.10.2 (Apple Git-33) From 8beda04c8dadff8677018403c022ec9c2f14513e Mon Sep 17 00:00:00 2001 From: Guozhang Wang Date: Tue, 2 Sep 2014 11:45:04 -0700 Subject: [PATCH 4/5] incorporated Jun's comment round two --- core/src/main/scala/kafka/server/RequestPurgatory.scala | 2 +- .../src/test/scala/unit/kafka/server/RequestPurgatoryTest.scala | 7 +++---- 2 files changed, 4 insertions(+), 5 deletions(-) diff --git a/core/src/main/scala/kafka/server/RequestPurgatory.scala b/core/src/main/scala/kafka/server/RequestPurgatory.scala index 7c51459..b5fc87a 100644 --- a/core/src/main/scala/kafka/server/RequestPurgatory.scala +++ b/core/src/main/scala/kafka/server/RequestPurgatory.scala @@ -251,7 +251,7 @@ abstract class RequestPurgatory[T <: DelayedRequest](brokerId: Int = 0, purgeInt expire(curr) } } - if (size >= purgeInterval) { // see if we need to force a full purge + if (RequestPurgatory.this.size >= purgeInterval) { // see if we need to force a full purge debug("Beginning purgatory purge") val purged = purgeSatisfied() debug("Purged %d requests from delay queue.".format(purged)) diff --git a/core/src/test/scala/unit/kafka/server/RequestPurgatoryTest.scala b/core/src/test/scala/unit/kafka/server/RequestPurgatoryTest.scala index ab6393b..0671c82 100644 --- a/core/src/test/scala/unit/kafka/server/RequestPurgatoryTest.scala +++ b/core/src/test/scala/unit/kafka/server/RequestPurgatoryTest.scala @@ -86,11 +86,10 @@ class RequestPurgatoryTest extends JUnit3Suite { assertEquals("Purgatory should have 5 watched elements", 5, purgatory.watched()) assertEquals("Purgatory should have 3 total delayed requests", 3, purgatory.delayed()) - // satisfy one of the request, it should be purged by the reaper + // satisfy one of the requests, it should then be purged by the reaper r12.satisfied.set(true) - Thread.sleep(1000L) - assertEquals("Purgatory should have 3 watched elements", 3, purgatory.watched()) - assertEquals("Purgatory should have 2 total delayed requests", 2, purgatory.delayed()) + TestUtils.waitUntilTrue(purgatory.watched() = 3, "Purgatory should have 3 watched elements", 1000L) + TestUtils.waitUntilTrue(purgatory.delayed() = 2, "Purgatory should have 2 total delayed requests", 1000L) } class MockRequestPurgatory(purge: Int) extends RequestPurgatory[DelayedRequest](purgeInterval = purge) { -- 1.7.10.2 (Apple Git-33) From 19620bac0332a7e0d8caa1820f7d3b6ff4d64ccc Mon Sep 17 00:00:00 2001 From: Guozhang Wang Date: Wed, 3 Sep 2014 12:52:16 -0700 Subject: [PATCH 5/5] Incorporate Jun's comment round four --- core/src/main/scala/kafka/server/RequestPurgatory.scala | 13 +++++++++---- 1 file changed, 9 insertions(+), 4 deletions(-) diff --git a/core/src/main/scala/kafka/server/RequestPurgatory.scala b/core/src/main/scala/kafka/server/RequestPurgatory.scala index b5fc87a..fd02feb 100644 --- a/core/src/main/scala/kafka/server/RequestPurgatory.scala +++ b/core/src/main/scala/kafka/server/RequestPurgatory.scala @@ -251,12 +251,17 @@ abstract class RequestPurgatory[T <: DelayedRequest](brokerId: Int = 0, purgeInt expire(curr) } } - if (RequestPurgatory.this.size >= purgeInterval) { // see if we need to force a full purge - debug("Beginning purgatory purge") + // see if we need to purge the watch lists + if (RequestPurgatory.this.size >= purgeInterval) { + debug("Beginning purgatory watch lists purge") + val numPurgedFromWatchers = watchersForKey.values.map(_.purgeSatisfied()).sum + debug("Purged %d elements from watch lists.".format(numPurgedFromWatchers)) + } + // see if we need to purge the delayed request queue + if (RequestPurgatory.this.enqueued > purgeInterval) { + debug("Beginning purgatory delay queue purge") val purged = purgeSatisfied() debug("Purged %d requests from delay queue.".format(purged)) - val numPurgedFromWatchers = watchersForKey.values.map(_.purgeSatisfied()).sum - debug("Purged %d requests from watch lists.".format(numPurgedFromWatchers)) } } catch { case e: Exception => -- 1.7.10.2 (Apple Git-33)