Details
Type: New Feature
Status: Closed
Priority: Minor
Resolution: Fixed
Description
The default org.apache.flink.connector.kafka.sink.KafkaSink does not support adding Kafka record headers. In some implementations, downstream consumers might rely on Kafka record headers being set.
One way to add headers is to implement a custom KafkaRecordSerializationSchema and pass it to the KafkaSink.
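A minimal sketch of that workaround follows. To stay self-contained it does not use the real Flink or Kafka classes: `Header`, `OutRecord`, `RecordSchema`, `Event` and `EventSchema` are hypothetical stand-ins invented for illustration, and the real KafkaRecordSerializationSchema has a different `serialize` signature. The point is only that a hand-written schema controls record construction and can therefore attach headers.

```java
import java.nio.charset.StandardCharsets;
import java.util.Collections;
import java.util.List;

public class CustomSchemaSketch {

    // Hypothetical stand-in for a Kafka record header: a key plus raw bytes.
    static final class Header {
        final String key;
        final byte[] value;

        Header(String key, byte[] value) {
            this.key = key;
            this.value = value;
        }
    }

    // Hypothetical stand-in for the record handed to the Kafka producer.
    static final class OutRecord {
        final String topic;
        final byte[] value;
        final List<Header> headers;

        OutRecord(String topic, byte[] value, List<Header> headers) {
            this.topic = topic;
            this.value = value;
            this.headers = headers;
        }
    }

    // Hypothetical stand-in for the serialization schema, reduced to one method.
    interface RecordSchema<T> {
        OutRecord serialize(T element);
    }

    // Example element type, invented for this sketch.
    static final class Event {
        final String id;
        final String body;

        Event(String id, String body) {
            this.id = id;
            this.body = body;
        }
    }

    // The custom schema: serializes the body and attaches the event id as a header.
    static final class EventSchema implements RecordSchema<Event> {
        private final String topic;

        EventSchema(String topic) {
            this.topic = topic;
        }

        @Override
        public OutRecord serialize(Event event) {
            Header idHeader =
                    new Header("event-id", event.id.getBytes(StandardCharsets.UTF_8));
            return new OutRecord(
                    topic,
                    event.body.getBytes(StandardCharsets.UTF_8),
                    Collections.singletonList(idHeader));
        }
    }

    public static void main(String[] args) {
        OutRecord out = new EventSchema("orders").serialize(new Event("42", "hello"));
        System.out.println(out.topic + " carries " + out.headers.size() + " header(s)");
    }
}
```

The drawback is that every job needing headers has to write and maintain such a class itself.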
However, I'm assuming the KafkaRecordSerializationSchemaBuilder was added for convenience: it offers a more usable way to create a KafkaSink without having to deal with details like the ProducerRecord directly. This builder does not support adding record headers.
This is where I think it should be added.
The code responsible for creating the Kafka record involves org.apache.flink.connector.kafka.sink.KafkaRecordSerializationSchemaWrapper, where the ProducerRecord is created.
It is relatively simple to add support for record headers by adding a "HeaderProducer" to the KafkaRecordSerializationSchemaBuilder next to the key and value serializers and using the ProducerRecord constructor that accepts headers.
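The proposal could look roughly like the sketch below. It is an illustration under stated assumptions, not the connector's code: `SchemaBuilder`, `HeaderProducer`, `Header` and `OutRecord` are hypothetical stand-ins, and the setter names are invented here. It shows a header-producing function sitting next to the key and value serializers and being consulted when the record is built.

```java
import java.nio.charset.StandardCharsets;
import java.util.Collections;
import java.util.List;
import java.util.function.Function;

public class HeaderBuilderSketch {

    // Hypothetical stand-in for a Kafka record header.
    static final class Header {
        final String key;
        final byte[] value;

        Header(String key, byte[] value) {
            this.key = key;
            this.value = value;
        }
    }

    // Hypothetical stand-in for the record handed to the Kafka producer.
    static final class OutRecord {
        final String topic;
        final byte[] key;
        final byte[] value;
        final List<Header> headers;

        OutRecord(String topic, byte[] key, byte[] value, List<Header> headers) {
            this.topic = topic;
            this.key = key;
            this.value = value;
            this.headers = headers;
        }
    }

    // Hypothetical "HeaderProducer": derives the headers from each element.
    interface HeaderProducer<T> {
        List<Header> produceHeaders(T element);
    }

    // Hypothetical builder with a header hook next to the key and value serializers.
    static final class SchemaBuilder<T> {
        private String topic;
        private Function<T, byte[]> keySerializer = e -> null;
        private Function<T, byte[]> valueSerializer;
        private HeaderProducer<T> headerProducer = e -> Collections.<Header>emptyList();

        SchemaBuilder<T> setTopic(String topic) {
            this.topic = topic;
            return this;
        }

        SchemaBuilder<T> setKeySerializer(Function<T, byte[]> keySerializer) {
            this.keySerializer = keySerializer;
            return this;
        }

        SchemaBuilder<T> setValueSerializer(Function<T, byte[]> valueSerializer) {
            this.valueSerializer = valueSerializer;
            return this;
        }

        SchemaBuilder<T> setHeaderProducer(HeaderProducer<T> headerProducer) {
            this.headerProducer = headerProducer;
            return this;
        }

        // Returns the function that plays the role of the wrapper creating each record.
        Function<T, OutRecord> build() {
            if (topic == null || valueSerializer == null) {
                throw new IllegalStateException("topic and value serializer are required");
            }
            final String t = topic;
            final Function<T, byte[]> k = keySerializer;
            final Function<T, byte[]> v = valueSerializer;
            final HeaderProducer<T> h = headerProducer;
            return element ->
                    new OutRecord(t, k.apply(element), v.apply(element), h.produceHeaders(element));
        }
    }

    public static void main(String[] args) {
        Function<String, OutRecord> schema = new SchemaBuilder<String>()
                .setTopic("events")
                .setValueSerializer(s -> s.getBytes(StandardCharsets.UTF_8))
                .setHeaderProducer(s -> Collections.singletonList(
                        new Header("origin", "flink".getBytes(StandardCharsets.UTF_8))))
                .build();
        OutRecord out = schema.apply("hello");
        System.out.println(out.topic + " carries " + out.headers.size() + " header(s)");
    }
}
```

Defaulting the header hook to an empty list keeps existing builder usage unchanged, which is one reason an optional setter seems a natural fit here.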
The issue was discussed here.