Details
-
Bug
-
Status: Open
-
Not a Priority
-
Resolution: Unresolved
-
1.7.2, 1.8.0, 1.9.0, 1.10.0
-
None
Description
If a data stream does not return a subclass of SpecificRecord, Flink will actually infer a GenericTypeInfo and use kryo instead.
Example:
StreamExecutionEnvironment env = ... env.getConfig().disableGenericTypes(); DataStream<Measurement> sourceStream = env.addSource(...); DataStream<SpecificRecord> test = sourceStream.map(value -> new AggregatedSensorStatistics()); public class AggregatedSensorStatistics extends org.apache.avro.specific.SpecificRecordBase implements org.apache.avro.specific.SpecificRecord {...}
This will lead to
Exception in thread "main" java.lang.UnsupportedOperationException: Generic types have been disabled in the ExecutionConfig and type org.apache.avro.specific.SpecificRecord is treated as a generic type. at org.apache.flink.api.java.typeutils.GenericTypeInfo.createSerializer(GenericTypeInfo.java:86) at org.apache.flink.streaming.api.graph.StreamGraph.addOperator(StreamGraph.java:208) at org.apache.flink.streaming.api.graph.StreamGraphGenerator.transformOneInputTransform(StreamGraphGenerator.java:541) at org.apache.flink.streaming.api.graph.StreamGraphGenerator.transform(StreamGraphGenerator.java:166) at org.apache.flink.streaming.api.graph.StreamGraphGenerator.generateInternal(StreamGraphGenerator.java:132) at org.apache.flink.streaming.api.graph.StreamGraphGenerator.generate(StreamGraphGenerator.java:124) at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.getStreamGraph(StreamExecutionEnvironment.java:1537) at org.apache.flink.streaming.api.environment.LocalStreamEnvironment.execute(LocalStreamEnvironment.java:89) at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.java:1510) at com.ververica.training.statemigration.StateMigrationJobBase.createAndExecuteJob(StateMigrationJobBase.java:68) at com.ververica.training.statemigration.avro.StateMigrationJob.main(StateMigrationJob.java:13)
You may want some flexibility in your types and thus not provide the exact one like AggregatedSensorStatistics in this example. I don't see any reason we should disallow that behaviour.
Reason for this is that TypeExtractor#privateGetForClass() is having this code which only sees classes as Avro if they extend from SpecificRecord:
if (hasSuperclass(clazz, AVRO_SPECIFIC_RECORD_BASE_CLASS)) { return AvroUtils.getAvroUtils().createAvroTypeInfo(clazz); }