Index: core/src/main/java/org/apache/hama/util/ByteUtils.java =================================================================== --- core/src/main/java/org/apache/hama/util/ByteUtils.java (revision 0) +++ core/src/main/java/org/apache/hama/util/ByteUtils.java (working copy) @@ -0,0 +1,40 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hama.util; + +/** + * Utilities class for byte operations and constants + */ +public class ByteUtils { + /** Bytes used in a boolean */ + public static final int SIZE_OF_BOOLEAN = 1; + /** Bytes used in a byte */ + public static final int SIZE_OF_BYTE = 1; + /** Bytes used in a char */ + public static final int SIZE_OF_CHAR = Character.SIZE / Byte.SIZE; + /** Bytes used in a short */ + public static final int SIZE_OF_SHORT = Short.SIZE / Byte.SIZE; + /** Bytes used in an int */ + public static final int SIZE_OF_INT = Integer.SIZE / Byte.SIZE; + /** Bytes used in a long */ + public static final int SIZE_OF_LONG = Long.SIZE / Byte.SIZE; + /** Bytes used in a float */ + public static final int SIZE_OF_FLOAT = Float.SIZE / Byte.SIZE; + /** Bytes used in a double */ + public static final int SIZE_OF_DOUBLE = Double.SIZE / Byte.SIZE; +} Index: core/src/main/java/org/apache/hama/util/ExtendedDataOutput.java =================================================================== --- core/src/main/java/org/apache/hama/util/ExtendedDataOutput.java (revision 0) +++ core/src/main/java/org/apache/hama/util/ExtendedDataOutput.java (working copy) @@ -0,0 +1,84 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hama.util; + +import java.io.DataOutput; + +/** + * Add a few features to data output + */ +public interface ExtendedDataOutput extends DataOutput { + /** + * Ensure that backing byte structure has at least minSize + * additional bytes + * + * @param minSize additional size required + */ + void ensureWritable(int minSize); + + /** + * Skip some number of bytes. + * + * @param bytesToSkip Number of bytes to skip + */ + void skipBytes(int bytesToSkip); + + /** + * In order to write a size as a first part of an data output, it is + * useful to be able to write an int at an arbitrary location in the stream + * + * @param pos Byte position in the output stream + * @param value Value to write + */ + void writeInt(int pos, int value); + + /** + * Get the position in the output stream + * + * @return Position in the output stream + */ + int getPos(); + + /** + * Get the internal byte array (if possible), read-only + * + * @return Internal byte array (do not modify) + */ + byte[] getByteArray(); + + /** + * Copies the internal byte array + * + * @return Copied byte array + */ + byte[] toByteArray(); + + /** + * Return a copy of slice of byte array + * + * @param offset offset of array + * @param length length of slice + * @return byte array + */ + byte[] toByteArray(int offset, int length); + + /** + * Clears the buffer + */ + void reset(); +} Index: core/src/main/java/org/apache/hama/util/UnsafeByteArrayInputStream.java =================================================================== --- core/src/main/java/org/apache/hama/util/UnsafeByteArrayInputStream.java (revision 0) +++ core/src/main/java/org/apache/hama/util/UnsafeByteArrayInputStream.java (working copy) @@ -0,0 +1,150 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hama.util; + +import java.io.ByteArrayInputStream; +import java.io.IOException; +import java.io.InputStream; +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +/** + * This class, much like {@link ByteArrayInputStream} uses a given buffer as a + * source of an InputStream. Unlike ByteArrayInputStream, this class does not + * "waste" memory by creating a local copy of the given buffer, but rather uses + * the given buffer as is. Hence the name Unsafe. While using this class one + * should remember that the byte[] buffer memory is shared and might be changed + * from outside. + * + * For reuse-ability, a call for {@link #reInit(byte[])} can be called, and + * initialize the stream with a new buffer. + * + * @lucene.experimental + */ +public class UnsafeByteArrayInputStream extends InputStream { + private byte[] buffer; + private int markIndex; + private int upperLimit; + private int index; + /** + * Creates a new instance by not using any byte[] up front. If you use this + * constructor, you MUST call either of the {@link #reInit(byte[]) reInit} + * methods before you consume any byte from this instance.
+ * This constructor is for convenience purposes only, so that if one does not + * have the byte[] at the moment of creation, one is not forced to pass a + * new byte[0] or something. Obviously in that case, one will + * call either {@link #reInit(byte[]) reInit} methods before using the class. + */ + public UnsafeByteArrayInputStream() { + markIndex = upperLimit = index = 0; + } + /** + * Creates an UnsafeByteArrayInputStream which uses a given byte array as + * the source of the stream. Default range is [0 , buffer.length) + * + * @param buffer + * byte array used as the source of this stream + */ + public UnsafeByteArrayInputStream(byte[] buffer) { + reInit(buffer, 0, buffer.length); + } + /** + * Creates an UnsafeByteArrayInputStream which uses a given byte array as + * the source of the stream, at the specific range: [startPos, endPos) + * + * @param buffer + * byte array used as the source of this stream + * @param startPos + * first index (inclusive) to the data lying in the given buffer + * @param endPos + * an index (exclusive) where the data ends. data @ + * buffer[endPos] will never be read + */ + public UnsafeByteArrayInputStream(byte[] buffer, int startPos, int endPos) { + reInit(buffer, startPos, endPos); + } + @Override + public void mark(int readlimit) { + markIndex = index; + } + @Override + public boolean markSupported() { + return true; + } + /** + * Initialize the stream with a given buffer, using the default limits of + * [0, buffer.length) + * + * @param buffer + * byte array used as the source of this stream + */ + public void reInit(byte[] buffer) { + reInit(buffer, 0, buffer.length); + } + /** + * Initialize the stream with a given byte array as the source of the + * stream, at the specific range: [startPos, endPos) + * + * @param buffer + * byte array used as the source of this stream + * @param startPos + * first index (inclusive) to the data lying in the given buffer + * @param endPos + * an index (exclusive) where the data ends. data @ + * buffer[endPos] will never be read + */ + public void reInit(byte[] buffer, int startPos, int endPos) { + this.buffer = buffer; + markIndex = startPos; + upperLimit = endPos; + index = markIndex; + } + @Override + public int available() throws IOException { + return upperLimit - index; + } + /** + * Read a byte. Data returned as an integer [0,255] If end of stream + * reached, returns -1 + */ + @Override + public int read() throws IOException { + return index < upperLimit ? buffer[index++] & 0xff : -1; + } + /** + * Resets the stream back to its original state. Basically - moving the + * index back to start position. + */ + @Override + public void reset() throws IOException { + index = markIndex; + } +} Index: core/src/main/java/org/apache/hama/util/UnsafeByteArrayOutputStream.java =================================================================== --- core/src/main/java/org/apache/hama/util/UnsafeByteArrayOutputStream.java (revision 0) +++ core/src/main/java/org/apache/hama/util/UnsafeByteArrayOutputStream.java (working copy) @@ -0,0 +1,310 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hama.util; + +import java.io.IOException; +import java.io.OutputStream; +import java.lang.reflect.Field; +import java.util.Arrays; + +/** + * Byte array output stream that uses Unsafe methods to serialize/deserialize + * much faster + */ +public class UnsafeByteArrayOutputStream extends OutputStream + implements ExtendedDataOutput { + static { + try { + Field field = sun.misc.Unsafe.class.getDeclaredField("theUnsafe"); + field.setAccessible(true); + UNSAFE = (sun.misc.Unsafe) field.get(null); + // Checkstyle exception due to needing to check if unsafe is allowed + // CHECKSTYLE: stop IllegalCatch + } catch (Exception e) { + // CHECKSTYLE: resume IllegalCatch + throw new RuntimeException("UnsafeByteArrayOutputStream: Failed to " + + "get unsafe", e); + } + } + + /** Default number of bytes */ + private static final int DEFAULT_BYTES = 32; + /** Access to the unsafe class */ + private static final sun.misc.Unsafe UNSAFE; + + /** Offset of a byte array */ + private static final long BYTE_ARRAY_OFFSET = + UNSAFE.arrayBaseOffset(byte[].class); + + /** Byte buffer */ + private byte[] buf; + /** Position in the buffer */ + private int pos = 0; + + /** + * Constructor + */ + public UnsafeByteArrayOutputStream() { + this(DEFAULT_BYTES); + } + + /** + * Constructor + * + * @param size Initial size of the underlying byte array + */ + public UnsafeByteArrayOutputStream(int size) { + buf = new byte[size]; + } + + /** + * Constructor to take in a buffer + * + * @param buf Buffer to start with, or if null, create own buffer + */ + public UnsafeByteArrayOutputStream(byte[] buf) { + if (buf == null) { + this.buf = new byte[DEFAULT_BYTES]; + } else { + this.buf = buf; + } + } + + /** + * Constructor to take in a buffer with a given position into that buffer + * + * @param buf Buffer to start with + * @param pos Position to write at the buffer + */ + public UnsafeByteArrayOutputStream(byte[] buf, int pos) { + this(buf); + this.pos = pos; + } + + /** + * Ensure that this buffer has enough remaining space to add the size. + * Creates and copies to a new buffer if necessary + * + * @param size Size to add + */ + private void ensureSize(int size) { + if (pos + size > buf.length) { + byte[] newBuf = new byte[(buf.length + size) << 1]; + System.arraycopy(buf, 0, newBuf, 0, pos); + buf = newBuf; + } + } + + @Override + public byte[] getByteArray() { + return buf; + } + + @Override + public byte[] toByteArray() { + return Arrays.copyOf(buf, pos); + } + + @Override + public byte[] toByteArray(int offset, int length) { + if (offset + length > pos) { + throw new IndexOutOfBoundsException(String.format("Offset: %d + " + + "Length: %d exceeds the size of buf : %d", offset, length, pos)); + } + return Arrays.copyOfRange(buf, offset, length); + } + + @Override + public void reset() { + pos = 0; + } + + @Override + public int getPos() { + return pos; + } + + @Override + public void write(int b) throws IOException { + ensureSize(ByteUtils.SIZE_OF_BYTE); + buf[pos] = (byte) b; + pos += ByteUtils.SIZE_OF_BYTE; + } + + @Override + public void write(byte[] b) throws IOException { + ensureSize(b.length); + System.arraycopy(b, 0, buf, pos, b.length); + pos += b.length; + } + + @Override + public void write(byte[] b, int off, int len) throws IOException { + ensureSize(len); + System.arraycopy(b, off, buf, pos, len); + pos += len; + } + + @Override + public void writeBoolean(boolean v) throws IOException { + ensureSize(ByteUtils.SIZE_OF_BOOLEAN); + UNSAFE.putBoolean(buf, BYTE_ARRAY_OFFSET + pos, v); + pos += ByteUtils.SIZE_OF_BOOLEAN; + } + + @Override + public void writeByte(int v) throws IOException { + ensureSize(ByteUtils.SIZE_OF_BYTE); + UNSAFE.putByte(buf, BYTE_ARRAY_OFFSET + pos, (byte) v); + pos += ByteUtils.SIZE_OF_BYTE; + } + + @Override + public void writeShort(int v) throws IOException { + ensureSize(ByteUtils.SIZE_OF_SHORT); + UNSAFE.putShort(buf, BYTE_ARRAY_OFFSET + pos, (short) v); + pos += ByteUtils.SIZE_OF_SHORT; + } + + @Override + public void writeChar(int v) throws IOException { + ensureSize(ByteUtils.SIZE_OF_CHAR); + UNSAFE.putChar(buf, BYTE_ARRAY_OFFSET + pos, (char) v); + pos += ByteUtils.SIZE_OF_CHAR; + } + + @Override + public void writeInt(int v) throws IOException { + ensureSize(ByteUtils.SIZE_OF_INT); + UNSAFE.putInt(buf, BYTE_ARRAY_OFFSET + pos, v); + pos += ByteUtils.SIZE_OF_INT; + } + + @Override + public void ensureWritable(int minSize) { + if ((pos + minSize) > buf.length) { + buf = Arrays.copyOf(buf, Math.max(buf.length << 1, pos + minSize)); + } + } + + @Override + public void skipBytes(int bytesToSkip) { + ensureWritable(bytesToSkip); + pos += bytesToSkip; + } + + @Override + public void writeInt(int pos, int value) { + if (pos + ByteUtils.SIZE_OF_INT > this.pos) { + throw new IndexOutOfBoundsException( + "writeInt: Tried to write int to position " + pos + + " but current length is " + this.pos); + } + UNSAFE.putInt(buf, BYTE_ARRAY_OFFSET + pos, value); + } + + @Override + public void writeLong(long v) throws IOException { + ensureSize(ByteUtils.SIZE_OF_LONG); + UNSAFE.putLong(buf, BYTE_ARRAY_OFFSET + pos, v); + pos += ByteUtils.SIZE_OF_LONG; + } + + @Override + public void writeFloat(float v) throws IOException { + ensureSize(ByteUtils.SIZE_OF_FLOAT); + UNSAFE.putFloat(buf, BYTE_ARRAY_OFFSET + pos, v); + pos += ByteUtils.SIZE_OF_FLOAT; + } + + @Override + public void writeDouble(double v) throws IOException { + ensureSize(ByteUtils.SIZE_OF_DOUBLE); + UNSAFE.putDouble(buf, BYTE_ARRAY_OFFSET + pos, v); + pos += ByteUtils.SIZE_OF_DOUBLE; + } + + @Override + public void writeBytes(String s) throws IOException { + // Note that this code is mostly copied from DataOutputStream + int len = s.length(); + ensureSize(len); + for (int i = 0; i < len; i++) { + int v = s.charAt(i); + writeByte(v); + } + } + + @Override + public void writeChars(String s) throws IOException { + // Note that this code is mostly copied from DataOutputStream + int len = s.length(); + ensureSize(len * ByteUtils.SIZE_OF_CHAR); + for (int i = 0; i < len; i++) { + int v = s.charAt(i); + writeChar(v); + } + } + + @Override + public void writeUTF(String s) throws IOException { + // Note that this code is mostly copied from DataOutputStream + int strlen = s.length(); + int utflen = 0; + int c; + + /* use charAt instead of copying String to char array */ + for (int i = 0; i < strlen; i++) { + c = s.charAt(i); + if ((c >= 0x0001) && (c <= 0x007F)) { + utflen++; + } else if (c > 0x07FF) { + utflen += 3; + } else { + utflen += 2; + } + } + + ensureSize(utflen + ByteUtils.SIZE_OF_SHORT); + writeShort(utflen); + + int i = 0; + for (i = 0; i < strlen; i++) { + c = s.charAt(i); + if (!((c >= 0x0001) && (c <= 0x007F))) { + break; + } + buf[pos++] = (byte) c; + } + + for (; i < strlen; i++) { + c = s.charAt(i); + if ((c >= 0x0001) && (c <= 0x007F)) { + buf[pos++] = (byte) c; + + } else if (c > 0x07FF) { + buf[pos++] = (byte) (0xE0 | ((c >> 12) & 0x0F)); + buf[pos++] = (byte) (0x80 | ((c >> 6) & 0x3F)); + buf[pos++] = (byte) (0x80 | ((c >> 0) & 0x3F)); + } else { + buf[pos++] = (byte) (0xC0 | ((c >> 6) & 0x1F)); + buf[pos++] = (byte) (0x80 | ((c >> 0) & 0x3F)); + } + } + } +} Index: core/src/main/java/org/apache/hama/util/WritableUtils.java =================================================================== --- core/src/main/java/org/apache/hama/util/WritableUtils.java (revision 1681695) +++ core/src/main/java/org/apache/hama/util/WritableUtils.java (working copy) @@ -17,14 +17,12 @@ */ package org.apache.hama.util; -import java.io.ByteArrayInputStream; -import java.io.ByteArrayOutputStream; -import java.io.DataInputStream; -import java.io.DataOutput; -import java.io.DataOutputStream; -import java.io.IOException; +import java.io.*; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; import org.apache.hadoop.io.Writable; +import org.apache.hama.HamaConfiguration; public class WritableUtils { @@ -47,4 +45,24 @@ e.printStackTrace(); } } + + public static byte[] unsafeSerialize(Writable w) { + UnsafeByteArrayOutputStream out = new UnsafeByteArrayOutputStream(); + DataOutput output = new DataOutputStream(out); + try { + w.write(output); + } catch (IOException e) { + e.printStackTrace(); + } + return out.toByteArray(); + } + + public static void unsafeDeserialize(byte[] bytes, Writable obj) { + DataInputStream in = new DataInputStream(new UnsafeByteArrayInputStream(bytes)); + try { + obj.readFields(in); + } catch (IOException e) { + e.printStackTrace(); + } + } } Index: graph/src/main/java/org/apache/hama/graph/GraphJobRunner.java =================================================================== --- graph/src/main/java/org/apache/hama/graph/GraphJobRunner.java (revision 1681695) +++ graph/src/main/java/org/apache/hama/graph/GraphJobRunner.java (working copy) @@ -454,6 +454,7 @@ executor.setRejectedExecutionHandler(retryHandler); KeyValuePair next = null; + while ((next = peer.readNext()) != null) { Vertex vertex = GraphJobRunner . newVertexInstance(VERTEX_CLASS); @@ -547,7 +548,11 @@ if (peer.getPeerIndex() == partition) { addVertex(vertex); } else { - messages.get(partition).add(WritableUtils.serialize(vertex)); + if (!conf.getBoolean("hama.use.unsafeserialization", false)) { + messages.get(partition).add(WritableUtils.serialize(vertex)); + } else { + messages.get(partition).add(WritableUtils.unsafeSerialize(vertex)); + } } } catch (Exception e) { throw new RuntimeException(e); @@ -709,9 +714,17 @@ it.remove(); if (combiner != null && e.getValue().getNumOfValues() > 1) { - GraphJobMessage combined = new GraphJobMessage(e.getKey(), - WritableUtils.serialize(combiner.combine(getIterableMessages(e - .getValue().getValuesBytes(), e.getValue().getNumOfValues())))); + GraphJobMessage combined; + if (conf.getBoolean("hama.useUnsafeSerialization", false)) { + combined = new GraphJobMessage(e.getKey(), WritableUtils.unsafeSerialize( + combiner.combine(getIterableMessages(e.getValue().getValuesBytes(), + e.getValue().getNumOfValues())))); + } else { + combined = new GraphJobMessage(e.getKey(), WritableUtils.serialize( + combiner.combine(getIterableMessages(e.getValue().getValuesBytes(), + e.getValue().getNumOfValues())))); + } + combined.setFlag(GraphJobMessage.VERTEX_FLAG); peer.send(getHostName(e.getKey()), combined); } else { Index: graph/src/main/java/org/apache/hama/graph/MapVerticesInfo.java =================================================================== --- graph/src/main/java/org/apache/hama/graph/MapVerticesInfo.java (revision 1681695) +++ graph/src/main/java/org/apache/hama/graph/MapVerticesInfo.java (working copy) @@ -43,6 +43,7 @@ private final ConcurrentHashMap vertices = new ConcurrentHashMap(); private GraphJobRunner runner; + private HamaConfiguration conf; private int activeVertices = 0; @@ -50,6 +51,7 @@ public void init(GraphJobRunner runner, HamaConfiguration conf, TaskAttemptID attempt) throws IOException { this.runner = runner; + this.conf = conf; } @Override @@ -84,7 +86,12 @@ public Vertex get(V vertexID) throws IOException { Vertex v = GraphJobRunner . newVertexInstance(GraphJobRunner.VERTEX_CLASS); - WritableUtils.deserialize(vertices.get(vertexID), v); + + if (!conf.getBoolean("hama.use.unsafeserialization", false)) { + WritableUtils.deserialize(vertices.get(vertexID), v); + } else { + WritableUtils.unsafeDeserialize(vertices.get(vertexID), v); + } v.setRunner(runner); return v; @@ -106,7 +113,12 @@ public Vertex next() { Vertex v = GraphJobRunner . newVertexInstance(GraphJobRunner.VERTEX_CLASS); - WritableUtils.deserialize(it.next(), v); + if (!conf.getBoolean("hama.use.unsafeserialization", false)) { + WritableUtils.deserialize(it.next(), v); + } else { + WritableUtils.unsafeDeserialize(it.next(), v); + } + v.setRunner(runner); return v; }