Index: hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/IPCUtil.java
===================================================================
--- hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/IPCUtil.java	(revision 1541421)
+++ hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/IPCUtil.java	(working copy)
@@ -19,7 +19,6 @@
 
 import java.io.ByteArrayInputStream;
 import java.io.DataInput;
-import java.io.DataInputStream;
 import java.io.IOException;
 import java.io.InputStream;
 import java.io.OutputStream;
@@ -44,7 +43,6 @@
 import org.apache.hadoop.io.compress.Decompressor;
 
 import com.google.common.base.Preconditions;
-import com.google.protobuf.CodedInputStream;
 import com.google.protobuf.CodedOutputStream;
 import com.google.protobuf.Message;
 
@@ -187,24 +185,23 @@
   }
 
   /**
-   * Write out header, param, and cell block if there to a {@link ByteBufferOutputStream} sized
-   * to hold these elements.
-   * @param header
-   * @param param
-   * @param cellBlock
-   * @return A {@link ByteBufferOutputStream} filled with the content of the passed in
-   * header, param, and cellBlock.
+   * @param m Message to serialize delimited; i.e. w/ a vint of its size preceding its
+   * serialization.
+   * @return The passed in Message serialized with delimiter.  Return null if m is null.
    * @throws IOException
    */
-  static ByteBufferOutputStream write(final Message header, final Message param,
-      final ByteBuffer cellBlock)
-  throws IOException {
-    int totalSize = getTotalSizeWhenWrittenDelimited(header, param);
-    if (cellBlock != null) totalSize += cellBlock.limit();
-    ByteBufferOutputStream bbos = new ByteBufferOutputStream(totalSize);
-    write(bbos, header, param, cellBlock, totalSize);
-    bbos.close();
-    return bbos;
+  static ByteBuffer getDelimitedMessageAsByteBuffer(final Message m) throws IOException {
+    if (m == null) return null;
+    int serializedSize = m.getSerializedSize();
+    int vintSize = CodedOutputStream.computeRawVarint32Size(serializedSize);
+    byte [] buffer = new byte[serializedSize + vintSize];
+    // Passing in a byte array saves COS creating a buffer which it does when using streams.
+    CodedOutputStream cos = CodedOutputStream.newInstance(buffer);
+    // This will write out the vint preamble and the message serialized.
+    cos.writeMessageNoTag(m);
+    cos.flush();
+    cos.checkNoSpaceLeft();
+    return ByteBuffer.wrap(buffer);
   }
 
   /**
@@ -230,8 +227,9 @@
   private static int write(final OutputStream dos, final Message header, final Message param,
       final ByteBuffer cellBlock, final int totalSize)
   throws IOException {
-    // I confirmed toBytes does same as say DataOutputStream#writeInt.
+    // I confirmed toBytes does the same as DataOutputStream#writeInt.
    dos.write(Bytes.toBytes(totalSize));
+    // This allocates a buffer that is the size of the message internally.
    header.writeDelimitedTo(dos);
    if (param != null) param.writeDelimitedTo(dos);
    if (cellBlock != null) dos.write(cellBlock.array(), 0, cellBlock.remaining());
@@ -240,20 +238,6 @@
   }
 
   /**
-   * @param in Stream cue'd up just before a delimited message
-   * @return Bytes that hold the bytes that make up the message read from in
-   * @throws IOException
-   */
-  static byte [] getDelimitedMessageBytes(final DataInputStream in) throws IOException {
-    byte b = in.readByte();
-    int size = CodedInputStream.readRawVarint32(b, in);
-    // Allocate right-sized buffer rather than let pb allocate its default minimum 4k.
-    byte [] bytes = new byte[size];
-    IOUtils.readFully(in, bytes);
-    return bytes;
-  }
-
-  /**
    * Read in chunks of 8K (HBASE-7239)
    * @param in
    * @param dest
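For reviewers unfamiliar with the framing above: the following standalone sketch (not part of the patch) shows the vint-delimited layout that getDelimitedMessageAsByteBuffer produces, using the same protobuf 2.x-era CodedOutputStream/CodedInputStream API family. A raw byte payload stands in for a generated protobuf Message, and the class name DelimitedFramingSketch is hypothetical.

import java.nio.ByteBuffer;

import com.google.protobuf.CodedInputStream;
import com.google.protobuf.CodedOutputStream;

public class DelimitedFramingSketch {
  public static void main(String[] args) throws Exception {
    byte[] payload = "fake-serialized-message".getBytes("UTF-8");
    int vintSize = CodedOutputStream.computeRawVarint32Size(payload.length);
    // Size the backing array exactly: vint preamble plus payload, no slack.
    byte[] buffer = new byte[vintSize + payload.length];
    CodedOutputStream cos = CodedOutputStream.newInstance(buffer);
    cos.writeRawVarint32(payload.length);   // length preamble
    cos.writeRawBytes(payload);             // message body
    cos.flush();
    cos.checkNoSpaceLeft();                 // proves the sizing math was right
    ByteBuffer bb = ByteBuffer.wrap(buffer);

    // A reader peels off the vint first, then knows exactly how much to read.
    CodedInputStream cis = CodedInputStream.newInstance(bb.array());
    int size = cis.readRawVarint32();
    byte[] roundTripped = cis.readRawBytes(size);
    System.out.println(size + " -> " + new String(roundTripped, "UTF-8"));
  }
}

The exact-sized array is the point of the change: the old write() path buffered header, param, and cell block into one ByteBufferOutputStream, while this path serializes each message once into a right-sized array with no intermediate stream.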
Index: hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/ipc/MetricsHBaseServerSource.java
===================================================================
--- hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/ipc/MetricsHBaseServerSource.java	(revision 1541421)
+++ hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/ipc/MetricsHBaseServerSource.java	(working copy)
@@ -62,7 +62,7 @@
 
   void authenticationFailure();
 
-  void sentBytes(int count);
+  void sentBytes(long count);
 
   void receivedBytes(int count);
 
Index: hbase-hadoop1-compat/src/main/java/org/apache/hadoop/hbase/ipc/MetricsHBaseServerSourceImpl.java
===================================================================
--- hbase-hadoop1-compat/src/main/java/org/apache/hadoop/hbase/ipc/MetricsHBaseServerSourceImpl.java	(revision 1541421)
+++ hbase-hadoop1-compat/src/main/java/org/apache/hadoop/hbase/ipc/MetricsHBaseServerSourceImpl.java	(working copy)
@@ -86,7 +86,7 @@
   }
 
   @Override
-  public void sentBytes(int count) {
+  public void sentBytes(long count) {
     this.sentBytes.incr(count);
   }
 
Index: hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/BufferChain.java
===================================================================
--- hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/BufferChain.java	(revision 0)
+++ hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/BufferChain.java	(working copy)
@@ -0,0 +1,117 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.ipc;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.channels.GatheringByteChannel;
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+
+/**
+ * Chain of ByteBuffers.
+ * Used for writing out an array of byte buffers.  Writes in chunks.
+ */
+@InterfaceAudience.Private
+class BufferChain {
+  private static final ByteBuffer [] FOR_TOARRAY_TYPE = new ByteBuffer[0];
+  private final ByteBuffer[] buffers;
+  private int remaining = 0;
+  private int bufferOffset = 0;
+
+  BufferChain(ByteBuffer ... buffers) {
+    // Some of the incoming buffers can be null
+    List<ByteBuffer> bbs = new ArrayList<ByteBuffer>(buffers.length);
+    for (ByteBuffer b : buffers) {
+      if (b == null) continue;
+      bbs.add(b);
+      this.remaining += b.remaining();
+    }
+    this.buffers = bbs.toArray(FOR_TOARRAY_TYPE);
+  }
+
+  /**
+   * Expensive.  Makes a new buffer to hold a copy of what is in contained ByteBuffers.  This
+   * call drains this instance; it cannot be used subsequent to the call.
+   * @return A new byte array with the content of all contained ByteBuffers.
+   */
+  byte [] getBytes() {
+    if (!hasRemaining()) throw new IllegalAccessError();
+    byte [] bytes = new byte [this.remaining];
+    int offset = 0;
+    for (ByteBuffer bb: this.buffers) {
+      System.arraycopy(bb.array(), bb.arrayOffset() + bb.position(), bytes, offset, bb.remaining());
+      offset += bb.remaining();
+    }
+    return bytes;
+  }
+
+  boolean hasRemaining() {
+    return remaining > 0;
+  }
+
+  /**
+   * Write out our chain of buffers in chunks
+   * @param channel Where to write
+   * @param chunkSize Size of chunks to write.
+   * @return Amount written.
+   * @throws IOException
+   */
+  long write(GatheringByteChannel channel, int chunkSize) throws IOException {
+    int chunkRemaining = chunkSize;
+    ByteBuffer lastBuffer = null;
+    int bufCount = 0;
+    int restoreLimit = -1;
+
+    while (chunkRemaining > 0 && bufferOffset + bufCount < buffers.length) {
+      lastBuffer = buffers[bufferOffset + bufCount];
+      if (!lastBuffer.hasRemaining()) {
+        bufferOffset++;
+        continue;
+      }
+      bufCount++;
+      if (lastBuffer.remaining() > chunkRemaining) {
+        restoreLimit = lastBuffer.limit();
+        lastBuffer.limit(lastBuffer.position() + chunkRemaining);
+        chunkRemaining = 0;
+        break;
+      } else {
+        chunkRemaining -= lastBuffer.remaining();
+      }
+    }
+    assert lastBuffer != null;
+    if (chunkRemaining == chunkSize) {
+      assert !hasRemaining();
+      // no data left to write
+      return 0;
+    }
+    try {
+      long ret = channel.write(buffers, bufferOffset, bufCount);
+      if (ret > 0) {
+        remaining -= ret;
+      }
+      return ret;
+    } finally {
+      if (restoreLimit >= 0) {
+        lastBuffer.limit(restoreLimit);
+      }
+    }
+  }
}
\ No newline at end of file
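A minimal usage sketch for the new class (not part of the patch): BufferChain is package-private, so imagine this living in org.apache.hadoop.hbase.ipc; the class name, output path, and chunk size are made up. It shows the intended pattern — hand the chain several buffers, some possibly null, and loop until the gathering write drains it.

import java.io.FileOutputStream;
import java.nio.ByteBuffer;
import java.nio.channels.GatheringByteChannel;

public class BufferChainUsageSketch {
  public static void main(String[] args) throws Exception {
    ByteBuffer header = ByteBuffer.wrap("header|".getBytes("UTF-8"));
    ByteBuffer body = ByteBuffer.wrap("body".getBytes("UTF-8"));
    // Nulls are tolerated; the constructor skips them.
    BufferChain chain = new BufferChain(header, null, body);
    FileOutputStream fos = new FileOutputStream("/tmp/chain.out");
    GatheringByteChannel ch = fos.getChannel();
    try {
      while (chain.hasRemaining()) {
        // Each call writes at most chunkSize bytes, mirroring how the RPC
        // responder caps a single write at NIO_BUFFER_LIMIT (HBASE-7239).
        chain.write(ch, 8 * 1024);
      }
    } finally {
      fos.close();
    }
  }
}

The chunk cap plus the limit/restore dance inside write() is what lets the responder send large responses without the JDK allocating huge temporary direct buffers behind the scenes.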
Index: hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/MetricsHBaseServer.java
===================================================================
--- hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/MetricsHBaseServer.java	(revision 1541421)
+++ hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/MetricsHBaseServer.java	(working copy)
@@ -47,7 +47,7 @@
     source.authenticationSuccess();
   }
 
-  void sentBytes(int count) {
+  void sentBytes(long count) {
     source.sentBytes(count);
   }
 
Index: hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcServer.java
===================================================================
--- hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcServer.java	(revision 1541421)
+++ hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcServer.java	(working copy)
@@ -35,6 +35,7 @@
 import java.nio.channels.CancelledKeyException;
 import java.nio.channels.Channels;
 import java.nio.channels.ClosedChannelException;
+import java.nio.channels.GatheringByteChannel;
 import java.nio.channels.ReadableByteChannel;
 import java.nio.channels.SelectionKey;
 import java.nio.channels.Selector;
@@ -84,12 +85,11 @@
 import org.apache.hadoop.hbase.security.AuthMethod;
 import org.apache.hadoop.hbase.security.HBasePolicyProvider;
 import org.apache.hadoop.hbase.security.HBaseSaslRpcServer;
-import org.apache.hadoop.hbase.security.UserProvider;
 import org.apache.hadoop.hbase.security.HBaseSaslRpcServer.SaslDigestCallbackHandler;
 import org.apache.hadoop.hbase.security.HBaseSaslRpcServer.SaslGssCallbackHandler;
 import org.apache.hadoop.hbase.security.SaslStatus;
 import org.apache.hadoop.hbase.security.SaslUtil;
-import org.apache.hadoop.hbase.security.User;
+import org.apache.hadoop.hbase.security.UserProvider;
 import org.apache.hadoop.hbase.security.token.AuthenticationTokenSecretManager;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.Pair;
@@ -114,6 +114,7 @@
 import org.codehaus.jackson.map.ObjectMapper;
 
 import com.google.common.util.concurrent.ThreadFactoryBuilder;
+import com.google.common.collect.Lists;
 import com.google.protobuf.BlockingService;
 import com.google.protobuf.CodedInputStream;
 import com.google.protobuf.Descriptors.MethodDescriptor;
@@ -276,7 +277,10 @@
     protected Connection connection;              // connection to client
     protected long timestamp;      // the time received when response is null
                                    // the time served when response is not null
-    protected ByteBuffer response;                // the response for this call
+    /**
+     * Chain of buffers to send as response.
+     */
+    protected BufferChain response;
     protected boolean delayResponse;
     protected Responder responder;
     protected boolean delayReturnValue;           // if the return value should be
@@ -341,14 +345,14 @@
     }
 
     protected synchronized void setSaslTokenResponse(ByteBuffer response) {
-      this.response = response;
+      this.response = new BufferChain(response);
     }
 
     protected synchronized void setResponse(Object m, final CellScanner cells,
         Throwable t, String errorMsg) {
       if (this.isError) return;
      if (t != null) this.isError = true;
-      ByteBufferOutputStream bbos = null;
+      BufferChain bc = null;
       try {
         ResponseHeader.Builder headerBuilder = ResponseHeader.newBuilder();
         // Presume it a pb Message.  Could be null.
@@ -380,42 +384,44 @@
           headerBuilder.setCellBlockMeta(cellBlockBuilder.build());
         }
         Message header = headerBuilder.build();
-        bbos = IPCUtil.write(header, result, cellBlock);
+
+        // Organize the response as a set of bytebuffers rather than collect it all together inside
+        // one big byte array; save on allocations.
+        ByteBuffer bbHeader = IPCUtil.getDelimitedMessageAsByteBuffer(header);
+        ByteBuffer bbResult = IPCUtil.getDelimitedMessageAsByteBuffer(result);
+        int totalSize = bbHeader.capacity() + (bbResult == null? 0: bbResult.limit()) +
+          (cellBlock == null? 0: cellBlock.limit());
+        ByteBuffer bbTotalSize = ByteBuffer.wrap(Bytes.toBytes(totalSize));
+        bc = new BufferChain(bbTotalSize, bbHeader, bbResult, cellBlock);
         if (connection.useWrap) {
-          wrapWithSasl(bbos);
+          bc = wrapWithSasl(bc);
         }
       } catch (IOException e) {
         LOG.warn("Exception while creating response " + e);
       }
-      ByteBuffer bb = null;
-      if (bbos != null) {
-        // TODO: If SASL, maybe buffer already been flipped and written?
-        bb = bbos.getByteBuffer();
-        bb.position(0);
-      }
-      this.response = bb;
     }
 
-    private void wrapWithSasl(ByteBufferOutputStream response)
-    throws IOException {
-      if (connection.useSasl) {
-        // getByteBuffer calls flip()
-        ByteBuffer buf = response.getByteBuffer();
-        byte[] token;
-        // synchronization may be needed since there can be multiple Handler
-        // threads using saslServer to wrap responses.
-        synchronized (connection.saslServer) {
-          token = connection.saslServer.wrap(buf.array(),
-              buf.arrayOffset(), buf.remaining());
-        }
-        if (LOG.isDebugEnabled())
-          LOG.debug("Adding saslServer wrapped token of size " + token.length
-              + " as call response.");
-        buf.clear();
-        DataOutputStream saslOut = new DataOutputStream(response);
-        saslOut.writeInt(token.length);
-        saslOut.write(token, 0, token.length);
+      this.response = bc;
+    }
+
+    private BufferChain wrapWithSasl(BufferChain bc)
+    throws IOException {
+      if (bc == null) return bc;
+      if (!this.connection.useSasl) return bc;
+      // Looks like no way around this; saslServer wants a byte array.  I have to make it one.
+      // THIS IS A BIG UGLY COPY.
+      byte [] responseBytes = bc.getBytes();
+      byte [] token;
+      // synchronization may be needed since there can be multiple Handler
+      // threads using saslServer to wrap responses.
+      synchronized (connection.saslServer) {
+        token = connection.saslServer.wrap(responseBytes, 0, responseBytes.length);
      }
+      if (LOG.isDebugEnabled())
+        LOG.debug("Adding saslServer wrapped token of size " + token.length
+            + " as call response.");
+
+      ByteBuffer bbTokenLength = ByteBuffer.wrap(Bytes.toBytes(token.length));
+      ByteBuffer bbTokenBytes = ByteBuffer.wrap(token);
+      return new BufferChain(bbTokenLength, bbTokenBytes);
     }
 
     @Override
@@ -994,7 +1000,7 @@
         //
         // Send as much data as we can in the non-blocking fashion
         //
-        int numBytes = channelWrite(channel, call.response);
+        long numBytes = channelWrite(channel, call.response);
         if (numBytes < 0) {
           return true;
         }
@@ -1347,6 +1353,7 @@
         }
       }
     }
+
     /**
      * No protobuf encoding of raw sasl messages
      */
@@ -2169,19 +2176,15 @@
    * buffer.
    *
    * @param channel writable byte channel to write to
-   * @param buffer buffer to write
+   * @param bufferChain Chain of buffers to write
    * @return number of bytes written
    * @throws java.io.IOException e
    * @see java.nio.channels.WritableByteChannel#write(java.nio.ByteBuffer)
    */
-  protected int channelWrite(WritableByteChannel channel,
-                             ByteBuffer buffer) throws IOException {
-
-    int count = (buffer.remaining() <= NIO_BUFFER_LIMIT) ?
-           channel.write(buffer) : channelIO(null, channel, buffer);
-    if (count > 0) {
-      metrics.sentBytes(count);
-    }
+  protected long channelWrite(GatheringByteChannel channel, BufferChain bufferChain)
+  throws IOException {
+    long count = bufferChain.write(channel, NIO_BUFFER_LIMIT);
+    if (count > 0) this.metrics.sentBytes(count);
     return count;
   }
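To make the two frame layouts above concrete, here is a standalone sketch (not part of the patch) of what setResponse and wrapWithSasl each put on the wire. ByteBuffer.allocate(4).putInt(...) stands in for HBase's Bytes.toBytes(int) (both produce a big-endian 4-byte int), and all byte values are placeholders, not real RPC bytes.

import java.nio.ByteBuffer;

public class ResponseFramingSketch {
  public static void main(String[] args) {
    byte[] header = {1, 2};        // stand-in for the delimited ResponseHeader
    byte[] result = {3, 4, 5};     // stand-in for the delimited result Message
    byte[] cellBlock = {6};        // stand-in for an encoded cell block

    // Plain response: a 4-byte total-size preamble, then the three parts kept
    // as separate buffers in a BufferChain so no flattening copy is needed.
    int totalSize = header.length + result.length + cellBlock.length;
    ByteBuffer bbTotalSize = (ByteBuffer) ByteBuffer.allocate(4).putInt(totalSize).flip();

    // SASL-wrapped response: the wrapped token replaces the whole frame and
    // gets its own 4-byte length preamble, as wrapWithSasl does.
    byte[] token = {7, 8, 9, 10};  // stand-in for saslServer.wrap(...) output
    ByteBuffer bbTokenLength = (ByteBuffer) ByteBuffer.allocate(4).putInt(token.length).flip();
    ByteBuffer bbTokenBytes = ByteBuffer.wrap(token);

    System.out.println("plain frame length prefix: " + bbTotalSize.getInt(0));
    System.out.println("sasl frame length prefix: " + bbTokenLength.getInt(0));
  }
}

Note the asymmetry the patch calls out: the plain path never flattens the buffers, while the SASL path must copy the chain into one array (the "BIG UGLY COPY") because the SaslServer wrap API only accepts a byte array.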
Index: hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestBufferChain.java
===================================================================
--- hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestBufferChain.java	(revision 0)
+++ hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestBufferChain.java	(working copy)
@@ -0,0 +1,157 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.ipc;
+
+import static org.junit.Assert.*;
+
+import java.io.File;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.channels.FileChannel;
+
+import org.apache.hadoop.hbase.SmallTests;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.mockito.Mockito;
+
+import com.google.common.base.Charsets;
+import com.google.common.io.Files;
+
+@Category(SmallTests.class)
+public class TestBufferChain {
+  private File tmpFile;
+
+  private static final byte[][] HELLO_WORLD_CHUNKS = new byte[][] {
+      "hello".getBytes(Charsets.UTF_8),
+      " ".getBytes(Charsets.UTF_8),
+      "world".getBytes(Charsets.UTF_8)
+  };
+
+  @Before
+  public void setup() throws IOException {
+    tmpFile = File.createTempFile("TestBufferChain", "txt");
+  }
+
+  @After
+  public void teardown() {
+    tmpFile.delete();
+  }
+
+  @Test
+  public void testGetBackBytesWePutIn() {
+    ByteBuffer[] bufs = wrapArrays(HELLO_WORLD_CHUNKS);
+    BufferChain chain = new BufferChain(bufs);
+    assertTrue(Bytes.equals(Bytes.toBytes("hello world"), chain.getBytes()));
+  }
+
+  @Test
+  public void testChainChunkBiggerThanWholeArray() throws IOException {
+    ByteBuffer[] bufs = wrapArrays(HELLO_WORLD_CHUNKS);
+    BufferChain chain = new BufferChain(bufs);
+    writeAndVerify(chain, "hello world", 8192);
+    assertNoRemaining(bufs);
+  }
+
+  @Test
+  public void testChainChunkBiggerThanSomeArrays() throws IOException {
+    ByteBuffer[] bufs = wrapArrays(HELLO_WORLD_CHUNKS);
+    BufferChain chain = new BufferChain(bufs);
+    writeAndVerify(chain, "hello world", 3);
+    assertNoRemaining(bufs);
+  }
+
+  @Test
+  public void testLimitOffset() throws IOException {
+    ByteBuffer[] bufs = new ByteBuffer[] {
+        stringBuf("XXXhelloYYY", 3, 5),
+        stringBuf(" ", 0, 1),
+        stringBuf("XXXXworldY", 4, 5) };
+    BufferChain chain = new BufferChain(bufs);
+    writeAndVerify(chain, "hello world", 3);
+    assertNoRemaining(bufs);
+  }
+
+  @Test
+  public void testWithSpy() throws IOException {
+    ByteBuffer[] bufs = new ByteBuffer[] {
+        stringBuf("XXXhelloYYY", 3, 5),
+        stringBuf(" ", 0, 1),
+        stringBuf("XXXXworldY", 4, 5) };
+    BufferChain chain = new BufferChain(bufs);
+    FileOutputStream fos = new FileOutputStream(tmpFile);
+    FileChannel ch = Mockito.spy(fos.getChannel());
+    try {
+      chain.write(ch, 2);
+      assertEquals("he", Files.toString(tmpFile, Charsets.UTF_8));
+      chain.write(ch, 2);
+      assertEquals("hell", Files.toString(tmpFile, Charsets.UTF_8));
+      chain.write(ch, 3);
+      assertEquals("hello w", Files.toString(tmpFile, Charsets.UTF_8));
+      chain.write(ch, 8);
+      assertEquals("hello world", Files.toString(tmpFile, Charsets.UTF_8));
+    } finally {
+      ch.close();
+      fos.close();
+    }
+  }
+
+  private ByteBuffer stringBuf(String string, int position, int length) {
+    ByteBuffer buf = ByteBuffer.wrap(string.getBytes(Charsets.UTF_8));
+    buf.position(position);
+    buf.limit(position + length);
+    assertTrue(buf.hasRemaining());
+    return buf;
+  }
+
+  private void assertNoRemaining(ByteBuffer[] bufs) {
+    for (ByteBuffer buf : bufs) {
+      assertFalse(buf.hasRemaining());
+    }
+  }
+
+  private ByteBuffer[] wrapArrays(byte[][] arrays) {
+    ByteBuffer[] ret = new ByteBuffer[arrays.length];
+    for (int i = 0; i < arrays.length; i++) {
+      ret[i] = ByteBuffer.wrap(arrays[i]);
+    }
+    return ret;
+  }
+
+  private void writeAndVerify(BufferChain chain, String string, int chunkSize)
+      throws IOException {
+    FileOutputStream fos = new FileOutputStream(tmpFile);
+    FileChannel ch = fos.getChannel();
+    try {
+      long remaining = string.length();
+      while (chain.hasRemaining()) {
+        long n = chain.write(ch, chunkSize);
+        assertTrue(n == chunkSize || n == remaining);
+        remaining -= n;
+      }
+      assertEquals(0, remaining);
+    } finally {
+      fos.close();
+    }
+    assertFalse(chain.hasRemaining());
+    assertEquals(string, Files.toString(tmpFile, Charsets.UTF_8));
+  }
}
\ No newline at end of file
Index: hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestIPC.java
===================================================================
--- hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestIPC.java	(revision 1541421)
+++ hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestIPC.java	(working copy)
@@ -36,6 +36,7 @@
 
 import javax.net.SocketFactory;
 
+import com.google.common.collect.Lists;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;