Details
Description
Spark applications freeze at the execution plan optimization stage (Catalyst) when a logical execution plan contains many projections that operate on nested struct fields.
Two Spark applications are attached: one demonstrates the issue, the other demonstrates a workaround. An archive is also attached in which these jobs are packaged as a Maven project.
To reproduce the issue, the attached Spark app does the following:
- A small dataframe is created from a JSON example.
- A nested withColumn map transformation is used to apply a transformation on a struct field and create a new struct field. The code for this transformation is also attached.
- Once more than 11 such transformations are applied, the Catalyst optimizer freezes while optimizing the execution plan.
package za.co.absa.spark.app

import org.apache.spark.sql._
import org.apache.spark.sql.functions._

object SparkApp1Issue {

  /** Sample data for a dataframe with nested structs: each element is one JSON document
    * with a `strings` struct and a `numerics` struct.
    */
  val sample: List[String] =
    """{
      |  "strings": {
      |    "simple": "Culpa repellat nesciunt accusantium",
      |    "all_random": "DESebo8d%fL9sX@AzVin",
      |    "whitespaces": " q bb l "
      |  },
      |  "numerics": {
      |    "small_positive": 722,
      |    "small_negative": -660,
      |    "big_positive": 669223368251997,
      |    "big_negative": -161176863305841,
      |    "zero": 0
      |  }
      |}""".stripMargin ::
    """{
      |  "strings": {
      |    "simple": "Accusamus quia vel deleniti",
      |    "all_random": "rY&n9UnVcD*KS]jPBpa[",
      |    "whitespaces": " t e t rp z p"
      |  },
      |  "numerics": {
      |    "small_positive": 268,
      |    "small_negative": -134,
      |    "big_positive": 768990048149640,
      |    "big_negative": -684718954884696,
      |    "zero": 0
      |  }
      |}""".stripMargin ::
    """{
      |  "strings": {
      |    "simple": "Quia numquam deserunt delectus rem est",
      |    "all_random": "GmRdQlE4Avn1hSlVPAH",
      |    "whitespaces": " c sa yv drf "
      |  },
      |  "numerics": {
      |    "small_positive": 909,
      |    "small_negative": -363,
      |    "big_positive": 592517494751902,
      |    "big_negative": -703224505589638,
      |    "zero": 0
      |  }
      |}""".stripMargin :: Nil

  /** Demonstrates execution-plan optimization freezing when an execution plan contains many
    * projections involving nested structs.
    *
    * The example works as follows:
    *  - A small dataframe is created from the JSON [[sample]] above.
    *  - A nested withColumn map transformation is used to apply a transformation on a struct
    *    field and create a new struct field.
    *  - Once more than 11 such transformations are applied, the Catalyst optimizer freezes
    *    while optimizing the execution plan.
    *
    * The job prints the schema, the extended (logical and physical) plans, and the rows of the
    * resulting dataframe, then stops the Spark session.
    *
    * @param args command-line arguments (unused)
    */
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .appName("Nested Projections Issue")
      .master("local[4]")
      .getOrCreate()

    try {
      import spark.implicits._
      // Provides `nestedWithColumnMap` (defined in the project's NestedOps helper, not shown here).
      import za.co.absa.spark.utils.NestedOps.DataSetWrapper

      // `toDS()` is declared with parentheses; calling it without them relies on deprecated
      // auto-application.
      val df = spark.read.json(sample.toDS())

      // Apply several uppercase and negation transformations
      val dfOutput = df
        .nestedWithColumnMap("strings.simple", "strings.uppercase1", c => upper(c))
        .nestedWithColumnMap("strings.all_random", "strings.uppercase2", c => upper(c))
        .nestedWithColumnMap("strings.whitespaces", "strings.uppercase3", c => upper(c))
        .nestedWithColumnMap("numerics.small_positive", "numerics.num1", c => -c)
        .nestedWithColumnMap("numerics.small_negative", "numerics.num2", c => -c)
        .nestedWithColumnMap("numerics.big_positive", "numerics.num3", c => -c)
        .nestedWithColumnMap("numerics.big_negative", "numerics.num4", c => -c)
        .nestedWithColumnMap("numerics.small_positive", "numerics.num5", c => -c)
        .nestedWithColumnMap("numerics.small_negative", "numerics.num6", c => -c)
        .nestedWithColumnMap("numerics.big_positive", "numerics.num7", c => -c)
        .nestedWithColumnMap("numerics.big_negative", "numerics.num8", c => -c)
        // Uncommenting the line below will cause Catalyst to freeze completely
        //.nestedWithColumnMap("numerics.big_negative", "numerics.num9", c => -c)

      dfOutput.printSchema()
      dfOutput.explain(true)
      dfOutput.show()
    } finally {
      // Release the local Spark context even if planning or execution throws.
      spark.stop()
    }
  }
}