Description
Currently, in PageRankBenchmark, the PageRank code is duplicated in each of the Vertex implementations:
public static class PageRankHashMapVertex extends HashMapVertex< LongWritable, DoubleWritable, DoubleWritable, DoubleWritable> { @Override public void compute(Iterator<DoubleWritable> msgIterator) { if (getSuperstep() >= 1) { double sum = 0; while (msgIterator.hasNext()) { sum += msgIterator.next().get(); } DoubleWritable vertexValue = new DoubleWritable((0.15f / getNumVertices()) + 0.85f * sum); setVertexValue(vertexValue); } if (getSuperstep() < getConf().getInt(SUPERSTEP_COUNT, -1)) { long edges = getNumOutEdges(); sendMsgToAllEdges( new DoubleWritable(getVertexValue().get() / edges)); } else { voteToHalt(); } } } public static class PageRankEdgeListVertex extends EdgeListVertex< LongWritable, DoubleWritable, DoubleWritable, DoubleWritable> { @Override public void compute(Iterator<DoubleWritable> msgIterator) { if (getSuperstep() >= 1) { double sum = 0; while (msgIterator.hasNext()) { sum += msgIterator.next().get(); } DoubleWritable vertexValue = new DoubleWritable((0.15f / getNumVertices()) + 0.85f * sum); setVertexValue(vertexValue); } if (getSuperstep() < getConf().getInt(SUPERSTEP_COUNT, -1)) { long edges = getNumOutEdges(); sendMsgToAllEdges( new DoubleWritable(getVertexValue().get() / edges)); } else { voteToHalt(); } } }
This code can be consolidated into a private class, and the two implementations can then simply extend it.