Index: core/src/main/java/org/apache/hama/util/UnsafeByteArrayInputStream.java
===================================================================
--- core/src/main/java/org/apache/hama/util/UnsafeByteArrayInputStream.java (revision 0)
+++ core/src/main/java/org/apache/hama/util/UnsafeByteArrayInputStream.java (working copy)
@@ -0,0 +1,128 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hama.util;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import sun.misc.Unsafe;
+
+import java.io.DataInput;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.UTFDataFormatException;
+import java.lang.reflect.Field;
+
+public class UnsafeByteArrayInputStream extends InputStream {
+ private byte[] buffer;
+ private int markIndex;
+ private int upperLimit;
+ private int index;
+ /**
+ * Creates a new instance by not using any byte[] up front. If you use this
+ * constructor, you MUST call either of the {@link #reInit(byte[]) reInit}
+ * methods before you consume any byte from this instance.
+ * This constructor is for convenience purposes only, so that if one does not
+ * have the byte[] at the moment of creation, one is not forced to pass a
+ * new byte[0] or something. Obviously in that case, one will
+ * call either {@link #reInit(byte[]) reInit} methods before using the class.
+ */
+ public UnsafeByteArrayInputStream() {
+ markIndex = upperLimit = index = 0;
+ }
+ /**
+ * Creates an UnsafeByteArrayInputStream which uses a given byte array as
+ * the source of the stream. Default range is [0 , buffer.length)
+ *
+ * @param buffer
+ * byte array used as the source of this stream
+ */
+ public UnsafeByteArrayInputStream(byte[] buffer) {
+ reInit(buffer, 0, buffer.length);
+ }
+ /**
+ * Creates an UnsafeByteArrayInputStream which uses a given byte array as
+ * the source of the stream, at the specific range: [startPos, endPos)
+ *
+ * @param buffer
+ * byte array used as the source of this stream
+ * @param startPos
+ * first index (inclusive) to the data lying in the given buffer
+ * @param endPos
+ * an index (exclusive) where the data ends. data @
+ * buffer[endPos] will never be read
+ */
+ public UnsafeByteArrayInputStream(byte[] buffer, int startPos, int endPos) {
+ reInit(buffer, startPos, endPos);
+ }
+ @Override
+ public void mark(int readlimit) {
+ markIndex = index;
+ }
+ @Override
+ public boolean markSupported() {
+ return true;
+ }
+ /**
+ * Initialize the stream with a given buffer, using the default limits of
+ * [0, buffer.length)
+ *
+ * @param buffer
+ * byte array used as the source of this stream
+ */
+ public void reInit(byte[] buffer) {
+ reInit(buffer, 0, buffer.length);
+ }
+ /**
+ * Initialize the stream with a given byte array as the source of the
+ * stream, at the specific range: [startPos, endPos)
+ *
+ * @param buffer
+ * byte array used as the source of this stream
+ * @param startPos
+ * first index (inclusive) to the data lying in the given buffer
+ * @param endPos
+ * an index (exclusive) where the data ends. data @
+ * buffer[endPos] will never be read
+ */
+ public void reInit(byte[] buffer, int startPos, int endPos) {
+ this.buffer = buffer;
+ markIndex = startPos;
+ upperLimit = endPos;
+ index = markIndex;
+ }
+ @Override
+ public int available() throws IOException {
+ return upperLimit - index;
+ }
+ /**
+ * Read a byte. Data returned as an integer [0,255] If end of stream
+ * reached, returns -1
+ */
+ @Override
+ public int read() throws IOException {
+ return index < upperLimit ? buffer[index++] & 0xff : -1;
+ }
+ /**
+ * Resets the stream back to its original state. Basically - moving the
+ * index back to start position.
+ */
+ @Override
+ public void reset() throws IOException {
+ index = markIndex;
+ }
+}
Index: core/src/main/java/org/apache/hama/util/UnsafeByteArrayOutputStream.java
===================================================================
--- core/src/main/java/org/apache/hama/util/UnsafeByteArrayOutputStream.java (revision 0)
+++ core/src/main/java/org/apache/hama/util/UnsafeByteArrayOutputStream.java (working copy)
@@ -0,0 +1,182 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hama.util;
+
+import sun.misc.Unsafe;
+
+import java.io.ByteArrayOutputStream;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.io.OutputStream;
+import java.lang.reflect.Field;
+import java.util.Arrays;
+
+/**
+ * This class is used as a wrapper to a byte array, extending
+ * {@link OutputStream}. Data is written in the given byte[] buffer, until its
+ * length is insufficient. Than the buffer size is doubled and the data is
+ * written.
+ *
+ * This class is Unsafe as it is using a buffer which potentially can be changed
+ * from the outside. Moreover, when {@link #toByteArray()} is called, the buffer
+ * itself is returned, and not a copy.
+ *
+ * @lucene.experimental
+ */
+public class UnsafeByteArrayOutputStream extends OutputStream {
+ private byte[] buffer;
+ private int index;
+ private int startIndex;
+
+ /**
+ * Constructs a new output stream, with a default allocated buffer which can
+ * later be obtained via {@link #toByteArray()}.
+ */
+ public UnsafeByteArrayOutputStream() {
+ reInit(new byte[32], 0);
+ }
+
+ /**
+ * Constructs a new output stream, with a given buffer. Writing will start
+ * at index 0 as a default.
+ *
+ * @param buffer some space to which writing will be made
+ */
+ public UnsafeByteArrayOutputStream(byte[] buffer) {
+ reInit(buffer, 0);
+ }
+
+ /**
+ * Constructs a new output stream, with a given buffer. Writing will start
+ * at a given index.
+ *
+ * @param buffer some space to which writing will be made.
+ * @param startPos an index (inclusive) from white data will be written.
+ */
+ public UnsafeByteArrayOutputStream(byte[] buffer, int startPos) {
+ reInit(buffer, startPos);
+ }
+
+ private void grow(int newLength) {
+ // It actually should be: (Java 1.7, when its intrinsic on all machines)
+ // buffer = Arrays.copyOf(buffer, newLength);
+ byte[] newBuffer = new byte[newLength];
+ System.arraycopy(buffer, 0, newBuffer, 0, buffer.length);
+ buffer = newBuffer;
+ }
+
+ /**
+ * For reuse-ability, this stream object can be re-initialized with another
+ * given buffer and starting position.
+ *
+ * @param buffer some space to which writing will be made.
+ * @param startPos an index (inclusive) from white data will be written.
+ */
+ public void reInit(byte[] buffer, int startPos) {
+ if (buffer.length == 0) {
+ throw new IllegalArgumentException(
+ "initial buffer length must be greater than 0.");
+ }
+ this.buffer = buffer;
+ startIndex = startPos;
+ index = startIndex;
+ }
+
+ /**
+ * For reuse-ability, this stream object can be re-initialized with another
+ * given buffer, using 0 as default starting position.
+ *
+ * @param buffer some space to which writing will be made.
+ */
+ public void reInit(byte[] buffer) {
+ reInit(buffer, 0);
+ }
+
+ /**
+ * writes a given byte(at the form of an int) to the buffer. If the buffer's
+ * empty space is insufficient, the buffer is doubled.
+ *
+ * @param value byte value to be written
+ */
+ @Override
+ public void write(int value) throws IOException {
+ if (index >= buffer.length) {
+ grow(buffer.length << 1);
+ }
+ buffer[index++] = (byte) value;
+ }
+
+ /**
+ * writes a given byte[], with offset and length to the buffer. If the
+ * buffer's empty space is insufficient, the buffer is doubled until it
+ * could contain all the data.
+ *
+ * @param b byte buffer, containing the source data to be written
+ * @param off index from which data from the buffer b should be written
+ * @param len number of bytes that should be written
+ */
+ @Override
+ public void write(byte[] b, int off, int len) throws IOException {
+ // If there's not enough space for the data
+ int targetLength = index + len;
+ if (targetLength >= buffer.length) {
+ // Calculating the new required length of the array, keeping the array
+ // size a power of 2 if it was initialized like that.
+ int newlen = buffer.length;
+ while ((newlen <<= 1) < targetLength) {
+ }
+ grow(newlen);
+ }
+ // Now that we have enough spare space, we could copy the rest of the
+ // data
+ System.arraycopy(b, off, buffer, index, len);
+ // Updating the index to next available index.
+ index += len;
+ }
+
+ /**
+ * Returns the byte array saved within the buffer AS IS.
+ *
+ * @return the actual inner buffer - not a copy of it.
+ */
+ public byte[] toByteArray() {
+ return buffer;
+ }
+
+ /**
+ * Returns the number of relevant bytes. This objects makes sure the buffer
+ * is at least the size of it's data. But it can also be twice as big. The
+ * user would want to process the relevant bytes only. For that he would
+ * need the count.
+ *
+ * @return number of relevant bytes
+ */
+ public int length() {
+ return index;
+ }
+
+ /**
+ * Returns the start position data was written to. This is useful in case you
+ * used {@link #reInit(byte[], int)} or
+ * {@link #UnsafeByteArrayOutputStream(byte[], int)} and passed a start
+ * position which is not 0.
+ */
+ public int getStartPos() {
+ return startIndex;
+ }
+}