Details
- Improvement
- Status: Resolved
- Major
- Resolution: Duplicate
- 2.3.0
- None
- None
Description
ERROR 18/05/24 14:08:25 AsyncEventQueue: Dropping event from queue appStatus. This likely means one of the listeners is too slow and cannot keep up with the rate at which tasks are being started by the scheduler. [dag-scheduler-event-loop]
WARN 18/05/24 16:39:54 AsyncEventQueue: Dropped 208205 events from appStatus since Thu May 24 16:30:57 CST 2018. [dag-scheduler-event-loop]
WARN 18/05/24 16:40:19 SQLAppStatusListener: agregateMetrics cost=18.6s, metricIds.size=775, metrics.size=5292940, aggregatedMetrics=615 [pool-22-thread-1]
private def aggregateMetrics(exec: LiveExecutionData): Map[Long, String] = {
  val metricIds = exec.metrics.map(_.accumulatorId).sorted
  val metricTypes = exec.metrics.map { m => (m.accumulatorId, m.metricType) }.toMap
  val metrics = exec.stages.toSeq
    .flatMap { stageId => Option(stageMetrics.get(stageId)) }
    .flatMap(_.taskMetrics.values().asScala)
    .flatMap { metrics => metrics.ids.zip(metrics.values) }

  // metricIds is a Seq, so contains is a linear scan for every element of metrics.
  val aggregatedMetrics = (metrics ++ exec.driverAccumUpdates.toSeq)
    .filter { case (id, _) => metricIds.contains(id) }
    .groupBy(_._1)
    .map { case (id, values) =>
      id -> SQLMetrics.stringValue(metricTypes(id), values.map(_._2).toSeq)
    }

  // Return the already computed values if another caller filled them in meanwhile.
  if (exec.metricsValues != null) {
    exec.metricsValues
  } else {
    aggregatedMetrics
  }
}
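Case:
When metrics.size is about 5,000,000 and metricIds.size is about 1,000 (compare metrics.size=5292940 and metricIds.size=775 in the log above), the filter step of aggregatedMetrics, (metrics ++ exec.driverAccumUpdates.toSeq).filter { case (id, _) => metricIds.contains(id) }, is very slow: metricIds is a Seq, so every contains call is a linear scan over it.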
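The cost described above can be reproduced outside Spark. The following is a minimal sketch, not the actual fix: `FilterSketch`, `filterWithSeq`, `filterWithSet`, and `timeMs` are hypothetical names, the input sizes are scaled down, and the `(Long, Long)` pairs only stand in for the (accumulator id, value) pairs built in aggregateMetrics. It compares the Seq-based membership test with one possible alternative, a Set built once before filtering.

```scala
object FilterSketch {
  // Same shape as the filter in aggregateMetrics: Seq.contains is a linear
  // scan, so the total cost grows with metrics.size * metricIds.size.
  def filterWithSeq(metrics: Seq[(Long, Long)], metricIds: Seq[Long]): Seq[(Long, Long)] =
    metrics.filter { case (id, _) => metricIds.contains(id) }

  // Assumed alternative: build a Set once, then each lookup is effectively
  // constant time, so the total cost grows with metrics.size only.
  def filterWithSet(metrics: Seq[(Long, Long)], metricIds: Seq[Long]): Seq[(Long, Long)] = {
    val idSet = metricIds.toSet
    metrics.filter { case (id, _) => idSet.contains(id) }
  }

  // Small timing helper; returns the result and the elapsed milliseconds.
  def timeMs[A](body: => A): (A, Long) = {
    val start = System.nanoTime()
    val result = body
    (result, (System.nanoTime() - start) / 1000000L)
  }

  def main(args: Array[String]): Unit = {
    // Scaled-down stand-ins for the sizes in the report.
    val metricIds: Seq[Long] = (0L until 1000L).toList
    val metrics: Seq[(Long, Long)] = (0 until 200000).map(i => ((i % 2000).toLong, 1L))

    val (slow, slowMs) = timeMs(filterWithSeq(metrics, metricIds))
    val (fast, fastMs) = timeMs(filterWithSet(metrics, metricIds))

    assert(slow == fast) // both variants keep exactly the same pairs
    println(s"Seq.contains: $slowMs ms, Set.contains: $fastMs ms, kept ${fast.size} of ${metrics.size}")
  }
}
```

Both variants return the same pairs in the same order; only the membership test changes. Timings depend on the machine and JVM warm-up, so the printed numbers are indicative only.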
Issue Links
- duplicates SPARK-26003 Improve performance in SQLAppStatusListener (Resolved)