Description
Consider the following code:
// assumes sparkSession is an existing SparkSession val
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.functions.avg
import sparkSession.implicits._

val df1: DataFrame = sparkSession.sparkContext
  .makeRDD(Seq((0, 1, 2, 3, 4, 5, 6)))
  .toDF("key", "col1", "col2", "col3", "col4", "col5", "col6")

val df2: DataFrame = sparkSession.sparkContext
  .makeRDD(Seq((0, "val1", "val2")))
  .toDF("key", "dummy1", "dummy2")

val agg = df1
  .join(df2, df1("key") === df2("key"), "leftouter")
  .groupBy(df1("key"))
  .agg(
    avg("col2").as("avg2"),
    avg("col3").as("avg3"),
    avg("col4").as("avg4"),
    avg("col1").as("avg1"),
    avg("col5").as("avg5"),
    avg("col6").as("avg6")
  )

val head = agg.take(1)
This logs the following exception:
ERROR CodeGenerator: failed to compile: org.codehaus.commons.compiler.CompileException: File 'generated.java', Line 467, Column 28: Redefinition of parameter "agg_expr_11"
I am not a Spark expert, but after investigating, I found that the generated doConsume method is responsible for the exception.
Indeed, avg calls org.apache.spark.sql.execution.CodegenSupport.constructDoConsumeFunction several times: the first time with the 'avg' expressions, and a second time with the underlying aggregation expressions (count and sum).
The problem comes from the way parameter names are generated in CodeGenerator:
/**
 * Returns a term name that is unique within this instance of a `CodegenContext`.
 */
def freshName(name: String): String = synchronized {
  val fullName = if (freshNamePrefix == "") {
    name
  } else {
    s"${freshNamePrefix}_$name"
  }
  if (freshNameIds.contains(fullName)) {
    val id = freshNameIds(fullName)
    freshNameIds(fullName) = id + 1
    s"$fullName$id"
  } else {
    freshNameIds += fullName -> 1
    fullName
  }
}
freshNameIds already contains agg_expr_[1..6] from the first call.
The second call is made with agg_expr_[1..12] and generates the following names:
agg_expr_[11|21|31|41|51|61|7|8|9|10|11|12]. The generated code therefore has a parameter name conflict: agg_expr_11 is declared twice.
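The collision can be reproduced without Spark. The following is a minimal sketch that copies the freshName logic quoted above into a standalone object. The object name FreshNameDemo and the vals firstCall and secondCall are hypothetical. It assumes that freshNamePrefix is "agg" and that the two constructDoConsumeFunction calls request the base names expr_1..expr_6 and then expr_1..expr_12.

```scala
import scala.collection.mutable

// Hypothetical standalone copy of the freshName logic quoted above.
// Assumption: freshNamePrefix is "agg", so "expr_1" becomes "agg_expr_1".
object FreshNameDemo {
  private val freshNamePrefix = "agg"
  private val freshNameIds = mutable.HashMap.empty[String, Int]

  def freshName(name: String): String = synchronized {
    val fullName = if (freshNamePrefix == "") name else s"${freshNamePrefix}_$name"
    if (freshNameIds.contains(fullName)) {
      val id = freshNameIds(fullName)
      freshNameIds(fullName) = id + 1
      // The counter is appended with no separator: "agg_expr_1" + 1 gives "agg_expr_11"
      s"$fullName$id"
    } else {
      freshNameIds += fullName -> 1
      fullName
    }
  }

  // First call: one parameter per avg expression (6 names).
  val firstCall: Seq[String] = (1 to 6).map(i => freshName(s"expr_$i"))
  // Second call: assumed to be one parameter per buffer expression (sum and count per avg, 12 names).
  val secondCall: Seq[String] = (1 to 12).map(i => freshName(s"expr_$i"))

  def main(args: Array[String]): Unit = {
    println(secondCall.mkString(", "))
    // Names that occur more than once in the second call's parameter list
    val duplicates = secondCall.groupBy(identity).collect { case (n, xs) if xs.size > 1 => n }
    println(s"duplicates: ${duplicates.mkString(", ")}")
  }
}
```

Running main should list agg_expr_11 twice in the second call. The first occurrence comes from "agg_expr_1" plus counter 1. The second is the untouched base name "agg_expr_11", which was never registered as a key in freshNameIds.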
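Appending the id directly to the name (s"$fullName$id") to build a unique term name is the source of the conflict. Simply inserting an underscore separator might solve this issue: s"${fullName}_$id" (the braces are needed, because in s"$fullName_$id" Scala would look for an identifier named fullName_).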
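The proposed separator can be checked in the same way. Below is a minimal sketch of a hypothetical variant of freshName that returns s"${fullName}_$id". The object name UnderscoreFreshNameDemo is illustrative only and is not Spark code. It makes the same assumptions as before: the prefix is "agg" and the two calls request expr_1..expr_6 and then expr_1..expr_12.

```scala
import scala.collection.mutable

// Hypothetical variant of freshName with an underscore between name and counter.
// Assumption: freshNamePrefix is "agg", as in the generated names from the error.
object UnderscoreFreshNameDemo {
  private val freshNamePrefix = "agg"
  private val freshNameIds = mutable.HashMap.empty[String, Int]

  def freshName(name: String): String = synchronized {
    val fullName = if (freshNamePrefix == "") name else s"${freshNamePrefix}_$name"
    if (freshNameIds.contains(fullName)) {
      val id = freshNameIds(fullName)
      freshNameIds(fullName) = id + 1
      // "agg_expr_1" with counter 1 now becomes "agg_expr_1_1" instead of "agg_expr_11"
      s"${fullName}_$id"
    } else {
      freshNameIds += fullName -> 1
      fullName
    }
  }

  val firstCall: Seq[String] = (1 to 6).map(i => freshName(s"expr_$i"))
  val secondCall: Seq[String] = (1 to 12).map(i => freshName(s"expr_$i"))

  def main(args: Array[String]): Unit = {
    println(secondCall.mkString(", "))
    // True when every parameter name in the second call is unique
    println(s"all distinct: ${secondCall.distinct.size == secondCall.size}")
  }
}
```

With the separator, the second call yields agg_expr_1_1 through agg_expr_6_1 followed by agg_expr_7 through agg_expr_12, so agg_expr_11 appears only once. This only removes this particular ambiguity. A base name that already ends in "_<digits>" (for example a requested name "expr_1_1") could in principle still collide, so a complete fix may also need to register the returned names in freshNameIds.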