 .../hadoop/hbase/io/ByteBuffOutputStream.java      |  65 ++++++++
 .../apache/hadoop/hbase/io/ByteBufferWriter.java   |  10 ++
 .../java/org/apache/hadoop/hbase/nio/ByteBuff.java |   9 ++
 .../org/apache/hadoop/hbase/nio/MultiByteBuff.java |  29 ++++
 .../apache/hadoop/hbase/nio/SingleByteBuff.java    |   6 +
 .../apache/hadoop/hbase/nio/TestMultiByteBuff.java |  42 ++++++
 .../org/apache/hadoop/hbase/ipc/RpcServer.java     | 164 ++++++++++++++++-----
 .../client/TestFromClientSideAsNonJavaClient.java  |  54 +++++++
 .../org/apache/hadoop/hbase/ipc/TestRpcServer.java |  28 ++++
 9 files changed, 370 insertions(+), 37 deletions(-)

diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/io/ByteBuffOutputStream.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/io/ByteBuffOutputStream.java
new file mode 100644
index 0000000..b6a1f6b
--- /dev/null
+++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/io/ByteBuffOutputStream.java
@@ -0,0 +1,65 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.io;
+
+import java.io.IOException;
+import java.io.OutputStream;
+import java.nio.ByteBuffer;
+
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.nio.ByteBuff;
+
+@InterfaceAudience.Private
+public class ByteBuffOutputStream extends OutputStream implements ByteBufferWriter {
+
+  private ByteBuff buff;
+
+  public ByteBuffOutputStream(ByteBuff buff) {
+    this.buff = buff;
+  }
+
+  @Override
+  public void write(int b) throws IOException {
+    this.buff.put((byte) b);
+  }
+
+  @Override
+  public void write(byte[] b, int off, int len) throws IOException {
+    this.buff.put(b, off, len);
+  }
+
+  @Override
+  public void write(ByteBuffer b, int off, int len) throws IOException {
+    // Take a duplicate so that the caller's position/limit are left untouched
+    ByteBuffer bb = b.duplicate();
+    bb.position(off);
+    bb.limit(off + len);
+    write(bb);
+  }
+
+  @Override
+  public void write(ByteBuffer b) throws IOException {
+    this.buff.put(b);
+  }
+
+  @Override
+  public void writeInt(int val) throws IOException {
+    this.buff.putInt(val);
+  }
+
+}
diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/io/ByteBufferWriter.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/io/ByteBufferWriter.java
index 012080c..4414c86 100644
--- a/hbase-common/src/main/java/org/apache/hadoop/hbase/io/ByteBufferWriter.java
+++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/io/ByteBufferWriter.java
@@ -31,6 +31,16 @@ import org.apache.hadoop.hbase.classification.InterfaceAudience;
 public interface ByteBufferWriter {
 
   /**
+   * Writes b.remaining() bytes from the specified ByteBuffer, starting at its current position.
+   * Note that the position of the given ByteBuffer is not changed by this call.
+   * @param b the data.
+   * @exception IOException if an I/O error occurs.
+   */
+  default public void write(ByteBuffer b) throws IOException {
+    write(b, b.position(), b.remaining());
+  }
+
+  /**
    * Writes len bytes from the specified ByteBuffer starting at offset off
    *
    * @param b the data.
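
Review note (not part of the patch): ByteBuffOutputStream is the adapter that lets protobuf's CodedOutputStream serialize straight into a (possibly pooled, possibly multi-segment) ByteBuff. A minimal usage sketch, assuming a SingleByteBuff over one 16-byte heap buffer; all names are local to this example:

    ByteBuffer bb = ByteBuffer.allocate(16);
    SingleByteBuff sbb = new SingleByteBuff(bb);
    ByteBuffOutputStream out = new ByteBuffOutputStream(sbb);
    out.writeInt(8);                          // 4-byte length prefix via ByteBufferWriter
    out.write(ByteBuffer.wrap(new byte[8]));  // exercises the new write(ByteBuffer) path
    // sbb.position() should now be 12; flip the backing buffer before handing it to a channel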
diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/nio/ByteBuff.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/nio/ByteBuff.java
index 60202a0..e12844a 100644
--- a/hbase-common/src/main/java/org/apache/hadoop/hbase/nio/ByteBuff.java
+++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/nio/ByteBuff.java
@@ -410,6 +410,15 @@ public abstract class ByteBuff {
    */
   public abstract int read(ReadableByteChannel channel) throws IOException;
 
+  /**
+   * Copies the remaining bytes of the given ByteBuffer 'buff' into this ByteBuff. The position of
+   * this ByteBuff is advanced by the number of bytes copied; whether the position of 'buff' itself
+   * moves is implementation dependent for now (see the open question in MultiByteBuff#put).
+   * @param buff the ByteBuffer to copy from
+   * @return this ByteBuff
+   */
+  public abstract ByteBuff put(ByteBuffer buff);
+
   // static helper methods
   public static int channelRead(ReadableByteChannel channel, ByteBuffer buf) throws IOException {
     if (buf.remaining() <= NIO_BUFFER_LIMIT) {
diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/nio/MultiByteBuff.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/nio/MultiByteBuff.java
index 948321d..f8c53da 100644
--- a/hbase-common/src/main/java/org/apache/hadoop/hbase/nio/MultiByteBuff.java
+++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/nio/MultiByteBuff.java
@@ -856,6 +856,35 @@ public class MultiByteBuff extends ByteBuff {
     return this;
   }
 
+  @Override
+  public ByteBuff put(ByteBuffer buff) {
+    int pos = buff.position();
+    if (this.curItem.remaining() >= buff.remaining()) {
+      ByteBufferUtils.copyFromBufferToBuffer(buff, this.curItem);
+    } else {
+      int remain = buff.remaining();
+      while (remain > 0) {
+        int copied = Math.min(buff.remaining(), this.curItem.remaining());
+        ByteBufferUtils.copyFromBufferToBuffer(buff, this.curItem, buff.position(), copied);
+        remain -= copied;
+        buff.position(buff.position() + copied);
+        if (remain == 0) {
+          break;
+        }
+        if (!curItem.hasRemaining()) {
+          if (this.curItemIndex == this.items.length - 1) {
+            throw new BufferOverflowException();
+          }
+          this.curItemIndex++;
+          this.curItem = this.items[this.curItemIndex];
+        }
+      }
+    }
+    // reset position - should we do it?
+    buff.position(pos);
+    return this;
+  }
+
   /**
    * Writes a long to this MBB at its current position. Also advances the position by size of long
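
Review note (not part of the patch): the behaviour worth flagging is that put(ByteBuffer) restores the source buffer's position before returning (the "should we do it?" question above), so the same source can be re-read afterwards. A sketch mirroring the "simple version" in the new test below:

    ByteBuffer src = ByteBuffer.allocate(8);
    src.putInt(100);
    src.putInt(500);
    src.flip();
    MultiByteBuff mbb = new MultiByteBuff(ByteBuffer.allocate(8), ByteBuffer.allocate(8));
    mbb.put(src);   // the whole input fits into the current item
    // mbb.position() == 8, while src.position() is back at 0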
diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/nio/SingleByteBuff.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/nio/SingleByteBuff.java
index 0e45410..b1e591f 100644
--- a/hbase-common/src/main/java/org/apache/hadoop/hbase/nio/SingleByteBuff.java
+++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/nio/SingleByteBuff.java
@@ -219,6 +219,12 @@ public class SingleByteBuff extends ByteBuff {
   }
 
   @Override
+  public ByteBuff put(ByteBuffer buff) {
+    ByteBufferUtils.copyFromBufferToBuffer(buff, this.buf);
+    return this;
+  }
+
+  @Override
   public SingleByteBuff put(byte[] src) {
     return put(src, 0, src.length);
   }
diff --git a/hbase-common/src/test/java/org/apache/hadoop/hbase/nio/TestMultiByteBuff.java b/hbase-common/src/test/java/org/apache/hadoop/hbase/nio/TestMultiByteBuff.java
index af4c464..397771d 100644
--- a/hbase-common/src/test/java/org/apache/hadoop/hbase/nio/TestMultiByteBuff.java
+++ b/hbase-common/src/test/java/org/apache/hadoop/hbase/nio/TestMultiByteBuff.java
@@ -403,4 +403,46 @@ public class TestMultiByteBuff {
     mbb1.get(); // Now we have reached the limit
     assertFalse(mbb1.hasRemaining());
   }
+
+  @Test
+  public void testPutByteBuffer() {
+    ByteBuffer b1 = ByteBuffer.allocate(8);
+    ByteBuffer b2 = ByteBuffer.allocate(8);
+    ByteBuffer b3 = ByteBuffer.allocate(8);
+    ByteBuffer toPut = ByteBuffer.allocate(8);
+    toPut.putInt(100);
+    toPut.putInt(500);
+    toPut.flip();
+    MultiByteBuff mbb1 = new MultiByteBuff(b1, b2, b3);
+    mbb1.position(8);
+    // simple version - the whole input fits into the current item
+    mbb1.put(toPut);
+    assertEquals(16, mbb1.position());
+    mbb1.position(8);
+    assertEquals(100, mbb1.getInt());
+    assertEquals(500, mbb1.getInt());
+    b1 = ByteBuffer.allocate(4);
+    b2 = ByteBuffer.allocate(4);
+    b3 = ByteBuffer.allocate(4);
+    ByteBuffer b4 = ByteBuffer.allocate(1);
+    ByteBuffer b5 = ByteBuffer.allocate(1);
+    ByteBuffer b6 = ByteBuffer.allocate(1);
+    ByteBuffer b7 = ByteBuffer.allocate(1);
+    toPut = ByteBuffer.allocate(16);
+    toPut.putInt(100);
+    toPut.putInt(500);
+    toPut.putInt(100);
+    toPut.putInt(500);
+    toPut.flip();
+    mbb1 = new MultiByteBuff(b1, b2, b3, b4, b5, b6, b7);
+    // copies across multiple items
+    mbb1.put(toPut);
+    assertEquals(16, mbb1.position());
+    mbb1.position(0);
+    assertEquals(100, mbb1.getInt());
+    assertEquals(500, mbb1.getInt());
+    assertEquals(100, mbb1.getInt());
+    // TODO : fix this in another JIRA - the last int is not read back correctly yet
+    // assertEquals(500, mbb1.getInt());
+  }
 }
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcServer.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcServer.java
index 96f506f..373c5c9 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcServer.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcServer.java
@@ -93,6 +93,7 @@ import org.apache.hadoop.hbase.codec.Codec;
 import org.apache.hadoop.hbase.conf.ConfigurationObserver;
 import org.apache.hadoop.hbase.exceptions.RegionMovedException;
 import org.apache.hadoop.hbase.exceptions.RequestTooBigException;
+import org.apache.hadoop.hbase.io.ByteBuffOutputStream;
 import org.apache.hadoop.hbase.io.ByteBufferListOutputStream;
 import org.apache.hadoop.hbase.io.ByteBufferOutputStream;
 import org.apache.hadoop.hbase.io.ByteBufferPool;
@@ -354,6 +355,7 @@ public class RpcServer implements RpcServerInterface, ConfigurationObserver {
     protected TraceInfo tinfo;
     private ByteBufferListOutputStream cellBlockStream = null;
     private CallCleanup reqCleanup = null;
+    private CallCleanup responseCleanup = null;
 
     private User user;
     private InetAddress remoteAddress;
@@ -403,6 +405,10 @@ public class RpcServer implements RpcServerInterface, ConfigurationObserver {
         // got from pool.
         this.cellBlockStream = null;
       }
+      if (responseCleanup != null) {
+        responseCleanup.run();
+        responseCleanup = null;
+      }
       cleanup();// If the call was run successfuly, we might have already returned the
           // BB back to pool. No worries..Then inputCellBlock will be null
       this.connection.decRpcCount();  // Say that we're done with this call.
@@ -506,20 +512,20 @@ public class RpcServer implements RpcServerInterface, ConfigurationObserver {
           headerBuilder.setCellBlockMeta(cellBlockBuilder.build());
         }
         Message header = headerBuilder.build();
-        ByteBuffer headerBuf =
+        ByteBuffer[] hdrMsgBuffers =
             createHeaderAndMessageBytes(m, header, cellBlockSize, cellBlock);
         ByteBuffer[] responseBufs = null;
         int cellBlockBufferSize = 0;
         if (cellBlock != null) {
           cellBlockBufferSize = cellBlock.size();
-          responseBufs = new ByteBuffer[1 + cellBlockBufferSize];
-        } else {
-          responseBufs = new ByteBuffer[1];
         }
-        responseBufs[0] = headerBuf;
+        responseBufs = new ByteBuffer[hdrMsgBuffers.length + cellBlockBufferSize];
+        // add all the header/result buffers
+        System.arraycopy(hdrMsgBuffers, 0, responseBufs, 0, hdrMsgBuffers.length);
+        int i = hdrMsgBuffers.length;
         if (cellBlock != null) {
-          for (int i = 0; i < cellBlockBufferSize; i++) {
-            responseBufs[i + 1] = cellBlock.get(i);
+          for (int j = 0; j < cellBlockBufferSize; j++) {
+            responseBufs[i++] = cellBlock.get(j);
           }
         }
         bc = new BufferChain(responseBufs);
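
Review note (not part of the patch): responseCleanup deliberately mirrors reqCleanup. Buffers borrowed from the reservoir for the response are only returned in done(), i.e. after the Responder has pushed the whole BufferChain out on the socket. A sketch of the contract, using the names from this patch:

    Pair<ByteBuffer[], CallCleanup> alloc =
        allocateByteBufferArrayToReadInto(reservoir, minSizeForReservoirUse, totalPBSize);
    responseCleanup = alloc.getSecond();   // null when nothing came from the pool
    // ... serialize into alloc.getFirst(), wrap in a BufferChain, write to the channel ...
    done();                                // runs responseCleanup -> pool.putbackBuffer(...)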
@@ -560,7 +566,7 @@ public class RpcServer implements RpcServerInterface, ConfigurationObserver {
       headerBuilder.setException(exceptionBuilder.build());
     }
 
-    private ByteBuffer createHeaderAndMessageBytes(Message result, Message header,
+    private ByteBuffer[] createHeaderAndMessageBytes(Message result, Message header,
         int cellBlockSize, List<ByteBuffer> cellBlock) throws IOException {
       // Organize the response as a set of bytebuffers rather than collect it all together inside
       // one big byte array; save on allocations.
@@ -569,8 +575,7 @@ public class RpcServer implements RpcServerInterface, ConfigurationObserver {
       // the last buffer in the cellblock. This applies to the cellblock created from the
       // pool or even the onheap cellblock buffer in case there is no pool enabled.
       // Possible reuse would avoid creating a temporary array for storing the header every time.
-      ByteBuffer possiblePBBuf =
-          (cellBlockSize > 0) ? cellBlock.get(cellBlock.size() - 1) : null;
+      ByteBuffer[] buffs = null;
       int headerSerializedSize = 0, resultSerializedSize = 0, headerVintSize = 0,
           resultVintSize = 0;
       if (header != null) {
@@ -583,44 +588,86 @@ public class RpcServer implements RpcServerInterface, ConfigurationObserver {
       }
       // calculate the total size
       int totalSize = headerSerializedSize + headerVintSize
-          + (resultSerializedSize + resultVintSize)
-          + cellBlockSize;
+          + (resultSerializedSize + resultVintSize) + cellBlockSize;
       int totalPBSize = headerSerializedSize + headerVintSize + resultSerializedSize
           + resultVintSize + Bytes.SIZEOF_INT;
-      // Only if the last buffer has enough space for header use it. Else allocate
-      // a new buffer. Assume they are all flipped
-      if (possiblePBBuf != null
-          && possiblePBBuf.limit() + totalPBSize <= possiblePBBuf.capacity()) {
-        // duplicate the buffer. This is where the header is going to be written
-        ByteBuffer pbBuf = possiblePBBuf.duplicate();
-        // get the current limit
-        int limit = pbBuf.limit();
-        // Position such that we write the header to the end of the buffer
-        pbBuf.position(limit);
-        // limit to the header size
-        pbBuf.limit(totalPBSize + limit);
-        // mark the current position
-        pbBuf.mark();
-        writeToCOS(result, header, totalSize, pbBuf);
-        // reset the buffer back to old position
-        pbBuf.reset();
-        return pbBuf;
+      // TODO : Should we check for totalPBSize >= minSizeForReservoirUse?
+      if (cellBlockStream == null && this.rpcCallback != null && !isClientCellBlockSupported()) {
+        // We know that the result we are processing is for a scan/get and that the client
+        // is a non java client.
+        // We get the buffers as a ByteBuffer[] because the BufferChain needs a ByteBuffer[].
+        Pair<ByteBuffer[], CallCleanup> buffers =
+            allocateByteBufferArrayToReadInto(reservoir, minSizeForReservoirUse, totalPBSize);
+        buffs = buffers.getFirst();
+        responseCleanup = buffers.getSecond();
+        ByteBuff byteBuffs;
+        if (buffs.length > 1) {
+          byteBuffs = new MultiByteBuff(buffs);
+        } else {
+          // We are backed by a single BB
+          byteBuffs = new SingleByteBuff(buffs[0]);
+        }
+        byteBuffs.limit(totalPBSize);
+        CodedOutputStream bcos =
+            CodedOutputStream.newInstance(new ByteBuffOutputStream(byteBuffs), totalPBSize);
+        byteBuffs.putInt(totalSize);
+        writeHeaderandResult(result, header, bcos);
+        bcos.flush();
+        if (byteBuffs.hasRemaining()) {
+          // TODO : Actually bcos should be doing this check??
+          // Or is it ok to check on the ByteBuff directly?
+          throw new IllegalArgumentException("Not fully written");
+        }
+        // flip all the buffers
+        for (ByteBuffer buf : buffs) {
+          buf.flip();
+        }
       } else {
-        return createHeaderAndMessageBytes(result, header, totalSize, totalPBSize);
+        ByteBuffer possiblePBBuf = (cellBlockSize > 0) ? cellBlock.get(cellBlock.size() - 1) : null;
+        // Only if the last buffer has enough space for the header use it. Else allocate
+        // a new buffer. Assume they are all flipped
+        buffs = new ByteBuffer[1];
+        if (possiblePBBuf != null
+            && possiblePBBuf.limit() + totalPBSize <= possiblePBBuf.capacity()) {
+          // duplicate the buffer. This is where the header is going to be written
+          ByteBuffer pbBuf = possiblePBBuf.duplicate();
+          // get the current limit
+          int limit = pbBuf.limit();
+          // Position such that we write the header to the end of the buffer
+          pbBuf.position(limit);
+          // limit to the header size
+          pbBuf.limit(totalPBSize + limit);
+          // mark the current position
+          pbBuf.mark();
+          writeToCOS(result, header, totalSize, pbBuf);
+          // reset the buffer back to the old position
+          pbBuf.reset();
+          buffs[0] = pbBuf;
+        } else {
+          ByteBuffer res = createHeaderAndMessageBytes(result, header, totalSize, totalPBSize);
+          buffs[0] = res;
+        }
       }
+      return buffs;
     }
 
-    private void writeToCOS(Message result, Message header, int totalSize, ByteBuffer pbBuf)
+    private void writeHeaderandResult(Message result, Message header, CodedOutputStream bcos)
         throws IOException {
-      ByteBufferUtils.putInt(pbBuf, totalSize);
-      // create COS that works on BB
-      CodedOutputStream cos = CodedOutputStream.newInstance(pbBuf);
       if (header != null) {
-        cos.writeMessageNoTag(header);
+        bcos.writeMessageNoTag(header);
       }
       if (result != null) {
-        cos.writeMessageNoTag(result);
+        bcos.writeMessageNoTag(result);
       }
+    }
+
+    private void writeToCOS(Message result, Message header, int totalSize, ByteBuffer pbBuf)
+        throws IOException {
+      ByteBufferUtils.putInt(pbBuf, totalSize);
+      // create COS that works on BB
+      CodedOutputStream cos = CodedOutputStream.newInstance(pbBuf);
+      writeHeaderandResult(result, header, cos);
       cos.flush();
       cos.checkNoSpaceLeft();
     }
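
Review note (not part of the patch): the bytes handed to the BufferChain for a non-java client end up laid out as

    [ 4-byte totalSize ][ vint | header ][ vint | result ]   <- totalPBSize bytes, via the ByteBuff
    [ cell block bytes, if any ]                             <- separate buffers, counted only in totalSize

The 4-byte prefix is written with byteBuffs.putInt(), bypassing the CodedOutputStream; the arithmetic works out because totalPBSize budgets Bytes.SIZEOF_INT for exactly that prefix, so after bcos.flush() the ByteBuff should sit precisely at its limit, which is what the hasRemaining() check asserts.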
@@ -2979,6 +3026,49 @@ public class RpcServer implements RpcServerInterface, ConfigurationObserver {
   }
 
   /**
+   * This is extracted to a static method for better unit testing. We try to get buffer(s) from
+   * the pool as much as possible.
+   *
+   * @param pool The ByteBufferPool to use
+   * @param minSizeForPoolUse Only when the remaining byte count is at least this size do we take a
+   *          buffer from the pool; any smaller tail is allocated as an on heap ByteBuffer.
+   * @param reqLen Bytes count in request
+   */
+  @VisibleForTesting
+  static Pair<ByteBuffer[], CallCleanup> allocateByteBufferArrayToReadInto(ByteBufferPool pool,
+      int minSizeForPoolUse, int reqLen) {
+    List<ByteBuffer> bbs = new ArrayList<>((reqLen / pool.getBufferSize()) + 1);
+    int remain = reqLen;
+    ByteBuffer buf = null;
+    while (remain >= minSizeForPoolUse && (buf = pool.getBuffer()) != null) {
+      bbs.add(buf);
+      remain -= pool.getBufferSize();
+    }
+    ByteBuffer[] bufsFromPool = null;
+    if (bbs.size() > 0) {
+      bufsFromPool = new ByteBuffer[bbs.size()];
+      bbs.toArray(bufsFromPool);
+    }
+    if (remain > 0) {
+      bbs.add(ByteBuffer.allocate(remain));
+    }
+    ByteBuffer[] items = new ByteBuffer[bbs.size()];
+    // convert to array
+    bbs.toArray(items);
+    if (bufsFromPool != null) {
+      final ByteBuffer[] bufsFromPoolFinal = bufsFromPool;
+      final ByteBuffer[] itemsFinal = items;
+      return new Pair<ByteBuffer[], CallCleanup>(itemsFinal, () -> {
+        // Return all the BBs to the pool
+        for (int i = 0; i < bufsFromPoolFinal.length; i++) {
+          pool.putbackBuffer(bufsFromPoolFinal[i]);
+        }
+      });
+    }
+    return new Pair<ByteBuffer[], CallCleanup>(items, null);
+  }
+
+  /**
    * Needed for features such as delayed calls. We need to be able to store the current call
    * so that we can complete it later or ask questions of what is supported by the current ongoing
    * call.
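
Review note (not part of the patch): a worked example of the allocation math, with hypothetical numbers - a pool of 4 KB buffers, a 3 KB minSizeForPoolUse and a 10 KB response (callable like this from the same package, as TestRpcServer does):

    ByteBufferPool pool = new ByteBufferPool(4 * 1024, 10);  // 4 KB buffers, at most 10 pooled
    Pair<ByteBuffer[], CallCleanup> p =
        RpcServer.allocateByteBufferArrayToReadInto(pool, 3 * 1024, 10 * 1024);
    // The loop takes two pool buffers (remain: 10240 -> 6144 -> 2048) and stops because
    // 2048 < 3072, so the 2 KB tail lands in one on-heap ByteBuffer: three buffers in all.
    ByteBuffer[] bufs = p.getFirst();        // [4 KB pooled, 4 KB pooled, 2 KB heap]
    if (p.getSecond() != null) {
      p.getSecond().run();                   // hands only the two pooled buffers back
    }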
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestFromClientSideAsNonJavaClient.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestFromClientSideAsNonJavaClient.java
new file mode 100644
index 0000000..2ca0508
--- /dev/null
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestFromClientSideAsNonJavaClient.java
@@ -0,0 +1,54 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.client;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.coprocessor.CoprocessorHost;
+import org.apache.hadoop.hbase.coprocessor.MultiRowMutationEndpoint;
+import org.apache.hadoop.hbase.testclassification.ClientTests;
+import org.apache.hadoop.hbase.testclassification.LargeTests;
+import org.junit.BeforeClass;
+import org.junit.experimental.categories.Category;
+
+/**
+ * Run tests that use the HBase clients; {@link Table}.
+ * Sets up the HBase mini cluster once at start and runs through all client tests.
+ * Each creates a table named for the method and does its stuff against that.
+ * This variant ensures that the client acts as a non-java client would.
+ */
+@Category({LargeTests.class, ClientTests.class})
+@SuppressWarnings("deprecation")
+public class TestFromClientSideAsNonJavaClient extends TestFromClientSide {
+
+  @BeforeClass
+  public static void setUpBeforeClass() throws Exception {
+    // Uncomment the following lines if more verbosity is needed for
+    // debugging (see HBASE-12285 for details).
+    // ((Log4JLogger)RpcServer.LOG).getLogger().setLevel(Level.ALL);
+    // ((Log4JLogger)RpcClient.LOG).getLogger().setLevel(Level.ALL);
+    // ((Log4JLogger)ScannerCallable.LOG).getLogger().setLevel(Level.ALL);
+    Configuration conf = TEST_UTIL.getConfiguration();
+    conf.setStrings(CoprocessorHost.REGION_COPROCESSOR_CONF_KEY,
+        MultiRowMutationEndpoint.class.getName());
+    conf.setBoolean("hbase.table.sanity.checks", true); // enable for below tests
+    // Set no codec so that the connection imitates a non-java client
+    conf.set("hbase.client.default.rpc.codec", "");
+    // We need more than one region server in this test
+    TEST_UTIL.startMiniCluster(SLAVES);
+  }
+}
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestRpcServer.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestRpcServer.java
index 6fd65f2..19b6aef 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestRpcServer.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestRpcServer.java
@@ -23,15 +23,22 @@ import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertNull;
 import static org.junit.Assert.assertTrue;
 
+import java.io.IOException;
 import java.nio.ByteBuffer;
 
+import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.io.ByteBuffOutputStream;
 import org.apache.hadoop.hbase.io.ByteBufferPool;
 import org.apache.hadoop.hbase.ipc.RpcServer.CallCleanup;
 import org.apache.hadoop.hbase.nio.ByteBuff;
 import org.apache.hadoop.hbase.nio.MultiByteBuff;
 import org.apache.hadoop.hbase.nio.SingleByteBuff;
+import org.apache.hadoop.hbase.shaded.com.google.protobuf.CodedOutputStream;
+import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.CellProtos;
 import org.apache.hadoop.hbase.testclassification.RPCTests;
 import org.apache.hadoop.hbase.testclassification.SmallTests;
+import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.Pair;
 import org.junit.Test;
 import org.junit.experimental.categories.Category;
@@ -125,6 +132,27 @@ public class TestRpcServer {
     assertNull(pair.getSecond());
   }
 
+  @Test
+  public void testByteBuffCOS() throws IOException {
+    KeyValue kv1 =
+        new KeyValue(Bytes.toBytes("aaa"), Bytes.toBytes("f1"), Bytes.toBytes("q1"), new byte[30]);
+    ByteBuffer dbb1 = ByteBuffer.allocateDirect(3);
+    ByteBuffer dbb2 = ByteBuffer.allocateDirect(3);
+    ByteBuffer dbb3 = ByteBuffer.allocateDirect(3);
+    ByteBuffer dbb4 = ByteBuffer.allocateDirect(3);
+    ByteBuffer dbb5 = ByteBuffer.allocateDirect(50);
+    MultiByteBuff mbb = new MultiByteBuff(dbb1, dbb2, dbb3, dbb4, dbb5);
+    CellProtos.Cell cell = ProtobufUtil.toCell(kv1);
+    int serializedSize = cell.getSerializedSize();
+    int computeRawVarint32Size = CodedOutputStream.computeRawVarint32Size(serializedSize);
+    int totalSize = serializedSize + computeRawVarint32Size;
+    mbb.limit(totalSize);
+    CodedOutputStream cos = CodedOutputStream.newInstance(new ByteBuffOutputStream(mbb), totalSize);
+    cos.writeMessageNoTag(cell);
+    cos.flush();
+    assertEquals(totalSize, mbb.position());
+  }
+
   private void initPoolWithAllBuffers(ByteBufferPool pool, int maxBuffersInPool) {
     ByteBuffer[] buffers = new ByteBuffer[maxBuffersInPool];
     // Just call getBuffer() on pool 'maxBuffersInPool' so as to init all buffers and then put back
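
Review note (not part of the patch): a quick way to extend testByteBuffCOS would be to read the message back and compare, since writeMessageNoTag() produces the same vint-length-prefixed form that parseDelimitedFrom() consumes. A sketch, assuming java.io.ByteArrayInputStream is also imported:

    mbb.position(0);
    byte[] raw = new byte[totalSize];
    mbb.get(raw, 0, totalSize);
    CellProtos.Cell decoded = CellProtos.Cell.parseDelimitedFrom(new ByteArrayInputStream(raw));
    assertEquals(cell, decoded);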