SPARK-1687

Support NamedTuples in RDDs

Details

    • Type: Improvement
    • Status: Resolved
    • Priority: Major
    • Resolution: Fixed
    • Affects Version/s: 1.0.0
    • Fix Version/s: 1.1.0
    • Component/s: PySpark
    • Labels: None
    • Environment: Spark version 1.0.0-SNAPSHOT, Python 2.7.5

    Description

      Add support for namedtuples in RDDs. Sample code is below, followed by the error it currently produces.

      Based on a quick conversation with ahirreddy, Dill might be a good solution here.

      In [26]: from collections import namedtuple
      ...
      In [33]: Person = namedtuple('Person', 'id firstName lastName')
      
      In [34]: jon = Person(1, "Jon", "Doe")
      
      In [35]: jane = Person(2, "Jane", "Doe")
      
      In [36]: theDoes = sc.parallelize((jon, jane))
      
      In [37]: theDoes.collect()
      Out[37]: 
      [Person(id=1, firstName='Jon', lastName='Doe'),
       Person(id=2, firstName='Jane', lastName='Doe')]
      
      In [38]: theDoes.count()
      PySpark worker failed with exception:
      PySpark worker failed with exception:
      Traceback (most recent call last):
        File "/Users/pat/Projects/spark/python/pyspark/worker.py", line 77, in main
          serializer.dump_stream(func(split_index, iterator), outfile)
        File "/Users/pat/Projects/spark/python/pyspark/rdd.py", line 1373, in pipeline_func
          return func(split, prev_func(split, iterator))
        File "/Users/pat/Projects/spark/python/pyspark/rdd.py", line 1373, in pipeline_func
          return func(split, prev_func(split, iterator))
        File "/Users/pat/Projects/spark/python/pyspark/rdd.py", line 283, in func
          def func(s, iterator): return f(iterator)
        File "/Users/pat/Projects/spark/python/pyspark/rdd.py", line 708, in <lambda>
          return self.mapPartitions(lambda i: [sum(1 for _ in i)]).sum()
        File "/Users/pat/Projects/spark/python/pyspark/rdd.py", line 708, in <genexpr>
          return self.mapPartitions(lambda i: [sum(1 for _ in i)]).sum()
        File "/Users/pat/Projects/spark/python/pyspark/serializers.py", line 129, in load_stream
          yield self._read_with_length(stream)
        File "/Users/pat/Projects/spark/python/pyspark/serializers.py", line 146, in _read_with_length
          return self.loads(obj)
      AttributeError: 'module' object has no attribute 'Person'
      
      Traceback (most recent call last):
        File "/Users/pat/Projects/spark/python/pyspark/worker.py", line 77, in main
          serializer.dump_stream(func(split_index, iterator), outfile)
        File "/Users/pat/Projects/spark/python/pyspark/rdd.py", line 1373, in pipeline_func
          return func(split, prev_func(split, iterator))
        File "/Users/pat/Projects/spark/python/pyspark/rdd.py", line 1373, in pipeline_func
          return func(split, prev_func(split, iterator))
        File "/Users/pat/Projects/spark/python/pyspark/rdd.py", line 283, in func
          def func(s, iterator): return f(iterator)
        File "/Users/pat/Projects/spark/python/pyspark/rdd.py", line 708, in <lambda>
          return self.mapPartitions(lambda i: [sum(1 for _ in i)]).sum()
        File "/Users/pat/Projects/spark/python/pyspark/rdd.py", line 708, in <genexpr>
          return self.mapPartitions(lambda i: [sum(1 for _ in i)]).sum()
        File "/Users/pat/Projects/spark/python/pyspark/serializers.py", line 129, in load_stream
          yield self._read_with_length(stream)
        File "/Users/pat/Projects/spark/python/pyspark/serializers.py", line 146, in _read_with_length
          return self.loads(obj)
      AttributeError: 'module' object has no attribute 'Person'
      
      14/04/30 14:43:53 ERROR Executor: Exception in task ID 23
      org.apache.spark.api.python.PythonException: Traceback (most recent call last):
        File "/Users/pat/Projects/spark/python/pyspark/worker.py", line 77, in main
          serializer.dump_stream(func(split_index, iterator), outfile)
        File "/Users/pat/Projects/spark/python/pyspark/rdd.py", line 1373, in pipeline_func
          return func(split, prev_func(split, iterator))
        File "/Users/pat/Projects/spark/python/pyspark/rdd.py", line 1373, in pipeline_func
          return func(split, prev_func(split, iterator))
        File "/Users/pat/Projects/spark/python/pyspark/rdd.py", line 283, in func
          def func(s, iterator): return f(iterator)
        File "/Users/pat/Projects/spark/python/pyspark/rdd.py", line 708, in <lambda>
          return self.mapPartitions(lambda i: [sum(1 for _ in i)]).sum()
        File "/Users/pat/Projects/spark/python/pyspark/rdd.py", line 708, in <genexpr>
          return self.mapPartitions(lambda i: [sum(1 for _ in i)]).sum()
        File "/Users/pat/Projects/spark/python/pyspark/serializers.py", line 129, in load_stream
          yield self._read_with_length(stream)
        File "/Users/pat/Projects/spark/python/pyspark/serializers.py", line 146, in _read_with_length
          return self.loads(obj)
      AttributeError: 'module' object has no attribute 'Person'
      
      	at org.apache.spark.api.python.PythonRDD$$anon$1.read(PythonRDD.scala:190)
      	at org.apache.spark.api.python.PythonRDD$$anon$1.<init>(PythonRDD.scala:214)
      	at org.apache.spark.api.python.PythonRDD.compute(PythonRDD.scala:151)
      	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:262)
      	at org.apache.spark.rdd.RDD.iterator(RDD.scala:229)
      	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:111)
      	at org.apache.spark.scheduler.Task.run(Task.scala:51)
      	at org.apache.spark.executor.Executor$TaskRunner$$anonfun$run$1.apply$mcV$sp(Executor.scala:210)
      	at org.apache.spark.deploy.SparkHadoopUtil.runAsUser(SparkHadoopUtil.scala:46)
      	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:175)
      	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
      	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
      	at java.lang.Thread.run(Thread.java:744)
      14/04/30 14:43:53 ERROR Executor: Exception in task ID 21
      org.apache.spark.api.python.PythonException: Traceback (most recent call last):
        File "/Users/pat/Projects/spark/python/pyspark/worker.py", line 77, in main
          serializer.dump_stream(func(split_index, iterator), outfile)
        File "/Users/pat/Projects/spark/python/pyspark/rdd.py", line 1373, in pipeline_func
          return func(split, prev_func(split, iterator))
        File "/Users/pat/Projects/spark/python/pyspark/rdd.py", line 1373, in pipeline_func
          return func(split, prev_func(split, iterator))
        File "/Users/pat/Projects/spark/python/pyspark/rdd.py", line 283, in func
          def func(s, iterator): return f(iterator)
        File "/Users/pat/Projects/spark/python/pyspark/rdd.py", line 708, in <lambda>
          return self.mapPartitions(lambda i: [sum(1 for _ in i)]).sum()
        File "/Users/pat/Projects/spark/python/pyspark/rdd.py", line 708, in <genexpr>
          return self.mapPartitions(lambda i: [sum(1 for _ in i)]).sum()
        File "/Users/pat/Projects/spark/python/pyspark/serializers.py", line 129, in load_stream
          yield self._read_with_length(stream)
        File "/Users/pat/Projects/spark/python/pyspark/serializers.py", line 146, in _read_with_length
          return self.loads(obj)
      AttributeError: 'module' object has no attribute 'Person'
      
      	at org.apache.spark.api.python.PythonRDD$$anon$1.read(PythonRDD.scala:190)
      	at org.apache.spark.api.python.PythonRDD$$anon$1.<init>(PythonRDD.scala:214)
      	at org.apache.spark.api.python.PythonRDD.compute(PythonRDD.scala:151)
      	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:262)
      	at org.apache.spark.rdd.RDD.iterator(RDD.scala:229)
      	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:111)
      	at org.apache.spark.scheduler.Task.run(Task.scala:51)
      	at org.apache.spark.executor.Executor$TaskRunner$$anonfun$run$1.apply$mcV$sp(Executor.scala:210)
      	at org.apache.spark.deploy.SparkHadoopUtil.runAsUser(SparkHadoopUtil.scala:46)
      	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:175)
      	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
      	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
      	at java.lang.Thread.run(Thread.java:744)
      14/04/30 14:43:53 ERROR TaskSetManager: Task 5.0:3 failed 1 times; aborting job
      ---------------------------------------------------------------------------
      Py4JJavaError                             Traceback (most recent call last)
      <ipython-input-38-8689b264fa46> in <module>()
      ----> 1 theDoes.count()
      
      /Users/pat/Projects/spark/python/pyspark/rdd.pyc in count(self)
          706         3
          707         """
      --> 708         return self.mapPartitions(lambda i: [sum(1 for _ in i)]).sum()
          709 
          710     def stats(self):
      
      /Users/pat/Projects/spark/python/pyspark/rdd.pyc in sum(self)
          697         6.0
          698         """
      --> 699         return self.mapPartitions(lambda x: [sum(x)]).reduce(operator.add)
          700 
          701     def count(self):
      
      /Users/pat/Projects/spark/python/pyspark/rdd.pyc in reduce(self, f)
          617             if acc is not None:
          618                 yield acc
      --> 619         vals = self.mapPartitions(func).collect()
          620         return reduce(f, vals)
          621 
      
      /Users/pat/Projects/spark/python/pyspark/rdd.pyc in collect(self)
          581         """
          582         with _JavaStackTrace(self.context) as st:
      --> 583           bytesInJava = self._jrdd.collect().iterator()
          584         return list(self._collect_iterator_through_file(bytesInJava))
          585 
      
      /Users/pat/Projects/spark/python/lib/py4j-0.8.1-src.zip/py4j/java_gateway.py in __call__(self, *args)
          535         answer = self.gateway_client.send_command(command)
          536         return_value = get_return_value(answer, self.gateway_client,
      --> 537                 self.target_id, self.name)
          538 
          539         for temp_arg in temp_args:
      
      /Users/pat/Projects/spark/python/lib/py4j-0.8.1-src.zip/py4j/protocol.py in get_return_value(answer, gateway_client, target_id, name)
          298                 raise Py4JJavaError(
          299                     'An error occurred while calling {0}{1}{2}.\n'.
      --> 300                     format(target_id, '.', name), value)
          301             else:
          302                 raise Py4JError(
      
      Py4JJavaError: An error occurred while calling o53.collect.
      : org.apache.spark.SparkException: Job aborted due to stage failure: Task 5.0:3 failed 1 times, most recent failure: Exception failure in TID 23 on host localhost: org.apache.spark.api.python.PythonException: Traceback (most recent call last):
        File "/Users/pat/Projects/spark/python/pyspark/worker.py", line 77, in main
          serializer.dump_stream(func(split_index, iterator), outfile)
        File "/Users/pat/Projects/spark/python/pyspark/rdd.py", line 1373, in pipeline_func
          return func(split, prev_func(split, iterator))
        File "/Users/pat/Projects/spark/python/pyspark/rdd.py", line 1373, in pipeline_func
          return func(split, prev_func(split, iterator))
        File "/Users/pat/Projects/spark/python/pyspark/rdd.py", line 283, in func
          def func(s, iterator): return f(iterator)
        File "/Users/pat/Projects/spark/python/pyspark/rdd.py", line 708, in <lambda>
          return self.mapPartitions(lambda i: [sum(1 for _ in i)]).sum()
        File "/Users/pat/Projects/spark/python/pyspark/rdd.py", line 708, in <genexpr>
          return self.mapPartitions(lambda i: [sum(1 for _ in i)]).sum()
        File "/Users/pat/Projects/spark/python/pyspark/serializers.py", line 129, in load_stream
          yield self._read_with_length(stream)
        File "/Users/pat/Projects/spark/python/pyspark/serializers.py", line 146, in _read_with_length
          return self.loads(obj)
      AttributeError: 'module' object has no attribute 'Person'
      
              org.apache.spark.api.python.PythonRDD$$anon$1.read(PythonRDD.scala:190)
              org.apache.spark.api.python.PythonRDD$$anon$1.<init>(PythonRDD.scala:214)
              org.apache.spark.api.python.PythonRDD.compute(PythonRDD.scala:151)
              org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:262)
              org.apache.spark.rdd.RDD.iterator(RDD.scala:229)
              org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:111)
              org.apache.spark.scheduler.Task.run(Task.scala:51)
              org.apache.spark.executor.Executor$TaskRunner$$anonfun$run$1.apply$mcV$sp(Executor.scala:210)
              org.apache.spark.deploy.SparkHadoopUtil.runAsUser(SparkHadoopUtil.scala:46)
              org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:175)
              java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
              java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
              java.lang.Thread.run(Thread.java:744)
      Driver stacktrace:
      	at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1033)
      	at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1017)
      	at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1015)
      	at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
      	at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
      	at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1015)
      	at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:633)
      	at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:633)
      	at scala.Option.foreach(Option.scala:236)
      	at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:633)
      	at org.apache.spark.scheduler.DAGSchedulerEventProcessActor$$anonfun$receive$2.applyOrElse(DAGScheduler.scala:1207)
      	at akka.actor.ActorCell.receiveMessage(ActorCell.scala:498)
      	at akka.actor.ActorCell.invoke(ActorCell.scala:456)
      	at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:237)
      	at akka.dispatch.Mailbox.run(Mailbox.scala:219)
      	at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:386)
      	at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
      	at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
      	at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
      	at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
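
The AttributeError above is consistent with how pickle treats classes: it stores a reference to the class (module name plus class name) rather than the class definition, so a process whose module has no `Person` cannot load the record. The sketch below reproduces that behaviour with the standard library alone, then shows one way to send the record "by value" instead. It is written for Python 3 and does not use Spark. The names `driver_session`, `to_wire` and `from_wire` are hypothetical, and this is an illustration of the idea, not the fix adopted for this issue.

```python
import pickle
import sys
import types
from collections import namedtuple

# Stand-in for the driver's interactive session (hypothetical module name).
driver = types.ModuleType("driver_session")
sys.modules["driver_session"] = driver

Person = namedtuple("Person", "id firstName lastName")
Person.__module__ = "driver_session"  # pretend Person was defined in that session
driver.Person = Person

jon = Person(1, "Jon", "Doe")

# pickle records only a reference to "driver_session.Person" plus the field
# values; the class definition itself is not part of the payload.
by_reference = pickle.dumps(jon)

# Simulate a worker: the module can be imported, but Person was never defined in it.
del driver.Person
try:
    pickle.loads(by_reference)
    worker_error = None
except AttributeError as exc:
    worker_error = exc


def to_wire(record):
    """Describe a namedtuple by value: class name, field names, field values."""
    cls = type(record)
    return (cls.__name__, cls._fields, tuple(record))


def from_wire(wire):
    """Rebuild the namedtuple class from its description, then the instance."""
    name, fields, values = wire
    return namedtuple(name, fields)(*values)


# The by-value form round-trips even though the receiver never saw Person.
restored = from_wire(pickle.loads(pickle.dumps(to_wire(jon))))
print(type(worker_error).__name__)
print(restored)
```

The rebuilt object belongs to a freshly created class, so it compares equal to the original as a tuple and exposes the same field names, but an `isinstance` check against the original `Person` would not hold. A serializer that pickles classes by value, which is the appeal of Dill mentioned in the description, would avoid the explicit conversion step.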
      

            People

              Assignee: Davies Liu (davies)
              Reporter: Pat McDonough (cheffpj)
              Votes: 0
              Watchers: 8
