Flink / FLINK-11303

Utilizing Kafka headers for serialization and deserialization


Details

    • Type: Improvement
    • Status: Closed
    • Priority: Major
    • Resolution: Duplicate
    • Component: Connectors / Kafka

    Description

      Kafka introduced headers in producer and consumer records in version 0.11. The high-level design is described in KIP-82: https://cwiki.apache.org/confluence/display/KAFKA/KIP-82+-+Add+Record+Headers

      However, the current Flink Kafka connector simply ignores headers, which makes it hard to integrate with the rest of the Kafka ecosystem, where other clients make use of them.


      I propose to support headers in Flink by modifying the following API:

      • In KeyedSerializationSchema, add
        List<Tuple2<String, byte[]>> getHeaders(T element)
        
      • In KeyedDeserializationSchema, add
        T deserialize(byte[] messageKey, byte[] message, List<Tuple2<String, byte[]>> headers, String topic, int partition, long offset) throws IOException


      These new methods will be invoked by FlinkKafkaProducer and KafkaFetcher during serialization and deserialization. If backward compatibility is a concern, we can add default implementations of these methods that ignore headers.
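A minimal sketch of how the proposed additions could look, assuming default methods are used for the backward-compatibility path described above. The interface method lists, the Tuple2 stand-in, and the "trace-id" header are illustrative assumptions, not the actual Flink API; in Flink itself, Tuple2 would be org.apache.flink.api.java.tuple.Tuple2.

```java
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.Collections;
import java.util.List;

// Minimal stand-in for org.apache.flink.api.java.tuple.Tuple2, so this
// sketch compiles without a Flink dependency.
class Tuple2<F0, F1> {
    final F0 f0;
    final F1 f1;
    Tuple2(F0 f0, F1 f1) { this.f0 = f0; this.f1 = f1; }
}

// Hypothetical shape of the producer-side schema with the proposed
// getHeaders method. A default implementation returning no headers keeps
// existing user implementations source-compatible.
interface KeyedSerializationSchema<T> {
    byte[] serializeKey(T element);
    byte[] serializeValue(T element);
    String getTargetTopic(T element);

    default List<Tuple2<String, byte[]>> getHeaders(T element) {
        return Collections.emptyList(); // backward compatible: no headers
    }
}

// Hypothetical shape of the consumer-side schema: the new overload receives
// the record headers and by default delegates to the old method, ignoring them.
interface KeyedDeserializationSchema<T> {
    T deserialize(byte[] messageKey, byte[] message,
                  String topic, int partition, long offset) throws IOException;

    default T deserialize(byte[] messageKey, byte[] message,
                          List<Tuple2<String, byte[]>> headers,
                          String topic, int partition, long offset) throws IOException {
        return deserialize(messageKey, message, topic, partition, offset);
    }
}

public class HeaderSchemaSketch {
    public static void main(String[] args) {
        // A schema that attaches a (hypothetical) trace-id header to every record.
        KeyedSerializationSchema<String> schema = new KeyedSerializationSchema<String>() {
            public byte[] serializeKey(String e) { return null; }
            public byte[] serializeValue(String e) { return e.getBytes(StandardCharsets.UTF_8); }
            public String getTargetTopic(String e) { return "demo-topic"; }
            @Override
            public List<Tuple2<String, byte[]>> getHeaders(String e) {
                return Collections.singletonList(
                        new Tuple2<>("trace-id", "abc123".getBytes(StandardCharsets.UTF_8)));
            }
        };
        // FlinkKafkaProducer would copy these pairs onto the outgoing ProducerRecord.
        System.out.println(schema.getHeaders("msg").get(0).f0); // prints "trace-id"
    }
}
```

The default-method approach means existing schemas compile unchanged, while header-aware schemas opt in by overriding.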


      People

        Assignee: Unassigned
        Reporter: Allen Wang