Index: core/src/main/java/org/apache/hama/util/ByteUtils.java =================================================================== --- core/src/main/java/org/apache/hama/util/ByteUtils.java (revision 0) +++ core/src/main/java/org/apache/hama/util/ByteUtils.java (working copy) @@ -0,0 +1,40 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hama.util; + +/** + * Utilities class for byte operations and constants + */ +public class ByteUtils { + /** Bytes used in a boolean */ + public static final int SIZE_OF_BOOLEAN = 1; + /** Bytes used in a byte */ + public static final int SIZE_OF_BYTE = 1; + /** Bytes used in a char */ + public static final int SIZE_OF_CHAR = Character.SIZE / Byte.SIZE; + /** Bytes used in a short */ + public static final int SIZE_OF_SHORT = Short.SIZE / Byte.SIZE; + /** Bytes used in an int */ + public static final int SIZE_OF_INT = Integer.SIZE / Byte.SIZE; + /** Bytes used in a long */ + public static final int SIZE_OF_LONG = Long.SIZE / Byte.SIZE; + /** Bytes used in a float */ + public static final int SIZE_OF_FLOAT = Float.SIZE / Byte.SIZE; + /** Bytes used in a double */ + public static final int SIZE_OF_DOUBLE = Double.SIZE / Byte.SIZE; +} Index: core/src/main/java/org/apache/hama/util/ExtendedDataInput.java =================================================================== --- core/src/main/java/org/apache/hama/util/ExtendedDataInput.java (revision 0) +++ core/src/main/java/org/apache/hama/util/ExtendedDataInput.java (working copy) @@ -0,0 +1,47 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hama.util; + +import java.io.DataInput; + +/** + * Add some functionality to data input + */ +public interface ExtendedDataInput extends DataInput { + /** + * Get the position of what has been read + * + * @return How many bytes have been read? + */ + int getPos(); + + /** + * How many bytes are available? + * + * @return Bytes available + */ + int available(); + + /** + * Check if we read everything from the input + * + * @return True iff we read everything from the input + */ + boolean endOfInput(); +} Index: core/src/main/java/org/apache/hama/util/ExtendedDataOutput.java =================================================================== --- core/src/main/java/org/apache/hama/util/ExtendedDataOutput.java (revision 0) +++ core/src/main/java/org/apache/hama/util/ExtendedDataOutput.java (working copy) @@ -0,0 +1,84 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hama.util; + +import java.io.DataOutput; + +/** + * Add a few features to data output + */ +public interface ExtendedDataOutput extends DataOutput { + /** + * Ensure that backing byte structure has at least minSize + * additional bytes + * + * @param minSize additional size required + */ + void ensureWritable(int minSize); + + /** + * Skip some number of bytes. + * + * @param bytesToSkip Number of bytes to skip + */ + void skipBytes(int bytesToSkip); + + /** + * In order to write a size as a first part of an data output, it is + * useful to be able to write an int at an arbitrary location in the stream + * + * @param pos Byte position in the output stream + * @param value Value to write + */ + void writeInt(int pos, int value); + + /** + * Get the position in the output stream + * + * @return Position in the output stream + */ + int getPos(); + + /** + * Get the internal byte array (if possible), read-only + * + * @return Internal byte array (do not modify) + */ + byte[] getByteArray(); + + /** + * Copies the internal byte array + * + * @return Copied byte array + */ + byte[] toByteArray(); + + /** + * Return a copy of slice of byte array + * + * @param offset offset of array + * @param length length of slice + * @return byte array + */ + byte[] toByteArray(int offset, int length); + + /** + * Clears the buffer + */ + void reset(); +} Index: core/src/main/java/org/apache/hama/util/UnsafeArrayReads.java =================================================================== --- core/src/main/java/org/apache/hama/util/UnsafeArrayReads.java (revision 0) +++ core/src/main/java/org/apache/hama/util/UnsafeArrayReads.java (working copy) @@ -0,0 +1,195 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hama.util; + +import java.io.IOException; +import java.lang.reflect.Field; + +/** + * Byte array input stream that uses Unsafe methods to deserialize + * much faster + */ +public class UnsafeArrayReads extends UnsafeReads { + /** Access to the unsafe class */ + private static final sun.misc.Unsafe UNSAFE; + static { + try { + Field field = sun.misc.Unsafe.class.getDeclaredField("theUnsafe"); + field.setAccessible(true); + UNSAFE = (sun.misc.Unsafe) field.get(null); + // Checkstyle exception due to needing to check if unsafe is allowed + // CHECKSTYLE: stop IllegalCatch + } catch (Exception e) { + // CHECKSTYLE: resume IllegalCatch + throw new RuntimeException("UnsafeArrayReads: Failed to " + + "get unsafe", e); + } + } + /** Offset of a byte array */ + private static final long BYTE_ARRAY_OFFSET = + UNSAFE.arrayBaseOffset(byte[].class); + + /** Byte buffer */ + protected byte[] buf; + + /** + * Constructor + * + * @param buf Buffer to read from + */ + public UnsafeArrayReads(byte[] buf) { + super(buf.length); + this.buf = buf; + } + + /** + * Constructor. + * + * @param buf Buffer to read from + * @param offset Offsetin the buffer to start reading from + * @param length Max length of the buffer to read + */ + public UnsafeArrayReads(byte[] buf, int offset, int length) { + super(offset, length); + this.buf = buf; + } + + @Override + public int available() { + return (int) (bufLength - pos); + } + + @Override + public boolean endOfInput() { + return available() == 0; + } + + + @Override + public int getPos() { + return (int) pos; + } + + @Override + public void readFully(byte[] b) throws IOException { + ensureRemaining(b.length); + System.arraycopy(buf, (int) pos, b, 0, b.length); + pos += b.length; + } + + @Override + public void readFully(byte[] b, int off, int len) throws IOException { + ensureRemaining(len); + System.arraycopy(buf, (int) pos, b, off, len); + pos += len; + } + + @Override + public boolean readBoolean() throws IOException { + ensureRemaining(ByteUtils.SIZE_OF_BOOLEAN); + boolean value = UNSAFE.getBoolean(buf, + BYTE_ARRAY_OFFSET + pos); + pos += ByteUtils.SIZE_OF_BOOLEAN; + return value; + } + + @Override + public byte readByte() throws IOException { + ensureRemaining(ByteUtils.SIZE_OF_BYTE); + byte value = UNSAFE.getByte(buf, + BYTE_ARRAY_OFFSET + pos); + pos += ByteUtils.SIZE_OF_BYTE; + return value; + } + + @Override + public int readUnsignedByte() throws IOException { + return (short) (readByte() & 0xFF); + } + + @Override + public short readShort() throws IOException { + ensureRemaining(ByteUtils.SIZE_OF_SHORT); + short value = UNSAFE.getShort(buf, + BYTE_ARRAY_OFFSET + pos); + pos += ByteUtils.SIZE_OF_SHORT; + return value; + } + + @Override + public int readUnsignedShort() throws IOException { + return readShort() & 0xFFFF; + } + + @Override + public char readChar() throws IOException { + ensureRemaining(ByteUtils.SIZE_OF_CHAR); + char value = UNSAFE.getChar(buf, + BYTE_ARRAY_OFFSET + pos); + pos += ByteUtils.SIZE_OF_CHAR; + return value; + } + + @Override + public int readInt() throws IOException { + ensureRemaining(ByteUtils.SIZE_OF_INT); + int value = UNSAFE.getInt(buf, + BYTE_ARRAY_OFFSET + pos); + pos += ByteUtils.SIZE_OF_INT; + return value; + } + + @Override + public long readLong() throws IOException { + ensureRemaining(ByteUtils.SIZE_OF_LONG); + long value = UNSAFE.getLong(buf, + BYTE_ARRAY_OFFSET + pos); + pos += ByteUtils.SIZE_OF_LONG; + return value; + } + + @Override + public float readFloat() throws IOException { + ensureRemaining(ByteUtils.SIZE_OF_FLOAT); + float value = UNSAFE.getFloat(buf, + BYTE_ARRAY_OFFSET + pos); + pos += ByteUtils.SIZE_OF_FLOAT; + return value; + } + + @Override + public double readDouble() throws IOException { + ensureRemaining(ByteUtils.SIZE_OF_DOUBLE); + double value = UNSAFE.getDouble(buf, + BYTE_ARRAY_OFFSET + pos); + pos += ByteUtils.SIZE_OF_DOUBLE; + return value; + } + + /** + * Get an int at an arbitrary position in a byte[] + * + * @param buf Buffer to get the int from + * @param pos Position in the buffer to get the int from + * @return Int at the buffer position + */ + public static int getInt(byte[] buf, int pos) { + return UNSAFE.getInt(buf, + BYTE_ARRAY_OFFSET + pos); + } +} Index: core/src/main/java/org/apache/hama/util/UnsafeByteArrayInputStream.java =================================================================== --- core/src/main/java/org/apache/hama/util/UnsafeByteArrayInputStream.java (revision 0) +++ core/src/main/java/org/apache/hama/util/UnsafeByteArrayInputStream.java (working copy) @@ -0,0 +1,44 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hama.util; + +/** + * UnsafeByteArrayInputStream + */ +public class UnsafeByteArrayInputStream extends UnsafeArrayReads { + + /** + * Constructor + * + * @param buf Buffer to read from + */ + public UnsafeByteArrayInputStream(byte[] buf) { + super(buf); + } + + /** + * Constructor. + * + * @param buf Buffer to read from + * @param offset Offsetin the buffer to start reading from + * @param length Max length of the buffer to read + */ + public UnsafeByteArrayInputStream(byte[] buf, int offset, int length) { + super(buf, offset, length); + } +} Index: core/src/main/java/org/apache/hama/util/UnsafeByteArrayOutputStream.java =================================================================== --- core/src/main/java/org/apache/hama/util/UnsafeByteArrayOutputStream.java (revision 0) +++ core/src/main/java/org/apache/hama/util/UnsafeByteArrayOutputStream.java (working copy) @@ -0,0 +1,310 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hama.util; + +import java.io.IOException; +import java.io.OutputStream; +import java.lang.reflect.Field; +import java.util.Arrays; + +/** + * Byte array output stream that uses Unsafe methods to serialize/deserialize + * much faster + */ +public class UnsafeByteArrayOutputStream extends OutputStream + implements ExtendedDataOutput { + static { + try { + Field field = sun.misc.Unsafe.class.getDeclaredField("theUnsafe"); + field.setAccessible(true); + UNSAFE = (sun.misc.Unsafe) field.get(null); + // Checkstyle exception due to needing to check if unsafe is allowed + // CHECKSTYLE: stop IllegalCatch + } catch (Exception e) { + // CHECKSTYLE: resume IllegalCatch + throw new RuntimeException("UnsafeByteArrayOutputStream: Failed to " + + "get unsafe", e); + } + } + + /** Default number of bytes */ + private static final int DEFAULT_BYTES = 32; + /** Access to the unsafe class */ + private static final sun.misc.Unsafe UNSAFE; + + /** Offset of a byte array */ + private static final long BYTE_ARRAY_OFFSET = + UNSAFE.arrayBaseOffset(byte[].class); + + /** Byte buffer */ + private byte[] buf; + /** Position in the buffer */ + private int pos = 0; + + /** + * Constructor + */ + public UnsafeByteArrayOutputStream() { + this(DEFAULT_BYTES); + } + + /** + * Constructor + * + * @param size Initial size of the underlying byte array + */ + public UnsafeByteArrayOutputStream(int size) { + buf = new byte[size]; + } + + /** + * Constructor to take in a buffer + * + * @param buf Buffer to start with, or if null, create own buffer + */ + public UnsafeByteArrayOutputStream(byte[] buf) { + if (buf == null) { + this.buf = new byte[DEFAULT_BYTES]; + } else { + this.buf = buf; + } + } + + /** + * Constructor to take in a buffer with a given position into that buffer + * + * @param buf Buffer to start with + * @param pos Position to write at the buffer + */ + public UnsafeByteArrayOutputStream(byte[] buf, int pos) { + this(buf); + this.pos = pos; + } + + /** + * Ensure that this buffer has enough remaining space to add the size. + * Creates and copies to a new buffer if necessary + * + * @param size Size to add + */ + private void ensureSize(int size) { + if (pos + size > buf.length) { + byte[] newBuf = new byte[(buf.length + size) << 1]; + System.arraycopy(buf, 0, newBuf, 0, pos); + buf = newBuf; + } + } + + @Override + public byte[] getByteArray() { + return buf; + } + + @Override + public byte[] toByteArray() { + return Arrays.copyOf(buf, pos); + } + + @Override + public byte[] toByteArray(int offset, int length) { + if (offset + length > pos) { + throw new IndexOutOfBoundsException(String.format("Offset: %d + " + + "Length: %d exceeds the size of buf : %d", offset, length, pos)); + } + return Arrays.copyOfRange(buf, offset, length); + } + + @Override + public void reset() { + pos = 0; + } + + @Override + public int getPos() { + return pos; + } + + @Override + public void write(int b) throws IOException { + ensureSize(ByteUtils.SIZE_OF_BYTE); + buf[pos] = (byte) b; + pos += ByteUtils.SIZE_OF_BYTE; + } + + @Override + public void write(byte[] b) throws IOException { + ensureSize(b.length); + System.arraycopy(b, 0, buf, pos, b.length); + pos += b.length; + } + + @Override + public void write(byte[] b, int off, int len) throws IOException { + ensureSize(len); + System.arraycopy(b, off, buf, pos, len); + pos += len; + } + + @Override + public void writeBoolean(boolean v) throws IOException { + ensureSize(ByteUtils.SIZE_OF_BOOLEAN); + UNSAFE.putBoolean(buf, BYTE_ARRAY_OFFSET + pos, v); + pos += ByteUtils.SIZE_OF_BOOLEAN; + } + + @Override + public void writeByte(int v) throws IOException { + ensureSize(ByteUtils.SIZE_OF_BYTE); + UNSAFE.putByte(buf, BYTE_ARRAY_OFFSET + pos, (byte) v); + pos += ByteUtils.SIZE_OF_BYTE; + } + + @Override + public void writeShort(int v) throws IOException { + ensureSize(ByteUtils.SIZE_OF_SHORT); + UNSAFE.putShort(buf, BYTE_ARRAY_OFFSET + pos, (short) v); + pos += ByteUtils.SIZE_OF_SHORT; + } + + @Override + public void writeChar(int v) throws IOException { + ensureSize(ByteUtils.SIZE_OF_CHAR); + UNSAFE.putChar(buf, BYTE_ARRAY_OFFSET + pos, (char) v); + pos += ByteUtils.SIZE_OF_CHAR; + } + + @Override + public void writeInt(int v) throws IOException { + ensureSize(ByteUtils.SIZE_OF_INT); + UNSAFE.putInt(buf, BYTE_ARRAY_OFFSET + pos, v); + pos += ByteUtils.SIZE_OF_INT; + } + + @Override + public void ensureWritable(int minSize) { + if ((pos + minSize) > buf.length) { + buf = Arrays.copyOf(buf, Math.max(buf.length << 1, pos + minSize)); + } + } + + @Override + public void skipBytes(int bytesToSkip) { + ensureWritable(bytesToSkip); + pos += bytesToSkip; + } + + @Override + public void writeInt(int pos, int value) { + if (pos + ByteUtils.SIZE_OF_INT > this.pos) { + throw new IndexOutOfBoundsException( + "writeInt: Tried to write int to position " + pos + + " but current length is " + this.pos); + } + UNSAFE.putInt(buf, BYTE_ARRAY_OFFSET + pos, value); + } + + @Override + public void writeLong(long v) throws IOException { + ensureSize(ByteUtils.SIZE_OF_LONG); + UNSAFE.putLong(buf, BYTE_ARRAY_OFFSET + pos, v); + pos += ByteUtils.SIZE_OF_LONG; + } + + @Override + public void writeFloat(float v) throws IOException { + ensureSize(ByteUtils.SIZE_OF_FLOAT); + UNSAFE.putFloat(buf, BYTE_ARRAY_OFFSET + pos, v); + pos += ByteUtils.SIZE_OF_FLOAT; + } + + @Override + public void writeDouble(double v) throws IOException { + ensureSize(ByteUtils.SIZE_OF_DOUBLE); + UNSAFE.putDouble(buf, BYTE_ARRAY_OFFSET + pos, v); + pos += ByteUtils.SIZE_OF_DOUBLE; + } + + @Override + public void writeBytes(String s) throws IOException { + // Note that this code is mostly copied from DataOutputStream + int len = s.length(); + ensureSize(len); + for (int i = 0; i < len; i++) { + int v = s.charAt(i); + writeByte(v); + } + } + + @Override + public void writeChars(String s) throws IOException { + // Note that this code is mostly copied from DataOutputStream + int len = s.length(); + ensureSize(len * ByteUtils.SIZE_OF_CHAR); + for (int i = 0; i < len; i++) { + int v = s.charAt(i); + writeChar(v); + } + } + + @Override + public void writeUTF(String s) throws IOException { + // Note that this code is mostly copied from DataOutputStream + int strlen = s.length(); + int utflen = 0; + int c; + + /* use charAt instead of copying String to char array */ + for (int i = 0; i < strlen; i++) { + c = s.charAt(i); + if ((c >= 0x0001) && (c <= 0x007F)) { + utflen++; + } else if (c > 0x07FF) { + utflen += 3; + } else { + utflen += 2; + } + } + + ensureSize(utflen + ByteUtils.SIZE_OF_SHORT); + writeShort(utflen); + + int i = 0; + for (i = 0; i < strlen; i++) { + c = s.charAt(i); + if (!((c >= 0x0001) && (c <= 0x007F))) { + break; + } + buf[pos++] = (byte) c; + } + + for (; i < strlen; i++) { + c = s.charAt(i); + if ((c >= 0x0001) && (c <= 0x007F)) { + buf[pos++] = (byte) c; + + } else if (c > 0x07FF) { + buf[pos++] = (byte) (0xE0 | ((c >> 12) & 0x0F)); + buf[pos++] = (byte) (0x80 | ((c >> 6) & 0x3F)); + buf[pos++] = (byte) (0x80 | ((c >> 0) & 0x3F)); + } else { + buf[pos++] = (byte) (0xC0 | ((c >> 6) & 0x1F)); + buf[pos++] = (byte) (0x80 | ((c >> 0) & 0x3F)); + } + } + } +} Index: core/src/main/java/org/apache/hama/util/UnsafeReads.java =================================================================== --- core/src/main/java/org/apache/hama/util/UnsafeReads.java (revision 0) +++ core/src/main/java/org/apache/hama/util/UnsafeReads.java (working copy) @@ -0,0 +1,208 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hama.util; + +import java.io.IOException; +import java.io.UTFDataFormatException; + +/** + * Byte array input stream that uses Unsafe methods to deserialize + * much faster + */ +public abstract class UnsafeReads implements ExtendedDataInput { + + /** Buffer length */ + protected int bufLength; + /** Position in the buffer */ + protected long pos = 0; + + /** + * Constructor + * + * @param length buf length + */ + public UnsafeReads(int length) { + bufLength = length; + } + + /** + * Constructor with offset + * + * @param offset offset in memory + * @param length buf length + */ + public UnsafeReads(long offset, int length) { + pos = offset; + bufLength = length; + } + + /** + * How many bytes are still available? + * + * @return Number of bytes available + */ + public abstract int available(); + + /** + * What position in the stream? + * + * @return Position + */ + public abstract int getPos(); + + /** + * Check whether there are enough remaining bytes for an operation + * + * @param requiredBytes Bytes required to read + * @throws IOException When there are not enough bytes to read + */ + protected void ensureRemaining(int requiredBytes) throws IOException { + if (available() < requiredBytes) { + throw new IOException("ensureRemaining: Only " + available() + + " bytes remaining, trying to read " + requiredBytes); + } + } + + @Override + public int skipBytes(int n) throws IOException { + ensureRemaining(n); + pos += n; + return n; + } + + @Override + public String readLine() throws IOException { + // Note that this code is mostly copied from DataInputStream + char[] tmpBuf = new char[128]; + + int room = tmpBuf.length; + int offset = 0; + int c; + + loop: + while (true) { + c = readByte(); + switch (c) { + case -1: + case '\n': + break loop; + case '\r': + int c2 = readByte(); + if ((c2 != '\n') && (c2 != -1)) { + pos -= 1; + } + break loop; + default: + if (--room < 0) { + char[] replacebuf = new char[offset + 128]; + room = replacebuf.length - offset - 1; + System.arraycopy(tmpBuf, 0, replacebuf, 0, offset); + tmpBuf = replacebuf; + } + tmpBuf[offset++] = (char) c; + break; + } + } + if ((c == -1) && (offset == 0)) { + return null; + } + return String.copyValueOf(tmpBuf, 0, offset); + } + + @Override + public String readUTF() throws IOException { + // Note that this code is mostly copied from DataInputStream + int utflen = readUnsignedShort(); + + byte[] bytearr = new byte[utflen]; + char[] chararr = new char[utflen]; + + int c; + int char2; + int char3; + int count = 0; + int chararrCount = 0; + + readFully(bytearr, 0, utflen); + + while (count < utflen) { + c = (int) bytearr[count] & 0xff; + if (c > 127) { + break; + } + count++; + chararr[chararrCount++] = (char) c; + } + + while (count < utflen) { + c = (int) bytearr[count] & 0xff; + switch (c >> 4) { + case 0: + case 1: + case 2: + case 3: + case 4: + case 5: + case 6: + case 7: + /* 0xxxxxxx */ + count++; + chararr[chararrCount++] = (char) c; + break; + case 12: + case 13: + /* 110x xxxx 10xx xxxx*/ + count += 2; + if (count > utflen) { + throw new UTFDataFormatException( + "malformed input: partial character at end"); + } + char2 = (int) bytearr[count - 1]; + if ((char2 & 0xC0) != 0x80) { + throw new UTFDataFormatException( + "malformed input around byte " + count); + } + chararr[chararrCount++] = (char) (((c & 0x1F) << 6) | + (char2 & 0x3F)); + break; + case 14: + /* 1110 xxxx 10xx xxxx 10xx xxxx */ + count += 3; + if (count > utflen) { + throw new UTFDataFormatException( + "malformed input: partial character at end"); + } + char2 = (int) bytearr[count - 2]; + char3 = (int) bytearr[count - 1]; + if (((char2 & 0xC0) != 0x80) || ((char3 & 0xC0) != 0x80)) { + throw new UTFDataFormatException( + "malformed input around byte " + (count - 1)); + } + chararr[chararrCount++] = (char) (((c & 0x0F) << 12) | + ((char2 & 0x3F) << 6) | ((char3 & 0x3F) << 0)); + break; + default: + /* 10xx xxxx, 1111 xxxx */ + throw new UTFDataFormatException( + "malformed input around byte " + count); + } + } + // The number of chars produced may be less than utflen + return new String(chararr, 0, chararrCount); + } +} Index: core/src/main/java/org/apache/hama/util/WritableUtils.java =================================================================== --- core/src/main/java/org/apache/hama/util/WritableUtils.java (revision 1679302) +++ core/src/main/java/org/apache/hama/util/WritableUtils.java (working copy) @@ -17,12 +17,7 @@ */ package org.apache.hama.util; -import java.io.ByteArrayInputStream; -import java.io.ByteArrayOutputStream; -import java.io.DataInputStream; -import java.io.DataOutput; -import java.io.DataOutputStream; -import java.io.IOException; +import java.io.*; import org.apache.hadoop.io.Writable; @@ -29,7 +24,7 @@ public class WritableUtils { public static byte[] serialize(Writable w) { - ByteArrayOutputStream out = new ByteArrayOutputStream(); + UnsafeByteArrayOutputStream out = new UnsafeByteArrayOutputStream(); DataOutput output = new DataOutputStream(out); try { w.write(output); @@ -36,13 +31,26 @@ } catch (IOException e) { e.printStackTrace(); } + return out.toByteArray(); +// ByteArrayOutputStream out = new ByteArrayOutputStream(); +// DataOutput output = new DataOutputStream(out); +// try { +// w.write(output); +// } catch (IOException e) { +// e.printStackTrace(); +// } +// return out.toByteArray(); } public static void deserialize(byte[] bytes, Writable obj) { - DataInputStream in = new DataInputStream(new ByteArrayInputStream(bytes)); +// DataInputStream in = new DataInputStream( +// new UnsafeByteArrayInputStream(bytes)); + +// DataInputStream in = new DataInputStream(new ByteArrayInputStream(bytes)); try { - obj.readFields(in); + obj.readFields(new UnsafeByteArrayInputStream(bytes)); + //obj.readFields(in); } catch (IOException e) { e.printStackTrace(); } Index: graph/src/main/java/org/apache/hama/graph/GraphJobMessage.java =================================================================== --- graph/src/main/java/org/apache/hama/graph/GraphJobMessage.java (revision 1679302) +++ graph/src/main/java/org/apache/hama/graph/GraphJobMessage.java (working copy) @@ -28,6 +28,7 @@ import org.apache.hadoop.io.WritableComparable; import org.apache.hadoop.io.WritableComparator; import org.apache.hadoop.util.ReflectionUtils; +import org.apache.hama.util.UnsafeByteArrayOutputStream; /** * A message that is either MapWritable (for meta communication purposes) or a @@ -52,7 +53,8 @@ private int numOfValues = 0; - private final ByteArrayOutputStream byteBuffer = new ByteArrayOutputStream(); + //private final ByteArrayOutputStream byteBuffer = new ByteArrayOutputStream(); + private final UnsafeByteArrayOutputStream byteBuffer = new UnsafeByteArrayOutputStream(); static { if (comparator == null) { @@ -137,7 +139,8 @@ vertexId.write(out); out.writeInt(numOfValues); - out.writeInt(byteBuffer.size()); + out.writeInt(byteBuffer.getPos()); + //out.writeInt(byteBuffer.size()); out.write(byteBuffer.toByteArray()); } else if (isMapMessage()) { map.write(out); @@ -145,7 +148,8 @@ integerMessage.write(out); } else if (isPartitioningMessage()) { out.writeInt(numOfValues); - out.writeInt(byteBuffer.size()); + out.writeInt(byteBuffer.getPos()); + //out.writeInt(byteBuffer.size()); out.write(byteBuffer.toByteArray()); } else { vertexId.write(out); Index: graph/src/main/java/org/apache/hama/graph/GraphJobRunner.java =================================================================== --- graph/src/main/java/org/apache/hama/graph/GraphJobRunner.java (revision 1679302) +++ graph/src/main/java/org/apache/hama/graph/GraphJobRunner.java (working copy) @@ -48,6 +48,7 @@ import org.apache.hama.bsp.sync.SyncException; import org.apache.hama.commons.util.KeyValuePair; import org.apache.hama.util.ReflectionUtils; +import org.apache.hama.util.UnsafeByteArrayInputStream; import org.apache.hama.util.WritableUtils; /** @@ -508,13 +509,14 @@ @Override public void run() { - ByteArrayInputStream bis = new ByteArrayInputStream(msg.getValuesBytes()); - DataInputStream dis = new DataInputStream(bis); + //ByteArrayInputStream bis = new ByteArrayInputStream(msg.getValuesBytes()); + UnsafeByteArrayInputStream bis = new UnsafeByteArrayInputStream(msg.getValuesBytes()); + //DataInputStream dis = new DataInputStream(bis); for (int i = 0; i < msg.getNumOfValues(); i++) { try { Vertex vertex = newVertexInstance(VERTEX_CLASS); - vertex.readFields(dis); + vertex.readFields(bis); addVertex(vertex); } catch (IOException e) { @@ -689,8 +691,8 @@ if (!storage.containsKey(vertexID)) { // To save bit memory we don't set vertexID twice storage.putIfAbsent(vertexID, new GraphJobMessage()); + storage.get(vertexID).add(msg); } - storage.get(vertexID).add(msg); } public void finishSuperstep() throws IOException { @@ -727,8 +729,9 @@ @Override public Iterator iterator() { return new Iterator() { - ByteArrayInputStream bis = new ByteArrayInputStream(valuesBytes); - DataInputStream dis = new DataInputStream(bis); + //ByteArrayInputStream bis = new ByteArrayInputStream(valuesBytes); + UnsafeByteArrayInputStream bis = new UnsafeByteArrayInputStream(valuesBytes); + //DataInputStream dis = new DataInputStream(bis); int index = 0; @Override @@ -740,7 +743,8 @@ public Writable next() { Writable v = createVertexValue(); try { - v.readFields(dis); + //v.readFields(dis); + v.readFields(bis); } catch (IOException e) { e.printStackTrace(); }