Spark / SPARK-33059

Bug with self-joined tables after posexploding a column


    Details

    • Type: Bug
    • Status: Open
    • Priority: Major
    • Resolution: Unresolved
    • Affects Version/s: 3.0.1
    • Fix Version/s: None
    • Component/s: ML, MLlib
    • Labels:
      None
    • Environment:

      OS: Ubuntu 16.07 LTS

      8GB Ram

      Intel Core i7

      Description

       

      The test below should pass, but it does not.

      The two DataFrames differ in only one way: the data column of validDF has type Array[Int], while in testDF it has type Array[Vector].

      We originally hit this bug when using MinHashLSH from Spark ML.

      // Bug with self-joined tables with posexplode columns
      import org.apache.spark.SparkConf
      import org.apache.spark.sql.SparkSession
      import org.scalatest.flatspec.AnyFlatSpec
      import org.apache.spark.sql.functions.{udf, col, struct, posexplode}
      import org.apache.spark.ml.linalg.Vectors
      
      import scala.collection.mutable
      
      
      case class TestRow(s: String, l: Array[Long], data: Array[Int])
      
      class SparkBug extends AnyFlatSpec{
      
        "Test case" should "equal count" in {
          val conf = new SparkConf()
            .setMaster("local[8]")
            .set("spark.sql.codegen.wholeStage", "true") // important: if set to "false", the test passes
      
          val spark = SparkSession.builder().config(conf).getOrCreate()
      
          val vecUdf = udf(
            (s: mutable.WrappedArray[Long]) =>
              s.map(v => Vectors.dense(v))
          )
      
          val r = new scala.util.Random(1000)
      
          import spark.implicits._
      
          val rowCount = 200000 // important: with fewer rows (e.g. 150000), the test passes
      
          val df = (0 to rowCount).map { _ =>
              TestRow(
                "",
                (0 to 40).map(t => t.toLong).toArray,
                (0 to 4)
                  .map(_ => r.nextInt(100000)).toArray
              )
          }.toDF()
            .coalesce(1) // important: without coalesce(1), the test passes
      
          val testDF = df
            .withColumn("data", vecUdf(col("data")))
            .select(
              struct(col("*")).as("dd"),
              posexplode(col("data"))
            )
      
          val validDF = df.select(struct(col("*")).as("dd"), posexplode(col("data")))
          // Difference between validDF and testDF: the data col of testDF has type Array[Vector], while the data col of validDF has type Array[Int]
      
          val testCount = testDF
            .join(testDF, Seq("pos", "col"))
            .distinct() // important: without distinct(), the test passes
            .count
      
          val validCount = validDF
            .join(validDF, Seq("pos", "col"))
            .distinct()
            .count
      
          // the counts should be equal, but they are not
          assert(testCount == validCount)
        }
      }
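
As the comments in the repro note, the wrong counts only appear with whole-stage code generation enabled. A possible workaround until the bug is fixed is to disable it for the session; a minimal sketch (the config key is the standard Spark SQL flag from the repro above, the session setup is illustrative):

```scala
import org.apache.spark.sql.SparkSession

// Possible workaround: disable whole-stage code generation.
// With spark.sql.codegen.wholeStage=false the repro above passes.
val spark = SparkSession.builder()
  .master("local[8]")
  .config("spark.sql.codegen.wholeStage", "false")
  .getOrCreate()
```

Note that this disables codegen for all queries in the session, so it may cost performance elsewhere.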
      
      

       

    People

    • Assignee: Unassigned
    • Reporter: Lookuut Struchkov Lookuut Fedorovich
    • Votes: 0
    • Watchers: 4
