  1. Kafka
  2. KAFKA-7434

DeadLetterQueueReporter throws NPE if transform throws NPE

    Details

    • Type: Bug
    • Status: Resolved
    • Priority: Major
    • Resolution: Fixed
    • Affects Version/s: 2.0.0
    • Fix Version/s: 2.0.1, 2.1.0
    • Component/s: KafkaConnect
    • Labels:
      None
    • Environment:
      jdk 8

      Description

      An NPE thrown from a transform in a connector configured with

      errors.deadletterqueue.context.headers.enable=true

      causes DeadLetterQueueReporter to fail with an NPE of its own, which kills the task.

      Executing stage 'TRANSFORMATION' with class 'org.apache.kafka.connect.transforms.Flatten$Value', where consumed record is {topic='****', partition=1, offset=0, timestamp=1537370573366, timestampType=CreateTime}. (org.apache.kafka.connect.runtime.errors.LogReporter)
      java.lang.NullPointerException
      Task threw an uncaught and unrecoverable exception (org.apache.kafka.connect.runtime.WorkerTask)
      java.lang.NullPointerException
      	at org.apache.kafka.connect.runtime.errors.DeadLetterQueueReporter.toBytes(DeadLetterQueueReporter.java:202)
      	at org.apache.kafka.connect.runtime.errors.DeadLetterQueueReporter.populateContextHeaders(DeadLetterQueueReporter.java:172)
      	at org.apache.kafka.connect.runtime.errors.DeadLetterQueueReporter.report(DeadLetterQueueReporter.java:146)
      	at org.apache.kafka.connect.runtime.errors.ProcessingContext.report(ProcessingContext.java:137)
      	at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execute(RetryWithToleranceOperator.java:108)
      	at org.apache.kafka.connect.runtime.TransformationChain.apply(TransformationChain.java:44)
      	at org.apache.kafka.connect.runtime.WorkerSinkTask.convertAndTransformRecord(WorkerSinkTask.java:532)
      	at org.apache.kafka.connect.runtime.WorkerSinkTask.convertMessages(WorkerSinkTask.java:490)
      	at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:321)
      	at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:225)
      	at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:193)
      	at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:175)
      	at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:219)
      	at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
      	at java.util.concurrent.FutureTask.run(FutureTask.java:266)
      	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
      	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
      	at java.lang.Thread.run(Thread.java:748)
      

       

      This happens because populateContextHeaders checks only that the Throwable is not null. It does not check that the Throwable's message is not null before trying to serialize it:

      https://github.com/apache/kafka/blob/cfd33b313c9856ae2b4b45ed3d4aac41d6ef5a6b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/errors/DeadLetterQueueReporter.java#L170-L177

      if (context.error() != null) {
           headers.add(ERROR_HEADER_EXCEPTION, toBytes(context.error().getClass().getName()));
           headers.add(ERROR_HEADER_EXCEPTION_MESSAGE, toBytes(context.error().getMessage()));
      

      toBytes throws an NPE when passed null, and getMessage() returns null for a Throwable that was created without a message.
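
One possible way to avoid the failure is to make the serialization step tolerate a null message. The following is a minimal sketch, not the actual patch: the class name, the errorHeaders helper, the plain Map used in place of Kafka's header collection, and the two header keys are all hypothetical stand-ins chosen so the example runs without Kafka on the classpath.

```java
import java.nio.charset.StandardCharsets;
import java.util.LinkedHashMap;
import java.util.Map;

public class NullSafeHeaderSketch {

    // Null-tolerant variant of toBytes: a null input yields a null header
    // value instead of dereferencing null.
    static byte[] toBytes(String value) {
        return value == null ? null : value.getBytes(StandardCharsets.UTF_8);
    }

    // Hypothetical stand-in for populateContextHeaders. A plain Map replaces
    // the real header collection, and the keys are illustrative only.
    static Map<String, byte[]> errorHeaders(Throwable error) {
        Map<String, byte[]> headers = new LinkedHashMap<>();
        if (error != null) {
            headers.put("exception.class", toBytes(error.getClass().getName()));
            // getMessage() may be null, e.g. for new NullPointerException()
            headers.put("exception.message", toBytes(error.getMessage()));
        }
        return headers;
    }

    public static void main(String[] args) {
        // An exception constructed without a message no longer breaks header creation.
        Map<String, byte[]> headers = errorHeaders(new NullPointerException());
        System.out.println("message header is null: "
                + (headers.get("exception.message") == null));
    }
}
```

Whether a null header value or an omitted header is preferable depends on what downstream consumers of the dead letter queue expect; the sketch keeps the key and stores null.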

       

            People

            • Assignee:
              mihbor Michal Borowiecki
            • Reporter:
              mihbor Michal Borowiecki