Index: core/src/main/java/org/apache/hama/util/ByteUtils.java
===================================================================
--- core/src/main/java/org/apache/hama/util/ByteUtils.java (revision 0)
+++ core/src/main/java/org/apache/hama/util/ByteUtils.java (working copy)
@@ -0,0 +1,40 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hama.util;
+
+/**
+ * Utilities class for byte operations and constants
+ */
+public class ByteUtils {
+ /** Bytes used in a boolean */
+ public static final int SIZE_OF_BOOLEAN = 1;
+ /** Bytes used in a byte */
+ public static final int SIZE_OF_BYTE = 1;
+ /** Bytes used in a char */
+ public static final int SIZE_OF_CHAR = Character.SIZE / Byte.SIZE;
+ /** Bytes used in a short */
+ public static final int SIZE_OF_SHORT = Short.SIZE / Byte.SIZE;
+ /** Bytes used in an int */
+ public static final int SIZE_OF_INT = Integer.SIZE / Byte.SIZE;
+ /** Bytes used in a long */
+ public static final int SIZE_OF_LONG = Long.SIZE / Byte.SIZE;
+ /** Bytes used in a float */
+ public static final int SIZE_OF_FLOAT = Float.SIZE / Byte.SIZE;
+ /** Bytes used in a double */
+ public static final int SIZE_OF_DOUBLE = Double.SIZE / Byte.SIZE;
+}
Index: core/src/main/java/org/apache/hama/util/ExtendedDataOutput.java
===================================================================
--- core/src/main/java/org/apache/hama/util/ExtendedDataOutput.java (revision 0)
+++ core/src/main/java/org/apache/hama/util/ExtendedDataOutput.java (working copy)
@@ -0,0 +1,84 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hama.util;
+
+import java.io.DataOutput;
+
+/**
+ * Add a few features to data output
+ */
+public interface ExtendedDataOutput extends DataOutput {
+ /**
+ * Ensure that backing byte structure has at least minSize
+ * additional bytes
+ *
+ * @param minSize additional size required
+ */
+ void ensureWritable(int minSize);
+
+ /**
+ * Skip some number of bytes.
+ *
+ * @param bytesToSkip Number of bytes to skip
+ */
+ void skipBytes(int bytesToSkip);
+
+ /**
+ * In order to write a size as a first part of an data output, it is
+ * useful to be able to write an int at an arbitrary location in the stream
+ *
+ * @param pos Byte position in the output stream
+ * @param value Value to write
+ */
+ void writeInt(int pos, int value);
+
+ /**
+ * Get the position in the output stream
+ *
+ * @return Position in the output stream
+ */
+ int getPos();
+
+ /**
+ * Get the internal byte array (if possible), read-only
+ *
+ * @return Internal byte array (do not modify)
+ */
+ byte[] getByteArray();
+
+ /**
+ * Copies the internal byte array
+ *
+ * @return Copied byte array
+ */
+ byte[] toByteArray();
+
+ /**
+ * Return a copy of slice of byte array
+ *
+ * @param offset offset of array
+ * @param length length of slice
+ * @return byte array
+ */
+ byte[] toByteArray(int offset, int length);
+
+ /**
+ * Clears the buffer
+ */
+ void reset();
+}
Index: core/src/main/java/org/apache/hama/util/UnsafeByteArrayInputStream.java
===================================================================
--- core/src/main/java/org/apache/hama/util/UnsafeByteArrayInputStream.java (revision 0)
+++ core/src/main/java/org/apache/hama/util/UnsafeByteArrayInputStream.java (working copy)
@@ -0,0 +1,150 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hama.util;
+
+import java.io.ByteArrayInputStream;
+import java.io.IOException;
+import java.io.InputStream;
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+/**
+ * This class, much like {@link ByteArrayInputStream} uses a given buffer as a
+ * source of an InputStream. Unlike ByteArrayInputStream, this class does not
+ * "waste" memory by creating a local copy of the given buffer, but rather uses
+ * the given buffer as is. Hence the name Unsafe. While using this class one
+ * should remember that the byte[] buffer memory is shared and might be changed
+ * from outside.
+ *
+ * For reuse-ability, a call for {@link #reInit(byte[])} can be called, and
+ * initialize the stream with a new buffer.
+ *
+ * @lucene.experimental
+ */
+public class UnsafeByteArrayInputStream extends InputStream {
+ private byte[] buffer;
+ private int markIndex;
+ private int upperLimit;
+ private int index;
+ /**
+ * Creates a new instance by not using any byte[] up front. If you use this
+ * constructor, you MUST call either of the {@link #reInit(byte[]) reInit}
+ * methods before you consume any byte from this instance.
+ * This constructor is for convenience purposes only, so that if one does not
+ * have the byte[] at the moment of creation, one is not forced to pass a
+ * new byte[0] or something. Obviously in that case, one will
+ * call either {@link #reInit(byte[]) reInit} methods before using the class.
+ */
+ public UnsafeByteArrayInputStream() {
+ markIndex = upperLimit = index = 0;
+ }
+ /**
+ * Creates an UnsafeByteArrayInputStream which uses a given byte array as
+ * the source of the stream. Default range is [0 , buffer.length)
+ *
+ * @param buffer
+ * byte array used as the source of this stream
+ */
+ public UnsafeByteArrayInputStream(byte[] buffer) {
+ reInit(buffer, 0, buffer.length);
+ }
+ /**
+ * Creates an UnsafeByteArrayInputStream which uses a given byte array as
+ * the source of the stream, at the specific range: [startPos, endPos)
+ *
+ * @param buffer
+ * byte array used as the source of this stream
+ * @param startPos
+ * first index (inclusive) to the data lying in the given buffer
+ * @param endPos
+ * an index (exclusive) where the data ends. data @
+ * buffer[endPos] will never be read
+ */
+ public UnsafeByteArrayInputStream(byte[] buffer, int startPos, int endPos) {
+ reInit(buffer, startPos, endPos);
+ }
+ @Override
+ public void mark(int readlimit) {
+ markIndex = index;
+ }
+ @Override
+ public boolean markSupported() {
+ return true;
+ }
+ /**
+ * Initialize the stream with a given buffer, using the default limits of
+ * [0, buffer.length)
+ *
+ * @param buffer
+ * byte array used as the source of this stream
+ */
+ public void reInit(byte[] buffer) {
+ reInit(buffer, 0, buffer.length);
+ }
+ /**
+ * Initialize the stream with a given byte array as the source of the
+ * stream, at the specific range: [startPos, endPos)
+ *
+ * @param buffer
+ * byte array used as the source of this stream
+ * @param startPos
+ * first index (inclusive) to the data lying in the given buffer
+ * @param endPos
+ * an index (exclusive) where the data ends. data @
+ * buffer[endPos] will never be read
+ */
+ public void reInit(byte[] buffer, int startPos, int endPos) {
+ this.buffer = buffer;
+ markIndex = startPos;
+ upperLimit = endPos;
+ index = markIndex;
+ }
+ @Override
+ public int available() throws IOException {
+ return upperLimit - index;
+ }
+ /**
+ * Read a byte. Data returned as an integer [0,255] If end of stream
+ * reached, returns -1
+ */
+ @Override
+ public int read() throws IOException {
+ return index < upperLimit ? buffer[index++] & 0xff : -1;
+ }
+ /**
+ * Resets the stream back to its original state. Basically - moving the
+ * index back to start position.
+ */
+ @Override
+ public void reset() throws IOException {
+ index = markIndex;
+ }
+}
Index: core/src/main/java/org/apache/hama/util/UnsafeByteArrayOutputStream.java
===================================================================
--- core/src/main/java/org/apache/hama/util/UnsafeByteArrayOutputStream.java (revision 0)
+++ core/src/main/java/org/apache/hama/util/UnsafeByteArrayOutputStream.java (working copy)
@@ -0,0 +1,310 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hama.util;
+
+import java.io.IOException;
+import java.io.OutputStream;
+import java.lang.reflect.Field;
+import java.util.Arrays;
+
+/**
+ * Byte array output stream that uses Unsafe methods to serialize/deserialize
+ * much faster
+ */
+public class UnsafeByteArrayOutputStream extends OutputStream
+ implements ExtendedDataOutput {
+ static {
+ try {
+ Field field = sun.misc.Unsafe.class.getDeclaredField("theUnsafe");
+ field.setAccessible(true);
+ UNSAFE = (sun.misc.Unsafe) field.get(null);
+ // Checkstyle exception due to needing to check if unsafe is allowed
+ // CHECKSTYLE: stop IllegalCatch
+ } catch (Exception e) {
+ // CHECKSTYLE: resume IllegalCatch
+ throw new RuntimeException("UnsafeByteArrayOutputStream: Failed to " +
+ "get unsafe", e);
+ }
+ }
+
+ /** Default number of bytes */
+ private static final int DEFAULT_BYTES = 32;
+ /** Access to the unsafe class */
+ private static final sun.misc.Unsafe UNSAFE;
+
+ /** Offset of a byte array */
+ private static final long BYTE_ARRAY_OFFSET =
+ UNSAFE.arrayBaseOffset(byte[].class);
+
+ /** Byte buffer */
+ private byte[] buf;
+ /** Position in the buffer */
+ private int pos = 0;
+
+ /**
+ * Constructor
+ */
+ public UnsafeByteArrayOutputStream() {
+ this(DEFAULT_BYTES);
+ }
+
+ /**
+ * Constructor
+ *
+ * @param size Initial size of the underlying byte array
+ */
+ public UnsafeByteArrayOutputStream(int size) {
+ buf = new byte[size];
+ }
+
+ /**
+ * Constructor to take in a buffer
+ *
+ * @param buf Buffer to start with, or if null, create own buffer
+ */
+ public UnsafeByteArrayOutputStream(byte[] buf) {
+ if (buf == null) {
+ this.buf = new byte[DEFAULT_BYTES];
+ } else {
+ this.buf = buf;
+ }
+ }
+
+ /**
+ * Constructor to take in a buffer with a given position into that buffer
+ *
+ * @param buf Buffer to start with
+ * @param pos Position to write at the buffer
+ */
+ public UnsafeByteArrayOutputStream(byte[] buf, int pos) {
+ this(buf);
+ this.pos = pos;
+ }
+
+ /**
+ * Ensure that this buffer has enough remaining space to add the size.
+ * Creates and copies to a new buffer if necessary
+ *
+ * @param size Size to add
+ */
+ private void ensureSize(int size) {
+ if (pos + size > buf.length) {
+ byte[] newBuf = new byte[(buf.length + size) << 1];
+ System.arraycopy(buf, 0, newBuf, 0, pos);
+ buf = newBuf;
+ }
+ }
+
+ @Override
+ public byte[] getByteArray() {
+ return buf;
+ }
+
+ @Override
+ public byte[] toByteArray() {
+ return Arrays.copyOf(buf, pos);
+ }
+
+ @Override
+ public byte[] toByteArray(int offset, int length) {
+ if (offset + length > pos) {
+ throw new IndexOutOfBoundsException(String.format("Offset: %d + " +
+ "Length: %d exceeds the size of buf : %d", offset, length, pos));
+ }
+ return Arrays.copyOfRange(buf, offset, length);
+ }
+
+ @Override
+ public void reset() {
+ pos = 0;
+ }
+
+ @Override
+ public int getPos() {
+ return pos;
+ }
+
+ @Override
+ public void write(int b) throws IOException {
+ ensureSize(ByteUtils.SIZE_OF_BYTE);
+ buf[pos] = (byte) b;
+ pos += ByteUtils.SIZE_OF_BYTE;
+ }
+
+ @Override
+ public void write(byte[] b) throws IOException {
+ ensureSize(b.length);
+ System.arraycopy(b, 0, buf, pos, b.length);
+ pos += b.length;
+ }
+
+ @Override
+ public void write(byte[] b, int off, int len) throws IOException {
+ ensureSize(len);
+ System.arraycopy(b, off, buf, pos, len);
+ pos += len;
+ }
+
+ @Override
+ public void writeBoolean(boolean v) throws IOException {
+ ensureSize(ByteUtils.SIZE_OF_BOOLEAN);
+ UNSAFE.putBoolean(buf, BYTE_ARRAY_OFFSET + pos, v);
+ pos += ByteUtils.SIZE_OF_BOOLEAN;
+ }
+
+ @Override
+ public void writeByte(int v) throws IOException {
+ ensureSize(ByteUtils.SIZE_OF_BYTE);
+ UNSAFE.putByte(buf, BYTE_ARRAY_OFFSET + pos, (byte) v);
+ pos += ByteUtils.SIZE_OF_BYTE;
+ }
+
+ @Override
+ public void writeShort(int v) throws IOException {
+ ensureSize(ByteUtils.SIZE_OF_SHORT);
+ UNSAFE.putShort(buf, BYTE_ARRAY_OFFSET + pos, (short) v);
+ pos += ByteUtils.SIZE_OF_SHORT;
+ }
+
+ @Override
+ public void writeChar(int v) throws IOException {
+ ensureSize(ByteUtils.SIZE_OF_CHAR);
+ UNSAFE.putChar(buf, BYTE_ARRAY_OFFSET + pos, (char) v);
+ pos += ByteUtils.SIZE_OF_CHAR;
+ }
+
+ @Override
+ public void writeInt(int v) throws IOException {
+ ensureSize(ByteUtils.SIZE_OF_INT);
+ UNSAFE.putInt(buf, BYTE_ARRAY_OFFSET + pos, v);
+ pos += ByteUtils.SIZE_OF_INT;
+ }
+
+ @Override
+ public void ensureWritable(int minSize) {
+ if ((pos + minSize) > buf.length) {
+ buf = Arrays.copyOf(buf, Math.max(buf.length << 1, pos + minSize));
+ }
+ }
+
+ @Override
+ public void skipBytes(int bytesToSkip) {
+ ensureWritable(bytesToSkip);
+ pos += bytesToSkip;
+ }
+
+ @Override
+ public void writeInt(int pos, int value) {
+ if (pos + ByteUtils.SIZE_OF_INT > this.pos) {
+ throw new IndexOutOfBoundsException(
+ "writeInt: Tried to write int to position " + pos +
+ " but current length is " + this.pos);
+ }
+ UNSAFE.putInt(buf, BYTE_ARRAY_OFFSET + pos, value);
+ }
+
+ @Override
+ public void writeLong(long v) throws IOException {
+ ensureSize(ByteUtils.SIZE_OF_LONG);
+ UNSAFE.putLong(buf, BYTE_ARRAY_OFFSET + pos, v);
+ pos += ByteUtils.SIZE_OF_LONG;
+ }
+
+ @Override
+ public void writeFloat(float v) throws IOException {
+ ensureSize(ByteUtils.SIZE_OF_FLOAT);
+ UNSAFE.putFloat(buf, BYTE_ARRAY_OFFSET + pos, v);
+ pos += ByteUtils.SIZE_OF_FLOAT;
+ }
+
+ @Override
+ public void writeDouble(double v) throws IOException {
+ ensureSize(ByteUtils.SIZE_OF_DOUBLE);
+ UNSAFE.putDouble(buf, BYTE_ARRAY_OFFSET + pos, v);
+ pos += ByteUtils.SIZE_OF_DOUBLE;
+ }
+
+ @Override
+ public void writeBytes(String s) throws IOException {
+ // Note that this code is mostly copied from DataOutputStream
+ int len = s.length();
+ ensureSize(len);
+ for (int i = 0; i < len; i++) {
+ int v = s.charAt(i);
+ writeByte(v);
+ }
+ }
+
+ @Override
+ public void writeChars(String s) throws IOException {
+ // Note that this code is mostly copied from DataOutputStream
+ int len = s.length();
+ ensureSize(len * ByteUtils.SIZE_OF_CHAR);
+ for (int i = 0; i < len; i++) {
+ int v = s.charAt(i);
+ writeChar(v);
+ }
+ }
+
+ @Override
+ public void writeUTF(String s) throws IOException {
+ // Note that this code is mostly copied from DataOutputStream
+ int strlen = s.length();
+ int utflen = 0;
+ int c;
+
+ /* use charAt instead of copying String to char array */
+ for (int i = 0; i < strlen; i++) {
+ c = s.charAt(i);
+ if ((c >= 0x0001) && (c <= 0x007F)) {
+ utflen++;
+ } else if (c > 0x07FF) {
+ utflen += 3;
+ } else {
+ utflen += 2;
+ }
+ }
+
+ ensureSize(utflen + ByteUtils.SIZE_OF_SHORT);
+ writeShort(utflen);
+
+ int i = 0;
+ for (i = 0; i < strlen; i++) {
+ c = s.charAt(i);
+ if (!((c >= 0x0001) && (c <= 0x007F))) {
+ break;
+ }
+ buf[pos++] = (byte) c;
+ }
+
+ for (; i < strlen; i++) {
+ c = s.charAt(i);
+ if ((c >= 0x0001) && (c <= 0x007F)) {
+ buf[pos++] = (byte) c;
+
+ } else if (c > 0x07FF) {
+ buf[pos++] = (byte) (0xE0 | ((c >> 12) & 0x0F));
+ buf[pos++] = (byte) (0x80 | ((c >> 6) & 0x3F));
+ buf[pos++] = (byte) (0x80 | ((c >> 0) & 0x3F));
+ } else {
+ buf[pos++] = (byte) (0xC0 | ((c >> 6) & 0x1F));
+ buf[pos++] = (byte) (0x80 | ((c >> 0) & 0x3F));
+ }
+ }
+ }
+}
Index: core/src/main/java/org/apache/hama/util/WritableUtils.java
===================================================================
--- core/src/main/java/org/apache/hama/util/WritableUtils.java (revision 1681695)
+++ core/src/main/java/org/apache/hama/util/WritableUtils.java (working copy)
@@ -17,14 +17,12 @@
*/
package org.apache.hama.util;
-import java.io.ByteArrayInputStream;
-import java.io.ByteArrayOutputStream;
-import java.io.DataInputStream;
-import java.io.DataOutput;
-import java.io.DataOutputStream;
-import java.io.IOException;
+import java.io.*;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.io.Writable;
+import org.apache.hama.HamaConfiguration;
public class WritableUtils {
@@ -47,4 +45,24 @@
e.printStackTrace();
}
}
+
+ public static byte[] unsafeSerialize(Writable w) {
+ UnsafeByteArrayOutputStream out = new UnsafeByteArrayOutputStream();
+ DataOutput output = new DataOutputStream(out);
+ try {
+ w.write(output);
+ } catch (IOException e) {
+ e.printStackTrace();
+ }
+ return out.toByteArray();
+ }
+
+ public static void unsafeDeserialize(byte[] bytes, Writable obj) {
+ DataInputStream in = new DataInputStream(new UnsafeByteArrayInputStream(bytes));
+ try {
+ obj.readFields(in);
+ } catch (IOException e) {
+ e.printStackTrace();
+ }
+ }
}
Index: graph/src/main/java/org/apache/hama/graph/GraphJobRunner.java
===================================================================
--- graph/src/main/java/org/apache/hama/graph/GraphJobRunner.java (revision 1681695)
+++ graph/src/main/java/org/apache/hama/graph/GraphJobRunner.java (working copy)
@@ -454,6 +454,7 @@
executor.setRejectedExecutionHandler(retryHandler);
KeyValuePair next = null;
+
while ((next = peer.readNext()) != null) {
Vertex vertex = GraphJobRunner
. newVertexInstance(VERTEX_CLASS);
@@ -547,7 +548,11 @@
if (peer.getPeerIndex() == partition) {
addVertex(vertex);
} else {
- messages.get(partition).add(WritableUtils.serialize(vertex));
+ if (!conf.getBoolean("hama.use.unsafeserialization", false)) {
+ messages.get(partition).add(WritableUtils.serialize(vertex));
+ } else {
+ messages.get(partition).add(WritableUtils.unsafeSerialize(vertex));
+ }
}
} catch (Exception e) {
throw new RuntimeException(e);
@@ -709,9 +714,17 @@
it.remove();
if (combiner != null && e.getValue().getNumOfValues() > 1) {
- GraphJobMessage combined = new GraphJobMessage(e.getKey(),
- WritableUtils.serialize(combiner.combine(getIterableMessages(e
- .getValue().getValuesBytes(), e.getValue().getNumOfValues()))));
+ GraphJobMessage combined;
+ if (conf.getBoolean("hama.useUnsafeSerialization", false)) {
+ combined = new GraphJobMessage(e.getKey(), WritableUtils.unsafeSerialize(
+ combiner.combine(getIterableMessages(e.getValue().getValuesBytes(),
+ e.getValue().getNumOfValues()))));
+ } else {
+ combined = new GraphJobMessage(e.getKey(), WritableUtils.serialize(
+ combiner.combine(getIterableMessages(e.getValue().getValuesBytes(),
+ e.getValue().getNumOfValues()))));
+ }
+
combined.setFlag(GraphJobMessage.VERTEX_FLAG);
peer.send(getHostName(e.getKey()), combined);
} else {
Index: graph/src/main/java/org/apache/hama/graph/MapVerticesInfo.java
===================================================================
--- graph/src/main/java/org/apache/hama/graph/MapVerticesInfo.java (revision 1681695)
+++ graph/src/main/java/org/apache/hama/graph/MapVerticesInfo.java (working copy)
@@ -43,6 +43,7 @@
private final ConcurrentHashMap vertices = new ConcurrentHashMap();
private GraphJobRunner runner;
+ private HamaConfiguration conf;
private int activeVertices = 0;
@@ -50,6 +51,7 @@
public void init(GraphJobRunner runner, HamaConfiguration conf,
TaskAttemptID attempt) throws IOException {
this.runner = runner;
+ this.conf = conf;
}
@Override
@@ -84,7 +86,12 @@
public Vertex get(V vertexID) throws IOException {
Vertex v = GraphJobRunner
. newVertexInstance(GraphJobRunner.VERTEX_CLASS);
- WritableUtils.deserialize(vertices.get(vertexID), v);
+
+ if (!conf.getBoolean("hama.use.unsafeserialization", false)) {
+ WritableUtils.deserialize(vertices.get(vertexID), v);
+ } else {
+ WritableUtils.unsafeDeserialize(vertices.get(vertexID), v);
+ }
v.setRunner(runner);
return v;
@@ -106,7 +113,12 @@
public Vertex next() {
Vertex v = GraphJobRunner
. newVertexInstance(GraphJobRunner.VERTEX_CLASS);
- WritableUtils.deserialize(it.next(), v);
+ if (!conf.getBoolean("hama.use.unsafeserialization", false)) {
+ WritableUtils.deserialize(it.next(), v);
+ } else {
+ WritableUtils.unsafeDeserialize(it.next(), v);
+ }
+
v.setRunner(runner);
return v;
}