diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/IPCUtil.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/IPCUtil.java index 081e171..ab5f16c 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/IPCUtil.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/IPCUtil.java @@ -19,7 +19,6 @@ package org.apache.hadoop.hbase.ipc; import java.io.ByteArrayInputStream; import java.io.DataInput; -import java.io.DataInputStream; import java.io.IOException; import java.io.InputStream; import java.io.OutputStream; @@ -44,7 +43,6 @@ import org.apache.hadoop.io.compress.Compressor; import org.apache.hadoop.io.compress.Decompressor; import com.google.common.base.Preconditions; -import com.google.protobuf.CodedInputStream; import com.google.protobuf.CodedOutputStream; import com.google.protobuf.Message; @@ -117,10 +115,15 @@ class IPCUtil { os = compressor.createOutputStream(os, poolCompressor); } Codec.Encoder encoder = codec.getEncoder(os); + int count = 0; while (cellScanner.advance()) { encoder.write(cellScanner.current()); + count++; } encoder.flush(); + // If no cells, don't mess around. Just return null (could be a bunch of existence checking + // gets or something -- stuff that does not return a cell). + if (count == 0) return null; } finally { os.close(); if (poolCompressor != null) CodecPool.returnCompressor(poolCompressor); @@ -187,24 +190,23 @@ class IPCUtil { } /** - * Write out header, param, and cell block if there to a {@link ByteBufferOutputStream} sized - * to hold these elements. - * @param header - * @param param - * @param cellBlock - * @return A {@link ByteBufferOutputStream} filled with the content of the passed in - * header, param, and cellBlock. + * @param m Message to serialize delimited; i.e. w/ a vint of its size preceeding its + * serialization. + * @return The passed in Message serialized with delimiter. 
Return null if m is null * @throws IOException */ - static ByteBufferOutputStream write(final Message header, final Message param, - final ByteBuffer cellBlock) - throws IOException { - int totalSize = getTotalSizeWhenWrittenDelimited(header, param); - if (cellBlock != null) totalSize += cellBlock.limit(); - ByteBufferOutputStream bbos = new ByteBufferOutputStream(totalSize); - write(bbos, header, param, cellBlock, totalSize); - bbos.close(); - return bbos; + static ByteBuffer getDelimitedMessageAsByteBuffer(final Message m) throws IOException { + if (m == null) return null; + int serializedSize = m.getSerializedSize(); + int vintSize = CodedOutputStream.computeRawVarint32Size(serializedSize); + byte [] buffer = new byte[serializedSize + vintSize]; + // Passing in a byte array saves COS creating a buffer which it does when using streams. + CodedOutputStream cos = CodedOutputStream.newInstance(buffer); + // This will write out the vint preamble and the message serialized. + cos.writeMessageNoTag(m); + cos.flush(); + cos.checkNoSpaceLeft(); + return ByteBuffer.wrap(buffer); } /** @@ -230,8 +232,9 @@ class IPCUtil { private static int write(final OutputStream dos, final Message header, final Message param, final ByteBuffer cellBlock, final int totalSize) throws IOException { - // I confirmed toBytes does same as say DataOutputStream#writeInt. + // I confirmed toBytes does same as DataOutputStream#writeInt. dos.write(Bytes.toBytes(totalSize)); + // This allocates a buffer that is the size of the message internally. 
header.writeDelimitedTo(dos); if (param != null) param.writeDelimitedTo(dos); if (cellBlock != null) dos.write(cellBlock.array(), 0, cellBlock.remaining()); @@ -240,20 +243,6 @@ class IPCUtil { } /** - * @param in Stream cue'd up just before a delimited message - * @return Bytes that hold the bytes that make up the message read from in - * @throws IOException - */ - static byte [] getDelimitedMessageBytes(final DataInputStream in) throws IOException { - byte b = in.readByte(); - int size = CodedInputStream.readRawVarint32(b, in); - // Allocate right-sized buffer rather than let pb allocate its default minimum 4k. - byte [] bytes = new byte[size]; - IOUtils.readFully(in, bytes); - return bytes; - } - - /** * Read in chunks of 8K (HBASE-7239) * @param in * @param dest diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/protobuf/ProtobufUtil.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/protobuf/ProtobufUtil.java index 9e6d957..7ea9e0f 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/protobuf/ProtobufUtil.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/protobuf/ProtobufUtil.java @@ -172,10 +172,13 @@ public final class ProtobufUtil { ClientProtos.Result.Builder builder = ClientProtos.Result.newBuilder(); builder.setExists(true); + builder.setAssociatedCellCount(0); EMPTY_RESULT_PB_EXISTS_TRUE = builder.build(); builder.clear(); + builder.setExists(false); + builder.setAssociatedCellCount(0); EMPTY_RESULT_PB_EXISTS_FALSE = builder.build(); builder.clear(); @@ -1184,19 +1187,11 @@ public final class ProtobufUtil { * @return the converted protocol buffer Result */ public static ClientProtos.Result toResultNoData(final Result result) { - if (result.getExists() != null){ - return result.getExists() ? 
EMPTY_RESULT_PB_EXISTS_TRUE : EMPTY_RESULT_PB_EXISTS_FALSE; - } - + if (result.getExists() != null) return toResult(result.getExists()); int size = result.size(); - - if (size == 0){ - return EMPTY_RESULT_PB; - } - + if (size == 0) return EMPTY_RESULT_PB; ClientProtos.Result.Builder builder = ClientProtos.Result.newBuilder(); builder.setAssociatedCellCount(size); - return builder.build(); } diff --git a/hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/ipc/MetricsHBaseServerSource.java b/hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/ipc/MetricsHBaseServerSource.java index 05a7f44..81ad44a 100644 --- a/hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/ipc/MetricsHBaseServerSource.java +++ b/hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/ipc/MetricsHBaseServerSource.java @@ -62,7 +62,7 @@ public interface MetricsHBaseServerSource extends BaseSource { void authenticationFailure(); - void sentBytes(int count); + void sentBytes(long count); void receivedBytes(int count); diff --git a/hbase-hadoop1-compat/src/main/java/org/apache/hadoop/hbase/ipc/MetricsHBaseServerSourceImpl.java b/hbase-hadoop1-compat/src/main/java/org/apache/hadoop/hbase/ipc/MetricsHBaseServerSourceImpl.java index f6f6241..00411fd 100644 --- a/hbase-hadoop1-compat/src/main/java/org/apache/hadoop/hbase/ipc/MetricsHBaseServerSourceImpl.java +++ b/hbase-hadoop1-compat/src/main/java/org/apache/hadoop/hbase/ipc/MetricsHBaseServerSourceImpl.java @@ -86,7 +86,7 @@ public class MetricsHBaseServerSourceImpl extends BaseSourceImpl } @Override - public void sentBytes(int count) { + public void sentBytes(long count) { this.sentBytes.incr(count); } diff --git a/hbase-hadoop2-compat/src/main/java/org/apache/hadoop/hbase/ipc/MetricsHBaseServerSourceImpl.java b/hbase-hadoop2-compat/src/main/java/org/apache/hadoop/hbase/ipc/MetricsHBaseServerSourceImpl.java index afe0bfb..d0cdaf9 100644 --- 
a/hbase-hadoop2-compat/src/main/java/org/apache/hadoop/hbase/ipc/MetricsHBaseServerSourceImpl.java +++ b/hbase-hadoop2-compat/src/main/java/org/apache/hadoop/hbase/ipc/MetricsHBaseServerSourceImpl.java @@ -89,7 +89,7 @@ public class MetricsHBaseServerSourceImpl extends BaseSourceImpl } @Override - public void sentBytes(int count) { + public void sentBytes(long count) { this.sentBytes.incr(count); } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/BufferChain.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/BufferChain.java new file mode 100644 index 0000000..8b0b568 --- /dev/null +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/BufferChain.java @@ -0,0 +1,117 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.ipc; + +import java.io.IOException; +import java.nio.ByteBuffer; +import java.nio.channels.GatheringByteChannel; +import java.util.ArrayList; +import java.util.List; + +import org.apache.hadoop.classification.InterfaceAudience; + +/** + * Chain of ByteBuffers. + * Used writing out an array of byte buffers. Writes in chunks. 
+ */ +@InterfaceAudience.Private +class BufferChain { + private static final ByteBuffer [] FOR_TOARRAY_TYPE = new ByteBuffer[0]; + private final ByteBuffer[] buffers; + private int remaining = 0; + private int bufferOffset = 0; + + BufferChain(ByteBuffer ... buffers) { + // Some of the incoming buffers can be null + List<ByteBuffer> bbs = new ArrayList<ByteBuffer>(buffers.length); + for (ByteBuffer b : buffers) { + if (b == null) continue; + bbs.add(b); + this.remaining += b.remaining(); + } + this.buffers = bbs.toArray(FOR_TOARRAY_TYPE); + } + + /** + * Expensive. Copies the unread bytes (position to limit) of the contained ByteBuffers, which + * must be array-backed. Treat this instance as drained after the call; do not use it again. + * @return A new byte array with the remaining content of all contained ByteBuffers. + */ + byte [] getBytes() { + if (!hasRemaining()) throw new IllegalAccessError(); + byte [] bytes = new byte [this.remaining]; + int offset = 0; + for (ByteBuffer bb: this.buffers) { + System.arraycopy(bb.array(), bb.arrayOffset() + bb.position(), bytes, offset, bb.remaining()); + offset += bb.remaining(); // advance by the bytes copied, not by the buffer capacity + } + return bytes; + } + + boolean hasRemaining() { + return remaining > 0; + } + + /** + * Write out our chain of buffers in chunks + * @param channel Where to write + * @param chunkSize Size of chunks to write. + * @return Amount written. 
+ * @throws IOException If the underlying channel write fails + */ + long write(GatheringByteChannel channel, int chunkSize) throws IOException { + int chunkRemaining = chunkSize; + ByteBuffer lastBuffer = null; + int bufCount = 0; + int restoreLimit = -1; + + while (chunkRemaining > 0 && bufferOffset + bufCount < buffers.length) { + lastBuffer = buffers[bufferOffset + bufCount]; + if (bufCount == 0 && !lastBuffer.hasRemaining()) { + bufferOffset++; // drained leading buffer; empties inside the window stay in the gather write + continue; + } + bufCount++; + if (lastBuffer.remaining() > chunkRemaining) { + restoreLimit = lastBuffer.limit(); + lastBuffer.limit(lastBuffer.position() + chunkRemaining); + chunkRemaining = 0; + break; + } else { + chunkRemaining -= lastBuffer.remaining(); + } + } + assert lastBuffer != null; + if (chunkRemaining == chunkSize) { + assert !hasRemaining(); + // no data left to write + return 0; + } + try { + long ret = channel.write(buffers, bufferOffset, bufCount); + if (ret > 0) { + remaining -= ret; + } + return ret; + } finally { + if (restoreLimit >= 0) { + lastBuffer.limit(restoreLimit); + } + } + } +} \ No newline at end of file diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/MetricsHBaseServer.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/MetricsHBaseServer.java index 7911c94..8f278d0 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/MetricsHBaseServer.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/MetricsHBaseServer.java @@ -47,7 +47,7 @@ public class MetricsHBaseServer { source.authenticationSuccess(); } - void sentBytes(int count) { + void sentBytes(long count) { source.sentBytes(count); } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcServer.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcServer.java index ade4117..1aad013 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcServer.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcServer.java @@ -35,6 +35,7 @@ import java.nio.ByteBuffer; import 
java.nio.channels.CancelledKeyException; import java.nio.channels.Channels; import java.nio.channels.ClosedChannelException; +import java.nio.channels.GatheringByteChannel; import java.nio.channels.ReadableByteChannel; import java.nio.channels.SelectionKey; import java.nio.channels.Selector; @@ -84,12 +85,11 @@ import org.apache.hadoop.hbase.regionserver.HRegionServer; import org.apache.hadoop.hbase.security.AuthMethod; import org.apache.hadoop.hbase.security.HBasePolicyProvider; import org.apache.hadoop.hbase.security.HBaseSaslRpcServer; -import org.apache.hadoop.hbase.security.UserProvider; import org.apache.hadoop.hbase.security.HBaseSaslRpcServer.SaslDigestCallbackHandler; import org.apache.hadoop.hbase.security.HBaseSaslRpcServer.SaslGssCallbackHandler; import org.apache.hadoop.hbase.security.SaslStatus; import org.apache.hadoop.hbase.security.SaslUtil; -import org.apache.hadoop.hbase.security.User; +import org.apache.hadoop.hbase.security.UserProvider; import org.apache.hadoop.hbase.security.token.AuthenticationTokenSecretManager; import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.Pair; @@ -276,7 +276,10 @@ public class RpcServer implements RpcServerInterface { protected Connection connection; // connection to client protected long timestamp; // the time received when response is null // the time served when response is not null - protected ByteBuffer response; // the response for this call + /** + * Chain of buffers to send as response. 
+ */ + protected BufferChain response; protected boolean delayResponse; protected Responder responder; protected boolean delayReturnValue; // if the return value should be @@ -341,14 +344,14 @@ public class RpcServer implements RpcServerInterface { } protected synchronized void setSaslTokenResponse(ByteBuffer response) { - this.response = response; + this.response = new BufferChain(response); } protected synchronized void setResponse(Object m, final CellScanner cells, Throwable t, String errorMsg) { if (this.isError) return; if (t != null) this.isError = true; - ByteBufferOutputStream bbos = null; + BufferChain bc = null; try { ResponseHeader.Builder headerBuilder = ResponseHeader.newBuilder(); // Presume it a pb Message. Could be null. @@ -380,42 +383,44 @@ public class RpcServer implements RpcServerInterface { headerBuilder.setCellBlockMeta(cellBlockBuilder.build()); } Message header = headerBuilder.build(); - bbos = IPCUtil.write(header, result, cellBlock); + + // Organize the response as a set of bytebuffers rather than collect it all together inside + // one big byte array; save on allocations. + ByteBuffer bbHeader = IPCUtil.getDelimitedMessageAsByteBuffer(header); + ByteBuffer bbResult = IPCUtil.getDelimitedMessageAsByteBuffer(result); + int totalSize = bbHeader.capacity() + (bbResult == null? 0: bbResult.limit()) + + (cellBlock == null? 0: cellBlock.limit()); + ByteBuffer bbTotalSize = ByteBuffer.wrap(Bytes.toBytes(totalSize)); + bc = new BufferChain(bbTotalSize, bbHeader, bbResult, cellBlock); if (connection.useWrap) { - wrapWithSasl(bbos); + bc = wrapWithSasl(bc); } } catch (IOException e) { LOG.warn("Exception while creating response " + e); } - ByteBuffer bb = null; - if (bbos != null) { - // TODO: If SASL, maybe buffer already been flipped and written? 
- bb = bbos.getByteBuffer(); - bb.position(0); - } - this.response = bb; + this.response = bc; } - private void wrapWithSasl(ByteBufferOutputStream response) - throws IOException { - if (connection.useSasl) { - // getByteBuffer calls flip() - ByteBuffer buf = response.getByteBuffer(); - byte[] token; - // synchronization may be needed since there can be multiple Handler - // threads using saslServer to wrap responses. - synchronized (connection.saslServer) { - token = connection.saslServer.wrap(buf.array(), - buf.arrayOffset(), buf.remaining()); - } - if (LOG.isDebugEnabled()) - LOG.debug("Adding saslServer wrapped token of size " + token.length - + " as call response."); - buf.clear(); - DataOutputStream saslOut = new DataOutputStream(response); - saslOut.writeInt(token.length); - saslOut.write(token, 0, token.length); + private BufferChain wrapWithSasl(BufferChain bc) + throws IOException { + if (bc == null) return bc; + if (!this.connection.useSasl) return bc; + // Looks like no way around this; saslserver wants a byte array. I have to make it one. + // THIS IS A BIG UGLY COPY. + byte [] responseBytes = bc.getBytes(); + byte [] token; + // synchronization may be needed since there can be multiple Handler + // threads using saslServer to wrap responses. 
+ synchronized (connection.saslServer) { + token = connection.saslServer.wrap(responseBytes, 0, responseBytes.length); } + if (LOG.isDebugEnabled()) + LOG.debug("Adding saslServer wrapped token of size " + token.length + + " as call response."); + + ByteBuffer bbTokenLength = ByteBuffer.wrap(Bytes.toBytes(token.length)); + ByteBuffer bbTokenBytes = ByteBuffer.wrap(token); + return new BufferChain(bbTokenLength, bbTokenBytes); } @Override @@ -994,7 +999,7 @@ public class RpcServer implements RpcServerInterface { // // Send as much data as we can in the non-blocking fashion // - int numBytes = channelWrite(channel, call.response); + long numBytes = channelWrite(channel, call.response); if (numBytes < 0) { return true; } @@ -1347,6 +1352,7 @@ public class RpcServer implements RpcServerInterface { } } } + /** * No protobuf encoding of raw sasl messages */ @@ -2169,19 +2175,15 @@ public class RpcServer implements RpcServerInterface { * buffer. * * @param channel writable byte channel to write to - * @param buffer buffer to write + * @param bufferChain Chain of buffers to write * @return number of bytes written * @throws java.io.IOException e * @see java.nio.channels.WritableByteChannel#write(java.nio.ByteBuffer) */ - protected int channelWrite(WritableByteChannel channel, - ByteBuffer buffer) throws IOException { - - int count = (buffer.remaining() <= NIO_BUFFER_LIMIT) ? 
- channel.write(buffer) : channelIO(null, channel, buffer); - if (count > 0) { - metrics.sentBytes(count); - } + protected long channelWrite(GatheringByteChannel channel, BufferChain bufferChain) + throws IOException { + long count = bufferChain.write(channel, NIO_BUFFER_LIMIT); + if (count > 0) this.metrics.sentBytes(count); return count; } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java index db699c7..f4b306a 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java @@ -2834,7 +2834,7 @@ public class HRegionServer implements ClientProtos.ClientService.BlockingInterfa if (existence != null){ ClientProtos.Result pbr = ProtobufUtil.toResult(existence); builder.setResult(pbr); - }else if (r != null) { + } else if (r != null) { ClientProtos.Result pbr = ProtobufUtil.toResult(r); builder.setResult(pbr); } @@ -3294,6 +3294,7 @@ public class HRegionServer implements ClientProtos.ClientService.BlockingInterfa @Override public MultiResponse multi(final RpcController rpcc, final MultiRequest request) throws ServiceException { + // rpc controller is how we bring in data via the back door; it is unprotobuf'ed data. // It is also the conduit via which we pass back data. 
PayloadCarryingRpcController controller = (PayloadCarryingRpcController)rpcc; diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestFromClientSide3.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestFromClientSide3.java index e36ee4e..f1c9607 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestFromClientSide3.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestFromClientSide3.java @@ -64,7 +64,6 @@ public class TestFromClientSide3 { private final static byte[] VAL_BYTES = Bytes.toBytes("v1"); private final static byte[] ROW_BYTES = Bytes.toBytes("r1"); - /** * @throws java.lang.Exception */ @@ -345,6 +344,7 @@ public class TestFromClientSide3 { gets.add(new Get(ROW)); gets.add(new Get(Bytes.add(ANOTHERROW, new byte[] { 0x00 }))); + LOG.info("Calling exists"); Boolean[] results = table.exists(gets); assertEquals(results[0], false); assertEquals(results[1], false); diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/TestRegionObserverInterface.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/TestRegionObserverInterface.java index f8b93ce..9f3e8b6 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/TestRegionObserverInterface.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/TestRegionObserverInterface.java @@ -82,8 +82,7 @@ import org.junit.experimental.categories.Category; public class TestRegionObserverInterface { static final Log LOG = LogFactory.getLog(TestRegionObserverInterface.class); - public static final TableName TEST_TABLE = - TableName.valueOf("TestTable"); + public static final TableName TEST_TABLE = TableName.valueOf("TestTable"); public final static byte[] A = Bytes.toBytes("a"); public final static byte[] B = Bytes.toBytes("b"); public final static byte[] C = Bytes.toBytes("c"); @@ -111,126 +110,132 @@ public class TestRegionObserverInterface { @Test public void 
testRegionObserver() throws IOException { - TableName tableName = TEST_TABLE; + TableName tableName = TableName.valueOf(TEST_TABLE.getNameAsString() + ".testRegionObserver"); // recreate table every time in order to reset the status of the // coprocessor. HTable table = util.createTable(tableName, new byte[][] {A, B, C}); - verifyMethodResult(SimpleRegionObserver.class, - new String[] {"hadPreGet", "hadPostGet", "hadPrePut", "hadPostPut", - "hadDelete"}, - TEST_TABLE, - new Boolean[] {false, false, false, false, false}); - - Put put = new Put(ROW); - put.add(A, A, A); - put.add(B, B, B); - put.add(C, C, C); - table.put(put); + try { + verifyMethodResult(SimpleRegionObserver.class, + new String[] {"hadPreGet", "hadPostGet", "hadPrePut", "hadPostPut", + "hadDelete"}, + tableName, + new Boolean[] {false, false, false, false, false}); + + Put put = new Put(ROW); + put.add(A, A, A); + put.add(B, B, B); + put.add(C, C, C); + table.put(put); - verifyMethodResult(SimpleRegionObserver.class, - new String[] {"hadPreGet", "hadPostGet", "hadPrePut", "hadPostPut", - "hadPreBatchMutate", "hadPostBatchMutate", "hadDelete"}, - TEST_TABLE, + verifyMethodResult(SimpleRegionObserver.class, + new String[] {"hadPreGet", "hadPostGet", "hadPrePut", "hadPostPut", + "hadPreBatchMutate", "hadPostBatchMutate", "hadDelete"}, + tableName, new Boolean[] {false, false, true, true, true, true, false} - ); - - verifyMethodResult(SimpleRegionObserver.class, - new String[] {"getCtPreOpen", "getCtPostOpen", "getCtPreClose", "getCtPostClose"}, - TEST_TABLE, - new Integer[] {1, 1, 0, 0}); - - Get get = new Get(ROW); - get.addColumn(A, A); - get.addColumn(B, B); - get.addColumn(C, C); - table.get(get); - - verifyMethodResult(SimpleRegionObserver.class, - new String[] {"hadPreGet", "hadPostGet", "hadPrePut", "hadPostPut", - "hadDelete"}, - TEST_TABLE, - new Boolean[] {true, true, true, true, false} - ); - - Delete delete = new Delete(ROW); - delete.deleteColumn(A, A); - delete.deleteColumn(B, B); - 
delete.deleteColumn(C, C); - table.delete(delete); - - verifyMethodResult(SimpleRegionObserver.class, - new String[] {"hadPreGet", "hadPostGet", "hadPrePut", "hadPostPut", - "hadPreBatchMutate", "hadPostBatchMutate", "hadDelete"}, - TEST_TABLE, + ); + + verifyMethodResult(SimpleRegionObserver.class, + new String[] {"getCtPreOpen", "getCtPostOpen", "getCtPreClose", "getCtPostClose"}, + tableName, + new Integer[] {1, 1, 0, 0}); + + Get get = new Get(ROW); + get.addColumn(A, A); + get.addColumn(B, B); + get.addColumn(C, C); + table.get(get); + + verifyMethodResult(SimpleRegionObserver.class, + new String[] {"hadPreGet", "hadPostGet", "hadPrePut", "hadPostPut", + "hadDelete"}, + tableName, + new Boolean[] {true, true, true, true, false} + ); + + Delete delete = new Delete(ROW); + delete.deleteColumn(A, A); + delete.deleteColumn(B, B); + delete.deleteColumn(C, C); + table.delete(delete); + + verifyMethodResult(SimpleRegionObserver.class, + new String[] {"hadPreGet", "hadPostGet", "hadPrePut", "hadPostPut", + "hadPreBatchMutate", "hadPostBatchMutate", "hadDelete"}, + tableName, new Boolean[] {true, true, true, true, true, true, true} - ); - util.deleteTable(tableName); - table.close(); - + ); + } finally { + util.deleteTable(tableName); + table.close(); + } verifyMethodResult(SimpleRegionObserver.class, new String[] {"getCtPreOpen", "getCtPostOpen", "getCtPreClose", "getCtPostClose"}, - TEST_TABLE, + tableName, new Integer[] {1, 1, 1, 1}); } @Test public void testRowMutation() throws IOException { - TableName tableName = TEST_TABLE; + TableName tableName = TableName.valueOf(TEST_TABLE.getNameAsString() + ".testRowMutation"); HTable table = util.createTable(tableName, new byte[][] {A, B, C}); - verifyMethodResult(SimpleRegionObserver.class, + try { + verifyMethodResult(SimpleRegionObserver.class, new String[] {"hadPreGet", "hadPostGet", "hadPrePut", "hadPostPut", "hadDeleted"}, - TEST_TABLE, + tableName, new Boolean[] {false, false, false, false, false}); - - Put put = 
new Put(ROW); - put.add(A, A, A); - put.add(B, B, B); - put.add(C, C, C); - - Delete delete = new Delete(ROW); - delete.deleteColumn(A, A); - delete.deleteColumn(B, B); - delete.deleteColumn(C, C); - - RowMutations arm = new RowMutations(ROW); - arm.add(put); - arm.add(delete); - table.mutateRow(arm); - - verifyMethodResult(SimpleRegionObserver.class, - new String[] {"hadPreGet", "hadPostGet", "hadPrePut", "hadPostPut", - "hadDeleted"}, - TEST_TABLE, - new Boolean[] {false, false, true, true, true} - ); - util.deleteTable(tableName); - table.close(); + Put put = new Put(ROW); + put.add(A, A, A); + put.add(B, B, B); + put.add(C, C, C); + + Delete delete = new Delete(ROW); + delete.deleteColumn(A, A); + delete.deleteColumn(B, B); + delete.deleteColumn(C, C); + + RowMutations arm = new RowMutations(ROW); + arm.add(put); + arm.add(delete); + table.mutateRow(arm); + + verifyMethodResult(SimpleRegionObserver.class, + new String[] {"hadPreGet", "hadPostGet", "hadPrePut", "hadPostPut", + "hadDeleted"}, + tableName, + new Boolean[] {false, false, true, true, true} + ); + } finally { + util.deleteTable(tableName); + table.close(); + } } @Test public void testIncrementHook() throws IOException { - TableName tableName = TEST_TABLE; - + TableName tableName = TableName.valueOf(TEST_TABLE.getNameAsString() + ".testIncrementHook"); HTable table = util.createTable(tableName, new byte[][] {A, B, C}); - Increment inc = new Increment(Bytes.toBytes(0)); - inc.addColumn(A, A, 1); - - verifyMethodResult(SimpleRegionObserver.class, - new String[] {"hadPreIncrement", "hadPostIncrement"}, - tableName, - new Boolean[] {false, false} - ); - - table.increment(inc); - - verifyMethodResult(SimpleRegionObserver.class, - new String[] {"hadPreIncrement", "hadPostIncrement"}, - tableName, - new Boolean[] {true, true} - ); - util.deleteTable(tableName); - table.close(); + try { + Increment inc = new Increment(Bytes.toBytes(0)); + inc.addColumn(A, A, 1); + + 
verifyMethodResult(SimpleRegionObserver.class, + new String[] {"hadPreIncrement", "hadPostIncrement"}, + tableName, + new Boolean[] {false, false} + ); + + table.increment(inc); + + verifyMethodResult(SimpleRegionObserver.class, + new String[] {"hadPreIncrement", "hadPostIncrement"}, + tableName, + new Boolean[] {true, true} + ); + } finally { + util.deleteTable(tableName); + table.close(); + } } @Test @@ -464,84 +469,82 @@ public class TestRegionObserverInterface { @Test public void bulkLoadHFileTest() throws Exception { String testName = TestRegionObserverInterface.class.getName()+".bulkLoadHFileTest"; - TableName tableName = TEST_TABLE; + TableName tableName = TableName.valueOf(TEST_TABLE.getNameAsString() + ".bulkLoadHFileTest"); Configuration conf = util.getConfiguration(); HTable table = util.createTable(tableName, new byte[][] {A, B, C}); - - verifyMethodResult(SimpleRegionObserver.class, - new String[] {"hadPreBulkLoadHFile", "hadPostBulkLoadHFile"}, - tableName, - new Boolean[] {false, false} - ); - - FileSystem fs = util.getTestFileSystem(); - final Path dir = util.getDataTestDirOnTestFS(testName).makeQualified(fs); - Path familyDir = new Path(dir, Bytes.toString(A)); - - createHFile(util.getConfiguration(), fs, new Path(familyDir,Bytes.toString(A)), A, A); - - //Bulk load - new LoadIncrementalHFiles(conf).doBulkLoad(dir, new HTable(conf, tableName)); - - verifyMethodResult(SimpleRegionObserver.class, - new String[] {"hadPreBulkLoadHFile", "hadPostBulkLoadHFile"}, - tableName, - new Boolean[] {true, true} - ); - util.deleteTable(tableName); - table.close(); + try { + verifyMethodResult(SimpleRegionObserver.class, + new String[] {"hadPreBulkLoadHFile", "hadPostBulkLoadHFile"}, + tableName, + new Boolean[] {false, false} + ); + + FileSystem fs = util.getTestFileSystem(); + final Path dir = util.getDataTestDirOnTestFS(testName).makeQualified(fs); + Path familyDir = new Path(dir, Bytes.toString(A)); + + createHFile(util.getConfiguration(), fs, new 
Path(familyDir,Bytes.toString(A)), A, A); + + //Bulk load + new LoadIncrementalHFiles(conf).doBulkLoad(dir, new HTable(conf, tableName)); + + verifyMethodResult(SimpleRegionObserver.class, + new String[] {"hadPreBulkLoadHFile", "hadPostBulkLoadHFile"}, + tableName, + new Boolean[] {true, true} + ); + } finally { + util.deleteTable(tableName); + table.close(); + } } @Test public void testRecovery() throws Exception { - LOG.info(TestRegionObserverInterface.class.getName()+".testRecovery"); - TableName tableName = TEST_TABLE; - + LOG.info(TestRegionObserverInterface.class.getName() +".testRecovery"); + TableName tableName = TableName.valueOf(TEST_TABLE.getNameAsString() + ".testRecovery"); HTable table = util.createTable(tableName, new byte[][] {A, B, C}); + try { + JVMClusterUtil.RegionServerThread rs1 = cluster.startRegionServer(); + ServerName sn2 = rs1.getRegionServer().getServerName(); + String regEN = table.getRegionLocations().firstEntry().getKey().getEncodedName(); - JVMClusterUtil.RegionServerThread rs1 = cluster.startRegionServer(); - ServerName sn2 = rs1.getRegionServer().getServerName(); - String regEN = table.getRegionLocations().firstEntry().getKey().getEncodedName(); - - util.getHBaseAdmin().move(regEN.getBytes(), sn2.getServerName().getBytes()); - while (!sn2.equals(table.getRegionLocations().firstEntry().getValue() )){ - Thread.sleep(100); - } + util.getHBaseAdmin().move(regEN.getBytes(), sn2.getServerName().getBytes()); + while (!sn2.equals(table.getRegionLocations().firstEntry().getValue() )){ + Thread.sleep(100); + } - Put put = new Put(ROW); - put.add(A, A, A); - put.add(B, B, B); - put.add(C, C, C); - table.put(put); + Put put = new Put(ROW); + put.add(A, A, A); + put.add(B, B, B); + put.add(C, C, C); + table.put(put); - verifyMethodResult(SimpleRegionObserver.class, - new String[] {"hadPreGet", "hadPostGet", "hadPrePut", "hadPostPut", - "hadPreBatchMutate", "hadPostBatchMutate", "hadDelete"}, - TEST_TABLE, + 
verifyMethodResult(SimpleRegionObserver.class, + new String[] {"hadPreGet", "hadPostGet", "hadPrePut", "hadPostPut", + "hadPreBatchMutate", "hadPostBatchMutate", "hadDelete"}, + tableName, new Boolean[] {false, false, true, true, true, true, false} - ); - - verifyMethodResult(SimpleRegionObserver.class, - new String[] {"getCtPreWALRestore", "getCtPostWALRestore", "getCtPrePut", "getCtPostPut"}, - TEST_TABLE, - new Integer[] {0, 0, 1, 1}); - - cluster.killRegionServer(rs1.getRegionServer().getServerName()); - Threads.sleep(20000); // just to be sure that the kill has fully started. - util.waitUntilAllRegionsAssigned(tableName); - - verifyMethodResult(SimpleRegionObserver.class, - new String[]{"getCtPreWALRestore", "getCtPostWALRestore"}, - TEST_TABLE, - new Integer[]{1, 1}); - - verifyMethodResult(SimpleRegionObserver.class, - new String[]{"getCtPrePut", "getCtPostPut"}, - TEST_TABLE, - new Integer[]{0, 0}); - - util.deleteTable(tableName); - table.close(); + ); + + verifyMethodResult(SimpleRegionObserver.class, + new String[] {"getCtPreWALRestore", "getCtPostWALRestore", "getCtPrePut", "getCtPostPut"}, + tableName, + new Integer[] {0, 0, 1, 1}); + + cluster.killRegionServer(rs1.getRegionServer().getServerName()); + Threads.sleep(1000); // Let the kill soak in. 
+ util.waitUntilAllRegionsAssigned(tableName); + LOG.info("All regions assigned"); + + verifyMethodResult(SimpleRegionObserver.class, + new String[]{"getCtPrePut", "getCtPostPut"}, + tableName, + new Integer[]{0, 0}); + } finally { + util.deleteTable(tableName); + table.close(); + } } @Test diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestBufferChain.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestBufferChain.java new file mode 100644 index 0000000..465bc04 --- /dev/null +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestBufferChain.java @@ -0,0 +1,157 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.hadoop.hbase.ipc; + +import static org.junit.Assert.*; + +import java.io.File; +import java.io.FileOutputStream; +import java.io.IOException; +import java.nio.ByteBuffer; +import java.nio.channels.FileChannel; + +import org.apache.hadoop.hbase.SmallTests; +import org.apache.hadoop.hbase.util.Bytes; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; +import org.junit.experimental.categories.Category; +import org.mockito.Mockito; + +import com.google.common.base.Charsets; +import com.google.common.io.Files; + +@Category(SmallTests.class) +public class TestBufferChain { + private File tmpFile; + + private static final byte[][] HELLO_WORLD_CHUNKS = new byte[][] { + "hello".getBytes(Charsets.UTF_8), + " ".getBytes(Charsets.UTF_8), + "world".getBytes(Charsets.UTF_8) + }; + + @Before + public void setup() throws IOException { + tmpFile = File.createTempFile("TestBufferChain", "txt"); + } + + @After + public void teardown() { + tmpFile.delete(); + } + + @Test + public void testGetBackBytesWePutIn() { + ByteBuffer[] bufs = wrapArrays(HELLO_WORLD_CHUNKS); + BufferChain chain = new BufferChain(bufs); + assertTrue(Bytes.equals(Bytes.toBytes("hello world"), chain.getBytes())); + } + + @Test + public void testChainChunkBiggerThanWholeArray() throws IOException { + ByteBuffer[] bufs = wrapArrays(HELLO_WORLD_CHUNKS); + BufferChain chain = new BufferChain(bufs); + writeAndVerify(chain, "hello world", 8192); + assertNoRemaining(bufs); + } + + @Test + public void testChainChunkBiggerThanSomeArrays() throws IOException { + ByteBuffer[] bufs = wrapArrays(HELLO_WORLD_CHUNKS); + BufferChain chain = new BufferChain(bufs); + writeAndVerify(chain, "hello world", 3); + assertNoRemaining(bufs); + } + + @Test + public void testLimitOffset() throws IOException { + ByteBuffer[] bufs = new ByteBuffer[] { + stringBuf("XXXhelloYYY", 3, 5), + stringBuf(" ", 0, 1), + stringBuf("XXXXworldY", 4, 5) }; + BufferChain chain = new BufferChain(bufs); + 
writeAndVerify(chain , "hello world", 3); + assertNoRemaining(bufs); + } + + @Test + public void testWithSpy() throws IOException { + ByteBuffer[] bufs = new ByteBuffer[] { + stringBuf("XXXhelloYYY", 3, 5), + stringBuf(" ", 0, 1), + stringBuf("XXXXworldY", 4, 5) }; + BufferChain chain = new BufferChain(bufs); + FileOutputStream fos = new FileOutputStream(tmpFile); + FileChannel ch = Mockito.spy(fos.getChannel()); + try { + chain.write(ch, 2); + assertEquals("he", Files.toString(tmpFile, Charsets.UTF_8)); + chain.write(ch, 2); + assertEquals("hell", Files.toString(tmpFile, Charsets.UTF_8)); + chain.write(ch, 3); + assertEquals("hello w", Files.toString(tmpFile, Charsets.UTF_8)); + chain.write(ch, 8); + assertEquals("hello world", Files.toString(tmpFile, Charsets.UTF_8)); + } finally { + ch.close(); + fos.close(); + } + } + + private ByteBuffer stringBuf(String string, int position, int length) { + ByteBuffer buf = ByteBuffer.wrap(string.getBytes(Charsets.UTF_8)); + buf.position(position); + buf.limit(position + length); + assertTrue(buf.hasRemaining()); + return buf; + } + + private void assertNoRemaining(ByteBuffer[] bufs) { + for (ByteBuffer buf : bufs) { + assertFalse(buf.hasRemaining()); + } + } + + private ByteBuffer[] wrapArrays(byte[][] arrays) { + ByteBuffer[] ret = new ByteBuffer[arrays.length]; + for (int i = 0; i < arrays.length; i++) { + ret[i] = ByteBuffer.wrap(arrays[i]); + } + return ret; + } + + private void writeAndVerify(BufferChain chain, String string, int chunkSize) + throws IOException { + FileOutputStream fos = new FileOutputStream(tmpFile); + FileChannel ch = fos.getChannel(); + try { + long remaining = string.length(); + while (chain.hasRemaining()) { + long n = chain.write(ch, chunkSize); + assertTrue(n == chunkSize || n == remaining); + remaining -= n; + } + assertEquals(0, remaining); + } finally { + fos.close(); + } + assertFalse(chain.hasRemaining()); + assertEquals(string, Files.toString(tmpFile, Charsets.UTF_8)); + } +} \ No newline 
at end of file diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestIPC.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestIPC.java index c1cbd7b..128a91a 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestIPC.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestIPC.java @@ -45,6 +45,7 @@ import org.apache.hadoop.hbase.CellScanner; import org.apache.hadoop.hbase.CellUtil; import org.apache.hadoop.hbase.HBaseConfiguration; import org.apache.hadoop.hbase.HConstants; +import org.apache.hadoop.hbase.HRegionInfo; import org.apache.hadoop.hbase.KeyValue; import org.apache.hadoop.hbase.KeyValueUtil; import org.apache.hadoop.hbase.SmallTests; @@ -61,6 +62,8 @@ import org.apache.hadoop.hbase.protobuf.RequestConverter; import org.apache.hadoop.hbase.protobuf.generated.ClientProtos; import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.MutationProto; import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.RegionAction; +import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.RegionSpecifier; +import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.RegionSpecifier.RegionSpecifierType; import org.apache.hadoop.hbase.security.User; import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.Pair; @@ -76,6 +79,7 @@ import org.mockito.stubbing.Answer; import com.google.common.collect.ImmutableList; import com.google.common.collect.Lists; import com.google.protobuf.BlockingService; +import com.google.protobuf.ByteString; import com.google.protobuf.Descriptors.MethodDescriptor; import com.google.protobuf.Message; import com.google.protobuf.RpcController; @@ -89,6 +93,8 @@ public class TestIPC { public static final Log LOG = LogFactory.getLog(TestIPC.class); static byte [] CELL_BYTES = Bytes.toBytes("xyz"); static Cell CELL = new KeyValue(CELL_BYTES, CELL_BYTES, CELL_BYTES, CELL_BYTES); + static byte [] BIG_CELL_BYTES = new byte [10 * 1024]; + static Cell BIG_CELL = 
new KeyValue(CELL_BYTES, CELL_BYTES, CELL_BYTES, BIG_CELL_BYTES); private final static Configuration CONF = HBaseConfiguration.create(); // We are using the test TestRpcServiceProtos generated classes and Service because they are // available and basic with methods like 'echo', and ping. Below we make a blocking service @@ -303,8 +309,10 @@ public class TestIPC { int cellcount = Integer.parseInt(args[1]); Configuration conf = HBaseConfiguration.create(); TestRpcServer rpcServer = new TestRpcServer(); + MethodDescriptor md = SERVICE.getDescriptorForType().findMethodByName("echo"); + EchoRequestProto param = EchoRequestProto.newBuilder().setMessage("hello").build(); RpcClient client = new RpcClient(conf, HConstants.CLUSTER_ID_DEFAULT); - KeyValue kv = KeyValueUtil.ensureKeyValue(CELL); + KeyValue kv = KeyValueUtil.ensureKeyValue(BIG_CELL); Put p = new Put(kv.getRow()); for (int i = 0; i < cellcount; i++) { p.add(kv); @@ -324,15 +332,17 @@ public class TestIPC { RegionAction.newBuilder(), ClientProtos.Action.newBuilder(), MutationProto.newBuilder()); - CellScanner cellScanner = CellUtil.createCellScanner(cells); - if (i % 1000 == 0) { + builder.setRegion(RegionSpecifier.newBuilder().setType(RegionSpecifierType.REGION_NAME). + setValue(ByteString.copyFrom(HRegionInfo.FIRST_META_REGIONINFO.getEncodedNameAsBytes()))); + if (i % 100000 == 0) { LOG.info("" + i); // Uncomment this for a thread dump every so often. // ReflectionUtils.printThreadInfo(new PrintWriter(System.out), // "Thread dump " + Thread.currentThread().getName()); } + CellScanner cellScanner = CellUtil.createCellScanner(cells); Pair response = - client.call(null, builder.build(), cellScanner, null, user, address, 0); + client.call(md, builder.build(), cellScanner, param, user, address, 0); /* int count = 0; while (p.getSecond().advance()) {