diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/IPCUtil.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/IPCUtil.java
index d98d81d..c08feb8 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/IPCUtil.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/IPCUtil.java
@@ -33,10 +33,10 @@ import org.apache.hadoop.hbase.CellScanner;
 import org.apache.hadoop.hbase.DoNotRetryIOException;
 import org.apache.hadoop.hbase.HBaseIOException;
 import org.apache.hadoop.hbase.codec.Codec;
-import org.apache.hadoop.hbase.io.BoundedByteBufferPool;
 import org.apache.hadoop.hbase.io.ByteBufferInputStream;
 import org.apache.hadoop.hbase.io.ByteBufferOutputStream;
-import org.apache.hadoop.hbase.io.HeapSize;
+import org.apache.hadoop.hbase.io.ByteBufferPool;
+import org.apache.hadoop.hbase.io.PoolAwareByteBuffersOutputStream;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.ClassSize;
 import org.apache.hadoop.io.compress.CodecPool;
@@ -95,48 +95,26 @@ public class IPCUtil {
   public ByteBuffer buildCellBlock(final Codec codec, final CompressionCodec compressor,
       final CellScanner cellScanner) throws IOException {
-    return buildCellBlock(codec, compressor, cellScanner, null);
-  }
-
-  /**
-   * Puts CellScanner Cells into a cell block using passed in codec and/or
-   * compressor.
-   * @param codec
-   * @param compressor
-   * @param cellScanner
-   * @param pool Pool of ByteBuffers to make use of. Can be null and then we'll allocate
-   * our own ByteBuffer.
-   * @return Null or byte buffer filled with a cellblock filled with passed-in Cells encoded using
-   * passed in codec and/or compressor; the returned buffer has been
-   * flipped and is ready for reading. Use limit to find total size. If pool was not
-   * null, then this returned ByteBuffer came from there and should be returned to the pool when
-   * done.
-   * @throws IOException
-   */
-  @SuppressWarnings("resource")
-  public ByteBuffer buildCellBlock(final Codec codec, final CompressionCodec compressor,
-      final CellScanner cellScanner, final BoundedByteBufferPool pool) throws IOException {
     if (cellScanner == null) return null;
     if (codec == null) throw new CellScannerButNoCodecException();
     int bufferSize = this.cellBlockBuildingInitialBufferSize;
-    ByteBufferOutputStream baos = null;
-    if (pool != null) {
-      ByteBuffer bb = pool.getBuffer();
-      bufferSize = bb.capacity();
-      baos = new ByteBufferOutputStream(bb);
-    } else {
-      // Then we need to make our own to return.
-      if (cellScanner instanceof HeapSize) {
-        long longSize = ((HeapSize)cellScanner).heapSize();
-        // Just make sure we don't have a size bigger than an int.
-        if (longSize > Integer.MAX_VALUE) {
-          throw new IOException("Size " + longSize + " > " + Integer.MAX_VALUE);
-        }
-        bufferSize = ClassSize.align((int)longSize);
+    ByteBufferOutputStream baos = new ByteBufferOutputStream(bufferSize);
+    encodeCellsTo(baos, cellScanner, codec, compressor);
+    if (LOG.isTraceEnabled()) {
+      if (bufferSize < baos.size()) {
+        LOG.trace("Buffer grew from initial bufferSize=" + bufferSize + " to " + baos.size()
+            + "; up hbase.ipc.cellblock.building.initial.buffersize?");
       }
-      baos = new ByteBufferOutputStream(bufferSize);
     }
+    ByteBuffer bb = baos.getByteBuffer();
+    // If no cells, don't mess around. Just return null (could be a bunch of existence checking
+    // gets or something -- stuff that does not return a cell).
+    if (!bb.hasRemaining()) return null;
+    return bb;
+  }
+
+  private void encodeCellsTo(ByteBufferOutputStream baos, CellScanner cellScanner, Codec codec,
+      CompressionCodec compressor) throws IOException {
     OutputStream os = baos;
     Compressor poolCompressor = null;
     try {
@@ -146,28 +124,43 @@ public class IPCUtil {
         os = compressor.createOutputStream(os, poolCompressor);
       }
       Codec.Encoder encoder = codec.getEncoder(os);
-      int count = 0;
       while (cellScanner.advance()) {
         encoder.write(cellScanner.current());
-        count++;
       }
       encoder.flush();
-      // If no cells, don't mess around. Just return null (could be a bunch of existence checking
-      // gets or something -- stuff that does not return a cell).
-      if (count == 0) return null;
     } catch (BufferOverflowException e) {
       throw new DoNotRetryIOException(e);
     } finally {
-      os.close();
       if (poolCompressor != null) CodecPool.returnCompressor(poolCompressor);
     }
-    if (LOG.isTraceEnabled()) {
-      if (bufferSize < baos.size()) {
-        LOG.trace("Buffer grew from initial bufferSize=" + bufferSize + " to " + baos.size()
-            + "; up hbase.ipc.cellblock.building.initial.buffersize?");
-      }
-    }
-    return baos.getByteBuffer();
+  }
+
+  /**
+   * Puts CellScanner Cells into a cell block using passed in codec and/or compressor.
+   * @param codec
+   * @param compressor
+   * @param cellScanner
+   * @param pool Pool of ByteBuffers to make use of. Must not be null.
+   * @return Null or a PoolAwareByteBuffersOutputStream whose buffers hold a cellblock of the
+   *         passed-in Cells encoded using the passed in codec and/or compressor; the buffers
+   *         have been flipped and are ready for reading. Use size() to find the total cellblock
+   *         length. The buffers came from the passed-in pool; close the stream when done so they
+   *         are returned to it.
+   * @throws IOException
+   */
+  @SuppressWarnings("resource")
+  public PoolAwareByteBuffersOutputStream buildCellBlockStream(final Codec codec,
+      final CompressionCodec compressor, final CellScanner cellScanner, final ByteBufferPool pool)
+      throws IOException {
+    if (cellScanner == null) return null;
+    if (codec == null) throw new CellScannerButNoCodecException();
+    assert pool != null;
+    PoolAwareByteBuffersOutputStream pbbos = new PoolAwareByteBuffersOutputStream(pool);
+    encodeCellsTo(pbbos, cellScanner, codec, compressor);
+    if (pbbos.size() == 0) return null;
+    return pbbos;
   }
 
   /**
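
The refactor above makes both build paths share `encodeCellsTo`; the empty-cellblock check moves from counting cells to checking whether any bytes were actually encoded. A rough caller-side sketch of how the two paths are meant to be used (`respond`/`send` are hypothetical stand-ins; the real wiring is in the RpcServer hunk further down, where the stream is closed later via `Call.done()` rather than immediately):

```java
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.Collections;
import java.util.List;

import org.apache.hadoop.hbase.CellScanner;
import org.apache.hadoop.hbase.codec.Codec;
import org.apache.hadoop.hbase.io.ByteBufferPool;
import org.apache.hadoop.hbase.io.PoolAwareByteBuffersOutputStream;
import org.apache.hadoop.hbase.ipc.IPCUtil;
import org.apache.hadoop.io.compress.CompressionCodec;

public class CellBlockBuildSketch {
  static void respond(IPCUtil ipcUtil, Codec codec, CompressionCodec compressor,
      CellScanner cells, ByteBufferPool reservoir) throws IOException {
    if (reservoir == null) {
      // Unpooled path: one on-heap ByteBuffer, already flipped; null when there were no cells.
      ByteBuffer bb = ipcUtil.buildCellBlock(codec, compressor, cells);
      if (bb != null) send(Collections.singletonList(bb));
      return;
    }
    // Pooled path: the stream's buffers come from the reservoir.
    PoolAwareByteBuffersOutputStream stream =
        ipcUtil.buildCellBlockStream(codec, compressor, cells, reservoir);
    if (stream != null) {
      try {
        send(stream.getByteBuffers()); // buffers are flipped, ready to read
      } finally {
        stream.close(); // hands the reservoir's buffers back to the pool
      }
    }
  }

  private static void send(List<ByteBuffer> cellBlock) {
    // Stand-in for queueing the response BufferChain.
  }
}
```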
diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/io/ByteBufferOutputStream.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/io/ByteBufferOutputStream.java
index d4bda18..e22d72d 100644
--- a/hbase-common/src/main/java/org/apache/hadoop/hbase/io/ByteBufferOutputStream.java
+++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/io/ByteBufferOutputStream.java
@@ -44,7 +44,11 @@ public class ByteBufferOutputStream extends OutputStream
   // http://grepcode.com/file/repository.grepcode.com/java/root/jdk/openjdk/8-b132/java/util/ArrayList.java#221
   private static final int MAX_ARRAY_SIZE = Integer.MAX_VALUE - 8;
 
-  protected ByteBuffer buf;
+  protected ByteBuffer curBuf;
+
+  ByteBufferOutputStream() {
+
+  }
 
   public ByteBufferOutputStream(int capacity) {
     this(capacity, false);
@@ -66,12 +70,12 @@
    */
   public ByteBufferOutputStream(final ByteBuffer bb) {
     assert bb.order() == ByteOrder.BIG_ENDIAN;
-    this.buf = bb;
-    this.buf.clear();
+    this.curBuf = bb;
+    this.curBuf.clear();
   }
 
   public int size() {
-    return buf.position();
+    return curBuf.position();
   }
 
   private static ByteBuffer allocate(final int capacity, final boolean useDirectByteBuffer) {
@@ -86,25 +90,25 @@
    * @return ByteBuffer
    */
   public ByteBuffer getByteBuffer() {
-    buf.flip();
-    return buf;
+    curBuf.flip();
+    return curBuf;
   }
 
-  private void checkSizeAndGrow(int extra) {
-    long capacityNeeded = buf.position() + (long) extra;
-    if (capacityNeeded > buf.limit()) {
+  protected void checkSizeAndGrow(int extra) {
+    long capacityNeeded = curBuf.position() + (long) extra;
+    if (capacityNeeded > curBuf.limit()) {
       // guarantee it's possible to fit
       if (capacityNeeded > MAX_ARRAY_SIZE) {
         throw new BufferOverflowException();
       }
       // double until hit the cap
-      long nextCapacity = Math.min(buf.capacity() * 2L, MAX_ARRAY_SIZE);
+      long nextCapacity = Math.min(curBuf.capacity() * 2L, MAX_ARRAY_SIZE);
       // but make sure there is enough if twice the existing capacity is still too small
       nextCapacity = Math.max(nextCapacity, capacityNeeded);
-      ByteBuffer newBuf = allocate((int) nextCapacity, buf.isDirect());
-      buf.flip();
-      ByteBufferUtils.copyFromBufferToBuffer(buf, newBuf);
-      buf = newBuf;
+      ByteBuffer newBuf = allocate((int) nextCapacity, curBuf.isDirect());
+      curBuf.flip();
+      ByteBufferUtils.copyFromBufferToBuffer(curBuf, newBuf);
+      curBuf = newBuf;
     }
   }
 
@@ -112,7 +116,7 @@
   @Override
   public void write(int b) throws IOException {
     checkSizeAndGrow(Bytes.SIZEOF_BYTE);
-    buf.put((byte)b);
+    curBuf.put((byte)b);
   }
 
   /**
@@ -122,9 +126,9 @@
    * @param out the output stream to which to write the data.
    * @exception IOException if an I/O error occurs.
    */
-  public synchronized void writeTo(OutputStream out) throws IOException {
+  public void writeTo(OutputStream out) throws IOException {
     WritableByteChannel channel = Channels.newChannel(out);
-    ByteBuffer bb = buf.duplicate();
+    ByteBuffer bb = curBuf.duplicate();
     bb.flip();
     channel.write(bb);
   }
@@ -137,12 +141,12 @@
   @Override
   public void write(byte[] b, int off, int len) throws IOException {
     checkSizeAndGrow(len);
-    ByteBufferUtils.copyFromArrayToBuffer(buf, b, off, len);
+    ByteBufferUtils.copyFromArrayToBuffer(curBuf, b, off, len);
   }
 
   public void write(ByteBuffer b, int off, int len) throws IOException {
     checkSizeAndGrow(len);
-    ByteBufferUtils.copyFromBufferToBuffer(b, buf, off, len);
+    ByteBufferUtils.copyFromBufferToBuffer(b, curBuf, off, len);
   }
 
   /**
@@ -153,7 +157,7 @@
    */
  public void writeInt(int i) throws IOException {
     checkSizeAndGrow(Bytes.SIZEOF_INT);
-    ByteBufferUtils.putInt(this.buf, i);
+    ByteBufferUtils.putInt(this.curBuf, i);
   }
 
   @Override
@@ -167,7 +171,7 @@
   }
 
   public byte[] toByteArray(int offset, int length) {
-    ByteBuffer bb = buf.duplicate();
+    ByteBuffer bb = curBuf.duplicate();
     bb.flip();
     byte[] chunk = new byte[length];
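
The `buf` → `curBuf` rename, the new package-private no-arg constructor, and the widening of `checkSizeAndGrow` to protected all exist so the pool-aware subclass introduced below can manage the current buffer itself. The growth rule is unchanged: double the capacity, but take at least `capacityNeeded`, capped at `MAX_ARRAY_SIZE`. A quick standalone illustration of that rule (heap-backed stream, arbitrary sizes):

```java
import java.nio.ByteBuffer;

import org.apache.hadoop.hbase.io.ByteBufferOutputStream;

public class GrowthSketch {
  public static void main(String[] args) throws Exception {
    ByteBufferOutputStream bbos = new ByteBufferOutputStream(16);
    // Needed capacity is 100: doubling 16 only gives 32, so
    // Math.max(nextCapacity, capacityNeeded) allocates exactly 100 in a single step.
    bbos.write(new byte[100], 0, 100);
    ByteBuffer bb = bbos.getByteBuffer(); // flipped: position 0, limit 100
    assert bb.remaining() == 100;
  }
}
```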
diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/io/ByteBufferPool.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/io/ByteBufferPool.java
new file mode 100644
index 0000000..e774c8a
--- /dev/null
+++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/io/ByteBufferPool.java
@@ -0,0 +1,111 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.io;
+
+import java.nio.ByteBuffer;
+import java.util.Queue;
+import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+
+/**
+ * Like Hadoop's ByteBufferPool, only you do not specify a desired size when getting a ByteBuffer.
+ * This pool keeps an upper bound on the count of ByteBuffers it will create, and every buffer it
+ * creates is of one fixed, configured size (hence the pool is 'bounded' as opposed to, say,
+ * Hadoop's ElasticByteBufferPool).
+ * If a returned ByteBuffer does not match the pool's buffer size, we will just let the ByteBuffer
+ * go rather than add it to the pool. Likewise, if the pool already holds the configured maximum
+ * number of instances, we will not add the passed ByteBuffer to the pool; we will just drop it
+ * (we will log a WARN in this case).
+ *
+ * <p>The intended use case is a reservoir of bytebuffers that an RPC can reuse; buffers tend to
+ * achieve a particular 'run' size over time give or take a few extremes. Set TRACE level on this
+ * class for a couple of seconds to get reporting on how it is running when deployed.
+ *
+ * <p>This pool returns off heap ByteBuffers.
+ *
+ * <p>This class is thread safe.
+ */
+@InterfaceAudience.Private
+public class ByteBufferPool {
+  private static final Log LOG = LogFactory.getLog(ByteBufferPool.class);
+
+  private final Queue<ByteBuffer> buffers = new ConcurrentLinkedQueue<ByteBuffer>();
+
+  private final int bufferSize;
+  private final int maxPoolSize;
+  private AtomicInteger count;
+
+  /**
+   * @param bufferSize Size of each ByteBuffer this pool creates.
+   * @param maxPoolSize Maximum number of ByteBuffers this pool will ever create and retain.
+   */
+  public ByteBufferPool(final int bufferSize, final int maxPoolSize) {
+    this.bufferSize = bufferSize;
+    this.maxPoolSize = maxPoolSize;
+    // TODO can add initialPoolSize param also and make those many BBs ready for use.
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("Created ByteBufferPool with bufferSize : " + bufferSize + " and maxPoolSize : "
+          + maxPoolSize);
+    }
+    this.count = new AtomicInteger(0);
+  }
+
+  /**
+   * @return One free ByteBuffer from the pool. If there is no free ByteBuffer and we have not yet
+   *         reached the maximum pool size, a new one is created and returned. If the max pool
+   *         size has been reached as well, this returns null. When the pool hands out a
+   *         ByteBuffer, make sure to return it back to the pool after use.
+   * @see #putBuffer(ByteBuffer)
+   */
+  public ByteBuffer getBuffer() {
+    ByteBuffer bb = buffers.poll();
+    if (bb != null) {
+      // Clear sets limit == capacity. Position == 0.
+      bb.clear();
+      return bb;
+    }
+    while (true) {
+      int c = this.count.intValue();
+      if (c >= this.maxPoolSize) {
+        return null;
+      }
+      if (!this.count.compareAndSet(c, c + 1)) {
+        continue;
+      }
+      return ByteBuffer.allocateDirect(this.bufferSize);
+    }
+  }
+
+  public void putBuffer(ByteBuffer bb) {
+    // Retain only buffers this pool created (direct, of the configured size), and only while the
+    // pool is below its maximum size.
+    if (bb.capacity() != this.bufferSize || !bb.isDirect()
+        || this.buffers.size() >= this.maxPoolSize) {
+      LOG.warn("Trying to put a buffer not created by this pool, or the pool is full!"
+          + " It will just be dropped");
+      return;
+    }
+    buffers.offer(bb);
+  }
+
+  int getBufferSize() {
+    return this.bufferSize;
+  }
+}
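
A usage sketch of the pool's get/put contract (the sizes are arbitrary): `getBuffer()` hands out cleared direct buffers until `maxPoolSize` of them exist and none are free, after which it returns null and the caller is expected to fall back to its own allocation, as the stream class below does.

```java
import java.nio.ByteBuffer;

import org.apache.hadoop.hbase.io.ByteBufferPool;

public class PoolSketch {
  public static void main(String[] args) {
    ByteBufferPool pool = new ByteBufferPool(64 * 1024, 2); // at most two 64KB direct buffers
    ByteBuffer b1 = pool.getBuffer(); // freshly allocated, direct
    ByteBuffer b2 = pool.getBuffer();
    assert pool.getBuffer() == null;  // at capacity and nothing free
    pool.putBuffer(b1);               // accepted: right size and direct
    ByteBuffer b3 = pool.getBuffer(); // the recycled b1, cleared for reuse
    assert b3 == b1;
    pool.putBuffer(ByteBuffer.allocate(64 * 1024)); // heap buffer: logged as WARN and dropped
  }
}
```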
diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/io/PoolAwareByteBuffersOutputStream.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/io/PoolAwareByteBuffersOutputStream.java
new file mode 100644
index 0000000..d49fd60
--- /dev/null
+++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/io/PoolAwareByteBuffersOutputStream.java
@@ -0,0 +1,123 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.io;
+
+import java.io.IOException;
+import java.io.OutputStream;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+
+/**
+ * An OutputStream that writes into a chain of ByteBuffers obtained from a ByteBufferPool.
+ * Not thread safe!
+ */
+@InterfaceAudience.Private
+public class PoolAwareByteBuffersOutputStream extends ByteBufferOutputStream {
+
+  private ByteBufferPool pool;
+  // Keep track of the BBs that bytes get written to. We will first try to get a BB from the pool.
+  // If none is available, we will make a new one of our own and keep writing to that. We keep
+  // track of the BBs we got from the pool separately, so that on close we can make sure to return
+  // all of them back to the pool.
+  private List<ByteBuffer> allBufs = new ArrayList<ByteBuffer>();
+  private List<ByteBuffer> bufsFromPool = new ArrayList<ByteBuffer>();
+
+  private boolean writeStopped = false;
+
+  public PoolAwareByteBuffersOutputStream(ByteBufferPool pool) {
+    this.pool = pool;
+    allocateNewBuffer();
+  }
+
+  private void allocateNewBuffer() {
+    // Get a BB to work with from the pool
+    this.curBuf = this.pool.getBuffer();
+    if (this.curBuf == null) {
+      // No free BB at this moment. Make a new one. The pool returns off heap BBs; don't make an
+      // off heap BB on demand here. It is difficult to account for all such buffers when sizing
+      // the max direct memory. See HBASE-15525 for more details.
+      // Make a BB with the same size as the pool's buffer size.
+      this.curBuf = ByteBuffer.allocate(this.pool.getBufferSize());
+    } else {
+      this.bufsFromPool.add(this.curBuf);
+    }
+    this.allBufs.add(this.curBuf);
+  }
+
+  @Override
+  public int size() {
+    int s = 0;
+    for (int i = 0; i < this.allBufs.size() - 1; i++) {
+      s += this.allBufs.get(i).remaining();
+    }
+    // The last BB might not be flipped yet if getByteBuffers has not been called
+    if (this.writeStopped) {
+      s += this.curBuf.remaining();
+    } else {
+      s += this.curBuf.position();
+    }
+    return s;
+  }
+
+  @Override
+  public ByteBuffer getByteBuffer() {
+    throw new UnsupportedOperationException("This stream is not backed by a single ByteBuffer");
+  }
+
+  @Override
+  protected void checkSizeAndGrow(int extra) {
+    long capacityNeeded = curBuf.position() + (long) extra;
+    if (capacityNeeded > curBuf.limit()) {
+      this.curBuf.flip(); // On the current buf, set limit = pos and pos = 0.
+      allocateNewBuffer();
+    }
+  }
+
+  @Override
+  public void writeTo(OutputStream out) throws IOException {
+    // No usage of this API in code. Just making it an unsupported operation as of now
+    throw new UnsupportedOperationException();
+  }
+
+  @Override
+  public void close() {
+    // Return all the BBs we got from the pool
+    if (this.bufsFromPool != null) {
+      for (int i = 0; i < this.bufsFromPool.size(); i++) {
+        this.pool.putBuffer(this.bufsFromPool.get(i));
+      }
+      this.bufsFromPool = null;
+    }
+    this.allBufs = null;
+    this.curBuf = null;
+  }
+
+  @Override
+  public byte[] toByteArray(int offset, int length) {
+    return null; // TODO
+  }
+
+  public List<ByteBuffer> getByteBuffers() {
+    this.writeStopped = true;
+    // All the other BBs are already flipped while moving to a new BB.
+    curBuf.flip();
+    return this.allBufs;
+  }
+}
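
A possible lifecycle for the stream, assuming a pool of 4KB buffers. One thing for reviewers to note: the overridden `checkSizeAndGrow` never grows the current buffer; it flips it and chains a fresh pool-sized one, so a single `write()` call larger than the pool's buffer size would still overflow the new buffer (`copyFromArrayToBuffer` copies the whole length at once). The sketch therefore writes in chunks no bigger than one buffer:

```java
import java.nio.ByteBuffer;
import java.util.List;

import org.apache.hadoop.hbase.io.ByteBufferPool;
import org.apache.hadoop.hbase.io.PoolAwareByteBuffersOutputStream;

public class StreamSketch {
  public static void main(String[] args) throws Exception {
    ByteBufferPool pool = new ByteBufferPool(4 * 1024, 16);
    PoolAwareByteBuffersOutputStream out = new PoolAwareByteBuffersOutputStream(pool);
    byte[] chunk = new byte[1024];
    for (int i = 0; i < 10; i++) {
      out.write(chunk, 0, chunk.length); // crossing 4KB flips curBuf and chains a new buffer
    }
    List<ByteBuffer> bufs = out.getByteBuffers(); // three buffers, all flipped
    assert bufs.size() == 3;
    assert out.size() == 10 * 1024;
    // ... write bufs to the wire ...
    out.close(); // returns the pooled buffers; the stream is unusable afterwards
  }
}
```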
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/BufferChain.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/BufferChain.java
index babd2f8..39efa40 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/BufferChain.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/BufferChain.java
@@ -46,6 +46,13 @@ class BufferChain {
     this.buffers = bbs.toArray(new ByteBuffer[bbs.size()]);
   }
 
+  BufferChain(List<ByteBuffer> buffers) {
+    for (ByteBuffer b : buffers) {
+      this.remaining += b.remaining();
+    }
+    this.buffers = buffers.toArray(new ByteBuffer[buffers.size()]);
+  }
+
   /**
    * Expensive. Makes a new buffer to hold a copy of what is in contained ByteBuffers. This
    * call drains this instance; it cannot be used subsequent to the call.
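
Since the new constructor totals `remaining()` across the list, every buffer handed to it must already be flipped; `ByteBuffer.wrap()` satisfies this trivially (position 0, limit = array length). A sketch of the assembly done in the RpcServer hunk below (placed in the `ipc` package because BufferChain is package-private; `headerBytes`/`cellBlock` are stand-ins):

```java
package org.apache.hadoop.hbase.ipc;

import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;

class ResponseAssemblySketch {
  static BufferChain assemble(byte[] headerBytes, List<ByteBuffer> cellBlock) {
    List<ByteBuffer> responseBufs =
        new ArrayList<ByteBuffer>((cellBlock == null ? 0 : cellBlock.size()) + 1);
    responseBufs.add(ByteBuffer.wrap(headerBytes)); // wrap(): position 0, limit = length
    if (cellBlock != null) responseBufs.addAll(cellBlock); // already flipped
    return new BufferChain(responseBufs); // remaining now totals the whole response
  }
}
```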
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcServer.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcServer.java
index 2c6084a..052a686 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcServer.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcServer.java
@@ -83,9 +83,10 @@ import org.apache.hadoop.hbase.client.VersionInfoUtil;
 import org.apache.hadoop.hbase.codec.Codec;
 import org.apache.hadoop.hbase.conf.ConfigurationObserver;
 import org.apache.hadoop.hbase.exceptions.RegionMovedException;
-import org.apache.hadoop.hbase.io.BoundedByteBufferPool;
 import org.apache.hadoop.hbase.io.ByteBufferInputStream;
 import org.apache.hadoop.hbase.io.ByteBufferOutputStream;
+import org.apache.hadoop.hbase.io.ByteBufferPool;
+import org.apache.hadoop.hbase.io.PoolAwareByteBuffersOutputStream;
 import org.apache.hadoop.hbase.monitoring.MonitoredRPCHandler;
 import org.apache.hadoop.hbase.monitoring.TaskMonitor;
 import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
@@ -281,7 +282,7 @@
 
   private UserProvider userProvider;
 
-  private final BoundedByteBufferPool reservoir;
+  private final ByteBufferPool reservoir;
 
   private volatile boolean allowFallbackToSimpleAuth;
 
@@ -309,7 +310,7 @@
     protected long size;                          // size of current call
     protected boolean isError;
     protected TraceInfo tinfo;
-    private ByteBuffer cellBlock = null;
+    private PoolAwareByteBuffersOutputStream cellBlockStream = null;
 
     private User user;
     private InetAddress remoteAddress;
@@ -350,10 +351,9 @@
     @edu.umd.cs.findbugs.annotations.SuppressWarnings(value="IS2_INCONSISTENT_SYNC",
         justification="Presume the lock on processing request held by caller is protection enough")
     void done() {
-      if (this.cellBlock != null && reservoir != null) {
-        // Return buffer to reservoir now we are done with it.
-        reservoir.putBuffer(this.cellBlock);
-        this.cellBlock = null;
+      if (this.cellBlockStream != null) {
+        this.cellBlockStream.close(); // This will return back the BBs which we got from pool.
+        this.cellBlockStream = null;
       }
       this.connection.decRpcCount();  // Say that we're done with this call.
     }
@@ -405,38 +405,43 @@
       // Call id.
       headerBuilder.setCallId(this.id);
       if (t != null) {
-        ExceptionResponse.Builder exceptionBuilder = ExceptionResponse.newBuilder();
-        exceptionBuilder.setExceptionClassName(t.getClass().getName());
-        exceptionBuilder.setStackTrace(errorMsg);
-        exceptionBuilder.setDoNotRetry(t instanceof DoNotRetryIOException);
-        if (t instanceof RegionMovedException) {
-          // Special casing for this exception. This is only one carrying a payload.
-          // Do this instead of build a generic system for allowing exceptions carry
-          // any kind of payload.
-          RegionMovedException rme = (RegionMovedException)t;
-          exceptionBuilder.setHostname(rme.getHostname());
-          exceptionBuilder.setPort(rme.getPort());
-        }
-        // Set the exception as the result of the method invocation.
-        headerBuilder.setException(exceptionBuilder.build());
+        setExceptionResponse(t, errorMsg, headerBuilder);
       }
       // Pass reservoir to buildCellBlock. Keep reference to what is returned so we can add it
      // back to the reservoir when finished. This is hacky and the hack is not contained but
      // benefits are high when we can avoid a big buffer allocation on each rpc.
-      this.cellBlock = ipcUtil.buildCellBlock(this.connection.codec,
-        this.connection.compressionCodec, cells, reservoir);
-      if (this.cellBlock != null) {
+      List<ByteBuffer> cellBlock = null;
+      int cellBlockSize = 0;
+      if (reservoir != null) {
+        this.cellBlockStream = ipcUtil.buildCellBlockStream(this.connection.codec,
+            this.connection.compressionCodec, cells, reservoir);
+        if (this.cellBlockStream != null) {
+          cellBlock = this.cellBlockStream.getByteBuffers();
+          cellBlockSize = this.cellBlockStream.size();
+        }
+      } else {
+        ByteBuffer b = ipcUtil.buildCellBlock(this.connection.codec,
+            this.connection.compressionCodec, cells);
+        if (b != null) {
+          cellBlockSize = b.remaining();
+          cellBlock = new ArrayList<ByteBuffer>(1);
+          cellBlock.add(b);
+        }
+      }
+
+      if (cellBlockSize > 0) {
         CellBlockMeta.Builder cellBlockBuilder = CellBlockMeta.newBuilder();
         // Presumes the cellBlock bytebuffer has been flipped so limit has total size in it.
-        cellBlockBuilder.setLength(this.cellBlock.limit());
+        cellBlockBuilder.setLength(cellBlockSize);
         headerBuilder.setCellBlockMeta(cellBlockBuilder.build());
       }
       Message header = headerBuilder.build();
-
-      byte[] b = createHeaderAndMessageBytes(result, header);
-
-      bc = new BufferChain(ByteBuffer.wrap(b), this.cellBlock);
-
+      byte[] b = createHeaderAndMessageBytes(result, header, cellBlockSize);
+      List<ByteBuffer> responseBufs = new ArrayList<ByteBuffer>(
+          (cellBlock == null ? 0 : cellBlock.size()) + 1);
+      responseBufs.add(ByteBuffer.wrap(b));
+      if (cellBlock != null) responseBufs.addAll(cellBlock);
+      bc = new BufferChain(responseBufs);
       if (connection.useWrap) {
         bc = wrapWithSasl(bc);
       }
@@ -456,7 +461,25 @@
       }
     }
 
-    private byte[] createHeaderAndMessageBytes(Message result, Message header)
+    private void setExceptionResponse(Throwable t, String errorMsg,
+        ResponseHeader.Builder headerBuilder) {
+      ExceptionResponse.Builder exceptionBuilder = ExceptionResponse.newBuilder();
+      exceptionBuilder.setExceptionClassName(t.getClass().getName());
+      exceptionBuilder.setStackTrace(errorMsg);
+      exceptionBuilder.setDoNotRetry(t instanceof DoNotRetryIOException);
+      if (t instanceof RegionMovedException) {
+        // Special casing for this exception. This is only one carrying a payload.
+        // Do this instead of build a generic system for allowing exceptions carry
+        // any kind of payload.
+        RegionMovedException rme = (RegionMovedException)t;
+        exceptionBuilder.setHostname(rme.getHostname());
+        exceptionBuilder.setPort(rme.getPort());
+      }
+      // Set the exception as the result of the method invocation.
+      headerBuilder.setException(exceptionBuilder.build());
+    }
+
+    private byte[] createHeaderAndMessageBytes(Message result, Message header, int cellBlockSize)
         throws IOException {
       // Organize the response as a set of bytebuffers rather than collect it all together inside
       // one big byte array; save on allocations.
@@ -473,7 +496,7 @@
       // calculate the total size
       int totalSize = headerSerializedSize + headerVintSize
           + (resultSerializedSize + resultVintSize)
-          + (this.cellBlock == null ? 0 : this.cellBlock.limit());
+          + cellBlockSize;
       // The byte[] should also hold the totalSize of the header, message and the cellblock
       byte[] b = new byte[headerSerializedSize + headerVintSize + resultSerializedSize
           + resultVintSize + Bytes.SIZEOF_INT];
@@ -2054,10 +2077,9 @@
       RpcScheduler scheduler)
       throws IOException {
     if (conf.getBoolean("hbase.ipc.server.reservoir.enabled", true)) {
-      this.reservoir = new BoundedByteBufferPool(
-          conf.getInt("hbase.ipc.server.reservoir.max.buffer.size", 1024 * 1024),
-          conf.getInt("hbase.ipc.server.reservoir.initial.buffer.size", 16 * 1024),
-          // Make the max twice the number of handlers to be safe.
+      // TODO rename these configs?
+      this.reservoir = new ByteBufferPool(
+          conf.getInt("hbase.ipc.server.reservoir.initial.buffer.size", 64 * 1024),
           conf.getInt("hbase.ipc.server.reservoir.initial.max",
               conf.getInt(HConstants.REGION_SERVER_HANDLER_COUNT,
                   HConstants.DEFAULT_REGION_SERVER_HANDLER_COUNT) * 2));
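
With this wiring the reservoir's worst-case direct-memory footprint is simply bufferSize × maxPoolSize; the old BoundedByteBufferPool instead sized buffers elastically up to a 1MB cap. A sketch of the arithmetic under the defaults above (the 60-buffer figure assumes the stock HConstants.DEFAULT_REGION_SERVER_HANDLER_COUNT of 30):

```java
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HConstants;

public class ReservoirSizingSketch {
  public static void main(String[] args) {
    Configuration conf = HBaseConfiguration.create();
    int bufferSize = conf.getInt("hbase.ipc.server.reservoir.initial.buffer.size", 64 * 1024);
    int maxPoolSize = conf.getInt("hbase.ipc.server.reservoir.initial.max",
        conf.getInt(HConstants.REGION_SERVER_HANDLER_COUNT,
            HConstants.DEFAULT_REGION_SERVER_HANDLER_COUNT) * 2);
    // With defaults: 65536 * 60 = 3932160 bytes (~3.75MB) of direct memory at most,
    // worth keeping in mind when setting -XX:MaxDirectMemorySize.
    System.out.println("Max reservoir direct memory: " + (long) bufferSize * maxPoolSize);
  }
}
```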