Apache Tez
TEZ-145

Support a combiner processor that can run non-local to map/reduce nodes

    Details

    • Type: Bug
    • Status: Open
    • Priority: Major
    • Resolution: Unresolved
    • Affects Version/s: None
    • Fix Version/s: None
    • Component/s: None

      Description

      For aggregate operators that can benefit from running in multi-level trees, support for running a combiner in a non-local mode would allow performance gains by running a combiner at rack level.

        Activity

        Tsuyoshi Ozawa added a comment -

        Bikas, thank you for the comment. Makes sense. As a first prototype, I'll try to create a CombinerProcessor.

        Bikas Saha added a comment -

        Tsuyoshi Ozawa Since DAG creation is a user-land operation, Tez cannot insert this optimization by itself. This would most likely be a CombinerProcessor that can be added to a Combiner vertex in the DAG. Its inputs and outputs would have to be assigned based on the producers (sorted etc.). It would run the combine to reduce data and then output (in the desired format - sorted etc.) to the next vertex. Users would choose to add this vertex when they expect significant data reduction. There is likely to be an associated VertexManager that can group input data to combiner tasks, to combine data on a machine or on a rack dynamically at runtime (instead of via TaskLocationHint). Different combiner tasks may have different numbers of input edges.
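        A minimal, self-contained sketch (not from this JIRA; the class and method names are illustrative) of the per-key reduction such a CombinerProcessor would apply to the sorted (key, value) stream a shuffle edge delivers:

```java
import java.util.*;

// Illustrative sketch: the per-key reduction a CombinerProcessor
// would apply to sorted (key, value) pairs before forwarding them.
public class CombinerSketch {
    // Collapses consecutive entries with the same key by summing values.
    // Input is assumed sorted by key, as a shuffle edge would deliver it.
    public static List<Map.Entry<String, Integer>> combine(
            List<Map.Entry<String, Integer>> sorted) {
        List<Map.Entry<String, Integer>> out = new ArrayList<>();
        for (Map.Entry<String, Integer> e : sorted) {
            int last = out.size() - 1;
            if (last >= 0 && out.get(last).getKey().equals(e.getKey())) {
                out.set(last, new AbstractMap.SimpleEntry<>(
                        e.getKey(), out.get(last).getValue() + e.getValue()));
            } else {
                out.add(new AbstractMap.SimpleEntry<>(e.getKey(), e.getValue()));
            }
        }
        return out;
    }

    public static void main(String[] args) {
        List<Map.Entry<String, Integer>> in = Arrays.asList(
                new AbstractMap.SimpleEntry<>("a", 1),
                new AbstractMap.SimpleEntry<>("a", 2),
                new AbstractMap.SimpleEntry<>("b", 5));
        System.out.println(combine(in)); // prints [a=3, b=5]
    }
}
```

        The same function could run in a standalone combiner task because it only needs a sorted input stream, not access to the producing map task.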

        Tsuyoshi Ozawa added a comment -

        Bikas Saha Gopal V I meant the following configuration:

        • Creating DAGs with a CombinerProcessor via YARNRunner#createDAG only when the configuration is enabled.
        • The CombinerProcessor works with TaskLocationHint.

        What do you think?
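        As a sketch of that gating, the combiner vertex could be inserted into the vertex chain only when a flag is set. The config key below is made up for illustration, not a real Tez property, and a plain Map stands in for the Hadoop Configuration object:

```java
import java.util.*;

// Hypothetical sketch of config-gated DAG construction: only insert the
// combiner vertex into the Map -> Reduce chain when a flag is enabled.
public class DagGating {
    public static List<String> vertexChain(Map<String, String> conf) {
        List<String> vertices = new ArrayList<>(Arrays.asList("Map", "Reduce"));
        // "tez.combiner.vertex.enabled" is an illustrative key, not a real Tez one.
        if (Boolean.parseBoolean(
                conf.getOrDefault("tez.combiner.vertex.enabled", "false"))) {
            vertices.add(1, "Combiner"); // Map -> Combiner -> Reduce
        }
        return vertices;
    }

    public static void main(String[] args) {
        Map<String, String> conf = new HashMap<>();
        conf.put("tez.combiner.vertex.enabled", "true");
        System.out.println(vertexChain(conf)); // prints [Map, Combiner, Reduce]
    }
}
```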

        Tsuyoshi Ozawa added a comment -

        Hitesh Shah Sorry for the delay. Maybe I can post a patch in a few weeks. Thanks for the reminder.

        Hitesh Shah added a comment - edited

        Tsuyoshi Ozawa In case you have not started working on this, I have tagged this JIRA as a potential idea for Google Summer of Code.

        Bikas Saha added a comment -

        Not sure what you mean by YARNRunner#createDAG with TaskLocationHint. It seems like a VertexManager that does this at runtime (after being set up at compile time) would be the way to go. Maybe it will clear things up if there is a doc describing the solution so that we are all on the same page.

        Gopal, it may be that the notion of combiners is composable. So a combiner is a commutative and associative function that can be applied in any order and any number of times. Then the combiner could be run just after the mapper (in-proc), independently (in combiner tasks), or just before the reducer (in-proc) to trade off pure combiner work against the overhead of doing it. Are we thinking of the same thing?
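        The composability argument can be checked with a toy sum combiner (names are illustrative): because the function is commutative and associative, applying it once per group and then over the partial results gives the same answer as applying it once over everything, which is what lets the same combiner run at map-side, rack-level, and reduce-side:

```java
import java.util.*;

// Toy demonstration of a composable combiner: integer sum is commutative
// and associative, so it can be applied zero or more times at any level
// of an aggregation tree without changing the final result.
public class ComposableCombiner {
    // The combiner itself: reduces a list of values to one value.
    public static int combine(List<Integer> values) {
        int total = 0;
        for (int v : values) total += v;
        return total;
    }

    // One level of an aggregation tree: combine each group (e.g. per rack),
    // then combine the partial results (e.g. at the final reducer).
    public static int treeCombine(List<List<Integer>> groups) {
        List<Integer> partials = new ArrayList<>();
        for (List<Integer> g : groups) partials.add(combine(g));
        return combine(partials);
    }

    public static void main(String[] args) {
        List<Integer> flat = Arrays.asList(1, 2, 3, 4, 5, 6);
        List<List<Integer>> grouped = Arrays.asList(
                Arrays.asList(1, 2), Arrays.asList(3, 4), Arrays.asList(5, 6));
        // Both paths yield 21: the tree depth does not affect the answer.
        System.out.println(combine(flat) + " == " + treeCombine(grouped));
    }
}
```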

        Tsuyoshi Ozawa added a comment -

        Thanks for the good suggestions, Gopal V and Bikas Saha. I took some days to benchmark the baseline of Tez. Now I have started to implement this feature in YARNRunner#createDAG with TaskLocationHint - host-local R and rack-local R as Gopal mentioned. I'll share it with you when I have any progress. Thanks!

        Gopal V added a comment -

        Yes, this would be harder to implement if we retain the MR notions of combiners/mappers running in-proc.

        I think an intermediate step would be to pair this idea with a chunked shuffle, which shuffles one sort buffer at a time.

        A combine task is then easier to model, as the map tasks themselves will never run any combiner tasks in that model.

        Logically, this turns MR into M-R-R-R with M-(host-local)R-(rack-local)R-(final)R.

        Once the user is producing that DAG with custom edges, the complexity reduces to just the scheduling/event-routing for those edges, using the topology information in the Edge/Vertex managers.

        Bikas Saha added a comment -

        We can look at a combination of vertex manager/edge (possibly only a VertexManager) that can do the following job. Given a set of distributed data (typically produced by the vertex that reads input, but it can be any vertex), how can it be made less distributed so that we have a balance of parallelism and cost of computation? The cost of launching 10K tasks to process 10K small pieces of data might be more than launching 1K tasks to process the same data if it was in 1K pieces.

        If the distributed data is already small then it can be aggregated to a smaller set of locations, via an aggregation tree, so that each location has a meaningful size with respect to the cost of computation. If the distributed data is large but we are going to apply a reducing aggregation function to it (e.g. sum or count) then it may make sense to apply that aggregation function while executing that aggregation tree, because then the data volume will decrease as it funnels down the tree. The MR combiner is simulating a 0-level tree in some sense. However, if the data volume is large and there is no reducing function then we should skip the aggregation tree and directly do the subsequent operation, because the aggregation tree provides no benefit.

        So having the ability to create these kinds of aggregation trees via a vertex manager would be one promising way to go about doing this. However, we are not ready for that yet because we don't support dynamic addition/removal of vertices in the DAG yet. Still, we can make a first step where the user has the option of statically creating the aggregation vertex with a rack-level aggregation manager that can do rack-level aggregations of data. This compile-time choice can be made by the user when they are going to process a large volume of data and apply a reducing function to it.
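        The task-count trade-off above can be sketched with a back-of-envelope model (the numbers are illustrative, not measured): total cost = per-task launch overhead x task count + per-unit work x data volume. When launch overhead dominates, fewer, larger tasks win:

```java
// Illustrative cost model for the parallelism-vs-overhead trade-off:
// launching many tasks over small data pieces pays a fixed per-task
// overhead that can dwarf the actual processing work.
public class TaskCostModel {
    public static double cost(int tasks, double launchOverheadPerTask,
                              double totalDataUnits, double workPerUnit) {
        return tasks * launchOverheadPerTask + totalDataUnits * workPerUnit;
    }

    public static void main(String[] args) {
        // Same data volume, different task counts; overhead dominates.
        double many = cost(10_000, 2.0, 1_000_000, 0.001); // 20000 + 1000 = 21000
        double few  = cost(1_000,  2.0, 1_000_000, 0.001); //  2000 + 1000 =  3000
        System.out.println(many + " vs " + few);
    }
}
```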

        Tsuyoshi Ozawa added a comment -

        Hi Bikas Saha, can we add a node/rack-local processor or edge? It's generic and useful for processing large amounts of data. What do you think?

        Bikas Saha added a comment -

        Is this going to be MR specific? Not sure how we can add a processor that's independent of a higher-level application.

        Tsuyoshi Ozawa added a comment -

        Thank you for sharing useful points, Hitesh! I'm starting to read the code and think about the design. I'll share a design when it's ready.

        Hitesh Shah added a comment -

        Tsuyoshi Ozawa Added you as a contributor. Feel free to re-assign the JIRA to yourself if you would like to take this on. Thanks.

        Hitesh Shah added a comment - edited

        Tsuyoshi Ozawa Please go ahead and take a crack at it if you like. I have not started on any work related to this JIRA as of now.

        I will propose what I was thinking of as a possible option for a solution with respect to the current features of Tez. Feel free to change/suggest/tear apart the proposal/suggest improvements for core functionality in tez.

        Possible design:

        Background info: Tez supports the concepts of vertices and edges. As part of each edge and vertex, there is the possibility for a user to plug in some user logic to affect different aspects of the run-time. Currently, some pieces of this are implemented to support things such as dynamic sizing of the number of reduce tasks to run. Via events, information about the outputs of a map stage can be sampled in the AM to determine how many reducers to run. Once this is decided, the user logic in the AM can then route the information of map outputs (within DataMovementEvents) to the appropriate reducer to ensure that partitions are assigned correctly.

        Today, a MapReduce job consists of a Map vertex connected to a Reduce vertex via a Shuffle edge. For the above, I was thinking along the lines of a Map vertex followed by a Combiner vertex, which is then connected to the Reduce vertex. The edge between the Map and Combiner vertex could also just be a shuffle.
        Using a similar approach to reducer dynamism, the combiner vertex could use events generated by the framework to learn the locations where the Map tasks are running. Based on this, the user logic could then decide how many tasks to create for the combiner vertex (for example, one per physical node or one per rack) and also define the locality requirements. Note, the shuffle edge works by the map task generating an event publishing the location of the map output, which is then passed to the next stage's input. Using this, various optimizations could be done too. In some cases, the combiner vertex may decide to do no work and therefore pass the event generated by the map directly to the reduce without doing any work. This may require changes in the current shuffle input/output pairs though.

        Tez is still some time away from being able to dynamically introduce new vertices into the DAG. At some point, the combiner vertex would be dynamically introduced by user logic, but at this time it might be a good start to implement it via a static DAG with optimizations to bypass it as needed.

        There is some reference information here: http://hortonworks.com/hadoop/tez/. (We plan to create better docs and publish to the Apache Tez website soon.)
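        The locality step described above (one combiner task per node or per rack) can be sketched as grouping the map tasks' host locations by rack; in a real VertexManager the host-to-rack mapping would come from the cluster topology, and here it is faked with a plain map (all names are illustrative):

```java
import java.util.*;

// Illustrative sketch of combiner-task placement: given the hosts where
// map tasks ran, bucket them by rack. Each resulting bucket would become
// one combiner task with a rack-level locality requirement.
public class CombinerPlacement {
    public static Map<String, List<String>> groupByRack(
            List<String> mapHosts, Map<String, String> hostToRack) {
        Map<String, List<String>> byRack = new TreeMap<>();
        for (String host : mapHosts) {
            String rack = hostToRack.getOrDefault(host, "/default-rack");
            byRack.computeIfAbsent(rack, r -> new ArrayList<>()).add(host);
        }
        return byRack; // one combiner task per key, pinned to that rack
    }

    public static void main(String[] args) {
        Map<String, String> topology = new HashMap<>();
        topology.put("n1", "/rackA");
        topology.put("n2", "/rackA");
        topology.put("n3", "/rackB");
        // Two racks seen -> two combiner tasks.
        System.out.println(groupByRack(Arrays.asList("n1", "n2", "n3"), topology));
    }
}
```

        Switching from "one per rack" to "one per node" would simply mean grouping by host instead of by rack.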

        Tsuyoshi Ozawa added a comment -

        Hi Hitesh Shah, what's the status of this JIRA? I'm a developer of MAPREDUCE-4502, a node-level aggregation for MapReduce. If Tez is where this feature should be developed, I'd like to tackle it.


          People

          • Assignee: Tsuyoshi Ozawa
          • Reporter: Hitesh Shah
          • Votes: 0
          • Watchers: 5
