Description
After switching from Transformer to FixedKeyProcessor, I see NullPointerExceptions in the logs. The cause appears to be that ProcessorParameters::toString uses its field processorSupplier without a null check. Since KAFKA-13654: Extend KStream process with new Processor API (#11993), this field can be null, in which case the field fixedKeyProcessorSupplier is set instead. According to the stack trace, the exception is thrown while a debug log message is being formatted in InternalStreamsBuilder.buildAndOptimizeTopology.
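To illustrate the kind of null check that seems to be missing, here is a minimal sketch. It is not the actual Kafka code: ProcessorParametersSketch is a hypothetical stand-in class, java.util.function.Supplier substitutes for both ProcessorSupplier and FixedKeyProcessorSupplier, and the output format is an assumption. It only shows toString falling back to whichever supplier is set.

```java
import java.util.function.Supplier;

// Hypothetical, simplified stand-in for ProcessorParameters (NOT the Kafka class).
// java.util.function.Supplier replaces both Kafka supplier interfaces here.
final class ProcessorParametersSketch {
    private final Supplier<?> processorSupplier;         // null when a fixed-key processor is used
    private final Supplier<?> fixedKeyProcessorSupplier; // null when a regular processor is used
    private final String processorName;

    ProcessorParametersSketch(final Supplier<?> processorSupplier,
                              final Supplier<?> fixedKeyProcessorSupplier,
                              final String processorName) {
        this.processorSupplier = processorSupplier;
        this.fixedKeyProcessorSupplier = fixedKeyProcessorSupplier;
        this.processorName = processorName;
    }

    @Override
    public String toString() {
        // Pick whichever supplier is set instead of dereferencing processorSupplier blindly.
        final Supplier<?> active =
            processorSupplier != null ? processorSupplier : fixedKeyProcessorSupplier;
        final Object processor = active == null ? null : active.get();
        final String processorClass =
            processor == null ? "<none>" : processor.getClass().getName();
        // Output format is assumed for illustration only.
        return "ProcessorParameters{processor class=" + processorClass
            + ", processor name='" + processorName + "'}";
    }

    public static void main(final String[] args) {
        // Only the fixed-key supplier is set, as in the failing case described above.
        System.out.println(new ProcessorParametersSketch(null, () -> "fixed", "p1"));
    }
}
```

With a guard like this, logging a graph node whose parameters only carry a fixed-key supplier would no longer throw.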
Stack trace
java.lang.NullPointerException: Cannot invoke "org.apache.kafka.streams.processor.api.ProcessorSupplier.get()" because "this.processorSupplier" is null
    at org.apache.kafka.streams.kstream.internals.graph.ProcessorParameters.toString(ProcessorParameters.java:133)
    at java.base/java.lang.String.valueOf(String.java:4218)
    at java.base/java.lang.StringBuilder.append(StringBuilder.java:173)
    at org.apache.kafka.streams.kstream.internals.graph.ProcessorGraphNode.toString(ProcessorGraphNode.java:52)
    at org.slf4j.helpers.MessageFormatter.safeObjectAppend(MessageFormatter.java:277)
    at org.slf4j.helpers.MessageFormatter.deeplyAppendParameter(MessageFormatter.java:249)
    at org.slf4j.helpers.MessageFormatter.arrayFormat(MessageFormatter.java:211)
    at org.slf4j.helpers.MessageFormatter.arrayFormat(MessageFormatter.java:161)
    at ch.qos.logback.classic.spi.LoggingEvent.getFormattedMessage(LoggingEvent.java:293)
    at ch.qos.logback.classic.spi.LoggingEvent.prepareForDeferredProcessing(LoggingEvent.java:206)
    at ch.qos.logback.core.OutputStreamAppender.subAppend(OutputStreamAppender.java:223)
    at ch.qos.logback.core.OutputStreamAppender.append(OutputStreamAppender.java:102)
    at ch.qos.logback.core.UnsynchronizedAppenderBase.doAppend(UnsynchronizedAppenderBase.java:84)
    at ch.qos.logback.core.spi.AppenderAttachableImpl.appendLoopOnAppenders(AppenderAttachableImpl.java:51)
    at ch.qos.logback.classic.Logger.appendLoopOnAppenders(Logger.java:270)
    at ch.qos.logback.classic.Logger.callAppenders(Logger.java:257)
    at ch.qos.logback.classic.Logger.buildLoggingEventAndAppend(Logger.java:421)
    at ch.qos.logback.classic.Logger.filterAndLog_2(Logger.java:414)
    at ch.qos.logback.classic.Logger.debug(Logger.java:490)
    at org.apache.kafka.streams.kstream.internals.InternalStreamsBuilder.buildAndOptimizeTopology(InternalStreamsBuilder.java:293)
    at org.apache.kafka.streams.StreamsBuilder.build(StreamsBuilder.java:628)