Description
Typically, the HWM is increase after one round of Fetch requests from the majority of the replicas. The HWM is propagated after another round of Fetch requests. If the LEO doesn't change the propagation of the HWM can be delay by one Fetch wait timeout (500ms).
Looking at the KafkaRaftClient implementation we would have to have an index for both the fetch offset and the last sent high-watermark for that replica.
Another issue here is that we changed the KafkaRaftManager so that it doesn't set the replica id when it is an observer/broker. Since the HWM is not part of the Fetch request the leader would have to keep track of this in the LeaderState.
val nodeId = if (config.processRoles.contains(ControllerRole)) { OptionalInt.of(config.nodeId) } else { OptionalInt.empty() }
We would need to find a better solution for https://issues.apache.org/jira/browse/KAFKA-13168 or improve the FETCH request so that it includes the HWM.