Currently, outgoing messages created by the Vertex#compute() cycle on each worker are stored and grouped by the partitionId on the destination worker to which the messages belong. This results in messages being duplicated on the wire per partition on a given receiving worker that has delivery vertices for those messages.
By partitioning the outgoing, current-superstep messages by destination worker, we can split them into partitions at insertion into a MessageStore on the destination worker. What we trade in come compute time while inserting at the receiver side, we gain in fine grained control over the real number of messages each worker caches outbound for any given worker before flushing, and how those flush messages are aggregated for delivery as well. Potentially, it allows for a great reduction in duplicate messages sent in situations like Vertex#sendMessageToAllEdges() – see
GIRAPH-322, GIRAPH-314. You get the idea.
This might be a poor idea, and it can certainly use some additional refinement, but it passes mvn verify and may even run It interoperates with the disk spill code, but not as well as it could. Consider this a request for comment on the idea (and the approach) rather than a finished product.
Comments/ideas/help welcome! Thanks