Index: src/examples/org/apache/hama/examples/RandBench.java =================================================================== --- src/examples/org/apache/hama/examples/RandBench.java (revision 1148587) +++ src/examples/org/apache/hama/examples/RandBench.java (working copy) @@ -76,18 +76,13 @@ } @Override - public void setConf(Configuration conf) { - this.conf = conf; + public void setup(BSPPeer peer) { + this.conf = peer.getConfiguration(); this.sizeOfMsg = conf.getInt(SIZEOFMSG, 1); this.nCommunications = conf.getInt(N_COMMUNICATIONS, 1); this.nSupersteps = conf.getInt(N_SUPERSTEPS, 1); } - @Override - public Configuration getConf() { - return conf; - } - } public static void main(String[] args) throws Exception { Index: src/examples/org/apache/hama/examples/graph/InlinkCountComputer.java =================================================================== --- src/examples/org/apache/hama/examples/graph/InlinkCountComputer.java (revision 0) +++ src/examples/org/apache/hama/examples/graph/InlinkCountComputer.java (revision 0) @@ -0,0 +1,90 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hama.examples.graph; + +import java.io.IOException; +import java.util.Iterator; + +import org.apache.hadoop.io.LongWritable; +import org.apache.hama.examples.graph.InlinkCountComputer.InlinkCount; +import org.apache.hama.graph.GraphComputeRunner; +import org.apache.hama.graph.LocalNode; +import org.apache.hama.graph.MessageValueIterator; +import org.apache.hama.graph.Node; +import org.apache.hama.graph.Pair; +import org.apache.hama.graph.Vertex; +import org.apache.hama.graph.VertexComputerGeneric; +import org.apache.hama.graph.VertexEnvironment; +import org.apache.hama.graph.VertexReference; + +/** + * VertexComputer class for InlinkCount. + */ +public class InlinkCountComputer extends VertexComputerGeneric { + + /** + * Class implementing an example graph BSP algorithm where nodes count the + * number of inlinks they have. + */ + public class InlinkCount extends Vertex { + private long inlinkCount = 0; + + public InlinkCount(LocalNode node, + VertexEnvironment environment) throws IOException { + super(node, environment); + } + + @Override + public void compute(MessageValueIterator messages) + throws IOException { + if (this.getSuperstep() == 1) { + // Send an inlink count of one to all the neighbors. + LongWritable one = new LongWritable(1); + Iterator edges = this.getOutEdgeIterator(); + while (edges.hasNext()) { + VertexReference edge = edges.next(); + this.sendMessageTo(edge, one); + } + } else { + // Count inlinks. + while (messages.hasNext()) { + Pair message = messages.next(); + inlinkCount += message.getSecond().get(); + } + // vote for halt, because we've finished our computation + this.voteToHalt(); + } + } + + public long getInlinkCount() { + return inlinkCount; + } + } + + public InlinkCountComputer() { + super(InlinkCount.class, LongWritable.class); + } + + public static void main(String[] args) throws Exception { + + // TODO define the job configuration interface + + new GraphComputeRunner().run(args); + } + +} Index: src/examples/org/apache/hama/examples/graph/PageRank.java =================================================================== --- src/examples/org/apache/hama/examples/graph/PageRank.java (revision 1148587) +++ src/examples/org/apache/hama/examples/graph/PageRank.java (working copy) @@ -52,7 +52,7 @@ InterruptedException { String master = conf.get(MASTER_TASK); // setup the datasets - adjacencyList = PageRankBase.mapAdjacencyList(getConf(), peer); + adjacencyList = PageRankBase.mapAdjacencyList(conf, peer); // init the pageranks to 1/n where n is the number of all vertices for (PageRankVertex vertex : adjacencyList.keySet()) { tentativePagerank.put(vertex, Double.valueOf(1.0 / numOfVertices)); @@ -109,8 +109,8 @@ LOG.info("Finished with iteration " + iteration + "!"); } - private double broadcastError(BSPPeer peer, String master, - double error) throws IOException, KeeperException, InterruptedException { + private double broadcastError(BSPPeer peer, String master, double error) + throws IOException, KeeperException, InterruptedException { peer.send(master, new DoubleMessage("", error)); peer.sync(); if (peer.getPeerName().equals(master)) { @@ -161,8 +161,8 @@ } @Override - public void setConf(Configuration conf) { - this.conf = conf; + public void setup(BSPPeer peer) { + this.conf = peer.getConfiguration(); numOfVertices = Integer.parseInt(conf.get("num.vertices")); DAMPING_FACTOR = Double.parseDouble(conf.get("damping.factor")); ALPHA = (1 - DAMPING_FACTOR) / (double) numOfVertices; @@ -171,11 +171,6 @@ peerNames = conf.get(ShortestPaths.BSP_PEERS).split(";"); } - @Override - public Configuration getConf() { - return conf; - } - public static void printUsage() { System.out.println("PageRank Example:"); System.out Index: src/examples/org/apache/hama/examples/graph/ShortestPaths.java =================================================================== --- src/examples/org/apache/hama/examples/graph/ShortestPaths.java (revision 1148587) +++ src/examples/org/apache/hama/examples/graph/ShortestPaths.java (working copy) @@ -166,15 +166,10 @@ } @Override - public void setConf(Configuration conf) { - this.conf = conf; + public void setup(BSPPeer peer) { + this.conf = peer.getConfiguration(); } - @Override - public Configuration getConf() { - return conf; - } - public static void printUsage() { System.out.println("Single Source Shortest Path Example:"); System.out Index: src/java/org/apache/hama/bsp/BSP.java =================================================================== --- src/java/org/apache/hama/bsp/BSP.java (revision 1148587) +++ src/java/org/apache/hama/bsp/BSP.java (working copy) @@ -17,8 +17,27 @@ */ package org.apache.hama.bsp; +import java.io.IOException; + +import org.apache.zookeeper.KeeperException; + /** - * This class provides an abstract implementation of the BSP interface. + * This class provides an implementation of our BSP engine.
+ * Execution order: setup -> bsp -> cleanup. */ -public abstract class BSP implements BSPInterface { +public class BSP { + + public void setup(BSPPeer bspPeer) { + + } + + public void bsp(BSPPeer bspPeer) throws IOException, KeeperException, + InterruptedException { + + } + + public void cleanup(BSPPeer bspPeer) { + + } + } Index: src/java/org/apache/hama/bsp/BSPPeer.java =================================================================== --- src/java/org/apache/hama/bsp/BSPPeer.java (revision 1148587) +++ src/java/org/apache/hama/bsp/BSPPeer.java (working copy) @@ -420,4 +420,12 @@ public void clearOutgoingQueues() { this.outgoingQueues.clear(); } + + public Configuration getConfiguration() { + return conf; + } + + public void setConfiguration(Configuration conf) { + this.conf = conf; + } } Index: src/java/org/apache/hama/bsp/BSPTask.java =================================================================== --- src/java/org/apache/hama/bsp/BSPTask.java (revision 1148587) +++ src/java/org/apache/hama/bsp/BSPTask.java (working copy) @@ -26,18 +26,19 @@ import org.apache.zookeeper.KeeperException; /** - * Base class for tasks. + * Base class for tasks. */ public class BSPTask extends Task { - + public static final Log LOG = LogFactory.getLog(BSPTask.class); - + private BSPJob conf; - + public BSPTask() { } - public BSPTask(BSPJobID jobId, String jobFile, TaskAttemptID taskid, int partition) { + public BSPTask(BSPJobID jobId, String jobFile, TaskAttemptID taskid, + int partition) { this.jobId = jobId; this.jobFile = jobFile; this.taskId = taskid; @@ -52,10 +53,12 @@ @Override public void run(BSPJob job, BSPPeer bspPeer, BSPPeerProtocol umbilical) throws IOException { - - BSP bsp = (BSP) ReflectionUtils.newInstance(job.getConf().getClass( - "bsp.work.class", BSP.class), job.getConf()); + BSP bsp = (BSP) ReflectionUtils.newInstance( + job.getConf().getClass("bsp.work.class", BSP.class), job.getConf()); + + bsp.setup(bspPeer); + try { bsp.bsp(bspPeer); } catch (IOException e) { @@ -66,15 +69,17 @@ LOG.error("Exception during BSP execution!", e); } + bsp.cleanup(bspPeer); + done(umbilical); } - + public BSPJob getConf() { - return conf; - } - - public void setConf(BSPJob conf) { - this.conf = conf; - } + return conf; + } + public void setConf(BSPJob conf) { + this.conf = conf; + } + } Index: src/java/org/apache/hama/bsp/LocalBSPRunner.java =================================================================== --- src/java/org/apache/hama/bsp/LocalBSPRunner.java (revision 1148587) +++ src/java/org/apache/hama/bsp/LocalBSPRunner.java (working copy) @@ -195,12 +195,13 @@ } public void run() { - bsp.setConf(conf); + bsp.setup(groom); try { - bsp.bsp(groom); + bsp.bsp(groom); } catch (Exception e) { LOG.error("Exception during BSP execution!", e); } + bsp.cleanup(groom); } @Override Index: src/java/org/apache/hama/graph/EdgeIterator.java =================================================================== --- src/java/org/apache/hama/graph/EdgeIterator.java (revision 0) +++ src/java/org/apache/hama/graph/EdgeIterator.java (revision 0) @@ -0,0 +1,85 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hama.graph; + +import java.util.Iterator; + +/** + * Iterator returning VertexReferences from the outgoing edges of a LocalNode. + * It first iterates through local neighbors then remote neighbors. Beware that + * this iterator is not thread-safe! + */ +public class EdgeIterator implements Iterator { + // LocalNode instance for which outneighbors will be returned. + private LocalNode node; + private int currentNeighbor = -1; // Index of current neighbor. + private VertexReference reference = new VertexReference(); + + /** + * Construct EdgeIterator instance with a {@link LocalNode}. + * + * @param node + */ + public EdgeIterator(LocalNode node) { + this.node = node; + } + + /** + * Checks whether this iterator has more edges left. + * + * @return True if more edges are available, false otherwise. + */ + public boolean hasNext() { + return currentNeighbor < (node.localOutdegree() + node.remoteOutdegree() - 1); + } + + /** + * Returns next edge from this iterator. + * + * @return A {@link VertexReference} if more edges are left, null otherwise. + */ + public VertexReference next() { + if (!hasNext()) { + return null; + } + ++currentNeighbor; + VertexReference result; + // Determine if we have enumerated local neighbors. + if (currentNeighbor < node.localOutdegree()) { + LocalNode nextNode = node.localNeighbor(currentNeighbor); + reference.setPartition(nextNode.owner()); + reference.setLocalId(nextNode.reference()); + result = reference; + } else if (node.remoteOutdegree() > 0) { + // Return remote neighbors. + int remoteIndex = currentNeighbor - node.localOutdegree(); + Node nextNode = node.remoteNeighbor(remoteIndex); + reference.setPartition(nextNode.owner()); + reference.setLocalId(nextNode.reference()); + result = reference; + } else { + result = null; + } + return result; + } + + public void remove() { + throw new UnsupportedOperationException("Not supported yet."); + } + +} Index: src/java/org/apache/hama/graph/Graph.java =================================================================== --- src/java/org/apache/hama/graph/Graph.java (revision 0) +++ src/java/org/apache/hama/graph/Graph.java (revision 0) @@ -0,0 +1,301 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hama.graph; + +public final class Graph { + + /** Id of this graph. */ + private final short graphId; + + /** Partition ID of this graph. */ + private final int partitionId; + + /** List of nodes in this graph. */ + private final LocalNode[] nodes; + + /** Graph root node. */ + protected TNode root; + + /** Global size (number of vertices). */ + private final long globalSize; + + /** Outgoing transitions. */ + private final JoinedTransitions outgoing; + + /** Incoming transitions. */ + private final JoinedTransitions incoming; + + /** Class that makes local node. */ + private final Class> TLocalNodeClass; + private final Class TNodeClass; + + @SuppressWarnings("unchecked") + public Graph(Class> TLocalNodeClass, + Class TNodeClass, int partitionId, int localSize, long globalSize, + int localTransitionsSize, int remoteTransitionsSize, + int localInTransitionsSize, int remoteInTransitionsSize) { + this.TLocalNodeClass = (Class>) TLocalNodeClass; + this.TNodeClass = TNodeClass; + // Register graph. + this.graphId = 0; // Not supported. + this.partitionId = partitionId; + this.outgoing = new JoinedTransitions(this, TNodeClass, + localTransitionsSize, remoteTransitionsSize); + if (localInTransitionsSize > 0 || remoteInTransitionsSize > 0) + this.incoming = new JoinedTransitions(this, TNodeClass, + localInTransitionsSize, remoteInTransitionsSize); + else + this.incoming = null; + this.nodes = new LocalNode[localSize]; + this.globalSize = globalSize; + } + + public Class> getTLocalNodeClass() { + return TLocalNodeClass; + } + + public Class getTNodeClass() { + return TNodeClass; + } + + public TNode root() { + return root; + } + + @SuppressWarnings("unchecked") + public void setRoot(int rootOwner, int rootOffset) { + this.root = (rootOwner == partitionId) ? (TNode) nodes[rootOffset] + : (TNode) NodeReference.newInstance(TNodeClass, graphId, rootOwner, + rootOffset); + } + + /** -1 if unknown */ + public long getGlobalSize() { + return globalSize; + } + + public LocalNode node(Node n) { + return nodes[n.reference()]; + } + + public LocalNode node(NodeReference n) { + return nodes[n.reference]; + } + + public LocalNode node(long n) { + return nodes[PairUtils.getSecond(n)]; + } + + public int nodes() { + return nodes.length; + } + + public LocalNode node(int index) { + return nodes[index]; + } + + public void addNode(LocalNode node, int reference) { + nodes[reference] = node; + node.setReference(reference); + } + + public void prepareTransitions() { + int hl = 0, hr = 0; + for (int i = 0; i < nodes(); i++) { + LocalNode n = node(i); + n.setNeighborStart(hl, hr); + hl += n.localOutdegree(); + hr += 2 * n.remoteOutdegree(); + } + } + + public void prepareInTransitions() { + int hl = 0, hr = 0; + for (int i = 0; i < nodes(); i++) { + LocalNode n = node(i); + n.setInNeighborStart(hl, hr); + hl += n.localIndegree(); + hr += 2 * n.remoteIndegree(); + } + } + + public void addTransition(LocalNode source, LocalNode target) { + if (source.localNeighborsStart < 0) + source.localNeighborsStart = outgoing.nextLocalStart; + outgoing.addLocalTransition(source.reference, source.localNeighborsStart, + target); + } + + public void addTransition(LocalNode source, int targetOwner, + int targetReference) { + if (source.remoteNeighborsStart < 0) + source.remoteNeighborsStart = outgoing.nextRemoteStart; + outgoing.addRemoteTransition(source.reference, source.remoteNeighborsStart, + targetOwner, targetReference); + } + + public void addLocalInTransition(LocalNode source, + LocalNode target) { + if (target.localInNeighborsStart < 0) + target.localInNeighborsStart = incoming.nextLocalStart; + incoming.addLocalTransition(target.reference, target.localInNeighborsStart, + source); + } + + public void addRemoteInTransition(int sourceOwner, int sourceReference, + LocalNode target) { + if (target.remoteInNeighborsStart < 0) + target.remoteInNeighborsStart = incoming.nextRemoteStart; + incoming.addRemoteTransition(target.reference, + target.remoteInNeighborsStart, sourceOwner, sourceReference); + } + + public short getId() { + return graphId; + } + + public int getPartitionId() { + return partitionId; + } + + public LocalNode getLocalNeighbor(int start, int index) { + return outgoing.getLocalTransition(start, index); + } + + public TNode getRemoteNeighbor(int start, int index) { + return outgoing.getRemoteTransitionAsNode(start, index); + } + + public long getRemoteNeighborAsLong(int start, int index) { + return outgoing.getRemoteTransitionAsLong(start, index); + } + + public int getRemoteNeighborId(int start, int index) { + return outgoing.getRemoteTransitionId(start, index); + } + + public int getRemoteNeighborOwner(int start, int index) { + return outgoing.getRemoteTransitionOwner(start, index); + } + + public LocalNode getLocalInNeighbor(int start, int index) { + return incoming.getLocalTransition(start, index); + } + + public TNode getRemoteInNeighbor(int start, int index) { + return incoming.getRemoteTransitionAsNode(start, index); + } + + public long getRemoteInNeighborAsLong(int start, int index) { + return incoming.getRemoteTransitionAsLong(start, index); + } + + public int getRemoteInNeighborId(int start, int index) { + return incoming.getRemoteTransitionId(start, index); + } + + public int getRemoteInNeighborOwner(int start, int index) { + return incoming.getRemoteTransitionOwner(start, index); + } + + public LocalNode[] getLocalTransitions() { + return outgoing.getLocalTransitions(); + } + + public int[] getRemoteTransitions() { + return outgoing.getRemoteTransitions(); + } + + public LocalNode[] getLocalInTransitions() { + return incoming.getLocalTransitions(); + } + + public int[] getRemoteInTransitions() { + return incoming.getRemoteTransitions(); + } + + public int transitions() { + return outgoing.getLocalTransitions().length + + outgoing.getRemoteTransitions().length; + } + + public int inTransitions() { + return incoming.getLocalTransitions().length + + incoming.getRemoteTransitions().length; + } + + // + // public LocalNode getLocalInNeighbor(int index) { + // return incoming.getLocalTransition(index); + // } + // + + // public Neighbors transitions(int localStart, int localCount, + // int remoteStart, int remoteCount) { + // return new Neighbors(localStart, localCount, remoteStart, + // remoteCount, outgoing); + // } + + // public Neighbors inTransitions(int localStart, int localCount, + // int remoteStart, int remoteCount) { + // return new Neighbors(localStart, localCount, remoteStart, + // remoteCount, incoming); + // } + + // @SuppressWarnings("unchecked") + // public TransitionsIterator transitionsIterator(int localStart, + // int localCount, int remoteStart, int remoteCount) { + // return outgoing.iterator(localStart, localCount, remoteStart, + // remoteCount); + // } + + // @SuppressWarnings("unchecked") + // public TransitionsIterator inTransitionsIterator(int localStart, + // int localCount, int remoteStart, int remoteCount) { + // return incoming.iterator(localStart, localCount, remoteStart, + // remoteCount); + // } + + public String toString() { + return toString(0, true, true); + } + + public String toString(int max, boolean out, boolean in) { + StringBuilder sb = new StringBuilder(); + for (int i = 0; i < nodes.length && (max == 0 || i < max); i++) { + sb.append(i); + sb.append(": "); + LocalNode node = node(i); + if (out && node.neighbors() > 0) { + sb.append(" ->"); + for (int j = 0; j < node.neighbors(); j++) { + sb.append(" " + node.neighbor(j)); + } + } + if (in && node.inNeighbors() > 0) { + sb.append(" <- "); + for (int j = 0; j < node.inNeighbors(); j++) { + sb.append(" " + node.inNeighbor(j)); + } + } + sb.append("\n"); + } + return sb.toString(); + } + +} Index: src/java/org/apache/hama/graph/GraphComputeRunner.java =================================================================== --- src/java/org/apache/hama/graph/GraphComputeRunner.java (revision 0) +++ src/java/org/apache/hama/graph/GraphComputeRunner.java (revision 0) @@ -0,0 +1,102 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hama.graph; + +import java.io.IOException; + +import org.apache.hama.HamaConfiguration; +import org.apache.hama.bsp.BSP; +import org.apache.hama.bsp.BSPJob; +import org.apache.hama.bsp.BSPJobClient; +import org.apache.hama.bsp.ClusterStatus; +import org.apache.hama.zookeeper.QuorumPeer; +import org.apache.zookeeper.CreateMode; +import org.apache.zookeeper.KeeperException; +import org.apache.zookeeper.WatchedEvent; +import org.apache.zookeeper.Watcher; +import org.apache.zookeeper.ZooDefs.Ids; +import org.apache.zookeeper.ZooKeeper; + +/** + * Command-line driver program for running a graph algorithm in the BSP + * framework. + */ +public class GraphComputeRunner implements Watcher { + + @SuppressWarnings("unchecked") + public int run(String[] args) throws InterruptedException, IOException, + ClassNotFoundException, KeeperException { + if (args.length < 3) { + System.out.println("Usage: runner "); + System.exit(0); + } + // BSP job configuration. + HamaConfiguration conf = new HamaConfiguration(); + BSPJob bsp = new BSPJob(conf); + // Setup job. + bsp.setJobName(args[2]); + bsp.setBspClass((Class) Class.forName(args[2])); + bsp.setJarByClass(Class.forName(args[2])); + + BSPJobClient jobClient = new BSPJobClient(conf); + ClusterStatus cluster = jobClient.getClusterStatus(true); + int numPartitions = Integer.parseInt(args[1]); + if (cluster.getGroomServers() < numPartitions) { + System.out.println("Not enough groom servers for graph."); + System.out.println("Only " + cluster.getGroomServers() + + " servers reported but needed " + numPartitions + "."); + System.exit(0); + } + System.out.println("Allocating " + numPartitions + " BSP tasks"); + bsp.setNumBspTask(numPartitions); + + // Get ZooKeeper configuration settings. + String zooKeeperConnectionString = QuorumPeer.getZKQuorumServersString(conf); + + // Create ZooKeeper client. + ZooKeeper zk = new ZooKeeper(zooKeeperConnectionString, 3000, this); + // Create base directory in ZooKeeper. + String zkBasePath = "/" + + Long.toHexString(new java.util.Random().nextLong()); + zk.create(zkBasePath, new byte[0], Ids.OPEN_ACL_UNSAFE, + CreateMode.PERSISTENT); + // Set configuration. + conf.set(VertexComputerGeneric.COMPUTER_BASE_PATH, zkBasePath); + conf.set(VertexComputerGeneric.COMPUTER_GRAPH_PATH, args[0]); + conf.setInt(VertexComputerGeneric.COMPUTER_NUM_PARTITIONS, numPartitions); + if (null != args[3]) { + conf.setInt(VertexComputerGeneric.COMPUTER_MAX_ITERATIONS, + Integer.parseInt(args[3])); + } + + long startTime = System.currentTimeMillis(); + BSPJobClient.runJob(bsp); + System.out.println("Job Finished in " + + (double) (System.currentTimeMillis() - startTime) / 1000.0 + + " seconds"); + return 0; + } + + public void process(WatchedEvent event) { + } + + public static void main(String[] args) throws Exception { + new GraphComputeRunner().run(args); + } + +} Index: src/java/org/apache/hama/graph/JoinedTransitions.java =================================================================== --- src/java/org/apache/hama/graph/JoinedTransitions.java (revision 0) +++ src/java/org/apache/hama/graph/JoinedTransitions.java (revision 0) @@ -0,0 +1,179 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hama.graph; + +import java.util.Arrays; +import java.util.Iterator; +import java.util.NoSuchElementException; + +/** + * Graph transitions. + * + * @param + */ +public final class JoinedTransitions { + + /** Node interface. */ + private final Class TNodeClass; + /** Transitions outgoing to local nodes. */ + private final LocalNode[] localTransitions; + /** Transitions outgoing to remote nodes. */ + private final int[] remoteTransitions; + /** Graph id. */ + private final short graphId; + + /** Create a join array. */ + @SuppressWarnings("unchecked") + public JoinedTransitions(Graph graph, Class TNodeClass, + int localTransitionsSize, int remoteTransitionsSize) { + this.TNodeClass = TNodeClass; + this.graphId = graph.getId(); + this.localTransitions = new LocalNode[localTransitionsSize]; + this.remoteTransitions = new int[remoteTransitionsSize * 2]; + Arrays.fill(this.remoteTransitions, -1); + } + + public LocalNode[] getLocalTransitions() { + return localTransitions; + } + + public LocalNode getLocalTransition(int start, int index) { + return localTransitions[start + index]; + } + + public LocalNode getLocalTransition(int index) { + return localTransitions[index]; + } + + public int[] getRemoteTransitions() { + return remoteTransitions; + } + + public TNode getRemoteTransitionAsNode(int start, int index) { + return (TNode) NodeReference.newInstance(TNodeClass, graphId, + remoteTransitions[start + index * 2], remoteTransitions[start + index + * 2 + 1]); + } + + public long getRemoteTransitionAsLong(int start, int index) { + return PairUtils.createPair(remoteTransitions[start + index * 2], + remoteTransitions[start + index * 2 + 1]); + } + + public int getRemoteTransition(int index) { + return remoteTransitions[index]; + } + + public int getRemoteTransitionOwner(int start, int index) { + return remoteTransitions[start + 2 * index]; + } + + public int getRemoteTransitionId(int start, int index) { + return remoteTransitions[start + 2 * index + 1]; + } + + int nextLocalStart = 0; + int nextRemoteStart = 0; + private int lastLocalReference = -1; + private int lastRemoteReference = -1; + + public void addLocalTransition(int reference, int start, + LocalNode target) { + int index; + if (reference == lastLocalReference) { + index = nextLocalStart; + } else { + index = start; + while (localTransitions[index] != null) + index++; + } + localTransitions[index] = target; + nextLocalStart = index + 1; + lastLocalReference = reference; + } + + public void addRemoteTransition(int reference, int start, int remoteOwner, + int remoteReference) { + int index; + if (reference == lastRemoteReference) { + index = nextRemoteStart; + } else { + index = start; + while (remoteTransitions[index] != -1) + index += 2; + } + remoteTransitions[index] = remoteOwner; + remoteTransitions[index + 1] = remoteReference; + nextRemoteStart = index + 2; + lastRemoteReference = reference; + } + + public TransitionsIterator iterator(int localStart, int localCount, + int remoteStart, int remoteCount) { + return new TransitionsIterator(localStart, localCount, remoteStart, + remoteCount); + } + + public class TransitionsIterator implements Iterator { + + private int localIndex = 0; + private int remoteIndex = 0; + private final int localStart, localCount; + private final int remoteStart, remoteCount; + + public TransitionsIterator(int localStart, int localCount, int remoteStart, + int remoteCount) { + this.localStart = localStart; + this.localCount = localCount; + this.remoteStart = remoteStart; + this.remoteCount = remoteCount; + } + + public boolean hasNext() { + return (localIndex + remoteIndex < localCount + remoteCount); + } + + @SuppressWarnings("unchecked") + public TNode next() { + if (localIndex < localCount) { + LocalNode node = localTransitions[localStart + localIndex]; + localIndex++; + return (TNode) node; + } + if (remoteIndex < remoteCount) { + TNode node = (TNode) NodeReference.newInstance(TNodeClass, graphId, + remoteTransitions[remoteStart + 2 * remoteIndex], + remoteTransitions[remoteStart + 2 * remoteIndex + 1]); + remoteIndex++; + return node; + } + throw new NoSuchElementException("No element: " + this); + } + + public void remove() { + throw new UnsupportedOperationException("Cannot remove a transitions"); + } + + @Override + public String toString() { + return "local=" + localIndex + "/" + localStart + "*" + localCount + + ",remote=" + remoteIndex + "/" + remoteStart + "*" + remoteCount; + } + } + +} Index: src/java/org/apache/hama/graph/LocalNode.java =================================================================== --- src/java/org/apache/hama/graph/LocalNode.java (revision 0) +++ src/java/org/apache/hama/graph/LocalNode.java (revision 0) @@ -0,0 +1,250 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hama.graph; + +public class LocalNode implements Node { + + /** Nodes reference (id). Unique on machine. */ + protected int reference = -1; + + /** Number of local neighbors. */ + public short localNeighborsCount; + /** Number of remote neighbors. */ + public short remoteNeighborsCount; + /** Number of local in-neighbors. */ + public short localInNeighborsCount; + /** Number of remote in-neighbors. */ + public short remoteInNeighborsCount; + + /** Location of the first local neighbor. */ + public int localNeighborsStart = -1; + /** Location of the first remote neighbor. */ + public int remoteNeighborsStart = -1; + /** Location of the first local in-neighbor. */ + public int localInNeighborsStart = -1; + /** Location of the first remote in-neighbor. */ + public int remoteInNeighborsStart = -1; + + /** Graph. */ + protected final Graph graph; + + public LocalNode(Graph graph) { + this.graph = graph; + } + + public final boolean isLocal() { + return true; + } + + public final short graphId() { + return graph.getId(); + } + + public int outdegree() { + return localNeighborsCount + remoteNeighborsCount; + } + + public int indegree() { + return localInNeighborsCount + remoteInNeighborsCount; + } + + public int localOutdegree() { + return localNeighborsCount; + } + + public int remoteOutdegree() { + return remoteNeighborsCount; + } + + public int localIndegree() { + if (localInNeighborsCount < 0) + throw new RuntimeException("Local in neighbors count not defined " + + "for node " + this); + return localInNeighborsCount; + } + + public int remoteIndegree() { + if (localInNeighborsCount < 0) + throw new RuntimeException("Remote in neighbors count not defined" + + " for node " + this); + return remoteInNeighborsCount; + } + + public void allocateTransition(boolean localTarget) throws Exception { + if (localTarget) { + if (localNeighborsCount == Short.MAX_VALUE) + throw new Exception("Cannot handle more than " + Short.MAX_VALUE + + " local transitions per node"); + localNeighborsCount++; + } else { + if (remoteNeighborsCount == Short.MAX_VALUE) + throw new Exception("Cannot handle more than " + Short.MAX_VALUE + + " remote transitions per node"); + remoteNeighborsCount++; + } + } + + public void allocateInTransition(int sourceOwner) { + allocateInTransition(sourceOwner == PartitionManager.getInstance() + .getPartition()); + } + + public void allocateInTransition(boolean local) { + if (local) { + if (localInNeighborsCount == Short.MAX_VALUE) + throw new RuntimeException("Cannot handle more than " + Short.MAX_VALUE + + " local transitions per node"); + localInNeighborsCount++; + } else { + if (remoteInNeighborsCount == Short.MAX_VALUE) + throw new RuntimeException("Cannot handle more than " + Short.MAX_VALUE + + " remote transitions per node"); + remoteInNeighborsCount++; + } + } + + public final void setNeighborStart(int localStart, int remoteStart) { + localNeighborsStart = localStart; + remoteNeighborsStart = remoteStart; + } + + public final void setInNeighborStart(int localStart, int remoteStart) { + localInNeighborsStart = localStart; + remoteInNeighborsStart = remoteStart; + } + + public final int neighbors() { + return localNeighborsCount + remoteNeighborsCount; + } + + public final int localNeighbors() { + return localNeighborsCount; + } + + public final int remoteNeighbors() { + return remoteNeighborsCount; + } + + public final int inNeighbors() { + return localInNeighborsCount + remoteInNeighborsCount; + } + + public final int localInNeighbors() { + return localInNeighborsCount; + } + + public final int remoteInNeighbors() { + return remoteInNeighborsCount; + } + + public final TNode neighbor(int index) { + // if (index < localNeighborsCount) { + // System.err.println(this + " getting local neighbor: " + index); + // return (TNode) graph.getLocalNeighbor(localNeighborsStart, + // index); + // } else { + // System.err.println(this + " getting remote neighbor: " + // + (index - localNeighborsCount)); + // return graph.getRemoteNeighbor(remoteNeighborsStart, index + // - localNeighborsCount); + // } + throw new RuntimeException("LocalNode: TNode neighbor(index) " + + "should never be executed"); + } + + public final LocalNode localNeighbor(int index) { + // System.err.println(this + " getting local neighbor: " + index + ": " + // + graph.getLocalNeighbor(localNeighborsStart, index)); + return graph.getLocalNeighbor(localNeighborsStart, index); + } + + public final TNode remoteNeighbor(int index) { + return graph.getRemoteNeighbor(remoteNeighborsStart, index); + } + + public final int remoteNeighborOwner(int index) { + return graph.getRemoteNeighborOwner(remoteNeighborsStart, index); + } + + public final int remoteNeighborId(int index) { + return graph.getRemoteNeighborId(remoteNeighborsStart, index); + } + + public final TNode inNeighbor(int index) { + // if (index < localInNeighborsCount) + // return (TNode) graph.getLocalInNeighbor(localInNeighborsStart, + // index); + // else + // return graph.getRemoteInNeighbor(remoteInNeighborsStart, index + // - localInNeighborsCount); + throw new RuntimeException("LocalNode: TNode inNeighbor(index) " + + "should never be executed"); + } + + public final LocalNode localInNeighbor(int index) { + return graph.getLocalInNeighbor(localInNeighborsStart, index); + } + + public final TNode remoteInNeighbor(int index) { + return graph.getRemoteInNeighbor(remoteInNeighborsStart, index); + } + + public final int remoteInNeighborOwner(int index) { + return graph.getRemoteInNeighborOwner(remoteInNeighborsStart, index); + } + + public final int remoteInNeighborId(int index) { + return graph.getRemoteInNeighborId(remoteInNeighborsStart, index); + } + + public final void setReference(int reference) { + this.reference = reference; + } + + public final int reference() { + return reference; + } + + public final boolean isReference() { + return false; + } + + public final long asReference() { + return PairUtils.createPair(owner(), reference); + } + + public final int owner() { + return graph.getPartitionId(); + } + + public final Graph graph() { + return graph; + } + + @Override + public String toString() { + if (reference >= 0) + return reference + "@" + PartitionManager.getInstance().getPartition(); + return "HANDLE UNDEFINED YET"; + } + + public final String name() { + return reference + "@" + PartitionManager.getInstance().getPartition(); + } + +} Index: src/java/org/apache/hama/graph/MessageValueIterator.java =================================================================== --- src/java/org/apache/hama/graph/MessageValueIterator.java (revision 0) +++ src/java/org/apache/hama/graph/MessageValueIterator.java (revision 0) @@ -0,0 +1,28 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hama.graph; + +import java.util.Iterator; +import org.apache.hadoop.io.Writable; + +/** + * Interface for iterating through messages collected for a vertex. + */ +public interface MessageValueIterator extends + Iterator> { +} Index: src/java/org/apache/hama/graph/MessageValueIteratorByTag.java =================================================================== --- src/java/org/apache/hama/graph/MessageValueIteratorByTag.java (revision 0) +++ src/java/org/apache/hama/graph/MessageValueIteratorByTag.java (revision 0) @@ -0,0 +1,108 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hama.graph; + +import java.io.IOException; + +import org.apache.hadoop.io.Writable; +import org.apache.hama.bsp.BSPMessage; +import org.apache.hama.bsp.BSPPeer; +import org.apache.hama.util.SerializationHelper; +import org.apache.log4j.Logger; + +/** + * Class for iterating through message values destined for a vertex. Based on a + * proposed BSPPeer API which allows getting messages with a specific tag + * efficiently. + */ +public class MessageValueIteratorByTag + implements MessageValueIterator { + private static Logger LOG = Logger.getLogger(MessageValueIteratorByTag.class); + + BSPPeer bspPeer; + Class MessageValueClass; + VertexMessageEnvelope envelope = new VertexMessageEnvelope(); + BSPMessage next = null; + byte[] tag; + + public MessageValueIteratorByTag() { + } + + public MessageValueIteratorByTag(BSPPeer bspPeer, + Class MessageValueClass) { + this.bspPeer = bspPeer; + this.MessageValueClass = MessageValueClass; + } + + public void setTag(byte[] tag) { + this.tag = tag; + } + + public boolean hasNext() { + if (tag == null/* || bspPeer.getNumCurrentMessagesWithTag(tag) == 0 */) { + return false; + } else { + return true; + } + } + + @SuppressWarnings({ "unchecked", "null" }) + public Pair next() { + Pair result = null; + try { + BSPMessage received = null/* = bspPeer.getCurrentMessageWithTag(tag) */; + // Deserialize VertexMessageEnvelope. + SerializationHelper.deserialize((byte[]) received.getData(), envelope); + Writable messageValue = null; + try { + // Instantiate Writable for message data. + messageValue = MessageValueClass.newInstance(); + SerializationHelper.deserialize((byte[]) received.getData(), + messageValue); + } catch (InstantiationException ex) { + LOG.error(ex); + } catch (IllegalAccessException ex) { + LOG.error(ex); + } + // In case class instantiation failed... + if (messageValue == null) { + if (hasNext()) { + return next(); + } else { + return null; + } + } + SerializationHelper.deserialize(envelope.getRawMessage(), messageValue); + Pair pair = new Pair( + envelope.getSource(), messageValue); + + result = (Pair) pair; + } catch (IOException ex) { + LOG.error(ex); + } + if (result == null && hasNext()) { + return next(); + } else { + return result; + } + } + + public void remove() { + throw new UnsupportedOperationException("Not supported yet."); + } +} Index: src/java/org/apache/hama/graph/MessageValueIteratorFromList.java =================================================================== --- src/java/org/apache/hama/graph/MessageValueIteratorFromList.java (revision 0) +++ src/java/org/apache/hama/graph/MessageValueIteratorFromList.java (revision 0) @@ -0,0 +1,59 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hama.graph; + +import java.util.Iterator; +import org.apache.hadoop.io.Writable; + +/** + * Convenience class representing an iterator for message values. + */ +public class MessageValueIteratorFromList + implements MessageValueIterator { + + private Iterator> iterator; + + public MessageValueIteratorFromList() { + } + + /** + * Construct MessageValueIterator instance from the specified {@link Iterator}. + * @param iterator + */ + public MessageValueIteratorFromList( + Iterator> iterator) { + this.iterator = iterator; + } + + public void setIterator( + Iterator> iterator) { + this.iterator = iterator; + } + + public boolean hasNext() { + return iterator.hasNext(); + } + + public Pair next() { + return iterator.next(); + } + + public void remove() { + iterator.remove(); + } +} Index: src/java/org/apache/hama/graph/Node.java =================================================================== --- src/java/org/apache/hama/graph/Node.java (revision 0) +++ src/java/org/apache/hama/graph/Node.java (revision 0) @@ -0,0 +1,34 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hama.graph; + +public interface Node { + + public boolean isReference(); + + public boolean isLocal(); + + public int reference(); + + public int owner(); + + public short graphId(); + + public long asReference(); + +} Index: src/java/org/apache/hama/graph/NodeReference.java =================================================================== --- src/java/org/apache/hama/graph/NodeReference.java (revision 0) +++ src/java/org/apache/hama/graph/NodeReference.java (revision 0) @@ -0,0 +1,84 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hama.graph; + +/** + * Reference to a local or remote node. + */ +public final class NodeReference implements Node { + short graphId; + int owner; + int reference; + + private NodeReference(short graphId, int owner, int reference) { + this.graphId = graphId; + this.owner = owner; + this.reference = reference; + } + + public boolean isLocal() { + return true; // We only handle local nodes. + } + + public final boolean isReference() { + return true; + } + + public long asReference() { + return PairUtils.createPair(owner, reference); + } + + public int reference() { + return reference; + } + + public int owner() { + return owner; + } + + public short graphId() { + return graphId; + } + + @Override + public String toString() { + return reference + "@" + owner; + } + + @SuppressWarnings("unchecked") + public static TNode newInstance(Class TNodeClass, + short graphId, int owner, int reference) { + return (TNode) new NodeReference(graphId, owner, reference); + } + + @SuppressWarnings("unchecked") + public static TNode newInstance(Class TNodeClass, + Node node) { + return (TNode) new NodeReference(node.graphId(), node.owner(), + node.reference()); + } + + public static NodeReference newInstance(short graphId, int owner, + int reference) { + return new NodeReference(graphId, owner, reference); + } + + public static NodeReference newInstance(Node node) { + return new NodeReference(node.graphId(), node.owner(), node.reference()); + } +} Index: src/java/org/apache/hama/graph/Pair.java =================================================================== --- src/java/org/apache/hama/graph/Pair.java (revision 0) +++ src/java/org/apache/hama/graph/Pair.java (revision 0) @@ -0,0 +1,46 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hama.graph; + +public class Pair { + private A a; + private B b; + + public Pair(A a, B b) { + this.a = a; + this.b = b; + } + + public A getFirst() { + return a; + } + + public B getSecond() { + return b; + } + + public Pair setFirst(A a) { + this.a = a; + return this; + } + + public Pair setSecond(B b) { + this.b = b; + return this; + } +} Index: src/java/org/apache/hama/graph/PairUtils.java =================================================================== --- src/java/org/apache/hama/graph/PairUtils.java (revision 0) +++ src/java/org/apache/hama/graph/PairUtils.java (revision 0) @@ -0,0 +1,111 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hama.graph; + +public class PairUtils { + + public static final long LONG_NULL_PAIR = createPair((int) -1, (int) -1); + public static final int INT_NULL_PAIR = createPair((short) -1, (short) -1); + + private static final long LONG_ZERO_LEFT = 0x00000000FFFFFFFFL; + private static final int INT_ZERO_LEFT = 0x0000FFFF; + + // ///// long=(int,int) + + /** Creates long number from two integers. */ + public static final long createPair(int first, int second) { + return (((long) first) << 32) | (((long) second) & LONG_ZERO_LEFT); + } + + /** Gets first integer element from a long pair. */ + public static final int getFirst(long pair) { + return (int) ((pair >> 32) & LONG_ZERO_LEFT); + } + + /** Gets second integer element from a long pair. */ + public static final int getSecond(long pair) { + return (int) (pair & LONG_ZERO_LEFT); + } + + public static final String pairToString(long pair) { + return "(" + getFirst(pair) + "," + getSecond(pair) + ")"; + } + + // //// int=(short,short) + + /** Creates long number from two integers. */ + public static final int createPair(short first, short second) { + return (((int) first) << 16) | ((int) second & INT_ZERO_LEFT); + } + + /** Gets first integer element from a long pair. */ + public static final short getFirst(int pair) { + return (short) ((pair >> 16) & INT_ZERO_LEFT); + } + + /** Gets second integer element from a long pair. */ + public static final short getSecond(int pair) { + return (short) (pair & INT_ZERO_LEFT); + } + + // //// int=(char,char) + + /** Creates long number from two integers. */ + public static int createPair(char first, char second) { + return (((int) first) << 16) | (((int) second) & INT_ZERO_LEFT); + } + + /** Gets first integer element from a long pair. */ + public static char getFirstChar(int pair) { + return (char) ((pair >> 16) & INT_ZERO_LEFT); + } + + /** Gets second integer element from a long pair. */ + public static char getSecondChar(int pair) { + return (char) (pair & INT_ZERO_LEFT); + } + + public static void main(String args[]) { + + int alist[] = new int[] { 47027, 20, 99, -5955, 1024, 256, 0, 0, -1 }; + int blist[] = new int[] { 1, 359, -335535, 35, 1024, -256, 1, 0, -1 }; + + boolean ok = true; + + for (int i = 0; i < alist.length && i < blist.length; i++) { + int a = alist[i]; + int b = blist[i]; + + long pair = createPair(a, b); + + int aa = getFirst(pair); + int bb = getSecond(pair); + + if (a != aa || b != bb) + ok = false; + + System.out.println(a + " " + b + " -> " + pair + " -> " + aa + " " + bb + + " -> " + (a == aa && b == bb ? "OK" : "FAIL!!!!!")); + } + + if (!ok) + System.out.println("Test failed!"); + else + System.out.println("Test run OK!"); + } +} Index: src/java/org/apache/hama/graph/PartitionManager.java =================================================================== --- src/java/org/apache/hama/graph/PartitionManager.java (revision 0) +++ src/java/org/apache/hama/graph/PartitionManager.java (revision 0) @@ -0,0 +1,173 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hama.graph; + +import java.util.HashMap; +import java.util.Map; +import org.apache.log4j.Logger; +import org.apache.zookeeper.CreateMode; +import org.apache.zookeeper.KeeperException; +import org.apache.zookeeper.KeeperException.Code; +import org.apache.zookeeper.ZooDefs.Ids; +import org.apache.zookeeper.ZooKeeper; +import org.apache.zookeeper.data.Stat; + +/** + * Class for automagically assigning a partition to an instance of this class + * and releasing partitions. The assignPartition() and releasePartition() can + * only be called synchronously. + */ +public class PartitionManager { + private static Logger LOG = Logger.getLogger(PartitionManager.class); + private static PartitionManager singleton = null; + + private int numPartitions; + private int myPartition = -1; + private ZooKeeper zk; + private String basePath; + private byte[] nodeData; // Data to be stored with partition assignment. + private Map partitionCache = new HashMap(); + + PartitionManager(ZooKeeper zooKeeper, String basePath, byte[] nodeData, + int numPartitions) { + this.zk = zooKeeper; + this.basePath = basePath; + this.nodeData = nodeData; + this.numPartitions = numPartitions; + } + + /** + * Creates a PartitionManager instance based on the parameters provided. + * + * @param zooKeeper ZooKeeper client instance to use. + * @param basePath + * @param nodeData + * @param numPartitions + * @return Singleton PartitionManager instance. + */ + public static PartitionManager setInstance(ZooKeeper zooKeeper, + String basePath, byte[] nodeData, int numPartitions) { + singleton = new PartitionManager(zooKeeper, basePath, nodeData, + numPartitions); + return getInstance(); + } + + public static PartitionManager getInstance() { + return singleton; + } + + /** + * Tries to assign a partition to this instance of PartitionManager. + * On success it returns a non-negative number. + * + * @return Got partition's number or already assigned partition's number. + */ + public int assignPartition() { + if (myPartition >= 0) { + return myPartition; + } + // Try to create each of the potential partitions. If the creation of one + // fails, move on. + // TODO: Randomize this to get less exceptions. + // TODO: Put listener on partitionPath and invalidate only modified + // elements in our cache. + int gotPartition = -1; + for (int i = 0; i < numPartitions; ++i) { + String path = getPartitionPath(i); + try { + zk.create(path, nodeData, Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT); + } catch (KeeperException ke) { + if (ke.code() == Code.NODEEXISTS) { + continue; + } else if (ke.code() == Code.NONODE) { + LOG.error("Base path does not exist: " + basePath); + return -1; + } else { + LOG.error(ke); + continue; + } + } catch (InterruptedException ie) { + LOG.error(ie); + continue; + } + // Node successfully created. + gotPartition = i; + myPartition = gotPartition; + break; + } + return gotPartition; + } + + /** + * Releases the partition assigned to this PartitionManager instance. + * + * @throws InterruptedException + * @throws KeeperException + */ + public void releasePartition() throws InterruptedException, KeeperException { + if (myPartition == -1) { + LOG.error("Not owning a partition -- ignoring release."); + return; + } + zk.delete(getPartitionPath(myPartition), 0); + myPartition = -1; + } + + public int getPartition() { + return myPartition; + } + + public int getNumPartitions() { + return numPartitions; + } + + /** + * Determines which peer is responsible for the specified partition. + * + * @param partition Number of partition. + * @return Hostname of peer responsible for {@link partition}. + * @throws KeeperException + * @throws InterruptedException + */ + public String getPeerForParition(int partition) throws KeeperException, + InterruptedException { + if (partition < 0 || partition >= numPartitions) { + LOG.error("Invalid partition number given: " + partition); + return null; + } + if (partitionCache.containsKey(partition)) { + return partitionCache.get(partition); + } + String partitionPath = getPartitionPath(partition); + Stat stat = zk.exists(partitionPath, false); + if (stat == null) { + LOG.error("Partition not assigned: " + partition); + return null; + } else { + // Get data for this path. + byte[] peerData = zk.getData(partitionPath, false, stat); + String result = new String(peerData); + partitionCache.put(partition, result); + return result; + } + } + + private String getPartitionPath(int partition) { + return basePath + "/" + Integer.toString(partition); + } +} \ No newline at end of file Index: src/java/org/apache/hama/graph/Vertex.java =================================================================== --- src/java/org/apache/hama/graph/Vertex.java (revision 0) +++ src/java/org/apache/hama/graph/Vertex.java (revision 0) @@ -0,0 +1,139 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hama.graph; + +import java.io.IOException; +import java.util.Iterator; + +import org.apache.hadoop.io.Writable; +import org.apache.hama.bsp.BSPPeerInterface; +import org.apache.hama.bsp.ByteMessage; +import org.apache.hama.util.SerializationHelper; +import org.apache.log4j.Logger; +import org.apache.zookeeper.KeeperException; + +/** + * Base vertex class for implementing BSP graph algorithms. TODO: Serialization + * of vertex state (Vertex should extend Writable?). + */ +public abstract class Vertex { + private static final Logger LOG = Logger.getLogger(Vertex.class); + + public Class MessageValueClass; + + // Instance of represented local vertex. + private LocalNode localNode; + private BSPPeerInterface bspPeer; // Parent peer. + private PartitionManager partitionManager; + private VertexMessageEnvelope myEnvelope = new VertexMessageEnvelope(); + + private boolean active = true; + + /** + * Construct a {@link Vertex} instance for the specified {@link LocalNode} + * instance and {@link VertexEnvironment}. + * + * @param localNode {@link LocalNode} instance this vertex will be + * representing. + * @param environment {@link VertexEnvironment} in which this Vertex instance + * will be living. + */ + public Vertex(LocalNode localNode, + VertexEnvironment environment) throws IOException { + this.localNode = localNode; + this.bspPeer = environment.getBspPeer(); + this.partitionManager = environment.getPartitionManager(); + // Set up message envelope. + myEnvelope.setSource(getReference()); + } + + /** + * Run local computation for this vertex instance. + * + * @param messages Message iterator for reading messages received in the + * previous superstep. + */ + public void compute(MessageValueIterator messages) + throws IOException { + this.voteToHalt(); + } + + public VertexReference getReference() { + return new VertexReference(localNode.owner(), localNode.reference()); + } + + /** + * Get the current superstep's number. + * + * @return Current superstep's number. + */ + public long getSuperstep() { + return bspPeer.getSuperstepCount(); + } + + /** + * Creates an iterator for the outgoing edges from this vertex instance. + * + * @return Iterator for enumerating end-points of outgoing edges. + */ + public Iterator getOutEdgeIterator() { + return new EdgeIterator(localNode); + } + + /** + * Sends a message to + * + * @param target . + * + * @param target The target vertex of the message. + * @param message Contents of the message to send. + */ + public void sendMessageTo(VertexReference target, TMessageValue message) + throws IOException { + // Determine target's peer. + String targetPeer; + try { + targetPeer = partitionManager.getPeerForParition(target.getPartition()); + } catch (KeeperException ex) { + LOG.error(ex); + return; + } catch (InterruptedException ex) { + LOG.error(ex); + return; + } + // Serialize message tag. + byte[] messageTag = SerializationHelper.serialize(target); + // Serialize message. + byte[] data = SerializationHelper.serialize(message); + // Wrap it in the envelope. + myEnvelope.setRawMessage(data); + byte[] rawData = SerializationHelper.serialize(myEnvelope); + // Send it. + ByteMessage bspMessage = new ByteMessage(messageTag, rawData); + bspPeer.send(targetPeer, bspMessage); + } + + public final void voteToHalt() { + this.active = false; + } + + public final boolean isActive() { + return this.active; + } + +} Index: src/java/org/apache/hama/graph/VertexComputerGeneric.java =================================================================== --- src/java/org/apache/hama/graph/VertexComputerGeneric.java (revision 0) +++ src/java/org/apache/hama/graph/VertexComputerGeneric.java (revision 0) @@ -0,0 +1,295 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hama.graph; + +import java.io.IOException; +import java.lang.reflect.Array; +import java.lang.reflect.Constructor; +import java.lang.reflect.InvocationTargetException; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.io.Writable; +import org.apache.hama.bsp.BSP; +import org.apache.hama.bsp.BSPMessage; +import org.apache.hama.bsp.BSPPeer; +import org.apache.hama.util.SerializationHelper; +import org.apache.hama.zookeeper.QuorumPeer; +import org.apache.log4j.Logger; +import org.apache.zookeeper.KeeperException; +import org.apache.zookeeper.WatchedEvent; +import org.apache.zookeeper.Watcher; +import org.apache.zookeeper.ZooKeeper; + +/** + * Class for performing per-vertex computations on the local graph partition. + * TODO: Better vertex storage. + */ +public class VertexComputerGeneric> + extends BSP implements Watcher { + private static final Logger LOG = Logger + .getLogger(VertexComputerGeneric.class); + + // Base path in ZooKeeper for this job (without terminating slash). + public static final String COMPUTER_BASE_PATH = "vertexcomputer.basepath"; + public static final String COMPUTER_NUM_PARTITIONS = "vertexcomputer.partitions"; + public static final String COMPUTER_GRAPH_PATH = "vertexcomputer.graphPath"; + public static final String COMPUTER_MAX_ITERATIONS = "vertexcomputer.maxIterations"; + private Configuration conf; + private int numPartitions; + private int maxIterations = -1; + // ZooKeeper client for this instance. + private ZooKeeper zooKeeper = null; + private PartitionManager partitionManager; + + private Class> VertexClass; + private Class MessageValueClass; + + // Local graph partition. + private Graph graph; + // Local pool of vertexes. + private TVertex[] vertexes; + // local message pool + private Map>> messageBuffer = new HashMap>>(); + + // count of inactive vertices + private int inactiveVertices = 0; + + public VertexComputerGeneric(Class> vertexClass, + Class messageValueClass) { + this.VertexClass = vertexClass; + this.MessageValueClass = messageValueClass; + } + + public void bsp(BSPPeer bspPeer) throws IOException, KeeperException, + InterruptedException { + LOG.error("Initializing peer " + bspPeer.getPeerName()); + if (bspPeer.getSuperstepCount() < 1) { + try { + initialize(bspPeer); + } catch (Exception ex) { + throw new RuntimeException("Couldn't initialize client: " + ex); + } + // Synchronize. + bspPeer.sync(); + } + // Iterate till stop condition has not been met. + boolean done = false; + while (!done) { + // Note current iteration. + // Beware that superstepCount starts from 0 while iterations start + // from 1. + long currentIteration = bspPeer.getSuperstepCount(); + LOG.info("Starting iteration " + currentIteration); + // Select messages for nodes. + processReceivedMessages(bspPeer); + // Run computation on local nodes. + runLocalComputation(); + // Synchronize. + LOG.info("Synchronizing"); + bspPeer.sync(); + + // Check stop condition. + if ((inactiveVertices == vertexes.length) || maxIterations > 0 + && currentIteration >= maxIterations) { + done = true; + } + + LOG.info("Finished iteration " + currentIteration); + } + LOG.info("Done with computation"); + cleanup(); + LOG.info("Finished cleaning up"); + } + + public void setConf(Configuration conf) { + this.conf = conf; + this.numPartitions = conf.getInt(COMPUTER_NUM_PARTITIONS, 1); + this.maxIterations = conf.getInt(COMPUTER_MAX_ITERATIONS, -1); + } + + public Configuration getConf() { + return this.conf; + } + + public void process(WatchedEvent event) { + } + + private void initialize(BSPPeer bspPeer) throws IOException, KeeperException, + InterruptedException { + // Create ZooKeeper instance. + String connectionString = QuorumPeer.getZKQuorumServersString(conf); + + zooKeeper = new ZooKeeper(connectionString, 3000, this); + // Get one partition of the graph. + partitionManager = new PartitionManager(zooKeeper, + conf.get(COMPUTER_BASE_PATH), bspPeer.getPeerName().getBytes(), + numPartitions); + partitionManager.assignPartition(); + + // TODO graph load + /* + * String graphBasePath = conf.get(COMPUTER_GRAPH_PATH); + * graph = GraphIO.read(LocalNode.class, Node.class, "svcii", graphBasePath, + * partitionManager.getPartition(), numPartitions); + */ + + // Init shadow vertexes for graph. + initShadowVertexes(bspPeer); + } + + private void cleanup() throws InterruptedException, KeeperException { + partitionManager.releasePartition(); + zooKeeper.close(); + } + + /** + * Creates shadow vertexes for the graph partition handled by this peer. + * + * @param bspPeer + */ + private void initShadowVertexes(BSPPeer bspPeer) { + try { + VertexEnvironment vertexEnvironment = new VertexEnvironment(bspPeer, + partitionManager); + vertexes = (TVertex[]) Array.newInstance(VertexClass, graph.nodes()); + Constructor constructor = VertexClass + .getConstructor(new Class[] { LocalNode.class, + VertexEnvironment.class }); + for (int i = 0; i < graph.nodes(); ++i) { + LocalNode localNode = graph.node(i); + vertexes[i] = (TVertex) constructor.newInstance(localNode, + vertexEnvironment); + if (vertexes[i] == null) + LOG.error("vertex null " + i); + } + } catch (IllegalArgumentException ex) { + LOG.error(ex); + } catch (InvocationTargetException ex) { + LOG.error(ex); + } catch (NoSuchMethodException ex) { + LOG.error(ex); + } catch (SecurityException ex) { + LOG.error(ex); + } catch (IllegalAccessException iaex) { + LOG.error(iaex); + } catch (InstantiationException iex) { + LOG.error(iex); + } + } + + /** + * Processes messages received by this peer. Deserializes messages and puts + * them into the respective pool of messages for the local destination vertex. + * Messages which are not intended for the graph partition handled by this + * peer are thrown away. + * + * @param peer + * @throws IOException + */ + private void processReceivedMessages(BSPPeer peer) throws IOException { + int myPartition = partitionManager.getPartition(); + LOG.info("Processing received messages for partition " + myPartition); + BSPMessage received; + VertexReference destination = new VertexReference(); + VertexMessageEnvelope envelope = new VertexMessageEnvelope(); + while ((received = peer.getCurrentMessage()) != null) { + // Check if this message has been sent to the right peer. + SerializationHelper.deserialize((byte[]) received.getTag(), destination); + if (destination.getPartition() != myPartition) { + LOG.error("Received message not destined for me: " + + destination.getPartition()); + continue; + } + // If needed, create message list for vertex. + if (!messageBuffer.containsKey(destination.getLocalId())) { + messageBuffer.put(destination.getLocalId(), + new ArrayList>()); + } + // Deserialize message envelope. + SerializationHelper.deserialize((byte[]) received.getData(), envelope); + // Deserialize message value. + Writable messageValue = null; + try { + // Instantiate Writable for message data. + messageValue = MessageValueClass.newInstance(); + } catch (InstantiationException ex) { + LOG.error(ex); + } catch (IllegalAccessException ex) { + LOG.error(ex); + } + if (messageValue == null) { + // Skip storing this message. + continue; + } + SerializationHelper.deserialize(envelope.getRawMessage(), messageValue); + // Put message into relevant map. + List> messages = messageBuffer + .get(destination.getLocalId()); + messages.add(new Pair(envelope.getSource(), + messageValue)); + } + } + + /** + * Calls compute() on all the local vertexes. + * + * @throws IOException + */ + private void runLocalComputation() throws IOException { + LOG.info("Running local computation on " + vertexes.length + " nodes"); + MessageValueIteratorFromList valueIterator = new MessageValueIteratorFromList(); + // Create empty iterator for nodes which did not receive any message. + List> emptyList = new ArrayList>(); + // Iterate through local vertexes and invoke compute() method on each of + // them. + int i = 0; + for (TVertex vertex : vertexes) { + // Check if there are messages intended for this vertex and if they are + // active + if (vertex != null && vertex.isActive()) { + if (vertex.getReference() == null) { + LOG.error("Vertex reference is null for " + i + "th vertex"); + } + // Get list of messages for this vertex. + List> receivedMessages = messageBuffer + .get(vertex.getReference().getLocalId()); + if (receivedMessages != null) { + valueIterator.setIterator(receivedMessages.iterator()); + } else { + valueIterator.setIterator(emptyList.iterator()); + } + // Do local computation for vertex. + vertex.compute(valueIterator); + ++i; + if (0 == (i % 10000)) { + LOG.info("Ran compute() on " + i + " nodes"); + } + } else if (vertex == null) { + LOG.error(i + "th vertex is null"); + } else if (!vertex.isActive()) { + inactiveVertices++; + } + } + LOG.info("Done with local computation! We found " + inactiveVertices + + " in this iteration! Total vertex count is " + vertexes.length + "!"); + } +} Index: src/java/org/apache/hama/graph/VertexEnvironment.java =================================================================== --- src/java/org/apache/hama/graph/VertexEnvironment.java (revision 0) +++ src/java/org/apache/hama/graph/VertexEnvironment.java (revision 0) @@ -0,0 +1,43 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hama.graph; + +import org.apache.hama.bsp.BSPPeerInterface; + +/** + * Class holding information relevant for each Vertex. + */ +public class VertexEnvironment { + + private BSPPeerInterface bspPeer; + private PartitionManager partitionManager; + + public VertexEnvironment(BSPPeerInterface bspPeer, + PartitionManager partitionManager) { + this.bspPeer = bspPeer; + this.partitionManager = partitionManager; + } + + public BSPPeerInterface getBspPeer() { + return bspPeer; + } + + public PartitionManager getPartitionManager() { + return partitionManager; + } +} Index: src/java/org/apache/hama/graph/VertexMessageEnvelope.java =================================================================== --- src/java/org/apache/hama/graph/VertexMessageEnvelope.java (revision 0) +++ src/java/org/apache/hama/graph/VertexMessageEnvelope.java (revision 0) @@ -0,0 +1,69 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hama.graph; + +import java.io.DataInput; +import java.io.DataOutput; +import java.io.IOException; +import org.apache.hadoop.io.Writable; + +/** + * Class for wrapping serialized vertex message data with sender information. + */ +public class VertexMessageEnvelope implements Writable { + private VertexReference source; + private byte[] rawMessage; + + public VertexMessageEnvelope() { + } + + public VertexMessageEnvelope(VertexReference source, byte[] rawMessage) { + this.source = source; + this.rawMessage = rawMessage; + } + + public void write(DataOutput out) throws IOException { + source.write(out); + out.writeInt(rawMessage.length); + out.write(rawMessage); + } + + public void readFields(DataInput in) throws IOException { + source = new VertexReference(); + source.readFields(in); + int rawLength = in.readInt(); + rawMessage = new byte[rawLength]; + in.readFully(rawMessage); + } + + public byte[] getRawMessage() { + return rawMessage; + } + + public void setRawMessage(byte[] rawMessage) { + this.rawMessage = rawMessage; + } + + public VertexReference getSource() { + return source; + } + + public void setSource(VertexReference source) { + this.source = source; + } +} Index: src/java/org/apache/hama/graph/VertexMessageTag.java =================================================================== --- src/java/org/apache/hama/graph/VertexMessageTag.java (revision 0) +++ src/java/org/apache/hama/graph/VertexMessageTag.java (revision 0) @@ -0,0 +1,61 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hama.graph; + +import java.io.DataInput; +import java.io.DataOutput; +import java.io.IOException; +import org.apache.hadoop.io.Writable; + +/** + * Class holding two VertexReferences: one target and one destination. Used as a + * tag when passing messages. + */ +public class VertexMessageTag implements Writable { + private VertexReference source; + private VertexReference destination; + + public VertexReference getDestination() { + return destination; + } + + public void setDestination(VertexReference destination) { + this.destination = destination; + } + + public VertexReference getSource() { + return source; + } + + public void setSource(VertexReference source) { + this.source = source; + } + + public void write(DataOutput out) throws IOException { + source.write(out); + destination.write(out); + } + + public void readFields(DataInput in) throws IOException { + source = new VertexReference(); + source.readFields(in); + destination = new VertexReference(); + destination.readFields(in); + } + +} Index: src/java/org/apache/hama/graph/VertexReference.java =================================================================== --- src/java/org/apache/hama/graph/VertexReference.java (revision 0) +++ src/java/org/apache/hama/graph/VertexReference.java (revision 0) @@ -0,0 +1,66 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hama.graph; + +import java.io.DataInput; +import java.io.DataOutput; +import java.io.IOException; +import org.apache.hadoop.io.Writable; + +/** + * Class for storing a reference to a vertex. A reference consists of a + * partition ID and a local vertex ID. + */ +public class VertexReference implements Writable { + private int partition; + private int localId; + + public VertexReference() { + } + + public VertexReference(int partition, int localId) { + this.partition = partition; + this.localId = localId; + } + + public int getLocalId() { + return localId; + } + + public void setLocalId(int localId) { + this.localId = localId; + } + + public int getPartition() { + return partition; + } + + public void setPartition(int partition) { + this.partition = partition; + } + + public void write(DataOutput out) throws IOException { + out.writeInt(partition); + out.writeInt(localId); + } + + public void readFields(DataInput in) throws IOException { + partition = in.readInt(); + localId = in.readInt(); + } +} Index: src/java/org/apache/hama/util/SerializationHelper.java =================================================================== --- src/java/org/apache/hama/util/SerializationHelper.java (revision 0) +++ src/java/org/apache/hama/util/SerializationHelper.java (revision 0) @@ -0,0 +1,56 @@ +package org.apache.hama.util; + +import java.io.ByteArrayInputStream; +import java.io.ByteArrayOutputStream; +import java.io.DataInputStream; +import java.io.DataOutputStream; +import java.io.IOException; +import org.apache.hadoop.io.Writable; + +/** + * Serialization helper for easier usage of Hadoop's Writable types. + */ +public final class SerializationHelper { + + /** + * Serializes the writable object into a byte array. + * + * @param object {@link Writable} object to serialize. + * @return Byte array containing serialized {@link object}. + * @throws IOException + */ + public static byte[] serialize(Writable object) throws IOException { + ByteArrayOutputStream baos = new ByteArrayOutputStream(); + DataOutputStream dos = new DataOutputStream(baos); + object.write(dos); + dos.flush(); + return baos.toByteArray(); + } + + /** + * Deserializes a {@link Writable} object from a byte array. + * + * @param data Byte array to deserialize from. + * @param object {@link Writable} instance to deserialize into. + * @throws IOException + */ + public static void deserialize(byte[] data, Writable object) + throws IOException { + object.readFields(new DataInputStream(new ByteArrayInputStream(data))); + } + + /** + * Serializes the long value into a byte array using DataOutputStream. + * + * @param value Long value to serialize. + * @return Byte array containing serialized value. + * @throws IOException + */ + public static byte[] longToBytes(long value) throws IOException { + ByteArrayOutputStream baos = new ByteArrayOutputStream(); + DataOutputStream dos = new DataOutputStream(baos); + dos.writeLong(value); + dos.flush(); + return baos.toByteArray(); + } +} Index: src/test/testjar/ClassSerializePrinting.java =================================================================== --- src/test/testjar/ClassSerializePrinting.java (revision 1148587) +++ src/test/testjar/ClassSerializePrinting.java (working copy) @@ -42,6 +42,7 @@ private FileSystem fileSys; private int num; + @Override public void bsp(BSPPeer bspPeer) throws IOException, KeeperException, InterruptedException { @@ -67,12 +68,9 @@ writer.close(); } - public Configuration getConf() { - return conf; - } - - public void setConf(Configuration conf) { - this.conf = conf; + @Override + public void setup(BSPPeer peer) { + this.conf = peer.getConfiguration(); num = Integer.parseInt(conf.get("bsp.peers.num")); try { fileSys = FileSystem.get(conf);