From 29991e4e5b55396eeb29176571bb4bbd535e4bb6 Mon Sep 17 00:00:00 2001 From: jqin Date: Mon, 19 Jan 2015 20:24:21 -0800 Subject: [PATCH 1/4] Patch for KAFKA-1840. Add a message handler in mirror maker. --- core/src/main/scala/kafka/tools/MirrorMaker.scala | 56 ++++++++++++++++++----- 1 file changed, 44 insertions(+), 12 deletions(-) diff --git a/core/src/main/scala/kafka/tools/MirrorMaker.scala b/core/src/main/scala/kafka/tools/MirrorMaker.scala index 5cbc810..37f35f0 100644 --- a/core/src/main/scala/kafka/tools/MirrorMaker.scala +++ b/core/src/main/scala/kafka/tools/MirrorMaker.scala @@ -68,6 +68,7 @@ object MirrorMaker extends Logging with KafkaMetricsGroup { private var numUnackedMessages: AtomicInteger = new AtomicInteger(0) private var numSkippedUnackedMessages: AtomicInteger = new AtomicInteger(0) private var consumerRebalanceListener: ConsumerRebalanceListener = null + private var mirrorMakerMessageHandler: MirrorMakerMessageHandler = null // This is to indicate whether the rebalance is going on so the producer callback knows if // the flag indicates internal consumer rebalance callback is waiting for all the messages sent to be acked. private var waitingForMessageAcks: Boolean = false @@ -172,6 +173,12 @@ object MirrorMaker extends Logging with KafkaMetricsGroup { .describedAs("A custom rebalance listener of type ConsumerRebalanceListener") .ofType(classOf[String]) + val mirrorMakerMessageHandlerOpt = parser.accepts("event.handler", + "The event handler to handle messages.") + .withRequiredArg() + .describedAs("An event handler of type MirrorMakerMessageHandler") + .ofType(classOf[String]) + val helpOpt = parser.accepts("help", "Print this message.") if(args.length == 0) @@ -220,6 +227,14 @@ object MirrorMaker extends Logging with KafkaMetricsGroup { consumerRebalanceListener = new InternalRebalanceListener(mirrorDataChannel, Some(customRebalanceListener)) connector.setConsumerRebalanceListener(consumerRebalanceListener) + val mirrorMakerMessageHandlerClass = options.valueOf(mirrorMakerMessageHandlerOpt) + mirrorMakerMessageHandler = { + if (mirrorMakerMessageHandlerClass != null) + Utils.createObject[MirrorMakerMessageHandler](mirrorMakerMessageHandlerClass) + else + new defaultMirrorMakerMessageHandler + } + // create producer threads val producerProps = Utils.loadProps(options.valueOf(producerConfigOpt)) val useNewProducer = { @@ -401,12 +416,13 @@ object MirrorMaker extends Logging with KafkaMetricsGroup { val iter = stream.iterator() while (!shutdownFlag && iter.hasNext()) { val msgAndMetadata = iter.next() - val data = new MirrorMakerRecord(msgAndMetadata.topic, - msgAndMetadata.partition, - msgAndMetadata.offset, - msgAndMetadata.key(), - msgAndMetadata.message()) - mirrorDataChannel.put(data) + val data = mirrorMakerMessageHandler.handle(new MirrorMakerRecord(msgAndMetadata.topic, + msgAndMetadata.partition, + msgAndMetadata.offset, + msgAndMetadata.key(), + msgAndMetadata.message())) + for (record <- data) + mirrorDataChannel.put(record) } } catch { case e: Throwable => { @@ -627,19 +643,35 @@ object MirrorMaker extends Logging with KafkaMetricsGroup { commitOffsets() // invoke custom consumer rebalance listener - if (customRebalanceListener.isDefined) + if (customRebalanceListener.get != null) customRebalanceListener.get.beforeReleasingPartitions(partitionOwnership) } } - private[kafka] class MirrorMakerRecord (val sourceTopic: String, - val sourcePartition: Int, - val sourceOffset: Long, - val key: Array[Byte], - val value: Array[Byte]) { + class MirrorMakerRecord (val sourceTopic: String, + val sourcePartition: Int, + val sourceOffset: Long, + val key: Array[Byte], + val value: Array[Byte]) { def size = value.length + {if (key == null) 0 else key.length} } + trait MirrorMakerMessageHandler { + /** + * Handle a given record and return a list of mirror maker record. This handler is intended + * to provide the convenience of some simple tasks in mirror maker, e.g. filtering. + * @param record The record to be handled + * @return A list of records to produce + */ + def handle(record: MirrorMakerRecord): List[MirrorMakerRecord] + } + + private class defaultMirrorMakerMessageHandler extends MirrorMakerMessageHandler { + override def handle(record: MirrorMakerRecord): List[MirrorMakerRecord] = { + List[MirrorMakerRecord](record) + } + } + private class UnackedOffset(offset: Long) extends DoublyLinkedListNode[Long](offset) { } -- 1.8.3.4 (Apple Git-47) From b5b753552e9ede54fb7155cb9f68ee41ce9254bd Mon Sep 17 00:00:00 2001 From: jqin Date: Tue, 20 Jan 2015 11:35:43 -0800 Subject: [PATCH 2/4] Addressed Joel's comments --- core/src/main/scala/kafka/tools/MirrorMaker.scala | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/core/src/main/scala/kafka/tools/MirrorMaker.scala b/core/src/main/scala/kafka/tools/MirrorMaker.scala index 37f35f0..9c0c6cd 100644 --- a/core/src/main/scala/kafka/tools/MirrorMaker.scala +++ b/core/src/main/scala/kafka/tools/MirrorMaker.scala @@ -220,11 +220,11 @@ object MirrorMaker extends Logging with KafkaMetricsGroup { val customRebalanceListenerClass = options.valueOf(consumerRebalanceListenerOpt) val customRebalanceListener = { if (customRebalanceListenerClass != null) - Utils.createObject[ConsumerRebalanceListener](customRebalanceListenerClass) + Some(Utils.createObject[ConsumerRebalanceListener](customRebalanceListenerClass)) else - null + None } - consumerRebalanceListener = new InternalRebalanceListener(mirrorDataChannel, Some(customRebalanceListener)) + consumerRebalanceListener = new InternalRebalanceListener(mirrorDataChannel, customRebalanceListener) connector.setConsumerRebalanceListener(consumerRebalanceListener) val mirrorMakerMessageHandlerClass = options.valueOf(mirrorMakerMessageHandlerOpt) @@ -643,7 +643,7 @@ object MirrorMaker extends Logging with KafkaMetricsGroup { commitOffsets() // invoke custom consumer rebalance listener - if (customRebalanceListener.get != null) + if (customRebalanceListener.isDefined) customRebalanceListener.get.beforeReleasingPartitions(partitionOwnership) } } -- 1.8.3.4 (Apple Git-47) From 845f274a68dcdc5f41fdd52b14e5dd74fb9ddc7a Mon Sep 17 00:00:00 2001 From: jqin Date: Tue, 20 Jan 2015 11:35:43 -0800 Subject: [PATCH 3/4] Addressed Joel's comments --- core/src/main/scala/kafka/tools/MirrorMaker.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/core/src/main/scala/kafka/tools/MirrorMaker.scala b/core/src/main/scala/kafka/tools/MirrorMaker.scala index d4be863..9693aff 100644 --- a/core/src/main/scala/kafka/tools/MirrorMaker.scala +++ b/core/src/main/scala/kafka/tools/MirrorMaker.scala @@ -648,7 +648,7 @@ object MirrorMaker extends Logging with KafkaMetricsGroup { commitOffsets() // invoke custom consumer rebalance listener - if (customRebalanceListener.get != null) + if (customRebalanceListener.isDefined) customRebalanceListener.get.beforeReleasingPartitions(partitionOwnership) } } -- 1.8.3.4 (Apple Git-47) From 1f3931c03fb4fe847d3ef95df75d777eaf5dcd25 Mon Sep 17 00:00:00 2001 From: jqin Date: Sun, 1 Feb 2015 00:12:17 -0800 Subject: [PATCH 4/4] Addressed previous reviews. --- core/src/main/scala/kafka/tools/MirrorMaker.scala | 37 ++++++++++++++--------- 1 file changed, 22 insertions(+), 15 deletions(-) diff --git a/core/src/main/scala/kafka/tools/MirrorMaker.scala b/core/src/main/scala/kafka/tools/MirrorMaker.scala index 9693aff..414b71a 100644 --- a/core/src/main/scala/kafka/tools/MirrorMaker.scala +++ b/core/src/main/scala/kafka/tools/MirrorMaker.scala @@ -179,6 +179,12 @@ object MirrorMaker extends Logging with KafkaMetricsGroup { .describedAs("An event handler of type MirrorMakerMessageHandler") .ofType(classOf[String]) + val mirrorMakerMessageHandlerArgsOpt = parser.accepts("message.handler.args", + "The arguments used to initialize message handler") + .withRequiredArg() + .describedAs("The arguments used to initialize message handler") + .ofType(classOf[String]) + val helpOpt = parser.accepts("help", "Print this message.") if(args.length == 0) @@ -217,23 +223,16 @@ object MirrorMaker extends Logging with KafkaMetricsGroup { // set consumer rebalance listener // custom rebalance listener will be invoked after internal listener finishes its work. - val customRebalanceListenerClass = options.valueOf(consumerRebalanceListenerOpt) - val customRebalanceListener = { - if (customRebalanceListenerClass != null) - Some(Utils.createObject[ConsumerRebalanceListener](customRebalanceListenerClass)) - else - None + val customRebalanceListener = Option(options.valueOf(consumerRebalanceListenerOpt)).map { + customRebalanceListenerClass => Utils.createObject[ConsumerRebalanceListener](customRebalanceListenerClass) } consumerRebalanceListener = new InternalRebalanceListener(mirrorDataChannel, customRebalanceListener) connector.setConsumerRebalanceListener(consumerRebalanceListener) - val mirrorMakerMessageHandlerClass = options.valueOf(mirrorMakerMessageHandlerOpt) - mirrorMakerMessageHandler = { - if (mirrorMakerMessageHandlerClass != null) - Utils.createObject[MirrorMakerMessageHandler](mirrorMakerMessageHandlerClass) - else - new defaultMirrorMakerMessageHandler - } + mirrorMakerMessageHandler = Option(options.valueOf(mirrorMakerMessageHandlerOpt)).map { + mirrorMakerMessageHandlerClass => Utils.createObject[MirrorMakerMessageHandler](mirrorMakerMessageHandlerClass) + }.getOrElse(DefaultMirrorMakerMessageHandler) + mirrorMakerMessageHandler.init(Option(options.valueOf(mirrorMakerMessageHandlerArgsOpt)).orNull) // create producer threads val producerProps = Utils.loadProps(options.valueOf(producerConfigOpt)) @@ -553,7 +552,7 @@ object MirrorMaker extends Logging with KafkaMetricsGroup { override def send(sourceTopicPartition: TopicAndPartition, sourceOffset: Long, destPartition: Option[Int], key: Array[Byte], value: Array[Byte]) { val record = new ProducerRecord[Array[Byte], Array[Byte]](sourceTopicPartition.topic, - destPartition.map(Int.box).orNull, + destPartition.map(Int.box).orNull, key, value) if(sync) { @@ -664,6 +663,11 @@ object MirrorMaker extends Logging with KafkaMetricsGroup { trait MirrorMakerMessageHandler { /** + * Initialize the handler. + * @param handlerArgs the arguments from commandline option --message.handler.args + */ + def init(handlerArgs: String) + /** * Handle a given record and return a list of mirror maker record. This handler is intended * to provide the convenience of some simple tasks in mirror maker, e.g. filtering. * @param record The record to be handled @@ -672,7 +676,10 @@ object MirrorMaker extends Logging with KafkaMetricsGroup { def handle(record: MirrorMakerRecord): List[MirrorMakerRecord] } - private class defaultMirrorMakerMessageHandler extends MirrorMakerMessageHandler { + private object DefaultMirrorMakerMessageHandler extends MirrorMakerMessageHandler { + + override def init(handlerArgs: String) {} + override def handle(record: MirrorMakerRecord): List[MirrorMakerRecord] = { List[MirrorMakerRecord](record) } -- 1.8.3.4 (Apple Git-47)