Details
Type: Bug
Status: Open
Priority: Major
Resolution: Unresolved
Affects Version/s: 3.5.0
None
None
Environment:
python 3.11.7
pip install pyspark[sql]
pyspark: 3.5.0
pyarrow: 14.0.2
pandas: 2.1.4
Description
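When a UDF returns a large pandas DataFrame, a column can be converted to a "pyarrow.lib.ChunkedArray" instead of a single pyarrow Array. pyarrow does this when a column's data overflows what one array can hold; for the string type, offsets are 32-bit, so one array holds less than 2 GiB of data. pyarrow's "StructArray.from_arrays" only accepts Array inputs, so the conversion fails with:
File "pyarrow/array.pxi", line 3211, in pyarrow.lib.StructArray.from_arrays
TypeError: Expected Array, got <class 'pyarrow.lib.ChunkedArray'>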
Code to reproduce:
from pyspark.sql import DataFrame, SparkSession
from typing import List
import pyspark.sql.types as T
from pyspark.sql.functions import pandas_udf
import numpy as np
import pyarrow as pa
import pandas as pd

def large_pandas_udf(iterator):
    for pdf in iterator:
        big_unique_strings = ['x' * ((1 << 20) - 1) + str(i % 10) for i in range(10)]  # 10 distinct strings of 2**20 ASCII chars (1 MiB each)
        strings_list = [big_unique_strings[i % 10] for i in range(1 << 11)]  # 2048 rows
        arr = np.array(strings_list)
        yield pd.DataFrame(arr, columns=['string_column'])  # one DataFrame with 2**31 bytes of string data: fails

def large_pandas_workaround_udf(iterator):
    for pdf in iterator:
        big_unique_strings = ['x' * ((1 << 20) - 1) + str(i % 10) for i in range(10)]
        strings_list = [big_unique_strings[i % 10] for i in range(1 << 11)]
        arr = np.array(strings_list)
        num_chunks = 20
        chunk_size = len(arr) // num_chunks  # 102 rows per chunk
        for i in range(num_chunks + 10):  # extra iterations pick up the 8 remainder rows; later slices are empty
            yield pd.DataFrame(arr[i*chunk_size:(i+1)*chunk_size], columns=['string_column'])  # about 102 MiB of strings each

def run_udf(spark, udf=large_pandas_udf):
    schema = T.StructType([T.StructField("string_column", T.StringType())])
    spark.conf.set('spark.sql.execution.pyspark.udf.simplifiedTraceback.enabled', 'false')  # get full error traceback
    df = spark.createDataFrame([['v']], schema)
    return df.mapInPandas(udf, schema).count()

# run_udf(spark) raises the exception in the stack trace below; pass udf=large_pandas_workaround_udf for the chunked variant.
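To see why this particular reproduction overflows, the arithmetic below recomputes the string payload of the column yielded by large_pandas_udf. It assumes the standard Arrow layout, where a `string` array addresses its data with signed 32-bit offsets (pyarrow's `large_string` type uses 64-bit offsets instead), so one array can hold at most 2**31 - 1 bytes.

```python
# Sizes copied from large_pandas_udf in the reproduction above.
value = "x" * ((1 << 20) - 1) + "0"     # one cell: 2**20 - 1 'x' characters plus one digit
row_bytes = len(value.encode("utf-8"))  # ASCII only, so 1 byte per character
num_rows = 1 << 11                      # 2048 rows yielded in a single DataFrame
total_bytes = row_bytes * num_rows

# Largest byte offset a signed 32-bit `string` offset buffer can address.
INT32_MAX = 2**31 - 1
print(total_bytes, total_bytes > INT32_MAX)  # 2147483648 True
```

The payload is exactly 2**31 bytes, one byte more than a single `string` array can hold, whereas each DataFrame yielded by large_pandas_workaround_udf carries 102 rows (roughly 102 MiB).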
Stack trace:
Traceback (most recent call last):
File "<stdin>", line 1, in <module>
File "<string>", line 30, in run_udf
File "/Users/kosborne/.pyenv/versions/py11/lib/python3.11/site-packages/pyspark/sql/dataframe.py", line 1234, in count
return int(self._jdf.count())
^^^^^^^^^^^^^^^^^
File "/Users/kosborne/.pyenv/versions/py11/lib/python3.11/site-packages/pyspark/python/lib/py4j-0.10.9.7-src.zip/py4j/java_gateway.py", line 1322, in __call__
File "/Users/kosborne/.pyenv/versions/py11/lib/python3.11/site-packages/pyspark/errors/exceptions/captured.py", line 185, in deco
raise converted from None
pyspark.errors.exceptions.captured.PythonException:
An exception was thrown from the Python worker. Please see the stack trace below.
Traceback (most recent call last):
File "/Users/kosborne/.pyenv/versions/py11/lib/python3.11/site-packages/pyspark/python/lib/pyspark.zip/pyspark/worker.py", line 1247, in main
process()
File "/Users/kosborne/.pyenv/versions/py11/lib/python3.11/site-packages/pyspark/python/lib/pyspark.zip/pyspark/worker.py", line 1239, in process
serializer.dump_stream(out_iter, outfile)
File "/Users/kosborne/.pyenv/versions/py11/lib/python3.11/site-packages/pyspark/python/lib/pyspark.zip/pyspark/sql/pandas/serializers.py", line 470, in dump_stream
return ArrowStreamSerializer.dump_stream(self, init_stream_yield_batches(), stream)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/Users/kosborne/.pyenv/versions/py11/lib/python3.11/site-packages/pyspark/python/lib/pyspark.zip/pyspark/sql/pandas/serializers.py", line 100, in dump_stream
for batch in iterator:
File "/Users/kosborne/.pyenv/versions/py11/lib/python3.11/site-packages/pyspark/python/lib/pyspark.zip/pyspark/sql/pandas/serializers.py", line 464, in init_stream_yield_batches
batch = self._create_batch(series)
^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/Users/kosborne/.pyenv/versions/py11/lib/python3.11/site-packages/pyspark/python/lib/pyspark.zip/pyspark/sql/pandas/serializers.py", line 448, in _create_batch
arrs.append(self._create_struct_array(s, t))
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/Users/kosborne/.pyenv/versions/py11/lib/python3.11/site-packages/pyspark/python/lib/pyspark.zip/pyspark/sql/pandas/serializers.py", line 410, in _create_struct_array
return pa.StructArray.from_arrays(struct_arrs, struct_names)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "pyarrow/array.pxi", line 3211, in pyarrow.lib.StructArray.from_arrays
TypeError: Expected Array, got <class 'pyarrow.lib.ChunkedArray'>
Related GitHub issue discussing this pyarrow array-constructor behavior: https://github.com/apache/arrow/issues/34755
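large_pandas_workaround_udf in the reproduction avoids the error by yielding the result as many smaller DataFrames. The helper below is a hypothetical generalization of that idea (the name split_by_string_bytes and the 1 GiB default budget are assumptions, not part of PySpark): it cuts a DataFrame into consecutive row slices whose combined UTF-8 string payload stays under a byte budget, so no string column in a slice approaches the 2 GiB limit of one Arrow `string` array.

```python
from typing import Iterator

import pandas as pd

# Hypothetical budget, kept well below the 2**31 - 1 byte limit of one Arrow string array.
DEFAULT_MAX_BYTES = 1 << 30  # 1 GiB


def split_by_string_bytes(pdf: pd.DataFrame, max_bytes: int = DEFAULT_MAX_BYTES) -> Iterator[pd.DataFrame]:
    """Yield consecutive row slices of `pdf` whose total UTF-8 string payload stays within `max_bytes`.

    All string cells in a row are summed, which over-estimates the per-column size that
    actually matters. A single row larger than `max_bytes` is still yielded on its own.
    """
    # UTF-8 size of all string cells in each row; non-string cells count as 0.
    row_sizes = [0] * len(pdf)
    for j in range(pdf.shape[1]):
        for i, value in enumerate(pdf.iloc[:, j]):
            if isinstance(value, str):
                row_sizes[i] += len(value.encode("utf-8"))

    start, running = 0, 0
    for i, size in enumerate(row_sizes):
        # Close the current slice before adding this row would exceed the budget.
        if i > start and running + size > max_bytes:
            yield pdf.iloc[start:i]
            start, running = i, 0
        running += size
    if start < len(pdf):
        yield pdf.iloc[start:]
```

Inside a mapInPandas function, this would replace a single `yield pd.DataFrame(...)` with `yield from split_by_string_bytes(pd.DataFrame(...))`. Summing every string cell in a row over-counts relative to the per-column limit, which keeps the sketch simple at the cost of slightly smaller slices than strictly necessary.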