Index: graph/src/main/java/org/apache/hama/graph/GraphJobRunner.java =================================================================== --- graph/src/main/java/org/apache/hama/graph/GraphJobRunner.java (revision 1671719) +++ graph/src/main/java/org/apache/hama/graph/GraphJobRunner.java (working copy) @@ -18,9 +18,11 @@ package org.apache.hama.graph; import java.io.IOException; +import java.util.ArrayList; import java.util.Collections; import java.util.HashSet; import java.util.Iterator; +import java.util.List; import java.util.Map.Entry; import java.util.Set; @@ -33,6 +35,7 @@ import org.apache.hadoop.io.Text; import org.apache.hadoop.io.Writable; import org.apache.hadoop.io.WritableComparable; +import org.apache.hadoop.io.WritableUtils; import org.apache.hama.Constants; import org.apache.hama.HamaConfiguration; import org.apache.hama.bsp.BSP; @@ -229,10 +232,13 @@ notComputedVertices = new HashSet(); notComputedVertices.addAll(vertices.keySet()); - Vertex vertex = null; + // Vertex vertex = null; + List runners = new ArrayList(); + while (currentMessage != null) { - vertex = vertices.get((V) currentMessage.getVertexId()); + GraphJobMessage msg = WritableUtils.clone(currentMessage, conf); + Vertex vertex = vertices.get((V) msg.getVertexId()); // reactivation if (vertex.isHalted()) { @@ -239,26 +245,89 @@ vertex.setActive(); } - if (!vertex.isHalted()) { - vertex.compute((Iterable) currentMessage.getIterableMessages()); - vertices.finishVertexComputation(vertex); - activeVertices++; + if (runners.size() > 100) { + runners.add(new Computer(vertex, (Iterable) msg + .getIterableMessages())); - notComputedVertices.remove(vertex.getVertexID()); + // execute + for (Thread computer : runners) { + computer.start(); + } + + for (Thread computer : runners) { + try { + computer.join(); + } catch (InterruptedException e) { + e.printStackTrace(); + } + } + + runners = new ArrayList(); + } else { + runners.add(new Computer(vertex, (Iterable) msg + .getIterableMessages())); } + activeVertices++; + notComputedVertices.remove(vertex.getVertexID()); + currentMessage = peer.getCurrentMessage(); } + if (runners.size() > 0) { + for (Thread computer : runners) { + computer.start(); + try { + computer.join(); + } catch (InterruptedException e) { + e.printStackTrace(); + } + } + } + + runners = new ArrayList(); for (V v : notComputedVertices) { - vertex = vertices.get(v); - if (!vertex.isHalted()) { - vertex.compute(Collections. emptyList()); - vertices.finishVertexComputation(vertex); + Vertex notComputedVertex = vertices.get(v); + if (!notComputedVertex.isHalted()) { + + if (runners.size() > 100) { + // add current vertex + runners.add(new Computer(notComputedVertex, Collections + . emptyList())); + // execute + for (Thread computer : runners) { + computer.start(); + } + + for (Thread computer : runners) { + try { + computer.join(); + } catch (InterruptedException e) { + e.printStackTrace(); + } + } + + runners = new ArrayList(); + } else { + runners.add(new Computer(notComputedVertex, Collections + . emptyList())); + } + activeVertices++; } } + if (runners.size() > 0) { + for (Thread computer : runners) { + computer.start(); + try { + computer.join(); + } catch (InterruptedException e) { + e.printStackTrace(); + } + } + } + vertices.finishSuperstep(); getAggregationRunner().sendAggregatorValues(peer, activeVertices, this.changedVertexCnt); @@ -265,6 +334,26 @@ this.iteration++; } + class Computer extends Thread { + Vertex vertex; + Iterable msgs; + + public Computer(Vertex vertex, Iterable msgs) { + this.vertex = vertex; + this.msgs = msgs; + } + + @Override + public void run() { + try { + vertex.compute(msgs); + vertices.finishVertexComputation(vertex); + } catch (IOException e) { + e.printStackTrace(); + } + } + } + /** * Seed the vertices first with their own values in compute. This is the first * superstep after the vertices have been loaded.