Description
We've noticed Kafka using a lot of CPU in a pretty much idle environment and tracked it down to it's DelayedItem implementation. In particular, the time conversion for how much longer to wait:
def getDelay(unit: TimeUnit): Long =
{ val elapsedMs = (SystemTime.milliseconds - createdMs) unit.convert(max(delayMs - elapsedMs, 0), unit) }does not actually convert, so Kafka ends up treating a ms value like nanoseconds, e.g. waking up every 100 ns or so. The above code should really be:
def getDelay(unit: TimeUnit): Long =
{ val elapsedMs = (SystemTime.milliseconds - createdMs) unit.convert(max(delayMs - elapsedMs, 0), TimeUnit.MILLISECONDS) }I'll attach a patch.