From 9a4e342ea45694119b7ec30c769fd16fe976cfd1 Mon Sep 17 00:00:00 2001 From: Guozhang Wang Date: Thu, 30 Apr 2015 13:47:41 -0700 Subject: [PATCH] dummy --- .../main/scala/kafka/server/DelayedOperation.scala | 26 +++++++++++++--------- core/src/main/scala/kafka/utils/timer/Timer.scala | 2 +- .../test/scala/unit/kafka/log/LogConfigTest.scala | 2 +- .../unit/kafka/server/DelayedOperationTest.scala | 14 ++++++------ 4 files changed, 25 insertions(+), 19 deletions(-) diff --git a/core/src/main/scala/kafka/server/DelayedOperation.scala b/core/src/main/scala/kafka/server/DelayedOperation.scala index 2ed9b46..7ff4737 100644 --- a/core/src/main/scala/kafka/server/DelayedOperation.scala +++ b/core/src/main/scala/kafka/server/DelayedOperation.scala @@ -135,7 +135,7 @@ class DelayedOperationPurgatory[T <: DelayedOperation](purgatoryName: String, br newGauge( "PurgatorySize", new Gauge[Int] { - def value = watched() + def value = watched }, metricsTags ) @@ -143,7 +143,7 @@ class DelayedOperationPurgatory[T <: DelayedOperation](purgatoryName: String, br newGauge( "NumDelayedOperations", new Gauge[Int] { - def value = delayed() + def value = delayed }, metricsTags ) @@ -218,10 +218,14 @@ class DelayedOperationPurgatory[T <: DelayedOperation](purgatoryName: String, br */ def checkAndComplete(key: Any): Int = { val watchers = watchersForKey.get(key) - if(watchers == null) + if(watchers == null) { 0 - else - watchers.tryCompleteWatched() + } else { + val completed = watchers.tryCompleteWatched() + if (watchers.watched == 0) + watchersForKey.remove(key) + completed + } } /** @@ -229,12 +233,12 @@ class DelayedOperationPurgatory[T <: DelayedOperation](purgatoryName: String, br * on multiple lists, and some of its watched entries may still be in the watch lists * even when it has been completed, this number may be larger than the number of real operations watched */ - def watched() = watchersForKey.values.map(_.watched).sum + def watched = watchersForKey.values.map(_.watched).sum /** * Return the number of delayed operations in the expiry queue */ - def delayed() = timeoutTimer.size + def delayed = timeoutTimer.size /* * Return the watch list of the given key @@ -256,7 +260,7 @@ class DelayedOperationPurgatory[T <: DelayedOperation](purgatoryName: String, br private[this] val operations = new LinkedList[T]() - def watched(): Int = operations synchronized operations.size + def watched: Int = operations synchronized operations.size // add the element to watch def watch(t: T) { @@ -271,7 +275,7 @@ class DelayedOperationPurgatory[T <: DelayedOperation](purgatoryName: String, br val iter = operations.iterator() while (iter.hasNext) { val curr = iter.next() - if (curr.isCompleted) { + if (curr.isCompleted()) { // another thread has completed this operation, just remove it iter.remove() } else if (curr synchronized curr.tryComplete()) { @@ -290,7 +294,7 @@ class DelayedOperationPurgatory[T <: DelayedOperation](purgatoryName: String, br val iter = operations.iterator() while (iter.hasNext) { val curr = iter.next() - if (curr.isCompleted) { + if (curr.isCompleted()) { iter.remove() purged += 1 } @@ -320,6 +324,8 @@ class DelayedOperationPurgatory[T <: DelayedOperation](purgatoryName: String, br estimatedTotalOperations.getAndSet(delayed) debug("Begin purging watch lists") val purged = watchersForKey.values.map(_.purgeCompleted()).sum + val emptied = watchersForKey.filter(_._2.watched == 0).map(_._1) + emptied.foreach(watchersForKey.remove(_)) debug("Purged %d elements from watch lists.".format(purged)) } } diff --git a/core/src/main/scala/kafka/utils/timer/Timer.scala b/core/src/main/scala/kafka/utils/timer/Timer.scala index b8cde82..e428d9d 100644 --- a/core/src/main/scala/kafka/utils/timer/Timer.scala +++ b/core/src/main/scala/kafka/utils/timer/Timer.scala @@ -81,6 +81,6 @@ class Timer(taskExecutor: ExecutorService, tickMs: Long = 1, wheelSize: Int = 20 } } - def size(): Int = taskCounter.get + def size: Int = taskCounter.get } diff --git a/core/src/test/scala/unit/kafka/log/LogConfigTest.scala b/core/src/test/scala/unit/kafka/log/LogConfigTest.scala index f3546ad..0dfc28b 100644 --- a/core/src/test/scala/unit/kafka/log/LogConfigTest.scala +++ b/core/src/test/scala/unit/kafka/log/LogConfigTest.scala @@ -74,7 +74,7 @@ class LogConfigTest extends JUnit3Suite { case LogConfig.UncleanLeaderElectionEnableProp => return case LogConfig.RetentionBytesProp => assertPropertyInvalid(name, "not_a_number") case LogConfig.RetentionMsProp => assertPropertyInvalid(name, "not_a_number" ) - case LogConfig.CleanupPolicyProp => assertPropertyInvalid(name, "true", "foobar"); + case LogConfig.CleanupPolicyProp => assertPropertyInvalid(name, "true", "foobar") case LogConfig.MinCleanableDirtyRatioProp => assertPropertyInvalid(name, "not_a_number", "-0.1", "1.2") case LogConfig.MinInSyncReplicasProp => assertPropertyInvalid(name, "not_a_number", "0", "-1") case positiveIntProperty => assertPropertyInvalid(name, "not_a_number", "-1") diff --git a/core/src/test/scala/unit/kafka/server/DelayedOperationTest.scala b/core/src/test/scala/unit/kafka/server/DelayedOperationTest.scala index f3ab3f4..02640f7 100644 --- a/core/src/test/scala/unit/kafka/server/DelayedOperationTest.scala +++ b/core/src/test/scala/unit/kafka/server/DelayedOperationTest.scala @@ -76,27 +76,27 @@ class DelayedOperationTest extends JUnit3Suite { purgatory.tryCompleteElseWatch(r2, Array("test1", "test2")) purgatory.tryCompleteElseWatch(r3, Array("test1", "test2", "test3")) - assertEquals("Purgatory should have 3 total delayed operations", 3, purgatory.delayed()) - assertEquals("Purgatory should have 6 watched elements", 6, purgatory.watched()) + assertEquals("Purgatory should have 3 total delayed operations", 3, purgatory.delayed) + assertEquals("Purgatory should have 6 watched elements", 6, purgatory.watched) // complete the operations, it should immediately be purged from the delayed operation r2.completable = true r2.tryComplete() - assertEquals("Purgatory should have 2 total delayed operations instead of " + purgatory.delayed(), 2, purgatory.delayed()) + assertEquals("Purgatory should have 2 total delayed operations instead of " + purgatory.delayed, 2, purgatory.delayed) r3.completable = true r3.tryComplete() - assertEquals("Purgatory should have 1 total delayed operations instead of " + purgatory.delayed(), 1, purgatory.delayed()) + assertEquals("Purgatory should have 1 total delayed operations instead of " + purgatory.delayed, 1, purgatory.delayed) // checking a watch should purge the watch list purgatory.checkAndComplete("test1") - assertEquals("Purgatory should have 4 watched elements instead of " + purgatory.watched(), 4, purgatory.watched()) + assertEquals("Purgatory should have 4 watched elements instead of " + purgatory.watched, 4, purgatory.watched) purgatory.checkAndComplete("test2") - assertEquals("Purgatory should have 2 watched elements instead of " + purgatory.watched(), 2, purgatory.watched()) + assertEquals("Purgatory should have 2 watched elements instead of " + purgatory.watched, 2, purgatory.watched) purgatory.checkAndComplete("test3") - assertEquals("Purgatory should have 1 watched elements instead of " + purgatory.watched(), 1, purgatory.watched()) + assertEquals("Purgatory should have 1 watched elements instead of " + purgatory.watched, 1, purgatory.watched) } class MockDelayedOperation(delayMs: Long) extends DelayedOperation(delayMs) { -- 1.7.12.4