Details
- Type: Bug
- Status: Resolved
- Priority: Major
- Resolution: Duplicate
- Affects Version/s: 2.2.0
- Fix Version/s: None
- Component/s: None
Description
The following test reproduces the problem:
test("test") { val foundMetrics = mutable.Set.empty[String] spark.sparkContext.addSparkListener(new SparkListener { override def onTaskEnd(taskEnd: SparkListenerTaskEnd): Unit = { taskEnd.taskInfo.accumulables.foreach { a => if (a.name.isDefined) { foundMetrics.add(a.name.get) } } } }) for (iter <- 0 until 100) { foundMetrics.clear() println(s"iter = $iter") spark.range(10).groupBy().agg("id" -> "sum").collect spark.sparkContext.listenerBus.waitUntilEmpty(3000) assert(foundMetrics.size > 0) } }
The problem comes from DAGScheduler.handleTaskCompletion: the SparkListenerTaskEnd event is posted to the listener bus before updateAccumulators is called, so the accumulables on taskInfo may not be up to date when listeners observe the event.
The code there looks like it needs refactoring.
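For illustration, here is a simplified sketch of the ordering inside handleTaskCompletion (my own approximation, not the actual Spark source; everything other than the event post and the updateAccumulators call is elided):

// Approximate sketch of DAGScheduler.handleTaskCompletion, trimmed to the
// two calls relevant here; all other bookkeeping is omitted.
private[scheduler] def handleTaskCompletion(event: CompletionEvent): Unit = {
  // 1. The task-end event is posted first, so a listener may read
  //    event.taskInfo.accumulables as soon as the bus delivers it...
  listenerBus.post(SparkListenerTaskEnd(
    stageId, stageAttemptId, taskType, event.reason, event.taskInfo, taskMetrics))

  event.reason match {
    case Success =>
      // 2. ...but the task's accumulator updates are merged into
      //    taskInfo.accumulables only here, after the post above.
      updateAccumulators(event)
    case _ => // failure handling elided
  }
}

This is the same ordering problem tracked in the linked SPARK-20342.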
Issue Links
- duplicates SPARK-20342 "DAGScheduler sends SparkListenerTaskEnd before updating task's accumulators" (Resolved)