Details
Type: Improvement
Status: Open
Priority: Major
Resolution: Unresolved
2.0.0
None
None
Description
The pipeline is: Source (Kafka, deserializing each message into RowData) -> FlatMap --(keyBy)--> tumbling processing-time window (TumbleProcessWindow) -> Sink.
The source emits records with two fields: UserName (String) and Amount (Integer).
The FlatMap receives each record as a BinaryRowData and forwards it unchanged to the TumbleProcessWindow operator.
The KeySelector keys by the UserName field, which is a BinaryStringData. The keyBy code is:
.keyBy(rowData -> rowData.getString(0))
The window results are unexpected: records with the same user name arrive at the TumbleProcessWindow operator at the same time, yet they are computed in different windows.
When I use the following keyBy, which converts the key to a java.lang.String, the window results are correct:
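For a processing-time tumbling window, a record is assigned to the window whose start is its processing timestamp rounded down to a multiple of the window size (plus any offset). Two records for the same key that arrive within the same 10-second bucket should therefore be evaluated in the same window. The sketch below reproduces that arithmetic in plain Java; `windowStart` is a hypothetical helper written to mirror what Flink's `TimeWindow.getWindowStartWithOffset` computes, and the timestamps are made-up values.

```java
public class TumblingWindowAssignment {

    // Start of the tumbling window that contains `timestamp`, for windows of
    // `windowSize` ms aligned to `offset`. Hypothetical helper that mirrors the
    // arithmetic of Flink's TimeWindow.getWindowStartWithOffset.
    public static long windowStart(long timestamp, long offset, long windowSize) {
        long remainder = (timestamp - offset) % windowSize;
        // Java's % can be negative for negative inputs; shift into [0, windowSize).
        if (remainder < 0) {
            remainder += windowSize;
        }
        return timestamp - remainder;
    }

    public static void main(String[] args) {
        long size = 10_000L;                 // 10-second window, as in Time.seconds(10)
        long first = 1_700_000_001_000L;     // processing time of record 1 (made-up)
        long second = 1_700_000_001_050L;    // record 2, 50 ms later (made-up)
        // both print 1700000000000: the two records fall into the same window
        System.out.println(windowStart(first, 0L, size));
        System.out.println(windowStart(second, 0L, size));
    }
}
```

If two records with the same user name map to the same window start but are still reported by separate `process` calls, that suggests the operator is treating them as different keys rather than placing them in different time windows.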
.keyBy(rowData -> rowData.getString(0).toString())
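The report does not identify a root cause. One plausible explanation, which is an assumption here and not confirmed by the report, is that the BinaryStringData returned by `getString(0)` is a view into the record's binary memory rather than an owned copy. If that memory is later reused for other records, a key already held by the window operator (for state or timers) can change under it, whereas `toString()` produces an independent String. The plain-Java sketch below illustrates that hazard only: `BufferViewKey` is a hypothetical stand-in for BinaryStringData, and a `HashMap` stands in for keyed state.

```java
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;

public class ReusedBufferKeyDemo {

    // Hypothetical key that views a slice of a shared byte[] instead of owning its bytes.
    public static final class BufferViewKey {
        private final byte[] memory;
        private final int offset;
        private final int length;

        BufferViewKey(byte[] memory, int offset, int length) {
            this.memory = memory;
            this.offset = offset;
            this.length = length;
        }

        @Override
        public boolean equals(Object o) {
            if (!(o instanceof BufferViewKey)) return false;
            BufferViewKey other = (BufferViewKey) o;
            // Compares the bytes currently in memory, so the result changes if memory is overwritten.
            return Arrays.equals(memory, offset, offset + length,
                    other.memory, other.offset, other.offset + other.length);
        }

        @Override
        public int hashCode() {
            int h = 1;
            for (int i = offset; i < offset + length; i++) h = 31 * h + memory[i];
            return h;
        }

        @Override
        public String toString() {
            return new String(memory, offset, length, StandardCharsets.UTF_8);
        }
    }

    // Writes `name` into `memory` and counts it under both key representations.
    static void add(byte[] memory, String name,
                    Map<BufferViewKey, Integer> viewKeyed, Map<String, Integer> stringKeyed) {
        byte[] bytes = name.getBytes(StandardCharsets.UTF_8);
        System.arraycopy(bytes, 0, memory, 0, bytes.length);
        BufferViewKey key = new BufferViewKey(memory, 0, bytes.length);
        viewKeyed.merge(key, 1, Integer::sum);
        stringKeyed.merge(key.toString(), 1, Integer::sum); // owned copy, like getString(0).toString()
    }

    public static void simulate(Map<BufferViewKey, Integer> viewKeyed, Map<String, Integer> stringKeyed) {
        byte[] reused = new byte[16];                        // memory recycled between records
        add(reused, "alice", viewKeyed, stringKeyed);
        add(reused, "carol", viewKeyed, stringKeyed);        // stored "alice" key now reads "carol"
        add(new byte[16], "alice", viewKeyed, stringKeyed);  // "alice" again, in fresh memory
    }

    public static void main(String[] args) {
        Map<BufferViewKey, Integer> viewKeyed = new HashMap<>();
        Map<String, Integer> stringKeyed = new HashMap<>();
        simulate(viewKeyed, stringKeyed);
        System.out.println(viewKeyed.size());          // prints 3: "alice" is split across entries
        System.out.println(stringKeyed.size());        // prints 2
        System.out.println(stringKeyed.get("alice"));  // prints 2
    }
}
```

The String-keyed map groups both "alice" records together, while the view-keyed map ends up with three entries because the first stored key no longer equals a later "alice" key. This matches the shape of the reported symptom, but whether Flink's window operator actually retains such views is not established here.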
Reproduction code:

env.fromSource(kafkaSource, WatermarkStrategy.noWatermarks(), sourceName())
        .flatMap(new FlatMapFunction<RowData, RowData>() {
            @Override
            public void flatMap(RowData value, Collector<RowData> out) throws Exception {
                // Confirms that the records sent to the window operator are BinaryRowData
                System.out.println("is instanceof BinaryRowData = " + (value instanceof BinaryRowData));
                out.collect(value);
            }
        })
        // Keying by the BinaryStringData itself produces the unexpected window results
        .keyBy((KeySelector<RowData, StringData>) value -> value.getString(0))
        .window(TumblingProcessingTimeWindows.of(Time.seconds(10)))
        .process(new FlinkTumbleWindowingProcessFunction())
        .print();

public static class FlinkTumbleWindowingProcessFunction
        extends ProcessWindowFunction<RowData, RowData, StringData, TimeWindow> {

    int subtask = -1;

    @Override
    public void open(Configuration parameters) throws Exception {
        super.open(parameters);
        subtask = getRuntimeContext().getIndexOfThisSubtask();
    }

    @Override
    public void process(
            StringData s,
            ProcessWindowFunction<RowData, RowData, StringData, TimeWindow>.Context context,
            Iterable<RowData> elements,
            Collector<RowData> out) throws Exception {
        // Prints the subtask index, the key and the elements of each fired window
        System.out.printf("subtask = %d, s = %s, elements = %s%n", subtask, s, elements);
    }
}