Description
A Project is inserted between LocalLimit and Join when the Join output does not match the LocalLimit output. In this case the limit is currently not pushed down through the join. For example:
spark.sql("create table t1(a int, b int, c int) using parquet")
spark.sql("create table t2(x int, y int, z int) using parquet")
spark.sql("select a from t1 left join t2 on a = x and b = y limit 5").explain("cost")
Current:
== Optimized Logical Plan ==
GlobalLimit 5
+- LocalLimit 5
   +- Project [a#0]
      +- Join LeftOuter, ((a#0 = x#3) AND (b#1 = y#4))
         :- Project [a#0, b#1]
         :  +- Relation default.t1[a#0,b#1,c#2] parquet
         +- Project [x#3, y#4]
            +- Filter (isnotnull(x#3) AND isnotnull(y#4))
               +- Relation default.t2[x#3,y#4,z#5] parquet
Expected:
== Optimized Logical Plan ==
GlobalLimit 5
+- LocalLimit 5
   +- Project [a#0]
      +- Join LeftOuter, ((a#0 = x#3) AND (b#1 = y#4))
         :- LocalLimit 5
         :  +- Project [a#0, b#1]
         :     +- Relation default.t1[a#0,b#1,c#2] parquet
         +- Project [x#3, y#4]
            +- Filter (isnotnull(x#3) AND isnotnull(y#4))
               +- Relation default.t2[x#3,y#4,z#5] parquet
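The difference between the two plans can be modelled with a small, Spark-free sketch. This is only an illustration of the idea, not Spark's optimizer code: the `Plan` class and the helpers `with_limit`, `push_limit_into_join` and `push_down` are hypothetical names, and only left and right outer joins are handled. The reasoning it assumes is that a Project keeps one output row per input row, so a LocalLimit sitting above a Project above an outer Join can be copied onto the preserved side of the join just as if the Project were not there.

```python
from dataclasses import dataclass
from typing import Optional, Tuple


@dataclass(frozen=True)
class Plan:
    """Toy logical-plan node (hypothetical; not Spark's LogicalPlan)."""
    name: str
    children: Tuple["Plan", ...] = ()
    limit: Optional[int] = None      # only set for LocalLimit nodes
    join_type: Optional[str] = None  # only set for Join nodes


def relation(name):
    return Plan(name)


def project(child):
    return Plan("Project", (child,))


def local_limit(n, child):
    return Plan("LocalLimit", (child,), limit=n)


def join(join_type, left, right):
    return Plan("Join", (left, right), join_type=join_type)


def with_limit(n, plan):
    """Wrap plan in LocalLimit n unless it already carries a limit <= n."""
    if plan.name == "LocalLimit" and plan.limit <= n:
        return plan
    return local_limit(n, plan)


def push_limit_into_join(n, j):
    """Copy the limit onto the preserved side of an outer join."""
    left, right = j.children
    if j.join_type == "LeftOuter":
        return join(j.join_type, with_limit(n, left), right)
    if j.join_type == "RightOuter":
        return join(j.join_type, left, with_limit(n, right))
    return j  # other join types are left untouched in this sketch


def push_down(plan):
    """Handle LocalLimit over Join, and LocalLimit over Project over Join."""
    if plan.name != "LocalLimit":
        return plan
    child = plan.children[0]
    if child.name == "Join":
        return local_limit(plan.limit, push_limit_into_join(plan.limit, child))
    if child.name == "Project" and child.children[0].name == "Join":
        new_join = push_limit_into_join(plan.limit, child.children[0])
        return local_limit(plan.limit, project(new_join))
    return plan


# Shape of the plan from the example above (column lists omitted).
t1 = project(relation("t1"))
t2 = project(relation("t2"))
before = local_limit(5, project(join("LeftOuter", t1, t2)))
after = push_down(before)
```

In this model `after` has a LocalLimit 5 on the t1 side of the join, which matches the shape of the expected plan, while the outer LocalLimit is kept because the join can still produce more than 5 rows.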