From b55015fb9f99e5c05b4e875aea2ba0f526003487 Mon Sep 17 00:00:00 2001 From: Yasuhiro Matsuda Date: Wed, 27 May 2015 13:57:45 -0700 Subject: [PATCH 1/3] handle a race condition in TimerTaskEntry.remove --- core/src/main/scala/kafka/utils/timer/TimerTaskList.scala | 11 +++++++++-- 1 file changed, 9 insertions(+), 2 deletions(-) diff --git a/core/src/main/scala/kafka/utils/timer/TimerTaskList.scala b/core/src/main/scala/kafka/utils/timer/TimerTaskList.scala index e7a9657..c5f2ecc 100644 --- a/core/src/main/scala/kafka/utils/timer/TimerTaskList.scala +++ b/core/src/main/scala/kafka/utils/timer/TimerTaskList.scala @@ -61,6 +61,9 @@ private[timer] class TimerTaskList(taskCounter: AtomicInteger) extends Delayed { // Add a timer task entry to this list def add(timerTaskEntry: TimerTaskEntry): Unit = { synchronized { + // remove the timer task entry if it is already in any other list + timerTaskEntry.remove() + // put the timer task entry to the end of the list. (root.prev points to the tail entry) val tail = root.prev timerTaskEntry.next = root @@ -75,7 +78,7 @@ private[timer] class TimerTaskList(taskCounter: AtomicInteger) extends Delayed { // Remove the specified timer task entry from this list def remove(timerTaskEntry: TimerTaskEntry): Unit = { synchronized { - if (timerTaskEntry.list != null) { + if (timerTaskEntry.list eq this) { timerTaskEntry.next.prev = timerTaskEntry.prev timerTaskEntry.prev.next = timerTaskEntry.next timerTaskEntry.next = null @@ -116,6 +119,7 @@ private[timer] class TimerTaskList(taskCounter: AtomicInteger) extends Delayed { private[timer] class TimerTaskEntry(val timerTask: TimerTask) { + @volatile var list: TimerTaskList = null var next: TimerTaskEntry = null var prev: TimerTaskEntry = null @@ -125,7 +129,10 @@ private[timer] class TimerTaskEntry(val timerTask: TimerTask) { if (timerTask != null) timerTask.setTimerTaskEntry(this) def remove(): Unit = { - if (list != null) list.remove(this) + // try until success + while (list != null) { + list.remove(this) + } } } -- 2.3.0 From 2c43f1a610b74ccde396690d765a5e4fcf8d0ec0 Mon Sep 17 00:00:00 2001 From: Yasuhiro Matsuda Date: Thu, 28 May 2015 17:15:26 -0700 Subject: [PATCH 2/3] add cancel flag to timer entry, add sync on entry --- core/src/main/scala/kafka/utils/timer/Timer.scala | 4 +- .../main/scala/kafka/utils/timer/TimerTask.scala | 4 +- .../scala/kafka/utils/timer/TimerTaskList.scala | 72 ++++++++++++++-------- .../main/scala/kafka/utils/timer/TimingWheel.scala | 6 +- 4 files changed, 57 insertions(+), 29 deletions(-) diff --git a/core/src/main/scala/kafka/utils/timer/Timer.scala b/core/src/main/scala/kafka/utils/timer/Timer.scala index b8cde82..bb64954 100644 --- a/core/src/main/scala/kafka/utils/timer/Timer.scala +++ b/core/src/main/scala/kafka/utils/timer/Timer.scala @@ -52,7 +52,9 @@ class Timer(taskExecutor: ExecutorService, tickMs: Long = 1, wheelSize: Int = 20 private def addTimerTaskEntry(timerTaskEntry: TimerTaskEntry): Unit = { if (!timingWheel.add(timerTaskEntry)) { // already expired - taskExecutor.submit(timerTaskEntry.timerTask) + if (!timerTaskEntry.canceled) { + taskExecutor.submit(timerTaskEntry.timerTask) + } } } diff --git a/core/src/main/scala/kafka/utils/timer/TimerTask.scala b/core/src/main/scala/kafka/utils/timer/TimerTask.scala index 3407138..8a3c169 100644 --- a/core/src/main/scala/kafka/utils/timer/TimerTask.scala +++ b/core/src/main/scala/kafka/utils/timer/TimerTask.scala @@ -24,7 +24,9 @@ trait TimerTask extends Runnable { def cancel(): Unit = { synchronized { - if (timerTaskEntry != null) timerTaskEntry.remove() + if (timerTaskEntry != null) { + timerTaskEntry.cancel() + } timerTaskEntry = null } } diff --git a/core/src/main/scala/kafka/utils/timer/TimerTaskList.scala b/core/src/main/scala/kafka/utils/timer/TimerTaskList.scala index c5f2ecc..b3f555d 100644 --- a/core/src/main/scala/kafka/utils/timer/TimerTaskList.scala +++ b/core/src/main/scala/kafka/utils/timer/TimerTaskList.scala @@ -52,7 +52,9 @@ private[timer] class TimerTaskList(taskCounter: AtomicInteger) extends Delayed { var entry = root.next while (entry ne root) { val nextEntry = entry.next - f(entry.timerTask) + if (!entry.canceled) { + f(entry.timerTask) + } entry = nextEntry } } @@ -61,30 +63,37 @@ private[timer] class TimerTaskList(taskCounter: AtomicInteger) extends Delayed { // Add a timer task entry to this list def add(timerTaskEntry: TimerTaskEntry): Unit = { synchronized { - // remove the timer task entry if it is already in any other list - timerTaskEntry.remove() - - // put the timer task entry to the end of the list. (root.prev points to the tail entry) - val tail = root.prev - timerTaskEntry.next = root - timerTaskEntry.prev = tail - timerTaskEntry.list = this - tail.next = timerTaskEntry - root.prev = timerTaskEntry - taskCounter.incrementAndGet() + timerTaskEntry.synchronized { + // remove the timer task entry if it is already in any other list + timerTaskEntry.remove() + + // put the timer task entry to the end of the list. (root.prev points to the tail entry) + val tail = root.prev + timerTaskEntry.next = root + timerTaskEntry.prev = tail + timerTaskEntry.list = this + tail.next = timerTaskEntry + root.prev = timerTaskEntry + taskCounter.incrementAndGet() + } } } // Remove the specified timer task entry from this list - def remove(timerTaskEntry: TimerTaskEntry): Unit = { + def remove(timerTaskEntry: TimerTaskEntry): Boolean = { synchronized { - if (timerTaskEntry.list eq this) { - timerTaskEntry.next.prev = timerTaskEntry.prev - timerTaskEntry.prev.next = timerTaskEntry.next - timerTaskEntry.next = null - timerTaskEntry.prev = null - timerTaskEntry.list = null - taskCounter.decrementAndGet() + timerTaskEntry.synchronized { + if (timerTaskEntry.list eq this) { + timerTaskEntry.next.prev = timerTaskEntry.prev + timerTaskEntry.prev.next = timerTaskEntry.next + timerTaskEntry.next = null + timerTaskEntry.prev = null + timerTaskEntry.list = null + taskCounter.decrementAndGet() + true + } else { + false + } } } } @@ -94,8 +103,9 @@ private[timer] class TimerTaskList(taskCounter: AtomicInteger) extends Delayed { synchronized { var head = root.next while (head ne root) { - remove(head) - f(head) + if (remove(head)) { + f(head) + } head = root.next } expiration.set(-1L) @@ -123,15 +133,27 @@ private[timer] class TimerTaskEntry(val timerTask: TimerTask) { var list: TimerTaskList = null var next: TimerTaskEntry = null var prev: TimerTaskEntry = null + @volatile + var canceled: Boolean = false // if this timerTask is already held by an existing timer task entry, // setTimerTaskEntry will remove it. if (timerTask != null) timerTask.setTimerTaskEntry(this) + def cancel(): Unit = { + canceled = true + remove() + } + def remove(): Unit = { - // try until success - while (list != null) { - list.remove(this) + var currentList = list + // If remove is called when another thread is moving the entry from a task entry list to another, + // this may fail to remove the entry due to the change of value of list. Thus, we retry until the list becomes null. + // In a rare case, this thread sees null and exits the loop, but the other thread insert the entry to another list later. + // A canceled task removed by calling cancel() to prevent it. + while (currentList != null) { + currentList.remove(this) + currentList = list } } diff --git a/core/src/main/scala/kafka/utils/timer/TimingWheel.scala b/core/src/main/scala/kafka/utils/timer/TimingWheel.scala index e92aba3..6dc96d5 100644 --- a/core/src/main/scala/kafka/utils/timer/TimingWheel.scala +++ b/core/src/main/scala/kafka/utils/timer/TimingWheel.scala @@ -125,8 +125,10 @@ private[timer] class TimingWheel(tickMs: Long, wheelSize: Int, startMs: Long, ta def add(timerTaskEntry: TimerTaskEntry): Boolean = { val expiration = timerTaskEntry.timerTask.expirationMs - if (expiration < currentTime + tickMs) { - // Already expired + if (timerTaskEntry.canceled) { + false + } else if (expiration < currentTime + tickMs) { + // Already expired or canceled false } else if (expiration < currentTime + interval) { // Put in its own bucket -- 2.3.0 From c7df88b5db8df0a8089f01d6d48b63827843bbed Mon Sep 17 00:00:00 2001 From: Yasuhiro Matsuda Date: Fri, 29 May 2015 10:48:39 -0700 Subject: [PATCH 3/3] fix comments, revert remove() from boolean to unit --- core/src/main/scala/kafka/utils/timer/Timer.scala | 2 +- core/src/main/scala/kafka/utils/timer/TimerTaskList.scala | 12 ++++-------- core/src/main/scala/kafka/utils/timer/TimingWheel.scala | 3 ++- 3 files changed, 7 insertions(+), 10 deletions(-) diff --git a/core/src/main/scala/kafka/utils/timer/Timer.scala b/core/src/main/scala/kafka/utils/timer/Timer.scala index bb64954..0bd3990 100644 --- a/core/src/main/scala/kafka/utils/timer/Timer.scala +++ b/core/src/main/scala/kafka/utils/timer/Timer.scala @@ -51,7 +51,7 @@ class Timer(taskExecutor: ExecutorService, tickMs: Long = 1, wheelSize: Int = 20 private def addTimerTaskEntry(timerTaskEntry: TimerTaskEntry): Unit = { if (!timingWheel.add(timerTaskEntry)) { - // already expired + // Already expired or canceled if (!timerTaskEntry.canceled) { taskExecutor.submit(timerTaskEntry.timerTask) } diff --git a/core/src/main/scala/kafka/utils/timer/TimerTaskList.scala b/core/src/main/scala/kafka/utils/timer/TimerTaskList.scala index b3f555d..d120b28 100644 --- a/core/src/main/scala/kafka/utils/timer/TimerTaskList.scala +++ b/core/src/main/scala/kafka/utils/timer/TimerTaskList.scala @@ -80,7 +80,7 @@ private[timer] class TimerTaskList(taskCounter: AtomicInteger) extends Delayed { } // Remove the specified timer task entry from this list - def remove(timerTaskEntry: TimerTaskEntry): Boolean = { + def remove(timerTaskEntry: TimerTaskEntry): Unit = { synchronized { timerTaskEntry.synchronized { if (timerTaskEntry.list eq this) { @@ -90,9 +90,6 @@ private[timer] class TimerTaskList(taskCounter: AtomicInteger) extends Delayed { timerTaskEntry.prev = null timerTaskEntry.list = null taskCounter.decrementAndGet() - true - } else { - false } } } @@ -103,9 +100,8 @@ private[timer] class TimerTaskList(taskCounter: AtomicInteger) extends Delayed { synchronized { var head = root.next while (head ne root) { - if (remove(head)) { - f(head) - } + remove(head) + f(head) head = root.next } expiration.set(-1L) @@ -150,7 +146,7 @@ private[timer] class TimerTaskEntry(val timerTask: TimerTask) { // If remove is called when another thread is moving the entry from a task entry list to another, // this may fail to remove the entry due to the change of value of list. Thus, we retry until the list becomes null. // In a rare case, this thread sees null and exits the loop, but the other thread insert the entry to another list later. - // A canceled task removed by calling cancel() to prevent it. + // To cancel a task, it should be removed by calling cancel() to prevent it from reinsert. while (currentList != null) { currentList.remove(this) currentList = list diff --git a/core/src/main/scala/kafka/utils/timer/TimingWheel.scala b/core/src/main/scala/kafka/utils/timer/TimingWheel.scala index 6dc96d5..d8f9165 100644 --- a/core/src/main/scala/kafka/utils/timer/TimingWheel.scala +++ b/core/src/main/scala/kafka/utils/timer/TimingWheel.scala @@ -126,9 +126,10 @@ private[timer] class TimingWheel(tickMs: Long, wheelSize: Int, startMs: Long, ta val expiration = timerTaskEntry.timerTask.expirationMs if (timerTaskEntry.canceled) { + // Canceled false } else if (expiration < currentTime + tickMs) { - // Already expired or canceled + // Already expired false } else if (expiration < currentTime + interval) { // Put in its own bucket -- 2.3.0