Description
I have a simple Kafka Streams app that consumes from multiple input topics using the stream function that accepts a Pattern (link).
Whenever I add a new topic that matches the pattern, the KafkaStreams state goes REBALANCING -> ERROR -> PENDING_SHUTDOWN.
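As a rough illustration of the pattern subscription, the sketch below shows which topic names a regex picks up before and after a new topic is created. The pattern events-.* and the topic names are hypothetical, not the ones from my app, and the sketch assumes the whole topic name has to match the regex. It uses only java.util.regex and does not touch the Kafka API.

```java
import java.util.Arrays;
import java.util.List;
import java.util.regex.Pattern;
import java.util.stream.Collectors;

public class TopicPatternDemo {

    // Returns the topic names that the pattern matches in full
    // (assumption: pattern subscription requires a full-name match).
    public static List<String> matchingTopics(Pattern pattern, List<String> topics) {
        return topics.stream()
                .filter(topic -> pattern.matcher(topic).matches())
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        // Hypothetical pattern and topic names, for illustration only.
        Pattern inputTopics = Pattern.compile("events-.*");

        List<String> before = Arrays.asList("events-orders", "events-users", "audit-log");
        List<String> after = Arrays.asList("events-orders", "events-users", "audit-log", "events-payments");

        // Topics picked up when the app starts.
        System.out.println(matchingTopics(inputTopics, before));
        // Topics picked up once a new matching topic exists.
        System.out.println(matchingTopics(inputTopics, after));
    }
}
```

In this sketch the second call returns one more topic than the first; a change of that kind in the matched set is what precedes the rebalance in my app.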
If I restart the app it correctly starts reading again without problems.
Is this by design? Should I handle this myself and simply restart the app?
Kafka Streams version is 2.6.0.
The error is the following:
ERROR o.a.k.s.p.i.ProcessorTopology - Set of source nodes do not match:
sourceNodesByName = [KSTREAM-SOURCE-0000000003, KSTREAM-SOURCE-0000000002]
sourceTopicsByName = [KSTREAM-SOURCE-0000000000, KSTREAM-SOURCE-0000000014, KSTREAM-SOURCE-0000000003, KSTREAM-SOURCE-0000000002]
org.apache.kafka.common.KafkaException: User rebalance callback throws an error
    at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.onJoinComplete(ConsumerCoordinator.java:436)
    at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.joinGroupIfNeeded(AbstractCoordinator.java:440)
    at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureActiveGroup(AbstractCoordinator.java:359)
    at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.poll(ConsumerCoordinator.java:513)
    at org.apache.kafka.clients.consumer.KafkaConsumer.updateAssignmentMetadataIfNeeded(KafkaConsumer.java:1268)
    at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1230)
    at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1210)
    at org.apache.kafka.streams.processor.internals.StreamThread.pollRequests(StreamThread.java:766)
    at org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:624)
    at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:551)
    at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:510)
Caused by: java.lang.IllegalStateException: Tried to update source topics but source nodes did not match
    at org.apache.kafka.streams.processor.internals.ProcessorTopology.updateSourceTopics(ProcessorTopology.java:151)
    at org.apache.kafka.streams.processor.internals.AbstractTask.update(AbstractTask.java:109)
    at org.apache.kafka.streams.processor.internals.StreamTask.update(StreamTask.java:514)
    at org.apache.kafka.streams.processor.internals.TaskManager.updateInputPartitionsAndResume(TaskManager.java:397)
    at org.apache.kafka.streams.processor.internals.TaskManager.handleAssignment(TaskManager.java:261)
    at org.apache.kafka.streams.processor.internals.StreamsPartitionAssignor.onAssignment(StreamsPartitionAssignor.java:1428)
    at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.invokeOnAssignment(ConsumerCoordinator.java:279)
    at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.onJoinComplete(ConsumerCoordinator.java:421)
    ... 10 common frames omitted
KafkaStream state is ERROR
17:28:53.200 [datalake-StreamThread-1] ERROR o.apache.kafka.streams.KafkaStreams - stream-client [datalake] All stream threads have died. The instance will be in error state and should be closed.
============> User rebalance callback throws an error
KafkaStream state is PENDING_SHUTDOWN