Spark / SPARK-23986

CompileException when using too many avg aggregations after joining

Details

    • Type: Bug
    • Status: Resolved
    • Priority: Major
    • Resolution: Fixed
    • Affects Version/s: 2.3.0
    • Fix Version/s: 2.3.1, 2.4.0
    • Component/s: SQL
    • Labels: None

    Description

      Consider the following code:

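          import org.apache.spark.sql.DataFrame
          import org.apache.spark.sql.functions.avg
          // Assumes `sparkSession` is an existing SparkSession val; needed for .toDF
          import sparkSession.implicits._

          val df1: DataFrame = sparkSession.sparkContext
            .makeRDD(Seq((0, 1, 2, 3, 4, 5, 6)))
            .toDF("key", "col1", "col2", "col3", "col4", "col5", "col6")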
      
          val df2: DataFrame = sparkSession.sparkContext
            .makeRDD(Seq((0, "val1", "val2")))
            .toDF("key", "dummy1", "dummy2")
      
          val agg = df1
            .join(df2, df1("key") === df2("key"), "leftouter")
            .groupBy(df1("key"))
            .agg(
              avg("col2").as("avg2"),
              avg("col3").as("avg3"),
              avg("col4").as("avg4"),
              avg("col1").as("avg1"),
              avg("col5").as("avg5"),
              avg("col6").as("avg6")
            )
      
          val head = agg.take(1)
      

      This logs the following exception:

      ERROR CodeGenerator: failed to compile: org.codehaus.commons.compiler.CompileException: File 'generated.java', Line 467, Column 28: Redefinition of parameter "agg_expr_11"
      

      I am not a Spark expert, but after investigating I found that the generated doConsume method is responsible for the exception.

      Indeed, avg causes org.apache.spark.sql.execution.CodegenSupport.constructDoConsumeFunction to be called several times: first with the 'avg' Expr, and a second time for the base aggregation Exprs (count and sum).

      The problem comes from how parameter names are generated in CodeGenerator:

        /**
         * Returns a term name that is unique within this instance of a `CodegenContext`.
         */
        def freshName(name: String): String = synchronized {
          val fullName = if (freshNamePrefix == "") {
            name
          } else {
            s"${freshNamePrefix}_$name"
          }
          if (freshNameIds.contains(fullName)) {
            val id = freshNameIds(fullName)
            freshNameIds(fullName) = id + 1
            s"$fullName$id"
          } else {
            freshNameIds += fullName -> 1
            fullName
          }
        }
      

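      freshNameIds already contains agg_expr_[1..6] from the first call.
      The second call is made with agg_expr_[1..12] and generates the following names:
      agg_expr_[11|21|31|41|51|61|11|12]. The first agg_expr_11 is agg_expr_1 with its counter 1 appended; the second is the new name agg_expr_11, which is returned unchanged because it is not yet in freshNameIds. This results in a parameter name conflict in the generated code: agg_expr_11.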
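
      The collision can be reproduced outside Spark with a small stand-in for the naming logic. This is only a sketch: NameGen and FreshNameCollision are hypothetical names, and the "agg" prefix and "expr_N" argument names are assumptions inferred from the names reported in this issue.

```scala
import scala.collection.mutable

// Simplified stand-in for the naming logic quoted in this issue.
// NameGen is a hypothetical class, not Spark's CodegenContext.
class NameGen(freshNamePrefix: String) {
  private val freshNameIds = mutable.HashMap.empty[String, Int]

  def freshName(name: String): String = synchronized {
    val fullName =
      if (freshNamePrefix == "") name else s"${freshNamePrefix}_$name"
    if (freshNameIds.contains(fullName)) {
      val id = freshNameIds(fullName)
      freshNameIds(fullName) = id + 1
      s"$fullName$id" // counter appended with no separator
    } else {
      freshNameIds += fullName -> 1
      fullName
    }
  }
}

object FreshNameCollision {
  val gen = new NameGen("agg") // assumed prefix
  // First call registers agg_expr_1 .. agg_expr_6.
  val firstCall: Seq[String] = (1 to 6).map(i => gen.freshName(s"expr_$i"))
  // Second call asks for expr_1 .. expr_12 on the same generator.
  val secondCall: Seq[String] = (1 to 12).map(i => gen.freshName(s"expr_$i"))
  // Names that appear more than once within the second call.
  val duplicates: Seq[String] = secondCall.diff(secondCall.distinct)

  def main(args: Array[String]): Unit = {
    println(secondCall.mkString(", "))
    println(s"duplicates: ${duplicates.mkString(", ")}")
  }
}
```

      Under these assumptions, the second call returns agg_expr_11 twice: once for expr_1 (existing name plus counter 1) and once for expr_11 (new name, returned as is).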

      Appending the 'id' directly in s"$fullName$id" to generate a unique term name is the source of the conflict. Maybe simply inserting an underscore would solve this issue: s"${fullName}_$id"
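
      A minimal sketch of the underscore variant proposed here, applied to the same scenario. UnderscoreNameGen and UnderscoreVariant are hypothetical names, the "agg" prefix and "expr_N" names are assumptions, and this is not presented as the change that was actually merged into Spark.

```scala
import scala.collection.mutable

// Hypothetical variant of the quoted freshName logic that separates
// the counter from the name with an underscore.
class UnderscoreNameGen(freshNamePrefix: String) {
  private val freshNameIds = mutable.HashMap.empty[String, Int]

  def freshName(name: String): String = synchronized {
    val fullName =
      if (freshNamePrefix == "") name else s"${freshNamePrefix}_$name"
    if (freshNameIds.contains(fullName)) {
      val id = freshNameIds(fullName)
      freshNameIds(fullName) = id + 1
      s"${fullName}_$id" // braces needed: $fullName_ would parse as another identifier
    } else {
      freshNameIds += fullName -> 1
      fullName
    }
  }
}

object UnderscoreVariant {
  val gen = new UnderscoreNameGen("agg") // assumed prefix
  val firstCall: Seq[String] = (1 to 6).map(i => gen.freshName(s"expr_$i"))
  val secondCall: Seq[String] = (1 to 12).map(i => gen.freshName(s"expr_$i"))
  val allNames: Seq[String] = firstCall ++ secondCall

  def main(args: Array[String]): Unit = {
    println(secondCall.mkString(", "))
    println(s"all distinct: ${allNames.distinct.size == allNames.size}")
  }
}
```

      In this scenario expr_1 becomes agg_expr_1_1 on the second call, so it no longer clashes with agg_expr_11. A name requested for the first time is still returned without a counter, so a caller that explicitly asks for expr_1_1 after expr_1 has been requested twice could still collide; always appending the counter would avoid that case as well.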

          People

            Assignee: mgaido Marco Gaido
            Reporter: RustedBones Michel Davit
            Votes: 0
            Watchers: 9
