Spark / SPARK-25144

`distinct` on a Dataset leads to an exception due to "Managed memory leak detected"


    Details

    • Type: Bug
    • Status: Resolved
    • Priority: Major
    • Resolution: Fixed
    • Affects Version/s: 2.0.2, 2.1.3, 2.2.2, 2.3.1, 2.3.2
    • Fix Version/s: 2.2.3, 2.3.2
    • Component/s: SQL
    • Labels:
      None

      Description

      The following code example: 

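      // Assumes a SparkSession named `spark`, as in spark-shell; the import provides .toDS
      import spark.implicits._

      case class Foo(bar: Option[String])
      val ds = List(Foo(Some("bar"))).toDS
      val result = ds.flatMap(_.bar).distinct
      result.rdd.isEmpty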

      produces the following stack trace:

      [info]   org.apache.spark.SparkException: Job aborted due to stage failure: Task 42 in stage 7.0 failed 1 times, most recent failure: Lost task 42.0 in stage 7.0 (TID 125, localhost, executor driver): org.apache.spark.SparkException: Managed memory leak detected; size = 16777216 bytes, TID = 125
      [info] 	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:358)
      [info] 	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
      [info] 	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
      [info] 	at java.lang.Thread.run(Thread.java:748)
      [info] 
      [info] Driver stacktrace:
      [info]   at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1602)
      [info]   at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1590)
      [info]   at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1589)
      [info]   at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
      [info]   at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
      [info]   at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1589)
      [info]   at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:831)
      [info]   at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:831)
      [info]   at scala.Option.foreach(Option.scala:257)
      [info]   at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:831)
      [info]   at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:1823)
      [info]   at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1772)
      [info]   at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1761)
      [info]   at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
      [info]   at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:642)
      [info]   at org.apache.spark.SparkContext.runJob(SparkContext.scala:2034)
      [info]   at org.apache.spark.SparkContext.runJob(SparkContext.scala:2055)
      [info]   at org.apache.spark.SparkContext.runJob(SparkContext.scala:2074)
      [info]   at org.apache.spark.rdd.RDD$$anonfun$take$1.apply(RDD.scala:1358)
      [info]   at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
      [info]   at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
      [info]   at org.apache.spark.rdd.RDD.withScope(RDD.scala:363)
      [info]   at org.apache.spark.rdd.RDD.take(RDD.scala:1331)
      [info]   at org.apache.spark.rdd.RDD$$anonfun$isEmpty$1.apply$mcZ$sp(RDD.scala:1466)
      [info]   at org.apache.spark.rdd.RDD$$anonfun$isEmpty$1.apply(RDD.scala:1466)
      [info]   at org.apache.spark.rdd.RDD$$anonfun$isEmpty$1.apply(RDD.scala:1466)
      [info]   at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
      [info]   at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
      [info]   at org.apache.spark.rdd.RDD.withScope(RDD.scala:363)
      [info]   at org.apache.spark.rdd.RDD.isEmpty(RDD.scala:1465)
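      The trace shows `RDD.isEmpty` calling `RDD.take`, so a task may stop reading its output after the first row. The plain-Scala toy model below sketches one hypothesis consistent with that, not a confirmed root cause: an aggregating operator (standing in for `distinct`) reserves a buffer and releases it only once its output iterator is exhausted, and an end-of-task check reports anything still held. This is similar in spirit to the Spark 2.x executor's leak check, which throws only when `spark.unsafe.exceptionOnMemoryLeak` is enabled and otherwise logs a warning. `ToyTaskMemoryManager`, `toyDistinct` and `runTask` are hypothetical names, not Spark code.

```scala
import scala.collection.mutable

object ToyLeakModel {
  // Hypothetical stand-in for a per-task memory manager: it only records
  // how many bytes each named consumer currently holds.
  final class ToyTaskMemoryManager {
    private val held = mutable.Map.empty[String, Long]

    def acquire(consumer: String, bytes: Long): Unit =
      held(consumer) = held.getOrElse(consumer, 0L) + bytes

    def release(consumer: String): Unit = held -= consumer

    // Frees whatever is still held and returns the number of bytes freed,
    // loosely mirroring an end-of-task cleanup.
    def cleanUpAll(): Long = {
      val leaked = held.values.sum
      held.clear()
      leaked
    }
  }

  // Hypothetical aggregating operator standing in for `distinct`: it reserves
  // a buffer up front and releases it only once its output is exhausted.
  def toyDistinct(input: Iterator[String], mm: ToyTaskMemoryManager): Iterator[String] = {
    mm.acquire("hashAggregate", 16L * 1024 * 1024) // 16 MiB, the size in the report
    val out = input.toList.distinct.iterator
    new Iterator[String] {
      def hasNext: Boolean = {
        val more = out.hasNext
        if (!more) mm.release("hashAggregate") // released only on exhaustion
        more
      }
      def next(): String = out.next()
    }
  }

  // Runs one toy "task": reads at most `limit` rows, then checks for leaks.
  def runTask(rows: Seq[String], limit: Int, exceptionOnLeak: Boolean): List[String] = {
    val mm = new ToyTaskMemoryManager
    val result = toyDistinct(rows.iterator, mm).take(limit).toList
    val leaked = mm.cleanUpAll()
    if (leaked > 0 && exceptionOnLeak)
      throw new IllegalStateException(s"Managed memory leak detected; size = $leaked bytes")
    result
  }
}
```

      With `limit = 1` (mimicking `take(1)`), the toy buffer is never released and `runTask` throws when `exceptionOnLeak` is true. With `limit = Int.MaxValue`, the iterator is drained and nothing is reported.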
      
      

      The code example does not produce any error when the `distinct` function is not called.
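      As a sanity check on what the query should return, the same pipeline on an ordinary Scala `List` evaluates to `false` both with and without `distinct`, because one element, "bar", survives the `flatMap`. This is a plain-collections analogy only and does not exercise Spark's execution path; `ExpectedResult` is a hypothetical name.

```scala
object ExpectedResult {
  final case class Foo(bar: Option[String])

  private val foos: List[Foo] = List(Foo(Some("bar")))

  // Same pipeline as the report, evaluated eagerly on a List.
  val withDistinct: Boolean = foos.flatMap(_.bar).distinct.isEmpty

  // Control case from the report: the same pipeline without `distinct`.
  val withoutDistinct: Boolean = foos.flatMap(_.bar).isEmpty
}
```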


            People

            • Assignee: Dongjoon Hyun
              Reporter: Ayoub Benali
            • Votes: 0
              Watchers: 6
