diff --git a/core/src/main/scala/kafka/tools/MirrorMaker.scala b/core/src/main/scala/kafka/tools/MirrorMaker.scala index 26730c4..fdfb160 100644 --- a/core/src/main/scala/kafka/tools/MirrorMaker.scala +++ b/core/src/main/scala/kafka/tools/MirrorMaker.scala @@ -17,7 +17,7 @@ package kafka.tools -import kafka.utils.{Utils, CommandLineUtils, Logging} +import kafka.utils.{SystemTime, Utils, CommandLineUtils, Logging} import kafka.consumer._ import kafka.serializer._ import kafka.producer.{OldProducer, NewShinyProducer, BaseProducer} @@ -26,9 +26,11 @@ import org.apache.kafka.clients.producer.ProducerRecord import scala.collection.mutable.ListBuffer import scala.collection.JavaConversions._ -import java.util.concurrent.{BlockingQueue, ArrayBlockingQueue, CountDownLatch} +import java.util.concurrent.{TimeUnit, BlockingQueue, ArrayBlockingQueue, CountDownLatch} import joptsimple.OptionParser +import kafka.metrics.KafkaMetricsGroup +import com.yammer.metrics.core.Gauge object MirrorMaker extends Logging { @@ -73,7 +75,8 @@ object MirrorMaker extends Logging { .ofType(classOf[java.lang.Integer]) .defaultsTo(1) - val bufferSizeOpt = parser.accepts("queue.size", "Number of messages that are buffered between the consumer and producer") + val bufferSizeOpt = parser.accepts("queue.size", + "Number of messages that are buffered between the consumer and producer") .withRequiredArg() .describedAs("Queue size in terms of number of messages") .ofType(classOf[java.lang.Integer]) @@ -114,7 +117,7 @@ object MirrorMaker extends Logging { val producerProps = Utils.loadProps(options.valueOf(producerConfigOpt)) // create data channel - val mirrorDataChannel = new ArrayBlockingQueue[ProducerRecord](bufferSize) + val mirrorDataChannel = new DataChannel(bufferSize) // create producer threads val producers = (1 to numProducers).map(_ => { @@ -178,14 +181,31 @@ object MirrorMaker extends Logging { info("Kafka mirror maker shutdown successfully") } + class DataChannel(capacity: Int) extends KafkaMetricsGroup { + + val queue = new ArrayBlockingQueue[ProducerRecord](capacity) + + newGauge( + "MirrorMaker-DataChannelSize", + new Gauge[Int] { + def value = queue.size + } + ) + + def put(record: ProducerRecord) = queue.put(record) + def put(record: ProducerRecord, timeout: Int): Boolean = queue.offer(record, timeout, TimeUnit.MILLISECONDS) + def take(timeout: Int): ProducerRecord = queue.poll(timeout, TimeUnit.MILLISECONDS) + } + class ConsumerThread(stream: KafkaStream[Array[Byte], Array[Byte]], - mirrorDataChannel: BlockingQueue[ProducerRecord], - producers: Seq[BaseProducer], - threadId: Int) - extends Thread with Logging { + mirrorDataChannel: DataChannel, + producers: Seq[BaseProducer], + threadId: Int) + extends Thread with Logging with KafkaMetricsGroup { private val shutdownLatch = new CountDownLatch(1) private val threadName = "mirrormaker-consumer-" + threadId + private val waitMeter = newMeter(threadName + "-WaitOnQueuePercent", "percent", TimeUnit.MILLISECONDS) this.logIdent = "[%s] ".format(threadName) this.setName(threadName) @@ -199,7 +219,12 @@ object MirrorMaker extends Logging { if (msgAndMetadata.key == null) { trace("Send the non-keyed message the producer channel.") val data = new ProducerRecord(msgAndMetadata.topic, msgAndMetadata.message) - mirrorDataChannel.put(data) + var putSucceed = false + while (!putSucceed) { + val startPutTime = SystemTime.milliseconds + putSucceed = mirrorDataChannel.put(data, 500) + waitMeter.mark(SystemTime.milliseconds - startPutTime) + } } else { val producerId = Utils.abs(java.util.Arrays.hashCode(msgAndMetadata.key)) % producers.size() trace("Send message with key %s to producer %d.".format(java.util.Arrays.toString(msgAndMetadata.key), producerId)) @@ -226,11 +251,12 @@ object MirrorMaker extends Logging { } } - class ProducerThread (val dataChannel: BlockingQueue[ProducerRecord], + class ProducerThread (val dataChannel: DataChannel, val producer: BaseProducer, - val threadId: Int) extends Thread with Logging { + val threadId: Int) extends Thread with Logging with KafkaMetricsGroup { private val threadName = "mirrormaker-producer-" + threadId private val shutdownComplete: CountDownLatch = new CountDownLatch(1) + private val waitMeter = newMeter(threadName + "-WaitOnQueuePercent", "percent", TimeUnit.NANOSECONDS) this.logIdent = "[%s] ".format(threadName) setName(threadName) @@ -239,14 +265,18 @@ object MirrorMaker extends Logging { info("Starting mirror maker producer thread " + threadName) try { while (true) { - val data: ProducerRecord = dataChannel.take - trace("Sending message with value size %d".format(data.value().size)) - - if(data eq shutdownMessage) { - info("Received shutdown message") - return + val startTakeTime = SystemTime.nanoseconds + val data: ProducerRecord = dataChannel.take(500) + waitMeter.mark(SystemTime.nanoseconds - startTakeTime) + + if (data != null) { + trace("Sending message with value size %d".format(data.value().size)) + if(data eq shutdownMessage) { + info("Received shutdown message") + return + } + producer.send(data.topic(), data.key(), data.value()) } - producer.send(data.topic(), data.key(), data.value()) } } catch { case t: Throwable => {