  1. Camel
  2. CAMEL-12732

Kafka manual commit to file repository doesn't work properly (using Spring boot)

Details

    • Type: Bug
    • Status: Resolved
    • Priority: Major
    • Resolution: Fixed
    • Affects Version/s: 2.22.0
    • Fix Version/s: 2.22.1, 2.23.0
    • Component/s: camel-kafka
    • Labels: None
    • Environment: Spring boot, kafka_2.11-1.1.0

    • Unknown

    Description

      I'm trying to save the Kafka offset into a FileStateRepository. The offset is written to the file correctly, but it is not read back when the route starts, so Camel re-reads the whole topic every time.

       

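      import java.io.File;

      import org.apache.camel.Exchange;
      import org.apache.camel.builder.RouteBuilder;
      import org.apache.camel.component.kafka.KafkaConstants;
      import org.apache.camel.component.kafka.KafkaManualCommit;
      import org.apache.camel.impl.FileStateRepository;
      import org.springframework.context.annotation.Bean;
      import org.springframework.stereotype.Component;

      @Component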
      public class Route extends RouteBuilder {
          @Override
          public void configure() throws Exception {
              from(kafka())
                      .to("log:TEST?level=INFO")
                      .process(Route::commitKafka);
          }
      
          private String kafka() {
      
              String kafkaEndpoint = "kafka:";
      
              kafkaEndpoint += "topictest";
              kafkaEndpoint += "?brokers=";
              kafkaEndpoint += "localhost:9092";
              kafkaEndpoint += "&groupId=";
              kafkaEndpoint += "TEST";
              kafkaEndpoint += "&autoOffsetReset=";
              kafkaEndpoint += "earliest";
              kafkaEndpoint += "&autoCommitEnable=";
              kafkaEndpoint += false;
              kafkaEndpoint += "&allowManualCommit=";
              kafkaEndpoint += true;
              kafkaEndpoint += "&offsetRepository=";
              kafkaEndpoint += "#fileStore";
      
              return kafkaEndpoint;
          }
      
          @Bean(name = "fileStore")
          private FileStateRepository fileStateRepository() {
              FileStateRepository fileStateRepository = FileStateRepository.fileStateRepository(new File("/kafka/offset_repo/repo.dat"));
      
              // The cache is empty at this point, even when repo.dat already holds offsets
              // System.out.println(fileStateRepository.getCache());
              return fileStateRepository;
          }
      
          private static void commitKafka(Exchange exchange) {
      
              KafkaManualCommit manual = exchange.getIn().getHeader(KafkaConstants.MANUAL_COMMIT, KafkaManualCommit.class);
              manual.commitSync();
          }
      }
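
      To make the expected behaviour concrete, here is a minimal, self-contained sketch of a file-backed offset store. It is only an illustration: `SimpleFileOffsetStore`, its `start()` method, the `key=value` file layout, and the `topictest/0` key are all hypothetical and are not Camel's `FileStateRepository` or its on-disk format. The point it shows is the symptom described above: writes reach the file, but a fresh instance only sees them if something loads the file before the first read.

```java
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical stand-in for a file-backed offset store (NOT Camel's FileStateRepository).
public class SimpleFileOffsetStore {
    private final Path file;
    private final Map<String, String> cache = new LinkedHashMap<>();

    public SimpleFileOffsetStore(Path file) {
        this.file = file;
    }

    // Loads previously persisted entries into the in-memory cache.
    // If this is never called, getState() sees an empty cache.
    public void start() throws IOException {
        cache.clear();
        if (!Files.exists(file)) {
            return;
        }
        for (String line : Files.readAllLines(file, StandardCharsets.UTF_8)) {
            int sep = line.indexOf('=');
            if (sep > 0) {
                cache.put(line.substring(0, sep), line.substring(sep + 1));
            }
        }
    }

    // Updates the cache and rewrites the whole file (assumed layout: one key=value per line).
    public void setState(String key, String value) throws IOException {
        cache.put(key, value);
        List<String> lines = new ArrayList<>();
        for (Map.Entry<String, String> e : cache.entrySet()) {
            lines.add(e.getKey() + "=" + e.getValue());
        }
        Files.write(file, lines, StandardCharsets.UTF_8);
    }

    // Returns the cached value, or null if the key is unknown or the store was never loaded.
    public String getState(String key) {
        return cache.get(key);
    }
}
```

      In this sketch, a second instance pointed at the same file returns `null` from `getState` until `start()` has been called, which resembles what I see with the real repository at route start.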
      
      

       

            People

              Assignee: davsclaus Claus Ibsen
              Reporter: michael992 michael elbaz
              Votes: 0
              Watchers: 4