diff --git a/core/src/main/scala/kafka/server/RequestPurgatory.scala b/core/src/main/scala/kafka/server/RequestPurgatory.scala index 8fb9865..3aaf38e 100644 --- a/core/src/main/scala/kafka/server/RequestPurgatory.scala +++ b/core/src/main/scala/kafka/server/RequestPurgatory.scala @@ -88,18 +88,11 @@ abstract class RequestPurgatory[T <: DelayedRequest, R](brokerId: Int = 0, purge private val expirationThread = Utils.daemonThread("request-expiration-task", expiredRequestReaper) expirationThread.start() - def purgeSatisfied() { - expiredRequestReaper.forcePurge() - } - /** * Add a new delayed request watching the contained keys */ def watch(delayedRequest: T) { - if (requestCounter.getAndIncrement() >= purgeInterval) { - requestCounter.set(0) - purgeSatisfied() - } + requestCounter.getAndIncrement() for(key <- delayedRequest.keys) { var lst = watchersFor(key) @@ -218,15 +211,19 @@ abstract class RequestPurgatory[T <: DelayedRequest, R](brokerId: Int = 0, purge while(running.get) { try { val curr = pollExpired() - curr synchronized { - expire(curr) + if (curr != null) { + curr synchronized { + expire(curr) + } } - } catch { - case ie: InterruptedException => + if (requestCounter.get >= purgeInterval) { // see if we need to force a full purge + requestCounter.set(0) val purged = purgeSatisfied() debug("Purged %d requests from delay queue.".format(purged)) val numPurgedFromWatchers = watchersForKey.values.map(_.purgeSatisfied()).sum debug("Purged %d (watcher) requests.".format(numPurgedFromWatchers)) + } + } catch { case e: Exception => error("Error in long poll expiry thread: ", e) } @@ -240,10 +237,6 @@ abstract class RequestPurgatory[T <: DelayedRequest, R](brokerId: Int = 0, purge unsatisfied.incrementAndGet() } - def forcePurge() { - expirationThread.interrupt() - } - /** Shutdown the expiry thread*/ def shutdown() { debug("Shutting down.") @@ -261,7 +254,9 @@ abstract class RequestPurgatory[T <: DelayedRequest, R](brokerId: Int = 0, purge */ private def pollExpired(): T = { while(true) { - val curr = delayed.take() + val curr = delayed.poll(200L, TimeUnit.MILLISECONDS) + if (curr == null) + return null.asInstanceOf[T] val updated = curr.satisfied.compareAndSet(false, true) if(updated) { unsatisfied.getAndDecrement()