From 0333fd4309e5587aa84fea5245d185a498526819 Mon Sep 17 00:00:00 2001 From: jqin Date: Mon, 9 Feb 2015 15:57:45 -0800 Subject: [PATCH] Fix for KAFKA-1937 --- core/src/main/scala/kafka/tools/MirrorMaker.scala | 8 ++++++-- 1 file changed, 6 insertions(+), 2 deletions(-) diff --git a/core/src/main/scala/kafka/tools/MirrorMaker.scala b/core/src/main/scala/kafka/tools/MirrorMaker.scala index 5374280..9df98ff 100644 --- a/core/src/main/scala/kafka/tools/MirrorMaker.scala +++ b/core/src/main/scala/kafka/tools/MirrorMaker.scala @@ -593,7 +593,10 @@ object MirrorMaker extends Logging with KafkaMetricsGroup { } // remove the offset from the unackedOffsets val unackedOffsets = unackedOffsetsMap.get(topicPartition) - unackedOffsets.removeOffset(offset) + // The callback could be fired after the unacked offset map is cleared due to the previously mentioned race. + // Just ignore the callback since this message will be consumed again. + if (unackedOffsets != null) + unackedOffsets.removeOffset(offset) // Notify the rebalance callback only when all the messages handed to producer are acked. // There is a very slight chance that one message is held by producer thread and not handed to producer. // That message might have duplicate. We are not handling that here. @@ -625,7 +628,8 @@ object MirrorMaker extends Logging with KafkaMetricsGroup { } info("Committing offsets.") commitOffsets() - + // Clear the unackedOffsetsMap to avoid committing offsets to the partitions the consumer no longer owns. + unackedOffsetsMap.clear() // invoke custom consumer rebalance listener if (customRebalanceListener.isDefined) customRebalanceListener.get.beforeReleasingPartitions(partitionOwnership) -- 1.8.3.4 (Apple Git-47)