Flink / FLINK-23011

FLIP-27 sources are generating non-deterministic results when using event time


    Details

    • Type: Bug
    • Status: Closed
    • Priority: Critical
    • Resolution: Fixed
    • Affects Version/s: 1.14.0
    • Fix Version/s: 1.14.0
    • Component/s: API / DataStream
    • Labels:
      None

      Description

      FLIP-27 sources currently start in the StreamStatus.IDLE state and switch to ACTIVE only after emitting their first watermark. Until that happens, downstream operators exclude the IDLE inputs when calculating the combined (minimum) input watermark.
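A minimal Java sketch of the combining rule described above may help. It assumes a simplified model: `MinWatermarkCombiner` is a hypothetical class, not Flink's internal implementation, and it treats the arrival of a channel's first watermark as that channel's switch from IDLE to ACTIVE. It shows how an input that starts IDLE lets another input's high watermark become the operator's watermark, and why a later, lower watermark cannot pull it back.

```java
import java.util.Arrays;

/**
 * Hypothetical, simplified model (not Flink's actual code) of how an operator combines the
 * watermarks of its input channels: IDLE channels are skipped, the combined watermark is the
 * minimum over ACTIVE channels, and it never moves backwards.
 */
class MinWatermarkCombiner {
    private final long[] watermarks;
    private final boolean[] active;
    private long combined = Long.MIN_VALUE;

    MinWatermarkCombiner(int channels, boolean startActive) {
        watermarks = new long[channels];
        Arrays.fill(watermarks, Long.MIN_VALUE);
        active = new boolean[channels];
        Arrays.fill(active, startActive);
    }

    /** Assumption of this sketch: a watermark arriving on a channel also marks it ACTIVE. */
    long onWatermark(int channel, long watermark) {
        active[channel] = true;
        watermarks[channel] = Math.max(watermarks[channel], watermark);
        return recompute();
    }

    long combined() {
        return combined;
    }

    private long recompute() {
        long min = Long.MAX_VALUE;
        boolean anyActive = false;
        for (int i = 0; i < watermarks.length; i++) {
            if (active[i]) { // IDLE channels do not hold the combined watermark back
                anyActive = true;
                min = Math.min(min, watermarks[i]);
            }
        }
        if (anyActive && min > combined) {
            combined = min; // the combined watermark only moves forward
        }
        return combined;
    }

    public static void main(String[] args) {
        // Both inputs start IDLE; the fast input (channel 1) reports first.
        MinWatermarkCombiner idleStart = new MinWatermarkCombiner(2, false);
        idleStart.onWatermark(1, 1_000_000L);
        idleStart.onWatermark(0, 1_000L); // the slow input's low watermark arrives too late
        System.out.println(idleStart.combined()); // 1000000

        // If both inputs started ACTIVE, the slow input would hold the watermark back.
        MinWatermarkCombiner activeStart = new MinWatermarkCombiner(2, true);
        activeStart.onWatermark(1, 1_000_000L);
        activeStart.onWatermark(0, 1_000L);
        System.out.println(activeStart.combined()); // 1000
    }
}
```

The only difference between the two runs is the initial status of the inputs, which is exactly the behaviour this issue is about.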

      An extreme example of what this leads to is completely bogus results when, for example, one FLIP-27 source subtask is slower than the others for some reason:

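      import org.apache.flink.api.common.eventtime.SerializableTimestampAssigner;
      import org.apache.flink.api.common.eventtime.WatermarkStrategy;
      import org.apache.flink.api.common.functions.RichMapFunction;
      import org.apache.flink.api.common.restartstrategy.RestartStrategies;
      import org.apache.flink.api.connector.source.lib.NumberSequenceSource;
      import org.apache.flink.streaming.api.datastream.DataStream;
      import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
      import org.apache.flink.streaming.api.windowing.time.Time;
      
      // env is a StreamExecutionEnvironment.
      env.getConfig().setAutoWatermarkInterval(2000);
      env.setParallelism(2);
      env.setRestartStrategy(RestartStrategies.fixedDelayRestart(Integer.MAX_VALUE, 10));
      
      DataStream<Long> eventStream =
              env.fromSource(
                              new NumberSequenceSource(0, Long.MAX_VALUE),
                              WatermarkStrategy.<Long>forMonotonousTimestamps()
                                      .withTimestampAssigner(new LongTimestampAssigner()),
                              "NumberSequenceSource")
                      .map(
                              new RichMapFunction<Long, Long>() {
                                  @Override
                                  public Long map(Long value) throws Exception {
                                      // Throttle only subtask 0 (at least 1 ms per record).
                                      if (getRuntimeContext().getIndexOfThisSubtask() == 0) {
                                          Thread.sleep(1);
                                      }
                                      // Every record contributes 1 to the window sum.
                                      return 1L;
                                  }
                              });
      
      // Global 1-second event-time windows; each prints how many records it received.
      eventStream.windowAll(TumblingEventTimeWindows.of(Time.seconds(1))).sum(0).print();
      
      (...)
      // Each source subtask has its own counter, so each assigns timestamps 0, 1, 2, ...
      // (one record per millisecond of event time), regardless of how fast it runs.
      private static class LongTimestampAssigner implements SerializableTimestampAssigner<Long> {
          private long counter = 0;
      
          @Override
          public long extractTimestamp(Long record, long recordTimeStamp) {
              return counter++;
          }
      }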
      

      In this case, after 2 seconds (the setAutoWatermarkInterval), the non-throttled source subtask (getIndexOfThisSubtask() == 1) generates very high watermarks, while the throttled source subtask (getIndexOfThisSubtask() == 0) emits very low ones. If the non-throttled subtask's watermark reaches the downstream WindowOperator first, while the other input channel is still IDLE, the operator takes those high watermarks as the combined input watermark for the whole WindowOperator. By the time the input channel from the throttled source subtask finally switches to ACTIVE and delivers a much lower watermark, it is already too late, because the combined watermark never moves backwards.
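Why "too late" matters can be sketched in a few lines. With zero allowed lateness, a window operator discards a record whose window the watermark has already passed. The Java sketch below is a simplified model under stated assumptions (non-negative timestamps, no window offset, zero allowed lateness); `TumblingLateness` and its methods are hypothetical names, not Flink API.

```java
/**
 * Hypothetical, simplified model of the lateness check for 1-second tumbling event-time
 * windows. Assumptions: non-negative timestamps, no window offset, zero allowed lateness.
 */
class TumblingLateness {
    static final long SIZE_MS = 1_000L;

    /** Start of the tumbling window containing the timestamp. */
    static long windowStart(long timestamp) {
        return timestamp - (timestamp % SIZE_MS);
    }

    /** Last timestamp that still belongs to that window (the window end is exclusive). */
    static long windowMaxTimestamp(long timestamp) {
        return windowStart(timestamp) + SIZE_MS - 1;
    }

    /** A record is late once the watermark has reached the end of its window. */
    static boolean isLate(long timestamp, long currentWatermark) {
        return windowMaxTimestamp(timestamp) <= currentWatermark;
    }

    public static void main(String[] args) {
        // Watermark adopted from the fast subtask while the slow input was still IDLE.
        long highWatermark = 1_000_000L;
        // A record from the throttled subtask with event time 1500 ms:
        System.out.println(isLate(1_500L, highWatermark)); // true, so it would be dropped
        // The same record against a watermark the slow subtask had held back:
        System.out.println(isLate(1_500L, 1_200L)); // false, so it would be counted
    }
}
```

Under this model, every record the throttled subtask produces after the jump falls into an already-expired window.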

      Actual output of the example program:

      1596
      2000
      1000
      1000
      1000
      1000
      1000
      1000
      (...)
      

      while the expected output is always "2000" (2000 records fit into every 1-second global window):

      2000
      2000
      2000
      2000
      (...)
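The expected value follows from simple arithmetic: each of the 2 source subtasks assigns one timestamp per millisecond, so every 1000 ms window receives 1000 records from each subtask. The Java sketch below reproduces that count; `ExpectedWindowCounts` is a hypothetical helper, and it ignores watermarks and lateness entirely, i.e. it models the ideal case in which no record is dropped.

```java
import java.util.Map;
import java.util.TreeMap;

/** Hypothetical helper: ideal per-window record counts, assuming nothing is dropped as late. */
class ExpectedWindowCounts {
    /**
     * Each of `parallelism` subtasks emits timestamps 0, 1, ..., recordsPerSubtask - 1
     * (mirroring the per-subtask counter in LongTimestampAssigner). Returns window start -> count.
     */
    static Map<Long, Long> count(int parallelism, long recordsPerSubtask) {
        Map<Long, Long> counts = new TreeMap<>();
        for (int subtask = 0; subtask < parallelism; subtask++) {
            for (long ts = 0; ts < recordsPerSubtask; ts++) {
                long windowStart = ts - (ts % 1_000L); // 1-second tumbling windows
                counts.merge(windowStart, 1L, Long::sum);
            }
        }
        return counts;
    }

    public static void main(String[] args) {
        System.out.println(count(2, 3_000L)); // {0=2000, 1000=2000, 2000=2000}
    }
}
```

An output of 1000 per window therefore corresponds to the records of only one of the two subtasks.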
      


              People

              • Assignee: Dawid Wysakowicz (dwysakowicz)
              • Reporter: Piotr Nowojski (pnowojski)
              • Votes: 0
              • Watchers: 8