Details
Type: Improvement
Status: Resolved
Priority: Major
Resolution: Fixed
2.3.1
None
Description
private def updateStageMetrics(
    stageId: Int,
    attemptId: Int,
    taskId: Long,
    accumUpdates: Seq[AccumulableInfo],
    succeeded: Boolean): Unit = {
  Option(stageMetrics.get(stageId)).foreach { metrics =>
    if (metrics.attemptId != attemptId || metrics.accumulatorIds.isEmpty) {
      return
    }

    val oldTaskMetrics = metrics.taskMetrics.get(taskId)
    if (oldTaskMetrics != null && oldTaskMetrics.succeeded) {
      return
    }

    val updates = accumUpdates
      .filter { acc => acc.update.isDefined && metrics.accumulatorIds.contains(acc.id) }
      .sortBy(_.id)

    if (updates.isEmpty) {
      return
    }

    val ids = new Array[Long](updates.size)
    val values = new Array[Long](updates.size)
    updates.zipWithIndex.foreach { case (acc, idx) =>
      ids(idx) = acc.id
      // In a live application, accumulators have Long values, but when reading from event
      // logs, they have String values. For now, assume all accumulators are Long and convert
      // accordingly.
      values(idx) = acc.update.get match {
        case s: String => s.toLong
        case l: Long => l
        case o => throw new IllegalArgumentException(s"Unexpected: $o")
      }
    }

    // TODO: storing metrics by task ID can cause metrics for the same task index to be
    // counted multiple times, for example due to speculation or re-attempts.
    metrics.taskMetrics.put(taskId, new LiveTaskMetrics(ids, values, succeeded))
  }
}
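The check 'metrics.accumulatorIds.contains(acc.id)' uses Array#contains, which is a linear scan. When a large SQL application generates many accumulators, this check runs for every accumulator update reported by every task, so filtering n updates against m accumulator IDs costs O(n × m), which is inefficient.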
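A minimal sketch of the cost difference, assuming simplified hypothetical types: AccUpdate stands in for Spark's AccumulableInfo, and the accumulator IDs are passed in directly instead of being read from the stage's metrics object. It contrasts the Array#contains filter used above with a lookup against a Scala immutable Set, whose hash-based contains runs in expected constant time. This is one possible remedy under those assumptions, not necessarily the change that was committed for this issue.

```scala
// Hypothetical, simplified stand-in for Spark's AccumulableInfo; only the fields
// needed to illustrate the lookup cost are modeled here.
final case class AccUpdate(id: Long, update: Option[Any])

object AccumulatorLookupSketch {

  // Mirrors the original filter: Array#contains scans the array for each update,
  // so filtering n updates against m ids costs O(n * m) in the worst case.
  def filterWithArray(updates: Seq[AccUpdate], accumulatorIds: Array[Long]): Seq[AccUpdate] =
    updates.filter(acc => acc.update.isDefined && accumulatorIds.contains(acc.id))

  // Same filter against a hash-based Set: each membership check is expected O(1),
  // so the filter costs O(n) once the Set has been built.
  def filterWithSet(updates: Seq[AccUpdate], accumulatorIds: Set[Long]): Seq[AccUpdate] =
    updates.filter(acc => acc.update.isDefined && accumulatorIds.contains(acc.id))

  def main(args: Array[String]): Unit = {
    val idArray: Array[Long] = (0L until 2000L).toArray
    // Build the Set once; rebuilding it on every call would reintroduce an O(m) cost.
    val idSet: Set[Long] = idArray.toSet
    // Even ids 0..3998, plus one update with no value that should be filtered out.
    val updates = (0L until 4000L by 2L).map(i => AccUpdate(i, Some(i))) :+ AccUpdate(1L, None)

    val viaArray = filterWithArray(updates, idArray)
    val viaSet = filterWithSet(updates, idSet)
    println(s"array: ${viaArray.size}, set: ${viaSet.size}, same result: ${viaArray == viaSet}")
  }
}
```

Both filters select the same updates; only the per-lookup cost differs. For the Set to help, the conversion from the accumulator ID array would be done once, where a stage's accumulator IDs are first recorded, rather than inside updateStageMetrics, which runs repeatedly for each task.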
In practice, this can make the application time out while shutting down, after which it is killed by the ResourceManager (RM) in YARN mode.