Currently Log.replaceSegments() will call `asyncDeleteSegment(...)` for every segment listed in the `oldSegments`. oldSegments should be constructed from Log.segments and only contain segments listed in Log.segments.
However, Log.segments may be modified between the time oldSegments is determined to the time Log.replaceSegments() is called. If there are concurrent async deletion of the same log segment file, Log.replaceSegments() will call asyncDeleteSegment() for a segment that does not exist and Kafka server may shutdown the log directory due to NoSuchFileException.
This is likely the root cause of
Given the understanding of the problem, we should be able to fix the issue by only deleting segment if the segment can be found in Log.segments.