  1. Camel
  2. CAMEL-16181

KafkaIdempotentRepository cache incorrectly flagged as ready


Details

    • Type: Improvement
    • Status: Resolved
    • Priority: Major
    • Resolution: Fixed
    • Affects Version/s: 3.7.2
    • Fix Version/s: 3.7.3, 3.8.0
    • Component/s: camel-kafka
    • Labels: None
    • Estimated Complexity: Unknown

    Description

      The `KafkaIdempotentRepository` initialises its cache from the previous entries stored in its pre-existing Kafka topic, using the following code:

       

      log.debug("Subscribing consumer to {}", topic);
      consumer.subscribe(Collections.singleton(topic));
      log.debug("Seeking to beginning");
      // No partitions are assigned yet, so consumer.assignment() is empty here
      consumer.seekToBeginning(consumer.assignment());

      POLL_LOOP: while (running.get()) {
        log.trace("Polling");
        ConsumerRecords<String, String> consumerRecords = consumer.poll(pollDurationMs);
        if (consumerRecords.isEmpty()) {
          // the first time this happens, we can assume that we have consumed all messages up to this point
          log.trace("0 messages fetched on poll");
          if (cacheReadyLatch.getCount() > 0) {
            log.debug("Cache warmed up");
            cacheReadyLatch.countDown();
          }
        }
        // ... (remainder of the poll loop not shown)
      }

       

      The problems with this code are:

      1. `consumer.subscribe` doesn't assign partitions to the consumer instantaneously.
      2. When `consumer.seekToBeginning` is called, the operation does nothing because the consumer has no partitions yet (see "seekToBeginning doesn't work without auto.offset.reset" on apache.org).
      3. When the first `consumer.poll` is later issued, it returns nothing, which triggers the sequence that confirms the cache as ready when it isn't yet. That can cause upstream messages not to be de-duplicated correctly.

      The solution is:

      1. Use a different overload of `consumer.subscribe` that accepts an implementation of `ConsumerRebalanceListener`.
      2. When partitions are assigned to the `consumer` instance, call `seekToBeginning` from that listener.
      3. Do an initial `poll(0)`, which will never return records but will force the partition assignment process.
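
The readiness condition behind these steps can be modelled without any Kafka dependency. The sketch below is only an illustration, not the code that was merged for this issue: `CacheWarmUpGate` and its methods are hypothetical names, and the explicit `partitionsAssigned` guard is an assumption added here to make the ordering visible. In a real repository, `onPartitionsAssigned()` would presumably be driven from `ConsumerRebalanceListener.onPartitionsAssigned` after calling `seekToBeginning`, and `onPoll` from the poll loop.

```java
import java.util.concurrent.CountDownLatch;

/**
 * Kafka-free model of the cache warm-up condition.
 * All names are hypothetical; nothing here is taken from camel-kafka.
 */
final class CacheWarmUpGate {
    private final CountDownLatch cacheReadyLatch = new CountDownLatch(1);
    private boolean partitionsAssigned = false;

    /** Intended to be called from the rebalance callback, after seeking to the beginning. */
    synchronized void onPartitionsAssigned() {
        partitionsAssigned = true;
    }

    /** Intended to be called after every poll with the number of records it returned. */
    synchronized void onPoll(int recordCount) {
        // An empty poll only means "caught up" if the consumer actually owns partitions.
        if (recordCount == 0 && partitionsAssigned && cacheReadyLatch.getCount() > 0) {
            cacheReadyLatch.countDown();
        }
    }

    boolean isReady() {
        return cacheReadyLatch.getCount() == 0;
    }
}

class CacheWarmUpGateDemo {
    public static void main(String[] args) {
        CacheWarmUpGate gate = new CacheWarmUpGate();

        gate.onPoll(0);                      // empty poll before assignment: ignored
        System.out.println(gate.isReady());  // false

        gate.onPartitionsAssigned();         // partitions assigned, offsets reset
        gate.onPoll(3);                      // still replaying previous entries
        System.out.println(gate.isReady());  // false

        gate.onPoll(0);                      // first empty poll after assignment
        System.out.println(gate.isReady());  // true
    }
}
```

With the original logic, the first `onPoll(0)` would already have counted the latch down, which is the premature "cache ready" signal described above.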

          People

            Assignee: Unassigned
            Reporter: Javier Holguera (javierholguera)
            Votes: 0
            Watchers: 4