Details
- Type: Bug
- Status: Resolved
- Priority: Critical
- Resolution: Fixed
- Version: 2.4.0
Description
Running a barrier job after a normal Spark job causes the barrier job to run without a BarrierTaskContext. Here is some code to reproduce it:
def task(*args):
    from pyspark import BarrierTaskContext
    context = BarrierTaskContext.get()
    context.barrier()
    print("in barrier phase")
    context.barrier()
    return []

# Run a normal (non-barrier) job first.
a = sc.parallelize(list(range(4))).map(lambda x: x ** 2).collect()
assert a == [0, 1, 4, 9]

# Then run the barrier job; this is where the failure occurs.
b = sc.parallelize(list(range(4)), 4).barrier().mapPartitions(task).collect()
Here is part of the stack trace:
Py4JJavaError: An error occurred while calling z:org.apache.spark.api.python.PythonRDD.collectAndServe.
: org.apache.spark.SparkException: Job aborted due to stage failure: Could not recover from a failed barrier ResultStage. Most recent failure reason: Stage failed because barrier task ResultTask(14, 0) finished unsuccessfully.
org.apache.spark.api.python.PythonException: Traceback (most recent call last):
  File "/databricks/spark/python/pyspark/worker.py", line 372, in main
    process()
  File "/databricks/spark/python/pyspark/worker.py", line 367, in process
    serializer.dump_stream(func(split_index, iterator), outfile)
  File "/databricks/spark/python/pyspark/rdd.py", line 2482, in func
    return f(iterator)
  File "<command-717066991496742>", line 4, in task
AttributeError: 'TaskContext' object has no attribute 'barrier'
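For illustration only (not part of the original report), here is a minimal sketch of a defensive variant of the task above; the name task_checked is hypothetical. The trace shows that BarrierTaskContext.get() hands back a plain TaskContext when the bug occurs, so an explicit isinstance check surfaces the problem with a clearer message than the AttributeError:

def task_checked(*args):
    # Hypothetical defensive variant of task(); not from the original report.
    from pyspark import BarrierTaskContext
    context = BarrierTaskContext.get()
    # When the bug triggers, get() returns a plain TaskContext, so this check
    # fails before barrier() would raise the AttributeError seen above.
    if not isinstance(context, BarrierTaskContext):
        raise RuntimeError(
            "barrier task started without a BarrierTaskContext, got %s"
            % type(context).__name__)
    context.barrier()
    print("in barrier phase")
    context.barrier()
    return []

b = sc.parallelize(list(range(4)), 4).barrier().mapPartitions(task_checked).collect()

Run in the same order as the reproduction above (normal job first, then the barrier job), this turns the AttributeError into an explicit error describing the missing barrier context.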