diff --git a/core/src/main/scala/kafka/consumer/ConsumerIterator.scala b/core/src/main/scala/kafka/consumer/ConsumerIterator.scala
index a4227a4..8bf1fee 100644
--- a/core/src/main/scala/kafka/consumer/ConsumerIterator.scala
+++ b/core/src/main/scala/kafka/consumer/ConsumerIterator.scala
@@ -35,76 +35,91 @@ class ConsumerIterator[K, V](private val channel: BlockingQueue[FetchedDataChunk
private val keyDecoder: Decoder[K],
private val valueDecoder: Decoder[V],
val clientId: String)
- extends IteratorTemplate[MessageAndMetadata[K, V]] with Logging {
+ extends Iterator[MessageAndMetadata[K, V]] with java.util.Iterator[MessageAndMetadata[K, V]] with Logging {
- private var current: AtomicReference[Iterator[MessageAndOffset]] = new AtomicReference(null)
+ private val current: AtomicReference[Iterator[MessageAndOffset]] = new AtomicReference(null)
private var currentTopicInfo: PartitionTopicInfo = null
private var consumedOffset: Long = -1L
private val consumerTopicStats = ConsumerTopicStatsRegistry.getConsumerTopicStat(clientId)
- override def next(): MessageAndMetadata[K, V] = {
- val item = super.next()
- if(consumedOffset < 0)
- throw new KafkaException("Offset returned by the message set is invalid %d".format(consumedOffset))
- currentTopicInfo.resetConsumeOffset(consumedOffset)
- val topic = currentTopicInfo.topic
- trace("Setting %s consumed offset to %d".format(topic, consumedOffset))
- consumerTopicStats.getConsumerTopicStats(topic).messageRate.mark()
- consumerTopicStats.getConsumerAllTopicStats().messageRate.mark()
- item
- }
+ private val iter: IteratorTemplate[MessageAndOffset] = new IteratorTemplate[MessageAndOffset] {
+ def makeNext(): MessageAndOffset = {
+ var currentDataChunk: FetchedDataChunk = null
+ // if we don't have an iterator, get one
+ var localCurrent = current.get()
+ if(localCurrent == null || !localCurrent.hasNext) {
+ if (consumerTimeoutMs < 0)
+ currentDataChunk = channel.take
+ else {
+ currentDataChunk = channel.poll(consumerTimeoutMs, TimeUnit.MILLISECONDS)
+ if (currentDataChunk == null) {
+            // No chunk arrived within consumerTimeoutMs; reset state so the iterator stays re-usable
+ resetState()
+ throw new ConsumerTimeoutException
+ }
+ }
+ if(currentDataChunk eq ZookeeperConsumerConnector.shutdownCommand) {
+ debug("Received the shutdown command")
+ channel.offer(currentDataChunk)
+ return allDone
+ } else {
+ currentTopicInfo = currentDataChunk.topicInfo
+ val cdcFetchOffset = currentDataChunk.fetchOffset
+ val ctiConsumeOffset = currentTopicInfo.getConsumeOffset
+ if (ctiConsumeOffset < cdcFetchOffset) {
+ error("consumed offset: %d doesn't match fetch offset: %d for %s;\n Consumer may lose data"
+ .format(ctiConsumeOffset, cdcFetchOffset, currentTopicInfo))
+ currentTopicInfo.resetConsumeOffset(cdcFetchOffset)
+ }
+ localCurrent = currentDataChunk.messages.iterator
- protected def makeNext(): MessageAndMetadata[K, V] = {
- var currentDataChunk: FetchedDataChunk = null
- // if we don't have an iterator, get one
- var localCurrent = current.get()
- if(localCurrent == null || !localCurrent.hasNext) {
- if (consumerTimeoutMs < 0)
- currentDataChunk = channel.take
- else {
- currentDataChunk = channel.poll(consumerTimeoutMs, TimeUnit.MILLISECONDS)
- if (currentDataChunk == null) {
- // reset state to make the iterator re-iterable
- resetState()
- throw new ConsumerTimeoutException
+ current.set(localCurrent)
}
+ // if we just updated the current chunk and it is empty that means the fetch size is too small!
+ if(currentDataChunk.messages.validBytes == 0)
+ throw new MessageSizeTooLargeException("Found a message larger than the maximum fetch size of this consumer on topic " +
+ "%s partition %d at fetch offset %d. Increase the fetch size, or decrease the maximum message size the broker will allow."
+ .format(currentDataChunk.topicInfo.topic, currentDataChunk.topicInfo.partitionId, currentDataChunk.fetchOffset))
}
- if(currentDataChunk eq ZookeeperConsumerConnector.shutdownCommand) {
- debug("Received the shutdown command")
- channel.offer(currentDataChunk)
- return allDone
- } else {
- currentTopicInfo = currentDataChunk.topicInfo
- val cdcFetchOffset = currentDataChunk.fetchOffset
- val ctiConsumeOffset = currentTopicInfo.getConsumeOffset
- if (ctiConsumeOffset < cdcFetchOffset) {
- error("consumed offset: %d doesn't match fetch offset: %d for %s;\n Consumer may lose data"
- .format(ctiConsumeOffset, cdcFetchOffset, currentTopicInfo))
- currentTopicInfo.resetConsumeOffset(cdcFetchOffset)
- }
- localCurrent = currentDataChunk.messages.iterator
+ var item = localCurrent.next()
- current.set(localCurrent)
+ // reject the messages that have already been consumed
+ while (item.offset < currentTopicInfo.getConsumeOffset && localCurrent.hasNext) {
+ item = localCurrent.next()
}
- // if we just updated the current chunk and it is empty that means the fetch size is too small!
- if(currentDataChunk.messages.validBytes == 0)
- throw new MessageSizeTooLargeException("Found a message larger than the maximum fetch size of this consumer on topic " +
- "%s partition %d at fetch offset %d. Increase the fetch size, or decrease the maximum message size the broker will allow."
- .format(currentDataChunk.topicInfo.topic, currentDataChunk.topicInfo.partitionId, currentDataChunk.fetchOffset))
- }
- var item = localCurrent.next()
- // reject the messages that have already been consumed
- while (item.offset < currentTopicInfo.getConsumeOffset && localCurrent.hasNext) {
- item = localCurrent.next()
+ consumedOffset = item.nextOffset
+
+ // validate checksum of message to ensure it is valid
+ item.message.ensureValid()
+ item
}
- consumedOffset = item.nextOffset
- item.message.ensureValid() // validate checksum of message to ensure it is valid
+ }
+
+ def hasNext(): Boolean = iter.hasNext
+
+  // The underlying IteratorTemplate does not support remove, so
+  // calling this method always throws UnsupportedOperationException
+ def remove() = iter.remove
+
+ def next(): MessageAndMetadata[K, V] = {
+ val item = iter.next()
+ if(consumedOffset < 0)
+ throw new KafkaException("Offset returned by the message set is invalid %d".format(consumedOffset))
+ // decode the message
val keyBuffer = item.message.key
val key = if(keyBuffer == null) null.asInstanceOf[K] else keyDecoder.fromBytes(Utils.readBytes(keyBuffer))
val value = if(item.message.isNull) null.asInstanceOf[V] else valueDecoder.fromBytes(Utils.readBytes(item.message.payload))
- new MessageAndMetadata(key, value, currentTopicInfo.topic, currentTopicInfo.partitionId, item.offset)
+
+ // update the consumer offsets and metrics
+ currentTopicInfo.resetConsumeOffset(consumedOffset)
+ val topic = currentTopicInfo.topic
+ val partitionId = currentTopicInfo.partitionId
+ trace("Setting [%s,%d] consumed offset to %d".format(topic, partitionId, consumedOffset))
+ consumerTopicStats.getConsumerTopicStats(topic).messageRate.mark()
+ consumerTopicStats.getConsumerAllTopicStats().messageRate.mark()
+ MessageAndMetadata(key, value, topic, partitionId, item.offset)
}
def clearCurrentChunk() {
diff --git a/core/src/test/scala/unit/kafka/consumer/ConsumerIteratorTest.scala b/core/src/test/scala/unit/kafka/consumer/ConsumerIteratorTest.scala
index ef1de83..d69ec8d 100644
--- a/core/src/test/scala/unit/kafka/consumer/ConsumerIteratorTest.scala
+++ b/core/src/test/scala/unit/kafka/consumer/ConsumerIteratorTest.scala
@@ -88,4 +88,40 @@ class ConsumerIteratorTest extends JUnit3Suite with KafkaServerTestHarness {
val unconsumed = messageSet.filter(_.offset >= consumedOffset).map(m => Utils.readString(m.message.payload))
assertEquals(unconsumed, receivedMessages)
}
+
+ @Test
+ def testConsumerIteratorDecodingFailure() {
+ val messageStrings = (0 until 10).map(_.toString).toList
+ val messages = messageStrings.map(s => new Message(s.getBytes))
+ val messageSet = new ByteBufferMessageSet(NoCompressionCodec, new AtomicLong(0), messages:_*)
+
+ topicInfos(0).enqueue(messageSet)
+ assertEquals(1, queue.size)
+
+ val iter = new ConsumerIterator[String, String](queue,
+ consumerConfig.consumerTimeoutMs,
+ new FailDecoder(),
+ new FailDecoder(),
+ clientId = "")
+
+ var needBreak = false
+ while (iter.hasNext() && !needBreak) {
+ try {
+ iter.next
+ }
+ catch {
+ case e: UnsupportedOperationException => needBreak = true // this is ok
+ case e2: Throwable => throw e2
+ }
+ }
+
+ // the offset should not have changed
+ assertEquals(5, consumedOffset)
+ }
+
+ class FailDecoder(props: VerifiableProperties = null) extends Decoder[String] {
+ def fromBytes(bytes: Array[Byte]): String = {
+ throw new UnsupportedOperationException("This decoder does not work at all..")
+ }
+ }
}
diff --git a/project/Build.scala b/project/Build.scala
index 098e874..500040b 100644
--- a/project/Build.scala
+++ b/project/Build.scala
@@ -71,9 +71,7 @@ object KafkaBuild extends Build {
- ,
- mappings in packageBin in Compile += file("LICENSE") -> "LICENSE",
- mappings in packageBin in Compile += file("NOTICE") -> "NOTICE"
+
)
val hadoopSettings = Seq(