Spark / SPARK-46776

Pandas UDF Serialization Error: Expected Array, got <class 'pyarrow.lib.ChunkedArray'>


Details

    • Type: Bug
    • Status: Open
    • Priority: Major
    • Resolution: Unresolved
    • Affects Version/s: 3.5.0
    • Fix Version/s: None
    • Component/s: PySpark
    • Labels: None
    • Environment:

      python 3.11.7
      -> pip install pyspark[sql]
      pyspark: 3.5.0
      pyarrow: 14.0.2
      pandas: 2.1.4

    Description

      When returning a large pandas DataFrame from a UDF, the data can be converted to a pyarrow.lib.ChunkedArray, which the pyarrow StructArray.from_arrays function does not accept, producing the error message:

      File "pyarrow/array.pxi", line 3211, in pyarrow.lib.StructArray.from_arrays
      TypeError: Expected Array, got <class 'pyarrow.lib.ChunkedArray'>

      Code to reproduce:

      import numpy as np
      import pandas as pd
      import pyspark.sql.types as T

      def large_pandas_udf(iterator):
          # Yields one ~2 GiB DataFrame: 2**11 rows of 1 MiB strings. That
          # exceeds pyarrow's 2 GiB limit for a single string Array, so the
          # column is serialized as a ChunkedArray and the error is raised.
          for pdf in iterator:
              big_unique_strings = ['x' * ((1 << 20) - 1) + str(i % 10) for i in range(10)]
              strings_list = [big_unique_strings[i % 10] for i in range(1 << 11)]
              arr = np.array(strings_list)
              yield pd.DataFrame(arr, columns=['string_column'])

      def large_pandas_workaround_udf(iterator):
          # Workaround: yield the same data as many small DataFrames, so
          # each Arrow batch stays well under the single-Array size limit.
          for pdf in iterator:
              big_unique_strings = ['x' * ((1 << 20) - 1) + str(i % 10) for i in range(10)]
              strings_list = [big_unique_strings[i % 10] for i in range(1 << 11)]
              arr = np.array(strings_list)
              num_chunks = 20
              chunk_size = len(arr) // num_chunks
              # Extra iterations pick up the remainder rows; trailing empty
              # slices just yield empty DataFrames, which are harmless.
              for i in range(num_chunks + 10):
                  yield pd.DataFrame(arr[i*chunk_size:(i+1)*chunk_size], columns=['string_column'])

      def run_udf(spark, udf=large_pandas_udf):
          schema = T.StructType([T.StructField("string_column", T.StringType())])
          # Disable the simplified traceback to get the full error below
          spark.conf.set('spark.sql.execution.pyspark.udf.simplifiedTraceback.enabled', 'false')
          df = spark.createDataFrame([['v']], schema)
          return df.mapInPandas(udf, schema).count()
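
      For reference, the repro above can be driven like this (a local-mode sketch; the session settings are illustrative, not part of the original report):

      from pyspark.sql import SparkSession

      spark = SparkSession.builder.master("local[1]").appName("SPARK-46776-repro").getOrCreate()

      # Fails: TypeError: Expected Array, got <class 'pyarrow.lib.ChunkedArray'>
      run_udf(spark, udf=large_pandas_udf)

      # Succeeds: each yielded DataFrame serializes as one contiguous Array
      run_udf(spark, udf=large_pandas_workaround_udf)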

       

      Stack trace:

      Traceback (most recent call last):
        File "<stdin>", line 1, in <module>
        File "<string>", line 30, in run_udf
        File "/Users/kosborne/.pyenv/versions/py11/lib/python3.11/site-packages/pyspark/sql/dataframe.py", line 1234, in count
          return int(self._jdf.count())
                     ^^^^^^^^^^^^^^^^^
        File "/Users/kosborne/.pyenv/versions/py11/lib/python3.11/site-packages/pyspark/python/lib/py4j-0.10.9.7-src.zip/py4j/java_gateway.py", line 1322, in {}call{}
        File "/Users/kosborne/.pyenv/versions/py11/lib/python3.11/site-packages/pyspark/errors/exceptions/captured.py", line 185, in deco
          raise converted from None
      pyspark.errors.exceptions.captured.PythonException:
        An exception was thrown from the Python worker. Please see the stack trace below.
      Traceback (most recent call last):
        File "/Users/kosborne/.pyenv/versions/py11/lib/python3.11/site-packages/pyspark/python/lib/pyspark.zip/pyspark/worker.py", line 1247, in main
          process()
        File "/Users/kosborne/.pyenv/versions/py11/lib/python3.11/site-packages/pyspark/python/lib/pyspark.zip/pyspark/worker.py", line 1239, in process
          serializer.dump_stream(out_iter, outfile)
        File "/Users/kosborne/.pyenv/versions/py11/lib/python3.11/site-packages/pyspark/python/lib/pyspark.zip/pyspark/sql/pandas/serializers.py", line 470, in dump_stream
          return ArrowStreamSerializer.dump_stream(self, init_stream_yield_batches(), stream)
                 ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
        File "/Users/kosborne/.pyenv/versions/py11/lib/python3.11/site-packages/pyspark/python/lib/pyspark.zip/pyspark/sql/pandas/serializers.py", line 100, in dump_stream
          for batch in iterator:
        File "/Users/kosborne/.pyenv/versions/py11/lib/python3.11/site-packages/pyspark/python/lib/pyspark.zip/pyspark/sql/pandas/serializers.py", line 464, in init_stream_yield_batches
          batch = self._create_batch(series)
                  ^^^^^^^^^^^^^^^^^^^^^^^^^^
        File "/Users/kosborne/.pyenv/versions/py11/lib/python3.11/site-packages/pyspark/python/lib/pyspark.zip/pyspark/sql/pandas/serializers.py", line 448, in _create_batch
          arrs.append(self._create_struct_array(s, t))
                      ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
        File "/Users/kosborne/.pyenv/versions/py11/lib/python3.11/site-packages/pyspark/python/lib/pyspark.zip/pyspark/sql/pandas/serializers.py", line 410, in _create_struct_array
          return pa.StructArray.from_arrays(struct_arrs, struct_names)
                 ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
        File "pyarrow/array.pxi", line 3211, in pyarrow.lib.StructArray.from_arrays
      TypeError: Expected Array, got <class 'pyarrow.lib.ChunkedArray'>

      Related GitHub issue discussing this pyarrow array constructor behavior: https://github.com/apache/arrow/issues/34755
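
      One possible direction for a fix in the PySpark serializer (a hypothetical sketch, not the actual patch; to_struct_array is an invented helper name): flatten any ChunkedArray columns back into contiguous Arrays before building the struct.

      import pyarrow as pa

      def to_struct_array(columns, names):
          # Hypothetical helper: coerce ChunkedArray columns to contiguous
          # Arrays so that StructArray.from_arrays accepts them.
          flat = [
              col.combine_chunks() if isinstance(col, pa.ChunkedArray) else col
              for col in columns
          ]
          return pa.StructArray.from_arrays(flat, names)

      Note that combine_chunks() cannot succeed when the combined string data would itself exceed the 2 GiB offset limit, so a complete fix would likely also need to split oversized batches (e.g. one record batch per chunk), much as the workaround UDF above does manually.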

      People

        Assignee: Unassigned
        Reporter: Kyle Osborne (kposborne)