  1. Flink
  2. FLINK-27849

Harden correctness for non-deterministic updates present in the changelog pipeline


Details

    • Bug
    • Status: Open
    • Major
    • Resolution: Unresolved
    • None
    • 1.20.0
    • Table SQL / Runtime
    • None
    • For complex streaming jobs, it is now possible to detect and resolve potential correctness issues before the job runs.
    • Important

    Description

      Streaming pipelines commonly contain updates (that is, messages other than RowKind.INSERT), and wrong results or errors may occur when non-deterministic functions or operations are applied to them.

      This has been a long-standing issue since Flink SQL first became available for streaming, and it has still not been fully eliminated despite some earlier efforts.

      We should detect all non-deterministic operations in changelog pipelines, raise an error that tells users about the risk, and also add a mechanism that can handle such an issue if the user is willing to pay some cost (probably by introducing state).

      Non-deterministic operations include built-in temporal functions (now, current_timestamp, ...), UUID, RAND, ...
      or user-defined non-deterministic functions (those that override isDeterministic to return false)
      or a lookup join on a lookup source whose data may change over time
      or a CDC source with metadata fields (described in FLINK-28242)
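
To illustrate why such operations are risky once a changelog contains updates, here is a minimal, self-contained sketch. It is a toy model and does not use Flink's API: `Kind`, `Change`, `project` and `countUnmatchedRetractions` are hypothetical names introduced only for this example. It assumes the downstream consumer matches retractions against previously stored rows by full row content.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.UUID;
import java.util.function.Function;

class NonDeterministicRetractionDemo {

    /** Toy stand-in for a changelog row kind; this is not Flink's RowKind class. */
    enum Kind { INSERT, UPDATE_BEFORE, UPDATE_AFTER, DELETE }

    /** A changelog message: a kind plus a single string payload (hypothetical type). */
    static final class Change {
        final Kind kind;
        final String value;

        Change(Kind kind, String value) {
            this.kind = kind;
            this.value = value;
        }
    }

    /** Applies fn to every message, so it is evaluated again for each retraction. */
    static List<Change> project(List<Change> input, Function<String, String> fn) {
        List<Change> out = new ArrayList<>();
        for (Change c : input) {
            out.add(new Change(c.kind, fn.apply(c.value)));
        }
        return out;
    }

    /** Materializes the changelog by full row and counts retractions that match no stored row. */
    static int countUnmatchedRetractions(List<Change> changes) {
        Map<String, Integer> rows = new HashMap<>();
        int unmatched = 0;
        for (Change c : changes) {
            if (c.kind == Kind.INSERT || c.kind == Kind.UPDATE_AFTER) {
                rows.merge(c.value, 1, Integer::sum);
            } else if (rows.containsKey(c.value)) {
                // Decrement the count; returning null removes the entry.
                rows.computeIfPresent(c.value, (k, n) -> n == 1 ? null : n - 1);
            } else {
                unmatched++;
            }
        }
        return unmatched;
    }

    /** One insert followed by one update of the same logical row. */
    static List<Change> sampleChangelog() {
        return Arrays.asList(
                new Change(Kind.INSERT, "order-1:CREATED"),
                new Change(Kind.UPDATE_BEFORE, "order-1:CREATED"),
                new Change(Kind.UPDATE_AFTER, "order-1:PAID"));
    }

    public static void main(String[] args) {
        List<Change> source = sampleChangelog();

        // Deterministic projection: the retraction reproduces the row that was inserted.
        int deterministic = countUnmatchedRetractions(project(source, v -> v.toUpperCase()));

        // Non-deterministic projection: every evaluation appends a fresh UUID,
        // so the UPDATE_BEFORE row no longer equals the row stored on INSERT.
        int nonDeterministic = countUnmatchedRetractions(
                project(source, v -> v + "#" + UUID.randomUUID()));

        System.out.println("unmatched retractions (deterministic): " + deterministic);
        System.out.println("unmatched retractions (non-deterministic): " + nonDeterministic);
    }
}
```

In this model the deterministic projection leaves no unmatched retraction, while the UUID-based one leaves the stale "CREATED" row behind, which is the kind of wrong result the description refers to.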

      ====== Solution  ======

      We will introduce a physical plan checker to validate whether there are any non-deterministic updates that may cause wrong results, and a physical plan rewriter to eliminate the non-determinism generated by lookup join nodes (which we think are commonly used in SQL and hard for users to solve themselves).
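
The checking idea can be sketched roughly as follows. This is not the proposed implementation: `Node`, its `deterministic` and `producesUpdates` flags, and the `check` method are hypothetical simplifications, and the rule used here (flag any non-deterministic node whose input changelog may contain updates) is an assumption that is deliberately coarser than what a real planner would need.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

class PlanCheckerSketch {

    /** Hypothetical, heavily simplified plan node; not one of Flink's physical node classes. */
    static final class Node {
        final String name;
        final boolean deterministic;   // false for e.g. a projection that calls UUID()
        final boolean producesUpdates; // true if this node itself emits non-insert messages
        final List<Node> inputs;

        Node(String name, boolean deterministic, boolean producesUpdates, Node... inputs) {
            this.name = name;
            this.deterministic = deterministic;
            this.producesUpdates = producesUpdates;
            this.inputs = Arrays.asList(inputs);
        }
    }

    /** Returns true if the node's output changelog may contain non-insert messages. */
    static boolean outputHasUpdates(Node node) {
        if (node.producesUpdates) {
            return true;
        }
        for (Node input : node.inputs) {
            if (outputHasUpdates(input)) {
                return true;
            }
        }
        return false;
    }

    /** Walks the plan bottom-up and reports every risky non-deterministic node. */
    static List<String> check(Node root) {
        List<String> problems = new ArrayList<>();
        visit(root, problems);
        return problems;
    }

    private static void visit(Node node, List<String> problems) {
        boolean inputHasUpdates = false;
        for (Node input : node.inputs) {
            visit(input, problems);
            inputHasUpdates |= outputHasUpdates(input);
        }
        if (!node.deterministic && inputHasUpdates) {
            problems.add(node.name + " is non-deterministic but its input changelog contains updates");
        }
    }

    /** Source -> Calc(UUID) -> Sink, where the source may or may not emit updates. */
    static Node samplePlan(boolean sourceProducesUpdates) {
        Node source = new Node("Source", true, sourceProducesUpdates);
        Node calc = new Node("Calc(UUID)", false, false, source);
        return new Node("Sink", true, false, calc);
    }

    public static void main(String[] args) {
        System.out.println("update source: " + check(samplePlan(true)));
        System.out.println("append-only source: " + check(samplePlan(false)));
    }
}
```

With an append-only source the same non-deterministic projection is not reported, which reflects the point that the risk only arises when updates are present in the pipeline.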

      The main implementation changes may include four parts:

      1. [preparatory work] Add an internal postOptimize method for physical DAG processing
      2. Introduce a `StreamNonDeterministicPlanResolver` to validate whether there are any non-deterministic updates that may cause wrong results, and to rewrite lookup join nodes with materialization (to eliminate the non-determinism generated by lookup join nodes)
      3. Implement a new lookup join operator (sync mode only) with state to eliminate the non-determinism
      4. [optimization] SinkUpsertMaterializer should be aware of the input upsertKey if it is not empty
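
The idea behind steps 2 and 3, materializing the lookup result so that a later retraction replays it, can be sketched with a toy model. This is an illustration under stated assumptions, not the proposed operator: `StatefulLookupJoinSketch` and its methods are hypothetical names, plain `HashMap`s stand in for both the external lookup source and the operator state, and the sketch assumes at most one live row per key.

```java
import java.util.HashMap;
import java.util.Map;

class StatefulLookupJoinSketch {

    /** Stand-in for an external dimension table whose rows can change at any time. */
    private final Map<String, String> dimensionTable;

    /** Stand-in for keyed state: the lookup result that was emitted for each key. */
    private final Map<String, String> state = new HashMap<>();

    StatefulLookupJoinSketch(Map<String, String> dimensionTable) {
        this.dimensionTable = dimensionTable;
    }

    /** Stateless join: always queries the dimension table, even for retractions. */
    String joinStateless(String key) {
        return key + "," + dimensionTable.get(key);
    }

    /** Handles an insert or update-after message: look up once and remember the result. */
    String onAccumulate(String key) {
        String lookedUp = dimensionTable.get(key);
        state.put(key, lookedUp);
        return key + "," + lookedUp;
    }

    /** Handles an update-before or delete message: replay the remembered result. */
    String onRetract(String key) {
        String remembered = state.remove(key);
        return key + "," + remembered;
    }

    public static void main(String[] args) {
        Map<String, String> dimension = new HashMap<>();
        dimension.put("p1", "v1");
        StatefulLookupJoinSketch join = new StatefulLookupJoinSketch(dimension);

        String emitted = join.onAccumulate("p1");

        // The dimension row changes between the insert and its retraction.
        dimension.put("p1", "v2");

        System.out.println("emitted on insert:    " + emitted);
        System.out.println("stateless retraction: " + join.joinStateless("p1"));
        System.out.println("stateful retraction:  " + join.onRetract("p1"));
    }
}
```

The stateless path retracts a row that was never emitted, while the stateful path retracts exactly the row that went downstream. The cost is the extra state, which matches the trade-off mentioned in the description.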

          People

            lincoln.86xy lincoln lee