Spark / SPARK-791

[pyspark] operator.getattr not serialized


    Details

    • Type: Bug
    • Status: Resolved
    • Priority: Minor
    • Resolution: Fixed
    • Affects Version/s: 0.7.2, 0.9.0, 1.0.0
    • Fix Version/s: 0.9.3, 1.0.3, 1.1.0
    • Component/s: PySpark
    • Labels: None

      Description

      Passing operator.itemgetter as the function in map seems to confuse PySpark's serialization. I'm using itemgetter to return tuples, which fails with a TypeError (details below); an equivalent lambda returns the correct result.

      Create a test file:

      echo 1,1 > test.txt
      

      Then try mapping it to a tuple:

      import csv
      sc.textFile("test.txt").mapPartitions(csv.reader).map(lambda l: (l[0],l[1])).first()
      Out[7]: ('1', '1')
      

      But this does not work when using operator.itemgetter:

      import operator
      sc.textFile("test.txt").mapPartitions(csv.reader).map(operator.itemgetter(0,1)).first()
      # TypeError: list indices must be integers, not tuple
      
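      My guess at the root cause (a sketch only, not confirmed against the serializer code): if the pickling step rebuilds the getter by passing its saved indices back as a single tuple argument, the reconstructed callable indexes the row with a tuple, which reproduces the exact TypeError above:

```python
import operator

# itemgetter(0, 1) applies each index separately and returns a tuple
get = operator.itemgetter(0, 1)
assert get(['1', '1']) == ('1', '1')

# A serializer that round-trips the saved indices as one tuple argument
# would rebuild itemgetter((0, 1)) instead (hypothetical reconstruction);
# that callable indexes the list with a tuple and raises the TypeError
bad = operator.itemgetter((0, 1))
try:
    bad(['1', '1'])
    raised = False
except TypeError:
    raised = True
assert raised
```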

      This is running with git master, commit 6d60fe571a405eb9306a2be1817901316a46f892
      IPython 0.13.2
      java version "1.7.0_25"
      Scala code runner version 2.9.1
      Ubuntu 12.04

      Full debug output:

      In [9]: sc.textFile("test.txt").mapPartitions(csv.reader).map(operator.itemgetter(0,1)).first()
      13/07/04 16:19:49 INFO storage.MemoryStore: ensureFreeSpace(33632) called with curMem=201792, maxMem=339585269
      13/07/04 16:19:49 INFO storage.MemoryStore: Block broadcast_6 stored as values to memory (estimated size 32.8 KB, free 323.6 MB)
      13/07/04 16:19:49 INFO mapred.FileInputFormat: Total input paths to process : 1
      13/07/04 16:19:49 INFO spark.SparkContext: Starting job: takePartition at NativeMethodAccessorImpl.java:-2
      13/07/04 16:19:49 INFO scheduler.DAGScheduler: Got job 4 (takePartition at NativeMethodAccessorImpl.java:-2) with 1 output partitions (allowLocal=true)
      13/07/04 16:19:49 INFO scheduler.DAGScheduler: Final stage: Stage 4 (PythonRDD at NativeConstructorAccessorImpl.java:-2)
      13/07/04 16:19:49 INFO scheduler.DAGScheduler: Parents of final stage: List()
      13/07/04 16:19:49 INFO scheduler.DAGScheduler: Missing parents: List()
      13/07/04 16:19:49 INFO scheduler.DAGScheduler: Computing the requested partition locally
      13/07/04 16:19:49 INFO scheduler.DAGScheduler: Failed to run takePartition at NativeMethodAccessorImpl.java:-2
      ---------------------------------------------------------------------------
      Py4JJavaError                             Traceback (most recent call last)
      <ipython-input-9-1fdb3e7a8ac7> in <module>()
      ----> 1 sc.textFile("test.txt").mapPartitions(csv.reader).map(operator.itemgetter(0,1)).first()
      
      /home/jim/src/spark/python/pyspark/rdd.pyc in first(self)
          389         2
          390         """
      --> 391         return self.take(1)[0]
          392 
          393     def saveAsTextFile(self, path):
      
      /home/jim/src/spark/python/pyspark/rdd.pyc in take(self, num)
          372         items = []
          373         for partition in range(self._jrdd.splits().size()):
      --> 374             iterator = self.ctx._takePartition(self._jrdd.rdd(), partition)
          375             # Each item in the iterator is a string, Python object, batch of
          376             # Python objects.  Regardless, it is sufficient to take `num`
      
      /home/jim/src/spark/python/lib/py4j0.7.egg/py4j/java_gateway.pyc in __call__(self, *args)
          498         answer = self.gateway_client.send_command(command)
          499         return_value = get_return_value(answer, self.gateway_client,
      --> 500                 self.target_id, self.name)
          501 
          502         for temp_arg in temp_args:
      
      /home/jim/src/spark/python/lib/py4j0.7.egg/py4j/protocol.pyc in get_return_value(answer, gateway_client, target_id, name)
          298                 raise Py4JJavaError(
          299                     'An error occurred while calling {0}{1}{2}.\n'.
      --> 300                     format(target_id, '.', name), value)
          301             else:
          302                 raise Py4JError(
      
      Py4JJavaError: An error occurred while calling z:spark.api.python.PythonRDD.takePartition.
      : spark.api.python.PythonException: Traceback (most recent call last):
        File "/home/jim/src/spark/python/pyspark/worker.py", line 53, in main
          for obj in func(split_index, iterator):
        File "/home/jim/src/spark/python/pyspark/serializers.py", line 24, in batched
          for item in iterator:
      TypeError: list indices must be integers, not tuple
      
      	at spark.api.python.PythonRDD$$anon$1.read(PythonRDD.scala:117)
      	at spark.api.python.PythonRDD$$anon$1.<init>(PythonRDD.scala:139)
      	at spark.api.python.PythonRDD.compute(PythonRDD.scala:82)
      	at spark.RDD.computeOrReadCheckpoint(RDD.scala:232)
      	at spark.RDD.iterator(RDD.scala:221)
      	at spark.scheduler.DAGScheduler.runLocallyWithinThread(DAGScheduler.scala:423)
      	at spark.scheduler.DAGScheduler$$anon$2.run(DAGScheduler.scala:410)
      
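      Until this is fixed, the lambda form above serves as a workaround, since the closure serializer handles plain functions correctly. A local sketch (no SparkContext involved) showing the two forms agree on the same rows:

```python
import operator

rows = [['1', '1'], ['2', '3']]

# equivalent lambda, which PySpark serializes without trouble
pick = lambda row: (row[0], row[1])

# itemgetter called locally (no serialization step) gives the same result
getter = operator.itemgetter(0, 1)

assert [pick(r) for r in rows] == [('1', '1'), ('2', '3')]
assert [getter(r) for r in rows] == [('1', '1'), ('2', '3')]
```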


            People

            • Assignee: Davies Liu (davies)
            • Reporter: Jim Blomo (jblomo)
