[SPARK-24373] "df.cache() df.count()" no longer eagerly caches data when the analyzed plans are different after re-analyzing the plans


Details

    • Type: Bug
    • Status: Resolved
    • Priority: Blocker
    • Resolution: Fixed
    • Affects Version/s: 2.3.0
    • Fix Version/s: 2.3.1, 2.4.0
    • Component/s: SQL
    • Labels: None

    Description

      Here is the code to reproduce the issue in local mode:

      scala> val df = sc.range(1, 2).toDF
      df: org.apache.spark.sql.DataFrame = [value: bigint]
      
      scala> val myudf = udf({x: Long => println("xxxx"); x + 1})
      myudf: org.apache.spark.sql.expressions.UserDefinedFunction = UserDefinedFunction(<function1>,LongType,Some(List(LongType)))
      
      scala> val df1 = df.withColumn("value1", myudf(col("value")))
      df1: org.apache.spark.sql.DataFrame = [value: bigint, value1: bigint]
      
      scala> df1.cache
      res0: df1.type = [value: bigint, value1: bigint]
      
      scala> df1.count
      res1: Long = 1 
      
      scala> df1.count
      res2: Long = 1
      
      scala> df1.count
      res3: Long = 1
      

       

      In Spark 2.2, you could see it print "xxxx" on the first count() after cache(), because count() eagerly materialized the cached data and ran the UDF. In Spark 2.3.0 it never prints, because the data is never actually cached.
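
      As a side note, here is a minimal standalone sketch of the same repro (the Spark24373Repro object and the AtomicLong counter are illustrative additions, not part of this report). It replaces println with a counter so the UDF's (non-)execution can be checked programmatically; the trick is only valid in local mode, where the driver and executor share one JVM:

      import java.util.concurrent.atomic.AtomicLong
      import org.apache.spark.sql.SparkSession
      import org.apache.spark.sql.functions.{col, udf}

      object Spark24373Repro {
        // Counts UDF invocations; meaningful in local mode only, where the
        // executor runs in the same JVM as the driver.
        val calls = new AtomicLong(0)

        def main(args: Array[String]): Unit = {
          val spark = SparkSession.builder.master("local[*]").getOrCreate()
          import spark.implicits._

          val myudf = udf { x: Long => calls.incrementAndGet(); x + 1 }
          val df1 = spark.sparkContext.range(1, 2).toDF("value")
            .withColumn("value1", myudf(col("value")))

          df1.cache()
          df1.count()
          // Expected on 2.2 (and on 2.3.1+ after the fix): 1, because count()
          // materializes the cache and runs the UDF once.
          // On 2.3.0: 0, because the UDF column is pruned and nothing is cached.
          println(s"UDF invocations after count(): ${calls.get()}")

          spark.stop()
        }
      }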

      In the above example, when you call explain(), you can see:

      scala> df1.explain(true)
      == Parsed Logical Plan ==
      'Project [value#2L, UDF('value) AS value1#5]
      +- AnalysisBarrier
         +- SerializeFromObject [input[0, bigint, false] AS value#2L]
            +- ExternalRDD [obj#1L]
      
      == Analyzed Logical Plan ==
      value: bigint, value1: bigint
      Project [value#2L, if (isnull(value#2L)) null else UDF(value#2L) AS value1#5L]
      +- SerializeFromObject [input[0, bigint, false] AS value#2L]
         +- ExternalRDD [obj#1L]
      
      == Optimized Logical Plan ==
      InMemoryRelation [value#2L, value1#5L], true, 10000, StorageLevel(disk, memory, deserialized, 1 replicas)
         +- *(1) Project [value#2L, UDF(value#2L) AS value1#5L]
            +- *(1) SerializeFromObject [input[0, bigint, false] AS value#2L]
               +- Scan ExternalRDDScan[obj#1L]
      
      == Physical Plan ==
      *(1) InMemoryTableScan [value#2L, value1#5L]
         +- InMemoryRelation [value#2L, value1#5L], true, 10000, StorageLevel(disk, memory, deserialized, 1 replicas)
            +- *(1) Project [value#2L, UDF(value#2L) AS value1#5L]
               +- *(1) SerializeFromObject [input[0, bigint, false] AS value#2L]
                  +- Scan ExternalRDDScan[obj#1L]
      
      

      but the InMemoryTableScan is missing in the following explain():

      scala> df1.groupBy().count().explain(true)
      == Parsed Logical Plan ==
      Aggregate [count(1) AS count#170L]
      +- Project [value#2L, if (isnull(value#2L)) null else UDF(value#2L) AS value1#5L]
         +- SerializeFromObject [input[0, bigint, false] AS value#2L]
            +- ExternalRDD [obj#1L]
      
      == Analyzed Logical Plan ==
      count: bigint
      Aggregate [count(1) AS count#170L]
      +- Project [value#2L, if (isnull(value#2L)) null else if (isnull(value#2L)) null else UDF(value#2L) AS value1#5L]
         +- SerializeFromObject [input[0, bigint, false] AS value#2L]
            +- ExternalRDD [obj#1L]
      
      == Optimized Logical Plan ==
      Aggregate [count(1) AS count#170L]
      +- Project
         +- SerializeFromObject [input[0, bigint, false] AS value#2L]
            +- ExternalRDD [obj#1L]
      
      == Physical Plan ==
      *(2) HashAggregate(keys=[], functions=[count(1)], output=[count#170L])
      +- Exchange SinglePartition
         +- *(1) HashAggregate(keys=[], functions=[partial_count(1)], output=[count#175L])
            +- *(1) Project
               +- *(1) SerializeFromObject [input[0, bigint, false] AS value#2L]
                  +- Scan ExternalRDDScan[obj#1L]
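
      The root cause is visible in the analyzed plan above: the already-analyzed Project is analyzed again under the Aggregate, which wraps the UDF in a second null check (if (isnull(value#2L)) null else if (isnull(value#2L)) null else UDF(value#2L)). The re-analyzed plan therefore no longer matches the plan that cache() registered, the InMemoryRelation is not substituted in, and the optimizer then prunes the UDF column away entirely. Here is a hedged diagnostic you can paste into the same spark-shell session (usesCache is an ad-hoc helper, not a Spark API; queryExecution.withCachedData is the logical plan after cache substitution):

      import org.apache.spark.sql.Dataset
      import org.apache.spark.sql.execution.columnar.InMemoryRelation

      // True if cache substitution injected an InMemoryRelation into the plan.
      def usesCache(ds: Dataset[_]): Boolean =
        ds.queryExecution.withCachedData.collect {
          case r: InMemoryRelation => r
        }.nonEmpty

      usesCache(df1)                    // true: the plan matches the cached one
      usesCache(df1.groupBy().count())  // false on 2.3.0: re-analyzed plan differs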
      

       

       


          People

            Assignee: Marco Gaido (mgaido)
            Reporter: Wenbo Zhao (wbzhao)
            Votes: 2
            Watchers: 11
