Index: core/src/main/java/org/apache/hama/util/UnsafeByteArrayInputStream.java =================================================================== --- core/src/main/java/org/apache/hama/util/UnsafeByteArrayInputStream.java (revision 0) +++ core/src/main/java/org/apache/hama/util/UnsafeByteArrayInputStream.java (working copy) @@ -0,0 +1,321 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hama.util; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import sun.misc.Unsafe; + +import java.io.DataInput; +import java.io.IOException; +import java.io.UTFDataFormatException; +import java.lang.reflect.Field; + +public class UnsafeByteArrayInputStream implements DataInput { + public static final Log LOG = LogFactory.getLog(UnsafeByteArrayInputStream.class); + + private static final Unsafe UNSAFE; + static { + try { + Field field = Unsafe.class.getDeclaredField("theUnsafe"); + field.setAccessible(true); + UNSAFE = (Unsafe)field.get(null); + } catch (Exception e) { + throw new RuntimeException("UnSafeByteArrayInputStream: Failed to " + + "get unsafe" + e); + } + } + + /** Offset of a byte array */ + private static final long BYTE_ARRAY_OFFSET = + UNSAFE.arrayBaseOffset(byte[].class); + /** Offset of a long array */ + private static final long LONG_ARRAY_OFFSET = + UNSAFE.arrayBaseOffset(long[].class); + /** Offset of a double array */ + private static final long DOUBLE_ARRAY_OFFSET = + UNSAFE.arrayBaseOffset(double[].class); + + private final byte[] buf; + private final int bufLength; + private int pos = 0; + + public UnsafeByteArrayInputStream(byte[] buf) { + this.buf = buf; + this.bufLength = buf.length; + } + + public UnsafeByteArrayInputStream(byte[] buf, int offset, int length) { + this.buf = buf; + this.pos = offset; + this.bufLength = length; + } + + /** + * Available bytes in the stream. + * + * @return The number of bytes available + */ + public int isAvailable() { + return bufLength - pos; + } + + /** + * Current position in the stream. + * + * @return Position + */ + public int getPos() { + return pos; + } + + /** + * Check enough remaining bytes for an operation. + * + * @param requiredBytes Bytes required to read + * @throws IOException When there are not enough bytes to read + */ + private void checkRemaining(int requiredBytes) throws IOException { + if (bufLength - pos < requiredBytes) { + throw new IOException("Only " + (bufLength - pos) + + "bytes remaining, trying to read " + requiredBytes); + } + } + + @Override + public void readFully(byte[] bytes) throws IOException { + checkRemaining(bytes.length); + System.arraycopy(buf, pos, bytes, 0, bytes.length); + pos += bytes.length; + } + + @Override + public void readFully(byte[] bytes, int off, int len) throws IOException { + checkRemaining(len); + System.arraycopy(buf, pos, bytes, off, len); + pos += len; + } + + @Override + public int skipBytes(int i) throws IOException { + checkRemaining(i); + pos += i; + return i; + } + + @Override + public boolean readBoolean() throws IOException { + checkRemaining(UnsafeConstants.SIZE_OF_BOOLEAN); + boolean value = UNSAFE.getBoolean(buf, BYTE_ARRAY_OFFSET + pos); + pos += UnsafeConstants.SIZE_OF_BOOLEAN; + + return value; + } + + @Override + public byte readByte() throws IOException { + checkRemaining(UnsafeConstants.SIZE_OF_BYTE); + byte value = UNSAFE.getByte(buf, BYTE_ARRAY_OFFSET + pos); + pos += UnsafeConstants.SIZE_OF_BYTE; + + return value; + } + + @Override + public int readUnsignedByte() throws IOException { + return (short)(readByte() & 0xFF); + } + + @Override + public short readShort() throws IOException { + checkRemaining(UnsafeConstants.SIZE_OF_SHORT); + short value = UNSAFE.getShort(buf, BYTE_ARRAY_OFFSET + pos); + pos += UnsafeConstants.SIZE_OF_SHORT; + + return value; + } + + @Override + public int readUnsignedShort() throws IOException { + return readShort() & 0xFFFF; + } + + @Override + public char readChar() throws IOException { + checkRemaining(UnsafeConstants.SIZE_OF_CHAR); + char value = UNSAFE.getChar(buf, BYTE_ARRAY_OFFSET + pos); + pos += UnsafeConstants.SIZE_OF_CHAR; + + return value; + } + + @Override + public int readInt() throws IOException { + checkRemaining(UnsafeConstants.SIZE_OF_INT); + int value = UNSAFE.getInt(buf, BYTE_ARRAY_OFFSET + pos); + pos += UnsafeConstants.SIZE_OF_INT; + + return value; + } + + @Override + public long readLong() throws IOException { + checkRemaining(UnsafeConstants.SIZE_OF_LONG); + long value = UNSAFE.getLong(buf, BYTE_ARRAY_OFFSET + pos); + pos += UnsafeConstants.SIZE_OF_LONG; + + return value; + } + + @Override + public float readFloat() throws IOException { + checkRemaining(UnsafeConstants.SIZE_OF_FLOAT); + float value = UNSAFE.getFloat(buf, BYTE_ARRAY_OFFSET + pos); + pos += UnsafeConstants.SIZE_OF_FLOAT; + + return value; + } + + @Override + public double readDouble() throws IOException { + checkRemaining(UnsafeConstants.SIZE_OF_DOUBLE); + double value = UNSAFE.getDouble(buf, BYTE_ARRAY_OFFSET + pos); + pos += UnsafeConstants.SIZE_OF_DOUBLE; + + return value; + } + + @Override + public String readLine() throws IOException { + char[] tmpBuf = new char[128]; + + int room = tmpBuf.length; + int offset = 0; + int c; + +loop: + while (true) { + c = readByte(); + switch (c) { + case -1: + case '\n': + break loop; + case '\r': + int c2 = readByte(); + if ((c2 != '\n') && (c2 != -1)) + pos -= 1; + break loop; + default: + if (--room < 0) { + char[] replaceBuf = new char[offset + 128]; + room = replaceBuf.length - offset - 1; + System.arraycopy(tmpBuf, 0, replaceBuf, 0, offset); + tmpBuf = replaceBuf; + } + tmpBuf[offset++] = (char)c; + break; + } + } + if ((c == -1) && (offset == 0)) + return null; + + return String.copyValueOf(tmpBuf, 0, offset); + } + + @Override + public String readUTF() throws IOException { + // Note that this code is mostly copied from DataInputStream + int utflen = readUnsignedShort(); + + byte[] bytearr = new byte[utflen]; + char[] chararr = new char[utflen]; + + int c; + int char2; + int char3; + int count = 0; + int chararrCount = 0; + + readFully(bytearr, 0, utflen); + + while (count < utflen) { + c = (int) bytearr[count] & 0xff; + if (c > 127) { + break; + } + count++; + chararr[chararrCount++] = (char) c; + } + + while (count < utflen) { + c = (int) bytearr[count] & 0xff; + switch (c >> 4) { + case 0: + case 1: + case 2: + case 3: + case 4: + case 5: + case 6: + case 7: + /* 0xxxxxxx */ + count++; + chararr[chararrCount++] = (char) c; + break; + case 12: + case 13: + /* 110x xxxx 10xx xxxx*/ + count += 2; + if (count > utflen) { + throw new UTFDataFormatException( + "malformed input: partial character at end"); + } + char2 = (int) bytearr[count - 1]; + if ((char2 & 0xC0) != 0x80) { + throw new UTFDataFormatException( + "malformed input around byte " + count); + } + chararr[chararrCount++] = (char) (((c & 0x1F) << 6) | + (char2 & 0x3F)); + break; + case 14: + /* 1110 xxxx 10xx xxxx 10xx xxxx */ + count += 3; + if (count > utflen) { + throw new UTFDataFormatException( + "malformed input: partial character at end"); + } + char2 = (int) bytearr[count - 2]; + char3 = (int) bytearr[count - 1]; + if (((char2 & 0xC0) != 0x80) || ((char3 & 0xC0) != 0x80)) { + throw new UTFDataFormatException( + "malformed input around byte " + (count - 1)); + } + chararr[chararrCount++] = (char) (((c & 0x0F) << 12) | + ((char2 & 0x3F) << 6) | ((char3 & 0x3F) << 0)); + break; + default: + /* 10xx xxxx, 1111 xxxx */ + throw new UTFDataFormatException( + "malformed input around byte " + count); + } + } + // The number of chars produced may be less than utflen + return new String(chararr, 0, chararrCount); + } +} Index: core/src/main/java/org/apache/hama/util/UnsafeByteArrayOutputStream.java =================================================================== --- core/src/main/java/org/apache/hama/util/UnsafeByteArrayOutputStream.java (revision 0) +++ core/src/main/java/org/apache/hama/util/UnsafeByteArrayOutputStream.java (working copy) @@ -0,0 +1,280 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hama.util; + +import sun.misc.Unsafe; + +import java.io.DataOutput; +import java.io.IOException; +import java.io.OutputStream; +import java.lang.reflect.Field; +import java.util.Arrays; + +public class UnsafeByteArrayOutputStream extends OutputStream + implements DataOutput { + + private static final Unsafe UNSAFE; + static { + try { + Field field = Unsafe.class.getDeclaredField("theUnsafe"); + field.setAccessible(true); + UNSAFE = (sun.misc.Unsafe)field.get(null); + } catch (Exception e) { + throw new RuntimeException("Failed to " + + "get unsafe", e); + } + } + + /** Offset of a byte array */ + private static final long BYTE_ARRAY_OFFSET = + UNSAFE.arrayBaseOffset(byte[].class); + /** Offset of a long array */ + private static final long LONG_ARRAY_OFFSET = + UNSAFE.arrayBaseOffset(long[].class); + /** Offset of a double array */ + private static final long DOUBLE_ARRAY_OFFSET = + UNSAFE.arrayBaseOffset(double[].class); + + /** Default number of bytes */ + public static final int DEFAULT_BYTES = 32; + + private byte[] buf; + /** Position in the buffer */ + private int pos = 0; + + public UnsafeByteArrayOutputStream() { + this(DEFAULT_BYTES); + } + + public UnsafeByteArrayOutputStream(int size) { + buf = new byte[size]; + } + + public UnsafeByteArrayOutputStream(byte[] buf) { + if (buf == null) { + this.buf = new byte[DEFAULT_BYTES]; + } else { + this.buf = buf; + } + } + + public UnsafeByteArrayOutputStream(byte[] buf, int pos) { + this(buf); + this.pos = pos; + } + + /** + * Check that this buffer has enough remaining space to add the size. + * + * @param size Size to add + */ + private void checkSize(int size) { + if (pos + size > buf.length) { + byte[] newBuf = new byte[(buf.length + size) << 1]; + System.arraycopy(buf, 0, newBuf, 0, pos); + buf = newBuf; + } + } + + public byte[] getByteArray() { + return buf; + } + + public byte[] toByteArray() { + return Arrays.copyOf(buf, pos); + } + + public byte[] toByteArray(int offset, int length) { + if (offset + length > pos) { + throw new IndexOutOfBoundsException(String.format("Offset: %d + " + + "Length: %d exceeds the size of buf : %d", offset, length, pos)); + } + return Arrays.copyOfRange(buf, offset, length); + } + + public void reset() { + pos = 0; + } + + public int getPos() { + return pos; + } + + public void checkWritable(int minSize) { + if ((pos + minSize) > buf.length) { + buf = Arrays.copyOf(buf, Math.max(buf.length << 1, pos + minSize)); + } + } + + public void skipBytes(int bytesToSkip) { + checkWritable(bytesToSkip); + pos += bytesToSkip; + } + + @Override + public void writeBoolean(boolean b) throws IOException { + checkSize(UnsafeConstants.SIZE_OF_BOOLEAN); + UNSAFE.putBoolean(buf, BYTE_ARRAY_OFFSET + pos, b); + pos += UnsafeConstants.SIZE_OF_BOOLEAN; + } + + @Override + public void writeByte(int i) throws IOException { + checkSize(UnsafeConstants.SIZE_OF_BYTE); + UNSAFE.putByte(buf, BYTE_ARRAY_OFFSET + pos, (byte) i); + pos += UnsafeConstants.SIZE_OF_BYTE; + } + + @Override + public void writeShort(int i) throws IOException { + checkSize(UnsafeConstants.SIZE_OF_SHORT); + UNSAFE.putShort(buf, BYTE_ARRAY_OFFSET + pos, (short) i); + pos += UnsafeConstants.SIZE_OF_SHORT; + } + + @Override + public void writeChar(int i) throws IOException { + checkSize(UnsafeConstants.SIZE_OF_CHAR); + UNSAFE.putChar(buf, BYTE_ARRAY_OFFSET + pos, (char) i); + pos += UnsafeConstants.SIZE_OF_CHAR; + } + + @Override + public void writeInt(int i) throws IOException { + checkSize(UnsafeConstants.SIZE_OF_INT); + UNSAFE.putInt(buf, BYTE_ARRAY_OFFSET + pos, i); + pos += UnsafeConstants.SIZE_OF_INT; + } + + public void writeInt(int pos, int value) { + if (pos + UnsafeConstants.SIZE_OF_INT > this.pos) { + throw new IndexOutOfBoundsException( + "writeInt: Tried to write int to position " + pos + + " but current length is " + this.pos); + } + UNSAFE.putInt(buf, BYTE_ARRAY_OFFSET + pos, value); + } + + @Override + public void writeLong(long l) throws IOException { + checkSize(UnsafeConstants.SIZE_OF_LONG); + UNSAFE.putLong(buf, BYTE_ARRAY_OFFSET + pos, l); + pos += UnsafeConstants.SIZE_OF_LONG; + } + + @Override + public void writeFloat(float v) throws IOException { + checkSize(UnsafeConstants.SIZE_OF_FLOAT); + UNSAFE.putFloat(buf, BYTE_ARRAY_OFFSET + pos, v); + pos += UnsafeConstants.SIZE_OF_FLOAT; + } + + @Override + public void writeDouble(double v) throws IOException { + checkSize(UnsafeConstants.SIZE_OF_DOUBLE); + UNSAFE.putDouble(buf, BYTE_ARRAY_OFFSET + pos, v); + pos += UnsafeConstants.SIZE_OF_DOUBLE; + } + + @Override + public void writeBytes(String s) throws IOException { + int len = s.length(); + checkSize(len); + for (int i = 0; i < len; i++) { + int v = s.charAt(i); + writeByte(v); + } + } + + @Override + public void writeChars(String s) throws IOException { + int len = s.length(); + checkSize(len * UnsafeConstants.SIZE_OF_CHAR); + for (int i = 0; i < len; i++) { + int v = s.charAt(i); + writeChar(v); + } + } + + @Override + public void writeUTF(String s) throws IOException { + int strlen = s.length(); + int utflen = 0; + int c; + + /* use charAt instead of copying String to char array */ + for (int i = 0; i < strlen; i++) { + c = s.charAt(i); + if ((c >= 0x0001) && (c <= 0x007F)) { + utflen++; + } else if (c > 0x07FF) { + utflen += 3; + } else { + utflen += 2; + } + } + + checkSize(utflen + UnsafeConstants.SIZE_OF_SHORT); + writeShort(utflen); + + int i = 0; + for (i = 0; i < strlen; i++) { + c = s.charAt(i); + if (!((c >= 0x0001) && (c <= 0x007F))) { + break; + } + buf[pos++] = (byte) c; + } + + for (; i < strlen; i++) { + c = s.charAt(i); + if ((c >= 0x0001) && (c <= 0x007F)) { + buf[pos++] = (byte) c; + + } else if (c > 0x07FF) { + buf[pos++] = (byte) (0xE0 | ((c >> 12) & 0x0F)); + buf[pos++] = (byte) (0x80 | ((c >> 6) & 0x3F)); + buf[pos++] = (byte) (0x80 | ((c >> 0) & 0x3F)); + } else { + buf[pos++] = (byte) (0xC0 | ((c >> 6) & 0x1F)); + buf[pos++] = (byte) (0x80 | ((c >> 0) & 0x3F)); + } + } + } + + @Override + public void write(int i) throws IOException { + checkSize(UnsafeConstants.SIZE_OF_BYTE); + buf[pos] = (byte)i; + pos += UnsafeConstants.SIZE_OF_BYTE; + } + + @Override + public void write(byte[] b) throws IOException { + checkSize(b.length); + System.arraycopy(b, 0, buf, pos, b.length); + pos += b.length; + } + + @Override + public void write(byte[] b, int off, int len) throws IOException { + checkSize(len); + System.arraycopy(b, off, buf, pos, len); + pos += len; + } +} Index: core/src/main/java/org/apache/hama/util/UnsafeConstants.java =================================================================== --- core/src/main/java/org/apache/hama/util/UnsafeConstants.java (revision 0) +++ core/src/main/java/org/apache/hama/util/UnsafeConstants.java (working copy) @@ -0,0 +1,39 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hama.util; + +public class UnsafeConstants { + /** Bytes used in a boolean */ + public static final int SIZE_OF_BOOLEAN = 1; + /** Bytes used in a byte */ + public static final int SIZE_OF_BYTE = 1; + /** Bytes used in a char */ + public static final int SIZE_OF_CHAR = 2; + /** Bytes used in a short */ + public static final int SIZE_OF_SHORT = 2; + /** Bytes used in a medium */ + public static final int SIZE_OF_MEDIUM = 3; + /** Bytes used in an int */ + public static final int SIZE_OF_INT = 4; + /** Bytes used in a float */ + public static final int SIZE_OF_FLOAT = 4; + /** Bytes used in a long */ + public static final int SIZE_OF_LONG = 8; + /** Bytes used in a double */ + public static final int SIZE_OF_DOUBLE = 8; +}