Spark / SPARK-27684

Reduce ScalaUDF conversion overheads for primitives



    • Type: Improvement
    • Status: Resolved
    • Priority: Major
    • Resolution: Fixed
    • Affects Version: 2.4.0
    • Fix Version: 3.0.0
    • Component: SQL
    • Labels: None


      I believe that we can reduce ScalaUDF overheads when operating over primitive types.

      In ScalaUDF's doGenCode we have logic to convert UDF function input types from Catalyst internal types to Scala types (for example, this is used to convert UTF8Strings to Java Strings). Similarly, we convert UDF return types.

      However, UDF input argument conversion is effectively a no-op for primitive types because CatalystTypeConverters.createToScalaConverter() returns identity in those cases. UDF result conversion is a little trickier because createToCatalystConverter() returns a function that handles Option[Primitive], but it might be the case that the Option-boxing is unnecessary via ScalaUDF (in which case the conversion truly is an identity no-op).
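To illustrate why the input-side conversion is a no-op for primitives, here is a hypothetical stand-in (not Spark's actual implementation) that mirrors the shape of CatalystTypeConverters.createToScalaConverter(): primitive types get an identity function, while types like strings get a real conversion. The class and enum names are illustrative only.

```java
import java.util.function.Function;

// Hypothetical model of converter selection; not Spark's actual API.
public class ConverterModel {
    public enum DataType { INTEGER, STRING }

    public static Function<Object, Object> createToScalaConverter(DataType dt) {
        switch (dt) {
            case INTEGER:
                // Primitives need no Catalyst-to-Scala conversion: identity.
                return Function.identity();
            case STRING:
                // Non-primitives need real work, e.g. UTF8String -> String.
                return v -> v.toString();
            default:
                throw new IllegalArgumentException("unsupported type: " + dt);
        }
    }
}
```

Under this model, the generated per-argument converter call is pure overhead whenever the selected converter is the identity function.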

      These unnecessary no-op conversions could be quite expensive because each call involves an index into the references array to get the converters, a second index into the converters array to get the correct converter for the nth input argument, and, finally, the converter invocation itself:

      Object project_arg_0 = false ? null : ((scala.Function1[]) references[1] /* converters */)[0].apply(project_value_3);

      In these cases, I believe that we can reduce lookup / invocation overheads by modifying the ScalaUDF code generation to eliminate the conversion calls for primitives and directly assign the unconverted result, e.g.

      Object project_arg_0 = false ? null : project_value_3;

      To cleanly handle the case where we have a multi-argument UDF accepting a mixture of primitive and non-primitive types, we might be able to keep the converters array the same size (so indexes stay the same) but omit the invocation of the converters for the primitive arguments (e.g. converters is sparse / contains unused entries in case of primitives).
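The per-argument decision described above could be sketched as follows. This is a hedged sketch of the proposed codegen change, not Spark's actual doGenCode: the converters array keeps its full size so indices stay stable, but primitive arguments skip the lookup-and-apply call entirely. All names here are illustrative.

```java
// Hypothetical sketch of per-argument code generation for ScalaUDF.
public class ScalaUdfCodegenSketch {
    public static String genArgCode(int i, boolean isPrimitive, String inputVal) {
        if (isPrimitive) {
            // Direct assignment: no references lookup, no converter invocation.
            // The converters array slot for this argument is simply unused.
            return "Object arg_" + i + " = " + inputVal + ";";
        } else {
            // Non-primitive: keep the existing lookup-and-apply pattern,
            // indexing converters at the same position i as before.
            return "Object arg_" + i
                + " = ((scala.Function1[]) references[1])[" + i + "].apply("
                + inputVal + ");";
        }
    }
}
```

Keeping the array full-size means a UDF mixing primitive and non-primitive arguments needs no index remapping; the primitive slots are just dead entries.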

      I spotted this optimization while trying to construct some quick benchmarks to measure UDF invocation overheads. For example:

      spark.udf.register("identity", (x: Int) => x)
      sql("select id, id * 2, id * 3 from range(1000 * 1000 * 1000)").rdd.count() // ~52 seconds
      sql("select identity(id), identity(id * 2), identity(id * 3) from range(1000 * 1000 * 1000)").rdd.count() // ~84 seconds

      I'm curious to see whether the optimization suggested here can close this performance gap. It'd also be a good idea to construct more principled microbenchmarks covering multi-argument UDFs, projections involving multiple UDFs over different input and output types, etc.
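As a starting point for such microbenchmarks, the two code shapes can be isolated on a plain JVM (no Spark) and timed with a harness such as JMH. This sketch, with illustrative names, contrasts the per-call references/converters lookups plus Function.apply against a direct use of the value:

```java
import java.util.function.Function;

// Plain-JVM sketch isolating the two code shapes; not a Spark benchmark.
public class ConverterOverhead {
    @SuppressWarnings("unchecked")
    public static long viaConverter(Object[] references, int n) {
        long sum = 0;
        for (int i = 0; i < n; i++) {
            // Per call: index references, cast, index converters, invoke —
            // mirroring the generated code's lookup-and-apply pattern.
            Function<Object, Object>[] converters =
                (Function<Object, Object>[]) references[1];
            sum += (Integer) converters[0].apply(i);
        }
        return sum;
    }

    public static long direct(int n) {
        long sum = 0;
        for (int i = 0; i < n; i++) {
            sum += i; // unconverted value used directly
        }
        return sum;
    }
}
```

Both methods compute the same result, so any timing difference under a proper harness would reflect the lookup and invocation overhead alone.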






            Assignee: Marco Gaido (mgaido)
            Reporter: Josh Rosen (joshrosen)
            Votes: 1
            Watchers: 10



