Uploaded image for project: 'Spark'
  1. Spark
  2. SPARK-23220

broadcast hint not applied in a streaming left anti join

    XMLWordPrintableJSON

Details

    • Bug
    • Status: Resolved
    • Major
    • Resolution: Resolved
    • 2.2.1
    • 2.3.0
    • Structured Streaming
    • None

    Description

      We have a structured streaming app doing a left anti-join between a stream, and a static dataframe. This one is quite small (a few 100s of rows), but the query plan by default is a sort merge join.
       
      It happens sometimes we need to re-process some historical data, so we feed the same app with a FileSource pointing to our S3 storage with all archives. In that situation, the first mini-batch is quite heavy (several 100'000s of input files), and the time spent in sort-merge join is non-acceptable. Additionally it's highly skewed, so partition sizes are completely uneven, and executors tend to crash with OOMs.

      I tried to switch to a broadcast join, but Spark still applies a sort-merge.

      ds.join(broadcast(hostnames), Seq("hostname"), "leftanti")
      

      The logical plan is :

      Project [app_id#5203, <--- snip ---> ... 18 more fields]
      +- Project ...
      <-- snip -->
               +- Join LeftAnti, (hostname#3584 = hostname#190)
                  :- Project [app_id, ...
      <-- snip -->
                                             +- StreamingExecutionRelation FileStreamSource[s3://xxxx{/2018/{01,02}/*/*/,/2017/{08,09,10,11,12}/*/*/}], [app_id
       <--snip--> ... 62 more fields]
                  +- ResolvedHint isBroadcastable=true
                     +- Relation[hostname#190,descr#191] RedshiftRelation("PUBLIC"."hostname_filter")
      

      Attachments

        Issue Links

          Activity

            People

              Unassigned Unassigned
              mathieude Mathieu DESPRIEE
              Votes:
              0 Vote for this issue
              Watchers:
              2 Start watching this issue

              Dates

                Created:
                Updated:
                Resolved: