SPARK-28269

Pandas Grouped Map UDF can get deadlocked


Details

    • Type: Bug
    • Status: Resolved
    • Priority: Major
    • Resolution: Incomplete
    • Affects Version/s: 2.4.3
    • Fix Version/s: None
    • Component/s: PySpark

    Description

      I'm working with PySpark version 2.4.3.

      I have a big data frame:

      • ~15M rows
      • ~130 columns
      • ~2.5 GB - I've converted it to a Pandas data frame and pickled it (pandas_df.to_pickle()), which resulted in a file of about 2.5 GB (a sketch of the measurement is shown below).
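
      For reference, here is a minimal sketch of how a size estimate like that can be taken (the exact measurement code isn't part of the report; spark_df is a hypothetical name for the original ~15M-row Spark frame):

      import os

      pandas_df = spark_df.toPandas()            # spark_df: the original Spark data frame (hypothetical name)
      pandas_df.to_pickle("/tmp/pandas_df.pkl")  # serialize the whole frame to disk
      size_gb = os.path.getsize("/tmp/pandas_df.pkl") / float(1024 ** 3)
      print("pickled size: %.1f GB" % size_gb)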

      I have some code that groups this data frame and applies a Pandas UDF:

       

      from pyspark.sql import Row
      from pyspark.sql.functions import lit, pandas_udf, PandasUDFType, to_json
      from pyspark.sql.types import *
      from pyspark.sql import functions as F
      
      initial_list = range(4500)
      rdd = sc.parallelize(initial_list)
      rdd = rdd.map(lambda x: Row(val=x))
      initial_spark_df = spark.createDataFrame(rdd)
      
      cols_count = 132
      rows = 1000
      
      # ------------------- Start Generating the big data frame-------------------
      # Generating the schema
      schema = StructType([StructField(str(i), IntegerType()) for i in range(cols_count)])
      
      @pandas_udf(returnType=schema, functionType=PandasUDFType.GROUPED_MAP)
      def random_pd_df_generator(df):
          import numpy as np
          import pandas as pd
          return pd.DataFrame(np.random.randint(0, 100, size=(rows, cols_count)), columns=range(cols_count))
      
      
      full_spark_df = initial_spark_df.groupBy("val").apply(random_pd_df_generator)
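      # full_spark_df: 4500 groups x 1000 rows x 132 int columns, i.e. ~4.5M rows in total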
      # ------------------- End Generating the big data frame-------------------
      
      # -------------------Start the bug reproduction---------------------------
      grouped_col = "col_0"
      
      @pandas_udf("%s string" %grouped_col, PandasUDFType.GROUPED_MAP)
      def very_simpl_udf(pdf):
          import pandas as pd
          ret_val = pd.DataFrame({grouped_col: [str(pdf[grouped_col].iloc[0])]})
          return ret_val
      
      # In order to create one huge group, set every grouped_col value to the same constant so that all rows fall into a single group.
      # Here is where the program gets stuck
      full_spark_df.withColumn(grouped_col, F.lit('0')).groupBy(grouped_col).apply(very_simpl_udf).show()
      assert False, "If we're here, it means that the issue wasn't reproduced"
      
      

       

      The above code gets stuck in ArrowStreamPandasSerializer, on the first line of the loop that reads batches from the reader:

       

      for batch in reader:
          yield [self.arrow_to_pandas(c) for c in pa.Table.from_batches([batch]).itercolumns()]
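
      For context, the loop above is the generic Arrow stream-reading pattern. The standalone pyarrow sketch below (my own illustration, not Spark code) runs the same loop against an in-memory buffer; in the Spark worker the reader wraps the socket to the JVM, and the "for batch in reader" iteration is the call that never returns.

      import pandas as pd
      import pyarrow as pa

      # Write a tiny Arrow IPC stream into an in-memory buffer.
      pdf = pd.DataFrame({"col_0": ["0", "0", "0"]})
      batch = pa.RecordBatch.from_pandas(pdf, preserve_index=False)
      sink = pa.BufferOutputStream()
      writer = pa.RecordBatchStreamWriter(sink, batch.schema)
      writer.write_batch(batch)
      writer.close()

      # Read the stream back batch by batch, the same shape of loop as in the serializer;
      # with a socket instead of a buffer, this iteration blocks until the JVM side writes.
      reader = pa.ipc.open_stream(sink.getvalue())
      for batch in reader:
          series_list = [c.to_pandas() for c in pa.Table.from_batches([batch]).itercolumns()]
          print([s.iloc[0] for s in series_list])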

       

      You can reproduce it by opening a PySpark shell with the configuration below and running the first code snippet:

      pyspark --conf "spark.python.worker.memory=3G" --conf "spark.executor.memory=20G" --conf "spark.executor.extraJavaOptions=-XX:+UseG1GC" --conf "spark.driver.memory=10G"
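
      (Equivalently, for reproducing from a standalone script instead of the shell, roughly the same settings can be put on the session builder; this is only a sketch, and spark.driver.memory in particular may need to be passed on the command line because it must be set before the driver JVM starts.)

      from pyspark.sql import SparkSession

      spark = (SparkSession.builder
               .config("spark.python.worker.memory", "3G")
               .config("spark.executor.memory", "20G")
               .config("spark.executor.extraJavaOptions", "-XX:+UseG1GC")
               .config("spark.driver.memory", "10G")   # may need the command line instead; must be set before the JVM starts
               .getOrCreate())
      sc = spark.sparkContext  # the repro snippet uses both `spark` and `sc`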

       

      Versions:

      • pandas - 0.24.2
      • pyarrow - 0.13.0
      • Spark - 2.4.2
      • Python - 2.7.16

      Attachments

        1. Untitled.xcf
          704 kB
          Modi Tamam

        Activity

          People

            Assignee: Unassigned
            Reporter: Modi Tamam (modi.tamam@gmail.com)
            Votes: 0
            Watchers: 4

            Dates

              Created:
              Updated:
              Resolved: