diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/IPCUtil.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/IPCUtil.java index 056ecbc..5f41012 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/IPCUtil.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/IPCUtil.java @@ -23,6 +23,7 @@ import java.io.IOException; import java.io.InputStream; import java.io.OutputStream; import java.nio.ByteBuffer; +import java.nio.ByteOrder; import org.apache.commons.io.IOUtils; import org.apache.commons.logging.Log; @@ -134,7 +135,7 @@ public class IPCUtil { } bufferSize = ClassSize.align((int)longSize); } - baos = new ByteBufferOutputStream(bufferSize); + baos = new ByteBufferOutputStream(bufferSize, false, ByteOrder.BIG_ENDIAN); } OutputStream os = baos; Compressor poolCompressor = null; diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/KeyValue.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/KeyValue.java index b2343f1..315e9a3 100644 --- a/hbase-common/src/main/java/org/apache/hadoop/hbase/KeyValue.java +++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/KeyValue.java @@ -37,6 +37,7 @@ import java.util.Map; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.hbase.classification.InterfaceAudience; +import org.apache.hadoop.hbase.io.ByteBufferOutputStream; import org.apache.hadoop.hbase.io.HeapSize; import org.apache.hadoop.hbase.io.util.StreamUtils; import org.apache.hadoop.hbase.util.Bytes; @@ -2532,12 +2533,22 @@ public class KeyValue implements Cell, HeapSize, Cloneable, SettableSequenceId, if (!withTags) { length = this.getKeyLength() + this.getValueLength() + KEYVALUE_INFRASTRUCTURE_SIZE; } - // This does same as DataOuput#writeInt (big-endian, etc.) - StreamUtils.writeInt(out, length); + writeInt(out, length); out.write(this.bytes, this.offset, length); return length + Bytes.SIZEOF_INT; } + // This does same as DataOuput#writeInt (big-endian, etc.) + public static void writeInt(OutputStream out, int v) throws IOException { + // We have writeInt in ByteBufferOutputStream so that it can directly write int to underlying + // ByteBuffer in one step. + if (out instanceof ByteBufferOutputStream) { + ((ByteBufferOutputStream) out).writeInt(v); + } else { + StreamUtils.writeInt(out, v); + } + } + /** * Comparator that compares row component only of a KeyValue. */ diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/KeyValueUtil.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/KeyValueUtil.java index 3b0c05c..5035666 100644 --- a/hbase-common/src/main/java/org/apache/hadoop/hbase/KeyValueUtil.java +++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/KeyValueUtil.java @@ -679,11 +679,11 @@ public class KeyValueUtil { int tlen = cell.getTagsLength(); // write total length - StreamUtils.writeInt(out, length(rlen, flen, qlen, vlen, tlen, withTags)); + KeyValue.writeInt(out, length(rlen, flen, qlen, vlen, tlen, withTags)); // write key length - StreamUtils.writeInt(out, keyLength(rlen, flen, qlen)); + KeyValue.writeInt(out, keyLength(rlen, flen, qlen)); // write value length - StreamUtils.writeInt(out, vlen); + KeyValue.writeInt(out, vlen); // Write rowkey - 2 bytes rk length followed by rowkey bytes StreamUtils.writeShort(out, rlen); out.write(cell.getRowArray(), cell.getRowOffset(), rlen); diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/NoTagsKeyValue.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/NoTagsKeyValue.java index 6de6653..a571580 100644 --- a/hbase-common/src/main/java/org/apache/hadoop/hbase/NoTagsKeyValue.java +++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/NoTagsKeyValue.java @@ -23,7 +23,6 @@ import java.io.IOException; import java.io.OutputStream; import org.apache.hadoop.hbase.classification.InterfaceAudience; -import org.apache.hadoop.hbase.io.util.StreamUtils; import org.apache.hadoop.hbase.util.Bytes; /** @@ -44,8 +43,7 @@ public class NoTagsKeyValue extends KeyValue { public int write(OutputStream out, boolean withTags) throws IOException { // In KeyValueUtil#oswrite we do a Cell serialization as KeyValue. Any changes doing here, pls // check KeyValueUtil#oswrite also and do necessary changes. - // This does same as DataOuput#writeInt (big-endian, etc.) - StreamUtils.writeInt(out, this.length); + writeInt(out, this.length); out.write(this.bytes, this.offset, this.length); return this.length + Bytes.SIZEOF_INT; } diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/io/BoundedByteBufferPool.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/io/BoundedByteBufferPool.java index c685a92..7bee173 100644 --- a/hbase-common/src/main/java/org/apache/hadoop/hbase/io/BoundedByteBufferPool.java +++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/io/BoundedByteBufferPool.java @@ -18,6 +18,7 @@ package org.apache.hadoop.hbase.io; import java.nio.ByteBuffer; +import java.nio.ByteOrder; import java.util.Queue; import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.locks.ReentrantLock; @@ -43,6 +44,7 @@ import com.google.common.annotations.VisibleForTesting; * achieve a particular 'run' size over time give or take a few extremes. Set TRACE level on this * class for a couple of seconds to get reporting on how it is running when deployed. * + *

This pool returns {@link ByteOrder#BIG_ENDIAN} byte ordered ByteBuffers. *

This class is thread safe. */ @InterfaceAudience.Private @@ -95,6 +97,7 @@ public class BoundedByteBufferPool { bb.clear(); } else { bb = ByteBuffer.allocate(this.runningAverage); + bb.order(ByteOrder.BIG_ENDIAN); this.allocations.incrementAndGet(); } if (LOG.isTraceEnabled()) { @@ -108,6 +111,7 @@ public class BoundedByteBufferPool { public void putBuffer(ByteBuffer bb) { // If buffer is larger than we want to keep around, just let it go. if (bb.capacity() > this.maxByteBufferSizeToCache) return; + assert bb.order() == ByteOrder.BIG_ENDIAN; boolean success = false; int average = 0; lock.lock(); diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/io/ByteBufferOutputStream.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/io/ByteBufferOutputStream.java index af12113..d5430bf 100644 --- a/hbase-common/src/main/java/org/apache/hadoop/hbase/io/ByteBufferOutputStream.java +++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/io/ByteBufferOutputStream.java @@ -22,6 +22,7 @@ package org.apache.hadoop.hbase.io; import java.io.IOException; import java.io.OutputStream; import java.nio.ByteBuffer; +import java.nio.ByteOrder; import java.nio.channels.Channels; import java.nio.channels.WritableByteChannel; @@ -43,7 +44,11 @@ public class ByteBufferOutputStream extends OutputStream { } public ByteBufferOutputStream(int capacity, boolean useDirectByteBuffer) { - this(allocate(capacity, useDirectByteBuffer)); + this(capacity, useDirectByteBuffer, null); + } + + public ByteBufferOutputStream(int capacity, boolean useDirectByteBuffer, ByteOrder order) { + this(allocate(capacity, useDirectByteBuffer, order)); } /** @@ -65,8 +70,12 @@ public class ByteBufferOutputStream extends OutputStream { return buf.position(); } - private static ByteBuffer allocate(final int capacity, final boolean useDirectByteBuffer) { - return useDirectByteBuffer? ByteBuffer.allocateDirect(capacity): ByteBuffer.allocate(capacity); + private static ByteBuffer allocate(final int capacity, final boolean useDirectByteBuffer, + ByteOrder bo) { + ByteBuffer bb = useDirectByteBuffer ? ByteBuffer.allocateDirect(capacity) : ByteBuffer + .allocate(capacity); + if (bo != null) bb.order(bo); + return bb; } /** @@ -85,7 +94,7 @@ public class ByteBufferOutputStream extends OutputStream { int newSize = (int)Math.min((((long)buf.capacity()) * 2), (long)(Integer.MAX_VALUE)); newSize = Math.max(newSize, buf.position() + extra); - ByteBuffer newBuf = allocate(newSize, buf.isDirect()); + ByteBuffer newBuf = allocate(newSize, buf.isDirect(), buf.order()); buf.flip(); newBuf.put(buf); buf = newBuf; @@ -128,6 +137,17 @@ public class ByteBufferOutputStream extends OutputStream { buf.put(b, off, len); } + /** + * Writes an int to the underlying output stream as four + * bytes, high byte first. + * @param i the int to write + * @throws IOException if an I/O error occurs. + */ + public void writeInt(int i) throws IOException { + checkSizeAndGrow(Bytes.SIZEOF_INT); + this.buf.putInt(i); + } + @Override public void flush() throws IOException { // noop diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/io/encoding/BufferedDataBlockEncoder.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/io/encoding/BufferedDataBlockEncoder.java index 62e81ab..8406a1c 100644 --- a/hbase-common/src/main/java/org/apache/hadoop/hbase/io/encoding/BufferedDataBlockEncoder.java +++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/io/encoding/BufferedDataBlockEncoder.java @@ -32,6 +32,7 @@ import org.apache.hadoop.hbase.KeyValueUtil; import org.apache.hadoop.hbase.Streamable; import org.apache.hadoop.hbase.SettableSequenceId; import org.apache.hadoop.hbase.classification.InterfaceAudience; +import org.apache.hadoop.hbase.io.ByteBufferOutputStream; import org.apache.hadoop.hbase.io.HeapSize; import org.apache.hadoop.hbase.io.TagCompressionContext; import org.apache.hadoop.hbase.io.hfile.BlockType; @@ -548,9 +549,9 @@ abstract class BufferedDataBlockEncoder implements DataBlockEncoder { public int write(OutputStream out, boolean withTags) throws IOException { int lenToWrite = KeyValueUtil.length(rowLength, familyLength, qualifierLength, valueLength, tagsLength, withTags); - StreamUtils.writeInt(out, lenToWrite); - StreamUtils.writeInt(out, keyOnlyBuffer.length); - StreamUtils.writeInt(out, valueLength); + writeInt(out, lenToWrite); + writeInt(out, keyOnlyBuffer.length); + writeInt(out, valueLength); // Write key out.write(keyOnlyBuffer); // Write value @@ -574,6 +575,16 @@ abstract class BufferedDataBlockEncoder implements DataBlockEncoder { } } + private static void writeInt(OutputStream out, int v) throws IOException { + // We have writeInt in ByteBufferOutputStream so that it can directly write int to underlying + // ByteBuffer in one step. + if (out instanceof ByteBufferOutputStream) { + ((ByteBufferOutputStream) out).writeInt(v); + } else { + StreamUtils.writeInt(out, v); + } + } + protected abstract static class BufferedEncodedSeeker implements EncodedSeeker {