diff --git a/core/src/main/scala/kafka/tools/MirrorMaker.scala b/core/src/main/scala/kafka/tools/MirrorMaker.scala
index 19df3d5..0d35e07 100644
--- a/core/src/main/scala/kafka/tools/MirrorMaker.scala
+++ b/core/src/main/scala/kafka/tools/MirrorMaker.scala
@@ -116,16 +116,23 @@ object MirrorMaker extends Logging {
     val useNewProducer = options.has(useNewProducerOpt)
     val producerProps = Utils.loadProps(options.valueOf(producerConfigOpt))
 
-    // create data channel
-    val mirrorDataChannel = new DataChannel(bufferSize)
-
     // create producer threads
     val producers = (1 to numProducers).map(_ => {
-        if (useNewProducer)
-          new NewShinyProducer(producerProps)
-        else
-          new OldProducer(producerProps)
-      })
+      if (useNewProducer)
+        new NewShinyProducer(producerProps)
+      else
+        new OldProducer(producerProps)
+    })
+
+    // create consumer streams
+    connectors = options.valuesOf(consumerConfigOpt).toList
+      .map(cfg => new ConsumerConfig(Utils.loadProps(cfg)))
+      .map(new ZookeeperConsumerConnector(_))
+    val numConsumers = connectors.size * numStreams
+
+    // create data channel; the consumer threads are the producers to the channel and the
+    // producer threads are its consumers, hence numConsumers/numProducers are passed swapped.
+    val mirrorDataChannel = new DataChannel(bufferSize, numConsumers, numProducers)
 
     producerThreads = new ListBuffer[ProducerThread]()
     var producerIndex: Int = 1
@@ -135,11 +142,6 @@ object MirrorMaker extends Logging {
       producerIndex += 1
     }
 
-    // create consumer streams
-    connectors = options.valuesOf(consumerConfigOpt).toList
-      .map(cfg => new ConsumerConfig(Utils.loadProps(cfg)))
-      .map(new ZookeeperConsumerConnector(_))
-
     val filterSpec = if (options.has(whitelistOpt))
       new Whitelist(options.valueOf(whitelistOpt))
     else
@@ -154,6 +156,7 @@ object MirrorMaker extends Logging {
         connectors.foreach(_.shutdown)
     }
     consumerThreads = streams.zipWithIndex.map(streamAndIndex => new ConsumerThread(streamAndIndex._1, mirrorDataChannel, producers, streamAndIndex._2))
+    assert(consumerThreads.size == numConsumers)
 
     Runtime.getRuntime.addShutdownHook(new Thread() {
       override def run() {
@@ -181,7 +184,7 @@ object MirrorMaker extends Logging {
     info("Kafka mirror maker shutdown successfully")
   }
 
-  class DataChannel(capacity: Int) extends KafkaMetricsGroup {
+  class DataChannel(capacity: Int, numProducers: Int, numConsumers: Int) extends KafkaMetricsGroup {
 
     val queue = new ArrayBlockingQueue[ProducerRecord](capacity)
 
@@ -192,25 +195,28 @@ object MirrorMaker extends Logging {
       }
     )
 
-    private val waitPut = newMeter("MirrorMaker-DataChannel-WaitOnPut", "percent", TimeUnit.MILLISECONDS)
-    private val waitTake = newMeter("MirrorMaker-DataChannel-WaitOnTake", "percent", TimeUnit.MILLISECONDS)
-
+    // A single meter holds the aggregated wait percentage for each side of the data channel.
+    // A meter is computed as total_recorded_value / time_window, and time_window is
+    // independent of the number of threads, so every recorded wait time is divided by the
+    // number of threads on that side (numProducers in put, numConsumers in take).
+    private val waitPut = newMeter("MirrorMaker-DataChannel-WaitOnPut", "percent", TimeUnit.NANOSECONDS)
+    private val waitTake = newMeter("MirrorMaker-DataChannel-WaitOnTake", "percent", TimeUnit.NANOSECONDS)
 
     def put(record: ProducerRecord) {
       var putSucceed = false
       while (!putSucceed) {
-        val startPutTime = SystemTime.milliseconds
+        val startPutTime = SystemTime.nanoseconds
         putSucceed = queue.offer(record, 500, TimeUnit.MILLISECONDS)
-        waitPut.mark(SystemTime.milliseconds - startPutTime)
+        waitPut.mark((SystemTime.nanoseconds - startPutTime) / numProducers)
       }
     }
 
     def take(): ProducerRecord = {
       var data: ProducerRecord = null
       while (data == null) {
-        val startTakeTime = SystemTime.milliseconds
+        val startTakeTime = SystemTime.nanoseconds
         data = queue.poll(500, TimeUnit.MILLISECONDS)
-        waitTake.mark(SystemTime.milliseconds - startTakeTime)
+        waitTake.mark((SystemTime.nanoseconds - startTakeTime) / numConsumers)
       }
       data
     }