Spark / SPARK-21009

SparkListenerTaskEnd.taskInfo.accumulables might not be accurate


Details

    • Type: Bug
    • Status: Resolved
    • Priority: Major
    • Resolution: Duplicate
    • Affects Version/s: 2.2.0
    • Fix Version/s: None
    • Component/s: Spark Core
    • Labels: None

    Description

      The following code reproduces it:

        import scala.collection.mutable

        import org.apache.spark.scheduler.{SparkListener, SparkListenerTaskEnd}

        test("test") {
          val foundMetrics = mutable.Set.empty[String]
          spark.sparkContext.addSparkListener(new SparkListener {
            override def onTaskEnd(taskEnd: SparkListenerTaskEnd): Unit = {
              // Record the names of all accumulables attached to the task end event.
              taskEnd.taskInfo.accumulables.foreach { a =>
                if (a.name.isDefined) {
                  foundMetrics.add(a.name.get)
                }
              }
            }
          })
          for (iter <- 0 until 100) {
            foundMetrics.clear()
            println(s"iter = $iter")
            spark.range(10).groupBy().agg("id" -> "sum").collect()
            // Wait for the listener bus to deliver all pending events.
            spark.sparkContext.listenerBus.waitUntilEmpty(3000)
            // Fails intermittently: accumulables can still be empty when the
            // event was posted before the accumulators were updated.
            assert(foundMetrics.size > 0)
          }
        }
      

      The problem comes from DAGScheduler.handleTaskCompletion:
      the SparkListenerTaskEnd event is posted before updateAccumulators is called, so taskInfo.accumulables may still hold stale values when listeners observe them.
      The code there looks like it needs refactoring.
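
      A simplified sketch of the problematic ordering (this is not the actual
      Spark source; the surrounding names and parameters are illustrative,
      only handleTaskCompletion and updateAccumulators come from the report):

        // Sketch only: the listener event is posted before the accumulator
        // updates from the finished task have been merged into taskInfo.
        def handleTaskCompletion(event: CompletionEvent): Unit = {
          // 1. The SparkListenerTaskEnd event is posted here, so listeners
          //    see event.taskInfo.accumulables in its current state...
          listenerBus.post(SparkListenerTaskEnd(
            stageId, stageAttemptId, taskType, event.reason,
            event.taskInfo, event.taskMetrics))

          // 2. ...but the task's accumulator updates are only applied
          //    afterwards, so a listener may have observed stale
          //    (e.g. empty) accumulables.
          updateAccumulators(event)
        }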


      People

        Assignee: Unassigned
        Reporter: bograd (Bogdan Raducanu)
        Votes: 0
        Watchers: 2