Uploaded image for project: 'Camel'
  1. Camel
  2. CAMEL-13339

Partition revoke implemented to save offset state using KafkaConsumer.position API results in message loss

    XMLWordPrintableJSON

    Details

    • Estimated Complexity:
      Unknown

      Description

      Current implementation of org.apache.camel.component.kafka.KafkaConsumer.KafkaFetchRecords's onPartitionsRevoked, uses org.apache.kafka.clients.consumer.KafkaConsumer.position(partition). This approach causes message loss when multiple processes listening to same topic for point to point messaging (like a QUEUE) type implementation.

       

      Issue is noticed when partition gets assigned and then gets revoked in quick succession. Upon partition assignment, say at the beginning of processing offset is set to 0, and say there was no poll for this partition (may be due to earlier poll brought in bunch of records and they are still being processed). Subsequently, say partition got revoked, before polling.

      In this case, as onPartitionsRevoked looks at org.apache.kafka.clients.consumer.KafkaConsumer.position(partition) to save offset state and so 0 gets saved in this case in StateRepository implementation. When the same partition get assigned again, StateRepository.getState returns 0. Since Camel KafkaConsumer treats this as last processed offset, it adds 1 to it moving pointer to offset 1. Because of this, message at offset 0 never gets processed.

       

      Two fixes might be needed

      1. a) onPartitionsRevoked should look at last processed offset (possibly store 'last processed offset' for each topic/partition in a memory map) and use it to save offset
      2. b) Currently onPartitionsRevoked just saves offset state when an implementation of StateRepository configured. Ideally it should call KafkaFetchRecords.commitOffset so commitSync call goes through when partition revoked and no StateRepository implementation configured

        Attachments

          Issue Links

            Activity

              People

              • Assignee:
                davsclaus Claus Ibsen
                Reporter:
                viswaram Viswa Ramamoorthy
              • Votes:
                0 Vote for this issue
                Watchers:
                3 Start watching this issue

                Dates

                • Created:
                  Updated:
                  Resolved:

                  Time Tracking

                  Estimated:
                  Original Estimate - Not Specified
                  Not Specified
                  Remaining:
                  Remaining Estimate - 0h
                  0h
                  Logged:
                  Time Spent - 20m
                  20m