diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/IPCUtil.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/IPCUtil.java
index 081e171..bba6410 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/IPCUtil.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/IPCUtil.java
@@ -19,7 +19,6 @@ package org.apache.hadoop.hbase.ipc;
 
 import java.io.ByteArrayInputStream;
 import java.io.DataInput;
-import java.io.DataInputStream;
 import java.io.IOException;
 import java.io.InputStream;
 import java.io.OutputStream;
@@ -44,7 +43,6 @@ import org.apache.hadoop.io.compress.Compressor;
 import org.apache.hadoop.io.compress.Decompressor;
 
 import com.google.common.base.Preconditions;
-import com.google.protobuf.CodedInputStream;
 import com.google.protobuf.CodedOutputStream;
 import com.google.protobuf.Message;
 
@@ -187,24 +185,23 @@ class IPCUtil {
   }
 
   /**
-   * Write out header, param, and cell block if there to a {@link ByteBufferOutputStream} sized
-   * to hold these elements.
-   * @param header
-   * @param param
-   * @param cellBlock
-   * @return A {@link ByteBufferOutputStream} filled with the content of the passed in
-   * header, param, and cellBlock.
+   * @param m Message to serialize delimited; i.e. w/ a vint of its size preceding its
+   * serialization.
+   * @return The passed in Message serialized with delimiter.  Return null if m is null.
    * @throws IOException
    */
-  static ByteBufferOutputStream write(final Message header, final Message param,
-      final ByteBuffer cellBlock)
-  throws IOException {
-    int totalSize = getTotalSizeWhenWrittenDelimited(header, param);
-    if (cellBlock != null) totalSize += cellBlock.limit();
-    ByteBufferOutputStream bbos = new ByteBufferOutputStream(totalSize);
-    write(bbos, header, param, cellBlock, totalSize);
-    bbos.close();
-    return bbos;
+  static ByteBuffer getDelimitedMessageAsByteBuffer(final Message m) throws IOException {
+    if (m == null) return null;
+    int serializedSize = m.getSerializedSize();
+    int vintSize = CodedOutputStream.computeRawVarint32Size(serializedSize);
+    byte [] buffer = new byte[serializedSize + vintSize];
+    // Passing in a byte array saves COS creating a buffer which it does when using streams.
+    CodedOutputStream cos = CodedOutputStream.newInstance(buffer);
+    // This will write out the vint preamble and the message serialized.
+    cos.writeMessageNoTag(m);
+    cos.flush();
+    cos.checkNoSpaceLeft();
+    return ByteBuffer.wrap(buffer);
   }
 
   /**
@@ -230,8 +227,9 @@ class IPCUtil {
   private static int write(final OutputStream dos, final Message header, final Message param,
      final ByteBuffer cellBlock, final int totalSize)
   throws IOException {
-    // I confirmed toBytes does same as say DataOutputStream#writeInt.
+    // I confirmed toBytes does same as DataOutputStream#writeInt.
     dos.write(Bytes.toBytes(totalSize));
+    // This allocates a buffer that is the size of the message internally.
     header.writeDelimitedTo(dos);
     if (param != null) param.writeDelimitedTo(dos);
     if (cellBlock != null) dos.write(cellBlock.array(), 0, cellBlock.remaining());
@@ -240,20 +238,6 @@ class IPCUtil {
   }
 
   /**
-   * @param in Stream cue'd up just before a delimited message
-   * @return Bytes that hold the bytes that make up the message read from in
-   * @throws IOException
-   */
-  static byte [] getDelimitedMessageBytes(final DataInputStream in) throws IOException {
-    byte b = in.readByte();
-    int size = CodedInputStream.readRawVarint32(b, in);
-    // Allocate right-sized buffer rather than let pb allocate its default minimum 4k.
-    byte [] bytes = new byte[size];
-    IOUtils.readFully(in, bytes);
-    return bytes;
-  }
-
-  /**
    * Read in chunks of 8K (HBASE-7239)
    * @param in
    * @param dest
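The replacement method above is the crux of the client-side change: serialize once, straight into a right-sized array. As a sanity check, here is a standalone sketch (not part of the patch; the class name and the choice of FileDescriptorProto as a sample message are illustrative assumptions) showing that this produces byte-for-byte the same delimited form as Message#writeDelimitedTo, without the extra internal buffer the stream path allocates:

```java
import java.io.ByteArrayOutputStream;
import java.nio.ByteBuffer;
import java.util.Arrays;

import com.google.protobuf.CodedOutputStream;
import com.google.protobuf.DescriptorProtos.FileDescriptorProto;
import com.google.protobuf.Message;

public class DelimitedMessageSketch {
  // Same technique as IPCUtil#getDelimitedMessageAsByteBuffer: size the array
  // exactly (vint preamble + message) and let CodedOutputStream fill it.
  static ByteBuffer delimit(Message m) throws Exception {
    if (m == null) return null;
    int serializedSize = m.getSerializedSize();
    int vintSize = CodedOutputStream.computeRawVarint32Size(serializedSize);
    byte[] buffer = new byte[serializedSize + vintSize];
    CodedOutputStream cos = CodedOutputStream.newInstance(buffer);
    cos.writeMessageNoTag(m);  // writes the vint length, then the message bytes
    cos.flush();
    cos.checkNoSpaceLeft();    // throws if the size math above were wrong
    return ByteBuffer.wrap(buffer);
  }

  public static void main(String[] args) throws Exception {
    // FileDescriptorProto is used only because it ships with protobuf-java.
    Message m = FileDescriptorProto.newBuilder().setName("test.proto").build();
    ByteArrayOutputStream baos = new ByteArrayOutputStream();
    m.writeDelimitedTo(baos);  // the stream-based equivalent
    System.out.println(Arrays.equals(baos.toByteArray(), delimit(m).array()));  // true
  }
}
```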
diff --git a/hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/ipc/MetricsHBaseServerSource.java b/hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/ipc/MetricsHBaseServerSource.java
index 05a7f44..81ad44a 100644
--- a/hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/ipc/MetricsHBaseServerSource.java
+++ b/hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/ipc/MetricsHBaseServerSource.java
@@ -62,7 +62,7 @@ public interface MetricsHBaseServerSource extends BaseSource {
 
   void authenticationFailure();
 
-  void sentBytes(int count);
+  void sentBytes(long count);
 
   void receivedBytes(int count);
 
diff --git a/hbase-hadoop1-compat/src/main/java/org/apache/hadoop/hbase/ipc/MetricsHBaseServerSourceImpl.java b/hbase-hadoop1-compat/src/main/java/org/apache/hadoop/hbase/ipc/MetricsHBaseServerSourceImpl.java
index f6f6241..00411fd 100644
--- a/hbase-hadoop1-compat/src/main/java/org/apache/hadoop/hbase/ipc/MetricsHBaseServerSourceImpl.java
+++ b/hbase-hadoop1-compat/src/main/java/org/apache/hadoop/hbase/ipc/MetricsHBaseServerSourceImpl.java
@@ -86,7 +86,7 @@ public class MetricsHBaseServerSourceImpl extends BaseSourceImpl
   }
 
   @Override
-  public void sentBytes(int count) {
+  public void sentBytes(long count) {
     this.sentBytes.incr(count);
   }
 
diff --git a/hbase-hadoop2-compat/src/main/java/org/apache/hadoop/hbase/ipc/MetricsHBaseServerSourceImpl.java b/hbase-hadoop2-compat/src/main/java/org/apache/hadoop/hbase/ipc/MetricsHBaseServerSourceImpl.java
index afe0bfb..d0cdaf9 100644
--- a/hbase-hadoop2-compat/src/main/java/org/apache/hadoop/hbase/ipc/MetricsHBaseServerSourceImpl.java
+++ b/hbase-hadoop2-compat/src/main/java/org/apache/hadoop/hbase/ipc/MetricsHBaseServerSourceImpl.java
@@ -89,7 +89,7 @@ public class MetricsHBaseServerSourceImpl extends BaseSourceImpl
   }
 
   @Override
-  public void sentBytes(int count) {
+  public void sentBytes(long count) {
     this.sentBytes.incr(count);
   }
 
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/BufferChain.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/BufferChain.java
new file mode 100644
index 0000000..8b0b568
--- /dev/null
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/BufferChain.java
@@ -0,0 +1,117 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.ipc;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.channels.GatheringByteChannel;
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+
+/**
+ * Chain of ByteBuffers.
+ * Used to write out an array of byte buffers.  Writes in chunks.
+ */
+@InterfaceAudience.Private
+class BufferChain {
+  private static final ByteBuffer [] FOR_TOARRAY_TYPE = new ByteBuffer[0];
+  private final ByteBuffer[] buffers;
+  private int remaining = 0;
+  private int bufferOffset = 0;
+
+  BufferChain(ByteBuffer ... buffers) {
+    // Some of the incoming buffers can be null
+    List<ByteBuffer> bbs = new ArrayList<ByteBuffer>(buffers.length);
+    for (ByteBuffer b : buffers) {
+      if (b == null) continue;
+      bbs.add(b);
+      this.remaining += b.remaining();
+    }
+    this.buffers = bbs.toArray(FOR_TOARRAY_TYPE);
+  }
+
+  /**
+   * Expensive.  Makes a new buffer to hold a copy of what is in contained ByteBuffers.  This
+   * call drains this instance; it cannot be used subsequent to the call.
+   * @return A new byte array with the content of all contained ByteBuffers.
+   */
+  byte [] getBytes() {
+    if (!hasRemaining()) throw new IllegalAccessError();
+    byte [] bytes = new byte [this.remaining];
+    int offset = 0;
+    for (ByteBuffer bb: this.buffers) {
+      System.arraycopy(bb.array(), bb.arrayOffset(), bytes, offset, bb.limit());
+      offset += bb.capacity();
+    }
+    return bytes;
+  }
+
+  boolean hasRemaining() {
+    return remaining > 0;
+  }
+
+  /**
+   * Write out our chain of buffers in chunks
+   * @param channel Where to write
+   * @param chunkSize Size of chunks to write.
+   * @return Amount written.
+   * @throws IOException
+   */
+  long write(GatheringByteChannel channel, int chunkSize) throws IOException {
+    int chunkRemaining = chunkSize;
+    ByteBuffer lastBuffer = null;
+    int bufCount = 0;
+    int restoreLimit = -1;
+
+    while (chunkRemaining > 0 && bufferOffset + bufCount < buffers.length) {
+      lastBuffer = buffers[bufferOffset + bufCount];
+      if (!lastBuffer.hasRemaining()) {
+        bufferOffset++;
+        continue;
+      }
+      bufCount++;
+      if (lastBuffer.remaining() > chunkRemaining) {
+        restoreLimit = lastBuffer.limit();
+        lastBuffer.limit(lastBuffer.position() + chunkRemaining);
+        chunkRemaining = 0;
+        break;
+      } else {
+        chunkRemaining -= lastBuffer.remaining();
+      }
+    }
+    assert lastBuffer != null;
+    if (chunkRemaining == chunkSize) {
+      assert !hasRemaining();
+      // no data left to write
+      return 0;
+    }
+    try {
+      long ret = channel.write(buffers, bufferOffset, bufCount);
+      if (ret > 0) {
+        remaining -= ret;
+      }
+      return ret;
+    } finally {
+      if (restoreLimit >= 0) {
+        lastBuffer.limit(restoreLimit);
+      }
+    }
+  }
+}
\ No newline at end of file
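BufferChain#write caps each gathering write by temporarily shrinking the limit of the buffer that straddles the chunk boundary, then restoring it after the syscall. The simplified, self-contained sketch below (my own illustration, not the class above; it omits BufferChain's persistent bufferOffset bookkeeping and always scans from the front) shows the same limit save/restore trick against a plain FileChannel:

```java
import java.io.FileOutputStream;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.GatheringByteChannel;

public class ChunkedGatherSketch {
  // Write at most chunkSize bytes from bufs with a single gathering call.
  static long writeChunk(GatheringByteChannel ch, ByteBuffer[] bufs, int chunkSize)
      throws IOException {
    int budget = chunkSize;
    int end = 0;               // one past the last buffer included in this chunk
    ByteBuffer capped = null;  // the buffer whose limit we shrank, if any
    int savedLimit = -1;
    for (ByteBuffer b : bufs) {
      if (budget == 0) break;
      end++;
      if (b.remaining() > budget) {
        capped = b;
        savedLimit = b.limit();
        b.limit(b.position() + budget);  // expose only 'budget' bytes of this buffer
        budget = 0;
      } else {
        budget -= b.remaining();         // whole buffer fits in the chunk
      }
    }
    try {
      return ch.write(bufs, 0, end);     // one gathering syscall for the whole chunk
    } finally {
      if (capped != null) capped.limit(savedLimit);  // restore for the next round
    }
  }

  public static void main(String[] args) throws IOException {
    ByteBuffer[] bufs = new ByteBuffer[] {
        ByteBuffer.wrap("hello".getBytes()),
        ByteBuffer.wrap(" ".getBytes()),
        ByteBuffer.wrap("world".getBytes()) };
    FileOutputStream fos = new FileOutputStream("chunked.txt");
    try {
      GatheringByteChannel ch = fos.getChannel();
      while (bufs[bufs.length - 1].hasRemaining()) {
        writeChunk(ch, bufs, 4);  // tiny chunk size, standing in for NIO_BUFFER_LIMIT
      }
    } finally {
      fos.close();
    }
  }
}
```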
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/MetricsHBaseServer.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/MetricsHBaseServer.java
index 7911c94..8f278d0 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/MetricsHBaseServer.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/MetricsHBaseServer.java
@@ -47,7 +47,7 @@ public class MetricsHBaseServer {
     source.authenticationSuccess();
   }
 
-  void sentBytes(int count) {
+  void sentBytes(long count) {
     source.sentBytes(count);
   }
 
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcServer.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcServer.java
index ade4117..1aad013 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcServer.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcServer.java
@@ -35,6 +35,7 @@ import java.nio.ByteBuffer;
 import java.nio.channels.CancelledKeyException;
 import java.nio.channels.Channels;
 import java.nio.channels.ClosedChannelException;
+import java.nio.channels.GatheringByteChannel;
 import java.nio.channels.ReadableByteChannel;
 import java.nio.channels.SelectionKey;
 import java.nio.channels.Selector;
@@ -84,12 +85,11 @@ import org.apache.hadoop.hbase.regionserver.HRegionServer;
 import org.apache.hadoop.hbase.security.AuthMethod;
 import org.apache.hadoop.hbase.security.HBasePolicyProvider;
 import org.apache.hadoop.hbase.security.HBaseSaslRpcServer;
-import org.apache.hadoop.hbase.security.UserProvider;
 import org.apache.hadoop.hbase.security.HBaseSaslRpcServer.SaslDigestCallbackHandler;
 import org.apache.hadoop.hbase.security.HBaseSaslRpcServer.SaslGssCallbackHandler;
 import org.apache.hadoop.hbase.security.SaslStatus;
 import org.apache.hadoop.hbase.security.SaslUtil;
-import org.apache.hadoop.hbase.security.User;
+import org.apache.hadoop.hbase.security.UserProvider;
 import org.apache.hadoop.hbase.security.token.AuthenticationTokenSecretManager;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.Pair;
@@ -276,7 +276,10 @@ public class RpcServer implements RpcServerInterface {
     protected Connection connection;  // connection to client
     protected long timestamp;  // the time received when response is null
                                // the time served when response is not null
-    protected ByteBuffer response;  // the response for this call
+    /**
+     * Chain of buffers to send as response.
+     */
+    protected BufferChain response;
     protected boolean delayResponse;
     protected Responder responder;
     protected boolean delayReturnValue;  // if the return value should be
@@ -341,14 +344,14 @@ public class RpcServer implements RpcServerInterface {
     }
 
     protected synchronized void setSaslTokenResponse(ByteBuffer response) {
-      this.response = response;
+      this.response = new BufferChain(response);
     }
 
     protected synchronized void setResponse(Object m, final CellScanner cells,
         Throwable t, String errorMsg) {
       if (this.isError) return;
       if (t != null) this.isError = true;
-      ByteBufferOutputStream bbos = null;
+      BufferChain bc = null;
       try {
         ResponseHeader.Builder headerBuilder = ResponseHeader.newBuilder();
         // Presume it a pb Message.  Could be null.
@@ -380,42 +383,44 @@ public class RpcServer implements RpcServerInterface {
           headerBuilder.setCellBlockMeta(cellBlockBuilder.build());
         }
         Message header = headerBuilder.build();
-        bbos = IPCUtil.write(header, result, cellBlock);
+
+        // Organize the response as a set of bytebuffers rather than collect it all together
+        // inside one big byte array; save on allocations.
+        ByteBuffer bbHeader = IPCUtil.getDelimitedMessageAsByteBuffer(header);
+        ByteBuffer bbResult = IPCUtil.getDelimitedMessageAsByteBuffer(result);
+        int totalSize = bbHeader.capacity() + (bbResult == null? 0: bbResult.limit()) +
+          (cellBlock == null? 0: cellBlock.limit());
+        ByteBuffer bbTotalSize = ByteBuffer.wrap(Bytes.toBytes(totalSize));
+        bc = new BufferChain(bbTotalSize, bbHeader, bbResult, cellBlock);
         if (connection.useWrap) {
-          wrapWithSasl(bbos);
+          bc = wrapWithSasl(bc);
         }
       } catch (IOException e) {
         LOG.warn("Exception while creating response " + e);
       }
-      ByteBuffer bb = null;
-      if (bbos != null) {
-        // TODO: If SASL, maybe buffer already been flipped and written?
-        bb = bbos.getByteBuffer();
-        bb.position(0);
-      }
-      this.response = bb;
+      this.response = bc;
     }
 
-    private void wrapWithSasl(ByteBufferOutputStream response)
-    throws IOException {
-      if (connection.useSasl) {
-        // getByteBuffer calls flip()
-        ByteBuffer buf = response.getByteBuffer();
-        byte[] token;
-        // synchronization may be needed since there can be multiple Handler
-        // threads using saslServer to wrap responses.
-        synchronized (connection.saslServer) {
-          token = connection.saslServer.wrap(buf.array(),
-              buf.arrayOffset(), buf.remaining());
-        }
-        if (LOG.isDebugEnabled())
-          LOG.debug("Adding saslServer wrapped token of size " + token.length
-              + " as call response.");
-        buf.clear();
-        DataOutputStream saslOut = new DataOutputStream(response);
-        saslOut.writeInt(token.length);
-        saslOut.write(token, 0, token.length);
+    private BufferChain wrapWithSasl(BufferChain bc)
+    throws IOException {
+      if (bc == null) return bc;
+      if (!this.connection.useSasl) return bc;
+      // Looks like no way around this; saslserver wants a byte array.  I have to make it one.
+      // THIS IS A BIG UGLY COPY.
+      byte [] responseBytes = bc.getBytes();
+      byte [] token;
+      // synchronization may be needed since there can be multiple Handler
+      // threads using saslServer to wrap responses.
+      synchronized (connection.saslServer) {
+        token = connection.saslServer.wrap(responseBytes, 0, responseBytes.length);
       }
+      if (LOG.isDebugEnabled())
+        LOG.debug("Adding saslServer wrapped token of size " + token.length
+            + " as call response.");
+
+      ByteBuffer bbTokenLength = ByteBuffer.wrap(Bytes.toBytes(token.length));
+      ByteBuffer bbTokenBytes = ByteBuffer.wrap(token);
+      return new BufferChain(bbTokenLength, bbTokenBytes);
     }
 
     @Override
@@ -994,7 +999,7 @@ public class RpcServer implements RpcServerInterface {
       //
       // Send as much data as we can in the non-blocking fashion
       //
-      int numBytes = channelWrite(channel, call.response);
+      long numBytes = channelWrite(channel, call.response);
       if (numBytes < 0) {
         return true;
       }
@@ -1347,6 +1352,7 @@ public class RpcServer implements RpcServerInterface {
         }
       }
     }
+
     /**
      * No protobuf encoding of raw sasl messages
      */
@@ -2169,19 +2175,15 @@ public class RpcServer implements RpcServerInterface {
    * buffer.
    *
    * @param channel writable byte channel to write to
-   * @param buffer buffer to write
+   * @param bufferChain Chain of buffers to write
    * @return number of bytes written
    * @throws java.io.IOException e
    * @see java.nio.channels.WritableByteChannel#write(java.nio.ByteBuffer)
    */
-  protected int channelWrite(WritableByteChannel channel,
-      ByteBuffer buffer) throws IOException {
-
-    int count = (buffer.remaining() <= NIO_BUFFER_LIMIT) ?
-      channel.write(buffer) : channelIO(null, channel, buffer);
-    if (count > 0) {
-      metrics.sentBytes(count);
-    }
+  protected long channelWrite(GatheringByteChannel channel, BufferChain bufferChain)
+  throws IOException {
+    long count = bufferChain.write(channel, NIO_BUFFER_LIMIT);
+    if (count > 0) this.metrics.sentBytes(count);
     return count;
   }
 
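Taken together, setResponse() now frames each reply as separate buffers that the Responder hands to a single gathering write, instead of copying everything into one array. A hedged sketch of the frame assembly (an illustrative helper of mine, not patch code; nulls are tolerated because the BufferChain constructor skips them):

```java
import java.nio.ByteBuffer;

public class ResponseFrameSketch {
  // On-the-wire layout built by RpcServer.Call#setResponse after this patch:
  //   <4-byte big-endian total size> <vint-delimited header> <vint-delimited result> <cellblock>
  // When connection.useWrap is on, the whole frame is instead SASL-wrapped and
  // re-framed as <4-byte token length> <token bytes>.
  static ByteBuffer[] frame(ByteBuffer header, ByteBuffer result, ByteBuffer cellBlock) {
    int totalSize = header.capacity()
        + (result == null ? 0 : result.limit())
        + (cellBlock == null ? 0 : cellBlock.limit());
    // Stands in for ByteBuffer.wrap(Bytes.toBytes(totalSize)); both are big-endian.
    ByteBuffer totalSizeBuf = (ByteBuffer) ByteBuffer.allocate(4).putInt(totalSize).flip();
    // Null result/cellBlock entries are dropped by the BufferChain constructor.
    return new ByteBuffer[] { totalSizeBuf, header, result, cellBlock };
  }
}
```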
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestBufferChain.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestBufferChain.java
new file mode 100644
index 0000000..465bc04
--- /dev/null
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestBufferChain.java
@@ -0,0 +1,157 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.ipc;
+
+import static org.junit.Assert.*;
+
+import java.io.File;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.channels.FileChannel;
+
+import org.apache.hadoop.hbase.SmallTests;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.mockito.Mockito;
+
+import com.google.common.base.Charsets;
+import com.google.common.io.Files;
+
+@Category(SmallTests.class)
+public class TestBufferChain {
+  private File tmpFile;
+
+  private static final byte[][] HELLO_WORLD_CHUNKS = new byte[][] {
+      "hello".getBytes(Charsets.UTF_8),
+      " ".getBytes(Charsets.UTF_8),
+      "world".getBytes(Charsets.UTF_8)
+  };
+
+  @Before
+  public void setup() throws IOException {
+    tmpFile = File.createTempFile("TestBufferChain", "txt");
+  }
+
+  @After
+  public void teardown() {
+    tmpFile.delete();
+  }
+
+  @Test
+  public void testGetBackBytesWePutIn() {
+    ByteBuffer[] bufs = wrapArrays(HELLO_WORLD_CHUNKS);
+    BufferChain chain = new BufferChain(bufs);
+    assertTrue(Bytes.equals(Bytes.toBytes("hello world"), chain.getBytes()));
+  }
+
+  @Test
+  public void testChainChunkBiggerThanWholeArray() throws IOException {
+    ByteBuffer[] bufs = wrapArrays(HELLO_WORLD_CHUNKS);
+    BufferChain chain = new BufferChain(bufs);
+    writeAndVerify(chain, "hello world", 8192);
+    assertNoRemaining(bufs);
+  }
+
+  @Test
+  public void testChainChunkBiggerThanSomeArrays() throws IOException {
+    ByteBuffer[] bufs = wrapArrays(HELLO_WORLD_CHUNKS);
+    BufferChain chain = new BufferChain(bufs);
+    writeAndVerify(chain, "hello world", 3);
+    assertNoRemaining(bufs);
+  }
+
+  @Test
+  public void testLimitOffset() throws IOException {
+    ByteBuffer[] bufs = new ByteBuffer[] {
+        stringBuf("XXXhelloYYY", 3, 5),
+        stringBuf(" ", 0, 1),
+        stringBuf("XXXXworldY", 4, 5) };
+    BufferChain chain = new BufferChain(bufs);
+    writeAndVerify(chain, "hello world", 3);
+    assertNoRemaining(bufs);
+  }
+
+  @Test
+  public void testWithSpy() throws IOException {
+    ByteBuffer[] bufs = new ByteBuffer[] {
+        stringBuf("XXXhelloYYY", 3, 5),
+        stringBuf(" ", 0, 1),
+        stringBuf("XXXXworldY", 4, 5) };
+    BufferChain chain = new BufferChain(bufs);
+    FileOutputStream fos = new FileOutputStream(tmpFile);
+    FileChannel ch = Mockito.spy(fos.getChannel());
+    try {
+      chain.write(ch, 2);
+      assertEquals("he", Files.toString(tmpFile, Charsets.UTF_8));
+      chain.write(ch, 2);
+      assertEquals("hell", Files.toString(tmpFile, Charsets.UTF_8));
+      chain.write(ch, 3);
+      assertEquals("hello w", Files.toString(tmpFile, Charsets.UTF_8));
+      chain.write(ch, 8);
+      assertEquals("hello world", Files.toString(tmpFile, Charsets.UTF_8));
+    } finally {
+      ch.close();
+      fos.close();
+    }
+  }
+
+  private ByteBuffer stringBuf(String string, int position, int length) {
+    ByteBuffer buf = ByteBuffer.wrap(string.getBytes(Charsets.UTF_8));
+    buf.position(position);
+    buf.limit(position + length);
+    assertTrue(buf.hasRemaining());
+    return buf;
+  }
+
+  private void assertNoRemaining(ByteBuffer[] bufs) {
+    for (ByteBuffer buf : bufs) {
+      assertFalse(buf.hasRemaining());
+    }
+  }
+
+  private ByteBuffer[] wrapArrays(byte[][] arrays) {
+    ByteBuffer[] ret = new ByteBuffer[arrays.length];
+    for (int i = 0; i < arrays.length; i++) {
+      ret[i] = ByteBuffer.wrap(arrays[i]);
+    }
+    return ret;
+  }
+
+  private void writeAndVerify(BufferChain chain, String string, int chunkSize)
+      throws IOException {
+    FileOutputStream fos = new FileOutputStream(tmpFile);
+    FileChannel ch = fos.getChannel();
+    try {
+      long remaining = string.length();
+      while (chain.hasRemaining()) {
+        long n = chain.write(ch, chunkSize);
+        assertTrue(n == chunkSize || n == remaining);
+        remaining -= n;
+      }
+      assertEquals(0, remaining);
+    } finally {
+      fos.close();
+    }
+    assertFalse(chain.hasRemaining());
+    assertEquals(string, Files.toString(tmpFile, Charsets.UTF_8));
+  }
+}
\ No newline at end of file
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestIPC.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestIPC.java
index c1cbd7b..128a91a 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestIPC.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestIPC.java
@@ -45,6 +45,7 @@ import org.apache.hadoop.hbase.CellScanner;
 import org.apache.hadoop.hbase.CellUtil;
 import org.apache.hadoop.hbase.HBaseConfiguration;
 import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.HRegionInfo;
 import org.apache.hadoop.hbase.KeyValue;
 import org.apache.hadoop.hbase.KeyValueUtil;
 import org.apache.hadoop.hbase.SmallTests;
@@ -61,6 +62,8 @@ import org.apache.hadoop.hbase.protobuf.RequestConverter;
 import org.apache.hadoop.hbase.protobuf.generated.ClientProtos;
 import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.MutationProto;
 import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.RegionAction;
+import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.RegionSpecifier;
+import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.RegionSpecifier.RegionSpecifierType;
 import org.apache.hadoop.hbase.security.User;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.Pair;
@@ -76,6 +79,7 @@ import org.mockito.stubbing.Answer;
 import com.google.common.collect.ImmutableList;
 import com.google.common.collect.Lists;
 import com.google.protobuf.BlockingService;
+import com.google.protobuf.ByteString;
 import com.google.protobuf.Descriptors.MethodDescriptor;
 import com.google.protobuf.Message;
 import com.google.protobuf.RpcController;
@@ -89,6 +93,8 @@ public class TestIPC {
   public static final Log LOG = LogFactory.getLog(TestIPC.class);
   static byte [] CELL_BYTES = Bytes.toBytes("xyz");
   static Cell CELL = new KeyValue(CELL_BYTES, CELL_BYTES, CELL_BYTES, CELL_BYTES);
+  static byte [] BIG_CELL_BYTES = new byte [10 * 1024];
+  static Cell BIG_CELL = new KeyValue(CELL_BYTES, CELL_BYTES, CELL_BYTES, BIG_CELL_BYTES);
   private final static Configuration CONF = HBaseConfiguration.create();
   // We are using the test TestRpcServiceProtos generated classes and Service because they are
   // available and basic with methods like 'echo', and ping.  Below we make a blocking service
@@ -303,8 +309,10 @@ public class TestIPC {
     int cellcount = Integer.parseInt(args[1]);
     Configuration conf = HBaseConfiguration.create();
     TestRpcServer rpcServer = new TestRpcServer();
+    MethodDescriptor md = SERVICE.getDescriptorForType().findMethodByName("echo");
+    EchoRequestProto param = EchoRequestProto.newBuilder().setMessage("hello").build();
     RpcClient client = new RpcClient(conf, HConstants.CLUSTER_ID_DEFAULT);
-    KeyValue kv = KeyValueUtil.ensureKeyValue(CELL);
+    KeyValue kv = KeyValueUtil.ensureKeyValue(BIG_CELL);
     Put p = new Put(kv.getRow());
     for (int i = 0; i < cellcount; i++) {
       p.add(kv);
@@ -324,15 +332,17 @@ public class TestIPC {
         RegionAction.newBuilder(),
         ClientProtos.Action.newBuilder(),
         MutationProto.newBuilder());
-      CellScanner cellScanner = CellUtil.createCellScanner(cells);
-      if (i % 1000 == 0) {
+      builder.setRegion(RegionSpecifier.newBuilder().setType(RegionSpecifierType.REGION_NAME).
+        setValue(ByteString.copyFrom(HRegionInfo.FIRST_META_REGIONINFO.getEncodedNameAsBytes())));
+      if (i % 100000 == 0) {
         LOG.info("" + i);
         // Uncomment this for a thread dump every so often.
         // ReflectionUtils.printThreadInfo(new PrintWriter(System.out),
         //   "Thread dump " + Thread.currentThread().getName());
       }
+      CellScanner cellScanner = CellUtil.createCellScanner(cells);
       Pair<Message, CellScanner> response =
-        client.call(null, builder.build(), cellScanner, null, user, address, 0);
+        client.call(md, builder.build(), cellScanner, param, user, address, 0);
       /*
       int count = 0;
       while (p.getSecond().advance()) {