Index: core/pom.xml =================================================================== --- core/pom.xml (revision 1673130) +++ core/pom.xml (working copy) @@ -154,7 +154,12 @@ io.netty netty-all 4.0.21.Final - + + + com.googlecode.json-simple + json-simple + 1.1 + Index: examples/src/main/java/org/apache/hama/examples/PageRank.java =================================================================== --- examples/src/main/java/org/apache/hama/examples/PageRank.java (revision 1673130) +++ examples/src/main/java/org/apache/hama/examples/PageRank.java (working copy) @@ -18,27 +18,28 @@ package org.apache.hama.examples; import java.io.IOException; +import java.util.Iterator; +import org.apache.commons.cli.*; import org.apache.hadoop.fs.Path; -import org.apache.hadoop.io.DoubleWritable; -import org.apache.hadoop.io.NullWritable; -import org.apache.hadoop.io.Text; -import org.apache.hadoop.io.Writable; +import org.apache.hadoop.io.*; import org.apache.hama.HamaConfiguration; import org.apache.hama.bsp.HashPartitioner; -import org.apache.hama.bsp.SequenceFileInputFormat; +import org.apache.hama.bsp.TextInputFormat; import org.apache.hama.bsp.TextOutputFormat; -import org.apache.hama.commons.io.TextArrayWritable; import org.apache.hama.graph.AverageAggregator; import org.apache.hama.graph.Edge; import org.apache.hama.graph.GraphJob; import org.apache.hama.graph.Vertex; import org.apache.hama.graph.VertexInputReader; +import org.json.simple.JSONArray; +import org.json.simple.parser.JSONParser; /** * Real pagerank with dangling node contribution. 
*/ public class PageRank { + private static String FILE_TYPE = "graph.input.filetype"; public static class PageRankVertex extends Vertex { @@ -87,16 +88,23 @@ } } - public static class PagerankSeqReader - extends - VertexInputReader { + public static class PagerankTextReader extends + VertexInputReader { + @Override - public boolean parseVertex(Text key, TextArrayWritable value, + public boolean parseVertex(LongWritable key, Text value, Vertex vertex) throws Exception { - vertex.setVertexID(key); - for (Writable v : value.get()) { - vertex.addEdge(new Edge((Text) v, null)); + System.out.println("key : " + key + " & value : " + value); + + String[] tokenArray = value.toString().split("\t"); + String vtx = tokenArray[0].trim(); + String[] edges = tokenArray[1].trim().split(" "); + + vertex.setVertexID(new Text(vtx)); + + for (String v : edges) { + vertex.addEdge(new Edge(new Text(v), null)); } return true; @@ -103,14 +111,45 @@ } } - public static GraphJob createJob(String[] args, HamaConfiguration conf) - throws IOException { + public static class PagerankJsonReader extends + VertexInputReader { + JSONParser parser = new JSONParser(); + + @Override + public boolean parseVertex(LongWritable key, Text value, + Vertex vertex) throws Exception { + + String strValue = value.toString(); + JSONArray jsonArray = (JSONArray)parser.parse(strValue); + + vertex.setVertexID(new Text(jsonArray.get(0).toString())); + + Iterator iter = ((JSONArray)jsonArray.get(2)).iterator(); + while (iter.hasNext()) { + JSONArray edge = (JSONArray)iter.next(); + vertex.addEdge(new Edge( + new Text(edge.get(0).toString()), null)); + } + + return true; + } + } + + public static GraphJob createJob(String[] args, HamaConfiguration conf, Options opts) + throws IOException, ParseException { + CommandLine cliParser = new GnuParser().parse(opts, args); + + if (!cliParser.hasOption("i") || !cliParser.hasOption("o")) { + System.out.println("No input or output path specified for PageRank, exiting."); + System.exit(-1); + } + 
GraphJob pageJob = new GraphJob(conf, PageRank.class); pageJob.setJobName("Pagerank"); pageJob.setVertexClass(PageRankVertex.class); - pageJob.setInputPath(new Path(args[0])); - pageJob.setOutputPath(new Path(args[1])); + pageJob.setInputPath(new Path(cliParser.getOptionValue("i"))); + pageJob.setOutputPath(new Path(cliParser.getOptionValue("o"))); // set the defaults pageJob.setMaxIteration(30); @@ -120,8 +158,8 @@ pageJob.set("hama.graph.self.ref", "true"); pageJob.set("hama.graph.max.convergence.error", "0.001"); - if (args.length == 3) { - pageJob.setNumBspTask(Integer.parseInt(args[2])); + if (cliParser.hasOption("t")) { + pageJob.setNumBspTask(Integer.parseInt(cliParser.getOptionValue("t"))); } // error @@ -128,15 +166,34 @@ pageJob.setAggregatorClass(AverageAggregator.class); // Vertex reader - pageJob.setVertexInputReaderClass(PagerankSeqReader.class); + // According to file type, which is Text or Json, + // Vertex reader handle it differently. + if (cliParser.hasOption("f")) { + if (cliParser.getOptionValue("f").equals("Text")) { + System.out.println("Text started..."); + pageJob.setVertexInputReaderClass(PagerankTextReader.class); + } else if (cliParser.getOptionValue("f").equals("Json")) { + System.out.println("Json started..."); + pageJob.setVertexInputReaderClass(PagerankJsonReader.class); + } else { + System.out.println("File type is not available to run Pagerank... 
" + + "File type set default value, Text."); + pageJob.setVertexInputReaderClass(PagerankTextReader.class); + } + } else { + pageJob.setVertexInputReaderClass(PagerankTextReader.class); + } pageJob.setVertexIDClass(Text.class); pageJob.setVertexValueClass(DoubleWritable.class); pageJob.setEdgeValueClass(NullWritable.class); - pageJob.setInputFormat(SequenceFileInputFormat.class); - pageJob.setInputKeyClass(Text.class); - pageJob.setInputValueClass(TextArrayWritable.class); + //pageJob.setInputFormat(SequenceFileInputFormat.class); + pageJob.setInputFormat(TextInputFormat.class); + pageJob.setInputKeyClass(LongWritable.class); + pageJob.setInputValueClass(Text.class); +// pageJob.setInputKeyClass(Text.class); +// pageJob.setInputValueClass(TextArrayWritable.class); pageJob.setPartitioner(HashPartitioner.class); pageJob.setOutputFormat(TextOutputFormat.class); @@ -146,17 +203,30 @@ } private static void printUsage() { - System.out.println("Usage: [tasks]"); + System.out.println("Usage: [tasks] [input_file_type]"); System.exit(-1); } - public static void main(String[] args) throws IOException, - InterruptedException, ClassNotFoundException { - if (args.length < 2) - printUsage(); + public static void main(String[] args) + throws IOException, InterruptedException, ClassNotFoundException, + ParseException { + Options opts = new Options(); + opts.addOption("i", "input_path", true, "The Location of output path."); + opts.addOption("o", "output_path", true, "The Location of input path."); + opts.addOption("h", "help", false, "Print usage"); + opts.addOption("t", "task_num", true, "The number of tasks."); + opts.addOption("f", "file_type", true, "The file type of input data. Input" + + "file format which is \"Text\" tab delimiter separated or \"Json\"." 
+ + "Default value - Text"); + if (args.length < 2) { + new HelpFormatter().printHelp("PageRank -i INPUT_PATH -o OUTPUT_PATH " + + "[-t NUM_TASKS] [-f FILE_TYPE]", opts); + System.exit(-1); + } + HamaConfiguration conf = new HamaConfiguration(); - GraphJob pageJob = createJob(args, conf); + GraphJob pageJob = createJob(args, conf, opts); long startTime = System.currentTimeMillis(); if (pageJob.waitForCompletion(true)) { @@ -165,3 +235,4 @@ } } } + Index: examples/src/main/java/org/apache/hama/examples/util/FastGraphGen.java =================================================================== --- examples/src/main/java/org/apache/hama/examples/util/FastGraphGen.java (revision 1673130) +++ examples/src/main/java/org/apache/hama/examples/util/FastGraphGen.java (working copy) @@ -21,6 +21,7 @@ import java.util.HashSet; import java.util.Random; +import org.apache.commons.cli.*; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; @@ -28,16 +29,12 @@ import org.apache.hadoop.io.NullWritable; import org.apache.hadoop.io.Text; import org.apache.hama.HamaConfiguration; -import org.apache.hama.bsp.BSP; -import org.apache.hama.bsp.BSPJob; -import org.apache.hama.bsp.BSPPeer; -import org.apache.hama.bsp.FileOutputFormat; -import org.apache.hama.bsp.NullInputFormat; -import org.apache.hama.bsp.SequenceFileOutputFormat; +import org.apache.hama.bsp.*; import org.apache.hama.bsp.sync.SyncException; import org.apache.hama.commons.io.TextArrayWritable; import com.google.common.collect.Sets; +import org.json.simple.JSONArray; public class FastGraphGen { protected static Log LOG = LogFactory.getLog(FastGraphGen.class); @@ -44,6 +41,8 @@ private static String SIZE_OF_MATRIX = "size.of.matrix"; private static String MAX_EDGES = "max.outlinks"; + private static String OUTPUT_FORMAT = "graph.outputformat"; + private static String WEIGHT = "graph.weight"; public static class FastGraphGenBSP extends BSP { @@ -51,6 
+50,8 @@ private Configuration conf; private int sizeN; private int maxOutEdges; + private boolean isJson; + private int weight; @Override public void setup( @@ -58,6 +59,8 @@ this.conf = peer.getConfiguration(); sizeN = conf.getInt(SIZE_OF_MATRIX, 10); maxOutEdges = conf.getInt(MAX_EDGES, 1); + isJson = conf.getBoolean(OUTPUT_FORMAT, false); + weight = conf.getInt(WEIGHT, 0); } @Override @@ -74,37 +77,96 @@ } Random r = new Random(); - for (int i = startID; i < endID; i++) { - HashSet set = Sets.newHashSet(); - for (int j = 0; j < maxOutEdges; j++) { - set.add(r.nextInt(sizeN)); + if (isJson) { + for (int i = startID; i < endID; i++) { + + JSONArray vtxArray = new JSONArray(); + vtxArray.add(i); + vtxArray.add(0); + JSONArray edgeArray = new JSONArray(); + HashSet set = Sets.newHashSet(); + for (int j = 0; j < maxOutEdges; j++) { + set.add(r.nextInt(sizeN)); + } + for (int x : set) { + JSONArray edge = new JSONArray(); + edge.add(x); + edge.add(r.nextInt(weight)); + edgeArray.add(edge); + } + vtxArray.add(edgeArray); + peer.write(new Text(vtxArray.toString()), null); } - TextArrayWritable textArrayWritable = new TextArrayWritable(); - Text[] arr = new Text[set.size()]; - int index = 0; - for (int x : set) { - arr[index++] = new Text(String.valueOf(x)); + + } else { + for (int i = startID; i < endID; i++) { + HashSet set = Sets.newHashSet(); + for (int j = 0; j < maxOutEdges; j++) { + set.add(r.nextInt(sizeN)); + } + + TextArrayWritable textArrayWritable = new TextArrayWritable(); + Text[] arr = new Text[set.size()]; + int index = 0; + for (int x : set) { + arr[index++] = new Text(String.valueOf(x)); + } + textArrayWritable.set(arr); + + peer.write(new Text(String.valueOf(i)), textArrayWritable); } - textArrayWritable.set(arr); - peer.write(new Text(String.valueOf(i)), textArrayWritable); } - } } - public static void main(String[] args) throws InterruptedException, - IOException, ClassNotFoundException { - if (args.length < 4) { - System.out - .println("Usage: "); 
- System.exit(1); + public static void main(String[] args) + throws InterruptedException, IOException, ClassNotFoundException, + ParseException { + Options opts = new Options(); + opts.addOption("s", "size", true, "The size of vertex. Default value is 5."); + opts.addOption("m", "max_out_edges", true, "The number of maximum edges. Default value is 3."); + opts.addOption("o", "output_path", true, "The Location of output path."); + opts.addOption("t", "task_num", true, "The number of tasks. Default value is one."); + opts.addOption("h", "help", false, "Print usage"); + opts.addOption("of", "output_format", true, "OutputFormat Type which is \"Text\", " + + "tab delimiter separated or \"Json\". Default value - Text"); + opts.addOption("w", "weight", true, "Enable to set weight of graph edges." + + "Default value - 1."); + + CommandLine cliParser = new GnuParser().parse(opts, args); + + // outputType, that has a value of "Text" unless true, + // when it has a value of "Json". + boolean outputType = false; + + if (args.length == 0) { + System.out.println("No args specified for FastGraphGen to initialize"); + System.exit(-1); } + if (cliParser.hasOption("h")) { + new HelpFormatter().printHelp("FastGraphGen -o OUTPUT_PATH [options]", opts); + return; + } + + if (!cliParser.hasOption("o")) { + System.out.println("No output path specified for FastGraphGen, exiting."); + System.exit(-1); + } + + if (cliParser.hasOption("of")) { + if (cliParser.getOptionValue("of").equals("Json")) + outputType = true; + } + + // BSP job configuration HamaConfiguration conf = new HamaConfiguration(); - conf.setInt(SIZE_OF_MATRIX, Integer.parseInt(args[0])); - conf.setInt(MAX_EDGES, Integer.parseInt(args[1])); + conf.setInt(SIZE_OF_MATRIX, Integer.parseInt(cliParser.getOptionValue("size", "5"))); + conf.setInt(MAX_EDGES, + Integer.parseInt(cliParser.getOptionValue("max_out_edges", "3"))); + conf.setBoolean(OUTPUT_FORMAT, outputType); + conf.setInt(WEIGHT, 
Integer.parseInt(cliParser.getOptionValue("weight", "1"))); BSPJob bsp = new BSPJob(conf, FastGraphGenBSP.class); // Set the job name @@ -113,9 +175,9 @@ bsp.setInputFormat(NullInputFormat.class); bsp.setOutputKeyClass(Text.class); bsp.setOutputValueClass(TextArrayWritable.class); - bsp.setOutputFormat(SequenceFileOutputFormat.class); - FileOutputFormat.setOutputPath(bsp, new Path(args[2])); - bsp.setNumBspTask(Integer.parseInt(args[3])); + bsp.setOutputFormat(TextOutputFormat.class); + FileOutputFormat.setOutputPath(bsp, new Path(cliParser.getOptionValue("output_path"))); + bsp.setNumBspTask(Integer.parseInt(cliParser.getOptionValue("task_num", "1"))); long startTime = System.currentTimeMillis(); if (bsp.waitForCompletion(true)) { Index: examples/src/test/java/org/apache/hama/examples/FastGraphGenTest.java =================================================================== --- examples/src/test/java/org/apache/hama/examples/FastGraphGenTest.java (revision 1673130) +++ examples/src/test/java/org/apache/hama/examples/FastGraphGenTest.java (working copy) @@ -25,13 +25,14 @@ import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; -import org.apache.hadoop.io.SequenceFile; -import org.apache.hadoop.io.Text; -import org.apache.hadoop.io.Writable; -import org.apache.hama.commons.io.TextArrayWritable; import org.apache.hama.examples.util.FastGraphGen; +import org.json.simple.JSONArray; +import org.json.simple.parser.JSONParser; import org.junit.Test; +import java.io.BufferedReader; +import java.io.InputStreamReader; + public class FastGraphGenTest extends TestCase { protected static Log LOG = LogFactory.getLog(FastGraphGenTest.class); private static String TEST_OUTPUT = "/tmp/test"; @@ -40,26 +41,83 @@ public void testGraphGenerator() throws Exception { Configuration conf = new Configuration(); - FastGraphGen.main(new String[] { "20", "10", TEST_OUTPUT, "3" }); + // vertex size : 20 + // maximum edges : 10 + // 
output path : /tmp/test + // tasks num : 3 + FastGraphGen.main(new String[] { "-s", "20", "-m", "10", + "-o", TEST_OUTPUT, "-t", "3" }); FileSystem fs = FileSystem.get(conf); FileStatus[] globStatus = fs.globStatus(new Path(TEST_OUTPUT + "/part-*")); for (FileStatus fts : globStatus) { - SequenceFile.Reader reader = new SequenceFile.Reader(fs, fts.getPath(), - conf); - Text key = new Text(); - TextArrayWritable value = new TextArrayWritable(); + BufferedReader br = new BufferedReader( + new InputStreamReader(fs.open(fts.getPath()))); + try { + String line; + line = br.readLine(); + while (line != null) { + String[] keyValue = line.split("\t"); + String[] outlinkId = keyValue[1].split(" "); + assertTrue(outlinkId.length <= 10); + for (String edge : outlinkId) { + assertTrue(Integer.parseInt(edge) < 20); + assertTrue(Integer.parseInt(edge) >= 0); + } + line = br.readLine(); + } + } finally { + br.close(); + } + } - while (reader.next(key, value)) { - Writable[] writables = value.get(); - assertTrue(writables.length <= 10); - for (Writable t : writables) { - int outlinkId = Integer.parseInt(t.toString()); - assertTrue(outlinkId <= 20); - assertTrue(outlinkId >= 0); + fs.delete(new Path(TEST_OUTPUT), true); + } + + @Test + public void testJsonGraphGenerator() throws Exception { + Configuration conf = new Configuration(); + + // vertex size : 20 + // maximum edges : 10 + // output path : /tmp/test + // tasks num : 3 + // output type : Json + // weight : below 10 + FastGraphGen.main(new String[] { "-s", "20", "-m", "10", + "-o", TEST_OUTPUT, "-t", "1", "-of", "Json", "-w", "10" }); + FileSystem fs = FileSystem.get(conf); + + FileStatus[] globStatus = fs.globStatus(new Path(TEST_OUTPUT + "/part-*")); + JSONParser parser = new JSONParser(); + for (FileStatus fts : globStatus) { + BufferedReader br = new BufferedReader( + new InputStreamReader(fs.open(fts.getPath()))); + try { + String line; + line = br.readLine(); + + while (line != null) { + JSONArray jsonArray = 
(JSONArray)parser.parse(line); + + // the edge data begins at the third element. + JSONArray edgeArray = (JSONArray)jsonArray.get(2); + assertTrue(edgeArray.size() <= 10); + + for (Object obj : edgeArray) { + JSONArray edge = (JSONArray)obj; + assertTrue(Integer.parseInt(edge.get(0).toString()) < 20); + assertTrue(Integer.parseInt(edge.get(0).toString()) >= 0); + assertTrue(Integer.parseInt(edge.get(1).toString()) < 10); + assertTrue(Integer.parseInt(edge.get(1).toString()) >= 0); + } + for (int i = 0; i < edgeArray.size(); ++i) { + } + line = br.readLine(); } + } finally { + br.close(); } - reader.close(); } fs.delete(new Path(TEST_OUTPUT), true); Index: examples/src/test/java/org/apache/hama/examples/PageRankTest.java =================================================================== --- examples/src/test/java/org/apache/hama/examples/PageRankTest.java (revision 1673130) +++ examples/src/test/java/org/apache/hama/examples/PageRankTest.java (working copy) @@ -23,6 +23,7 @@ import junit.framework.TestCase; +import org.apache.commons.cli.ParseException; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileSystem; @@ -54,8 +55,10 @@ generateTestData(); try { - PageRank.main(new String[] { INPUT, OUTPUT, "3" }); + PageRank.main(new String[] { "-i", INPUT, "-o", OUTPUT, "-t", "1" }); verifyResult(); + } catch (ParseException e) { + e.printStackTrace(); } finally { deleteTempDirs(); } @@ -79,7 +82,8 @@ private void generateTestData() { try { - FastGraphGen.main(new String[] { "60", "3", INPUT, "3" }); + FastGraphGen.main(new String[] { "-s", "60", "-m", "3", + "-o", INPUT, "-t", "1" }); } catch (Exception e) { e.printStackTrace(); }