Index: core/src/test/scala/unit/kafka/consumer/ZookeeperConsumerConnectorTest.scala =================================================================== --- core/src/test/scala/unit/kafka/consumer/ZookeeperConsumerConnectorTest.scala (revision 1233136) +++ core/src/test/scala/unit/kafka/consumer/ZookeeperConsumerConnectorTest.scala (working copy) @@ -63,15 +63,20 @@ } val zkConsumerConnector0 = new ZookeeperConsumerConnector(consumerConfig0, true) val topicMessageStreams0 = zkConsumerConnector0.createMessageStreams(Predef.Map(topic -> numNodes*numParts/2)) - try { - getMessages(nMessages*2, topicMessageStreams0) - fail("should get an exception") + + // no messages to consume, we should hit timeout; + // also the iterator should support re-entrant, so loop it twice + for (i <- 0 until 2) { + try { + getMessages(nMessages*2, topicMessageStreams0) + fail("should get an exception") + } + catch { + case e: ConsumerTimeoutException => // this is ok + case e => throw e + } } - catch { - case e: ConsumerTimeoutException => // this is ok - println("This is ok") - case e => throw e - } + zkConsumerConnector0.shutdown // send some messages to each broker Index: core/src/main/scala/kafka/utils/IteratorTemplate.scala =================================================================== --- core/src/main/scala/kafka/utils/IteratorTemplate.scala (revision 1233136) +++ core/src/main/scala/kafka/utils/IteratorTemplate.scala (working copy) @@ -72,6 +72,9 @@ def remove = throw new UnsupportedOperationException("Removal not supported") - + + protected def resetState() { + state = NOT_READY + } } Index: core/src/main/scala/kafka/consumer/ConsumerIterator.scala =================================================================== --- core/src/main/scala/kafka/consumer/ConsumerIterator.scala (revision 1233136) +++ core/src/main/scala/kafka/consumer/ConsumerIterator.scala (working copy) @@ -58,6 +58,8 @@ else { currentDataChunk = channel.poll(consumerTimeoutMs, TimeUnit.MILLISECONDS) if (currentDataChunk == null) { + // reset state to make the iterator re-iterable + resetState() throw new ConsumerTimeoutException } }