From a08241106b72d0a0f809734ee52b3028ac91f604 Mon Sep 17 00:00:00 2001 From: Yasuhiro Matsuda Date: Fri, 27 Feb 2015 16:04:46 -0800 Subject: [PATCH 1/8] new purgatory implementation --- .../main/scala/kafka/server/DelayedOperation.scala | 174 +++++++++------------ .../main/scala/kafka/server/ReplicaManager.scala | 5 +- .../scala/kafka/utils/SinglyLinkedWeakList.scala | 66 ++++++++ core/src/main/scala/kafka/utils/timer/Timer.scala | 82 ++++++++++ .../main/scala/kafka/utils/timer/TimerTask.scala | 41 +++++ .../scala/kafka/utils/timer/TimerTaskList.scala | 129 +++++++++++++++ .../main/scala/kafka/utils/timer/TimingWheel.scala | 82 ++++++++++ .../unit/kafka/server/DelayedOperationTest.scala | 45 +++--- .../unit/kafka/utils/timer/TimerTaskListTest.scala | 93 +++++++++++ .../scala/unit/kafka/utils/timer/TimerTest.scala | 111 +++++++++++++ 10 files changed, 706 insertions(+), 122 deletions(-) create mode 100644 core/src/main/scala/kafka/utils/SinglyLinkedWeakList.scala create mode 100644 core/src/main/scala/kafka/utils/timer/Timer.scala create mode 100644 core/src/main/scala/kafka/utils/timer/TimerTask.scala create mode 100644 core/src/main/scala/kafka/utils/timer/TimerTaskList.scala create mode 100644 core/src/main/scala/kafka/utils/timer/TimingWheel.scala create mode 100644 core/src/test/scala/unit/kafka/utils/timer/TimerTaskListTest.scala create mode 100644 core/src/test/scala/unit/kafka/utils/timer/TimerTest.scala diff --git a/core/src/main/scala/kafka/server/DelayedOperation.scala b/core/src/main/scala/kafka/server/DelayedOperation.scala index e317676..2f269ec 100644 --- a/core/src/main/scala/kafka/server/DelayedOperation.scala +++ b/core/src/main/scala/kafka/server/DelayedOperation.scala @@ -18,11 +18,13 @@ package kafka.server import kafka.utils._ +import kafka.utils.timer._ import kafka.metrics.KafkaMetricsGroup -import java.util +import java.lang.ref.ReferenceQueue import java.util.concurrent._ import java.util.concurrent.atomic._ + import scala.collection._ import com.yammer.metrics.core.Gauge @@ -41,7 +43,10 @@ import com.yammer.metrics.core.Gauge * * A subclass of DelayedOperation needs to provide an implementation of both onComplete() and tryComplete(). */ -abstract class DelayedOperation(delayMs: Long) extends DelayedItem(delayMs) { +abstract class DelayedOperation(delayMs: Long) extends TimerTask with Logging { + + override val expirationMs = delayMs + System.currentTimeMillis() + private val completed = new AtomicBoolean(false) /* @@ -58,6 +63,8 @@ abstract class DelayedOperation(delayMs: Long) extends DelayedItem(delayMs) { */ def forceComplete(): Boolean = { if (completed.compareAndSet(false, true)) { + // cancel the timeout timer + cancel() onComplete() true } else { @@ -89,19 +96,30 @@ abstract class DelayedOperation(delayMs: Long) extends DelayedItem(delayMs) { * This function needs to be defined in subclasses */ def tryComplete(): Boolean + + /* + * A task that runs on timeout + */ + def run(): Unit = { + forceComplete() + } } /** * A helper purgatory class for bookkeeping delayed operations with a timeout, and expiring timed out operations. */ -class DelayedOperationPurgatory[T <: DelayedOperation](purgatoryName: String, brokerId: Int = 0, purgeInterval: Int = 1000) +class DelayedOperationPurgatory[T <: DelayedOperation](purgatoryName: String, brokerId: Int = 0) extends Logging with KafkaMetricsGroup { + // timeout timer + private[this] val executor = Executors.newSingleThreadExecutor() + private[this] val timeoutTimer = new Timer(executor) + /* a list of operation watching keys */ private val watchersForKey = new Pool[Any, Watchers](Some((key: Any) => new Watchers)) /* background thread expiring operations that have timed out */ - private val expirationReaper = new ExpiredOperationReaper + private val expirationReaper = new ExpiredOperationReaper(timeoutTimer) private val metricsTags = Map("delayedOperation" -> purgatoryName) @@ -166,8 +184,13 @@ class DelayedOperationPurgatory[T <: DelayedOperation](purgatoryName: String, br return true // if it cannot be completed by now and hence is watched, add to the expire queue also - if (! operation.isCompleted()) - expirationReaper.enqueue(operation) + if (! operation.isCompleted()) { + timeoutTimer.add(operation) + if (operation.isCompleted()) { + // cancel the timer task + operation.cancel() + } + } false } @@ -196,7 +219,7 @@ class DelayedOperationPurgatory[T <: DelayedOperation](purgatoryName: String, br /** * Return the number of delayed operations in the expiry queue */ - def delayed() = expirationReaper.delayed + def delayed() = timeoutTimer.size /* * Return the watch list of the given key @@ -208,133 +231,90 @@ class DelayedOperationPurgatory[T <: DelayedOperation](purgatoryName: String, br */ def shutdown() { expirationReaper.shutdown() + executor.shutdown() } /** * A linked list of watched delayed operations based on some key */ private class Watchers { - private val operations = new util.LinkedList[T] - def watched = operations.size() + private[this] val refQueue = new ReferenceQueue[T]() + + private[this] val unreachable = new AtomicInteger(0) + + private[this] val operations = new SinglyLinkedWeakList[T](refQueue) + + def watched(): Int = operations synchronized operations.size // add the element to watch def watch(t: T) { - synchronized { - operations.add(t) - } + operations synchronized operations.add(t) } // traverse the list and try to complete some watched elements def tryCompleteWatched(): Int = { - var completed = 0 - synchronized { + refQueue synchronized { + while (refQueue.poll() != null) {} + } + + operations synchronized { + // the reference queue is drained. we can set the unreachable count to zero + unreachable.set(0) + + var completed = 0 val iter = operations.iterator() - while(iter.hasNext) { - val curr = iter.next - if (curr.isCompleted()) { + while (iter.hasNext()) { + val curr = iter.next() + if (curr == null || curr.isCompleted()) { // another thread has completed this operation, just remove it iter.remove() + } else if (curr synchronized curr.tryComplete()) { + completed += 1 + iter.remove() } else { - if(curr synchronized curr.tryComplete()) { - iter.remove() - completed += 1 - } } } + completed } - completed } - // traverse the list and purge elements that are already completed by others - def purgeCompleted(): Int = { - var purged = 0 - synchronized { - val iter = operations.iterator() - while (iter.hasNext) { - val curr = iter.next - if(curr.isCompleted()) { - iter.remove() - purged += 1 + def purge(threshold: Int): Unit = { + refQueue.synchronized { + while (refQueue.poll() != null) { + unreachable.incrementAndGet() + } + } + + if (unreachable.get > threshold) { + operations synchronized { + // the reference queue is drained. we can set the unreachable count to zero + unreachable.set(0) + + val iter = operations.iterator() + while (iter.hasNext()) { + val curr = iter.next() + if (curr == null || curr.isCompleted()) { + iter.remove() + } } } } - purged } } /** * A background reaper to expire delayed operations that have timed out */ - private class ExpiredOperationReaper extends ShutdownableThread( + private class ExpiredOperationReaper(timeoutTimer: Timer) extends ShutdownableThread( "ExpirationReaper-%d".format(brokerId), false) { - /* The queue storing all delayed operations */ - private val delayedQueue = new DelayQueue[T] - - /* - * Return the number of delayed operations kept by the reaper - */ - def delayed() = delayedQueue.size() - - /* - * Add an operation to be expired - */ - def enqueue(t: T) { - delayedQueue.add(t) - } - - /** - * Try to get the next expired event and force completing it - */ - private def expireNext() { - val curr = delayedQueue.poll(200L, TimeUnit.MILLISECONDS) - if (curr != null.asInstanceOf[T]) { - // if there is an expired operation, try to force complete it - val completedByMe: Boolean = curr synchronized { - curr.onExpiration() - curr.forceComplete() - } - if (completedByMe) - debug("Force complete expired delayed operation %s".format(curr)) - } - } - - /** - * Delete all satisfied events from the delay queue and the watcher lists - */ - private def purgeCompleted(): Int = { - var purged = 0 - - // purge the delayed queue - val iter = delayedQueue.iterator() - while (iter.hasNext) { - val curr = iter.next() - if (curr.isCompleted()) { - iter.remove() - purged += 1 - } - } - - purged - } + private[this] val tryPurge = (w: Watchers) => w.purge(100) override def doWork() { - // try to get the next expired operation and force completing it - expireNext() - // see if we need to purge the watch lists - if (DelayedOperationPurgatory.this.watched() >= purgeInterval) { - debug("Begin purging watch lists") - val purged = watchersForKey.values.map(_.purgeCompleted()).sum - debug("Purged %d elements from watch lists.".format(purged)) - } - // see if we need to purge the delayed operation queue - if (delayed() >= purgeInterval) { - debug("Begin purging delayed queue") - val purged = purgeCompleted() - debug("Purged %d operations from delayed queue.".format(purged)) - } + timeoutTimer.advanceClock(200L) + watchersForKey.values.foreach(tryPurge) } } } diff --git a/core/src/main/scala/kafka/server/ReplicaManager.scala b/core/src/main/scala/kafka/server/ReplicaManager.scala index b06f00b..0a538f0 100644 --- a/core/src/main/scala/kafka/server/ReplicaManager.scala +++ b/core/src/main/scala/kafka/server/ReplicaManager.scala @@ -107,10 +107,9 @@ class ReplicaManager(val config: KafkaConfig, val stateChangeLogger = KafkaController.stateChangeLogger val delayedProducePurgatory = new DelayedOperationPurgatory[DelayedProduce]( - purgatoryName = "Produce", config.brokerId, config.producerPurgatoryPurgeIntervalRequests) + purgatoryName = "Produce", config.brokerId) val delayedFetchPurgatory = new DelayedOperationPurgatory[DelayedFetch]( - purgatoryName = "Fetch", config.brokerId, config.fetchPurgatoryPurgeIntervalRequests) - + purgatoryName = "Fetch", config.brokerId) newGauge( "LeaderCount", diff --git a/core/src/main/scala/kafka/utils/SinglyLinkedWeakList.scala b/core/src/main/scala/kafka/utils/SinglyLinkedWeakList.scala new file mode 100644 index 0000000..91ae190 --- /dev/null +++ b/core/src/main/scala/kafka/utils/SinglyLinkedWeakList.scala @@ -0,0 +1,66 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package kafka.utils + +import java.lang.Iterable +import java.lang.ref.{ReferenceQueue, WeakReference} +import java.util.Iterator + +class SinglyLinkedWeakList[T](queue: ReferenceQueue[T]) extends Iterable[T] { + + private[this] class Entry(element: T, var nextEntry: Entry) extends WeakReference[T](element, queue) + + private[this] var list: Entry = null + private[this] var elementCount: Int = 0 + + def add(element: T): Unit = { + list = new Entry(element, list) + elementCount += 1 + } + + def size: Int = elementCount + + def iterator(): Iterator[T] = { + + new Iterator[T] { + private[this] var currEntry: Entry = null + private[this] var prevEntry: Entry = null + private[this] var nextEntry: Entry = list + + def hasNext(): Boolean = (nextEntry != null) + + def next(): T = { + if (nextEntry == null) throw new NoSuchElementException() + prevEntry = currEntry + currEntry = nextEntry + nextEntry = currEntry.nextEntry + currEntry.get() + } + + def remove(): Unit = { + if (currEntry == null) throw new IllegalStateException() + if (prevEntry == null) + list = nextEntry + else + prevEntry.nextEntry = nextEntry + elementCount -= 1 + } + } + } +} + diff --git a/core/src/main/scala/kafka/utils/timer/Timer.scala b/core/src/main/scala/kafka/utils/timer/Timer.scala new file mode 100644 index 0000000..17244ee --- /dev/null +++ b/core/src/main/scala/kafka/utils/timer/Timer.scala @@ -0,0 +1,82 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package kafka.utils.timer + +import java.util.concurrent.{DelayQueue, ExecutorService, TimeUnit} +import java.util.concurrent.atomic.AtomicInteger +import java.util.concurrent.locks.ReentrantReadWriteLock + +import kafka.utils.threadsafe + +@threadsafe +class Timer(taskExecutor: ExecutorService, tickMs: Long = 1, wheelSize: Int = 20, startMs: Long = System.currentTimeMillis) { + + private[this] val delayQueue = new DelayQueue[TimerTaskList]() + private[this] val taskCounter = new AtomicInteger(0) + private[this] val timingWheel = new TimingWheel( + tickMs = tickMs, + wheelSize = wheelSize, + startMs = startMs, + taskCounter = taskCounter, + delayQueue + ) + + // Locks used to protect data structures while ticking + private[this] val readWriteLock = new ReentrantReadWriteLock() + private[this] val readLock = readWriteLock.readLock() + private[this] val writeLock = readWriteLock.writeLock() + + def add(timerTask: TimerTask): Unit = { + readLock.lock() + try { + addTimerTaskEntry(new TimerTaskEntry(timerTask)) + } finally { + readLock.unlock() + } + } + + private def addTimerTaskEntry(timerTaskEntry: TimerTaskEntry): Unit = { + if (!timingWheel.add(timerTaskEntry)) { + // already expired + taskExecutor.submit(timerTaskEntry.timerTask) + } + } + + private[this] val reinsert = (timerTaskEntry: TimerTaskEntry) => addTimerTaskEntry(timerTaskEntry) + + def advanceClock(timeoutMs: Long): Boolean = { + var bucket = delayQueue.poll(timeoutMs, TimeUnit.MILLISECONDS) + if (bucket != null) { + writeLock.lock() + try { + while (bucket != null) { + timingWheel.advanceClock(bucket.getExpiration()) + bucket.flush(reinsert) + bucket = delayQueue.poll() + } + } finally { + writeLock.unlock() + } + true + } else { + false + } + } + + def size(): Int = taskCounter.get +} + diff --git a/core/src/main/scala/kafka/utils/timer/TimerTask.scala b/core/src/main/scala/kafka/utils/timer/TimerTask.scala new file mode 100644 index 0000000..0c528b4 --- /dev/null +++ b/core/src/main/scala/kafka/utils/timer/TimerTask.scala @@ -0,0 +1,41 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package kafka.utils.timer + +trait TimerTask extends Runnable { + + val expirationMs: Long // timestamp in millisecond + + private[this] var timerTaskEntry: TimerTaskEntry = null + + def cancel(): Unit = { + synchronized { + if (timerTaskEntry != null) timerTaskEntry.remove() + timerTaskEntry = null + } + } + + private[timer] def setTimerTaskEntry(entry: TimerTaskEntry): Unit = { + synchronized { + if (timerTaskEntry != null && timerTaskEntry != entry) { + timerTaskEntry.remove() + } + timerTaskEntry = entry + } + } + +} diff --git a/core/src/main/scala/kafka/utils/timer/TimerTaskList.scala b/core/src/main/scala/kafka/utils/timer/TimerTaskList.scala new file mode 100644 index 0000000..0ecc611 --- /dev/null +++ b/core/src/main/scala/kafka/utils/timer/TimerTaskList.scala @@ -0,0 +1,129 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package kafka.utils.timer + +import java.util.concurrent.{TimeUnit, Delayed} +import java.util.concurrent.atomic.{AtomicLong, AtomicInteger} + +import kafka.utils.{SystemTime, threadsafe} + +import scala.math._ + +@threadsafe +private[timer] class TimerTaskList(taskCounter: AtomicInteger) extends Delayed { + + // TimerTaskList forms a doubly linked cyclic list using a dummy root entry + // root.next points to the head + // root.prev points to the tail + private[this] val root = new TimerTaskEntry(null) + root.next = root + root.prev = root + + private[this] val expiration = new AtomicLong(-1L) + + // Set the bucket's expiration time + // Returns true if the expiration time is changed + def setExpiration(expirationMs: Long): Boolean = { + expiration.getAndSet(expirationMs) != expirationMs + } + + // Get the bucket's expiration time + def getExpiration(): Long = { + expiration.get() + } + + // Apply the supplied function to each of tasks in this list + def foreach(f: (TimerTask)=>Unit): Unit = { + synchronized { + var entry = root.next + while (entry ne root) { + val nextEntry = entry.next + f(entry.timerTask) + entry = nextEntry + } + } + } + + // Add a timer task entry to this list + def add(timerTaskEntry: TimerTaskEntry): Unit = { + synchronized { + // put the timer task entry to the end of the list. (root.prev points to the tail entry) + val tail = root.prev + timerTaskEntry.next = root + timerTaskEntry.prev = tail + timerTaskEntry.list = this + tail.next = timerTaskEntry + root.prev = timerTaskEntry + taskCounter.incrementAndGet() + timerTaskEntry.timerTask.setTimerTaskEntry(timerTaskEntry) + } + } + + // Remove the specified timer task entry from this list + def remove(timerTaskEntry: TimerTaskEntry): Unit = { + synchronized { + if (timerTaskEntry.list != null) { + timerTaskEntry.next.prev = timerTaskEntry.prev + timerTaskEntry.prev.next = timerTaskEntry.next + timerTaskEntry.next = null + timerTaskEntry.prev = null + timerTaskEntry.list = null + taskCounter.decrementAndGet() + } + } + } + + // Remove all task entries and apply the supplied function to each of them + def flush(f: (TimerTaskEntry)=>Unit): Unit = { + synchronized { + var head = root.next + while (head ne root) { + remove(head) + f(head) + head = root.next + } + expiration.set(-1L) + } + } + + def getDelay(unit: TimeUnit): Long = { + unit.convert(max(getExpiration - SystemTime.milliseconds, 0), TimeUnit.MILLISECONDS) + } + + def compareTo(d: Delayed): Int = { + + val other = d.asInstanceOf[TimerTaskList] + + if(getExpiration < other.getExpiration) -1 + else if(getExpiration > other.getExpiration) 1 + else 0 + } + +} + +private[timer] class TimerTaskEntry(val timerTask: TimerTask) { + + var list: TimerTaskList = null + var next: TimerTaskEntry = null + var prev: TimerTaskEntry = null + + def remove(): Unit = { + if (list != null) list.remove(this) + } + +} + diff --git a/core/src/main/scala/kafka/utils/timer/TimingWheel.scala b/core/src/main/scala/kafka/utils/timer/TimingWheel.scala new file mode 100644 index 0000000..94579ac --- /dev/null +++ b/core/src/main/scala/kafka/utils/timer/TimingWheel.scala @@ -0,0 +1,82 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package kafka.utils.timer + +import kafka.utils.nonthreadsafe + +import java.util.concurrent.DelayQueue +import java.util.concurrent.atomic.AtomicInteger + +@nonthreadsafe +private[timer] class TimingWheel(tickMs: Long, wheelSize: Int, startMs: Long, taskCounter: AtomicInteger, queue: DelayQueue[TimerTaskList]) { + + private[this] val interval = tickMs * wheelSize + private[this] val buckets = Array.tabulate[TimerTaskList](wheelSize) { _ => new TimerTaskList(taskCounter) } + + @volatile + private[this] var currentTime = startMs - (startMs % tickMs) // rounding down to multiple of tickSizeMs + private[this] var overflowWheel: TimingWheel = null + + private[this] def addOverflowWheel(): Unit = { + synchronized { + if (overflowWheel == null) { + overflowWheel = new TimingWheel( + tickMs = interval, + wheelSize = wheelSize, + startMs = currentTime, + taskCounter = taskCounter, + queue + ) + } + } + } + + def add(timerTaskEntry: TimerTaskEntry): Boolean = { + val expiration = timerTaskEntry.timerTask.expirationMs + + if (expiration < currentTime + tickMs) { + // Already expired + false + } else if (expiration < currentTime + interval) { + // Put it in an own bucket + val virtualId = expiration / tickMs + val bucket = buckets((virtualId % wheelSize.toLong).toInt) + bucket.add(timerTaskEntry) + + // Set the bucket expiration time + if (bucket.setExpiration(virtualId * tickMs)) { + // The bucket needs to be enqueued because it was an expired bucket + queue.offer(bucket) + } + true + } else { + // Out of the interval. Put it into the parent timer + if (overflowWheel == null) addOverflowWheel() + overflowWheel.add(timerTaskEntry) + } + } + + // Try to advance the clock + def advanceClock(timeMs: Long): Unit = { + if (timeMs >= currentTime + tickMs) { + currentTime = timeMs - (timeMs % tickMs) + + // Try to advance the clock of the overflow wheel if present + if (overflowWheel != null) overflowWheel.advanceClock(currentTime) + } + } +} diff --git a/core/src/test/scala/unit/kafka/server/DelayedOperationTest.scala b/core/src/test/scala/unit/kafka/server/DelayedOperationTest.scala index 7a37617..77116dc 100644 --- a/core/src/test/scala/unit/kafka/server/DelayedOperationTest.scala +++ b/core/src/test/scala/unit/kafka/server/DelayedOperationTest.scala @@ -20,17 +20,16 @@ package kafka.server import org.junit.Test import org.scalatest.junit.JUnit3Suite import junit.framework.Assert._ -import kafka.utils.TestUtils class DelayedOperationTest extends JUnit3Suite { var purgatory: DelayedOperationPurgatory[MockDelayedOperation] = null - + override def setUp() { super.setUp() - purgatory = new DelayedOperationPurgatory[MockDelayedOperation](purgatoryName = "mock", 0, 5) + purgatory = new DelayedOperationPurgatory[MockDelayedOperation](purgatoryName = "mock", 0) } - + override def tearDown() { purgatory.shutdown() super.tearDown() @@ -72,32 +71,34 @@ class DelayedOperationTest extends JUnit3Suite { def testRequestPurge() { val r1 = new MockDelayedOperation(100000L) val r2 = new MockDelayedOperation(100000L) + val r3 = new MockDelayedOperation(100000L) purgatory.tryCompleteElseWatch(r1, Array("test1")) purgatory.tryCompleteElseWatch(r2, Array("test1", "test2")) - purgatory.tryCompleteElseWatch(r1, Array("test2", "test3")) + purgatory.tryCompleteElseWatch(r3, Array("test1", "test2", "test3")) - assertEquals("Purgatory should have 5 watched elements", 5, purgatory.watched()) assertEquals("Purgatory should have 3 total delayed operations", 3, purgatory.delayed()) + assertEquals("Purgatory should have 5 watched elements", 6, purgatory.watched()) - // complete one of the operations, it should - // eventually be purged from the watch list with purge interval 5 + // complete the operations, it should immediately be purged from the delayed operation r2.completable = true r2.tryComplete() - TestUtils.waitUntilTrue(() => purgatory.watched() == 3, - "Purgatory should have 3 watched elements instead of " + purgatory.watched(), 1000L) - TestUtils.waitUntilTrue(() => purgatory.delayed() == 3, - "Purgatory should still have 3 total delayed operations instead of " + purgatory.delayed(), 1000L) + assertEquals("Purgatory should have 3 total delayed operations instead of ", 2, purgatory.delayed()) - // add two more requests, then the satisfied request should be purged from the delayed queue with purge interval 5 - purgatory.tryCompleteElseWatch(r1, Array("test1")) - purgatory.tryCompleteElseWatch(r1, Array("test1")) + r3.completable = true + r3.tryComplete() + assertEquals("Purgatory should have 2 total delayed operations instead of ", 1, purgatory.delayed()) - TestUtils.waitUntilTrue(() => purgatory.watched() == 5, - "Purgatory should have 5 watched elements instead of " + purgatory.watched(), 1000L) - TestUtils.waitUntilTrue(() => purgatory.delayed() == 4, - "Purgatory should have 4 total delayed operations instead of " + purgatory.delayed(), 1000L) + // checking a watch should purge the watch list + purgatory.checkAndComplete("test1") + assertEquals("Purgatory should have 4 watched elements instead of ", 4, purgatory.watched()) + + purgatory.checkAndComplete("test2") + assertEquals("Purgatory should have 2 watched elements instead of ", 2, purgatory.watched()) + + purgatory.checkAndComplete("test3") + assertEquals("Purgatory should have 1 watched elements instead of ", 1, purgatory.watched()) } - + class MockDelayedOperation(delayMs: Long) extends DelayedOperation(delayMs) { var completable = false @@ -124,5 +125,5 @@ class DelayedOperationTest extends JUnit3Suite { } } } - -} \ No newline at end of file + +} diff --git a/core/src/test/scala/unit/kafka/utils/timer/TimerTaskListTest.scala b/core/src/test/scala/unit/kafka/utils/timer/TimerTaskListTest.scala new file mode 100644 index 0000000..05a0165 --- /dev/null +++ b/core/src/test/scala/unit/kafka/utils/timer/TimerTaskListTest.scala @@ -0,0 +1,93 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package kafka.utils.timer + +import junit.framework.Assert._ +import java.util.concurrent.atomic._ +import org.junit.{Test, After, Before} + +class TimerTaskListTest { + + private class TestTask(val expirationMs: Long) extends TimerTask { + def run(): Unit = { } + } + + private def size(list: TimerTaskList): Int = { + var count = 0 + list.foreach(_ => count += 1) + count + } + + @Test + def testAll() { + val sharedCounter = new AtomicInteger(0) + val runCounter = new AtomicInteger(0) + val execCounter = new AtomicInteger(0) + val list1 = new TimerTaskList(sharedCounter) + val list2 = new TimerTaskList(sharedCounter) + val list3 = new TimerTaskList(sharedCounter) + + val tasks = (1 to 10).map { i => + val task = new TestTask(10L) + val prevCount = sharedCounter.get + list1.add(new TimerTaskEntry(task)) + assertEquals(prevCount + 1, sharedCounter.get) + assertEquals(i, sharedCounter.get) + task + }.toSeq + + assertEquals(tasks.size, sharedCounter.get) + + tasks.take(4).foreach { task => + val prevCount = sharedCounter.get + list2.add(new TimerTaskEntry(task)) + assertEquals(prevCount, sharedCounter.get) + } + assertEquals(10 - 4, size(list1)) + assertEquals(4, size(list2)) + + assertEquals(tasks.size, sharedCounter.get) + + tasks.drop(4).foreach { task => + val prevCount = sharedCounter.get + list3.add(new TimerTaskEntry(task)) + assertEquals(prevCount, sharedCounter.get) + } + assertEquals(0, size(list1)) + assertEquals(4, size(list2)) + assertEquals(6, size(list3)) + + assertEquals(tasks.size, sharedCounter.get) + + // cancel tasks in lists + list1.foreach { _.cancel() } + assertEquals(0, size(list1)) + assertEquals(4, size(list2)) + assertEquals(6, size(list3)) + + list2.foreach { _.cancel() } + assertEquals(0, size(list1)) + assertEquals(0, size(list2)) + assertEquals(6, size(list3)) + + list3.foreach { _.cancel() } + assertEquals(0, size(list1)) + assertEquals(0, size(list2)) + assertEquals(0, size(list3)) + } + +} diff --git a/core/src/test/scala/unit/kafka/utils/timer/TimerTest.scala b/core/src/test/scala/unit/kafka/utils/timer/TimerTest.scala new file mode 100644 index 0000000..5524cf6 --- /dev/null +++ b/core/src/test/scala/unit/kafka/utils/timer/TimerTest.scala @@ -0,0 +1,111 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package kafka.utils.timer + +import java.util.concurrent.{CountDownLatch, ExecutorService, Executors, TimeUnit} + +import junit.framework.Assert._ +import java.util.concurrent.atomic._ +import org.junit.{Test, After, Before} + +import scala.collection.mutable.ArrayBuffer +import scala.util.Random + +class TimerTest { + + private class TestTask(val expirationMs: Long, id: Int, latch: CountDownLatch, output: ArrayBuffer[Int]) extends TimerTask { + private[this] val completed = new AtomicBoolean(false) + def run(): Unit = { + if (completed.compareAndSet(false, true)) { + output.synchronized { output += id } + latch.countDown() + } + } + } + + private[this] var executor: ExecutorService = null + + @Before + def setup() { + executor = Executors.newSingleThreadExecutor() + } + + @After + def teardown(): Unit = { + executor.shutdown() + executor = null + } + + @Test + def testAlreadyExpiredTask(): Unit = { + val startTime = System.currentTimeMillis() + val timer = new Timer(taskExecutor = executor, tickMs = 1, wheelSize = 3, startMs = startTime) + val output = new ArrayBuffer[Int]() + + + val latches = (-5 until 0).map { i => + val latch = new CountDownLatch(1) + timer.add(new TestTask(startTime + i, i, latch, output)) + latch + } + + latches.take(5).foreach { latch => + assertEquals("already expired tasks should run immediately", true, latch.await(3, TimeUnit.SECONDS)) + } + + assertEquals("output of already expired tasks", Set(-5, -4, -3, -2, -1), output.toSet) + } + + @Test + def testTaskExpiration(): Unit = { + val startTime = System.currentTimeMillis() + val timer = new Timer(taskExecutor = executor, tickMs = 1, wheelSize = 3, startMs = startTime) + val output = new ArrayBuffer[Int]() + + val tasks = new ArrayBuffer[TestTask]() + val ids = new ArrayBuffer[Int]() + + val latches = + (0 until 5).map { i => + val latch = new CountDownLatch(1) + tasks += new TestTask(startTime + i, i, latch, output) + ids += i + latch + } ++ (10 until 100).map { i => + val latch = new CountDownLatch(2) + tasks += new TestTask(startTime + i, i, latch, output) + tasks += new TestTask(startTime + i, i, latch, output) + ids += i + ids += i + latch + } ++ (100 until 500).map { i => + val latch = new CountDownLatch(1) + tasks += new TestTask(startTime + i, i, latch, output) + ids += i + latch + } + + // randomly submit requests + Random.shuffle(tasks.toSeq).map { task => timer.add(task) } + + while (timer.advanceClock(1000)) {} + + latches.foreach { latch => latch.await() } + + assertEquals("output should match", ids.sorted, output.toSeq) + } +} -- 2.3.0 From 14781af861e24e29c8484e1e7b58bf9d0dcd9680 Mon Sep 17 00:00:00 2001 From: Yasuhiro Matsuda Date: Wed, 18 Mar 2015 20:28:59 -0700 Subject: [PATCH 2/8] review comment, and fix deadlock --- core/src/main/scala/kafka/server/DelayedOperation.scala | 5 +++-- core/src/main/scala/kafka/utils/timer/Timer.scala | 4 +++- core/src/main/scala/kafka/utils/timer/TimerTaskList.scala | 1 - core/src/main/scala/kafka/utils/timer/TimingWheel.scala | 4 ++++ 4 files changed, 10 insertions(+), 4 deletions(-) diff --git a/core/src/main/scala/kafka/server/DelayedOperation.scala b/core/src/main/scala/kafka/server/DelayedOperation.scala index 2f269ec..ec90348 100644 --- a/core/src/main/scala/kafka/server/DelayedOperation.scala +++ b/core/src/main/scala/kafka/server/DelayedOperation.scala @@ -98,9 +98,10 @@ abstract class DelayedOperation(delayMs: Long) extends TimerTask with Logging { def tryComplete(): Boolean /* - * A task that runs on timeout + * run() method defines a task that is executed on timeout */ - def run(): Unit = { + override def run(): Unit = { + onExpiration() forceComplete() } } diff --git a/core/src/main/scala/kafka/utils/timer/Timer.scala b/core/src/main/scala/kafka/utils/timer/Timer.scala index 17244ee..ef0e99d 100644 --- a/core/src/main/scala/kafka/utils/timer/Timer.scala +++ b/core/src/main/scala/kafka/utils/timer/Timer.scala @@ -43,7 +43,9 @@ class Timer(taskExecutor: ExecutorService, tickMs: Long = 1, wheelSize: Int = 20 def add(timerTask: TimerTask): Unit = { readLock.lock() try { - addTimerTaskEntry(new TimerTaskEntry(timerTask)) + val timerTaskEntry = new TimerTaskEntry(timerTask) + timerTask.setTimerTaskEntry(timerTaskEntry) + addTimerTaskEntry(timerTaskEntry) } finally { readLock.unlock() } diff --git a/core/src/main/scala/kafka/utils/timer/TimerTaskList.scala b/core/src/main/scala/kafka/utils/timer/TimerTaskList.scala index 0ecc611..f06ffeb 100644 --- a/core/src/main/scala/kafka/utils/timer/TimerTaskList.scala +++ b/core/src/main/scala/kafka/utils/timer/TimerTaskList.scala @@ -69,7 +69,6 @@ private[timer] class TimerTaskList(taskCounter: AtomicInteger) extends Delayed { tail.next = timerTaskEntry root.prev = timerTaskEntry taskCounter.incrementAndGet() - timerTaskEntry.timerTask.setTimerTaskEntry(timerTaskEntry) } } diff --git a/core/src/main/scala/kafka/utils/timer/TimingWheel.scala b/core/src/main/scala/kafka/utils/timer/TimingWheel.scala index 94579ac..233270c 100644 --- a/core/src/main/scala/kafka/utils/timer/TimingWheel.scala +++ b/core/src/main/scala/kafka/utils/timer/TimingWheel.scala @@ -60,6 +60,10 @@ private[timer] class TimingWheel(tickMs: Long, wheelSize: Int, startMs: Long, ta // Set the bucket expiration time if (bucket.setExpiration(virtualId * tickMs)) { // The bucket needs to be enqueued because it was an expired bucket + // We only need to enqueue the bucket when its expiration time has changed, i.e. the wheel has advanced + // and the previous buckets gets reused; further calls to set the expiration within the same wheel cycle + // will pass in the same value and hence return false, thus the bucket with the same expiration will not + // be enqueued multiple times. queue.offer(bucket) } true -- 2.3.0 From 30acd26367f5c6ced008545588cfed0e10d6fe2d Mon Sep 17 00:00:00 2001 From: Yasuhiro Matsuda Date: Thu, 19 Mar 2015 11:16:31 -0700 Subject: [PATCH 3/8] ditched WeakRef --- .../main/scala/kafka/server/DelayedOperation.scala | 66 ++++++++++++++-------- .../scala/kafka/utils/SinglyLinkedWeakList.scala | 66 ---------------------- core/src/main/scala/kafka/utils/timer/Timer.scala | 4 +- .../scala/kafka/utils/timer/TimerTaskList.scala | 2 + 4 files changed, 45 insertions(+), 93 deletions(-) delete mode 100644 core/src/main/scala/kafka/utils/SinglyLinkedWeakList.scala diff --git a/core/src/main/scala/kafka/server/DelayedOperation.scala b/core/src/main/scala/kafka/server/DelayedOperation.scala index ec90348..a16d9f9 100644 --- a/core/src/main/scala/kafka/server/DelayedOperation.scala +++ b/core/src/main/scala/kafka/server/DelayedOperation.scala @@ -21,7 +21,7 @@ import kafka.utils._ import kafka.utils.timer._ import kafka.metrics.KafkaMetricsGroup -import java.lang.ref.ReferenceQueue +import java.util.LinkedList import java.util.concurrent._ import java.util.concurrent.atomic._ @@ -49,6 +49,10 @@ abstract class DelayedOperation(delayMs: Long) extends TimerTask with Logging { private val completed = new AtomicBoolean(false) + // shared counters to keep truck of completed operations for watchers. we increment them when the operation is completed + // so that the watchers know how many completed requests are in their list. + private val completionCounters = new LinkedList[AtomicInteger]() + /* * Force completing the delayed operation, if not already completed. * This function can be triggered when @@ -66,6 +70,7 @@ abstract class DelayedOperation(delayMs: Long) extends TimerTask with Logging { // cancel the timeout timer cancel() onComplete() + incrementCompletionCounters() true } else { false @@ -98,6 +103,30 @@ abstract class DelayedOperation(delayMs: Long) extends TimerTask with Logging { def tryComplete(): Boolean /* + * Add a watcher's completion counter. + * Returns false is the operation is already completed + */ + def addCompletionCounter(counter: AtomicInteger): Boolean = { + completionCounters synchronized { + if (!isCompleted()) { + completionCounters.add(counter) + true + } else { + false + } + } + } + + private def incrementCompletionCounters(): Unit = { + completionCounters synchronized { + val iter = completionCounters.iterator() + while (iter.hasNext()) { + iter.next().incrementAndGet() + } + } + } + + /* * run() method defines a task that is executed on timeout */ override def run(): Unit = { @@ -173,10 +202,13 @@ class DelayedOperationPurgatory[T <: DelayedOperation](purgatoryName: String, br return true for(key <- watchKeys) { - // If the operation is already completed, stop adding it to the rest of the watcher list. - if (operation.isCompleted()) - return false val watchers = watchersFor(key) + + // addCompletionCounter returns false if the operation is already completed + // in that case, stop adding it to the rest of the watcher list. + if (!operation.addCompletionCounter(watchers.completedCount)) + return true + watchers.watch(operation) } @@ -240,11 +272,9 @@ class DelayedOperationPurgatory[T <: DelayedOperation](purgatoryName: String, br */ private class Watchers { - private[this] val refQueue = new ReferenceQueue[T]() + val completedCount = new AtomicInteger(0) - private[this] val unreachable = new AtomicInteger(0) - - private[this] val operations = new SinglyLinkedWeakList[T](refQueue) + private[this] val operations = new LinkedList[T]() def watched(): Int = operations synchronized operations.size @@ -255,14 +285,8 @@ class DelayedOperationPurgatory[T <: DelayedOperation](purgatoryName: String, br // traverse the list and try to complete some watched elements def tryCompleteWatched(): Int = { - refQueue synchronized { - while (refQueue.poll() != null) {} - } operations synchronized { - // the reference queue is drained. we can set the unreachable count to zero - unreachable.set(0) - var completed = 0 val iter = operations.iterator() while (iter.hasNext()) { @@ -270,9 +294,11 @@ class DelayedOperationPurgatory[T <: DelayedOperation](purgatoryName: String, br if (curr == null || curr.isCompleted()) { // another thread has completed this operation, just remove it iter.remove() + completedCount.decrementAndGet() } else if (curr synchronized curr.tryComplete()) { completed += 1 iter.remove() + completedCount.decrementAndGet() } else { } } @@ -281,22 +307,14 @@ class DelayedOperationPurgatory[T <: DelayedOperation](purgatoryName: String, br } def purge(threshold: Int): Unit = { - refQueue.synchronized { - while (refQueue.poll() != null) { - unreachable.incrementAndGet() - } - } - - if (unreachable.get > threshold) { + if (completedCount.get > threshold) { operations synchronized { - // the reference queue is drained. we can set the unreachable count to zero - unreachable.set(0) - val iter = operations.iterator() while (iter.hasNext()) { val curr = iter.next() if (curr == null || curr.isCompleted()) { iter.remove() + completedCount.decrementAndGet() } } } diff --git a/core/src/main/scala/kafka/utils/SinglyLinkedWeakList.scala b/core/src/main/scala/kafka/utils/SinglyLinkedWeakList.scala deleted file mode 100644 index 91ae190..0000000 --- a/core/src/main/scala/kafka/utils/SinglyLinkedWeakList.scala +++ /dev/null @@ -1,66 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package kafka.utils - -import java.lang.Iterable -import java.lang.ref.{ReferenceQueue, WeakReference} -import java.util.Iterator - -class SinglyLinkedWeakList[T](queue: ReferenceQueue[T]) extends Iterable[T] { - - private[this] class Entry(element: T, var nextEntry: Entry) extends WeakReference[T](element, queue) - - private[this] var list: Entry = null - private[this] var elementCount: Int = 0 - - def add(element: T): Unit = { - list = new Entry(element, list) - elementCount += 1 - } - - def size: Int = elementCount - - def iterator(): Iterator[T] = { - - new Iterator[T] { - private[this] var currEntry: Entry = null - private[this] var prevEntry: Entry = null - private[this] var nextEntry: Entry = list - - def hasNext(): Boolean = (nextEntry != null) - - def next(): T = { - if (nextEntry == null) throw new NoSuchElementException() - prevEntry = currEntry - currEntry = nextEntry - nextEntry = currEntry.nextEntry - currEntry.get() - } - - def remove(): Unit = { - if (currEntry == null) throw new IllegalStateException() - if (prevEntry == null) - list = nextEntry - else - prevEntry.nextEntry = nextEntry - elementCount -= 1 - } - } - } -} - diff --git a/core/src/main/scala/kafka/utils/timer/Timer.scala b/core/src/main/scala/kafka/utils/timer/Timer.scala index ef0e99d..17244ee 100644 --- a/core/src/main/scala/kafka/utils/timer/Timer.scala +++ b/core/src/main/scala/kafka/utils/timer/Timer.scala @@ -43,9 +43,7 @@ class Timer(taskExecutor: ExecutorService, tickMs: Long = 1, wheelSize: Int = 20 def add(timerTask: TimerTask): Unit = { readLock.lock() try { - val timerTaskEntry = new TimerTaskEntry(timerTask) - timerTask.setTimerTaskEntry(timerTaskEntry) - addTimerTaskEntry(timerTaskEntry) + addTimerTaskEntry(new TimerTaskEntry(timerTask)) } finally { readLock.unlock() } diff --git a/core/src/main/scala/kafka/utils/timer/TimerTaskList.scala b/core/src/main/scala/kafka/utils/timer/TimerTaskList.scala index f06ffeb..cc28781 100644 --- a/core/src/main/scala/kafka/utils/timer/TimerTaskList.scala +++ b/core/src/main/scala/kafka/utils/timer/TimerTaskList.scala @@ -120,6 +120,8 @@ private[timer] class TimerTaskEntry(val timerTask: TimerTask) { var next: TimerTaskEntry = null var prev: TimerTaskEntry = null + if (timerTask != null) timerTask.setTimerTaskEntry(this) + def remove(): Unit = { if (list != null) list.remove(this) } -- 2.3.0 From 80e9032c9b618b1a696ef9db08d6cfc908655520 Mon Sep 17 00:00:00 2001 From: Yasuhiro Matsuda Date: Thu, 19 Mar 2015 16:16:31 -0700 Subject: [PATCH 4/8] better purge condition --- .../main/scala/kafka/server/DelayedOperation.scala | 87 ++++++++-------------- .../main/scala/kafka/server/ReplicaManager.scala | 4 +- .../unit/kafka/server/DelayedOperationTest.scala | 2 +- 3 files changed, 35 insertions(+), 58 deletions(-) diff --git a/core/src/main/scala/kafka/server/DelayedOperation.scala b/core/src/main/scala/kafka/server/DelayedOperation.scala index a16d9f9..7ed712b 100644 --- a/core/src/main/scala/kafka/server/DelayedOperation.scala +++ b/core/src/main/scala/kafka/server/DelayedOperation.scala @@ -49,10 +49,6 @@ abstract class DelayedOperation(delayMs: Long) extends TimerTask with Logging { private val completed = new AtomicBoolean(false) - // shared counters to keep truck of completed operations for watchers. we increment them when the operation is completed - // so that the watchers know how many completed requests are in their list. - private val completionCounters = new LinkedList[AtomicInteger]() - /* * Force completing the delayed operation, if not already completed. * This function can be triggered when @@ -70,7 +66,6 @@ abstract class DelayedOperation(delayMs: Long) extends TimerTask with Logging { // cancel the timeout timer cancel() onComplete() - incrementCompletionCounters() true } else { false @@ -103,30 +98,6 @@ abstract class DelayedOperation(delayMs: Long) extends TimerTask with Logging { def tryComplete(): Boolean /* - * Add a watcher's completion counter. - * Returns false is the operation is already completed - */ - def addCompletionCounter(counter: AtomicInteger): Boolean = { - completionCounters synchronized { - if (!isCompleted()) { - completionCounters.add(counter) - true - } else { - false - } - } - } - - private def incrementCompletionCounters(): Unit = { - completionCounters synchronized { - val iter = completionCounters.iterator() - while (iter.hasNext()) { - iter.next().incrementAndGet() - } - } - } - - /* * run() method defines a task that is executed on timeout */ override def run(): Unit = { @@ -138,7 +109,7 @@ abstract class DelayedOperation(delayMs: Long) extends TimerTask with Logging { /** * A helper purgatory class for bookkeeping delayed operations with a timeout, and expiring timed out operations. */ -class DelayedOperationPurgatory[T <: DelayedOperation](purgatoryName: String, brokerId: Int = 0) +class DelayedOperationPurgatory[T <: DelayedOperation](purgatoryName: String, brokerId: Int = 0, purgeInterval: Int = 1000) extends Logging with KafkaMetricsGroup { // timeout timer @@ -148,6 +119,9 @@ class DelayedOperationPurgatory[T <: DelayedOperation](purgatoryName: String, br /* a list of operation watching keys */ private val watchersForKey = new Pool[Any, Watchers](Some((key: Any) => new Watchers)) + // the number of estimated total operations in the purgatory + private[this] val estimatedTotalOperations = new AtomicInteger(0) + /* background thread expiring operations that have timed out */ private val expirationReaper = new ExpiredOperationReaper(timeoutTimer) @@ -201,15 +175,18 @@ class DelayedOperationPurgatory[T <: DelayedOperation](purgatoryName: String, br if (isCompletedByMe) return true + var watchCreated = false for(key <- watchKeys) { + // If the operation is already completed, stop adding it to the rest of the watcher list. + if (operation.isCompleted()) + return false val watchers = watchersFor(key) - - // addCompletionCounter returns false if the operation is already completed - // in that case, stop adding it to the rest of the watcher list. - if (!operation.addCompletionCounter(watchers.completedCount)) - return true - watchers.watch(operation) + + if (!watchCreated) { + watchCreated = true + estimatedTotalOperations.incrementAndGet() + } } isCompletedByMe = operation synchronized operation.tryComplete() @@ -272,8 +249,6 @@ class DelayedOperationPurgatory[T <: DelayedOperation](purgatoryName: String, br */ private class Watchers { - val completedCount = new AtomicInteger(0) - private[this] val operations = new LinkedList[T]() def watched(): Int = operations synchronized operations.size @@ -291,31 +266,25 @@ class DelayedOperationPurgatory[T <: DelayedOperation](purgatoryName: String, br val iter = operations.iterator() while (iter.hasNext()) { val curr = iter.next() - if (curr == null || curr.isCompleted()) { + if (curr.isCompleted()) { // another thread has completed this operation, just remove it iter.remove() - completedCount.decrementAndGet() } else if (curr synchronized curr.tryComplete()) { completed += 1 iter.remove() - completedCount.decrementAndGet() - } else { } } completed } } - def purge(threshold: Int): Unit = { - if (completedCount.get > threshold) { - operations synchronized { - val iter = operations.iterator() - while (iter.hasNext()) { - val curr = iter.next() - if (curr == null || curr.isCompleted()) { - iter.remove() - completedCount.decrementAndGet() - } + def purgeCompleted(): Unit = { + operations synchronized { + val iter = operations.iterator() + while (iter.hasNext()) { + val curr = iter.next() + if (curr == null || curr.isCompleted()) { + iter.remove() } } } @@ -329,11 +298,19 @@ class DelayedOperationPurgatory[T <: DelayedOperation](purgatoryName: String, br "ExpirationReaper-%d".format(brokerId), false) { - private[this] val tryPurge = (w: Watchers) => w.purge(100) - override def doWork() { timeoutTimer.advanceClock(200L) - watchersForKey.values.foreach(tryPurge) + + // If the estimated total number of operations is more than purgeInterval greater than + // the number of pending operations, we clean up the watchers. The difference of the two numbers + // is the estimated number of completed operations still in the watchers. + if (estimatedTotalOperations.get - delayed > purgeInterval) { + // now set estimatedTotalOperations to delayed (the number of pending operations) since we are going to + // clean up watchers. Note that, if more operations are be completed during the clean up, we may end up with + // a little overestimated total number of operations. + estimatedTotalOperations.getAndSet(delayed) + watchersForKey.values.foreach(_.purgeCompleted()) + } } } } diff --git a/core/src/main/scala/kafka/server/ReplicaManager.scala b/core/src/main/scala/kafka/server/ReplicaManager.scala index 0a538f0..8ddd325 100644 --- a/core/src/main/scala/kafka/server/ReplicaManager.scala +++ b/core/src/main/scala/kafka/server/ReplicaManager.scala @@ -107,9 +107,9 @@ class ReplicaManager(val config: KafkaConfig, val stateChangeLogger = KafkaController.stateChangeLogger val delayedProducePurgatory = new DelayedOperationPurgatory[DelayedProduce]( - purgatoryName = "Produce", config.brokerId) + purgatoryName = "Produce", config.brokerId, config.producerPurgatoryPurgeIntervalRequests) val delayedFetchPurgatory = new DelayedOperationPurgatory[DelayedFetch]( - purgatoryName = "Fetch", config.brokerId) + purgatoryName = "Fetch", config.brokerId, config.fetchPurgatoryPurgeIntervalRequests) newGauge( "LeaderCount", diff --git a/core/src/test/scala/unit/kafka/server/DelayedOperationTest.scala b/core/src/test/scala/unit/kafka/server/DelayedOperationTest.scala index 77116dc..d186e89 100644 --- a/core/src/test/scala/unit/kafka/server/DelayedOperationTest.scala +++ b/core/src/test/scala/unit/kafka/server/DelayedOperationTest.scala @@ -27,7 +27,7 @@ class DelayedOperationTest extends JUnit3Suite { override def setUp() { super.setUp() - purgatory = new DelayedOperationPurgatory[MockDelayedOperation](purgatoryName = "mock", 0) + purgatory = new DelayedOperationPurgatory[MockDelayedOperation](purgatoryName = "mock") } override def tearDown() { -- 2.3.0 From 3e78b8d4ae66d4591435c4b1b75b4689f13f298c Mon Sep 17 00:00:00 2001 From: Yasuhiro Matsuda Date: Fri, 20 Mar 2015 08:42:09 -0700 Subject: [PATCH 5/8] purge count --- core/src/main/scala/kafka/server/DelayedOperation.scala | 12 +++++++++--- 1 file changed, 9 insertions(+), 3 deletions(-) diff --git a/core/src/main/scala/kafka/server/DelayedOperation.scala b/core/src/main/scala/kafka/server/DelayedOperation.scala index 7ed712b..505eac2 100644 --- a/core/src/main/scala/kafka/server/DelayedOperation.scala +++ b/core/src/main/scala/kafka/server/DelayedOperation.scala @@ -278,16 +278,20 @@ class DelayedOperationPurgatory[T <: DelayedOperation](purgatoryName: String, br } } - def purgeCompleted(): Unit = { + // traverse the list and purge elements that are already completed by others + def purgeCompleted(): Int = { + var purged = 0 operations synchronized { val iter = operations.iterator() - while (iter.hasNext()) { + while (iter.hasNext) { val curr = iter.next() if (curr == null || curr.isCompleted()) { iter.remove() + purged += 1 } } } + purged } } @@ -309,7 +313,9 @@ class DelayedOperationPurgatory[T <: DelayedOperation](purgatoryName: String, br // clean up watchers. Note that, if more operations are be completed during the clean up, we may end up with // a little overestimated total number of operations. estimatedTotalOperations.getAndSet(delayed) - watchersForKey.values.foreach(_.purgeCompleted()) + debug("Begin purging watch lists") + val purged = watchersForKey.values.map(_.purgeCompleted()).sum + debug("Purged %d elements from watch lists.".format(purged)) } } } -- 2.3.0 From b6a518323df62c99babc80ea1ab75f19c2ee8d43 Mon Sep 17 00:00:00 2001 From: Yasuhiro Matsuda Date: Wed, 1 Apr 2015 13:47:35 -0700 Subject: [PATCH 6/8] review comments --- .../main/scala/kafka/server/DelayedOperation.scala | 18 ++++++------ .../main/scala/kafka/utils/timer/TimingWheel.scala | 33 ++++++++++++++++++++-- .../unit/kafka/server/DelayedOperationTest.scala | 10 +++---- .../unit/kafka/utils/timer/TimerTaskListTest.scala | 4 +-- .../scala/unit/kafka/utils/timer/TimerTest.scala | 4 +-- 5 files changed, 49 insertions(+), 20 deletions(-) diff --git a/core/src/main/scala/kafka/server/DelayedOperation.scala b/core/src/main/scala/kafka/server/DelayedOperation.scala index 505eac2..c39eaa0 100644 --- a/core/src/main/scala/kafka/server/DelayedOperation.scala +++ b/core/src/main/scala/kafka/server/DelayedOperation.scala @@ -123,7 +123,7 @@ class DelayedOperationPurgatory[T <: DelayedOperation](purgatoryName: String, br private[this] val estimatedTotalOperations = new AtomicInteger(0) /* background thread expiring operations that have timed out */ - private val expirationReaper = new ExpiredOperationReaper(timeoutTimer) + private val expirationReaper = new ExpiredOperationReaper() private val metricsTags = Map("delayedOperation" -> purgatoryName) @@ -264,9 +264,9 @@ class DelayedOperationPurgatory[T <: DelayedOperation](purgatoryName: String, br operations synchronized { var completed = 0 val iter = operations.iterator() - while (iter.hasNext()) { + while (iter.hasNext) { val curr = iter.next() - if (curr.isCompleted()) { + if (curr.isCompleted) { // another thread has completed this operation, just remove it iter.remove() } else if (curr synchronized curr.tryComplete()) { @@ -285,7 +285,7 @@ class DelayedOperationPurgatory[T <: DelayedOperation](purgatoryName: String, br val iter = operations.iterator() while (iter.hasNext) { val curr = iter.next() - if (curr == null || curr.isCompleted()) { + if (curr.isCompleted) { iter.remove() purged += 1 } @@ -298,19 +298,19 @@ class DelayedOperationPurgatory[T <: DelayedOperation](purgatoryName: String, br /** * A background reaper to expire delayed operations that have timed out */ - private class ExpiredOperationReaper(timeoutTimer: Timer) extends ShutdownableThread( + private class ExpiredOperationReaper extends ShutdownableThread( "ExpirationReaper-%d".format(brokerId), false) { override def doWork() { timeoutTimer.advanceClock(200L) - // If the estimated total number of operations is more than purgeInterval greater than - // the number of pending operations, we clean up the watchers. The difference of the two numbers - // is the estimated number of completed operations still in the watchers. + // Trigger a purge if the number of completed but still being watched operations is larger than + // the purge threshold. That number is computed by the difference btw the estimated total number of + // operations and the number of pending delayed operations. if (estimatedTotalOperations.get - delayed > purgeInterval) { // now set estimatedTotalOperations to delayed (the number of pending operations) since we are going to - // clean up watchers. Note that, if more operations are be completed during the clean up, we may end up with + // clean up watchers. Note that, if more operations are completed during the clean up, we may end up with // a little overestimated total number of operations. estimatedTotalOperations.getAndSet(delayed) debug("Begin purging watch lists") diff --git a/core/src/main/scala/kafka/utils/timer/TimingWheel.scala b/core/src/main/scala/kafka/utils/timer/TimingWheel.scala index 233270c..288d928 100644 --- a/core/src/main/scala/kafka/utils/timer/TimingWheel.scala +++ b/core/src/main/scala/kafka/utils/timer/TimingWheel.scala @@ -21,6 +21,35 @@ import kafka.utils.nonthreadsafe import java.util.concurrent.DelayQueue import java.util.concurrent.atomic.AtomicInteger +/* + * Hierarchical Timing Wheels + * + * A simple timing wheel is a circular list of buckets of timer tasks. Let u be the time unit. + * A timing wheel with size n has n buckets and can hold timer tasks in n * u time interval. + * Each bucket holds timer tasks that fall into the corresponding time range. At the beginning, + * the first bucket holds tasks for [0, u), the second bucket holds tasks for [u, 2u), …, + * the n-th bucket for [u * (n -1), u * n). Every interval of time unit u, the timer ticks and + * moved to the next bucket then expire all timer tasks in it. So, the timer never insert a task + * into the bucket for the current time since it is already expired. The timer immediately runs + * the expired task. The emptied bucket is then available for the next round, so if the current + * bucket is for the time t, it becomes the bucket for [t + u * n, t + (n + 1) * u) after a tick. + * A timing wheel has O(1) cost for insert/delete (start-timer/stop-timer) whereas priority queue + * based timers, such as java.util.concurrent.DelayQueue and java.util.Timer, have O(log n) + * insert/delete cost. + * + * A major drawback of a simple timing wheel is that it assumes that a timer request is within + * the time interval of n * u from the current time. If a timer request is out of this interval, + * it is an overflow. A hierarchical timing wheel deals with such overflows. It is a hierarchically + * organized timing wheels. The lowest level has the finest time resolution. As moving up the + * hierarchy, time resolutions become coarser. If the resolution of a wheel at one level is u and + * the size is n, the resolution of the next level should be n * u. At each level overflows are + * delegated to the wheel in one level higher. When the wheel in the higher level ticks, it reinsert + * timer tasks to the lower level. A overflow wheel can be created on-demand. When a bucket in a + * overflow bucket expires, all tasks in it are reinserted into the timer recursively. The tasks + * are then moved the finer grain wheels or be executed. The insert (start-timer) cost is O(m) + * where m is the number of wheels, which is usually very small compared to the number of requests + * in the system, and the delete (stop-timer) cost is still O(1). + */ @nonthreadsafe private[timer] class TimingWheel(tickMs: Long, wheelSize: Int, startMs: Long, taskCounter: AtomicInteger, queue: DelayQueue[TimerTaskList]) { @@ -28,7 +57,7 @@ private[timer] class TimingWheel(tickMs: Long, wheelSize: Int, startMs: Long, ta private[this] val buckets = Array.tabulate[TimerTaskList](wheelSize) { _ => new TimerTaskList(taskCounter) } @volatile - private[this] var currentTime = startMs - (startMs % tickMs) // rounding down to multiple of tickSizeMs + private[this] var currentTime = startMs - (startMs % tickMs) // rounding down to multiple of tickMs private[this] var overflowWheel: TimingWheel = null private[this] def addOverflowWheel(): Unit = { @@ -52,7 +81,7 @@ private[timer] class TimingWheel(tickMs: Long, wheelSize: Int, startMs: Long, ta // Already expired false } else if (expiration < currentTime + interval) { - // Put it in an own bucket + // Put in its own bucket val virtualId = expiration / tickMs val bucket = buckets((virtualId % wheelSize.toLong).toInt) bucket.add(timerTaskEntry) diff --git a/core/src/test/scala/unit/kafka/server/DelayedOperationTest.scala b/core/src/test/scala/unit/kafka/server/DelayedOperationTest.scala index d186e89..a0e4bcf 100644 --- a/core/src/test/scala/unit/kafka/server/DelayedOperationTest.scala +++ b/core/src/test/scala/unit/kafka/server/DelayedOperationTest.scala @@ -82,21 +82,21 @@ class DelayedOperationTest extends JUnit3Suite { // complete the operations, it should immediately be purged from the delayed operation r2.completable = true r2.tryComplete() - assertEquals("Purgatory should have 3 total delayed operations instead of ", 2, purgatory.delayed()) + assertEquals("Purgatory should have 2 total delayed operations instead of " + purgatory.delayed(), 2, purgatory.delayed()) r3.completable = true r3.tryComplete() - assertEquals("Purgatory should have 2 total delayed operations instead of ", 1, purgatory.delayed()) + assertEquals("Purgatory should have 1 total delayed operations instead of " + purgatory.delayed(), 1, purgatory.delayed()) // checking a watch should purge the watch list purgatory.checkAndComplete("test1") - assertEquals("Purgatory should have 4 watched elements instead of ", 4, purgatory.watched()) + assertEquals("Purgatory should have 4 watched elements instead of " + purgatory.watched(), 4, purgatory.watched()) purgatory.checkAndComplete("test2") - assertEquals("Purgatory should have 2 watched elements instead of ", 2, purgatory.watched()) + assertEquals("Purgatory should have 2 watched elements instead of " + purgatory.watched(), 2, purgatory.watched()) purgatory.checkAndComplete("test3") - assertEquals("Purgatory should have 1 watched elements instead of ", 1, purgatory.watched()) + assertEquals("Purgatory should have 1 watched elements instead of " + purgatory.watched(), 1, purgatory.watched()) } class MockDelayedOperation(delayMs: Long) extends DelayedOperation(delayMs) { diff --git a/core/src/test/scala/unit/kafka/utils/timer/TimerTaskListTest.scala b/core/src/test/scala/unit/kafka/utils/timer/TimerTaskListTest.scala index 05a0165..38f35e9 100644 --- a/core/src/test/scala/unit/kafka/utils/timer/TimerTaskListTest.scala +++ b/core/src/test/scala/unit/kafka/utils/timer/TimerTaskListTest.scala @@ -43,15 +43,14 @@ class TimerTaskListTest { val tasks = (1 to 10).map { i => val task = new TestTask(10L) - val prevCount = sharedCounter.get list1.add(new TimerTaskEntry(task)) - assertEquals(prevCount + 1, sharedCounter.get) assertEquals(i, sharedCounter.get) task }.toSeq assertEquals(tasks.size, sharedCounter.get) + // reinserting the existing tasks shouldn't change the task count tasks.take(4).foreach { task => val prevCount = sharedCounter.get list2.add(new TimerTaskEntry(task)) @@ -62,6 +61,7 @@ class TimerTaskListTest { assertEquals(tasks.size, sharedCounter.get) + // reinserting the existing tasks shouldn't change the task count tasks.drop(4).foreach { task => val prevCount = sharedCounter.get list3.add(new TimerTaskEntry(task)) diff --git a/core/src/test/scala/unit/kafka/utils/timer/TimerTest.scala b/core/src/test/scala/unit/kafka/utils/timer/TimerTest.scala index 5524cf6..8507592 100644 --- a/core/src/test/scala/unit/kafka/utils/timer/TimerTest.scala +++ b/core/src/test/scala/unit/kafka/utils/timer/TimerTest.scala @@ -27,7 +27,7 @@ import scala.util.Random class TimerTest { - private class TestTask(val expirationMs: Long, id: Int, latch: CountDownLatch, output: ArrayBuffer[Int]) extends TimerTask { + private class TestTask(override val expirationMs: Long, id: Int, latch: CountDownLatch, output: ArrayBuffer[Int]) extends TimerTask { private[this] val completed = new AtomicBoolean(false) def run(): Unit = { if (completed.compareAndSet(false, true)) { @@ -100,7 +100,7 @@ class TimerTest { } // randomly submit requests - Random.shuffle(tasks.toSeq).map { task => timer.add(task) } + Random.shuffle(tasks.toSeq).foreach { task => timer.add(task) } while (timer.advanceClock(1000)) {} -- 2.3.0 From 18ee72ed9aa1bb22e90dca25ff7241993f39f8e3 Mon Sep 17 00:00:00 2001 From: Yasuhiro Matsuda Date: Tue, 7 Apr 2015 14:46:14 -0700 Subject: [PATCH 7/8] review comments --- .../main/scala/kafka/server/DelayedOperation.scala | 12 +++++-- .../main/scala/kafka/utils/timer/TimerTask.scala | 2 ++ .../scala/kafka/utils/timer/TimerTaskList.scala | 2 ++ .../main/scala/kafka/utils/timer/TimingWheel.scala | 41 ++++++++++++++++++++-- .../unit/kafka/server/DelayedOperationTest.scala | 2 +- .../unit/kafka/utils/timer/TimerTaskListTest.scala | 2 ++ 6 files changed, 56 insertions(+), 5 deletions(-) diff --git a/core/src/main/scala/kafka/server/DelayedOperation.scala b/core/src/main/scala/kafka/server/DelayedOperation.scala index c39eaa0..534deb1 100644 --- a/core/src/main/scala/kafka/server/DelayedOperation.scala +++ b/core/src/main/scala/kafka/server/DelayedOperation.scala @@ -101,7 +101,12 @@ abstract class DelayedOperation(delayMs: Long) extends TimerTask with Logging { * run() method defines a task that is executed on timeout */ override def run(): Unit = { - onExpiration() + try { + onExpiration() + } catch { + case e: Throwable => // log then ignore + error("exception in calling onExpiration:", e) + } forceComplete() } } @@ -113,7 +118,10 @@ class DelayedOperationPurgatory[T <: DelayedOperation](purgatoryName: String, br extends Logging with KafkaMetricsGroup { // timeout timer - private[this] val executor = Executors.newSingleThreadExecutor() + private[this] val executor = Executors.newFixedThreadPool(1, new ThreadFactory() { + def newThread(runnable: Runnable): Thread = + Utils.newThread("executor-"+purgatoryName, runnable, true) + }) private[this] val timeoutTimer = new Timer(executor) /* a list of operation watching keys */ diff --git a/core/src/main/scala/kafka/utils/timer/TimerTask.scala b/core/src/main/scala/kafka/utils/timer/TimerTask.scala index 0c528b4..3407138 100644 --- a/core/src/main/scala/kafka/utils/timer/TimerTask.scala +++ b/core/src/main/scala/kafka/utils/timer/TimerTask.scala @@ -31,6 +31,8 @@ trait TimerTask extends Runnable { private[timer] def setTimerTaskEntry(entry: TimerTaskEntry): Unit = { synchronized { + // if this timerTask is already held by an existing timer task entry, + // we will remove such an entry first. if (timerTaskEntry != null && timerTaskEntry != entry) { timerTaskEntry.remove() } diff --git a/core/src/main/scala/kafka/utils/timer/TimerTaskList.scala b/core/src/main/scala/kafka/utils/timer/TimerTaskList.scala index cc28781..e7a9657 100644 --- a/core/src/main/scala/kafka/utils/timer/TimerTaskList.scala +++ b/core/src/main/scala/kafka/utils/timer/TimerTaskList.scala @@ -120,6 +120,8 @@ private[timer] class TimerTaskEntry(val timerTask: TimerTask) { var next: TimerTaskEntry = null var prev: TimerTaskEntry = null + // if this timerTask is already held by an existing timer task entry, + // setTimerTaskEntry will remove it. if (timerTask != null) timerTask.setTimerTaskEntry(this) def remove(): Unit = { diff --git a/core/src/main/scala/kafka/utils/timer/TimingWheel.scala b/core/src/main/scala/kafka/utils/timer/TimingWheel.scala index 288d928..f763db1 100644 --- a/core/src/main/scala/kafka/utils/timer/TimingWheel.scala +++ b/core/src/main/scala/kafka/utils/timer/TimingWheel.scala @@ -44,11 +44,48 @@ import java.util.concurrent.atomic.AtomicInteger * hierarchy, time resolutions become coarser. If the resolution of a wheel at one level is u and * the size is n, the resolution of the next level should be n * u. At each level overflows are * delegated to the wheel in one level higher. When the wheel in the higher level ticks, it reinsert - * timer tasks to the lower level. A overflow wheel can be created on-demand. When a bucket in a + * timer tasks to the lower level. An overflow wheel can be created on-demand. When a bucket in an * overflow bucket expires, all tasks in it are reinserted into the timer recursively. The tasks - * are then moved the finer grain wheels or be executed. The insert (start-timer) cost is O(m) + * are then moved to the finer grain wheels or be executed. The insert (start-timer) cost is O(m) * where m is the number of wheels, which is usually very small compared to the number of requests * in the system, and the delete (stop-timer) cost is still O(1). + * + * Example + * Let's say that u is 1 and n is 3. If the start time is c, + * then the buckets at different levels are: + * + * level buckets + * 1 [c,c] [c+1,c+1] [c+2,c+2] + * 2 [c,c+1] [c+2,c+3] [c+4,c+5] + * 3 [c,c+3] [c+4,c+7] [c+8,c+11]\ + * + * [c,c], [c,c+1] and [c,c+3] are expired. + * + * At time = c+1, [c+3,c+3], [c+6,c+7] and [c+12,c+15] are created, + * + * 1 [c+1,c+1] [c+2,c+2] [c+3,c+3] + * 2 [c+2,c+3] [c+4,c+5] [c+6,c+7] + * 3 [c+4,c+7] [c+8,c+11] [c+12,c+15] + * + * and [c+1,c+1] is newly expired. + * + * At time = c+2, [c+4,c+4] is created, + * + * 1 [c+2,c+2] [c+3,c+3] [c+4,c+4] + * 2 [c+2,c+3] [c+4,c+5] [c+6,c+7] + * 3 [c+4,c+7] [c+8,c+11] [c+12,c+15] + * + * and [c+2,c+2] and [c+2,c+3] are newly expired. + * + * As you can see, buckets in different levels can overlap and expired in different timings. + * + * The hierarchical timing wheels works especially well when operations are completed before they time out. + * Even when everything times out, it still has advantageous when there are many items in the timer. + * Its insert cost (including reinsert) and delete cost are O(m) and O(1), respectively while priority + * queue based timers takes O(log N) for both insert and delete where N is the number of items in the queue. + * + * This class is not thread-safe. There should not be ant add calls while advanceClock is executing. + * It is caller's responsibility to enforce it. Simultaneous add calls are thread-safe. */ @nonthreadsafe private[timer] class TimingWheel(tickMs: Long, wheelSize: Int, startMs: Long, taskCounter: AtomicInteger, queue: DelayQueue[TimerTaskList]) { diff --git a/core/src/test/scala/unit/kafka/server/DelayedOperationTest.scala b/core/src/test/scala/unit/kafka/server/DelayedOperationTest.scala index a0e4bcf..9186c90 100644 --- a/core/src/test/scala/unit/kafka/server/DelayedOperationTest.scala +++ b/core/src/test/scala/unit/kafka/server/DelayedOperationTest.scala @@ -77,7 +77,7 @@ class DelayedOperationTest extends JUnit3Suite { purgatory.tryCompleteElseWatch(r3, Array("test1", "test2", "test3")) assertEquals("Purgatory should have 3 total delayed operations", 3, purgatory.delayed()) - assertEquals("Purgatory should have 5 watched elements", 6, purgatory.watched()) + assertEquals("Purgatory should have 6 watched elements", 6, purgatory.watched()) // complete the operations, it should immediately be purged from the delayed operation r2.completable = true diff --git a/core/src/test/scala/unit/kafka/utils/timer/TimerTaskListTest.scala b/core/src/test/scala/unit/kafka/utils/timer/TimerTaskListTest.scala index 38f35e9..052aecd 100644 --- a/core/src/test/scala/unit/kafka/utils/timer/TimerTaskListTest.scala +++ b/core/src/test/scala/unit/kafka/utils/timer/TimerTaskListTest.scala @@ -53,6 +53,7 @@ class TimerTaskListTest { // reinserting the existing tasks shouldn't change the task count tasks.take(4).foreach { task => val prevCount = sharedCounter.get + // new TimerTaskEntry(task) will remove the existing entry from the list list2.add(new TimerTaskEntry(task)) assertEquals(prevCount, sharedCounter.get) } @@ -64,6 +65,7 @@ class TimerTaskListTest { // reinserting the existing tasks shouldn't change the task count tasks.drop(4).foreach { task => val prevCount = sharedCounter.get + // new TimerTaskEntry(task) will remove the existing entry from the list list3.add(new TimerTaskEntry(task)) assertEquals(prevCount, sharedCounter.get) } -- 2.3.0 From 3502370dee846f6ba68d46f9e36f9abff2ba5963 Mon Sep 17 00:00:00 2001 From: Yasuhiro Matsuda Date: Tue, 7 Apr 2015 14:57:20 -0700 Subject: [PATCH 8/8] fix merge problem --- core/src/main/scala/kafka/server/DelayedOperation.scala | 2 ++ 1 file changed, 2 insertions(+) diff --git a/core/src/main/scala/kafka/server/DelayedOperation.scala b/core/src/main/scala/kafka/server/DelayedOperation.scala index 534deb1..f7ed5c2 100644 --- a/core/src/main/scala/kafka/server/DelayedOperation.scala +++ b/core/src/main/scala/kafka/server/DelayedOperation.scala @@ -25,6 +25,8 @@ import java.util.LinkedList import java.util.concurrent._ import java.util.concurrent.atomic._ +import org.apache.kafka.common.utils.Utils + import scala.collection._ import com.yammer.metrics.core.Gauge -- 2.3.0