Details
-
Bug
-
Status: Open
-
Major
-
Resolution: Unresolved
-
3.3.2, 3.4.0, 3.4.1
-
None
-
None
Description
The code sample below is to showcase the wholestagecodegen generated when exploding nested arrays. The data sample in the dataframe is quite small so it will not trigger the Out of Memory error .
However if the array is larger and the row size is large , you will definitely end up with an OOM error .
consider a scenario where you flatten a nested array
// e.g you can use the following steps to create the dataframe
//create a partClass case class
case class partClass (PARTNAME: String , PartNumber: String , PartPrice : Double )
//create a nested array array class
case class array_array_class (
col_int: Int,
arr_arr_string : Seq[Seq[String]],
arr_arr_bigint : Seq[Seq[Long]],
col_string : String,
parts_s : Seq[Seq[partClass]]
)
//create a dataframe
var df_array_array = sc.parallelize(
Seq(
(1,Seq(Seq("a","b" ,"c" ,"d") ,Seq("aa","bb" ,"cc","dd")) , Seq(Seq(1000,20000), Seq(30000,-10000)),"ItemPart1",
Seq(Seq(partClass("PNAME1","P1",20.75),partClass("PNAME1_1","P1_1",30.75)))
) ,
(2,Seq(Seq("ab","bc" ,"cd" ,"de") ,Seq("aab","bbc" ,"ccd","dde"),Seq("aaaaaabbbbb")) , Seq(Seq(-1000,-20000,-1,-2), Seq(0,30000,-10000)),"ItemPart2",
Seq(Seq(partClass("PNAME2","P2",170.75),partClass("PNAME2_1","P2_1",33.75),partClass("PNAME2_2","P2_2",73.75)))
)
)
).toDF("c1" ,"c2" ,"c3" ,"c4" ,"c5")
//explode a nested array
var result = df_array_array.select( col("c1"), explode(col("c2"))).select('c1 , explode('col))
result.explain
The physical for this operator is seen below.
-------------------------------------
Physical plan
== Physical Plan ==
*(1) Generate explode(col#27), c1#17, false, col#30
+- *(1) Filter ((size(col#27, true) > 0) AND isnotnull(col#27))
+- *(1) Generate explode(c2#18), c1#17, false, col#27
+- *(1) Project _1#6 AS c1#17, _2#7 AS c2#18
+- *(1) Filter ((size(_2#7, true) > 0) AND isnotnull(_2#7))
+- *(1) SerializeFromObject [knownnotnull(assertnotnull(input[0, scala.Tuple5, true]))._1 AS _1#6, mapobjects(lambdavariable(MapObject, ObjectType(class java.lang.Object), true, -1), mapobjects(lambdavariable(MapObject, ObjectType(class java.lang.Object), true, -2), staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, validateexternaltype(lambdavariable(MapObject, ObjectType(class java.lang.Object), true, -2), StringType, ObjectType(class java.lang.String)), true, false, true), validateexternaltype(lambdavariable(MapObject, ObjectType(class java.lang.Object), true, -1), ArrayType(StringType,true), ObjectType(interface scala.collection.Seq)), None), knownnotnull(assertnotnull(input[0, scala.Tuple5, true]))._2, None) AS _2#7, mapobjects(lambdavariable(MapObject, ObjectType(class java.lang.Object), true, -3), mapobjects(lambdavariable(MapObject, ObjectType(class java.lang.Object), true, -4), assertnotnull(validateexternaltype(lambdavariable(MapObject, ObjectType(class java.lang.Object), true, -4), IntegerType, IntegerType)), validateexternaltype(lambdavariable(MapObject, ObjectType(class java.lang.Object), true, -3), ArrayType(IntegerType,false), ObjectType(interface scala.collection.Seq)), None), knownnotnull(assertnotnull(input[0, scala.Tuple5, true]))._3, None) AS _3#8, staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, knownnotnull(assertnotnull(input[0, scala.Tuple5, true]))._4, true, false, true) AS _4#9, mapobjects(lambdavariable(MapObject, ObjectType(class java.lang.Object), true, -5), mapobjects(lambdavariable(MapObject, ObjectType(class java.lang.Object), true, -6), if (isnull(validateexternaltype(lambdavariable(MapObject, ObjectType(class java.lang.Object), true, -6), StructField(PARTNAME,StringType,true), StructField(PartNumber,StringType,true), StructField(PartPrice,DoubleType,false), ObjectType(class $line14.$read$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$partClass)))) null else named_struct(PARTNAME, staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, knownnotnull(validateexternaltype(lambdavariable(MapObject, ObjectType(class java.lang.Object), true, -6), StructField(PARTNAME,StringType,true), StructField(PartNumber,StringType,true), StructField(PartPrice,DoubleType,false), ObjectType(class $line14.$read$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$partClass))).PARTNAME, true, false, true), PartNumber, staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, knownnotnull(validateexternaltype(lambdavariable(MapObject, ObjectType(class java.lang.Object), true, -6), StructField(PARTNAME,StringType,true), StructField(PartNumber,StringType,true), StructField(PartPrice,DoubleType,false), ObjectType(class $line14.$read$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$partClass))).PartNumber, true, false, true), PartPrice, knownnotnull(validateexternaltype(lambdavariable(MapObject, ObjectType(class java.lang.Object), true, -6), StructField(PARTNAME,StringType,true), StructField(PartNumber,StringType,true), StructField(PartPrice,DoubleType,false), ObjectType(class $line14.$read$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$partClass))).PartPrice), validateexternaltype(lambdavariable(MapObject, ObjectType(class java.lang.Object), true, -5), ArrayType(StructType(StructField(PARTNAME,StringType,true),StructField(PartNumber,StringType,true),StructField(PartPrice,DoubleType,false)),true), ObjectType(interface scala.collection.Seq)), None), knownnotnull(assertnotnull(input[0, scala.Tuple5, true]))._5, None) AS _5#10]
+- Scanobj#5
Because the explode function can create multiple rows from a single row , we should account for the memory available when adding rows to the buffer .
This is even more important when we are exploding nested arrays .