diff --git a/core/src/main/scala/kafka/log/FileMessageSet.scala b/core/src/main/scala/kafka/log/FileMessageSet.scala index 2522604..39361fe 100755 --- a/core/src/main/scala/kafka/log/FileMessageSet.scala +++ b/core/src/main/scala/kafka/log/FileMessageSet.scala @@ -54,7 +54,7 @@ class FileMessageSet private[kafka](@volatile var file: File, /* if this is not a slice, update the file pointer to the end of the file */ if (!isSlice) /* set the file position to the last byte in the file */ - channel.position(channel.size) + channel.position(math.min(channel.size().toInt, end)) /** * Create a file message set with no slicing. @@ -66,12 +66,25 @@ class FileMessageSet private[kafka](@volatile var file: File, * Create a file message set with no slicing */ def this(file: File) = - this(file, CoreUtils.openChannel(file, mutable = true)) + this(file, FileMessageSet.openChannel(file, mutable = true)) + + /** + * Create a file message set with no slicing, and with initFileSize and preallocate. + * For windows NTFS and some old LINUX file system, set preallocate to true and initFileSize + * with one value (for example 512 * 1024 *1024 ) can improve the kafka produce performance. + * If it's new file and preallocate is true, end will be set to 0. Otherwise set to Int.MaxValue. + */ + def this(file: File, fileAlreadyExists: Boolean, initFileSize: Int, preallocate: Boolean) = + this(file, + channel = FileMessageSet.openChannel(file, mutable = true, fileAlreadyExists, initFileSize, preallocate), + start = 0, + end = ( if ( !fileAlreadyExists && preallocate ) 0 else Int.MaxValue), + isSlice = false) /** * Create a file message set with mutable option */ - def this(file: File, mutable: Boolean) = this(file, CoreUtils.openChannel(file, mutable)) + def this(file: File, mutable: Boolean) = this(file, FileMessageSet.openChannel(file, mutable)) /** * Create a slice view of the file message set that begins and ends at the given byte offsets @@ -223,10 +236,18 @@ class FileMessageSet private[kafka](@volatile var file: File, */ def close() { flush() + trim() channel.close() } /** + * Trim file when close or roll to next file + */ + def trim() { + truncateTo(sizeInBytes()) + } + + /** * Delete this message set from the filesystem * @return True iff this message set was deleted. */ @@ -272,6 +293,37 @@ class FileMessageSet private[kafka](@volatile var file: File, } } + +object FileMessageSet +{ + /** + * Open a channel for the given file + * For windows NTFS and some old LINUX file system, set preallocate to true and initFileSize + * with one value (for example 512 * 1025 *1024 ) can improve the kafka produce performance. + * @param file File path + * @param mutable mutable + * @param fileAlreadyExists File already exists or not + * @param initFileSize The size used for pre allocate file, for example 512 * 1025 *1024 + * @param preallocate Pre allocate file or not, gotten from configuration. + */ + def openChannel(file: File, mutable: Boolean, fileAlreadyExists: Boolean = false, initFileSize: Int = 0, preallocate: Boolean = false): FileChannel = { + if (mutable) { + if (fileAlreadyExists) + new RandomAccessFile(file, "rw").getChannel() + else { + if (preallocate) { + val randomAccessFile = new RandomAccessFile(file, "rw") + randomAccessFile.setLength(initFileSize) + randomAccessFile.getChannel() + } + else + new RandomAccessFile(file, "rw").getChannel() + } + } + else + new FileInputStream(file).getChannel() + } +} object LogFlushStats extends KafkaMetricsGroup { val logFlushTimer = new KafkaTimer(newTimer("LogFlushRateAndTimeMs", TimeUnit.MILLISECONDS, TimeUnit.SECONDS)) diff --git a/core/src/main/scala/kafka/log/Log.scala b/core/src/main/scala/kafka/log/Log.scala index 84e7b8f..6b9274d 100644 --- a/core/src/main/scala/kafka/log/Log.scala +++ b/core/src/main/scala/kafka/log/Log.scala @@ -78,6 +78,13 @@ class Log(val dir: File, /* last time it was flushed */ private val lastflushedTime = new AtomicLong(time.milliseconds) + def initFileSize() : Int = { + if (config.preallocate) + config.segmentSize + else + 0 + } + /* the actual segments of the log */ private val segments: ConcurrentNavigableMap[java.lang.Long, LogSegment] = new ConcurrentSkipListMap[java.lang.Long, LogSegment] loadSegments() @@ -168,7 +175,8 @@ class Log(val dir: File, indexIntervalBytes = config.indexInterval, maxIndexSize = config.maxIndexSize, rollJitterMs = config.randomSegmentJitter, - time = time) + time = time, + fileAlreadyExists = true) if(!hasIndex) { error("Could not find index file corresponding to log file %s, rebuilding index...".format(segment.log.file.getAbsolutePath)) segment.recover(config.maxMessageSize) @@ -205,7 +213,10 @@ class Log(val dir: File, indexIntervalBytes = config.indexInterval, maxIndexSize = config.maxIndexSize, rollJitterMs = config.randomSegmentJitter, - time = time)) + time = time, + fileAlreadyExists = false, + initFileSize = this.initFileSize(), + preallocate = config.preallocate)) } else { recoverLog() // reset the index size of the currently active log segment to allow more entries @@ -586,14 +597,20 @@ class Log(val dir: File, segments.lastEntry() match { case null => - case entry => entry.getValue.index.trimToValidSize() + case entry => { + entry.getValue.index.trimToValidSize() + entry.getValue.log.trim() + } } val segment = new LogSegment(dir, startOffset = newOffset, indexIntervalBytes = config.indexInterval, maxIndexSize = config.maxIndexSize, rollJitterMs = config.randomSegmentJitter, - time = time) + time = time, + fileAlreadyExists = false, + initFileSize = initFileSize, + preallocate = config.preallocate) val prev = addSegment(segment) if(prev != null) throw new KafkaException("Trying to roll a new log segment for topic partition %s with start offset %d while it already exists.".format(name, newOffset)) @@ -687,7 +704,10 @@ class Log(val dir: File, indexIntervalBytes = config.indexInterval, maxIndexSize = config.maxIndexSize, rollJitterMs = config.randomSegmentJitter, - time = time)) + time = time, + fileAlreadyExists = false, + initFileSize = initFileSize, + preallocate = config.preallocate)) updateLogEndOffset(newOffset) this.recoveryPoint = math.min(newOffset, this.recoveryPoint) } diff --git a/core/src/main/scala/kafka/log/LogCleaner.scala b/core/src/main/scala/kafka/log/LogCleaner.scala index c9ade72..d07a391 100644 --- a/core/src/main/scala/kafka/log/LogCleaner.scala +++ b/core/src/main/scala/kafka/log/LogCleaner.scala @@ -344,7 +344,7 @@ private[log] class Cleaner(val id: Int, logFile.delete() val indexFile = new File(segments.head.index.file.getPath + Log.CleanedFileSuffix) indexFile.delete() - val messages = new FileMessageSet(logFile) + val messages = new FileMessageSet(logFile, fileAlreadyExists = false, initFileSize = log.initFileSize(), preallocate = log.config.preallocate) val index = new OffsetIndex(indexFile, segments.head.baseOffset, segments.head.index.maxIndexSize) val cleaned = new LogSegment(messages, index, segments.head.baseOffset, segments.head.indexIntervalBytes, log.config.randomSegmentJitter, time) diff --git a/core/src/main/scala/kafka/log/LogConfig.scala b/core/src/main/scala/kafka/log/LogConfig.scala index e9af221..fc41132 100755 --- a/core/src/main/scala/kafka/log/LogConfig.scala +++ b/core/src/main/scala/kafka/log/LogConfig.scala @@ -43,6 +43,7 @@ object Defaults { val UncleanLeaderElectionEnable = kafka.server.Defaults.UncleanLeaderElectionEnable val MinInSyncReplicas = kafka.server.Defaults.MinInSyncReplicas val CompressionType = kafka.server.Defaults.CompressionType + val PreAllocateEnable = kafka.server.Defaults.LogPreAllocateEnable } case class LogConfig(props: java.util.Map[_, _]) extends AbstractConfig(LogConfig.configDef, props) { @@ -64,6 +65,7 @@ case class LogConfig(props: java.util.Map[_, _]) extends AbstractConfig(LogConfi val uncleanLeaderElectionEnable = getBoolean(LogConfig.UncleanLeaderElectionEnableProp) val minInSyncReplicas = getInt(LogConfig.MinInSyncReplicasProp) val compressionType = getString(LogConfig.CompressionTypeProp).toLowerCase + val preallocate = getBoolean(LogConfig.PreAllocateEnableProp) def randomSegmentJitter: Long = if (segmentJitterMs == 0) 0 else Utils.abs(scala.util.Random.nextInt()) % math.min(segmentJitterMs, segmentMs) @@ -95,6 +97,7 @@ object LogConfig { val UncleanLeaderElectionEnableProp = "unclean.leader.election.enable" val MinInSyncReplicasProp = "min.insync.replicas" val CompressionTypeProp = "compression.type" + val PreAllocateEnableProp = "preallocate" val SegmentSizeDoc = "The hard maximum for the size of a segment file in the log" val SegmentMsDoc = "The soft maximum on the amount of time before a new log segment is rolled" @@ -118,6 +121,7 @@ object LogConfig { val CompressionTypeDoc = "Specify the final compression type for a given topic. This configuration accepts the " + "standard compression codecs ('gzip', 'snappy', lz4). It additionally accepts 'uncompressed' which is equivalent to " + "no compression; and 'producer' which means retain the original compression codec set by the producer." + val PreAllocateEnableDoc ="Should pre allocate file when create new segment?" private val configDef = { import ConfigDef.Range._ @@ -149,6 +153,8 @@ object LogConfig { MEDIUM, UncleanLeaderElectionEnableDoc) .define(MinInSyncReplicasProp, INT, Defaults.MinInSyncReplicas, atLeast(1), MEDIUM, MinInSyncReplicasDoc) .define(CompressionTypeProp, STRING, Defaults.CompressionType, in(BrokerCompressionCodec.brokerCompressionOptions:_*), MEDIUM, CompressionTypeDoc) + .define(PreAllocateEnableProp, BOOLEAN, Defaults.PreAllocateEnable, + MEDIUM, PreAllocateEnableDoc) } def apply(): LogConfig = LogConfig(new Properties()) diff --git a/core/src/main/scala/kafka/log/LogSegment.scala b/core/src/main/scala/kafka/log/LogSegment.scala index ed03953..1377e8f 100755 --- a/core/src/main/scala/kafka/log/LogSegment.scala +++ b/core/src/main/scala/kafka/log/LogSegment.scala @@ -52,8 +52,8 @@ class LogSegment(val log: FileMessageSet, /* the number of bytes since we last added an entry in the offset index */ private var bytesSinceLastIndexEntry = 0 - def this(dir: File, startOffset: Long, indexIntervalBytes: Int, maxIndexSize: Int, rollJitterMs: Long, time: Time) = - this(new FileMessageSet(file = Log.logFilename(dir, startOffset)), + def this(dir: File, startOffset: Long, indexIntervalBytes: Int, maxIndexSize: Int, rollJitterMs: Long, time: Time, fileAlreadyExists: Boolean = false, initFileSize: Int = 0, preallocate: Boolean = false) = + this(new FileMessageSet(file = Log.logFilename(dir, startOffset), fileAlreadyExists = fileAlreadyExists, initFileSize = initFileSize, preallocate = preallocate), new OffsetIndex(file = Log.indexFilename(dir, startOffset), baseOffset = startOffset, maxIndexSize = maxIndexSize), startOffset, indexIntervalBytes, diff --git a/core/src/main/scala/kafka/server/KafkaConfig.scala b/core/src/main/scala/kafka/server/KafkaConfig.scala index e0b2480..c1f0cca 100755 --- a/core/src/main/scala/kafka/server/KafkaConfig.scala +++ b/core/src/main/scala/kafka/server/KafkaConfig.scala @@ -83,6 +83,7 @@ object Defaults { val LogDeleteDelayMs = 60000 val LogFlushSchedulerIntervalMs = Long.MaxValue val LogFlushOffsetCheckpointIntervalMs = 60000 + val LogPreAllocateEnable = false val NumRecoveryThreadsPerDataDir = 1 val AutoCreateTopicsEnable = true val MinInSyncReplicas = 1 @@ -206,6 +207,7 @@ object KafkaConfig { val LogFlushSchedulerIntervalMsProp = "log.flush.scheduler.interval.ms" val LogFlushIntervalMsProp = "log.flush.interval.ms" val LogFlushOffsetCheckpointIntervalMsProp = "log.flush.offset.checkpoint.interval.ms" + val LogPreAllocateProp = "log.preallocate" val NumRecoveryThreadsPerDataDirProp = "num.recovery.threads.per.data.dir" val AutoCreateTopicsEnableProp = "auto.create.topics.enable" val MinInSyncReplicasProp = "min.insync.replicas" @@ -332,6 +334,7 @@ object KafkaConfig { val LogFlushSchedulerIntervalMsDoc = "The frequency in ms that the log flusher checks whether any log needs to be flushed to disk" val LogFlushIntervalMsDoc = "The maximum time in ms that a message in any topic is kept in memory before flushed to disk" val LogFlushOffsetCheckpointIntervalMsDoc = "The frequency with which we update the persistent record of the last flush which acts as the log recovery point" + val LogPreAllocateEnableDoc = "Should pre allocate file when create new segment? If you are using Kafka on Windows, you probably need to set it to true." val NumRecoveryThreadsPerDataDirDoc = "The number of threads per data directory to be used for log recovery at startup and flushing at shutdown" val AutoCreateTopicsEnableDoc = "Enable auto creation of topic on the server" val MinInSyncReplicasDoc = "define the minimum number of replicas in ISR needed to satisfy a produce request with required.acks=-1 (or all)" @@ -466,6 +469,7 @@ object KafkaConfig { .define(LogFlushSchedulerIntervalMsProp, LONG, Defaults.LogFlushSchedulerIntervalMs, HIGH, LogFlushSchedulerIntervalMsDoc) .define(LogFlushIntervalMsProp, LONG, HIGH, LogFlushIntervalMsDoc, false) .define(LogFlushOffsetCheckpointIntervalMsProp, INT, Defaults.LogFlushOffsetCheckpointIntervalMs, atLeast(0), HIGH, LogFlushOffsetCheckpointIntervalMsDoc) + .define(LogPreAllocateProp, BOOLEAN, Defaults.LogPreAllocateEnable, MEDIUM, LogPreAllocateEnableDoc) .define(NumRecoveryThreadsPerDataDirProp, INT, Defaults.NumRecoveryThreadsPerDataDir, atLeast(1), HIGH, NumRecoveryThreadsPerDataDirDoc) .define(AutoCreateTopicsEnableProp, BOOLEAN, Defaults.AutoCreateTopicsEnable, HIGH, AutoCreateTopicsEnableDoc) .define(MinInSyncReplicasProp, INT, Defaults.MinInSyncReplicas, atLeast(1), HIGH, MinInSyncReplicasDoc) @@ -609,6 +613,7 @@ case class KafkaConfig (props: java.util.Map[_, _]) extends AbstractConfig(Kafka val logRollTimeJitterMillis: java.lang.Long = Option(getLong(KafkaConfig.LogRollTimeJitterMillisProp)).getOrElse(60 * 60 * 1000L * getInt(KafkaConfig.LogRollTimeJitterHoursProp)) val logFlushIntervalMs: java.lang.Long = Option(getLong(KafkaConfig.LogFlushIntervalMsProp)).getOrElse(getLong(KafkaConfig.LogFlushSchedulerIntervalMsProp)) val minInSyncReplicas = getInt(KafkaConfig.MinInSyncReplicasProp) + val logPreAllocateEnable: Boolean = getBoolean(KafkaConfig.LogPreAllocateProp) /** ********* Replication configuration ***********/ val controllerSocketTimeoutMs: Int = getInt(KafkaConfig.ControllerSocketTimeoutMsProp) diff --git a/core/src/main/scala/kafka/server/KafkaServer.scala b/core/src/main/scala/kafka/server/KafkaServer.scala index 9de2a6f..52dc728 100755 --- a/core/src/main/scala/kafka/server/KafkaServer.scala +++ b/core/src/main/scala/kafka/server/KafkaServer.scala @@ -443,6 +443,7 @@ class KafkaServer(val config: KafkaConfig, time: Time = SystemTime) extends Logg case KafkaConfig.MinInSyncReplicasProp => logProps.put(LogConfig.MinInSyncReplicasProp, entry.getValue) case KafkaConfig.CompressionTypeProp => logProps.put(LogConfig.CompressionTypeProp, entry.getValue) case KafkaConfig.UncleanLeaderElectionEnableProp => logProps.put(LogConfig.UncleanLeaderElectionEnableProp, entry.getValue) + case KafkaConfig.LogPreAllocateProp => logProps.put(LogConfig.PreAllocateEnableProp, entry.getValue) case _ => // we just leave those out } } diff --git a/core/src/main/scala/kafka/utils/CoreUtils.scala b/core/src/main/scala/kafka/utils/CoreUtils.scala index f5d704c..168a18d 100755 --- a/core/src/main/scala/kafka/utils/CoreUtils.scala +++ b/core/src/main/scala/kafka/utils/CoreUtils.scala @@ -70,16 +70,6 @@ object CoreUtils extends Logging { Utils.daemonThread(name, runnable(fun)) /** - * Open a channel for the given file - */ - def openChannel(file: File, mutable: Boolean): FileChannel = { - if(mutable) - new RandomAccessFile(file, "rw").getChannel() - else - new FileInputStream(file).getChannel() - } - - /** * Do the given action and log any exceptions thrown without rethrowing them * @param log The log method to use for logging. E.g. logger.warn * @param action The action to execute diff --git a/core/src/test/scala/unit/kafka/log/FileMessageSetTest.scala b/core/src/test/scala/unit/kafka/log/FileMessageSetTest.scala index cec1cae..02cf668 100644 --- a/core/src/test/scala/unit/kafka/log/FileMessageSetTest.scala +++ b/core/src/test/scala/unit/kafka/log/FileMessageSetTest.scala @@ -17,6 +17,7 @@ package kafka.log +import java.io._ import java.nio._ import java.util.concurrent.atomic._ import junit.framework.Assert._ @@ -146,5 +147,57 @@ class FileMessageSetTest extends BaseMessageSetTestCases { assertEquals(List(message), messageSet.toList) assertEquals(MessageSet.entrySize(message.message), messageSet.sizeInBytes) } - + + /** + * Test the new FileMessageSet with pre allocate as true + */ + @Test + def testPreallocateTrue() { + val temp = tempFile() + val set = new FileMessageSet(temp, false, 512 *1024 *1024, true) + val position = set.channel.position + val size = set.sizeInBytes() + assertEquals(0, position) + assertEquals(0, size) + assertEquals(512 *1024 *1024, temp.length) + } + + /** + * Test the new FileMessageSet with pre allocate as false + */ + @Test + def testPreallocateFalse() { + val temp = tempFile() + val set = new FileMessageSet(temp, false, 512 *1024 *1024, false) + val position = set.channel.position + val size = set.sizeInBytes() + assertEquals(0, position) + assertEquals(0, size) + assertEquals(0, temp.length) + } + + /** + * Test the new FileMessageSet with pre allocate as true and file has been clearly shut down, the file will be truncate to end of valid data. + */ + @Test + def testPreallocateClearShutdown() { + val temp = tempFile() + val set = new FileMessageSet(temp, false, 512 *1024 *1024, true) + set.append(new ByteBufferMessageSet(NoCompressionCodec, messages: _*)) + val oldposition = set.channel.position + val oldsize = set.sizeInBytes() + assertEquals(messageSet.sizeInBytes, oldposition) + assertEquals(messageSet.sizeInBytes, oldsize) + set.close() + + val tempReopen = new File(temp.getAbsolutePath()) + val setReopen = new FileMessageSet(tempReopen, true, 512 *1024 *1024, true) + val position = setReopen.channel.position + val size = setReopen.sizeInBytes() + + assertEquals(oldposition, position) + assertEquals(oldposition, size) + assertEquals(oldposition, tempReopen.length) + } + } diff --git a/core/src/test/scala/unit/kafka/log/LogConfigTest.scala b/core/src/test/scala/unit/kafka/log/LogConfigTest.scala index c31f884..19dcb47 100644 --- a/core/src/test/scala/unit/kafka/log/LogConfigTest.scala +++ b/core/src/test/scala/unit/kafka/log/LogConfigTest.scala @@ -45,6 +45,7 @@ class LogConfigTest extends JUnit3Suite { case LogConfig.MinInSyncReplicasProp => expected.setProperty(name, (nextInt(Int.MaxValue - 1) + 1).toString) case LogConfig.RetentionBytesProp => expected.setProperty(name, nextInt().toString) case LogConfig.RetentionMsProp => expected.setProperty(name, nextLong().toString) + case LogConfig.PreAllocateEnableProp => expected.setProperty(name, randFrom("true", "false")) case positiveIntProperty => expected.setProperty(name, nextInt(Int.MaxValue).toString) } }) diff --git a/core/src/test/scala/unit/kafka/log/LogSegmentTest.scala b/core/src/test/scala/unit/kafka/log/LogSegmentTest.scala index 03fb351..bdb2fd7 100644 --- a/core/src/test/scala/unit/kafka/log/LogSegmentTest.scala +++ b/core/src/test/scala/unit/kafka/log/LogSegmentTest.scala @@ -226,5 +226,57 @@ class LogSegmentTest extends JUnit3Suite { seg.delete() } } - + + /* create a segment with pre allocate */ + def createSegment(offset: Long, fileAlreadyExists: Boolean = false, initFileSize: Long = 0, preallocate: Boolean = false): LogSegment = { + val tempDir = TestUtils.tempDir() + val seg = new LogSegment(tempDir, offset, 10, 1000, 0, SystemTime, fileAlreadyExists = fileAlreadyExists, initFileSize = initFileSize, preallocate = preallocate) + segments += seg + seg + } + + /* create a segment with pre allocate, put message to it and verify */ + @Test + def testCreateWithInitFileSizeAppendMessage() { + val seg = createSegment(40, false, 512*1024*1024, true) + val ms = messages(50, "hello", "there") + seg.append(50, ms) + val ms2 = messages(60, "alpha", "beta") + seg.append(60, ms2) + val read = seg.read(startOffset = 55, maxSize = 200, maxOffset = None) + assertEquals(ms2.toList, read.messageSet.toList) + } + + /* create a segment with pre allocate and clearly shut down*/ + @Test + def testCreateWithInitFileSizeClearShutdown() { + val tempDir = TestUtils.tempDir() + val seg = new LogSegment(tempDir, 40, 10, 1000, 0, SystemTime, false, 512*1024*1024, true) + + val ms = messages(50, "hello", "there") + seg.append(50, ms) + val ms2 = messages(60, "alpha", "beta") + seg.append(60, ms2) + val read = seg.read(startOffset = 55, maxSize = 200, maxOffset = None) + assertEquals(ms2.toList, read.messageSet.toList) + val oldSize = seg.log.sizeInBytes() + val oldPosition = seg.log.channel.position + val oldFileSize = seg.log.file.length + assertEquals(512*1024*1024, oldFileSize) + seg.close() + //After close, file should be trimed + assertEquals(oldSize, seg.log.file.length) + + val segReopen = new LogSegment(tempDir, 40, 10, 1000, 0, SystemTime, true, 512*1024*1024, true) + segments += segReopen + + val readAgain = segReopen.read(startOffset = 55, maxSize = 200, maxOffset = None) + assertEquals(ms2.toList, readAgain.messageSet.toList) + val size = segReopen.log.sizeInBytes() + val position = segReopen.log.channel.position + val fileSize = segReopen.log.file.length + assertEquals(oldPosition, position) + assertEquals(oldSize, size) + assertEquals(size, fileSize) + } } \ No newline at end of file diff --git a/core/src/test/scala/unit/kafka/server/KafkaConfigConfigDefTest.scala b/core/src/test/scala/unit/kafka/server/KafkaConfigConfigDefTest.scala index 8268852..98a5b04 100644 --- a/core/src/test/scala/unit/kafka/server/KafkaConfigConfigDefTest.scala +++ b/core/src/test/scala/unit/kafka/server/KafkaConfigConfigDefTest.scala @@ -192,6 +192,7 @@ class KafkaConfigConfigDefTest extends JUnit3Suite { case KafkaConfig.MinInSyncReplicasProp => expected.setProperty(name, atLeastOneIntProp) case KafkaConfig.AutoLeaderRebalanceEnableProp => expected.setProperty(name, randFrom("true", "false")) case KafkaConfig.UncleanLeaderElectionEnableProp => expected.setProperty(name, randFrom("true", "false")) + case KafkaConfig.LogPreAllocateProp => expected.setProperty(name, randFrom("true", "false")) case KafkaConfig.InterBrokerSecurityProtocolProp => expected.setProperty(name, SecurityProtocol.PLAINTEXT.toString) case KafkaConfig.InterBrokerProtocolVersionProp => expected.setProperty(name, ApiVersion.latestVersion.toString)