Flink / FLINK-32398

Support Avro SpecificRecord in DataStream and Table conversion.


Details

    Description

      Currently, Avro SpecificRecord does not seem to be supported when converting between DataStream and Table. For example, the following code fails when MyAvroRecord contains fields of type Record, Enum, Array, etc.

       

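      import org.apache.flink.api.common.typeinfo.TypeInformation;
      import org.apache.flink.formats.avro.typeutils.AvroSchemaConverter;
      import org.apache.flink.streaming.api.datastream.DataStream;
      import org.apache.flink.table.api.Table;
      import org.apache.flink.table.types.DataType;

      // `input` is a DataStream<MyAvroRecord>; `tEnv` is a StreamTableEnvironment.
      String schemaString = MyAvroRecord.getClassSchema().toString();
      DataType dataType = AvroSchemaConverter.convertToDataType(schemaString);
      TypeInformation<MyAvroRecord> typeInfo = AvroSchemaConverter.convertToTypeInfo(schemaString);

      input.getTransformation().setOutputType(typeInfo);
      tEnv.createTemporaryView("myTable", input);
      Table result = tEnv.sqlQuery("SELECT * FROM myTable");
      // The Table -> DataStream step fails for nested Record, Enum, Array, etc. fields.
      DataStream<MyAvroRecord> output = tEnv.toDataStream(result, dataType);
      output.getTransformation().setOutputType(typeInfo);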

       

      While the conversion from MyAvroRecord to RowData seems fine, converting the RowData back to MyAvroRecord runs into several issues, including but not limited to:

      1. AvroSchemaConverter.convertToDataType(schema) maps the Avro Record type to RowType, which loses the record's class information.
      2. AvroSchemaConverter maps Enum to StringType and then simply tries to cast the string to the Enum.
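      Both problems can be illustrated without Flink or Avro. The sketch below is a plain-Java analogy, not Flink code: Suit, Card and Hand are hypothetical stand-ins for generated Avro classes, and an Object[] stands in for RowData (records require Java 16+). Once a nested record is flattened into positional fields and an enum into its name, getting the original objects back requires knowing the target classes, and a plain cast from String to the enum fails.

```java
import java.util.Arrays;

/**
 * Plain-Java analogy (no Flink or Avro dependencies) of why converting a
 * generic row back to a specific record needs the target classes.
 * Suit, Card and Hand are hypothetical stand-ins for generated Avro classes.
 */
public class SpecificRecordRoundTrip {

    enum Suit { HEARTS, SPADES }

    // Stand-in for a nested Avro record (a field of type Record).
    record Card(Suit suit, int rank) {}

    // Stand-in for the top-level SpecificRecord.
    record Hand(String owner, Card card) {}

    // Forward direction: flatten into positional fields, loosely like RowType/RowData.
    // The enum becomes its name (a String) and the nested record becomes an Object[],
    // so neither Card nor Suit is recorded anywhere in the result.
    static Object[] toRow(Hand hand) {
        Object[] cardRow = {hand.card().suit().name(), hand.card().rank()};
        return new Object[] {hand.owner(), cardRow};
    }

    // Naive reverse direction for the enum: cast the string value.
    // Always throws ClassCastException, because a String is never a Suit.
    static Suit naiveEnum(Object value) {
        return (Suit) value;
    }

    // Class-aware reverse direction: Enum.valueOf for the enum and explicit
    // reconstruction of the nested record, both of which need the target classes.
    static Hand fromRow(Object[] row) {
        Object[] cardRow = (Object[]) row[1];
        Suit suit = Enum.valueOf(Suit.class, (String) cardRow[0]);
        Card card = new Card(suit, (Integer) cardRow[1]);
        return new Hand((String) row[0], card);
    }

    public static void main(String[] args) {
        Hand hand = new Hand("alice", new Card(Suit.SPADES, 12));
        Object[] row = toRow(hand);
        System.out.println(Arrays.deepToString(row)); // [alice, [SPADES, 12]]

        try {
            naiveEnum(((Object[]) row[1])[0]);
        } catch (ClassCastException e) {
            System.out.println("naive cast failed: " + e.getClass().getSimpleName());
        }

        System.out.println(fromRow(row).equals(hand)); // true
    }
}
```

      Running main prints the flattened row, reports the ClassCastException from the naive cast, and then prints true for the class-aware round trip. In the real conversion, the equivalent class information would have to come from the SpecificRecord class rather than from the RowType alone.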

      I did not find an easy way to convert between DataStream and Table for Avro SpecificRecord. Given how widely Avro SpecificRecord is used, we should support this.


    People
      Assignee: Unassigned
      Reporter: Jiangjie Qin (becket_qin)
      Votes: 0
      Watchers: 1
