  1. Camel
  2. CAMEL-12525

camel-kafka component commits the offset as soon as it is retrieved


Details

    • Type: Bug
    • Status: Resolved
    • Priority: Major
    • Resolution: Fixed
    • Affects Version/s: 2.21.0
    • Fix Version/s: 2.22.1, 2.23.0
    • Component/s: camel-kafka
    • Labels: None
    • Environment: Linux

    • Unknown

    Description

      I am trying manual commit from a consumer; the code snippet is below. I want to consume a message and commit it only once 2 minutes have passed since it arrived in the topic. My consumer retrieves the message and checks the time difference; if it is above 2 minutes, it should commit. When a message has been retrieved but not committed manually, I expect it to come back, but it never does. When I create a plain Kafka consumer instead, it works fine.

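      import java.util.Date;

      import org.apache.camel.Exchange;
      import org.apache.camel.Processor;
      import org.apache.camel.builder.RouteBuilder;
      import org.apache.camel.component.kafka.KafkaConstants;
      import org.apache.camel.component.kafka.KafkaManualCommit;

      // The class declaration was missing from the original snippet; the name is a placeholder.
      public class BenefitsLoaderRoute extends RouteBuilder {
          @Override
          public void configure() throws Exception {
              from("kafka:BENEFITSLOADER.LOAD?brokers=xxxx:9092,xxxx:9092,xxxx:9092&groupId=BENEFITSLOADER&consumersCount=1&pollTimeoutMs=1000&autoCommitEnable=false&allowManualCommit=true&maxPollRecords=1")
                  .process(new Processor() {
                      @Override
                      public void process(Exchange exchange) throws Exception {
                          Long msgDateTime = (Long) exchange.getIn().getHeaders().get(KafkaConstants.TIMESTAMP);
                          System.out.println("Message : " + (exchange.getIn().getHeaders()));
                          System.out.println("Message : " + (exchange.getIn().getBody()));

                          // Age of the message in whole minutes (note that % 60 wraps after one hour).
                          Date msgDate = new Date(msgDateTime);
                          Date currentDate = new Date();
                          long diff = currentDate.getTime() - msgDate.getTime();
                          long diffMinutes = diff / (60 * 1000) % 60;
                          System.out.println("Difference in Minutes " + diffMinutes);

                          KafkaManualCommit manualCommit = exchange.getIn().getHeader(KafkaConstants.MANUAL_COMMIT, KafkaManualCommit.class);
                          // Commit only once the message is more than 2 minutes old.
                          if (diffMinutes > 2) {
                              System.out.println("Committing Message " + exchange.getIn().getBody());
                              manualCommit.commitSync();
                          }
                      }
                  });
          }
      }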

       

       

      Code that works fine (plain Kafka consumer):

      import java.util.Collections;
      import java.util.Date;
      import java.util.Properties;

      import org.apache.kafka.clients.consumer.Consumer;
      import org.apache.kafka.clients.consumer.ConsumerConfig;
      import org.apache.kafka.clients.consumer.ConsumerRecord;
      import org.apache.kafka.clients.consumer.ConsumerRecords;
      import org.apache.kafka.clients.consumer.KafkaConsumer;
      import org.apache.kafka.common.serialization.StringDeserializer;

      public class TestKafkaConsumer {
          static Consumer<String, String> consumer = null;
          static ConsumerRecord<String, String> fetchedRecord;
          static ConsumerRecords<String, String> records;

          public static void main(String... args) {
              String topicName = "BENEFITSLOADER.LOAD";
              consumer = createConsumer();
              consumer.subscribe(Collections.singletonList(topicName));

              try {
                  while (true) {
                      // Poll only when no record is being held; otherwise keep re-checking the held one.
                      if (fetchedRecord == null) {
                          records = consumer.poll(1000);
                      }

                      records.forEach(record -> { fetchedRecord = record; });

                      if (fetchedRecord != null) {
                          Date msgDate = new Date(fetchedRecord.timestamp());
                          Date date = new Date(System.currentTimeMillis());
                          long diff = date.getTime() - msgDate.getTime();
                          long diffMinutes = diff / (60 * 1000) % 60;

                          System.out.printf("Consumer Record:(%s, %s, %d, %d)\n",
                                  fetchedRecord.key(), fetchedRecord.value(),
                                  fetchedRecord.partition(), fetchedRecord.offset());

                          // Commit only once the held record is more than 2 minutes old.
                          if (diffMinutes > 2) {
                              System.out.printf("Consumer Record Committing:(%s, %s, %d, %d)\n",
                                      fetchedRecord.key(), fetchedRecord.value(),
                                      fetchedRecord.partition(), fetchedRecord.offset());
                              consumer.commitSync();
                              System.out.println("Committed");
                              fetchedRecord = null;
                          }
                      }
                  }
              } catch (Exception ex) {
                  ex.printStackTrace();
              } finally {
                  consumer.close();
              }
          }

          private static Consumer<String, String> createConsumer() {
              Properties props = new Properties();
              props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "xxx:9092,xxx:9092,xxx:9093");
              props.put(ConsumerConfig.GROUP_ID_CONFIG, "BENEFITSLOADER");
              props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
              props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
              props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
              props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 1);
              return new KafkaConsumer<>(props);
          }
      }
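
Both snippets compute the message age as `diff / (60 * 1000) % 60`, which wraps back to 0 every hour, so a message that is 61 minutes old is reported as 1 minute old and would not be committed. This is separate from the redelivery problem reported here, but it can mask it while testing. A minimal sketch of a wrap-free check using `java.time` follows; `MessageAge` and its methods are hypothetical helpers (not part of camel-kafka or the Kafka client), and the fixed instant in `main` is an arbitrary value chosen for illustration. Unlike the original check, which truncates to whole minutes before comparing, this one compares the exact elapsed time against the threshold.

```java
import java.time.Duration;
import java.time.Instant;

/** Hypothetical helper for illustration; not part of camel-kafka or kafka-clients. */
final class MessageAge {

    private MessageAge() {
    }

    /** Elapsed time between a record timestamp (epoch milliseconds) and the given instant. */
    static Duration between(long recordTimestampMillis, Instant now) {
        return Duration.between(Instant.ofEpochMilli(recordTimestampMillis), now);
    }

    /** True when the record is strictly older than the threshold, with no hourly wrap-around. */
    static boolean isOlderThan(long recordTimestampMillis, Instant now, Duration threshold) {
        return between(recordTimestampMillis, now).compareTo(threshold) > 0;
    }

    public static void main(String[] args) {
        // Arbitrary fixed "now" so the example is deterministic.
        Instant now = Instant.parse("2018-05-17T12:00:00Z");
        long sixtyOneMinutesAgo = now.minus(Duration.ofMinutes(61)).toEpochMilli();

        // Expression used in the snippets above: (diff / 60000) % 60.
        long diff = now.toEpochMilli() - sixtyOneMinutesAgo;
        long wrappedMinutes = diff / (60 * 1000) % 60;
        System.out.println("wrapped minutes: " + wrappedMinutes);

        System.out.println("older than 2 min: "
                + isOlderThan(sixtyOneMinutesAgo, now, Duration.ofMinutes(2)));
    }
}
```

In the route or the plain consumer, the condition `diffMinutes > 2` could then be replaced by something like `MessageAge.isOlderThan(timestamp, Instant.now(), Duration.ofMinutes(2))`, assuming the timestamp is in epoch milliseconds, as `ConsumerRecord.timestamp()` returns.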

            People

              Assignee: Claus Ibsen (davsclaus)
              Reporter: Chandwani Mukesh
              Votes: 0
              Watchers: 2
