Details
-
Improvement
-
Status: Closed
-
Major
-
Resolution: Duplicate
-
None
-
None
-
None
Description
Kafka introduces headers in producer and consumer record since version 0.11. This is the high level description: https://cwiki.apache.org/confluence/display/KAFKA/KIP-82+-+Add+Record+Headers
However, current Flink Kafka connector simply ignores the headers. This will make it hard to integrate with the Kafka ecosystem where other Kafka clients make use of the headers.
I propose to support headers in Flink by modifying the following API:
- In KeyedSerializationSchema, add
List<Tuple2<String, byte[]>> getHeaders(T element)
- In KeyedDeserializationSchema, add
T deserailize(byte[] messageKey, byte[] message, List<Tuple2<String, byte[]>> headers, String topic, int partition, long offset) throws IOException
These new methods will be invoked by FlinkKafkaProducer and KafkaFetcher in the serialization and deserialization process. If backward compatibility is a concern, we can add default implementation to these methods where headers are ignored.
Attachments
Issue Links
- duplicates
-
FLINK-8354 Add KafkaDeserializationSchema that directly uses ConsumerRecord
- Closed
-
FLINK-11693 Add KafkaSerializationSchema that directly uses ProducerRecord
- Closed