From 7019f23dd1212ce96190b5bd5697894bef557167 Mon Sep 17 00:00:00 2001 From: Ewen Cheslack-Postava Date: Sat, 15 Nov 2014 20:00:58 -0800 Subject: [PATCH] KAFKA-1780 Add peek and poll methods to ConsumerIterator. --- .../scala/kafka/consumer/ConsumerIterator.scala | 34 ++++++++--- .../main/scala/kafka/utils/IteratorTemplate.scala | 67 +++++++++++++++----- .../unit/kafka/consumer/ConsumerIteratorTest.scala | 41 +++++++++++++ .../unit/kafka/utils/IteratorTemplateTest.scala | 71 +++++++++++++++++++--- 4 files changed, 180 insertions(+), 33 deletions(-) diff --git a/core/src/main/scala/kafka/consumer/ConsumerIterator.scala b/core/src/main/scala/kafka/consumer/ConsumerIterator.scala index 78fbf75..bb85e8b 100644 --- a/core/src/main/scala/kafka/consumer/ConsumerIterator.scala +++ b/core/src/main/scala/kafka/consumer/ConsumerIterator.scala @@ -17,7 +17,7 @@ package kafka.consumer -import kafka.utils.{IteratorTemplate, Logging, Utils} +import kafka.utils.{NonBlockingIteratorTemplate, Logging, Utils} import java.util.concurrent.{TimeUnit, BlockingQueue} import kafka.serializer.Decoder import java.util.concurrent.atomic.AtomicReference @@ -35,7 +35,7 @@ class ConsumerIterator[K, V](private val channel: BlockingQueue[FetchedDataChunk private val keyDecoder: Decoder[K], private val valueDecoder: Decoder[V], val clientId: String) - extends IteratorTemplate[MessageAndMetadata[K, V]] with Logging { + extends NonBlockingIteratorTemplate[MessageAndMetadata[K, V]] with Logging { private var current: AtomicReference[Iterator[MessageAndOffset]] = new AtomicReference(null) private var currentTopicInfo: PartitionTopicInfo = null @@ -54,20 +54,36 @@ class ConsumerIterator[K, V](private val channel: BlockingQueue[FetchedDataChunk item } - protected def makeNext(): MessageAndMetadata[K, V] = { + protected def makeNext(timeout: Long, unit: TimeUnit): MessageAndMetadata[K, V] = { var currentDataChunk: FetchedDataChunk = null // if we don't have an iterator, get one var localCurrent = current.get() if(localCurrent == null || !localCurrent.hasNext) { - if (consumerTimeoutMs < 0) + var canConsumerTimeout = false + if (timeout < 0 && consumerTimeoutMs < 0) currentDataChunk = channel.take + else if (timeout == 0) + currentDataChunk = channel.peek() else { - currentDataChunk = channel.poll(consumerTimeoutMs, TimeUnit.MILLISECONDS) - if (currentDataChunk == null) { - // reset state to make the iterator re-iterable - resetState() - throw new ConsumerTimeoutException + val timeoutMs = unit.toMillis(timeout) + var pollTimeout: Long = 0 + if (timeout >= 0 && (consumerTimeoutMs < 0 || timeoutMs < consumerTimeoutMs)) { + pollTimeout = timeoutMs + canConsumerTimeout = false } + else { + pollTimeout = consumerTimeoutMs + canConsumerTimeout = true + } + currentDataChunk = channel.poll(pollTimeout, TimeUnit.MILLISECONDS) + } + + if (currentDataChunk == null) { + // reset state to make the iterator re-iterable + resetState() + // Consumer timeout only makes sense if the invoker specified an indefinite timeout. + if (canConsumerTimeout) throw new ConsumerTimeoutException() + return null } if(currentDataChunk eq ZookeeperConsumerConnector.shutdownCommand) { debug("Received the shutdown command") diff --git a/core/src/main/scala/kafka/utils/IteratorTemplate.scala b/core/src/main/scala/kafka/utils/IteratorTemplate.scala index fd952f3..7865abf 100644 --- a/core/src/main/scala/kafka/utils/IteratorTemplate.scala +++ b/core/src/main/scala/kafka/utils/IteratorTemplate.scala @@ -17,7 +17,7 @@ package kafka.utils -import java.lang.IllegalStateException +import java.util.concurrent.TimeUnit class State object DONE extends State @@ -29,10 +29,10 @@ object FAILED extends State * Transliteration of the iterator template in google collections. To implement an iterator * override makeNext and call allDone() when there is no more items */ -abstract class IteratorTemplate[T] extends Iterator[T] with java.util.Iterator[T] { - - private var state: State = NOT_READY - private var nextItem = null.asInstanceOf[T] +abstract class IteratorTemplate[T >: Null <: AnyRef] extends Iterator[T] with java.util.Iterator[T] { + + protected[this] var state: State = NOT_READY + protected[this] var nextItem: T = null def next(): T = { if(!hasNext()) @@ -42,13 +42,7 @@ abstract class IteratorTemplate[T] extends Iterator[T] with java.util.Iterator[T throw new IllegalStateException("Expected item but none found.") nextItem } - - def peek(): T = { - if(!hasNext()) - throw new NoSuchElementException() - nextItem - } - + def hasNext(): Boolean = { if(state == FAILED) throw new IllegalStateException("Iterator is in failed state") @@ -59,7 +53,7 @@ abstract class IteratorTemplate[T] extends Iterator[T] with java.util.Iterator[T } } - protected def makeNext(): T + protected[this] def makeNext(): T def maybeComputeNext(): Boolean = { state = FAILED @@ -72,16 +66,57 @@ abstract class IteratorTemplate[T] extends Iterator[T] with java.util.Iterator[T } } - protected def allDone(): T = { + protected[this] def allDone(): T = { state = DONE - null.asInstanceOf[T] + null } def remove = throw new UnsupportedOperationException("Removal not supported") - protected def resetState() { + protected[this] def resetState() { state = NOT_READY } } +abstract class NonBlockingIteratorTemplate[T >: Null <: AnyRef] extends IteratorTemplate[T] { + def peek(): T = + poll(0L, TimeUnit.MILLISECONDS) + + def poll(timeout: Long, unit: TimeUnit): T = { + if(state == FAILED) + throw new IllegalStateException("Iterator is in failed state") + state match { + case DONE => throw new NoSuchElementException() + case READY => nextItem + case _ => { + if (maybeComputeNext(timeout, unit)) { + resetState() + if(nextItem == null) + throw new IllegalStateException("Expected item but none found.") + nextItem + } + else { + null + } + } + } + } + + protected[this] def maybeComputeNext(timeout: Long, unit: TimeUnit): Boolean = { + state = FAILED + nextItem = makeNext(timeout, unit) + if(state == DONE || nextItem == null) { + false + } else { + state = READY + true + } + } + + override def makeNext(): T = + makeNext(-1L, TimeUnit.MILLISECONDS) + + protected[this] def makeNext(timeout: Long, unit: TimeUnit): T +} + diff --git a/core/src/test/scala/unit/kafka/consumer/ConsumerIteratorTest.scala b/core/src/test/scala/unit/kafka/consumer/ConsumerIteratorTest.scala index c0355cc..8ce57fa 100644 --- a/core/src/test/scala/unit/kafka/consumer/ConsumerIteratorTest.scala +++ b/core/src/test/scala/unit/kafka/consumer/ConsumerIteratorTest.scala @@ -62,6 +62,47 @@ class ConsumerIteratorTest extends JUnit3Suite with KafkaServerTestHarness { createTopic(zkClient, topic, partitionReplicaAssignment = Map(0 -> Seq(configs.head.brokerId)), servers = servers) } + def testConsumerIteratorPeekPoll(): Unit = { + val timeout = 200 + val maxPeekTime = 10 + val pollTimeout = 50 + val pollTimeoutSlack = 5 + val iter = new ConsumerIterator[String, String](queue, + timeout, + new StringDecoder(), + new StringDecoder(), + clientId = "") + + { + val started = SystemTime.milliseconds + val item = iter.peek() + val finished = SystemTime.milliseconds + assertEquals(null, item) + assertTrue("peek on empty consumer should return immediately", (finished - started) < maxPeekTime) + } + + { + val started = SystemTime.milliseconds + val item = iter.poll(pollTimeout, TimeUnit.MILLISECONDS) + val finished = SystemTime.milliseconds + assertEquals(null, item) + assertTrue("poll on empty consumer should return after timeout", math.abs((finished - started) - pollTimeout) < pollTimeoutSlack) + } + + { + val messageStrings = (0 until 10).map(_.toString).toList + val messages = messageStrings.map(s => new Message(s.getBytes)) + val messageSet = new ByteBufferMessageSet(DefaultCompressionCodec, new AtomicLong(0), messages: _*) + topicInfos(0).enqueue(messageSet) + assertEquals(1, queue.size) + val item = iter.peek() + assertEquals(messageStrings(consumedOffset), item.message()) + + val itemRemoved = iter.poll(pollTimeout, TimeUnit.MILLISECONDS) + assertEquals(messageStrings(consumedOffset), item.message()) + } + } + @Test def testConsumerIteratorDeduplicationDeepIterator() { val messageStrings = (0 until 10).map(_.toString).toList diff --git a/core/src/test/scala/unit/kafka/utils/IteratorTemplateTest.scala b/core/src/test/scala/unit/kafka/utils/IteratorTemplateTest.scala index 46a4e89..38e31a8 100644 --- a/core/src/test/scala/unit/kafka/utils/IteratorTemplateTest.scala +++ b/core/src/test/scala/unit/kafka/utils/IteratorTemplateTest.scala @@ -16,14 +16,16 @@ */ package kafka.utils +import java.util.concurrent.TimeUnit + import junit.framework.Assert._ import org.scalatest.Assertions import org.junit.{Test, After, Before} class IteratorTemplateTest extends Assertions { - val lst = (0 until 10).toSeq - val iterator = new IteratorTemplate[Int]() { + val lst = (0 until 10).map(_.toString).toSeq + val iterator = new IteratorTemplate[String]() { var i = 0 override def makeNext() = { if(i >= lst.size) { @@ -36,22 +38,75 @@ class IteratorTemplateTest extends Assertions { } } + val nonBlockingIterator = new NonBlockingIteratorTemplate[String]() { + var i = 0 + var curTime: Long = 0L + var makeAvailable: Boolean = true + override def makeNext(timeout: Long, unit: TimeUnit) = { + if(i >= lst.size) { + allDone() + } else { + if (makeAvailable) { + val item = lst(i) + i += 1 + item + } else { + if (timeout > 0) + curTime += timeout + resetState() + null + } + } + } + } + @Test def testIterator() { - for(i <- 0 until 10) { + for(i <- lst) { assertEquals("We should have an item to read.", true, iterator.hasNext) assertEquals("Checking again shouldn't change anything.", true, iterator.hasNext) - assertEquals("Peeking at the item should show the right thing.", i, iterator.peek) - assertEquals("Peeking again shouldn't change anything", i, iterator.peek) assertEquals("Getting the item should give the right thing.", i, iterator.next) } assertEquals("All gone!", false, iterator.hasNext) intercept[NoSuchElementException] { - iterator.peek + iterator.next } + } + + @Test + def testNonBlockingIterator() { + for(i <- lst) { + if (i.toInt % 2 == 0) { + assertEquals("We should have an item to read.", true, nonBlockingIterator.hasNext) + assertEquals("Checking again shouldn't change anything.", true, nonBlockingIterator.hasNext) + assertEquals("Peeking at the item should show the right thing.", i, nonBlockingIterator.peek) + assertEquals("Peeking again shouldn't change anything", i, nonBlockingIterator.peek) + assertEquals("Getting the item should give the right thing.", i, nonBlockingIterator.next) + } else { + val startTime = nonBlockingIterator.curTime + val timeout = 50 + // Force one execution simulating nothing there + nonBlockingIterator.makeAvailable = false + val firstPollValue = nonBlockingIterator.poll(timeout, TimeUnit.MILLISECONDS) + assertEquals("Polling with no items available should return null", null, firstPollValue) + assertEquals("Polling with no items available should wait the full timeout period", startTime + timeout, nonBlockingIterator.curTime) + // And then make the item available + nonBlockingIterator.makeAvailable = true + val secondPollValue = nonBlockingIterator.poll(timeout, TimeUnit.MILLISECONDS) + assertEquals("Polling with an item available should return the right thing", i, secondPollValue) + assertEquals("No mock time should pass when polling with an item available", startTime + timeout, nonBlockingIterator.curTime) + } + } + assertEquals("All gone!", false, nonBlockingIterator.hasNext) intercept[NoSuchElementException] { - iterator.next + nonBlockingIterator.peek + } + intercept[NoSuchElementException] { + nonBlockingIterator.poll(100, TimeUnit.MILLISECONDS) + } + intercept[NoSuchElementException] { + nonBlockingIterator.next } } - + } \ No newline at end of file -- 2.1.2