SPARK-34779

ExecutorMetricsPoller should keep stage entry in stageTCMP until a heartbeat occurs


Details

    • Type: Improvement
    • Status: Resolved
    • Priority: Major
    • Resolution: Fixed
    • Affects Version/s: 3.0.0, 3.0.1, 3.0.2, 3.1.0, 3.1.1
    • Fix Version/s: 3.2.0
    • Component/s: Spark Core
    • Labels: None

    Description

      The current implementation of ExecutorMetricsPoller uses the task count in each stage to decide whether to keep a stage entry or not. When the executor has only 1 core, this can cause two issues:

      1. Peak metrics are missing (because the stage entry is removed within a heartbeat interval).
      2. Unnecessary and frequent hashmap entry removal and insertion.

      Assuming an executor with 1 core has 2 tasks to execute within a heartbeat interval (task1 and task2, both belonging to stage (0, 0)), the workflow in the current ExecutorMetricsPoller implementation would be as follows (a code sketch of this behavior follows the list):

      1. task1 start -> stage (0, 0) entry created in stageTCMP, task count incremented to 1

      2. 1st poll() -> update peak metrics of stage (0, 0)

      3. task1 end -> stage (0, 0) task count decremented to 0, stage (0, 0) entry removed, peak metrics lost.

      4. task2 start -> stage (0, 0) entry created in stageTCMP, task count incremented to 1

      5. 2nd poll() -> update peak metrics of stage (0, 0)

      6. task2 end -> stage (0, 0) task count decremented to 0, stage (0, 0) entry removed, peak metrics lost.

      7. heartbeat() -> empty or inaccurate peak metrics reported for stage (0, 0).
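      As a rough illustration of the pattern above, here is a minimal, self-contained Scala sketch. The names CurrentStylePoller and StageEntry, and the single peakMetric value, are simplifications for this example only, not the real Spark classes; the sketch just models the count-based removal and reproduces the empty heartbeat report:

{code:scala}
import scala.collection.mutable

object CurrentStylePollerDemo {

  // Key a stage by (stageId, stageAttemptId), as the (0, 0) notation above does.
  type StageKey = (Int, Int)

  // Each entry tracks the number of running tasks plus the peak metric
  // value observed by poll() since the entry was created.
  final class StageEntry(var taskCount: Int, var peakMetric: Long)

  final class CurrentStylePoller {
    val stageTCMP = mutable.Map.empty[StageKey, StageEntry]

    def onTaskStart(stage: StageKey): Unit = {
      val entry = stageTCMP.getOrElseUpdate(stage, new StageEntry(0, 0L))
      entry.taskCount += 1
    }

    // Problematic part: the entry is dropped as soon as the running-task
    // count hits 0, even if no heartbeat has reported its peaks yet.
    def onTaskCompletion(stage: StageKey): Unit = {
      stageTCMP.get(stage).foreach { entry =>
        entry.taskCount -= 1
        if (entry.taskCount == 0) stageTCMP.remove(stage) // peaks are lost here
      }
    }

    def poll(sampledMetric: Long): Unit =
      stageTCMP.values.foreach(e => e.peakMetric = math.max(e.peakMetric, sampledMetric))

    def heartbeat(): Map[StageKey, Long] =
      stageTCMP.map { case (k, e) => k -> e.peakMetric }.toMap
  }

  def main(args: Array[String]): Unit = {
    val poller = new CurrentStylePoller()
    val stage = (0, 0)

    poller.onTaskStart(stage)        // 1. task1 start
    poller.poll(sampledMetric = 100) // 2. first poll records a peak of 100
    poller.onTaskCompletion(stage)   // 3. task1 end -> entry removed, peak lost
    poller.onTaskStart(stage)        // 4. task2 start -> fresh entry, peak reset
    poller.poll(sampledMetric = 50)  // 5. second poll records a peak of 50
    poller.onTaskCompletion(stage)   // 6. task2 end -> entry removed again
    println(poller.heartbeat())      // 7. Map() -> nothing reported for stage (0, 0)
  }
}
{code}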

      We can fix the issue by keeping entries with task count = 0 in the stageTCMP map until a heartbeat occurs. At the heartbeat, after reporting the peak metrics for each stage, we scan stageTCMP and remove the entries whose task count is 0.

      After the fix, the workflow would be (a sketch of the fixed policy follows the list):

      1. task1 start -> stage (0, 0) entry created in stageTCMP, task count incremented to 1

      2. 1st poll() -> update peak metrics of stage (0, 0)

      3. task1 end -> stage (0, 0) task count decremented to 0, but the entry for stage (0, 0) remains.

      4. task2 start -> task count of stage (0, 0) incremented to 1

      5. 2nd poll() -> update peak metrics of stage (0, 0)

      6. task2 end -> stage (0, 0) task count decremented to 0, but the entry for stage (0, 0) remains.

      7. heartbeat() -> accurate peak metrics reported for stage (0, 0). The entry for stage (0, 0) is then removed from stageTCMP because its task count is 0.
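      A corresponding sketch of the fixed policy, using the same simplified, hypothetical classes as above (renamed FixedStylePoller): entries are kept at task count 0, and heartbeat() reports first and only then purges the zero-count entries:

{code:scala}
import scala.collection.mutable

object FixedStylePollerDemo {

  type StageKey = (Int, Int)
  final class StageEntry(var taskCount: Int, var peakMetric: Long)

  final class FixedStylePoller {
    val stageTCMP = mutable.Map.empty[StageKey, StageEntry]

    def onTaskStart(stage: StageKey): Unit = {
      val entry = stageTCMP.getOrElseUpdate(stage, new StageEntry(0, 0L))
      entry.taskCount += 1
    }

    // Changed behavior: only decrement; keep the entry even at task count 0
    // so the next poll()/heartbeat() can still see its peaks.
    def onTaskCompletion(stage: StageKey): Unit =
      stageTCMP.get(stage).foreach(_.taskCount -= 1)

    def poll(sampledMetric: Long): Unit =
      stageTCMP.values.foreach(e => e.peakMetric = math.max(e.peakMetric, sampledMetric))

    // Report first, then drop the stages that have no running tasks left.
    def heartbeat(): Map[StageKey, Long] = {
      val report = stageTCMP.map { case (k, e) => k -> e.peakMetric }.toMap
      val finished = stageTCMP.collect { case (k, e) if e.taskCount == 0 => k }
      finished.foreach(stageTCMP.remove)
      report
    }
  }

  def main(args: Array[String]): Unit = {
    val poller = new FixedStylePoller()
    val stage = (0, 0)

    poller.onTaskStart(stage)         // 1. task1 start
    poller.poll(sampledMetric = 100)  // 2. first poll records a peak of 100
    poller.onTaskCompletion(stage)    // 3. task1 end -> entry kept, count = 0
    poller.onTaskStart(stage)         // 4. task2 start -> same entry reused
    poller.poll(sampledMetric = 50)   // 5. second poll: 100 is still the peak
    poller.onTaskCompletion(stage)    // 6. task2 end -> entry kept, count = 0
    println(poller.heartbeat())       // 7. Map((0,0) -> 100) reported
    println(poller.stageTCMP.isEmpty) // true: zero-count entry purged at heartbeat
  }
}
{code}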

       

      How to verify the behavior?

      Submit a job with a custom polling interval (e.g., 2s) and spark.executor.cores=1, and check the debug logs of ExecutorMetricsPoller.
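      For example, something along these lines (assuming the spark.executor.metrics.pollingInterval setting to control how often the poller runs; the job itself is arbitrary and only needs several tasks in one stage):

{code:scala}
import org.apache.spark.sql.SparkSession

object PollerDebugCheck {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .appName("ExecutorMetricsPoller check")
      // One core per executor, so tasks of the same stage run one after another.
      .config("spark.executor.cores", "1")
      // Poll executor metrics every 2 seconds instead of only at heartbeats.
      .config("spark.executor.metrics.pollingInterval", "2s")
      .getOrCreate()

    // A small multi-task, single-stage job, enough to exercise the
    // task start -> poll -> task end -> task start sequence described above.
    val total = spark.sparkContext
      .parallelize(1 to 1000000, numSlices = 20)
      .map(i => i.toLong * 2)
      .reduce(_ + _)
    println(s"sum = $total")

    spark.stop()
  }
}
{code}

      The relevant output appears once the logger for org.apache.spark.executor.ExecutorMetricsPoller is set to DEBUG in the executor log configuration.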

          People

            Assignee: Baohe Zhang
            Reporter: Baohe Zhang
            Votes: 0
            Watchers: 3
