FLINK-24623

Prevent usage of EventTimeWindows when EventTime is disabled

Details

    Description

      The following job never emits any results downstream of the window operator, because event time has effectively been disabled via the watermark strategy:

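      import org.apache.flink.api.common.eventtime.WatermarkStrategy;
      import org.apache.flink.api.java.tuple.Tuple2;
      import org.apache.flink.configuration.Configuration;
      import org.apache.flink.streaming.api.datastream.DataStreamSource;
      import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
      import org.apache.flink.streaming.api.functions.source.SourceFunction;
      import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
      import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
      import org.apache.flink.streaming.api.windowing.time.Time;
      import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
      import org.apache.flink.util.Collector;

      public class PlaygroundJob {
          public static void main(String[] args) throws Exception {
              StreamExecutionEnvironment env =
                      StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(new Configuration());

              DataStreamSource<Tuple2<Long, Integer>> source =
                      env.addSource(new SourceFunction<Tuple2<Long, Integer>>() {
                          private volatile boolean running = true;

                          @Override
                          public void run(SourceContext<Tuple2<Long, Integer>> sourceContext) {
                              int i = 0;
                              // Emit (current wall-clock time, key in 0..9) until cancelled.
                              while (running) {
                                  sourceContext.collect(Tuple2.of(System.currentTimeMillis(), i++ % 10));
                              }
                          }

                          @Override
                          public void cancel() {
                              running = false;
                          }
                      });

              source.assignTimestampsAndWatermarks(
                              // Switch noWatermarks() to forMonotonousTimestamps()
                              // and values are being printed.
                              WatermarkStrategy.<Tuple2<Long, Integer>>noWatermarks()
                                      .withTimestampAssigner((t, timestamp) -> t.f0))
                      .keyBy(t -> t.f1)
                      .window(TumblingEventTimeWindows.of(Time.seconds(1)))
                      .process(new ProcessWindowFunction<Tuple2<Long, Integer>, String, Integer, TimeWindow>() {
                          @Override
                          public void process(Integer key, Context context,
                                  Iterable<Tuple2<Long, Integer>> elements, Collector<String> out) {
                              // Count the elements that fell into this window for the key.
                              int count = 0;
                              for (Tuple2<Long, Integer> ignored : elements) {
                                  count++;
                              }
                              out.collect("Key: " + key + " count: " + count);
                          }
                      })
                      .print();

              env.execute();
          }
      }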

       

      The issue is that the stream uses noWatermarks(), which never emits a watermark. Without watermarks, event time never advances, so event-time windows never fire and the window function is never called.

      Because such a pipeline can never emit any window results, it is faulty, and Flink should throw an exception when starting up instead of silently producing nothing.
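      To illustrate why nothing is ever printed, here is a self-contained toy model in plain Java (no Flink dependency). The class WatermarkFiringSketch and its run method are hypothetical names made up for this sketch, not Flink APIs, and the per-element watermark update is a simplification of Flink's periodic emission. It assumes the usual event-time rule: a tumbling window fires once the watermark reaches the window's last timestamp (end - 1).

```java
import java.util.ArrayList;
import java.util.List;
import java.util.TreeMap;

/** Toy model of event-time tumbling windows; hypothetical, not Flink code. */
class WatermarkFiringSketch {

    /**
     * Feeds timestamps into tumbling windows of the given size and returns the
     * element counts of all windows that fired, in window order.
     *
     * @param emitWatermarks if false, models noWatermarks(): the watermark
     *                       never advances beyond Long.MIN_VALUE
     */
    static List<Integer> run(long[] timestamps, long windowSizeMs, boolean emitWatermarks) {
        TreeMap<Long, Integer> openWindows = new TreeMap<>(); // window start -> element count
        List<Integer> fired = new ArrayList<>();
        long watermark = Long.MIN_VALUE;

        for (long ts : timestamps) {
            long windowStart = ts - Math.floorMod(ts, windowSizeMs);
            openWindows.merge(windowStart, 1, Integer::sum);

            if (emitWatermarks) {
                // Models monotonous timestamps: the watermark trails the max timestamp by 1 ms.
                watermark = Math.max(watermark, ts - 1);
            }

            // A window fires once the watermark reaches its last timestamp (end - 1).
            while (!openWindows.isEmpty()
                    && openWindows.firstKey() + windowSizeMs - 1 <= watermark) {
                fired.add(openWindows.pollFirstEntry().getValue());
            }
        }
        return fired;
    }

    public static void main(String[] args) {
        long[] timestamps = {0, 400, 900, 1000, 1500, 2100};
        System.out.println("with watermarks:    " + run(timestamps, 1000, true));
        System.out.println("without watermarks: " + run(timestamps, 1000, false));
    }
}
```

      With watermarks, the first two one-second windows fire with counts 3 and 2; without them, the returned list stays empty no matter how many elements arrive, which mirrors the behaviour of the job above.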

       

      --------------------

      Proposed change:

      We extend the interface WatermarkStrategy with the method boolean isEventTime().
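
      A minimal sketch of what this extension could look like, using a stripped-down stand-in interface (SketchWatermarkStrategy is a hypothetical name; the real WatermarkStrategy has more methods). Two points here are assumptions, not part of the proposal text: the method is a default method returning true so that existing user implementations keep compiling, and wrappers such as the one returned by withTimestampAssigner(...) delegate to the wrapped strategy. Without that delegation, noWatermarks().withTimestampAssigner(...) from the example job would report true.

```java
/** Stripped-down stand-in for Flink's WatermarkStrategy; hypothetical, for illustration only. */
interface SketchWatermarkStrategy<T> {

    /** Assumed default: a strategy supports event time unless it says otherwise. */
    default boolean isEventTime() {
        return true;
    }

    /** Stand-in for forMonotonousTimestamps(): inherits the default. */
    static <T> SketchWatermarkStrategy<T> forMonotonousTimestamps() {
        return new SketchWatermarkStrategy<T>() {};
    }

    /** Stand-in for noWatermarks(): the only strategy that opts out. */
    static <T> SketchWatermarkStrategy<T> noWatermarks() {
        return new SketchWatermarkStrategy<T>() {
            @Override
            public boolean isEventTime() {
                return false;
            }
        };
    }

    /** Stand-in for wrapping methods: the wrapper must delegate to the wrapped strategy. */
    default SketchWatermarkStrategy<T> withTimestampAssigner() {
        SketchWatermarkStrategy<T> base = this;
        return new SketchWatermarkStrategy<T>() {
            @Override
            public boolean isEventTime() {
                return base.isEventTime();
            }
        };
    }
}
```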

      We create a new class named EventTimeWindowPreconditions and add the following method to it where we make use of isEventTime():

       

      public static void hasPrecedingEventTimeGenerator(final List<Transformation<?>> predecessors) {
          // Walk backwards so that the watermark strategy closest to the window is checked first.
          for (int i = predecessors.size() - 1; i >= 0; i--) {
              final Transformation<?> pre = predecessors.get(i);
              if (pre instanceof TimestampsAndWatermarksTransformation) {

                  TimestampsAndWatermarksTransformation<?> timestampsAndWatermarksTransformation =
                          (TimestampsAndWatermarksTransformation<?>) pre;

                  final WatermarkStrategy<?> waStrat = timestampsAndWatermarksTransformation.getWatermarkStrategy();

                  // assert that it supports event time or throw an exception
                  if (!waStrat.isEventTime()) {
                      // TODO: Custom exception
                      throw new IllegalArgumentException(
                              "Cannot use an EventTime window with a preceding watermark generator which"
                                      + " does not support event time. Did you use noWatermarks() as the WatermarkStrategy"
                                      + " together with EventTime windows such as TumblingEventTimeWindows/SlidingEventTimeWindows?"
                                      + " These windows will never fire as your stream does not support event time."
                      );
                  }

                  // We can stop here: we have found the most recent watermark strategy
                  // before this window and ensured that it supports event time. If a
                  // strategy such as noWatermarks() appears earlier in the chain, we can
                  // safely ignore it, because a valid event-time watermark strategy sits
                  // between it and the current event-time window.
                  break;

              }
          }
      }
      

       

      Then we can add the following check to the constructors of AllWindowedStream and WindowedStream:

      if (windowAssigner.isEventTime()) {
         EventTimeWindowPreconditions.hasPrecedingEventTimeGenerator(input.getTransformation().getInputs());
      }
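
      The behaviour of this check can be tried out without Flink using toy stand-ins for the transformation graph. Everything in the sketch below (PreconditionSketch, Node, WatermarkNode, rejects) is hypothetical and only mirrors the shape of the proposed method; the eventTime flag stands for what the proposed isEventTime() would return.

```java
import java.util.Arrays;
import java.util.List;

/** Toy stand-ins for Flink's transformation graph; hypothetical, for illustration only. */
class PreconditionSketch {

    /** Stand-in for Transformation<?>. */
    static class Node {
        final String name;

        Node(String name) {
            this.name = name;
        }
    }

    /** Stand-in for TimestampsAndWatermarksTransformation. */
    static class WatermarkNode extends Node {
        final boolean eventTime; // what the proposed isEventTime() would return

        WatermarkNode(String name, boolean eventTime) {
            super(name);
            this.eventTime = eventTime;
        }
    }

    /** Same backward walk as the proposed method: only the closest watermark node counts. */
    static void hasPrecedingEventTimeGenerator(List<Node> predecessors) {
        for (int i = predecessors.size() - 1; i >= 0; i--) {
            Node pre = predecessors.get(i);
            if (pre instanceof WatermarkNode) {
                if (!((WatermarkNode) pre).eventTime) {
                    throw new IllegalArgumentException(
                            "Event-time window after '" + pre.name + "', which emits no watermarks");
                }
                break;
            }
        }
    }

    /** Returns true if the check rejects the given chain of predecessors. */
    static boolean rejects(Node... chain) {
        try {
            hasPrecedingEventTimeGenerator(Arrays.asList(chain));
            return false;
        } catch (IllegalArgumentException e) {
            return true;
        }
    }

    public static void main(String[] args) {
        Node source = new Node("source");
        Node none = new WatermarkNode("noWatermarks", false);
        Node mono = new WatermarkNode("forMonotonousTimestamps", true);
        System.out.println(rejects(source, none));       // closest strategy emits no watermarks
        System.out.println(rejects(source, none, mono)); // a later valid strategy wins
        System.out.println(rejects(source));             // no watermark node at all
    }
}
```

      In this sketch, a chain without any watermark node passes the check. That matches the loop as proposed, and it would also let through sources that generate their own watermarks. Whether the list passed in should contain only the direct inputs or all transitive predecessors is left open here.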
      

      This is the approach I currently have in mind, but I am not sure whether it is the best one.

      Best regards, 

      Dario

       

       

            People

              Assignee: Unassigned
              Reporter: Dario Heinisch
              Votes: 0
              Watchers: 4

                Time Tracking

                  Estimated: 24h
                  Remaining: 24h
                  Logged: Not Specified