diff --git a/core/src/main/scala/kafka/log/Log.scala b/core/src/main/scala/kafka/log/Log.scala index b2a7170..1875b38 100644 --- a/core/src/main/scala/kafka/log/Log.scala +++ b/core/src/main/scala/kafka/log/Log.scala @@ -318,7 +318,8 @@ private[kafka] class Log(val dir: File, } // maybe flush the log and index - maybeFlush(messageSetInfo.count) + val numAppendedMessages = (offsets._2 - offsets._1).toInt + 1 + maybeFlush(numAppendedMessages) // return the first and last offset offsets diff --git a/core/src/main/scala/kafka/server/KafkaConfig.scala b/core/src/main/scala/kafka/server/KafkaConfig.scala index b8970c8..549b4b0 100644 --- a/core/src/main/scala/kafka/server/KafkaConfig.scala +++ b/core/src/main/scala/kafka/server/KafkaConfig.scala @@ -110,7 +110,7 @@ class KafkaConfig private (val props: VerifiableProperties) extends ZKConfig(pro val logIndexIntervalBytes = props.getIntInRange("log.index.interval.bytes", 4096, (0, Int.MaxValue)) /* the number of messages accumulated on a log partition before messages are flushed to disk */ - val logFlushIntervalMessages = props.getIntInRange("log.flush.interval.messages", 500, (1, Int.MaxValue)) + val logFlushIntervalMessages = props.getIntInRange("log.flush.interval.messages", 10000, (1, Int.MaxValue)) /* the maximum time in ms that a message in selected topics is kept in memory before flushed to disk, e.g., topic1:3000,topic2: 6000 */ val logFlushIntervalMsPerTopicMap = props.getMap("log.flush.interval.ms.per.topic", _.toInt > 0).mapValues(_.toInt)