Details

    • Type: New Feature
    • Status: Closed
    • Priority: Major
    • Resolution: Duplicate
    • Affects Version/s: None
    • Fix Version/s: 0.23.0
    • Component/s: client, jobtracker
    • Labels:
      None
    • Release Note:
      An efficient implementation of small jobs that runs all tasks in the same JVM, thereby effecting lower latency.

      Description

      Currently, very small map-reduce jobs suffer from latency issues due to overheads in Hadoop Map-Reduce such as scheduling, JVM startup, etc. We've periodically tried to optimize all parts of the framework to achieve lower latencies.

      I'd like to turn the problem around a little bit. I propose we allow very small jobs to run as a single-task job with multiple maps and reduces, i.e., similar to our current implementation of the LocalJobRunner. Thus, under certain conditions (maybe user-set configuration, or if the input data is small, i.e., less than a DFS blocksize), we could launch a special task which runs all maps in a serial manner, followed by the reduces. This would help small jobs achieve significantly smaller latencies, thanks to reduced scheduling overhead and JVM startup cost, the lack of a shuffle over the network, etc.
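A minimal sketch of the eligibility test described here. The class name, method, and thresholds are illustrative assumptions drawn from the proposal (user opt-in, few tasks, input smaller than one DFS block), not from any committed patch:

```java
/**
 * Hypothetical sketch of the "run this job uber" decision. All names and
 * thresholds are illustrative assumptions, not committed code.
 */
class UberDecision {
    static final int MAX_MAPS = 9;     // assumed small-job cap
    static final int MAX_REDUCES = 1;  // serial execution: at most one reduce

    /** True when every task of the job can reasonably run in one JVM. */
    static boolean isUber(boolean userEnabled, int numMaps, int numReduces,
                          long inputBytes, long dfsBlockSize) {
        return userEnabled
            && numMaps <= MAX_MAPS
            && numReduces <= MAX_REDUCES
            && inputBytes < dfsBlockSize;   // "less than a DFS blocksize"
    }
}
```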

      This would be a huge benefit, especially on large clusters, to small Hive/Pig queries.

      Thoughts?

        Issue Links

          Activity

          Arun C Murthy added a comment -

          Duplicate of MAPREDUCE-2405.

          Arun C Murthy added a comment -

          Fixed via MAPREDUCE-279.
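For anyone arriving at this closed issue later: the MRv2 line of work referenced here (MAPREDUCE-279) ultimately exposed uberization through job configuration. The property names below are those shipped in later Hadoop 2.x releases, noted here only as a pointer; defaults may differ between versions:

```xml
<property>
  <name>mapreduce.job.ubertask.enable</name>
  <value>true</value>
</property>
<property>
  <name>mapreduce.job.ubertask.maxmaps</name>
  <value>9</value>
</property>
<property>
  <name>mapreduce.job.ubertask.maxreduces</name>
  <value>1</value>
</property>
```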

          Greg Roelofs added a comment -

          MR-1220.v1b.sshot-03-jobdetailshistory.jsp.png

          screenshot of the jobdetailshistory page (uber-job complete)

          This was taken before the setup and cleanup tasks were moved inside UberTask. The UI didn't get updated for that, IIRC, so I think it shows lots of zeros on the top and bottom lines now. (Another TODO item...)

          Note also the wrappable column heading, courtesy of "<wbr>" pseudo-tags. (Browsers optionally break there, but cut-and-paste doesn't pick up a spurious space. Highly recommended for other fat table cells such as counter names and types, job names, hostnames, etc. I should file a separate JIRA...)

          Greg Roelofs added a comment -

          screenshot of jobdetails page (uber-job still running)

          The "Job Scheduling information" line shows up again here, but the top table is also modified, as is the title of the graph. Trivial stuff, but it provides a clue to the user in case the optimization is less transparent than intended.

          Greg Roelofs added a comment -

          screenshot of top-level (multi-job) JobTracker page

          Main addition is the UberTask details under the "Job Scheduling Information" column at far right. The uber stuff gets appended if there's anything else there (as is the case with the capacity scheduler).

          Colors: pale yellow for running jobs; pale pink for failed/killed jobs; pale green for successful jobs. (Not uber-specific, but trivial and in the same place as some of the other changes.)

          Greg Roelofs added a comment -

          Status: basically functional; I believe all otherwise-passing unit tests still pass. Unfortunately, because of the duration over which patches were committed (and intervening commits), there's no easy way (that I'm aware of) to merge everything back into one patch. I'm currently working on the "MR v2" version (see MAPREDUCE-279), which is much less hackish and shares very little with the version above. I'm not sure this version has a future, but the patches are here if anyone is interested.

          Known bugs:

          • "Re-localization" is missing. Specifically, because all subtasks run in the same JVM, and Java doesn't have chdir(), there's no clean way to isolate them from each other. If any but the last sub-MapTask does something obnoxious (e.g., delete a distcache symlink or create a file that any other subtask wants to create), things will break. Obviously this is a problem for an optimization that's supposed to be (mostly) transparent to users.
          • Progress is still broken, apparently. Everything seemed to check out when I had gobs of debugging in there, but it doesn't make it to the UI (including the client) as frequently as it should. No clue what broke.
          • The max-input-size decision criterion (in JobInProgress) should check the default block size (if appropriate) for the actual input filesystem, not use a hardcoded HDFS config that's not necessarily available to tasktracker nodes anyway.
          • The UI changes are incomplete, and there are some 404 and error links in some cases. Basically, the whole idea of masquerading an UberTask as a ReduceTask, yet exposing it to the user in some cases, is awkward, and there are a lot of JSP pages to handle.

          There are also some cleanup items (test and potentially enable reduce-only case; fix memory criterion in uber-decision for map-only [and reduce-only] cases; clean up TaskStatus mess; instead of renaming file.out to map_#.out, always use attemptID.out; etc.). However, those kind of pale in comparison to the overall intrusive grubbiness of the patch. :-/
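The max-input-size bug noted above suggests a per-filesystem lookup. A stand-in sketch, where FileSystem is a toy interface used only for illustration (in Hadoop the equivalent call would be along the lines of FileSystem.getDefaultBlockSize(Path) on the input path's filesystem):

```java
/**
 * Sketch of the suggested fix: ask the job's actual input filesystem for
 * its default block size rather than reading a hardcoded HDFS setting
 * that may not even be present on tasktracker nodes.
 */
class BlockSizeCriterion {
    /** Toy stand-in for the real filesystem abstraction. */
    interface FileSystem {
        long getDefaultBlockSize();   // per-filesystem, not cluster-wide
    }

    /** True when the job's total input fits in one block of its own FS. */
    static boolean fitsInOneBlock(FileSystem inputFs, long totalInputBytes) {
        return totalInputBytes <= inputFs.getDefaultBlockSize();
    }
}
```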

          Greg Roelofs added a comment -

          Part 8/8: MR-1220.v15.ytrunk-hadoop-mapreduce.delta.patch.txt

          correct fix for commit bugs (vs. version 12b in "Part 5/8" above)

          [original comment: correct fix for commit bugs (vs. version 8b)]

          Greg Roelofs added a comment -

          Part 7/8: MR-1220.v14b.ytrunk-hadoop-mapreduce.delta.patch.txt

          add counter feedback for users/QA

          Greg Roelofs added a comment -

          Part 6/8: MR-1220.v13.ytrunk-hadoop-mapreduce.delta.patch.txt

          add support for compressed map outputs

          Greg Roelofs added a comment -

          Part 5/8: MR-1220.v10e-v11c-v12b.ytrunk-hadoop-mapreduce.delta.patch.txt

          fix remaining unit tests; disable uber-Chain combo; fix _temporary dir-name bug

          Greg Roelofs added a comment -

          Part 4/8: MR-1220.v9c.ytrunk-hadoop-mapreduce.delta.patch.txt

          fix abortTask() cleanup; fix TASK_CLEANUP case; fix 10 more unit tests

          Greg Roelofs added a comment -

          Part 3/8: MR-1220.v8b.ytrunk-hadoop-mapreduce.delta.patch.txt

          fix map-only commit bug; fix 4 unit tests

          (Part 2/8 simply disabled ubermode by default.)

          Greg Roelofs added a comment -

          Part 2/8: MR-1220.v7.ytrunk-hadoop-mapreduce.delta.patch.txt

          Greg Roelofs added a comment -

          Part 1/8. The "v6" (and similar in following patches) matches the commit-comments in the yahoo-merge branch.

          Greg Roelofs added a comment -

          Not sure if this is worthy of its own HADOOP-xxx issue, but it was useful while debugging UberTask's 3-level Progress/phase tree. (Progress needs more help than this, but that's a topic for another day.)

          Greg Roelofs added a comment -

          If you've been watching the commits go by on Owen's yahoo-merge branch, about five months' worth of work on this was included. Unfortunately, I screwed up my most recent push to Yahoo's internal git repo, and as a result, every internal/temporary/debug commit was exposed, which amounts to a lot of noise (> 3 dozen extra commits).

          I'll post the ~8 "real" patches corresponding to all of that here, along with a dump() method for Progress (in common), which was useful for debugging and may be needed again. There are also a few screenshots, but I'll probably need to scrub some internal hostnames before posting those.

          Greg Roelofs added a comment -

          I've been looking at what it will take to extend this from Arun's February sketch to something that will actually schedule and run small jobs. At the moment it looks like it will largely avoid JobTracker; most of the action seems to be in JobInProgress (particularly initTasks() and obtainNew*Task()), TaskInProgress (findNew*Task(), getTaskToRun(), addRunningTask()), and the scheduler (assignTasks()).

          I haven't asked Arun what he originally had in mind, but it seems that there are two fairly obvious approaches:

          • treat UberTask (or MetaTask or MultiTask or AllInOneTask or ...) as a third kind of Task, not exactly like either MapTask or ReduceTask but a peer (more or less) to both
          • treat UberTask as a variant (subclass) of MapTask

          I see the first approach as conceptually cleaner, and some of its implementation details would be cleaner as well, but overall it's harder to implement: there are lots of places where there's map-vs-reduce logic (occasionally with special setup/cleanup cases), and most of it would require modifications. The second approach seems slightly hackish, but it has at least that one big advantage: many of the map-vs-reduce bits of code would not require changes - in particular, I don't believe it would be necessary to touch the schedulers (and we're up to, what, four at this point?) since they'd simply see a job containing one map and no reduces.
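The second (subclass) approach can be caricatured in a few lines. Everything below is a toy stand-in, not the real Hadoop Task hierarchy; it only illustrates how an uber task masquerading as a single map would behave:

```java
import java.util.ArrayList;
import java.util.List;

/** Toy stand-ins for the real Task classes (illustration only). */
class UberSketch {
    static abstract class Task {
        abstract void run(List<String> log);
    }

    static class MapTask extends Task {
        final int split;
        MapTask(int split) { this.split = split; }
        void run(List<String> log) { log.add("map-" + split); }
    }

    static class ReduceTask extends Task {
        void run(List<String> log) { log.add("reduce"); }
    }

    /**
     * Schedulers see an ordinary one-map, zero-reduce job, but run()
     * executes every sub-map serially and then the single reduce.
     */
    static class UberTask extends MapTask {
        final int numMaps;
        UberTask(int numMaps) { super(0); this.numMaps = numMaps; }
        @Override
        void run(List<String> log) {
            for (int i = 0; i < numMaps; i++) new MapTask(i).run(log);
            new ReduceTask().run(log);
        }
    }

    static List<String> demo(int numMaps) {
        List<String> log = new ArrayList<>();
        new UberTask(numMaps).run(log);
        return log;
    }
}
```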

          Thoughts? (I don't yet have a strong opinion myself, though I'm interested in getting a proof of concept running quickly so I/we can see what works and whether it's a net win in the first place; from that perspective, the latter approach may be better.)

          Either way, there will be changes needed in the UI, metrics, and other accounting to surface the tasks-within-a-task details, as well as the obvious configuration-related changes. Retry logic, speculation, etc., are still unclear (to me, anyway).

          Stab at a couple of the other upstream questions:

          Will users be able to set a number of reduce tasks greater than one? Or are you limited to at most one reduce task?

          The latter, at least for now. This is somewhat related to the question of serial execution; i.e., if it's serial, the only reason you would want multiple reduces is if a single one is too big to fit in memory, and if that's the case, it's arguably not a "small job" anymore.

          i am still a little dubious about whether this is ambitious enough. particularly - why serial execution? to me local execution == exploiting resources available in a single box. which are 8 core today and on the way up.

          Well, generally multiple cores => multiple slots, at which point you let the scheduler figure it out. Chris mentioned that there's a JIRA to extend the LocalJobRunner in this direction (MAPREDUCE-434?), and if the currently proposed version of this one works out, an obvious next step would be to look at similar extensions. But this is still an untested optimization, and all the usual caveats about premature optimization apply.

          I don't know how Arun looked at the problem - I'm guessing the motivation was empirical, based on the behavior of Oozie workflows and Pig jobs - but I view it as a way to make task granularity a bit more homogeneous by packing multiple too-small tasks into larger containers. My mental model is that this will tend to make the scheduler more efficient - but also that trying to do too much may begin to work at cross-purposes with the scheduler, i.e., one probably doesn't want to create a secondary scheduler (trying to tune both halves could get very messy).

          it does seem that a dispatcher facility at the lowest layer that can juggle between different hadoop clusters is useful generically (across Hive/Pig/native-Hadoop etc.) - but that's not quite the same as what's proposed here - is it?

          Nope. But I think other groups are looking at stuff like that.

          Greg Roelofs added a comment -

          Oops, here's the real updated patch. (Forgot to "git add" the two new files, sigh.)

          Greg Roelofs added a comment -

          Updated version of Arun's prototype patch; compiles cleanly, but not tested beyond that.

          Joydeep Sen Sarma added a comment -

          interesting jira! i have been clamoring for something like this internally .. (am totally at the receiving end of these days)

          i am still a little dubious about whether this is ambitious enough. particularly - why serial execution? to me local execution == exploiting resources available in a single box. which are 8 core today and on the way up.

          my suggestion internally was to have a set of machines running single node hadoop clusters and dispatch to one of those (randomly or using a load-balancer) when job size is small. i am struggling to see why i should like this solution instead.

          it does seem that a dispatcher facility at the lowest layer that can juggle between different hadoop clusters is useful generically (across Hive/Pig/native-Hadoop etc.) - but that's not quite the same as what's proposed here - is it?

          Tom White added a comment -

          Most of the effort involved teasing out the framework in the MapTask and ReduceTask to allow several components such as MapOutputBuffer, ReduceValuesIterator etc. to be used as 'pluggable' components.

          Interesting. MAPREDUCE-326 has a proposal for making these components pluggable, which might make the work of this JIRA simpler.

          Arun C Murthy added a comment -

          I spent a long (and happy) weekend building a half-baked prototype for this...

          Essentially, I've introduced a new kind of task, called "Uber Task", half in jest. I've got it to mimic the old local job-runner by running all maps serially and then a single reduce. It needs a lot more work to fix things on the JobTracker, TaskTracker, Scheduler and so on. Most of the effort involved teasing out the framework in the MapTask and ReduceTask to allow several components such as MapOutputBuffer, ReduceValuesIterator etc. to be used as 'pluggable' components.

          Aaron Kimball added a comment -

          Arun,

          Thank you for explaining your use cases above more thoroughly. Given the nature of clusters with submission nodes, etc., this does seem like it would be valuable functionality. As such, it can/should be added. +1 on the direction.

          Maybe I was jumping the gun on reading your description; you mentioned the LocalJobRunner, so I thought you were off-the-bat proposing to build another *JobRunner. Since we already have three job runners (the usual one, the local one, and the isolation runner), all of which have their own quirks, idiosyncrasies and bugs, I would be nervous about yet another one of these which will have its own slightly deviant semantics, and hope that we could reuse a lot of the existing task deployment code.

          Some questions about the proposal itself:

          • Will users be able to set a number of reduce tasks greater than one? Or are you limited to at most one reduce task?
          • How does this work with speculative execution, if at all?
          • Are all InputSplits associated with the task delivered all-at-once from the JobTracker? Or does this "super-task" for the job fetch them one-at-a-time? For that matter, will this be represented in the JobTracker as multiple InputSplits, or just one?
            • How is InputSplit locality information regarded? If we want to map over (for example) three small files, and FileInputFormat enumerates three InputSplits, each of which have different locality hints, how does the JobTracker pick where to schedule this job?
          • How do these sorts of jobs interact with the resource allocation algorithms used by the Fair/Capacity Schedulers?

          Thanks,

          • Aaron
          Philip Zeyliger added a comment -

          Arun,

          I'm definitely +1 on having small jobs trigger one JVM start instead of 3+N (setup, N maps, reduce, cleanup) for small N.

          Implementation-wise, I'm wary of adding yet more dimensions and special cases to the JobTracker. Does one of these "quick" jobs take a map slot, scheduling-wise, or does it take both a map and a reduce slot? Does the framework make the optimization itself, or do users ask for it?

          – Philip

          Arun C Murthy added a comment -

          Aaron,

          I'm, foremost, trying to sell this idea to the community at large. It is a new (major?) functionality and hence my interest in ensuring people are comfortable about the direction I'm proposing.

          From your first comment I was under the impression that you had reservations about this direction. Maybe I was mistaken about your lack of enthusiasm for the proposal, or maybe I put too much emphasis on the implementation aspects when I put forth the proposal. My apologies.

          Backing up, I'd appreciate if you could re-state your position on the direction.


          To keep things clear, I'll post a separate comment about some of my thoughts on the implementation details once we all agree about the direction. Your thoughts are, as always, welcome there too.

          Aaron Kimball added a comment -

          Arun,

          My concerns are primarily based around how much code would need to change in the tasktrackers to work with a new type of job runner, etc. As such, I suggested a possible internal implementation model that does not require building a new JobRunner nor changing how child tasks interface with the tasktracker. I understand why you want this to integrate cleanly into user-level job scheduling/configuration. So to that end, please feel free to suggest a clean interface which goes on top of this for the client to work with.

          For example, I think that we could add a Job.setSingleProcess() method which users use to configure a job in this mode; it would then create another Job internally that uses the process I described above to bootstrap into the real Job. The actual mechanics of how to manage the subprocess are still done in a "regular" map task that itself uses the LocalJobRunner. Does this make sense?

          Arun C Murthy added a comment -

          Would this really be that much higher performance than running locally on the client? If the job is really so small that it makes sense to run it in a single thread, then I am suspicious that the overhead described above would be overcome by running in the quasi-locality of the cluster, vs. just staying in the client and starting immediately.

          Whilst I agree that applications can do it currently with LocalJobRunner, this jira is all about making it completely transparent to them!

          Imagine you have a large pipeline of jobs, some of which are small and others are not. In this case it would be a nightmare for the owner of the workflow to configure parts of it differently from the others. With this feature, jobs would get the benefits without requiring heavy amounts of per-job crafting, capacity planning, etc.

          Also lots of real-world clusters have special 'launcher' (or gateway) nodes which really do not have the capacity to run very many jobs via the LocalJobRunner.


          The client needs to have access to HDFS anyway in order to do things like create the InputSplits for the cluster; so what's the advantage of running a single-threaded process on the cluster?

          Short-to-mid term we would all like to see MAPREDUCE-207 get in, this way InputSplits are computed in-cluster... we have talked about the 'setup' task computing the splits - potentially it could then turn around and run the job itself.

          Aaron Kimball added a comment -

          I'm not really sure I see the utility of such a big change. Would this really be that much higher performance than running locally on the client? The client needs to have access to HDFS anyway in order to do things like create the InputSplits for the cluster; so what's the advantage of running a single-threaded process on the cluster? You'd still need to do some of the more heavyweight job-setup operations – ship the client jar over, spawn a separate JVM (even if it's reused for all map/reduce tasks), set up the task IPC connection to the tasktracker, etc. You'd also be vulnerable to the inherent very large time penalty associated with the "tasktracker polls" model of task scheduling that Hadoop uses.

          If the job is really so small that it makes sense to run it in a single thread, then I am suspicious that the overhead described above would be overcome by running in the quasi-locality of the cluster, vs. just staying in the client and starting immediately.

          If you do need this behavior, though, then rather than build a significant new amount of internal architecture, it occurs to me that you could probably do this all with "user level" code (maybe something that goes in the o.a.h.mapreduce.lib package) as follows: Write a map-only job that uses something like NLineInputFormat to create a single map task. That single map task could then itself be used as a springboard to set up the real job (maybe you've pre-serialized the jobconf.xml in the client and sent it to the singleton map task via the distributed cache) and run it in the existing LocalJobRunner there. I think this approach would be a lot cleaner; thoughts?

          Hong Tang added a comment -

          +1 on the direction.

          Agree with Dhruba that this should be explicitly enabled until we have more experience.

          Not sure we should mandate the serialization of task execution, maybe we can use multiple slots if available on a particular TT? We probably should defer such optimizations after JT refactor is done though.
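Hong's point about serial versus parallel task execution is easiest to see in miniature. The sketch below is purely illustrative (none of these class or method names are Hadoop APIs): it runs every map over its split in sequence inside one JVM, collects intermediate output in memory so there is no over-the-network shuffle, and then runs the single reduce — essentially the execution model this proposal describes:

```java
import java.util.*;
import java.util.function.BiConsumer;

/** Toy model of the proposed single-JVM "uber" execution: serial maps,
 *  in-memory shuffle, then the single reduce. Illustrative only, not Hadoop API. */
public class UberExecutionSketch {

    /** Run each map serially, grouping intermediate (word, 1) pairs in memory. */
    static Map<String, List<Integer>> runMapsSerially(List<String> splits) {
        Map<String, List<Integer>> shuffle = new TreeMap<>();
        BiConsumer<String, Map<String, List<Integer>>> wordCountMap =
            (split, out) -> {
                for (String word : split.split("\\s+")) {
                    out.computeIfAbsent(word, k -> new ArrayList<>()).add(1);
                }
            };
        for (String split : splits) {          // one "map task" per split, in sequence
            wordCountMap.accept(split, shuffle);
        }
        return shuffle;                        // no shuffle over the network needed
    }

    /** The single reduce: sum the grouped values per key. */
    static Map<String, Integer> runReduce(Map<String, List<Integer>> shuffle) {
        Map<String, Integer> result = new TreeMap<>();
        shuffle.forEach((word, ones) ->
            result.put(word, ones.stream().mapToInt(Integer::intValue).sum()));
        return result;
    }

    public static void main(String[] args) {
        List<String> splits = Arrays.asList("a b a", "b c");
        System.out.println(runReduce(runMapsSerially(splits))); // {a=2, b=2, c=1}
    }
}
```

Running tasks on multiple slots of one TT, as Hong suggests, would replace the serial for-loop with a small thread pool, at the cost of some of the simplicity that makes this path cheap.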

          Devaraj Das added a comment -

          +1, seems like a good thing. But maybe it is possible to do this without the special task. I am imagining that a combination of jvm-reuse and special job attributes (e.g. if the job's input is less than a DFS block, send all tasks of that job to a single tasktracker) would make it work.
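Devaraj's size-based trigger might look like the following check at submission time. Everything here is hypothetical — the class name, method name, and thresholds are illustrative, not actual Hadoop API — and the real cutoffs would presumably be configurable:

```java
/** Hypothetical eligibility check for the single-JVM path; names and
 *  thresholds are illustrative only, not actual Hadoop API. */
public class SmallJobHeuristic {
    static final long DFS_BLOCK_SIZE = 64L * 1024 * 1024; // 0.20-era default

    /** A job qualifies if its total input fits in one DFS block and the
     *  task counts are small enough that serial execution in one JVM
     *  beats per-task scheduling and JVM startup overhead. */
    static boolean isSmallJob(long totalInputBytes, int numMaps, int numReduces) {
        return totalInputBytes <= DFS_BLOCK_SIZE
            && numMaps <= 9
            && numReduces <= 1;
    }

    public static void main(String[] args) {
        System.out.println(isSmallJob(10L * 1024 * 1024, 2, 1));  // true
        System.out.println(isSmallJob(200L * 1024 * 1024, 4, 1)); // false
    }
}
```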

          Mahadev konar added a comment -

          +1 .. seems like a very useful idea....

          Scott Chen added a comment -

          +1 on the idea. This will be very useful for the hive users.

          Todd Lipcon added a comment -

          I like the idea, but am curious what the implementation path would be. The "Task Type" stuff is already pretty messy in the JT, so I'm concerned that this will worsen the situation. Will there be a refactor patch prior to this that cleans up the existing functionality before adding more task types?

          dhruba borthakur added a comment -

          I like this idea. If the configuration that triggers this new behaviour is user-settable, that will be great.


            People

            • Assignee: Greg Roelofs
            • Reporter: Arun C Murthy
            • Votes: 3
            • Watchers: 35