Uploaded image for project: 'Spark'
  1. Spark
  2. SPARK-29367

pandas udf not working with latest pyarrow release (0.15.0)

    XMLWordPrintableJSON

    Details

    • Type: Documentation
    • Status: Resolved
    • Priority: Major
    • Resolution: Fixed
    • Affects Version/s: 2.4.0, 2.4.1, 2.4.3
    • Fix Version/s: 2.4.5, 3.0.0
    • Component/s: PySpark
    • Labels:
      None

      Description

      Hi,

      I recently upgraded pyarrow from 0.14 to 0.15 (released on Oct 5th), and my pyspark jobs using pandas udf are failing with java.lang.IllegalArgumentException (tested with Spark 2.4.0, 2.4.1, and 2.4.3). Here is a full example to reproduce the failure with pyarrow 0.15:

      from pyspark.sql import SparkSession
      from pyspark.sql.functions import pandas_udf, PandasUDFType
      from pyspark.sql.types import BooleanType
      
      import pandas as pd
      
      @pandas_udf(BooleanType(), PandasUDFType.SCALAR)
      def qualitycuts(nbad: int, rb: float, magdiff: float) -> pd.Series:
          """ Apply simple quality cuts
      
          Returns
          ----------
          out: pandas.Series of booleans
          Return a Pandas DataFrame with the appropriate flag: false for bad alert,
              and true for good alert.
      
          """
          mask = nbad.values == 0
          mask *= rb.values >= 0.55
          mask *= abs(magdiff.values) <= 0.1
      
          return pd.Series(mask)
      
      
      spark = SparkSession.builder.getOrCreate()
      
      # Create dummy DF
      colnames = ["nbad", "rb", "magdiff"]
      df = spark.sparkContext.parallelize(
          zip(
              [0, 1, 0, 0],
              [0.01, 0.02, 0.6, 0.01],
              [0.02, 0.05, 0.1, 0.01]
          )
      ).toDF(colnames)
      
      df.show()
      
      # Apply cuts
      df = df\
          .withColumn("toKeep", qualitycuts(*colnames))\
          .filter("toKeep == true")\
          .drop("toKeep")
      
      # This will fail if latest pyarrow 0.15.0 is used
      df.show()
      

      and the log is:

      Driver stacktrace:
      19/10/07 09:37:49 INFO DAGScheduler: Job 3 failed: showString at NativeMethodAccessorImpl.java:0, took 0.660523 s
      Traceback (most recent call last):
        File "/Users/julien/Documents/workspace/myrepos/fink-broker/test_pyarrow.py", line 44, in <module>
          df.show()
        File "/Users/julien/Documents/workspace/lib/spark/python/lib/pyspark.zip/pyspark/sql/dataframe.py", line 378, in show
        File "/Users/julien/Documents/workspace/lib/spark/python/lib/py4j-0.10.7-src.zip/py4j/java_gateway.py", line 1257, in __call__
        File "/Users/julien/Documents/workspace/lib/spark/python/lib/pyspark.zip/pyspark/sql/utils.py", line 63, in deco
        File "/Users/julien/Documents/workspace/lib/spark/python/lib/py4j-0.10.7-src.zip/py4j/protocol.py", line 328, in get_return_value
      py4j.protocol.Py4JJavaError: An error occurred while calling o64.showString.
      : org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 3.0 failed 1 times, most recent failure: Lost task 0.0 in stage 3.0 (TID 5, localhost, executor driver): java.lang.IllegalArgumentException
      	at java.nio.ByteBuffer.allocate(ByteBuffer.java:334)
      	at org.apache.arrow.vector.ipc.message.MessageSerializer.readMessage(MessageSerializer.java:543)
      	at org.apache.arrow.vector.ipc.message.MessageChannelReader.readNext(MessageChannelReader.java:58)
      	at org.apache.arrow.vector.ipc.ArrowStreamReader.readSchema(ArrowStreamReader.java:132)
      	at org.apache.arrow.vector.ipc.ArrowReader.initialize(ArrowReader.java:181)
      	at org.apache.arrow.vector.ipc.ArrowReader.ensureInitialized(ArrowReader.java:172)
      	at org.apache.arrow.vector.ipc.ArrowReader.getVectorSchemaRoot(ArrowReader.java:65)
      	at org.apache.spark.sql.execution.python.ArrowPythonRunner$$anon$1.read(ArrowPythonRunner.scala:162)
      	at org.apache.spark.sql.execution.python.ArrowPythonRunner$$anon$1.read(ArrowPythonRunner.scala:122)
      	at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.hasNext(PythonRunner.scala:406)
      	at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37)
      	at org.apache.spark.sql.execution.python.ArrowEvalPythonExec$$anon$2.<init>(ArrowEvalPythonExec.scala:98)
      	at org.apache.spark.sql.execution.python.ArrowEvalPythonExec.evaluate(ArrowEvalPythonExec.scala:96)
      	at org.apache.spark.sql.execution.python.EvalPythonExec$$anonfun$doExecute$1.apply(EvalPythonExec.scala:127)
      	at org.apache.spark.sql.execution.python.EvalPythonExec$$anonfun$doExecute$1.apply(EvalPythonExec.scala:89)
      	at org.apache.spark.rdd.RDD$$anonfun$mapPartitions$1$$anonfun$apply$23.apply(RDD.scala:801)
      	at org.apache.spark.rdd.RDD$$anonfun$mapPartitions$1$$anonfun$apply$23.apply(RDD.scala:801)
      	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
      	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
      	at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
      	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
      	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
      	at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
      	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
      	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
      	at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
      	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
      	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
      	at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
      	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
      	at org.apache.spark.scheduler.Task.run(Task.scala:121)
      	at org.apache.spark.executor.Executor$TaskRunner$$anonfun$10.apply(Executor.scala:402)
      	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1360)
      	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:408)
      	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
      	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
      	at java.lang.Thread.run(Thread.java:748)
      

      I am not sure what is the root of this failure, but I note there is a ticket opened (https://issues.apache.org/jira/browse/ARROW-6429) suggesting some work ongoing on the Spark side.

      I guess any user upgrading pyarrow would face the same error right away, and any help or feedback would be appreciated.

      Thanks,
      Julien

        Attachments

          Issue Links

            Activity

              People

              • Assignee:
                bryanc Bryan Cutler
                Reporter:
                JulienPeloton Julien Peloton
              • Votes:
                0 Vote for this issue
                Watchers:
                8 Start watching this issue

                Dates

                • Created:
                  Updated:
                  Resolved: