Index: graph/src/main/java/org/apache/hama/graph/GraphJob.java =================================================================== --- graph/src/main/java/org/apache/hama/graph/GraphJob.java (revision 1674799) +++ graph/src/main/java/org/apache/hama/graph/GraphJob.java (working copy) @@ -59,7 +59,6 @@ conf.setClass(MessageManager.OUTGOING_MESSAGE_MANAGER_CLASS, OutgoingVertexMessageManager.class, OutgoingMessageManager.class); - this.setBoolean(Constants.PARTITION_SORT_BY_KEY, true); this.setBspClass(GraphJobRunner.class); this.setJarByClass(exampleClass); this.setVertexIDClass(Text.class); Index: graph/src/main/java/org/apache/hama/graph/GraphJobRunner.java =================================================================== --- graph/src/main/java/org/apache/hama/graph/GraphJobRunner.java (revision 1674799) +++ graph/src/main/java/org/apache/hama/graph/GraphJobRunner.java (working copy) @@ -27,8 +27,7 @@ import java.util.Iterator; import java.util.Map; import java.util.Map.Entry; -import java.util.concurrent.ConcurrentNavigableMap; -import java.util.concurrent.ConcurrentSkipListMap; +import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; @@ -194,41 +193,45 @@ vertices.clear(); } - /** - * The master task is going to check the number of updated vertices and do - * master aggregation. In case of no aggregators defined, we save a sync by - * reading multiple typed messages. - */ - private void doAggregationUpdates( + @SuppressWarnings("unchecked") + private void setupFields( BSPPeer peer) - throws IOException, SyncException, InterruptedException { + throws IOException { + this.peer = peer; + this.conf = peer.getConfiguration(); + maxIteration = peer.getConfiguration().getInt("hama.graph.max.iteration", + -1); - // this is only done in every second iteration - if (isMasterTask(peer)) { - MapWritable updatedCnt = new MapWritable(); - // send total number of vertices. - updatedCnt.put( - FLAG_VERTEX_TOTAL_VERTICES, - new LongWritable((peer.getCounter(GraphJobCounter.INPUT_VERTICES) - .getCounter()))); - // exit if there's no update made - if (globalUpdateCounts == 0) { - updatedCnt.put(FLAG_MESSAGE_COUNTS, new IntWritable(Integer.MIN_VALUE)); - } else { - getAggregationRunner().doMasterAggregation(updatedCnt); + GraphJobRunner. initClasses(conf); + + partitioner = (Partitioner) org.apache.hadoop.util.ReflectionUtils + .newInstance( + conf.getClass("bsp.input.partitioner.class", HashPartitioner.class), + conf); + + Class outputWriter = conf.getClass( + GraphJob.VERTEX_OUTPUT_WRITER_CLASS_ATTR, VertexOutputWriter.class); + vertexOutputWriter = (VertexOutputWriter) ReflectionUtils + .newInstance(outputWriter); + + setAggregationRunner(new AggregationRunner()); + getAggregationRunner().setupAggregators(peer); + + Class> verticesInfoClass = (Class>) conf + .getClass("hama.graph.vertices.info", MapVerticesInfo.class, + VerticesInfo.class); + vertices = ReflectionUtils.newInstance(verticesInfoClass); + vertices.init(this, conf, peer.getTaskId()); + + final String combinerName = conf.get(Constants.COMBINER_CLASS); + if (combinerName != null) { + try { + combiner = (Combiner) ReflectionUtils + .newInstance(combinerName); + } catch (ClassNotFoundException e) { + e.printStackTrace(); } - // send the updates from the master tasks back to the slaves - for (String peerName : peer.getAllPeerNames()) { - peer.send(peerName, new GraphJobMessage(updatedCnt)); - } } - - if (getAggregationRunner().isEnabled()) { - peer.sync(); - // now the map message must be read that might be send from the master - updated = getAggregationRunner().receiveAggregatedValues( - peer.getCurrentMessage().getMap(), iteration); - } } /** @@ -245,9 +248,7 @@ this.changedVertexCnt = 0; vertices.startSuperstep(); - ExecutorService executor = Executors.newFixedThreadPool((peer - .getNumCurrentMessages() / conf.getInt( - "hama.graph.threadpool.percentage", 20)) + 1); + ExecutorService executor = Executors.newCachedThreadPool(); long loopStartTime = System.currentTimeMillis(); while (currentMessage != null) { @@ -256,9 +257,8 @@ currentMessage = peer.getCurrentMessage(); } - LOG.info("Total time spent for superstep-" + peer.getSuperstepCount() - + " looping: " + (System.currentTimeMillis() - loopStartTime) - + " ms"); + LOG.info("Total time spent for superstep-" + peer.getSuperstepCount() + + " looping: " + (System.currentTimeMillis() - loopStartTime) + " ms"); executor.shutdown(); while (!executor.isTerminated()) { @@ -296,15 +296,13 @@ this.changedVertexCnt = 0; vertices.startSuperstep(); - ExecutorService executor = Executors - .newFixedThreadPool((vertices.size() / conf.getInt( - "hama.graph.threadpool.percentage", 20)) + 1); + ExecutorService executor = Executors.newCachedThreadPool(); for (Vertex v : vertices.getValues()) { Runnable worker = new ComputeRunnable(v); executor.execute(worker); } - + executor.shutdown(); while (!executor.isTerminated()) { } @@ -321,7 +319,8 @@ @SuppressWarnings("unchecked") public ComputeRunnable(GraphJobMessage msg) { this.vertex = vertices.get((V) msg.getVertexId()); - this.msgs = (Iterable) getIterableMessages(msg.getValuesBytes(), msg.getNumOfValues()); + this.msgs = (Iterable) getIterableMessages(msg.getValuesBytes(), + msg.getNumOfValues()); } public ComputeRunnable(Vertex v) { @@ -345,47 +344,43 @@ } } - @SuppressWarnings("unchecked") - private void setupFields( + /** + * The master task is going to check the number of updated vertices and do + * master aggregation. In case of no aggregators defined, we save a sync by + * reading multiple typed messages. + */ + private void doAggregationUpdates( BSPPeer peer) - throws IOException { - this.peer = peer; - this.conf = peer.getConfiguration(); - maxIteration = peer.getConfiguration().getInt("hama.graph.max.iteration", - -1); + throws IOException, SyncException, InterruptedException { - GraphJobRunner. initClasses(conf); - - partitioner = (Partitioner) org.apache.hadoop.util.ReflectionUtils - .newInstance( - conf.getClass("bsp.input.partitioner.class", HashPartitioner.class), - conf); - - Class outputWriter = conf.getClass( - GraphJob.VERTEX_OUTPUT_WRITER_CLASS_ATTR, VertexOutputWriter.class); - vertexOutputWriter = (VertexOutputWriter) ReflectionUtils - .newInstance(outputWriter); - - setAggregationRunner(new AggregationRunner()); - getAggregationRunner().setupAggregators(peer); - - Class> verticesInfoClass = (Class>) conf - .getClass("hama.graph.vertices.info", MapVerticesInfo.class, - VerticesInfo.class); - vertices = ReflectionUtils.newInstance(verticesInfoClass); - vertices.init(this, conf, peer.getTaskId()); - - final String combinerName = conf.get(Constants.COMBINER_CLASS); - if (combinerName != null) { - try { - combiner = (Combiner) ReflectionUtils - .newInstance(combinerName); - } catch (ClassNotFoundException e) { - e.printStackTrace(); + // this is only done in every second iteration + if (isMasterTask(peer)) { + MapWritable updatedCnt = new MapWritable(); + // send total number of vertices. + updatedCnt.put( + FLAG_VERTEX_TOTAL_VERTICES, + new LongWritable((peer.getCounter(GraphJobCounter.INPUT_VERTICES) + .getCounter()))); + // exit if there's no update made + if (globalUpdateCounts == 0) { + updatedCnt.put(FLAG_MESSAGE_COUNTS, new IntWritable(Integer.MIN_VALUE)); + } else { + getAggregationRunner().doMasterAggregation(updatedCnt); } + // send the updates from the master tasks back to the slaves + for (String peerName : peer.getAllPeerNames()) { + peer.send(peerName, new GraphJobMessage(updatedCnt)); + } } + + if (getAggregationRunner().isEnabled()) { + peer.sync(); + // now the map message must be read that might be send from the master + updated = getAggregationRunner().receiveAggregatedValues( + peer.getCurrentMessage().getMap(), iteration); + } } - + @SuppressWarnings("unchecked") public static , E extends Writable, M extends Writable> void initClasses( Configuration conf) { @@ -406,7 +401,7 @@ EDGE_VALUE_CLASS = edgeValueClass; } - Map messages = new HashMap(); + private Map messages = new HashMap(); /** * Loads vertices into memory of each peer. @@ -464,8 +459,7 @@ DataInputStream dis = new DataInputStream(bis); for (int i = 0; i < msg.getNumOfValues(); i++) { - Vertex vertex = GraphJobRunner - . newVertexInstance(VERTEX_CLASS); + Vertex vertex = newVertexInstance(VERTEX_CLASS); vertex.readFields(dis); Runnable worker = new LoadWorker(vertex); @@ -633,7 +627,7 @@ vertices.finishAdditions(); } - private final ConcurrentNavigableMap storage = new ConcurrentSkipListMap(); + private final ConcurrentHashMap storage = new ConcurrentHashMap(); public void sendMessage(V vertexID, byte[] msg) throws IOException { if (storage.containsKey(vertexID)) { @@ -646,24 +640,20 @@ public void finishSuperstep() throws IOException { vertices.finishSuperstep(); - for (Map.Entry m : storage.entrySet()) { - // Combining messages - if (combiner != null) { - if (m.getValue().getNumOfValues() > 1) { - peer.send( - getHostName(m.getKey()), - new GraphJobMessage(m.getKey(), serialize(combiner - .combine(getIterableMessages(m.getValue().getValuesBytes(), m - .getValue().getNumOfValues()))))); - } else { - peer.send(getHostName(m.getKey()), m.getValue()); - } + Iterator> it = storage.entrySet().iterator(); + while (it.hasNext()) { + Entry e = it.next(); + if (combiner != null && e.getValue().getNumOfValues() > 1) { + peer.send( + getHostName(e.getKey()), + new GraphJobMessage(e.getKey(), serialize(combiner + .combine(getIterableMessages(e.getValue().getValuesBytes(), e + .getValue().getNumOfValues()))))); } else { - peer.send(getHostName(m.getKey()), m.getValue()); + peer.send(getHostName(e.getKey()), e.getValue()); } + it.remove(); } - - storage.clear(); } public static byte[] serialize(Writable writable) throws IOException { Index: graph/src/main/java/org/apache/hama/graph/IncomingVertexMessageManager.java =================================================================== --- graph/src/main/java/org/apache/hama/graph/IncomingVertexMessageManager.java (revision 1674799) +++ graph/src/main/java/org/apache/hama/graph/IncomingVertexMessageManager.java (working copy) @@ -70,7 +70,8 @@ public void add(GraphJobMessage item) { if (item.isVertexMessage()) { if (storage.containsKey(item.getVertexId())) { - storage.get(item.getVertexId()).addValuesBytes(item.getValuesBytes(), item.size()); + storage.get(item.getVertexId()).addValuesBytes(item.getValuesBytes(), + item.size()); } else { storage.put(item.getVertexId(), item); } @@ -86,18 +87,20 @@ } Iterator it; - + @Override public GraphJobMessage poll() { if (mapMessages.size() > 0) { return mapMessages.poll(); } else { - if(it == null) { + if (it == null) { it = storage.values().iterator(); } - - if(it.hasNext()) { - return it.next(); + + if (it.hasNext()) { + GraphJobMessage m = it.next(); + it.remove(); + return m; } else { storage.clear(); return null;