diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/CellBlockBuilder.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/CellBlockBuilder.java index fb2cafa..cb9dc89 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/CellBlockBuilder.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/CellBlockBuilder.java @@ -21,7 +21,9 @@ import io.netty.buffer.ByteBuf; import io.netty.buffer.ByteBufAllocator; import io.netty.buffer.ByteBufOutputStream; +import java.io.ByteArrayInputStream; import java.io.IOException; +import java.io.InputStream; import java.io.OutputStream; import java.nio.BufferOverflowException; import java.nio.ByteBuffer; @@ -35,10 +37,12 @@ import org.apache.hadoop.hbase.CellScanner; import org.apache.hadoop.hbase.DoNotRetryIOException; import org.apache.hadoop.hbase.classification.InterfaceAudience; import org.apache.hadoop.hbase.codec.Codec; -import org.apache.hadoop.hbase.io.ByteBufferInputStream; +import org.apache.hadoop.hbase.io.ByteBuffInputStream; import org.apache.hadoop.hbase.io.ByteBufferListOutputStream; import org.apache.hadoop.hbase.io.ByteBufferOutputStream; import org.apache.hadoop.hbase.io.ByteBufferPool; +import org.apache.hadoop.hbase.nio.ByteBuff; +import org.apache.hadoop.hbase.nio.SingleByteBuff; import org.apache.hadoop.hbase.util.ClassSize; import org.apache.hadoop.io.compress.CodecPool; import org.apache.hadoop.io.compress.CompressionCodec; @@ -236,17 +240,16 @@ class CellBlockBuilder { * @throws IOException if encoding fails */ public CellScanner createCellScanner(final Codec codec, final CompressionCodec compressor, - final byte[] cellBlock) throws IOException { + byte[] cellBlock) throws IOException { // Use this method from Client side to create the CellScanner - ByteBuffer cellBlockBuf = ByteBuffer.wrap(cellBlock); if (compressor != null) { - cellBlockBuf = decompress(compressor, cellBlockBuf); + cellBlock = decompress(compressor, cellBlock); } // Not making the Decoder over the ByteBuffer purposefully. The Decoder over the BB will // make Cells directly over the passed BB. This method is called at client side and we don't // want the Cells to share the same byte[] where the RPC response is being read. Caching of any // of the Cells at user's app level will make it not possible to GC the response byte[] - return codec.getDecoder(new ByteBufferInputStream(cellBlockBuf)); + return codec.getDecoder(new ByteArrayInputStream(cellBlock)); } /** @@ -258,7 +261,7 @@ class CellBlockBuilder { * @throws IOException if cell encoding fails */ public CellScanner createCellScannerReusingBuffers(final Codec codec, - final CompressionCodec compressor, ByteBuffer cellBlock) throws IOException { + final CompressionCodec compressor, ByteBuff cellBlock) throws IOException { // Use this method from HRS to create the CellScanner // If compressed, decompress it first before passing it on else we will leak compression // resources if the stream is not closed properly after we let it out. 
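A short usage sketch of the client-side path changed above. The builder, codec and compressor instances are assumed to exist (CellBlockBuilder is package-private, so such code would live in the same package); the point is that the cell block is materialized into a fresh on-heap byte[], so Cells handed to the application never pin the buffer the RPC response was read into.

```java
// Hedged usage sketch; instances below are assumed for illustration.
void readCells(CellBlockBuilder builder, Codec codec, CompressionCodec compressor,
    byte[] cellBlockBytes) throws IOException {
  // Decompresses (if needed) into a new byte[] and decodes over a ByteArrayInputStream.
  CellScanner scanner = builder.createCellScanner(codec, compressor, cellBlockBytes);
  while (scanner.advance()) {
    Cell cell = scanner.current();   // safe to cache; backed by the copied byte[]
    // ... hand the cell to the application ...
  }
}
```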
@@ -268,27 +271,39 @@ class CellBlockBuilder { return codec.getDecoder(cellBlock); } - private ByteBuffer decompress(CompressionCodec compressor, ByteBuffer cellBlock) + private byte[] decompress(CompressionCodec compressor, byte[] compressedCellBlock) + throws IOException { + ByteBuffer cellBlock = decompress(compressor, new ByteArrayInputStream(compressedCellBlock), + compressedCellBlock.length * this.cellBlockDecompressionMultiplier); + assert cellBlock.hasArray(); + return cellBlock.array(); + } + + private ByteBuff decompress(CompressionCodec compressor, ByteBuff compressedCellBlock) throws IOException { + ByteBuffer cellBlock = decompress(compressor, new ByteBuffInputStream(compressedCellBlock), + compressedCellBlock.remaining() * this.cellBlockDecompressionMultiplier); + return new SingleByteBuff(cellBlock); + } + + private ByteBuffer decompress(CompressionCodec compressor, InputStream cellBlockStream, + int osInitialSize) throws IOException { // GZIPCodec fails w/ NPE if no configuration. if (compressor instanceof Configurable) { ((Configurable) compressor).setConf(this.conf); } Decompressor poolDecompressor = CodecPool.getDecompressor(compressor); - CompressionInputStream cis = compressor.createInputStream(new ByteBufferInputStream(cellBlock), - poolDecompressor); + CompressionInputStream cis = compressor.createInputStream(cellBlockStream, poolDecompressor); ByteBufferOutputStream bbos; try { // TODO: This is ugly. The buffer will be resized on us if we guess wrong. // TODO: Reuse buffers. - bbos = new ByteBufferOutputStream( - cellBlock.remaining() * this.cellBlockDecompressionMultiplier); + bbos = new ByteBufferOutputStream(osInitialSize); IOUtils.copy(cis, bbos); bbos.close(); - cellBlock = bbos.getByteBuffer(); + return bbos.getByteBuffer(); } finally { CodecPool.returnDecompressor(poolDecompressor); } - return cellBlock; } } diff --git a/hbase-client/src/test/java/org/apache/hadoop/hbase/ipc/TestCellBlockBuilder.java b/hbase-client/src/test/java/org/apache/hadoop/hbase/ipc/TestCellBlockBuilder.java index ccabe66..9addaa5 100644 --- a/hbase-client/src/test/java/org/apache/hadoop/hbase/ipc/TestCellBlockBuilder.java +++ b/hbase-client/src/test/java/org/apache/hadoop/hbase/ipc/TestCellBlockBuilder.java @@ -35,6 +35,7 @@ import org.apache.hadoop.hbase.KeyValue; import org.apache.hadoop.hbase.codec.Codec; import org.apache.hadoop.hbase.codec.KeyValueCodec; import org.apache.hadoop.hbase.io.SizedCellScanner; +import org.apache.hadoop.hbase.nio.SingleByteBuff; import org.apache.hadoop.hbase.testclassification.ClientTests; import org.apache.hadoop.hbase.testclassification.SmallTests; import org.apache.hadoop.hbase.util.Bytes; @@ -78,7 +79,8 @@ public class TestCellBlockBuilder { CellScanner cellScanner = sized ? 
getSizedCellScanner(cells) : CellUtil.createCellScanner(Arrays.asList(cells).iterator()); ByteBuffer bb = builder.buildCellBlock(codec, compressor, cellScanner); - cellScanner = builder.createCellScannerReusingBuffers(codec, compressor, bb); + cellScanner = builder.createCellScannerReusingBuffers(codec, compressor, + new SingleByteBuff(bb)); int i = 0; while (cellScanner.advance()) { i++; diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/OffheapKeyValue.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/OffheapKeyValue.java index 2165362..06a0ed6 100644 --- a/hbase-common/src/main/java/org/apache/hadoop/hbase/OffheapKeyValue.java +++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/OffheapKeyValue.java @@ -36,10 +36,10 @@ public class OffheapKeyValue extends ByteBufferedCell implements ExtendedCell { protected final ByteBuffer buf; protected final int offset; protected final int length; + protected final boolean hasTags; private final short rowLen; private final int keyLen; private long seqId = 0; - private final boolean hasTags; // TODO : See if famLen can be cached or not? private static final int FIXED_OVERHEAD = ClassSize.OBJECT + ClassSize.REFERENCE @@ -57,6 +57,18 @@ public class OffheapKeyValue extends ByteBufferedCell implements ExtendedCell { this.seqId = seqId; } + public OffheapKeyValue(ByteBuffer buf, int offset, int length) { + assert buf.isDirect(); + this.buf = buf; + this.offset = offset; + this.length = length; + rowLen = ByteBufferUtils.toShort(this.buf, this.offset + KeyValue.ROW_OFFSET); + keyLen = ByteBufferUtils.toInt(this.buf, this.offset); + int tagsLen = this.length + - (this.keyLen + getValueLength() + KeyValue.KEYVALUE_INFRASTRUCTURE_SIZE); + this.hasTags = tagsLen > 0; + } + @Override public byte[] getRowArray() { return CellUtil.cloneRow(this); @@ -265,16 +277,19 @@ public class OffheapKeyValue extends ByteBufferedCell implements ExtendedCell { @Override public void setTimestamp(long ts) throws IOException { - // This Cell implementation is not yet used in write path. - // TODO when doing HBASE-15179 - throw new UnsupportedOperationException(); + ByteBufferUtils.copyFromArrayToBuffer(this.buf, this.getTimestampOffset(), Bytes.toBytes(ts), 0, + Bytes.SIZEOF_LONG); + } + + private int getTimestampOffset() { + return this.offset + KeyValue.KEYVALUE_INFRASTRUCTURE_SIZE + this.keyLen + - KeyValue.TIMESTAMP_TYPE_SIZE; } @Override public void setTimestamp(byte[] ts, int tsOffset) throws IOException { - // This Cell implementation is not yet used in write path. 
- // TODO when doing HBASE-15179 - throw new UnsupportedOperationException(); + ByteBufferUtils.copyFromArrayToBuffer(this.buf, this.getTimestampOffset(), ts, tsOffset, + Bytes.SIZEOF_LONG); } @Override diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/codec/CellCodec.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/codec/CellCodec.java index d6b64f6..ca2e3e8 100644 --- a/hbase-common/src/main/java/org/apache/hadoop/hbase/codec/CellCodec.java +++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/codec/CellCodec.java @@ -20,14 +20,14 @@ package org.apache.hadoop.hbase.codec; import java.io.IOException; import java.io.InputStream; import java.io.OutputStream; -import java.nio.ByteBuffer; import org.apache.commons.io.IOUtils; import org.apache.hadoop.hbase.Cell; import org.apache.hadoop.hbase.CellUtil; import org.apache.hadoop.hbase.HBaseInterfaceAudience; import org.apache.hadoop.hbase.classification.InterfaceAudience; -import org.apache.hadoop.hbase.io.ByteBufferInputStream; +import org.apache.hadoop.hbase.io.ByteBuffInputStream; +import org.apache.hadoop.hbase.nio.ByteBuff; import org.apache.hadoop.hbase.util.Bytes; /** @@ -118,8 +118,8 @@ public class CellCodec implements Codec { } @Override - public Decoder getDecoder(ByteBuffer buf) { - return getDecoder(new ByteBufferInputStream(buf)); + public Decoder getDecoder(ByteBuff buf) { + return getDecoder(new ByteBuffInputStream(buf)); } @Override diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/codec/CellCodecWithTags.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/codec/CellCodecWithTags.java index 7326884..2dca10a 100644 --- a/hbase-common/src/main/java/org/apache/hadoop/hbase/codec/CellCodecWithTags.java +++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/codec/CellCodecWithTags.java @@ -20,14 +20,14 @@ package org.apache.hadoop.hbase.codec; import java.io.IOException; import java.io.InputStream; import java.io.OutputStream; -import java.nio.ByteBuffer; import org.apache.commons.io.IOUtils; import org.apache.hadoop.hbase.Cell; import org.apache.hadoop.hbase.CellUtil; import org.apache.hadoop.hbase.HBaseInterfaceAudience; import org.apache.hadoop.hbase.classification.InterfaceAudience; -import org.apache.hadoop.hbase.io.ByteBufferInputStream; +import org.apache.hadoop.hbase.io.ByteBuffInputStream; +import org.apache.hadoop.hbase.nio.ByteBuff; import org.apache.hadoop.hbase.util.Bytes; /** @@ -119,8 +119,8 @@ public class CellCodecWithTags implements Codec { } @Override - public Decoder getDecoder(ByteBuffer buf) { - return getDecoder(new ByteBufferInputStream(buf)); + public Decoder getDecoder(ByteBuff buf) { + return getDecoder(new ByteBuffInputStream(buf)); } @Override diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/codec/Codec.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/codec/Codec.java index c8a4cdc..d1463ee 100644 --- a/hbase-common/src/main/java/org/apache/hadoop/hbase/codec/Codec.java +++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/codec/Codec.java @@ -19,12 +19,12 @@ package org.apache.hadoop.hbase.codec; import java.io.InputStream; import java.io.OutputStream; -import java.nio.ByteBuffer; import org.apache.hadoop.hbase.CellScanner; import org.apache.hadoop.hbase.HBaseInterfaceAudience; import org.apache.hadoop.hbase.classification.InterfaceAudience; import org.apache.hadoop.hbase.io.CellOutputStream; +import org.apache.hadoop.hbase.nio.ByteBuff; /** * Encoder/Decoder for Cell. 
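The two setTimestamp overloads above now patch the timestamp in place inside the backing ByteBuffer. A minimal sketch of the offset arithmetic they rely on, assuming the standard KeyValue serialization `<4B key length><4B value length><key><value>[tags]`, where the key ends with an 8-byte timestamp followed by a 1-byte type:

```java
// Hedged sketch of getTimestampOffset(); constants are the ones referenced in the patch.
int timestampOffset(int kvOffsetInBuffer, int keyLen) {
  return kvOffsetInBuffer
      + KeyValue.KEYVALUE_INFRASTRUCTURE_SIZE   // skip the two 4-byte length prefixes
      + keyLen                                  // skip the whole key
      - KeyValue.TIMESTAMP_TYPE_SIZE;           // back up over timestamp (8B) + type (1B)
}
```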
@@ -51,6 +51,6 @@ public interface Codec { interface Decoder extends CellScanner {}; Decoder getDecoder(InputStream is); - Decoder getDecoder(ByteBuffer buf); + Decoder getDecoder(ByteBuff buf); Encoder getEncoder(OutputStream os); } diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/codec/KeyValueCodec.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/codec/KeyValueCodec.java index 2609398..7cb3d16 100644 --- a/hbase-common/src/main/java/org/apache/hadoop/hbase/codec/KeyValueCodec.java +++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/codec/KeyValueCodec.java @@ -27,8 +27,10 @@ import org.apache.hadoop.hbase.HBaseInterfaceAudience; import org.apache.hadoop.hbase.KeyValue; import org.apache.hadoop.hbase.KeyValueUtil; import org.apache.hadoop.hbase.NoTagsKeyValue; +import org.apache.hadoop.hbase.OffheapKeyValue; import org.apache.hadoop.hbase.ShareableMemory; import org.apache.hadoop.hbase.classification.InterfaceAudience; +import org.apache.hadoop.hbase.nio.ByteBuff; import org.apache.hadoop.hbase.util.ByteBufferUtils; import org.apache.hadoop.hbase.util.Bytes; @@ -76,12 +78,12 @@ public class KeyValueCodec implements Codec { } } - public static class ByteBufferedKeyValueDecoder implements Codec.Decoder { + public static class ByteBuffedKeyValueDecoder implements Codec.Decoder { - protected final ByteBuffer buf; + protected final ByteBuff buf; protected Cell current = null; - public ByteBufferedKeyValueDecoder(ByteBuffer buf) { + public ByteBuffedKeyValueDecoder(ByteBuff buf) { this.buf = buf; } @@ -90,10 +92,14 @@ public class KeyValueCodec implements Codec { if (this.buf.remaining() <= 0) { return false; } - int len = ByteBufferUtils.toInt(buf); - assert buf.hasArray(); - this.current = createCell(buf.array(), buf.arrayOffset() + buf.position(), len); - buf.position(buf.position() + len); + int len = buf.getInt(); + ByteBuffer bb = buf.asSubByteBuffer(len); + if (bb.isDirect()) { + this.current = createCell(bb, bb.position(), len); + } else { + this.current = createCell(bb.array(), bb.arrayOffset() + bb.position(), len); + } + buf.skip(len); return true; } @@ -106,6 +112,11 @@ public class KeyValueCodec implements Codec { return new ShareableMemoryNoTagsKeyValue(buf, offset, len); } + protected Cell createCell(ByteBuffer bb, int pos, int len) { + // We know there is not going to be any tags. 
+ return new ShareableMemoryOffheapKeyValue(bb, pos, len, false, 0); + } + static class ShareableMemoryKeyValue extends KeyValue implements ShareableMemory { public ShareableMemoryKeyValue(byte[] bytes, int offset, int length) { super(bytes, offset, length); @@ -133,6 +144,31 @@ public class KeyValueCodec implements Codec { return kv; } } + + static class ShareableMemoryOffheapKeyValue extends OffheapKeyValue implements ShareableMemory { + public ShareableMemoryOffheapKeyValue(ByteBuffer buf, int offset, int length) { + super(buf, offset, length); + } + + public ShareableMemoryOffheapKeyValue(ByteBuffer buf, int offset, int length, boolean hasTags, + long seqId) { + super(buf, offset, length, hasTags, seqId); + } + + @Override + public Cell cloneToCell() { + byte[] copy = new byte[this.length]; + ByteBufferUtils.copyFromBufferToArray(copy, this.buf, this.offset, 0, this.length); + KeyValue kv; + if (this.hasTags) { + kv = new KeyValue(copy, 0, copy.length); + } else { + kv = new NoTagsKeyValue(copy, 0, copy.length); + } + kv.setSequenceId(this.getSequenceId()); + return kv; + } + } } /** @@ -144,8 +180,8 @@ public class KeyValueCodec implements Codec { } @Override - public Decoder getDecoder(ByteBuffer buf) { - return new ByteBufferedKeyValueDecoder(buf); + public Decoder getDecoder(ByteBuff buf) { + return new ByteBuffedKeyValueDecoder(buf); } @Override diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/codec/KeyValueCodecWithTags.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/codec/KeyValueCodecWithTags.java index 63c02e8..334adc0 100644 --- a/hbase-common/src/main/java/org/apache/hadoop/hbase/codec/KeyValueCodecWithTags.java +++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/codec/KeyValueCodecWithTags.java @@ -26,6 +26,7 @@ import org.apache.hadoop.hbase.Cell; import org.apache.hadoop.hbase.HBaseInterfaceAudience; import org.apache.hadoop.hbase.KeyValueUtil; import org.apache.hadoop.hbase.classification.InterfaceAudience; +import org.apache.hadoop.hbase.nio.ByteBuff; import org.apache.hadoop.hbase.util.ByteBufferUtils; /** @@ -78,16 +79,21 @@ public class KeyValueCodecWithTags implements Codec { } } - public static class ByteBufferedKeyValueDecoder - extends KeyValueCodec.ByteBufferedKeyValueDecoder { + public static class ByteBuffedKeyValueDecoder + extends KeyValueCodec.ByteBuffedKeyValueDecoder { - public ByteBufferedKeyValueDecoder(ByteBuffer buf) { + public ByteBuffedKeyValueDecoder(ByteBuff buf) { super(buf); } protected Cell createCell(byte[] buf, int offset, int len) { return new ShareableMemoryKeyValue(buf, offset, len); } + + @Override + protected Cell createCell(ByteBuffer bb, int pos, int len) { + return new ShareableMemoryOffheapKeyValue(bb, pos, len); + } } /** @@ -104,7 +110,7 @@ public class KeyValueCodecWithTags implements Codec { } @Override - public Decoder getDecoder(ByteBuffer buf) { - return new ByteBufferedKeyValueDecoder(buf); + public Decoder getDecoder(ByteBuff buf) { + return new ByteBuffedKeyValueDecoder(buf); } } diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/io/ByteBufferPool.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/io/ByteBufferPool.java index e528f02..971c42c 100644 --- a/hbase-common/src/main/java/org/apache/hadoop/hbase/io/ByteBufferPool.java +++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/io/ByteBufferPool.java @@ -140,7 +140,7 @@ public class ByteBufferPool { buffers.offer(buf); } - int getBufferSize() { + public int getBufferSize() { return this.bufferSize; } diff --git 
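A minimal decode sketch for the ByteBuff-based KeyValueCodec decoder above (setup values assumed). The decoder behaves the same for heap and direct buffers, but with a direct buffer the returned Cells are OffheapKeyValue views over that buffer, so anything kept beyond the buffer's lifetime should be copied out, which is what ShareableMemory.cloneToCell() above does.

```java
// Hedged sketch; serializedKvs is assumed to hold KeyValueCodec-encoded cells.
void decode(byte[] serializedKvs) throws IOException {
  ByteBuff block = new SingleByteBuff(ByteBuffer.wrap(serializedKvs));  // or a direct buffer
  Codec.Decoder decoder = new KeyValueCodec().getDecoder(block);
  while (decoder.advance()) {
    Cell c = decoder.current();
    // ... consume c before the backing buffer is reused or released ...
  }
}
```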
a/hbase-common/src/main/java/org/apache/hadoop/hbase/io/ByteBufferSupportOutputStreamWrapper.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/io/ByteBufferSupportOutputStreamWrapper.java new file mode 100644 index 0000000..9ffc7c3 --- /dev/null +++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/io/ByteBufferSupportOutputStreamWrapper.java @@ -0,0 +1,86 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.io; + +import java.io.IOException; +import java.io.OutputStream; +import java.nio.ByteBuffer; + +import org.apache.hadoop.hbase.classification.InterfaceAudience; +import org.apache.hadoop.hbase.util.ByteBufferUtils; +import org.apache.hadoop.hbase.util.Bytes; + +/** + * When deal with OutputStream which is not ByteBufferSupportOutputStream type, wrap it with this + * class. We will have to write offheap ByteBuffer (DBB) data into the OS. This class is having a + * temp byte array to which we can copy the DBB data for writing to the OS. + *
+ * This is used while writing Cell data to WAL. In case of AsyncWAL, the OS created there is + * ByteBufferSupportOutputStream. But in case of FSHLog, the OS passed by DFS client, is not of type + * ByteBufferSupportOutputStream. We will need this temp solution until DFS client supports writing + * ByteBuffer directly to the OS it creates. + *
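A hedged usage sketch of the wrapper introduced in this file (the stream and path names below are illustrative only): a plain OutputStream that is not ByteBuffer-aware is wrapped so off-heap cell data can still be written through the wrapper's reusable 4KB temp array, with larger writes falling back to a one-off array.

```java
// Hedged sketch; fs/walPath are assumed placeholders for an FSHLog-style output stream.
OutputStream raw = fs.create(walPath);
ByteBufferSupportOutputStream out =
    (raw instanceof ByteBufferSupportOutputStream) ? (ByteBufferSupportOutputStream) raw
        : new ByteBufferSupportOutputStreamWrapper(raw);
ByteBuffer cellData = ByteBuffer.allocateDirect(128);   // off-heap payload
// ... fill cellData ...
out.write(cellData, 0, cellData.limit());               // copied out via the temp byte[]
```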
+ * Note that this class is not thread safe. + */ +@InterfaceAudience.Private +public class ByteBufferSupportOutputStreamWrapper extends OutputStream + implements ByteBufferSupportOutputStream { + + private static final int TEMP_BUF_LENGTH = 4 * 1024; + private final OutputStream os; + private final byte[] tempBuf = new byte[TEMP_BUF_LENGTH]; + + public ByteBufferSupportOutputStreamWrapper(OutputStream os) { + this.os = os; + } + + @Override + public void write(ByteBuffer b, int off, int len) throws IOException { + byte[] buf = this.tempBuf; + if (len > TEMP_BUF_LENGTH) { + buf = new byte[len]; + } + ByteBufferUtils.copyFromBufferToArray(buf, b, off, 0, len); + this.os.write(buf, 0, len); + } + + @Override + public void writeInt(int i) throws IOException { + Bytes.putInt(this.tempBuf, 0, i); + this.os.write(this.tempBuf, 0, Bytes.SIZEOF_INT); + } + + @Override + public void write(int b) throws IOException { + this.os.write(b); + } + + public void write(byte b[], int off, int len) throws IOException { + this.os.write(b, off, len); + } + + @Override + public void flush() throws IOException { + this.os.flush(); + } + + @Override + public void close() throws IOException { + this.os.close(); + } +} \ No newline at end of file diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/nio/ByteBuff.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/nio/ByteBuff.java index 183a031..f6a34f8 100644 --- a/hbase-common/src/main/java/org/apache/hadoop/hbase/nio/ByteBuff.java +++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/nio/ByteBuff.java @@ -17,7 +17,9 @@ */ package org.apache.hadoop.hbase.nio; +import java.io.IOException; import java.nio.ByteBuffer; +import java.nio.channels.ReadableByteChannel; import org.apache.hadoop.hbase.classification.InterfaceAudience; import org.apache.hadoop.hbase.util.ByteBufferUtils; @@ -35,6 +37,8 @@ import org.apache.hadoop.io.WritableUtils; */ @InterfaceAudience.Private public abstract class ByteBuff { + private static final int NIO_BUFFER_LIMIT = 64 * 1024; // should not be more than 64KB. + /** * @return this ByteBuff's current position */ @@ -356,6 +360,14 @@ public abstract class ByteBuff { public abstract long getLongAfterPosition(int offset); /** + * Copy the content from this ByteBuff to a byte[]. + * @return byte[] with the copied contents from this ByteBuff. + */ + public byte[] toBytes() { + return toBytes(0, this.limit()); + } + + /** * Copy the content from this ByteBuff to a byte[] based on the given offset and * length * @@ -389,7 +401,39 @@ public abstract class ByteBuff { */ public abstract ByteBuff put(int offset, ByteBuff src, int srcOffset, int length); + /** + * Reads bytes from the given channel into this ByteBuff + * @param channel + * @return The number of bytes read from the channel + * @throws IOException + */ + public abstract int read(ReadableByteChannel channel) throws IOException; + // static helper methods + public static int channelRead(ReadableByteChannel channel, ByteBuffer buf) throws IOException { + if (buf.remaining() <= NIO_BUFFER_LIMIT) { + return channel.read(buf); + } + int originalLimit = buf.limit(); + int initialRemaining = buf.remaining(); + int ret = 0; + + while (buf.remaining() > 0) { + try { + int ioSize = Math.min(buf.remaining(), NIO_BUFFER_LIMIT); + buf.limit(buf.position() + ioSize); + ret = channel.read(buf); + if (ret < ioSize) { + break; + } + } finally { + buf.limit(originalLimit); + } + } + int nBytes = initialRemaining - buf.remaining(); + return (nBytes > 0) ? 
nBytes : ret; + } + /** * Read integer from ByteBuff coded in 7 bits and increment position. * @return Read integer. diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/nio/MultiByteBuff.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/nio/MultiByteBuff.java index 107bb3f..399644d 100644 --- a/hbase-common/src/main/java/org/apache/hadoop/hbase/nio/MultiByteBuff.java +++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/nio/MultiByteBuff.java @@ -17,10 +17,12 @@ */ package org.apache.hadoop.hbase.nio; +import java.io.IOException; import java.nio.BufferOverflowException; import java.nio.BufferUnderflowException; import java.nio.ByteBuffer; import java.nio.InvalidMarkException; +import java.nio.channels.ReadableByteChannel; import org.apache.hadoop.hbase.classification.InterfaceAudience; import org.apache.hadoop.hbase.util.ByteBufferUtils; @@ -1071,6 +1073,28 @@ public class MultiByteBuff extends ByteBuff { } @Override + public int read(ReadableByteChannel channel) throws IOException { + int total = 0; + while (true) { + // Read max possible into the current BB + int len = channelRead(channel, this.curItem); + if (len > 0) + total += len; + if (this.curItem.hasRemaining()) { + // We were not able to read enough to fill the current BB itself. Means there is no point in + // doing more reads from Channel. Only this much there for now. + break; + } else { + if (this.curItemIndex >= this.limitedItemIndex) + break; + this.curItemIndex++; + this.curItem = this.items[this.curItemIndex]; + } + } + return total; + } + + @Override public boolean equals(Object obj) { if (!(obj instanceof MultiByteBuff)) return false; if (this == obj) return true; diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/nio/PooledByteBuff.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/nio/PooledByteBuff.java new file mode 100644 index 0000000..3e5bb87 --- /dev/null +++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/nio/PooledByteBuff.java @@ -0,0 +1,32 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.nio; + +import org.apache.hadoop.hbase.classification.InterfaceAudience; + +/** + * This interface marks {@link ByteBuff}s which works with {@link ByteBuffer}s from a Pool. + */ +@InterfaceAudience.Private +public interface PooledByteBuff { + + /** + * Release individual {@link ByteBuffer}s to pool if they are obtained from it. 
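A small helper sketch showing how the new ByteBuff.read(ReadableByteChannel) is meant to be driven, the way the RPC request body is read further below. It assumes a blocking channel; channelRead() above internally chunks large reads to at most 64KB per channel.read() call.

```java
// Hedged sketch: fill a ByteBuff from a blocking channel until full or end-of-stream.
int readFully(ReadableByteChannel channel, ByteBuff buf) throws IOException {
  int total = 0;
  while (buf.remaining() > 0) {
    int n = buf.read(channel);
    if (n <= 0) {
      break;            // end of stream (or nothing more to read)
    }
    total += n;
  }
  return total;
}
```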
+ */ + void release(); +} diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/nio/PooledMultiByteBuff.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/nio/PooledMultiByteBuff.java new file mode 100644 index 0000000..62df78d --- /dev/null +++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/nio/PooledMultiByteBuff.java @@ -0,0 +1,46 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.nio; + +import java.nio.ByteBuffer; + +import org.apache.hadoop.hbase.classification.InterfaceAudience; +import org.apache.hadoop.hbase.io.ByteBufferPool; + +@InterfaceAudience.Private +public class PooledMultiByteBuff extends MultiByteBuff implements PooledByteBuff { + + private final ByteBuffer[] bbsFromPool; + private final ByteBufferPool pool; + + public PooledMultiByteBuff(ByteBuffer[] bbs, ByteBuffer[] bbsFromPool, ByteBufferPool pool) { + super(bbs); + this.bbsFromPool = bbsFromPool; + this.pool = pool; + } + + @Override + public void release() { + // Return back all the BBs to pool + if (this.bbsFromPool != null) { + for (int i = 0; i < this.bbsFromPool.length; i++) { + this.pool.putbackBuffer(this.bbsFromPool[i]); + } + } + } +} \ No newline at end of file diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/nio/PooledSingleByteBuff.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/nio/PooledSingleByteBuff.java new file mode 100644 index 0000000..8626c2c --- /dev/null +++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/nio/PooledSingleByteBuff.java @@ -0,0 +1,41 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.hadoop.hbase.nio; + +import java.nio.ByteBuffer; + +import org.apache.hadoop.hbase.classification.InterfaceAudience; +import org.apache.hadoop.hbase.io.ByteBufferPool; + +@InterfaceAudience.Private +public class PooledSingleByteBuff extends SingleByteBuff implements PooledByteBuff { + + private final ByteBuffer bbFromPool; + private final ByteBufferPool pool; + + public PooledSingleByteBuff(ByteBuffer bbFromPool, ByteBufferPool pool) { + super(bbFromPool); + this.bbFromPool = bbFromPool; + this.pool = pool; + } + + public void release() { + // Return back all the BB to pool + this.pool.putbackBuffer(this.bbFromPool); + } +} \ No newline at end of file diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/nio/SingleByteBuff.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/nio/SingleByteBuff.java index 9ad28dc..89e5010 100644 --- a/hbase-common/src/main/java/org/apache/hadoop/hbase/nio/SingleByteBuff.java +++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/nio/SingleByteBuff.java @@ -17,7 +17,9 @@ */ package org.apache.hadoop.hbase.nio; +import java.io.IOException; import java.nio.ByteBuffer; +import java.nio.channels.ReadableByteChannel; import org.apache.hadoop.hbase.classification.InterfaceAudience; import org.apache.hadoop.hbase.util.ByteBufferUtils; @@ -313,6 +315,11 @@ public class SingleByteBuff extends ByteBuff { } @Override + public int read(ReadableByteChannel channel) throws IOException { + return channelRead(channel, buf); + } + + @Override public boolean equals(Object obj) { if(!(obj instanceof SingleByteBuff)) return false; return this.buf.equals(((SingleByteBuff)obj).buf); diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/codec/MessageCodec.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/codec/MessageCodec.java index ea162fc..41dc387 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/codec/MessageCodec.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/codec/MessageCodec.java @@ -20,10 +20,10 @@ package org.apache.hadoop.hbase.codec; import java.io.IOException; import java.io.InputStream; import java.io.OutputStream; -import java.nio.ByteBuffer; import org.apache.hadoop.hbase.classification.InterfaceAudience; -import org.apache.hadoop.hbase.io.ByteBufferInputStream; +import org.apache.hadoop.hbase.io.ByteBuffInputStream; +import org.apache.hadoop.hbase.nio.ByteBuff; import org.apache.hadoop.hbase.Cell; import org.apache.hadoop.hbase.CellUtil; import org.apache.hadoop.hbase.HBaseInterfaceAudience; @@ -83,8 +83,8 @@ public class MessageCodec implements Codec { } @Override - public Decoder getDecoder(ByteBuffer buf) { - return getDecoder(new ByteBufferInputStream(buf)); + public Decoder getDecoder(ByteBuff buf) { + return getDecoder(new ByteBuffInputStream(buf)); } @Override diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/CallRunner.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/CallRunner.java index 34140a9..114e585 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/CallRunner.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/CallRunner.java @@ -19,8 +19,6 @@ package org.apache.hadoop.hbase.ipc; import java.net.InetSocketAddress; import java.nio.channels.ClosedChannelException; -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; import org.apache.hadoop.hbase.CallDroppedException; import org.apache.hadoop.hbase.CellScanner; import org.apache.hadoop.hbase.HBaseInterfaceAudience; @@ 
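A hedged lifecycle sketch for the Pooled*ByteBuff classes added here (pool sizes below are assumed): a request buffer borrowed from the reservoir is wrapped so that release() hands the ByteBuffer(s) back once the call is done, which is what Call.releaseRequestBuffer() does further below; small requests skip the pool entirely.

```java
// Hedged sketch; the 64KB/128 pool parameters are illustrative, not defaults.
ByteBufferPool pool = new ByteBufferPool(64 * 1024, 128);
ByteBuffer bb = pool.getBuffer();                 // may be null if the pool is exhausted
ByteBuff req = (bb != null) ? new PooledSingleByteBuff(bb, pool)
    : new SingleByteBuff(ByteBuffer.allocate(64 * 1024));
try {
  // ... read the RPC request into req and process it ...
} finally {
  if (req instanceof PooledByteBuff) {
    ((PooledByteBuff) req).release();             // hand the buffer back to the pool
  }
}
```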
-45,7 +43,6 @@ import org.apache.hadoop.hbase.shaded.com.google.protobuf.Message; @InterfaceAudience.LimitedPrivate({HBaseInterfaceAudience.COPROC, HBaseInterfaceAudience.PHOENIX}) @InterfaceStability.Evolving public class CallRunner { - private static final Log LOG = LogFactory.getLog(CallRunner.class); private static final CallDroppedException CALL_DROPPED_EXCEPTION = new CallDroppedException(); @@ -143,6 +140,8 @@ public class CallRunner { sucessful = true; } } + // return back the RPC request read BB we can do here. It is done by now. + call.releaseRequestBuffer(); // Set the response Message param = resultPair != null ? resultPair.getFirst() : null; CellScanner cells = resultPair != null ? resultPair.getSecond() : null; diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcServer.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcServer.java index 7bcf3a7..81e9fcc 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcServer.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcServer.java @@ -92,6 +92,7 @@ import org.apache.hadoop.hbase.codec.Codec; import org.apache.hadoop.hbase.conf.ConfigurationObserver; import org.apache.hadoop.hbase.exceptions.RegionMovedException; import org.apache.hadoop.hbase.exceptions.RequestTooBigException; +import org.apache.hadoop.hbase.io.ByteBuffInputStream; import org.apache.hadoop.hbase.io.ByteBufferInputStream; import org.apache.hadoop.hbase.io.ByteBufferListOutputStream; import org.apache.hadoop.hbase.io.ByteBufferOutputStream; @@ -99,6 +100,11 @@ import org.apache.hadoop.hbase.io.ByteBufferPool; import org.apache.hadoop.hbase.io.crypto.aes.CryptoAES; import org.apache.hadoop.hbase.monitoring.MonitoredRPCHandler; import org.apache.hadoop.hbase.monitoring.TaskMonitor; +import org.apache.hadoop.hbase.nio.ByteBuff; +import org.apache.hadoop.hbase.nio.PooledByteBuff; +import org.apache.hadoop.hbase.nio.PooledMultiByteBuff; +import org.apache.hadoop.hbase.nio.PooledSingleByteBuff; +import org.apache.hadoop.hbase.nio.SingleByteBuff; import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil; import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.VersionInfo; import org.apache.hadoop.hbase.shaded.protobuf.generated.RPCProtos; @@ -144,6 +150,7 @@ import org.codehaus.jackson.map.ObjectMapper; import org.apache.hadoop.hbase.shaded.com.google.protobuf.ByteString; import org.apache.hadoop.hbase.shaded.com.google.protobuf.BlockingService; +import org.apache.hadoop.hbase.shaded.com.google.protobuf.ByteInput; import org.apache.hadoop.hbase.shaded.com.google.protobuf.CodedInputStream; import org.apache.hadoop.hbase.shaded.com.google.protobuf.CodedOutputStream; import org.apache.hadoop.hbase.shaded.com.google.protobuf.Descriptors.MethodDescriptor; @@ -302,6 +309,10 @@ public class RpcServer implements RpcServerInterface, ConfigurationObserver { private UserProvider userProvider; private final ByteBufferPool reservoir; + // The requests and response will use buffers from ByteBufferPool, when the size of the + // request/response is at least this size. + // We make this to be 1/6th of the pool buffer size. 
+ private final int minSizeForReservoirUse; private volatile boolean allowFallbackToSimpleAuth; @@ -336,6 +347,7 @@ public class RpcServer implements RpcServerInterface, ConfigurationObserver { protected boolean isError; protected TraceInfo tinfo; private ByteBufferListOutputStream cellBlockStream = null; + private PooledByteBuff reqBuff = null; private User user; private InetAddress remoteAddress; @@ -349,7 +361,8 @@ public class RpcServer implements RpcServerInterface, ConfigurationObserver { justification="Can't figure why this complaint is happening... see below") Call(int id, final BlockingService service, final MethodDescriptor md, RequestHeader header, Message param, CellScanner cellScanner, Connection connection, Responder responder, - long size, TraceInfo tinfo, final InetAddress remoteAddress, int timeout) { + long size, TraceInfo tinfo, final InetAddress remoteAddress, int timeout, + PooledByteBuff reqBuff) { this.id = id; this.service = service; this.md = md; @@ -369,6 +382,7 @@ public class RpcServer implements RpcServerInterface, ConfigurationObserver { connection == null? null: connection.retryImmediatelySupported; this.timeout = timeout; this.deadline = this.timeout > 0 ? this.timestamp + this.timeout : Long.MAX_VALUE; + this.reqBuff = reqBuff; } /** @@ -383,9 +397,18 @@ public class RpcServer implements RpcServerInterface, ConfigurationObserver { // got from pool. this.cellBlockStream = null; } + releaseRequestBuffer();// If the call was run successfuly, we might have already returned the + // BB back to pool. No worries..Then inputCellBlock will be null this.connection.decRpcCount(); // Say that we're done with this call. } + protected void releaseRequestBuffer() { + if (this.reqBuff != null) { + this.reqBuff.release(); + this.reqBuff = null; + } + } + @Override public String toString() { return toShortString() + " param: " + @@ -1281,7 +1304,7 @@ public class RpcServer implements RpcServerInterface, ConfigurationObserver { // If the connection header has been read or not. 
private boolean connectionHeaderRead = false; protected SocketChannel channel; - private ByteBuffer data; + private ByteBuff data; private ByteBuffer dataLengthBuffer; protected final ConcurrentLinkedDeque responseQueue = new ConcurrentLinkedDeque(); private final Lock responseWriteLock = new ReentrantLock(); @@ -1319,17 +1342,17 @@ public class RpcServer implements RpcServerInterface, ConfigurationObserver { // Fake 'call' for failed authorization response private static final int AUTHORIZATION_FAILED_CALLID = -1; private final Call authFailedCall = new Call(AUTHORIZATION_FAILED_CALLID, null, null, null, - null, null, this, null, 0, null, null, 0); + null, null, this, null, 0, null, null, 0, null); private ByteArrayOutputStream authFailedResponse = new ByteArrayOutputStream(); // Fake 'call' for SASL context setup private static final int SASL_CALLID = -33; private final Call saslCall = new Call(SASL_CALLID, null, null, null, null, null, this, null, - 0, null, null, 0); + 0, null, null, 0, null); // Fake 'call' for connection header response private static final int CONNECTION_HEADER_RESPONSE_CALLID = -34; private final Call setConnectionHeaderResponseCall = new Call(CONNECTION_HEADER_RESPONSE_CALLID, - null, null, null, null, null, this, null, 0, null, null, 0); + null, null, null, null, null, this, null, 0, null, null, 0, null); // was authentication allowed with a fallback to simple auth private boolean authenticatedWithFallback; @@ -1429,7 +1452,7 @@ public class RpcServer implements RpcServerInterface, ConfigurationObserver { return authorizedUgi; } - private void saslReadAndProcess(ByteBuffer saslToken) throws IOException, + private void saslReadAndProcess(ByteBuff saslToken) throws IOException, InterruptedException { if (saslContextEstablished) { if (LOG.isTraceEnabled()) @@ -1439,13 +1462,13 @@ public class RpcServer implements RpcServerInterface, ConfigurationObserver { if (!useWrap) { processOneRpc(saslToken); } else { - byte[] b = saslToken.array(); + byte[] b = saslToken.hasArray() ? saslToken.array() : saslToken.toBytes(); byte [] plaintextData; if (useCryptoAesWrap) { // unwrap with CryptoAES - plaintextData = cryptoAES.unwrap(b, saslToken.position(), saslToken.limit()); + plaintextData = cryptoAES.unwrap(b, 0, b.length); } else { - plaintextData = saslServer.unwrap(b, saslToken.position(), saslToken.limit()); + plaintextData = saslServer.unwrap(b, 0, b.length); } processUnwrappedData(plaintextData); } @@ -1498,7 +1521,8 @@ public class RpcServer implements RpcServerInterface, ConfigurationObserver { LOG.debug("Have read input token of size " + saslToken.limit() + " for processing by saslServer.evaluateResponse()"); } - replyToken = saslServer.evaluateResponse(saslToken.array()); + replyToken = saslServer + .evaluateResponse(saslToken.hasArray() ? saslToken.array() : saslToken.toBytes()); } catch (IOException e) { IOException sendToClient = e; Throwable cause = e; @@ -1751,7 +1775,7 @@ public class RpcServer implements RpcServerInterface, ConfigurationObserver { // Notify the client about the offending request Call reqTooBig = new Call(header.getCallId(), this.service, null, null, null, - null, this, responder, 0, null, this.addr,0); + null, this, responder, 0, null, this.addr,0, null); metrics.exception(REQUEST_TOO_BIG_EXCEPTION); // Make sure the client recognizes the underlying exception // Otherwise, throw a DoNotRetryIOException. 
@@ -1771,7 +1795,7 @@ public class RpcServer implements RpcServerInterface, ConfigurationObserver { return -1; } - data = ByteBuffer.allocate(dataLength); + data = allocateByteBuffToReadInto(dataLength); // Increment the rpc count. This counter will be decreased when we write // the response. If we want the connection to be detected as idle properly, we @@ -1779,7 +1803,7 @@ public class RpcServer implements RpcServerInterface, ConfigurationObserver { incRpcCount(); } - count = channelRead(channel, data); + count = channelDataRead(channel, data); if (count >= 0 && data.remaining() == 0) { // count==0 if dataLength == 0 process(); @@ -1788,11 +1812,71 @@ public class RpcServer implements RpcServerInterface, ConfigurationObserver { return count; } + private ByteBuff allocateByteBuffToReadInto(int length) { + // We create random on heap buffers are read into those when + // 1. ByteBufferPool is not there. + // 2. When the size of the req is very small. Using a large sized (64 KB) buffer from pool is + // waste then. Also if all the reqs are of this size, we will be creating larger sized + // buffers and pool them permanently. This include Scan/Get request and DDL kind of reqs like + // RegionOpen. + // 3. If it is an initial handshake signal or initial connection request. Any way then + // condition 2 itself will match + // 4. When SASL use is ON. + ByteBuff resultBuf; + if (reservoir == null || skipInitialSaslHandshake || !connectionHeaderRead || useSasl + || length < minSizeForReservoirUse) { + resultBuf = new SingleByteBuff(ByteBuffer.allocate(length)); + } else { + List bufsFromPool = new ArrayList( + (length / reservoir.getBufferSize()) + 1); + int remain = length; + ByteBuffer buf = null; + while (remain >= minSizeForReservoirUse && (buf = reservoir.getBuffer()) != null) { + bufsFromPool.add(buf); + remain -= reservoir.getBufferSize(); + } + buf = null; + ByteBuffer[] itemsFromPool = new ByteBuffer[bufsFromPool.size()]; + bufsFromPool.toArray(itemsFromPool); + if (remain > 0) { + buf = ByteBuffer.allocate(remain); + } + ByteBuffer[] items = new ByteBuffer[buf == null ? itemsFromPool.length + : itemsFromPool.length + 1]; + System.arraycopy(itemsFromPool, 0, items, 0, itemsFromPool.length); + if (buf != null) { + items[items.length - 1] = buf; + } + assert items.length > 0; + if (items.length > 1) { + resultBuf = new PooledMultiByteBuff(items, itemsFromPool, reservoir); + } else { + // We are backed by single BB + assert itemsFromPool.length <= 1; + if (itemsFromPool.length == 0) { + resultBuf = new SingleByteBuff(items[0]); + } else { + resultBuf = new PooledSingleByteBuff(itemsFromPool[0], reservoir); + } + } + } + resultBuf.limit(length); + return resultBuf; + } + + protected int channelDataRead(ReadableByteChannel channel, ByteBuff buf) throws IOException { + int count = buf.read(channel); + if (count > 0) { + metrics.receivedBytes(count); + } + return count; + } + /** * Process the data buffer and clean the connection state for the next call. 
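A worked example of the sizing policy in allocateByteBuffToReadInto() above (all numbers are assumed for illustration): with 64KB reservoir buffers, minSizeForReservoirUse = 65536 / 6 = 10922 bytes.

```java
int poolBufferSize = 64 * 1024;
int minSizeForReservoirUse = poolBufferSize / 6;   // 10922
int requestLen = 150 * 1024;                       // e.g. a large multi-Put request body
int pooledBuffers = 0;
int remain = requestLen;
while (remain >= minSizeForReservoirUse) {         // mirrors the borrow loop above,
  pooledBuffers++;                                 // ignoring pool exhaustion
  remain -= poolBufferSize;
}
// pooledBuffers == 3 and remain <= 0: the request is read into a PooledMultiByteBuff of three
// reservoir buffers with limit(153600). A 4KB Get request (< 10922 bytes) is instead read into
// a plain on-heap SingleByteBuff, so small calls never consume pooled 64KB buffers.
```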
*/ private void process() throws IOException, InterruptedException { - data.flip(); + data.rewind(); try { if (skipInitialSaslHandshake) { skipInitialSaslHandshake = false; @@ -1823,7 +1907,8 @@ public class RpcServer implements RpcServerInterface, ConfigurationObserver { private int doBadPreambleHandling(final String msg, final Exception e) throws IOException { LOG.warn(msg); - Call fakeCall = new Call(-1, null, null, null, null, null, this, responder, -1, null, null,0); + Call fakeCall = new Call(-1, null, null, null, null, null, this, responder, -1, null, null, 0, + null); setupResponse(null, fakeCall, e, msg); responder.doRespond(fakeCall); // Returning -1 closes out the connection. @@ -1831,9 +1916,15 @@ public class RpcServer implements RpcServerInterface, ConfigurationObserver { } // Reads the connection header following version - private void processConnectionHeader(ByteBuffer buf) throws IOException { - this.connectionHeader = ConnectionHeader.parseFrom( - new ByteBufferInputStream(buf)); + private void processConnectionHeader(ByteBuff buf) throws IOException { + if (buf.hasArray()) { + this.connectionHeader = ConnectionHeader.parseFrom(buf.array()); + } else { + CodedInputStream cis = CodedInputStream + .newInstance(new ByteBuffedByteInput(buf, 0, buf.limit()), true); + cis.enableAliasing(true); + this.connectionHeader = ConnectionHeader.parseFrom(cis); + } String serviceName = connectionHeader.getServiceName(); if (serviceName == null) throw new EmptyServiceNameException(); this.service = getService(services, serviceName); @@ -2035,13 +2126,13 @@ public class RpcServer implements RpcServerInterface, ConfigurationObserver { if (unwrappedData.remaining() == 0) { unwrappedDataLengthBuffer.clear(); unwrappedData.flip(); - processOneRpc(unwrappedData); + processOneRpc(new SingleByteBuff(unwrappedData)); unwrappedData = null; } } } - private void processOneRpc(ByteBuffer buf) throws IOException, InterruptedException { + private void processOneRpc(ByteBuff buf) throws IOException, InterruptedException { if (connectionHeaderRead) { processRequest(buf); } else { @@ -2063,12 +2154,18 @@ public class RpcServer implements RpcServerInterface, ConfigurationObserver { * @throws IOException * @throws InterruptedException */ - protected void processRequest(ByteBuffer buf) throws IOException, InterruptedException { + protected void processRequest(ByteBuff buf) throws IOException, InterruptedException { long totalRequestSize = buf.limit(); int offset = 0; // Here we read in the header. We avoid having pb // do its default 4k allocation for CodedInputStream. We force it to use backing array. 
- CodedInputStream cis = CodedInputStream.newInstance(buf.array(), offset, buf.limit()); + CodedInputStream cis; + if (buf.hasArray()) { + cis = CodedInputStream.newInstance(buf.array(), offset, buf.limit()); + } else { + cis = CodedInputStream.newInstance(new ByteBuffedByteInput(buf, 0, buf.limit()), true); + cis.enableAliasing(true); + } int headerSize = cis.readRawVarint32(); offset = cis.getTotalBytesRead(); Message.Builder builder = RequestHeader.newBuilder(); @@ -2085,7 +2182,7 @@ public class RpcServer implements RpcServerInterface, ConfigurationObserver { if ((totalRequestSize + callQueueSizeInBytes.sum()) > maxQueueSizeInBytes) { final Call callTooBig = new Call(id, this.service, null, null, null, null, this, - responder, totalRequestSize, null, null, 0); + responder, totalRequestSize, null, null, 0, null); ByteArrayOutputStream responseBuffer = new ByteArrayOutputStream(); metrics.exception(CALL_QUEUE_TOO_BIG_EXCEPTION); setupResponse(responseBuffer, callTooBig, CALL_QUEUE_TOO_BIG_EXCEPTION, @@ -2119,7 +2216,8 @@ public class RpcServer implements RpcServerInterface, ConfigurationObserver { } if (header.hasCellBlockMeta()) { buf.position(offset); - cellScanner = cellBlockBuilder.createCellScannerReusingBuffers(this.codec, this.compressionCodec, buf); + cellScanner = cellBlockBuilder.createCellScannerReusingBuffers(this.codec, + this.compressionCodec, buf); } } catch (Throwable t) { InetSocketAddress address = getListenerAddress(); @@ -2140,7 +2238,7 @@ public class RpcServer implements RpcServerInterface, ConfigurationObserver { final Call readParamsFailedCall = new Call(id, this.service, null, null, null, null, this, - responder, totalRequestSize, null, null, 0); + responder, totalRequestSize, null, null, 0, null); ByteArrayOutputStream responseBuffer = new ByteArrayOutputStream(); setupResponse(responseBuffer, readParamsFailedCall, t, msg + "; " + t.getMessage()); @@ -2156,7 +2254,8 @@ public class RpcServer implements RpcServerInterface, ConfigurationObserver { timeout = Math.max(minClientRequestTimeout, header.getTimeout()); } Call call = new Call(id, this.service, md, header, param, cellScanner, this, responder, - totalRequestSize, traceInfo, this.addr, timeout); + totalRequestSize, traceInfo, this.addr, timeout, + buf instanceof PooledByteBuff ? (PooledByteBuff) buf : null); if (!scheduler.dispatch(new CallRunner(RpcServer.this, call))) { callQueueSizeInBytes.add(-1 * call.getSize()); @@ -2293,8 +2392,10 @@ public class RpcServer implements RpcServerInterface, ConfigurationObserver { conf.getInt(ByteBufferPool.MAX_POOL_SIZE_KEY, conf.getInt(HConstants.REGION_SERVER_HANDLER_COUNT, HConstants.DEFAULT_REGION_SERVER_HANDLER_COUNT) * 2)); + this.minSizeForReservoirUse = this.reservoir.getBufferSize() / 6; } else { reservoir = null; + this.minSizeForReservoirUse = Integer.MAX_VALUE;// reservoir itself not in place. 
} this.server = server; this.services = services; @@ -3031,4 +3132,44 @@ public class RpcServer implements RpcServerInterface, ConfigurationObserver { idleScanTimer.schedule(idleScanTask, idleScanInterval); } } -} + + private static class ByteBuffedByteInput extends ByteInput { + + private ByteBuff buf; + private int offset; + private int length; + + ByteBuffedByteInput(ByteBuff buf, int offset, int length) { + this.buf = buf; + this.offset = offset; + this.length = length; + } + + @Override + public byte read(int offset) { + return this.buf.get(getAbsoluteOffset(offset)); + } + + private int getAbsoluteOffset(int offset) { + return this.offset + offset; + } + + @Override + public int read(int offset, byte[] out, int outOffset, int len) { + this.buf.get(getAbsoluteOffset(offset), out, outOffset, len); + return len; + } + + @Override + public int read(int offset, ByteBuffer out) { + int len = out.remaining(); + this.buf.get(out, getAbsoluteOffset(offset), len); + return len; + } + + @Override + public int size() { + return this.length; + } + } +} \ No newline at end of file diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/WALCellCodec.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/WALCellCodec.java index 52dfae0..9e80bbe 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/WALCellCodec.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/WALCellCodec.java @@ -21,7 +21,6 @@ import java.io.ByteArrayOutputStream; import java.io.IOException; import java.io.InputStream; import java.io.OutputStream; -import java.nio.ByteBuffer; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.Cell; @@ -33,9 +32,12 @@ import org.apache.hadoop.hbase.codec.BaseDecoder; import org.apache.hadoop.hbase.codec.BaseEncoder; import org.apache.hadoop.hbase.codec.Codec; import org.apache.hadoop.hbase.codec.KeyValueCodecWithTags; -import org.apache.hadoop.hbase.io.ByteBufferInputStream; +import org.apache.hadoop.hbase.io.ByteBuffInputStream; +import org.apache.hadoop.hbase.io.ByteBufferSupportOutputStream; +import org.apache.hadoop.hbase.io.ByteBufferSupportOutputStreamWrapper; import org.apache.hadoop.hbase.io.util.Dictionary; import org.apache.hadoop.hbase.io.util.StreamUtils; +import org.apache.hadoop.hbase.nio.ByteBuff; import org.apache.hadoop.hbase.util.ByteBufferUtils; import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.ReflectionUtils; @@ -356,14 +358,18 @@ public class WALCellCodec implements Codec { } @Override - public Decoder getDecoder(ByteBuffer buf) { - return getDecoder(new ByteBufferInputStream(buf)); + public Decoder getDecoder(ByteBuff buf) { + return getDecoder(new ByteBuffInputStream(buf)); } @Override public Encoder getEncoder(OutputStream os) { - return (compression == null) - ? new EnsureKvEncoder(os) : new CompressedKvEncoder(os, compression); + if (compression == null) { + os = (os instanceof ByteBufferSupportOutputStream) ? 
os + : new ByteBufferSupportOutputStreamWrapper(os); + return new EnsureKvEncoder(os); + } + return new CompressedKvEncoder(os, compression); } public ByteStringCompressor getByteStringCompressor() { diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/AbstractTestIPC.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/AbstractTestIPC.java index 2211e8f..5a9178a 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/AbstractTestIPC.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/AbstractTestIPC.java @@ -33,7 +33,6 @@ import static org.mockito.internal.verification.VerificationModeFactory.times; import java.io.IOException; import java.net.InetSocketAddress; -import java.nio.ByteBuffer; import java.nio.channels.SocketChannel; import java.util.ArrayList; import java.util.List; @@ -47,6 +46,7 @@ import org.apache.hadoop.hbase.CellUtil; import org.apache.hadoop.hbase.DoNotRetryIOException; import org.apache.hadoop.hbase.HBaseConfiguration; import org.apache.hadoop.hbase.KeyValue; +import org.apache.hadoop.hbase.nio.ByteBuff; import org.apache.hadoop.hbase.shaded.ipc.protobuf.generated.TestProtos.EchoRequestProto; import org.apache.hadoop.hbase.shaded.ipc.protobuf.generated.TestProtos.EchoResponseProto; import org.apache.hadoop.hbase.shaded.ipc.protobuf.generated.TestProtos.EmptyRequestProto; @@ -315,7 +315,7 @@ public abstract class AbstractTestIPC { } @Override - protected void processRequest(ByteBuffer buf) throws IOException, InterruptedException { + protected void processRequest(ByteBuff buf) throws IOException, InterruptedException { // this will throw exception after the connection header is read, and an RPC is sent // from client throw new DoNotRetryIOException("Failing for test");
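A hedged usage sketch of the WALCellCodec change above (the codec, stream and cell instances are assumed): callers ask for an Encoder as before, and when WAL compression is off the codec now transparently wraps a non-ByteBuffer-aware stream so off-heap backed cells can still be written to it.

```java
// Hedged sketch; fs/walPath/walCellCodec/cell are illustrative placeholders.
OutputStream walStream = fs.create(walPath);                 // FSHLog-style stream
Codec.Encoder encoder = walCellCodec.getEncoder(walStream);  // wrapper applied internally
encoder.write(cell);                                         // may be an off-heap backed Cell
encoder.flush();
```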