Flink / FLINK-14228

The runtime support for Bounded[One|Multi]Input#endInput does not properly implement their semantics


Details

    • Type: Bug
    • Status: Closed
    • Priority: Major
    • Resolution: Fixed
    • Affects Version/s: 1.9.0
    • Fix Version/s: 1.11.0
    • Component/s: Runtime / Task
    • Labels: None

    Description

      Currently, the runtime's implementation of Bounded[One|Multi]Input#endInput has the following problems:

      • The runtime propagates endInput along the whole operator chain as soon as the input of the head operator is finished. Because some operators flush their buffered data in close, downstream operators can still receive records after their endInput has been called. This forces operators to flush buffered data in endInput instead of close, as in the PRs fixing issue#13491 and issue#13376.
      • Timers are not taken into account: an operator's processing-time timers may still fire, and emit records, after endInput has already been propagated to the downstream operators.
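To make the first problem concrete, here is a hypothetical, self-contained simulation in plain Java with no Flink dependency. `Op`, `BufferingOp`, `LoggingSink` and `EagerEndInputDemo` are invented stand-ins, not Flink's `StreamOperator` or `BoundedOneInput`; the sketch only mimics the eager propagation described above: endInput is called on every chained operator first, and the operators are closed afterwards.

```java
import java.util.ArrayList;
import java.util.List;

/**
 * Hypothetical simulation (not Flink code): endInput is pushed to the whole
 * chain at once, then operators are closed, so data flushed in close()
 * reaches a downstream operator whose endInput() was already called.
 */
public class EagerEndInputDemo {

    /** Minimal stand-in for a chained operator; not Flink's StreamOperator. */
    interface Op {
        void processElement(String record);
        void endInput();
        void close();
    }

    /** Buffers records and only flushes them in close(). */
    static class BufferingOp implements Op {
        private final List<String> buffer = new ArrayList<>();
        private final Op downstream;

        BufferingOp(Op downstream) { this.downstream = downstream; }

        public void processElement(String record) { buffer.add(record); }

        public void endInput() { /* does not flush: relies on close() */ }

        public void close() {
            for (String r : buffer) downstream.processElement(r);
            buffer.clear();
        }
    }

    /** Tail operator that logs every call, marking records seen after endInput. */
    static class LoggingSink implements Op {
        final List<String> log = new ArrayList<>();
        private boolean ended = false;

        public void processElement(String record) {
            log.add((ended ? "LATE record " : "record ") + record);
        }

        public void endInput() { ended = true; log.add("endInput"); }

        public void close() { log.add("close"); }
    }

    /** Eager propagation: endInput on every operator first, then close in order. */
    static List<String> run() {
        LoggingSink sink = new LoggingSink();
        BufferingOp head = new BufferingOp(sink);
        List<Op> chain = List.of(head, sink);

        head.processElement("a");
        head.processElement("b");

        // Head input finished: endInput is propagated to the whole chain at once.
        for (Op op : chain) op.endInput();
        // Only afterwards are operators closed; the head flushes "a" and "b" here.
        for (Op op : chain) op.close();
        return sink.log;
    }

    public static void main(String[] args) {
        run().forEach(System.out::println);
    }
}
```

Running `main` prints `endInput`, `LATE record a`, `LATE record b`, `close`: the sink receives records after its own endInput, which is why operators had to be changed to flush in endInput.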

      In fact, StreamOperator#close tells the operator to finish all of its processing and flush its output (all remaining buffered data), while endInput only indicates that no more data will arrive on some input of the operator. In other words, within an operator chain, the input of a downstream operator reaches its end only when its upstream (non-tail) operator has been closed. So for an operator chain {{OP1 -> OP2 -> ... }}, the logic should be:

      1. The (source/network) input of OP1 is finished.
      2. Call OP1#endInput.
      3. Quiesce the ProcessingTimeService to prevent OP1 from registering new timers.
      4. Wait for OP1's pending timers (those already being processed) to finish.
      5. Call OP1#close.
      6. Call OP2#endInput.
      7. Quiesce the ProcessingTimeService to prevent OP2 from registering new timers.
      8. ...
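The ordering above can be sketched as a per-operator loop in which each operator is completely finished (endInput, quiesce, wait for timers, close) before the next operator's input is ended. Again this is hypothetical plain Java, not Flink's runtime: `SimTimerService` is an invented, single-threaded stand-in for a quiescable `ProcessingTimeService`, and for simplicity timers registered before quiescing are treated as pending and run synchronously in the wait step, whereas a real timer service fires them asynchronously.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

/** Hypothetical simulation of the proposed per-operator shutdown order. */
public class OrderedEndInputDemo {

    /** Invented, single-threaded stand-in for a per-operator timer service. */
    static class SimTimerService {
        private final Deque<Runnable> pending = new ArrayDeque<>();
        private boolean quiesced = false;

        /** Returns false and drops the timer once the service is quiesced. */
        boolean register(Runnable timer) {
            if (quiesced) return false;
            pending.add(timer);
            return true;
        }

        void quiesce() { quiesced = true; }

        /** Stand-in for "wait for pending timers to finish": run them to completion. */
        void awaitPendingTimers() {
            while (!pending.isEmpty()) pending.poll().run();
        }
    }

    interface Op {
        void processElement(String record);
        void endInput();
        void close();
        SimTimerService timers();
    }

    /** Buffers records, flushes in close(), and registers a timer at end of input. */
    static class BufferingOp implements Op {
        private final List<String> buffer = new ArrayList<>();
        private final SimTimerService timers = new SimTimerService();
        private final Op downstream;

        BufferingOp(Op downstream) { this.downstream = downstream; }

        public void processElement(String record) { buffer.add(record); }

        public void endInput() {
            // A timer registered here still emits output after endInput.
            timers.register(() -> downstream.processElement("from-timer"));
        }

        public void close() {
            // The service is already quiesced, so this registration is rejected.
            timers.register(() -> downstream.processElement("too-late-timer"));
            for (String r : buffer) downstream.processElement(r);
            buffer.clear();
        }

        public SimTimerService timers() { return timers; }
    }

    /** Tail operator that logs every call, marking records seen after endInput. */
    static class LoggingSink implements Op {
        final List<String> log = new ArrayList<>();
        private final SimTimerService timers = new SimTimerService();
        private boolean ended = false;

        public void processElement(String record) {
            log.add((ended ? "LATE record " : "record ") + record);
        }

        public void endInput() { ended = true; log.add("endInput"); }

        public void close() { log.add("close"); }

        public SimTimerService timers() { return timers; }
    }

    static List<String> run() {
        LoggingSink sink = new LoggingSink();
        BufferingOp head = new BufferingOp(sink);
        List<Op> chain = List.of(head, sink);

        head.processElement("a");
        head.processElement("b");

        // Step 1: the input of the head operator is finished.
        for (Op op : chain) {
            op.endInput();                    // steps 2 and 6
            op.timers().quiesce();            // steps 3 and 7: no new timers
            op.timers().awaitPendingTimers(); // step 4: pending timers finish
            op.close();                       // step 5: flush buffered data
        }
        return sink.log;
    }

    public static void main(String[] args) {
        run().forEach(System.out::println);
    }
}
```

Here the sink logs `record from-timer`, `record a`, `record b`, then `endInput` and `close`: every record, including the timer's output, arrives before the sink's endInput, and the timer registered in `close()` never fires because the service was already quiesced.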


            People

              Assignee: Haibo Sun (sunhaibotb)
              Reporter: Haibo Sun (sunhaibotb)
              Votes: 0
              Watchers: 4


                Time Tracking

                  Estimated: Not Specified
                  Remaining: 0h
                  Logged: 40m