SPARK-3721: Broadcast Variables above 2GB break in PySpark

    Details

    • Type: Bug
    • Status: Resolved
    • Priority: Major
    • Resolution: Fixed
    • Affects Version/s: 1.1.0
    • Fix Version/s: 1.2.0
    • Component/s: PySpark
    • Labels: None

      Description

      This bug exhibits three distinct failure modes in PySpark, all of which appear to be related to the size of the broadcast variable. Note that the tests below ran Python 2.7.3 on all machines and used the Spark 1.1.0 binaries.

      *BLOCK 1* [no problem]

      import cPickle
      from pyspark import SparkContext
      
      def check_pre_serialized(size):
          # Serialize the data ourselves, then broadcast the resulting string.
          msg = cPickle.dumps(range(2 ** size))
          print 'serialized length:', len(msg)
          bvar = sc.broadcast(msg)
          print 'length recovered from broadcast variable:', len(bvar.value)
          print 'correct value recovered:', msg == bvar.value
          bvar.unpersist()
      
      def check_unserialized(size):
          # Broadcast the raw list and let PySpark serialize it internally.
          msg = range(2 ** size)
          bvar = sc.broadcast(msg)
          print 'correct value recovered:', msg == bvar.value
          bvar.unpersist()
      
      SparkContext.setSystemProperty('spark.executor.memory', '15g')
      SparkContext.setSystemProperty('spark.cores.max', '5')
      sc = SparkContext('spark://crosby.research.intel-research.net:7077', 'broadcast_bug')
      

      *BLOCK 2* [no problem]

      check_pre_serialized(20)
      > serialized length: 9374656
      > length recovered from broadcast variable: 9374656
      > correct value recovered: True
      

      *BLOCK 3* [no problem]

      check_unserialized(20)
      > correct value recovered: True
      

      *BLOCK 4* [no problem]

      check_pre_serialized(27)
      > serialized length: 1499501632
      > length recovered from broadcast variable: 1499501632
      > correct value recovered: True
      

      *BLOCK 5* [no problem]

      check_unserialized(27)
      > correct value recovered: True
      

      *BLOCK 6* *[ERROR 1: unhandled error from cPickle.dumps inside sc.broadcast]*

      check_pre_serialized(28)
      .....
      > /home/spark/greatest/python/pyspark/serializers.py in dumps(self, obj)
      >     354
      >     355     def dumps(self, obj):
      > --> 356         return cPickle.dumps(obj, 2)
      >     357
      >     358     loads = cPickle.loads
      >
      > SystemError: error return without exception set
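
      The failure boundary lines up with the 2 GB limit of a signed 32-bit C int: the pickle of range(2**27) is about 1.5 GB (fine, per BLOCK 4), so the pickle of range(2**28) roughly doubles past 2**31 bytes, which is where cPickle's C internals start misbehaving. A quick arithmetic check (not part of the original report; the size-28 figure is an estimate by doubling):

      ```python
      # The observed serialized sizes bracket the signed 32-bit limit.
      INT_MAX = (1 << 31) - 1           # 2147483647, largest signed 32-bit C int

      size_27 = 1499501632              # serialized length of range(2**27), from BLOCK 4
      size_28_est = 2 * size_27         # payload roughly doubles for each size step

      print(size_27 < INT_MAX < size_28_est)  # prints True
      ```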
      

      *BLOCK 7* [no problem]

      check_unserialized(28)
      > correct value recovered: True
      

      *BLOCK 8* *[ERROR 2: no error occurs and an incorrect result is silently returned]*

      check_pre_serialized(29)
      > serialized length: 6331339840
      > length recovered from broadcast variable: 2036372544
      > correct value recovered: False
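
      A quick arithmetic check (not in the original report) suggests why this failure is silent: the length recovered from the broadcast variable equals the true serialized length reduced modulo 2**32, which points at an unsigned 32-bit length field overflowing somewhere in the serialization path rather than data being randomly corrupted.

      ```python
      # Hypothetical sanity check: the corrupted length reported by
      # check_pre_serialized(29) is exactly the true length truncated
      # to an unsigned 32-bit integer.
      true_len = 6331339840     # serialized length printed above
      recovered = 2036372544    # length recovered from the broadcast variable

      print(true_len % 2**32 == recovered)  # prints True
      ```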
      

      *BLOCK 9* *[ERROR 3: unhandled error from zlib.compress inside sc.broadcast]*

      check_unserialized(29)
      ......
      > /home/spark/greatest/python/pyspark/serializers.py in dumps(self, obj)
      >     418 
      >     419     def dumps(self, obj):
      > --> 420         return zlib.compress(self.serializer.dumps(obj), 1)
      >     421 
      >     422     def loads(self, obj):
      > 
      > OverflowError: size does not fit in an int
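
      This OverflowError arises because CPython 2's one-shot zlib.compress stores the input buffer length in a C int, so payloads of 2**31 bytes or more are rejected before any compression happens. One possible workaround (a sketch for illustration, not the fix that shipped in 1.2.0) is to stream the payload through a single zlib.compressobj in sub-2GB pieces:

      ```python
      import zlib

      CHUNK = 1 << 20  # feed 1 MiB at a time; any size below 2**31 is safe

      def compress_large(data, level=1):
          """Compress a bytes object of arbitrary size by feeding it to one
          compressobj in chunks, avoiding the C-int length limit that
          one-shot zlib.compress hits on buffers >= 2 GB."""
          comp = zlib.compressobj(level)
          out = []
          for i in range(0, len(data), CHUNK):
              out.append(comp.compress(data[i:i + CHUNK]))
          out.append(comp.flush())
          return b''.join(out)
      ```

      The result is a single valid zlib stream, so the existing loads path could decompress it unchanged with zlib.decompress.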
      

      *BLOCK 10* [ERROR 1]

      check_pre_serialized(30)
      ...same as above...
      

      *BLOCK 11* [ERROR 3]

      check_unserialized(30)
      ...same as above...
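
      All three errors are consistent with 32-bit size fields in the C implementations of cPickle and zlib under Python 2. A generic application-side mitigation (a hypothetical sketch; Spark's actual fix in 1.2.0 may differ) is to never hand any single buffer of 2 GB or more to those routines, for example by splitting the serialized payload and broadcasting the pieces:

      ```python
      # Hypothetical workaround: keep each buffer passed to pickle/zlib
      # below the signed 32-bit limit by splitting the payload.
      INT_MAX = (1 << 31) - 1  # largest value a signed 32-bit C int can hold

      def split_payload(msg, limit=INT_MAX):
          """Split a bytes payload into chunks of at most `limit` bytes each."""
          return [msg[i:i + limit] for i in range(0, len(msg), limit)]

      def join_payload(chunks):
          """Reassemble the original payload from its chunks."""
          return b''.join(chunks)
      ```

      Each chunk could then be broadcast separately (or as a list of chunks) and reassembled with join_payload on the executors.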
      

    People

    • Assignee: Davies Liu (davies)
    • Reporter: Brad Miller (bmiller1)
    • Votes: 0
    • Watchers: 7
