Details

    • Type: Sub-task
    • Status: Resolved
    • Priority: Major
    • Resolution: Fixed
    • Affects Version/s: 2.2.0
    • Fix Version/s: 2.3.0
    • Component/s: PySpark, SQL
    • Labels:
    • Target Version/s:

      Description

      Background and Motivation
      Python is one of the most popular programming languages among Spark users. Spark currently exposes a row-at-a-time interface for defining and executing user-defined functions (UDFs). This introduces high overhead in serialization and deserialization, and also makes it difficult to leverage Python libraries (e.g. numpy, Pandas) that are written in native code.

      This proposal advocates introducing new APIs to support vectorized UDFs in Python, in which a block of data is transferred over to Python in some columnar format for execution.
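
      For contrast, here is a minimal sketch of the existing row-at-a-time UDF interface that this proposal aims to improve on (the column name and data are illustrative, and spark is assumed to be an existing SparkSession):

      from pyspark.sql.functions import udf
      from pyspark.sql.types import DoubleType

      # Row-at-a-time UDF: the Python function is invoked once per row, paying
      # serialization/deserialization overhead for every single value.
      plus_one = udf(lambda v: v + 1.0, DoubleType())

      df = spark.range(1000).selectExpr("id * 1.0 AS v")
      df.withColumn("v_plus_one", plus_one(df.v))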

      Target Personas
      Data scientists, data engineers, library developers.

      Goals

      • Support vectorized UDFs that apply on chunks of the data frame
      • Low system overhead: Substantially reduce serialization and deserialization overhead when compared with row-at-a-time interface
      • UDF performance: Enable users to leverage native libraries in Python (e.g. numpy, Pandas) for data manipulation in these UDFs

      Non-Goals
      The following are explicitly out of scope for the current SPIP, and should be done in future SPIPs. Nonetheless, it would be good to consider these future use cases during API design, so we can achieve some consistency when rolling out new APIs.

      • Define block oriented UDFs in other languages (that are not Python).
      • Define aggregate UDFs
      • Tight integration with machine learning frameworks

      Proposed API Changes
      The following sketches some possibilities. I haven’t spent a lot of time thinking about the API (wrote it down in 5 mins) and I am not attached to this design at all. The main purpose of the SPIP is to get feedback on use cases and see how they can impact API design.

      A few things to consider are:

      1. Python is dynamically typed, whereas DataFrames/SQL requires static, analysis time typing. This means users would need to specify the return type of their UDFs.

      2. Ratio of input rows to output rows. We propose initially we require number of output rows to be the same as the number of input rows. In the future, we can consider relaxing this constraint with support for vectorized aggregate UDFs.

      3. How do we handle null values, since Pandas doesn't have the concept of nulls?
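
      To make question 3 concrete, here is a small plain-pandas illustration (not Spark-specific) of how pandas promotes types when nulls are present:

      import pandas as pd

      # An int column containing a null is promoted to float64, with NaN standing in for null.
      s = pd.Series([1, 2, None])
      print(s.dtype)   # float64

      # A boolean column containing a null is promoted to object dtype.
      b = pd.Series([True, False, None])
      print(b.dtype)   # object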

      Proposed API sketch (using examples):

      Use case 1. A function that defines all the columns of a DataFrame (similar to a “map” function):

      @spark_udf(some way to describe the return schema)
      def my_func_on_entire_df(input):
        """ Some user-defined function.
       
        :param input: A Pandas DataFrame with two columns, a and b.
        :return: A Pandas DataFrame.
        """
        input['c'] = input['a'] + input['b']
        input['d'] = input['a'] - input['b']
        return input
       
      (spark.range(1000).selectExpr("id a", "id / 2 b")
        .mapBatches(my_func_on_entire_df))
      

      Use case 2. A function that defines only one column (similar to existing UDFs):

      @spark_udf(some way to describe the return schema)
      def my_func_that_returns_one_column(input):
        """ Some user-defined function.
       
        :param input: A Pandas DataFrame with two columns, a and b.
        :return: A numpy array
        """
        return input['a'] + input['b']
       
      my_func = udf(my_func_that_returns_one_column)
       
      df = spark.range(1000).selectExpr("id a", "id / 2 b")
      df.withColumn("c", my_func(df.a, df.b))
      

      Optional Design Sketch
      I’m more concerned about getting proper feedback for API design. The implementation should be pretty straightforward and is not a huge concern at this point. We can leverage the same implementation for faster toPandas (using Arrow).
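
      For reference, a minimal sketch of the Arrow-backed toPandas path; the spark.sql.execution.arrow.enabled flag shipped with Spark 2.3, so treat the exact configuration name as release-dependent:

      # Enable Arrow-based columnar transfer for toPandas(); Spark falls back to
      # the row-based path if Arrow is not available.
      spark.conf.set("spark.sql.execution.arrow.enabled", "true")

      pdf = spark.range(1000).selectExpr("id AS a", "id / 2 AS b").toPandas()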

      Optional Rejected Designs
      See above.

        Issue Links

          Activity

          icexelloss Li Jin added a comment -

          Very excited to see this.

           I created https://issues.apache.org/jira/browse/SPARK-20396 earlier and I have been working on the same problem for a while. Julien Le Dem and I talked about the work we did at Spark Summit this year.

          Here is the design doc: https://github.com/icexelloss/spark/blob/pandas-udf-doc/docs/pyspark-pandas-udf.md

          I think the ideas are very similar and we could probably collaborate.

          rxin Reynold Xin added a comment -

          Li Jin Thanks. Your proposal brings up a good point, which is do we want to allow users to see the entire partition at a time. As demonstrated by your example, this can be a very powerful tool (especially for leveraging the time series analysis features available in Pandas). The primary challenge there is that a partition can be very large, and it is possible to run out of memory when using Pandas. Any thoughts on how we should handle those cases?

          icexelloss Li Jin added a comment - - edited

          Reynold Xin,

           The use case of seeing an entire partition at a time is one of the less interesting use cases, because partitions are somewhat an implementation detail of Spark. From the use cases I saw from our internal pyspark/pandas users, the majority of interest is in using pandas UDFs with Spark windowing and grouping operations, for instance, breaking data into day/month/year pieces and performing some analysis using pandas on each piece.

           However, the challenge still holds if one group is too large. One idea is to implement local aggregation and merge in pandas operations so it can be executed efficiently, but I haven't put too much thought into this.

           I think even without solving the problem of "one group doesn't fit in memory", this can already be a pretty powerful tool, because currently, to perform any pandas analysis, the entire dataset has to fit in memory (that is, without using tools like dask).

          leif Leif Walsh added a comment -

          I agree with Li Jin that we should aim to provide an API which provides "logical" groups of data to the UDF rather than the implementation detail of providing partitions wholesale. Setting aside for the moment the problem with dataset skew which could cause one group to be very large, let's look at some use cases.

          One obvious use case that tools like dplyr and pandas support is df.groupby(...).aggregate(...). Here, we group on some key and apply a function to each logical group. This can be used to e.g. demean each group w.r.t. its cohort. Another use case that we care about with Flint is aggregating over a window. In pandas terminology this is the rolling operator. One might want to, for each row, perform a moving window average or rolling regression over a history of some size. The windowed aggregation poses a performance question that the groupby case doesn't: namely, if we naively send each window to the python worker independently, we're transferring a lot of duplicate data since each overlapped window contains many of the same rows. An option here is to transfer the entire partition on the backend and then instruct the python worker to call the UDF with slices of the whole dataset according to the windowing requested by the user.

          I think the idea of presenting a whole partition in a pandas dataframe to a UDF is a bit off-track. If someone really wants to apply a python function to the "whole" dataset, they'd be best served by pulling those data back to the driver and just using pandas, if they tried to use spark's partitions they'd get somewhat arbitrary partitions and have to implement some kind of merge operator on their own. However, with grouped and windowed aggregations, we can provide an API which truly is parallelizable and useful.

          I want to focus on use cases where we actually can parallelize without requiring a merge operator right now. Aggregators in pandas and related tools in the ecosystem usually assume they have access to all the data for an operation and don't need to merge results of subaggregations. For aggregations over larger datasets you'd really want to encourage the use of native Spark operations (that use e.g. treeAggregate).

          Does that make sense? I think it focuses the problem nicely that it becomes fairly tractable.

          I think the really hard part of this API design is deciding what the inputs and outputs of the UDF look like, and providing for the myriad use cases therein. For example, one might want to aggregate each group down to a scalar (e.g. mean) and do something with that (either produce a reduced dataset with one value per group, or add a column where each group has the same value across all rows), or one might want to compute over the group and produce a value per row within the group and attach that as a new column (e.g. demeaning or ranking). These translate roughly to the differences between the **ply operations in dplyr or the differences in pandas between df.groupby(...).agg(...) and df.groupby(...).transform(...) and df.groupby(...).apply(...).
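
           To illustrate that distinction, a small plain-pandas sketch of the three groupby flavors (column names are illustrative):

           import pandas as pd

           pdf = pd.DataFrame({"g": ["a", "a", "b"], "v": [1.0, 3.0, 10.0]})

           # agg: one scalar per group -> a reduced result with one row per group.
           means = pdf.groupby("g")["v"].agg("mean")

           # transform: one value per input row, aligned with the original index
           # (e.g. demeaning each row w.r.t. its group).
           demeaned = pdf["v"] - pdf.groupby("g")["v"].transform("mean")

           # apply: an arbitrary function per group; the output shape is up to the function.
           ranked = pdf.groupby("g")["v"].apply(lambda s: s.rank())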

          rxin Reynold Xin added a comment -

           That makes a lot of sense. So we should design APIs similar to a lot of the ply operations. I will think a bit about this next week. Btw, if somebody could also just propose some different alternative APIs (perhaps as a gist), it'd facilitate the discussion further.

           How should we do memory management, though? Should we just have a configurable cap on the maximum number of rows? Should we just let the user crash (not a great experience)?

          cloud_fan Wenchen Fan added a comment -

           For aggregate, I think it makes more sense to do the grouping on the Spark side, then send each group to Python and use Pandas to do the reduce or add an extra column. We can have a config for the maximum group size, and when sending groups from the JVM to Python, fail the query if that size is exceeded.

           For window, like Leif Walsh said, ideally we should not send each window to Python, to avoid transferring a lot of duplicate data. Instead, we can send the entire partition to Python in a streaming manner. Then we do the windowing on the Python side and use Pandas to do the reduce or something. The downside is we need to re-implement many things in Python.

          I'll think about the API too.

          icexelloss Li Jin added a comment - - edited

          I went another round of updates for the API incorporating the discussion here:
          Here is how to define a udf: https://github.com/icexelloss/spark/blob/pandas-udf-doc/docs/pyspark-pandas-udf.md#definition-of-pandas-udf
           Here are examples of how to use a udf:
          1. Vectorized row transforms: https://github.com/icexelloss/spark/blob/pandas-udf-doc/docs/pyspark-pandas-udf.md#vectorized-row-operations
          2. Window: https://github.com/icexelloss/spark/blob/pandas-udf-doc/docs/pyspark-pandas-udf.md#window-operations
          3. Group: https://github.com/icexelloss/spark/blob/pandas-udf-doc/docs/pyspark-pandas-udf.md#group-operations

           To give a quick example (for group aggregations):

          import numpy as np
          @pandas_udf(DoubleType())
          def weighted_mean_udf(v, w):
              return np.average(v, weights=w)
          
           df.groupBy('id').agg(weighted_mean_udf(df.v1, df.w).alias('v1_wm'))
          

          In this example, the udf takes one or more pandas.Series of the same size, and returns a scalar value. The result is the aggregation for each group.

           I'd like to get some feedback on this API and also see alternative APIs.

          leif Leif Walsh added a comment -

           I think we can get away with doing windowing (deciding which rows to aggregate) in Scala to avoid reimplementing that logic in Python, and then send row indexes (a list of begin/end pairs, each being the bounds of a window) over to Python to select the windows.
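
           A rough sketch of what that could look like on the Python worker side, assuming the JVM ships the partition once as a pandas DataFrame plus a list of (begin, end) row-index pairs (all names here are hypothetical):

           import pandas as pd

           def apply_windowed_udf(pdf, windows, udf):
               """Apply `udf` to each window of `pdf`, where `windows` is a list of
               (begin, end) row-index pairs computed on the JVM side."""
               # The partition is transferred only once; each iloc slice covers just the
               # rows of one window, so overlapping windows are not re-serialized.
               return pd.Series([udf(pdf.iloc[begin:end]) for begin, end in windows])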

          cloud_fan Wenchen Fan added a comment -

          Thanks for your proposal!

          I have 2 thoughts:
          1. How should we handle null values? IIRC, pd.Series doesn't work well with null, e.g. int series with null becomes float type, boolean series with null becomes object type.
           2. I think a UDF returning a scalar value only makes sense for group/window, otherwise the result is nondeterministic for users.

          cloud_fan Wenchen Fan added a comment -

          > I think we can get away with doing windowing (deciding which rows to aggregate) in scala to avoid reimplementing that logic in python, and then send row indexes (a list of begin/end pairs which each are the bounds of a window) over to python to select the windows.

          Does this mean we need to keep the data of the whole window in both JVM and Python? This seems too expensive...

          leif Leif Walsh added a comment -

          I figure we could address that by using shared memory, if we need to. Don't think that's a blocker for a first cut though, just my opinion.

          leif Leif Walsh added a comment -

           I believe we could also compute window indexes while we stream over the data and serialize to Arrow; we don't necessarily need to keep the data in the JVM.

          I'm more worried about keeping a whole partition in python. You could have small windows but large partitions and pay an unfair memory cost unless we have some smart heuristics about how to batch the data into sub-partition chunks.

          icexelloss Li Jin added a comment -

          > I have 2 thoughts:
          > 1. How should we handle null values? IIRC, pd.Series doesn't work well with null, e.g. int series with null becomes float type, boolean series with null becomes object type.
           > 2. I think a UDF returning a scalar value only makes sense for group/window, otherwise the result is nondeterministic for users.

          Here are my thoughts:
           1. From Spark -> Pandas, I think turning a nullable int series into a float series is fine because pandas users are already dealing with this issue in pure pandas.
           From Pandas -> Spark, because Arrow supports nullable types, we can turn the pandas series into an Arrow array with the correct type. For instance, if the user specifies that the udf returns an int series but it ends up returning a float series because it contains null values, we can turn the pandas float series into an Arrow nullable int series (see the sketch below). This might have some performance cost but hopefully is not too bad. Wes McKinney, do you have some thoughts on this?
           2. You are absolutely right, not every return type makes sense with every function. I think it's fine if we do some type checking in catalyst and throw exceptions.
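
           A minimal pyarrow sketch of the Pandas -> Spark direction described in point 1, assuming the udf declared an int return type but produced a float series in order to represent nulls:

           import numpy as np
           import pandas as pd
           import pyarrow as pa

           # The udf declared an integer return type but produced float64 because of NaN-as-null.
           result = pd.Series([1.0, np.nan, 3.0])

           # Arrow arrays are nullable, so NaN can become a proper null while the values
           # are cast back to the declared integer type.
           arr = pa.Array.from_pandas(result, type=pa.int64())
           print(arr)  # [1, null, 3]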

          leif Leif Walsh added a comment -

          If the user specifies an int return type but produces floats in order to indicate nulls, the "right thing" would be to convert to nullable int. If the user specifies int return type but produces float because of operator error, the "right thing" would be to produce an exception with a meaningful error message. Unfortunately I do not have a way to distinguish.

          bryanc Bryan Cutler added a comment -

           This is a great discussion so far and I would love to see this in Spark as a future use of Arrow! Thanks for the detailed use cases Li Jin and Leif Walsh. I've also been thinking about this optimization for a while and have been working out some POCs to experiment with. Here are some of my thoughts on the discussion so far:

          1. Apply a vectorized UDF that returns 1 column of same size
          I think this can be accomplished within the current API for declaring UDFs, only using Arrow on the back-end that will load the columns as Pandas.Series and return a Pandas.Series. It might be necessary to do some conversion on the returned Pandas.Series to ensure it is the defined return type and NULL values are correct, but all this is known so I don't think it's necessary to add extra annotation to the UDF. This functionality could be enabled with a SQLConf.

           2. Apply a vectorized UDAF - windowing use case in mind that aggregates to a scalar
          This would be great to have, but seems like it could be quite a bit more complicated than a standard UDF so I agree with Reynold Xin that this is probably better to enable in the future.

          3. Apply a function to a Pandas DataFrame that returns a new Pandas DataFrame
           I think it is a little awkward to use Pandas DataFrames in the same way as UDFs if it's required to define all column types in the returned frame, and also not very flexible. Here is an alternative that I think could work for some of the use cases described above:

          Proposed API
          This would require some light wrappers around a few pyspark.sql classes. A method DataFrame.asPandas() could be added which would produce a wrapped DataFrame and allow processing with Pandas.DataFrames. See here for a rough design https://gist.github.com/BryanCutler/2d2ae04e81fa96ba4b61dc095726419f

          Use Case 1. Apply a function that takes a pandas.DataFrame as input and produces a new pandas.DataFrame. Similar to use case 1 above, maybe not that interesting for data analysis, but usage is straight-forward

          def my_func_on_entire_df(input):
            """ Some user-defined function.
           
            :param input: A Pandas DataFrame with two columns, a and b.
             :return: A Pandas DataFrame.
            """
             input['c'] = input['a'] + input['b']
             input['d'] = input['a'] - input['b']
            return input
          
           (spark.range(1000).selectExpr("id a", "id / 2 b")
             .asPandas().rdd.map(my_func_on_entire_df))
          

           This will change the pyspark.DataFrame to a wrapped RDD that uses Arrow to serde and defines methods to map/reduce/collect with functions accepting pandas.DataFrames as input/output.

          Use Case 2. Calculate the weighted mean on a column in a Pandas.DataFrame

           import numpy as np

           def weighted_mean(df):
            """Take weighted mean of a column.
            
            :param df: A Pandas DataFrame with data column "v" and weight column "w"
             :return: A Pandas DataFrame with column 'wmu'.
            """
             df['wmu'] = np.average(df['v'], weights=df['w'])
            return df
            
           (spark.range(1000).selectExpr("id % 7 a", "id / 2 v", "id % 2 w")
             .asPandas().groupBy("a").agg(weighted_mean))
          

          Here a subclass of GroupedData is created that overrides the "agg()" method to accept a function using pandas.DataFrame. The "agg()" method would convert the resulting pandas.DataFrames to Arrow and then convert Arrow to a Spark Dataset.
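
           For reference, the pandas <-> Arrow hop itself is already straightforward with pyarrow; a minimal sketch (the Arrow-to-Dataset step on the JVM side is elided):

           import pandas as pd
           import pyarrow as pa

           pdf = pd.DataFrame({"a": [0, 1, 2], "wmu": [0.5, 0.5, 0.5]})

           # pandas -> Arrow: the record batch carries an explicit schema that the
           # JVM side can map onto the types of a Spark Dataset.
           batch = pa.RecordBatch.from_pandas(pdf)
           print(batch.schema)

           # Arrow -> pandas round trip.
           pdf2 = batch.to_pandas()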

          Using this to apply over a window is a little more complicated, so I'll have to think more on that.

          The benefit of this design is that it only relies on conversions to/from Arrow, which has a well defined schema, so it's easy to compose functions and work with them. However, the downside is that it's mostly separated from the rest of SparkSQL and doesn't fit in sql expressions like UDFs. So this might lead to having to do extra work, like join DataFrames, after the Pandas processing is complete.

          Regardless, I think vectorized UDFs without any API change would be a great place to start and allow us to optimize the processing with Arrow before getting into more complex usage.

          rxin Reynold Xin added a comment -

          Bryan Cutler Sorry I don't think it makes sense to not introduce APIs. In general we are trying to raise the level of abstractions and move people away from lower level APIs. We wouldn't want to give users another reason to use the lower level RDD API.

          bryanc Bryan Cutler added a comment -

          Reynold Xin I was talking about 2 different things, sorry if I was unclear. My first point was just to say that we could get vectorized UDFs for the case of returning 1 column with the existing pyspark udf api. Just a small change to the function in your use case 1:

          def my_func_that_returns_one_column(a, b):
            """ Some user-defined function.
           
            :param input: a and b, scalar or numpy array.
            :return: scalar or numpy array
            """
            return a + b
          

          This function is vectorized and can be composed as a standard UDF, specifying the return type

          my_func = udf(my_func_that_returns_one_column, DoubleType())
          
          df = spark.range(1000).selectExpr("id a", "id / 2 b")
          df.withColumn("c", my_func(df.a, df.b))
          

          The python worker would then load the Arrow record batch and extract columns "a" and "b" as pandas.Series and apply the function, basically like this

          sa, sb  # columns "a" and "b" read by Arrow as pandas.Series
          sc = f(sa, sb)  # where f is my_func_that_returns_one_column and sc is a pandas.Series with same size as "sa" and "sb"
          

          So if this gives us vectorized UDFs for 1 column, I'm not sure what exactly expanding the API will buy us?

          btw, I'm not against any of what has been proposed here, I'm just bringing this up to make sure it's known and facilitate discussion.

          cloud_fan Wenchen Fan added a comment -

           Bryan Cutler Your example works because Python is dynamically typed and a + b can work for both scalar values and vector values. However, I'm afraid this may not apply to all operators/functions, and I think it's better to introduce new APIs so that users know whether they are running a scalar udf or a columnar udf.

          bryanc Bryan Cutler added a comment -

           Wenchen Fan yes, I know not every function can be vectorized so I was saying it would need an SQLConf to enable, or maybe something less broad. The format in my example is pretty standard for writing vectorized functions (at least from my experience with numpy/matlab), and I think a lot of users probably have UDFs that fit into this mold. It would be a great benefit to Python users to be able to use vectorization without having to change their existing UDFs.

          icexelloss Li Jin added a comment - - edited

          I have created this PR for the groupby().apply() use case with pandas udf: https://github.com/apache/spark/pull/18732

          This PR consists of a few parts that can be broken into smaller PRs:

          (1) pandas_udf:
           This is a new API in pyspark.sql.functions that allows using pandas udfs with the various pyspark sql functions discussed here (group, window, withColumn).

           (2) arrow record batch -> unsafe row conversion
           This is a set of converters to turn Arrow record batches into unsafe rows. (The other direction has already been implemented.) This is needed for pandas udfs regardless of the API choices.

          (3) PythonRDD and pyspark.worker:
          Supports passing arrow data between executors / python worker and processing of arrow data on pyspark worker.

          (4) df.groupby().apply() function in pyspark sql
           This is a new API in pyspark sql that implements the split-apply-merge pattern with pyspark and pandas udfs (see the sketch below).

           I am hoping to get some feedback here as well as work together to figure out the next step for vectorized UDFs.
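
           For point (4), a hedged sketch of what the proposed split-apply-merge usage could look like, assuming the pandas_udf decorator discussed above (the exact signature is subject to review of the PR):

           import pandas as pd
           from pyspark.sql.types import StructType, StructField, StringType, DoubleType

           schema = StructType([
               StructField("g", StringType()),
               StructField("v_demeaned", DoubleType()),
           ])

           @pandas_udf(schema)
           def demean(pdf):
               # pdf is the pandas DataFrame for one group; the returned DataFrame
               # must match the declared schema.
               return pd.DataFrame({"g": pdf["g"], "v_demeaned": pdf["v"] - pdf["v"].mean()})

           df.groupby("g").apply(demean)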

          icexelloss Li Jin added a comment -

          Bryan Cutler,

           I have looked at your PR at https://github.com/apache/spark/pull/18659 and we have quite a bit of overlap in (2) and (3) above. These are not bound to the API choices, and I think we should combine/unify them and establish the baseline. What do you think?

          cloud_fan Wenchen Fan added a comment -

          I think (2) is already done by ArrowColumnVector (written by Takuya Ueshin), and you can just call `ColumnarBatch.rowIterator` for the conversion.

          bryanc Bryan Cutler added a comment -

          Hi Li Jin, yes I think there is definitely some basic framework needed to enable vectorized UDFs in Python that could be done before any API improvements. Like you pointed out in (3) the PythonRDD and pyspark.worker will need to be more flexible to handle a different udf, as well as some work on the pyspark.serializer to produce/consume vector data. I was hoping #18659 could serve as this basis. I plan on updating it to use the ArrowColumnVector and ArrowWriter from Takuya Ueshin when that gets merged.

          icexelloss Li Jin added a comment - - edited

          Wenchen Fan, thanks for pointing out `ArrowColumnVector`. Bryan Cutler, I think #18659 could serve as a basis for future udf work. My work with #18732 has some overlap with #18659 but I can work with Bryan Cutler to merge.

           Wenchen Fan and Reynold Xin, have you had a chance to think more about the API?

          icexelloss Li Jin added a comment -

           I think use case 2 of what Reynold Xin proposed originally is a good API to enable first. I think it can be a bit better if the input of the user function is not a pandas.DataFrame but pandas.Series, to match Spark columns. I.e., instead of:

          @spark_udf(some way to describe the return schema)
          def my_func(input):
            """ Some user-defined function.
           
            :param input: A Pandas DataFrame with two columns, a and b.
             :return: A numpy array
             """
             return input['a'] + input['b']
           
          df = spark.range(1000).selectExpr("id a", "id / 2 b")
          df.withColumn("c", my_func(df.a, df.b))
          

          I think this is better:

          @spark_udf(some way to describe the return schema)
          def my_func(a, b):
            """ Some user-defined function.
           
            :param input: Two Pandas Series, a and b
             :return: A Pandas Series
            """
            return a + b
           
          df = spark.range(1000).selectExpr("id a", "id / 2 b")
          df.withColumn("c", my_func(df.a, df.b))
          
          ueshin Takuya Ueshin added a comment -

          Hi all,

          I'd like to summarize this discussion about APIs for vectorized UDFs (not for UDAFs or Window operations for now).
          Please correct me if I miss anything or add comments if you have some.

          Proposed API

          Definition of Vectorized UDFs

           We introduce a @pandas_udf decorator (or annotation) to define vectorized UDFs, which take one or more pandas.Series, or one integer value meaning the length of the input for 0-parameter UDFs. The return value should be a pandas.Series of the specified type, and the length of the returned value should be the same as that of the input.

          We can define vectorized UDFs as:

          @pandas_udf(DoubleType())
          def plus(v1, v2):
              return v1 + v2
          

          or we can define as:

          plus = pandas_udf(lambda v1, v2: v1 + v2, DoubleType())
          

          We can use it similar to row-by-row UDFs:

          df.withColumn('sum', plus(df.v1, df.v2))
          

          As for 0-parameter UDFs, we can define and use as:

          @pandas_udf(LongType())
          def f0(size):
              return pd.Series(1).repeat(size)
          
          df.select(f0())
          
          icexelloss Li Jin added a comment -

          Takuya Ueshin, thanks for the summary. +1 for this API.

          Although the 0-parameter UDF is a bit confusing. Where does "size" come from?

          ueshin Takuya Ueshin added a comment -

          Li Jin
          We can know the length of input from RecordBatch and pass it as a hint to the actual function for users to create the output of the exact length.

          icexelloss Li Jin added a comment - - edited

          Takuya Ueshin, Got it. I'd actually prefer doing it this way:

          @pandas_udf(LongType())
          def f0(v):
              return pd.Series(1).repeat(len(v))
          
          df.select(f0(F.lit(0)))
          

          Instead of passing the size as a scalar to the function.

           Passing size to f0 feels unintuitive to me because f0 is defined as a function that takes one argument - def f0(size) - but is invoked with 0 args - f0().

          cloud_fan Wenchen Fan added a comment -

          hmmm, your proposal has a weird usage: users need to pass an argument to a 0-parameter UDF, and transferring this constant column batch is also a waste.

          Defining the 0-parameter udf with a size parameter is unintuitive, but I can't think of a better idea...

          bryanc Bryan Cutler added a comment - - edited

           I'm good with the API summary proposed by Takuya Ueshin, but I'm also not crazy about the 0-parameter UDF. My only other idea would be to allow any pandas_udf to optionally define `**kwargs` as the last argument in the UDF. In the python worker, it would be easy to inspect the UDF to check if it accepts kwargs and then provide the size hint (could add other metadata also). Of course this isn't the most intuitive either, but at least it would be consistent across all `pandas_udf`s. So a 0-parameter example would look like:

          @pandas_udf(LongType())
          def f0(**kwargs):
              return pd.Series(1).repeat(kwargs['size'])
          
          df.select(f0())
          

          It's not perfect since it would require the user to know that size is in the kwargs, but just thought I'd throw it out as an alternative.

          leif Leif Walsh added a comment - - edited

          I'm not 100% sure this is legal pandas but I think it might be. If no columns are passed in, we might be able to pass in a zero-column dataframe with an integer index from 0 to size-1. Then pd.Series(data=1, index=df.index) might do the right thing, without magic extra parameters. I think this would be conceptually correct too. I will verify and provide sample code this weekend if I find the time.

          leif Leif Walsh added a comment -

          Yep, that's totally a thing:

          In [1]: import pandas as pd
          
          In [2]: pd.DataFrame(index=list(range(100)))
          Out[2]: 
          Empty DataFrame
          Columns: []
          Index: [0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20, 21, 22, 23, 24, 25, 26, 27, 28, 29, 30, 31, 32, 33, 34, 35, 36, 37, 38, 39, 40, 41, 42, 43, 44, 45, 46, 47, 48, 49, 50, 51, 52, 53, 54, 55, 56, 57, 58, 59, 60, 61, 62, 63, 64, 65, 66, 67, 68, 69, 70, 71, 72, 73, 74, 75, 76, 77, 78, 79, 80, 81, 82, 83, 84, 85, 86, 87, 88, 89, 90, 91, 92, 93, 94, 95, 96, 97, 98, 99]
          
          [100 rows x 0 columns]
          
          In [3]: df = Out[2]
          
          In [4]: pd.Series(data=1, index=df.index)
          Out[4]: 
          0     1
          1     1
          2     1
          3     1
          4     1
          5     1
          6     1
          7     1
          8     1
          9     1
          10    1
          11    1
          12    1
          13    1
          14    1
          15    1
          16    1
          17    1
          18    1
          19    1
          20    1
          21    1
          22    1
          23    1
          24    1
          25    1
          26    1
          27    1
          28    1
          29    1
               ..
          70    1
          71    1
          72    1
          73    1
          74    1
          75    1
          76    1
          77    1
          78    1
          79    1
          80    1
          81    1
          82    1
          83    1
          84    1
          85    1
          86    1
          87    1
          88    1
          89    1
          90    1
          91    1
          92    1
          93    1
          94    1
          95    1
          96    1
          97    1
          98    1
          99    1
          Length: 100, dtype: int64
          
          

          So, how about:

          @pandas_udf(LongType())
          def f0(df):
              return pd.Series(data=1, index=df.index)
          
          df.select(f0())
          
          leif Leif Walsh added a comment -

          You can also make a Series with no content and just an index, but that becomes a float64 Series full of NaNs, which seems worse: it's indistinguishable from a column that is genuinely full of NaNs:

          In [5]: pd.Series(index=list(range(100)))
          Out[5]: 
          0    NaN
          1    NaN
          2    NaN
          3    NaN
          4    NaN
          5    NaN
          6    NaN
          7    NaN
          8    NaN
          9    NaN
          10   NaN
          11   NaN
          12   NaN
          13   NaN
          14   NaN
          15   NaN
          16   NaN
          17   NaN
          18   NaN
          19   NaN
          20   NaN
          21   NaN
          22   NaN
          23   NaN
          24   NaN
          25   NaN
          26   NaN
          27   NaN
          28   NaN
          29   NaN
                ..
          70   NaN
          71   NaN
          72   NaN
          73   NaN
          74   NaN
          75   NaN
          76   NaN
          77   NaN
          78   NaN
          79   NaN
          80   NaN
          81   NaN
          82   NaN
          83   NaN
          84   NaN
          85   NaN
          86   NaN
          87   NaN
          88   NaN
          89   NaN
          90   NaN
          91   NaN
          92   NaN
          93   NaN
          94   NaN
          95   NaN
          96   NaN
          97   NaN
          98   NaN
          99   NaN
          Length: 100, dtype: float64
          
          
          ueshin Takuya Ueshin added a comment -

          Bryan Cutler Thank you for your suggestion. **kwargs might be a good way to provide the size hint and other metadata in the future. I'm not sure yet how to inspect the UDF to check whether it accepts kwargs, though.
          Should we make **kwargs required? If not, users can still define a 0-parameter UDF without it, and we can't determine how to handle that case.

          ueshin Takuya Ueshin added a comment -

          Leif Walsh Thank you for your proposal.
          I'm sorry, but I couldn't see a big difference between passing only a size parameter and passing a Series/DataFrame with no content and just an index, and I suspect a no-content Series/DataFrame would confuse users more.

          leif Leif Walsh added a comment -

          I think the size parameter is confusing: if a UDF with one or more parameters gets called with some number of Series or DataFrame objects, but a 0-parameter UDF switches to getting a size scalar, that's less consistent. In fact, the most consistent interface would be for all UDFs to be called with a single DataFrame parameter containing all the columns, already aligned. However, I somewhat prefer the idea of a UDF on multiple columns getting separate Series objects, as this makes the UDF's function signature look more like how it will be called.
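
          To make the contrast concrete, here is a rough illustration of the two call styles being discussed (this is not an actual API; the function and column names are made up):

          import pandas as pd

          # Style A: the whole batch arrives as one aligned DataFrame.
          def udf_dataframe_style(batch):
              # batch is a pd.DataFrame; "a" and "b" are illustrative column names
              return batch["a"] + batch["b"]

          # Style B: each column arrives as its own Series, mirroring the call site.
          def udf_series_style(a, b):
              return a + b

          batch = pd.DataFrame({"a": [1, 2], "b": [3, 4]})
          assert udf_dataframe_style(batch).equals(udf_series_style(batch["a"], batch["b"]))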

          You can use the inspect module in the Python standard library to find out what parameters a function accepts, but I'd warn against using a magic **kwargs parameter; that's a fairly non-intuitive API and is also somewhat brittle.

          bryanc Bryan Cutler added a comment -

          Thanks Takuya Ueshin, I think having an optional kwargs would at least make things consistent across all types of pandas_udf. Here is how you could use inspect to check that a function has a ** argument that accepts keyword arguments:

          In [1]: import inspect
          
          In [2]: def f(a, b, **kwargs):
             ...:     print(a, b, kwargs)
          
          In [3]: def g(a, b):
             ...:     print(a, b) 
          
          In [4]: inspect.getargspec(f)
          Out[4]: ArgSpec(args=['a', 'b'], varargs=None, keywords='kwargs', defaults=None)
          
          In [5]: inspect.getargspec(g)
          Out[5]: ArgSpec(args=['a', 'b'], varargs=None, keywords=None, defaults=None)
          
          In [6]: if inspect.getargspec(f).keywords is not None:
             ...:     f(1, 2, size=3)
             ...:     
          (1, 2, {'size': 3})
          

          If the user defines a 0-parameter UDF without a **kwargs, then it is probably best to raise an error if the returned size isn't right, although it would be possible to repeat and/or slice.
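
          As an aside, inspect.getargspec is deprecated on Python 3. A minimal sketch of the equivalent check using inspect.signature (the helper name accepts_kwargs is made up for illustration):

          import inspect

          def accepts_kwargs(func):
              # True if func declares a **kwargs-style (VAR_KEYWORD) parameter.
              return any(p.kind is inspect.Parameter.VAR_KEYWORD
                         for p in inspect.signature(func).parameters.values())

          def f(a, b, **kwargs):
              return a, b, kwargs

          def g(a, b):
              return a, b

          print(accepts_kwargs(f))  # True
          print(accepts_kwargs(g))  # False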

          ueshin Takuya Ueshin added a comment -

          Leif Walsh, Bryan Cutler Thanks for the guidance on the inspect module.
          I believe we have reached consensus on the basic APIs except for the size hint, so I've decided to submit a PR based on my proposal first; let's discuss the size hint for the 0-parameter UDF (or for UDFs with more parameters, for consistency) during the review process.

          apachespark Apache Spark added a comment -

          User 'ueshin' has created a pull request for this issue:
          https://github.com/apache/spark/pull/19147

          apachespark Apache Spark added a comment -

          User 'BryanCutler' has created a pull request for this issue:
          https://github.com/apache/spark/pull/18659

          bryanc Bryan Cutler added a comment -

          I attached my PR because the work had already been done and it pretty much matches the proposal here. Defining the UDF is slightly different: I just use a flag in the normal UDF declaration, for example udf(my_func, DoubleType(), vectorized=True), but it is trivial to change it to pandas_udf if that is what's decided. I'm not exactly sure whether there was a problem with using my PR for this, but I'm more than happy to discuss the differences and hopefully avoid duplicated work.

          icexelloss Li Jin added a comment -

          To Bryan Cutler's point, PR #18659 and PR #19147 are largely similar, and it makes sense not to keep two PRs for the same thing.

          Also, what I am curious about is what kind of guidelines we should follow to avoid such duplicated work in the future. IMHO, Bryan Cutler linked his PR to this JIRA a while back and has been actively engaged in all the discussions, so I am not sure why we need a similar second PR in this case. (Of course, if people think #18659 and #19147 are very different, that's another story.)

          I have worked with Bryan Cutler on SPARK-13534 in the past, and collaborating on the same branch worked quite well for us. Maybe that's something we should encourage?

          bryanc Bryan Cutler added a comment -

          Thanks Li Jin. I definitely think collaboration should be encouraged, especially in this case where there was existing work done by you and me that could have been leveraged. The major difference between #18659 and #19147 is the Arrow data format used: #19147 uses the Arrow stream format, which has some pros and cons that I brought up in the PR.

          ueshin Takuya Ueshin added a comment -

          Li Jin Thank you for your suggestion. I agree that we should unify the PRs into one and collaborate.
          As Wenchen Fan mentioned in Bryan Cutler's PR, I'll send PRs against Bryan Cutler's branch, one per piece of functionality, so we can discuss each.

          cloud_fan Wenchen Fan added a comment -

          Issue resolved by pull request 18659
          https://github.com/apache/spark/pull/18659

          icexelloss Li Jin added a comment -

          Wenchen Fan, do we want to track the other vectorized UDF efforts (group, window, etc.) in this JIRA or a separate one?

          cloud_fan Wenchen Fan added a comment -

          Yeah, let's do that in a separate ticket.

          rxin@databricks.com Reynold Xin added a comment -

          Maybe create an umbrella ticket so it is easier to link.

          rxin Reynold Xin added a comment -

          Where did we settle on 0-arg UDFs? I think we should just disallow them, since users can trivially work around the restriction by creating a 1-arg UDF that ignores the arg.
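
          For illustration only (this snippet is not from the JIRA discussion or the linked PRs), the suggested workaround could look roughly like this: a 1-arg pandas_udf that ignores the column values and only uses the batch's index to size its output.

          import pandas as pd
          from pyspark.sql import SparkSession
          from pyspark.sql.functions import pandas_udf, col
          from pyspark.sql.types import LongType

          spark = SparkSession.builder.getOrCreate()

          @pandas_udf(LongType())
          def always_one(v):
              # v is the pandas Series for the batch; only its index/length is used.
              return pd.Series(1, index=v.index)

          spark.range(5).select(always_one(col("id"))).show()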

          hyukjin.kwon Hyukjin Kwon added a comment - - edited

          Reynold Xin, I suggested disallowing it here, and I think reviewers and committers agreed for similar reasons.
          So it was fixed separately in this commit.

          Therefore, the cases below are not allowed:

          >>> from pyspark.sql.functions import pandas_udf
          >>> from pyspark.sql.types import *
          >>> @pandas_udf(returnType=LongType())
          ... def add_one():
          ...     return 1
          ...
          
          NotImplementedError: 0-parameter pandas_udfs are not currently supported
          >>> pandas_udf(lambda: 1, "long")
          ...
          NotImplementedError: 0-parameter pandas_udfs are not currently supported
          >>> pandas_udf(lambda: 1, LongType())
          ...
          NotImplementedError: 0-parameter pandas_udfs are not currently supported
          
          rxin Reynold Xin added a comment -

          OK, it would be great to have a better error message, e.g. remove "currently" and tell users how to work around the limitation: they can create a 1-arg UDF and ignore the arg.
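
          A minimal sketch of what such a check and message might look like (the helper name and wording here are hypothetical, not the actual PySpark implementation):

          import inspect

          def _reject_zero_arg_pandas_udf(f):
              # Hypothetical validation helper; not PySpark's actual code.
              spec = inspect.getfullargspec(f)
              if not spec.args and spec.varargs is None:
                  raise NotImplementedError(
                      "0-parameter pandas_udfs are not supported. As a workaround, "
                      "define a 1-parameter pandas_udf and ignore the parameter.")
              return f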

          hyukjin.kwon Hyukjin Kwon added a comment -

          Will keep this in mind and suggest a fix, or fix it myself, when we happen to touch the code around here (of course before the release).


            People

            • Assignee: bryanc Bryan Cutler
            • Reporter: rxin Reynold Xin
            • Votes: 11
            • Watchers: 39
