Uploaded image for project: 'Spark'
  1. Spark
  2. SPARK-40262

Expensive UDF evaluation pushed down past a join leads to performance issues

    XMLWordPrintableJSON

Details

    • Bug
    • Status: Open
    • Major
    • Resolution: Unresolved
    • 3.4.0
    • None
    • SQL
    • None

    Description

      Consider a Spark job with an expensive UDF which looks like follows:

      val expensive_udf = spark.udf.register("expensive_udf", (i: Int) => Option(i))
      
      spark.range(10).write.format("orc").save("/tmp/orc")
      
      val df = spark.read.format("orc").load("/tmp/orc").as("a")
          .join(spark.range(10).as("b"), "id")
          .withColumn("udf_op", expensive_udf($"a.id"))
          .join(spark.range(10).as("c"), $"udf_op" === $"c.id")
      

      This creates a physical plan as follows:

      == Physical Plan ==
      AdaptiveSparkPlan isFinalPlan=false
      +- BroadcastHashJoin [cast(udf_op#338 as bigint)], [id#344L], Inner, BuildRight, false
         :- Project [id#330L, if (isnull(cast(id#330L as int))) null else expensive_udf(knownnotnull(cast(id#330L as int))) AS udf_op#338]
         :  +- BroadcastHashJoin [id#330L], [id#332L], Inner, BuildRight, false
         :     :- Filter ((isnotnull(id#330L) AND isnotnull(cast(id#330L as int))) AND isnotnull(expensive_udf(knownnotnull(cast(id#330L as int)))))
         :     :  +- FileScan orc [id#330L] Batched: true, DataFilters: [isnotnull(id#330L), isnotnull(cast(id#330L as int)), isnotnull(expensive_udf(knownnotnull(cast(i..., Format: ORC, Location: InMemoryFileIndex(1 paths)[file:/tmp/orc], PartitionFilters: [], PushedFilters: [IsNotNull(id)], ReadSchema: struct<id:bigint>
         :     +- BroadcastExchange HashedRelationBroadcastMode(List(input[0, bigint, false]),false), [plan_id=416]
         :        +- Range (0, 10, step=1, splits=16)
         +- BroadcastExchange HashedRelationBroadcastMode(List(input[0, bigint, false]),false), [plan_id=420]
            +- Range (0, 10, step=1, splits=16)
      

      In this case, the expensive UDF call is duplicated thrice. Since the UDF output is used in a future join, `InferFiltersFromConstraints` adds an `IS NOT NULL` filter on the UDF output. But the pushdown rules duplicate this UDF call and push the UDF past a previous join. The duplication behaviour is documented and in itself is not a huge issue. But given a highly restrictive join, the UDF gets evaluated on many orders of magnitude more rows than it should have slowing down the job.

      Can we avoid this duplication of UDF calls? In SPARK-37392, we made a similar change where we decided to only add inferred filters if the input is an attribute. Should we use a similar strategy for `InferFiltersFromConstraints`?

      Attachments

        Issue Links

          Activity

            People

              Unassigned Unassigned
              shardulm Shardul Mahadik
              Votes:
              0 Vote for this issue
              Watchers:
              4 Start watching this issue

              Dates

                Created:
                Updated: