From 191920e212790f6ca7e0eeac727f59461604c470 Mon Sep 17 00:00:00 2001 From: Guozhang Wang Date: Sat, 6 Jun 2015 21:48:47 -0700 Subject: [PATCH] v1 --- .../main/scala/kafka/server/DelayedOperation.scala | 18 ++++++++++++------ 1 file changed, 12 insertions(+), 6 deletions(-) diff --git a/core/src/main/scala/kafka/server/DelayedOperation.scala b/core/src/main/scala/kafka/server/DelayedOperation.scala index 123078d..17dae07 100644 --- a/core/src/main/scala/kafka/server/DelayedOperation.scala +++ b/core/src/main/scala/kafka/server/DelayedOperation.scala @@ -253,10 +253,16 @@ class DelayedOperationPurgatory[T <: DelayedOperation](purgatoryName: String, br /* * Remove the key from watcher lists if its list is empty */ - private def removeKeyIfEmpty(key: Any) = inWriteLock(removeWatchersLock) { - val watchers = watchersForKey.get(key) - if (watchers != null && watchers.watched == 0) { - watchersForKey.remove(key) + private def removeKeyIfEmpty(key: Any, watchers: Watchers) { + inWriteLock(removeWatchersLock) { + // if the current key is no longer correlated to the watchers to remove, skip + if (watchersForKey.get(key) != watchers) + return + + //val watchers = watchersForKey.get(key) + if (watchers != null && watchers.watched == 0) { + watchersForKey.remove(key) + } } } @@ -300,7 +306,7 @@ class DelayedOperationPurgatory[T <: DelayedOperation](purgatoryName: String, br } if (operations.size == 0) - removeKeyIfEmpty(key) + removeKeyIfEmpty(key, this) } completed } @@ -319,7 +325,7 @@ class DelayedOperationPurgatory[T <: DelayedOperation](purgatoryName: String, br } if (operations.size == 0) - removeKeyIfEmpty(key) + removeKeyIfEmpty(key, this) } purged } -- 1.7.10.2 (Apple Git-33)