From 959364f0be7839d8ea225444d418c64941d5b323 Mon Sep 17 00:00:00 2001 From: Sriharsha Chintalapani Date: Tue, 6 May 2014 19:26:36 -0700 Subject: [PATCH] KAFKA-1179. createMessageStreams() in javaapi.ZookeeperConsumerConnector does not throw. --- .../javaapi/consumer/ZookeeperConsumerConnector.scala | 16 ++++++++++------ 1 file changed, 10 insertions(+), 6 deletions(-) diff --git a/core/src/main/scala/kafka/javaapi/consumer/ZookeeperConsumerConnector.scala b/core/src/main/scala/kafka/javaapi/consumer/ZookeeperConsumerConnector.scala index 1f95d9b..523ff02 100644 --- a/core/src/main/scala/kafka/javaapi/consumer/ZookeeperConsumerConnector.scala +++ b/core/src/main/scala/kafka/javaapi/consumer/ZookeeperConsumerConnector.scala @@ -20,7 +20,7 @@ import kafka.serializer._ import kafka.consumer._ import scala.collection.mutable import scala.collection.JavaConversions - +import java.util.concurrent.atomic.AtomicBoolean /** * This class handles the consumers interaction with zookeeper @@ -63,6 +63,7 @@ private[kafka] class ZookeeperConsumerConnector(val config: ConsumerConfig, extends ConsumerConnector { private val underlying = new kafka.consumer.ZookeeperConsumerConnector(config, enableFetcher) + private val messageStreamCreated = new AtomicBoolean(false) def this(config: ConsumerConfig) = this(config, true) @@ -73,6 +74,9 @@ private[kafka] class ZookeeperConsumerConnector(val config: ConsumerConfig, valueDecoder: Decoder[V]) : java.util.Map[String,java.util.List[KafkaStream[K,V]]] = { + if (messageStreamCreated.getAndSet(true)) + throw new RuntimeException(this.getClass.getSimpleName + + " can create message streams at most once") val scalaTopicCountMap: Map[String, Int] = { import JavaConversions._ Map.empty[String, Int] ++ (topicCountMap.asInstanceOf[java.util.Map[String, Int]]: mutable.Map[String, Int]) @@ -87,19 +91,19 @@ private[kafka] class ZookeeperConsumerConnector(val config: ConsumerConfig, } ret } - + def createMessageStreams(topicCountMap: java.util.Map[String,java.lang.Integer]): java.util.Map[String,java.util.List[KafkaStream[Array[Byte],Array[Byte]]]] = createMessageStreams(topicCountMap, new DefaultDecoder(), new DefaultDecoder()) - + def createMessageStreamsByFilter[K,V](topicFilter: TopicFilter, numStreams: Int, keyDecoder: Decoder[K], valueDecoder: Decoder[V]) = { import JavaConversions._ underlying.createMessageStreamsByFilter(topicFilter, numStreams, keyDecoder, valueDecoder) } - def createMessageStreamsByFilter(topicFilter: TopicFilter, numStreams: Int) = + def createMessageStreamsByFilter(topicFilter: TopicFilter, numStreams: Int) = createMessageStreamsByFilter(topicFilter, numStreams, new DefaultDecoder(), new DefaultDecoder()) - - def createMessageStreamsByFilter(topicFilter: TopicFilter) = + + def createMessageStreamsByFilter(topicFilter: TopicFilter) = createMessageStreamsByFilter(topicFilter, 1, new DefaultDecoder(), new DefaultDecoder()) def commitOffsets() { -- 1.8.5.2 (Apple Git-48)