Camel / CAMEL-13768

camel-kafka: seek to specific offset and KafkaConsumer access

Details

    • Type: New Feature
    • Status: Resolved
    • Priority: Major
    • Resolution: Fixed
    • Affects Version/s: 2.24.1, 3.x
    • Fix Version/s: 3.12.0
    • Component/s: camel-kafka
    • None
    • Unknown
    • Important

    Description

      1. Provide a way to rewind the Kafka consumer to a specific offset (perhaps by improving seekTo?). There is currently no way to do this with the camel-kafka component. The main idea is to replay older Kafka messages without starting from the beginning.
      For example, see https://blog.sysco.no/integration/kafka-rewind-consumers-offset/

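      // Needs java.time.Duration, java.time.Instant, java.time.temporal.ChronoUnit,
      // java.util.HashMap, java.util.Map, org.apache.kafka.common.TopicPartition
      // and the org.apache.kafka.clients.consumer classes.
      boolean flag = true; // true until the one-time rewind has been done

      while (true) {
          // poll(long) is deprecated; poll(Duration) may return before partitions are assigned
          ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));

          // seek() requires an assignment, so wait until the consumer has one
          if (flag && !consumer.assignment().isEmpty()) {
              Map<TopicPartition, Long> query = new HashMap<>();
              query.put(
                      new TopicPartition("simple-topic-1", 0),
                      Instant.now().minus(10, ChronoUnit.MINUTES).toEpochMilli());

              // Look up, per partition, the earliest offset at or after the timestamp
              Map<TopicPartition, OffsetAndTimestamp> result = consumer.offsetsForTimes(query);

              // Rewind each partition with seek(); the value is null when the
              // partition has no record at or after the timestamp
              result.forEach((partition, found) -> {
                  if (found != null) {
                      consumer.seek(partition, found.offset());
                  }
              });

              flag = false;
          }

          for (ConsumerRecord<String, String> record : records) {
              System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value());
          }
      }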
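
The lookup step in the snippet above relies on offsetsForTimes returning, for each partition, the earliest offset whose timestamp is greater than or equal to the requested one, and no result when no such record exists. As a rough illustration only, here is a toy in-memory model of that rule and of the resulting rewind decision. The class TimestampSeekSketch and its methods are hypothetical names made up for this sketch; it uses no Kafka or Camel API and does not reflect how either library implements the lookup.

```java
import java.util.OptionalLong;

/** Toy model of one partition: the array index is the offset, the value is the record timestamp. */
public class TimestampSeekSketch {

    /**
     * Returns the earliest offset whose timestamp is >= targetMs, or empty if there is none.
     * A linear scan is used because this model does not assume timestamps are sorted.
     */
    public static OptionalLong offsetForTime(long[] timestampsByOffset, long targetMs) {
        for (int offset = 0; offset < timestampsByOffset.length; offset++) {
            if (timestampsByOffset[offset] >= targetMs) {
                return OptionalLong.of(offset);
            }
        }
        return OptionalLong.empty();
    }

    /** Returns the position to continue from: the looked-up offset, or the current one if nothing matched. */
    public static long rewind(long[] timestampsByOffset, long currentPosition, long targetMs) {
        return offsetForTime(timestampsByOffset, targetMs).orElse(currentPosition);
    }

    public static void main(String[] args) {
        long[] log = {1_000L, 2_000L, 3_000L, 4_000L}; // timestamps of offsets 0..3
        long position = 4;                             // consumer has read everything

        position = rewind(log, position, 2_500L);      // first record at or after 2500 ms
        System.out.println("position after rewind = " + position); // prints 2

        position = rewind(log, position, 9_000L);      // nothing that recent: stay put
        System.out.println("position after no-op  = " + position); // prints 2
    }
}
```

The empty result mirrors the null check in the consumer loop: when no record is at or after the timestamp, the position is left unchanged rather than seeking.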
      

      2. Provide a way to access the KafkaConsumer
      Add a Camel header that holds a reference to the KafkaConsumer, so that a route can make Kafka API calls directly. This could work the same way as the existing KafkaManualCommit header:

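      public void process(Exchange exchange) {
          // The header is only present when manual commit is enabled on the endpoint
          KafkaManualCommit manual =
              exchange.getIn().getHeader(KafkaConstants.MANUAL_COMMIT, KafkaManualCommit.class);
          if (manual != null) {
              manual.commitSync();
          }
      }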
      

            People

              Assignee: orpiske Otavio Rodolfo Piske
              Reporter: michael992 michael elbaz
              Votes: 1
              Watchers: 5

              Dates

                Created:
                Updated:
                Resolved: