From 5d074d67e6392aa948b63732da2b6bf25f60a7dd Mon Sep 17 00:00:00 2001 From: jqin Date: Wed, 11 Mar 2015 11:25:59 -0700 Subject: [PATCH] Because the send callback could be fired in producer.send() as well, so unacked offset needs to be added to unacked offsets list before call producer.send() --- core/src/main/scala/kafka/tools/MirrorMaker.scala | 16 ++++++---------- 1 file changed, 6 insertions(+), 10 deletions(-) diff --git a/core/src/main/scala/kafka/tools/MirrorMaker.scala b/core/src/main/scala/kafka/tools/MirrorMaker.scala index bafa379..e73331c 100644 --- a/core/src/main/scala/kafka/tools/MirrorMaker.scala +++ b/core/src/main/scala/kafka/tools/MirrorMaker.scala @@ -539,17 +539,13 @@ object MirrorMaker extends Logging with KafkaMetricsGroup { this.producer.send(record).get() unackedOffsetsMap.getAndMaybePut(sourceTopicPartition).maybeUpdateMaxOffsetSeen(sourceOffset) } else { - val unackedOffsets = unackedOffsetsMap.getAndMaybePut(sourceTopicPartition) - // synchronize to ensure that addOffset precedes removeOffset - unackedOffsets synchronized { - val unackedOffset = new UnackedOffset(sourceOffset) - this.producer.send(record, - new MirrorMakerProducerCallback(sourceTopicPartition, unackedOffset, key, value)) - // add offset to unackedOffsets - unackedOffsets.addOffset(unackedOffset) - numUnackedMessages.incrementAndGet() - } + val unackedOffset = new UnackedOffset(sourceOffset) + // add offset to unackedOffsets + unackedOffsets.addOffset(unackedOffset) + numUnackedMessages.incrementAndGet() + this.producer.send(record, + new MirrorMakerProducerCallback(sourceTopicPartition, unackedOffset, key, value)) } } -- 1.8.3.4 (Apple Git-47)