Index: core/src/main/java/org/apache/hama/bsp/BSPJobClient.java =================================================================== --- core/src/main/java/org/apache/hama/bsp/BSPJobClient.java (revision 1419185) +++ core/src/main/java/org/apache/hama/bsp/BSPJobClient.java (working copy) @@ -430,8 +430,6 @@ } else { fs.mkdirs(partitionedPath); } - // FIXME this is soo unsafe - RecordReader sampleReader = inputFormat.getRecordReader(splits[0], job); List writers = new ArrayList( splits.length); @@ -450,12 +448,10 @@ Path p = new Path(partitionedPath, getPartitionName(i)); if (codec == null) { writers.add(SequenceFile.createWriter(fs, job.getConfiguration(), p, - sampleReader.createKey().getClass(), sampleReader.createValue() - .getClass(), CompressionType.NONE)); + job.getInputKeyClass(), job.getInputValueClass(), CompressionType.NONE)); } else { writers.add(SequenceFile.createWriter(fs, job.getConfiguration(), p, - sampleReader.createKey().getClass(), sampleReader.createValue() - .getClass(), compressionType, codec)); + job.getInputKeyClass(), job.getInputValueClass(), compressionType, codec)); } } Index: examples/src/main/java/org/apache/hama/examples/InlinkCount.java =================================================================== --- examples/src/main/java/org/apache/hama/examples/InlinkCount.java (revision 1419185) +++ examples/src/main/java/org/apache/hama/examples/InlinkCount.java (working copy) @@ -22,17 +22,15 @@ import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.IntWritable; -import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.NullWritable; import org.apache.hadoop.io.Text; import org.apache.hama.HamaConfiguration; import org.apache.hama.bsp.HashPartitioner; +import org.apache.hama.bsp.SequenceFileInputFormat; import org.apache.hama.bsp.SequenceFileOutputFormat; -import org.apache.hama.bsp.TextInputFormat; -import org.apache.hama.graph.Edge; +import org.apache.hama.bsp.TextArrayWritable; import org.apache.hama.graph.GraphJob; import org.apache.hama.graph.Vertex; -import org.apache.hama.graph.VertexInputReader; public class InlinkCount extends Vertex { @@ -51,34 +49,6 @@ } } - public static class InlinkCountTextReader extends - VertexInputReader { - - /** - * The text file essentially should look like:
- * VERTEX_ID\t(n-tab separated VERTEX_IDs)
- * E.G:
- * 1\t2\t3\t4
- * 2\t3\t1
- * etc. - */ - @Override - public boolean parseVertex(LongWritable key, Text value, - Vertex vertex) throws Exception { - String[] split = value.toString().split("\t"); - for (int i = 0; i < split.length; i++) { - if (i == 0) { - vertex.setVertexID(new Text(split[i])); - } else { - vertex - .addEdge(new Edge(new Text(split[i]), null)); - } - } - return true; - } - - } - private static void printUsage() { System.out.println("Usage: [tasks]"); System.exit(-1); @@ -104,14 +74,14 @@ } inlinkJob.setVertexClass(InlinkCount.class); - inlinkJob.setInputFormat(TextInputFormat.class); - inlinkJob.setInputKeyClass(LongWritable.class); - inlinkJob.setInputValueClass(Text.class); + + inlinkJob.setInputFormat(SequenceFileInputFormat.class); + inlinkJob.setInputKeyClass(Text.class); + inlinkJob.setInputValueClass(TextArrayWritable.class); inlinkJob.setVertexIDClass(Text.class); inlinkJob.setVertexValueClass(IntWritable.class); inlinkJob.setEdgeValueClass(NullWritable.class); - inlinkJob.setVertexInputReaderClass(InlinkCountTextReader.class); inlinkJob.setPartitioner(HashPartitioner.class); inlinkJob.setOutputFormat(SequenceFileOutputFormat.class); Index: examples/src/main/java/org/apache/hama/examples/MindistSearch.java =================================================================== --- examples/src/main/java/org/apache/hama/examples/MindistSearch.java (revision 1419185) +++ examples/src/main/java/org/apache/hama/examples/MindistSearch.java (working copy) @@ -22,18 +22,17 @@ import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; -import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.NullWritable; import org.apache.hadoop.io.Text; import org.apache.hama.HamaConfiguration; import org.apache.hama.bsp.Combiner; import org.apache.hama.bsp.HashPartitioner; -import org.apache.hama.bsp.TextInputFormat; +import org.apache.hama.bsp.SequenceFileInputFormat; +import org.apache.hama.bsp.TextArrayWritable; import org.apache.hama.bsp.TextOutputFormat; import org.apache.hama.graph.Edge; import org.apache.hama.graph.GraphJob; import org.apache.hama.graph.Vertex; -import org.apache.hama.graph.VertexInputReader; /** * Finding the mindist vertex in a connected component. @@ -97,34 +96,6 @@ } - public static class MindistSearchCountReader extends - VertexInputReader { - - /** - * The text file essentially should look like:
- * VERTEX_ID\t(n-tab separated VERTEX_IDs)
- * E.G:
- * 1\t2\t3\t4
- * 2\t3\t1
- * etc. - */ - @Override - public boolean parseVertex(LongWritable key, Text value, - Vertex vertex) throws Exception { - String[] split = value.toString().split("\t"); - for (int i = 0; i < split.length; i++) { - if (i == 0) { - vertex.setVertexID(new Text(split[i])); - } else { - vertex - .addEdge(new Edge(new Text(split[i]), null)); - } - } - return true; - } - - } - private static void printUsage() { System.out .println("Usage: [maximum iterations (default 30)] [tasks]"); @@ -157,10 +128,10 @@ job.setVertexValueClass(Text.class); job.setEdgeValueClass(NullWritable.class); - job.setInputKeyClass(LongWritable.class); - job.setInputValueClass(Text.class); - job.setInputFormat(TextInputFormat.class); - job.setVertexInputReaderClass(MindistSearchCountReader.class); + job.setInputFormat(SequenceFileInputFormat.class); + job.setInputKeyClass(Text.class); + job.setInputValueClass(TextArrayWritable.class); + job.setPartitioner(HashPartitioner.class); job.setOutputFormat(TextOutputFormat.class); job.setOutputKeyClass(Text.class); Index: examples/src/main/java/org/apache/hama/examples/PageRank.java =================================================================== --- examples/src/main/java/org/apache/hama/examples/PageRank.java (revision 1419185) +++ examples/src/main/java/org/apache/hama/examples/PageRank.java (working copy) @@ -23,19 +23,17 @@ import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.DoubleWritable; -import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.NullWritable; import org.apache.hadoop.io.Text; import org.apache.hama.HamaConfiguration; import org.apache.hama.bsp.HashPartitioner; -import org.apache.hama.bsp.TextInputFormat; +import org.apache.hama.bsp.SequenceFileInputFormat; +import org.apache.hama.bsp.TextArrayWritable; import org.apache.hama.bsp.TextOutputFormat; import org.apache.hama.graph.AbstractAggregator; import org.apache.hama.graph.AverageAggregator; -import org.apache.hama.graph.Edge; import org.apache.hama.graph.GraphJob; import org.apache.hama.graph.Vertex; -import org.apache.hama.graph.VertexInputReader; /** * Real pagerank with dangling node contribution. @@ -99,37 +97,8 @@ } } - public static class PagerankTextReader extends - VertexInputReader { - - /** - * The text file essentially should look like:
- * VERTEX_ID\t(n-tab separated VERTEX_IDs)
- * E.G:
- * 1\t2\t3\t4
- * 2\t3\t1
- * etc. - */ - @Override - public boolean parseVertex(LongWritable key, Text value, - Vertex vertex) throws Exception { - String[] split = value.toString().split("\t"); - for (int i = 0; i < split.length; i++) { - if (i == 0) { - vertex.setVertexID(new Text(split[i])); - } else { - vertex - .addEdge(new Edge(new Text(split[i]), null)); - } - } - return true; - } - - } - private static void printUsage() { - System.out - .println("Usage: [damping factor (default 0.85)] [Epsilon (convergence error, default 0.001)] [Max iterations (default 30)] [tasks]"); + System.out.println("Usage: [tasks]"); System.exit(-1); } @@ -161,15 +130,11 @@ // set the defaults pageJob.setMaxIteration(30); pageJob.set("hama.pagerank.alpha", "0.85"); + pageJob.set("hama.graph.max.convergence.error", "0.001"); - if (args.length == 6) - pageJob.setNumBspTask(Integer.parseInt(args[5])); - if (args.length >= 5) - pageJob.setMaxIteration(Integer.parseInt(args[4])); - if (args.length >= 4) - pageJob.set("hama.graph.max.convergence.error", args[3]); - if (args.length >= 3) - pageJob.set("hama.pagerank.alpha", args[2]); + if (args.length == 3) { + pageJob.setNumBspTask(Integer.parseInt(args[3])); + } // error, dangling node probability sum pageJob.setAggregatorClass(AverageAggregator.class, @@ -179,10 +144,10 @@ pageJob.setVertexValueClass(DoubleWritable.class); pageJob.setEdgeValueClass(NullWritable.class); - pageJob.setInputKeyClass(LongWritable.class); - pageJob.setInputValueClass(Text.class); - pageJob.setInputFormat(TextInputFormat.class); - pageJob.setVertexInputReaderClass(PagerankTextReader.class); + pageJob.setInputFormat(SequenceFileInputFormat.class); + pageJob.setInputKeyClass(Text.class); + pageJob.setInputValueClass(TextArrayWritable.class); + pageJob.setPartitioner(HashPartitioner.class); pageJob.setOutputFormat(TextOutputFormat.class); pageJob.setOutputKeyClass(Text.class); Index: examples/src/main/java/org/apache/hama/examples/util/SymmetricMatrixGen.java =================================================================== --- examples/src/main/java/org/apache/hama/examples/util/SymmetricMatrixGen.java (revision 1419214) +++ examples/src/main/java/org/apache/hama/examples/util/SymmetricMatrixGen.java (working copy) @@ -82,6 +82,7 @@ boolean nonZero = new Random().nextInt(density) == 0; if (nonZero && !edges.contains(j) && i != j) { edges.add(j); + // FIXME arrayoutofbound execption can be occured peer.send(peer.getPeerName(j / interval), new Text(j + "," + i)); } } Index: examples/src/test/java/org/apache/hama/examples/LinearRegressionTest.java =================================================================== --- examples/src/test/java/org/apache/hama/examples/LinearRegressionTest.java (revision 1419185) +++ examples/src/test/java/org/apache/hama/examples/LinearRegressionTest.java (working copy) @@ -19,8 +19,6 @@ import org.junit.Test; -import static org.junit.Assert.fail; - /** * Testcase for {@link GradientDescentExample} for 'linear regression' */ Index: examples/src/test/java/org/apache/hama/examples/MindistSearchTest.java =================================================================== --- examples/src/test/java/org/apache/hama/examples/MindistSearchTest.java (revision 1419185) +++ examples/src/test/java/org/apache/hama/examples/MindistSearchTest.java (working copy) @@ -18,8 +18,6 @@ package org.apache.hama.examples; import java.io.BufferedReader; -import java.io.BufferedWriter; -import java.io.FileWriter; import java.io.IOException; import java.io.InputStreamReader; import java.util.Arrays; @@ 
-31,8 +29,11 @@ import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; +import org.apache.hadoop.io.SequenceFile; import org.apache.hadoop.io.Text; +import org.apache.hadoop.io.Writable; import org.apache.hama.HamaConfiguration; +import org.apache.hama.bsp.TextArrayWritable; import org.apache.hama.examples.MindistSearch.MinTextCombiner; public class MindistSearchTest extends TestCase { @@ -95,22 +96,25 @@ } private void generateTestData() { - BufferedWriter bw = null; try { - bw = new BufferedWriter(new FileWriter(INPUT)); - for (String s : input) { - bw.write(s + "\n"); - } - } catch (IOException e) { - e.printStackTrace(); - } finally { - if (bw != null) { - try { - bw.close(); - } catch (IOException e) { - e.printStackTrace(); + SequenceFile.Writer writer = SequenceFile.createWriter(fs, conf, + new Path(INPUT), Text.class, TextArrayWritable.class); + + for (int i = 0; i < input.length; i++) { + String[] x = input[i].split("\t"); + Text key = new Text(x[0]); + Writable[] values = new Writable[x.length - 1]; + for (int j = 1; j < x.length; j++) { + values[j - 1] = new Text(x[j]); } + TextArrayWritable value = new TextArrayWritable(); + value.set(values); + writer.append(key, value); } + + writer.close(); + } catch (Exception e) { + e.printStackTrace(); } } Index: examples/src/test/java/org/apache/hama/examples/PageRankTest.java =================================================================== --- examples/src/test/java/org/apache/hama/examples/PageRankTest.java (revision 1419185) +++ examples/src/test/java/org/apache/hama/examples/PageRankTest.java (working copy) @@ -18,8 +18,6 @@ package org.apache.hama.examples; import java.io.BufferedReader; -import java.io.BufferedWriter; -import java.io.FileWriter; import java.io.IOException; import java.io.InputStreamReader; @@ -30,29 +28,11 @@ import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hama.HamaConfiguration; +import org.apache.hama.examples.util.SymmetricMatrixGen; import org.apache.hama.graph.GraphJob; import org.apache.hama.graph.GraphJobRunner; public class PageRankTest extends TestCase { - /** - * The graph looks like this (adjacency list, [] contains outlinks):
- * stackoverflow.com [yahoo.com]
- * google.com []
- * facebook.com [twitter.com, google.com, nasa.gov]
- * yahoo.com [nasa.gov, stackoverflow.com]
- * twitter.com [google.com, facebook.com]
- * nasa.gov [yahoo.com, stackoverflow.com]
- * youtube.com [google.com, yahoo.com]
- * Note that google is removed in this part mainly to test the repair - * functionality. - */ - String[] input = new String[] { "stackoverflow.com\tyahoo.com", - "facebook.com\ttwitter.com\tgoogle.com\tnasa.gov", - "yahoo.com\tnasa.gov\tstackoverflow.com", - "twitter.com\tgoogle.com\tfacebook.com", - "nasa.gov\tyahoo.com\tstackoverflow.com", - "youtube.com\tgoogle.com\tyahoo.com" }; - private static String INPUT = "/tmp/pagerank-tmp.seq"; private static String TEXT_INPUT = "/tmp/pagerank.txt"; private static String TEXT_OUTPUT = INPUT + "pagerank.txt.seq"; @@ -101,24 +81,9 @@ } } - private void generateTestData() { - BufferedWriter bw = null; - try { - bw = new BufferedWriter(new FileWriter(INPUT)); - for (String s : input) { - bw.write(s + "\n"); - } - } catch (IOException e) { - e.printStackTrace(); - } finally { - if (bw != null) { - try { - bw.close(); - } catch (IOException e) { - e.printStackTrace(); - } - } - } + private void generateTestData() throws ClassNotFoundException, + InterruptedException, IOException { + SymmetricMatrixGen.main(new String[] { "20", "10", INPUT, "3" }); } private void deleteTempDirs() { Index: graph/src/main/java/org/apache/hama/graph/GraphJob.java =================================================================== --- graph/src/main/java/org/apache/hama/graph/GraphJob.java (revision 1419185) +++ graph/src/main/java/org/apache/hama/graph/GraphJob.java (working copy) @@ -165,9 +165,6 @@ .checkArgument(this.getConfiguration() .get(VERTEX_EDGE_VALUE_CLASS_ATTR) != null, "Please provide an edge value class, if you don't need one, use NullWritable!"); - Preconditions.checkArgument( - this.getConfiguration().get(VERTEX_GRAPH_INPUT_READER) != null, - "Please provide a vertex input reader!"); super.submit(); } Index: graph/src/main/java/org/apache/hama/graph/GraphJobRunner.java =================================================================== --- graph/src/main/java/org/apache/hama/graph/GraphJobRunner.java (revision 1419185) +++ graph/src/main/java/org/apache/hama/graph/GraphJobRunner.java (working copy) @@ -28,8 +28,8 @@ import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.io.ArrayWritable; import org.apache.hadoop.io.IntWritable; -import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.MapWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.io.Writable; @@ -273,53 +273,24 @@ private void loadVertices( BSPPeer peer) throws IOException, SyncException, InterruptedException { - - /* - * Several partitioning constants begin - */ - - final VertexInputReader reader = (VertexInputReader) ReflectionUtils - .newInstance(conf.getClass(GraphJob.VERTEX_GRAPH_INPUT_READER, - VertexInputReader.class), conf); - final boolean repairNeeded = conf.getBoolean(GRAPH_REPAIR, false); - final boolean runtimePartitioning = conf.getBoolean( - GraphJob.VERTEX_GRAPH_RUNTIME_PARTIONING, true); - - final long splitSize = peer.getSplitSize(); - final int partitioningSteps = partitionMultiSteps(peer, splitSize); - final long interval = splitSize / partitioningSteps; final boolean selfReference = conf.getBoolean("hama.graph.self.ref", false); - /* - * Several partitioning constants end - */ - LOG.debug("vertex class: " + vertexClass); Vertex vertex = newVertexInstance(vertexClass, conf); vertex.runner = this; - long startPos = peer.getPos(); - if (startPos == 0) - startPos = 1L; - KeyValuePair next = null; - int steps = 1; while ((next = peer.readNext()) != 
null) { - boolean vertexFinished = false; - try { - vertexFinished = reader.parseVertex(next.getKey(), next.getValue(), - vertex); - } catch (Exception e) { - // LOG.error("exception occured during parsing vertex!" + e.toString()); - throw new IOException("exception occured during parsing vertex!" - + e.toString()); + V key = (V) next.getKey(); + Writable[] edges = ((ArrayWritable) next.getValue()).get(); + vertex.setVertexID(key); + List> edgeList = new ArrayList>(); + for (Writable edge : edges) { + edgeList.add(new Edge((V) edge, null)); } - - if (!vertexFinished) { - continue; - } + vertex.setEdges(edgeList); if (vertex.getEdges() == null) { if (selfReference) { @@ -334,44 +305,12 @@ vertex.addEdge(new Edge(vertex.getVertexID(), null)); } - if (runtimePartitioning) { - int partition = partitioner.getPartition(vertex.getVertexID(), - vertex.getValue(), peer.getNumPeers()); - peer.send(peer.getPeerName(partition), new GraphJobMessage(vertex)); - } else { - vertex.setup(conf); - vertices.add(vertex); - } + vertex.setup(conf); + vertices.add(vertex); vertex = newVertexInstance(vertexClass, conf); vertex.runner = this; - - if (runtimePartitioning) { - if (steps < partitioningSteps && (peer.getPos() - startPos) >= interval) { - peer.sync(); - steps++; - GraphJobMessage msg = null; - while ((msg = peer.getCurrentMessage()) != null) { - Vertex messagedVertex = (Vertex) msg.getVertex(); - messagedVertex.runner = this; - messagedVertex.setup(conf); - vertices.add(messagedVertex); - } - startPos = peer.getPos(); - } - } } - if (runtimePartitioning) { - peer.sync(); - - GraphJobMessage msg = null; - while ((msg = peer.getCurrentMessage()) != null) { - Vertex messagedVertex = (Vertex) msg.getVertex(); - messagedVertex.runner = this; - messagedVertex.setup(conf); - vertices.add(messagedVertex); - } - } LOG.debug("Loading finished at " + peer.getSuperstepCount() + " steps."); /* @@ -383,7 +322,7 @@ */ if (repairNeeded) { LOG.debug("Starting repair of this graph!"); - repair(peer, partitioningSteps, selfReference); + repair(peer, selfReference); } LOG.debug("Starting Vertex processing!"); @@ -392,83 +331,16 @@ @SuppressWarnings("unchecked") private void repair( BSPPeer peer, - int partitioningSteps, boolean selfReference) throws IOException, + boolean selfReference) throws IOException, SyncException, InterruptedException { - int multiSteps = 0; - MapWritable ssize = new MapWritable(); - ssize.put(new IntWritable(peer.getPeerIndex()), - new IntWritable(vertices.size())); - peer.send(getMasterTask(peer), new GraphJobMessage(ssize)); - ssize = null; - peer.sync(); - - if (isMasterTask(peer)) { - int minVerticesSize = Integer.MAX_VALUE; - GraphJobMessage received = null; - while ((received = peer.getCurrentMessage()) != null) { - MapWritable x = received.getMap(); - for (Entry e : x.entrySet()) { - int curr = ((IntWritable) e.getValue()).get(); - if (minVerticesSize > curr) { - minVerticesSize = curr; - } - } - } - - if (minVerticesSize < (partitioningSteps * 2)) { - multiSteps = minVerticesSize; - } else { - multiSteps = (partitioningSteps * 2); - } - - for (String peerName : peer.getAllPeerNames()) { - MapWritable temp = new MapWritable(); - temp.put(new Text("steps"), new IntWritable(multiSteps)); - peer.send(peerName, new GraphJobMessage(temp)); - } - } - peer.sync(); - - GraphJobMessage received = peer.getCurrentMessage(); - MapWritable x = received.getMap(); - for (Entry e : x.entrySet()) { - multiSteps = ((IntWritable) e.getValue()).get(); - } - Map> tmp = new HashMap>(); - int i = 0; - int syncs = 
0; - for (Vertex v : vertices) { for (Edge e : v.getEdges()) { peer.send(v.getDestinationPeerName(e), new GraphJobMessage(e.getDestinationVertexID())); } - - if (syncs < multiSteps && (i % (vertices.size() / multiSteps)) == 0) { - peer.sync(); - syncs++; - GraphJobMessage msg = null; - while ((msg = peer.getCurrentMessage()) != null) { - V vertexName = (V) msg.getVertexId(); - - Vertex newVertex = newVertexInstance(vertexClass, conf); - newVertex.setVertexID(vertexName); - newVertex.runner = this; - if (selfReference) { - newVertex.setEdges(Collections.singletonList(new Edge( - newVertex.getVertexID(), null))); - } else { - newVertex.setEdges(new ArrayList>(0)); - } - newVertex.setup(conf); - tmp.put(vertexName, newVertex); - - } - } - i++; } peer.sync(); @@ -488,7 +360,6 @@ newVertex.setup(conf); tmp.put(vertexName, newVertex); newVertex = null; - } for (Vertex e : vertices) { @@ -502,59 +373,6 @@ } /** - * Partitions our vertices through multiple supersteps to save memory. - */ - private int partitionMultiSteps( - BSPPeer peer, - long splitSize) throws IOException, SyncException, InterruptedException { - int multiSteps = 1; - - MapWritable ssize = new MapWritable(); - ssize - .put(new IntWritable(peer.getPeerIndex()), new LongWritable(splitSize)); - peer.send(getMasterTask(peer), new GraphJobMessage(ssize)); - ssize = null; - peer.sync(); - - if (isMasterTask(peer)) { - long maxSplitSize = 0L; - GraphJobMessage received = null; - while ((received = peer.getCurrentMessage()) != null) { - MapWritable x = received.getMap(); - for (Entry e : x.entrySet()) { - long curr = ((LongWritable) e.getValue()).get(); - if (maxSplitSize < curr) { - maxSplitSize = curr; - } - } - } - - int steps = (int) (maxSplitSize / conf.getLong( // 20 mb - "hama.graph.multi.step.partitioning.interval", 20000000)) + 1; - - for (String peerName : peer.getAllPeerNames()) { - MapWritable temp = new MapWritable(); - temp.put(new Text("max"), new IntWritable(steps)); - peer.send(peerName, new GraphJobMessage(temp)); - } - } - peer.sync(); - - GraphJobMessage received = peer.getCurrentMessage(); - MapWritable x = received.getMap(); - for (Entry e : x.entrySet()) { - multiSteps = ((IntWritable) e.getValue()).get(); - } - - if (isMasterTask(peer)) { - peer.getCounter(GraphJobCounter.MULTISTEP_PARTITIONING).increment( - multiSteps); - } - - return multiSteps; - } - - /** * Counts vertices globally by sending the count of vertices in the map to the * other peers. 
*/ Index: graph/src/test/java/org/apache/hama/graph/TestSubmitGraphJob.java =================================================================== --- graph/src/test/java/org/apache/hama/graph/TestSubmitGraphJob.java (revision 1419185) +++ graph/src/test/java/org/apache/hama/graph/TestSubmitGraphJob.java (working copy) @@ -17,35 +17,33 @@ */ package org.apache.hama.graph; -import java.io.BufferedWriter; -import java.io.File; -import java.io.FileWriter; import java.io.IOException; import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.DoubleWritable; -import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.NullWritable; import org.apache.hadoop.io.SequenceFile; import org.apache.hadoop.io.Text; +import org.apache.hadoop.io.Writable; import org.apache.hama.Constants; import org.apache.hama.bsp.BSPJobClient; import org.apache.hama.bsp.ClusterStatus; import org.apache.hama.bsp.HashPartitioner; +import org.apache.hama.bsp.SequenceFileInputFormat; import org.apache.hama.bsp.SequenceFileOutputFormat; import org.apache.hama.bsp.TestBSPMasterGroomServer; -import org.apache.hama.bsp.TextInputFormat; +import org.apache.hama.bsp.TextArrayWritable; import org.apache.hama.graph.example.PageRank; public class TestSubmitGraphJob extends TestBSPMasterGroomServer { String[] input = new String[] { "stackoverflow.com\tyahoo.com", - "facebook.com\ttwitter.com\tgoogle.com\tnasa.gov]", - "yahoo.com\tnasa.gov\tstackoverflow.com]", - "twitter.com\tgoogle.com\tfacebook.com]", - "nasa.gov\tyahoo.com\tstackoverflow.com]", - "youtube.com\tgoogle.com\tyahoo.com]" }; + "facebook.com\ttwitter.com\tgoogle.com\tnasa.gov", + "yahoo.com\tnasa.gov\tstackoverflow.com", + "twitter.com\tgoogle.com\tfacebook.com", + "nasa.gov\tyahoo.com\tstackoverflow.com", + "youtube.com\tgoogle.com\tyahoo.com" }; private static String INPUT = "/tmp/pagerank-real-tmp.seq"; private static String OUTPUT = "/tmp/pagerank-real-out"; @@ -77,14 +75,15 @@ bsp.setAggregatorClass(AverageAggregator.class, PageRank.DanglingNodeAggregator.class); + bsp.setVertexInputReaderClass(PageRank.PagerankTextReader.class); + bsp.setInputFormat(SequenceFileInputFormat.class); + bsp.setInputKeyClass(Text.class); + bsp.setInputValueClass(TextArrayWritable.class); + bsp.setVertexIDClass(Text.class); bsp.setVertexValueClass(DoubleWritable.class); bsp.setEdgeValueClass(NullWritable.class); - bsp.setVertexInputReaderClass(PageRank.PagerankTextReader.class); - bsp.setInputFormat(TextInputFormat.class); - bsp.setInputKeyClass(LongWritable.class); - bsp.setInputValueClass(Text.class); bsp.setPartitioner(HashPartitioner.class); bsp.setOutputFormat(SequenceFileOutputFormat.class); bsp.setOutputKeyClass(Text.class); @@ -123,26 +122,25 @@ } private void generateTestData() { - BufferedWriter bw = null; try { - bw = new BufferedWriter(new FileWriter(INPUT)); - for (String s : input) { - bw.write(s + "\n"); - } - } catch (IOException e) { - e.printStackTrace(); - } finally { - if (bw != null) { - try { - bw.close(); + SequenceFile.Writer writer = SequenceFile.createWriter(fs, getConf(), + new Path(INPUT), Text.class, TextArrayWritable.class); - File file = new File(INPUT); - LOG.info("Temp file length: " + file.length()); - - } catch (IOException e) { - e.printStackTrace(); + for (int i = 0; i < input.length; i++) { + String[] x = input[i].split("\t"); + Text key = new Text(x[0]); + Writable[] values = new Writable[x.length - 1]; + for (int j = 1; j < x.length; j++) { + values[j - 1] = new Text(x[j]); } + TextArrayWritable 
value = new TextArrayWritable(); + value.set(values); + writer.append(key, value); } + + writer.close(); + } catch (IOException e) { + e.printStackTrace(); } }
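
For reference, the end-to-end usage this patch moves the examples and tests to looks roughly like the sketch below: graph input is no longer tab-separated text parsed by a per-job VertexInputReader, but a SequenceFile of (Text vertex ID, TextArrayWritable outlink list) records. That lets BSPJobClient pre-partition the input with the job's declared input key/value classes (first hunk above) and lets GraphJobRunner.loadVertices() build vertices and edges directly from each record. The sketch is illustrative only and is not part of the patch; it reuses the InlinkCount example and the SequenceFile writer code from the reworked tests above, and it assumes the existing GraphJob/BSPJob setters that this diff does not touch (setJobName, setInputPath, setOutputPath, waitForCompletion) as well as made-up /tmp paths and class name.

import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.SequenceFile;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.Writable;
import org.apache.hama.HamaConfiguration;
import org.apache.hama.bsp.HashPartitioner;
import org.apache.hama.bsp.SequenceFileInputFormat;
import org.apache.hama.bsp.SequenceFileOutputFormat;
import org.apache.hama.bsp.TextArrayWritable;
import org.apache.hama.examples.InlinkCount;
import org.apache.hama.graph.GraphJob;

public class SeqFileGraphInputSketch {

  public static void main(String[] args) throws Exception {
    HamaConfiguration conf = new HamaConfiguration();
    FileSystem fs = FileSystem.get(conf);
    Path input = new Path("/tmp/inlinkcount-input.seq");   // illustrative path
    Path output = new Path("/tmp/inlinkcount-output");     // illustrative path

    // 1) Write each adjacency list as a (vertexID, outlinks) record, the same way
    //    the reworked generateTestData() methods in the tests above do.
    String[] adjacency = { "1\t2\t3\t4", "2\t3\t1" };
    SequenceFile.Writer writer = SequenceFile.createWriter(fs, conf, input,
        Text.class, TextArrayWritable.class);
    try {
      for (String line : adjacency) {
        String[] split = line.split("\t");
        Writable[] outlinks = new Writable[split.length - 1];
        for (int i = 1; i < split.length; i++) {
          outlinks[i - 1] = new Text(split[i]);
        }
        TextArrayWritable value = new TextArrayWritable();
        value.set(outlinks);
        writer.append(new Text(split[0]), value);
      }
    } finally {
      writer.close();
    }

    // 2) Configure the job the way the reworked examples do. The input key/value
    //    classes now drive both the partition writers in BSPJobClient and the
    //    (Text, ArrayWritable) records consumed by GraphJobRunner.loadVertices().
    GraphJob job = new GraphJob(conf, InlinkCount.class);
    job.setJobName("Inlink Count");
    job.setVertexClass(InlinkCount.class);
    job.setInputPath(input);
    job.setOutputPath(output);

    job.setInputFormat(SequenceFileInputFormat.class);
    job.setInputKeyClass(Text.class);
    job.setInputValueClass(TextArrayWritable.class);

    job.setVertexIDClass(Text.class);
    job.setVertexValueClass(IntWritable.class);
    job.setEdgeValueClass(NullWritable.class);

    job.setPartitioner(HashPartitioner.class);
    job.setOutputFormat(SequenceFileOutputFormat.class);
    // Output classes mirror the unchanged part of the InlinkCount example.
    job.setOutputKeyClass(Text.class);
    job.setOutputValueClass(IntWritable.class);

    job.waitForCompletion(true);
  }
}

The example jobs drop setVertexInputReaderClass() entirely, and GraphJob.submit() no longer requires a vertex input reader (see the GraphJob.java hunk above), so declaring the input format and the input key/value classes is sufficient.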