  Kafka / KAFKA-10515

NPE: Foreign key join serde may not be initialized with default serde if application is distributed

    Details

    • Type: Bug
    • Status: Resolved
    • Priority: Critical
    • Resolution: Fixed
    • Affects Version/s: 2.6.0, 2.5.1
    • Fix Version/s: 2.7.0, 2.6.1
    • Component/s: streams
    • Labels:
      None

      Description

      The fix for KAFKA-9517 corrected the initialization of the foreign key join serdes for KStream applications that do not run distributed over multiple instances.

      However, if an application runs distributed over multiple instances, the foreign key join serdes may still not be initialized, leading to the following NPE:

      Encountered the following error during processing:
      java.lang.NullPointerException: null
      	at org.apache.kafka.streams.kstream.internals.foreignkeyjoin.SubscriptionWrapperSerde$SubscriptionWrapperSerializer.serialize(SubscriptionWrapperSerde.java:85)
      	at org.apache.kafka.streams.kstream.internals.foreignkeyjoin.SubscriptionWrapperSerde$SubscriptionWrapperSerializer.serialize(SubscriptionWrapperSerde.java:52)
      	at org.apache.kafka.streams.state.internals.ValueAndTimestampSerializer.serialize(ValueAndTimestampSerializer.java:59)
      	at org.apache.kafka.streams.state.internals.ValueAndTimestampSerializer.serialize(ValueAndTimestampSerializer.java:50)
      	at org.apache.kafka.streams.state.internals.ValueAndTimestampSerializer.serialize(ValueAndTimestampSerializer.java:27)
      	at org.apache.kafka.streams.state.StateSerdes.rawValue(StateSerdes.java:192)
      	at org.apache.kafka.streams.state.internals.MeteredKeyValueStore.lambda$put$3(MeteredKeyValueStore.java:144)
      	at org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.maybeMeasureLatency(StreamsMetricsImpl.java:806)
      	at org.apache.kafka.streams.state.internals.MeteredKeyValueStore.put(MeteredKeyValueStore.java:144)
      	at org.apache.kafka.streams.processor.internals.ProcessorContextImpl$KeyValueStoreReadWriteDecorator.put(ProcessorContextImpl.java:487)
      	at org.apache.kafka.streams.kstream.internals.foreignkeyjoin.SubscriptionStoreReceiveProcessorSupplier$1.process(SubscriptionStoreReceiveProcessorSupplier.java:102)
      	at org.apache.kafka.streams.kstream.internals.foreignkeyjoin.SubscriptionStoreReceiveProcessorSupplier$1.process(SubscriptionStoreReceiveProcessorSupplier.java:55)
      	at org.apache.kafka.streams.processor.internals.ProcessorNode.lambda$process$2(ProcessorNode.java:142)
      	at org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.maybeMeasureLatency(StreamsMetricsImpl.java:806)
      	at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:142)
      	at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:201)
      	at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:180)
      	at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:133)
      	at org.apache.kafka.streams.processor.internals.SourceNode.process(SourceNode.java:104)
      	at org.apache.kafka.streams.processor.internals.StreamTask.lambda$process$3(StreamTask.java:383)
      	at org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.maybeMeasureLatency(StreamsMetricsImpl.java:806)
      	at org.apache.kafka.streams.processor.internals.StreamTask.process(StreamTask.java:383)
      	at org.apache.kafka.streams.processor.internals.AssignedStreamsTasks.process(AssignedStreamsTasks.java:475)
      	at org.apache.kafka.streams.processor.internals.TaskManager.process(TaskManager.java:550)
      	at org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:802)
      	at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:697)
      	at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:670)

      This happens because the processors of a foreign key join are distributed across multiple tasks. The serde is initialized with the default serde only during the initialization of the task that contains the sink node ("subscription-registration-sink"). So if the task containing the SubscriptionStoreReceiveProcessor ("subscription-receive") is not assigned to the same instance as the task containing the sink node, an NPE is thrown because the serde of the state store used within the SubscriptionStoreReceiveProcessor is never initialized.
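
      The failure mode described above can be illustrated with a small, self-contained model. This is only a sketch and does not use Kafka Streams code: the names FkJoinSerdeSketch, WrapperSerializer, setIfUnset and storePutSucceeds are hypothetical stand-ins, and the sketch assumes that each instance builds its own serializer object and that only the instance hosting the sink node's task applies the default-serde fallback.

```java
import java.nio.charset.StandardCharsets;
import java.util.function.Function;

public class FkJoinSerdeSketch {

    /** Hypothetical stand-in for a wrapper serializer whose inner serializer may be left null. */
    static final class WrapperSerializer {
        // null models "no serde configured, fall back to the default serde later"
        private Function<String, byte[]> inner;

        WrapperSerializer(Function<String, byte[]> inner) {
            this.inner = inner;
        }

        /** Models the default-serde fallback that, per the description, runs only in the sink node's task. */
        void setIfUnset(Function<String, byte[]> defaultSerializer) {
            if (inner == null) {
                inner = defaultSerializer;
            }
        }

        byte[] serialize(String value) {
            // Throws NullPointerException if the fallback was never applied.
            return inner.apply(value);
        }
    }

    /** Assumed default serializer for this sketch: UTF-8 string bytes. */
    static final Function<String, byte[]> DEFAULT_SERIALIZER =
            s -> s.getBytes(StandardCharsets.UTF_8);

    /**
     * Simulates one application instance that runs the task writing to the
     * subscription store. The fallback is applied only if this instance also
     * hosts the task that contains the sink node.
     */
    public static boolean storePutSucceeds(boolean hostsSinkTask) {
        // Each instance constructs its own serializer object.
        WrapperSerializer serializer = new WrapperSerializer(null);
        if (hostsSinkTask) {
            serializer.setIfUnset(DEFAULT_SERIALIZER);
        }
        try {
            serializer.serialize("subscription");
            return true;
        } catch (NullPointerException e) {
            return false;
        }
    }

    public static void main(String[] args) {
        System.out.println("instance with sink task: " + storePutSucceeds(true));
        System.out.println("instance without sink task: " + storePutSucceeds(false));
    }
}
```

      In this model the single-instance case succeeds because the same instance hosts both tasks, while an instance that hosts only the receiving task hits the null inner serializer. Initializing the fallback wherever the serializer is used, rather than in one task only, removes the dependency on task placement.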

              People

              • Assignee: Thorsten Hake (thorsten.hake)
              • Reporter: Thorsten Hake (thorsten.hake)