Index: examples/src/main/java/org/apache/hama/examples/DynamicGraph.java =================================================================== --- examples/src/main/java/org/apache/hama/examples/DynamicGraph.java (revision 0) +++ examples/src/main/java/org/apache/hama/examples/DynamicGraph.java (working copy) @@ -0,0 +1,159 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hama.examples; + +import java.io.IOException; +import java.util.ArrayList; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.io.IntWritable; +import org.apache.hadoop.io.LongWritable; +import org.apache.hadoop.io.NullWritable; +import org.apache.hadoop.io.Text; +import org.apache.hama.HamaConfiguration; +import org.apache.hama.bsp.HashPartitioner; +import org.apache.hama.bsp.TextInputFormat; +import org.apache.hama.bsp.TextOutputFormat; +import org.apache.hama.graph.Edge; +import org.apache.hama.graph.GraphJob; +import org.apache.hama.graph.Vertex; +import org.apache.hama.graph.VertexInputReader; +import org.apache.hama.graph.GraphJobRunner.GraphJobCounter; + +/** + * This is an example of how to manipulate Graphs dynamically. 
+ * The input of this example is a number in each row. We assume + * that there is a vertex with ID:1 which is responsible for creating + * a sum vertex that will aggregate the values of the other + * vertices. During the aggregation, sum vertex will delete all + * other vertices. + * + * Input example: + * 1 + * 2 + * 3 + * 4 + * + * Output example: + * sum 11 + */ +public class DynamicGraph { + + public static class GraphTextReader extends + VertexInputReader { + + @Override + public boolean parseVertex(LongWritable key, Text value, + Vertex vertex) throws Exception { + + vertex.setVertexID(value); + vertex.setValue(new IntWritable(Integer.parseInt(value.toString()))); + + return true; + } + } + + public static class GraphVertex extends + Vertex { + + private void createSumVertex() throws IOException { + if (this.getVertexID().toString().equals("1")) { + Text new_id = new Text("sum"); + this.addVertex(new_id, new ArrayList>(), new IntWritable(0)); + } + } + + private void sendAllValuesToSumAndRemove() throws IOException { + if (!this.getVertexID().toString().equals("sum")) { + this.sendMessage(new Text("sum"), this.getValue()); + this.remove(); + } + } + + // this must run only on "sum" vertex + private void calculateSum(Iterable msgs) throws IOException { + if (this.getVertexID().toString().equals("sum")) { + int s = 0; + for (IntWritable i : msgs) { + s += i.get(); + } + s += this.getPeer().getCounter(GraphJobCounter.INPUT_VERTICES).getCounter(); + this.setValue(new IntWritable(this.getValue().get() +s)); + } else { + throw new UnsupportedOperationException("We have more vertecies than we expected: " + this.getVertexID() + " " + this.getValue()); + } + } + + @Override + public void compute(Iterable msgs) throws IOException { + if (this.getSuperstepCount() == 0) { + createSumVertex(); + } else if (this.getSuperstepCount() == 1) { + sendAllValuesToSumAndRemove(); + } else if (this.getSuperstepCount() == 2) { + calculateSum(msgs); + } else if (this.getSuperstepCount() 
== 3) { + this.voteToHalt(); + } + } + } + + public static void main(String[] args) throws IOException, + InterruptedException, ClassNotFoundException { + if (args.length != 2) { + printUsage(); + } + HamaConfiguration conf = new HamaConfiguration(new Configuration()); + GraphJob graphJob = createJob(args, conf); + long startTime = System.currentTimeMillis(); + if (graphJob.waitForCompletion(true)) { + System.out.println("Job Finished in " + + (System.currentTimeMillis() - startTime) / 1000.0 + " seconds"); + } + } + + private static void printUsage() { + System.out.println("Usage: "); + System.exit(-1); + } + + private static GraphJob createJob(String[] args, HamaConfiguration conf) throws IOException { + GraphJob graphJob = new GraphJob(conf, DynamicGraph.class); + graphJob.setJobName("Dynamic Graph"); + graphJob.setVertexClass(GraphVertex.class); + + graphJob.setInputPath(new Path(args[0])); + graphJob.setOutputPath(new Path(args[1])); + + graphJob.setVertexIDClass(Text.class); + graphJob.setVertexValueClass(IntWritable.class); + graphJob.setEdgeValueClass(NullWritable.class); + + graphJob.setInputFormat(TextInputFormat.class); + + graphJob.setVertexInputReaderClass(GraphTextReader.class); + graphJob.setPartitioner(HashPartitioner.class); + + graphJob.setOutputFormat(TextOutputFormat.class); + graphJob.setOutputKeyClass(Text.class); + graphJob.setOutputValueClass(IntWritable.class); + + return graphJob; + } + +} Index: examples/src/test/java/org/apache/hama/examples/DynamicGraphTest.java =================================================================== --- examples/src/test/java/org/apache/hama/examples/DynamicGraphTest.java (revision 0) +++ examples/src/test/java/org/apache/hama/examples/DynamicGraphTest.java (working copy) @@ -0,0 +1,81 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hama.examples; + +import java.io.BufferedReader; +import java.io.IOException; +import java.io.InputStreamReader; + +import junit.framework.TestCase; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileStatus; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hama.HamaConfiguration; +import org.junit.Test; + +/** + * Testcase for {@link org.apache.hama.examples.DynamicGraph} + */ +public class DynamicGraphTest extends TestCase { + private static String OUTPUT = "/tmp/page-out"; + private Configuration conf = new HamaConfiguration(); + private FileSystem fs; + + private void deleteTempDirs() { + try { + if (fs.exists(new Path(OUTPUT))) + fs.delete(new Path(OUTPUT), true); + } catch (IOException e) { + e.printStackTrace(); + } + } + + private void verifyResult() throws IOException { + FileStatus[] globStatus = fs.globStatus(new Path(OUTPUT + "/part-*")); + for (FileStatus fts : globStatus) { + BufferedReader reader = new BufferedReader(new InputStreamReader( + fs.open(fts.getPath()))); + String line = null; + while ((line = reader.readLine()) != null) { + String[] split = line.split("\t"); + assertTrue(split[0].equals("sum")); + assertTrue(split[1].equals("11")); + 
System.out.println(split[0] + " : " + split[1]); + } + } + } + + @Override + protected void setUp() throws Exception { + super.setUp(); + fs = FileSystem.get(conf); + } + + @Test + public void test() throws IOException, InterruptedException, ClassNotFoundException { + try { + DynamicGraph.main(new String[] {"src/test/resources/dg.txt", OUTPUT }); + verifyResult(); + } finally { + deleteTempDirs(); + } + } + +} Index: examples/src/test/resources/dg.txt =================================================================== --- examples/src/test/resources/dg.txt (revision 0) +++ examples/src/test/resources/dg.txt (working copy) @@ -0,0 +1,4 @@ +1 +2 +3 +4 \ No newline at end of file Index: graph/src/main/java/org/apache/hama/graph/AggregationRunner.java =================================================================== --- graph/src/main/java/org/apache/hama/graph/AggregationRunner.java (revision 1507969) +++ graph/src/main/java/org/apache/hama/graph/AggregationRunner.java (working copy) @@ -21,6 +21,7 @@ import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.io.IntWritable; +import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.MapWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.io.Writable; @@ -90,14 +91,18 @@ /** * Runs the aggregators by sending their values to the master task. 
+ * @param changedVertexCnt the number of vertices added or removed in this superstep */ public void sendAggregatorValues( BSPPeer peer, - int activeVertices) throws IOException { + int activeVertices, int changedVertexCnt) throws IOException { // send msgCounts to the master task MapWritable updatedCnt = new MapWritable(); updatedCnt.put(GraphJobRunner.FLAG_MESSAGE_COUNTS, new IntWritable( activeVertices)); + // send total number of vertex changes + updatedCnt.put(GraphJobRunner.FLAG_VERTEX_ALTER_COUNTER, new LongWritable( + changedVertexCnt)); // also send aggregated values to the master if (aggregators != null) { for (int i = 0; i < this.aggregators.length; i++) { Index: graph/src/main/java/org/apache/hama/graph/DiskVerticesInfo.java =================================================================== --- graph/src/main/java/org/apache/hama/graph/DiskVerticesInfo.java (revision 1507969) +++ graph/src/main/java/org/apache/hama/graph/DiskVerticesInfo.java (working copy) @@ -120,6 +120,11 @@ size++; } + @Override + public void removeVertex(V vertexID) { + throw new UnsupportedOperationException ("Not yet implemented"); + } + /** * Serializes the vertex's soft parts to its file. If the vertex does not have * an index yet (e.G. 
at startup) you can provide -1 and it will be added to @@ -169,6 +174,11 @@ lockedAdditions = true; } + @Override + public void finishRemovals() { + throw new UnsupportedOperationException ("Not yet implemented"); + } + private static long[] copy(ArrayList lst) { long[] arr = new long[lst.size()]; for (int i = 0; i < arr.length; i++) { Index: graph/src/main/java/org/apache/hama/graph/GraphJobRunner.java =================================================================== --- graph/src/main/java/org/apache/hama/graph/GraphJobRunner.java (revision 1507969) +++ graph/src/main/java/org/apache/hama/graph/GraphJobRunner.java (working copy) @@ -25,6 +25,7 @@ import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.io.IntWritable; +import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.MapWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.io.Writable; @@ -63,7 +64,15 @@ public static final String S_FLAG_MESSAGE_COUNTS = "hama.0"; public static final String S_FLAG_AGGREGATOR_VALUE = "hama.1"; public static final String S_FLAG_AGGREGATOR_INCREMENT = "hama.2"; + public static final String S_FLAG_VERTEX_INCREASE = "hama.3"; + public static final String S_FLAG_VERTEX_DECREASE = "hama.4"; + public static final String S_FLAG_VERTEX_ALTER_COUNTER = "hama.5"; + public static final String S_FLAG_VERTEX_TOTAL_VERTICES = "hama.6"; public static final Text FLAG_MESSAGE_COUNTS = new Text(S_FLAG_MESSAGE_COUNTS); + public static final Text FLAG_VERTEX_INCREASE = new Text(S_FLAG_VERTEX_INCREASE); + public static final Text FLAG_VERTEX_DECREASE = new Text(S_FLAG_VERTEX_DECREASE); + public static final Text FLAG_VERTEX_ALTER_COUNTER = new Text(S_FLAG_VERTEX_ALTER_COUNTER); + public static final Text FLAG_VERTEX_TOTAL_VERTICES = new Text(S_FLAG_VERTEX_TOTAL_VERTICES); public static final String MESSAGE_COMBINER_CLASS_KEY = "hama.vertex.message.combiner.class"; public static final String VERTEX_CLASS_KEY 
= "hama.graph.vertex.class"; @@ -81,6 +90,7 @@ private VerticesInfo vertices; private boolean updated = true; private int globalUpdateCounts = 0; + private int changedVertexCnt = 0; private long numberVertices = 0; // -1 is deactivated @@ -165,9 +175,20 @@ BSPPeer peer) throws IOException, SyncException, InterruptedException { + if (isMasterTask(peer) && iteration == 1) { + MapWritable updatedCnt = new MapWritable(); + updatedCnt.put(FLAG_VERTEX_TOTAL_VERTICES, new LongWritable((peer.getCounter(GraphJobCounter.INPUT_VERTICES).getCounter()))); + // send the updates from the master tasks back to the slaves + for (String peerName : peer.getAllPeerNames()) { + peer.send(peerName, new GraphJobMessage(updatedCnt)); + } + } + // this is only done in every second iteration if (isMasterTask(peer) && iteration > 1) { MapWritable updatedCnt = new MapWritable(); + // send total number of vertices. + updatedCnt.put(FLAG_VERTEX_TOTAL_VERTICES, new LongWritable((peer.getCounter(GraphJobCounter.INPUT_VERTICES).getCounter()))); // exit if there's no update made if (globalUpdateCounts == 0) { updatedCnt.put(FLAG_MESSAGE_COUNTS, new IntWritable(Integer.MIN_VALUE)); @@ -210,6 +231,7 @@ BSPPeer peer) throws IOException { int activeVertices = 0; + this.changedVertexCnt = 0; vertices.startSuperstep(); /* * We iterate over our messages and vertices in sorted order. 
That means @@ -255,7 +277,7 @@ } vertices.finishSuperstep(); - aggregationRunner.sendAggregatorValues(peer, activeVertices); + aggregationRunner.sendAggregatorValues(peer, activeVertices, this.changedVertexCnt); iteration++; } @@ -310,6 +332,7 @@ BSPPeer peer) throws IOException { vertices.startSuperstep(); + this.changedVertexCnt = 0; IDSkippingIterator skippingIterator = vertices.skippingIterator(); while (skippingIterator.hasNext()) { Vertex vertex = skippingIterator.next(); @@ -319,7 +342,7 @@ vertices.finishVertexComputation(vertex); } vertices.finishSuperstep(); - aggregationRunner.sendAggregatorValues(peer, 1); + aggregationRunner.sendAggregatorValues(peer, 1, this.changedVertexCnt); iteration++; } @@ -426,6 +449,51 @@ } /** + * Add new vertex into memory of each peer. + * @throws IOException + */ + private void addVertex(Vertex vertex) throws IOException { + vertex.runner = this; + vertex.setup(conf); + + if (conf.getBoolean("hama.graph.self.ref", false)) { + vertex.addEdge(new Edge(vertex.getVertexID(), null)); + } + + LOG.debug("Added VertexID: " + vertex.getVertexID() + " in peer " + peer.getPeerName()); + vertices.addVertex(vertex); + } + + /** + * Remove vertex from this peer. + * @throws IOException + */ + private void removeVertex(V vertexID) { + vertices.removeVertex(vertexID); + LOG.debug("Removed VertexID: " + vertexID + " in peer " + peer.getPeerName()); + } + + /** + * After all inserts are done, we must finalize the VertexInfo data structure. + * @throws IOException + */ + private void finishAdditions() throws IOException { + vertices.finishAdditions(); + // finish the "superstep" because we have written a new file here + vertices.finishSuperstep(); + } + + /** + * After all inserts are done, we must finalize the VertexInfo data structure. 
+ * @throws IOException + */ + private void finishRemovals() throws IOException { + vertices.finishRemovals(); + // finish the "superstep" because we have written a new file here + vertices.finishSuperstep(); + } + + /** * Counts vertices globally by sending the count of vertices in the map to the * other peers. */ @@ -461,6 +529,9 @@ BSPPeer peer) throws IOException, SyncException, InterruptedException { GraphJobMessage msg = null; + boolean dynamicAdditions = false; + boolean dynamicRemovals = false; + while ((msg = peer.getCurrentMessage()) != null) { // either this is a vertex message or a directive that must be read // as map @@ -486,6 +557,20 @@ && vertexID.toString().startsWith(S_FLAG_AGGREGATOR_INCREMENT)) { aggregationRunner.masterReadAggregatedIncrementalValue(vertexID, (M) e.getValue()); + } else if (FLAG_VERTEX_INCREASE.equals(vertexID)) { + dynamicAdditions = true; + addVertex((Vertex) e.getValue()); + } else if (FLAG_VERTEX_DECREASE.equals(vertexID)) { + dynamicRemovals = true; + removeVertex((V) e.getValue()); + } else if (FLAG_VERTEX_TOTAL_VERTICES.equals(vertexID)) { + this.numberVertices = ((LongWritable) e.getValue()).get(); + } else if (FLAG_VERTEX_ALTER_COUNTER.equals(vertexID)) { + if (isMasterTask(peer)) { + peer.getCounter(GraphJobCounter.INPUT_VERTICES).increment(((LongWritable) e.getValue()).get()); + } else { + throw new UnsupportedOperationException("A message to increase vertex count is in a wrong place: " + peer); + } } } @@ -494,6 +579,15 @@ } } + + // If we applied any changes to vertices, we need to call finishAdditions and finishRemovals in the end. 
+ if (dynamicAdditions) { + finishAdditions(); + } + if (dynamicRemovals) { + finishRemovals(); + } + return msg; } @@ -605,4 +699,12 @@ return (X) ReflectionUtils.newInstance(EDGE_VALUE_CLASS); } + public int getChangedVertexCnt() { + return changedVertexCnt; + } + + public void setChangedVertexCnt(int changedVertexCnt) { + this.changedVertexCnt = changedVertexCnt; + } + } Index: graph/src/main/java/org/apache/hama/graph/ListVerticesInfo.java =================================================================== --- graph/src/main/java/org/apache/hama/graph/ListVerticesInfo.java (revision 1507969) +++ graph/src/main/java/org/apache/hama/graph/ListVerticesInfo.java (working copy) @@ -18,9 +18,9 @@ package org.apache.hama.graph; import java.io.IOException; -import java.util.ArrayList; -import java.util.Collections; -import java.util.List; +import java.util.Iterator; +import java.util.SortedSet; +import java.util.TreeSet; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.io.Writable; @@ -37,14 +37,26 @@ public final class ListVerticesInfo, E extends Writable, M extends Writable> implements VerticesInfo { - private final List> vertices = new ArrayList>( - 100); + private final SortedSet> vertices = new TreeSet>(); + // We will use this variable to make vertex removals, so we don't invoke GC too many times. + private final Vertex vertexTemplate = GraphJobRunner. 
newVertexInstance(GraphJobRunner.VERTEX_CLASS); @Override public void addVertex(Vertex vertex) { - vertices.add(vertex); + if (!vertices.add(vertex)) { + throw new UnsupportedOperationException("Vertex with ID: " + vertex.getVertexID() + " already exists!"); + } } + @Override + public void removeVertex(V vertexID) throws UnsupportedOperationException { + vertexTemplate.setVertexID(vertexID); + + if (!vertices.remove(vertexTemplate)) { + throw new UnsupportedOperationException("Vertex with ID: " + vertexID + " not found on this peer."); + } + } + public void clear() { vertices.clear(); } @@ -57,26 +69,40 @@ @Override public IDSkippingIterator skippingIterator() { return new IDSkippingIterator() { - int currentIndex = 0; + Iterator> it = vertices.iterator(); + Vertex v; @Override - public boolean hasNext(V e, + public boolean hasNext(V msgId, org.apache.hama.graph.IDSkippingIterator.Strategy strat) { - if (currentIndex < vertices.size()) { - while (!strat.accept(vertices.get(currentIndex), e)) { - currentIndex++; + if (it.hasNext()) { + v = it.next(); + + while (!strat.accept(v, msgId)) { + if (it.hasNext()) { + v = it.next(); + } else { + return false; + } } return true; } else { + v = null; return false; } } @Override public Vertex next() { - return vertices.get(currentIndex++); + if (v == null) { + throw new UnsupportedOperationException("You must invoke hasNext before ask for the next vertex."); + } + + Vertex tmp = v; + v = null; + return tmp; } }; @@ -89,10 +115,14 @@ @Override public void finishAdditions() { - Collections.sort(vertices); + } @Override + public void finishRemovals() { + } + + @Override public boolean isFinishedAdditions() { return false; } Index: graph/src/main/java/org/apache/hama/graph/Vertex.java =================================================================== --- graph/src/main/java/org/apache/hama/graph/Vertex.java (revision 1507969) +++ graph/src/main/java/org/apache/hama/graph/Vertex.java (working copy) @@ -25,6 +25,8 @@ import 
org.apache.hadoop.conf.Configuration; import org.apache.hadoop.io.IntWritable; +import org.apache.hadoop.io.LongWritable; +import org.apache.hadoop.io.MapWritable; import org.apache.hadoop.io.Writable; import org.apache.hadoop.io.WritableComparable; import org.apache.hama.bsp.BSPPeer; @@ -112,7 +114,43 @@ new GraphJobMessage(destinationVertexID, msg)); } + private void alterVertexCounter(int i) throws IOException { + this.runner.setChangedVertexCnt(this.runner.getChangedVertexCnt() + i); + } + @Override + public void addVertex(V vertexID, List> edges, M value) throws IOException { + MapWritable msg = new MapWritable(); + // Create the new vertex. + Vertex vertex = GraphJobRunner. newVertexInstance(GraphJobRunner.VERTEX_CLASS); + vertex.setEdges(edges); + vertex.setValue(value); + vertex.setVertexID(vertexID); + + msg.put(GraphJobRunner.FLAG_VERTEX_INCREASE, vertex); + // Find the proper partition to host the new vertex. + int partition = getPartitioner().getPartition(vertexID, value, + runner.getPeer().getNumPeers()); + String destPeer = runner.getPeer().getAllPeerNames()[partition]; + + runner.getPeer().send(destPeer, new GraphJobMessage(msg)); + + alterVertexCounter(1); + } + + @Override + public void remove() throws IOException { + MapWritable msg = new MapWritable(); + msg.put(GraphJobRunner.FLAG_VERTEX_DECREASE, this.vertexID); + + // Get master task peer. 
+ String destPeer = GraphJobRunner.getMasterTask(this.getPeer()); + runner.getPeer().send(destPeer, new GraphJobMessage(msg)); + + alterVertexCounter(-1); + } + + @Override public long getSuperstepCount() { return runner.getNumberIterations(); } Index: graph/src/main/java/org/apache/hama/graph/VertexInterface.java =================================================================== --- graph/src/main/java/org/apache/hama/graph/VertexInterface.java (revision 1507969) +++ graph/src/main/java/org/apache/hama/graph/VertexInterface.java (working copy) @@ -78,6 +78,16 @@ public void sendMessage(V destinationVertexID, M msg) throws IOException; /** + * Sends a message to add a new vertex through the partitioner to the appropriate BSP peer. + */ + public void addVertex(V vertexID, List> edges, M value) throws IOException; + + /** + * Removes current Vertex from local peer. + */ + public void remove() throws IOException; + + /** * @return the superstep number of the current superstep (starting from 0). */ public long getSuperstepCount(); Index: graph/src/main/java/org/apache/hama/graph/VerticesInfo.java =================================================================== --- graph/src/main/java/org/apache/hama/graph/VerticesInfo.java (revision 1507969) +++ graph/src/main/java/org/apache/hama/graph/VerticesInfo.java (working copy) @@ -52,12 +52,23 @@ public void addVertex(Vertex vertex) throws IOException; /** + * Remove a vertex from the underlying structure. + */ + public void removeVertex(V vertexID) throws UnsupportedOperationException; + + /** * Finish the additions, from this point on the implementations should close * the adds and throw exceptions in case something is added after this call. */ public void finishAdditions(); /** + * Finish the removals, from this point on the implementations should close + * the removes and throw exceptions in case something is removed after this call. + */ + public void finishRemovals(); + + /** * Called once a superstep starts. 
*/ public void startSuperstep() throws IOException;