Description
When you start two consumer processes that subscribe through a regex topic
pattern (and the matching topics have 2 or more partitions), the second
(i.e. non-leader) consumer fails with a NullPointerException:
Exception in thread "StreamThread-4" java.lang.NullPointerException
    at org.apache.kafka.streams.processor.internals.RecordQueue.addRawRecords(RecordQueue.java:78)
    at org.apache.kafka.streams.processor.internals.PartitionGroup.addRawRecords(PartitionGroup.java:117)
    at org.apache.kafka.streams.processor.internals.StreamTask.addRecords(StreamTask.java:139)
    at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:299)
    at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:208)
The issue may be in TopologyBuilder, line 832:
    String[] topics = (sourceNodeFactory.pattern != null) ?
        sourceNodeFactory.getTopics(subscriptionUpdates.getUpdates()) :
        sourceNodeFactory.getTopics();
Because the second consumer joins as a follower, “getUpdates” returns an
empty collection, so the regular expression is not applied to any
topics.
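The suspected mechanism can be illustrated in isolation with plain Java. This is only a sketch under assumptions, not the Kafka Streams code: the class RegexTopicSketch, the method matchTopics and the topic names are hypothetical stand-ins for the pattern branch of getTopics. It shows that filtering an empty collection through a pattern can never yield a topic.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.regex.Pattern;

public class RegexTopicSketch {

    // Hypothetical stand-in for the pattern branch of getTopics(...):
    // keep only the topic names that fully match the pattern.
    public static List<String> matchTopics(Pattern pattern, Collection<String> subscribedTopics) {
        List<String> matched = new ArrayList<>();
        for (String topic : subscribedTopics) {
            if (pattern.matcher(topic).matches()) {
                matched.add(topic);
            }
        }
        return matched;
    }

    public static void main(String[] args) {
        // Assumed pattern and topic names, for illustration only.
        Pattern pattern = Pattern.compile("topic-\\d+");

        // Leader: the subscription updates list the existing topics.
        List<String> leaderView =
            matchTopics(pattern, Arrays.asList("topic-1", "topic-2", "other"));

        // Follower, as described in this report: the updates are empty,
        // so there is nothing for the pattern to match.
        List<String> followerView =
            matchTopics(pattern, Collections.<String>emptyList());

        System.out.println("leader:   " + leaderView);   // leader:   [topic-1, topic-2]
        System.out.println("follower: " + followerView); // follower: []
    }
}
```

If the follower really does end up with an empty topic list for its source node, records arriving from its assigned partitions would have no matching source, which would be consistent with the NullPointerException above. That link remains a hypothesis until confirmed against the code.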
Steps to Reproduce:
1.) Create at least two topics with at least 2 partitions each, and start sending messages to them.
2.) Start a single-threaded regex KStream consumer (i.e. it becomes the leader).
3.) Start a new instance of this consumer (i.e. it should receive some of the partitions).
The second consumer will die with the above exception.