From b90cd0d47bc1bc0aa8038f0abbcebedda8630e19 Mon Sep 17 00:00:00 2001 From: Guozhang Wang Date: Thu, 30 Apr 2015 13:47:41 -0700 Subject: [PATCH 1/4] dummy --- .../main/scala/kafka/server/DelayedOperation.scala | 26 +++++++++++++--------- core/src/main/scala/kafka/utils/timer/Timer.scala | 2 +- .../test/scala/unit/kafka/log/LogConfigTest.scala | 2 +- .../unit/kafka/server/DelayedOperationTest.scala | 14 ++++++------ 4 files changed, 25 insertions(+), 19 deletions(-) diff --git a/core/src/main/scala/kafka/server/DelayedOperation.scala b/core/src/main/scala/kafka/server/DelayedOperation.scala index 2ed9b46..7ff4737 100644 --- a/core/src/main/scala/kafka/server/DelayedOperation.scala +++ b/core/src/main/scala/kafka/server/DelayedOperation.scala @@ -135,7 +135,7 @@ class DelayedOperationPurgatory[T <: DelayedOperation](purgatoryName: String, br newGauge( "PurgatorySize", new Gauge[Int] { - def value = watched() + def value = watched }, metricsTags ) @@ -143,7 +143,7 @@ class DelayedOperationPurgatory[T <: DelayedOperation](purgatoryName: String, br newGauge( "NumDelayedOperations", new Gauge[Int] { - def value = delayed() + def value = delayed }, metricsTags ) @@ -218,10 +218,14 @@ class DelayedOperationPurgatory[T <: DelayedOperation](purgatoryName: String, br */ def checkAndComplete(key: Any): Int = { val watchers = watchersForKey.get(key) - if(watchers == null) + if(watchers == null) { 0 - else - watchers.tryCompleteWatched() + } else { + val completed = watchers.tryCompleteWatched() + if (watchers.watched == 0) + watchersForKey.remove(key) + completed + } } /** @@ -229,12 +233,12 @@ class DelayedOperationPurgatory[T <: DelayedOperation](purgatoryName: String, br * on multiple lists, and some of its watched entries may still be in the watch lists * even when it has been completed, this number may be larger than the number of real operations watched */ - def watched() = watchersForKey.values.map(_.watched).sum + def watched = watchersForKey.values.map(_.watched).sum /** * Return the number of delayed operations in the expiry queue */ - def delayed() = timeoutTimer.size + def delayed = timeoutTimer.size /* * Return the watch list of the given key @@ -256,7 +260,7 @@ class DelayedOperationPurgatory[T <: DelayedOperation](purgatoryName: String, br private[this] val operations = new LinkedList[T]() - def watched(): Int = operations synchronized operations.size + def watched: Int = operations synchronized operations.size // add the element to watch def watch(t: T) { @@ -271,7 +275,7 @@ class DelayedOperationPurgatory[T <: DelayedOperation](purgatoryName: String, br val iter = operations.iterator() while (iter.hasNext) { val curr = iter.next() - if (curr.isCompleted) { + if (curr.isCompleted()) { // another thread has completed this operation, just remove it iter.remove() } else if (curr synchronized curr.tryComplete()) { @@ -290,7 +294,7 @@ class DelayedOperationPurgatory[T <: DelayedOperation](purgatoryName: String, br val iter = operations.iterator() while (iter.hasNext) { val curr = iter.next() - if (curr.isCompleted) { + if (curr.isCompleted()) { iter.remove() purged += 1 } @@ -320,6 +324,8 @@ class DelayedOperationPurgatory[T <: DelayedOperation](purgatoryName: String, br estimatedTotalOperations.getAndSet(delayed) debug("Begin purging watch lists") val purged = watchersForKey.values.map(_.purgeCompleted()).sum + val emptied = watchersForKey.filter(_._2.watched == 0).map(_._1) + emptied.foreach(watchersForKey.remove(_)) debug("Purged %d elements from watch lists.".format(purged)) } } diff --git a/core/src/main/scala/kafka/utils/timer/Timer.scala b/core/src/main/scala/kafka/utils/timer/Timer.scala index b8cde82..e428d9d 100644 --- a/core/src/main/scala/kafka/utils/timer/Timer.scala +++ b/core/src/main/scala/kafka/utils/timer/Timer.scala @@ -81,6 +81,6 @@ class Timer(taskExecutor: ExecutorService, tickMs: Long = 1, wheelSize: Int = 20 } } - def size(): Int = taskCounter.get + def size: Int = taskCounter.get } diff --git a/core/src/test/scala/unit/kafka/log/LogConfigTest.scala b/core/src/test/scala/unit/kafka/log/LogConfigTest.scala index f3546ad..0dfc28b 100644 --- a/core/src/test/scala/unit/kafka/log/LogConfigTest.scala +++ b/core/src/test/scala/unit/kafka/log/LogConfigTest.scala @@ -74,7 +74,7 @@ class LogConfigTest extends JUnit3Suite { case LogConfig.UncleanLeaderElectionEnableProp => return case LogConfig.RetentionBytesProp => assertPropertyInvalid(name, "not_a_number") case LogConfig.RetentionMsProp => assertPropertyInvalid(name, "not_a_number" ) - case LogConfig.CleanupPolicyProp => assertPropertyInvalid(name, "true", "foobar"); + case LogConfig.CleanupPolicyProp => assertPropertyInvalid(name, "true", "foobar") case LogConfig.MinCleanableDirtyRatioProp => assertPropertyInvalid(name, "not_a_number", "-0.1", "1.2") case LogConfig.MinInSyncReplicasProp => assertPropertyInvalid(name, "not_a_number", "0", "-1") case positiveIntProperty => assertPropertyInvalid(name, "not_a_number", "-1") diff --git a/core/src/test/scala/unit/kafka/server/DelayedOperationTest.scala b/core/src/test/scala/unit/kafka/server/DelayedOperationTest.scala index f3ab3f4..02640f7 100644 --- a/core/src/test/scala/unit/kafka/server/DelayedOperationTest.scala +++ b/core/src/test/scala/unit/kafka/server/DelayedOperationTest.scala @@ -76,27 +76,27 @@ class DelayedOperationTest extends JUnit3Suite { purgatory.tryCompleteElseWatch(r2, Array("test1", "test2")) purgatory.tryCompleteElseWatch(r3, Array("test1", "test2", "test3")) - assertEquals("Purgatory should have 3 total delayed operations", 3, purgatory.delayed()) - assertEquals("Purgatory should have 6 watched elements", 6, purgatory.watched()) + assertEquals("Purgatory should have 3 total delayed operations", 3, purgatory.delayed) + assertEquals("Purgatory should have 6 watched elements", 6, purgatory.watched) // complete the operations, it should immediately be purged from the delayed operation r2.completable = true r2.tryComplete() - assertEquals("Purgatory should have 2 total delayed operations instead of " + purgatory.delayed(), 2, purgatory.delayed()) + assertEquals("Purgatory should have 2 total delayed operations instead of " + purgatory.delayed, 2, purgatory.delayed) r3.completable = true r3.tryComplete() - assertEquals("Purgatory should have 1 total delayed operations instead of " + purgatory.delayed(), 1, purgatory.delayed()) + assertEquals("Purgatory should have 1 total delayed operations instead of " + purgatory.delayed, 1, purgatory.delayed) // checking a watch should purge the watch list purgatory.checkAndComplete("test1") - assertEquals("Purgatory should have 4 watched elements instead of " + purgatory.watched(), 4, purgatory.watched()) + assertEquals("Purgatory should have 4 watched elements instead of " + purgatory.watched, 4, purgatory.watched) purgatory.checkAndComplete("test2") - assertEquals("Purgatory should have 2 watched elements instead of " + purgatory.watched(), 2, purgatory.watched()) + assertEquals("Purgatory should have 2 watched elements instead of " + purgatory.watched, 2, purgatory.watched) purgatory.checkAndComplete("test3") - assertEquals("Purgatory should have 1 watched elements instead of " + purgatory.watched(), 1, purgatory.watched()) + assertEquals("Purgatory should have 1 watched elements instead of " + purgatory.watched, 1, purgatory.watched) } class MockDelayedOperation(delayMs: Long) extends DelayedOperation(delayMs) { -- 1.7.12.4 From 603f2c3632078156687d515312178dff130e669a Mon Sep 17 00:00:00 2001 From: Guozhang Wang Date: Thu, 30 Apr 2015 15:19:43 -0700 Subject: [PATCH 2/4] v2 --- .../main/scala/kafka/server/DelayedOperation.scala | 26 ++++++++++++---------- 1 file changed, 14 insertions(+), 12 deletions(-) diff --git a/core/src/main/scala/kafka/server/DelayedOperation.scala b/core/src/main/scala/kafka/server/DelayedOperation.scala index 7ff4737..859d053 100644 --- a/core/src/main/scala/kafka/server/DelayedOperation.scala +++ b/core/src/main/scala/kafka/server/DelayedOperation.scala @@ -221,10 +221,7 @@ class DelayedOperationPurgatory[T <: DelayedOperation](purgatoryName: String, br if(watchers == null) { 0 } else { - val completed = watchers.tryCompleteWatched() - if (watchers.watched == 0) - watchersForKey.remove(key) - completed + watchers.tryCompleteWatched(watchersForKey.remove(key)) } } @@ -268,10 +265,9 @@ class DelayedOperationPurgatory[T <: DelayedOperation](purgatoryName: String, br } // traverse the list and try to complete some watched elements - def tryCompleteWatched(): Int = { - + def tryCompleteWatched(removeIfEmpty: => Unit): Int = { + var completed = 0 operations synchronized { - var completed = 0 val iter = operations.iterator() while (iter.hasNext) { val curr = iter.next() @@ -283,12 +279,15 @@ class DelayedOperationPurgatory[T <: DelayedOperation](purgatoryName: String, br iter.remove() } } - completed + + if (operations.isEmpty) + removeIfEmpty } + completed } // traverse the list and purge elements that are already completed by others - def purgeCompleted(): Int = { + def purgeCompleted(removeIfEmpty: => Unit): Int = { var purged = 0 operations synchronized { val iter = operations.iterator() @@ -299,6 +298,9 @@ class DelayedOperationPurgatory[T <: DelayedOperation](purgatoryName: String, br purged += 1 } } + + if (operations.isEmpty) + removeIfEmpty } purged } @@ -323,9 +325,9 @@ class DelayedOperationPurgatory[T <: DelayedOperation](purgatoryName: String, br // a little overestimated total number of operations. estimatedTotalOperations.getAndSet(delayed) debug("Begin purging watch lists") - val purged = watchersForKey.values.map(_.purgeCompleted()).sum - val emptied = watchersForKey.filter(_._2.watched == 0).map(_._1) - emptied.foreach(watchersForKey.remove(_)) + val purged = watchersForKey.map { case (key, watchers) => + watchers.purgeCompleted(watchersForKey.remove(key)) + }.sum debug("Purged %d elements from watch lists.".format(purged)) } } -- 1.7.12.4 From f6942f5037df372a5aaf3c0f6202f6db8218b8b7 Mon Sep 17 00:00:00 2001 From: Guozhang Wang Date: Fri, 1 May 2015 16:02:28 -0700 Subject: [PATCH 3/4] dummy --- .../consumer/ZookeeperConsumerConnector.scala | 2 +- .../main/scala/kafka/server/DelayedOperation.scala | 37 ++++++++------- .../main/scala/kafka/server/DelayedProduce.scala | 2 +- core/src/main/scala/kafka/utils/Pool.scala | 53 ++++++++++++++++++++-- 4 files changed, 70 insertions(+), 24 deletions(-) diff --git a/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala b/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala index aa8d940..949d121 100755 --- a/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala +++ b/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala @@ -686,7 +686,7 @@ private[kafka] class ZookeeperConsumerConnector(val config: ConsumerConfig, val globalPartitionAssignment = partitionAssignor.assign(assignmentContext) val partitionAssignment = globalPartitionAssignment.get(assignmentContext.consumerId) val currentTopicRegistry = new Pool[String, Pool[Int, PartitionTopicInfo]]( - valueFactory = Some((topic: String) => new Pool[Int, PartitionTopicInfo])) + createFactory = Some((topic: String) => new Pool[Int, PartitionTopicInfo])) // fetch current offsets for all topic-partitions val topicPartitions = partitionAssignment.keySet.toSeq diff --git a/core/src/main/scala/kafka/server/DelayedOperation.scala b/core/src/main/scala/kafka/server/DelayedOperation.scala index 859d053..05ad782 100644 --- a/core/src/main/scala/kafka/server/DelayedOperation.scala +++ b/core/src/main/scala/kafka/server/DelayedOperation.scala @@ -122,7 +122,7 @@ class DelayedOperationPurgatory[T <: DelayedOperation](purgatoryName: String, br private[this] val timeoutTimer = new Timer(executor) /* a list of operation watching keys */ - private val watchersForKey = new Pool[Any, Watchers](Some((key: Any) => new Watchers)) + private val watchersForKey = new Pool[Any, Watchers](Some((key: Any) => new Watchers(key))) // the number of estimated total operations in the purgatory private[this] val estimatedTotalOperations = new AtomicInteger(0) @@ -185,8 +185,7 @@ class DelayedOperationPurgatory[T <: DelayedOperation](purgatoryName: String, br // If the operation is already completed, stop adding it to the rest of the watcher list. if (operation.isCompleted()) return false - val watchers = watchersFor(key) - watchers.watch(operation) + watchForKey(key) if (!watchCreated) { watchCreated = true @@ -217,14 +216,23 @@ class DelayedOperationPurgatory[T <: DelayedOperation](purgatoryName: String, br * @return the number of completed operations during this process */ def checkAndComplete(key: Any): Int = { + watchersForKey.updateAndMaybeRemove[Int] (key, watchers => watchers.tryCompleteWatched()) + val watchers = watchersForKey.get(key) if(watchers == null) { 0 } else { - watchers.tryCompleteWatched(watchersForKey.remove(key)) + watchers.tryCompleteWatched() } } + /* + * Add the operation into the watchers by key + */ + private def watchForKey(key: Any, operation: T) = { + watchersForKey.updateAndMaybePut(key, watchers => watchers.watch(T)) + } + /** * Return the total size of watch lists the purgatory. Since an operation may be watched * on multiple lists, and some of its watched entries may still be in the watch lists @@ -237,11 +245,6 @@ class DelayedOperationPurgatory[T <: DelayedOperation](purgatoryName: String, br */ def delayed = timeoutTimer.size - /* - * Return the watch list of the given key - */ - private def watchersFor(key: Any) = watchersForKey.getAndMaybePut(key) - /** * Shutdown the expire reaper thread */ @@ -253,7 +256,7 @@ class DelayedOperationPurgatory[T <: DelayedOperation](purgatoryName: String, br /** * A linked list of watched delayed operations based on some key */ - private class Watchers { + private class Watchers(val watchKey: Any) { private[this] val operations = new LinkedList[T]() @@ -265,7 +268,7 @@ class DelayedOperationPurgatory[T <: DelayedOperation](purgatoryName: String, br } // traverse the list and try to complete some watched elements - def tryCompleteWatched(removeIfEmpty: => Unit): Int = { + def tryCompleteWatched(): Int = { var completed = 0 operations synchronized { val iter = operations.iterator() @@ -280,14 +283,15 @@ class DelayedOperationPurgatory[T <: DelayedOperation](purgatoryName: String, br } } + // if the list is empty, remove the key from watchers if (operations.isEmpty) - removeIfEmpty + watchersForKey.remove(watchKey) } completed } // traverse the list and purge elements that are already completed by others - def purgeCompleted(removeIfEmpty: => Unit): Int = { + def purgeCompleted(): Int = { var purged = 0 operations synchronized { val iter = operations.iterator() @@ -299,8 +303,9 @@ class DelayedOperationPurgatory[T <: DelayedOperation](purgatoryName: String, br } } + // if the list is empty, remove the key from watchers if (operations.isEmpty) - removeIfEmpty + watchersForKey.remove(watchKey) } purged } @@ -325,9 +330,7 @@ class DelayedOperationPurgatory[T <: DelayedOperation](purgatoryName: String, br // a little overestimated total number of operations. estimatedTotalOperations.getAndSet(delayed) debug("Begin purging watch lists") - val purged = watchersForKey.map { case (key, watchers) => - watchers.purgeCompleted(watchersForKey.remove(key)) - }.sum + val purged = watchersForKey.values.map(_.purgeCompleted()).sum debug("Purged %d elements from watch lists.".format(purged)) } } diff --git a/core/src/main/scala/kafka/server/DelayedProduce.scala b/core/src/main/scala/kafka/server/DelayedProduce.scala index 05078b2..4d15a46 100644 --- a/core/src/main/scala/kafka/server/DelayedProduce.scala +++ b/core/src/main/scala/kafka/server/DelayedProduce.scala @@ -139,7 +139,7 @@ object DelayedProduceMetrics extends KafkaMetricsGroup { "requests", TimeUnit.SECONDS, tags = Map("topic" -> key.topic, "partition" -> key.partition.toString)) - private val partitionExpirationMeters = new Pool[TopicAndPartition, Meter](valueFactory = Some(partitionExpirationMeterFactory)) + private val partitionExpirationMeters = new Pool[TopicAndPartition, Meter](createFactory = Some(partitionExpirationMeterFactory)) def recordExpiration(partition: TopicAndPartition) { aggregateExpirationMeter.mark() diff --git a/core/src/main/scala/kafka/utils/Pool.scala b/core/src/main/scala/kafka/utils/Pool.scala index 9ddcde7..963e50b 100644 --- a/core/src/main/scala/kafka/utils/Pool.scala +++ b/core/src/main/scala/kafka/utils/Pool.scala @@ -25,7 +25,7 @@ import kafka.common.KafkaException import java.lang.Object -class Pool[K,V](valueFactory: Option[(K) => V] = None) extends Iterable[(K, V)] { +class Pool[K,V](createFactory: Option[(K) => V] = None) extends Iterable[(K, V)] { private val pool = new ConcurrentHashMap[K, V] private val createLock = new Object @@ -51,19 +51,62 @@ class Pool[K,V](valueFactory: Option[(K) => V] = None) extends Iterable[(K, V)] * put a value. */ def getAndMaybePut(key: K) = { - if (valueFactory.isEmpty) + if (createFactory.isEmpty) throw new KafkaException("Empty value factory in pool.") val curr = pool.get(key) if (curr == null) { createLock synchronized { val curr = pool.get(key) if (curr == null) - pool.put(key, valueFactory.get(key)) + pool.put(key, createFactory.get(key)) pool.get(key) } - } - else + } else { curr + } + } + + /** + * Updates the value associate with the given key. If there is no associated + * value, then create the value using the pool's value factory. + * The user should declare the factory method + * as lazy if its side-effects need to be avoided + * @param key The key to lookup. + */ + def updateAndMaybePut(key: K, updateFunc: (V) => Unit) { + if (createFactory.isEmpty) + throw new KafkaException("Empty value factory in pool.") + createLock synchronized { + val curr = pool.get(key) + if (curr == null) { + pool.put(key, createFactory.get(key)) + } else { + updateFunc(curr) + } + } + } + + /** + * Updates the value associate with the given key. If there is no associated + * value, then create the value using the pool's value factory. + * The user should declare the factory method + * as lazy if its side-effects need to be avoided + * @param key The key to lookup. + */ + def updateAndMaybeRemove[T](key: K, updateFunc: (V) => T, removeCond: (V) => Boolean, defaultIfEmpty: T): T = { + if (createFactory.isEmpty) + throw new KafkaException("Empty value factory in pool.") + createLock synchronized { + val curr = pool.get(key) + if (curr != null) { + val result = updateFunc(curr) + if (removeCond) + remove(key) + result + } else { + defaultIfEmpty + } + } } def contains(id: K) = pool.containsKey(id) -- 1.7.12.4 From 8d055f4c04b59d971b5065ebf6b2522cb335083c Mon Sep 17 00:00:00 2001 From: Guozhang Wang Date: Wed, 6 May 2015 16:30:57 -0700 Subject: [PATCH 4/4] v3 --- .../main/scala/kafka/server/DelayedOperation.scala | 26 +++-- core/src/main/scala/kafka/utils/Pool.scala | 127 +++++++++++++-------- 2 files changed, 94 insertions(+), 59 deletions(-) diff --git a/core/src/main/scala/kafka/server/DelayedOperation.scala b/core/src/main/scala/kafka/server/DelayedOperation.scala index 05ad782..b9bb571 100644 --- a/core/src/main/scala/kafka/server/DelayedOperation.scala +++ b/core/src/main/scala/kafka/server/DelayedOperation.scala @@ -185,7 +185,7 @@ class DelayedOperationPurgatory[T <: DelayedOperation](purgatoryName: String, br // If the operation is already completed, stop adding it to the rest of the watcher list. if (operation.isCompleted()) return false - watchForKey(key) + watchForKey(key, operation) if (!watchCreated) { watchCreated = true @@ -216,21 +216,19 @@ class DelayedOperationPurgatory[T <: DelayedOperation](purgatoryName: String, br * @return the number of completed operations during this process */ def checkAndComplete(key: Any): Int = { - watchersForKey.updateAndMaybeRemove[Int] (key, watchers => watchers.tryCompleteWatched()) - - val watchers = watchersForKey.get(key) - if(watchers == null) { - 0 - } else { - watchers.tryCompleteWatched() - } + var completed = 0 + watchersForKey.updateAndMaybeRemove(key, + watchers => completed = watchers.tryCompleteWatched(), + watchers => watchers.watched == 0 + ) + completed } /* * Add the operation into the watchers by key */ private def watchForKey(key: Any, operation: T) = { - watchersForKey.updateAndMaybePut(key, watchers => watchers.watch(T)) + watchersForKey.updateAndMaybeRemove(key, watchers => watchers.watch(operation), watchers => false) } /** @@ -330,7 +328,13 @@ class DelayedOperationPurgatory[T <: DelayedOperation](purgatoryName: String, br // a little overestimated total number of operations. estimatedTotalOperations.getAndSet(delayed) debug("Begin purging watch lists") - val purged = watchersForKey.values.map(_.purgeCompleted()).sum + var purged = 0 + watchersForKey.keys.foreach { case key => + watchersForKey.updateAndMaybeRemove(key, + watchers => purged += watchers.purgeCompleted(), + watchers => watchers.watched == 0 + ) + } debug("Purged %d elements from watch lists.".format(purged)) } } diff --git a/core/src/main/scala/kafka/utils/Pool.scala b/core/src/main/scala/kafka/utils/Pool.scala index 963e50b..ab51e9e 100644 --- a/core/src/main/scala/kafka/utils/Pool.scala +++ b/core/src/main/scala/kafka/utils/Pool.scala @@ -25,19 +25,35 @@ import kafka.common.KafkaException import java.lang.Object -class Pool[K,V](createFactory: Option[(K) => V] = None) extends Iterable[(K, V)] { +class Pool[K, V](createFactory: Option[(K) => V] = None) extends Iterable[(K, V)] { - private val pool = new ConcurrentHashMap[K, V] - private val createLock = new Object + private case class LockableValue (value: V) { + val valueLock = new Object + } + + private val pool = new ConcurrentHashMap[K, LockableValue]() + private val globalLock = new Object def this(m: collection.Map[K, V]) { this() - m.foreach(kv => pool.put(kv._1, kv._2)) + m.foreach(kv => pool.put(kv._1, LockableValue(kv._2))) } - def put(k: K, v: V) = pool.put(k, v) + def put(k: K, v: V): V = { + val oldValue = pool.put(k, LockableValue(v)) + if (oldValue == null) + null.asInstanceOf[V] + else + oldValue.value + } - def putIfNotExists(k: K, v: V) = pool.putIfAbsent(k, v) + def putIfNotExists(k: K, v: V): V = { + val oldValue = pool.putIfAbsent(k, LockableValue(v)) + if (oldValue == null) + null.asInstanceOf[V] + else + oldValue.value + } /** * Gets the value associated with the given key. If there is no associated @@ -50,70 +66,85 @@ class Pool[K,V](createFactory: Option[(K) => V] = None) extends Iterable[(K, V)] * the value created by the factory if another thread successfully * put a value. */ - def getAndMaybePut(key: K) = { + def getAndMaybePut(key: K): V = { if (createFactory.isEmpty) throw new KafkaException("Empty value factory in pool.") - val curr = pool.get(key) + var curr: LockableValue = pool.get(key) if (curr == null) { - createLock synchronized { - val curr = pool.get(key) - if (curr == null) - pool.put(key, createFactory.get(key)) - pool.get(key) + globalLock synchronized { + curr = pool.get(key) + if (curr == null) { + curr = LockableValue(createFactory.get(key)) + pool.put(key, curr) + curr.value + } else { + curr.value + } } } else { - curr + curr.value } } /** * Updates the value associate with the given key. If there is no associated - * value, then create the value using the pool's value factory. + * value, then create the value using the pool's value factory. After the value + * is updated, if the remove condition is satisfied then remove this key-value + * pair from the map. + * + * NOTE that this function should only be called when value type is not primitive. + * * The user should declare the factory method * as lazy if its side-effects need to be avoided * @param key The key to lookup. */ - def updateAndMaybePut(key: K, updateFunc: (V) => Unit) { + def updateAndMaybeRemove(key: K, updateFunc: (V) => Unit, removeCond: (V) => Boolean) { if (createFactory.isEmpty) throw new KafkaException("Empty value factory in pool.") - createLock synchronized { - val curr = pool.get(key) + + // repeatedly check if the key exists in the map after + // acquiring the lock of the key, if not then create the + // key-value pair first to avoid race condition + var updated = false + while (!updated) { + var curr: LockableValue = pool.get(key) if (curr == null) { - pool.put(key, createFactory.get(key)) - } else { - updateFunc(curr) + globalLock synchronized { + curr = pool.get(key) + if (curr == null) { + curr = LockableValue(createFactory.get(key)) + pool.put(key, curr) + } + } } - } - } - - /** - * Updates the value associate with the given key. If there is no associated - * value, then create the value using the pool's value factory. - * The user should declare the factory method - * as lazy if its side-effects need to be avoided - * @param key The key to lookup. - */ - def updateAndMaybeRemove[T](key: K, updateFunc: (V) => T, removeCond: (V) => Boolean, defaultIfEmpty: T): T = { - if (createFactory.isEmpty) - throw new KafkaException("Empty value factory in pool.") - createLock synchronized { - val curr = pool.get(key) - if (curr != null) { - val result = updateFunc(curr) - if (removeCond) - remove(key) - result - } else { - defaultIfEmpty + curr.valueLock synchronized { + if (curr == pool.get(key)) { + updateFunc(curr.value) + if (removeCond(curr.value)) + pool.remove(key) + updated = true + } } } } def contains(id: K) = pool.containsKey(id) - def get(key: K): V = pool.get(key) + def get(key: K): V = { + val lockableValue = pool.get(key) + if (lockableValue == null) + null.asInstanceOf[V] + else + lockableValue.value + } - def remove(key: K): V = pool.remove(key) + def remove(key: K): V = { + val lockableValue = pool.remove(key) + if (lockableValue == null) + null.asInstanceOf[V] + else + lockableValue.value + } def keys: mutable.Set[K] = { import JavaConversions._ @@ -122,7 +153,7 @@ class Pool[K,V](createFactory: Option[(K) => V] = None) extends Iterable[(K, V)] def values: Iterable[V] = { import JavaConversions._ - new ArrayList[V](pool.values()) + new ArrayList[V](pool.values().map(_.value)) } def clear() { pool.clear() } @@ -135,9 +166,9 @@ class Pool[K,V](createFactory: Option[(K) => V] = None) extends Iterable[(K, V)] def hasNext: Boolean = iter.hasNext - def next: (K, V) = { + def next(): (K, V) = { val n = iter.next - (n.getKey, n.getValue) + (n.getKey, n.getValue.value) } } -- 1.7.12.4