  1. Giraph
  2. GIRAPH-114

Inconsistent message map handling in BasicRPCCommunications.LargeMessageFlushExecutor



    • Type: Bug
    • Status: Resolved
    • Priority: Critical
    • Resolution: Fixed
    • Affects Version/s: 0.1.0
    • Fix Version/s: 0.1.0
    • Component/s: None
    • Labels:


      I'm currently implementing a simple algorithm to identify all the connected components of a graph. The algorithm ran well in local IDE unit tests on toy data and on a local single-node Hadoop instance using a graph of ~100k edges.

      When I tested it on a real cluster with the Wikipedia pagelink graph (5.7M vertices, 130M edges), I ran into strange exceptions like this:

      2011-12-21 12:03:57,015 INFO org.apache.hadoop.mapred.TaskInProgress: Error from attempt_201112131541_0034_m_000027_0: java.lang.IllegalStateException: run: Caught an unrecoverable exception flush: Got ExecutionException
      	at org.apache.giraph.graph.GraphMapper.run(GraphMapper.java:641)
      	at org.apache.hadoop.mapred.MapTask.runNewMapper(MapTask.java:763)
      	at org.apache.hadoop.mapred.MapTask.run(MapTask.java:369)
      	at org.apache.hadoop.mapred.Child$4.run(Child.java:259)
      	at java.security.AccessController.doPrivileged(Native Method)
      	at javax.security.auth.Subject.doAs(Subject.java:396)
      	at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1059)
      	at org.apache.hadoop.mapred.Child.main(Child.java:253)
      Caused by: java.lang.IllegalStateException: flush: Got ExecutionException
      	at org.apache.giraph.comm.BasicRPCCommunications.flush(BasicRPCCommunications.java:946)
      	at org.apache.giraph.graph.BspServiceWorker.finishSuperstep(BspServiceWorker.java:916)
      	at org.apache.giraph.graph.GraphMapper.map(GraphMapper.java:588)
      	at org.apache.giraph.graph.GraphMapper.run(GraphMapper.java:632)
      	... 7 more
      Caused by: java.util.concurrent.ExecutionException: java.lang.IllegalStateException: run: Impossible for no messages in 1603276
      	at java.util.concurrent.FutureTask$Sync.innerGet(FutureTask.java:222)
      	at java.util.concurrent.FutureTask.get(FutureTask.java:83)
      	at org.apache.giraph.comm.BasicRPCCommunications.flush(BasicRPCCommunications.java:941)
      	... 10 more
      Caused by: java.lang.IllegalStateException: run: Impossible for no messages in 1603276
      	at org.apache.giraph.comm.BasicRPCCommunications$PeerFlushExecutor.run(BasicRPCCommunications.java:245)
      	at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:441)
      	at java.util.concurrent.FutureTask$Sync.innerRun(FutureTask.java:303)
      	at java.util.concurrent.FutureTask.run(FutureTask.java:138)
      	at java.util.concurrent.ThreadPoolExecutor$Worker.runTask(ThreadPoolExecutor.java:886)
      	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:908)
      	at java.lang.Thread.run(Thread.java:662)

      The exception is thrown because the data structure holding the outgoing messages contains a vertex whose message list is empty.

      I tracked this behavior down:

      In BasicRPCCommunications:541-546, the map holding the outgoing messages for the vertices of a particular machine is created. It is stored in two places: in BasicRPCCommunications.outMessages and as the member variable outMessagesPerPeer of its PeerConnection:

      outMsgMap = new HashMap<I, MsgList<M>>();
      outMessages.put(addrUnresolved, outMsgMap);
      PeerConnection peerConnection = new PeerConnection(outMsgMap, peer, isProxy);

      If a lot of messages are pending for a particular vertex, a large flush is triggered via LargeMessageFlushExecutor (I guess this only happened in the Wikipedia test). During this flush, the list of messages for the vertex is sent out and replaced with an empty list in BasicRPCCommunications:341:

      outMessageList = peerConnection.outMessagesPerPeer.get(destVertex);
      peerConnection.outMessagesPerPeer.put(destVertex, new MsgList<M>());

      Now, in the last flush that is triggered at the end of the superstep, we encounter the empty message list for the vertex, and therefore the exception is thrown in BasicRPCCommunications:228-247:

      for (Entry<I, MsgList<M>> entry : peerConnection.outMessagesPerPeer.entrySet()) {
        if (entry.getValue().isEmpty()) {
          throw new IllegalStateException(...);

      Simply removing the list for the vertex when executing the large flush solved the issue (patch to come).
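
The interaction described above can be reproduced in isolation. The following is a minimal sketch, not Giraph code: the class `SharedMessageMapSketch` and its method names are hypothetical, messages are plain strings, and vertex ids are longs. It only assumes what the report states: a large flush either replaces the vertex's list with an empty one or removes the entry, and the final flush rejects empty lists.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical stand-in for the shared outgoing-message map; not Giraph code.
class SharedMessageMapSketch {
    // Destination vertex id -> pending messages. Both flush paths touch this map.
    final Map<Long, List<String>> outMessagesPerPeer = new HashMap<>();

    void addMessage(long destVertex, String message) {
        outMessagesPerPeer
            .computeIfAbsent(destVertex, k -> new ArrayList<>())
            .add(message);
    }

    // Behavior described in the report: the messages are handed off,
    // but an empty list is left behind under the same key.
    List<String> largeFlushReplacing(long destVertex) {
        List<String> sent = outMessagesPerPeer.get(destVertex);
        outMessagesPerPeer.put(destVertex, new ArrayList<>());
        return sent;
    }

    // Proposed fix: remove the entry so no empty list remains.
    List<String> largeFlushRemoving(long destVertex) {
        return outMessagesPerPeer.remove(destVertex);
    }

    // Mirrors the invariant check of the end-of-superstep flush:
    // every entry still in the map must carry at least one message.
    int finalFlush() {
        int sent = 0;
        for (Map.Entry<Long, List<String>> entry : outMessagesPerPeer.entrySet()) {
            if (entry.getValue().isEmpty()) {
                throw new IllegalStateException(
                    "Impossible for no messages in " + entry.getKey());
            }
            sent += entry.getValue().size();
        }
        outMessagesPerPeer.clear();
        return sent;
    }

    public static void main(String[] args) {
        SharedMessageMapSketch buggy = new SharedMessageMapSketch();
        buggy.addMessage(1603276L, "m1");
        buggy.largeFlushReplacing(1603276L);
        try {
            buggy.finalFlush();
        } catch (IllegalStateException e) {
            System.out.println("replacing variant failed: " + e.getMessage());
        }

        SharedMessageMapSketch fixed = new SharedMessageMapSketch();
        fixed.addMessage(1603276L, "m1");
        fixed.largeFlushRemoving(1603276L);
        System.out.println("removing variant sent " + fixed.finalFlush() + " messages");
    }
}
```

In this sketch the two variants differ only in whether the key survives the large flush, which is exactly the difference the final flush's invariant check is sensitive to.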

      I'd like to note that it is generally very dangerous to give different classes direct access to the same data structure, as it produces subtle bugs like this. It would be better to think of a centralized way of handling the data structure.
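
One way such a centralized handling could look is sketched below. This is an illustration under assumptions, not a proposal for the actual Giraph API: `OutgoingMessageBuffer`, `add`, `drain`, and `drainAll` are hypothetical names, and coarse `synchronized` methods stand in for whatever locking the real code would need. The point is that the map never escapes the class, so callers can neither leave an empty list behind nor observe one.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical class, not part of Giraph: owns the outgoing-message map
// and exposes only operations that keep "no empty lists" true.
class OutgoingMessageBuffer<I, M> {
    private final Map<I, List<M>> pending = new HashMap<>();

    // Queues a message and returns the number now pending for that vertex,
    // so a caller can decide whether a large flush is due.
    synchronized int add(I destVertex, M message) {
        List<M> list = pending.computeIfAbsent(destVertex, k -> new ArrayList<>());
        list.add(message);
        return list.size();
    }

    // Large flush for one vertex: removes the entry instead of emptying it.
    synchronized List<M> drain(I destVertex) {
        List<M> drained = pending.remove(destVertex);
        return drained == null ? Collections.<M>emptyList() : drained;
    }

    // End-of-superstep flush: hands over everything and resets the buffer.
    synchronized Map<I, List<M>> drainAll() {
        Map<I, List<M>> snapshot = new HashMap<>(pending);
        pending.clear();
        return snapshot;
    }
}
```

With this shape, a large flush would call `drain(destVertex)` and the final flush would call `drainAll()`; every list returned by `drainAll()` is non-empty because entries are only created by `add` and only ever removed whole.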


        1. GIRAPH-114.patch
          0.7 kB
          Sebastian Schelter



            • Assignee:
              ssc Sebastian Schelter


              • Created: