Details
Description
Integration tests with com.salesforce.kafka.test.KafkaTestUtils, local setup, StickyAssignor used, local topics are created / removed, one topic is created in the beginning of test and without unsubscribing from it - deleted.
Same happens in real environment.
- have single "topic" with 1 partition
- single consumer subscribed to this "topic" (StickyAssignor)
- delete "topic"
=>
- rebalance starts, topic partition(s) is revoked
- on assignment StickyAssignor throws exception (line 223), because partitionsPerTopic.("topic") returns null in for loop (topic deleted - no partitions are present)
In the provided log part, tearDown() causes topic deletion, while consumer still running and tries to poll data from topic.
RangeAssignor works fine (revokes partition, assigns empty set).
Problem doesn't have workaround (like handle i in onPartitionsAssigned and remove unsubscribe topic), because everything happens before listener called.
Attachments
Attachments
Issue Links
- links to