Details
- New Feature
- Status: Resolved
- Major
- Resolution: Fixed
- 2.24.1, 3.x
- None
- Unknown
- Important
Description
1. Provide a way to rewind a Kafka consumer to a specific offset (perhaps by improving seekTo). The camel-kafka component currently offers no way to do this. The main idea is to replay older Kafka messages without starting from the beginning.
For example, see https://blog.sysco.no/integration/kafka-rewind-consumers-offset/
boolean flag = true;
while (true) {
    ConsumerRecords<String, String> records = consumer.poll(100);
    if (flag) {
        Map<TopicPartition, Long> query = new HashMap<>();
        query.put(
            new TopicPartition("simple-topic-1", 0),
            Instant.now().minus(10, MINUTES).toEpochMilli());
        // Get offset from timestamp
        Map<TopicPartition, OffsetAndTimestamp> result = consumer.offsetsForTimes(query);
        // Rewind offset to previous position using seekTo
        result.entrySet()
            .stream()
            .forEach(entry -> consumer.seek(entry.getKey(), entry.getValue().offset()));
        flag = false;
    }
    for (ConsumerRecord<String, String> record : records)
        System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value());
}
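To make the timestamp-based rewind concrete without a broker, here is a minimal in-memory sketch of the lookup that offsetsForTimes performs for one partition: find the earliest offset whose timestamp is greater than or equal to the target, then replay from there. The names RewindSketch, LogRecord, offsetForTime and replayFrom are hypothetical and exist only for this illustration; the sketch also assumes timestamps do not decrease with offset, which a real topic does not guarantee.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.OptionalLong;

// Hypothetical, broker-free model of "rewind to a timestamp" for one partition.
public class RewindSketch {

    /** One record in the partition: its offset and its timestamp in epoch millis. */
    public static final class LogRecord {
        final long offset;
        final long timestampMs;

        public LogRecord(long offset, long timestampMs) {
            this.offset = offset;
            this.timestampMs = timestampMs;
        }
    }

    /**
     * Returns the earliest offset whose timestamp is >= targetMs, or empty if
     * no such record exists. Assumes the list is ordered by offset and that
     * timestamps are non-decreasing (an assumption of this sketch).
     */
    public static OptionalLong offsetForTime(List<LogRecord> partition, long targetMs) {
        int lo = 0;
        int hi = partition.size();
        // Binary search for the first record with timestampMs >= targetMs.
        while (lo < hi) {
            int mid = (lo + hi) >>> 1;
            if (partition.get(mid).timestampMs < targetMs) {
                lo = mid + 1;
            } else {
                hi = mid;
            }
        }
        return lo < partition.size()
                ? OptionalLong.of(partition.get(lo).offset)
                : OptionalLong.empty();
    }

    /** Offsets that would be re-read after seeking to the offset found for targetMs. */
    public static List<Long> replayFrom(List<LogRecord> partition, long targetMs) {
        List<Long> replayed = new ArrayList<>();
        OptionalLong start = offsetForTime(partition, targetMs);
        if (!start.isPresent()) {
            return replayed; // nothing at or after the target time: no rewind
        }
        for (LogRecord r : partition) {
            if (r.offset >= start.getAsLong()) {
                replayed.add(r.offset);
            }
        }
        return replayed;
    }

    public static void main(String[] args) {
        List<LogRecord> partition = List.of(
                new LogRecord(0, 1_000),
                new LogRecord(1, 2_000),
                new LogRecord(2, 3_000),
                new LogRecord(3, 4_000));
        System.out.println(replayFrom(partition, 2_500)); // prints [2, 3]
    }
}
```

The empty result matters in practice: when no record exists at or after the requested time, the real lookup yields no offset for that partition, so a rewind should check for that case before seeking.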
2. Provide a way to access the KafkaConsumer
Add a Camel header holding a reference to the KafkaConsumer so that a route can make Kafka API calls directly. This could work the same way as KafkaManualCommit does today:
public void process(Exchange exchange) {
    KafkaManualCommit manual =
        exchange.getIn().getHeader(KafkaConstants.MANUAL_COMMIT, KafkaManualCommit.class);
    manual.commitSync();
}
Issue Links
- relates to CAMEL-16974 camel-kafka: make the resume strategy configurable (Resolved)