From 861fb8320e1c9f1ef4e89b5a1aae130299d1f0eb Mon Sep 17 00:00:00 2001 From: Dong Lin Date: Thu, 9 Jul 2015 15:32:14 -0700 Subject: [PATCH] KAFKA-2241; AbstractFetcherThread.shutdown() should not block on ReadableByteChannel.read(buffer) --- .../main/scala/kafka/consumer/SimpleConsumer.scala | 15 ++++++++++++++- .../scala/kafka/server/AbstractFetcherThread.scala | 4 +++- 2 files changed, 17 insertions(+), 2 deletions(-) diff --git a/core/src/main/scala/kafka/consumer/SimpleConsumer.scala b/core/src/main/scala/kafka/consumer/SimpleConsumer.scala index c16f7ed..db2b108 100644 --- a/core/src/main/scala/kafka/consumer/SimpleConsumer.scala +++ b/core/src/main/scala/kafka/consumer/SimpleConsumer.scala @@ -18,7 +18,7 @@ package kafka.consumer -import java.nio.channels.ClosedByInterruptException +import java.nio.channels.{AsynchronousCloseException, ClosedByInterruptException} import kafka.api._ import kafka.network._ @@ -59,6 +59,16 @@ class SimpleConsumer(val host: String, connect() } + /** + * Unblock thread by closing channel and triggering AsynchronousCloseException if a read operation is in progress. + * + * This handles a bug found in Java 1.7 and below, where interrupting a thread can not correctly unblock + * the thread from waiting on ReadableByteChannel.read(). + */ + def unblockHandleJavaBug() = { + disconnect() + } + def close() { lock synchronized { disconnect() @@ -76,6 +86,9 @@ class SimpleConsumer(val host: String, } catch { case e : ClosedByInterruptException => throw e + // Should not observe this exception when running Kafka with Java 1.8 + case e: AsynchronousCloseException => + throw e case e : Throwable => info("Reconnect due to socket error: %s".format(e.toString)) // retry once diff --git a/core/src/main/scala/kafka/server/AbstractFetcherThread.scala b/core/src/main/scala/kafka/server/AbstractFetcherThread.scala index 83fc474..0f79e4a 100755 --- a/core/src/main/scala/kafka/server/AbstractFetcherThread.scala +++ b/core/src/main/scala/kafka/server/AbstractFetcherThread.scala @@ -67,7 +67,9 @@ abstract class AbstractFetcherThread(name: String, clientId: String, sourceBroke def handlePartitionsWithErrors(partitions: Iterable[TopicAndPartition]) override def shutdown(){ - initiateShutdown() + val justShutdown = initiateShutdown() + if (justShutdown && isInterruptible) + simpleConsumer.unblockHandleJavaBug() inLock(partitionMapLock) { partitionMapCond.signalAll() } -- 1.7.9.5