Details
-
New Feature
-
Status: Closed
-
Major
-
Resolution: Implemented
-
None
Description
Exchange and Sort is the most heavy operator, they are created in FlinkExpandConversionRule when some operators require its inputs to satisfy distribution trait or collation trait in planner rules. However, many operators could provide distribution or collation, e.g. BatchExecHashAggregate or BatchExecHashJoin could provide distribution on its shuffle keys, BatchExecSortMergeJoin could provide distribution and collation on its join keys. If the provided traits could satisfy the required traits, the Exchange or the Sort is redundant.
e.g.
schema: x: a int, b bigint, c varchar y: d int, e bigint, f varchar t1: a1 int, b1 bigint, c1 varchar t2: d1 int, e1 bigint, f1 varchar sql: select * from x join y on a = d and b = e join t1 on d = a1 and e = b1 left outer join t2 on a1 = d1 and b1 = e1 the physical plan after redundant Exchange and Sort are removed: SortMergeJoin(joinType=[...], where=[AND(=(a1, d1), =(b1, e1))], leftSorted=[true], ...) :- SortMergeJoin(joinType=[...], where=[AND(=(d, a1), =(e, b1))], leftSorted=[true], ...) : :- SortMergeJoin(joinType=[...], where=[AND(=(a, d), =(b, e))], ...) : : :- Exchange(distribution=[hash[a, b]]) : : : +- TableSourceScan(table=[[x]], ...) : : +- Exchange(distribution=[hash[d, e]]) : : +- TableSourceScan(table=[[y]], ...) : +- Exchange(distribution=[hash[a1, b1]]) : +- TableSourceScan(table=[[t1]], ...) +- Exchange(distribution=[hash[d1, e1]]) +- TableSourceScan(table=[[t2]], ...)
In above physical plan, the Exchanges between SortMergeJoins are redundant due to their shuffle keys are same, the Sorts in the top two SortMergeJoins' left hand side are redundant due to its input is sorted.
notes: after exchange removed, there maybe exist a sub-tree like localHashAggregate -> globalHashAggregate, the localHashAggregate should be removed due to localHashAggregate is redundant. so do localRank -> globalRank, localSortAggregate -> globalSortAggregate.
another situation is the shuffle and collation could be removed between multiple OVERs. e.g.
schema: MyTable: a int, b int, c varchar sql: SELECT COUNT(*) OVER (PARTITION BY c ORDER BY a), SUM(a) OVER (PARTITION BY b ORDER BY a), RANK() OVER (PARTITION BY c ORDER BY a, c), SUM(a) OVER (PARTITION BY b ORDER BY a), COUNT(*) OVER (PARTITION BY c ORDER BY c) FROM MyTable the physical plan after redundant Exchange and Sort are removed: Calc(select=[...]) +- OverAggregate(partitionBy=[c], orderBy=[c ASC], window#0=[COUNT(*) ...]) +- OverAggregate(partitionBy=[c], orderBy=[a ASC], window#0=[COUNT(*) ...], window#1=[RANK(*) ...], ...) +- Sort(orderBy=[c ASC, a ASC]) +- Exchange(distribution=[hash[c]]) +- OverAggregate(partitionBy=[b], orderBy=[a ASC], window#0=[COUNT(a), $SUM0(a) ...], ...) +- Sort(orderBy=[b ASC, a ASC]) +- Exchange(distribution=[hash[b]]) +- TableSourceScan(table=[[MyTable]], ...)
the Exchange and Sort between the top two OverAggregates are redundant due to their shuffle keys and sort keys are same.
Attachments
Issue Links
- is a child of
-
FLINK-11488 Add a basic Blink planner framework
- Closed
- links to