When we write a SQL query in which udf1(field) appears twice, udf1(field) will be invoked twice. If udf1 performs poorly, this has a huge impact on the whole task, and the more times it is invoked, the larger the impact.
I hope that no matter how many times udf1(field) is written in the SQL, Flink will apply common subexpression elimination and invoke it only once.
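To make the request concrete, here is a minimal sketch of the idea in Python. It is not Flink's planner or generated code. The names `evaluate_row`, `dedupe` and the call counter are hypothetical, and it assumes the UDF is deterministic, since reusing a result is only safe in that case.

```python
from typing import Any, Callable, Dict, List, Tuple

# An expression is (function name, column name), e.g. ("udf1", "field").
# This representation is an assumption made for the sketch.
Expr = Tuple[str, str]


def evaluate_row(
    row: Dict[str, Any],
    exprs: List[Expr],
    udfs: Dict[str, Callable[[Any], Any]],
    dedupe: bool = True,
) -> List[Any]:
    """Evaluate every expression for one row.

    With dedupe=True, identical (function, column) pairs are computed once
    per row and the cached result is reused for later occurrences.
    """
    cache: Dict[Expr, Any] = {}
    results: List[Any] = []
    for name, column in exprs:
        key = (name, column)
        if dedupe and key in cache:
            results.append(cache[key])  # reuse, no second UDF call
            continue
        value = udfs[name](row[column])
        cache[key] = value
        results.append(value)
    return results


# Count how often the (hypothetical) UDF body actually runs.
calls = {"udf1": 0}


def udf1(x: int) -> int:
    calls["udf1"] += 1
    return x * 2


row = {"field": 21}
exprs = [("udf1", "field"), ("udf1", "field")]

evaluate_row(row, exprs, {"udf1": udf1}, dedupe=False)
calls_without_cse = calls["udf1"]  # 2: one call per occurrence

calls["udf1"] = 0
evaluate_row(row, exprs, {"udf1": udf1}, dedupe=True)
calls_with_cse = calls["udf1"]  # 1: second occurrence reuses the result
```

The cache lives only for a single row, so results are never shared across different input values.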
I then did some work on this, and the attachment shows the result.
The sql.png shows the SQL logic:
I read the source from Kafka and sink into blackhole. The parallelism is 1.
The UDF `testcse` does nothing except sleep for 20 milliseconds, while `testcse2`, `testcse3` and `testcse4` are one and the same UDF registered under different aliases, and it does nothing at all.
As expected, the performance after the optimization is approximately 3 times what it was before, since I wrote `testcse(sid)` 3 times in the SQL.
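The factor of 3 follows from simple arithmetic. The sketch below is a rough cost model, not a measurement. It assumes the 20 ms sleep dominates per-row cost and that everything else (the no-op UDFs, Kafka read, blackhole sink) is negligible. The function name `per_row_cost_ms` is hypothetical.

```python
def per_row_cost_ms(udf_cost_ms: float, occurrences: int, cse_enabled: bool) -> float:
    """Approximate per-row time spent in one UDF.

    Without CSE the UDF runs once per occurrence in the SQL.
    With CSE it runs once per row regardless of how often it is written.
    """
    invocations = 1 if cse_enabled else occurrences
    return udf_cost_ms * invocations


before = per_row_cost_ms(20, 3, cse_enabled=False)  # 3 calls * 20 ms = 60 ms
after = per_row_cost_ms(20, 3, cse_enabled=True)    # 1 call  * 20 ms = 20 ms
speedup = before / after                            # 60 / 20 = 3.0
```

With parallelism 1, rows are processed one after another, so throughput scales inversely with this per-row cost under the stated assumptions.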