SPARK-27707

Prune unnecessary nested fields from Generate

    Details

    • Type: Improvement
    • Status: Resolved
    • Priority: Major
    • Resolution: Fixed
    • Affects Version/s: 3.0.0
    • Fix Version/s: 3.0.0
    • Component/s: SQL
    • Labels:
      None

      Description

      This is a corner case of SPARK-21657: we want to explode an array nested inside a struct while also keeping some of the struct's other columns, and we again hit a huge performance issue.
      Reproduction code:

      // Assumes an active SparkSession `spark`; `import spark.implicits._` is
      // needed for toDF, and M (the array size, not defined in the report)
      // should be large enough to expose the blow-up, e.g. val M = 100000.
      val df = spark.sparkContext.parallelize(Seq(("1",
                Array.fill(M)({
                  val i = math.random
                  (i.toString, (i + 1).toString, (i + 2).toString, (i + 3).toString)
                })))).toDF("col", "arr")
                .selectExpr("col", "struct(col, arr) as st")
                .selectExpr("col", "st.col as col1", "explode(st.arr) as arr_col")
      
      df.write.mode("overwrite").save("/tmp/blah")
      

      A workaround is to project the needed field out of the struct before the explode:

      // Same setup assumptions as the first snippet; $"st.col" additionally
      // requires import spark.implicits._.
      val df = spark.sparkContext.parallelize(Seq(("1",
                Array.fill(M)({
                  val i = math.random
                  (i.toString, (i + 1).toString, (i + 2).toString, (i + 3).toString)
                })))).toDF("col", "arr")
                .selectExpr("col", "struct(col, arr) as st")
                .withColumn("col1", $"st.col")
                .selectExpr("col", "col1", "explode(st.arr) as arr_col")
      
      df.write.mode("overwrite").save("/tmp/blah")
      

      In this case, the optimization introduced in SPARK-21657:

          // prune unrequired references
          case p @ Project(_, g: Generate) if p.references != g.outputSet =>
            val requiredAttrs = p.references -- g.producedAttributes ++ g.generator.references
            val newChild = prunedChild(g.child, requiredAttrs)
            val unrequired = g.generator.references -- p.references
            val unrequiredIndices = newChild.output.zipWithIndex.filter(t => unrequired.contains(t._1))
              .map(_._2)
            p.copy(child = g.copy(child = newChild, unrequiredChildIndex = unrequiredIndices))
      

      doesn't work, because `p.references` contains the whole `st` struct as a reference, not just the projected field.
      This causes the entire struct, including the huge array field, to be duplicated once per exploded array element.
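      The failure mode can be modeled with plain Scala set operations, no Spark required. This is a sketch: the string names below are illustrative stand-ins for the rule's AttributeReference sets, not Spark's actual types.

      ```scala
      // g.producedAttributes: the column the generator creates
      val producedAttrs = Set("arr_col")
      // g.generator.references: explode(st.arr) references all of st
      val generatorRefs = Set("st")
      // p.references: "st.col as col1" makes the project reference the
      // whole `st` attribute, not just the st.col field
      val projectRefs = Set("col", "st", "arr_col")

      val requiredAttrs = projectRefs -- producedAttrs ++ generatorRefs
      // requiredAttrs == Set("col", "st"): the whole struct, huge array
      // included, survives pruning.
      val unrequired = generatorRefs -- projectRefs
      // unrequired is empty, so no unrequiredChildIndex is recorded and
      // `st` is carried through Generate once per exploded element.

      // With the workaround, col1 is materialized before Generate, so the
      // project no longer references `st` and it can be dropped:
      val projectRefsFixed = Set("col", "col1", "arr_col")
      val unrequiredFixed  = generatorRefs -- projectRefsFixed  // Set("st")
      ```

      The model shows why the workaround helps: once the project stops referencing `st` directly, the rule's `unrequired` set becomes non-empty and the struct is pruned from the generator's output.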

      I know this is kind of a corner case, but it was really non-trivial to understand.

              People

              • Assignee:
                viirya L. C. Hsieh
              • Reporter:
                uzadude Ohad Raviv
