Index: core/src/main/java/org/apache/hama/bsp/BSPPeer.java =================================================================== --- core/src/main/java/org/apache/hama/bsp/BSPPeer.java (revision 1447032) +++ core/src/main/java/org/apache/hama/bsp/BSPPeer.java (working copy) @@ -23,6 +23,7 @@ import org.apache.hadoop.io.Writable; import org.apache.hama.Constants; import org.apache.hama.bsp.Counters.Counter; +import org.apache.hama.bsp.message.BSPMessageCoupledStorage; import org.apache.hama.bsp.sync.SyncException; import org.apache.hama.util.KeyValuePair; @@ -196,4 +197,11 @@ * @return the task id of this task. */ public TaskAttemptID getTaskId(); + + /** + * Register storage coupled with messaging. + * @param storage + */ + public void registerBSPStorage(BSPMessageCoupledStorage storage); + } Index: core/src/main/java/org/apache/hama/bsp/BSPPeerImpl.java =================================================================== --- core/src/main/java/org/apache/hama/bsp/BSPPeerImpl.java (revision 1447032) +++ core/src/main/java/org/apache/hama/bsp/BSPPeerImpl.java (working copy) @@ -40,6 +40,7 @@ import org.apache.hama.bsp.ft.AsyncRcvdMsgCheckpointImpl; import org.apache.hama.bsp.ft.BSPFaultTolerantService; import org.apache.hama.bsp.ft.FaultTolerantPeerService; +import org.apache.hama.bsp.message.BSPMessageCoupledStorage; import org.apache.hama.bsp.message.MessageManager; import org.apache.hama.bsp.message.MessageManagerFactory; import org.apache.hama.bsp.message.queue.MessageQueue; @@ -433,7 +434,7 @@ } // Clear outgoing queues. - messenger.clearOutgoingQueues(); + messenger.switchQueues(); leaveBarrier(); @@ -680,4 +681,9 @@ return taskId; } + @Override + public void registerBSPStorage(BSPMessageCoupledStorage storage) { + this.messenger.registerBSPStorage(storage); + } + } Index: core/src/main/java/org/apache/hama/bsp/LocalBSPRunner.java =================================================================== --- core/src/main/java/org/apache/hama/bsp/LocalBSPRunner.java (revision 1447032) +++ core/src/main/java/org/apache/hama/bsp/LocalBSPRunner.java (working copy) @@ -347,11 +347,7 @@ @Override public void transfer(InetSocketAddress addr, BSPMessageBundle bundle) throws IOException { - for (M value : bundle.getMessages()) { - MANAGER_MAP.get(addr).localQueueForNextIteration.add(value); - peer.incrementCounter(BSPPeerImpl.PeerCounter.TOTAL_MESSAGES_RECEIVED, - 1L); - } + MANAGER_MAP.get(addr).loopBackMessages(bundle); } } Index: core/src/main/java/org/apache/hama/bsp/message/AbstractMessageManager.java =================================================================== --- core/src/main/java/org/apache/hama/bsp/message/AbstractMessageManager.java (revision 1447032) +++ core/src/main/java/org/apache/hama/bsp/message/AbstractMessageManager.java (working copy) @@ -77,6 +77,9 @@ // List of listeners for all the sent messages protected Queue> messageListenerQueue; + + // Message coupled registered storage. + protected BSPMessageCoupledStorage storage; /* * (non-Javadoc) @@ -162,6 +165,14 @@ localQueue = localQueueForNextIteration.getMessageQueue(); localQueue.prepareRead(); localQueueForNextIteration = getSynchronizedReceiverQueue(); + } + + @Override + public void switchQueues() { + clearOutgoingQueues(); + if(storage != null){ + storage.notifySyncComplete(); + } notifyInit(); } @@ -288,21 +299,36 @@ } @Override - public void loopBackMessages(BSPMessageBundle bundle) + public void loopBackMessages(BSPMessageBundle bundle) throws IOException { - for (Writable message : bundle.getMessages()) { + for (M message : bundle.getMessages()) { loopBackMessage(message); } } - @SuppressWarnings("unchecked") @Override - public void loopBackMessage(Writable message) throws IOException { - this.localQueueForNextIteration.add((M) message); + public void loopBackMessage(M message) throws IOException { + if(checkWithStorage(message, peer)){ + storage.consumeMessage(message, peer); + } + else { + this.localQueueForNextIteration.add((M) message); + } peer.incrementCounter(BSPPeerImpl.PeerCounter.TOTAL_MESSAGES_RECEIVED, 1L); notifyReceivedMessage((M) message); } + + private boolean checkWithStorage(M message, BSPPeer peer) { + return (storage != null && storage.isValidToStore(message, peer)); + } + @Override + public void registerBSPStorage(BSPMessageCoupledStorage storage) { + this.storage = storage; + } + + + } Index: core/src/main/java/org/apache/hama/bsp/message/BSPMessageCoupledStorage.java =================================================================== --- core/src/main/java/org/apache/hama/bsp/message/BSPMessageCoupledStorage.java (revision 0) +++ core/src/main/java/org/apache/hama/bsp/message/BSPMessageCoupledStorage.java (working copy) @@ -0,0 +1,59 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hama.bsp.message; + +import org.apache.hadoop.io.Writable; +import org.apache.hama.bsp.BSPPeer; + +/** + * This is a utililty feature provided by HAMA messaging framework to couple + * local storage with messaging asynchronously. This may be used if the user + * wants to control the lifecycle of the messages and does not want to clear + * them at the end of the current superstep. + * + * @param Message type. + */ +public interface BSPMessageCoupledStorage { + + /** + * Should this message be consumed by this storage. If not this message is + * not saved and is available for the next superstep by calling + * {@link BSPPeer#getCurrentMessage()} + * @param message + * @return true if the message should be stored locally, if false, the framework + * gets the message for the next superstep. + */ + public boolean isValidToStore(M message, BSPPeer peer); + + /** + * Consume message by this storage. The message called with this function is + * not saved and is not available for the next superstep via + * {@link BSPPeer#getCurrentMessage()} + * @param message + * @return true if the message is stored locally successfully. + * @param message + * @return + */ + public boolean consumeMessage(M message, BSPPeer peer); + + /** + * Synchronization is complete. + */ + public void notifySyncComplete(); + +} Index: core/src/main/java/org/apache/hama/bsp/message/MessageManager.java =================================================================== --- core/src/main/java/org/apache/hama/bsp/message/MessageManager.java (revision 1447032) +++ core/src/main/java/org/apache/hama/bsp/message/MessageManager.java (working copy) @@ -91,6 +91,11 @@ * Clears the outgoing queue. Can be used to switch queues. */ public void clearOutgoingQueues(); + + /** + * Clears the outgoing queue. Can be used to switch queues. + */ + public void switchQueues(); /** * Gets the number of messages in the current queue. @@ -101,13 +106,13 @@ /** * Send the messages to self to receive in the next superstep. */ - public void loopBackMessages(BSPMessageBundle bundle) + public void loopBackMessages(BSPMessageBundle bundle) throws IOException; /** * Send the message to self to receive in the next superstep. */ - public void loopBackMessage(Writable message) throws IOException; + public void loopBackMessage(M message) throws IOException; /** * Register a listener for the events in message manager. @@ -118,5 +123,11 @@ */ public void registerListener(MessageEventListener listener) throws IOException; + + /** + * + * @param storage + */ + public void registerBSPStorage(BSPMessageCoupledStorage storage); } Index: core/src/main/java/org/apache/hama/bsp/message/io/DualChannelByteBufferStream.java =================================================================== --- core/src/main/java/org/apache/hama/bsp/message/io/DualChannelByteBufferStream.java (revision 1447032) +++ core/src/main/java/org/apache/hama/bsp/message/io/DualChannelByteBufferStream.java (working copy) @@ -52,7 +52,7 @@ if (directAlloc) { buffer = ByteBuffer.allocateDirect(size); } else { - buffer = ByteBuffer.allocateDirect(size); + buffer = ByteBuffer.allocate(size); } fileName = conf.get(Constants.DATA_SPILL_PATH) + File.separatorChar + new BigInteger(128, new SecureRandom()).toString(32); Index: core/src/test/java/org/apache/hama/bsp/TestCheckpoint.java =================================================================== --- core/src/test/java/org/apache/hama/bsp/TestCheckpoint.java (revision 1447032) +++ core/src/test/java/org/apache/hama/bsp/TestCheckpoint.java (working copy) @@ -49,6 +49,7 @@ import org.apache.hama.bsp.Counters.Counter; import org.apache.hama.bsp.ft.AsyncRcvdMsgCheckpointImpl; import org.apache.hama.bsp.ft.FaultTolerantPeerService; +import org.apache.hama.bsp.message.BSPMessageCoupledStorage; import org.apache.hama.bsp.message.MessageEventListener; import org.apache.hama.bsp.message.MessageManager; import org.apache.hama.bsp.message.queue.MessageQueue; @@ -136,20 +137,32 @@ @SuppressWarnings("unchecked") @Override - public void loopBackMessages(BSPMessageBundle bundle) { + public void loopBackMessages(BSPMessageBundle bundle) { this.loopbackBundle = (BSPMessageBundle) bundle; } @Override - public void loopBackMessage(Writable message) { - } - - @Override public void registerListener(MessageEventListener listener) throws IOException { this.listener = listener; } + @Override + public void switchQueues() { + } + + @Override + public void loopBackMessage(Text message) throws IOException { + // TODO Auto-generated method stub + + } + + @Override + public void registerBSPStorage(BSPMessageCoupledStorage storage) { + // TODO Auto-generated method stub + + } + } public static class TestBSPPeer implements @@ -299,7 +312,14 @@ public TaskAttemptID getTaskId() { return null; } + + @Override + public void registerBSPStorage(BSPMessageCoupledStorage storage) { + + } + + } public static class TempSyncClient extends BSPPeerSyncClient { Index: graph/src/main/java/org/apache/hama/graph/AggregationRunner.java =================================================================== --- graph/src/main/java/org/apache/hama/graph/AggregationRunner.java (revision 1447032) +++ graph/src/main/java/org/apache/hama/graph/AggregationRunner.java (working copy) @@ -24,6 +24,7 @@ import org.apache.hadoop.io.MapWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.io.Writable; +import org.apache.hadoop.io.WritableComparable; import org.apache.hadoop.util.ReflectionUtils; import org.apache.hama.bsp.BSPPeer; import org.apache.hama.bsp.sync.SyncException; @@ -35,7 +36,7 @@ * configured. * */ -public final class AggregationRunner { +public final class AggregationRunner, E extends Writable, M extends Writable> { // multiple aggregator arrays private Aggregator>[] aggregators; @@ -53,7 +54,7 @@ @SuppressWarnings("unchecked") public void setupAggregators( - BSPPeer peer) { + BSPPeer> peer) { this.conf = peer.getConfiguration(); String aggregatorClasses = peer.getConfiguration().get( GraphJob.AGGREGATOR_CLASS_ATTR); @@ -90,7 +91,7 @@ * Runs the aggregators by sending their values to the master task. */ public void sendAggregatorValues( - BSPPeer peer, + BSPPeer> peer, int activeVertices) throws IOException { // send msgCounts to the master task MapWritable updatedCnt = new MapWritable(); @@ -114,7 +115,7 @@ } } } - peer.send(GraphJobRunner.getMasterTask(peer), new GraphJobMessage( + peer.send(GraphJobRunner.getMasterTask(peer), new GraphJobMessage( updatedCnt)); } @@ -172,7 +173,7 @@ * we haven't seen any messages anymore. */ public boolean receiveAggregatedValues( - BSPPeer peer, + BSPPeer> peer, long iteration) throws IOException, SyncException, InterruptedException { // if we have an aggregator defined, we must make an additional sync // to have the updated values available on all our peers. Index: graph/src/main/java/org/apache/hama/graph/Edge.java =================================================================== --- graph/src/main/java/org/apache/hama/graph/Edge.java (revision 1447032) +++ graph/src/main/java/org/apache/hama/graph/Edge.java (working copy) @@ -19,11 +19,12 @@ import org.apache.hadoop.io.NullWritable; import org.apache.hadoop.io.Writable; +import org.apache.hadoop.io.WritableComparable; /** * The edge class */ -public final class Edge { +public final class Edge, EDGE_VALUE_TYPE extends Writable> { private final VERTEX_ID destinationVertexID; private final EDGE_VALUE_TYPE cost; Index: graph/src/main/java/org/apache/hama/graph/GraphJobMessage.java =================================================================== --- graph/src/main/java/org/apache/hama/graph/GraphJobMessage.java (revision 1447032) +++ graph/src/main/java/org/apache/hama/graph/GraphJobMessage.java (working copy) @@ -26,6 +26,7 @@ import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.MapWritable; import org.apache.hadoop.io.Writable; +import org.apache.hadoop.io.WritableComparable; import org.apache.hadoop.util.ReflectionUtils; /** @@ -33,7 +34,8 @@ * real message (vertex ID and value). It can be extended by adding flags, for * example for a graph repair call. */ -public final class GraphJobMessage implements Writable { +public final class GraphJobMessage, E extends Writable, M extends Writable> + implements WritableComparable> { public static final int MAP_FLAG = 0x01; public static final int VERTEX_FLAG = 0x02; @@ -123,6 +125,7 @@ } + @SuppressWarnings("unchecked") @Override public void readFields(DataInput in) throws IOException { flag = in.readByte(); @@ -135,30 +138,29 @@ map = new MapWritable(); map.readFields(in); } else if (isPartitioningMessage()) { - Vertex vertex = GraphJobRunner - .newVertexInstance(VERTEX_CLASS, null); - Writable vertexId = ReflectionUtils.newInstance(VERTEX_ID_CLASS, null); + Vertex vertex = (Vertex) ReflectionUtils.newInstance( + VERTEX_CLASS, null); + + V vertexId = (V) ReflectionUtils.newInstance(VERTEX_ID_CLASS, null); vertexId.readFields(in); vertex.setVertexID(vertexId); if (in.readBoolean()) { - Writable vertexValue = ReflectionUtils.newInstance(VERTEX_VALUE_CLASS, + M vertexValue = (M) ReflectionUtils.newInstance(VERTEX_VALUE_CLASS, null); vertexValue.readFields(in); vertex.setValue(vertexValue); } int size = in.readInt(); - vertex.setEdges(new ArrayList>(size)); + vertex.setEdges(new ArrayList>(size)); for (int i = 0; i < size; i++) { - Writable edgeVertexID = ReflectionUtils.newInstance(VERTEX_ID_CLASS, - null); + V edgeVertexID = (V) ReflectionUtils.newInstance(VERTEX_ID_CLASS, null); edgeVertexID.readFields(in); - Writable edgeValue = null; + E edgeValue = null; if (in.readBoolean()) { - edgeValue = ReflectionUtils.newInstance(EDGE_VALUE_CLASS, null); + edgeValue = (E) ReflectionUtils.newInstance(EDGE_VALUE_CLASS, null); edgeValue.readFields(in); } - vertex.getEdges().add( - new Edge(edgeVertexID, edgeValue)); + vertex.getEdges().add(new Edge(edgeVertexID, edgeValue)); } this.vertex = vertex; } else if (isVerticesSizeMessage()) { @@ -217,4 +219,25 @@ + "]"; } + @SuppressWarnings("unchecked") + @Override + public int compareTo(GraphJobMessage that) { + if (this.flag != that.flag) { + return (this.flag - that.flag); + } else { + if (this.isVertexMessage() || this.isRepairMessage()) { + return ((V) this.getVertexId()).compareTo((V) that.getVertexId()); + } else if (this.isMapMessage()) { + return -1; + } else if (this.isPartitioningMessage()) { + return ((Vertex) vertex) + .compareTo((Vertex) that.vertex); + } else if (this.isVerticesSizeMessage()) { + return this.vertices_size.compareTo(that.vertices_size); + } + } + // Can we do better. Not safe! + return this.hashCode() - that.hashCode(); + } + } Index: graph/src/main/java/org/apache/hama/graph/GraphJobRunner.java =================================================================== --- graph/src/main/java/org/apache/hama/graph/GraphJobRunner.java (revision 1447032) +++ graph/src/main/java/org/apache/hama/graph/GraphJobRunner.java (working copy) @@ -21,6 +21,7 @@ import java.util.ArrayList; import java.util.Collections; import java.util.HashMap; +import java.util.Iterator; import java.util.List; import java.util.Map; import java.util.Map.Entry; @@ -32,6 +33,7 @@ import org.apache.hadoop.io.MapWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.io.Writable; +import org.apache.hadoop.io.WritableComparable; import org.apache.hadoop.util.ReflectionUtils; import org.apache.hama.bsp.BSP; import org.apache.hama.bsp.BSPPeer; @@ -39,6 +41,7 @@ import org.apache.hama.bsp.HashPartitioner; import org.apache.hama.bsp.Partitioner; import org.apache.hama.bsp.sync.SyncException; +import org.apache.hama.graph.VerticesInfo.VertexMappedMessageEntry; import org.apache.hama.util.KeyValuePair; /** @@ -48,8 +51,9 @@ * @param the value type of an edge. * @param the value type of a vertex. */ -public final class GraphJobRunner - extends BSP { +public final class GraphJobRunner, E extends Writable, M extends Writable> + extends + BSP> { public static enum GraphJobCounter { MULTISTEP_PARTITIONING, ITERATIONS, INPUT_VERTICES, AGGREGATE_VERTICES @@ -84,11 +88,11 @@ private AggregationRunner aggregationRunner; - private BSPPeer peer; + private BSPPeer> peer; @Override public final void setup( - BSPPeer peer) + BSPPeer> peer) throws IOException, SyncException, InterruptedException { setupFields(peer); @@ -103,7 +107,7 @@ @Override public final void bsp( - BSPPeer peer) + BSPPeer> peer) throws IOException, SyncException, InterruptedException { // we do supersteps while we still have updates and have not reached our @@ -114,7 +118,11 @@ peer.sync(); // note that the messages must be parsed here - final Map> messages = parseMessages(peer); + parseMessages(peer); + + if (aggregationRunner.isEnabled() && iteration > 1) { + vertices.switchQueues(); + } // master needs to update doMasterUpdates(peer); // if aggregators say we don't have updates anymore, break @@ -122,7 +130,7 @@ break; } // loop over vertices and do their computation - doSuperstep(messages, peer); + doSuperstep(peer); if (isMasterTask(peer)) { peer.getCounter(GraphJobCounter.ITERATIONS).increment(1); @@ -137,7 +145,7 @@ */ @Override public final void cleanup( - BSPPeer peer) + BSPPeer> peer) throws IOException { for (Vertex e : vertices) { peer.write(e.getVertexID(), e.getValue()); @@ -150,7 +158,7 @@ * reading multiple typed messages. */ private void doMasterUpdates( - BSPPeer peer) + BSPPeer> peer) throws IOException { if (isMasterTask(peer) && iteration > 1) { MapWritable updatedCnt = new MapWritable(); @@ -162,7 +170,7 @@ } // send the updates from the mater tasks back to the slaves for (String peerName : peer.getAllPeerNames()) { - peer.send(peerName, new GraphJobMessage(updatedCnt)); + peer.send(peerName, new GraphJobMessage(updatedCnt)); } } } @@ -171,25 +179,29 @@ * Do the main logic of a superstep, namely checking if vertices are active, * feeding compute with messages and controlling combiners/aggregators. */ - private void doSuperstep(Map> messages, - BSPPeer peer) + private void doSuperstep( + BSPPeer> peer) throws IOException { int activeVertices = 0; - for (Vertex vertex : vertices) { - List msgs = messages.get(vertex.getVertexID()); - // If there are newly received messages, restart. - if (vertex.isHalted() && msgs != null) { + + Iterator> vertexMsgIter = vertices + .getVertexMappedMessageIterator(); + + List combinedList = new ArrayList(1); + + while (vertexMsgIter.hasNext()) { + VertexMappedMessageEntry entry = vertexMsgIter.next(); + List msgs = entry.getMessages(); + Vertex vertex = entry.getVertex(); + if (vertex.isHalted() && !msgs.isEmpty()) { vertex.setActive(); } - if (msgs == null) { - msgs = Collections.emptyList(); - } - if (!vertex.isHalted()) { if (combiner != null) { M combined = combiner.combine(msgs); - msgs = new ArrayList(); - msgs.add(combined); + combinedList.clear(); + combinedList.add(combined); + msgs = combinedList; } M lastValue = vertex.getValue(); vertex.compute(msgs.iterator()); @@ -200,6 +212,8 @@ } } + vertices.clearCurrentMessages(); + aggregationRunner.sendAggregatorValues(peer, activeVertices); iteration++; } @@ -209,8 +223,9 @@ * superstep after the vertices have been loaded. */ private void doInitialSuperstep( - BSPPeer peer) + BSPPeer> peer) throws IOException { + for (Vertex vertex : vertices) { List singletonList = Collections.singletonList(vertex.getValue()); M lastValue = vertex.getValue(); @@ -223,19 +238,19 @@ @SuppressWarnings("unchecked") private void setupFields( - BSPPeer peer) { + BSPPeer> peer) { this.peer = peer; this.conf = peer.getConfiguration(); maxIteration = peer.getConfiguration().getInt("hama.graph.max.iteration", -1); - Class vertexIdClass = (Class) conf.getClass(GraphJob.VERTEX_ID_CLASS_ATTR, - Text.class, Writable.class); - Class vertexValueClass = (Class) conf.getClass( - GraphJob.VERTEX_VALUE_CLASS_ATTR, IntWritable.class, Writable.class); - Class edgeValueClass = (Class) conf.getClass( - GraphJob.VERTEX_EDGE_VALUE_CLASS_ATTR, IntWritable.class, - Writable.class); + Class vertexIdClass = (Class) conf.getClass( + GraphJob.VERTEX_ID_CLASS_ATTR, Text.class, Writable.class); + Class vertexValueClass = (Class) conf.getClass( + GraphJob.VERTEX_VALUE_CLASS_ATTR, IntWritable.class, Writable.class); + Class edgeValueClass = (Class) conf.getClass( + GraphJob.VERTEX_EDGE_VALUE_CLASS_ATTR, IntWritable.class, + Writable.class); vertexClass = (Class>) conf.getClass( "hama.graph.vertex.class", Vertex.class); @@ -262,6 +277,7 @@ aggregationRunner.setupAggregators(peer); vertices = new VerticesInfo(); + vertices.init(conf); } /** @@ -269,7 +285,7 @@ */ @SuppressWarnings("unchecked") private void loadVertices( - BSPPeer peer) + BSPPeer> peer) throws IOException, SyncException, InterruptedException { final boolean repairNeeded = conf.getBoolean(GRAPH_REPAIR, false); @@ -289,7 +305,8 @@ } } - LOG.info(vertices.size() + " vertices are loaded into " + peer.getPeerName()); + LOG.info(vertices.size() + " vertices are loaded into " + + peer.getPeerName()); /* * If the user want to repair the graph, it should traverse through that @@ -304,13 +321,17 @@ repair(peer, selfReference); } + vertices.loadComplete(); + + peer.registerBSPStorage(vertices); + if (LOG.isDebugEnabled()) LOG.debug("Starting Vertex processing!"); } @SuppressWarnings("unchecked") private void repair( - BSPPeer peer, + BSPPeer> peer, boolean selfReference) throws IOException, SyncException, InterruptedException { @@ -319,12 +340,12 @@ for (Vertex v : vertices) { for (Edge e : v.getEdges()) { peer.send(v.getDestinationPeerName(e), - new GraphJobMessage(e.getDestinationVertexID())); + new GraphJobMessage(e.getDestinationVertexID())); } } peer.sync(); - GraphJobMessage msg; + GraphJobMessage msg; while ((msg = peer.getCurrentMessage()) != null) { V vertexName = (V) msg.getVertexId(); @@ -358,15 +379,16 @@ * other peers. */ private void countGlobalVertexCount( - BSPPeer peer) + BSPPeer> peer) throws IOException, SyncException, InterruptedException { for (String peerName : peer.getAllPeerNames()) { - peer.send(peerName, new GraphJobMessage(new IntWritable(vertices.size()))); + peer.send(peerName, + new GraphJobMessage(new IntWritable(vertices.size()))); } peer.sync(); - GraphJobMessage msg; + GraphJobMessage msg; while ((msg = peer.getCurrentMessage()) != null) { if (msg.isVerticesSizeMessage()) { numberVertices += msg.getVerticesSize().get(); @@ -385,23 +407,17 @@ * @return a map that contains messages pro vertex. */ @SuppressWarnings("unchecked") - private Map> parseMessages( - BSPPeer peer) + private void parseMessages( + BSPPeer> peer) throws IOException { - GraphJobMessage msg; - final Map> msgMap = new HashMap>(); + GraphJobMessage msg; while ((msg = peer.getCurrentMessage()) != null) { // either this is a vertex message or a directive that must be read // as map if (msg.isVertexMessage()) { - final V vertexID = (V) msg.getVertexId(); - final M value = (M) msg.getVertexValue(); - List msgs = msgMap.get(vertexID); - if (msgs == null) { - msgs = new ArrayList(); - msgMap.put(vertexID, msgs); - } - msgs.add(value); + throw new IllegalStateException( + "This message should have been consumed" + " by the framework: " + + msg.toString()); } else if (msg.isMapMessage()) { for (Entry e : msg.getMap().entrySet()) { Text vertexID = (Text) e.getKey(); @@ -426,7 +442,6 @@ } } - return msgMap; } /** @@ -480,7 +495,7 @@ /** * @return the peer instance. */ - public final BSPPeer getPeer() { + public final BSPPeer> getPeer() { return peer; } @@ -488,8 +503,8 @@ * Checks if this is a master task. The master task is the first peer in the * peer array. */ - public static boolean isMasterTask( - BSPPeer peer) { + @SuppressWarnings({ "rawtypes" }) + public static boolean isMasterTask(BSPPeer peer) { return peer.getPeerName().equals(getMasterTask(peer)); } @@ -497,18 +512,18 @@ * @return the name of the master peer, the name at the first index of the * peers. */ - public static String getMasterTask( - BSPPeer peer) { + @SuppressWarnings("rawtypes") + public static String getMasterTask(BSPPeer peer) { return peer.getPeerName(0); } /** * @return a new vertex instance */ - public static Vertex newVertexInstance( - Class vertexClass, Configuration conf) { - return (Vertex) ReflectionUtils.newInstance( - vertexClass, conf); + @SuppressWarnings("unchecked") + public Vertex newVertexInstance(Class vertexClass, + Configuration conf) { + return (Vertex) ReflectionUtils.newInstance(vertexClass, conf); } } Index: graph/src/main/java/org/apache/hama/graph/Vertex.java =================================================================== --- graph/src/main/java/org/apache/hama/graph/Vertex.java (revision 1447032) +++ graph/src/main/java/org/apache/hama/graph/Vertex.java (working copy) @@ -23,9 +23,12 @@ import java.util.LinkedList; import java.util.List; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.Writable; +import org.apache.hadoop.io.WritableComparable; import org.apache.hama.bsp.BSPPeer; import org.apache.hama.bsp.Partitioner; @@ -45,8 +48,10 @@ * @param Edge cost object type * @param Vertex value object type */ -public abstract class Vertex - implements VertexInterface, Writable { +public abstract class Vertex, E extends Writable, M extends Writable> + implements VertexInterface, WritableComparable> { + + private static final Log LOG = LogFactory.getLog(Vertex.class); GraphJobRunner runner; @@ -181,7 +186,7 @@ /** * Gives access to the BSP primitives and additional features by a peer. */ - public BSPPeer getPeer() { + public BSPPeer getPeer() { return runner.getPeer(); } @@ -218,6 +223,9 @@ @Override public boolean equals(Object obj) { + + LOG.info("Comparing " + vertexID + " with " + obj); + if (this == obj) return true; if (obj == null) @@ -230,6 +238,8 @@ return false; } else if (!vertexID.equals(other.vertexID)) return false; + LOG.info("True " + vertexID + " with " + other.vertexID); + return true; } @@ -238,8 +248,18 @@ return getVertexID() + (getValue() != null ? " = " + getValue() : "") + " // " + edges; } + + @Override + public int compareTo(Vertex o) { + int val = this.vertexID.compareTo(o.vertexID); + LOG.info("Returning " + val + " " + vertexID + " with " + o); + + return val; + } + + @Override public void readFields(DataInput in) throws IOException { if (in.readBoolean()) { if (vertexID == null) { Index: graph/src/main/java/org/apache/hama/graph/VertexInputReader.java =================================================================== --- graph/src/main/java/org/apache/hama/graph/VertexInputReader.java (revision 1447032) +++ graph/src/main/java/org/apache/hama/graph/VertexInputReader.java (working copy) @@ -22,6 +22,8 @@ import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.io.NullWritable; import org.apache.hadoop.io.Writable; +import org.apache.hadoop.io.WritableComparable; +import org.apache.hadoop.util.ReflectionUtils; import org.apache.hama.bsp.BSPPeer; import org.apache.hama.bsp.Partitioner; import org.apache.hama.bsp.PartitioningRunner.RecordConverter; @@ -30,7 +32,7 @@ /** * A reader to read Hama's input files and parses a vertex out of it. */ -public abstract class VertexInputReader +public abstract class VertexInputReader, E extends Writable, M extends Writable> implements RecordConverter { private static final Log LOG = LogFactory.getLog(VertexInputReader.class); @@ -52,8 +54,9 @@ Class> vertexClass = (Class>) conf .getClass(GraphJob.VERTEX_CLASS_ATTR, Vertex.class); boolean vertexCreation; - Vertex vertex = GraphJobRunner - .newVertexInstance(vertexClass, conf); + Vertex vertex = (Vertex) ReflectionUtils.newInstance( + vertexClass, conf); + try { vertexCreation = parseVertex((KEYIN) inputRecord.getKey(), (VALUEIN) inputRecord.getValue(), vertex); Index: graph/src/main/java/org/apache/hama/graph/VertexInterface.java =================================================================== --- graph/src/main/java/org/apache/hama/graph/VertexInterface.java (revision 1447032) +++ graph/src/main/java/org/apache/hama/graph/VertexInterface.java (working copy) @@ -23,6 +23,7 @@ import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.io.Writable; +import org.apache.hadoop.io.WritableComparable; /** * The vertex interface. @@ -33,7 +34,7 @@ * edge. * @param the type used for messaging, usually the value of a vertex. */ -public interface VertexInterface { +public interface VertexInterface, E extends Writable, M extends Writable> { /** * Used to setup a vertex. Index: graph/src/main/java/org/apache/hama/graph/VerticesInfo.java =================================================================== --- graph/src/main/java/org/apache/hama/graph/VerticesInfo.java (revision 1447032) +++ graph/src/main/java/org/apache/hama/graph/VerticesInfo.java (working copy) @@ -20,10 +20,20 @@ import java.io.DataInput; import java.io.DataOutput; import java.util.ArrayList; +import java.util.Collections; +import java.util.Comparator; import java.util.Iterator; +import java.util.LinkedList; import java.util.List; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.io.Writable; +import org.apache.hadoop.io.WritableComparable; +import org.apache.hadoop.util.ReflectionUtils; +import org.apache.hama.bsp.BSPPeer; +import org.apache.hama.bsp.message.BSPMessageCoupledStorage; /** * VerticesInfo encapsulates the storage of vertices in a BSP Task. @@ -32,11 +42,159 @@ * @param Edge cost object type * @param Vertex value object type */ -public class VerticesInfo - implements Iterable> { +public class VerticesInfo, E extends Writable, M extends Writable> + implements Iterable>, + BSPMessageCoupledStorage> { - private final List> vertices = new ArrayList>(100); + private static final Log LOG = LogFactory.getLog(VerticesInfo.class); + private final List> vertices = new ArrayList>( + 100); + private VertexIndexedMessageStore messageStorage; + private Vertex tmpVertex; + + public static class VertexComparator, E extends Writable, M extends Writable> + implements Comparator> { + + @Override + public int compare(Vertex o1, Vertex o2) { + return o1.getVertexID().compareTo(o2.getVertexID()); + } + + } + + public static class VertexMappedMessageEntry, E extends Writable, M extends Writable> { + + Vertex vertex; + List list; + + @SuppressWarnings("unchecked") + public void setTuple(Vertex vertex, List list) { + this.vertex = vertex; + this.list = (list == null) ? Collections.EMPTY_LIST : list; + } + + public Vertex getVertex() { + return vertex; + } + + public List getMessages() { + return list; + } + + } + + public static class VertexMessageMappedIterator, E extends Writable, M extends Writable> + implements Iterator> { + + List> messageMap; + List> vertices; + private int currentIndex; + VertexMappedMessageEntry mappedEntry; + + VertexMessageMappedIterator(List> vertices, + List> messageMap) { + this.vertices = vertices; + this.messageMap = messageMap; + currentIndex = 0; + mappedEntry = new VertexMappedMessageEntry(); + } + + @Override + public boolean hasNext() { + return currentIndex < this.vertices.size(); + } + + @Override + public VertexMappedMessageEntry next() { + mappedEntry.setTuple(vertices.get(currentIndex), + messageMap.get(currentIndex)); + ++currentIndex; + return mappedEntry; + } + + @Override + public void remove() { + } + + } + + public static class VertexIndexedMessageStore, E extends Writable, M extends Writable> { + + List> vertices; + private Vertex tmpVertex; + private List> messageMap; + private List> nextMessageMap; + + public VertexIndexedMessageStore(List> vertices, + Vertex tmpVertex) { + this.vertices = vertices; + this.tmpVertex = tmpVertex; + this.messageMap = new ArrayList>(vertices.size()); + this.nextMessageMap = new ArrayList>(vertices.size()); + for (int i = 0; i < vertices.size(); ++i) { + messageMap.add(null); + nextMessageMap.add(null); + } + } + + @SuppressWarnings("unchecked") + public void addMessage(GraphJobMessage message, + BSPPeer> peer) { + V vertexId = (V) message.getVertexId(); + //tmpVertex.setVertexID(vertexId); + int index = -1; + synchronized(vertices){ + tmpVertex.setVertexID(vertexId); + index = Collections.binarySearch(this.vertices, tmpVertex); + } + if (index < 0) { + LOG.error("Message reached a non-existent vertice at: " + + vertexId.toString()); + return; + } + List list = nextMessageMap.get(index); + if (list == null) { + list = new LinkedList(); + nextMessageMap.set(index, list); + } + list.add((M) message.getVertexValue()); + } + + public void syncCompleted() { + switchQueues(); + } + + public void switchQueues() { + List> tmpMessageMap = messageMap; + messageMap = nextMessageMap; + nextMessageMap = tmpMessageMap; + } + + public void clearMessages() { + for (List list : this.messageMap) { + if (list != null) + list.clear(); + } + } + + public Iterator> getMappedIterator() { + return new VertexMessageMappedIterator(this.vertices, + this.messageMap); + } + } + + public Iterator> getVertexMappedMessageIterator() { + return messageStorage.getMappedIterator(); + } + + public void init(Configuration conf) { + @SuppressWarnings("unchecked") + Class> vertexClass = (Class>) conf + .getClass("hama.graph.vertex.class", Vertex.class); + tmpVertex = ReflectionUtils.newInstance(vertexClass, null); + } + public void addVertex(Vertex vertex) { int i = 0; for (Vertex check : this) { @@ -87,4 +245,37 @@ public void saveState(DataOutput out) { } + + public void loadComplete() { + Collections.sort(this.vertices, new VertexComparator()); + messageStorage = new VertexIndexedMessageStore(vertices, tmpVertex); + } + + @Override + public boolean isValidToStore(GraphJobMessage message, + BSPPeer> peer) { + return message.isVertexMessage(); + } + + @Override + public boolean consumeMessage(GraphJobMessage message, + BSPPeer> peer) { + if (!message.isVertexMessage()) + return false; + messageStorage.addMessage(message, peer); + return true; + } + + @Override + public void notifySyncComplete() { + switchQueues(); + } + + public void clearCurrentMessages() { + messageStorage.clearMessages(); + } + + public void switchQueues() { + messageStorage.switchQueues(); + } }