Details

    • Type: New Feature
    • Status: Reopened
    • Priority: Major
    • Resolution: Unresolved
    • Affects Version/s: None
    • Fix Version/s: None
    • Component/s: applicationmaster, mrv2
    • Labels: None

      Description

      It would be nice if we could add a method to OutputFormat that would allow a job to indicate where the reducer for a given partition should run. This is similar to the getSplits() method on InputFormat. In our application the reducer uses other data in addition to the map outputs during processing, and data accesses could be made more efficient if the JobTracker scheduled the reducers to run on specific hosts.
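      A hypothetical shape for such a hook, mirroring InputFormat.getSplits(). The interface, class, and method names below are purely illustrative — none of them exist in Hadoop:

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

// Illustrative only: a locality hook on the output side, analogous to
// InputFormat.getSplits(). Neither this interface nor the method is a real
// Hadoop API; the scheduler would treat the returned hosts as hints.
interface ReduceLocatable {
    // Preferred hosts for the reducer handling the given partition;
    // an empty list means "no preference".
    List<String> getReduceHosts(int partition);
}

class StaticHintOutputFormat implements ReduceLocatable {
    private final String[][] hints; // hints[i] = preferred hosts for partition i

    StaticHintOutputFormat(String[][] hints) {
        this.hints = hints;
    }

    @Override
    public List<String> getReduceHosts(int partition) {
        return partition < hints.length
            ? Arrays.asList(hints[partition])
            : Collections.emptyList();
    }
}
```

      Like split locations on the map side, these would be best-effort: the scheduler could still place the reducer elsewhere if the hinted hosts are busy.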

      1. MAPREDUCE-199.patch
        6 kB
        Harsh J
      2. MAPREDUCE-199.patch
        6 kB
        Harsh J

        Issue Links

          Activity

          Ben Podgursky added a comment -

          Hey Harsh. The delay was because I've only worked with MR1 so far (Cloudera Hadoop 4) and all of my source suggestions were in the context of MR1, so I spent a bit of time checking out what changed in the source between MR1 and MR2.

          After looking around, your patch seems like a pretty nice way of enabling this functionality without baking anything else into the API or complicating the code (since it bootstraps on locality logic which already exists).

          The other alternative I was thinking about was making the logic pluggable via the JobConf, similar to how partitioners are set, e.g.

          conf.setReduceTaskLocalizer(MyLocalityLogic.class);

          Where MyLocalityLogic would have logic for assigning task -> host. I'm not really sure how it would work, though, since (1) I'm not sure whether user code is on the classpath at the time tasks are assigned to nodes, and (2) the locality logic would need to be presented with a whole network topology to be able to do anything intelligent, and I'm not sure where that would come from...
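          A rough sketch of what that pluggable hook might look like — setReduceTaskLocalizer, the interface, and both class names here are hypothetical, not an existing Hadoop API:

```java
import java.util.List;

// Hypothetical pluggable localizer, in the spirit of setPartitionerClass().
// Nothing here is a real Hadoop interface; it only illustrates the idea.
interface ReduceTaskLocalizer {
    // Assign a preferred host for a reduce partition, given the hosts the
    // scheduler knows about (a real version would need topology, not a flat list).
    String assignHost(int partition, List<String> knownHosts);
}

// Trivial example implementation: spread partitions round-robin over hosts.
class RoundRobinLocalizer implements ReduceTaskLocalizer {
    @Override
    public String assignHost(int partition, List<String> knownHosts) {
        return knownHosts.get(partition % knownHosts.size());
    }
}
```

          As the comment notes, the open questions are where the host list (or topology) would come from and whether user code is loadable at assignment time.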

          Harsh J added a comment -

          Hey Ben,

          The previously posted patch "works" for the hints, but uses a manual config-based hint approach instead of an API-based one. See the tests in the patch for an example.

          I'll be glad to implement an API wrapper/alternate approach for this as well; can you post what you may have in mind? We can refine it from there.
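          As a sketch of what a config-based hint might look like — the key name and "partition=host" encoding below are illustrative assumptions; the actual patch's format may differ:

```java
import java.util.HashMap;
import java.util.Map;

// Parses a hint string of the illustrative form "0=host1,1=host2",
// mapping partition numbers to a single preferred host each.
// This only shows the idea; it is not the encoding used in the real patch.
class ReduceHintParser {
    static Map<Integer, String> parse(String value) {
        Map<Integer, String> hints = new HashMap<>();
        if (value == null || value.isEmpty()) {
            return hints;
        }
        for (String pair : value.split(",")) {
            String[] kv = pair.split("=", 2);
            hints.put(Integer.parseInt(kv[0].trim()), kv[1].trim());
        }
        return hints;
    }
}
```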

          Ben Podgursky added a comment -

          Doesn't seem like there's been any progress on this recently, but this functionality would be really helpful to us (we've been hunting for a way to do exactly this).

          Our use-case is somewhat similar to the HBase one: we have a number of stores which we keep sorted on the same keys and partitioned identically (e.g., partitioned into partfiles 0000 through 0599). When we need to join these stores, instead of running a full map + reduce, we can just run a map task for each file which reads in the partfiles for each side of the join. Since we read these stores many times, it saves us a lot of cluster time to only sort the files once.

          These files are each produced by a normal reduce task. It would be great if we were able to give hadoop a hint that part-0123 of store A and part-0123 of store B should end up on the same host, so any job joining the two files will be reading purely local data. Ideally we could accomplish this by giving hadoop a hint about where to run each reduce task so we don't have to shuffle the data around later.

          eric baldeschwieler added a comment -

          I can see the value of matching reduce outputs to region servers. This does seem like a compelling use case.

          That said, the MR interface is already very broad. Let's let any extensions to the API bake for a while to make sure we are doing the right thing. It's a lot easier to add things to the config or API than to take them out. Using the same abstractions/API as the map side would be nice if doable.

          Karthik Kambatla added a comment -

          This might not be the use case Harsh was thinking of, but here is a use case from my summer internship a couple of years ago:

          Our use case: We were building a topic-based pub/sub system. The published events were in one HBase table, and the subscriptions were in another table. While the published events were stored by their published time-stamp, the subscriptions were stored by <Topic ID: Subscription ID> as the key. Matching the published events to subscriptions required a join of the two tables on the topic.

          Approach: The map phase reads all the published events and emits (topic, event) pairs. The reduce's input essentially is all events for a topic - the reduce reads all the subscriptions of that topic and matches. Now, it would save a lot of communication if the reduce (for topic A) were scheduled on the same node that had the subscriptions for the same topic A. Hence, the need for reduce data-locality.

          We achieved this data locality through ugly hacks to the JT to store the HBase region (key-range) to host mapping, and by overloading the partitioner to push each <key, value> pair to the appropriate reducer. I don't remember the exact speedups, but it was quite significant (~2x, if my memory serves).

          Harsh J added a comment -

          Harsh - I'm not familiar with the HBase case; can you please add more colour?

          Surely!

          In this case, won't it be sufficient to schedule maps on the RS? If the data is already sorted, would you try to schedule reduces instead?

          We have this concept of bulkloads, for example, in HBase, where the maps read in data from a raw source (such as a delimited text file) and pass it to a reducer (partitioned by TotalOrderPartitioner based on the region distribution of the table in HBase). The sorted data is then written to a file on HDFS and later injected into the /hbase directory structure for serving.

          There are cheap gains (but gains nevertheless) if the data written by the reducer is local to the RegionServer hosting that specific partition (region) itself, before we bulkload it in.

          Likewise, if people have HBase jobs doing a reduce phase for whatever reason, and wish to achieve locality such that the reducer tasks (which emit the keys) are local to the RegionServer serving the same region for those keys, they can do so via a pre-configured job.

          There are some use-cases outside of HBase as well (I'll let those who've desired this comment), but maybe YARN can move those outside of MR today.

          Or maybe, in the long term, HBase can get a custom AM to do its work in a more efficient manner than the current MR (MR is easy to use, though).

          I just think using YARN to write a new app for everything is a slightly longer path to take if MR can be harmlessly tweaked a bit more to do the same thing along with the other good things it already does.

          My concern with adding APIs/config is that it becomes part of the user interface, and I'd like to think through its implications, and whether it's really necessary, before we commit to it. Makes sense?

          Yes, makes sense on the API side. Partly why I went with a simple config-based option on doing this.
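          The bulkload flow described above hinges on total-order partitioning: sorted split points (region boundary keys) decide which reducer, and hence which output file, each key lands in. A self-contained sketch of that routing logic — this is an illustration of the idea, not HBase's or TotalOrderPartitioner's actual code:

```java
import java.util.Arrays;

// Minimal illustration of the total-order idea: n sorted split points
// (e.g. region boundary keys) define n+1 partitions/reducers, so each
// reducer's output covers one contiguous key range.
class TotalOrderSketch {
    private final String[] splitPoints; // must be sorted ascending

    TotalOrderSketch(String[] splitPoints) {
        this.splitPoints = splitPoints;
    }

    int getPartition(String key) {
        int i = Arrays.binarySearch(splitPoints, key);
        // Found exactly: a key equal to a split point starts the next range.
        // Not found: binarySearch returns -(insertionPoint) - 1.
        return i >= 0 ? i + 1 : -(i + 1);
    }
}
```

          Because each partition corresponds to one region's key range, a locality hint mapping that partition to the region's RegionServer is what makes the written HFile local before the bulkload.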

          Arun C Murthy added a comment -

          Harsh - I'm not familiar with the HBase case; can you please add more colour?

          in HBase-land we would benefit from direct control if the reducer can be scheduled directly onto the RegionServer that hosts the sorted range of keys the reducer is going to process, or even fetch.

          In this case, won't it be sufficient to schedule maps on the RS? If the data is already sorted, would you try to schedule reduces instead?

          My concern with adding APIs/config is that it becomes part of the user interface, and I'd like to think through its implications, and whether it's really necessary, before we commit to it. Makes sense?

          Harsh J added a comment -

          I'm not sure I see the value in user-specified 'hints'. The way to get this to work is to 'figure' where the map-outputs are (the AM knows it) and then try to pick the right hosts/racks.

          This is good too, as an auto-optimization of regular MR apps. However, in HBase-land we would benefit from direct control if the reducer can be scheduled directly onto the RegionServer that hosts the sorted range of keys the reducer is going to process, or even fetch.

          Seems like we can go for two things:

          1. Auto-optimize by default, so that all users benefit somehow.
          2. Provide a way to override the automation via API-supplied partition->host mappings, to allow those who want to control placement for other purposes.

          Arun - Would this be good?
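          Option 2 could be as simple as an override map consulted before the automatic choice. A minimal sketch — all names here are hypothetical, not proposed API:

```java
import java.util.List;
import java.util.Map;

// Hypothetical combination of the two proposals: explicit partition -> hosts
// overrides win when present; otherwise fall back to hosts derived
// automatically (e.g. from where the bulk of the map output for the
// partition already lives, which the AM knows).
class ReduceHostChooser {
    private final Map<Integer, List<String>> overrides;

    ReduceHostChooser(Map<Integer, List<String>> overrides) {
        this.overrides = overrides;
    }

    List<String> hostsFor(int partition, List<String> autoHosts) {
        return overrides.getOrDefault(partition, autoHosts);
    }
}
```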

          Arun C Murthy added a comment -

          I'm not sure I see the value in user-specified 'hints'. The way to get this to work is to 'figure' where the map-outputs are (the AM knows it) and then try to pick the right hosts/racks.

          Harsh J added a comment -

          Thanks for the feedback, Ted. I'll come up with another patch that does it in the same way as the InputSplit structures as soon as I get some more free cycles.

          Ted Yu added a comment -

          That's more considerate. +1.

          Harsh J added a comment -

          It may make more sense to have a separate serialized structure for multi-host hints (per partition), rather than leveraging the config object, since the host list can grow and there are limits on the config object's serialization size.
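          One possible shape for such a structure: a plain length-prefixed byte format instead of config properties. Purely illustrative — a real patch would likely use Hadoop's Writable machinery rather than this hand-rolled codec:

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.ArrayList;
import java.util.List;

// Length-prefixed encoding of per-partition host lists, sidestepping the
// size limits of stuffing everything into Configuration values.
class ReduceHintsCodec {
    static byte[] write(List<List<String>> hostsPerPartition) {
        try {
            ByteArrayOutputStream bos = new ByteArrayOutputStream();
            DataOutputStream out = new DataOutputStream(bos);
            out.writeInt(hostsPerPartition.size());
            for (List<String> hosts : hostsPerPartition) {
                out.writeInt(hosts.size());
                for (String h : hosts) {
                    out.writeUTF(h);
                }
            }
            return bos.toByteArray();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    static List<List<String>> read(byte[] data) {
        try {
            DataInputStream in = new DataInputStream(new ByteArrayInputStream(data));
            List<List<String>> result = new ArrayList<>();
            int partitions = in.readInt();
            for (int i = 0; i < partitions; i++) {
                int count = in.readInt();
                List<String> hosts = new ArrayList<>();
                for (int j = 0; j < count; j++) {
                    hosts.add(in.readUTF());
                }
                result.add(hosts);
            }
            return result;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}
```

          This parallels how map-side InputSplit locations are serialized to a split file rather than carried in the job configuration.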

          Ted Yu added a comment -

          Shall we consider extending support, syntactically, for multi-host locality hints?
          Meaning, some kind of delimiter needs to be defined which is different from the ',' used by StringUtils#getStringCollection().
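          For example, ',' could stay as the separator between partition entries (compatible with StringUtils#getStringCollection-style splitting) while an inner delimiter such as '|' separates hosts within one entry. The key format below is an illustrative assumption, not a proposal from the patch:

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Illustrative two-level encoding for multi-host hints:
// ',' between partition entries, '|' between hosts within an entry.
// Example value: "0=hostA|hostB,1=hostC"
class MultiHostHintParser {
    static Map<Integer, List<String>> parse(String value) {
        Map<Integer, List<String>> hints = new HashMap<>();
        for (String entry : value.split(",")) {
            String[] kv = entry.split("=", 2);
            hints.put(Integer.parseInt(kv[0].trim()),
                      Arrays.asList(kv[1].trim().split("\\|")));
        }
        return hints;
    }
}
```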

          Harsh J added a comment -
          • Fixed a blooper in the test (the reduce task attempt implementation's constructor had some args switched, failing the test)
          • Added a precondition check to the static method that extracts reducer hints, as a completeness check. Thanks to Sho for this.
          Harsh J added a comment -

          Here's a first shot at it. It adds support for locality hints via the job configuration (rather than an API), and supports one host hint per partition at the moment.

          For those who feel this would benefit: Would you like to have multi-host locality hinting support for each partition, akin to maps?

          The patch still needs some work, but hopefully the approach is right. I have yet to test with a built cluster to observe the locality pick-up, but the framework around these areas is pretty nice (compared to MR1's JIP classes and such).

          Arun C Murthy added a comment -

          This should be much more feasible with MR2 and should yield significant benefits for small jobs.

          Harsh J added a comment -

          But please reopen or open a new issue if need be (/me looks at HBase).

          Harsh J added a comment -

          This has been debated many times, and the conclusion has always been that there just isn't any merit in trying to schedule reducers with given locality (or hints thereof).

          Resolving this old one, but I am sure there were more recent tickets closed with the same result.


            People

             • Assignee: Harsh J
             • Reporter: Benjamin Reed
             • Votes: 6
             • Watchers: 26

              Dates

              • Created:
                Updated:

                Development