diff --git a/core/src/main/scala/kafka/consumer/ConsoleConsumer.scala b/core/src/main/scala/kafka/consumer/ConsoleConsumer.scala
index dc066c2..9ce78be 100644
--- a/core/src/main/scala/kafka/consumer/ConsoleConsumer.scala
+++ b/core/src/main/scala/kafka/consumer/ConsoleConsumer.scala
@@ -201,7 +201,7 @@ object ConsoleConsumer extends Logging {
     for(messageAndTopic <- iter) {
       try {
-        formatter.writeTo(messageAndTopic.key, messageAndTopic.message, System.out)
+        formatter.writeTo(messageAndTopic.key, messageAndTopic.value, System.out)
         numMessages += 1
       } catch {
         case e: Throwable =>
diff --git a/core/src/main/scala/kafka/consumer/ConsumerIterator.scala b/core/src/main/scala/kafka/consumer/ConsumerIterator.scala
index a4227a4..55ecf8c 100644
--- a/core/src/main/scala/kafka/consumer/ConsumerIterator.scala
+++ b/core/src/main/scala/kafka/consumer/ConsumerIterator.scala
@@ -35,76 +35,86 @@ class ConsumerIterator[K, V](private val channel: BlockingQueue[FetchedDataChunk
                              private val keyDecoder: Decoder[K],
                              private val valueDecoder: Decoder[V],
                              val clientId: String)
-  extends IteratorTemplate[MessageAndMetadata[K, V]] with Logging {
+  extends Iterator[MessageAndMetadata[K, V]] with java.util.Iterator[MessageAndMetadata[K, V]] with Logging {
 
-  private var current: AtomicReference[Iterator[MessageAndOffset]] = new AtomicReference(null)
+  private val current: AtomicReference[Iterator[MessageAndOffset]] = new AtomicReference(null)
   private var currentTopicInfo: PartitionTopicInfo = null
   private var consumedOffset: Long = -1L
   private val consumerTopicStats = ConsumerTopicStatsRegistry.getConsumerTopicStat(clientId)
 
-  override def next(): MessageAndMetadata[K, V] = {
-    val item = super.next()
-    if(consumedOffset < 0)
-      throw new KafkaException("Offset returned by the message set is invalid %d".format(consumedOffset))
-    currentTopicInfo.resetConsumeOffset(consumedOffset)
-    val topic = currentTopicInfo.topic
-    trace("Setting %s consumed offset to %d".format(topic, consumedOffset))
-    consumerTopicStats.getConsumerTopicStats(topic).messageRate.mark()
-    consumerTopicStats.getConsumerAllTopicStats().messageRate.mark()
-    item
-  }
+  private val iter: IteratorTemplate[MessageAndOffset] = new IteratorTemplate[MessageAndOffset] {
+    def makeNext(): MessageAndOffset = {
+      var currentDataChunk: FetchedDataChunk = null
+      // if we don't have an iterator, get one
+      var localCurrent = current.get()
+      if(localCurrent == null || !localCurrent.hasNext) {
+        if (consumerTimeoutMs < 0)
+          currentDataChunk = channel.take
+        else {
+          currentDataChunk = channel.poll(consumerTimeoutMs, TimeUnit.MILLISECONDS)
+          if (currentDataChunk == null) {
+            // There is no more data in the chunk, reset state to make the iterator re-iterable
+            resetState()
+            throw new ConsumerTimeoutException
+          }
+        }
+        if(currentDataChunk eq ZookeeperConsumerConnector.shutdownCommand) {
+          debug("Received the shutdown command")
+          channel.offer(currentDataChunk)
+          return allDone
+        } else {
+          currentTopicInfo = currentDataChunk.topicInfo
+          val cdcFetchOffset = currentDataChunk.fetchOffset
+          val ctiConsumeOffset = currentTopicInfo.getConsumeOffset
+          if (ctiConsumeOffset < cdcFetchOffset) {
+            error("consumed offset: %d doesn't match fetch offset: %d for %s;\n Consumer may lose data"
+              .format(ctiConsumeOffset, cdcFetchOffset, currentTopicInfo))
+            currentTopicInfo.resetConsumeOffset(cdcFetchOffset)
+          }
+          localCurrent = currentDataChunk.messages.iterator
 
-  protected def makeNext(): MessageAndMetadata[K, V] = {
-    var currentDataChunk: FetchedDataChunk = null
-    // if we don't have an iterator, get one
-    var localCurrent = current.get()
-    if(localCurrent == null || !localCurrent.hasNext) {
-      if (consumerTimeoutMs < 0)
-        currentDataChunk = channel.take
-      else {
-        currentDataChunk = channel.poll(consumerTimeoutMs, TimeUnit.MILLISECONDS)
-        if (currentDataChunk == null) {
-          // reset state to make the iterator re-iterable
-          resetState()
-          throw new ConsumerTimeoutException
+          current.set(localCurrent)
         }
+        // if we just updated the current chunk and it is empty that means the fetch size is too small!
+        if(currentDataChunk.messages.validBytes == 0)
+          throw new MessageSizeTooLargeException("Found a message larger than the maximum fetch size of this consumer on topic " +
+            "%s partition %d at fetch offset %d. Increase the fetch size, or decrease the maximum message size the broker will allow."
+            .format(currentDataChunk.topicInfo.topic, currentDataChunk.topicInfo.partitionId, currentDataChunk.fetchOffset))
       }
-      if(currentDataChunk eq ZookeeperConsumerConnector.shutdownCommand) {
-        debug("Received the shutdown command")
-        channel.offer(currentDataChunk)
-        return allDone
-      } else {
-        currentTopicInfo = currentDataChunk.topicInfo
-        val cdcFetchOffset = currentDataChunk.fetchOffset
-        val ctiConsumeOffset = currentTopicInfo.getConsumeOffset
-        if (ctiConsumeOffset < cdcFetchOffset) {
-          error("consumed offset: %d doesn't match fetch offset: %d for %s;\n Consumer may lose data"
-            .format(ctiConsumeOffset, cdcFetchOffset, currentTopicInfo))
-          currentTopicInfo.resetConsumeOffset(cdcFetchOffset)
-        }
-        localCurrent = currentDataChunk.messages.iterator
+      var item = localCurrent.next()
 
-        current.set(localCurrent)
+      // reject the messages that have already been consumed
+      while (item.offset < currentTopicInfo.getConsumeOffset && localCurrent.hasNext) {
+        item = localCurrent.next()
       }
-      // if we just updated the current chunk and it is empty that means the fetch size is too small!
-      if(currentDataChunk.messages.validBytes == 0)
-        throw new MessageSizeTooLargeException("Found a message larger than the maximum fetch size of this consumer on topic " +
-          "%s partition %d at fetch offset %d. Increase the fetch size, or decrease the maximum message size the broker will allow."
-          .format(currentDataChunk.topicInfo.topic, currentDataChunk.topicInfo.partitionId, currentDataChunk.fetchOffset))
-    }
-    var item = localCurrent.next()
-    // reject the messages that have already been consumed
-    while (item.offset < currentTopicInfo.getConsumeOffset && localCurrent.hasNext) {
-      item = localCurrent.next()
+      consumedOffset = item.nextOffset
+
+      // validate checksum of message to ensure it is valid
+      item.message.ensureValid()
+      item
     }
-    consumedOffset = item.nextOffset
-    item.message.ensureValid() // validate checksum of message to ensure it is valid
+  }
+
+  def hasNext(): Boolean = iter.hasNext
+
+  // Currently the iterator template does not support remove,
+  // hence calling this function will throw an exception directly
+  def remove() = iter.remove
+
+  def next(): MessageAndMetadata[K, V] = {
+    val item = iter.next()
+    if(consumedOffset < 0)
+      throw new KafkaException("Offset returned by the message set is invalid %d".format(consumedOffset))
 
-    val keyBuffer = item.message.key
-    val key = if(keyBuffer == null) null.asInstanceOf[K] else keyDecoder.fromBytes(Utils.readBytes(keyBuffer))
-    val value = if(item.message.isNull) null.asInstanceOf[V] else valueDecoder.fromBytes(Utils.readBytes(item.message.payload))
-    new MessageAndMetadata(key, value, currentTopicInfo.topic, currentTopicInfo.partitionId, item.offset)
+    // update the consumer offsets and metrics
+    currentTopicInfo.resetConsumeOffset(consumedOffset)
+    val topic = currentTopicInfo.topic
+    val partitionId = currentTopicInfo.partitionId
+    trace("Setting [%s,%d] consumed offset to %d".format(topic, partitionId, consumedOffset))
+    consumerTopicStats.getConsumerTopicStats(topic).messageRate.mark()
+    consumerTopicStats.getConsumerAllTopicStats().messageRate.mark()
+    MessageAndMetadata(topic, partitionId, item.message, item.offset, keyDecoder, valueDecoder)
   }
 
   def clearCurrentChunk() {
diff --git a/core/src/main/scala/kafka/message/MessageAndMetadata.scala b/core/src/main/scala/kafka/message/MessageAndMetadata.scala
index 20c0e70..1a0efc1 100644
--- a/core/src/main/scala/kafka/message/MessageAndMetadata.scala
+++ b/core/src/main/scala/kafka/message/MessageAndMetadata.scala
@@ -17,5 +17,18 @@
 
 package kafka.message
 
-case class MessageAndMetadata[K, V](key: K, message: V, topic: String, partition: Int, offset: Long)
+import kafka.serializer.Decoder
+import kafka.utils.Utils
+
+case class MessageAndMetadata[K, V](topic: String, partition: Int,
+                                    message: Message, offset: Long,
+                                    keyDecoder: Decoder[K], valueDecoder: Decoder[V]) {
+
+  /**
+   * Return the decoded message key and payload
+   */
+  def key(): K = if(message.key == null) null.asInstanceOf[K] else keyDecoder.fromBytes(Utils.readBytes(message.key))
+
+  def value(): V = if(message.isNull) null.asInstanceOf[V] else valueDecoder.fromBytes(Utils.readBytes(message.payload))
+}
 
diff --git a/core/src/main/scala/kafka/tools/MirrorMaker.scala b/core/src/main/scala/kafka/tools/MirrorMaker.scala
index f0f871c..216b8d2 100644
--- a/core/src/main/scala/kafka/tools/MirrorMaker.scala
+++ b/core/src/main/scala/kafka/tools/MirrorMaker.scala
@@ -193,13 +193,13 @@ object MirrorMaker extends Logging {
       // Otherwise use a pre-assigned producer to send the message
       if (msgAndMetadata.key == null) {
         trace("Send the non-keyed message the producer channel.")
-        val pd = new KeyedMessage[Array[Byte], Array[Byte]](msgAndMetadata.topic, msgAndMetadata.message)
+        val pd = new KeyedMessage[Array[Byte], Array[Byte]](msgAndMetadata.topic, msgAndMetadata.value)
         producerDataChannel.sendRequest(pd)
       } else {
         val producerId = Utils.abs(java.util.Arrays.hashCode(msgAndMetadata.key)) % producers.size()
         trace("Send message with key %s to producer %d.".format(java.util.Arrays.toString(msgAndMetadata.key), producerId))
         val producer = producers(producerId)
-        val pd = new KeyedMessage[Array[Byte], Array[Byte]](msgAndMetadata.topic, msgAndMetadata.key, msgAndMetadata.message)
+        val pd = new KeyedMessage[Array[Byte], Array[Byte]](msgAndMetadata.topic, msgAndMetadata.key, msgAndMetadata.value)
         producer.send(pd)
       }
     }
diff --git a/core/src/main/scala/kafka/tools/ReplayLogProducer.scala b/core/src/main/scala/kafka/tools/ReplayLogProducer.scala
index 814d61a..baca4b4 100644
--- a/core/src/main/scala/kafka/tools/ReplayLogProducer.scala
+++ b/core/src/main/scala/kafka/tools/ReplayLogProducer.scala
@@ -163,7 +163,7 @@ object ReplayLogProducer extends Logging {
           stream
       for (messageAndMetadata <- iter) {
         try {
-          producer.send(new KeyedMessage[Array[Byte], Array[Byte]](config.outputTopic, messageAndMetadata.message))
+          producer.send(new KeyedMessage[Array[Byte], Array[Byte]](config.outputTopic, messageAndMetadata.value))
           if (config.delayedMSBtwSend > 0 && (messageCount + 1) % config.batchSize == 0)
             Thread.sleep(config.delayedMSBtwSend)
           messageCount += 1
diff --git a/core/src/test/scala/other/kafka/TestLogCleaning.scala b/core/src/test/scala/other/kafka/TestLogCleaning.scala
index 22b16e5..1d06dfc 100644
--- a/core/src/test/scala/other/kafka/TestLogCleaning.scala
+++ b/core/src/test/scala/other/kafka/TestLogCleaning.scala
@@ -291,7 +291,7 @@ object TestLogCleaning {
     try {
       for(item <- stream) {
         val delete = item.message == null
-        val value = if(delete) -1L else item.message.toLong
+        val value = if(delete) -1L else item.value.toLong
         consumedWriter.write(TestRecord(topic, item.key.toInt, value, delete).toString)
         consumedWriter.newLine()
       }
diff --git a/core/src/test/scala/other/kafka/TestZKConsumerOffsets.scala b/core/src/test/scala/other/kafka/TestZKConsumerOffsets.scala
index 31534ca..1b3755b 100644
--- a/core/src/test/scala/other/kafka/TestZKConsumerOffsets.scala
+++ b/core/src/test/scala/other/kafka/TestZKConsumerOffsets.scala
@@ -61,7 +61,7 @@ private class ConsumerThread(stream: KafkaStream[Array[Byte], Array[Byte]]) exte
   override def run() {
     println("Starting consumer thread..")
     for (messageAndMetadata <- stream) {
-      println("consumed: " + new String(messageAndMetadata.message, "UTF-8"))
+      println("consumed: " + new String(messageAndMetadata.value, "UTF-8"))
     }
     shutdownLatch.countDown
     println("thread shutdown !" )
diff --git a/core/src/test/scala/unit/kafka/consumer/ConsumerIteratorTest.scala b/core/src/test/scala/unit/kafka/consumer/ConsumerIteratorTest.scala
index ef1de83..ce2e14a 100644
--- a/core/src/test/scala/unit/kafka/consumer/ConsumerIteratorTest.scala
+++ b/core/src/test/scala/unit/kafka/consumer/ConsumerIteratorTest.scala
@@ -85,7 +85,42 @@ class ConsumerIteratorTest extends JUnit3Suite with KafkaServerTestHarness {
     assertFalse(iter.hasNext)
     assertEquals(1, queue.size) // This is only the shutdown command.
     assertEquals(5, receivedMessages.size)
-    val unconsumed = messageSet.filter(_.offset >= consumedOffset).map(m => Utils.readString(m.message.payload))
+    val unconsumed = messageSet.filter(_.offset >= consumedOffset).map(m => m.message)
     assertEquals(unconsumed, receivedMessages)
   }
+
+  @Test
+  def testConsumerIteratorDecodingFailure() {
+    val messageStrings = (0 until 10).map(_.toString).toList
+    val messages = messageStrings.map(s => new Message(s.getBytes))
+    val messageSet = new ByteBufferMessageSet(NoCompressionCodec, new AtomicLong(0), messages:_*)
+
+    topicInfos(0).enqueue(messageSet)
+    assertEquals(1, queue.size)
+
+    val iter = new ConsumerIterator[String, String](queue,
+                                                    -1,
+                                                    new FailDecoder(),
+                                                    new FailDecoder(),
+                                                    clientId = "")
+
+    val receivedMessages = (0 until 5).map{ i =>
+      val message = iter.next
+      assertEquals(message.offset, i + consumedOffset)
+
+      try {
+        message.value // should fail
+      }
+      catch {
+        case e: UnsupportedOperationException => // this is ok
+        case e2: Throwable => fail("Unexpected exception when iterating the message set. " + e2.getMessage)
+      }
+    }
+  }
+
+  class FailDecoder(props: VerifiableProperties = null) extends Decoder[String] {
+    def fromBytes(bytes: Array[Byte]): String = {
+      throw new UnsupportedOperationException("This decoder does not work at all..")
+    }
+  }
 }
diff --git a/core/src/test/scala/unit/kafka/consumer/ZookeeperConsumerConnectorTest.scala b/core/src/test/scala/unit/kafka/consumer/ZookeeperConsumerConnectorTest.scala
index 8fe7259..4a2a5b6 100644
--- a/core/src/test/scala/unit/kafka/consumer/ZookeeperConsumerConnectorTest.scala
+++ b/core/src/test/scala/unit/kafka/consumer/ZookeeperConsumerConnectorTest.scala
@@ -294,9 +294,9 @@ class ZookeeperConsumerConnectorTest extends JUnit3Suite with KafkaServerTestHar
         val iterator = messageStream.iterator
         for (i <- 0 until nMessages * 2) {
           assertTrue(iterator.hasNext())
-          val message = iterator.next().message
-          receivedMessages ::= message
-          debug("received message: " + message)
+          val value = iterator.next().value
+          receivedMessages ::= value
+          debug("received message payload: " + value)
         }
       }
     }
@@ -396,9 +396,9 @@ class ZookeeperConsumerConnectorTest extends JUnit3Suite with KafkaServerTestHar
         val iterator = messageStream.iterator
         for(i <- 0 until nMessagesPerThread) {
           assertTrue(iterator.hasNext)
-          val message = iterator.next.message
-          messages ::= message
-          debug("received message: " + message)
+          val value = iterator.next.value
+          messages ::= value
+          debug("received message payload: " + value)
         }
       }
     }
diff --git a/core/src/test/scala/unit/kafka/javaapi/consumer/ZookeeperConsumerConnectorTest.scala b/core/src/test/scala/unit/kafka/javaapi/consumer/ZookeeperConsumerConnectorTest.scala
index 43af649..9a41e3e 100644
--- a/core/src/test/scala/unit/kafka/javaapi/consumer/ZookeeperConsumerConnectorTest.scala
+++ b/core/src/test/scala/unit/kafka/javaapi/consumer/ZookeeperConsumerConnectorTest.scala
@@ -110,9 +110,9 @@ class ZookeeperConsumerConnectorTest extends JUnit3Suite with KafkaServerTestHar
         val iterator = messageStream.iterator
         for (i <- 0 until nMessagesPerThread) {
           assertTrue(iterator.hasNext)
-          val message = iterator.next.message
-          messages ::= message
-          debug("received message: " + message)
+          val value = iterator.next.value
+          messages ::= value
+          debug("received message payload: " + value)
         }
       }
     }
diff --git a/examples/src/main/java/kafka/examples/Consumer.java b/examples/src/main/java/kafka/examples/Consumer.java
index 13135b9..019c08e 100644
--- a/examples/src/main/java/kafka/examples/Consumer.java
+++ b/examples/src/main/java/kafka/examples/Consumer.java
@@ -59,6 +59,6 @@ public class Consumer extends Thread
     KafkaStream stream = consumerMap.get(topic).get(0);
     ConsumerIterator it = stream.iterator();
     while(it.hasNext())
-      System.out.println(new String(it.next().message()));
+      System.out.println(new String(it.next().value()));
   }
 }
diff --git a/perf/src/main/scala/kafka/perf/ConsumerPerformance.scala b/perf/src/main/scala/kafka/perf/ConsumerPerformance.scala
index ec3cd29..090e21f 100644
--- a/perf/src/main/scala/kafka/perf/ConsumerPerformance.scala
+++ b/perf/src/main/scala/kafka/perf/ConsumerPerformance.scala
@@ -160,7 +160,7 @@ object ConsumerPerformance {
     try {
       for (messageAndMetadata <- stream if messagesRead < config.numMessages) {
         messagesRead += 1
-        bytesRead += messageAndMetadata.message.length
+        bytesRead += messageAndMetadata.value.length
         if (messagesRead % config.reportingInterval == 0) {
           if(config.showDetailedStats)