Uploaded image for project: 'Spark'
  1. Spark
  2. SPARK-21439

Cannot use Spark with Python ABCMeta (exception from cloudpickle)

    Details

    • Type: Bug
    • Status: Resolved
    • Priority: Major
    • Resolution: Duplicate
    • Affects Version/s: 2.1.1
    • Fix Version/s: 2.3.0
    • Component/s: PySpark, Spark Core
    • Labels:
      None

      Description

      I'm trying to run code that uses ABCMeta with PySpark.
      The following code raises an exception.

      from abc import ABCMeta, abstractmethod
      class A(metaclass=ABCMeta):
          @abstractmethod
          def x(self):
              """Abstract"""
              
      class B(A):
          def x(self):
              return 10
      
      b = B()
      
      sc.range(10).map(lambda x: b.x()).collect()
      

      Exception:

      ---------------------------------------------------------------------------
      AttributeError                            Traceback (most recent call last)
      /opt/spark/python/pyspark/cloudpickle.py in dump(self, obj)
          146         try:
      --> 147             return Pickler.dump(self, obj)
          148         except RuntimeError as e:
      
      /usr/lib/python3.4/pickle.py in dump(self, obj)
          409             self.framer.start_framing()
      --> 410         self.save(obj)
          411         self.write(STOP)
      
      /usr/lib/python3.4/pickle.py in save(self, obj, save_persistent_id)
          476         if f is not None:
      --> 477             f(self, obj) # Call unbound method with explicit self
          478             return
      
      /usr/lib/python3.4/pickle.py in save_tuple(self, obj)
          741         for element in obj:
      --> 742             save(element)
          743 
      
      /usr/lib/python3.4/pickle.py in save(self, obj, save_persistent_id)
          476         if f is not None:
      --> 477             f(self, obj) # Call unbound method with explicit self
          478             return
      
      /opt/spark/python/pyspark/cloudpickle.py in save_function(self, obj, name)
          253             if klass is None or klass is not obj:
      --> 254                 self.save_function_tuple(obj)
          255                 return
      
      /opt/spark/python/pyspark/cloudpickle.py in save_function_tuple(self, func)
          290         save(_make_skel_func)
      --> 291         save((code, closure, base_globals))
          292         write(pickle.REDUCE)
      
      /usr/lib/python3.4/pickle.py in save(self, obj, save_persistent_id)
          476         if f is not None:
      --> 477             f(self, obj) # Call unbound method with explicit self
          478             return
      
      /usr/lib/python3.4/pickle.py in save_tuple(self, obj)
          726             for element in obj:
      --> 727                 save(element)
          728             # Subtle.  Same as in the big comment below.
      
      /usr/lib/python3.4/pickle.py in save(self, obj, save_persistent_id)
          476         if f is not None:
      --> 477             f(self, obj) # Call unbound method with explicit self
          478             return
      
      /usr/lib/python3.4/pickle.py in save_list(self, obj)
          771         self.memoize(obj)
      --> 772         self._batch_appends(obj)
          773 
      
      /usr/lib/python3.4/pickle.py in _batch_appends(self, items)
          795                 for x in tmp:
      --> 796                     save(x)
          797                 write(APPENDS)
      
      /usr/lib/python3.4/pickle.py in save(self, obj, save_persistent_id)
          476         if f is not None:
      --> 477             f(self, obj) # Call unbound method with explicit self
          478             return
      
      /opt/spark/python/pyspark/cloudpickle.py in save_function(self, obj, name)
          253             if klass is None or klass is not obj:
      --> 254                 self.save_function_tuple(obj)
          255                 return
      
      /opt/spark/python/pyspark/cloudpickle.py in save_function_tuple(self, func)
          290         save(_make_skel_func)
      --> 291         save((code, closure, base_globals))
          292         write(pickle.REDUCE)
      
      /usr/lib/python3.4/pickle.py in save(self, obj, save_persistent_id)
          476         if f is not None:
      --> 477             f(self, obj) # Call unbound method with explicit self
          478             return
      
      /usr/lib/python3.4/pickle.py in save_tuple(self, obj)
          726             for element in obj:
      --> 727                 save(element)
          728             # Subtle.  Same as in the big comment below.
      
      /usr/lib/python3.4/pickle.py in save(self, obj, save_persistent_id)
          476         if f is not None:
      --> 477             f(self, obj) # Call unbound method with explicit self
          478             return
      
      /usr/lib/python3.4/pickle.py in save_list(self, obj)
          771         self.memoize(obj)
      --> 772         self._batch_appends(obj)
          773 
      
      /usr/lib/python3.4/pickle.py in _batch_appends(self, items)
          798             elif n:
      --> 799                 save(tmp[0])
          800                 write(APPEND)
      
      /usr/lib/python3.4/pickle.py in save(self, obj, save_persistent_id)
          476         if f is not None:
      --> 477             f(self, obj) # Call unbound method with explicit self
          478             return
      
      /opt/spark/python/pyspark/cloudpickle.py in save_function(self, obj, name)
          247             #print("save global", islambda(obj), obj.__code__.co_filename, modname, themodule)
      --> 248             self.save_function_tuple(obj)
          249             return
      
      /opt/spark/python/pyspark/cloudpickle.py in save_function_tuple(self, func)
          295         # save the rest of the func data needed by _fill_function
      --> 296         save(f_globals)
          297         save(defaults)
      
      /usr/lib/python3.4/pickle.py in save(self, obj, save_persistent_id)
          476         if f is not None:
      --> 477             f(self, obj) # Call unbound method with explicit self
          478             return
      
      /usr/lib/python3.4/pickle.py in save_dict(self, obj)
          811         self.memoize(obj)
      --> 812         self._batch_setitems(obj.items())
          813 
      
      /usr/lib/python3.4/pickle.py in _batch_setitems(self, items)
          842                 save(k)
      --> 843                 save(v)
          844                 write(SETITEM)
      
      /usr/lib/python3.4/pickle.py in save(self, obj, save_persistent_id)
          521         # Save the reduce() output and finally memoize the object
      --> 522         self.save_reduce(obj=obj, *rv)
          523 
      
      /opt/spark/python/pyspark/cloudpickle.py in save_reduce(self, func, args, state, listitems, dictitems, obj)
          565             args = args[1:]
      --> 566             save(cls)
          567 
      
      /usr/lib/python3.4/pickle.py in save(self, obj, save_persistent_id)
          490             if issc:
      --> 491                 self.save_global(obj)
          492                 return
      
      /opt/spark/python/pyspark/cloudpickle.py in save_global(self, obj, name, pack)
          415             self.save(_load_class)
      --> 416             self.save_reduce(typ, (obj.__name__, obj.__bases__, {"__doc__": obj.__doc__}), obj=obj)
          417             d.pop('__doc__', None)
      
      /opt/spark/python/pyspark/cloudpickle.py in save_reduce(self, func, args, state, listitems, dictitems, obj)
          580             save(func)
      --> 581             save(args)
          582             write(pickle.REDUCE)
      
      /usr/lib/python3.4/pickle.py in save(self, obj, save_persistent_id)
          476         if f is not None:
      --> 477             f(self, obj) # Call unbound method with explicit self
          478             return
      
      /usr/lib/python3.4/pickle.py in save_tuple(self, obj)
          726             for element in obj:
      --> 727                 save(element)
          728             # Subtle.  Same as in the big comment below.
      
      /usr/lib/python3.4/pickle.py in save(self, obj, save_persistent_id)
          476         if f is not None:
      --> 477             f(self, obj) # Call unbound method with explicit self
          478             return
      
      /usr/lib/python3.4/pickle.py in save_tuple(self, obj)
          726             for element in obj:
      --> 727                 save(element)
          728             # Subtle.  Same as in the big comment below.
      
      /usr/lib/python3.4/pickle.py in save(self, obj, save_persistent_id)
          490             if issc:
      --> 491                 self.save_global(obj)
          492                 return
      
      /opt/spark/python/pyspark/cloudpickle.py in save_global(self, obj, name, pack)
          430                 dd[k] = v
      --> 431             self.save(dd)
          432             self.write(pickle.TUPLE2)
      
      /usr/lib/python3.4/pickle.py in save(self, obj, save_persistent_id)
          476         if f is not None:
      --> 477             f(self, obj) # Call unbound method with explicit self
          478             return
      
      /usr/lib/python3.4/pickle.py in save_dict(self, obj)
          811         self.memoize(obj)
      --> 812         self._batch_setitems(obj.items())
          813 
      
      /usr/lib/python3.4/pickle.py in _batch_setitems(self, items)
          837                     save(k)
      --> 838                     save(v)
          839                 write(SETITEMS)
      
      /usr/lib/python3.4/pickle.py in save(self, obj, save_persistent_id)
          521         # Save the reduce() output and finally memoize the object
      --> 522         self.save_reduce(obj=obj, *rv)
          523 
      
      /opt/spark/python/pyspark/cloudpickle.py in save_reduce(self, func, args, state, listitems, dictitems, obj)
          598         if state is not None:
      --> 599             save(state)
          600             write(pickle.BUILD)
      
      /usr/lib/python3.4/pickle.py in save(self, obj, save_persistent_id)
          476         if f is not None:
      --> 477             f(self, obj) # Call unbound method with explicit self
          478             return
      
      /usr/lib/python3.4/pickle.py in save_dict(self, obj)
          811         self.memoize(obj)
      --> 812         self._batch_setitems(obj.items())
          813 
      
      /usr/lib/python3.4/pickle.py in _batch_setitems(self, items)
          837                     save(k)
      --> 838                     save(v)
          839                 write(SETITEMS)
      
      /usr/lib/python3.4/pickle.py in save(self, obj, save_persistent_id)
          476         if f is not None:
      --> 477             f(self, obj) # Call unbound method with explicit self
          478             return
      
      /opt/spark/python/pyspark/cloudpickle.py in save_function(self, obj, name)
          253             if klass is None or klass is not obj:
      --> 254                 self.save_function_tuple(obj)
          255                 return
      
      /opt/spark/python/pyspark/cloudpickle.py in save_function_tuple(self, func)
          296         save(f_globals)
      --> 297         save(defaults)
          298         save(dct)
      
      /usr/lib/python3.4/pickle.py in save(self, obj, save_persistent_id)
          476         if f is not None:
      --> 477             f(self, obj) # Call unbound method with explicit self
          478             return
      
      /usr/lib/python3.4/pickle.py in save_tuple(self, obj)
          726             for element in obj:
      --> 727                 save(element)
          728             # Subtle.  Same as in the big comment below.
      
      /usr/lib/python3.4/pickle.py in save(self, obj, save_persistent_id)
          521         # Save the reduce() output and finally memoize the object
      --> 522         self.save_reduce(obj=obj, *rv)
          523 
      
      /opt/spark/python/pyspark/cloudpickle.py in save_reduce(self, func, args, state, listitems, dictitems, obj)
          565             args = args[1:]
      --> 566             save(cls)
          567 
      
      /usr/lib/python3.4/pickle.py in save(self, obj, save_persistent_id)
          476         if f is not None:
      --> 477             f(self, obj) # Call unbound method with explicit self
          478             return
      
      /opt/spark/python/pyspark/cloudpickle.py in save_global(self, obj, name, pack)
          430                 dd[k] = v
      --> 431             self.save(dd)
          432             self.write(pickle.TUPLE2)
      
      /usr/lib/python3.4/pickle.py in save(self, obj, save_persistent_id)
          476         if f is not None:
      --> 477             f(self, obj) # Call unbound method with explicit self
          478             return
      
      /usr/lib/python3.4/pickle.py in save_dict(self, obj)
          811         self.memoize(obj)
      --> 812         self._batch_setitems(obj.items())
          813 
      
      /usr/lib/python3.4/pickle.py in _batch_setitems(self, items)
          837                     save(k)
      --> 838                     save(v)
          839                 write(SETITEMS)
      
      /usr/lib/python3.4/pickle.py in save(self, obj, save_persistent_id)
          476         if f is not None:
      --> 477             f(self, obj) # Call unbound method with explicit self
          478             return
      
      /opt/spark/python/pyspark/cloudpickle.py in save_builtin_function(self, obj)
          366             return self.save_global(obj)
      --> 367         return self.save_function(obj)
          368     dispatch[types.BuiltinFunctionType] = save_builtin_function
      
      /opt/spark/python/pyspark/cloudpickle.py in save_function(self, obj, name)
          245         # reference (as is done in default pickler), via save_function_tuple.
      --> 246         if islambda(obj) or obj.__code__.co_filename == '<stdin>' or themodule is None:
          247             #print("save global", islambda(obj), obj.__code__.co_filename, modname, themodule)
      
      AttributeError: 'builtin_function_or_method' object has no attribute '__code__'
      
      During handling of the above exception, another exception occurred:
      
      AttributeError                            Traceback (most recent call last)
      <ipython-input-8-9ea6e84ab4cc> in <module>()
      ----> 1 sc.range(10).map(lambda x: b.x()).collect()
      
      /opt/spark/python/pyspark/rdd.py in collect(self)
          806         """
          807         with SCCallSiteSync(self.context) as css:
      --> 808             port = self.ctx._jvm.PythonRDD.collectAndServe(self._jrdd.rdd())
          809         return list(_load_from_socket(port, self._jrdd_deserializer))
          810 
      
      /opt/spark/python/pyspark/rdd.py in _jrdd(self)
         2438 
         2439         wrapped_func = _wrap_function(self.ctx, self.func, self._prev_jrdd_deserializer,
      -> 2440                                       self._jrdd_deserializer, profiler)
         2441         python_rdd = self.ctx._jvm.PythonRDD(self._prev_jrdd.rdd(), wrapped_func,
         2442                                              self.preservesPartitioning)
      
      /opt/spark/python/pyspark/rdd.py in _wrap_function(sc, func, deserializer, serializer, profiler)
         2371     assert serializer, "serializer should not be empty"
         2372     command = (func, profiler, deserializer, serializer)
      -> 2373     pickled_command, broadcast_vars, env, includes = _prepare_for_python_RDD(sc, command)
         2374     return sc._jvm.PythonFunction(bytearray(pickled_command), env, includes, sc.pythonExec,
         2375                                   sc.pythonVer, broadcast_vars, sc._javaAccumulator)
      
      /opt/spark/python/pyspark/rdd.py in _prepare_for_python_RDD(sc, command)
         2357     # the serialized command will be compressed by broadcast
         2358     ser = CloudPickleSerializer()
      -> 2359     pickled_command = ser.dumps(command)
         2360     if len(pickled_command) > (1 << 20):  # 1M
         2361         # The broadcast will have same life cycle as created PythonRDD
      
      /opt/spark/python/pyspark/serializers.py in dumps(self, obj)
          458 
          459     def dumps(self, obj):
      --> 460         return cloudpickle.dumps(obj, 2)
          461 
          462 
      
      /opt/spark/python/pyspark/cloudpickle.py in dumps(obj, protocol)
          701 
          702     cp = CloudPickler(file,protocol)
      --> 703     cp.dump(obj)
          704 
          705     return file.getvalue()
      
      /opt/spark/python/pyspark/cloudpickle.py in dump(self, obj)
          153             raise
          154         except Exception as e:
      --> 155             if "'i' format requires" in e.message:
          156                 msg = "Object too large to serialize: " + e.message
          157             else:
      
      AttributeError: 'AttributeError' object has no attribute 'message'
      

        Attachments

          Activity

            People

            • Assignee:
              Unassigned
              Reporter:
              maver1ck Maciej Bryński
            • Votes:
              1 Vote for this issue
              Watchers:
              5 Start watching this issue

              Dates

              • Created:
                Updated:
                Resolved: