From dee767ff47cfad38022e0a5c5ea588e11ea24929 Mon Sep 17 00:00:00 2001
From: Ivan Simonenko
Date: Mon, 1 Jun 2015 20:00:27 +0300
Subject: [PATCH] fix offsetMap overflow

---
 core/src/main/scala/kafka/log/LogCleaner.scala | 12 +++++++++---
 1 file changed, 9 insertions(+), 3 deletions(-)

diff --git a/core/src/main/scala/kafka/log/LogCleaner.scala b/core/src/main/scala/kafka/log/LogCleaner.scala
index c9ade72..3efd4b2 100644
--- a/core/src/main/scala/kafka/log/LogCleaner.scala
+++ b/core/src/main/scala/kafka/log/LogCleaner.scala
@@ -559,11 +559,17 @@ private[log] class Cleaner(val id: Int,
     // but we may be able to fit more (if there is lots of duplication in the dirty section of the log)
     var offset = dirty.head.baseOffset
     require(offset == start, "Last clean offset is %d but segment base offset is %d for log %s.".format(start, offset, log.name))
-    val minStopOffset = (start + map.slots * this.dupBufferLoadFactor).toLong
-    for (segment <- dirty) {
+    val maxDesiredMapSize = (map.slots * this.dupBufferLoadFactor).toInt
+    var full = false
+    for (segment <- dirty if !full) {
       checkDone(log.topicAndPartition)
-      if(segment.baseOffset <= minStopOffset || map.utilization < this.dupBufferLoadFactor)
+      val segmentSize = segment.nextOffset() - segment.baseOffset
+      require(segmentSize <= maxDesiredMapSize, "%d messages in segment %s/%s but offset map can fit only %d.".format(segmentSize, log.name, segment.log.file.getName, maxDesiredMapSize))
+      if (map.size + segmentSize < maxDesiredMapSize) {
         offset = buildOffsetMapForSegment(log.topicAndPartition, segment, map)
+      } else {
+        full = true
+      }
     }
     info("Offset map for log %s complete.".format(log.name))
     offset
-- 
2.3.2 (Apple Git-55)
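
For illustration, a minimal standalone sketch of the capped fill logic this patch introduces. The Segment and BoundedOffsetMap classes, the load factor, and the segment sizes below are assumptions made up for the example; only the control flow (size-based capacity check per segment, hard failure on an oversized segment, early stop when the map would overflow) mirrors the patch.

    // Sketch only, not Kafka's actual classes or API.
    case class Segment(name: String, baseOffset: Long, nextOffset: Long) {
      def size: Long = nextOffset - baseOffset   // number of messages in the segment
    }

    // Stand-in for Kafka's OffsetMap: a fixed number of slots, tracking how many are used.
    class BoundedOffsetMap(val slots: Int) {
      var size: Int = 0
      def put(n: Long): Unit = { size += n.toInt } // pretend we inserted n keys
    }

    object CappedBuild extends App {
      val loadFactor = 0.9                         // analogous to dupBufferLoadFactor
      val map = new BoundedOffsetMap(slots = 100)
      val maxDesiredMapSize = (map.slots * loadFactor).toInt

      val dirty = Seq(Segment("a", 0, 40), Segment("b", 40, 85), Segment("c", 85, 150))

      var full = false
      for (segment <- dirty if !full) {
        // A single segment larger than the map can never fit: fail loudly, as the patch does.
        require(segment.size <= maxDesiredMapSize,
          s"${segment.size} messages in segment ${segment.name} but offset map can fit only $maxDesiredMapSize.")
        if (map.size + segment.size < maxDesiredMapSize)
          map.put(segment.size)                    // in Kafka: buildOffsetMapForSegment(...)
        else
          full = true                              // stop before overflowing the map
      }
      println(s"filled ${map.size} of $maxDesiredMapSize slots") // "filled 85 of 90 slots"
    }

The point of the change is that the old code decided when to stop from offsets and current utilization, which could let a large segment push the map past its capacity; the new code checks the segment's message count against the remaining capacity before building its portion of the map.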