From 9701809203e2e1f27e5b06eef23917d63676715b Mon Sep 17 00:00:00 2001 From: jqin Date: Tue, 11 Nov 2014 14:58:16 -0800 Subject: [PATCH 1/2] fix for KAFKA-1764 --- core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala b/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala index fbc680f..2402b45 100644 --- a/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala +++ b/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala @@ -256,7 +256,7 @@ private[kafka] class ZookeeperConsumerConnector(val config: ConsumerConfig, } private def sendShutdownToAllQueues() = { - for (queue <- topicThreadIdAndQueues.values) { + for (queue <- topicThreadIdAndQueues.values.toSet) { debug("Clearing up queue") queue.clear() queue.put(ZookeeperConsumerConnector.shutdownCommand) -- 1.8.3.4 (Apple Git-47) From af59dffbea55cbc1c258d3cee4137423ce1dcb9a Mon Sep 17 00:00:00 2001 From: jqin Date: Wed, 12 Nov 2014 13:47:40 -0800 Subject: [PATCH 2/2] Changed Consumer iterator to stop putting the shutdown message back into channel. --- core/src/main/scala/kafka/consumer/ConsumerIterator.scala | 1 - 1 file changed, 1 deletion(-) diff --git a/core/src/main/scala/kafka/consumer/ConsumerIterator.scala b/core/src/main/scala/kafka/consumer/ConsumerIterator.scala index ac491b4..78fbf75 100644 --- a/core/src/main/scala/kafka/consumer/ConsumerIterator.scala +++ b/core/src/main/scala/kafka/consumer/ConsumerIterator.scala @@ -71,7 +71,6 @@ class ConsumerIterator[K, V](private val channel: BlockingQueue[FetchedDataChunk } if(currentDataChunk eq ZookeeperConsumerConnector.shutdownCommand) { debug("Received the shutdown command") - channel.offer(currentDataChunk) return allDone } else { currentTopicInfo = currentDataChunk.topicInfo -- 1.8.3.4 (Apple Git-47)