diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/IPCUtil.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/IPCUtil.java
index b7e7728..414abee 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/IPCUtil.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/IPCUtil.java
@@ -65,6 +65,7 @@ public class IPCUtil {
     this.conf = conf;
     this.cellBlockDecompressionMultiplier =
       conf.getInt("hbase.ipc.cellblock.decompression.buffersize.multiplier", 3);
+
     // Guess that 16k is a good size for rpc buffer. Could go bigger. See the TODO below in
     // #buildCellBlock.
     this.cellBlockBuildingInitialBufferSize =
@@ -91,23 +92,46 @@ public class IPCUtil {
   public ByteBuffer buildCellBlock(final Codec codec, final CompressionCodec compressor,
     final CellScanner cellScanner)
   throws IOException {
+    return buildCellBlock(codec, compressor, cellScanner, null);
+  }
+
+  /**
+   * Puts CellScanner Cells into a cell block using the passed-in codec and/or
+   * compressor.
+   * @param codec
+   * @param compressor
+   * @param cellScanner
+   * @param bb ByteBuffer to use. Can be null. You'd pass in a ByteBuffer if you want to practice
+   * recycling. If the passed-in ByteBuffer is too small, it is discarded and a new one allocated
+   * in its place, so you will get back either the passed-in ByteBuffer or a new, right-sized one.
+   * SIDE EFFECT!!!!!
+   * @return Null, or a byte buffer filled with a cellblock of the passed-in Cells encoded using
+   * the passed-in codec and/or compressor; the returned buffer has been
+   * flipped and is ready for reading. Use limit to find total size.
+   * @throws IOException
+   */
+  @SuppressWarnings("resource")
+  public ByteBuffer buildCellBlock(final Codec codec, final CompressionCodec compressor,
+    final CellScanner cellScanner, final ByteBuffer bb)
+  throws IOException {
     if (cellScanner == null) return null;
     if (codec == null) throw new CellScannerButNoCodecException();
     int bufferSize = this.cellBlockBuildingInitialBufferSize;
-    if (cellScanner instanceof HeapSize) {
-      long longSize = ((HeapSize)cellScanner).heapSize();
-      // Just make sure we don't have a size bigger than an int.
-      if (longSize > Integer.MAX_VALUE) {
-        throw new IOException("Size " + longSize + " > " + Integer.MAX_VALUE);
+    ByteBufferOutputStream baos = null;
+    if (bb != null) {
+      bufferSize = bb.capacity();
+      baos = new ByteBufferOutputStream(bb);
+    } else {
+      // Then we need to make our own to return.
+      if (cellScanner instanceof HeapSize) {
+        long longSize = ((HeapSize)cellScanner).heapSize();
+        // Just make sure we don't have a size bigger than an int.
+        if (longSize > Integer.MAX_VALUE) {
+          throw new IOException("Size " + longSize + " > " + Integer.MAX_VALUE);
+        }
+        bufferSize = ClassSize.align((int)longSize);
       }
-      bufferSize = ClassSize.align((int)longSize);
-    } // TODO: Else, get estimate on size of buffer rather than have the buffer resize.
-    // See TestIPCUtil main for an experiment where we spin through the Cells getting an estimate
-    // of total size before creating the buffer. It costs some small percentage. If we are usually
-    // within the estimated buffer size, then the cost is not worth it. If we are often well
-    // outside the guesstimated buffer size, the processing can be done in half the time if we
-    // go w/ the estimated size rather than let the buffer resize.
-    ByteBufferOutputStream baos = new ByteBufferOutputStream(bufferSize);
+      baos = new ByteBufferOutputStream(bufferSize);
+    }
     OutputStream os = baos;
     Compressor poolCompressor = null;
     try {
diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/io/BoundedByteBufferPool.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/io/BoundedByteBufferPool.java
new file mode 100644
index 0000000..10b9bf6
--- /dev/null
+++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/io/BoundedByteBufferPool.java
@@ -0,0 +1,110 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.io;
+
+import java.nio.ByteBuffer;
+import java.util.Queue;
+import java.util.concurrent.ArrayBlockingQueue;
+import java.util.concurrent.atomic.AtomicLong;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+
+import com.google.common.annotations.VisibleForTesting;
+
+/**
+ * Like Hadoop's ByteBufferPool only you do not specify desired size when getting a ByteBuffer.
+ * This pool keeps an upper bound on the count of ByteBuffers in the pool and on the maximum size
+ * of ByteBuffer that it will retain (hence the pool is 'bounded' as opposed to, say,
+ * Hadoop's ElasticByteBufferPool).
+ * If a ByteBuffer is bigger than the configured threshold, we will just let the ByteBuffer go
+ * rather than add it to the pool. If the pool already holds the configured maximum count of
+ * instances, we will not add the passed ByteBuffer to the pool; we will just drop it
+ * (we will log a WARN in this case that we are at capacity).
+ * <p>The intended use case is a reservoir of bytebuffers that an RPC can reuse; buffers tend to
+ * achieve a particular 'run' size over time give or take a few extremes. Set TRACE level on this
+ * class for a couple of seconds to get reporting on how it is running when deployed.
+ * <p>This class is thread safe.
+ */
+@InterfaceAudience.Private
+public class BoundedByteBufferPool {
+  private final Log LOG = LogFactory.getLog(this.getClass());
+
+  @VisibleForTesting
+  final Queue<ByteBuffer> buffers;
+
+  // Maximum size of a ByteBuffer to retain in pool
+  private final int maxByteBufferSizeToCache;
+
+  // A running average that only rises; it never recedes
+  private volatile int runningAverage;
+
+  // Scratch that keeps a rough total size of the pooled bytebuffers
+  private volatile int totalReservoirCapacity;
+
+  // For reporting
+  private AtomicLong allocations = new AtomicLong(0);
+
+  /**
+   * @param maxByteBufferSizeToCache
+   * @param initialByteBufferSize
+   * @param maxToCache
+   */
+  public BoundedByteBufferPool(final int maxByteBufferSizeToCache, final int initialByteBufferSize,
+      final int maxToCache) {
+    this.maxByteBufferSizeToCache = maxByteBufferSizeToCache;
+    this.runningAverage = initialByteBufferSize;
+    this.buffers = new ArrayBlockingQueue<ByteBuffer>(maxToCache, true);
+  }
+
+  public ByteBuffer getBuffer() {
+    ByteBuffer bb = this.buffers.poll();
+    if (bb != null) {
+      // Clear sets limit == capacity. Position == 0.
+      bb.clear();
+      this.totalReservoirCapacity -= bb.capacity();
+    } else {
+      bb = ByteBuffer.allocate(this.runningAverage);
+      this.allocations.incrementAndGet();
+    }
+    if (LOG.isTraceEnabled()) {
+      LOG.trace("runningAverage=" + this.runningAverage +
+        ", totalCapacity=" + this.totalReservoirCapacity + ", count=" + this.buffers.size() +
+        ", allocations=" + this.allocations.get());
+    }
+    return bb;
+  }
+
+  public void putBuffer(ByteBuffer bb) {
+    // If the buffer is larger than we want to keep around, just let it go.
+    if (bb.capacity() > this.maxByteBufferSizeToCache) return;
+    if (!this.buffers.offer(bb)) {
+      LOG.warn("At capacity: " + this.buffers.size());
+    } else {
+      int size = this.buffers.size(); // This size may be inexact.
+      this.totalReservoirCapacity += bb.capacity();
+      int average = this.totalReservoirCapacity / size;
+      if (average > this.runningAverage && average < this.maxByteBufferSizeToCache) {
+        this.runningAverage = average;
+      }
+    }
+  }
+}
\ No newline at end of file
diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/io/ByteBufferOutputStream.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/io/ByteBufferOutputStream.java
index 257b850..af12113 100644
--- a/hbase-common/src/main/java/org/apache/hadoop/hbase/io/ByteBufferOutputStream.java
+++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/io/ByteBufferOutputStream.java
@@ -43,17 +43,32 @@ public class ByteBufferOutputStream extends OutputStream {
   }
 
   public ByteBufferOutputStream(int capacity, boolean useDirectByteBuffer) {
-    if (useDirectByteBuffer) {
-      buf = ByteBuffer.allocateDirect(capacity);
-    } else {
-      buf = ByteBuffer.allocate(capacity);
-    }
+    this(allocate(capacity, useDirectByteBuffer));
+  }
+
+  /**
+   * @param bb ByteBuffer to use. If too small, it will be discarded and a new one allocated in
+   * its place; i.e. the passed-in BB may NOT BE RETURNED!! Minimally it will be altered.
+   * SIDE EFFECT!! If you want to get the newly allocated ByteBuffer, you'll need to pick it up
+   * when done with this instance by calling {@link #getByteBuffer()}. All this encapsulation
+   * violation is so we can recycle buffers rather than allocate anew each time; allocation can
+   * get expensive, especially if the buffers are big, as can having buffers undergo resizing
+   * because the initial allocation was small.
+   * @see #getByteBuffer()
+   */
+  public ByteBufferOutputStream(final ByteBuffer bb) {
+    this.buf = bb;
+    this.buf.clear();
   }
 
   public int size() {
     return buf.position();
   }
 
+  private static ByteBuffer allocate(final int capacity, final boolean useDirectByteBuffer) {
+    return useDirectByteBuffer? ByteBuffer.allocateDirect(capacity): ByteBuffer.allocate(capacity);
+  }
+
   /**
    * This flips the underlying BB so be sure to use it _last_!
    * @return ByteBuffer
@@ -70,12 +85,7 @@ public class ByteBufferOutputStream extends OutputStream {
     int newSize = (int)Math.min((((long)buf.capacity()) * 2), (long)(Integer.MAX_VALUE));
     newSize = Math.max(newSize, buf.position() + extra);
-    ByteBuffer newBuf = null;
-    if (buf.isDirect()) {
-      newBuf = ByteBuffer.allocateDirect(newSize);
-    } else {
-      newBuf = ByteBuffer.allocate(newSize);
-    }
+    ByteBuffer newBuf = allocate(newSize, buf.isDirect());
     buf.flip();
     newBuf.put(buf);
     buf = newBuf;
diff --git a/hbase-common/src/test/java/org/apache/hadoop/hbase/io/TestBoundedByteBufferPool.java b/hbase-common/src/test/java/org/apache/hadoop/hbase/io/TestBoundedByteBufferPool.java
new file mode 100644
index 0000000..0d45287
--- /dev/null
+++ b/hbase-common/src/test/java/org/apache/hadoop/hbase/io/TestBoundedByteBufferPool.java
@@ -0,0 +1,88 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.io;
+
+import static org.junit.Assert.assertEquals;
+
+import java.nio.ByteBuffer;
+
+import org.apache.hadoop.hbase.testclassification.IOTests;
+import org.apache.hadoop.hbase.testclassification.SmallTests;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+@Category({ IOTests.class, SmallTests.class })
+public class TestBoundedByteBufferPool {
+  final int maxByteBufferSizeToCache = 10;
+  final int initialByteBufferSize = 1;
+  final int maxToCache = 10;
+  BoundedByteBufferPool reservoir;
+
+  @Before
+  public void before() {
+    this.reservoir =
+      new BoundedByteBufferPool(maxByteBufferSizeToCache, initialByteBufferSize, maxToCache);
+  }
+
+  @After
+  public void after() {
+    this.reservoir = null;
+  }
+
+  @Test
+  public void testEquivalence() {
+    ByteBuffer bb = ByteBuffer.allocate(1);
+    this.reservoir.putBuffer(bb);
+    this.reservoir.putBuffer(bb);
+    this.reservoir.putBuffer(bb);
+    assertEquals(3, this.reservoir.buffers.size());
+  }
+
+  @Test
+  public void testGetPut() {
+    ByteBuffer bb = this.reservoir.getBuffer();
+    assertEquals(initialByteBufferSize, bb.capacity());
+    assertEquals(0, this.reservoir.buffers.size());
+    this.reservoir.putBuffer(bb);
+    assertEquals(1, this.reservoir.buffers.size());
+    // Now remove a buffer and don't put it back so the reservoir is empty.
+    this.reservoir.getBuffer();
+    assertEquals(0, this.reservoir.buffers.size());
+    // Try adding in a buffer with a bigger-than-initial size and see if our runningAverage works.
+    // Need to add then remove, then get a new bytebuffer, so the reservoir internally is doing
+    // an allocation.
+    final int newCapacity = 2;
+    this.reservoir.putBuffer(ByteBuffer.allocate(newCapacity));
+    assertEquals(1, reservoir.buffers.size());
+    this.reservoir.getBuffer();
+    assertEquals(0, this.reservoir.buffers.size());
+    bb = this.reservoir.getBuffer();
+    assertEquals(newCapacity, bb.capacity());
+    // Assert that adding a too-big buffer won't happen.
+    assertEquals(0, this.reservoir.buffers.size());
+    this.reservoir.putBuffer(ByteBuffer.allocate(maxByteBufferSizeToCache * 2));
+    assertEquals(0, this.reservoir.buffers.size());
+    // Assert we can't add more than the max allowed count of instances.
+    for (int i = 0; i < maxToCache; i++) {
+      this.reservoir.putBuffer(ByteBuffer.allocate(initialByteBufferSize));
+    }
+    assertEquals(maxToCache, this.reservoir.buffers.size());
+  }
+}
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcServer.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcServer.java
index 064771c..89f1be3 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcServer.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcServer.java
@@ -82,6 +82,7 @@ import org.apache.hadoop.hbase.client.Operation;
 import org.apache.hadoop.hbase.codec.Codec;
 import org.apache.hadoop.hbase.exceptions.RegionMovedException;
 import org.apache.hadoop.hbase.io.ByteBufferOutputStream;
+import org.apache.hadoop.hbase.io.BoundedByteBufferPool;
 import org.apache.hadoop.hbase.monitoring.MonitoredRPCHandler;
 import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
 import org.apache.hadoop.hbase.protobuf.generated.RPCProtos.CellBlockMeta;
@@ -267,6 +268,9 @@ public class RpcServer implements RpcServerInterface {
 
   private UserProvider userProvider;
 
+  private final BoundedByteBufferPool reservoir;
+
+
   /**
    * Datastructure that holds all necessary to a method invocation and then afterward, carries
    * the result.
@@ -293,6 +297,7 @@ public class RpcServer implements RpcServerInterface {
     protected long size;                          // size of current call
     protected boolean isError;
     protected TraceInfo tinfo;
+    private ByteBuffer cellBlock = null;
 
     Call(int id, final BlockingService service, final MethodDescriptor md, RequestHeader header,
         Message param, CellScanner cellScanner, Connection connection, Responder responder,
@@ -313,6 +318,19 @@ public class RpcServer implements RpcServerInterface {
       this.tinfo = tinfo;
     }
 
+    /**
+     * Call is done. Execution happened and we returned results to the client. It is now safe to
+     * cleanup.
+     */
+    void done() {
+      if (this.cellBlock != null) {
+        // Return the buffer to the reservoir now that we are done with it.
+        reservoir.putBuffer(this.cellBlock);
+        this.cellBlock = null;
+      }
+      this.connection.decRpcCount();  // Say that we're done with this call.
+    }
+
     @Override
     public String toString() {
       return toShortString() + " param: " +
@@ -375,12 +393,17 @@ public class RpcServer implements RpcServerInterface {
         // Set the exception as the result of the method invocation.
         headerBuilder.setException(exceptionBuilder.build());
       }
-      ByteBuffer cellBlock =
-        ipcUtil.buildCellBlock(this.connection.codec, this.connection.compressionCodec, cells);
-      if (cellBlock != null) {
+      // Get a bb from the reservoir and pass it to buildCellBlock. What comes back will be the
+      // passed-in reservoir bb or a resized one that we should add back to the reservoir in its
+      // place when done. Keep a reference so we can add it back when finished. This is hacky and
+      // the hack is not contained, but the benefit is high when we can avoid a big buffer
+      // allocation on each rpc.
+      this.cellBlock = ipcUtil.buildCellBlock(this.connection.codec,
+        this.connection.compressionCodec, cells, reservoir.getBuffer());
+      if (this.cellBlock != null) {
         CellBlockMeta.Builder cellBlockBuilder = CellBlockMeta.newBuilder();
         // Presumes the cellBlock bytebuffer has been flipped so limit has total size in it.
-        cellBlockBuilder.setLength(cellBlock.limit());
+        cellBlockBuilder.setLength(this.cellBlock.limit());
         headerBuilder.setCellBlockMeta(cellBlockBuilder.build());
       }
       Message header = headerBuilder.build();
@@ -390,9 +413,9 @@ public class RpcServer implements RpcServerInterface {
       ByteBuffer bbHeader = IPCUtil.getDelimitedMessageAsByteBuffer(header);
       ByteBuffer bbResult = IPCUtil.getDelimitedMessageAsByteBuffer(result);
       int totalSize = bbHeader.capacity() + (bbResult == null? 0: bbResult.limit()) +
-        (cellBlock == null? 0: cellBlock.limit());
+        (this.cellBlock == null? 0: this.cellBlock.limit());
       ByteBuffer bbTotalSize = ByteBuffer.wrap(Bytes.toBytes(totalSize));
-      bc = new BufferChain(bbTotalSize, bbHeader, bbResult, cellBlock);
+      bc = new BufferChain(bbTotalSize, bbHeader, bbResult, this.cellBlock);
       if (connection.useWrap) {
         bc = wrapWithSasl(bc);
       }
@@ -1051,7 +1074,7 @@ public class RpcServer implements RpcServerInterface {
       }
 
       if (!call.response.hasRemaining()) {
-        call.connection.decRpcCount();  // Say that we're done with this call.
+        call.done();
         return true;
       } else {
         return false;  // Socket can't take more, we will have to come back.
@@ -1885,7 +1908,13 @@ public class RpcServer implements RpcServerInterface {
       final InetSocketAddress bindAddress, Configuration conf,
       RpcScheduler scheduler)
       throws IOException {
-
+    this.reservoir = new BoundedByteBufferPool(
+      conf.getInt("hbase.ipc.server.reservoir.max.buffer.size", 1024 * 1024),
+      conf.getInt("hbase.ipc.server.reservoir.initial.buffer.size", 16 * 1024),
+      // Make the max twice the number of handlers to be safe.
+      conf.getInt("hbase.ipc.server.reservoir.initial.max",
+        conf.getInt(HConstants.REGION_SERVER_HANDLER_COUNT,
+          HConstants.DEFAULT_REGION_SERVER_HANDLER_COUNT) * 2));
     this.server = server;
     this.services = services;
     this.bindAddress = bindAddress;
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/TestByteBufferOutputStream.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/TestByteBufferOutputStream.java
new file mode 100644
index 0000000..55f8dda
--- /dev/null
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/TestByteBufferOutputStream.java
@@ -0,0 +1,49 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.io;
+
+import static org.junit.Assert.*;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+
+import org.apache.hadoop.hbase.testclassification.SmallTests;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+@Category(SmallTests.class)
+public class TestByteBufferOutputStream {
+  @Test
+  public void testByteBufferReuse() throws IOException {
+    byte [] someBytes = Bytes.toBytes("some bytes");
+    ByteBuffer bb = ByteBuffer.allocate(someBytes.length);
+    ByteBuffer bbToReuse = write(bb, someBytes);
+    bbToReuse = write(bbToReuse, Bytes.toBytes("less"));
+    assertTrue(bb == bbToReuse);
+  }
+
+  private ByteBuffer write(final ByteBuffer bb, final byte [] bytes) throws IOException {
+    try (ByteBufferOutputStream bbos = new ByteBufferOutputStream(bb)) {
+      bbos.write(bytes);
+      assertTrue(Bytes.compareTo(bytes, bbos.toByteArray(0, bytes.length)) == 0);
+      bbos.flush();
+      return bbos.getByteBuffer();
+    }
+  }
+}
\ No newline at end of file
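
Reviewer note, not part of the patch: below is a minimal, hypothetical sketch (class name, sizes, and main() harness invented for illustration) of the recycle pattern this patch wires into RpcServer. A caller borrows a ByteBuffer from the reservoir, writes through ByteBufferOutputStream, and must return whichever buffer the stream holds at the end, since the resize side effect documented above may have swapped in a new, bigger buffer.

import java.nio.ByteBuffer;

import org.apache.hadoop.hbase.io.BoundedByteBufferPool;
import org.apache.hadoop.hbase.io.ByteBufferOutputStream;

// Hypothetical caller exercising the new reservoir + stream contract.
public class ReservoirRecycleSketch {
  public static void main(String[] args) throws Exception {
    // Same bounds as the new RpcServer defaults: retain buffers up to 1MB,
    // start allocations at 16k, pool at most 20 instances.
    BoundedByteBufferPool reservoir = new BoundedByteBufferPool(1024 * 1024, 16 * 1024, 20);
    ByteBuffer bb = reservoir.getBuffer();
    ByteBufferOutputStream bbos = new ByteBufferOutputStream(bb);
    // Writing more than 16k outgrows the borrowed buffer; the stream silently
    // discards it and allocates a replacement internally.
    bbos.write(new byte[64 * 1024]);
    // getByteBuffer() flips the CURRENT buffer, so call it last.
    ByteBuffer maybeResized = bbos.getByteBuffer();
    // Return the buffer the stream holds now, not the one we passed in; because
    // of the resize side effect the two can differ. This mirrors why Call keeps
    // a reference in this.cellBlock and hands it back in Call#done().
    reservoir.putBuffer(maybeResized);
  }
}

This is also why putBuffer() tolerates buffers of any size: one that grew past hbase.ipc.server.reservoir.max.buffer.size is simply dropped rather than retained, keeping the pool bounded.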