Current implementation of org.apache.camel.component.kafka.KafkaConsumer.KafkaFetchRecords's onPartitionsRevoked, uses org.apache.kafka.clients.consumer.KafkaConsumer.position(partition). This approach causes message loss when multiple processes listening to same topic for point to point messaging (like a QUEUE) type implementation.
Issue is noticed when partition gets assigned and then gets revoked in quick succession. Upon partition assignment, say at the beginning of processing offset is set to 0, and say there was no poll for this partition (may be due to earlier poll brought in bunch of records and they are still being processed). Subsequently, say partition got revoked, before polling.
In this case, as onPartitionsRevoked looks at org.apache.kafka.clients.consumer.KafkaConsumer.position(partition) to save offset state and so 0 gets saved in this case in StateRepository implementation. When the same partition get assigned again, StateRepository.getState returns 0. Since Camel KafkaConsumer treats this as last processed offset, it adds 1 to it moving pointer to offset 1. Because of this, message at offset 0 never gets processed.
Two fixes might be needed
- a) onPartitionsRevoked should look at last processed offset (possibly store 'last processed offset' for each topic/partition in a memory map) and use it to save offset
- b) Currently onPartitionsRevoked just saves offset state when an implementation of StateRepository configured. Ideally it should call KafkaFetchRecords.commitOffset so commitSync call goes through when partition revoked and no StateRepository implementation configured