  Kafka / KAFKA-13985

MirrorSourceTask commitRecord throws NPE if SMT is filtering out source record


Details

    • Type: Bug
    • Status: Resolved
    • Priority: Minor
    • Resolution: Fixed
    • Affects Version/s: 3.1.0, 3.2.0, 3.3.0
    • Fix Version/s: 3.4.0
    • Component/s: mirrormaker
    • Labels: None

    Description

      Applying an SMT that filters out messages can lead to the following code path.

      In WorkerSourceTask.java:

      final SourceRecord record = transformationChain.apply(preTransformRecord);
      final ProducerRecord<byte[], byte[]> producerRecord = convertTransformedRecord(record);
      if (producerRecord == null || retryWithToleranceOperator.failed()) {
          counter.skipRecord();
          commitTaskRecord(preTransformRecord, null);
          continue;
      } 

       

      This then calls:

      private void commitTaskRecord(SourceRecord record, RecordMetadata metadata) {
              try {
                  task.commitRecord(record, metadata);
              } catch (Throwable t) {
                  log.error("{} Exception thrown while calling task.commitRecord()", this, t);
              }
      }

      And finally, in MirrorSourceTask.java:

          @Override
          public void commitRecord(SourceRecord record, RecordMetadata metadata) {
              try {
                  if (stopping) {
                      return;
                  }
                  if (!metadata.hasOffset()) {
                      log.error("RecordMetadata has no offset -- can't sync offsets for {}.", record.topic());
                      return;
                  }
      
      ...

       
      This causes an NPE because metadata is null.
      This is the exception:

      [2022-06-13 12:31:33,094] WARN Failure committing record. (org.apache.kafka.connect.mirror.MirrorSourceTask:190)
      java.lang.NullPointerException
          at org.apache.kafka.connect.mirror.MirrorSourceTask.commitRecord(MirrorSourceTask.java:177)
          at org.apache.kafka.connect.runtime.WorkerSourceTask.commitTaskRecord(WorkerSourceTask.java:463)
          at org.apache.kafka.connect.runtime.WorkerSourceTask.sendRecords(WorkerSourceTask.java:358)
          at org.apache.kafka.connect.runtime.WorkerSourceTask.execute(WorkerSourceTask.java:257)
          at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:188)
          at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:243)
          at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Unknown Source)
          at java.base/java.util.concurrent.FutureTask.run(Unknown Source)
          at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source)
          at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source)
          at java.base/java.lang.Thread.run(Unknown Source) 

      As far as I can tell, this is handled well and has no negative impact: the exception is caught inside MirrorSourceTask.commitRecord and is not propagated outside of it.

      However, it would probably be better to handle this case by checking whether metadata != null, so that the commit is skipped safely and silently.
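      A minimal sketch of the proposed null check, assuming simplified stand-ins rather than the real Connect/MM2 types: FakeRecordMetadata is a hypothetical placeholder for RecordMetadata (only hasOffset() and offset() are modelled), and the returned strings exist only so the outcome can be checked. It contrasts the reported behavior (the NPE is caught and logged inside commitRecord) with a variant that returns early when metadata is null. This is an illustration, not the actual MirrorSourceTask code or the committed fix.

```java
import java.util.logging.Logger;

class CommitRecordGuardSketch {
    private static final Logger log = Logger.getLogger("CommitRecordGuardSketch");

    // Mirrors the reported behavior: metadata.hasOffset() is dereferenced without
    // a null check, so a filtered record (metadata == null) throws an NPE that is
    // caught and logged instead of escaping commitRecord.
    static String commitRecordUnguarded(String topic, FakeRecordMetadata metadata) {
        try {
            if (!metadata.hasOffset()) {
                log.severe("RecordMetadata has no offset -- can't sync offsets for " + topic + ".");
                return "no-offset";
            }
            return "synced@" + metadata.offset();
        } catch (NullPointerException e) {
            log.warning("Failure committing record: " + e);
            return "npe";
        }
    }

    // Proposed variant (hypothetical): skip the offset sync quietly when there is
    // no metadata, i.e. when an SMT dropped the record before it was produced.
    static String commitRecordGuarded(String topic, FakeRecordMetadata metadata) {
        if (metadata == null) {
            log.fine("No RecordMetadata for a record from " + topic + "; skipping offset sync");
            return "skipped";
        }
        if (!metadata.hasOffset()) {
            log.severe("RecordMetadata has no offset -- can't sync offsets for " + topic + ".");
            return "no-offset";
        }
        return "synced@" + metadata.offset();
    }

    public static void main(String[] args) {
        System.out.println(commitRecordUnguarded("topic-a", null));                     // npe
        System.out.println(commitRecordGuarded("topic-a", null));                       // skipped
        System.out.println(commitRecordGuarded("topic-a", new FakeRecordMetadata(42))); // synced@42
    }
}

// Hypothetical stand-in for RecordMetadata: a negative offset means "no offset".
final class FakeRecordMetadata {
    private final long offset;

    FakeRecordMetadata(long offset) { this.offset = offset; }

    boolean hasOffset() { return offset >= 0; }

    long offset() { return offset; }
}
```

      Checking for null before calling hasOffset() keeps the "no offset" error reserved for genuinely unexpected producer responses, while filtered records are skipped without a WARN and a stack trace.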

      [EDIT]
      Actually, looking a bit deeper, there is a small side effect.

      If the last message processed was filtered out (and therefore not committed by MirrorSourceTask) and the MM2 instance is then restarted, the consumer will re-read this message because its offset was never committed. It will most likely be filtered out again, provided the configuration has not changed.

      This behavior is probably acceptable given MM2's nature, since re-reading a record that is then filtered out again has no visible effect downstream.
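      The side effect can be illustrated with a deliberately simplified model, assuming that an offset is committed only for records that were actually produced and that a restarted task resumes right after the last committed offset. resumeOffset and the filtering predicate are hypothetical names for illustration; this is not how Kafka Connect stores source offsets internally.

```java
import java.util.List;
import java.util.function.LongPredicate;

class FilteredOffsetReplaySketch {
    // Returns the offset from which a restarted task would resume, under the
    // simplifying assumption that filtered records never advance the commit.
    static long resumeOffset(List<Long> offsets, LongPredicate filteredOut, long startOffset) {
        long lastCommitted = startOffset - 1; // nothing committed yet
        for (long offset : offsets) {
            if (filteredOut.test(offset)) {
                continue; // dropped by the (hypothetical) SMT: no commit for this record
            }
            lastCommitted = offset;
        }
        return lastCommitted + 1;
    }

    public static void main(String[] args) {
        List<Long> offsets = List.of(0L, 1L, 2L, 3L, 4L);
        // Hypothetical SMT that drops records with even offsets, including the last one (4).
        LongPredicate dropEven = o -> o % 2 == 0;
        System.out.println(resumeOffset(offsets, dropEven, 0L));   // 4: record 4 is re-read
        System.out.println(resumeOffset(offsets, o -> false, 0L)); // 5: nothing is re-read
    }
}
```

      With a filter that drops even offsets, records 0 and 2 are covered by the later commits of 1 and 3, so only the trailing filtered record 4 is re-read after a restart.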

       


            People

              Assignee: Rens Groothuijsen
              Reporter: Jacopo Riciputi
              Votes: 0
              Watchers: 2
