From e9a1d19e7c62df22f4967bbd8d1a2144952bae99 Mon Sep 17 00:00:00 2001
From: jqin
Date: Tue, 24 Mar 2015 16:42:05 -0700
Subject: [PATCH 1/2] Fix for KAFKA-2047 Accelerate consumer rebalance during
 mirror maker bootstrap.

---
 core/src/main/scala/kafka/tools/MirrorMaker.scala | 41 +++++++++++------------
 1 file changed, 19 insertions(+), 22 deletions(-)

diff --git a/core/src/main/scala/kafka/tools/MirrorMaker.scala b/core/src/main/scala/kafka/tools/MirrorMaker.scala
index 4f3c4c8..a128d98 100644
--- a/core/src/main/scala/kafka/tools/MirrorMaker.scala
+++ b/core/src/main/scala/kafka/tools/MirrorMaker.scala
@@ -26,7 +26,7 @@ import scala.collection.JavaConversions._
 
 import com.yammer.metrics.core.Gauge
 import joptsimple.OptionParser
-import kafka.consumer.{Blacklist, ConsumerConfig, ConsumerThreadId, ConsumerTimeoutException, KafkaStream, Whitelist, ZookeeperConsumerConnector}
+import kafka.consumer.{TopicFilter, Blacklist, ConsumerConfig, ConsumerThreadId, ConsumerTimeoutException, KafkaStream, Whitelist, ZookeeperConsumerConnector}
 import kafka.javaapi.consumer.ConsumerRebalanceListener
 import kafka.message.MessageAndMetadata
 import kafka.metrics.KafkaMetricsGroup
@@ -226,26 +226,9 @@ object MirrorMaker extends Logging with KafkaMetricsGroup {
     else
       new Blacklist(options.valueOf(blacklistOpt))
 
-    // create a (connector->stream) sequence
-    val connectorStream = (0 until numStreams) map {
-      i => {
-        var stream: Seq[KafkaStream[Array[Byte], Array[Byte]]] = null
-        try {
-          // Creating just on stream per each connector instance
-          stream = connectors(i).createMessageStreamsByFilter(filterSpec, 1, new DefaultDecoder(), new DefaultDecoder())
-          require(stream.size == 1)
-        } catch {
-          case t: Throwable =>
-            fatal("Unable to create stream - shutting down mirror maker.", t)
-            connectors(i).shutdown()
-        }
-        connectors(i) -> stream(0)
-      }
-    }
-
     // Create mirror maker threads
     mirrorMakerThreads = (0 until numStreams) map ( i =>
-      new MirrorMakerThread(connectorStream(i)._1, connectorStream(i)._2, i)
+      new MirrorMakerThread(connectors(i), filterSpec, i)
     )
 
     // Create and initialize message handler
@@ -295,13 +278,14 @@ object MirrorMaker extends Logging with KafkaMetricsGroup {
   }
 
   private def maybeSetDefaultProperty(properties: Properties, propertyName: String, defaultValue: String) {
-    properties.setProperty(propertyName, Option(properties.getProperty(propertyName)).getOrElse(defaultValue))
+    val propertyValue = properties.getProperty(propertyName)
+    properties.setProperty(propertyName, Option(propertyValue).getOrElse(defaultValue))
     if (properties.getProperty(propertyName) != defaultValue)
-      info("Property %s is overridden to %s - data loss or message reordering is possible.")
+      info("Property %s is overridden to %s - data loss or message reordering is possible.".format(propertyName, propertyValue))
   }
 
   class MirrorMakerThread(connector: ZookeeperConsumerConnector,
-                          stream: KafkaStream[Array[Byte], Array[Byte]],
+                          filterSpec: TopicFilter,
                           val threadId: Int) extends Thread with Logging with KafkaMetricsGroup {
     private val threadName = "mirrormaker-thread-" + threadId
     private val shutdownLatch: CountDownLatch = new CountDownLatch(1)
@@ -312,6 +296,19 @@ object MirrorMaker extends Logging with KafkaMetricsGroup {
     setName(threadName)
 
     override def run() {
+      // Create the stream first.
+      var streams: Seq[KafkaStream[Array[Byte], Array[Byte]]] = null
+      try {
+        // Creating just on stream per each connector instance
+        streams = connector.createMessageStreamsByFilter(filterSpec, 1, new DefaultDecoder(), new DefaultDecoder())
+        require(streams.size == 1)
+      } catch {
+        case t: Throwable =>
+          fatal("Unable to create stream - shutting down mirror maker.", t)
+          System.exit(-1)
+      }
+      val stream = streams(0)
+
       info("Starting mirror maker thread " + threadName)
       val iter = stream.iterator()
       try {
-- 
1.8.3.4 (Apple Git-47)
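
For readers following the series: the shape this first patch moves to is one connector plus one lazily created stream per worker thread, so every connector is constructed (and can take part in the consumer rebalance) before any thread blocks inside createMessageStreamsByFilter. Below is a minimal, self-contained Scala sketch of that pattern, not the actual MirrorMaker code; the class name StreamPerThreadMirrorThread, the handle callback, and the trimmed-down shutdown handling are illustrative stand-ins for the real thread's producer, message handler, and offset-commit wiring.

import java.util.concurrent.CountDownLatch

import kafka.consumer.{ConsumerTimeoutException, KafkaStream, TopicFilter, ZookeeperConsumerConnector}
import kafka.message.MessageAndMetadata
import kafka.serializer.DefaultDecoder
import kafka.utils.Logging

// Illustrative stand-in for MirrorMakerThread: the real thread also wires in
// the producer, the message handler, and the offset commit logic.
class StreamPerThreadMirrorThread(connector: ZookeeperConsumerConnector,
                                  filterSpec: TopicFilter,
                                  threadId: Int,
                                  handle: MessageAndMetadata[Array[Byte], Array[Byte]] => Unit)
    extends Thread("mirrormaker-thread-" + threadId) with Logging {

  private val shutdownLatch = new CountDownLatch(1)

  override def run() {
    // The blocking stream creation now happens inside the worker thread, after
    // every connector has already been constructed and registered in ZooKeeper.
    var streams: Seq[KafkaStream[Array[Byte], Array[Byte]]] = null
    try {
      streams = connector.createMessageStreamsByFilter(filterSpec, 1, new DefaultDecoder(), new DefaultDecoder())
      require(streams.size == 1)
    } catch {
      case t: Throwable =>
        fatal("Unable to create stream - shutting down mirror maker.", t)
        System.exit(-1)
    }
    val iter = streams(0).iterator()
    try {
      while (iter.hasNext())
        handle(iter.next())
    } catch {
      case _: ConsumerTimeoutException =>
        info("Consumer timed out, no more messages to copy.")
    } finally {
      shutdownLatch.countDown()
    }
  }

  def awaitShutdown(): Unit = shutdownLatch.await()
}
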
From d1682dd521a82b8d6cb2e28f326224fd823eaa80 Mon Sep 17 00:00:00 2001
From: jqin
Date: Wed, 25 Mar 2015 13:46:44 -0700
Subject: [PATCH 2/2] Addressed Guozhang's comment.

---
 core/src/main/scala/kafka/tools/MirrorMaker.scala | 24 +++++++++---------------
 1 file changed, 9 insertions(+), 15 deletions(-)

diff --git a/core/src/main/scala/kafka/tools/MirrorMaker.scala b/core/src/main/scala/kafka/tools/MirrorMaker.scala
index a128d98..93a1da2 100644
--- a/core/src/main/scala/kafka/tools/MirrorMaker.scala
+++ b/core/src/main/scala/kafka/tools/MirrorMaker.scala
@@ -22,18 +22,18 @@ import java.util.concurrent.CountDownLatch
 import java.util.concurrent.atomic.{AtomicBoolean, AtomicInteger}
 import java.util.{Collections, Properties}
 
-import scala.collection.JavaConversions._
-
 import com.yammer.metrics.core.Gauge
 import joptsimple.OptionParser
-import kafka.consumer.{TopicFilter, Blacklist, ConsumerConfig, ConsumerThreadId, ConsumerTimeoutException, KafkaStream, Whitelist, ZookeeperConsumerConnector}
+import kafka.consumer.{KafkaStream, Blacklist, ConsumerConfig, ConsumerThreadId, ConsumerTimeoutException, TopicFilter, Whitelist, ZookeeperConsumerConnector}
 import kafka.javaapi.consumer.ConsumerRebalanceListener
 import kafka.message.MessageAndMetadata
 import kafka.metrics.KafkaMetricsGroup
 import kafka.serializer.DefaultDecoder
 import kafka.utils.{CommandLineUtils, Logging, Utils}
 import org.apache.kafka.clients.producer.internals.ErrorLoggingCallback
-import org.apache.kafka.clients.producer.{ProducerConfig, KafkaProducer, ProducerRecord, RecordMetadata}
+import org.apache.kafka.clients.producer.{KafkaProducer, ProducerConfig, ProducerRecord, RecordMetadata}
+
+import scala.collection.JavaConversions._
 
 /**
  * The mirror maker has the following architecture:
@@ -296,22 +296,16 @@ object MirrorMaker extends Logging with KafkaMetricsGroup {
     setName(threadName)
 
     override def run() {
-      // Create the stream first.
-      var streams: Seq[KafkaStream[Array[Byte], Array[Byte]]] = null
+      info("Starting mirror maker thread " + threadName)
       try {
+        // Create the stream first.
+        var streams: Seq[KafkaStream[Array[Byte], Array[Byte]]] = null
         // Creating just on stream per each connector instance
         streams = connector.createMessageStreamsByFilter(filterSpec, 1, new DefaultDecoder(), new DefaultDecoder())
         require(streams.size == 1)
-      } catch {
-        case t: Throwable =>
-          fatal("Unable to create stream - shutting down mirror maker.", t)
-          System.exit(-1)
-      }
-      val stream = streams(0)
+        val stream = streams(0)
+        val iter = stream.iterator()
 
-      info("Starting mirror maker thread " + threadName)
-      val iter = stream.iterator()
-      try {
         // TODO: Need to be changed after KAFKA-1660 is available.
         while (!exitingOnSendFailure && !shuttingDown) {
           try {
-- 
1.8.3.4 (Apple Git-47)
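
With this follow-up, stream creation moves inside the thread's single try block, so a failure there flows through the thread's normal error handling instead of a dedicated catch that calls System.exit(-1). Reusing the fields of the sketch shown after the first patch, run() would look roughly as below; the outer catch/finally is an assumption for illustration, since the hunk above only shows the reshaped try body.

  override def run() {
    info("Starting mirror maker thread " + getName)
    try {
      // Create the stream first; a failure now reaches the catch below
      // instead of a dedicated catch block that calls System.exit(-1).
      val streams = connector.createMessageStreamsByFilter(filterSpec, 1, new DefaultDecoder(), new DefaultDecoder())
      require(streams.size == 1)
      val iter = streams(0).iterator()

      while (iter.hasNext())
        handle(iter.next())
    } catch {
      case _: ConsumerTimeoutException =>
        info("Consumer timed out, no more messages to copy.")            // assumed handling
      case t: Throwable =>
        fatal("Mirror maker thread exiting due to unexpected error.", t) // assumed handling
    } finally {
      shutdownLatch.countDown()
    }
  }
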