From b55015fb9f99e5c05b4e875aea2ba0f526003487 Mon Sep 17 00:00:00 2001 From: Yasuhiro Matsuda Date: Wed, 27 May 2015 13:57:45 -0700 Subject: [PATCH] handle a race condition in TimerTaskEntry.remove --- core/src/main/scala/kafka/utils/timer/TimerTaskList.scala | 11 +++++++++-- 1 file changed, 9 insertions(+), 2 deletions(-) diff --git a/core/src/main/scala/kafka/utils/timer/TimerTaskList.scala b/core/src/main/scala/kafka/utils/timer/TimerTaskList.scala index e7a9657..c5f2ecc 100644 --- a/core/src/main/scala/kafka/utils/timer/TimerTaskList.scala +++ b/core/src/main/scala/kafka/utils/timer/TimerTaskList.scala @@ -61,6 +61,9 @@ private[timer] class TimerTaskList(taskCounter: AtomicInteger) extends Delayed { // Add a timer task entry to this list def add(timerTaskEntry: TimerTaskEntry): Unit = { synchronized { + // remove the timer task entry if it is already in any other list + timerTaskEntry.remove() + // put the timer task entry to the end of the list. (root.prev points to the tail entry) val tail = root.prev timerTaskEntry.next = root @@ -75,7 +78,7 @@ private[timer] class TimerTaskList(taskCounter: AtomicInteger) extends Delayed { // Remove the specified timer task entry from this list def remove(timerTaskEntry: TimerTaskEntry): Unit = { synchronized { - if (timerTaskEntry.list != null) { + if (timerTaskEntry.list eq this) { timerTaskEntry.next.prev = timerTaskEntry.prev timerTaskEntry.prev.next = timerTaskEntry.next timerTaskEntry.next = null @@ -116,6 +119,7 @@ private[timer] class TimerTaskList(taskCounter: AtomicInteger) extends Delayed { private[timer] class TimerTaskEntry(val timerTask: TimerTask) { + @volatile var list: TimerTaskList = null var next: TimerTaskEntry = null var prev: TimerTaskEntry = null @@ -125,7 +129,10 @@ private[timer] class TimerTaskEntry(val timerTask: TimerTask) { if (timerTask != null) timerTask.setTimerTaskEntry(this) def remove(): Unit = { - if (list != null) list.remove(this) + // try until success + while (list != null) { + list.remove(this) + } } } -- 2.3.0