Index: pom.xml =================================================================== --- pom.xml (revision 1421667) +++ pom.xml (working copy) @@ -243,6 +243,7 @@ + commons core graph examples Index: commons =================================================================== --- commons (revision 0) +++ commons (working copy) Property changes on: commons ___________________________________________________________________ Added: svn:ignore ## -0,0 +1,2 ## +docs +target Index: commons/pom.xml =================================================================== --- commons/pom.xml (revision 0) +++ commons/pom.xml (working copy) @@ -0,0 +1,54 @@ + + + + + + org.apache.hama + hama-parent + 0.7.0-SNAPSHOT + + + 4.0.0 + org.apache.hama + hama-commons + commons + 0.7.0-SNAPSHOT + jar + + + + org.apache.hadoop + hadoop-core + + + org.apache.hadoop + hadoop-test + + + + + hama-commons-${project.version} + + + + maven-surefire-plugin + + + + + Index: commons/src/main/java/org/apache/hama/commons/graph/AbstractVertex.java =================================================================== --- commons/src/main/java/org/apache/hama/commons/graph/AbstractVertex.java (revision 0) +++ commons/src/main/java/org/apache/hama/commons/graph/AbstractVertex.java (working copy) @@ -0,0 +1,41 @@ +package org.apache.hama.commons.graph; + +import java.io.DataInput; +import java.io.DataOutput; +import java.io.IOException; + +import org.apache.hadoop.io.Writable; + +public abstract class AbstractVertex implements Writable { + protected V vertexID; + protected M value; + + public V getVertexID() { + return vertexID; + } + + public void setVertexID(V vertexID) { + this.vertexID = vertexID; + } + + public M getValue() { + return value; + } + + public void setValue(M value) { + this.value = value; + } + + @Override + public void readFields(DataInput arg0) throws IOException { + // TODO Auto-generated method stub + + } + + @Override + public void write(DataOutput arg0) throws IOException { + // TODO Auto-generated method stub + + } + +} Index: commons/src/main/java/org/apache/hama/commons/graph/VertexInputReader.java =================================================================== --- commons/src/main/java/org/apache/hama/commons/graph/VertexInputReader.java (revision 0) +++ commons/src/main/java/org/apache/hama/commons/graph/VertexInputReader.java (working copy) @@ -0,0 +1,35 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hama.commons.graph; + +import org.apache.hadoop.io.Writable; + +/** + * A reader to read Hama's input files and parses a vertex out of it. + */ +public abstract class VertexInputReader { + + /** + * Parses a given key and value into the given vertex. If returned true, the + * given vertex is considered finished and a new instance will be given in the + * next call. + */ + public abstract boolean parseVertex(KIN key, VIN value, VERTEX vertex) + throws Exception; + +} Index: core/pom.xml =================================================================== --- core/pom.xml (revision 1421667) +++ core/pom.xml (working copy) @@ -32,6 +32,11 @@ + org.apache.hama + hama-commons + ${project.version} + + org.xerial.snappy snappy-java 1.0.4.1 Index: core/src/main/java/org/apache/hama/bsp/BSPJob.java =================================================================== --- core/src/main/java/org/apache/hama/bsp/BSPJob.java (revision 1421667) +++ core/src/main/java/org/apache/hama/bsp/BSPJob.java (working copy) @@ -32,6 +32,8 @@ import org.apache.hama.HamaConfiguration; import org.apache.hama.bsp.message.compress.BSPMessageCompressor; import org.apache.hama.bsp.message.compress.BSPMessageCompressorFactory; +import org.apache.hama.commons.graph.AbstractVertex; +import org.apache.hama.commons.graph.VertexInputReader; /** * A BSP job configuration. @@ -238,6 +240,19 @@ } HamaConfiguration conf = new HamaConfiguration(); + + if (this.getBspClass().getName().toString() + .equals("org.apache.hama.graph.GraphJobRunner")) { + conf.setBoolean("hama.graph.job", true); + conf.setClass( + "hama.graph.input.reader.class", + this.getConfiguration().getClass("hama.graph.input.reader.class", + VertexInputReader.class), VertexInputReader.class); + conf.setClass("hama.graph.vertex.class", this.getConfiguration() + .getClass("hama.graph.vertex.class", AbstractVertex.class), + AbstractVertex.class); + } + conf.setInt("desired.num.of.tasks", Integer.parseInt(this.getConfiguration().get("bsp.peers.num"))); BSPJob partitioningJob = new BSPJob(conf); Index: core/src/main/java/org/apache/hama/bsp/PartitioningRunner.java =================================================================== --- core/src/main/java/org/apache/hama/bsp/PartitioningRunner.java (revision 1421667) +++ core/src/main/java/org/apache/hama/bsp/PartitioningRunner.java (working copy) @@ -21,6 +21,8 @@ import java.util.HashMap; import java.util.Map; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileSystem; @@ -31,10 +33,14 @@ import org.apache.hadoop.io.Writable; import org.apache.hadoop.util.ReflectionUtils; import org.apache.hama.bsp.sync.SyncException; +import org.apache.hama.commons.graph.AbstractVertex; +import org.apache.hama.commons.graph.VertexInputReader; import org.apache.hama.util.KeyValuePair; public class PartitioningRunner extends BSP { + public static final Log LOG = LogFactory.getLog(PartitioningRunner.class); + private Configuration conf; private int desiredNum; private FileSystem fs = null; @@ -67,12 +73,44 @@ Class keyClass = null; Class valueClass = null; + + ///////////////////////////////////////////////////////// + LOG.info(">>>>>>>>>>> is graphJob? " + isGraphJob()); + LOG.info(">>>>>>>>>>> reader class is? " + + conf.getClass("hama.graph.input.reader.class", + VertexInputReader.class).getName()); + + VertexInputReader parser = (VertexInputReader) ReflectionUtils + .newInstance(conf.getClass("hama.graph.input.reader.class", + VertexInputReader.class), conf); + + + ///////////////////////////////////////////////////////// + while ((pair = peer.readNext()) != null) { if (keyClass == null && valueClass == null) { keyClass = pair.getKey().getClass(); valueClass = pair.getValue().getClass(); } + //////////////////////////////// debugging + if(isGraphJob()) { + AbstractVertex vertex = (AbstractVertex) ReflectionUtils + .newInstance(conf.getClass("hama.graph.vertex.class", + AbstractVertex.class), conf); + + LOG.info("XXXXXXXXXXX " + vertex.getClass().getName()); + try { + parser.parseVertex(pair.getKey(), pair.getValue(), vertex); + } catch (Exception e1) { + // TODO Auto-generated catch block + e1.printStackTrace(); + } + LOG.info(">>>>>>>>> vertexID: " + vertex.getVertexID()); + LOG.info(">>>>>>>>> vertex value: " + vertex.getValue()); + } + //////////////////////////////// + int index = Math.abs(partitioner.getPartition(pair.getKey(), pair.getValue(), desiredNum)); @@ -114,7 +152,8 @@ files[i].getPath(), conf); Writable key = (Writable) ReflectionUtils.newInstance(keyClass, conf); - Writable value = (Writable) ReflectionUtils.newInstance(valueClass, conf); + Writable value = (Writable) ReflectionUtils.newInstance(valueClass, + conf); while (reader.next(key, value)) { writer.append(key, value); @@ -129,14 +168,18 @@ } @SuppressWarnings("rawtypes") - public Partitioner getPartitioner() { + private Partitioner getPartitioner() { return ReflectionUtils.newInstance(conf .getClass("bsp.input.partitioner.class", HashPartitioner.class, Partitioner.class), conf); } - private static String getPartitionName(int i) { + private String getPartitionName(int i) { return "part-" + String.valueOf(100000 + i).substring(1, 6); } + private boolean isGraphJob() { + return conf.getBoolean("hama.graph.job", false); + } + } Index: graph/pom.xml =================================================================== --- graph/pom.xml (revision 1421667) +++ graph/pom.xml (working copy) @@ -33,6 +33,11 @@ org.apache.hama + hama-commons + ${project.version} + + + org.apache.hama hama-core ${project.version} Index: graph/src/main/java/org/apache/hama/graph/GraphJob.java =================================================================== --- graph/src/main/java/org/apache/hama/graph/GraphJob.java (revision 1421667) +++ graph/src/main/java/org/apache/hama/graph/GraphJob.java (working copy) @@ -27,6 +27,7 @@ import org.apache.hama.bsp.Combiner; import org.apache.hama.bsp.HashPartitioner; import org.apache.hama.bsp.Partitioner; +import org.apache.hama.commons.graph.VertexInputReader; import com.google.common.base.Preconditions; @@ -39,7 +40,6 @@ public final static String AGGREGATOR_CLASS_ATTR = "hama.graph.aggregator.class"; public final static String VERTEX_MESSAGE_COMBINER_CLASS_ATTR = "hama.vertex.message.combiner.class"; - public final static String VERTEX_GRAPH_RUNTIME_PARTIONING = "hama.graph.runtime.partitioning"; public final static String VERTEX_GRAPH_INPUT_READER = "hama.graph.input.reader.class"; /** @@ -117,7 +117,7 @@ * Sets the input reader for parsing the input to vertices. */ public void setVertexInputReaderClass( - Class> cls) { + Class> cls) { ensureState(JobState.DEFINE); conf.setClass(VERTEX_GRAPH_INPUT_READER, cls, VertexInputReader.class); } @@ -132,7 +132,6 @@ public void setPartitioner(@SuppressWarnings("rawtypes") Class theClass) { super.setPartitioner(theClass); - conf.setBoolean(VERTEX_GRAPH_RUNTIME_PARTIONING, true); } @Override Index: graph/src/main/java/org/apache/hama/graph/Vertex.java =================================================================== --- graph/src/main/java/org/apache/hama/graph/Vertex.java (revision 1421667) +++ graph/src/main/java/org/apache/hama/graph/Vertex.java (working copy) @@ -17,6 +17,8 @@ */ package org.apache.hama.graph; +import java.io.DataInput; +import java.io.DataOutput; import java.io.IOException; import java.util.ArrayList; import java.util.List; @@ -26,14 +28,13 @@ import org.apache.hadoop.io.Writable; import org.apache.hama.bsp.BSPPeer; import org.apache.hama.bsp.Partitioner; +import org.apache.hama.commons.graph.AbstractVertex; public abstract class Vertex - implements VertexInterface { + extends AbstractVertex implements VertexInterface { GraphJobRunner runner; - private V vertexID; - private M value; private List> edges; private boolean votedToHalt = false; @@ -43,11 +44,6 @@ } @Override - public V getVertexID() { - return vertexID; - } - - @Override public void setup(Configuration conf) { } @@ -113,20 +109,6 @@ return edges; } - @Override - public M getValue() { - return value; - } - - @Override - public void setValue(M value) { - this.value = value; - } - - public void setVertexID(V vertexID) { - this.vertexID = vertexID; - } - public int getMaxIteration() { return runner.getMaxIteration(); } @@ -224,4 +206,16 @@ + " // " + edges; } + @Override + public void readFields(DataInput arg0) throws IOException { + // TODO Auto-generated method stub + + } + + @Override + public void write(DataOutput arg0) throws IOException { + // TODO Auto-generated method stub + + } + } Index: graph/src/main/java/org/apache/hama/graph/VertexInputReader.java =================================================================== --- graph/src/main/java/org/apache/hama/graph/VertexInputReader.java (revision 1421667) +++ graph/src/main/java/org/apache/hama/graph/VertexInputReader.java (working copy) @@ -1,35 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.hama.graph; - -import org.apache.hadoop.io.Writable; - -/** - * A reader to read Hama's input files and parses a vertex out of it. - */ -public abstract class VertexInputReader { - - /** - * Parses a given key and value into the given vertex. If returned true, the - * given vertex is considered finished and a new instance will be given in the - * next call. - */ - public abstract boolean parseVertex(KEYIN key, VALUEIN value, - Vertex vertex) throws Exception; - -} Index: graph/src/test/java/org/apache/hama/graph/example/PageRank.java =================================================================== --- graph/src/test/java/org/apache/hama/graph/example/PageRank.java (revision 1421667) +++ graph/src/test/java/org/apache/hama/graph/example/PageRank.java (working copy) @@ -22,13 +22,14 @@ import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.io.DoubleWritable; -import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.NullWritable; import org.apache.hadoop.io.Text; +import org.apache.hadoop.io.Writable; +import org.apache.hama.bsp.TextArrayWritable; +import org.apache.hama.commons.graph.VertexInputReader; import org.apache.hama.graph.AbstractAggregator; import org.apache.hama.graph.Edge; import org.apache.hama.graph.Vertex; -import org.apache.hama.graph.VertexInputReader; public class PageRank { @@ -89,32 +90,22 @@ } } - public static class PagerankTextReader extends - VertexInputReader { + public static class PagerankTextReader + extends + VertexInputReader> { - /** - * The text file essentially should look like:
- * VERTEX_ID\t(n-tab separated VERTEX_IDs)
- * E.G:
- * 1\t2\t3\t4
- * 2\t3\t1
- * etc. - */ @Override - public boolean parseVertex(LongWritable key, Text value, + public boolean parseVertex(Text key, TextArrayWritable value, Vertex vertex) { - String[] split = value.toString().split("\t"); - for (int i = 0; i < split.length; i++) { - if (i == 0) { - vertex.setVertexID(new Text(split[i])); - } else { - vertex - .addEdge(new Edge(new Text(split[i]), null)); - } + vertex.setVertexID(key); + + for(Writable v : value.get()) { + vertex.addEdge(new Edge((Text) v, null)); } + return true; } - + } public static class DanglingNodeAggregator Index: src/assemble/bin.xml =================================================================== --- src/assemble/bin.xml (revision 1421667) +++ src/assemble/bin.xml (working copy) @@ -74,7 +74,19 @@ ../hama-${project.version}/ - + + ../commons/target + + hama-*.jar + + + *sources.jar + *tests.jar + *javadoc.jar + + ../hama-${project.version}/ + + ../