Flink / FLINK-35157

Sources with watermark alignment get stuck once some subtasks finish


Details

    Description

      The current watermark alignment logic can easily get stuck if some subtasks finish while others are still running.

      The reason is that a source subtask is not excluded from alignment once it finishes. Its last reported watermark keeps holding back the group, so the rest of the job cannot make progress beyond the last watermark of the finished sources plus the allowed alignment drift.
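
To make the stall concrete, here is a minimal model of the arithmetic. It is a sketch, not Flink's coordinator code: the class `AlignmentStallSketch` and its method are hypothetical, and it assumes only that the limit for the group is the minimum reported watermark plus the allowed drift.

```java
import java.util.Map;

public class AlignmentStallSketch {

    /** Hypothetical helper: limit for the group = min reported watermark + allowed drift. */
    static long maxAllowedWatermark(Map<Integer, Long> lastReported, long maxDriftMillis) {
        long min = lastReported.values().stream()
                .mapToLong(Long::longValue)
                .min()
                .orElse(Long.MAX_VALUE);
        // Saturate instead of overflowing when the minimum is already very large.
        return min > Long.MAX_VALUE - maxDriftMillis ? Long.MAX_VALUE : min + maxDriftMillis;
    }

    public static void main(String[] args) {
        // Subtask 0 finished after reporting watermark 50; subtask 1 is still running at 55.
        Map<Integer, Long> lastReported = Map.of(0, 50L, 1, 55L);
        // The finished subtask never reports again, so the limit stays at 50 + 10.
        System.out.println("limit = " + maxAllowedWatermark(lastReported, 10L)); // limit = 60
    }
}
```

In this model subtask 1 can never emit records with timestamps past 60, because the entry for subtask 0 is never updated or removed.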

      This can be easily reproduced by the following simple pipeline:

      StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
      env.setParallelism(2);

      // Alignment group "g1": max allowed watermark drift 10 ms, update interval 2 s.
      WatermarkStrategy<Long> strategy = WatermarkStrategy.<Long>forMonotonousTimestamps()
              .withTimestampAssigner((SerializableTimestampAssigner<Long>) (aLong, l) -> aLong)
              .withWatermarkAlignment("g1", Duration.ofMillis(10), Duration.ofSeconds(2));

      DataStream<Long> s = env.fromSource(new NumberSequenceSource(0, 100), strategy, "Sequence Source")
              .filter((FilterFunction<Long>) aLong -> {
                  // Slow down processing of each record by 200 ms.
                  Thread.sleep(200);
                  return true;
              });

      s.print();
      env.execute();

      A possible solution is to send out a max watermark event once a source subtask finishes, or to exclude finished subtasks from alignment in the source coordinator.
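
Both options can be illustrated with a small standalone model. This is only a sketch under assumptions: `AlignmentFixSketch`, `report`, and `markFinished` are hypothetical names, not Flink APIs, and the real change would live in the source coordinator.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

public class AlignmentFixSketch {
    private final Map<Integer, Long> lastReported = new HashMap<>();
    private final Set<Integer> finished = new HashSet<>();
    private final long maxDriftMillis;

    AlignmentFixSketch(long maxDriftMillis) {
        this.maxDriftMillis = maxDriftMillis;
    }

    /** Records the latest watermark reported by a subtask. */
    void report(int subtask, long watermark) {
        lastReported.put(subtask, watermark);
    }

    /** Option "exclude": a finished subtask no longer takes part in alignment. */
    void markFinished(int subtask) {
        finished.add(subtask);
    }

    /** Limit for the group = min watermark of the unfinished subtasks + allowed drift. */
    long maxAllowedWatermark() {
        long min = Long.MAX_VALUE;
        for (Map.Entry<Integer, Long> e : lastReported.entrySet()) {
            if (!finished.contains(e.getKey())) {
                min = Math.min(min, e.getValue());
            }
        }
        // Saturate instead of overflowing when no unfinished subtask holds the limit down.
        return min > Long.MAX_VALUE - maxDriftMillis ? Long.MAX_VALUE : min + maxDriftMillis;
    }

    public static void main(String[] args) {
        AlignmentFixSketch group = new AlignmentFixSketch(10L);
        group.report(0, 50L);
        group.report(1, 55L);
        System.out.println(group.maxAllowedWatermark()); // 60: subtask 0 holds the group back

        group.markFinished(0);
        System.out.println(group.maxAllowedWatermark()); // 65: only subtask 1 counts now
    }
}
```

The "max watermark" option behaves the same way in this model: calling `report(0, Long.MAX_VALUE)` when subtask 0 finishes also moves the limit to 65, without needing a separate set of finished subtasks.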

          People

            elon elon_X
            gyfora Gyula Fora