Index: graph/src/main/java/org/apache/hama/graph/VertexWritableSerialization.java
===================================================================
--- graph/src/main/java/org/apache/hama/graph/VertexWritableSerialization.java (Revision 0)
+++ graph/src/main/java/org/apache/hama/graph/VertexWritableSerialization.java (Arbeitskopie)
@@ -0,0 +1,45 @@
+package org.apache.hama.graph;
+
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.util.ReflectionUtils;
+
+import com.google.common.base.Preconditions;
+
+/**
+ * Writable serialization for Hadoop objects with a class cache to reduce the
+ * amount of writing the classname instead of a single integer (with V
+ * compression, so most of the time it just takes a single byte).
+ * Enhanced by graph instance that can be passed.
+ */
+public final class VertexWritableSerialization<K extends Writable> extends
+    WritableSerialization<K> {
+
+  private static final long serialVersionUID = 1L;
+  // Injected into every deserialized vertex so it can reach job-wide state
+  // (vertexIdClass, vertexValueClass, edgeValueClass). NOTE(review): the field
+  // is not serializable itself; this only works while the serializer instance
+  // stays in memory.
+  @SuppressWarnings("rawtypes")
+  private GraphJobRunner runner;
+
+  public VertexWritableSerialization() {
+  }
+
+  public VertexWritableSerialization(Class<K> writableClazz,
+      @SuppressWarnings("rawtypes") GraphJobRunner runner) {
+    super(writableClazz);
+    // BUGFIX: the assignability check was inverted. We require writableClazz
+    // to be a Vertex subclass, which is Vertex.class.isAssignableFrom(clazz);
+    // the old direction rejected every concrete vertex implementation.
+    Preconditions
+        .checkArgument(
+            Vertex.class.isAssignableFrom(writableClazz),
+            "Class "
+                + writableClazz
+                + " is not a subclass of Vertex! This class only serializes vertices!");
+    this.runner = runner;
+  }
+
+  @Override
+  @SuppressWarnings({ "unchecked", "rawtypes" })
+  public Writable newInstance() {
+    // Instantiate via the shared class lookup table, then wire in the runner
+    // so the vertex can resolve its id/value/edge classes during readFields.
+    Writable newInstance = (Writable) ReflectionUtils.newInstance(
+        LOOKUP_LIST.get(writableClassIndex), null);
+    ((Vertex) newInstance).runner = this.runner;
+    return newInstance;
+  }
+
+}
Index: graph/src/main/java/org/apache/hama/graph/GraphJob.java
===================================================================
--- graph/src/main/java/org/apache/hama/graph/GraphJob.java (Revision 1385087)
+++ graph/src/main/java/org/apache/hama/graph/GraphJob.java (Arbeitskopie)
@@ -22,6 +22,7 @@
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableComparable;
import org.apache.hama.HamaConfiguration;
import org.apache.hama.bsp.BSPJob;
import org.apache.hama.bsp.Combiner;
@@ -72,7 +73,7 @@
/**
* Set the Vertex ID class for the job.
*/
-  public void setVertexIDClass(Class<? extends Writable> cls)
+  public void setVertexIDClass(Class<? extends WritableComparable<?>> cls)
throws IllegalStateException {
conf.setClass(VERTEX_ID_CLASS_ATTR, cls, Writable.class);
}
@@ -129,8 +130,8 @@
}
@Override
-  public void setPartitioner(@SuppressWarnings("rawtypes")
-  Class<? extends Partitioner> theClass) {
+  public void setPartitioner(
+      @SuppressWarnings("rawtypes") Class<? extends Partitioner> theClass) {
super.setPartitioner(theClass);
conf.setBoolean(VERTEX_GRAPH_RUNTIME_PARTIONING, true);
}
Index: graph/src/main/java/org/apache/hama/graph/WritableComparator.java
===================================================================
--- graph/src/main/java/org/apache/hama/graph/WritableComparator.java (Revision 0)
+++ graph/src/main/java/org/apache/hama/graph/WritableComparator.java (Arbeitskopie)
@@ -0,0 +1,21 @@
+package org.apache.hama.graph;
+
+import java.io.Serializable;
+import java.util.Comparator;
+
+import org.apache.hadoop.io.WritableComparable;
+
+/**
+ * Comparator that uses the WritableComparable instance's own compareTo to
+ * compare. Serializable so it can be stored with the JDBM tree map.
+ */
+public final class WritableComparator<T extends WritableComparable<T>>
+    implements Comparator<T>, Serializable {
+
+  private static final long serialVersionUID = 1L;
+
+  @Override
+  public int compare(T o1, T o2) {
+    return o1.compareTo(o2);
+  }
+
+}
Index: graph/src/main/java/org/apache/hama/graph/WritableSerialization.java
===================================================================
--- graph/src/main/java/org/apache/hama/graph/WritableSerialization.java (Revision 0)
+++ graph/src/main/java/org/apache/hama/graph/WritableSerialization.java (Arbeitskopie)
@@ -0,0 +1,93 @@
+package org.apache.hama.graph;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.HashMap;
+
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableUtils;
+import org.apache.hadoop.util.ReflectionUtils;
+import org.apache.jdbm.Serializer;
+
+/**
+ * Writable serialization for Hadoop objects with a class cache to reduce the
+ * amount of writing the classname instead of a single integer (with V
+ * compression, so most of the time it just takes a single byte).
+ * NOTE(review): the static cache is mutated without synchronization - assumes
+ * serializers are constructed from a single thread; verify against callers.
+ */
+public class WritableSerialization<K extends Writable> implements
+    Serializer<K>, Serializable {
+
+  private static final long serialVersionUID = 1L;
+
+  // clazzname as string -> index in the lookuplist
+  protected static final HashMap<String, Integer> CLAZZ_CACHE = new HashMap<String, Integer>();
+  protected static final ArrayList<Class<? extends Writable>> LOOKUP_LIST = new ArrayList<Class<? extends Writable>>();
+  private static int lastAssigned = 0;
+
+  protected transient Writable instance;
+  protected transient int writableClassIndex;
+
+  public WritableSerialization() {
+  }
+
+  public WritableSerialization(Class<? extends Writable> writableClazz) {
+    // BUGFIX: the cache is keyed by the class *name* (see the put below), so
+    // the lookup must use getName() as well. Looking up with the Class object
+    // always missed, re-registering the class and drifting the indices.
+    Integer integer = CLAZZ_CACHE.get(writableClazz.getName());
+    if (integer == null) {
+      integer = lastAssigned++;
+      CLAZZ_CACHE.put(writableClazz.getName(), integer);
+      LOOKUP_LIST.add(writableClazz);
+    }
+    this.writableClassIndex = integer;
+  }
+
+  @Override
+  public void serialize(DataOutput out, K obj) throws IOException {
+    // Write the small class index (VInt-compressed) instead of the classname.
+    WritableUtils.writeVInt(out, writableClassIndex);
+    obj.write(out);
+  }
+
+  @SuppressWarnings("unchecked")
+  @Override
+  public K deserialize(DataInput in) throws IOException,
+      ClassNotFoundException {
+    writableClassIndex = WritableUtils.readVInt(in);
+    instance = newInstance();
+    instance.readFields(in);
+    return (K) instance;
+  }
+
+  /**
+   * Creates a fresh instance of the class registered at the current index.
+   * Subclasses may override to post-process the new instance.
+   */
+  public Writable newInstance() {
+    return (Writable) ReflectionUtils.newInstance(
+        LOOKUP_LIST.get(writableClassIndex), null);
+  }
+
+  @Override
+  public int hashCode() {
+    final int prime = 31;
+    int result = 1;
+    result = prime * result + ((instance == null) ? 0 : instance.hashCode());
+    return result;
+  }
+
+  @Override
+  public boolean equals(Object obj) {
+    if (this == obj)
+      return true;
+    if (obj == null)
+      return false;
+    if (getClass() != obj.getClass())
+      return false;
+    @SuppressWarnings("rawtypes")
+    WritableSerialization other = (WritableSerialization) obj;
+    if (instance == null) {
+      if (other.instance != null)
+        return false;
+    } else if (!instance.equals(other.instance))
+      return false;
+    return true;
+  }
+
+}
Index: graph/src/main/java/org/apache/hama/graph/Vertex.java
===================================================================
--- graph/src/main/java/org/apache/hama/graph/Vertex.java (Revision 1385087)
+++ graph/src/main/java/org/apache/hama/graph/Vertex.java (Arbeitskopie)
@@ -17,6 +17,8 @@
*/
package org.apache.hama.graph;
+import java.io.DataInput;
+import java.io.DataOutput;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
@@ -24,11 +26,13 @@
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableUtils;
+import org.apache.hadoop.util.ReflectionUtils;
import org.apache.hama.bsp.BSPPeer;
import org.apache.hama.bsp.Partitioner;
 public abstract class Vertex<V extends Writable, E extends Writable, M extends Writable>
-    implements VertexInterface<V, E, M> {
+    implements VertexInterface<V, E, M>, Writable {

   GraphJobRunner<?, ?, ?> runner;
@@ -218,7 +222,64 @@
return true;
}
+  @SuppressWarnings("unchecked")
  @Override
+  public void readFields(DataInput in) throws IOException {
+    votedToHalt = in.readBoolean();
+    // Id/value/edge classes come from the runner injected by
+    // VertexWritableSerialization before readFields is called.
+    vertexID = (V) ReflectionUtils.newInstance(runner.vertexIdClass, null);
+    vertexID.readFields(in);
+    if (in.readBoolean()) {
+      value = (M) ReflectionUtils.newInstance(runner.vertexValueClass, null);
+      // BUGFIX: actually read the serialized value back. Without this the
+      // value stayed empty and the remaining stream was misaligned.
+      value.readFields(in);
+    }
+
+    int edges = WritableUtils.readVInt(in);
+    ArrayList<Edge<V, E>> list = new ArrayList<Edge<V, E>>(edges);
+    for (int i = 0; i < edges; i++) {
+      V adjacentId = (V) ReflectionUtils
+          .newInstance(runner.vertexIdClass, null);
+      adjacentId.readFields(in);
+      E edgeValue = null;
+      if (in.readBoolean()) {
+        edgeValue = (E) ReflectionUtils
+            .newInstance(runner.edgeValueClass, null);
+        edgeValue.readFields(in);
+      }
+      list.add(new Edge<V, E>(adjacentId, edgeValue));
+    }
+
+    this.setEdges(list);
+
+  }
+
+  @Override
+  public void write(DataOutput out) throws IOException {
+    out.writeBoolean(votedToHalt);
+    V vId = getVertexID();
+    vId.write(out);
+    M val = getValue();
+    serializeNull(out, val);
+
+    List<Edge<V, E>> edges = getEdges();
+    int length = edges == null ? 0 : edges.size();
+    WritableUtils.writeVInt(out, length);
+    // BUGFIX: the null case was handled when computing the length, but the
+    // loop still iterated the null list and threw a NullPointerException.
+    if (edges != null) {
+      for (Edge<V, E> edge : edges) {
+        edge.getDestinationVertexID().write(out);
+        serializeNull(out, edge.getValue());
+      }
+    }
+  }
+
+  /**
+   * Writes a presence flag followed by the value itself (when non-null), so
+   * readFields can distinguish a null value from an empty instance.
+   */
+  private static void serializeNull(DataOutput out, Writable writable)
+      throws IOException {
+    if (writable == null) {
+      out.writeBoolean(false);
+    } else {
+      out.writeBoolean(true);
+      writable.write(out);
+    }
+  }
+
+ @Override
public String toString() {
return getVertexID() + (getValue() != null ? " = " + getValue() : "")
+ " // " + edges;
Index: graph/src/main/java/org/apache/hama/graph/GraphJobRunner.java
===================================================================
--- graph/src/main/java/org/apache/hama/graph/GraphJobRunner.java (Revision 1385087)
+++ graph/src/main/java/org/apache/hama/graph/GraphJobRunner.java (Arbeitskopie)
@@ -34,6 +34,7 @@
import org.apache.hadoop.io.MapWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.util.ReflectionUtils;
import org.apache.hama.bsp.BSP;
import org.apache.hama.bsp.BSPPeer;
@@ -42,6 +43,8 @@
import org.apache.hama.bsp.Partitioner;
import org.apache.hama.bsp.sync.SyncException;
import org.apache.hama.util.KeyValuePair;
+import org.apache.jdbm.DB;
+import org.apache.jdbm.DBMaker;
/**
* Fully generic graph job runner.
@@ -50,7 +53,7 @@
* @param the value type of an edge.
* @param the value type of a vertex.
*/
-public final class GraphJobRunner<V extends Writable, E extends Writable, M extends Writable>
+public final class GraphJobRunner<V extends WritableComparable<V>, E extends Writable, M extends Writable>
     extends BSP<Writable, Writable, Writable, Writable, GraphJobMessage> {
private static final Log LOG = LogFactory.getLog(GraphJobRunner.class);
@@ -68,7 +71,7 @@
   private Combiner<M> combiner;
   private Partitioner<V, M> partitioner;

-  private Map<V, Vertex<V, E, M>> vertices = new HashMap<V, Vertex<V, E, M>>();
+  private Map<V, Vertex<V, E, M>> vertices;
private boolean updated = true;
private int globalUpdateCounts = 0;
@@ -78,15 +81,17 @@
private int maxIteration = -1;
private long iteration;
-  private Class<V> vertexIdClass;
-  private Class<M> vertexValueClass;
-  private Class<E> edgeValueClass;
-  private Class<Vertex<V, E, M>> vertexClass;
+  Class<V> vertexIdClass;
+  Class<M> vertexValueClass;
+  Class<E> edgeValueClass;
+  Class<Vertex<V, E, M>> vertexClass;

   private AggregationRunner<V, M> aggregationRunner;
   private BSPPeer<Writable, Writable, Writable, Writable, GraphJobMessage> peer;
+ private DB db;
+
@Override
public final void setup(
BSPPeer peer)
@@ -138,6 +143,8 @@
     for (Entry<V, Vertex<V, E, M>> e : vertices.entrySet()) {
peer.write(e.getValue().getVertexID(), e.getValue().getValue());
}
+
+ db.close();
}
/**
@@ -171,7 +178,9 @@
       BSPPeer<Writable, Writable, Writable, Writable, GraphJobMessage> peer)
       throws IOException {
     int activeVertices = 0;
-    for (Vertex<V, E, M> vertex : vertices.values()) {
+    Set<V> keySet = vertices.keySet();
+    for (V key : keySet) {
+      Vertex<V, E, M> vertex = vertices.get(key);
       List<M> msgs = messages.get(vertex.getVertexID());
       // If there are newly received messages, restart.
       if (vertex.isHalted() && msgs != null) {
@@ -207,7 +216,9 @@
   private void doInitialSuperstep(
       BSPPeer<Writable, Writable, Writable, Writable, GraphJobMessage> peer)
       throws IOException {
-    for (Vertex<V, E, M> vertex : vertices.values()) {
+    Set<V> keySet = vertices.keySet();
+    for (V key : keySet) {
+      Vertex<V, E, M> vertex = vertices.get(key);
       List<M> singletonList = Collections.singletonList(vertex.getValue());
       M lastValue = vertex.getValue();
       vertex.compute(singletonList.iterator());
@@ -254,6 +265,20 @@
conf);
}
+ String storagePath = conf.get("hama.graph.storage.path");
+ if (storagePath == null) {
+ storagePath = "/tmp/graph_storage/";
+ }
+
+ db = DBMaker
+ .openFile(storagePath + peer.getTaskId().toString() + "/graph.db")
+ .disableLocking().disableTransactions().deleteFilesAfterClose()
+ .useRandomAccessFile().make();
+
+    vertices = db.createTreeMap("graph-db", new WritableComparator<V>(),
+        new WritableSerialization<V>(vertexIdClass),
+        new VertexWritableSerialization<Vertex<V, E, M>>(vertexClass, this));
+
aggregationRunner = new AggregationRunner();
aggregationRunner.setupAggregators(peer);
}
Index: graph/src/main/java/org/apache/jdbm/Utils.java
===================================================================
--- graph/src/main/java/org/apache/jdbm/Utils.java (Revision 0)
+++ graph/src/main/java/org/apache/jdbm/Utils.java (Arbeitskopie)
@@ -0,0 +1,106 @@
+package org.apache.jdbm;
+
+import javax.crypto.Cipher;
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOError;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.Comparator;
+
+/**
+ * Various utilities used in JDBM
+ */
+class Utils {
+
+ /**
+ * empty string is used as dummy value to represent null values in HashSet and TreeSet
+ */
+ static final String EMPTY_STRING = "";
+
+
+
+ public static byte[] encrypt(Cipher cipherIn, ByteBuffer b) {
+ if(cipherIn==null && b.hasArray())
+ return b.array();
+ byte[] bb = new byte[Storage.PAGE_SIZE];
+ b.rewind();
+ b.get(bb,0,Storage.PAGE_SIZE);
+ return encrypt(cipherIn,bb);
+ }
+
+ public static byte[] encrypt(Cipher cipherIn, byte[] b) {
+ if (cipherIn == null)
+ return b;
+
+ try {
+ return cipherIn.doFinal(b);
+ } catch (Exception e) {
+ throw new IOError(e);
+ }
+
+ }
+
+
+ /**
+ * Compares comparables. Default comparator for most of java types
+ */
+    static final Comparator<Comparable> COMPARABLE_COMPARATOR = new Comparator<Comparable>() {
+ public int compare(Comparable o1, Comparable o2) {
+ return o1 == null && o2 != null ? -1 : (o1 != null && o2 == null ? 1 : o1.compareTo(o2));
+ }
+ };
+
+
+ static String formatSpaceUsage(long size) {
+ if (size < 1e4)
+ return size + "B";
+ else if (size < 1e7)
+ return "" + Math.round(1D * size / 1024D) + "KB";
+ else if (size < 1e10)
+ return "" + Math.round(1D * size / 1e6) + "MB";
+ else
+ return "" + Math.round(1D * size / 1e9) + "GB";
+ }
+
+
+ static boolean allZeros(byte[] b) {
+ for (int i = 0; i < b.length; i++) {
+ if (b[i] != 0) return false;
+ }
+ return true;
+ }
+
+
+    static <E> E max(E e1, E e2, Comparator comp){
+ if(e1 == null) return e2;
+ if(e2 == null) return e1;
+
+ if(comp == null)
+ comp = COMPARABLE_COMPARATOR;
+ return comp.compare(e1,e2)<0 ? e2:e1;
+ }
+
+    static <E> E min(E e1, E e2, Comparator comp){
+ if(e1 == null) return e2;
+ if(e2 == null) return e1;
+
+ if(comp == null)
+ comp = COMPARABLE_COMPARATOR;
+
+ return comp.compare(e1,e2)>0 ? e2:e1;
+ }
+
+
+ static final Serializer