diff --git a/connect/transforms/src/main/java/org/apache/kafka/connect/transforms/Flatten.java b/connect/transforms/src/main/java/org/apache/kafka/connect/transforms/Flatten.java index c5e4000..e9336a0 100644 --- a/connect/transforms/src/main/java/org/apache/kafka/connect/transforms/Flatten.java +++ b/connect/transforms/src/main/java/org/apache/kafka/connect/transforms/Flatten.java @@ -104,7 +104,7 @@ Object value = entry.getValue(); if (value == null) { newRecord.put(fieldName(fieldNamePrefix, entry.getKey()), null); - return; + continue; } Schema.Type inferredType = ConnectSchema.schemaType(value.getClass()); diff --git a/connect/transforms/src/test/java/org/apache/kafka/connect/transforms/FlattenTest.java b/connect/transforms/src/test/java/org/apache/kafka/connect/transforms/FlattenTest.java index d709054..9f9e5c8 100644 --- a/connect/transforms/src/test/java/org/apache/kafka/connect/transforms/FlattenTest.java +++ b/connect/transforms/src/test/java/org/apache/kafka/connect/transforms/FlattenTest.java @@ -254,4 +254,25 @@ Schema transformedOptFieldSchema = SchemaBuilder.string().optional().defaultValue("child_default").build(); assertEquals(transformedOptFieldSchema, transformedSchema.field("opt_field").schema()); } + + @Test + public void testNullValueInStruct() { + xformKey.configure(Collections.emptyMap()); + + Map aStructMap = new HashMap<>(); + aStructMap.put("B", null); + aStructMap.put("C", "cValue"); + aStructMap.put("D", "dValue"); + + Map> key = Collections.singletonMap("A", aStructMap); + SourceRecord src = new SourceRecord(null, null, "topic", null, key, null, null); + SourceRecord transformed = xformKey.apply(src); + + assertNull(transformed.keySchema()); + assertTrue(transformed.key() instanceof Map); + Map transformedMap = (Map) transformed.key(); + assertEquals(null, transformedMap.get("A.B")); + assertEquals(aStructMap.get("C"), transformedMap.get("A.C")); + assertEquals(aStructMap.get("D"), transformedMap.get("A.D")); + } }