From 96df554d31cda31cd874876e3b6802910390b516 Mon Sep 17 00:00:00 2001 From: Jun Rao Date: Tue, 17 Feb 2015 21:27:23 -0800 Subject: [PATCH] kafka-1952 --- .../main/scala/kafka/server/DelayedOperation.scala | 34 +++++++++++++++------- 1 file changed, 23 insertions(+), 11 deletions(-) diff --git a/core/src/main/scala/kafka/server/DelayedOperation.scala b/core/src/main/scala/kafka/server/DelayedOperation.scala index fc06b01..1d11099 100644 --- a/core/src/main/scala/kafka/server/DelayedOperation.scala +++ b/core/src/main/scala/kafka/server/DelayedOperation.scala @@ -128,25 +128,37 @@ class DelayedOperationPurgatory[T <: DelayedOperation](brokerId: Int = 0, purgeI * @return true iff the delayed operations can be completed by the caller */ def tryCompleteElseWatch(operation: T, watchKeys: Seq[Any]): Boolean = { + assert(watchKeys.size > 0, "The watch key list can't be empty") + + // The cost of tryComplete() is typically proportional to the number of keys. Calling + // tryComplete() for each key is going to be expensive if there are many keys. Instead, + // we do the check in the following way. Call tryComplete(). If the operation is not completed, + // we just add the operation to all keys. Then we call tryComplete() again. At this time, if + // the operation is still not completed, we are guaranteed that it won't miss any future triggering + // event since the operation is already on the watcher list for all keys. This does mean that + // if the operation is completed (by another thread) between the two tryComplete() calls, the + // operation is unnecessarily added for watch. However, this is a less severe issue since the + // expire reaper will clean it up periodically. + + var isCompletedByMe = operation synchronized operation.tryComplete() + if (isCompletedByMe) + return true + for(key <- watchKeys) { - // if the operation is already completed, stopping adding it to - // any further lists and return false + // If the operation is already completed, stop adding it to the rest of the watcher list. if (operation.isCompleted()) return false val watchers = watchersFor(key) - // if the operation can by completed by myself, stop adding it to - // any further lists and return true immediately - if(operation synchronized operation.tryComplete()) { - return true - } else { - watchers.watch(operation) - } + watchers.watch(operation) } + isCompletedByMe = operation synchronized operation.tryComplete() + if (isCompletedByMe) + return true + // if it cannot be completed by now and hence is watched, add to the expire queue also - if (! operation.isCompleted()) { + if (! operation.isCompleted()) expirationReaper.enqueue(operation) - } false } -- 1.8.5.2 (Apple Git-48)