Index: graph/src/test/java/org/apache/hama/graph/TestOffHeapVerticesInfo.java =================================================================== --- graph/src/test/java/org/apache/hama/graph/TestOffHeapVerticesInfo.java (revision 0) +++ graph/src/test/java/org/apache/hama/graph/TestOffHeapVerticesInfo.java (revision 0) @@ -0,0 +1,131 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hama.graph; + +import java.util.ArrayList; +import java.util.List; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.io.DoubleWritable; +import org.apache.hadoop.io.NullWritable; +import org.apache.hadoop.io.Text; +import org.apache.hama.bsp.TaskAttemptID; +import org.apache.hama.graph.example.PageRank.PageRankVertex; +import org.junit.Test; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNull; +import static org.junit.Assert.assertTrue; + +public class TestOffHeapVerticesInfo { + + @Test + public void testOffHeapVerticesInfoLifeCycle() throws Exception { + OffHeapVerticesInfo info = new OffHeapVerticesInfo(); + Configuration conf = new Configuration(); + conf.set(GraphJob.VERTEX_CLASS_ATTR, PageRankVertex.class.getName()); + conf.set(GraphJob.VERTEX_EDGE_VALUE_CLASS_ATTR, + NullWritable.class.getName()); + conf.set(GraphJob.VERTEX_ID_CLASS_ATTR, Text.class.getName()); + conf.set(GraphJob.VERTEX_VALUE_CLASS_ATTR, DoubleWritable.class.getName()); + GraphJobRunner. initClasses(conf); + TaskAttemptID attempt = new TaskAttemptID("omg", 1, 1, 0); + try { + ArrayList list = new ArrayList(); + + for (int i = 0; i < 10; i++) { + PageRankVertex v = new PageRankVertex(); + v.setVertexID(new Text(i + "")); + if (i % 2 == 0) { + v.setValue(new DoubleWritable(i * 2)); + } + v.addEdge(new Edge(new Text((10 - i) + ""), null)); + + list.add(v); + } + + info.init(null, conf, attempt); + for (PageRankVertex v : list) { + info.addVertex(v); + } + + info.finishAdditions(); + + assertEquals(10, info.size()); + // no we want to iterate and check if the result can properly be obtained + + int index = 0; + IDSkippingIterator iterator = info + .skippingIterator(); + while (iterator.hasNext()) { + Vertex next = iterator.next(); + PageRankVertex pageRankVertex = list.get(index); + assertEquals(pageRankVertex.getVertexID().toString(), next + .getVertexID().toString()); + if (index % 2 == 0) { + assertEquals((int) next.getValue().get(), index * 2); + } else { + assertNull(next.getValue()); + } + assertEquals(next.isHalted(), false); + // check edges + List> edges = next.getEdges(); + assertEquals(1, edges.size()); + Edge edge = edges.get(0); + assertEquals(pageRankVertex.getEdges().get(0).getDestinationVertexID() + .toString(), edge.getDestinationVertexID().toString()); + assertNull(edge.getValue()); + + index++; + } + assertEquals(index, list.size()); + info.finishSuperstep(); + // iterate again and compute so vertices change internally + iterator = info.skippingIterator(); + info.startSuperstep(); + while (iterator.hasNext()) { + Vertex next = iterator.next(); + // override everything with constant 2 + next.setValue(new DoubleWritable(2)); + if (Integer.parseInt(next.getVertexID().toString()) == 3) { + next.voteToHalt(); + } + info.finishVertexComputation(next); + } + info.finishSuperstep(); + assertEquals(index, list.size()); + + } finally { + info.cleanup(conf, attempt); + } + + } + + @Test + public void testAdditionWithDefaults() throws Exception { + OffHeapVerticesInfo verticesInfo = + new OffHeapVerticesInfo(); + Configuration conf = new Configuration(); + verticesInfo.init(null, conf, null); + Vertex vertex = new PageRankVertex(); + vertex.setVertexID(new Text("some-id")); + verticesInfo.addVertex(vertex); + assertTrue("added vertex could not be found in the cache", verticesInfo.skippingIterator().hasNext()); + } + +} Property changes on: graph/src/test/java/org/apache/hama/graph/TestOffHeapVerticesInfo.java ___________________________________________________________________ Added: svn:eol-style + native Index: graph/src/main/java/org/apache/hama/graph/GraphJobRunner.java =================================================================== --- graph/src/main/java/org/apache/hama/graph/GraphJobRunner.java (revision 1501673) +++ graph/src/main/java/org/apache/hama/graph/GraphJobRunner.java (working copy) @@ -357,7 +357,8 @@ aggregationRunner.setupAggregators(peer); // FIXME We should make this configurable. - vertices = new ListVerticesInfo(); + Class> verticesInfoClass = (Class>) conf.getClass("vertices.info", ListVerticesInfo.class, VerticesInfo.class); + vertices = ReflectionUtils.newInstance(verticesInfoClass); vertices.init(this, conf, peer.getTaskId()); } Index: graph/src/main/java/org/apache/hama/graph/OffHeapVerticesInfo.java =================================================================== --- graph/src/main/java/org/apache/hama/graph/OffHeapVerticesInfo.java (revision 0) +++ graph/src/main/java/org/apache/hama/graph/OffHeapVerticesInfo.java (revision 0) @@ -0,0 +1,140 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hama.graph; + +import java.io.IOException; +import java.util.Iterator; +import java.util.concurrent.ConcurrentSkipListMap; + +import org.apache.directmemory.DirectMemory; +import org.apache.directmemory.cache.CacheService; +import org.apache.directmemory.memory.Pointer; +import org.apache.directmemory.serialization.kryo.KryoSerializer; +import org.apache.directmemory.utils.CacheValuesIterable; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.io.Writable; +import org.apache.hadoop.io.WritableComparable; +import org.apache.hama.bsp.TaskAttemptID; + +/** + * An off heap version of a {@link org.apache.hama.graph.Vertex} storage. + */ +public class OffHeapVerticesInfo + implements VerticesInfo { + + public static final String DM_STRICT_ITERATOR = "dm.iterator.strict"; + public static final String DM_BUFFERS = "dm.buffers"; + public static final String DM_SIZE = "dm.size"; + public static final String DM_CAPACITY = "dm.capacity"; + public static final String DM_CONCURRENCY = "dm.concurrency"; + public static final String DM_DISPOSAL_TIME = "dm.disposal.time"; + public static final String DM_SERIALIZER = "dm.serializer"; + + private CacheService> vertices; + + private boolean strict; + + @Override + public void init(GraphJobRunner runner, Configuration conf, TaskAttemptID attempt) throws IOException { + this.strict = conf.getBoolean(DM_STRICT_ITERATOR, true); + this.vertices = new DirectMemory>() + .setNumberOfBuffers(conf.getInt(DM_BUFFERS, 10)) + .setSize(conf.getInt(DM_SIZE, 102400)) + .setInitialCapacity(conf.getInt(DM_CAPACITY, 1000)) + .setConcurrencyLevel(conf.getInt(DM_CONCURRENCY, 10)) + .setDisposalTime(conf.getInt(DM_DISPOSAL_TIME, 360000)) + .setSerializer(new KryoSerializer()) + .setMap(new ConcurrentSkipListMap>>()) + .newCacheService(); + } + + @Override + public void cleanup(Configuration conf, TaskAttemptID attempt) throws IOException { + } + + public void addVertex(Vertex vertex) { + vertices.put(vertex.getVertexID(), vertex); + } + + @Override + public void finishAdditions() { + + } + + @Override + public void startSuperstep() throws IOException { + } + + @Override + public void finishSuperstep() throws IOException { + } + + @Override + public void finishVertexComputation(Vertex vertex) throws IOException { + } + + @Override + public boolean isFinishedAdditions() { + return false; + } + + public void clear() { + vertices.clear(); + } + + public int size() { + return (int) this.vertices.entries(); + } + + @Override + public IDSkippingIterator skippingIterator() { + final Iterator> vertexIterator = + new CacheValuesIterable>(vertices, strict).iterator(); + + return new IDSkippingIterator() { + int currentIndex = 0; + + Vertex currentVertex = null; + + @Override + public boolean hasNext(V e, + org.apache.hama.graph.IDSkippingIterator.Strategy strat) { + if (currentIndex < vertices.entries()) { + + Vertex next = vertexIterator.next(); + while (!strat.accept(next, e)) { + currentIndex++; + } + currentVertex = next; + return true; + } else { + return false; + } + } + + @Override + public Vertex next() { + currentIndex++; + return currentVertex; + } + + }; + + } + +} Property changes on: graph/src/main/java/org/apache/hama/graph/OffHeapVerticesInfo.java ___________________________________________________________________ Added: svn:eol-style + native Index: graph/pom.xml =================================================================== --- graph/pom.xml (revision 1501673) +++ graph/pom.xml (working copy) @@ -37,6 +37,16 @@ ${project.version} + org.apache.directmemory + directmemory-cache + 0.2-SNAPSHOT + + + org.apache.directmemory + directmemory-kryo + 0.2-SNAPSHOT + + org.apache.hama hama-core ${project.version}