Spark / SPARK-1001

Memory leak when reading sequence file and then sorting

    Details

    • Type: Bug
    • Status: Resolved
    • Priority: Major
    • Resolution: Cannot Reproduce
    • Affects Version/s: 0.8.0
    • Fix Version/s: None
    • Component/s: Shuffle, Spark Core
    • Labels:

      Description

      Spark appears to accumulate unreachable byte arrays when an RDD is constructed from a sequence file and that RDD is then sorted.

      I have a class that wraps a Java ArrayList and can be serialized and written to a Hadoop SequenceFile (i.e., it implements the Writable interface). Let's call it WritableDataRow. Its constructor takes a Java List to wrap, and it also has a copy constructor.
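
For readers who want something concrete, here is a minimal sketch of what a class like this might look like. It is not the actual WritableDataRow, which is not shown in this report. The name WritableDataRowSketch, the choice of String fields, and the local Writable trait (a stand-in for org.apache.hadoop.io.Writable so that the sketch compiles without Hadoop on the classpath) are all assumptions made for illustration.

```scala
import java.io.{DataInput, DataOutput}
import java.util.{ArrayList => JArrayList, List => JList}

// Stand-in for org.apache.hadoop.io.Writable (assumption: used here only so the
// sketch has no Hadoop dependency; the two methods mirror the Hadoop interface).
trait Writable {
  def write(out: DataOutput): Unit
  def readFields(in: DataInput): Unit
}

// Hypothetical reconstruction. Fields are assumed to be Strings.
class WritableDataRowSketch(initial: JList[String]) extends Writable {
  // Defensive copy, so the row does not alias the caller's list.
  private val contents = new JArrayList[String](initial)

  // No-arg constructor: Hadoop instantiates Writables reflectively before readFields.
  def this() = this(new JArrayList[String]())

  // Copy constructor: snapshots the other row's fields into a fresh list.
  def this(other: WritableDataRowSketch) = this(other.getContents())

  def getContents(): JList[String] = contents

  // Wire format (an assumption): field count, then each field via writeUTF.
  // Note that writeUTF rejects strings whose encoded form exceeds 65535 bytes.
  override def write(out: DataOutput): Unit = {
    out.writeInt(contents.size)
    var i = 0
    while (i < contents.size) { out.writeUTF(contents.get(i)); i += 1 }
  }

  // Clears existing state first, because Hadoop readers reuse one instance per record.
  override def readFields(in: DataInput): Unit = {
    contents.clear()
    val n = in.readInt()
    var i = 0
    while (i < n) { contents.add(in.readUTF()); i += 1 }
  }
}
```

The copy constructor is the part that matters for this report: it is what the map over the sequence-file RDD calls to detach each row from the instance that the record reader reuses.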

      Setup: 10 slaves launched via EC2 with 65.9GB RAM each. The dataset is 100GB of text, or 120GB in sequence file format (uncompressed). The Hadoop cluster is backed by CDH4.2.0.

      First, building the RDD from a CSV and then sorting on index 1 works fine:

      scala> import scala.collection.JavaConversions._ // Other imports here as well
      import scala.collection.JavaConversions._
      
      scala> val rddAsTextFile = sc.textFile("s3n://some-bucket/events-*.csv")
      rddAsTextFile: org.apache.spark.rdd.RDD[String] = MappedRDD[1] at textFile at <console>:14
      
      scala> val rddAsWritableDataRows = rddAsTextFile.map(x => new WritableDataRow(x.split("\\|").toList))
      rddAsWritableDataRows: org.apache.spark.rdd.RDD[com.palantir.finance.datatable.server.spark.WritableDataRow] = MappedRDD[2] at map at <console>:19
      
      scala> val rddAsKeyedWritableDataRows = rddAsWritableDataRows.map(x => (x.getContents().get(1).toString(), x));
      rddAsKeyedWritableDataRows: org.apache.spark.rdd.RDD[(String, com.palantir.finance.datatable.server.spark.WritableDataRow)] = MappedRDD[4] at map at <console>:22
      
      scala> val orderedFunct = new org.apache.spark.rdd.OrderedRDDFunctions[String, WritableDataRow, (String, WritableDataRow)](rddAsKeyedWritableDataRows)
      orderedFunct: org.apache.spark.rdd.OrderedRDDFunctions[String,com.palantir.finance.datatable.server.spark.WritableDataRow,(String, com.palantir.finance.datatable.server.spark.WritableDataRow)] = org.apache.spark.rdd.OrderedRDDFunctions@587acb54
      
      scala> orderedFunct.sortByKey(true).count(); // Actually triggers the computation, as stated in a different e-mail thread
      res0: org.apache.spark.rdd.RDD[(String, com.palantir.finance.datatable.server.spark.WritableDataRow)] = MapPartitionsRDD[8] at sortByKey at <console>:27
      

      The above works without too many surprises. I then save the RDD as a sequence file. I wrap it in a JavaPairRDD because that makes saveAsHadoopFile() easier to call, and because this is how our Java-based application does it:

      scala> val pairRDD = new JavaPairRDD(rddAsWritableDataRows.map(x => (NullWritable.get(), x)));
      pairRDD: org.apache.spark.api.java.JavaPairRDD[org.apache.hadoop.io.NullWritable,com.palantir.finance.datatable.server.spark.WritableDataRow] = org.apache.spark.api.java.JavaPairRDD@8d2e9d9
      
      scala> pairRDD.saveAsHadoopFile("hdfs://<hdfs-master-url>:9010/blah", classOf[NullWritable], classOf[WritableDataRow], classOf[org.apache.hadoop.mapred.SequenceFileOutputFormat[NullWritable, WritableDataRow]]);
      …
      2013-12-11 20:09:14,444 [main] INFO  org.apache.spark.SparkContext - Job finished: saveAsHadoopFile at <console>:26, took 1052.116712748 s
      

      Next I build the RDD from the sequence file and sort that. This is where, monitoring Ganglia and "ps aux", I see memory usage climb steeply:

      scala> val rddAsSequenceFile = sc.sequenceFile("hdfs://<hdfs-master-url>:9010/blah", classOf[NullWritable], classOf[WritableDataRow]).map(x => new WritableDataRow(x._2)); // Invokes copy constructor to get around re-use of writable objects
      rddAsSequenceFile: org.apache.spark.rdd.RDD[com.palantir.finance.datatable.server.spark.WritableDataRow] = MappedRDD[19] at map at <console>:19
      
      scala> val orderedFunct = new org.apache.spark.rdd.OrderedRDDFunctions[String, WritableDataRow, (String, WritableDataRow)](rddAsSequenceFile.map(x => (x.getContents().get(1).toString(), x)))
      orderedFunct: org.apache.spark.rdd.OrderedRDDFunctions[String,com.palantir.finance.datatable.server.spark.WritableDataRow,(String, com.palantir.finance.datatable.server.spark.WritableDataRow)] = org.apache.spark.rdd.OrderedRDDFunctions@6262a9a6
      
      scala> orderedFunct.sortByKey().count();
      

      (On the necessity to copy writables from hadoop RDDs, see: https://mail-archives.apache.org/mod_mbox/spark-user/201308.mbox/%3CCAF_KkPzrq4OTyQVwcOC6pLAz9X9_SFo33u4ySatki5PTqoYEDA@mail.gmail.com%3E )
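
To illustrate why the copy is needed, here is a small self-contained sketch of the object-reuse pitfall. It uses neither Spark nor Hadoop: MutableRecord and reusingReader are hypothetical stand-ins for a Writable and for a record reader that refills one instance per record, which is the behavior the linked thread describes.

```scala
// Hypothetical mutable record standing in for a Hadoop Writable.
final class MutableRecord(var value: String)

// Mimics a record reader that refills a single shared instance for every record.
def reusingReader(values: Seq[String]): Iterator[MutableRecord] = {
  val shared = new MutableRecord("")
  values.iterator.map { v => shared.value = v; shared }
}

val input = Seq("a", "b", "c")

// Buffering the references directly: every element is the same object,
// so all of them show whatever value was read last.
val aliased = reusingReader(input).toList.map(_.value)
// aliased == List("c", "c", "c")

// Copying each record before buffering keeps the values distinct.
val copied = reusingReader(input).map(r => new MutableRecord(r.value)).toList.map(_.value)
// copied == List("a", "b", "c")
```

Any operation that buffers records, such as a sort, would presumably hit the aliased case without the copy. That is why the map above calls the copy constructor before sorting.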

      I got a memory dump from one worker node but can't share it. I've attached a screenshot from YourKit. At the point where around 5GB of RAM is in use on the worker, 3GB of unreachable byte arrays have accumulated. They are all exactly the same size, and that size appears to match most of the strongly reachable byte arrays. The strongly reachable byte arrays are referenced from output streams in the block output writers.

      Let me know if you require any more information. Thanks.

            People

            • Assignee:
              Unassigned
              Reporter:
              mccheah Matthew Cheah