Hadoop Map/Reduce
MAPREDUCE-1463

Reducer should start faster for smaller jobs

    Details

    • Type: Improvement
    • Status: Open
    • Priority: Major
    • Resolution: Unresolved
    • Affects Version/s: None
    • Fix Version/s: None
    • Component/s: jobtracker
    • Labels: None

      Description

      Our users often complain about the slowness of smaller ad-hoc jobs.
      The overhead of waiting for the reducers to start is significant in this case.
      It would be good if we could start the reducers sooner for such jobs.

      1. MAPREDUCE-1463-v3.patch
        3 kB
        Scott Chen
      2. MAPREDUCE-1463-v2.patch
        3 kB
        Scott Chen
      3. MAPREDUCE-1463-v1.patch
        2 kB
        Scott Chen


          Activity

          Tsz Wo Nicholas Sze added a comment -

          How would you define "small"?

          Todd Lipcon added a comment -

          Is this basically about changing the slowstart to be nonlinear? That is, instead of just "start reducers when x% of maps are complete", factor in the total number of maps in the job as well?

          Scott Chen added a comment -

          Our proposal is to start the reducers sooner for jobs with few mappers or few reducers.
          We make these two numbers configurable.
          I have uploaded a patch according to this proposal.

          A unit test will be included later.
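
          (For illustration only: a minimal sketch of the kind of check such a proposal implies. The class name, configuration keys, and defaults below are hypothetical, not taken from the attached patch.)

            import org.apache.hadoop.conf.Configuration;

            public class ReduceRushPolicy {
              /**
               * Returns true if the job is "small" enough to start its reducers right away.
               * Key names and defaults are illustrative only.
               */
              public static boolean shouldRushReduces(Configuration conf,
                                                      int numMapTasks,
                                                      int numReduceTasks) {
                int mapsThreshold = conf.getInt("mapred.reduce.rush.maps.threshold", 10);
                int reducesThreshold = conf.getInt("mapred.reduce.rush.reduces.threshold", 10);
                // A job qualifies if either its map count or its reduce count is small.
                return numMapTasks <= mapsThreshold || numReduceTasks <= reducesThreshold;
              }
            }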

          Todd Lipcon added a comment -

          Why not integrate this directly into JobInProgress.scheduleReduces() rather than in the fairscheduler? This should be a generally useful feature.

          Scott Chen added a comment -

          @Todd: Yes, that's a great idea. And this should logically be in scheduleReduces(). I will repost the patch soon.

          Todd Lipcon added a comment -

          Cool. The other thing I noticed is that the new configurations should be documented in mapred-site.xml (since you're moving to mapred proper).

          Scott Chen added a comment -

          I followed Todd's suggestion to integrate this into JobInProgress.scheduleReduces() and updated the patch.
          I will do the documentation and unit test soon.
          Thanks for the suggestions.

          Todd Lipcon added a comment -

          I think the transferred logic is wrong. Shouldn't it be:

           return numMapTasks <= reduceRushMapsThreshold ||
                  numReduceTasks <= reduceRushReducesThreshold ||
                  finishedMapTasks >= completedMapsForReduceSlowstart;

          Also, I'm not sure that the design is quite right. If I have 1 map but 200 reduces, I don't want to rush the reduces, do I? That is to say, should the condition be && between the two rush parameters, or ||?
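
          (For comparison, the stricter variant raised above would rush only when both counts are small. Using the same field names, that check would read as the following sketch; it is not from any attached patch.)

           return (numMapTasks <= reduceRushMapsThreshold &&
                   numReduceTasks <= reduceRushReducesThreshold) ||
                  finishedMapTasks >= completedMapsForReduceSlowstart;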

          Amar Kamat added a comment -

          Scott,

          • How do you define small jobs? Shouldn't it be based on the total number of tasks instead of considering maps and reduces individually?
          • Why do we need a special case for small jobs? If it's for fairness, then this piece of code rightly belongs in contrib/fairscheduler, no?
          • If not for fairness, then what is the problem with the current framework w.r.t. small jobs?
          • Can it be fixed by simple (configuration-like) tweaking?
          • If not, then what's the right fix?

          Wouldn't the reducers be scheduled faster if 'mapreduce.job.reduce.slowstart.completedmaps' is set to 0? If not, can we change the slowstart feature to get it right?
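
          (For reference, this existing knob can already be set per job; a minimal standalone sketch using the old mapred JobConf API, not part of any patch here.)

            import org.apache.hadoop.mapred.JobConf;

            public class SlowstartExample {
              public static void main(String[] args) {
                JobConf conf = new JobConf();
                // 0 means reducers may be scheduled before any map has completed.
                conf.setFloat("mapreduce.job.reduce.slowstart.completedmaps", 0.0f);
              }
            }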

          Arun C Murthy added a comment -

          -1

          These knobs seem backwards: as both Todd and Amar have pointed out, we could add heuristics to tweak mapreduce.job.reduce.slowstart.completedmaps automatically without adding more config knobs.

          Scott Chen added a comment -

          @Todd:
          Yes, you're right. The logic in the patch is wrong; the one you posted is correct. Sorry about the mistake.

          @Amar:

          How do you define small jobs? Shouldn't it be based on the total number of tasks instead of considering maps and reduces individually?

          We want to start the reducers faster in both the fewer-mapper case and the fewer-reducer case:
          with few reducers, starting them earlier is cheap anyway, and with few mappers, the map phase finishes quickly.
          But taking the total instead may not be a bad idea (it is simpler, at least).

          Why do we need a special case for small jobs? If it's for fairness, then this piece of code rightly belongs in contrib/fairscheduler, no?
          If not for fairness, then what is the problem with the current framework w.r.t. small jobs?

          Handling the special case for small jobs reduces the overall latency, which gives the users a better experience.

          Can it be fixed by simple (configuration-like) tweaking?
          If not, then what's the right fix?

          For experienced users, setting completedmaps=0 does fix this problem. But it would be nice if this could be done automatically for users who do not know how to configure Hadoop.

          @Arun:
          Thanks for the comments. I agree. Tweaking mapreduce.job.reduce.slowstart.completedmaps on the job client side should be a cleaner way to do this. For experienced users, setting completedmaps to 0 on the client side will make their small jobs finish faster. But it would be nice if some automatic decision could be made here so that normal users don't have to learn how to configure an extra parameter.

          The point here is that in some cases (a small job, with a small number of mappers or reducers) we should not spend time waiting for the reducers to start, because the waiting time is significant (or because it is cheap to start the reducers earlier). Automatically reducing the latency makes our users happy.

          Amar Kamat added a comment -

          What should the behavior be when the total number of maps and reduces is small (i.e., a small job by the current definition) but the job takes a huge amount of time to finish? For example, the map takes a day to run while the reduces are also compute-intensive. In such a case, would we still consider it a small job? I think what we want to capture is the job's behavior (fast-finishing jobs versus others). Using task counts might not be sufficient.

          Scott, wouldn't this problem be solved if you set 'mapreduce.job.reduce.slowstart.completedmaps' to a default value of 0 (instead of 0.5) for all your users?

          Scott Chen added a comment -

          @Amar: Sorry for the late reply; I have just got back from vacation. About your long-running-mapper argument, I think you are right: using task counts is not sufficient. Maybe we need more information than task counts to determine when to delay the reducers. Can you give me some suggestions? Setting mapreduce.job.reduce.slowstart.completedmaps to zero does reduce the latency, but it hurts reducer utilization.

          I think the trade-off here is that we want to delay the reducers to increase reducer utilization, but we also want to minimize the impact of this delay for smaller jobs, because the delay is significant for small jobs and acceptable for large ones. So these two cases should be treated differently. There should be a way to balance reducer utilization and small-job latency. Thoughts?

          Todd Lipcon added a comment -

          Stepping back a bit to think about the model. Correct me if you disagree:

          Our end goal is that the reducers finish fetching map output as soon as possible after the last mapper finishes, but that the reducers are started as late as possible, so they don't occupy slots and hurt utilization.

          So, let's assume that the mappers generate data at some rate M. The reducers can fetch data at some maximum rate R. It's the ratio of M/R that determines when to start the reducers fetching. For example, if the mappers generate data faster than the reducers can fetch it, it behooves us to start the reduce fetch immediately when the job starts. If the reducers can fetch twice as fast as the mappers can output, we want to start the reducers halfway through the map phase.

          Since both kinds of tasks have some kind of startup cost, it's as if the average rate is slowed down by a factor determined by the number of tasks. In the case of 200 mappers and 1 reducer, it's as if the map output speed has been lowered (since the fixed costs of the map tasks slow down map completion), and thus we can afford to wait until later to start the reducer. If you have 1 mapper and 1 reducer, even for the exact same job, the ratio swings as if the map side outputs faster, and thus we want to start the reduce early.

          This is of course a much simplified model, but I think it's worth discussing this on somewhat abstract terms before we discuss the implementation details. One factor I'm ignoring above is the limiting that the reducer does with respect to particular hosts - that is to say, the reducer fetch speed varies with the number of unique hosts, not just the number of mappers.
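
          (One way to make that model concrete, as a toy sketch with hypothetical names: if maps produce shuffle data at aggregate rate M and reducers can fetch at aggregate rate R, starting the fetch after roughly max(0, 1 - M/R) of the maps have completed lets the shuffle finish alongside the last map, ignoring startup costs and per-host fetch limits.)

            public class SlowstartModel {
              /**
               * Toy calculation for the simplified model above: returns a candidate
               * value for mapreduce.job.reduce.slowstart.completedmaps given aggregate
               * map-output and reduce-fetch rates in the same units (e.g. MB/s).
               */
              public static float slowstartFraction(double mapOutputRate, double reduceFetchRate) {
                if (mapOutputRate >= reduceFetchRate) {
                  return 0.0f; // maps outrun the fetch: start the reducers immediately
                }
                // Reducers fetch faster than maps produce, so they can start this far in.
                return (float) (1.0 - mapOutputRate / reduceFetchRate);
              }

              public static void main(String[] args) {
                // Reducers fetch twice as fast as maps output -> start halfway (0.5).
                System.out.println(slowstartFraction(1.0, 2.0));
              }
            }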

          Scott Chen added a comment -

          @Todd: That is a good point. If the reducers can catch up with the mappers later on, then there is no harm in delaying them at the beginning.

          @Arun: I know you don't like this because it increases the complexity. Would you feel more comfortable if we moved this inside the fairscheduler only?

          dhruba borthakur added a comment -

          @Arun: Are you suggesting that the job submission process first generates the input splits, then determines whether the number of map tasks is smaller than a certain value, and if so sets mapreduce.job.reduce.slowstart.completedmaps to zero?
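
          (A rough sketch of that client-side idea using the old mapred API; the helper name, the threshold, and where this check would live are assumptions, not something specified in this thread.)

            import org.apache.hadoop.mapred.InputSplit;
            import org.apache.hadoop.mapred.JobConf;

            public class SmallJobSubmitTweak {
              /** Illustrative only: lower slowstart for jobs with few input splits. */
              public static void maybeRushReduces(JobConf job, int smallJobMapLimit)
                  throws java.io.IOException {
                InputSplit[] splits = job.getInputFormat().getSplits(job, job.getNumMapTasks());
                if (splits.length <= smallJobMapLimit) {
                  job.setFloat("mapreduce.job.reduce.slowstart.completedmaps", 0.0f);
                }
              }
            }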

          Arun C Murthy added a comment -

          Actually, assuming we have a reasonable model, I'd do it on the JobTracker, maybe in JobInProgress.initTasks, i.e. during job initialization.

          Scott Chen added a comment -

          I think improving the timing for launching reducers is not just for small jobs.
          In the case of the FairScheduler, larger jobs with 10000+ mappers need several batches for all their mappers to be scheduled.
          In that case, if we launch the reducers when 5% of the mappers have finished, those reducers will just be idling.

          Here is the trade-off.
          If we launch the reducers too late, we lose the overlap between map execution and reduce shuffling.
          But if we launch the reducers too early, we waste the reduce slots because they have to wait for the mappers to finish.

          The optimal case is to launch the reducers as late as possible while still having the shuffle phase finish right after the last mapper finishes.

          The goal is to somehow estimate the mapper finish time based on the information we have and launch the reducers at the right moment.
          I think this decision should depend on the TaskScheduler, because different scheduling policies affect the mapper finish time.

          Thoughts?
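
          (One possible shape for such an estimate, sketched with hypothetical names and a naive linear extrapolation; nothing below is from a patch or from the JobTracker code.)

            public class ReduceLaunchEstimator {
              /**
               * Naive heuristic: launch reducers once the extrapolated remaining map
               * time drops to roughly the estimated shuffle time, so the shuffle
               * finishes just after the last map does.
               */
              public static boolean shouldLaunchReduces(long elapsedMapMillis,
                                                        int finishedMaps,
                                                        int totalMaps,
                                                        long estimatedShuffleMillis) {
                if (finishedMaps == 0) {
                  return false; // nothing to extrapolate from yet
                }
                double millisPerMap = (double) elapsedMapMillis / finishedMaps;
                double remainingMapMillis = millisPerMap * (totalMaps - finishedMaps);
                return remainingMapMillis <= estimatedShuffleMillis;
              }
            }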

          Joydeep Sen Sarma added a comment -

          On a somewhat different note: I frequently see reducers not being scheduled (waiting for map completions) even when the cluster has tons of idle reduce slots. That makes no sense (especially when preemption is enabled), and it suggests that some of the heuristics should take cluster load into account.


            People

            • Assignee: Scott Chen
            • Reporter: Scott Chen
            • Votes: 0
            • Watchers: 10
