[SPARK-21721] Memory leak in org.apache.spark.sql.hive.execution.InsertIntoHiveTable


Details

    • Type: Bug
    • Status: Resolved
    • Priority: Critical
    • Resolution: Fixed
    • Affects Version/s: 2.0.2, 2.1.1, 2.2.0
    • Fix Version/s: 2.1.2, 2.2.1, 2.3.0
    • Component/s: SQL
    • Labels: None

    Description

      The leak comes from org.apache.spark.sql.hive.execution.InsertIntoHiveTable. At line 118 it adds the staging path to the FileSystem deleteOnExit cache, and at line 385 it removes the path from disk, but it never removes the path from the FileSystem cache. If a streaming application keeps persisting data to a partitioned Hive table, memory keeps growing until the JVM terminates.
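
      The pattern can be shown with the Hadoop FileSystem API alone. The sketch below is a simplified illustration of the two calls involved; the object and path names are made up and this is not the actual Spark code:

      import org.apache.hadoop.conf.Configuration
      import org.apache.hadoop.fs.{FileSystem, Path}

      object DeleteOnExitSketch {
        def main(args: Array[String]): Unit = {
          val fs = FileSystem.get(new Configuration())
          // Illustrative path, not the real staging directory name
          val stagingPath = new Path("/tmp/.hive-staging_sketch")

          fs.mkdirs(stagingPath)
          // Equivalent of line 118: register the path for deletion on JVM exit.
          // The path is added to the FileSystem's internal deleteOnExit set.
          fs.deleteOnExit(stagingPath)

          // Equivalent of line 385: the directory is removed from disk,
          // but its entry is never removed from the deleteOnExit set,
          // so one entry accumulates per insert until the JVM terminates.
          fs.delete(stagingPath, true)
        }
      }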

      Below is a simple program that reproduces it.

      package test
      
      import org.apache.spark.sql.SparkSession
      import org.apache.hadoop.fs.Path
      import org.apache.hadoop.fs.FileSystem
      import org.apache.spark.sql.SaveMode
      import java.lang.reflect.Field
      
      
      
      case class PathLeakTest(id: Int, gp: String)
      
      object StagePathLeak {
      
        def main(args: Array[String]): Unit = {
      
          val spark = SparkSession.builder().master("local[4]").appName("StagePathLeak").enableHiveSupport().getOrCreate()
          spark.conf.set("hive.exec.dynamic.partition.mode", "nonstrict")
          //create a partitioned table
          spark.sql("drop table if exists path_leak");
          spark.sql("create table if not exists path_leak(id int)" +
              " partitioned by (gp String)"+
            " row format serde 'org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe'"+
            " stored as"+
              " inputformat 'org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat'"+
              " outputformat 'org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat'")
      
          val seq = new scala.collection.mutable.ArrayBuffer[PathLeakTest]()
          // 2 partitions
          for (x <- 1 to 2) {
            seq += PathLeakTest(x, "g" + x)
          }
          val rdd = spark.sparkContext.makeRDD[PathLeakTest](seq)
      
          //insert into the Hive table 50 times
          for (j <- 1 to 50) {
            val df = spark.createDataFrame(rdd)
            //#1 InsertIntoHiveTable line 118: adds the staging path to the FileSystem deleteOnExit cache
            //#2 InsertIntoHiveTable line 385: deletes the path from disk but not from the FileSystem cache, which causes the leak
            df.write.mode(SaveMode.Overwrite).insertInto("path_leak")
          }
          
          val fs = FileSystem.get(spark.sparkContext.hadoopConfiguration)
          val deleteOnExit = getDeleteOnExit(fs.getClass)
          deleteOnExit.setAccessible(true)
          val caches = deleteOnExit.get(fs).asInstanceOf[java.util.TreeSet[Path]]
          //check FileSystem deleteOnExit cache size
          println(caches.size())
          val it = caches.iterator()
          //all staging paths are still cached even though they have already been deleted from disk
          while (it.hasNext()) {
            println(it.next())
          }
        }
        
        def getDeleteOnExit(cls: Class[_]): Field = {
          try {
            cls.getDeclaredField("deleteOnExit")
          } catch {
            case _: NoSuchFieldException => getDeleteOnExit(cls.getSuperclass)
          }
        }
      
      }
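
      For reference, cleanup that both deletes the staging directory and cancels its deleteOnExit registration would keep the set from growing. A minimal sketch of such a helper, assuming only the Hadoop FileSystem API (the object and method names are hypothetical, not necessarily the shipped fix):

      import org.apache.hadoop.fs.{FileSystem, Path}

      object StagingCleanup {
        // Hypothetical helper: delete a staging directory and also drop it
        // from the FileSystem deleteOnExit set so entries cannot accumulate.
        def deleteStagingDir(fs: FileSystem, stagingDir: Path): Unit = {
          try {
            if (fs.delete(stagingDir, true)) {
              // Without this call the path stays cached until the JVM exits.
              fs.cancelDeleteOnExit(stagingDir)
            }
          } catch {
            case e: java.io.IOException =>
              // Deleting a staging dir is best-effort cleanup; report and continue.
              System.err.println(s"Unable to delete staging dir: $stagingDir, " + e.getMessage)
          }
        }
      }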
      


          People

            Assignee: viirya (L. C. Hsieh)
            Reporter: yzheng616