Description
This proposes adding a per-query ID to the codegen stages represented by WholeStageCodegenExec operators. The ID will be used in
- the explain output of the physical plan, and in
- the generated class name.
Specifically, the ID is stable within a query: it counts up from 1, in depth-first post-order, over all the WholeStageCodegenExec operators inserted into a plan.
The ID value 0 is reserved for "free-floating" WholeStageCodegenExec objects that may be created for one-off purposes, e.g. as a fallback when whole-stage codegen fails for a stage and we only want to codegen a subset of its child operators.
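As a rough sketch of the numbering scheme (the types and names below are illustrative stand-ins, not Spark's actual internals), a depth-first post-order assignment could look like this:

// Illustrative sketch only: assign codegen stage IDs in depth-first post-order.
case class PlanNode(name: String, children: Seq[PlanNode], isCodegenStage: Boolean)

def assignStageIds(root: PlanNode): Seq[(String, Int)] = {
  var nextId = 0                                   // 0 stays reserved for "free-floating" stages
  val assigned = scala.collection.mutable.ArrayBuffer.empty[(String, Int)]
  def visit(node: PlanNode): Unit = {
    node.children.foreach(visit)                   // visit children first: post-order
    if (node.isCodegenStage) {
      nextId += 1                                   // first codegen stage encountered gets ID 1
      assigned += (node.name -> nextId)
    }
  }
  visit(root)
  assigned.toSeq
}

With this ordering, the innermost stage in the example below gets ID 1 and the top-level SortMergeJoin stage gets ID 6.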
Example: for the following query:
scala> spark.conf.set("spark.sql.autoBroadcastJoinThreshold", 1)

scala> val df1 = spark.range(10).select('id as 'x, 'id + 1 as 'y).orderBy('x).select('x + 1 as 'z, 'y)
df1: org.apache.spark.sql.DataFrame = [z: bigint, y: bigint]

scala> val df2 = spark.range(5)
df2: org.apache.spark.sql.Dataset[Long] = [id: bigint]

scala> val query = df1.join(df2, 'z === 'id)
query: org.apache.spark.sql.DataFrame = [z: bigint, y: bigint ... 1 more field]
The explain output before the change is:
scala> query.explain
== Physical Plan ==
*SortMergeJoin [z#9L], [id#13L], Inner
:- *Sort [z#9L ASC NULLS FIRST], false, 0
:  +- Exchange hashpartitioning(z#9L, 200)
:     +- *Project [(x#3L + 1) AS z#9L, y#4L]
:        +- *Sort [x#3L ASC NULLS FIRST], true, 0
:           +- Exchange rangepartitioning(x#3L ASC NULLS FIRST, 200)
:              +- *Project [id#0L AS x#3L, (id#0L + 1) AS y#4L]
:                 +- *Range (0, 10, step=1, splits=8)
+- *Sort [id#13L ASC NULLS FIRST], false, 0
   +- Exchange hashpartitioning(id#13L, 200)
      +- *Range (0, 5, step=1, splits=8)
Note how codegen'd operators are annotated with a prefix "*".
and after this change it'll be:
scala> query.explain
== Physical Plan ==
*(6) SortMergeJoin [z#9L], [id#13L], Inner
:- *(3) Sort [z#9L ASC NULLS FIRST], false, 0
:  +- Exchange hashpartitioning(z#9L, 200)
:     +- *(2) Project [(x#3L + 1) AS z#9L, y#4L]
:        +- *(2) Sort [x#3L ASC NULLS FIRST], true, 0
:           +- Exchange rangepartitioning(x#3L ASC NULLS FIRST, 200)
:              +- *(1) Project [id#0L AS x#3L, (id#0L + 1) AS y#4L]
:                 +- *(1) Range (0, 10, step=1, splits=8)
+- *(5) Sort [id#13L ASC NULLS FIRST], false, 0
   +- Exchange hashpartitioning(id#13L, 200)
      +- *(4) Range (0, 5, step=1, splits=8)
Note that the annotation prefix becomes "*(id) ".
The ID will also show up in the name of the generated class, as a suffix in the format
GeneratedClass$GeneratedIteratorForCodegenStage<id>
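As a rough illustration only (this is not the actual Spark codegen code), the suffix is just the stage ID appended to a fixed prefix:

// Illustration only: deriving the nested class name from the codegen stage ID.
def generatedClassName(codegenStageId: Int): String =
  s"GeneratedClass$$GeneratedIteratorForCodegenStage$codegenStageId"

// generatedClassName(3) == "GeneratedClass$GeneratedIteratorForCodegenStage3"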
For example, note how GeneratedClass$GeneratedIteratorForCodegenStage3 and GeneratedClass$GeneratedIteratorForCodegenStage6 in the following stack trace correspond to the IDs shown in the explain output above:
"Executor task launch worker for task 424@12957" daemon prio=5 tid=0x58 nid=NA runnable java.lang.Thread.State: RUNNABLE at org.apache.spark.sql.execution.UnsafeExternalRowSorter.insertRow(UnsafeExternalRowSorter.java:109) at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage3.sort_addToSorter$(generated.java:32) at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage3.processNext(generated.java:41) at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43) at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$9$$anon$1.hasNext(WholeStageCodegenExec.scala:494) at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage6.findNextInnerJoinRows$(generated.java:42) at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage6.processNext(generated.java:101) at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43) at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$11$$anon$2.hasNext(WholeStageCodegenExec.scala:513) at org.apache.spark.sql.execution.SparkPlan$$anonfun$2.apply(SparkPlan.scala:253) at org.apache.spark.sql.execution.SparkPlan$$anonfun$2.apply(SparkPlan.scala:247) at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$25.apply(RDD.scala:828) at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$25.apply(RDD.scala:828) at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38) at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324) at org.apache.spark.rdd.RDD.iterator(RDD.scala:288) at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87) at org.apache.spark.scheduler.Task.run(Task.scala:109) at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:345) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) at java.lang.Thread.run(Thread.java:748)
Rationale:
Right now, the codegen in Spark SQL falls short in a couple of ways:
1. It's hard to tell which physical operators are in the same WholeStageCodegen stage. Note that this "stage" is a separate notion from Spark's RDD execution stages; this one is only to delineate codegen units.
There can be adjacent physical operators that are both codegen'd but end up in separate codegen stages. Some of this is due to hacky implementation details, such as SortMergeJoin and its Sort inputs: they're hard-coded to be split into separate stages even though both are codegen'd.
When printing out the explain output of the physical plan, you'd only see the codegen'd physical operators annotated with a preceding star ('*') but would have no way to figure out if they're in the same stage.
2. Performance/error diagnosis
The generated code has class/method names that are hard to differentiate between queries, or even between codegen stages within the same query. If we use a Java-level profiler to collect profiles, or if we encounter a Java-level exception with a stack trace, it's really hard to tell which part of the query it corresponds to.
By introducing a per-query codegen stage ID, we'd at least be able to tell in which codegen stage (and in turn, which group of physical operators) a profile tick or an exception occurred.
The reason this proposal uses a per-query ID is that it's stable within a query, so multiple runs of the same query see the same IDs. That helps users understand the output, and it also plays well with the codegen cache in Spark SQL, which uses the generated source code as its key.
The downside of a per-query ID, as opposed to a per-session or globally incrementing ID, is of course that we can't tell different query runs apart from this ID alone. But for now I believe this is a good enough tradeoff.