From befc7c8b25ce8d9abfbd6c4791ee32df7533d1b4 Mon Sep 17 00:00:00 2001 From: Sriharsha Chintalapani Date: Sat, 25 Apr 2015 07:47:41 -0700 Subject: [PATCH 1/2] KAFKA-2150. FetcherThread backoff need to grab lock before wait on condition. --- core/src/main/scala/kafka/server/AbstractFetcherThread.scala | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/core/src/main/scala/kafka/server/AbstractFetcherThread.scala b/core/src/main/scala/kafka/server/AbstractFetcherThread.scala index a439046..34d61a9 100755 --- a/core/src/main/scala/kafka/server/AbstractFetcherThread.scala +++ b/core/src/main/scala/kafka/server/AbstractFetcherThread.scala @@ -92,7 +92,9 @@ abstract class AbstractFetcherThread(name: String, clientId: String, sourceBroke processFetchRequest(fetchRequest) else { trace("There are no active partitions. Back off for %d ms before sending a fetch request".format(fetchBackOffMs)) - partitionMapCond.await(fetchBackOffMs, TimeUnit.MILLISECONDS) + inLock(partitionMapLock) { + partitionMapCond.await(fetchBackOffMs, TimeUnit.MILLISECONDS) + } } } -- 2.3.2 (Apple Git-55) From 90ba0606353fcff0c7a78cd361f78e81b329cc60 Mon Sep 17 00:00:00 2001 From: Sriharsha Chintalapani Date: Sat, 25 Apr 2015 13:11:46 -0700 Subject: [PATCH 2/2] KAFKA-2150. FetcherThread backoff need to grab lock before wait on condition. --- .../main/scala/kafka/server/AbstractFetcherThread.scala | 14 +++++--------- 1 file changed, 5 insertions(+), 9 deletions(-) diff --git a/core/src/main/scala/kafka/server/AbstractFetcherThread.scala b/core/src/main/scala/kafka/server/AbstractFetcherThread.scala index 34d61a9..2e949d4 100755 --- a/core/src/main/scala/kafka/server/AbstractFetcherThread.scala +++ b/core/src/main/scala/kafka/server/AbstractFetcherThread.scala @@ -76,7 +76,7 @@ abstract class AbstractFetcherThread(name: String, clientId: String, sourceBroke } override def doWork() { - + var fetchRequest: FetchRequest = null inLock(partitionMapLock) { partitionMap.foreach { case((topicAndPartition, partitionFetchState)) => @@ -84,18 +84,14 @@ abstract class AbstractFetcherThread(name: String, clientId: String, sourceBroke fetchRequestBuilder.addFetch(topicAndPartition.topic, topicAndPartition.partition, partitionFetchState.offset, fetchSize) } - } - val fetchRequest = fetchRequestBuilder.build() - - if (!fetchRequest.requestInfo.isEmpty) - processFetchRequest(fetchRequest) - else { - trace("There are no active partitions. Back off for %d ms before sending a fetch request".format(fetchBackOffMs)) - inLock(partitionMapLock) { + fetchRequest = fetchRequestBuilder.build() + if (fetchRequest != null && fetchRequest.requestInfo.isEmpty) { + trace("There are no active partitions. Back off for %d ms before sending a fetch request".format(fetchBackOffMs)) partitionMapCond.await(fetchBackOffMs, TimeUnit.MILLISECONDS) } } + processFetchRequest(fetchRequest) } private def processFetchRequest(fetchRequest: FetchRequest) { -- 2.3.2 (Apple Git-55)