Details
- Type: Improvement
- Status: Resolved
- Priority: Major
- Resolution: Fixed
- 3.7.2
- None
- Unknown
Description
The `KafkaIdempotentRepository` initialises its cache from the entries already stored in the pre-existing Kafka topic, using the following code:
```java
log.debug("Subscribing consumer to {}", topic);
consumer.subscribe(Collections.singleton(topic));
log.debug("Seeking to beginning");
consumer.seekToBeginning(consumer.assignment());

POLL_LOOP: while (running.get()) {
    log.trace("Polling");
    ConsumerRecords<String, String> consumerRecords = consumer.poll(pollDurationMs);
    if (consumerRecords.isEmpty()) {
        // the first time this happens, we can assume that we have consumed all messages up to this point
        log.trace("0 messages fetched on poll");
        if (cacheReadyLatch.getCount() > 0) {
            log.debug("Cache warmed up");
            cacheReadyLatch.countDown();
        }
    }
    // ...
}
```
The problems with this code are:
- `consumer.subscribe` does not assign partitions to the consumer immediately; the assignment only happens later, during a call to `consumer.poll`.
- As a result, when `consumer.seekToBeginning(consumer.assignment())` is called, the assignment is still empty and the call does nothing (see "seekToBeginning doesn't work without auto.offset.reset" (apache.org)).
- When the first `consumer.poll` is later issued, it returns no records. This triggers the sequence that flags the cache as ready even though it has not been populated yet, which can cause upstream messages not to be de-duplicated correctly.
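To make the race concrete, here is a stdlib-only Java model of it. `ModelConsumer` is hypothetical and not part of the Kafka client: it only mimics the two behaviours that matter here, namely that partitions are assigned lazily inside `poll`, and that a freshly assigned partition with no committed offset starts at the end of the log (assumed to behave like `auto.offset.reset=latest`, Kafka's default). Partitions are plain integers rather than `TopicPartition` objects.

```java
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

// Stdlib-only model of the warm-up race. ModelConsumer is hypothetical and only
// mimics lazy partition assignment plus auto.offset.reset=latest.
public class WarmUpRaceDemo {

    static class ModelConsumer {
        private final Map<Integer, List<String>> partitions;             // partition -> log entries
        private final Map<Integer, Integer> positions = new HashMap<>(); // assigned partition -> next offset

        ModelConsumer(Map<Integer, List<String>> partitions) {
            this.partitions = partitions;
        }

        // Like KafkaConsumer.subscribe: records the intent, assigns nothing yet.
        void subscribe() {
        }

        Set<Integer> assignment() {
            return new HashSet<>(positions.keySet());
        }

        // Rewinds the given partitions; an empty collection means "all currently assigned ones".
        void seekToBeginning(Collection<Integer> parts) {
            Collection<Integer> targets = parts.isEmpty() ? assignment() : parts;
            for (Integer p : targets) {
                if (!positions.containsKey(p)) {
                    throw new IllegalStateException("partition not assigned: " + p);
                }
                positions.put(p, 0);
            }
        }

        // The first poll performs the assignment. With no committed offsets and
        // auto.offset.reset=latest, each newly assigned partition starts at its end.
        List<String> poll() {
            if (positions.isEmpty()) {
                for (Map.Entry<Integer, List<String>> e : partitions.entrySet()) {
                    positions.put(e.getKey(), e.getValue().size());
                }
            }
            List<String> records = new ArrayList<>();
            for (Map.Entry<Integer, Integer> e : positions.entrySet()) {
                List<String> log = partitions.get(e.getKey());
                records.addAll(log.subList(e.getValue(), log.size()));
                e.setValue(log.size());
            }
            return records;
        }
    }

    // Same order of calls as the original warm-up; returns the cache size at the
    // moment the first empty poll flags the cache as ready.
    public static int warmUpAsInOriginal(Map<Integer, List<String>> topic) {
        ModelConsumer consumer = new ModelConsumer(topic);
        consumer.subscribe();
        consumer.seekToBeginning(consumer.assignment()); // assignment is still empty: no-op
        Set<String> cache = new HashSet<>();
        while (true) {
            List<String> records = consumer.poll();
            if (records.isEmpty()) {
                return cache.size();
            }
            cache.addAll(records);
        }
    }

    public static void main(String[] args) {
        Map<Integer, List<String>> topic = new HashMap<>();
        topic.put(0, List.of("id-1", "id-2"));
        topic.put(1, List.of("id-3"));
        System.out.println("entries cached before 'ready': " + warmUpAsInOriginal(topic));
    }
}
```

Running `main` prints `entries cached before 'ready': 0`: the seek ran while the assignment was empty, so the first poll started at the end of each partition and the warm-up loop stopped immediately, even though the topic holds three entries.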
The solution is:
- Use the overload of `consumer.subscribe` that also accepts a `ConsumerRebalanceListener` implementation.
- In that listener's `onPartitionsAssigned` callback, call `seekToBeginning` on the partitions that have just been assigned to the `consumer` instance.
- Do an initial `poll(0)`, which will not return any records but will force the partition assignment process.
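The steps above can be sketched with a stdlib-only Java model. `ModelConsumer` and `AssignmentListener` are hypothetical stand-ins; in real code the listener would be a `ConsumerRebalanceListener` passed to `KafkaConsumer.subscribe(Collection<String>, ConsumerRebalanceListener)`, with the seek done in its `onPartitionsAssigned(Collection<TopicPartition>)` method. The model assumes newly assigned partitions start at their end (as with `auto.offset.reset=latest`) and, following the last step above, that `poll(0)` completes the assignment but returns no records.

```java
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

// Stdlib-only model of the proposed fix. ModelConsumer and AssignmentListener are
// hypothetical stand-ins for KafkaConsumer and ConsumerRebalanceListener.
public class WarmUpFixDemo {

    interface AssignmentListener {
        void onPartitionsAssigned(Collection<Integer> partitions);
    }

    static class ModelConsumer {
        private final Map<Integer, List<String>> partitions;             // partition -> log entries
        private final Map<Integer, Integer> positions = new HashMap<>(); // assigned partition -> next offset
        private AssignmentListener listener;

        ModelConsumer(Map<Integer, List<String>> partitions) {
            this.partitions = partitions;
        }

        // Like subscribe(topics, listener): stores the listener, assigns nothing yet.
        void subscribe(AssignmentListener listener) {
            this.listener = listener;
        }

        void seekToBeginning(Collection<Integer> parts) {
            for (Integer p : parts) {
                if (!positions.containsKey(p)) {
                    throw new IllegalStateException("partition not assigned: " + p);
                }
                positions.put(p, 0);
            }
        }

        // The first poll assigns every partition at its end (auto.offset.reset=latest)
        // and then calls the listener. Assumption: a zero timeout leaves no time to fetch.
        List<String> poll(long timeoutMs) {
            if (positions.isEmpty()) {
                for (Map.Entry<Integer, List<String>> e : partitions.entrySet()) {
                    positions.put(e.getKey(), e.getValue().size());
                }
                if (listener != null) {
                    listener.onPartitionsAssigned(new ArrayList<>(positions.keySet()));
                }
            }
            List<String> records = new ArrayList<>();
            if (timeoutMs == 0) {
                return records;
            }
            for (Map.Entry<Integer, Integer> e : positions.entrySet()) {
                List<String> log = partitions.get(e.getKey());
                records.addAll(log.subList(e.getValue(), log.size()));
                e.setValue(log.size());
            }
            return records;
        }
    }

    // Returns the cache size at the moment the first empty poll flags the cache as ready.
    public static int warmUpWithFix(Map<Integer, List<String>> topic) {
        ModelConsumer consumer = new ModelConsumer(topic);
        // Seek only once partitions have actually been assigned.
        consumer.subscribe(assigned -> consumer.seekToBeginning(assigned));
        consumer.poll(0); // forces the assignment (and therefore the seek); returns nothing
        Set<String> cache = new HashSet<>();
        while (true) {
            List<String> records = consumer.poll(100);
            if (records.isEmpty()) {
                return cache.size();
            }
            cache.addAll(records);
        }
    }

    public static void main(String[] args) {
        Map<Integer, List<String>> topic = new HashMap<>();
        topic.put(0, List.of("id-1", "id-2"));
        topic.put(1, List.of("id-3"));
        System.out.println("entries cached before 'ready': " + warmUpWithFix(topic));
    }
}
```

With a two-partition topic holding three entries, `main` prints `entries cached before 'ready': 3`. Because the listener rewinds each partition as soon as it is assigned, the first real poll reads the existing entries, and only the next, genuinely empty poll marks the cache as ready.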
Issue Links
- is related to: CAMEL-20218 KafkaIdempotentRepository cache incorrectly flagged as ready (Resolved)