Description
Hi,
I recently upgraded pyarrow from 0.14 to 0.15 (released on Oct 5th), and my PySpark jobs using pandas UDFs are failing with java.lang.IllegalArgumentException (tested with Spark 2.4.0, 2.4.1, and 2.4.3). Here is a full example to reproduce the failure with pyarrow 0.15:
from pyspark.sql import SparkSession
from pyspark.sql.functions import pandas_udf, PandasUDFType
from pyspark.sql.types import BooleanType
import pandas as pd


@pandas_udf(BooleanType(), PandasUDFType.SCALAR)
def qualitycuts(nbad: pd.Series, rb: pd.Series, magdiff: pd.Series) -> pd.Series:
    """Apply simple quality cuts.

    Returns
    -------
    out: pandas.Series of booleans
        Return a pandas.Series with the appropriate flag:
        False for a bad alert, and True for a good alert.
    """
    mask = nbad.values == 0
    mask *= rb.values >= 0.55
    mask *= abs(magdiff.values) <= 0.1
    return pd.Series(mask)


spark = SparkSession.builder.getOrCreate()

# Create a dummy DataFrame
colnames = ["nbad", "rb", "magdiff"]
df = spark.sparkContext.parallelize(
    zip(
        [0, 1, 0, 0],
        [0.01, 0.02, 0.6, 0.01],
        [0.02, 0.05, 0.1, 0.01]
    )
).toDF(colnames)
df.show()

# Apply the cuts
df = df\
    .withColumn("toKeep", qualitycuts(*colnames))\
    .filter("toKeep == true")\
    .drop("toKeep")

# This will fail if the latest pyarrow 0.15.0 is used
df.show()
and the log is:
Driver stacktrace: 19/10/07 09:37:49 INFO DAGScheduler: Job 3 failed: showString at NativeMethodAccessorImpl.java:0, took 0.660523 s
Traceback (most recent call last):
  File "/Users/julien/Documents/workspace/myrepos/fink-broker/test_pyarrow.py", line 44, in <module>
    df.show()
  File "/Users/julien/Documents/workspace/lib/spark/python/lib/pyspark.zip/pyspark/sql/dataframe.py", line 378, in show
  File "/Users/julien/Documents/workspace/lib/spark/python/lib/py4j-0.10.7-src.zip/py4j/java_gateway.py", line 1257, in __call__
  File "/Users/julien/Documents/workspace/lib/spark/python/lib/pyspark.zip/pyspark/sql/utils.py", line 63, in deco
  File "/Users/julien/Documents/workspace/lib/spark/python/lib/py4j-0.10.7-src.zip/py4j/protocol.py", line 328, in get_return_value
py4j.protocol.Py4JJavaError: An error occurred while calling o64.showString.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 3.0 failed 1 times, most recent failure: Lost task 0.0 in stage 3.0 (TID 5, localhost, executor driver): java.lang.IllegalArgumentException
    at java.nio.ByteBuffer.allocate(ByteBuffer.java:334)
    at org.apache.arrow.vector.ipc.message.MessageSerializer.readMessage(MessageSerializer.java:543)
    at org.apache.arrow.vector.ipc.message.MessageChannelReader.readNext(MessageChannelReader.java:58)
    at org.apache.arrow.vector.ipc.ArrowStreamReader.readSchema(ArrowStreamReader.java:132)
    at org.apache.arrow.vector.ipc.ArrowReader.initialize(ArrowReader.java:181)
    at org.apache.arrow.vector.ipc.ArrowReader.ensureInitialized(ArrowReader.java:172)
    at org.apache.arrow.vector.ipc.ArrowReader.getVectorSchemaRoot(ArrowReader.java:65)
    at org.apache.spark.sql.execution.python.ArrowPythonRunner$$anon$1.read(ArrowPythonRunner.scala:162)
    at org.apache.spark.sql.execution.python.ArrowPythonRunner$$anon$1.read(ArrowPythonRunner.scala:122)
    at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.hasNext(PythonRunner.scala:406)
    at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37)
    at org.apache.spark.sql.execution.python.ArrowEvalPythonExec$$anon$2.<init>(ArrowEvalPythonExec.scala:98)
    at org.apache.spark.sql.execution.python.ArrowEvalPythonExec.evaluate(ArrowEvalPythonExec.scala:96)
    at org.apache.spark.sql.execution.python.EvalPythonExec$$anonfun$doExecute$1.apply(EvalPythonExec.scala:127)
    at org.apache.spark.sql.execution.python.EvalPythonExec$$anonfun$doExecute$1.apply(EvalPythonExec.scala:89)
    at org.apache.spark.rdd.RDD$$anonfun$mapPartitions$1$$anonfun$apply$23.apply(RDD.scala:801)
    at org.apache.spark.rdd.RDD$$anonfun$mapPartitions$1$$anonfun$apply$23.apply(RDD.scala:801)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
    at org.apache.spark.scheduler.Task.run(Task.scala:121)
    at org.apache.spark.executor.Executor$TaskRunner$$anonfun$10.apply(Executor.scala:402)
    at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1360)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:408)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)
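As a sanity check, the cut logic itself behaves as expected when run on plain pandas Series, outside Spark and Arrow entirely (a minimal standalone run with the same dummy values as in the example above), so the problem does not seem to be in the UDF body:

import pandas as pd

# Same dummy values as in the Spark example above
nbad = pd.Series([0, 1, 0, 0])
rb = pd.Series([0.01, 0.02, 0.6, 0.01])
magdiff = pd.Series([0.02, 0.05, 0.1, 0.01])

mask = nbad.values == 0
mask *= rb.values >= 0.55
mask *= abs(magdiff.values) <= 0.1

# Only the third row passes all three cuts
print(pd.Series(mask))  # [False, False, True, False]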
I am not sure of the root cause, but the exception is thrown by ByteBuffer.allocate, which only raises IllegalArgumentException for a negative capacity, while the JVM is reading the schema of the Arrow stream coming back from the Python worker. This suggests the Arrow Java reader bundled with Spark 2.4 misparses the message length in the stream written by pyarrow 0.15. I also note there is an open ticket (https://issues.apache.org/jira/browse/ARROW-6429) suggesting some work is ongoing on the Spark side.
I suspect any user upgrading pyarrow to 0.15 would hit the same error right away; any help or feedback would be appreciated.
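In the meantime, a possible workaround (an assumption on my part, based on the Arrow 0.15.0 release notes, and only tried in local mode) is to ask pyarrow to keep writing the legacy pre-0.15 IPC format via the ARROW_PRE_0_15_IPC_FORMAT environment variable. It has to be visible to the Python workers, so on a cluster it would rather go in conf/spark-env.sh than in the driver script:

import os

# Assumed workaround: pyarrow 0.15 changed the default Arrow IPC stream
# format; this variable (mentioned in the Arrow 0.15.0 release notes)
# asks it to keep writing the pre-0.15 format that the Java Arrow reader
# shipped with Spark 2.4 expects. Set it before the SparkSession is
# created; in local mode the Python workers inherit this environment.
os.environ["ARROW_PRE_0_15_IPC_FORMAT"] = "1"

Pinning pyarrow below 0.15 (e.g. pip install 'pyarrow<0.15') also avoids the problem, at the cost of staying on the old release.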
Thanks,
Julien
Issue Links
- is related to SPARK-29378 Upgrade SparkR to use Arrow 0.15 API (Resolved)