Kafka / KAFKA-15259

Kafka Streams does not continue processing due to rollback despite ProductionExceptionHandlerResponse.CONTINUE if using exactly_once


Details

    • Type: Bug
    • Status: Open
    • Priority: Major
    • Resolution: Unresolved
    • Affects Version/s: 3.5.1
    • Fix Version/s: None
    • Component/s: streams
    • Labels: None

    Description

      [Problem]

      • When exactly_once is used, Kafka Streams does not continue processing, because the transaction is rolled back even though the production exception handler returns ProductionExceptionHandlerResponse.CONTINUE.
        • The documentation states that "CONTINUE will signal that Streams should ignore the issue and continue processing"(1), so Kafka Streams should continue processing when ProductionExceptionHandlerResponse.CONTINUE is returned, even with exactly_once.
        • However, with exactly_once the transaction is rolled back despite ProductionExceptionHandlerResponse.CONTINUE, and processing does not continue. Instead, the client is shut down according to the default behavior (StreamThreadExceptionResponse.SHUTDOWN_CLIENT).
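      For reference, a production exception handler is a class implementing org.apache.kafka.streams.errors.ProductionExceptionHandler; in Kafka Streams 3.5 its handle method receives the failed ProducerRecord<byte[], byte[]> and the exception, and returns CONTINUE or FAIL. The sketch below is a hypothetical handler, not the attached Reproducer.java: the class name ContinueOnRecordTooLargeHandler is made up, and it returns CONTINUE only for RecordTooLargeException, the error seen in the log in (2).

```java
import java.util.Map;

import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.errors.RecordTooLargeException;
import org.apache.kafka.streams.errors.ProductionExceptionHandler;

// Hypothetical handler (illustrative name): skip records that the producer
// rejects as too large, and fail the task for any other send error.
public class ContinueOnRecordTooLargeHandler implements ProductionExceptionHandler {

    @Override
    public ProductionExceptionHandlerResponse handle(final ProducerRecord<byte[], byte[]> record,
                                                     final Exception exception) {
        if (exception instanceof RecordTooLargeException) {
            // Ask Streams to drop this record and keep processing.
            return ProductionExceptionHandlerResponse.CONTINUE;
        }
        // Every other send error is treated as fatal.
        return ProductionExceptionHandlerResponse.FAIL;
    }

    @Override
    public void configure(final Map<String, ?> configs) {
        // No configuration is needed for this sketch.
    }
}
```

      It would be registered with props.put(StreamsConfig.DEFAULT_PRODUCTION_EXCEPTION_HANDLER_CLASS_CONFIG, ContinueOnRecordTooLargeHandler.class). Per this report, the CONTINUE it returns is honored under at_least_once, while under exactly_once the transaction is still aborted.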

      [Environment]

      • Kafka Streams 3.5.1

      [Reproduction procedure]

      1. Create the topics "input-topic" and "output-topic".
      2. Produce several messages to "input-topic".
      3. Run a simple Kafka Streams program, configured with exactly_once, that forwards messages from "input-topic" to "output-topic" that are too large for the producer, and whose production exception handler returns ProductionExceptionHandlerResponse.CONTINUE when the producer raises an exception. See the reproducer program (attached file: Reproducer.java).
      4. ==> However, Kafka Streams does not continue processing, because the transaction is rolled back despite ProductionExceptionHandlerResponse.CONTINUE. The stream thread then shuts down according to the default behavior (StreamThreadExceptionResponse.SHUTDOWN_CLIENT) (2). See the debug log (attached file: app_exactly_once.log).
        1. Expected behavior: Kafka Streams continues processing when ProductionExceptionHandlerResponse.CONTINUE is returned, even with exactly_once.
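      Step 3 can be sketched roughly as follows. This is an illustrative application under stated assumptions, not the attached Reproducer.java: byte-array serdes, a broker at localhost:9092, an always-CONTINUE handler named AlwaysContinueHandler, and a mapValues step that replaces each value with an 11,000,000-byte payload so the producer rejects it (max.request.size defaults to 1048576) are all hypothetical choices. The application id java-kafka-streams is taken from the log in (2). The sketch selects exactly_once_v2; the per-task transactional id in the log (java-kafka-streams-0_0) appears consistent with the older, deprecated exactly_once setting, so the reporter's exact configuration may differ. Passing StreamsConfig.AT_LEAST_ONCE instead gives the comparison described under [As far as my investigation].

```java
import java.util.Map;
import java.util.Properties;

import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.errors.ProductionExceptionHandler;

public class PassThroughApp {

    // Hypothetical payload size, chosen to exceed the default max.request.size.
    static final int OVERSIZED_VALUE_BYTES = 11_000_000;

    // Hypothetical handler for this sketch: always asks Streams to CONTINUE.
    public static class AlwaysContinueHandler implements ProductionExceptionHandler {
        @Override
        public ProductionExceptionHandlerResponse handle(final ProducerRecord<byte[], byte[]> record,
                                                         final Exception exception) {
            return ProductionExceptionHandlerResponse.CONTINUE;
        }

        @Override
        public void configure(final Map<String, ?> configs) { }
    }

    // processingGuarantee: StreamsConfig.EXACTLY_ONCE_V2 or StreamsConfig.AT_LEAST_ONCE.
    static Properties buildConfig(final String bootstrapServers, final String processingGuarantee) {
        final Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "java-kafka-streams");
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        props.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, processingGuarantee);
        props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.ByteArraySerde.class);
        props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.ByteArraySerde.class);
        props.put(StreamsConfig.DEFAULT_PRODUCTION_EXCEPTION_HANDLER_CLASS_CONFIG, AlwaysContinueHandler.class);
        return props;
    }

    // input-topic -> oversized value -> output-topic
    static Topology buildTopology() {
        final StreamsBuilder builder = new StreamsBuilder();
        builder.<byte[], byte[]>stream("input-topic")
               .mapValues(value -> new byte[OVERSIZED_VALUE_BYTES])
               .to("output-topic");
        return builder.build();
    }

    public static void main(final String[] args) {
        final Properties props = buildConfig("localhost:9092", StreamsConfig.EXACTLY_ONCE_V2);
        final KafkaStreams streams = new KafkaStreams(buildTopology(), props);
        // Close the client when the JVM is asked to exit.
        Runtime.getRuntime().addShutdownHook(new Thread(streams::close));
        streams.start();
    }
}
```

      No commit interval is set here: with exactly_once, Kafka Streams defaults commit.interval.ms to 100 ms, which matches "commit interval is 100ms" in the log.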

      [As far as my investigation]

      • FYI, with at_least_once instead of exactly_once, Kafka Streams continues processing without a rollback when ProductionExceptionHandlerResponse.CONTINUE is returned. See the debug log (attached file: app_at_least_once.log).
      • CONTINUE worked with exactly_once in Kafka Streams 3.1.2, but no longer works as of Kafka Streams 3.2.0, because the transaction is rolled back.

      (1) Kafka Streams documentation: CONFIGURING A STREAMS APPLICATION > default.production.exception.handler

      (2) Log excerpt: transaction abort and shutdown

      2023-07-26 21:27:19 DEBUG KafkaProducer:1073 - [Producer clientId=java-kafka-streams-e3187cf9-5337-4155-a7cd-fd4e426b889d-StreamThread-1-0_0-producer, transactionalId=java-kafka-streams-0_0] Exception occurred during message send:
      org.apache.kafka.common.errors.RecordTooLargeException: The message is 11000088 bytes when serialized which is larger than 1048576, which is the value of the max.request.size configuration.
      2023-07-26 21:27:19 ERROR RecordCollectorImpl:322 - stream-thread [java-kafka-streams-e3187cf9-5337-4155-a7cd-fd4e426b889d-StreamThread-1] stream-task [0_0] Error encountered sending record to topic output-topic for task 0_0 due to:
      org.apache.kafka.common.errors.RecordTooLargeException: The message is 11000088 bytes when serialized which is larger than 1048576, which is the value of the max.request.size configuration.
      Exception handler choose to CONTINUE processing in spite of this error but written offsets would not be recorded.
      org.apache.kafka.common.errors.RecordTooLargeException: The message is 11000088 bytes when serialized which is larger than 1048576, which is the value of the max.request.size configuration.
      2023-07-26 21:27:19 INFO  TransactionManager:393 - [Producer clientId=java-kafka-streams-e3187cf9-5337-4155-a7cd-fd4e426b889d-StreamThread-1-0_0-producer, transactionalId=java-kafka-streams-0_0] Transiting to abortable error state due to org.apache.kafka.common.errors.RecordTooLargeException: The message is 11000088 bytes when serialized which is larger than 1048576, which is the value of the max.request.size configuration.
      2023-07-26 21:27:19 DEBUG TransactionManager:986 - [Producer clientId=java-kafka-streams-e3187cf9-5337-4155-a7cd-fd4e426b889d-StreamThread-1-0_0-producer, transactionalId=java-kafka-streams-0_0] Transition from state IN_TRANSACTION to error state ABORTABLE_ERROR
      org.apache.kafka.common.errors.RecordTooLargeException: The message is 11000088 bytes when serialized which is larger than 1048576, which is the value of the max.request.size configuration.
      2023-07-26 21:27:19 DEBUG StreamThread:825 - stream-thread [java-kafka-streams-e3187cf9-5337-4155-a7cd-fd4e426b889d-StreamThread-1] Processed 1 records with 1 iterations; invoking punctuators if necessary
      2023-07-26 21:27:19 DEBUG StreamThread:837 - stream-thread [java-kafka-streams-e3187cf9-5337-4155-a7cd-fd4e426b889d-StreamThread-1] 0 punctuators ran.
      2023-07-26 21:27:19 DEBUG StreamThread:1117 - stream-thread [java-kafka-streams-e3187cf9-5337-4155-a7cd-fd4e426b889d-StreamThread-1] Committing all active tasks [0_0] and standby tasks [] since 273ms has elapsed (commit interval is 100ms)
      2023-07-26 21:27:19 DEBUG RecordCollectorImpl:345 - stream-thread [java-kafka-streams-e3187cf9-5337-4155-a7cd-fd4e426b889d-StreamThread-1] stream-task [0_0] Flushing record collector
      2023-07-26 21:27:19 DEBUG StreamTask:419 - stream-thread [java-kafka-streams-e3187cf9-5337-4155-a7cd-fd4e426b889d-StreamThread-1] task [0_0] Prepared RUNNING task for committing
      2023-07-26 21:27:19 DEBUG TaskExecutor:176 - stream-thread [java-kafka-streams-e3187cf9-5337-4155-a7cd-fd4e426b889d-StreamThread-1] Committing task offsets {0_0={input-topic-0=OffsetAndMetadata{offset=629, leaderEpoch=null, metadata='AgAAAYmSGuLv'}}}
      2023-07-26 21:27:19 ERROR KafkaStreams:537 - stream-client [java-kafka-streams-e3187cf9-5337-4155-a7cd-fd4e426b889d] Encountered the following exception during processing and the registered exception handler opted to SHUTDOWN_CLIENT. The streams client is going to shut down now. 
      org.apache.kafka.streams.errors.StreamsException: Error encountered trying to commit a transaction [stream-thread [java-kafka-streams-e3187cf9-5337-4155-a7cd-fd4e426b889d-StreamThread-1] stream-task [0_0]]
      	at org.apache.kafka.streams.processor.internals.StreamsProducer.commitTransaction(StreamsProducer.java:313) ~[kafka-streams-3.5.1.jar:?]
      	at org.apache.kafka.streams.processor.internals.TaskExecutor.commitOffsetsOrTransaction(TaskExecutor.java:186) ~[kafka-streams-3.5.1.jar:?]
      	at org.apache.kafka.streams.processor.internals.TaskExecutor.commitTasksAndMaybeUpdateCommittableOffsets(TaskExecutor.java:154) ~[kafka-streams-3.5.1.jar:?]
      	at org.apache.kafka.streams.processor.internals.TaskManager.commitTasksAndMaybeUpdateCommittableOffsets(TaskManager.java:1678) ~[kafka-streams-3.5.1.jar:?]
      	at org.apache.kafka.streams.processor.internals.TaskManager.commit(TaskManager.java:1646) ~[kafka-streams-3.5.1.jar:?]
      	at org.apache.kafka.streams.processor.internals.StreamThread.maybeCommit(StreamThread.java:1121) ~[kafka-streams-3.5.1.jar:?]
      	at org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:840) ~[kafka-streams-3.5.1.jar:?]
      	at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:617) ~[kafka-streams-3.5.1.jar:?]
      	at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:579) [kafka-streams-3.5.1.jar:?]
      Caused by: org.apache.kafka.common.KafkaException: Cannot execute transactional method because we are in an error state
      	at org.apache.kafka.clients.producer.internals.TransactionManager.maybeFailWithError(TransactionManager.java:1010) ~[kafka-clients-3.5.1.jar:?]
      	at org.apache.kafka.clients.producer.internals.TransactionManager.sendOffsetsToTransaction(TransactionManager.java:306) ~[kafka-clients-3.5.1.jar:?]
      	at org.apache.kafka.clients.producer.KafkaProducer.sendOffsetsToTransaction(KafkaProducer.java:757) ~[kafka-clients-3.5.1.jar:?]
      	at org.apache.kafka.streams.processor.internals.StreamsProducer.commitTransaction(StreamsProducer.java:300) ~[kafka-streams-3.5.1.jar:?]
      	... 8 more
      Caused by: org.apache.kafka.common.errors.RecordTooLargeException: The message is 11000088 bytes when serialized which is larger than 1048576, which is the value of the max.request.size configuration.
      2023-07-26 21:27:19 INFO  KafkaStreams:340 - stream-client [java-kafka-streams-e3187cf9-5337-4155-a7cd-fd4e426b889d] State transition from RUNNING to PENDING_ERROR
      2023-07-26 21:27:19 INFO  StreamThread:239 - stream-thread [java-kafka-streams-e3187cf9-5337-4155-a7cd-fd4e426b889d-StreamThread-1] State transition from RUNNING to PENDING_SHUTDOWN
      2023-07-26 21:27:19 INFO  StreamThread:1182 - stream-thread [java-kafka-streams-e3187cf9-5337-4155-a7cd-fd4e426b889d-StreamThread-1] Shutting down unclean
      2023-07-26 21:27:19 INFO  StreamThread:1168 - stream-thread [java-kafka-streams-e3187cf9-5337-4155-a7cd-fd4e426b889d-StreamThread-1] Informed to shut down
      2023-07-26 21:27:19 DEBUG StreamThread:224 - stream-thread [java-kafka-streams-e3187cf9-5337-4155-a7cd-fd4e426b889d-StreamThread-1] Ignoring request to transit from PENDING_SHUTDOWN to PENDING_SHUTDOWN: only DEAD state is a valid next state
      2023-07-26 21:27:19 INFO  KafkaStreams:1367 - stream-client [java-kafka-streams-e3187cf9-5337-4155-a7cd-fd4e426b889d] Shutting down 1 stream threads
      2023-07-26 21:27:19 DEBUG KafkaStreams:1374 - stream-client [java-kafka-streams-e3187cf9-5337-4155-a7cd-fd4e426b889d] Shutdown java-kafka-streams-e3187cf9-5337-4155-a7cd-fd4e426b889d-StreamThread-1 complete
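      The log shows the ordering that defeats CONTINUE: the failed send moves the transactional producer from IN_TRANSACTION to ABORTABLE_ERROR, the handler then drops the record, and the next commit fails when offsets are sent to the transaction ("Cannot execute transactional method because we are in an error state"). The toy model below is a hypothetical simplification written only to illustrate that sequence; it is not Kafka's TransactionManager, and the class and method names are made up (the state names echo the log).

```java
import java.util.ArrayList;
import java.util.List;

// Toy model of the transactional producer behavior visible in the log.
// Not Kafka's TransactionManager; all names are illustrative.
public class ToyTransactionalProducer {

    enum State { READY, IN_TRANSACTION, ABORTABLE_ERROR }

    private State state = State.READY;
    private final List<String> pendingRecords = new ArrayList<>();
    private final List<String> committedRecords = new ArrayList<>();

    void begin() {
        if (state != State.READY) {
            throw new IllegalStateException("cannot begin from " + state);
        }
        state = State.IN_TRANSACTION;
    }

    // A successful send buffers the record inside the open transaction.
    void send(final String record) {
        requireNoError();
        pendingRecords.add(record);
    }

    // A failed send (e.g. record too large) poisons the whole transaction,
    // whatever a Streams-level handler later decides about that one record.
    void failSend() {
        state = State.ABORTABLE_ERROR;
    }

    // Stands in for "send offsets, then commit"; refused once poisoned.
    void commit() {
        requireNoError();
        committedRecords.addAll(pendingRecords);
        pendingRecords.clear();
        state = State.READY;
    }

    // Aborting is the only exit from ABORTABLE_ERROR; buffered records are discarded.
    void abort() {
        pendingRecords.clear();
        state = State.READY;
    }

    private void requireNoError() {
        if (state == State.ABORTABLE_ERROR) {
            throw new IllegalStateException(
                "Cannot execute transactional method because we are in an error state");
        }
    }

    State state() { return state; }

    List<String> committed() { return committedRecords; }

    public static void main(final String[] args) {
        final ToyTransactionalProducer producer = new ToyTransactionalProducer();
        producer.begin();
        producer.send("small-record");
        producer.failSend();      // oversized record rejected; handler says CONTINUE
        try {
            producer.commit();    // fails, like the StreamsException in the log
        } catch (final IllegalStateException e) {
            System.out.println("commit failed: " + e.getMessage());
            producer.abort();     // rollback: "small-record" is discarded as well
        }
        System.out.println("committed = " + producer.committed());
    }
}
```

      In this model CONTINUE cannot help, because the handler decides per record while the error state belongs to the whole transaction; under at_least_once the producer is not transactional, so there is no transaction to poison. The model does not explain why Kafka Streams 3.1.2 behaved differently.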
      

      Attachments

        1. Reproducer.java
          3 kB
          Tomonari Yamashita
        2. app_exactly_once.log
          203 kB
          Tomonari Yamashita
        3. app_at_least_once.log
          1.20 MB
          Tomonari Yamashita

            People

              Assignee: Unassigned
              Reporter: Tomonari Yamashita (tyamashi-oss)
              Votes: 0
              Watchers: 4
