From a305dcc83e435df60515ca6d5772370319e59374 Mon Sep 17 00:00:00 2001 From: Sriharsha Chintalapani Date: Sat, 4 Oct 2014 15:26:26 -0700 Subject: [PATCH 1/2] KAFKA-1670. Corrupt log files for segment.bytes values close to Int.MaxInt. --- core/src/main/scala/kafka/log/Log.scala | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/core/src/main/scala/kafka/log/Log.scala b/core/src/main/scala/kafka/log/Log.scala index 0ddf97b..356921a 100644 --- a/core/src/main/scala/kafka/log/Log.scala +++ b/core/src/main/scala/kafka/log/Log.scala @@ -253,7 +253,7 @@ class Log(val dir: File, appendInfo.firstOffset = nextOffsetMetadata.messageOffset // maybe roll the log if this segment is full - val segment = maybeRoll() + val segment = maybeRoll(validMessages.sizeInBytes) if(assignOffsets) { // assign offsets to the message set @@ -492,9 +492,9 @@ class Log(val dir: File, * Roll the log over to a new empty log segment if necessary * @return The currently active segment after (perhaps) rolling to a new segment */ - private def maybeRoll(): LogSegment = { + private def maybeRoll(messagesSize: Int): LogSegment = { val segment = activeSegment - if (segment.size > config.segmentSize || + if (segment.size > config.segmentSize || segment.size + messagesSize < 0 || segment.size > 0 && time.milliseconds - segment.created > config.segmentMs || segment.index.isFull) { debug("Rolling new log segment in %s (log_size = %d/%d, index_size = %d/%d, age_ms = %d/%d)." -- 1.8.5.2 (Apple Git-48) From 91f846bf532e779c265f985c6c2c4ee37986df00 Mon Sep 17 00:00:00 2001 From: Sriharsha Chintalapani Date: Sat, 4 Oct 2014 20:10:22 -0700 Subject: [PATCH 2/2] KAFKA-1670. Corrupt log files for segment.bytes values close to Int.MaxInt. 
--- core/src/main/scala/kafka/log/Log.scala | 4 +++- core/src/test/scala/unit/kafka/log/LogTest.scala | 18 ++++++++++++++++++ 2 files changed, 21 insertions(+), 1 deletion(-) diff --git a/core/src/main/scala/kafka/log/Log.scala b/core/src/main/scala/kafka/log/Log.scala index 356921a..54349fe 100644 --- a/core/src/main/scala/kafka/log/Log.scala +++ b/core/src/main/scala/kafka/log/Log.scala @@ -489,7 +489,9 @@ class Log(val dir: File, def logEndOffset: Long = nextOffsetMetadata.messageOffset /** - * Roll the log over to a new empty log segment if necessary + * Roll the log over to a new empty log segment if necessary. + * If config.segmentSize is close to or equal to Int.MaxValue, + * the LogSegment will be rolled before segment.size + messagesSize overflows * @return The currently active segment after (perhaps) rolling to a new segment */ private def maybeRoll(messagesSize: Int): LogSegment = { diff --git a/core/src/test/scala/unit/kafka/log/LogTest.scala b/core/src/test/scala/unit/kafka/log/LogTest.scala index 577d102..5abd494 100644 --- a/core/src/test/scala/unit/kafka/log/LogTest.scala +++ b/core/src/test/scala/unit/kafka/log/LogTest.scala @@ -111,6 +111,24 @@ class LogTest extends JUnitSuite { } /** + * Test the log roll when the configured segmentSize is close to or equal to Int.MaxValue + */ + @Test + def testMaxSegmentSizeRoll() { + val set = TestUtils.singleMessageSet("test logfile config.segmentSize set to Int.MaxValue".getBytes) + val setSize = set.sizeInBytes + // create a log + val log = new Log(logDir, logConfig.copy(segmentSize = Int.MaxValue, maxIndexSize = 10 * 1024 * 1024), recoveryPoint = 0L, time.scheduler, time = time) + assertEquals("There should be exactly 1 segment.", 1, log.numberOfSegments) + + for (i<- 1 to (Int.MaxValue / setSize) + setSize) { + log.append(set) + } + assertEquals("There should be exactly 2 segments.", 2, log.numberOfSegments) + } + + + /** * Test that we can open and append to an empty log */ @Test -- 1.8.5.2 (Apple Git-48)