Index: core/src/main/java/org/apache/hama/bsp/BSPJob.java
===================================================================
--- core/src/main/java/org/apache/hama/bsp/BSPJob.java (revision 1229466)
+++ core/src/main/java/org/apache/hama/bsp/BSPJob.java (working copy)
@@ -230,7 +230,7 @@
   }
 
   public int getNumBspTask() {
-    // default is 1, because with zero, we will hang in infinity
+    // default is 1, because with zero, we will hang in infinity
     return conf.getInt("bsp.peers.num", 1);
   }
 
Index: core/src/main/java/org/apache/hama/bsp/BSPJobClient.java
===================================================================
--- core/src/main/java/org/apache/hama/bsp/BSPJobClient.java (revision 1229466)
+++ core/src/main/java/org/apache/hama/bsp/BSPJobClient.java (working copy)
@@ -897,9 +897,29 @@
       throw new IOException("Expect one token as the result of "
           + Shell.USER_NAME_COMMAND + ": " + toString(result));
     }
-    return result[0];
+    String fixResult = fixCygwinName(result[0]);
+    return fixResult;
   }
 
+  /**
+   * Removes a leading domain qualifier from a user name.
+   * <p>
+   * If the name contains a backslash, everything up to and including the
+   * first backslash is dropped; otherwise the name is returned unchanged.
+   *
+   * @param in the raw user name, must not be null
+   * @return the user name without the part before the first backslash
+   */
+  private static String fixCygwinName(String in) {
+    String string = in;
+    int separator = string.indexOf('\\');
+    if (separator >= 0) {
+      // this is for cygwin systems: skip the separator itself as well
+      string = string.substring(separator + 1);
+    }
+    return string;
+  }
+
   static String getUnixUserGroupName(String user) throws IOException {
     String[] result = executeShellCommand(new String[] { "bash", "-c",
         "id -Gn " + user });
Index: examples/src/main/java/org/apache/hama/examples/ExampleDriver.java
===================================================================
--- examples/src/main/java/org/apache/hama/examples/ExampleDriver.java (revision 1229466)
+++ examples/src/main/java/org/apache/hama/examples/ExampleDriver.java (working copy)
@@ -30,6 +30,7 @@
       pgd.addClass("sssp", ShortestPaths.class, "Single Shortest Path");
       pgd.addClass("cmb", CombineExample.class, "Combine");
       pgd.addClass("bench", RandBench.class, "Random Benchmark");
+      pgd.addClass("pagerank", PageRank.class, "PageRank");
       pgd.driver(args);
     } catch (Throwable e) {
e.printStackTrace(); Index: examples/src/main/java/org/apache/hama/examples/PageRank.java =================================================================== --- examples/src/main/java/org/apache/hama/examples/PageRank.java (revision 1229466) +++ examples/src/main/java/org/apache/hama/examples/PageRank.java (working copy) @@ -28,25 +28,24 @@ import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.DoubleWritable; -import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.SequenceFile; import org.apache.hadoop.io.Text; import org.apache.hama.HamaConfiguration; import org.apache.hama.bsp.BSP; import org.apache.hama.bsp.BSPJob; -import org.apache.hama.bsp.BSPJobClient; import org.apache.hama.bsp.BSPPeer; -import org.apache.hama.bsp.ClusterStatus; import org.apache.hama.bsp.DoubleMessage; import org.apache.hama.bsp.HashPartitioner; import org.apache.hama.bsp.SequenceFileInputFormat; import org.apache.hama.bsp.SequenceFileOutputFormat; import org.apache.hama.bsp.sync.SyncException; +import org.apache.hama.graph.VertexArrayWritable; import org.apache.hama.graph.VertexWritable; import org.apache.hama.util.KeyValuePair; public class PageRank extends - BSP { + BSP { + public static final Log LOG = LogFactory.getLog(PageRank.class); private final HashMap adjacencyList = new HashMap(); @@ -64,29 +63,40 @@ @Override public void setup( - BSPPeer peer) + BSPPeer peer) throws IOException { - // map our stuff into ram - KeyValuePair next = null; + DAMPING_FACTOR = Double.parseDouble(conf.get("damping.factor")); + EPSILON = Double.parseDouble(conf.get("epsilon.error")); + MAX_ITERATIONS = Integer.parseInt(conf.get("max.iterations")); + masterTaskName = peer.getPeerName(0); + + // map our stuff into ram + KeyValuePair next = null; while ((next = peer.readNext()) != null) { - adjacencyList.put(next.getKey(), (ShortestPathVertex[]) next.getValue() + adjacencyList.put(next.getKey(), (VertexWritable[]) next.getValue() .toArray()); 
vertexLookupMap.put(next.getKey().getName(), next.getKey()); } - // normally this is the global number of vertices + // normally this should be the global number of vertices numOfVertices = vertexLookupMap.size(); - DAMPING_FACTOR = Double.parseDouble(conf.get("damping.factor")); ALPHA = (1 - DAMPING_FACTOR) / (double) numOfVertices; - EPSILON = Double.parseDouble(conf.get("epsilon.error")); - MAX_ITERATIONS = Integer.parseInt(conf.get("max.iterations")); - masterTaskName = peer.getPeerName(0); + + // reread the input to save ram + peer.reopenInput(); + VertexWritable key = new VertexWritable(); + VertexArrayWritable value = new VertexArrayWritable(); + while (peer.readNext(key, value)) { + VertexWritable vertexWritable = vertexLookupMap.get(key.getName()); + tentativePagerank + .put(vertexWritable, Double.valueOf(1.0 / numOfVertices)); + } } @Override public void bsp( - BSPPeer peer) + BSPPeer peer) throws IOException, SyncException, InterruptedException { // while the error not converges against epsilon do the pagerank stuff @@ -142,11 +152,11 @@ @Override public void cleanup( - BSPPeer peer) { + BSPPeer peer) { try { for (Entry row : tentativePagerank.entrySet()) { - peer.write(new Text(row.getKey().getName()), new DoubleWritable(row - .getValue())); + peer.write(new Text(row.getKey().getName()), + new DoubleWritable(row.getValue())); } } catch (IOException e) { e.printStackTrace(); @@ -154,7 +164,7 @@ } private double broadcastError( - BSPPeer peer, + BSPPeer peer, double error) throws IOException, SyncException, InterruptedException { peer.send(masterTaskName, new DoubleMessage("", error)); peer.sync(); @@ -194,7 +204,7 @@ } private void sendMessageToNeighbors( - BSPPeer peer, + BSPPeer peer, VertexWritable v) throws IOException { VertexWritable[] outgoingEdges = adjacencyList.get(v); for (VertexWritable adjacent : outgoingEdges) { @@ -206,10 +216,32 @@ } } + static void printOutput(FileSystem fs, Configuration conf) throws IOException { + 
LOG.info("-------------------- RESULTS --------------------"); + FileStatus[] stati = fs.listStatus(new Path(conf.get("bsp.output.dir"))); + for (FileStatus status : stati) { + if (!status.isDir() && !status.getPath().getName().endsWith(".crc")) { + Path path = status.getPath(); + SequenceFile.Reader reader = new SequenceFile.Reader(fs, path, conf); + Text key = new Text(); + DoubleWritable value = new DoubleWritable(); + int count = 0; + while (reader.next(key, value)) { + LOG.info(key.toString() + " | " + value.get()); + count++; + if (count > 5) + break; + } + reader.close(); + } + } + } + public static void printUsage() { System.out.println("PageRank Example:"); System.out - .println(" "); + .println(" [damping factor] [epsilon error] [tasks]"); + } public static void main(String[] args) throws IOException, @@ -221,77 +253,34 @@ } HamaConfiguration conf = new HamaConfiguration(new Configuration()); - BSPJob job = new BSPJob(conf); - job.setOutputPath(new Path("pagerank/output")); + BSPJob job = new BSPJob(conf, PageRank.class); + job.setJobName("Pagerank"); - // set the defaults - conf.set("damping.factor", "0.85"); - conf.set("epsilon.error", "0.000001"); - - boolean inputGiven = false; - if (args.length < 2) { - System.out.println("You have to provide a damping factor and an error!"); - System.out.println("Try using 0.85 0.001 as parameter!"); - System.exit(-1); - } else { - conf.set("damping.factor", args[0]); - conf.set("epsilon.error", args[1]); - LOG.info("Set damping factor to " + args[0]); - LOG.info("Set epsilon error to " + args[1]); - if (args.length > 2) { - LOG.info("Set output path to " + args[2]); - job.setOutputPath(new Path(args[2])); - if (args.length == 4) { - job.setInputPath(new Path(args[3])); - LOG.info("Using custom input at " + args[3]); - inputGiven = true; - } - } + job.setInputPath(new Path(args[0])); + job.setOutputPath(new Path(args[1])); + + conf.set("damping.factor", (args.length > 2) ? 
args[2] : "0.85"); + conf.set("epsilon.error", (args.length > 3) ? args[3] : "0.000001"); + if (args.length == 5) { + job.setNumBspTask(Integer.parseInt(args[4])); } - BSPJobClient jobClient = new BSPJobClient(conf); - ClusterStatus cluster = jobClient.getClusterStatus(true); - // leave the iterations on default conf.set("max.iterations", "0"); - if (!inputGiven) { - Path tmp = new Path("pagerank/input"); - FileSystem.get(conf).delete(tmp, true); - // ShortestPathsGraphLoader.loadGraph(conf, tmp); - job.setInputPath(tmp); - } - job.setInputFormat(SequenceFileInputFormat.class); job.setPartitioner(HashPartitioner.class); job.setOutputFormat(SequenceFileOutputFormat.class); job.setOutputKeyClass(Text.class); - job.setOutputValueClass(IntWritable.class); - - job.setNumBspTask(cluster.getMaxTasks()); + job.setOutputValueClass(DoubleWritable.class); job.setBspClass(PageRank.class); - job.setJarByClass(PageRank.class); - job.setJobName("Pagerank"); + + long startTime = System.currentTimeMillis(); if (job.waitForCompletion(true)) { printOutput(FileSystem.get(conf), conf); + System.out.println("Job Finished in " + + (double) (System.currentTimeMillis() - startTime) / 1000.0 + + " seconds"); } } - - static void printOutput(FileSystem fs, Configuration conf) throws IOException { - LOG.info("-------------------- RESULTS --------------------"); - FileStatus[] stati = fs.listStatus(new Path(conf.get("bsp.output.dir"))); - for (FileStatus status : stati) { - if (!status.isDir() && !status.getPath().getName().endsWith(".crc")) { - Path path = status.getPath(); - SequenceFile.Reader reader = new SequenceFile.Reader(fs, path, conf); - Text key = new Text(); - DoubleWritable value = new DoubleWritable(); - while (reader.next(key, value)) { - LOG.info(key.toString() + " | " + value.get()); - } - reader.close(); - } - } - } - } Index: examples/src/main/java/org/apache/hama/examples/ShortestPaths.java =================================================================== --- 
examples/src/main/java/org/apache/hama/examples/ShortestPaths.java (revision 1229466) +++ examples/src/main/java/org/apache/hama/examples/ShortestPaths.java (working copy) @@ -199,7 +199,7 @@ } public static void printUsage() { - System.out.println("Usage: [numTasks]"); + System.out.println("Usage: [tasks]"); } public static void main(String[] args) throws IOException, @@ -218,8 +218,8 @@ bsp.setJobName("Single Source Shortest Path"); conf.set(START_VERTEX, args[0]); - bsp.setOutputPath(new Path(args[1])); - bsp.setInputPath(new Path(args[2])); + bsp.setInputPath(new Path(args[1])); + bsp.setOutputPath(new Path(args[2])); if(args.length == 4) { bsp.setNumBspTask(Integer.parseInt(args[3]));