.../java/org/apache/hadoop/hbase/KeyValue.java | 12 +- .../hbase/io/MultiByteBufferInputStream.java | 101 ++++++++++ .../hbase/io/encoding/CopyKeyDataBlockEncoder.java | 11 +- .../hadoop/hbase/io/encoding/DataBlockEncoder.java | 3 +- .../hbase/io/encoding/DiffKeyDeltaEncoder.java | 13 +- .../hbase/io/encoding/FastDiffDeltaEncoder.java | 17 +- .../io/encoding/HFileBlockDecodingContext.java | 5 +- .../encoding/HFileBlockDefaultDecodingContext.java | 6 +- .../hbase/io/encoding/PrefixKeyDeltaEncoder.java | 17 +- .../apache/hadoop/hbase/io/hfile/BlockType.java | 5 +- .../apache/hadoop/hbase/nio/MultiByteBuffer.java | 204 +++++++++++++++++++- .../apache/hadoop/hbase/util/ByteBufferArray.java | 49 +++++ .../apache/hadoop/hbase/util/ByteBufferUtils.java | 137 +++++++++----- .../java/org/apache/hadoop/hbase/util/Hash.java | 2 + .../org/apache/hadoop/hbase/util/UnsafeAccess.java | 209 ++++++++++++++++++++- .../hadoop/hbase/nio/TestMultiByteBuffer.java | 2 +- .../hbase/codec/prefixtree/PrefixTreeCodec.java | 5 +- .../hbase/io/hfile/CacheableDeserializer.java | 6 +- .../hadoop/hbase/io/hfile/CompoundBloomFilter.java | 8 +- .../org/apache/hadoop/hbase/io/hfile/HFile.java | 3 +- .../apache/hadoop/hbase/io/hfile/HFileBlock.java | 100 +++++----- .../hadoop/hbase/io/hfile/HFileBlockIndex.java | 37 ++-- .../hadoop/hbase/io/hfile/HFileReaderImpl.java | 119 ++++++------ .../hadoop/hbase/io/hfile/MemcachedBlockCache.java | 3 +- .../hadoop/hbase/io/hfile/bucket/BucketCache.java | 12 +- .../hbase/io/hfile/bucket/ByteBufferIOEngine.java | 14 ++ .../hadoop/hbase/io/hfile/bucket/FileIOEngine.java | 18 ++ .../hadoop/hbase/io/hfile/bucket/IOEngine.java | 19 ++ .../hadoop/hbase/regionserver/StoreFile.java | 11 +- .../org/apache/hadoop/hbase/util/BloomFilter.java | 5 +- .../apache/hadoop/hbase/util/BloomFilterChunk.java | 5 +- .../apache/hadoop/hbase/util/BloomFilterUtil.java | 19 +- .../hbase/io/encoding/TestDataBlockEncoders.java | 4 +- .../hadoop/hbase/io/hfile/CacheTestUtils.java | 9 +- .../hadoop/hbase/io/hfile/TestCacheConfig.java | 5 +- .../apache/hadoop/hbase/io/hfile/TestChecksum.java | 5 +- .../apache/hadoop/hbase/io/hfile/TestHFile.java | 10 +- .../hadoop/hbase/io/hfile/TestHFileBlock.java | 25 +-- .../io/hfile/TestHFileBlockCompatibility.java | 5 +- .../hadoop/hbase/io/hfile/TestHFileBlockIndex.java | 7 +- .../hadoop/hbase/io/hfile/TestHFileWriterV2.java | 5 +- .../hadoop/hbase/io/hfile/TestHFileWriterV3.java | 5 +- .../io/hfile/bucket/TestByteBufferIOEngine.java | 44 +++++ .../hbase/regionserver/TestSplitTransaction.java | 1 + .../hadoop/hbase/util/TestBloomFilterChunk.java | 68 +++---- .../hadoop/hbase/util/TestByteBufferUtils.java | 2 +- 46 files changed, 1063 insertions(+), 309 deletions(-) diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/KeyValue.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/KeyValue.java index 2fc7975..237f7e5 100644 --- a/hbase-common/src/main/java/org/apache/hadoop/hbase/KeyValue.java +++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/KeyValue.java @@ -2676,6 +2676,7 @@ public class KeyValue implements Cell, HeapSize, Cloneable, SettableSequenceId, * Hence create a Keyvalue(aka Cell) that would help in comparing as two cells */ public static class KeyOnlyKeyValue extends KeyValue { + private short rowLen = -1; public KeyOnlyKeyValue() { } @@ -2705,6 +2706,7 @@ public class KeyValue implements Cell, HeapSize, Cloneable, SettableSequenceId, this.bytes = key; this.offset = offset; this.length = length; + this.rowLen = -1; } @Override @@ -2762,7 +2764,10 
@@ public class KeyValue implements Cell, HeapSize, Cloneable, SettableSequenceId, @Override public short getRowLength() { - return Bytes.toShort(this.bytes, getKeyOffset()); + if (rowLen == -1) { + rowLen = Bytes.toShort(this.bytes, getKeyOffset()); + } + return rowLen; } @Override @@ -2832,5 +2837,10 @@ public class KeyValue implements Cell, HeapSize, Cloneable, SettableSequenceId, public boolean equals(Object other) { return super.equals(other); } + + @Override + public long heapSize() { + return super.heapSize() + Bytes.SIZEOF_LONG; + } } } diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/io/MultiByteBufferInputStream.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/io/MultiByteBufferInputStream.java new file mode 100644 index 0000000..98b6c01 --- /dev/null +++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/io/MultiByteBufferInputStream.java @@ -0,0 +1,101 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.io; + +import java.io.InputStream; + +import org.apache.hadoop.hbase.classification.InterfaceAudience; +import org.apache.hadoop.hbase.nio.MultiByteBuffer; + +/** + * Not thread safe! + *

+ * Please note that the reads will cause position movement on wrapped ByteBuffer. + */ +@InterfaceAudience.Private +public class MultiByteBufferInputStream extends InputStream { + + private MultiByteBuffer buf; + + public MultiByteBufferInputStream(MultiByteBuffer buf) { + this.buf = buf; + } + + /** + * Reads the next byte of data from this input stream. The value byte is returned as an + * int in the range 0 to 255. If no byte is available + * because the end of the stream has been reached, the value -1 is returned. + * @return the next byte of data, or -1 if the end of the stream has been reached. + */ + public int read() { + if (this.buf.hasRemaining()) { + return (this.buf.get() & 0xff); + } + return -1; + } + + /** + * Reads up to next len bytes of data from buffer into passed array(starting from + * given offset). + * @param b the array into which the data is read. + * @param off the start offset in the destination array b + * @param len the maximum number of bytes to read. + * @return the total number of bytes actually read into the buffer, or -1 if not even + * 1 byte can be read because the end of the stream has been reached. + */ + public int read(byte b[], int off, int len) { + int avail = available(); + if (avail <= 0) { + return -1; + } + + if (len > avail) { + len = avail; + } + if (len <= 0) { + return 0; + } + + this.buf.get(b, off, len); + return len; + } + + /** + * Skips n bytes of input from this input stream. Fewer bytes might be skipped if the + * end of the input stream is reached. The actual number k of bytes to be skipped is + * equal to the smaller of n and remaining bytes in the stream. + * @param n the number of bytes to be skipped. + * @return the actual number of bytes skipped. + */ + public long skip(long n) { + long k = Math.min(n, available()); + if (k < 0) { + k = 0; + } + this.buf.skip((int) k); + return k; + } + + /** + * @return the number of remaining bytes that can be read (or skipped + * over) from this input stream. 
+ */ + public int available() { + return this.buf.remaining(); + } +} diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/io/encoding/CopyKeyDataBlockEncoder.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/io/encoding/CopyKeyDataBlockEncoder.java index 4eea272..bd1ceca 100644 --- a/hbase-common/src/main/java/org/apache/hadoop/hbase/io/encoding/CopyKeyDataBlockEncoder.java +++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/io/encoding/CopyKeyDataBlockEncoder.java @@ -27,6 +27,7 @@ import org.apache.hadoop.hbase.CellUtil; import org.apache.hadoop.hbase.KeyValue; import org.apache.hadoop.hbase.KeyValueUtil; import org.apache.hadoop.hbase.classification.InterfaceAudience; +import org.apache.hadoop.hbase.nio.MultiByteBuffer; import org.apache.hadoop.hbase.util.ByteBufferUtils; import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.io.WritableUtils; @@ -66,13 +67,13 @@ public class CopyKeyDataBlockEncoder extends BufferedDataBlockEncoder { } @Override - public Cell getFirstKeyCellInBlock(ByteBuffer block) { + public Cell getFirstKeyCellInBlock(MultiByteBuffer block) { int keyLength = block.getInt(Bytes.SIZEOF_INT); - ByteBuffer dup = block.duplicate(); int pos = 3 * Bytes.SIZEOF_INT; - dup.position(pos); - dup.limit(pos + keyLength); - return new KeyValue.KeyOnlyKeyValue(dup.array(), dup.arrayOffset() + pos, keyLength); + ByteBuffer key = block.asSubBuffer(pos + keyLength).duplicate(); + key.position(pos); + key.limit(pos + keyLength); + return new KeyValue.KeyOnlyKeyValue(key.array(), key.arrayOffset() + pos, keyLength); } @Override diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/io/encoding/DataBlockEncoder.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/io/encoding/DataBlockEncoder.java index eddc689..6f6d05e 100644 --- a/hbase-common/src/main/java/org/apache/hadoop/hbase/io/encoding/DataBlockEncoder.java +++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/io/encoding/DataBlockEncoder.java @@ -25,6 +25,7 @@ import org.apache.hadoop.hbase.Cell; import org.apache.hadoop.hbase.CellComparator; import org.apache.hadoop.hbase.classification.InterfaceAudience; import org.apache.hadoop.hbase.io.hfile.HFileContext; +import org.apache.hadoop.hbase.nio.MultiByteBuffer; /** * Encoding of KeyValue. It aims to be fast and efficient using assumptions: @@ -90,7 +91,7 @@ public interface DataBlockEncoder { * @param block encoded block we want index, the position will not change * @return First key in block as a cell. */ - Cell getFirstKeyCellInBlock(ByteBuffer block); + Cell getFirstKeyCellInBlock(MultiByteBuffer block); /** * Create a HFileBlock seeker which find KeyValues within a block. 
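A standalone sketch (not part of the patch) of the 7-bit "compressed integer" format that ByteBufferUtils.readCompressedInt and the new MultiByteBuffer.readCompressedInt both decode; the encoder diffs that follow switch from the former to the latter. The class name and sample bytes are illustrative only; the recursion mirrors the VALUE_MASK / NEXT_BIT_MASK handling shown later in the ByteBufferUtils and MultiByteBuffer hunks.

import java.nio.ByteBuffer;

public class CompressedIntSketch {
  // Each byte carries 7 value bits (VALUE_MASK = 0x7f); the high bit
  // (NEXT_BIT_MASK = 1 << 7) signals that another byte follows.
  static int readCompressedInt(ByteBuffer buf) {
    byte b = buf.get();
    if ((b & 0x80) != 0) {
      return (b & 0x7f) + (readCompressedInt(buf) << 7);
    }
    return b & 0x7f;
  }

  public static void main(String[] args) {
    // 300 = 0b1_0010_1100 -> 0xAC (low 7 bits plus continuation bit), then 0x02.
    System.out.println(readCompressedInt(ByteBuffer.wrap(new byte[] { (byte) 0xAC, 0x02 }))); // 300
  }
}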
diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/io/encoding/DiffKeyDeltaEncoder.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/io/encoding/DiffKeyDeltaEncoder.java index f2d4751..944591e 100644 --- a/hbase-common/src/main/java/org/apache/hadoop/hbase/io/encoding/DiffKeyDeltaEncoder.java +++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/io/encoding/DiffKeyDeltaEncoder.java @@ -27,6 +27,7 @@ import org.apache.hadoop.hbase.CellUtil; import org.apache.hadoop.hbase.KeyValue; import org.apache.hadoop.hbase.KeyValueUtil; import org.apache.hadoop.hbase.classification.InterfaceAudience; +import org.apache.hadoop.hbase.nio.MultiByteBuffer; import org.apache.hadoop.hbase.util.ByteBufferUtils; import org.apache.hadoop.hbase.util.Bytes; @@ -305,15 +306,15 @@ public class DiffKeyDeltaEncoder extends BufferedDataBlockEncoder { } @Override - public Cell getFirstKeyCellInBlock(ByteBuffer block) { + public Cell getFirstKeyCellInBlock(MultiByteBuffer block) { block.mark(); block.position(Bytes.SIZEOF_INT); byte familyLength = block.get(); - ByteBufferUtils.skip(block, familyLength); + block.skip(familyLength); byte flag = block.get(); - int keyLength = ByteBufferUtils.readCompressedInt(block); - ByteBufferUtils.readCompressedInt(block); // valueLength - ByteBufferUtils.readCompressedInt(block); // commonLength + int keyLength = MultiByteBuffer.readCompressedInt(block); + MultiByteBuffer.readCompressedInt(block); // valueLength + MultiByteBuffer.readCompressedInt(block); // commonLength ByteBuffer result = ByteBuffer.allocate(keyLength); // copy row @@ -341,7 +342,7 @@ public class DiffKeyDeltaEncoder extends BufferedDataBlockEncoder { // copy the timestamp and type int timestampFitInBytes = ((flag & MASK_TIMESTAMP_LENGTH) >>> SHIFT_TIMESTAMP_LENGTH) + 1; - long timestamp = ByteBufferUtils.readLong(block, timestampFitInBytes); + long timestamp = MultiByteBuffer.readLong(block, timestampFitInBytes); if ((flag & FLAG_TIMESTAMP_SIGN) != 0) { timestamp = -timestamp; } diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/io/encoding/FastDiffDeltaEncoder.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/io/encoding/FastDiffDeltaEncoder.java index f750e09..da0073e 100644 --- a/hbase-common/src/main/java/org/apache/hadoop/hbase/io/encoding/FastDiffDeltaEncoder.java +++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/io/encoding/FastDiffDeltaEncoder.java @@ -27,6 +27,7 @@ import org.apache.hadoop.hbase.CellUtil; import org.apache.hadoop.hbase.KeyValue; import org.apache.hadoop.hbase.KeyValueUtil; import org.apache.hadoop.hbase.classification.InterfaceAudience; +import org.apache.hadoop.hbase.nio.MultiByteBuffer; import org.apache.hadoop.hbase.util.ByteBufferUtils; import org.apache.hadoop.hbase.util.Bytes; @@ -354,18 +355,16 @@ public class FastDiffDeltaEncoder extends BufferedDataBlockEncoder { } @Override - public Cell getFirstKeyCellInBlock(ByteBuffer block) { + public Cell getFirstKeyCellInBlock(MultiByteBuffer block) { block.mark(); block.position(Bytes.SIZEOF_INT + Bytes.SIZEOF_BYTE); - int keyLength = ByteBufferUtils.readCompressedInt(block); - ByteBufferUtils.readCompressedInt(block); // valueLength - ByteBufferUtils.readCompressedInt(block); // commonLength - int pos = block.position(); + int keyLength = MultiByteBuffer.readCompressedInt(block); + MultiByteBuffer.readCompressedInt(block); // valueLength + MultiByteBuffer.readCompressedInt(block); // commonLength + ByteBuffer key = block.asSubBuffer(keyLength).duplicate(); block.reset(); - ByteBuffer 
dup = block.duplicate(); - dup.position(pos); - dup.limit(pos + keyLength); - return new KeyValue.KeyOnlyKeyValue(dup.array(), dup.arrayOffset() + pos, keyLength); + key.limit(key.position() + keyLength); + return new KeyValue.KeyOnlyKeyValue(key.array(), key.arrayOffset() + key.position(), keyLength); } @Override diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/io/encoding/HFileBlockDecodingContext.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/io/encoding/HFileBlockDecodingContext.java index 37001cc..2c0f847 100644 --- a/hbase-common/src/main/java/org/apache/hadoop/hbase/io/encoding/HFileBlockDecodingContext.java +++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/io/encoding/HFileBlockDecodingContext.java @@ -21,6 +21,7 @@ import java.nio.ByteBuffer; import org.apache.hadoop.hbase.classification.InterfaceAudience; import org.apache.hadoop.hbase.io.hfile.HFileContext; +import org.apache.hadoop.hbase.nio.MultiByteBuffer; /** * A decoding context that is created by a reader's encoder, and is shared @@ -46,8 +47,8 @@ public interface HFileBlockDecodingContext { void prepareDecoding( int onDiskSizeWithoutHeader, int uncompressedSizeWithoutHeader, - ByteBuffer blockBufferWithoutHeader, - ByteBuffer onDiskBlock + MultiByteBuffer blockBufferWithoutHeader, + MultiByteBuffer onDiskBlock ) throws IOException; /** diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/io/encoding/HFileBlockDefaultDecodingContext.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/io/encoding/HFileBlockDefaultDecodingContext.java index 78bb0d6..053a613 100644 --- a/hbase-common/src/main/java/org/apache/hadoop/hbase/io/encoding/HFileBlockDefaultDecodingContext.java +++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/io/encoding/HFileBlockDefaultDecodingContext.java @@ -24,12 +24,14 @@ import java.nio.ByteBuffer; import org.apache.commons.io.IOUtils; import org.apache.hadoop.hbase.classification.InterfaceAudience; import org.apache.hadoop.hbase.io.ByteBufferInputStream; +import org.apache.hadoop.hbase.io.MultiByteBufferInputStream; import org.apache.hadoop.hbase.io.TagCompressionContext; import org.apache.hadoop.hbase.io.compress.Compression; import org.apache.hadoop.hbase.io.crypto.Cipher; import org.apache.hadoop.hbase.io.crypto.Decryptor; import org.apache.hadoop.hbase.io.crypto.Encryption; import org.apache.hadoop.hbase.io.hfile.HFileContext; +import org.apache.hadoop.hbase.nio.MultiByteBuffer; import org.apache.hadoop.hbase.util.Bytes; /** @@ -51,8 +53,8 @@ public class HFileBlockDefaultDecodingContext implements @Override public void prepareDecoding(int onDiskSizeWithoutHeader, int uncompressedSizeWithoutHeader, - ByteBuffer blockBufferWithoutHeader, ByteBuffer onDiskBlock) throws IOException { - InputStream in = new DataInputStream(new ByteBufferInputStream(onDiskBlock)); + MultiByteBuffer blockBufferWithoutHeader, MultiByteBuffer onDiskBlock) throws IOException { + InputStream in = new DataInputStream(new MultiByteBufferInputStream(onDiskBlock)); Encryption.Context cryptoContext = fileContext.getEncryptionContext(); if (cryptoContext != Encryption.Context.NONE) { diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/io/encoding/PrefixKeyDeltaEncoder.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/io/encoding/PrefixKeyDeltaEncoder.java index 15608cc..03b0893 100644 --- a/hbase-common/src/main/java/org/apache/hadoop/hbase/io/encoding/PrefixKeyDeltaEncoder.java +++ 
b/hbase-common/src/main/java/org/apache/hadoop/hbase/io/encoding/PrefixKeyDeltaEncoder.java @@ -27,6 +27,7 @@ import org.apache.hadoop.hbase.CellUtil; import org.apache.hadoop.hbase.KeyValue; import org.apache.hadoop.hbase.KeyValueUtil; import org.apache.hadoop.hbase.classification.InterfaceAudience; +import org.apache.hadoop.hbase.nio.MultiByteBuffer; import org.apache.hadoop.hbase.util.ByteBufferUtils; import org.apache.hadoop.hbase.util.Bytes; @@ -172,22 +173,20 @@ public class PrefixKeyDeltaEncoder extends BufferedDataBlockEncoder { } @Override - public Cell getFirstKeyCellInBlock(ByteBuffer block) { + public Cell getFirstKeyCellInBlock(MultiByteBuffer block) { block.mark(); block.position(Bytes.SIZEOF_INT); - int keyLength = ByteBufferUtils.readCompressedInt(block); - ByteBufferUtils.readCompressedInt(block); - int commonLength = ByteBufferUtils.readCompressedInt(block); + int keyLength = MultiByteBuffer.readCompressedInt(block); + MultiByteBuffer.readCompressedInt(block); + int commonLength = MultiByteBuffer.readCompressedInt(block); if (commonLength != 0) { throw new AssertionError("Nonzero common length in the first key in " + "block: " + commonLength); } - int pos = block.position(); + ByteBuffer key = block.asSubBuffer(keyLength).duplicate(); block.reset(); - ByteBuffer dup = block.duplicate(); - dup.position(pos); - dup.limit(pos + keyLength); - return new KeyValue.KeyOnlyKeyValue(dup.array(), dup.arrayOffset() + pos, keyLength); + key.limit(key.position() + keyLength); + return new KeyValue.KeyOnlyKeyValue(key.array(), key.arrayOffset() + key.position(), keyLength); } @Override diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/io/hfile/BlockType.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/io/hfile/BlockType.java index 0db584e..3ebaefc 100644 --- a/hbase-common/src/main/java/org/apache/hadoop/hbase/io/hfile/BlockType.java +++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/io/hfile/BlockType.java @@ -26,6 +26,7 @@ import java.io.OutputStream; import java.nio.ByteBuffer; import org.apache.hadoop.hbase.classification.InterfaceAudience; +import org.apache.hadoop.hbase.nio.MultiByteBuffer; import org.apache.hadoop.hbase.util.Bytes; /** @@ -131,7 +132,7 @@ public enum BlockType { out.write(magic); } - public void write(ByteBuffer buf) { + public void write(MultiByteBuffer buf) { buf.put(magic); } @@ -161,7 +162,7 @@ public enum BlockType { return parse(buf, 0, buf.length); } - public static BlockType read(ByteBuffer buf) throws IOException { + public static BlockType read(MultiByteBuffer buf) throws IOException { byte[] magicBuf = new byte[Math.min(buf.limit() - buf.position(), MAGIC_LENGTH)]; buf.get(magicBuf); BlockType blockType = parse(magicBuf, 0, magicBuf.length); diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/nio/MultiByteBuffer.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/nio/MultiByteBuffer.java index 5fcc34d..1cae863 100644 --- a/hbase-common/src/main/java/org/apache/hadoop/hbase/nio/MultiByteBuffer.java +++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/nio/MultiByteBuffer.java @@ -140,7 +140,7 @@ public class MultiByteBuffer { */ public byte get(int index) { if (singleItem) { - return this.curItem.get(index); + return ByteBufferUtils.getByte(curItem, index); } int itemIndex = getItemIndex(index); return this.items[itemIndex].get(index - this.itemBeginPos[itemIndex]); @@ -151,7 +151,7 @@ public class MultiByteBuffer { */ private int getItemIndex(int elemIndex) { int index = 1; - while (elemIndex > 
this.itemBeginPos[index]) { + while (elemIndex >= this.itemBeginPos[index]) { index++; if (index == this.itemBeginPos.length) { throw new IndexOutOfBoundsException(); @@ -252,6 +252,28 @@ public class MultiByteBuffer { return getInt(index, itemIndex); } + /** + * Fetches the short at the given index. Does not change position of the underlying ByteBuffers. The + * difference for this API from {@link #getShort(int)} is the caller is sure that the index will be + * after the current position of this MBB. + * + * @param index + * @return the short value at the given index + */ + public short getShortStrictlyForward(int index) { + if (singleItem) { + return ByteBufferUtils.toShort(this.curItem, index); + } + // Mostly the index specified will land within this current item. Short circuit for that + int itemIndex; + if (this.itemBeginPos[this.curItemIndex + 1] > index) { + itemIndex = this.curItemIndex; + } else { + itemIndex = getItemIndexFromCurItemIndex(index); + } + return getShort(index, itemIndex); + } + private int getInt(int index, int itemIndex) { ByteBuffer item = items[itemIndex]; int offsetInItem = index - this.itemBeginPos[itemIndex]; @@ -277,6 +299,56 @@ public class MultiByteBuffer { return l; } + private short getShort(int index, int itemIndex) { + ByteBuffer item = items[itemIndex]; + int offsetInItem = index - this.itemBeginPos[itemIndex]; + int remainingLen = item.limit() - offsetInItem; + if (remainingLen >= Bytes.SIZEOF_SHORT) { + return ByteBufferUtils.toShort(item, offsetInItem); + } + if (items.length - 1 == itemIndex) { + // means cur item is the last one and we wont be able to read a int. Throw exception + throw new BufferUnderflowException(); + } + ByteBuffer nextItem = items[itemIndex + 1]; + // Get available bytes from this item and remaining from next + short l = 0; + for (int i = offsetInItem; i < item.capacity(); i++) { + l <<= 8; + l ^= item.get(i) & 0xFF; + } + for (int i = 0; i < Bytes.SIZEOF_SHORT - remainingLen; i++) { + l <<= 8; + l ^= nextItem.get(i) & 0xFF; + } + return l; + } + + private long getLong(int index, int itemIndex) { + ByteBuffer item = items[itemIndex]; + int offsetInItem = index - this.itemBeginPos[itemIndex]; + int remainingLen = item.limit() - offsetInItem; + if (remainingLen >= Bytes.SIZEOF_LONG) { + return ByteBufferUtils.toLong(item, offsetInItem); + } + if (items.length - 1 == itemIndex) { + // means cur item is the last one and we wont be able to read a long. Throw exception + throw new BufferUnderflowException(); + } + ByteBuffer nextItem = items[itemIndex + 1]; + // Get available bytes from this item and remaining from next + long l = 0; + for (int i = offsetInItem; i < item.capacity(); i++) { + l <<= 8; + l ^= item.get(i) & 0xFF; + } + for (int i = 0; i < Bytes.SIZEOF_LONG - remainingLen; i++) { + l <<= 8; + l ^= nextItem.get(i) & 0xFF; + } + return l; + } + /** * Fetches the long at the given index. Does not change position of the underlying ByteBuffers * @param index @@ -284,7 +356,7 @@ public class MultiByteBuffer { */ public long getLong(int index) { if (singleItem) { - return this.curItem.getLong(index); + return ByteBufferUtils.getLong(curItem, index); } // Mostly the index specified will land within this current item. Short circuit for that int itemIndex; @@ -319,6 +391,28 @@ public class MultiByteBuffer { } /** + * Fetches the long at the given index. Does not change position of the underlying ByteBuffers. 
The + * difference for this API from {@link #getLong(int)} is the caller is sure that the index will be + * after the current position of this MBB. + * + * @param index + * @return the long value at the given index + */ + public long getLongStrictlyForward(int index) { + if (singleItem) { + return ByteBufferUtils.toLong(this.curItem, index); + } + // Mostly the index specified will land within this current item. Short circuit for that + int itemIndex; + if (this.itemBeginPos[this.curItemIndex + 1] > index) { + itemIndex = this.curItemIndex; + } else { + itemIndex = getItemIndexFromCurItemIndex(index); + } + return getLong(index, itemIndex); + } + + /** * @return this MBB's current position */ public int position() { @@ -556,6 +650,22 @@ public class MultiByteBuffer { return (WritableUtils.isNegativeVInt(firstByte) ? (i ^ -1L) : i); } + public long getVLong(int pos) { + byte firstByte = get(pos); + int len = WritableUtils.decodeVIntSize(firstByte); + if (len == 1) { + return firstByte; + } + long i = 0; + byte b; + for (int idx = 0; idx < len - 1; idx++) { + b = get(pos); + i = i << 8; + i = i | (b & 0xFF); + } + return (WritableUtils.isNegativeVInt(firstByte) ? (i ^ -1L) : i); + } + /** * Copies the content from this MBB's current position to the byte array and fills it. Also * advances the position of the MBB by the length of the byte[]. @@ -941,7 +1051,7 @@ public class MultiByteBuffer { while (length > 0) { int toRead = Math.min(length, locCurItem.remaining()); ByteBufferUtils - .copyFromBufferToArray(dupB, locCurItem, locCurItem.position(), offset, toRead); + .copyFromBufferToByteArray(dupB, locCurItem, locCurItem.position(), offset, toRead); length -= toRead; if (length == 0) break; @@ -985,7 +1095,7 @@ public class MultiByteBuffer { int destOffset = 0; while (length > 0) { int toRead = Math.min(length, item.limit() - offset); - ByteBufferUtils.copyFromBufferToArray(dst, item, offset, destOffset, toRead); + ByteBufferUtils.copyFromBufferToByteArray(dst, item, offset, destOffset, toRead); length -= toRead; if (length == 0) break; itemIndex++; @@ -1026,6 +1136,90 @@ public class MultiByteBuffer { return len1 - len2; } + /** + * Copies the content from an MBB to a ByteBuffer + * @param out the ByteBuffer to which the copy has to happen + * @param in the MBB from which the copy has to happen + * @param sourceOffset the offset in the MBB from which the elements has + * to be copied + * @param length the length in the MBB upto which the elements has to be copied + */ + public static void copyFromBufferToBuffer(ByteBuffer out, MultiByteBuffer in, int sourceOffset, + int length) { + if (in.hasArray() && out.hasArray()) { + System.arraycopy(in.array(), sourceOffset + in.arrayOffset(), out.array(), out.position() + + out.arrayOffset(), length); + ByteBufferUtils.skip(out, length); + } else { + // Not used from real read path actually. 
So not going with + // optimization + for (int i = 0; i < length; ++i) { + out.put(in.get(sourceOffset + i)); + } + } + } + + /** + * Reads an integer that is stored in a compressed format + * @param buffer + * @return integer + */ + public static int readCompressedInt(MultiByteBuffer buffer) { + byte b = buffer.get(); + if ((b & ByteBufferUtils.NEXT_BIT_MASK) != 0) { + return (b & ByteBufferUtils.VALUE_MASK) + + (readCompressedInt(buffer) << ByteBufferUtils.NEXT_BIT_SHIFT); + } + return b & ByteBufferUtils.VALUE_MASK; + } + + /** + * Copy the content from this MBB to a byte[] based on the given offset and + * length + * + * @param offset + * the position from where the copy should start + * @param length + * the length upto which the copy has to be done + * @return byte[] with the copied contents from this MBB. + */ + public byte[] toBytes(int offset, int length) { + byte[] output = new byte[length]; + if (singleItem) { + ByteBufferUtils.copyFromBufferToByteArray(output, this.curItem, offset, 0, length); + return output; + } + int itemIndex = getItemIndex(offset); + ByteBuffer item = this.items[itemIndex]; + int toRead = item.limit() - offset; + int destinationOffset = 0; + while (length > 0) { + toRead = Math.min(length, toRead); + ByteBufferUtils.copyFromBufferToByteArray(output, item, offset, destinationOffset, toRead); + length -= toRead; + if (length == 0) + break; + destinationOffset += toRead; + offset = 0; + item = items[++itemIndex]; + toRead = item.remaining(); + } + return output; + } + + /** + * Returns the a long + * @param in + * @param fitInBytes + * @return + */ + public static long readLong(MultiByteBuffer in, final int fitInBytes) { + long tmpLength = 0; + for (int i = 0; i < fitInBytes; ++i) { + tmpLength |= (in.get() & 0xffl) << (8l * i); + } + return tmpLength; + } @Override public boolean equals(Object obj) { if (!(obj instanceof MultiByteBuffer)) return false; diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/util/ByteBufferArray.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/util/ByteBufferArray.java index d3414dd..e279895 100644 --- a/hbase-common/src/main/java/org/apache/hadoop/hbase/util/ByteBufferArray.java +++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/util/ByteBufferArray.java @@ -25,6 +25,7 @@ import java.util.concurrent.locks.ReentrantLock; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.hbase.classification.InterfaceAudience; +import org.apache.hadoop.hbase.nio.MultiByteBuffer; import org.apache.hadoop.util.StringUtils; /** @@ -200,4 +201,52 @@ public final class ByteBufferArray { } assert srcIndex == len; } + + public MultiByteBuffer subArray(long offset, int len) { + assert len >= 0; + long end = offset + len; + int startBuffer = (int) (offset / bufferSize), startBufferOffset = (int) (offset % bufferSize); + int endBuffer = (int) (end / bufferSize), endBufferOffset = (int) (end % bufferSize); + assert startBuffer >= 0 && startBuffer < bufferCount; + assert endBuffer >= 0 && endBuffer < bufferCount + || (endBuffer == bufferCount && endBufferOffset == 0); + if (startBuffer >= locks.length || startBuffer < 0) { + String msg = "Failed subArray, start=" + offset + ",startBuffer=" + startBuffer + + ",bufferSize=" + bufferSize; + LOG.error(msg); + throw new RuntimeException(msg); + } + int srcIndex = 0, cnt = -1; + ByteBuffer[] mbb = new ByteBuffer[endBuffer - startBuffer + 1]; + for (int i = startBuffer,j=0; i <= endBuffer; ++i,j++) { + Lock lock = locks[i]; + 
lock.lock(); + try { + ByteBuffer bb = buffers[i]; + if (i == startBuffer) { + cnt = bufferSize - startBufferOffset; + if (cnt > len) cnt = len; + ByteBuffer dup = bb.duplicate(); + dup.limit(startBufferOffset + cnt).position(startBufferOffset); + mbb[j] = dup.slice(); + } else if (i == endBuffer) { + cnt = endBufferOffset; + ByteBuffer dup = bb.duplicate(); + dup.position(0).limit(cnt); + mbb[j] = dup.slice(); + } else { + cnt = bufferSize ; + ByteBuffer dup = bb.duplicate(); + dup.position(0).limit(cnt); + mbb[j] = dup.slice(); + } + srcIndex += cnt; + } finally { + lock.unlock(); + } + } + assert srcIndex == len; + MultiByteBuffer bb = new MultiByteBuffer(mbb); + return bb; + } } diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/util/ByteBufferUtils.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/util/ByteBufferUtils.java index 33e5cc6..335455f 100644 --- a/hbase-common/src/main/java/org/apache/hadoop/hbase/util/ByteBufferUtils.java +++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/util/ByteBufferUtils.java @@ -37,9 +37,9 @@ import org.apache.hadoop.io.WritableUtils; public final class ByteBufferUtils { // "Compressed integer" serialization helper constants. - private final static int VALUE_MASK = 0x7f; - private final static int NEXT_BIT_SHIFT = 7; - private final static int NEXT_BIT_MASK = 1 << 7; + public final static int VALUE_MASK = 0x7f; + public final static int NEXT_BIT_SHIFT = 7; + public final static int NEXT_BIT_MASK = 1 << 7; private ByteBufferUtils() { } @@ -135,6 +135,48 @@ public final class ByteBufferUtils { } } + public static byte getByte(ByteBuffer buffer, int offset) { + if (UnsafeAccess.isAvailable()) { + return UnsafeAccess.get(buffer, offset); + } else { + return buffer.get(offset); + } + } + + public static long getLong(ByteBuffer buffer, int offset) { + if (UnsafeAccess.isAvailable()) { + return UnsafeAccess.toLongUnsafe(buffer, offset); + } else { + long l = 0; + for (int i = offset; i < offset + Bytes.SIZEOF_LONG; i++) { + l <<= 8; + l ^= getByte(buffer, i) & 0xFF; + } + return l; + } + } + /** + * Copies specified number of bytes from given offset of 'in' ByteBuffer to the array. + * @param out + * @param in + * @param sourceOffset + * @param destinationOffset + * @param length + */ + public static int copyFromBufferToByteArray(byte[] out, ByteBuffer in, + int sourceOffset, int destinationOffset, int length) { + if (in.hasArray()) { + System.arraycopy(in.array(), sourceOffset + in.arrayOffset(), out, destinationOffset, length); + } else if (UnsafeAccess.isAvailable()) { + UnsafeAccess.copy(in, sourceOffset, out, destinationOffset, length); + } else { + for (int i = 0; i < length; i++) { + out[destinationOffset + i] = in.get(sourceOffset + i); + } + } + return destinationOffset + length; + } + /** * Copy the data to the output stream and update position in buffer. * @param out the stream to write bytes to @@ -178,6 +220,15 @@ public final class ByteBufferUtils { return fitInBytes; } + public static int putByte(ByteBuffer buffer, int offset, byte b) { + if (UnsafeAccess.isAvailable()) { + return UnsafeAccess.putByte(buffer, offset, b); + } else { + buffer.put(offset, b); + return offset + 1; + } + } + /** * Check how many bytes are required to store value. * @param value Value which size will be tested. @@ -330,30 +381,6 @@ public final class ByteBufferUtils { } /** - * Copy from one buffer to another from given offset. - *

- * Note : This will advance the position marker of {@code out} but not change the position maker - * for {@code in} - * @param out destination buffer - * @param in source buffer - * @param sourceOffset offset in the source buffer - * @param length how many bytes to copy - */ - public static void copyFromBufferToBuffer(ByteBuffer out, - ByteBuffer in, int sourceOffset, int length) { - if (in.hasArray() && out.hasArray()) { - System.arraycopy(in.array(), sourceOffset + in.arrayOffset(), - out.array(), out.position() + - out.arrayOffset(), length); - skip(out, length); - } else { - for (int i = 0; i < length; ++i) { - out.put(in.get(sourceOffset + i)); - } - } - } - - /** * Copy from one buffer to another from given offset. This will be absolute positional copying and * won't affect the position of any of the buffers. * @param out @@ -362,16 +389,47 @@ public final class ByteBufferUtils { * @param destinationOffset * @param length */ - public static void copyFromBufferToBuffer(ByteBuffer out, ByteBuffer in, int sourceOffset, + public static int copyFromBufferToBuffer(ByteBuffer out, ByteBuffer in, int sourceOffset, int destinationOffset, int length) { if (in.hasArray() && out.hasArray()) { System.arraycopy(in.array(), sourceOffset + in.arrayOffset(), out.array(), out.arrayOffset() + destinationOffset, length); + } else if (UnsafeAccess.isAvailable()) { + UnsafeAccess.copy(in, sourceOffset, out, destinationOffset, length); } else { + // TODO buf.put(buf) will be better.. Deal with pos then. for (int i = 0; i < length; ++i) { - out.put((destinationOffset + i), in.get(sourceOffset + i)); + putByte(out, destinationOffset + i, getByte(in, sourceOffset + i)); } } + return destinationOffset + length; + } + + /** + * Copy from one buffer to another from given offset. + *

+ * Note : This will advance the position marker of {@code out} but not change the position maker + * for {@code in} + * @param out destination buffer + * @param in source buffer + * @param sourceOffset offset in the source buffer + * @param length how many bytes to copy + */ + public static void copyFromBufferToBuffer(ByteBuffer out, + ByteBuffer in, int sourceOffset, int length) { + if (in.hasArray() && out.hasArray()) { + System.arraycopy(in.array(), sourceOffset + in.arrayOffset(), out.array(), out.position() + + out.arrayOffset(), length); + } else if (UnsafeAccess.isAvailable()) { + UnsafeAccess.copy(in, sourceOffset, out, out.position(), length); + } else { + // TODO buf.put(buf) will be better.. Deal with pos then. + int destOffset = out.position(); + for (int i = 0; i < length; ++i) { + putByte(out, destOffset + i, getByte(in, sourceOffset + i)); + } + } + skip(out, length); } /** @@ -653,25 +711,4 @@ public final class ByteBufferUtils { out.put(in, inOffset, length); } } - - /** - * Copies specified number of bytes from given offset of 'in' ByteBuffer to the array. - * @param out - * @param in - * @param sourceOffset - * @param destinationOffset - * @param length - */ - public static void copyFromBufferToArray(byte[] out, ByteBuffer in, - int sourceOffset, int destinationOffset, int length) { - if (in.hasArray()) { - System.arraycopy(in.array(), sourceOffset + in.arrayOffset(), out, destinationOffset, length); - } else if (UnsafeAccess.isAvailable()) { - UnsafeAccess.copy(in, sourceOffset, out, destinationOffset, length); - } else { - for (int i = 0; i < length; i++) { - out[destinationOffset + i] = in.get(sourceOffset + i); - } - } - } } diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/util/Hash.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/util/Hash.java index 34d9f90..aa0795d 100644 --- a/hbase-common/src/main/java/org/apache/hadoop/hbase/util/Hash.java +++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/util/Hash.java @@ -140,4 +140,6 @@ public abstract class Hash { * @return hash value */ public abstract int hash(byte[] bytes, int offset, int length, int initval); + + // TODO : a buffer based hash function would be needed.. Not adding it for now } diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/util/UnsafeAccess.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/util/UnsafeAccess.java index deb9a1a..7af69df 100644 --- a/hbase-common/src/main/java/org/apache/hadoop/hbase/util/UnsafeAccess.java +++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/util/UnsafeAccess.java @@ -274,15 +274,16 @@ public final class UnsafeAccess { } /** - * Copies specified number of bytes from given offset of 'in' ByteBuffer to the array. + * Copies specified number of bytes from given offset of 'in' ByteBuffer to + * the array. 
+ * * @param src * @param srcOffset * @param dest * @param destOffset * @param length */ - public static void copy(ByteBuffer src, int srcOffset, byte[] dest, int destOffset, - int length) { + public static void copy(ByteBuffer src, int srcOffset, byte[] dest, int destOffset, int length) { long srcAddress = srcOffset; Object srcBase = null; if (src.isDirect()) { @@ -294,4 +295,206 @@ public final class UnsafeAccess { long destAddress = destOffset + BYTE_ARRAY_BASE_OFFSET; theUnsafe.copyMemory(srcBase, srcAddress, dest, destAddress, length); } + + public static long getLong(ByteBuffer buf, int offset) { + if (buf.isDirect()) { + return theUnsafe.getLong(((DirectBuffer) buf).address() + offset); + } else { + return theUnsafe.getLong(buf.array(), BYTE_ARRAY_BASE_OFFSET + buf.arrayOffset() + offset); + } + } + + public static int putShort(ByteBuffer buf, int offset, short val) { + if (buf.isDirect()) { + theUnsafe.putShort(((DirectBuffer) buf).address() + offset, val); + } else { + theUnsafe.putShort(buf.array(), BYTE_ARRAY_BASE_OFFSET + buf.arrayOffset() + offset, val); + } + return offset + Bytes.SIZEOF_SHORT; + } + + public static int putInt(ByteBuffer buf, int offset, int val) { + if (buf.isDirect()) { + theUnsafe.putInt(((DirectBuffer) buf).address() + offset, val); + } else { + theUnsafe.putInt(buf.array(), BYTE_ARRAY_BASE_OFFSET + buf.arrayOffset() + offset, val); + } + return offset + Bytes.SIZEOF_INT; + } + + public static int putLong(ByteBuffer buf, int offset, long val) { + if (buf.isDirect()) { + theUnsafe.putLong(((DirectBuffer) buf).address() + offset, val); + } else { + theUnsafe.putLong(buf.array(), BYTE_ARRAY_BASE_OFFSET + buf.arrayOffset() + offset, val); + } + return offset + Bytes.SIZEOF_LONG; + } + + public static int getInt(ByteBuffer buf, int offset) { + if (buf.isDirect()) { + return theUnsafe.getInt(((DirectBuffer) buf).address() + offset); + } else { + return theUnsafe.getInt(buf.array(), BYTE_ARRAY_BASE_OFFSET + buf.arrayOffset() + offset); + } + } + + public static byte get(ByteBuffer buf, int offset) { + if (buf.isDirect()) { + return theUnsafe.getByte(((DirectBuffer) buf).address() + offset); + } else { + return theUnsafe.getByte(buf.array(), BYTE_ARRAY_BASE_OFFSET + buf.arrayOffset() + offset); + } + } + + public static int putByte(ByteBuffer buf, int offset, byte b) { + if (buf.isDirect()) { + theUnsafe.putByte(((DirectBuffer) buf).address() + offset, b); + } else { + theUnsafe.putByte(buf.array(), + BYTE_ARRAY_BASE_OFFSET + buf.arrayOffset() + offset, b); + } + return offset + 1; + } + + public static void copy(ByteBuffer src, int srcOffset, ByteBuffer dest, int destOffset, int length) { + long srcAddress = srcOffset; + Object srcBase = null; + if (src.isDirect()) { + srcAddress = srcAddress + ((DirectBuffer) src).address(); + } else { + srcAddress = srcAddress + BYTE_ARRAY_BASE_OFFSET + src.arrayOffset(); + srcBase = src.array(); + } + long destAddress = destOffset; + Object destBase = null; + if (dest.isDirect()) { + destAddress = destAddress + ((DirectBuffer) dest).address(); + } else { + destAddress = destAddress + BYTE_ARRAY_BASE_OFFSET + dest.arrayOffset(); + destBase = dest.array(); + } + theUnsafe.copyMemory(srcBase, srcAddress, destBase, destAddress, length); + } + + public static short getShort(ByteBuffer buf, int offset) { + if (buf.isDirect()) { + return theUnsafe.getShort(((DirectBuffer) buf).address() + offset); + } else { + assert buf.hasArray(); + return theUnsafe.getShort(buf.array(), BYTE_ARRAY_BASE_OFFSET + buf.arrayOffset() + offset); + } + 
} + + public static int toIntUnsafe(ByteBuffer buf, int offset) { + if (littleEndian) { + return Integer.reverseBytes(getInt(buf, offset)); + } else { + return getInt(buf, offset); + } + } + + public static short toShortUnsafe(ByteBuffer buf, int offset) { + if (littleEndian) { + return Short.reverseBytes(getShort(buf, offset)); + } else { + return getShort(buf, offset); + } + } + + public static long toLongUnsafe(ByteBuffer buf, int offset) { + if (littleEndian) { + return Long.reverseBytes(getLong(buf, offset)); + } else { + return getLong(buf, offset); + } + } + + public static int putShortUnsafe(ByteBuffer buffer, int offset, short val) { + if (littleEndian) { + val = Short.reverseBytes(val); + } + return putShort(buffer, offset, val); + } + + public static int putIntUnsafe(ByteBuffer buffer, int offset, int val) { + if (littleEndian) { + val = Integer.reverseBytes(val); + } + return putInt(buffer, offset, val); + } + + public static int putLongUnsafe(ByteBuffer buffer, int offset, long val) { + if (littleEndian) { + val = Long.reverseBytes(val); + } + return putLong(buffer, offset, val); + } + + // Fix should be : already reverse endian is done in the bytes.toIntUnsafe + // and in the lessThanUnsignedInt we are again doing reverseEndian and hence + // the problem + static int compareTo(ByteBuffer buf1, int o1, int l1, byte[] buf2, int o2, int l2) { + final int minLength = Math.min(l1, l2); + final int minWords = minLength / Bytes.SIZEOF_LONG; + final long offset2Adj = o2 + BYTE_ARRAY_BASE_OFFSET; + for (int i = 0; i < minWords * Bytes.SIZEOF_LONG; i += Bytes.SIZEOF_LONG) { + long lw = getLong(buf1, o1 + i); + long rw = theUnsafe.getLong(buf2, (long) offset2Adj + i); + long diff = lw ^ rw; + if (diff != 0) { + return lessThanUnsignedLong(lw, rw) ? -1 : 1; + } + } + int offset = minWords * Bytes.SIZEOF_LONG; + + if (minLength - offset >= Bytes.SIZEOF_INT) { + int il = getInt(buf1, o1 + offset); + int ir = theUnsafe.getInt(buf2, offset2Adj + offset); + if (il != ir) { + return lessThanUnsignedInt(il, ir) ? -1 : 1; + } + offset += Bytes.SIZEOF_INT; + } + if (minLength - offset >= Bytes.SIZEOF_SHORT) { + short sl = getShort(buf1, o1 + offset); + short sr = theUnsafe.getShort(buf2, offset2Adj + offset); + if (sl != sr) { + return lessThanUnsignedShort(sl, sr) ? 
-1 : 1; + } + offset += Bytes.SIZEOF_SHORT; + } + if (minLength - offset == 1) { + int a = (buf1.get(o1 + offset) & 0xff); + int b = (buf2[o2 + offset] & 0xff); + if (a != b) { + return a - b; + } + } + return l1 - l2; + } + + static boolean lessThanUnsignedLong(long x1, long x2) { + if (littleEndian) { + x1 = Long.reverseBytes(x1); + x2 = Long.reverseBytes(x2); + } + return (x1 + Long.MIN_VALUE) < (x2 + Long.MIN_VALUE); + } + + static boolean lessThanUnsignedInt(int x1, int x2) { + if (littleEndian) { + x1 = Integer.reverseBytes(x1); + x2 = Integer.reverseBytes(x2); + } + return (x1 & 0xffffffffL) < (x2 & 0xffffffffL); + } + + static boolean lessThanUnsignedShort(short x1, short x2) { + if (littleEndian) { + x1 = Short.reverseBytes(x1); + x2 = Short.reverseBytes(x2); + } + return (x1 & 0xffff) < (x2 & 0xffff); + } } diff --git a/hbase-common/src/test/java/org/apache/hadoop/hbase/nio/TestMultiByteBuffer.java b/hbase-common/src/test/java/org/apache/hadoop/hbase/nio/TestMultiByteBuffer.java index 27f3484..918af05 100644 --- a/hbase-common/src/test/java/org/apache/hadoop/hbase/nio/TestMultiByteBuffer.java +++ b/hbase-common/src/test/java/org/apache/hadoop/hbase/nio/TestMultiByteBuffer.java @@ -99,7 +99,7 @@ public class TestMultiByteBuffer { mbb.position(21); assertEquals(b1, mbb.get()); mbb.put(b); - assertEquals(l2, mbb.getLong(22)); + assertEquals(l2, mbb.getLongStrictlyForward(22)); } @Test diff --git a/hbase-prefix-tree/src/main/java/org/apache/hadoop/hbase/codec/prefixtree/PrefixTreeCodec.java b/hbase-prefix-tree/src/main/java/org/apache/hadoop/hbase/codec/prefixtree/PrefixTreeCodec.java index 7fceaa5..ce20bbc 100644 --- a/hbase-prefix-tree/src/main/java/org/apache/hadoop/hbase/codec/prefixtree/PrefixTreeCodec.java +++ b/hbase-prefix-tree/src/main/java/org/apache/hadoop/hbase/codec/prefixtree/PrefixTreeCodec.java @@ -43,6 +43,7 @@ import org.apache.hadoop.hbase.io.encoding.HFileBlockDefaultEncodingContext; import org.apache.hadoop.hbase.io.encoding.HFileBlockEncodingContext; import org.apache.hadoop.hbase.io.hfile.BlockType; import org.apache.hadoop.hbase.io.hfile.HFileContext; +import org.apache.hadoop.hbase.nio.MultiByteBuffer; import org.apache.hadoop.hbase.util.ByteBufferUtils; import org.apache.hadoop.io.WritableUtils; @@ -114,12 +115,12 @@ public class PrefixTreeCodec implements DataBlockEncoder { @Override - public Cell getFirstKeyCellInBlock(ByteBuffer block) { + public Cell getFirstKeyCellInBlock(MultiByteBuffer block) { block.rewind(); PrefixTreeArraySearcher searcher = null; try { // should i includeMemstoreTS (second argument)? i think PrefixKeyDeltaEncoder is, so i will - searcher = DecoderFactory.checkOut(block, true); + searcher = DecoderFactory.checkOut(block.asSubBuffer(block.limit() - block.position()), true); if (!searcher.positionAtFirstCell()) { return null; } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/CacheableDeserializer.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/CacheableDeserializer.java index f56a921..b361377 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/CacheableDeserializer.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/CacheableDeserializer.java @@ -18,9 +18,9 @@ package org.apache.hadoop.hbase.io.hfile; import java.io.IOException; -import java.nio.ByteBuffer; import org.apache.hadoop.hbase.classification.InterfaceAudience; +import org.apache.hadoop.hbase.nio.MultiByteBuffer; /** * Interface for a deserializer. 
Throws an IOException if the serialized data is @@ -33,7 +33,7 @@ public interface CacheableDeserializer { * * @return T the deserialized object. */ - T deserialize(ByteBuffer b) throws IOException; + T deserialize(MultiByteBuffer b) throws IOException; /** * @@ -43,7 +43,7 @@ public interface CacheableDeserializer { * @return T the deserialized object. * @throws IOException */ - T deserialize(ByteBuffer b, boolean reuse) throws IOException; + T deserialize(MultiByteBuffer b, boolean reuse) throws IOException; /** * Get the identifier of this deserialiser. Identifier is unique for each diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/CompoundBloomFilter.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/CompoundBloomFilter.java index 11436ce..3d148b3 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/CompoundBloomFilter.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/CompoundBloomFilter.java @@ -21,11 +21,11 @@ package org.apache.hadoop.hbase.io.hfile; import java.io.DataInput; import java.io.IOException; -import java.nio.ByteBuffer; import org.apache.hadoop.hbase.Cell; import org.apache.hadoop.hbase.KeyValue; import org.apache.hadoop.hbase.classification.InterfaceAudience; +import org.apache.hadoop.hbase.nio.MultiByteBuffer; import org.apache.hadoop.hbase.util.BloomFilter; import org.apache.hadoop.hbase.util.BloomFilterUtil; import org.apache.hadoop.hbase.util.Bytes; @@ -93,7 +93,7 @@ public class CompoundBloomFilter extends CompoundBloomFilterBase } @Override - public boolean contains(byte[] key, int keyOffset, int keyLength, ByteBuffer bloom) { + public boolean contains(byte[] key, int keyOffset, int keyLength) { // We try to store the result in this variable so we can update stats for // testing, but when an error happens, we log a message and return. @@ -120,7 +120,7 @@ public class CompoundBloomFilter extends CompoundBloomFilterBase + Bytes.toStringBinary(key, keyOffset, keyLength), ex); } - ByteBuffer bloomBuf = bloomBlock.getBufferReadOnly(); + MultiByteBuffer bloomBuf = bloomBlock.getBufferReadOnly(); result = BloomFilterUtil.contains(key, keyOffset, keyLength, bloomBuf, bloomBlock.headerSize(), bloomBlock.getUncompressedSizeWithoutHeader(), hash, hashCount); @@ -137,7 +137,7 @@ public class CompoundBloomFilter extends CompoundBloomFilterBase } @Override - public boolean contains(Cell keyCell, ByteBuffer bloom) { + public boolean contains(Cell keyCell) { // We try to store the result in this variable so we can update stats for // testing, but when an error happens, we log a message and return. 
int block = index.rootBlockContainingKey(keyCell); diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFile.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFile.java index d18dada..8dee67e 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFile.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFile.java @@ -55,6 +55,7 @@ import org.apache.hadoop.hbase.fs.HFileSystem; import org.apache.hadoop.hbase.io.FSDataInputStreamWrapper; import org.apache.hadoop.hbase.io.compress.Compression; import org.apache.hadoop.hbase.io.encoding.DataBlockEncoding; +import org.apache.hadoop.hbase.nio.MultiByteBuffer; import org.apache.hadoop.hbase.protobuf.ProtobufMagic; import org.apache.hadoop.hbase.protobuf.ProtobufUtil; import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos; @@ -389,7 +390,7 @@ public class HFile { HFileScanner getScanner(boolean cacheBlocks, final boolean pread, final boolean isCompaction); - ByteBuffer getMetaBlock(String metaBlockName, boolean cacheBlock) throws IOException; + MultiByteBuffer getMetaBlock(String metaBlockName, boolean cacheBlock) throws IOException; Map loadFileInfo() throws IOException; diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileBlock.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileBlock.java index f3bf0b7..feeaee1 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileBlock.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileBlock.java @@ -34,14 +34,14 @@ import org.apache.hadoop.hbase.Cell; import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.classification.InterfaceAudience; import org.apache.hadoop.hbase.fs.HFileSystem; -import org.apache.hadoop.hbase.io.ByteBufferInputStream; import org.apache.hadoop.hbase.io.FSDataInputStreamWrapper; +import org.apache.hadoop.hbase.io.MultiByteBufferInputStream; import org.apache.hadoop.hbase.io.encoding.DataBlockEncoding; import org.apache.hadoop.hbase.io.encoding.HFileBlockDecodingContext; import org.apache.hadoop.hbase.io.encoding.HFileBlockDefaultDecodingContext; import org.apache.hadoop.hbase.io.encoding.HFileBlockDefaultEncodingContext; import org.apache.hadoop.hbase.io.encoding.HFileBlockEncodingContext; -import org.apache.hadoop.hbase.util.ByteBufferUtils; +import org.apache.hadoop.hbase.nio.MultiByteBuffer; import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.ChecksumType; import org.apache.hadoop.hbase.util.ClassSize; @@ -104,8 +104,8 @@ public class HFileBlock implements Cacheable { static final byte[] DUMMY_HEADER_NO_CHECKSUM = new byte[HConstants.HFILEBLOCK_HEADER_SIZE_NO_CHECKSUM]; - public static final int BYTE_BUFFER_HEAP_SIZE = (int) ClassSize.estimateBase( - ByteBuffer.wrap(new byte[0], 0, 0).getClass(), false); + public static final int MULTI_BYTE_BUFFER_HEAP_SIZE = (int) ClassSize.estimateBase( + new MultiByteBuffer(ByteBuffer.wrap(new byte[0], 0, 0)).getClass(), false); // meta.usesHBaseChecksum+offset+nextBlockOnDiskSizeWithHeader public static final int EXTRA_SERIALIZATION_SPACE = Bytes.SIZEOF_BYTE + Bytes.SIZEOF_INT @@ -118,14 +118,16 @@ public class HFileBlock implements Cacheable { static final CacheableDeserializer blockDeserializer = new CacheableDeserializer() { - public HFileBlock deserialize(ByteBuffer buf, boolean reuse) throws IOException{ + public HFileBlock deserialize(MultiByteBuffer buf, boolean reuse) throws IOException{ buf.limit(buf.limit() - 
HFileBlock.EXTRA_SERIALIZATION_SPACE).rewind(); - ByteBuffer newByteBuffer; + MultiByteBuffer newByteBuffer; if (reuse) { newByteBuffer = buf.slice(); } else { - newByteBuffer = ByteBuffer.allocate(buf.limit()); - newByteBuffer.put(buf); + int len = buf.limit(); + ByteBuffer bb = ByteBuffer.allocate(len); + MultiByteBuffer.copyFromBufferToBuffer(bb, buf, 0, len); + newByteBuffer = new MultiByteBuffer(bb); } buf.position(buf.limit()); buf.limit(buf.limit() + HFileBlock.EXTRA_SERIALIZATION_SPACE); @@ -145,7 +147,7 @@ public class HFileBlock implements Cacheable { } @Override - public HFileBlock deserialize(ByteBuffer b) throws IOException { + public HFileBlock deserialize(MultiByteBuffer b) throws IOException { return deserialize(b, false); } }; @@ -174,7 +176,7 @@ public class HFileBlock implements Cacheable { private final int onDiskDataSizeWithHeader; /** The in-memory representation of the hfile block */ - private ByteBuffer buf; + private MultiByteBuffer buf; /** Meta data that holds meta information on the hfileblock */ private HFileContext fileContext; @@ -209,7 +211,7 @@ public class HFileBlock implements Cacheable { * @param fileContext HFile meta data */ HFileBlock(BlockType blockType, int onDiskSizeWithoutHeader, int uncompressedSizeWithoutHeader, - long prevBlockOffset, ByteBuffer buf, boolean fillHeader, long offset, + long prevBlockOffset, MultiByteBuffer buf, boolean fillHeader, long offset, int onDiskDataSizeWithHeader, HFileContext fileContext) { this.blockType = blockType; this.onDiskSizeWithoutHeader = onDiskSizeWithoutHeader; @@ -224,6 +226,13 @@ public class HFileBlock implements Cacheable { this.buf.rewind(); } + HFileBlock(BlockType blockType, int onDiskSizeWithoutHeader, int uncompressedSizeWithoutHeader, + long prevBlockOffset, ByteBuffer buf, boolean fillHeader, long offset, + int onDiskDataSizeWithHeader, HFileContext fileContext) { + this(blockType, onDiskSizeWithoutHeader, uncompressedSizeWithoutHeader, prevBlockOffset, + new MultiByteBuffer(buf), fillHeader, offset, onDiskDataSizeWithHeader, fileContext); + } + /** * Copy constructor. Creates a shallow copy of {@code that}'s buffer. */ @@ -239,6 +248,9 @@ public class HFileBlock implements Cacheable { this.nextBlockOnDiskSizeWithHeader = that.nextBlockOnDiskSizeWithHeader; } + HFileBlock(ByteBuffer b, boolean usesHBaseChecksum) throws IOException { + this(new MultiByteBuffer(b), usesHBaseChecksum); + } /** * Creates a block from an existing buffer starting with a header. Rewinds * and takes ownership of the buffer. By definition of rewind, ignores the @@ -247,7 +259,7 @@ public class HFileBlock implements Cacheable { * because majorNumbers indicate the format of a HFile whereas minorNumbers * indicate the format inside a HFileBlock. */ - HFileBlock(ByteBuffer b, boolean usesHBaseChecksum) throws IOException { + HFileBlock(MultiByteBuffer b, boolean usesHBaseChecksum) throws IOException { b.rewind(); blockType = BlockType.read(b); onDiskSizeWithoutHeader = b.getInt(); @@ -334,8 +346,8 @@ public class HFileBlock implements Cacheable { * * @return the buffer with header skipped and checksum omitted. */ - public ByteBuffer getBufferWithoutHeader() { - ByteBuffer dup = this.buf.duplicate(); + public MultiByteBuffer getBufferWithoutHeader() { + MultiByteBuffer dup = this.buf.duplicate(); dup.position(headerSize()); dup.limit(buf.limit() - totalChecksumBytes()); return dup.slice(); @@ -343,15 +355,15 @@ public class HFileBlock implements Cacheable { /** * Returns the buffer this block stores internally. 
The clients must not - * modify the buffer object. This method has to be public because it is - * used in {@link CompoundBloomFilter} to avoid object - * creation on every Bloom filter lookup, but has to be used with caution. - * Checksum data is not included in the returned buffer but header data is. - * + * modify the buffer object. This method has to be public because it is used + * in {@link CompoundBloomFilter} to avoid object creation on every Bloom + * filter lookup, but has to be used with caution. Checksum data is not + * included in the returned buffer but header data is. + * * @return the buffer of this block for read-only operations */ - public ByteBuffer getBufferReadOnly() { - ByteBuffer dup = this.buf.duplicate(); + public MultiByteBuffer getBufferReadOnly() { + MultiByteBuffer dup = this.buf.duplicate(); dup.limit(buf.limit() - totalChecksumBytes()); return dup.slice(); } @@ -363,8 +375,8 @@ public class HFileBlock implements Cacheable { * * @return the buffer with header and checksum included for read-only operations */ - public ByteBuffer getBufferReadOnlyWithHeader() { - ByteBuffer dup = this.buf.duplicate(); + public MultiByteBuffer getBufferReadOnlyWithHeader() { + MultiByteBuffer dup = this.buf.duplicate(); return dup.slice(); } @@ -374,8 +386,8 @@ public class HFileBlock implements Cacheable { * * @return the byte buffer with header and checksum included */ - ByteBuffer getBufferWithHeader() { - ByteBuffer dupBuf = buf.duplicate(); + MultiByteBuffer getBufferWithHeader() { + MultiByteBuffer dupBuf = buf.duplicate(); dupBuf.rewind(); return dupBuf; } @@ -463,7 +475,7 @@ public class HFileBlock implements Cacheable { dataBegin = Bytes.toStringBinary(buf.array(), buf.arrayOffset() + headerSize(), Math.min(32, buf.limit() - buf.arrayOffset() - headerSize())); } else { - ByteBuffer bufWithoutHeader = getBufferWithoutHeader(); + MultiByteBuffer bufWithoutHeader = getBufferWithoutHeader(); byte[] dataBeginBytes = new byte[Math.min(32, bufWithoutHeader.limit() - bufWithoutHeader.position())]; bufWithoutHeader.get(dataBeginBytes); @@ -489,7 +501,7 @@ public class HFileBlock implements Cacheable { if (buf.hasArray()) { dataBegin = Bytes.toStringBinary(buf.array(), buf.arrayOffset(), Math.min(32, buf.limit())); } else { - ByteBuffer bufDup = getBufferReadOnly(); + MultiByteBuffer bufDup = getBufferReadOnly(); byte[] dataBeginBytes = new byte[Math.min(32, bufDup.limit() - bufDup.position())]; bufDup.get(dataBeginBytes); dataBegin = Bytes.toStringBinary(dataBeginBytes); @@ -521,7 +533,7 @@ public class HFileBlock implements Cacheable { HFileBlockDecodingContext ctx = blockType == BlockType.ENCODED_DATA ? reader.getBlockDecodingContext() : reader.getDefaultBlockDecodingContext(); - ByteBuffer dup = this.buf.duplicate(); + MultiByteBuffer dup = this.buf.duplicate(); dup.position(this.headerSize()); dup = dup.slice(); ctx.prepareDecoding(unpacked.getOnDiskSizeWithoutHeader(), @@ -534,16 +546,14 @@ public class HFileBlock implements Cacheable { // Below call to copyFromBufferToBuffer() will try positional read/write from/to buffers when // any of the buffer is DBB. So we change the limit on a dup buffer. 
No copying just create // new BB objects - ByteBuffer inDup = this.buf.duplicate(); + MultiByteBuffer inDup = this.buf.duplicate(); inDup.limit(inDup.limit() + headerSize()); - ByteBuffer outDup = unpacked.buf.duplicate(); + MultiByteBuffer outDup = unpacked.buf.duplicate(); outDup.limit(outDup.limit() + unpacked.headerSize()); - ByteBufferUtils.copyFromBufferToBuffer( - outDup, - inDup, - this.onDiskDataSizeWithHeader, + outDup.put( unpacked.headerSize() + unpacked.uncompressedSizeWithoutHeader - + unpacked.totalChecksumBytes(), unpacked.headerSize()); + + unpacked.totalChecksumBytes(), inDup, this.onDiskDataSizeWithHeader, + unpacked.headerSize()); } return unpacked; } @@ -571,11 +581,11 @@ public class HFileBlock implements Cacheable { // Copy header bytes into newBuf. // newBuf is HBB so no issue in calling array() - ByteBuffer dup = buf.duplicate(); - dup.position(0); + MultiByteBuffer dup = buf.duplicate(); + dup.rewind(); dup.get(newBuf.array(), newBuf.arrayOffset(), headerSize); - buf = newBuf; + buf = new MultiByteBuffer(newBuf); // set limit to exclude next block's header buf.limit(headerSize + uncompressedSizeWithoutHeader + cksumBytes); } @@ -627,16 +637,16 @@ public class HFileBlock implements Cacheable { * @return a byte stream reading the data + checksum of this block */ public DataInputStream getByteStream() { - ByteBuffer dup = this.buf.duplicate(); + MultiByteBuffer dup = this.buf.duplicate(); dup.position(this.headerSize()); - return new DataInputStream(new ByteBufferInputStream(dup)); + return new DataInputStream(new MultiByteBufferInputStream(dup)); } @Override public long heapSize() { long size = ClassSize.align( ClassSize.OBJECT + - // Block type, byte buffer and meta references + // Block type, multi byte buffer and meta references 3 * ClassSize.REFERENCE + // On-disk size, uncompressed size, and next block's on-disk size // bytePerChecksum and onDiskDataSize @@ -649,7 +659,7 @@ public class HFileBlock implements Cacheable { if (buf != null) { // Deep overhead of the byte buffer. Needs to be aligned separately. - size += ClassSize.align(buf.capacity() + BYTE_BUFFER_HEAP_SIZE); + size += ClassSize.align(buf.capacity() + MULTI_BYTE_BUFFER_HEAP_SIZE); } return ClassSize.align(size); @@ -1724,7 +1734,7 @@ public class HFileBlock implements Cacheable { @Override public void serialize(ByteBuffer destination) { - ByteBufferUtils.copyFromBufferToBuffer(destination, this.buf, 0, getSerializedLength() + MultiByteBuffer.copyFromBufferToBuffer(destination, this.buf, 0, getSerializedLength() - EXTRA_SERIALIZATION_SPACE); serializeExtraInfo(destination); } @@ -1786,7 +1796,7 @@ public class HFileBlock implements Cacheable { if (castedComparison.uncompressedSizeWithoutHeader != this.uncompressedSizeWithoutHeader) { return false; } - if (ByteBufferUtils.compareTo(this.buf, 0, this.buf.limit(), castedComparison.buf, 0, + if (MultiByteBuffer.compareTo(this.buf, 0, this.buf.limit(), castedComparison.buf, 0, castedComparison.buf.limit()) != 0) { return false; } @@ -1876,7 +1886,7 @@ public class HFileBlock implements Cacheable { * This is mostly helpful for debugging. This assumes that the block * has minor version > 0. 
*/ - static String toStringHeader(ByteBuffer buf) throws IOException { + static String toStringHeader(MultiByteBuffer buf) throws IOException { byte[] magicBuf = new byte[Math.min(buf.limit() - buf.position(), BlockType.MAGIC_LENGTH)]; buf.get(magicBuf); BlockType bt = BlockType.parse(magicBuf, 0, BlockType.MAGIC_LENGTH); diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileBlockIndex.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileBlockIndex.java index 86b5e15..8699586 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileBlockIndex.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileBlockIndex.java @@ -45,9 +45,11 @@ import org.apache.hadoop.hbase.KeyValueUtil; import org.apache.hadoop.hbase.io.HeapSize; import org.apache.hadoop.hbase.io.encoding.DataBlockEncoding; import org.apache.hadoop.hbase.io.hfile.HFile.CachingBlockReader; +import org.apache.hadoop.hbase.nio.MultiByteBuffer; import org.apache.hadoop.hbase.util.ByteBufferUtils; import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.ClassSize; +import org.apache.hadoop.hbase.util.Pair; import org.apache.hadoop.io.WritableUtils; import org.apache.hadoop.util.StringUtils; @@ -342,7 +344,7 @@ public class HFileBlockIndex { // Locate the entry corresponding to the given key in the non-root // (leaf or intermediate-level) index block. - ByteBuffer buffer = block.getBufferWithoutHeader(); + MultiByteBuffer buffer = block.getBufferWithoutHeader(); index = locateNonRootIndexEntry(buffer, key, comparator); if (index == -1) { // This has to be changed @@ -396,14 +398,14 @@ public class HFileBlockIndex { midLeafBlockOffset, midLeafBlockOnDiskSize, true, true, false, true, BlockType.LEAF_INDEX, null); - ByteBuffer b = midLeafBlock.getBufferWithoutHeader(); + MultiByteBuffer b = midLeafBlock.getBufferWithoutHeader(); int numDataBlocks = b.getInt(); - int keyRelOffset = b.getInt(Bytes.SIZEOF_INT * (midKeyEntry + 1)); - int keyLen = b.getInt(Bytes.SIZEOF_INT * (midKeyEntry + 2)) - + int keyRelOffset = b.getIntStrictlyForward(Bytes.SIZEOF_INT * (midKeyEntry + 1)); + int keyLen = b.getIntStrictlyForward(Bytes.SIZEOF_INT * (midKeyEntry + 2)) - keyRelOffset; int keyOffset = Bytes.SIZEOF_INT * (numDataBlocks + 2) + keyRelOffset + SECONDARY_INDEX_ENTRY_OVERHEAD; - byte[] bytes = ByteBufferUtils.toBytes(b, keyOffset, keyLen); + byte[] bytes = b.toBytes(keyOffset, keyLen); targetMidKey = new KeyValue.KeyOnlyKeyValue(bytes, 0, bytes.length); } else { // The middle of the root-level index. @@ -653,7 +655,7 @@ public class HFileBlockIndex { * @param i the ith position * @return The indexed key at the ith position in the nonRootIndex. */ - protected byte[] getNonRootIndexedKey(ByteBuffer nonRootIndex, int i) { + protected byte[] getNonRootIndexedKey(MultiByteBuffer nonRootIndex, int i) { int numEntries = nonRootIndex.getInt(0); if (i < 0 || i >= numEntries) { return null; @@ -678,7 +680,7 @@ public class HFileBlockIndex { targetKeyRelOffset - SECONDARY_INDEX_ENTRY_OVERHEAD; // TODO check whether we can make BB backed Cell here? So can avoid bytes copy. 
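As an aside for readers tracing the offset arithmetic in getNonRootIndexedKey and binarySearchNonRootIndex, the following standalone sketch spells out the non-root index block layout being walked: an int entry count, a secondary index of numEntries + 1 relative offsets, then the (block offset, on-disk size, key) tuples. It uses a plain java.nio.ByteBuffer in place of the patch's MultiByteBuffer, and it assumes SECONDARY_INDEX_ENTRY_OVERHEAD is one long plus one int, which is consistent with the subtraction above but not shown in this hunk.

import java.nio.ByteBuffer;

// Sketch only: extracts the i-th key from a buffer laid out like a non-root index block.
public final class NonRootIndexSketch {
  // Assumed overhead per entry: block offset (long) + on-disk size (int).
  static final int SECONDARY_INDEX_ENTRY_OVERHEAD = Long.BYTES + Integer.BYTES;

  static byte[] getIndexedKey(ByteBuffer nonRootIndex, int i) {
    int numEntries = nonRootIndex.getInt(0);
    if (i < 0 || i >= numEntries) {
      return null;
    }
    // Secondary index holds numEntries + 1 ints, each an offset relative to the end of that index.
    int entriesOffset = Integer.BYTES * (numEntries + 2);
    int keyRelOffset = nonRootIndex.getInt(Integer.BYTES * (i + 1));
    int nextKeyRelOffset = nonRootIndex.getInt(Integer.BYTES * (i + 2));
    // The difference of two consecutive offsets is the whole tuple size; drop the fixed overhead.
    int keyLen = nextKeyRelOffset - keyRelOffset - SECONDARY_INDEX_ENTRY_OVERHEAD;
    int keyOffset = entriesOffset + keyRelOffset + SECONDARY_INDEX_ENTRY_OVERHEAD;
    byte[] key = new byte[keyLen];
    for (int b = 0; b < keyLen; b++) {
      key[b] = nonRootIndex.get(keyOffset + b); // absolute gets, so position() is never disturbed
    }
    return key;
  }
}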
- return ByteBufferUtils.toBytes(nonRootIndex, targetKeyOffset, targetKeyLength); + return nonRootIndex.toBytes(targetKeyOffset, targetKeyLength); } /** @@ -697,10 +699,10 @@ public class HFileBlockIndex { * -1 otherwise * @throws IOException */ - static int binarySearchNonRootIndex(Cell key, ByteBuffer nonRootIndex, + static int binarySearchNonRootIndex(Cell key, MultiByteBuffer nonRootIndex, CellComparator comparator) { - int numEntries = nonRootIndex.getInt(0); + int numEntries = nonRootIndex.getIntStrictlyForward(0); int low = 0; int high = numEntries - 1; int mid = 0; @@ -717,8 +719,7 @@ public class HFileBlockIndex { mid = (low + high) >>> 1; // Midkey's offset relative to the end of secondary index - int midKeyRelOffset = nonRootIndex.getInt( - Bytes.SIZEOF_INT * (mid + 1)); + int midKeyRelOffset = nonRootIndex.getIntStrictlyForward(Bytes.SIZEOF_INT * (mid + 1)); // The offset of the middle key in the blockIndex buffer int midKeyOffset = entriesOffset // Skip secondary index @@ -728,7 +729,7 @@ public class HFileBlockIndex { // We subtract the two consecutive secondary index elements, which // gives us the size of the whole (offset, onDiskSize, key) tuple. We // then need to subtract the overhead of offset and onDiskSize. - int midLength = nonRootIndex.getInt(Bytes.SIZEOF_INT * (mid + 2)) - + int midLength = nonRootIndex.getIntStrictlyForward(Bytes.SIZEOF_INT * (mid + 2)) - midKeyRelOffset - SECONDARY_INDEX_ENTRY_OVERHEAD; // we have to compare in this order, because the comparator order @@ -736,8 +737,9 @@ public class HFileBlockIndex { // TODO make KeyOnlyKeyValue to be Buffer backed and avoid array() call. This has to be // done after HBASE-12224 & HBASE-12282 // TODO avaoid array call. - nonRootIndexKV.setKey(nonRootIndex.array(), - nonRootIndex.arrayOffset() + midKeyOffset, midLength); + Pair p = nonRootIndex.asSubBuffer(midKeyOffset, midLength); + nonRootIndexKV.setKey(p.getFirst().array(), + p.getFirst().arrayOffset() + midKeyOffset, midLength); int cmp = comparator.compareKeyIgnoresMvcc(key, nonRootIndexKV); // key lives above the midpoint @@ -787,19 +789,20 @@ public class HFileBlockIndex { * return -1 in the case the given key is before the first key. * */ - static int locateNonRootIndexEntry(ByteBuffer nonRootBlock, Cell key, + static int locateNonRootIndexEntry(MultiByteBuffer nonRootBlock, Cell key, CellComparator comparator) { int entryIndex = binarySearchNonRootIndex(key, nonRootBlock, comparator); if (entryIndex != -1) { - int numEntries = nonRootBlock.getInt(0); + int numEntries = nonRootBlock.getIntStrictlyForward(0); // The end of secondary index and the beginning of entries themselves. int entriesOffset = Bytes.SIZEOF_INT * (numEntries + 2); // The offset of the entry we are interested in relative to the end of // the secondary index. 
- int entryRelOffset = nonRootBlock.getInt(Bytes.SIZEOF_INT * (1 + entryIndex)); + int entryRelOffset = nonRootBlock + .getIntStrictlyForward(Bytes.SIZEOF_INT * (1 + entryIndex)); nonRootBlock.position(entriesOffset + entryRelOffset); } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileReaderImpl.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileReaderImpl.java index 86544c9..1c6c2fa 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileReaderImpl.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileReaderImpl.java @@ -49,11 +49,13 @@ import org.apache.hadoop.hbase.io.encoding.DataBlockEncoder; import org.apache.hadoop.hbase.io.encoding.DataBlockEncoding; import org.apache.hadoop.hbase.io.encoding.HFileBlockDecodingContext; import org.apache.hadoop.hbase.io.hfile.HFile.FileInfo; +import org.apache.hadoop.hbase.nio.MultiByteBuffer; import org.apache.hadoop.hbase.security.EncryptionUtil; import org.apache.hadoop.hbase.security.User; import org.apache.hadoop.hbase.util.ByteBufferUtils; import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.IdLock; +import org.apache.hadoop.hbase.util.Pair; import org.apache.hadoop.io.WritableUtils; import org.apache.htrace.Trace; import org.apache.htrace.TraceScope; @@ -435,7 +437,7 @@ public class HFileReaderImpl implements HFile.Reader, Configurable { } protected static class HFileScannerImpl implements HFileScanner { - private ByteBuffer blockBuffer; + private MultiByteBuffer blockBuffer; protected final boolean cacheBlocks; protected final boolean pread; protected final boolean isCompaction; @@ -509,19 +511,20 @@ public class HFileReaderImpl implements HFile.Reader, Configurable { // inlined and is not too big to compile. We also manage position in ByteBuffer ourselves // because it is faster than going via range-checked ByteBuffer methods or going through a // byte buffer array a byte at a time. - int p = blockBuffer.position() + blockBuffer.arrayOffset(); // Get a long at a time rather than read two individual ints. In micro-benchmarking, even // with the extra bit-fiddling, this is order-of-magnitude faster than getting two ints. - long ll = Bytes.toLong(blockBuffer.array(), p); + // Trying to imitate what was done - need to profile if this is better or earlier way is better by doing mark and reset? + // But ensure that you read long instead of two ints + long ll = blockBuffer.getLongStrictlyForward(blockBuffer.position()); // Read top half as an int of key length and bottom int as value length this.currKeyLen = (int)(ll >> Integer.SIZE); this.currValueLen = (int)(Bytes.MASK_FOR_LOWER_INT_IN_LONG ^ ll); checkKeyValueLen(); // Move position past the key and value lengths and then beyond the key and value - p += (Bytes.SIZEOF_LONG + currKeyLen + currValueLen); + int p = blockBuffer.position() + (Bytes.SIZEOF_LONG + currKeyLen + currValueLen); if (reader.getFileContext().isIncludesTags()) { // Tags length is a short. - this.currTagsLen = Bytes.toShort(blockBuffer.array(), p); + this.currTagsLen = blockBuffer.getShortStrictlyForward(p); checkTagsLen(); p += (Bytes.SIZEOF_SHORT + currTagsLen); } @@ -559,14 +562,14 @@ public class HFileReaderImpl implements HFile.Reader, Configurable { // This is Bytes#bytesToVint inlined so can save a few instructions in this hot method; i.e. // previous if one-byte vint, we'd redo the vint call to find int size. // Also the method is kept small so can be inlined. 
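The hunk that follows inlines Hadoop's vlong decoding so the memstore timestamp can be read at an absolute position without a mark/reset. As a reference, here is a minimal standalone version of that decode over a plain java.nio.ByteBuffer; the final sign fix-up follows Hadoop's vlong format and is assumed here, since the excerpt cuts off before it.

import java.nio.ByteBuffer;
import org.apache.hadoop.io.WritableUtils;

// Sketch only: peek a Hadoop-encoded vlong at an absolute offset without moving the buffer position.
public final class VLongPeek {
  static long peekVLong(ByteBuffer buf, int position) {
    byte firstByte = buf.get(position);              // absolute get, nothing is consumed
    int len = WritableUtils.decodeVIntSize(firstByte);
    if (len == 1) {
      return firstByte;                              // single-byte values are stored inline
    }
    long value = 0;
    for (int idx = 0; idx < len - 1; idx++) {
      byte b = buf.get(position + 1 + idx);
      value = (value << 8) | (b & 0xFF);
    }
    return WritableUtils.isNegativeVInt(firstByte) ? ~value : value;
  }
}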
- byte firstByte = blockBuffer.array()[position]; + byte firstByte = blockBuffer.get(position); int len = WritableUtils.decodeVIntSize(firstByte); if (len == 1) { this.currMemstoreTS = firstByte; } else { long i = 0; for (int idx = 0; idx < len - 1; idx++) { - byte b = blockBuffer.array()[position + 1 + idx]; + byte b = blockBuffer.get(position + 1 + idx); i = i << 8; i = i | (b & 0xFF); } @@ -597,13 +600,18 @@ public class HFileReaderImpl implements HFile.Reader, Configurable { */ protected int blockSeek(Cell key, boolean seekBefore) { int klen, vlen, tlen = 0; - long memstoreTS = 0; - int memstoreTSLen = 0; + //long memstoreTS = 0; + //int memstoreTSLen = 0; int lastKeyValueSize = -1; + int pos = -1; do { - blockBuffer.mark(); - klen = blockBuffer.getInt(); - vlen = blockBuffer.getInt(); + pos = blockBuffer.position(); + // do the same way as in readkeyvalue? + // none of the APIs are using the BBUtils because all these API make use of BB API only + // Better to ensure that we use the BB Utils here? + long ll = blockBuffer.getLongStrictlyForward(pos); + klen = (int)(ll >> Integer.SIZE); + vlen = (int)(Bytes.MASK_FOR_LOWER_INT_IN_LONG ^ ll); if (klen < 0 || vlen < 0 || klen > blockBuffer.limit() || vlen > blockBuffer.limit()) { throw new IllegalStateException("Invalid klen " + klen + " or vlen " @@ -611,77 +619,66 @@ public class HFileReaderImpl implements HFile.Reader, Configurable { + block.getOffset() + ", block length: " + blockBuffer.limit() + ", position: " + blockBuffer.position() + " (without header)."); } - ByteBufferUtils.skip(blockBuffer, klen + vlen); + pos += Bytes.SIZEOF_LONG; + Pair bb = blockBuffer.asSubBuffer(pos, klen); + // TODO :change here after Bufferbackedcells come + keyOnlyKv.setKey(bb.getFirst().array(), bb.getFirst().arrayOffset() + bb.getSecond(), klen); + int comp = reader.getComparator().compareKeyIgnoresMvcc(key, keyOnlyKv); + pos += klen + vlen; if (this.reader.getFileContext().isIncludesTags()) { // Read short as unsigned, high byte first - tlen = ((blockBuffer.get() & 0xff) << 8) ^ (blockBuffer.get() & 0xff); + tlen = ((blockBuffer.get(pos) & 0xff) << 8) ^ (blockBuffer.get(pos + 1) & 0xff); if (tlen < 0 || tlen > blockBuffer.limit()) { throw new IllegalStateException("Invalid tlen " + tlen + ". Block offset: " + block.getOffset() + ", block length: " + blockBuffer.limit() + ", position: " + blockBuffer.position() + " (without header)."); } - ByteBufferUtils.skip(blockBuffer, tlen); + pos += tlen + (2 * Bytes.SIZEOF_BYTE); } if (this.reader.shouldIncludeMemstoreTS()) { - if (this.reader.isDecodeMemstoreTS()) { - memstoreTS = Bytes.readAsVLong(blockBuffer.array(), blockBuffer.arrayOffset() - + blockBuffer.position()); - memstoreTSLen = WritableUtils.getVIntSize(memstoreTS); - } else { - memstoreTS = 0; - memstoreTSLen = 1; - } + // we can change this to below way - so that we can make it use BButils? 
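blockSeek above now reads the key length and value length with one getLongStrictlyForward instead of two getInt calls, the same trick readKeyValueLen uses. A minimal sketch of the split on a plain java.nio.ByteBuffer, for anyone puzzling over the shift and mask:

import java.nio.ByteBuffer;

// Sketch only: read a cell's 4-byte key length and 4-byte value length with a single long read.
public final class KeyValueLenPeek {
  static int[] peekLens(ByteBuffer block, int pos) {
    long ll = block.getLong(pos);               // one bounds check instead of two
    int keyLen = (int) (ll >>> Integer.SIZE);   // upper 32 bits hold the key length
    int valueLen = (int) ll;                    // lower 32 bits hold the value length
                                                // (same effect as the MASK_FOR_LOWER_INT_IN_LONG xor above)
    return new int[] { keyLen, valueLen };
  }
}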
+ readMvccVersion(pos); } - blockBuffer.reset(); - int keyOffset = - blockBuffer.arrayOffset() + blockBuffer.position() + (Bytes.SIZEOF_INT * 2); - keyOnlyKv.setKey(blockBuffer.array(), keyOffset, klen); - int comp = reader.getComparator().compareKeyIgnoresMvcc(key, keyOnlyKv); - if (comp == 0) { if (seekBefore) { if (lastKeyValueSize < 0) { throw new IllegalStateException("blockSeek with seekBefore " - + "at the first key of the block: key=" - + CellUtil.getCellKeyAsString(key) + + "at the first key of the block: key=" + CellUtil.getCellKeyAsString(key) + ", blockOffset=" + block.getOffset() + ", onDiskSize=" + block.getOnDiskSizeWithHeader()); } - blockBuffer.position(blockBuffer.position() - lastKeyValueSize); + blockBuffer.moveBack(lastKeyValueSize); readKeyValueLen(); return 1; // non exact match. } currKeyLen = klen; currValueLen = vlen; currTagsLen = tlen; - if (this.reader.shouldIncludeMemstoreTS()) { - currMemstoreTS = memstoreTS; - currMemstoreTSLen = memstoreTSLen; - } return 0; // indicate exact match } else if (comp < 0) { - if (lastKeyValueSize > 0) - blockBuffer.position(blockBuffer.position() - lastKeyValueSize); + if (lastKeyValueSize > 0) { + blockBuffer.moveBack(lastKeyValueSize); + } readKeyValueLen(); - if (lastKeyValueSize == -1 && blockBuffer.position() == 0) { + if (lastKeyValueSize == -1 && blockBuffer.position() == 0 + && this.reader.getTrailer().getMinorVersion() >= MINOR_VERSION_WITH_FAKED_KEY) { return HConstants.INDEX_KEY_MAGIC; } return 1; } - // The size of this key/value tuple, including key/value length fields. - lastKeyValueSize = klen + vlen + memstoreTSLen + KEY_VALUE_LEN_SIZE; + lastKeyValueSize = klen + vlen + currMemstoreTSLen + KEY_VALUE_LEN_SIZE; // include tag length also if tags included with KV - if (this.reader.getFileContext().isIncludesTags()) { + if (reader.getFileContext().isIncludesTags()) { lastKeyValueSize += tlen + Bytes.SIZEOF_SHORT; } - blockBuffer.position(blockBuffer.position() + lastKeyValueSize); - } while (blockBuffer.remaining() > 0); + blockBuffer.skip(lastKeyValueSize); + } while (blockBuffer.hasRemaining()); // Seek to the last key we successfully read. This will happen if this is // the last key/value pair in the file, in which case the following call // to next() has to return false. - blockBuffer.position(blockBuffer.position() - lastKeyValueSize); + blockBuffer.moveBack(lastKeyValueSize); readKeyValueLen(); return 1; // didn't exactly find it. } @@ -840,6 +837,7 @@ public class HFileReaderImpl implements HFile.Reader, Configurable { @Override public ByteBuffer getKey() { assertSeeked(); + // TODO : change here after BufferBacked cells come return ByteBuffer.wrap( blockBuffer.array(), blockBuffer.arrayOffset() + blockBuffer.position() @@ -849,6 +847,7 @@ public class HFileReaderImpl implements HFile.Reader, Configurable { @Override public ByteBuffer getValue() { assertSeeked(); + // TODO : change here after BufferBacked cells come return ByteBuffer.wrap( blockBuffer.array(), blockBuffer.arrayOffset() + blockBuffer.position() @@ -1030,14 +1029,13 @@ public class HFileReaderImpl implements HFile.Reader, Configurable { } protected Cell getFirstKeyCellInBlock(HFileBlock curBlock) { - ByteBuffer buffer = curBlock.getBufferWithoutHeader(); + MultiByteBuffer buffer = curBlock.getBufferWithoutHeader(); // It is safe to manipulate this buffer because we own the buffer object. 
buffer.rewind(); int klen = buffer.getInt(); - buffer.getInt(); - ByteBuffer keyBuff = buffer.slice(); - keyBuff.limit(klen); - keyBuff.rewind(); + buffer.skip(Bytes.SIZEOF_INT);// Skip value len part + ByteBuffer keyBuff = buffer.asSubBuffer(klen); + keyBuff.limit(keyBuff.position() + klen); // Create a KeyOnlyKv now. // TODO : Will change when Buffer backed cells come return new KeyValue.KeyOnlyKeyValue(keyBuff.array(), keyBuff.arrayOffset() @@ -1188,7 +1186,7 @@ public class HFileReaderImpl implements HFile.Reader, Configurable { * @throws IOException */ @Override - public ByteBuffer getMetaBlock(String metaBlockName, boolean cacheBlock) + public MultiByteBuffer getMetaBlock(String metaBlockName, boolean cacheBlock) throws IOException { if (trailer.getMetaIndexCount() == 0) { return null; // there are no meta blocks @@ -1457,22 +1455,21 @@ public class HFileReaderImpl implements HFile.Reader, Configurable { + " doesn't support data block encoding " + DataBlockEncoding.getNameFromId(dataBlockEncoderId)); } - - seeker.setCurrentBuffer(getEncodedBuffer(newBlock)); + MultiByteBuffer encodedBuffer = getEncodedBuffer(newBlock); + seeker.setCurrentBuffer(encodedBuffer.asSubBuffer(encodedBuffer.limit())); blockFetches++; // Reset the next indexed key this.nextIndexedKey = null; } - private ByteBuffer getEncodedBuffer(HFileBlock newBlock) { - ByteBuffer origBlock = newBlock.getBufferReadOnly(); - ByteBuffer encodedBlock = ByteBuffer.wrap(origBlock.array(), - origBlock.arrayOffset() + newBlock.headerSize() + - DataBlockEncoding.ID_SIZE, - newBlock.getUncompressedSizeWithoutHeader() - - DataBlockEncoding.ID_SIZE).slice(); - return encodedBlock; + private MultiByteBuffer getEncodedBuffer(HFileBlock newBlock) { + MultiByteBuffer origBlock = newBlock.getBufferReadOnly(); + int pos = newBlock.headerSize() + DataBlockEncoding.ID_SIZE; + origBlock.position(pos); + origBlock + .limit(pos + newBlock.getUncompressedSizeWithoutHeader() - DataBlockEncoding.ID_SIZE); + return origBlock.slice(); } @Override diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/MemcachedBlockCache.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/MemcachedBlockCache.java index 57e7f28..2ef6486 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/MemcachedBlockCache.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/MemcachedBlockCache.java @@ -29,6 +29,7 @@ import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.classification.InterfaceAudience; +import org.apache.hadoop.hbase.nio.MultiByteBuffer; import org.apache.hadoop.hbase.util.Addressing; import org.apache.htrace.Trace; import org.apache.htrace.TraceScope; @@ -255,7 +256,7 @@ public class MemcachedBlockCache implements BlockCache { @Override public HFileBlock decode(CachedData d) { try { - ByteBuffer buf = ByteBuffer.wrap(d.getData()); + MultiByteBuffer buf = new MultiByteBuffer(ByteBuffer.wrap(d.getData())); return (HFileBlock) HFileBlock.blockDeserializer.deserialize(buf, true); } catch (IOException e) { LOG.warn("Error deserializing data from memcached",e); diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/BucketCache.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/BucketCache.java index dfada87..fa3fc39 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/BucketCache.java +++ 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/BucketCache.java @@ -62,6 +62,7 @@ import org.apache.hadoop.hbase.io.hfile.CacheableDeserializer; import org.apache.hadoop.hbase.io.hfile.CacheableDeserializerIdManager; import org.apache.hadoop.hbase.io.hfile.CachedBlock; import org.apache.hadoop.hbase.io.hfile.HFileBlock; +import org.apache.hadoop.hbase.nio.MultiByteBuffer; import org.apache.hadoop.hbase.util.ConcurrentIndex; import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; import org.apache.hadoop.hbase.util.HasThread; @@ -416,9 +417,14 @@ public class BucketCache implements BlockCache, HeapSize { // maybe changed. If we lock BlockCacheKey instead of offset, then we can only check // existence here. if (bucketEntry.equals(backingMap.get(key))) { + /*int len = bucketEntry.getLength(); + MultiByteBuffer bb = ioEngine.read(bucketEntry.offset(), len); + // TODO : change this area - should be removed after server cells are available + bb = new MultiByteBuffer(bb.asSubBuffer(bb.limit()));*/ int len = bucketEntry.getLength(); - ByteBuffer bb = ByteBuffer.allocate(len); - int lenRead = ioEngine.read(bb, bucketEntry.offset()); + ByteBuffer buf = ByteBuffer.allocate(len); + int lenRead = ioEngine.read(buf, bucketEntry.offset()); + MultiByteBuffer bb = new MultiByteBuffer(buf); if (lenRead != len) { throw new RuntimeException("Only " + lenRead + " bytes read, " + len + " expected"); } @@ -1269,7 +1275,7 @@ public class BucketCache implements BlockCache, HeapSize { try { if (data instanceof HFileBlock) { HFileBlock block = (HFileBlock) data; - ByteBuffer sliceBuf = block.getBufferReadOnlyWithHeader(); + MultiByteBuffer sliceBuf = block.getBufferReadOnlyWithHeader(); sliceBuf.rewind(); assert len == sliceBuf.limit() + HFileBlock.EXTRA_SERIALIZATION_SPACE || len == sliceBuf.limit() + block.headerSize() + HFileBlock.EXTRA_SERIALIZATION_SPACE; diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/ByteBufferIOEngine.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/ByteBufferIOEngine.java index de10667..9cc6d50 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/ByteBufferIOEngine.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/ByteBufferIOEngine.java @@ -22,6 +22,7 @@ import java.io.IOException; import java.nio.ByteBuffer; import org.apache.hadoop.hbase.classification.InterfaceAudience; +import org.apache.hadoop.hbase.nio.MultiByteBuffer; import org.apache.hadoop.hbase.util.ByteBufferArray; /** @@ -78,6 +79,11 @@ public class ByteBufferIOEngine implements IOEngine { dstBuffer.arrayOffset()); } + @Override + public MultiByteBuffer read(long offset, int len) throws IOException { + return bufferArray.subArray(offset, len); + } + /** * Transfers data from the given byte buffer to the buffer array * @param srcBuffer the given byte buffer from which bytes are to be read @@ -92,6 +98,14 @@ public class ByteBufferIOEngine implements IOEngine { srcBuffer.arrayOffset()); } + @Override + public void write(MultiByteBuffer srcBuffer, long offset) throws IOException { + // When caching block into BucketCache there will be single buffer backing for this HFileBlock. + // This will work for now. But from the DFS itself if we get DBB then this may not hold true. 
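For context on the read/write pair being added to the IOEngine implementations in this and the following files, a rough usage sketch of the round trip, based only on the signatures visible in this patch; the 8 MB capacity and 4 KB block size are arbitrary values chosen for illustration.

import java.nio.ByteBuffer;
import java.util.Arrays;

import org.apache.hadoop.hbase.io.hfile.bucket.ByteBufferIOEngine;
import org.apache.hadoop.hbase.nio.MultiByteBuffer;

// Sketch only: write a block through the existing ByteBuffer path, read it back as a MultiByteBuffer.
public class IOEngineRoundTrip {
  public static void main(String[] args) throws Exception {
    ByteBufferIOEngine engine = new ByteBufferIOEngine(8 * 1024 * 1024, false); // heap-backed engine
    byte[] block = new byte[4096];
    Arrays.fill(block, (byte) 0x2A);
    engine.write(ByteBuffer.wrap(block), 0L);               // existing write(ByteBuffer, offset)
    MultiByteBuffer read = engine.read(0L, block.length);   // read(offset, len) added in this patch
    for (int i = 0; i < block.length; i++) {
      if (read.get(i) != block[i]) {
        throw new IllegalStateException("Mismatch at " + i);
      }
    }
  }
}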
+ assert srcBuffer.hasArray(); + bufferArray.putMultiple(offset, srcBuffer.remaining(), srcBuffer.array(), + srcBuffer.arrayOffset()); + } /** * No operation for the sync in the memory IO engine */ diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/FileIOEngine.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/FileIOEngine.java index 7b6b25f..c9e70fc 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/FileIOEngine.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/FileIOEngine.java @@ -26,6 +26,7 @@ import java.nio.channels.FileChannel; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.hbase.classification.InterfaceAudience; +import org.apache.hadoop.hbase.nio.MultiByteBuffer; import org.apache.hadoop.util.StringUtils; /** @@ -125,4 +126,21 @@ public class FileIOEngine implements IOEngine { LOG.error("Can't shutdown cleanly", ex); } } + + @Override + public MultiByteBuffer read(long offset, int len) throws IOException { + ByteBuffer dstBuffer = ByteBuffer.allocate(len); + int read = read(dstBuffer, offset); + dstBuffer.limit(read); + return new MultiByteBuffer(dstBuffer); + } + + @Override + public void write(MultiByteBuffer srcBuffer, long offset) throws IOException { + // When caching block into BucketCache there will be single buffer backing for this HFileBlock. + assert srcBuffer.hasArray(); + fileChannel.write( + ByteBuffer.wrap(srcBuffer.array(), srcBuffer.arrayOffset(), srcBuffer.remaining()), offset); + + } } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/IOEngine.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/IOEngine.java index 430c5af..8f39d2d 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/IOEngine.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/IOEngine.java @@ -22,6 +22,7 @@ import java.io.IOException; import java.nio.ByteBuffer; import org.apache.hadoop.hbase.classification.InterfaceAudience; +import org.apache.hadoop.hbase.nio.MultiByteBuffer; /** * A class implementing IOEngine interface supports data services for @@ -44,6 +45,16 @@ public interface IOEngine { int read(ByteBuffer dstBuffer, long offset) throws IOException; /** + * Transfers data from IOEngine at the given offset to an MultiByteBuffer + * @param offset the offset from which the underlying buckets should be read + * @param len the length upto which the buckets should be read + * @return the MultiByteBuffer formed from the underlying ByteBuffers forming the + * buckets + * @throws IOException + */ + MultiByteBuffer read(long offset, int len) throws IOException; + + /** * Transfers data from the given byte buffer to IOEngine * @param srcBuffer the given byte buffer from which bytes are to be read * @param offset The offset in the IO engine where the first byte to be @@ -53,6 +64,14 @@ public interface IOEngine { void write(ByteBuffer srcBuffer, long offset) throws IOException; /** + * Transfers the data from the given MultiByteBuffer to IOEngine + * @param srcBuffer the given MultiBytebufffers from which bytes are to be read + * @param offset the offset in the IO engine where the first byte to be written + * @throws IOException + */ + void write(MultiByteBuffer srcBuffer, long offset) throws IOException; + + /** * Sync the data to IOEngine after writing * @throws IOException */ diff --git 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFile.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFile.java index fc94d3d..230a2f1 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFile.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFile.java @@ -51,6 +51,7 @@ import org.apache.hadoop.hbase.io.hfile.CacheConfig; import org.apache.hadoop.hbase.io.hfile.HFile; import org.apache.hadoop.hbase.io.hfile.HFileContext; import org.apache.hadoop.hbase.io.hfile.HFileScanner; +import org.apache.hadoop.hbase.nio.MultiByteBuffer; import org.apache.hadoop.hbase.regionserver.compactions.Compactor; import org.apache.hadoop.hbase.util.BloomFilter; import org.apache.hadoop.hbase.util.BloomFilterFactory; @@ -1222,7 +1223,7 @@ public class StoreFile { if (!bloomFilter.supportsAutoLoading()) { return true; } - return bloomFilter.contains(row, rowOffset, rowLen, null); + return bloomFilter.contains(row, rowOffset, rowLen); } catch (IllegalArgumentException e) { LOG.error("Bad Delete Family bloom filter data -- proceeding without", e); @@ -1286,7 +1287,7 @@ public class StoreFile { try { boolean shouldCheckBloom; - ByteBuffer bloom; + MultiByteBuffer bloom; if (bloomFilter.supportsAutoLoading()) { bloom = null; shouldCheckBloom = true; @@ -1327,12 +1328,12 @@ public class StoreFile { exists = false; } else { exists = - bloomFilter.contains(kvKey, bloom) || - bloomFilter.contains(rowBloomKey, bloom); + bloomFilter.contains(kvKey) || + bloomFilter.contains(rowBloomKey); } } else { exists = !keyIsAfterLast - && bloomFilter.contains(key, 0, key.length, bloom); + && bloomFilter.contains(key, 0, key.length); } return exists; diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/util/BloomFilter.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/util/BloomFilter.java index f119086..ee39d18 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/util/BloomFilter.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/util/BloomFilter.java @@ -23,6 +23,7 @@ import java.nio.ByteBuffer; import org.apache.hadoop.hbase.Cell; import org.apache.hadoop.hbase.KeyValue; import org.apache.hadoop.hbase.classification.InterfaceAudience; +import org.apache.hadoop.hbase.nio.MultiByteBuffer; /** * @@ -84,7 +85,7 @@ public interface BloomFilter extends BloomFilterBase { * is supported. * @return true if matched by bloom, false if not */ - boolean contains(Cell keyCell, ByteBuffer bloom); + boolean contains(Cell keyCell); /** * Check if the specified key is contained in the bloom filter. @@ -96,7 +97,7 @@ public interface BloomFilter extends BloomFilterBase { * is supported. 
* @return true if matched by bloom, false if not */ - boolean contains(byte[] buf, int offset, int length, ByteBuffer bloom); + boolean contains(byte[] buf, int offset, int length); /** * @return true if this Bloom filter can automatically load its data diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/util/BloomFilterChunk.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/util/BloomFilterChunk.java index 9fff872..32421ac 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/util/BloomFilterChunk.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/util/BloomFilterChunk.java @@ -25,6 +25,7 @@ import java.io.IOException; import java.nio.ByteBuffer; import org.apache.hadoop.hbase.classification.InterfaceAudience; +import org.apache.hadoop.hbase.nio.MultiByteBuffer; import com.google.common.annotations.VisibleForTesting; @@ -208,8 +209,8 @@ public class BloomFilterChunk implements BloomFilterBase { + " theBloom.limit()=" + theBloom.limit() + ", byteSize=" + byteSize); } - return BloomFilterUtil.contains(buf, offset, length, theBloom, 0, (int) byteSize, hash, - hashCount); + return BloomFilterUtil.contains(buf, offset, length, new MultiByteBuffer(theBloom), 0, + (int) byteSize, hash, hashCount); } //--------------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/util/BloomFilterUtil.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/util/BloomFilterUtil.java index ff08f4b..f997195 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/util/BloomFilterUtil.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/util/BloomFilterUtil.java @@ -22,6 +22,7 @@ import java.text.NumberFormat; import java.util.Random; import org.apache.hadoop.hbase.classification.InterfaceAudience; +import org.apache.hadoop.hbase.nio.MultiByteBuffer; /** * Utility methods related to BloomFilters @@ -198,7 +199,7 @@ public final class BloomFilterUtil { } public static boolean contains(byte[] buf, int offset, int length, - ByteBuffer bloomBuf, int bloomOffset, int bloomSize, Hash hash, + MultiByteBuffer bloomBuf, int bloomOffset, int bloomSize, Hash hash, int hashCount) { int hash1 = hash.hash(buf, offset, length, 0); @@ -241,6 +242,22 @@ public final class BloomFilterUtil { curByte &= bitvals[bitPos]; return (curByte != 0); } + + + /** + * Check if bit at specified index is 1. + * + * @param pos index of bit + * @return true if bit at specified index is 1, false if 0. + */ + static boolean get(int pos, MultiByteBuffer bloomBuf, int bloomOffset) { + int bytePos = pos >> 3; //pos / 8 + int bitPos = pos & 0x7; //pos % 8 + // TODO access this via Util API which can do Unsafe access if possible(?) + byte curByte = bloomBuf.get(bloomOffset + bytePos); + curByte &= bitvals[bitPos]; + return (curByte != 0); + } /** * A human-readable string with statistics for the given Bloom filter. 
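The new get(pos, MultiByteBuffer, offset) helper just above tests a single bit of the Bloom chunk. Here is a self-contained sketch of the same bit arithmetic on a plain java.nio.ByteBuffer; the 1 << bitPos mask assumes the least-significant-bit-first layout of the bitvals table, which is not shown in this hunk.

import java.nio.ByteBuffer;

// Sketch only: check whether bit 'pos' of a Bloom bit vector is set.
public final class BloomBitSketch {
  static boolean getBit(int pos, ByteBuffer bloomBuf, int bloomOffset) {
    int bytePos = pos >> 3;   // pos / 8
    int bitPos = pos & 0x7;   // pos % 8
    byte curByte = bloomBuf.get(bloomOffset + bytePos); // absolute get works for heap and direct buffers
    return (curByte & (1 << bitPos)) != 0;              // assumed equivalent to the bitvals[bitPos] lookup
  }
}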
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/encoding/TestDataBlockEncoders.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/encoding/TestDataBlockEncoders.java index ede0aef..2b2b489 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/encoding/TestDataBlockEncoders.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/encoding/TestDataBlockEncoders.java @@ -43,6 +43,7 @@ import org.apache.hadoop.hbase.io.compress.Compression; import org.apache.hadoop.hbase.io.hfile.HFileBlock.Writer.BufferGrabbingByteArrayOutputStream; import org.apache.hadoop.hbase.io.hfile.HFileContext; import org.apache.hadoop.hbase.io.hfile.HFileContextBuilder; +import org.apache.hadoop.hbase.nio.MultiByteBuffer; import org.apache.hadoop.hbase.testclassification.IOTests; import org.apache.hadoop.hbase.testclassification.LargeTests; import org.apache.hadoop.hbase.util.Bytes; @@ -302,7 +303,8 @@ public class TestDataBlockEncoders { DataBlockEncoder encoder = encoding.getEncoder(); ByteBuffer encodedBuffer = encodeKeyValues(encoding, sampleKv, getEncodingContext(Compression.Algorithm.NONE, encoding)); - Cell key = encoder.getFirstKeyCellInBlock(encodedBuffer); + Cell key = encoder.getFirstKeyCellInBlock(new MultiByteBuffer( + encodedBuffer)); KeyValue keyBuffer = null; if(encoding == DataBlockEncoding.PREFIX_TREE) { // This is not an actual case. So the Prefix tree block is not loaded in case of Prefix_tree diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/CacheTestUtils.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/CacheTestUtils.java index b0a2ba2..275005e 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/CacheTestUtils.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/CacheTestUtils.java @@ -39,6 +39,7 @@ import org.apache.hadoop.hbase.MultithreadedTestUtil.TestThread; import org.apache.hadoop.hbase.io.HeapSize; import org.apache.hadoop.hbase.io.compress.Compression; import org.apache.hadoop.hbase.io.hfile.bucket.BucketCache; +import org.apache.hadoop.hbase.nio.MultiByteBuffer; import org.apache.hadoop.hbase.util.ChecksumType; public class CacheTestUtils { @@ -253,7 +254,7 @@ public class CacheTestUtils { new CacheableDeserializer() { @Override - public Cacheable deserialize(ByteBuffer b) throws IOException { + public Cacheable deserialize(MultiByteBuffer b) throws IOException { int len = b.getInt(); Thread.yield(); byte buf[] = new byte[len]; @@ -267,7 +268,7 @@ public class CacheTestUtils { } @Override - public Cacheable deserialize(ByteBuffer b, boolean reuse) + public Cacheable deserialize(MultiByteBuffer b, boolean reuse) throws IOException { return deserialize(b); } @@ -326,8 +327,8 @@ public class CacheTestUtils { // declare our data size to be smaller than it by the serialization space // required. 
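The reworked test deserializer in CacheTestUtils above reads a length-prefixed payload straight from the MultiByteBuffer. The same pattern, reduced to a plain java.nio.ByteBuffer for reference:

import java.nio.ByteBuffer;

// Sketch only: read an int length followed by that many payload bytes, as the test deserializer does.
public final class LengthPrefixedRead {
  static byte[] read(ByteBuffer b) {
    int len = b.getInt();           // relative read, advances past the length field
    byte[] payload = new byte[len];
    b.get(payload);                 // relative bulk get, advances past the payload
    return payload;
  }
}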
- ByteBuffer cachedBuffer = ByteBuffer.allocate(blockSize - - HFileBlock.EXTRA_SERIALIZATION_SPACE); + MultiByteBuffer cachedBuffer = new MultiByteBuffer(ByteBuffer.allocate(blockSize + - HFileBlock.EXTRA_SERIALIZATION_SPACE)); rand.nextBytes(cachedBuffer.array()); cachedBuffer.rewind(); int onDiskSizeWithoutHeader = blockSize diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestCacheConfig.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestCacheConfig.java index ce78a37..a0d0107 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestCacheConfig.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestCacheConfig.java @@ -38,6 +38,7 @@ import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.testclassification.IOTests; import org.apache.hadoop.hbase.testclassification.LargeTests; import org.apache.hadoop.hbase.io.hfile.bucket.BucketCache; +import org.apache.hadoop.hbase.nio.MultiByteBuffer; import org.apache.hadoop.hbase.util.Threads; import org.junit.After; import org.junit.Before; @@ -71,13 +72,13 @@ public class TestCacheConfig { } @Override - public Cacheable deserialize(ByteBuffer b, boolean reuse) throws IOException { + public Cacheable deserialize(MultiByteBuffer b, boolean reuse) throws IOException { LOG.info("Deserialized " + b + ", reuse=" + reuse); return cacheable; } @Override - public Cacheable deserialize(ByteBuffer b) throws IOException { + public Cacheable deserialize(MultiByteBuffer b) throws IOException { LOG.info("Deserialized " + b); return cacheable; } diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestChecksum.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestChecksum.java index de8d3b9..c8cc639 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestChecksum.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestChecksum.java @@ -48,6 +48,7 @@ import org.apache.hadoop.hbase.testclassification.SmallTests; import org.apache.hadoop.hbase.fs.HFileSystem; import org.apache.hadoop.hbase.io.FSDataInputStreamWrapper; import org.apache.hadoop.hbase.io.compress.Compression; +import org.apache.hadoop.hbase.nio.MultiByteBuffer; import org.apache.hadoop.hbase.util.ChecksumType; import org.junit.Before; import org.junit.Test; @@ -126,7 +127,7 @@ public class TestChecksum { HFileBlock.FSReader hbr = new HFileBlock.FSReaderImpl( is, totalSize, (HFileSystem) fs, path, meta); HFileBlock b = hbr.readBlockData(0, -1, -1, false); - ByteBuffer data = b.getBufferWithoutHeader(); + MultiByteBuffer data = b.getBufferWithoutHeader(); for (int i = 0; i < 1000; i++) { assertEquals(i, data.getInt()); } @@ -194,7 +195,7 @@ public class TestChecksum { assertEquals(algo == GZ ? 
2173 : 4936, b.getOnDiskSizeWithoutHeader() - b.totalChecksumBytes()); // read data back from the hfile, exclude header and checksum - ByteBuffer bb = b.unpack(meta, hbr).getBufferWithoutHeader(); // read back data + MultiByteBuffer bb = b.unpack(meta, hbr).getBufferWithoutHeader(); // read back data DataInputStream in = new DataInputStream( new ByteArrayInputStream( bb.array(), bb.arrayOffset(), bb.limit())); diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestHFile.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestHFile.java index 8f42e6b..a216d74 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestHFile.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestHFile.java @@ -48,6 +48,7 @@ import org.apache.hadoop.hbase.Tag; import org.apache.hadoop.hbase.io.compress.Compression; import org.apache.hadoop.hbase.io.hfile.HFile.Reader; import org.apache.hadoop.hbase.io.hfile.HFile.Writer; +import org.apache.hadoop.hbase.nio.MultiByteBuffer; import org.apache.hadoop.hbase.testclassification.IOTests; import org.apache.hadoop.hbase.testclassification.SmallTests; import org.apache.hadoop.hbase.util.Bytes; @@ -329,11 +330,14 @@ public class TestHFile extends HBaseTestCase { private void readNumMetablocks(Reader reader, int n) throws IOException { for (int i = 0; i < n; i++) { - ByteBuffer actual = reader.getMetaBlock("HFileMeta" + i, false); + MultiByteBuffer actual = reader.getMetaBlock("HFileMeta" + i, false); ByteBuffer expected = ByteBuffer.wrap(("something to test" + i).getBytes()); - assertEquals("failed to match metadata", - Bytes.toStringBinary(expected), Bytes.toStringBinary(actual)); + assertEquals( + "failed to match metadata", + Bytes.toStringBinary(expected), + Bytes.toStringBinary(actual.array(), actual.arrayOffset() + actual.position(), + actual.capacity())); } } diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestHFileBlock.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestHFileBlock.java index 1bfd18c..c8b609e 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestHFileBlock.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestHFileBlock.java @@ -56,6 +56,7 @@ import org.apache.hadoop.hbase.fs.HFileSystem; import org.apache.hadoop.hbase.io.compress.Compression; import org.apache.hadoop.hbase.io.compress.Compression.Algorithm; import org.apache.hadoop.hbase.io.encoding.DataBlockEncoding; +import org.apache.hadoop.hbase.nio.MultiByteBuffer; import org.apache.hadoop.hbase.testclassification.IOTests; import org.apache.hadoop.hbase.testclassification.MediumTests; import org.apache.hadoop.hbase.util.Bytes; @@ -437,7 +438,7 @@ public class TestHFileBlock { assertTrue("Packed heapSize should be < unpacked heapSize", packedHeapsize < blockUnpacked.heapSize()); } - ByteBuffer actualBuffer = blockUnpacked.getBufferWithoutHeader(); + MultiByteBuffer actualBuffer = blockUnpacked.getBufferWithoutHeader(); if (encoding != DataBlockEncoding.NONE) { // We expect a two-byte big-endian encoding id. 
assertEquals( @@ -454,14 +455,14 @@ public class TestHFileBlock { expectedBuffer.rewind(); // test if content matches, produce nice message - assertBuffersEqual(expectedBuffer, actualBuffer, algo, encoding, pread); + assertBuffersEqual(new MultiByteBuffer(expectedBuffer), actualBuffer, algo, encoding, pread); // test serialized blocks for (boolean reuseBuffer : new boolean[] { false, true }) { ByteBuffer serialized = ByteBuffer.allocate(blockFromHFile.getSerializedLength()); blockFromHFile.serialize(serialized); - HFileBlock deserialized = - (HFileBlock) blockFromHFile.getDeserializer().deserialize(serialized, reuseBuffer); + HFileBlock deserialized = (HFileBlock) blockFromHFile.getDeserializer().deserialize( + new MultiByteBuffer(serialized), reuseBuffer); assertEquals( "Serialization did not preserve block state. reuseBuffer=" + reuseBuffer, blockFromHFile, deserialized); @@ -483,8 +484,8 @@ public class TestHFileBlock { return String.format("compression %s, encoding %s, pread %s", compression, encoding, pread); } - static void assertBuffersEqual(ByteBuffer expectedBuffer, - ByteBuffer actualBuffer, Compression.Algorithm compression, + static void assertBuffersEqual(MultiByteBuffer expectedBuffer, + MultiByteBuffer actualBuffer, Compression.Algorithm compression, DataBlockEncoding encoding, boolean pread) { if (!actualBuffer.equals(expectedBuffer)) { int prefix = 0; @@ -506,7 +507,7 @@ public class TestHFileBlock { * Convert a few next bytes in the given buffer at the given position to * string. Used for error messages. */ - private static String nextBytesToStr(ByteBuffer buf, int pos) { + private static String nextBytesToStr(MultiByteBuffer buf, int pos) { int maxBytes = buf.limit() - pos; int numBytes = Math.min(16, maxBytes); return Bytes.toStringBinary(buf.array(), buf.arrayOffset() + pos, @@ -595,7 +596,7 @@ public class TestHFileBlock { b = b.unpack(meta, hbr); // b's buffer has header + data + checksum while // expectedContents have header + data only - ByteBuffer bufRead = b.getBufferWithHeader(); + MultiByteBuffer bufRead = b.getBufferWithHeader(); ByteBuffer bufExpected = expectedContents.get(i); boolean bytesAreCorrect = Bytes.compareTo(bufRead.array(), bufRead.arrayOffset(), @@ -617,7 +618,7 @@ public class TestHFileBlock { bufRead.arrayOffset(), Math.min(32 + 10, bufRead.limit())); if (detailedLogging) { LOG.warn("expected header" + - HFileBlock.toStringHeader(bufExpected) + + HFileBlock.toStringHeader(new MultiByteBuffer(bufExpected)) + "\nfound header" + HFileBlock.toStringHeader(bufRead)); LOG.warn("bufread offset " + bufRead.arrayOffset() + @@ -821,9 +822,9 @@ public class TestHFileBlock { protected void testBlockHeapSizeInternals() { if (ClassSize.is32BitJVM()) { - assertTrue(HFileBlock.BYTE_BUFFER_HEAP_SIZE == 64); + assertTrue(HFileBlock.MULTI_BYTE_BUFFER_HEAP_SIZE == 64); } else { - assertTrue(HFileBlock.BYTE_BUFFER_HEAP_SIZE == 80); + assertTrue(HFileBlock.MULTI_BYTE_BUFFER_HEAP_SIZE == 112); } for (int size : new int[] { 100, 256, 12345 }) { @@ -840,7 +841,7 @@ public class TestHFileBlock { HFileBlock.FILL_HEADER, -1, 0, meta); long byteBufferExpectedSize = - ClassSize.align(ClassSize.estimateBase(buf.getClass(), true) + ClassSize.align(ClassSize.estimateBase(new MultiByteBuffer(buf).getClass(), true) + HConstants.HFILEBLOCK_HEADER_SIZE + size); long hfileMetaSize = ClassSize.align(ClassSize.estimateBase(HFileContext.class, true)); long hfileBlockExpectedSize = diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestHFileBlockCompatibility.java 
b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestHFileBlockCompatibility.java
index fc44f3c..d65a53e 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestHFileBlockCompatibility.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestHFileBlockCompatibility.java
@@ -48,6 +48,7 @@ import org.apache.hadoop.hbase.io.encoding.DataBlockEncoding;
 import org.apache.hadoop.hbase.io.encoding.HFileBlockDefaultEncodingContext;
 import org.apache.hadoop.hbase.io.encoding.HFileBlockEncodingContext;
 import org.apache.hadoop.hbase.io.hfile.HFileBlock.BlockWritable;
+import org.apache.hadoop.hbase.nio.MultiByteBuffer;
 import org.apache.hadoop.hbase.testclassification.IOTests;
 import org.apache.hadoop.hbase.testclassification.SmallTests;
 import org.apache.hadoop.hbase.util.Bytes;
@@ -310,7 +311,7 @@ public class TestHFileBlockCompatibility {
         assertEquals((int) encodedSizes.get(blockId),
             b.getUncompressedSizeWithoutHeader());

-        ByteBuffer actualBuffer = b.getBufferWithoutHeader();
+        MultiByteBuffer actualBuffer = b.getBufferWithoutHeader();
         if (encoding != DataBlockEncoding.NONE) {
           // We expect a two-byte big-endian encoding id.
           assertEquals(0, actualBuffer.get(0));
@@ -323,7 +324,7 @@ public class TestHFileBlockCompatibility {
         expectedBuffer.rewind();

         // test if content matches, produce nice message
-        TestHFileBlock.assertBuffersEqual(expectedBuffer, actualBuffer,
+        TestHFileBlock.assertBuffersEqual(new MultiByteBuffer(expectedBuffer), actualBuffer,
             algo, encoding, pread);
       }
       is.close();
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestHFileBlockIndex.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestHFileBlockIndex.java
index a657c21..54a9f60 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestHFileBlockIndex.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestHFileBlockIndex.java
@@ -52,6 +52,7 @@ import org.apache.hadoop.hbase.io.compress.Compression;
 import org.apache.hadoop.hbase.io.encoding.DataBlockEncoding;
 import org.apache.hadoop.hbase.io.hfile.HFileBlockIndex.BlockIndexChunk;
 import org.apache.hadoop.hbase.io.hfile.HFileBlockIndex.BlockIndexReader;
+import org.apache.hadoop.hbase.nio.MultiByteBuffer;
 import org.apache.hadoop.hbase.testclassification.IOTests;
 import org.apache.hadoop.hbase.testclassification.MediumTests;
 import org.apache.hadoop.hbase.util.Bytes;
@@ -407,7 +408,7 @@ public class TestHFileBlockIndex {
       KeyValue.KeyOnlyKeyValue cell = new KeyValue.KeyOnlyKeyValue(
           arrayHoldingKey, searchKey.length / 2, searchKey.length);
       int searchResult = BlockIndexReader.binarySearchNonRootIndex(cell,
-          nonRootIndex, CellComparator.COMPARATOR);
+          new MultiByteBuffer(nonRootIndex), CellComparator.COMPARATOR);

       String lookupFailureMsg = "Failed to look up key #" + i + " ("
           + Bytes.toStringBinary(searchKey) + ")";
@@ -432,7 +433,7 @@ public class TestHFileBlockIndex {
       // Now test we can get the offset and the on-disk-size using a
       // higher-level API function.s
       boolean locateBlockResult =
-          (BlockIndexReader.locateNonRootIndexEntry(nonRootIndex, cell,
+          (BlockIndexReader.locateNonRootIndexEntry(new MultiByteBuffer(nonRootIndex), cell,
           CellComparator.COMPARATOR) != -1);

       if (i == 0) {
@@ -605,7 +606,7 @@ public class TestHFileBlockIndex {
     while ((block = iter.nextBlock()) != null) {
       if (block.getBlockType() != BlockType.LEAF_INDEX)
         return;
-      ByteBuffer b = block.getBufferReadOnly();
+      MultiByteBuffer b = block.getBufferReadOnly();
       int n = b.getInt();
       // One int for the number of items, and n + 1 for the secondary index.
       int entriesOffset = Bytes.SIZEOF_INT * (n + 2);
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestHFileWriterV2.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestHFileWriterV2.java
index 883f60e..fb03174 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestHFileWriterV2.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestHFileWriterV2.java
@@ -45,6 +45,7 @@ import org.apache.hadoop.hbase.KeyValue;
 import org.apache.hadoop.hbase.io.compress.Compression;
 import org.apache.hadoop.hbase.io.compress.Compression.Algorithm;
 import org.apache.hadoop.hbase.io.hfile.HFile.FileInfo;
+import org.apache.hadoop.hbase.nio.MultiByteBuffer;
 import org.apache.hadoop.hbase.testclassification.IOTests;
 import org.apache.hadoop.hbase.testclassification.SmallTests;
 import org.apache.hadoop.hbase.util.Bytes;
@@ -194,7 +195,7 @@ public class TestHFileWriterV2 {
         assertFalse(block.isUnpacked());
         block = block.unpack(meta, blockReader);
       }
-      ByteBuffer buf = block.getBufferWithoutHeader();
+      MultiByteBuffer buf = block.getBufferWithoutHeader();
       while (buf.hasRemaining()) {
         int keyLen = buf.getInt();
         int valueLen = buf.getInt();
@@ -239,7 +240,7 @@ public class TestHFileWriterV2 {
           .unpack(meta, blockReader);
       assertEquals(BlockType.META, block.getBlockType());
       Text t = new Text();
-      ByteBuffer buf = block.getBufferWithoutHeader();
+      MultiByteBuffer buf = block.getBufferWithoutHeader();
       if (Writables.getWritable(buf.array(), buf.arrayOffset(), buf.limit(), t) == null) {
         throw new IOException("Failed to deserialize block " + this + " into a " + t.getClass().getSimpleName());
       }
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestHFileWriterV3.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestHFileWriterV3.java
index e9ba089..ce592f1 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestHFileWriterV3.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestHFileWriterV3.java
@@ -48,6 +48,7 @@ import org.apache.hadoop.hbase.KeyValue.Type;
 import org.apache.hadoop.hbase.io.compress.Compression;
 import org.apache.hadoop.hbase.io.compress.Compression.Algorithm;
 import org.apache.hadoop.hbase.io.hfile.HFile.FileInfo;
+import org.apache.hadoop.hbase.nio.MultiByteBuffer;
 import org.apache.hadoop.hbase.testclassification.IOTests;
 import org.apache.hadoop.hbase.testclassification.SmallTests;
 import org.apache.hadoop.hbase.util.Bytes;
@@ -221,7 +222,7 @@ public class TestHFileWriterV3 {
       HFileBlock block = blockReader.readBlockData(curBlockPos, -1, -1, false)
           .unpack(context, blockReader);
       assertEquals(BlockType.DATA, block.getBlockType());
-      ByteBuffer buf = block.getBufferWithoutHeader();
+      MultiByteBuffer buf = block.getBufferWithoutHeader();
       int keyLen = -1;

       while (buf.hasRemaining()) {
@@ -281,7 +282,7 @@ public class TestHFileWriterV3 {
           .unpack(context, blockReader);
       assertEquals(BlockType.META, block.getBlockType());
       Text t = new Text();
-      ByteBuffer buf = block.getBufferWithoutHeader();
+      MultiByteBuffer buf = block.getBufferWithoutHeader();
       if (Writables.getWritable(buf.array(), buf.arrayOffset(), buf.limit(), t) == null) {
         throw new IOException("Failed to deserialize block " + this + " into a " + t.getClass().getSimpleName());
       }
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/bucket/TestByteBufferIOEngine.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/bucket/TestByteBufferIOEngine.java
index 511f942..0a19957 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/bucket/TestByteBufferIOEngine.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/bucket/TestByteBufferIOEngine.java
@@ -22,6 +22,7 @@ import static org.junit.Assert.assertTrue;

 import java.nio.ByteBuffer;

+import org.apache.hadoop.hbase.nio.MultiByteBuffer;
 import org.apache.hadoop.hbase.testclassification.IOTests;
 import org.apache.hadoop.hbase.testclassification.SmallTests;
 import org.junit.Test;
@@ -73,4 +74,47 @@ public class TestByteBufferIOEngine {
     assert testOffsetAtStartNum == 0;
     assert testOffsetAtEndNum == 0;
   }
+
+  @Test
+  public void testByteBufferIOEngineWithMBB() throws Exception {
+    int capacity = 32 * 1024 * 1024; // 32 MB
+    int testNum = 100;
+    int maxBlockSize = 64 * 1024;
+    ByteBufferIOEngine ioEngine = new ByteBufferIOEngine(capacity, false);
+    int testOffsetAtStartNum = testNum / 10;
+    int testOffsetAtEndNum = testNum / 10;
+    for (int i = 0; i < testNum; i++) {
+      byte val = (byte) (Math.random() * 255);
+      int blockSize = (int) (Math.random() * maxBlockSize);
+      if (blockSize == 0) {
+        blockSize = 1;
+      }
+      byte[] byteArray = new byte[blockSize];
+      for (int j = 0; j < byteArray.length; ++j) {
+        byteArray[j] = val;
+      }
+      ByteBuffer srcBuffer = ByteBuffer.wrap(byteArray);
+      int offset = 0;
+      if (testOffsetAtStartNum > 0) {
+        testOffsetAtStartNum--;
+        offset = 0;
+      } else if (testOffsetAtEndNum > 0) {
+        testOffsetAtEndNum--;
+        offset = capacity - blockSize;
+      } else {
+        offset = (int) (Math.random() * (capacity - maxBlockSize));
+      }
+      ioEngine.write(srcBuffer, offset);
+      //ByteBuffer dstBuffer = ByteBuffer.allocate(blockSize);
+      //ioEngine.read(dstBuffer, offset);
+      //MultiByteBuffer read = new MultiByteBuffer(dstBuffer);
+      // TODO : this will get changed after HBASE-12295 goes in
+      MultiByteBuffer read = ioEngine.read(offset, blockSize);
+      for (int j = 0; j < byteArray.length; ++j) {
+        assertTrue(srcBuffer.get(j) == read.get(j));
+      }
+    }
+    assert testOffsetAtStartNum == 0;
+    assert testOffsetAtEndNum == 0;
+  }
 }
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestSplitTransaction.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestSplitTransaction.java
index 4f371bd..68dba30 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestSplitTransaction.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestSplitTransaction.java
@@ -362,6 +362,7 @@ public class TestSplitTransaction {
       while (hasNext) {
         hasNext = scanner.next(kvs);
         if (!kvs.isEmpty()) rowcount++;
+        System.out.println(rowcount);
       }
     } finally {
       scanner.close();
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/util/TestBloomFilterChunk.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/util/TestBloomFilterChunk.java
index 4d8ad4b..33808ff 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/util/TestBloomFilterChunk.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/util/TestBloomFilterChunk.java
@@ -24,6 +24,8 @@ import java.io.DataOutputStream;
 import java.nio.ByteBuffer;

 import junit.framework.TestCase;
+
+import org.apache.hadoop.hbase.nio.MultiByteBuffer;
 import org.apache.hadoop.hbase.testclassification.MiscTests;
 import org.apache.hadoop.hbase.testclassification.SmallTests;
 import org.junit.experimental.categories.Category;
@@ -44,14 +46,14 @@ public class TestBloomFilterChunk extends TestCase {
     bf1.add(key1);
     bf2.add(key2);

-    assertTrue(BloomFilterUtil.contains(key1, 0, key1.length, bf1.bloom, 0, (int) bf1.byteSize,
-        bf1.hash, bf1.hashCount));
-    assertFalse(BloomFilterUtil.contains(key2, 0, key2.length, bf1.bloom, 0, (int) bf1.byteSize,
-        bf1.hash, bf1.hashCount));
-    assertFalse(BloomFilterUtil.contains(key1, 0, key1.length, bf2.bloom, 0, (int) bf2.byteSize,
-        bf2.hash, bf2.hashCount));
-    assertTrue(BloomFilterUtil.contains(key2, 0, key2.length, bf2.bloom, 0, (int) bf2.byteSize,
-        bf2.hash, bf2.hashCount));
+    assertTrue(BloomFilterUtil.contains(key1, 0, key1.length, new MultiByteBuffer(bf1.bloom), 0,
+        (int) bf1.byteSize, bf1.hash, bf1.hashCount));
+    assertFalse(BloomFilterUtil.contains(key2, 0, key2.length, new MultiByteBuffer(bf1.bloom), 0,
+        (int) bf1.byteSize, bf1.hash, bf1.hashCount));
+    assertFalse(BloomFilterUtil.contains(key1, 0, key1.length, new MultiByteBuffer(bf2.bloom), 0,
+        (int) bf2.byteSize, bf2.hash, bf2.hashCount));
+    assertTrue(BloomFilterUtil.contains(key2, 0, key2.length, new MultiByteBuffer(bf2.bloom), 0,
+        (int) bf2.byteSize, bf2.hash, bf2.hashCount));

     byte [] bkey = {1,2,3,4};
     byte [] bval = "this is a much larger byte array".getBytes();
@@ -59,12 +61,12 @@ public class TestBloomFilterChunk extends TestCase {
     bf1.add(bkey);
     bf1.add(bval, 1, bval.length-1);

-    assertTrue(BloomFilterUtil.contains(bkey, 0, bkey.length, bf1.bloom, 0, (int) bf1.byteSize,
-        bf1.hash, bf1.hashCount));
-    assertTrue(BloomFilterUtil.contains(bval, 1, bval.length - 1, bf1.bloom, 0, (int) bf1.byteSize,
-        bf1.hash, bf1.hashCount));
-    assertFalse(BloomFilterUtil.contains(bval, 0, bval.length, bf1.bloom, 0, (int) bf1.byteSize,
-        bf1.hash, bf1.hashCount));
+    assertTrue(BloomFilterUtil.contains(bkey, 0, bkey.length, new MultiByteBuffer(bf1.bloom), 0,
+        (int) bf1.byteSize, bf1.hash, bf1.hashCount));
+    assertTrue(BloomFilterUtil.contains(bval, 1, bval.length - 1, new MultiByteBuffer(bf1.bloom),
+        0, (int) bf1.byteSize, bf1.hash, bf1.hashCount));
+    assertFalse(BloomFilterUtil.contains(bval, 0, bval.length, new MultiByteBuffer(bf1.bloom), 0,
+        (int) bf1.byteSize, bf1.hash, bf1.hashCount));

     // test 2: serialization & deserialization.
     // (convert bloom to byte array & read byte array back in as input)
@@ -73,18 +75,18 @@ public class TestBloomFilterChunk extends TestCase {
     ByteBuffer bb = ByteBuffer.wrap(bOut.toByteArray());

     BloomFilterChunk newBf1 = new BloomFilterChunk(1000, (float)0.01, Hash.MURMUR_HASH, 0);
-    assertTrue(BloomFilterUtil.contains(key1, 0, key1.length, bb, 0, (int) newBf1.byteSize,
-        newBf1.hash, newBf1.hashCount));
-    assertFalse(BloomFilterUtil.contains(key2, 0, key2.length, bb, 0, (int) newBf1.byteSize,
-        newBf1.hash, newBf1.hashCount));
-    assertTrue(BloomFilterUtil.contains(bkey, 0, bkey.length, bb, 0, (int) newBf1.byteSize,
-        newBf1.hash, newBf1.hashCount));
-    assertTrue(BloomFilterUtil.contains(bval, 1, bval.length - 1, bb, 0, (int) newBf1.byteSize,
-        newBf1.hash, newBf1.hashCount));
-    assertFalse(BloomFilterUtil.contains(bval, 0, bval.length, bb, 0, (int) newBf1.byteSize,
-        newBf1.hash, newBf1.hashCount));
-    assertFalse(BloomFilterUtil.contains(bval, 0, bval.length, bb, 0, (int) newBf1.byteSize,
-        newBf1.hash, newBf1.hashCount));
+    assertTrue(BloomFilterUtil.contains(key1, 0, key1.length, new MultiByteBuffer(bb), 0,
+        (int) newBf1.byteSize, newBf1.hash, newBf1.hashCount));
+    assertFalse(BloomFilterUtil.contains(key2, 0, key2.length, new MultiByteBuffer(bb), 0,
+        (int) newBf1.byteSize, newBf1.hash, newBf1.hashCount));
+    assertTrue(BloomFilterUtil.contains(bkey, 0, bkey.length, new MultiByteBuffer(bb), 0,
+        (int) newBf1.byteSize, newBf1.hash, newBf1.hashCount));
+    assertTrue(BloomFilterUtil.contains(bval, 1, bval.length - 1, new MultiByteBuffer(bb), 0,
+        (int) newBf1.byteSize, newBf1.hash, newBf1.hashCount));
+    assertFalse(BloomFilterUtil.contains(bval, 0, bval.length, new MultiByteBuffer(bb), 0,
+        (int) newBf1.byteSize, newBf1.hash, newBf1.hashCount));
+    assertFalse(BloomFilterUtil.contains(bval, 0, bval.length, new MultiByteBuffer(bb), 0,
+        (int) newBf1.byteSize, newBf1.hash, newBf1.hashCount));

     System.out.println("Serialized as " + bOut.size() + " bytes");
     assertTrue(bOut.size() - bf1.byteSize < 10); //... allow small padding
@@ -105,9 +107,10 @@ public class TestBloomFilterChunk extends TestCase {
     int falsePositives = 0;
     for (int i = 0; i < 25; ++i) {
       byte[] bytes = Bytes.toBytes(i);
-      if (BloomFilterUtil.contains(bytes, 0, bytes.length, b.bloom, 0, (int) b.byteSize, b.hash,
-          b.hashCount)) {
-        if(i >= 12) falsePositives++;
+      if (BloomFilterUtil.contains(bytes, 0, bytes.length, new MultiByteBuffer(b.bloom), 0,
+          (int) b.byteSize, b.hash, b.hashCount)) {
+        if (i >= 12)
+          falsePositives++;
       } else {
         assertFalse(i < 12);
       }
@@ -143,9 +146,10 @@ public class TestBloomFilterChunk extends TestCase {

     for (int i = 0; i < 2*1000*1000; ++i) {
       byte[] bytes = Bytes.toBytes(i);
-      if (BloomFilterUtil.contains(bytes, 0, bytes.length, b.bloom, 0, (int) b.byteSize, b.hash,
-          b.hashCount)) {
-        if(i >= 1*1000*1000) falsePositives++;
+      if (BloomFilterUtil.contains(bytes, 0, bytes.length, new MultiByteBuffer(b.bloom), 0,
+          (int) b.byteSize, b.hash, b.hashCount)) {
+        if (i >= 1 * 1000 * 1000)
+          falsePositives++;
       } else {
         assertFalse(i < 1*1000*1000);
       }
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/util/TestByteBufferUtils.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/util/TestByteBufferUtils.java
index 974b0d2..d6c4db9 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/util/TestByteBufferUtils.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/util/TestByteBufferUtils.java
@@ -367,7 +367,7 @@ public class TestByteBufferUtils {
     buffer.putInt(i);
     buffer.putLong(l);
     byte[] b = new byte[15];
-    ByteBufferUtils.copyFromBufferToArray(b, buffer, 1, 1, 14);
+    ByteBufferUtils.copyFromBufferToByteArray(b, buffer, 1, 1, 14);
     assertEquals(s, Bytes.toShort(b, 1));
     assertEquals(i, Bytes.toInt(b, 3));
     assertEquals(l, Bytes.toLong(b, 7));
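Taken together, the test changes above all exercise the same pattern: a heap ByteBuffer is wrapped in a MultiByteBuffer before being handed to the reworked read-path APIs (BloomFilterUtil.contains, BlockIndexReader.binarySearchNonRootIndex, ByteBufferIOEngine.read), and the wrapped content is then checked through MultiByteBuffer's own accessors. The sketch below shows just that wrapping step in isolation. It is not part of the patch: the class name is made up, and it assumes only the MultiByteBuffer(ByteBuffer) constructor and the get(int)/getInt()/hasRemaining() accessors that already appear in the hunks above.

import java.nio.ByteBuffer;

import org.apache.hadoop.hbase.nio.MultiByteBuffer;

// Hypothetical, illustration-only class; not part of the patch.
public class MultiByteBufferWrapSketch {
  public static void main(String[] args) {
    // Build a small heap buffer the way the tests above build theirs.
    ByteBuffer bb = ByteBuffer.allocate(8);
    bb.putInt(42);  // stand-in for a key length
    bb.putInt(7);   // stand-in for a value length
    bb.rewind();    // position back to 0, limit stays at capacity

    // Wrap the single ByteBuffer, as the hunks above do with
    // new MultiByteBuffer(expectedBuffer) and new MultiByteBuffer(bf1.bloom).
    MultiByteBuffer mbb = new MultiByteBuffer(bb);

    // Absolute get(int) reads a byte without moving the position; this is how
    // the Bloom filter and ByteBufferIOEngine tests compare content byte by byte.
    System.out.println("first byte = " + mbb.get(0));

    // Relative getInt() walks the buffer, mirroring how TestHFileWriterV2/V3
    // iterate a data block returned as a MultiByteBuffer.
    while (mbb.hasRemaining()) {
      System.out.println("next int = " + mbb.getInt());
    }
  }
}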