From 9f05286e3b13ed5233f8f6d9ff220e008765c8c5 Mon Sep 17 00:00:00 2001
From: Jun Rao
Date: Fri, 13 Feb 2015 17:47:17 -0800
Subject: [PATCH 1/2] KAFKA-1952; High CPU Usage in 0.8.2 release

---
 .../main/scala/kafka/server/RequestPurgatory.scala | 62 +++++++++++++---------
 1 file changed, 38 insertions(+), 24 deletions(-)

diff --git a/core/src/main/scala/kafka/server/RequestPurgatory.scala b/core/src/main/scala/kafka/server/RequestPurgatory.scala
index 9d76234..d6aa644 100644
--- a/core/src/main/scala/kafka/server/RequestPurgatory.scala
+++ b/core/src/main/scala/kafka/server/RequestPurgatory.scala
@@ -92,28 +92,51 @@ abstract class RequestPurgatory[T <: DelayedRequest](brokerId: Int = 0, purgeInt
   expirationThread.start()
 
   /**
+   * Is this request satisfied by the caller thread?
+   */
+  private def isSatisfiedByMe(delayedRequest: T): Boolean = {
+    if(delayedRequest.satisfied.compareAndSet(false, true))
+      return true
+    else
+      return false
+  }
+
+  /**
    * Try to add the request for watch on all keys. Return true iff the request is
    * satisfied and the satisfaction is done by the caller.
-   *
-   * Requests can be watched on only a few of the keys if it is found satisfied when
-   * trying to add it to each one of the keys. In this case the request is still treated as satisfied
-   * and hence no longer watched. Those already added elements will be later purged by the expire reaper.
    */
   def checkAndMaybeWatch(delayedRequest: T): Boolean = {
+    if (delayedRequest.keys.size <=0 )
+      return isSatisfiedByMe(delayedRequest)
+
+    // The cost of checkSatisfied() is typically proportional to the number of keys. Calling
+    // checkSatisfied() for each key is going to be expensive if there are many keys. Instead,
+    // we do the check in the following way. Call checkSatisfied(). If the request is not satisfied,
+    // we just add the request to all keys. Then we call checkSatisfied() again. At this time, if
+    // the request is still not satisfied, we are guaranteed that it won't miss any future triggering
+    // events since the request is already on the watcher list for all keys. This does mean that
+    // if the request is satisfied (by another thread) between the two checkSatisfied() calls, the
+    // request is unnecessarily added for watch. However, this is a less severe issue since the
+    // expire reaper will clean it up periodically.
+
+    var isSatisfied = delayedRequest synchronized checkSatisfied(delayedRequest)
+    if (isSatisfied)
+      return isSatisfiedByMe(delayedRequest)
+
     for(key <- delayedRequest.keys) {
       val lst = watchersFor(key)
-      if(!lst.checkAndMaybeAdd(delayedRequest)) {
-        if(delayedRequest.satisfied.compareAndSet(false, true))
-          return true
-        else
-          return false
-      }
+      lst.add(delayedRequest)
     }
 
-    // if it is indeed watched, add to the expire queue also
-    expiredRequestReaper.enqueue(delayedRequest)
+    isSatisfied = delayedRequest synchronized checkSatisfied(delayedRequest)
+    if (isSatisfied)
+      return isSatisfiedByMe(delayedRequest)
+    else {
+      // If the request is still not satisfied, add to the expire queue also.
+      expiredRequestReaper.enqueue(delayedRequest)
 
-    false
+      return false
+    }
   }
 
   /**
@@ -171,19 +194,10 @@ abstract class RequestPurgatory[T <: DelayedRequest](brokerId: Int = 0, purgeInt
     // return the size of the watch list
     def watched() = requests.size()
 
-    // potentially add the element to watch if it is not satisfied yet
-    def checkAndMaybeAdd(t: T): Boolean = {
+    // add the element to the watcher list
+    def add(t: T) {
       synchronized {
-        // if it is already satisfied, do not add to the watch list
-        if (t.satisfied.get)
-          return false
-        // synchronize on the delayed request to avoid any race condition
-        // with expire and update threads on client-side.
-        if(t synchronized checkSatisfied(t)) {
-          return false
-        }
         requests.add(t)
-        return true
       }
     }
 
-- 
1.8.5.2 (Apple Git-48)


From 4bfb7b23bd9d8974dbb5c1bfd1e32a3b61cdd93d Mon Sep 17 00:00:00 2001
From: Jun Rao
Date: Sun, 15 Feb 2015 15:25:07 -0800
Subject: [PATCH 2/2] address review comments

---
 core/src/main/scala/kafka/server/RequestPurgatory.scala | 17 +++++++++++++----
 1 file changed, 13 insertions(+), 4 deletions(-)

diff --git a/core/src/main/scala/kafka/server/RequestPurgatory.scala b/core/src/main/scala/kafka/server/RequestPurgatory.scala
index d6aa644..87ee3be 100644
--- a/core/src/main/scala/kafka/server/RequestPurgatory.scala
+++ b/core/src/main/scala/kafka/server/RequestPurgatory.scala
@@ -106,7 +106,7 @@ abstract class RequestPurgatory[T <: DelayedRequest](brokerId: Int = 0, purgeInt
    * satisfied and the satisfaction is done by the caller.
    */
   def checkAndMaybeWatch(delayedRequest: T): Boolean = {
-    if (delayedRequest.keys.size <=0 )
+    if (delayedRequest.keys.size <= 0)
       return isSatisfiedByMe(delayedRequest)
 
     // The cost of checkSatisfied() is typically proportional to the number of keys. Calling
@@ -125,7 +125,11 @@ abstract class RequestPurgatory[T <: DelayedRequest](brokerId: Int = 0, purgeInt
 
     for(key <- delayedRequest.keys) {
       val lst = watchersFor(key)
-      lst.add(delayedRequest)
+      if (!lst.addIfNotSatisfied(delayedRequest)) {
+        // The request is already satisfied by another thread. No need to watch for the rest of
+        // the keys.
+        return false
+      }
     }
 
     isSatisfied = delayedRequest synchronized checkSatisfied(delayedRequest)
@@ -194,11 +198,16 @@ abstract class RequestPurgatory[T <: DelayedRequest](brokerId: Int = 0, purgeInt
     // return the size of the watch list
     def watched() = requests.size()
 
-    // add the element to the watcher list
-    def add(t: T) {
+    // add the element to the watcher list if it's not already satisfied
+    def addIfNotSatisfied(t: T): Boolean = {
+      if (t.satisfied.get)
+        return false
+
       synchronized {
         requests.add(t)
       }
+
+      return true
     }
 
     // traverse the list and purge satisfied elements
-- 
1.8.5.2 (Apple Git-48)
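
Note (not part of the patches above): the comment added in checkAndMaybeWatch() describes a check / add-to-all-watchers / re-check pattern. The following is a minimal standalone Scala sketch of that pattern only; SketchPurgatory, DelayedOp and isSatisfiedNow are illustrative stand-ins, not the actual Kafka classes or APIs.

// Standalone sketch of the double-check pattern used by the patched checkAndMaybeWatch().
// All names below are hypothetical; only the control flow mirrors the patch's comment.
import java.util.concurrent.atomic.AtomicBoolean
import scala.collection.mutable

class DelayedOp(val keys: Seq[String]) {
  val satisfied = new AtomicBoolean(false)
}

class SketchPurgatory(isSatisfiedNow: DelayedOp => Boolean) {
  private val watchers = mutable.Map[String, mutable.ListBuffer[DelayedOp]]()

  // Returns true iff this caller is the one that completed the operation.
  def checkAndMaybeWatch(op: DelayedOp): Boolean = {
    // First check: a single satisfaction check instead of one check per key.
    if (isSatisfiedNow(op))
      return op.satisfied.compareAndSet(false, true)

    // Not satisfied yet: register the operation on every key so that no
    // trigger arriving after this point can be missed.
    synchronized {
      for (key <- op.keys)
        watchers.getOrElseUpdate(key, mutable.ListBuffer()) += op
    }

    // Second check: covers triggers that fired between the first check and registration.
    if (isSatisfiedNow(op))
      return op.satisfied.compareAndSet(false, true)

    // Left on the watcher lists; an expiration reaper would purge it eventually.
    false
  }
}

// Usage: an operation keyed on two partitions that happens to be immediately completable.
object SketchDemo extends App {
  val purgatory = new SketchPurgatory(_ => true)
  println(purgatory.checkAndMaybeWatch(new DelayedOp(Seq("topic-0", "topic-1")))) // prints true
}

The point of the design, per the patch comment, is that satisfaction is checked twice at fixed cost rather than once per key, trading an occasional unnecessary watch registration (cleaned up later by the reaper) for far less CPU when an operation has many keys.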