Kafka / KAFKA-5164

SetSchemaMetadata does not replace the schemas in structs correctly


    Details

    • Type: Bug
    • Status: Resolved
    • Priority: Major
    • Resolution: Fixed
    • Affects Version/s: 0.10.2.1
    • Fix Version/s: 0.11.0.0
    • Component/s: KafkaConnect
    • Labels:
      None

      Description

      In SetSchemaMetadataTest we verify that the name and version of the schema in the record have been replaced:

      https://github.com/apache/kafka/blob/trunk/connect/transforms/src/test/java/org/apache/kafka/connect/transforms/SetSchemaMetadataTest.java#L62

      However, for Structs the schema is attached both to the record and to the Struct itself. We correctly rebuild the record:

      https://github.com/apache/kafka/blob/trunk/connect/transforms/src/main/java/org/apache/kafka/connect/transforms/SetSchemaMetadata.java#L77
      https://github.com/apache/kafka/blob/trunk/connect/transforms/src/main/java/org/apache/kafka/connect/transforms/SetSchemaMetadata.java#L104
      https://github.com/apache/kafka/blob/trunk/connect/transforms/src/main/java/org/apache/kafka/connect/transforms/SetSchemaMetadata.java#L119

      But if the key or value is a Struct, it still contains the old schema embedded in it.
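To make the problem concrete, the sketch below models it with hypothetical, heavily simplified stand-ins (`SimpleSchema`, `SimpleStruct`, `SimpleRecord`) for Connect's `Schema`, `Struct`, and `ConnectRecord`; none of these are Kafka classes. The hypothetical `renameValueSchema` mimics the reported behavior: it rebuilds the record with a renamed schema but reuses the original Struct, which therefore still reports the old schema.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical stand-in for a Connect Schema: only a name, a version and field names.
final class SimpleSchema {
    final String name;
    final Integer version;
    final List<String> fields;

    SimpleSchema(String name, Integer version, List<String> fields) {
        this.name = name;
        this.version = version;
        this.fields = List.copyOf(fields);
    }
}

// Hypothetical stand-in for a Connect Struct: the value keeps its own schema reference.
final class SimpleStruct {
    private final SimpleSchema schema;
    private final Map<String, Object> values = new LinkedHashMap<>();

    SimpleStruct(SimpleSchema schema) { this.schema = schema; }

    SimpleSchema schema() { return schema; }

    SimpleStruct put(String field, Object value) {
        if (!schema.fields.contains(field)) {
            throw new IllegalArgumentException("Unknown field: " + field);
        }
        values.put(field, value);
        return this;
    }

    Object get(String field) { return values.get(field); }
}

// Hypothetical stand-in for a ConnectRecord: a value schema plus the value itself.
final class SimpleRecord {
    final SimpleSchema valueSchema;
    final Object value;

    SimpleRecord(SimpleSchema valueSchema, Object value) {
        this.valueSchema = valueSchema;
        this.value = value;
    }
}

final class StaleSchemaDemo {
    // Mirrors the reported bug: the record gets the new schema, but the Struct is reused unchanged.
    static SimpleRecord renameValueSchema(SimpleRecord record, String newName, Integer newVersion) {
        SimpleSchema updated = new SimpleSchema(newName, newVersion, record.valueSchema.fields);
        return new SimpleRecord(updated, record.value);
    }
}
```

After `renameValueSchema`, `record.valueSchema.name` is the new name while `((SimpleStruct) record.value).schema().name` is still the old one, which is exactly the inconsistency described above.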

      Ultimately, this can cause validation in other code to fail, even for very simple changes such as renaming a schema:

      (org.apache.kafka.connect.runtime.WorkerTask:141)
      org.apache.kafka.connect.errors.DataException: Mismatching struct schema
          at io.confluent.connect.avro.AvroData.fromConnectData(AvroData.java:471)
          at io.confluent.connect.avro.AvroData.fromConnectData(AvroData.java:295)
          at io.confluent.connect.avro.AvroConverter.fromConnectData(AvroConverter.java:73)
          at org.apache.kafka.connect.runtime.WorkerSourceTask.sendRecords(WorkerSourceTask.java:196)
          at org.apache.kafka.connect.runtime.WorkerSourceTask.execute(WorkerSourceTask.java:167)
          at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:139)
          at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:182)
          at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
          at java.util.concurrent.FutureTask.run(FutureTask.java:266)
          at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
          at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
          at java.lang.Thread.run(Thread.java:745)
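The failure above comes from a converter checking that a Struct's embedded schema matches the schema declared on the record. The following is a hypothetical, simplified version of such a check, not AvroData's actual code; `SimpleSchema`, `SimpleStruct`, `ConverterCheck`, and `DataMismatchException` (a stand-in for Connect's `DataException`) are illustrative names only. Because schema equality includes the name and version (Connect's `ConnectSchema.equals` compares both), a rename alone is enough to trip it.

```java
import java.util.List;
import java.util.Objects;

// Hypothetical simplified schema; equality covers name, version and fields.
final class SimpleSchema {
    final String name;
    final Integer version;
    final List<String> fields;

    SimpleSchema(String name, Integer version, List<String> fields) {
        this.name = name;
        this.version = version;
        this.fields = List.copyOf(fields);
    }

    @Override
    public boolean equals(Object o) {
        if (this == o) return true;
        if (!(o instanceof SimpleSchema)) return false;
        SimpleSchema other = (SimpleSchema) o;
        return Objects.equals(name, other.name)
                && Objects.equals(version, other.version)
                && fields.equals(other.fields);
    }

    @Override
    public int hashCode() { return Objects.hash(name, version, fields); }
}

// Hypothetical Struct stand-in; this check only needs its embedded schema.
final class SimpleStruct {
    private final SimpleSchema schema;

    SimpleStruct(SimpleSchema schema) { this.schema = schema; }

    SimpleSchema schema() { return schema; }
}

// Hypothetical stand-in for org.apache.kafka.connect.errors.DataException.
final class DataMismatchException extends RuntimeException {
    DataMismatchException(String message) { super(message); }
}

final class ConverterCheck {
    // Rejects a Struct whose embedded schema differs from the schema declared on the record.
    static void validate(SimpleSchema declared, Object value) {
        if (value instanceof SimpleStruct && !((SimpleStruct) value).schema().equals(declared)) {
            throw new DataMismatchException("Mismatching struct schema");
        }
    }
}
```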
      

      The fix is probably to check whether the key or value is a Struct when applying the new schema and, if so, copy it into a new Struct that uses the updated schema.
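A minimal sketch of that copy, again using hypothetical simplified stand-ins (`SimpleSchema`, `SimpleStruct`) rather than the real Connect classes; the helper name `updateSchemaIn` is also illustrative. It assumes, as SetSchemaMetadata does, that only the schema's name and version change, so the field set is identical and values can be copied field by field.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical stand-in for a Connect Schema.
final class SimpleSchema {
    final String name;
    final Integer version;
    final List<String> fields;

    SimpleSchema(String name, Integer version, List<String> fields) {
        this.name = name;
        this.version = version;
        this.fields = List.copyOf(fields);
    }
}

// Hypothetical stand-in for a Connect Struct.
final class SimpleStruct {
    private final SimpleSchema schema;
    private final Map<String, Object> values = new LinkedHashMap<>();

    SimpleStruct(SimpleSchema schema) { this.schema = schema; }

    SimpleSchema schema() { return schema; }

    SimpleStruct put(String field, Object value) {
        if (!schema.fields.contains(field)) {
            throw new IllegalArgumentException("Unknown field: " + field);
        }
        values.put(field, value);
        return this;
    }

    Object get(String field) { return values.get(field); }
}

final class SchemaUpdater {
    // Returns a key/value whose embedded schema matches updatedSchema.
    // Non-Struct values (including null) carry no schema reference and are returned unchanged.
    static Object updateSchemaIn(Object keyOrValue, SimpleSchema updatedSchema) {
        if (keyOrValue instanceof SimpleStruct) {
            SimpleStruct original = (SimpleStruct) keyOrValue;
            SimpleStruct copy = new SimpleStruct(updatedSchema);
            // Same field set as before, so each value is copied by field name.
            for (String field : updatedSchema.fields) {
                copy.put(field, original.get(field));
            }
            return copy;
        }
        return keyOrValue;
    }
}
```

The copy is shallow: nested Struct field values are reused, which should be acceptable here because only the top-level schema's name and version change. Against the real Connect API, the same loop would presumably be written with `new Struct(updatedSchema)`, `updatedSchema.fields()`, and `copy.put(field, original.get(field.name()))`.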

      This issue only appears when a transformation changes the schema without modifying the data, so I think SetSchemaMetadata is currently the only transformation affected.


              People

              • Assignee: Randall Hauch (rhauch)
              • Reporter: Ewen Cheslack-Postava (ewencp)
