Spark / SPARK-10821

RandomForest serialization OOM during findBestSplits


Details
    • Type: Bug
    • Status: Closed
    • Priority: Major
    • Resolution: Done
    • Affects Version/s: 1.4.0, 1.5.0
    • Fix Version/s: None
    • Component/s: MLlib
    • Environment: Amazon EC2 Linux

    Description

      I am getting an OOM during serialization for a relatively small dataset when training a RandomForest. Even with spark.serializer.objectStreamReset set to 1, it still runs out of memory while serializing my data.
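
      For context, spark.serializer.objectStreamReset only controls how often JavaSerializer calls ObjectOutputStream.reset() to flush its back-reference cache; it cannot shrink a single large object graph such as the closure being serialized here. A minimal sketch of applying the setting (app name illustrative):

      import org.apache.spark.{SparkConf, SparkContext}

      // spark.serializer.objectStreamReset (default 100) sets how many objects
      // JavaSerializer writes before resetting the stream's reference cache.
      // A value of 1 resets after every object, minimizing retained references.
      val conf = new SparkConf()
        .setAppName("rf-oom-repro")
        .set("spark.serializer.objectStreamReset", "1")
      val sc = new SparkContext(conf)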

      Stack Trace:
      Traceback (most recent call last):
      File "/root/random_forest/random_forest_spark.py", line 198, in <module>
      main()
      File "/root/random_forest/random_forest_spark.py", line 166, in main
      trainModel(dset)
      File "/root/random_forest/random_forest_spark.py", line 191, in trainModel
      impurity='gini', maxDepth=4, maxBins=32)
      File "/root/spark/python/lib/pyspark.zip/pyspark/mllib/tree.py", line 352, in trainClassifier
      File "/root/spark/python/lib/pyspark.zip/pyspark/mllib/tree.py", line 270, in _train
      File "/root/spark/python/lib/pyspark.zip/pyspark/mllib/common.py", line 130, in callMLlibFunc
      File "/root/spark/python/lib/pyspark.zip/pyspark/mllib/common.py", line 123, in callJavaFunc
      File "/root/spark/python/lib/py4j-0.8.2.1-src.zip/py4j/java_gateway.py", line 538, in _call_
      File "/root/spark/python/lib/py4j-0.8.2.1-src.zip/py4j/protocol.py", line 300, in get_return_value
      py4j.protocol.Py4JJavaError
      15/09/25 00:44:41 DEBUG BlockManagerSlaveEndpoint: Done removing RDD 7, response is 0
      15/09/25 00:44:41 DEBUG BlockManagerSlaveEndpoint: Sent response: 0 to AkkaRpcEndpointRef(Actor[akka://sparkDriver/temp/$Mj])
      : An error occurred while calling o89.trainRandomForestModel.
      : java.lang.OutOfMemoryError
      at java.io.ByteArrayOutputStream.hugeCapacity(ByteArrayOutputStream.java:123)
      at java.io.ByteArrayOutputStream.grow(ByteArrayOutputStream.java:117)
      at java.io.ByteArrayOutputStream.ensureCapacity(ByteArrayOutputStream.java:93)
      at java.io.ByteArrayOutputStream.write(ByteArrayOutputStream.java:153)
      at java.io.ObjectOutputStream$BlockDataOutputStream.drain(ObjectOutputStream.java:1876)
      at java.io.ObjectOutputStream$BlockDataOutputStream.setBlockDataMode(ObjectOutputStream.java:1785)
      at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1188)
      at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:347)
      at org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:44)
      at org.apache.spark.serializer.JavaSerializerInstance.serialize(JavaSerializer.scala:84)
      at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:301)
      at org.apache.spark.util.ClosureCleaner$.org$apache$spark$util$ClosureCleaner$$clean(ClosureCleaner.scala:294)
      at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:122)
      at org.apache.spark.SparkContext.clean(SparkContext.scala:2021)
      at org.apache.spark.rdd.RDD$$anonfun$mapPartitions$1.apply(RDD.scala:703)
      at org.apache.spark.rdd.RDD$$anonfun$mapPartitions$1.apply(RDD.scala:702)
      at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:147)
      at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:108)
      at org.apache.spark.rdd.RDD.withScope(RDD.scala:306)
      at org.apache.spark.rdd.RDD.mapPartitions(RDD.scala:702)
      at org.apache.spark.mllib.tree.DecisionTree$.findBestSplits(DecisionTree.scala:625)
      at org.apache.spark.mllib.tree.RandomForest.run(RandomForest.scala:235)
      at org.apache.spark.mllib.tree.RandomForest$.trainClassifier(RandomForest.scala:291)
      at org.apache.spark.mllib.api.python.PythonMLLibAPI.trainRandomForestModel(PythonMLLibAPI.scala:742)
      at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
      at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
      at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
      at java.lang.reflect.Method.invoke(Method.java:606)
      at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:231)
      at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:379)
      at py4j.Gateway.invoke(Gateway.java:259)
      at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:133)
      at py4j.commands.CallCommand.execute(CallCommand.java:79)
      at py4j.GatewayConnection.run(GatewayConnection.java:207)
      at java.lang.Thread.run(Thread.java:745)

      Details:

      My RDD consists of MLlib LabeledPoint objects, each holding a sparse vector. The RDD has a total size of roughly 45 MB. Each sparse vector has a length of ~15 million, of which only about 3,000 entries are non-zero. The same job works fine for sparse vector sizes up to 10 million.

      My cluster is set up on AWS with an r3.8xlarge master and two r3.4xlarge workers. The driver has ~190 GB allocated to it, while my RDD is only ~45 MB.
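
      Given those numbers, the failure is unlikely to be the data itself: the stack trace OOMs inside ClosureCleaner.ensureSerializable, and java.io.ByteArrayOutputStream.hugeCapacity throws OutOfMemoryError once a single stream needs more than Integer.MAX_VALUE (~2 GB) bytes, so extra driver memory cannot help. A back-of-the-envelope sketch, assuming (an assumption about MLlib 1.5 internals, not from this report) roughly maxBins - 1 candidate-split descriptors per continuous feature end up in the serialized closure:

      // Rough estimate only; the per-feature metadata layout is assumed, not measured.
      val numFeatures = 15000000L         // declared vector length, not non-zero count
      val maxBins = 32L
      val splitsPerFeature = maxBins - 1  // candidate splits per continuous feature
      val totalSplits = numFeatures * splitsPerFeature
      println(s"split descriptors: $totalSplits")  // 465,000,000
      // At even ~5 serialized bytes apiece this exceeds the ~2 GB
      // ByteArrayOutputStream ceiling seen in the hugeCapacity frame above.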

      Configurations as follows:
      spark version: 1.5.0
      -----------------------------------
      spark.executor.memory 32000m
      spark.driver.memory 230000m
      spark.driver.cores 10
      spark.executor.cores 5
      spark.executor.instances 17
      spark.driver.maxResultSize 0
      spark.storage.safetyFraction 1
      spark.storage.memoryFraction 0.9
      spark.storage.shuffleFraction 0.05
      spark.default.parallelism 128
      spark.serializer.objectStreamReset 1

      My original code is in Python, which I tried on 1.4.0 and 1.5.0, so I thought running it in Scala might avoid the problem. I wrote a toy Scala example and tested it on the same cluster, which yields the same error. Note that the test code will most likely eventually throw an error anyway, because certain features are always 0 and MLlib currently errors out on such features.

      Running the following in spark-shell with my Spark configuration reproduces the OOM:
      --------------------------------------------------------------------------
      import scala.util.Random
      import scala.collection.mutable.ArrayBuffer

      import org.apache.spark.mllib.tree.RandomForest
      import org.apache.spark.mllib.tree.model.RandomForestModel
      import org.apache.spark.mllib.util.MLUtils

      import org.apache.spark.mllib.linalg.Vectors
      import org.apache.spark.mllib.regression.LabeledPoint

      val r = Random

      val size = 15000000  // total feature-vector length
      val count = 3000     // number of non-zero entries
      val indptr = (1 to size by size / count).toArray
      val data = Seq.fill(count)(r.nextDouble()).toArray

      val dset = ArrayBuffer[LabeledPoint]()
      for (i <- 1 to 10) {
        dset += LabeledPoint(r.nextInt(2), Vectors.sparse(size, indptr, data))
      }

      val distData = sc.parallelize(dset)
      val splits = distData.randomSplit(Array(0.7, 0.3))
      val (trainingData, testData) = (splits(0), splits(1))

      // Train a RandomForest model.
      // Empty categoricalFeaturesInfo indicates all features are continuous.
      val numClasses = 2
      val categoricalFeaturesInfo = Map[Int, Int]()
      val numTrees = 3 // Use more in practice.
      val featureSubsetStrategy = "auto" // Let the algorithm choose.
      val impurity = "gini"
      val maxDepth = 4
      val maxBins = 32

      val model = RandomForest.trainClassifier(trainingData, numClasses, categoricalFeaturesInfo,
        numTrees, featureSubsetStrategy, impurity, maxDepth, maxBins)
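
      As a possible workaround (not from the original report): since the cost appears to scale with the declared vector size rather than the number of non-zeros, hashing the active indices down into a smaller feature space keeps the same data trainable. This continues the spark-shell session above; hashedSize is an illustrative choice and colliding values are simply summed.

      import org.apache.spark.mllib.linalg.SparseVector

      // Hypothetical mitigation sketch, not a fix for the underlying issue:
      // fold the ~3000 active indices of each 15M-wide vector into a 1M-wide
      // space. At this sparsity, collisions are rare.
      val hashedSize = 1 << 20
      def hashIndices(lp: LabeledPoint): LabeledPoint = {
        val sv = lp.features.asInstanceOf[SparseVector]
        val merged = sv.indices.zip(sv.values)
          .groupBy { case (i, _) => i % hashedSize }
          .map { case (i, pairs) => (i, pairs.map(_._2).sum) }
          .toArray.sortBy(_._1)
        LabeledPoint(lp.label, Vectors.sparse(hashedSize, merged.map(_._1), merged.map(_._2)))
      }
      val hashedTraining = trainingData.map(hashIndices)
      // hashedTraining can then be passed to RandomForest.trainClassifier as above.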


          People

            Assignee: Unassigned
            Reporter: Jay Luan (jluan)
            Votes: 0
            Watchers: 2
