Details

Type: Bug
Status: Resolved
Priority: Major
Resolution: Cannot Reproduce
Affects Version/s: 0.8.0
Fix Version/s: None
Description
Spark appears to build up a backlog of unreachable byte arrays when an RDD is constructed from a sequence file, and then that RDD is sorted.
I have a class that wraps a Java ArrayList and can be serialized and written to a Hadoop SequenceFile (i.e. it implements the Writable interface). Let's call it WritableDataRow. Its constructor takes a Java List to wrap, and it also has a copy constructor.
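For context, a minimal sketch of what such a class might look like is below. This is a hypothetical stand-in, not the actual implementation, and it assumes the wrapped list holds Strings:

import java.io.{DataInput, DataOutput}
import java.util.{ArrayList => JArrayList, List => JList}
import org.apache.hadoop.io.Writable

// Hypothetical stand-in for the real WritableDataRow; assumes String contents.
class WritableDataRow() extends Writable {
  // Hadoop instantiates Writables reflectively, so a no-arg constructor is required.
  private var contents: JArrayList[String] = new JArrayList[String]()

  // Wraps an arbitrary Java List, as used in the map() calls below.
  def this(list: JList[String]) = { this(); contents = new JArrayList[String](list) }

  // Copy constructor, needed because Hadoop RecordReaders reuse Writable instances.
  def this(other: WritableDataRow) = { this(); contents = new JArrayList[String](other.getContents()) }

  def getContents(): JList[String] = contents

  override def write(out: DataOutput): Unit = {
    out.writeInt(contents.size())
    for (i <- 0 until contents.size()) out.writeUTF(contents.get(i))
  }

  override def readFields(in: DataInput): Unit = {
    val n = in.readInt()
    contents = new JArrayList[String](n)
    for (_ <- 0 until n) contents.add(in.readUTF())
  }
}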
Setup: 10 slaves launched via EC2, 65.9 GB RAM each; the dataset is 100 GB of text, or 120 GB in sequence file format (not using compression to compact the bytes). CDH 4.2.0-backed Hadoop cluster.
First, building the RDD from a CSV and then sorting on index 1 works fine:
scala> import scala.collection.JavaConversions._ // Other imports here as well
import scala.collection.JavaConversions._

scala> val rddAsTextFile = sc.textFile("s3n://some-bucket/events-*.csv")
rddAsTextFile: org.apache.spark.rdd.RDD[String] = MappedRDD[1] at textFile at <console>:14

scala> val rddAsWritableDataRows = rddAsTextFile.map(x => new WritableDataRow(x.split("\\|").toList))
rddAsWritableDataRows: org.apache.spark.rdd.RDD[com.palantir.finance.datatable.server.spark.WritableDataRow] = MappedRDD[2] at map at <console>:19

scala> val rddAsKeyedWritableDataRows = rddAsWritableDataRows.map(x => (x.getContents().get(1).toString(), x));
rddAsKeyedWritableDataRows: org.apache.spark.rdd.RDD[(String, com.palantir.finance.datatable.server.spark.WritableDataRow)] = MappedRDD[4] at map at <console>:22

scala> val orderedFunct = new org.apache.spark.rdd.OrderedRDDFunctions[String, WritableDataRow, (String, WritableDataRow)](rddAsKeyedWritableDataRows)
orderedFunct: org.apache.spark.rdd.OrderedRDDFunctions[String,com.palantir.finance.datatable.server.spark.WritableDataRow,(String, com.palantir.finance.datatable.server.spark.WritableDataRow)] = org.apache.spark.rdd.OrderedRDDFunctions@587acb54

scala> orderedFunct.sortByKey(true).count(); // Actually triggers the computation, as stated in a different e-mail thread
res0: org.apache.spark.rdd.RDD[(String, com.palantir.finance.datatable.server.spark.WritableDataRow)] = MapPartitionsRDD[8] at sortByKey at <console>:27
The above works without too many surprises. I then save it as a sequence file (using JavaPairRDD to more easily call saveAsHadoopFile(); this is also how it is done in our Java-based application):
scala> val pairRDD = new JavaPairRDD(rddAsWritableDataRows.map(x => (NullWritable.get(), x)));
pairRDD: org.apache.spark.api.java.JavaPairRDD[org.apache.hadoop.io.NullWritable,com.palantir.finance.datatable.server.spark.WritableDataRow] = org.apache.spark.api.java.JavaPairRDD@8d2e9d9

scala> pairRDD.saveAsHadoopFile("hdfs://<hdfs-master-url>:9010/blah", classOf[NullWritable], classOf[WritableDataRow], classOf[org.apache.hadoop.mapred.SequenceFileOutputFormat[NullWritable, WritableDataRow]]);
…
2013-12-11 20:09:14,444 [main] INFO org.apache.spark.SparkContext - Job finished: saveAsHadoopFile at <console>:26, took 1052.116712748 s
Now I want to get the RDD back from the sequence file and sort THAT, and this is when monitoring Ganglia and "ps aux" shows the memory usage climbing dramatically:
scala> val rddAsSequenceFile = sc.sequenceFile("hdfs://<hdfs-master-url>:9010/blah", classOf[NullWritable], classOf[WritableDataRow]).map(x => new WritableDataRow(x._2)); // Invokes copy constructor to get around re-use of writable objects
rddAsSequenceFile: org.apache.spark.rdd.RDD[com.palantir.finance.datatable.server.spark.WritableDataRow] = MappedRDD[19] at map at <console>:19

scala> val orderedFunct = new org.apache.spark.rdd.OrderedRDDFunctions[String, WritableDataRow, (String, WritableDataRow)](rddAsSequenceFile.map(x => (x.getContents().get(1).toString(), x)))
orderedFunct: org.apache.spark.rdd.OrderedRDDFunctions[String,com.palantir.finance.datatable.server.spark.WritableDataRow,(String, com.palantir.finance.datatable.server.spark.WritableDataRow)] = org.apache.spark.rdd.OrderedRDDFunctions@6262a9a6

scala> orderedFunct.sortByKey().count();
(On the necessity to copy writables from hadoop RDDs, see: https://mail-archives.apache.org/mod_mbox/spark-user/201308.mbox/%3CCAF_KkPzrq4OTyQVwcOC6pLAz9X9_SFo33u4ySatki5PTqoYEDA@mail.gmail.com%3E )
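To make the point of that thread concrete, here is the shape of the pitfall, using the same hypothetical WritableDataRow sketch as above: Hadoop's RecordReader hands back the same Writable instance for every record, so without the copy, every element of the RDD could end up referencing the last row read in its partition.

import org.apache.hadoop.io.NullWritable

// Risky: every element is the single reused Writable instance from the RecordReader.
val aliased = sc.sequenceFile("hdfs://<hdfs-master-url>:9010/blah",
  classOf[NullWritable], classOf[WritableDataRow]).map(_._2)

// Safe: the copy constructor materialises an independent row per record,
// which is exactly what the map() in the transcript above does.
val copied = sc.sequenceFile("hdfs://<hdfs-master-url>:9010/blah",
  classOf[NullWritable], classOf[WritableDataRow]).map(x => new WritableDataRow(x._2))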
I got a memory dump from one worker node but can't share it; I've attached a screenshot from YourKit instead. At the point where around 5 GB of RAM is in use on the worker, 3 GB of unreachable byte arrays have accumulated. Furthermore, they are all exactly the same size, and appear to be the same size as most of the byte arrays that are strongly reachable. The strongly reachable byte arrays are referenced from output streams in the block output writers.
Let me know if you require any more information. Thanks.