Details

Type: Bug
Status: Resolved
Priority: Major
Resolution: Fixed
Fix Version/s: 2.2.0
Labels: None
Description
After the SPARK-18191 commit in pull request 15769, with the new commit protocol it is possible for the driver and the executors to use different jobIds during an RDD commit.
In the old code, the variable stageId was part of the closure used to define the task, as you can see here:
https://github.com/apache/spark/blob/9c8deef64efee20a0ddc9b612f90e77c80aede60/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala#L1098
As a result, a TaskAttemptId is constructed on the executors with the same "stageId" as on the driver, since the value is serialized on the driver. Note also that the value of stageId is actually the rdd.id, which is assigned here: https://github.com/apache/spark/blob/9c8deef64efee20a0ddc9b612f90e77c80aede60/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala#L1084
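The pattern looked roughly like the following. This is a paraphrased sketch, not the actual Spark source; saveOldStyle and jobTrackerId are illustrative names:
{code:scala}
import org.apache.hadoop.mapreduce.{TaskAttemptID, TaskType}
import org.apache.spark.TaskContext
import org.apache.spark.rdd.RDD

// Sketch of the old pattern: the id is read once on the driver and captured
// by the task closure, so every executor builds its TaskAttemptID from the
// same number the driver saw.
def saveOldStyle[K, V](rdd: RDD[(K, V)], jobTrackerId: String): Unit = {
  val stageId = rdd.id  // misnamed: this is really the RDD id, fixed on the driver

  val writeShard = (context: TaskContext, iter: Iterator[(K, V)]) => {
    // stageId is serialized into this closure by the driver.
    val attemptId = new TaskAttemptID(jobTrackerId, stageId, TaskType.REDUCE,
      context.partitionId(), context.attemptNumber())
    // ... build a TaskAttemptContext from attemptId, write iter, commit ...
  }
  rdd.context.runJob(rdd, writeShard)
}
{code}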
However, after the change in pull request 15769, the value is no longer part of the task closure that gets serialized by the driver. Instead, it is pulled from the taskContext, as you can see here: https://github.com/apache/spark/pull/15769/files#diff-dff185cb90c666bce445e3212a21d765R103
and that value is then used to construct the TaskAttemptId on the executors: https://github.com/apache/spark/pull/15769/files#diff-dff185cb90c666bce445e3212a21d765R134
taskContext has a stageId value, which is set by the DAGScheduler. So after the change, unlike the old code where the rdd.id was used, an actual stage id is used, and it can differ between the executors and the driver since it is no longer serialized from the driver.
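Schematically, the executor side now does something like this (again a paraphrased sketch; executeTask and jobTrackerId are illustrative names, not the exact code from the pull request):
{code:scala}
import org.apache.hadoop.mapreduce.{TaskAttemptID, TaskType}
import org.apache.spark.TaskContext

// Sketch of the post-change executor path: the id is no longer shipped in the
// task closure but read from the executor-local TaskContext. stageId() here
// is the DAGScheduler's stage id, not the rdd.id the driver used for setupJob.
def executeTask(context: TaskContext, jobTrackerId: String): Unit = {
  val attemptId = new TaskAttemptID(jobTrackerId, context.stageId(),
    TaskType.REDUCE, context.partitionId(), context.attemptNumber())
  // ... the committer sets up and commits this task attempt under an id
  // derived from context.stageId(), while the driver set up the job under
  // an id derived from rdd.id ...
}
{code}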
In summary, the old code consistently used the rddId and merely misnamed it "stageId". The new code uses a mix of rddId and stageId; there should be a single consistent ID between the executors and the driver.
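The divergence is easy to observe. The following hypothetical snippet (IdMismatchDemo is not part of the Spark codebase) prints the rdd.id the driver-side setup would use next to the stage ids the executors see through TaskContext; the two counters advance independently, so they quickly disagree:
{code:scala}
import org.apache.spark.{SparkContext, TaskContext}

object IdMismatchDemo {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext("local[2]", "id-mismatch-demo")

    // Creating RDDs advances the rdd-id counter without running any stage.
    sc.parallelize(1 to 4, 2)
    sc.parallelize(1 to 4, 2)

    val rdd = sc.parallelize(1 to 4, 2)    // rdd.id is now 2
    val driverSideId = rdd.id              // what the driver derives its job id from
    val executorSideIds = sc.runJob(rdd,
      (ctx: TaskContext, _: Iterator[Int]) => ctx.stageId())  // first stage: id 0

    println(s"driver-side rdd.id = $driverSideId, " +
      s"executor-side stage ids = ${executorSideIds.toSeq}")  // 2 vs. [0, 0]
    sc.stop()
  }
}
{code}
Either id would work for the commit protocol, as long as it is computed once on the driver and shipped to the executors the way the old closure capture did implicitly.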