Uploaded image for project: 'Flink'
  1. Flink
  2. FLINK-12577

Flink uses Kryo serializer if given a SpecificRecord type

    XMLWordPrintableJSON

Details

    Description

      If a data stream does not return a subclass of SpecificRecord, Flink will actually infer a GenericTypeInfo and use kryo instead.

      Example:

      StreamExecutionEnvironment env = ...
      env.getConfig().disableGenericTypes();
      
      DataStream<Measurement> sourceStream = env.addSource(...);
      DataStream<SpecificRecord> test = sourceStream.map(value -> new AggregatedSensorStatistics());
      
      public class AggregatedSensorStatistics extends org.apache.avro.specific.SpecificRecordBase implements org.apache.avro.specific.SpecificRecord {...}
      

      This will lead to

      Exception in thread "main" java.lang.UnsupportedOperationException: Generic types have been disabled in the ExecutionConfig and type org.apache.avro.specific.SpecificRecord is treated as a generic type.
      	at org.apache.flink.api.java.typeutils.GenericTypeInfo.createSerializer(GenericTypeInfo.java:86)
      	at org.apache.flink.streaming.api.graph.StreamGraph.addOperator(StreamGraph.java:208)
      	at org.apache.flink.streaming.api.graph.StreamGraphGenerator.transformOneInputTransform(StreamGraphGenerator.java:541)
      	at org.apache.flink.streaming.api.graph.StreamGraphGenerator.transform(StreamGraphGenerator.java:166)
      	at org.apache.flink.streaming.api.graph.StreamGraphGenerator.generateInternal(StreamGraphGenerator.java:132)
      	at org.apache.flink.streaming.api.graph.StreamGraphGenerator.generate(StreamGraphGenerator.java:124)
      	at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.getStreamGraph(StreamExecutionEnvironment.java:1537)
      	at org.apache.flink.streaming.api.environment.LocalStreamEnvironment.execute(LocalStreamEnvironment.java:89)
      	at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.java:1510)
      	at com.ververica.training.statemigration.StateMigrationJobBase.createAndExecuteJob(StateMigrationJobBase.java:68)
      	at com.ververica.training.statemigration.avro.StateMigrationJob.main(StateMigrationJob.java:13)
      

      You may want some flexibility in your types and thus not provide the exact one like AggregatedSensorStatistics in this example. I don't see any reason we should disallow that behaviour.

      Reason for this is that TypeExtractor#privateGetForClass() is having this code which only sees classes as Avro if they extend from SpecificRecord:

      if (hasSuperclass(clazz, AVRO_SPECIFIC_RECORD_BASE_CLASS)) {
      	return AvroUtils.getAvroUtils().createAvroTypeInfo(clazz);
      }
      

      Attachments

        Activity

          People

            Unassigned Unassigned
            nkruber Nico Kruber
            Votes:
            0 Vote for this issue
            Watchers:
            6 Start watching this issue

            Dates

              Created:
              Updated: