diff --git a/core/src/main/scala/kafka/tools/MirrorMaker.scala b/core/src/main/scala/kafka/tools/MirrorMaker.scala index d61b355..6b01b8e 100755 --- a/core/src/main/scala/kafka/tools/MirrorMaker.scala +++ b/core/src/main/scala/kafka/tools/MirrorMaker.scala @@ -161,6 +161,13 @@ object MirrorMaker extends Logging with KafkaMetricsGroup { .ofType(classOf[String]) .defaultsTo("true") + val dataConsistencyOpt = parser.accepts("data.consistency", + "Configure the mirror maker to guarantee data is consistent between clusters") + .withRequiredArg() + .describedAs("Guarantee data is consistent between clusters") + .ofType(classOf[String]) + .defaultsTo("true") + val helpOpt = parser.accepts("help", "Print this message.") if (args.length == 0) @@ -287,6 +294,7 @@ object MirrorMaker extends Logging with KafkaMetricsGroup { // Create and initialize message handler val customMessageHandlerClass = options.valueOf(messageHandlerOpt) val messageHandlerArgs = options.valueOf(messageHandlerArgsOpt) + val dataConsistency = options.valueOf(dataConsistencyOpt).toBoolean messageHandler = { if (customMessageHandlerClass != null) { if (messageHandlerArgs != null) @@ -294,7 +302,10 @@ object MirrorMaker extends Logging with KafkaMetricsGroup { else CoreUtils.createObject[MirrorMakerMessageHandler](customMessageHandlerClass) } else { - defaultMirrorMakerMessageHandler + if (dataConsistency) + defaultMirrorMakerMessageInorderHandler + else + defaultMirrorMakerMessageUnorderedHandler } } } catch { @@ -751,11 +762,18 @@ object MirrorMaker extends Logging with KafkaMetricsGroup { def handle(record: BaseConsumerRecord): util.List[ProducerRecord[Array[Byte], Array[Byte]]] } - private[tools] object defaultMirrorMakerMessageHandler extends MirrorMakerMessageHandler { + private[tools] object defaultMirrorMakerMessageUnorderedHandler extends MirrorMakerMessageHandler { override def handle(record: BaseConsumerRecord): util.List[ProducerRecord[Array[Byte], Array[Byte]]] = { val timestamp: java.lang.Long = if (record.timestamp == RecordBatch.NO_TIMESTAMP) null else record.timestamp Collections.singletonList(new ProducerRecord[Array[Byte], Array[Byte]](record.topic, null, timestamp, record.key, record.value)) } } + private[tools] object defaultMirrorMakerMessageInorderHandler extends MirrorMakerMessageHandler { + override def handle(record: BaseConsumerRecord): util.List[ProducerRecord[Array[Byte], Array[Byte]]] = { + val timestamp: java.lang.Long = if (record.timestamp == RecordBatch.NO_TIMESTAMP) null else record.timestamp + Collections.singletonList(new ProducerRecord[Array[Byte], Array[Byte]](record.topic, record.partition, timestamp, record.key, record.value)) + } + } + }