Details
Bug
Status: Resolved
Critical
Resolution: Resolved
1.0.0
MacOS X 10.10
Description
Using named keys in keyBy() for nested POJO types results in failure. The indexes for named key fields are used inconsistently with nested POJO types. In particular, PojoTypeInfo.getFlatFields() returns each field's position in the (apparently) flattened structure, but PojoTypeInfo.getTypeAt() then interprets that position against the unflattened POJO type.
In the example below, getFlatFields() returns positions 0, 1, and 14. These positions appear correct for the flattened structure of the Data class. However, in KeySelector<X, Tuple> getSelectorForKeys(Keys<X> keys, TypeInformation<X> typeInfo, ExecutionConfig executionConfig), the call to compositeType.getTypeAt(logicalKeyPositions[i]) for the third key results in PojoTypeInfo.getTypeAt() declaring the position out of range, because it checks a position taken from the flattened version of the type against the number of fields declared directly on the object.
Concrete Example:
Consider this graph:
DataStream<TimesliceData> dataStream = see.addSource(
    new FlinkKafkaConsumer08<>(timesliceConstants.topic, new DataDeserialzer(), kafkaConsumerProperties));
dataStream
    .flatMap(new DataMapper())
    .keyBy("aaa", "abc", "wxyz")
DataDeserialzer returns a "NativeDataFormat" object; DataMapper takes this NativeDataFormat object and extracts individual Data objects:
public class Data {
    public int aaa;
    public int abc;
    public long wxyz;
    public int t1;
    public int t2;
    public Policy policy;
    public Stats stats;
    public Data() {}
}
A Policy object is an instance of this class:
public class Policy {
    public short a;
    public short b;
    public boolean c;
    public boolean d;
    public Policy() {}
}
A Stats object is an instance of this class:
public class Stats {
    public long count;
    public float a;
    public float b;
    public float c;
    public float d;
    public float e;
    public Stats() {}
}
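The reported positions 0, 1, and 14 can be reproduced with a small standalone model of the flattening. This is only a sketch and does not use Flink's code: the class FlatFieldSketch and its helpers (sortedFields, flatArity, flatPosition, arity) are hypothetical names, and it assumes that the POJO type information orders fields by name before flattening, which is consistent with the positions reported above.

```java
import java.lang.reflect.Field;
import java.lang.reflect.Modifier;
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;

// Hypothetical model of POJO flattening; not Flink's implementation.
public class FlatFieldSketch {

    // The three classes from the report, nested here to keep the sketch in one file.
    public static class Policy {
        public short a; public short b; public boolean c; public boolean d;
        public Policy() {}
    }

    public static class Stats {
        public long count; public float a; public float b;
        public float c; public float d; public float e;
        public Stats() {}
    }

    public static class Data {
        public int aaa; public int abc; public long wxyz;
        public int t1; public int t2;
        public Policy policy; public Stats stats;
        public Data() {}
    }

    // Public instance fields, ordered by name (assumption: mirrors the POJO field ordering).
    public static Field[] sortedFields(Class<?> type) {
        List<Field> fields = new ArrayList<>();
        for (Field f : type.getDeclaredFields()) {
            int m = f.getModifiers();
            if (Modifier.isPublic(m) && !Modifier.isStatic(m) && !f.isSynthetic()) {
                fields.add(f);
            }
        }
        fields.sort(Comparator.comparing(Field::getName));
        return fields.toArray(new Field[0]);
    }

    // In this model only Policy and Stats are treated as nested POJOs.
    static boolean isNested(Class<?> type) {
        return type == Policy.class || type == Stats.class;
    }

    // Number of leaf fields once nested POJOs are expanded in place.
    public static int flatArity(Class<?> type) {
        int total = 0;
        for (Field f : sortedFields(type)) {
            total += isNested(f.getType()) ? flatArity(f.getType()) : 1;
        }
        return total;
    }

    // Position of a top-level field in the flattened layout.
    public static int flatPosition(Class<?> type, String name) {
        int pos = 0;
        for (Field f : sortedFields(type)) {
            if (f.getName().equals(name)) {
                return pos;
            }
            pos += isNested(f.getType()) ? flatArity(f.getType()) : 1;
        }
        throw new IllegalArgumentException("no such field: " + name);
    }

    // Number of fields declared directly on the type (the unflattened arity).
    public static int arity(Class<?> type) {
        return sortedFields(type).length;
    }

    public static void main(String[] args) {
        int direct = arity(Data.class);
        for (String key : new String[] {"aaa", "abc", "wxyz"}) {
            int pos = flatPosition(Data.class, key);
            String verdict = pos < direct ? "within" : "outside";
            System.out.println(key + ": flat position " + pos
                    + " is " + verdict + " the unflattened range [0, " + direct + ")");
        }
    }
}
```

Under the name-ordering assumption, the flattened layout is aaa, abc, the four Policy fields, the six Stats fields, t1, t2, wxyz, so wxyz lands at position 14 while Data declares only 7 fields directly. A range check against the unflattened arity therefore rejects the third key, which matches the failure described.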