Details
Type: Bug
Status: Resolved
Priority: Critical
Resolution: Duplicate
Affects Version/s: 2.2.0
Fix Version/s: None
Component/s: None
Description
We have a customer that uses Spark as an engine for running SQL on a collection of small datasets, typically no greater than a few thousand rows. Recently we started observing out-of-memory errors on some new workloads. Even though the datasets were only a few kilobytes, the job would almost immediately spike to > 10 GB of memory usage, producing an out-of-memory error on the modest hardware (2 CPUs, 16 GB RAM) in use. Using larger hardware and allocating more memory to Spark (4 CPUs, 32 GB RAM) let the job complete, but still with unreasonably high memory usage.
The query involved was a left join on two datasets. In some, but not all, cases we were able to eliminate or reduce the problem by rewriting the query to use an exists sub-select instead (an illustrative rewrite appears at the end of this description). After a lot of debugging we were able to reproduce the problem locally with the following test:
import org.apache.spark.sql.SparkSession
import org.apache.spark.storage.StorageLevel

case class Data(value: String)

val session = SparkSession.builder.master("local[1]").getOrCreate()
import session.implicits._

// foo is the larger, persisted side; bar holds a single row
val foo = session.createDataset((1 to 500).map(i => Data(i.toString)))
val bar = session.createDataset((1 to 1).map(i => Data(i.toString)))

foo.persist(StorageLevel.MEMORY_ONLY)
foo.createTempView("foo")
bar.createTempView("bar")

val result = session.sql("select * from bar left join foo on bar.value = foo.value")
result.coalesce(2).collect()
Running this produces the error below:
java.lang.OutOfMemoryError: Unable to acquire 28 bytes of memory, got 0
  at org.apache.spark.memory.MemoryConsumer.allocatePage(MemoryConsumer.java:127)
  at org.apache.spark.util.collection.unsafe.sort.UnsafeExternalSorter.acquireNewPageIfNecessary(UnsafeExternalSorter.java:372)
  at org.apache.spark.util.collection.unsafe.sort.UnsafeExternalSorter.insertRecord(UnsafeExternalSorter.java:396)
  at org.apache.spark.sql.execution.UnsafeExternalRowSorter.insertRow(UnsafeExternalRowSorter.java:109)
  at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.sort_addToSorter$(Unknown Source)
  at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.processNext(Unknown Source)
  at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
  at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$8$$anon$1.hasNext(WholeStageCodegenExec.scala:395)
  at org.apache.spark.sql.execution.RowIteratorFromScala.advanceNext(RowIterator.scala:83)
  at org.apache.spark.sql.execution.joins.SortMergeJoinScanner.advancedBufferedToRowWithNullFreeJoinKey(SortMergeJoinExec.scala:774)
  at org.apache.spark.sql.execution.joins.SortMergeJoinScanner.<init>(SortMergeJoinExec.scala:649)
  at org.apache.spark.sql.execution.joins.SortMergeJoinExec$$anonfun$doExecute$1.apply(SortMergeJoinExec.scala:198)
  at org.apache.spark.sql.execution.joins.SortMergeJoinExec$$anonfun$doExecute$1.apply(SortMergeJoinExec.scala:136)
  at org.apache.spark.rdd.ZippedPartitionsRDD2.compute(ZippedPartitionsRDD.scala:89)
  at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
  at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
  at org.apache.spark.rdd.CoalescedRDD$$anonfun$compute$1.apply(CoalescedRDD.scala:100)
  at org.apache.spark.rdd.CoalescedRDD$$anonfun$compute$1.apply(CoalescedRDD.scala:99)
  at scala.collection.Iterator$$anon$12.nextCur(Iterator.scala:434)
  at scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:440)
  at org.apache.spark.sql.execution.SparkPlan$$anonfun$2.apply(SparkPlan.scala:234)
  at org.apache.spark.sql.execution.SparkPlan$$anonfun$2.apply(SparkPlan.scala:228)
  at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$25.apply(RDD.scala:827)
  at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$25.apply(RDD.scala:827)
  at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
  at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
  at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
  at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
  at org.apache.spark.scheduler.Task.run(Task.scala:108)
  at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:335)
  at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
  at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
  at java.lang.Thread.run(Thread.java:745)
The exact failure point varies with the number of threads given to Spark, the "coalesce" value, and the number of rows in "foo". Using an inner join, removing the call to persist, or removing the call to coalesce (or replacing it with repartition) will each independently make the error go away; see the sketch below.
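For reference, a minimal sketch of the repartition variant applied to the repro above (the partition count 2 simply mirrors the coalesce argument in the test):

// Unlike coalesce, repartition introduces a shuffle; with this change the
// repro above completes without the OOM.
val result = session.sql("select * from bar left join foo on bar.value = foo.value")
result.repartition(2).collect()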
The reason persist and coalesce are used in the workload at all is that they are part of a more general Spark-based processing engine, not one limited to these small datasets. The workaround is therefore not as simple as it may seem, since we cannot tailor the Spark code to this specific case.
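For completeness, an illustrative sketch of the exists sub-select rewrite mentioned above, applied to the repro query (the actual queries our engine generates differ). Note that the semantics are not identical: EXISTS behaves like a left semi join, returning only bar's columns for rows that have a match in foo.

// Illustrative EXISTS rewrite of the left join in the repro; in our workloads
// this removed or reduced the problem in some, but not all, cases.
val rewritten = session.sql(
  "select * from bar where exists (select 1 from foo where foo.value = bar.value)")
rewritten.coalesce(2).collect()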
Issue Links
duplicates SPARK-21033 fix the potential OOM in UnsafeExternalSorter (Resolved)