Description
When aggregator is enabled, in each message receiving round, all received message have been resent to put them behind the aggregate message. doAggregationUpdates method of GraphJobRunner Class:
if (aggregationRunner.isEnabled() && iteration > 1) { // in case we need to sync, we need to replay the messages that already // are added to the queue. This prevents loosing messages when using // aggregators. if (firstVertexMessage != null) { peer.send(peer.getPeerName(), firstVertexMessage); } GraphJobMessage msg = null; while ((msg = peer.getCurrentMessage()) != null) { peer.send(peer.getPeerName(), msg); } // now sync peer.sync(); ...
Should we do some improvement here? Record the original receiveQueue before the sync, and after sync operation, read the aggregator updated value firstly, then restore the original receiveQueue such that avoiding the message resending operations.