Kafka / KAFKA-14054

Unexpected client shutdown as TimeoutException is thrown as IllegalStateException

Details

    • Type: Bug
    • Status: Resolved
    • Priority: Major
    • Resolution: Fixed
    • Affects Version/s: 3.1.0, 3.2.0, 3.1.1
    • Fix Version/s: 3.5.0, 3.4.1
    • Component/s: streams
    • Labels: None

    Description

       Re: https://forum.confluent.io/t/bug-timeoutexception-is-thrown-as-illegalstateexception-causing-client-shutdown/5460/2

      1) A TimeoutException is rethrown as an IllegalStateException in org.apache.kafka.streams.processor.internals.StreamTask#commitNeeded, which causes the client to shut down in org.apache.kafka.streams.KafkaStreams#getActionForThrowable.

      2) Should a TimeoutException be a recoverable error that the user is expected to handle?

      3) This issue is exposed by the change in KAFKA-12887, which was introduced in kafka-streams 3.1.0.
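
      To make points 1 and 3 concrete, here is a minimal, dependency-free sketch of the decision path. It is not the Kafka Streams implementation: the classes TimeoutException, StreamsException and Action are hypothetical stand-ins for the Kafka types, and both the contents of NOT_HANDLED_BY_USERS and the "check only the direct cause" behaviour of wrappedExceptionIsIn are assumptions modelled on the reported stack trace. Under those assumptions, a timeout wrapped in an IllegalStateException never reaches the user's handler, while an unwrapped one does.

```java
import java.util.Set;
import java.util.function.Function;

public class WrappedTimeoutSketch {

    /** Hypothetical stand-in for org.apache.kafka.common.errors.TimeoutException. */
    static class TimeoutException extends RuntimeException {
        TimeoutException(final String message) {
            super(message);
        }
    }

    /** Hypothetical stand-in for org.apache.kafka.streams.errors.StreamsException. */
    static class StreamsException extends RuntimeException {
        StreamsException(final Throwable cause) {
            super(cause);
        }
    }

    /** Hypothetical stand-in for StreamThreadExceptionResponse (subset of values). */
    enum Action { REPLACE_THREAD, SHUTDOWN_CLIENT }

    // Assumption: exception types that are treated as fatal and never passed to the user's handler.
    static final Set<Class<? extends Throwable>> NOT_HANDLED_BY_USERS =
        Set.of(IllegalStateException.class, IllegalArgumentException.class);

    // Assumption: only the direct cause of the thrown exception is inspected.
    static boolean wrappedExceptionIsIn(final Throwable throwable,
                                        final Set<Class<? extends Throwable>> types) {
        final Throwable cause = throwable.getCause();
        return cause != null && types.contains(cause.getClass());
    }

    // Mirrors the shape of getActionForThrowable: fatal types bypass the user's handler.
    static Action actionFor(final Throwable throwable,
                            final Function<Throwable, Action> userHandler) {
        if (wrappedExceptionIsIn(throwable, NOT_HANDLED_BY_USERS)) {
            return Action.SHUTDOWN_CLIENT;
        }
        return userHandler.apply(throwable);
    }

    public static void main(final String[] args) {
        // A user handler that would prefer to replace the thread on any error.
        final Function<Throwable, Action> userHandler = t -> Action.REPLACE_THREAD;
        final TimeoutException timeout = new TimeoutException("position could not be determined");

        // Reported behaviour: the timeout is wrapped in IllegalStateException first.
        final Throwable wrapped = new StreamsException(new IllegalStateException(timeout));
        System.out.println(actionFor(wrapped, userHandler));   // prints SHUTDOWN_CLIENT

        // Alternative raised in point 2: the timeout is propagated without the fatal wrapper.
        final Throwable unwrapped = new StreamsException(timeout);
        System.out.println(actionFor(unwrapped, userHandler)); // prints REPLACE_THREAD
    }
}
```

      In the first case the user's handler is never consulted, which matches the SHUTDOWN_CLIENT line in the error log further down.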

      Code referenced:

      org.apache.kafka.streams.processor.internals.StreamTask#commitNeeded
          public boolean commitNeeded() {
              if (commitNeeded) {
                  return true;
              } else {
                  for (final Map.Entry<TopicPartition, Long> entry : consumedOffsets.entrySet()) {
                      final TopicPartition partition = entry.getKey();
                      try {
                          final long offset = mainConsumer.position(partition);
                          if (offset > entry.getValue() + 1) {
                              commitNeeded = true;
                              entry.setValue(offset - 1);
                          }
                      } catch (final TimeoutException error) {
                          // the `consumer.position()` call should never block, because we know that we did process data
                          // for the requested partition and thus the consumer should have a valid local position
                          // that it can return immediately
      
                          // hence, a `TimeoutException` indicates a bug and thus we rethrow it as fatal `IllegalStateException`
                          throw new IllegalStateException(error);
                      } catch (final KafkaException fatal) {
                          throw new StreamsException(fatal);
                      }
                  }
      
                  return commitNeeded;
              }
          }
      
      org.apache.kafka.streams.KafkaStreams#getActionForThrowable
          private StreamsUncaughtExceptionHandler.StreamThreadExceptionResponse getActionForThrowable(final Throwable throwable,
                                                                                                      final StreamsUncaughtExceptionHandler streamsUncaughtExceptionHandler) {
              final StreamsUncaughtExceptionHandler.StreamThreadExceptionResponse action;
              if (wrappedExceptionIsIn(throwable, EXCEPTIONS_NOT_TO_BE_HANDLED_BY_USERS)) {
                  action = SHUTDOWN_CLIENT;
              } else {
                  action = streamsUncaughtExceptionHandler.handle(throwable);
              }
              return action;
          }
      
          private void handleStreamsUncaughtException(final Throwable throwable,
                                                      final StreamsUncaughtExceptionHandler streamsUncaughtExceptionHandler,
                                                      final boolean skipThreadReplacement) {
              final StreamsUncaughtExceptionHandler.StreamThreadExceptionResponse action = getActionForThrowable(throwable, streamsUncaughtExceptionHandler);
              if (oldHandler) {
                  log.warn("Stream's new uncaught exception handler is set as well as the deprecated old handler." +
                          "The old handler will be ignored as long as a new handler is set.");
              }
              switch (action) {
                  case REPLACE_THREAD:
                      if (!skipThreadReplacement) {
                          log.error("Replacing thread in the streams uncaught exception handler", throwable);
                          replaceStreamThread(throwable);
                      } else {
                          log.debug("Skipping thread replacement for recoverable error");
                      }
                      break;
                  case SHUTDOWN_CLIENT:
                      log.error("Encountered the following exception during processing " +
                              "and Kafka Streams opted to " + action + "." +
                              " The streams client is going to shut down now. ", throwable);
                      closeToError();
                      break;
      

       Stacktrace

      Error log (kafka-streams 3.1.0):
      2022-06-22 13:58:35,796 ERROR thread=[com_stmartin_hammer_v3_platform-hammer-facade-fdc90fab-ed3a-4e62-b458-e73f80e6975d-StreamThread-1] logger=o.a.k.s.KafkaStreams - stream-client [com_stmartin_hammer_v3_platform-hammer-facade-fdc90fab-ed3a-4e62-b458-e73f80e6975d] Encountered the following exception during processing and Kafka Streams opted to SHUTDOWN_CLIENT. The streams client is going to shut down now.
      org.apache.kafka.streams.errors.StreamsException: java.lang.IllegalStateException: org.apache.kafka.common.errors.TimeoutException: Timeout of 60000ms expired before the position for partition com_stmartin_hammer_v3_command_pte_hammercommand--demo--compacted-4 could be determined
              at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:642)
              at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:576)
      Caused by: java.lang.IllegalStateException: org.apache.kafka.common.errors.TimeoutException: Timeout of 60000ms expired before the position for partition com_stmartin_hammer_v3_command_pte_hammercommand--demo--compacted-4 could be determined
              at org.apache.kafka.streams.processor.internals.StreamTask.commitNeeded(StreamTask.java:1185)
              at org.apache.kafka.streams.processor.internals.TaskManager.commitTasksAndMaybeUpdateCommittableOffsets(TaskManager.java:1111)
              at org.apache.kafka.streams.processor.internals.TaskManager.commit(TaskManager.java:1084)
              at org.apache.kafka.streams.processor.internals.StreamThread.maybeCommit(StreamThread.java:1071)
              at org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:817)
              at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:604)
              ... 1 common frames omitted
      Caused by: org.apache.kafka.common.errors.TimeoutException: Timeout of 60000ms expired before the position for partition com_stmartin_hammer_v3_command_pte_hammercommand--demo--compacted-4 could be determined
      2022-06-22 13:58:35,796  INFO thread=[com_stmartin_hammer_v3_platform-hammer-facade-fdc90fab-ed3a-4e62-b458-e73f80e6975d-StreamThread-1] logger=o.a.k.s.KafkaStreams - stream-client [com_stmartin_hammer_v3_platform-hammer-facade-fdc90fab-ed3a-4e62-b458-e73f80e6975d] State transition from RUNNING to PENDING_ERROR
      

            People

              Assignee: Matthias J. Sax (mjsax)
              Reporter: Donald (donald.n)