[SPARK-25707] Repeated caching doesn't work when using a UDF on a nullable field

Details

    • Type: Bug
    • Status: Resolved
    • Priority: Minor
    • Resolution: Incomplete
    • Affects Version/s: 2.3.1
    • Fix Version/s: None
    • Component/s: Optimizer, Spark Core
    • Environment: Reproducible on local Spark executors

    Description

      It seems like Spark doesn't manage to find already-cached data and triggers a re-read from the source when all of the following circumstances are present:

      1. Caching a dataset that is derived from another already cached dataset
      2. Having nullable fields in the Dataset whose types map to primitives (Integer, Boolean, etc.)
      3. Using a UDF that takes non-nullable primitive parameters (Int, Boolean) on one of these nullable fields

      In this case Spark's optimizer will create a null-check on these fields before passing them into the UDF's code (presumably to prevent a NullPointerException). Plans containing these null-checks are generated both when a Dataset is persisted and when the optimizer looks for fragments of a query plan that can be read from the cache instead of resolving the full Dataset lineage.

      However, the plan fragments of the query currently being persisted and of what is already in the cache do not match: the lookup version has the null-check included in the query plan twice. This prevents the optimizer from generating a plan that relies on the cached data and ultimately triggers a repeated read from the original data source.

      The reproduction sequence in pseudocode:

      val base = spark.someDataset(...)      // columns: ("a" String, "b" Int), "b" nullable
      val someUdf = udf((x: Int) => ...)     // UDF with a non-nullable primitive parameter
      val cached = base.select($"a", someUdf($"b")).cache()
      val cachedAgain = cached.select(...).cache()

      In this case I'd expect 'cachedAgain' to re-use the data from 'cached' instead of re-reading 'base'.

      I'm attaching a small Scala code fragment with a more detailed reproduction. 
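
      A minimal self-contained sketch of such a reproduction (the object name and the sample data here are illustrative, not the exact attached code):

      import org.apache.spark.sql.SparkSession
      import org.apache.spark.sql.functions.udf

      object UdfCachingNullableRepro {
        def main(args: Array[String]): Unit = {
          val spark = SparkSession.builder().master("local[*]").appName("SPARK-25707").getOrCreate()
          import spark.implicits._

          // "b" is nullable because it is encoded from java.lang.Integer
          val base = Seq(("x", Integer.valueOf(1)), ("y", null.asInstanceOf[Integer])).toDF("a", "b")

          // UDF with a non-nullable primitive parameter; calls to it get wrapped
          // in an "if (isnull(b)) null else UDF(b)" guard
          val someUdf = udf((x: Int) => x * 10)

          val cached = base.select($"a", someUdf($"b").as("b")).cache()
          cached.count() // materialize the first cache

          val cachedAgain = cached.select($"a", $"b" * 2).cache()
          cachedAgain.explain() // expected: a scan over 'cached'; observed: a re-read of 'base'
        }
      }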

      Interestingly, the whole process works if I replace 'Int' with 'java.lang.Integer' in the UDF's parameter list. The generated physical plans are:

      --- using scala.Int ---
      == Physical Plan ==
      InMemoryTableScan [a#6, (b * 2)#23]
      +- InMemoryRelation [a#6, (b * 2)#23], true, 10000, StorageLevel(disk, memory, deserialized, 1 replicas)
         +- *(1) Project [a#6, (if (isnull(b#7)) null else if (isnull(b#7)) null else UDF(b#7) * 2) AS (b * 2)#23]
            +- Scan ExistingRDD[a#6,b#7]

      --- using java.lang.Integer ---
      == Physical Plan ==
      InMemoryTableScan [a#53, (b * 2)#70]
      +- InMemoryRelation [a#53, (b * 2)#70], true, 10000, StorageLevel(disk, memory, deserialized, 1 replicas)
         +- *(1) Project [a#53, (b#57 * 2) AS (b * 2)#70]
            +- InMemoryTableScan [a#53, b#57]
               +- InMemoryRelation [a#53, b#57], true, 10000, StorageLevel(disk, memory, deserialized, 1 replicas)
                  +- *(1) Project [a#53, UDF(b#54) AS b#57]
                     +- Scan ExistingRDD[a#53,b#54]
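
      A sketch of that workaround (illustrative code, not taken from the attachment): with a boxed parameter the UDF handles null itself, and, as the second plan above shows, no guard is inserted, so the cached fragment is matched:

      // boxed parameter and return type; null flows through the UDF body
      val boxedUdf = udf { (x: java.lang.Integer) =>
        if (x == null) null else java.lang.Integer.valueOf(x * 10)
      }
      val cachedBoxed = base.select($"a", boxedUdf($"b").as("b")).cache()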

      The underlying issue seems to be that when `persist` is called, Spark will effectively execute this code:

      spark.sessionState.executePlan(dataset.queryExecution.analyzed).analyzed

      (CacheManager, lines #100 and #166)

      This seems to generate the null-check on nullable fields twice, which then doesn't match the already-cached plan:

      Plan cached:
      Project [a#6, if (isnull(b#7)) null else UDF(b#7) AS b#10]
      +- LogicalRDD [a#6, b#7], false
      
      
      Plan that is used in the lookup:
      Project [a#6, if (isnull(b#7)) null else if (isnull(b#7)) null else UDF(b#7) AS b#10]
      +- LogicalRDD [a#6, b#7], false
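
      One way to observe the mismatch directly (a sketch building on the snippet above, with 'cached' from the pseudocode):

      // plan stored by CacheManager at cache() time:
      println(cached.queryExecution.analyzed)
      // plan re-analyzed for the cache lookup; contains the isnull guard twice:
      println(spark.sessionState.executePlan(cached.queryExecution.analyzed).analyzed)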

      Attachments

        1. UdfCachingNullableBug.scala (2 kB, David Borsos)

          People

            Assignee: Unassigned
            Reporter: David Borsos (davibo_els)
            Votes: 2
            Watchers: 3
