diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/IPCUtil.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/IPCUtil.java index 056ecbc..5f41012 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/IPCUtil.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/IPCUtil.java @@ -23,6 +23,7 @@ import java.io.IOException; import java.io.InputStream; import java.io.OutputStream; import java.nio.ByteBuffer; +import java.nio.ByteOrder; import org.apache.commons.io.IOUtils; import org.apache.commons.logging.Log; @@ -134,7 +135,7 @@ public class IPCUtil { } bufferSize = ClassSize.align((int)longSize); } - baos = new ByteBufferOutputStream(bufferSize); + baos = new ByteBufferOutputStream(bufferSize, false, ByteOrder.BIG_ENDIAN); } OutputStream os = baos; Compressor poolCompressor = null; diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/io/BoundedByteBufferPool.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/io/BoundedByteBufferPool.java index c685a92..0bbc16f 100644 --- a/hbase-common/src/main/java/org/apache/hadoop/hbase/io/BoundedByteBufferPool.java +++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/io/BoundedByteBufferPool.java @@ -18,6 +18,7 @@ package org.apache.hadoop.hbase.io; import java.nio.ByteBuffer; +import java.nio.ByteOrder; import java.util.Queue; import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.locks.ReentrantLock; @@ -43,6 +44,7 @@ import com.google.common.annotations.VisibleForTesting; * achieve a particular 'run' size over time give or take a few extremes. Set TRACE level on this * class for a couple of seconds to get reporting on how it is running when deployed. * + *

<p>This pool returns {@link ByteOrder#BIG_ENDIAN} byte ordered ByteBuffers. *
<p>
This class is thread safe. */ @InterfaceAudience.Private @@ -95,6 +97,7 @@ public class BoundedByteBufferPool { bb.clear(); } else { bb = ByteBuffer.allocate(this.runningAverage); + bb.order(ByteOrder.BIG_ENDIAN); this.allocations.incrementAndGet(); } if (LOG.isTraceEnabled()) { @@ -108,6 +111,7 @@ public class BoundedByteBufferPool { public void putBuffer(ByteBuffer bb) { // If buffer is larger than we want to keep around, just let it go. if (bb.capacity() > this.maxByteBufferSizeToCache) return; + assert bb.order() == ByteOrder.BIG_ENDIAN; boolean success = false; int average = 0; lock.lock(); diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/io/ByteBufferOutputStream.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/io/ByteBufferOutputStream.java index af12113..7d3ff26 100644 --- a/hbase-common/src/main/java/org/apache/hadoop/hbase/io/ByteBufferOutputStream.java +++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/io/ByteBufferOutputStream.java @@ -22,6 +22,7 @@ package org.apache.hadoop.hbase.io; import java.io.IOException; import java.io.OutputStream; import java.nio.ByteBuffer; +import java.nio.ByteOrder; import java.nio.channels.Channels; import java.nio.channels.WritableByteChannel; @@ -43,7 +44,11 @@ public class ByteBufferOutputStream extends OutputStream { } public ByteBufferOutputStream(int capacity, boolean useDirectByteBuffer) { - this(allocate(capacity, useDirectByteBuffer)); + this(capacity, useDirectByteBuffer, null); + } + + public ByteBufferOutputStream(int capacity, boolean useDirectByteBuffer, ByteOrder order) { + this(allocate(capacity, useDirectByteBuffer, order)); } /** @@ -65,8 +70,12 @@ public class ByteBufferOutputStream extends OutputStream { return buf.position(); } - private static ByteBuffer allocate(final int capacity, final boolean useDirectByteBuffer) { - return useDirectByteBuffer? 
ByteBuffer.allocateDirect(capacity): ByteBuffer.allocate(capacity); + private static ByteBuffer allocate(final int capacity, final boolean useDirectByteBuffer, + ByteOrder bo) { + ByteBuffer bb = useDirectByteBuffer ? ByteBuffer.allocateDirect(capacity) : ByteBuffer + .allocate(capacity); + if (bo != null) bb.order(bo); + return bb; } /** @@ -85,7 +94,7 @@ public class ByteBufferOutputStream { int newSize = (int)Math.min((((long)buf.capacity()) * 2), (long)(Integer.MAX_VALUE)); newSize = Math.max(newSize, buf.position() + extra); - ByteBuffer newBuf = allocate(newSize, buf.isDirect()); + ByteBuffer newBuf = allocate(newSize, buf.isDirect(), buf.order()); buf.flip(); newBuf.put(buf); buf = newBuf; @@ -128,6 +137,17 @@ public class ByteBufferOutputStream { buf.put(b, off, len); } + /** + * Writes an int to the underlying output stream as four + * bytes, high byte first. + * @param i the int to write + * @throws IOException if an I/O error occurs. + */ + public void writeInt(int i) throws IOException { + checkSizeAndGrow(Bytes.SIZEOF_INT); + this.buf.putInt(i); + } + @Override public void flush() throws IOException { // noop diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/io/util/StreamUtils.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/io/util/StreamUtils.java index 0b442a5..7145175 100644 --- a/hbase-common/src/main/java/org/apache/hadoop/hbase/io/util/StreamUtils.java +++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/io/util/StreamUtils.java @@ -24,6 +24,7 @@ import java.io.OutputStream; import java.nio.ByteBuffer; import org.apache.hadoop.hbase.classification.InterfaceAudience; +import org.apache.hadoop.hbase.io.ByteBufferOutputStream; import org.apache.hadoop.hbase.util.Pair; import com.google.common.base.Preconditions; @@ -182,10 +183,16 @@ public class StreamUtils { } public static void writeInt(OutputStream out, int v) throws IOException { - out.write((byte) (0xff & (v >> 24))); - 
out.write((byte) (0xff & (v >> 16))); - out.write((byte) (0xff & (v >> 8))); - out.write((byte) (0xff & v)); + // We have writeInt in ByteBufferOutputStream so that it can directly write int to underlying + // ByteBuffer in one step. + if (out instanceof ByteBufferOutputStream) { + ((ByteBufferOutputStream) out).writeInt(v); + } else { + out.write((byte) (0xff & (v >> 24))); + out.write((byte) (0xff & (v >> 16))); + out.write((byte) (0xff & (v >> 8))); + out.write((byte) (0xff & v)); + } } public static void writeLong(OutputStream out, long v) throws IOException {