Details
- Bug
- Status: Resolved
- Major
- Resolution: Fixed
- 2.22.0
- None
- Spring boot, kafka_2.11-1.1.0
- Unknown
Description
I'm trying to save the Kafka offset into a FileStateRepository. The offset is written to the file correctly, but it is not read back when the route starts, so Camel re-reads the whole topic every time.
@Component
public class Route extends RouteBuilder {

    @Override
    public void configure() throws Exception {
        from(kafka())
            .to("log:TEST?level=INFO")
            .process(Route::commitKafka);
    }

    private String kafka() {
        String kafkaEndpoint = "kafka:";
        kafkaEndpoint += "topictest";
        kafkaEndpoint += "?brokers=";
        kafkaEndpoint += "localhost:9092";
        kafkaEndpoint += "&groupId=";
        kafkaEndpoint += "TEST";
        kafkaEndpoint += "&autoOffsetReset=";
        kafkaEndpoint += "earliest";
        kafkaEndpoint += "&autoCommitEnable=";
        kafkaEndpoint += false;
        kafkaEndpoint += "&allowManualCommit=";
        kafkaEndpoint += true;
        kafkaEndpoint += "&offsetRepository=";
        kafkaEndpoint += "#fileStore";
        return kafkaEndpoint;
    }

    @Bean(name = "fileStore")
    private FileStateRepository fileStateRepository() {
        FileStateRepository fileStateRepository =
            FileStateRepository.fileStateRepository(new File("/kafka/offset_repo/repo.dat"));
        // This will be empty
        // System.out.println(fileStateRepository.getCache());
        return fileStateRepository;
    }

    private static void commitKafka(Exchange exchange) {
        KafkaManualCommit manual =
            exchange.getIn().getHeader(KafkaConstants.MANUAL_COMMIT, KafkaManualCommit.class);
        manual.commitSync();
    }
}
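To make the expected behaviour concrete, here is a minimal stand-alone sketch of the contract I would expect from a file-backed offset repository: entries written by one instance should be visible to a fresh instance once it has been started. This is not Camel's FileStateRepository and says nothing about its internals. The class name SimpleFileOffsetStore, its start/setState/getState methods, the key=value file layout and the "topictest/0" key are all assumptions made up for illustration.

```java
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

// Hypothetical stand-in for a file-backed offset repository (NOT the Camel class).
class SimpleFileOffsetStore {
    private final Path file;
    private final Map<String, String> cache = new LinkedHashMap<>();

    SimpleFileOffsetStore(Path file) {
        this.file = file;
    }

    // Loads previously persisted entries into the in-memory cache.
    // If this step is skipped, the cache stays empty even though the file has data.
    void start() throws IOException {
        cache.clear();
        if (!Files.exists(file)) {
            return;
        }
        for (String line : Files.readAllLines(file, StandardCharsets.UTF_8)) {
            int sep = line.indexOf('=');
            if (sep > 0) {
                cache.put(line.substring(0, sep), line.substring(sep + 1));
            }
        }
    }

    // Updates the cache and rewrites the whole file as key=value lines.
    void setState(String key, String value) throws IOException {
        cache.put(key, value);
        List<String> lines = cache.entrySet().stream()
                .map(e -> e.getKey() + "=" + e.getValue())
                .collect(Collectors.toList());
        Files.write(file, lines, StandardCharsets.UTF_8);
    }

    // Returns the cached value, or null if the key is unknown.
    String getState(String key) {
        return cache.get(key);
    }

    public static void main(String[] args) throws IOException {
        Path file = Files.createTempFile("offset_repo", ".dat");

        // First run: commit an offset for an assumed topic/partition key.
        SimpleFileOffsetStore firstRun = new SimpleFileOffsetStore(file);
        firstRun.start();
        firstRun.setState("topictest/0", "42");

        // Restart: a new instance should see the stored offset after start().
        SimpleFileOffsetStore secondRun = new SimpleFileOffsetStore(file);
        secondRun.start();
        System.out.println(secondRun.getState("topictest/0")); // prints 42
    }
}
```

In the sketch, dropping the secondRun.start() call leaves the cache empty even though the file has content, which matches the symptom I see with getCache() in the route above.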
Issue Links
- is related to: CAMEL-12525 camel-kafka component commits the offset as soon as it is retrieved (Resolved)