Description
1) A TimeoutException from the consumer is rethrown as an IllegalStateException in org.apache.kafka.streams.processor.internals.StreamTask#commitNeeded. This makes org.apache.kafka.streams.KafkaStreams#getActionForThrowable choose SHUTDOWN_CLIENT, so the client shuts down.
2) Should a timeout be treated as a recoverable error that the user is expected to handle?
3) The issue was exposed by KAFKA-12887, which was introduced in kafka-streams 3.1.0.
Code referenced
org.apache.kafka.streams.processor.internals.StreamTask#commitNeeded
public boolean commitNeeded() {
    if (commitNeeded) {
        return true;
    } else {
        for (final Map.Entry<TopicPartition, Long> entry : consumedOffsets.entrySet()) {
            final TopicPartition partition = entry.getKey();
            try {
                final long offset = mainConsumer.position(partition);
                if (offset > entry.getValue() + 1) {
                    commitNeeded = true;
                    entry.setValue(offset - 1);
                }
            } catch (final TimeoutException error) {
                // the `consumer.position()` call should never block, because we know that we did process data
                // for the requested partition and thus the consumer should have a valid local position
                // that it can return immediately
                // hence, a `TimeoutException` indicates a bug and thus we rethrow it as fatal `IllegalStateException`
                throw new IllegalStateException(error);
            } catch (final KafkaException fatal) {
                throw new StreamsException(fatal);
            }
        }
        return commitNeeded;
    }
}
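To make the failure path in commitNeeded concrete, here is a minimal, self-contained Java model of the loop above. It is a sketch under stated assumptions, not Kafka code: TimeoutException, PositionSource and CommitNeededSketch are hypothetical stand-ins for Kafka's TimeoutException, the main consumer's position() call and StreamTask. Partitions are plain strings instead of TopicPartition, and the KafkaException branch is omitted.

```java
import java.util.HashMap;
import java.util.Map;

// Simplified model of the position check in StreamTask#commitNeeded.
// All types here are hypothetical stand-ins; nothing is imported from Kafka.
public class CommitNeededSketch {

    // Stand-in for org.apache.kafka.common.errors.TimeoutException (unchecked in Kafka).
    static class TimeoutException extends RuntimeException {
        TimeoutException(String message) { super(message); }
    }

    // Stand-in for the one consumer method the check depends on.
    interface PositionSource {
        long position(String partition);
    }

    private boolean commitNeeded = false;
    // Last processed offset per partition (mirrors `consumedOffsets`).
    final Map<String, Long> consumedOffsets = new HashMap<>();
    private final PositionSource consumer;

    CommitNeededSketch(PositionSource consumer) { this.consumer = consumer; }

    boolean commitNeeded() {
        if (commitNeeded) {
            return true;
        }
        for (Map.Entry<String, Long> entry : consumedOffsets.entrySet()) {
            try {
                long offset = consumer.position(entry.getKey());
                // The position is the next offset to fetch. A gap larger than one is
                // assumed to mean the consumer skipped records (e.g. transaction markers).
                if (offset > entry.getValue() + 1) {
                    commitNeeded = true;
                    entry.setValue(offset - 1);
                }
            } catch (TimeoutException error) {
                // Same behavior as the original: the timeout is treated as a bug
                // and rethrown as an IllegalStateException.
                throw new IllegalStateException(error);
            }
        }
        return commitNeeded;
    }

    public static void main(String[] args) {
        // Position 12 vs. last processed offset 10: a record was skipped.
        CommitNeededSketch task = new CommitNeededSketch(p -> 12L);
        task.consumedOffsets.put("topic-4", 10L);
        System.out.println(task.commitNeeded());
        System.out.println(task.consumedOffsets.get("topic-4"));

        // The consumer cannot determine its position in time.
        CommitNeededSketch stuck = new CommitNeededSketch(p -> {
            throw new TimeoutException("position for partition " + p + " could not be determined");
        });
        stuck.consumedOffsets.put("topic-4", 10L);
        try {
            stuck.commitNeeded();
        } catch (IllegalStateException e) {
            System.out.println(e.getCause().getClass().getSimpleName());
        }
    }
}
```

Running main prints true, then 11, then TimeoutException. A position gap of more than one marks the task for commit, while a position() timeout escapes as an IllegalStateException that carries the original timeout as its cause.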
org.apache.kafka.streams.KafkaStreams#getActionForThrowable
private StreamsUncaughtExceptionHandler.StreamThreadExceptionResponse getActionForThrowable(final Throwable throwable,
                                                                                           final StreamsUncaughtExceptionHandler streamsUncaughtExceptionHandler) {
    final StreamsUncaughtExceptionHandler.StreamThreadExceptionResponse action;
    if (wrappedExceptionIsIn(throwable, EXCEPTIONS_NOT_TO_BE_HANDLED_BY_USERS)) {
        action = SHUTDOWN_CLIENT;
    } else {
        action = streamsUncaughtExceptionHandler.handle(throwable);
    }
    return action;
}

private void handleStreamsUncaughtException(final Throwable throwable,
                                            final StreamsUncaughtExceptionHandler streamsUncaughtExceptionHandler,
                                            final boolean skipThreadReplacement) {
    final StreamsUncaughtExceptionHandler.StreamThreadExceptionResponse action = getActionForThrowable(throwable, streamsUncaughtExceptionHandler);
    if (oldHandler) {
        log.warn("Stream's new uncaught exception handler is set as well as the deprecated old handler." +
            "The old handler will be ignored as long as a new handler is set.");
    }
    switch (action) {
        case REPLACE_THREAD:
            if (!skipThreadReplacement) {
                log.error("Replacing thread in the streams uncaught exception handler", throwable);
                replaceStreamThread(throwable);
            } else {
                log.debug("Skipping thread replacement for recoverable error");
            }
            break;
        case SHUTDOWN_CLIENT:
            log.error("Encountered the following exception during processing " +
                "and Kafka Streams opted to " + action + "." +
                " The streams client is going to shut down now. ", throwable);
            closeToError();
            break;
        // ... (excerpt ends here)
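The decision in getActionForThrowable can be modeled without any Kafka dependency. In the sketch below, Response, StreamsException, TimeoutException and the handler function are hypothetical stand-ins. The contents of the excluded-class set (IllegalStateException, IllegalArgumentException) and the check on only the immediate cause reflect our reading of the 3.1.0 source and should be treated as assumptions. The sketch contrasts the exception shape seen in the log with a hypothetical shape in which the timeout is not rewrapped.

```java
import java.util.Arrays;
import java.util.HashSet;
import java.util.Set;
import java.util.function.Function;

// Simplified model of KafkaStreams#getActionForThrowable. The enum, exception
// classes and helper names are local stand-ins, not Kafka's types.
public class ActionForThrowableSketch {

    enum Response { REPLACE_THREAD, SHUTDOWN_CLIENT, SHUTDOWN_APPLICATION }

    // Stand-ins for Kafka's StreamsException and TimeoutException (both unchecked).
    static class StreamsException extends RuntimeException {
        StreamsException(Throwable cause) { super(cause); }
    }

    static class TimeoutException extends RuntimeException {
        TimeoutException(String message) { super(message); }
    }

    // Assumption: mirrors EXCEPTIONS_NOT_TO_BE_HANDLED_BY_USERS.
    static final Set<Class<? extends Throwable>> NOT_HANDLED_BY_USERS =
        new HashSet<>(Arrays.asList(IllegalStateException.class, IllegalArgumentException.class));

    // Assumption: only the immediate cause of the thrown exception is inspected.
    static boolean wrappedExceptionIsIn(Throwable t, Set<Class<? extends Throwable>> classes) {
        return t.getCause() != null && classes.contains(t.getCause().getClass());
    }

    static Response getActionForThrowable(Throwable t, Function<Throwable, Response> userHandler) {
        if (wrappedExceptionIsIn(t, NOT_HANDLED_BY_USERS)) {
            return Response.SHUTDOWN_CLIENT; // the user handler is never consulted
        }
        return userHandler.apply(t);
    }

    public static void main(String[] args) {
        // A user handler that would like to treat every error, including timeouts, as recoverable.
        Function<Throwable, Response> handler = e -> Response.REPLACE_THREAD;
        TimeoutException timeout = new TimeoutException("position timeout");

        // Shape seen in the log: StreamsException -> IllegalStateException -> TimeoutException.
        Throwable asLogged = new StreamsException(new IllegalStateException(timeout));
        System.out.println(getActionForThrowable(asLogged, handler));

        // Hypothetical shape if the timeout were not rewrapped: the user handler decides.
        Throwable unwrapped = new StreamsException(timeout);
        System.out.println(getActionForThrowable(unwrapped, handler));
    }
}
```

The first call prints SHUTDOWN_CLIENT even though the handler asks for REPLACE_THREAD, which matches the reported behavior. Only the second, hypothetical shape prints REPLACE_THREAD, leaving the decision to the user, which is what point 2 of the description asks about.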
Stacktrace
Error log (kafka-streams 3.1.0)
2022-06-22 13:58:35,796 ERROR thread=[com_stmartin_hammer_v3_platform-hammer-facade-fdc90fab-ed3a-4e62-b458-e73f80e6975d-StreamThread-1] logger=o.a.k.s.KafkaStreams - stream-client [com_stmartin_hammer_v3_platform-hammer-facade-fdc90fab-ed3a-4e62-b458-e73f80e6975d] Encountered the following exception during processing and Kafka Streams opted to SHUTDOWN_CLIENT. The streams client is going to shut down now.
org.apache.kafka.streams.errors.StreamsException: java.lang.IllegalStateException: org.apache.kafka.common.errors.TimeoutException: Timeout of 60000ms expired before the position for partition com_stmartin_hammer_v3_command_pte_hammercommand--demo--compacted-4 could be determined
    at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:642)
    at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:576)
Caused by: java.lang.IllegalStateException: org.apache.kafka.common.errors.TimeoutException: Timeout of 60000ms expired before the position for partition com_stmartin_hammer_v3_command_pte_hammercommand--demo--compacted-4 could be determined
    at org.apache.kafka.streams.processor.internals.StreamTask.commitNeeded(StreamTask.java:1185)
    at org.apache.kafka.streams.processor.internals.TaskManager.commitTasksAndMaybeUpdateCommittableOffsets(TaskManager.java:1111)
    at org.apache.kafka.streams.processor.internals.TaskManager.commit(TaskManager.java:1084)
    at org.apache.kafka.streams.processor.internals.StreamThread.maybeCommit(StreamThread.java:1071)
    at org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:817)
    at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:604)
    ... 1 common frames omitted
Caused by: org.apache.kafka.common.errors.TimeoutException: Timeout of 60000ms expired before the position for partition com_stmartin_hammer_v3_command_pte_hammercommand--demo--compacted-4 could be determined
2022-06-22 13:58:35,796 INFO thread=[com_stmartin_hammer_v3_platform-hammer-facade-fdc90fab-ed3a-4e62-b458-e73f80e6975d-StreamThread-1] logger=o.a.k.s.KafkaStreams - stream-client [com_stmartin_hammer_v3_platform-hammer-facade-fdc90fab-ed3a-4e62-b458-e73f80e6975d] State transition from RUNNING to PENDING_ERROR
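In the log, the root TimeoutException sits two levels below the StreamsException that reaches the uncaught-exception handling. For logging or diagnostics, the chain can be walked with getCause(). The sketch below uses hypothetical stand-in exception classes so that it runs without Kafka on the classpath; the helper names causeChain and hasCause are illustrative only.

```java
import java.util.ArrayList;
import java.util.List;

// Walks an exception's cause chain, modeled on the nesting shown in the log.
// StreamsException and TimeoutException are hypothetical stand-ins for Kafka's classes.
public class CauseChainSketch {

    static class StreamsException extends RuntimeException {
        StreamsException(Throwable cause) { super(cause); }
    }

    static class TimeoutException extends RuntimeException {
        TimeoutException(String message) { super(message); }
    }

    // Collects the chain from the outermost exception down to the root cause,
    // stopping if a cycle is detected (identity comparison, since Throwable
    // does not override equals).
    static List<Throwable> causeChain(Throwable t) {
        List<Throwable> chain = new ArrayList<>();
        for (Throwable cur = t; cur != null && !chain.contains(cur); cur = cur.getCause()) {
            chain.add(cur);
        }
        return chain;
    }

    // True if any exception in the chain is an instance of the given type.
    static boolean hasCause(Throwable t, Class<? extends Throwable> type) {
        for (Throwable c : causeChain(t)) {
            if (type.isInstance(c)) {
                return true;
            }
        }
        return false;
    }

    public static void main(String[] args) {
        Throwable logged = new StreamsException(new IllegalStateException(
            new TimeoutException("Timeout of 60000ms expired before the position for partition "
                + "com_stmartin_hammer_v3_command_pte_hammercommand--demo--compacted-4 could be determined")));

        List<Throwable> chain = causeChain(logged);
        StringBuilder path = new StringBuilder();
        for (Throwable c : chain) {
            if (path.length() > 0) {
                path.append(" -> ");
            }
            path.append(c.getClass().getSimpleName());
        }
        System.out.println(path);
        System.out.println("root cause: " + chain.get(chain.size() - 1).getMessage());
        System.out.println("timeout in chain: " + hasCause(logged, TimeoutException.class));
    }
}
```

The first line printed is StreamsException -> IllegalStateException -> TimeoutException, the same nesting as the two "Caused by" sections above.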