SPARK-12717: pyspark broadcast fails when using multiple threads


Details

    • Type: Bug
    • Status: Resolved
    • Priority: Critical
    • Resolution: Fixed
    • Affects Version/s: 1.6.0
    • Fix Version/s: 2.1.2, 2.2.1, 2.3.0
    • Component/s: PySpark
    • Labels: None
    • Environment: Linux, python 2.6 or python 2.7.

    Description

      The following multi-threaded program, which uses broadcast variables, consistently throws exceptions like Exception("Broadcast variable '18' not loaded!",), even when run with "--master local[10]".

      bug_spark.py
      try:
          import pyspark
      except ImportError:
          pass
      from optparse import OptionParser

      def my_option_parser():
          op = OptionParser()
          op.add_option("--parallelism", dest="parallelism", type="int", default=20)
          return op

      def do_process(x, w):
          return x * w.value

      def func(name, rdd, conf):
          # Each thread runs its own actions against the shared RDD, using
          # its own broadcast variable.
          new_rdd = rdd.map(lambda x: do_process(x, conf))
          total = new_rdd.reduce(lambda x, y: x + y)
          count = rdd.count()
          print name, 1.0 * total / count

      if __name__ == "__main__":
          import threading
          op = my_option_parser()
          options, args = op.parse_args()
          sc = pyspark.SparkContext(appName="Buggy")
          data_rdd = sc.parallelize(range(0, 1000), 1)
          # One broadcast variable per thread.
          confs = [sc.broadcast(i) for i in xrange(options.parallelism)]
          threads = [threading.Thread(target=func, args=("thread_" + str(i), data_rdd, confs[i]))
                     for i in xrange(options.parallelism)]
          for t in threads:
              t.start()
          for t in threads:
              t.join()

      Abridged run output:

      abridge_run.txt
      % spark-submit --master local[10] bug_spark.py --parallelism 20
      [snip]
      16/01/08 17:10:20 ERROR Executor: Exception in task 0.0 in stage 9.0 (TID 9)
      org.apache.spark.api.python.PythonException: Traceback (most recent call last):
        File "/Network/Servers/mother.adverplex.com/Volumes/homeland/Users/walker/.spark/spark-1.6.0-bin-hadoop2.6/python/lib/pyspark.zip/pyspark/worker.py", line 98, in main
          command = pickleSer._read_with_length(infile)
        File "/Network/Servers/mother.adverplex.com/Volumes/homeland/Users/walker/.spark/spark-1.6.0-bin-hadoop2.6/python/lib/pyspark.zip/pyspark/serializers.py", line 164, in _read_with_length
          return self.loads(obj)
        File "/Network/Servers/mother.adverplex.com/Volumes/homeland/Users/walker/.spark/spark-1.6.0-bin-hadoop2.6/python/lib/pyspark.zip/pyspark/serializers.py", line 422, in loads
          return pickle.loads(obj)
        File "/Network/Servers/mother.adverplex.com/Volumes/homeland/Users/walker/.spark/spark-1.6.0-bin-hadoop2.6/python/lib/pyspark.zip/pyspark/broadcast.py", line 39, in _from_id
          raise Exception("Broadcast variable '%s' not loaded!" % bid)
      Exception: (Exception("Broadcast variable '6' not loaded!",), <function _from_id at 0xce7a28>, (6L,))
      
      	at org.apache.spark.api.python.PythonRunner$$anon$1.read(PythonRDD.scala:166)
      	at org.apache.spark.api.python.PythonRunner$$anon$1.<init>(PythonRDD.scala:207)
      	at org.apache.spark.api.python.PythonRunner.compute(PythonRDD.scala:125)
      	at org.apache.spark.api.python.PythonRDD.compute(PythonRDD.scala:70)
      	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:306)
      	at org.apache.spark.rdd.RDD.iterator(RDD.scala:270)
      	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:66)
      	at org.apache.spark.scheduler.Task.run(Task.scala:89)
      	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:213)
      	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
      	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
      	at java.lang.Thread.run(Thread.java:745)
      [snip]
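
      The trace is consistent with a race on the driver: when several threads pickle commands at the same time, the bookkeeping that records which broadcast variables each task needs is shared between them, so an executor can receive a task referencing a broadcast id it was never sent. The fix that shipped in the versions listed above made that registry thread-local. On affected versions, one possible stopgap is to serialize job submission with a lock; below is a minimal, untested sketch against the repro above (submit_lock and workaround_sketch.py are names introduced here, not part of the original report), at the cost of all inter-job parallelism:

      workaround_sketch.py
      import threading

      submit_lock = threading.Lock()  # serializes actions across driver threads

      def func(name, rdd, conf):
          # Hold the lock across the whole action so no two threads pickle
          # commands (and register broadcast variables) concurrently.
          with submit_lock:
              new_rdd = rdd.map(lambda x: do_process(x, conf))
              total = new_rdd.reduce(lambda x, y: x + y)
              count = rdd.count()
          print name, 1.0 * total / count

      This gives up the concurrency the repro exercises, so it only works around the bug; upgrading to 2.1.2, 2.2.1, or 2.3.0 is the real remedy.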
      

      Attachments

        1. run.log (5.16 MB, uploaded by Srinivas)


          People

            Assignee: Bryan Cutler (bryanc)
            Reporter: Edward Walker (efwalkermit)
            Votes: 4
            Watchers: 16

            Dates

              Created:
              Updated:
              Resolved: