Description
Code to Reproduce Bug:
from collections import namedtuple

PowerPlantRow = namedtuple("PowerPlantRow", ["AT", "V", "AP", "RH", "PE"])
rdd = sc.parallelize([1]).map(lambda x: PowerPlantRow(1.0, 2.0, 3.0, 4.0, 5.0))
rdd.count()
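The count() action is only the trigger: the failure happens while the closure is being pickled, before any job runs. Assuming the same Spark 1.5 build, the serializer shown in the traceback below can presumably be invoked directly, with no cluster involved (a minimal sketch, not part of the original report):

# Sketch: call PySpark's closure serializer by hand on the failing lambda.
# CloudPickleSerializer and its dumps() method are the ones visible in the
# traceback below; this should raise the same AttributeError at pickle time.
from collections import namedtuple
from pyspark.serializers import CloudPickleSerializer

PowerPlantRow = namedtuple("PowerPlantRow", ["AT", "V", "AP", "RH", "PE"])
CloudPickleSerializer().dumps(lambda x: PowerPlantRow(1.0, 2.0, 3.0, 4.0, 5.0))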
Error message on Spark 1.5:
AttributeError: 'builtin_function_or_method' object has no attribute '__code__'

---------------------------------------------------------------------------
AttributeError                            Traceback (most recent call last)
<ipython-input-5-59448e31019f> in <module>()
      2 PowerPlantRow=namedtuple("PowerPlantRow", ["AT", "V", "AP", "RH", "PE"])
      3 rdd=sc.parallelize([1]).map(lambda x: PowerPlantRow(1.0, 2.0, 3.0, 4.0, 5.0))
----> 4 rdd.count()

/home/ubuntu/databricks/spark/python/pyspark/rdd.pyc in count(self)
   1004         3
   1005         """
-> 1006         return self.mapPartitions(lambda i: [sum(1 for _ in i)]).sum()
   1007
   1008     def stats(self):

/home/ubuntu/databricks/spark/python/pyspark/rdd.pyc in sum(self)
    995         6.0
    996         """
--> 997         return self.mapPartitions(lambda x: [sum(x)]).fold(0, operator.add)
    998
    999     def count(self):

/home/ubuntu/databricks/spark/python/pyspark/rdd.pyc in fold(self, zeroValue, op)
    869         # zeroValue provided to each partition is unique from the one provided
    870         # to the final reduce call
--> 871         vals = self.mapPartitions(func).collect()
    872         return reduce(op, vals, zeroValue)
    873

/home/ubuntu/databricks/spark/python/pyspark/rdd.pyc in collect(self)
    771         """
    772         with SCCallSiteSync(self.context) as css:
--> 773             port = self.ctx._jvm.PythonRDD.collectAndServe(self._jrdd.rdd())
    774         return list(_load_from_socket(port, self._jrdd_deserializer))
    775

/home/ubuntu/databricks/spark/python/pyspark/rdd.pyc in _jrdd(self)
   2383         command = (self.func, profiler, self._prev_jrdd_deserializer,
   2384                    self._jrdd_deserializer)
-> 2385         pickled_cmd, bvars, env, includes = _prepare_for_python_RDD(self.ctx, command, self)
   2386         python_rdd = self.ctx._jvm.PythonRDD(self._prev_jrdd.rdd(),
   2387                                              bytearray(pickled_cmd),

/home/ubuntu/databricks/spark/python/pyspark/rdd.pyc in _prepare_for_python_RDD(sc, command, obj)
   2303     # the serialized command will be compressed by broadcast
   2304     ser = CloudPickleSerializer()
-> 2305     pickled_command = ser.dumps(command)
   2306     if len(pickled_command) > (1 << 20):  # 1M
   2307         # The broadcast will have same life cycle as created PythonRDD

/home/ubuntu/databricks/spark/python/pyspark/serializers.pyc in dumps(self, obj)
    425
    426     def dumps(self, obj):
--> 427         return cloudpickle.dumps(obj, 2)
    428
    429

/home/ubuntu/databricks/spark/python/pyspark/cloudpickle.pyc in dumps(obj, protocol)
    639
    640     cp = CloudPickler(file,protocol)
--> 641     cp.dump(obj)
    642
    643     return file.getvalue()

/home/ubuntu/databricks/spark/python/pyspark/cloudpickle.pyc in dump(self, obj)
    105         self.inject_addons()
    106         try:
--> 107             return Pickler.dump(self, obj)
    108         except RuntimeError as e:
    109             if 'recursion' in e.args[0]:

/usr/lib/python2.7/pickle.pyc in dump(self, obj)
    222         if self.proto >= 2:
    223             self.write(PROTO + chr(self.proto))
--> 224         self.save(obj)
    225         self.write(STOP)
    226

/usr/lib/python2.7/pickle.pyc in save(self, obj)
    284         f = self.dispatch.get(t)
    285         if f:
--> 286             f(self, obj) # Call unbound method with explicit self
    287             return
    288

/usr/lib/python2.7/pickle.pyc in save_tuple(self, obj)
    560         write(MARK)
    561         for element in obj:
--> 562             save(element)
    563
    564         if id(obj) in memo:

/usr/lib/python2.7/pickle.pyc in save(self, obj)
    284         f = self.dispatch.get(t)
    285         if f:
--> 286             f(self, obj) # Call unbound method with explicit self
    287             return
    288

... skipped 23125 bytes ...

    650
    651     dispatch[DictionaryType] = save_dict

/usr/lib/python2.7/pickle.pyc in _batch_setitems(self, items)
    684                 k, v = tmp[0]
    685                 save(k)
--> 686                 save(v)
    687                 write(SETITEM)
    688             # else tmp is empty, and we're done

/usr/lib/python2.7/pickle.pyc in save(self, obj)
    284         f = self.dispatch.get(t)
    285         if f:
--> 286             f(self, obj) # Call unbound method with explicit self
    287             return
    288

/home/ubuntu/databricks/spark/python/pyspark/cloudpickle.pyc in save_global(self, obj, name, pack)
    367                 v = v.__func__
    368             dd[k] = v
--> 369         self.save(dd)
    370         self.write(pickle.TUPLE2)
    371         self.write(pickle.REDUCE)

/usr/lib/python2.7/pickle.pyc in save(self, obj)
    284         f = self.dispatch.get(t)
    285         if f:
--> 286             f(self, obj) # Call unbound method with explicit self
    287             return
    288

/usr/lib/python2.7/pickle.pyc in save_dict(self, obj)
    647
    648         self.memoize(obj)
--> 649         self._batch_setitems(obj.iteritems())
    650
    651     dispatch[DictionaryType] = save_dict

/usr/lib/python2.7/pickle.pyc in _batch_setitems(self, items)
    679                 for k, v in tmp:
    680                     save(k)
--> 681                     save(v)
    682                 write(SETITEMS)
    683             elif n:

/usr/lib/python2.7/pickle.pyc in save(self, obj)
    284         f = self.dispatch.get(t)
    285         if f:
--> 286             f(self, obj) # Call unbound method with explicit self
    287             return
    288

/home/ubuntu/databricks/spark/python/pyspark/cloudpickle.pyc in save_function(self, obj, name)
    191         if islambda(obj) or obj.__code__.co_filename == '<stdin>' or themodule is None:
    192             #print("save global", islambda(obj), obj.__code__.co_filename, modname, themodule)
--> 193             self.save_function_tuple(obj)
    194             return
    195         else:

/home/ubuntu/databricks/spark/python/pyspark/cloudpickle.pyc in save_function_tuple(self, func)
    240         # save the rest of the func data needed by _fill_function
    241         save(f_globals)
--> 242         save(defaults)
    243         save(dct)
    244         write(pickle.TUPLE)

/usr/lib/python2.7/pickle.pyc in save(self, obj)
    284         f = self.dispatch.get(t)
    285         if f:
--> 286             f(self, obj) # Call unbound method with explicit self
    287             return
    288

/usr/lib/python2.7/pickle.pyc in save_tuple(self, obj)
    546         if n <= 3 and proto >= 2:
    547             for element in obj:
--> 548                 save(element)
    549             # Subtle. Same as in the big comment below.
    550             if id(obj) in memo:

/usr/lib/python2.7/pickle.pyc in save(self, obj)
    284         f = self.dispatch.get(t)
    285         if f:
--> 286             f(self, obj) # Call unbound method with explicit self
    287             return
    288

/home/ubuntu/databricks/spark/python/pyspark/cloudpickle.pyc in save_builtin_function(self, obj)
    313         if obj.__module__ is "__builtin__":
    314             return self.save_global(obj)
--> 315         return self.save_function(obj)
    316     dispatch[types.BuiltinFunctionType] = save_builtin_function
    317

/home/ubuntu/databricks/spark/python/pyspark/cloudpickle.pyc in save_function(self, obj, name)
    189         # we'll pickle the actual function object rather than simply saving a
    190         # reference (as is done in default pickler), via save_function_tuple.
--> 191         if islambda(obj) or obj.__code__.co_filename == '<stdin>' or themodule is None:
    192             #print("save global", islambda(obj), obj.__code__.co_filename, modname, themodule)
    193             self.save_function_tuple(obj)

AttributeError: 'builtin_function_or_method' object has no attribute '__code__'
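The last two frames show the root cause: save_builtin_function falls through to save_function, which unconditionally reads obj.__code__, and builtin (C-implemented) functions never carry a code object. That much can be confirmed in plain CPython, with no Spark involved (a minimal sketch):

# Regular Python functions expose a code object; C builtins do not.
def regular(x):
    return x

print(hasattr(regular, "__code__"))  # True
print(hasattr(len, "__code__"))      # False: len is implemented in C

try:
    len.__code__
except AttributeError as e:
    print(e)  # 'builtin_function_or_method' object has no attribute '__code__'

One plausible defensive check would be getattr(obj, "__code__", None) before that branch, though the underlying namedtuple serialization problem is the one tracked as SPARK-10544 below.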
Issue Links
- duplicates SPARK-10544: Serialization of Python namedtuple subclasses in functions / closures is broken (Closed)