Details
Type: Improvement
Status: In Progress
Priority: Major
Resolution: Unresolved
Release Notes Required
Description
Sometimes, when join inputs are not collocated, it is worth broadcasting one of the inputs. For example, the query:
SELECT * FROM emps WHERE emps.salary = (SELECT AVG(emps.salary) FROM emps)
currently has the following plan:
IgniteProject(ID=[$0], NAME=[$1], SALARY=[$2])
  IgniteNestedLoopJoin(condition=[=($2, $3)], joinType=[inner])
    IgniteExchange(distribution=[single])
      IgniteTableScan(table=[[PUBLIC, EMPS]])
    IgniteReduceHashAggregate(group=[{}], AVG(EMPS.SALARY)=[AVG($0)])
      IgniteExchange(distribution=[single])
        IgniteMapHashAggregate(group=[{}], AVG(EMPS.SALARY)=[AVG($0)])
          IgniteIndexScan(table=[[PUBLIC, EMPS]], index=[TST], requiredColumns=[{2}], collation=[[2 ASC-nulls-first]])
But this plan is not optimal, because the entire EMPS table has to be sent from all nodes to a single node. For such a query it is better to broadcast the result of the aggregation. In that case the plan would look something like:
IgniteExchange(distribution=[single])
  IgniteProject(...)
    IgniteCorrelatedNestedLoopJoin(...)
      IgniteExchange(distribution=[broadcast])
        IgniteReduceHashAggregate(group=[{}], AVG(EMPS.SALARY)=[AVG($0)])
          IgniteExchange(distribution=[single])
            IgniteMapHashAggregate(group=[{}], AVG(EMPS.SALARY)=[AVG($0)])
              IgniteIndexScan(table=[[PUBLIC, EMPS]], index=[SALARY_IDX])
      IgniteIndexScan(table=[[PUBLIC, EMPS]], index=[SALARY_IDX])
However, we currently don't try to convert either of the join inputs to the broadcast distribution. We should try to do so.
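To make the difference between the two plans concrete, here is a rough back-of-the-envelope sketch of how many rows cross the network in each case. It is not how the planner computes cost. The function names, the even-partitioning assumption, and the choice to count rows rather than bytes are all assumptions made for illustration.

```python
def rows_sent_single(total_rows: int, nodes: int) -> int:
    """Rows sent over the network when the whole table is gathered on one node.

    Assumption: rows are evenly partitioned and the gathering node
    already holds its own share locally.
    """
    table_rows = total_rows - total_rows // nodes  # remote partitions of EMPS
    partial_aggs = nodes - 1                       # one partial AVG state per remote node
    return table_rows + partial_aggs


def rows_sent_broadcast(total_rows: int, nodes: int, matching_rows: int) -> int:
    """Rows sent when the aggregate result is broadcast and the join runs on every node.

    Assumption: matching rows are spread evenly, so the share that lives
    on the final (single) node does not need to be sent.
    """
    partial_aggs = nodes - 1                           # map phase -> reduce phase
    broadcast = nodes - 1                              # the one aggregate row, to every other node
    results = matching_rows - matching_rows // nodes   # join output sent to the single node
    return partial_aggs + broadcast + results


if __name__ == "__main__":
    print(rows_sent_single(1_000_000, 4))          # 750003
    print(rows_sent_broadcast(1_000_000, 4, 100))  # 81
```

Under these assumptions, for 1,000,000 rows on 4 nodes with 100 rows matching the average salary, the current plan moves about 750,003 rows while the broadcast plan moves 81.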