SPARK-24588

StreamingSymmetricHashJoinExec should require HashClusteredPartitioning from children


Details

    Description

      In https://github.com/apache/spark/pull/19080, we simplified the distribution/partitioning framework and made all join-like operators require HashClusteredDistribution from their children. Unfortunately, the streaming join operator was missed.

      This can cause wrong results. Consider:

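      import org.apache.spark.sql.execution.streaming.MemoryStream
      import spark.implicits._  // assumes a SparkSession named `spark`, e.g. in spark-shell
      implicit val sqlContext: org.apache.spark.sql.SQLContext = spark.sqlContext  // required by MemoryStream

      val input1 = MemoryStream[Int]
      val input2 = MemoryStream[Int]
      
      // Both sides expose the join keys a and b, but the right side is repartitioned by b only.
      val df1 = input1.toDF.select('value as 'a, 'value * 2 as 'b)
      val df2 = input2.toDF.select('value as 'a, 'value * 2 as 'b).repartition('b)
      val joined = df1.join(df2, Seq("a", "b")).select('a)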
      

      The physical plan is

      *(3) Project [a#5, b#6, c#7, c#14]
      +- StreamingSymmetricHashJoin [a#5, b#6], [a#12, b#13], Inner, condition = [ leftOnly = null, rightOnly = null, both = null, full = null ], state info [ checkpoint = <unknown>, runId = 5a1ab77a-ed5c-4f0b-8bcb-fc5637152b97, opId = 0, ver = 0, numPartitions = 5], 0, state cleanup [ left = null, right = null ]
         :- Exchange hashpartitioning(a#5, b#6, 5)
         :  +- *(1) Project [value#1 AS a#5, (value#1 * 2) AS b#6, (value#1 * 3) AS c#7]
         :     +- StreamingRelation MemoryStream[value#1], [value#1]
         +- Exchange hashpartitioning(b#13, 5)
            +- *(2) Project [value#3 AS a#12, (value#3 * 3) AS b#13, (value#3 * 4) AS c#14]
               +- StreamingRelation MemoryStream[value#3], [value#3]
      

      The left table is hash partitioned by a and b, while the right table is hash partitioned by b only. As a result, two matching records may end up in different partitions, so a row that should appear in the output is missing.
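
      To make the failure mode concrete, here is a minimal plain-Scala sketch that does not use Spark at all. The names `CoPartitioningSketch` and `partitionOf` are hypothetical, and the toy partitioner (sum of the key columns modulo the partition count) is an assumption chosen for readability; it is not the hash function Spark uses. The sketch only illustrates that a partition-local join misses matches when the two sides are partitioned by different key sets.

```scala
object CoPartitioningSketch {
  val numPartitions = 5

  // Hypothetical toy partitioner (NOT Spark's hash function):
  // sum of the key columns modulo the partition count.
  def partitionOf(keys: Seq[Int]): Int =
    Math.floorMod(keys.sum, numPartitions)

  // Rows shaped like the example query: a = value, b = value * 2.
  val rows: Seq[(Int, Int)] = (1 to 20).map(v => (v, v * 2))

  // Left side is partitioned by (a, b); right side by b only.
  val left: Map[Int, Seq[(Int, Int)]] =
    rows.groupBy { case (a, b) => partitionOf(Seq(a, b)) }
  val rightByB: Map[Int, Seq[(Int, Int)]] =
    rows.groupBy { case (_, b) => partitionOf(Seq(b)) }

  // Right side partitioned by the full join key, as the left side is.
  val rightByAB: Map[Int, Seq[(Int, Int)]] =
    rows.groupBy { case (a, b) => partitionOf(Seq(a, b)) }

  // A partition-local join only compares rows that share a partition id.
  def localJoin(l: Map[Int, Seq[(Int, Int)]],
                r: Map[Int, Seq[(Int, Int)]]): Seq[(Int, Int)] =
    for {
      p <- 0 until numPartitions
      lRow <- l.getOrElse(p, Seq.empty)
      rRow <- r.getOrElse(p, Seq.empty)
      if lRow == rRow // equi-join on (a, b)
    } yield lRow

  val localMatches: Seq[(Int, Int)] = localJoin(left, rightByB)
  val fixedMatches: Seq[(Int, Int)] = localJoin(left, rightByAB)

  def main(args: Array[String]): Unit = {
    println(s"expected matches:           ${rows.size}")
    println(s"mismatched partitioning:    ${localMatches.size}")
    println(s"both sides keyed by (a, b): ${fixedMatches.size}")
  }
}
```

      With this toy partitioner, only rows whose a is a multiple of 5 happen to land in the same partition on both sides, so the mismatched layout loses most of the matches, while partitioning both sides by (a, b) recovers all of them.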

          People

            cloud_fan Wenchen Fan