[SPARK-44768] Improve WSCG handling of the row buffer by accounting for executor memory; exploding nested arrays can easily lead to out-of-memory errors


Details

    • Type: Bug
    • Status: Open
    • Priority: Major
    • Resolution: Unresolved
    • Affects Version/s: 3.3.2, 3.4.0, 3.4.1
    • Fix Version/s: None
    • Component/s: Optimizer
    • Labels: None

    Description

      The code sample below showcases the whole-stage code generation (WSCG) produced when exploding nested arrays. The data sample in the DataFrame is quite small, so it will not trigger an out-of-memory error.

      However, if the arrays are larger and the rows are wide, you will reliably end up with an OOM error.

       

      Consider a scenario where you flatten a nested array. The snippet below is written for spark-shell; the imports make it self-contained:

      // imports needed for toDF, the 'symbol column syntax, col and explode
      import org.apache.spark.sql.functions.{col, explode}
      import spark.implicits._

      // case class describing a part
      case class partClass(PARTNAME: String, PartNumber: String, PartPrice: Double)

      // case class describing the nested-array schema
      case class array_array_class(
        col_int        : Int,
        arr_arr_string : Seq[Seq[String]],
        arr_arr_bigint : Seq[Seq[Long]],
        col_string     : String,
        parts_s        : Seq[Seq[partClass]]
      )

      // create a DataFrame with two rows of nested-array data
      val df_array_array = sc.parallelize(
        Seq(
          (1,
           Seq(Seq("a", "b", "c", "d"), Seq("aa", "bb", "cc", "dd")),
           Seq(Seq(1000, 20000), Seq(30000, -10000)),
           "ItemPart1",
           Seq(Seq(partClass("PNAME1", "P1", 20.75), partClass("PNAME1_1", "P1_1", 30.75)))),
          (2,
           Seq(Seq("ab", "bc", "cd", "de"), Seq("aab", "bbc", "ccd", "dde"), Seq("aaaaaabbbbb")),
           Seq(Seq(-1000, -20000, -1, -2), Seq(0, 30000, -10000)),
           "ItemPart2",
           Seq(Seq(partClass("PNAME2", "P2", 170.75), partClass("PNAME2_1", "P2_1", 33.75), partClass("PNAME2_2", "P2_2", 73.75))))
        )
      ).toDF("c1", "c2", "c3", "c4", "c5")

      // explode the nested array twice: the outer explode yields a column
      // named `col`, which the inner explode then flattens
      val result = df_array_array.select(col("c1"), explode(col("c2"))).select('c1, explode('col))

      result.explain

       

      The physical plan for this query is shown below.


      == Physical Plan ==
      *(1) Generate explode(col#27), [c1#17], false, [col#30]
      +- *(1) Filter ((size(col#27, true) > 0) AND isnotnull(col#27))
         +- *(1) Generate explode(c2#18), [c1#17], false, [col#27]
            +- *(1) Project [_1#6 AS c1#17, _2#7 AS c2#18]
               +- *(1) Filter ((size(_2#7, true) > 0) AND isnotnull(_2#7))
                  +- *(1) SerializeFromObject [knownnotnull(assertnotnull(input[0, scala.Tuple5, true]))._1 AS _1#6, mapobjects(lambdavariable(MapObject, ObjectType(class java.lang.Object), true, -1), mapobjects(lambdavariable(MapObject, ObjectType(class java.lang.Object), true, -2), staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, validateexternaltype(lambdavariable(MapObject, ObjectType(class java.lang.Object), true, -2), StringType, ObjectType(class java.lang.String)), true, false, true), validateexternaltype(lambdavariable(MapObject, ObjectType(class java.lang.Object), true, -1), ArrayType(StringType,true), ObjectType(interface scala.collection.Seq)), None), knownnotnull(assertnotnull(input[0, scala.Tuple5, true]))._2, None) AS _2#7, mapobjects(lambdavariable(MapObject, ObjectType(class java.lang.Object), true, -3), mapobjects(lambdavariable(MapObject, ObjectType(class java.lang.Object), true, -4), assertnotnull(validateexternaltype(lambdavariable(MapObject, ObjectType(class java.lang.Object), true, -4), IntegerType, IntegerType)), validateexternaltype(lambdavariable(MapObject, ObjectType(class java.lang.Object), true, -3), ArrayType(IntegerType,false), ObjectType(interface scala.collection.Seq)), None), knownnotnull(assertnotnull(input[0, scala.Tuple5, true]))._3, None) AS _3#8, staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, knownnotnull(assertnotnull(input[0, scala.Tuple5, true]))._4, true, false, true) AS _4#9, mapobjects(lambdavariable(MapObject, ObjectType(class java.lang.Object), true, -5), mapobjects(lambdavariable(MapObject, ObjectType(class java.lang.Object), true, -6), if (isnull(validateexternaltype(lambdavariable(MapObject, ObjectType(class java.lang.Object), true, -6), StructField(PARTNAME,StringType,true), StructField(PartNumber,StringType,true), StructField(PartPrice,DoubleType,false), ObjectType(class $line14.$read$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$partClass)))) null else named_struct(PARTNAME, staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, knownnotnull(validateexternaltype(lambdavariable(MapObject, ObjectType(class java.lang.Object), true, -6), StructField(PARTNAME,StringType,true), StructField(PartNumber,StringType,true), StructField(PartPrice,DoubleType,false), ObjectType(class $line14.$read$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$partClass))).PARTNAME, true, false, true), PartNumber, staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, knownnotnull(validateexternaltype(lambdavariable(MapObject, ObjectType(class java.lang.Object), true, -6), StructField(PARTNAME,StringType,true), StructField(PartNumber,StringType,true), StructField(PartPrice,DoubleType,false), ObjectType(class $line14.$read$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$partClass))).PartNumber, true, false, true), PartPrice, knownnotnull(validateexternaltype(lambdavariable(MapObject, ObjectType(class java.lang.Object), true, -6), StructField(PARTNAME,StringType,true), StructField(PartNumber,StringType,true), StructField(PartPrice,DoubleType,false), ObjectType(class $line14.$read$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$partClass))).PartPrice), validateexternaltype(lambdavariable(MapObject, ObjectType(class java.lang.Object), true, -5), ArrayType(StructType(StructField(PARTNAME,StringType,true),StructField(PartNumber,StringType,true),StructField(PartPrice,DoubleType,false)),true), ObjectType(interface scala.collection.Seq)), None), knownnotnull(assertnotnull(input[0, scala.Tuple5, true]))._5, 
None) AS _5#10]
                  +- Scan[obj#5]
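
      The generated whole-stage code itself is attached as spark-jira_wscg_code.txt; it can be reproduced from the shell with Spark's codegen debug helper:

      // print the Java code generated for each WholeStageCodegen subtree
      import org.apache.spark.sql.execution.debug._
      result.debugCodegen()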

       

       

      Because the explode function can produce many output rows from a single input row, the generated code should account for the memory available on the executor when appending rows to the row buffer.
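
      To see how quickly this compounds, consider a back-of-the-envelope calculation; the sizes below are hypothetical, chosen only to illustrate the blow-up:

      // hypothetical sizes, for illustration only
      val outerLen = 1000   // elements in the outer array (c2)
      val innerLen = 1000   // elements in each inner array
      val rowBytes = 1024L  // assumed size of each generated row

      val produced = outerLen.toLong * innerLen  // rows produced from ONE input row
      val bytes    = produced * rowBytes         // memory needed if all are buffered
      println(f"$produced%,d rows, ~${bytes / (1L << 20)}%,d MiB buffered from a single input row")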

       

      This is even more important when exploding nested arrays, since each level of nesting multiplies the number of rows produced.
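
      For intuition, here is a simplified model of the loop structure that whole-stage codegen produces for the two chained explodes. This is an illustrative sketch only, not the actual generated code (which operates on UnsafeRows), but it shows that every produced row is appended without any check on memory pressure:

      // illustrative model only -- not the actual generated code
      import scala.collection.mutable.ArrayBuffer

      case class InputRow(c1: Int, c2: Seq[Seq[String]])

      def generateAll(input: Iterator[InputRow]): Seq[(Int, String)] = {
        val buffer = ArrayBuffer.empty[(Int, String)]
        for (row <- input)        // one input row
          for (outer <- row.c2)   // first Generate: explode(c2)
            for (inner <- outer)  // second Generate: explode(col)
              buffer += ((row.c1, inner))  // appended with no memory check
        buffer.toSeq
      }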

      Attachments

        1. image-2023-08-10-20-32-55-684.png
          42 kB
          Franck Tago
        2. spark-jira_wscg_code.txt
          23 kB
          Franck Tago


          People

            Assignee: Unassigned
            Reporter: Franck Tago (tafranky@gmail.com)
            Votes: 0
            Watchers: 2

            Dates

              Created:
              Updated: