From 74331968edc57d020d619dbd40c056827433bbe8 Mon Sep 17 00:00:00 2001
From: Sriharsha Chintalapani
Date: Sat, 4 Oct 2014 15:26:26 -0700
Subject: [PATCH 1/4] KAFKA-1670. Corrupt log files for segment.bytes values close to Int.MaxInt.

---
 core/src/main/scala/kafka/log/Log.scala | 6 +++---
 1 file changed, 3 insertions(+), 3 deletions(-)

diff --git a/core/src/main/scala/kafka/log/Log.scala b/core/src/main/scala/kafka/log/Log.scala
index 0ddf97b..356921a 100644
--- a/core/src/main/scala/kafka/log/Log.scala
+++ b/core/src/main/scala/kafka/log/Log.scala
@@ -253,7 +253,7 @@ class Log(val dir: File,
       appendInfo.firstOffset = nextOffsetMetadata.messageOffset
 
       // maybe roll the log if this segment is full
-      val segment = maybeRoll()
+      val segment = maybeRoll(validMessages.sizeInBytes)
 
       if(assignOffsets) {
         // assign offsets to the message set
@@ -492,9 +492,9 @@ class Log(val dir: File,
    * Roll the log over to a new empty log segment if necessary
    * @return The currently active segment after (perhaps) rolling to a new segment
    */
-  private def maybeRoll(): LogSegment = {
+  private def maybeRoll(messagesSize: Int): LogSegment = {
     val segment = activeSegment
-    if (segment.size > config.segmentSize ||
+    if (segment.size > config.segmentSize || segment.size + messagesSize < 0 ||
         segment.size > 0 && time.milliseconds - segment.created > config.segmentMs ||
         segment.index.isFull) {
       debug("Rolling new log segment in %s (log_size = %d/%d, index_size = %d/%d, age_ms = %d/%d)."
-- 
1.8.5.2 (Apple Git-48)

From 320cee6e11b511f540e6523659ccebdd11bcba8d Mon Sep 17 00:00:00 2001
From: Sriharsha Chintalapani
Date: Mon, 6 Oct 2014 09:47:36 -0700
Subject: [PATCH 2/4] KAFKA-1670. Corrupt log files for segment.bytes values close to Int.MaxInt.

---
 core/src/main/scala/kafka/log/Log.scala | 12 +++++++-----
 1 file changed, 7 insertions(+), 5 deletions(-)

diff --git a/core/src/main/scala/kafka/log/Log.scala b/core/src/main/scala/kafka/log/Log.scala
index 356921a..8fd8ad3 100644
--- a/core/src/main/scala/kafka/log/Log.scala
+++ b/core/src/main/scala/kafka/log/Log.scala
@@ -252,9 +252,6 @@ class Log(val dir: File,
     lock synchronized {
       appendInfo.firstOffset = nextOffsetMetadata.messageOffset
 
-      // maybe roll the log if this segment is full
-      val segment = maybeRoll(validMessages.sizeInBytes)
-
       if(assignOffsets) {
         // assign offsets to the message set
         val offset = new AtomicLong(nextOffsetMetadata.messageOffset)
@@ -282,7 +279,10 @@ class Log(val dir: File,
         }
       }
 
-      // now append to the log
+      // maybe roll the log if this segment is full
+      val segment = maybeRoll(validMessages.sizeInBytes)
+
+      // now append to the log
       segment.append(appendInfo.firstOffset, validMessages)
 
       // increment the log end offset
@@ -489,7 +489,9 @@ class Log(val dir: File,
   def logEndOffset: Long = nextOffsetMetadata.messageOffset
 
   /**
-   * Roll the log over to a new empty log segment if necessary
+   * Roll the log over to a new empty log segment if necessary.
+   * if config.segmentSize is close or equal to Int.Max value
+   * LogSegment will be rolled before segment.size + messagesBatch.size overflows
    * @return The currently active segment after (perhaps) rolling to a new segment
    */
   private def maybeRoll(messagesSize: Int): LogSegment = {
-- 
1.8.5.2 (Apple Git-48)
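The arithmetic these first two revisions guard against is worth spelling out. The following is a standalone Scala sketch, not part of the patch, and every name in it is invented for illustration: when segment.bytes is at or near Int.MaxValue, the Int sum segment.size + messagesSize silently wraps to a negative number, so a size check written as an addition can never detect a full segment; the subtraction form adopted in the final revision stays in range.

// Standalone sketch (illustration only): why the roll check must avoid Int overflow.
object OverflowDemo extends App {
  val segmentSize: Int = Int.MaxValue       // segment.bytes configured near Int.MaxValue
  val currentSize: Int = Int.MaxValue - 50  // bytes already in the active segment
  val messagesSize: Int = 100               // size of the incoming message set

  // Naive check: the Int sum wraps around to a negative value, so "full" is never seen.
  println(currentSize + messagesSize)                // -2147483599 (overflowed)
  println(currentSize + messagesSize > segmentSize)  // false -- overflow hides the roll

  // Overflow-safe check used by the final revision: subtract instead of add.
  println(currentSize > segmentSize - messagesSize)  // true -- the segment must roll
}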
From e3b61218507f3ba85df518e71756b5b36927265a Mon Sep 17 00:00:00 2001
From: Sriharsha Chintalapani
Date: Tue, 7 Oct 2014 11:59:52 -0700
Subject: [PATCH 3/4] KAFKA-1670. Corrupt log files for segment.bytes values close to Int.MaxInt.

---
 .../common/MessageSetSizeTooLargeException.scala | 22 +++++++++++++++++++++
 core/src/main/scala/kafka/log/Log.scala          | 23 +++++++++++++++++-----
 core/src/test/scala/unit/kafka/log/LogTest.scala |  6 +++---
 3 files changed, 43 insertions(+), 8 deletions(-)
 create mode 100644 core/src/main/scala/kafka/common/MessageSetSizeTooLargeException.scala

diff --git a/core/src/main/scala/kafka/common/MessageSetSizeTooLargeException.scala b/core/src/main/scala/kafka/common/MessageSetSizeTooLargeException.scala
new file mode 100644
index 0000000..94a616e
--- /dev/null
+++ b/core/src/main/scala/kafka/common/MessageSetSizeTooLargeException.scala
@@ -0,0 +1,22 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.common
+
+class MessageSetSizeTooLargeException(message: String) extends RuntimeException(message) {
+  def this() = this(null)
+}
diff --git a/core/src/main/scala/kafka/log/Log.scala b/core/src/main/scala/kafka/log/Log.scala
index 8fd8ad3..361963a 100644
--- a/core/src/main/scala/kafka/log/Log.scala
+++ b/core/src/main/scala/kafka/log/Log.scala
@@ -267,6 +267,13 @@ class Log(val dir: File,
           throw new IllegalArgumentException("Out of order offsets found in " + messages)
       }
 
+      // check that the message set size does not exceed config.segmentSize
+      if(validMessages.sizeInBytes > config.segmentSize) {
+        throw new MessageSetSizeTooLargeException("Message set size is %d bytes which exceeds the maximum configured segment size of %d."
+          .format(validMessages.sizeInBytes, config.segmentSize))
+      }
+
+
       // re-validate message sizes since after re-compression some may exceed the limit
       for(messageAndOffset <- validMessages.shallowIterator) {
         if(MessageSet.entrySize(messageAndOffset.message) > config.maxMessageSize) {
@@ -281,8 +288,8 @@ class Log(val dir: File,
 
       // maybe roll the log if this segment is full
       val segment = maybeRoll(validMessages.sizeInBytes)
-      
-      // now append to the log 
+
+      // now append to the log
       segment.append(appendInfo.firstOffset, validMessages)
 
       // increment the log end offset
@@ -490,13 +497,19 @@ class Log(val dir: File,
 
   /**
    * Roll the log over to a new empty log segment if necessary.
-   * if config.segmentSize is close or equal to Int.Max value
-   * LogSegment will be rolled before segment.size + messagesBatch.size overflows
+   *
+   * @param messagesSize The message set size in bytes.
+   * The logSegment will be rolled if one of the following conditions is met:
+   *
+   *   1. The logSegment is full
+   *   2. The maxTime has elapsed
+   *   3. The index is full
+   *
    * @return The currently active segment after (perhaps) rolling to a new segment
    */
   private def maybeRoll(messagesSize: Int): LogSegment = {
     val segment = activeSegment
-    if (segment.size > config.segmentSize || segment.size + messagesSize < 0 ||
+    if (segment.size > config.segmentSize - messagesSize ||
         segment.size > 0 && time.milliseconds - segment.created > config.segmentMs ||
         segment.index.isFull) {
       debug("Rolling new log segment in %s (log_size = %d/%d, index_size = %d/%d, age_ms = %d/%d)."
diff --git a/core/src/test/scala/unit/kafka/log/LogTest.scala b/core/src/test/scala/unit/kafka/log/LogTest.scala
index 577d102..ccf1201 100644
--- a/core/src/test/scala/unit/kafka/log/LogTest.scala
+++ b/core/src/test/scala/unit/kafka/log/LogTest.scala
@@ -239,7 +239,7 @@ class LogTest extends JUnitSuite {
   @Test
   def testCompressedMessages() {
     /* this log should roll after every messageset */
-    val log = new Log(logDir, logConfig.copy(segmentSize = 10), recoveryPoint = 0L, time.scheduler, time = time)
+    val log = new Log(logDir, logConfig.copy(segmentSize = 100), recoveryPoint = 0L, time.scheduler, time = time)
 
     /* append 2 compressed message sets, each with two messages giving offsets 0, 1, 2, 3 */
     log.append(new ByteBufferMessageSet(DefaultCompressionCodec, new Message("hello".getBytes), new Message("there".getBytes)))
@@ -375,7 +375,7 @@ class LogTest extends JUnitSuite {
     val set = TestUtils.singleMessageSet("test".getBytes())
     val setSize = set.sizeInBytes
     val msgPerSeg = 10
-    val segmentSize = msgPerSeg * (setSize - 1) // each segment will be 10 messages
+    val segmentSize = msgPerSeg * setSize // each segment will be 10 messages
 
     // create a log
     val log = new Log(logDir, logConfig.copy(segmentSize = segmentSize), recoveryPoint = 0L, scheduler = time.scheduler, time = time)
@@ -429,7 +429,7 @@ class LogTest extends JUnitSuite {
     val set = TestUtils.singleMessageSet("test".getBytes())
     val setSize = set.sizeInBytes
     val msgPerSeg = 10
-    val segmentSize = msgPerSeg * (setSize - 1) // each segment will be 10 messages
+    val segmentSize = msgPerSeg * setSize // each segment will be 10 messages
     val config = logConfig.copy(segmentSize = segmentSize)
     val log = new Log(logDir, config, recoveryPoint = 0L, scheduler = time.scheduler, time = time)
     assertEquals("There should be exactly 1 segment.", 1, log.numberOfSegments)
-- 
1.8.5.2 (Apple Git-48)
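After this revision maybeRoll rolls on exactly three conditions, with the size condition rewritten to be overflow-safe. The sketch below restates that decision in standalone form; SegmentState and shouldRoll are illustrative stand-ins, not Kafka classes, and the field names are assumptions chosen to mirror the patch.

// Minimal sketch (assumed names, not Kafka code) of the roll decision after patch 3.
object RollDecision extends App {
  final case class SegmentState(size: Int, created: Long, indexFull: Boolean)

  def shouldRoll(seg: SegmentState, messagesSize: Int,
                 segmentSize: Int, segmentMs: Long, nowMs: Long): Boolean =
    seg.size > segmentSize - messagesSize ||             // 1. the segment is (effectively) full
    (seg.size > 0 && nowMs - seg.created > segmentMs) || // 2. the max segment age has elapsed
    seg.indexFull                                        // 3. the offset index is full

  // A segment sized near Int.MaxValue now rolls before the append could overflow:
  val seg = SegmentState(size = Int.MaxValue - 50, created = 0L, indexFull = false)
  println(shouldRoll(seg, messagesSize = 100, segmentSize = Int.MaxValue,
                     segmentMs = Long.MaxValue, nowMs = 1L))  // true
}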
From adc8de7c40513ece16bfdf07c14d6a24d14f5158 Mon Sep 17 00:00:00 2001
From: Sriharsha Chintalapani
Date: Tue, 7 Oct 2014 13:25:38 -0700
Subject: [PATCH 4/4] KAFKA-1670. Corrupt log files for segment.bytes values close to Int.MaxInt.

---
 .../test/scala/unit/kafka/log/LogManagerTest.scala |  2 +-
 core/src/test/scala/unit/kafka/log/LogTest.scala   | 28 +++++++++++++++++-----
 .../scala/unit/kafka/server/LogOffsetTest.scala    |  8 +++----
 3 files changed, 27 insertions(+), 11 deletions(-)

diff --git a/core/src/test/scala/unit/kafka/log/LogManagerTest.scala b/core/src/test/scala/unit/kafka/log/LogManagerTest.scala
index 59bd8a9..90cd530 100644
--- a/core/src/test/scala/unit/kafka/log/LogManagerTest.scala
+++ b/core/src/test/scala/unit/kafka/log/LogManagerTest.scala
@@ -114,7 +114,7 @@ class LogManagerTest extends JUnit3Suite {
     val setSize = TestUtils.singleMessageSet("test".getBytes()).sizeInBytes
     logManager.shutdown()
 
-    val config = logConfig.copy(segmentSize = 10 * (setSize - 1), retentionSize = 5L * 10L * setSize + 10L)
+    val config = logConfig.copy(segmentSize = 10 * setSize, retentionSize = 5L * 10L * setSize + 10L)
     logManager = createLogManager()
     logManager.startup
 
diff --git a/core/src/test/scala/unit/kafka/log/LogTest.scala b/core/src/test/scala/unit/kafka/log/LogTest.scala
index ccf1201..a0cbd3b 100644
--- a/core/src/test/scala/unit/kafka/log/LogTest.scala
+++ b/core/src/test/scala/unit/kafka/log/LogTest.scala
@@ -18,15 +18,13 @@
 package kafka.log
 
 import java.io._
-import java.util.ArrayList
 import java.util.concurrent.atomic._
 import junit.framework.Assert._
 import org.scalatest.junit.JUnitSuite
 import org.junit.{After, Before, Test}
 import kafka.message._
-import kafka.common.{MessageSizeTooLargeException, OffsetOutOfRangeException}
+import kafka.common.{MessageSizeTooLargeException, OffsetOutOfRangeException, MessageSetSizeTooLargeException}
 import kafka.utils._
-import scala.Some
 import kafka.server.KafkaConfig
 
 class LogTest extends JUnitSuite {
@@ -286,7 +284,26 @@ class LogTest extends JUnitSuite {
   }
 
   /**
-   * We have a max size limit on message appends, check that it is properly enforced by appending a message larger than the
+   * A message set's size should not exceed config.segmentSize; check that this is properly enforced by
+   * appending a message set larger than the config.segmentSize setting and verifying that an exception is thrown.
+   */
+  @Test
+  def testMessageSetSizeCheck() {
+    val messageSet = new ByteBufferMessageSet(NoCompressionCodec, new Message("You".getBytes), new Message("bethe".getBytes))
+    // append a message set whose size exceeds the configured segment size
+    val configSegmentSize = messageSet.sizeInBytes - 1
+    val log = new Log(logDir, logConfig.copy(segmentSize = configSegmentSize), recoveryPoint = 0L, time.scheduler, time = time)
+
+    try {
+      log.append(messageSet)
+      fail("message set should throw MessageSetSizeTooLargeException.")
+    } catch {
+      case e: MessageSetSizeTooLargeException => // this is good
+    }
+  }
+
+  /**
+   * We have a max size limit on message appends, check that it is properly enforced by appending a message larger than the
    * setting and checking that an exception is thrown.
    */
   @Test
@@ -305,10 +322,9 @@ class LogTest extends JUnitSuite {
       log.append(second)
       fail("Second message set should throw MessageSizeTooLargeException.")
     } catch {
-      case e: MessageSizeTooLargeException => // this is good 
+      case e: MessageSizeTooLargeException => // this is good
     }
   }
-
   /**
   * Append a bunch of messages to a log and then re-open it both with and without recovery and check that the log re-initializes correctly.
   */
diff --git a/core/src/test/scala/unit/kafka/server/LogOffsetTest.scala b/core/src/test/scala/unit/kafka/server/LogOffsetTest.scala
index 9556ed9..c06ee75 100644
--- a/core/src/test/scala/unit/kafka/server/LogOffsetTest.scala
+++ b/core/src/test/scala/unit/kafka/server/LogOffsetTest.scala
@@ -92,7 +92,7 @@ class LogOffsetTest extends JUnit3Suite with ZooKeeperTestHarness {
     log.flush()
 
     val offsets = server.apis.fetchOffsets(logManager, TopicAndPartition(topic, part), OffsetRequest.LatestTime, 10)
-    assertEquals(Seq(20L, 16L, 12L, 8L, 4L, 0L), offsets)
+    assertEquals(Seq(20L, 18L, 15L, 12L, 9L, 6L, 3L, 0L), offsets)
 
     waitUntilTrue(() => isLeaderLocalOnBroker(topic, part, server), "Leader should be elected")
     val topicAndPartition = TopicAndPartition(topic, part)
@@ -101,7 +101,7 @@ class LogOffsetTest extends JUnit3Suite with ZooKeeperTestHarness {
       replicaId = 0)
     val consumerOffsets =
       simpleConsumer.getOffsetsBefore(offsetRequest).partitionErrorAndOffsets(topicAndPartition).offsets
-    assertEquals(Seq(20L, 16L, 12L, 8L, 4L, 0L), consumerOffsets)
+    assertEquals(Seq(20L, 18L, 15L, 12L, 9L, 6L, 3L, 0L), consumerOffsets)
 
     // try to fetch using latest offset
     val fetchResponse = simpleConsumer.fetch(
@@ -155,14 +155,14 @@ class LogOffsetTest extends JUnit3Suite with ZooKeeperTestHarness {
     val now = time.milliseconds + 30000 // pretend it is the future to avoid race conditions with the fs
 
     val offsets = server.apis.fetchOffsets(logManager, TopicAndPartition(topic, part), now, 10)
-    assertEquals(Seq(20L, 16L, 12L, 8L, 4L, 0L), offsets)
+    assertEquals(Seq(20L, 18L, 15L, 12L, 9L, 6L, 3L, 0L), offsets)
 
     waitUntilTrue(() => isLeaderLocalOnBroker(topic, part, server), "Leader should be elected")
     val topicAndPartition = TopicAndPartition(topic, part)
     val offsetRequest = OffsetRequest(Map(topicAndPartition -> PartitionOffsetRequestInfo(now, 10)),
       replicaId = 0)
     val consumerOffsets =
       simpleConsumer.getOffsetsBefore(offsetRequest).partitionErrorAndOffsets(topicAndPartition).offsets
-    assertEquals(Seq(20L, 16L, 12L, 8L, 4L, 0L), consumerOffsets)
+    assertEquals(Seq(20L, 18L, 15L, 12L, 9L, 6L, 3L, 0L), consumerOffsets)
   }
 
-- 
1.8.5.2 (Apple Git-48)
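The test-sizing change from msgPerSeg * (setSize - 1) to msgPerSeg * setSize follows from the new roll condition: since maybeRoll now compares against config.segmentSize - messagesSize, a segment sized at 10 * (setSize - 1) would roll after 9 message sets rather than 10. The standalone sketch below works through that arithmetic; setSize = 30 is an assumed byte size chosen purely for illustration.

// Standalone sketch: messages per segment under the overflow-safe roll condition.
object SegmentSizingDemo extends App {
  val setSize   = 30  // assumed size of one test message set, in bytes
  val msgPerSeg = 10

  // Smallest n such that appending the (n+1)th set triggers a roll,
  // i.e. the number of message sets one segment ends up holding.
  def messagesPerSegment(segmentSize: Int): Int =
    Iterator.from(0).find(n => n * setSize > segmentSize - setSize).get

  println(messagesPerSegment(msgPerSeg * (setSize - 1)))  // 9  -- old sizing, one set short
  println(messagesPerSegment(msgPerSeg * setSize))        // 10 -- new sizing, as the tests expect
}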