Description
The problem we saw can be explained by the example below:
1. Message offset 100 is appended to partition p0, log segment 00000000.log. at time T. After that no message is appended.
2. This message is replicated, leader replica update its highWatermark.messageOffset=100, highWatermark.segmentBaseOffset=0.
3. At time T + retention.ms, because no message has been appended to current active log segment for retention.ms, the last modified time of the current log segment reaches retention time.
4. Broker rolls out a new log segment 00000001.log, and deletes the old log segment 00000000.log. The new log segment in this case is empty because there is no message appended.
5. In Log, the nextOffsetMetadata.segmentBaseOffset will be updated to the new log segment's base offset, but nextOffsetMetadata.messageOffset does not change. so nextOffsetMetadata.messageOffset=1, nextOffsetMetadata.segmentBaseOffset=1.
6. Now a FetchRequest comes and try to fetch from offset 1, fetch.wait.max.ms=1000.
7. In ReplicaManager, because there is no data to return, the fetch request will be put into purgatory. When delayedFetchPurgatory.tryCompleteElseWatch() is called, the DelayedFetch.tryComplete() compares replica.highWatermark and the fetchOffset returned by log.read(), it will see the replica.highWatermark.segmentBaseOffset=0 and fetchOffset.segmentBaseOffset=1. So it will assume the fetch occurs on a later segment and complete the delayed fetch immediately.
In this case, the replica.highWatermark was not updated because the LogOffsetMetadata.preceds() only checks the messageOffset but ignored segmentBaseOffset. The fix is to let LogOffsetMetadata first check the messageOffset then check the segmentBaseOffset. So replica.highWatermark will get updated after the follower fetches from the leader.