Review notes (patch tools skip text that precedes the first "Index:" line):
- This patch was rebuilt from a copy whose line breaks and generic type
  arguments had been lost. Indentation, blank lines and <...> arguments are
  inferred; verify them against revision 1554784 before applying. One
  whitespace-only change to an @Override line could not be recovered and is
  shown as an identical -/+ pair.
- ListVerticesInfo: the skipping iterator rethrows deserialization failures
  (with their cause) instead of printing them and handing out a half-read
  vertex, and its cursor starts out null again so that next() keeps rejecting
  calls made before hasNext().

Index: graph/src/main/java/org/apache/hama/graph/GraphJobRunner.java
===================================================================
--- graph/src/main/java/org/apache/hama/graph/GraphJobRunner.java	(revision 1554784)
+++ graph/src/main/java/org/apache/hama/graph/GraphJobRunner.java	(working copy)
@@ -352,6 +352,7 @@
     IDSkippingIterator<V, E, M> skippingIterator = vertices.skippingIterator();
     while (skippingIterator.hasNext()) {
       Vertex<V, E, M> vertex = skippingIterator.next();
+
       M lastValue = vertex.getValue();
       vertex.compute(Collections.singleton(vertex.getValue()));
       getAggregationRunner().aggregateVertex(lastValue, vertex);
@@ -456,9 +457,6 @@
             vertex.addEdge(edge);
           }
         } else {
-          vertex.setRunner(this);
-          vertex.setup(conf);
-
           if (selfReference) {
             vertex.addEdge(new Edge<V, E>(vertex.getVertexID(), null));
           }
@@ -469,8 +467,6 @@
       }
     }
     // add last vertex.
-    vertex.setRunner(this);
-    vertex.setup(conf);
     if (selfReference) {
       vertex.addEdge(new Edge<V, E>(vertex.getVertexID(), null));
     }
Index: graph/src/main/java/org/apache/hama/graph/ListVerticesInfo.java
===================================================================
--- graph/src/main/java/org/apache/hama/graph/ListVerticesInfo.java	(revision 1554784)
+++ graph/src/main/java/org/apache/hama/graph/ListVerticesInfo.java	(working copy)
@@ -17,10 +17,14 @@
  */
 package org.apache.hama.graph;
 
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
 import java.io.IOException;
 import java.util.Iterator;
