diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/IPCUtil.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/IPCUtil.java index 056ecbc..5f41012 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/IPCUtil.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/IPCUtil.java @@ -23,6 +23,7 @@ import java.io.IOException; import java.io.InputStream; import java.io.OutputStream; import java.nio.ByteBuffer; +import java.nio.ByteOrder; import org.apache.commons.io.IOUtils; import org.apache.commons.logging.Log; @@ -134,7 +135,7 @@ public class IPCUtil { } bufferSize = ClassSize.align((int)longSize); } - baos = new ByteBufferOutputStream(bufferSize); + baos = new ByteBufferOutputStream(bufferSize, false, ByteOrder.BIG_ENDIAN); } OutputStream os = baos; Compressor poolCompressor = null; diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/KeyValue.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/KeyValue.java index b2343f1..315e9a3 100644 --- a/hbase-common/src/main/java/org/apache/hadoop/hbase/KeyValue.java +++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/KeyValue.java @@ -37,6 +37,7 @@ import java.util.Map; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.hbase.classification.InterfaceAudience; +import org.apache.hadoop.hbase.io.ByteBufferOutputStream; import org.apache.hadoop.hbase.io.HeapSize; import org.apache.hadoop.hbase.io.util.StreamUtils; import org.apache.hadoop.hbase.util.Bytes; @@ -2532,12 +2533,22 @@ public class KeyValue implements Cell, HeapSize, Cloneable, SettableSequenceId, if (!withTags) { length = this.getKeyLength() + this.getValueLength() + KEYVALUE_INFRASTRUCTURE_SIZE; } - // This does same as DataOuput#writeInt (big-endian, etc.) 
- StreamUtils.writeInt(out, length); + writeInt(out, length); out.write(this.bytes, this.offset, length); return length + Bytes.SIZEOF_INT; } + // This does same as DataOutput#writeInt (big-endian, etc.) + public static void writeInt(OutputStream out, int v) throws IOException { + // We have writeInt in ByteBufferOutputStream so that it can directly write int to underlying + // ByteBuffer in one step. + if (out instanceof ByteBufferOutputStream) { + ((ByteBufferOutputStream) out).writeInt(v); + } else { + StreamUtils.writeInt(out, v); + } + } + /** * Comparator that compares row component only of a KeyValue. */ diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/KeyValueUtil.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/KeyValueUtil.java index 3b0c05c..5035666 100644 --- a/hbase-common/src/main/java/org/apache/hadoop/hbase/KeyValueUtil.java +++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/KeyValueUtil.java @@ -679,11 +679,11 @@ public class KeyValueUtil { int tlen = cell.getTagsLength(); // write total length - StreamUtils.writeInt(out, length(rlen, flen, qlen, vlen, tlen, withTags)); + KeyValue.writeInt(out, length(rlen, flen, qlen, vlen, tlen, withTags)); // write key length - StreamUtils.writeInt(out, keyLength(rlen, flen, qlen)); + KeyValue.writeInt(out, keyLength(rlen, flen, qlen)); // write value length - StreamUtils.writeInt(out, vlen); + KeyValue.writeInt(out, vlen); // Write rowkey - 2 bytes rk length followed by rowkey bytes StreamUtils.writeShort(out, rlen); out.write(cell.getRowArray(), cell.getRowOffset(), rlen); diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/NoTagsKeyValue.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/NoTagsKeyValue.java index 6de6653..a571580 100644 --- a/hbase-common/src/main/java/org/apache/hadoop/hbase/NoTagsKeyValue.java +++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/NoTagsKeyValue.java @@ -23,7 +23,6 @@ import java.io.IOException; import java.io.OutputStream;
import org.apache.hadoop.hbase.classification.InterfaceAudience; -import org.apache.hadoop.hbase.io.util.StreamUtils; import org.apache.hadoop.hbase.util.Bytes; /** @@ -44,8 +43,7 @@ public class NoTagsKeyValue extends KeyValue { public int write(OutputStream out, boolean withTags) throws IOException { // In KeyValueUtil#oswrite we do a Cell serialization as KeyValue. Any changes doing here, pls // check KeyValueUtil#oswrite also and do necessary changes. - // This does same as DataOuput#writeInt (big-endian, etc.) - StreamUtils.writeInt(out, this.length); + writeInt(out, this.length); out.write(this.bytes, this.offset, this.length); return this.length + Bytes.SIZEOF_INT; } diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/io/BoundedByteBufferPool.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/io/BoundedByteBufferPool.java index c685a92..7bee173 100644 --- a/hbase-common/src/main/java/org/apache/hadoop/hbase/io/BoundedByteBufferPool.java +++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/io/BoundedByteBufferPool.java @@ -18,6 +18,7 @@ package org.apache.hadoop.hbase.io; import java.nio.ByteBuffer; +import java.nio.ByteOrder; import java.util.Queue; import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.locks.ReentrantLock; @@ -43,6 +44,7 @@ import com.google.common.annotations.VisibleForTesting; * achieve a particular 'run' size over time give or take a few extremes. Set TRACE level on this * class for a couple of seconds to get reporting on how it is running when deployed. * + *
This pool returns {@link ByteOrder#BIG_ENDIAN} byte-ordered ByteBuffers. *
This class is thread safe.
*/
@InterfaceAudience.Private
@@ -95,6 +97,7 @@ public class BoundedByteBufferPool {
bb.clear();
} else {
bb = ByteBuffer.allocate(this.runningAverage);
+ bb.order(ByteOrder.BIG_ENDIAN);
this.allocations.incrementAndGet();
}
if (LOG.isTraceEnabled()) {
@@ -108,6 +111,7 @@ public class BoundedByteBufferPool {
public void putBuffer(ByteBuffer bb) {
// If buffer is larger than we want to keep around, just let it go.
if (bb.capacity() > this.maxByteBufferSizeToCache) return;
+ assert bb.order() == ByteOrder.BIG_ENDIAN;
boolean success = false;
int average = 0;
lock.lock();
diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/io/ByteBufferOutputStream.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/io/ByteBufferOutputStream.java
index af12113..d5430bf 100644
--- a/hbase-common/src/main/java/org/apache/hadoop/hbase/io/ByteBufferOutputStream.java
+++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/io/ByteBufferOutputStream.java
@@ -22,6 +22,7 @@ package org.apache.hadoop.hbase.io;
import java.io.IOException;
import java.io.OutputStream;
import java.nio.ByteBuffer;
+import java.nio.ByteOrder;
import java.nio.channels.Channels;
import java.nio.channels.WritableByteChannel;
@@ -43,7 +44,11 @@ public class ByteBufferOutputStream extends OutputStream {
}
public ByteBufferOutputStream(int capacity, boolean useDirectByteBuffer) {
- this(allocate(capacity, useDirectByteBuffer));
+ this(capacity, useDirectByteBuffer, null);
+ }
+
+ public ByteBufferOutputStream(int capacity, boolean useDirectByteBuffer, ByteOrder order) {
+ this(allocate(capacity, useDirectByteBuffer, order));
}
/**
@@ -65,8 +70,12 @@ public class ByteBufferOutputStream extends OutputStream {
return buf.position();
}
- private static ByteBuffer allocate(final int capacity, final boolean useDirectByteBuffer) {
- return useDirectByteBuffer? ByteBuffer.allocateDirect(capacity): ByteBuffer.allocate(capacity);
+ private static ByteBuffer allocate(final int capacity, final boolean useDirectByteBuffer,
+ ByteOrder bo) {
+ ByteBuffer bb = useDirectByteBuffer ? ByteBuffer.allocateDirect(capacity) : ByteBuffer
+ .allocate(capacity);
+ if (bo != null) bb.order(bo);
+ return bb;
}
/**
@@ -85,7 +94,7 @@ public class ByteBufferOutputStream extends OutputStream {
int newSize = (int)Math.min((((long)buf.capacity()) * 2),
(long)(Integer.MAX_VALUE));
newSize = Math.max(newSize, buf.position() + extra);
- ByteBuffer newBuf = allocate(newSize, buf.isDirect());
+ ByteBuffer newBuf = allocate(newSize, buf.isDirect(), buf.order());
buf.flip();
newBuf.put(buf);
buf = newBuf;
@@ -128,6 +137,17 @@ public class ByteBufferOutputStream extends OutputStream {
buf.put(b, off, len);
}
+ /**
+ * Writes an int to the underlying ByteBuffer as four
+ * bytes, high byte first.
+ * @param i the int to write
+ * @throws IOException if an I/O error occurs.
+ */
+ public void writeInt(int i) throws IOException {
+ checkSizeAndGrow(Bytes.SIZEOF_INT);
+ this.buf.putInt(i);
+ }
+
@Override
public void flush() throws IOException {
// noop
diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/io/encoding/BufferedDataBlockEncoder.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/io/encoding/BufferedDataBlockEncoder.java
index 62e81ab..8406a1c 100644
--- a/hbase-common/src/main/java/org/apache/hadoop/hbase/io/encoding/BufferedDataBlockEncoder.java
+++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/io/encoding/BufferedDataBlockEncoder.java
@@ -32,6 +32,7 @@ import org.apache.hadoop.hbase.KeyValueUtil;
import org.apache.hadoop.hbase.Streamable;
import org.apache.hadoop.hbase.SettableSequenceId;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.io.ByteBufferOutputStream;
import org.apache.hadoop.hbase.io.HeapSize;
import org.apache.hadoop.hbase.io.TagCompressionContext;
import org.apache.hadoop.hbase.io.hfile.BlockType;
@@ -548,9 +549,9 @@ abstract class BufferedDataBlockEncoder implements DataBlockEncoder {
public int write(OutputStream out, boolean withTags) throws IOException {
int lenToWrite = KeyValueUtil.length(rowLength, familyLength, qualifierLength, valueLength,
tagsLength, withTags);
- StreamUtils.writeInt(out, lenToWrite);
- StreamUtils.writeInt(out, keyOnlyBuffer.length);
- StreamUtils.writeInt(out, valueLength);
+ writeInt(out, lenToWrite);
+ writeInt(out, keyOnlyBuffer.length);
+ writeInt(out, valueLength);
// Write key
out.write(keyOnlyBuffer);
// Write value
@@ -574,6 +575,16 @@ abstract class BufferedDataBlockEncoder implements DataBlockEncoder {
}
}
+ private static void writeInt(OutputStream out, int v) throws IOException {
+ // We have writeInt in ByteBufferOutputStream so that it can directly write int to underlying
+ // ByteBuffer in one step.
+ if (out instanceof ByteBufferOutputStream) {
+ ((ByteBufferOutputStream) out).writeInt(v);
+ } else {
+ StreamUtils.writeInt(out, v);
+ }
+ }
+
protected abstract static class
BufferedEncodedSeeker