Details

    • Type: Bug
    • Status: Closed
    • Priority: Minor
    • Resolution: Not A Problem
    • Affects Version/s: 2.0.0
    • Fix Version/s: None
    • Component/s: Spark Core
    • Labels:
      None
    • Environment:

      Databricks Cloud / Spark 2.0.0

      Description

      Background

      Longer-running processes might run analytics or contact external services from UDFs. The response might not be just a single field, but a structure of information. When attempting to break this information out into columns, it is critical that the query is optimized correctly.

      Steps to Reproduce

      1. Create some sample data.
      2. Create a UDF that returns multiple attributes.
      3. Run UDF over some data.
      4. Create new columns from the multiple attributes.
      5. Observe the run time (a rough repro sketch follows).
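
      A minimal sketch of these steps for the spark-shell; fUdf, the field names, and the Parquet path mirror the snippets quoted later in this thread, so treat it as an assumed repro rather than the attached example:

        import org.apache.spark.sql.functions._

        case class Info(plusOne: Int, squared: Int)
        val fUdf = udf((a: Int) => Info(a + 1, a * a)) // stand-in for an expensive pure function

        (1 to 10).toDF("a").write.mode("overwrite").parquet("/tmp/as.parquet")
        val as = spark.read.parquet("/tmp/as.parquet") // a Parquet source triggers the behavior

        val exploded = as
          .withColumn("structured_information", fUdf('a))
          .withColumn("plus_one", 'structured_information("plusOne"))
          .withColumn("squared", 'structured_information("squared"))
        exploded.show() // time this; the UDF fires more than once per row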

      Actual Results

      The UDF is executed multiple times per row.

      Expected Results

      The UDF should only be executed once per row.

      Workaround

      Cache the Dataset after UDF execution.
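
      A minimal sketch of this workaround, reusing fUdf and the column names from the repro sketch above. The cache is materialized on the first action, after which the derived columns read the stored struct instead of re-running the UDF:

        val withStruct = as
          .withColumn("structured_information", fUdf('a))
          .cache() // materialize the UDF output once (on the first action)

        val exploded = withStruct
          .withColumn("plus_one", 'structured_information("plusOne"))
          .withColumn("squared", 'structured_information("squared"))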

      Details

      For code and more details, see over_optimized_udf.html

        Issue Links

          Activity

          hvanhovell Herman van Hovell added a comment -

          You really should not try to use any external state in a UDF (it should be a pure function).

          It might be an idea to use a generator in this case. These are guaranteed to execute only once per input tuple. (A sketch of that idea follows.)
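
          A sketch of the generator idea, using the explode(array(...)) form that appears later in this thread; fUdf and the column name are from the attached example:

            as.select('a, explode(array(fUdf('a))) as "structured_information")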

          jeisinge Jacob Eisinger added a comment -

          I am a little confused.

          1. Could you explain how a generator would apply here?
          2. You mentioned that UDFs should be pure functions. Is Spark optimizing the function calls as if they are pure functions?

          (Also, please check out my example — the UDF there should be a pure function.)

          hvanhovell Herman van Hovell added a comment -

          Spark assumes UDFs are pure functions; we do not guarantee that a function is executed only once. This is due to the way the optimizer works, and the fact that stages are sometimes retried. We could add a flag to UDFs to prevent this from happening, but this would be a considerable engineering effort.

          The example you give is not really a pure function, as its side effect makes the thread stop (it changes state).

          If you are connecting to an external service, then I would suggest using Dataset.mapPartitions(...) (similar to a generator). This will allow you to set up one connection per partition, and you can call a method as much or as little as you like. (A sketch follows.)
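
          A minimal sketch of that suggestion; ExternalService, connect(), and classify() are hypothetical names used only for illustration:

            import org.apache.spark.sql.Dataset

            def enrich(ds: Dataset[String]): Dataset[(String, String)] = {
              import ds.sparkSession.implicits._ // encoder for the tuple result
              ds.mapPartitions { rows =>
                val conn = ExternalService.connect() // hypothetical: one connection per partition
                rows.map(s => (s, conn.classify(s))) // reuse the connection for every row
              }
            }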

          hvanhovell Herman van Hovell added a comment -

          I am going to close this as not a problem, but feel free to follow up.

          jeisinge Jacob Eisinger added a comment -

          Thanks for the explanation, but I still think this is an issue.

          If Spark assumed there were no side effects and optimized accordingly, there would be no issue: the UDF would be called once per row (1). However, Spark calls a costly function many times, leading to inefficiency.

          In our production code, we have a function that takes in a long string and classifies it under a number of different dimensions. This is a very CPU-intensive operation and a pure function. Obviously, if Spark's optimizer calls the function multiple times, this is not optimal.

          I think it is intuitive to most that the following code would call the UDF once per row (1):

          val exploded = as
            .withColumn("structured_information", fUdf('a))
            .withColumn("plus_one", 'structured_information("plusOne"))
            .withColumn("squared", 'structured_information("squared"))
          

          However, Spark calls the UDF three times per row! Is this what you would expect? What am I missing?

          (1) - "Once per row" - except when the row needs to recomputed such as when workers are lost.
          (2) - I attempted to model the long operation via Thread.sleep(); as you mentioned this does have a slight side effect. Maybe I should have summed the first billion counting numbers to illustrate the slow down?
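
          For illustration, footnote (2)'s idea as code: a pure, CPU-bound stand-in for the expensive work instead of Thread.sleep() (slowUdf is a hypothetical name):

            val slowUdf = udf((a: Int) => {
              var s = 0L
              var i = 1L
              while (i <= 1000000000L) { s += i; i += 1 } // pure CPU work, no side effects
              a + 1 + (s % 1).toInt // use s so the loop cannot be optimized away
            })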

          hvanhovell Herman van Hovell added a comment -

          I think calling explain(true) on your plans helps to understand what is going on.

          Spark executes the UDF three times because the optimizer collapses subsequent projects (a project normally being much more expensive than a UDF call). In your case the three projects get rewritten into one project, and the expressions are rewritten into the following form:

          structured_information -> fUdf('a)
          plus_one -> fUdf('a).get("plusOne")
          squared -> fUdf('a).get("squared")
          
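          Calling explain(true) on the collapsed plan should make this visible; roughly along these lines (hand-written for illustration, not copied from a real run):

            exploded.explain(true)
            // == Optimized Logical Plan == (abbreviated)
            // Project [a, UDF(a) AS structured_information,
            //          UDF(a).plusOne AS plus_one, UDF(a).squared AS squared]
            // +- Relation[a] parquet
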

          It is a bit tricky to get around this; the following might work:

          val exploded = as
            .withColumn("structured_information", explode(array(fUdf('a))))
            .withColumn("plus_one", 'structured_information("plusOne"))
            .withColumn("squared", 'structured_information("squared"))
          
          jeisinge Jacob Eisinger added a comment - - edited

          Thanks for the explanation and the tricky code snippet! I kind of figured it was optimizing incorrectly / over-optimizing. It sounds like this is not a defect because, normally, collapsing projects is the desired optimization. Correct?

          Do you think it is worth filing a feature request to allow working with costly UDFs? Possibly:

          • Memoize UDFs / other transforms on a per-row basis (a rough sketch of this idea follows).
          • Manually override costs for UDFs.
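
          A hand-rolled sketch of the memoization idea, keying on the input value rather than the row; Memo and expensive(...) are hypothetical names, and this is a per-executor cache, not a Spark feature:

            import scala.collection.concurrent.TrieMap

            object Memo {
              private val cache = TrieMap.empty[Int, Int] // one cache per executor JVM
              def apply(a: Int): Int = cache.getOrElseUpdate(a, expensive(a)) // expensive(...) is hypothetical
            }
            val memoUdf = udf((a: Int) => Memo(a)) // repeated calls with the same input hit the cache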
          hvanhovell Herman van Hovell added a comment -

          First of all, we implement subexpression elimination (which is a form of memoization), and this should prevent multiple invocations from happening. I am quite curious why this is not triggering in your case. Are you on a completely interpreted path?

          Cost functions for a UDF are doable; we would have to do this for expression trees, though, and that is non-trivial to implement.

          jeisinge Jacob Eisinger added a comment -

          What do you mean by completely interpreted path?

          jeisinge Jacob Eisinger added a comment -

          Also, it is interesting to note that this occurs for Parquet-backed Datasets, and not when generating the Dataset in memory.

          For example,

          val as = spark.read.parquet("/tmp/as.parquet")
          

          triggers the behavior, but

          val as = (1 to 10).toDF("a")
          

          does not.

          hvanhovell Herman van Hovell added a comment -

          There are different evaluation paths in Spark SQL:

          • Interpreted. Expressions are evaluated using an eval(...) method. Plans are evaluated using iterators (volcano model). This is what I mean by the completely interpreted path.
          • Expression Codegenerated. This means that all expressions are evaluated using a code-generated function. Plans are evaluated using iterators.
          • Whole-stage Codegenerated. All expressions and most plans are evaluated using code generation.

          I think you are using whole-stage code generation. This does not support common subexpression elimination. (A quick way to test this is sketched below.)
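
          One way to check, assuming the existing Spark SQL flag spark.sql.codegen.wholeStage: toggle whole-stage codegen off for the session and compare the plans and timings:

            spark.conf.set("spark.sql.codegen.wholeStage", "false")
            exploded.explain() // physical operators should lose their '*' whole-stage marker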

          hvanhovell Herman van Hovell added a comment - - edited

          The second one does not trigger the behavior because it is turned into a LocalRelation; these are evaluated during optimization, before we collapse projections.

          The following in memory dataframe should trigger the same behavior:

          spark.range(1, 10)
               .withColumn("expensive_udf_result", fUdf($"id"))
               .withColumn("b", $"expensive_udf_result" + 100)
          
          jeisinge Jacob Eisinger added a comment -

          Thanks for the great explanation! Are there plans for subexpression elimination in whole-stage code generation, and do you think there should be?

          hvanhovell Herman van Hovell added a comment -

          I think we should eventually add it; however, this is currently quite hard to implement properly. There is also the matter that the JIT is really good at inlining small functions and doing common subexpression elimination for us. In this particular case it makes more sense to me to add costs to UDFs or to add a mechanism that prevents project collapsing.

          jeisinge Jacob Eisinger added a comment -

          Herman van Hovell, thanks for looking into this! Unless you can think of a better way to ensure the UDF doesn't get executed multiple times, we are going to go with your workaround of:

          val exploded = as
            .withColumn("structured_information", explode(array(fUdf('a))))
            .withColumn("plus_one", 'structured_information("plusOne"))
            .withColumn("squared", 'structured_information("squared"))

          exploded.explain
          == Physical Plan ==
          *Project [a#10, structured_information#159, structured_information#159.plusOne AS plus_one#161, structured_information#159.squared AS squared#166]
          +- Generate explode(array(if (isnull(a#10)) null else UDF(a#10))), true, false, [a#10, structured_information#159]
             +- *BatchedScan parquet [a#10] Format: ParquetFormat, InputPaths: file:/tmp/as.parquet, PushedFilters: [], ReadSchema: struct<a:int>
          

          I reckon it might impact GC a bit with the creation of the extra arrays, but that sure beats the cost of running those expensive UDFs! Thanks again for the excellent explanations!


            People

            • Assignee: Unassigned
            • Reporter: jeisinge Jacob Eisinger
            • Votes: 0
            • Watchers: 5

              Dates

              • Created:
              • Updated:
              • Resolved:
