From 9005c578ef1567f66777b0a573853636b055d8bf Mon Sep 17 00:00:00 2001 From: becketqin Date: Tue, 23 Sep 2014 18:09:14 -0700 Subject: [PATCH 01/13] mirror maker redesign; adding byte bounded blocking queue. --- .../consumer/ZookeeperConsumerConnector.scala | 16 +- core/src/main/scala/kafka/tools/MirrorMaker.scala | 292 +++++++++++++++++---- .../kafka/utils/ByteBoundedBlockingQueue.scala | 158 +++++++++++ 3 files changed, 413 insertions(+), 53 deletions(-) create mode 100644 core/src/main/scala/kafka/utils/ByteBoundedBlockingQueue.scala diff --git a/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala b/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala index fbc680f..6e1de37 100644 --- a/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala +++ b/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala @@ -285,17 +285,22 @@ private[kafka] class ZookeeperConsumerConnector(val config: ConsumerConfig, } } - def commitOffsets(isAutoCommit: Boolean = true) { + def commitOffsets(isAutoCommit: Boolean = true) = { + commitOffsets(isAutoCommit, null) + } + + def commitOffsets(isAutoCommit: Boolean, + topicPartitionOffsets: immutable.Map[TopicAndPartition, OffsetAndMetadata]) { var retriesRemaining = 1 + (if (isAutoCommit) config.offsetsCommitMaxRetries else 0) // no retries for commits from auto-commit var done = false while (!done) { val committed = offsetsChannelLock synchronized { // committed when we receive either no error codes or only MetadataTooLarge errors - val offsetsToCommit = immutable.Map(topicRegistry.flatMap { case (topic, partitionTopicInfos) => + val offsetsToCommit = if (topicPartitionOffsets == null) {immutable.Map(topicRegistry.flatMap { case (topic, partitionTopicInfos) => partitionTopicInfos.map { case (partition, info) => TopicAndPartition(info.topic, info.partitionId) -> OffsetAndMetadata(info.getConsumeOffset()) } - }.toSeq:_*) + }.toSeq:_*)} else topicPartitionOffsets if (offsetsToCommit.size > 0) { if (config.offsetsStorage == "zookeeper") { @@ -702,7 +707,6 @@ private[kafka] class ZookeeperConsumerConnector(val config: ConsumerConfig, case Some(f) => f.stopConnections clearFetcherQueues(allPartitionInfos, cluster, queuesToBeCleared, messageStreams) - info("Committing all offsets after clearing the fetcher queues") /** * here, we need to commit offsets before stopping the consumer from returning any more messages * from the current data chunk. 
Since partition ownership is not yet released, this commit offsets @@ -711,8 +715,10 @@ private[kafka] class ZookeeperConsumerConnector(val config: ConsumerConfig, * by the consumer, there will be no more messages returned by this iterator until the rebalancing finishes * successfully and the fetchers restart to fetch more data chunks **/ - if (config.autoCommitEnable) + if (config.autoCommitEnable) { + info("Committing all offsets after clearing the fetcher queues") commitOffsets() + } case None => } } diff --git a/core/src/main/scala/kafka/tools/MirrorMaker.scala b/core/src/main/scala/kafka/tools/MirrorMaker.scala index b8698ee..0d651e1 100644 --- a/core/src/main/scala/kafka/tools/MirrorMaker.scala +++ b/core/src/main/scala/kafka/tools/MirrorMaker.scala @@ -17,28 +17,39 @@ package kafka.tools -import kafka.utils.{SystemTime, Utils, CommandLineUtils, Logging} +import java.lang.Thread.UncaughtExceptionHandler + +import com.yammer.metrics.core.Gauge +import kafka.common.{TopicAndPartition, OffsetAndMetadata} +import kafka.utils._ import kafka.consumer._ import kafka.serializer._ -import kafka.producer.{OldProducer, NewShinyProducer, BaseProducer} +import kafka.producer.{OldProducer, NewShinyProducer} import kafka.metrics.KafkaMetricsGroup +import org.apache.kafka.clients.producer.internals.ErrorLoggingCallback -import org.apache.kafka.clients.producer.ProducerRecord +import org.apache.kafka.clients.producer.{RecordMetadata, ProducerRecord} import scala.collection.JavaConversions._ import joptsimple.OptionParser -import java.util.Random -import java.util.concurrent.atomic.AtomicInteger -import java.util.concurrent.{TimeUnit, BlockingQueue, ArrayBlockingQueue, CountDownLatch} +import java.util.{Properties} +import java.util.concurrent.atomic.{AtomicBoolean} +import java.util.concurrent._ object MirrorMaker extends Logging { private var connectors: Seq[ZookeeperConsumerConnector] = null private var consumerThreads: Seq[ConsumerThread] = null private var producerThreads: Seq[ProducerThread] = null + private var offsetCommitThread: OffsetCommitThread = null + private var isCleanShutdown: AtomicBoolean = new AtomicBoolean(false) + + private val valueFactory = (k: TopicAndPartition) => new Pool[Int, Long] + private val topicPartitionOffsetMap: Pool[TopicAndPartition, Pool[Int, Long]] = + new Pool[TopicAndPartition, Pool[Int,Long]](Some(valueFactory)) - private val shutdownMessage : ProducerRecord = new ProducerRecord("shutdown", "shutdown".getBytes) + private val shutdownMessage : MirrorMakerRecord = new MirrorMakerRecord("shutdown", 0, 0, null, "shutdown".getBytes) def main(args: Array[String]) { @@ -82,6 +93,13 @@ object MirrorMaker extends Logging { .ofType(classOf[java.lang.Integer]) .defaultsTo(10000) + val bufferByteSizeOpt = parser.accepts("queue.byte.size", + "Number of bytes that are buffered between the consumer and producer") + .withRequiredArg() + .describedAs("Queue size in terms of number of messages") + .ofType(classOf[java.lang.Integer]) + .defaultsTo(100000000) + val whitelistOpt = parser.accepts("whitelist", "Whitelist of topics to mirror.") .withRequiredArg() @@ -94,6 +112,12 @@ object MirrorMaker extends Logging { .describedAs("Java regex (String)") .ofType(classOf[String]) + val offsetCommitIntervalMsOpt = parser.accepts("offsetCommitIntervalMs", + "interval to commit offsets") + .withRequiredArg() + .describedAs("offset commit interval in millisecond") + .ofType(classOf[java.lang.Integer]) + val helpOpt = parser.accepts("help", "Print this message.") if(args.length == 0) @@ 
-115,6 +139,8 @@ object MirrorMaker extends Logging { val numProducers = options.valueOf(numProducersOpt).intValue() val numStreams = options.valueOf(numStreamsOpt).intValue() val bufferSize = options.valueOf(bufferSizeOpt).intValue() + val bufferByteSize = options.valueOf(bufferByteSizeOpt).intValue() + val offsetCommitIntervalMs = options.valueOf(offsetCommitIntervalMsOpt).intValue() // create consumer streams connectors = options.valuesOf(consumerConfigOpt).toList @@ -123,7 +149,7 @@ object MirrorMaker extends Logging { val numConsumers = connectors.size * numStreams // create a data channel btw the consumers and the producers - val mirrorDataChannel = new DataChannel(bufferSize, numConsumers, numProducers) + val mirrorDataChannel = new DataChannel(bufferSize, bufferByteSize, numConsumers, numProducers) // create producer threads val useNewProducer = options.has(useNewProducerOpt) @@ -133,9 +159,9 @@ object MirrorMaker extends Logging { producerProps.setProperty("client.id", clientId + "-" + i) val producer = if (useNewProducer) - new NewShinyProducer(producerProps) + new MirrorMakerNewProducer(producerProps) else - new OldProducer(producerProps) + new MirrorMakerOldProducer(producerProps) new ProducerThread(mirrorDataChannel, producer, i) }) @@ -164,6 +190,11 @@ object MirrorMaker extends Logging { consumerThreads.foreach(_.start) producerThreads.foreach(_.start) + // create offset commit thread + if (useNewProducer) { + offsetCommitThread = new OffsetCommitThread(offsetCommitIntervalMs) + offsetCommitThread.start() + } // we wait on producer's shutdown latch instead of consumers // since the consumer threads can hit a timeout/other exception; @@ -173,22 +204,31 @@ object MirrorMaker extends Logging { } def cleanShutdown() { - if (connectors != null) connectors.foreach(_.shutdown) - if (consumerThreads != null) consumerThreads.foreach(_.awaitShutdown) - if (producerThreads != null) { - producerThreads.foreach(_.shutdown) - producerThreads.foreach(_.awaitShutdown) + if (isCleanShutdown.compareAndSet(false, true)) { + info("Start clean shutdown.") + info("Shutting down producer threads.") + if (producerThreads != null) { + producerThreads.foreach(_.shutdown) + producerThreads.foreach(_.awaitShutdown) + } + info("Shutting down offset commit thread.") + if (offsetCommitThread != null) { + offsetCommitThread.shutdown + offsetCommitThread.awaitShutdown + } + info("Shutting down consumer threads.") + if (connectors != null) connectors.foreach(_.shutdown) + if (consumerThreads != null) consumerThreads.foreach(_.awaitShutdown) + info("Kafka mirror maker shutdown successfully") } - info("Kafka mirror maker shutdown successfully") } - class DataChannel(capacity: Int, numProducers: Int, numConsumers: Int) extends KafkaMetricsGroup { + class DataChannel(capacity: Int, byteCapacity: Int, numConsumers: Int, numProducers: Int) extends KafkaMetricsGroup { - val queues = new Array[BlockingQueue[ProducerRecord]](numConsumers) - for (i <- 0 until numConsumers) - queues(i) = new ArrayBlockingQueue[ProducerRecord](capacity) - - private val counter = new AtomicInteger(new Random().nextInt()) + val queues = new Array[ByteBoundedBlockingQueue[MirrorMakerRecord]](numProducers) + val sizeFunction = (record: MirrorMakerRecord) => record.size + for (i <- 0 until numProducers) + queues(i) = new ByteBoundedBlockingQueue[MirrorMakerRecord](capacity, byteCapacity, Some(sizeFunction)) // We use a single meter for aggregated wait percentage for the data channel. 
// Since meter is calculated as total_recorded_value / time_window and @@ -197,16 +237,17 @@ object MirrorMaker extends Logging { private val waitPut = newMeter("MirrorMaker-DataChannel-WaitOnPut", "percent", TimeUnit.NANOSECONDS) private val waitTake = newMeter("MirrorMaker-DataChannel-WaitOnTake", "percent", TimeUnit.NANOSECONDS) private val channelSizeHist = newHistogram("MirrorMaker-DataChannel-Size") - - def put(record: ProducerRecord) { - // If the key of the message is empty, use round-robin to select the queue - // Otherwise use the queue based on the key value so that same key-ed messages go to the same queue - val queueId = - if(record.key() != null) { - Utils.abs(java.util.Arrays.hashCode(record.key())) % numConsumers - } else { - Utils.abs(counter.getAndIncrement()) % numConsumers - } + private val channelByteSizeHist = newHistogram("MirrorMaker-DataChannel-Byte-Size") + + def put(record: MirrorMakerRecord, qid: Int = -1) { + // If queue id is specified (only when producer thread shuttdown is called), use qid + // otherwise use hash of topic+partition to decide which queue to put the record in. + val queueId = { + if (qid == -1) + Utils.abs(java.util.Arrays.hashCode((record.sourceTopic + record.sourcePartition).toCharArray)) % numProducers + else + qid + } val queue = queues(queueId) var putSucceed = false @@ -215,18 +256,20 @@ object MirrorMaker extends Logging { putSucceed = queue.offer(record, 500, TimeUnit.MILLISECONDS) waitPut.mark((SystemTime.nanoseconds - startPutTime) / numProducers) } - channelSizeHist.update(queue.size) + channelSizeHist.update(queues.map(q => q.size()).sum) + channelByteSizeHist.update(queues.map(q => q.byteSize()).sum) } - def take(queueId: Int): ProducerRecord = { + def take(queueId: Int): MirrorMakerRecord = { val queue = queues(queueId) - var data: ProducerRecord = null + var data: MirrorMakerRecord = null while (data == null) { val startTakeTime = SystemTime.nanoseconds data = queue.poll(500, TimeUnit.MILLISECONDS) waitTake.mark((SystemTime.nanoseconds - startTakeTime) / numConsumers) } - channelSizeHist.update(queue.size) + channelSizeHist.update(queues.map(q => q.size()).sum) + channelByteSizeHist.update(queues.map(q => q.byteSize()).sum) data } } @@ -246,15 +289,21 @@ object MirrorMaker extends Logging { info("Starting mirror maker consumer thread " + threadName) try { for (msgAndMetadata <- stream) { - val data = new ProducerRecord(msgAndMetadata.topic, msgAndMetadata.key, msgAndMetadata.message) + val data = new MirrorMakerRecord(msgAndMetadata.topic, + msgAndMetadata.partition, + msgAndMetadata.offset, + msgAndMetadata.key, + msgAndMetadata.message) mirrorDataChannel.put(data) } } catch { - case e: Throwable => - fatal("Stream unexpectedly exited.", e) + case t: Throwable => + fatal("Consumer thread %s unexpectedly exited.".format(this.getName), t) } finally { shutdownLatch.countDown() info("Consumer thread stopped") + if (isCleanShutdown.compareAndSet(false, true)) + System.exit(-1) } } @@ -269,7 +318,7 @@ object MirrorMaker extends Logging { } class ProducerThread (val dataChannel: DataChannel, - val producer: BaseProducer, + val producer: MirrorMakerBaseProducer, val threadId: Int) extends Thread with Logging with KafkaMetricsGroup { private val threadName = "mirrormaker-producer-" + threadId private val shutdownComplete: CountDownLatch = new CountDownLatch(1) @@ -277,32 +326,37 @@ object MirrorMaker extends Logging { setName(threadName) - override def run { + override def run() { info("Starting mirror maker producer thread " + 
threadName) try { while (true) { - val data: ProducerRecord = dataChannel.take(threadId) - trace("Sending message with value size %d".format(data.value().size)) + val data: MirrorMakerRecord = dataChannel.take(threadId) + trace("Sending message with value size %d".format(data.value.size)) if(data eq shutdownMessage) { info("Received shutdown message") return } - producer.send(data.topic(), data.key(), data.value()) + producer.send(new TopicAndPartition(data.sourceTopic, data.sourcePartition), + data.sourceOffset, + data.key, + data.value) } } catch { - case t: Throwable => { - fatal("Producer thread failure due to ", t) - } + case t: Throwable => + fatal("Producer thread %s failure due to ".format(this.getName), t) } finally { shutdownComplete.countDown info("Producer thread stopped") + // if it exits accidentally, stop the entire mirror maker + if (isCleanShutdown.compareAndSet(false, true)) + System.exit(-1) } } def shutdown { try { info("Producer thread " + threadName + " shutting down") - dataChannel.put(shutdownMessage) + dataChannel.put(shutdownMessage, threadId) } catch { case ie: InterruptedException => { @@ -323,5 +377,147 @@ object MirrorMaker extends Logging { } } } + + class OffsetCommitThread(commitIntervalMs: Int) extends Thread with Logging with KafkaMetricsGroup { + private val threadName = "mirrormaker-offset-commit-thread" + private val shutdownComplete: CountDownLatch = new CountDownLatch(1) + this.logIdent = "[%s]".format(threadName) + var shutdownFlag: Boolean = false + var commitCounter: Int = 0 + + this.setName(threadName) + + newGauge("MirrorMaker-Offset-Commit-Counter", + new Gauge[Int] { + def value = commitCounter + }) + + /** + * The offset commit thread is using the first ZookeeperConsumerConnector in the connectors. It is based on the + * assumption that all the consumer instances in mirror maker are in the same consumer group. The impact of this + * approach is that: + * 1. If offsets are commited to ZooKeeper, the checkpointedZkOffsets in the first ZookeeperConsumerConnector + * could be tainted by partitions that it does not own. But this problem exists now anyway since we are not + * cleaning checkpointedZKOffsets on consumer rebalance. + * 2. The offset commit could potentially slow down the first ZookeeperConsumerConnector if the offset commit + * is too big (which is not likely to happen). + * Ideally, committing offsets of a partition should be done by the ZookeeperConsumerConnector instance who is + * consuming from the partition. But implementation could be tricky when consumer rebalance occurs. It needs either + * the offset map in mirror maker to be updated on consumer rebalance or putting separate offset maps in each + * ZookeeperConsumerConnector. It seems not worth doing that. 
+ */ + override def run() { + info("Starting mirror maker offset commit thread") + try { + while (shutdownFlag == false) { + Thread.sleep(commitIntervalMs) + commitOffset + } + } catch { + case t: Throwable => fatal("offset commit thread exits due to", t) + } finally { + swallow(commitOffset) + shutdownComplete.countDown() + info("offset commit thread exited") + if (isCleanShutdown.compareAndSet(false, true)) + System.exit(-1) + } + } + + private def commitOffset { + val offsetsToCommit = collection.immutable.Map(topicPartitionOffsetMap.map { + case (topicPartition, partitionOffsetMap) => + topicPartition -> OffsetAndMetadata(getOffsetToCommit(partitionOffsetMap), null) + }.toSeq: _*) + trace("committing offset: %s".format(offsetsToCommit)) + val connector = connectors.headOption match { + case None => + warn("No consumer available to commit offset.") + case Some(connector) => + connector.commitOffsets(false, offsetsToCommit) + commitCounter += 1 + } + } + + private def getOffsetToCommit(offsetsMap: Pool[Int, Long]): Long = { + val offsets = offsetsMap.map(_._2).toSeq.sorted + val iter = offsets.iterator + var offsetToCommit = iter.next() + while (iter.hasNext && offsetToCommit + 1 == iter.next()) + offsetToCommit += 1 + offsetToCommit + 1 + } + + def shutdown { + shutdownFlag = true + } + + def awaitShutdown { + try { + shutdownComplete.await + info("Offset commit thread shutdown complete") + } catch { + case ie: InterruptedException => { + warn("Shutdown of the offset commit thread interrupted") + } + } + } + } + + private[kafka] trait MirrorMakerBaseProducer { + def send(topicPartition: TopicAndPartition, offset: Long, key: Array[Byte], value: Array[Byte]) + def close() + } + + private class MirrorMakerNewProducer (val producerProps: Properties) + extends NewShinyProducer(producerProps) with MirrorMakerBaseProducer { + + override def send(topicPartition: TopicAndPartition, offset: Long, key: Array[Byte], value: Array[Byte]) { + val record = new ProducerRecord(topicPartition.topic, key, value) + if(sync) { + topicPartitionOffsetMap.getAndMaybePut(topicPartition).put(this.producer.send(record).get().partition(), offset) + } else { + this.producer.send(record, + new MirrorMakerProducerCallback(topicPartition, offset, key, value)) + } + } + } + + private class MirrorMakerOldProducer (val producerProps: Properties) + extends OldProducer(producerProps) with MirrorMakerBaseProducer { + + override def send(topicPartition: TopicAndPartition, offset: Long, key: Array[Byte], value: Array[Byte]) { + super.send(topicPartition.topic, key, value) + } + + override def close() { + super.close() + } + } + + private class MirrorMakerProducerCallback (val topicPartition: TopicAndPartition, + val offset: Long, + val key: Array[Byte], + val value: Array[Byte]) + extends ErrorLoggingCallback(topicPartition.topic, key, value, false) { + + override def onCompletion(metadata: RecordMetadata, exception: Exception) { + if (exception != null) { + super.onCompletion(metadata, exception) + } else { + info("updating offset:[%s] -> %d".format(topicPartition, offset)) + topicPartitionOffsetMap.getAndMaybePut(topicPartition).put(metadata.partition(), offset) + } + } + } + + private[kafka] class MirrorMakerRecord (val sourceTopic: String, + val sourcePartition: Int, + val sourceOffset: Long, + val key: Array[Byte], + val value: Array[Byte]) { + def size = value.length + {if (key == null) 0 else key.length} + } + } diff --git a/core/src/main/scala/kafka/utils/ByteBoundedBlockingQueue.scala 
b/core/src/main/scala/kafka/utils/ByteBoundedBlockingQueue.scala new file mode 100644 index 0000000..2d2a00e --- /dev/null +++ b/core/src/main/scala/kafka/utils/ByteBoundedBlockingQueue.scala @@ -0,0 +1,158 @@ +package kafka.utils + +import java.util.concurrent.atomic.AtomicInteger +import java.util.concurrent.{TimeUnit, LinkedBlockingQueue} + +import scala.xml.Null + +/** + * A blocking queue that have size limits on both number of elements and number of bytes. + */ +class ByteBoundedBlockingQueue[E] (val queueSize: Int, val queueByteSize: Int, sizeFunction: Option[(E) => Int]) + extends Iterable[E] { + private val queue = new LinkedBlockingQueue[E] (queueSize) + private var currentByteSize = new AtomicInteger() + private val putLock = new Object + + /** + * Put an element to the tail of the queue. Wait for certain amount of time if queue is full. + * The size check is only checking if the bytes in the queue reaches threshold or not. That means + * as long as current queue size is under the threshold, the element could be put into the queue, even + * after that the queue byte size will exceed the threshold. Strict queue size check is easy but could potentially + * lead to poor efficiency when the queue is almost full. + * @param e the element to put into the queue + * @param timeout the amount of time to wait before the expire the operation + * @param unit the time unit of timeout parameter, default to millisecond + * @return true if the element is put into queue, false if it is not + * @throws NullPointerException if element is null + */ + def offer(e: E, timeout: Long, unit: TimeUnit = TimeUnit.MICROSECONDS): Boolean = { + if (e == null) throw new NullPointerException("Putting null element into queue.") + val startTime = SystemTime.nanoseconds + val expireTime = startTime + unit.toNanos(timeout) + var success = false + try { + putLock synchronized { + while (currentByteSize.get() >= queueByteSize && SystemTime.nanoseconds < expireTime) + putLock.wait(expireTime - SystemTime.nanoseconds) + // only proceed if queue has capacity and not timeout + if (currentByteSize.get() < queueByteSize && SystemTime.nanoseconds < expireTime) { + success = queue.offer(e, expireTime - SystemTime.nanoseconds, TimeUnit.NANOSECONDS) + // only increase queue byte size iff put succeeds + if (success == true) + currentByteSize.addAndGet(sizeFunction.get(e)) + // wake up another thread in case multiple threads are waiting + if (currentByteSize.get() < queueByteSize) + putLock.notify() + } + } + } catch { + case ie: InterruptedException => + } + success + } + + /** + * Put an element to the tail of the queue, block if queue is full + * @param e The element to put into queue + * @return true on succeed, false on failure + * @throws NullPointerException if element is null + */ + def offer(e: E): Boolean = { + if (e == null) throw new NullPointerException("Putting null element into queue.") + var success = false + try { + putLock synchronized { + while (currentByteSize.get() >= queueByteSize) + putLock.wait() + success = queue.offer(e) + if (success == true) + currentByteSize.addAndGet(sizeFunction.get(e)) + // wake up another thread in case multiple threads are waiting + if (currentByteSize.get() < queueByteSize) + putLock.notify() + } + } catch { + case ie: InterruptedException => + } + success + } + + /** + * Get an element from the head of queue. Wait for some time if the queue is empty. + * The poll method almost does not block on offer. 
+ * @param timeout the amount of time to wait if the queue is empty + * @param unit the unit type + * @return the first element in the queue, null if queue is empty + */ + def poll(timeout: Long, unit: TimeUnit): E = { + var e = queue.poll(timeout, unit) + // only wake up waiting threads if the queue size drop under queueByteSize + if (e != null && + currentByteSize.getAndAdd(-sizeFunction.get(e)) > queueByteSize && + currentByteSize.get() < queueByteSize) + putLock.notify() + e + } + + /** + * Get an element from the head of the queue, block if the queue is empty + * @return the first element in the queue, null if queue is empty + */ + def poll(): E = { + var e = queue.poll() + // only wake up waiting threads if the queue size drop under queueByteSize + if (currentByteSize.getAndAdd(-sizeFunction.get(e)) > queueByteSize && + currentByteSize.get() < queueByteSize) + putLock.notify() + e + } + + /** + * Iterator for the queue + * @return Iterator for the queue + */ + override def iterator() = new Iterator[E] () { + private val iter = queue.iterator() + private var curr: E = null.asInstanceOf[E] + + def hasNext: Boolean = iter.hasNext + + def next: E = { + curr = iter.next() + curr + } + + def remove { + if (curr == null) + throw new IllegalArgumentException + iter.remove() + if (currentByteSize.addAndGet(-sizeFunction.get(curr)) < queueByteSize) + putLock.notify() + } + } + + /** + * get the number of elements in the queue + * @return number of elements in the queue + */ + override def size() = queue.size() + + /** + * get the current byte size in the queue + * @return current queue size in bytes + */ + def byteSize() = currentByteSize.get() + + /** + * get the number of unused slots in the queue + * @return the number of unused slots in the queue + */ + def remainingSize = queue.remainingCapacity() + + /** + * get the remaining bytes capacity of the queue + * @return the remaining bytes capacity of the queue + */ + def remainingByteSize = math.max(0, queueByteSize - currentByteSize.get()) +} -- 1.8.3.4 (Apple Git-47) From 73a30a7bde3d395c25c8120950fb0f6e00a32cd3 Mon Sep 17 00:00:00 2001 From: Jiangjie Qin Date: Sun, 5 Oct 2014 00:48:09 -0700 Subject: [PATCH 02/13] Addressed Guozhang's comments. 
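As a point of reference for the ByteBoundedBlockingQueue introduced above, the following is a minimal usage sketch based only on the constructor and the offer/poll signatures shown in the patch; the Array[Byte] element type and the 1000-element / 1 MB bounds are illustrative values, not taken from the patch.

    import java.util.concurrent.TimeUnit
    import kafka.utils.ByteBoundedBlockingQueue

    object ByteBoundedQueueExample {
      def main(args: Array[String]) {
        // Bound the queue by element count (1000) and by payload bytes (1 MB);
        // the size function tells the queue how many bytes each element contributes.
        val sizeFunction = (payload: Array[Byte]) => payload.length
        val queue = new ByteBoundedBlockingQueue[Array[Byte]](1000, 1000000, Some(sizeFunction))

        // offer() returns false if the element cannot be enqueued before the timeout expires.
        val accepted = queue.offer(new Array[Byte](512), 500, TimeUnit.MILLISECONDS)

        // poll() returns null if the queue stays empty for the whole timeout, so callers
        // (like the mirror maker producer threads) loop until an element arrives.
        val element = queue.poll(500, TimeUnit.MILLISECONDS)

        println("accepted=%b, elements=%d, bytes=%d".format(accepted, queue.size(), queue.byteSize()))
      }
    }

The DataChannel in the patch follows the same pattern: the consumer threads offer with a 500 ms timeout, the producer threads poll with a 500 ms timeout, and the record size is used as the byte-accounting function.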
--- .../main/scala/kafka/server/ReplicaManager.scala | 8 +- core/src/main/scala/kafka/tools/MirrorMaker.scala | 119 ++++++++++++--------- .../kafka/utils/ByteBoundedBlockingQueue.scala | 17 +++ 3 files changed, 90 insertions(+), 54 deletions(-) diff --git a/core/src/main/scala/kafka/server/ReplicaManager.scala b/core/src/main/scala/kafka/server/ReplicaManager.scala index 78b7514..aa6fa31 100644 --- a/core/src/main/scala/kafka/server/ReplicaManager.scala +++ b/core/src/main/scala/kafka/server/ReplicaManager.scala @@ -483,9 +483,7 @@ class ReplicaManager(val config: KafkaConfig, val newLeaderBrokerId = leaderIsrAndControllerEpoch.leaderAndIsr.leader leaders.find(_.id == newLeaderBrokerId) match { case Some(leaderBroker) => - if (partition.makeFollower(controllerId, partitionStateInfo, correlationId, offsetManager)) - partitionsToMakeFollower += partition - else + if (partition.makeFollower(controllerId, partitionStateInfo, correlationId, offsetManager) == false) stateChangeLogger.info(("Broker %d skipped the become-follower state change after marking its partition as follower with correlation id %d from " + "controller %d epoch %d for partition [%s,%d] since the new leader %d is the same as the old leader") .format(localBrokerId, correlationId, controllerId, leaderIsrAndControllerEpoch.controllerEpoch, @@ -498,6 +496,10 @@ class ReplicaManager(val config: KafkaConfig, .format(localBrokerId, correlationId, controllerId, leaderIsrAndControllerEpoch.controllerEpoch, partition.topic, partition.partitionId, newLeaderBrokerId)) } + // Partition needs to be added to the partitionsToMakeFollower even if leader is not up yet. Thus the + // highWatermark can be read before it's overwritten by new checkpoints. Otherwise we could have a + // data loss issue. See KAFKA-1647. + partitionsToMakeFollower += partition } replicaFetcherManager.removeFetcherForPartitions(partitionsToMakeFollower.map(new TopicAndPartition(_))) diff --git a/core/src/main/scala/kafka/tools/MirrorMaker.scala b/core/src/main/scala/kafka/tools/MirrorMaker.scala index 0d651e1..71791ee 100644 --- a/core/src/main/scala/kafka/tools/MirrorMaker.scala +++ b/core/src/main/scala/kafka/tools/MirrorMaker.scala @@ -17,9 +17,7 @@ package kafka.tools -import java.lang.Thread.UncaughtExceptionHandler - -import com.yammer.metrics.core.Gauge +import com.yammer.metrics.core._ import kafka.common.{TopicAndPartition, OffsetAndMetadata} import kafka.utils._ import kafka.consumer._ @@ -27,19 +25,18 @@ import kafka.serializer._ import kafka.producer.{OldProducer, NewShinyProducer} import kafka.metrics.KafkaMetricsGroup import org.apache.kafka.clients.producer.internals.ErrorLoggingCallback - import org.apache.kafka.clients.producer.{RecordMetadata, ProducerRecord} import scala.collection.JavaConversions._ import joptsimple.OptionParser -import java.util.{Properties} -import java.util.concurrent.atomic.{AtomicBoolean} +import java.util.Properties +import java.util.concurrent.atomic.AtomicBoolean import java.util.concurrent._ object MirrorMaker extends Logging { - private var connectors: Seq[ZookeeperConsumerConnector] = null + private var connector: ZookeeperConsumerConnector = null private var consumerThreads: Seq[ConsumerThread] = null private var producerThreads: Seq[ProducerThread] = null private var offsetCommitThread: OffsetCommitThread = null @@ -57,8 +54,7 @@ object MirrorMaker extends Logging { val parser = new OptionParser val consumerConfigOpt = parser.accepts("consumer.config", - "Consumer config to consume from a source cluster. 
" + - "You may specify multiple of these.") + "Consumer config to consume from a source cluster.") .withRequiredArg() .describedAs("config file") .ofType(classOf[String]) @@ -94,9 +90,9 @@ object MirrorMaker extends Logging { .defaultsTo(10000) val bufferByteSizeOpt = parser.accepts("queue.byte.size", - "Number of bytes that are buffered between the consumer and producer") + "Maximum bytes that can be buffered in each data channel queue") .withRequiredArg() - .describedAs("Queue size in terms of number of messages") + .describedAs("Data channel queue size in terms of number of bytes") .ofType(classOf[java.lang.Integer]) .defaultsTo(100000000) @@ -107,16 +103,16 @@ object MirrorMaker extends Logging { .ofType(classOf[String]) val blacklistOpt = parser.accepts("blacklist", - "Blacklist of topics to mirror.") - .withRequiredArg() - .describedAs("Java regex (String)") - .ofType(classOf[String]) + "Blacklist of topics to mirror.") + .withRequiredArg() + .describedAs("Java regex (String)") + .ofType(classOf[String]) val offsetCommitIntervalMsOpt = parser.accepts("offsetCommitIntervalMs", - "interval to commit offsets") - .withRequiredArg() - .describedAs("offset commit interval in millisecond") - .ofType(classOf[java.lang.Integer]) + "Offset commit interval in ms") + .withRequiredArg() + .describedAs("offset commit interval in millisecond") + .ofType(classOf[java.lang.Integer]) val helpOpt = parser.accepts("help", "Print this message.") @@ -143,13 +139,13 @@ object MirrorMaker extends Logging { val offsetCommitIntervalMs = options.valueOf(offsetCommitIntervalMsOpt).intValue() // create consumer streams - connectors = options.valuesOf(consumerConfigOpt).toList - .map(cfg => new ConsumerConfig(Utils.loadProps(cfg))) - .map(new ZookeeperConsumerConnector(_)) - val numConsumers = connectors.size * numStreams + connector = { + val consumerConfig = new ConsumerConfig(Utils.loadProps(options.valuesOf(consumerConfigOpt).head)) + new ZookeeperConsumerConnector(consumerConfig) + } // create a data channel btw the consumers and the producers - val mirrorDataChannel = new DataChannel(bufferSize, bufferByteSize, numConsumers, numProducers) + val mirrorDataChannel = new DataChannel(bufferSize, bufferByteSize, numStreams, numProducers) // create producer threads val useNewProducer = options.has(useNewProducerOpt) @@ -173,14 +169,14 @@ object MirrorMaker extends Logging { var streams: Seq[KafkaStream[Array[Byte], Array[Byte]]] = Nil try { - streams = connectors.map(_.createMessageStreamsByFilter(filterSpec, numStreams, new DefaultDecoder(), new DefaultDecoder())).flatten + streams = connector.createMessageStreamsByFilter(filterSpec, numStreams, new DefaultDecoder(), new DefaultDecoder()) } catch { case t: Throwable => fatal("Unable to create stream - shutting down mirror maker.") - connectors.foreach(_.shutdown) + connector.shutdown } consumerThreads = streams.zipWithIndex.map(streamAndIndex => new ConsumerThread(streamAndIndex._1, mirrorDataChannel, streamAndIndex._2)) - assert(consumerThreads.size == numConsumers) + assert(consumerThreads.size == numStreams) Runtime.getRuntime.addShutdownHook(new Thread() { override def run() { @@ -190,8 +186,15 @@ object MirrorMaker extends Logging { consumerThreads.foreach(_.start) producerThreads.foreach(_.start) + // create offset commit thread if (useNewProducer) { + /** + * The offset commit thread commits offsets based on the new producer's callback. 
Using offset commit thread + * and new producer guarantees there is no data loss when mirror maker is uncleanly shutdown. For old producer, + * the offset commit will be based on the consumer's offsets. When unclean shutdown occurs, it is possible that + * all the messages in data channel will be lost. + */ offsetCommitThread = new OffsetCommitThread(offsetCommitIntervalMs) offsetCommitThread.start() } @@ -206,19 +209,25 @@ object MirrorMaker extends Logging { def cleanShutdown() { if (isCleanShutdown.compareAndSet(false, true)) { info("Start clean shutdown.") + // Consumer threads will exit when isCleanShutdown is set. + info("Waiting consumer threads to shutdown.") + if (consumerThreads != null) consumerThreads.foreach(_.awaitShutdown) + // After consumer threads exit, shutdown producer. info("Shutting down producer threads.") if (producerThreads != null) { producerThreads.foreach(_.shutdown) producerThreads.foreach(_.awaitShutdown) } + // offset commit thread should only be shutdown after producer threads are shutdown, so we don't lose offsets. info("Shutting down offset commit thread.") if (offsetCommitThread != null) { offsetCommitThread.shutdown offsetCommitThread.awaitShutdown } - info("Shutting down consumer threads.") - if (connectors != null) connectors.foreach(_.shutdown) - if (consumerThreads != null) consumerThreads.foreach(_.awaitShutdown) + // connector can only be shutdown after offsets are committed. + info("Shutting down consumer connectors.") + if (connector != null) + connector.shutdown info("Kafka mirror maker shutdown successfully") } } @@ -226,9 +235,14 @@ object MirrorMaker extends Logging { class DataChannel(capacity: Int, byteCapacity: Int, numConsumers: Int, numProducers: Int) extends KafkaMetricsGroup { val queues = new Array[ByteBoundedBlockingQueue[MirrorMakerRecord]](numProducers) + val channelSizeHists = new Array[Histogram](numProducers) + val channelByteSizeHists = new Array[Histogram](numProducers) val sizeFunction = (record: MirrorMakerRecord) => record.size - for (i <- 0 until numProducers) + for (i <- 0 until numProducers) { queues(i) = new ByteBoundedBlockingQueue[MirrorMakerRecord](capacity, byteCapacity, Some(sizeFunction)) + channelSizeHists(i) = newHistogram("MirrorMaker-DataChannel-queue-%d-Size".format(i)) + channelByteSizeHists(i) = newHistogram("MirrorMaker-DataChannel-queue-%d-Byte-Size".format(i)) + } // We use a single meter for aggregated wait percentage for the data channel. // Since meter is calculated as total_recorded_value / time_window and @@ -236,17 +250,15 @@ object MirrorMaker extends Logging { // time should be discounted by # threads. private val waitPut = newMeter("MirrorMaker-DataChannel-WaitOnPut", "percent", TimeUnit.NANOSECONDS) private val waitTake = newMeter("MirrorMaker-DataChannel-WaitOnTake", "percent", TimeUnit.NANOSECONDS) - private val channelSizeHist = newHistogram("MirrorMaker-DataChannel-Size") - private val channelByteSizeHist = newHistogram("MirrorMaker-DataChannel-Byte-Size") - def put(record: MirrorMakerRecord, qid: Int = -1) { - // If queue id is specified (only when producer thread shuttdown is called), use qid + def put(record: MirrorMakerRecord, specifiedQueueId: Int = -1) { + // If queue id is specified (only when producer thread shutdown is called), use qid // otherwise use hash of topic+partition to decide which queue to put the record in. 
val queueId = { - if (qid == -1) + if (specifiedQueueId == -1) Utils.abs(java.util.Arrays.hashCode((record.sourceTopic + record.sourcePartition).toCharArray)) % numProducers else - qid + specifiedQueueId } val queue = queues(queueId) @@ -256,8 +268,8 @@ object MirrorMaker extends Logging { putSucceed = queue.offer(record, 500, TimeUnit.MILLISECONDS) waitPut.mark((SystemTime.nanoseconds - startPutTime) / numProducers) } - channelSizeHist.update(queues.map(q => q.size()).sum) - channelByteSizeHist.update(queues.map(q => q.byteSize()).sum) + channelSizeHists(queueId).update(queue.size()) + channelByteSizeHists(queueId).update(queue.byteSize()) } def take(queueId: Int): MirrorMakerRecord = { @@ -268,8 +280,8 @@ object MirrorMaker extends Logging { data = queue.poll(500, TimeUnit.MILLISECONDS) waitTake.mark((SystemTime.nanoseconds - startTakeTime) / numConsumers) } - channelSizeHist.update(queues.map(q => q.size()).sum) - channelByteSizeHist.update(queues.map(q => q.byteSize()).sum) + channelSizeHists(queueId).update(queue.size()) + channelByteSizeHists(queueId).update(queue.byteSize()) data } } @@ -288,7 +300,9 @@ object MirrorMaker extends Logging { override def run() { info("Starting mirror maker consumer thread " + threadName) try { - for (msgAndMetadata <- stream) { + val iter = stream.iterator() + while (isCleanShutdown.get() == false && iter.hasNext()) { + val msgAndMetadata = iter.next() val data = new MirrorMakerRecord(msgAndMetadata.topic, msgAndMetadata.partition, msgAndMetadata.offset, @@ -302,6 +316,10 @@ object MirrorMaker extends Logging { } finally { shutdownLatch.countDown() info("Consumer thread stopped") + + /**If the thread did not exit during a clean shutdown, then something went wrong. We just stop the + * entire mirror maker in this case because mirror maker has essentially stopped consuming from some partitions. 
+ */ if (isCleanShutdown.compareAndSet(false, true)) System.exit(-1) } @@ -414,11 +432,11 @@ object MirrorMaker extends Logging { commitOffset } } catch { - case t: Throwable => fatal("offset commit thread exits due to", t) + case t: Throwable => fatal("Offset commit thread exits due to", t) } finally { swallow(commitOffset) shutdownComplete.countDown() - info("offset commit thread exited") + info("Offset commit thread exited") if (isCleanShutdown.compareAndSet(false, true)) System.exit(-1) } @@ -430,12 +448,11 @@ object MirrorMaker extends Logging { topicPartition -> OffsetAndMetadata(getOffsetToCommit(partitionOffsetMap), null) }.toSeq: _*) trace("committing offset: %s".format(offsetsToCommit)) - val connector = connectors.headOption match { - case None => - warn("No consumer available to commit offset.") - case Some(connector) => - connector.commitOffsets(false, offsetsToCommit) - commitCounter += 1 + if (connector == null) { + warn("No consumer connector available to commit offset.") + } else { + connector.commitOffsets(false, offsetsToCommit) + commitCounter += 1 } } @@ -505,7 +522,7 @@ object MirrorMaker extends Logging { if (exception != null) { super.onCompletion(metadata, exception) } else { - info("updating offset:[%s] -> %d".format(topicPartition, offset)) + trace("updating offset:[%s] -> %d".format(topicPartition, offset)) topicPartitionOffsetMap.getAndMaybePut(topicPartition).put(metadata.partition(), offset) } } diff --git a/core/src/main/scala/kafka/utils/ByteBoundedBlockingQueue.scala b/core/src/main/scala/kafka/utils/ByteBoundedBlockingQueue.scala index 2d2a00e..9c61e68 100644 --- a/core/src/main/scala/kafka/utils/ByteBoundedBlockingQueue.scala +++ b/core/src/main/scala/kafka/utils/ByteBoundedBlockingQueue.scala @@ -1,3 +1,20 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + package kafka.utils import java.util.concurrent.atomic.AtomicInteger -- 1.8.3.4 (Apple Git-47) From c781b820dfe54810641a7bab9c13549346ef7123 Mon Sep 17 00:00:00 2001 From: Jiangjie Qin Date: Mon, 6 Oct 2014 10:15:31 -0700 Subject: [PATCH 03/13] Addressed Guozhang's comments --- core/src/main/scala/kafka/tools/MirrorMaker.scala | 13 +------------ 1 file changed, 1 insertion(+), 12 deletions(-) diff --git a/core/src/main/scala/kafka/tools/MirrorMaker.scala b/core/src/main/scala/kafka/tools/MirrorMaker.scala index 71791ee..1aca8a7 100644 --- a/core/src/main/scala/kafka/tools/MirrorMaker.scala +++ b/core/src/main/scala/kafka/tools/MirrorMaker.scala @@ -411,18 +411,7 @@ object MirrorMaker extends Logging { }) /** - * The offset commit thread is using the first ZookeeperConsumerConnector in the connectors. It is based on the - * assumption that all the consumer instances in mirror maker are in the same consumer group. 
The impact of this - * approach is that: - * 1. If offsets are commited to ZooKeeper, the checkpointedZkOffsets in the first ZookeeperConsumerConnector - * could be tainted by partitions that it does not own. But this problem exists now anyway since we are not - * cleaning checkpointedZKOffsets on consumer rebalance. - * 2. The offset commit could potentially slow down the first ZookeeperConsumerConnector if the offset commit - * is too big (which is not likely to happen). - * Ideally, committing offsets of a partition should be done by the ZookeeperConsumerConnector instance who is - * consuming from the partition. But implementation could be tricky when consumer rebalance occurs. It needs either - * the offset map in mirror maker to be updated on consumer rebalance or putting separate offset maps in each - * ZookeeperConsumerConnector. It seems not worth doing that. + * Use the connector to commit all the offsets. */ override def run() { info("Starting mirror maker offset commit thread") -- 1.8.3.4 (Apple Git-47) From c467544f34165dfd8d2aa22465e213a9baba15d1 Mon Sep 17 00:00:00 2001 From: Jiangjie Qin Date: Wed, 15 Oct 2014 09:12:59 -0700 Subject: [PATCH 04/13] commit before switch to trunk --- core/src/main/scala/kafka/tools/MirrorMaker.scala | 16 ++++++++++++---- 1 file changed, 12 insertions(+), 4 deletions(-) diff --git a/core/src/main/scala/kafka/tools/MirrorMaker.scala b/core/src/main/scala/kafka/tools/MirrorMaker.scala index 1aca8a7..04fcf0b 100644 --- a/core/src/main/scala/kafka/tools/MirrorMaker.scala +++ b/core/src/main/scala/kafka/tools/MirrorMaker.scala @@ -210,8 +210,11 @@ object MirrorMaker extends Logging { if (isCleanShutdown.compareAndSet(false, true)) { info("Start clean shutdown.") // Consumer threads will exit when isCleanShutdown is set. - info("Waiting consumer threads to shutdown.") - if (consumerThreads != null) consumerThreads.foreach(_.awaitShutdown) + info("Shutting down consumer threads.") + if (consumerThreads != null) { + consumerThreads.foreach(_.shutdown) + consumerThreads.foreach(_.awaitShutdown) + } // After consumer threads exit, shutdown producer. 
info("Shutting down producer threads.") if (producerThreads != null) { @@ -294,6 +297,7 @@ object MirrorMaker extends Logging { private val shutdownLatch = new CountDownLatch(1) private val threadName = "mirrormaker-consumer-" + threadId this.logIdent = "[%s] ".format(threadName) + private var shutdownFlag: Boolean = false this.setName(threadName) @@ -301,7 +305,7 @@ object MirrorMaker extends Logging { info("Starting mirror maker consumer thread " + threadName) try { val iter = stream.iterator() - while (isCleanShutdown.get() == false && iter.hasNext()) { + while (!shutdownFlag && iter.hasNext()) { val msgAndMetadata = iter.next() val data = new MirrorMakerRecord(msgAndMetadata.topic, msgAndMetadata.partition, @@ -325,6 +329,10 @@ object MirrorMaker extends Logging { } } + def shutdown() { + shutdownFlag = true + } + def awaitShutdown() { try { shutdownLatch.await() @@ -416,7 +424,7 @@ object MirrorMaker extends Logging { override def run() { info("Starting mirror maker offset commit thread") try { - while (shutdownFlag == false) { + while (!shutdownFlag) { Thread.sleep(commitIntervalMs) commitOffset } -- 1.8.3.4 (Apple Git-47) From a42732080ed4f86e04bda152bc29ba1cfe2203de Mon Sep 17 00:00:00 2001 From: jqin Date: Thu, 6 Nov 2014 10:18:58 -0800 Subject: [PATCH 05/13] commit before rebase --- .../kafka/consumer/ZookeeperConsumerConnector.scala | 2 +- core/src/main/scala/kafka/server/ReplicaManager.scala | 18 ++++++++++-------- core/src/main/scala/kafka/tools/MirrorMaker.scala | 16 +++++++++------- 3 files changed, 20 insertions(+), 16 deletions(-) diff --git a/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala b/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala index 6e1de37..52d0985 100644 --- a/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala +++ b/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala @@ -285,7 +285,7 @@ private[kafka] class ZookeeperConsumerConnector(val config: ConsumerConfig, } } - def commitOffsets(isAutoCommit: Boolean = true) = { + def commitOffsets(isAutoCommit: Boolean = true) { commitOffsets(isAutoCommit, null) } diff --git a/core/src/main/scala/kafka/server/ReplicaManager.scala b/core/src/main/scala/kafka/server/ReplicaManager.scala index aa6fa31..9666299 100644 --- a/core/src/main/scala/kafka/server/ReplicaManager.scala +++ b/core/src/main/scala/kafka/server/ReplicaManager.scala @@ -483,23 +483,25 @@ class ReplicaManager(val config: KafkaConfig, val newLeaderBrokerId = leaderIsrAndControllerEpoch.leaderAndIsr.leader leaders.find(_.id == newLeaderBrokerId) match { case Some(leaderBroker) => - if (partition.makeFollower(controllerId, partitionStateInfo, correlationId, offsetManager) == false) + if (partition.makeFollower(controllerId, partitionStateInfo, correlationId, offsetManager)) + partitionsToMakeFollower += partition + else stateChangeLogger.info(("Broker %d skipped the become-follower state change after marking its partition as follower with correlation id %d from " + "controller %d epoch %d for partition [%s,%d] since the new leader %d is the same as the old leader") .format(localBrokerId, correlationId, controllerId, leaderIsrAndControllerEpoch.controllerEpoch, - partition.topic, partition.partitionId, newLeaderBrokerId)) + partition.topic, partition.partitionId, newLeaderBrokerId)) case None => // The leader broker should always be present in the leaderAndIsrRequest. 
// If not, we should record the error message and abort the transition process for this partition - stateChangeLogger.error(("Broker %d aborted the become-follower state change with correlation id %d from " + - "controller %d epoch %d for partition [%s,%d] since new leader %d is not currently available") + stateChangeLogger.error(("Broker %d received LeaderAndIsrRequest with correlation id %d from controller %d" + + " epoch %d for partition [%s,%d] but cannot become follower since the new leader %d is unavailable.") .format(localBrokerId, correlationId, controllerId, leaderIsrAndControllerEpoch.controllerEpoch, partition.topic, partition.partitionId, newLeaderBrokerId)) + // Partition needs to be added to the partitionsToMakeFollower even if leader is not up yet. Thus the + // highWatermark can be read before it's overwritten by new checkpoints. Otherwise we could have a + // data loss issue. See KAFKA-1647. + partitionsToMakeFollower += partition } - // Partition needs to be added to the partitionsToMakeFollower even if leader is not up yet. Thus the - // highWatermark can be read before it's overwritten by new checkpoints. Otherwise we could have a - // data loss issue. See KAFKA-1647. - partitionsToMakeFollower += partition } replicaFetcherManager.removeFetcherForPartitions(partitionsToMakeFollower.map(new TopicAndPartition(_))) diff --git a/core/src/main/scala/kafka/tools/MirrorMaker.scala b/core/src/main/scala/kafka/tools/MirrorMaker.scala index 04fcf0b..13bddcd 100644 --- a/core/src/main/scala/kafka/tools/MirrorMaker.scala +++ b/core/src/main/scala/kafka/tools/MirrorMaker.scala @@ -54,7 +54,7 @@ object MirrorMaker extends Logging { val parser = new OptionParser val consumerConfigOpt = parser.accepts("consumer.config", - "Consumer config to consume from a source cluster.") + "Embedded consumer config for consuming from the source cluster.") .withRequiredArg() .describedAs("config file") .ofType(classOf[String]) @@ -190,10 +190,11 @@ object MirrorMaker extends Logging { // create offset commit thread if (useNewProducer) { /** - * The offset commit thread commits offsets based on the new producer's callback. Using offset commit thread - * and new producer guarantees there is no data loss when mirror maker is uncleanly shutdown. For old producer, - * the offset commit will be based on the consumer's offsets. When unclean shutdown occurs, it is possible that - * all the messages in data channel will be lost. + * The offset commit thread periodically commit consumed offsets to the source cluster. With the new producer, + * the offsets are updated upon the returned future metadata of the send() call; with the old producer, + * the offsets are updated upon the consumer's iterator advances. By doing this, it is guaranteed no data + * loss even when mirror maker is uncleanly shutdown with the new producer, while with the old producer + * messages inside the data channel could be lost upon mirror maker unclean shutdown. 
*/ offsetCommitThread = new OffsetCommitThread(offsetCommitIntervalMs) offsetCommitThread.start() @@ -235,14 +236,15 @@ object MirrorMaker extends Logging { } } - class DataChannel(capacity: Int, byteCapacity: Int, numConsumers: Int, numProducers: Int) extends KafkaMetricsGroup { + class DataChannel(messageCapacity: Int, byteCapacity: Int, numConsumers: Int, numProducers: Int) + extends KafkaMetricsGroup { val queues = new Array[ByteBoundedBlockingQueue[MirrorMakerRecord]](numProducers) val channelSizeHists = new Array[Histogram](numProducers) val channelByteSizeHists = new Array[Histogram](numProducers) val sizeFunction = (record: MirrorMakerRecord) => record.size for (i <- 0 until numProducers) { - queues(i) = new ByteBoundedBlockingQueue[MirrorMakerRecord](capacity, byteCapacity, Some(sizeFunction)) + queues(i) = new ByteBoundedBlockingQueue[MirrorMakerRecord](messageCapacity, byteCapacity, Some(sizeFunction)) channelSizeHists(i) = newHistogram("MirrorMaker-DataChannel-queue-%d-Size".format(i)) channelByteSizeHists(i) = newHistogram("MirrorMaker-DataChannel-queue-%d-Byte-Size".format(i)) } -- 1.8.3.4 (Apple Git-47) From 346449bdfbad9d3900676db5a5dbab6f2da325cd Mon Sep 17 00:00:00 2001 From: jqin Date: Wed, 12 Nov 2014 09:44:09 -0800 Subject: [PATCH 06/13] Addressed Guozhang's comments on MaxInFlightRequests --- core/src/main/scala/kafka/tools/MirrorMaker.scala | 3 +++ 1 file changed, 3 insertions(+) diff --git a/core/src/main/scala/kafka/tools/MirrorMaker.scala b/core/src/main/scala/kafka/tools/MirrorMaker.scala index 2891200..1f336aa 100644 --- a/core/src/main/scala/kafka/tools/MirrorMaker.scala +++ b/core/src/main/scala/kafka/tools/MirrorMaker.scala @@ -161,6 +161,9 @@ object MirrorMaker extends Logging { // create producer threads val useNewProducer = options.has(useNewProducerOpt) + // MaxInFlightRequests should be set to 1 for producer if the order of the messages needs to be preserved. + // We are not force it to be one because in some use cases throughput might be important whereas order does + // not matter. val producerProps = Utils.loadProps(options.valueOf(producerConfigOpt)) val clientId = producerProps.getProperty("client.id", "") producerThreads = (0 until numProducers).map(i => { -- 1.8.3.4 (Apple Git-47) From a84325a988ed641ca7429527e1bc827be7dfd97e Mon Sep 17 00:00:00 2001 From: jqin Date: Mon, 17 Nov 2014 18:41:12 -0800 Subject: [PATCH 07/13] Incorporated Guozhang's comments --- core/src/main/scala/kafka/tools/MirrorMaker.scala | 18 ++++++++++-------- 1 file changed, 10 insertions(+), 8 deletions(-) diff --git a/core/src/main/scala/kafka/tools/MirrorMaker.scala b/core/src/main/scala/kafka/tools/MirrorMaker.scala index 1f336aa..4dc76a4 100644 --- a/core/src/main/scala/kafka/tools/MirrorMaker.scala +++ b/core/src/main/scala/kafka/tools/MirrorMaker.scala @@ -37,14 +37,19 @@ import java.util.concurrent._ /** * The mirror maker consists of three major modules: * Consumer Threads - The consumer threads consume messages from source Kafka cluster through - * ZookeeperConsumerConnector and put them into corresponding data channel queue based on hash of - * source topic-partition. This guarantees the message order in source partition is preserved. + * ZookeeperConsumerConnector and put them into corresponding data channel queue based on hash value + * of source topic-partitionId string. This guarantees the message order in source partition is + * preserved. * Producer Threads - Producer threads take messages out of data channel queues and send them to target cluster. 
Each * producer thread is bound to one data channel queue, so that the message order is preserved. * Data Channel - The data channel has multiple queues. The number of queue is same as number of producer threads. * * If new producer is used, the offset will be committed based on the new producer's callback. An offset map is * maintained and updated on each send() callback. A separate offset commit thread will commit the offset periodically. + * @note For mirror maker, MaxInFlightRequests of producer should be set to 1 for producer if the order of the messages + * needs to be preserved. Mirror maker also depends on the in-order delivery to guarantee no data loss. + * We are not force it to be 1 because in some use cases throughput might be important whereas out of order or + * minor data loss is acceptable. */ object MirrorMaker extends Logging { @@ -120,7 +125,7 @@ object MirrorMaker extends Logging { .describedAs("Java regex (String)") .ofType(classOf[String]) - val offsetCommitIntervalMsOpt = parser.accepts("offsetCommitIntervalMs", + val offsetCommitIntervalMsOpt = parser.accepts("offset.commit.interval.ms", "Offset commit interval in ms") .withRequiredArg() .describedAs("offset commit interval in millisecond") @@ -161,9 +166,6 @@ object MirrorMaker extends Logging { // create producer threads val useNewProducer = options.has(useNewProducerOpt) - // MaxInFlightRequests should be set to 1 for producer if the order of the messages needs to be preserved. - // We are not force it to be one because in some use cases throughput might be important whereas order does - // not matter. val producerProps = Utils.loadProps(options.valueOf(producerConfigOpt)) val clientId = producerProps.getProperty("client.id", "") producerThreads = (0 until numProducers).map(i => { @@ -389,7 +391,7 @@ object MirrorMaker extends Logging { } } catch { case t: Throwable => - fatal("Producer thread %s failure due to ".format(this.getName), t) + fatal("Producer thread failure due to ", t) } finally { shutdownComplete.countDown() info("Producer thread stopped") @@ -451,7 +453,7 @@ object MirrorMaker extends Logging { commitOffset() } } catch { - case t: Throwable => fatal("Offset commit thread exits due to", t) + case t: Throwable => fatal("Exits due to", t) } finally { swallow(commitOffset()) shutdownComplete.countDown() -- 1.8.3.4 (Apple Git-47) From b1d7257b567040affdc2b6856dc8aea8b33f7f28 Mon Sep 17 00:00:00 2001 From: jqin Date: Wed, 19 Nov 2014 16:32:21 -0800 Subject: [PATCH 08/13] Merged KAFKA-345 into this patch. Incorporated Joel and Jun's comments. 
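As background for the listener added in this patch, below is a minimal sketch of how a caller might register it on the connector. The beforeReleasingPartitions signature is inferred from the connector's call site, and the sketch assumes it is the interface's only abstract method, as the patch excerpt suggests; the package declaration, properties path, and callback body are illustrative assumptions rather than code from this patch.

    package kafka.tools // the connector is private[kafka], so the sketch lives inside the kafka package

    import kafka.consumer.{ConsumerConfig, ZookeeperConsumerConnector}
    import kafka.javaapi.consumer.ConsumerRebalanceListener
    import kafka.utils.Utils

    object RebalanceListenerExample {
      def main(args: Array[String]) {
        val config = new ConsumerConfig(Utils.loadProps("consumer.properties")) // illustrative path
        val connector = new ZookeeperConsumerConnector(config)

        // The listener must be registered before any message streams are created;
        // otherwise setConsumerRebalanceListener throws MessageStreamsExistException.
        connector.setConsumerRebalanceListener(new ConsumerRebalanceListener {
          override def beforeReleasingPartitions(
              partitionOwnership: java.util.Map[String, java.util.Set[java.lang.Integer]]) {
            // Called after the fetchers are stopped and before partition ownership is
            // released. A mirror-maker style listener would drain its data channel, wait
            // for outstanding produce requests to be acked, and then commit offsets here.
            println("About to release partitions: " + partitionOwnership)
          }
        })
        // ... createMessageStreamsByFilter(...) and consume as usual ...
      }
    }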
--- .../consumer/ZookeeperConsumerConnector.scala | 26 ++++++++-- .../consumer/ConsumerRebalanceListener.java | 50 ++++++++++++++++++ .../consumer/ZookeeperConsumerConnector.scala | 4 ++ .../consumer/ZookeeperConsumerConnectorTest.scala | 59 +++++++++++++++++++++- 4 files changed, 133 insertions(+), 6 deletions(-) create mode 100644 core/src/main/scala/kafka/javaapi/consumer/ConsumerRebalanceListener.java diff --git a/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala b/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala index e565c08..eed3ed9 100644 --- a/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala +++ b/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala @@ -28,6 +28,7 @@ import kafka.api._ import kafka.client.ClientUtils import kafka.cluster._ import kafka.common._ +import kafka.javaapi.consumer.ConsumerRebalanceListener import kafka.metrics._ import kafka.network.BlockingChannel import kafka.serializer._ @@ -39,7 +40,7 @@ import org.I0Itec.zkclient.{IZkChildListener, IZkDataListener, IZkStateListener, import org.apache.zookeeper.Watcher.Event.KeeperState import scala.collection._ - +import scala.collection.JavaConversions._ /** * This class handles the consumers interaction with zookeeper @@ -102,6 +103,7 @@ private[kafka] class ZookeeperConsumerConnector(val config: ConsumerConfig, private val offsetsChannelLock = new Object private var wildcardTopicWatcher: ZookeeperTopicEventWatcher = null + private var consumerRebalanceListener: ConsumerRebalanceListener = null // useful for tracking migration of consumers to store offsets in kafka private val kafkaCommitMeter = newMeter(config.clientId + "-KafkaCommitsPerSec", "commits", TimeUnit.SECONDS) @@ -160,6 +162,13 @@ private[kafka] class ZookeeperConsumerConnector(val config: ConsumerConfig, wildcardStreamsHandler.streams } + def setConsumerRebalanceListener(listener: ConsumerRebalanceListener) { + if (messageStreamCreated.get()) + throw new MessageStreamsExistException(this.getClass.getSimpleName + + " can only set consumer rebalance listener before creating streams",null) + consumerRebalanceListener = listener + } + private def createFetcher() { if (enableFetcher) fetcher = Some(new ConsumerFetcherManager(consumerIdString, config, zkClient)) @@ -610,7 +619,7 @@ private[kafka] class ZookeeperConsumerConnector(val config: ConsumerConfig, * For example, a ZK node can disappear between the time we get all children and the time we try to get * the value of a child. Just let this go since another rebalance will be triggered. **/ - info("exception during rebalance ", e) + error("exception during rebalance ", e) } info("end rebalancing consumer " + consumerIdString + " try #" + i) if (done) { @@ -651,9 +660,18 @@ private[kafka] class ZookeeperConsumerConnector(val config: ConsumerConfig, * partitions in parallel. So, not stopping the fetchers leads to duplicate data. 
*/ closeFetchers(cluster, kafkaMessageAndMetadataStreams, myTopicThreadIdsMap) - + if (consumerRebalanceListener != null) { + info("Calling beforeReleasingPartitions() from rebalance listener.") + consumerRebalanceListener.beforeReleasingPartitions( + if (topicRegistry.size == 0) + new java.util.HashMap[String, java.util.Set[java.lang.Integer]] + else + mapAsJavaMap(topicRegistry.map(topics => + topics._1 -> topics._2.keys + ).toMap).asInstanceOf[java.util.Map[String, java.util.Set[java.lang.Integer]]] + ) + } releasePartitionOwnership(topicRegistry) - val assignmentContext = new AssignmentContext(group, consumerIdString, config.excludeInternalTopics, zkClient) val partitionOwnershipDecision = partitionAssignor.assign(assignmentContext) val currentTopicRegistry = new Pool[String, Pool[Int, PartitionTopicInfo]]( diff --git a/core/src/main/scala/kafka/javaapi/consumer/ConsumerRebalanceListener.java b/core/src/main/scala/kafka/javaapi/consumer/ConsumerRebalanceListener.java new file mode 100644 index 0000000..b5b8832 --- /dev/null +++ b/core/src/main/scala/kafka/javaapi/consumer/ConsumerRebalanceListener.java @@ -0,0 +1,50 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package kafka.javaapi.consumer; + +import kafka.common.TopicAndPartition; +import kafka.consumer.ConsumerThreadId; + +import java.util.Map; +import java.util.Set; + +/** + * This listener is used for execution of tasks defined by user when a consumer rebalance + * occurs in {@link kafka.consumer.ZookeeperConsumerConnector} + */ +public interface ConsumerRebalanceListener { + + /** + * This method is called after all the fetcher threads are stopped but before the + * ownership of partitions are released. Depending on whether auto offset commit is + * enabled or not, offsets may or may not have been committed. + * A specific use case for this callback is in mirror maker, where we have to turn off consumer offset + * auto commit to prevent potential data loss. Mirror maker follows a consume-then-produce pattern and it + * buffers the message in a data channel between consumer and producers. If consumer offset is enabled and + * right after an offset commit, mirror maker exits. When mirror maker restarts, it will consume from + * the last committed offset and the messages buffered in data channel will be lost. After disabling + * the offset auto commit, mirror maker needs to be notified so it can: + * 1. Clean up the messages in the data channel (as they will be re-consumed after rebalance) + * 2. Wait until all the messages already sent by producer are confirmed to be received by broker. + * 3. Commit offsets. + * All these works need to be done after the fetcher threads are stopped and before the consumer + * release its ownership of the partitions, i.e. in this callback. 
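+ * For illustration only, a mirror maker style implementation of this callback would roughly do
+ * the following (the helper names are placeholders, not part of this interface):
+ * <pre>
+ * {@code
+ * public void beforeReleasingPartitions(Map<String, Set<Integer>> partitionOwnership) {
+ *     dataChannel.clear();               // 1. drop the messages buffered between consumer and producers
+ *     waitForOutstandingProducerAcks();  // 2. block until every message already handed to the producer is acked
+ *     commitConsumedOffsets();           // 3. commit the offsets of what has actually been produced
+ * }
+ * }
+ * </pre>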
+ */ + public void beforeReleasingPartitions(Map> partitionOwnership); + +} diff --git a/core/src/main/scala/kafka/javaapi/consumer/ZookeeperConsumerConnector.scala b/core/src/main/scala/kafka/javaapi/consumer/ZookeeperConsumerConnector.scala index 1f98db5..fd37c2a 100644 --- a/core/src/main/scala/kafka/javaapi/consumer/ZookeeperConsumerConnector.scala +++ b/core/src/main/scala/kafka/javaapi/consumer/ZookeeperConsumerConnector.scala @@ -115,6 +115,10 @@ private[kafka] class ZookeeperConsumerConnector(val config: ConsumerConfig, underlying.commitOffsets(retryOnFailure) } + def setConsumerRebalanceListener(consumerRebalanceListener: ConsumerRebalanceListener) { + underlying.setConsumerRebalanceListener(consumerRebalanceListener) + } + def shutdown() { underlying.shutdown } diff --git a/core/src/test/scala/unit/kafka/consumer/ZookeeperConsumerConnectorTest.scala b/core/src/test/scala/unit/kafka/consumer/ZookeeperConsumerConnectorTest.scala index e1d8711..b39e58b 100644 --- a/core/src/test/scala/unit/kafka/consumer/ZookeeperConsumerConnectorTest.scala +++ b/core/src/test/scala/unit/kafka/consumer/ZookeeperConsumerConnectorTest.scala @@ -19,18 +19,20 @@ package kafka.consumer import junit.framework.Assert._ import kafka.integration.KafkaServerTestHarness +import kafka.javaapi.consumer.ConsumerRebalanceListener import kafka.server._ import scala.collection._ +import scala.collection.JavaConversions._ import org.scalatest.junit.JUnit3Suite import kafka.message._ import kafka.serializer._ import org.I0Itec.zkclient.ZkClient import kafka.utils._ -import kafka.producer.{ProducerConfig, KeyedMessage, Producer} +import kafka.producer.{KeyedMessage, Producer} import java.util.{Collections, Properties} import org.apache.log4j.{Logger, Level} import kafka.utils.TestUtils._ -import kafka.common.MessageStreamsExistException +import kafka.common.{TopicAndPartition, MessageStreamsExistException} class ZookeeperConsumerConnectorTest extends JUnit3Suite with KafkaServerTestHarness with Logging { @@ -345,6 +347,49 @@ class ZookeeperConsumerConnectorTest extends JUnit3Suite with KafkaServerTestHar zkClient.close() } + def testConsumerRebalanceListener() { + // Send messages to create topic + sendMessagesToBrokerPartition(configs.head, topic, 0, 200, DefaultCompressionCodec) + sendMessagesToBrokerPartition(configs.last, topic, 1, 200, DefaultCompressionCodec) + + val consumerConfig1 = new ConsumerConfig(TestUtils.createConsumerProperties(zkConnect, group, consumer1)) + val zkConsumerConnector1 = new ZookeeperConsumerConnector(consumerConfig1, true) + // Register consumer rebalance listener + val rebalanceListener1 = new TestConsumerRebalanceListener() + zkConsumerConnector1.setConsumerRebalanceListener(rebalanceListener1) + val topicMessageStreams1 = zkConsumerConnector1.createMessageStreams(Map(topic -> 1), new StringDecoder(), new StringDecoder()) + + // Check if rebalance listener is fired + assertEquals(true, rebalanceListener1.listenerCalled) + assertEquals(null, rebalanceListener1.partitionOwnership.get(topic)) + // reset the flag + rebalanceListener1.listenerCalled = false + + val actual_1 = getZKChildrenValues(dirs.consumerOwnerDir) + val expected_1 = List(("0", "group1_consumer1-0"), + ("1", "group1_consumer1-0")) + assertEquals(expected_1, actual_1) + + val consumerConfig2 = new ConsumerConfig(TestUtils.createConsumerProperties(zkConnect, group, consumer2)) + val zkConsumerConnector2 = new ZookeeperConsumerConnector(consumerConfig2, true) + // Register consumer rebalance listener + val 
rebalanceListener2 = new TestConsumerRebalanceListener() + zkConsumerConnector2.setConsumerRebalanceListener(rebalanceListener2) + val topicMessageStreams2 = zkConsumerConnector2.createMessageStreams(Map(topic -> 1), new StringDecoder(), new StringDecoder()) + + val actual_2 = getZKChildrenValues(dirs.consumerOwnerDir) + val expected_2 = List(("0", "group1_consumer1-0"), + ("1", "group1_consumer2-0")) + assertEquals(expected_2, actual_2) + + // Check if rebalance listener is fired + assertEquals(true, rebalanceListener1.listenerCalled) + assertEquals(Set[Int](0, 1), rebalanceListener1.partitionOwnership.get(topic)) + assertEquals(true, rebalanceListener2.listenerCalled) + assertEquals(null, rebalanceListener2.partitionOwnership.get(topic)) + + } + def sendMessagesToBrokerPartition(config: KafkaConfig, topic: String, partition: Int, @@ -420,4 +465,14 @@ class ZookeeperConsumerConnectorTest extends JUnit3Suite with KafkaServerTestHar (partition, zkClient.readData(path + "/" + partition).asInstanceOf[String])) } + private class TestConsumerRebalanceListener extends ConsumerRebalanceListener { + var listenerCalled: Boolean = false + var partitionOwnership: java.util.Map[String, java.util.Set[java.lang.Integer]] = null + + override def beforeReleasingPartitions(partitionOwnership: java.util.Map[String, java.util.Set[java.lang.Integer]]) { + listenerCalled = true + this.partitionOwnership = partitionOwnership + } + } + } -- 1.8.3.4 (Apple Git-47) From b17271ba62de12616125d92b5a474a5c05a38f39 Mon Sep 17 00:00:00 2001 From: jqin Date: Thu, 20 Nov 2014 09:30:22 -0800 Subject: [PATCH 09/13] Added consumer rebalance listener to mirror maker, will test it later. --- .../consumer/ConsumerRebalanceListener.java | 14 ++----- core/src/main/scala/kafka/tools/MirrorMaker.scala | 49 ++++++++++++++++++++-- .../kafka/utils/ByteBoundedBlockingQueue.scala | 11 +++++ 3 files changed, 60 insertions(+), 14 deletions(-) diff --git a/core/src/main/scala/kafka/javaapi/consumer/ConsumerRebalanceListener.java b/core/src/main/scala/kafka/javaapi/consumer/ConsumerRebalanceListener.java index b5b8832..facf509 100644 --- a/core/src/main/scala/kafka/javaapi/consumer/ConsumerRebalanceListener.java +++ b/core/src/main/scala/kafka/javaapi/consumer/ConsumerRebalanceListener.java @@ -33,17 +33,9 @@ public interface ConsumerRebalanceListener { * This method is called after all the fetcher threads are stopped but before the * ownership of partitions are released. Depending on whether auto offset commit is * enabled or not, offsets may or may not have been committed. - * A specific use case for this callback is in mirror maker, where we have to turn off consumer offset - * auto commit to prevent potential data loss. Mirror maker follows a consume-then-produce pattern and it - * buffers the message in a data channel between consumer and producers. If consumer offset is enabled and - * right after an offset commit, mirror maker exits. When mirror maker restarts, it will consume from - * the last committed offset and the messages buffered in data channel will be lost. After disabling - * the offset auto commit, mirror maker needs to be notified so it can: - * 1. Clean up the messages in the data channel (as they will be re-consumed after rebalance) - * 2. Wait until all the messages already sent by producer are confirmed to be received by broker. - * 3. Commit offsets. - * All these works need to be done after the fetcher threads are stopped and before the consumer - * release its ownership of the partitions, i.e. in this callback. 
+ * This listener is initially added to prevent duplicate messages on consumer rebalance + * in mirror maker, where offset auto commit is disabled to prevent data loss. It could + * also be used in more general cases. */ public void beforeReleasingPartitions(Map> partitionOwnership); diff --git a/core/src/main/scala/kafka/tools/MirrorMaker.scala b/core/src/main/scala/kafka/tools/MirrorMaker.scala index 4dc76a4..1523ad6 100644 --- a/core/src/main/scala/kafka/tools/MirrorMaker.scala +++ b/core/src/main/scala/kafka/tools/MirrorMaker.scala @@ -19,6 +19,7 @@ package kafka.tools import com.yammer.metrics.core._ import kafka.common.{TopicAndPartition, OffsetAndMetadata} +import kafka.javaapi.consumer.ConsumerRebalanceListener import kafka.utils._ import kafka.consumer._ import kafka.serializer._ @@ -26,12 +27,13 @@ import kafka.producer.{OldProducer, NewShinyProducer} import kafka.metrics.KafkaMetricsGroup import org.apache.kafka.clients.producer.internals.ErrorLoggingCallback import org.apache.kafka.clients.producer.{RecordMetadata, ProducerRecord} +import org.apache.kafka.common.KafkaException import scala.collection.JavaConversions._ import joptsimple.OptionParser import java.util.Properties -import java.util.concurrent.atomic.AtomicBoolean +import java.util.concurrent.atomic.{AtomicInteger, AtomicBoolean} import java.util.concurrent._ /** @@ -51,7 +53,7 @@ import java.util.concurrent._ * We are not force it to be 1 because in some use cases throughput might be important whereas out of order or * minor data loss is acceptable. */ -object MirrorMaker extends Logging { +object MirrorMaker extends Logging with KafkaMetricsGroup { private var connector: ZookeeperConsumerConnector = null private var consumerThreads: Seq[ConsumerThread] = null @@ -62,9 +64,20 @@ object MirrorMaker extends Logging { private val valueFactory = (k: TopicAndPartition) => new Pool[Int, Long] private val topicPartitionOffsetMap: Pool[TopicAndPartition, Pool[Int, Long]] = new Pool[TopicAndPartition, Pool[Int,Long]](Some(valueFactory)) + // Track the messages unacked for consumer rebalance + private var numMessageUnacked: AtomicInteger = new AtomicInteger(0) + private var consumerRebalanceListener: MirrorMakerConsumerRebalanceListener = null + // This is to indicate whether the rebalance is going on so the producer callback knows if + // the rebalance latch needs to be pulled. 
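+  // Handshake with the producer callback: beforeReleasingPartitions() clears the data channel,
+  // sets inRebalance and waits on it while numMessageUnacked > 0; the callback notifies
+  // inRebalance once the unacked count drops to zero, and only then are offsets committed.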
+ private var inRebalance: AtomicBoolean = new AtomicBoolean(false) private val shutdownMessage : MirrorMakerRecord = new MirrorMakerRecord("shutdown", 0, 0, null, "shutdown".getBytes) + newGauge("MirrorMaker-Unacked-Messages", + new Gauge[Int] { + def value = numMessageUnacked.get() + }) + def main(args: Array[String]) { info ("Starting mirror maker") @@ -308,6 +321,10 @@ object MirrorMaker extends Logging { channelByteSizeHists(queueId).update(queue.byteSize()) data } + + def clear() { + queues.foreach(queue => queue.clear()) + } } class ConsumerThread(stream: KafkaStream[Array[Byte], Array[Byte]], @@ -465,7 +482,7 @@ object MirrorMaker extends Logging { } } - private def commitOffset() { + def commitOffset() { val offsetsToCommit = collection.immutable.Map(topicPartitionOffsetMap.map { case (topicPartition, partitionOffsetMap) => topicPartition -> OffsetAndMetadata(getOffsetToCommit(partitionOffsetMap), null) @@ -520,6 +537,7 @@ object MirrorMaker extends Logging { } else { this.producer.send(record, new MirrorMakerProducerCallback(topicPartition, offset, key, value)) + numMessageUnacked.incrementAndGet() } } } @@ -544,11 +562,36 @@ object MirrorMaker extends Logging { override def onCompletion(metadata: RecordMetadata, exception: Exception) { if (exception != null) { + // Use default call back to log error super.onCompletion(metadata, exception) } else { trace("updating offset:[%s] -> %d".format(topicPartition, offset)) topicPartitionOffsetMap.getAndMaybePut(topicPartition).put(metadata.partition(), offset) + + // Notify the rebalance callback only when all the messages handed to producer are acked. + // There is a very slight chance that 1 message is held by producer thread and not handed to producer. + // That message might have duplicate. We are not handling that here. 
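+        // (Such a held message survives the data channel clear(), is still sent once the producer
+        // thread resumes, and is also re-consumed after the rebalance because its offset was not
+        // yet acked and hence not committed, so it can show up twice downstream.)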
+ if (numMessageUnacked.decrementAndGet() == 0 && inRebalance.get()) { + inRebalance synchronized {inRebalance.notify()} + } + } + } + } + + private class MirrorMakerConsumerRebalanceListener(dataChannel: DataChannel) extends ConsumerRebalanceListener { + + override def beforeReleasingPartitions(partitionOwnership: java.util.Map[String, java.util.Set[java.lang.Integer]]) { + info("Clearing data channel.") + dataChannel.clear() + info("Waiting until all the messages are acked.") + inRebalance synchronized { + inRebalance.set(true) + if (numMessageUnacked.get() > 0) + inRebalance.wait() } + info("Committing offsets.") + offsetCommitThread.commitOffset() + inRebalance.set(true) } } diff --git a/core/src/main/scala/kafka/utils/ByteBoundedBlockingQueue.scala b/core/src/main/scala/kafka/utils/ByteBoundedBlockingQueue.scala index 6a85d7e..26149af 100644 --- a/core/src/main/scala/kafka/utils/ByteBoundedBlockingQueue.scala +++ b/core/src/main/scala/kafka/utils/ByteBoundedBlockingQueue.scala @@ -216,4 +216,15 @@ class ByteBoundedBlockingQueue[E] (val queueNumMessageCapacity: Int, val queueBy * @return the remaining bytes capacity of the queue */ def remainingByteSize = math.max(0, queueByteCapacity - currentByteSize.get()) + + /** + * remove all the items in the queue + */ + def clear() { + putLock synchronized { + queue.clear() + currentByteSize.set(0) + putLock.notify() + } + } } -- 1.8.3.4 (Apple Git-47) From 19fa880a15476ab67a05cd3c6e05d6179a21d80b Mon Sep 17 00:00:00 2001 From: jqin Date: Sun, 23 Nov 2014 22:46:46 -0800 Subject: [PATCH 10/13] added custom config for consumer rebalance listener --- core/src/main/scala/kafka/tools/MirrorMaker.scala | 23 +++++++++++++++++------ 1 file changed, 17 insertions(+), 6 deletions(-) diff --git a/core/src/main/scala/kafka/tools/MirrorMaker.scala b/core/src/main/scala/kafka/tools/MirrorMaker.scala index 1523ad6..b4ffab8 100644 --- a/core/src/main/scala/kafka/tools/MirrorMaker.scala +++ b/core/src/main/scala/kafka/tools/MirrorMaker.scala @@ -168,15 +168,26 @@ object MirrorMaker extends Logging with KafkaMetricsGroup { val bufferByteSize = options.valueOf(bufferByteSizeOpt).intValue() val offsetCommitIntervalMs = options.valueOf(offsetCommitIntervalMsOpt).intValue() - // create consumer streams - connector = { - val consumerConfig = new ConsumerConfig(Utils.loadProps(options.valuesOf(consumerConfigOpt).head)) - new ZookeeperConsumerConnector(consumerConfig) - } + // create consumer connector + val consumerConfigProps = Utils.loadProps(options.valuesOf(consumerConfigOpt).head) + val consumerConfig = new ConsumerConfig(consumerConfigProps) + connector = new ZookeeperConsumerConnector(consumerConfig) // create a data channel btw the consumers and the producers val mirrorDataChannel = new DataChannel(bufferSize, bufferByteSize, numInputs = numStreams, numOutputs = numProducers) + // set consumer rebalance listener + val customRebalanceListenerClass = consumerConfigProps.getProperty("consumer.rebalance.listener") + val consumerRebalanceListenerClass = { + if (customRebalanceListenerClass == null) + "kafka.tools.MirrorMakerConsumerRebalanceListener" + else + customRebalanceListenerClass + } + consumerRebalanceListener = + Utils.createObject[MirrorMakerConsumerRebalanceListener](consumerRebalanceListenerClass, mirrorDataChannel) + connector.setConsumerRebalanceListener(consumerRebalanceListener) + // create producer threads val useNewProducer = options.has(useNewProducerOpt) val producerProps = Utils.loadProps(options.valueOf(producerConfigOpt)) @@ -578,7 +589,7 @@ 
object MirrorMaker extends Logging with KafkaMetricsGroup { } } - private class MirrorMakerConsumerRebalanceListener(dataChannel: DataChannel) extends ConsumerRebalanceListener { + class MirrorMakerConsumerRebalanceListener (dataChannel: DataChannel) extends ConsumerRebalanceListener { override def beforeReleasingPartitions(partitionOwnership: java.util.Map[String, java.util.Set[java.lang.Integer]]) { info("Clearing data channel.") -- 1.8.3.4 (Apple Git-47) From aa47b1a6f6402e5dbed79427de2771f606d3e423 Mon Sep 17 00:00:00 2001 From: jqin Date: Mon, 24 Nov 2014 08:14:24 -0800 Subject: [PATCH 11/13] Add configurable consumer rebalance listener --- core/src/main/scala/kafka/tools/MirrorMaker.scala | 39 +++++++++++------------ 1 file changed, 19 insertions(+), 20 deletions(-) diff --git a/core/src/main/scala/kafka/tools/MirrorMaker.scala b/core/src/main/scala/kafka/tools/MirrorMaker.scala index b4ffab8..4dacbc9 100644 --- a/core/src/main/scala/kafka/tools/MirrorMaker.scala +++ b/core/src/main/scala/kafka/tools/MirrorMaker.scala @@ -143,6 +143,7 @@ object MirrorMaker extends Logging with KafkaMetricsGroup { .withRequiredArg() .describedAs("offset commit interval in millisecond") .ofType(classOf[java.lang.Integer]) + .defaultsTo(60000) val helpOpt = parser.accepts("help", "Print this message.") @@ -178,14 +179,12 @@ object MirrorMaker extends Logging with KafkaMetricsGroup { // set consumer rebalance listener val customRebalanceListenerClass = consumerConfigProps.getProperty("consumer.rebalance.listener") - val consumerRebalanceListenerClass = { - if (customRebalanceListenerClass == null) - "kafka.tools.MirrorMakerConsumerRebalanceListener" - else - customRebalanceListenerClass + consumerRebalanceListener = { + if (customRebalanceListenerClass == null) { + new MirrorMakerConsumerRebalanceListener(mirrorDataChannel) + } else + Utils.createObject[MirrorMakerConsumerRebalanceListener](customRebalanceListenerClass, mirrorDataChannel) } - consumerRebalanceListener = - Utils.createObject[MirrorMakerConsumerRebalanceListener](consumerRebalanceListenerClass, mirrorDataChannel) connector.setConsumerRebalanceListener(consumerRebalanceListener) // create producer threads @@ -202,6 +201,19 @@ object MirrorMaker extends Logging with KafkaMetricsGroup { new ProducerThread(mirrorDataChannel, producer, i) }) + // create offset commit thread + if (useNewProducer) { + /** + * The offset commit thread periodically commit consumed offsets to the source cluster. With the new producer, + * the offsets are updated upon the returned future metadata of the send() call; with the old producer, + * the offsets are updated upon the consumer's iterator advances. By doing this, it is guaranteed no data + * loss even when mirror maker is uncleanly shutdown with the new producer, while with the old producer + * messages inside the data channel could be lost upon mirror maker unclean shutdown. + */ + offsetCommitThread = new OffsetCommitThread(offsetCommitIntervalMs) + offsetCommitThread.start() + } + // create consumer threads val filterSpec = if (options.has(whitelistOpt)) new Whitelist(options.valueOf(whitelistOpt)) @@ -228,19 +240,6 @@ object MirrorMaker extends Logging with KafkaMetricsGroup { consumerThreads.foreach(_.start) producerThreads.foreach(_.start) - // create offset commit thread - if (useNewProducer) { - /** - * The offset commit thread periodically commit consumed offsets to the source cluster. 
With the new producer, - * the offsets are updated upon the returned future metadata of the send() call; with the old producer, - * the offsets are updated upon the consumer's iterator advances. By doing this, it is guaranteed no data - * loss even when mirror maker is uncleanly shutdown with the new producer, while with the old producer - * messages inside the data channel could be lost upon mirror maker unclean shutdown. - */ - offsetCommitThread = new OffsetCommitThread(offsetCommitIntervalMs) - offsetCommitThread.start() - } - // we wait on producer's shutdown latch instead of consumers // since the consumer threads can hit a timeout/other exception; // but in this case the producer should still be able to shutdown -- 1.8.3.4 (Apple Git-47) From 6e3181800bfa2c5c3edda443ddd2fe8aa3106e7d Mon Sep 17 00:00:00 2001 From: jqin Date: Wed, 3 Dec 2014 14:56:01 -0800 Subject: [PATCH 12/13] Incorporated Guozhang's comments --- core/src/main/scala/kafka/tools/MirrorMaker.scala | 3 +++ 1 file changed, 3 insertions(+) diff --git a/core/src/main/scala/kafka/tools/MirrorMaker.scala b/core/src/main/scala/kafka/tools/MirrorMaker.scala index 4dacbc9..53c5e25 100644 --- a/core/src/main/scala/kafka/tools/MirrorMaker.scala +++ b/core/src/main/scala/kafka/tools/MirrorMaker.scala @@ -89,6 +89,7 @@ object MirrorMaker extends Logging with KafkaMetricsGroup { .describedAs("config file") .ofType(classOf[String]) + // Please see note about MaxInflightRequests val producerConfigOpt = parser.accepts("producer.config", "Embedded producer config.") .withRequiredArg() @@ -178,6 +179,8 @@ object MirrorMaker extends Logging with KafkaMetricsGroup { val mirrorDataChannel = new DataChannel(bufferSize, bufferByteSize, numInputs = numStreams, numOutputs = numProducers) // set consumer rebalance listener + // Customized consumer rebalance listener should extends MirrorMakerConsumerRebalanceListener + // and take datachannel as argument. val customRebalanceListenerClass = consumerConfigProps.getProperty("consumer.rebalance.listener") consumerRebalanceListener = { if (customRebalanceListenerClass == null) { -- 1.8.3.4 (Apple Git-47) From 4320aaa6dc7fa67c21459e391c56e9121e835829 Mon Sep 17 00:00:00 2001 From: jqin Date: Wed, 3 Dec 2014 15:02:06 -0800 Subject: [PATCH 13/13] Incorporated Guozhang's comments. --- core/src/main/scala/kafka/tools/MirrorMaker.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/core/src/main/scala/kafka/tools/MirrorMaker.scala b/core/src/main/scala/kafka/tools/MirrorMaker.scala index 53c5e25..e18703d 100644 --- a/core/src/main/scala/kafka/tools/MirrorMaker.scala +++ b/core/src/main/scala/kafka/tools/MirrorMaker.scala @@ -179,7 +179,7 @@ object MirrorMaker extends Logging with KafkaMetricsGroup { val mirrorDataChannel = new DataChannel(bufferSize, bufferByteSize, numInputs = numStreams, numOutputs = numProducers) // set consumer rebalance listener - // Customized consumer rebalance listener should extends MirrorMakerConsumerRebalanceListener + // Customized consumer rebalance listener should extend MirrorMakerConsumerRebalanceListener // and take datachannel as argument. val customRebalanceListenerClass = consumerConfigProps.getProperty("consumer.rebalance.listener") consumerRebalanceListener = { -- 1.8.3.4 (Apple Git-47)
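For reference, a sketch of how the configurable listener hook above could be used. The class below is made up for illustration and assumes MirrorMakerConsumerRebalanceListener and DataChannel stay publicly accessible members of the MirrorMaker object, as in these patches; a custom listener has to keep the single DataChannel constructor argument because mirror maker instantiates it reflectively through Utils.createObject with the data channel as the only argument:

    package com.example

    import kafka.tools.MirrorMaker
    import kafka.utils.Logging

    // Hypothetical listener: log the owned topics, then fall back to the default behaviour
    // (clear the data channel, wait for outstanding acks, commit offsets).
    class LoggingRebalanceListener(dataChannel: MirrorMaker.DataChannel)
        extends MirrorMaker.MirrorMakerConsumerRebalanceListener(dataChannel) with Logging {

      override def beforeReleasingPartitions(partitionOwnership: java.util.Map[String, java.util.Set[java.lang.Integer]]) {
        info("Rebalance starting; currently owned topics: " + partitionOwnership.keySet())
        super.beforeReleasingPartitions(partitionOwnership)
      }
    }

It is enabled by adding the fully qualified class name to the consumer properties file passed to mirror maker:

    consumer.rebalance.listener=com.example.LoggingRebalanceListener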