Camel / CAMEL-13768

Seek to specific offset and KafkaConsumer access

    Details

    • Type: New Feature
    • Status: Open
    • Priority: Major
    • Resolution: Unresolved
    • Affects Version/s: 2.24.1
    • Fix Version/s: 3.x
    • Component/s: camel-kafka
    • Labels:
      None
    • Estimated Complexity:
      Unknown
    • Flags:
      Important

      Description

      1. Provide a way to rewind the Kafka consumer to a specific offset (improve seekTo?). The camel-kafka component currently offers no way to do this. The main idea is to replay older Kafka messages without starting from the beginning of the topic.
      For example: https://blog.sysco.no/integration/kafka-rewind-consumers-offset/

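      import java.time.Duration;
      import java.time.Instant;
      import java.time.temporal.ChronoUnit;
      import java.util.HashMap;
      import java.util.Map;
      import org.apache.kafka.clients.consumer.*;
      import org.apache.kafka.common.TopicPartition;

      boolean rewindPending = true;

      while (true) {
          ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));

          // seek() needs assigned partitions, and poll(Duration) may return before assignment completes
          if (rewindPending && !consumer.assignment().isEmpty()) {
              Map<TopicPartition, Long> query = new HashMap<>();
              query.put(
                      new TopicPartition("simple-topic-1", 0),
                      Instant.now().minus(10, ChronoUnit.MINUTES).toEpochMilli());

              // Get the earliest offset at or after each timestamp
              Map<TopicPartition, OffsetAndTimestamp> result = consumer.offsetsForTimes(query);

              // Rewind with seek(); the value is null when no record is that recent
              result.forEach((partition, offsetAndTimestamp) -> {
                  if (offsetAndTimestamp != null) {
                      consumer.seek(partition, offsetAndTimestamp.offset());
                  }
              });

              rewindPending = false;
          }

          for (ConsumerRecord<String, String> record : records) {
              System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value());
          }
      }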
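
      To make the "get offset from timestamp" step concrete: offsetsForTimes returns, per partition, the earliest offset whose timestamp is greater than or equal to the requested timestamp, or null when no such record exists. The sketch below models that lookup with plain Java only. The class and method names (TimestampLookup, offsetForTime) are hypothetical and not part of Kafka or camel-kafka, and it assumes timestamps never decrease with offset, which real topics do not guarantee.

```java
import java.util.OptionalLong;

// Hypothetical, stdlib-only model of a timestamp-to-offset lookup.
// It is not Kafka code; it only illustrates the lookup rule.
class TimestampLookup {

    /**
     * Returns the earliest offset whose timestamp is >= targetMillis,
     * or empty when every record is older than the target.
     * timestamps[i] is the timestamp of the record at offset i and is
     * assumed to be non-decreasing (a simplification for this sketch).
     */
    public static OptionalLong offsetForTime(long[] timestamps, long targetMillis) {
        int low = 0;
        int high = timestamps.length; // exclusive upper bound
        while (low < high) {
            int mid = (low + high) >>> 1;
            if (timestamps[mid] < targetMillis) {
                low = mid + 1; // record too old, look to the right
            } else {
                high = mid;    // candidate found, look for an earlier one
            }
        }
        return low < timestamps.length ? OptionalLong.of(low) : OptionalLong.empty();
    }

    public static void main(String[] args) {
        long[] timestamps = {1_000L, 2_000L, 2_000L, 5_000L};
        System.out.println(offsetForTime(timestamps, 2_000L)); // OptionalLong[1]
        System.out.println(offsetForTime(timestamps, 3_000L)); // OptionalLong[3]
        System.out.println(offsetForTime(timestamps, 6_000L)); // OptionalLong.empty
    }
}
```

      The empty result corresponds to the null map value in the snippet above, which is why a rewind should skip partitions that have no record recent enough.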
      

      2. Provide a way to access the KafkaConsumer
      Add a Camel header that holds a reference to the KafkaConsumer, so that a route can make Kafka API calls directly. This could work the same way as the existing KafkaManualCommit header:

      public void process(Exchange exchange) {
          KafkaManualCommit manual =
              exchange.getIn().getHeader(KafkaConstants.MANUAL_COMMIT, KafkaManualCommit.class);
          manual.commitSync();
      }
      

            People

            • Assignee:
              Unassigned
              Reporter:
              michael992 michael elbaz
            • Votes:
              1
              Watchers:
              4
