From 10c6dec34dae8820bf7ce24839c938135f9a9189 Mon Sep 17 00:00:00 2001 From: Jiangjie Qin Date: Tue, 23 Dec 2014 17:04:45 -0800 Subject: [PATCH 1/2] KAFKA-1650; (Follow-up patch) to support no data loss in mirror maker; reviewed by Joel Koshy --- .../scala/kafka/consumer/ConsumerConnector.scala | 15 + .../consumer/ZookeeperConsumerConnector.scala | 56 ++-- .../consumer/ZookeeperConsumerConnector.scala | 9 +- core/src/main/scala/kafka/tools/MirrorMaker.scala | 335 ++++++++++++--------- .../main/scala/kafka/utils/DoublyLinkedList.scala | 126 ++++++++ .../test/scala/unit/kafka/utils/UtilsTest.scala | 50 +++ 6 files changed, 423 insertions(+), 168 deletions(-) create mode 100644 core/src/main/scala/kafka/utils/DoublyLinkedList.scala diff --git a/core/src/main/scala/kafka/consumer/ConsumerConnector.scala b/core/src/main/scala/kafka/consumer/ConsumerConnector.scala index 62c0686..384be74 100644 --- a/core/src/main/scala/kafka/consumer/ConsumerConnector.scala +++ b/core/src/main/scala/kafka/consumer/ConsumerConnector.scala @@ -17,6 +17,9 @@ package kafka.consumer +import kafka.common.{OffsetAndMetadata, TopicAndPartition} +import kafka.javaapi.consumer.ConsumerRebalanceListener + import scala.collection._ import kafka.utils.Logging import kafka.serializer._ @@ -76,6 +79,18 @@ trait ConsumerConnector { * KAFKA-1743: This method added for backward compatibility. */ def commitOffsets + + /** + * Commit offsets from an external offsets map. + * @param offsetsToCommit the offsets to be committed. + */ + def commitOffsets(offsetsToCommit: immutable.Map[TopicAndPartition, OffsetAndMetadata], retryOnFailure: Boolean) + + /** + * Wire in a consumer rebalance listener to be executed when consumer rebalance occurs. + * @param listener The consumer rebalance listener to wire in + */ + def setConsumerRebalanceListener(listener: ConsumerRebalanceListener) /** * Shut down the connector diff --git a/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala b/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala index e991d21..191a867 100644 --- a/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala +++ b/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala @@ -302,25 +302,28 @@ private[kafka] class ZookeeperConsumerConnector(val config: ConsumerConfig, def commitOffsets { commitOffsets(true) } def commitOffsets(isAutoCommit: Boolean) { - commitOffsets(isAutoCommit, null) + + val offsetsToCommit = + immutable.Map(topicRegistry.flatMap { case (topic, partitionTopicInfos) => + partitionTopicInfos.map { case (partition, info) => + TopicAndPartition(info.topic, info.partitionId) -> OffsetAndMetadata(info.getConsumeOffset()) + } + }.toSeq: _*) + + commitOffsets(offsetsToCommit, isAutoCommit) + } - def commitOffsets(isAutoCommit: Boolean, - topicPartitionOffsets: immutable.Map[TopicAndPartition, OffsetAndMetadata]) { - var retriesRemaining = 1 + (if (isAutoCommit) config.offsetsCommitMaxRetries else 0) // no retries for commits from auto-commit + def commitOffsets(offsetsToCommit: immutable.Map[TopicAndPartition, OffsetAndMetadata], isAutoCommit: Boolean) { + trace("OffsetMap: %s".format(offsetsToCommit)) + var retriesRemaining = 1 + (if (isAutoCommit) 0 else config.offsetsCommitMaxRetries) // no retries for commits from auto-commit var done = false - while (!done) { - val committed = offsetsChannelLock synchronized { // committed when we receive either no error codes or only MetadataTooLarge errors - val offsetsToCommit = if (topicPartitionOffsets == null) 
{immutable.Map(topicRegistry.flatMap { case (topic, partitionTopicInfos) => - partitionTopicInfos.map { case (partition, info) => - TopicAndPartition(info.topic, info.partitionId) -> OffsetAndMetadata(info.getConsumeOffset()) - } - }.toSeq:_*)} else topicPartitionOffsets - + val committed = offsetsChannelLock synchronized { + // committed when we receive either no error codes or only MetadataTooLarge errors if (offsetsToCommit.size > 0) { if (config.offsetsStorage == "zookeeper") { - offsetsToCommit.foreach { case(topicAndPartition, offsetAndMetadata) => + offsetsToCommit.foreach { case (topicAndPartition, offsetAndMetadata) => commitOffsetToZooKeeper(topicAndPartition, offsetAndMetadata.offset) } true @@ -334,25 +337,25 @@ private[kafka] class ZookeeperConsumerConnector(val config: ConsumerConfig, trace("Offset commit response: %s.".format(offsetCommitResponse)) val (commitFailed, retryableIfFailed, shouldRefreshCoordinator, errorCount) = { - offsetCommitResponse.commitStatus.foldLeft(false, false, false, 0) { case(folded, (topicPartition, errorCode)) => + offsetCommitResponse.commitStatus.foldLeft(false, false, false, 0) { case (folded, (topicPartition, errorCode)) => if (errorCode == ErrorMapping.NoError && config.dualCommitEnabled) { - val offset = offsetsToCommit(topicPartition).offset - commitOffsetToZooKeeper(topicPartition, offset) + val offset = offsetsToCommit(topicPartition).offset + commitOffsetToZooKeeper(topicPartition, offset) } (folded._1 || // update commitFailed - errorCode != ErrorMapping.NoError, + errorCode != ErrorMapping.NoError, - folded._2 || // update retryableIfFailed - (only metadata too large is not retryable) - (errorCode != ErrorMapping.NoError && errorCode != ErrorMapping.OffsetMetadataTooLargeCode), + folded._2 || // update retryableIfFailed - (only metadata too large is not retryable) + (errorCode != ErrorMapping.NoError && errorCode != ErrorMapping.OffsetMetadataTooLargeCode), - folded._3 || // update shouldRefreshCoordinator - errorCode == ErrorMapping.NotCoordinatorForConsumerCode || - errorCode == ErrorMapping.ConsumerCoordinatorNotAvailableCode, + folded._3 || // update shouldRefreshCoordinator + errorCode == ErrorMapping.NotCoordinatorForConsumerCode || + errorCode == ErrorMapping.ConsumerCoordinatorNotAvailableCode, - // update error count - folded._4 + (if (errorCode != ErrorMapping.NoError) 1 else 0)) + // update error count + folded._4 + (if (errorCode != ErrorMapping.NoError) 1 else 0)) } } debug(errorCount + " errors in offset commit response.") @@ -381,11 +384,10 @@ private[kafka] class ZookeeperConsumerConnector(val config: ConsumerConfig, } } - done = if (isShuttingDown.get() && isAutoCommit) { // should not retry indefinitely if shutting down + done = { retriesRemaining -= 1 retriesRemaining == 0 || committed - } else - true + } if (!done) { debug("Retrying offset commit in %d ms".format(config.offsetsChannelBackoffMs)) diff --git a/core/src/main/scala/kafka/javaapi/consumer/ZookeeperConsumerConnector.scala b/core/src/main/scala/kafka/javaapi/consumer/ZookeeperConsumerConnector.scala index 9baad34..bfd8d37 100644 --- a/core/src/main/scala/kafka/javaapi/consumer/ZookeeperConsumerConnector.scala +++ b/core/src/main/scala/kafka/javaapi/consumer/ZookeeperConsumerConnector.scala @@ -18,9 +18,8 @@ package kafka.javaapi.consumer import kafka.serializer._ import kafka.consumer._ -import kafka.common.MessageStreamsExistException -import scala.collection.mutable -import scala.collection.JavaConversions +import kafka.common.{OffsetAndMetadata, 
TopicAndPartition, MessageStreamsExistException} +import scala.collection.{immutable, mutable, JavaConversions} import java.util.concurrent.atomic.AtomicBoolean /** @@ -115,6 +114,10 @@ private[kafka] class ZookeeperConsumerConnector(val config: ConsumerConfig, underlying.commitOffsets(retryOnFailure) } + def commitOffsets(offsetsToCommit: java.util.Map[TopicAndPartition, OffsetAndMetadata], retryOnFailure: Boolean) { + underlying.commitOffsets(offsetsToCommit.asInstanceOf[immutable.Map[TopicAndPartition, OffsetAndMetadata]], retryOnFailure) + } + def setConsumerRebalanceListener(consumerRebalanceListener: ConsumerRebalanceListener) { underlying.setConsumerRebalanceListener(consumerRebalanceListener) } diff --git a/core/src/main/scala/kafka/tools/MirrorMaker.scala b/core/src/main/scala/kafka/tools/MirrorMaker.scala index 53cb16c..191542c 100644 --- a/core/src/main/scala/kafka/tools/MirrorMaker.scala +++ b/core/src/main/scala/kafka/tools/MirrorMaker.scala @@ -23,10 +23,11 @@ import kafka.javaapi.consumer.ConsumerRebalanceListener import kafka.utils._ import kafka.consumer._ import kafka.serializer._ -import kafka.producer.{OldProducer, NewShinyProducer} +import kafka.producer.{KeyedMessage, ProducerConfig} import kafka.metrics.KafkaMetricsGroup import org.apache.kafka.clients.producer.internals.ErrorLoggingCallback -import org.apache.kafka.clients.producer.{RecordMetadata, ProducerRecord} +import org.apache.kafka.clients.producer.{KafkaProducer, RecordMetadata, ProducerRecord} +import org.apache.kafka.common.KafkaException import scala.collection.JavaConversions._ @@ -47,10 +48,14 @@ import java.util.concurrent._ * * If new producer is used, the offset will be committed based on the new producer's callback. An offset map is * maintained and updated on each send() callback. A separate offset commit thread will commit the offset periodically. - * @note For mirror maker, MaxInFlightRequests of producer should be set to 1 for producer if the order of the messages - * needs to be preserved. Mirror maker also depends on the in-order delivery to guarantee no data loss. - * We are not force it to be 1 because in some use cases throughput might be important whereas out of order or - * minor data loss is acceptable. + * @note For mirror maker, the following settings are required to make sure there is no data loss: + * 1. use new producer with following settings + * acks=all + * retries=max integer + * block.on.buffer.full=true + * 2. Consumer Settings + * auto.commit.enable=false + * If --no.data.loss flag is set in option, then those settings are automatically applied. 
*/ object MirrorMaker extends Logging with KafkaMetricsGroup { @@ -58,23 +63,39 @@ object MirrorMaker extends Logging with KafkaMetricsGroup { private var consumerThreads: Seq[ConsumerThread] = null private var producerThreads: Seq[ProducerThread] = null private val isShuttingdown: AtomicBoolean = new AtomicBoolean(false) - private var offsetCommitThread: OffsetCommitThread = null + private val scheduler: KafkaScheduler = new KafkaScheduler(threads = 1) - private val valueFactory = (k: TopicAndPartition) => new Pool[Int, Long] - private val topicPartitionOffsetMap: Pool[TopicAndPartition, Pool[Int, Long]] = - new Pool[TopicAndPartition, Pool[Int,Long]](Some(valueFactory)) + private val unackedOffsetsMap: Pool[TopicAndPartition, UnackedOffsets] = + new Pool[TopicAndPartition, UnackedOffsets](valueFactory = Some((k: TopicAndPartition) => new UnackedOffsets)) // Track the messages unacked for consumer rebalance - private var numMessageUnacked: AtomicInteger = new AtomicInteger(0) - private var consumerRebalanceListener: MirrorMakerConsumerRebalanceListener = null + private var numUnackedMessages: AtomicInteger = new AtomicInteger(0) + private var numSkippedUnackedMessages: AtomicInteger = new AtomicInteger(0) + private var consumerRebalanceListener: ConsumerRebalanceListener = null // This is to indicate whether the rebalance is going on so the producer callback knows if - // the rebalance latch needs to be pulled. - private var inRebalance: AtomicBoolean = new AtomicBoolean(false) + // the flag indicates internal consumer rebalance callback is waiting for all the messages sent to be acked. + private var waitingForMessageAcks: Boolean = false private val shutdownMessage : MirrorMakerRecord = new MirrorMakerRecord("shutdown", 0, 0, null, "shutdown".getBytes) - newGauge("MirrorMaker-Unacked-Messages", + newGauge("MirrorMaker-NumUnackedMessages", new Gauge[Int] { - def value = numMessageUnacked.get() + def value = numUnackedMessages.get() + }) + + // The number of unacked offsets in the unackedOffsetsMap + newGauge("MirrorMaker-UnackedOffsetListsSize", + new Gauge[Int] { + def value = unackedOffsetsMap.iterator.map{ + case(_, unackedOffsets) => unackedOffsets.size + }.sum + }) + + // If a message send failed after retries are exhausted. The offset of the messages will also be removed from + // the unacked offset list to avoid offset commit being stuck on that offset. In this case, the offset of that + // message was not really acked, but was skipped. This metric records the number of skipped offsets. 
+ newGauge("MirrorMaker-NumSkippedOffsets", + new Gauge[Int] { + def value = numSkippedUnackedMessages.get() }) def main(args: Array[String]) { @@ -98,6 +119,9 @@ object MirrorMaker extends Logging with KafkaMetricsGroup { val useNewProducerOpt = parser.accepts("new.producer", "Use the new producer implementation.") + val noDataLossOpt = parser.accepts("no.data.loss", + "Configure the mirror maker to have no data loss.") + val numProducersOpt = parser.accepts("num.producers", "Number of producer instances") .withRequiredArg() @@ -145,6 +169,12 @@ object MirrorMaker extends Logging with KafkaMetricsGroup { .ofType(classOf[java.lang.Integer]) .defaultsTo(60000) + val consumerRebalanceListenerOpt = parser.accepts("consumer.rebalance.listener", + "The consumer rebalance listener to use for mirror maker consumer.") + .withRequiredArg() + .describedAs("A custom rebalance listener of type ConsumerRebalanceListener") + .ofType(classOf[String]) + val helpOpt = parser.accepts("help", "Print this message.") if(args.length == 0) @@ -170,7 +200,11 @@ object MirrorMaker extends Logging with KafkaMetricsGroup { val offsetCommitIntervalMs = options.valueOf(offsetCommitIntervalMsOpt).intValue() // create consumer connector - val consumerConfigProps = Utils.loadProps(options.valuesOf(consumerConfigOpt).head) + val consumerConfigProps = Utils.loadProps(options.valueOf(consumerConfigOpt)) + val noDataLoss = options.has(noDataLossOpt) + // disable consumer auto commit because offset will be committed by offset commit thread. + if (noDataLoss) + consumerConfigProps.setProperty("auto.commit.enable","false") val consumerConfig = new ConsumerConfig(consumerConfigProps) connector = new ZookeeperConsumerConnector(consumerConfig) @@ -178,20 +212,30 @@ object MirrorMaker extends Logging with KafkaMetricsGroup { val mirrorDataChannel = new DataChannel(bufferSize, bufferByteSize, numInputs = numStreams, numOutputs = numProducers) // set consumer rebalance listener - // Customized consumer rebalance listener should extend MirrorMakerConsumerRebalanceListener - // and take datachannel as argument. - val customRebalanceListenerClass = consumerConfigProps.getProperty("consumer.rebalance.listener") - consumerRebalanceListener = { - if (customRebalanceListenerClass == null) { - new MirrorMakerConsumerRebalanceListener(mirrorDataChannel) - } else - Utils.createObject[MirrorMakerConsumerRebalanceListener](customRebalanceListenerClass, mirrorDataChannel) + // custom rebalance listener will be invoked after internal listener finishes its work. 
+ val customRebalanceListenerClass = options.valueOf(consumerRebalanceListenerOpt) + val customRebalanceListener = { + if (customRebalanceListenerClass != null) + Utils.createObject[ConsumerRebalanceListener](customRebalanceListenerClass) + else + null } + consumerRebalanceListener = new InternalRebalanceListener(mirrorDataChannel, Some(customRebalanceListener)) connector.setConsumerRebalanceListener(consumerRebalanceListener) // create producer threads - val useNewProducer = options.has(useNewProducerOpt) val producerProps = Utils.loadProps(options.valueOf(producerConfigOpt)) + val useNewProducer = { + // Override producer settings if no.data.loss is set + if (noDataLoss) { + producerProps.setProperty("retries",Int.MaxValue.toString) + producerProps.setProperty("block.on.buffer.full", "true") + producerProps.setProperty("acks","all") + true + } else { + options.has(useNewProducerOpt) + } + } val clientId = producerProps.getProperty("client.id", "") producerThreads = (0 until numProducers).map(i => { producerProps.setProperty("client.id", clientId + "-" + i) @@ -203,17 +247,17 @@ object MirrorMaker extends Logging with KafkaMetricsGroup { new ProducerThread(mirrorDataChannel, producer, i) }) - // create offset commit thread - if (useNewProducer) { + // start offset commit thread + if (noDataLoss) { /** - * The offset commit thread periodically commit consumed offsets to the source cluster. With the new producer, + * The offset commit thread periodically commit consumed offsets. With the new producer, * the offsets are updated upon the returned future metadata of the send() call; with the old producer, * the offsets are updated upon the consumer's iterator advances. By doing this, it is guaranteed no data * loss even when mirror maker is uncleanly shutdown with the new producer, while with the old producer * messages inside the data channel could be lost upon mirror maker unclean shutdown. */ - offsetCommitThread = new OffsetCommitThread(offsetCommitIntervalMs) - offsetCommitThread.start() + scheduler.startup() + scheduler.schedule("offset-commit", commitOffsets, 0, offsetCommitIntervalMs, TimeUnit.MILLISECONDS) } // create consumer threads @@ -252,7 +296,7 @@ object MirrorMaker extends Logging with KafkaMetricsGroup { def cleanShutdown() { if (isShuttingdown.compareAndSet(false, true)) { info("Start clean shutdown.") - // Consumer threads will exit when isCleanShutdown is set. + // Shutdown consumer threads. info("Shutting down consumer threads.") if (consumerThreads != null) { consumerThreads.foreach(_.shutdown()) @@ -265,12 +309,10 @@ object MirrorMaker extends Logging with KafkaMetricsGroup { producerThreads.foreach(_.awaitShutdown()) } // offset commit thread should only be shutdown after producer threads are shutdown, so we don't lose offsets. - info("Shutting down offset commit thread.") - if (offsetCommitThread != null) { - offsetCommitThread.shutdown() - offsetCommitThread.awaitShutdown() - } - // connector can only be shutdown after offsets are committed. + scheduler.shutdown() + swallow(commitOffsets()) + + // connector should only be shutdown after offsets are committed. 
info("Shutting down consumer connectors.") if (connector != null) connector.shutdown() @@ -457,155 +499,134 @@ object MirrorMaker extends Logging with KafkaMetricsGroup { } } - class OffsetCommitThread(commitIntervalMs: Int) extends Thread with Logging with KafkaMetricsGroup { - private val threadName = "mirrormaker-offset-commit-thread" - private val shutdownComplete: CountDownLatch = new CountDownLatch(1) - this.logIdent = "[%s]".format(threadName) - var shutdownFlag: Boolean = false - var commitCounter: Int = 0 - - this.setName(threadName) - - newGauge("MirrorMaker-Offset-Commit-Counter", - new Gauge[Int] { - def value = commitCounter - }) - - /** - * Use the connector to commit all the offsets. - */ - override def run() { - info("Starting mirror maker offset commit thread") - try { - while (!shutdownFlag) { - Thread.sleep(commitIntervalMs) - commitOffset() - } - } catch { - case t: Throwable => fatal("Exits due to", t) - } finally { - swallow(commitOffset()) - shutdownComplete.countDown() - info("Offset commit thread exited") - if (!isShuttingdown.get()) { - fatal("Offset commit thread exited abnormally, stopping the whole mirror maker.") - System.exit(-1) - } - } - } - - def commitOffset() { - val offsetsToCommit = collection.immutable.Map(topicPartitionOffsetMap.map { - case (topicPartition, partitionOffsetMap) => - topicPartition -> OffsetAndMetadata(getOffsetToCommit(partitionOffsetMap), null) + private def commitOffsets() { + try { + info("Committing offsets") + val offsetsToCommit = collection.immutable.Map(unackedOffsetsMap.map { + case (topicPartition, unackedOffsets) => + topicPartition -> OffsetAndMetadata(unackedOffsets.getOffsetToCommit, null) }.toSeq: _*) - trace("committing offset: %s".format(offsetsToCommit)) if (connector == null) { warn("No consumer connector available to commit offset.") } else { - connector.commitOffsets( - isAutoCommit = false, - topicPartitionOffsets = offsetsToCommit - ) - commitCounter += 1 - } - } - - private def getOffsetToCommit(offsetsMap: Pool[Int, Long]): Long = { - val offsets = offsetsMap.map(_._2).toSeq.sorted - val iter = offsets.iterator - var offsetToCommit = iter.next() - while (iter.hasNext && offsetToCommit + 1 == iter.next()) - offsetToCommit += 1 - // The committed offset will be the first offset of un-consumed message, hence we need to increment by one. 
- offsetToCommit + 1 - } - - def shutdown() { - shutdownFlag = true - } - - def awaitShutdown() { - try { - shutdownComplete.await() - info("Offset commit thread shutdown complete") - } catch { - case ie: InterruptedException => { - warn("Shutdown of the offset commit thread interrupted") - } + connector.commitOffsets(offsetsToCommit, isAutoCommit = false) } + } catch { + case e: OutOfMemoryError => + fatal("Shutting down mirror maker due to error when committing offsets.", e) + System.exit(-1) + case t: Throwable => + warn("Offsets commit failed due to ", t) } } private[kafka] trait MirrorMakerBaseProducer { - def send(topicPartition: TopicAndPartition, offset: Long, key: Array[Byte], value: Array[Byte]) + def send(sourceTopicPartition: TopicAndPartition, sourceOffset: Long, key: Array[Byte], value: Array[Byte]) def close() } - private class MirrorMakerNewProducer (val producerProps: Properties) - extends NewShinyProducer(producerProps) with MirrorMakerBaseProducer { + private class MirrorMakerNewProducer (val producerProps: Properties) extends MirrorMakerBaseProducer { - override def send(topicPartition: TopicAndPartition, offset: Long, key: Array[Byte], value: Array[Byte]) { - val record = new ProducerRecord[Array[Byte],Array[Byte]](topicPartition.topic, key, value) + val sync = producerProps.getProperty("producer.type", "async").equals("sync") + + val producer = new KafkaProducer[Array[Byte], Array[Byte]](producerProps) + + override def send(sourceTopicPartition: TopicAndPartition, sourceOffset: Long, key: Array[Byte], value: Array[Byte]) { + val record = new ProducerRecord(sourceTopicPartition.topic, key, value) if(sync) { - topicPartitionOffsetMap.getAndMaybePut(topicPartition).put(this.producer.send(record).get().partition(), offset) + this.producer.send(record).get() + unackedOffsetsMap.getAndMaybePut(sourceTopicPartition).maybeUpdateMaxOffsetSeen(sourceOffset) } else { - this.producer.send(record, - new MirrorMakerProducerCallback(topicPartition, offset, key, value)) - numMessageUnacked.incrementAndGet() + + val unackedOffsets = unackedOffsetsMap.getAndMaybePut(sourceTopicPartition) + // synchronize to ensure that addOffset precedes removeOffset + unackedOffsets synchronized { + val unackedOffset = new UnackedOffset(sourceOffset) + this.producer.send(record, + new MirrorMakerProducerCallback(sourceTopicPartition, unackedOffset, key, value)) + // add offset to unackedOffsets + unackedOffsets.addOffset(unackedOffset) + numUnackedMessages.incrementAndGet() + } } } + + override def close() { + this.producer.close() + } } - private class MirrorMakerOldProducer (val producerProps: Properties) - extends OldProducer(producerProps) with MirrorMakerBaseProducer { + private class MirrorMakerOldProducer (val producerProps: Properties) extends MirrorMakerBaseProducer { + + // default to byte array partitioner + if (producerProps.getProperty("partitioner.class") == null) + producerProps.setProperty("partitioner.class", classOf[kafka.producer.ByteArrayPartitioner].getName) + val producer = new kafka.producer.Producer[Array[Byte], Array[Byte]](new ProducerConfig(producerProps)) override def send(topicPartition: TopicAndPartition, offset: Long, key: Array[Byte], value: Array[Byte]) { - super.send(topicPartition.topic, key, value) + this.producer.send(new KeyedMessage[Array[Byte], Array[Byte]](topicPartition.topic, key, value)) } override def close() { - super.close() + this.producer.close() } } private class MirrorMakerProducerCallback (val topicPartition: TopicAndPartition, - val offset: Long, + 
val offset: UnackedOffset, val key: Array[Byte], val value: Array[Byte]) extends ErrorLoggingCallback(topicPartition.topic, key, value, false) { override def onCompletion(metadata: RecordMetadata, exception: Exception) { if (exception != null) { - // Use default call back to log error + // Use default call back to log error. This means the max retries of producer has reached and message + // still could not be sent. In this case we have to remove the offsets from list to let the mirror maker + // move on. The message failed to be sent will be lost in target cluster. + warn("Not be able to send message, offset of "+ topicPartition + " will not advance. Total number" + + "of skipped unacked messages is" + numSkippedUnackedMessages.incrementAndGet()) super.onCompletion(metadata, exception) } else { - trace("updating offset:[%s] -> %d".format(topicPartition, offset)) - topicPartitionOffsetMap.getAndMaybePut(topicPartition).put(metadata.partition(), offset) + trace("Updating offset for %s to %d".format(topicPartition, offset)) } + // remove the offset from the unackedOffsets + val unackedOffsets = unackedOffsetsMap.get(topicPartition) + unackedOffsets.removeOffset(offset) // Notify the rebalance callback only when all the messages handed to producer are acked. - // There is a very slight chance that 1 message is held by producer thread and not handed to producer. + // There is a very slight chance that one message is held by producer thread and not handed to producer. // That message might have duplicate. We are not handling that here. - if (numMessageUnacked.decrementAndGet() == 0 && inRebalance.get()) { - inRebalance synchronized {inRebalance.notify()} + numUnackedMessages synchronized { + if (numUnackedMessages.decrementAndGet() == 0 && waitingForMessageAcks) { + numUnackedMessages.notify() + } } } } - class MirrorMakerConsumerRebalanceListener (dataChannel: DataChannel) extends ConsumerRebalanceListener { + class InternalRebalanceListener (dataChannel: DataChannel, customRebalanceListener: Option[ConsumerRebalanceListener]) + extends ConsumerRebalanceListener { override def beforeReleasingPartitions(partitionOwnership: java.util.Map[String, java.util.Set[java.lang.Integer]]) { info("Clearing data channel.") dataChannel.clear() info("Waiting until all the messages are acked.") - inRebalance synchronized { - inRebalance.set(true) - while (numMessageUnacked.get() > 0) - inRebalance.wait() + numUnackedMessages synchronized { + waitingForMessageAcks = true + while (numUnackedMessages.get() > 0) { + try { + numUnackedMessages.wait() + } catch { + case e: InterruptedException => info("Ignoring interrupt while waiting.") + } + } + waitingForMessageAcks = false } info("Committing offsets.") - offsetCommitThread.commitOffset() - inRebalance.set(true) + commitOffsets() + + // invoke custom consumer rebalance listener + if (customRebalanceListener.isDefined) + customRebalanceListener.get.beforeReleasingPartitions(partitionOwnership) } } @@ -617,5 +638,43 @@ object MirrorMaker extends Logging with KafkaMetricsGroup { def size = value.length + {if (key == null) 0 else key.length} } -} + private class UnackedOffset(offset: Long) extends DoublyLinkedListNode[Long](offset) { + + } + + private class UnackedOffsets { + val offsetList = new DoublyLinkedList[Long] + var maxOffsetSeen: Long = -1L + + def maybeUpdateMaxOffsetSeen(offset: Long) { + this synchronized { + maxOffsetSeen = math.max(maxOffsetSeen, offset) + } + } + def addOffset(offset: DoublyLinkedListNode[Long]) { + this synchronized { + 
offsetList.add(offset) + maybeUpdateMaxOffsetSeen(offset.element) + } + } + + def removeOffset(offset: DoublyLinkedListNode[Long]) { + offsetList.remove(offset) + } + + def getOffsetToCommit: Long = { + this synchronized { + val smallestUnackedOffset = offsetList.peek() + if (smallestUnackedOffset == null) + // list is empty, commit maxOffsetSeen + 1 + maxOffsetSeen + 1 + else + // commit the smallest unacked offset + smallestUnackedOffset.element + } + } + + def size: Int = offsetList.size + } +} \ No newline at end of file diff --git a/core/src/main/scala/kafka/utils/DoublyLinkedList.scala b/core/src/main/scala/kafka/utils/DoublyLinkedList.scala new file mode 100644 index 0000000..e637ef3 --- /dev/null +++ b/core/src/main/scala/kafka/utils/DoublyLinkedList.scala @@ -0,0 +1,126 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package kafka.utils + +/** + * Simple doubly LinkedList node + * @param element The element + * @tparam T The type of element + */ +class DoublyLinkedListNode[T] (val element: T) { + var prev: DoublyLinkedListNode[T] = null + var next: DoublyLinkedListNode[T] = null +} + +/** + * A simple doubly linked list util to allow O(1) remove. + * @tparam T type of element in nodes + */ +@threadsafe +class DoublyLinkedList[T] { + private var head: DoublyLinkedListNode[T] = null + private var tail: DoublyLinkedListNode[T] = null + @volatile private var listSize: Int = 0 + + /** + * Add offset to the tail of the list + * @param node the node to be added to the tail of the list + */ + def add (node: DoublyLinkedListNode[T]) { + this synchronized { + if (head == null) { + // empty list + head = node + tail = node + node.prev = null + node.next = null + } else { + // add to tail + tail.next = node + node.next = null + node.prev = tail + tail = node + } + listSize += 1 + } + } + + /** + * Remove a node from the list. The list will not check if the node is really in the list. + * @param node the node to be removed from the list + */ + def remove (node: DoublyLinkedListNode[T]) { + this synchronized { + if (node ne head) + node.prev.next = node.next + else + head = node.next + + if (node ne tail) + node.next.prev = node.prev + else + tail = node.prev + + node.prev = null + node.next = null + + listSize -= 1 + } + } + + /** + * Remove the first node in the list and return it if the list is not empty. + * @return The first node in the list if the list is not empty. Return Null if the list is empty. + */ + def remove(): DoublyLinkedListNode[T] = { + this synchronized { + if (head != null) { + val node = head + remove(head) + node + } else { + null + } + } + } + + /** + * Get the first node in the list without removing it. + * @return the first node in the list. 
+ */ + def peek(): DoublyLinkedListNode[T] = head + + def size: Int = listSize + + def iterator: Iterator[DoublyLinkedListNode[T]] = { + new IteratorTemplate[DoublyLinkedListNode[T]] { + var current = head + override protected def makeNext(): DoublyLinkedListNode[T] = { + this synchronized { + if (current != null) { + val nextNode = current + current = current.next + nextNode + } else { + allDone() + } + } + } + } + } +} diff --git a/core/src/test/scala/unit/kafka/utils/UtilsTest.scala b/core/src/test/scala/unit/kafka/utils/UtilsTest.scala index 066553c..8c3797a 100644 --- a/core/src/test/scala/unit/kafka/utils/UtilsTest.scala +++ b/core/src/test/scala/unit/kafka/utils/UtilsTest.scala @@ -150,4 +150,54 @@ class UtilsTest extends JUnitSuite { assertEquals(2, result) assertFalse("Should be unlocked", lock.isLocked) } + + @Test + def testDoublyLinkedList() { + val list = new DoublyLinkedList[Int] + + // test remove from a single-entry list. + list.add(new DoublyLinkedListNode[Int](0)) + list.remove() + assert(list.size == 0) + assert(list.peek() == null) + + // test add + for (i <- 0 to 2) { + list.add(new DoublyLinkedListNode[Int](i)) + } + val toBeRemoved1 = new DoublyLinkedListNode[Int](3) + list.add(toBeRemoved1) + for (i <- 4 to 6) { + list.add(new DoublyLinkedListNode[Int](i)) + } + val toBeRemoved2 = new DoublyLinkedListNode[Int](7) + list.add(toBeRemoved2) + + // test iterator + val iter = list.iterator + for (i <- 0 to 7) { + assert(iter.hasNext) + assert(iter.next().element == i) + } + assert(!iter.hasNext) + + // remove from head + list.remove() + assert(list.peek().element == 1) + // remove from middle + list.remove(toBeRemoved1) + // remove from tail + list.remove(toBeRemoved2) + + // List = [1,2,4,5,6] + val iter2 = list.iterator + for (i <- Array[Int](1,2,4,5,6)) { + assert(iter2.hasNext) + assert(iter2.next().element == i) + } + + // test size + assert(list.size == 5) + } + } -- 1.9.3 (Apple Git-50) From 04de4c5aaa1f2235a5209df33f0beb00a2be0078 Mon Sep 17 00:00:00 2001 From: Parth Brahmbhatt Date: Mon, 29 Dec 2014 15:02:39 -0800 Subject: [PATCH 2/2] KAFKA-1805: adding equals and hashcode methods to ProducerRecord. --- .../kafka/clients/producer/ProducerRecord.java | 25 ++++++++++++++++++++++ 1 file changed, 25 insertions(+) diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/ProducerRecord.java b/clients/src/main/java/org/apache/kafka/clients/producer/ProducerRecord.java index 065d4e6..e47d12a 100644 --- a/clients/src/main/java/org/apache/kafka/clients/producer/ProducerRecord.java +++ b/clients/src/main/java/org/apache/kafka/clients/producer/ProducerRecord.java @@ -73,6 +73,7 @@ public final class ProducerRecord { } /** + * The key (or null if no key is specified) */ public K key() { @@ -99,4 +100,28 @@ public final class ProducerRecord { String value = this.value == null ? 
"null" : this.value.toString(); return "ProducerRecord(topic=" + topic + ", partition=" + partition + ", key=" + key + ", value=" + value; } + + @Override + public boolean equals(Object o) { + if (this == o) return true; + if (!(o instanceof ProducerRecord)) return false; + + ProducerRecord that = (ProducerRecord) o; + + if (!key.equals(that.key)) return false; + if (!partition.equals(that.partition)) return false; + if (!topic.equals(that.topic)) return false; + if (!value.equals(that.value)) return false; + + return true; + } + + @Override + public int hashCode() { + int result = topic.hashCode(); + result = 31 * result + partition.hashCode(); + result = 31 * result + key.hashCode(); + result = 31 * result + value.hashCode(); + return result; + } } -- 1.9.3 (Apple Git-50)