Spark / SPARK-25429

SparkListenerBus inefficient due to 'LiveStageMetrics#accumulatorIds:Array[Long]' data structure


    Details

    • Type: Improvement
    • Status: Resolved
    • Priority: Major
    • Resolution: Fixed
    • Affects Version/s: 2.3.1
    • Fix Version/s: 3.0.0
    • Component/s: Spark Core
    • Labels:
      None

      Description

      private def updateStageMetrics(
          stageId: Int,
          attemptId: Int,
          taskId: Long,
          accumUpdates: Seq[AccumulableInfo],
          succeeded: Boolean): Unit = {
        Option(stageMetrics.get(stageId)).foreach { metrics =>
          if (metrics.attemptId != attemptId || metrics.accumulatorIds.isEmpty) {
            return
          }

          val oldTaskMetrics = metrics.taskMetrics.get(taskId)
          if (oldTaskMetrics != null && oldTaskMetrics.succeeded) {
            return
          }

          val updates = accumUpdates
            .filter { acc => acc.update.isDefined && metrics.accumulatorIds.contains(acc.id) }
            .sortBy(_.id)

          if (updates.isEmpty) {
            return
          }

          val ids = new Array[Long](updates.size)
          val values = new Array[Long](updates.size)
          updates.zipWithIndex.foreach { case (acc, idx) =>
            ids(idx) = acc.id
            // In a live application, accumulators have Long values, but when reading from event
            // logs, they have String values. For now, assume all accumulators are Long and convert
            // accordingly.
            values(idx) = acc.update.get match {
              case s: String => s.toLong
              case l: Long => l
              case o => throw new IllegalArgumentException(s"Unexpected: $o")
            }
          }

          // TODO: storing metrics by task ID can cause metrics for the same task index to be
          // counted multiple times, for example due to speculation or re-attempts.
          metrics.taskMetrics.put(taskId, new LiveTaskMetrics(ids, values, succeeded))
        }
      }


      The check `metrics.accumulatorIds.contains(acc.id)` is inefficient: a large SQL application can register many accumulators, and `Array#contains` is a linear scan that runs for every accumulator update of every task. As a result the listener bus falls behind; in practice the application can time out during shutdown and be killed by the ResourceManager in YARN mode.
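      The direction of the fix can be sketched as follows: keep the accumulator IDs in a hash-based `Set[Long]` instead of an `Array[Long]`, so each membership check is O(1) on average rather than O(n). This is a minimal illustration of the idea, not the committed patch; the `AccumulableInfo` case class and helper names below are simplified stand-ins for the real Spark types.

      ```scala
      object AccumFilterSketch {
        // Simplified stand-in for org.apache.spark.scheduler.AccumulableInfo.
        final case class AccumulableInfo(id: Long, update: Option[Any])

        // Before: Array#contains does a linear scan per accumulator update.
        def filterWithArray(
            accumUpdates: Seq[AccumulableInfo],
            accumulatorIds: Array[Long]): Seq[AccumulableInfo] =
          accumUpdates.filter(acc => acc.update.isDefined && accumulatorIds.contains(acc.id))

        // After: build the Set once per stage; each lookup is then O(1) on average.
        def filterWithSet(
            accumUpdates: Seq[AccumulableInfo],
            accumulatorIds: Set[Long]): Seq[AccumulableInfo] =
          accumUpdates.filter(acc => acc.update.isDefined && accumulatorIds.contains(acc.id))

        def main(args: Array[String]): Unit = {
          val ids = (1L to 100000L).toArray
          val idSet = ids.toSet // one-time conversion cost, paid per stage, not per task
          val updates = Seq(
            AccumulableInfo(42L, Some(1L)),     // registered, has an update: kept
            AccumulableInfo(7L, None),          // no update: dropped
            AccumulableInfo(200001L, Some(2L))) // not registered for this stage: dropped
          // Both variants select the same accumulators; only the lookup cost differs.
          assert(filterWithArray(updates, ids) == filterWithSet(updates, idSet))
        }
      }
      ```

      The conversion to a `Set` happens once per stage, while lookups happen once per accumulator per task update, so the trade is heavily in the `Set`'s favor for stages with many tasks.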

            People

            • Assignee: Yuming Wang (yumwang)
            • Reporter: DENG FEI
            • Votes: 1
            • Watchers: 5
