diff --git a/core/src/main/scala/kafka/common/MessageStreamsExistException.scala b/core/src/main/scala/kafka/common/MessageStreamsExistException.scala
deleted file mode 100644
index 68a2e07..0000000
--- a/core/src/main/scala/kafka/common/MessageStreamsExistException.scala
+++ /dev/null
@@ -1,23 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
-*/
-package kafka.common
-
-/**
- * Indicates a createMessageStreams can't be called more thane once
-*/
-class MessageStreamsExistException(message: String, t: Throwable) extends RuntimeException(message, t) {
-}
diff --git a/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala b/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala
index c032d26..1dde4fc 100644
--- a/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala
+++ b/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala
@@ -129,31 +129,31 @@
   if (config.autoCommitEnable) {
     scheduler.startup
     info("starting auto committer every " + config.autoCommitIntervalMs + " ms")
-    scheduler.schedule("kafka-consumer-autocommit", 
-                       autoCommit, 
+    scheduler.schedule("kafka-consumer-autocommit",
+                       autoCommit,
                        delay = config.autoCommitIntervalMs,
-                       period = config.autoCommitIntervalMs, 
+                       period = config.autoCommitIntervalMs,
                        unit = TimeUnit.MILLISECONDS)
   }
 
   KafkaMetricsReporter.startReporters(config.props)
 
   def this(config: ConsumerConfig) = this(config, true)
- 
-  def createMessageStreams(topicCountMap: Map[String,Int]): Map[String, List[KafkaStream[Array[Byte],Array[Byte]]]] = 
+
+  def createMessageStreams(topicCountMap: Map[String,Int]): Map[String, List[KafkaStream[Array[Byte],Array[Byte]]]] =
     createMessageStreams(topicCountMap, new DefaultDecoder(), new DefaultDecoder())
 
   def createMessageStreams[K,V](topicCountMap: Map[String,Int], keyDecoder: Decoder[K], valueDecoder: Decoder[V])
      : Map[String, List[KafkaStream[K,V]]] = {
    if (messageStreamCreated.getAndSet(true))
-      throw new MessageStreamsExistException(this.getClass.getSimpleName +
-                                   " can create message streams at most once",null)
+      throw new RuntimeException(this.getClass.getSimpleName +
+                                   " can create message streams at most once")
    consume(topicCountMap, keyDecoder, valueDecoder)
  }
 
-  def createMessageStreamsByFilter[K,V](topicFilter: TopicFilter, 
-                                        numStreams: Int, 
-                                        keyDecoder: Decoder[K] = new DefaultDecoder(), 
+  def createMessageStreamsByFilter[K,V](topicFilter: TopicFilter,
+                                        numStreams: Int,
+                                        keyDecoder: Decoder[K] = new DefaultDecoder(),
                                         valueDecoder: Decoder[V] = new DefaultDecoder()) = {
     val wildcardStreamsHandler = new WildcardStreamsHandler[K,V](topicFilter, numStreams, keyDecoder, valueDecoder)
     wildcardStreamsHandler.streams
@@ -921,10 +921,10 @@ private[kafka] class ZookeeperConsumerConnector(val config: ConsumerConfig,
   private val wildcardQueuesAndStreams = (1 to numStreams)
     .map(e => {
       val queue = new LinkedBlockingQueue[FetchedDataChunk](config.queuedMaxMessages)
-      val stream = new KafkaStream[K,V](queue, 
-                                        config.consumerTimeoutMs, 
-                                        keyDecoder, 
-                                        valueDecoder, 
+      val stream = new KafkaStream[K,V](queue,
+                                        config.consumerTimeoutMs,
+                                        keyDecoder,
+                                        valueDecoder,
                                         config.clientId)
       (queue, stream)
     }).toList
@@ -978,3 +978,4 @@ private[kafka] class ZookeeperConsumerConnector(val config: ConsumerConfig,
     wildcardQueuesAndStreams.map(_._2)
   }
 }
+
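The create-once guard stays in the Scala connector; only the exception type changes, since the dedicated MessageStreamsExistException class is deleted above. For reference, a minimal standalone sketch of the guard pattern (the class name and method body here are illustrative, not from the patch):

    import java.util.concurrent.atomic.AtomicBoolean

    // getAndSet(true) atomically returns the previous value, so exactly one
    // caller -- the first, from whichever thread -- observes false and
    // proceeds; every later call fails fast with the RuntimeException.
    class CreateOnceGuard {
      private val messageStreamCreated = new AtomicBoolean(false)

      def createMessageStreams() {
        if (messageStreamCreated.getAndSet(true))
          throw new RuntimeException(this.getClass.getSimpleName +
            " can create message streams at most once")
        // ... construct and register the streams exactly once (elided) ...
      }
    }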
diff --git a/core/src/main/scala/kafka/javaapi/consumer/ZookeeperConsumerConnector.scala b/core/src/main/scala/kafka/javaapi/consumer/ZookeeperConsumerConnector.scala
index 1f98db5..1f95d9b 100644
--- a/core/src/main/scala/kafka/javaapi/consumer/ZookeeperConsumerConnector.scala
+++ b/core/src/main/scala/kafka/javaapi/consumer/ZookeeperConsumerConnector.scala
@@ -18,10 +18,9 @@ package kafka.javaapi.consumer
 
 import kafka.serializer._
 import kafka.consumer._
-import kafka.common.MessageStreamsExistException
 import scala.collection.mutable
 import scala.collection.JavaConversions
-import java.util.concurrent.atomic.AtomicBoolean
+
 
 /**
  * This class handles the consumers interaction with zookeeper
@@ -64,7 +63,6 @@ private[kafka] class ZookeeperConsumerConnector(val config: ConsumerConfig,
         extends ConsumerConnector {
 
   private val underlying = new kafka.consumer.ZookeeperConsumerConnector(config, enableFetcher)
-  private val messageStreamCreated = new AtomicBoolean(false)
 
   def this(config: ConsumerConfig) = this(config, true)
 
@@ -75,9 +73,6 @@ private[kafka] class ZookeeperConsumerConnector(val config: ConsumerConfig,
         valueDecoder: Decoder[V])
       : java.util.Map[String,java.util.List[KafkaStream[K,V]]] = {
 
-    if (messageStreamCreated.getAndSet(true))
-      throw new MessageStreamsExistException(this.getClass.getSimpleName +
-                                   " can create message streams at most once",null)
     val scalaTopicCountMap: Map[String, Int] = {
       import JavaConversions._
       Map.empty[String, Int] ++ (topicCountMap.asInstanceOf[java.util.Map[String, Int]]: mutable.Map[String, Int])
@@ -92,19 +87,19 @@ private[kafka] class ZookeeperConsumerConnector(val config: ConsumerConfig,
     }
     ret
   }
- 
+
  def createMessageStreams(topicCountMap: java.util.Map[String,java.lang.Integer]): java.util.Map[String,java.util.List[KafkaStream[Array[Byte],Array[Byte]]]] =
    createMessageStreams(topicCountMap, new DefaultDecoder(), new DefaultDecoder())
- 
+
   def createMessageStreamsByFilter[K,V](topicFilter: TopicFilter, numStreams: Int, keyDecoder: Decoder[K], valueDecoder: Decoder[V]) = {
     import JavaConversions._
     underlying.createMessageStreamsByFilter(topicFilter, numStreams, keyDecoder, valueDecoder)
   }
 
-  def createMessageStreamsByFilter(topicFilter: TopicFilter, numStreams: Int) = 
+  def createMessageStreamsByFilter(topicFilter: TopicFilter, numStreams: Int) =
     createMessageStreamsByFilter(topicFilter, numStreams, new DefaultDecoder(), new DefaultDecoder())
- 
-  def createMessageStreamsByFilter(topicFilter: TopicFilter) = 
+
+  def createMessageStreamsByFilter(topicFilter: TopicFilter) =
     createMessageStreamsByFilter(topicFilter, 1, new DefaultDecoder(), new DefaultDecoder())
 
   def commitOffsets() {
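With its duplicate guard removed, the Java facade is reduced to collection plumbing: it converts the caller's java.util.Map into a Scala map, delegates to the underlying connector (which still enforces the create-once rule on its own), and wraps the resulting streams back into Java collections. A small runnable sketch of the conversion idiom used above, with invented map contents for illustration:

    import scala.collection.mutable
    import scala.collection.JavaConversions

    object TopicCountMapConversion {
      def main(args: Array[String]) {
        val topicCountMap = new java.util.HashMap[String, java.lang.Integer]()
        topicCountMap.put("my-topic", 2)

        // Same idiom as the connector: view the Java map as a mutable Scala
        // map through the implicit conversion, then copy it into an
        // immutable Scala Map.
        val scalaTopicCountMap: Map[String, Int] = {
          import JavaConversions._
          Map.empty[String, Int] ++
            (topicCountMap.asInstanceOf[java.util.Map[String, Int]]: mutable.Map[String, Int])
        }
        println(scalaTopicCountMap) // prints Map(my-topic -> 2)
      }
    }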
diff --git a/core/src/main/scala/kafka/tools/MirrorMaker.scala b/core/src/main/scala/kafka/tools/MirrorMaker.scala
index 26730c4..19df3d5 100644
--- a/core/src/main/scala/kafka/tools/MirrorMaker.scala
+++ b/core/src/main/scala/kafka/tools/MirrorMaker.scala
@@ -17,7 +17,7 @@
 
 package kafka.tools
 
-import kafka.utils.{Utils, CommandLineUtils, Logging}
+import kafka.utils.{SystemTime, Utils, CommandLineUtils, Logging}
 import kafka.consumer._
 import kafka.serializer._
 import kafka.producer.{OldProducer, NewShinyProducer, BaseProducer}
@@ -26,9 +26,11 @@
 import org.apache.kafka.clients.producer.ProducerRecord
 
 import scala.collection.mutable.ListBuffer
 import scala.collection.JavaConversions._
-import java.util.concurrent.{BlockingQueue, ArrayBlockingQueue, CountDownLatch}
+import java.util.concurrent.{TimeUnit, BlockingQueue, ArrayBlockingQueue, CountDownLatch}
 
 import joptsimple.OptionParser
+import kafka.metrics.KafkaMetricsGroup
+import com.yammer.metrics.core.Gauge
 
 object MirrorMaker extends Logging {
@@ -73,7 +75,8 @@ object MirrorMaker extends Logging {
       .ofType(classOf[java.lang.Integer])
       .defaultsTo(1)
 
-    val bufferSizeOpt = parser.accepts("queue.size", "Number of messages that are buffered between the consumer and producer")
+    val bufferSizeOpt = parser.accepts("queue.size",
+      "Number of messages that are buffered between the consumer and producer")
       .withRequiredArg()
       .describedAs("Queue size in terms of number of messages")
       .ofType(classOf[java.lang.Integer])
@@ -114,7 +117,7 @@ object MirrorMaker extends Logging {
     val producerProps = Utils.loadProps(options.valueOf(producerConfigOpt))
 
     // create data channel
-    val mirrorDataChannel = new ArrayBlockingQueue[ProducerRecord](bufferSize)
+    val mirrorDataChannel = new DataChannel(bufferSize)
 
     // create producer threads
     val producers = (1 to numProducers).map(_ => {
@@ -178,11 +181,46 @@ object MirrorMaker extends Logging {
     info("Kafka mirror maker shutdown successfully")
   }
 
+  class DataChannel(capacity: Int) extends KafkaMetricsGroup {
+
+    val queue = new ArrayBlockingQueue[ProducerRecord](capacity)
+
+    newGauge(
+      "MirrorMaker-DataChannel-Size",
+      new Gauge[Int] {
+        def value = queue.size
+      }
+    )
+
+    private val waitPut = newMeter("MirrorMaker-DataChannel-WaitOnPut", "percent", TimeUnit.MILLISECONDS)
+    private val waitTake = newMeter("MirrorMaker-DataChannel-WaitOnTake", "percent", TimeUnit.MILLISECONDS)
+
+
+    def put(record: ProducerRecord) {
+      var putSucceed = false
+      while (!putSucceed) {
+        val startPutTime = SystemTime.milliseconds
+        putSucceed = queue.offer(record, 500, TimeUnit.MILLISECONDS)
+        waitPut.mark(SystemTime.milliseconds - startPutTime)
+      }
+    }
+
+    def take(): ProducerRecord = {
+      var data: ProducerRecord = null
+      while (data == null) {
+        val startTakeTime = SystemTime.milliseconds
+        data = queue.poll(500, TimeUnit.MILLISECONDS)
+        waitTake.mark(SystemTime.milliseconds - startTakeTime)
+      }
+      data
+    }
+  }
+
   class ConsumerThread(stream: KafkaStream[Array[Byte], Array[Byte]],
-                       mirrorDataChannel: BlockingQueue[ProducerRecord],
-                       producers: Seq[BaseProducer],
-                       threadId: Int)
-    extends Thread with Logging {
+                       mirrorDataChannel: DataChannel,
+                       producers: Seq[BaseProducer],
+                       threadId: Int)
+          extends Thread with Logging with KafkaMetricsGroup {
 
     private val shutdownLatch = new CountDownLatch(1)
     private val threadName = "mirrormaker-consumer-" + threadId
@@ -226,9 +264,9 @@ object MirrorMaker extends Logging {
     }
   }
 
-  class ProducerThread (val dataChannel: BlockingQueue[ProducerRecord],
+  class ProducerThread (val dataChannel: DataChannel,
                         val producer: BaseProducer,
-                        val threadId: Int) extends Thread with Logging {
+                        val threadId: Int) extends Thread with Logging with KafkaMetricsGroup {
     private val threadName = "mirrormaker-producer-" + threadId
     private val shutdownComplete: CountDownLatch = new CountDownLatch(1)
     this.logIdent = "[%s] ".format(threadName)
@@ -241,7 +279,6 @@ object MirrorMaker extends Logging {
         while (true) {
           val data: ProducerRecord = dataChannel.take
           trace("Sending message with value size %d".format(data.value().size))
-
           if(data eq shutdownMessage) {
             info("Received shutdown message")
             return
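The DataChannel introduced here wraps ArrayBlockingQueue so that back-pressure becomes observable: put and take loop on timed offer/poll calls and record how long each operation spent waiting, instead of blocking invisibly. A dependency-free sketch of the same technique, with AtomicLong counters standing in for the KafkaMetricsGroup meters (all names here are illustrative, not from the patch):

    import java.util.concurrent.{ArrayBlockingQueue, TimeUnit}
    import java.util.concurrent.atomic.AtomicLong

    // Illustrative stand-in for the DataChannel above: the same bounded
    // queue and timed offer/poll loops, but with plain counters so it
    // runs without the metrics library.
    class MeteredChannel[T <: AnyRef](capacity: Int) {
      private val queue = new ArrayBlockingQueue[T](capacity)
      val putWaitMs = new AtomicLong(0)   // total ms spent inside put()
      val takeWaitMs = new AtomicLong(0)  // total ms spent inside take()

      def put(item: T) {
        var done = false
        while (!done) {
          val start = System.currentTimeMillis
          // offer() returns false if the queue stayed full for 500 ms, so a
          // slow consumer of the channel shows up as accumulated wait time
          // rather than as a thread blocked indefinitely.
          done = queue.offer(item, 500, TimeUnit.MILLISECONDS)
          putWaitMs.addAndGet(System.currentTimeMillis - start)
        }
      }

      def take(): T = {
        var item: T = null.asInstanceOf[T]
        while (item == null) {
          val start = System.currentTimeMillis
          item = queue.poll(500, TimeUnit.MILLISECONDS)
          takeWaitMs.addAndGet(System.currentTimeMillis - start)
        }
        item
      }
    }

As in the patch, the elapsed time is recorded on every iteration, successful or not, so the counters measure total time spent inside the queue operation rather than the timeouts alone.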
this.logIdent = "[%s] ".format(threadName) @@ -241,7 +279,6 @@ object MirrorMaker extends Logging { while (true) { val data: ProducerRecord = dataChannel.take trace("Sending message with value size %d".format(data.value().size)) - if(data eq shutdownMessage) { info("Received shutdown message") return diff --git a/core/src/test/scala/unit/kafka/consumer/ZookeeperConsumerConnectorTest.scala b/core/src/test/scala/unit/kafka/consumer/ZookeeperConsumerConnectorTest.scala index e1d8711..96fa0bd 100644 --- a/core/src/test/scala/unit/kafka/consumer/ZookeeperConsumerConnectorTest.scala +++ b/core/src/test/scala/unit/kafka/consumer/ZookeeperConsumerConnectorTest.scala @@ -1,3 +1,4 @@ + /** * Licensed to the Apache Software Foundation (ASF) under one or more * contributor license agreements. See the NOTICE file distributed with @@ -5,7 +6,7 @@ * The ASF licenses this file to You under the Apache License, Version 2.0 * (the "License"); you may not use this file except in compliance with * the License. You may obtain a copy of the License at - * + * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software @@ -30,7 +31,6 @@ import kafka.producer.{ProducerConfig, KeyedMessage, Producer} import java.util.{Collections, Properties} import org.apache.log4j.{Logger, Level} import kafka.utils.TestUtils._ -import kafka.common.MessageStreamsExistException class ZookeeperConsumerConnectorTest extends JUnit3Suite with KafkaServerTestHarness with Logging { @@ -157,14 +157,6 @@ class ZookeeperConsumerConnectorTest extends JUnit3Suite with KafkaServerTestHar val actual_3 = getZKChildrenValues(dirs.consumerOwnerDir) assertEquals(expected_2, actual_3) - // call createMesssageStreams twice should throw MessageStreamsExistException - try { - val topicMessageStreams4 = zkConsumerConnector3.createMessageStreams(new mutable.HashMap[String, Int]()) - fail("Should fail with MessageStreamsExistException") - } catch { - case e: MessageStreamsExistException => // expected - } - zkConsumerConnector1.shutdown zkConsumerConnector2.shutdown zkConsumerConnector3.shutdown @@ -172,7 +164,6 @@ class ZookeeperConsumerConnectorTest extends JUnit3Suite with KafkaServerTestHar requestHandlerLogger.setLevel(Level.ERROR) } - def testCompression() { val requestHandlerLogger = Logger.getLogger(classOf[kafka.server.KafkaRequestHandler]) requestHandlerLogger.setLevel(Level.FATAL) @@ -367,10 +358,10 @@ class ZookeeperConsumerConnectorTest extends JUnit3Suite with KafkaServerTestHar ms.toList } - def sendMessages(config: KafkaConfig, - messagesPerNode: Int, - header: String, - compression: CompressionCodec, + def sendMessages(config: KafkaConfig, + messagesPerNode: Int, + header: String, + compression: CompressionCodec, numParts: Int): List[String]= { var messages: List[String] = Nil val props = new Properties() @@ -421,3 +412,5 @@ class ZookeeperConsumerConnectorTest extends JUnit3Suite with KafkaServerTestHar } } + + diff --git a/core/src/test/scala/unit/kafka/javaapi/consumer/ZookeeperConsumerConnectorTest.scala b/core/src/test/scala/unit/kafka/javaapi/consumer/ZookeeperConsumerConnectorTest.scala index d6248b0..20e8efe 100644 --- a/core/src/test/scala/unit/kafka/javaapi/consumer/ZookeeperConsumerConnectorTest.scala +++ b/core/src/test/scala/unit/kafka/javaapi/consumer/ZookeeperConsumerConnectorTest.scala @@ -5,7 +5,7 @@ * The ASF licenses this file to You under the Apache License, Version 2.0 * (the "License"); you may not use this file except in compliance with * the License. 
diff --git a/core/src/test/scala/unit/kafka/javaapi/consumer/ZookeeperConsumerConnectorTest.scala b/core/src/test/scala/unit/kafka/javaapi/consumer/ZookeeperConsumerConnectorTest.scala
index d6248b0..20e8efe 100644
--- a/core/src/test/scala/unit/kafka/javaapi/consumer/ZookeeperConsumerConnectorTest.scala
+++ b/core/src/test/scala/unit/kafka/javaapi/consumer/ZookeeperConsumerConnectorTest.scala
@@ -5,7 +5,7 @@
  * The ASF licenses this file to You under the Apache License, Version 2.0
  * (the "License"); you may not use this file except in compliance with
  * the License. You may obtain a copy of the License at
- * 
+ *
  *    http://www.apache.org/licenses/LICENSE-2.0
  *
 * Unless required by applicable law or agreed to in writing, software
@@ -27,7 +27,6 @@ import kafka.utils.IntEncoder
 import kafka.utils.{Logging, TestUtils}
 import kafka.consumer.{KafkaStream, ConsumerConfig}
 import kafka.zk.ZooKeeperTestHarness
-import kafka.common.MessageStreamsExistException
 
 import scala.collection.JavaConversions
 
@@ -70,24 +69,17 @@ class ZookeeperConsumerConnectorTest extends JUnit3Suite with KafkaServerTestHar
     val receivedMessages1 = getMessages(nMessages*2, topicMessageStreams1)
     assertEquals(sentMessages1.sorted, receivedMessages1.sorted)
 
-    // call createMesssageStreams twice should throw MessageStreamsExistException
-    try {
-      val topicMessageStreams2 = zkConsumerConnector1.createMessageStreams(toJavaMap(Map(topic -> numNodes*numParts/2)), new StringDecoder(), new StringDecoder())
-      fail("Should fail with MessageStreamsExistException")
-    } catch {
-      case e: MessageStreamsExistException => // expected
-    }
     zkConsumerConnector1.shutdown
     info("all consumer connectors stopped")
     requestHandlerLogger.setLevel(Level.ERROR)
   }
 
-  def sendMessages(conf: KafkaConfig, 
-                   messagesPerNode: Int, 
-                   header: String, 
+  def sendMessages(conf: KafkaConfig,
+                   messagesPerNode: Int,
+                   header: String,
                    compressed: CompressionCodec): List[String] = {
     var messages: List[String] = Nil
-    val producer: kafka.producer.Producer[Int, String] = 
+    val producer: kafka.producer.Producer[Int, String] =
       TestUtils.createProducer(TestUtils.getBrokerListStrFromConfigs(configs),
         encoder = classOf[StringEncoder].getName,
         keyEncoder = classOf[IntEncoder].getName)
@@ -102,8 +94,8 @@ class ZookeeperConsumerConnectorTest extends JUnit3Suite with KafkaServerTestHar
     messages
   }
 
-  def sendMessages(messagesPerNode: Int, 
-                   header: String, 
+  def sendMessages(messagesPerNode: Int,
+                   header: String,
                    compressed: CompressionCodec = NoCompressionCodec): List[String] = {
     var messages: List[String] = Nil
     for(conf <- configs)
@@ -111,7 +103,7 @@ class ZookeeperConsumerConnectorTest extends JUnit3Suite with KafkaServerTestHar
     messages
   }
 
-  def getMessages(nMessagesPerThread: Int, 
+  def getMessages(nMessagesPerThread: Int,
                   jTopicMessageStreams: java.util.Map[String, java.util.List[KafkaStream[String, String]]]): List[String] = {
     var messages: List[String] = Nil
     import scala.collection.JavaConversions._
@@ -134,5 +126,5 @@ class ZookeeperConsumerConnectorTest extends JUnit3Suite with KafkaServerTestHar
     val javaMap = new java.util.HashMap[String, java.lang.Integer]()
     scalaMap.foreach(m => javaMap.put(m._1, m._2.asInstanceOf[java.lang.Integer]))
     javaMap
-  } 
+  }
 }