Description
I couldn't get the broadcast(DataFrame) SQL function to work in Spark 2.0.
It does work in Spark 1.6.1 (spark.sql.autoBroadcastJoinThreshold is set to 0 here so that automatic broadcasting is disabled and the explicit hint is the only way to get a broadcast join):
$ pyspark --conf spark.sql.autoBroadcastJoinThreshold=0
>>> import pyspark.sql.functions
>>> df = sqlCtx.range(1000);df2 = sqlCtx.range(1000);df.join(pyspark.sql.functions.broadcast(df2), 'id').explain()
== Physical Plan ==
Project [id#0L]
+- BroadcastHashJoin [id#0L], [id#1L], BuildRight
:- ConvertToUnsafe
: +- Scan ExistingRDD[id#0L]
+- ConvertToUnsafe
+- Scan ExistingRDD[id#1L]
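For context, the Python broadcast() function is only a thin wrapper around the JVM-side hint. A sketch of its 1.6-era implementation (paraphrased from pyspark/sql/functions.py, so details may differ):

from pyspark import SparkContext
from pyspark.sql import DataFrame

# Sketch (paraphrased): the Python function delegates to
# org.apache.spark.sql.functions.broadcast on the JVM, which only
# attaches a broadcast hint to the DataFrame's logical plan.
def broadcast(df):
    """Marks a DataFrame as small enough for use in broadcast joins."""
    sc = SparkContext._active_spark_context
    return DataFrame(sc._jvm.functions.broadcast(df._jdf), df.sql_ctx)

Since the Python side does nothing beyond attaching the hint, the regression presumably lies in the 2.0 planner, which no longer turns the hinted join into a BroadcastHashJoin when autoBroadcastJoinThreshold is 0.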
In Spark 2.0, however, the same code produces a sort-merge join instead:
>>> import pyspark.sql.functions
>>> df = sqlCtx.range(1000);df2 = sqlCtx.range(1000);df.join(pyspark.sql.functions.broadcast(df2), 'id').explain()
== Physical Plan ==
WholeStageCodegen
:  +- Project [id#6L]
:     +- SortMergeJoin [id#6L], [id#9L], Inner, None
:        :- INPUT
:        +- INPUT
:- WholeStageCodegen
:  :  +- Sort [id#6L ASC], false, 0
:  :     +- INPUT
:  +- Exchange hashpartitioning(id#6L, 200), None
:     +- WholeStageCodegen
:        :  +- Range 0, 1, 8, 1000, [id#6L]
+- WholeStageCodegen
   :  +- Sort [id#9L ASC], false, 0
   :     +- INPUT
   +- ReusedExchange [id#9L], Exchange hashpartitioning(id#6L, 200), None
It should instead look like the following (this is the output when the spark.sql.autoBroadcastJoinThreshold conf is removed, in which case the join is broadcast automatically rather than via the hint):
== Physical Plan ==
WholeStageCodegen
:  +- Project [id#0L]
:     +- BroadcastHashJoin [id#0L], [id#3L], Inner, BuildRight, None
:        :- Range 0, 1, 8, 1000, [id#0L]
:        +- INPUT
+- BroadcastExchange HashedRelationBroadcastMode(List(input[0, bigint]))
   +- WholeStageCodegen
      :  +- Range 0, 1, 8, 1000, [id#3L]
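For convenience, a self-contained script version of the repro (a sketch assuming a plain local master; the shell transcripts above are the authoritative reproduction):

from pyspark import SparkConf, SparkContext
from pyspark.sql import SQLContext
from pyspark.sql.functions import broadcast

# Disable automatic broadcasting so the explicit hint is the only way
# to end up with a broadcast join.
conf = SparkConf().set("spark.sql.autoBroadcastJoinThreshold", "0")
sc = SparkContext(master="local[*]", appName="broadcast-hint-repro", conf=conf)
sqlCtx = SQLContext(sc)

df = sqlCtx.range(1000)
df2 = sqlCtx.range(1000)

# Expected (1.6.1): the plan contains BroadcastHashJoin.
# Actual (2.0): the plan contains SortMergeJoin, i.e. the hint is ignored.
df.join(broadcast(df2), 'id').explain()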