Kafka / KAFKA-9566

ProcessorContextImpl#forward throws NullPointerException if invoked from DeserializationExceptionHandler

    Details

    • Type: Bug
    • Status: Open
    • Priority: Minor
    • Resolution: Unresolved
    • Affects Version/s: 1.0.0
    • Fix Version/s: None
    • Component/s: streams
    • Labels: None

      Description

      Hi, I am trying to implement a custom DeserializationExceptionHandler that forwards the exception to downstream processor(s), but ProcessorContextImpl#forward throws a NullPointerException when it is invoked from this custom handler.

      Handler implementation:

      MyDeserializationExceptionHandler.java
      
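      import java.util.Map;

      import org.apache.kafka.clients.consumer.ConsumerRecord;
      import org.apache.kafka.streams.errors.DeserializationExceptionHandler;
      import org.apache.kafka.streams.processor.ProcessorContext;
      import org.apache.kafka.streams.processor.To;

      public class MyDeserializationExceptionHandler implements DeserializationExceptionHandler {

          @Override
          public void configure(Map<String, ?> configs) {
              // No configuration needed.
          }

          @Override
          public DeserializationHandlerResponse handle(ProcessorContext context, ConsumerRecord<byte[], byte[]> record, Exception exception) {
              // Forward the exception as the value (with a null key) to the child node "error-processor".
              // This is the call that throws the NullPointerException.
              context.forward(null, exception, To.child("error-processor"));
              return DeserializationHandlerResponse.CONTINUE;
          }
      }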
      

      The handler is registered as the default deserialization exception handler:

          private TopologyTestDriver initializeTestDriver(StreamsBuilder streamBuilder) {
              Topology topology = streamBuilder.build();
              Properties props = new Properties();
              props.setProperty(StreamsConfig.APPLICATION_ID_CONFIG, "my-test-application");
              props.setProperty(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "dummy:1234");
              props.setProperty(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, StreamsConfig.EXACTLY_ONCE);
              props.setProperty(StreamsConfig.DEFAULT_DESERIALIZATION_EXCEPTION_HANDLER_CLASS_CONFIG, MyDeserializationExceptionHandler.class.getName());
              return new TopologyTestDriver(topology, props);
          }
      

       
      Exception stack trace:

      org.apache.kafka.streams.errors.StreamsException: Fatal user code error in deserialization error callback
          at org.apache.kafka.streams.processor.internals.RecordDeserializer.deserialize(RecordDeserializer.java:76)
          at org.apache.kafka.streams.processor.internals.RecordQueue.maybeUpdateTimestamp(RecordQueue.java:160)
          at org.apache.kafka.streams.processor.internals.RecordQueue.addRawRecords(RecordQueue.java:101)
          at org.apache.kafka.streams.processor.internals.PartitionGroup.addRawRecords(PartitionGroup.java:136)
          at org.apache.kafka.streams.processor.internals.StreamTask.addRecords(StreamTask.java:742)
          at org.apache.kafka.streams.TopologyTestDriver.pipeInput(TopologyTestDriver.java:392)
          ...
      
      Caused by: java.lang.NullPointerException
          at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:165)
          at MyDeserializationExceptionHandler.handle(NewExceptionHandlerTest.java:204)
          at org.apache.kafka.streams.processor.internals.RecordDeserializer.deserialize(RecordDeserializer.java:70)
          ... 33 more
      

      Neither the DeserializationExceptionHandler nor the ProcessorContext Javadocs mention that ProcessorContext#forward(...) must not be invoked from a DeserializationExceptionHandler, so I assume that this is a defect.
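
The stack trace shows the NullPointerException raised inside ProcessorContextImpl.forward while RecordDeserializer.deserialize is still running, that is, before the record has reached any processor node. One possible explanation, which this report does not confirm, is that forward relies on per-record state (for example a "current node") that is only set once processing has started. The toy model below is a minimal sketch of that assumed failure mode in plain Java. ToyNode, ToyContext and every method on them are hypothetical names invented for illustration; none of this is Kafka's actual implementation.

```java
import java.util.HashMap;
import java.util.Map;

// Toy model only: these classes are hypothetical and are NOT Kafka's implementation.
public class ForwardOutsideProcessingSketch {

    /** A node in a toy topology; children are looked up by name. */
    static final class ToyNode {
        final String name;
        final Map<String, ToyNode> children = new HashMap<>();
        Object lastValue; // last value forwarded to this node

        ToyNode(String name) {
            this.name = name;
        }
    }

    /** A toy context that only knows its "current node" while a record is being processed. */
    static final class ToyContext {
        private ToyNode currentNode; // null until process() sets it

        void process(ToyNode node, Runnable userCode) {
            currentNode = node;
            try {
                userCode.run();
            } finally {
                currentNode = null; // state is cleared again after processing
            }
        }

        void forward(Object value, String childName) {
            // Assumed behaviour: currentNode is dereferenced without a null check.
            ToyNode child = currentNode.children.get(childName);
            child.lastValue = value;
        }
    }

    /** Returns true if forward() throws a NullPointerException when called outside process(). */
    static boolean forwardFailsOutsideProcessing() {
        ToyContext context = new ToyContext();
        try {
            // Mimics a handler that runs during deserialization, before any node is current.
            context.forward(new RuntimeException("bad bytes"), "error-processor");
            return false;
        } catch (NullPointerException expected) {
            return true;
        }
    }

    /** Returns true if forward() delivers the value when called from inside process(). */
    static boolean forwardWorksInsideProcessing() {
        ToyNode source = new ToyNode("source");
        ToyNode errors = new ToyNode("error-processor");
        source.children.put(errors.name, errors);
        ToyContext context = new ToyContext();
        context.process(source, () -> context.forward("payload", "error-processor"));
        return "payload".equals(errors.lastValue);
    }

    public static void main(String[] args) {
        System.out.println("outside processing throws NPE: " + forwardFailsOutsideProcessing());
        System.out.println("inside processing delivers: " + forwardWorksInsideProcessing());
    }
}
```

If the real cause is along these lines, either a documented restriction on calling forward from the handler or an explicit, descriptive exception in place of the bare NullPointerException would address the concern raised above.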

    People

    • Assignee: Unassigned
    • Reporter: Tomas Mi (TomasMi)
    • Votes: 0
    • Watchers: 4