  1. Flink
  2. FLINK-27849

Harden correctness for non-deterministic updates present in the changelog pipeline



    • Bug
    • Status: Open
    • Major
    • Resolution: Unresolved
    • None
    • 2.0.0
    • Table SQL / Runtime
    • None
    • For complex streaming jobs, it is now possible to detect and resolve potential correctness issues before running the job.
    • Important


      Updates (that is, changelog messages other than RowKind.INSERT) are common in streaming pipelines. When such a pipeline uses non-deterministic functions or operations, it may produce wrong results or errors.

      This is a long-standing issue, present since Flink SQL first became available for streaming. Despite some earlier efforts, it has not been fully eliminated.

      We should detect all non-deterministic operations in changelog pipelines, raise an error that tells users about the risk, and add a mechanism that can handle the issue if the user is willing to pay some cost (probably by introducing state).

      Non-deterministic operations include built-in temporal functions (now, current_timestamp, ...), UUID, RAND, ...
      or user-defined non-deterministic functions (those that override isDeterministic to return false)
      or a lookup join on a lookup source whose data may change over time
      or a CDC source with metadata fields (described in FLINK-28242)
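
      To make the failure mode concrete, here is a minimal toy simulation in plain Python (not Flink code). It assumes a hypothetical downstream operator, `materialize`, that retracts rows by full-row match, and it uses a counter as a reproducible stand-in for a non-deterministic function such as RAND or UUID. All names in it are illustrative.

```python
import itertools

# Toy changelog messages: (kind, key, value); "+I" = insert, "-D" = delete.
def enrich(changelog, fn):
    """Append fn() to every message. The function is re-evaluated for
    each changelog row, including the retraction of an earlier row."""
    return [(kind, key, value, fn()) for kind, key, value in changelog]

def materialize(changelog):
    """Hypothetical downstream operator that retracts by full-row match."""
    state, unmatched = [], []
    for kind, *row in changelog:
        if kind == "+I":
            state.append(row)
        elif row in state:
            state.remove(row)
        else:
            # The retraction does not match anything that was inserted.
            unmatched.append(row)
    return state, unmatched

source = [("+I", 1, "a"), ("-D", 1, "a")]

# Deterministic function: the retraction carries the same extra column.
det_state, det_unmatched = materialize(enrich(source, lambda: 0))

# Counter stands in for RAND()/UUID(): a different value on every call.
counter = itertools.count()
nondet_state, nondet_unmatched = materialize(
    enrich(source, lambda: next(counter))
)

print(det_state, det_unmatched)        # state is empty, nothing unmatched
print(nondet_state, nondet_unmatched)  # stale row left behind, one unmatched delete
```

      In the deterministic case the delete cancels the insert. In the non-deterministic case the delete carries a different value, so the inserted row is never removed and the retraction is left unmatched.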



      ====== Solution  ======

      We will introduce a physical plan checker that validates whether any non-deterministic updates may cause wrong results, and a physical plan rewriter that eliminates the non-determinism generated by lookup join nodes (which we believe are commonly used in SQL and hard for users to resolve themselves).
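
      As a rough illustration of what a plan check could look like, the sketch below walks a toy plan tree and reports every non-deterministic node whose input may carry updates. The `Node` class, its fields, and the rule itself are assumptions made for this example; the real checker operates on Flink physical plan nodes and would need a more precise rule.

```python
from dataclasses import dataclass, field

@dataclass
class Node:
    """Hypothetical stand-in for a physical plan node."""
    name: str
    deterministic: bool = True
    produces_updates: bool = False  # node itself emits non-INSERT messages
    inputs: list = field(default_factory=list)

def has_updates(node):
    """True if this node or anything upstream of it can emit updates."""
    return node.produces_updates or any(has_updates(i) for i in node.inputs)

def check(node, problems=None):
    """Collect names of non-deterministic nodes fed by an updating input.

    Simplified rule, assumed for illustration only.
    """
    problems = [] if problems is None else problems
    for child in node.inputs:
        check(child, problems)
    if not node.deterministic and any(has_updates(i) for i in node.inputs):
        problems.append(node.name)
    return problems

# CDC source (updates) -> lookup join (non-deterministic) -> sink
cdc = Node("cdc_source", produces_updates=True)
join = Node("lookup_join", deterministic=False, inputs=[cdc])
risky = check(Node("sink", inputs=[join]))

# Insert-only source -> projection calling NOW() -> sink
append_only = Node("append_source")
calc = Node("calc_now", deterministic=False, inputs=[append_only])
safe = check(Node("sink", inputs=[calc]))

print(risky)  # ['lookup_join']
print(safe)   # []
```

      The second plan passes because its source is insert-only: without updates there is no retraction that has to reproduce an earlier value.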

      The implementation consists of four main parts:

      1. [preparation] Add an internal postOptimize method for physical DAG processing
      2. Introduce a `StreamNonDeterministicPlanResolver` that validates whether any non-deterministic updates may cause wrong results and rewrites lookup join nodes with materialization (to eliminate the non-determinism they generate)
      3. Implement a new lookup join operator (sync mode only) with state to eliminate the non-determinism
      4. [optimization] Make SinkUpsertMaterializer aware of the input upsertKey when it is not empty
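
      The idea behind a lookup join with state (step 3) can be sketched as follows. This is a toy model in plain Python, not the actual operator: the class name, the row-keyed dict used as state, and the assumption that input rows are unique are all illustrative.

```python
class StatefulLookupJoin:
    """Toy lookup join that remembers what it emitted for each input row."""

    def __init__(self, lookup):
        self.lookup = lookup  # callable: key -> current dimension value
        self.state = {}       # input row -> value seen at insert time

    def process(self, kind, row):
        if kind in ("+I", "+U"):
            # Query the external table and remember the answer.
            value = self.lookup(row[0])
            self.state[row] = value
        else:
            # "-U" / "-D": reuse the stored value instead of looking up
            # again, so the retraction matches the row emitted earlier.
            value = self.state.pop(row)
        return (kind, *row, value)

dim = {1: "x"}                      # external table that changes over time
op = StatefulLookupJoin(dim.get)

out_insert = op.process("+I", (1, "a"))
dim[1] = "y"                        # the dimension row is updated in between
out_delete = op.process("-D", (1, "a"))

print(out_insert)  # ('+I', 1, 'a', 'x')
print(out_delete)  # ('-D', 1, 'a', 'x')
```

      A stateless join would have looked up `y` for the delete and emitted a retraction that matches nothing downstream. The price of the fix is the extra state, which is the cost mentioned in the description.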










              lincoln.86xy lincoln lee