  Spark / SPARK-42645

Introduce feature to allow for function caching across input rows.


Details

    • Type: Wish
    • Status: Open
    • Priority: Major
    • Resolution: Unresolved
    • Affects Version/s: 3.3.2
    • Fix Version/s: None
    • Component/s: Optimizer
    • Labels: None

    Description

      Introduce the ability to make functions cacheable across input rows. I imagine this working similarly to Python's functools.cache, where you could add a decorator to expensive functions that you know will regularly encounter repeated values as they read the input data.
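
      For reference, this is the memoization behavior being referenced; a minimal illustration (the function and values here are made up):

      import functools

      @functools.cache
      def expensive_parse(value):
          # the body runs once per distinct argument; repeat calls hit the cache
          return value.lower()

      expensive_parse("Mozilla/5.0")  # computed
      expensive_parse("Mozilla/5.0")  # served from the cache
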
      With this new feature you could significantly speed up many real-world jobs that apply expensive functions to data that naturally has repeated column values. An example would be parsing user agent fields from internet traffic logs partitioned by user id. Even though the data is not sorted by user agent, a sample of 10k consecutive rows will contain far fewer than 10k unique values, because popular user agents appear on a large fraction of traffic, and the user agent of a user's first event is likely to be shared by all of that user's subsequent events.

      Currently you can hack an approximation of this in Python via pandas_udfs. This works because pandas_udfs read input in batches of 10k rows by default, so you can use a caching UDF whose cache is emptied every 10k rows. At my current job I have noticed that applications of this trick can significantly speed up queries where custom UDFs are the bottleneck. An example:

      import functools

      import pandas as pd
      import pyspark.sql.functions as F
      import pyspark.sql.types as T

      @F.pandas_udf(T.StringType())
      def parse_user_agent_field(user_agent_series: pd.Series) -> pd.Series:
          # The cache is rebuilt for each ~10k-row batch, so it stays small.
          @functools.cache
          def parse_user_agent_field_helper(user_agent):
              # parse the user agent and return the relevant field
              return None
          return user_agent_series.apply(parse_user_agent_field_helper)

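      Hypothetical usage of the UDF above (the column names here are made up):

      df = df.withColumn("browser_family", parse_user_agent_field("user_agent"))
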
      It would be nice if there were official support for this behavior for both built-in functions and UDFs. I'd imagine it looking something like:

      # using pyspark dataframe API
      df = df.withColumn(output_col, F.cache(F.function)(input_col))
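
      Until something like F.cache exists, a generic wrapper can approximate the proposed behavior for Python UDFs today. This is a sketch built on the batch-caching trick above, not an existing API; cached_pandas_udf and the names in the usage comments are hypothetical:

      import functools

      import pandas as pd
      import pyspark.sql.functions as F

      def cached_pandas_udf(return_type, func):
          # Wrap a plain scalar Python function in a pandas_udf whose
          # memoization cache lives for the duration of one input batch.
          @F.pandas_udf(return_type)
          def wrapper(series: pd.Series) -> pd.Series:
              cached_func = functools.cache(func)  # fresh cache per batch
              return series.apply(cached_func)
          return wrapper

      # hypothetical usage:
      # parse_udf = cached_pandas_udf(T.StringType(), parse_user_agent)
      # df = df.withColumn(output_col, parse_udf(input_col))
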
          People

            Assignee: Unassigned
            Reporter: mtong (Michael Tong)
            Votes: 0
            Watchers: 4
