From 191920e212790f6ca7e0eeac727f59461604c470 Mon Sep 17 00:00:00 2001 From: Guozhang Wang Date: Sat, 6 Jun 2015 21:48:47 -0700 Subject: [PATCH 1/3] v1 --- .../main/scala/kafka/server/DelayedOperation.scala | 18 ++++++++++++------ 1 file changed, 12 insertions(+), 6 deletions(-) diff --git a/core/src/main/scala/kafka/server/DelayedOperation.scala b/core/src/main/scala/kafka/server/DelayedOperation.scala index 123078d..17dae07 100644 --- a/core/src/main/scala/kafka/server/DelayedOperation.scala +++ b/core/src/main/scala/kafka/server/DelayedOperation.scala @@ -253,10 +253,16 @@ class DelayedOperationPurgatory[T <: DelayedOperation](purgatoryName: String, br /* * Remove the key from watcher lists if its list is empty */ - private def removeKeyIfEmpty(key: Any) = inWriteLock(removeWatchersLock) { - val watchers = watchersForKey.get(key) - if (watchers != null && watchers.watched == 0) { - watchersForKey.remove(key) + private def removeKeyIfEmpty(key: Any, watchers: Watchers) { + inWriteLock(removeWatchersLock) { + // if the current key is no longer correlated to the watchers to remove, skip + if (watchersForKey.get(key) != watchers) + return + + //val watchers = watchersForKey.get(key) + if (watchers != null && watchers.watched == 0) { + watchersForKey.remove(key) + } } } @@ -300,7 +306,7 @@ class DelayedOperationPurgatory[T <: DelayedOperation](purgatoryName: String, br } if (operations.size == 0) - removeKeyIfEmpty(key) + removeKeyIfEmpty(key, this) } completed } @@ -319,7 +325,7 @@ class DelayedOperationPurgatory[T <: DelayedOperation](purgatoryName: String, br } if (operations.size == 0) - removeKeyIfEmpty(key) + removeKeyIfEmpty(key, this) } purged } -- 1.7.10.2 (Apple Git-33) From a5f94f120a792ff2b431ba85da0aa2273e130641 Mon Sep 17 00:00:00 2001 From: Guozhang Wang Date: Sun, 7 Jun 2015 21:44:23 -0700 Subject: [PATCH 2/3] v2 --- .../main/scala/kafka/server/DelayedOperation.scala | 17 +++++++++++------ 1 file changed, 11 insertions(+), 6 deletions(-) diff --git a/core/src/main/scala/kafka/server/DelayedOperation.scala b/core/src/main/scala/kafka/server/DelayedOperation.scala index 17dae07..d1eb76e 100644 --- a/core/src/main/scala/kafka/server/DelayedOperation.scala +++ b/core/src/main/scala/kafka/server/DelayedOperation.scala @@ -189,8 +189,7 @@ class DelayedOperationPurgatory[T <: DelayedOperation](purgatoryName: String, br // If the operation is already completed, stop adding it to the rest of the watcher list. if (operation.isCompleted()) return false - val watchers = watchersFor(key) - watchers.watch(operation) + watchForKey(key, operation) if (!watchCreated) { watchCreated = true @@ -241,14 +240,20 @@ class DelayedOperationPurgatory[T <: DelayedOperation](purgatoryName: String, br def delayed() = timeoutTimer.size /* - * Return the watch list of the given key + * Return all the current watcher lists */ - private def watchersFor(key: Any) = inReadLock(removeWatchersLock) { watchersForKey.getAndMaybePut(key) } + private def allWatchers = inReadLock(removeWatchersLock) { watchersForKey.values } /* - * Return all the current watcher lists + * Return the watch list of the given key, note that we need to + * grab the removeWatchersLock to avoid the operation being added to a removed watcher list */ - private def allWatchers = inReadLock(removeWatchersLock) { watchersForKey.values } + private def watchForKey(key: Any, operation: T) { + inReadLock(removeWatchersLock) { + val watcher = watchersForKey.getAndMaybePut(key) + watcher.watch(operation) + } + } /* * Remove the key from watcher lists if its list is empty -- 1.7.10.2 (Apple Git-33) From 4f231f71ce890b2822f625e9d9e57df6bfa7679f Mon Sep 17 00:00:00 2001 From: Guozhang Wang Date: Sun, 7 Jun 2015 21:46:09 -0700 Subject: [PATCH 3/3] minor --- core/src/main/scala/kafka/server/DelayedOperation.scala | 1 - 1 file changed, 1 deletion(-) diff --git a/core/src/main/scala/kafka/server/DelayedOperation.scala b/core/src/main/scala/kafka/server/DelayedOperation.scala index d1eb76e..91ad129 100644 --- a/core/src/main/scala/kafka/server/DelayedOperation.scala +++ b/core/src/main/scala/kafka/server/DelayedOperation.scala @@ -264,7 +264,6 @@ class DelayedOperationPurgatory[T <: DelayedOperation](purgatoryName: String, br if (watchersForKey.get(key) != watchers) return - //val watchers = watchersForKey.get(key) if (watchers != null && watchers.watched == 0) { watchersForKey.remove(key) } -- 1.7.10.2 (Apple Git-33)