diff --git a/core/src/main/scala/kafka/tools/MirrorMaker.scala b/core/src/main/scala/kafka/tools/MirrorMaker.scala index 26730c4..db03aab 100644 --- a/core/src/main/scala/kafka/tools/MirrorMaker.scala +++ b/core/src/main/scala/kafka/tools/MirrorMaker.scala @@ -29,6 +29,8 @@ import scala.collection.JavaConversions._ import java.util.concurrent.{BlockingQueue, ArrayBlockingQueue, CountDownLatch} import joptsimple.OptionParser +import kafka.metrics.KafkaMetricsGroup +import com.yammer.metrics.core.Gauge object MirrorMaker extends Logging { @@ -114,7 +116,7 @@ object MirrorMaker extends Logging { val producerProps = Utils.loadProps(options.valueOf(producerConfigOpt)) // create data channel - val mirrorDataChannel = new ArrayBlockingQueue[ProducerRecord](bufferSize) + val mirrorDataChannel = new DataChannel(bufferSize) // create producer threads val producers = (1 to numProducers).map(_ => { @@ -178,10 +180,26 @@ object MirrorMaker extends Logging { info("Kafka mirror maker shutdown successfully") } + class DataChannel(capacity: Int) extends KafkaMetricsGroup { + + val queue = new ArrayBlockingQueue[ProducerRecord](capacity) + + newGauge( + "MirrorMaker-DataChannelSize", + new Gauge[Int] { + def value = queue.size + } + ) + + def put(record: ProducerRecord) = queue.put(record) + + def take() = queue.take() + } + class ConsumerThread(stream: KafkaStream[Array[Byte], Array[Byte]], - mirrorDataChannel: BlockingQueue[ProducerRecord], - producers: Seq[BaseProducer], - threadId: Int) + mirrorDataChannel: DataChannel, + producers: Seq[BaseProducer], + threadId: Int) extends Thread with Logging { private val shutdownLatch = new CountDownLatch(1) @@ -226,7 +244,7 @@ object MirrorMaker extends Logging { } } - class ProducerThread (val dataChannel: BlockingQueue[ProducerRecord], + class ProducerThread (val dataChannel: DataChannel, val producer: BaseProducer, val threadId: Int) extends Thread with Logging { private val threadName = "mirrormaker-producer-" + threadId