Details
Description
The following multi-threaded program, which uses broadcast variables, consistently throws exceptions like Exception("Broadcast variable '18' not loaded!",), even when run with "--master local[10]".
bug_spark.py
try:
    import pyspark
except:
    pass

from optparse import OptionParser

def my_option_parser():
    op = OptionParser()
    op.add_option("--parallelism", dest="parallelism", type="int", default=20)
    return op

def do_process(x, w):
    return x * w.value

def func(name, rdd, conf):
    new_rdd = rdd.map(lambda x: do_process(x, conf))
    total = new_rdd.reduce(lambda x, y: x + y)
    count = rdd.count()
    print name, 1.0 * total / count

if __name__ == "__main__":
    import threading

    op = my_option_parser()
    options, args = op.parse_args()

    sc = pyspark.SparkContext(appName="Buggy")
    data_rdd = sc.parallelize(range(0, 1000), 1)

    confs = [sc.broadcast(i) for i in xrange(options.parallelism)]
    threads = [threading.Thread(target=func, args=["thread_" + str(i), data_rdd, confs[i]])
               for i in xrange(options.parallelism)]

    for t in threads:
        t.start()
    for t in threads:
        t.join()
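As a point of comparison only (a sketch, not a fix for the underlying problem): the exception above is raised while the executor unpickles the Broadcast object, so a hypothetical variant of func that reads conf.value once on the driver thread and closes over the plain Python value avoids that worker-side unpickling path entirely. func_value_only below is not part of the attached reproduction and assumes the broadcast payload is small enough to ship in the task closure.

def func_value_only(name, rdd, conf):
    # Read the broadcast value once on the driver thread and capture the
    # plain Python object in the closure, so executors never have to
    # unpickle a Broadcast instance (the code path that raises "not loaded").
    w = conf.value
    new_rdd = rdd.map(lambda x: x * w)
    total = new_rdd.reduce(lambda x, y: x + y)
    count = rdd.count()
    print name, 1.0 * total / count

For a small integer the closure cost is negligible; broadcast only matters for large shared objects, which is why this is a workaround sketch rather than a substitute for the broadcast mechanism.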
Abridged run output:
abridge_run.txt
% spark-submit --master local[10] bug_spark.py --parallelism 20
[snip]
16/01/08 17:10:20 ERROR Executor: Exception in task 0.0 in stage 9.0 (TID 9)
org.apache.spark.api.python.PythonException: Traceback (most recent call last):
  File "/Network/Servers/mother.adverplex.com/Volumes/homeland/Users/walker/.spark/spark-1.6.0-bin-hadoop2.6/python/lib/pyspark.zip/pyspark/worker.py", line 98, in main
    command = pickleSer._read_with_length(infile)
  File "/Network/Servers/mother.adverplex.com/Volumes/homeland/Users/walker/.spark/spark-1.6.0-bin-hadoop2.6/python/lib/pyspark.zip/pyspark/serializers.py", line 164, in _read_with_length
    return self.loads(obj)
  File "/Network/Servers/mother.adverplex.com/Volumes/homeland/Users/walker/.spark/spark-1.6.0-bin-hadoop2.6/python/lib/pyspark.zip/pyspark/serializers.py", line 422, in loads
    return pickle.loads(obj)
  File "/Network/Servers/mother.adverplex.com/Volumes/homeland/Users/walker/.spark/spark-1.6.0-bin-hadoop2.6/python/lib/pyspark.zip/pyspark/broadcast.py", line 39, in _from_id
    raise Exception("Broadcast variable '%s' not loaded!" % bid)
Exception: (Exception("Broadcast variable '6' not loaded!",), <function _from_id at 0xce7a28>, (6L,))

	at org.apache.spark.api.python.PythonRunner$$anon$1.read(PythonRDD.scala:166)
	at org.apache.spark.api.python.PythonRunner$$anon$1.<init>(PythonRDD.scala:207)
	at org.apache.spark.api.python.PythonRunner.compute(PythonRDD.scala:125)
	at org.apache.spark.api.python.PythonRDD.compute(PythonRDD.scala:70)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:306)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:270)
	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:66)
	at org.apache.spark.scheduler.Task.run(Task.scala:89)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:213)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
	at java.lang.Thread.run(Thread.java:745)
[snip]