There commonly exists updates(which means not only RowKind.INSERT messages) in a streaming pipeline, then wrong results or error may occurs when use some non-deterministic functions or operations.
It is a long lived issue since the first day that flink sql was available in streaming, but it still not totally be eliminated though some efforts have been taken.
We should detect all the non-deterministic operations in the changelog pipelines, raise an error to tell users the risk and also add an mechanism that can process such a issue if a user is willing to pay some cost(probably introduce the state).
All non-deterministic operations include builtin temporal functions(now, current_timestamp...), UUID, RAND...
or user defined non-deterministic functions (override isDeterministic return false)
or a lookup join on a lookup source which data may change over time
or a cdc-source with meta data field (described in
====== Solution ======
Will introduce a physical plan checker to validate if there's any non-deterministic updates which may cause wrong result, and also a physical plan rewriter to eliminate the non determinism generated by lookup join node (which we think is commonly used in sql, and hard to solve by users themselves).
For implementation steps, the main changes may include 4 parts:
- [preparing work] Adds an internal postOptimize method for physical dag processing
- Introduces a `StreamNonDeterministicPlanResolver` to validate if there's any non-deterministic updates which may cause wrong result and rewrite lookup join node with materialization (to eliminate the non determinism generated by lookup join node)
- Implements a new lookup join operator (sync mode only) with state to eliminate the non determinism
- [optimization] SinkUpsertMaterializer should be aware of the input upsertKey if it is not empty