
[SPARK-26658] PySpark job is unable to serialize large objects


Details

    • Type: Bug
    • Status: Resolved
    • Priority: Minor
    • Resolution: Duplicate
    • Affects Version/s: 3.0.0
    • Fix Version/s: None
    • Component/s: PySpark
    • Labels: None

    Description

      When a PySpark job running on Python 3 tries to serialize an object larger than 4 GiB, serialization fails: pickling an object referenced as a global variable in a task closure raises a struct.error, while broadcasting the object raises an OverflowError.
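
      Both failures stem from the same limitation: pickle protocols below 4 encode string and bytes lengths as 4-byte unsigned integers, so no single object over 4 GiB can be written. PySpark's broadcast.py hard-codes protocol 2, and cloudpickle builds on CPython's pure-Python pickler, which hits the same cap. A minimal sketch, independent of Spark, that reproduces both error shapes (pickle._Pickler is CPython's pure-Python pickler; running this needs on the order of 10 GB of free memory):

      import io
      import pickle
      import struct

      big = "x" * (2**32 + 1)      # a single str just over 4 GiB

      # The C pickler raises OverflowError, as in the broadcast traceback:
      try:
          pickle.dumps(big, protocol=2)
      except OverflowError as e:
          print(e)                 # cannot serialize a string larger than 4GiB

      # cloudpickle extends the pure-Python pickler, which instead fails
      # with the struct.error shown in the first traceback:
      try:
          pickle._Pickler(io.BytesIO(), 2).dump(big)
      except struct.error as e:
          print(e)                 # 'I' format requires 0 <= number <= 4294967295

      # Protocol 4 (Python 3.4+) uses 8-byte length prefixes and succeeds:
      assert len(pickle.dumps(big, protocol=4)) > 2**32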

      Traceback in the global-object case:

      Traceback (most recent call last):
        File "/home/pgandhi/pgandhi/spark-2.3/spark/python/pyspark/cloudpickle.py", line 235, in dump
          return Pickler.dump(self, obj)
        File "/home/var/python36/lib/python3.6/pickle.py", line 409, in dump
          self.save(obj)
        File "/home/var/python36/lib/python3.6/pickle.py", line 476, in save
          f(self, obj) # Call unbound method with explicit self
        File "/home/var/python36/lib/python3.6/pickle.py", line 751, in save_tuple
          save(element)
        File "/home/var/python36/lib/python3.6/pickle.py", line 476, in save
          f(self, obj) # Call unbound method with explicit self
        File "/home/pgandhi/pgandhi/spark-2.3/spark/python/pyspark/cloudpickle.py", line 378, in save_function
          self.save_function_tuple(obj)
        File "/home/pgandhi/pgandhi/spark-2.3/spark/python/pyspark/cloudpickle.py", line 529, in save_function_tuple
          save(closure_values)
        File "/home/var/python36/lib/python3.6/pickle.py", line 476, in save
          f(self, obj) # Call unbound method with explicit self
        File "/home/var/python36/lib/python3.6/pickle.py", line 781, in save_list
          self._batch_appends(obj)
        File "/home/var/python36/lib/python3.6/pickle.py", line 805, in _batch_appends
          save(x)
        File "/home/var/python36/lib/python3.6/pickle.py", line 476, in save
          f(self, obj) # Call unbound method with explicit self
        File "/home/pgandhi/pgandhi/spark-2.3/spark/python/pyspark/cloudpickle.py", line 378, in save_function
          self.save_function_tuple(obj)
        File "/home/pgandhi/pgandhi/spark-2.3/spark/python/pyspark/cloudpickle.py", line 529, in save_function_tuple
          save(closure_values)
        File "/home/var/python36/lib/python3.6/pickle.py", line 476, in save
          f(self, obj) # Call unbound method with explicit self
        File "/home/var/python36/lib/python3.6/pickle.py", line 781, in save_list
          self._batch_appends(obj)
        File "/home/var/python36/lib/python3.6/pickle.py", line 805, in _batch_appends
          save(x)
        File "/home/var/python36/lib/python3.6/pickle.py", line 476, in save
          f(self, obj) # Call unbound method with explicit self
        File "/home/pgandhi/pgandhi/spark-2.3/spark/python/pyspark/cloudpickle.py", line 378, in save_function
          self.save_function_tuple(obj)
        File "/home/pgandhi/pgandhi/spark-2.3/spark/python/pyspark/cloudpickle.py", line 529, in save_function_tuple
          save(closure_values)
        File "/home/var/python36/lib/python3.6/pickle.py", line 476, in save
          f(self, obj) # Call unbound method with explicit self
        File "/home/var/python36/lib/python3.6/pickle.py", line 781, in save_list
          self._batch_appends(obj)
        File "/home/var/python36/lib/python3.6/pickle.py", line 805, in _batch_appends
          save(x)
        File "/home/var/python36/lib/python3.6/pickle.py", line 476, in save
          f(self, obj) # Call unbound method with explicit self
        File "/home/pgandhi/pgandhi/spark-2.3/spark/python/pyspark/cloudpickle.py", line 378, in save_function
          self.save_function_tuple(obj)
        File "/home/pgandhi/pgandhi/spark-2.3/spark/python/pyspark/cloudpickle.py", line 529, in save_function_tuple
          save(closure_values)
        File "/home/var/python36/lib/python3.6/pickle.py", line 476, in save
          f(self, obj) # Call unbound method with explicit self
        File "/home/var/python36/lib/python3.6/pickle.py", line 781, in save_list
          self._batch_appends(obj)
        File "/home/var/python36/lib/python3.6/pickle.py", line 808, in _batch_appends
          save(tmp[0])
        File "/home/var/python36/lib/python3.6/pickle.py", line 476, in save
          f(self, obj) # Call unbound method with explicit self
        File "/home/pgandhi/pgandhi/spark-2.3/spark/python/pyspark/cloudpickle.py", line 372, in save_function
          self.save_function_tuple(obj)
        File "/home/pgandhi/pgandhi/spark-2.3/spark/python/pyspark/cloudpickle.py", line 525, in save_function_tuple
          save(f_globals)
        File "/home/var/python36/lib/python3.6/pickle.py", line 476, in save
          f(self, obj) # Call unbound method with explicit self
        File "/home/var/python36/lib/python3.6/pickle.py", line 821, in save_dict
          self._batch_setitems(obj.items())
        File "/home/var/python36/lib/python3.6/pickle.py", line 847, in _batch_setitems
          save(v)
        File "/home/var/python36/lib/python3.6/pickle.py", line 521, in save
          self.save_reduce(obj=obj, *rv)
        File "/home/pgandhi/pgandhi/spark-2.3/spark/python/pyspark/cloudpickle.py", line 804, in save_reduce
          save(state)
        File "/home/var/python36/lib/python3.6/pickle.py", line 476, in save
          f(self, obj) # Call unbound method with explicit self
        File "/home/var/python36/lib/python3.6/pickle.py", line 821, in save_dict
          self._batch_setitems(obj.items())
        File "/home/var/python36/lib/python3.6/pickle.py", line 847, in _batch_setitems
          save(v)
        File "/home/var/python36/lib/python3.6/pickle.py", line 521, in save
          self.save_reduce(obj=obj, *rv)
        File "/home/pgandhi/pgandhi/spark-2.3/spark/python/pyspark/cloudpickle.py", line 804, in save_reduce
          save(state)
        File "/home/var/python36/lib/python3.6/pickle.py", line 476, in save
          f(self, obj) # Call unbound method with explicit self
        File "/home/var/python36/lib/python3.6/pickle.py", line 751, in save_tuple
          save(element)
        File "/home/var/python36/lib/python3.6/pickle.py", line 476, in save
          f(self, obj) # Call unbound method with explicit self
        File "/home/var/python36/lib/python3.6/pickle.py", line 693, in save_bytes
          (str(obj, 'latin1'), 'latin1'), obj=obj)
        File "/home/pgandhi/pgandhi/spark-2.3/spark/python/pyspark/cloudpickle.py", line 786, in save_reduce
          save(args)
        File "/home/var/python36/lib/python3.6/pickle.py", line 476, in save
          f(self, obj) # Call unbound method with explicit self
        File "/home/var/python36/lib/python3.6/pickle.py", line 736, in save_tuple
          save(element)
        File "/home/var/python36/lib/python3.6/pickle.py", line 476, in save
          f(self, obj) # Call unbound method with explicit self
        File "/home/var/python36/lib/python3.6/pickle.py", line 714, in save_str
          self.write(BINUNICODE + pack("<I", n) + encoded)
      struct.error: 'I' format requires 0 <= number <= 4294967295
      


      Traceback in the broadcast case:

      Traceback (most recent call last):
        File "/home/pgandhi/pgandhi/spark-2.3/spark/python/pyspark/broadcast.py", line 113, in dump
          pickle.dump(value, f, 2)
      OverflowError: cannot serialize a string larger than 4GiB
      


      Steps to Reproduce:

      • Use Python 3.x with the gensim module installed (or ship the module as a zip file using --py-files).

      • Launch pyspark with the following command:

      bin/pyspark --master yarn --py-files additionalPythonModules.zip --conf spark.driver.memory=16g --conf spark.executor.memory=16g --conf spark.driver.memoryOverhead=16g --conf spark.executor.memoryOverhead=16g --conf spark.executor.pyspark.memory=16g

      For the sake of reproducing the issue, I have pasted the relevant parts of the shell session here:

      Setting default log level to "WARN".
      To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
      19/01/17 21:16:46 WARN DomainSocketFactory: The short-circuit local reads feature cannot be used because libhadoop cannot be loaded.
      19/01/17 21:16:48 WARN Client: Neither spark.yarn.jars nor spark.yarn.archive is set, falling back to uploading libraries under SPARK_HOME.
      Welcome to
            ____              __
           / __/__  ___ _____/ /__
          _\ \/ _ \/ _ `/ __/  '_/
         /__ / .__/\_,_/_/ /_/\_\   version 2.3.3-SNAPSHOT
            /_/
       
      Using Python version 3.6.3 (default, Jun 19 2018 22:39:11)
      SparkSession available as 'spark'.
      >>> import gensim
      /home/var/python36/lib/python3.6/site-packages/requests/__init__.py:91: RequestsDependencyWarning: urllib3 (1.24.1) or chardet (3.0.4) doesn't match a supported version!
        RequestsDependencyWarning)
      >>> import sys
      >>> score_threshold = 0.65
      >>> synonym_limit = 3
      >>> model = gensim.models.KeyedVectors.load_word2vec_format('hdfs://home/pgandhi/GoogleNews-vectors-negative300.bin', binary=True)
      >>> def isPhrase(word):
      ...     if word.find('_') != -1 :
      ...         return 1
      ...     return 0
      ... 
      >>> def process_word(line):
      ...             word = "test"
      ...             positiveWords = []
      ...             positiveWords.append(word)
      ...             try :
      ...                results = model.most_similar(positive=positiveWords)
      ...                synonym_vec = []
      ...                for i in range(len(results)) :
      ...                   result = results[i]
      ...                   if (result[1] > score_threshold ) :
      ...                       synonym = result[0]
      ...                       synonym = synonym.lower()
      ...                       if (isPhrase(synonym)==0) and (word != synonym) :
      ...                           synonym_vec.append(synonym)
      ...                   if len(synonym_vec) > synonym_limit :
      ...                       break
      ...                if  len(synonym_vec) > 0 :
      ...                    #print(word +"\t"+ ",".join(synonym_vec))
      ...                    return (word, ",".join(synonym_vec))
      ...             except KeyError :
      ...                sys.stderr.write("key error: " + word + "\n")
      ... 
      >>> if __name__ == "__main__":
      ...   rdd = sc.parallelize(["test1", "test2", "test3"])
      ...   rdd2 = rdd.map(process_word)
      ...   rdd2.count()
      ...
      
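      Note that process_word resolves model, score_threshold, and synonym_limit from the interpreter's globals, so cloudpickle attempts to pickle the multi-gigabyte model into every task closure, which is where the struct.error above originates. A hedged workaround sketch (the get_model helper and the use of sc.addFile/SparkFiles are illustrative, not part of this report): ship the model file to the executors and load it lazily there, so the model object itself never goes through pickle:

      from pyspark import SparkFiles

      # Ship the raw model file to every executor instead of pickling the
      # in-memory object into each task closure.
      sc.addFile('hdfs://home/pgandhi/GoogleNews-vectors-negative300.bin')

      score_threshold = 0.65
      synonym_limit = 3
      _model = None                # loaded lazily, once per executor process

      def get_model():
          global _model
          if _model is None:
              import gensim
              _model = gensim.models.KeyedVectors.load_word2vec_format(
                  SparkFiles.get('GoogleNews-vectors-negative300.bin'), binary=True)
          return _model

      def process_word(word):
          model = get_model()      # resolved on the executor, never serialized
          try:
              synonyms = [w.lower() for w, score in model.most_similar(positive=[word])
                          if score > score_threshold and w.find('_') == -1
                          and w.lower() != word]
              if synonyms:
                  return (word, ','.join(synonyms[:synonym_limit]))
          except KeyError:
              return None

      rdd = sc.parallelize(['test1', 'test2', 'test3'])
      rdd.map(process_word).count()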


      • To reproduce the issue with broadcast, run the code below in the pyspark shell:

       

      Setting default log level to "WARN".
      To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
      19/01/17 19:31:10 WARN DomainSocketFactory: The short-circuit local reads feature cannot be used because libhadoop cannot be loaded.
      19/01/17 19:31:12 WARN Client: Neither spark.yarn.jars nor spark.yarn.archive is set, falling back to uploading libraries under SPARK_HOME.
      Welcome to
            ____              __
           / __/__  ___ _____/ /__
          _\ \/ _ \/ _ `/ __/  '_/
         /__ / .__/\_,_/_/ /_/\_\   version 2.3.3-SNAPSHOT
            /_/
       
      Using Python version 3.6.3 (default, Jun 19 2018 22:39:11)
      SparkSession available as 'spark'.
      >>> import gensim
      /home/var/python36/lib/python3.6/site-packages/requests/__init__.py:91: RequestsDependencyWarning: urllib3 (1.24.1) or chardet (3.0.4) doesn't match a supported version!
        RequestsDependencyWarning)
      >>> model = sc.broadcast(gensim.models.KeyedVectors.load_word2vec_format('hdfs://home/pgandhi/GoogleNews-vectors-negative300.bin', binary=True))
      
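      Broadcast.dump() hard-codes pickle.dump(value, f, 2) (broadcast.py line 113 in the traceback above), so the 4 GiB cap applies no matter how much memory is configured. A possible workaround sketch, assuming the model pickles cleanly under protocol 4: serialize it manually with protocol 4, distribute the file with sc.addFile, and unpickle it on the executors (load_model is an illustrative helper, not part of the report):

      import os
      import pickle
      import tempfile

      import gensim
      from pyspark import SparkFiles

      model = gensim.models.KeyedVectors.load_word2vec_format(
          'hdfs://home/pgandhi/GoogleNews-vectors-negative300.bin', binary=True)

      # Protocol 4 uses 8-byte length prefixes, so it has no 4 GiB limit.
      path = os.path.join(tempfile.mkdtemp(), 'model.pkl')
      with open(path, 'wb') as f:
          pickle.dump(model, f, protocol=4)
      sc.addFile(path)

      def load_model():
          # Runs on an executor; reads the copy distributed by addFile.
          with open(SparkFiles.get('model.pkl'), 'rb') as f:
              return pickle.load(f)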



            People

              • Assignee: Unassigned
              • Reporter: Parth Gandhi (pgandhi)