From 35e7dcfbbd60c43bb5cce80e1de0ee37ddab6430 Mon Sep 17 00:00:00 2001 From: jqin Date: Sat, 7 Mar 2015 11:08:33 -0800 Subject: [PATCH] Patch for KAFKA-2009, fix mirror maker UnackedOffset.removeOffset synchronization and a trace logging issue. --- core/src/main/scala/kafka/tools/MirrorMaker.scala | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/core/src/main/scala/kafka/tools/MirrorMaker.scala b/core/src/main/scala/kafka/tools/MirrorMaker.scala index 5374280..bafa379 100644 --- a/core/src/main/scala/kafka/tools/MirrorMaker.scala +++ b/core/src/main/scala/kafka/tools/MirrorMaker.scala @@ -589,7 +589,7 @@ object MirrorMaker extends Logging with KafkaMetricsGroup { "of skipped unacked messages is" + numSkippedUnackedMessages.incrementAndGet()) super.onCompletion(metadata, exception) } else { - trace("Updating offset for %s to %d".format(topicPartition, offset)) + trace("Updating offset for %s to %d".format(topicPartition, offset.element)) } // remove the offset from the unackedOffsets val unackedOffsets = unackedOffsetsMap.get(topicPartition) @@ -662,7 +662,9 @@ object MirrorMaker extends Logging with KafkaMetricsGroup { } def removeOffset(offset: DoublyLinkedListNode[Long]) { - offsetList.remove(offset) + this synchronized { + offsetList.remove(offset) + } } def getOffsetToCommit: Long = { -- 1.8.3.4 (Apple Git-47)