Details
-
Improvement
-
Status: Open
-
Not a Priority
-
Resolution: Unresolved
-
1.9.0
-
None
Description
As FLINK-10050 supported alloedLateness, but we can not get the side output containing the late data, this issue wants to fix it.
For implementation, I want to add an input parameter OutputTag in WithWindow as following
protected WithWindow(DataStream<T1> input1, DataStream<T2> input2, KeySelector<T1, KEY> keySelector1, KeySelector<T2, KEY> keySelector2, TypeInformation<KEY> keyType, WindowAssigner<? super TaggedUnion<T1, T2>, W> windowAssigner, Trigger<? super TaggedUnion<T1, T2>, ? super W> trigger, Evictor<? super TaggedUnion<T1, T2>, ? super W> evictor, Time allowedLateness, OutputTage<TaggedUnion<T1, T2>> outputTag) { ... }
and add a function sideOutputLateData(OutputTag<T> outputTag) in WithWindow
public WithWindow<T1, T2, KEY, W> sideOutputLateData(OutputTag<TaggedUnion<T1, T2>> outputTag) {
...
}
In WithWindow#apply will add outputTag if it is not null
public <T> DataStream<T> apply(CoGroupFunction<T1, T2, T> function, TypeInfomation<T> resultType) { ... if (outputTag != null) { windowedStream.sideOutputLateData(outputTag); } ... }
The same will apply to JoinedStreams