Index: core/src/main/java/org/apache/hama/bsp/BSPJobClient.java
===================================================================
--- core/src/main/java/org/apache/hama/bsp/BSPJobClient.java (Revision 1229940)
+++ core/src/main/java/org/apache/hama/bsp/BSPJobClient.java (Arbeitskopie)
@@ -897,9 +897,25 @@
       throw new IOException("Expect one token as the result of "
           + Shell.USER_NAME_COMMAND + ": " + toString(result));
     }
-    return result[0];
+    String fixResult = fixCygwinName(result[0]);
+    return fixResult;
   }
 
+  /**
+   * Removes the backslash-separated prefix that the user name carries on
+   * Cygwin systems (a name of the form {@code PREFIX\name}), keeping only the
+   * part after the last backslash.
+   *
+   * @param in the user name token to clean up
+   * @return the text after the last backslash, or {@code in} unchanged when it
+   *         contains no backslash
+   */
+  private static String fixCygwinName(String in) {
+    int separator = in.lastIndexOf('\\');
+    // lastIndexOf yields -1 without a backslash, so +1 keeps the whole name
+    return in.substring(separator + 1);
+  }
+
   static String getUnixUserGroupName(String user) throws IOException {
     String[] result = executeShellCommand(new String[] { "bash", "-c",
         "id -Gn " + user });
Index: examples/src/main/java/org/apache/hama/examples/PageRank.java
===================================================================
--- examples/src/main/java/org/apache/hama/examples/PageRank.java (Revision 1229940)
+++ examples/src/main/java/org/apache/hama/examples/PageRank.java (Arbeitskopie)
@@ -28,7 +28,6 @@
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.io.DoubleWritable;
-import org.apache.hadoop.io.IntWritable;
 import org.apache.hadoop.io.SequenceFile;
 import org.apache.hadoop.io.Text;
 import org.apache.hama.HamaConfiguration;
@@ -42,11 +41,13 @@
 import org.apache.hama.bsp.SequenceFileInputFormat;
 import org.apache.hama.bsp.SequenceFileOutputFormat;
 import org.apache.hama.bsp.sync.SyncException;
+import org.apache.hama.graph.VertexArrayWritable;
 import org.apache.hama.graph.VertexWritable;
 import org.apache.hama.util.KeyValuePair;
 
 public class PageRank extends
-    BSP {
+    BSP {
+
   public static final Log LOG = LogFactory.getLog(PageRank.class);
 
   private final HashMap adjacencyList = new HashMap();
@@ -64,29 +65,40 @@
 
   @Override
   public
void setup( - BSPPeer peer) + BSPPeer peer) throws IOException { + + DAMPING_FACTOR = Double.parseDouble(conf.get("damping.factor")); + EPSILON = Double.parseDouble(conf.get("epsilon.error")); + MAX_ITERATIONS = Integer.parseInt(conf.get("max.iterations")); + masterTaskName = peer.getPeerName(0); + // map our stuff into ram - - KeyValuePair next = null; + KeyValuePair next = null; while ((next = peer.readNext()) != null) { - adjacencyList.put(next.getKey(), (ShortestPathVertex[]) next.getValue() + adjacencyList.put(next.getKey(), (VertexWritable[]) next.getValue() .toArray()); vertexLookupMap.put(next.getKey().getName(), next.getKey()); } - // normally this is the global number of vertices + // normally this should be the global number of vertices numOfVertices = vertexLookupMap.size(); - DAMPING_FACTOR = Double.parseDouble(conf.get("damping.factor")); ALPHA = (1 - DAMPING_FACTOR) / (double) numOfVertices; - EPSILON = Double.parseDouble(conf.get("epsilon.error")); - MAX_ITERATIONS = Integer.parseInt(conf.get("max.iterations")); - masterTaskName = peer.getPeerName(0); + + // reread the input to save ram + peer.reopenInput(); + VertexWritable key = new VertexWritable(); + VertexArrayWritable value = new VertexArrayWritable(); + while (peer.readNext(key, value)) { + VertexWritable vertexWritable = vertexLookupMap.get(key.getName()); + tentativePagerank + .put(vertexWritable, Double.valueOf(1.0 / numOfVertices)); + } } @Override public void bsp( - BSPPeer peer) + BSPPeer peer) throws IOException, SyncException, InterruptedException { // while the error not converges against epsilon do the pagerank stuff @@ -142,11 +154,11 @@ @Override public void cleanup( - BSPPeer peer) { + BSPPeer peer) { try { for (Entry row : tentativePagerank.entrySet()) { - peer.write(new Text(row.getKey().getName()), new DoubleWritable(row - .getValue())); + peer.write(new Text(row.getKey().getName()), + new DoubleWritable(row.getValue())); } } catch (IOException e) { e.printStackTrace(); @@ 
-154,7 +166,7 @@ } private double broadcastError( - BSPPeer peer, + BSPPeer peer, double error) throws IOException, SyncException, InterruptedException { peer.send(masterTaskName, new DoubleMessage("", error)); peer.sync(); @@ -194,7 +206,7 @@ } private void sendMessageToNeighbors( - BSPPeer peer, + BSPPeer peer, VertexWritable v) throws IOException { VertexWritable[] outgoingEdges = adjacencyList.get(v); for (VertexWritable adjacent : outgoingEdges) { @@ -209,7 +221,7 @@ public static void printUsage() { System.out.println("PageRank Example:"); System.out - .println(" "); + .println(" "); } public static void main(String[] args) throws IOException, @@ -228,7 +240,6 @@ conf.set("damping.factor", "0.85"); conf.set("epsilon.error", "0.000001"); - boolean inputGiven = false; if (args.length < 2) { System.out.println("You have to provide a damping factor and an error!"); System.out.println("Try using 0.85 0.001 as parameter!"); @@ -239,12 +250,13 @@ LOG.info("Set damping factor to " + args[0]); LOG.info("Set epsilon error to " + args[1]); if (args.length > 2) { - LOG.info("Set output path to " + args[2]); - job.setOutputPath(new Path(args[2])); + job.setInputPath(new Path(args[2])); + FileSystem.get(conf).delete(new Path(args[2], "hama-partitions"), true); + LOG.info("Set input path to " + args[2]); if (args.length == 4) { - job.setInputPath(new Path(args[3])); - LOG.info("Using custom input at " + args[3]); - inputGiven = true; + LOG.info("Using output at " + args[3]); + job.setOutputPath(new Path(args[3])); + FileSystem.get(conf).delete(new Path(args[3]), true); } } } @@ -255,18 +267,11 @@ // leave the iterations on default conf.set("max.iterations", "0"); - if (!inputGiven) { - Path tmp = new Path("pagerank/input"); - FileSystem.get(conf).delete(tmp, true); - // ShortestPathsGraphLoader.loadGraph(conf, tmp); - job.setInputPath(tmp); - } - job.setInputFormat(SequenceFileInputFormat.class); job.setPartitioner(HashPartitioner.class); 
job.setOutputFormat(SequenceFileOutputFormat.class); job.setOutputKeyClass(Text.class); - job.setOutputValueClass(IntWritable.class); + job.setOutputValueClass(DoubleWritable.class); job.setNumBspTask(cluster.getMaxTasks()); job.setBspClass(PageRank.class); @@ -286,8 +291,12 @@ SequenceFile.Reader reader = new SequenceFile.Reader(fs, path, conf); Text key = new Text(); DoubleWritable value = new DoubleWritable(); + int count = 0; while (reader.next(key, value)) { LOG.info(key.toString() + " | " + value.get()); + count++; + if (count > 5) + break; } reader.close(); }