Index: core/src/test/scala/unit/kafka/utils/TestUtils.scala =================================================================== --- core/src/test/scala/unit/kafka/utils/TestUtils.scala (revision 1157908) +++ core/src/test/scala/unit/kafka/utils/TestUtils.scala (working copy) @@ -288,6 +288,19 @@ ZkUtils.updatePersistentPath(zkClient, path, offset.toString) } + + def getMessageIterator(iter: Iterator[MessageAndOffset]): Iterator[Message] = { + new IteratorTemplate[Message] { + val it = iter + override def makeNext(): Message = { + if (it.hasNext) + return it.next.message + else + return allDone() + } + } + } + } object TestZKUtils { Index: core/src/test/scala/unit/kafka/message/ByteBufferMessageSetTest.scala =================================================================== --- core/src/test/scala/unit/kafka/message/ByteBufferMessageSetTest.scala (revision 1157908) +++ core/src/test/scala/unit/kafka/message/ByteBufferMessageSetTest.scala (working copy) @@ -20,6 +20,7 @@ import java.nio._ import junit.framework.Assert._ import org.junit.Test +import kafka.utils.{IteratorTemplate, TestUtils} class ByteBufferMessageSetTest extends BaseMessageSetTestCases { @@ -49,4 +50,28 @@ assertTrue(messages.equals(moreMessages)) } + + @Test + def testIterator() { + val messageList = List( + new Message("msg1".getBytes), + new Message("msg2".getBytes), + new Message("msg3".getBytes) + ) + + { + val messageSet = new ByteBufferMessageSet(NoCompressionCodec, messageList: _*) + TestUtils.checkEquals[Message](messageList.iterator, TestUtils.getMessageIterator(messageSet.iterator)) + //make sure ByteBufferMessageSet is re-iterable. + TestUtils.checkEquals[Message](messageList.iterator, TestUtils.getMessageIterator(messageSet.iterator)) + } + + { + val messageSet = new ByteBufferMessageSet(DefaultCompressionCodec, messageList: _*) + TestUtils.checkEquals[Message](messageList.iterator, TestUtils.getMessageIterator(messageSet.iterator)) + //make sure ByteBufferMessageSet is re-iterable. + TestUtils.checkEquals[Message](messageList.iterator, TestUtils.getMessageIterator(messageSet.iterator)) + } + } + }