Spark / SPARK-24588

StreamingSymmetricHashJoinExec should require HashClusteredPartitioning from children


Details

    Description

      In https://github.com/apache/spark/pull/19080, we simplified the distribution/partitioning framework and made all join-like operators require HashClusteredDistribution from their children. Unfortunately, the streaming join operator was missed.
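As a rough illustration of why the weaker requirement is not enough for a join, here is a toy Scala model. It is not Spark's actual code in org.apache.spark.sql.catalyst.plans.physical: columns are plain strings and the partition-count check is left out. A ClusteredDistribution over the join keys accepts hash partitioning on any subset of those keys. HashClusteredDistribution accepts only hash partitioning on exactly the same keys in the same order, which is what both sides of a join need so that equal keys get the same partition id.

```scala
// Toy model of distribution requirements (simplified, not Spark's real classes).
sealed trait Distribution
final case class ClusteredDistribution(columns: Seq[String]) extends Distribution
final case class HashClusteredDistribution(columns: Seq[String]) extends Distribution

final case class HashPartitioning(columns: Seq[String], numPartitions: Int) {
  def satisfies(required: Distribution): Boolean = required match {
    // Rows that agree on all required columns also agree on any subset of them,
    // so hashing on a subset still keeps each such group inside one partition.
    case ClusteredDistribution(req) => columns.forall(req.contains)
    // Exact match (same columns, same order): two children partitioned this way
    // send rows with equal keys to the same partition id.
    case HashClusteredDistribution(req) => columns == req
  }
}

object DistributionDemo {
  val joinKeys = Seq("a", "b")
  // Models the right side after repartition('b).
  val rightSide = HashPartitioning(Seq("b"), 5)

  def main(args: Array[String]): Unit = {
    println(rightSide.satisfies(ClusteredDistribution(joinKeys)))     // true
    println(rightSide.satisfies(HashClusteredDistribution(joinKeys))) // false
  }
}
```

Under the weaker requirement the right side's hashpartitioning(b) is accepted as-is, so no exchange on (a, b) is added for it.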

      This can produce wrong results. Consider:

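      // Assumes a SparkSession named `spark` (e.g. in spark-shell).
      import org.apache.spark.sql.execution.streaming.MemoryStream
      import spark.implicits._                  // Encoders, toDF, 'symbol columns
      implicit val sqlCtx = spark.sqlContext    // MemoryStream needs an implicit SQLContext

      val input1 = MemoryStream[Int]
      val input2 = MemoryStream[Int]

      val df1 = input1.toDF.select('value as 'a, 'value * 2 as 'b)
      // The right side is repartitioned by `b` only, although the join keys are (a, b).
      val df2 = input2.toDF.select('value as 'a, 'value * 2 as 'b).repartition('b)
      val joined = df1.join(df2, Seq("a", "b")).select('a)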
      

      The physical plan is

      *(3) Project [a#5, b#6, c#7, c#14]
      +- StreamingSymmetricHashJoin [a#5, b#6], [a#12, b#13], Inner, condition = [ leftOnly = null, rightOnly = null, both = null, full = null ], state info [ checkpoint = <unknown>, runId = 5a1ab77a-ed5c-4f0b-8bcb-fc5637152b97, opId = 0, ver = 0, numPartitions = 5], 0, state cleanup [ left = null, right = null ]
         :- Exchange hashpartitioning(a#5, b#6, 5)
         :  +- *(1) Project [value#1 AS a#5, (value#1 * 2) AS b#6, (value#1 * 3) AS c#7]
         :     +- StreamingRelation MemoryStream[value#1], [value#1]
         +- Exchange hashpartitioning(b#13, 5)
            +- *(2) Project [value#3 AS a#12, (value#3 * 3) AS b#13, (value#3 * 4) AS c#14]
               +- StreamingRelation MemoryStream[value#3], [value#3]
      

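      The left side is hash-partitioned by (a, b), while the right side is hash-partitioned by b only. Both sides have 5 partitions, but hashing (a, b) and hashing b generally give different partition ids for the same row, so two matching records can end up in different partitions. The join never sees them together, and a row that should be in the output is missing.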
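To see how rows get lost, here is a small self-contained Scala simulation that does not use Spark. `hashKey` is a hypothetical stand-in for Spark's Murmur3-based hashing, chosen so partition ids can be checked by hand, and the 10 input values are illustrative. Each side is grouped into 5 partitions and joined on (a, b) only within each partition, the way a partition-wise operator sees its input.

```scala
// Toy simulation, not Spark code. hashKey is a hypothetical hash function.
object PartitionMismatchDemo {
  val numPartitions = 5

  type Row = (Int, Int) // (a, b)

  // Hypothetical Java-style hash over the chosen key columns: h = 31 * h + c.
  def hashKey(cols: Seq[Int]): Int = cols.foldLeft(17)((h, c) => 31 * h + c)

  // Non-negative modulo, so every hash maps into [0, numPartitions).
  def partitionOf(cols: Seq[Int]): Int = {
    val m = hashKey(cols) % numPartitions
    if (m < 0) m + numPartitions else m
  }

  // Same shape as df1 and df2 in the example: a = value, b = value * 2.
  def rows(values: Seq[Int]): Seq[Row] = values.map(v => (v, v * 2))

  val byAB: Row => Seq[Int] = r => Seq(r._1, r._2)
  val byB: Row => Seq[Int] = r => Seq(r._2)

  // Equi-join on (a, b), but only between rows in the same partition id.
  def partitionWiseJoin(left: Seq[Row], right: Seq[Row],
                        leftKey: Row => Seq[Int], rightKey: Row => Seq[Int]): Seq[Row] = {
    val leftParts = left.groupBy(r => partitionOf(leftKey(r)))
    val rightParts = right.groupBy(r => partitionOf(rightKey(r)))
    (0 until numPartitions).flatMap { p =>
      val rightSet = rightParts.getOrElse(p, Seq.empty[Row]).toSet
      leftParts.getOrElse(p, Seq.empty[Row]).filter(rightSet.contains)
    }
  }

  val data: Seq[Row] = rows(1 to 10)

  // Both sides partitioned by (a, b): every matching pair meets in one partition.
  val coPartitionedJoin: Seq[Row] = partitionWiseJoin(data, data, byAB, byAB)

  // Left partitioned by (a, b), right by b only (the repartition('b) case).
  val misPartitionedJoin: Seq[Row] = partitionWiseJoin(data, data, byAB, byB)

  def main(args: Array[String]): Unit = {
    println(s"co-partitioned:  ${coPartitionedJoin.size} rows")  // 10 rows
    println(s"mis-partitioned: ${misPartitionedJoin.size} rows") // 2 rows
  }
}
```

With this hash, a row with value v lands in partition (2 + 3v) mod 5 when keyed on (a, b) and in (2 + 2v) mod 5 when keyed on b, so the two sides only meet for v = 5 and v = 10, and just 2 of the 10 expected rows are produced. A different hash function would change which rows survive, but in general some matches are lost.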


          People

            cloud_fan Wenchen Fan