Kafka / KAFKA-10246

AbstractProcessorContext topic() throws NullPointerException when modifying a state store within the DSL from a punctuator

Details

    • Type: Bug
    • Status: Resolved
    • Priority: Major
    • Resolution: Fixed
    • Affects Version/s: 2.5.0
    • Fix Version/s: None
    • Component/s: streams
    • Labels: None
    • Environment: linux, windows, java 11

    Description

      A NullPointerException is thrown when a KTable state store is modified from a punctuator that was added to the topology via the DSL processor / KTable ValueTransformer methods.

      It seems valid for AbstractProcessorContext.topic() to return null. However, the check below calls equals() on the topic and therefore throws a NullPointerException before null can be returned.

      if (topic.equals(NONEXIST_TOPIC)) {

      I made a local fix that reverses the operands of the check, so that equals() is never called on a null topic. This appears to fix the issue, and the change is now sent to the state store's changelog topic.

      if (NONEXIST_TOPIC.equals(topic)) {
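
The difference between the two orderings can be reproduced outside Kafka. The following is a minimal, standalone sketch and not the actual Kafka source: the class name, the helper methods `topicUnsafe` and `topicSafe`, and the value assigned to `NONEXIST_TOPIC` are all hypothetical and exist only for illustration.

```java
public class NullSafeTopicCheck {
    // Placeholder value for illustration only; the real constant is defined in Kafka Streams internals.
    static final String NONEXIST_TOPIC = "__placeholder_topic__";

    // Original ordering: equals() is invoked on topic, so a null topic throws NullPointerException.
    static String topicUnsafe(String topic) {
        if (topic.equals(NONEXIST_TOPIC)) {
            return null;
        }
        return topic;
    }

    // Reversed ordering: equals() is invoked on the non-null constant, so a null topic is returned as null.
    static String topicSafe(String topic) {
        if (NONEXIST_TOPIC.equals(topic)) {
            return null;
        }
        return topic;
    }

    public static void main(String[] args) {
        System.out.println("safe(null) = " + topicSafe(null));
        try {
            topicUnsafe(null);
            System.out.println("unsafe(null) returned without error");
        } catch (NullPointerException e) {
            System.out.println("unsafe(null) threw NullPointerException");
        }
    }
}
```

Calling equals() on the constant works because String.equals accepts a null argument and returns false for it, whereas invoking any method on a null reference fails.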

      Stacktrace below

      2020-07-02 07:29:46,829 [ABC_aggregator-551a90c1-d7c3-4357-a608-3ea79951f4e8-StreamThread-5] ERROR [o.a.k.s.p.i.StreamThread]: stream-thread [ABC_aggregator-551a90c1-d7c3-4357-a608-3ea79951f4e8-StreamThread-5] Encountered the following error during processing:
      java.lang.NullPointerException: null
          at org.apache.kafka.streams.processor.internals.AbstractProcessorContext.topic(AbstractProcessorContext.java:115)
          at org.apache.kafka.streams.state.internals.CachingKeyValueStore.putInternal(CachingKeyValueStore.java:141)
          at org.apache.kafka.streams.state.internals.CachingKeyValueStore.put(CachingKeyValueStore.java:123)
          at org.apache.kafka.streams.state.internals.CachingKeyValueStore.put(CachingKeyValueStore.java:36)
          at org.apache.kafka.streams.state.internals.MeteredKeyValueStore.lambda$put$3(MeteredKeyValueStore.java:144)
          at org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.maybeMeasureLatency(StreamsMetricsImpl.java:806)
          at org.apache.kafka.streams.state.internals.MeteredKeyValueStore.put(MeteredKeyValueStore.java:144)
          at org.apache.kafka.streams.processor.internals.ProcessorContextImpl$KeyValueStoreReadWriteDecorator.put(ProcessorContextImpl.java:487)
          at org.apache.kafka.streams.kstream.internals.KTableKTableJoinMerger$KTableKTableJoinMergeProcessor.process(KTableKTableJoinMerger.java:118)
          at org.apache.kafka.streams.kstream.internals.KTableKTableJoinMerger$KTableKTableJoinMergeProcessor.process(KTableKTableJoinMerger.java:97)
          at org.apache.kafka.streams.processor.internals.ProcessorNode.lambda$process$2(ProcessorNode.java:142)
          at org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.maybeMeasureLatency(StreamsMetricsImpl.java:806)
          at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:142)
          at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:201)
          at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:180)
          at org.apache.kafka.streams.kstream.internals.KTableKTableOuterJoin$KTableKTableOuterJoinProcessor.process(KTableKTableOuterJoin.java:118)
          at org.apache.kafka.streams.kstream.internals.KTableKTableOuterJoin$KTableKTableOuterJoinProcessor.process(KTableKTableOuterJoin.java:65)
          at org.apache.kafka.streams.processor.internals.ProcessorNode.lambda$process$2(ProcessorNode.java:142)
          at org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.maybeMeasureLatency(StreamsMetricsImpl.java:806)
          at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:142)
          at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:201)
          at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:180)
          at org.apache.kafka.streams.kstream.internals.TimestampedCacheFlushListener.apply(TimestampedCacheFlushListener.java:45)
          at org.apache.kafka.streams.kstream.internals.TimestampedCacheFlushListener.apply(TimestampedCacheFlushListener.java:28)
          at org.apache.kafka.streams.state.internals.MeteredKeyValueStore.lambda$setFlushListener$1(MeteredKeyValueStore.java:119)
          at org.apache.kafka.streams.state.internals.CachingKeyValueStore.putAndMaybeForward(CachingKeyValueStore.java:92)
          at org.apache.kafka.streams.state.internals.CachingKeyValueStore.lambda$initInternal$0(CachingKeyValueStore.java:72)
          at org.apache.kafka.streams.state.internals.NamedCache.flush(NamedCache.java:151)
          at org.apache.kafka.streams.state.internals.NamedCache.evict(NamedCache.java:244)
          at org.apache.kafka.streams.state.internals.ThreadCache.maybeEvict(ThreadCache.java:240)
          at org.apache.kafka.streams.state.internals.ThreadCache.put(ThreadCache.java:150)
          at org.apache.kafka.streams.state.internals.CachingKeyValueStore.putInternal(CachingKeyValueStore.java:131)
          at org.apache.kafka.streams.state.internals.CachingKeyValueStore.put(CachingKeyValueStore.java:123)
          at org.apache.kafka.streams.state.internals.CachingKeyValueStore.put(CachingKeyValueStore.java:36)
          at org.apache.kafka.streams.state.internals.MeteredKeyValueStore.lambda$put$3(MeteredKeyValueStore.java:144)
          at org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.maybeMeasureLatency(StreamsMetricsImpl.java:806)
          at org.apache.kafka.streams.state.internals.MeteredKeyValueStore.put(MeteredKeyValueStore.java:144)
          at org.apache.kafka.streams.processor.internals.ProcessorContextImpl$KeyValueStoreReadWriteDecorator.put(ProcessorContextImpl.java:487)
          at com.pjp1981.streambuilder.StreamsBuilderHelper$1.lambda$init$0(StreamsBuilderHelper.java:55) // punctuated lambda - user code
          at java.base/java.util.Iterator.forEachRemaining(Iterator.java:133) // iterates over the state store and cleans up old items
          at com.pjp1981.streambuilder.StreamsBuilderHelper$1.lambda$init$1(StreamsBuilderHelper.java:47)
          at org.apache.kafka.streams.processor.internals.ProcessorNode.lambda$punctuate$3(ProcessorNode.java:161)
          at org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.maybeMeasureLatency(StreamsMetricsImpl.java:806)
          at org.apache.kafka.streams.processor.internals.ProcessorNode.punctuate(ProcessorNode.java:161)
          at org.apache.kafka.streams.processor.internals.StreamTask.lambda$punctuate$4(StreamTask.java:445)
          at org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.maybeMeasureLatency(StreamsMetricsImpl.java:806)
          at org.apache.kafka.streams.processor.internals.StreamTask.punctuate(StreamTask.java:445)
          at org.apache.kafka.streams.processor.internals.PunctuationQueue.mayPunctuate(PunctuationQueue.java:54)
          at org.apache.kafka.streams.processor.internals.StreamTask.maybePunctuateSystemTime(StreamTask.java:868)
          at org.apache.kafka.streams.processor.internals.AssignedStreamsTasks.punctuate(AssignedStreamsTasks.java:502)
          at org.apache.kafka.streams.processor.internals.TaskManager.punctuate(TaskManager.java:557)
          at org.apache.kafka.streams.processor.internals.StreamThread.maybePunctuate(StreamThread.java:951)
          at org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:823)
          at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:697)
          at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:670)

            People

              Assignee: Unassigned
              Reporter: Peter Pringle (pjp1981)