Kafka / KAFKA-14553

RecordAccumulator hangs in infinite NOP loop


Details

    • Type: Bug
    • Status: Resolved
    • Priority: Minor
    • Resolution: Duplicate
    • Affects Version/s: 3.3.1
    • Fix Version/s: 3.3.2
    • Component/s: clients
    • Labels: None
    • Environment:
      - Spring Boot 3.0.1
      - Spring Cloud 2022.0.0

      Dependency versions are managed by the Spring Boot and Spring Cloud BOMs:

      - micrometer-tracing-bridge-brave 1.0.0
      - zipkin-reporter-brave 2.16.3
      - zipkin-sender-kafka 2.16.3

    Description

      Summary:
      RecordAccumulator enters an infinite loop if stickyBatchSize in BuiltInPartitioner is 0.
      (This is the default when a KafkaSender is created with its default Builder.)

      Details:
      The infinite loop is caused by this while loop:
      https://github.com/apache/kafka/blob/3.3.1/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java#L293
      and, in particular, by this continue statement:
      https://github.com/apache/kafka/blob/3.3.1/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java#L316
      because the partitionChanged() call in its condition always returns true if batchSize is 0.

      As a result, program flow never reaches this point:
      https://github.com/apache/kafka/blob/3.3.1/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java#L318
      so no span data is ever sent to Kafka.

      The problematic line in partitionChanged() is its call to update the BuiltInPartitioner:
      https://github.com/apache/kafka/blob/3.3.1/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java#L242
      which always switches the partition because of this condition:
      https://github.com/apache/kafka/blob/3.3.1/clients/src/main/java/org/apache/kafka/clients/producer/internals/BuiltInPartitioner.java#L218
      Therefore the next condition in RecordAccumulator also evaluates to true:
      https://github.com/apache/kafka/blob/3.3.1/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java#L243
      so partitionChanged() returns true and forces the continue in the while (true) loop.
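To make the control flow described above easier to follow, here is a simplified, self-contained Java model of the retry loop. It is not Kafka code: the class StickySwitchLoopDemo and the method retriesBeforeAppend are hypothetical. The model assumes, based on the linked lines, that partitionChanged() updates the partitioner with 0 appended bytes and switching enabled, and that each new sticky partition starts with 0 produced bytes. The loop is capped at maxRetries so the demo terminates instead of hanging.

```java
/**
 * Simplified model of the retry loop in RecordAccumulator.append() (Kafka 3.3.1).
 * Hypothetical illustration only; this is not the actual Kafka implementation.
 */
public class StickySwitchLoopDemo {

    /** Switch condition as quoted in the report (note the >= comparisons). */
    static boolean shouldSwitch(int producedBytes, int stickyBatchSize, boolean enableSwitch) {
        return producedBytes >= stickyBatchSize && enableSwitch
                || producedBytes >= stickyBatchSize * 2;
    }

    /**
     * Returns how many times the loop hit "continue" before reaching the append step,
     * or -1 if it was still retrying after maxRetries iterations.
     */
    static int retriesBeforeAppend(int stickyBatchSize, int maxRetries) {
        int currentPartition = 0;
        int producedBytes = 0; // assumption: a fresh sticky partition has produced 0 bytes
        for (int retries = 0; retries <= maxRetries; retries++) {
            int peeked = currentPartition; // like peekCurrentPartitionInfo() before taking the lock

            // Like partitionChanged(): update the partitioner with 0 appended bytes and
            // switching enabled (assumed from the call linked in the report).
            if (shouldSwitch(producedBytes, stickyBatchSize, true)) {
                currentPartition++; // switch to the next partition
                producedBytes = 0;  // the new sticky partition starts from zero
            }
            if (currentPartition != peeked) {
                continue; // partition changed: go round the while (true) loop again
            }
            return retries; // the real code would proceed to tryAppend() here
        }
        return -1; // the real loop has no cap and would spin forever
    }

    public static void main(String[] args) {
        System.out.println("stickyBatchSize=0     -> " + retriesBeforeAppend(0, 1_000));
        System.out.println("stickyBatchSize=16384 -> " + retriesBeforeAppend(16_384, 1_000));
    }
}
```

In this model, stickyBatchSize = 0 never reaches the append step (it returns -1 once the cap is hit), while any positive size appends on the first pass because 0 produced bytes no longer satisfies the switch condition.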

      Suggested fix:
      I think these conditions should be changed:
      https://github.com/apache/kafka/blob/3.3.1/clients/src/main/java/org/apache/kafka/clients/producer/internals/BuiltInPartitioner.java#L218
      by removing the equal signs from the comparisons:

      if (producedBytes > stickyBatchSize && enableSwitch || producedBytes > stickyBatchSize * 2) {

      (The same change is also needed on line 213 of BuiltInPartitioner.java.)
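One way to see what the suggested change does is to evaluate both versions of the condition side by side. The sketch below reproduces the current condition (with >=) and the suggested one (with >) from this report; the class and method names are hypothetical, and the comparison ignores the rest of BuiltInPartitioner.

```java
/**
 * Compares the partition-switch condition quoted from BuiltInPartitioner (3.3.1)
 * with the strict-inequality version suggested in this report.
 * Hypothetical helper names; not Kafka code.
 */
public class SwitchConditionComparison {

    // Current condition: >= comparisons.
    static boolean currentCondition(int producedBytes, int stickyBatchSize, boolean enableSwitch) {
        return producedBytes >= stickyBatchSize && enableSwitch
                || producedBytes >= stickyBatchSize * 2;
    }

    // Suggested condition: equal signs removed.
    static boolean suggestedCondition(int producedBytes, int stickyBatchSize, boolean enableSwitch) {
        return producedBytes > stickyBatchSize && enableSwitch
                || producedBytes > stickyBatchSize * 2;
    }

    public static void main(String[] args) {
        int stickyBatchSize = 0; // the KafkaSender default described in this report
        for (int producedBytes : new int[] {0, 1}) {
            System.out.printf("producedBytes=%d current=%b suggested=%b%n",
                    producedBytes,
                    currentCondition(producedBytes, stickyBatchSize, true),
                    suggestedCondition(producedBytes, stickyBatchSize, true));
        }
    }
}
```

With stickyBatchSize = 0, the zero-byte update made from partitionChanged() no longer triggers a switch under the suggested condition, while a partition that has actually received bytes still switches. For a positive stickyBatchSize the two versions differ only at the exact boundaries (for example producedBytes == stickyBatchSize), where the switch is deferred until the next append.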

      Note:
      The problem arises because KafkaSender sets the batchSize to 0.
      https://github.com/openzipkin/zipkin-reporter-java/blob/2.16.3/kafka/src/main/java/zipkin2/reporter/kafka/KafkaSender.java#L88

      Workaround:
      Set the batch size to a value greater than zero:

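      import java.util.Properties;

      import org.apache.kafka.clients.producer.ProducerConfig;
      import org.springframework.context.annotation.Bean;
      import org.springframework.context.annotation.Configuration;
      import zipkin2.reporter.kafka.KafkaSender;

      @Configuration
      public class SenderConfiguration {

          @Bean
          KafkaSender kafkaSender() {
              Properties overrides = new Properties();
              // Any batch.size >= 1 avoids the infinite loop; KafkaSender sets it to 0 by default.
              overrides.put(ProducerConfig.BATCH_SIZE_CONFIG, 1);
              return KafkaSender.newBuilder()
                  .bootstrapServers("localhost:9092")
                  .topic("zipkin")
                  .overrides(overrides)
                  .build();
          }
      }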
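If the producer overrides are assembled from several sources (for example external configuration), the workaround can be applied defensively. The helper below is a hypothetical sketch that uses only java.util.Properties. It relies on ProducerConfig.BATCH_SIZE_CONFIG being the string "batch.size", and it treats a missing key as 0 because, per this report, KafkaSender would otherwise apply its own value of 0.

```java
import java.util.Properties;

/** Hypothetical helper: ensures batch.size is at least 1 in a set of producer overrides. */
public class BatchSizeGuard {

    // Same value as ProducerConfig.BATCH_SIZE_CONFIG in kafka-clients.
    static final String BATCH_SIZE = "batch.size";

    /** Returns a copy of the overrides with batch.size raised to at least 1. */
    static Properties withPositiveBatchSize(Properties overrides) {
        Properties copy = new Properties();
        copy.putAll(overrides);
        Object value = copy.get(BATCH_SIZE);
        // A missing key means KafkaSender's own default (0, per this report) would apply.
        int batchSize = value == null ? 0 : Integer.parseInt(value.toString().trim());
        if (batchSize < 1) {
            copy.put(BATCH_SIZE, 1); // same value as used in the workaround
        }
        return copy;
    }

    public static void main(String[] args) {
        Properties fromConfig = new Properties();
        fromConfig.setProperty(BATCH_SIZE, "0");
        System.out.println(withPositiveBatchSize(fromConfig).get(BATCH_SIZE));
    }
}
```

The returned Properties can then be passed to KafkaSender.newBuilder().overrides(...) in place of the hand-built overrides; an explicitly configured positive batch.size is left unchanged.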

      Using:

      • Spring Boot 3.0.1
      • Spring Cloud 2022.0.0

      pom.xml (fragment):

              <dependency>
                  <groupId>org.springframework.boot</groupId>
                  <artifactId>spring-boot-autoconfigure</artifactId>
              </dependency>
              <dependency>
                  <groupId>org.springframework.boot</groupId>
                  <artifactId>spring-boot-starter-actuator</artifactId>
              </dependency>
              <dependency>
                  <groupId>io.micrometer</groupId>
                  <artifactId>micrometer-registry-prometheus</artifactId>
              </dependency>
              <dependency>
                  <groupId>io.micrometer</groupId>
                  <artifactId>micrometer-tracing-bridge-brave</artifactId>
              </dependency>
              <dependency>
                  <groupId>io.zipkin.reporter2</groupId>
                  <artifactId>zipkin-reporter-brave</artifactId>
              </dependency>
              <dependency>
                  <groupId>io.zipkin.reporter2</groupId>
                  <artifactId>zipkin-sender-kafka</artifactId>
              </dependency>

      All settings are at their defaults, except that a KafkaSender is created explicitly as shown above (there is no autoconfiguration for the Kafka sender).


            People

              Luke Chen (showuon)
              Viczai Gábor (gviczai)
              Votes: 0
              Watchers: 4
