From 25af85b9570037e93bc033f875bd83bba1e7ae0c Mon Sep 17 00:00:00 2001 From: Guozhang Wang Date: Fri, 15 May 2015 11:09:07 -0700 Subject: [PATCH] v1 --- .../main/scala/kafka/server/DelayedOperation.scala | 44 +++++++++++++++++----- 1 file changed, 35 insertions(+), 9 deletions(-) diff --git a/core/src/main/scala/kafka/server/DelayedOperation.scala b/core/src/main/scala/kafka/server/DelayedOperation.scala index 2ed9b46..51da68b 100644 --- a/core/src/main/scala/kafka/server/DelayedOperation.scala +++ b/core/src/main/scala/kafka/server/DelayedOperation.scala @@ -19,11 +19,13 @@ package kafka.server import kafka.utils._ import kafka.utils.timer._ +import kafka.utils.CoreUtils.{inReadLock, inWriteLock} import kafka.metrics.KafkaMetricsGroup import java.util.LinkedList import java.util.concurrent._ import java.util.concurrent.atomic._ +import java.util.concurrent.locks.ReentrantReadWriteLock import org.apache.kafka.common.utils.Utils @@ -122,7 +124,10 @@ class DelayedOperationPurgatory[T <: DelayedOperation](purgatoryName: String, br private[this] val timeoutTimer = new Timer(executor) /* a list of operation watching keys */ - private val watchersForKey = new Pool[Any, Watchers](Some((key: Any) => new Watchers)) + //private val watchersForKey = new Pool[Any, Watchers](Some((key: Any) => new Watchers)) + private val watchersForKey = new Pool[Any, Watchers](Some((key: Any) => new Watchers(key))) + + private val watchersLock = new ReentrantReadWriteLock() // the number of estimated total operations in the purgatory private[this] val estimatedTotalOperations = new AtomicInteger(0) @@ -217,7 +222,7 @@ class DelayedOperationPurgatory[T <: DelayedOperation](purgatoryName: String, br * @return the number of completed operations during this process */ def checkAndComplete(key: Any): Int = { - val watchers = watchersForKey.get(key) + val watchers = inReadLock(watchersLock) { watchersForKey.get(key) } if(watchers == null) 0 else @@ -229,7 +234,7 @@ class DelayedOperationPurgatory[T <: DelayedOperation](purgatoryName: String, br * on multiple lists, and some of its watched entries may still be in the watch lists * even when it has been completed, this number may be larger than the number of real operations watched */ - def watched() = watchersForKey.values.map(_.watched).sum + def watched() = allWatchers.map(_.watched).sum /** * Return the number of delayed operations in the expiry queue @@ -239,7 +244,22 @@ class DelayedOperationPurgatory[T <: DelayedOperation](purgatoryName: String, br /* * Return the watch list of the given key */ - private def watchersFor(key: Any) = watchersForKey.getAndMaybePut(key) + private def watchersFor(key: Any) = inReadLock(watchersLock) { watchersForKey.getAndMaybePut(key) } + + /* + * Return all the current watcher lists + */ + private def allWatchers = inReadLock(watchersLock) { watchersForKey.values } + + /* + * Remove the key from watcher lists if its list is empty + */ + private def removeKey(key: Any) = inWriteLock(watchersLock) { + val watchers = watchersForKey.get(key) + if (watchers != null && watchers.watched == 0) { + watchersForKey.remove(key) + } + } /** * Shutdown the expire reaper thread @@ -252,11 +272,11 @@ class DelayedOperationPurgatory[T <: DelayedOperation](purgatoryName: String, br /** * A linked list of watched delayed operations based on some key */ - private class Watchers { + private class Watchers(val key: Any) { private[this] val operations = new LinkedList[T]() - def watched(): Int = operations synchronized operations.size + def watched: Int = operations synchronized operations.size // add the element to watch def watch(t: T) { @@ -266,8 +286,8 @@ class DelayedOperationPurgatory[T <: DelayedOperation](purgatoryName: String, br // traverse the list and try to complete some watched elements def tryCompleteWatched(): Int = { + var completed = 0 operations synchronized { - var completed = 0 val iter = operations.iterator() while (iter.hasNext) { val curr = iter.next() @@ -279,8 +299,11 @@ class DelayedOperationPurgatory[T <: DelayedOperation](purgatoryName: String, br iter.remove() } } - completed + + if (operations.size == 0) + removeKey(key) } + completed } // traverse the list and purge elements that are already completed by others @@ -295,6 +318,9 @@ class DelayedOperationPurgatory[T <: DelayedOperation](purgatoryName: String, br purged += 1 } } + + if (operations.size == 0) + removeKey(key) } purged } @@ -319,7 +345,7 @@ class DelayedOperationPurgatory[T <: DelayedOperation](purgatoryName: String, br // a little overestimated total number of operations. estimatedTotalOperations.getAndSet(delayed) debug("Begin purging watch lists") - val purged = watchersForKey.values.map(_.purgeCompleted()).sum + val purged = allWatchers.map(_.purgeCompleted()).sum debug("Purged %d elements from watch lists.".format(purged)) } } -- 1.7.12.4