The `KafkaIdempotentRepository` initialises its cache from the entries already present in its Kafka topic, using the following code:
The problem with this code is:
- `consumer.subscribe` doesn't assign partitions to the consumer immediately.
- When `consumer.seekToBeginning` is called, it does nothing because the consumer has no partitions yet (see "seekToBeginning doesn't work without auto.offset.reset" on apache.org).
- When the first `consumer.poll` is later issued, it returns nothing, which triggers the sequence that confirms the cache as ready when it isn't yet. That can cause upstream messages not to be de-duplicated correctly.
The solution is:
- Use a different overload of `consumer.subscribe` that accepts a `ConsumerRebalanceListener` implementation.
- When partitions are assigned to the `consumer` instance, call `seekToBeginning` from the listener's `onPartitionsAssigned` callback.
- Do an initial `poll(0)`, which will never return records but will force the partition assignment process.
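
The proposed fix could look roughly like the sketch below. It is not the actual patch: the class `CacheTopicSubscriber`, the listener `SeekToBeginningOnAssignment` and the method `subscribeFromBeginning` are hypothetical names used only for illustration. The sketch also uses the `poll(Duration)` overload instead of `poll(0)`, on the assumption that a recent `kafka-clients` version is in use; with that overload a single zero-timeout poll may return before the assignment completes, so the surrounding code should not treat the first empty poll as proof that the cache is ready.

```java
import java.time.Duration;
import java.util.Collection;
import java.util.Collections;

import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
import org.apache.kafka.common.TopicPartition;

// Hypothetical helper, not part of KafkaIdempotentRepository.
class CacheTopicSubscriber {

    /** Rewinds every newly assigned partition to its earliest offset. */
    static class SeekToBeginningOnAssignment implements ConsumerRebalanceListener {
        private final Consumer<?, ?> consumer;

        SeekToBeginningOnAssignment(Consumer<?, ?> consumer) {
            this.consumer = consumer;
        }

        @Override
        public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
            // Nothing to clean up in this sketch.
        }

        @Override
        public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
            // The consumer now owns these partitions, so the seek takes effect.
            consumer.seekToBeginning(partitions);
        }
    }

    /** Subscribes with the listener, then polls once to start the assignment. */
    static void subscribeFromBeginning(Consumer<?, ?> consumer, String topic) {
        consumer.subscribe(Collections.singleton(topic),
                new SeekToBeginningOnAssignment(consumer));
        // Initial poll whose purpose is to kick off the group join and
        // partition assignment; any records it returns are ignored here.
        consumer.poll(Duration.ZERO);
    }
}
```

Moving the seek into the rebalance callback means it runs only once the consumer actually owns partitions, which is the condition the original `seekToBeginning` call was missing.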