SPARK-22702

Spark SQL filter with size function (if rows remain) leads to double calculation

    Details

    • Type: Question
    • Status: Resolved
    • Priority: Minor
    • Resolution: Invalid
    • Affects Version/s: 2.1.0
    • Fix Version/s: None
    • Component/s: SQL
    • Labels:
      None
    • Environment:

      jdk1.7.0_67
      spark-hive_2.11

      Description

      I ran into an issue with Spark SQL. After building a Dataset through some logic, I want to persist it because it will be reused many times. However, when it is persisted, the logic is calculated twice. I wrote a local test to reproduce the issue, and it does reproduce.

      I tested three filter functions and found the following:
      .filter(col("id") > 10) // expected
      .filter(length(col("name")) > 4) // expected
      .filter(size(col("seq_name")) > 1) // unexpected if any row passes the filter

      That is, the double calculation occurs when the filter leaves at least one row in the result.
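
      One way to check this is to count UDF invocations and print the query plan. The following is a minimal sketch, not the reproducer itself: the object name UdfInvocationCheck and the counter calls are hypothetical, Hive support is left out, and the counter only works in local mode where the driver and executors share one JVM. A possible explanation, which is an assumption and not confirmed in this report, is that the optimizer pushes the size() predicate below the projection and inlines the UDF expression, so the UDF runs once for the filter and again for every row that passes it.

```scala
import java.util.concurrent.atomic.AtomicInteger

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.{col, size, udf}

object UdfInvocationCheck {
  // Hypothetical counter: only meaningful in local mode (single JVM).
  val calls = new AtomicInteger(0)

  def run(): Int = {
    val spark = SparkSession.builder()
      .master("local[2]")
      .appName("UdfInvocationCheck")
      .getOrCreate()
    import spark.implicits._

    // Same shape as seqTokenUdf in the report, minus the file writes.
    val seqToken = udf { (id: Int, name: String) =>
      calls.incrementAndGet()
      Seq(name, id.toString)
    }

    val withSeq = Seq((100, "file100"), (101, "file101"), (102, "file102"))
      .toDF("id", "name")
      .withColumn("seq_name", seqToken(col("id"), col("name")))

    val filtered = withSeq.filter(size(col("seq_name")) > 1)

    // Prints the parsed, analyzed, optimized and physical plans, so it is
    // visible whether the UDF appears in both the filter and the projection.
    filtered.explain(true)

    calls.set(0)
    filtered.collect()
    val n = calls.get()
    println(s"UDF invocations for 3 input rows: $n")

    spark.stop()
    n
  }

  def main(args: Array[String]): Unit = run()
}
```

      If the printed count is larger than the number of input rows, the extended plan should show where the extra evaluations come from.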

      Result images: expected and unexpected output (images not available)

      Reproduction code:
      import org.apache.hadoop.fs.{FileSystem, Path}
      import org.apache.spark.SparkConf
      import org.apache.spark.sql.functions._
      import org.apache.spark.sql.types._
      import org.apache.spark.sql.{Dataset, Row, SparkSession}
      
      
      object TwiceCalculationReproducer {
        private val separator = scala.reflect.io.File.separator
        private val dirName = new java.io.File("").getAbsolutePath + separator + "testOutput"
      
        System.setProperty("spark.app.name", "TestController")
        System.setProperty("spark.master", "local[2]")
        private val ss = SparkSession.builder().config(new SparkConf()).enableHiveSupport().getOrCreate()
        ss.sparkContext.hadoopConfiguration.set("fs.defaultFS", "file:///")
        private val sc = ss.sparkContext
      
        def main(args: Array[String]): Unit = {
          val fs = FileSystem.get(sc.hadoopConfiguration)
          fs.delete(new Path(dirName), true)
          Thread.sleep(1000)
      
          /*expected*/
          val tableRaw = Dims.df
          val tableNewExp = seqColumnGeneratorExcepted(tableRaw)
          tableNewExp.persist()
          tableNewExp.show(10, 100)
      
          /*unexpected*/
          Thread.sleep(1000)
          val tableNewUnexp = seqColumnGeneratorUnexpected(tableRaw)
          tableNewUnexp.persist()
          tableNewUnexp.show(10, 100)
        }
      
        /*normal*/
        def seqColumnGeneratorExcepted[T](ds: Dataset[T]) = {
          ds.withColumn("seq_name", seqTokenUdf(col("id"), col("name")))
        }
      
        /*abnormal*/
        def seqColumnGeneratorUnexpected[T](ds: Dataset[T]) = {
          seqColumnGeneratorExcepted(ds)
              .filter(col("id") > 10) // expected
              .filter(length(col("name")) > 4) // expected
              .filter(size(col("seq_name")) > 1) // unexpected if any row passes the filter
        }
      
        /*validator udf*/
        def seqTokenUdf = udf {
          (id: Int, name: String) => {
            /*validator 1: console print*/
            println(name + "_" + id + "_" + System.currentTimeMillis())
      
            /*validator 2: write file in case of executor not printing console*/
            val fs = FileSystem.get(sc.hadoopConfiguration)
            fs.setWriteChecksum(false)
            val fileName = Seq(dirName, name + "_" + System.currentTimeMillis.toString) mkString separator
            fs.create(new Path(fileName)).close() // close the stream so the marker file is not leaked
      
            /*return*/
            Seq[String](name, System.currentTimeMillis().toString)
          }
        }
      
        /*test data mock*/
        object Dims {
          private val structTypes = StructType(Seq(
            StructField("id", IntegerType),
            StructField("name", StringType)
          ))
      
          private val data = List(
            Row(100, "file100"),
            Row(101, "file101"),
            Row(102, "file102")
          )
      
          private val rdd = sc.parallelize(data)
          val df = ss.createDataFrame(rdd, structTypes)
        }
      
      }
      

      Is there any problem in the size() function?

      Kind regards,
      chenfh5

            People

            • Assignee:
              Unassigned
              Reporter:
              chenfh5