Details
Type: Bug
Status: Resolved
Priority: Minor
Resolution: Incomplete
Affects Version: 2.3.1
Fix Version: None
Environment: Reproducible on local Spark executors
Description
It seems that Spark fails to find already cached data and triggers a re-read from the source when all of the following circumstances are present:
- Caching a dataset that is derived from another already cached dataset
- Having nullable fields in the Dataset of types that map to primitives (Integer, Boolean, etc.)
- Using a UDF that takes non-nullable primitive parameters (Int, Boolean) on one of these nullable fields
In this case Spark's optimizer will create a null-check on these fields prior to passing them into the UDF's code (presumably to prevent a NullPointerException). The plans with the null-checks are generated both when a Dataset is persisted and when the optimizer looks for fragments of a query plan that can be read from cache instead of resolving the full Dataset lineage.
However, the plan fragments between the query that is currently being persisted and what is in the cache do not match: the lookup version has the null-check included in the query plan twice, which prevents the optimizer from generating a plan that relies on the cached data and ultimately triggers a repeated read from the original data source.
The reproduction sequence in pseudocode:
base = spark.someDataset(columns: ("a" String, "b" Int))
someUdf = someUdf(x: Int)
cached = base.select($"a", someUdf($"b")).cache()
cachedAgain = cached.select(...).cache()
In this case I'd expect 'cachedAgain' to re-use the data from 'cached' instead of re-reading 'base'.
I'm attaching a small Scala code fragment with a more detailed reproduction.
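A minimal self-contained version of that reproduction might look like the following sketch (the dataset contents, the UDF body, and the session setup are my assumptions, not the attached fragment):

```
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.udf

object CacheRepro {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .master("local[*]")
      .appName("cache-repro")
      .getOrCreate()
    import spark.implicits._

    // "b" is a nullable integer column; encoding it as Option[Int] makes it nullable.
    val base = Seq(("x", Option(1)), ("y", Option.empty[Int])).toDF("a", "b")

    // UDF with a non-nullable primitive parameter: the optimizer wraps the call
    // in a null-check, i.e. if (isnull(b)) null else UDF(b).
    val someUdf = udf((x: Int) => x + 1)

    val cached = base.select($"a", someUdf($"b").as("b")).cache()
    cached.count() // materialize the first cache

    // Expected: this plan reads from 'cached'; observed: it re-reads 'base',
    // because the lookup plan contains the null-check twice.
    val cachedAgain = cached.select($"a", ($"b" * 2).as("b * 2")).cache()
    cachedAgain.explain()
  }
}
```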
Interestingly, the whole process works if I replace 'Int' with 'java.lang.Integer' in the UDF parameter list. The generated physical plans are:
--- using scala.Int ---
== Physical Plan ==
InMemoryTableScan [a#6, (b * 2)#23]
   +- InMemoryRelation [a#6, (b * 2)#23], true, 10000, StorageLevel(disk, memory, deserialized, 1 replicas)
         +- *(1) Project [a#6, (if (isnull(b#7)) null else if (isnull(b#7)) null else UDF(b#7) * 2) AS (b * 2)#23]
            +- Scan ExistingRDD[a#6,b#7]
--- using java.lang.Integer ---
== Physical Plan ==
InMemoryTableScan [a#53, (b * 2)#70]
   +- InMemoryRelation [a#53, (b * 2)#70], true, 10000, StorageLevel(disk, memory, deserialized, 1 replicas)
         +- *(1) Project [a#53, (b#57 * 2) AS (b * 2)#70]
            +- InMemoryTableScan [a#53, b#57]
                  +- InMemoryRelation [a#53, b#57], true, 10000, StorageLevel(disk, memory, deserialized, 1 replicas)
                        +- *(1) Project [a#53, UDF(b#54) AS b#57]
                           +- Scan ExistingRDD[a#53,b#54]
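The workaround therefore amounts to declaring the UDF parameter with the boxed type, roughly like this (a sketch; the UDF body is an assumption):

```
import org.apache.spark.sql.functions.udf

// With a boxed parameter the UDF can receive null itself, so the optimizer
// does not wrap the call in a null-check and the cached plan fragment matches.
val boxedUdf = udf { (x: java.lang.Integer) =>
  if (x == null) null else Integer.valueOf(x + 1)
}
```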
The underlying issue seems to be that when `persist` is called, Spark effectively executes this code:
spark.sessionState.executePlan(dataset.queryExecution.analyzed).analyzed
(CacheManager line #100 and #166)
This seems to generate the null-check on the nullable fields twice, so the resulting plan no longer matches the already cached one:
Plan cached:

Project [a#6, if (isnull(b#7)) null else UDF(b#7) AS b#10]
+- LogicalRDD [a#6, b#7], false

Plan that is used in the lookup:

Project [a#6, if (isnull(b#7)) null else if (isnull(b#7)) null else UDF(b#7) AS b#10]
+- LogicalRDD [a#6, b#7], false