Description
The following NullPointerException occurs when the global/.checkpoint file contains a line with a topic previously associated with (but no longer configured for) a GlobalKTable:
java.lang.NullPointerException
    at org.apache.kafka.streams.processor.internals.GlobalStateUpdateTask.update(GlobalStateUpdateTask.java:85)
    at org.apache.kafka.streams.processor.internals.GlobalStreamThread$StateConsumer.pollAndUpdate(GlobalStreamThread.java:241)
    at org.apache.kafka.streams.processor.internals.GlobalStreamThread.run(GlobalStreamThread.java:290)
After line 84 (https://github.com/apache/kafka/blob/2.0/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateUpdateTask.java#L84), `sourceNodeAndDeserializer` is null for the old topic, which is still a valid topic but no longer has a deserializer associated with it. Dereferencing it on line 85 throws the NPE. This can be reproduced with the following sequence:
- create a GlobalKTable associated with the topic 'global-topic1'
- change the topic associated with the GlobalKTable to 'global-topic2'
- at this point, the global/.checkpoint file will contain lines for both topics
- produce messages to the previous topic ('global-topic1')
- the consumer will attempt to consume from global-topic1, but no deserializer is associated with global-topic1, so the lookup returns null and the NPE occurs
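The failure mode in the last step can be illustrated with a minimal, self-contained sketch. It does not use Kafka's classes: `StaleTopicLookupSketch`, `deserializers`, `updateUnguarded` and `updateGuarded` are hypothetical names, and the map only stands in for the topic-to-deserializer lookup described above. The guarded variant shows one possible way to fail with a descriptive error instead of a bare NPE.

```java
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.Map;
import java.util.function.Function;

// Hypothetical stand-in for the topic -> deserializer lookup; not Kafka's actual code.
class StaleTopicLookupSketch {

    // Only the topic currently configured for the GlobalKTable is registered.
    static Map<String, Function<byte[], String>> deserializers() {
        Map<String, Function<byte[], String>> byTopic = new HashMap<>();
        byTopic.put("global-topic2", bytes -> new String(bytes, StandardCharsets.UTF_8));
        return byTopic;
    }

    // Unguarded lookup: get() returns null for an unregistered topic,
    // so the call on the result throws NullPointerException.
    static String updateUnguarded(String topic, byte[] value) {
        Function<byte[], String> deserializer = deserializers().get(topic);
        return deserializer.apply(value);
    }

    // Guarded lookup: reports which topic had no deserializer.
    static String updateGuarded(String topic, byte[] value) {
        Function<byte[], String> deserializer = deserializers().get(topic);
        if (deserializer == null) {
            throw new IllegalStateException(
                "No deserializer registered for topic " + topic
                    + "; possibly a stale checkpoint entry");
        }
        return deserializer.apply(value);
    }

    public static void main(String[] args) {
        byte[] value = "v".getBytes(StandardCharsets.UTF_8);
        System.out.println(updateGuarded("global-topic2", value));
        try {
            updateUnguarded("global-topic1", value);
        } catch (NullPointerException e) {
            System.out.println("NPE for stale topic global-topic1");
        }
    }
}
```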
It looks like the following recent commit adds checkpoint validation that may prevent this issue: https://github.com/apache/kafka/commit/53b4ce5c00d61be87962f603682873665155cec4#diff-cc98a6c20f2a8483e1849aea6921c34dR425
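As a rough illustration of what such a validation could look like (this is not the code from the linked commit), the sketch below drops checkpoint entries whose topic is no longer configured. The class and method names are hypothetical, and the "topic partition offset" line layout is an assumption made for the example.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Set;

// Hypothetical sketch of checkpoint validation; not taken from the Kafka code base.
class CheckpointFilterSketch {

    // Keeps only entries whose topic is still configured.
    // Assumes each entry has the form "topic partition offset".
    static List<String> retainConfigured(List<String> entries, Set<String> configuredTopics) {
        List<String> kept = new ArrayList<>();
        for (String entry : entries) {
            String topic = entry.trim().split("\\s+")[0];
            if (configuredTopics.contains(topic)) {
                kept.add(entry);
            }
        }
        return kept;
    }

    public static void main(String[] args) {
        List<String> entries = Arrays.asList("global-topic1 0 42", "global-topic2 0 7");
        Set<String> configured = Collections.singleton("global-topic2");
        // The entry for global-topic1 is dropped because it is no longer configured.
        System.out.println(retainConfigured(entries, configured));
    }
}
```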
Issue Links
- is related to KAFKA-5998 /.checkpoint.tmp Not found exception (Resolved)