Details
Description
When a message contains a field whose value is null, the `Flatten` SMT does not handle it and throws a `NullPointerException` (NPE). Consider this message from Debezium:
{
  "before": null,
  "after": {
    "dbserver1.mydb.team.Value": {
      "id": 1,
      "name": "kafka",
      "email": "kafka@apache.org",
      "last_modified": 1519939449000
    }
  },
  "source": {
    "version": { "string": "0.7.3" },
    "name": "dbserver1",
    "server_id": 0,
    "ts_sec": 0,
    "gtid": null,
    "file": "mysql-bin.000003",
    "pos": 154,
    "row": 0,
    "snapshot": { "boolean": true },
    "thread": null,
    "db": { "string": "mydb" },
    "table": { "string": "team" }
  },
  "op": "c",
  "ts_ms": { "long": 1519939520285 }
}
Note that `before` is null: this event represents a row that was inserted, so there is no `before` state for the row. Flattening this message results in an NPE:
java.lang.NullPointerException
    at org.apache.kafka.connect.transforms.Flatten.buildWithSchema(Flatten.java:219)
    at org.apache.kafka.connect.transforms.Flatten.buildWithSchema(Flatten.java:234)
    at org.apache.kafka.connect.transforms.Flatten.applyWithSchema(Flatten.java:151)
    at org.apache.kafka.connect.transforms.Flatten.apply(Flatten.java:75)
    at org.apache.kafka.connect.runtime.TransformationChain.apply(TransformationChain.java:38)
    at org.apache.kafka.connect.runtime.WorkerSourceTask.sendRecords(WorkerSourceTask.java:211)
    at org.apache.kafka.connect.runtime.WorkerSourceTask.execute(WorkerSourceTask.java:187)
    at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:170)
    at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:214)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:745)
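To illustrate the kind of null check that appears to be missing, here is a minimal sketch of a recursive flatten that tolerates null fields. It is not the actual `Flatten` code: it works on plain `java.util.Map` values rather than Connect `Struct` and `Schema` objects, and the class and method names (`NullSafeFlattenSketch`, `flatten`, `flattenInto`) are hypothetical.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical sketch: plain maps stand in for Connect Struct values.
final class NullSafeFlattenSketch {

    // Copies every entry of `source` into `target`, joining nested keys with `delimiter`.
    static void flattenInto(Map<String, Object> source, String prefix,
                            String delimiter, Map<String, Object> target) {
        for (Map.Entry<String, Object> entry : source.entrySet()) {
            String name = prefix.isEmpty()
                    ? entry.getKey()
                    : prefix + delimiter + entry.getKey();
            Object value = entry.getValue();
            if (value == null) {
                // A null field (such as `before` on an insert event) is
                // carried over as null instead of being dereferenced.
                target.put(name, null);
            } else if (value instanceof Map) {
                @SuppressWarnings("unchecked")
                Map<String, Object> nested = (Map<String, Object>) value;
                flattenInto(nested, name, delimiter, target);
            } else {
                target.put(name, value);
            }
        }
    }

    // Returns a new single-level map; insertion order is preserved.
    static Map<String, Object> flatten(Map<String, Object> source, String delimiter) {
        Map<String, Object> target = new LinkedHashMap<>();
        flattenInto(source, "", delimiter, target);
        return target;
    }

    public static void main(String[] args) {
        Map<String, Object> after = new LinkedHashMap<>();
        after.put("id", 1);
        after.put("name", "kafka");

        Map<String, Object> event = new LinkedHashMap<>();
        event.put("before", null); // same shape as the insert event above
        event.put("after", after);
        event.put("op", "c");

        System.out.println(flatten(event, "_"));
    }
}
```

The sketch keeps the null field in the output rather than dropping it, which is one possible design choice; whether the real transform should preserve or omit such fields is a separate question for the fix.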
Here's the connector configuration that was used:
{
  "name": "debezium-connector-flatten",
  "config": {
    "connector.class": "io.debezium.connector.mysql.MySqlConnector",
    "tasks.max": "1",
    "database.hostname": "mysql",
    "database.port": "3306",
    "database.user": "debezium",
    "database.password": "dbz",
    "database.server.id": "223345",
    "database.server.name": "dbserver-flatten",
    "database.whitelist": "mydb",
    "database.history.kafka.bootstrap.servers": "kafka-1:9092,kafka-2:9092,kafka-3:9092",
    "database.history.kafka.topic": "schema-flatten.mydb",
    "include.schema.changes": "true",
    "transforms": "flatten",
    "transforms.flatten.type": "org.apache.kafka.connect.transforms.Flatten$Value",
    "transforms.flatten.delimiter": "_"
  }
}
Note that the above configuration sets the delimiter to `_`. The default delimiter is `.`, which is not a valid character in an Avro field name, so using the default with the Avro converter results in the following exception:
org.apache.avro.SchemaParseException: Illegal character in: source.version
    at org.apache.avro.Schema.validateName(Schema.java:1151)
    at org.apache.avro.Schema.access$200(Schema.java:81)
    at org.apache.avro.Schema$Field.<init>(Schema.java:403)
    at org.apache.avro.SchemaBuilder$FieldBuilder.completeField(SchemaBuilder.java:2124)
    at org.apache.avro.SchemaBuilder$FieldBuilder.completeField(SchemaBuilder.java:2116)
    at org.apache.avro.SchemaBuilder$FieldBuilder.access$5300(SchemaBuilder.java:2034)
    at org.apache.avro.SchemaBuilder$GenericDefault.withDefault(SchemaBuilder.java:2423)
    at io.confluent.connect.avro.AvroData.addAvroRecordField(AvroData.java:898)
    at io.confluent.connect.avro.AvroData.fromConnectSchema(AvroData.java:799)
    at io.confluent.connect.avro.AvroData.fromConnectSchema(AvroData.java:652)
    at io.confluent.connect.avro.AvroData.fromConnectSchema(AvroData.java:647)
    at io.confluent.connect.avro.AvroData.fromConnectData(AvroData.java:324)
    at io.confluent.connect.avro.AvroConverter.fromConnectData(AvroConverter.java:75)
    at org.apache.kafka.connect.runtime.WorkerSourceTask.sendRecords(WorkerSourceTask.java:220)
    at org.apache.kafka.connect.runtime.WorkerSourceTask.execute(WorkerSourceTask.java:187)
    at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:170)
    at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:214)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:745)
This should probably be addressed in the documentation: when using Avro, set the delimiter to `_` or another character that is valid in Avro names (letters, digits, and underscore).
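As a rough illustration of why `.` fails and `_` works, the following sketch checks a flattened field name against the Avro name rule (a leading letter or underscore, followed by letters, digits, or underscores). The class and method names are hypothetical, and the pattern is written out by hand here rather than taken from the Avro library.

```java
import java.util.regex.Pattern;

// Hypothetical helper, not part of Kafka Connect or Avro.
final class AvroNameCheckSketch {

    // Avro names start with [A-Za-z_] and continue with [A-Za-z0-9_].
    private static final Pattern AVRO_NAME =
            Pattern.compile("[A-Za-z_][A-Za-z0-9_]*");

    static boolean isValidAvroName(String name) {
        return AVRO_NAME.matcher(name).matches();
    }

    // Builds the name a flatten step would produce for a nested field.
    static String flattenedName(String parent, String child, String delimiter) {
        return parent + delimiter + child;
    }

    public static void main(String[] args) {
        String withDot = flattenedName("source", "version", ".");
        String withUnderscore = flattenedName("source", "version", "_");

        System.out.println(withDot + " valid: " + isValidAvroName(withDot));
        System.out.println(withUnderscore + " valid: " + isValidAvroName(withUnderscore));
    }
}
```

A check along these lines could be run against a chosen delimiter before deploying a connector that uses the Avro converter.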
Issue Links
- supersedes KAFKA-5972 Flatten SMT does not work with null values (Resolved)