Index: examples/src/main/java/org/apache/hama/examples/graph/ShortestPathVertexArrayWritable.java =================================================================== --- examples/src/main/java/org/apache/hama/examples/graph/ShortestPathVertexArrayWritable.java (Revision 0) +++ examples/src/main/java/org/apache/hama/examples/graph/ShortestPathVertexArrayWritable.java (Revision 0) @@ -0,0 +1,28 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hama.examples.graph; + +import org.apache.hadoop.io.ArrayWritable; + +public class ShortestPathVertexArrayWritable extends ArrayWritable { + + public ShortestPathVertexArrayWritable() { + super(ShortestPathVertex.class); + } + +} Index: examples/src/main/java/org/apache/hama/examples/graph/PageRankBase.java =================================================================== --- examples/src/main/java/org/apache/hama/examples/graph/PageRankBase.java (Revision 1200329) +++ examples/src/main/java/org/apache/hama/examples/graph/PageRankBase.java (Arbeitskopie) @@ -24,8 +24,6 @@ import java.util.HashMap; import java.util.LinkedList; import java.util.List; -import java.util.Map; -import java.util.Map.Entry; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; @@ -43,8 +41,6 @@ import org.apache.hama.HamaConfiguration; import org.apache.hama.bsp.BSP; import org.apache.hama.bsp.BSPPeer; -import org.apache.hama.examples.graph.partitioning.PartitionableWritable; -import org.apache.hama.examples.graph.partitioning.VertexPartitioner; public abstract class PageRankBase extends BSP { public static final Log LOG = LogFactory.getLog(PageRankBase.class); @@ -56,8 +52,6 @@ protected static double DAMPING_FACTOR = 0.85; protected static double EPSILON = 0.001; - private static final VertexPartitioner partitioner = new VertexPartitioner(); - static void mapAdjacencyList(Configuration conf, BSPPeer peer, HashMap> realAdjacencyList, HashMap tentativePagerank, @@ -84,17 +78,6 @@ reader.close(); } - static HamaConfiguration partitionTextFile(Path in, HamaConfiguration conf, - String[] groomNames) throws IOException, InstantiationException, - IllegalAccessException, InterruptedException { - - // set the partitioning vertex class - conf.setClass("hama.partitioning.vertex.class", Vertex.class, - PartitionableWritable.class); - - return (HamaConfiguration) partitioner.partition(conf, in, groomNames); - } - static HamaConfiguration partitionExample(Path out, HamaConfiguration conf, String[] groomNames) throws IOException, InstantiationException, IllegalAccessException, InterruptedException { @@ -136,25 +119,25 @@ } writer.close(); - - return partitionTextFile(input, conf, groomNames); + + return null; } - static void savePageRankMap(BSPPeer peer, Configuration conf, - Map 
tentativePagerank) throws IOException { - FileSystem fs = FileSystem.get(conf); - Path outPath = new Path(conf.get("out.path") + Path.SEPARATOR + "temp" - + Path.SEPARATOR - + peer.getPeerName().split(ShortestPaths.NAME_VALUE_SEPARATOR)[0]); - fs.delete(outPath, true); - final SequenceFile.Writer out = SequenceFile.createWriter(fs, conf, - outPath, Text.class, DoubleWritable.class); - for (Entry row : tentativePagerank.entrySet()) { - out.append(new Text(row.getKey().getName()), - new DoubleWritable(row.getValue())); - } - out.close(); - } +// static void savePageRankMap(BSPPeer peer, Configuration conf, +// Map tentativePagerank) throws IOException { +// FileSystem fs = FileSystem.get(conf); +// Path outPath = new Path(conf.get("out.path") + Path.SEPARATOR + "temp" +// + Path.SEPARATOR +// + peer.getPeerName().split(ShortestPaths.NAME_VALUE_SEPARATOR)[0]); +// fs.delete(outPath, true); +// final SequenceFile.Writer out = SequenceFile.createWriter(fs, conf, +// outPath, Text.class, DoubleWritable.class); +// for (Entry row : tentativePagerank.entrySet()) { +// out.append(new Text(row.getKey().getName()), +// new DoubleWritable(row.getValue())); +// } +// out.close(); +// } static void printOutput(FileSystem fs, Configuration conf) throws IOException { LOG.info("-------------------- RESULTS --------------------"); Index: examples/src/main/java/org/apache/hama/examples/graph/ShortestPathsGraphLoader.java =================================================================== --- examples/src/main/java/org/apache/hama/examples/graph/ShortestPathsGraphLoader.java (Revision 1200329) +++ examples/src/main/java/org/apache/hama/examples/graph/ShortestPathsGraphLoader.java (Arbeitskopie) @@ -17,76 +17,110 @@ */ package org.apache.hama.examples.graph; +import java.io.IOException; import java.util.HashMap; import java.util.LinkedList; import java.util.List; import java.util.Map; +import java.util.Map.Entry; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.io.SequenceFile; +import org.apache.hadoop.io.SequenceFile.CompressionType; + public class ShortestPathsGraphLoader { - - static Map> loadGraph() { - Map> adjacencyList = new HashMap>(); - String[] cities = new String[] { "Frankfurt", "Mannheim", "Wuerzburg", - "Stuttgart", "Kassel", "Karlsruhe", "Erfurt", "Nuernberg", "Augsburg", - "Muenchen" }; + static void loadGraph(Configuration conf, Path inputPath) { + SequenceFile.Writer out = null; + try { + out = SequenceFile.createWriter(FileSystem.get(conf), conf, new Path( + inputPath, "sssp-example"), ShortestPathVertex.class, + ShortestPathVertexArrayWritable.class, CompressionType.NONE, null, + null); - for (String city : cities) { - if (city.equals("Frankfurt")) { - List list = new LinkedList(); - list.add(new ShortestPathVertex(85, "Mannheim")); - list.add(new ShortestPathVertex(173, "Kassel")); - list.add(new ShortestPathVertex(217, "Wuerzburg")); - adjacencyList.put(new ShortestPathVertex(0, city), list); - } else if (city.equals("Stuttgart")) { - List list = new LinkedList(); - list.add(new ShortestPathVertex(183, "Nuernberg")); - adjacencyList.put(new ShortestPathVertex(0, city), list); - } else if (city.equals("Kassel")) { - List list = new LinkedList(); - list.add(new ShortestPathVertex(502, "Muenchen")); - list.add(new ShortestPathVertex(173, "Frankfurt")); - adjacencyList.put(new ShortestPathVertex(0, city), list); - } else if (city.equals("Erfurt")) { - List list = new LinkedList(); - 
list.add(new ShortestPathVertex(186, "Wuerzburg")); - adjacencyList.put(new ShortestPathVertex(0, city), list); - } else if (city.equals("Wuerzburg")) { - List list = new LinkedList(); - list.add(new ShortestPathVertex(217, "Frankfurt")); - list.add(new ShortestPathVertex(168, "Erfurt")); - list.add(new ShortestPathVertex(103, "Nuernberg")); - adjacencyList.put(new ShortestPathVertex(0, city), list); - } else if (city.equals("Mannheim")) { - List list = new LinkedList(); - list.add(new ShortestPathVertex(80, "Karlsruhe")); - list.add(new ShortestPathVertex(85, "Frankfurt")); - adjacencyList.put(new ShortestPathVertex(0, city), list); - } else if (city.equals("Karlsruhe")) { - List list = new LinkedList(); - list.add(new ShortestPathVertex(250, "Augsburg")); - list.add(new ShortestPathVertex(80, "Mannheim")); - adjacencyList.put(new ShortestPathVertex(0, city), list); - } else if (city.equals("Augsburg")) { - List list = new LinkedList(); - list.add(new ShortestPathVertex(250, "Karlsruhe")); - list.add(new ShortestPathVertex(84, "Muenchen")); - adjacencyList.put(new ShortestPathVertex(0, city), list); - } else if (city.equals("Nuernberg")) { - List list = new LinkedList(); - list.add(new ShortestPathVertex(183, "Stuttgart")); - list.add(new ShortestPathVertex(167, "Muenchen")); - list.add(new ShortestPathVertex(103, "Wuerzburg")); - adjacencyList.put(new ShortestPathVertex(0, city), list); - } else if (city.equals("Muenchen")) { - List list = new LinkedList(); - list.add(new ShortestPathVertex(167, "Nuernberg")); - list.add(new ShortestPathVertex(173, "Kassel")); - list.add(new ShortestPathVertex(84, "Augsburg")); - adjacencyList.put(new ShortestPathVertex(0, city), list); + Map> adjacencyList = new HashMap>(); + String[] cities = new String[] { "Frankfurt", "Mannheim", "Wuerzburg", + "Stuttgart", "Kassel", "Karlsruhe", "Erfurt", "Nuernberg", + "Augsburg", "Muenchen" }; + + for (String city : cities) { + if (city.equals("Frankfurt")) { + List list = new LinkedList(); + list.add(new ShortestPathVertex(85, "Mannheim")); + list.add(new ShortestPathVertex(173, "Kassel")); + list.add(new ShortestPathVertex(217, "Wuerzburg")); + adjacencyList.put(new ShortestPathVertex(0, city), list); + } else if (city.equals("Stuttgart")) { + List list = new LinkedList(); + list.add(new ShortestPathVertex(183, "Nuernberg")); + adjacencyList.put(new ShortestPathVertex(0, city), list); + } else if (city.equals("Kassel")) { + List list = new LinkedList(); + list.add(new ShortestPathVertex(502, "Muenchen")); + list.add(new ShortestPathVertex(173, "Frankfurt")); + adjacencyList.put(new ShortestPathVertex(0, city), list); + } else if (city.equals("Erfurt")) { + List list = new LinkedList(); + list.add(new ShortestPathVertex(186, "Wuerzburg")); + adjacencyList.put(new ShortestPathVertex(0, city), list); + } else if (city.equals("Wuerzburg")) { + List list = new LinkedList(); + list.add(new ShortestPathVertex(217, "Frankfurt")); + list.add(new ShortestPathVertex(168, "Erfurt")); + list.add(new ShortestPathVertex(103, "Nuernberg")); + adjacencyList.put(new ShortestPathVertex(0, city), list); + } else if (city.equals("Mannheim")) { + List list = new LinkedList(); + list.add(new ShortestPathVertex(80, "Karlsruhe")); + list.add(new ShortestPathVertex(85, "Frankfurt")); + adjacencyList.put(new ShortestPathVertex(0, city), list); + } else if (city.equals("Karlsruhe")) { + List list = new LinkedList(); + list.add(new ShortestPathVertex(250, "Augsburg")); + list.add(new ShortestPathVertex(80, "Mannheim")); + 
adjacencyList.put(new ShortestPathVertex(0, city), list); + } else if (city.equals("Augsburg")) { + List list = new LinkedList(); + list.add(new ShortestPathVertex(250, "Karlsruhe")); + list.add(new ShortestPathVertex(84, "Muenchen")); + adjacencyList.put(new ShortestPathVertex(0, city), list); + } else if (city.equals("Nuernberg")) { + List list = new LinkedList(); + list.add(new ShortestPathVertex(183, "Stuttgart")); + list.add(new ShortestPathVertex(167, "Muenchen")); + list.add(new ShortestPathVertex(103, "Wuerzburg")); + adjacencyList.put(new ShortestPathVertex(0, city), list); + } else if (city.equals("Muenchen")) { + List list = new LinkedList(); + list.add(new ShortestPathVertex(167, "Nuernberg")); + list.add(new ShortestPathVertex(173, "Kassel")); + list.add(new ShortestPathVertex(84, "Augsburg")); + adjacencyList.put(new ShortestPathVertex(0, city), list); + } } + + for (Entry> entry : adjacencyList + .entrySet()) { + ShortestPathVertex[] array = entry.getValue().toArray( + new ShortestPathVertex[entry.getValue().size()]); + ShortestPathVertexArrayWritable writable = new ShortestPathVertexArrayWritable(); + writable.set(array); + out.append(entry.getKey(), writable); + } + } catch (IOException e) { + e.printStackTrace(); + } finally { + try { + if (out != null) { + out.close(); + } + } catch (IOException e) { + e.printStackTrace(); + } } - return adjacencyList; + } } Index: examples/src/main/java/org/apache/hama/examples/graph/ShortestPathVertex.java =================================================================== --- examples/src/main/java/org/apache/hama/examples/graph/ShortestPathVertex.java (Revision 1200329) +++ examples/src/main/java/org/apache/hama/examples/graph/ShortestPathVertex.java (Arbeitskopie) @@ -24,7 +24,7 @@ public final class ShortestPathVertex extends Vertex { private int weight; - private Integer cost; + private int cost = Integer.MAX_VALUE; public ShortestPathVertex() { } @@ -34,7 +34,7 @@ this.weight = weight; } - public ShortestPathVertex(int weight, String name, Integer cost) { + public ShortestPathVertex(int weight, String name, int cost) { super(name); this.weight = weight; this.cost = cost; @@ -44,7 +44,7 @@ return name; } - public Integer getCost() { + public int getCost() { return cost; } @@ -52,10 +52,6 @@ this.cost = cost; } - public int getId() { - return id; - } - public int getWeight() { return weight; } Index: examples/src/main/java/org/apache/hama/examples/graph/ShortestPathsBase.java =================================================================== --- examples/src/main/java/org/apache/hama/examples/graph/ShortestPathsBase.java (Revision 1200329) +++ examples/src/main/java/org/apache/hama/examples/graph/ShortestPathsBase.java (Arbeitskopie) @@ -40,8 +40,6 @@ import org.apache.hadoop.io.Writable; import org.apache.hama.bsp.BSP; import org.apache.hama.bsp.BSPPeer; -import org.apache.hama.examples.graph.partitioning.PartitionableWritable; -import org.apache.hama.examples.graph.partitioning.ShortestPathVertexPartitioner; public abstract class ShortestPathsBase extends BSP { @@ -54,8 +52,6 @@ public static final String NAME_VALUE_SEPARATOR = ":"; public static final String MASTER_TASK = "master.groom"; - private static final ShortestPathVertexPartitioner partitioner = new ShortestPathVertexPartitioner(); - /** * When finished we just writing a sequencefile of the vertex name and the * cost. 
@@ -161,19 +157,7 @@ writer.close(); - return partition(conf, input, groomNames); + return null; } - protected final static Configuration partition(Configuration conf, - Path fileToPartition, String[] groomNames) throws IOException, - InstantiationException, IllegalAccessException, InterruptedException { - - // set the partitioning vertex class - conf.setClass("hama.partitioning.vertex.class", ShortestPathVertex.class, - PartitionableWritable.class); - - return partitioner.partition(conf, fileToPartition, groomNames); - - } - } Index: examples/src/main/java/org/apache/hama/examples/graph/partitioning/AdjacentPair.java =================================================================== --- examples/src/main/java/org/apache/hama/examples/graph/partitioning/AdjacentPair.java (Revision 1200329) +++ examples/src/main/java/org/apache/hama/examples/graph/partitioning/AdjacentPair.java (Arbeitskopie) @@ -1,35 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.hama.examples.graph.partitioning; - - -public final class AdjacentPair { - - final K vertex; - final K[] adjacentVertices; - - - public AdjacentPair(K vertex, K[] adjacentVertices) { - super(); - this.vertex = vertex; - this.adjacentVertices = adjacentVertices; - } - - - -} Index: examples/src/main/java/org/apache/hama/examples/graph/partitioning/AbstractGraphPartitioner.java =================================================================== --- examples/src/main/java/org/apache/hama/examples/graph/partitioning/AbstractGraphPartitioner.java (Revision 1200329) +++ examples/src/main/java/org/apache/hama/examples/graph/partitioning/AbstractGraphPartitioner.java (Arbeitskopie) @@ -1,131 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package org.apache.hama.examples.graph.partitioning; - -import java.io.BufferedReader; -import java.io.IOException; -import java.io.InputStreamReader; -import java.util.ArrayList; -import java.util.List; - -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.fs.FileSystem; -import org.apache.hadoop.fs.Path; -import org.apache.hadoop.io.ArrayWritable; -import org.apache.hadoop.io.ObjectWritable; -import org.apache.hadoop.io.SequenceFile; -import org.apache.hadoop.io.SequenceFile.CompressionType; -import org.apache.hama.examples.graph.ShortestPaths; -import org.apache.hama.examples.graph.Vertex; - -/** - * This partitioner partitions the file data which should be in text form into a - * sequencefile. - * - * TODO: this should be extended with InputFormat stuff so we can parse every - * format. - * - */ -public abstract class AbstractGraphPartitioner { - - public static final Log LOG = LogFactory - .getLog(AbstractGraphPartitioner.class); - - private FileSystem fs; - - private Class vertexClass; - - @SuppressWarnings("unchecked") - public Configuration partition(Configuration conf, Path file, - String[] groomNames) throws InstantiationException, - IllegalAccessException, IOException, InterruptedException { - - fs = FileSystem.get(conf); - - vertexClass = (Class) conf.getClass("hama.partitioning.vertex.class", - Vertex.class); - - int sizeOfCluster = groomNames.length; - - // setup the paths where the grooms can find their input - List partPaths = new ArrayList(sizeOfCluster); - List writers = new ArrayList( - sizeOfCluster); - StringBuilder groomNameBuilder = new StringBuilder(); - // this loop adds partition paths for the writers and sets the appropriate - // groom names to the files and configuration - for (String entry : groomNames) { - partPaths.add(new Path(file.getParent().toString() + Path.SEPARATOR - + ShortestPaths.PARTED + Path.SEPARATOR - + entry.split(ShortestPaths.NAME_VALUE_SEPARATOR)[0])); - conf.set(ShortestPaths.MASTER_TASK, entry); - groomNameBuilder.append(entry + ";"); - } - // put every peer into the configuration - conf.set(ShortestPaths.BSP_PEERS, groomNameBuilder.toString()); - // create a seq writer for the files - for (Path p : partPaths) { - fs.delete(p, true); - writers.add(SequenceFile.createWriter(fs, conf, p, - ObjectWritable.class, ArrayWritable.class,CompressionType.NONE)); - } - - BufferedReader br = null; - try { - // read the input - br = new BufferedReader(new InputStreamReader(fs.open(file))); - - long numLines = 0L; - String line = null; - while ((line = br.readLine()) != null) { - // let the subclass process - AdjacentPair pair = process(line); - // check to which partition the vertex belongs - int mod = Math.abs(pair.vertex.getId() % sizeOfCluster); - writers.get(mod).append(new ObjectWritable(vertexClass, pair.vertex), - new ArrayWritable(vertexClass, pair.adjacentVertices)); - numLines++; - if (numLines % 100000 == 0) { - LOG.debug("Partitioned " + numLines + " of vertices!"); - } - } - - for (Path p : partPaths) { - conf.set("in.path." 
+ p.getName(), p.toString()); - } - conf.set("num.vertices", "" + numLines); - LOG.debug("Partitioned a total of " + numLines + " vertices!"); - - return conf; - } finally { - // close our ressources - if (br != null) - br.close(); - - for (SequenceFile.Writer w : writers) - w.close(); - - fs.close(); - } - } - - protected abstract AdjacentPair process(String line); - -} Index: examples/src/main/java/org/apache/hama/examples/graph/partitioning/PartitionableWritable.java =================================================================== --- examples/src/main/java/org/apache/hama/examples/graph/partitioning/PartitionableWritable.java (Revision 1200329) +++ examples/src/main/java/org/apache/hama/examples/graph/partitioning/PartitionableWritable.java (Arbeitskopie) @@ -1,26 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.hama.examples.graph.partitioning; - -import org.apache.hadoop.io.Writable; - -public interface PartitionableWritable extends Writable { - - public int getId(); - -} Index: examples/src/main/java/org/apache/hama/examples/graph/partitioning/ShortestPathVertexPartitioner.java =================================================================== --- examples/src/main/java/org/apache/hama/examples/graph/partitioning/ShortestPathVertexPartitioner.java (Revision 1200329) +++ examples/src/main/java/org/apache/hama/examples/graph/partitioning/ShortestPathVertexPartitioner.java (Arbeitskopie) @@ -1,43 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package org.apache.hama.examples.graph.partitioning; - -import org.apache.hama.examples.graph.ShortestPathVertex; - -public class ShortestPathVertexPartitioner extends - AbstractGraphPartitioner { - - @Override - protected AdjacentPair process(String line) { - - String[] vertices = line.split("\t"); - - ShortestPathVertex v = new ShortestPathVertex(0, vertices[0]); - - ShortestPathVertex[] adjacents = new ShortestPathVertex[vertices.length - 1]; - - for (int i = 1; i < vertices.length; i++) { - String[] vertexAndWeight = vertices[i].split(":"); - adjacents[i - 1] = new ShortestPathVertex( - Integer.valueOf(vertexAndWeight[1]), vertexAndWeight[0]); - } - - return new AdjacentPair(v, adjacents); - } - -} Index: examples/src/main/java/org/apache/hama/examples/graph/partitioning/VertexPartitioner.java =================================================================== --- examples/src/main/java/org/apache/hama/examples/graph/partitioning/VertexPartitioner.java (Revision 1200329) +++ examples/src/main/java/org/apache/hama/examples/graph/partitioning/VertexPartitioner.java (Arbeitskopie) @@ -1,40 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package org.apache.hama.examples.graph.partitioning; - -import org.apache.hama.examples.graph.Vertex; - -public class VertexPartitioner extends AbstractGraphPartitioner { - - @Override - protected AdjacentPair process(String line) { - - String[] vertices = line.split("\t"); - - Vertex v = new Vertex(vertices[0]); - Vertex[] adjacents = new Vertex[vertices.length - 1]; - - for (int i = 1; i < vertices.length; i++) { - adjacents[i - 1] = new Vertex(vertices[i]); - } - - return new AdjacentPair(v, adjacents); - } - - -} Index: examples/src/main/java/org/apache/hama/examples/graph/PageRank.java =================================================================== --- examples/src/main/java/org/apache/hama/examples/graph/PageRank.java (Revision 1200329) +++ examples/src/main/java/org/apache/hama/examples/graph/PageRank.java (Arbeitskopie) @@ -28,7 +28,11 @@ import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; +import org.apache.hadoop.io.DoubleWritable; +import org.apache.hadoop.io.IntWritable; +import org.apache.hadoop.io.Text; import org.apache.hama.HamaConfiguration; +import org.apache.hama.bsp.BSP; import org.apache.hama.bsp.BSPJob; import org.apache.hama.bsp.BSPJobClient; import org.apache.hama.bsp.BSPPeer; @@ -38,7 +42,7 @@ import org.apache.hama.bsp.RecordReader; import org.apache.zookeeper.KeeperException; -public class PageRank extends PageRankBase { +public class PageRank extends BSP { public static final Log LOG = LogFactory.getLog(PageRank.class); private final HashMap> adjacencyList = new HashMap>(); @@ -51,71 +55,71 @@ @Override public void setup(BSPPeer peer) { Configuration conf = peer.getConfiguration(); - numOfVertices = Integer.parseInt(conf.get("num.vertices")); - DAMPING_FACTOR = Double.parseDouble(conf.get("damping.factor")); - ALPHA = (1 - DAMPING_FACTOR) / (double) numOfVertices; - EPSILON = Double.parseDouble(conf.get("epsilon.error")); - MAX_ITERATIONS = Integer.parseInt(conf.get("max.iterations")); - peerNames = conf.get(ShortestPaths.BSP_PEERS).split(";"); +// numOfVertices = Integer.parseInt(conf.get("num.vertices")); +// DAMPING_FACTOR = Double.parseDouble(conf.get("damping.factor")); +// ALPHA = (1 - DAMPING_FACTOR) / (double) numOfVertices; +// EPSILON = Double.parseDouble(conf.get("epsilon.error")); +// MAX_ITERATIONS = Integer.parseInt(conf.get("max.iterations")); +// peerNames = conf.get(ShortestPaths.BSP_PEERS).split(";"); } @Override public void bsp(BSPPeer peer) throws IOException, KeeperException, InterruptedException { - String master = peer.getConfiguration().get(MASTER_TASK); - // setup the datasets - PageRankBase.mapAdjacencyList(peer.getConfiguration(), peer, adjacencyList, - tentativePagerank, lookupMap); - - // while the error not converges against epsilon do the pagerank stuff - double error = 1.0; - int iteration = 0; - // if MAX_ITERATIONS are set to 0, ignore the iterations and just go - // with the error - while ((MAX_ITERATIONS > 0 && iteration < MAX_ITERATIONS) - || error >= EPSILON) { - peer.sync(); - - if (iteration >= 1) { - // copy the old pagerank to the backup - copyTentativePageRankToBackup(); - // sum up all incoming messages for a vertex - HashMap sumMap = new HashMap(); - DoubleMessage msg = null; - while ((msg = (DoubleMessage) peer.getCurrentMessage()) != null) { - Vertex k = lookupMap.get(msg.getTag()); - if (!sumMap.containsKey(k)) { - sumMap.put(k, msg.getData()); - } else { - sumMap.put(k, msg.getData() + sumMap.get(k)); - } - } - // pregel formula: - // 
ALPHA = 0.15 / NumVertices() - // P(i) = ALPHA + 0.85 * sum - for (Entry entry : sumMap.entrySet()) { - tentativePagerank.put(entry.getKey(), ALPHA - + (entry.getValue() * DAMPING_FACTOR)); - } - - // determine the error and send this to the master - double err = determineError(); - error = broadcastError(peer, master, err); - } - // in every step send the tentative pagerank of a vertex to its - // adjacent vertices - for (Vertex vertex : adjacencyList.keySet()) { - sendMessageToNeighbors(peer, vertex); - } - - iteration++; - } - - // Clears all queues entries. - peer.clear(); - // finally save the chunk of pageranks - PageRankBase.savePageRankMap(peer, peer.getConfiguration(), - lastTentativePagerank); +// String master = peer.getConfiguration().get(MASTER_TASK); +// // setup the datasets +// PageRankBase.mapAdjacencyList(peer.getConfiguration(), peer, adjacencyList, +// tentativePagerank, lookupMap); +// +// // while the error not converges against epsilon do the pagerank stuff +// double error = 1.0; +// int iteration = 0; +// // if MAX_ITERATIONS are set to 0, ignore the iterations and just go +// // with the error +// while ((MAX_ITERATIONS > 0 && iteration < MAX_ITERATIONS) +// || error >= EPSILON) { +// peer.sync(); +// +// if (iteration >= 1) { +// // copy the old pagerank to the backup +// copyTentativePageRankToBackup(); +// // sum up all incoming messages for a vertex +// HashMap sumMap = new HashMap(); +// DoubleMessage msg = null; +// while ((msg = (DoubleMessage) peer.getCurrentMessage()) != null) { +// Vertex k = lookupMap.get(msg.getTag()); +// if (!sumMap.containsKey(k)) { +// sumMap.put(k, msg.getData()); +// } else { +// sumMap.put(k, msg.getData() + sumMap.get(k)); +// } +// } +// // pregel formula: +// // ALPHA = 0.15 / NumVertices() +// // P(i) = ALPHA + 0.85 * sum +// for (Entry entry : sumMap.entrySet()) { +// tentativePagerank.put(entry.getKey(), ALPHA +// + (entry.getValue() * DAMPING_FACTOR)); +// } +// +// // determine the error and send this to the master +// double err = determineError(); +// error = broadcastError(peer, master, err); +// } +// // in every step send the tentative pagerank of a vertex to its +// // adjacent vertices +// for (Vertex vertex : adjacencyList.keySet()) { +// sendMessageToNeighbors(peer, vertex); +// } +// +// iteration++; +// } +// +// // Clears all queues entries. 
+// peer.clear(); +// // finally save the chunk of pageranks +// PageRankBase.savePageRankMap(peer, peer.getConfiguration(), +// lastTentativePagerank); } private double broadcastError(BSPPeer peer, String master, double error) @@ -161,11 +165,11 @@ throws IOException { List outgoingEdges = adjacencyList.get(v); for (Vertex adjacent : outgoingEdges) { - int mod = Math.abs(adjacent.getId() % peerNames.length); +// int mod = Math.abs(adjacent.getId() % peerNames.length); // send a message of the tentative pagerank divided by the size of // the outgoing edges to all adjacents - peer.send(peerNames[mod], new DoubleMessage(adjacent.getName(), - tentativePagerank.get(v) / outgoingEdges.size())); +// peer.send(peerNames[mod], new DoubleMessage(adjacent.getName(), +// tentativePagerank.get(v) / outgoingEdges.size())); } } @@ -222,11 +226,11 @@ String[] grooms = activeGrooms.toArray(new String[activeGrooms.size()]); if (conf.get("in.path") == null) { - conf = PageRankBase.partitionExample(new Path(conf.get("out.path")), - conf, grooms); +// conf = PageRankBase.partitionExample(new Path(conf.get("out.path")), +// conf, grooms); } else { - conf = PageRankBase.partitionTextFile(new Path(conf.get("in.path")), - conf, grooms); +// conf = PageRankBase.partitionTextFile(new Path(conf.get("in.path")), +// conf, grooms); } BSPJob job = new BSPJob(conf); Index: examples/src/main/java/org/apache/hama/examples/graph/ShortestPaths.java =================================================================== --- examples/src/main/java/org/apache/hama/examples/graph/ShortestPaths.java (Revision 1200329) +++ examples/src/main/java/org/apache/hama/examples/graph/ShortestPaths.java (Arbeitskopie) @@ -18,57 +18,72 @@ package org.apache.hama.examples.graph; import java.io.IOException; -import java.util.Collection; import java.util.Deque; import java.util.HashMap; import java.util.LinkedList; -import java.util.List; -import java.util.Map; +import java.util.Map.Entry; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; +import org.apache.hadoop.io.IntWritable; +import org.apache.hadoop.io.SequenceFile; +import org.apache.hadoop.io.Text; import org.apache.hama.HamaConfiguration; +import org.apache.hama.bsp.BSP; import org.apache.hama.bsp.BSPJob; -import org.apache.hama.bsp.BSPJobClient; import org.apache.hama.bsp.BSPPeer; import org.apache.hama.bsp.BooleanMessage; -import org.apache.hama.bsp.ClusterStatus; +import org.apache.hama.bsp.HashPartitioner; import org.apache.hama.bsp.IntegerMessage; -import org.apache.hama.bsp.OutputCollector; -import org.apache.hama.bsp.RecordReader; -import org.apache.hama.examples.RandBench; +import org.apache.hama.bsp.SequenceFileInputFormat; +import org.apache.hama.bsp.SequenceFileOutputFormat; +import org.apache.hama.util.KeyValuePair; import org.apache.zookeeper.KeeperException; -public class ShortestPaths extends ShortestPathsBase { +public class ShortestPaths extends + BSP { public static final Log LOG = LogFactory.getLog(ShortestPaths.class); + public static final String SHORTEST_PATHS_START_VERTEX_NAME = "shortest.paths.start.vertex.name"; - private final HashMap> adjacencyList = new HashMap>(); private final HashMap vertexLookupMap = new HashMap(); - private String[] peerNames; + private final HashMap adjacencyList = new HashMap(); + private String masterTaskName; + @Override - public void 
bsp(BSPPeer peer) + public void setup( + BSPPeer peer) throws IOException, KeeperException, InterruptedException { - // map our input into ram - mapAdjacencyList(peer.getConfiguration(), peer, adjacencyList, - vertexLookupMap); - // parse the configuration to get the peerNames - parsePeerNames(peer.getConfiguration()); - // get our master groom - String master = peer.getConfiguration().get(MASTER_TASK); + KeyValuePair next = null; + while ((next = peer.readNext()) != null) { + adjacencyList.put(next.getKey(), (ShortestPathVertex[]) next.getValue() + .toArray()); + vertexLookupMap.put(next.getKey().getName(), next.getKey()); + } + + masterTaskName = peer.getPeerName(0); + // initial message bypass - ShortestPathVertex v = vertexLookupMap.get(peer.getConfiguration().get( - SHORTEST_PATHS_START_VERTEX_ID)); - if (v != null) { - v.setCost(0); - sendMessageToNeighbors(peer, v); + ShortestPathVertex startVertex = vertexLookupMap.get(peer + .getConfiguration().get(SHORTEST_PATHS_START_VERTEX_NAME)); + + if (startVertex != null) { + startVertex.setCost(0); + sendMessageToNeighbors(peer, startVertex); } + } + + @Override + public void bsp( + BSPPeer peer) + throws IOException, KeeperException, InterruptedException { boolean updated = true; while (updated) { int updatesMade = 0; @@ -78,6 +93,9 @@ Deque updatedQueue = new LinkedList(); while ((msg = (IntegerMessage) peer.getCurrentMessage()) != null) { ShortestPathVertex vertex = vertexLookupMap.get(msg.getTag()); + if (vertex == null) { + LOG.fatal("This should never happen!"); + } // check if we need an distance update if (vertex.getCost() > msg.getData()) { updatesMade++; @@ -86,23 +104,27 @@ } } // synchonize with all grooms if there were updates - updated = broadcastUpdatesMade(peer, master, updatesMade); + updated = broadcastUpdatesMade(peer, updatesMade); // send updates to the adjacents of the updated vertices for (ShortestPathVertex vertex : updatedQueue) { sendMessageToNeighbors(peer, vertex); } } - // finished, finally save our map to DFS. - saveVertexMap(peer.getConfiguration(), peer, adjacencyList); } - /** - * Parses the peer names to fix inconsistency in bsp peer names from context. - * - * @param conf - */ - private void parsePeerNames(Configuration conf) { - peerNames = conf.get(BSP_PEERS).split(";"); + @Override + public void cleanup( + BSPPeer peer) { + // write our map into hdfs + for (Entry entry : adjacencyList + .entrySet()) { + try { + peer.write(new Text(entry.getKey().getName()), new IntWritable(entry + .getKey().getCost())); + } catch (IOException e) { + e.printStackTrace(); + } + } } /** @@ -119,11 +141,12 @@ * @throws KeeperException * @throws InterruptedException */ - private boolean broadcastUpdatesMade(BSPPeer peer, String master, int updates) - throws IOException, KeeperException, InterruptedException { - peer.send(master, new IntegerMessage(peer.getPeerName(), updates)); + private boolean broadcastUpdatesMade( + BSPPeer peer, + int updates) throws IOException, KeeperException, InterruptedException { + peer.send(masterTaskName, new IntegerMessage(peer.getPeerName(), updates)); peer.sync(); - if (peer.getPeerName().equals(master)) { + if (peer.getPeerName().equals(masterTaskName)) { int count = 0; IntegerMessage message; while ((message = (IntegerMessage) peer.getCurrentMessage()) != null) { @@ -152,21 +175,49 @@ * @param id The vertex to all adjacent vertices the new cost has to be send. 
   * @throws IOException
   */
-  private void sendMessageToNeighbors(BSPPeer peer, ShortestPathVertex id)
-      throws IOException {
-    List outgoingEdges = adjacencyList.get(id);
+  private void sendMessageToNeighbors(
+      BSPPeer peer,
+      ShortestPathVertex id) throws IOException {
+    ShortestPathVertex[] outgoingEdges = adjacencyList.get(id);
     for (ShortestPathVertex adjacent : outgoingEdges) {
-      int mod = Math.abs((adjacent.getId() % peer.getAllPeerNames().length));
-      peer.send(peerNames[mod], new IntegerMessage(adjacent.getName(), id
-          .getCost() == Integer.MAX_VALUE ? id.getCost() : id.getCost()
-          + adjacent.getWeight()));
+      int mod = Math.abs((adjacent.hashCode() % peer.getAllPeerNames().length));
+      peer.send(peer.getPeerName(mod), new IntegerMessage(adjacent.getName(),
+          id.getCost() == Integer.MAX_VALUE ? id.getCost() : id.getCost()
+              + adjacent.getWeight()));
     }
   }
+  /**
+   * Just a reader of the vertexMap in DFS. Output going to STDOUT.
+   *
+   * @param fs
+   * @param conf
+   * @throws IOException
+   */
+  protected final static void printOutput(FileSystem fs, Configuration conf)
+      throws IOException {
+    System.out.println("-------------------- RESULTS --------------------");
+    FileStatus[] stati = fs.listStatus(new Path(conf.get("bsp.output.dir")));
+    for (FileStatus status : stati) {
+      if (!status.isDir() && !status.getPath().getName().endsWith(".crc")) {
+        Path path = status.getPath();
+        SequenceFile.Reader reader = new SequenceFile.Reader(fs, path, conf);
+        Text key = new Text();
+        IntWritable value = new IntWritable();
+        while (reader.next(key, value)) {
+          if (value.get() != Integer.MAX_VALUE) {
+            System.out.println(key.toString() + " | " + value.get());
+          }
+        }
+        reader.close();
+      }
+    }
+  }
+
   public static void printUsage() {
     System.out.println("Single Source Shortest Path Example:");
     System.out
-        .println("<start vertex> <optional: output path> <optional: adjacency list text file>");
+        .println("<start vertex> <optional: output path> <optional: sequencefile input path> <optional: print output true|false>");
   }
   public static void main(String[] args) throws IOException,
@@ -174,75 +225,61 @@
       IllegalAccessException {
     printUsage();
-    // BSP job configuration
     HamaConfiguration conf = new HamaConfiguration();
-    conf.set(SHORTEST_PATHS_START_VERTEX_ID, "Frankfurt");
-    System.out.println("Setting default start vertex to \"Frankfurt\"!");
-    conf.set(OUT_PATH, "sssp/output");
-    Path adjacencyListPath = null;
+    BSPJob bsp = new BSPJob(conf);
+    conf.set(SHORTEST_PATHS_START_VERTEX_NAME, "Frankfurt");
+
+    bsp.setOutputPath(new Path("sssp/output"));
+    boolean inputGiven = false;
+    boolean outputEnabled = true;
     if (args.length > 0) {
-      conf.set(SHORTEST_PATHS_START_VERTEX_ID, args[0]);
-      System.out.println("Setting start vertex to " + args[0] + "!");
-
+      conf.set(SHORTEST_PATHS_START_VERTEX_NAME, args[0]);
       if (args.length > 1) {
-        conf.set(OUT_PATH, args[1]);
         System.out.println("Using new output folder: " + args[1]);
+        bsp.setOutputPath(new Path(args[1]));
       }
-
      if (args.length > 2) {
-        adjacencyListPath = new Path(args[2]);
+        bsp.setInputPath(new Path(args[2]));
+        outputEnabled = false;
+        inputGiven = true;
      }
+      if (args.length > 3) {
+        outputEnabled = Boolean.parseBoolean(args[3]);
+      }
+    }
+    if (!inputGiven) {
+      Path tmp = new Path("sssp/input");
+      FileSystem.get(conf).delete(tmp, true);
+      ShortestPathsGraphLoader.loadGraph(conf, tmp);
+      bsp.setInputPath(tmp);
    }
-    Map> adjacencyList = null;
-    if (adjacencyListPath == null)
-      adjacencyList = ShortestPathsGraphLoader.loadGraph();
+    System.out.println("Setting start vertex to "
+        + conf.get(SHORTEST_PATHS_START_VERTEX_NAME) + "!");
-    BSPJob bsp = new BSPJob(conf, RandBench.class);
+    bsp.setInputFormat(SequenceFileInputFormat.class);
+
bsp.setPartitioner(HashPartitioner.class); + bsp.setOutputFormat(SequenceFileOutputFormat.class); + bsp.setOutputKeyClass(Text.class); + bsp.setOutputValueClass(IntWritable.class); + bsp.setNumBspTask(3); + // Set the job name bsp.setJobName("Single Source Shortest Path"); bsp.setBspClass(ShortestPaths.class); - // Set the task size as a number of GroomServer - BSPJobClient jobClient = new BSPJobClient(conf); - ClusterStatus cluster = jobClient.getClusterStatus(true); - - Collection activeGrooms = cluster.getActiveGroomNames().keySet(); - String[] grooms = activeGrooms.toArray(new String[activeGrooms.size()]); - - LOG.info("Starting data partitioning..."); - if (adjacencyList == null) { - conf = (HamaConfiguration) partition(conf, adjacencyListPath, grooms); - } else { - conf = (HamaConfiguration) partitionExample(conf, adjacencyList, grooms); - } - LOG.info("Finished!"); - - bsp.setNumBspTask(cluster.getGroomServers()); - long startTime = System.currentTimeMillis(); if (bsp.waitForCompletion(true)) { System.out.println("Job Finished in " + (double) (System.currentTimeMillis() - startTime) / 1000.0 + " seconds"); - printOutput(FileSystem.get(conf), conf); + if (outputEnabled) { + printOutput(FileSystem.get(conf), conf); + } } - } - @Override - public void cleanup(BSPPeer peer) { - // TODO Auto-generated method stub - } - - @Override - public void setup(BSPPeer peer) throws IOException, KeeperException, - InterruptedException { - // TODO Auto-generated method stub - - } - } Index: examples/src/main/java/org/apache/hama/examples/graph/Vertex.java =================================================================== --- examples/src/main/java/org/apache/hama/examples/graph/Vertex.java (Revision 1200329) +++ examples/src/main/java/org/apache/hama/examples/graph/Vertex.java (Arbeitskopie) @@ -21,11 +21,10 @@ import java.io.DataOutput; import java.io.IOException; -import org.apache.hama.examples.graph.partitioning.PartitionableWritable; +import org.apache.hadoop.io.Writable; -public class Vertex implements PartitionableWritable { +public class Vertex implements Writable { - protected int id; protected String name; public Vertex() { @@ -35,24 +34,21 @@ public Vertex(String name) { super(); this.name = name; - this.id = name.hashCode(); } @Override public void readFields(DataInput in) throws IOException { - this.id = in.readInt(); this.name = in.readUTF(); } @Override public void write(DataOutput out) throws IOException { - out.writeInt(id); out.writeUTF(name); } @Override public int hashCode() { - return id; + return name.hashCode(); } @Override @@ -69,11 +65,6 @@ return true; } - @Override - public int getId() { - return id; - } - public String getName() { return name; }
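
For reference, a minimal sketch of how the new SequenceFile-based graph input is expected to round-trip under this patch: ShortestPathsGraphLoader.loadGraph() writes ShortestPathVertex keys with their adjacency lists wrapped in ShortestPathVertexArrayWritable, and ShortestPaths.setup() reads the same pairs back and unpacks the value with toArray(). The class name SsspInputSketch and the path "sssp/input/sssp-example" are illustrative assumptions, not part of the patch; the SequenceFile and ArrayWritable calls are plain Hadoop APIs.

package org.apache.hama.examples.graph;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.SequenceFile;
import org.apache.hadoop.io.Writable;

// Illustrative sketch: write one vertex plus its adjacency list the way
// ShortestPathsGraphLoader.loadGraph() does, then read the pair back the way
// ShortestPaths.setup() consumes it. Class name and path are placeholders.
public class SsspInputSketch {

  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    FileSystem fs = FileSystem.get(conf);
    Path file = new Path("sssp/input/sssp-example"); // assumed demo path

    // Key = the vertex, value = its adjacent vertices wrapped in the new ArrayWritable subclass.
    SequenceFile.Writer writer = SequenceFile.createWriter(fs, conf, file,
        ShortestPathVertex.class, ShortestPathVertexArrayWritable.class);
    ShortestPathVertexArrayWritable adjacents = new ShortestPathVertexArrayWritable();
    adjacents.set(new Writable[] { new ShortestPathVertex(85, "Mannheim"),
        new ShortestPathVertex(173, "Kassel"), new ShortestPathVertex(217, "Wuerzburg") });
    writer.append(new ShortestPathVertex(0, "Frankfurt"), adjacents);
    writer.close();

    // Read the pairs back; ShortestPaths.setup() does the same through peer.readNext()
    // and casts value.toArray() to ShortestPathVertex[].
    SequenceFile.Reader reader = new SequenceFile.Reader(fs, file, conf);
    ShortestPathVertex key = new ShortestPathVertex();
    ShortestPathVertexArrayWritable value = new ShortestPathVertexArrayWritable();
    while (reader.next(key, value)) {
      ShortestPathVertex[] edges = (ShortestPathVertex[]) value.toArray();
      System.out.println(key.getName() + " -> " + edges.length + " outgoing edges");
    }
    reader.close();
  }
}

The same name-based hashing also drives message routing: since Vertex.hashCode() now hashes the vertex name instead of the removed id field, sendMessageToNeighbors() can pick the target peer with adjacent.hashCode() modulo the number of peers, which lines up with the HashPartitioner configured for the job.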