Details
Type: Improvement
Status: Open
Priority: Not a Priority
Resolution: Unresolved
Description
The following stream never emits any values after the window operator, because event-time processing has been disabled via the watermark strategy:
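import java.util.Iterator;

import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;

public class PlaygroundJob {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env =
                StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(new Configuration());

        // Endless source emitting (current wall-clock millis, key in 0..9).
        DataStreamSource<Tuple2<Long, Integer>> source =
                env.addSource(new SourceFunction<Tuple2<Long, Integer>>() {
                    @Override
                    public void run(SourceContext<Tuple2<Long, Integer>> sourceContext) throws Exception {
                        int i = 0;
                        while (true) {
                            Tuple2<Long, Integer> tuple = Tuple2.of(System.currentTimeMillis(), i++ % 10);
                            sourceContext.collect(tuple);
                        }
                    }

                    @Override
                    public void cancel() {
                    }
                });

        source.assignTimestampsAndWatermarks(
                        // Switch noWatermarks() to forMonotonousTimestamps()
                        // and values are being printed.
                        WatermarkStrategy.<Tuple2<Long, Integer>>noWatermarks()
                                .withTimestampAssigner((t, timestamp) -> t.f0))
                .keyBy(t -> t.f1)
                .window(TumblingEventTimeWindows.of(Time.seconds(1)))
                .process(new ProcessWindowFunction<Tuple2<Long, Integer>, String, Integer, TimeWindow>() {
                    @Override
                    public void process(
                            Integer key,
                            Context context,
                            Iterable<Tuple2<Long, Integer>> iterable,
                            Collector<String> out) throws Exception {
                        // Count the elements that ended up in this window.
                        int count = 0;
                        Iterator<Tuple2<Long, Integer>> iter = iterable.iterator();
                        while (iter.hasNext()) {
                            count++;
                            iter.next();
                        }
                        out.collect("Key: " + key + " count: " + count);
                    }
                })
                .print();

        env.execute();
    }
}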
The issue is that the stream uses noWatermarks(), which effectively disables event-time windowing: timestamps are still assigned, but no watermark is ever emitted, so the windows never fire.
Because this pipeline can never emit any window results, it is faulty, and Flink should throw an exception at startup.
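To make the reasoning concrete, here is a minimal plain-Java sketch (no Flink dependency) of why the window never fires. It assumes the usual event-time rule that a window [start, end) fires once the watermark reaches end - 1, and that a monotonous-timestamp generator emits "max timestamp seen - 1". The class and method names are hypothetical and only model the behaviour; they are not Flink code.

```java
public class WatermarkFiringSketch {

    // Models the event-time trigger rule: a window [start, end) fires
    // once the current watermark has reached end - 1.
    public static boolean windowFires(long windowEnd, long currentWatermark) {
        long maxTimestamp = windowEnd - 1;
        return currentWatermark >= maxTimestamp;
    }

    // Models the watermark an operator has seen after the given timestamps.
    // With emitWatermarks == false (the noWatermarks() case) the watermark
    // stays at its initial value Long.MIN_VALUE forever.
    public static long watermarkAfter(long[] timestamps, boolean emitWatermarks) {
        long watermark = Long.MIN_VALUE;
        if (!emitWatermarks) {
            return watermark;
        }
        for (long ts : timestamps) {
            // Simplified monotonous-timestamps rule: max timestamp seen minus 1.
            watermark = Math.max(watermark, ts - 1);
        }
        return watermark;
    }

    public static void main(String[] args) {
        long[] timestamps = {0L, 500L, 1000L, 1500L};
        long windowEnd = 1000L; // tumbling window [0, 1000)

        System.out.println("with watermarks:    fires = "
                + windowFires(windowEnd, watermarkAfter(timestamps, true)));
        System.out.println("without watermarks: fires = "
                + windowFires(windowEnd, watermarkAfter(timestamps, false)));
    }
}
```

In this model the records themselves are irrelevant to firing: only the watermark moves the window forward, which is why assigning timestamps alone is not enough.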
--------------------
Proposed change:
We extend the interface WatermarkStrategy with the method boolean isEventTime().
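As a rough illustration of this step, the sketch below uses a hypothetical, trimmed-down stand-in interface (StrategySketch) rather than the real WatermarkStrategy. Making isEventTime() a default method that returns true is my assumption, chosen so that existing custom strategies would keep compiling; only the noWatermarks() variant overrides it.

```java
public class IsEventTimeSketch {

    /** Hypothetical stand-in for WatermarkStrategy; not the real Flink interface. */
    public interface StrategySketch<T> {

        // Proposed addition. Defaulting to true is an assumption: user-defined
        // strategies are treated as event-time capable unless they say otherwise.
        default boolean isEventTime() {
            return true;
        }

        // Stand-in for a strategy that does emit watermarks.
        static <T> StrategySketch<T> forMonotonousTimestamps() {
            return new StrategySketch<T>() {};
        }

        // Stand-in for noWatermarks(): the only variant that opts out.
        static <T> StrategySketch<T> noWatermarks() {
            return new StrategySketch<T>() {
                @Override
                public boolean isEventTime() {
                    return false;
                }
            };
        }
    }

    public static void main(String[] args) {
        System.out.println("forMonotonousTimestamps: "
                + StrategySketch.<String>forMonotonousTimestamps().isEventTime());
        System.out.println("noWatermarks:            "
                + StrategySketch.<String>noWatermarks().isEventTime());
    }
}
```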
We create a new class named EventTimeWindowPreconditions and add the following method to it where we make use of isEventTime():
public static void hasPrecedingEventTimeGenerator(final List<Transformation<?>> predecessors) {
    for (int i = predecessors.size() - 1; i >= 0; i--) {
        final Transformation<?> pre = predecessors.get(i);

        if (pre instanceof TimestampsAndWatermarksTransformation) {
            TimestampsAndWatermarksTransformation<?> timestampsAndWatermarksTransformation =
                    (TimestampsAndWatermarksTransformation<?>) pre;

            final WatermarkStrategy<?> waStrat =
                    timestampsAndWatermarksTransformation.getWatermarkStrategy();

            // Assert that it supports event time, or throw an exception.
            if (!waStrat.isEventTime()) {
                // TODO: Custom exception
                throw new IllegalArgumentException(
                        "Cannot use an EventTime window with a preceding watermark generator which"
                                + " does not ingest event times. Did you use noWatermarks() as the WatermarkStrategy"
                                + " and use EventTime windows such as TumblingEventTimeWindows/SlidingEventTimeWindows?"
                                + " These windows will never window any values as your stream does not support event time.");
            }

            // We have to terminate the check now, as we have found the most recent
            // timestamp assigner for this window and ensured that it actually adds
            // event timestamps. If a watermark strategy such as noWatermarks() was
            // used earlier in the chain, we can safely ignore it, because another
            // valid event-time watermark assigner exists between it and our current
            // event-time window.
            break;
        }
    }
}
Then we can update the constructors of AllWindowedStream and WindowedStream to:
if (windowAssigner.isEventTime()) {
    EventTimeWindowPreconditions.hasPrecedingEventTimeGenerator(input.getTransformation().getInputs());
}
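The "most recent assigner wins" rule of hasPrecedingEventTimeGenerator can be exercised in isolation with a small plain-Java model. Everything here is hypothetical: Step stands in for a transformation, and its eventTime field stands in for the result of isEventTime() (null means the step is not a watermark assigner at all).

```java
import java.util.Arrays;
import java.util.List;

public class PreconditionSketch {

    /** Hypothetical stand-in for a transformation in the pipeline graph. */
    public static final class Step {
        final String name;
        final Boolean eventTime; // null: this step assigns no watermarks

        public Step(String name, Boolean eventTime) {
            this.name = name;
            this.eventTime = eventTime;
        }
    }

    // Walks the predecessors from the closest to the farthest and decides
    // based on the first watermark assigner it meets.
    public static void check(List<Step> predecessors) {
        for (int i = predecessors.size() - 1; i >= 0; i--) {
            Step step = predecessors.get(i);
            if (step.eventTime == null) {
                continue; // not a watermark assigner, keep looking
            }
            if (!step.eventTime) {
                throw new IllegalArgumentException(
                        "Event-time window preceded by '" + step.name
                                + "', which does not support event time");
            }
            return; // the most recent assigner is valid, earlier ones are ignored
        }
    }

    // Convenience wrapper so the outcome can be printed or asserted.
    public static boolean passes(List<Step> predecessors) {
        try {
            check(predecessors);
            return true;
        } catch (IllegalArgumentException e) {
            return false;
        }
    }

    public static void main(String[] args) {
        Step none = new Step("noWatermarks", false);
        Step mono = new Step("forMonotonousTimestamps", true);
        Step map = new Step("map", null);

        System.out.println(passes(Arrays.asList(none, map, mono))); // valid assigner is closest
        System.out.println(passes(Arrays.asList(mono, map, none))); // noWatermarks is closest
        System.out.println(passes(Arrays.asList(map)));             // no assigner at all
    }
}
```

Note that, like the method above, this model accepts a chain that contains no watermark assigner at all, since the loop then ends without finding anything to reject.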
This is the approach I currently have in mind, but I am not sure whether it is the best one.
Best regards,
Dario