From b55015fb9f99e5c05b4e875aea2ba0f526003487 Mon Sep 17 00:00:00 2001 From: Yasuhiro Matsuda Date: Wed, 27 May 2015 13:57:45 -0700 Subject: [PATCH 1/6] handle a race condition in TimerTaskEntry.remove --- core/src/main/scala/kafka/utils/timer/TimerTaskList.scala | 11 +++++++++-- 1 file changed, 9 insertions(+), 2 deletions(-) diff --git a/core/src/main/scala/kafka/utils/timer/TimerTaskList.scala b/core/src/main/scala/kafka/utils/timer/TimerTaskList.scala index e7a9657..c5f2ecc 100644 --- a/core/src/main/scala/kafka/utils/timer/TimerTaskList.scala +++ b/core/src/main/scala/kafka/utils/timer/TimerTaskList.scala @@ -61,6 +61,9 @@ private[timer] class TimerTaskList(taskCounter: AtomicInteger) extends Delayed { // Add a timer task entry to this list def add(timerTaskEntry: TimerTaskEntry): Unit = { synchronized { + // remove the timer task entry if it is already in any other list + timerTaskEntry.remove() + // put the timer task entry to the end of the list. (root.prev points to the tail entry) val tail = root.prev timerTaskEntry.next = root @@ -75,7 +78,7 @@ private[timer] class TimerTaskList(taskCounter: AtomicInteger) extends Delayed { // Remove the specified timer task entry from this list def remove(timerTaskEntry: TimerTaskEntry): Unit = { synchronized { - if (timerTaskEntry.list != null) { + if (timerTaskEntry.list eq this) { timerTaskEntry.next.prev = timerTaskEntry.prev timerTaskEntry.prev.next = timerTaskEntry.next timerTaskEntry.next = null @@ -116,6 +119,7 @@ private[timer] class TimerTaskList(taskCounter: AtomicInteger) extends Delayed { private[timer] class TimerTaskEntry(val timerTask: TimerTask) { + @volatile var list: TimerTaskList = null var next: TimerTaskEntry = null var prev: TimerTaskEntry = null @@ -125,7 +129,10 @@ private[timer] class TimerTaskEntry(val timerTask: TimerTask) { if (timerTask != null) timerTask.setTimerTaskEntry(this) def remove(): Unit = { - if (list != null) list.remove(this) + // try until success + while (list != null) { + list.remove(this) + } } } -- 2.3.0 From 2c43f1a610b74ccde396690d765a5e4fcf8d0ec0 Mon Sep 17 00:00:00 2001 From: Yasuhiro Matsuda Date: Thu, 28 May 2015 17:15:26 -0700 Subject: [PATCH 2/6] add cancel flag to timer entry, add sync on entry --- core/src/main/scala/kafka/utils/timer/Timer.scala | 4 +- .../main/scala/kafka/utils/timer/TimerTask.scala | 4 +- .../scala/kafka/utils/timer/TimerTaskList.scala | 72 ++++++++++++++-------- .../main/scala/kafka/utils/timer/TimingWheel.scala | 6 +- 4 files changed, 57 insertions(+), 29 deletions(-) diff --git a/core/src/main/scala/kafka/utils/timer/Timer.scala b/core/src/main/scala/kafka/utils/timer/Timer.scala index b8cde82..bb64954 100644 --- a/core/src/main/scala/kafka/utils/timer/Timer.scala +++ b/core/src/main/scala/kafka/utils/timer/Timer.scala @@ -52,7 +52,9 @@ class Timer(taskExecutor: ExecutorService, tickMs: Long = 1, wheelSize: Int = 20 private def addTimerTaskEntry(timerTaskEntry: TimerTaskEntry): Unit = { if (!timingWheel.add(timerTaskEntry)) { // already expired - taskExecutor.submit(timerTaskEntry.timerTask) + if (!timerTaskEntry.canceled) { + taskExecutor.submit(timerTaskEntry.timerTask) + } } } diff --git a/core/src/main/scala/kafka/utils/timer/TimerTask.scala b/core/src/main/scala/kafka/utils/timer/TimerTask.scala index 3407138..8a3c169 100644 --- a/core/src/main/scala/kafka/utils/timer/TimerTask.scala +++ b/core/src/main/scala/kafka/utils/timer/TimerTask.scala @@ -24,7 +24,9 @@ trait TimerTask extends Runnable { def cancel(): Unit = { synchronized { - if (timerTaskEntry != null) timerTaskEntry.remove() + if (timerTaskEntry != null) { + timerTaskEntry.cancel() + } timerTaskEntry = null } } diff --git a/core/src/main/scala/kafka/utils/timer/TimerTaskList.scala b/core/src/main/scala/kafka/utils/timer/TimerTaskList.scala index c5f2ecc..b3f555d 100644 --- a/core/src/main/scala/kafka/utils/timer/TimerTaskList.scala +++ b/core/src/main/scala/kafka/utils/timer/TimerTaskList.scala @@ -52,7 +52,9 @@ private[timer] class TimerTaskList(taskCounter: AtomicInteger) extends Delayed { var entry = root.next while (entry ne root) { val nextEntry = entry.next - f(entry.timerTask) + if (!entry.canceled) { + f(entry.timerTask) + } entry = nextEntry } } @@ -61,30 +63,37 @@ private[timer] class TimerTaskList(taskCounter: AtomicInteger) extends Delayed { // Add a timer task entry to this list def add(timerTaskEntry: TimerTaskEntry): Unit = { synchronized { - // remove the timer task entry if it is already in any other list - timerTaskEntry.remove() - - // put the timer task entry to the end of the list. (root.prev points to the tail entry) - val tail = root.prev - timerTaskEntry.next = root - timerTaskEntry.prev = tail - timerTaskEntry.list = this - tail.next = timerTaskEntry - root.prev = timerTaskEntry - taskCounter.incrementAndGet() + timerTaskEntry.synchronized { + // remove the timer task entry if it is already in any other list + timerTaskEntry.remove() + + // put the timer task entry to the end of the list. (root.prev points to the tail entry) + val tail = root.prev + timerTaskEntry.next = root + timerTaskEntry.prev = tail + timerTaskEntry.list = this + tail.next = timerTaskEntry + root.prev = timerTaskEntry + taskCounter.incrementAndGet() + } } } // Remove the specified timer task entry from this list - def remove(timerTaskEntry: TimerTaskEntry): Unit = { + def remove(timerTaskEntry: TimerTaskEntry): Boolean = { synchronized { - if (timerTaskEntry.list eq this) { - timerTaskEntry.next.prev = timerTaskEntry.prev - timerTaskEntry.prev.next = timerTaskEntry.next - timerTaskEntry.next = null - timerTaskEntry.prev = null - timerTaskEntry.list = null - taskCounter.decrementAndGet() + timerTaskEntry.synchronized { + if (timerTaskEntry.list eq this) { + timerTaskEntry.next.prev = timerTaskEntry.prev + timerTaskEntry.prev.next = timerTaskEntry.next + timerTaskEntry.next = null + timerTaskEntry.prev = null + timerTaskEntry.list = null + taskCounter.decrementAndGet() + true + } else { + false + } } } } @@ -94,8 +103,9 @@ private[timer] class TimerTaskList(taskCounter: AtomicInteger) extends Delayed { synchronized { var head = root.next while (head ne root) { - remove(head) - f(head) + if (remove(head)) { + f(head) + } head = root.next } expiration.set(-1L) @@ -123,15 +133,27 @@ private[timer] class TimerTaskEntry(val timerTask: TimerTask) { var list: TimerTaskList = null var next: TimerTaskEntry = null var prev: TimerTaskEntry = null + @volatile + var canceled: Boolean = false // if this timerTask is already held by an existing timer task entry, // setTimerTaskEntry will remove it. if (timerTask != null) timerTask.setTimerTaskEntry(this) + def cancel(): Unit = { + canceled = true + remove() + } + def remove(): Unit = { - // try until success - while (list != null) { - list.remove(this) + var currentList = list + // If remove is called when another thread is moving the entry from a task entry list to another, + // this may fail to remove the entry due to the change of value of list. Thus, we retry until the list becomes null. + // In a rare case, this thread sees null and exits the loop, but the other thread insert the entry to another list later. + // A canceled task removed by calling cancel() to prevent it. + while (currentList != null) { + currentList.remove(this) + currentList = list } } diff --git a/core/src/main/scala/kafka/utils/timer/TimingWheel.scala b/core/src/main/scala/kafka/utils/timer/TimingWheel.scala index e92aba3..6dc96d5 100644 --- a/core/src/main/scala/kafka/utils/timer/TimingWheel.scala +++ b/core/src/main/scala/kafka/utils/timer/TimingWheel.scala @@ -125,8 +125,10 @@ private[timer] class TimingWheel(tickMs: Long, wheelSize: Int, startMs: Long, ta def add(timerTaskEntry: TimerTaskEntry): Boolean = { val expiration = timerTaskEntry.timerTask.expirationMs - if (expiration < currentTime + tickMs) { - // Already expired + if (timerTaskEntry.canceled) { + false + } else if (expiration < currentTime + tickMs) { + // Already expired or canceled false } else if (expiration < currentTime + interval) { // Put in its own bucket -- 2.3.0 From c7df88b5db8df0a8089f01d6d48b63827843bbed Mon Sep 17 00:00:00 2001 From: Yasuhiro Matsuda Date: Fri, 29 May 2015 10:48:39 -0700 Subject: [PATCH 3/6] fix comments, revert remove() from boolean to unit --- core/src/main/scala/kafka/utils/timer/Timer.scala | 2 +- core/src/main/scala/kafka/utils/timer/TimerTaskList.scala | 12 ++++-------- core/src/main/scala/kafka/utils/timer/TimingWheel.scala | 3 ++- 3 files changed, 7 insertions(+), 10 deletions(-) diff --git a/core/src/main/scala/kafka/utils/timer/Timer.scala b/core/src/main/scala/kafka/utils/timer/Timer.scala index bb64954..0bd3990 100644 --- a/core/src/main/scala/kafka/utils/timer/Timer.scala +++ b/core/src/main/scala/kafka/utils/timer/Timer.scala @@ -51,7 +51,7 @@ class Timer(taskExecutor: ExecutorService, tickMs: Long = 1, wheelSize: Int = 20 private def addTimerTaskEntry(timerTaskEntry: TimerTaskEntry): Unit = { if (!timingWheel.add(timerTaskEntry)) { - // already expired + // Already expired or canceled if (!timerTaskEntry.canceled) { taskExecutor.submit(timerTaskEntry.timerTask) } diff --git a/core/src/main/scala/kafka/utils/timer/TimerTaskList.scala b/core/src/main/scala/kafka/utils/timer/TimerTaskList.scala index b3f555d..d120b28 100644 --- a/core/src/main/scala/kafka/utils/timer/TimerTaskList.scala +++ b/core/src/main/scala/kafka/utils/timer/TimerTaskList.scala @@ -80,7 +80,7 @@ private[timer] class TimerTaskList(taskCounter: AtomicInteger) extends Delayed { } // Remove the specified timer task entry from this list - def remove(timerTaskEntry: TimerTaskEntry): Boolean = { + def remove(timerTaskEntry: TimerTaskEntry): Unit = { synchronized { timerTaskEntry.synchronized { if (timerTaskEntry.list eq this) { @@ -90,9 +90,6 @@ private[timer] class TimerTaskList(taskCounter: AtomicInteger) extends Delayed { timerTaskEntry.prev = null timerTaskEntry.list = null taskCounter.decrementAndGet() - true - } else { - false } } } @@ -103,9 +100,8 @@ private[timer] class TimerTaskList(taskCounter: AtomicInteger) extends Delayed { synchronized { var head = root.next while (head ne root) { - if (remove(head)) { - f(head) - } + remove(head) + f(head) head = root.next } expiration.set(-1L) @@ -150,7 +146,7 @@ private[timer] class TimerTaskEntry(val timerTask: TimerTask) { // If remove is called when another thread is moving the entry from a task entry list to another, // this may fail to remove the entry due to the change of value of list. Thus, we retry until the list becomes null. // In a rare case, this thread sees null and exits the loop, but the other thread insert the entry to another list later. - // A canceled task removed by calling cancel() to prevent it. + // To cancel a task, it should be removed by calling cancel() to prevent it from reinsert. while (currentList != null) { currentList.remove(this) currentList = list diff --git a/core/src/main/scala/kafka/utils/timer/TimingWheel.scala b/core/src/main/scala/kafka/utils/timer/TimingWheel.scala index 6dc96d5..d8f9165 100644 --- a/core/src/main/scala/kafka/utils/timer/TimingWheel.scala +++ b/core/src/main/scala/kafka/utils/timer/TimingWheel.scala @@ -126,9 +126,10 @@ private[timer] class TimingWheel(tickMs: Long, wheelSize: Int, startMs: Long, ta val expiration = timerTaskEntry.timerTask.expirationMs if (timerTaskEntry.canceled) { + // Canceled false } else if (expiration < currentTime + tickMs) { - // Already expired or canceled + // Already expired false } else if (expiration < currentTime + interval) { // Put in its own bucket -- 2.3.0 From 5f3df2957e75e1755e580ed947d82c2f9c1c9e3c Mon Sep 17 00:00:00 2001 From: Yasuhiro Matsuda Date: Fri, 29 May 2015 14:10:40 -0700 Subject: [PATCH 4/6] removed the cancel flag, check which entry is pointed by the task instead --- core/src/main/scala/kafka/utils/timer/Timer.scala | 5 +-- .../main/scala/kafka/utils/timer/TimerTask.scala | 13 +++--- .../scala/kafka/utils/timer/TimerTaskList.scala | 46 ++++++++++++---------- .../main/scala/kafka/utils/timer/TimingWheel.scala | 4 +- 4 files changed, 37 insertions(+), 31 deletions(-) diff --git a/core/src/main/scala/kafka/utils/timer/Timer.scala b/core/src/main/scala/kafka/utils/timer/Timer.scala index 0bd3990..bdd0e75 100644 --- a/core/src/main/scala/kafka/utils/timer/Timer.scala +++ b/core/src/main/scala/kafka/utils/timer/Timer.scala @@ -51,10 +51,9 @@ class Timer(taskExecutor: ExecutorService, tickMs: Long = 1, wheelSize: Int = 20 private def addTimerTaskEntry(timerTaskEntry: TimerTaskEntry): Unit = { if (!timingWheel.add(timerTaskEntry)) { - // Already expired or canceled - if (!timerTaskEntry.canceled) { + // Already expired or cancelled + if (!timerTaskEntry.cancelled) taskExecutor.submit(timerTaskEntry.timerTask) - } } } diff --git a/core/src/main/scala/kafka/utils/timer/TimerTask.scala b/core/src/main/scala/kafka/utils/timer/TimerTask.scala index 8a3c169..edbb033 100644 --- a/core/src/main/scala/kafka/utils/timer/TimerTask.scala +++ b/core/src/main/scala/kafka/utils/timer/TimerTask.scala @@ -24,9 +24,6 @@ trait TimerTask extends Runnable { def cancel(): Unit = { synchronized { - if (timerTaskEntry != null) { - timerTaskEntry.cancel() - } timerTaskEntry = null } } @@ -35,11 +32,17 @@ trait TimerTask extends Runnable { synchronized { // if this timerTask is already held by an existing timer task entry, // we will remove such an entry first. - if (timerTaskEntry != null && timerTaskEntry != entry) { + if (timerTaskEntry != null && timerTaskEntry != entry) timerTaskEntry.remove() - } + timerTaskEntry = entry } } + private[timer] def isHeldBy(entry: TimerTaskEntry): Boolean = { + synchronized { + entry == timerTaskEntry + } + } + } diff --git a/core/src/main/scala/kafka/utils/timer/TimerTaskList.scala b/core/src/main/scala/kafka/utils/timer/TimerTaskList.scala index d120b28..1393ec5 100644 --- a/core/src/main/scala/kafka/utils/timer/TimerTaskList.scala +++ b/core/src/main/scala/kafka/utils/timer/TimerTaskList.scala @@ -52,9 +52,9 @@ private[timer] class TimerTaskList(taskCounter: AtomicInteger) extends Delayed { var entry = root.next while (entry ne root) { val nextEntry = entry.next - if (!entry.canceled) { - f(entry.timerTask) - } + + if (!entry.cancelled) f(entry.timerTask) + entry = nextEntry } } @@ -62,19 +62,26 @@ private[timer] class TimerTaskList(taskCounter: AtomicInteger) extends Delayed { // Add a timer task entry to this list def add(timerTaskEntry: TimerTaskEntry): Unit = { - synchronized { - timerTaskEntry.synchronized { - // remove the timer task entry if it is already in any other list - timerTaskEntry.remove() - - // put the timer task entry to the end of the list. (root.prev points to the tail entry) - val tail = root.prev - timerTaskEntry.next = root - timerTaskEntry.prev = tail - timerTaskEntry.list = this - tail.next = timerTaskEntry - root.prev = timerTaskEntry - taskCounter.incrementAndGet() + var done = false + while (!done) { + // remove the timer task entry if it is already in any other list + // we do this outside of the sync block below to avoid deadlocking. + timerTaskEntry.remove() + + synchronized { + timerTaskEntry.synchronized { + if (timerTaskEntry.list == null) { + // put the timer task entry to the end of the list. (root.prev points to the tail entry) + val tail = root.prev + timerTaskEntry.next = root + timerTaskEntry.prev = tail + timerTaskEntry.list = this + tail.next = timerTaskEntry + root.prev = timerTaskEntry + taskCounter.incrementAndGet() + done = true + } + } } } } @@ -129,16 +136,13 @@ private[timer] class TimerTaskEntry(val timerTask: TimerTask) { var list: TimerTaskList = null var next: TimerTaskEntry = null var prev: TimerTaskEntry = null - @volatile - var canceled: Boolean = false // if this timerTask is already held by an existing timer task entry, // setTimerTaskEntry will remove it. if (timerTask != null) timerTask.setTimerTaskEntry(this) - def cancel(): Unit = { - canceled = true - remove() + def cancelled: Boolean = { + !timerTask.isHeldBy(this) } def remove(): Unit = { diff --git a/core/src/main/scala/kafka/utils/timer/TimingWheel.scala b/core/src/main/scala/kafka/utils/timer/TimingWheel.scala index d8f9165..f5b6efe 100644 --- a/core/src/main/scala/kafka/utils/timer/TimingWheel.scala +++ b/core/src/main/scala/kafka/utils/timer/TimingWheel.scala @@ -125,8 +125,8 @@ private[timer] class TimingWheel(tickMs: Long, wheelSize: Int, startMs: Long, ta def add(timerTaskEntry: TimerTaskEntry): Boolean = { val expiration = timerTaskEntry.timerTask.expirationMs - if (timerTaskEntry.canceled) { - // Canceled + if (timerTaskEntry.cancelled) { + // Cancelled false } else if (expiration < currentTime + tickMs) { // Already expired -- 2.3.0 From 3148f764e26c69ee4ff474db1b129325c0420e28 Mon Sep 17 00:00:00 2001 From: Yasuhiro Matsuda Date: Fri, 29 May 2015 15:02:16 -0700 Subject: [PATCH 5/6] removed the cancel flag, check which entry is pointed by the task instead --- core/src/main/scala/kafka/utils/timer/TimerTask.scala | 7 +++---- core/src/main/scala/kafka/utils/timer/TimerTaskList.scala | 7 ++++--- 2 files changed, 7 insertions(+), 7 deletions(-) diff --git a/core/src/main/scala/kafka/utils/timer/TimerTask.scala b/core/src/main/scala/kafka/utils/timer/TimerTask.scala index edbb033..d6b3a2e 100644 --- a/core/src/main/scala/kafka/utils/timer/TimerTask.scala +++ b/core/src/main/scala/kafka/utils/timer/TimerTask.scala @@ -24,6 +24,7 @@ trait TimerTask extends Runnable { def cancel(): Unit = { synchronized { + if (timerTaskEntry != null) timerTaskEntry.remove() timerTaskEntry = null } } @@ -39,10 +40,8 @@ trait TimerTask extends Runnable { } } - private[timer] def isHeldBy(entry: TimerTaskEntry): Boolean = { - synchronized { - entry == timerTaskEntry - } + private[timer] def getTimerTaskEntry(): TimerTaskEntry = { + timerTaskEntry } } diff --git a/core/src/main/scala/kafka/utils/timer/TimerTaskList.scala b/core/src/main/scala/kafka/utils/timer/TimerTaskList.scala index 1393ec5..9b5ac26 100644 --- a/core/src/main/scala/kafka/utils/timer/TimerTaskList.scala +++ b/core/src/main/scala/kafka/utils/timer/TimerTaskList.scala @@ -64,8 +64,9 @@ private[timer] class TimerTaskList(taskCounter: AtomicInteger) extends Delayed { def add(timerTaskEntry: TimerTaskEntry): Unit = { var done = false while (!done) { - // remove the timer task entry if it is already in any other list - // we do this outside of the sync block below to avoid deadlocking. + // Remove the timer task entry if it is already in any other list + // We do this outside of the sync block below to avoid deadlocking. + // We may retry until timerTaskEntry.list becomes null. timerTaskEntry.remove() synchronized { @@ -142,7 +143,7 @@ private[timer] class TimerTaskEntry(val timerTask: TimerTask) { if (timerTask != null) timerTask.setTimerTaskEntry(this) def cancelled: Boolean = { - !timerTask.isHeldBy(this) + timerTask.getTimerTaskEntry != this } def remove(): Unit = { -- 2.3.0 From 747498c1882af243246d86d2325b40f47440a3ae Mon Sep 17 00:00:00 2001 From: Yasuhiro Matsuda Date: Fri, 29 May 2015 15:10:09 -0700 Subject: [PATCH 6/6] removed the cancel flag, check which entry is pointed by the task instead --- core/src/main/scala/kafka/utils/timer/TimerTaskList.scala | 1 - 1 file changed, 1 deletion(-) diff --git a/core/src/main/scala/kafka/utils/timer/TimerTaskList.scala b/core/src/main/scala/kafka/utils/timer/TimerTaskList.scala index 9b5ac26..c4aeb5d 100644 --- a/core/src/main/scala/kafka/utils/timer/TimerTaskList.scala +++ b/core/src/main/scala/kafka/utils/timer/TimerTaskList.scala @@ -151,7 +151,6 @@ private[timer] class TimerTaskEntry(val timerTask: TimerTask) { // If remove is called when another thread is moving the entry from a task entry list to another, // this may fail to remove the entry due to the change of value of list. Thus, we retry until the list becomes null. // In a rare case, this thread sees null and exits the loop, but the other thread insert the entry to another list later. - // To cancel a task, it should be removed by calling cancel() to prevent it from reinsert. while (currentList != null) { currentList.remove(this) currentList = list -- 2.3.0