Details
- Bug
- Status: Closed
- Major
- Resolution: Fixed
- None
Description
Since FLINK-25995 was completed, the planner adds an Exchange (which is converted to ForwardForConsecutiveHashPartitioner) in front of nodes that do not need an explicit hash shuffle because their input is already hash-distributed.
For example, given the query:
WITH r AS (SELECT * FROM T1, T2 WHERE a1 = a2 AND c1 LIKE 'He%') SELECT sum(b1) FROM r GROUP BY a1
the plan produced after FLINK-25995 is:
Calc(select=[EXPR$0])
+- HashAggregate(isMerge=[false], groupBy=[a1], select=[a1, SUM(b1) AS EXPR$0])
   +- Exchange(distribution=[keep_input_as_is[hash[a1]]])
      +- Calc(select=[a1, b1])
         +- HashJoin(joinType=[InnerJoin], where=[(a1 = a2)], select=[a1, b1, a2], build=[left])
            :- Exchange(distribution=[hash[a1]])
            :  +- Calc(select=[a1, b1], where=[LIKE(c1, 'He%')])
            :     +- TableSourceScan(table=[[default_catalog, default_database, T1, filter=[], project=[a1, b1, c1], metadata=[]]], fields=[a1, b1, c1])
            +- Exchange(distribution=[hash[a2]])
               +- TableSourceScan(table=[[default_catalog, default_database, T2, project=[a2], metadata=[]]], fields=[a2])
However, the data passed between the Calc and the HashJoin may be out of order (no longer hash-distributed on a1) when the two nodes run with different parallelism, so an Exchange(distribution=[keep_input_as_is[hash[a1]]]) should also be added between them.
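
The effect described above can be illustrated with a small simulation. This is only a sketch, not Flink code: it assumes integer keys, models hash partitioning as `key % parallelism`, and assumes that records are dealt round-robin when two connected nodes have different parallelism. The helper names (`hash_partition`, `round_robin`, `keys_split_across_subtasks`) are hypothetical and exist only for this illustration.

```python
from collections import defaultdict


def hash_partition(records, parallelism):
    """Route each (key, value) record to subtask key % parallelism."""
    subtasks = [[] for _ in range(parallelism)]
    for key, value in records:
        subtasks[key % parallelism].append((key, value))
    return subtasks


def round_robin(upstream, parallelism):
    """ASSUMPTION: when parallelism differs, each upstream subtask
    deals its records round-robin to the downstream subtasks."""
    subtasks = [[] for _ in range(parallelism)]
    for source in upstream:
        for i, record in enumerate(source):
            subtasks[i % parallelism].append(record)
    return subtasks


def keys_split_across_subtasks(subtasks):
    """Return the sorted keys that appear in more than one subtask."""
    seen = defaultdict(set)
    for index, subtask in enumerate(subtasks):
        for key, _ in subtask:
            seen[key].add(index)
    return sorted(key for key, indexes in seen.items() if len(indexes) > 1)


# Four keys (standing in for a1), three rows each.
records = [(key, n) for key in range(4) for n in range(3)]

# HashJoin output: hash-distributed on the key with parallelism 2.
join_output = hash_partition(records, 2)

# Calc with parallelism 3 and no exchange in between: distribution is lost.
calc_input = round_robin(join_output, 3)

# An explicit hash exchange in front of the Calc restores the distribution.
rehashed = hash_partition([r for s in calc_input for r in s], 3)

print(keys_split_across_subtasks(join_output))
print(keys_split_across_subtasks(calc_input))
print(keys_split_across_subtasks(rehashed))
```

In this model every key stays on a single subtask after the join, is spread over several subtasks once the parallelism changes without an exchange, and is co-located again after re-hashing. A downstream HashAggregate that relies on keep_input_as_is would therefore see an input that is not actually hash-distributed unless the extra Exchange is present.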
Issue Links
- fixes FLINK-25995 Make implicit assumption of SQL local hash explicit (Closed)