Index: graph/src/main/java/org/apache/hama/graph/GraphJobRunner.java =================================================================== --- graph/src/main/java/org/apache/hama/graph/GraphJobRunner.java (revision 1338807) +++ graph/src/main/java/org/apache/hama/graph/GraphJobRunner.java (working copy) @@ -19,6 +19,8 @@ import java.io.IOException; import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; import java.util.HashMap; import java.util.Iterator; import java.util.LinkedList; @@ -57,7 +59,8 @@ private static final Text FLAG_AGGREGATOR_INCREMENT = new Text( S_FLAG_AGGREGATOR_INCREMENT); - private static final String MESSAGE_COMBINER_CLASS = "hama.vertex.message.combiner.class"; + public static final String MESSAGE_COMBINER_CLASS = "hama.vertex.message.combiner.class"; + public static final String GRAPH_REPAIR = "hama.graph.repair"; private Configuration conf; private Combiner combiner; @@ -81,13 +84,14 @@ private int maxIteration = -1; private long iteration; - // TODO check if our graph is not broken and repair public void setup(BSPPeer peer) throws IOException, SyncException, InterruptedException { this.conf = peer.getConfiguration(); // Choose one as a master to collect global updates this.masterTask = peer.getPeerName(0); + boolean repairNeeded = conf.getBoolean(GRAPH_REPAIR, false); + if (!conf.getClass(MESSAGE_COMBINER_CLASS, Combiner.class).equals( Combiner.class)) { LOG.debug("Combiner class: " + conf.get(MESSAGE_COMBINER_CLASS)); @@ -110,7 +114,7 @@ } } - loadVertices(peer); + loadVertices(peer, repairNeeded); numberVertices = vertices.size() * peer.getNumPeers(); // TODO refactor this to a single step for (Map.Entry e : vertices.entrySet()) { @@ -273,7 +277,8 @@ return msgMap; } - private void loadVertices(BSPPeer peer) throws IOException { + private void loadVertices(BSPPeer peer, boolean repairNeeded) + throws IOException { LOG.debug("vertex class: " + conf.get("hama.graph.vertex.class")); boolean selfReference = conf.getBoolean("hama.graph.self.ref", false); KeyValuePair next = null; @@ -304,6 +309,44 @@ vertex.setup(conf); vertices.put(next.getKey().getName(), vertex); } + + /* + * If the user want to repair the graph, it should traverse through that + * local chunk of adjancency list and message the corresponding peer to + * check whether that vertex exists. In real-life this may be dead-ending + * vertices, since we have no information about outgoing edges. Mainly this + * procedure is to prevent NullPointerExceptions from happening. + */ + if (repairNeeded) { + final Collection entries = vertices.values(); + for (Vertex entry : entries) { + List outEdges = entry.getOutEdges(); + for (Edge e : outEdges) { + peer.send(e.getDestVertexID(), new Text(e.getName())); + } + } + try { + peer.sync(); + } catch (SyncException e) { + e.printStackTrace(); + } catch (InterruptedException e) { + e.printStackTrace(); + } + Text vertexName = null; + while ((vertexName = (Text) peer.getCurrentMessage()) != null) { + String vName = vertexName.toString(); + if (!vertices.containsKey(vName)) { + Vertex vertex = (Vertex) ReflectionUtils + .newInstance( + conf.getClass("hama.graph.vertex.class", Vertex.class), conf); + vertex.setVertexID(vName); + vertex.setup(conf); + vertex.edges = Collections.emptyList(); + vertices.put(vName, vertex); + } + } + } + } /**