Reviewed patch v4 -
1.1 Typos: enableClenaer (should be enableCleaner), dedupeBufferLoadFactor (dedupBufferLoadFactor is probably better?)
"If the given key is not present" -> "If the given property is not present"
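3.1 The comment explaining log.cleaner.min.cleanable.ratio is confusing:
"/* the minimum ratio of bytes of log eligible for cleaning to bytes to total bytes which a log must
contain to be eligible for cleaning */"
Maybe something like: "/* the minimum ratio of dirty (not yet cleaned) bytes to total bytes in a log for the log to be eligible for cleaning */"?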
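A minimal sketch of how I read the ratio, assuming "dirty" means bytes not yet cleaned and that the comparison is inclusive. LogStats and isCleanable() are hypothetical names, not the patch's code:

```scala
// Hypothetical summary of a log: bytes already cleaned vs. bytes not yet cleaned.
final case class LogStats(cleanBytes: Long, dirtyBytes: Long) {
  def totalBytes: Long = cleanBytes + dirtyBytes

  // Fraction of the log that is still dirty, i.e. dirty bytes / total bytes.
  def cleanableRatio: Double =
    if (totalBytes == 0L) 0.0 else dirtyBytes.toDouble / totalBytes
}

// A log is eligible for cleaning once its dirty fraction reaches the configured minimum
// (assumed to be log.cleaner.min.cleanable.ratio; inclusive comparison is an assumption).
def isCleanable(stats: LogStats, minCleanableRatio: Double): Boolean =
  stats.cleanableRatio >= minCleanableRatio
```

With a 0.5 minimum, a log with 600 clean and 400 dirty bytes (ratio 0.4) would not be cleaned yet.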
3.2 The config "log.retention.check.interval.ms" says the retention check interval is in milliseconds, but the corresponding field is named logCleanupIntervalMinutes, and we multiply this value by 60,000 before passing it into LogManager.
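To make the mismatch concrete: if someone sets 60000 (one minute) and we multiply by 60,000, the check runs every 3.6 billion ms, i.e. every 1,000 hours. A rough sketch of the two consistent options; the helper names, the "log.retention.check.interval.minutes" key and the defaults are hypothetical:

```scala
import java.util.Properties
import java.util.concurrent.TimeUnit

// Option A: the key says ms, so read it as ms and pass it to LogManager unchanged.
// The 300000 ms default is only an illustrative value.
def cleanupIntervalMs(props: Properties): Long =
  props.getProperty("log.retention.check.interval.ms", "300000").toLong

// Option B: if the setting really is in minutes, the key should say so and the
// conversion should be explicit. This key name is hypothetical.
def cleanupIntervalMsFromMinutes(props: Properties): Long =
  TimeUnit.MINUTES.toMillis(
    props.getProperty("log.retention.check.interval.minutes", "5").toLong)
```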
3.3 Can we document the different values for log.cleanup.policy in the comment?
4.1 Remove unused import "import java.util.concurrent._"
4.2 The entries count should be updated in the put() API.
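A simplified open-addressing sketch of what I mean: the counter goes up only when put() claims a fresh slot, and overwriting an existing key leaves it unchanged. ProbingOffsetMap is a hypothetical stand-in (Long keys instead of hashed message keys), not the patch's offset map:

```scala
// Hypothetical linear-probing key -> offset map with an explicit entry counter.
final class ProbingOffsetMap(capacity: Int) {
  private val keys = new Array[Long](capacity)
  private val values = new Array[Long](capacity)
  private val used = new Array[Boolean](capacity)
  private var entries = 0

  private def home(key: Long): Int =
    java.lang.Math.floorMod(java.lang.Long.hashCode(key), capacity)

  def put(key: Long, offset: Long): Unit = {
    var slot = home(key)
    var probes = 0
    // Walk past slots holding other keys; fail instead of looping forever when full.
    while (used(slot) && keys(slot) != key) {
      probes += 1
      require(probes < capacity, "offset map is full")
      slot = (slot + 1) % capacity
    }
    if (!used(slot)) {
      used(slot) = true
      keys(slot) = key
      entries += 1 // only a new key adds an entry
    }
    values(slot) = offset
  }

  // Returns -1 if the key is absent.
  def get(key: Long): Long = {
    var slot = home(key)
    var probes = 0
    while (probes < capacity && used(slot)) {
      if (keys(slot) == key) return values(slot)
      slot = (slot + 1) % capacity
      probes += 1
    }
    -1L
  }

  def size: Int = entries
  def utilization: Double = entries.toDouble / capacity
}
```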
5.1 Rolling new log segment in %s (log = %d/%d, index = %d/%d, age = %d/%d)
This log statement has become a little confusing: the last part of the statement should be index and the second-to-last should be age.
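One way to keep labels and values from drifting apart is string interpolation instead of positional %d placeholders. A sketch with hypothetical parameter names:

```scala
// Each label sits right next to its value, so reordering one without the other is hard.
def rollMessage(name: String,
                logSize: Long, maxLogSize: Long,
                ageMs: Long, maxAgeMs: Long,
                indexEntries: Int, maxIndexEntries: Int): String =
  s"Rolling new log segment in $name " +
    s"(log = $logSize/$maxLogSize, age = $ageMs/$maxAgeMs, index = $indexEntries/$maxIndexEntries)"
```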
6.1 In the cleanSegments() API, we pass SystemTime to the LogSegment. However, we already pass a Time instance to LogCleaner. To test the cleaner independently, we can pass a MockTime to LogCleaner, but then the same instance must also be passed to LogSegment for the test to work correctly.
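A minimal sketch of threading the cleaner's clock through to the segments it creates. Time, MockTime, LogSegment and LogCleaner below are simplified, hypothetical stand-ins, not the real classes:

```scala
// Simplified clock abstraction.
trait Time { def milliseconds: Long }

object SystemTime extends Time {
  def milliseconds: Long = System.currentTimeMillis()
}

final class MockTime(var currentMs: Long) extends Time {
  def milliseconds: Long = currentMs
  def sleep(ms: Long): Unit = currentMs += ms
}

// Hypothetical segment that records its creation time from whatever clock it is given.
final class LogSegment(val baseOffset: Long, time: Time) {
  val createdMs: Long = time.milliseconds
}

final class LogCleaner(time: Time) {
  // Reuse the cleaner's own clock instead of hard-coding SystemTime.
  def cleanedSegment(baseOffset: Long): LogSegment = new LogSegment(baseOffset, time)
}
```

In a test, advancing the MockTime handed to LogCleaner is then visible in every segment it creates.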
6.2 In the cleanInto() API, we throw an IllegalArgumentException with a custom message, and I'm not sure I understand that message. Aren't the log segments to be cleaned a mix of previously cleaned segments and segments yet to be cleaned? Why not just use "require", as we did while building the offset map?
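For reference, Scala's require already throws IllegalArgumentException, prefixing the message with "requirement failed: ", so callers see the same exception type. The condition below is hypothetical; I'm not reproducing the actual check in cleanInto():

```scala
// Hypothetical precondition expressed with require instead of a hand-built exception.
def checkSegmentRange(baseOffset: Long, endOffset: Long): Unit =
  require(baseOffset <= endOffset,
    s"segment base offset $baseOffset is beyond end offset $endOffset")
```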
6.3 Suppose the server crashes in replaceSegments() after addSegment() but before asyncDeleteSegment(), at a point where 2 log segments (xxxx.log, yyyy.log) were being replaced with one new log segment (xxxx.log). When the server restarts, the loadSegments() API will swap in the new xxxx.log.swap as xxxx.log, but it will leave yyyy.log behind.
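One possible recovery approach, sketched in memory only: when a .swap segment is restored, drop every existing segment whose base offset falls inside the swapped segment's offset range. Segment and recoverSwap() are hypothetical, and this assumes the swap segment's own [baseOffset, nextOffset) range covers every segment it replaced:

```scala
import scala.collection.mutable

// Hypothetical segment covering offsets [baseOffset, nextOffset).
final case class Segment(baseOffset: Long, nextOffset: Long)

// Install the swapped-in segment and remove the segments it replaced.
def recoverSwap(segments: mutable.TreeMap[Long, Segment], swapped: Segment): Unit = {
  val replaced = segments.valuesIterator
    .filter(s => s.baseOffset >= swapped.baseOffset && s.baseOffset < swapped.nextOffset)
    .toList // materialize before mutating the map
  replaced.foreach(s => segments.remove(s.baseOffset))
  segments.put(swapped.baseOffset, swapped)
}
```

In the xxxx/yyyy scenario, restoring the swap segment would then also remove the leftover yyyy segment.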
6.4 Do we have a unit test covering the grouping logic in the groupSegmentsBySize() API? It looks correct to me, but I've been bitten by several Scala collection append nuances before.
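Something along these lines could serve as the test. groupBySize() is a hypothetical, size-only simplification of groupSegmentsBySize() (it ignores index sizes). It builds each group by prepending and reversing at the end, which sidesteps the append pitfalls:

```scala
// Greedily pack consecutive segment sizes into groups whose total stays within maxSize.
// A single segment larger than maxSize still forms a group of its own.
def groupBySize(sizes: List[Long], maxSize: Long): List[List[Long]] =
  sizes.foldLeft(List.empty[List[Long]]) { (groups, size) =>
    groups match {
      case current :: rest if current.sum + size <= maxSize => (size :: current) :: rest
      case _ => List(size) :: groups
    }
  }.map(_.reverse).reverse // restore the original segment order
```

Cases worth covering: segments that pack together, a boundary that forces a new group, an oversized single segment, and an empty input.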
6.5 Remove unused import "import java.util.concurrent.locks.ReentrantLock"
6.6 allCleanerCheckpoints() is only called from within LogCleaner. Can we make it private?
7.1 Typos in the API doc: "enableClenaer" and "clenaer"
7.2 Why 3MB for the minimum buffer space per thread? Can we make this configurable as well?
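A rough sketch of what making the minimum configurable could look like. CleanerBufferConfig and bufferPerThread() are hypothetical, and failing fast with require (rather than clamping) is my assumption about the desired behavior:

```scala
// Hypothetical cleaner buffer settings; the per-thread minimum becomes a config value
// instead of a hard-coded 3MB.
final case class CleanerBufferConfig(totalBufferBytes: Long,
                                     numThreads: Int,
                                     minBufferBytesPerThread: Long)

// Split the total buffer evenly across threads and reject configs that leave each
// thread with less than the configured minimum.
def bufferPerThread(c: CleanerBufferConfig): Long = {
  require(c.numThreads > 0, "numThreads must be positive")
  val perThread = c.totalBufferBytes / c.numThreads
  require(perThread >= c.minBufferBytesPerThread,
    s"$perThread bytes per thread is below the configured minimum ${c.minBufferBytesPerThread}")
  perThread
}
```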
8.1 Can we rename configs to topicConfigs or topicOverrides?
9.1 Fix the log4j statement for the .log file renaming; it currently says "Failed to change the index file suffix".
In checkpointHighWatermarks(), it is better to use fatal("Error writing to highwatermark file: ", e)
This was not introduced in this patch, but while reading the code I realized that MockScheduler actually executes tasks before their nextExecution time is reached. This is because we only check whether nextExecutionTime <= now and then call task.fun(), without waiting until the nextExecution time.
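If "waiting until the nextExecution time" is taken to mean moving the mock clock to each task's scheduled time before running it, a mock scheduler could look roughly like this. MockClock, Task and advance() are hypothetical names, not the existing MockScheduler API:

```scala
import scala.collection.mutable

// Hypothetical mock clock.
final class MockClock(var nowMs: Long = 0L)

final case class Task(name: String, fun: () => Unit, nextExecutionMs: Long, periodMs: Long)

final class MockScheduler(clock: MockClock) {
  // PriorityQueue is a max-heap, so reverse the ordering to get the earliest task at the head.
  private implicit val byNextExecution: Ordering[Task] =
    Ordering.by[Task, Long](_.nextExecutionMs).reverse
  private val tasks = mutable.PriorityQueue.empty[Task]

  def schedule(name: String, fun: () => Unit, delayMs: Long, periodMs: Long = -1L): Unit =
    tasks.enqueue(Task(name, fun, clock.nowMs + delayMs, periodMs))

  // Advance the clock by ms, running due tasks in time order. Before each task runs,
  // the clock is moved to that task's scheduled time, so the task observes that time.
  def advance(ms: Long): Unit = {
    val target = clock.nowMs + ms
    while (tasks.nonEmpty && tasks.head.nextExecutionMs <= target) {
      val task = tasks.dequeue()
      clock.nowMs = math.max(clock.nowMs, task.nextExecutionMs)
      task.fun()
      if (task.periodMs > 0)
        tasks.enqueue(task.copy(nextExecutionMs = task.nextExecutionMs + task.periodMs))
    }
    clock.nowMs = target
  }
}
```

A periodic task scheduled every 10 ms and advanced by 35 ms would then see the clock at 10, 20 and 30 rather than at 35 each time.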