Details
Bug
Status: Open
Major
Resolution: Unresolved
1.1.0
None
None
Description
I am trying to implement a new feature (a small class assignment): voting to change the computation "phase", using MasterCompute.
Basically, when every Vertex "votes to advance to the next phase", I need to check conditions like the ones in:
if (masterCompute.isHalted() ||
    (globalStats.getFinishedVertexCount() == globalStats.getVertexCount() &&
     globalStats.getMessageCount() == 0)) {
  globalStats.setHaltComputation(true);
}
In my solution I check whether all vertices have "voted" and there are no messages left to deliver. If that is true, I broadcast the next phase number through an aggregator.
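A minimal sketch of that check, written as plain Java rather than Giraph code. The class name PhaseVote, the method nextPhase and its parameters are hypothetical names used only for illustration; in the real job the counts would come from aggregators and from the message statistics.

```java
/** Toy model of the phase-advance check; not part of Giraph. */
public final class PhaseVote {
    private PhaseVote() {}

    /**
     * Returns the phase number to broadcast for the next superstep.
     * The phase advances only when every vertex has voted and no
     * messages are waiting to be delivered, mirroring the shape of
     * the halting condition quoted above.
     */
    public static int nextPhase(int currentPhase, long votedVertices,
                                long totalVertices, long pendingMessages) {
        boolean allVoted = votedVertices == totalVertices;
        boolean noMessages = pendingMessages == 0;
        return (allVoted && noMessages) ? currentPhase + 1 : currentPhase;
    }

    public static void main(String[] args) {
        System.out.println(nextPhase(0, 2, 2, 0)); // all voted, no messages: prints 1
        System.out.println(nextPhase(0, 2, 2, 1)); // one message pending: prints 0
    }
}
```

The check is only as good as the message count fed into it, which is why the timing of the statistics matters for this feature.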
Now I have problems obtaining GlobalStats.getMessageCount(). I see that coordinateSuperstep() performs the following actions (this is an example with supersteps 15 and 16):
1. Call AbstractComputation.compute() on every Vertex with superstep 15
2. Call MasterCompute.compute() with superstep 16
3. Aggregate GlobalStats from superstep 15
4. Check halting conditions after superstep 15
5. Copy GlobalStats to GiraphStats
6. Go to step 1
I can access GiraphStats.getInstance().getSentMessages() in MasterCompute.compute().
I wrote a small test with two nodes and two edges:
- superstep 0:
  - MasterCompute does nothing
  - each vertex sends one message to all its neighbours
- superstep 1:
  - MasterCompute reads GiraphStats.getInstance().getSentMessages().getValue() and broadcasts it through an IntOverwriteAggregator
  - each vertex sets its value to that aggregator's current value and halts
After that, every vertex has a value of 0. I believe this is wrong. If I instead perform the superstep 1 actions in superstep 2, I get the correct vertex value of 2.
The explanation is simple: MasterCompute.compute() called in superstep N sees the GiraphStats from superstep N-2. That is because it is called too early: before the GlobalStats from superstep N-1 are gathered and copied into GiraphStats.
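The lag can be reproduced with a toy model of the ordering described above. This is a sketch in plain Java, not Giraph code: the class CurrentOrdering, the method masterView and the single counter variable standing in for GiraphStats.getSentMessages() are all assumptions made for illustration, and the master in superstep 0 is assumed to see an initial counter of 0.

```java
import java.util.Arrays;

/** Toy model of the observed ordering; not part of Giraph. */
public final class CurrentOrdering {
    private CurrentOrdering() {}

    /**
     * sentIn[s] is the number of messages all vertices send in superstep s.
     * The result at index s is the counter value that the master's
     * compute() would read in superstep s under the observed ordering.
     */
    public static long[] masterView(long[] sentIn) {
        long counter = 0; // stands in for GiraphStats.getSentMessages()
        long[] seen = new long[sentIn.length + 1];
        seen[0] = counter; // assumption: nothing has been counted before superstep 0
        for (int s = 0; s < sentIn.length; s++) {
            long sent = sentIn[s];  // vertices run superstep s
            seen[s + 1] = counter;  // master compute for superstep s + 1 runs first
            counter = sent;         // only then are the stats of superstep s copied
        }
        return seen;
    }

    public static void main(String[] args) {
        // Two vertices each send one message in superstep 0, then stay silent.
        System.out.println(Arrays.toString(masterView(new long[] {2, 0, 0})));
        // prints [0, 0, 2, 0]
    }
}
```

In this model the master reads 0 in superstep 1 and 2 in superstep 2, which matches the values seen in the test.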
My proposed solution is:
GlobalStats globalStats = aggregateWorkerStats(getSuperstep());
/* ... */
updateCounters(globalStats);
/* ... */
doMasterCompute();
/* ... */
if (masterCompute.isHalted() ||
    (globalStats.getFinishedVertexCount() == globalStats.getVertexCount() &&
     globalStats.getMessageCount() == 0)) {
  globalStats.setHaltComputation(true);
}
/* ... */
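The effect of this reordering can be sketched with a toy model in plain Java. It is not Giraph code: the class ProposedOrdering, the method masterView and the counter variable standing in for GiraphStats.getSentMessages() are hypothetical, and the master in superstep 0 is assumed to see an initial counter of 0.

```java
import java.util.Arrays;

/** Toy model of the proposed ordering; not part of Giraph. */
public final class ProposedOrdering {
    private ProposedOrdering() {}

    /**
     * sentIn[s] is the number of messages all vertices send in superstep s.
     * The result at index s is the counter value that the master's
     * compute() would read in superstep s when the stats are aggregated
     * and the counters updated before the master computes.
     */
    public static long[] masterView(long[] sentIn) {
        long counter = 0; // stands in for GiraphStats.getSentMessages()
        long[] seen = new long[sentIn.length + 1];
        seen[0] = counter; // assumption: nothing has been counted before superstep 0
        for (int s = 0; s < sentIn.length; s++) {
            long sent = sentIn[s];  // vertices run superstep s
            counter = sent;         // aggregate stats and update counters first
            seen[s + 1] = counter;  // then run master compute for superstep s + 1
        }
        return seen;
    }

    public static void main(String[] args) {
        // Two vertices each send one message in superstep 0, then stay silent.
        System.out.println(Arrays.toString(masterView(new long[] {2, 0, 0})));
        // prints [0, 2, 0, 0]
    }
}
```

With this ordering the model's master reads 2 in superstep 1, which is the value the test expected.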
I am new to Giraph. Maybe this behavior is intended, or maybe it is not trivial to change, but for me it is not working as expected.