SPARK-34779

ExecutorMetricsPoller should keep stage entry in stageTCMP until a heartbeat occurs


Details

    • Type: Improvement
    • Status: Resolved
    • Priority: Major
    • Resolution: Fixed
    • Affects Version/s: 3.0.0, 3.0.1, 3.0.2, 3.1.0, 3.1.1
    • Fix Version/s: 3.2.0
    • Component/s: Spark Core
    • Labels: None

    Description

The current implementation of ExecutorMetricsPoller uses the task count of each stage to decide whether to keep a stage entry or not. When the executor has only 1 core, this can cause two issues:

      1. Peak metrics missing (due to stage entry being removed within a heartbeat interval)
      2. Unnecessary and frequent hashmap entry removal and insertion.

Assuming an executor with 1 core has 2 tasks (task1 and task2, both belonging to stage (0, 0)) to execute within a heartbeat interval, the workflow in the current ExecutorMetricsPoller implementation would be:

      1. task1 start -> stage (0, 0) entry created in stageTCMP, task count increments to 1

      2. 1st poll() -> update peak metrics of stage (0, 0)

      3. task1 end -> stage (0, 0) task count decrements to 0, stage (0, 0) entry removed, peak metrics lost.

      4. task2 start -> stage (0, 0) entry created in stageTCMP, task count increments to 1

      5. 2nd poll() -> update peak metrics of stage (0, 0)

      6. task2 end -> stage (0, 0) task count decrements to 0, stage (0, 0) entry removed, peak metrics lost.

      7. heartbeat() -> empty or inaccurate peak metrics for stage (0, 0) reported.
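The loss described in the steps above can be illustrated with a minimal, single-threaded sketch (plain Java, not the actual Spark code; the String stage key, the long[] entry layout, and the method names are illustrative only — the real stageTCMP keys by (stageId, stageAttemptId) and uses atomic counters):

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Minimal model of the pre-fix bookkeeping: a stage entry is removed as soon
// as its task count reaches zero, so any peak recorded by poll() is lost if
// no heartbeat happens while the count is still positive.
public class PollerSketch {
    // stage -> {taskCount, peakMetric}
    static final Map<String, long[]> stageTCMP = new ConcurrentHashMap<>();

    static void onTaskStart(String stage) {
        stageTCMP.computeIfAbsent(stage, k -> new long[]{0, 0})[0]++;
    }

    static void poll(String stage, long metric) {
        long[] e = stageTCMP.get(stage);
        if (e != null) e[1] = Math.max(e[1], metric); // record peak so far
    }

    static void onTaskEnd(String stage) {
        long[] e = stageTCMP.get(stage);
        if (e != null && --e[0] == 0) stageTCMP.remove(stage); // peak lost here
    }

    public static void main(String[] args) {
        onTaskStart("0.0");  // task1 start: entry created, count = 1
        poll("0.0", 100);    // 1st poll records peak 100
        onTaskEnd("0.0");    // count hits 0, entry removed, peak 100 lost
        onTaskStart("0.0");  // task2 start: fresh entry, earlier peak gone
        poll("0.0", 40);     // 2nd poll records peak 40
        onTaskEnd("0.0");    // entry removed again
        // heartbeat() now finds no entry for stage (0, 0) at all
        System.out.println(stageTCMP.containsKey("0.0")); // false
    }
}
```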

      We can fix the issue by keeping entries with task count = 0 in the stageTCMP map until a heartbeat occurs. At the heartbeat, after reporting the peak metrics for each stage, we scan stageTCMP and remove entries whose task count is 0.

      After the fix, the workflow would be:

      1. task1 start -> stage (0, 0) entry created in stageTCMP, task count increments to 1

      2. 1st poll() -> update peak metrics of stage (0, 0)

      3. task1 end -> stage (0, 0) task count decrements to 0, but the entry remains in stageTCMP.

      4. task2 start -> task count of stage (0, 0) increments to 1

      5. 2nd poll() -> update peak metrics of stage (0, 0)

      6. task2 end -> stage (0, 0) task count decrements to 0, but the entry remains in stageTCMP.

      7. heartbeat() -> accurate peak metrics for stage (0, 0) reported. The entry for stage (0, 0) is then removed from stageTCMP because its task count is 0.
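The fixed workflow above can be sketched the same way (again plain Java, not the Spark implementation; this single-threaded model ignores the atomicity the real code needs). The only changes are that onTaskEnd no longer removes the entry, and heartbeat prunes zero-count entries after reporting:

```java
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Minimal model of the post-fix bookkeeping: zero-count entries survive until
// the next heartbeat, which reports their peaks and then prunes any entry
// whose task count is still zero.
public class FixedPollerSketch {
    // stage -> {taskCount, peakMetric}
    static final Map<String, long[]> stageTCMP = new ConcurrentHashMap<>();

    static void onTaskStart(String stage) {
        stageTCMP.computeIfAbsent(stage, k -> new long[]{0, 0})[0]++;
    }

    static void poll(String stage, long metric) {
        long[] e = stageTCMP.get(stage);
        if (e != null) e[1] = Math.max(e[1], metric);
    }

    static void onTaskEnd(String stage) {
        long[] e = stageTCMP.get(stage);
        if (e != null) e[0]--; // decrement only; keep the entry
    }

    // Report peaks for every tracked stage, then remove entries with count 0.
    static Map<String, Long> heartbeat() {
        Map<String, Long> report = new HashMap<>();
        stageTCMP.forEach((k, v) -> report.put(k, v[1]));
        stageTCMP.entrySet().removeIf(e -> e.getValue()[0] == 0);
        return report;
    }

    public static void main(String[] args) {
        onTaskStart("0.0"); poll("0.0", 100); onTaskEnd("0.0"); // task1
        onTaskStart("0.0"); poll("0.0", 40);  onTaskEnd("0.0"); // task2
        System.out.println(heartbeat().get("0.0")); // 100: peak survived
        System.out.println(stageTCMP.isEmpty());    // true: pruned after report
    }
}
```

The design point is that pruning moves from the task-end path to the heartbeat path, so an entry can only disappear after its peaks have been reported at least once.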

       

      How to verify the behavior? 

      Submit a job with a custom polling interval (e.g., 2s) and spark.executor.cores=1, and check the debug logs of ExecutorMetricsPoller.
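A possible invocation for such a run (a sketch, not from this issue: the jar, class name, and heartbeat interval are placeholders, while spark.executor.cores and spark.executor.metrics.pollingInterval are standard Spark 3.x configuration keys):

```shell
# Run with 1 core per executor and a 2s metrics polling interval, so several
# polls and task starts/ends fall inside one heartbeat interval.
spark-submit \
  --conf spark.executor.cores=1 \
  --conf spark.executor.metrics.pollingInterval=2s \
  --conf spark.executor.heartbeatInterval=10s \
  --class org.example.SomeJob some-job.jar
# Then inspect the executor logs for ExecutorMetricsPoller output, with DEBUG
# logging enabled for org.apache.spark.executor.ExecutorMetricsPoller in the
# log4j configuration.
```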


            People

              Assignee: Baohe Zhang
              Reporter: Baohe Zhang