Uploaded image for project: 'Kafka'
  1. Kafka
  2. KAFKA-8659

SetSchemaMetadata SMT fails on records with null value and schema

    XMLWordPrintableJSON

Details

    • Bug
    • Status: Resolved
    • Minor
    • Resolution: Fixed
    • None
    • 3.0.1, 3.2.0, 3.1.1
    • connect
    • None

    Description

      If you use the SetSchemaMetadata SMT with records for which the key or value and corresponding schema are null (i.e. tombstone records from [Debezium|https://debezium.io/), the transform will fail.

      org.apache.kafka.connect.errors.ConnectException: Tolerance exceeded in error handler
      at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndHandleError(RetryWithToleranceOperator.java:178)
      at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execute(RetryWithToleranceOperator.java:104)
      at org.apache.kafka.connect.runtime.TransformationChain.apply(TransformationChain.java:50)
      at org.apache.kafka.connect.runtime.WorkerSourceTask.sendRecords(WorkerSourceTask.java:293)
      at org.apache.kafka.connect.runtime.WorkerSourceTask.execute(WorkerSourceTask.java:229)
      at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:175)
      at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:219)
      at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
      at java.util.concurrent.FutureTask.run(FutureTask.java:266)
      at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
      at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
      at java.lang.Thread.run(Thread.java:748)
      Caused by: org.apache.kafka.connect.errors.DataException: Schema required for [updating schema metadata]
      at org.apache.kafka.connect.transforms.util.Requirements.requireSchema(Requirements.java:31)
      at org.apache.kafka.connect.transforms.SetSchemaMetadata.apply(SetSchemaMetadata.java:67)
      at org.apache.kafka.connect.runtime.TransformationChain.lambda$apply$0(TransformationChain.java:50)
      at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndRetry(RetryWithToleranceOperator.java:128)
      at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndHandleError(RetryWithToleranceOperator.java:162)
      ... 11 more
      
      

       

      I don't see any problem in passing those records as is in favor of failing and will shortly add this in a PR.

      Attachments

        Issue Links

          Activity

            People

              bfncs Marc Löhe
              bfncs Marc Löhe
              Votes:
              0 Vote for this issue
              Watchers:
              3 Start watching this issue

              Dates

                Created:
                Updated:
                Resolved: