Uploaded image for project: 'Camel'
  1. Camel
  2. CAMEL-19972

camel-kafka manual commit configuration inconsistency

    XMLWordPrintableJSON

Details

    • Bug
    • Status: Open
    • Minor
    • Resolution: Unresolved
    • 3.18.5, 3.21.1
    • None
    • camel-kafka
    • Novice

    Description

      Expected behavior

      • Should not matter if manual commits are enabled component level or endpoint level, consumer should not commit automatically when autoCommitEnable is set to false

      Actual behavior

      The behavior for manual commits is inconsistent at the moment (Test on Camel versions 3.18.5 and 3.21.1), depending on whether the KafkaComponent is configured on component or endpoint level

      • If configured on component level, will still automatically commit offsets back to broker (after records from a partition have been processed)
      • If configured on endpoint level, will never automatically commit offsets
      • autoCommitEnable is

      Configuration of manual commit for Kafka client during startup

      During startup for Kafka component, Camel context will go through the following classes and methods while setting up manual commit configuration.

      KafkaComponent

      https://github.com/apache/camel/blob/camel-3.18.5/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaComponent.java#L222

      Class: org.apache.camel.component.kafka.KafkaComponent

      Method: doInit
       

      • When the component is being initialized, will check if configuration.isAllowManualCommit() is true AND kafkaManualCommitFactory is null
        • If they are, will initialize kafkaManualCommitFactory as DefaultKafkaManualCommitFactory
        • If not, will leave kafkaManualCommitFactory as null

       

      CommitManagers

      https://github.com/apache/camel/blob/camel-3.18.5/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/CommitManagers.java#L32

      Class: org.apache.camel.component.kafka.consumer.CommitManagers

      Method: createCommitManager

      • Called when initializing Consumer
        • For clarity's sake, omitting Async-related logic
        • If configuration.isAllowManualCommit() is true AND manualCommitFactory is instance of DefaultKafkaManualCommitFactory
          • Will initialize SyncCommitManager
          • Otherwise, will initialize NoopCommitManager

      Summary

      • Configuring manual commits on component level still results in automatic commits when the changing partitions (or all records processed) since SyncCommitManager is configured
        • allowManualCommit is true when calling doInit -> initializes kafkaManualCommitFactory as DefaultKafkaManualCommitFactory -> initilize commitManager as SyncCommitManager
      • If allowManualCommit is only configured on endpoint level, it will be false when invoking doInit, therefore leaving kafkaManualCommitFactory as null, resulting in NoopCommitManager in createCommitManager

       

      Workaround

      • Configure manual commits on endpoint level

      Attachments

        Activity

          People

            Unassigned Unassigned
            samipeltola Sami Peltola
            Votes:
            0 Vote for this issue
            Watchers:
            3 Start watching this issue

            Dates

              Created:
              Updated: