Description
Using operator.itemgetter as the function passed to map seems to confuse the serialization process in PySpark. I'm using itemgetter to return tuples, and the job fails with a TypeError (details below). An equivalent lambda function returns the correct result.
Use a test file:
echo 1,1 > test.txt
Then try mapping it to a tuple:
import csv
sc.textFile("test.txt").mapPartitions(csv.reader).map(lambda l: (l[0], l[1])).first()
Out[7]: ('1', '1')
But this does not work when using operator.itemgetter:
import operator
sc.textFile("test.txt").mapPartitions(csv.reader).map(operator.itemgetter(0, 1)).first()
# TypeError: list indices must be integers, not tuple
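For comparison, the same itemgetter behaves as expected in plain Python outside Spark. The TypeError above is exactly what indexing a list with a tuple produces, which suggests the callable that reaches the worker no longer carries itemgetter's multi-index semantics. This is just an illustrative local sketch, not part of the Spark job:

import operator

row = ['1', '1']                       # what csv.reader yields for the test file
getter = operator.itemgetter(0, 1)
print(getter(row))                     # ('1', '1') -- the expected tuple

try:
    row[(0, 1)]                        # indexing a list with a tuple...
except TypeError as e:
    print(e)                           # ...raises "list indices must be integers, not tuple"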
This is running with git master, commit 6d60fe571a405eb9306a2be1817901316a46f892
IPython 0.13.2
java version "1.7.0_25"
Scala code runner version 2.9.1
Ubuntu 12.04
Full debug output:
In [9]: sc.textFile("test.txt").mapPartitions(csv.reader).map(operator.itemgetter(0,1)).first()
13/07/04 16:19:49 INFO storage.MemoryStore: ensureFreeSpace(33632) called with curMem=201792, maxMem=339585269
13/07/04 16:19:49 INFO storage.MemoryStore: Block broadcast_6 stored as values to memory (estimated size 32.8 KB, free 323.6 MB)
13/07/04 16:19:49 INFO mapred.FileInputFormat: Total input paths to process : 1
13/07/04 16:19:49 INFO spark.SparkContext: Starting job: takePartition at NativeMethodAccessorImpl.java:-2
13/07/04 16:19:49 INFO scheduler.DAGScheduler: Got job 4 (takePartition at NativeMethodAccessorImpl.java:-2) with 1 output partitions (allowLocal=true)
13/07/04 16:19:49 INFO scheduler.DAGScheduler: Final stage: Stage 4 (PythonRDD at NativeConstructorAccessorImpl.java:-2)
13/07/04 16:19:49 INFO scheduler.DAGScheduler: Parents of final stage: List()
13/07/04 16:19:49 INFO scheduler.DAGScheduler: Missing parents: List()
13/07/04 16:19:49 INFO scheduler.DAGScheduler: Computing the requested partition locally
13/07/04 16:19:49 INFO scheduler.DAGScheduler: Failed to run takePartition at NativeMethodAccessorImpl.java:-2
---------------------------------------------------------------------------
Py4JJavaError                             Traceback (most recent call last)
<ipython-input-9-1fdb3e7a8ac7> in <module>()
----> 1 sc.textFile("test.txt").mapPartitions(csv.reader).map(operator.itemgetter(0,1)).first()

/home/jim/src/spark/python/pyspark/rdd.pyc in first(self)
    389         2
    390         """
--> 391         return self.take(1)[0]
    392
    393     def saveAsTextFile(self, path):

/home/jim/src/spark/python/pyspark/rdd.pyc in take(self, num)
    372         items = []
    373         for partition in range(self._jrdd.splits().size()):
--> 374             iterator = self.ctx._takePartition(self._jrdd.rdd(), partition)
    375             # Each item in the iterator is a string, Python object, batch of
    376             # Python objects. Regardless, it is sufficient to take `num`

/home/jim/src/spark/python/lib/py4j0.7.egg/py4j/java_gateway.pyc in __call__(self, *args)
    498         answer = self.gateway_client.send_command(command)
    499         return_value = get_return_value(answer, self.gateway_client,
--> 500                 self.target_id, self.name)
    501
    502         for temp_arg in temp_args:

/home/jim/src/spark/python/lib/py4j0.7.egg/py4j/protocol.pyc in get_return_value(answer, gateway_client, target_id, name)
    298                 raise Py4JJavaError(
    299                     'An error occurred while calling {0}{1}{2}.\n'.
--> 300                     format(target_id, '.', name), value)
    301             else:
    302                 raise Py4JError(

Py4JJavaError: An error occurred while calling z:spark.api.python.PythonRDD.takePartition.
: spark.api.python.PythonException: Traceback (most recent call last):
  File "/home/jim/src/spark/python/pyspark/worker.py", line 53, in main
    for obj in func(split_index, iterator):
  File "/home/jim/src/spark/python/pyspark/serializers.py", line 24, in batched
    for item in iterator:
TypeError: list indices must be integers, not tuple

	at spark.api.python.PythonRDD$$anon$1.read(PythonRDD.scala:117)
	at spark.api.python.PythonRDD$$anon$1.<init>(PythonRDD.scala:139)
	at spark.api.python.PythonRDD.compute(PythonRDD.scala:82)
	at spark.RDD.computeOrReadCheckpoint(RDD.scala:232)
	at spark.RDD.iterator(RDD.scala:221)
	at spark.scheduler.DAGScheduler.runLocallyWithinThread(DAGScheduler.scala:423)
	at spark.scheduler.DAGScheduler$$anon$2.run(DAGScheduler.scala:410)
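Until the serialization issue is fixed, one workaround in the spirit of the lambda form that does work is to avoid itemgetter entirely and build the tuple from a plain closure. The tuple_getter helper below is hypothetical (not part of PySpark), just a generalization of the working lambda above, and I have not verified it against this exact Spark commit:

import csv

# hypothetical helper: mimics operator.itemgetter(*indices) for sequences,
# but uses an ordinary closure so the itemgetter serialization path is avoided
def tuple_getter(*indices):
    return lambda l: tuple(l[i] for i in indices)

sc.textFile("test.txt").mapPartitions(csv.reader).map(tuple_getter(0, 1)).first()
# expected: ('1', '1')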
Issue Links
- is duplicated by SPARK-1091: Cloudpickle does not work correctly for some methods that use a splat (Resolved)
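Given the linked SPARK-1091 (cloudpickle mishandling callables that are reconstructed with a splat), the failure can probably be reproduced without a cluster by round-tripping the itemgetter through PySpark's bundled pickler. This is only a sketch: it assumes the pyspark.cloudpickle module exposes a dumps() function, and the comments describe the report's diagnosis rather than verified behaviour:

import pickle
import operator
from pyspark import cloudpickle   # assumption: bundled module provides dumps()

payload = cloudpickle.dumps(operator.itemgetter(0, 1))
restored = pickle.loads(payload)

# Expected: ('1', '1'). If the report's diagnosis is right, the restored
# callable instead indexes with a tuple and raises the same TypeError
# seen on the worker.
print(restored(['1', '1']))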