Apache Tez
TEZ-145

Support a combiner processor that can run non-local to map/reduce nodes

    Details

    • Type: Bug
    • Status: Patch Available
    • Priority: Major
    • Resolution: Unresolved
    • Affects Version/s: None
    • Fix Version/s: None
    • Component/s: None
    • Labels: None

      Description

      For aggregate operators that benefit from running in multi-level trees, support for running a combiner in a non-local mode would allow performance gains by running the combiner at the rack level.

      1. WIP-TEZ-145-001.patch
        44 kB
        Tsuyoshi Ozawa
      2. TEZ-145.2.patch
        44 kB
        Tsuyoshi Ozawa

        Activity

        Tsuyoshi Ozawa added a comment -

        Gopal V Bikas Saha thank you for the feedback! I have a question about where to handle the locality. Do you mean that we should use InputReadyVertexManager? Or should we create a new abstraction layer to deal with the locality? As a reference, in MAPREDUCE-4502 I changed the AppMaster to handle the locality of inputs and decide when to launch the node-level combiner.

        Bikas Saha added a comment -

        I know what you are talking about, but let me restate to check that we are on the same page.
        Combining can happen at multiple levels - task, host, rack, etc.
        Doing these combines in theory requires maintaining partition boundaries per combining level. However, if tasks maintain partition boundaries then there is a task explosion (== level-arity * partition count). Hence an efficient, multi-level combine operation needs to operate on multiple partitions per task at each level, so that a reasonable number of tasks can process a large number of partitions. This statement can be true even for the final reducer. Partially, that is what happens with auto-reduce, except that the tasks lose their partition boundaries.
        If the processor can find a way to process multiple partitions while keeping them logically separate, then we can de-link physical tasks from physical partitioning. If the processor supports that, the edge manager can be set up to route N output/partition indices to the same task.
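        A minimal sketch, in plain Java and deliberately independent of any Tez EdgeManager API, of the routing arithmetic described above: with P physical partitions and T combiner tasks (T <= P), several output/partition indices map to the same destination task while staying logically separate inside it. The partition and task counts below are illustrative, not taken from this issue.

        public final class MultiPartitionRouting {

          /** Destination task for a partition when numPartitions are packed
           *  evenly onto numTasks tasks (numTasks <= numPartitions). */
          static int destinationTask(int partition, int numPartitions, int numTasks) {
            int partitionsPerTask = (numPartitions + numTasks - 1) / numTasks; // ceiling division
            return partition / partitionsPerTask;
          }

          public static void main(String[] args) {
            int numPartitions = 2000; // e.g. the reducer partition count
            int numTasks = 100;       // hypothetical combiner parallelism
            // Partitions 0..19 route to task 0, 20..39 to task 1, and so on,
            // so each combiner task processes 20 logically separate partitions.
            System.out.println(destinationTask(0, numPartitions, numTasks));  // 0
            System.out.println(destinationTask(19, numPartitions, numTasks)); // 0
            System.out.println(destinationTask(20, numPartitions, numTasks)); // 1
          }
        }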

        Gopal V added a comment -

        This is a question for Bikas Saha.

        There is a combiner edge & vertex manager that needs to go along with this, which converts all partitions from all local input into one combine processor (i.e. if it finds it has remote fetches to do, it should just forward the DME events using the pipelined mode of >1 event per attempt).

        To be able to bail out with a no-op like that, all partitioning throughout has to be exactly the reducer partition count.

        This is the optimal mode, but it makes everything extra complex.

        Assume you have 600 hosts over 30 racks which ran a map-task + 2000 partitions in the reducer.

        The host-level combiner input count is actually 600 x 2000 partitions, which can be grouped into 600 x m groups - not 2000 groups.

        The rack-level combiner input count is actually 30 x 2000 partitions, which can be grouped into 30 x n groups - not 2000 groups.

        Yet, all the inputs are actually always partitioned into 2000 partitions and the destination task-index is determined by something other than the partition.

        So, how practical is that?
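        The arithmetic in this example, as a small runnable sketch; the per-host and per-rack group counts (the m and n above) are illustrative placeholders, not values from the comment.

        public final class CombinerInputCounts {
          public static void main(String[] args) {
            int hosts = 600;
            int racks = 30;
            int reducerPartitions = 2000;
            int groupsPerHost = 4; // the "m" above; illustrative value
            int groupsPerRack = 8; // the "n" above; illustrative value

            long hostLevelInputs = (long) hosts * reducerPartitions; // 1,200,000 partitions
            long rackLevelInputs = (long) racks * reducerPartitions; //    60,000 partitions

            System.out.printf("host level: %d inputs in %d groups%n",
                hostLevelInputs, hosts * groupsPerHost);
            System.out.printf("rack level: %d inputs in %d groups%n",
                rackLevelInputs, racks * groupsPerRack);
          }
        }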

        Gopal V added a comment -

        Tsuyoshi Ozawa: the CombineProcessor patch looks good.

        This will help applications which do no in-memory aggregation, but you're effectively moving the data over racks ~3x.

        So this is a necessary part of the fix, but not the complete part as long as the ShuffleVertexManager is being used to connect them up.

        That vertex manager has no way to provide locality (rack-local or host-local) when spinning up tasks.

        Hadoop QA added a comment -

        +1 overall. Here are the results of testing the latest attachment
        http://issues.apache.org/jira/secure/attachment/12705400/TEZ-145.2.patch
        against master revision 9b845f2.

        +1 @author. The patch does not contain any @author tags.

        +1 tests included. The patch appears to include 1 new or modified test files.

        +1 javac. The applied patch does not increase the total number of javac compiler warnings.

        +1 javadoc. There were no new javadoc warning messages.

        +1 findbugs. The patch does not introduce any new Findbugs (version 2.0.3) warnings.

        +1 release audit. The applied patch does not increase the total number of release audit warnings.

        +1 core tests. The patch passed unit tests.

        Test results: https://builds.apache.org/job/PreCommit-TEZ-Build/310//testReport/
        Console output: https://builds.apache.org/job/PreCommit-TEZ-Build/310//console

        This message is automatically generated.

        Tsuyoshi Ozawa added a comment -

        Fixed the warnings reported by findbugs.

        Tsuyoshi Ozawa added a comment -

        Bikas Saha Sure, no problem. Gopal V, looking forward to your feedback.

        Bikas Saha added a comment -

        Tsuyoshi Ozawa I will wait for a few days for Gopal to respond before I review this code. Please let me know if that works for you.

        Bikas Saha added a comment -

        Gopal V You may have more ideas on optimal combine processor implementation that may be of relevance here.

        Hadoop QA added a comment -

        -1 overall. Here are the results of testing the latest attachment
        http://issues.apache.org/jira/secure/attachment/12703928/WIP-TEZ-145-001.patch
        against master revision 59529ab.

        +1 @author. The patch does not contain any @author tags.

        +1 tests included. The patch appears to include 1 new or modified test files.

        +1 javac. The applied patch does not increase the total number of javac compiler warnings.

        +1 javadoc. There were no new javadoc warning messages.

        -1 findbugs. The patch appears to introduce 2 new Findbugs (version 2.0.3) warnings.

        +1 release audit. The applied patch does not increase the total number of release audit warnings.

        +1 core tests. The patch passed unit tests.

        Test results: https://builds.apache.org/job/PreCommit-TEZ-Build/278//testReport/
        Findbugs warnings: https://builds.apache.org/job/PreCommit-TEZ-Build/278//artifact/patchprocess/newPatchFindbugsWarningstez-mapreduce.html
        Console output: https://builds.apache.org/job/PreCommit-TEZ-Build/278//console

        This message is automatically generated.

        Tsuyoshi Ozawa added a comment -
        +    if (hasFinalReduceStage) {
        +      boolean hasHostLocalReduceStage =
        +          conf.getBoolean(MRJobConfig.MRR_HOST_LOCAL_REDUCE, MRJobConfig.DEFAULT_MRR_HOST_LOCAL_REDUCE);
        +      if (hasHostLocalReduceStage) {
        +        numIntermediateStages++;
        +      }
        +      boolean hasRackLocalReduceStage =
        +          conf.getBoolean(MRJobConfig.MRR_RACK_LOCAL_REDUCE, MRJobConfig.DEFAULT_MRR_RACK_LOCAL_REDUCE);
        +      if (hasRackLocalReduceStage) {
        +        numIntermediateStages++;
        +      }
        +    }
        

        These options are not used for now.

        Tsuyoshi Ozawa added a comment -

        Bikas Saha Hitesh Shah I prototyped CombineProcessor and TestOrderedWordCountWithCombineProcessor as an application.

        • CombineProcessor accepts OrderedGroupedInputLegacy and runs Combiner#combineWithKVWriter.
        • MRCombiner has a new method, combineWithKVWriter, for accepting a KeyValueWriter instead of an IFile#Writer.
        • TestOrderedWordCountWithCombineProcessor uses CombineProcessor as an example. TestOrderedWordCountWithCombineProcessor finished in 3m40.838s with CombineProcessor, while TestOrderedWordCount finished in 6m53.447s. (Note that this comparison is unfair, as you know - TestOrderedWordCountWithCombineProcessor works with 10 combiners though TestOrderedWordCount works with only 2 reducers.)

        I'd appreciate your feedback!
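        As a rough, hypothetical sketch of the shape of such a combine step (the actual types and signatures live in the attached patch and may differ), the loop below folds the grouped values for each key and emits one record per key through a key/value writer instead of an IFile.Writer:

        import java.io.IOException;

        /** Hypothetical sketch only; the interfaces below stand in for the Tez
         *  runtime types used by the attached patch, not its real signatures. */
        public final class CombineSketch {

          interface GroupedInput {                     // stand-in for a grouped KV input
            boolean next() throws IOException;
            Object getCurrentKey() throws IOException;
            Iterable<Object> getCurrentValues() throws IOException;
          }

          interface KVWriter {                         // stand-in for a KeyValueWriter
            void write(Object key, Object value) throws IOException;
          }

          /** Sum-style combine, e.g. for word count: one output record per key. */
          static void combineWithKVWriter(GroupedInput input, KVWriter writer)
              throws IOException {
            while (input.next()) {
              long sum = 0;
              for (Object value : input.getCurrentValues()) {
                sum += ((Number) value).longValue();
              }
              writer.write(input.getCurrentKey(), sum);
            }
          }
        }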

        Tsuyoshi Ozawa added a comment -

        Attaching WIP patch.

        Tsuyoshi Ozawa added a comment -

        Bikas, thank you for the comment. Makes sense. As a first prototype, I'll try to create a CombinerProcessor.

        Bikas Saha added a comment -

        Tsuyoshi Ozawa Since DAG creation is a user land operation, Tez cannot insert this optimization by itself. This would most likely have to be a combination of a CombinerProcessor that can be added to a Combiner vertex in the DAG. Its inputs and outputs would have to be assigned based on the producers (sorted etc). It would run the combine to reduce data and then output (in desired format - sorted etc) to the next vertex. The users would choose to add this vertex when they expect significant data reduction. There is likely to be an associated VertexManager that can group input data to combiner tasks to combine data on a machine or on a rack dynamically at runtime (instead of TaskLocationHint). Different combiner tasks may have different number of input edges.
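        A minimal, plain-Java sketch (not a Tez VertexManagerPlugin) of the grouping step such a VertexManager would perform at runtime: given the hosts on which producer tasks ran, derive one combiner task per host or per rack instead of a static TaskLocationHint. The rack mapping below is a hypothetical stand-in for whatever topology resolver the cluster provides.

        import java.util.ArrayList;
        import java.util.LinkedHashMap;
        import java.util.List;
        import java.util.Map;
        import java.util.function.Function;

        public final class CombinerGrouping {

          /** taskHosts[i] is the host on which producer task i ran; the returned map
           *  has one entry per combiner task, listing the producer tasks it covers. */
          static Map<String, List<Integer>> groupTasks(String[] taskHosts,
              Function<String, String> location) {
            Map<String, List<Integer>> groups = new LinkedHashMap<>();
            for (int task = 0; task < taskHosts.length; task++) {
              groups.computeIfAbsent(location.apply(taskHosts[task]),
                  k -> new ArrayList<>()).add(task);
            }
            return groups;
          }

          public static void main(String[] args) {
            String[] taskHosts = {"host1", "host2", "host1", "host3"};
            Function<String, String> rackOf =
                h -> h.equals("host3") ? "rack2" : "rack1"; // hypothetical topology
            System.out.println(groupTasks(taskHosts, h -> h)); // one combiner task per host
            System.out.println(groupTasks(taskHosts, rackOf)); // one combiner task per rack
          }
        }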

        Tsuyoshi Ozawa added a comment -

        Bikas Saha Gopal V I meant the following configuration:

        • Creating DAGs with CombinerProcessor via YARNRunner#createDAG only when configuration is enabled.
        • CombinerProcessor works with TaskLocationHint.

        What do you think?

        Tsuyoshi Ozawa added a comment -

        Hitesh Shah Sorry for the delay. Maybe I can throw a patch in a few weeks. Thanks for your notification.

        Hitesh Shah added a comment - edited

        Tsuyoshi Ozawa In case you have not started working on this, I have tagged this jira as a potential idea for Google Summer of code.

        Bikas Saha added a comment -

        Not sure what you mean by YARNRunner#createDAG with TaskLocationHint. It seems like a VertexManager that does this at runtime (after being setup at compile time) would be the way to go. Maybe it will clear things up if there is a doc describing the solution so that we are all on the same page.

        Gopal, it may be that the notion of combiners is composable. So a combiner is a commutative and associative function that can be applied in any order and any number of times. Then the combiner could be run just after the mapper (in-proc), independently (in combiner tasks), or just before the reducer (in-proc) to trade off pure combiner work against the overhead of doing it. Are we thinking of the same thing?
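        A tiny word-count-style illustration of that composability: because the combine function here (integer sum) is commutative and associative, combining per host and then again at the reducer gives the same total as combining once at the reducer.

        import java.util.Arrays;
        import java.util.List;

        public final class ComposableCombiner {

          /** The combine function: associative and commutative, so it can be applied
           *  any number of times, in any grouping, without changing the result. */
          static long combine(List<Long> partialCounts) {
            return partialCounts.stream().mapToLong(Long::longValue).sum();
          }

          public static void main(String[] args) {
            // Partial counts for one key, produced by four map tasks.
            List<Long> mapOutputs = Arrays.asList(3L, 5L, 2L, 7L);

            // Combine once, at the reducer.
            long direct = combine(mapOutputs);

            // Combine per host (tasks 0-1 on host A, tasks 2-3 on host B),
            // then once more at the reducer.
            long hostA = combine(mapOutputs.subList(0, 2));
            long hostB = combine(mapOutputs.subList(2, 4));
            long multiLevel = combine(Arrays.asList(hostA, hostB));

            System.out.println(direct == multiLevel); // true: both are 17
          }
        }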

        Tsuyoshi Ozawa added a comment -

        Thanks for the good suggestions, Gopal V and Bikas Saha. I took some days to benchmark a baseline of Tez. Now I have started to implement this feature in YARNRunner#createDAG with TaskLocationHint - host-local R and rack-local R as Gopal mentioned. I'll share it with you when I have any progress. Thanks!

        Gopal V added a comment -

        Yes, this would be harder to implement if we retain the MR notions of combiners/mappers running in-proc.

        I think an intermediate step would be to pair this idea with a chunked shuffle, which shuffles 1 sort buffer at a time.

        A combine task is then easier to model, as the map tasks themselves will never run any combiner tasks in that model.

        Logically, this turns MR into M-R-R-R with M-(host-local)R-(rack-local)R-(final)R.

        Once the user is producing that DAG with custom edges, the complexity reduces to just the scheduling/event-routing for those edges, using the topology information in the Edge/Vertex managers.

        Bikas Saha added a comment -

        We can look at a combination of vertex manager/edge (possibly only a vertex manager) that can do the following job. Given a set of distributed data (typically produced by the vertex that reads input, but it can be any vertex), how can it be made less distributed so that we have a balance of parallelism and cost of computation? The cost of launching 10K tasks to process 10K small pieces of data might be less than launching 1K tasks to process the same data if it was in 1K pieces.

        If the distributed data is already small then it can be aggregated to a smaller set of locations, via an aggregation tree, so that each location has a meaningful size wrt the cost of computation. If the distributed data is large but we are going to apply a reducing aggregating function to it (e.g. sum or count) then it may make sense to apply that aggregation function while executing the aggregation tree, because then the data volume will decrease as it funnels down the tree. The MR combiner is simulating 0-level trees in some sense. However, if the data volume is large and there is no reducing function then we should skip the aggregation tree and directly do the subsequent operation, because the aggregation tree provides no benefit.

        So having the ability to create these kinds of aggregation trees via a vertex manager would be one promising way to go about doing this. However, we are not ready for that yet because we don't support dynamic addition/removal of vertices in the DAG yet. Still, we can make a first step where the user has the option of statically creating the aggregation vertex with a rack-level aggregation manager that can do rack-level aggregations of data. This compile-time choice can be made by the user when they are going to process a large volume of data and apply a reducing function to it.
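        The decision logic above, restated as a small sketch with the three cases made explicit; the boolean inputs are placeholders for whatever size estimates a vertex manager would actually have.

        public final class AggregationTreePlan {

          /** Mirrors the three cases described in the comment above. */
          static String planFor(boolean dataIsSmall, boolean hasReducingFunction) {
            if (dataIsSmall) {
              // Small pieces: funnel them to fewer locations so each task has
              // a meaningful amount of work.
              return "aggregation tree, consolidating small pieces";
            }
            if (hasReducingFunction) {
              // Large data plus a sum/count-style function: data shrinks as it
              // funnels down the tree.
              return "aggregation tree, applying the reducing function per level";
            }
            // Large data, no reducing function: the tree adds no benefit.
            return "skip the tree, do the subsequent operation directly";
          }

          public static void main(String[] args) {
            System.out.println(planFor(false, true)); // large input with a sum/count
          }
        }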

        Tsuyoshi Ozawa added a comment -

        Hi Bikas Saha, can we add a node/rack-local processor or edge? It's generic and useful for processing large amounts of data. What do you think?

        Bikas Saha added a comment -

        Is this going to be MR specific? Not sure how we can add a processor that's independent of a higher-level application.

        Tsuyoshi Ozawa added a comment -

        Thank you for sharing useful points, Hitesh! I'm starting to read the code and think through the design. I'll share a design when it's ready.

        Hitesh Shah added a comment -

        Tsuyoshi Ozawa Added you as a contributor. Feel free to re-assign the jira to yourself if you would like to take this on. Thanks.

        Hitesh Shah added a comment - edited

        Tsuyoshi Ozawa Please go ahead and take a crack at it if you like. I have not started on any work related to this jira as of now.

        I will propose what I was thinking of as a possible option for a solution with respect to the current features of Tez. Feel free to change/suggest/tear apart the proposal/suggest improvements for core functionality in tez.

        Possible design:

        Background info: Tez supports the concepts of vertices and edges. As part of each edge and vertex, there is the possibility for a user to plug in some user logic to affect different aspects of the run-time. Currently, some pieces of this are implemented to support things such as dynamic sizing of the no. of reduce tasks to run. Via events, information about the outputs of a map stage can be sampled in the AM to determine how many reducers to run. Once this is decided, the user logic in the AM can then route the information about map outputs (within DataMovementEvents) to the appropriate reducer to ensure that partitions are assigned correctly.

        Today, a MapReduce job consists of a Map vertex connected to a Reduce vertex via a Shuffle edge. For the above, I was thinking along the lines of a Map vertex followed by a Combiner Vertex which is then connected to the Reduce Vertex. The edge between the Map and combiner vertex could also just be a shuffle.
        Using a similar approach for reducer dynamism, the combiner vertex could use events generated by the framework to learn about the locations of where the Map tasks are running. Based on this, the user logic could then decide how many tasks to create for the combiner vertex ( For example, one per physical node or one per rack ) and also define the locality requirements. Note, the shuffle edge works by the map task generating an event publishing the location of the map output which is then passed to the next stage's input. Using this, there could be various optimizations done too. In some cases, the combiner vertex may decide to do no work and therefore pass the event generated by the map directly to the reduce without doing any work. This may require changes in the current shuffle input/output pairs though.

        Tez is still some time away from being able to dynamically introduce new vertices into the DAG. At some point, the combiner vertex would be dynamically introduced by user logic, but at this time it might be a good start to implement it via a static DAG with optimizations to bypass it as needed.

        There is some reference information here: http://hortonworks.com/hadoop/tez/. ( We plan to create better docs and publish to the apache tez website soon ).

        Tsuyoshi Ozawa added a comment -

        Hi Hitesh Shah, what's going on with this JIRA? I'm a developer of MAPREDUCE-4502, node-level aggregation for MapReduce. If Tez is at the stage to develop this feature, I'd like to tackle it.


          People

          • Assignee: Tsuyoshi Ozawa
          • Reporter: Hitesh Shah
          • Votes: 0
          • Watchers: 6
