Spark / SPARK-10542

The PySpark 1.5 closure serializer can't serialize a namedtuple instance.


Details

    • Type: Bug
    • Status: Resolved
    • Priority: Critical
    • Resolution: Fixed
    • Affects Version/s: 1.5.0
    • Fix Version/s: 1.5.1, 1.6.0
    • Component/s: PySpark
    • Labels: None

    Description

      Code to Reproduce Bug:

      from collections import namedtuple
      PowerPlantRow=namedtuple("PowerPlantRow", ["AT", "V", "AP", "RH", "PE"])
      rdd=sc.parallelize([1]).map(lambda x: PowerPlantRow(1.0, 2.0, 3.0, 4.0, 5.0))
      rdd.count()
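      The snippet above assumes the interactive PySpark shell, where sc is already defined. A standalone-script version of the same reproduction (the explicit SparkContext setup and app name below are illustrative additions, not part of the original report) would look roughly like this:

      from collections import namedtuple
      from pyspark import SparkContext

      # The shell normally provides sc; a script has to create one explicitly.
      sc = SparkContext("local[2]", "namedtuple-repro")

      PowerPlantRow = namedtuple("PowerPlantRow", ["AT", "V", "AP", "RH", "PE"])

      # The lambda closes over the namedtuple class, so the closure serializer
      # must pickle PowerPlantRow itself; on 1.5.0 this raises the error below.
      rdd = sc.parallelize([1]).map(lambda x: PowerPlantRow(1.0, 2.0, 3.0, 4.0, 5.0))
      print(rdd.count())

      sc.stop()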
      

      Error message on Spark 1.5:

      AttributeError: 'builtin_function_or_method' object has no attribute '__code__'
      ---------------------------------------------------------------------------
      AttributeError                            Traceback (most recent call last)
      <ipython-input-5-59448e31019f> in <module>()
            2 PowerPlantRow=namedtuple("PowerPlantRow", ["AT", "V", "AP", "RH", "PE"])
            3 rdd=sc.parallelize([1]).map(lambda x: PowerPlantRow(1.0, 2.0, 3.0, 4.0, 5.0))
      ----> 4 rdd.count()
      
      /home/ubuntu/databricks/spark/python/pyspark/rdd.pyc in count(self)
         1004         3
         1005         """
      -> 1006         return self.mapPartitions(lambda i: [sum(1 for _ in i)]).sum()
         1007 
         1008     def stats(self):
      
      /home/ubuntu/databricks/spark/python/pyspark/rdd.pyc in sum(self)
          995         6.0
          996         """
      --> 997         return self.mapPartitions(lambda x: [sum(x)]).fold(0, operator.add)
          998 
          999     def count(self):
      
      /home/ubuntu/databricks/spark/python/pyspark/rdd.pyc in fold(self, zeroValue, op)
          869         # zeroValue provided to each partition is unique from the one provided
          870         # to the final reduce call
      --> 871         vals = self.mapPartitions(func).collect()
          872         return reduce(op, vals, zeroValue)
          873 
      
      /home/ubuntu/databricks/spark/python/pyspark/rdd.pyc in collect(self)
          771         """
          772         with SCCallSiteSync(self.context) as css:
      --> 773             port = self.ctx._jvm.PythonRDD.collectAndServe(self._jrdd.rdd())
          774         return list(_load_from_socket(port, self._jrdd_deserializer))
          775 
      
      /home/ubuntu/databricks/spark/python/pyspark/rdd.pyc in _jrdd(self)
         2383         command = (self.func, profiler, self._prev_jrdd_deserializer,
         2384                    self._jrdd_deserializer)
      -> 2385         pickled_cmd, bvars, env, includes = _prepare_for_python_RDD(self.ctx, command, self)
         2386         python_rdd = self.ctx._jvm.PythonRDD(self._prev_jrdd.rdd(),
         2387                                              bytearray(pickled_cmd),
      
      /home/ubuntu/databricks/spark/python/pyspark/rdd.pyc in _prepare_for_python_RDD(sc, command, obj)
         2303     # the serialized command will be compressed by broadcast
         2304     ser = CloudPickleSerializer()
      -> 2305     pickled_command = ser.dumps(command)
         2306     if len(pickled_command) > (1 << 20):  # 1M
         2307         # The broadcast will have same life cycle as created PythonRDD
      
      /home/ubuntu/databricks/spark/python/pyspark/serializers.pyc in dumps(self, obj)
          425 
          426     def dumps(self, obj):
      --> 427         return cloudpickle.dumps(obj, 2)
          428 
          429 
      
      /home/ubuntu/databricks/spark/python/pyspark/cloudpickle.pyc in dumps(obj, protocol)
          639 
          640     cp = CloudPickler(file,protocol)
      --> 641     cp.dump(obj)
          642 
          643     return file.getvalue()
      
      /home/ubuntu/databricks/spark/python/pyspark/cloudpickle.pyc in dump(self, obj)
          105         self.inject_addons()
          106         try:
      --> 107             return Pickler.dump(self, obj)
          108         except RuntimeError as e:
          109             if 'recursion' in e.args[0]:
      
      /usr/lib/python2.7/pickle.pyc in dump(self, obj)
          222         if self.proto >= 2:
          223             self.write(PROTO + chr(self.proto))
      --> 224         self.save(obj)
          225         self.write(STOP)
          226 
      
      /usr/lib/python2.7/pickle.pyc in save(self, obj)
          284         f = self.dispatch.get(t)
          285         if f:
      --> 286             f(self, obj) # Call unbound method with explicit self
          287             return
          288 
      
      /usr/lib/python2.7/pickle.pyc in save_tuple(self, obj)
          560         write(MARK)
          561         for element in obj:
      --> 562             save(element)
          563 
          564         if id(obj) in memo:
      
      /usr/lib/python2.7/pickle.pyc in save(self, obj)
          284         f = self.dispatch.get(t)
          285         if f:
      --> 286             f(self, obj) # Call unbound method with explicit self
          287             return
          288 
      ... skipped 23125 bytes ...
          650 
          651     dispatch[DictionaryType] = save_dict
      
      /usr/lib/python2.7/pickle.pyc in _batch_setitems(self, items)
          684                 k, v = tmp[0]
          685                 save(k)
      --> 686                 save(v)
          687                 write(SETITEM)
          688             # else tmp is empty, and we're done
      
      /usr/lib/python2.7/pickle.pyc in save(self, obj)
          284         f = self.dispatch.get(t)
          285         if f:
      --> 286             f(self, obj) # Call unbound method with explicit self
          287             return
          288 
      
      /home/ubuntu/databricks/spark/python/pyspark/cloudpickle.pyc in save_global(self, obj, name, pack)
          367                     v = v.__func__
          368                 dd[k] = v
      --> 369             self.save(dd)
          370             self.write(pickle.TUPLE2)
          371             self.write(pickle.REDUCE)
      
      /usr/lib/python2.7/pickle.pyc in save(self, obj)
          284         f = self.dispatch.get(t)
          285         if f:
      --> 286             f(self, obj) # Call unbound method with explicit self
          287             return
          288 
      
      /usr/lib/python2.7/pickle.pyc in save_dict(self, obj)
          647 
          648         self.memoize(obj)
      --> 649         self._batch_setitems(obj.iteritems())
          650 
          651     dispatch[DictionaryType] = save_dict
      
      /usr/lib/python2.7/pickle.pyc in _batch_setitems(self, items)
          679                 for k, v in tmp:
          680                     save(k)
      --> 681                     save(v)
          682                 write(SETITEMS)
          683             elif n:
      
      /usr/lib/python2.7/pickle.pyc in save(self, obj)
          284         f = self.dispatch.get(t)
          285         if f:
      --> 286             f(self, obj) # Call unbound method with explicit self
          287             return
          288 
      
      /home/ubuntu/databricks/spark/python/pyspark/cloudpickle.pyc in save_function(self, obj, name)
          191         if islambda(obj) or obj.__code__.co_filename == '<stdin>' or themodule is None:
          192             #print("save global", islambda(obj), obj.__code__.co_filename, modname, themodule)
      --> 193             self.save_function_tuple(obj)
          194             return
          195         else:
      
      /home/ubuntu/databricks/spark/python/pyspark/cloudpickle.pyc in save_function_tuple(self, func)
          240         # save the rest of the func data needed by _fill_function
          241         save(f_globals)
      --> 242         save(defaults)
          243         save(dct)
          244         write(pickle.TUPLE)
      
      /usr/lib/python2.7/pickle.pyc in save(self, obj)
          284         f = self.dispatch.get(t)
          285         if f:
      --> 286             f(self, obj) # Call unbound method with explicit self
          287             return
          288 
      
      /usr/lib/python2.7/pickle.pyc in save_tuple(self, obj)
          546         if n <= 3 and proto >= 2:
          547             for element in obj:
      --> 548                 save(element)
          549             # Subtle.  Same as in the big comment below.
          550             if id(obj) in memo:
      
      /usr/lib/python2.7/pickle.pyc in save(self, obj)
          284         f = self.dispatch.get(t)
          285         if f:
      --> 286             f(self, obj) # Call unbound method with explicit self
          287             return
          288 
      
      /home/ubuntu/databricks/spark/python/pyspark/cloudpickle.pyc in save_builtin_function(self, obj)
          313         if obj.__module__ is "__builtin__":
          314             return self.save_global(obj)
      --> 315         return self.save_function(obj)
          316     dispatch[types.BuiltinFunctionType] = save_builtin_function
          317 
      
      /home/ubuntu/databricks/spark/python/pyspark/cloudpickle.pyc in save_function(self, obj, name)
          189         # we'll pickle the actual function object rather than simply saving a
          190         # reference (as is done in default pickler), via save_function_tuple.
      --> 191         if islambda(obj) or obj.__code__.co_filename == '<stdin>' or themodule is None:
          192             #print("save global", islambda(obj), obj.__code__.co_filename, modname, themodule)
          193             self.save_function_tuple(obj)
      
      AttributeError: 'builtin_function_or_method' object has no attribute '__code__'
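      Judging from the traceback, the failure appears to happen while cloudpickle serializes the namedtuple class captured by the lambda: the class dict contains generated methods (such as _make) whose default arguments are builtins, and the 1.5.0 save_function path assumes every function has a __code__ attribute, which builtin_function_or_method objects lack. A possible workaround until the fix is available (a hedged suggestion, not part of the original report) is to avoid capturing the namedtuple class in the closure at all:

      from collections import namedtuple

      PowerPlantRow = namedtuple("PowerPlantRow", ["AT", "V", "AP", "RH", "PE"])

      # Produce plain tuples on the executors so the shipped closure never
      # references the namedtuple class...
      plain = sc.parallelize([1]).map(lambda x: (1.0, 2.0, 3.0, 4.0, 5.0))

      # ...and rebuild namedtuples only on the driver, after collect().
      rows = [PowerPlantRow(*t) for t in plain.collect()]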
      


People

    Assignee: Davies Liu
    Reporter: Davies Liu
    Votes: 0
    Watchers: 3