From 1bf703c10d648a512906ce78e7efda0fa57c4fc1 Mon Sep 17 00:00:00 2001 From: Tommaso Teofili Date: Tue, 16 Jul 2013 15:29:09 +0200 Subject: [PATCH 01/11] HAMA-732 - first version of the integration with Apache DM --- graph/pom.xml | 10 ++++++++++ .../test/java/org/apache/hama/graph/TestSubmitGraphJob.java | 1 + 2 files changed, 11 insertions(+) diff --git a/graph/pom.xml b/graph/pom.xml index 53bc7cd..b8a7545 100644 --- a/graph/pom.xml +++ b/graph/pom.xml @@ -37,6 +37,16 @@ ${project.version} + org.apache.directmemory + directmemory-cache + 0.2-SNAPSHOT + + + org.apache.directmemory + directmemory-kryo + 0.2-SNAPSHOT + + org.apache.hama hama-core ${project.version} diff --git a/graph/src/test/java/org/apache/hama/graph/TestSubmitGraphJob.java b/graph/src/test/java/org/apache/hama/graph/TestSubmitGraphJob.java index 2d73985..2a554fc 100644 --- a/graph/src/test/java/org/apache/hama/graph/TestSubmitGraphJob.java +++ b/graph/src/test/java/org/apache/hama/graph/TestSubmitGraphJob.java @@ -60,6 +60,7 @@ public void testSubmitJob() throws Exception { BSPJobClient jobClient = new BSPJobClient(configuration); configuration.setInt(Constants.ZOOKEEPER_SESSION_TIMEOUT, 6000); configuration.set("hama.graph.self.ref", "true"); + configuration.setClass("hama.graph.vertices.info", OffHeapVerticesInfo.class, VerticesInfo.class); ClusterStatus cluster = jobClient.getClusterStatus(false); assertEquals(this.numOfGroom, cluster.getGroomServers()); LOG.info("Client finishes execution job."); -- 1.8.4 From 41f229bcaecc63b35e9048e63d187c477737e22e Mon Sep 17 00:00:00 2001 From: Tommaso Teofili Date: Tue, 16 Jul 2013 15:32:51 +0200 Subject: [PATCH 02/11] HAMA-732 - first version of the integration with Apache DM --- .../org/apache/hama/graph/OffHeapVerticesInfo.java | 148 ++++++++++++++++++++ .../apache/hama/graph/TestOffHeapVerticesInfo.java | 151 +++++++++++++++++++++ 2 files changed, 299 insertions(+) create mode 100644 graph/src/main/java/org/apache/hama/graph/OffHeapVerticesInfo.java create mode 100644 graph/src/test/java/org/apache/hama/graph/TestOffHeapVerticesInfo.java diff --git a/graph/src/main/java/org/apache/hama/graph/OffHeapVerticesInfo.java b/graph/src/main/java/org/apache/hama/graph/OffHeapVerticesInfo.java new file mode 100644 index 0000000..d87f47b --- /dev/null +++ b/graph/src/main/java/org/apache/hama/graph/OffHeapVerticesInfo.java @@ -0,0 +1,148 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hama.graph; + +import java.io.IOException; +import java.util.Iterator; +import java.util.concurrent.ConcurrentSkipListMap; + +import org.apache.directmemory.DirectMemory; +import org.apache.directmemory.cache.CacheService; +import org.apache.directmemory.memory.Pointer; +import org.apache.directmemory.serialization.Serializer; +import org.apache.directmemory.serialization.kryo.KryoSerializer; +import org.apache.directmemory.utils.CacheValuesIterable; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.io.Writable; +import org.apache.hadoop.io.WritableComparable; +import org.apache.hama.bsp.TaskAttemptID; +import org.apache.hama.util.ReflectionUtils; + +/** + * An off heap version of a {@link org.apache.hama.graph.Vertex} storage. + */ +public class OffHeapVerticesInfo + implements VerticesInfo { + + public static final String DM_STRICT_ITERATOR = "dm.iterator.strict"; + public static final String DM_BUFFERS = "dm.buffers"; + public static final String DM_SIZE = "dm.size"; + public static final String DM_CAPACITY = "dm.capacity"; + public static final String DM_CONCURRENCY = "dm.concurrency"; + public static final String DM_DISPOSAL_TIME = "dm.disposal.time"; + public static final String DM_SERIALIZER = "dm.serializer"; + public static final String DM_SORTED = "dm.sorted"; + + private CacheService> vertices; + + private boolean strict; + + @Override + public void init(GraphJobRunner runner, Configuration conf, TaskAttemptID attempt) throws IOException { + this.strict = conf.getBoolean(DM_STRICT_ITERATOR, true); + DirectMemory> dm = new DirectMemory>() + .setNumberOfBuffers(conf.getInt(DM_BUFFERS, 100)) + .setSize(conf.getInt(DM_SIZE, 102400)) + .setSerializer(ReflectionUtils.newInstance(conf.getClass(DM_SERIALIZER, KryoSerializer.class, Serializer.class))) + .setDisposalTime(conf.getInt(DM_DISPOSAL_TIME, 3600000)); + if (conf.getBoolean(DM_SORTED, true)) { + dm.setMap(new ConcurrentSkipListMap>>()); + } else { + dm.setInitialCapacity(conf.getInt(DM_CAPACITY, 1000)) + .setConcurrencyLevel(conf.getInt(DM_CONCURRENCY, 10)); + } + + this.vertices = dm.newCacheService(); + } + + @Override + public void cleanup(Configuration conf, TaskAttemptID attempt) throws IOException { + vertices.dump(); + } + + public void addVertex(Vertex vertex) { + vertices.put(vertex.getVertexID(), vertex); + } + + @Override + public void finishAdditions() { + vertices.collectAll(); + } + + @Override + public void startSuperstep() throws IOException { + } + + @Override + public void finishSuperstep() throws IOException { + } + + @Override + public void finishVertexComputation(Vertex vertex) throws IOException { + } + + @Override + public boolean isFinishedAdditions() { + return false; + } + + public void clear() { + vertices.clear(); + } + + public int size() { + return (int) this.vertices.entries(); + } + + @Override + public IDSkippingIterator skippingIterator() { + final Iterator> vertexIterator = + new CacheValuesIterable>(vertices, strict).iterator(); + + return new IDSkippingIterator() { + int currentIndex = 0; + + Vertex currentVertex = null; + + @Override + public boolean hasNext(V e, + org.apache.hama.graph.IDSkippingIterator.Strategy strat) { + if (currentIndex < vertices.entries()) { + + Vertex next = vertexIterator.next(); + while (!strat.accept(next, e)) { + currentIndex++; + } + currentVertex = next; + return true; + } else { + return false; + } + } + + @Override + public Vertex next() { + currentIndex++; + return currentVertex; + } + + }; + + } + +} diff --git a/graph/src/test/java/org/apache/hama/graph/TestOffHeapVerticesInfo.java b/graph/src/test/java/org/apache/hama/graph/TestOffHeapVerticesInfo.java new file mode 100644 index 0000000..d4366be --- /dev/null +++ b/graph/src/test/java/org/apache/hama/graph/TestOffHeapVerticesInfo.java @@ -0,0 +1,151 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hama.graph; + +import java.util.ArrayList; +import java.util.List; +import java.util.Random; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.io.DoubleWritable; +import org.apache.hadoop.io.NullWritable; +import org.apache.hadoop.io.Text; +import org.apache.hama.bsp.TaskAttemptID; +import org.apache.hama.graph.example.PageRank.PageRankVertex; +import org.junit.Test; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNull; +import static org.junit.Assert.assertTrue; + +public class TestOffHeapVerticesInfo { + + @Test + public void testOffHeapVerticesInfoLifeCycle() throws Exception { + OffHeapVerticesInfo info = new OffHeapVerticesInfo(); + Configuration conf = new Configuration(); + conf.set(GraphJob.VERTEX_CLASS_ATTR, PageRankVertex.class.getName()); + conf.set(GraphJob.VERTEX_EDGE_VALUE_CLASS_ATTR, + NullWritable.class.getName()); + conf.set(GraphJob.VERTEX_ID_CLASS_ATTR, Text.class.getName()); + conf.set(GraphJob.VERTEX_VALUE_CLASS_ATTR, DoubleWritable.class.getName()); + GraphJobRunner. initClasses(conf); + TaskAttemptID attempt = new TaskAttemptID("omg", 1, 1, 0); + try { + ArrayList list = new ArrayList(); + + for (int i = 0; i < 10; i++) { + PageRankVertex v = new PageRankVertex(); + v.setVertexID(new Text(i + "")); + if (i % 2 == 0) { + v.setValue(new DoubleWritable(i * 2)); + } + v.addEdge(new Edge(new Text((10 - i) + ""), null)); + + list.add(v); + } + + info.init(null, conf, attempt); + for (PageRankVertex v : list) { + info.addVertex(v); + } + + info.finishAdditions(); + + assertEquals(10, info.size()); + // no we want to iterate and check if the result can properly be obtained + + int index = 0; + IDSkippingIterator iterator = info + .skippingIterator(); + while (iterator.hasNext()) { + Vertex next = iterator.next(); + PageRankVertex pageRankVertex = list.get(index); + assertEquals(pageRankVertex.getVertexID().toString(), next + .getVertexID().toString()); + if (index % 2 == 0) { + assertEquals((int) next.getValue().get(), index * 2); + } else { + assertNull(next.getValue()); + } + assertEquals(next.isHalted(), false); + // check edges + List> edges = next.getEdges(); + assertEquals(1, edges.size()); + Edge edge = edges.get(0); + assertEquals(pageRankVertex.getEdges().get(0).getDestinationVertexID() + .toString(), edge.getDestinationVertexID().toString()); + assertNull(edge.getValue()); + + index++; + } + assertEquals(index, list.size()); + info.finishSuperstep(); + // iterate again and compute so vertices change internally + iterator = info.skippingIterator(); + info.startSuperstep(); + while (iterator.hasNext()) { + Vertex next = iterator.next(); + // override everything with constant 2 + next.setValue(new DoubleWritable(2)); + if (Integer.parseInt(next.getVertexID().toString()) == 3) { + next.voteToHalt(); + } + info.finishVertexComputation(next); + } + info.finishSuperstep(); + assertEquals(index, list.size()); + + } finally { + info.cleanup(conf, attempt); + } + + } + + @Test + public void testAdditionWithDefaults() throws Exception { + OffHeapVerticesInfo verticesInfo = + new OffHeapVerticesInfo(); + Configuration conf = new Configuration(); + verticesInfo.init(null, conf, null); + Vertex vertex = new PageRankVertex(); + vertex.setVertexID(new Text("some-id")); + verticesInfo.addVertex(vertex); + assertTrue("added vertex could not be found in the cache", verticesInfo.skippingIterator().hasNext()); + } + + @Test + public void testMassiveAdditionWithDefaults() throws Exception { + OffHeapVerticesInfo verticesInfo = + new OffHeapVerticesInfo(); + Configuration conf = new Configuration(); + verticesInfo.init(null, conf, null); + assertEquals("vertices info size should be 0 at startup", 0, verticesInfo.size()); + Random r = new Random(); + int i = 10000; + for (int n = 0; n < i; n++) { + Vertex vertex = new PageRankVertex(); + vertex.setVertexID(new Text(String.valueOf(r.nextFloat()))); + vertex.setValue(new DoubleWritable(r.nextDouble())); + verticesInfo.addVertex(vertex); + } + verticesInfo.finishAdditions(); + assertEquals("vertices info size is not correct", i, verticesInfo.size()); + } + +} -- 1.8.4 From dc1e9c9179c566a9963c0e6c8ed7b4fcb16d29ce Mon Sep 17 00:00:00 2001 From: Tommaso Teofili Date: Wed, 17 Jul 2013 09:09:23 +0200 Subject: [PATCH 03/11] HAMA-732 - fixed test for multiple additions --- .../src/test/java/org/apache/hama/graph/TestOffHeapVerticesInfo.java | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/graph/src/test/java/org/apache/hama/graph/TestOffHeapVerticesInfo.java b/graph/src/test/java/org/apache/hama/graph/TestOffHeapVerticesInfo.java index d4366be..1a0660c 100644 --- a/graph/src/test/java/org/apache/hama/graph/TestOffHeapVerticesInfo.java +++ b/graph/src/test/java/org/apache/hama/graph/TestOffHeapVerticesInfo.java @@ -45,7 +45,7 @@ public void testOffHeapVerticesInfoLifeCycle() throws Exception { conf.set(GraphJob.VERTEX_ID_CLASS_ATTR, Text.class.getName()); conf.set(GraphJob.VERTEX_VALUE_CLASS_ATTR, DoubleWritable.class.getName()); GraphJobRunner. initClasses(conf); - TaskAttemptID attempt = new TaskAttemptID("omg", 1, 1, 0); + TaskAttemptID attempt = new TaskAttemptID("123", 1, 1, 0); try { ArrayList list = new ArrayList(); @@ -140,7 +140,7 @@ public void testMassiveAdditionWithDefaults() throws Exception { int i = 10000; for (int n = 0; n < i; n++) { Vertex vertex = new PageRankVertex(); - vertex.setVertexID(new Text(String.valueOf(r.nextFloat()))); + vertex.setVertexID(new Text(String.valueOf(r.nextInt()))); vertex.setValue(new DoubleWritable(r.nextDouble())); verticesInfo.addVertex(vertex); } -- 1.8.4 From 45b44a6f03bf64a4f864791791c9b9c19ae0295a Mon Sep 17 00:00:00 2001 From: Tommaso Teofili Date: Wed, 17 Jul 2013 09:15:12 +0200 Subject: [PATCH 04/11] HAMA-732 - collectAll shouldn't be call upon finishAdditions --- graph/src/main/java/org/apache/hama/graph/OffHeapVerticesInfo.java | 1 - 1 file changed, 1 deletion(-) diff --git a/graph/src/main/java/org/apache/hama/graph/OffHeapVerticesInfo.java b/graph/src/main/java/org/apache/hama/graph/OffHeapVerticesInfo.java index d87f47b..8d413b4 100644 --- a/graph/src/main/java/org/apache/hama/graph/OffHeapVerticesInfo.java +++ b/graph/src/main/java/org/apache/hama/graph/OffHeapVerticesInfo.java @@ -81,7 +81,6 @@ public void addVertex(Vertex vertex) { @Override public void finishAdditions() { - vertices.collectAll(); } @Override -- 1.8.4 From 810f6a3637d6e9f16b9f22d1ca3222f95557732e Mon Sep 17 00:00:00 2001 From: Tommaso Teofili Date: Wed, 17 Jul 2013 09:25:29 +0200 Subject: [PATCH 05/11] HAMA-732 - removed unused API from VerticesInfo, added randomized VI initialization in TestSubmitGraphJob --- graph/pom.xml | 3 +++ .../org/apache/hama/graph/DiskVerticesInfo.java | 9 ++------- .../org/apache/hama/graph/ListVerticesInfo.java | 7 +------ .../org/apache/hama/graph/OffHeapVerticesInfo.java | 8 +++----- .../java/org/apache/hama/graph/VerticesInfo.java | 5 ----- .../org/apache/hama/graph/TestSubmitGraphJob.java | 22 +++++++++++++++++++++- 6 files changed, 30 insertions(+), 24 deletions(-) diff --git a/graph/pom.xml b/graph/pom.xml index b8a7545..263f7c4 100644 --- a/graph/pom.xml +++ b/graph/pom.xml @@ -60,6 +60,9 @@ maven-surefire-plugin + + + diff --git a/graph/src/main/java/org/apache/hama/graph/DiskVerticesInfo.java b/graph/src/main/java/org/apache/hama/graph/DiskVerticesInfo.java index b2cd16c..d28bbe3 100644 --- a/graph/src/main/java/org/apache/hama/graph/DiskVerticesInfo.java +++ b/graph/src/main/java/org/apache/hama/graph/DiskVerticesInfo.java @@ -17,8 +17,6 @@ */ package org.apache.hama.graph; -import static com.google.common.base.Preconditions.checkArgument; - import java.io.IOException; import java.util.ArrayList; import java.util.BitSet; @@ -36,6 +34,8 @@ import org.apache.hama.bsp.TaskAttemptID; import org.apache.hama.graph.IDSkippingIterator.Strategy; +import static com.google.common.base.Preconditions.checkArgument; + @SuppressWarnings("rawtypes") public final class DiskVerticesInfo implements VerticesInfo { @@ -178,11 +178,6 @@ public void finishAdditions() { } @Override - public boolean isFinishedAdditions() { - return lockedAdditions; - } - - @Override public void startSuperstep() throws IOException { index = 0; String softGraphFileName = getSoftGraphFileName(rootPath, currentStep); diff --git a/graph/src/main/java/org/apache/hama/graph/ListVerticesInfo.java b/graph/src/main/java/org/apache/hama/graph/ListVerticesInfo.java index 0b0ae45..726c0c4 100644 --- a/graph/src/main/java/org/apache/hama/graph/ListVerticesInfo.java +++ b/graph/src/main/java/org/apache/hama/graph/ListVerticesInfo.java @@ -92,12 +92,7 @@ public void finishAdditions() { Collections.sort(vertices); } - @Override - public boolean isFinishedAdditions() { - return false; - } - - @Override + @Override public void finishSuperstep() { } diff --git a/graph/src/main/java/org/apache/hama/graph/OffHeapVerticesInfo.java b/graph/src/main/java/org/apache/hama/graph/OffHeapVerticesInfo.java index 8d413b4..fc148d2 100644 --- a/graph/src/main/java/org/apache/hama/graph/OffHeapVerticesInfo.java +++ b/graph/src/main/java/org/apache/hama/graph/OffHeapVerticesInfo.java @@ -52,6 +52,8 @@ private boolean strict; + private boolean finishedAdditions; + @Override public void init(GraphJobRunner runner, Configuration conf, TaskAttemptID attempt) throws IOException { this.strict = conf.getBoolean(DM_STRICT_ITERATOR, true); @@ -81,6 +83,7 @@ public void addVertex(Vertex vertex) { @Override public void finishAdditions() { + finishedAdditions = true; } @Override @@ -95,11 +98,6 @@ public void finishSuperstep() throws IOException { public void finishVertexComputation(Vertex vertex) throws IOException { } - @Override - public boolean isFinishedAdditions() { - return false; - } - public void clear() { vertices.clear(); } diff --git a/graph/src/main/java/org/apache/hama/graph/VerticesInfo.java b/graph/src/main/java/org/apache/hama/graph/VerticesInfo.java index 0e1cf74..9ed36c1 100644 --- a/graph/src/main/java/org/apache/hama/graph/VerticesInfo.java +++ b/graph/src/main/java/org/apache/hama/graph/VerticesInfo.java @@ -75,11 +75,6 @@ public void finishVertexComputation(Vertex vertex) throws IOException; /** - * @return true of all vertices are added. - */ - public boolean isFinishedAdditions(); - - /** * @return the number of vertices added to the underlying structure. * Implementations should take care this is a constant time operation. */ diff --git a/graph/src/test/java/org/apache/hama/graph/TestSubmitGraphJob.java b/graph/src/test/java/org/apache/hama/graph/TestSubmitGraphJob.java index 2a554fc..04f2637 100644 --- a/graph/src/test/java/org/apache/hama/graph/TestSubmitGraphJob.java +++ b/graph/src/test/java/org/apache/hama/graph/TestSubmitGraphJob.java @@ -18,6 +18,9 @@ package org.apache.hama.graph; import java.io.IOException; +import java.util.ArrayList; +import java.util.List; +import java.util.Random; import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.Path; @@ -36,6 +39,7 @@ import org.apache.hama.bsp.TextArrayWritable; import org.apache.hama.graph.example.PageRank; import org.apache.hama.graph.example.PageRank.PagerankSeqReader; +import org.junit.Before; public class TestSubmitGraphJob extends TestBSPMasterGroomServer { @@ -49,6 +53,16 @@ private static String INPUT = "/tmp/pagerank/real-tmp.seq"; private static String OUTPUT = "/tmp/pagerank/real-out"; + private static final List> vi = new ArrayList>(); + + @Before + public void setUp() throws Exception { + super.setUp(); + vi.add(ListVerticesInfo.class); + vi.add(DiskVerticesInfo.class); + vi.add(OffHeapVerticesInfo.class); + } + @Override public void testSubmitJob() throws Exception { @@ -60,7 +74,7 @@ public void testSubmitJob() throws Exception { BSPJobClient jobClient = new BSPJobClient(configuration); configuration.setInt(Constants.ZOOKEEPER_SESSION_TIMEOUT, 6000); configuration.set("hama.graph.self.ref", "true"); - configuration.setClass("hama.graph.vertices.info", OffHeapVerticesInfo.class, VerticesInfo.class); + injectVerticesInfo(); ClusterStatus cluster = jobClient.getClusterStatus(false); assertEquals(this.numOfGroom, cluster.getGroomServers()); LOG.info("Client finishes execution job."); @@ -99,6 +113,12 @@ public void testSubmitJob() throws Exception { } } + protected void injectVerticesInfo() { + Class verticesInfoClass = vi.get(new Random().nextInt() % 2); + LOG.info("using vertices info of type : "+verticesInfoClass.getName()); + configuration.setClass("hama.graph.vertices.info", verticesInfoClass, VerticesInfo.class); + } + private void verifyResult() throws IOException { double sum = 0.0; FileStatus[] globStatus = fs.globStatus(new Path(OUTPUT + "/part-*")); -- 1.8.4 From 292d338e7eefc3c4eef2b9589d97dfb43a9a3f8f Mon Sep 17 00:00:00 2001 From: Tommaso Teofili Date: Wed, 17 Jul 2013 09:53:52 +0200 Subject: [PATCH 06/11] HAMA-732 - fixed random VI picking in TestSubmitGraphJob --- graph/src/test/java/org/apache/hama/graph/TestSubmitGraphJob.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/graph/src/test/java/org/apache/hama/graph/TestSubmitGraphJob.java b/graph/src/test/java/org/apache/hama/graph/TestSubmitGraphJob.java index 04f2637..4e9a750 100644 --- a/graph/src/test/java/org/apache/hama/graph/TestSubmitGraphJob.java +++ b/graph/src/test/java/org/apache/hama/graph/TestSubmitGraphJob.java @@ -114,7 +114,7 @@ public void testSubmitJob() throws Exception { } protected void injectVerticesInfo() { - Class verticesInfoClass = vi.get(new Random().nextInt() % 2); + Class verticesInfoClass = vi.get(Math.abs(new Random().nextInt() % 3)); LOG.info("using vertices info of type : "+verticesInfoClass.getName()); configuration.setClass("hama.graph.vertices.info", verticesInfoClass, VerticesInfo.class); } -- 1.8.4 From dd7d29f64c9d3f9b8859a8470c6f606f43744729 Mon Sep 17 00:00:00 2001 From: Tommaso Teofili Date: Fri, 19 Jul 2013 15:42:05 +0200 Subject: [PATCH 07/11] HAMA-732 - first working draft of OffHeapVerticesInfo --- graph/pom.xml | 18 +++++++++++++++--- .../apache/hama/graph/DefaultVertexOutputWriter.java | 3 +++ .../java/org/apache/hama/graph/DiskVerticesInfo.java | 2 +- .../java/org/apache/hama/graph/GraphJobRunner.java | 4 ++-- .../org/apache/hama/graph/OffHeapVerticesInfo.java | 10 +++++++--- graph/src/main/java/org/apache/hama/graph/Vertex.java | 9 ++++++++- .../java/org/apache/hama/graph/TestSubmitGraphJob.java | 7 +++---- 7 files changed, 39 insertions(+), 14 deletions(-) diff --git a/graph/pom.xml b/graph/pom.xml index 263f7c4..0d91282 100644 --- a/graph/pom.xml +++ b/graph/pom.xml @@ -47,6 +47,21 @@ 0.2-SNAPSHOT + org.apache.directmemory + directmemory-msgpack + 0.2-SNAPSHOT + + + org.apache.directmemory + directmemory-protobuf + 0.2-SNAPSHOT + + + org.apache.directmemory + directmemory-protostuff + 0.2-SNAPSHOT + + org.apache.hama hama-core ${project.version} @@ -60,9 +75,6 @@ maven-surefire-plugin - - - diff --git a/graph/src/main/java/org/apache/hama/graph/DefaultVertexOutputWriter.java b/graph/src/main/java/org/apache/hama/graph/DefaultVertexOutputWriter.java index 68f754b..472959b 100644 --- a/graph/src/main/java/org/apache/hama/graph/DefaultVertexOutputWriter.java +++ b/graph/src/main/java/org/apache/hama/graph/DefaultVertexOutputWriter.java @@ -45,6 +45,9 @@ public void setup(Configuration conf) { public void write(Vertex vertex, BSPPeer peer) throws IOException { + assert vertex.getVertexID() != null : "vertex id cannot be null"; + assert vertex.getValue() != null : "vertex value cannot be null"; + System.err.println("vertexid: "+vertex.getVertexID()+" - vertexval: "+vertex.getValue()); peer.write(vertex.getVertexID(), vertex.getValue()); } diff --git a/graph/src/main/java/org/apache/hama/graph/DiskVerticesInfo.java b/graph/src/main/java/org/apache/hama/graph/DiskVerticesInfo.java index d28bbe3..1bd5972 100644 --- a/graph/src/main/java/org/apache/hama/graph/DiskVerticesInfo.java +++ b/graph/src/main/java/org/apache/hama/graph/DiskVerticesInfo.java @@ -256,7 +256,7 @@ public boolean hasNext(V e, if (cachedVertexInstance == null) { cachedVertexInstance = GraphJobRunner . newVertexInstance(GraphJobRunner.VERTEX_CLASS); - cachedVertexInstance.runner = runner; + cachedVertexInstance.setRunner(runner); } ensureVertexIDNotNull(); } catch (IOException e) { diff --git a/graph/src/main/java/org/apache/hama/graph/GraphJobRunner.java b/graph/src/main/java/org/apache/hama/graph/GraphJobRunner.java index 2ac3ebd..ca2c62c 100644 --- a/graph/src/main/java/org/apache/hama/graph/GraphJobRunner.java +++ b/graph/src/main/java/org/apache/hama/graph/GraphJobRunner.java @@ -402,7 +402,7 @@ private void loadVertices( while ((record = peer.readNext()) != null) { converted = converter.convertRecord(record, conf); vertex = (Vertex) converted.getKey(); - vertex.runner = this; + vertex.setRunner(this); vertex.setup(conf); if (selfReference) { @@ -414,7 +414,7 @@ private void loadVertices( // Reinitializing vertex object for memory based implementations of // VerticesInfo vertex = GraphJobRunner. newVertexInstance(VERTEX_CLASS); - vertex.runner = this; + vertex.setRunner(this); } vertices.finishAdditions(); // finish the "superstep" because we have written a new file here diff --git a/graph/src/main/java/org/apache/hama/graph/OffHeapVerticesInfo.java b/graph/src/main/java/org/apache/hama/graph/OffHeapVerticesInfo.java index fc148d2..29303cf 100644 --- a/graph/src/main/java/org/apache/hama/graph/OffHeapVerticesInfo.java +++ b/graph/src/main/java/org/apache/hama/graph/OffHeapVerticesInfo.java @@ -51,11 +51,11 @@ private CacheService> vertices; private boolean strict; - - private boolean finishedAdditions; + private GraphJobRunner runner; @Override public void init(GraphJobRunner runner, Configuration conf, TaskAttemptID attempt) throws IOException { + this.runner = runner; this.strict = conf.getBoolean(DM_STRICT_ITERATOR, true); DirectMemory> dm = new DirectMemory>() .setNumberOfBuffers(conf.getInt(DM_BUFFERS, 100)) @@ -70,6 +70,7 @@ public void init(GraphJobRunner runner, Configuration conf, TaskAttempt } this.vertices = dm.newCacheService(); + } @Override @@ -83,7 +84,6 @@ public void addVertex(Vertex vertex) { @Override public void finishAdditions() { - finishedAdditions = true; } @Override @@ -96,6 +96,7 @@ public void finishSuperstep() throws IOException { @Override public void finishVertexComputation(Vertex vertex) throws IOException { + vertices.put(vertex.getVertexID(), vertex); } public void clear() { @@ -135,6 +136,9 @@ public boolean hasNext(V e, @Override public Vertex next() { currentIndex++; + if (currentVertex.getRunner() == null) { + currentVertex.setRunner(runner); + } return currentVertex; } diff --git a/graph/src/main/java/org/apache/hama/graph/Vertex.java b/graph/src/main/java/org/apache/hama/graph/Vertex.java index b1e63d9..0cd516f 100644 --- a/graph/src/main/java/org/apache/hama/graph/Vertex.java +++ b/graph/src/main/java/org/apache/hama/graph/Vertex.java @@ -50,7 +50,7 @@ public abstract class Vertex implements VertexInterface { - GraphJobRunner runner; + private transient GraphJobRunner runner; private V vertexID; private M value; @@ -342,4 +342,11 @@ public void writeState(DataOutput out) throws IOException { } + protected void setRunner(GraphJobRunner runner) { + this.runner = runner; + } + + protected GraphJobRunner getRunner() { + return runner; + } } diff --git a/graph/src/test/java/org/apache/hama/graph/TestSubmitGraphJob.java b/graph/src/test/java/org/apache/hama/graph/TestSubmitGraphJob.java index 4e9a750..7d6fda8 100644 --- a/graph/src/test/java/org/apache/hama/graph/TestSubmitGraphJob.java +++ b/graph/src/test/java/org/apache/hama/graph/TestSubmitGraphJob.java @@ -20,7 +20,6 @@ import java.io.IOException; import java.util.ArrayList; import java.util.List; -import java.util.Random; import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.Path; @@ -114,9 +113,9 @@ public void testSubmitJob() throws Exception { } protected void injectVerticesInfo() { - Class verticesInfoClass = vi.get(Math.abs(new Random().nextInt() % 3)); - LOG.info("using vertices info of type : "+verticesInfoClass.getName()); - configuration.setClass("hama.graph.vertices.info", verticesInfoClass, VerticesInfo.class); +// Class verticesInfoClass = vi.get(Math.abs(new Random().nextInt() % 3)); +// LOG.info("using vertices info of type : "+verticesInfoClass.getName()); + configuration.setClass("hama.graph.vertices.info", OffHeapVerticesInfo.class, VerticesInfo.class); } private void verifyResult() throws IOException { -- 1.8.4 From 9114fa8bb389706c86601e57900d02320cf4f124 Mon Sep 17 00:00:00 2001 From: Tommaso Teofili Date: Fri, 19 Jul 2013 15:49:14 +0200 Subject: [PATCH 08/11] HAMA-732 - first working draft of OffHeapVerticesInfo --- graph/src/main/java/org/apache/hama/graph/DefaultVertexOutputWriter.java | 1 - 1 file changed, 1 deletion(-) diff --git a/graph/src/main/java/org/apache/hama/graph/DefaultVertexOutputWriter.java b/graph/src/main/java/org/apache/hama/graph/DefaultVertexOutputWriter.java index 472959b..32c66e6 100644 --- a/graph/src/main/java/org/apache/hama/graph/DefaultVertexOutputWriter.java +++ b/graph/src/main/java/org/apache/hama/graph/DefaultVertexOutputWriter.java @@ -47,7 +47,6 @@ public void write(Vertex vertex, throws IOException { assert vertex.getVertexID() != null : "vertex id cannot be null"; assert vertex.getValue() != null : "vertex value cannot be null"; - System.err.println("vertexid: "+vertex.getVertexID()+" - vertexval: "+vertex.getValue()); peer.write(vertex.getVertexID(), vertex.getValue()); } -- 1.8.4 From 6b386e2d8f1ab526c5274be5d94a8952c37af5f7 Mon Sep 17 00:00:00 2001 From: Tommaso Teofili Date: Mon, 22 Jul 2013 09:30:27 +0200 Subject: [PATCH 09/11] HAMA-732 - readded randomized VI initialization for TestSubmitGraphJob --- graph/src/test/java/org/apache/hama/graph/TestSubmitGraphJob.java | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/graph/src/test/java/org/apache/hama/graph/TestSubmitGraphJob.java b/graph/src/test/java/org/apache/hama/graph/TestSubmitGraphJob.java index 7d6fda8..4e265b9 100644 --- a/graph/src/test/java/org/apache/hama/graph/TestSubmitGraphJob.java +++ b/graph/src/test/java/org/apache/hama/graph/TestSubmitGraphJob.java @@ -20,6 +20,7 @@ import java.io.IOException; import java.util.ArrayList; import java.util.List; +import java.util.Random; import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.Path; @@ -113,9 +114,8 @@ public void testSubmitJob() throws Exception { } protected void injectVerticesInfo() { -// Class verticesInfoClass = vi.get(Math.abs(new Random().nextInt() % 3)); -// LOG.info("using vertices info of type : "+verticesInfoClass.getName()); - configuration.setClass("hama.graph.vertices.info", OffHeapVerticesInfo.class, VerticesInfo.class); + Class verticesInfoClass = vi.get(Math.abs(new Random().nextInt() % 3)); + LOG.info("using vertices info of type : "+verticesInfoClass.getName()); } private void verifyResult() throws IOException { -- 1.8.4 From 799ed97ac8d6c1238ba10c211386c6b805a4c642 Mon Sep 17 00:00:00 2001 From: Tommaso Teofili Date: Mon, 22 Jul 2013 09:31:18 +0200 Subject: [PATCH 10/11] HAMA-732 - removed other serializers dependency, just let Kryo --- graph/pom.xml | 15 --------------- 1 file changed, 15 deletions(-) diff --git a/graph/pom.xml b/graph/pom.xml index 0d91282..b8a7545 100644 --- a/graph/pom.xml +++ b/graph/pom.xml @@ -47,21 +47,6 @@ 0.2-SNAPSHOT - org.apache.directmemory - directmemory-msgpack - 0.2-SNAPSHOT - - - org.apache.directmemory - directmemory-protobuf - 0.2-SNAPSHOT - - - org.apache.directmemory - directmemory-protostuff - 0.2-SNAPSHOT - - org.apache.hama hama-core ${project.version} -- 1.8.4 From 38db2f4d8d9b3f5e5fc85d9e800f87267b976540 Mon Sep 17 00:00:00 2001 From: Tommaso Teofili Date: Mon, 16 Sep 2013 15:04:11 +0200 Subject: [PATCH 11/11] HAMA-732 - switched to DM 0.2 release --- graph/pom.xml | 4 ++-- .../src/main/java/org/apache/hama/graph/GraphJobRunner.java | 2 +- .../main/java/org/apache/hama/graph/OffHeapVerticesInfo.java | 12 +++++++++++- graph/src/main/java/org/apache/hama/graph/Vertex.java | 2 -- 4 files changed, 14 insertions(+), 6 deletions(-) diff --git a/graph/pom.xml b/graph/pom.xml index 3368f12..617fadb 100644 --- a/graph/pom.xml +++ b/graph/pom.xml @@ -39,12 +39,12 @@ org.apache.directmemory directmemory-cache - 0.2-SNAPSHOT + 0.2 org.apache.directmemory directmemory-kryo - 0.2-SNAPSHOT + 0.2 org.apache.hama diff --git a/graph/src/main/java/org/apache/hama/graph/GraphJobRunner.java b/graph/src/main/java/org/apache/hama/graph/GraphJobRunner.java index 183e549..873db3f 100644 --- a/graph/src/main/java/org/apache/hama/graph/GraphJobRunner.java +++ b/graph/src/main/java/org/apache/hama/graph/GraphJobRunner.java @@ -453,7 +453,7 @@ private void loadVertices( * @throws IOException */ private void addVertex(Vertex vertex) throws IOException { - vertex.runner = this; + vertex.setRunner(this); vertex.setup(conf); if (conf.getBoolean("hama.graph.self.ref", false)) { diff --git a/graph/src/main/java/org/apache/hama/graph/OffHeapVerticesInfo.java b/graph/src/main/java/org/apache/hama/graph/OffHeapVerticesInfo.java index 29303cf..9a09263 100644 --- a/graph/src/main/java/org/apache/hama/graph/OffHeapVerticesInfo.java +++ b/graph/src/main/java/org/apache/hama/graph/OffHeapVerticesInfo.java @@ -36,7 +36,7 @@ /** * An off heap version of a {@link org.apache.hama.graph.Vertex} storage. */ -public class OffHeapVerticesInfo +public class OffHeapVerticesInfo, E extends Writable, M extends Writable> implements VerticesInfo { public static final String DM_STRICT_ITERATOR = "dm.iterator.strict"; @@ -146,4 +146,14 @@ public boolean hasNext(V e, } + @Override + public void removeVertex(V vertexID) { + throw new UnsupportedOperationException ("Not yet implemented"); + } + + @Override + public void finishRemovals() { + throw new UnsupportedOperationException ("Not yet implemented"); + } + } diff --git a/graph/src/main/java/org/apache/hama/graph/Vertex.java b/graph/src/main/java/org/apache/hama/graph/Vertex.java index 057ab71..9ff271b 100644 --- a/graph/src/main/java/org/apache/hama/graph/Vertex.java +++ b/graph/src/main/java/org/apache/hama/graph/Vertex.java @@ -25,7 +25,6 @@ import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.io.IntWritable; -import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.MapWritable; import org.apache.hadoop.io.Writable; import org.apache.hadoop.io.WritableComparable; @@ -228,7 +227,6 @@ public int getNumPeers() { /** * @return the configured partitioner instance to message vertices. */ - @SuppressWarnings("unchecked") public Partitioner getPartitioner() { return (Partitioner) runner.getPartitioner(); } -- 1.8.4