Flink / FLINK-23011

FLIP-27 sources are generating non-deterministic results when using event time


Details

    • Type: Bug
    • Status: Closed
    • Priority: Critical
    • Resolution: Fixed
    • Affects Version/s: 1.14.0
    • Fix Version/s: 1.14.0
    • Component/s: API / DataStream
    • Labels: None

    Description

      FLIP-27 sources currently start in the StreamStatus.IDLE state and switch to ACTIVE only after emitting their first Watermark. Until that happens, downstream operators ignore these IDLE inputs when calculating their combined (minimum) input watermark.
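      To make the rule above concrete, here is a minimal Java sketch of an operator combining per-channel watermarks while skipping IDLE channels. It is a simplified model written for illustration, not Flink's actual implementation; the class name CombinedWatermark, the method combine and the watermark values are hypothetical. It only models the "minimum over ACTIVE channels" rule and leaves out the fact that an operator's emitted watermark never decreases.

```java
import java.util.OptionalLong;

/**
 * Hypothetical, simplified model (NOT Flink code) of how a downstream operator
 * derives its input watermark from several input channels.
 */
public class CombinedWatermark {

    /**
     * Minimum watermark over ACTIVE channels only. IDLE channels are skipped;
     * if every channel is IDLE there is no combined watermark at all.
     */
    static OptionalLong combine(long[] channelWatermarks, boolean[] idle) {
        OptionalLong min = OptionalLong.empty();
        for (int i = 0; i < channelWatermarks.length; i++) {
            if (idle[i]) {
                continue; // an IDLE input does not hold back the combined watermark
            }
            long wm = channelWatermarks[i];
            if (!min.isPresent() || wm < min.getAsLong()) {
                min = OptionalLong.of(wm);
            }
        }
        return min;
    }

    public static void main(String[] args) {
        // Channel 0: a FLIP-27 source subtask that has not emitted a watermark yet
        // (still IDLE). Channel 1: a subtask that is already far ahead.
        long[] watermarks = {Long.MIN_VALUE, 1_000_000L};
        System.out.println(combine(watermarks, new boolean[] {true, false}));
        // prints OptionalLong[1000000]: the IDLE channel does not hold it back

        // Had channel 0 been ACTIVE at 1_999 instead, it would keep the minimum low.
        long[] bothActive = {1_999L, 1_000_000L};
        System.out.println(combine(bothActive, new boolean[] {false, false}));
        // prints OptionalLong[1999]
    }
}
```

      The point of the sketch is the continue branch: a slow subtask that is still IDLE contributes nothing, so the fastest ACTIVE subtask alone determines the combined watermark.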

      An extreme example of where this leads is completely bogus results when, for example, one FLIP-27 source subtask is slower than the others for some reason:

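      import org.apache.flink.api.common.eventtime.SerializableTimestampAssigner;
      import org.apache.flink.api.common.eventtime.WatermarkStrategy;
      import org.apache.flink.api.common.functions.RichMapFunction;
      import org.apache.flink.api.common.restartstrategy.RestartStrategies;
      import org.apache.flink.api.connector.source.lib.NumberSequenceSource;
      import org.apache.flink.streaming.api.datastream.DataStream;
      import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
      import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
      import org.apache.flink.streaming.api.windowing.time.Time;

      // Hypothetical wrapper class; the surrounding code was elided in the report.
      public class IdleSourceWatermarkRepro {
          public static void main(String[] args) throws Exception {
              StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
              env.getConfig().setAutoWatermarkInterval(2000);
              env.setParallelism(2);
              env.setRestartStrategy(RestartStrategies.fixedDelayRestart(Integer.MAX_VALUE, 10));

              DataStream<Long> eventStream =
                      env.fromSource(
                                      new NumberSequenceSource(0, Long.MAX_VALUE),
                                      WatermarkStrategy.<Long>forMonotonousTimestamps()
                                              .withTimestampAssigner(new LongTimestampAssigner()),
                                      "NumberSequenceSource")
                              .map(
                                      new RichMapFunction<Long, Long>() {
                                          @Override
                                          public Long map(Long value) throws Exception {
                                              // Throttle only subtask 0 so it falls behind subtask 1.
                                              if (getRuntimeContext().getIndexOfThisSubtask() == 0) {
                                                  Thread.sleep(1);
                                              }
                                              // Every record contributes 1 to the window sum.
                                              return 1L;
                                          }
                                      });

              eventStream.windowAll(TumblingEventTimeWindows.of(Time.seconds(1))).sum(0).print();
              env.execute();
          }

          // Each source subtask works on its own deserialized copy of this assigner,
          // so timestamps (in ms) count up from 0 independently per subtask.
          private static class LongTimestampAssigner implements SerializableTimestampAssigner<Long> {
              private long counter = 0;

              @Override
              public long extractTimestamp(Long record, long recordTimeStamp) {
                  return counter++;
              }
          }
      }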
      

      In this case, after 2 seconds (the setAutoWatermarkInterval value), the non-throttled subtask (subtask index 1) generates very high watermarks, while the throttled source subtask (subtask index 0) emits very low ones. If the watermark from the non-throttled subtask reaches the downstream WindowOperator first, while the other input channel is still IDLE, the WindowOperator takes that high watermark as the combined input watermark for the whole operator. When the input channel from the throttled source subtask finally switches to ACTIVE and delivers a much lower watermark, it is already too late, because an operator's watermark never moves backwards.
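      The sequence of events above can be replayed with a small, self-contained Java sketch. It is a simplified model, not Flink code: the class IdleWatermarkReplay, its methods and the watermark values 1_000_000 and 1_999 are hypothetical, and it assumes (as a simplification) that a channel turns ACTIVE when its first watermark arrives. The late-record check mirrors the event-time rule with no allowed lateness: a record is late once the max timestamp of its 1-second window is at or below the operator watermark, and by default the window operator drops such records.

```java
import java.util.Arrays;

/**
 * Hypothetical replay of the scenario (a simplified model, NOT Flink code):
 * the operator watermark is the minimum over ACTIVE channels and only moves forward.
 */
public class IdleWatermarkReplay {
    static final long WINDOW_SIZE_MS = 1_000L; // 1-second tumbling windows

    private final long[] channelWatermarks;
    private final boolean[] idle;
    private long operatorWatermark = Long.MIN_VALUE;

    IdleWatermarkReplay(int channels) {
        channelWatermarks = new long[channels];
        idle = new boolean[channels];
        Arrays.fill(channelWatermarks, Long.MIN_VALUE);
        Arrays.fill(idle, true); // every input starts IDLE, as the FLIP-27 sources did
    }

    /** Simplification: a channel becomes ACTIVE when its first watermark arrives. */
    long onWatermark(int channel, long watermark) {
        idle[channel] = false;
        channelWatermarks[channel] = Math.max(channelWatermarks[channel], watermark);
        long min = Long.MAX_VALUE;
        for (int i = 0; i < channelWatermarks.length; i++) {
            if (!idle[i]) {
                min = Math.min(min, channelWatermarks[i]);
            }
        }
        // The operator watermark is never moved backwards.
        operatorWatermark = Math.max(operatorWatermark, min);
        return operatorWatermark;
    }

    /** True if the max timestamp of the record's window is at or below the watermark. */
    boolean isLate(long timestamp) {
        long windowStart = timestamp - Math.floorMod(timestamp, WINDOW_SIZE_MS);
        long windowMaxTimestamp = windowStart + WINDOW_SIZE_MS - 1;
        return windowMaxTimestamp <= operatorWatermark;
    }

    public static void main(String[] args) {
        IdleWatermarkReplay op = new IdleWatermarkReplay(2);
        // Non-throttled subtask (index 1) reports first while channel 0 is IDLE.
        System.out.println(op.onWatermark(1, 1_000_000L)); // 1000000
        // Throttled subtask (index 0) becomes ACTIVE with a much lower watermark.
        System.out.println(op.onWatermark(0, 1_999L)); // still 1000000
        // Its records, e.g. timestamp 2_500, now fall into a window that is already late.
        System.out.println(op.isLate(2_500L)); // true
    }
}
```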

      Actual output of the example program:

      1596
      2000
      1000
      1000
      1000
      1000
      1000
      1000
      (...)
      

      while the expected output is always "2000" (every 1-second global window should contain 2000 records, 1000 from each of the two source subtasks):

      2000
      2000
      2000
      2000
      (...)
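      The expected count follows from simple arithmetic: each source subtask stamps its records 0, 1, 2, ... milliseconds, so one 1-second window holds 1000 records per subtask, and with parallelism 2 that makes 2000. The following Java sketch checks this without Flink; the class ExpectedWindowCounts and the method countPerWindow are hypothetical helpers written only for this illustration.

```java
import java.util.Map;
import java.util.TreeMap;

/**
 * Hypothetical check of the expected output: every subtask stamps its records
 * 0, 1, 2, ... (ms), and a 1-second tumbling window counts all of them.
 */
public class ExpectedWindowCounts {
    static final long WINDOW_SIZE_MS = 1_000L;

    static Map<Long, Long> countPerWindow(int subtasks, long recordsPerSubtask) {
        Map<Long, Long> counts = new TreeMap<>(); // window start -> record count
        for (int s = 0; s < subtasks; s++) {
            // Each subtask has its own counter, like LongTimestampAssigner.
            for (long timestamp = 0; timestamp < recordsPerSubtask; timestamp++) {
                long windowStart = timestamp - Math.floorMod(timestamp, WINDOW_SIZE_MS);
                counts.merge(windowStart, 1L, Long::sum);
            }
        }
        return counts;
    }

    public static void main(String[] args) {
        // 3 full seconds of event time per subtask, parallelism 2.
        System.out.println(countPerWindow(2, 3_000L)); // {0=2000, 1000=2000, 2000=2000}
    }
}
```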
      


            People

              Dawid Wysakowicz (dwysakowicz)
              Piotr Nowojski (pnowojski)
              Votes: 0
              Watchers: 7
