Kafka / KAFKA-12396

Dedicated exception for kstreams when null key received

Details

    • Type: Improvement
    • Status: Resolved
    • Priority: Trivial
    • Resolution: Fixed
    • Affects Version/s: 2.6.0
    • Fix Version/s: 3.0.0
    • Component/s: streams

    Description

      If a Kafka Streams application receives a record with a null key (thanks to QA), it fails with a long and confusing stack trace. It would be nice to get a shorter, more specific exception instead of:
      org.apache.kafka.streams.errors.StreamsException: Exception caught in process. taskId=0_0, processor=KSTREAM-SOURCE-0000000000, topic=(hidden), partition=0, offset=3722, stacktrace=java.lang.NullPointerException
      at org.apache.kafka.streams.state.internals.RocksDBStore.get(RocksDBStore.java:286)
      at org.apache.kafka.streams.state.internals.RocksDBStore.get(RocksDBStore.java:74)
      at org.apache.kafka.streams.state.internals.ChangeLoggingKeyValueBytesStore.get(ChangeLoggingKeyValueBytesStore.java:94)
      at org.apache.kafka.streams.state.internals.ChangeLoggingKeyValueBytesStore.get(ChangeLoggingKeyValueBytesStore.java:29)
      at org.apache.kafka.streams.state.internals.MeteredKeyValueStore.lambda$get$2(MeteredKeyValueStore.java:133)
      at org.apache.kafka.streams.state.internals.MeteredKeyValueStore$$Lambda$1048/0x0000000060630fd0.get(Unknown Source)
      at org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.maybeMeasureLatency(StreamsMetricsImpl.java:851)
      at org.apache.kafka.streams.state.internals.MeteredKeyValueStore.get(MeteredKeyValueStore.java:133)
      at org.apache.kafka.streams.processor.internals.AbstractReadWriteDecorator$KeyValueStoreReadWriteDecorator.get(AbstractReadWriteDecorator.java:78)
      at org.apache.kafka.streams.kstream.internals.KStreamTransformValues$KStreamTransformValuesProcessor.process(KStreamTransformValues.java:64)
      at org.apache.kafka.streams.processor.internals.ProcessorNode.lambda$process$2(ProcessorNode.java:142)
      at org.apache.kafka.streams.processor.internals.ProcessorNode$$Lambda$1047/0x0000000060630b10.run(Unknown Source)
      at org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.maybeMeasureLatency(StreamsMetricsImpl.java:836)
      at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:142)
      at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:236)
      at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:216)
      at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:168)
      at org.apache.kafka.streams.processor.internals.SourceNode.process(SourceNode.java:96)
      at org.apache.kafka.streams.processor.internals.StreamTask.lambda$process$1(StreamTask.java:679)
      at org.apache.kafka.streams.processor.internals.StreamTask$$Lambda$1046/0x00000000605250f0.run(Unknown Source)
      at org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.maybeMeasureLatency(StreamsMetricsImpl.java:836)
      at org.apache.kafka.streams.processor.internals.StreamTask.process(StreamTask.java:679)
      at org.apache.kafka.streams.processor.internals.TaskManager.process(TaskManager.java:1033)
      at org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:690)
      at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:551)
      at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:510)
      at org.apache.kafka.streams.processor.internals.StreamTask.process(StreamTask.java:696)
      at org.apache.kafka.streams.processor.internals.TaskManager.process(TaskManager.java:1033)
      at org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:690)
      at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:551)
      at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:510)
      Caused by: java.lang.NullPointerException
      at org.apache.kafka.streams.state.internals.RocksDBStore.get(RocksDBStore.java:286)
      at org.apache.kafka.streams.state.internals.RocksDBStore.get(RocksDBStore.java:74)
      at org.apache.kafka.streams.state.internals.ChangeLoggingKeyValueBytesStore.get(ChangeLoggingKeyValueBytesStore.java:94)
      at org.apache.kafka.streams.state.internals.ChangeLoggingKeyValueBytesStore.get(ChangeLoggingKeyValueBytesStore.java:29)
      at org.apache.kafka.streams.state.internals.MeteredKeyValueStore.lambda$get$2(MeteredKeyValueStore.java:133)
      at org.apache.kafka.streams.state.internals.MeteredKeyValueStore$$Lambda$1048/0x0000000060630fd0.get(Unknown Source)
      at org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.maybeMeasureLatency(StreamsMetricsImpl.java:851)
      at org.apache.kafka.streams.state.internals.MeteredKeyValueStore.get(MeteredKeyValueStore.java:133)
      at org.apache.kafka.streams.processor.internals.AbstractReadWriteDecorator$KeyValueStoreReadWriteDecorator.get(AbstractReadWriteDecorator.java:78)
      at HPX.Utilities.Java.Streams.MessageProcessing.ErrorHandlingAggregateTransformer.transform(ErrorHandlingAggregateTransformer.java:103)
      at HPX.Utilities.Java.Streams.MessageProcessing.ErrorHandlingAggregateTransformer.transform(ErrorHandlingAggregateTransformer.java:29)
      at org.apache.kafka.streams.kstream.internals.KStreamTransformValues$KStreamTransformValuesProcessor.process(KStreamTransformValues.java:64)
      at org.apache.kafka.streams.processor.internals.ProcessorNode.lambda$process$2(ProcessorNode.java:142)
      at org.apache.kafka.streams.processor.internals.ProcessorNode$$Lambda$1047/0x0000000060630b10.run(Unknown Source)
      at org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.maybeMeasureLatency(StreamsMetricsImpl.java:836)
      at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:142)
      at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:236)
      at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:216)
      at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:168)
      at org.apache.kafka.streams.processor.internals.SourceNode.process(SourceNode.java:96)
      at org.apache.kafka.streams.processor.internals.StreamTask.lambda$process$1(StreamTask.java:679)
      at org.apache.kafka.streams.processor.internals.StreamTask$$Lambda$1046/0x00000000605250f0.run(Unknown Source)
      at org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.maybeMeasureLatency(StreamsMetricsImpl.java:836)
      at org.apache.kafka.streams.processor.internals.StreamTask.process(StreamTask.java:679)
      ... 4 more
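
To illustrate the kind of shorter, specific failure requested above, here is a minimal sketch of a fail-fast null-key check. It is deliberately dependency-free: `NullKeyGuard` and its map-backed store are hypothetical stand-ins for a state store lookup, not Kafka Streams classes, and the message text is an assumption. It is not presented as the change that resolved this issue.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;

// Hypothetical stand-in for a key-value state store; not a Kafka Streams class.
public class NullKeyGuard {
    private final Map<String, String> store = new HashMap<>();

    public void put(String key, String value) {
        // Reject null keys at the API boundary with a descriptive message.
        Objects.requireNonNull(key, "key cannot be null");
        store.put(key, value);
    }

    public String get(String key) {
        // Fail here, before the call descends into deeper store layers.
        Objects.requireNonNull(key, "key cannot be null");
        return store.get(key);
    }

    public static void main(String[] args) {
        NullKeyGuard guard = new NullKeyGuard();
        guard.put("a", "1");
        System.out.println(guard.get("a"));
        try {
            guard.get(null);
        } catch (NullPointerException e) {
            // The message names the actual problem instead of a bare NPE.
            System.out.println(e.getMessage());
        }
    }
}
```

Checking at the outermost layer means the exception carries a message that names the problem, and the top frame points at the caller-facing method rather than at an internal store implementation.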

            People

              Assignee: Valery Kokorev (Nathan22177)
              Reporter: Veniamin Kalegin (vkalegin)