Spark / SPARK-28016

Spark hangs when an execution plan has many projections on nested structs


Details

    • Type: Bug
    • Status: Resolved
    • Priority: Major
    • Resolution: Cannot Reproduce
    • Affects Version/s: 2.4.3
    • Fix Version/s: None
    • Component/s: Optimizer, SQL
    • Labels: None
    • Environment: tried in

      • Spark 2.2.1 and Spark 2.4.3 in local mode on Linux, macOS and Windows
      • Spark 2.4.3 on YARN on a Linux cluster

    Description

      Spark applications freeze during the execution plan optimization stage (Catalyst) when the logical execution plan contains many projections that operate on nested struct fields.

      Two Spark applications are attached: one demonstrates the issue, the other demonstrates a workaround. An archive is also attached in which both jobs are packaged as a Maven project.
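
The attached SparkApp2Workaround.scala is not reproduced in this issue. As a separate, generic mitigation that is not necessarily what that file does, the logical plan can be truncated periodically with `Dataset.localCheckpoint()` (available since Spark 2.3.0), so that Catalyst never sees more than a few consecutive nested projections. The sketch below assumes a hypothetical helper `addNestedField` that rebuilds the parent struct with one extra field, and a checkpoint interval of 5 chosen only for illustration; it has not been verified against the attached jobs.

```scala
import org.apache.spark.sql.{Column, DataFrame, SparkSession}
import org.apache.spark.sql.functions._

object LocalCheckpointWorkaroundSketch {

  // Hypothetical helper (not from the attachments): rebuilds the top-level struct `parent`
  // with all of its existing fields plus `newField` = f(parent.srcField).
  def addNestedField(df: DataFrame, parent: String, srcField: String, newField: String)
                    (f: Column => Column): DataFrame = {
    val existing = df.select(s"$parent.*").columns.map(name => col(s"$parent.$name").as(name))
    df.withColumn(parent, struct(existing :+ f(col(s"$parent.$srcField")).as(newField): _*))
  }

  // Assumed interval: truncate the plan after every 5 nested projections.
  val CheckpointInterval = 5

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .appName("Nested projections with local checkpoints (sketch)")
      .master("local[4]")
      .getOrCreate()
    import spark.implicits._

    val df = spark.read.json(Seq("""{"numerics": {"big_negative": -161176863305841}}""").toDS())

    // Apply 20 negations (more than the reported threshold of 11), checkpointing periodically.
    val dfOutput = (1 to 20).foldLeft(df) { (acc, i) =>
      val next = addNestedField(acc, "numerics", "big_negative", s"num$i")(c => -c)
      // localCheckpoint() is eager by default: it materializes the data and replaces the
      // accumulated logical plan with a scan of the checkpointed RDD.
      if (i % CheckpointInterval == 0) next.localCheckpoint() else next
    }

    dfOutput.printSchema()
    dfOutput.show(false)
    spark.stop()
  }
}
```

Local checkpoints are stored on the executors and are not fault tolerant, so `checkpoint()` with a configured checkpoint directory may be preferable on a cluster.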

      To reproduce the issue, the attached Spark application does the following:

      • A small DataFrame is created from a JSON sample.
      • A nested withColumn map transformation (nestedWithColumnMap) is used to apply a transformation to a struct field and store the result in a new struct field. The code for this transformation is also attached.
      • Once more than 11 such transformations are applied, the Catalyst optimizer freezes while optimizing the execution plan.
      package za.co.absa.spark.app
      
      import org.apache.spark.sql._
      import org.apache.spark.sql.functions._
      
      object SparkApp1Issue {
      
        // Sample data for a DataFrame with nested structs
        val sample =
          """
            |{
            |  "strings": {
            |    "simple": "Culpa repellat nesciunt accusantium",
            |    "all_random": "DESebo8d%fL9sX@AzVin",
            |    "whitespaces": "    q    bb    l    "
            |  },
            |  "numerics": {
            |    "small_positive": 722,
            |    "small_negative": -660,
            |    "big_positive": 669223368251997,
            |    "big_negative": -161176863305841,
            |    "zero": 0
            |  }
            |}
          """.stripMargin ::
            """{
              |  "strings": {
              |    "simple": "Accusamus quia vel deleniti",
              |    "all_random": "rY&n9UnVcD*KS]jPBpa[",
              |    "whitespaces": "  t e   t   rp   z p"
              |  },
              |  "numerics": {
              |    "small_positive": 268,
              |    "small_negative": -134,
              |    "big_positive": 768990048149640,
              |    "big_negative": -684718954884696,
              |    "zero": 0
              |  }
              |}
              |""".stripMargin ::
            """{
              |  "strings": {
              |    "simple": "Quia numquam deserunt delectus rem est",
              |    "all_random": "GmRdQlE4Avn1hSlVPAH",
              |    "whitespaces": "   c   sa    yv   drf "
              |  },
              |  "numerics": {
              |    "small_positive": 909,
              |    "small_negative": -363,
              |    "big_positive": 592517494751902,
              |    "big_negative": -703224505589638,
              |    "zero": 0
              |  }
              |}
              |""".stripMargin :: Nil
      
        /**
          * This Spark Job demonstrates an issue of execution plan freezing when there are a lot of projections
          * involving nested structs in an execution plan.
          *
          * The example works as follows:
          * - A small DataFrame is created from the JSON sample above.
          * - A nested withColumn map transformation is used to apply a transformation to a struct field and create
          *   a new struct field.
          * - Once more than 11 such transformations are applied, the Catalyst optimizer freezes while optimizing
          *   the execution plan.
          */
        def main(args: Array[String]): Unit = {
      
          val sparkBuilder = SparkSession.builder().appName("Nested Projections Issue")
          val spark = sparkBuilder
            .master("local[4]")
            .getOrCreate()
      
          import spark.implicits._
          import za.co.absa.spark.utils.NestedOps.DataSetWrapper // provides nestedWithColumnMap (attached NestedOps.scala)
      
          val df = spark.read.json(sample.toDS)
      
          // Apply several uppercase and negation transformations
          val dfOutput = df
            .nestedWithColumnMap("strings.simple", "strings.uppercase1", c => upper(c))
            .nestedWithColumnMap("strings.all_random", "strings.uppercase2", c => upper(c))
            .nestedWithColumnMap("strings.whitespaces", "strings.uppercase3", c => upper(c))
            .nestedWithColumnMap("numerics.small_positive", "numerics.num1", c => -c)
            .nestedWithColumnMap("numerics.small_negative", "numerics.num2", c => -c)
            .nestedWithColumnMap("numerics.big_positive", "numerics.num3", c => -c)
            .nestedWithColumnMap("numerics.big_negative", "numerics.num4", c => -c)
            .nestedWithColumnMap("numerics.small_positive", "numerics.num5", c => -c)
            .nestedWithColumnMap("numerics.small_negative", "numerics.num6", c => -c)
            .nestedWithColumnMap("numerics.big_positive", "numerics.num7", c => -c)
            .nestedWithColumnMap("numerics.big_negative", "numerics.num8", c => -c)
            // Uncommenting the line below will cause Catalyst to freeze completely
            //.nestedWithColumnMap("numerics.big_negative", "numerics.num9", c => -c)
      
          dfOutput.printSchema()
          dfOutput.explain(true)
          dfOutput.show()
        }
      
      }
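
NestedOps.scala itself is not reproduced in this issue. To make the listing above easier to follow without the attachment, here is a minimal sketch of what a nestedWithColumnMap-style helper could look like, assuming it works by rebuilding the parent struct with `struct(...)` inside `withColumn`. `NestedOpsSketch`, `nestedWithColumnMapSketch` and `OptimizerTimingSketch` are hypothetical names, and the attached implementation may differ. Each call adds one more Project node whose struct expression refers to the output of the previous one. The timing object measures only the optimization step (`queryExecution.optimizedPlan`) as projections are added, which can help confirm on a given Spark version that the time is spent in Catalyst. The loop stops at 10 to stay below the reported freeze threshold.

```scala
import org.apache.spark.sql.{Column, DataFrame, SparkSession}
import org.apache.spark.sql.functions._

// Hypothetical stand-in for the attached NestedOps.scala; the real helper may be implemented differently.
object NestedOpsSketch {
  implicit class NestedDataFrameOps(val df: DataFrame) extends AnyVal {
    /** Adds `targetPath` (e.g. "strings.uppercase1") holding f(sourcePath) by rebuilding the
      * top-level struct that contains both paths. Handles a single level of nesting only. */
    def nestedWithColumnMapSketch(sourcePath: String, targetPath: String, f: Column => Column): DataFrame = {
      val parent = sourcePath.split("\\.", 2).head
      val Array(targetParent, newField) = targetPath.split("\\.", 2)
      require(parent == targetParent, "source and target must be in the same top-level struct")
      // Keep every existing field of the parent struct under its original name.
      val existing = df.select(s"$parent.*").columns.map(name => col(s"$parent.$name").as(name))
      df.withColumn(parent, struct(existing :+ f(col(sourcePath)).as(newField): _*))
    }
  }
}

object OptimizerTimingSketch {
  import NestedOpsSketch._

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().appName("Optimizer timing sketch").master("local[2]").getOrCreate()
    import spark.implicits._

    val base = spark.read.json(Seq("""{"numerics": {"small_positive": 722}}""").toDS())

    // Add one nested projection at a time and time the optimizer on the resulting plan.
    (1 to 10).foldLeft(base) { (acc, i) =>
      val next = acc.nestedWithColumnMapSketch("numerics.small_positive", s"numerics.num$i", c => -c)
      val start = System.nanoTime()
      // Forces optimization; analysis already ran when `next` was created.
      next.queryExecution.optimizedPlan
      val elapsedMs = (System.nanoTime() - start) / 1e6
      println(f"$i%2d nested projections: optimizedPlan took $elapsedMs%.1f ms")
      next
    }

    spark.stop()
  }
}
```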
      

      Attachments

        1. SparkApp1IssueSelfContained.scala
          5 kB
          Ruslan Yushchenko
        2. spark-app-nested.tgz
          6 kB
          Ruslan Yushchenko
        3. NestedOps.scala
          3 kB
          Ruslan Yushchenko
        4. SparkApp2Workaround.scala
          18 kB
          Ruslan Yushchenko
        5. SparkApp1Issue.scala
          9 kB
          Ruslan Yushchenko

        People

          Assignee: Unassigned
          Reporter: Ruslan Yushchenko (yruslan2)
          Votes: 3
          Watchers: 2
