From 9a4e342ea45694119b7ec30c769fd16fe976cfd1 Mon Sep 17 00:00:00 2001 From: Guozhang Wang Date: Thu, 30 Apr 2015 13:47:41 -0700 Subject: [PATCH 1/2] dummy --- .../main/scala/kafka/server/DelayedOperation.scala | 26 +++++++++++++--------- core/src/main/scala/kafka/utils/timer/Timer.scala | 2 +- .../test/scala/unit/kafka/log/LogConfigTest.scala | 2 +- .../unit/kafka/server/DelayedOperationTest.scala | 14 ++++++------ 4 files changed, 25 insertions(+), 19 deletions(-) diff --git a/core/src/main/scala/kafka/server/DelayedOperation.scala b/core/src/main/scala/kafka/server/DelayedOperation.scala index 2ed9b46..7ff4737 100644 --- a/core/src/main/scala/kafka/server/DelayedOperation.scala +++ b/core/src/main/scala/kafka/server/DelayedOperation.scala @@ -135,7 +135,7 @@ class DelayedOperationPurgatory[T <: DelayedOperation](purgatoryName: String, br newGauge( "PurgatorySize", new Gauge[Int] { - def value = watched() + def value = watched }, metricsTags ) @@ -143,7 +143,7 @@ class DelayedOperationPurgatory[T <: DelayedOperation](purgatoryName: String, br newGauge( "NumDelayedOperations", new Gauge[Int] { - def value = delayed() + def value = delayed }, metricsTags ) @@ -218,10 +218,14 @@ class DelayedOperationPurgatory[T <: DelayedOperation](purgatoryName: String, br */ def checkAndComplete(key: Any): Int = { val watchers = watchersForKey.get(key) - if(watchers == null) + if(watchers == null) { 0 - else - watchers.tryCompleteWatched() + } else { + val completed = watchers.tryCompleteWatched() + if (watchers.watched == 0) + watchersForKey.remove(key) + completed + } } /** @@ -229,12 +233,12 @@ class DelayedOperationPurgatory[T <: DelayedOperation](purgatoryName: String, br * on multiple lists, and some of its watched entries may still be in the watch lists * even when it has been completed, this number may be larger than the number of real operations watched */ - def watched() = watchersForKey.values.map(_.watched).sum + def watched = watchersForKey.values.map(_.watched).sum /** * Return the number of delayed operations in the expiry queue */ - def delayed() = timeoutTimer.size + def delayed = timeoutTimer.size /* * Return the watch list of the given key @@ -256,7 +260,7 @@ class DelayedOperationPurgatory[T <: DelayedOperation](purgatoryName: String, br private[this] val operations = new LinkedList[T]() - def watched(): Int = operations synchronized operations.size + def watched: Int = operations synchronized operations.size // add the element to watch def watch(t: T) { @@ -271,7 +275,7 @@ class DelayedOperationPurgatory[T <: DelayedOperation](purgatoryName: String, br val iter = operations.iterator() while (iter.hasNext) { val curr = iter.next() - if (curr.isCompleted) { + if (curr.isCompleted()) { // another thread has completed this operation, just remove it iter.remove() } else if (curr synchronized curr.tryComplete()) { @@ -290,7 +294,7 @@ class DelayedOperationPurgatory[T <: DelayedOperation](purgatoryName: String, br val iter = operations.iterator() while (iter.hasNext) { val curr = iter.next() - if (curr.isCompleted) { + if (curr.isCompleted()) { iter.remove() purged += 1 } @@ -320,6 +324,8 @@ class DelayedOperationPurgatory[T <: DelayedOperation](purgatoryName: String, br estimatedTotalOperations.getAndSet(delayed) debug("Begin purging watch lists") val purged = watchersForKey.values.map(_.purgeCompleted()).sum + val emptied = watchersForKey.filter(_._2.watched == 0).map(_._1) + emptied.foreach(watchersForKey.remove(_)) debug("Purged %d elements from watch lists.".format(purged)) } } diff --git a/core/src/main/scala/kafka/utils/timer/Timer.scala b/core/src/main/scala/kafka/utils/timer/Timer.scala index b8cde82..e428d9d 100644 --- a/core/src/main/scala/kafka/utils/timer/Timer.scala +++ b/core/src/main/scala/kafka/utils/timer/Timer.scala @@ -81,6 +81,6 @@ class Timer(taskExecutor: ExecutorService, tickMs: Long = 1, wheelSize: Int = 20 } } - def size(): Int = taskCounter.get + def size: Int = taskCounter.get } diff --git a/core/src/test/scala/unit/kafka/log/LogConfigTest.scala b/core/src/test/scala/unit/kafka/log/LogConfigTest.scala index f3546ad..0dfc28b 100644 --- a/core/src/test/scala/unit/kafka/log/LogConfigTest.scala +++ b/core/src/test/scala/unit/kafka/log/LogConfigTest.scala @@ -74,7 +74,7 @@ class LogConfigTest extends JUnit3Suite { case LogConfig.UncleanLeaderElectionEnableProp => return case LogConfig.RetentionBytesProp => assertPropertyInvalid(name, "not_a_number") case LogConfig.RetentionMsProp => assertPropertyInvalid(name, "not_a_number" ) - case LogConfig.CleanupPolicyProp => assertPropertyInvalid(name, "true", "foobar"); + case LogConfig.CleanupPolicyProp => assertPropertyInvalid(name, "true", "foobar") case LogConfig.MinCleanableDirtyRatioProp => assertPropertyInvalid(name, "not_a_number", "-0.1", "1.2") case LogConfig.MinInSyncReplicasProp => assertPropertyInvalid(name, "not_a_number", "0", "-1") case positiveIntProperty => assertPropertyInvalid(name, "not_a_number", "-1") diff --git a/core/src/test/scala/unit/kafka/server/DelayedOperationTest.scala b/core/src/test/scala/unit/kafka/server/DelayedOperationTest.scala index f3ab3f4..02640f7 100644 --- a/core/src/test/scala/unit/kafka/server/DelayedOperationTest.scala +++ b/core/src/test/scala/unit/kafka/server/DelayedOperationTest.scala @@ -76,27 +76,27 @@ class DelayedOperationTest extends JUnit3Suite { purgatory.tryCompleteElseWatch(r2, Array("test1", "test2")) purgatory.tryCompleteElseWatch(r3, Array("test1", "test2", "test3")) - assertEquals("Purgatory should have 3 total delayed operations", 3, purgatory.delayed()) - assertEquals("Purgatory should have 6 watched elements", 6, purgatory.watched()) + assertEquals("Purgatory should have 3 total delayed operations", 3, purgatory.delayed) + assertEquals("Purgatory should have 6 watched elements", 6, purgatory.watched) // complete the operations, it should immediately be purged from the delayed operation r2.completable = true r2.tryComplete() - assertEquals("Purgatory should have 2 total delayed operations instead of " + purgatory.delayed(), 2, purgatory.delayed()) + assertEquals("Purgatory should have 2 total delayed operations instead of " + purgatory.delayed, 2, purgatory.delayed) r3.completable = true r3.tryComplete() - assertEquals("Purgatory should have 1 total delayed operations instead of " + purgatory.delayed(), 1, purgatory.delayed()) + assertEquals("Purgatory should have 1 total delayed operations instead of " + purgatory.delayed, 1, purgatory.delayed) // checking a watch should purge the watch list purgatory.checkAndComplete("test1") - assertEquals("Purgatory should have 4 watched elements instead of " + purgatory.watched(), 4, purgatory.watched()) + assertEquals("Purgatory should have 4 watched elements instead of " + purgatory.watched, 4, purgatory.watched) purgatory.checkAndComplete("test2") - assertEquals("Purgatory should have 2 watched elements instead of " + purgatory.watched(), 2, purgatory.watched()) + assertEquals("Purgatory should have 2 watched elements instead of " + purgatory.watched, 2, purgatory.watched) purgatory.checkAndComplete("test3") - assertEquals("Purgatory should have 1 watched elements instead of " + purgatory.watched(), 1, purgatory.watched()) + assertEquals("Purgatory should have 1 watched elements instead of " + purgatory.watched, 1, purgatory.watched) } class MockDelayedOperation(delayMs: Long) extends DelayedOperation(delayMs) { -- 1.7.12.4 From 99c6bdb8c87a3459b9062780542d9033d59a5791 Mon Sep 17 00:00:00 2001 From: Guozhang Wang Date: Thu, 30 Apr 2015 15:19:43 -0700 Subject: [PATCH 2/2] v2 --- .../main/scala/kafka/server/DelayedOperation.scala | 26 ++++++++++++---------- 1 file changed, 14 insertions(+), 12 deletions(-) diff --git a/core/src/main/scala/kafka/server/DelayedOperation.scala b/core/src/main/scala/kafka/server/DelayedOperation.scala index 7ff4737..859d053 100644 --- a/core/src/main/scala/kafka/server/DelayedOperation.scala +++ b/core/src/main/scala/kafka/server/DelayedOperation.scala @@ -221,10 +221,7 @@ class DelayedOperationPurgatory[T <: DelayedOperation](purgatoryName: String, br if(watchers == null) { 0 } else { - val completed = watchers.tryCompleteWatched() - if (watchers.watched == 0) - watchersForKey.remove(key) - completed + watchers.tryCompleteWatched(watchersForKey.remove(key)) } } @@ -268,10 +265,9 @@ class DelayedOperationPurgatory[T <: DelayedOperation](purgatoryName: String, br } // traverse the list and try to complete some watched elements - def tryCompleteWatched(): Int = { - + def tryCompleteWatched(removeIfEmpty: => Unit): Int = { + var completed = 0 operations synchronized { - var completed = 0 val iter = operations.iterator() while (iter.hasNext) { val curr = iter.next() @@ -283,12 +279,15 @@ class DelayedOperationPurgatory[T <: DelayedOperation](purgatoryName: String, br iter.remove() } } - completed + + if (operations.isEmpty) + removeIfEmpty } + completed } // traverse the list and purge elements that are already completed by others - def purgeCompleted(): Int = { + def purgeCompleted(removeIfEmpty: => Unit): Int = { var purged = 0 operations synchronized { val iter = operations.iterator() @@ -299,6 +298,9 @@ class DelayedOperationPurgatory[T <: DelayedOperation](purgatoryName: String, br purged += 1 } } + + if (operations.isEmpty) + removeIfEmpty } purged } @@ -323,9 +325,9 @@ class DelayedOperationPurgatory[T <: DelayedOperation](purgatoryName: String, br // a little overestimated total number of operations. estimatedTotalOperations.getAndSet(delayed) debug("Begin purging watch lists") - val purged = watchersForKey.values.map(_.purgeCompleted()).sum - val emptied = watchersForKey.filter(_._2.watched == 0).map(_._1) - emptied.foreach(watchersForKey.remove(_)) + val purged = watchersForKey.map { case (key, watchers) => + watchers.purgeCompleted(watchersForKey.remove(key)) + }.sum debug("Purged %d elements from watch lists.".format(purged)) } } -- 1.7.12.4