diff --git a/core/src/main/scala/kafka/common/MessageStreamsExistsException.scala b/core/src/main/scala/kafka/common/MessageStreamsExistsException.scala new file mode 100644 index 0000000..02c1af5 --- /dev/null +++ b/core/src/main/scala/kafka/common/MessageStreamsExistsException.scala @@ -0,0 +1,23 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+*/ +package kafka.common + +/** + * Indicates a createMessageStreams can't be called more than once +*/ +class MessageStreamsExistsException(message: String, t: Throwable) extends RuntimeException(message, t) { +} diff --git a/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala b/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala index 1dde4fc..1f90b9e 100644 --- a/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala +++ b/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala @@ -129,31 +129,31 @@ private[kafka] class ZookeeperConsumerConnector(val config: ConsumerConfig, if (config.autoCommitEnable) { scheduler.startup info("starting auto committer every " + config.autoCommitIntervalMs + " ms") - scheduler.schedule("kafka-consumer-autocommit", - autoCommit, + scheduler.schedule("kafka-consumer-autocommit", + autoCommit, delay = config.autoCommitIntervalMs, - period = config.autoCommitIntervalMs, + period = config.autoCommitIntervalMs, unit = TimeUnit.MILLISECONDS) } KafkaMetricsReporter.startReporters(config.props) def this(config: ConsumerConfig) = this(config, true) - - def createMessageStreams(topicCountMap: Map[String,Int]): Map[String, List[KafkaStream[Array[Byte],Array[Byte]]]] = + + def createMessageStreams(topicCountMap: Map[String,Int]): Map[String, List[KafkaStream[Array[Byte],Array[Byte]]]] = createMessageStreams(topicCountMap, new DefaultDecoder(), new DefaultDecoder()) def createMessageStreams[K,V](topicCountMap: Map[String,Int], keyDecoder: Decoder[K], valueDecoder: Decoder[V]) : Map[String, List[KafkaStream[K,V]]] = { if (messageStreamCreated.getAndSet(true)) - throw new RuntimeException(this.getClass.getSimpleName + - " can create message streams at most once") + throw new MessageStreamsExistsException(this.getClass.getSimpleName + + " can create message streams at most once",null) consume(topicCountMap, keyDecoder, valueDecoder) } - def createMessageStreamsByFilter[K,V](topicFilter: 
TopicFilter, - numStreams: Int, - keyDecoder: Decoder[K] = new DefaultDecoder(), + def createMessageStreamsByFilter[K,V](topicFilter: TopicFilter, + numStreams: Int, + keyDecoder: Decoder[K] = new DefaultDecoder(), valueDecoder: Decoder[V] = new DefaultDecoder()) = { val wildcardStreamsHandler = new WildcardStreamsHandler[K,V](topicFilter, numStreams, keyDecoder, valueDecoder) wildcardStreamsHandler.streams @@ -921,10 +921,10 @@ private[kafka] class ZookeeperConsumerConnector(val config: ConsumerConfig, private val wildcardQueuesAndStreams = (1 to numStreams) .map(e => { val queue = new LinkedBlockingQueue[FetchedDataChunk](config.queuedMaxMessages) - val stream = new KafkaStream[K,V](queue, - config.consumerTimeoutMs, - keyDecoder, - valueDecoder, + val stream = new KafkaStream[K,V](queue, + config.consumerTimeoutMs, + keyDecoder, + valueDecoder, config.clientId) (queue, stream) }).toList @@ -978,4 +978,3 @@ private[kafka] class ZookeeperConsumerConnector(val config: ConsumerConfig, wildcardQueuesAndStreams.map(_._2) } } - diff --git a/core/src/main/scala/kafka/javaapi/consumer/ZookeeperConsumerConnector.scala b/core/src/main/scala/kafka/javaapi/consumer/ZookeeperConsumerConnector.scala index 1f95d9b..eec5b41 100644 --- a/core/src/main/scala/kafka/javaapi/consumer/ZookeeperConsumerConnector.scala +++ b/core/src/main/scala/kafka/javaapi/consumer/ZookeeperConsumerConnector.scala @@ -18,9 +18,10 @@ package kafka.javaapi.consumer import kafka.serializer._ import kafka.consumer._ +import kafka.common.MessageStreamsExistsException import scala.collection.mutable import scala.collection.JavaConversions - +import java.util.concurrent.atomic.AtomicBoolean /** * This class handles the consumers interaction with zookeeper @@ -63,6 +64,7 @@ private[kafka] class ZookeeperConsumerConnector(val config: ConsumerConfig, extends ConsumerConnector { private val underlying = new kafka.consumer.ZookeeperConsumerConnector(config, enableFetcher) + private val messageStreamCreated = new 
AtomicBoolean(false) def this(config: ConsumerConfig) = this(config, true) @@ -73,6 +75,9 @@ private[kafka] class ZookeeperConsumerConnector(val config: ConsumerConfig, valueDecoder: Decoder[V]) : java.util.Map[String,java.util.List[KafkaStream[K,V]]] = { + if (messageStreamCreated.getAndSet(true)) + throw new MessageStreamsExistsException(this.getClass.getSimpleName + + " can create message streams at most once",null) val scalaTopicCountMap: Map[String, Int] = { import JavaConversions._ Map.empty[String, Int] ++ (topicCountMap.asInstanceOf[java.util.Map[String, Int]]: mutable.Map[String, Int]) @@ -87,19 +92,19 @@ private[kafka] class ZookeeperConsumerConnector(val config: ConsumerConfig, } ret } - + def createMessageStreams(topicCountMap: java.util.Map[String,java.lang.Integer]): java.util.Map[String,java.util.List[KafkaStream[Array[Byte],Array[Byte]]]] = createMessageStreams(topicCountMap, new DefaultDecoder(), new DefaultDecoder()) - + def createMessageStreamsByFilter[K,V](topicFilter: TopicFilter, numStreams: Int, keyDecoder: Decoder[K], valueDecoder: Decoder[V]) = { import JavaConversions._ underlying.createMessageStreamsByFilter(topicFilter, numStreams, keyDecoder, valueDecoder) } - def createMessageStreamsByFilter(topicFilter: TopicFilter, numStreams: Int) = + def createMessageStreamsByFilter(topicFilter: TopicFilter, numStreams: Int) = createMessageStreamsByFilter(topicFilter, numStreams, new DefaultDecoder(), new DefaultDecoder()) - - def createMessageStreamsByFilter(topicFilter: TopicFilter) = + + def createMessageStreamsByFilter(topicFilter: TopicFilter) = createMessageStreamsByFilter(topicFilter, 1, new DefaultDecoder(), new DefaultDecoder()) def commitOffsets() { diff --git a/core/src/test/scala/unit/kafka/consumer/ZookeeperConsumerConnectorTest.scala b/core/src/test/scala/unit/kafka/consumer/ZookeeperConsumerConnectorTest.scala index 96fa0bd..bcc7de1 100644 --- a/core/src/test/scala/unit/kafka/consumer/ZookeeperConsumerConnectorTest.scala +++ 
b/core/src/test/scala/unit/kafka/consumer/ZookeeperConsumerConnectorTest.scala @@ -1,4 +1,3 @@ - /** * Licensed to the Apache Software Foundation (ASF) under one or more * contributor license agreements. See the NOTICE file distributed with @@ -6,7 +5,7 @@ * The ASF licenses this file to You under the Apache License, Version 2.0 * (the "License"); you may not use this file except in compliance with * the License. You may obtain a copy of the License at - * + * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software @@ -31,6 +30,7 @@ import kafka.producer.{ProducerConfig, KeyedMessage, Producer} import java.util.{Collections, Properties} import org.apache.log4j.{Logger, Level} import kafka.utils.TestUtils._ +import kafka.common.MessageStreamsExistsException class ZookeeperConsumerConnectorTest extends JUnit3Suite with KafkaServerTestHarness with Logging { @@ -157,6 +157,14 @@ class ZookeeperConsumerConnectorTest extends JUnit3Suite with KafkaServerTestHar val actual_3 = getZKChildrenValues(dirs.consumerOwnerDir) assertEquals(expected_2, actual_3) + // calling createMessageStreams twice should throw MessageStreamsExistsException + try { + val topicMessageStreams4 = zkConsumerConnector3.createMessageStreams(new mutable.HashMap[String, Int]()) + fail("Should fail with MessageStreamsExistsException") + } catch { + case e: MessageStreamsExistsException => // expected + } + zkConsumerConnector1.shutdown zkConsumerConnector2.shutdown zkConsumerConnector3.shutdown @@ -164,6 +172,7 @@ class ZookeeperConsumerConnectorTest extends JUnit3Suite with KafkaServerTestHar requestHandlerLogger.setLevel(Level.ERROR) } + def testCompression() { val requestHandlerLogger = Logger.getLogger(classOf[kafka.server.KafkaRequestHandler]) requestHandlerLogger.setLevel(Level.FATAL) @@ -358,10 +367,10 @@ class ZookeeperConsumerConnectorTest extends JUnit3Suite with KafkaServerTestHar ms.toList } - def sendMessages(config: KafkaConfig, - 
messagesPerNode: Int, - header: String, - compression: CompressionCodec, + messagesPerNode: Int, + header: String, + compression: CompressionCodec, numParts: Int): List[String]= { var messages: List[String] = Nil val props = new Properties() @@ -412,5 +421,3 @@ class ZookeeperConsumerConnectorTest extends JUnit3Suite with KafkaServerTestHar } } - - diff --git a/core/src/test/scala/unit/kafka/javaapi/consumer/ZookeeperConsumerConnectorTest.scala b/core/src/test/scala/unit/kafka/javaapi/consumer/ZookeeperConsumerConnectorTest.scala index 20e8efe..276fcad 100644 --- a/core/src/test/scala/unit/kafka/javaapi/consumer/ZookeeperConsumerConnectorTest.scala +++ b/core/src/test/scala/unit/kafka/javaapi/consumer/ZookeeperConsumerConnectorTest.scala @@ -5,7 +5,7 @@ * The ASF licenses this file to You under the Apache License, Version 2.0 * (the "License"); you may not use this file except in compliance with * the License. You may obtain a copy of the License at - * + * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software @@ -27,6 +27,7 @@ import kafka.utils.IntEncoder import kafka.utils.{Logging, TestUtils} import kafka.consumer.{KafkaStream, ConsumerConfig} import kafka.zk.ZooKeeperTestHarness +import kafka.common.MessageStreamsExistsException import scala.collection.JavaConversions @@ -69,17 +70,24 @@ class ZookeeperConsumerConnectorTest extends JUnit3Suite with KafkaServerTestHar val receivedMessages1 = getMessages(nMessages*2, topicMessageStreams1) assertEquals(sentMessages1.sorted, receivedMessages1.sorted) + // calling createMessageStreams twice should throw MessageStreamsExistsException + try { + val topicMessageStreams2 = zkConsumerConnector1.createMessageStreams(toJavaMap(Map(topic -> numNodes*numParts/2)), new StringDecoder(), new StringDecoder()) + fail("Should fail with MessageStreamsExistsException") + } catch { + case e: MessageStreamsExistsException => // 
expected + } zkConsumerConnector1.shutdown info("all consumer connectors stopped") requestHandlerLogger.setLevel(Level.ERROR) } - def sendMessages(conf: KafkaConfig, - messagesPerNode: Int, - header: String, + def sendMessages(conf: KafkaConfig, + messagesPerNode: Int, + header: String, compressed: CompressionCodec): List[String] = { var messages: List[String] = Nil - val producer: kafka.producer.Producer[Int, String] = + val producer: kafka.producer.Producer[Int, String] = TestUtils.createProducer(TestUtils.getBrokerListStrFromConfigs(configs), encoder = classOf[StringEncoder].getName, keyEncoder = classOf[IntEncoder].getName) @@ -94,8 +102,8 @@ class ZookeeperConsumerConnectorTest extends JUnit3Suite with KafkaServerTestHar messages } - def sendMessages(messagesPerNode: Int, - header: String, + def sendMessages(messagesPerNode: Int, + header: String, compressed: CompressionCodec = NoCompressionCodec): List[String] = { var messages: List[String] = Nil for(conf <- configs) @@ -103,7 +111,7 @@ class ZookeeperConsumerConnectorTest extends JUnit3Suite with KafkaServerTestHar messages } - def getMessages(nMessagesPerThread: Int, + def getMessages(nMessagesPerThread: Int, jTopicMessageStreams: java.util.Map[String, java.util.List[KafkaStream[String, String]]]): List[String] = { var messages: List[String] = Nil import scala.collection.JavaConversions._ @@ -126,5 +134,5 @@ class ZookeeperConsumerConnectorTest extends JUnit3Suite with KafkaServerTestHar val javaMap = new java.util.HashMap[String, java.lang.Integer]() scalaMap.foreach(m => javaMap.put(m._1, m._2.asInstanceOf[java.lang.Integer])) javaMap - } + } }