From d75f7e256ade4ebe9ab9b32378e61aff02161773 Mon Sep 17 00:00:00 2001 From: jqin Date: Tue, 17 Mar 2015 20:40:17 -0700 Subject: [PATCH 1/2] Follow-up patch for KAFKA-1997, fix a few bugs. --- core/src/main/scala/kafka/tools/MirrorMaker.scala | 9 +++++---- 1 file changed, 5 insertions(+), 4 deletions(-) diff --git a/core/src/main/scala/kafka/tools/MirrorMaker.scala b/core/src/main/scala/kafka/tools/MirrorMaker.scala index 11acc31..ebe65dd 100644 --- a/core/src/main/scala/kafka/tools/MirrorMaker.scala +++ b/core/src/main/scala/kafka/tools/MirrorMaker.scala @@ -46,6 +46,7 @@ import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord, RecordM * acks=all * retries=max integer * block.on.buffer.full=true + * max.in.flight.requests.per.connection=1 * 2. Consumer Settings * auto.commit.enable=false * 3. Mirror Maker Setting: @@ -57,7 +58,7 @@ object MirrorMaker extends Logging with KafkaMetricsGroup { private var producer: MirrorMakerProducer = null private var mirrorMakerThreads: Seq[MirrorMakerThread] = null private val isShuttingdown: AtomicBoolean = new AtomicBoolean(false) - // Track the messages unacked for consumer rebalance + // Track the messages not successfully sent by mirror maker. private var numDroppedMessages: AtomicInteger = new AtomicInteger(0) private var messageHandler: MirrorMakerMessageHandler = null private var offsetCommitIntervalMs = 0 @@ -83,7 +84,6 @@ object MirrorMaker extends Logging with KafkaMetricsGroup { .describedAs("config file") .ofType(classOf[String]) - // Please see note about MaxInflightRequests val producerConfigOpt = parser.accepts("producer.config", "Embedded producer config.") .withRequiredArg() @@ -182,6 +182,7 @@ object MirrorMaker extends Logging with KafkaMetricsGroup { maybeSetDefaultProperty(producerProps, "retries", Int.MaxValue.toString) maybeSetDefaultProperty(producerProps, "block.on.buffer.full", "true") maybeSetDefaultProperty(producerProps, "acks", "all") + maybeSetDefaultProperty(producerProps, "max.in.flight.requests.per.connection", "1") producer = new MirrorMakerProducer(producerProps) // Create consumer connector @@ -253,7 +254,7 @@ object MirrorMaker extends Logging with KafkaMetricsGroup { messageHandler = { if (customMessageHandlerClass != null) { if (messageHandlerArgs != null) - Utils.createObject[MirrorMakerMessageHandler](customMessageHandlerClass, rebalanceListenerArgs) + Utils.createObject[MirrorMakerMessageHandler](customMessageHandlerClass, messageHandlerArgs) else Utils.createObject[MirrorMakerMessageHandler](customMessageHandlerClass) } else { @@ -409,11 +410,11 @@ object MirrorMaker extends Logging with KafkaMetricsGroup { // If abort.on.send.failure is set, stop the mirror maker. Otherwise log skipped message and move on. if (abortOnSendFailure) exitingOnSendFailure = true + numDroppedMessages.incrementAndGet() } } } - private class InternalRebalanceListener(connector: ZookeeperConsumerConnector, customRebalanceListener: Option[ConsumerRebalanceListener]) extends ConsumerRebalanceListener { -- 1.8.3.4 (Apple Git-47) From c9519fa56b56a52630c6c89597e30f76f6970c02 Mon Sep 17 00:00:00 2001 From: jqin Date: Wed, 18 Mar 2015 11:16:57 -0700 Subject: [PATCH 2/2] Replaced producer config properties string with macros --- core/src/main/scala/kafka/tools/MirrorMaker.scala | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/core/src/main/scala/kafka/tools/MirrorMaker.scala b/core/src/main/scala/kafka/tools/MirrorMaker.scala index ebe65dd..4f3c4c8 100644 --- a/core/src/main/scala/kafka/tools/MirrorMaker.scala +++ b/core/src/main/scala/kafka/tools/MirrorMaker.scala @@ -33,7 +33,7 @@ import kafka.metrics.KafkaMetricsGroup import kafka.serializer.DefaultDecoder import kafka.utils.{CommandLineUtils, Logging, Utils} import org.apache.kafka.clients.producer.internals.ErrorLoggingCallback -import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord, RecordMetadata} +import org.apache.kafka.clients.producer.{ProducerConfig, KafkaProducer, ProducerRecord, RecordMetadata} /** * The mirror maker has the following architecture: @@ -179,10 +179,10 @@ object MirrorMaker extends Logging with KafkaMetricsGroup { // create producer val producerProps = Utils.loadProps(options.valueOf(producerConfigOpt)) // Defaults to no data loss settings. - maybeSetDefaultProperty(producerProps, "retries", Int.MaxValue.toString) - maybeSetDefaultProperty(producerProps, "block.on.buffer.full", "true") - maybeSetDefaultProperty(producerProps, "acks", "all") - maybeSetDefaultProperty(producerProps, "max.in.flight.requests.per.connection", "1") + maybeSetDefaultProperty(producerProps, ProducerConfig.RETRIES_CONFIG, Int.MaxValue.toString) + maybeSetDefaultProperty(producerProps, ProducerConfig.BLOCK_ON_BUFFER_FULL_CONFIG, "true") + maybeSetDefaultProperty(producerProps, ProducerConfig.ACKS_CONFIG, "all") + maybeSetDefaultProperty(producerProps, ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, "1") producer = new MirrorMakerProducer(producerProps) // Create consumer connector -- 1.8.3.4 (Apple Git-47)