KAFKA-6605

Flatten SMT does not properly handle fields that are null

    Description

      When a message has a null field, the `Flatten` SMT does not handle it properly and throws a NullPointerException (NPE). Consider this message from Debezium:

      {
        "before": null,
        "after": {
          "dbserver1.mydb.team.Value": {
            "id": 1,
            "name": "kafka",
            "email": "kafka@apache.org",
            "last_modified": 1519939449000
          }
        },
        "source": {
          "version": {
            "string": "0.7.3"
          },
          "name": "dbserver1",
          "server_id": 0,
          "ts_sec": 0,
          "gtid": null,
          "file": "mysql-bin.000003",
          "pos": 154,
          "row": 0,
          "snapshot": {
            "boolean": true
          },
          "thread": null,
          "db": {
            "string": "mydb"
          },
          "table": {
            "string": "team"
          }
        },
        "op": "c",
        "ts_ms": {
          "long": 1519939520285
        }
      }
      

      Note how `before` is null; this event represents a row that was INSERTED, so there is no `before` state of the row. Applying the transform results in an NPE:

      java.lang.NullPointerException
          at org.apache.kafka.connect.transforms.Flatten.buildWithSchema(Flatten.java:219)
          at org.apache.kafka.connect.transforms.Flatten.buildWithSchema(Flatten.java:234)
          at org.apache.kafka.connect.transforms.Flatten.applyWithSchema(Flatten.java:151)
          at org.apache.kafka.connect.transforms.Flatten.apply(Flatten.java:75)
          at org.apache.kafka.connect.runtime.TransformationChain.apply(TransformationChain.java:38)
          at org.apache.kafka.connect.runtime.WorkerSourceTask.sendRecords(WorkerSourceTask.java:211)
          at org.apache.kafka.connect.runtime.WorkerSourceTask.execute(WorkerSourceTask.java:187)
          at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:170)
          at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:214)
          at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
          at java.util.concurrent.FutureTask.run(FutureTask.java:266)
          at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
          at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
          at java.lang.Thread.run(Thread.java:745)
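
      For reference, here is a minimal standalone reproduction of the NPE. This is a sketch against the public Connect data and transforms APIs; the class, schema, field, and topic names are illustrative, and the schema simply mirrors the Debezium envelope above with an optional `before` struct that is null for an INSERT:

      import java.util.Collections;

      import org.apache.kafka.connect.data.Schema;
      import org.apache.kafka.connect.data.SchemaBuilder;
      import org.apache.kafka.connect.data.Struct;
      import org.apache.kafka.connect.source.SourceRecord;
      import org.apache.kafka.connect.transforms.Flatten;

      public class FlattenNullFieldRepro {
          public static void main(String[] args) {
              // Optional nested struct standing in for the Debezium row image.
              Schema row = SchemaBuilder.struct().name("row").optional()
                      .field("id", Schema.INT32_SCHEMA)
                      .build();
              Schema envelope = SchemaBuilder.struct().name("envelope")
                      .field("before", row)
                      .field("after", row)
                      .build();

              // "before" is null because an INSERT has no prior row state.
              Struct value = new Struct(envelope)
                      .put("before", null)
                      .put("after", new Struct(row).put("id", 1));

              Flatten.Value<SourceRecord> flatten = new Flatten.Value<>();
              flatten.configure(Collections.singletonMap("delimiter", "_"));

              // Throws java.lang.NullPointerException in Flatten.buildWithSchema
              flatten.apply(new SourceRecord(Collections.emptyMap(), Collections.emptyMap(),
                      "topic", 0, envelope, value));
          }
      }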
      

      Here's the connector configuration that was used:

      {
          "name": "debezium-connector-flatten",
          "config": {
              "connector.class": "io.debezium.connector.mysql.MySqlConnector",
              "tasks.max": "1",
              "database.hostname": "mysql",
              "database.port": "3306",
              "database.user": "debezium",
              "database.password": "dbz",
              "database.server.id": "223345",
              "database.server.name": "dbserver-flatten",
              "database.whitelist": "mydb",
              "database.history.kafka.bootstrap.servers": "kafka-1:9092,kafka-2:9092,kafka-3:9092",
              "database.history.kafka.topic": "schema-flatten.mydb",
              "include.schema.changes": "true",
              "transforms": "flatten",
              "transforms.flatten.type": "org.apache.kafka.connect.transforms.Flatten$Value",
              "transforms.flatten.delimiter": "_"
          }
      }
      

      Note that the above configuration sets the delimiter to `_`. The default delimiter is `.`, which is not a valid character in an Avro field name, so keeping the default and converting with Avro results in the following exception:

      org.apache.avro.SchemaParseException: Illegal character in: source.version
      	at org.apache.avro.Schema.validateName(Schema.java:1151)
      	at org.apache.avro.Schema.access$200(Schema.java:81)
      	at org.apache.avro.Schema$Field.<init>(Schema.java:403)
      	at org.apache.avro.SchemaBuilder$FieldBuilder.completeField(SchemaBuilder.java:2124)
      	at org.apache.avro.SchemaBuilder$FieldBuilder.completeField(SchemaBuilder.java:2116)
      	at org.apache.avro.SchemaBuilder$FieldBuilder.access$5300(SchemaBuilder.java:2034)
      	at org.apache.avro.SchemaBuilder$GenericDefault.withDefault(SchemaBuilder.java:2423)
      	at io.confluent.connect.avro.AvroData.addAvroRecordField(AvroData.java:898)
      	at io.confluent.connect.avro.AvroData.fromConnectSchema(AvroData.java:799)
      	at io.confluent.connect.avro.AvroData.fromConnectSchema(AvroData.java:652)
      	at io.confluent.connect.avro.AvroData.fromConnectSchema(AvroData.java:647)
      	at io.confluent.connect.avro.AvroData.fromConnectData(AvroData.java:324)
      	at io.confluent.connect.avro.AvroConverter.fromConnectData(AvroConverter.java:75)
      	at org.apache.kafka.connect.runtime.WorkerSourceTask.sendRecords(WorkerSourceTask.java:220)
      	at org.apache.kafka.connect.runtime.WorkerSourceTask.execute(WorkerSourceTask.java:187)
      	at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:170)
      	at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:214)
      	at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
      	at java.util.concurrent.FutureTask.run(FutureTask.java:266)
      	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
      	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
      	at java.lang.Thread.run(Thread.java:745)
      

      This should probably be addressed in the documentation: when using Avro, set the delimiter to `_` or another character that is legal in Avro names (letters, digits, and underscores; names may not start with a digit).
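
      Avro's restriction is easy to demonstrate with its schema builder. A sketch (record and field names are illustrative, chosen to match the flattened `source.version` name from the stack trace above):

      import org.apache.avro.Schema;
      import org.apache.avro.SchemaBuilder;

      public class AvroDelimiterCheck {
          public static void main(String[] args) {
              // Legal: "_" keeps flattened names within Avro's
              // [A-Za-z_][A-Za-z0-9_]* naming rule.
              Schema ok = SchemaBuilder.record("Envelope").fields()
                      .name("source_version").type().stringType().noDefault()
                      .endRecord();
              System.out.println(ok);

              // Illegal: a "."-delimited name is rejected as the field is built:
              // org.apache.avro.SchemaParseException: Illegal character in: source.version
              SchemaBuilder.record("Envelope2").fields()
                      .name("source.version").type().stringType().noDefault()
                      .endRecord();
          }
      }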

            People

              Assignee: Michal Borowiecki (mihbor)
              Reporter: Randall Hauch (rhauch)
              Votes: 1
              Watchers: 3
