  1. Spark
  2. SPARK-37765

PySpark Dynamic DataFrame for easier inheritance



    • Type: New Feature
    • Status: In Progress
    • Priority: Major
    • Resolution: Unresolved
    • Affects Version/s: 3.2.0
    • Fix Version/s: None
    • Component/s: PySpark
    • Labels: None


      In typical development settings, multiple tables representing very different concepts are all mapped to the same `DataFrame` class. Inheriting from the pyspark `DataFrame` class is cumbersome because its chainable methods return plain `DataFrame` objects, which also makes it difficult to abstract regularly used queries. The proposal is to add a `DynamicDataFrame` that allows easy inheritance and retains the `DataFrame` methods, without losing chainability for either the newly defined queries or the usual dataframe ones.

      In our experience, this allowed us to iterate much faster, generating business-centric classes in a couple of lines of code. Here's an example of what the application code would look like. At the end is a summary of the different strategies that are usually pursued when trying to abstract queries.

      import pyspark
      # DynamicDataFrame is the class proposed in this ticket (see the linked PR).
      from pyspark.sql import DynamicDataFrame
      from pyspark.sql import functions as F

      spark = pyspark.sql.SparkSession.builder.getOrCreate()

      class Inventory(DynamicDataFrame):
          def update_prices(self, factor: float = 2.0):
              return self.withColumn("price", F.col("price") * factor)

      base_dataframe = spark.createDataFrame(
          data=[["product_1", 2.0], ["product_2", 4.0]],
          schema=["name", "price"],
      )

      print("Doing an inheritance mediated by DynamicDataFrame")
      inventory = Inventory(base_dataframe)
      inventory_updated = inventory.update_prices(2.0).update_prices(5.0)
      print("After multiple uses of the query we still have the desired type")
      print(f"type(inventory_updated): {type(inventory_updated)}")

      print("We can still use the usual dataframe methods")
      expensive_inventory = inventory_updated.filter(F.col("price") > 25)
      print("And retain the desired type")
      print(f"type(expensive_inventory): {type(expensive_inventory)}")

      The PR linked to this ticket is an implementation of the `DynamicDataFrame` used in this snippet.
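
      One way such a class could work is by delegation: forward attribute access to a wrapped frame and, whenever a forwarded method returns a plain frame, re-wrap the result in the caller's own subclass. The sketch below illustrates that idea on a tiny stand-in class so that it runs without Spark. `Frame`, `DynamicFrame`, `with_column` and the lambda-based `filter` are hypothetical names invented for illustration; this is not the code from the linked PR and may differ from it.

```python
import functools


class Frame:
    """Stand-in for pyspark's DataFrame: chainable methods return a plain Frame."""

    def __init__(self, rows):
        self.rows = list(rows)

    def filter(self, predicate):
        # Like the parent class in the ticket, this builds the base class explicitly.
        return Frame(r for r in self.rows if predicate(r))

    def with_column(self, name, fn):
        return Frame({**r, name: fn(r)} for r in self.rows)


class DynamicFrame:
    """Hypothetical wrapper: delegates to a Frame and re-wraps Frame results."""

    def __init__(self, frame):
        self._frame = frame

    def __getattr__(self, name):
        # Only called for attributes that are not found on the wrapper itself.
        attr = getattr(self._frame, name)
        if not callable(attr):
            return attr

        @functools.wraps(attr)
        def wrapper(*args, **kwargs):
            result = attr(*args, **kwargs)
            # Re-wrap in the caller's own subclass so chaining keeps the type.
            return type(self)(result) if isinstance(result, Frame) else result

        return wrapper


class Inventory(DynamicFrame):
    def update_prices(self, factor=2.0):
        return self.with_column("price", lambda r: r["price"] * factor)


inventory = Inventory(Frame([{"name": "product_1", "price": 2.0},
                             {"name": "product_2", "price": 4.0}]))
expensive = inventory.update_prices(2.0).update_prices(5.0).filter(
    lambda r: r["price"] > 25
)
print(type(expensive).__name__, expensive.rows)
```

      In this sketch both the custom query (`update_prices`) and the delegated method (`filter`) return an `Inventory`, which is the behavior the application example above relies on.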


      Other strategies found for handling the query abstraction:

      1. Functions: write functions that take dataframes and return them transformed. This had a couple of pitfalls: we had to manage the namespaces carefully, there is no clear new object, and the "chainability" didn't feel very pyspark-y.
      2. Monkey patching `DataFrame`: we monkey patched (https://stackoverflow.com/questions/5626193/what-is-monkey-patching) the `DataFrame` class with methods for the regularly used queries. This kept it pyspark-y, but there was no easy way to keep the namespaces segregated.
      3. Inheritance: create the class `MyBusinessDataFrame`, inherit from `DataFrame`, and implement the methods there. This solves all the issues above, but with a caveat: the chainable methods cast the result explicitly to `DataFrame` (see, for example, https://github.com/apache/spark/blob/master/python/pyspark/sql/dataframe.py#L1910). Therefore, every time you use one of the parent's methods you have to re-cast to `MyBusinessDataFrame`, which makes the code cumbersome.
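
      The caveat in the third strategy can be reproduced without Spark. The following is a minimal sketch that assumes a toy parent class whose chainable method builds the base class explicitly, as described above; `Frame`, `MyBusinessFrame` and `cheap` are hypothetical names for illustration and are not pyspark APIs.

```python
class Frame:
    """Stand-in for pyspark's DataFrame; not the real class."""

    def __init__(self, rows):
        self.rows = list(rows)

    def filter(self, predicate):
        # Mirrors the caveat: the result is built as Frame, not as type(self).
        return Frame(r for r in self.rows if predicate(r))


class MyBusinessFrame(Frame):
    def cheap(self, limit):
        # Re-cast by hand so that this custom query keeps the subclass type.
        return MyBusinessFrame(self.filter(lambda r: r["price"] < limit).rows)


data = MyBusinessFrame([{"price": 2.0}, {"price": 4.0}, {"price": 40.0}])

kept = data.cheap(10.0)                       # still a MyBusinessFrame
lost = kept.filter(lambda r: r["price"] > 3)  # parent method: plain Frame
recast = MyBusinessFrame(lost.rows)           # manual re-cast before chaining on

print(type(kept).__name__, type(lost).__name__, type(recast).__name__)
```

      After the call to the parent's `filter`, the custom `cheap` method is no longer available until the result is re-cast, which is the step the proposal aims to remove.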


      (see https://mail-archives.apache.org/mod_mbox/spark-dev/202111.mbox/browser for the link to the original mail in which we proposed this feature)




            Assignee: Unassigned
            Reporter: Pablo Alcain (pabloalcain)