diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/CellBlockBuilder.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/CellBlockBuilder.java index fb2cafa..cb9dc89 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/CellBlockBuilder.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/CellBlockBuilder.java @@ -21,7 +21,9 @@ import io.netty.buffer.ByteBuf; import io.netty.buffer.ByteBufAllocator; import io.netty.buffer.ByteBufOutputStream; +import java.io.ByteArrayInputStream; import java.io.IOException; +import java.io.InputStream; import java.io.OutputStream; import java.nio.BufferOverflowException; import java.nio.ByteBuffer; @@ -35,10 +37,12 @@ import org.apache.hadoop.hbase.CellScanner; import org.apache.hadoop.hbase.DoNotRetryIOException; import org.apache.hadoop.hbase.classification.InterfaceAudience; import org.apache.hadoop.hbase.codec.Codec; -import org.apache.hadoop.hbase.io.ByteBufferInputStream; +import org.apache.hadoop.hbase.io.ByteBuffInputStream; import org.apache.hadoop.hbase.io.ByteBufferListOutputStream; import org.apache.hadoop.hbase.io.ByteBufferOutputStream; import org.apache.hadoop.hbase.io.ByteBufferPool; +import org.apache.hadoop.hbase.nio.ByteBuff; +import org.apache.hadoop.hbase.nio.SingleByteBuff; import org.apache.hadoop.hbase.util.ClassSize; import org.apache.hadoop.io.compress.CodecPool; import org.apache.hadoop.io.compress.CompressionCodec; @@ -236,17 +240,16 @@ class CellBlockBuilder { * @throws IOException if encoding fails */ public CellScanner createCellScanner(final Codec codec, final CompressionCodec compressor, - final byte[] cellBlock) throws IOException { + byte[] cellBlock) throws IOException { // Use this method from Client side to create the CellScanner - ByteBuffer cellBlockBuf = ByteBuffer.wrap(cellBlock); if (compressor != null) { - cellBlockBuf = decompress(compressor, cellBlockBuf); + cellBlock = decompress(compressor, cellBlock); } // Not making the Decoder over the ByteBuffer purposefully. The Decoder over the BB will // make Cells directly over the passed BB. This method is called at client side and we don't // want the Cells to share the same byte[] where the RPC response is being read. Caching of any // of the Cells at user's app level will make it not possible to GC the response byte[] - return codec.getDecoder(new ByteBufferInputStream(cellBlockBuf)); + return codec.getDecoder(new ByteArrayInputStream(cellBlock)); } /** @@ -258,7 +261,7 @@ class CellBlockBuilder { * @throws IOException if cell encoding fails */ public CellScanner createCellScannerReusingBuffers(final Codec codec, - final CompressionCodec compressor, ByteBuffer cellBlock) throws IOException { + final CompressionCodec compressor, ByteBuff cellBlock) throws IOException { // Use this method from HRS to create the CellScanner // If compressed, decompress it first before passing it on else we will leak compression // resources if the stream is not closed properly after we let it out. 
@@ -268,27 +271,39 @@ class CellBlockBuilder { return codec.getDecoder(cellBlock); } - private ByteBuffer decompress(CompressionCodec compressor, ByteBuffer cellBlock) + private byte[] decompress(CompressionCodec compressor, byte[] compressedCellBlock) + throws IOException { + ByteBuffer cellBlock = decompress(compressor, new ByteArrayInputStream(compressedCellBlock), + compressedCellBlock.length * this.cellBlockDecompressionMultiplier); + assert cellBlock.hasArray(); + return cellBlock.array(); + } + + private ByteBuff decompress(CompressionCodec compressor, ByteBuff compressedCellBlock) throws IOException { + ByteBuffer cellBlock = decompress(compressor, new ByteBuffInputStream(compressedCellBlock), + compressedCellBlock.remaining() * this.cellBlockDecompressionMultiplier); + return new SingleByteBuff(cellBlock); + } + + private ByteBuffer decompress(CompressionCodec compressor, InputStream cellBlockStream, + int osInitialSize) throws IOException { // GZIPCodec fails w/ NPE if no configuration. if (compressor instanceof Configurable) { ((Configurable) compressor).setConf(this.conf); } Decompressor poolDecompressor = CodecPool.getDecompressor(compressor); - CompressionInputStream cis = compressor.createInputStream(new ByteBufferInputStream(cellBlock), - poolDecompressor); + CompressionInputStream cis = compressor.createInputStream(cellBlockStream, poolDecompressor); ByteBufferOutputStream bbos; try { // TODO: This is ugly. The buffer will be resized on us if we guess wrong. // TODO: Reuse buffers. - bbos = new ByteBufferOutputStream( - cellBlock.remaining() * this.cellBlockDecompressionMultiplier); + bbos = new ByteBufferOutputStream(osInitialSize); IOUtils.copy(cis, bbos); bbos.close(); - cellBlock = bbos.getByteBuffer(); + return bbos.getByteBuffer(); } finally { CodecPool.returnDecompressor(poolDecompressor); } - return cellBlock; } } diff --git a/hbase-client/src/test/java/org/apache/hadoop/hbase/ipc/TestCellBlockBuilder.java b/hbase-client/src/test/java/org/apache/hadoop/hbase/ipc/TestCellBlockBuilder.java index ccabe66..9addaa5 100644 --- a/hbase-client/src/test/java/org/apache/hadoop/hbase/ipc/TestCellBlockBuilder.java +++ b/hbase-client/src/test/java/org/apache/hadoop/hbase/ipc/TestCellBlockBuilder.java @@ -35,6 +35,7 @@ import org.apache.hadoop.hbase.KeyValue; import org.apache.hadoop.hbase.codec.Codec; import org.apache.hadoop.hbase.codec.KeyValueCodec; import org.apache.hadoop.hbase.io.SizedCellScanner; +import org.apache.hadoop.hbase.nio.SingleByteBuff; import org.apache.hadoop.hbase.testclassification.ClientTests; import org.apache.hadoop.hbase.testclassification.SmallTests; import org.apache.hadoop.hbase.util.Bytes; @@ -78,7 +79,8 @@ public class TestCellBlockBuilder { CellScanner cellScanner = sized ? 
getSizedCellScanner(cells) : CellUtil.createCellScanner(Arrays.asList(cells).iterator()); ByteBuffer bb = builder.buildCellBlock(codec, compressor, cellScanner); - cellScanner = builder.createCellScannerReusingBuffers(codec, compressor, bb); + cellScanner = builder.createCellScannerReusingBuffers(codec, compressor, + new SingleByteBuff(bb)); int i = 0; while (cellScanner.advance()) { i++; diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/OffheapKeyValue.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/OffheapKeyValue.java index 2165362..06a0ed6 100644 --- a/hbase-common/src/main/java/org/apache/hadoop/hbase/OffheapKeyValue.java +++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/OffheapKeyValue.java @@ -36,10 +36,10 @@ public class OffheapKeyValue extends ByteBufferedCell implements ExtendedCell { protected final ByteBuffer buf; protected final int offset; protected final int length; + protected final boolean hasTags; private final short rowLen; private final int keyLen; private long seqId = 0; - private final boolean hasTags; // TODO : See if famLen can be cached or not? private static final int FIXED_OVERHEAD = ClassSize.OBJECT + ClassSize.REFERENCE @@ -57,6 +57,18 @@ public class OffheapKeyValue extends ByteBufferedCell implements ExtendedCell { this.seqId = seqId; } + public OffheapKeyValue(ByteBuffer buf, int offset, int length) { + assert buf.isDirect(); + this.buf = buf; + this.offset = offset; + this.length = length; + rowLen = ByteBufferUtils.toShort(this.buf, this.offset + KeyValue.ROW_OFFSET); + keyLen = ByteBufferUtils.toInt(this.buf, this.offset); + int tagsLen = this.length + - (this.keyLen + getValueLength() + KeyValue.KEYVALUE_INFRASTRUCTURE_SIZE); + this.hasTags = tagsLen > 0; + } + @Override public byte[] getRowArray() { return CellUtil.cloneRow(this); @@ -265,16 +277,19 @@ public class OffheapKeyValue extends ByteBufferedCell implements ExtendedCell { @Override public void setTimestamp(long ts) throws IOException { - // This Cell implementation is not yet used in write path. - // TODO when doing HBASE-15179 - throw new UnsupportedOperationException(); + ByteBufferUtils.copyFromArrayToBuffer(this.buf, this.getTimestampOffset(), Bytes.toBytes(ts), 0, + Bytes.SIZEOF_LONG); + } + + private int getTimestampOffset() { + return this.offset + KeyValue.KEYVALUE_INFRASTRUCTURE_SIZE + this.keyLen + - KeyValue.TIMESTAMP_TYPE_SIZE; } @Override public void setTimestamp(byte[] ts, int tsOffset) throws IOException { - // This Cell implementation is not yet used in write path. 
- // TODO when doing HBASE-15179 - throw new UnsupportedOperationException(); + ByteBufferUtils.copyFromArrayToBuffer(this.buf, this.getTimestampOffset(), ts, tsOffset, + Bytes.SIZEOF_LONG); } @Override diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/codec/CellCodec.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/codec/CellCodec.java index d6b64f6..ca2e3e8 100644 --- a/hbase-common/src/main/java/org/apache/hadoop/hbase/codec/CellCodec.java +++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/codec/CellCodec.java @@ -20,14 +20,14 @@ package org.apache.hadoop.hbase.codec; import java.io.IOException; import java.io.InputStream; import java.io.OutputStream; -import java.nio.ByteBuffer; import org.apache.commons.io.IOUtils; import org.apache.hadoop.hbase.Cell; import org.apache.hadoop.hbase.CellUtil; import org.apache.hadoop.hbase.HBaseInterfaceAudience; import org.apache.hadoop.hbase.classification.InterfaceAudience; -import org.apache.hadoop.hbase.io.ByteBufferInputStream; +import org.apache.hadoop.hbase.io.ByteBuffInputStream; +import org.apache.hadoop.hbase.nio.ByteBuff; import org.apache.hadoop.hbase.util.Bytes; /** @@ -118,8 +118,8 @@ public class CellCodec implements Codec { } @Override - public Decoder getDecoder(ByteBuffer buf) { - return getDecoder(new ByteBufferInputStream(buf)); + public Decoder getDecoder(ByteBuff buf) { + return getDecoder(new ByteBuffInputStream(buf)); } @Override diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/codec/CellCodecWithTags.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/codec/CellCodecWithTags.java index 7326884..2dca10a 100644 --- a/hbase-common/src/main/java/org/apache/hadoop/hbase/codec/CellCodecWithTags.java +++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/codec/CellCodecWithTags.java @@ -20,14 +20,14 @@ package org.apache.hadoop.hbase.codec; import java.io.IOException; import java.io.InputStream; import java.io.OutputStream; -import java.nio.ByteBuffer; import org.apache.commons.io.IOUtils; import org.apache.hadoop.hbase.Cell; import org.apache.hadoop.hbase.CellUtil; import org.apache.hadoop.hbase.HBaseInterfaceAudience; import org.apache.hadoop.hbase.classification.InterfaceAudience; -import org.apache.hadoop.hbase.io.ByteBufferInputStream; +import org.apache.hadoop.hbase.io.ByteBuffInputStream; +import org.apache.hadoop.hbase.nio.ByteBuff; import org.apache.hadoop.hbase.util.Bytes; /** @@ -119,8 +119,8 @@ public class CellCodecWithTags implements Codec { } @Override - public Decoder getDecoder(ByteBuffer buf) { - return getDecoder(new ByteBufferInputStream(buf)); + public Decoder getDecoder(ByteBuff buf) { + return getDecoder(new ByteBuffInputStream(buf)); } @Override diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/codec/Codec.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/codec/Codec.java index c8a4cdc..d1463ee 100644 --- a/hbase-common/src/main/java/org/apache/hadoop/hbase/codec/Codec.java +++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/codec/Codec.java @@ -19,12 +19,12 @@ package org.apache.hadoop.hbase.codec; import java.io.InputStream; import java.io.OutputStream; -import java.nio.ByteBuffer; import org.apache.hadoop.hbase.CellScanner; import org.apache.hadoop.hbase.HBaseInterfaceAudience; import org.apache.hadoop.hbase.classification.InterfaceAudience; import org.apache.hadoop.hbase.io.CellOutputStream; +import org.apache.hadoop.hbase.nio.ByteBuff; /** * Encoder/Decoder for Cell. 
@@ -51,6 +51,6 @@ public interface Codec { interface Decoder extends CellScanner {}; Decoder getDecoder(InputStream is); - Decoder getDecoder(ByteBuffer buf); + Decoder getDecoder(ByteBuff buf); Encoder getEncoder(OutputStream os); } diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/codec/KeyValueCodec.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/codec/KeyValueCodec.java index 2609398..5d0f559 100644 --- a/hbase-common/src/main/java/org/apache/hadoop/hbase/codec/KeyValueCodec.java +++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/codec/KeyValueCodec.java @@ -27,8 +27,10 @@ import org.apache.hadoop.hbase.HBaseInterfaceAudience; import org.apache.hadoop.hbase.KeyValue; import org.apache.hadoop.hbase.KeyValueUtil; import org.apache.hadoop.hbase.NoTagsKeyValue; +import org.apache.hadoop.hbase.OffheapKeyValue; import org.apache.hadoop.hbase.ShareableMemory; import org.apache.hadoop.hbase.classification.InterfaceAudience; +import org.apache.hadoop.hbase.nio.ByteBuff; import org.apache.hadoop.hbase.util.ByteBufferUtils; import org.apache.hadoop.hbase.util.Bytes; @@ -76,12 +78,12 @@ public class KeyValueCodec implements Codec { } } - public static class ByteBufferedKeyValueDecoder implements Codec.Decoder { + public static class ByteBuffKeyValueDecoder implements Codec.Decoder { - protected final ByteBuffer buf; + protected final ByteBuff buf; protected Cell current = null; - public ByteBufferedKeyValueDecoder(ByteBuffer buf) { + public ByteBuffKeyValueDecoder(ByteBuff buf) { this.buf = buf; } @@ -90,10 +92,14 @@ public class KeyValueCodec implements Codec { if (this.buf.remaining() <= 0) { return false; } - int len = ByteBufferUtils.toInt(buf); - assert buf.hasArray(); - this.current = createCell(buf.array(), buf.arrayOffset() + buf.position(), len); - buf.position(buf.position() + len); + int len = buf.getInt(); + ByteBuffer bb = buf.asSubByteBuffer(len); + if (bb.isDirect()) { + this.current = createCell(bb, bb.position(), len); + } else { + this.current = createCell(bb.array(), bb.arrayOffset() + bb.position(), len); + } + buf.skip(len); return true; } @@ -106,6 +112,11 @@ public class KeyValueCodec implements Codec { return new ShareableMemoryNoTagsKeyValue(buf, offset, len); } + protected Cell createCell(ByteBuffer bb, int pos, int len) { + // We know there is not going to be any tags. 
+ return new ShareableMemoryOffheapKeyValue(bb, pos, len, false, 0); + } + static class ShareableMemoryKeyValue extends KeyValue implements ShareableMemory { public ShareableMemoryKeyValue(byte[] bytes, int offset, int length) { super(bytes, offset, length); @@ -133,6 +144,31 @@ public class KeyValueCodec implements Codec { return kv; } } + + static class ShareableMemoryOffheapKeyValue extends OffheapKeyValue implements ShareableMemory { + public ShareableMemoryOffheapKeyValue(ByteBuffer buf, int offset, int length) { + super(buf, offset, length); + } + + public ShareableMemoryOffheapKeyValue(ByteBuffer buf, int offset, int length, boolean hasTags, + long seqId) { + super(buf, offset, length, hasTags, seqId); + } + + @Override + public Cell cloneToCell() { + byte[] copy = new byte[this.length]; + ByteBufferUtils.copyFromBufferToArray(copy, this.buf, this.offset, 0, this.length); + KeyValue kv; + if (this.hasTags) { + kv = new KeyValue(copy, 0, copy.length); + } else { + kv = new NoTagsKeyValue(copy, 0, copy.length); + } + kv.setSequenceId(this.getSequenceId()); + return kv; + } + } } /** @@ -144,8 +180,8 @@ public class KeyValueCodec implements Codec { } @Override - public Decoder getDecoder(ByteBuffer buf) { - return new ByteBufferedKeyValueDecoder(buf); + public Decoder getDecoder(ByteBuff buf) { + return new ByteBuffKeyValueDecoder(buf); } @Override diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/codec/KeyValueCodecWithTags.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/codec/KeyValueCodecWithTags.java index 63c02e8..57b9718 100644 --- a/hbase-common/src/main/java/org/apache/hadoop/hbase/codec/KeyValueCodecWithTags.java +++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/codec/KeyValueCodecWithTags.java @@ -26,6 +26,7 @@ import org.apache.hadoop.hbase.Cell; import org.apache.hadoop.hbase.HBaseInterfaceAudience; import org.apache.hadoop.hbase.KeyValueUtil; import org.apache.hadoop.hbase.classification.InterfaceAudience; +import org.apache.hadoop.hbase.nio.ByteBuff; import org.apache.hadoop.hbase.util.ByteBufferUtils; /** @@ -78,16 +79,20 @@ public class KeyValueCodecWithTags implements Codec { } } - public static class ByteBufferedKeyValueDecoder - extends KeyValueCodec.ByteBufferedKeyValueDecoder { + public static class ByteBuffKeyValueDecoder extends KeyValueCodec.ByteBuffKeyValueDecoder { - public ByteBufferedKeyValueDecoder(ByteBuffer buf) { + public ByteBuffKeyValueDecoder(ByteBuff buf) { super(buf); } protected Cell createCell(byte[] buf, int offset, int len) { return new ShareableMemoryKeyValue(buf, offset, len); } + + @Override + protected Cell createCell(ByteBuffer bb, int pos, int len) { + return new ShareableMemoryOffheapKeyValue(bb, pos, len); + } } /** @@ -104,7 +109,7 @@ public class KeyValueCodecWithTags implements Codec { } @Override - public Decoder getDecoder(ByteBuffer buf) { - return new ByteBufferedKeyValueDecoder(buf); + public Decoder getDecoder(ByteBuff buf) { + return new ByteBuffKeyValueDecoder(buf); } } diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/io/ByteBufferPool.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/io/ByteBufferPool.java index e528f02..115671d 100644 --- a/hbase-common/src/main/java/org/apache/hadoop/hbase/io/ByteBufferPool.java +++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/io/ByteBufferPool.java @@ -140,7 +140,7 @@ public class ByteBufferPool { buffers.offer(buf); } - int getBufferSize() { + public int getBufferSize() { return this.bufferSize; } @@ -148,7 +148,7 @@ public 
class ByteBufferPool { * @return Number of free buffers */ @VisibleForTesting - int getQueueSize() { + public int getQueueSize() { return buffers.size(); } } diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/io/ByteBufferSupportOutputStreamWrapper.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/io/ByteBufferSupportOutputStreamWrapper.java new file mode 100644 index 0000000..195e86a --- /dev/null +++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/io/ByteBufferSupportOutputStreamWrapper.java @@ -0,0 +1,90 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.io; + +import java.io.IOException; +import java.io.OutputStream; +import java.nio.ByteBuffer; + +import org.apache.hadoop.hbase.classification.InterfaceAudience; +import org.apache.hadoop.hbase.io.util.StreamUtils; +import org.apache.hadoop.hbase.util.ByteBufferUtils; + +/** + * When deal with OutputStream which is not ByteBufferSupportOutputStream type, wrap it with this + * class. We will have to write offheap ByteBuffer (DBB) data into the OS. This class is having a + * temp byte array to which we can copy the DBB data for writing to the OS. + *
+ * This is used while writing Cell data to the WAL. In case of AsyncWAL, the OS created there is + ByteBufferSupportOutputStream. But in case of FSHLog, the OS passed by the DFS client is not of type + ByteBufferSupportOutputStream. We will need this temporary solution until the DFS client supports writing + ByteBuffer directly to the OS it creates. + *
+ * Note that this class is not thread safe. + */ +@InterfaceAudience.Private +public class ByteBufferSupportOutputStreamWrapper extends OutputStream + implements ByteBufferSupportOutputStream { + + private static final int TEMP_BUF_LENGTH = 4 * 1024; + private final OutputStream os; + private byte[] tempBuf = null; + + public ByteBufferSupportOutputStreamWrapper(OutputStream os) { + this.os = os; + } + + @Override + public void write(ByteBuffer b, int off, int len) throws IOException { + byte[] buf = null; + if (len > TEMP_BUF_LENGTH) { + buf = new byte[len]; + } else { + if (this.tempBuf == null) { + this.tempBuf = new byte[TEMP_BUF_LENGTH]; + } + buf = this.tempBuf; + } + ByteBufferUtils.copyFromBufferToArray(buf, b, off, 0, len); + this.os.write(buf, 0, len); + } + + @Override + public void writeInt(int i) throws IOException { + StreamUtils.writeInt(this.os, i); + } + + @Override + public void write(int b) throws IOException { + this.os.write(b); + } + + public void write(byte b[], int off, int len) throws IOException { + this.os.write(b, off, len); + } + + @Override + public void flush() throws IOException { + this.os.flush(); + } + + @Override + public void close() throws IOException { + this.os.close(); + } +} \ No newline at end of file diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/nio/ByteBuff.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/nio/ByteBuff.java index 183a031..60202a0 100644 --- a/hbase-common/src/main/java/org/apache/hadoop/hbase/nio/ByteBuff.java +++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/nio/ByteBuff.java @@ -17,7 +17,9 @@ */ package org.apache.hadoop.hbase.nio; +import java.io.IOException; import java.nio.ByteBuffer; +import java.nio.channels.ReadableByteChannel; import org.apache.hadoop.hbase.classification.InterfaceAudience; import org.apache.hadoop.hbase.util.ByteBufferUtils; @@ -34,7 +36,10 @@ import org.apache.hadoop.io.WritableUtils; * helps us in the read path. */ @InterfaceAudience.Private +// TODO to have another name. This can easily get confused with netty's ByteBuf public abstract class ByteBuff { + private static final int NIO_BUFFER_LIMIT = 64 * 1024; // should not be more than 64KB. + /** * @return this ByteBuff's current position */ @@ -356,6 +361,14 @@ public abstract class ByteBuff { public abstract long getLongAfterPosition(int offset); /** + * Copy the content from this ByteBuff to a byte[]. + * @return byte[] with the copied contents from this ByteBuff. 
+ */ + public byte[] toBytes() { + return toBytes(0, this.limit()); + } + + /** * Copy the content from this ByteBuff to a byte[] based on the given offset and * length * @@ -389,7 +402,39 @@ public abstract class ByteBuff { */ public abstract ByteBuff put(int offset, ByteBuff src, int srcOffset, int length); + /** + * Reads bytes from the given channel into this ByteBuff + * @param channel + * @return The number of bytes read from the channel + * @throws IOException + */ + public abstract int read(ReadableByteChannel channel) throws IOException; + // static helper methods + public static int channelRead(ReadableByteChannel channel, ByteBuffer buf) throws IOException { + if (buf.remaining() <= NIO_BUFFER_LIMIT) { + return channel.read(buf); + } + int originalLimit = buf.limit(); + int initialRemaining = buf.remaining(); + int ret = 0; + + while (buf.remaining() > 0) { + try { + int ioSize = Math.min(buf.remaining(), NIO_BUFFER_LIMIT); + buf.limit(buf.position() + ioSize); + ret = channel.read(buf); + if (ret < ioSize) { + break; + } + } finally { + buf.limit(originalLimit); + } + } + int nBytes = initialRemaining - buf.remaining(); + return (nBytes > 0) ? nBytes : ret; + } + /** * Read integer from ByteBuff coded in 7 bits and increment position. * @return Read integer. diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/nio/MultiByteBuff.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/nio/MultiByteBuff.java index 107bb3f..948321d 100644 --- a/hbase-common/src/main/java/org/apache/hadoop/hbase/nio/MultiByteBuff.java +++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/nio/MultiByteBuff.java @@ -17,16 +17,20 @@ */ package org.apache.hadoop.hbase.nio; +import java.io.IOException; import java.nio.BufferOverflowException; import java.nio.BufferUnderflowException; import java.nio.ByteBuffer; import java.nio.InvalidMarkException; +import java.nio.channels.ReadableByteChannel; import org.apache.hadoop.hbase.classification.InterfaceAudience; import org.apache.hadoop.hbase.util.ByteBufferUtils; import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.ObjectIntPair; +import com.google.common.annotations.VisibleForTesting; + /** * Provides a unified view of all the underlying ByteBuffers and will look as if a bigger * sequential buffer. This class provides similar APIs as in {@link ByteBuffer} to put/get int, @@ -1071,6 +1075,28 @@ public class MultiByteBuff extends ByteBuff { } @Override + public int read(ReadableByteChannel channel) throws IOException { + int total = 0; + while (true) { + // Read max possible into the current BB + int len = channelRead(channel, this.curItem); + if (len > 0) + total += len; + if (this.curItem.hasRemaining()) { + // We were not able to read enough to fill the current BB itself. Means there is no point in + // doing more reads from Channel. Only this much there for now. + break; + } else { + if (this.curItemIndex >= this.limitedItemIndex) + break; + this.curItemIndex++; + this.curItem = this.items[this.curItemIndex]; + } + } + return total; + } + + @Override public boolean equals(Object obj) { if (!(obj instanceof MultiByteBuff)) return false; if (this == obj) return true; @@ -1091,4 +1117,12 @@ public class MultiByteBuff extends ByteBuff { } return hash; } + + /** + * @return the ByteBuffers which this wraps. 
+ */ + @VisibleForTesting + public ByteBuffer[] getEnclosingByteBuffers() { + return this.items; + } } diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/nio/SingleByteBuff.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/nio/SingleByteBuff.java index 9ad28dc..0e45410 100644 --- a/hbase-common/src/main/java/org/apache/hadoop/hbase/nio/SingleByteBuff.java +++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/nio/SingleByteBuff.java @@ -17,7 +17,9 @@ */ package org.apache.hadoop.hbase.nio; +import java.io.IOException; import java.nio.ByteBuffer; +import java.nio.channels.ReadableByteChannel; import org.apache.hadoop.hbase.classification.InterfaceAudience; import org.apache.hadoop.hbase.util.ByteBufferUtils; @@ -25,6 +27,8 @@ import org.apache.hadoop.hbase.util.ObjectIntPair; import org.apache.hadoop.hbase.util.UnsafeAccess; import org.apache.hadoop.hbase.util.UnsafeAvailChecker; +import com.google.common.annotations.VisibleForTesting; + import sun.nio.ch.DirectBuffer; /** @@ -313,6 +317,11 @@ public class SingleByteBuff extends ByteBuff { } @Override + public int read(ReadableByteChannel channel) throws IOException { + return channelRead(channel, buf); + } + + @Override public boolean equals(Object obj) { if(!(obj instanceof SingleByteBuff)) return false; return this.buf.equals(((SingleByteBuff)obj).buf); @@ -326,7 +335,8 @@ public class SingleByteBuff extends ByteBuff { /** * @return the ByteBuffer which this wraps. */ - ByteBuffer getEnclosingByteBuffer() { + @VisibleForTesting + public ByteBuffer getEnclosingByteBuffer() { return this.buf; } } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/codec/MessageCodec.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/codec/MessageCodec.java index ea162fc..41dc387 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/codec/MessageCodec.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/codec/MessageCodec.java @@ -20,10 +20,10 @@ package org.apache.hadoop.hbase.codec; import java.io.IOException; import java.io.InputStream; import java.io.OutputStream; -import java.nio.ByteBuffer; import org.apache.hadoop.hbase.classification.InterfaceAudience; -import org.apache.hadoop.hbase.io.ByteBufferInputStream; +import org.apache.hadoop.hbase.io.ByteBuffInputStream; +import org.apache.hadoop.hbase.nio.ByteBuff; import org.apache.hadoop.hbase.Cell; import org.apache.hadoop.hbase.CellUtil; import org.apache.hadoop.hbase.HBaseInterfaceAudience; @@ -83,8 +83,8 @@ public class MessageCodec implements Codec { } @Override - public Decoder getDecoder(ByteBuffer buf) { - return getDecoder(new ByteBufferInputStream(buf)); + public Decoder getDecoder(ByteBuff buf) { + return getDecoder(new ByteBuffInputStream(buf)); } @Override diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/CallRunner.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/CallRunner.java index 34140a9..d570b17 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/CallRunner.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/CallRunner.java @@ -19,8 +19,6 @@ package org.apache.hadoop.hbase.ipc; import java.net.InetSocketAddress; import java.nio.channels.ClosedChannelException; -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; import org.apache.hadoop.hbase.CallDroppedException; import org.apache.hadoop.hbase.CellScanner; import org.apache.hadoop.hbase.HBaseInterfaceAudience; @@ -45,7 +43,6 @@ import 
org.apache.hadoop.hbase.shaded.com.google.protobuf.Message; @InterfaceAudience.LimitedPrivate({HBaseInterfaceAudience.COPROC, HBaseInterfaceAudience.PHOENIX}) @InterfaceStability.Evolving public class CallRunner { - private static final Log LOG = LogFactory.getLog(CallRunner.class); private static final CallDroppedException CALL_DROPPED_EXCEPTION = new CallDroppedException(); @@ -143,6 +140,8 @@ public class CallRunner { sucessful = true; } } + // return back the RPC request read BB we can do here. It is done by now. + call.cleanup(); // Set the response Message param = resultPair != null ? resultPair.getFirst() : null; CellScanner cells = resultPair != null ? resultPair.getSecond() : null; diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcServer.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcServer.java index 1c2d51f..87e9160 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcServer.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcServer.java @@ -20,6 +20,7 @@ package org.apache.hadoop.hbase.ipc; import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.HADOOP_SECURITY_AUTHORIZATION; +import com.google.common.annotations.VisibleForTesting; import com.google.common.util.concurrent.ThreadFactoryBuilder; import java.io.ByteArrayInputStream; @@ -99,6 +100,9 @@ import org.apache.hadoop.hbase.io.ByteBufferPool; import org.apache.hadoop.hbase.io.crypto.aes.CryptoAES; import org.apache.hadoop.hbase.monitoring.MonitoredRPCHandler; import org.apache.hadoop.hbase.monitoring.TaskMonitor; +import org.apache.hadoop.hbase.nio.ByteBuff; +import org.apache.hadoop.hbase.nio.MultiByteBuff; +import org.apache.hadoop.hbase.nio.SingleByteBuff; import org.apache.hadoop.hbase.regionserver.RSRpcServices; import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil; import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos; @@ -146,6 +150,7 @@ import org.codehaus.jackson.map.ObjectMapper; import org.apache.hadoop.hbase.shaded.com.google.protobuf.ByteString; import org.apache.hadoop.hbase.shaded.com.google.protobuf.BlockingService; +import org.apache.hadoop.hbase.shaded.com.google.protobuf.ByteInput; import org.apache.hadoop.hbase.shaded.com.google.protobuf.CodedInputStream; import org.apache.hadoop.hbase.shaded.com.google.protobuf.CodedOutputStream; import org.apache.hadoop.hbase.shaded.com.google.protobuf.Descriptors.MethodDescriptor; @@ -304,6 +309,10 @@ public class RpcServer implements RpcServerInterface, ConfigurationObserver { private UserProvider userProvider; private final ByteBufferPool reservoir; + // The requests and response will use buffers from ByteBufferPool, when the size of the + // request/response is at least this size. + // We make this to be 1/6th of the pool buffer size. + private final int minSizeForReservoirUse; private volatile boolean allowFallbackToSimpleAuth; @@ -344,10 +353,11 @@ public class RpcServer implements RpcServerInterface, ConfigurationObserver { protected boolean isError; protected TraceInfo tinfo; private ByteBufferListOutputStream cellBlockStream = null; + private CallCleanup reqCleanup = null; private User user; private InetAddress remoteAddress; - private RpcCallback callback; + private RpcCallback rpcCallback; private long responseCellSize = 0; private long responseBlockSize = 0; @@ -357,7 +367,8 @@ public class RpcServer implements RpcServerInterface, ConfigurationObserver { justification="Can't figure why this complaint is happening... 
see below") Call(int id, final BlockingService service, final MethodDescriptor md, RequestHeader header, Message param, CellScanner cellScanner, Connection connection, Responder responder, - long size, TraceInfo tinfo, final InetAddress remoteAddress, int timeout) { + long size, TraceInfo tinfo, final InetAddress remoteAddress, int timeout, + CallCleanup reqCleanup) { this.id = id; this.service = service; this.md = md; @@ -377,6 +388,7 @@ public class RpcServer implements RpcServerInterface, ConfigurationObserver { connection == null? null: connection.retryImmediatelySupported; this.timeout = timeout; this.deadline = this.timeout > 0 ? this.timestamp + this.timeout : Long.MAX_VALUE; + this.reqCleanup = reqCleanup; } /** @@ -391,9 +403,18 @@ public class RpcServer implements RpcServerInterface, ConfigurationObserver { // got from pool. this.cellBlockStream = null; } + cleanup();// If the call was run successfuly, we might have already returned the + // BB back to pool. No worries..Then inputCellBlock will be null this.connection.decRpcCount(); // Say that we're done with this call. } + protected void cleanup() { + if (this.reqCleanup != null) { + this.reqCleanup.run(); + this.reqCleanup = null; + } + } + @Override public String toString() { return toShortString() + " param: " + @@ -515,9 +536,9 @@ public class RpcServer implements RpcServerInterface, ConfigurationObserver { this.response = bc; // Once a response message is created and set to this.response, this Call can be treated as // done. The Responder thread will do the n/w write of this message back to client. - if (this.callback != null) { + if (this.rpcCallback != null) { try { - this.callback.run(); + this.rpcCallback.run(); } catch (Exception e) { // Don't allow any exception here to kill this handler thread. LOG.warn("Exception while running the Rpc Callback.", e); @@ -722,7 +743,7 @@ public class RpcServer implements RpcServerInterface, ConfigurationObserver { @Override public synchronized void setCallBack(RpcCallback callback) { - this.callback = callback; + this.rpcCallback = callback; } @Override @@ -731,6 +752,10 @@ public class RpcServer implements RpcServerInterface, ConfigurationObserver { } } + static interface CallCleanup { + void run(); + } + /** Listens on the socket. Creates jobs for the handler threads*/ private class Listener extends Thread { @@ -1289,7 +1314,8 @@ public class RpcServer implements RpcServerInterface, ConfigurationObserver { // If the connection header has been read or not. 
private boolean connectionHeaderRead = false; protected SocketChannel channel; - private ByteBuffer data; + private ByteBuff data; + private CallCleanup callCleanup; private ByteBuffer dataLengthBuffer; protected final ConcurrentLinkedDeque responseQueue = new ConcurrentLinkedDeque(); private final Lock responseWriteLock = new ReentrantLock(); @@ -1327,17 +1353,17 @@ public class RpcServer implements RpcServerInterface, ConfigurationObserver { // Fake 'call' for failed authorization response private static final int AUTHORIZATION_FAILED_CALLID = -1; private final Call authFailedCall = new Call(AUTHORIZATION_FAILED_CALLID, null, null, null, - null, null, this, null, 0, null, null, 0); + null, null, this, null, 0, null, null, 0, null); private ByteArrayOutputStream authFailedResponse = new ByteArrayOutputStream(); // Fake 'call' for SASL context setup private static final int SASL_CALLID = -33; private final Call saslCall = new Call(SASL_CALLID, null, null, null, null, null, this, null, - 0, null, null, 0); + 0, null, null, 0, null); // Fake 'call' for connection header response private static final int CONNECTION_HEADER_RESPONSE_CALLID = -34; private final Call setConnectionHeaderResponseCall = new Call(CONNECTION_HEADER_RESPONSE_CALLID, - null, null, null, null, null, this, null, 0, null, null, 0); + null, null, null, null, null, this, null, 0, null, null, 0, null); // was authentication allowed with a fallback to simple auth private boolean authenticatedWithFallback; @@ -1352,6 +1378,7 @@ public class RpcServer implements RpcServerInterface, ConfigurationObserver { this.channel = channel; this.lastContact = lastContact; this.data = null; + this.callCleanup = null; this.dataLengthBuffer = ByteBuffer.allocate(4); this.socket = channel.socket(); this.addr = socket.getInetAddress(); @@ -1437,7 +1464,7 @@ public class RpcServer implements RpcServerInterface, ConfigurationObserver { return authorizedUgi; } - private void saslReadAndProcess(ByteBuffer saslToken) throws IOException, + private void saslReadAndProcess(ByteBuff saslToken) throws IOException, InterruptedException { if (saslContextEstablished) { if (LOG.isTraceEnabled()) @@ -1447,13 +1474,13 @@ public class RpcServer implements RpcServerInterface, ConfigurationObserver { if (!useWrap) { processOneRpc(saslToken); } else { - byte[] b = saslToken.array(); + byte[] b = saslToken.hasArray() ? saslToken.array() : saslToken.toBytes(); byte [] plaintextData; if (useCryptoAesWrap) { // unwrap with CryptoAES - plaintextData = cryptoAES.unwrap(b, saslToken.position(), saslToken.limit()); + plaintextData = cryptoAES.unwrap(b, 0, b.length); } else { - plaintextData = saslServer.unwrap(b, saslToken.position(), saslToken.limit()); + plaintextData = saslServer.unwrap(b, 0, b.length); } processUnwrappedData(plaintextData); } @@ -1506,7 +1533,8 @@ public class RpcServer implements RpcServerInterface, ConfigurationObserver { LOG.debug("Have read input token of size " + saslToken.limit() + " for processing by saslServer.evaluateResponse()"); } - replyToken = saslServer.evaluateResponse(saslToken.array()); + replyToken = saslServer + .evaluateResponse(saslToken.hasArray() ? 
saslToken.array() : saslToken.toBytes()); } catch (IOException e) { IOException sendToClient = e; Throwable cause = e; @@ -1759,7 +1787,7 @@ public class RpcServer implements RpcServerInterface, ConfigurationObserver { // Notify the client about the offending request Call reqTooBig = new Call(header.getCallId(), this.service, null, null, null, - null, this, responder, 0, null, this.addr,0); + null, this, responder, 0, null, this.addr, 0, null); metrics.exception(REQUEST_TOO_BIG_EXCEPTION); // Make sure the client recognizes the underlying exception // Otherwise, throw a DoNotRetryIOException. @@ -1779,7 +1807,8 @@ public class RpcServer implements RpcServerInterface, ConfigurationObserver { return -1; } - data = ByteBuffer.allocate(dataLength); + // Allocate ByteBuffer(s) and assign to 'data' + allocateByteBuffToReadInto(dataLength); // Increment the rpc count. This counter will be decreased when we write // the response. If we want the connection to be detected as idle properly, we @@ -1787,7 +1816,7 @@ public class RpcServer implements RpcServerInterface, ConfigurationObserver { incRpcCount(); } - count = channelRead(channel, data); + count = channelDataRead(channel, data); if (count >= 0 && data.remaining() == 0) { // count==0 if dataLength == 0 process(); @@ -1796,11 +1825,40 @@ public class RpcServer implements RpcServerInterface, ConfigurationObserver { return count; } + private void allocateByteBuffToReadInto(int length) { + // We create random on heap buffers are read into those when + // 1. ByteBufferPool is not there. + // 2. When the size of the req is very small. Using a large sized (64 KB) buffer from pool is + // waste then. Also if all the reqs are of this size, we will be creating larger sized + // buffers and pool them permanently. This include Scan/Get request and DDL kind of reqs like + // RegionOpen. + // 3. If it is an initial handshake signal or initial connection request. Any way then + // condition 2 itself will match + // 4. When SASL use is ON. + if (reservoir == null || skipInitialSaslHandshake || !connectionHeaderRead || useSasl + || length < minSizeForReservoirUse) { + this.data = new SingleByteBuff(ByteBuffer.allocate(length)); + } else { + Pair pair = RpcServer.allocateByteBuffToReadInto(reservoir, + minSizeForReservoirUse, length); + this.data = pair.getFirst(); + this.callCleanup = pair.getSecond(); + } + } + + protected int channelDataRead(ReadableByteChannel channel, ByteBuff buf) throws IOException { + int count = buf.read(channel); + if (count > 0) { + metrics.receivedBytes(count); + } + return count; + } + /** * Process the data buffer and clean the connection state for the next call. 
*/ private void process() throws IOException, InterruptedException { - data.flip(); + data.rewind(); try { if (skipInitialSaslHandshake) { skipInitialSaslHandshake = false; @@ -1816,6 +1874,7 @@ public class RpcServer implements RpcServerInterface, ConfigurationObserver { } finally { dataLengthBuffer.clear(); // Clean for the next call data = null; // For the GC + this.callCleanup = null; } } @@ -1831,7 +1890,8 @@ public class RpcServer implements RpcServerInterface, ConfigurationObserver { private int doBadPreambleHandling(final String msg, final Exception e) throws IOException { LOG.warn(msg); - Call fakeCall = new Call(-1, null, null, null, null, null, this, responder, -1, null, null,0); + Call fakeCall = new Call(-1, null, null, null, null, null, this, responder, -1, null, null, 0, + null); setupResponse(null, fakeCall, e, msg); responder.doRespond(fakeCall); // Returning -1 closes out the connection. @@ -1839,9 +1899,15 @@ public class RpcServer implements RpcServerInterface, ConfigurationObserver { } // Reads the connection header following version - private void processConnectionHeader(ByteBuffer buf) throws IOException { - this.connectionHeader = ConnectionHeader.parseFrom( - new ByteBufferInputStream(buf)); + private void processConnectionHeader(ByteBuff buf) throws IOException { + if (buf.hasArray()) { + this.connectionHeader = ConnectionHeader.parseFrom(buf.array()); + } else { + CodedInputStream cis = CodedInputStream + .newInstance(new ByteBuffByteInput(buf, 0, buf.limit()), true); + cis.enableAliasing(true); + this.connectionHeader = ConnectionHeader.parseFrom(cis); + } String serviceName = connectionHeader.getServiceName(); if (serviceName == null) throw new EmptyServiceNameException(); this.service = getService(services, serviceName); @@ -2043,13 +2109,13 @@ public class RpcServer implements RpcServerInterface, ConfigurationObserver { if (unwrappedData.remaining() == 0) { unwrappedDataLengthBuffer.clear(); unwrappedData.flip(); - processOneRpc(unwrappedData); + processOneRpc(new SingleByteBuff(unwrappedData)); unwrappedData = null; } } } - private void processOneRpc(ByteBuffer buf) throws IOException, InterruptedException { + private void processOneRpc(ByteBuff buf) throws IOException, InterruptedException { if (connectionHeaderRead) { processRequest(buf); } else { @@ -2071,12 +2137,18 @@ public class RpcServer implements RpcServerInterface, ConfigurationObserver { * @throws IOException * @throws InterruptedException */ - protected void processRequest(ByteBuffer buf) throws IOException, InterruptedException { + protected void processRequest(ByteBuff buf) throws IOException, InterruptedException { long totalRequestSize = buf.limit(); int offset = 0; // Here we read in the header. We avoid having pb // do its default 4k allocation for CodedInputStream. We force it to use backing array. 
- CodedInputStream cis = CodedInputStream.newInstance(buf.array(), offset, buf.limit()); + CodedInputStream cis; + if (buf.hasArray()) { + cis = CodedInputStream.newInstance(buf.array(), offset, buf.limit()); + } else { + cis = CodedInputStream.newInstance(new ByteBuffByteInput(buf, 0, buf.limit()), true); + cis.enableAliasing(true); + } int headerSize = cis.readRawVarint32(); offset = cis.getTotalBytesRead(); Message.Builder builder = RequestHeader.newBuilder(); @@ -2093,7 +2165,7 @@ public class RpcServer implements RpcServerInterface, ConfigurationObserver { if ((totalRequestSize + callQueueSizeInBytes.sum()) > maxQueueSizeInBytes) { final Call callTooBig = new Call(id, this.service, null, null, null, null, this, - responder, totalRequestSize, null, null, 0); + responder, totalRequestSize, null, null, 0, null); ByteArrayOutputStream responseBuffer = new ByteArrayOutputStream(); metrics.exception(CALL_QUEUE_TOO_BIG_EXCEPTION); setupResponse(responseBuffer, callTooBig, CALL_QUEUE_TOO_BIG_EXCEPTION, @@ -2127,7 +2199,8 @@ public class RpcServer implements RpcServerInterface, ConfigurationObserver { } if (header.hasCellBlockMeta()) { buf.position(offset); - cellScanner = cellBlockBuilder.createCellScannerReusingBuffers(this.codec, this.compressionCodec, buf); + cellScanner = cellBlockBuilder.createCellScannerReusingBuffers(this.codec, + this.compressionCodec, buf); } } catch (Throwable t) { InetSocketAddress address = getListenerAddress(); @@ -2148,7 +2221,7 @@ public class RpcServer implements RpcServerInterface, ConfigurationObserver { final Call readParamsFailedCall = new Call(id, this.service, null, null, null, null, this, - responder, totalRequestSize, null, null, 0); + responder, totalRequestSize, null, null, 0, null); ByteArrayOutputStream responseBuffer = new ByteArrayOutputStream(); setupResponse(responseBuffer, readParamsFailedCall, t, msg + "; " + t.getMessage()); @@ -2164,7 +2237,7 @@ public class RpcServer implements RpcServerInterface, ConfigurationObserver { timeout = Math.max(minClientRequestTimeout, header.getTimeout()); } Call call = new Call(id, this.service, md, header, param, cellScanner, this, responder, - totalRequestSize, traceInfo, this.addr, timeout); + totalRequestSize, traceInfo, this.addr, timeout, this.callCleanup); if (!scheduler.dispatch(new CallRunner(RpcServer.this, call))) { callQueueSizeInBytes.add(-1 * call.getSize()); @@ -2211,6 +2284,7 @@ public class RpcServer implements RpcServerInterface, ConfigurationObserver { protected synchronized void close() { disposeSasl(); data = null; + callCleanup = null; if (!channel.isOpen()) return; try {socket.shutdownOutput();} catch(Exception ignored) { @@ -2301,8 +2375,10 @@ public class RpcServer implements RpcServerInterface, ConfigurationObserver { conf.getInt(ByteBufferPool.MAX_POOL_SIZE_KEY, conf.getInt(HConstants.REGION_SERVER_HANDLER_COUNT, HConstants.DEFAULT_REGION_SERVER_HANDLER_COUNT) * 2)); + this.minSizeForReservoirUse = getMinSizeForReservoirUse(this.reservoir); } else { reservoir = null; + this.minSizeForReservoirUse = Integer.MAX_VALUE;// reservoir itself not in place. 
} this.server = server; this.services = services; @@ -2347,6 +2423,11 @@ public class RpcServer implements RpcServerInterface, ConfigurationObserver { this.scheduler.init(new RpcSchedulerContext(this)); } + @VisibleForTesting + static int getMinSizeForReservoirUse(ByteBufferPool pool) { + return pool.getBufferSize() / 6; + } + @Override public void onConfigurationChange(Configuration newConf) { initReconfigurable(newConf); @@ -2755,6 +2836,59 @@ public class RpcServer implements RpcServerInterface, ConfigurationObserver { } /** + * This is extracted to a static method for better unit testing. We try to get buffer(s) from pool + * as much as possible. + * + * @param pool The ByteBufferPool to use + * @param minSizeForPoolUse Only for buffer size above this, we will try to use pool. Any buffer + * need of size below this, create on heap ByteBuffer. + * @param reqLen Bytes count in request + */ + @VisibleForTesting + static Pair allocateByteBuffToReadInto(ByteBufferPool pool, + int minSizeForPoolUse, int reqLen) { + ByteBuff resultBuf; + List bbs = new ArrayList((reqLen / pool.getBufferSize()) + 1); + int remain = reqLen; + ByteBuffer buf = null; + while (remain >= minSizeForPoolUse && (buf = pool.getBuffer()) != null) { + bbs.add(buf); + remain -= pool.getBufferSize(); + } + ByteBuffer[] bufsFromPool = null; + if (bbs.size() > 0) { + bufsFromPool = new ByteBuffer[bbs.size()]; + bbs.toArray(bufsFromPool); + } + if (remain > 0) { + bbs.add(ByteBuffer.allocate(remain)); + } + if (bbs.size() > 1) { + ByteBuffer[] items = new ByteBuffer[bbs.size()]; + bbs.toArray(items); + resultBuf = new MultiByteBuff(items); + } else { + // We are backed by single BB + resultBuf = new SingleByteBuff(bbs.get(0)); + } + resultBuf.limit(reqLen); + CallCleanup callCleanup = null; + if (bufsFromPool != null) { + final ByteBuffer[] bufsFromPoolFinal = bufsFromPool; + callCleanup = new CallCleanup() { + @Override + public void run() { + // Return back all the BBs to pool + for (int i = 0; i < bufsFromPoolFinal.length; i++) { + pool.putbackBuffer(bufsFromPoolFinal[i]); + } + } + }; + } + return new Pair(resultBuf, callCleanup); + } + + /** * Needed for features such as delayed calls. We need to be able to store the current call * so that we can complete it later or ask questions of what is supported by the current ongoing * call. 
@@ -3054,4 +3188,44 @@ public class RpcServer implements RpcServerInterface, ConfigurationObserver { idleScanTimer.schedule(idleScanTask, idleScanInterval); } } -} + + private static class ByteBuffByteInput extends ByteInput { + + private ByteBuff buf; + private int offset; + private int length; + + ByteBuffByteInput(ByteBuff buf, int offset, int length) { + this.buf = buf; + this.offset = offset; + this.length = length; + } + + @Override + public byte read(int offset) { + return this.buf.get(getAbsoluteOffset(offset)); + } + + private int getAbsoluteOffset(int offset) { + return this.offset + offset; + } + + @Override + public int read(int offset, byte[] out, int outOffset, int len) { + this.buf.get(getAbsoluteOffset(offset), out, outOffset, len); + return len; + } + + @Override + public int read(int offset, ByteBuffer out) { + int len = out.remaining(); + this.buf.get(out, getAbsoluteOffset(offset), len); + return len; + } + + @Override + public int size() { + return this.length; + } + } +} \ No newline at end of file diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/WALCellCodec.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/WALCellCodec.java index 52dfae0..9e80bbe 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/WALCellCodec.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/WALCellCodec.java @@ -21,7 +21,6 @@ import java.io.ByteArrayOutputStream; import java.io.IOException; import java.io.InputStream; import java.io.OutputStream; -import java.nio.ByteBuffer; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.Cell; @@ -33,9 +32,12 @@ import org.apache.hadoop.hbase.codec.BaseDecoder; import org.apache.hadoop.hbase.codec.BaseEncoder; import org.apache.hadoop.hbase.codec.Codec; import org.apache.hadoop.hbase.codec.KeyValueCodecWithTags; -import org.apache.hadoop.hbase.io.ByteBufferInputStream; +import org.apache.hadoop.hbase.io.ByteBuffInputStream; +import org.apache.hadoop.hbase.io.ByteBufferSupportOutputStream; +import org.apache.hadoop.hbase.io.ByteBufferSupportOutputStreamWrapper; import org.apache.hadoop.hbase.io.util.Dictionary; import org.apache.hadoop.hbase.io.util.StreamUtils; +import org.apache.hadoop.hbase.nio.ByteBuff; import org.apache.hadoop.hbase.util.ByteBufferUtils; import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.ReflectionUtils; @@ -356,14 +358,18 @@ public class WALCellCodec implements Codec { } @Override - public Decoder getDecoder(ByteBuffer buf) { - return getDecoder(new ByteBufferInputStream(buf)); + public Decoder getDecoder(ByteBuff buf) { + return getDecoder(new ByteBuffInputStream(buf)); } @Override public Encoder getEncoder(OutputStream os) { - return (compression == null) - ? new EnsureKvEncoder(os) : new CompressedKvEncoder(os, compression); + if (compression == null) { + os = (os instanceof ByteBufferSupportOutputStream) ? 
os + : new ByteBufferSupportOutputStreamWrapper(os); + return new EnsureKvEncoder(os); + } + return new CompressedKvEncoder(os, compression); } public ByteStringCompressor getByteStringCompressor() { diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/AbstractTestIPC.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/AbstractTestIPC.java index 2211e8f..5a9178a 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/AbstractTestIPC.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/AbstractTestIPC.java @@ -33,7 +33,6 @@ import static org.mockito.internal.verification.VerificationModeFactory.times; import java.io.IOException; import java.net.InetSocketAddress; -import java.nio.ByteBuffer; import java.nio.channels.SocketChannel; import java.util.ArrayList; import java.util.List; @@ -47,6 +46,7 @@ import org.apache.hadoop.hbase.CellUtil; import org.apache.hadoop.hbase.DoNotRetryIOException; import org.apache.hadoop.hbase.HBaseConfiguration; import org.apache.hadoop.hbase.KeyValue; +import org.apache.hadoop.hbase.nio.ByteBuff; import org.apache.hadoop.hbase.shaded.ipc.protobuf.generated.TestProtos.EchoRequestProto; import org.apache.hadoop.hbase.shaded.ipc.protobuf.generated.TestProtos.EchoResponseProto; import org.apache.hadoop.hbase.shaded.ipc.protobuf.generated.TestProtos.EmptyRequestProto; @@ -315,7 +315,7 @@ public abstract class AbstractTestIPC { } @Override - protected void processRequest(ByteBuffer buf) throws IOException, InterruptedException { + protected void processRequest(ByteBuff buf) throws IOException, InterruptedException { // this will throw exception after the connection header is read, and an RPC is sent // from client throw new DoNotRetryIOException("Failing for test"); diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestRpcServer.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestRpcServer.java new file mode 100644 index 0000000..6fd65f2 --- /dev/null +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestRpcServer.java @@ -0,0 +1,139 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.hadoop.hbase.ipc; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertNull; +import static org.junit.Assert.assertTrue; + +import java.nio.ByteBuffer; + +import org.apache.hadoop.hbase.io.ByteBufferPool; +import org.apache.hadoop.hbase.ipc.RpcServer.CallCleanup; +import org.apache.hadoop.hbase.nio.ByteBuff; +import org.apache.hadoop.hbase.nio.MultiByteBuff; +import org.apache.hadoop.hbase.nio.SingleByteBuff; +import org.apache.hadoop.hbase.testclassification.RPCTests; +import org.apache.hadoop.hbase.testclassification.SmallTests; +import org.apache.hadoop.hbase.util.Pair; +import org.junit.Test; +import org.junit.experimental.categories.Category; + +@Category({ RPCTests.class, SmallTests.class }) +public class TestRpcServer { + + @Test + public void testAllocateByteBuffToReadInto() throws Exception { + int maxBuffersInPool = 10; + ByteBufferPool pool = new ByteBufferPool(6 * 1024, maxBuffersInPool); + initPoolWithAllBuffers(pool, maxBuffersInPool); + ByteBuff buff = null; + Pair pair; + // When the request size is less than 1/6th of the pool buffer size. We should use on demand + // created on heap Buffer + pair = RpcServer.allocateByteBuffToReadInto(pool, RpcServer.getMinSizeForReservoirUse(pool), + 200); + buff = pair.getFirst(); + assertTrue(buff.hasArray()); + assertEquals(maxBuffersInPool, pool.getQueueSize()); + assertNull(pair.getSecond()); + // When the request size is > 1/6th of the pool buffer size. + pair = RpcServer.allocateByteBuffToReadInto(pool, RpcServer.getMinSizeForReservoirUse(pool), + 1024); + buff = pair.getFirst(); + assertFalse(buff.hasArray()); + assertEquals(maxBuffersInPool - 1, pool.getQueueSize()); + assertNotNull(pair.getSecond()); + pair.getSecond().run();// CallCleanup#run should put back the BB to pool. + assertEquals(maxBuffersInPool, pool.getQueueSize()); + // Request size> pool buffer size + pair = RpcServer.allocateByteBuffToReadInto(pool, RpcServer.getMinSizeForReservoirUse(pool), + 7 * 1024); + buff = pair.getFirst(); + assertFalse(buff.hasArray()); + assertTrue(buff instanceof MultiByteBuff); + ByteBuffer[] bbs = ((MultiByteBuff) buff).getEnclosingByteBuffers(); + assertEquals(2, bbs.length); + assertTrue(bbs[0].isDirect()); + assertTrue(bbs[1].isDirect()); + assertEquals(6 * 1024, bbs[0].limit()); + assertEquals(1024, bbs[1].limit()); + assertEquals(maxBuffersInPool - 2, pool.getQueueSize()); + assertNotNull(pair.getSecond()); + pair.getSecond().run();// CallCleanup#run should put back the BB to pool. + assertEquals(maxBuffersInPool, pool.getQueueSize()); + + pair = RpcServer.allocateByteBuffToReadInto(pool, RpcServer.getMinSizeForReservoirUse(pool), + 6 * 1024 + 200); + buff = pair.getFirst(); + assertFalse(buff.hasArray()); + assertTrue(buff instanceof MultiByteBuff); + bbs = ((MultiByteBuff) buff).getEnclosingByteBuffers(); + assertEquals(2, bbs.length); + assertTrue(bbs[0].isDirect()); + assertFalse(bbs[1].isDirect()); + assertEquals(6 * 1024, bbs[0].limit()); + assertEquals(200, bbs[1].limit()); + assertEquals(maxBuffersInPool - 1, pool.getQueueSize()); + assertNotNull(pair.getSecond()); + pair.getSecond().run();// CallCleanup#run should put back the BB to pool. 
+ assertEquals(maxBuffersInPool, pool.getQueueSize()); + + ByteBuffer[] buffers = new ByteBuffer[maxBuffersInPool - 1]; + for (int i = 0; i < maxBuffersInPool - 1; i++) { + buffers[i] = pool.getBuffer(); + } + pair = RpcServer.allocateByteBuffToReadInto(pool, RpcServer.getMinSizeForReservoirUse(pool), + 20 * 1024); + buff = pair.getFirst(); + assertFalse(buff.hasArray()); + assertTrue(buff instanceof MultiByteBuff); + bbs = ((MultiByteBuff) buff).getEnclosingByteBuffers(); + assertEquals(2, bbs.length); + assertTrue(bbs[0].isDirect()); + assertFalse(bbs[1].isDirect()); + assertEquals(6 * 1024, bbs[0].limit()); + assertEquals(14 * 1024, bbs[1].limit()); + assertEquals(0, pool.getQueueSize()); + assertNotNull(pair.getSecond()); + pair.getSecond().run();// CallCleanup#run should put back the BB to pool. + assertEquals(1, pool.getQueueSize()); + pool.getBuffer(); + pair = RpcServer.allocateByteBuffToReadInto(pool, RpcServer.getMinSizeForReservoirUse(pool), + 7 * 1024); + buff = pair.getFirst(); + assertTrue(buff.hasArray()); + assertTrue(buff instanceof SingleByteBuff); + assertEquals(7 * 1024, ((SingleByteBuff) buff).getEnclosingByteBuffer().limit()); + assertNull(pair.getSecond()); + } + + private void initPoolWithAllBuffers(ByteBufferPool pool, int maxBuffersInPool) { + ByteBuffer[] buffers = new ByteBuffer[maxBuffersInPool]; + // Just call getBuffer() on pool 'maxBuffersInPool' so as to init all buffers and then put back + // all. Makes pool with max #buffers. + for (int i = 0; i < maxBuffersInPool; i++) { + buffers[i] = pool.getBuffer(); + } + for (ByteBuffer buf : buffers) { + pool.putbackBuffer(buf); + } + } +} \ No newline at end of file
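
Usage note (not part of the patch): the sketch below shows how a caller is expected to drive the new ByteBuff-based Decoder entry point this change adds to Codec and its implementations, mirroring what the TestCellBlockBuilder hunk above does with SingleByteBuff. The class name ByteBuffDecoderExample is illustrative only; the HBase types are the ones introduced or touched by this patch and are assumed to be on the classpath.

import java.io.IOException;
import java.nio.ByteBuffer;

import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.codec.Codec;
import org.apache.hadoop.hbase.codec.KeyValueCodec;
import org.apache.hadoop.hbase.nio.ByteBuff;
import org.apache.hadoop.hbase.nio.SingleByteBuff;

public class ByteBuffDecoderExample {
  /**
   * Decodes a cell block that may sit in a direct, pool-backed buffer. With a direct buffer the
   * decoder materializes OffheapKeyValue instances over the buffer; with a heap buffer it falls
   * back to the byte[]-backed path, as ByteBuffKeyValueDecoder#advance decides.
   */
  public static void decode(ByteBuffer cellBlock) throws IOException {
    // Wrap the (possibly direct) buffer in a ByteBuff and decode in place.
    ByteBuff buf = new SingleByteBuff(cellBlock);
    Codec.Decoder decoder = new KeyValueCodec().getDecoder(buf);
    while (decoder.advance()) {
      Cell cell = decoder.current();
      // The cell may still reference the pooled request buffer; copy it (see
      // ShareableMemory#cloneToCell) before caching it beyond the life of the call.
    }
  }
}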