From 0c4efcd71a1f6f902af5ac99b9977dc5f05e4ade Mon Sep 17 00:00:00 2001 From: jqin Date: Tue, 3 Mar 2015 14:17:32 -0800 Subject: [PATCH 1/5] Patch for KAFKA-1997: refactor mirror maker based on producer.flush() --- .../producer/internals/RecordAccumulator.java | 24 +- .../scala/kafka/consumer/PartitionAssignor.scala | 32 +- .../consumer/ZookeeperConsumerConnector.scala | 19 +- .../consumer/ConsumerRebalanceListener.java | 11 + core/src/main/scala/kafka/tools/MirrorMaker.scala | 608 ++++++--------------- .../kafka/consumer/PartitionAssignorTest.scala | 2 +- .../consumer/ZookeeperConsumerConnectorTest.scala | 60 +- 7 files changed, 274 insertions(+), 482 deletions(-) diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java b/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java index d5c79e2..b6468f9 100644 --- a/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java +++ b/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java @@ -12,27 +12,10 @@ */ package org.apache.kafka.clients.producer.internals; -import java.nio.ByteBuffer; -import java.util.ArrayDeque; -import java.util.ArrayList; -import java.util.Collections; -import java.util.Deque; -import java.util.HashMap; -import java.util.HashSet; -import java.util.List; -import java.util.Map; -import java.util.Set; -import java.util.concurrent.ConcurrentMap; -import java.util.concurrent.atomic.AtomicInteger; - import org.apache.kafka.clients.producer.Callback; -import org.apache.kafka.common.Cluster; -import org.apache.kafka.common.Node; -import org.apache.kafka.common.PartitionInfo; -import org.apache.kafka.common.TopicPartition; +import org.apache.kafka.common.*; import org.apache.kafka.common.metrics.Measurable; import org.apache.kafka.common.metrics.MetricConfig; -import org.apache.kafka.common.MetricName; import org.apache.kafka.common.metrics.Metrics; import org.apache.kafka.common.record.CompressionType; import org.apache.kafka.common.record.MemoryRecords; @@ -44,6 +27,11 @@ import org.apache.kafka.common.utils.Utils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.nio.ByteBuffer; +import java.util.*; +import java.util.concurrent.ConcurrentMap; +import java.util.concurrent.atomic.AtomicInteger; + /** * This class acts as a queue that accumulates records into {@link org.apache.kafka.common.record.MemoryRecords} * instances to be sent to the server. diff --git a/core/src/main/scala/kafka/consumer/PartitionAssignor.scala b/core/src/main/scala/kafka/consumer/PartitionAssignor.scala index e6ff768..b0470da 100644 --- a/core/src/main/scala/kafka/consumer/PartitionAssignor.scala +++ b/core/src/main/scala/kafka/consumer/PartitionAssignor.scala @@ -19,7 +19,7 @@ package kafka.consumer import org.I0Itec.zkclient.ZkClient import kafka.common.TopicAndPartition -import kafka.utils.{Utils, ZkUtils, Logging} +import kafka.utils.{Pool, Utils, ZkUtils, Logging} trait PartitionAssignor { @@ -28,7 +28,7 @@ trait PartitionAssignor { * @return An assignment map of partition to consumer thread. This only includes assignments for threads that belong * to the given assignment-context's consumer. 
*/ - def assign(ctx: AssignmentContext): scala.collection.Map[TopicAndPartition, ConsumerThreadId] + def assign(ctx: AssignmentContext): Pool[String, collection.mutable.Map[TopicAndPartition, ConsumerThreadId]] } @@ -69,7 +69,10 @@ class AssignmentContext(group: String, val consumerId: String, excludeInternalTo class RoundRobinAssignor() extends PartitionAssignor with Logging { def assign(ctx: AssignmentContext) = { - val partitionOwnershipDecision = collection.mutable.Map[TopicAndPartition, ConsumerThreadId]() + + val partitionOwnershipDecision = new Pool[String, collection.mutable.Map[TopicAndPartition, ConsumerThreadId]]( + Some( (topic: String) => new collection.mutable.HashMap[TopicAndPartition, ConsumerThreadId]) + ) if (ctx.consumersForTopic.size > 0) { // check conditions (a) and (b) @@ -102,11 +105,12 @@ class RoundRobinAssignor() extends PartitionAssignor with Logging { allTopicPartitions.foreach(topicPartition => { val threadId = threadAssignor.next() - if (threadId.consumer == ctx.consumerId) - partitionOwnershipDecision += (topicPartition -> threadId) + partitionOwnershipDecision.getAndMaybePut(threadId.consumer) += (topicPartition -> threadId) }) } - + // If this consumer owns no partitions, make sure an empty partition ownership assignment map is there + // for ZookeeperConsumerConnector to use. + partitionOwnershipDecision.getAndMaybePut(ctx.consumerId) partitionOwnershipDecision } } @@ -123,9 +127,10 @@ class RoundRobinAssignor() extends PartitionAssignor with Logging { class RangeAssignor() extends PartitionAssignor with Logging { def assign(ctx: AssignmentContext) = { - val partitionOwnershipDecision = collection.mutable.Map[TopicAndPartition, ConsumerThreadId]() - - for ((topic, consumerThreadIdSet) <- ctx.myTopicThreadIds) { + val partitionOwnershipDecision = new Pool[String, collection.mutable.Map[TopicAndPartition, ConsumerThreadId]]( + Some( (topic: String) => new collection.mutable.HashMap[TopicAndPartition, ConsumerThreadId]) + ) + for (topic <- ctx.myTopicThreadIds.keySet) { val curConsumers = ctx.consumersForTopic(topic) val curPartitions: Seq[Int] = ctx.partitionsForTopic(topic) @@ -135,7 +140,7 @@ class RangeAssignor() extends PartitionAssignor with Logging { info("Consumer " + ctx.consumerId + " rebalancing the following partitions: " + curPartitions + " for topic " + topic + " with consumers: " + curConsumers) - for (consumerThreadId <- consumerThreadIdSet) { + for (consumerThreadId <- curConsumers) { val myConsumerPosition = curConsumers.indexOf(consumerThreadId) assert(myConsumerPosition >= 0) val startPart = nPartsPerConsumer * myConsumerPosition + myConsumerPosition.min(nConsumersWithExtraPart) @@ -152,12 +157,15 @@ class RangeAssignor() extends PartitionAssignor with Logging { val partition = curPartitions(i) info(consumerThreadId + " attempting to claim partition " + partition) // record the partition ownership decision - partitionOwnershipDecision += (TopicAndPartition(topic, partition) -> consumerThreadId) + partitionOwnershipDecision.getAndMaybePut(consumerThreadId.consumer) += + (TopicAndPartition(topic, partition) -> consumerThreadId) } } } } - + // If this consumer owns no partitions, make sure an empty partition ownership assignment map is there + // for ZookeeperConsumerConnector to use. 
+ partitionOwnershipDecision.getAndMaybePut(ctx.consumerId) partitionOwnershipDecision } } diff --git a/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala b/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala index 5487259..9c14652 100644 --- a/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala +++ b/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala @@ -682,7 +682,8 @@ private[kafka] class ZookeeperConsumerConnector(val config: ConsumerConfig, } releasePartitionOwnership(topicRegistry) val assignmentContext = new AssignmentContext(group, consumerIdString, config.excludeInternalTopics, zkClient) - val partitionOwnershipDecision = partitionAssignor.assign(assignmentContext) + val globalPartitionOwnershipDecision = partitionAssignor.assign(assignmentContext) + val partitionOwnershipDecision = globalPartitionOwnershipDecision.get(assignmentContext.consumerId) val currentTopicRegistry = new Pool[String, Pool[Int, PartitionTopicInfo]]( valueFactory = Some((topic: String) => new Pool[Int, PartitionTopicInfo])) @@ -719,6 +720,22 @@ private[kafka] class ZookeeperConsumerConnector(val config: ConsumerConfig, } topicRegistry = currentTopicRegistry + // Invoke beforeStartingFetchers callback if the consumerRebalanceListener is set. + if (consumerRebalanceListener != null) { + info("Calling beforeStartingFetchers() from rebalance listener.") + consumerRebalanceListener.beforeStartingFetchers( consumerIdString, { + val partitionAssigment = globalPartitionOwnershipDecision.values.flatten.groupBy[String]( + partitionOwnership => partitionOwnership._1.topic + ).map({ + case (topic, partitionOwnerShips) => + topic -> mapAsJavaMap(collection.mutable.Map(partitionOwnerShips.map({ + case (topicAndPartition, consumerThreadId) => + topicAndPartition.partition -> consumerThreadId + }).toSeq:_*)).asInstanceOf[java.util.Map[java.lang.Integer, ConsumerThreadId]] + }) + mapAsJavaMap(collection.mutable.Map(partitionAssigment.toSeq:_*)) + }) + } updateFetcher(cluster) true } else { diff --git a/core/src/main/scala/kafka/javaapi/consumer/ConsumerRebalanceListener.java b/core/src/main/scala/kafka/javaapi/consumer/ConsumerRebalanceListener.java index 7f45a90..02fb456 100644 --- a/core/src/main/scala/kafka/javaapi/consumer/ConsumerRebalanceListener.java +++ b/core/src/main/scala/kafka/javaapi/consumer/ConsumerRebalanceListener.java @@ -17,6 +17,8 @@ package kafka.javaapi.consumer; + +import kafka.consumer.ConsumerThreadId; import java.util.Map; import java.util.Set; @@ -33,7 +35,16 @@ public interface ConsumerRebalanceListener { * This listener is initially added to prevent duplicate messages on consumer rebalance * in mirror maker, where offset auto commit is disabled to prevent data loss. It could * also be used in more general cases. + * @param partitionOwnership The partition this consumer currently owns. */ public void beforeReleasingPartitions(Map> partitionOwnership); + /** + * This method is called after the new partition assignment is finished but before fetcher + * threads start. A map of new global partition assignment is passed in as parameter. + * @param consumerId The consumer Id string of the consumer invoking this callback. + * @param partitionAssignment The global partition assignment of this consumer group. 
+ */ + public void beforeStartingFetchers(String consumerId, Map> partitionAssignment); + } diff --git a/core/src/main/scala/kafka/tools/MirrorMaker.scala b/core/src/main/scala/kafka/tools/MirrorMaker.scala index 5374280..8d98bc2 100644 --- a/core/src/main/scala/kafka/tools/MirrorMaker.scala +++ b/core/src/main/scala/kafka/tools/MirrorMaker.scala @@ -17,75 +17,52 @@ package kafka.tools +import java.util.Properties +import java.util.concurrent._ +import java.util.concurrent.atomic.{AtomicBoolean, AtomicInteger} + import com.yammer.metrics.core._ -import kafka.common.{TopicAndPartition, OffsetAndMetadata} -import kafka.javaapi.consumer.ConsumerRebalanceListener -import kafka.utils._ +import joptsimple.OptionParser import kafka.consumer._ -import kafka.serializer._ -import kafka.producer.{KeyedMessage, ProducerConfig} +import kafka.javaapi.consumer.ConsumerRebalanceListener +import kafka.message.MessageAndMetadata import kafka.metrics.KafkaMetricsGroup +import kafka.serializer._ +import kafka.utils._ import org.apache.kafka.clients.producer.internals.ErrorLoggingCallback -import org.apache.kafka.clients.producer.{KafkaProducer, RecordMetadata, ProducerRecord} - -import joptsimple.OptionParser -import java.util.Properties -import java.util.concurrent.atomic.{AtomicInteger, AtomicBoolean} -import java.util.concurrent._ +import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord, RecordMetadata} /** - * The mirror maker consists of three major modules: - * Consumer Threads - The consumer threads consume messages from source Kafka cluster through - * ZookeeperConsumerConnector and put them into corresponding data channel queue based on hash value - * of source topic-partitionId string. This guarantees the message order in source partition is - * preserved. - * Producer Threads - Producer threads take messages out of data channel queues and send them to target cluster. Each - * producer thread is bound to one data channel queue, so that the message order is preserved. - * Data Channel - The data channel has multiple queues. The number of queue is same as number of producer threads. - * - * If new producer is used, the offset will be committed based on the new producer's callback. An offset map is - * maintained and updated on each send() callback. A separate offset commit thread will commit the offset periodically. - * @note For mirror maker, the following settings are required to make sure there is no data loss: + * The mirror maker has the following architecture: + * - There are N mirror maker thread shares one ZookeeperConsumerConnector and each owns a Kafka stream. + * - All the mirror maker threads share a same producer. + * - Each mirror maker thread commits all the offsets periodically by first flush the messages previously + * sent to producer. + + * @note For mirror maker, the following settings are set by default to make sure there is no data loss: * 1. use new producer with following settings * acks=all * retries=max integer * block.on.buffer.full=true * 2. Consumer Settings * auto.commit.enable=false - * If --no.data.loss flag is set in option, then those settings are automatically applied. + * 3. 
Mirror Maker Setting: + * abort.on.send.failure=true */ object MirrorMaker extends Logging with KafkaMetricsGroup { private var connector: ZookeeperConsumerConnector = null - private var consumerThreads: Seq[ConsumerThread] = null - private var producerThreads: Seq[ProducerThread] = null + private var producer: MirrorMakerProducer = null + private var mirrorMakerThreads: Seq[MirrorMakerThread] = null private val isShuttingdown: AtomicBoolean = new AtomicBoolean(false) - private val scheduler: KafkaScheduler = new KafkaScheduler(threads = 1) - - private val unackedOffsetsMap: Pool[TopicAndPartition, UnackedOffsets] = - new Pool[TopicAndPartition, UnackedOffsets](valueFactory = Some((k: TopicAndPartition) => new UnackedOffsets)) // Track the messages unacked for consumer rebalance - private var numUnackedMessages: AtomicInteger = new AtomicInteger(0) private var numSkippedUnackedMessages: AtomicInteger = new AtomicInteger(0) private var consumerRebalanceListener: ConsumerRebalanceListener = null - // This is to indicate whether the rebalance is going on so the producer callback knows if - // the flag indicates internal consumer rebalance callback is waiting for all the messages sent to be acked. - private var waitingForMessageAcks: Boolean = false - - private val shutdownMessage : MirrorMakerRecord = new MirrorMakerRecord("shutdown", 0, 0, null, "shutdown".getBytes) - - newGauge("MirrorMaker-NumUnackedMessages", - new Gauge[Int] { - def value = numUnackedMessages.get() - }) - - // The number of unacked offsets in the unackedOffsetsMap - newGauge("MirrorMaker-UnackedOffsetListsSize", - new Gauge[Int] { - def value = unackedOffsetsMap.iterator.map{ - case(_, unackedOffsets) => unackedOffsets.size - }.sum - }) + private var messageHandler: MirrorMakerMessageHandler = null + private var lastOffsetCommitMs: Long = 0 + private var offsetCommitIntervalMs = 0 + private var abortOnSendFailure: Boolean = true + @volatile private var exitingOnSendFailure: Boolean = false // If a message send failed after retries are exhausted. The offset of the messages will also be removed from // the unacked offset list to avoid offset commit being stuck on that offset. 
In this case, the offset of that @@ -96,8 +73,8 @@ object MirrorMaker extends Logging with KafkaMetricsGroup { }) def main(args: Array[String]) { - - info ("Starting mirror maker") + + info("Starting mirror maker") val parser = new OptionParser val consumerConfigOpt = parser.accepts("consumer.config", @@ -113,19 +90,6 @@ object MirrorMaker extends Logging with KafkaMetricsGroup { .describedAs("config file") .ofType(classOf[String]) - val useNewProducerOpt = parser.accepts("new.producer", - "Use the new producer implementation.") - - val noDataLossOpt = parser.accepts("no.data.loss", - "Configure the mirror maker to have no data loss.") - - val numProducersOpt = parser.accepts("num.producers", - "Number of producer instances") - .withRequiredArg() - .describedAs("Number of producers") - .ofType(classOf[java.lang.Integer]) - .defaultsTo(1) - val numStreamsOpt = parser.accepts("num.streams", "Number of consumption streams.") .withRequiredArg() @@ -133,20 +97,6 @@ object MirrorMaker extends Logging with KafkaMetricsGroup { .ofType(classOf[java.lang.Integer]) .defaultsTo(1) - val bufferSizeOpt = parser.accepts("queue.size", - "Number of messages that are buffered between the consumer and producer") - .withRequiredArg() - .describedAs("Queue size in terms of number of messages") - .ofType(classOf[java.lang.Integer]) - .defaultsTo(10000) - - val bufferByteSizeOpt = parser.accepts("queue.byte.size", - "Maximum bytes that can be buffered in each data channel queue") - .withRequiredArg() - .describedAs("Data channel queue size in terms of number of bytes") - .ofType(classOf[java.lang.Integer]) - .defaultsTo(100000000) - val whitelistOpt = parser.accepts("whitelist", "Whitelist of topics to mirror.") .withRequiredArg() @@ -160,7 +110,7 @@ object MirrorMaker extends Logging with KafkaMetricsGroup { .ofType(classOf[String]) val offsetCommitIntervalMsOpt = parser.accepts("offset.commit.interval.ms", - "Offset commit interval in ms") + "Offset commit interval in ms") .withRequiredArg() .describedAs("offset commit interval in millisecond") .ofType(classOf[java.lang.Integer]) @@ -172,12 +122,39 @@ object MirrorMaker extends Logging with KafkaMetricsGroup { .describedAs("A custom rebalance listener of type ConsumerRebalanceListener") .ofType(classOf[String]) + val rebalanceListenerArgsOpt = parser.accepts("rebalance.listener.args", + "Arguments used by custom rebalance listener for mirror maker consumer") + .withRequiredArg() + .describedAs("Arguments passed to custom rebalance listener constructor as a string.") + .ofType(classOf[String]) + + val messageHandlerOpt = parser.accepts("message.handler", + "The consumer rebalance listener to use for mirror maker consumer.") + .withRequiredArg() + .describedAs("A custom rebalance listener of type MirrorMakerMessageHandler") + .ofType(classOf[String]) + + val messageHandlerArgsOpt = parser.accepts("message.handler.args", + "Arguments used by custom rebalance listener for mirror maker consumer") + .withRequiredArg() + .describedAs("Arguments passed to message handler constructor.") + .ofType(classOf[String]) + .defaultsTo("") + + val abortOnSendFailureOpt = parser.accepts("abort.on.send.failure", + "Configure the mirror maker to exit on a failed send.") + .withRequiredArg() + .describedAs("Stop the entire mirror maker when a send failure occurs") + .ofType(classOf[String]) + .defaultsTo("true") + val helpOpt = parser.accepts("help", "Print this message.") - - if(args.length == 0) + + if (args.length == 0) CommandLineUtils.printUsageAndDie(parser, "Continuously 
copy data between two Kafka clusters.") - val options = parser.parse(args : _*) + + val options = parser.parse(args: _*) if (options.has(helpOpt)) { parser.printHelpOn(System.out) @@ -190,79 +167,56 @@ object MirrorMaker extends Logging with KafkaMetricsGroup { System.exit(1) } - val numProducers = options.valueOf(numProducersOpt).intValue() + abortOnSendFailure = options.valueOf(abortOnSendFailureOpt).toBoolean + offsetCommitIntervalMs = options.valueOf(offsetCommitIntervalMsOpt).intValue() val numStreams = options.valueOf(numStreamsOpt).intValue() - val bufferSize = options.valueOf(bufferSizeOpt).intValue() - val bufferByteSize = options.valueOf(bufferByteSizeOpt).intValue() - val offsetCommitIntervalMs = options.valueOf(offsetCommitIntervalMsOpt).intValue() - - // create consumer connector + // Create consumer connector val consumerConfigProps = Utils.loadProps(options.valueOf(consumerConfigOpt)) - val noDataLoss = options.has(noDataLossOpt) - // disable consumer auto commit because offset will be committed by offset commit thread. - if (noDataLoss) - consumerConfigProps.setProperty("auto.commit.enable","false") + + // Disable consumer auto commit because offset will be committed by offset commit thread. + consumerConfigProps.setProperty("auto.commit.enable", "false") + // Set the consumer timeout so we will not block for low volume topics. + consumerConfigProps.setProperty("consumer.timeout.ms", "10000") val consumerConfig = new ConsumerConfig(consumerConfigProps) connector = new ZookeeperConsumerConnector(consumerConfig) - // create a data channel btw the consumers and the producers - val mirrorDataChannel = new DataChannel(bufferSize, bufferByteSize, numInputs = numStreams, numOutputs = numProducers) - // set consumer rebalance listener // custom rebalance listener will be invoked after internal listener finishes its work. 
val customRebalanceListenerClass = options.valueOf(consumerRebalanceListenerOpt) + val rebalanceListenerArgs = options.valueOf(rebalanceListenerArgsOpt) val customRebalanceListener = { - if (customRebalanceListenerClass != null) - Some(Utils.createObject[ConsumerRebalanceListener](customRebalanceListenerClass)) - else + if (customRebalanceListenerClass != null) { + if (rebalanceListenerArgs != null) + Some(Utils.createObject[ConsumerRebalanceListener](customRebalanceListenerClass, rebalanceListenerArgs)) + else + Some(Utils.createObject[ConsumerRebalanceListener](customRebalanceListenerClass)) + } else { None + } } - consumerRebalanceListener = new InternalRebalanceListener(mirrorDataChannel, customRebalanceListener) + consumerRebalanceListener = new InternalRebalanceListener(customRebalanceListener) connector.setConsumerRebalanceListener(consumerRebalanceListener) - // create producer threads - val producerProps = Utils.loadProps(options.valueOf(producerConfigOpt)) - val useNewProducer = { - // Override producer settings if no.data.loss is set - if (noDataLoss) { - producerProps.setProperty("retries",Int.MaxValue.toString) - producerProps.setProperty("block.on.buffer.full", "true") - producerProps.setProperty("acks","all") - true - } else { - options.has(useNewProducerOpt) - } - } - val clientId = producerProps.getProperty("client.id", "") - producerThreads = (0 until numProducers).map(i => { - producerProps.setProperty("client.id", clientId + "-" + i) - val producer = - if (useNewProducer) { - producerProps.put(org.apache.kafka.clients.producer.ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, - "org.apache.kafka.common.serialization.ByteArraySerializer") - producerProps.put(org.apache.kafka.clients.producer.ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, - "org.apache.kafka.common.serialization.ByteArraySerializer") - new MirrorMakerNewProducer(producerProps) - } + // Create and initialize message handler + val customMessageHanlerClass = options.valueOf(messageHandlerOpt) + val messageHandlerArgs = options.valueOf(messageHandlerArgsOpt) + messageHandler = { + if (customMessageHanlerClass != null) + Utils.createObject[MirrorMakerMessageHandler](customMessageHanlerClass) else - new MirrorMakerOldProducer(producerProps) - new ProducerThread(mirrorDataChannel, producer, i) - }) - - // start offset commit thread - if (noDataLoss) { - /** - * The offset commit thread periodically commit consumed offsets. With the new producer, - * the offsets are updated upon the returned future metadata of the send() call; with the old producer, - * the offsets are updated upon the consumer's iterator advances. By doing this, it is guaranteed no data - * loss even when mirror maker is uncleanly shutdown with the new producer, while with the old producer - * messages inside the data channel could be lost upon mirror maker unclean shutdown. - */ - scheduler.startup() - scheduler.schedule("offset-commit", commitOffsets, 0, offsetCommitIntervalMs, TimeUnit.MILLISECONDS) + defaultMirrorMakerMessageHandler } + messageHandler.init(messageHandlerArgs) - // create consumer threads + // create producer + val producerProps = Utils.loadProps(options.valueOf(producerConfigOpt)) + // Defaults to no data loss settings. 
+ producerProps.setProperty("retries", Int.MaxValue.toString) + producerProps.setProperty("block.on.buffer.full", "true") + producerProps.setProperty("acks", "all") + producer = new MirrorMakerProducer(producerProps) + + // create consumer connector and Kafka streams val filterSpec = if (options.has(whitelistOpt)) new Whitelist(options.valueOf(whitelistOpt)) else @@ -276,8 +230,7 @@ object MirrorMaker extends Logging with KafkaMetricsGroup { fatal("Unable to create stream - shutting down mirror maker.", t) connector.shutdown() } - consumerThreads = streams.zipWithIndex.map(streamAndIndex => new ConsumerThread(streamAndIndex._1, mirrorDataChannel, streamAndIndex._2)) - assert(consumerThreads.size == numStreams) + assert(streams.size == numStreams) Runtime.getRuntime.addShutdownHook(new Thread() { override def run() { @@ -285,14 +238,25 @@ object MirrorMaker extends Logging with KafkaMetricsGroup { } }) - consumerThreads.foreach(_.start) - producerThreads.foreach(_.start) + // Create mirror maker threads + mirrorMakerThreads = (0 until numStreams).map(i => + new MirrorMakerThread(streams(i), i) + ) + lastOffsetCommitMs = System.currentTimeMillis() + mirrorMakerThreads.foreach(_.start()) + mirrorMakerThreads.foreach(_.awaitShutdown()) + } - // we wait on producer's shutdown latch instead of consumers - // since the consumer threads can hit a timeout/other exception; - // but in this case the producer should still be able to shutdown - // based on the shutdown message in the channel - producerThreads.foreach(_.awaitShutdown()) + def commitOffsets() { + this.synchronized { + if (!exitingOnSendFailure) { + trace("Committing offsets.") + connector.commitOffsets + lastOffsetCommitMs = System.currentTimeMillis() + } else { + info("Exiting on send failure, skip committing offsets.") + } + } } def cleanShutdown() { @@ -300,19 +264,13 @@ object MirrorMaker extends Logging with KafkaMetricsGroup { info("Start clean shutdown.") // Shutdown consumer threads. info("Shutting down consumer threads.") - if (consumerThreads != null) { - consumerThreads.foreach(_.shutdown()) - consumerThreads.foreach(_.awaitShutdown()) - } - // After consumer threads exit, shutdown producer. - info("Shutting down producer threads.") - if (producerThreads != null) { - producerThreads.foreach(_.shutdown()) - producerThreads.foreach(_.awaitShutdown()) + if (mirrorMakerThreads != null) { + mirrorMakerThreads.foreach(_.shutdown()) + mirrorMakerThreads.foreach(_.awaitShutdown()) } - // offset commit thread should only be shutdown after producer threads are shutdown, so we don't lose offsets. - scheduler.shutdown() - swallow(commitOffsets()) + info("Closing producer.") + producer.close() + commitOffsets() // connector should only be shutdown after offsets are committed. 
info("Shutting down consumer connectors.") @@ -322,145 +280,35 @@ object MirrorMaker extends Logging with KafkaMetricsGroup { } } - class DataChannel(messageCapacity: Int, byteCapacity: Int, numInputs: Int, numOutputs: Int) - extends KafkaMetricsGroup { - - val queues = new Array[ByteBoundedBlockingQueue[MirrorMakerRecord]](numOutputs) - val channelSizeHists = new Array[Histogram](numOutputs) - val channelByteSizeHists = new Array[Histogram](numOutputs) - val sizeFunction = (record: MirrorMakerRecord) => record.size - for (i <- 0 until numOutputs) { - queues(i) = new ByteBoundedBlockingQueue[MirrorMakerRecord](messageCapacity, byteCapacity, Some(sizeFunction)) - channelSizeHists(i) = newHistogram("MirrorMaker-DataChannel-queue-%d-NumMessages".format(i)) - channelByteSizeHists(i) = newHistogram("MirrorMaker-DataChannel-queue-%d-Bytes".format(i)) - } - private val channelRecordSizeHist = newHistogram("MirrorMaker-DataChannel-Record-Size") - - // We use a single meter for aggregated wait percentage for the data channel. - // Since meter is calculated as total_recorded_value / time_window and - // time_window is independent of the number of threads, each recorded wait - // time should be discounted by # threads. - private val waitPut = newMeter("MirrorMaker-DataChannel-WaitOnPut", "percent", TimeUnit.NANOSECONDS) - private val waitTake = newMeter("MirrorMaker-DataChannel-WaitOnTake", "percent", TimeUnit.NANOSECONDS) - - def put(record: MirrorMakerRecord) { - // Use hash of source topic-partition to decide which queue to put the message in. The benefit is that - // we can maintain the message order for both keyed and non-keyed messages. - val queueId = - Utils.abs(java.util.Arrays.hashCode((record.sourceTopic + record.sourcePartition).toCharArray)) % numOutputs - put(record, queueId) - } - - def put(record: MirrorMakerRecord, queueId: Int) { - val queue = queues(queueId) - - var putSucceed = false - while (!putSucceed) { - val startPutTime = SystemTime.nanoseconds - putSucceed = queue.offer(record, 500, TimeUnit.MILLISECONDS) - waitPut.mark((SystemTime.nanoseconds - startPutTime) / numInputs) - } - channelSizeHists(queueId).update(queue.size()) - channelByteSizeHists(queueId).update(queue.byteSize()) - channelRecordSizeHist.update(sizeFunction(record)) - } - - def take(queueId: Int): MirrorMakerRecord = { - val queue = queues(queueId) - var data: MirrorMakerRecord = null - while (data == null) { - val startTakeTime = SystemTime.nanoseconds - data = queue.poll(500, TimeUnit.MILLISECONDS) - waitTake.mark((SystemTime.nanoseconds - startTakeTime) / numOutputs) - } - channelSizeHists(queueId).update(queue.size()) - channelByteSizeHists(queueId).update(queue.byteSize()) - data - } - - def clear() { - queues.foreach(queue => queue.clear()) - } - } - - class ConsumerThread(stream: KafkaStream[Array[Byte], Array[Byte]], - mirrorDataChannel: DataChannel, - threadId: Int) - extends Thread with Logging with KafkaMetricsGroup { - - private val shutdownLatch = new CountDownLatch(1) - private val threadName = "mirrormaker-consumer-" + threadId - this.logIdent = "[%s] ".format(threadName) - private var shutdownFlag: Boolean = false - - this.setName(threadName) - - override def run() { - info("Starting mirror maker consumer thread " + threadName) - try { - val iter = stream.iterator() - while (!shutdownFlag && iter.hasNext()) { - val msgAndMetadata = iter.next() - val data = new MirrorMakerRecord(msgAndMetadata.topic, - msgAndMetadata.partition, - msgAndMetadata.offset, - msgAndMetadata.key(), - 
msgAndMetadata.message()) - mirrorDataChannel.put(data) - } - } catch { - case e: Throwable => { - fatal("Stream unexpectedly exited.", e) - } - } finally { - shutdownLatch.countDown() - info("Consumer thread stopped") - - // If it exits accidentally, stop the entire mirror maker. - if (!isShuttingdown.get()) { - fatal("Consumer thread exited abnormally, stopping the whole mirror maker.") - System.exit(-1) - } - } - } - - def shutdown() { - shutdownFlag = true - } - - def awaitShutdown() { - try { - shutdownLatch.await() - info("Consumer thread shutdown complete") - } catch { - case e: InterruptedException => fatal("Shutdown of the consumer thread interrupted. This might leak data!") - } - } - } - - class ProducerThread (val dataChannel: DataChannel, - val producer: MirrorMakerBaseProducer, - val threadId: Int) extends Thread with Logging with KafkaMetricsGroup { - private val threadName = "mirrormaker-producer-" + threadId + class MirrorMakerThread(stream: KafkaStream[Array[Byte], Array[Byte]], + val threadId: Int) extends Thread with Logging with KafkaMetricsGroup { + private val threadName = "mirrormaker-thread-" + threadId private val shutdownLatch: CountDownLatch = new CountDownLatch(1) + @volatile private var shutdownFlag: Boolean = false this.logIdent = "[%s] ".format(threadName) setName(threadName) override def run() { - info("Starting mirror maker producer thread " + threadName) + info("Starting mirror maker thread " + threadName) + val iter = stream.iterator() try { - while (true) { - val data: MirrorMakerRecord = dataChannel.take(threadId) - trace("Sending message with value size %d".format(data.value.size)) - if(data eq shutdownMessage) { - info("Received shutdown message") - return + while (!shutdownFlag) { + try { + while (iter.hasNext()) { + val data = iter.next() + trace("Sending message with value size %d".format(data.message().size)) + val records = messageHandler.handle(data) + records.foreach(producer.send) + } + } catch { + case e: ConsumerTimeoutException => + trace("Caught ConsumerTimeoutException, continue iteration.") + } + if (System.currentTimeMillis() - lastOffsetCommitMs > offsetCommitIntervalMs) { + producer.flush() + commitOffsets() } - producer.send(new TopicAndPartition(data.sourceTopic, data.sourcePartition), - data.sourceOffset, - data.key, - data.value) } } catch { case t: Throwable => @@ -470,7 +318,7 @@ object MirrorMaker extends Logging with KafkaMetricsGroup { info("Producer thread stopped") // if it exits accidentally, stop the entire mirror maker if (!isShuttingdown.get()) { - fatal("Producer thread exited abnormally, stopping the whole mirror maker.") + fatal("Mirror maker thread exited abnormally, stopping the whole mirror maker.") System.exit(-1) } } @@ -478,8 +326,8 @@ object MirrorMaker extends Logging with KafkaMetricsGroup { def shutdown() { try { - info("Producer thread " + threadName + " shutting down") - dataChannel.put(shutdownMessage, threadId) + info(threadName + " shutting down") + shutdownFlag = true } catch { case ie: InterruptedException => { @@ -491,8 +339,7 @@ object MirrorMaker extends Logging with KafkaMetricsGroup { def awaitShutdown() { try { shutdownLatch.await() - producer.close() - info("Producer thread shutdown complete") + info("Mirror maker thread shutdown complete") } catch { case ie: InterruptedException => { warn("Shutdown of the producer thread interrupted") @@ -501,182 +348,79 @@ object MirrorMaker extends Logging with KafkaMetricsGroup { } } - private def commitOffsets() { - try { - info("Committing offsets") - 
val offsetsToCommit = collection.immutable.Map(unackedOffsetsMap.map { - case (topicPartition, unackedOffsets) => - topicPartition -> OffsetAndMetadata(unackedOffsets.getOffsetToCommit, null) - }.toSeq: _*) - if (connector == null) { - warn("No consumer connector available to commit offset.") - } else { - connector.commitOffsets(offsetsToCommit, isAutoCommit = false) - } - } catch { - case e: OutOfMemoryError => - fatal("Shutting down mirror maker due to error when committing offsets.", e) - System.exit(-1) - case t: Throwable => - warn("Offsets commit failed due to ", t) - } - } - - private[kafka] trait MirrorMakerBaseProducer { - def send(sourceTopicPartition: TopicAndPartition, sourceOffset: Long, key: Array[Byte], value: Array[Byte]) - def close() - } - - private class MirrorMakerNewProducer (val producerProps: Properties) extends MirrorMakerBaseProducer { + private class MirrorMakerProducer(val producerProps: Properties) { val sync = producerProps.getProperty("producer.type", "async").equals("sync") val producer = new KafkaProducer[Array[Byte], Array[Byte]](producerProps) - override def send(sourceTopicPartition: TopicAndPartition, sourceOffset: Long, key: Array[Byte], value: Array[Byte]) { - val record = new ProducerRecord[Array[Byte], Array[Byte]](sourceTopicPartition.topic, key, value) - if(sync) { + def send(record: ProducerRecord[Array[Byte], Array[Byte]]) { + if (sync) { this.producer.send(record).get() - unackedOffsetsMap.getAndMaybePut(sourceTopicPartition).maybeUpdateMaxOffsetSeen(sourceOffset) } else { - - val unackedOffsets = unackedOffsetsMap.getAndMaybePut(sourceTopicPartition) - // synchronize to ensure that addOffset precedes removeOffset - unackedOffsets synchronized { - val unackedOffset = new UnackedOffset(sourceOffset) this.producer.send(record, - new MirrorMakerProducerCallback(sourceTopicPartition, unackedOffset, key, value)) - // add offset to unackedOffsets - unackedOffsets.addOffset(unackedOffset) - numUnackedMessages.incrementAndGet() - } + new MirrorMakerProducerCallback(record.topic(), record.key(), record.value())) } } - override def close() { - this.producer.close() - } - } - - private class MirrorMakerOldProducer (val producerProps: Properties) extends MirrorMakerBaseProducer { - - // default to byte array partitioner - if (producerProps.getProperty("partitioner.class") == null) - producerProps.setProperty("partitioner.class", classOf[kafka.producer.ByteArrayPartitioner].getName) - val producer = new kafka.producer.Producer[Array[Byte], Array[Byte]](new ProducerConfig(producerProps)) - - override def send(topicPartition: TopicAndPartition, offset: Long, key: Array[Byte], value: Array[Byte]) { - this.producer.send(new KeyedMessage[Array[Byte], Array[Byte]](topicPartition.topic, key, value)) + def flush() { + this.producer.flush() } - override def close() { + def close() { this.producer.close() } } - private class MirrorMakerProducerCallback (val topicPartition: TopicAndPartition, - val offset: UnackedOffset, - val key: Array[Byte], - val value: Array[Byte]) - extends ErrorLoggingCallback(topicPartition.topic, key, value, false) { + private class MirrorMakerProducerCallback (topic: String, key: Array[Byte], value: Array[Byte]) + extends ErrorLoggingCallback(topic, key, value, false) { override def onCompletion(metadata: RecordMetadata, exception: Exception) { if (exception != null) { // Use default call back to log error. This means the max retries of producer has reached and message - // still could not be sent. 
In this case we have to remove the offsets from list to let the mirror maker - // move on. The message failed to be sent will be lost in target cluster. - warn("Not be able to send message, offset of "+ topicPartition + " will not advance. Total number" + - "of skipped unacked messages is" + numSkippedUnackedMessages.incrementAndGet()) + // still could not be sent. super.onCompletion(metadata, exception) - } else { - trace("Updating offset for %s to %d".format(topicPartition, offset)) - } - // remove the offset from the unackedOffsets - val unackedOffsets = unackedOffsetsMap.get(topicPartition) - unackedOffsets.removeOffset(offset) - // Notify the rebalance callback only when all the messages handed to producer are acked. - // There is a very slight chance that one message is held by producer thread and not handed to producer. - // That message might have duplicate. We are not handling that here. - numUnackedMessages synchronized { - if (numUnackedMessages.decrementAndGet() == 0 && waitingForMessageAcks) { - numUnackedMessages.notify() + // If abort.on.send.failure is set, stop the mirror maker. Otherwise log skipped message and move on. + if (abortOnSendFailure) { + exitingOnSendFailure = true + cleanShutdown() } + warn("Not be able to send message to topic %s. Total number of skipped unacked messages is %d.".format( + topic, numSkippedUnackedMessages.incrementAndGet())) } } } - class InternalRebalanceListener (dataChannel: DataChannel, customRebalanceListener: Option[ConsumerRebalanceListener]) - extends ConsumerRebalanceListener { + + class InternalRebalanceListener(customRebalanceListener: Option[ConsumerRebalanceListener]) + extends ConsumerRebalanceListener { override def beforeReleasingPartitions(partitionOwnership: java.util.Map[String, java.util.Set[java.lang.Integer]]) { - info("Clearing data channel.") - dataChannel.clear() - info("Waiting until all the messages are acked.") - numUnackedMessages synchronized { - waitingForMessageAcks = true - while (numUnackedMessages.get() > 0) { - try { - numUnackedMessages.wait() - } catch { - case e: InterruptedException => info("Ignoring interrupt while waiting.") - } - } - waitingForMessageAcks = false - } - info("Committing offsets.") + producer.flush() commitOffsets() - // invoke custom consumer rebalance listener if (customRebalanceListener.isDefined) customRebalanceListener.get.beforeReleasingPartitions(partitionOwnership) } - } - private[kafka] class MirrorMakerRecord (val sourceTopic: String, - val sourcePartition: Int, - val sourceOffset: Long, - val key: Array[Byte], - val value: Array[Byte]) { - def size = {if (value == null) 0 else value.length} + {if (key == null) 0 else key.length} + override def beforeStartingFetchers(consumerId: String, + partitionAssignment: java.util.Map[String, java.util.Map[java.lang.Integer, ConsumerThreadId]]) { + if (customRebalanceListener.isDefined) + customRebalanceListener.get.beforeStartingFetchers(consumerId, partitionAssignment) + } } - private class UnackedOffset(offset: Long) extends DoublyLinkedListNode[Long](offset) { + trait MirrorMakerMessageHandler { + def init(handlerArgs: String) + def handle(record: MessageAndMetadata[Array[Byte], Array[Byte]]): List[ProducerRecord[Array[Byte], Array[Byte]]] } - private class UnackedOffsets { - val offsetList = new DoublyLinkedList[Long] - var maxOffsetSeen: Long = -1L - - def maybeUpdateMaxOffsetSeen(offset: Long) { - this synchronized { - maxOffsetSeen = math.max(maxOffsetSeen, offset) - } - } - - def addOffset(offset: DoublyLinkedListNode[Long]) { - 
this synchronized { - offsetList.add(offset) - maybeUpdateMaxOffsetSeen(offset.element) - } - } - - def removeOffset(offset: DoublyLinkedListNode[Long]) { - offsetList.remove(offset) + private object defaultMirrorMakerMessageHandler extends MirrorMakerMessageHandler { + override def init(handlerArgs: String) {} + override def handle(record: MessageAndMetadata[Array[Byte], Array[Byte]]): List[ProducerRecord[Array[Byte], Array[Byte]]] = { + List[ProducerRecord[Array[Byte], Array[Byte]]]( + new ProducerRecord[Array[Byte], Array[Byte]](record.topic, record.key(), record.message())) } - - def getOffsetToCommit: Long = { - this synchronized { - val smallestUnackedOffset = offsetList.peek() - if (smallestUnackedOffset == null) - // list is empty, commit maxOffsetSeen + 1 - maxOffsetSeen + 1 - else - // commit the smallest unacked offset - smallestUnackedOffset.element - } - } - - def size: Int = offsetList.size } } \ No newline at end of file diff --git a/core/src/test/scala/unit/kafka/consumer/PartitionAssignorTest.scala b/core/src/test/scala/unit/kafka/consumer/PartitionAssignorTest.scala index 543070f..1910fcb 100644 --- a/core/src/test/scala/unit/kafka/consumer/PartitionAssignorTest.scala +++ b/core/src/test/scala/unit/kafka/consumer/PartitionAssignorTest.scala @@ -164,7 +164,7 @@ private object PartitionAssignorTest extends Logging { verifyAssignmentIsUniform: Boolean = false) { val assignments = scenario.subscriptions.map{ case(consumer, subscription) => val ctx = new AssignmentContext("g1", consumer, excludeInternalTopics = true, zkClient) - assignor.assign(ctx) + assignor.assign(ctx).get(consumer) } // check for uniqueness (i.e., any partition should be assigned to exactly one consumer stream) diff --git a/core/src/test/scala/unit/kafka/consumer/ZookeeperConsumerConnectorTest.scala b/core/src/test/scala/unit/kafka/consumer/ZookeeperConsumerConnectorTest.scala index a17e853..e4feb96 100644 --- a/core/src/test/scala/unit/kafka/consumer/ZookeeperConsumerConnectorTest.scala +++ b/core/src/test/scala/unit/kafka/consumer/ZookeeperConsumerConnectorTest.scala @@ -17,22 +17,22 @@ package kafka.consumer +import java.util.Collections + import junit.framework.Assert._ +import kafka.common.MessageStreamsExistException import kafka.integration.KafkaServerTestHarness import kafka.javaapi.consumer.ConsumerRebalanceListener -import kafka.server._ -import scala.collection._ -import scala.collection.JavaConversions._ -import org.scalatest.junit.JUnit3Suite import kafka.message._ import kafka.serializer._ -import org.I0Itec.zkclient.ZkClient -import kafka.utils._ -import kafka.producer.{KeyedMessage, Producer} -import java.util.{Collections, Properties} -import org.apache.log4j.{Logger, Level} +import kafka.server._ import kafka.utils.TestUtils._ -import kafka.common.{TopicAndPartition, MessageStreamsExistException} +import kafka.utils._ +import org.I0Itec.zkclient.ZkClient +import org.apache.log4j.{Level, Logger} +import org.scalatest.junit.JUnit3Suite + +import scala.collection._ class ZookeeperConsumerConnectorTest extends JUnit3Suite with KafkaServerTestHarness with Logging { @@ -360,10 +360,18 @@ class ZookeeperConsumerConnectorTest extends JUnit3Suite with KafkaServerTestHar val topicMessageStreams1 = zkConsumerConnector1.createMessageStreams(Map(topic -> 1), new StringDecoder(), new StringDecoder()) // Check if rebalance listener is fired - assertEquals(true, rebalanceListener1.listenerCalled) + assertEquals(true, rebalanceListener1.beforeReleasingPartitionsCalled) + assertEquals(true, 
rebalanceListener1.beforeStartingFetchersCalled) assertEquals(null, rebalanceListener1.partitionOwnership.get(topic)) + // Check if partition assignment in rebalance listener is correct + assertEquals("group1_consumer1", rebalanceListener1.globalPartitionOwnership.get(topic).get(0).consumer) + assertEquals("group1_consumer1", rebalanceListener1.globalPartitionOwnership.get(topic).get(1).consumer) + assertEquals(0, rebalanceListener1.globalPartitionOwnership.get(topic).get(0).threadId) + assertEquals(0, rebalanceListener1.globalPartitionOwnership.get(topic).get(1).threadId) + assertEquals("consumer1", rebalanceListener1.consumerId) // reset the flag - rebalanceListener1.listenerCalled = false + rebalanceListener1.beforeReleasingPartitionsCalled = false + rebalanceListener1.beforeStartingFetchersCalled = false val actual_1 = getZKChildrenValues(dirs.consumerOwnerDir) val expected_1 = List(("0", "group1_consumer1-0"), @@ -383,10 +391,17 @@ class ZookeeperConsumerConnectorTest extends JUnit3Suite with KafkaServerTestHar assertEquals(expected_2, actual_2) // Check if rebalance listener is fired - assertEquals(true, rebalanceListener1.listenerCalled) + assertEquals(true, rebalanceListener1.beforeReleasingPartitionsCalled) + assertEquals(true, rebalanceListener1.beforeStartingFetchersCalled) assertEquals(Set[Int](0, 1), rebalanceListener1.partitionOwnership.get(topic)) - assertEquals(true, rebalanceListener2.listenerCalled) - assertEquals(null, rebalanceListener2.partitionOwnership.get(topic)) + // Check if global partition ownership in rebalance listener is correct + assertEquals("group1_consumer1", rebalanceListener1.globalPartitionOwnership.get(topic).get(0).consumer) + assertEquals("group1_consumer2", rebalanceListener1.globalPartitionOwnership.get(topic).get(1).consumer) + assertEquals(0, rebalanceListener1.globalPartitionOwnership.get(topic).get(0).threadId) + assertEquals(0, rebalanceListener1.globalPartitionOwnership.get(topic).get(1).threadId) + assertEquals("consuemr1", rebalanceListener1.consumerId) + assertEquals("consumer2", rebalanceListener2.consumerId) + assertEquals(rebalanceListener1.globalPartitionOwnership, rebalanceListener2.globalPartitionOwnership) zkConsumerConnector1.shutdown() zkConsumerConnector2.shutdown() } @@ -395,7 +410,7 @@ class ZookeeperConsumerConnectorTest extends JUnit3Suite with KafkaServerTestHar val children = zkClient.getChildren(path) Collections.sort(children) val childrenAsSeq : Seq[java.lang.String] = { - import JavaConversions._ + import scala.collection.JavaConversions._ children.toSeq } childrenAsSeq.map(partition => @@ -403,13 +418,22 @@ class ZookeeperConsumerConnectorTest extends JUnit3Suite with KafkaServerTestHar } private class TestConsumerRebalanceListener extends ConsumerRebalanceListener { - var listenerCalled: Boolean = false + var beforeReleasingPartitionsCalled: Boolean = false + var beforeStartingFetchersCalled: Boolean = false + var consumerId: String = ""; var partitionOwnership: java.util.Map[String, java.util.Set[java.lang.Integer]] = null + var globalPartitionOwnership: java.util.Map[String, java.util.Map[java.lang.Integer, ConsumerThreadId]] = null override def beforeReleasingPartitions(partitionOwnership: java.util.Map[String, java.util.Set[java.lang.Integer]]) { - listenerCalled = true + beforeReleasingPartitionsCalled = true this.partitionOwnership = partitionOwnership } + + override def beforeStartingFetchers(consumerId: String, globalPartitionOwnership: java.util.Map[String, java.util.Map[java.lang.Integer, 
ConsumerThreadId]]) { + beforeStartingFetchersCalled = true + this.consumerId = consumerId + this.globalPartitionOwnership = globalPartitionOwnership + } } } -- 1.8.3.4 (Apple Git-47) From cff66e91f1a3303cfedd8ab6ec760935d3a67f16 Mon Sep 17 00:00:00 2001 From: jqin Date: Tue, 3 Mar 2015 16:27:57 -0800 Subject: [PATCH 2/5] Addressed Guozhang's comments. --- .../clients/producer/internals/RecordAccumulator.java | 16 ++++++++++++++-- .../main/scala/kafka/consumer/PartitionAssignor.scala | 4 ++-- .../javaapi/consumer/ConsumerRebalanceListener.java | 3 ++- core/src/main/scala/kafka/tools/MirrorMaker.scala | 13 +++++++++---- 4 files changed, 27 insertions(+), 9 deletions(-) diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java b/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java index b6468f9..88b4e4f 100644 --- a/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java +++ b/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java @@ -13,7 +13,11 @@ package org.apache.kafka.clients.producer.internals; import org.apache.kafka.clients.producer.Callback; -import org.apache.kafka.common.*; +import org.apache.kafka.common.Cluster; +import org.apache.kafka.common.MetricName; +import org.apache.kafka.common.Node; +import org.apache.kafka.common.PartitionInfo; +import org.apache.kafka.common.TopicPartition; import org.apache.kafka.common.metrics.Measurable; import org.apache.kafka.common.metrics.MetricConfig; import org.apache.kafka.common.metrics.Metrics; @@ -28,7 +32,15 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.nio.ByteBuffer; -import java.util.*; +import java.util.ArrayDeque; +import java.util.ArrayList; +import java.util.Collections; +import java.util.Deque; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; import java.util.concurrent.ConcurrentMap; import java.util.concurrent.atomic.AtomicInteger; diff --git a/core/src/main/scala/kafka/consumer/PartitionAssignor.scala b/core/src/main/scala/kafka/consumer/PartitionAssignor.scala index b0470da..c7a56ea 100644 --- a/core/src/main/scala/kafka/consumer/PartitionAssignor.scala +++ b/core/src/main/scala/kafka/consumer/PartitionAssignor.scala @@ -25,8 +25,8 @@ trait PartitionAssignor { /** * Assigns partitions to consumer instances in a group. - * @return An assignment map of partition to consumer thread. This only includes assignments for threads that belong - * to the given assignment-context's consumer. + * @return An assignment map of partition to this consumer group. This includes assignments for threads that belong + * to the same consumer group. */ def assign(ctx: AssignmentContext): Pool[String, collection.mutable.Map[TopicAndPartition, ConsumerThreadId]] diff --git a/core/src/main/scala/kafka/javaapi/consumer/ConsumerRebalanceListener.java b/core/src/main/scala/kafka/javaapi/consumer/ConsumerRebalanceListener.java index 02fb456..f9fa0c2 100644 --- a/core/src/main/scala/kafka/javaapi/consumer/ConsumerRebalanceListener.java +++ b/core/src/main/scala/kafka/javaapi/consumer/ConsumerRebalanceListener.java @@ -43,7 +43,8 @@ public interface ConsumerRebalanceListener { * This method is called after the new partition assignment is finished but before fetcher * threads start. A map of new global partition assignment is passed in as parameter. 
* @param consumerId The consumer Id string of the consumer invoking this callback. - * @param partitionAssignment The global partition assignment of this consumer group. + * @param partitionAssignment A Map[topic, Map[Partition, ConsumerThreadId]]. It is the global partition + * assignment of this consumer group. */ public void beforeStartingFetchers(String consumerId, Map> partitionAssignment); diff --git a/core/src/main/scala/kafka/tools/MirrorMaker.scala b/core/src/main/scala/kafka/tools/MirrorMaker.scala index 8d98bc2..564ca3e 100644 --- a/core/src/main/scala/kafka/tools/MirrorMaker.scala +++ b/core/src/main/scala/kafka/tools/MirrorMaker.scala @@ -300,15 +300,13 @@ object MirrorMaker extends Logging with KafkaMetricsGroup { trace("Sending message with value size %d".format(data.message().size)) val records = messageHandler.handle(data) records.foreach(producer.send) + maybeCommitOffsets() } } catch { case e: ConsumerTimeoutException => trace("Caught ConsumerTimeoutException, continue iteration.") } - if (System.currentTimeMillis() - lastOffsetCommitMs > offsetCommitIntervalMs) { - producer.flush() - commitOffsets() - } + maybeCommitOffsets() } } catch { case t: Throwable => @@ -324,6 +322,13 @@ object MirrorMaker extends Logging with KafkaMetricsGroup { } } + def maybeCommitOffsets() { + if (System.currentTimeMillis() - lastOffsetCommitMs > offsetCommitIntervalMs) { + producer.flush() + commitOffsets() + } + } + def shutdown() { try { info(threadName + " shutting down") -- 1.8.3.4 (Apple Git-47) From ae1cf4232ba6e2e95e5040fcb0860b9ab28c36ac Mon Sep 17 00:00:00 2001 From: jqin Date: Wed, 4 Mar 2015 15:04:11 -0800 Subject: [PATCH 3/5] Changed the exit behavior on send failure because close(0) is not ready yet. Will submit followup patch after KAFKA-1660 is checked in. --- core/src/main/scala/kafka/tools/MirrorMaker.scala | 12 +++++------- 1 file changed, 5 insertions(+), 7 deletions(-) diff --git a/core/src/main/scala/kafka/tools/MirrorMaker.scala b/core/src/main/scala/kafka/tools/MirrorMaker.scala index 564ca3e..b2f26c8 100644 --- a/core/src/main/scala/kafka/tools/MirrorMaker.scala +++ b/core/src/main/scala/kafka/tools/MirrorMaker.scala @@ -232,7 +232,7 @@ object MirrorMaker extends Logging with KafkaMetricsGroup { } assert(streams.size == numStreams) - Runtime.getRuntime.addShutdownHook(new Thread() { + Runtime.getRuntime.addShutdownHook(new Thread("MirrorMakerShutdownHook") { override def run() { cleanShutdown() } @@ -293,9 +293,9 @@ object MirrorMaker extends Logging with KafkaMetricsGroup { info("Starting mirror maker thread " + threadName) val iter = stream.iterator() try { - while (!shutdownFlag) { + while (!exitingOnSendFailure && !shutdownFlag) { try { - while (iter.hasNext()) { + while (!exitingOnSendFailure && iter.hasNext()) { val data = iter.next() trace("Sending message with value size %d".format(data.message().size)) val records = messageHandler.handle(data) @@ -386,10 +386,8 @@ object MirrorMaker extends Logging with KafkaMetricsGroup { // still could not be sent. super.onCompletion(metadata, exception) // If abort.on.send.failure is set, stop the mirror maker. Otherwise log skipped message and move on. - if (abortOnSendFailure) { + if (abortOnSendFailure) exitingOnSendFailure = true - cleanShutdown() - } warn("Not be able to send message to topic %s. 
Total number of skipped unacked messages is %d.".format( topic, numSkippedUnackedMessages.incrementAndGet())) } @@ -397,7 +395,7 @@ object MirrorMaker extends Logging with KafkaMetricsGroup { } - class InternalRebalanceListener(customRebalanceListener: Option[ConsumerRebalanceListener]) + private class InternalRebalanceListener(customRebalanceListener: Option[ConsumerRebalanceListener]) extends ConsumerRebalanceListener { override def beforeReleasingPartitions(partitionOwnership: java.util.Map[String, java.util.Set[java.lang.Integer]]) { -- 1.8.3.4 (Apple Git-47) From 5177462d62c0be45a161c12ebff97f1a88d8d2f4 Mon Sep 17 00:00:00 2001 From: jqin Date: Wed, 4 Mar 2015 15:41:43 -0800 Subject: [PATCH 4/5] Expanded imports from _ and * to full class path --- core/src/main/scala/kafka/tools/MirrorMaker.scala | 12 +++++++----- 1 file changed, 7 insertions(+), 5 deletions(-) diff --git a/core/src/main/scala/kafka/tools/MirrorMaker.scala b/core/src/main/scala/kafka/tools/MirrorMaker.scala index b2f26c8..5aad5a5 100644 --- a/core/src/main/scala/kafka/tools/MirrorMaker.scala +++ b/core/src/main/scala/kafka/tools/MirrorMaker.scala @@ -18,17 +18,19 @@ package kafka.tools import java.util.Properties -import java.util.concurrent._ +import java.util.concurrent.CountDownLatch import java.util.concurrent.atomic.{AtomicBoolean, AtomicInteger} -import com.yammer.metrics.core._ +import com.yammer.metrics.core.Gauge import joptsimple.OptionParser -import kafka.consumer._ + +import kafka.consumer.{ConsumerThreadId, ConsumerTimeoutException, KafkaStream, Blacklist, Whitelist, ConsumerConfig, ZookeeperConsumerConnector} import kafka.javaapi.consumer.ConsumerRebalanceListener import kafka.message.MessageAndMetadata import kafka.metrics.KafkaMetricsGroup -import kafka.serializer._ -import kafka.utils._ +import kafka.serializer.DefaultDecoder +import kafka.utils.{Utils, CommandLineUtils, Logging} + import org.apache.kafka.clients.producer.internals.ErrorLoggingCallback import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord, RecordMetadata} -- 1.8.3.4 (Apple Git-47) From e95183f3489ac067a490204889ccaacaff77dc23 Mon Sep 17 00:00:00 2001 From: jqin Date: Thu, 5 Mar 2015 20:13:41 -0800 Subject: [PATCH 5/5] Incorporated Joel's comments. 
--- .../scala/kafka/consumer/PartitionAssignor.scala | 26 +++++-------- .../consumer/ZookeeperConsumerConnector.scala | 43 ++++++++++++---------- core/src/main/scala/kafka/tools/MirrorMaker.scala | 35 ++++++++++-------- 3 files changed, 52 insertions(+), 52 deletions(-) diff --git a/core/src/main/scala/kafka/consumer/PartitionAssignor.scala b/core/src/main/scala/kafka/consumer/PartitionAssignor.scala index c7a56ea..5066739 100644 --- a/core/src/main/scala/kafka/consumer/PartitionAssignor.scala +++ b/core/src/main/scala/kafka/consumer/PartitionAssignor.scala @@ -70,9 +70,9 @@ class RoundRobinAssignor() extends PartitionAssignor with Logging { def assign(ctx: AssignmentContext) = { - val partitionOwnershipDecision = new Pool[String, collection.mutable.Map[TopicAndPartition, ConsumerThreadId]]( - Some( (topic: String) => new collection.mutable.HashMap[TopicAndPartition, ConsumerThreadId]) - ) + val valueFactory = (topic: String) => new collection.mutable.HashMap[TopicAndPartition, ConsumerThreadId] + val partitionAssignment = + new Pool[String, collection.mutable.Map[TopicAndPartition, ConsumerThreadId]](Some(valueFactory)) if (ctx.consumersForTopic.size > 0) { // check conditions (a) and (b) @@ -105,13 +105,10 @@ class RoundRobinAssignor() extends PartitionAssignor with Logging { allTopicPartitions.foreach(topicPartition => { val threadId = threadAssignor.next() - partitionOwnershipDecision.getAndMaybePut(threadId.consumer) += (topicPartition -> threadId) + partitionAssignment.getAndMaybePut(threadId.consumer) += (topicPartition -> threadId) }) } - // If this consumer owns no partitions, make sure an empty partition ownership assignment map is there - // for ZookeeperConsumerConnector to use. - partitionOwnershipDecision.getAndMaybePut(ctx.consumerId) - partitionOwnershipDecision + partitionAssignment } } @@ -127,9 +124,9 @@ class RoundRobinAssignor() extends PartitionAssignor with Logging { class RangeAssignor() extends PartitionAssignor with Logging { def assign(ctx: AssignmentContext) = { - val partitionOwnershipDecision = new Pool[String, collection.mutable.Map[TopicAndPartition, ConsumerThreadId]]( - Some( (topic: String) => new collection.mutable.HashMap[TopicAndPartition, ConsumerThreadId]) - ) + val valueFactory = (topic: String) => new collection.mutable.HashMap[TopicAndPartition, ConsumerThreadId] + val partitionAssignment = + new Pool[String, collection.mutable.Map[TopicAndPartition, ConsumerThreadId]](Some(valueFactory)) for (topic <- ctx.myTopicThreadIds.keySet) { val curConsumers = ctx.consumersForTopic(topic) val curPartitions: Seq[Int] = ctx.partitionsForTopic(topic) @@ -157,15 +154,12 @@ class RangeAssignor() extends PartitionAssignor with Logging { val partition = curPartitions(i) info(consumerThreadId + " attempting to claim partition " + partition) // record the partition ownership decision - partitionOwnershipDecision.getAndMaybePut(consumerThreadId.consumer) += + partitionAssignment.getAndMaybePut(consumerThreadId.consumer) += (TopicAndPartition(topic, partition) -> consumerThreadId) } } } } - // If this consumer owns no partitions, make sure an empty partition ownership assignment map is there - // for ZookeeperConsumerConnector to use. 
- partitionOwnershipDecision.getAndMaybePut(ctx.consumerId) - partitionOwnershipDecision + partitionAssignment } } diff --git a/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala b/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala index 9c14652..f5a6e05 100644 --- a/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala +++ b/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala @@ -682,13 +682,15 @@ private[kafka] class ZookeeperConsumerConnector(val config: ConsumerConfig, } releasePartitionOwnership(topicRegistry) val assignmentContext = new AssignmentContext(group, consumerIdString, config.excludeInternalTopics, zkClient) - val globalPartitionOwnershipDecision = partitionAssignor.assign(assignmentContext) - val partitionOwnershipDecision = globalPartitionOwnershipDecision.get(assignmentContext.consumerId) + val globalPartitionAssignment = partitionAssignor.assign(assignmentContext) + val partitionAssignment = Option(globalPartitionAssignment.get(assignmentContext.consumerId)).getOrElse( + new collection.mutable.HashMap[TopicAndPartition, ConsumerThreadId] + ) val currentTopicRegistry = new Pool[String, Pool[Int, PartitionTopicInfo]]( valueFactory = Some((topic: String) => new Pool[Int, PartitionTopicInfo])) // fetch current offsets for all topic-partitions - val topicPartitions = partitionOwnershipDecision.keySet.toSeq + val topicPartitions = partitionAssignment.keySet.toSeq val offsetFetchResponseOpt = fetchOffsets(topicPartitions) @@ -699,7 +701,7 @@ private[kafka] class ZookeeperConsumerConnector(val config: ConsumerConfig, topicPartitions.foreach(topicAndPartition => { val (topic, partition) = topicAndPartition.asTuple val offset = offsetFetchResponse.requestInfo(topicAndPartition).offset - val threadId = partitionOwnershipDecision(topicAndPartition) + val threadId = partitionAssignment(topicAndPartition) addPartitionTopicInfo(currentTopicRegistry, partition, topic, offset, threadId) }) @@ -707,10 +709,10 @@ private[kafka] class ZookeeperConsumerConnector(val config: ConsumerConfig, * move the partition ownership here, since that can be used to indicate a truly successful rebalancing attempt * A rebalancing attempt is completed successfully only after the fetchers have been started correctly */ - if(reflectPartitionOwnershipDecision(partitionOwnershipDecision)) { - allTopicsOwnedPartitionsCount = partitionOwnershipDecision.size + if(reflectPartitionOwnershipDecision(partitionAssignment)) { + allTopicsOwnedPartitionsCount = partitionAssignment.size - partitionOwnershipDecision.view.groupBy { case(topicPartition, consumerThreadId) => topicPartition.topic } + partitionAssignment.view.groupBy { case(topicPartition, consumerThreadId) => topicPartition.topic } .foreach { case (topic, partitionThreadPairs) => newGauge("OwnedPartitionsCount", new Gauge[Int] { @@ -723,18 +725,19 @@ private[kafka] class ZookeeperConsumerConnector(val config: ConsumerConfig, // Invoke beforeStartingFetchers callback if the consumerRebalanceListener is set. 
if (consumerRebalanceListener != null) { info("Calling beforeStartingFetchers() from rebalance listener.") - consumerRebalanceListener.beforeStartingFetchers( consumerIdString, { - val partitionAssigment = globalPartitionOwnershipDecision.values.flatten.groupBy[String]( - partitionOwnership => partitionOwnership._1.topic - ).map({ - case (topic, partitionOwnerShips) => - topic -> mapAsJavaMap(collection.mutable.Map(partitionOwnerShips.map({ - case (topicAndPartition, consumerThreadId) => - topicAndPartition.partition -> consumerThreadId - }).toSeq:_*)).asInstanceOf[java.util.Map[java.lang.Integer, ConsumerThreadId]] - }) - mapAsJavaMap(collection.mutable.Map(partitionAssigment.toSeq:_*)) + val partitionAssigmentMapForCallback = globalPartitionAssignment.values.flatten.groupBy[String]( + partitionOwnership => partitionOwnership._1.topic + ).map({ + case (topic, partitionOwnerShips) => + topic -> mapAsJavaMap(collection.mutable.Map(partitionOwnerShips.map({ + case (topicAndPartition, consumerThreadId) => + topicAndPartition.partition -> consumerThreadId + }).toSeq:_*)).asInstanceOf[java.util.Map[java.lang.Integer, ConsumerThreadId]] }) + consumerRebalanceListener.beforeStartingFetchers( + consumerIdString, + mapAsJavaMap(collection.mutable.Map(partitionAssigmentMapForCallback.toSeq:_*)) + ) } updateFetcher(cluster) true @@ -809,9 +812,9 @@ private[kafka] class ZookeeperConsumerConnector(val config: ConsumerConfig, } } - private def reflectPartitionOwnershipDecision(partitionOwnershipDecision: Map[TopicAndPartition, ConsumerThreadId]): Boolean = { + private def reflectPartitionOwnershipDecision(partitionAssignment: Map[TopicAndPartition, ConsumerThreadId]): Boolean = { var successfullyOwnedPartitions : List[(String, Int)] = Nil - val partitionOwnershipSuccessful = partitionOwnershipDecision.map { partitionOwner => + val partitionOwnershipSuccessful = partitionAssignment.map { partitionOwner => val topic = partitionOwner._1.topic val partition = partitionOwner._1.partition val consumerThreadId = partitionOwner._2 diff --git a/core/src/main/scala/kafka/tools/MirrorMaker.scala b/core/src/main/scala/kafka/tools/MirrorMaker.scala index 5aad5a5..dc773f1 100644 --- a/core/src/main/scala/kafka/tools/MirrorMaker.scala +++ b/core/src/main/scala/kafka/tools/MirrorMaker.scala @@ -38,9 +38,8 @@ import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord, RecordM * The mirror maker has the following architecture: * - There are N mirror maker thread shares one ZookeeperConsumerConnector and each owns a Kafka stream. * - All the mirror maker threads share a same producer. - * - Each mirror maker thread commits all the offsets periodically by first flush the messages previously - * sent to producer. - + * - Each mirror maker thread periodically flush the producer and then commits all offsets. + * * @note For mirror maker, the following settings are set by default to make sure there is no data loss: * 1. use new producer with following settings * acks=all @@ -176,9 +175,9 @@ object MirrorMaker extends Logging with KafkaMetricsGroup { val consumerConfigProps = Utils.loadProps(options.valueOf(consumerConfigOpt)) // Disable consumer auto commit because offset will be committed by offset commit thread. - consumerConfigProps.setProperty("auto.commit.enable", "false") + setDefaultProperty(consumerConfigProps, "auto.commit.enable", "false") // Set the consumer timeout so we will not block for low volume topics. 
- consumerConfigProps.setProperty("consumer.timeout.ms", "10000") + setDefaultProperty(consumerConfigProps,"consumer.timeout.ms", "10000") val consumerConfig = new ConsumerConfig(consumerConfigProps) connector = new ZookeeperConsumerConnector(consumerConfig) @@ -200,11 +199,11 @@ object MirrorMaker extends Logging with KafkaMetricsGroup { connector.setConsumerRebalanceListener(consumerRebalanceListener) // Create and initialize message handler - val customMessageHanlerClass = options.valueOf(messageHandlerOpt) + val customMessageHandlerClass = options.valueOf(messageHandlerOpt) val messageHandlerArgs = options.valueOf(messageHandlerArgsOpt) messageHandler = { - if (customMessageHanlerClass != null) - Utils.createObject[MirrorMakerMessageHandler](customMessageHanlerClass) + if (customMessageHandlerClass != null) + Utils.createObject[MirrorMakerMessageHandler](customMessageHandlerClass) else defaultMirrorMakerMessageHandler } @@ -213,9 +212,9 @@ object MirrorMaker extends Logging with KafkaMetricsGroup { // create producer val producerProps = Utils.loadProps(options.valueOf(producerConfigOpt)) // Defaults to no data loss settings. - producerProps.setProperty("retries", Int.MaxValue.toString) - producerProps.setProperty("block.on.buffer.full", "true") - producerProps.setProperty("acks", "all") + setDefaultProperty(producerProps, "retries", Int.MaxValue.toString) + setDefaultProperty(producerProps, "block.on.buffer.full", "true") + setDefaultProperty(producerProps, "acks", "all") producer = new MirrorMakerProducer(producerProps) // create consumer connector and Kafka streams @@ -250,7 +249,7 @@ object MirrorMaker extends Logging with KafkaMetricsGroup { } def commitOffsets() { - this.synchronized { + MirrorMaker.synchronized { if (!exitingOnSendFailure) { trace("Committing offsets.") connector.commitOffsets @@ -282,6 +281,10 @@ object MirrorMaker extends Logging with KafkaMetricsGroup { } } + private def setDefaultProperty(properties: Properties, propertyName: String, defaultValue: String) { + properties.setProperty(propertyName, Option(properties.getProperty(propertyName)).getOrElse(defaultValue)) + } + class MirrorMakerThread(stream: KafkaStream[Array[Byte], Array[Byte]], val threadId: Int) extends Thread with Logging with KafkaMetricsGroup { private val threadName = "mirrormaker-thread-" + threadId @@ -302,13 +305,13 @@ object MirrorMaker extends Logging with KafkaMetricsGroup { trace("Sending message with value size %d".format(data.message().size)) val records = messageHandler.handle(data) records.foreach(producer.send) - maybeCommitOffsets() + maybeFlushAndCommitOffsets() } } catch { case e: ConsumerTimeoutException => trace("Caught ConsumerTimeoutException, continue iteration.") } - maybeCommitOffsets() + maybeFlushAndCommitOffsets() } } catch { case t: Throwable => @@ -324,7 +327,7 @@ object MirrorMaker extends Logging with KafkaMetricsGroup { } } - def maybeCommitOffsets() { + def maybeFlushAndCommitOffsets() { if (System.currentTimeMillis() - lastOffsetCommitMs > offsetCommitIntervalMs) { producer.flush() commitOffsets() @@ -428,4 +431,4 @@ object MirrorMaker extends Logging with KafkaMetricsGroup { new ProducerRecord[Array[Byte], Array[Byte]](record.topic, record.key(), record.message())) } } -} \ No newline at end of file +} -- 1.8.3.4 (Apple Git-47)
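A note for reviewers on the config handling in PATCH 5/5: setDefaultProperty() only assigns a
value when the user has not already set one, so entries from the producer.config and
consumer.config files now take precedence over the mirror maker's no-data-loss defaults instead of
being silently overwritten. The following self-contained sketch shows that behaviour; the wrapper
object and the sample values are illustrative and not part of the patch.

    import java.util.Properties

    object SetDefaultPropertySketch {
      // Same shape as the private helper added to MirrorMaker.scala: keep an existing value,
      // otherwise fall back to the supplied default.
      def setDefaultProperty(properties: Properties, propertyName: String, defaultValue: String) {
        properties.setProperty(propertyName, Option(properties.getProperty(propertyName)).getOrElse(defaultValue))
      }

      def main(args: Array[String]) {
        val producerProps = new Properties()
        producerProps.setProperty("acks", "1")                              // user override from producer.config
        setDefaultProperty(producerProps, "acks", "all")                    // existing "1" is kept, default skipped
        setDefaultProperty(producerProps, "retries", Int.MaxValue.toString) // not set by user, default applied
        println(producerProps)                                              // e.g. {acks=1, retries=2147483647}
      }
    }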