Description
This Java code results in a StackOverflowError:
List<Column> groupByCols = new ArrayList<>();
groupByCols.add(new Column("id1"));
scala.collection.Seq<Column> groupByColsSeq =
    JavaConverters.asScalaIteratorConverter(groupByCols.iterator())
        .asScala().toSeq();
df.groupBy(groupByColsSeq).max("id2").toDF("id1", "id2").show();
The toSeq method above produces a Stream. Passing a Stream to groupBy results in the StackOverflowError. In fact, the error can be produced more easily in spark-shell:
scala> val df = spark.read.schema("id1 int, id2 int").csv("testinput.csv")
df: org.apache.spark.sql.DataFrame = [id1: int, id2: int]

scala> val groupBySeq = Stream(col("id1"))
groupBySeq: scala.collection.immutable.Stream[org.apache.spark.sql.Column] = Stream(id1, ?)

scala> df.groupBy(groupBySeq: _*).max("id2").toDF("id1", "id2").collect
java.lang.StackOverflowError
  at scala.collection.immutable.Stream$Cons.tail(Stream.scala:1161)
  at scala.collection.immutable.Stream.drop(Stream.scala:797)
  at scala.collection.immutable.Stream.drop(Stream.scala:204)
  at scala.collection.LinearSeqOptimized.apply(LinearSeqOptimized.scala:66)
  at scala.collection.LinearSeqOptimized.apply$(LinearSeqOptimized.scala:65)
  at scala.collection.immutable.Stream.apply(Stream.scala:204)
  at org.apache.spark.sql.catalyst.expressions.BoundReference.doGenCode(BoundAttribute.scala:45)
  at org.apache.spark.sql.catalyst.expressions.Expression.$anonfun$genCode$3(Expression.scala:138)
  at scala.Option.getOrElse(Option.scala:138)
  at org.apache.spark.sql.catalyst.expressions.Expression.genCode(Expression.scala:133)
  at org.apache.spark.sql.execution.CodegenSupport.$anonfun$consume$3(WholeStageCodegenExec.scala:159)
  at scala.collection.immutable.Stream.$anonfun$map$1(Stream.scala:418)
  at scala.collection.immutable.Stream$Cons.tail(Stream.scala:1171)
  at scala.collection.immutable.Stream$Cons.tail(Stream.scala:1161)
  at scala.collection.immutable.Stream.drop(Stream.scala:797)
  at scala.collection.immutable.Stream.drop(Stream.scala:204)
  at scala.collection.LinearSeqOptimized.apply(LinearSeqOptimized.scala:66)
  at scala.collection.LinearSeqOptimized.apply$(LinearSeqOptimized.scala:65)
  at scala.collection.immutable.Stream.apply(Stream.scala:204)
  at org.apache.spark.sql.catalyst.expressions.BoundReference.doGenCode(BoundAttribute.scala:45)
  at org.apache.spark.sql.catalyst.expressions.Expression.$anonfun$genCode$3(Expression.scala:138)
  at scala.Option.getOrElse(Option.scala:138)
  at org.apache.spark.sql.catalyst.expressions.Expression.genCode(Expression.scala:133)
  at org.apache.spark.sql.execution.CodegenSupport.$anonfun$consume$3(WholeStageCodegenExec.scala:159)
  ...etc...
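The Java snippet lands on the same code path because, in Scala 2.12, Iterator.toSeq delegates to toStream, so the converted Seq is lazy. A minimal check, assuming a Scala 2.12 REPL:

scala> import scala.collection.JavaConverters._
import scala.collection.JavaConverters._

scala> java.util.Arrays.asList(1, 2).iterator().asScala.toSeq
res0: Seq[Int] = Stream(1, ?)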
This is due to the lazy nature of Streams. The consume method in CodegenSupport assumes that the closure passed to map will be evaluated eagerly (excerpt from WholeStageCodegenExec.scala; elided lines are marked - - -):
val inputVars =
  ctx.currentVars = null    // <== the closure cares about this
  ctx.INPUT_ROW = row
  output.zipWithIndex.map { case (attr, i) =>
    BoundReference(i, attr.dataType, attr.nullable).genCode(ctx)
  }
- - -
ctx.currentVars = inputVars
ctx.INPUT_ROW = null
ctx.freshNamePrefix = parent.variablePrefix
val evaluated = evaluateRequiredVariables(output, inputVars, parent.usedInputs)
The closure passed to map assumes that ctx.currentVars will still be null when it runs. But due to lazy evaluation, ctx.currentVars has been reassigned by the time the closure is actually called. Worse yet, it has been reassigned to the yet-to-be-evaluated inputVars stream itself. Since the closure reads ctx.currentVars (via the call genCode(ctx)), it ends up consuming the very data structure it is trying to create.
You can recreate the problem in a vanilla Scala shell:
scala> var p1: Seq[Any] = null
p1: Seq[Any] = null

scala> val s = Stream(1, 2).zipWithIndex.map { case (x, i) => if (p1 != null) p1(i) else x }
s: scala.collection.immutable.Stream[Any] = Stream(1, ?)

scala> p1 = s
p1: Seq[Any] = Stream(1, ?)

scala> s.foreach(println)
1
java.lang.StackOverflowError
  at scala.collection.immutable.Stream$Cons.tail(Stream.scala:1166)
  at scala.collection.immutable.Stream$Cons.tail(Stream.scala:1159)
  at scala.collection.immutable.Stream.$anonfun$map$1(Stream.scala:415)
  at scala.collection.immutable.Stream$Cons.tail(Stream.scala:1169)
  at scala.collection.immutable.Stream$Cons.tail(Stream.scala:1159)
  ... etc ...
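Forcing the map to run before the self-reference is installed breaks the cycle. The same session with .toList added, as a sketch of the general remedy (not a patch):

scala> var p1: Seq[Any] = null
p1: Seq[Any] = null

scala> val s = Stream(1, 2).zipWithIndex.map { case (x, i) => if (p1 != null) p1(i) else x }.toList
s: List[Any] = List(1, 2)

scala> p1 = s
p1: Seq[Any] = List(1, 2)

scala> s.foreach(println)
1
2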
Possible fixes:
- In Dataset.groupBy, we could ensure the passed Seq is a List before handing it to RelationalGroupedDataset (simply by changing cols.map(_.expr) to cols.toList.map(_.expr)); see the sketch after this list.
- In CodegenSupport.consume, we could ensure that the map function is eagerly evaluated (simply by moving the existing match statement to handle the result from either path of the if statement).
- Something else that hasn't occurred to me (opinions welcome).
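A sketch of the first option; the method body is paraphrased from Dataset.scala and may not match the current source exactly:

@scala.annotation.varargs
def groupBy(cols: Column*): RelationalGroupedDataset = {
  // toList forces a possibly-lazy Stream (e.g. one produced by Java's
  // asScala.toSeq) before codegen can capture it through ctx.currentVars
  RelationalGroupedDataset(toDF(), cols.toList.map(_.expr), RelationalGroupedDataset.GroupByType)
}

The second option is more general, since it would also cover any other operator that can receive a caller-supplied lazy Seq.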
Issue Links
- relates to SPARK-33260 "SortExec produces incorrect results if sortOrder is a Stream" (Resolved)