Kafka / KAFKA-4300

NamedCache throws an NPE when evict is called and the cache is empty

    Details

    • Type: Bug
    • Status: Resolved
    • Priority: Major
    • Resolution: Fixed
    • Affects Version/s: 0.10.1.0
    • Fix Version/s: 0.10.1.0
    • Component/s: streams
    • Labels: None

      Description

      evict can be called on a NamedCache even when that cache is empty, because the outer ThreadCache is shared. Currently, calling evict on an empty NamedCache throws a NullPointerException.
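
      The failure mode can be illustrated with a minimal sketch of an LRU cache whose evict returns early when there is nothing to evict. This is not the Kafka Streams code: the class SketchLruCache, its fields, and its String keys and values are all hypothetical, and the guard shown is only one plausible way to make evict safe on an empty cache. Without the `tail == null` check, evict would dereference a null tail node.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-in for an LRU cache; NOT the Kafka Streams NamedCache class.
class SketchLruCache {
    private static final class Node {
        final String key;
        String value;
        Node prev; // neighbour toward the head (more recently used)
        Node next; // neighbour toward the tail (less recently used)

        Node(String key, String value) {
            this.key = key;
            this.value = value;
        }
    }

    private final Map<String, Node> index = new HashMap<>();
    private Node head; // most recently used entry, null when empty
    private Node tail; // least recently used entry, null when empty

    // Inserts or updates an entry and marks it as most recently used.
    void put(String key, String value) {
        Node node = index.get(key);
        if (node != null) {
            node.value = value;
            unlink(node);
        } else {
            node = new Node(key, value);
            index.put(key, node);
        }
        linkFirst(node);
    }

    // Removes the least recently used entry and returns its key,
    // or returns null when the cache is empty.
    String evict() {
        if (tail == null) {
            return null; // guard: nothing to evict, so do not touch tail
        }
        Node eldest = tail;
        unlink(eldest);
        index.remove(eldest.key);
        return eldest.key;
    }

    int size() {
        return index.size();
    }

    private void linkFirst(Node node) {
        node.prev = null;
        node.next = head;
        if (head != null) {
            head.prev = node;
        }
        head = node;
        if (tail == null) {
            tail = node;
        }
    }

    private void unlink(Node node) {
        if (node.prev != null) {
            node.prev.next = node.next;
        } else {
            head = node.next;
        }
        if (node.next != null) {
            node.next.prev = node.prev;
        } else {
            tail = node.prev;
        }
        node.prev = null;
        node.next = null;
    }
}
```

      In this sketch, a caller that shares one size budget across several caches can call evict on any of them without first checking whether that particular cache holds entries.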

      From the original email:

      I'm joining a bunch of Kafka topics using Kafka Streams, with the Kafka
      0.10.1 release candidate.

      It runs OK for a few thousand messages, and then it dies with the
      following exception:

      Exception in thread "StreamThread-1" java.lang.NullPointerException
          at org.apache.kafka.streams.state.internals.NamedCache.evict(NamedCache.java:194)
          at org.apache.kafka.streams.state.internals.ThreadCache.maybeEvict(ThreadCache.java:190)
          at org.apache.kafka.streams.state.internals.ThreadCache.put(ThreadCache.java:121)
          at org.apache.kafka.streams.state.internals.CachingKeyValueStore.get(CachingKeyValueStore.java:147)
          at org.apache.kafka.streams.state.internals.CachingKeyValueStore.get(CachingKeyValueStore.java:134)
          at org.apache.kafka.streams.kstream.internals.KTableReduce$KTableAggregateValueGetter.get(KTableReduce.java:121)
          at org.apache.kafka.streams.kstream.internals.KTableKTableLeftJoin$KTableKTableLeftJoinProcessor.process(KTableKTableLeftJoin.java:77)
          at org.apache.kafka.streams.kstream.internals.KTableKTableLeftJoin$KTableKTableLeftJoinProcessor.process(KTableKTableLeftJoin.java:48)
          at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:82)
          at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:204)
          at org.apache.kafka.streams.kstream.internals.KTableFilter$KTableFilterProcessor.process(KTableFilter.java:83)
          at org.apache.kafka.streams.kstream.internals.KTableFilter$KTableFilterProcessor.process(KTableFilter.java:73)
          at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:82)
          at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:204)
          at org.apache.kafka.streams.kstream.internals.KTableFilter$KTableFilterProcessor.process(KTableFilter.java:83)
          at org.apache.kafka.streams.kstream.internals.KTableFilter$KTableFilterProcessor.process(KTableFilter.java:73)
          at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:82)
          at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:204)
          at org.apache.kafka.streams.kstream.internals.KTableKTableJoinMerger$KTableKTableJoinMergeProcessor.process(KTableKTableJoinMerger.java:52)
          at org.apache.kafka.streams.kstream.internals.KTableKTableJoinMerger$KTableKTableJoinMergeProcessor.process(KTableKTableJoinMerger.java:49)
          at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:82)
          at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:204)
          at org.apache.kafka.streams.kstream.internals.KTableKTableRightJoin$KTableKTableRightJoinProcessor.process(KTableKTableRightJoin.java:83)
          at org.apache.kafka.streams.kstream.internals.KTableKTableRightJoin$KTableKTableRightJoinProcessor.process(KTableKTableRightJoin.java:49)
          at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:82)
          at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:204)
          at org.apache.kafka.streams.kstream.internals.ForwardingCacheFlushListener.apply(ForwardingCacheFlushListener.java:35)
          at org.apache.kafka.streams.state.internals.CachingKeyValueStore.maybeForward(CachingKeyValueStore.java:97)
          at org.apache.kafka.streams.state.internals.CachingKeyValueStore.access$000(CachingKeyValueStore.java:34)
          at org.apache.kafka.streams.state.internals.CachingKeyValueStore$1.apply(CachingKeyValueStore.java:84)
          at org.apache.kafka.streams.state.internals.NamedCache.flush(NamedCache.java:117)
          at org.apache.kafka.streams.state.internals.NamedCache.evict(NamedCache.java:196)
          at org.apache.kafka.streams.state.internals.ThreadCache.maybeEvict(ThreadCache.java:190)
          at org.apache.kafka.streams.state.internals.ThreadCache.put(ThreadCache.java:121)
          at org.apache.kafka.streams.state.internals.CachingKeyValueStore.put(CachingKeyValueStore.java:187)
          at org.apache.kafka.streams.state.internals.CachingKeyValueStore.put(CachingKeyValueStore.java:182)
          at org.apache.kafka.streams.kstream.internals.KTableReduce$KTableReduceProcessor.process(KTableReduce.java:92)
          at org.apache.kafka.streams.kstream.internals.KTableReduce$KTableReduceProcessor.process(KTableReduce.java:52)
          at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:82)
          at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:204)
          at org.apache.kafka.streams.processor.internals.SourceNode.process(SourceNode.java:66)
          at org.apache.kafka.streams.processor.internals.StreamTask.process(StreamTask.java:177)
          at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:427)
          at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:235)

              People

              • Assignee: Damian Guy
              • Reporter: Damian Guy