SPARK-17381: Memory leak org.apache.spark.sql.execution.ui.SQLTaskMetrics


Details

    • Type: Improvement
    • Status: Resolved
    • Priority: Major
    • Resolution: Not A Problem
    • Affects Version/s: 2.0.0
    • Fix Version/s: None
    • Component/s: SQL
    • Labels: None
    • Environment: EMR 5.0.0 (submitted as yarn-client)
      Java Version: 1.8.0_101 (Oracle Corporation)
      Scala Version: 2.11.8

      The problem also happens when I run locally with similar Java/Scala versions. OS: Ubuntu 16.04.

    Description

      I am running a Spark Streaming application that reads from a Kinesis stream. After some hours of running it goes out of memory. From a driver heap dump I found two problems:
      1) a huge number of org.apache.spark.sql.execution.ui.SQLTaskMetrics instances (it seems this was a problem before:
      https://issues.apache.org/jira/browse/SPARK-11192);

      To replicate the org.apache.spark.sql.execution.ui.SQLTaskMetrics leak I just needed to run the code below:

          // assumes a StreamingContext `ssc`, Kinesis DStreams `kinesisStreams`,
          // `import spark.implicits._` for toDF, and
          // `import org.apache.spark.sql.functions.sum`
          val dstream = ssc.union(kinesisStreams)
          dstream.foreachRDD { (streamInfo: RDD[Array[Byte]]) =>
            val toyDF = streamInfo
              .map(_ => (1, "data", "more data "))
              .toDF("Num", "Data", "MoreData")
            toyDF.agg(sum("Num")).first().get(0)
          }
      
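      For a self-contained version of this reproduction that does not need Kinesis, a queueStream can stand in for the Kinesis source. The per-batch toDF/agg pattern is identical, so the SQLTaskMetrics build-up on the driver should be observable in the same way, assuming the leak is driven by the per-batch SQL execution rather than by the Kinesis receiver (this stand-in is only a sketch, not my original setup):

          import scala.collection.mutable

          import org.apache.spark.rdd.RDD
          import org.apache.spark.sql.SparkSession
          import org.apache.spark.sql.functions.sum
          import org.apache.spark.streaming.{Seconds, StreamingContext}

          // Hypothetical stand-alone reproduction: a queueStream of byte arrays
          // replaces the Kinesis source; the rest mirrors the snippet above.
          object SqlTaskMetricsRepro {
            def main(args: Array[String]): Unit = {
              val spark = SparkSession.builder()
                .master("local[2]")
                .appName("sql-task-metrics-repro")
                .getOrCreate()
              import spark.implicits._

              val ssc = new StreamingContext(spark.sparkContext, Seconds(1))
              val queue = new mutable.Queue[RDD[Array[Byte]]]()
              val dstream = ssc.queueStream(queue)

              dstream.foreachRDD { (streamInfo: RDD[Array[Byte]]) =>
                val toyDF = streamInfo
                  .map(_ => (1, "data", "more data"))
                  .toDF("Num", "Data", "MoreData")
                // each micro-batch runs one SQL execution whose task metrics are
                // retained by the driver-side SQL UI listener
                toyDF.agg(sum("Num")).first().get(0)
              }

              ssc.start()
              // keep feeding toy batches so executions accumulate over time
              while (true) {
                queue.synchronized {
                  queue += spark.sparkContext.parallelize(Seq.fill(100)(Array.empty[Byte]))
                }
                Thread.sleep(1000)
              }
            }
          }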

      2) a huge amount of Array[Byte] data (9 GB+).

      After some analysis, I noticed that most of the Array[Byte] instances were being referenced by objects that were in turn referenced by SQLTaskMetrics. The strangest thing is that those Array[Byte] were basically text that was loaded in the executors, so they should never be in the driver at all!

      I still could not replicate the 2nd problem with simple code (the original application is complex, with data coming from S3, DynamoDB and other databases). However, when I debug the application I can see that in Executor.scala, during reportHeartBeat(), the data that should not be sent to the driver is being added to "accumUpdates", which, as I understand it, is sent to the driver for reporting.

      To be more precise, one of the taskRunners in the loop "for (taskRunner <- runningTasks.values().asScala)" contains a GenericInternalRow with a lot of data that should not go to the driver. In my case the path is: taskRunner.task.metrics.externalAccums[2]._list[0]. This data is similar (if not the same) to the data I see in the driver heap dump.
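
      For illustration only (this is not code from my application), here is a minimal sketch of the mechanism I believe is at play: anything the executors put into a collection-style accumulator is materialized on the driver, and CollectionAccumulator keeps its elements in an internal java.util.List, which is the "_list" field visible in the path above. The names and sizes below are made up:

          import org.apache.spark.sql.SparkSession

          // Hypothetical demonstration: accumulator contents added on the
          // executors end up held in the driver's heap.
          object AccumulatorShippingDemo {
            def main(args: Array[String]): Unit = {
              val spark = SparkSession.builder()
                .master("local[2]")
                .appName("accumulator-shipping-demo")
                .getOrCreate()
              val sc = spark.sparkContext

              // CollectionAccumulator stores every added element in a java.util.List
              val payloads = sc.collectionAccumulator[Array[Byte]]("payloads")

              // each task adds 1 MB on the executor side
              sc.parallelize(1 to 8, 4).foreach { _ =>
                payloads.add(new Array[Byte](1024 * 1024))
              }

              // the accumulated arrays are now referenced from the driver's heap
              println(s"arrays held on the driver: ${payloads.value.size()}")

              spark.stop()
            }
          }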

      I guess that if the org.apache.spark.sql.execution.ui.SQLTaskMetrics leak is fixed I would have less of this undesirable data in the driver and could run my streaming app for a long period of time, but I think there will always be some performance loss.

      Attachments

        Activity

          People

            Assignee: Unassigned
            Reporter: Joao Duarte (joaomaiaduarte)
            Votes: 0
            Watchers: 4
