diff --git core/src/main/scala/kafka/consumer/ConsoleConsumer.scala core/src/main/scala/kafka/consumer/ConsoleConsumer.scala
index 21aaccc..2e24c9a 100644
--- core/src/main/scala/kafka/consumer/ConsoleConsumer.scala
+++ core/src/main/scala/kafka/consumer/ConsoleConsumer.scala
@@ -119,7 +119,7 @@ object ConsoleConsumer {
       }
     })
 
-    var stream: KafkaMessageStream = connector.createMessageStreams(Map(topic -> 1)).get(topic).get.get(0)
+    var stream = connector.createMessageStreams(Map(topic -> 1)).get(topic).get.get(0)
     val iter =
       if(maxMessages >= 0)
        stream.slice(0, maxMessages)
diff --git core/src/main/scala/kafka/consumer/ConsumerConnector.scala core/src/main/scala/kafka/consumer/ConsumerConnector.scala
index d807b25..2229084 100644
--- core/src/main/scala/kafka/consumer/ConsumerConnector.scala
+++ core/src/main/scala/kafka/consumer/ConsumerConnector.scala
@@ -20,6 +20,7 @@ package kafka.consumer
 import scala.collection._
 import kafka.utils.Utils
 import org.apache.log4j.Logger
+import kafka.serializer.{DefaultDecoder, Decoder}
 
 /**
  * Main interface for consumer
@@ -32,7 +33,9 @@ trait ConsumerConnector {
   *  @return a map of (topic, list of KafkaMessageStream) pair. The number of items in the
   *          list is #streams. Each KafkaMessageStream supports an iterator of messages.
   */
-  def createMessageStreams(topicCountMap: Map[String,Int]) : Map[String,List[KafkaMessageStream]]
+  def createMessageStreams[T](topicCountMap: Map[String,Int],
+                              decoder: Decoder[T] = new DefaultDecoder)
+    : Map[String,List[KafkaMessageStream[T]]]
 
   /**
   *  Commit the offsets of all broker partitions connected by this connector.
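Note on the new abstraction: the Decoder[T] contract this patch threads through the consumer is deliberately small — as the ConsumerIterator change below shows, the only hook is decoder.toEvent(message), applied once per fetched message. As a reference point, a user-supplied decoder in the spirit of the StringDecoder exercised by the new tests might look like the following sketch (the exact kafka.serializer.Decoder trait is not shown in this diff; it is assumed here to declare just toEvent):

    import kafka.message.Message
    import kafka.serializer.Decoder
    import kafka.utils.Utils

    // Sketch of a custom decoder: turns each Message payload into a
    // UTF-8 string, much like the StringDecoder used by the new tests below.
    class Utf8Decoder extends Decoder[String] {
      def toEvent(message: Message): String =
        Utils.toString(message.payload, "UTF-8")
    }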
diff --git core/src/main/scala/kafka/consumer/ConsumerIterator.scala core/src/main/scala/kafka/consumer/ConsumerIterator.scala
index 9b24781..8702138 100644
--- core/src/main/scala/kafka/consumer/ConsumerIterator.scala
+++ core/src/main/scala/kafka/consumer/ConsumerIterator.scala
@@ -22,32 +22,35 @@ import org.apache.log4j.Logger
 import java.util.concurrent.{TimeUnit, BlockingQueue}
 import kafka.cluster.Partition
 import kafka.message.{MessageAndOffset, MessageSet, Message}
+import kafka.serializer.Decoder
 
 /**
  * An iterator that blocks until a value can be read from the supplied queue.
  * The iterator takes a shutdownCommand object which can be added to the queue to trigger a shutdown
  *
 */
-class ConsumerIterator(private val channel: BlockingQueue[FetchedDataChunk], consumerTimeoutMs: Int)
-  extends IteratorTemplate[Message] {
+class ConsumerIterator[T](private val channel: BlockingQueue[FetchedDataChunk],
+                          consumerTimeoutMs: Int,
+                          private val decoder: Decoder[T])
+  extends IteratorTemplate[T] {
 
-  private val logger = Logger.getLogger(classOf[ConsumerIterator])
+  private val logger = Logger.getLogger(classOf[ConsumerIterator[T]])
   private var current: Iterator[MessageAndOffset] = null
   private var currentDataChunk: FetchedDataChunk = null
   private var currentTopicInfo: PartitionTopicInfo = null
   private var consumedOffset: Long = -1L
 
-  override def next(): Message = {
-    val message = super.next
+  override def next(): T = {
+    val decodedMessage = super.next()
     if(consumedOffset < 0)
       throw new IllegalStateException("Offset returned by the message set is invalid %d".format(consumedOffset))
     currentTopicInfo.resetConsumeOffset(consumedOffset)
     if(logger.isTraceEnabled)
       logger.trace("Setting consumed offset to %d".format(consumedOffset))
-    message
+    decodedMessage
   }
 
-  protected def makeNext(): Message = {
+  protected def makeNext(): T = {
     // if we don't have an iterator, get one
     if(current == null || !current.hasNext) {
       if (consumerTimeoutMs < 0)
@@ -62,7 +65,7 @@ class ConsumerIterator(private val channel: BlockingQueue[FetchedDataChunk], con
         if(logger.isDebugEnabled)
           logger.debug("Received the shutdown command")
         channel.offer(currentDataChunk)
-        return allDone
+        return allDone()
       } else {
         currentTopicInfo = currentDataChunk.topicInfo
         if (currentTopicInfo.getConsumeOffset != currentDataChunk.fetchOffset) {
@@ -73,9 +76,9 @@ class ConsumerIterator(private val channel: BlockingQueue[FetchedDataChunk], con
         current = currentDataChunk.messages.iterator
       }
     }
-    val item = current.next
+    val item = current.next()
     consumedOffset = item.offset
-    item.message
+    decoder.toEvent(item.message)
   }
 }
diff --git core/src/main/scala/kafka/consumer/KafkaMessageStream.scala core/src/main/scala/kafka/consumer/KafkaMessageStream.scala
index f6074a8..64fe6dc 100644
--- core/src/main/scala/kafka/consumer/KafkaMessageStream.scala
+++ core/src/main/scala/kafka/consumer/KafkaMessageStream.scala
@@ -20,20 +20,23 @@ package kafka.consumer
 import java.util.concurrent.BlockingQueue
 import org.apache.log4j.Logger
 import kafka.message.Message
-
+import kafka.serializer.{DefaultDecoder, Decoder}
 
 /**
  * All calls to elements should produce the same thread-safe iterator? Should have a seperate thread
  * that feeds messages into a blocking queue for processing.
  */
-class KafkaMessageStream(private val queue: BlockingQueue[FetchedDataChunk], consumerTimeoutMs: Int)
-  extends Iterable[Message] with java.lang.Iterable[Message]{
+class KafkaMessageStream[T](private val queue: BlockingQueue[FetchedDataChunk],
+                            consumerTimeoutMs: Int,
+                            private val decoder: Decoder[T])
+  extends Iterable[T] with java.lang.Iterable[T]{
 
   private val logger = Logger.getLogger(getClass())
-  private val iter: ConsumerIterator = new ConsumerIterator(queue, consumerTimeoutMs)
+  private val iter: ConsumerIterator[T] =
+    new ConsumerIterator[T](queue, consumerTimeoutMs, decoder)
 
   /**
    * Create an iterator over messages in the stream.
   */
-  def iterator(): ConsumerIterator = iter
+  def iterator(): ConsumerIterator[T] = iter
 }
diff --git core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala
index 234ae06..4b0676a 100644
--- core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala
+++ core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala
@@ -29,6 +29,7 @@ import org.I0Itec.zkclient.{IZkStateListener, IZkChildListener, ZkClient}
 import org.apache.zookeeper.Watcher.Event.KeeperState
 import kafka.api.OffsetRequest
 import java.util.UUID
+import kafka.serializer.{DefaultDecoder, Decoder}
 
 /**
  * This class handles the consumers interaction with zookeeper
@@ -103,8 +104,10 @@ private[kafka] class ZookeeperConsumerConnector(val config: ConsumerConfig,
 
   def this(config: ConsumerConfig) = this(config, true)
 
-  def createMessageStreams(topicCountMap: Map[String,Int]) : Map[String,List[KafkaMessageStream]] = {
-    consume(topicCountMap)
+  def createMessageStreams[T](topicCountMap: Map[String,Int],
+                              decoder: Decoder[T] = new DefaultDecoder)
+    : Map[String,List[KafkaMessageStream[T]]] = {
+    consume(topicCountMap, decoder)
   }
 
   private def createFetcher() {
@@ -143,13 +146,15 @@ private[kafka] class ZookeeperConsumerConnector(val config: ConsumerConfig,
     }
   }
 
-  def consume(topicCountMap: scala.collection.Map[String,Int]): Map[String,List[KafkaMessageStream]] = {
+  def consume[T](topicCountMap: scala.collection.Map[String,Int],
+                 decoder: Decoder[T] = new DefaultDecoder)
+    : Map[String,List[KafkaMessageStream[T]]] = {
     logger.debug("entering consume ")
     if (topicCountMap == null)
       throw new RuntimeException("topicCountMap is null")
 
     val dirs = new ZKGroupDirs(config.groupId)
-    var ret = new mutable.HashMap[String,List[KafkaMessageStream]]
+    var ret = new mutable.HashMap[String,List[KafkaMessageStream[T]]]
 
     var consumerUuid : String = null
     config.consumerId match {
@@ -177,11 +182,11 @@ private[kafka] class ZookeeperConsumerConnector(val config: ConsumerConfig,
     // create a queue per topic per consumer thread
     val consumerThreadIdsPerTopic = topicCount.getConsumerThreadIdsPerTopic
     for ((topic, threadIdSet) <- consumerThreadIdsPerTopic) {
-      var streamList: List[KafkaMessageStream] = Nil
+      var streamList: List[KafkaMessageStream[T]] = Nil
       for (threadId <- threadIdSet) {
         val stream = new LinkedBlockingQueue[FetchedDataChunk](config.maxQueuedChunks)
         queues.put((topic, threadId), stream)
-        streamList ::= new KafkaMessageStream(stream, config.consumerTimeoutMs)
+        streamList ::= new KafkaMessageStream[T](stream, config.consumerTimeoutMs, decoder)
       }
       ret += (topic -> streamList)
       logger.debug("adding topic " + topic + " and stream to map..")
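With the connector change above in place, the element type of a stream is fixed at creation time by the decoder argument, and omitting it falls back to DefaultDecoder, i.e. streams of raw Message. A minimal usage sketch against the Scala API — the property names and values (zk.connect, groupid, the topic) are illustrative placeholders, not taken from this patch:

    import java.util.Properties
    import kafka.consumer.{Consumer, ConsumerConfig}
    import kafka.serializer.StringDecoder

    val props = new Properties()
    props.put("zk.connect", "localhost:2181") // placeholder ZooKeeper address
    props.put("groupid", "example-group")     // placeholder consumer group
    val connector = Consumer.create(new ConsumerConfig(props))

    // Passing a decoder fixes T; here the streams yield decoded Strings.
    val streams = connector.createMessageStreams(Map("my-topic" -> 1), new StringDecoder)
    for (message <- streams("my-topic").head) // blocks until messages arrive
      println(message)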
diff --git core/src/main/scala/kafka/javaapi/consumer/ConsumerConnector.java core/src/main/scala/kafka/javaapi/consumer/ConsumerConnector.java
index 2f3355a..6f50e97 100644
--- core/src/main/scala/kafka/javaapi/consumer/ConsumerConnector.java
+++ core/src/main/scala/kafka/javaapi/consumer/ConsumerConnector.java
@@ -18,19 +18,25 @@
 package kafka.javaapi.consumer;
 
 import kafka.consumer.KafkaMessageStream;
+import kafka.message.Message;
+import kafka.serializer.Decoder;
 
 import java.util.List;
 import java.util.Map;
 
 public interface ConsumerConnector {
   /**
-   *  Create a list of MessageStreams for each topic.
+   *  Create a list of MessageStreams of type T for each topic.
    *
    *  @param topicCountMap  a map of (topic, #streams) pair
+   *  @param decoder a decoder that converts from Message to T
   *  @return a map of (topic, list of KafkaMessageStream) pair. The number of items in the
   *          list is #streams. Each KafkaMessageStream supports an iterator of messages.
   */
-  public Map<String, List<KafkaMessageStream>> createMessageStreams(Map<String, Integer> topicCountMap);
+  public <T> Map<String, List<KafkaMessageStream<T>>> createMessageStreams(
+      Map<String, Integer> topicCountMap, Decoder<T> decoder);
+  public Map<String, List<KafkaMessageStream<Message>>> createMessageStreams(
+      Map<String, Integer> topicCountMap);
 
   /**
    *  Commit the offsets of all broker partitions connected by this connector.
diff --git core/src/main/scala/kafka/javaapi/consumer/ZookeeperConsumerConnector.scala core/src/main/scala/kafka/javaapi/consumer/ZookeeperConsumerConnector.scala
index 856e6e7..0ee7488 100644
--- core/src/main/scala/kafka/javaapi/consumer/ZookeeperConsumerConnector.scala
+++ core/src/main/scala/kafka/javaapi/consumer/ZookeeperConsumerConnector.scala
@@ -17,6 +17,8 @@
 package kafka.javaapi.consumer
 
 import kafka.consumer.{KafkaMessageStream, ConsumerConfig}
+import kafka.message.Message
+import kafka.serializer.{DefaultDecoder, Decoder}
 
 /**
  * This class handles the consumers interaction with zookeeper
@@ -63,15 +65,17 @@ private[kafka] class ZookeeperConsumerConnector(val config: ConsumerConfig,
   def this(config: ConsumerConfig) = this(config, true)
 
   // for java client
-  def createMessageStreams(topicCountMap: java.util.Map[String,java.lang.Integer]):
-      java.util.Map[String,java.util.List[KafkaMessageStream]] = {
+  def createMessageStreams[T](
+      topicCountMap: java.util.Map[String,java.lang.Integer],
+      decoder: Decoder[T])
+    : java.util.Map[String,java.util.List[KafkaMessageStream[T]]] = {
     import scala.collection.JavaConversions._
 
     val scalaTopicCountMap: Map[String, Int] = Map.empty[String, Int] ++ asMap(topicCountMap.asInstanceOf[java.util.Map[String, Int]])
-    val scalaReturn = underlying.consume(scalaTopicCountMap)
-    val ret = new java.util.HashMap[String,java.util.List[KafkaMessageStream]]
+    val scalaReturn = underlying.consume(scalaTopicCountMap, decoder)
+    val ret = new java.util.HashMap[String,java.util.List[KafkaMessageStream[T]]]
     for ((topic, streams) <- scalaReturn) {
-      var javaStreamList = new java.util.ArrayList[KafkaMessageStream]
+      var javaStreamList = new java.util.ArrayList[KafkaMessageStream[T]]
       for (stream <- streams)
         javaStreamList.add(stream)
       ret.put(topic, javaStreamList)
@@ -79,6 +83,12 @@ private[kafka] class ZookeeperConsumerConnector(val config: ConsumerConfig,
     ret
   }
 
+  def createMessageStreams(
+      topicCountMap: java.util.Map[String,java.lang.Integer])
+    : java.util.Map[String,java.util.List[KafkaMessageStream[Message]]] =
+    createMessageStreams(topicCountMap, new DefaultDecoder)
+
+
   def commitOffsets() {
     underlying.commitOffsets
   }
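On the Java API side, only the decoder-taking method is new; the original no-decoder shape survives as an overload that plugs in DefaultDecoder, so existing Java clients keep compiling and keep receiving streams of Message. The difference in result types, sketched from Scala against a hypothetical javaConnector value:

    import kafka.consumer.KafkaMessageStream
    import kafka.javaapi.consumer.ConsumerConnector
    import kafka.message.Message
    import kafka.serializer.StringDecoder

    def demo(javaConnector: ConsumerConnector,
             topicCountMap: java.util.Map[String, java.lang.Integer]) {
      // Old shape, unchanged for existing clients: raw Message streams.
      val defaultStreams: java.util.Map[String, java.util.List[KafkaMessageStream[Message]]] =
        javaConnector.createMessageStreams(topicCountMap)
      // New shape: the decoder picks the element type, here String.
      val stringStreams: java.util.Map[String, java.util.List[KafkaMessageStream[String]]] =
        javaConnector.createMessageStreams(topicCountMap, new StringDecoder)
    }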
diff --git core/src/main/scala/kafka/tools/ConsumerShell.scala core/src/main/scala/kafka/tools/ConsumerShell.scala
index a083fa1..6234152 100644
--- core/src/main/scala/kafka/tools/ConsumerShell.scala
+++ core/src/main/scala/kafka/tools/ConsumerShell.scala
@@ -22,6 +22,7 @@ import kafka.utils.Utils
 import java.util.concurrent.CountDownLatch
 import org.apache.log4j.Logger
 import kafka.consumer._
+import kafka.message.Message
 
 /**
  * Program to read using the rich consumer and dump the results to standard out
@@ -83,7 +84,7 @@ object ConsumerShell {
   }
 }
 
-class ZKConsumerThread(stream: KafkaMessageStream) extends Thread {
+class ZKConsumerThread(stream: KafkaMessageStream[Message]) extends Thread {
   val shutdownLatch = new CountDownLatch(1)
   val logger = Logger.getLogger(getClass)
 
diff --git core/src/main/scala/kafka/tools/ReplayLogProducer.scala core/src/main/scala/kafka/tools/ReplayLogProducer.scala
index eb96574..d33de67 100644
--- core/src/main/scala/kafka/tools/ReplayLogProducer.scala
+++ core/src/main/scala/kafka/tools/ReplayLogProducer.scala
@@ -139,7 +139,7 @@ object ReplayLogProducer {
     }
   }
 
-  class ZKConsumerThread(config: Config, stream: KafkaMessageStream) extends Thread {
+  class ZKConsumerThread(config: Config, stream: KafkaMessageStream[Message]) extends Thread {
    val shutdownLatch = new CountDownLatch(1)
    val logger = Logger.getLogger(getClass)
    val props = new Properties()
diff --git core/src/test/scala/other/kafka/TestZKConsumerOffsets.scala core/src/test/scala/other/kafka/TestZKConsumerOffsets.scala
index 1c5f542..c8e4a3c 100644
--- core/src/test/scala/other/kafka/TestZKConsumerOffsets.scala
+++ core/src/test/scala/other/kafka/TestZKConsumerOffsets.scala
@@ -18,6 +18,7 @@
 package kafka
 
 import consumer._
+import message.Message
 import utils.Utils
 import java.util.concurrent.CountDownLatch
 
@@ -55,7 +56,7 @@ object TestZKConsumerOffsets {
   }
 }
 
-private class ConsumerThread(stream: KafkaMessageStream) extends Thread {
+private class ConsumerThread(stream: KafkaMessageStream[Message]) extends Thread {
   val shutdownLatch = new CountDownLatch(1)
 
   override def run() {
diff --git core/src/test/scala/unit/kafka/consumer/ZookeeperConsumerConnectorTest.scala core/src/test/scala/unit/kafka/consumer/ZookeeperConsumerConnectorTest.scala
index 4002ccf..f5d309d 100644
--- core/src/test/scala/unit/kafka/consumer/ZookeeperConsumerConnectorTest.scala
+++ core/src/test/scala/unit/kafka/consumer/ZookeeperConsumerConnectorTest.scala
@@ -27,6 +27,7 @@ import kafka.utils.{TestZKUtils, TestUtils}
 import org.scalatest.junit.JUnit3Suite
 import org.apache.log4j.{Level, Logger}
 import kafka.message._
+import kafka.serializer.StringDecoder
 
 class ZookeeperConsumerConnectorTest extends JUnit3Suite with KafkaServerTestHarness with ZooKeeperTestHarness {
   private val logger = Logger.getLogger(getClass())
@@ -124,26 +125,6 @@ class ZookeeperConsumerConnectorTest extends JUnit3Suite with KafkaServerTestHar
     val requestHandlerLogger = Logger.getLogger(classOf[kafka.server.KafkaRequestHandlers])
     requestHandlerLogger.setLevel(Level.FATAL)
 
-    var actualMessages: List[Message] = Nil
-
-    // test consumer timeout logic
-    val consumerConfig0 = new ConsumerConfig(
-      TestUtils.createConsumerProperties(zkConnect, group, consumer0)) {
-      override val consumerTimeoutMs = 200
-    }
-    val zkConsumerConnector0 = new ZookeeperConsumerConnector(consumerConfig0, true)
-    val topicMessageStreams0 = zkConsumerConnector0.createMessageStreams(Predef.Map(topic -> numNodes*numParts/2))
-    try {
-      getMessages(nMessages*2, topicMessageStreams0)
-      fail("should get an exception")
-    }
-    catch {
-      case e: ConsumerTimeoutException => // this is ok
-        println("This is ok")
-      case e => throw e
-    }
-    zkConsumerConnector0.shutdown
-
     println("Sending messages for 1st consumer")
     // send some messages to each broker
     val sentMessages1 = sendMessages(nMessages, "batch1", DefaultCompressionCodec)
@@ -227,6 +208,41 @@ class ZookeeperConsumerConnectorTest extends JUnit3Suite with KafkaServerTestHar
     requestHandlerLogger.setLevel(Level.ERROR)
   }
 
+  def testConsumerDecoder() {
+    val requestHandlerLogger = Logger.getLogger(classOf[kafka.server.KafkaRequestHandlers])
+    requestHandlerLogger.setLevel(Level.FATAL)
+
+    val sentMessages = sendMessages(nMessages, "batch1", NoCompressionCodec).
+      map(m => Utils.toString(m.payload, "UTF-8")).
+      sortWith((s, t) => s.compare(t) == -1)
+    val consumerConfig = new ConsumerConfig(
+      TestUtils.createConsumerProperties(zkConnect, group, consumer1))
+
+    val zkConsumerConnector =
+      new ZookeeperConsumerConnector(consumerConfig, true)
+    val topicMessageStreams =
+      zkConsumerConnector.createMessageStreams(
+        Predef.Map(topic -> numNodes*numParts/2), new StringDecoder)
+
+    var receivedMessages: List[String] = Nil
+    for ((topic, messageStreams) <- topicMessageStreams) {
+      for (messageStream <- messageStreams) {
+        val iterator = messageStream.iterator
+        for (i <- 0 until nMessages * 2) {
+          assertTrue(iterator.hasNext())
+          val message = iterator.next()
+          receivedMessages ::= message
+          logger.debug("received message: " + message)
+        }
+      }
+    }
+    receivedMessages = receivedMessages.sortWith((s, t) => s.compare(t) == -1)
+    assertEquals(sentMessages, receivedMessages)
+
+    zkConsumerConnector.shutdown()
+    requestHandlerLogger.setLevel(Level.ERROR)
+  }
+
   def sendMessages(conf: KafkaConfig, messagesPerNode: Int, header: String, compression: CompressionCodec): List[Message]= {
     var messages: List[Message] = Nil
     val producer = TestUtils.createProducer("localhost", conf.port)
@@ -250,7 +266,7 @@ class ZookeeperConsumerConnectorTest extends JUnit3Suite with KafkaServerTestHar
     messages.sortWith((s,t) => s.checksum < t.checksum)
   }
 
-  def getMessages(nMessagesPerThread: Int, topicMessageStreams: Map[String,List[KafkaMessageStream]]): List[Message]= {
+  def getMessages(nMessagesPerThread: Int, topicMessageStreams: Map[String,List[KafkaMessageStream[Message]]]): List[Message]= {
     var messages: List[Message] = Nil
     for ((topic, messageStreams) <- topicMessageStreams) {
       for (messageStream <- messageStreams) {
diff --git core/src/test/scala/unit/kafka/javaapi/consumer/ZookeeperConsumerConnectorTest.scala core/src/test/scala/unit/kafka/javaapi/consumer/ZookeeperConsumerConnectorTest.scala
index 066e8f7..270816b 100644
--- core/src/test/scala/unit/kafka/javaapi/consumer/ZookeeperConsumerConnectorTest.scala
+++ core/src/test/scala/unit/kafka/javaapi/consumer/ZookeeperConsumerConnectorTest.scala
@@ -31,6 +31,7 @@ import kafka.consumer.{Consumer, ConsumerConfig, KafkaMessageStream, ConsumerTim
 import javax.management.NotCompliantMBeanException
 import org.apache.log4j.{Level, Logger}
 import kafka.message.{NoCompressionCodec, DefaultCompressionCodec, CompressionCodec, Message}
+import kafka.serializer.StringDecoder
 
 class ZookeeperConsumerConnectorTest extends JUnit3Suite with KafkaServerTestHarness with ZooKeeperTestHarness {
   private val logger = Logger.getLogger(getClass())
@@ -125,24 +126,6 @@ class ZookeeperConsumerConnectorTest extends JUnit3Suite with KafkaServerTestHar
   def testCompression() {
     val requestHandlerLogger = Logger.getLogger(classOf[kafka.server.KafkaRequestHandlers])
     requestHandlerLogger.setLevel(Level.FATAL)
-    var actualMessages: List[Message] = Nil
-
-    // test consumer timeout logic
-    val consumerConfig0 = new ConsumerConfig(
-      TestUtils.createConsumerProperties(zkConnect, group, consumer0)) {
-      override val consumerTimeoutMs = 200
-    }
-    val zkConsumerConnector0 = new ZookeeperConsumerConnector(consumerConfig0, true)
-    val topicMessageStreams0 = zkConsumerConnector0.createMessageStreams(toJavaMap(Predef.Map(topic -> numNodes*numParts/2)))
-    try {
-      getMessages(nMessages*2, topicMessageStreams0)
-      fail("should get an exception")
-    }
-    catch {
-      case e: ConsumerTimeoutException => // this is ok
-      case e => throw e
-    }
-    zkConsumerConnector0.shutdown
 
     // send some messages to each broker
     val sentMessages1 = sendMessages(nMessages, "batch1", DefaultCompressionCodec)
@@ -224,6 +207,41 @@ class ZookeeperConsumerConnectorTest extends JUnit3Suite with KafkaServerTestHar
     requestHandlerLogger.setLevel(Level.ERROR)
   }
 
+  def testConsumerDecoder() {
+    val requestHandlerLogger = Logger.getLogger(classOf[kafka.server.KafkaRequestHandlers])
+    requestHandlerLogger.setLevel(Level.FATAL)
+
+    val sentMessages = sendMessages(nMessages, "batch1", NoCompressionCodec).
+      map(m => Utils.toString(m.payload, "UTF-8")).
+      sortWith((s, t) => s.compare(t) == -1)
+    val consumerConfig = new ConsumerConfig(
+      TestUtils.createConsumerProperties(zkConnect, group, consumer1))
+
+    val zkConsumerConnector =
+      new ZookeeperConsumerConnector(consumerConfig, true)
+    val topicMessageStreams = zkConsumerConnector.createMessageStreams(
+      Predef.Map(topic -> new java.lang.Integer(numNodes * numParts / 2)), new StringDecoder)
+
+    var receivedMessages: List[String] = Nil
+    for ((topic, messageStreams) <- topicMessageStreams) {
+      for (messageStream <- messageStreams) {
+        val iterator = messageStream.iterator
+        for (i <- 0 until nMessages * 2) {
+          assertTrue(iterator.hasNext())
+          val message = iterator.next()
+          receivedMessages ::= message
+          logger.debug("received message: " + message)
+        }
+      }
+    }
+    receivedMessages = receivedMessages.sortWith((s, t) => s.compare(t) == -1)
+    assertEquals(sentMessages, receivedMessages)
+
+    zkConsumerConnector.shutdown()
+    requestHandlerLogger.setLevel(Level.ERROR)
+  }
+
+
   def sendMessages(conf: KafkaConfig, messagesPerNode: Int, header: String, compressed: CompressionCodec): List[Message]= {
     var messages: List[Message] = Nil
     val producer = kafka.javaapi.Implicits.toJavaSyncProducer(TestUtils.createProducer("localhost", conf.port))
@@ -247,7 +265,7 @@ class ZookeeperConsumerConnectorTest extends JUnit3Suite with KafkaServerTestHar
     messages.sortWith((s,t) => s.checksum < t.checksum)
   }
 
-  def getMessages(nMessagesPerThread: Int, jTopicMessageStreams: java.util.Map[String, java.util.List[KafkaMessageStream]])
+  def getMessages(nMessagesPerThread: Int, jTopicMessageStreams: java.util.Map[String, java.util.List[KafkaMessageStream[Message]]])
       : List[Message]= {
     var messages: List[Message] = Nil
     val topicMessageStreams = asMap(jTopicMessageStreams)
diff --git examples/src/main/java/kafka/examples/Consumer.java examples/src/main/java/kafka/examples/Consumer.java
index f63b0df..b0a330c 100644
--- examples/src/main/java/kafka/examples/Consumer.java
+++ examples/src/main/java/kafka/examples/Consumer.java
@@ -25,6 +25,8 @@ import kafka.consumer.ConsumerConfig;
 import kafka.consumer.ConsumerIterator;
 import kafka.consumer.KafkaMessageStream;
 import kafka.javaapi.consumer.ConsumerConnector;
+import kafka.message.Message;
+import kafka.serializer.DefaultDecoder;
 
 public class Consumer extends Thread
 {
@@ -33,7 +35,8 @@ public class Consumer extends Thread
 
   public Consumer(String topic)
   {
-    consumer = kafka.consumer.Consumer.createJavaConsumerConnector(createConsumerConfig());
+    consumer = kafka.consumer.Consumer.createJavaConsumerConnector(
+        createConsumerConfig());
     this.topic = topic;
   }
 
@@ -53,9 +56,9 @@ public class Consumer extends Thread
   public void run() {
     Map<String, Integer> topicCountMap = new HashMap<String, Integer>();
     topicCountMap.put(topic, new Integer(1));
-    Map<String, List<KafkaMessageStream>> consumerMap = consumer.createMessageStreams(topicCountMap);
-    KafkaMessageStream stream = consumerMap.get(topic).get(0);
-    ConsumerIterator it = stream.iterator();
+    Map<String, List<KafkaMessageStream<Message>>> consumerMap = consumer.createMessageStreams(topicCountMap);
+    KafkaMessageStream<Message> stream = consumerMap.get(topic).get(0);
+    ConsumerIterator<Message> it = stream.iterator();
     while(it.hasNext())
       System.out.println(ExampleUtils.getMessage(it.next()));
   }
diff --git project/build/KafkaProject.scala project/build/KafkaProject.scala
index a36b17b..4dd1025 100644
--- project/build/KafkaProject.scala
+++ project/build/KafkaProject.scala
@@ -129,6 +129,8 @@ class KafkaProject(info: ProjectInfo) extends ParentProject(info) with IdeaProje
     override def artifactID = "kafka-java-examples"
     override def filterScalaJars = false
+    override def javaCompileOptions = super.javaCompileOptions ++
+      List(JavaCompileOption("-Xlint:unchecked"))
   }
 
   class KafkaPerfProject(info: ProjectInfo) extends DefaultProject(info)
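Taken end to end, the point of threading T through ConsumerIterator, KafkaMessageStream and both connector APIs is that application code can consume domain objects directly, with the decode performed once per message inside ConsumerIterator.makeNext; the -Xlint:unchecked flag added to the examples build presumably exists to surface raw-type warnings now that the Java API is generic. A closing sketch with an invented event type and wire format (an 8-byte big-endian timestamp followed by a UTF-8 body — illustrative only, not part of this patch):

    import java.nio.ByteBuffer
    import kafka.message.Message
    import kafka.serializer.Decoder

    // Invented domain type, purely for illustration.
    case class TimedEvent(timestampMs: Long, body: String)

    class TimedEventDecoder extends Decoder[TimedEvent] {
      def toEvent(message: Message): TimedEvent = {
        val buf: ByteBuffer = message.payload  // payload is a ByteBuffer view of the message body
        val ts = buf.getLong()                 // consume the leading 8-byte timestamp
        val bytes = new Array[Byte](buf.remaining())
        buf.get(bytes)                         // the rest of the payload is the body
        TimedEvent(ts, new String(bytes, "UTF-8"))
      }
    }

Passing new TimedEventDecoder to createMessageStreams would then yield KafkaMessageStream[TimedEvent] values on both the Scala and Java sides, with no casting at the call sites.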