Details
- New Feature
- Status: Closed
- Major
- Resolution: Implemented
- None
Description
Currently, the Flink planner optimizes the plan in the writeToSink method. If a query has more than one sink, each sink tree is optimized independently, and the resulting execution plans are also completely independent. As a result, a multiple-sink query is very likely to compute shared sub-plans more than once. This issue aims to resolve that problem.
The basic idea of the solution is as follows:
1. Lazy optimization: the writeToSink method no longer optimizes the plan; it only puts the plan into a collection.
2. Whole-plan optimization and execution: a new execute method is added to TableEnvironment; it triggers optimization of the whole plan and then executes the job.
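The two steps above can be sketched with a toy environment. This is only an illustration under stated assumptions: LogicalPlan, BufferedSink, LazyEnv and the optimizeCalls counter are hypothetical names, not Flink classes, and the "optimizer" is a placeholder that formats a string.

```scala
import scala.collection.mutable.ArrayBuffer

// Hypothetical stand-ins for illustration only; these are not Flink classes.
final case class LogicalPlan(description: String)
final case class BufferedSink(input: LogicalPlan, sinkName: String)

class LazyEnv {
  // Sink trees collected so far; nothing is optimized when a sink is added.
  private val bufferedSinks = ArrayBuffer.empty[BufferedSink]

  // Counts optimizer runs so that the laziness is observable.
  var optimizeCalls: Int = 0

  // Step 1: only remember the sink tree.
  def writeToSink(plan: LogicalPlan, sinkName: String): Unit =
    bufferedSinks += BufferedSink(plan, sinkName)

  // Step 2: optimize all buffered sink trees in a single pass, then clear the buffer.
  def execute(): Seq[String] = {
    optimizeCalls += 1
    val optimized = bufferedSinks
      .map(s => s"${s.sinkName} <- optimized(${s.input.description})")
      .toList
    bufferedSinks.clear()
    optimized
  }
}
```

Because the optimizer runs only inside execute, it sees every sink tree at once, which is what makes sharing common sub-plans possible in the first place.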
The basic idea of DAG (multiple-sink query) optimization:
1. Decompose the DAG into blocks; a leaf block is a common sub-plan.
2. Optimize the blocks from the leaf blocks up to the root blocks; each block needs to be optimized only once.
For example:
val table = tableEnv.sqlQuery("select * from (select a as a1, b as b1 from MyTable where a > 0) t1, (select b as b2, c as c2 from MyTable where c is not null) t2 where a1 = b2")
tableEnv.registerTable("TempTable", table)
val table1 = tableEnv.sqlQuery("select a1, b1 from TempTable where a1 >= 70")
tableEnv.writeToSink(table1, Sink1)
val table2 = tableEnv.sqlQuery("select a1, c2 from TempTable where a1 < 70")
tableEnv.writeToSink(table2, Sink2)
The above plan is decomposed into 3 blocks: block1 is the input of both block2 and block3, so block2 and block3 are optimized after block1 has been optimized.
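The decomposition can be sketched on a toy DAG that mirrors the example. This is a minimal sketch, not the planner's actual algorithm. It assumes that a block root is either a sink or a node consumed by more than one parent. Node, blockRoots and optimizationOrder are hypothetical names introduced for illustration, and node names are assumed to be unique.

```scala
object DagBlocks {
  // Hypothetical plan node: a unique name plus its input nodes. Not a Flink class.
  final case class Node(name: String, inputs: List[Node] = Nil)

  // Collect every distinct node reachable from the sinks.
  private def reachable(sinks: List[Node]): List[Node] = {
    def visit(seen: List[Node], n: Node): List[Node] =
      if (seen.contains(n)) seen else n.inputs.foldLeft(n :: seen)(visit)
    sinks.foldLeft(List.empty[Node])(visit)
  }

  // Assumed rule: a block root is a sink or a node with more than one consumer.
  def blockRoots(sinks: List[Node]): Set[Node] = {
    val consumers = reachable(sinks)
      .flatMap(_.inputs)
      .groupBy(identity)
      .map { case (n, uses) => n -> uses.size }
    sinks.toSet ++ consumers.collect { case (n, count) if count > 1 => n }
  }

  // Post-order walk: a block root is emitted after the blocks it reads from,
  // and at most once, so each block would be optimized a single time.
  def optimizationOrder(sinks: List[Node]): List[String] = {
    val roots = blockRoots(sinks)
    def visit(done: List[Node], n: Node): List[Node] =
      if (done.contains(n)) done
      else {
        val afterInputs = n.inputs.foldLeft(done)(visit)
        if (roots.contains(n)) n :: afterInputs else afterInputs
      }
    sinks.foldLeft(List.empty[Node])(visit).reverse.map(_.name)
  }

  def main(args: Array[String]): Unit = {
    // Toy version of the example: TempTable feeds two filters, one per sink.
    val temp  = Node("TempTable", List(Node("Scan1"), Node("Scan2")))
    val sink1 = Node("Sink1", List(Node("Filter1", List(temp))))
    val sink2 = Node("Sink2", List(Node("Filter2", List(temp))))
    println(optimizationOrder(List(sink1, sink2))) // prints List(TempTable, Sink1, Sink2)
  }
}
```

In this sketch TempTable plays the role of block1, and the two sink trees above it play the roles of block2 and block3.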
Attachments
Issue Links
- is a child of: FLINK-11488 Add a basic Blink planner framework (Closed)
- links to