Details
- Type: Improvement
- Status: Resolved
- Priority: Major
- Resolution: Won't Fix
- Affects Version/s: 1.1.0
- Fix Version/s: None
- Component/s: None
Description
Users often have a single RDD of key-value pairs that they want to save to multiple locations based on the keys.
For example, say I have an RDD like this:
>>> a = sc.parallelize(['Nick', 'Nancy', 'Bob', 'Ben', 'Frankie']).keyBy(lambda x: x[0])
>>> a.collect()
[('N', 'Nick'), ('N', 'Nancy'), ('B', 'Bob'), ('B', 'Ben'), ('F', 'Frankie')]
>>> a.keys().distinct().collect()
['B', 'F', 'N']
Now I want to write the RDD out to different paths depending on the keys, so that I have one output directory per distinct key. Each output directory could potentially have multiple part-* files, one per RDD partition.
So the output would look something like:
/path/prefix/B [/part-1, /part-2, etc]
/path/prefix/F [/part-1, /part-2, etc]
/path/prefix/N [/part-1, /part-2, etc]
Though it may be possible to do this with some combination of saveAsNewAPIHadoopFile(), saveAsHadoopFile(), and the MultipleTextOutputFormat output format class, it isn't straightforward. It's not clear if it's even possible at all in PySpark.
Please add a saveAsTextFileByKey() method or something similar to RDDs that makes it easy to save RDDs out to multiple locations at once.
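For illustration, the requested semantics can be sketched in plain Python over an in-memory list of (key, value) pairs. This is only a single-process sketch, not a Spark implementation: the function name save_by_key and the part-00000 file name are hypothetical, and real RDD output would have one part file per partition.

```python
import os
from collections import defaultdict

def save_by_key(records, prefix):
    """Sketch of saveAsTextFileByKey() semantics: group (key, value)
    pairs by key and write one output directory per distinct key."""
    buckets = defaultdict(list)
    for key, value in records:
        buckets[key].append(value)
    for key, values in buckets.items():
        outdir = os.path.join(prefix, key)
        os.makedirs(outdir, exist_ok=True)
        # A real RDD would write one part file per partition; this
        # single-process sketch writes everything to part-00000.
        with open(os.path.join(outdir, "part-00000"), "w") as f:
            for v in values:
                f.write(v + "\n")
```

Running this on the example RDD's contents with prefix '/path/prefix' would produce the /path/prefix/B, /path/prefix/F, and /path/prefix/N directories shown above.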
—
Update: March 2016
There are two workarounds to this problem:
1. See this answer on Stack Overflow, which implements MultipleTextOutputFormat. (Scala-only)
2. See this comment by Davies Liu, which uses DataFrames:
val df = rdd.map(t => Row(gen_key(t), t)).toDF("key", "text")
df.write.partitionBy("key").text(path)
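One difference worth noting: the DataFrame workaround lays out Hive-style partition directories named key=B, key=F, key=N rather than the bare key names requested above. A plain-Python sketch of that layout (the function name write_partitioned_text and the part file name are hypothetical; this is not the Spark implementation):

```python
import os
from collections import defaultdict

def write_partitioned_text(rows, path):
    """Sketch of the directory layout produced by
    df.write.partitionBy("key").text(path): one key=<value>
    subdirectory per distinct key, with text files inside."""
    buckets = defaultdict(list)
    for key, text in rows:
        buckets[key].append(text)
    for key, texts in buckets.items():
        # Spark names partition directories key=<value> (Hive convention).
        outdir = os.path.join(path, "key={}".format(key))
        os.makedirs(outdir, exist_ok=True)
        with open(os.path.join(outdir, "part-00000.txt"), "w") as f:
            for t in texts:
                f.write(t + "\n")
```

Consumers that expect bare key directories would need to rename the key=<value> directories after the write, or read them back with Spark's partition discovery.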