Uploaded image for project: 'Kafka'
  1. Kafka
  2. KAFKA-13632

MirrorMaker 2.0 NPE and Warning "Failure to commit records" for filtered records

    XMLWordPrintableJSON

Details

    • Bug
    • Status: Resolved
    • Minor
    • Resolution: Fixed
    • 3.1.0, 3.2.0, 3.3.0
    • 3.4.0
    • mirrormaker
    • None

    Description

      We have a setup where we filter records with MirrorMaker 2.0 (see below). This results in the following warning messages as a result of NPE's in MirrorSourceTask.commitRecord for each filtered record:

      [2022-01-31 08:01:29,581] WARN [MirrorSourceConnector|task-0] Failure committing record. (org.apache.kafka.connect.mirror.MirrorSourceTask:190)
      java.lang.NullPointerException
      	at org.apache.kafka.connect.mirror.MirrorSourceTask.commitRecord(MirrorSourceTask.java:177)
      	at org.apache.kafka.connect.runtime.WorkerSourceTask.commitTaskRecord(WorkerSourceTask.java:463)
      	at org.apache.kafka.connect.runtime.WorkerSourceTask.sendRecords(WorkerSourceTask.java:358)
      	at org.apache.kafka.connect.runtime.WorkerSourceTask.execute(WorkerSourceTask.java:257)
      	at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:188)
      	at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:243)
      	at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
      	at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
      	at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
      	at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
      	at java.base/java.lang.Thread.run(Thread.java:829) 

      The reason seems to be that for filtered records metadata is null. Note that in the overridden SourceTask.commitRecord the javadoc clearly states that metadata will be null if the record was filtered.

      In our case we use a custom predicate, but the issue can be reproduced with the following configuration:

      clusters = source,target
      
      tasks.max = 1
      
      source.bootstrap.servers = <cluster1>
      target.bootstrap.servers = <cluster2>
      
      offset.storage.replication.factor=1
      status.storage.replication.factor=1
      config.storage.replication.factor=1
      
      source->target.enabled = true
      source->target.topics = topic1
      source->target.transforms=Filter
      source->target.transforms.Filter.type=org.apache.kafka.connect.transforms.Filter
      source->target.transforms.Filter.predicate=HeaderPredicate
      source->target.predicates=HeaderPredicate
      source->target.predicates.HeaderPredicate.type=org.apache.kafka.connect.transforms.predicates.HasHeaderKey
      source->target.predicates.HeaderPredicate.name=someheader
       

      Each record with the header key 'someheader' will result in the NPE and warning message.

      On a side note, we couldn't find clear documentation on how to configure (SMT) filtering with MirrorMaker 2 or whether this is supported at all, but apart from the NPE's and warning messages this seems to functionally work for us with our custom filter.

       

      Attachments

        Issue Links

          Activity

            People

              RensGroothuijsen Rens Groothuijsen
              bertbaron Bert Baron
              Votes:
              1 Vote for this issue
              Watchers:
              1 Start watching this issue

              Dates

                Created:
                Updated:
                Resolved: