CAMEL-12732

Kafka manual commit to file repository doesn't work properly (using Spring Boot)


    Details

    • Type: Bug
    • Status: Resolved
    • Priority: Major
    • Resolution: Fixed
    • Affects Version/s: 2.22.0
    • Fix Version/s: 2.22.1, 2.23.0
    • Component/s: camel-kafka
    • Labels: None
    • Environment: Spring Boot, kafka_2.11-1.1.0
    • Estimated Complexity: Unknown

      Description

      I'm trying to save the Kafka offset in a FileStateRepository. The offset is written to the file correctly, but it is not read back when the route starts, so Camel re-reads the whole topic every time.

       

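      import java.io.File;

      import org.apache.camel.Exchange;
      import org.apache.camel.builder.RouteBuilder;
      import org.apache.camel.component.kafka.KafkaConstants;
      import org.apache.camel.component.kafka.KafkaManualCommit;
      import org.apache.camel.impl.FileStateRepository;
      import org.springframework.context.annotation.Bean;
      import org.springframework.stereotype.Component;

      @Component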
      public class Route extends RouteBuilder {
          @Override
          public void configure() throws Exception {
              from(kafka())
                      .to("log:TEST?level=INFO")
                      .process(Route::commitKafka);
          }
      
          private String kafka() {
      
              String kafkaEndpoint = "kafka:";
      
              kafkaEndpoint += "topictest";
              kafkaEndpoint += "?brokers=";
              kafkaEndpoint += "localhost:9092";
              kafkaEndpoint += "&groupId=";
              kafkaEndpoint += "TEST";
              kafkaEndpoint += "&autoOffsetReset=";
              kafkaEndpoint += "earliest";
              kafkaEndpoint += "&autoCommitEnable=";
              kafkaEndpoint += false;
              kafkaEndpoint += "&allowManualCommit=";
              kafkaEndpoint += true;
              kafkaEndpoint += "&offsetRepository=";
              kafkaEndpoint += "#fileStore";
      
              return kafkaEndpoint;
          }
      
          @Bean(name = "fileStore")
          private FileStateRepository fileStateRepository() {
              FileStateRepository fileStateRepository = FileStateRepository.fileStateRepository(new File("/kafka/offset_repo/repo.dat"));
      
              // The cache printed here is empty, even when repo.dat already holds offsets
              // System.out.println(fileStateRepository.getCache());
              return fileStateRepository;
          }
      
          private static void commitKafka(Exchange exchange) {
      
              KafkaManualCommit manual = exchange.getIn().getHeader(KafkaConstants.MANUAL_COMMIT, KafkaManualCommit.class);
              manual.commitSync();
          }
      }
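
      The symptom described above (offsets reach the file but are not seen at startup) is what one would expect from a store that writes through on every update but fills its in-memory cache only when it is explicitly started. The sketch below models that behaviour with a hypothetical FileBackedStore class. It is not Camel's FileStateRepository, the key "topictest/0" is made up for illustration, and this report does not state the actual root cause, so treat it as an illustration under those assumptions.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;
import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-in for a file-backed state repository; not Camel's class.
class FileBackedStore {
    private final Path file;
    private final Map<String, String> cache = new HashMap<>();

    FileBackedStore(Path file) {
        this.file = file;
    }

    // Loads previously persisted entries into the in-memory cache.
    void start() {
        if (!Files.exists(file)) {
            return;
        }
        try {
            for (String line : Files.readAllLines(file, StandardCharsets.UTF_8)) {
                int sep = line.indexOf('=');
                if (sep > 0) {
                    // Later lines overwrite earlier ones, so the newest value wins.
                    cache.put(line.substring(0, sep), line.substring(sep + 1));
                }
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    // Updates the cache and appends to the file, whether or not start() was called.
    void setState(String key, String value) {
        cache.put(key, value);
        byte[] entry = (key + "=" + value + System.lineSeparator()).getBytes(StandardCharsets.UTF_8);
        try {
            Files.write(file, entry, StandardOpenOption.CREATE, StandardOpenOption.APPEND);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    // Reads only from the cache, so an unstarted store sees nothing persisted earlier.
    String getState(String key) {
        return cache.get(key);
    }
}

class OffsetStoreDemo {
    public static void main(String[] args) throws IOException {
        Path file = Files.createTempFile("repo", ".dat");

        // First "application run": persist an offset.
        FileBackedStore firstRun = new FileBackedStore(file);
        firstRun.start();
        firstRun.setState("topictest/0", "42");

        // Second run without start(): the file has the offset, the cache does not.
        FileBackedStore notStarted = new FileBackedStore(file);
        System.out.println(notStarted.getState("topictest/0")); // prints "null"

        // Second run with start(): the offset is restored from the file.
        FileBackedStore started = new FileBackedStore(file);
        started.start();
        System.out.println(started.getState("topictest/0")); // prints "42"

        Files.delete(file);
    }
}
```

      If Camel's repository behaves similarly, a consumer that looks up the stored offset before the repository has been started would fall back to autoOffsetReset=earliest and replay the topic. The remedy this report itself confirms is upgrading to one of the listed fix versions, 2.22.1 or 2.23.0.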
      
      

       

              People

              • Assignee: Claus Ibsen (davsclaus)
              • Reporter: michael elbaz (michael992)
              • Votes: 0
              • Watchers: 4