From 130377c1c2e83b8d1cf286368095729d66195867 Mon Sep 17 00:00:00 2001 From: zhangduo Date: Mon, 8 May 2017 14:26:10 +0800 Subject: [PATCH] HBASE-18009 Move RpcServer.Call to a separated file --- .../org/apache/hadoop/hbase/ipc/CallRunner.java | 4 +- .../apache/hadoop/hbase/ipc/NettyRpcServer.java | 91 +--- .../apache/hadoop/hbase/ipc/NettyServerCall.java | 67 +++ .../java/org/apache/hadoop/hbase/ipc/RpcCall.java | 5 - .../org/apache/hadoop/hbase/ipc/RpcServer.java | 498 +------------------ .../org/apache/hadoop/hbase/ipc/ServerCall.java | 527 +++++++++++++++++++++ .../apache/hadoop/hbase/ipc/SimpleRpcServer.java | 122 ++--- .../apache/hadoop/hbase/ipc/SimpleServerCall.java | 79 +++ .../apache/hadoop/hbase/ipc/TestCallRunner.java | 4 +- .../hadoop/hbase/ipc/TestSimpleRpcScheduler.java | 60 ++- .../hadoop/hbase/security/TestSecureIPC.java | 8 +- 11 files changed, 775 insertions(+), 690 deletions(-) create mode 100644 hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/NettyServerCall.java create mode 100644 hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/ServerCall.java create mode 100644 hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/SimpleServerCall.java diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/CallRunner.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/CallRunner.java index 0aabc10..f16fc50 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/CallRunner.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/CallRunner.java @@ -75,8 +75,8 @@ public class CallRunner { * @deprecated As of release 2.0, this will be removed in HBase 3.0 */ @Deprecated - public RpcServer.Call getCall() { - return (RpcServer.Call) call; + public ServerCall getCall() { + return (ServerCall) call; } public void setStatus(MonitoredRPCHandler status) { diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/NettyRpcServer.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/NettyRpcServer.java index be55378..c18b894 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/NettyRpcServer.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/NettyRpcServer.java @@ -59,7 +59,6 @@ import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.CellScanner; import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.Server; -import org.apache.hadoop.hbase.classification.InterfaceStability; import org.apache.hadoop.hbase.monitoring.MonitoredRPCHandler; import org.apache.hadoop.hbase.nio.ByteBuff; import org.apache.hadoop.hbase.nio.SingleByteBuff; @@ -203,13 +202,14 @@ public class NettyRpcServer extends RpcServer { this.hostAddress = inetSocketAddress.getAddress().getHostAddress(); } this.remotePort = inetSocketAddress.getPort(); - this.saslCall = new Call(SASL_CALLID, null, null, null, null, null, this, - 0, null, null, 0, null); - this.setConnectionHeaderResponseCall = new Call( - CONNECTION_HEADER_RESPONSE_CALLID, null, null, null, null, null, - this, 0, null, null, 0, null); - this.authFailedCall = new Call(AUTHORIZATION_FAILED_CALLID, null, null, - null, null, null, this, 0, null, null, 0, null); + this.saslCall = new NettyServerCall(SASL_CALLID, null, null, null, null, null, this, 0, null, + null, System.currentTimeMillis(), 0, reservoir, cellBlockBuilder, null); + this.setConnectionHeaderResponseCall = + new NettyServerCall(CONNECTION_HEADER_RESPONSE_CALLID, null, null, null, null, null, this, + 0, null, null, System.currentTimeMillis(), 0, reservoir, cellBlockBuilder, null); + this.authFailedCall = + new NettyServerCall(AUTHORIZATION_FAILED_CALLID, null, null, null, null, null, this, 0, + null, null, System.currentTimeMillis(), 0, reservoir, cellBlockBuilder, null); } void readPreamble(ByteBuf buffer) throws IOException { @@ -243,7 +243,7 @@ public class NettyRpcServer extends RpcServer { AccessDeniedException ae = new AccessDeniedException( "Authentication is required"); setupResponse(authFailedResponse, authFailedCall, ae, ae.getMessage()); - ((Call) authFailedCall) + ((NettyServerCall) authFailedCall) .sendResponseIfReady(ChannelFutureListener.CLOSE); return; } @@ -269,8 +269,8 @@ public class NettyRpcServer extends RpcServer { private void doBadPreambleHandling(final String msg, final Exception e) throws IOException { LOG.warn(msg); - Call fakeCall = new Call(-1, null, null, null, null, null, this, -1, - null, null, 0, null); + NettyServerCall fakeCall = new NettyServerCall(-1, null, null, null, null, null, this, -1, + null, null, System.currentTimeMillis(), 0, reservoir, cellBlockBuilder, null); setupResponse(null, fakeCall, e, msg); // closes out the connection. fakeCall.sendResponseIfReady(ChannelFutureListener.CLOSE); @@ -336,59 +336,17 @@ public class NettyRpcServer extends RpcServer { } @Override - public RpcServer.Call createCall(int id, final BlockingService service, + public ServerCall createCall(int id, final BlockingService service, final MethodDescriptor md, RequestHeader header, Message param, CellScanner cellScanner, RpcServer.Connection connection, long size, TraceInfo tinfo, final InetAddress remoteAddress, int timeout, CallCleanup reqCleanup) { - return new Call(id, service, md, header, param, cellScanner, connection, - size, tinfo, remoteAddress, timeout, reqCleanup); + return new NettyServerCall(id, service, md, header, param, cellScanner, connection, size, + tinfo, remoteAddress, System.currentTimeMillis(), timeout, reservoir, cellBlockBuilder, + reqCleanup); } } - /** - * Datastructure that holds all necessary to a method invocation and then afterward, carries the - * result. - */ - @InterfaceStability.Evolving - public class Call extends RpcServer.Call { - - Call(int id, final BlockingService service, final MethodDescriptor md, - RequestHeader header, Message param, CellScanner cellScanner, - RpcServer.Connection connection, long size, TraceInfo tinfo, - final InetAddress remoteAddress, int timeout, CallCleanup reqCleanup) { - super(id, service, md, header, param, cellScanner, - connection, size, tinfo, remoteAddress, timeout, reqCleanup); - } - - @Override - public long disconnectSince() { - if (!getConnection().isConnectionOpen()) { - return System.currentTimeMillis() - timestamp; - } else { - return -1L; - } - } - - NettyConnection getConnection() { - return (NettyConnection) this.connection; - } - - /** - * If we have a response, and delay is not set, then respond immediately. Otherwise, do not - * respond to client. This is called by the RPC code in the context of the Handler thread. - */ - @Override - public synchronized void sendResponseIfReady() throws IOException { - getConnection().channel.writeAndFlush(this); - } - - public synchronized void sendResponseIfReady(ChannelFutureListener listener) throws IOException { - getConnection().channel.writeAndFlush(this).addListener(listener); - } - - } - private class Initializer extends ChannelInitializer { final int maxRequestSize; @@ -483,7 +441,7 @@ public class NettyRpcServer extends RpcServer { @Override public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) { - final Call call = (Call) msg; + final NettyServerCall call = (NettyServerCall) msg; ByteBuf response = Unpooled.wrappedBuffer(call.response.getBuffers()); ctx.write(response, promise).addListener(new CallWriteListener(call)); } @@ -492,9 +450,9 @@ public class NettyRpcServer extends RpcServer { private class CallWriteListener implements ChannelFutureListener { - private Call call; + private NettyServerCall call; - CallWriteListener(Call call) { + CallWriteListener(NettyServerCall call) { this.call = call; } @@ -527,14 +485,11 @@ public class NettyRpcServer extends RpcServer { } @Override - public Pair call(BlockingService service, - MethodDescriptor md, Message param, CellScanner cellScanner, - long receiveTime, MonitoredRPCHandler status, long startTime, int timeout) - throws IOException { - Call fakeCall = new Call(-1, service, md, null, param, cellScanner, null, - -1, null, null, timeout, null); - fakeCall.setReceiveTime(receiveTime); + public Pair call(BlockingService service, MethodDescriptor md, + Message param, CellScanner cellScanner, long receiveTime, MonitoredRPCHandler status, + long startTime, int timeout) throws IOException { + NettyServerCall fakeCall = new NettyServerCall(-1, service, md, null, param, cellScanner, null, + -1, null, null, receiveTime, timeout, reservoir, cellBlockBuilder, null); return call(fakeCall, status); } - } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/NettyServerCall.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/NettyServerCall.java new file mode 100644 index 0000000..a3f23dd --- /dev/null +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/NettyServerCall.java @@ -0,0 +1,67 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.ipc; + +import io.netty.channel.ChannelFutureListener; + +import java.io.IOException; +import java.net.InetAddress; + +import org.apache.hadoop.hbase.CellScanner; +import org.apache.hadoop.hbase.classification.InterfaceAudience; +import org.apache.hadoop.hbase.io.ByteBufferPool; +import org.apache.hadoop.hbase.ipc.NettyRpcServer.NettyConnection; +import org.apache.hadoop.hbase.ipc.RpcServer.CallCleanup; +import org.apache.hadoop.hbase.shaded.com.google.protobuf.BlockingService; +import org.apache.hadoop.hbase.shaded.com.google.protobuf.Descriptors.MethodDescriptor; +import org.apache.hadoop.hbase.shaded.com.google.protobuf.Message; +import org.apache.hadoop.hbase.shaded.protobuf.generated.RPCProtos.RequestHeader; +import org.apache.htrace.TraceInfo; + +/** + * Datastructure that holds all necessary to a method invocation and then afterward, carries the + * result. + */ +@InterfaceAudience.Private +class NettyServerCall extends ServerCall { + + NettyServerCall(int id, BlockingService service, MethodDescriptor md, RequestHeader header, + Message param, CellScanner cellScanner, RpcServer.Connection connection, long size, + TraceInfo tinfo, InetAddress remoteAddress, long receiveTime, int timeout, + ByteBufferPool reservoir, CellBlockBuilder cellBlockBuilder, CallCleanup reqCleanup) { + super(id, service, md, header, param, cellScanner, connection, size, tinfo, remoteAddress, + receiveTime, timeout, reservoir, cellBlockBuilder, reqCleanup); + } + + NettyConnection getConnection() { + return (NettyConnection) this.connection; + } + + /** + * If we have a response, and delay is not set, then respond immediately. Otherwise, do not + * respond to client. This is called by the RPC code in the context of the Handler thread. + */ + @Override + public synchronized void sendResponseIfReady() throws IOException { + getConnection().channel.writeAndFlush(this); + } + + public synchronized void sendResponseIfReady(ChannelFutureListener listener) throws IOException { + getConnection().channel.writeAndFlush(this).addListener(listener); + } +} \ No newline at end of file diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcCall.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcCall.java index 239ea9e..fc86594 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcCall.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcCall.java @@ -65,11 +65,6 @@ public interface RpcCall extends RpcCallContext { long getReceiveTime(); /** - * Set the timestamp when the call is constructed. - */ - void setReceiveTime(long receiveTime); - - /** * @return The time when the call starts to be executed. */ long getStartTime(); diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcServer.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcServer.java index ebae1fb..bbc329c 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcServer.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcServer.java @@ -62,9 +62,7 @@ import org.apache.hadoop.hbase.classification.InterfaceStability; import org.apache.hadoop.hbase.client.VersionInfoUtil; import org.apache.hadoop.hbase.codec.Codec; import org.apache.hadoop.hbase.conf.ConfigurationObserver; -import org.apache.hadoop.hbase.exceptions.RegionMovedException; import org.apache.hadoop.hbase.exceptions.RequestTooBigException; -import org.apache.hadoop.hbase.io.ByteBufferListOutputStream; import org.apache.hadoop.hbase.io.ByteBufferOutputStream; import org.apache.hadoop.hbase.io.ByteBufferPool; import org.apache.hadoop.hbase.io.crypto.aes.CryptoAES; @@ -88,7 +86,6 @@ import org.apache.hadoop.hbase.shaded.com.google.protobuf.BlockingService; import org.apache.hadoop.hbase.shaded.com.google.protobuf.ByteInput; import org.apache.hadoop.hbase.shaded.com.google.protobuf.ByteString; import org.apache.hadoop.hbase.shaded.com.google.protobuf.CodedInputStream; -import org.apache.hadoop.hbase.shaded.com.google.protobuf.CodedOutputStream; import org.apache.hadoop.hbase.shaded.com.google.protobuf.Descriptors.MethodDescriptor; import org.apache.hadoop.hbase.shaded.com.google.protobuf.Message; import org.apache.hadoop.hbase.shaded.com.google.protobuf.ServiceException; @@ -98,13 +95,9 @@ import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil; import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos; import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.VersionInfo; import org.apache.hadoop.hbase.shaded.protobuf.generated.RPCProtos; -import org.apache.hadoop.hbase.shaded.protobuf.generated.RPCProtos.CellBlockMeta; import org.apache.hadoop.hbase.shaded.protobuf.generated.RPCProtos.ConnectionHeader; -import org.apache.hadoop.hbase.shaded.protobuf.generated.RPCProtos.ExceptionResponse; import org.apache.hadoop.hbase.shaded.protobuf.generated.RPCProtos.RequestHeader; -import org.apache.hadoop.hbase.shaded.protobuf.generated.RPCProtos.ResponseHeader; import org.apache.hadoop.hbase.shaded.protobuf.generated.RPCProtos.UserInformation; -import org.apache.hadoop.hbase.util.ByteBufferUtils; import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.Pair; import org.apache.hadoop.io.BytesWritable; @@ -120,7 +113,6 @@ import org.apache.hadoop.security.authorize.ServiceAuthorizationManager; import org.apache.hadoop.security.token.SecretManager; import org.apache.hadoop.security.token.SecretManager.InvalidToken; import org.apache.hadoop.security.token.TokenIdentifier; -import org.apache.hadoop.util.StringUtils; import org.apache.htrace.TraceInfo; import org.codehaus.jackson.map.ObjectMapper; @@ -265,475 +257,6 @@ public abstract class RpcServer implements RpcServerInterface, */ private RSRpcServices rsRpcServices; - /** - * Datastructure that holds all necessary to a method invocation and then afterward, carries - * the result. - */ - @InterfaceStability.Evolving - @InterfaceAudience.Private - public abstract class Call implements RpcCall { - protected int id; // the client's call id - protected BlockingService service; - protected MethodDescriptor md; - protected RequestHeader header; - protected Message param; // the parameter passed - // Optional cell data passed outside of protobufs. - protected CellScanner cellScanner; - protected Connection connection; // connection to client - protected long timestamp; // the time received when response is null - // the time served when response is not null - protected int timeout; - protected long startTime; - protected long deadline;// the deadline to handle this call, if exceed we can drop it. - - /** - * Chain of buffers to send as response. - */ - protected BufferChain response; - - protected long size; // size of current call - protected boolean isError; - protected TraceInfo tinfo; - protected ByteBufferListOutputStream cellBlockStream = null; - protected CallCleanup reqCleanup = null; - - protected User user; - protected InetAddress remoteAddress; - protected RpcCallback rpcCallback; - - private long responseCellSize = 0; - private long responseBlockSize = 0; - // cumulative size of serialized exceptions - private long exceptionSize = 0; - private boolean retryImmediatelySupported; - - @edu.umd.cs.findbugs.annotations.SuppressWarnings(value="NP_NULL_ON_SOME_PATH", - justification="Can't figure why this complaint is happening... see below") - Call(int id, final BlockingService service, final MethodDescriptor md, - RequestHeader header, Message param, CellScanner cellScanner, - Connection connection, long size, TraceInfo tinfo, - final InetAddress remoteAddress, int timeout, CallCleanup reqCleanup) { - this.id = id; - this.service = service; - this.md = md; - this.header = header; - this.param = param; - this.cellScanner = cellScanner; - this.connection = connection; - this.timestamp = System.currentTimeMillis(); - this.response = null; - this.isError = false; - this.size = size; - this.tinfo = tinfo; - this.user = connection == null? null: connection.user; // FindBugs: NP_NULL_ON_SOME_PATH - this.remoteAddress = remoteAddress; - this.retryImmediatelySupported = - connection == null? null: connection.retryImmediatelySupported; - this.timeout = timeout; - this.deadline = this.timeout > 0 ? this.timestamp + this.timeout : Long.MAX_VALUE; - this.reqCleanup = reqCleanup; - } - - /** - * Call is done. Execution happened and we returned results to client. It is - * now safe to cleanup. - */ - @edu.umd.cs.findbugs.annotations.SuppressWarnings(value = "IS2_INCONSISTENT_SYNC", - justification = "Presume the lock on processing request held by caller is protection enough") - void done() { - if (this.cellBlockStream != null) { - // This will return back the BBs which we got from pool. - this.cellBlockStream.releaseResources(); - this.cellBlockStream = null; - } - // If the call was run successfuly, we might have already returned the BB - // back to pool. No worries..Then inputCellBlock will be null - cleanup(); - } - - @Override - public void cleanup() { - if (this.reqCleanup != null) { - this.reqCleanup.run(); - this.reqCleanup = null; - } - } - - @Override - public String toString() { - return toShortString() + " param: " + - (this.param != null? ProtobufUtil.getShortTextFormat(this.param): "") + - " connection: " + connection.toString(); - } - - @Override - public RequestHeader getHeader() { - return this.header; - } - - @Override - public int getPriority() { - return this.header.getPriority(); - } - - /* - * Short string representation without param info because param itself could be huge depends on - * the payload of a command - */ - @Override - public String toShortString() { - String serviceName = this.connection.service != null ? - this.connection.service.getDescriptorForType().getName() : "null"; - return "callId: " + this.id + " service: " + serviceName + - " methodName: " + ((this.md != null) ? this.md.getName() : "n/a") + - " size: " + StringUtils.TraditionalBinaryPrefix.long2String(this.size, "", 1) + - " connection: " + connection.toString() + - " deadline: " + deadline; - } - - protected synchronized void setSaslTokenResponse(ByteBuffer response) { - ByteBuffer[] responseBufs = new ByteBuffer[1]; - responseBufs[0] = response; - this.response = new BufferChain(responseBufs); - } - - protected synchronized void setConnectionHeaderResponse(ByteBuffer response) { - ByteBuffer[] responseBufs = new ByteBuffer[1]; - responseBufs[0] = response; - this.response = new BufferChain(responseBufs); - } - - @Override - public synchronized void setResponse(Message m, final CellScanner cells, - Throwable t, String errorMsg) { - if (this.isError) return; - if (t != null) this.isError = true; - BufferChain bc = null; - try { - ResponseHeader.Builder headerBuilder = ResponseHeader.newBuilder(); - // Call id. - headerBuilder.setCallId(this.id); - if (t != null) { - setExceptionResponse(t, errorMsg, headerBuilder); - } - // Pass reservoir to buildCellBlock. Keep reference to returne so can add it back to the - // reservoir when finished. This is hacky and the hack is not contained but benefits are - // high when we can avoid a big buffer allocation on each rpc. - List cellBlock = null; - int cellBlockSize = 0; - if (reservoir != null) { - this.cellBlockStream = cellBlockBuilder.buildCellBlockStream(this.connection.codec, - this.connection.compressionCodec, cells, reservoir); - if (this.cellBlockStream != null) { - cellBlock = this.cellBlockStream.getByteBuffers(); - cellBlockSize = this.cellBlockStream.size(); - } - } else { - ByteBuffer b = cellBlockBuilder.buildCellBlock(this.connection.codec, - this.connection.compressionCodec, cells); - if (b != null) { - cellBlockSize = b.remaining(); - cellBlock = new ArrayList<>(1); - cellBlock.add(b); - } - } - - if (cellBlockSize > 0) { - CellBlockMeta.Builder cellBlockBuilder = CellBlockMeta.newBuilder(); - // Presumes the cellBlock bytebuffer has been flipped so limit has total size in it. - cellBlockBuilder.setLength(cellBlockSize); - headerBuilder.setCellBlockMeta(cellBlockBuilder.build()); - } - Message header = headerBuilder.build(); - ByteBuffer headerBuf = - createHeaderAndMessageBytes(m, header, cellBlockSize, cellBlock); - ByteBuffer[] responseBufs = null; - int cellBlockBufferSize = 0; - if (cellBlock != null) { - cellBlockBufferSize = cellBlock.size(); - responseBufs = new ByteBuffer[1 + cellBlockBufferSize]; - } else { - responseBufs = new ByteBuffer[1]; - } - responseBufs[0] = headerBuf; - if (cellBlock != null) { - for (int i = 0; i < cellBlockBufferSize; i++) { - responseBufs[i + 1] = cellBlock.get(i); - } - } - bc = new BufferChain(responseBufs); - if (connection.useWrap) { - bc = wrapWithSasl(bc); - } - } catch (IOException e) { - LOG.warn("Exception while creating response " + e); - } - this.response = bc; - // Once a response message is created and set to this.response, this Call can be treated as - // done. The Responder thread will do the n/w write of this message back to client. - if (this.rpcCallback != null) { - try { - this.rpcCallback.run(); - } catch (Exception e) { - // Don't allow any exception here to kill this handler thread. - LOG.warn("Exception while running the Rpc Callback.", e); - } - } - } - - protected void setExceptionResponse(Throwable t, String errorMsg, - ResponseHeader.Builder headerBuilder) { - ExceptionResponse.Builder exceptionBuilder = ExceptionResponse.newBuilder(); - exceptionBuilder.setExceptionClassName(t.getClass().getName()); - exceptionBuilder.setStackTrace(errorMsg); - exceptionBuilder.setDoNotRetry(t instanceof DoNotRetryIOException); - if (t instanceof RegionMovedException) { - // Special casing for this exception. This is only one carrying a payload. - // Do this instead of build a generic system for allowing exceptions carry - // any kind of payload. - RegionMovedException rme = (RegionMovedException)t; - exceptionBuilder.setHostname(rme.getHostname()); - exceptionBuilder.setPort(rme.getPort()); - } - // Set the exception as the result of the method invocation. - headerBuilder.setException(exceptionBuilder.build()); - } - - protected ByteBuffer createHeaderAndMessageBytes(Message result, Message header, - int cellBlockSize, List cellBlock) throws IOException { - // Organize the response as a set of bytebuffers rather than collect it all together inside - // one big byte array; save on allocations. - // for writing the header, we check if there is available space in the buffers - // created for the cellblock itself. If there is space for the header, we reuse - // the last buffer in the cellblock. This applies to the cellblock created from the - // pool or even the onheap cellblock buffer in case there is no pool enabled. - // Possible reuse would avoid creating a temporary array for storing the header every time. - ByteBuffer possiblePBBuf = - (cellBlockSize > 0) ? cellBlock.get(cellBlock.size() - 1) : null; - int headerSerializedSize = 0, resultSerializedSize = 0, headerVintSize = 0, - resultVintSize = 0; - if (header != null) { - headerSerializedSize = header.getSerializedSize(); - headerVintSize = CodedOutputStream.computeRawVarint32Size(headerSerializedSize); - } - if (result != null) { - resultSerializedSize = result.getSerializedSize(); - resultVintSize = CodedOutputStream.computeRawVarint32Size(resultSerializedSize); - } - // calculate the total size - int totalSize = headerSerializedSize + headerVintSize - + (resultSerializedSize + resultVintSize) - + cellBlockSize; - int totalPBSize = headerSerializedSize + headerVintSize + resultSerializedSize - + resultVintSize + Bytes.SIZEOF_INT; - // Only if the last buffer has enough space for header use it. Else allocate - // a new buffer. Assume they are all flipped - if (possiblePBBuf != null - && possiblePBBuf.limit() + totalPBSize <= possiblePBBuf.capacity()) { - // duplicate the buffer. This is where the header is going to be written - ByteBuffer pbBuf = possiblePBBuf.duplicate(); - // get the current limit - int limit = pbBuf.limit(); - // Position such that we write the header to the end of the buffer - pbBuf.position(limit); - // limit to the header size - pbBuf.limit(totalPBSize + limit); - // mark the current position - pbBuf.mark(); - writeToCOS(result, header, totalSize, pbBuf); - // reset the buffer back to old position - pbBuf.reset(); - return pbBuf; - } else { - return createHeaderAndMessageBytes(result, header, totalSize, totalPBSize); - } - } - - private void writeToCOS(Message result, Message header, int totalSize, ByteBuffer pbBuf) - throws IOException { - ByteBufferUtils.putInt(pbBuf, totalSize); - // create COS that works on BB - CodedOutputStream cos = CodedOutputStream.newInstance(pbBuf); - if (header != null) { - cos.writeMessageNoTag(header); - } - if (result != null) { - cos.writeMessageNoTag(result); - } - cos.flush(); - cos.checkNoSpaceLeft(); - } - - private ByteBuffer createHeaderAndMessageBytes(Message result, Message header, - int totalSize, int totalPBSize) throws IOException { - ByteBuffer pbBuf = ByteBuffer.allocate(totalPBSize); - writeToCOS(result, header, totalSize, pbBuf); - pbBuf.flip(); - return pbBuf; - } - - protected BufferChain wrapWithSasl(BufferChain bc) - throws IOException { - if (!this.connection.useSasl) return bc; - // Looks like no way around this; saslserver wants a byte array. I have to make it one. - // THIS IS A BIG UGLY COPY. - byte [] responseBytes = bc.getBytes(); - byte [] token; - // synchronization may be needed since there can be multiple Handler - // threads using saslServer or Crypto AES to wrap responses. - if (connection.useCryptoAesWrap) { - // wrap with Crypto AES - synchronized (connection.cryptoAES) { - token = connection.cryptoAES.wrap(responseBytes, 0, responseBytes.length); - } - } else { - synchronized (connection.saslServer) { - token = connection.saslServer.wrap(responseBytes, 0, responseBytes.length); - } - } - if (LOG.isTraceEnabled()) { - LOG.trace("Adding saslServer wrapped token of size " + token.length - + " as call response."); - } - - ByteBuffer[] responseBufs = new ByteBuffer[2]; - responseBufs[0] = ByteBuffer.wrap(Bytes.toBytes(token.length)); - responseBufs[1] = ByteBuffer.wrap(token); - return new BufferChain(responseBufs); - } - - @Override - public boolean isClientCellBlockSupported() { - return this.connection != null && this.connection.codec != null; - } - - @Override - public long getResponseCellSize() { - return responseCellSize; - } - - @Override - public void incrementResponseCellSize(long cellSize) { - responseCellSize += cellSize; - } - - @Override - public long getResponseBlockSize() { - return responseBlockSize; - } - - @Override - public void incrementResponseBlockSize(long blockSize) { - responseBlockSize += blockSize; - } - - @Override - public long getResponseExceptionSize() { - return exceptionSize; - } - @Override - public void incrementResponseExceptionSize(long exSize) { - exceptionSize += exSize; - } - - @Override - public long getSize() { - return this.size; - } - - @Override - public long getDeadline() { - return deadline; - } - - @Override - public User getRequestUser() { - return user; - } - - @Override - public String getRequestUserName() { - User user = getRequestUser(); - return user == null? null: user.getShortName(); - } - - @Override - public InetAddress getRemoteAddress() { - return remoteAddress; - } - - @Override - public VersionInfo getClientVersionInfo() { - return connection.getVersionInfo(); - } - - @Override - public synchronized void setCallBack(RpcCallback callback) { - this.rpcCallback = callback; - } - - @Override - public boolean isRetryImmediatelySupported() { - return retryImmediatelySupported; - } - - @Override - public BlockingService getService() { - return service; - } - - @Override - public MethodDescriptor getMethod() { - return md; - } - - @Override - public Message getParam() { - return param; - } - - @Override - public CellScanner getCellScanner() { - return cellScanner; - } - - @Override - public long getReceiveTime() { - return timestamp; - } - - @Override - public void setReceiveTime(long t) { - this.timestamp = t; - } - - @Override - public long getStartTime() { - return startTime; - } - - @Override - public void setStartTime(long t) { - this.startTime = t; - } - - @Override - public int getTimeout() { - return timeout; - } - - @Override - public int getRemotePort() { - return connection.getRemotePort(); - } - - @Override - public TraceInfo getTraceInfo() { - return tinfo; - } - - } - @FunctionalInterface protected static interface CallCleanup { void run(); @@ -781,15 +304,15 @@ public abstract class RpcServer implements RpcServerInterface, protected boolean useCryptoAesWrap = false; // Fake 'call' for failed authorization response protected static final int AUTHORIZATION_FAILED_CALLID = -1; - protected Call authFailedCall; + protected ServerCall authFailedCall; protected ByteArrayOutputStream authFailedResponse = new ByteArrayOutputStream(); // Fake 'call' for SASL context setup protected static final int SASL_CALLID = -33; - protected Call saslCall; + protected ServerCall saslCall; // Fake 'call' for connection header response protected static final int CONNECTION_HEADER_RESPONSE_CALLID = -34; - protected Call setConnectionHeaderResponseCall; + protected ServerCall setConnectionHeaderResponseCall; // was authentication allowed with a fallback to simple auth protected boolean authenticatedWithFallback; @@ -1366,7 +889,7 @@ public abstract class RpcServer implements RpcServerInterface, // This is a bit late to be doing this check - we have already read in the // total request. if ((totalRequestSize + callQueueSizeInBytes.sum()) > maxQueueSizeInBytes) { - final RpcServer.Call callTooBig = createCall(id, this.service, null, + final ServerCall callTooBig = createCall(id, this.service, null, null, null, null, this, totalRequestSize, null, null, 0, this.callCleanup); ByteArrayOutputStream responseBuffer = new ByteArrayOutputStream(); @@ -1430,7 +953,7 @@ public abstract class RpcServer implements RpcServerInterface, t = new DoNotRetryIOException(t); } - final RpcServer.Call readParamsFailedCall = createCall(id, + final ServerCall readParamsFailedCall = createCall(id, this.service, null, null, null, null, this, totalRequestSize, null, null, 0, this.callCleanup); ByteArrayOutputStream responseBuffer = new ByteArrayOutputStream(); @@ -1447,7 +970,7 @@ public abstract class RpcServer implements RpcServerInterface, if (header.hasTimeout() && header.getTimeout() > 0) { timeout = Math.max(minClientRequestTimeout, header.getTimeout()); } - RpcServer.Call call = createCall(id, this.service, md, header, param, + ServerCall call = createCall(id, this.service, md, header, param, cellScanner, this, totalRequestSize, traceInfo, this.addr, timeout, this.callCleanup); @@ -1465,7 +988,7 @@ public abstract class RpcServer implements RpcServerInterface, public abstract boolean isConnectionOpen(); - public abstract Call createCall(int id, final BlockingService service, + public abstract ServerCall createCall(int id, final BlockingService service, final MethodDescriptor md, RequestHeader header, Message param, CellScanner cellScanner, Connection connection, long size, TraceInfo tinfo, final InetAddress remoteAddress, int timeout, @@ -1594,14 +1117,13 @@ public abstract class RpcServer implements RpcServerInterface, /** * Setup response for the RPC Call. - * * @param response buffer to serialize the response into - * @param call {@link Call} to which we are setting up the response + * @param call {@link ServerCall} to which we are setting up the response * @param error error message, if the call failed * @throws IOException */ - protected void setupResponse(ByteArrayOutputStream response, Call call, Throwable t, String error) - throws IOException { + protected void setupResponse(ByteArrayOutputStream response, ServerCall call, Throwable t, + String error) throws IOException { if (response != null) response.reset(); call.setResponse(null, null, t, error); } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/ServerCall.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/ServerCall.java new file mode 100644 index 0000000..9294839 --- /dev/null +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/ServerCall.java @@ -0,0 +1,527 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.ipc; + +import java.io.IOException; +import java.net.InetAddress; +import java.nio.ByteBuffer; +import java.util.ArrayList; +import java.util.List; + +import org.apache.hadoop.hbase.CellScanner; +import org.apache.hadoop.hbase.DoNotRetryIOException; +import org.apache.hadoop.hbase.classification.InterfaceAudience; +import org.apache.hadoop.hbase.exceptions.RegionMovedException; +import org.apache.hadoop.hbase.io.ByteBufferListOutputStream; +import org.apache.hadoop.hbase.io.ByteBufferPool; +import org.apache.hadoop.hbase.ipc.RpcServer.CallCleanup; +import org.apache.hadoop.hbase.ipc.RpcServer.Connection; +import org.apache.hadoop.hbase.security.User; +import org.apache.hadoop.hbase.shaded.com.google.protobuf.BlockingService; +import org.apache.hadoop.hbase.shaded.com.google.protobuf.CodedOutputStream; +import org.apache.hadoop.hbase.shaded.com.google.protobuf.Descriptors.MethodDescriptor; +import org.apache.hadoop.hbase.shaded.com.google.protobuf.Message; +import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil; +import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.VersionInfo; +import org.apache.hadoop.hbase.shaded.protobuf.generated.RPCProtos.CellBlockMeta; +import org.apache.hadoop.hbase.shaded.protobuf.generated.RPCProtos.ExceptionResponse; +import org.apache.hadoop.hbase.shaded.protobuf.generated.RPCProtos.RequestHeader; +import org.apache.hadoop.hbase.shaded.protobuf.generated.RPCProtos.ResponseHeader; +import org.apache.hadoop.hbase.util.ByteBufferUtils; +import org.apache.hadoop.hbase.util.Bytes; +import org.apache.hadoop.util.StringUtils; +import org.apache.htrace.TraceInfo; + +/** + * Datastructure that holds all necessary to a method invocation and then afterward, carries + * the result. + */ +@InterfaceAudience.Private +abstract class ServerCall implements RpcCall { + + protected final int id; // the client's call id + protected final BlockingService service; + protected final MethodDescriptor md; + protected final RequestHeader header; + protected Message param; // the parameter passed + // Optional cell data passed outside of protobufs. + protected final CellScanner cellScanner; + protected final Connection connection; // connection to client + protected final long receiveTime; // the time received when response is null + // the time served when response is not null + protected final int timeout; + protected long startTime; + protected final long deadline;// the deadline to handle this call, if exceed we can drop it. + + protected final ByteBufferPool reservoir; + + protected final CellBlockBuilder cellBlockBuilder; + + /** + * Chain of buffers to send as response. + */ + protected BufferChain response; + + protected final long size; // size of current call + protected boolean isError; + protected final TraceInfo tinfo; + protected ByteBufferListOutputStream cellBlockStream = null; + protected CallCleanup reqCleanup = null; + + protected User user; + protected final InetAddress remoteAddress; + protected RpcCallback rpcCallback; + + private long responseCellSize = 0; + private long responseBlockSize = 0; + // cumulative size of serialized exceptions + private long exceptionSize = 0; + private final boolean retryImmediatelySupported; + + @edu.umd.cs.findbugs.annotations.SuppressWarnings(value="NP_NULL_ON_SOME_PATH", + justification="Can't figure why this complaint is happening... see below") + ServerCall(int id, BlockingService service, MethodDescriptor md, RequestHeader header, + Message param, CellScanner cellScanner, Connection connection, long size, TraceInfo tinfo, + InetAddress remoteAddress, long receiveTime, int timeout, ByteBufferPool reservoir, + CellBlockBuilder cellBlockBuilder, CallCleanup reqCleanup) { + this.id = id; + this.service = service; + this.md = md; + this.header = header; + this.param = param; + this.cellScanner = cellScanner; + this.connection = connection; + this.receiveTime = receiveTime; + this.response = null; + this.isError = false; + this.size = size; + this.tinfo = tinfo; + this.user = connection == null ? null : connection.user; // FindBugs: NP_NULL_ON_SOME_PATH + this.remoteAddress = remoteAddress; + this.retryImmediatelySupported = + connection == null ? false : connection.retryImmediatelySupported; + this.timeout = timeout; + this.deadline = this.timeout > 0 ? this.receiveTime + this.timeout : Long.MAX_VALUE; + this.reservoir = reservoir; + this.cellBlockBuilder = cellBlockBuilder; + this.reqCleanup = reqCleanup; + } + + /** + * Call is done. Execution happened and we returned results to client. It is + * now safe to cleanup. + */ + @edu.umd.cs.findbugs.annotations.SuppressWarnings(value = "IS2_INCONSISTENT_SYNC", + justification = "Presume the lock on processing request held by caller is protection enough") + void done() { + if (this.cellBlockStream != null) { + // This will return back the BBs which we got from pool. + this.cellBlockStream.releaseResources(); + this.cellBlockStream = null; + } + // If the call was run successfuly, we might have already returned the BB + // back to pool. No worries..Then inputCellBlock will be null + cleanup(); + } + + @Override + public void cleanup() { + if (this.reqCleanup != null) { + this.reqCleanup.run(); + this.reqCleanup = null; + } + } + + @Override + public String toString() { + return toShortString() + " param: " + + (this.param != null? ProtobufUtil.getShortTextFormat(this.param): "") + + " connection: " + connection.toString(); + } + + @Override + public RequestHeader getHeader() { + return this.header; + } + + @Override + public int getPriority() { + return this.header.getPriority(); + } + + /* + * Short string representation without param info because param itself could be huge depends on + * the payload of a command + */ + @Override + public String toShortString() { + String serviceName = this.connection.service != null ? + this.connection.service.getDescriptorForType().getName() : "null"; + return "callId: " + this.id + " service: " + serviceName + + " methodName: " + ((this.md != null) ? this.md.getName() : "n/a") + + " size: " + StringUtils.TraditionalBinaryPrefix.long2String(this.size, "", 1) + + " connection: " + connection.toString() + + " deadline: " + deadline; + } + + protected synchronized void setSaslTokenResponse(ByteBuffer response) { + ByteBuffer[] responseBufs = new ByteBuffer[1]; + responseBufs[0] = response; + this.response = new BufferChain(responseBufs); + } + + protected synchronized void setConnectionHeaderResponse(ByteBuffer response) { + ByteBuffer[] responseBufs = new ByteBuffer[1]; + responseBufs[0] = response; + this.response = new BufferChain(responseBufs); + } + + @Override + public synchronized void setResponse(Message m, final CellScanner cells, + Throwable t, String errorMsg) { + if (this.isError) return; + if (t != null) this.isError = true; + BufferChain bc = null; + try { + ResponseHeader.Builder headerBuilder = ResponseHeader.newBuilder(); + // Call id. + headerBuilder.setCallId(this.id); + if (t != null) { + setExceptionResponse(t, errorMsg, headerBuilder); + } + // Pass reservoir to buildCellBlock. Keep reference to returne so can add it back to the + // reservoir when finished. This is hacky and the hack is not contained but benefits are + // high when we can avoid a big buffer allocation on each rpc. + List cellBlock = null; + int cellBlockSize = 0; + if (this.reservoir != null) { + this.cellBlockStream = this.cellBlockBuilder.buildCellBlockStream(this.connection.codec, + this.connection.compressionCodec, cells, this.reservoir); + if (this.cellBlockStream != null) { + cellBlock = this.cellBlockStream.getByteBuffers(); + cellBlockSize = this.cellBlockStream.size(); + } + } else { + ByteBuffer b = this.cellBlockBuilder.buildCellBlock(this.connection.codec, + this.connection.compressionCodec, cells); + if (b != null) { + cellBlockSize = b.remaining(); + cellBlock = new ArrayList<>(1); + cellBlock.add(b); + } + } + + if (cellBlockSize > 0) { + CellBlockMeta.Builder cellBlockBuilder = CellBlockMeta.newBuilder(); + // Presumes the cellBlock bytebuffer has been flipped so limit has total size in it. + cellBlockBuilder.setLength(cellBlockSize); + headerBuilder.setCellBlockMeta(cellBlockBuilder.build()); + } + Message header = headerBuilder.build(); + ByteBuffer headerBuf = + createHeaderAndMessageBytes(m, header, cellBlockSize, cellBlock); + ByteBuffer[] responseBufs = null; + int cellBlockBufferSize = 0; + if (cellBlock != null) { + cellBlockBufferSize = cellBlock.size(); + responseBufs = new ByteBuffer[1 + cellBlockBufferSize]; + } else { + responseBufs = new ByteBuffer[1]; + } + responseBufs[0] = headerBuf; + if (cellBlock != null) { + for (int i = 0; i < cellBlockBufferSize; i++) { + responseBufs[i + 1] = cellBlock.get(i); + } + } + bc = new BufferChain(responseBufs); + if (connection.useWrap) { + bc = wrapWithSasl(bc); + } + } catch (IOException e) { + RpcServer.LOG.warn("Exception while creating response " + e); + } + this.response = bc; + // Once a response message is created and set to this.response, this Call can be treated as + // done. The Responder thread will do the n/w write of this message back to client. + if (this.rpcCallback != null) { + try { + this.rpcCallback.run(); + } catch (Exception e) { + // Don't allow any exception here to kill this handler thread. + RpcServer.LOG.warn("Exception while running the Rpc Callback.", e); + } + } + } + + protected void setExceptionResponse(Throwable t, String errorMsg, + ResponseHeader.Builder headerBuilder) { + ExceptionResponse.Builder exceptionBuilder = ExceptionResponse.newBuilder(); + exceptionBuilder.setExceptionClassName(t.getClass().getName()); + exceptionBuilder.setStackTrace(errorMsg); + exceptionBuilder.setDoNotRetry(t instanceof DoNotRetryIOException); + if (t instanceof RegionMovedException) { + // Special casing for this exception. This is only one carrying a payload. + // Do this instead of build a generic system for allowing exceptions carry + // any kind of payload. + RegionMovedException rme = (RegionMovedException)t; + exceptionBuilder.setHostname(rme.getHostname()); + exceptionBuilder.setPort(rme.getPort()); + } + // Set the exception as the result of the method invocation. + headerBuilder.setException(exceptionBuilder.build()); + } + + protected ByteBuffer createHeaderAndMessageBytes(Message result, Message header, + int cellBlockSize, List cellBlock) throws IOException { + // Organize the response as a set of bytebuffers rather than collect it all together inside + // one big byte array; save on allocations. + // for writing the header, we check if there is available space in the buffers + // created for the cellblock itself. If there is space for the header, we reuse + // the last buffer in the cellblock. This applies to the cellblock created from the + // pool or even the onheap cellblock buffer in case there is no pool enabled. + // Possible reuse would avoid creating a temporary array for storing the header every time. + ByteBuffer possiblePBBuf = + (cellBlockSize > 0) ? cellBlock.get(cellBlock.size() - 1) : null; + int headerSerializedSize = 0, resultSerializedSize = 0, headerVintSize = 0, + resultVintSize = 0; + if (header != null) { + headerSerializedSize = header.getSerializedSize(); + headerVintSize = CodedOutputStream.computeUInt32SizeNoTag(headerSerializedSize); + } + if (result != null) { + resultSerializedSize = result.getSerializedSize(); + resultVintSize = CodedOutputStream.computeUInt32SizeNoTag(resultSerializedSize); + } + // calculate the total size + int totalSize = headerSerializedSize + headerVintSize + + (resultSerializedSize + resultVintSize) + + cellBlockSize; + int totalPBSize = headerSerializedSize + headerVintSize + resultSerializedSize + + resultVintSize + Bytes.SIZEOF_INT; + // Only if the last buffer has enough space for header use it. Else allocate + // a new buffer. Assume they are all flipped + if (possiblePBBuf != null + && possiblePBBuf.limit() + totalPBSize <= possiblePBBuf.capacity()) { + // duplicate the buffer. This is where the header is going to be written + ByteBuffer pbBuf = possiblePBBuf.duplicate(); + // get the current limit + int limit = pbBuf.limit(); + // Position such that we write the header to the end of the buffer + pbBuf.position(limit); + // limit to the header size + pbBuf.limit(totalPBSize + limit); + // mark the current position + pbBuf.mark(); + writeToCOS(result, header, totalSize, pbBuf); + // reset the buffer back to old position + pbBuf.reset(); + return pbBuf; + } else { + return createHeaderAndMessageBytes(result, header, totalSize, totalPBSize); + } + } + + private void writeToCOS(Message result, Message header, int totalSize, ByteBuffer pbBuf) + throws IOException { + ByteBufferUtils.putInt(pbBuf, totalSize); + // create COS that works on BB + CodedOutputStream cos = CodedOutputStream.newInstance(pbBuf); + if (header != null) { + cos.writeMessageNoTag(header); + } + if (result != null) { + cos.writeMessageNoTag(result); + } + cos.flush(); + cos.checkNoSpaceLeft(); + } + + private ByteBuffer createHeaderAndMessageBytes(Message result, Message header, + int totalSize, int totalPBSize) throws IOException { + ByteBuffer pbBuf = ByteBuffer.allocate(totalPBSize); + writeToCOS(result, header, totalSize, pbBuf); + pbBuf.flip(); + return pbBuf; + } + + protected BufferChain wrapWithSasl(BufferChain bc) + throws IOException { + if (!this.connection.useSasl) return bc; + // Looks like no way around this; saslserver wants a byte array. I have to make it one. + // THIS IS A BIG UGLY COPY. + byte [] responseBytes = bc.getBytes(); + byte [] token; + // synchronization may be needed since there can be multiple Handler + // threads using saslServer or Crypto AES to wrap responses. + if (connection.useCryptoAesWrap) { + // wrap with Crypto AES + synchronized (connection.cryptoAES) { + token = connection.cryptoAES.wrap(responseBytes, 0, responseBytes.length); + } + } else { + synchronized (connection.saslServer) { + token = connection.saslServer.wrap(responseBytes, 0, responseBytes.length); + } + } + if (RpcServer.LOG.isTraceEnabled()) { + RpcServer.LOG.trace("Adding saslServer wrapped token of size " + token.length + + " as call response."); + } + + ByteBuffer[] responseBufs = new ByteBuffer[2]; + responseBufs[0] = ByteBuffer.wrap(Bytes.toBytes(token.length)); + responseBufs[1] = ByteBuffer.wrap(token); + return new BufferChain(responseBufs); + } + + @Override + public long disconnectSince() { + if (!this.connection.isConnectionOpen()) { + return System.currentTimeMillis() - receiveTime; + } else { + return -1L; + } + } + + @Override + public boolean isClientCellBlockSupported() { + return this.connection != null && this.connection.codec != null; + } + + @Override + public long getResponseCellSize() { + return responseCellSize; + } + + @Override + public void incrementResponseCellSize(long cellSize) { + responseCellSize += cellSize; + } + + @Override + public long getResponseBlockSize() { + return responseBlockSize; + } + + @Override + public void incrementResponseBlockSize(long blockSize) { + responseBlockSize += blockSize; + } + + @Override + public long getResponseExceptionSize() { + return exceptionSize; + } + @Override + public void incrementResponseExceptionSize(long exSize) { + exceptionSize += exSize; + } + + @Override + public long getSize() { + return this.size; + } + + @Override + public long getDeadline() { + return deadline; + } + + @Override + public User getRequestUser() { + return user; + } + + @Override + public String getRequestUserName() { + User user = getRequestUser(); + return user == null? null: user.getShortName(); + } + + @Override + public InetAddress getRemoteAddress() { + return remoteAddress; + } + + @Override + public VersionInfo getClientVersionInfo() { + return connection.getVersionInfo(); + } + + @Override + public synchronized void setCallBack(RpcCallback callback) { + this.rpcCallback = callback; + } + + @Override + public boolean isRetryImmediatelySupported() { + return retryImmediatelySupported; + } + + @Override + public BlockingService getService() { + return service; + } + + @Override + public MethodDescriptor getMethod() { + return md; + } + + @Override + public Message getParam() { + return param; + } + + @Override + public CellScanner getCellScanner() { + return cellScanner; + } + + @Override + public long getReceiveTime() { + return receiveTime; + } + + @Override + public long getStartTime() { + return startTime; + } + + @Override + public void setStartTime(long t) { + this.startTime = t; + } + + @Override + public int getTimeout() { + return timeout; + } + + @Override + public int getRemotePort() { + return connection.getRemotePort(); + } + + @Override + public TraceInfo getTraceInfo() { + return tinfo; + } + +} \ No newline at end of file diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/SimpleRpcServer.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/SimpleRpcServer.java index f771eec..59d1ff9 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/SimpleRpcServer.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/SimpleRpcServer.java @@ -127,61 +127,6 @@ public class SimpleRpcServer extends RpcServer { private Listener listener = null; protected Responder responder = null; - /** - * Datastructure that holds all necessary to a method invocation and then afterward, carries - * the result. - */ - @InterfaceStability.Evolving - public class Call extends RpcServer.Call { - - protected Responder responder; - - @edu.umd.cs.findbugs.annotations.SuppressWarnings(value="NP_NULL_ON_SOME_PATH", - justification="Can't figure why this complaint is happening... see below") - Call(int id, final BlockingService service, final MethodDescriptor md, - RequestHeader header, Message param, CellScanner cellScanner, - RpcServer.Connection connection, long size, TraceInfo tinfo, - final InetAddress remoteAddress, int timeout, CallCleanup reqCleanup, - Responder responder) { - super(id, service, md, header, param, cellScanner, connection, size, - tinfo, remoteAddress, timeout, reqCleanup); - this.responder = responder; - } - - /** - * Call is done. Execution happened and we returned results to client. It is now safe to - * cleanup. - */ - @edu.umd.cs.findbugs.annotations.SuppressWarnings(value="IS2_INCONSISTENT_SYNC", - justification="Presume the lock on processing request held by caller is protection enough") - @Override - void done() { - super.done(); - this.getConnection().decRpcCount(); // Say that we're done with this call. - } - - @Override - public long disconnectSince() { - if (!getConnection().isConnectionOpen()) { - return System.currentTimeMillis() - timestamp; - } else { - return -1L; - } - } - - @Override - public synchronized void sendResponseIfReady() throws IOException { - // set param null to reduce memory pressure - this.param = null; - this.responder.doRespond(this); - } - - Connection getConnection() { - return (Connection) this.connection; - } - - } - /** Listens on the socket. Creates jobs for the handler threads*/ private class Listener extends Thread { @@ -589,8 +534,8 @@ public class SimpleRpcServer extends RpcServer { if (connection == null) { throw new IllegalStateException("Coding error: SelectionKey key without attachment."); } - Call call = connection.responseQueue.peekFirst(); - if (call != null && now > call.timestamp + purgeTimeout) { + SimpleServerCall call = connection.responseQueue.peekFirst(); + if (call != null && now > call.lastSentTime + purgeTimeout) { conWithOldCalls.add(call.getConnection()); } } @@ -637,7 +582,7 @@ public class SimpleRpcServer extends RpcServer { * @return true if we proceed the call fully, false otherwise. * @throws IOException */ - private boolean processResponse(final Call call) throws IOException { + private boolean processResponse(final SimpleServerCall call) throws IOException { boolean error = true; try { // Send as much data as we can in the non-blocking fashion @@ -680,7 +625,7 @@ public class SimpleRpcServer extends RpcServer { try { for (int i = 0; i < 20; i++) { // protection if some handlers manage to need all the responder - Call call = connection.responseQueue.pollFirst(); + SimpleServerCall call = connection.responseQueue.pollFirst(); if (call == null) { return true; } @@ -699,7 +644,7 @@ public class SimpleRpcServer extends RpcServer { // // Enqueue a response from the application. // - void doRespond(Call call) throws IOException { + void doRespond(SimpleServerCall call) throws IOException { boolean added = false; // If there is already a write in progress, we don't wait. This allows to free the handlers @@ -728,7 +673,7 @@ public class SimpleRpcServer extends RpcServer { call.responder.registerForWrite(call.getConnection()); // set the serve time when the response has to be sent later - call.timestamp = System.currentTimeMillis(); + call.lastSentTime = System.currentTimeMillis(); } } @@ -741,7 +686,7 @@ public class SimpleRpcServer extends RpcServer { protected SocketChannel channel; private ByteBuff data; private ByteBuffer dataLengthBuffer; - protected final ConcurrentLinkedDeque responseQueue = new ConcurrentLinkedDeque<>(); + protected final ConcurrentLinkedDeque responseQueue = new ConcurrentLinkedDeque<>(); private final Lock responseWriteLock = new ReentrantLock(); private LongAdder rpcCount = new LongAdder(); // number of outstanding rpcs private long lastContact; @@ -769,13 +714,14 @@ public class SimpleRpcServer extends RpcServer { socketSendBufferSize); } } - this.saslCall = new Call(SASL_CALLID, null, null, null, null, null, this, - 0, null, null, 0, null, responder); - this.setConnectionHeaderResponseCall = new Call( - CONNECTION_HEADER_RESPONSE_CALLID, null, null, null, null, null, - this, 0, null, null, 0, null, responder); - this.authFailedCall = new Call(AUTHORIZATION_FAILED_CALLID, null, null, - null, null, null, this, 0, null, null, 0, null, responder); + this.saslCall = new SimpleServerCall(SASL_CALLID, null, null, null, null, null, this, 0, null, + null, System.currentTimeMillis(), 0, reservoir, cellBlockBuilder, null, responder); + this.setConnectionHeaderResponseCall = new SimpleServerCall(CONNECTION_HEADER_RESPONSE_CALLID, + null, null, null, null, null, this, 0, null, null, System.currentTimeMillis(), 0, + reservoir, cellBlockBuilder, null, responder); + this.authFailedCall = new SimpleServerCall(AUTHORIZATION_FAILED_CALLID, null, null, null, + null, null, this, 0, null, null, System.currentTimeMillis(), 0, reservoir, + cellBlockBuilder, null, responder); } public void setLastContact(long lastContact) { @@ -941,8 +887,9 @@ public class SimpleRpcServer extends RpcServer { RequestHeader header = (RequestHeader) builder.build(); // Notify the client about the offending request - Call reqTooBig = new Call(header.getCallId(), this.service, null, - null, null, null, this, 0, null, this.addr, 0, null, responder); + SimpleServerCall reqTooBig = new SimpleServerCall(header.getCallId(), this.service, + null, null, null, null, this, 0, null, this.addr, System.currentTimeMillis(), 0, + reservoir, cellBlockBuilder, null, responder); metrics.exception(REQUEST_TOO_BIG_EXCEPTION); // Make sure the client recognizes the underlying exception // Otherwise, throw a DoNotRetryIOException. @@ -1043,8 +990,8 @@ public class SimpleRpcServer extends RpcServer { private int doBadPreambleHandling(final String msg, final Exception e) throws IOException { LOG.warn(msg); - Call fakeCall = new Call(-1, null, null, null, null, null, this, -1, - null, null, 0, null, responder); + SimpleServerCall fakeCall = new SimpleServerCall(-1, null, null, null, null, null, this, -1, + null, null, System.currentTimeMillis(), 0, reservoir, cellBlockBuilder, null, responder); setupResponse(null, fakeCall, e, msg); responder.doRespond(fakeCall); // Returning -1 closes out the connection. @@ -1081,13 +1028,13 @@ public class SimpleRpcServer extends RpcServer { } @Override - public RpcServer.Call createCall(int id, final BlockingService service, - final MethodDescriptor md, RequestHeader header, Message param, - CellScanner cellScanner, RpcServer.Connection connection, long size, - TraceInfo tinfo, final InetAddress remoteAddress, int timeout, - CallCleanup reqCleanup) { - return new Call(id, service, md, header, param, cellScanner, connection, - size, tinfo, remoteAddress, timeout, reqCleanup, responder); + public ServerCall createCall(int id, final BlockingService service, final MethodDescriptor md, + RequestHeader header, Message param, CellScanner cellScanner, + RpcServer.Connection connection, long size, TraceInfo tinfo, + final InetAddress remoteAddress, int timeout, CallCleanup reqCleanup) { + return new SimpleServerCall(id, service, md, header, param, cellScanner, connection, size, + tinfo, remoteAddress, System.currentTimeMillis(), timeout, reservoir, cellBlockBuilder, + reqCleanup, responder); } } @@ -1206,17 +1153,16 @@ public class SimpleRpcServer extends RpcServer { public Pair call(BlockingService service, MethodDescriptor md, Message param, CellScanner cellScanner, long receiveTime, MonitoredRPCHandler status) throws IOException { - return call(service, md, param, cellScanner, receiveTime, status, System.currentTimeMillis(),0); + return call(service, md, param, cellScanner, receiveTime, status, System.currentTimeMillis(), + 0); } @Override - public Pair call(BlockingService service, - MethodDescriptor md, Message param, CellScanner cellScanner, - long receiveTime, MonitoredRPCHandler status, long startTime, int timeout) - throws IOException { - Call fakeCall = new Call(-1, service, md, null, param, cellScanner, null, - -1, null, null, timeout, null, null); - fakeCall.setReceiveTime(receiveTime); + public Pair call(BlockingService service, MethodDescriptor md, + Message param, CellScanner cellScanner, long receiveTime, MonitoredRPCHandler status, + long startTime, int timeout) throws IOException { + SimpleServerCall fakeCall = new SimpleServerCall(-1, service, md, null, param, cellScanner, + null, -1, null, null, receiveTime, timeout, reservoir, cellBlockBuilder, null, null); return call(fakeCall, status); } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/SimpleServerCall.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/SimpleServerCall.java new file mode 100644 index 0000000..b82d348 --- /dev/null +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/SimpleServerCall.java @@ -0,0 +1,79 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.ipc; + +import java.io.IOException; +import java.net.InetAddress; + +import org.apache.hadoop.hbase.CellScanner; +import org.apache.hadoop.hbase.classification.InterfaceAudience; +import org.apache.hadoop.hbase.io.ByteBufferPool; +import org.apache.hadoop.hbase.ipc.RpcServer.CallCleanup; +import org.apache.hadoop.hbase.ipc.SimpleRpcServer.Connection; +import org.apache.hadoop.hbase.ipc.SimpleRpcServer.Responder; +import org.apache.hadoop.hbase.shaded.com.google.protobuf.BlockingService; +import org.apache.hadoop.hbase.shaded.com.google.protobuf.Descriptors.MethodDescriptor; +import org.apache.hadoop.hbase.shaded.com.google.protobuf.Message; +import org.apache.hadoop.hbase.shaded.protobuf.generated.RPCProtos.RequestHeader; +import org.apache.htrace.TraceInfo; + +/** + * Datastructure that holds all necessary to a method invocation and then afterward, carries the + * result. + */ +@InterfaceAudience.Private +class SimpleServerCall extends ServerCall { + + long lastSentTime; + + final Responder responder; + + @edu.umd.cs.findbugs.annotations.SuppressWarnings(value = "NP_NULL_ON_SOME_PATH", + justification = "Can't figure why this complaint is happening... see below") + SimpleServerCall(int id, final BlockingService service, final MethodDescriptor md, + RequestHeader header, Message param, CellScanner cellScanner, RpcServer.Connection connection, + long size, TraceInfo tinfo, final InetAddress remoteAddress, long receiveTime, int timeout, + ByteBufferPool reservoir, CellBlockBuilder cellBlockBuilder, CallCleanup reqCleanup, + Responder responder) { + super(id, service, md, header, param, cellScanner, connection, size, tinfo, remoteAddress, + receiveTime, timeout, reservoir, cellBlockBuilder, reqCleanup); + this.responder = responder; + } + + /** + * Call is done. Execution happened and we returned results to client. It is now safe to cleanup. + */ + @edu.umd.cs.findbugs.annotations.SuppressWarnings(value = "IS2_INCONSISTENT_SYNC", + justification = "Presume the lock on processing request held by caller is protection enough") + @Override + void done() { + super.done(); + this.getConnection().decRpcCount(); // Say that we're done with this call. + } + + @Override + public synchronized void sendResponseIfReady() throws IOException { + // set param null to reduce memory pressure + this.param = null; + this.responder.doRespond(this); + } + + Connection getConnection() { + return (Connection) this.connection; + } +} \ No newline at end of file diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestCallRunner.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestCallRunner.java index 47c15ae..5128280 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestCallRunner.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestCallRunner.java @@ -33,8 +33,8 @@ public class TestCallRunner { public void testSimpleCall() { RpcServerInterface mockRpcServer = Mockito.mock(RpcServerInterface.class); Mockito.when(mockRpcServer.isStarted()).thenReturn(true); - RpcServer.Call mockCall = Mockito.mock(RpcServer.Call.class); - mockCall.connection = Mockito.mock(RpcServer.Connection.class); + ServerCall mockCall = Mockito.mock(ServerCall.class); + // mockCall.connection = Mockito.mock(RpcServer.Connection.class); CallRunner cr = new CallRunner(mockRpcServer, mockCall); cr.setStatus(new MonitoredRPCHandlerImpl()); cr.run(); diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestSimpleRpcScheduler.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestSimpleRpcScheduler.java index 66b77cd..1d7c12e 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestSimpleRpcScheduler.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestSimpleRpcScheduler.java @@ -30,6 +30,11 @@ import static org.mockito.Mockito.timeout; import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; +import com.google.common.collect.ImmutableList; +import com.google.common.collect.ImmutableMap; +import com.google.common.collect.ImmutableSet; +import com.google.common.collect.Maps; + import java.io.IOException; import java.net.InetSocketAddress; import java.util.ArrayList; @@ -44,18 +49,16 @@ import java.util.concurrent.LinkedBlockingQueue; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.hbase.CategoryBasedTimeout; import org.apache.hadoop.hbase.HBaseConfiguration; import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.client.Put; -import org.apache.hadoop.hbase.ipc.RpcServer.Call; import org.apache.hadoop.hbase.monitoring.MonitoredRPCHandlerImpl; -import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.ScanRequest; +import org.apache.hadoop.hbase.security.User; +import org.apache.hadoop.hbase.shaded.com.google.protobuf.Message; import org.apache.hadoop.hbase.shaded.protobuf.RequestConverter; +import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.ScanRequest; import org.apache.hadoop.hbase.shaded.protobuf.generated.RPCProtos; import org.apache.hadoop.hbase.shaded.protobuf.generated.RPCProtos.RequestHeader; -import org.apache.hadoop.hbase.security.User; -import org.apache.hadoop.hbase.shaded.com.google.protobuf.Message; import org.apache.hadoop.hbase.testclassification.RPCTests; import org.apache.hadoop.hbase.testclassification.SmallTests; import org.apache.hadoop.hbase.util.Bytes; @@ -63,18 +66,11 @@ import org.apache.hadoop.hbase.util.EnvironmentEdge; import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; import org.apache.hadoop.hbase.util.Threads; import org.junit.Before; -import org.junit.Rule; import org.junit.Test; import org.junit.experimental.categories.Category; -import org.junit.rules.TestRule; import org.mockito.invocation.InvocationOnMock; import org.mockito.stubbing.Answer; -import com.google.common.collect.ImmutableList; -import com.google.common.collect.ImmutableMap; -import com.google.common.collect.ImmutableSet; -import com.google.common.collect.Maps; - @Category({RPCTests.class, SmallTests.class}) public class TestSimpleRpcScheduler {/* @Rule @@ -167,7 +163,7 @@ public class TestSimpleRpcScheduler {/* } private CallRunner createMockTask() { - Call call = mock(Call.class); + ServerCall call = mock(ServerCall.class); CallRunner task = mock(CallRunner.class); when(task.getRpcCall()).thenReturn(call); return task; @@ -195,19 +191,19 @@ public class TestSimpleRpcScheduler {/* scheduler.start(); CallRunner smallCallTask = mock(CallRunner.class); - RpcServer.Call smallCall = mock(RpcServer.Call.class); + ServerCall smallCall = mock(ServerCall.class); RequestHeader smallHead = RequestHeader.newBuilder().setCallId(1).build(); when(smallCallTask.getRpcCall()).thenReturn(smallCall); when(smallCall.getHeader()).thenReturn(smallHead); CallRunner largeCallTask = mock(CallRunner.class); - RpcServer.Call largeCall = mock(RpcServer.Call.class); + ServerCall largeCall = mock(ServerCall.class); RequestHeader largeHead = RequestHeader.newBuilder().setCallId(50).build(); when(largeCallTask.getRpcCall()).thenReturn(largeCall); when(largeCall.getHeader()).thenReturn(largeHead); CallRunner hugeCallTask = mock(CallRunner.class); - RpcServer.Call hugeCall = mock(RpcServer.Call.class); + ServerCall hugeCall = mock(ServerCall.class); RequestHeader hugeHead = RequestHeader.newBuilder().setCallId(100).build(); when(hugeCallTask.getRpcCall()).thenReturn(hugeCall); when(hugeCall.getHeader()).thenReturn(hugeHead); @@ -290,7 +286,7 @@ public class TestSimpleRpcScheduler {/* scheduler.start(); CallRunner putCallTask = mock(CallRunner.class); - RpcServer.Call putCall = mock(RpcServer.Call.class); + ServerCall putCall = mock(ServerCall.class); putCall.param = RequestConverter.buildMutateRequest( Bytes.toBytes("abc"), new Put(Bytes.toBytes("row"))); RequestHeader putHead = RequestHeader.newBuilder().setMethodName("mutate").build(); @@ -299,13 +295,13 @@ public class TestSimpleRpcScheduler {/* when(putCall.getParam()).thenReturn(putCall.param); CallRunner getCallTask = mock(CallRunner.class); - RpcServer.Call getCall = mock(RpcServer.Call.class); + ServerCall getCall = mock(ServerCall.class); RequestHeader getHead = RequestHeader.newBuilder().setMethodName("get").build(); when(getCallTask.getRpcCall()).thenReturn(getCall); when(getCall.getHeader()).thenReturn(getHead); CallRunner scanCallTask = mock(CallRunner.class); - RpcServer.Call scanCall = mock(RpcServer.Call.class); + ServerCall scanCall = mock(ServerCall.class); scanCall.param = ScanRequest.newBuilder().setScannerId(1).build(); RequestHeader scanHead = RequestHeader.newBuilder().setMethodName("scan").build(); when(scanCallTask.getRpcCall()).thenReturn(scanCall); @@ -382,7 +378,7 @@ public class TestSimpleRpcScheduler {/* scheduler.start(); CallRunner putCallTask = mock(CallRunner.class); - RpcServer.Call putCall = mock(RpcServer.Call.class); + ServerCall putCall = mock(ServerCall.class); putCall.param = RequestConverter.buildMutateRequest( Bytes.toBytes("abc"), new Put(Bytes.toBytes("row"))); RequestHeader putHead = RequestHeader.newBuilder().setMethodName("mutate").build(); @@ -506,19 +502,15 @@ public class TestSimpleRpcScheduler {/* // Get mocked call that has the CallRunner sleep for a while so that the fast // path isn't hit. private CallRunner getMockedCallRunner(long timestamp, final long sleepTime) throws IOException { - final RpcServer.Call putCall = mock(RpcServer.Call.class); - - putCall.timestamp = timestamp; - putCall.param = RequestConverter.buildMutateRequest( - Bytes.toBytes("abc"), new Put(Bytes.toBytes("row"))); + ServerCall putCall = new ServerCall(1, null, null, + RPCProtos.RequestHeader.newBuilder().setMethodName("mutate").build(), + RequestConverter.buildMutateRequest(Bytes.toBytes("abc"), new Put(Bytes.toBytes("row"))), + null, null, 9, null, null, timestamp, 0, null, null, null) { - RPCProtos.RequestHeader putHead = RPCProtos.RequestHeader.newBuilder() - .setMethodName("mutate") - .build(); - when(putCall.getSize()).thenReturn(9L); - when(putCall.getHeader()).thenReturn(putHead); - when(putCall.getReceiveTime()).thenReturn(putCall.timestamp); - when(putCall.getParam()).thenReturn(putCall.param); + @Override + public void sendResponseIfReady() throws IOException { + } + }; CallRunner cr = new CallRunner(null, putCall) { public void run() { @@ -530,11 +522,13 @@ public class TestSimpleRpcScheduler {/* } catch (InterruptedException e) { } } + public RpcCall getRpcCall() { return putCall; } - public void drop() {} + public void drop() { + } }; return cr; diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/security/TestSecureIPC.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/security/TestSecureIPC.java index 85a14f2..a31b7b6 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/security/TestSecureIPC.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/security/TestSecureIPC.java @@ -28,6 +28,8 @@ import static org.junit.Assert.assertNotSame; import static org.junit.Assert.assertSame; import static org.junit.Assert.fail; +import com.google.common.collect.Lists; + import java.io.File; import java.io.IOException; import java.net.InetSocketAddress; @@ -57,8 +59,8 @@ import org.apache.hadoop.hbase.ipc.SimpleRpcServer; import org.apache.hadoop.hbase.shaded.com.google.protobuf.BlockingService; import org.apache.hadoop.hbase.shaded.ipc.protobuf.generated.TestProtos; import org.apache.hadoop.hbase.shaded.ipc.protobuf.generated.TestRpcServiceProtos.TestProtobufRpcProto.BlockingInterface; +import org.apache.hadoop.hbase.testclassification.MediumTests; import org.apache.hadoop.hbase.testclassification.SecurityTests; -import org.apache.hadoop.hbase.testclassification.SmallTests; import org.apache.hadoop.minikdc.MiniKdc; import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.security.UserGroupInformation.AuthenticationMethod; @@ -75,10 +77,8 @@ import org.junit.runners.Parameterized.Parameter; import org.junit.runners.Parameterized.Parameters; import org.mockito.Mockito; -import com.google.common.collect.Lists; - @RunWith(Parameterized.class) -@Category({ SecurityTests.class, SmallTests.class }) +@Category({ SecurityTests.class, MediumTests.class }) public class TestSecureIPC { private static final HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility(); -- 2.7.4