Add a Kafka TableSource that supports Avro-serialized data.
The KafkaAvroTableSource should support two modes:
- SpecificRecord Mode: In this case the user specifies a class that was code-generated by Avro from a schema. Flink treats these classes as regular POJOs, so they are natively supported by the Table API and SQL. Classes generated by Avro contain their schema in a static field, and this schema should be used to automatically derive field names and types. Hence, no information is required other than the name of the class.
- GenericRecord Mode: In this case the user specifies an Avro Schema. The schema is used to deserialize the data into a GenericRecord, which must then be translated into a (possibly nested) Row based on the schema information. Again, the Avro Schema is used to automatically derive the field names and types. This mode is less efficient than the SpecificRecord mode because each GenericRecord needs to be converted into a Row.
This feature depends on FLINK-5280, i.e., support for nested data in TableSource.
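
To illustrate the GenericRecord-to-Row translation described above, here is a minimal sketch of the schema-driven, recursive conversion. It deliberately avoids the Avro and Flink APIs: `RecordSchema` is a hypothetical stand-in for an Avro record schema, a `Map<String, Object>` stands in for a GenericRecord, and an `Object[]` stands in for a Row. None of these names come from this issue or from either library; they only show the shape of the conversion.

```java
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.Map;

// Sketch only: stand-in types, not the Avro or Flink APIs.
class GenericToRowSketch {

    /**
     * Hypothetical stand-in for an Avro record schema: field names in
     * declaration order. A non-null value marks a nested record field.
     */
    static final class RecordSchema {
        final Map<String, RecordSchema> fields = new LinkedHashMap<>();

        RecordSchema field(String name) {
            fields.put(name, null);
            return this;
        }

        RecordSchema field(String name, RecordSchema nested) {
            fields.put(name, nested);
            return this;
        }
    }

    /**
     * Converts a record (stand-in: Map) into a positional row (stand-in:
     * Object[]), following the field order of the schema. Nested records
     * become nested rows.
     */
    static Object[] toRow(Map<String, Object> record, RecordSchema schema) {
        Object[] row = new Object[schema.fields.size()];
        int i = 0;
        for (Map.Entry<String, RecordSchema> f : schema.fields.entrySet()) {
            Object value = record.get(f.getKey());
            if (f.getValue() != null && value != null) {
                // Record-typed field: recurse with the nested schema.
                @SuppressWarnings("unchecked")
                Map<String, Object> nested = (Map<String, Object>) value;
                row[i] = toRow(nested, f.getValue());
            } else {
                // Primitive field (or missing value): copy as-is.
                row[i] = value;
            }
            i++;
        }
        return row;
    }

    public static void main(String[] args) {
        RecordSchema schema = new RecordSchema()
                .field("name")
                .field("age")
                .field("address", new RecordSchema().field("city").field("zip"));

        Map<String, Object> address = new LinkedHashMap<>();
        address.put("city", "Berlin");
        address.put("zip", "10115");

        Map<String, Object> user = new LinkedHashMap<>();
        user.put("name", "Alice");
        user.put("age", 42);
        user.put("address", address);

        // prints [Alice, 42, [Berlin, 10115]]
        System.out.println(Arrays.deepToString(toRow(user, schema)));
    }
}
```

The per-record copy in `toRow` is the extra work that makes this mode less efficient than the SpecificRecord mode, and the nested `Object[]` is the part that relies on nested-data support in the TableSource.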