diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/CallRunner.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/CallRunner.java index d570b17..9e6b987 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/CallRunner.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/CallRunner.java @@ -118,8 +118,7 @@ public class CallRunner { traceScope = Trace.startSpan(call.toTraceString(), call.tinfo); } // make the call - resultPair = this.rpcServer.call(call.service, call.md, call.param, call.cellScanner, - call.timestamp, this.status, call.startTime, call.timeout); + resultPair = this.rpcServer.call(call, this.status); } catch (TimeoutIOException e){ RpcServer.LOG.warn("Can not complete this request in time, drop it: " + call); return; diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcServer.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcServer.java index 49c7f8a..242e3ea 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcServer.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcServer.java @@ -325,9 +325,8 @@ public class RpcServer implements RpcServerInterface, ConfigurationObserver { * Datastructure that holds all necessary to a method invocation and then afterward, carries * the result. */ - @InterfaceAudience.LimitedPrivate({HBaseInterfaceAudience.COPROC, HBaseInterfaceAudience.PHOENIX}) - @InterfaceStability.Evolving - public class Call implements RpcCallContext { + @InterfaceAudience.Private + public class Call implements RpcServerCallInterface { protected int id; // the client's call id protected BlockingService service; protected MethodDescriptor md; @@ -749,6 +748,58 @@ public class RpcServer implements RpcServerInterface, ConfigurationObserver { public boolean isRetryImmediatelySupported() { return retryImmediatelySupported; } + + @Override + public BlockingService getService() { + return service; + } + + @Override + public MethodDescriptor getMethod() { + return md; + } + + @Override + public Message getParam() { + return param; + } + + @Override + public CellScanner getCellScanner() { + return cellScanner; + } + + @Override + public long getReceiveTime() { + return timestamp; + } + + @Override + public void setReceiveTime(long t) { + this.timestamp = t; + + } + + @Override + public long getStartTime() { + return startTime; + } + + @Override + public void setStartTime(long t) { + this.startTime = t; + + } + + @Override + public int getTimeout() { + return timeout; + } + + @Override + public RequestHeader getRequestHeader() { + return header; + }; } @FunctionalInterface @@ -2563,24 +2614,37 @@ public class RpcServer implements RpcServerInterface, ConfigurationObserver { return call(service, md, param, cellScanner, receiveTime, status, System.currentTimeMillis(),0); } + public Pair call(BlockingService service, MethodDescriptor md, Message param, + CellScanner cellScanner, long receiveTime, MonitoredRPCHandler status, long startTime, + int timeout) + throws IOException { + Call fakeCall = new Call(-1, service, md, null, param, cellScanner, null, null, -1, null, null, timeout, + null); + fakeCall.setReceiveTime(receiveTime); + return call(fakeCall, status); + } + /** * This is a server side method, which is invoked over RPC. On success * the return response has protobuf response payload. On failure, the * exception name and the stack trace are returned in the protobuf response. */ - public Pair call(BlockingService service, MethodDescriptor md, - Message param, CellScanner cellScanner, long receiveTime, MonitoredRPCHandler status, - long startTime, int timeout) + public Pair call(RpcServerCallInterface call, MonitoredRPCHandler status) throws IOException { try { - status.setRPC(md.getName(), new Object[]{param}, receiveTime); + MethodDescriptor md = call.getMethod(); + Message param = call.getParam(); + status.setRPC(md.getName(), new Object[]{param}, + call.getReceiveTime()); // TODO: Review after we add in encoded data blocks. status.setRPCPacket(param); status.resume("Servicing call"); //get an instance of the method arg type - HBaseRpcController controller = new HBaseRpcControllerImpl(cellScanner); - controller.setCallTimeout(timeout); - Message result = service.callBlockingMethod(md, controller, param); + HBaseRpcController controller = new HBaseRpcControllerImpl(call.getCellScanner()); + controller.setCallTimeout(call.getTimeout()); + Message result = call.getService().callBlockingMethod(md, controller, param); + long receiveTime = call.getReceiveTime(); + long startTime = call.getStartTime(); long endTime = System.currentTimeMillis(); int processingTime = (int) (endTime - startTime); int qTime = (int) (startTime - receiveTime); @@ -2593,7 +2657,9 @@ public class RpcServer implements RpcServerInterface, ConfigurationObserver { " totalTime: " + totalTime); } long requestSize = param.getSerializedSize(); - long responseSize = result.getSerializedSize(); + // Support including the payload size in HBaseRpcController + long responseSize = result.getSerializedSize() + call.getResponseCellSize(); + metrics.dequeuedCall(qTime); metrics.processedCall(processingTime); metrics.totalCall(totalTime); diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcServerCallInterface.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcServerCallInterface.java new file mode 100644 index 0000000..78be749 --- /dev/null +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcServerCallInterface.java @@ -0,0 +1,57 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hbase.ipc; + +import org.apache.hadoop.hbase.classification.InterfaceAudience; +import org.apache.hadoop.hbase.classification.InterfaceStability; +import org.apache.hadoop.hbase.CellScanner; +import org.apache.hadoop.hbase.HBaseInterfaceAudience; +import org.apache.hadoop.hbase.shaded.com.google.protobuf.BlockingService; +import org.apache.hadoop.hbase.shaded.com.google.protobuf.Descriptors.MethodDescriptor; +import org.apache.hadoop.hbase.shaded.com.google.protobuf.Message; +import org.apache.hadoop.hbase.shaded.protobuf.generated.RPCProtos.RequestHeader; + +@InterfaceAudience.LimitedPrivate({HBaseInterfaceAudience.COPROC, HBaseInterfaceAudience.PHOENIX}) +@InterfaceStability.Evolving +public interface RpcServerCallInterface extends RpcCallContext { + + BlockingService getService(); + + MethodDescriptor getMethod(); + + Message getParam(); + + CellScanner getCellScanner(); + + long getReceiveTime(); + + void setReceiveTime(long receiveTime); + + long getStartTime(); + + void setStartTime(long startTime); + + int getTimeout(); + + int getPriority(); + + RequestHeader getRequestHeader(); + +} diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcServerInterface.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcServerInterface.java index 5401e3f..dbee4cb 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcServerInterface.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcServerInterface.java @@ -55,11 +55,18 @@ public interface RpcServerInterface { @Deprecated Pair call(BlockingService service, MethodDescriptor md, Message param, CellScanner cellScanner, long receiveTime, MonitoredRPCHandler status) - throws IOException, ServiceException; + throws IOException; + /** + * @deprecated As of release 2.0, this will be removed in HBase 3.0 + */ + @Deprecated Pair call(BlockingService service, MethodDescriptor md, Message param, CellScanner cellScanner, long receiveTime, MonitoredRPCHandler status, long startTime, - int timeout) throws IOException, ServiceException; + int timeout) throws IOException; + + Pair call(RpcServerCallInterface call, MonitoredRPCHandler status) + throws IOException; void setErrorHandler(HBaseRPCErrorHandler handler); HBaseRPCErrorHandler getErrorHandler();