Description
Hi,
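I came across what looks like a bug: with debug logging enabled, StreamsBuilder.build() makes SLF4J report a NullPointerException from StreamSourceNode.toString() when the stream is subscribed with a Pattern.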
Repro
import org.apache.kafka.common.serialization.Serdes
import org.apache.kafka.streams.KafkaStreams
import org.apache.kafka.streams.StreamsBuilder
import org.apache.kafka.streams.kstream.Consumed
import java.util.*
import java.util.regex.Pattern

fun main() {
    val builder = StreamsBuilder()
    builder.stream(Pattern.compile("foo"), Consumed.with(Serdes.Long(), Serdes.Long()))
    val streams = KafkaStreams(builder.build(), Properties())
    streams.start()
}
SLF4J: Failed toString() invocation on an object of type [java.util.LinkedHashSet]
Reported exception:
java.lang.NullPointerException
    at java.base/java.util.Collections$UnmodifiableCollection.<init>(Collections.java:1030)
    at java.base/java.util.Collections$UnmodifiableSet.<init>(Collections.java:1132)
    at java.base/java.util.Collections.unmodifiableSet(Collections.java:1122)
    at org.apache.kafka.streams.kstream.internals.graph.SourceGraphNode.topicNames(SourceGraphNode.java:55)
    at org.apache.kafka.streams.kstream.internals.graph.StreamSourceNode.toString(StreamSourceNode.java:65)
    at java.base/java.lang.String.valueOf(String.java:3352)
    at java.base/java.lang.StringBuilder.append(StringBuilder.java:166)
    at java.base/java.util.AbstractCollection.toString(AbstractCollection.java:457)
    at org.slf4j.helpers.MessageFormatter.safeObjectAppend(MessageFormatter.java:277)
    at org.slf4j.helpers.MessageFormatter.deeplyAppendParameter(MessageFormatter.java:249)
    at org.slf4j.helpers.MessageFormatter.arrayFormat(MessageFormatter.java:211)
    at org.slf4j.helpers.MessageFormatter.arrayFormat(MessageFormatter.java:161)
    at ch.qos.logback.classic.spi.LoggingEvent.getFormattedMessage(LoggingEvent.java:293)
    at ch.qos.logback.classic.spi.LoggingEvent.prepareForDeferredProcessing(LoggingEvent.java:206)
    at ch.qos.logback.core.OutputStreamAppender.subAppend(OutputStreamAppender.java:223)
    at ch.qos.logback.core.OutputStreamAppender.append(OutputStreamAppender.java:102)
    at ch.qos.logback.core.UnsynchronizedAppenderBase.doAppend(UnsynchronizedAppenderBase.java:84)
    at ch.qos.logback.core.spi.AppenderAttachableImpl.appendLoopOnAppenders(AppenderAttachableImpl.java:51)
    at ch.qos.logback.classic.Logger.appendLoopOnAppenders(Logger.java:270)
    at ch.qos.logback.classic.Logger.callAppenders(Logger.java:257)
    at ch.qos.logback.classic.Logger.buildLoggingEventAndAppend(Logger.java:421)
    at ch.qos.logback.classic.Logger.filterAndLog_2(Logger.java:414)
    at ch.qos.logback.classic.Logger.debug(Logger.java:490)
    at org.apache.kafka.streams.kstream.internals.InternalStreamsBuilder.buildAndOptimizeTopology(InternalStreamsBuilder.java:305)
    at org.apache.kafka.streams.StreamsBuilder.build(StreamsBuilder.java:624)
    at org.apache.kafka.streams.StreamsBuilder.build(StreamsBuilder.java:613)
    at ApplicationKt.main(Application.kt:11)
    at ApplicationKt.main(Application.kt)
SLF4J: Failed toString() invocation on an object of type [org.apache.kafka.streams.kstream.internals.graph.StreamSourceNode]
Reported exception:
java.lang.NullPointerException
    at java.base/java.util.Collections$UnmodifiableCollection.<init>(Collections.java:1030)
    at java.base/java.util.Collections$UnmodifiableSet.<init>(Collections.java:1132)
    at java.base/java.util.Collections.unmodifiableSet(Collections.java:1122)
    at org.apache.kafka.streams.kstream.internals.graph.SourceGraphNode.topicNames(SourceGraphNode.java:55)
    at org.apache.kafka.streams.kstream.internals.graph.StreamSourceNode.toString(StreamSourceNode.java:65)
    at org.slf4j.helpers.MessageFormatter.safeObjectAppend(MessageFormatter.java:277)
    at org.slf4j.helpers.MessageFormatter.deeplyAppendParameter(MessageFormatter.java:249)
    at org.slf4j.helpers.MessageFormatter.arrayFormat(MessageFormatter.java:211)
    at org.slf4j.helpers.MessageFormatter.arrayFormat(MessageFormatter.java:161)
    at ch.qos.logback.classic.spi.LoggingEvent.getFormattedMessage(LoggingEvent.java:293)
    at ch.qos.logback.classic.spi.LoggingEvent.prepareForDeferredProcessing(LoggingEvent.java:206)
    at ch.qos.logback.core.OutputStreamAppender.subAppend(OutputStreamAppender.java:223)
    at ch.qos.logback.core.OutputStreamAppender.append(OutputStreamAppender.java:102)
    at ch.qos.logback.core.UnsynchronizedAppenderBase.doAppend(UnsynchronizedAppenderBase.java:84)
    at ch.qos.logback.core.spi.AppenderAttachableImpl.appendLoopOnAppenders(AppenderAttachableImpl.java:51)
    at ch.qos.logback.classic.Logger.appendLoopOnAppenders(Logger.java:270)
    at ch.qos.logback.classic.Logger.callAppenders(Logger.java:257)
    at ch.qos.logback.classic.Logger.buildLoggingEventAndAppend(Logger.java:421)
    at ch.qos.logback.classic.Logger.filterAndLog_2(Logger.java:414)
    at ch.qos.logback.classic.Logger.debug(Logger.java:490)
    at org.apache.kafka.streams.kstream.internals.InternalStreamsBuilder.buildAndOptimizeTopology(InternalStreamsBuilder.java:305)
    at org.apache.kafka.streams.StreamsBuilder.build(StreamsBuilder.java:624)
    at org.apache.kafka.streams.StreamsBuilder.build(StreamsBuilder.java:613)
    at ApplicationKt.main(Application.kt:11)
    at ApplicationKt.main(Application.kt)
The repro is in Kotlin, but you get the idea.
Brief debug
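StreamSourceNode.toString() calls topicNames(), which passes the node's topic names to Collections.unmodifiableSet() (SourceGraphNode.java:55 in the trace).
The topic names will be null if the node was constructed with a Pattern rather than a String.
The null argument causes the UnmodifiableCollection constructor to throw a NullPointerException.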
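The failure mode can be reproduced without Kafka. The following is a minimal sketch, not the Kafka source: SourceNodeSketch, topicNamesUnsafe() and topicNamesSafe() are hypothetical names invented for illustration, and the empty-set fallback is only one possible guard, not necessarily the fix the project would choose.

```java
import java.util.Collections;
import java.util.Set;
import java.util.regex.Pattern;

// Hypothetical stand-in for a source graph node; NOT the Kafka class.
final class SourceNodeSketch {
    private final Set<String> topicNames; // null when subscribed by pattern
    private final Pattern topicPattern;   // null when subscribed by name

    SourceNodeSketch(Set<String> topicNames) {
        this.topicNames = topicNames;
        this.topicPattern = null;
    }

    SourceNodeSketch(Pattern topicPattern) {
        this.topicNames = null;
        this.topicPattern = topicPattern;
    }

    // Same shape as the failing call in the trace:
    // Collections.unmodifiableSet rejects a null argument with an NPE.
    Set<String> topicNamesUnsafe() {
        return Collections.unmodifiableSet(topicNames);
    }

    // Assumed guard: return an empty set when there are no topic names.
    Set<String> topicNamesSafe() {
        return topicNames == null
                ? Collections.<String>emptySet()
                : Collections.unmodifiableSet(topicNames);
    }

    // toString() built on the guarded accessor is safe to pass to a logger.
    @Override
    public String toString() {
        return "SourceNodeSketch{topicNames=" + topicNamesSafe()
                + ", topicPattern=" + topicPattern + "}";
    }
}
```

With this sketch, new SourceNodeSketch(Pattern.compile("foo")).topicNamesUnsafe() throws a NullPointerException, while toString() on the same object returns normally.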