Uploaded image for project: 'Flink'
  1. Flink
  2. FLINK-31049

Add support for Kafka record headers to KafkaSink

    XMLWordPrintableJSON

Details

    Description

      The default org.apache.flink.connector.kafka.sink.KafkaSink does not support adding Kafka record headers. In some implementations, downstream consumers might rely on Kafka record headers being set.
       

      A way to add Headers would be to create a custom KafkaRecordSerializationSchema and inject that into the KafkaSink.

      However, I'm assuming the KafkaRecordSerializationSchemaBuilder was added for convenience and allows a more usable approach of creating a KafkaSink without having to deal with details like the RecordProducer directly. This builder does not support adding record headers.

      This is where I think it should be added.

      The code responsible for creating the Kafka record involves  org.apache.flink.connector.kafka.sink.KafkaRecordSerializationSchemaWrapper where the RecordProducer is created. 
      It is relatively simple to add support for record headers by adding a "HeaderProducer" to the KafkaRecordSerializationSchemaBuilder next to the key and value serializers and using the appropriate RecordProducer constructor.
       
      The issue was discussed here.
       

      Attachments

        Activity

          People

            Axeman Alex Gout
            Axeman Alex Gout
            Votes:
            0 Vote for this issue
            Watchers:
            3 Start watching this issue

            Dates

              Created:
              Updated:
              Resolved:

              Time Tracking

                Estimated:
                Original Estimate - 48h
                48h
                Remaining:
                Remaining Estimate - 48h
                48h
                Logged:
                Time Spent - Not Specified
                Not Specified