From a5a46afbeb44b80e570ddb0e4e08c21337b278f8 Mon Sep 17 00:00:00 2001 From: jqin Date: Thu, 14 May 2015 15:51:23 -0700 Subject: [PATCH 1/2] Use close(0) of Kafka producer to prevent reordering during shutdown on exception. --- core/src/main/scala/kafka/tools/MirrorMaker.scala | 22 ++++++++++++++-------- 1 file changed, 14 insertions(+), 8 deletions(-) diff --git a/core/src/main/scala/kafka/tools/MirrorMaker.scala b/core/src/main/scala/kafka/tools/MirrorMaker.scala index 9548521..1209646 100755 --- a/core/src/main/scala/kafka/tools/MirrorMaker.scala +++ b/core/src/main/scala/kafka/tools/MirrorMaker.scala @@ -18,7 +18,7 @@ package kafka.tools import java.util -import java.util.concurrent.CountDownLatch +import java.util.concurrent.{TimeUnit, CountDownLatch} import java.util.concurrent.atomic.{AtomicBoolean, AtomicInteger} import java.util.{Collections, Properties} @@ -222,7 +222,7 @@ object MirrorMaker extends Logging with KafkaMetricsGroup { connector.setConsumerRebalanceListener(consumerRebalanceListener) } - // create Kafka streams + // create filters val filterSpec = if (options.has(whitelistOpt)) new Whitelist(options.valueOf(whitelistOpt)) else @@ -271,10 +271,6 @@ object MirrorMaker extends Logging with KafkaMetricsGroup { } info("Closing producer.") producer.close() - connectors.foreach(commitOffsets) - // Connector should only be shutdown after offsets are committed. - info("Shutting down consumer connectors.") - connectors.foreach(_.shutdown()) info("Kafka mirror maker shutdown successfully") } } @@ -306,7 +302,6 @@ object MirrorMaker extends Logging with KafkaMetricsGroup { val stream = streams(0) val iter = stream.iterator() - // TODO: Need to be changed after KAFKA-1660 is available. while (!exitingOnSendFailure && !shuttingDown) { try { while (!exitingOnSendFailure && !shuttingDown && iter.hasNext()) { @@ -326,6 +321,10 @@ object MirrorMaker extends Logging with KafkaMetricsGroup { case t: Throwable => fatal("Mirror maker thread failure due to ", t) } finally { + info("Committing consumer offsets.") + commitOffsets(connector) + info("Shutting down consumer connectors.") + connector.shutdown() shutdownLatch.countDown() info("Mirror maker thread stopped") // if it exits accidentally, stop the entire mirror maker @@ -388,6 +387,10 @@ object MirrorMaker extends Logging with KafkaMetricsGroup { def close() { this.producer.close() } + + def close(timeout: Long) { + this.producer.close(timeout, TimeUnit.MILLISECONDS) + } } private class MirrorMakerProducerCallback (topic: String, key: Array[Byte], value: Array[Byte]) @@ -399,8 +402,11 @@ object MirrorMaker extends Logging with KafkaMetricsGroup { // still could not be sent. super.onCompletion(metadata, exception) // If abort.on.send.failure is set, stop the mirror maker. Otherwise log skipped message and move on. - if (abortOnSendFailure) + if (abortOnSendFailure) { + info("Closing producer due to send failure.") exitingOnSendFailure = true + producer.close(0) + } numDroppedMessages.incrementAndGet() } } -- 1.8.3.4 (Apple Git-47) From d0d9f410e49b76919e09518d9e3d70fac1ec360e Mon Sep 17 00:00:00 2001 From: jqin Date: Fri, 15 May 2015 19:49:07 -0700 Subject: [PATCH 2/2] Should flush before commit offsets when mirror maker thread exits on exception. --- core/src/main/scala/kafka/tools/MirrorMaker.scala | 1 + 1 file changed, 1 insertion(+) diff --git a/core/src/main/scala/kafka/tools/MirrorMaker.scala b/core/src/main/scala/kafka/tools/MirrorMaker.scala index 1209646..527c70d 100755 --- a/core/src/main/scala/kafka/tools/MirrorMaker.scala +++ b/core/src/main/scala/kafka/tools/MirrorMaker.scala @@ -321,6 +321,7 @@ object MirrorMaker extends Logging with KafkaMetricsGroup { case t: Throwable => fatal("Mirror maker thread failure due to ", t) } finally { + producer.flush() info("Committing consumer offsets.") commitOffsets(connector) info("Shutting down consumer connectors.") -- 1.8.3.4 (Apple Git-47)