Hadoop Map/Reduce
  MAPREDUCE-4305

Implement delay scheduling in capacity scheduler for improving data locality

    Details

    • Type: New Feature
    • Status: Open
    • Priority: Major
    • Resolution: Unresolved
    • Affects Version/s: None
    • Fix Version/s: None
    • Component/s: None
    • Labels:
      None

      Description

      With the Capacity Scheduler, data-local tasks are only about 40-50%, which is not good. In my tests on a 70-node cluster I consistently get data locality of around 40-50% on a free cluster.

      I think we need to implement something like delay scheduling in the Capacity Scheduler to improve data locality:
      http://radlab.cs.berkeley.edu/publication/308

      After implementing delay scheduling on Hadoop 0.22, I get 100% data locality on a free cluster and around 90% data locality on a busy cluster.

      Thanks,
      Mayank
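The delay-scheduling idea from the paper linked above can be sketched as a small toy in Java. Everything here (class and method names, the skip thresholds) is illustrative and assumed, not the actual patch code:

```java
// Illustrative sketch of delay scheduling (all names hypothetical):
// a job passes up scheduling opportunities that are not data-local,
// and only relaxes its locality requirement after enough skips.
public class DelayScheduler {
    public enum LocalityLevel { NODE, RACK, ANY }

    private final int maxSkips; // e.g. proportional to cluster size
    private int skips = 0;

    public DelayScheduler(int maxSkips) { this.maxSkips = maxSkips; }

    /** Strictest locality level the job may currently schedule at. */
    public LocalityLevel allowedLevel() {
        if (skips < maxSkips) return LocalityLevel.NODE;
        if (skips < 2 * maxSkips) return LocalityLevel.RACK;
        return LocalityLevel.ANY;
    }

    /** The job declined a slot that was not local enough. */
    public void recordSkip() { skips++; }

    /** The job launched a task at its allowed level; start over. */
    public void reset() { skips = 0; }
}
```

In a real scheduler the skip threshold would be derived from cluster size, which is why waiting a short while tends to recover locality even on busy clusters.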

      1. MAPREDUCE-4305
        22 kB
        Mayank Bansal
      2. MAPREDUCE-4305-1.patch
        22 kB
        Mayank Bansal
      3. PATCH-MAPREDUCE-4305-MR1.patch
        43 kB
        Mayank Bansal
      4. PATCH-MAPREDUCE-4305-MR1-1.patch
        43 kB
        Mayank Bansal
      5. PATCH-MAPREDUCE-4305-MR1-2.patch
        39 kB
        Mayank Bansal
      6. PATCH-MAPREDUCE-4305-MR1-3.patch
        40 kB
        Mayank Bansal
      7. PATCH-MAPREDUCE-4305-MR1-6.patch
        35 kB
        Mayank Bansal
      8. PATCH-MAPREDUCE-4305-MR1-7.patch
        35 kB
        Mayank Bansal

        Issue Links

          Activity

          Karthik Kambatla added a comment -

          Arun C Murthy, can you take a look at this when you get a chance? It would be a nice addition to have.

          Karthik Kambatla added a comment -

          Mayank's latest patch looks good. Arun C Murthy, do you have any further comments on this?

          Karthik Kambatla added a comment -

          Thanks Mayank. +1.

          Mayank Bansal added a comment -

          Thanks Karthik for the review.

          Updated your comments.

          Thanks,
          Mayank

          Karthik Kambatla added a comment -

          Thanks Mayank. +1 on the code part.

           Sorry for missing these readability nits in my last review. Feel free to ignore some or all of them.

           1. Should we rename NumberBasedDelayScheduling to SkipsBasedDelayScheduling to make it easier to understand? (We might have to change other method names accordingly.)
           2. Rename calcNumberBasedDelayScheduling to resetNumberBasedDelayScheduling.
           3. Within calcNumberBasedDelayScheduling, use the ternary operator instead of if-else.
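Nits 2 and 3 together might look like the following sketch. Only the method name comes from the comment; the field name and the reset logic are guesses for illustration:

```java
// Hypothetical sketch of nits 2 and 3: the renamed reset method
// written with a ternary instead of if-else. The field and the
// fallback value of 1 are illustrative, not from the patch.
public class SkipThreshold {
    private long skipScheduleTimes;

    /** Reset the skips-based delay threshold from the active node count. */
    public void resetNumberBasedDelayScheduling(long numActiveNodes) {
        skipScheduleTimes = (numActiveNodes > 0) ? numActiveNodes : 1;
    }

    public long getSkipScheduleTimes() { return skipScheduleTimes; }
}
```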
          Mayank Bansal added a comment -

          Thanks Arun and Karthik for your valuable comments.

          I am updating the patch with all your comments.

          Please take a look.

          Thanks,
          Mayank

          Karthik Kambatla added a comment -

          Thanks Mayank. From a logic point of view, the code looks good.

          Have a few nits, mostly formatting:

           1. The patch has a few unrelated diffs - mostly whitespace and formatting changes.
          2. Definition of SKIP_SCHEDULING_TIMES seems way over 80 chars - can we wrap it around?
          3. Formatting seems off at
            long numActiveNodesinCluster = scheduler.taskTrackerManager
                  .getClusterStatus().getTaskTrackers()
                  - scheduler.taskTrackerManager.getClusterStatus()
                  .getGraylistedTrackers();
            
          4. Switch statement in getAllowedLocalityLevel() has a few statements over 80 chars
          5. LocalityStage javadoc refers to FairScheduler
          6. In TestCapacityScheduler, formatting in method parameters of obtain*MapTask: no space after comma
          Mayank Bansal added a comment -

          Updating the patch with default value.

          Thanks,
          Mayank

          Mayank Bansal added a comment -

          Hi,

          Thanks Arun for your comments.
           I had an offline discussion with Arun.

           The reason behind the timeouts I added was to save jobs from starving when priorities are involved; however, we need more work for that case.
           So I am refactoring my patch into two patches.
           This patch is mostly YARN-80 for Hadoop-1, with a test framework for testing scenarios like node-local, rack-local, etc.
           I will file a JIRA for the priority and timeout work and update the patch there.

          Thanks,
          Mayank

          Arun C Murthy added a comment -

          Mayank - the time-based configs are a bad idea (I've said the same about the FairScheduler long ago) - they don't consider cluster size, job length, current job progress, etc.

          I promise you that porting YARN-80 is sufficient and will get you required locality improvements. Please, let us not add more configs if possible. Thanks.

          Mayank Bansal added a comment -

          Fixing small bug

          Thanks,
          Mayank

          Karthik Kambatla added a comment -

          Didn't mean to undermine the patch's scope; I completely agree it is a combination of YARN-80 and delay scheduling with timeouts from the FairScheduler. Personally, I like this approach better; maybe we can augment the one in YARN where applicable.

          Mayank Bansal added a comment -

          Hi Karthik,

          There is more to it than YARN-80; please follow up on my previous comments.

          It's a combination of YARN-80 and part of the Fair Scheduler's delay scheduling with timeouts.

          Thanks,
          Mayank

          Karthik Kambatla added a comment -

          Thanks Arun. My understanding is that Mayank's patch is mostly just backporting YARN-80 to MR1, along with other MR1 specific changes.

          Mayank Bansal added a comment -

          Hi Arun,

          In Hadoop-1 we have a scheduling-opportunities counter for each job, which gets incremented every time the job gets an opportunity to be scheduled.
          In YARN-80 we use the same counter to delay scheduling for node locality.

          In this patch I used the scheduling-opportunity counter as well as timeouts for staging jobs through the different scheduling categories.

          Initially a job is eligible to schedule only node-local tasks; once its scheduling opportunities exceed the number of nodes in the cluster, or the node-level timeout elapses (whichever comes first), it graduates to the next level, which is rack-local.

          The job then waits again until its scheduling opportunities exceed the number of nodes in the cluster, or the node-plus-rack timeout elapses (whichever comes first), and graduates to the next level, which is off-rack.

          Once the job is at the off-rack level, it is scheduled immediately based on the prior logic.

          This approach is a combination of YARN-80 and the Fair Scheduler's delay scheduling algorithm; it gives us the flexibility of staging jobs between different levels while reusing the scheduling-opportunity counter that was already there.

          Please review the approach and let me know if it needs improvement.

          Thanks,
          Mayank
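The graduation rule described in this comment (scheduling opportunities exceeding the cluster's node count, or a per-level timeout, whichever comes first) can be sketched as follows. All names and the exact threshold arithmetic here are illustrative assumptions, not the patch itself:

```java
// Illustrative sketch (hypothetical names): a job graduates from
// NODE to RACK to OFF_RACK when either its scheduling opportunities
// exceed a threshold based on the number of nodes in the cluster,
// or the per-level timeout elapses -- whichever comes first.
public class LocalityStager {
    public enum Stage { NODE, RACK, OFF_RACK }

    private final long numNodes;
    private final long nodeTimeoutMs;      // timeout for the node stage
    private final long nodeRackTimeoutMs;  // node + rack timeout
    private final long startMs;
    private long opportunities = 0;

    public LocalityStager(long numNodes, long nodeTimeoutMs,
                          long nodeRackTimeoutMs, long nowMs) {
        this.numNodes = numNodes;
        this.nodeTimeoutMs = nodeTimeoutMs;
        this.nodeRackTimeoutMs = nodeRackTimeoutMs;
        this.startMs = nowMs;
    }

    /** Called each time the job gets an opportunity to be scheduled. */
    public void recordOpportunity() { opportunities++; }

    /** Current stage, given the clock value nowMs. */
    public Stage stage(long nowMs) {
        long waited = nowMs - startMs;
        if (opportunities <= numNodes && waited < nodeTimeoutMs) {
            return Stage.NODE;
        }
        if (opportunities <= 2 * numNodes && waited < nodeRackTimeoutMs) {
            return Stage.RACK;
        }
        return Stage.OFF_RACK; // scheduled immediately at this stage
    }
}
```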

          Arun C Murthy added a comment -

          Karthik & Mayank - CS already has delay scheduling built in; one area for improvement is to backport something like YARN-80 to branch-1.

          Mayank Bansal added a comment -

          Thanks Karthik for looking at the patch. If you have comments, please share them and I will try to incorporate them ASAP.

          Thanks,
          Mayank

          Karthik Kambatla added a comment -

          Thanks Mayank. The overall approach in the patch seems correct. I have a few code-specific comments, but it might be better to review the final patch.

          Mayank Bansal added a comment -

          I am still working on adding more tests; I will update with the latest soon.

          Thanks,
          Mayank

          Mayank Bansal added a comment -

          Initial Patch for MR1

          Thanks,
          Mayank

          Mayank Bansal added a comment -

          Hi,

          Thanks Konst for your comments.

          I am working on MR-1 patch, will put it shortly.

          Thanks,
          Mayank

          Konstantin Shvachko added a comment -

          Task locality is important. Interesting that it is only necessary to hook Capacity Scheduler up to the logic that already existed in JobInProgress etc. I went over the general logic of the patch. It looks good. But I have several formatting and code organization comments.

          1. Append _PROPERTY to new config key constants, e.g. NODE_LOCALITY_DELAY_PROPERTY. Looks like other constants in CapacitySchedulerConf are like that.
           2. Wrap long lines.
          3. In CapacitySchedulerConf convert comments describing variables to a JavaDoc.
          4. In initializeDefaults() you should use capacity-scheduler not fairscheduler config variables. Also since you introduced constants for the keys, use them rather than the raw keys.
           5. JobInfo is confusing because there is already a class with that name. Call it something like JobLocality. I'd rather move it into JobQueuesManager, because the latter maintains the map of those.
          6. Correct indentations in CapacityTaskScheduler, particularly eliminate all tabs, should be spaces only.
          7. Add spaces between arguments, operators, and in some LOG messages.
          8. Add empty lines between new methods.
          9. updateLocalityWaitTimes() and updateLastMapLocalityLevel() should belong to JobQueuesManager, imo.
           10. JobQueuesManager.infos is a map keyed by JobInProgress. Would it be better to use JobID as the key?
          11. In TaskSchedulingMgr you need only one version of obtainNewTask to be abstract, the one with cachelevel parameter. The other one should not be abstract and just call the abstract obtainNewTask() with cachelevel set to any.
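Item 11 could be sketched as below; the parameter and return types are simplified stand-ins for illustration, not the real Hadoop signatures:

```java
// Illustrative sketch of item 11 (simplified stand-in types, not the
// real Hadoop signatures): only the cache-level-aware overload is
// abstract; the other concretely delegates with an "any" cache level.
abstract class TaskSchedulingMgr {
    static final int ANY_CACHE_LEVEL = Integer.MAX_VALUE;

    /** Subclasses implement only this version. */
    abstract String obtainNewTask(String taskTracker, int cacheLevel);

    /** Concrete convenience overload: no locality restriction. */
    String obtainNewTask(String taskTracker) {
        return obtainNewTask(taskTracker, ANY_CACHE_LEVEL);
    }
}
```

Keeping a single abstract method means each scheduler subclass has exactly one place to implement the locality-aware logic.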
          Mayank Bansal added a comment -

          Uploading the Updated patch

          Mayank Bansal added a comment -

          Initial patch for 22

          Mayank Bansal added a comment -

          @Amar

          Can you please share the data-locality numbers you are seeing? I am running TeraSort for my benchmarking.

          @Thomas

          2-5% tasks are off rack and rest are rack local.

          Thomas Graves added a comment -

          I'm curious. Is improving the data locality improving overall job performance/decreasing runtime? What is the other 50-60% - is it rack local or off rack?

          Amar Kamat added a comment -

          Mayank,
          I assume that you are using Hadoop 0.22. The numbers that we are seeing (on 0.20.x) are different from what you have reported. IIRC, the Hadoop 0.22 code is still the old Hadoop codebase (compared to 0.23/trunk) and should be similar to Hadoop 0.20. Can you re-run your experiments on 0.20.x (i.e. branch-1.x) and share your findings?


            People

            • Assignee:
              Mayank Bansal
              Reporter:
              Mayank Bansal
            • Votes:
              0
              Watchers:
              22

              Dates

              • Created:
                Updated:
