Uploaded image for project: 'Kafka'
  1. Kafka
  2. KAFKA-7434

DeadLetterQueueReporter throws NPE if transform throws NPE

    XMLWordPrintableJSON

Details

    • Bug
    • Status: Resolved
    • Major
    • Resolution: Fixed
    • 2.0.0
    • 2.0.1, 2.1.0
    • connect
    • None
    • jdk 8

    Description

      A NPE thrown from a transform in a connector configured with

      errors.deadletterqueue.context.headers.enable=true

      causes DeadLetterQueueReporter to break with a NPE.

      Executing stage 'TRANSFORMATION' with class 'org.apache.kafka.connect.transforms.Flatten$Value', where consumed record is {topic='****', partition=1, offset=0, timestamp=1537370573366, timestampType=CreateTime}. (org.apache.kafka.connect.runtime.errors.LogReporter)
      java.lang.NullPointerException
      Task threw an uncaught and unrecoverable exception (org.apache.kafka.connect.runtime.WorkerTask)
      java.lang.NullPointerException
      	at org.apache.kafka.connect.runtime.errors.DeadLetterQueueReporter.toBytes(DeadLetterQueueReporter.java:202)
      	at org.apache.kafka.connect.runtime.errors.DeadLetterQueueReporter.populateContextHeaders(DeadLetterQueueReporter.java:172)
      	at org.apache.kafka.connect.runtime.errors.DeadLetterQueueReporter.report(DeadLetterQueueReporter.java:146)
      	at org.apache.kafka.connect.runtime.errors.ProcessingContext.report(ProcessingContext.java:137)
      	at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execute(RetryWithToleranceOperator.java:108)
      	at org.apache.kafka.connect.runtime.TransformationChain.apply(TransformationChain.java:44)
      	at org.apache.kafka.connect.runtime.WorkerSinkTask.convertAndTransformRecord(WorkerSinkTask.java:532)
      	at org.apache.kafka.connect.runtime.WorkerSinkTask.convertMessages(WorkerSinkTask.java:490)
      	at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:321)
      	at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:225)
      	at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:193)
      	at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:175)
      	at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:219)
      	at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
      	at java.util.concurrent.FutureTask.run(FutureTask.java:266)
      	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
      	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
      	at java.lang.Thread.run(Thread.java:748)
      

       

      This is caused by populateContextHeaders only checking if the Throwable is not null, but not checking that the message in the Throwable is not null before trying to serialize the message:

      https://github.com/apache/kafka/blob/cfd33b313c9856ae2b4b45ed3d4aac41d6ef5a6b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/errors/DeadLetterQueueReporter.java#L170-L177

      if (context.error() != null) {
           headers.add(ERROR_HEADER_EXCEPTION, toBytes(context.error().getClass().getName()));
           headers.add(ERROR_HEADER_EXCEPTION_MESSAGE, toBytes(context.error().getMessage()));
      

      toBytes throws an NPE if passed null as the parameter.

       

      Attachments

        Activity

          People

            mihbor Michal Borowiecki
            mihbor Michal Borowiecki
            Votes:
            0 Vote for this issue
            Watchers:
            3 Start watching this issue

            Dates

              Created:
              Updated:
              Resolved: