Index: examples/src/main/java/org/apache/hama/examples/PageRank.java =================================================================== --- examples/src/main/java/org/apache/hama/examples/PageRank.java (revision 1437242) +++ examples/src/main/java/org/apache/hama/examples/PageRank.java (working copy) @@ -25,12 +25,13 @@ import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.DoubleWritable; -import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.NullWritable; import org.apache.hadoop.io.Text; +import org.apache.hadoop.io.Writable; import org.apache.hama.HamaConfiguration; import org.apache.hama.bsp.HashPartitioner; import org.apache.hama.bsp.SequenceFileInputFormat; +import org.apache.hama.bsp.TextArrayWritable; import org.apache.hama.bsp.TextOutputFormat; import org.apache.hama.graph.AbstractAggregator; import org.apache.hama.graph.AverageAggregator; @@ -44,34 +45,6 @@ */ public class PageRank { - public static class PagerankTextReader extends - VertexInputReader { - - /** - * The text file essentially should look like:
- * VERTEX_ID\t(n-tab separated VERTEX_IDs)
- * E.G:
- * 1\t2\t3\t4
- * 2\t3\t1
- * etc. - */ - @Override - public boolean parseVertex(LongWritable key, Text value, - Vertex vertex) throws Exception { - String[] split = value.toString().split("\t"); - for (int i = 0; i < split.length; i++) { - if (i == 0) { - vertex.setVertexID(new Text(split[i])); - } else { - vertex - .addEdge(new Edge(new Text(split[i]), null)); - } - } - return true; - } - - } - public static class PageRankVertex extends Vertex { @@ -153,24 +126,44 @@ } - private static void printUsage() { - System.out.println("Usage: [tasks]"); - System.exit(-1); + public static class DanglingNodeAggregator + extends + AbstractAggregator> { + + double danglingNodeSum; + + @Override + public void aggregate(Vertex vertex, + DoubleWritable value) { + if (vertex != null) { + if (vertex.getEdges().size() == 0) { + danglingNodeSum += value.get(); + } + } else { + danglingNodeSum += value.get(); + } + } + + @Override + public DoubleWritable getValue() { + return new DoubleWritable(danglingNodeSum); + } + } - public static void main(String[] args) throws IOException, - InterruptedException, ClassNotFoundException { - if (args.length < 2) - printUsage(); + public static class PagerankTextReader + extends + VertexInputReader { + @Override + public boolean parseVertex(Text key, TextArrayWritable value, + Vertex vertex) throws Exception { + vertex.setVertexID(key); - HamaConfiguration conf = new HamaConfiguration(new Configuration()); - GraphJob pageJob = createJob(args, conf); - pageJob.setVertexInputReaderClass(PagerankTextReader.class); + for (Writable v : value.get()) { + vertex.addEdge(new Edge((Text) v, null)); + } - long startTime = System.currentTimeMillis(); - if (pageJob.waitForCompletion(true)) { - System.out.println("Job Finished in " - + (System.currentTimeMillis() - startTime) / 1000.0 + " seconds"); + return true; } } @@ -179,7 +172,7 @@ throws IOException { GraphJob pageJob = new GraphJob(conf, PageRank.class); pageJob.setJobName("Pagerank"); - + pageJob.setVertexClass(PageRankVertex.class); pageJob.setInputPath(new Path(args[0])); pageJob.setOutputPath(new Path(args[1])); @@ -197,6 +190,9 @@ pageJob.setAggregatorClass(AverageAggregator.class, DanglingNodeAggregator.class); + // Vertex reader + pageJob.setVertexInputReaderClass(PagerankTextReader.class); + pageJob.setVertexIDClass(Text.class); pageJob.setVertexValueClass(DoubleWritable.class); pageJob.setEdgeValueClass(NullWritable.class); @@ -210,28 +206,23 @@ return pageJob; } - public static class DanglingNodeAggregator - extends - AbstractAggregator> { + private static void printUsage() { + System.out.println("Usage: [tasks]"); + System.exit(-1); + } - double danglingNodeSum; + public static void main(String[] args) throws IOException, + InterruptedException, ClassNotFoundException { + if (args.length < 2) + printUsage(); - @Override - public void aggregate(Vertex vertex, - DoubleWritable value) { - if (vertex != null) { - if (vertex.getEdges().size() == 0) { - danglingNodeSum += value.get(); - } - } else { - danglingNodeSum += value.get(); - } - } + HamaConfiguration conf = new HamaConfiguration(new Configuration()); + GraphJob pageJob = createJob(args, conf); - @Override - public DoubleWritable getValue() { - return new DoubleWritable(danglingNodeSum); + long startTime = System.currentTimeMillis(); + if (pageJob.waitForCompletion(true)) { + System.out.println("Job Finished in " + + (System.currentTimeMillis() - startTime) / 1000.0 + " seconds"); } - } } Index: examples/src/main/java/org/apache/hama/examples/util/VertexInputGen.java =================================================================== --- examples/src/main/java/org/apache/hama/examples/util/VertexInputGen.java (revision 1437242) +++ examples/src/main/java/org/apache/hama/examples/util/VertexInputGen.java (working copy) @@ -1,208 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.hama.examples.util; - -import java.io.IOException; -import java.util.HashMap; -import java.util.HashSet; -import java.util.Map; -import java.util.Random; - -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.fs.Path; -import org.apache.hadoop.io.DoubleWritable; -import org.apache.hadoop.io.NullWritable; -import org.apache.hadoop.io.Text; -import org.apache.hadoop.io.Writable; -import org.apache.hama.HamaConfiguration; -import org.apache.hama.bsp.BSP; -import org.apache.hama.bsp.BSPJob; -import org.apache.hama.bsp.BSPPeer; -import org.apache.hama.bsp.FileOutputFormat; -import org.apache.hama.bsp.NullInputFormat; -import org.apache.hama.bsp.SequenceFileOutputFormat; -import org.apache.hama.bsp.TextArrayWritable; -import org.apache.hama.bsp.sync.SyncException; -import org.apache.hama.examples.CombineExample; -import org.apache.hama.examples.PageRank.PageRankVertex; -import org.apache.hama.graph.Edge; -import org.apache.hama.graph.Vertex; -import org.apache.hama.util.ReflectionUtils; - -public class VertexInputGen { - - public static final String SIZE_OF_MATRIX = "size.of.matrix"; - public static final String DENSITY = "density.of.matrix"; - - public static interface VertexCreator { - @SuppressWarnings("rawtypes") - Vertex createVertex(Text id, Text[] edges, Text value); - } - - public static class PageRankVertexCreatorImpl implements VertexCreator { - - @SuppressWarnings({ "unchecked", "rawtypes" }) - @Override - public Vertex createVertex(Text id, Text[] edges, Text value) { - Vertex v = new PageRankVertex(); - v.setVertexID(id); - for (Text t : edges) { - v.addEdge(new Edge(t, null)); - } - return v; - } - - } - - public static int getVertexCaseId(Class classObj) { - if (classObj.getCanonicalName().equals( - PageRankVertexCreatorImpl.class.getCanonicalName())) { - return 1; - } - - return -1; - } - - @SuppressWarnings("rawtypes") - public static class VertexInputGenBSP extends - BSP { - - private Configuration conf; - private int sizeN; - private int density; - private Map> list = new HashMap>(); - private VertexCreator vertexCreator; - - @Override - public void setup( - BSPPeer peer) { - this.conf = peer.getConfiguration(); - sizeN = conf.getInt(SIZE_OF_MATRIX, 10); - density = conf.getInt(DENSITY, 1); - - int vertexCase = conf.getInt("hama.test.vertexcreatorid", -1); - if (vertexCase == 1) { - vertexCreator = new PageRankVertexCreatorImpl(); - } else { - throw new RuntimeException("No vertex creator specified"); - } - - } - - @Override - public void bsp( - BSPPeer peer) - throws IOException, SyncException, InterruptedException { - int interval = sizeN / peer.getNumPeers(); - int startID = peer.getPeerIndex() * interval; - int endID; - if (peer.getPeerIndex() == peer.getNumPeers() - 1) - endID = sizeN; - else - endID = startID + interval; - - // Generate N*(N+1) elements for lower triangular - for (int i = startID; i < endID; i++) { - HashSet edges = new HashSet(); - for (int j = 0; j <= i; j++) { - boolean nonZero = new Random().nextInt(density) == 0; - if (nonZero && !edges.contains(j) && i != j) { - edges.add(j); - - // TODO please refactor this. - int peerIndex = j / interval; - if (peerIndex == peer.getNumPeers()) - peerIndex = peerIndex - 1; - - peer.send(peer.getPeerName(j / interval), new Text(j + "," + i)); - } - } - - list.put(i, edges); - } - - // Synchronize the upper and lower - peer.sync(); - Text received; - while ((received = peer.getCurrentMessage()) != null) { - String[] kv = received.toString().split(","); - HashSet nList = list.get(Integer.parseInt(kv[0])); - nList.add(Integer.parseInt(kv[1])); - list.put(Integer.parseInt(kv[0]), nList); - } - } - - @Override - public void cleanup( - BSPPeer peer) - throws IOException { - for (Map.Entry> e : list.entrySet()) { - Text[] values = new Text[e.getValue().size()]; - if (values.length > 0) { - int i = 0; - for (Integer v : e.getValue()) { - values[i] = new Text(String.valueOf(v)); - i++; - } - peer.write( - (Vertex)this.vertexCreator.createVertex( - new Text(String.valueOf(e.getKey())), values, new Text()), - NullWritable.get()); - } - } - } - } - - public static void runJob(HamaConfiguration conf, int numTasks, String output, Class cls) - throws IOException, InterruptedException, ClassNotFoundException { - BSPJob bsp = new BSPJob(conf, VertexInputGen.class); - // Set the job name - bsp.setJobName("Random Vertex Input Generator"); - bsp.setBspClass(VertexInputGenBSP.class); - bsp.setInputFormat(NullInputFormat.class); - bsp.setOutputKeyClass(cls); - bsp.setOutputValueClass(NullWritable.class); - bsp.setOutputFormat(SequenceFileOutputFormat.class); - FileOutputFormat.setOutputPath(bsp, new Path(output)); - bsp.setNumBspTask(numTasks); - - long startTime = System.currentTimeMillis(); - if (bsp.waitForCompletion(true)) { - System.out.println("Job Finished in " - + (System.currentTimeMillis() - startTime) / 1000.0 + " seconds"); - } - } - - public static void main(String[] args) throws InterruptedException, - IOException, ClassNotFoundException { - if (args.length < 4) { - System.out - .println("Usage: <1/x density> "); - System.exit(1); - } - - // BSP job configuration - HamaConfiguration conf = new HamaConfiguration(); - - conf.setInt(SIZE_OF_MATRIX, Integer.parseInt(args[0])); - conf.setInt(DENSITY, Integer.parseInt(args[1])); - runJob(conf, Integer.parseInt(args[3]), args[2], Vertex.class); - - } - -} Index: examples/src/test/java/org/apache/hama/examples/PageRankTest.java =================================================================== --- examples/src/test/java/org/apache/hama/examples/PageRankTest.java (revision 1437242) +++ examples/src/test/java/org/apache/hama/examples/PageRankTest.java (working copy) @@ -28,8 +28,7 @@ import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hama.HamaConfiguration; -import org.apache.hama.examples.PageRank.PageRankVertex; -import org.apache.hama.examples.util.VertexInputGen; +import org.apache.hama.examples.util.SymmetricMatrixGen; import org.apache.hama.graph.GraphJob; import org.apache.hama.graph.GraphJobRunner; @@ -87,11 +86,7 @@ private void generateTestData() throws ClassNotFoundException, InterruptedException, IOException { - HamaConfiguration conf = new HamaConfiguration(); - conf.setInt(VertexInputGen.SIZE_OF_MATRIX, 40); - conf.setInt(VertexInputGen.DENSITY, 10); - conf.setInt("hama.test.vertexcreatorid", 1); - VertexInputGen.runJob(conf, 3, INPUT, PageRankVertex.class); + SymmetricMatrixGen.main(new String[] { "40", "10", INPUT, "2" }); } private void deleteTempDirs() {