Description
I have a Hive table, DistributionAttributes, with the following schema:
root
 |-- distributionstatus: string (nullable = true)
 |-- enabledforselectionflag: boolean (nullable = true)
 |-- sourcedistributionid: integer (nullable = true)
 |-- rowstartdate: date (nullable = true)
 |-- rowenddate: date (nullable = true)
 |-- rowiscurrent: string (nullable = true)
 |-- dwcreatedate: timestamp (nullable = true)
 |-- dwlastupdatedate: timestamp (nullable = true)
 |-- appid: integer (nullable = true)
 |-- siteid: integer (nullable = true)
 |-- brandid: integer (nullable = true)
DataFrame:
val df = spark.sql("SELECT s.sourcedistributionid as sid, t.sourcedistributionid as tid, s.appid as sapp, t.appid as tapp, s.brandid as sbrand, t.brandid as tbrand FROM DistributionAttributes t INNER JOIN DistributionAttributes s ON t.sourcedistributionid=s.sourcedistributionid AND t.appid=s.appid AND t.brandid=s.brandid")
Without broadcast join (spark-shell --conf "spark.sql.autoBroadcastJoinThreshold=-1"):
df.explain
== Physical Plan ==
*Project sourcedistributionid#71 AS sid#0, sourcedistributionid#60 AS tid#1, appid#77 AS sapp#2, appid#66 AS tapp#3, brandid#79 AS sbrand#4, brandid#68 AS tbrand#5
+- *SortMergeJoin sourcedistributionid#60, appid#66, brandid#68, sourcedistributionid#71, appid#77, brandid#79, Inner
:- *Sort sourcedistributionid#60 ASC, appid#66 ASC, brandid#68 ASC, false, 0
: +- Exchange hashpartitioning(sourcedistributionid#60, appid#66, brandid#68, 200)
: +- *Filter ((isnotnull(sourcedistributionid#60) && isnotnull(brandid#68)) && isnotnull(appid#66))
: +- HiveTableScan sourcedistributionid#60, appid#66, brandid#68, MetastoreRelation distributionattributes, t
+- *Sort sourcedistributionid#71 ASC, appid#77 ASC, brandid#79 ASC, false, 0
+- Exchange hashpartitioning(sourcedistributionid#71, appid#77, brandid#79, 200)
+- *Filter ((isnotnull(sourcedistributionid#71) && isnotnull(appid#77)) && isnotnull(brandid#79))
+- HiveTableScan sourcedistributionid#71, appid#77, brandid#79, MetastoreRelation distributionattributes, s
df.show
sid | tid | sapp | tapp | sbrand | tbrand |
22 | 22 | 61 | 61 | 614 | 614 |
29 | 29 | 65 | 65 | 0 | 0 |
30 | 30 | 12 | 12 | 121 | 121 |
10 | 10 | 73 | 73 | 731 | 731 |
24 | 24 | 61 | 61 | 611 | 611 |
35 | 35 | 65 | 65 | 0 | 0 |
With broadcast join (default spark-shell):
df.explain
== Physical Plan ==
*Project sourcedistributionid#136 AS sid#65, sourcedistributionid#125 AS tid#66, appid#142 AS sapp#67, appid#131 AS tapp#68, brandid#144 AS sbrand#69, brandid#133 AS tbrand#70
+- *BroadcastHashJoin sourcedistributionid#125, appid#131, brandid#133, sourcedistributionid#136, appid#142, brandid#144, Inner, BuildRight
:- *Filter ((isnotnull(brandid#133) && isnotnull(appid#131)) && isnotnull(sourcedistributionid#125))
: +- HiveTableScan sourcedistributionid#125, appid#131, brandid#133, MetastoreRelation distributionattributes, t
+- BroadcastExchange HashedRelationBroadcastMode(List((shiftleft((shiftleft(cast(input[0, int, false] as bigint), 32) | (cast(input[1, int, false] as bigint) & 4294967295)), 32) | (cast(input[2, int, false] as bigint) & 4294967295))))
+- *Filter ((isnotnull(brandid#144) && isnotnull(sourcedistributionid#136)) && isnotnull(appid#142))
+- HiveTableScan sourcedistributionid#136, appid#142, brandid#144, MetastoreRelation distributionattributes, s
df.show
sid | tid | sapp | tapp | sbrand | tbrand |
15 | 22 | 61 | 61 | 614 | 614 |
13 | 22 | 61 | 61 | 614 | 614 |
10 | 22 | 61 | 61 | 614 | 614 |
7 | 22 | 61 | 61 | 614 | 614 |
9 | 22 | 61 | 61 | 614 | 614 |
16 | 22 | 61 | 61 | 614 | 614 |
Note: this is a self-join on (sourcedistributionid, appid, brandid), so every row should have sid = tid — which is what the sort-merge-join plan produces. With the broadcast hash join, sid no longer matches tid (e.g. sid=15 vs tid=22), i.e. the broadcast plan returns incorrect results for the same query.