Apache Arrow / ARROW-2590

[Python] Pyspark python_udf serialization error on grouped map (Amazon EMR)


Details

    • Type: Bug
    • Status: Closed
    • Priority: Critical
    • Resolution: Cannot Reproduce
    • Affects Version/s: 0.9.0
    • Fix Version/s: 0.14.0
    • Component/s: Python
    • Environment:
      Amazon EMR 5.13
      Spark 2.3.0
      PyArrow 0.9.0 (and 0.8.0)
      Pandas 0.22.0 (and 0.21.1)
      Numpy 1.14.1
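
      Since the resolution was Cannot Reproduce, one useful check on a cluster like this is whether the driver and the executors see the same wheels. A minimal sketch: it assumes an active SparkContext named sc, and worker_versions is a hypothetical helper, not part of any API.

      import numpy, pandas, pyarrow

      def worker_versions(_):
          # Runs inside an executor; reports the versions its Python sees.
          import numpy, pandas, pyarrow
          return [(pyarrow.__version__, pandas.__version__, numpy.__version__)]

      print("driver:  ", pyarrow.__version__, pandas.__version__, numpy.__version__)
      print("executor:", sc.parallelize([0], 1).mapPartitions(worker_versions).collect())

      A mismatch between the two lines would point at the EMR bootstrap rather than at Arrow itself.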

    Description

I am writing a pandas_udf grouped map aggregation on Spark 2.3.0 on Amazon EMR. When I try to run any aggregation, I get the following stack trace:

      18/05/16 14:08:56 ERROR Utils: Aborting task
      org.apache.spark.api.python.PythonException: Traceback (most recent call last):
        File "/mnt/yarn/usercache/hadoop/appcache/application_1526400761989_0068/container_1526400761989_0068_01_000002/pyspark.zip/pyspark/worker.py", line 229, in main
          process()
        File "/mnt/yarn/usercache/hadoop/appcache/application_1526400761989_0068/container_1526400761989_0068_01_000002/pyspark.zip/pyspark/worker.py", line 224, in process
          serializer.dump_stream(func(split_index, iterator), outfile)
        File "/mnt/yarn/usercache/hadoop/appcache/application_1526400761989_0068/container_1526400761989_0068_01_000002/pyspark.zip/pyspark/serializers.py", line 261, in dump_stream
          batch = _create_batch(series, self._timezone)
        File "/mnt/yarn/usercache/hadoop/appcache/application_1526400761989_0068/container_1526400761989_0068_01_000002/pyspark.zip/pyspark/serializers.py", line 239, in _create_batch
          arrs = [create_array(s, t) for s, t in series]
        File "/mnt/yarn/usercache/hadoop/appcache/application_1526400761989_0068/container_1526400761989_0068_01_000002/pyspark.zip/pyspark/serializers.py", line 239, in <listcomp>
          arrs = [create_array(s, t) for s, t in series]
        File "/mnt/yarn/usercache/hadoop/appcache/application_1526400761989_0068/container_1526400761989_0068_01_000002/pyspark.zip/pyspark/serializers.py", line 237, in create_array
          return pa.Array.from_pandas(s, mask=mask, type=t)
        File "array.pxi", line 372, in pyarrow.lib.Array.from_pandas
        File "array.pxi", line 177, in pyarrow.lib.array
        File "array.pxi", line 77, in pyarrow.lib._ndarray_to_array
        File "error.pxi", line 98, in pyarrow.lib.check_status
      pyarrow.lib.ArrowException: Unknown error: 'utf-32-le' codec can't decode bytes in position 0-3: code point not in range(0x110000)

      To be clear, this happens when I run any aggregation, including the identity aggregation (return the Pandas DataFrame that was passed in). I do not get this error when I return an empty DataFrame, so it seems to be a symptom of the serialization of the Pandas DataFrame back to Spark.
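
      The bottom of the trace is a plain pyarrow call, so it can be exercised outside Spark. A minimal sketch, assuming a string column (the sample data here is made up, not from the report):

      import pandas as pd
      import pyarrow as pa

      # Spark's create_array() (serializers.py line 237 above) boils down to this call:
      s = pd.Series(['x', 'y', 'z'])
      arr = pa.Array.from_pandas(s, type=pa.string())
      print(arr)  # succeeds on a healthy install; raised ArrowException on the affected cluster

      If this snippet fails on an executor node but works locally, the problem is in the node's pyarrow/numpy build rather than in the UDF.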

      I have observed this behavior with the following versions:

      • Spark 2.3.0
      • PyArrow 0.9.0 (also 0.8.0)
      • Pandas 0.22.0 (also 0.22.1)
      • Numpy 1.14.1

      Here is some sample code:

      from pyspark.sql import functions as func

      # SCHEMA is the StructType return schema declared for the UDF
      @func.pandas_udf(SCHEMA, func.PandasUDFType.GROUPED_MAP)
      def aggregation(df):
          # identity aggregation: return the input Pandas DataFrame unchanged
          return df

      df.groupBy('a').apply(aggregation)  # raises the error above
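
      For contrast, a sketch of the empty-DataFrame variant described above, which reportedly does not fail (df.iloc[0:0] is my reading of "return an empty DataFrame", not the reporter's exact code):

      @func.pandas_udf(SCHEMA, func.PandasUDFType.GROUPED_MAP)
      def empty_aggregation(df):
          # zero rows: no column data reaches Arrow, so serialization succeeds
          return df.iloc[0:0]

      df.groupBy('a').apply(empty_aggregation)  # reported to work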

People

    • Assignee: Unassigned
    • Reporter: Daniel Fithian (dfithian)
    • Votes: 2
    • Watchers: 14
