Camel / CAMEL-16181

KafkaIdempotentRepository cache incorrectly flagged as ready


    Details

    • Type: Improvement
    • Status: Resolved
    • Priority: Major
    • Resolution: Fixed
    • Affects Version/s: 3.7.2
    • Fix Version/s: 3.7.3, 3.8.0
    • Component/s: camel-kafka
    • Labels:
      None
    • Estimated Complexity:
      Unknown

      Description

      The `KafkaIdempotentRepository` initialises its cache from the entries already present in its Kafka topic, using the following code:

       

      log.debug("Subscribing consumer to {}", topic);
      consumer.subscribe(Collections.singleton(topic));
      log.debug("Seeking to beginning");
      consumer.seekToBeginning(consumer.assignment());

      POLL_LOOP: while (running.get()) {
        log.trace("Polling");
        ConsumerRecords<String, String> consumerRecords = consumer.poll(pollDurationMs);
        if (consumerRecords.isEmpty()) {
          // the first time this happens, we can assume that we have consumed all messages up to this point
          log.trace("0 messages fetched on poll");
          if (cacheReadyLatch.getCount() > 0) {
            log.debug("Cache warmed up");
            cacheReadyLatch.countDown();
          }
        }

       

      The problems with this code are:

      1. `consumer.subscribe` does not assign partitions to the consumer immediately.
      2. When `consumer.seekToBeginning` is called, it does nothing because the consumer has no partitions yet (see the apache.org mailing-list thread "seekToBeginning doesn't work without auto.offset.reset").
      3. When the first `consumer.poll` is issued later, it returns nothing, which triggers the sequence that marks the cache as ready before it has been populated. This can cause upstream messages not to be de-duplicated correctly.

      The solution is:

      1. Use a different overload of `consumer.subscribe` that accepts a `ConsumerRebalanceListener` implementation.
      2. When partitions are assigned to the `consumer` instance, call `seekToBeginning` there.
      3. Do an initial `poll(0)`, which will never return records but will force the partition assignment process.
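
      The ordering problem and the proposed fix can be illustrated with a small toy model. This is only a sketch under stated assumptions: `ToyConsumer` is a hypothetical stand-in, not the Kafka client, and it models just the behaviour described above (assignment happens inside the first `poll`, a seek before assignment is a no-op, and the initial position is assumed to be the end of the topic). In the real client the corresponding pieces are `KafkaConsumer.subscribe(Collection<String>, ConsumerRebalanceListener)` and `ConsumerRebalanceListener.onPartitionsAssigned`.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

// Toy model of the cache warm-up race. Not the Kafka client API.
class CacheWarmupSketch {

    /** Hypothetical stand-in for a consumer of a single-partition topic. */
    static class ToyConsumer {
        private final List<String> log;           // records already in the topic
        private boolean assigned = false;         // assignment happens lazily, inside poll()
        private int position;                     // next offset to read (assumed to start at the end)
        private Runnable onAssigned = () -> { };  // stand-in for the rebalance callback

        ToyConsumer(List<String> log) {
            this.log = log;
            this.position = log.size();
        }

        /** Registers interest only; no partition is assigned here. */
        void subscribe() { }

        /** Same as subscribe(), but also registers an assignment callback. */
        void subscribe(Runnable onAssigned) {
            this.onAssigned = onAssigned;
        }

        /** Does nothing while no partition is assigned. */
        void seekToBeginning() {
            if (assigned) {
                position = 0;
            }
        }

        /** The first call performs the assignment and returns no records. */
        List<String> poll() {
            if (!assigned) {
                assigned = true;
                onAssigned.run();
                return Collections.emptyList();
            }
            List<String> batch = new ArrayList<>(log.subList(position, log.size()));
            position = log.size();
            return batch;
        }
    }

    /** Original ordering: seek before assignment, then trust the first empty poll. */
    static List<String> warmUpBuggy(List<String> topic) {
        ToyConsumer consumer = new ToyConsumer(topic);
        consumer.subscribe();
        consumer.seekToBeginning();   // no partitions yet, so this is a no-op
        return drainUntilEmpty(consumer);
    }

    /** Proposed ordering: seek in the assignment callback, force assignment with one poll. */
    static List<String> warmUpFixed(List<String> topic) {
        ToyConsumer consumer = new ToyConsumer(topic);
        consumer.subscribe(consumer::seekToBeginning);
        consumer.poll();              // initial poll: triggers assignment, returns nothing
        return drainUntilEmpty(consumer);
    }

    /** Reads until the first empty poll, which the repository treats as "cache warmed up". */
    private static List<String> drainUntilEmpty(ToyConsumer consumer) {
        List<String> cache = new ArrayList<>();
        while (true) {
            List<String> records = consumer.poll();
            if (records.isEmpty()) {
                return cache;
            }
            cache.addAll(records);
        }
    }

    public static void main(String[] args) {
        List<String> topic = Arrays.asList("k1", "k2", "k3");
        System.out.println("buggy warm-up cached " + warmUpBuggy(topic).size() + " keys"); // 0 keys
        System.out.println("fixed warm-up cached " + warmUpFixed(topic).size() + " keys"); // 3 keys
    }
}
```

      In this model the original ordering declares the cache ready with no entries, while the callback-based ordering loads all three keys before the first empty poll in the loop is seen.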

            People

            • Assignee: Unassigned
            • Reporter: Javier Holguera (javierholguera)