Spark / SPARK-22410

Excessive spill for Pyspark UDF when a row has shrunk


Details

    • Type: Bug
    • Status: Resolved
    • Priority: Minor
    • Resolution: Fixed
    • Affects Version/s: 2.3.0
    • Fix Version/s: 2.3.0
    • Component/s: PySpark
    • Labels: None
    • Environment: Reproduced on up-to-date master

    Description

      Hi,

The following code processes 900 KB of data and outputs around 2 MB of data. However, to process it, Spark needs to spill roughly 12 GB of data.

      from pyspark.sql import SparkSession
      from pyspark.sql.functions import *
      from pyspark.sql.types import *
      
      ss = SparkSession.builder.getOrCreate()
      
      # Create a few lines of data (5 lines).
      # Each line is made of a string, and an array of 10000 strings
      # Total size of data is around 900 KB
      
      lines_of_file = [ "this is a line" for x in range(10000) ]
      file_obj = [ "this_is_a_foldername/this_is_a_filename", lines_of_file ]
      data = [ file_obj for x in range(5) ]
      
      # Make a two-column dataframe out of it
      small_df = ss.sparkContext.parallelize(data).map(lambda x : (x[0], x[1])).toDF(["file", "lines"])
      
      # We then explode the array, so we now have 50000 rows in the dataframe, with 2 columns, the 2nd 
      # column now has only "this is a line" as content
      exploded = small_df.select("file", explode("lines"))
      
      print("Exploded")
      exploded.explain()
      
      # Now, just process it with a trivial Pyspark UDF that touches the first column
      # (the one which was not an array)
      
      def split_key(s):
          return s.split("/")[1]
      split_key_udf = udf(split_key, StringType())
      
      with_filename = exploded.withColumn("filename", split_key_udf("file"))
      
      # As expected, explain plan is very simple (BatchEval -> Explode -> Project -> ScanExisting)
      with_filename.explain()
      
      # Getting the head will spill around 12 GB of data
      print(with_filename.head())
      

      The spill happens in the HybridRowQueue that is used to merge the part of each row that went through the Python worker with the part that didn't.

      The problem comes from the fact that when it is added to the HybridRowQueue, the UnsafeRow has a totalSizeInBytes of ~240000 (seen by adding a debug message in HybridRowQueue), whereas, since this is after the explode, the actual size of the row should be in the ~60-byte range.

      My understanding is that the row has retained the size it consumed prior to the explode (at that time, the size of each of the 5 rows was indeed ~240000 bytes).
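
      The numbers line up with a rough back-of-envelope estimate. The per-element overhead and 8-byte word alignment used below are assumptions about the UnsafeRow/UnsafeArrayData layout rather than measurements, but they show why ~240000 bytes matches the pre-explode row and ~60 bytes the post-explode one:

      # Back-of-envelope sizes; the layout overheads below are approximations.
      def round8(n):
          # variable-length data is assumed to be stored in 8-byte words
          return (n + 7) // 8 * 8

      line = "this is a line"                                # 14 characters
      file_name = "this_is_a_foldername/this_is_a_filename"  # 40 characters

      # Before the explode, one row carries the whole 10000-element array:
      # ~8 bytes of offset/size bookkeeping per element plus the padded string bytes.
      print(10000 * (8 + round8(len(line))))   # 240000 -> matches the observed totalSizeInBytes

      # After the explode, one row carries the file name plus a single line.
      print(round8(len(file_name)) + round8(len(line)))   # 56 -> the expected ~60-byte range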

      A workaround is to call exploded.cache() before applying the UDF. Going through the InMemoryColumnarTableScan "resets" the incorrect size of the UnsafeRow.
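
      For reference, here is the workaround applied to the reproduction above (same job, only the added cache() call differs):

      # Workaround: materialize the exploded dataframe before applying the Python UDF.
      # Reading the rows back from the InMemoryColumnarTableScan rebuilds them with their real size.
      exploded = small_df.select("file", explode("lines")).cache()
      with_filename = exploded.withColumn("filename", split_key_udf("file"))
      print(with_filename.head())   # no longer spills ~12 GB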

      Thanks!

          People

            Assignee: viirya (L. C. Hsieh)
            Reporter: cstenac (Clément Stenac)