-import java.util.SortedSet;
-import java.util.TreeSet;
+import java.util.Map;
+import java.util.TreeMap;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.io.Writable;
@@ -36,52 +40,98 @@
  */
 public final class ListVerticesInfo<V extends WritableComparable<? super V>, E extends Writable, M extends Writable>
     implements VerticesInfo<V, E, M> {
+  private GraphJobRunner<V, E, M> runner;
+
+  private final Map<V, byte[]> verticesMap = new TreeMap<V, byte[]>();
 
-  private final SortedSet<Vertex<V, E, M>> vertices = new TreeSet<Vertex<V, E, M>>();
-  // We will use this variable to make vertex removals, so we don't invoke GC too many times.
-  private final Vertex<V, E, M> vertexTemplate = GraphJobRunner.<V, E, M> newVertexInstance(GraphJobRunner.VERTEX_CLASS);
+  private ByteArrayOutputStream bos = null;
+  private DataOutputStream dos = null;
+  private ByteArrayInputStream bis = null;
+  private DataInputStream dis = null;
 
   @Override
-  public void addVertex(Vertex<V, E, M> vertex) {
-    if (!vertices.add(vertex)) {
-      throw new UnsupportedOperationException("Vertex with ID: " + vertex.getVertexID() + " already exists!");
+  public void init(GraphJobRunner<V, E, M> runner, Configuration conf,
+      TaskAttemptID attempt) throws IOException {
+    this.runner = runner;
+  }
+
+  @Override
+  public void addVertex(Vertex<V, E, M> vertex) throws IOException {
+    if (verticesMap.containsKey(vertex.getVertexID())) {
+      throw new UnsupportedOperationException("Vertex with ID: "
+          + vertex.getVertexID() + " already exists!");
+    } else {
+      bos = new ByteArrayOutputStream();
+      dos = new DataOutputStream(bos);
+      vertex.write(dos);
+      byte[] bytesVertex = bos.toByteArray();
+
+      verticesMap.put(vertex.getVertexID(), bytesVertex);
     }
   }
 
   @Override
   public void removeVertex(V vertexID) throws UnsupportedOperationException {
-    vertexTemplate.setVertexID(vertexID);
-
-    if (!vertices.remove(vertexTemplate)) {
-      throw new UnsupportedOperationException("Vertex with ID: " + vertexID + " not found on this peer.");
+    if (verticesMap.containsKey(vertexID)) {
+      verticesMap.remove(vertexID);
+    } else {
+      throw new UnsupportedOperationException("Vertex with ID: " + vertexID
+          + " not found on this peer.");
     }
   }
 
   public void clear() {
-    vertices.clear();
+    verticesMap.clear();
   }
 
   @Override
   public int size() {
-    return this.vertices.size();
+    return this.verticesMap.size();
   }
 
   @Override
   public IDSkippingIterator<V, E, M> skippingIterator() {
     return new IDSkippingIterator<V, E, M>() {
-      Iterator<Vertex<V, E, M>> it = vertices.iterator();
-      Vertex<V, E, M> v;
+      Iterator<V> it = verticesMap.keySet().iterator();
+      // Stays null until hasNext() loads a vertex, so next() can reject misuse.
+      Vertex<V, E, M> v;
 
       @Override
       public boolean hasNext(V msgId,
           org.apache.hama.graph.IDSkippingIterator.Strategy strat) {
 
         if (it.hasNext()) {
-          v = it.next();
+          V vertexKey = it.next();
+          byte[] bytesVertex = verticesMap.get(vertexKey);
+          bis = new ByteArrayInputStream(bytesVertex);
+          dis = new DataInputStream(bis);
+
+          try {
+            v = GraphJobRunner
+                .<V, E, M> newVertexInstance(GraphJobRunner.VERTEX_CLASS);
+            v.readFields(dis);
+            v.setRunner(runner);
+          } catch (IOException e) {
+            throw new IllegalStateException(
+                "Could not deserialize vertex with ID: " + vertexKey, e);
+          }
 
           while (!strat.accept(v, msgId)) {
             if (it.hasNext()) {
-              v = it.next();
+              vertexKey = it.next();
+              bytesVertex = verticesMap.get(vertexKey);
+              bis = new ByteArrayInputStream(bytesVertex);
+              dis = new DataInputStream(bis);
+
+              try {
+                v = GraphJobRunner
+                    .<V, E, M> newVertexInstance(GraphJobRunner.VERTEX_CLASS);
+                v.readFields(dis);
+                v.setRunner(runner);
+              } catch (IOException e) {
+                throw new IllegalStateException(
+                    "Could not deserialize vertex with ID: " + vertexKey, e);
+              }
             } else {
               return false;
             }
@@ -97,7 +147,8 @@
       @Override
       public Vertex<V, E, M> next() {
         if (v == null) {
-          throw new UnsupportedOperationException("You must invoke hasNext before ask for the next vertex.");
+          throw new UnsupportedOperationException(
+              "You must invoke hasNext before ask for the next vertex.");
         }
 
         Vertex<V, E, M> tmp = v;
@@ -109,8 +160,14 @@
   }
 
   @Override
-  public void finishVertexComputation(Vertex<V, E, M> vertex) {
+  public void finishVertexComputation(Vertex<V, E, M> vertex)
+      throws IOException {
+    bos = new ByteArrayOutputStream();
+    dos = new DataOutputStream(bos);
+    vertex.write(dos);
+    byte[] bytesVertex = bos.toByteArray();
 
+    verticesMap.put(vertex.getVertexID(), bytesVertex);
   }
 
   @Override
@@ -122,7 +179,7 @@
   public void finishRemovals() {
 
   }
-  @Override
+  @Override
   public void finishSuperstep() {
 
   }
@@ -137,11 +194,4 @@
   public void startSuperstep() throws IOException {
 
   }
-
-  @Override
-  public void init(GraphJobRunner<V, E, M> runner, Configuration conf,
-      TaskAttemptID attempt) throws IOException {
-
-  }
-
 }
Index: graph/src/main/java/org/apache/hama/graph/VerticesInfo.java
===================================================================
--- graph/src/main/java/org/apache/hama/graph/VerticesInfo.java	(revision 1554784)
+++ graph/src/main/java/org/apache/hama/graph/VerticesInfo.java	(working copy)
@@ -92,5 +92,4 @@
   public int size();
 
   public IDSkippingIterator<V, E, M> skippingIterator();
-
 }