Index: graph/src/main/java/org/apache/hama/graph/GraphJobMessage.java =================================================================== --- graph/src/main/java/org/apache/hama/graph/GraphJobMessage.java (revision 1492639) +++ graph/src/main/java/org/apache/hama/graph/GraphJobMessage.java (working copy) @@ -21,10 +21,12 @@ import java.io.DataOutput; import java.io.IOException; +import org.apache.hadoop.io.DataInputBuffer; import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.MapWritable; import org.apache.hadoop.io.Writable; import org.apache.hadoop.io.WritableComparable; +import org.apache.hadoop.io.WritableComparator; import org.apache.hadoop.util.ReflectionUtils; /** @@ -46,7 +48,16 @@ private WritableComparable vertexId; private Writable vertexValue; private IntWritable verticesSize; + private static GraphJobMessageComparator comparator; + static { + if (comparator == null) { + comparator = new GraphJobMessageComparator(); + } + + WritableComparator.define(GraphJobMessage.class, comparator); + } + public GraphJobMessage() { } @@ -84,6 +95,28 @@ } + public void fastReadFields(DataInput in) throws IOException { + flag = in.readByte(); + if (isVertexMessage()) { + vertexId = GraphJobRunner.createVertexIDObject(); + vertexId.readFields(in); + /* + * vertexValue = GraphJobRunner.createVertexValue(); + * vertexValue.readFields(in); + */ + } else if (isMapMessage()) { + map = new MapWritable(); + map.readFields(in); + } else if (isVerticesSizeMessage()) { + verticesSize = new IntWritable(); + verticesSize.readFields(in); + } else { + vertexId = ReflectionUtils.newInstance(GraphJobRunner.VERTEX_ID_CLASS, + null); + vertexId.readFields(in); + } + } + @Override public void readFields(DataInput in) throws IOException { flag = in.readByte(); @@ -162,4 +195,41 @@ } } + public static class GraphJobMessageComparator extends WritableComparator { + private final DataInputBuffer buffer; + private final GraphJobMessage key1; + private final GraphJobMessage key2; + + public GraphJobMessageComparator() { + this(GraphJobMessage.class); + } + + protected GraphJobMessageComparator( + Class> keyClass) { + this(keyClass, false); + } + + protected GraphJobMessageComparator( + Class> keyClass, boolean createInstances) { + super(keyClass, createInstances); + key1 = new GraphJobMessage(); + key2 = new GraphJobMessage(); + buffer = new DataInputBuffer(); + } + + public synchronized int compare(byte[] b1, int s1, int l1, byte[] b2, + int s2, int l2) { + try { + buffer.reset(b1, s1, l1); // parse key1 + key1.fastReadFields(buffer); + + buffer.reset(b2, s2, l2); // parse key2 + key2.fastReadFields(buffer); + } catch (IOException e) { + throw new RuntimeException(e); + } + + return compare(key1, key2); // compare them + } + } }