Details
- Bug
- Status: Closed
- Minor
- Resolution: Fixed
- 3.1.1-incubating
- None
- Mac OSX
Description
When a VertexProgram sends messages on multiple MessageScopes in a single iteration, each message is delivered as if it had been sent on all of the scopes used in that iteration.
For example, if you send message A on out edges and message B on in edges, both A and B are delivered over both the in and the out edges.
The problem can be worked around by using only a single MessageScope per iteration, but this increases the number of iterations required.
An example of this behaviour is below:
import com.google.common.collect.Sets;
import org.apache.tinkerpop.gremlin.process.computer.ComputerResult;
import org.apache.tinkerpop.gremlin.process.computer.GraphComputer;
import org.apache.tinkerpop.gremlin.process.computer.Memory;
import org.apache.tinkerpop.gremlin.process.computer.MessageScope;
import org.apache.tinkerpop.gremlin.process.computer.Messenger;
import org.apache.tinkerpop.gremlin.process.computer.VertexProgram;
import org.apache.tinkerpop.gremlin.process.traversal.dsl.graph.__;
import org.apache.tinkerpop.gremlin.structure.Element;
import org.apache.tinkerpop.gremlin.structure.Vertex;
import org.apache.tinkerpop.gremlin.tinkergraph.structure.TinkerGraph;
import org.apache.tinkerpop.gremlin.util.iterator.IteratorUtils;

import java.util.Collections;
import java.util.Set;
import java.util.concurrent.ExecutionException;

public class TinkerTest {

    public static void main(String[] args) throws ExecutionException, InterruptedException {
        TinkerGraph graph = TinkerGraph.open();

        Vertex a = graph.addVertex("a");
        Vertex b = graph.addVertex("b");
        Vertex c = graph.addVertex("c");
        a.addEdge("edge", b);
        b.addEdge("edge", c);

        // Simple graph:
        // a -> b -> c

        // Execute a vertex program that sends an incoming message of "2" and an outgoing message of "1" from "b",
        // then each vertex sums any received messages
        ComputerResult result = graph.compute().program(new MyVertexProgram()).submit().get();

        // We expect the results to be {a=2, b=0, c=1}. Instead it is {a=3, b=0, c=3}
        System.out.println(result.graph().traversal().V().group().by(Element::label).by("count").next());
    }
}

class MyVertexProgram implements VertexProgram<Long> {

    private final MessageScope.Local<Long> countMessageScopeIn = MessageScope.Local.of(__::inE);
    private final MessageScope.Local<Long> countMessageScopeOut = MessageScope.Local.of(__::outE);

    private static final String MEMORY_KEY = "count";
    private static final Set<String> COMPUTE_KEYS = Collections.singleton(MEMORY_KEY);

    @Override
    public void setup(final Memory memory) {}

    @Override
    public GraphComputer.Persist getPreferredPersist() {
        return GraphComputer.Persist.VERTEX_PROPERTIES;
    }

    @Override
    public Set<String> getElementComputeKeys() {
        return COMPUTE_KEYS;
    }

    @Override
    public Set<MessageScope> getMessageScopes(final Memory memory) {
        return Sets.newHashSet(countMessageScopeIn, countMessageScopeOut);
    }

    @Override
    public void execute(Vertex vertex, Messenger<Long> messenger, Memory memory) {
        switch (memory.getIteration()) {
            case 0:
                // Only "b" sends: 2 along its in edges, 1 along its out edges
                if (vertex.label().equals("b")) {
                    messenger.sendMessage(this.countMessageScopeIn, 2L);
                    messenger.sendMessage(this.countMessageScopeOut, 1L);
                }
                break;
            case 1:
                // Every vertex stores the sum of the messages it received
                long edgeCount = IteratorUtils.reduce(messenger.receiveMessages(), 0L, (a, b) -> a + b);
                vertex.property(MEMORY_KEY, edgeCount);
                break;
        }
    }

    @Override
    public boolean terminate(final Memory memory) {
        return memory.getIteration() == 1;
    }

    @Override
    public GraphComputer.ResultGraph getPreferredResultGraph() {
        return GraphComputer.ResultGraph.NEW;
    }

    @Override
    public MyVertexProgram clone() {
        try {
            return (MyVertexProgram) super.clone();
        } catch (final CloneNotSupportedException e) {
            throw new RuntimeException(e);
        }
    }
}
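To make the difference between the expected and the observed sums concrete, here is a small dependency-free sketch of the a -> b -> c graph from this report. It is only a toy model and does not reflect how TinkerGraph's messenger is implemented: `ScopeModel`, `neighbours`, `deliver`, and the `mergeScopes` flag are hypothetical names introduced for illustration. With `mergeScopes` set to false each message follows only its own scope; with it set to true each message follows every scope used in the iteration, which is the behaviour described above.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class ScopeModel {

    enum Direction { IN, OUT }

    // Directed edges of the graph from the report: a -> b -> c.
    static final String[][] EDGES = { {"a", "b"}, {"b", "c"} };

    /** Vertices reached from `vertex` by following its edges in direction `dir`. */
    static List<String> neighbours(String vertex, Direction dir) {
        List<String> result = new ArrayList<>();
        for (String[] edge : EDGES) {
            if (dir == Direction.OUT && edge[0].equals(vertex)) result.add(edge[1]);
            if (dir == Direction.IN && edge[1].equals(vertex)) result.add(edge[0]);
        }
        return result;
    }

    /**
     * Vertex "b" sends 2 on the IN scope and 1 on the OUT scope; every vertex
     * then sums what it received. If mergeScopes is true, each message is
     * delivered over both directions (a model of the reported behaviour).
     */
    static Map<String, Long> deliver(boolean mergeScopes) {
        Map<String, Long> sums = new LinkedHashMap<>();
        for (String v : new String[] {"a", "b", "c"}) sums.put(v, 0L);

        Map<Direction, Long> sent = new LinkedHashMap<>();
        sent.put(Direction.IN, 2L);
        sent.put(Direction.OUT, 1L);

        for (Map.Entry<Direction, Long> message : sent.entrySet()) {
            Direction[] targets = mergeScopes
                    ? Direction.values()
                    : new Direction[] { message.getKey() };
            for (Direction dir : targets) {
                for (String receiver : neighbours("b", dir)) {
                    sums.merge(receiver, message.getValue(), Long::sum);
                }
            }
        }
        return sums;
    }

    public static void main(String[] args) {
        System.out.println("per-scope delivery: " + deliver(false)); // {a=2, b=0, c=1}
        System.out.println("merged scopes:      " + deliver(true));  // {a=3, b=0, c=3}
    }
}
```

In this model the merged case gives a and c a sum of 3 each because both receive the in-scope message (2) and the out-scope message (1), matching the result printed by the reproduction above.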