Index: core/src/main/java/org/apache/hama/bsp/BSPJob.java
===================================================================
--- core/src/main/java/org/apache/hama/bsp/BSPJob.java	(revision 1419949)
+++ core/src/main/java/org/apache/hama/bsp/BSPJob.java	(working copy)
@@ -22,6 +22,7 @@
 import java.net.URLDecoder;
 import java.util.Enumeration;
 
+import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.io.LongWritable;
 import org.apache.hadoop.io.Text;
@@ -219,8 +220,41 @@
     state = JobState.RUNNING;
   }
 
+  boolean isPartitioned = false;
+
   public boolean waitForCompletion(boolean verbose) throws IOException,
       InterruptedException, ClassNotFoundException {
+    if (this.getConfiguration().get("bsp.input.partitioner.class") != null
+        && !isPartitioned) {
+      FileSystem fs = FileSystem.get(conf);
+      Path inputDir = new Path(conf.get("bsp.input.dir"));
+      if (fs.isFile(inputDir)) {
+        inputDir = inputDir.getParent();
+      }
+      Path partitionDir = new Path(inputDir + "/partitions");
+
+      if (fs.exists(partitionDir)) {
+        fs.delete(partitionDir, true);
+      }
+
+      HamaConfiguration conf = new HamaConfiguration();
+      conf.setInt("desired.num.of.tasks",
+          Integer.parseInt(this.getConfiguration().get("bsp.peers.num")));
+      BSPJob partitioningJob = new BSPJob(conf);
+      partitioningJob.setInputPath(new Path(this.getConfiguration().get(
+          "bsp.input.dir")));
+      partitioningJob.setInputFormat(this.getInputFormat().getClass());
+      partitioningJob.setInputKeyClass(this.getInputKeyClass());
+      partitioningJob.setInputValueClass(getInputValueClass());
+      partitioningJob.setOutputFormat(NullOutputFormat.class);
+      partitioningJob.setBspClass(PartitioningRunner.class);
+
+      isPartitioned = partitioningJob.waitForCompletion(true);
+      if (isPartitioned) {
+        this.setInputPath(new Path(inputDir + "/partitions"));
+      }
+    }
+
     if (state == JobState.DEFINE) {
       submit();
     }
Index: core/src/main/java/org/apache/hama/bsp/BSPJobClient.java
===================================================================
--- core/src/main/java/org/apache/hama/bsp/BSPJobClient.java	(revision 1419949)
+++ core/src/main/java/org/apache/hama/bsp/BSPJobClient.java	(working copy)
@@ -30,9 +30,7 @@
 import java.net.MalformedURLException;
 import java.net.URL;
 import java.net.URLConnection;
-import java.util.ArrayList;
 import java.util.Arrays;
-import java.util.List;
 import java.util.Map;
 import java.util.Random;
 import java.util.StringTokenizer;
@@ -55,7 +53,6 @@
 import org.apache.hadoop.io.compress.CompressionCodec;
 import org.apache.hadoop.ipc.RPC;
 import org.apache.hadoop.net.NetUtils;
-import org.apache.hadoop.util.ReflectionUtils;
 import org.apache.hadoop.util.Shell;
 import org.apache.hadoop.util.Tool;
 import org.apache.hadoop.util.ToolRunner;
@@ -301,9 +298,10 @@
       throws IOException {
     BSPJob job = pJob;
     job.setJobID(jobId);
-    int limitTasks = job.getConfiguration().getInt(Constants.MAX_TASKS_PER_JOB, 0);
+    int limitTasks = job.getConfiguration().getInt(Constants.MAX_TASKS_PER_JOB,
+        0);
     int maxTasks = checkTaskLimits(job, limitTasks);
-    
+
     Path submitJobDir = new Path(getSystemDir(), "submit_"
         + Integer.toString(Math.abs(r.nextInt()), 36));
     Path submitSplitFile = new Path(submitJobDir, "job.split");
@@ -325,12 +323,6 @@
     if (job.get("bsp.input.dir") != null) {
       // Create the splits for the job
       LOG.debug("Creating splits at " + fs.makeQualified(submitSplitFile));
-      if (job.getConfiguration().get("bsp.input.partitioner.class") != null
-          && !job.getConfiguration()
-              .getBoolean("hama.graph.runtime.partitioning", false)) {
-        job = partition(job, maxTasks);
-        maxTasks = job.getInt("hama.partition.count", maxTasks);
-      }
       job.setNumBspTask(writeSplits(job, submitSplitFile, maxTasks));
       job.set("bsp.job.split.file", submitSplitFile.toString());
     }
@@ -375,15 +367,16 @@
   protected int checkTaskLimits(BSPJob job, int limitTasks) throws IOException {
     int maxTasks;
     ClusterStatus clusterStatus = getClusterStatus(true);
-    
-    if(limitTasks > 0) {
+
+    if (limitTasks > 0) {
       maxTasks = limitTasks;
     } else {
       maxTasks = clusterStatus.getMaxTasks() - clusterStatus.getTasks();
    }
-    
+
     if (maxTasks < job.getNumBspTask()) {
-      throw new IOException("Job failed! The number of tasks has exceeded the maximum allowed.");
+      throw new IOException(
+          "Job failed! The number of tasks has exceeded the maximum allowed.");
     }
     return maxTasks;
   }
@@ -402,97 +395,10 @@
     }
   }
 
-  @SuppressWarnings({ "rawtypes", "unchecked" })
-  protected BSPJob partition(BSPJob job, int maxTasks) throws IOException {
-    InputSplit[] splits = job.getInputFormat().getSplits(
-        job,
-        (isProperSize(job.getNumBspTask(), maxTasks)) ? job.getNumBspTask()
-            : maxTasks);
-
-    String input = job.getConfiguration().get("bsp.input.dir");
-
-    if (input != null) {
-      InputFormat inputFormat = job.getInputFormat();
-
-      Path partitionedPath = new Path(input, "hama-partitions");
-      Path inputPath = new Path(input);
-      if (fs.isFile(inputPath)) {
-        partitionedPath = new Path(inputPath.getParent(), "hama-partitions");
-      }
-
-      String alternatePart = job.get("bsp.partitioning.dir");
-      if (alternatePart != null) {
-        partitionedPath = new Path(alternatePart, job.getJobID().toString());
-      }
-
-      if (fs.exists(partitionedPath)) {
-        fs.delete(partitionedPath, true);
-      } else {
-        fs.mkdirs(partitionedPath);
-      }
-      // FIXME this is soo unsafe
-      RecordReader sampleReader = inputFormat.getRecordReader(splits[0], job);
-
-      List<SequenceFile.Writer> writers = new ArrayList<SequenceFile.Writer>(
-          splits.length);
-
-      CompressionType compressionType = getOutputCompressionType(job);
-      Class<? extends CompressionCodec> outputCompressorClass = getOutputCompressorClass(
-          job, null);
-      CompressionCodec codec = null;
-      if (outputCompressorClass != null) {
-        codec = ReflectionUtils.newInstance(outputCompressorClass,
-            job.getConfiguration());
-      }
-
-      try {
-        for (int i = 0; i < splits.length; i++) {
-          Path p = new Path(partitionedPath, getPartitionName(i));
-          if (codec == null) {
-            writers.add(SequenceFile.createWriter(fs, job.getConfiguration(), p,
-                sampleReader.createKey().getClass(), sampleReader.createValue()
-                    .getClass(), CompressionType.NONE));
-          } else {
-            writers.add(SequenceFile.createWriter(fs, job.getConfiguration(), p,
-                sampleReader.createKey().getClass(), sampleReader.createValue()
-                    .getClass(), compressionType, codec));
-          }
-        }
-
-        Partitioner partitioner = job.getPartitioner();
-        for (int i = 0; i < splits.length; i++) {
-          InputSplit split = splits[i];
-          RecordReader recordReader = inputFormat.getRecordReader(split, job);
-          Object key = recordReader.createKey();
-          Object value = recordReader.createValue();
-          while (recordReader.next(key, value)) {
-            int index = Math.abs(partitioner.getPartition(key, value,
-                splits.length));
-            writers.get(index).append(key, value);
-          }
-          LOG.debug("Done with split " + i);
-        }
-      } finally {
-        for (SequenceFile.Writer wr : writers) {
-          wr.close();
-        }
-      }
-      job.set("hama.partition.count", writers.size() + "");
-      job.setInputFormat(SequenceFileInputFormat.class);
-      job.setInputPath(partitionedPath);
-    }
-
-    return job;
-  }
-
   private static boolean isProperSize(int numBspTask, int maxTasks) {
     return (numBspTask > 1 && numBspTask < maxTasks);
   }
 
-  private static String getPartitionName(int i) {
-    return "part-" + String.valueOf(100000 + i).substring(1, 6);
-  }
-
   /**
    * Get the {@link CompressionType} for the output {@link SequenceFile}.
   *
Index: core/src/main/java/org/apache/hama/bsp/PartitioningRunner.java
===================================================================
--- core/src/main/java/org/apache/hama/bsp/PartitioningRunner.java	(revision 0)
+++ core/src/main/java/org/apache/hama/bsp/PartitioningRunner.java	(working copy)
@@ -0,0 +1,137 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hama.bsp;
+
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.io.SequenceFile;
+import org.apache.hadoop.io.SequenceFile.CompressionType;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.util.ReflectionUtils;
+import org.apache.hama.bsp.sync.SyncException;
+import org.apache.hama.util.KeyValuePair;
+
+public class PartitioningRunner extends
+    BSP {
+  private Configuration conf;
+  private int desiredNum;
+  private FileSystem fs = null;
+  private Path partitionDir;
+  private Map<Integer, Map<Writable, Writable>> values = new HashMap<Integer, Map<Writable, Writable>>();
+
+  @Override
+  public final void setup(
+      BSPPeer peer)
+      throws IOException, SyncException, InterruptedException {
+    this.conf = peer.getConfiguration();
+    this.desiredNum = conf.getInt("desired.num.of.tasks", 1);
+    this.fs = FileSystem.get(conf);
+
+    Path inputDir = new Path(conf.get("bsp.input.dir"));
+    if (fs.isFile(inputDir)) {
+      inputDir = inputDir.getParent();
+    }
+
+    this.partitionDir = new Path(inputDir + "/partitions");
+  }
+
+  @Override
+  @SuppressWarnings({ "rawtypes", "unchecked" })
+  public void bsp(
+      BSPPeer peer)
+      throws IOException, SyncException, InterruptedException {
+    Partitioner partitioner = getPartitioner();
+    KeyValuePair<Writable, Writable> pair = null;
+
+    Class<? extends Writable> keyClass = null;
+    Class<? extends Writable> valueClass = null;
+    while ((pair = peer.readNext()) != null) {
+      if (keyClass == null && valueClass == null) {
+        keyClass = pair.getKey().getClass();
+        valueClass = pair.getValue().getClass();
+      }
+
+      int index = Math.abs(partitioner.getPartition(pair.getKey(),
+          pair.getValue(), desiredNum));
+
+      if (!values.containsKey(index)) {
+        values.put(index, new HashMap());
+      }
+      values.get(index).put(pair.getKey(), pair.getValue());
+    }
+
+    for (Map.Entry<Integer, Map<Writable, Writable>> e : values.entrySet()) {
+      Path destFile = new Path(partitionDir + "/part-" + e.getKey() + "/file-"
+          + peer.getPeerIndex());
+      SequenceFile.Writer writer = SequenceFile.createWriter(fs, conf,
+          destFile, keyClass, valueClass, CompressionType.NONE);
+      for (Map.Entry<Writable, Writable> v : e.getValue().entrySet()) {
+        writer.append(v.getKey(), v.getValue());
+      }
+      writer.close();
+    }
+
+    peer.sync();
+
+    if (peer.getPeerIndex() == 0) {
+      // merge files into one.
+      FileStatus[] status = fs.listStatus(partitionDir);
+      for (int j = 0; j < status.length; j++) {
+        FileStatus[] files = fs.listStatus(status[j].getPath());
+        SequenceFile.Writer writer = SequenceFile.createWriter(fs, conf,
+            new Path(partitionDir + "/" + getPartitionName(j)), keyClass,
+            valueClass, CompressionType.NONE);
+
+        for (int i = 0; i < files.length; i++) {
+          SequenceFile.Reader reader = new SequenceFile.Reader(fs,
+              files[i].getPath(), conf);
+
+          Writable key = ReflectionUtils.newInstance(keyClass, conf);
+          Writable value = ReflectionUtils.newInstance(valueClass, conf);
+
+          while (reader.next(key, value)) {
+            writer.append(key, value);
+          }
+          reader.close();
+        }
+
+        writer.close();
+        fs.delete(status[j].getPath(), true);
+      }
+    }
+  }
+
+  @SuppressWarnings("rawtypes")
+  public Partitioner getPartitioner() {
+    return ReflectionUtils.newInstance(conf
+        .getClass("bsp.input.partitioner.class", HashPartitioner.class,
+            Partitioner.class), conf);
+  }
+
+  private static String getPartitionName(int i) {
+    return "part-" + String.valueOf(100000 + i).substring(1, 6);
+  }
+
+}
Index: examples/src/main/java/org/apache/hama/examples/InlinkCount.java
===================================================================
--- examples/src/main/java/org/apache/hama/examples/InlinkCount.java	(revision 1419951)
+++ examples/src/main/java/org/apache/hama/examples/InlinkCount.java	(working copy)
@@ -22,17 +22,15 @@
 
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.io.IntWritable;
-import org.apache.hadoop.io.LongWritable;
 import org.apache.hadoop.io.NullWritable;
 import org.apache.hadoop.io.Text;
 import org.apache.hama.HamaConfiguration;
 import org.apache.hama.bsp.HashPartitioner;
+import org.apache.hama.bsp.SequenceFileInputFormat;
 import org.apache.hama.bsp.SequenceFileOutputFormat;
-import org.apache.hama.bsp.TextInputFormat;
-import org.apache.hama.graph.Edge;
+import org.apache.hama.bsp.TextArrayWritable;
 import org.apache.hama.graph.GraphJob;
 import org.apache.hama.graph.Vertex;
-import org.apache.hama.graph.VertexInputReader;
 
 public class InlinkCount extends Vertex {
 
@@ -51,34 +49,6 @@
     }
   }
 
-  public static class InlinkCountTextReader extends
-      VertexInputReader {
-
-    /**
-     * The text file essentially should look like:
- * VERTEX_ID\t(n-tab separated VERTEX_IDs)
- * E.G:
- * 1\t2\t3\t4
- * 2\t3\t1
-     * etc.
-     */
-    @Override
-    public boolean parseVertex(LongWritable key, Text value,
-        Vertex vertex) throws Exception {
-      String[] split = value.toString().split("\t");
-      for (int i = 0; i < split.length; i++) {
-        if (i == 0) {
-          vertex.setVertexID(new Text(split[i]));
-        } else {
-          vertex
-              .addEdge(new Edge(new Text(split[i]), null));
-        }
-      }
-      return true;
-    }
-
-  }
-
   private static void printUsage() {
     System.out.println("Usage: <input> <output> [tasks]");
     System.exit(-1);
   }
 
@@ -104,14 +74,14 @@
     }
 
     inlinkJob.setVertexClass(InlinkCount.class);
-    inlinkJob.setInputFormat(TextInputFormat.class);
-    inlinkJob.setInputKeyClass(LongWritable.class);
-    inlinkJob.setInputValueClass(Text.class);
+
+    inlinkJob.setInputFormat(SequenceFileInputFormat.class);
+    inlinkJob.setInputKeyClass(Text.class);
+    inlinkJob.setInputValueClass(TextArrayWritable.class);
 
     inlinkJob.setVertexIDClass(Text.class);
     inlinkJob.setVertexValueClass(IntWritable.class);
     inlinkJob.setEdgeValueClass(NullWritable.class);
 
-    inlinkJob.setVertexInputReaderClass(InlinkCountTextReader.class);
     inlinkJob.setPartitioner(HashPartitioner.class);
     inlinkJob.setOutputFormat(SequenceFileOutputFormat.class);
Index: examples/src/main/java/org/apache/hama/examples/MindistSearch.java
===================================================================
--- examples/src/main/java/org/apache/hama/examples/MindistSearch.java	(revision 1419951)
+++ examples/src/main/java/org/apache/hama/examples/MindistSearch.java	(working copy)
@@ -22,18 +22,17 @@
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.io.LongWritable;
 import org.apache.hadoop.io.NullWritable;
 import org.apache.hadoop.io.Text;
 import org.apache.hama.HamaConfiguration;
 import org.apache.hama.bsp.Combiner;
 import org.apache.hama.bsp.HashPartitioner;
-import org.apache.hama.bsp.TextInputFormat;
+import org.apache.hama.bsp.SequenceFileInputFormat;
+import org.apache.hama.bsp.TextArrayWritable;
 import org.apache.hama.bsp.TextOutputFormat;
 import org.apache.hama.graph.Edge;
 import org.apache.hama.graph.GraphJob;
 import org.apache.hama.graph.Vertex;
-import org.apache.hama.graph.VertexInputReader;
 
 /**
  * Finding the mindist vertex in a connected component.
@@ -97,34 +96,6 @@
 
   }
 
-  public static class MindistSearchCountReader extends
-      VertexInputReader {
-
-    /**
-     * The text file essentially should look like:
- * VERTEX_ID\t(n-tab separated VERTEX_IDs)
- * E.G:
- * 1\t2\t3\t4
- * 2\t3\t1
-     * etc.
-     */
-    @Override
-    public boolean parseVertex(LongWritable key, Text value,
-        Vertex vertex) throws Exception {
-      String[] split = value.toString().split("\t");
-      for (int i = 0; i < split.length; i++) {
-        if (i == 0) {
-          vertex.setVertexID(new Text(split[i]));
-        } else {
-          vertex
-              .addEdge(new Edge(new Text(split[i]), null));
-        }
-      }
-      return true;
-    }
-
-  }
-
   private static void printUsage() {
     System.out
        .println("Usage: <input> <output> [maximum iterations (default 30)] [tasks]");
     System.exit(-1);
   }
 
@@ -157,10 +128,10 @@
     job.setVertexValueClass(Text.class);
     job.setEdgeValueClass(NullWritable.class);
 
-    job.setInputKeyClass(LongWritable.class);
-    job.setInputValueClass(Text.class);
-    job.setInputFormat(TextInputFormat.class);
-    job.setVertexInputReaderClass(MindistSearchCountReader.class);
+    job.setInputFormat(SequenceFileInputFormat.class);
+    job.setInputKeyClass(Text.class);
+    job.setInputValueClass(TextArrayWritable.class);
+
     job.setPartitioner(HashPartitioner.class);
     job.setOutputFormat(TextOutputFormat.class);
     job.setOutputKeyClass(Text.class);
Index: examples/src/main/java/org/apache/hama/examples/PageRank.java
===================================================================
--- examples/src/main/java/org/apache/hama/examples/PageRank.java	(revision 1419951)
+++ examples/src/main/java/org/apache/hama/examples/PageRank.java	(working copy)
@@ -23,19 +23,17 @@
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.io.DoubleWritable;
-import org.apache.hadoop.io.LongWritable;
 import org.apache.hadoop.io.NullWritable;
 import org.apache.hadoop.io.Text;
 import org.apache.hama.HamaConfiguration;
 import org.apache.hama.bsp.HashPartitioner;
-import org.apache.hama.bsp.TextInputFormat;
+import org.apache.hama.bsp.SequenceFileInputFormat;
+import org.apache.hama.bsp.TextArrayWritable;
 import org.apache.hama.bsp.TextOutputFormat;
 import org.apache.hama.graph.AbstractAggregator;
 import org.apache.hama.graph.AverageAggregator;
-import org.apache.hama.graph.Edge;
 import org.apache.hama.graph.GraphJob;
 import org.apache.hama.graph.Vertex;
-import org.apache.hama.graph.VertexInputReader;
 
 /**
  * Real pagerank with dangling node contribution.
@@ -99,37 +97,8 @@
     }
   }
 
-  public static class PagerankTextReader extends
-      VertexInputReader {
-
-    /**
-     * The text file essentially should look like:
- * VERTEX_ID\t(n-tab separated VERTEX_IDs)
- * E.G:
- * 1\t2\t3\t4
- * 2\t3\t1
-     * etc.
-     */
-    @Override
-    public boolean parseVertex(LongWritable key, Text value,
-        Vertex vertex) throws Exception {
-      String[] split = value.toString().split("\t");
-      for (int i = 0; i < split.length; i++) {
-        if (i == 0) {
-          vertex.setVertexID(new Text(split[i]));
-        } else {
-          vertex
-              .addEdge(new Edge(new Text(split[i]), null));
-        }
-      }
-      return true;
-    }
-
-  }
-
   private static void printUsage() {
-    System.out
-        .println("Usage: <input> <output> [damping factor (default 0.85)] [Epsilon (convergence error, default 0.001)] [Max iterations (default 30)] [tasks]");
+    System.out.println("Usage: <input> <output> [tasks]");
     System.exit(-1);
   }
 
@@ -161,15 +130,11 @@
     // set the defaults
     pageJob.setMaxIteration(30);
     pageJob.set("hama.pagerank.alpha", "0.85");
+    pageJob.set("hama.graph.max.convergence.error", "0.001");
 
-    if (args.length == 6)
-      pageJob.setNumBspTask(Integer.parseInt(args[5]));
-    if (args.length >= 5)
-      pageJob.setMaxIteration(Integer.parseInt(args[4]));
-    if (args.length >= 4)
-      pageJob.set("hama.graph.max.convergence.error", args[3]);
-    if (args.length >= 3)
-      pageJob.set("hama.pagerank.alpha", args[2]);
+    if (args.length == 3) {
+      pageJob.setNumBspTask(Integer.parseInt(args[2]));
+    }
 
     // error, dangling node probability sum
     pageJob.setAggregatorClass(AverageAggregator.class,
@@ -179,10 +144,10 @@
     pageJob.setVertexValueClass(DoubleWritable.class);
     pageJob.setEdgeValueClass(NullWritable.class);
 
-    pageJob.setInputKeyClass(LongWritable.class);
-    pageJob.setInputValueClass(Text.class);
-    pageJob.setInputFormat(TextInputFormat.class);
-    pageJob.setVertexInputReaderClass(PagerankTextReader.class);
+    pageJob.setInputFormat(SequenceFileInputFormat.class);
+    pageJob.setInputKeyClass(Text.class);
+    pageJob.setInputValueClass(TextArrayWritable.class);
+
     pageJob.setPartitioner(HashPartitioner.class);
     pageJob.setOutputFormat(TextOutputFormat.class);
     pageJob.setOutputKeyClass(Text.class);
Index: examples/src/test/java/org/apache/hama/examples/MindistSearchTest.java
===================================================================
--- examples/src/test/java/org/apache/hama/examples/MindistSearchTest.java	(revision 1419949)
+++ examples/src/test/java/org/apache/hama/examples/MindistSearchTest.java	(working copy)
@@ -18,8 +18,6 @@
 package org.apache.hama.examples;
 
 import java.io.BufferedReader;
-import java.io.BufferedWriter;
-import java.io.FileWriter;
 import java.io.IOException;
 import java.io.InputStreamReader;
 import java.util.Arrays;
@@ -31,8 +29,11 @@
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.SequenceFile;
 import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.Writable;
 import org.apache.hama.HamaConfiguration;
+import org.apache.hama.bsp.TextArrayWritable;
 import org.apache.hama.examples.MindistSearch.MinTextCombiner;
 
 public class MindistSearchTest extends TestCase {
@@ -95,22 +96,25 @@
   }
 
   private void generateTestData() {
-    BufferedWriter bw = null;
     try {
-      bw = new BufferedWriter(new FileWriter(INPUT));
-      for (String s : input) {
-        bw.write(s + "\n");
-      }
-    } catch (IOException e) {
-      e.printStackTrace();
-    } finally {
-      if (bw != null) {
-        try {
-          bw.close();
-        } catch (IOException e) {
-          e.printStackTrace();
+      SequenceFile.Writer writer = SequenceFile.createWriter(fs, conf,
+          new Path(INPUT), Text.class, TextArrayWritable.class);
+
+      for (int i = 0; i < input.length; i++) {
+        String[] x = input[i].split("\t");
+        Text key = new Text(x[0]);
+        Writable[] values = new Writable[x.length - 1];
+        for (int j = 1; j < x.length; j++) {
+          values[j - 1] = new Text(x[j]);
         }
+        TextArrayWritable value = new TextArrayWritable();
+        value.set(values);
+        writer.append(key, value);
       }
+
+      writer.close();
+    } catch (Exception e) {
+      e.printStackTrace();
     }
   }
Index: examples/src/test/java/org/apache/hama/examples/PageRankTest.java
===================================================================
--- examples/src/test/java/org/apache/hama/examples/PageRankTest.java	(revision 1419949)
+++ examples/src/test/java/org/apache/hama/examples/PageRankTest.java	(working copy)
@@ -18,8 +18,6 @@
 package org.apache.hama.examples;
 
 import java.io.BufferedReader;
-import java.io.BufferedWriter;
-import java.io.FileWriter;
 import java.io.IOException;
 import java.io.InputStreamReader;
 
@@ -30,29 +28,11 @@
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hama.HamaConfiguration;
+import org.apache.hama.examples.util.SymmetricMatrixGen;
 import org.apache.hama.graph.GraphJob;
 import org.apache.hama.graph.GraphJobRunner;
 
 public class PageRankTest extends TestCase {
-  /**
-   * The graph looks like this (adjacency list, [] contains outlinks):
- * stackoverflow.com [yahoo.com]
- * google.com []
- * facebook.com [twitter.com, google.com, nasa.gov]
- * yahoo.com [nasa.gov, stackoverflow.com]
- * twitter.com [google.com, facebook.com]
- * nasa.gov [yahoo.com, stackoverflow.com]
- * youtube.com [google.com, yahoo.com]
-   * Note that google is removed in this part mainly to test the repair
-   * functionality.
-   */
-  String[] input = new String[] { "stackoverflow.com\tyahoo.com",
-      "facebook.com\ttwitter.com\tgoogle.com\tnasa.gov",
-      "yahoo.com\tnasa.gov\tstackoverflow.com",
-      "twitter.com\tgoogle.com\tfacebook.com",
-      "nasa.gov\tyahoo.com\tstackoverflow.com",
-      "youtube.com\tgoogle.com\tyahoo.com" };
-
   private static String INPUT = "/tmp/pagerank-tmp.seq";
   private static String TEXT_INPUT = "/tmp/pagerank.txt";
   private static String TEXT_OUTPUT = INPUT + "pagerank.txt.seq";
@@ -101,24 +81,9 @@
     }
   }
 
-  private void generateTestData() {
-    BufferedWriter bw = null;
-    try {
-      bw = new BufferedWriter(new FileWriter(INPUT));
-      for (String s : input) {
-        bw.write(s + "\n");
-      }
-    } catch (IOException e) {
-      e.printStackTrace();
-    } finally {
-      if (bw != null) {
-        try {
-          bw.close();
-        } catch (IOException e) {
-          e.printStackTrace();
-        }
-      }
-    }
+  private void generateTestData() throws ClassNotFoundException,
+      InterruptedException, IOException {
+    SymmetricMatrixGen.main(new String[] { "20", "10", INPUT, "3" });
   }
 
   private void deleteTempDirs() {
Index: graph/src/main/java/org/apache/hama/graph/GraphJob.java
===================================================================
--- graph/src/main/java/org/apache/hama/graph/GraphJob.java	(revision 1419949)
+++ graph/src/main/java/org/apache/hama/graph/GraphJob.java	(working copy)
@@ -165,9 +165,6 @@
         .checkArgument(this.getConfiguration()
             .get(VERTEX_EDGE_VALUE_CLASS_ATTR) != null,
             "Please provide an edge value class, if you don't need one, use NullWritable!");
-    Preconditions.checkArgument(
-        this.getConfiguration().get(VERTEX_GRAPH_INPUT_READER) != null,
-        "Please provide a vertex input reader!");
 
     super.submit();
   }
Index: graph/src/main/java/org/apache/hama/graph/GraphJobRunner.java
===================================================================
--- graph/src/main/java/org/apache/hama/graph/GraphJobRunner.java	(revision 1419949)
+++ graph/src/main/java/org/apache/hama/graph/GraphJobRunner.java	(working copy)
@@ -28,8 +28,8 @@
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.ArrayWritable;
 import org.apache.hadoop.io.IntWritable;
-import org.apache.hadoop.io.LongWritable;
 import org.apache.hadoop.io.MapWritable;
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.io.Writable;
@@ -273,53 +273,24 @@
   private void loadVertices(
       BSPPeer peer)
       throws IOException, SyncException, InterruptedException {
-
-    /*
-     * Several partitioning constants begin
-     */
-
-    final VertexInputReader reader = (VertexInputReader) ReflectionUtils
-        .newInstance(conf.getClass(GraphJob.VERTEX_GRAPH_INPUT_READER,
-            VertexInputReader.class), conf);
-
     final boolean repairNeeded = conf.getBoolean(GRAPH_REPAIR, false);
-    final boolean runtimePartitioning = conf.getBoolean(
-        GraphJob.VERTEX_GRAPH_RUNTIME_PARTIONING, true);
-
-    final long splitSize = peer.getSplitSize();
-    final int partitioningSteps = partitionMultiSteps(peer, splitSize);
-    final long interval = splitSize / partitioningSteps;
-
     final boolean selfReference = conf.getBoolean("hama.graph.self.ref", false);
 
-    /*
-     * Several partitioning constants end
-     */
-
     LOG.debug("vertex class: " + vertexClass);
     Vertex vertex = newVertexInstance(vertexClass, conf);
     vertex.runner = this;
 
-    long startPos = peer.getPos();
-    if (startPos == 0)
-      startPos = 1L;
-
     KeyValuePair next = null;
-    int steps = 1;
     while ((next = peer.readNext()) != null) {
-      boolean vertexFinished = false;
-      try {
-        vertexFinished = reader.parseVertex(next.getKey(), next.getValue(),
-            vertex);
-      } catch (Exception e) {
-        // LOG.error("exception occured during parsing vertex!" + e.toString());
-        throw new IOException("exception occured during parsing vertex!"
-            + e.toString());
+      V key = (V) next.getKey();
+      Writable[] edges = ((ArrayWritable) next.getValue()).get();
+      vertex.setVertexID(key);
+      List<Edge<V, E>> edgeList = new ArrayList<Edge<V, E>>();
+      for (Writable edge : edges) {
+        edgeList.add(new Edge((V) edge, null));
       }
-
-      if (!vertexFinished) {
-        continue;
-      }
+      vertex.setEdges(edgeList);
 
       if (vertex.getEdges() == null) {
         if (selfReference) {
@@ -334,44 +305,12 @@
         vertex.addEdge(new Edge(vertex.getVertexID(), null));
       }
 
-      if (runtimePartitioning) {
-        int partition = partitioner.getPartition(vertex.getVertexID(),
-            vertex.getValue(), peer.getNumPeers());
-        peer.send(peer.getPeerName(partition), new GraphJobMessage(vertex));
-      } else {
-        vertex.setup(conf);
-        vertices.add(vertex);
-      }
+      vertex.setup(conf);
+      vertices.add(vertex);
       vertex = newVertexInstance(vertexClass, conf);
       vertex.runner = this;
-
-      if (runtimePartitioning) {
-        if (steps < partitioningSteps && (peer.getPos() - startPos) >= interval) {
-          peer.sync();
-          steps++;
-          GraphJobMessage msg = null;
-          while ((msg = peer.getCurrentMessage()) != null) {
-            Vertex messagedVertex = (Vertex) msg.getVertex();
-            messagedVertex.runner = this;
-            messagedVertex.setup(conf);
-            vertices.add(messagedVertex);
-          }
-          startPos = peer.getPos();
-        }
-      }
     }
 
-    if (runtimePartitioning) {
-      peer.sync();
-
-      GraphJobMessage msg = null;
-      while ((msg = peer.getCurrentMessage()) != null) {
-        Vertex messagedVertex = (Vertex) msg.getVertex();
-        messagedVertex.runner = this;
-        messagedVertex.setup(conf);
-        vertices.add(messagedVertex);
-      }
-    }
-
     LOG.debug("Loading finished at " + peer.getSuperstepCount() + " steps.");
 
     /*
@@ -383,7 +322,7 @@
      */
     if (repairNeeded) {
       LOG.debug("Starting repair of this graph!");
-      repair(peer, partitioningSteps, selfReference);
+      repair(peer, selfReference);
     }
 
     LOG.debug("Starting Vertex processing!");
@@ -392,83 +331,16 @@
   @SuppressWarnings("unchecked")
   private void repair(
       BSPPeer peer,
-      int partitioningSteps, boolean selfReference) throws IOException,
+      boolean selfReference) throws IOException,
       SyncException, InterruptedException {
-
-    int multiSteps = 0;
-    MapWritable ssize = new MapWritable();
-    ssize.put(new IntWritable(peer.getPeerIndex()),
-        new IntWritable(vertices.size()));
-    peer.send(getMasterTask(peer), new GraphJobMessage(ssize));
-    ssize = null;
-    peer.sync();
-
-    if (isMasterTask(peer)) {
-      int minVerticesSize = Integer.MAX_VALUE;
-      GraphJobMessage received = null;
-      while ((received = peer.getCurrentMessage()) != null) {
-        MapWritable x = received.getMap();
-        for (Entry e : x.entrySet()) {
-          int curr = ((IntWritable) e.getValue()).get();
-          if (minVerticesSize > curr) {
-            minVerticesSize = curr;
-          }
-        }
-      }
-
-      if (minVerticesSize < (partitioningSteps * 2)) {
-        multiSteps = minVerticesSize;
-      } else {
-        multiSteps = (partitioningSteps * 2);
-      }
-
-      for (String peerName : peer.getAllPeerNames()) {
-        MapWritable temp = new MapWritable();
-        temp.put(new Text("steps"), new IntWritable(multiSteps));
-        peer.send(peerName, new GraphJobMessage(temp));
-      }
-    }
-    peer.sync();
-
-    GraphJobMessage received = peer.getCurrentMessage();
-    MapWritable x = received.getMap();
-    for (Entry e : x.entrySet()) {
-      multiSteps = ((IntWritable) e.getValue()).get();
-    }
-
     Map<V, Vertex<V, E, M>> tmp = new HashMap<V, Vertex<V, E, M>>();
 
-    int i = 0;
-    int syncs = 0;
-
     for (Vertex v : vertices) {
       for (Edge e : v.getEdges()) {
         peer.send(v.getDestinationPeerName(e),
            new GraphJobMessage(e.getDestinationVertexID()));
       }
-
-      if (syncs < multiSteps && (i % (vertices.size() / multiSteps)) == 0) {
-        peer.sync();
-        syncs++;
-        GraphJobMessage msg = null;
-        while ((msg = peer.getCurrentMessage()) != null) {
-          V vertexName = (V) msg.getVertexId();
-
-          Vertex newVertex = newVertexInstance(vertexClass, conf);
-          newVertex.setVertexID(vertexName);
-          newVertex.runner = this;
-          if (selfReference) {
-            newVertex.setEdges(Collections.singletonList(new Edge(
-                newVertex.getVertexID(), null)));
-          } else {
-            newVertex.setEdges(new ArrayList<Edge<V, E>>(0));
-          }
-          newVertex.setup(conf);
-          tmp.put(vertexName, newVertex);
-
-        }
-      }
-      i++;
     }
 
     peer.sync();
@@ -488,7 +360,6 @@
       newVertex.setup(conf);
       tmp.put(vertexName, newVertex);
       newVertex = null;
-
     }
 
     for (Vertex e : vertices) {
@@ -502,59 +373,6 @@
   }
 
   /**
-   * Partitions our vertices through multiple supersteps to save memory.
-   */
-  private int partitionMultiSteps(
-      BSPPeer peer,
-      long splitSize) throws IOException, SyncException, InterruptedException {
-    int multiSteps = 1;
-
-    MapWritable ssize = new MapWritable();
-    ssize
-        .put(new IntWritable(peer.getPeerIndex()), new LongWritable(splitSize));
-    peer.send(getMasterTask(peer), new GraphJobMessage(ssize));
-    ssize = null;
-    peer.sync();
-
-    if (isMasterTask(peer)) {
-      long maxSplitSize = 0L;
-      GraphJobMessage received = null;
-      while ((received = peer.getCurrentMessage()) != null) {
-        MapWritable x = received.getMap();
-        for (Entry e : x.entrySet()) {
-          long curr = ((LongWritable) e.getValue()).get();
-          if (maxSplitSize < curr) {
-            maxSplitSize = curr;
-          }
-        }
-      }
-
-      int steps = (int) (maxSplitSize / conf.getLong( // 20 mb
-          "hama.graph.multi.step.partitioning.interval", 20000000)) + 1;
-
-      for (String peerName : peer.getAllPeerNames()) {
-        MapWritable temp = new MapWritable();
-        temp.put(new Text("max"), new IntWritable(steps));
-        peer.send(peerName, new GraphJobMessage(temp));
-      }
-    }
-    peer.sync();
-
-    GraphJobMessage received = peer.getCurrentMessage();
-    MapWritable x = received.getMap();
-    for (Entry e : x.entrySet()) {
-      multiSteps = ((IntWritable) e.getValue()).get();
-    }
-
-    if (isMasterTask(peer)) {
-      peer.getCounter(GraphJobCounter.MULTISTEP_PARTITIONING).increment(
-          multiSteps);
-    }
-
-    return multiSteps;
-  }
-
-  /**
    * Counts vertices globally by sending the count of vertices in the map to the
    * other peers.
    */
Index: graph/src/test/java/org/apache/hama/graph/TestSubmitGraphJob.java
===================================================================
--- graph/src/test/java/org/apache/hama/graph/TestSubmitGraphJob.java	(revision 1419949)
+++ graph/src/test/java/org/apache/hama/graph/TestSubmitGraphJob.java	(working copy)
@@ -17,38 +17,36 @@
  */
 package org.apache.hama.graph;
 
-import java.io.BufferedWriter;
-import java.io.File;
-import java.io.FileWriter;
 import java.io.IOException;
 
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.io.DoubleWritable;
-import org.apache.hadoop.io.LongWritable;
 import org.apache.hadoop.io.NullWritable;
 import org.apache.hadoop.io.SequenceFile;
 import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.Writable;
 import org.apache.hama.Constants;
 import org.apache.hama.bsp.BSPJobClient;
 import org.apache.hama.bsp.ClusterStatus;
 import org.apache.hama.bsp.HashPartitioner;
+import org.apache.hama.bsp.SequenceFileInputFormat;
 import org.apache.hama.bsp.SequenceFileOutputFormat;
 import org.apache.hama.bsp.TestBSPMasterGroomServer;
-import org.apache.hama.bsp.TextInputFormat;
+import org.apache.hama.bsp.TextArrayWritable;
 import org.apache.hama.graph.example.PageRank;
 
 public class TestSubmitGraphJob extends TestBSPMasterGroomServer {
 
   String[] input = new String[] { "stackoverflow.com\tyahoo.com",
-      "facebook.com\ttwitter.com\tgoogle.com\tnasa.gov]",
-      "yahoo.com\tnasa.gov\tstackoverflow.com]",
-      "twitter.com\tgoogle.com\tfacebook.com]",
-      "nasa.gov\tyahoo.com\tstackoverflow.com]",
-      "youtube.com\tgoogle.com\tyahoo.com]" };
+      "facebook.com\ttwitter.com\tgoogle.com\tnasa.gov",
+      "yahoo.com\tnasa.gov\tstackoverflow.com",
+      "twitter.com\tgoogle.com\tfacebook.com",
+      "nasa.gov\tyahoo.com\tstackoverflow.com",
+      "youtube.com\tgoogle.com\tyahoo.com" };
 
-  private static String INPUT = "/tmp/pagerank-real-tmp.seq";
-  private static String OUTPUT = "/tmp/pagerank-real-out";
+  private static String INPUT = "/tmp/pagerank/real-tmp.seq";
+  private static String OUTPUT = "/tmp/pagerank/real-out";
 
   @SuppressWarnings("unchecked")
   @Override
@@ -60,7 +58,7 @@
     configuration.setInt("hama.graph.multi.step.partitioning.interval", 30);
     GraphJob bsp = new GraphJob(configuration, PageRank.class);
-    bsp.setInputPath(new Path(INPUT));
+    bsp.setInputPath(new Path("/tmp/pagerank"));
     bsp.setOutputPath(new Path(OUTPUT));
     BSPJobClient jobClient = new BSPJobClient(configuration);
     configuration.setInt(Constants.ZOOKEEPER_SESSION_TIMEOUT, 6000);
@@ -77,14 +75,15 @@
     bsp.setAggregatorClass(AverageAggregator.class,
         PageRank.DanglingNodeAggregator.class);
 
+    bsp.setVertexInputReaderClass(PageRank.PagerankTextReader.class);
+    bsp.setInputFormat(SequenceFileInputFormat.class);
+    bsp.setInputKeyClass(Text.class);
+    bsp.setInputValueClass(TextArrayWritable.class);
+
     bsp.setVertexIDClass(Text.class);
     bsp.setVertexValueClass(DoubleWritable.class);
     bsp.setEdgeValueClass(NullWritable.class);
 
-    bsp.setVertexInputReaderClass(PageRank.PagerankTextReader.class);
-    bsp.setInputFormat(TextInputFormat.class);
-    bsp.setInputKeyClass(LongWritable.class);
-    bsp.setInputValueClass(Text.class);
     bsp.setPartitioner(HashPartitioner.class);
     bsp.setOutputFormat(SequenceFileOutputFormat.class);
     bsp.setOutputKeyClass(Text.class);
@@ -123,26 +122,25 @@
   }
 
   private void generateTestData() {
-    BufferedWriter bw = null;
     try {
-      bw = new BufferedWriter(new FileWriter(INPUT));
-      for (String s : input) {
-        bw.write(s + "\n");
-      }
-    } catch (IOException e) {
-      e.printStackTrace();
-    } finally {
-      if (bw != null) {
-        try {
-          bw.close();
+      SequenceFile.Writer writer = SequenceFile.createWriter(fs, getConf(),
+          new Path(INPUT), Text.class, TextArrayWritable.class);
 
-          File file = new File(INPUT);
-          LOG.info("Temp file length: " + file.length());
-
-        } catch (IOException e) {
-          e.printStackTrace();
+      for (int i = 0; i < input.length; i++) {
+        String[] x = input[i].split("\t");
+        Text key = new Text(x[0]);
+        Writable[] values = new Writable[x.length - 1];
+        for (int j = 1; j < x.length; j++) {
+          values[j - 1] = new Text(x[j]);
        }
+        TextArrayWritable value = new TextArrayWritable();
+        value.set(values);
+        writer.append(key, value);
       }
+
+      writer.close();
+    } catch (IOException e) {
+      e.printStackTrace();
     }
   }
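
For reviewers, a minimal driver sketch of the client-side flow this patch adds to BSPJob.waitForCompletion(): setting bsp.input.partitioner.class makes the client first run PartitioningRunner over the input, write an <input>/partitions directory, and re-point the job at it before submitting. The sketch uses only calls that appear in this patch; the class names (PartitionedJobDriver, NoOpBSP), the /tmp paths, and the assumption that setNumBspTask() backs the bsp.peers.num property read by the pre-partitioning step are illustrative, not taken from the patch.

package org.apache.hama.examples;

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hama.HamaConfiguration;
import org.apache.hama.bsp.BSP;
import org.apache.hama.bsp.BSPJob;
import org.apache.hama.bsp.BSPPeer;
import org.apache.hama.bsp.HashPartitioner;
import org.apache.hama.bsp.NullOutputFormat;
import org.apache.hama.bsp.SequenceFileInputFormat;
import org.apache.hama.bsp.TextArrayWritable;

/** Hypothetical driver, not part of this patch. */
public class PartitionedJobDriver {

  /** Stand-in BSP body so the sketch is self-contained. */
  public static class NoOpBSP extends BSP {
    @Override
    public void bsp(BSPPeer peer) {
      // a real job would consume the pre-partitioned records here
    }
  }

  public static void main(String[] args) throws Exception {
    HamaConfiguration conf = new HamaConfiguration();
    BSPJob job = new BSPJob(conf);
    job.setBspClass(NoOpBSP.class);

    // Requesting a partitioner is the trigger: waitForCompletion() runs
    // PartitioningRunner over bsp.input.dir, writes .../partitions, and then
    // switches this job's input path to that directory.
    job.set("bsp.input.partitioner.class", HashPartitioner.class.getName());
    // Assumed to populate bsp.peers.num, which the pre-partitioning step
    // copies into desired.num.of.tasks.
    job.setNumBspTask(4);

    job.setInputPath(new Path("/tmp/graph-input")); // illustrative path
    job.setInputFormat(SequenceFileInputFormat.class);
    job.setInputKeyClass(Text.class);
    job.setInputValueClass(TextArrayWritable.class);
    job.setOutputFormat(NullOutputFormat.class);

    System.exit(job.waitForCompletion(true) ? 0 : 1);
  }
}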