Index: graph/src/main/java/org/apache/hama/graph/GraphJobRunner.java =================================================================== --- graph/src/main/java/org/apache/hama/graph/GraphJobRunner.java (revision 1412032) +++ graph/src/main/java/org/apache/hama/graph/GraphJobRunner.java (working copy) @@ -24,7 +24,6 @@ import java.util.List; import java.util.Map; import java.util.Map.Entry; -import java.util.Set; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; @@ -72,7 +71,7 @@ private Combiner combiner; private Partitioner partitioner; - private Map> vertices = new HashMap>(); + private List> vertices = new ArrayList>(); private boolean updated = true; private int globalUpdateCounts = 0; @@ -144,8 +143,8 @@ public final void cleanup( BSPPeer peer) throws IOException { - for (Entry> e : vertices.entrySet()) { - peer.write(e.getValue().getVertexID(), e.getValue().getValue()); + for (Vertex e : vertices) { + peer.write(e.getVertexID(), e.getValue()); } } @@ -180,7 +179,7 @@ BSPPeer peer) throws IOException { int activeVertices = 0; - for (Vertex vertex : vertices.values()) { + for (Vertex vertex : vertices) { List msgs = messages.get(vertex.getVertexID()); // If there are newly received messages, restart. if (vertex.isHalted() && msgs != null) { @@ -216,7 +215,7 @@ private void doInitialSuperstep( BSPPeer peer) throws IOException { - for (Vertex vertex : vertices.values()) { + for (Vertex vertex : vertices) { List singletonList = Collections.singletonList(vertex.getValue()); M lastValue = vertex.getValue(); vertex.compute(singletonList.iterator()); @@ -341,7 +340,7 @@ peer.send(peer.getPeerName(partition), new GraphJobMessage(vertex)); } else { vertex.setup(conf); - vertices.put(vertex.getVertexID(), vertex); + vertices.add(vertex); } vertex = newVertexInstance(vertexClass, conf); vertex.runner = this; @@ -355,7 +354,7 @@ Vertex messagedVertex = (Vertex) msg.getVertex(); messagedVertex.runner = this; messagedVertex.setup(conf); - vertices.put(messagedVertex.getVertexID(), messagedVertex); + vertices.add(messagedVertex); } startPos = peer.getPos(); } @@ -370,7 +369,7 @@ Vertex messagedVertex = (Vertex) msg.getVertex(); messagedVertex.runner = this; messagedVertex.setup(conf); - vertices.put(messagedVertex.getVertexID(), messagedVertex); + vertices.add(messagedVertex); } } LOG.debug("Loading finished at " + peer.getSuperstepCount() + " steps."); @@ -384,89 +383,77 @@ */ if (repairNeeded) { LOG.debug("Starting repair of this graph!"); + repair(peer, partitioningSteps, selfReference); + } - int multiSteps = 0; - MapWritable ssize = new MapWritable(); - ssize.put(new IntWritable(peer.getPeerIndex()), - new IntWritable(vertices.size())); - peer.send(getMasterTask(peer), new GraphJobMessage(ssize)); - ssize = null; - peer.sync(); + LOG.debug("Starting Vertex processing!"); + } - if (isMasterTask(peer)) { - int minVerticesSize = Integer.MAX_VALUE; - GraphJobMessage received = null; - while ((received = peer.getCurrentMessage()) != null) { - MapWritable x = received.getMap(); - for (Entry e : x.entrySet()) { - int curr = ((IntWritable) e.getValue()).get(); - if (minVerticesSize > curr) { - minVerticesSize = curr; - } - } - } + @SuppressWarnings("unchecked") + private void repair( + BSPPeer peer2, + int partitioningSteps, boolean selfReference) throws IOException, + SyncException, InterruptedException { - if (minVerticesSize < (partitioningSteps * 2)) { - multiSteps = minVerticesSize; - } else { - multiSteps = (partitioningSteps * 2); - } + int multiSteps = 0; + MapWritable ssize = new MapWritable(); + ssize.put(new IntWritable(peer.getPeerIndex()), + new IntWritable(vertices.size())); + peer.send(getMasterTask(peer), new GraphJobMessage(ssize)); + ssize = null; + peer.sync(); - for (String peerName : peer.getAllPeerNames()) { - MapWritable temp = new MapWritable(); - temp.put(new Text("steps"), new IntWritable(multiSteps)); - peer.send(peerName, new GraphJobMessage(temp)); + if (isMasterTask(peer)) { + int minVerticesSize = Integer.MAX_VALUE; + GraphJobMessage received = null; + while ((received = peer.getCurrentMessage()) != null) { + MapWritable x = received.getMap(); + for (Entry e : x.entrySet()) { + int curr = ((IntWritable) e.getValue()).get(); + if (minVerticesSize > curr) { + minVerticesSize = curr; + } } } - peer.sync(); - GraphJobMessage received = peer.getCurrentMessage(); - MapWritable x = received.getMap(); - for (Entry e : x.entrySet()) { - multiSteps = ((IntWritable) e.getValue()).get(); + if (minVerticesSize < (partitioningSteps * 2)) { + multiSteps = minVerticesSize; + } else { + multiSteps = (partitioningSteps * 2); } - Set keys = vertices.keySet(); - Map> tmp = new HashMap>(); + for (String peerName : peer.getAllPeerNames()) { + MapWritable temp = new MapWritable(); + temp.put(new Text("steps"), new IntWritable(multiSteps)); + peer.send(peerName, new GraphJobMessage(temp)); + } + } + peer.sync(); - int i = 0; - int syncs = 0; - for (V v : keys) { - Vertex vertex2 = vertices.get(v); - for (Edge e : vertices.get(v).getEdges()) { - peer.send(vertex2.getDestinationPeerName(e), - new GraphJobMessage(e.getDestinationVertexID())); - } + GraphJobMessage received = peer.getCurrentMessage(); + MapWritable x = received.getMap(); + for (Entry e : x.entrySet()) { + multiSteps = ((IntWritable) e.getValue()).get(); + } - if (syncs < multiSteps && (i % (vertices.size() / multiSteps)) == 0) { - peer.sync(); - syncs++; - GraphJobMessage msg = null; - while ((msg = peer.getCurrentMessage()) != null) { - V vertexName = (V) msg.getVertexId(); - if (!vertices.containsKey(vertexName)) { - Vertex newVertex = newVertexInstance(vertexClass, conf); - newVertex.setVertexID(vertexName); - newVertex.runner = this; - if (selfReference) { - newVertex.setEdges(Collections.singletonList(new Edge( - newVertex.getVertexID(), null))); - } else { - newVertex.setEdges(new ArrayList>(0)); - } - newVertex.setup(conf); - tmp.put(vertexName, newVertex); - } - } - } - i++; + Map> tmp = new HashMap>(); + + int i = 0; + int syncs = 0; + + for (Vertex v : vertices) { + for (Edge e : v.getEdges()) { + peer.send(v.getDestinationPeerName(e), + new GraphJobMessage(e.getDestinationVertexID())); } - peer.sync(); - GraphJobMessage msg = null; - while ((msg = peer.getCurrentMessage()) != null) { - V vertexName = (V) msg.getVertexId(); - if (!vertices.containsKey(vertexName)) { + if (syncs < multiSteps && (i % (vertices.size() / multiSteps)) == 0) { + peer.sync(); + syncs++; + GraphJobMessage msg = null; + while ((msg = peer.getCurrentMessage()) != null) { + V vertexName = (V) msg.getVertexId(); + Vertex newVertex = newVertexInstance(vertexClass, conf); newVertex.setVertexID(vertexName); newVertex.runner = this; @@ -477,18 +464,41 @@ newVertex.setEdges(new ArrayList>(0)); } newVertex.setup(conf); - vertices.put(vertexName, newVertex); - newVertex = null; + tmp.put(vertexName, newVertex); + } } + i++; + } + + peer.sync(); + GraphJobMessage msg = null; + while ((msg = peer.getCurrentMessage()) != null) { + V vertexName = (V) msg.getVertexId(); - for (Map.Entry> e : tmp.entrySet()) { - vertices.put(e.getKey(), e.getValue()); + Vertex newVertex = newVertexInstance(vertexClass, conf); + newVertex.setVertexID(vertexName); + newVertex.runner = this; + if (selfReference) { + newVertex.setEdges(Collections.singletonList(new Edge(newVertex + .getVertexID(), null))); + } else { + newVertex.setEdges(new ArrayList>(0)); } - tmp.clear(); + newVertex.setup(conf); + tmp.put(vertexName, newVertex); + newVertex = null; + } - LOG.debug("Starting Vertex processing!"); + for (Vertex e : vertices) { + if (tmp.containsKey((e.getVertexID()))) { + tmp.remove(e.getVertexID()); + } + } + + vertices.addAll(tmp.values()); + tmp.clear(); } /**