Kafka / KAFKA-12914

StreamSourceNode.toString() throws with StreamsBuilder.stream(Pattern) ctor

Details

    • Type: Bug
    • Status: Resolved
    • Priority: Minor
    • Resolution: Fixed
    • Affects Version/s: 2.8.0
    • Fix Version/s: 2.8.1, 3.0.0
    • Component/s: streams
    • Labels: None

    Description

      Hi, 

      I came across what looks like a bug.

      Repro

      import org.apache.kafka.common.serialization.Serdes
      import org.apache.kafka.streams.KafkaStreams
      import org.apache.kafka.streams.StreamsBuilder
      import org.apache.kafka.streams.kstream.Consumed
      import java.util.*
      import java.util.regex.Pattern
      
      fun main() {
          val builder = StreamsBuilder()
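          builder.stream(Pattern.compile("foo"), Consumed.with(Serdes.Long(), Serdes.Long())) // source defined by a Pattern, not topic names
          val streams = KafkaStreams(builder.build(), Properties()) // build() logs the graph nodes at DEBUG, which invokes toString()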
          streams.start()
      }
      
      SLF4J: Failed toString() invocation on an object of type [java.util.LinkedHashSet]
      Reported exception:
      java.lang.NullPointerException
      	at java.base/java.util.Collections$UnmodifiableCollection.<init>(Collections.java:1030)
      	at java.base/java.util.Collections$UnmodifiableSet.<init>(Collections.java:1132)
      	at java.base/java.util.Collections.unmodifiableSet(Collections.java:1122)
      	at org.apache.kafka.streams.kstream.internals.graph.SourceGraphNode.topicNames(SourceGraphNode.java:55)
      	at org.apache.kafka.streams.kstream.internals.graph.StreamSourceNode.toString(StreamSourceNode.java:65)
      	at java.base/java.lang.String.valueOf(String.java:3352)
      	at java.base/java.lang.StringBuilder.append(StringBuilder.java:166)
      	at java.base/java.util.AbstractCollection.toString(AbstractCollection.java:457)
      	at org.slf4j.helpers.MessageFormatter.safeObjectAppend(MessageFormatter.java:277)
      	at org.slf4j.helpers.MessageFormatter.deeplyAppendParameter(MessageFormatter.java:249)
      	at org.slf4j.helpers.MessageFormatter.arrayFormat(MessageFormatter.java:211)
      	at org.slf4j.helpers.MessageFormatter.arrayFormat(MessageFormatter.java:161)
      	at ch.qos.logback.classic.spi.LoggingEvent.getFormattedMessage(LoggingEvent.java:293)
      	at ch.qos.logback.classic.spi.LoggingEvent.prepareForDeferredProcessing(LoggingEvent.java:206)
      	at ch.qos.logback.core.OutputStreamAppender.subAppend(OutputStreamAppender.java:223)
      	at ch.qos.logback.core.OutputStreamAppender.append(OutputStreamAppender.java:102)
      	at ch.qos.logback.core.UnsynchronizedAppenderBase.doAppend(UnsynchronizedAppenderBase.java:84)
      	at ch.qos.logback.core.spi.AppenderAttachableImpl.appendLoopOnAppenders(AppenderAttachableImpl.java:51)
      	at ch.qos.logback.classic.Logger.appendLoopOnAppenders(Logger.java:270)
      	at ch.qos.logback.classic.Logger.callAppenders(Logger.java:257)
      	at ch.qos.logback.classic.Logger.buildLoggingEventAndAppend(Logger.java:421)
      	at ch.qos.logback.classic.Logger.filterAndLog_2(Logger.java:414)
      	at ch.qos.logback.classic.Logger.debug(Logger.java:490)
      	at org.apache.kafka.streams.kstream.internals.InternalStreamsBuilder.buildAndOptimizeTopology(InternalStreamsBuilder.java:305)
      	at org.apache.kafka.streams.StreamsBuilder.build(StreamsBuilder.java:624)
      	at org.apache.kafka.streams.StreamsBuilder.build(StreamsBuilder.java:613)
      	at ApplicationKt.main(Application.kt:11)
      	at ApplicationKt.main(Application.kt)
      SLF4J: Failed toString() invocation on an object of type [org.apache.kafka.streams.kstream.internals.graph.StreamSourceNode]
      Reported exception:
      java.lang.NullPointerException
      	at java.base/java.util.Collections$UnmodifiableCollection.<init>(Collections.java:1030)
      	at java.base/java.util.Collections$UnmodifiableSet.<init>(Collections.java:1132)
      	at java.base/java.util.Collections.unmodifiableSet(Collections.java:1122)
      	at org.apache.kafka.streams.kstream.internals.graph.SourceGraphNode.topicNames(SourceGraphNode.java:55)
      	at org.apache.kafka.streams.kstream.internals.graph.StreamSourceNode.toString(StreamSourceNode.java:65)
      	at org.slf4j.helpers.MessageFormatter.safeObjectAppend(MessageFormatter.java:277)
      	at org.slf4j.helpers.MessageFormatter.deeplyAppendParameter(MessageFormatter.java:249)
      	at org.slf4j.helpers.MessageFormatter.arrayFormat(MessageFormatter.java:211)
      	at org.slf4j.helpers.MessageFormatter.arrayFormat(MessageFormatter.java:161)
      	at ch.qos.logback.classic.spi.LoggingEvent.getFormattedMessage(LoggingEvent.java:293)
      	at ch.qos.logback.classic.spi.LoggingEvent.prepareForDeferredProcessing(LoggingEvent.java:206)
      	at ch.qos.logback.core.OutputStreamAppender.subAppend(OutputStreamAppender.java:223)
      	at ch.qos.logback.core.OutputStreamAppender.append(OutputStreamAppender.java:102)
      	at ch.qos.logback.core.UnsynchronizedAppenderBase.doAppend(UnsynchronizedAppenderBase.java:84)
      	at ch.qos.logback.core.spi.AppenderAttachableImpl.appendLoopOnAppenders(AppenderAttachableImpl.java:51)
      	at ch.qos.logback.classic.Logger.appendLoopOnAppenders(Logger.java:270)
      	at ch.qos.logback.classic.Logger.callAppenders(Logger.java:257)
      	at ch.qos.logback.classic.Logger.buildLoggingEventAndAppend(Logger.java:421)
      	at ch.qos.logback.classic.Logger.filterAndLog_2(Logger.java:414)
      	at ch.qos.logback.classic.Logger.debug(Logger.java:490)
      	at org.apache.kafka.streams.kstream.internals.InternalStreamsBuilder.buildAndOptimizeTopology(InternalStreamsBuilder.java:305)
      	at org.apache.kafka.streams.StreamsBuilder.build(StreamsBuilder.java:624)
      	at org.apache.kafka.streams.StreamsBuilder.build(StreamsBuilder.java:613)
      	at ApplicationKt.main(Application.kt:11)
      	at ApplicationKt.main(Application.kt)
      

      The repro is in Kotlin, but you get the idea.

      Brief debug

      StreamSourceNode.toString() calls topicNames():

      https://github.com/apache/kafka/blob/1dadb6db0c6848a8a1d2eee1497f9b79b6e04e0e/streams/src/main/java/org/apache/kafka/streams/kstream/internals/graph/StreamSourceNode.java#L64-L70

      topicNames() wraps the node's topic-name set in Collections.unmodifiableSet(). That set is null if the node was constructed with a Pattern rather than a String:

      https://github.com/apache/kafka/blob/1dadb6db0c6848a8a1d2eee1497f9b79b6e04e0e/streams/src/main/java/org/apache/kafka/streams/kstream/internals/graph/SourceGraphNode.java#L49

      Passing null to Collections.unmodifiableSet() makes the UnmodifiableCollection constructor throw a NullPointerException.
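
      To illustrate the failure mode in isolation, here is a minimal Java sketch. SourceNodeSketch and all of its members are hypothetical stand-ins, not the actual Kafka classes, and the null guard in safeTopicNames() is only one possible approach; it is not necessarily the fix that was merged.

```java
import java.util.Collections;
import java.util.LinkedHashSet;
import java.util.Set;
import java.util.regex.Pattern;

// Hypothetical stand-in for a source graph node; NOT the real Kafka class.
class SourceNodeSketch {
    private final Set<String> topicNames; // null when built from a Pattern
    private final Pattern topicPattern;   // null when built from topic names

    SourceNodeSketch(Set<String> topicNames) {
        this.topicNames = new LinkedHashSet<>(topicNames);
        this.topicPattern = null;
    }

    SourceNodeSketch(Pattern topicPattern) {
        this.topicNames = null;
        this.topicPattern = topicPattern;
    }

    // Same shape as the failing call in the stack trace: wraps a possibly-null set.
    Set<String> unsafeTopicNames() {
        return Collections.unmodifiableSet(topicNames); // throws NPE if topicNames is null
    }

    // One possible guard (an assumption): fall back to an empty set.
    Set<String> safeTopicNames() {
        return topicNames == null
            ? Collections.<String>emptySet()
            : Collections.unmodifiableSet(topicNames);
    }

    // A toString()-style description that no longer throws for Pattern-based nodes.
    String describe() {
        return "SourceNodeSketch{topicNames=" + safeTopicNames()
            + ", topicPattern=" + topicPattern + "}";
    }

    public static void main(String[] args) {
        SourceNodeSketch node = new SourceNodeSketch(Pattern.compile("foo"));
        try {
            node.unsafeTopicNames();
        } catch (NullPointerException e) {
            System.out.println("unsafeTopicNames() threw NullPointerException");
        }
        System.out.println(node.describe());
    }
}
```

      With a Pattern-based node, unsafeTopicNames() throws while describe() returns normally, which is the behaviour one would expect from toString() here.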

       

            People

              Assignee: Matthias J. Sax (mjsax)
              Reporter: Will Bartlett (will118)