From 29991e4e5b55396eeb29176571bb4bbd535e4bb6 Mon Sep 17 00:00:00 2001 From: jqin Date: Mon, 19 Jan 2015 20:24:21 -0800 Subject: [PATCH] Patch for KAFKA-1840. Add a message handler in mirror maker. --- core/src/main/scala/kafka/tools/MirrorMaker.scala | 56 ++++++++++++++++++----- 1 file changed, 44 insertions(+), 12 deletions(-) diff --git a/core/src/main/scala/kafka/tools/MirrorMaker.scala b/core/src/main/scala/kafka/tools/MirrorMaker.scala index 5cbc810..37f35f0 100644 --- a/core/src/main/scala/kafka/tools/MirrorMaker.scala +++ b/core/src/main/scala/kafka/tools/MirrorMaker.scala @@ -68,6 +68,7 @@ object MirrorMaker extends Logging with KafkaMetricsGroup { private var numUnackedMessages: AtomicInteger = new AtomicInteger(0) private var numSkippedUnackedMessages: AtomicInteger = new AtomicInteger(0) private var consumerRebalanceListener: ConsumerRebalanceListener = null + private var mirrorMakerMessageHandler: MirrorMakerMessageHandler = null // This is to indicate whether the rebalance is going on so the producer callback knows if // the flag indicates internal consumer rebalance callback is waiting for all the messages sent to be acked. private var waitingForMessageAcks: Boolean = false @@ -172,6 +173,12 @@ object MirrorMaker extends Logging with KafkaMetricsGroup { .describedAs("A custom rebalance listener of type ConsumerRebalanceListener") .ofType(classOf[String]) + val mirrorMakerMessageHandlerOpt = parser.accepts("event.handler", + "The event handler to handle messages.") + .withRequiredArg() + .describedAs("An event handler of type MirrorMakerMessageHandler") + .ofType(classOf[String]) + val helpOpt = parser.accepts("help", "Print this message.") if(args.length == 0) @@ -220,6 +227,14 @@ object MirrorMaker extends Logging with KafkaMetricsGroup { consumerRebalanceListener = new InternalRebalanceListener(mirrorDataChannel, Some(customRebalanceListener)) connector.setConsumerRebalanceListener(consumerRebalanceListener) + val mirrorMakerMessageHandlerClass = options.valueOf(mirrorMakerMessageHandlerOpt) + mirrorMakerMessageHandler = { + if (mirrorMakerMessageHandlerClass != null) + Utils.createObject[MirrorMakerMessageHandler](mirrorMakerMessageHandlerClass) + else + new defaultMirrorMakerMessageHandler + } + // create producer threads val producerProps = Utils.loadProps(options.valueOf(producerConfigOpt)) val useNewProducer = { @@ -401,12 +416,13 @@ object MirrorMaker extends Logging with KafkaMetricsGroup { val iter = stream.iterator() while (!shutdownFlag && iter.hasNext()) { val msgAndMetadata = iter.next() - val data = new MirrorMakerRecord(msgAndMetadata.topic, - msgAndMetadata.partition, - msgAndMetadata.offset, - msgAndMetadata.key(), - msgAndMetadata.message()) - mirrorDataChannel.put(data) + val data = mirrorMakerMessageHandler.handle(new MirrorMakerRecord(msgAndMetadata.topic, + msgAndMetadata.partition, + msgAndMetadata.offset, + msgAndMetadata.key(), + msgAndMetadata.message())) + for (record <- data) + mirrorDataChannel.put(record) } } catch { case e: Throwable => { @@ -627,19 +643,35 @@ object MirrorMaker extends Logging with KafkaMetricsGroup { commitOffsets() // invoke custom consumer rebalance listener - if (customRebalanceListener.isDefined) + if (customRebalanceListener.get != null) customRebalanceListener.get.beforeReleasingPartitions(partitionOwnership) } } - private[kafka] class MirrorMakerRecord (val sourceTopic: String, - val sourcePartition: Int, - val sourceOffset: Long, - val key: Array[Byte], - val value: Array[Byte]) { + class MirrorMakerRecord (val sourceTopic: String, + val sourcePartition: Int, + val sourceOffset: Long, + val key: Array[Byte], + val value: Array[Byte]) { def size = value.length + {if (key == null) 0 else key.length} } + trait MirrorMakerMessageHandler { + /** + * Handle a given record and return a list of mirror maker record. This handler is intended + * to provide the convenience of some simple tasks in mirror maker, e.g. filtering. + * @param record The record to be handled + * @return A list of records to produce + */ + def handle(record: MirrorMakerRecord): List[MirrorMakerRecord] + } + + private class defaultMirrorMakerMessageHandler extends MirrorMakerMessageHandler { + override def handle(record: MirrorMakerRecord): List[MirrorMakerRecord] = { + List[MirrorMakerRecord](record) + } + } + private class UnackedOffset(offset: Long) extends DoublyLinkedListNode[Long](offset) { } -- 1.8.3.4 (Apple Git-47)