Details
-
Bug
-
Status: Resolved
-
Major
-
Resolution: Fixed
-
2.5.0
-
None
-
None
-
linux, windows, java 11
Description
NullPointerException seen when a KTable statestore is being modified by a punctuated method which is added to a topology via the DSL processor/ktable valueTransfomer methods.
It seems valid for AbstractProcessorContext.topic() to return null; however the check below returns a NullPointerException before a null can be returned.
if (topic.equals(NONEXIST_TOPIC)) {
Made a local fix to reverse the ordering of the check (i.e. avoid the null) and this appears to fix the issue and sends the change to the state stores changelog topic.
if (NONEXIST_TOPIC.equals(topic)) {
Stacktrace below
2020-07-02 07:29:46,829 [ABC_aggregator-551a90c1-d7c3-4357-a608-3ea79951f4e8-StreamThread-5] ERROR [o.a.k.s.p.i.StreamThread]: stream-thread [ABC_aggregator-5
51a90c1-d7c3-4357-a608-3ea79951f4e8-StreamThread-5] Encountered the following error during processing:
java.lang.NullPointerException: null
{{ at org.apache.kafka.streams.processor.internals.AbstractProcessorContext.topic(AbstractProcessorContext.java:115)}}
{{ at org.apache.kafka.streams.state.internals.CachingKeyValueStore.putInternal(CachingKeyValueStore.java:141)}}
{{ at org.apache.kafka.streams.state.internals.CachingKeyValueStore.put(CachingKeyValueStore.java:123)}}
{{ at org.apache.kafka.streams.state.internals.CachingKeyValueStore.put(CachingKeyValueStore.java:36)}}
{{ at org.apache.kafka.streams.state.internals.MeteredKeyValueStore.lambda$put$3(MeteredKeyValueStore.java:144)}}
{{ at org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.maybeMeasureLatency(StreamsMetricsImpl.java:806)}}
{{ at org.apache.kafka.streams.state.internals.MeteredKeyValueStore.put(MeteredKeyValueStore.java:144)}}
{{ at org.apache.kafka.streams.processor.internals.ProcessorContextImpl$KeyValueStoreReadWriteDecorator.put(ProcessorContextImpl.java:487)}}
{{ at org.apache.kafka.streams.kstream.internals.KTableKTableJoinMerger$KTableKTableJoinMergeProcessor.process(KTableKTableJoinMerger.java:118)}}
{{ at org.apache.kafka.streams.kstream.internals.KTableKTableJoinMerger$KTableKTableJoinMergeProcessor.process(KTableKTableJoinMerger.java:97)}}
{{ at org.apache.kafka.streams.processor.internals.ProcessorNode.lambda$process$2(ProcessorNode.java:142)}}
{{ at org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.maybeMeasureLatency(StreamsMetricsImpl.java:806)}}
{{ at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:142)}}
{{ at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:201)}}
{{ at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:180)}}
{{ at org.apache.kafka.streams.kstream.internals.KTableKTableOuterJoin$KTableKTableOuterJoinProcessor.process(KTableKTableOuterJoin.java:118)}}
{{ at org.apache.kafka.streams.kstream.internals.KTableKTableOuterJoin$KTableKTableOuterJoinProcessor.process(KTableKTableOuterJoin.java:65)}}
{{ at org.apache.kafka.streams.processor.internals.ProcessorNode.lambda$process$2(ProcessorNode.java:142)}}
{{ at org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.maybeMeasureLatency(StreamsMetricsImpl.java:806)}}
{{ at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:142)}}
{{ at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:201)}}
{{ at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:180)}}
{{ at org.apache.kafka.streams.kstream.internals.TimestampedCacheFlushListener.apply(TimestampedCacheFlushListener.java:45)}}
{{ at org.apache.kafka.streams.kstream.internals.TimestampedCacheFlushListener.apply(TimestampedCacheFlushListener.java:28)}}
{{ at org.apache.kafka.streams.state.internals.MeteredKeyValueStore.lambda$setFlushListener$1(MeteredKeyValueStore.java:119)}}
{{ at org.apache.kafka.streams.state.internals.CachingKeyValueStore.putAndMaybeForward(CachingKeyValueStore.java:92)}}
{{ at org.apache.kafka.streams.state.internals.CachingKeyValueStore.lambda$initInternal$0(CachingKeyValueStore.java:72)}}
{{ at org.apache.kafka.streams.state.internals.NamedCache.flush(NamedCache.java:151)}}
{{ at org.apache.kafka.streams.state.internals.NamedCache.evict(NamedCache.java:244)}}
{{ at org.apache.kafka.streams.state.internals.ThreadCache.maybeEvict(ThreadCache.java:240)}}
{{ at org.apache.kafka.streams.state.internals.ThreadCache.put(ThreadCache.java:150)}}
{{ at org.apache.kafka.streams.state.internals.CachingKeyValueStore.putInternal(CachingKeyValueStore.java:131)}}
{{ at org.apache.kafka.streams.state.internals.CachingKeyValueStore.put(CachingKeyValueStore.java:123)}}
{{ at org.apache.kafka.streams.state.internals.CachingKeyValueStore.put(CachingKeyValueStore.java:36)}}
{{ at org.apache.kafka.streams.state.internals.MeteredKeyValueStore.lambda$put$3(MeteredKeyValueStore.java:144)}}
{{ at org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.maybeMeasureLatency(StreamsMetricsImpl.java:806)}}
{{ at org.apache.kafka.streams.state.internals.MeteredKeyValueStore.put(MeteredKeyValueStore.java:144)}}
{{ at org.apache.kafka.streams.processor.internals.ProcessorContextImpl$KeyValueStoreReadWriteDecorator.put(ProcessorContextImpl.java:487)}}
{{ at com.pjp1981.streambuilder.StreamsBuilderHelper$1.lambda$init$0(StreamsBuilderHelper.java:55) // punctuated lambda - user code}}
{{ at java.base/java.util.Iterator.forEachRemaining(Iterator.java:133) //iterates over the state store and cleans up old items}}
{{ at com.pjp1981.streambuilder.StreamsBuilderHelper$1.lambda$init$1(StreamsBuilderHelper.java:47)}}
{{ at org.apache.kafka.streams.processor.internals.ProcessorNode.lambda$punctuate$3(ProcessorNode.java:161)}}
{{ at org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.maybeMeasureLatency(StreamsMetricsImpl.java:806)}}
{{ at org.apache.kafka.streams.processor.internals.ProcessorNode.punctuate(ProcessorNode.java:161)}}
{{ at org.apache.kafka.streams.processor.internals.StreamTask.lambda$punctuate$4(StreamTask.java:445)}}
{{ at org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.maybeMeasureLatency(StreamsMetricsImpl.java:806)}}
{{ at org.apache.kafka.streams.processor.internals.StreamTask.punctuate(StreamTask.java:445)}}
{{ at org.apache.kafka.streams.processor.internals.PunctuationQueue.mayPunctuate(PunctuationQueue.java:54)}}
{{ at org.apache.kafka.streams.processor.internals.StreamTask.maybePunctuateSystemTime(StreamTask.java:868)}}
{{ at org.apache.kafka.streams.processor.internals.AssignedStreamsTasks.punctuate(AssignedStreamsTasks.java:502)}}
{{ at org.apache.kafka.streams.processor.internals.TaskManager.punctuate(TaskManager.java:557)}}
{{ at org.apache.kafka.streams.processor.internals.StreamThread.maybePunctuate(StreamThread.java:951)}}
{{ at org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:823)}}
{{ at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:697)}}
{
}
Attachments
Issue Links
- links to