The method void onReceived(Iterator<FetchedMessage> messages) in KafkaConsumer.MessageCallback can be more flexible with the change to Long onReceived(Iterator<FetchedMessage> messages) so that it can provide additional functionalities:
1. To return the next offset to be fetched
2. To handle offset non-existence or offset mismatch error and take action on the error
This method will return null for backward compatibility when it doesn't need to provide the next offset.
In concrete implementation, a class of a new interface KafkaOffsetProvider can be added as a member in KafkaConsumer.MessageCallback to perform the offset error handling and provide the next offset. Besides, KafkaOffsetProvider also has methods to provide the following functionalities:
1. To fetch earliest/latest offset in Kafka
2. To find the offset of a message with timestamp equal to the given timestamp in Kafka
For backward compatibility, if KafkaOffsetProvider instance is not provided, its default value will be null and none of its methods will be called.