[SPARK-26680] StackOverflowError if Stream passed to groupBy


Details

    • Type: Bug
    • Status: Resolved
    • Priority: Major
    • Resolution: Fixed
    • Affects Version/s: 2.3.2, 2.4.0, 3.0.0
    • Fix Version/s: 2.3.3, 2.4.1, 3.0.0
    • Component/s: SQL
    • Labels: None

    Description

      This Java code results in a StackOverflowError:

      import java.util.ArrayList;
      import java.util.List;
      import org.apache.spark.sql.Column;
      import scala.collection.JavaConverters;

      // df is an existing Dataset<Row> with integer columns id1 and id2
      List<Column> groupByCols = new ArrayList<>();
      groupByCols.add(new Column("id1"));
      scala.collection.Seq<Column> groupByColsSeq =
          JavaConverters.asScalaIteratorConverter(groupByCols.iterator())
              .asScala().toSeq();
      df.groupBy(groupByColsSeq).max("id2").toDF("id1", "id2").show();
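
      The result type of that conversion can be checked directly in a Scala shell. A quick sketch (assuming Scala 2.12, where Iterator.toSeq delegates to toStream):

      import scala.collection.JavaConverters._

      val javaCols = new java.util.ArrayList[String]()
      javaCols.add("id1")
      // In Scala 2.12, toSeq on a converted Iterator builds a lazy Stream
      val converted = javaCols.iterator().asScala.toSeq
      println(converted.getClass.getName) // scala.collection.immutable.Stream$Cons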
      

      As the check above shows, the toSeq call produces a Stream, and passing a Stream to groupBy triggers the StackOverflowError. In fact, the error can be reproduced even more easily in spark-shell:

      scala> val df = spark.read.schema("id1 int, id2 int").csv("testinput.csv")
      df: org.apache.spark.sql.DataFrame = [id1: int, id2: int]
      scala> val groupBySeq = Stream(col("id1"))
      groupBySeq: scala.collection.immutable.Stream[org.apache.spark.sql.Column] = Stream(id1, ?)
      scala> df.groupBy(groupBySeq: _*).max("id2").toDF("id1", "id2").collect
      java.lang.StackOverflowError
        at scala.collection.immutable.Stream$Cons.tail(Stream.scala:1161)
        at scala.collection.immutable.Stream.drop(Stream.scala:797)
        at scala.collection.immutable.Stream.drop(Stream.scala:204)
        at scala.collection.LinearSeqOptimized.apply(LinearSeqOptimized.scala:66)
        at scala.collection.LinearSeqOptimized.apply$(LinearSeqOptimized.scala:65)
        at scala.collection.immutable.Stream.apply(Stream.scala:204)
        at org.apache.spark.sql.catalyst.expressions.BoundReference.doGenCode(BoundAttribute.scala:45)
        at org.apache.spark.sql.catalyst.expressions.Expression.$anonfun$genCode$3(Expression.scala:138)
        at scala.Option.getOrElse(Option.scala:138)
        at org.apache.spark.sql.catalyst.expressions.Expression.genCode(Expression.scala:133)
        at org.apache.spark.sql.execution.CodegenSupport.$anonfun$consume$3(WholeStageCodegenExec.scala:159)
        at scala.collection.immutable.Stream.$anonfun$map$1(Stream.scala:418)
        at scala.collection.immutable.Stream$Cons.tail(Stream.scala:1171)
        at scala.collection.immutable.Stream$Cons.tail(Stream.scala:1161)
        at scala.collection.immutable.Stream.drop(Stream.scala:797)
        at scala.collection.immutable.Stream.drop(Stream.scala:204)
        at scala.collection.LinearSeqOptimized.apply(LinearSeqOptimized.scala:66)
        at scala.collection.LinearSeqOptimized.apply$(LinearSeqOptimized.scala:65)
        at scala.collection.immutable.Stream.apply(Stream.scala:204)
        at org.apache.spark.sql.catalyst.expressions.BoundReference.doGenCode(BoundAttribute.scala:45)
        at org.apache.spark.sql.catalyst.expressions.Expression.$anonfun$genCode$3(Expression.scala:138)
        at scala.Option.getOrElse(Option.scala:138)
        at org.apache.spark.sql.catalyst.expressions.Expression.genCode(Expression.scala:133)
        at org.apache.spark.sql.execution.CodegenSupport.$anonfun$consume$3(WholeStageCodegenExec.scala:159)
      ...etc...
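
      Until this is fixed, one caller-side workaround (a sketch, reusing the session above) is simply to force the Stream before passing it:

      // Workaround sketch: convert the lazy Stream to a strict List up front
      df.groupBy(groupBySeq.toList: _*).max("id2").toDF("id1", "id2").collect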
      

      This is due to the lazy nature of Streams. The consume method in CodegenSupport assumes that the result of a map call over the output attributes is eagerly evaluated:

      val inputVars =
          ...
          ctx.currentVars = null                 <== the closure cares about this
          ctx.INPUT_ROW = row
          output.zipWithIndex.map { case (attr, i) =>
            BoundReference(i, attr.dataType, attr.nullable).genCode(ctx)
          }
      ...
      ctx.currentVars = inputVars                <== but this runs before the closure does
      ctx.INPUT_ROW = null
      ctx.freshNamePrefix = parent.variablePrefix
      val evaluated = evaluateRequiredVariables(output, inputVars, parent.usedInputs)

      The closure passed to the map function assumes ctx.currentVars will still be null when it runs. But because the Stream is lazy, the closure is not actually called until after ctx.currentVars has been reassigned, and worse, it is reassigned to the not-yet-evaluated inputVars Stream itself. Since the closure reads ctx.currentVars (via the genCode(ctx) call), it ends up traversing the very data structure it is in the middle of creating, recursing until the stack overflows.

      You can recreate the problem in a vanilla Scala shell:

      scala> var p1: Seq[Any] = null
      p1: Seq[Any] = null
      scala> val s = Stream(1, 2).zipWithIndex.map { case (x, i) => if (p1 != null) p1(i) else x }
      s: scala.collection.immutable.Stream[Any] = Stream(1, ?)
      scala> p1 = s
      p1: Seq[Any] = Stream(1, ?)
      scala> s.foreach(println)
      1
      java.lang.StackOverflowError
        at scala.collection.immutable.Stream$Cons.tail(Stream.scala:1166)
        at scala.collection.immutable.Stream$Cons.tail(Stream.scala:1159)
        at scala.collection.immutable.Stream.$anonfun$map$1(Stream.scala:415)
        at scala.collection.immutable.Stream$Cons.tail(Stream.scala:1169)
        at scala.collection.immutable.Stream$Cons.tail(Stream.scala:1159)
      ... etc ...
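
      For comparison, forcing the Stream before publishing it through p1 makes every closure run while p1 is still null, and the error disappears:

      var p1: Seq[Any] = null
      // force evaluates the whole Stream now, while p1 is still null
      val s = Stream(1, 2).zipWithIndex.map { case (x, i) => if (p1 != null) p1(i) else x }.force
      p1 = s
      s.foreach(println) // prints 1 then 2; no StackOverflowError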
      

      Possible fixes:

      • In Dataset.groupBy, we could ensure the passed Seq is a strict List before passing it to RelationalGroupedDataset (simply by changing cols.map(_.expr) to cols.toList.map(_.expr)).
      • In CodegenSupport.consume, we could ensure that the map result is eagerly evaluated (simply by moving the existing match statement so it handles the result from either path of the if statement); a model of this is sketched after the list.
      • Something else that hasn't occurred to me (opinions welcome).
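
      A self-contained model of the second option (illustrative names only, not Spark source): evaluate the mapped sequence eagerly, before it is published through the mutable context that its closures read.

      object ForceDemo {
        class Ctx { var currentVars: Seq[String] = _ }

        // Shaped like CodegenSupport.consume: the map closure reads
        // ctx.currentVars, just as BoundReference.doGenCode does.
        def consumeLike(ctx: Ctx, output: Seq[String]): Seq[String] = {
          ctx.currentVars = null
          val candidate = output.zipWithIndex.map { case (attr, i) =>
            if (ctx.currentVars == null) s"bound($i:$attr)" else ctx.currentVars(i)
          }
          // Force a lazy Stream now, while ctx.currentVars is still null.
          val inputVars = candidate match {
            case s: Stream[String] => s.force
            case other             => other
          }
          ctx.currentVars = inputVars // safe: every element is already evaluated
          inputVars
        }

        def main(args: Array[String]): Unit =
          // Without the force above, this would throw StackOverflowError.
          println(consumeLike(new Ctx, Stream("id1", "id2")))
      }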

    People

      Assignee: Bruce Robbins (bersprockets)
      Reporter: Bruce Robbins (bersprockets)
      Votes: 0
      Watchers: 3
