Spark / SPARK-6886

Big closure in PySpark will fail during shuffle


Details

    • Type: Bug
    • Status: Resolved
    • Priority: Blocker
    • Resolution: Fixed
    • Affects Version/s: 1.2.1, 1.3.0, 1.4.0
    • Fix Version/s: 1.2.3, 1.3.2, 1.4.0
    • Component/s: PySpark
    • Labels: None

    Description

      Reported by beifei.zhou <beifei.zhou at ximalaya.com>:

      I am using Spark to process big datasets. However, reduceByKey always fails on a large dataset, whereas it works on a smaller one. How can I solve this issue?

      The error is always like this:

      15/04/09 11:27:46 ERROR Executor: Exception in task 3.0 in stage 1.0 (TID 5)
      org.apache.spark.api.python.PythonException: Traceback (most recent call last):
        File "/Users/nali/Softwares/spark/python/pyspark/worker.py", line 90, in main
          command = pickleSer.loads(command.value)
        File "/Users/nali/Softwares/spark/python/pyspark/broadcast.py", line 106, in value
          self._value = self.load(self._path)
        File "/Users/nali/Softwares/spark/python/pyspark/broadcast.py", line 87, in load
          with open(path, 'rb', 1 << 20) as f:
      IOError: [Errno 2] No such file or directory: '/private/var/folders/_x/n59vb1b54pl96lvldz2lr_v40000gn/T/spark-37d8ecbc-9ac9-4aa2-be23-12823f4cd1ed/pyspark-1e3d5904-a5b6-4222-a146-91bfdb4a33a7/tmp8XMhgG'
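In the traceback, the worker fails inside `broadcast.py` while evaluating `command.value`, which indicates that the pickled task command itself was delivered through a broadcast whose temporary file had disappeared. As we understand PySpark 1.x, a command is shipped that way only when its pickled form exceeds roughly 1 MiB, which a large captured dict such as the reporter's `al_tag_dict` can easily cause. The sketch below uses only the standard library to estimate whether a captured object alone crosses that line. The 1 MiB figure and the helper names (`ASSUMED_BROADCAST_THRESHOLD`, `pickled_size`, `closure_would_be_broadcast`) are our assumptions for illustration, not PySpark API.

```python
import pickle

# ASSUMPTION: PySpark 1.x sends a task's pickled command via a broadcast once it
# is larger than 1 MiB. This constant mirrors that reading; it is not a PySpark API.
ASSUMED_BROADCAST_THRESHOLD = 1 << 20  # bytes


def pickled_size(obj):
    """Return the size in bytes of obj pickled with the highest protocol."""
    return len(pickle.dumps(obj, protocol=pickle.HIGHEST_PROTOCOL))


def closure_would_be_broadcast(captured):
    """Hypothetical helper: True if `captured` alone exceeds the assumed threshold.

    Real commands are serialized with cloudpickle and also contain the
    functions themselves, so this is only a rough, likely low, estimate.
    """
    return pickled_size(captured) > ASSUMED_BROADCAST_THRESHOLD
```

For example, a dict mapping 200,000 album ids to one tag each is well over the assumed threshold, while a dict with a handful of albums stays far below it.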
      

      Here is my code:

      import codecs  # only needed for the commented-out write at the end
      from operator import add

      from pyspark import SparkContext


      def combine_dict(a, b):
          # Merge dict b into dict a and return a (folds many one-entry dicts).
          a.update(b)
          return a


      sc = SparkContext(appName="tag")

      # Expected albumtag.txt columns: album_id,tag1,tag2,...
      # The resulting dict {album_id: [tags]} lives on the driver and is captured
      # by every lambda below, which makes the pickled closure large.
      al_tag_dict = sc.textFile('albumtag.txt') \
          .map(lambda x: x.split(',')) \
          .map(lambda x: {x[0]: x[1:]}) \
          .reduce(combine_dict)

      # Expected uidAlbumscore.txt columns: uid,album_id,score
      result = sc.textFile('uidAlbumscore.txt') \
          .map(lambda x: x.split(',')) \
          .filter(lambda x: x[1] in al_tag_dict) \
          .map(lambda x: (x[0], al_tag_dict[x[1]], float(x[2]))) \
          .flatMap(lambda x: [((x[0], a), x[2]) for a in x[1]]) \
          .map(lambda x: (str(x[0][0]), x[1])) \
          .reduceByKey(add)  # shuffle step; the reported failure occurs here
      #   .map(lambda x: x[0][0] + ',' + x[0][1] + ',' + str(x[1]) + '\n')
      #   .reduce(add)
      # codecs.open('tag_score.txt', 'w', 'utf-8').write(result)
      print(result.first())
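A common way to keep closures small is to wrap a large lookup table in an explicit broadcast variable and read it through `.value` inside the task functions. The sketch below restructures the reporter's per-record logic into two plain functions (`parse_album_tags` and `expand_scores`, hypothetical names) that read the album-to-tags dict from any object exposing a `.value` attribute, such as the handle returned by `SparkContext.broadcast`. It is a general workaround for oversized closures, not the change that resolved this issue, and we have not verified that it avoids this particular failure.

```python
def parse_album_tags(line):
    """Parse 'album_id,tag1,tag2,...' into (album_id, [tag1, tag2, ...])."""
    fields = line.split(',')
    return fields[0], fields[1:]


def expand_scores(line, tags_bc):
    """Turn 'uid,album_id,score' into [((uid, tag), score), ...].

    tags_bc is any object whose .value is the album_id -> [tags] dict,
    e.g. a PySpark broadcast variable. Albums missing from the dict yield
    no pairs, matching the original filter step.
    """
    fields = line.split(',')
    uid, album_id = fields[0], fields[1]
    tags = tags_bc.value.get(album_id)
    if tags is None:
        return []
    # Parse the score only for kept records, as the original filter-then-map did.
    score = float(fields[2])
    return [((uid, tag), score) for tag in tags]
```

On the driver, the dict would be wrapped once, e.g. `tags_bc = sc.broadcast(sc.textFile('albumtag.txt').map(parse_album_tags).collectAsMap())`, and the pipeline becomes `sc.textFile('uidAlbumscore.txt').flatMap(lambda line: expand_scores(line, tags_bc))` followed by the original `.map(...)` and `.reduceByKey(add)`. Only the small broadcast handle is pickled into each task's command; the dict itself is distributed to each executor once.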
      


People

    • Assignee: Davies Liu
    • Reporter: Davies Liu
    • Votes: 0
    • Watchers: 4
