diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/IPCUtil.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/IPCUtil.java
index b7e7728..414abee 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/IPCUtil.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/IPCUtil.java
@@ -65,6 +65,7 @@ public class IPCUtil {
this.conf = conf;
this.cellBlockDecompressionMultiplier =
conf.getInt("hbase.ipc.cellblock.decompression.buffersize.multiplier", 3);
+
// Guess that 16k is a good size for rpc buffer. Could go bigger. See the TODO below in
// #buildCellBlock.
this.cellBlockBuildingInitialBufferSize =
@@ -91,23 +92,46 @@ public class IPCUtil {
public ByteBuffer buildCellBlock(final Codec codec, final CompressionCodec compressor,
final CellScanner cellScanner)
throws IOException {
+ return buildCellBlock(codec, compressor, cellScanner, null);
+ }
+
+ /**
+ * Puts CellScanner Cells into a cell block using the passed-in codec and/or
+ * compressor.
+ * @param codec codec to use encoding the Cells
+ * @param compressor compressor to run the encoded Cells through, or null for none
+ * @param cellScanner the Cells to encode
+ * @param bb ByteBuffer to use. Can be null. You'd pass in a ByteBuffer if you want to practice
+ * recycling. If the passed-in ByteBuffer is too small, it is discarded and a new one allocated,
+ * so you will get back either the passed-in ByteBuffer or a new, right-sized one. SIDE EFFECT!!!!!
+ * @return Null or a byte buffer filled with a cell block of the passed-in Cells encoded using the
+ * passed-in codec and/or compressor; the returned buffer has been
+ * flipped and is ready for reading. Use limit to find total size.
+ * @throws IOException if we fail encoding the Cells
+ */
+ @SuppressWarnings("resource")
+ public ByteBuffer buildCellBlock(final Codec codec, final CompressionCodec compressor,
+ final CellScanner cellScanner, final ByteBuffer bb)
+ throws IOException {
if (cellScanner == null) return null;
if (codec == null) throw new CellScannerButNoCodecException();
int bufferSize = this.cellBlockBuildingInitialBufferSize;
- if (cellScanner instanceof HeapSize) {
- long longSize = ((HeapSize)cellScanner).heapSize();
- // Just make sure we don't have a size bigger than an int.
- if (longSize > Integer.MAX_VALUE) {
- throw new IOException("Size " + longSize + " > " + Integer.MAX_VALUE);
+ ByteBufferOutputStream baos = null;
+ if (bb != null) {
+ bufferSize = bb.capacity();
+ baos = new ByteBufferOutputStream(bb);
+ } else {
+ // Then we need to make our own to return.
+ if (cellScanner instanceof HeapSize) {
+ long longSize = ((HeapSize)cellScanner).heapSize();
+ // Just make sure we don't have a size bigger than an int.
+ if (longSize > Integer.MAX_VALUE) {
+ throw new IOException("Size " + longSize + " > " + Integer.MAX_VALUE);
+ }
+ bufferSize = ClassSize.align((int)longSize);
}
- bufferSize = ClassSize.align((int)longSize);
- } // TODO: Else, get estimate on size of buffer rather than have the buffer resize.
- // See TestIPCUtil main for experiment where we spin through the Cells getting estimate of
- // total size before creating the buffer. It costs somw small percentage. If we are usually
- // within the estimated buffer size, then the cost is not worth it. If we are often well
- // outside the guesstimated buffer size, the processing can be done in half the time if we
- // go w/ the estimated size rather than let the buffer resize.
- ByteBufferOutputStream baos = new ByteBufferOutputStream(bufferSize);
+ baos = new ByteBufferOutputStream(bufferSize);
+ }
OutputStream os = baos;
Compressor poolCompressor = null;
try {
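
Review note: the recycle contract of the new four-argument buildCellBlock is easy to get
wrong, so here is a minimal caller sketch. The method name, the 'pool' parameter, and the
null compressor are illustrative assumptions, not part of this patch.

  void respond(IPCUtil util, Codec codec, CellScanner cells, BoundedByteBufferPool pool)
      throws IOException {
    // Pass a pooled buffer in; what comes back may be the same buffer or, if the
    // pooled one was too small, a brand new right-sized one.
    ByteBuffer cellBlock = util.buildCellBlock(codec, null, cells, pool.getBuffer());
    if (cellBlock != null) {
      // The buffer arrives flipped; limit() carries the total cell block size.
      // ... write cellBlock to the wire here ...
      pool.putBuffer(cellBlock); // recycle what came BACK, not what was passed in
    }
  }
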
diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/io/BoundedByteBufferPool.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/io/BoundedByteBufferPool.java
new file mode 100644
index 0000000..10b9bf6
--- /dev/null
+++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/io/BoundedByteBufferPool.java
@@ -0,0 +1,110 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.io;
+
+import java.nio.ByteBuffer;
+import java.util.Queue;
+import java.util.concurrent.ArrayBlockingQueue;
+import java.util.concurrent.atomic.AtomicLong;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+
+import com.google.common.annotations.VisibleForTesting;
+
+/**
+ * Like Hadoop's ByteBufferPool, except you do not specify a desired size when getting a
+ * ByteBuffer. This pool keeps an upper bound on the count of ByteBuffers in the pool and on
+ * the maximum size of ByteBuffer that it will retain (hence the pool is 'bounded', as opposed
+ * to, say, Hadoop's ElasticByteBufferPool).
+ * If a ByteBuffer is bigger than the configured threshold, we will just let the ByteBuffer go
+ * rather than add it to the pool. If the pool already holds the configured maximum count of
+ * instances, we will not add the passed ByteBuffer to the pool; we will just drop it
+ * (we will log a WARN in this case that we are at capacity).
+ *
+ * <p>The intended use case is a reservoir of ByteBuffers that an RPC can reuse; buffers tend
+ * to achieve a particular 'run' size over time, give or take a few extremes. Set TRACE level
+ * on this class for a couple of seconds to get reporting on how it is running when deployed.
+ *
+ * <p>This class is thread safe.
+ */
+@InterfaceAudience.Private
+public class BoundedByteBufferPool {
+ private static final Log LOG = LogFactory.getLog(BoundedByteBufferPool.class);
+
+ @VisibleForTesting
+ final Queue<ByteBuffer> buffers;
+
+ // Maximum size of a ByteBuffer to retain in pool
+ private final int maxByteBufferSizeToCache;
+
+ // A running average that only rises; it never recedes
+ private volatile int runningAverage;
+
+ // Scratch that keeps a rough running total of the capacity of pooled ByteBuffers
+ private volatile int totalReservoirCapacity;
+
+ // For reporting
+ private AtomicLong allocations = new AtomicLong(0);
+
+ /**
+ * @param maxByteBufferSizeToCache maximum size of a ByteBuffer this pool will retain
+ * @param initialByteBufferSize size to allocate at first, until the running average rises
+ * @param maxToCache maximum count of ByteBuffers to keep in the pool
+ */
+ public BoundedByteBufferPool(final int maxByteBufferSizeToCache, final int initialByteBufferSize,
+ final int maxToCache) {
+ this.maxByteBufferSizeToCache = maxByteBufferSizeToCache;
+ this.runningAverage = initialByteBufferSize;
+ this.buffers = new ArrayBlockingQueue<ByteBuffer>(maxToCache, true);
+ }
+
+ public ByteBuffer getBuffer() {
+ ByteBuffer bb = this.buffers.poll();
+ if (bb != null) {
+ // Clear sets limit == capacity. Position == 0.
+ bb.clear();
+ this.totalReservoirCapacity -= bb.capacity();
+ } else {
+ bb = ByteBuffer.allocate(this.runningAverage);
+ this.allocations.incrementAndGet();
+ }
+ if (LOG.isTraceEnabled()) {
+ LOG.trace("runningAverage=" + this.runningAverage +
+ ", totalCapacity=" + this.totalReservoirCapacity + ", count=" + this.buffers.size() +
+ ", alloctions=" + this.allocations.get());
+ }
+ return bb;
+ }
+
+ public void putBuffer(ByteBuffer bb) {
+ // If buffer is larger than we want to keep around, just let it go.
+ if (bb.capacity() > this.maxByteBufferSizeToCache) return;
+ if (!this.buffers.offer(bb)) {
+ LOG.warn("At capacity: " + this.buffers.size());
+ } else {
+ int size = this.buffers.size(); // This size may be inexact.
+ this.totalReservoirCapacity += bb.capacity();
+ int average = this.totalReservoirCapacity / size;
+ if (average > this.runningAverage && average < this.maxByteBufferSizeToCache) {
+ this.runningAverage = average;
+ }
+ }
+ }
+}
\ No newline at end of file
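
A hedged usage sketch of the new pool (the class name and numbers below are illustrative;
only the three-argument constructor, getBuffer(), and putBuffer() come from this patch):

  import java.nio.ByteBuffer;
  import org.apache.hadoop.hbase.io.BoundedByteBufferPool;

  public class PoolExample {
    public static void main(String[] args) {
      // Retain buffers up to 1MB each, allocate 16KB until the running average rises,
      // and keep at most 20 instances pooled.
      BoundedByteBufferPool pool = new BoundedByteBufferPool(1024 * 1024, 16 * 1024, 20);
      ByteBuffer bb = pool.getBuffer(); // pool is empty, so allocates at the initial size
      // ... fill and drain bb ...
      pool.putBuffer(bb);               // recycled; the next getBuffer() returns it cleared
    }
  }
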
diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/io/ByteBufferOutputStream.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/io/ByteBufferOutputStream.java
index 257b850..af12113 100644
--- a/hbase-common/src/main/java/org/apache/hadoop/hbase/io/ByteBufferOutputStream.java
+++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/io/ByteBufferOutputStream.java
@@ -43,17 +43,32 @@ public class ByteBufferOutputStream extends OutputStream {
}
public ByteBufferOutputStream(int capacity, boolean useDirectByteBuffer) {
- if (useDirectByteBuffer) {
- buf = ByteBuffer.allocateDirect(capacity);
- } else {
- buf = ByteBuffer.allocate(capacity);
- }
+ this(allocate(capacity, useDirectByteBuffer));
+ }
+
+ /**
+ * @param bb ByteBuffer to use. If too small, it will be discarded and a new one allocated in
+ * its place; i.e. the buffer you get back from {@link #getByteBuffer()} may NOT be the one you
+ * passed in! Minimally it will be altered. SIDE EFFECT!!
+ * If you want the newly allocated ByteBuffer, pick it up when
+ * done with this instance by calling {@link #getByteBuffer()}. All this encapsulation violation
+ * is so we can recycle buffers rather than allocate each time; allocation can get expensive,
+ * especially with big buffers, as can letting an undersized buffer repeatedly resize.
+ * @see #getByteBuffer()
+ */
+ public ByteBufferOutputStream(final ByteBuffer bb) {
+ this.buf = bb;
+ this.buf.clear();
}
public int size() {
return buf.position();
}
+ private static ByteBuffer allocate(final int capacity, final boolean useDirectByteBuffer) {
+ return useDirectByteBuffer? ByteBuffer.allocateDirect(capacity): ByteBuffer.allocate(capacity);
+ }
+
/**
* This flips the underlying BB so be sure to use it _last_!
* @return ByteBuffer
@@ -70,12 +85,7 @@ public class ByteBufferOutputStream extends OutputStream {
int newSize = (int)Math.min((((long)buf.capacity()) * 2),
(long)(Integer.MAX_VALUE));
newSize = Math.max(newSize, buf.position() + extra);
- ByteBuffer newBuf = null;
- if (buf.isDirect()) {
- newBuf = ByteBuffer.allocateDirect(newSize);
- } else {
- newBuf = ByteBuffer.allocate(newSize);
- }
+ ByteBuffer newBuf = allocate(newSize, buf.isDirect());
buf.flip();
newBuf.put(buf);
buf = newBuf;
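
The new constructor's side effect is the subtle part of this change; below is a minimal
sketch of the contract (the class name and string contents are illustrative assumptions):

  import java.nio.ByteBuffer;
  import org.apache.hadoop.hbase.io.ByteBufferOutputStream;
  import org.apache.hadoop.hbase.util.Bytes;

  public class ReuseExample {
    public static void main(String[] args) throws Exception {
      ByteBuffer small = ByteBuffer.allocate(4);
      try (ByteBufferOutputStream bbos = new ByteBufferOutputStream(small)) {
        bbos.write(Bytes.toBytes("more than four bytes")); // overflows; forces a resize
        ByteBuffer actual = bbos.getByteBuffer();          // flipped, ready for reading
        // SIDE EFFECT: 'small' was discarded internally, so recycle 'actual' instead.
        System.out.println(actual == small);               // prints false after the resize
      }
    }
  }
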
diff --git a/hbase-common/src/test/java/org/apache/hadoop/hbase/io/TestBoundedByteBufferPool.java b/hbase-common/src/test/java/org/apache/hadoop/hbase/io/TestBoundedByteBufferPool.java
new file mode 100644
index 0000000..0d45287
--- /dev/null
+++ b/hbase-common/src/test/java/org/apache/hadoop/hbase/io/TestBoundedByteBufferPool.java
@@ -0,0 +1,88 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.io;
+
+import static org.junit.Assert.assertEquals;
+
+import java.nio.ByteBuffer;
+
+import org.apache.hadoop.hbase.testclassification.IOTests;
+import org.apache.hadoop.hbase.testclassification.SmallTests;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+@Category({ IOTests.class, SmallTests.class })
+public class TestBoundedByteBufferPool {
+ final int maxByteBufferSizeToCache = 10;
+ final int initialByteBufferSize = 1;
+ final int maxToCache = 10;
+ BoundedByteBufferPool reservoir;
+
+ @Before
+ public void before() {
+ this.reservoir =
+ new BoundedByteBufferPool(maxByteBufferSizeToCache, initialByteBufferSize, maxToCache);
+ }
+
+ @After
+ public void after() {
+ this.reservoir = null;
+ }
+
+ @Test
+ public void testEquivalence() {
+ ByteBuffer bb = ByteBuffer.allocate(1);
+ this.reservoir.putBuffer(bb);
+ this.reservoir.putBuffer(bb);
+ this.reservoir.putBuffer(bb);
+ assertEquals(3, this.reservoir.buffers.size());
+ }
+
+ @Test
+ public void testGetPut() {
+ ByteBuffer bb = this.reservoir.getBuffer();
+ assertEquals(initialByteBufferSize, bb.capacity());
+ assertEquals(0, this.reservoir.buffers.size());
+ this.reservoir.putBuffer(bb);
+ assertEquals(1, this.reservoir.buffers.size());
+ // Now remove a buffer and don't put it back so reservoir is empty.
+ this.reservoir.getBuffer();
+ assertEquals(0, this.reservoir.buffers.size());
+ // Try adding in a buffer with a bigger-than-initial size and see if our runningAverage works.
+ // Need to add then remove, then get a new bytebuffer so reservoir internally is doing
+ // allocation
+ final int newCapacity = 2;
+ this.reservoir.putBuffer(ByteBuffer.allocate(newCapacity));
+ assertEquals(1, reservoir.buffers.size());
+ this.reservoir.getBuffer();
+ assertEquals(0, this.reservoir.buffers.size());
+ bb = this.reservoir.getBuffer();
+ assertEquals(newCapacity, bb.capacity());
+ // Assert that adding a too-big buffer won't happen
+ assertEquals(0, this.reservoir.buffers.size());
+ this.reservoir.putBuffer(ByteBuffer.allocate(maxByteBufferSizeToCache * 2));
+ assertEquals(0, this.reservoir.buffers.size());
+ // Assert we can't add more than max allowed instances.
+ for (int i = 0; i < maxToCache; i++) {
+ this.reservoir.putBuffer(ByteBuffer.allocate(initialByteBufferSize));
+ }
+ assertEquals(maxToCache, this.reservoir.buffers.size());
+ }
+}
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcServer.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcServer.java
index 064771c..89f1be3 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcServer.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcServer.java
@@ -82,6 +82,7 @@ import org.apache.hadoop.hbase.client.Operation;
import org.apache.hadoop.hbase.codec.Codec;
import org.apache.hadoop.hbase.exceptions.RegionMovedException;
import org.apache.hadoop.hbase.io.ByteBufferOutputStream;
+import org.apache.hadoop.hbase.io.BoundedByteBufferPool;
import org.apache.hadoop.hbase.monitoring.MonitoredRPCHandler;
import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
import org.apache.hadoop.hbase.protobuf.generated.RPCProtos.CellBlockMeta;
@@ -267,6 +268,9 @@ public class RpcServer implements RpcServerInterface {
private UserProvider userProvider;
+ private final BoundedByteBufferPool reservoir;
+
+
/**
* Datastructure that holds all necessary to a method invocation and then afterward, carries
* the result.
@@ -293,6 +297,7 @@ public class RpcServer implements RpcServerInterface {
protected long size; // size of current call
protected boolean isError;
protected TraceInfo tinfo;
+ private ByteBuffer cellBlock = null;
Call(int id, final BlockingService service, final MethodDescriptor md, RequestHeader header,
Message param, CellScanner cellScanner, Connection connection, Responder responder,
@@ -313,6 +318,19 @@ public class RpcServer implements RpcServerInterface {
this.tinfo = tinfo;
}
+ /**
+ * Call is done. Execution happened and we returned results to client. It is now safe to
+ * cleanup.
+ */
+ void done() {
+ if (this.cellBlock != null) {
+ // Return buffer to reservoir now we are done with it.
+ reservoir.putBuffer(this.cellBlock);
+ this.cellBlock = null;
+ }
+ this.connection.decRpcCount(); // Say that we're done with this call.
+ }
+
@Override
public String toString() {
return toShortString() + " param: " +
@@ -375,12 +393,17 @@ public class RpcServer implements RpcServerInterface {
// Set the exception as the result of the method invocation.
headerBuilder.setException(exceptionBuilder.build());
}
- ByteBuffer cellBlock =
- ipcUtil.buildCellBlock(this.connection.codec, this.connection.compressionCodec, cells);
- if (cellBlock != null) {
+ // Get a bb from the reservoir and pass it to buildCellBlock. What comes back will be the
+ // passed in reservoir bb or a resized one that we should instead add back to the reservoir
+ // when done. Keep reference so can add it back to the reservoir when finished. This is
+ // hacky and the hack is not contained but benefits are high when we can avoid a big buffer
+ // allocation on each rpc.
+ this.cellBlock = ipcUtil.buildCellBlock(this.connection.codec,
+ this.connection.compressionCodec, cells, reservoir.getBuffer());
+ if (this.cellBlock != null) {
CellBlockMeta.Builder cellBlockBuilder = CellBlockMeta.newBuilder();
// Presumes the cellBlock bytebuffer has been flipped so limit has total size in it.
- cellBlockBuilder.setLength(cellBlock.limit());
+ cellBlockBuilder.setLength(this.cellBlock.limit());
headerBuilder.setCellBlockMeta(cellBlockBuilder.build());
}
Message header = headerBuilder.build();
@@ -390,9 +413,9 @@ public class RpcServer implements RpcServerInterface {
ByteBuffer bbHeader = IPCUtil.getDelimitedMessageAsByteBuffer(header);
ByteBuffer bbResult = IPCUtil.getDelimitedMessageAsByteBuffer(result);
int totalSize = bbHeader.capacity() + (bbResult == null? 0: bbResult.limit()) +
- (cellBlock == null? 0: cellBlock.limit());
+ (this.cellBlock == null? 0: this.cellBlock.limit());
ByteBuffer bbTotalSize = ByteBuffer.wrap(Bytes.toBytes(totalSize));
- bc = new BufferChain(bbTotalSize, bbHeader, bbResult, cellBlock);
+ bc = new BufferChain(bbTotalSize, bbHeader, bbResult, this.cellBlock);
if (connection.useWrap) {
bc = wrapWithSasl(bc);
}
@@ -1051,7 +1074,7 @@ public class RpcServer implements RpcServerInterface {
}
if (!call.response.hasRemaining()) {
- call.connection.decRpcCount(); // Say that we're done with this call.
+ call.done();
return true;
} else {
return false; // Socket can't take more, we will have to come back.
@@ -1885,7 +1908,13 @@ public class RpcServer implements RpcServerInterface {
final InetSocketAddress bindAddress, Configuration conf,
RpcScheduler scheduler)
throws IOException {
-
+ this.reservoir = new BoundedByteBufferPool(
+ conf.getInt("hbase.ipc.server.reservoir.max.buffer.size", 1024 * 1024),
+ conf.getInt("hbase.ipc.server.reservoir.initial.buffer.size", 16 * 1024),
+ // Make the max twice the number of handlers to be safe.
+ conf.getInt("hbase.ipc.server.reservoir.initial.max",
+ conf.getInt(HConstants.REGION_SERVER_HANDLER_COUNT,
+ HConstants.DEFAULT_REGION_SERVER_HANDLER_COUNT) * 2));
this.server = server;
this.services = services;
this.bindAddress = bindAddress;
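
The reservoir's defaults derive from the handler count; here is a hedged sketch of that
arithmetic using the config keys introduced above (the class name is illustrative):

  import org.apache.hadoop.conf.Configuration;
  import org.apache.hadoop.hbase.HConstants;

  public class ReservoirSizing {
    public static void main(String[] args) {
      Configuration conf = new Configuration();
      // Mirrors the RpcServer constructor: cap the pool at twice the handler count
      // so every handler can hold a buffer in flight with headroom to spare.
      int handlers = conf.getInt(HConstants.REGION_SERVER_HANDLER_COUNT,
          HConstants.DEFAULT_REGION_SERVER_HANDLER_COUNT);
      int maxBuffers = conf.getInt("hbase.ipc.server.reservoir.initial.max", handlers * 2);
      int maxSize = conf.getInt("hbase.ipc.server.reservoir.max.buffer.size", 1024 * 1024);
      System.out.println("reservoir caches up to " + maxBuffers
          + " ByteBuffers of at most " + maxSize + " bytes each");
    }
  }
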
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/TestByteBufferOutputStream.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/TestByteBufferOutputStream.java
new file mode 100644
index 0000000..55f8dda
--- /dev/null
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/TestByteBufferOutputStream.java
@@ -0,0 +1,49 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.io;
+
+import static org.junit.Assert.assertTrue;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+
+import org.apache.hadoop.hbase.testclassification.SmallTests;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+@Category(SmallTests.class)
+public class TestByteBufferOutputStream {
+ @Test
+ public void testByteBufferReuse() throws IOException {
+ byte [] someBytes = Bytes.toBytes("some bytes");
+ ByteBuffer bb = ByteBuffer.allocate(someBytes.length);
+ ByteBuffer bbToReuse = write(bb, someBytes);
+ bbToReuse = write(bbToReuse, Bytes.toBytes("less"));
+ assertTrue(bb == bbToReuse);
+ }
+
+ private ByteBuffer write(final ByteBuffer bb, final byte [] bytes) throws IOException {
+ try (ByteBufferOutputStream bbos = new ByteBufferOutputStream(bb)) {
+ bbos.write(bytes);
+ assertTrue(Bytes.compareTo(bytes, bbos.toByteArray(0, bytes.length)) == 0);
+ bbos.flush();
+ return bbos.getByteBuffer();
+ }
+ }
+}
\ No newline at end of file