Index: core/pom.xml
===================================================================
--- core/pom.xml (revision 1673341)
+++ core/pom.xml (working copy)
@@ -154,7 +154,12 @@
     <groupId>io.netty</groupId>
     <artifactId>netty-all</artifactId>
     <version>4.0.21.Final</version>
-   </dependency>
+   </dependency>
+   <dependency>
+     <groupId>com.googlecode.json-simple</groupId>
+     <artifactId>json-simple</artifactId>
+     <version>1.1</version>
+   </dependency>
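
[Not part of the patch] The json-simple artifact added above is what the new PagerankJsonReader (further down) relies on to parse JSON adjacency-list records. A minimal sketch of the assumed record shape and parse calls, using a made-up two-edge record; the layout ["vertexId", vertexValue, [[neighborId], ...]] is inferred from the get(0)/get(2) calls in the reader:

    // Sketch only: parse one JSON record the way PagerankJsonReader is assumed to.
    import org.json.simple.JSONArray;
    import org.json.simple.parser.JSONParser;

    public class JsonRecordSketch {
      public static void main(String[] args) throws Exception {
        JSONParser parser = new JSONParser();
        JSONArray record = (JSONArray) parser.parse("[\"1\", 0.0, [[\"2\"], [\"3\"]]]");
        String vertexId = record.get(0).toString();             // "1"
        JSONArray edges = (JSONArray) record.get(2);             // [["2"], ["3"]]
        for (Object e : edges) {
          String neighbor = ((JSONArray) e).get(0).toString();  // "2", then "3"
          System.out.println(vertexId + " -> " + neighbor);
        }
      }
    }
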
Index: examples/src/main/java/org/apache/hama/examples/PageRank.java
===================================================================
--- examples/src/main/java/org/apache/hama/examples/PageRank.java (revision 1673341)
+++ examples/src/main/java/org/apache/hama/examples/PageRank.java (working copy)
@@ -18,22 +18,22 @@
package org.apache.hama.examples;
import java.io.IOException;
+import java.util.Iterator;
+import org.apache.commons.cli.*;
import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.io.DoubleWritable;
-import org.apache.hadoop.io.NullWritable;
-import org.apache.hadoop.io.Text;
-import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.*;
import org.apache.hama.HamaConfiguration;
import org.apache.hama.bsp.HashPartitioner;
-import org.apache.hama.bsp.SequenceFileInputFormat;
+import org.apache.hama.bsp.TextInputFormat;
import org.apache.hama.bsp.TextOutputFormat;
-import org.apache.hama.commons.io.TextArrayWritable;
import org.apache.hama.graph.AverageAggregator;
import org.apache.hama.graph.Edge;
import org.apache.hama.graph.GraphJob;
import org.apache.hama.graph.Vertex;
import org.apache.hama.graph.VertexInputReader;
+import org.json.simple.JSONArray;
+import org.json.simple.parser.JSONParser;
/**
* Real pagerank with dangling node contribution.
@@ -87,16 +87,21 @@
}
}
- public static class PagerankSeqReader
- extends
- VertexInputReader<Text, TextArrayWritable, Text, NullWritable, DoubleWritable> {
+ public static class PagerankTextReader extends
+ VertexInputReader<LongWritable, Text, Text, NullWritable, DoubleWritable> {
+
@Override
- public boolean parseVertex(Text key, TextArrayWritable value,
+ public boolean parseVertex(LongWritable key, Text value,
 Vertex<Text, NullWritable, DoubleWritable> vertex) throws Exception {
- vertex.setVertexID(key);
- for (Writable v : value.get()) {
- vertex.addEdge(new Edge<Text, NullWritable>((Text) v, null));
+ String[] tokenArray = value.toString().split("\t");
+ String vtx = tokenArray[0].trim();
+ String[] edges = tokenArray[1].trim().split(" ");
+
+ vertex.setVertexID(new Text(vtx));
+
+ for (String v : edges) {
+ vertex.addEdge(new Edge<Text, NullWritable>(new Text(v), null));
}
return true;
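
[Not part of the patch] For reference, the PagerankTextReader above expects one vertex per line: the vertex ID, a tab, then the outgoing neighbor IDs separated by single spaces. A hypothetical three-vertex input file (the gap after the first token is a tab):

    1	2 3
    2	3
    3	1 2
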
@@ -103,14 +108,44 @@
}
}
- public static GraphJob createJob(String[] args, HamaConfiguration conf)
- throws IOException {
+ public static class PagerankJsonReader extends
+ VertexInputReader<LongWritable, Text, Text, NullWritable, DoubleWritable> {
+ JSONParser parser = new JSONParser();
+
+ @Override
+ public boolean parseVertex(LongWritable key, Text value,
+ Vertex<Text, NullWritable, DoubleWritable> vertex) throws Exception {
+
+ String strValue = value.toString();
+ JSONArray jsonArray = (JSONArray)parser.parse(strValue);
+
+ vertex.setVertexID(new Text(jsonArray.get(0).toString()));
+
+ Iterator iter = ((JSONArray)jsonArray.get(2)).iterator();
+ while (iter.hasNext()) {
+ JSONArray edge = (JSONArray)iter.next();
+ vertex.addEdge(new Edge<Text, NullWritable>(
+ new Text(edge.get(0).toString()), null));
+ }
+
+ return true;
+ }
+ }
+
+ public static GraphJob createJob(String[] args, HamaConfiguration conf, Options opts)
+ throws IOException, ParseException {
+ CommandLine cliParser = new GnuParser().parse(opts, args);
+
+ if (!cliParser.hasOption("i") || !cliParser.hasOption("o")) {
+ System.out.println("No input or output path specified for PageRank, exiting.");
+ System.exit(-1);
+ }
+
GraphJob pageJob = new GraphJob(conf, PageRank.class);
pageJob.setJobName("Pagerank");
pageJob.setVertexClass(PageRankVertex.class);
- pageJob.setInputPath(new Path(args[0]));
- pageJob.setOutputPath(new Path(args[1]));
+ pageJob.setInputPath(new Path(cliParser.getOptionValue("i")));
+ pageJob.setOutputPath(new Path(cliParser.getOptionValue("o")));
// set the defaults
pageJob.setMaxIteration(30);
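
[Not part of the patch] createJob now takes a commons-cli Options object. A plausible sketch of how the caller could define the -i/-o/-t/-f options looked up above; the long names and descriptions here are invented for illustration:

    // Sketch only: option keys match the hasOption()/getOptionValue() calls in createJob.
    Options opts = new Options();
    opts.addOption("i", "input", true, "input path of the graph");
    opts.addOption("o", "output", true, "output path for the PageRank result");
    opts.addOption("t", "tasks", true, "number of BSP tasks (optional)");
    opts.addOption("f", "format", true, "input file format: text or json (optional)");
    GraphJob pageJob = createJob(args, new HamaConfiguration(), opts);
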
@@ -120,8 +155,8 @@
pageJob.set("hama.graph.self.ref", "true");
pageJob.set("hama.graph.max.convergence.error", "0.001");
- if (args.length == 3) {
- pageJob.setNumBspTask(Integer.parseInt(args[2]));
+ if (cliParser.hasOption("t")) {
+ pageJob.setNumBspTask(Integer.parseInt(cliParser.getOptionValue("t")));
}
// error
@@ -128,15 +163,29 @@
pageJob.setAggregatorClass(AverageAggregator.class);
// Vertex reader
- pageJob.setVertexInputReaderClass(PagerankSeqReader.class);
+ // Depending on the input file type (text or JSON),
+ // a different vertex reader is used.
+ if (cliParser.hasOption("f")) {
+ if (cliParser.getOptionValue("f").equals("text")) {
+ pageJob.setVertexInputReaderClass(PagerankTextReader.class);
+ } else if (cliParser.getOptionValue("f").equals("json")) {
+ pageJob.setVertexInputReaderClass(PagerankJsonReader.class);
+ } else {
+ System.out.println("File type is not available to run Pagerank... "
+ + "File type set default value, Text.");
+ pageJob.setVertexInputReaderClass(PagerankTextReader.class);
+ }
+ } else {
+ pageJob.setVertexInputReaderClass(PagerankTextReader.class);
+ }
pageJob.setVertexIDClass(Text.class);
pageJob.setVertexValueClass(DoubleWritable.class);
pageJob.setEdgeValueClass(NullWritable.class);
- pageJob.setInputFormat(SequenceFileInputFormat.class);
- pageJob.setInputKeyClass(Text.class);
- pageJob.setInputValueClass(TextArrayWritable.class);
+ pageJob.setInputFormat(TextInputFormat.class);
+ pageJob.setInputKeyClass(LongWritable.class);
+ pageJob.setInputValueClass(Text.class);
pageJob.setPartitioner(HashPartitioner.class);
pageJob.setOutputFormat(TextOutputFormat.class);
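
[Not part of the patch] With the options above, a run of the example might look like the following; the "pagerank" example name and jar name are assumptions and depend on how the example is registered in the examples driver:

    hama jar hama-examples-*.jar pagerank -i /tmp/input -o /tmp/output -f json -t 4
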
@@ -145,18 +194,26 @@
return pageJob;
}
- private static void printUsage() {
- System.out.println("Usage: