Details
Type: New Feature
Status: Open
Priority: Major
Resolution: Unresolved
Description
This is based on an offline discussion with gopalv, hitesh, jrottinghuis and lohit regarding support for efficient processing of highly skewed, unordered, partitioned mapper output. Our use case is to demux highly skewed, unordered category data partitioned by category name. Gopal and Hitesh also mentioned the dynamically shuffled join scenario.
One option we discussed is to leverage the auto-parallelism feature with upfront over-partitioning. That approach may add overhead from supporting a large number of partitions, and it causes unnecessary data movement because each reducer still needs to fetch data from all mappers.
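To make the limitation of the auto-parallelism option concrete, here is a simplified sketch of contiguous partition grouping. It is a hypothetical illustration, not Tez's ShuffleVertexManager code; the class, the `group` method and the `targetBytes` threshold are assumptions. Grouping can only merge partitions: one oversized partition still lands on a single reducer, and every reducer still fetches from every mapper.

```java
import java.util.ArrayList;
import java.util.List;

/**
 * Hypothetical sketch (not Tez's ShuffleVertexManager code) of auto-parallelism:
 * merge contiguous partitions into reducers until a reducer's input would
 * exceed a target size.
 */
public class AutoParallelismSketch {

    /** One reducer covering partitions firstPartition..lastPartition (inclusive). */
    public static final class ReducerGroup {
        public final int firstPartition;
        public final int lastPartition;
        public final long inputBytes;

        ReducerGroup(int firstPartition, int lastPartition, long inputBytes) {
            this.firstPartition = firstPartition;
            this.lastPartition = lastPartition;
            this.inputBytes = inputBytes;
        }
    }

    /**
     * @param partitionBytes size of each partition, summed over all mappers
     * @param targetBytes    hypothetical desired input size per reducer
     */
    public static List<ReducerGroup> group(long[] partitionBytes, long targetBytes) {
        List<ReducerGroup> groups = new ArrayList<>();
        int start = 0;
        long acc = 0;
        for (int p = 0; p < partitionBytes.length; p++) {
            // Close the current reducer if this partition would push it past the target.
            if (p > start && acc + partitionBytes[p] > targetBytes) {
                groups.add(new ReducerGroup(start, p - 1, acc));
                start = p;
                acc = 0;
            }
            acc += partitionBytes[p];
        }
        if (partitionBytes.length > 0) {
            groups.add(new ReducerGroup(start, partitionBytes.length - 1, acc));
        }
        // A partition is never split, and each reducer still fetches
        // its partitions from every mapper.
        return groups;
    }
}
```

With the sizes from the example in this issue (in MB: nine partitions of 100 and P10 at 10,000) and a 1,000 MB target, this yields two reducers: one for P1..P9 (900 MB) and one that still receives all 10,000 MB of P10.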
Another alternative is to use a custom DataMovementType that doesn't require each reducer to fetch data from all mappers. That way, a large partition can be processed by several reducers, each of which fetches data from only a subset of the mappers.
For example, say there are 100 mappers, each of which has 10 partitions (P1, ..., P10). Each mapper generates 100MB for its P10 and 1MB for each of its P1, ..., P9. With the default SCATTER_GATHER routing, the reducer for P10 has to process 10GB of input (100 mappers × 100MB) and becomes the bottleneck of the job. With fair custom data routing, the P10 output of the first 10 mappers is processed by one reducer with 1GB of input, the P10 output of the next 10 mappers by another reducer, and so on, so P10 is spread across 10 reducers.
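The fair routing idea above can be sketched as follows, assuming the size of every (mapper, partition) output is known up front, for example from partition stats. This is a hypothetical greedy splitter, not the code from the linked subtasks; the class, `route` method and `targetBytes` parameter are illustrative names. For each partition it cuts the mapper index range into contiguous chunks whose total size stays within the target, and each chunk becomes one reducer that fetches only from those mappers.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical sketch of fair routing: split each partition by contiguous mapper ranges. */
public class FairRoutingSketch {

    /** One reducer: a partition plus the inclusive range of mappers it fetches from. */
    public static final class MapperRange {
        public final int partition;
        public final int firstMapper;
        public final int lastMapper;
        public final long inputBytes;

        MapperRange(int partition, int firstMapper, int lastMapper, long inputBytes) {
            this.partition = partition;
            this.firstMapper = firstMapper;
            this.lastMapper = lastMapper;
            this.inputBytes = inputBytes;
        }
    }

    /**
     * @param bytes       bytes[m][p] = size of partition p produced by mapper m
     * @param targetBytes hypothetical desired input size per reducer
     */
    public static List<MapperRange> route(long[][] bytes, long targetBytes) {
        int numMappers = bytes.length;
        int numPartitions = numMappers == 0 ? 0 : bytes[0].length;
        List<MapperRange> reducers = new ArrayList<>();
        for (int p = 0; p < numPartitions; p++) {
            int start = 0;
            long acc = 0;
            for (int m = 0; m < numMappers; m++) {
                // Start a new reducer for this partition once the target would be exceeded.
                // A single mapper's output for a partition is never split.
                if (m > start && acc + bytes[m][p] > targetBytes) {
                    reducers.add(new MapperRange(p, start, m - 1, acc));
                    start = m;
                    acc = 0;
                }
                acc += bytes[m][p];
            }
            reducers.add(new MapperRange(p, start, numMappers - 1, acc));
        }
        return reducers;
    }
}
```

With the example numbers (sizes in MB, partition indices 0..9 standing for P1..P10, target 1,000 MB), P1..P9 each map to one reducer covering all 100 mappers (100 MB each), while P10 is split into 10 reducers covering mappers 0-9, 10-19, and so on, each with 1,000 MB, for 19 reducers in total.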
For further optimization, we can schedule each reducer on the same node(s) as the mappers it fetches data from.
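One possible locality heuristic, sketched under loud assumptions: given the host of each mapper and the bytes each mapper produced for a reducer's partition, prefer the host that holds the largest share of that reducer's input. The heuristic and all names here are hypothetical; the actual data-locality policy is tracked separately in the linked subtask.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical locality hint for a fair-routed reducer. */
public class LocalityHintSketch {

    /**
     * @param mapperNodes host of each mapper task (assumed known)
     * @param mapperBytes bytes each mapper produced for this reducer's partition
     * @param firstMapper first mapper in the reducer's range (inclusive)
     * @param lastMapper  last mapper in the reducer's range (inclusive)
     * @return host holding the most input bytes for the range, or null if the range is empty
     */
    public static String preferredNode(String[] mapperNodes, long[] mapperBytes,
                                       int firstMapper, int lastMapper) {
        Map<String, Long> bytesPerNode = new HashMap<>();
        String best = null;
        long bestBytes = -1;
        for (int m = firstMapper; m <= lastMapper; m++) {
            // Accumulate this mapper's output under its host and track the running maximum.
            long total = bytesPerNode.merge(mapperNodes[m], mapperBytes[m], Long::sum);
            if (total > bestBytes) {
                bestBytes = total;
                best = mapperNodes[m];
            }
        }
        return best;
    }
}
```

Ties go to the host that reached the maximum first; a real scheduler would likely also weigh rack locality and available capacity.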
To support this, we need TEZ-3206 as well as customized data routing based on VertexManagerPlugin and EdgeManagerPluginOnDemand.
Issue Links
1. Provide a demuxer sample app that uses fair routing | Patch Available | Shohei Okumiya
2. Provide basic fair routing and scheduling functionality via custom VertexManager and EdgeManager | Closed | Ming Ma
3. Data locality based scheduling policy in fair routing | Open | Unassigned
4. Refactor ShuffleVertexManager to make parts of it re-usable in other plugins | Closed | Ming Ma
5. Fair routing support for multiple source vertices | Open | Unassigned
6. Support a new data routing policy for small partitions | Patch Available | Ming Ma
7. Allow the FAIR_PARALLELISM mode to accept multiple source vertices | Patch Available | Shohei Okumiya
8. Partition stats should be always uncompressed size | Resolved | Shohei Okumiya