Details
- Type: Bug
- Status: Resolved
- Priority: Major
- Resolution: Incomplete
- Affects Version/s: 2.4.3
- Fix Version/s: None
Description
I'm working with PySpark version 2.4.3.
I have a big data frame:
- ~15M rows
- ~130 columns
- ~2.5 GB - I converted it to a pandas data frame and pickled it (pandas_df.to_pickle()), which produced a file of about 2.5 GB (see the size-check sketch right after this list).
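For reference, a minimal sketch of how such a size estimate can be obtained (the DataFrame name and output path are hypothetical; pandas' method is to_pickle()):

import os

pandas_df = spark_df.toPandas()            # spark_df: the ~15M-row Spark DataFrame (hypothetical name)
pandas_df.to_pickle("/tmp/big_df.pkl")     # pandas uses to_pickle()
print(os.path.getsize("/tmp/big_df.pkl"))  # size in bytes, ~2.5 GB in this case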
I have some code that groups this data frame and applies a pandas UDF:
from pyspark.sql import Row
from pyspark.sql.functions import lit, pandas_udf, PandasUDFType, to_json
from pyspark.sql.types import *
from pyspark.sql import functions as F

# `sc` and `spark` are the SparkContext / SparkSession provided by the PySpark shell
initial_list = range(4500)
rdd = sc.parallelize(initial_list)
rdd = rdd.map(lambda x: Row(val=x))
initial_spark_df = spark.createDataFrame(rdd)

cols_count = 132
rows = 1000

# ------------------- Start generating the big data frame -------------------
# Generating the schema
schema = StructType([StructField(str(i), IntegerType()) for i in range(cols_count)])

@pandas_udf(returnType=schema, functionType=PandasUDFType.GROUPED_MAP)
def random_pd_df_generator(df):
    import numpy as np
    import pandas as pd
    return pd.DataFrame(np.random.randint(0, 100, size=(rows, cols_count)),
                        columns=range(cols_count))

full_spark_df = initial_spark_df.groupBy("val").apply(random_pd_df_generator)
# ------------------- End generating the big data frame ---------------------

# ------------------- Start the bug reproduction -----------------------------
grouped_col = "col_0"

@pandas_udf("%s string" % grouped_col, PandasUDFType.GROUPED_MAP)
def very_simpl_udf(pdf):
    import pandas as pd
    ret_val = pd.DataFrame({grouped_col: [str(pdf[grouped_col].iloc[0])]})
    return ret_val

# In order to create a huge dataset, set all of the grouped_col values to a single
# value, so everything is grouped into a single group.
# Here is where the program gets stuck
full_spark_df.withColumn(grouped_col, F.lit('0')).groupBy(grouped_col).apply(very_simpl_udf).show()

assert False, "If we're here, it means that the issue wasn't reproduced"
The above code gets stuck in ArrowStreamPandasSerializer, on the first line below, when reading a batch from the reader:
for batch in reader:
    yield [self.arrow_to_pandas(c) for c in pa.Table.from_batches([batch]).itercolumns()]
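To check whether pyarrow alone can handle a batch of comparable shape, here is a standalone sketch (my own addition, not from the original report) that mirrors the same read-and-convert pattern outside Spark; the row count is hypothetical and the IPC class names may differ slightly between pyarrow versions:

import numpy as np
import pandas as pd
import pyarrow as pa

# Build one large record batch and round-trip it through the Arrow stream format,
# mirroring what ArrowStreamPandasSerializer does for a single huge group.
df = pd.DataFrame(np.random.randint(0, 100, size=(1000000, 132)))
batch = pa.RecordBatch.from_pandas(df)

sink = pa.BufferOutputStream()
writer = pa.RecordBatchStreamWriter(sink, batch.schema)
writer.write_batch(batch)
writer.close()

reader = pa.RecordBatchStreamReader(pa.BufferReader(sink.getvalue()))
for batch in reader:
    # Same pattern as the serializer: table -> per-column pandas conversion
    cols = [c.to_pandas() for c in pa.Table.from_batches([batch]).itercolumns()]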
You can reproduce it by simply running the first code snippet.
Open a PySpark shell with this configuration:
pyspark --conf "spark.python.worker.memory=3G" --conf "spark.executor.memory=20G" --conf "spark.executor.extraJavaOptions=-XX:+UseG1GC" --conf "spark.driver.memory=10G"
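For completeness, a rough programmatic equivalent of the same configuration (a sketch, not from the original report; note that spark.driver.memory normally has to be set before the driver JVM starts, e.g. via spark-submit or spark-defaults.conf):

from pyspark.sql import SparkSession

spark = (SparkSession.builder
         .config("spark.python.worker.memory", "3G")
         .config("spark.executor.memory", "20G")
         .config("spark.executor.extraJavaOptions", "-XX:+UseG1GC")
         .config("spark.driver.memory", "10G")
         .getOrCreate())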
Versions:
- pandas - 0.24.2
- pyarrow - 0.13.0
- Spark - 2.4.2
- Python - 2.7.16
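A quick way to confirm these versions from the same shell (a small sketch added here, not part of the original report):

import sys
import pandas, pyarrow, pyspark

print("Spark:   %s" % pyspark.__version__)
print("pyarrow: %s" % pyarrow.__version__)
print("pandas:  %s" % pandas.__version__)
print("Python:  %s" % sys.version.split()[0])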