Index: core/pom.xml =================================================================== --- core/pom.xml (revision 1671493) +++ core/pom.xml (working copy) @@ -154,7 +154,12 @@ io.netty netty-all 4.0.21.Final - + + + com.googlecode.json-simple + json-simple + 1.1 + Index: examples/src/main/java/org/apache/hama/examples/util/FastGraphGen.java =================================================================== --- examples/src/main/java/org/apache/hama/examples/util/FastGraphGen.java (revision 1671493) +++ examples/src/main/java/org/apache/hama/examples/util/FastGraphGen.java (working copy) @@ -21,6 +21,7 @@ import java.util.HashSet; import java.util.Random; +import org.apache.commons.cli.*; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; @@ -28,16 +29,12 @@ import org.apache.hadoop.io.NullWritable; import org.apache.hadoop.io.Text; import org.apache.hama.HamaConfiguration; -import org.apache.hama.bsp.BSP; -import org.apache.hama.bsp.BSPJob; -import org.apache.hama.bsp.BSPPeer; -import org.apache.hama.bsp.FileOutputFormat; -import org.apache.hama.bsp.NullInputFormat; -import org.apache.hama.bsp.SequenceFileOutputFormat; +import org.apache.hama.bsp.*; import org.apache.hama.bsp.sync.SyncException; import org.apache.hama.commons.io.TextArrayWritable; import com.google.common.collect.Sets; +import org.json.simple.JSONArray; public class FastGraphGen { protected static Log LOG = LogFactory.getLog(FastGraphGen.class); @@ -44,6 +41,8 @@ private static String SIZE_OF_MATRIX = "size.of.matrix"; private static String MAX_EDGES = "max.outlinks"; + private static String OUTPUT_FORMAT = "graph.outputformat"; + private static String WEIGHT = "graph.weight"; public static class FastGraphGenBSP extends BSP { @@ -51,6 +50,8 @@ private Configuration conf; private int sizeN; private int maxOutEdges; + private boolean isJson; + private int weight; @Override public void setup( @@ -58,6 +59,8 @@ this.conf = peer.getConfiguration(); sizeN = conf.getInt(SIZE_OF_MATRIX, 10); maxOutEdges = conf.getInt(MAX_EDGES, 1); + isJson = conf.getBoolean(OUTPUT_FORMAT, false); + weight = conf.getInt(WEIGHT, 0); } @Override @@ -74,37 +77,96 @@ } Random r = new Random(); - for (int i = startID; i < endID; i++) { - HashSet set = Sets.newHashSet(); - for (int j = 0; j < maxOutEdges; j++) { - set.add(r.nextInt(sizeN)); + if (isJson) { + for (int i = startID; i < endID; i++) { + + JSONArray vtxArray = new JSONArray(); + vtxArray.add(i); + vtxArray.add(0); + JSONArray edgeArray = new JSONArray(); + HashSet set = Sets.newHashSet(); + for (int j = 0; j < maxOutEdges; j++) { + set.add(r.nextInt(sizeN)); + } + for (int x : set) { + JSONArray edge = new JSONArray(); + edge.add(x); + edge.add(r.nextInt(weight)); + edgeArray.add(edge); + } + vtxArray.add(edgeArray); + peer.write(new Text(vtxArray.toString()), null); } - TextArrayWritable textArrayWritable = new TextArrayWritable(); - Text[] arr = new Text[set.size()]; - int index = 0; - for (int x : set) { - arr[index++] = new Text(String.valueOf(x)); + + } else { + for (int i = startID; i < endID; i++) { + HashSet set = Sets.newHashSet(); + for (int j = 0; j < maxOutEdges; j++) { + set.add(r.nextInt(sizeN)); + } + + TextArrayWritable textArrayWritable = new TextArrayWritable(); + Text[] arr = new Text[set.size()]; + int index = 0; + for (int x : set) { + arr[index++] = new Text(String.valueOf(x)); + } + textArrayWritable.set(arr); + + peer.write(new Text(String.valueOf(i)), textArrayWritable); } - textArrayWritable.set(arr); - peer.write(new Text(String.valueOf(i)), textArrayWritable); } - } } - public static void main(String[] args) throws InterruptedException, - IOException, ClassNotFoundException { - if (args.length < 4) { - System.out - .println("Usage: "); - System.exit(1); + public static void main(String[] args) + throws InterruptedException, IOException, ClassNotFoundException, + ParseException { + Options opts = new Options(); + opts.addOption("s", "size", true, "The size of vertex. Default value is 5."); + opts.addOption("m", "max_out_edges", true, "The number of maximum edges. Default value is 3."); + opts.addOption("p", "output_path", true, "The Location of output Path."); + opts.addOption("t", "task_num", true, "The number of tasks. Default value is one."); + opts.addOption("h", "help", false, "Print usage"); + opts.addOption("of", "output_format", true, "OutputFormat Type which is \"Text\", " + + "tab delimiter separated or \"Json\". Default value - Text"); + opts.addOption("w", "weight", true, "Enable to set weight of graph edges." + + "Default value - 0."); + + CommandLine cliParser = new GnuParser().parse(opts, args); + + // outputType, that has a value of "Text" unless true, + // when it has a value of "Json". + boolean outputType = false; + + if (args.length == 0) { + System.out.println("No args specified for FastGraphGen to initialize"); + System.exit(-1); } + if (cliParser.hasOption("h")) { + new HelpFormatter().printHelp("FastGraphGen -p OUTPUT_PATH [options]", opts); + return; + } + + if (!cliParser.hasOption("p")) { + System.out.println("No output path specified for FastGraphGen, exiting."); + System.exit(-1); + } + + if (cliParser.hasOption("of")) { + if (cliParser.getOptionValue("of").equals("Json")) + outputType = true; + } + // BSP job configuration HamaConfiguration conf = new HamaConfiguration(); - conf.setInt(SIZE_OF_MATRIX, Integer.parseInt(args[0])); - conf.setInt(MAX_EDGES, Integer.parseInt(args[1])); + conf.setInt(SIZE_OF_MATRIX, Integer.parseInt(cliParser.getOptionValue("size", "5"))); + conf.setInt(MAX_EDGES, + Integer.parseInt(cliParser.getOptionValue("max_out_edges", "3"))); + conf.setBoolean(OUTPUT_FORMAT, outputType); + conf.setInt(WEIGHT, Integer.parseInt(cliParser.getOptionValue("weight", "1"))); BSPJob bsp = new BSPJob(conf, FastGraphGenBSP.class); // Set the job name @@ -113,9 +175,9 @@ bsp.setInputFormat(NullInputFormat.class); bsp.setOutputKeyClass(Text.class); bsp.setOutputValueClass(TextArrayWritable.class); - bsp.setOutputFormat(SequenceFileOutputFormat.class); - FileOutputFormat.setOutputPath(bsp, new Path(args[2])); - bsp.setNumBspTask(Integer.parseInt(args[3])); + bsp.setOutputFormat(TextOutputFormat.class); + FileOutputFormat.setOutputPath(bsp, new Path(cliParser.getOptionValue("output_path"))); + bsp.setNumBspTask(Integer.parseInt(cliParser.getOptionValue("task_num", "1"))); long startTime = System.currentTimeMillis(); if (bsp.waitForCompletion(true)) { Index: examples/src/test/java/org/apache/hama/examples/FastGraphGenTest.java =================================================================== --- examples/src/test/java/org/apache/hama/examples/FastGraphGenTest.java (revision 1671493) +++ examples/src/test/java/org/apache/hama/examples/FastGraphGenTest.java (working copy) @@ -25,13 +25,14 @@ import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; -import org.apache.hadoop.io.SequenceFile; -import org.apache.hadoop.io.Text; -import org.apache.hadoop.io.Writable; -import org.apache.hama.commons.io.TextArrayWritable; import org.apache.hama.examples.util.FastGraphGen; +import org.json.simple.JSONArray; +import org.json.simple.parser.JSONParser; import org.junit.Test; +import java.io.BufferedReader; +import java.io.InputStreamReader; + public class FastGraphGenTest extends TestCase { protected static Log LOG = LogFactory.getLog(FastGraphGenTest.class); private static String TEST_OUTPUT = "/tmp/test"; @@ -40,26 +41,83 @@ public void testGraphGenerator() throws Exception { Configuration conf = new Configuration(); - FastGraphGen.main(new String[] { "20", "10", TEST_OUTPUT, "3" }); + // vertex size : 20 + // maximum edges : 10 + // output path : /tmp/test + // tasks num : 3 + FastGraphGen.main(new String[] { "-s", "20", "-m", "10", + "-p", TEST_OUTPUT, "-t", "3" }); FileSystem fs = FileSystem.get(conf); FileStatus[] globStatus = fs.globStatus(new Path(TEST_OUTPUT + "/part-*")); for (FileStatus fts : globStatus) { - SequenceFile.Reader reader = new SequenceFile.Reader(fs, fts.getPath(), - conf); - Text key = new Text(); - TextArrayWritable value = new TextArrayWritable(); + BufferedReader br = new BufferedReader( + new InputStreamReader(fs.open(fts.getPath()))); + try { + String line; + line = br.readLine(); + while (line != null) { + String[] keyValue = line.split("\t"); + String[] outlinkId = keyValue[1].split(" "); + assertTrue(outlinkId.length <= 10); + for (String edge : outlinkId) { + assertTrue(Integer.parseInt(edge) < 20); + assertTrue(Integer.parseInt(edge) >= 0); + } + line = br.readLine(); + } + } finally { + br.close(); + } + } - while (reader.next(key, value)) { - Writable[] writables = value.get(); - assertTrue(writables.length <= 10); - for (Writable t : writables) { - int outlinkId = Integer.parseInt(t.toString()); - assertTrue(outlinkId <= 20); - assertTrue(outlinkId >= 0); + fs.delete(new Path(TEST_OUTPUT), true); + } + + @Test + public void testJsonGraphGenerator() throws Exception { + Configuration conf = new Configuration(); + + // vertex size : 20 + // maximum edges : 10 + // output path : /tmp/test + // tasks num : 3 + // output type : Json + // weight : below 10 + FastGraphGen.main(new String[] { "-s", "30000", "-m", "10000", + "-p", TEST_OUTPUT, "-t", "1", "-of", "Json", "-w", "0" }); + FileSystem fs = FileSystem.get(conf); + + FileStatus[] globStatus = fs.globStatus(new Path(TEST_OUTPUT + "/part-*")); + JSONParser parser = new JSONParser(); + for (FileStatus fts : globStatus) { + BufferedReader br = new BufferedReader( + new InputStreamReader(fs.open(fts.getPath()))); + try { + String line; + line = br.readLine(); + + while (line != null) { + JSONArray jsonArray = (JSONArray)parser.parse(line); + + // the edge data begins at the third element. + JSONArray edgeArray = (JSONArray)jsonArray.get(2); + assertTrue(edgeArray.size() <= 10); + + for (Object obj : edgeArray) { + JSONArray edge = (JSONArray)obj; + assertTrue(Integer.parseInt(edge.get(0).toString()) < 20); + assertTrue(Integer.parseInt(edge.get(0).toString()) >= 0); + assertTrue(Integer.parseInt(edge.get(1).toString()) <= 10); + assertTrue(Integer.parseInt(edge.get(1).toString()) >= 1); + } + for (int i = 0; i < edgeArray.size(); ++i) { + } + line = br.readLine(); } + } finally { + br.close(); } - reader.close(); } fs.delete(new Path(TEST_OUTPUT), true);