Uploaded image for project: 'Spark'
  1. Spark
  2. SPARK-27684

Reduce ScalaUDF conversion overheads for primitives



    • Improvement
    • Status: Resolved
    • Major
    • Resolution: Fixed
    • 2.4.0
    • 3.0.0
    • SQL
    • None


      I believe that we can reduce ScalaUDF overheads when operating over primitive types.

      In ScalaUDF's doGenCode we have logic to convert UDF function input types from Catalyst internal types to Scala types (for example, this is used to convert UTF8Strings to Java Strings). Similarly, we convert UDF return types.

      However, UDF input argument conversion is effectively a no-op for primitive types because CatalystTypeConverters.createToScalaConverter() returns identity in those cases. UDF result conversion is a little tricker because createToCatalystConverter() returns a function that handles Option[Primitive], but it might be the case that the Option-boxing is unusable via ScalaUDF (in which case the conversion truly is an identity no-op).

      These unnecessary no-op conversions could be quite expensive because each call involves an index into the references array to get the converters, a second index into the converters array to get the correct converter for the nth input argument, and, finally, the converter invocation itself:

      Object project_arg_0 = false ? null : ((scala.Function1[]) references[1] /* converters */)[0].apply(project_value_3);

      In these cases, I believe that we can reduce lookup / invocation overheads by modifying the ScalaUDF code generation to eliminate the conversion calls for primitives and directly assign the unconverted result, e.g.

      Object project_arg_0 = false ? null : project_value_3;

      To cleanly handle the case where we have a multi-argument UDF accepting a mixture of primitive and non-primitive types, we might be able to keep the converters array the same size (so indexes stay the same) but omit the invocation of the converters for the primitive arguments (e.g. converters is sparse / contains unused entries in case of primitives).

      I spotted this optimization while trying to construct some quick benchmarks to measure UDF invocation overheads. For example:

      spark.udf.register("identity", (x: Int) => x)
      sql("select id, id * 2, id * 3 from range(1000 * 1000 * 1000)").rdd.count() // ~ 52 seconds
      sql("select identity(id), identity(id * 2), identity(id * 3) from range(1000 * 1000 * 1000)").rdd.count() // ~84 seconds

      I'm curious to see whether the optimization suggested here can close this performance gap. It'd also be a good idea to construct more principled microbenchmarks covering multi-argument UDFs, projections involving multiple UDFs over different input and output types, etc.



        Issue Links



              mgaido Marco Gaido
              joshrosen Josh Rosen
              1 Vote for this issue
              9 Start watching this issue