Apache Tez
TEZ-145

Support a combiner processor that can run non-local to map/reduce nodes

    Details

    • Type: Bug
    • Status: Open
    • Priority: Major
    • Resolution: Unresolved
    • Affects Version/s: None
    • Fix Version/s: None
    • Component/s: None
    • Labels: None

      Description

      For aggregate operators that can benefit from running in multi-level trees, supporting a combiner that can run in a non-local mode would allow performance gains by running the combiner at rack level.

      Attachments

      1. TEZ-145.2.patch (44 kB, Tsuyoshi Ozawa)
      2. WIP-TEZ-145-001.patch (44 kB, Tsuyoshi Ozawa)

        Activity

        Transition                  Time In Source Status   Execution Times   Last Executer    Last Execution Date
        Open -> Patch Available     660d 20h 14m            1                 Tsuyoshi Ozawa   11/Mar/15 16:06
        Patch Available -> Open     31d 6h 26m              1                 Tsuyoshi Ozawa   11/Apr/15 23:33
        Tsuyoshi Ozawa added a comment -

        Gopal V Bikas Saha thank you for your comments. The name "Transducer" sounds straightforward to me since the semantics look similar to Clojure's. Should we create subtasks to move this issue forward?

        The subtasks I've come up with are:
        1. Implementing Transducer.
        2. Implementing PartitionPreservingMergedInput.
        3. Implementing PreorderedPartitionedOutput.
        4. (optional) short circuit read.

        What do you think?

        Gopal V added a comment -

        Not sure why pipelining is required for this? Essentially we are introducing another vertex that is doing some partial grouping.

        Pipelining is perhaps the wrong word, but generating multiple partial events from a single task is the performance improvement.

        That allows a locality missed task to go from start to finish without moving any data across.

        Reducers tend to be spread across racks (which might be another thing to tune for when using a small queue on a big cluster).

        Transducers don't have that problem, they're optional - to avoid producing cross-rack fetches for an intermediate stage, the AM would have to excise any task that runs with sub-optimal locality after the runtime-expansion & scheduling of the downstream with TaskSpecs.

        The addition of the state machine to handle partial/final events in order for Inputs effectively allows forwarding the entire "split" as-is with a final=false flag, making that an entirely local decision in the Task rather than re-adjusting vertex fan-out at runtime.

        Bikas Saha added a comment -

        Right, like I said in a previous comment, the transducer needs to maintain partition boundaries while doing its work for this to be useful.
        This would need a single vertex with its vertex manager (to do the rack aware grouping) and a single EdgeManager that does the custom routing from grouped maps to their transducer. This would be a fairly asymmetric edge because of arbitrary groupings.
        Not sure why pipelining is required for this? Essentially we are introducing another vertex that is doing some partial grouping. In fact, it could be done today in user land without Tez changes and we should be able to accomplish that in this jira. The completed map outputs are being aggregated transparently for the next stage.
        Where Tez support could be needed for efficiency is to be able to short circuit this stage. Let's say the vertex manager figures out that the transducer stage is going to be useless (given data distribution, size and latency). Then Tez could allow removing this stage from the DAG so that the real consumer stage can be started with no overhead.

        Tsuyoshi Ozawa made changes -
        Status: Patch Available -> Open
        Gopal V added a comment - edited

        Bikas Saha: Figure 7 is identical to the runtime expansion I have drawn on my whiteboard.

        FYI, we've already implemented the simplest case in Tez when implementing the keep-alive shuffle_handler. We stream >1 map-outputs out of a host one after the other on a single shuffle request by collapsing many requests into a multi-get according to the tez.runtime.shuffle.fetch.max.task.output.at.once parameter.
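        For reference, a minimal sketch of capping that multi-get through a plain Hadoop Configuration (the value 20 below is only an illustrative cap, not the documented default):

        import org.apache.hadoop.conf.Configuration;

        public class ShuffleMultiGetConfig {
          // Caps how many task outputs one keep-alive shuffle request may collapse
          // into a single multi-get, via the parameter named above.
          public static Configuration limitOutputsPerFetch(Configuration conf, int maxAtOnce) {
            conf.setInt("tez.runtime.shuffle.fetch.max.task.output.at.once", maxAtOnce);
            return conf;
          }

          public static void main(String[] args) {
            Configuration conf = limitOutputsPerFetch(new Configuration(), 20);
            System.out.println(conf.get("tez.runtime.shuffle.fetch.max.task.output.at.once"));
          }
        }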

        For Tsuyoshi Ozawa, I'm still behind on my design doc, but explaining here anyway.

        The Combiner vertex has strange inputs and outputs compared to today's vertices - I've been thinking about using a new term, "Transducer", instead of overloading the idea of the older implementation (a Clojure-inspired name).

        A Transducer task accepts all partitions from a subset of mappers which it shares some sort of locality with & merges them depending on the destination. It processes entire map outputs instead of reading a single partition, to maximize sequential reads on the first pass.

        A Transducer task outputs data in the same order as it was ingested, allowing sort avoidance in case the next stage requires merged input.

        The entire vertex reads all the map outputs (in locality-aware "splits") and produces all events required for the downstream vertex.

        This is going to be strange, since the input edge is a combination of treating map-output as splits, but with the additional information about the input partition - since we lose the partition after the initial collect(), we need to merge the different partitions independently while feeding into the combiner to preserve the final output order between partitions (i.e "A" could be in partition 7 and "Z" in partition 0, and we want the sort order to be (partition,key) just like DefaultSorter output).

        We need a "PartitionPreservingMergedInput" with an OrderedPartitionedOutput on the upstream, while we can go with the regular Unordered for the unordered cases.

        On the output side, we'll have to combine an UnorderedPartitionedOutput (call it a "PreorderedPartitionedOutput" to indicate the still-sorted nature) with an OrderedGroupedMergedKVInput on the same edge. And therefore we need to take care not to reorder the data when writing it out.
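        As a rough sketch of that (partition, key) ordering (the record shape below is hypothetical, for illustration only; the real inputs/outputs operate on serialized key-value streams):

        import java.util.Comparator;

        // Hypothetical record shape, used only to illustrate the ordering.
        final class PartitionedRecord {
          final int partition;
          final byte[] key;
          PartitionedRecord(int partition, byte[] key) { this.partition = partition; this.key = key; }
        }

        final class PartitionKeyOrder implements Comparator<PartitionedRecord> {
          private final Comparator<byte[]> keyComparator;
          PartitionKeyOrder(Comparator<byte[]> keyComparator) { this.keyComparator = keyComparator; }

          @Override
          public int compare(PartitionedRecord a, PartitionedRecord b) {
            // Partition first, then key within the partition, so "A" in partition 7
            // still sorts after "Z" in partition 0, matching DefaultSorter's output order.
            int byPartition = Integer.compare(a.partition, b.partition);
            return byPartition != 0 ? byPartition : keyComparator.compare(a.key, b.key);
          }
        }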

        Bikas Saha/Hitesh Shah can correct me, but that needs a minimum of 2 edge managers and 1 vertex manager for the Tez impl of this (& then translating the MR DAG into this Tez DAG).

        This needed the Pipelined shuffle to be implemented first to be performant.

        The performance corner case kicks in when we can't afford to wait too long for rack-local allocations, so when we start up a task we cannot be sure it will start up in the right rack - we only get to give YARN a hint about allocation and end up with a TaskSpec + allocation which might not have the right locality.

        Therefore, a combiner task which gets an off-rack URL should immediately duplicate out the same composite DME to the final destination, with a clear marker stating that this is a partial chunk (i.e the pipelined data movement needs to be implemented for combiners to function without accidental miss costs).

        This short-circuits the accidental sub-optimal scenario due to cluster occupancy - so we're finally ready for this now that pipelined fetchers are in trunk.

        Bikas Saha added a comment -

        What I am describing is the concept of partial aggregation (see figure 7 in http://research.microsoft.com/pubs/63785/eurosys07.pdf) in which applying a combiner becomes a special case that may result in further data reduction depending on the combine function. In the degenerate case the combine function is the concatenation function which simply creates a smaller number of large sized chunks from a large number of small sized chunks within cheaper network domains.

        Tsuyoshi Ozawa added a comment - edited

        Bikas Saha Gopal V As Gopal mentioned, this feature can target 3 and 4. This is a benchmark result from a prototype of MAPREDUCE-4502: http://www.slideshare.net/ozax86/prestrata-hadoop-word-meetup/11
        On MAPREDUCE-4502, I tried to run the combiner after task spills: it causes a performance trade-off between aggregation ratio and disk IO. So, Gopal's comment quoted below makes sense to me.

        So tuning it to have no extra spills produced bad shuffle performance, which is what the Tez approach is not vulnerable to, since it is meant to combine host-local data (plus skip merges via pipelining).

        If we can implement an in-memory combiner or that kind of DAG support in the Tez layer, we can improve performance further. However, we would need to change the fault-tolerance semantics to support the feature, since fault tolerance won't be task-level in this case.

        Gopal V added a comment -

        Combiners have only made sense in cases 3/4.

        #3 is the true use case for this, because combiners are written only for those scenarios.

        The old MRv2 model did a re-merge + combine() only if there were > 3 spills per task.

        So tuning it to have no extra spills produced bad shuffle performance, which is what the Tez approach is not vulnerable to, since it is meant to combine host-local data (plus skip merges via pipelining).

        The original scenario where I discovered a need for this was when I was trying to find the first/last transaction of sessions across a time window, to look for overlapped session-ids for the same user to detect multiple device usage or stolen tokens.

        Bikas Saha added a comment - edited

        Taking a step back, let's figure out the scenarios for this.
        Do we agree that
        1) Small jobs (small data) - this is not going to be helpful because we will be adding an extra stage latency for small combiner benefits.
        2) Large job (large data) with no data reduction in the map side combiner - this is not going to be helpful because the extra combiner will not reduce the data further.
        3) Large job (large data) with high data reduction in the map side combiner - this is going to be useful because the extra combiner will reduce the data further and also decrease the number of data shards by aggregating small outputs from the map tasks into smaller number of combiner tasks.
        4) Large job (large data) with a lot of filtering (no combiner) - this may be useful, not because there is a combine operation, but to reduce the large number of small outputs produced by the map tasks into a smaller number of shards via the combiner tasks.

        For 3/4 this may be useful if we can run aggregation combiner tasks at the rack level to coalesce the data within a rack (cheap) compared to having to pull that data across racks in the final reducer. Even in these cases, given better networks, we need to understand the trade off between pulling the data across to the final reducer vs the cost of running the extra combiner stage. Essentially, what is the killer scenario for this?

        Tsuyoshi Ozawa added a comment -

        Gopal V thanks for taking a look at my patch and for your comment!

        but the edge connectivity is still shuffle + total-order merged for both edges

        You're right. We can use UnorderedPartitionedKVEdge for optimization since aggregation tasks don't need sorting as you know.

        I will write a more detailed design document tomorrow and upload it here which will expand on Bikas's earlier comment and I will draw out the runtime expansion graphs to indicate the sort-preserving combiner instead of re-sorting data along the way (since the combiner never mutates the keys or output ordering).

        OK, looking forward to it.

        Gopal V added a comment -

        Tsuyoshi Ozawa: Got some time to test this out.

        The current implementation just adds a new reducer stage, which is not entirely the same as a combiner stage.

        The implementation we want to produce is not exactly a sort-merged reducer stage, since that only fixes the parallelism part of the reducer shuffle (that could be achieved by increasing reducer count anyway).

        The current implementation is a translation of an old MR task into an MRR task, but the edge connectivity is still shuffle + total-order merged for both edges (I see OrderedPartitionedKVEdgeConfig for both M -> R -> R edges).

        I will write a more detailed design document tomorrow and upload it here which will expand on Bikas's earlier comment and I will draw out the runtime expansion graphs to indicate the sort-preserving combiner instead of re-sorting data along the way (since the combiner never mutates the keys or output ordering).

        Tsuyoshi Ozawa added a comment -

        @Gopal V Bikas Saha How can we move ahead with this issue? I'll implement the missing parts if we need them.

        Tsuyoshi Ozawa added a comment -

        Gopal V Bikas Saha thank you for the feedback! I have a question about where to handle the locality. Do you mean that we should use InputReadyVertexManager? Or should we create a new abstraction layer to deal with the locality? As a reference, on MAPREDUCE-4502, I changed the AppMaster to handle the locality of inputs and decide when to launch the node-level combiner.

        Bikas Saha added a comment -

        I know what you are talking about but let me restate to check if we are on the same page.
        Combining can be at multiple levels - task, host, rack etc.
        Doing these combines in theory requires maintaining partition boundaries per combining level. However, if tasks are maintaining partition boundaries then there is a task explosion (== level-arity * partition count). Hence, an efficient multi-level combine operation needs to operate on multiple partitions per task at each level, such that a reasonable number of tasks can be used to process a large number of partitions. This statement can be true even for the final reducer. Partially, that is what happens with auto-reduce, except that the tasks lost their partition boundaries.
        If the processor can find a way to process multiple partitions while keeping them logically separate then we could de-link physical tasks from physical partitioning. If that is supported by the processor, the edge manager can be set up to do the correct routing of N output/partition indices to the same task.

        Gopal V added a comment -

        This is a question for Bikas Saha.

        There is a combiner edge & vertex manager that needs to go along with this, which converts all partitions from all local inputs into one combine processor (i.e., if it finds it has remote fetches to do, it should just forward the DME events using the pipelined mode of >1 event per attempt).

        To be able to bail out with a no-op like that, all partitioning throughout has to be exactly the reducer partition count.

        This is the most optimal mode, but this makes everything extra complex.

        Assume you have 600 hosts across 30 racks which ran a map task, plus 2000 partitions in the reducer.

        The host-level combiner input count is actually 600 x 2000 partitions, which can be grouped into 600 x m groups - not 2000 groups.

        The rack-level combiner input count is actually 30 x 2000 partitions, which can be grouped into 30 x n groups - not 2000 groups.

        Yet, all the inputs are actually always partitioned into 2000 partitions and the destination task-index is determined by something other than the partition.

        So, how practical is that?
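        As a rough sketch of the routing arithmetic implied here (the grouping function is a simplified assumption; a real edge manager would also have to carry the partition metadata along):

        // Maps a (host, partition) pair to a combiner task index when each of the 600
        // hosts gets m combiner groups, so the destination task is decided by the host,
        // not by the 2000-way partition number.
        final class HostLevelGrouping {
          private final int groupsPerHost; // the "m" above

          HostLevelGrouping(int groupsPerHost) { this.groupsPerHost = groupsPerHost; }

          int destinationTask(int hostIndex, int partition) {
            return hostIndex * groupsPerHost + (partition % groupsPerHost);
          }

          public static void main(String[] args) {
            HostLevelGrouping g = new HostLevelGrouping(4);    // 600 x 4 = 2400 host-level tasks
            System.out.println(g.destinationTask(599, 1999));  // host 599, partition 1999 -> task 2399
          }
        }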

        Gopal V added a comment -

        Tsuyoshi Ozawa: the CombineProcessor patch looks good.

        This will help applications which do no in-memory aggregations, but you're effectively moving the data over racks ~3x.

        So this is a necessary part of the fix, but not the complete fix as long as the ShuffleVertexManager is being used to connect them up, because that vertex manager has no way to provide task locality (rack-local or host-local) when spinning up tasks.

        Hadoop QA added a comment -

        +1 overall. Here are the results of testing the latest attachment
        http://issues.apache.org/jira/secure/attachment/12705400/TEZ-145.2.patch
        against master revision 9b845f2.

        +1 @author. The patch does not contain any @author tags.

        +1 tests included. The patch appears to include 1 new or modified test files.

        +1 javac. The applied patch does not increase the total number of javac compiler warnings.

        +1 javadoc. There were no new javadoc warning messages.

        +1 findbugs. The patch does not introduce any new Findbugs (version 2.0.3) warnings.

        +1 release audit. The applied patch does not increase the total number of release audit warnings.

        +1 core tests. The patch passed unit tests in .

        Test results: https://builds.apache.org/job/PreCommit-TEZ-Build/310//testReport/
        Console output: https://builds.apache.org/job/PreCommit-TEZ-Build/310//console

        This message is automatically generated.

        Tsuyoshi Ozawa made changes -
        Attachment TEZ-145.2.patch [ 12705400 ]
        Tsuyoshi Ozawa added a comment -

        Fixed the warnings reported by findbugs.

        Tsuyoshi Ozawa added a comment -

        Bikas Saha Sure, no problem. Gopal V, looking forward to your feedback.

        Bikas Saha added a comment -

        Tsuyoshi Ozawa I will wait for a few days for Gopal to respond before I review this code. Please let me know if that works for you.

        Bikas Saha added a comment -

        Gopal V You may have more ideas on optimal combine processor implementation that may be of relevance here.

        Hadoop QA added a comment -

        -1 overall. Here are the results of testing the latest attachment
        http://issues.apache.org/jira/secure/attachment/12703928/WIP-TEZ-145-001.patch
        against master revision 59529ab.

        +1 @author. The patch does not contain any @author tags.

        +1 tests included. The patch appears to include 1 new or modified test files.

        +1 javac. The applied patch does not increase the total number of javac compiler warnings.

        +1 javadoc. There were no new javadoc warning messages.

        -1 findbugs. The patch appears to introduce 2 new Findbugs (version 2.0.3) warnings.

        +1 release audit. The applied patch does not increase the total number of release audit warnings.

        +1 core tests. The patch passed unit tests in .

        Test results: https://builds.apache.org/job/PreCommit-TEZ-Build/278//testReport/
        Findbugs warnings: https://builds.apache.org/job/PreCommit-TEZ-Build/278//artifact/patchprocess/newPatchFindbugsWarningstez-mapreduce.html
        Console output: https://builds.apache.org/job/PreCommit-TEZ-Build/278//console

        This message is automatically generated.

        Tsuyoshi Ozawa added a comment -
        +    if (hasFinalReduceStage) {
        +      boolean hasHostLocalReduceStage =
        +          conf.getBoolean(MRJobConfig.MRR_HOST_LOCAL_REDUCE, MRJobConfig.DEFAULT_MRR_HOST_LOCAL_REDUCE);
        +      if (hasHostLocalReduceStage) {
        +        numIntermediateStages++;
        +      }
        +      boolean hasRackLocalReduceStage =
        +          conf.getBoolean(MRJobConfig.MRR_RACK_LOCAL_REDUCE, MRJobConfig.DEFAULT_MRR_RACK_LOCAL_REDUCE);
        +      if (hasRackLocalReduceStage) {
        +        numIntermediateStages++;
        +      }
        +    }
        

        These options are not used for now.

        Hitesh Shah made changes -
        Labels gsoc gsoc2015 hadoop java tez
        Tsuyoshi Ozawa added a comment -

        Bikas Saha Hitesh Shah I prototyped CombineProcessor and TestOrderedWordCountWithCombineProcessor as an application.

        • CombineProcessor accepts OrderedGroupedInputLegacy and runs Combiner#combineWithKVWriter.
        • MRCombiner has a new method, combineWithKVWriter, for accepting KeyValueWriter instead of IFile#Writer.
        • TestOrderedWordCountWithCombineProcessor uses CombineProcessor as an example. TestOrderedWordCountWithCombineProcessor finished in 3m40.838s with CombineProcessor, while TestOrderedWordCount finished in 6m53.447s. (Note that this comparison is unfair, as you know - TestOrderedWordCountWithCombineProcessor runs with 10 combiners, whereas TestOrderedWordCount runs with only 2 reducers.)

        I'd appreciate any feedback!

        Tsuyoshi Ozawa made changes -
        Attachment WIP-TEZ-145-001.patch [ 12703931 ]
        Tsuyoshi Ozawa made changes -
        Attachment WIP-TEZ-145-001.patch [ 12703928 ]
        Tsuyoshi Ozawa made changes -
        Status: Open -> Patch Available
        Tsuyoshi Ozawa made changes -
        Attachment WIP-TEZ-145-001.patch [ 12703928 ]
        Tsuyoshi Ozawa added a comment -

        Attaching WIP patch.

        Tsuyoshi Ozawa added a comment -

        Bikas, thank you for the comment. Makes sense. As a first prototype, I'll try to create a CombinerProcessor.

        Bikas Saha added a comment -

        Tsuyoshi Ozawa Since DAG creation is a user land operation, Tez cannot insert this optimization by itself. This would most likely have to be a combination of a CombinerProcessor that can be added to a Combiner vertex in the DAG. Its inputs and outputs would have to be assigned based on the producers (sorted etc). It would run the combine to reduce data and then output (in desired format - sorted etc) to the next vertex. The users would choose to add this vertex when they expect significant data reduction. There is likely to be an associated VertexManager that can group input data to combiner tasks to combine data on a machine or on a rack dynamically at runtime (instead of TaskLocationHint). Different combiner tasks may have different number of input edges.
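        As a rough sketch of that static DAG shape (the processor class names and key/value/partitioner types below are placeholders, CombineProcessor is the class proposed in this thread with an assumed package, and a real job would still need data sources/sinks plus the grouping VertexManager described above):

        import org.apache.hadoop.io.IntWritable;
        import org.apache.hadoop.io.Text;
        import org.apache.tez.dag.api.DAG;
        import org.apache.tez.dag.api.Edge;
        import org.apache.tez.dag.api.ProcessorDescriptor;
        import org.apache.tez.dag.api.Vertex;
        import org.apache.tez.runtime.library.conf.OrderedPartitionedKVEdgeConfig;

        public class CombinerVertexDagSketch {
          public static DAG build(int numMaps, int numCombiners, int numReducers) {
            // Placeholder key/value/partitioner types for illustration only.
            OrderedPartitionedKVEdgeConfig edgeConf = OrderedPartitionedKVEdgeConfig
                .newBuilder(Text.class.getName(), IntWritable.class.getName(),
                    "org.apache.tez.runtime.library.partitioner.HashPartitioner")
                .build();

            // "example.*" processor names are placeholders, not real classes.
            Vertex map = Vertex.create("map",
                ProcessorDescriptor.create("example.MapProcessor"), numMaps);
            Vertex combine = Vertex.create("combine",
                ProcessorDescriptor.create("example.CombineProcessor"), numCombiners);
            Vertex reduce = Vertex.create("reduce",
                ProcessorDescriptor.create("example.ReduceProcessor"), numReducers);

            DAG dag = DAG.create("wordcount-with-combiner");
            dag.addVertex(map).addVertex(combine).addVertex(reduce);
            // Both edges are shuffle + total-order merged here; swapping one for a
            // sort-preserving edge is exactly the follow-up work discussed in this thread.
            dag.addEdge(Edge.create(map, combine, edgeConf.createDefaultEdgeProperty()));
            dag.addEdge(Edge.create(combine, reduce, edgeConf.createDefaultEdgeProperty()));
            return dag;
          }
        }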

        Tsuyoshi Ozawa added a comment -

        Bikas Saha Gopal V I meant the following configuration:

        • Creating DAGs with CombinerProcessor via YARNRunner#createDAG only when configuration is enabled.
        • CombinerProcessor works with TaskLocationHint.

        What do you think?

        Tsuyoshi Ozawa added a comment -

        Hitesh Shah Sorry for the delay. I can probably put up a patch in a few weeks. Thanks for the notification.

        Hitesh Shah added a comment - edited

        Tsuyoshi Ozawa In case you have not started working on this, I have tagged this jira as a potential idea for Google Summer of code.

        Hitesh Shah made changes -
        Labels gsoc gsoc2015 hadoop tez yarn gsoc gsoc2015 hadoop java tez
        Hitesh Shah made changes -
        Labels TEZ-1 gsoc gsoc2015 hadoop tez yarn
        Bikas Saha added a comment -

        Not sure what you mean by YARNRunner#createDAG with TaskLocationHint. It seems like a VertexManager that does this at runtime (after being setup at compile time) would be the way to go. Maybe it will clear things up if there is a doc describing the solution so that we are all on the same page.

        Gopal, it may be that the notion of combiners is composable. So a combiner is a commutative and associative function that can be applied in any order and any number of times. Then the combiner could be run just after the mapper (in-proc), independently (in combiner tasks), or just before the reducer (in-proc) to trade off pure combiner work against the overhead of doing it. Are we thinking of the same thing?
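        As a rough illustration of that composability (a toy sum combiner; nothing here is from the patch):

        import java.util.Arrays;
        import java.util.List;

        // A toy commutative, associative combine (integer sum): applying it once at the
        // end or in stages over arbitrary groupings yields the same result.
        final class ComposableCombine {
          static int combine(List<Integer> values) {
            return values.stream().mapToInt(Integer::intValue).sum();
          }

          public static void main(String[] args) {
            List<Integer> mapOutput = Arrays.asList(3, 1, 4, 1, 5, 9);
            int direct = combine(mapOutput);                    // combine once, reducer-side
            int staged = combine(Arrays.asList(                 // combine partials of partials
                combine(mapOutput.subList(0, 3)),
                combine(mapOutput.subList(3, 6))));
            System.out.println(direct == staged);               // true: both are 23
          }
        }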

        Tsuyoshi Ozawa added a comment -

        Thanks for the good suggestions, Gopal V and Bikas Saha. I took some days to benchmark the baseline of Tez. Now I have started to implement this feature in YARNRunner#createDAG with TaskLocationHint - host-local R and rack-local R, as Gopal mentioned. I'll share with you when I have any progress. Thanks!

        Gopal V added a comment -

        Yes, this would be harder to implement if we retain the MR notions of combiners/mappers running in-proc.

        I think an intermediate step would be to pair this idea with a chunked shuffle, which shuffles 1 sort buffer at a time.

        A combine task is then easier to model, as the map tasks themselves will never run any combiner tasks in that model.

        Logically, this turns MR into M-R-R-R with M-(host-local)R-(rack-local)R-(final)R.

        Once the user is producing that DAG with custom edges, the complexity reduces to just the scheduling/event-routing for those edges, based on the topology information in the Edge/Vertex managers.

        Bikas Saha added a comment -

        We can look at a combination of vertex manager/edge (possibly only a vertex manager) that can do the following job. Given a set of distributed data (typically produced by the vertex that reads input, but it can be any vertex), how can it be made less distributed so that we have a balance of parallelism and cost of computation? The cost of launching 10K tasks to process 10K small pieces of data might be less than launching 1K tasks to process the same data if it was in 1K pieces. If the distributed data is already small then it can be aggregated to a smaller set of locations, via an aggregation tree, so that each location has a meaningful size w.r.t. the cost of computation. If the distributed data is large but we are going to apply a reducing aggregating function on it (e.g. sum or count) then it may make sense to apply that aggregation function while executing the aggregation tree, because then the data volume will decrease as it funnels down the tree. The MR combiner is simulating 0-level trees in some sense. However, if the data volume is large and there is no reducing function then we should skip the aggregation tree and directly do the subsequent operation, because the aggregation tree provides no benefit.

        So having the ability to create these kinds of aggregation trees via a vertex manager would be one promising way to go about doing this. However, we are not ready for that yet because we don't support dynamic addition/removal of vertices in the DAG yet. Still, we can make a first step where the user has the option of statically creating the aggregation vertex with a rack-level aggregation manager that can do rack-level aggregations of data. This compile-time choice can be made by the user when they are going to process a large volume of data and apply a reducing function to it.

        Tsuyoshi Ozawa added a comment -

        Hi Bikas Saha, can we add a node/rack-local processor or edge? It's generic and useful for processing large amounts of data. What do you think?

        Bikas Saha added a comment -

        Is this going to be MR specific? Not sure how we can add a processor that's independent of a higher level application.

        Tsuyoshi Ozawa made changes -
        Assignee: Hitesh Shah [hitesh] -> Tsuyoshi OZAWA [ozawa]
        Tsuyoshi Ozawa added a comment -

        Thank you for sharing useful points, Hitesh! I'm starting to read the code and think about the design. I'll share a design when it's ready.

        Hitesh Shah added a comment -

        Tsuyoshi Ozawa Added you as a contributor. Feel free to re-assign the jira to yourself if you would like to take this on. Thanks.

        Hitesh Shah added a comment - edited

        Tsuyoshi Ozawa Please go ahead and take a crack at it if you like. I have not started on any work related to this jira as of now.

        I will propose what I was thinking of as a possible option for a solution with respect to the current features of Tez. Feel free to change/tear apart the proposal or suggest improvements for core functionality in Tez.

        Possible design:

        Background info: Tez supports the concepts of vertices and edges. As part of each edge and vertex, there is the possibility for a user to plug in some user logic to affect different aspects of the run-time. Currently, some pieces of this are implemented to support things such as dynamic sizing of the number of reduce tasks to run. Via events, information about the outputs of a map stage can be sampled in the AM to determine how many reducers to run. Once this is decided, the user logic in the AM can then route the information about map outputs (within DataMovementEvents) to the appropriate reducer to ensure that partitions are assigned correctly.

        Today, a MapReduce job consists of a Map vertex connected to a Reduce vertex via a Shuffle edge. For the above, I was thinking along the lines of a Map vertex followed by a Combiner Vertex which is then connected to the Reduce Vertex. The edge between the Map and combiner vertex could also just be a shuffle.
        Using an approach similar to reducer dynamism, the combiner vertex could use events generated by the framework to learn where the Map tasks are running. Based on this, the user logic could then decide how many tasks to create for the combiner vertex (for example, one per physical node or one per rack) and also define the locality requirements. Note, the shuffle edge works by the map task generating an event publishing the location of the map output, which is then passed to the next stage's input. Using this, there could be various optimizations done too. In some cases, the combiner vertex may decide to do no work and therefore pass the event generated by the map directly to the reduce vertex without doing any work. This may require changes in the current shuffle input/output pairs though.
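        As a rough sketch of that grouping decision (a plain helper only; how the host-to-rack mapping is obtained and how the result is fed into a VertexManager are left as assumptions):

        import java.util.ArrayList;
        import java.util.LinkedHashMap;
        import java.util.List;
        import java.util.Map;

        // Groups completed map-task host names by rack, so a vertex manager could
        // decide on e.g. one combiner task per rack and use each rack's hosts as
        // locality hints when creating those tasks.
        final class RackGrouping {
          static Map<String, List<String>> hostsByRack(Map<String, String> hostToRack) {
            Map<String, List<String>> byRack = new LinkedHashMap<>();
            for (Map.Entry<String, String> e : hostToRack.entrySet()) {
              byRack.computeIfAbsent(e.getValue(), r -> new ArrayList<>()).add(e.getKey());
            }
            return byRack; // one combiner task (and location hint) per key
          }

          public static void main(String[] args) {
            Map<String, String> hostToRack = new LinkedHashMap<>();
            hostToRack.put("host1", "/rack1");
            hostToRack.put("host2", "/rack1");
            hostToRack.put("host3", "/rack2");
            System.out.println(hostsByRack(hostToRack)); // {/rack1=[host1, host2], /rack2=[host3]}
          }
        }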

        Tez is still some time away from being able to dynamically introduce new vertices into the DAG. At some point, the combiner vertex would be dynamically introduced by user logic, but at this time it might be a good start to implement it via a static DAG with optimizations to bypass it as needed.

        There is some reference information here: http://hortonworks.com/hadoop/tez/. ( We plan to create better docs and publish to the apache tez website soon ).

        Tsuyoshi Ozawa added a comment -

        Hi Hitesh Shah, what's the status of this JIRA? I'm a developer of MAPREDUCE-4502, a node-level aggregation for MapReduce. If Tez is the place to develop this feature, I'd like to tackle it.

        Jake Farrell made changes -
        Workflow: jira [ 12782816 ] -> no-reopen-closed, patch-avail [ 12807622 ]
        Hitesh Shah created issue -

          People

          • Assignee: Tsuyoshi Ozawa
          • Reporter: Hitesh Shah
          • Votes: 0
          • Watchers: 6
