From d75f7e256ade4ebe9ab9b32378e61aff02161773 Mon Sep 17 00:00:00 2001 From: jqin Date: Tue, 17 Mar 2015 20:40:17 -0700 Subject: [PATCH] Follow-up patch for KAFKA-1997, fix a few bugs. --- core/src/main/scala/kafka/tools/MirrorMaker.scala | 9 +++++---- 1 file changed, 5 insertions(+), 4 deletions(-) diff --git a/core/src/main/scala/kafka/tools/MirrorMaker.scala b/core/src/main/scala/kafka/tools/MirrorMaker.scala index 11acc31..ebe65dd 100644 --- a/core/src/main/scala/kafka/tools/MirrorMaker.scala +++ b/core/src/main/scala/kafka/tools/MirrorMaker.scala @@ -46,6 +46,7 @@ import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord, RecordM * acks=all * retries=max integer * block.on.buffer.full=true + * max.in.flight.requests.per.connection=1 * 2. Consumer Settings * auto.commit.enable=false * 3. Mirror Maker Setting: @@ -57,7 +58,7 @@ object MirrorMaker extends Logging with KafkaMetricsGroup { private var producer: MirrorMakerProducer = null private var mirrorMakerThreads: Seq[MirrorMakerThread] = null private val isShuttingdown: AtomicBoolean = new AtomicBoolean(false) - // Track the messages unacked for consumer rebalance + // Track the messages not successfully sent by mirror maker. private var numDroppedMessages: AtomicInteger = new AtomicInteger(0) private var messageHandler: MirrorMakerMessageHandler = null private var offsetCommitIntervalMs = 0 @@ -83,7 +84,6 @@ object MirrorMaker extends Logging with KafkaMetricsGroup { .describedAs("config file") .ofType(classOf[String]) - // Please see note about MaxInflightRequests val producerConfigOpt = parser.accepts("producer.config", "Embedded producer config.") .withRequiredArg() @@ -182,6 +182,7 @@ object MirrorMaker extends Logging with KafkaMetricsGroup { maybeSetDefaultProperty(producerProps, "retries", Int.MaxValue.toString) maybeSetDefaultProperty(producerProps, "block.on.buffer.full", "true") maybeSetDefaultProperty(producerProps, "acks", "all") + maybeSetDefaultProperty(producerProps, "max.in.flight.requests.per.connection", "1") producer = new MirrorMakerProducer(producerProps) // Create consumer connector @@ -253,7 +254,7 @@ object MirrorMaker extends Logging with KafkaMetricsGroup { messageHandler = { if (customMessageHandlerClass != null) { if (messageHandlerArgs != null) - Utils.createObject[MirrorMakerMessageHandler](customMessageHandlerClass, rebalanceListenerArgs) + Utils.createObject[MirrorMakerMessageHandler](customMessageHandlerClass, messageHandlerArgs) else Utils.createObject[MirrorMakerMessageHandler](customMessageHandlerClass) } else { @@ -409,11 +410,11 @@ object MirrorMaker extends Logging with KafkaMetricsGroup { // If abort.on.send.failure is set, stop the mirror maker. Otherwise log skipped message and move on. if (abortOnSendFailure) exitingOnSendFailure = true + numDroppedMessages.incrementAndGet() } } } - private class InternalRebalanceListener(connector: ZookeeperConsumerConnector, customRebalanceListener: Option[ConsumerRebalanceListener]) extends ConsumerRebalanceListener { -- 1.8.3.4 (Apple Git-47)