From 6d18239bcdb5ebc4d2fe3e124deb1564872cea61 Mon Sep 17 00:00:00 2001 From: Aditya Auradkar Date: Mon, 2 Feb 2015 13:57:03 -0800 Subject: [PATCH] Fixing KAFKA-1886. SimpleConsumer should not swallow ClosedByInterruptException --- .../main/scala/kafka/consumer/SimpleConsumer.scala | 4 +++ .../unit/kafka/integration/PrimitiveApiTest.scala | 30 ++++++++++++++++++++++ 2 files changed, 34 insertions(+) diff --git a/core/src/main/scala/kafka/consumer/SimpleConsumer.scala b/core/src/main/scala/kafka/consumer/SimpleConsumer.scala index cbef84a..eb91419 100644 --- a/core/src/main/scala/kafka/consumer/SimpleConsumer.scala +++ b/core/src/main/scala/kafka/consumer/SimpleConsumer.scala @@ -22,6 +22,8 @@ import kafka.network._ import kafka.utils._ import kafka.common.{ErrorMapping, TopicAndPartition} import org.apache.kafka.common.utils.Utils._ +import java.nio.channels.ClosedByInterruptException + /** * A consumer of kafka messages @@ -70,6 +72,8 @@ class SimpleConsumer(val host: String, blockingChannel.send(request) response = blockingChannel.receive() } catch { + case e : ClosedByInterruptException => + throw e case e : Throwable => info("Reconnect due to socket error: %s".format(e.toString)) // retry once diff --git a/core/src/test/scala/unit/kafka/integration/PrimitiveApiTest.scala b/core/src/test/scala/unit/kafka/integration/PrimitiveApiTest.scala index aeb7a19..ac56846 100644 --- a/core/src/test/scala/unit/kafka/integration/PrimitiveApiTest.scala +++ b/core/src/test/scala/unit/kafka/integration/PrimitiveApiTest.scala @@ -33,6 +33,8 @@ import kafka.utils.{StaticPartitioner, TestUtils, Utils} import kafka.serializer.StringEncoder import java.util.Properties import TestUtils._ +import java.nio.channels.ClosedByInterruptException + /** * End to end tests of the primitive apis against a local server @@ -267,4 +269,32 @@ class PrimitiveApiTest extends JUnit3Suite with ProducerConsumerTestHarness with assertEquals(messages(topic), fetched.map(messageAndOffset => Utils.readString(messageAndOffset.message.payload))) } } + + /** + * This test creates a blocking request using a SimpleConsumer to a topic that does not contain any data. + * This checks that the thread performing the fetch throws a ClosedByInterruptException if interrupted. + */ + def testInterruptSimpleConsumer() { + val newTopic = "new-topic" + TestUtils.createTopic(zkClient, newTopic, numPartitions = 1, replicationFactor = 1, servers = servers) + var exception : Throwable = null + val thread = new Thread { + override def run { + try + { + val fetchResponse = consumer.fetch(new FetchRequestBuilder().minBytes(100000).maxWait(3000).addFetch(newTopic, 0, 0, 10000).build()) + } + catch { + case e: Throwable =>{ + exception = e + } + } + } + } + thread.start() + Thread.sleep(100) + thread.interrupt() + thread.join() + assertTrue("Exception thrown from SimpleConsumer should be an instance of ClosedByInterruptException", exception.isInstanceOf[ClosedByInterruptException]) + } } -- 1.7.12.4