Details
Type: Bug
Status: Open
Priority: Major
Resolution: Unresolved
Description
When we write SQL such as:
select udf2(udf1(field), udf3(udf1(field) ...
udf1(field) will be invoked twice. If udf1 performs poorly, this has a huge impact on the whole task, and the more often it is invoked, the greater the impact.
I would like Flink to apply common subexpression elimination so that udf1(field) is invoked only once, no matter how many times it is written in the SQL.
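A minimal sketch of the idea in plain Python, not Flink's actual code generator: expressions are modeled as nested tuples, and a per-row cache keyed on the expression structure means a repeated call such as `udf1(field)` is computed once and reused. The `evaluate` helper, the UDF bodies, and closing the truncated query as `udf2(udf1(field), udf3(udf1(field)))` are all hypothetical, for illustration only.

```python
from typing import Any, Callable, Dict, Optional, Tuple, Union

# An expression is either a column name (str) or a call: (udf_name, arg_expr, ...).
Expr = Union[str, Tuple[Any, ...]]


def evaluate(expr: Expr, row: Dict[str, Any], udfs: Dict[str, Callable[..., Any]],
             cache: Optional[Dict[Expr, Any]] = None) -> Any:
    """Evaluate `expr` against one row.

    With cache=None every occurrence of a call is evaluated again (no CSE).
    With a dict, structurally identical subexpressions are computed once and reused.
    """
    if isinstance(expr, str):
        return row[expr]  # column reference
    if cache is not None and expr in cache:
        return cache[expr]  # common subexpression already computed for this row
    name, *args = expr
    value = udfs[name](*(evaluate(a, row, udfs, cache) for a in args))
    if cache is not None:
        cache[expr] = value
    return value


# Hypothetical UDFs; udf1 counts its invocations as a stand-in for an expensive call.
calls = {"udf1": 0}


def udf1(x: int) -> int:
    calls["udf1"] += 1
    return x * 2


udfs = {"udf1": udf1, "udf2": lambda a, b: a + b, "udf3": lambda x: x - 1}

# Hypothetical completion of the truncated query: udf2(udf1(field), udf3(udf1(field)))
query = ("udf2", ("udf1", "field"), ("udf3", ("udf1", "field")))
row = {"field": 10}

without_cse = evaluate(query, row, udfs)
naive_calls = calls["udf1"]  # udf1 ran twice

calls["udf1"] = 0
with_cse = evaluate(query, row, udfs, cache={})  # fresh cache per row
cse_calls = calls["udf1"]  # udf1 ran once
```

Both paths return the same value; only the number of `udf1` invocations differs. Reusing a cached result is only safe for deterministic functions: a UDF that returns a different value on each call (for example, one reading a clock or a random source) must not be deduplicated. Flink UDFs can declare this through `isDeterministic()`.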
I then did some work on this; the attachments show the results.
sql.png shows the SQL logic. The job reads from a Kafka source and writes to a blackhole sink, with a parallelism of 1.
The UDF `testcse` does nothing except sleep for 20 milliseconds, while `testcse2`, `testcse3`, and `testcse4` are the same UDF registered under different aliases and do nothing at all.
As expected, performance after the optimization is approximately 3 times better than before, since `testcse(sid)` appears 3 times in the SQL.
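As a rough sanity check on the ~3x figure, here is an idealized per-record cost model. It assumes the 20 ms sleep in `testcse` dominates, the no-op UDFs, the Kafka source, and the blackhole sink cost nothing, and records are processed one at a time at parallelism 1. These are simplifications, not measurements.

```latex
\begin{aligned}
t_{\text{before}} &\approx 3 \times 20\,\text{ms} = 60\,\text{ms per record},
  &\quad \text{throughput}_{\text{before}} &\approx \tfrac{1000}{60} \approx 16.7\ \text{records/s},\\
t_{\text{after}} &\approx 1 \times 20\,\text{ms} = 20\,\text{ms per record},
  &\quad \text{throughput}_{\text{after}} &\approx \tfrac{1000}{20} = 50\ \text{records/s},\\
\text{speedup} &\approx \tfrac{t_{\text{before}}}{t_{\text{after}}} = \tfrac{60}{20} = 3.
\end{aligned}
```

In practice, per-record overhead outside the UDFs keeps the measured gain somewhat below this ideal.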
before:
after: