FlinkKafkaConsumerBaseTest sets up mocks on the final method AbstractFetcher::commitInternalOffsetsToKafka(...). These mocks are invalid, since a mock cannot override a final method. The easy fix would be to make the method non-final, but that is not ideal: the method should stay final to prevent overrides in subclasses.
This suggests that offset committing functionality is too tightly coupled with the AbstractFetcher, making it hard to perform concise tests to verify offset committing.
I suggest that we decouple record fetching and offset committing into separate services behind different interfaces. We should introduce a new interface, say KafkaOffsetCommitter, and test against that instead. Initially, we can simply let AbstractFetcher implement KafkaOffsetCommitter.
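
To make the proposal concrete, here is a rough sketch of what the split could look like. Only the name KafkaOffsetCommitter comes from the proposal above; everything else is an assumption for illustration: the method name commitOffsets, the Map<String, Long> offsets type (the real code uses its own partition type), and the classes SketchFetcher, RecordingOffsetCommitter and CheckpointOffsetHandler. Checked exceptions are left out to keep the sketch short.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical interface from the proposal. The method name and the
// Map<String, Long> parameter are assumptions, not the actual Flink API.
interface KafkaOffsetCommitter {
    void commitOffsets(Map<String, Long> offsets);
}

// Stand-in for AbstractFetcher: it implements the new interface and keeps
// the commit entry point final, so subclasses still cannot override it.
abstract class SketchFetcher implements KafkaOffsetCommitter {
    @Override
    public final void commitOffsets(Map<String, Long> offsets) {
        doCommitOffsets(offsets);
    }

    // Version-specific commit logic supplied by concrete fetchers.
    protected abstract void doCommitOffsets(Map<String, Long> offsets);
}

// Hand-written test double: records every commit request, so tests need
// no mocking framework and never touch a final method.
class RecordingOffsetCommitter implements KafkaOffsetCommitter {
    final List<Map<String, Long>> commits = new ArrayList<>();

    @Override
    public void commitOffsets(Map<String, Long> offsets) {
        commits.add(new HashMap<>(offsets)); // defensive copy
    }
}

// Stand-in for the consumer-side logic under test. It depends only on the
// interface, not on the fetcher.
class CheckpointOffsetHandler {
    private final KafkaOffsetCommitter committer;

    CheckpointOffsetHandler(KafkaOffsetCommitter committer) {
        this.committer = committer;
    }

    void onCheckpointComplete(Map<String, Long> checkpointedOffsets) {
        committer.commitOffsets(checkpointedOffsets);
    }
}

class OffsetCommitterSketch {
    public static void main(String[] args) {
        RecordingOffsetCommitter committer = new RecordingOffsetCommitter();
        CheckpointOffsetHandler handler = new CheckpointOffsetHandler(committer);

        Map<String, Long> offsets = new HashMap<>();
        offsets.put("topic-0", 42L);
        handler.onCheckpointComplete(offsets);

        System.out.println("recorded commits: " + committer.commits);
    }
}
```

With this shape, a test such as FlinkKafkaConsumerBaseTest could verify offset committing against a KafkaOffsetCommitter test double, while the fetcher's own commit method stays final.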