Description
When iterating over a PySpark RDD with toLocalIterator() and an error occurs on the worker, the error is not picked up by Py4J and is not raised in the Python driver process. Unless it inspects the logs, the application has no way to know that the worker failed. The following test should pass if the error is raised in the driver:
def test_to_local_iterator_error(self):

    def fail(_):
        raise RuntimeError("local iterator error")

    rdd = self.sc.parallelize(range(10)).map(fail)

    with self.assertRaisesRegexp(Exception, "local iterator error"):
        for _ in rdd.toLocalIterator():
            pass
However, no exception is raised in the driver. The worker error appears only in the log output, and the test fails:
Caused by: org.apache.spark.api.python.PythonException: Traceback (most recent call last):
  File "/home/bryan/git/spark/python/lib/pyspark.zip/pyspark/worker.py", line 428, in main
    process()
  File "/home/bryan/git/spark/python/lib/pyspark.zip/pyspark/worker.py", line 423, in process
    serializer.dump_stream(func(split_index, iterator), outfile)
  File "/home/bryan/git/spark/python/lib/pyspark.zip/pyspark/serializers.py", line 505, in dump_stream
    vs = list(itertools.islice(iterator, batch))
  File "/home/bryan/git/spark/python/lib/pyspark.zip/pyspark/util.py", line 99, in wrapper
    return f(*args, **kwargs)
  File "/home/bryan/git/spark/python/pyspark/tests/test_rdd.py", line 742, in fail
    raise RuntimeError("local iterator error")
RuntimeError: local iterator error

    at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.handlePythonException(PythonRunner.scala:453)
    ... java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)
FAIL

======================================================================
FAIL: test_to_local_iterator_error (pyspark.tests.test_rdd.RDDTests)
----------------------------------------------------------------------
Traceback (most recent call last):
  File "/home/bryan/git/spark/python/pyspark/tests/test_rdd.py", line 748, in test_to_local_iterator_error
    pass
AssertionError: Exception not raised
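The failure mode can be illustrated without Spark. The sketch below is a toy model only: it does not reflect how PySpark or Py4J is implemented, and the names `serve`, `local_iterator`, and `propagate_errors` are hypothetical. A producer thread stands in for the worker side and streams results to a consumer generator that stands in for the driver. When the producer swallows its error, the consumer sees a stream that simply ends early, which matches the behavior reported here. When the producer sends an explicit error message, the consumer can re-raise it.

```python
import queue
import threading


def serve(items, func, out, propagate_errors):
    """Stand-in for the worker side: apply func and stream the results."""
    try:
        for item in items:
            out.put(("value", func(item)))
    except Exception as exc:
        if propagate_errors:
            # Forward the failure to the consumer instead of dropping it.
            out.put(("error", exc))
            return
        # Otherwise the error is swallowed here; in a real system it
        # would be visible only in the worker logs.
    out.put(("end", None))


def local_iterator(items, func, propagate_errors):
    """Stand-in for the driver side: yield results as they arrive."""
    out = queue.Queue()
    worker = threading.Thread(
        target=serve, args=(items, func, out, propagate_errors), daemon=True
    )
    worker.start()
    while True:
        kind, payload = out.get()
        if kind == "value":
            yield payload
        elif kind == "error":
            # Re-raise on the consumer side so the caller sees the failure.
            raise RuntimeError(f"worker failed: {payload}") from payload
        else:
            return
```

With `propagate_errors=False`, iterating over a failing function yields nothing and raises nothing, so the caller cannot distinguish a failure from an empty result. With `propagate_errors=True`, the same iteration raises in the caller, which is the behavior the test above expects from `toLocalIterator()`.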
Issue Links
- is related to: SPARK-23961 pyspark toLocalIterator throws an exception (Resolved)
- relates to: SPARK-27992 PySpark socket server should sync with JVM connection thread future (Resolved)
- links to