SPARK-39976: NULL check in ArrayIntersect adds extraneous null from first param


Details

    • Type: Bug
    • Status: Resolved
    • Priority: Major
    • Resolution: Fixed
    • Affects Version/s: 3.3.0
    • Fix Version/s: 3.3.1, 3.4.0
    • Component/s: SQL

    Description

      This is very likely a regression from SPARK-36829.

      When using array_intersect(a, b), if the first parameter contains a NULL value and the second one does not, an extraneous NULL is present in the output. This also leads to array_intersect(a, b) != array_intersect(b, a), which is incorrect, since set intersection should be commutative.
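
      The asymmetry can also be reproduced with array literals alone; here is a minimal SQL-only sketch (per the behavior described above, the first query yields [3, null] on an affected build, while the second yields [3]):

      scala> spark.sql("SELECT array_intersect(array(3, NULL, 5), array(1, 2, 3))").show()

      scala> spark.sql("SELECT array_intersect(array(1, 2, 3), array(3, NULL, 5))").show()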

      Example using PySpark:

      >>> a = [1, 2, 3]
      >>> b = [3, None, 5]
      >>> df = spark.sparkContext.parallelize([(a, b)]).toDF(["a", "b"])
      >>> df.show()
      +---------+------------+
      |        a|           b|
      +---------+------------+
      |[1, 2, 3]|[3, null, 5]|
      +---------+------------+
      
      >>> df.selectExpr("array_intersect(a,b)").show()
      +---------------------+
      |array_intersect(a, b)|
      +---------------------+
      |                  [3]|
      +---------------------+
      
      >>> df.selectExpr("array_intersect(b,a)").show()
      +---------------------+
      |array_intersect(b, a)|
      +---------------------+
      |            [3, null]|
      +---------------------+
      

      Note that in the first case, a does not contain a NULL, and the final output is correct: [3]. In the second case, b does contain a NULL and is now the first parameter, so the extraneous null appears in the output: [3, null].
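
      For comparison, the expected set semantics (null belongs to the result only when it appears in both inputs) can be sketched in plain Scala. This is illustrative only, not Spark's implementation:

      scala> def expectedIntersect[T](first: Seq[T], second: Seq[T]): Seq[T] = {
           |   val inSecond = second.toSet   // set membership handles null safely
           |   first.distinct.filter(inSecond.contains)
           | }
      expectedIntersect: [T](first: Seq[T], second: Seq[T])Seq[T]

      scala> expectedIntersect(Seq[Integer](3, null, 5), Seq[Integer](1, 2, 3))
      res0: Seq[Integer] = List(3)

      scala> expectedIntersect(Seq[Integer](1, 2, 3), Seq[Integer](3, null, 5))
      res1: Seq[Integer] = List(3)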

      The same behavior occurs in Scala after writing the data to Parquet and reading it back:

      scala> val a = Array[java.lang.Integer](1, 2, null, 4)
      a: Array[Integer] = Array(1, 2, null, 4)
      
      scala> val b = Array[java.lang.Integer](4, 5, 6, 7)
      b: Array[Integer] = Array(4, 5, 6, 7)
      
      scala> val df = Seq((a, b)).toDF("a","b")
      df: org.apache.spark.sql.DataFrame = [a: array<int>, b: array<int>]
      
      scala> df.write.parquet("/tmp/simple.parquet")
      
      scala> val df = spark.read.parquet("/tmp/simple.parquet")
      df: org.apache.spark.sql.DataFrame = [a: array<int>, b: array<int>]
      
      scala> df.show()
      +---------------+------------+
      |              a|           b|
      +---------------+------------+
      |[1, 2, null, 4]|[4, 5, 6, 7]|
      +---------------+------------+
      
      
      scala> df.selectExpr("array_intersect(a,b)").show()
      +---------------------+
      |array_intersect(a, b)|
      +---------------------+
      |            [null, 4]|
      +---------------------+
      
      
      scala> df.selectExpr("array_intersect(b,a)").show()
      +---------------------+
      |array_intersect(b, a)|
      +---------------------+
      |                  [4]|
      +---------------------+
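
      Until upgrading to a release containing the fix (3.3.1 or 3.4.0, per the fix versions above), one possible workaround is to strip nulls from the first argument with the filter higher-order function; note that this sketch also drops a null that is legitimately present in both arrays:

      scala> df.selectExpr("array_intersect(filter(a, x -> x IS NOT NULL), b)").show()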
      

          People

            Assignee: angerszhu
            Reporter: Navin Kumar
