From 35487fa5baa8f6167108f7dc5e4bd889d8957d82 Mon Sep 17 00:00:00 2001 From: jqin Date: Thu, 13 Nov 2014 23:56:50 -0800 Subject: [PATCH] Build succeeded. Unit test passed. --- core/src/main/scala/kafka/consumer/ConsumerIterator.scala | 1 - core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala | 2 +- core/src/test/scala/unit/kafka/consumer/ConsumerIteratorTest.scala | 2 +- 3 files changed, 2 insertions(+), 3 deletions(-) diff --git a/core/src/main/scala/kafka/consumer/ConsumerIterator.scala b/core/src/main/scala/kafka/consumer/ConsumerIterator.scala index ac491b4..78fbf75 100644 --- a/core/src/main/scala/kafka/consumer/ConsumerIterator.scala +++ b/core/src/main/scala/kafka/consumer/ConsumerIterator.scala @@ -71,7 +71,6 @@ class ConsumerIterator[K, V](private val channel: BlockingQueue[FetchedDataChunk } if(currentDataChunk eq ZookeeperConsumerConnector.shutdownCommand) { debug("Received the shutdown command") - channel.offer(currentDataChunk) return allDone } else { currentTopicInfo = currentDataChunk.topicInfo diff --git a/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala b/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala index fbc680f..f476973 100644 --- a/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala +++ b/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala @@ -256,7 +256,7 @@ private[kafka] class ZookeeperConsumerConnector(val config: ConsumerConfig, } private def sendShutdownToAllQueues() = { - for (queue <- topicThreadIdAndQueues.values) { + for (queue <- topicThreadIdAndQueues.values.toSet[BlockingQueue[FetchedDataChunk]]) { debug("Clearing up queue") queue.clear() queue.put(ZookeeperConsumerConnector.shutdownCommand) diff --git a/core/src/test/scala/unit/kafka/consumer/ConsumerIteratorTest.scala b/core/src/test/scala/unit/kafka/consumer/ConsumerIteratorTest.scala index 151ba7c..c0355cc 100644 --- a/core/src/test/scala/unit/kafka/consumer/ConsumerIteratorTest.scala +++ b/core/src/test/scala/unit/kafka/consumer/ConsumerIteratorTest.scala @@ -80,7 +80,7 @@ class ConsumerIteratorTest extends JUnit3Suite with KafkaServerTestHarness { val receivedMessages = (0 until 5).map(i => iter.next.message).toList assertFalse(iter.hasNext) - assertEquals(1, queue.size) // This is only the shutdown command. + assertEquals(0, queue.size) // Shutdown command has been consumed. assertEquals(5, receivedMessages.size) val unconsumed = messageSet.filter(_.offset >= consumedOffset).map(m => Utils.readString(m.message.payload)) assertEquals(unconsumed, receivedMessages) -- 1.8.3.4 (Apple Git-47)