  1. Camel
  2. CAMEL-20373

camel-kafka - KafkaIdempotentRepository may allow some duplicates after application restart

Details

    • Type: Bug
    • Status: Resolved
    • Priority: Major
    • Resolution: Fixed
    • Affects Version/s: 3.22.0, 4.2.0
    • Fix Version/s: 3.22.2, 4.4.0
    • Component/s: camel-kafka
    • Labels: None
    • Estimated Complexity: Unknown

    Description

      The current implementation of KafkaIdempotentRepository is initialized as follows (after the CAMEL-20218 fix):

      • A separate thread, TopicPoller, is started. It calls KafkaConsumer.poll() in a while loop to retrieve the cached keys from the Kafka topic and populate the local in-memory cache.
      • TopicPoller stops retrieving records from the Kafka topic as soon as KafkaConsumer.poll() returns zero records. An empty result from poll() is treated as a signal that all records have been retrieved.
      • The main thread waits for the TopicPoller thread to finish, but for no longer than 30 seconds (the value is hardcoded).
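The bounded wait in the last step can be illustrated with a minimal sketch. This is not the Camel source: the class name `TimedStartupWait`, the method `startAndWait`, and the use of `Thread.sleep` as a stand-in for reading the topic are all assumptions made for illustration. It only shows that a timed wait returns control to the caller whether or not the background load has finished.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

// Hypothetical illustration, not the actual KafkaIdempotentRepository code.
class TimedStartupWait {

    /**
     * Starts a background loader and waits for it for at most waitMillis.
     * Returns true only if the loader finished within the wait.
     */
    static boolean startAndWait(long loadMillis, long waitMillis) {
        CountDownLatch cacheReady = new CountDownLatch(1);
        Thread poller = new Thread(() -> {
            try {
                Thread.sleep(loadMillis); // stands in for reading the topic
                cacheReady.countDown();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        poller.setDaemon(true);
        poller.start();
        try {
            // await() returns false on timeout; startup continues either way
            return cacheReady.await(waitMillis, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }

    public static void main(String[] args) {
        System.out.println("fast load ready: " + startAndWait(10, 500));
        System.out.println("slow load ready: " + startAndWait(2000, 50));
    }
}
```

In the sketch, a `false` return corresponds to the repository going into service with only the keys the poller has loaded so far.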

      This initialization sequence can leave the local cache only partially populated, for two reasons:
      1. TopicPoller does not manage to consume all records from the Kafka topic within the 30-second interval.
      2. KafkaConsumer.poll() returns an empty record set even though the consumer has not yet reached the end of the Kafka topic (this is possible, for example, when the poll timeout elapses before fetched data arrives).

      Hence, after an application restart, KafkaIdempotentRepository may fail to fully restore the local cache. The consumer then treats already processed input as new and consumes it again, which produces duplicates.
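The second failure mode can be reproduced without Kafka. The sketch below is a simplified simulation under stated assumptions: each poll result is modelled as a `List<String>` of keys, the topic has a single partition with offsets starting at 0 and no compaction (so the end offset equals the record count), and the names `EmptyPollDemo`, `readUntilEmpty`, `readUntilCount`, and `samplePolls` are hypothetical.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;

// Hypothetical simulation: poll results are modelled as lists of keys.
class EmptyPollDemo {

    /** Stops at the first empty batch, like a "stop on empty poll" loop. */
    static List<String> readUntilEmpty(Iterator<List<String>> polls) {
        List<String> cache = new ArrayList<>();
        while (polls.hasNext()) {
            List<String> batch = polls.next();
            if (batch.isEmpty()) {
                break; // an empty poll is mistaken for end of topic
            }
            cache.addAll(batch);
        }
        return cache;
    }

    /** Keeps polling until the known end offset has been reached. */
    static List<String> readUntilCount(Iterator<List<String>> polls, int endOffset) {
        List<String> cache = new ArrayList<>();
        while (cache.size() < endOffset && polls.hasNext()) {
            cache.addAll(polls.next()); // empty batches are simply skipped
        }
        return cache;
    }

    /** Three records, with one empty poll in the middle. */
    static List<List<String>> samplePolls() {
        return Arrays.asList(
                Arrays.asList("k1", "k2"),
                Collections.<String>emptyList(),
                Arrays.asList("k3"));
    }

    public static void main(String[] args) {
        System.out.println(readUntilEmpty(samplePolls().iterator()));
        System.out.println(readUntilCount(samplePolls().iterator(), 3));
    }
}
```

With the sample input, the first loop stops after `k1` and `k2` and never sees `k3`, while the second loop restores all three keys.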

      Proposed implementation

      • Remove the asynchronous TopicPoller and retrieve all records from Kafka synchronously in KafkaIdempotentRepository.doStart().
      • Read records from the Kafka topic until the end offsets are reached. Do not rely on the condition "poll() returns an empty record set".

      Pseudocode:

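      // partitionsFor() returns partition metadata; assign() expects TopicPartition objects
      partitions = consumer.partitionsFor(topic)
          .map(p -> new TopicPartition(p.topic(), p.partition()))
      consumer.assign(partitions)
      consumer.seekToBeginning(partitions)
      // take the end offsets once, before reading
      endOffsets = consumer.endOffsets(partitions)
      while (!isReachedOffsets(consumer, endOffsets)) {
        // poll() requires a timeout; an empty result just means "try again"
        consumerRecords = consumer.poll(pollTimeout)
        addToLocalCache(consumerRecords)
      }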
      

      This implementation ensures that KafkaIdempotentRepository is used in a Camel application only after all cached records have been restored from persistent storage (the Kafka topic) into the local cache. This prevents duplicates after an application restart.
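The pseudocode leaves isReachedOffsets undefined. One possible shape for it is sketched below, under assumptions: partitions are keyed by their number, and the current read positions are passed in as a map instead of being read from the consumer (with a real KafkaConsumer they would come from `consumer.position(partition)`). The class name `EndOffsetCheck` and the two-map signature are hypothetical and differ from the signature used in the pseudocode.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical helper; positions are supplied by the caller in this sketch.
class EndOffsetCheck {

    /** True when every partition's position has reached its end offset. */
    static boolean isReachedOffsets(Map<Integer, Long> positions,
                                    Map<Integer, Long> endOffsets) {
        for (Map.Entry<Integer, Long> end : endOffsets.entrySet()) {
            // A partition with no recorded position is treated as unread
            long position = positions.getOrDefault(end.getKey(), 0L);
            if (position < end.getValue()) {
                return false;
            }
        }
        return true;
    }

    public static void main(String[] args) {
        Map<Integer, Long> endOffsets = new HashMap<>();
        endOffsets.put(0, 5L);
        endOffsets.put(1, 3L);

        Map<Integer, Long> positions = new HashMap<>();
        positions.put(0, 5L);
        positions.put(1, 2L);
        System.out.println(isReachedOffsets(positions, endOffsets)); // partition 1 is behind

        positions.put(1, 3L);
        System.out.println(isReachedOffsets(positions, endOffsets)); // all partitions caught up
    }
}
```

Comparing positions against end offsets also handles an empty topic: every end offset is 0, so the check passes before the first poll and startup does not block.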

      For IdempotentConsumer, the Kafka topic plays the role of a database table. What we need is essentially a "SELECT * FROM app_state" operation. It makes little sense to run this SELECT asynchronously and rely on a partial result set.

            People

              Assignee: Unassigned
              Reporter: Arseniy Tashoyan (tashoyan)
