Details
-
Bug
-
Status: Open
-
Major
-
Resolution: Unresolved
-
3.4.0
-
None
-
None
Description
Consider a Spark job with an expensive UDF which looks like follows:
val expensive_udf = spark.udf.register("expensive_udf", (i: Int) => Option(i)) spark.range(10).write.format("orc").save("/tmp/orc") val df = spark.read.format("orc").load("/tmp/orc").as("a") .join(spark.range(10).as("b"), "id") .withColumn("udf_op", expensive_udf($"a.id")) .join(spark.range(10).as("c"), $"udf_op" === $"c.id")
This creates a physical plan as follows:
== Physical Plan == AdaptiveSparkPlan isFinalPlan=false +- BroadcastHashJoin [cast(udf_op#338 as bigint)], [id#344L], Inner, BuildRight, false :- Project [id#330L, if (isnull(cast(id#330L as int))) null else expensive_udf(knownnotnull(cast(id#330L as int))) AS udf_op#338] : +- BroadcastHashJoin [id#330L], [id#332L], Inner, BuildRight, false : :- Filter ((isnotnull(id#330L) AND isnotnull(cast(id#330L as int))) AND isnotnull(expensive_udf(knownnotnull(cast(id#330L as int))))) : : +- FileScan orc [id#330L] Batched: true, DataFilters: [isnotnull(id#330L), isnotnull(cast(id#330L as int)), isnotnull(expensive_udf(knownnotnull(cast(i..., Format: ORC, Location: InMemoryFileIndex(1 paths)[file:/tmp/orc], PartitionFilters: [], PushedFilters: [IsNotNull(id)], ReadSchema: struct<id:bigint> : +- BroadcastExchange HashedRelationBroadcastMode(List(input[0, bigint, false]),false), [plan_id=416] : +- Range (0, 10, step=1, splits=16) +- BroadcastExchange HashedRelationBroadcastMode(List(input[0, bigint, false]),false), [plan_id=420] +- Range (0, 10, step=1, splits=16)
In this case, the expensive UDF call is duplicated thrice. Since the UDF output is used in a future join, `InferFiltersFromConstraints` adds an `IS NOT NULL` filter on the UDF output. But the pushdown rules duplicate this UDF call and push the UDF past a previous join. The duplication behaviour is documented and in itself is not a huge issue. But given a highly restrictive join, the UDF gets evaluated on many orders of magnitude more rows than it should have slowing down the job.
Can we avoid this duplication of UDF calls? In SPARK-37392, we made a similar change where we decided to only add inferred filters if the input is an attribute. Should we use a similar strategy for `InferFiltersFromConstraints`?
Attachments
Issue Links
- is related to
-
SPARK-37392 Catalyst optimizer very time-consuming and memory-intensive with some "explode(array)"
- Resolved