From 9b46d2b115f6c8e3b1a93df881a0089c70e8718c Mon Sep 17 00:00:00 2001 From: jqin Date: Tue, 24 Mar 2015 16:42:05 -0700 Subject: [PATCH] Fix for KAFKA-2047 Accelarate consumer bootstrap consumer rebalance in mirror maker. --- core/src/main/scala/kafka/tools/MirrorMaker.scala | 41 +++++++++++------------ 1 file changed, 19 insertions(+), 22 deletions(-) diff --git a/core/src/main/scala/kafka/tools/MirrorMaker.scala b/core/src/main/scala/kafka/tools/MirrorMaker.scala index 4f3c4c8..a128d98 100644 --- a/core/src/main/scala/kafka/tools/MirrorMaker.scala +++ b/core/src/main/scala/kafka/tools/MirrorMaker.scala @@ -26,7 +26,7 @@ import scala.collection.JavaConversions._ import com.yammer.metrics.core.Gauge import joptsimple.OptionParser -import kafka.consumer.{Blacklist, ConsumerConfig, ConsumerThreadId, ConsumerTimeoutException, KafkaStream, Whitelist, ZookeeperConsumerConnector} +import kafka.consumer.{TopicFilter, Blacklist, ConsumerConfig, ConsumerThreadId, ConsumerTimeoutException, KafkaStream, Whitelist, ZookeeperConsumerConnector} import kafka.javaapi.consumer.ConsumerRebalanceListener import kafka.message.MessageAndMetadata import kafka.metrics.KafkaMetricsGroup @@ -226,26 +226,9 @@ object MirrorMaker extends Logging with KafkaMetricsGroup { else new Blacklist(options.valueOf(blacklistOpt)) - // create a (connector->stream) sequence - val connectorStream = (0 until numStreams) map { - i => { - var stream: Seq[KafkaStream[Array[Byte], Array[Byte]]] = null - try { - // Creating just on stream per each connector instance - stream = connectors(i).createMessageStreamsByFilter(filterSpec, 1, new DefaultDecoder(), new DefaultDecoder()) - require(stream.size == 1) - } catch { - case t: Throwable => - fatal("Unable to create stream - shutting down mirror maker.", t) - connectors(i).shutdown() - } - connectors(i) -> stream(0) - } - } - // Create mirror maker threads mirrorMakerThreads = (0 until numStreams) map ( i => - new MirrorMakerThread(connectorStream(i)._1, connectorStream(i)._2, i) + new MirrorMakerThread(connectors(i), filterSpec, i) ) // Create and initialize message handler @@ -295,13 +278,14 @@ object MirrorMaker extends Logging with KafkaMetricsGroup { } private def maybeSetDefaultProperty(properties: Properties, propertyName: String, defaultValue: String) { - properties.setProperty(propertyName, Option(properties.getProperty(propertyName)).getOrElse(defaultValue)) + val propertyValue = properties.getProperty(propertyName) + properties.setProperty(propertyName, Option(propertyValue).getOrElse(defaultValue)) if (properties.getProperty(propertyName) != defaultValue) - info("Property %s is overridden to %s - data loss or message reordering is possible.") + info("Property %s is overridden to %s - data loss or message reordering is possible.".format(propertyName, propertyValue)) } class MirrorMakerThread(connector: ZookeeperConsumerConnector, - stream: KafkaStream[Array[Byte], Array[Byte]], + filterSpec: TopicFilter, val threadId: Int) extends Thread with Logging with KafkaMetricsGroup { private val threadName = "mirrormaker-thread-" + threadId private val shutdownLatch: CountDownLatch = new CountDownLatch(1) @@ -312,6 +296,19 @@ object MirrorMaker extends Logging with KafkaMetricsGroup { setName(threadName) override def run() { + // Create the stream first. + var streams: Seq[KafkaStream[Array[Byte], Array[Byte]]] = null + try { + // Creating just on stream per each connector instance + streams = connector.createMessageStreamsByFilter(filterSpec, 1, new DefaultDecoder(), new DefaultDecoder()) + require(streams.size == 1) + } catch { + case t: Throwable => + fatal("Unable to create stream - shutting down mirror maker.", t) + System.exit(-1) + } + val stream = streams(0) + info("Starting mirror maker thread " + threadName) val iter = stream.iterator() try { -- 1.8.3.4 (Apple Git-47)