Bikas Saha: Figure 7 is identical to the runtime expansion I have drawn on my whiteboard.
FYI, we've already implemented the simplest case of this in Tez as part of the keep-alive shuffle_handler work. We stream more than one map output out of a host, one after the other, on a single shuffle request by collapsing many requests into a multi-get, bounded by the tez.runtime.shuffle.fetch.max.task.output.at.once parameter.
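A minimal sketch of the request-collapsing idea (class and method names are mine, not the actual Tez fetcher code): pending map-output IDs for one host are drained into multi-get batches, each bounded by the kind of limit that tez.runtime.shuffle.fetch.max.task.output.at.once expresses, so one keep-alive connection can stream several map outputs per request.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch of collapsing per-output fetch requests into multi-gets.
public class MultiGetBatcher {
    // Collapse individual fetch requests for one host into batches of at most
    // maxAtOnce map outputs; each batch becomes a single shuffle request.
    public static List<List<String>> batch(List<String> mapOutputIds, int maxAtOnce) {
        List<List<String>> batches = new ArrayList<>();
        for (int i = 0; i < mapOutputIds.size(); i += maxAtOnce) {
            batches.add(new ArrayList<>(
                mapOutputIds.subList(i, Math.min(i + maxAtOnce, mapOutputIds.size()))));
        }
        return batches;
    }
}
```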
For Tsuyoshi Ozawa: I'm still behind on my design doc, but I'll explain here anyway.
The Combiner vertex has strange inputs and outputs compared to today's vertices, so I've been thinking about using a new term, "Transducer" (a Clojure-inspired name), instead of overloading the terminology of the older implementation.
A Transducer task accepts all partitions from a subset of mappers with which it shares some form of locality, and merges them according to the destination. It processes entire map outputs instead of reading a single partition, which maximizes sequential reads on the first pass.
A Transducer task emits data in the same order in which it was ingested, which allows sort avoidance when the next stage requires merged input.
The entire vertex reads all the map outputs (in locality-aware "splits") and produces all the events required by the downstream vertex.
This is going to be strange, since the input edge treats each map output as a split but carries additional information about the input partition. Because we lose the partition after the initial collect(), we need to merge the different partitions independently while feeding the combiner, to preserve the final output order across partitions (i.e. "A" could be in partition 7 and "Z" in partition 0, and we want the sort order to be (partition, key), just like DefaultSorter output).
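To make the (partition, key) ordering concrete, here is an illustrative sketch (not Tez code; Record and the class name are mine): each mapper's output is already sorted by (partition, key) on disk, so a streaming k-way merge across mappers yields combiner input that is still globally ordered by (partition, key), with no re-sort required.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.Iterator;
import java.util.List;
import java.util.PriorityQueue;

// Hypothetical model of merging whole map outputs while preserving the
// (partition, key) order that DefaultSorter already produced per mapper.
public class PartitionPreservingMerge {
    static final class Record {
        final int partition;
        final String key;
        Record(int partition, String key) { this.partition = partition; this.key = key; }
    }

    // Merge N per-mapper streams, each sorted by (partition, key), into one
    // stream with the same (partition, key) order.
    static List<Record> merge(List<List<Record>> mapperOutputs) {
        Comparator<Record> order =
            Comparator.<Record>comparingInt(r -> r.partition).thenComparing(r -> r.key);
        // heap entries: [next record, its source iterator]
        PriorityQueue<Object[]> heap =
            new PriorityQueue<>((a, b) -> order.compare((Record) a[0], (Record) b[0]));
        for (List<Record> out : mapperOutputs) {
            Iterator<Record> it = out.iterator();
            if (it.hasNext()) heap.add(new Object[]{it.next(), it});
        }
        List<Record> merged = new ArrayList<>();
        while (!heap.isEmpty()) {
            Object[] top = heap.poll();
            merged.add((Record) top[0]);
            @SuppressWarnings("unchecked")
            Iterator<Record> it = (Iterator<Record>) top[1];
            if (it.hasNext()) heap.add(new Object[]{it.next(), it});
        }
        return merged;
    }
}
```

Note how "Z" in partition 0 sorts ahead of "A" in partition 7, exactly the case described above.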
We need a "PartitionPreservingMergedInput" paired with an OrderedPartitionedOutput on the upstream side, while we can go with the regular Unordered variants for the unordered cases.
On the output side, we'll have to combine an UnorderedPartitionedOutput (call it a "PreorderedPartitionedOutput" to indicate that it is still sorted) with an OrderedGroupedMergedKVInput on the same edge. We therefore need to take care not to reorder the data when writing it out.
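A sketch of that "don't reorder on write" constraint, assuming hypothetical names (this is my stand-in for the imagined PreorderedPartitionedOutput, not a real Tez class): the writer appends records strictly in arrival order and only checks that they already arrive sorted by (partition, key); it never sorts them itself.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;

// Hypothetical pass-through writer: output order == arrival order, with a
// sanity check that the input really is pre-sorted by (partition, key).
public class OrderPreservingWriter {
    public static final class Rec {
        public final int partition;
        public final String key;
        public Rec(int partition, String key) { this.partition = partition; this.key = key; }
    }

    private static final Comparator<Rec> ORDER =
        Comparator.<Rec>comparingInt(r -> r.partition).thenComparing(r -> r.key);

    private final List<Rec> written = new ArrayList<>();

    public void write(Rec r) {
        if (!written.isEmpty() && ORDER.compare(written.get(written.size() - 1), r) > 0) {
            throw new IllegalStateException("input not in (partition, key) order");
        }
        written.add(r); // pass-through: never re-sorts
    }

    public List<Rec> output() { return written; }
}
```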
Bikas Saha/Hitesh Shah can correct me, but the Tez implementation of this needs a minimum of 2 edge managers and 1 vertex manager (and then we have to translate the MR DAG into this Tez DAG).
This needed pipelined shuffle to be implemented first in order to be performant.
The performance corner case kicks in because we can't afford to wait too long for rack-local allocations: when we start up a task, we cannot be sure it will come up in the right rack. We only get to give YARN a hint about the allocation, and we may end up with a TaskSpec + allocation that doesn't have the right locality.
Therefore, a combiner task which gets an off-rack URL should immediately duplicate the same composite DME out to the final destination, with a clear marker stating that this is a partial chunk (i.e. pipelined data movement needs to be implemented for combiners to function without paying accidental miss costs).
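A hypothetical model of the partial-chunk marker (the class and field names here are mine, not actual Tez event fields): the off-rack combiner duplicates the upstream event toward the final destination with the chunk flagged as partial, so the consumer knows the fully merged output has not arrived yet and more spills from the same source attempt may follow.

```java
// Hypothetical event shape for the partial-chunk idea; not a real Tez class.
public class PartialChunkEvent {
    public final String sourceAttempt; // producer task attempt of this chunk
    public final int spillId;          // which spill of that attempt
    public final boolean finalChunk;   // false => partial, more chunks follow

    public PartialChunkEvent(String sourceAttempt, int spillId, boolean finalChunk) {
        this.sourceAttempt = sourceAttempt;
        this.spillId = spillId;
        this.finalChunk = finalChunk;
    }

    // Duplicate the event toward the final destination, explicitly marked as
    // a partial chunk; the payload identity (source attempt, spill id) is
    // preserved unchanged.
    public PartialChunkEvent duplicateAsPartial() {
        return new PartialChunkEvent(sourceAttempt, spillId, false);
    }
}
```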
This short-circuits the accidental sub-optimal scenario caused by cluster occupancy, so we're finally ready for this now that pipelined fetchers are in trunk.