diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/IPCUtil.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/IPCUtil.java index 056ecbc..5f41012 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/IPCUtil.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/IPCUtil.java @@ -23,6 +23,7 @@ import java.io.IOException; import java.io.InputStream; import java.io.OutputStream; import java.nio.ByteBuffer; +import java.nio.ByteOrder; import org.apache.commons.io.IOUtils; import org.apache.commons.logging.Log; @@ -134,7 +135,7 @@ public class IPCUtil { } bufferSize = ClassSize.align((int)longSize); } - baos = new ByteBufferOutputStream(bufferSize); + baos = new ByteBufferOutputStream(bufferSize, false, ByteOrder.BIG_ENDIAN); } OutputStream os = baos; Compressor poolCompressor = null; diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/io/BoundedByteBufferPool.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/io/BoundedByteBufferPool.java index c685a92..0bbc16f 100644 --- a/hbase-common/src/main/java/org/apache/hadoop/hbase/io/BoundedByteBufferPool.java +++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/io/BoundedByteBufferPool.java @@ -18,6 +18,7 @@ package org.apache.hadoop.hbase.io; import java.nio.ByteBuffer; +import java.nio.ByteOrder; import java.util.Queue; import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.locks.ReentrantLock; @@ -43,6 +44,7 @@ import com.google.common.annotations.VisibleForTesting; * achieve a particular 'run' size over time give or take a few extremes. Set TRACE level on this * class for a couple of seconds to get reporting on how it is running when deployed. * + *
This pool returns {@link ByteOrder#BIG_ENDIAN} byte ordered ByteBuffers. *
This class is thread safe.
*/
@InterfaceAudience.Private
@@ -95,6 +97,7 @@ public class BoundedByteBufferPool {
bb.clear();
} else {
bb = ByteBuffer.allocate(this.runningAverage);
+ bb.order(ByteOrder.BIG_ENDIAN);
this.allocations.incrementAndGet();
}
if (LOG.isTraceEnabled()) {
@@ -108,6 +111,7 @@ public class BoundedByteBufferPool {
public void putBuffer(ByteBuffer bb) {
// If buffer is larger than we want to keep around, just let it go.
if (bb.capacity() > this.maxByteBufferSizeToCache) return;
+ assert bb.order() == ByteOrder.BIG_ENDIAN;
boolean success = false;
int average = 0;
lock.lock();
diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/io/ByteBufferOutputStream.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/io/ByteBufferOutputStream.java
index af12113..7d3ff26 100644
--- a/hbase-common/src/main/java/org/apache/hadoop/hbase/io/ByteBufferOutputStream.java
+++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/io/ByteBufferOutputStream.java
@@ -22,6 +22,7 @@ package org.apache.hadoop.hbase.io;
import java.io.IOException;
import java.io.OutputStream;
import java.nio.ByteBuffer;
+import java.nio.ByteOrder;
import java.nio.channels.Channels;
import java.nio.channels.WritableByteChannel;
@@ -43,7 +44,11 @@ public class ByteBufferOutputStream extends OutputStream {
}
public ByteBufferOutputStream(int capacity, boolean useDirectByteBuffer) {
- this(allocate(capacity, useDirectByteBuffer));
+ this(capacity, useDirectByteBuffer, null);
+ }
+
+ public ByteBufferOutputStream(int capacity, boolean useDirectByteBuffer, ByteOrder order) {
+ this(allocate(capacity, useDirectByteBuffer, order));
}
/**
@@ -65,8 +70,12 @@ public class ByteBufferOutputStream extends OutputStream {
return buf.position();
}
- private static ByteBuffer allocate(final int capacity, final boolean useDirectByteBuffer) {
- return useDirectByteBuffer? ByteBuffer.allocateDirect(capacity): ByteBuffer.allocate(capacity);
+ private static ByteBuffer allocate(final int capacity, final boolean useDirectByteBuffer,
+ ByteOrder bo) {
+ ByteBuffer bb = useDirectByteBuffer ? ByteBuffer.allocateDirect(capacity) : ByteBuffer
+ .allocate(capacity);
+ if (bo != null) bb.order(bo);
+ return bb;
}
/**
@@ -85,7 +94,7 @@ public class ByteBufferOutputStream extends OutputStream {
int newSize = (int)Math.min((((long)buf.capacity()) * 2),
(long)(Integer.MAX_VALUE));
newSize = Math.max(newSize, buf.position() + extra);
- ByteBuffer newBuf = allocate(newSize, buf.isDirect());
+ ByteBuffer newBuf = allocate(newSize, buf.isDirect(), buf.order());
buf.flip();
newBuf.put(buf);
buf = newBuf;
@@ -128,6 +137,17 @@ public class ByteBufferOutputStream extends OutputStream {
buf.put(b, off, len);
}
+ /**
+ * Writes an int to the underlying output stream as four
+ * bytes, high byte first.
+ * @param i the int to write
+ * @throws IOException if an I/O error occurs.
+ */
+ public void writeInte(int i) throws IOException {
+ checkSizeAndGrow(Bytes.SIZEOF_INT);
+ this.buf.putInt(i);
+ }
+
@Override
public void flush() throws IOException {
// noop
diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/io/util/StreamUtils.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/io/util/StreamUtils.java
index 0b442a5..7145175 100644
--- a/hbase-common/src/main/java/org/apache/hadoop/hbase/io/util/StreamUtils.java
+++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/io/util/StreamUtils.java
@@ -24,6 +24,7 @@ import java.io.OutputStream;
import java.nio.ByteBuffer;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.io.ByteBufferOutputStream;
import org.apache.hadoop.hbase.util.Pair;
import com.google.common.base.Preconditions;
@@ -182,10 +183,16 @@ public class StreamUtils {
}
public static void writeInt(OutputStream out, int v) throws IOException {
- out.write((byte) (0xff & (v >> 24)));
- out.write((byte) (0xff & (v >> 16)));
- out.write((byte) (0xff & (v >> 8)));
- out.write((byte) (0xff & v));
+ // We have writeInt in ByteBufferOutputStream so that it can directly write int to underlying
+ // ByteBuffer in one step.
+ if (out instanceof ByteBufferOutputStream) {
+ ((ByteBufferOutputStream) out).writeInte(v);
+ } else {
+ out.write((byte) (0xff & (v >> 24)));
+ out.write((byte) (0xff & (v >> 16)));
+ out.write((byte) (0xff & (v >> 8)));
+ out.write((byte) (0xff & v));
+ }
}
public static void writeLong(OutputStream out, long v) throws IOException {