SPARK-22438: OutOfMemoryError on very small data sets


Details

    • Type: Bug
    • Status: Resolved
    • Priority: Critical
    • Resolution: Duplicate
    • Affects Version/s: 2.2.0
    • Fix Version/s: None
    • Component/s: Spark Core
    • Labels: None
    Description

      We have a customer that uses Spark as an engine for running SQL on a collection of small datasets, typically no greater than a few thousand rows. Recently we started observing out-of-memory errors on some new workloads. Even though the datasets were only a few kilobytes, the job would almost immediately spike to > 10 GB of memory usage, producing an out-of-memory error on the modest hardware used (2 CPUs, 16 GB RAM). Using larger hardware and allocating more memory to Spark (4 CPUs, 32 GB RAM) made the job complete, but still with unreasonably high memory usage.

      The query involved was a left join on two datasets. In some, but not all, cases we were able to remove or reduce the problem by rewriting the query to use an EXISTS sub-select instead (sketched below, after the stack trace). After a lot of debugging we were able to reproduce the problem locally with the following test:

      import org.apache.spark.sql.SparkSession
      import org.apache.spark.storage.StorageLevel

      case class Data(value: String)

      val session = SparkSession.builder.master("local[1]").getOrCreate()
      import session.implicits._

      // Both datasets are tiny: 500 rows and 1 row.
      val foo = session.createDataset((1 to 500).map(i => Data(i.toString)))
      val bar = session.createDataset((1 to 1).map(i => Data(i.toString)))

      // Caching foo is part of the trigger; removing this call makes the error go away.
      foo.persist(StorageLevel.MEMORY_ONLY)

      foo.createTempView("foo")
      bar.createTempView("bar")

      // The left join followed by coalesce reproduces the OutOfMemoryError.
      val result = session.sql("select * from bar left join foo on bar.value = foo.value")
      result.coalesce(2).collect()


      Running this produces the error below:

      java.lang.OutOfMemoryError: Unable to acquire 28 bytes of memory, got 0
         at org.apache.spark.memory.MemoryConsumer.allocatePage(MemoryConsumer.java:127)
         at org.apache.spark.util.collection.unsafe.sort.UnsafeExternalSorter.acquireNewPageIfNecessary(UnsafeExternalSorter.java:372)
         at org.apache.spark.util.collection.unsafe.sort.UnsafeExternalSorter.insertRecord(UnsafeExternalSorter.java:396)
         at org.apache.spark.sql.execution.UnsafeExternalRowSorter.insertRow(UnsafeExternalRowSorter.java:109)
         at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.sort_addToSorter$(Unknown Source)
         at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.processNext(Unknown Source)
         at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
         at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$8$$anon$1.hasNext(WholeStageCodegenExec.scala:395)
         at org.apache.spark.sql.execution.RowIteratorFromScala.advanceNext(RowIterator.scala:83)
         at org.apache.spark.sql.execution.joins.SortMergeJoinScanner.advancedBufferedToRowWithNullFreeJoinKey(SortMergeJoinExec.scala:774)
         at org.apache.spark.sql.execution.joins.SortMergeJoinScanner.<init>(SortMergeJoinExec.scala:649)
         at org.apache.spark.sql.execution.joins.SortMergeJoinExec$$anonfun$doExecute$1.apply(SortMergeJoinExec.scala:198)
         at org.apache.spark.sql.execution.joins.SortMergeJoinExec$$anonfun$doExecute$1.apply(SortMergeJoinExec.scala:136)
         at org.apache.spark.rdd.ZippedPartitionsRDD2.compute(ZippedPartitionsRDD.scala:89)
         at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
         at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
         at org.apache.spark.rdd.CoalescedRDD$$anonfun$compute$1.apply(CoalescedRDD.scala:100)
         at org.apache.spark.rdd.CoalescedRDD$$anonfun$compute$1.apply(CoalescedRDD.scala:99)
         at scala.collection.Iterator$$anon$12.nextCur(Iterator.scala:434)
         at scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:440)
         at org.apache.spark.sql.execution.SparkPlan$$anonfun$2.apply(SparkPlan.scala:234)
         at org.apache.spark.sql.execution.SparkPlan$$anonfun$2.apply(SparkPlan.scala:228)
         at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$25.apply(RDD.scala:827)
         at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$25.apply(RDD.scala:827)
         at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
         at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
         at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
         at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
         at org.apache.spark.scheduler.Task.run(Task.scala:108)
         at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:335)
         at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
         at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
         at java.lang.Thread.run(Thread.java:745)
      

      The exact failure point varies with the number of threads given to Spark, the "coalesce" value and the number of rows in "foo". Using an inner join, removing the call to persist, or removing the call to coalesce (or using repartition instead) each independently makes the error go away.
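
      For reference, here is a minimal sketch of the EXISTS rewrite and the other workarounds mentioned above, reusing the session, temp views and "result" from the test (illustrative only, not the exact queries from the original workload):

      // EXISTS-style rewrite that reduced or removed the problem in some cases.
      // The column name "value" comes from the Data case class above.
      val existsRewrite = session.sql(
        "select * from bar where exists (select 1 from foo where foo.value = bar.value)")
      existsRewrite.coalesce(2).collect()

      // Each of the following also independently makes the error go away:
      // - inner join instead of left join
      session.sql("select * from bar join foo on bar.value = foo.value").coalesce(2).collect()
      // - repartition instead of coalesce
      result.repartition(2).collect()
      // - or simply dropping the foo.persist(StorageLevel.MEMORY_ONLY) call in the setup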

      The reason persist and coalesce are used in the workload at all is that they are part of a more general Spark-based processing engine, not limited to these small datasets. The workaround is therefore not as simple as it may seem, since we cannot tailor the Spark code to this specific case.


            People

              Assignee: Unassigned
              Reporter: Morten Hornbech (mhornbech)
              Votes: 0
              Watchers: 1
