From 9005c578ef1567f66777b0a573853636b055d8bf Mon Sep 17 00:00:00 2001 From: becketqin Date: Tue, 23 Sep 2014 18:09:14 -0700 Subject: [PATCH 1/3] mirror maker redesign; adding byte bounded blocking queue. --- .../consumer/ZookeeperConsumerConnector.scala | 16 +- core/src/main/scala/kafka/tools/MirrorMaker.scala | 292 +++++++++++++++++---- .../kafka/utils/ByteBoundedBlockingQueue.scala | 158 +++++++++++ 3 files changed, 413 insertions(+), 53 deletions(-) create mode 100644 core/src/main/scala/kafka/utils/ByteBoundedBlockingQueue.scala diff --git a/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala b/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala index fbc680f..6e1de37 100644 --- a/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala +++ b/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala @@ -285,17 +285,22 @@ private[kafka] class ZookeeperConsumerConnector(val config: ConsumerConfig, } } - def commitOffsets(isAutoCommit: Boolean = true) { + def commitOffsets(isAutoCommit: Boolean = true) = { + commitOffsets(isAutoCommit, null) + } + + def commitOffsets(isAutoCommit: Boolean, + topicPartitionOffsets: immutable.Map[TopicAndPartition, OffsetAndMetadata]) { var retriesRemaining = 1 + (if (isAutoCommit) config.offsetsCommitMaxRetries else 0) // no retries for commits from auto-commit var done = false while (!done) { val committed = offsetsChannelLock synchronized { // committed when we receive either no error codes or only MetadataTooLarge errors - val offsetsToCommit = immutable.Map(topicRegistry.flatMap { case (topic, partitionTopicInfos) => + val offsetsToCommit = if (topicPartitionOffsets == null) {immutable.Map(topicRegistry.flatMap { case (topic, partitionTopicInfos) => partitionTopicInfos.map { case (partition, info) => TopicAndPartition(info.topic, info.partitionId) -> OffsetAndMetadata(info.getConsumeOffset()) } - }.toSeq:_*) + }.toSeq:_*)} else topicPartitionOffsets if (offsetsToCommit.size > 0) { if (config.offsetsStorage == "zookeeper") { @@ -702,7 +707,6 @@ private[kafka] class ZookeeperConsumerConnector(val config: ConsumerConfig, case Some(f) => f.stopConnections clearFetcherQueues(allPartitionInfos, cluster, queuesToBeCleared, messageStreams) - info("Committing all offsets after clearing the fetcher queues") /** * here, we need to commit offsets before stopping the consumer from returning any more messages * from the current data chunk. Since partition ownership is not yet released, this commit offsets @@ -711,8 +715,10 @@ private[kafka] class ZookeeperConsumerConnector(val config: ConsumerConfig, * by the consumer, there will be no more messages returned by this iterator until the rebalancing finishes * successfully and the fetchers restart to fetch more data chunks **/ - if (config.autoCommitEnable) + if (config.autoCommitEnable) { + info("Committing all offsets after clearing the fetcher queues") commitOffsets() + } case None => } } diff --git a/core/src/main/scala/kafka/tools/MirrorMaker.scala b/core/src/main/scala/kafka/tools/MirrorMaker.scala index b8698ee..0d651e1 100644 --- a/core/src/main/scala/kafka/tools/MirrorMaker.scala +++ b/core/src/main/scala/kafka/tools/MirrorMaker.scala @@ -17,28 +17,39 @@ package kafka.tools -import kafka.utils.{SystemTime, Utils, CommandLineUtils, Logging} +import java.lang.Thread.UncaughtExceptionHandler + +import com.yammer.metrics.core.Gauge +import kafka.common.{TopicAndPartition, OffsetAndMetadata} +import kafka.utils._ import kafka.consumer._ import kafka.serializer._ -import kafka.producer.{OldProducer, NewShinyProducer, BaseProducer} +import kafka.producer.{OldProducer, NewShinyProducer} import kafka.metrics.KafkaMetricsGroup +import org.apache.kafka.clients.producer.internals.ErrorLoggingCallback -import org.apache.kafka.clients.producer.ProducerRecord +import org.apache.kafka.clients.producer.{RecordMetadata, ProducerRecord} import scala.collection.JavaConversions._ import joptsimple.OptionParser -import java.util.Random -import java.util.concurrent.atomic.AtomicInteger -import java.util.concurrent.{TimeUnit, BlockingQueue, ArrayBlockingQueue, CountDownLatch} +import java.util.{Properties} +import java.util.concurrent.atomic.{AtomicBoolean} +import java.util.concurrent._ object MirrorMaker extends Logging { private var connectors: Seq[ZookeeperConsumerConnector] = null private var consumerThreads: Seq[ConsumerThread] = null private var producerThreads: Seq[ProducerThread] = null + private var offsetCommitThread: OffsetCommitThread = null + private var isCleanShutdown: AtomicBoolean = new AtomicBoolean(false) + + private val valueFactory = (k: TopicAndPartition) => new Pool[Int, Long] + private val topicPartitionOffsetMap: Pool[TopicAndPartition, Pool[Int, Long]] = + new Pool[TopicAndPartition, Pool[Int,Long]](Some(valueFactory)) - private val shutdownMessage : ProducerRecord = new ProducerRecord("shutdown", "shutdown".getBytes) + private val shutdownMessage : MirrorMakerRecord = new MirrorMakerRecord("shutdown", 0, 0, null, "shutdown".getBytes) def main(args: Array[String]) { @@ -82,6 +93,13 @@ object MirrorMaker extends Logging { .ofType(classOf[java.lang.Integer]) .defaultsTo(10000) + val bufferByteSizeOpt = parser.accepts("queue.byte.size", + "Number of bytes that are buffered between the consumer and producer") + .withRequiredArg() + .describedAs("Queue size in terms of number of messages") + .ofType(classOf[java.lang.Integer]) + .defaultsTo(100000000) + val whitelistOpt = parser.accepts("whitelist", "Whitelist of topics to mirror.") .withRequiredArg() @@ -94,6 +112,12 @@ object MirrorMaker extends Logging { .describedAs("Java regex (String)") .ofType(classOf[String]) + val offsetCommitIntervalMsOpt = parser.accepts("offsetCommitIntervalMs", + "interval to commit offsets") + .withRequiredArg() + .describedAs("offset commit interval in millisecond") + .ofType(classOf[java.lang.Integer]) + val helpOpt = parser.accepts("help", "Print this message.") if(args.length == 0) @@ -115,6 +139,8 @@ object MirrorMaker extends Logging { val numProducers = options.valueOf(numProducersOpt).intValue() val numStreams = options.valueOf(numStreamsOpt).intValue() val bufferSize = options.valueOf(bufferSizeOpt).intValue() + val bufferByteSize = options.valueOf(bufferByteSizeOpt).intValue() + val offsetCommitIntervalMs = options.valueOf(offsetCommitIntervalMsOpt).intValue() // create consumer streams connectors = options.valuesOf(consumerConfigOpt).toList @@ -123,7 +149,7 @@ object MirrorMaker extends Logging { val numConsumers = connectors.size * numStreams // create a data channel btw the consumers and the producers - val mirrorDataChannel = new DataChannel(bufferSize, numConsumers, numProducers) + val mirrorDataChannel = new DataChannel(bufferSize, bufferByteSize, numConsumers, numProducers) // create producer threads val useNewProducer = options.has(useNewProducerOpt) @@ -133,9 +159,9 @@ object MirrorMaker extends Logging { producerProps.setProperty("client.id", clientId + "-" + i) val producer = if (useNewProducer) - new NewShinyProducer(producerProps) + new MirrorMakerNewProducer(producerProps) else - new OldProducer(producerProps) + new MirrorMakerOldProducer(producerProps) new ProducerThread(mirrorDataChannel, producer, i) }) @@ -164,6 +190,11 @@ object MirrorMaker extends Logging { consumerThreads.foreach(_.start) producerThreads.foreach(_.start) + // create offset commit thread + if (useNewProducer) { + offsetCommitThread = new OffsetCommitThread(offsetCommitIntervalMs) + offsetCommitThread.start() + } // we wait on producer's shutdown latch instead of consumers // since the consumer threads can hit a timeout/other exception; @@ -173,22 +204,31 @@ object MirrorMaker extends Logging { } def cleanShutdown() { - if (connectors != null) connectors.foreach(_.shutdown) - if (consumerThreads != null) consumerThreads.foreach(_.awaitShutdown) - if (producerThreads != null) { - producerThreads.foreach(_.shutdown) - producerThreads.foreach(_.awaitShutdown) + if (isCleanShutdown.compareAndSet(false, true)) { + info("Start clean shutdown.") + info("Shutting down producer threads.") + if (producerThreads != null) { + producerThreads.foreach(_.shutdown) + producerThreads.foreach(_.awaitShutdown) + } + info("Shutting down offset commit thread.") + if (offsetCommitThread != null) { + offsetCommitThread.shutdown + offsetCommitThread.awaitShutdown + } + info("Shutting down consumer threads.") + if (connectors != null) connectors.foreach(_.shutdown) + if (consumerThreads != null) consumerThreads.foreach(_.awaitShutdown) + info("Kafka mirror maker shutdown successfully") } - info("Kafka mirror maker shutdown successfully") } - class DataChannel(capacity: Int, numProducers: Int, numConsumers: Int) extends KafkaMetricsGroup { + class DataChannel(capacity: Int, byteCapacity: Int, numConsumers: Int, numProducers: Int) extends KafkaMetricsGroup { - val queues = new Array[BlockingQueue[ProducerRecord]](numConsumers) - for (i <- 0 until numConsumers) - queues(i) = new ArrayBlockingQueue[ProducerRecord](capacity) - - private val counter = new AtomicInteger(new Random().nextInt()) + val queues = new Array[ByteBoundedBlockingQueue[MirrorMakerRecord]](numProducers) + val sizeFunction = (record: MirrorMakerRecord) => record.size + for (i <- 0 until numProducers) + queues(i) = new ByteBoundedBlockingQueue[MirrorMakerRecord](capacity, byteCapacity, Some(sizeFunction)) // We use a single meter for aggregated wait percentage for the data channel. // Since meter is calculated as total_recorded_value / time_window and @@ -197,16 +237,17 @@ object MirrorMaker extends Logging { private val waitPut = newMeter("MirrorMaker-DataChannel-WaitOnPut", "percent", TimeUnit.NANOSECONDS) private val waitTake = newMeter("MirrorMaker-DataChannel-WaitOnTake", "percent", TimeUnit.NANOSECONDS) private val channelSizeHist = newHistogram("MirrorMaker-DataChannel-Size") - - def put(record: ProducerRecord) { - // If the key of the message is empty, use round-robin to select the queue - // Otherwise use the queue based on the key value so that same key-ed messages go to the same queue - val queueId = - if(record.key() != null) { - Utils.abs(java.util.Arrays.hashCode(record.key())) % numConsumers - } else { - Utils.abs(counter.getAndIncrement()) % numConsumers - } + private val channelByteSizeHist = newHistogram("MirrorMaker-DataChannel-Byte-Size") + + def put(record: MirrorMakerRecord, qid: Int = -1) { + // If queue id is specified (only when producer thread shuttdown is called), use qid + // otherwise use hash of topic+partition to decide which queue to put the record in. + val queueId = { + if (qid == -1) + Utils.abs(java.util.Arrays.hashCode((record.sourceTopic + record.sourcePartition).toCharArray)) % numProducers + else + qid + } val queue = queues(queueId) var putSucceed = false @@ -215,18 +256,20 @@ object MirrorMaker extends Logging { putSucceed = queue.offer(record, 500, TimeUnit.MILLISECONDS) waitPut.mark((SystemTime.nanoseconds - startPutTime) / numProducers) } - channelSizeHist.update(queue.size) + channelSizeHist.update(queues.map(q => q.size()).sum) + channelByteSizeHist.update(queues.map(q => q.byteSize()).sum) } - def take(queueId: Int): ProducerRecord = { + def take(queueId: Int): MirrorMakerRecord = { val queue = queues(queueId) - var data: ProducerRecord = null + var data: MirrorMakerRecord = null while (data == null) { val startTakeTime = SystemTime.nanoseconds data = queue.poll(500, TimeUnit.MILLISECONDS) waitTake.mark((SystemTime.nanoseconds - startTakeTime) / numConsumers) } - channelSizeHist.update(queue.size) + channelSizeHist.update(queues.map(q => q.size()).sum) + channelByteSizeHist.update(queues.map(q => q.byteSize()).sum) data } } @@ -246,15 +289,21 @@ object MirrorMaker extends Logging { info("Starting mirror maker consumer thread " + threadName) try { for (msgAndMetadata <- stream) { - val data = new ProducerRecord(msgAndMetadata.topic, msgAndMetadata.key, msgAndMetadata.message) + val data = new MirrorMakerRecord(msgAndMetadata.topic, + msgAndMetadata.partition, + msgAndMetadata.offset, + msgAndMetadata.key, + msgAndMetadata.message) mirrorDataChannel.put(data) } } catch { - case e: Throwable => - fatal("Stream unexpectedly exited.", e) + case t: Throwable => + fatal("Consumer thread %s unexpectedly exited.".format(this.getName), t) } finally { shutdownLatch.countDown() info("Consumer thread stopped") + if (isCleanShutdown.compareAndSet(false, true)) + System.exit(-1) } } @@ -269,7 +318,7 @@ object MirrorMaker extends Logging { } class ProducerThread (val dataChannel: DataChannel, - val producer: BaseProducer, + val producer: MirrorMakerBaseProducer, val threadId: Int) extends Thread with Logging with KafkaMetricsGroup { private val threadName = "mirrormaker-producer-" + threadId private val shutdownComplete: CountDownLatch = new CountDownLatch(1) @@ -277,32 +326,37 @@ object MirrorMaker extends Logging { setName(threadName) - override def run { + override def run() { info("Starting mirror maker producer thread " + threadName) try { while (true) { - val data: ProducerRecord = dataChannel.take(threadId) - trace("Sending message with value size %d".format(data.value().size)) + val data: MirrorMakerRecord = dataChannel.take(threadId) + trace("Sending message with value size %d".format(data.value.size)) if(data eq shutdownMessage) { info("Received shutdown message") return } - producer.send(data.topic(), data.key(), data.value()) + producer.send(new TopicAndPartition(data.sourceTopic, data.sourcePartition), + data.sourceOffset, + data.key, + data.value) } } catch { - case t: Throwable => { - fatal("Producer thread failure due to ", t) - } + case t: Throwable => + fatal("Producer thread %s failure due to ".format(this.getName), t) } finally { shutdownComplete.countDown info("Producer thread stopped") + // if it exits accidentally, stop the entire mirror maker + if (isCleanShutdown.compareAndSet(false, true)) + System.exit(-1) } } def shutdown { try { info("Producer thread " + threadName + " shutting down") - dataChannel.put(shutdownMessage) + dataChannel.put(shutdownMessage, threadId) } catch { case ie: InterruptedException => { @@ -323,5 +377,147 @@ object MirrorMaker extends Logging { } } } + + class OffsetCommitThread(commitIntervalMs: Int) extends Thread with Logging with KafkaMetricsGroup { + private val threadName = "mirrormaker-offset-commit-thread" + private val shutdownComplete: CountDownLatch = new CountDownLatch(1) + this.logIdent = "[%s]".format(threadName) + var shutdownFlag: Boolean = false + var commitCounter: Int = 0 + + this.setName(threadName) + + newGauge("MirrorMaker-Offset-Commit-Counter", + new Gauge[Int] { + def value = commitCounter + }) + + /** + * The offset commit thread is using the first ZookeeperConsumerConnector in the connectors. It is based on the + * assumption that all the consumer instances in mirror maker are in the same consumer group. The impact of this + * approach is that: + * 1. If offsets are commited to ZooKeeper, the checkpointedZkOffsets in the first ZookeeperConsumerConnector + * could be tainted by partitions that it does not own. But this problem exists now anyway since we are not + * cleaning checkpointedZKOffsets on consumer rebalance. + * 2. The offset commit could potentially slow down the first ZookeeperConsumerConnector if the offset commit + * is too big (which is not likely to happen). + * Ideally, committing offsets of a partition should be done by the ZookeeperConsumerConnector instance who is + * consuming from the partition. But implementation could be tricky when consumer rebalance occurs. It needs either + * the offset map in mirror maker to be updated on consumer rebalance or putting separate offset maps in each + * ZookeeperConsumerConnector. It seems not worth doing that. + */ + override def run() { + info("Starting mirror maker offset commit thread") + try { + while (shutdownFlag == false) { + Thread.sleep(commitIntervalMs) + commitOffset + } + } catch { + case t: Throwable => fatal("offset commit thread exits due to", t) + } finally { + swallow(commitOffset) + shutdownComplete.countDown() + info("offset commit thread exited") + if (isCleanShutdown.compareAndSet(false, true)) + System.exit(-1) + } + } + + private def commitOffset { + val offsetsToCommit = collection.immutable.Map(topicPartitionOffsetMap.map { + case (topicPartition, partitionOffsetMap) => + topicPartition -> OffsetAndMetadata(getOffsetToCommit(partitionOffsetMap), null) + }.toSeq: _*) + trace("committing offset: %s".format(offsetsToCommit)) + val connector = connectors.headOption match { + case None => + warn("No consumer available to commit offset.") + case Some(connector) => + connector.commitOffsets(false, offsetsToCommit) + commitCounter += 1 + } + } + + private def getOffsetToCommit(offsetsMap: Pool[Int, Long]): Long = { + val offsets = offsetsMap.map(_._2).toSeq.sorted + val iter = offsets.iterator + var offsetToCommit = iter.next() + while (iter.hasNext && offsetToCommit + 1 == iter.next()) + offsetToCommit += 1 + offsetToCommit + 1 + } + + def shutdown { + shutdownFlag = true + } + + def awaitShutdown { + try { + shutdownComplete.await + info("Offset commit thread shutdown complete") + } catch { + case ie: InterruptedException => { + warn("Shutdown of the offset commit thread interrupted") + } + } + } + } + + private[kafka] trait MirrorMakerBaseProducer { + def send(topicPartition: TopicAndPartition, offset: Long, key: Array[Byte], value: Array[Byte]) + def close() + } + + private class MirrorMakerNewProducer (val producerProps: Properties) + extends NewShinyProducer(producerProps) with MirrorMakerBaseProducer { + + override def send(topicPartition: TopicAndPartition, offset: Long, key: Array[Byte], value: Array[Byte]) { + val record = new ProducerRecord(topicPartition.topic, key, value) + if(sync) { + topicPartitionOffsetMap.getAndMaybePut(topicPartition).put(this.producer.send(record).get().partition(), offset) + } else { + this.producer.send(record, + new MirrorMakerProducerCallback(topicPartition, offset, key, value)) + } + } + } + + private class MirrorMakerOldProducer (val producerProps: Properties) + extends OldProducer(producerProps) with MirrorMakerBaseProducer { + + override def send(topicPartition: TopicAndPartition, offset: Long, key: Array[Byte], value: Array[Byte]) { + super.send(topicPartition.topic, key, value) + } + + override def close() { + super.close() + } + } + + private class MirrorMakerProducerCallback (val topicPartition: TopicAndPartition, + val offset: Long, + val key: Array[Byte], + val value: Array[Byte]) + extends ErrorLoggingCallback(topicPartition.topic, key, value, false) { + + override def onCompletion(metadata: RecordMetadata, exception: Exception) { + if (exception != null) { + super.onCompletion(metadata, exception) + } else { + info("updating offset:[%s] -> %d".format(topicPartition, offset)) + topicPartitionOffsetMap.getAndMaybePut(topicPartition).put(metadata.partition(), offset) + } + } + } + + private[kafka] class MirrorMakerRecord (val sourceTopic: String, + val sourcePartition: Int, + val sourceOffset: Long, + val key: Array[Byte], + val value: Array[Byte]) { + def size = value.length + {if (key == null) 0 else key.length} + } + } diff --git a/core/src/main/scala/kafka/utils/ByteBoundedBlockingQueue.scala b/core/src/main/scala/kafka/utils/ByteBoundedBlockingQueue.scala new file mode 100644 index 0000000..2d2a00e --- /dev/null +++ b/core/src/main/scala/kafka/utils/ByteBoundedBlockingQueue.scala @@ -0,0 +1,158 @@ +package kafka.utils + +import java.util.concurrent.atomic.AtomicInteger +import java.util.concurrent.{TimeUnit, LinkedBlockingQueue} + +import scala.xml.Null + +/** + * A blocking queue that have size limits on both number of elements and number of bytes. + */ +class ByteBoundedBlockingQueue[E] (val queueSize: Int, val queueByteSize: Int, sizeFunction: Option[(E) => Int]) + extends Iterable[E] { + private val queue = new LinkedBlockingQueue[E] (queueSize) + private var currentByteSize = new AtomicInteger() + private val putLock = new Object + + /** + * Put an element to the tail of the queue. Wait for certain amount of time if queue is full. + * The size check is only checking if the bytes in the queue reaches threshold or not. That means + * as long as current queue size is under the threshold, the element could be put into the queue, even + * after that the queue byte size will exceed the threshold. Strict queue size check is easy but could potentially + * lead to poor efficiency when the queue is almost full. + * @param e the element to put into the queue + * @param timeout the amount of time to wait before the expire the operation + * @param unit the time unit of timeout parameter, default to millisecond + * @return true if the element is put into queue, false if it is not + * @throws NullPointerException if element is null + */ + def offer(e: E, timeout: Long, unit: TimeUnit = TimeUnit.MICROSECONDS): Boolean = { + if (e == null) throw new NullPointerException("Putting null element into queue.") + val startTime = SystemTime.nanoseconds + val expireTime = startTime + unit.toNanos(timeout) + var success = false + try { + putLock synchronized { + while (currentByteSize.get() >= queueByteSize && SystemTime.nanoseconds < expireTime) + putLock.wait(expireTime - SystemTime.nanoseconds) + // only proceed if queue has capacity and not timeout + if (currentByteSize.get() < queueByteSize && SystemTime.nanoseconds < expireTime) { + success = queue.offer(e, expireTime - SystemTime.nanoseconds, TimeUnit.NANOSECONDS) + // only increase queue byte size iff put succeeds + if (success == true) + currentByteSize.addAndGet(sizeFunction.get(e)) + // wake up another thread in case multiple threads are waiting + if (currentByteSize.get() < queueByteSize) + putLock.notify() + } + } + } catch { + case ie: InterruptedException => + } + success + } + + /** + * Put an element to the tail of the queue, block if queue is full + * @param e The element to put into queue + * @return true on succeed, false on failure + * @throws NullPointerException if element is null + */ + def offer(e: E): Boolean = { + if (e == null) throw new NullPointerException("Putting null element into queue.") + var success = false + try { + putLock synchronized { + while (currentByteSize.get() >= queueByteSize) + putLock.wait() + success = queue.offer(e) + if (success == true) + currentByteSize.addAndGet(sizeFunction.get(e)) + // wake up another thread in case multiple threads are waiting + if (currentByteSize.get() < queueByteSize) + putLock.notify() + } + } catch { + case ie: InterruptedException => + } + success + } + + /** + * Get an element from the head of queue. Wait for some time if the queue is empty. + * The poll method almost does not block on offer. + * @param timeout the amount of time to wait if the queue is empty + * @param unit the unit type + * @return the first element in the queue, null if queue is empty + */ + def poll(timeout: Long, unit: TimeUnit): E = { + var e = queue.poll(timeout, unit) + // only wake up waiting threads if the queue size drop under queueByteSize + if (e != null && + currentByteSize.getAndAdd(-sizeFunction.get(e)) > queueByteSize && + currentByteSize.get() < queueByteSize) + putLock.notify() + e + } + + /** + * Get an element from the head of the queue, block if the queue is empty + * @return the first element in the queue, null if queue is empty + */ + def poll(): E = { + var e = queue.poll() + // only wake up waiting threads if the queue size drop under queueByteSize + if (currentByteSize.getAndAdd(-sizeFunction.get(e)) > queueByteSize && + currentByteSize.get() < queueByteSize) + putLock.notify() + e + } + + /** + * Iterator for the queue + * @return Iterator for the queue + */ + override def iterator() = new Iterator[E] () { + private val iter = queue.iterator() + private var curr: E = null.asInstanceOf[E] + + def hasNext: Boolean = iter.hasNext + + def next: E = { + curr = iter.next() + curr + } + + def remove { + if (curr == null) + throw new IllegalArgumentException + iter.remove() + if (currentByteSize.addAndGet(-sizeFunction.get(curr)) < queueByteSize) + putLock.notify() + } + } + + /** + * get the number of elements in the queue + * @return number of elements in the queue + */ + override def size() = queue.size() + + /** + * get the current byte size in the queue + * @return current queue size in bytes + */ + def byteSize() = currentByteSize.get() + + /** + * get the number of unused slots in the queue + * @return the number of unused slots in the queue + */ + def remainingSize = queue.remainingCapacity() + + /** + * get the remaining bytes capacity of the queue + * @return the remaining bytes capacity of the queue + */ + def remainingByteSize = math.max(0, queueByteSize - currentByteSize.get()) +} -- 1.8.3.4 (Apple Git-47) From 73a30a7bde3d395c25c8120950fb0f6e00a32cd3 Mon Sep 17 00:00:00 2001 From: Jiangjie Qin Date: Sun, 5 Oct 2014 00:48:09 -0700 Subject: [PATCH 2/3] Addressed Guozhang's comments. --- .../main/scala/kafka/server/ReplicaManager.scala | 8 +- core/src/main/scala/kafka/tools/MirrorMaker.scala | 119 ++++++++++++--------- .../kafka/utils/ByteBoundedBlockingQueue.scala | 17 +++ 3 files changed, 90 insertions(+), 54 deletions(-) diff --git a/core/src/main/scala/kafka/server/ReplicaManager.scala b/core/src/main/scala/kafka/server/ReplicaManager.scala index 78b7514..aa6fa31 100644 --- a/core/src/main/scala/kafka/server/ReplicaManager.scala +++ b/core/src/main/scala/kafka/server/ReplicaManager.scala @@ -483,9 +483,7 @@ class ReplicaManager(val config: KafkaConfig, val newLeaderBrokerId = leaderIsrAndControllerEpoch.leaderAndIsr.leader leaders.find(_.id == newLeaderBrokerId) match { case Some(leaderBroker) => - if (partition.makeFollower(controllerId, partitionStateInfo, correlationId, offsetManager)) - partitionsToMakeFollower += partition - else + if (partition.makeFollower(controllerId, partitionStateInfo, correlationId, offsetManager) == false) stateChangeLogger.info(("Broker %d skipped the become-follower state change after marking its partition as follower with correlation id %d from " + "controller %d epoch %d for partition [%s,%d] since the new leader %d is the same as the old leader") .format(localBrokerId, correlationId, controllerId, leaderIsrAndControllerEpoch.controllerEpoch, @@ -498,6 +496,10 @@ class ReplicaManager(val config: KafkaConfig, .format(localBrokerId, correlationId, controllerId, leaderIsrAndControllerEpoch.controllerEpoch, partition.topic, partition.partitionId, newLeaderBrokerId)) } + // Partition needs to be added to the partitionsToMakeFollower even if leader is not up yet. Thus the + // highWatermark can be read before it's overwritten by new checkpoints. Otherwise we could have a + // data loss issue. See KAFKA-1647. + partitionsToMakeFollower += partition } replicaFetcherManager.removeFetcherForPartitions(partitionsToMakeFollower.map(new TopicAndPartition(_))) diff --git a/core/src/main/scala/kafka/tools/MirrorMaker.scala b/core/src/main/scala/kafka/tools/MirrorMaker.scala index 0d651e1..71791ee 100644 --- a/core/src/main/scala/kafka/tools/MirrorMaker.scala +++ b/core/src/main/scala/kafka/tools/MirrorMaker.scala @@ -17,9 +17,7 @@ package kafka.tools -import java.lang.Thread.UncaughtExceptionHandler - -import com.yammer.metrics.core.Gauge +import com.yammer.metrics.core._ import kafka.common.{TopicAndPartition, OffsetAndMetadata} import kafka.utils._ import kafka.consumer._ @@ -27,19 +25,18 @@ import kafka.serializer._ import kafka.producer.{OldProducer, NewShinyProducer} import kafka.metrics.KafkaMetricsGroup import org.apache.kafka.clients.producer.internals.ErrorLoggingCallback - import org.apache.kafka.clients.producer.{RecordMetadata, ProducerRecord} import scala.collection.JavaConversions._ import joptsimple.OptionParser -import java.util.{Properties} -import java.util.concurrent.atomic.{AtomicBoolean} +import java.util.Properties +import java.util.concurrent.atomic.AtomicBoolean import java.util.concurrent._ object MirrorMaker extends Logging { - private var connectors: Seq[ZookeeperConsumerConnector] = null + private var connector: ZookeeperConsumerConnector = null private var consumerThreads: Seq[ConsumerThread] = null private var producerThreads: Seq[ProducerThread] = null private var offsetCommitThread: OffsetCommitThread = null @@ -57,8 +54,7 @@ object MirrorMaker extends Logging { val parser = new OptionParser val consumerConfigOpt = parser.accepts("consumer.config", - "Consumer config to consume from a source cluster. " + - "You may specify multiple of these.") + "Consumer config to consume from a source cluster.") .withRequiredArg() .describedAs("config file") .ofType(classOf[String]) @@ -94,9 +90,9 @@ object MirrorMaker extends Logging { .defaultsTo(10000) val bufferByteSizeOpt = parser.accepts("queue.byte.size", - "Number of bytes that are buffered between the consumer and producer") + "Maximum bytes that can be buffered in each data channel queue") .withRequiredArg() - .describedAs("Queue size in terms of number of messages") + .describedAs("Data channel queue size in terms of number of bytes") .ofType(classOf[java.lang.Integer]) .defaultsTo(100000000) @@ -107,16 +103,16 @@ object MirrorMaker extends Logging { .ofType(classOf[String]) val blacklistOpt = parser.accepts("blacklist", - "Blacklist of topics to mirror.") - .withRequiredArg() - .describedAs("Java regex (String)") - .ofType(classOf[String]) + "Blacklist of topics to mirror.") + .withRequiredArg() + .describedAs("Java regex (String)") + .ofType(classOf[String]) val offsetCommitIntervalMsOpt = parser.accepts("offsetCommitIntervalMs", - "interval to commit offsets") - .withRequiredArg() - .describedAs("offset commit interval in millisecond") - .ofType(classOf[java.lang.Integer]) + "Offset commit interval in ms") + .withRequiredArg() + .describedAs("offset commit interval in millisecond") + .ofType(classOf[java.lang.Integer]) val helpOpt = parser.accepts("help", "Print this message.") @@ -143,13 +139,13 @@ object MirrorMaker extends Logging { val offsetCommitIntervalMs = options.valueOf(offsetCommitIntervalMsOpt).intValue() // create consumer streams - connectors = options.valuesOf(consumerConfigOpt).toList - .map(cfg => new ConsumerConfig(Utils.loadProps(cfg))) - .map(new ZookeeperConsumerConnector(_)) - val numConsumers = connectors.size * numStreams + connector = { + val consumerConfig = new ConsumerConfig(Utils.loadProps(options.valuesOf(consumerConfigOpt).head)) + new ZookeeperConsumerConnector(consumerConfig) + } // create a data channel btw the consumers and the producers - val mirrorDataChannel = new DataChannel(bufferSize, bufferByteSize, numConsumers, numProducers) + val mirrorDataChannel = new DataChannel(bufferSize, bufferByteSize, numStreams, numProducers) // create producer threads val useNewProducer = options.has(useNewProducerOpt) @@ -173,14 +169,14 @@ object MirrorMaker extends Logging { var streams: Seq[KafkaStream[Array[Byte], Array[Byte]]] = Nil try { - streams = connectors.map(_.createMessageStreamsByFilter(filterSpec, numStreams, new DefaultDecoder(), new DefaultDecoder())).flatten + streams = connector.createMessageStreamsByFilter(filterSpec, numStreams, new DefaultDecoder(), new DefaultDecoder()) } catch { case t: Throwable => fatal("Unable to create stream - shutting down mirror maker.") - connectors.foreach(_.shutdown) + connector.shutdown } consumerThreads = streams.zipWithIndex.map(streamAndIndex => new ConsumerThread(streamAndIndex._1, mirrorDataChannel, streamAndIndex._2)) - assert(consumerThreads.size == numConsumers) + assert(consumerThreads.size == numStreams) Runtime.getRuntime.addShutdownHook(new Thread() { override def run() { @@ -190,8 +186,15 @@ object MirrorMaker extends Logging { consumerThreads.foreach(_.start) producerThreads.foreach(_.start) + // create offset commit thread if (useNewProducer) { + /** + * The offset commit thread commits offsets based on the new producer's callback. Using offset commit thread + * and new producer guarantees there is no data loss when mirror maker is uncleanly shutdown. For old producer, + * the offset commit will be based on the consumer's offsets. When unclean shutdown occurs, it is possible that + * all the messages in data channel will be lost. + */ offsetCommitThread = new OffsetCommitThread(offsetCommitIntervalMs) offsetCommitThread.start() } @@ -206,19 +209,25 @@ object MirrorMaker extends Logging { def cleanShutdown() { if (isCleanShutdown.compareAndSet(false, true)) { info("Start clean shutdown.") + // Consumer threads will exit when isCleanShutdown is set. + info("Waiting consumer threads to shutdown.") + if (consumerThreads != null) consumerThreads.foreach(_.awaitShutdown) + // After consumer threads exit, shutdown producer. info("Shutting down producer threads.") if (producerThreads != null) { producerThreads.foreach(_.shutdown) producerThreads.foreach(_.awaitShutdown) } + // offset commit thread should only be shutdown after producer threads are shutdown, so we don't lose offsets. info("Shutting down offset commit thread.") if (offsetCommitThread != null) { offsetCommitThread.shutdown offsetCommitThread.awaitShutdown } - info("Shutting down consumer threads.") - if (connectors != null) connectors.foreach(_.shutdown) - if (consumerThreads != null) consumerThreads.foreach(_.awaitShutdown) + // connector can only be shutdown after offsets are committed. + info("Shutting down consumer connectors.") + if (connector != null) + connector.shutdown info("Kafka mirror maker shutdown successfully") } } @@ -226,9 +235,14 @@ object MirrorMaker extends Logging { class DataChannel(capacity: Int, byteCapacity: Int, numConsumers: Int, numProducers: Int) extends KafkaMetricsGroup { val queues = new Array[ByteBoundedBlockingQueue[MirrorMakerRecord]](numProducers) + val channelSizeHists = new Array[Histogram](numProducers) + val channelByteSizeHists = new Array[Histogram](numProducers) val sizeFunction = (record: MirrorMakerRecord) => record.size - for (i <- 0 until numProducers) + for (i <- 0 until numProducers) { queues(i) = new ByteBoundedBlockingQueue[MirrorMakerRecord](capacity, byteCapacity, Some(sizeFunction)) + channelSizeHists(i) = newHistogram("MirrorMaker-DataChannel-queue-%d-Size".format(i)) + channelByteSizeHists(i) = newHistogram("MirrorMaker-DataChannel-queue-%d-Byte-Size".format(i)) + } // We use a single meter for aggregated wait percentage for the data channel. // Since meter is calculated as total_recorded_value / time_window and @@ -236,17 +250,15 @@ object MirrorMaker extends Logging { // time should be discounted by # threads. private val waitPut = newMeter("MirrorMaker-DataChannel-WaitOnPut", "percent", TimeUnit.NANOSECONDS) private val waitTake = newMeter("MirrorMaker-DataChannel-WaitOnTake", "percent", TimeUnit.NANOSECONDS) - private val channelSizeHist = newHistogram("MirrorMaker-DataChannel-Size") - private val channelByteSizeHist = newHistogram("MirrorMaker-DataChannel-Byte-Size") - def put(record: MirrorMakerRecord, qid: Int = -1) { - // If queue id is specified (only when producer thread shuttdown is called), use qid + def put(record: MirrorMakerRecord, specifiedQueueId: Int = -1) { + // If queue id is specified (only when producer thread shutdown is called), use qid // otherwise use hash of topic+partition to decide which queue to put the record in. val queueId = { - if (qid == -1) + if (specifiedQueueId == -1) Utils.abs(java.util.Arrays.hashCode((record.sourceTopic + record.sourcePartition).toCharArray)) % numProducers else - qid + specifiedQueueId } val queue = queues(queueId) @@ -256,8 +268,8 @@ object MirrorMaker extends Logging { putSucceed = queue.offer(record, 500, TimeUnit.MILLISECONDS) waitPut.mark((SystemTime.nanoseconds - startPutTime) / numProducers) } - channelSizeHist.update(queues.map(q => q.size()).sum) - channelByteSizeHist.update(queues.map(q => q.byteSize()).sum) + channelSizeHists(queueId).update(queue.size()) + channelByteSizeHists(queueId).update(queue.byteSize()) } def take(queueId: Int): MirrorMakerRecord = { @@ -268,8 +280,8 @@ object MirrorMaker extends Logging { data = queue.poll(500, TimeUnit.MILLISECONDS) waitTake.mark((SystemTime.nanoseconds - startTakeTime) / numConsumers) } - channelSizeHist.update(queues.map(q => q.size()).sum) - channelByteSizeHist.update(queues.map(q => q.byteSize()).sum) + channelSizeHists(queueId).update(queue.size()) + channelByteSizeHists(queueId).update(queue.byteSize()) data } } @@ -288,7 +300,9 @@ object MirrorMaker extends Logging { override def run() { info("Starting mirror maker consumer thread " + threadName) try { - for (msgAndMetadata <- stream) { + val iter = stream.iterator() + while (isCleanShutdown.get() == false && iter.hasNext()) { + val msgAndMetadata = iter.next() val data = new MirrorMakerRecord(msgAndMetadata.topic, msgAndMetadata.partition, msgAndMetadata.offset, @@ -302,6 +316,10 @@ object MirrorMaker extends Logging { } finally { shutdownLatch.countDown() info("Consumer thread stopped") + + /**If the thread did not exit during a clean shutdown, then something went wrong. We just stop the + * entire mirror maker in this case because mirror maker has essentially stopped consuming from some partitions. + */ if (isCleanShutdown.compareAndSet(false, true)) System.exit(-1) } @@ -414,11 +432,11 @@ object MirrorMaker extends Logging { commitOffset } } catch { - case t: Throwable => fatal("offset commit thread exits due to", t) + case t: Throwable => fatal("Offset commit thread exits due to", t) } finally { swallow(commitOffset) shutdownComplete.countDown() - info("offset commit thread exited") + info("Offset commit thread exited") if (isCleanShutdown.compareAndSet(false, true)) System.exit(-1) } @@ -430,12 +448,11 @@ object MirrorMaker extends Logging { topicPartition -> OffsetAndMetadata(getOffsetToCommit(partitionOffsetMap), null) }.toSeq: _*) trace("committing offset: %s".format(offsetsToCommit)) - val connector = connectors.headOption match { - case None => - warn("No consumer available to commit offset.") - case Some(connector) => - connector.commitOffsets(false, offsetsToCommit) - commitCounter += 1 + if (connector == null) { + warn("No consumer connector available to commit offset.") + } else { + connector.commitOffsets(false, offsetsToCommit) + commitCounter += 1 } } @@ -505,7 +522,7 @@ object MirrorMaker extends Logging { if (exception != null) { super.onCompletion(metadata, exception) } else { - info("updating offset:[%s] -> %d".format(topicPartition, offset)) + trace("updating offset:[%s] -> %d".format(topicPartition, offset)) topicPartitionOffsetMap.getAndMaybePut(topicPartition).put(metadata.partition(), offset) } } diff --git a/core/src/main/scala/kafka/utils/ByteBoundedBlockingQueue.scala b/core/src/main/scala/kafka/utils/ByteBoundedBlockingQueue.scala index 2d2a00e..9c61e68 100644 --- a/core/src/main/scala/kafka/utils/ByteBoundedBlockingQueue.scala +++ b/core/src/main/scala/kafka/utils/ByteBoundedBlockingQueue.scala @@ -1,3 +1,20 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + package kafka.utils import java.util.concurrent.atomic.AtomicInteger -- 1.8.3.4 (Apple Git-47) From c781b820dfe54810641a7bab9c13549346ef7123 Mon Sep 17 00:00:00 2001 From: Jiangjie Qin Date: Mon, 6 Oct 2014 10:15:31 -0700 Subject: [PATCH 3/3] Addressed Guozhang's comments --- core/src/main/scala/kafka/tools/MirrorMaker.scala | 13 +------------ 1 file changed, 1 insertion(+), 12 deletions(-) diff --git a/core/src/main/scala/kafka/tools/MirrorMaker.scala b/core/src/main/scala/kafka/tools/MirrorMaker.scala index 71791ee..1aca8a7 100644 --- a/core/src/main/scala/kafka/tools/MirrorMaker.scala +++ b/core/src/main/scala/kafka/tools/MirrorMaker.scala @@ -411,18 +411,7 @@ object MirrorMaker extends Logging { }) /** - * The offset commit thread is using the first ZookeeperConsumerConnector in the connectors. It is based on the - * assumption that all the consumer instances in mirror maker are in the same consumer group. The impact of this - * approach is that: - * 1. If offsets are commited to ZooKeeper, the checkpointedZkOffsets in the first ZookeeperConsumerConnector - * could be tainted by partitions that it does not own. But this problem exists now anyway since we are not - * cleaning checkpointedZKOffsets on consumer rebalance. - * 2. The offset commit could potentially slow down the first ZookeeperConsumerConnector if the offset commit - * is too big (which is not likely to happen). - * Ideally, committing offsets of a partition should be done by the ZookeeperConsumerConnector instance who is - * consuming from the partition. But implementation could be tricky when consumer rebalance occurs. It needs either - * the offset map in mirror maker to be updated on consumer rebalance or putting separate offset maps in each - * ZookeeperConsumerConnector. It seems not worth doing that. + * Use the connector to commit all the offsets. */ override def run() { info("Starting mirror maker offset commit thread") -- 1.8.3.4 (Apple Git-47)