From 0e410db1e31f8e033d0be3c8b6f8addbadeb3105 Mon Sep 17 00:00:00 2001 From: Dong Lin Date: Tue, 2 Jun 2015 16:49:39 -0700 Subject: [PATCH] KAFKA-2241; AbstractFetcherThread.shutdown() should not block on ReadableByteChannel.read(buffer) --- .../main/scala/kafka/consumer/SimpleConsumer.scala | 9 ++++++++- .../scala/kafka/server/AbstractFetcherThread.scala | 2 +- 2 files changed, 9 insertions(+), 2 deletions(-) diff --git a/core/src/main/scala/kafka/consumer/SimpleConsumer.scala b/core/src/main/scala/kafka/consumer/SimpleConsumer.scala index 31a2639..edb2405 100644 --- a/core/src/main/scala/kafka/consumer/SimpleConsumer.scala +++ b/core/src/main/scala/kafka/consumer/SimpleConsumer.scala @@ -18,7 +18,7 @@ package kafka.consumer -import java.nio.channels.ClosedByInterruptException +import java.nio.channels.{AsynchronousCloseException, ClosedByInterruptException} import kafka.api._ import kafka.network._ @@ -58,6 +58,11 @@ class SimpleConsumer(val host: String, connect() } + def forceClose() = { + disconnect() + isClosed = true + } + def close() { lock synchronized { disconnect() @@ -75,6 +80,8 @@ class SimpleConsumer(val host: String, } catch { case e : ClosedByInterruptException => throw e + case e: AsynchronousCloseException => + throw e case e : Throwable => info("Reconnect due to socket error: %s".format(e.toString)) // retry once diff --git a/core/src/main/scala/kafka/server/AbstractFetcherThread.scala b/core/src/main/scala/kafka/server/AbstractFetcherThread.scala index 83fc474..381e1ae 100755 --- a/core/src/main/scala/kafka/server/AbstractFetcherThread.scala +++ b/core/src/main/scala/kafka/server/AbstractFetcherThread.scala @@ -68,11 +68,11 @@ abstract class AbstractFetcherThread(name: String, clientId: String, sourceBroke override def shutdown(){ initiateShutdown() + simpleConsumer.forceClose() inLock(partitionMapLock) { partitionMapCond.signalAll() } awaitShutdown() - simpleConsumer.close() } override def doWork() { -- 1.7.9.5