From 74331968edc57d020d619dbd40c056827433bbe8 Mon Sep 17 00:00:00 2001 From: Sriharsha Chintalapani Date: Sat, 4 Oct 2014 15:26:26 -0700 Subject: [PATCH 1/2] KAFKA-1670. Corrupt log files for segment.bytes values close to Int.MaxInt. --- core/src/main/scala/kafka/log/Log.scala | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/core/src/main/scala/kafka/log/Log.scala b/core/src/main/scala/kafka/log/Log.scala index 0ddf97b..356921a 100644 --- a/core/src/main/scala/kafka/log/Log.scala +++ b/core/src/main/scala/kafka/log/Log.scala @@ -253,7 +253,7 @@ class Log(val dir: File, appendInfo.firstOffset = nextOffsetMetadata.messageOffset // maybe roll the log if this segment is full - val segment = maybeRoll() + val segment = maybeRoll(validMessages.sizeInBytes) if(assignOffsets) { // assign offsets to the message set @@ -492,9 +492,9 @@ class Log(val dir: File, * Roll the log over to a new empty log segment if necessary * @return The currently active segment after (perhaps) rolling to a new segment */ - private def maybeRoll(): LogSegment = { + private def maybeRoll(messagesSize: Int): LogSegment = { val segment = activeSegment - if (segment.size > config.segmentSize || + if (segment.size > config.segmentSize || segment.size + messagesSize < 0 || segment.size > 0 && time.milliseconds - segment.created > config.segmentMs || segment.index.isFull) { debug("Rolling new log segment in %s (log_size = %d/%d, index_size = %d/%d, age_ms = %d/%d)." -- 1.8.5.2 (Apple Git-48) From 320cee6e11b511f540e6523659ccebdd11bcba8d Mon Sep 17 00:00:00 2001 From: Sriharsha Chintalapani Date: Mon, 6 Oct 2014 09:47:36 -0700 Subject: [PATCH 2/2] KAFKA-1670. Corrupt log files for segment.bytes values close to Int.MaxInt. --- core/src/main/scala/kafka/log/Log.scala | 12 +++++++----- 1 file changed, 7 insertions(+), 5 deletions(-) diff --git a/core/src/main/scala/kafka/log/Log.scala b/core/src/main/scala/kafka/log/Log.scala index 356921a..8fd8ad3 100644 --- a/core/src/main/scala/kafka/log/Log.scala +++ b/core/src/main/scala/kafka/log/Log.scala @@ -252,9 +252,6 @@ class Log(val dir: File, lock synchronized { appendInfo.firstOffset = nextOffsetMetadata.messageOffset - // maybe roll the log if this segment is full - val segment = maybeRoll(validMessages.sizeInBytes) - if(assignOffsets) { // assign offsets to the message set val offset = new AtomicLong(nextOffsetMetadata.messageOffset) @@ -282,7 +279,10 @@ class Log(val dir: File, } } - // now append to the log + // maybe roll the log if this segment is full + val segment = maybeRoll(validMessages.sizeInBytes) + + // now append to the log segment.append(appendInfo.firstOffset, validMessages) // increment the log end offset @@ -489,7 +489,9 @@ class Log(val dir: File, def logEndOffset: Long = nextOffsetMetadata.messageOffset /** - * Roll the log over to a new empty log segment if necessary + * Roll the log over to a new empty log segment if necessary. + * if config.segmentSize is close or equal to Int.Max value + * LogSegment will be rolled before segment.size + messagesBatch.size overflows * @return The currently active segment after (perhaps) rolling to a new segment */ private def maybeRoll(messagesSize: Int): LogSegment = { -- 1.8.5.2 (Apple Git-48)