  Spark / SPARK-30670

Pipes for PySpark


Details

    • Type: New Feature
    • Status: Resolved
    • Priority: Major
    • Resolution: Duplicate
    • Affects Version/s: 2.4.4
    • Fix Version/s: None
    • Component/s: SQL
    • Labels: None

    Description

      I would propose adding a `pipe` method to the Spark DataFrame. It enables a functional programming pattern, inspired by the tidyverse, that is currently missing. The pandas community also recently adopted this pattern, documented [here](https://tomaugspurger.github.io/method-chaining.html).
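
      For reference, this is roughly how the pattern reads in pandas; the data and the helper below are made up purely for illustration:

      import pandas as pd

      def mark_new_sessions(df, threshold=900):
          # hypothetical helper: flag a new session when the gap between events exceeds the threshold
          return df.assign(new_session=df["timestamp"].diff().gt(threshold).astype(int))

      df = pd.DataFrame({"user": ["a", "a", "a"], "timestamp": [0, 30, 2000]})
      clean_df = df.pipe(mark_new_sessions, threshold=900)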

      This is the idea. Suppose you have this:

      # file that has [user, date, timestamp, eventtype]
      from pyspark.sql import Window
      from pyspark.sql import functions as sf

      ddf = spark.read.parquet("<filepath>")

      w_user = Window.partitionBy("user")
      w_user_time = Window.partitionBy("user").orderBy("timestamp")

      thres_sesstime = 60 * 15
      min_n_rows = 10
      min_n_sessions = 5

      clean_ddf = (ddf
        # time since the user's previous event
        .withColumn("delta", sf.col("timestamp") - sf.lag("timestamp").over(w_user_time))
        .withColumn("new_session", (sf.col("delta") > thres_sesstime).cast("integer"))
        # running sum of session breaks assigns a session id per user
        .withColumn("session", sf.sum(sf.col("new_session")).over(w_user_time))
        .drop("new_session")
        .drop("delta")
        .withColumn("nrow_user", sf.count(sf.col("timestamp")).over(w_user))
        .withColumn("nrow_user_date", sf.approx_count_distinct(sf.col("date")).over(w_user))
        .filter(sf.col("nrow_user") > min_n_rows)
        .filter(sf.col("nrow_user_date") > min_n_sessions)
        .drop("nrow_user")
        .drop("nrow_user_date"))
      

      The code works and it is reasonably clear: we add a session column to the dataframe and then use it to remove outliers. The issue is that this chain of commands can get quite long, so it might be better to turn it into functions.

      def add_session(dataf, session_threshold=60 * 15):
          w_user_time = Window.partitionBy("user").orderBy("timestamp")

          return (dataf
            .withColumn("delta", sf.col("timestamp") - sf.lag("timestamp").over(w_user_time))
            .withColumn("new_session", (sf.col("delta") > session_threshold).cast("integer"))
            .withColumn("session", sf.sum(sf.col("new_session")).over(w_user_time))
            .drop("new_session")
            .drop("delta"))
      
      def remove_outliers(dataf, min_n_rows=10, min_n_sessions=5):
          w_user = Window.partitionBy("user")

          return (dataf
            .withColumn("nrow_user", sf.count(sf.col("timestamp")).over(w_user))
            .withColumn("nrow_user_date", sf.approx_count_distinct(sf.col("date")).over(w_user))
            .filter(sf.col("nrow_user") > min_n_rows)
            .filter(sf.col("nrow_user_date") > min_n_sessions)
            .drop("nrow_user")
            .drop("nrow_user_date"))
      

      The issue does not lie in these functions. These functions are great! You can unit test them, and they provide nice verbs that act as an abstraction. The issue is in how you now need to apply them.

      remove_outliers(add_session(ddf, session_threshold=1000), min_n_rows=11)
      

      It would be much nicer to allow for this:

      (ddf
        .pipe(add_session, session_threshold=900)
        .pipe(remove_outliers, min_n_rows=11))
      

      The cool thing about this is that it keeps method chaining easy while giving you a clean way to split high-level code from low-level code. You still allow customisation at a high level by exposing keyword arguments, but the low-level details stay contained in their own functions, which makes them easy to find when debugging.
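
      To sketch how this reads once more steps are involved, piped verbs and native DataFrame methods mix freely; the extra filtering steps below are just an illustration:

      clean_ddf = (ddf
        .pipe(add_session, session_threshold=900)
        .pipe(remove_outliers, min_n_rows=11)
        .where(sf.col("eventtype") == "click")  # regular methods still chain as before
        .select("user", "session", "timestamp"))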

      For code maintenance, I've relied on this pattern a lot personally. But so far, I've always had to monkey-patch Spark to be able to do this.

      from pyspark.sql import DataFrame

      def pipe(self, func, *args, **kwargs):
          return func(self, *args, **kwargs)

      # attach it so every DataFrame gets a .pipe method
      DataFrame.pipe = pipe
      

      Could I perhaps add these few lines of code to the codebase?
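
      For reference, a minimal sketch of how this could sit on `pyspark.sql.DataFrame` itself, assuming it mirrors the pandas signature; the docstring and placement are suggestions, not an actual patch:

      class DataFrame(object):
          # ... existing methods ...

          def pipe(self, func, *args, **kwargs):
              """Apply ``func(self, *args, **kwargs)`` and return its result.

              Lets user-defined transformations take part in method chains:
              ``df.pipe(add_session, session_threshold=900)``.
              """
              return func(self, *args, **kwargs)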

       

       


            People

              Assignee: Unassigned
              Reporter: Vincent (koaning)
              Holden Karau
              Votes: 0
              Watchers: 1


                Time Tracking

                  Original Estimate: 24h
                  Remaining Estimate: 24h
                  Time Spent: Not Specified