Details
- Type: Bug
- Status: Resolved
- Priority: Major
- Resolution: Resolved
- Affects Version: 2.2.1
- Fix Version: None
Description
We have a Structured Streaming app doing a left anti-join between a stream and a static DataFrame. The static side is quite small (a few hundred rows), but by default the query plan uses a sort-merge join.
Sometimes we need to re-process historical data, so we feed the same app with a FileSource pointing to our S3 storage holding all archives. In that situation the first mini-batch is quite heavy (several hundred thousand input files), and the time spent in the sort-merge join is unacceptable. Additionally, the data is highly skewed, so partition sizes are completely uneven and executors tend to crash with OOMs.
I tried to switch to a broadcast join, but Spark still plans a sort-merge join:

import org.apache.spark.sql.functions.broadcast
ds.join(broadcast(hostnames), Seq("hostname"), "leftanti")
The logical plan is:

Project [app_id#5203, <--- snip ---> ... 18 more fields]
+- Project ... <-- snip -->
   +- Join LeftAnti, (hostname#3584 = hostname#190)
      :- Project [app_id, ... <-- snip -->
      :  +- StreamingExecutionRelation FileStreamSource[s3://xxxx{/2018/{01,02}/*/*/,/2017/{08,09,10,11,12}/*/*/}], [app_id <--snip--> ... 62 more fields]
      +- ResolvedHint isBroadcastable=true
         +- Relation[hostname#190,descr#191] RedshiftRelation("PUBLIC"."hostname_filter")
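Note that the ResolvedHint node is present in the logical plan above, yet the planner still chooses sort-merge. A minimal sketch of how one might verify which physical join strategy Spark actually picks, and a possible fallback via the auto-broadcast size threshold; `ds` and `hostnames` are the names from this report, and the threshold value is illustrative (this is a workaround sketch, not a confirmed fix for this issue):

```scala
import org.apache.spark.sql.functions.broadcast

// Apply the hint and inspect the physical plan: look for
// BroadcastHashJoin (desired) vs SortMergeJoin (the reported behavior).
val joined = ds.join(broadcast(hostnames), Seq("hostname"), "leftanti")
joined.explain()

// Fallback: raising the auto-broadcast threshold may coax the planner
// into a broadcast join when the static side's *estimated* size falls
// under the limit (here 10 MB, an arbitrary example value).
spark.conf.set("spark.sql.autoBroadcastJoinThreshold", 10 * 1024 * 1024)
```

Whether the threshold helps depends on the size estimate Spark derives for the static relation, which is exactly what the linked SPARK-22673 concerns.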
Attachments
Issue Links
- is caused by:
  - SPARK-22673 InMemoryRelation should utilize on-disk table stats whenever possible (Resolved)