From d862361aba753571a9a6f813e3c2c8c1f8eeadbb Mon Sep 17 00:00:00 2001 From: zhangduo Date: Wed, 31 Aug 2016 22:48:03 +0800 Subject: [PATCH] HBASE-16516 Revisit the implementation of PayloadCarryingRpcController --- .../hadoop/hbase/client/FlushRegionCallable.java | 4 +- .../org/apache/hadoop/hbase/client/HBaseAdmin.java | 22 +- .../org/apache/hadoop/hbase/client/HTable.java | 8 +- .../apache/hadoop/hbase/client/MasterCallable.java | 6 +- .../hadoop/hbase/client/MultiServerCallable.java | 2 +- .../hbase/client/NoncedRegionServerCallable.java | 10 +- .../hbase/client/RegionAdminServiceCallable.java | 8 +- .../hadoop/hbase/client/RegionServerCallable.java | 10 +- .../client/RpcRetryingCallerWithReadReplicas.java | 6 +- .../hadoop/hbase/client/ScannerCallable.java | 2 +- .../apache/hadoop/hbase/ipc/AbstractRpcClient.java | 18 +- .../apache/hadoop/hbase/ipc/AsyncRpcChannel.java | 2 +- .../apache/hadoop/hbase/ipc/AsyncRpcClient.java | 10 +- .../hbase/ipc/DelegatingHBaseRpcController.java | 127 ++++++++++++ .../DelegatingPayloadCarryingRpcController.java | 60 ------ .../hadoop/hbase/ipc/HBaseRpcController.java | 96 +++++++++ .../hadoop/hbase/ipc/HBaseRpcControllerImpl.java | 226 +++++++++++++++++++++ .../hbase/ipc/PayloadCarryingRpcController.java | 214 ------------------- .../org/apache/hadoop/hbase/ipc/RpcClientImpl.java | 8 +- .../hadoop/hbase/ipc/RpcControllerFactory.java | 14 +- .../hbase/security/access/AccessControlClient.java | 16 +- .../hadoop/hbase/zookeeper/MetaTableLocator.java | 4 +- .../hadoop/hbase/client/TestSnapshotFromAdmin.java | 6 +- .../hbase/ipc/TestHBaseRpcControllerImpl.java | 195 ++++++++++++++++++ .../ipc/TestPayloadCarryingRpcController.java | 194 ------------------ .../org/apache/hadoop/hbase/ipc/RpcServer.java | 2 +- .../apache/hadoop/hbase/master/ServerManager.java | 14 +- .../hbase/protobuf/ReplicationProtbufUtil.java | 9 +- .../hadoop/hbase/regionserver/RSRpcServices.java | 24 +-- .../RegionReplicaReplicationEndpoint.java | 4 +- .../hbase/TestMetaTableAccessorNoCluster.java | 4 +- .../apache/hadoop/hbase/TestMetaTableLocator.java | 4 +- .../hbase/client/TestHBaseAdminNoCluster.java | 4 +- .../org/apache/hadoop/hbase/client/TestHCM.java | 4 +- .../hbase/client/TestRpcControllerFactory.java | 14 +- .../apache/hadoop/hbase/ipc/AbstractTestIPC.java | 9 +- .../hbase/ipc/TestProtobufRpcServiceImpl.java | 6 +- .../hadoop/hbase/master/MockRegionServer.java | 4 +- .../regionserver/TestEndToEndSplitTransaction.java | 12 +- 39 files changed, 779 insertions(+), 603 deletions(-) create mode 100644 hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/DelegatingHBaseRpcController.java delete mode 100644 hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/DelegatingPayloadCarryingRpcController.java create mode 100644 hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/HBaseRpcController.java create mode 100644 hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/HBaseRpcControllerImpl.java delete mode 100644 hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/PayloadCarryingRpcController.java create mode 100644 hbase-client/src/test/java/org/apache/hadoop/hbase/ipc/TestHBaseRpcControllerImpl.java delete mode 100644 hbase-client/src/test/java/org/apache/hadoop/hbase/ipc/TestPayloadCarryingRpcController.java diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/FlushRegionCallable.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/FlushRegionCallable.java index c7bf804..c612e0f 100644 --- 
a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/FlushRegionCallable.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/FlushRegionCallable.java @@ -25,7 +25,7 @@ import org.apache.commons.logging.LogFactory; import org.apache.hadoop.hbase.HRegionInfo; import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.classification.InterfaceAudience; -import org.apache.hadoop.hbase.ipc.PayloadCarryingRpcController; +import org.apache.hadoop.hbase.ipc.HBaseRpcController; import org.apache.hadoop.hbase.ipc.RpcControllerFactory; import org.apache.hadoop.hbase.protobuf.RequestConverter; import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.FlushRegionRequest; @@ -65,7 +65,7 @@ public class FlushRegionCallable extends RegionAdminServiceCallable getOnlineRegions(final ServerName sn) throws IOException { AdminService.BlockingInterface admin = this.connection.getAdmin(sn); // TODO: There is no timeout on this controller. Set one! - PayloadCarryingRpcController controller = rpcControllerFactory.newController(); + HBaseRpcController controller = rpcControllerFactory.newController(); return ProtobufUtil.getOnlineRegions(controller, admin); } @@ -1094,7 +1094,7 @@ public class HBaseAdmin implements Admin { @Override public Void call() throws Exception { // TODO: There is no timeout on this controller. Set one! - PayloadCarryingRpcController controller = rpcControllerFactory.newController(); + HBaseRpcController controller = rpcControllerFactory.newController(); FlushRegionRequest request = RequestConverter.buildFlushRegionRequest(hRegionInfo.getRegionName()); admin.flushRegion(controller, request); @@ -1257,7 +1257,7 @@ public class HBaseAdmin implements Admin { @Override public Void call() throws Exception { // TODO: There is no timeout on this controller. Set one! - PayloadCarryingRpcController controller = rpcControllerFactory.newController(); + HBaseRpcController controller = rpcControllerFactory.newController(); CompactRegionRequest request = RequestConverter.buildCompactRegionRequest(hri.getRegionName(), major, family); admin.compactRegion(controller, request); @@ -1649,7 +1649,7 @@ public class HBaseAdmin implements Admin { throw new IOException("should not give a splitkey which equals to startkey!"); } // TODO: There is no timeout on this controller. Set one! - PayloadCarryingRpcController controller = rpcControllerFactory.newController(); + HBaseRpcController controller = rpcControllerFactory.newController(); controller.setPriority(hri.getTable()); // TODO: this does not do retries, it should. Set priority and timeout in controller @@ -1837,7 +1837,7 @@ public class HBaseAdmin implements Admin { final AdminService.BlockingInterface admin = this.connection.getAdmin(ServerName.valueOf(hostname, port, 0)); // TODO: There is no timeout on this controller. Set one! - PayloadCarryingRpcController controller = rpcControllerFactory.newController(); + HBaseRpcController controller = rpcControllerFactory.newController(); controller.setPriority(HConstants.HIGH_QOS); StopServerRequest request = RequestConverter.buildStopServerRequest( "Called by admin client " + this.connection.toString()); @@ -2191,7 +2191,7 @@ public class HBaseAdmin implements Admin { final AdminService.BlockingInterface admin = this.connection.getAdmin(sn); RollWALWriterRequest request = RequestConverter.buildRollWALWriterRequest(); // TODO: There is no timeout on this controller. Set one! 
- PayloadCarryingRpcController controller = rpcControllerFactory.newController(); + HBaseRpcController controller = rpcControllerFactory.newController(); try { return admin.rollWALWriter(controller, request); } catch (ServiceException e) { @@ -2272,7 +2272,7 @@ public class HBaseAdmin implements Admin { ServerName sn = regionServerPair.getSecond(); final AdminService.BlockingInterface admin = this.connection.getAdmin(sn); // TODO: There is no timeout on this controller. Set one! - PayloadCarryingRpcController controller = rpcControllerFactory.newController(); + HBaseRpcController controller = rpcControllerFactory.newController(); GetRegionInfoRequest request = RequestConverter.buildGetRegionInfoRequest( regionServerPair.getFirst().getRegionName(), true); GetRegionInfoResponse response; @@ -3034,7 +3034,7 @@ public class HBaseAdmin implements Admin { AdminProtos.GetRegionInfoResponse.CompactionState.NONE; checkTableExists(tableName); // TODO: There is no timeout on this controller. Set one! - final PayloadCarryingRpcController rpcController = rpcControllerFactory.newController(); + final HBaseRpcController rpcController = rpcControllerFactory.newController(); switch (compactType) { case MOB: final AdminProtos.AdminService.BlockingInterface masterAdmin = diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HTable.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HTable.java index d2423b3..52cc610 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HTable.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HTable.java @@ -50,7 +50,7 @@ import org.apache.hadoop.hbase.client.coprocessor.Batch.Callback; import org.apache.hadoop.hbase.filter.BinaryComparator; import org.apache.hadoop.hbase.filter.CompareFilter.CompareOp; import org.apache.hadoop.hbase.ipc.CoprocessorRpcChannel; -import org.apache.hadoop.hbase.ipc.PayloadCarryingRpcController; +import org.apache.hadoop.hbase.ipc.HBaseRpcController; import org.apache.hadoop.hbase.ipc.RegionCoprocessorRpcChannel; import org.apache.hadoop.hbase.ipc.RpcControllerFactory; import org.apache.hadoop.hbase.protobuf.ProtobufUtil; @@ -616,7 +616,7 @@ public class HTable implements Table { new NoncedRegionServerCallable(this.connection, this.rpcControllerFactory, getName(), append.getRow()) { @Override - protected Result call(PayloadCarryingRpcController controller) throws Exception { + protected Result call(HBaseRpcController controller) throws Exception { MutateRequest request = RequestConverter.buildMutateRequest( getLocation().getRegionInfo().getRegionName(), append, getNonceGroup(), getNonce()); MutateResponse response = getStub().mutate(controller, request); @@ -638,7 +638,7 @@ public class HTable implements Table { new NoncedRegionServerCallable(this.connection, this.rpcControllerFactory, getName(), increment.getRow()) { @Override - protected Result call(PayloadCarryingRpcController controller) throws Exception { + protected Result call(HBaseRpcController controller) throws Exception { MutateRequest request = RequestConverter.buildMutateRequest( getLocation().getRegionInfo().getRegionName(), increment, getNonceGroup(), getNonce()); MutateResponse response = getStub().mutate(controller, request); @@ -684,7 +684,7 @@ public class HTable implements Table { new NoncedRegionServerCallable(this.connection, this.rpcControllerFactory, getName(), row) { @Override - protected Long call(PayloadCarryingRpcController controller) throws Exception { + protected Long call(HBaseRpcController 
controller) throws Exception { MutateRequest request = RequestConverter.buildIncrementRequest( getLocation().getRegionInfo().getRegionName(), row, family, qualifier, amount, durability, getNonceGroup(), getNonce()); diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/MasterCallable.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/MasterCallable.java index e279a39..5db0546 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/MasterCallable.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/MasterCallable.java @@ -23,7 +23,7 @@ import java.io.IOException; import org.apache.hadoop.hbase.HRegionInfo; import org.apache.hadoop.hbase.TableName; -import org.apache.hadoop.hbase.ipc.PayloadCarryingRpcController; +import org.apache.hadoop.hbase.ipc.HBaseRpcController; import org.apache.hadoop.hbase.ipc.RpcControllerFactory; import org.apache.hadoop.hbase.protobuf.ProtobufUtil; import org.apache.hadoop.hbase.util.Bytes; @@ -44,7 +44,7 @@ import org.apache.hadoop.hbase.util.Bytes; abstract class MasterCallable implements RetryingCallable, Closeable { protected final ClusterConnection connection; protected MasterKeepAliveConnection master; - private final PayloadCarryingRpcController rpcController; + private final HBaseRpcController rpcController; MasterCallable(final Connection connection, final RpcControllerFactory rpcConnectionFactory) { this.connection = (ClusterConnection) connection; @@ -111,7 +111,7 @@ abstract class MasterCallable implements RetryingCallable, Closeable { */ protected abstract V rpcCall() throws Exception; - PayloadCarryingRpcController getRpcController() { + HBaseRpcController getRpcController() { return this.rpcController; } diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/MultiServerCallable.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/MultiServerCallable.java index 1ce4aab..3ef97e78 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/MultiServerCallable.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/MultiServerCallable.java @@ -31,7 +31,7 @@ import org.apache.hadoop.hbase.HRegionLocation; import org.apache.hadoop.hbase.ServerName; import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.classification.InterfaceAudience; -import org.apache.hadoop.hbase.ipc.PayloadCarryingRpcController; +import org.apache.hadoop.hbase.ipc.HBaseRpcController; import org.apache.hadoop.hbase.ipc.RpcControllerFactory; import org.apache.hadoop.hbase.protobuf.RequestConverter; import org.apache.hadoop.hbase.protobuf.ResponseConverter; diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/NoncedRegionServerCallable.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/NoncedRegionServerCallable.java index 8fbaa90..7c98861 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/NoncedRegionServerCallable.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/NoncedRegionServerCallable.java @@ -23,7 +23,7 @@ import java.io.IOException; import org.apache.hadoop.hbase.ServerName; import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.classification.InterfaceAudience; -import org.apache.hadoop.hbase.ipc.PayloadCarryingRpcController; +import org.apache.hadoop.hbase.ipc.HBaseRpcController; import org.apache.hadoop.hbase.ipc.RpcControllerFactory; import org.apache.hadoop.hbase.protobuf.ProtobufUtil; import 
org.apache.hadoop.hbase.protobuf.generated.ClientProtos.ClientService; @@ -46,7 +46,7 @@ import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.ClientService; @InterfaceAudience.Private public abstract class NoncedRegionServerCallable extends AbstractRegionServerCallable { private ClientService.BlockingInterface stub; - private final PayloadCarryingRpcController rpcController; + private final HBaseRpcController rpcController; private final long nonce; /** @@ -59,7 +59,7 @@ public abstract class NoncedRegionServerCallable extends AbstractRegionServer this(connection, rpcControllerFactory.newController(), tableName, row); } - public NoncedRegionServerCallable(Connection connection, PayloadCarryingRpcController rpcController, + public NoncedRegionServerCallable(Connection connection, HBaseRpcController rpcController, TableName tableName, byte [] row) { super(connection, tableName, row); this.rpcController = rpcController; @@ -111,9 +111,9 @@ public abstract class NoncedRegionServerCallable extends AbstractRegionServer * class. * @throws Exception */ - protected abstract T call(PayloadCarryingRpcController rpcController) throws Exception; + protected abstract T call(HBaseRpcController rpcController) throws Exception; - public PayloadCarryingRpcController getRpcController() { + public HBaseRpcController getRpcController() { return this.rpcController; } diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RegionAdminServiceCallable.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RegionAdminServiceCallable.java index 4e347dd..9b3f6ef 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RegionAdminServiceCallable.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RegionAdminServiceCallable.java @@ -27,7 +27,7 @@ import org.apache.hadoop.hbase.HRegionLocation; import org.apache.hadoop.hbase.RegionLocations; import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.classification.InterfaceAudience; -import org.apache.hadoop.hbase.ipc.PayloadCarryingRpcController; +import org.apache.hadoop.hbase.ipc.HBaseRpcController; import org.apache.hadoop.hbase.ipc.RpcControllerFactory; import org.apache.hadoop.hbase.protobuf.ProtobufUtil; import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.AdminService; @@ -44,7 +44,7 @@ import org.apache.hadoop.hbase.util.Bytes; public abstract class RegionAdminServiceCallable implements RetryingCallable { protected AdminService.BlockingInterface stub; protected final RpcControllerFactory rpcControllerFactory; - private PayloadCarryingRpcController controller = null; + private HBaseRpcController controller = null; protected final ClusterConnection connection; protected HRegionLocation location; @@ -186,7 +186,7 @@ public abstract class RegionAdminServiceCallable implements RetryingCallable< } } - PayloadCarryingRpcController getCurrentPayloadCarryingRpcController() { + HBaseRpcController getCurrentPayloadCarryingRpcController() { return this.controller; } @@ -197,5 +197,5 @@ public abstract class RegionAdminServiceCallable implements RetryingCallable< * class. 
* @throws Exception */ - protected abstract T call(PayloadCarryingRpcController rpcController) throws Exception; + protected abstract T call(HBaseRpcController rpcController) throws Exception; } \ No newline at end of file diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RegionServerCallable.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RegionServerCallable.java index baf99a0..6a02e18 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RegionServerCallable.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RegionServerCallable.java @@ -24,7 +24,7 @@ import org.apache.hadoop.hbase.CellScanner; import org.apache.hadoop.hbase.ServerName; import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.classification.InterfaceAudience; -import org.apache.hadoop.hbase.ipc.PayloadCarryingRpcController; +import org.apache.hadoop.hbase.ipc.HBaseRpcController; import org.apache.hadoop.hbase.ipc.RpcControllerFactory; import org.apache.hadoop.hbase.protobuf.ProtobufUtil; import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.ClientService; @@ -103,8 +103,8 @@ public abstract class RegionServerCallable extends AbstractRegionServerCallab if (this.rpcController != null) { // Do a reset to clear previous states, such as CellScanner. this.rpcController.reset(); - if (this.rpcController instanceof PayloadCarryingRpcController) { - PayloadCarryingRpcController pcrc = (PayloadCarryingRpcController)this.rpcController; + if (this.rpcController instanceof HBaseRpcController) { + HBaseRpcController pcrc = (HBaseRpcController)this.rpcController; // If it is an instance of PayloadCarryingRpcController, we can set priority on the // controller based off the tableName. RpcController may be null in tests when mocking so allow // for null controller. @@ -141,10 +141,10 @@ public abstract class RegionServerCallable extends AbstractRegionServerCallab * a Coproccessor Endpoint context. Should never happen. 
*/ protected CellScanner getRpcControllerCellScanner() { - return ((PayloadCarryingRpcController)this.rpcController).cellScanner(); + return ((HBaseRpcController)this.rpcController).cellScanner(); } protected void setRpcControllerCellScanner(CellScanner cellScanner) { - ((PayloadCarryingRpcController)this.rpcController).setCellScanner(cellScanner); + ((HBaseRpcController)this.rpcController).setCellScanner(cellScanner); } } \ No newline at end of file diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RpcRetryingCallerWithReadReplicas.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RpcRetryingCallerWithReadReplicas.java index 8d63295..0ea696e 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RpcRetryingCallerWithReadReplicas.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RpcRetryingCallerWithReadReplicas.java @@ -37,7 +37,7 @@ import org.apache.hadoop.hbase.RegionLocations; import org.apache.hadoop.hbase.ServerName; import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.classification.InterfaceAudience; -import org.apache.hadoop.hbase.ipc.PayloadCarryingRpcController; +import org.apache.hadoop.hbase.ipc.HBaseRpcController; import org.apache.hadoop.hbase.ipc.RpcControllerFactory; import org.apache.hadoop.hbase.protobuf.ProtobufUtil; import org.apache.hadoop.hbase.protobuf.RequestConverter; @@ -89,7 +89,7 @@ public class RpcRetryingCallerWithReadReplicas { */ class ReplicaRegionServerCallable extends RegionServerCallable implements Cancellable { final int id; - private final PayloadCarryingRpcController controller; + private final HBaseRpcController controller; public ReplicaRegionServerCallable(int id, HRegionLocation location) { super(RpcRetryingCallerWithReadReplicas.this.cConnection, rpcControllerFactory, @@ -144,7 +144,7 @@ public class RpcRetryingCallerWithReadReplicas { ClientProtos.GetRequest request = RequestConverter.buildGetRequest(reg, get); // Presumption that we are passed a PayloadCarryingRpcController here! 
- PayloadCarryingRpcController pcrc = (PayloadCarryingRpcController)controller; + HBaseRpcController pcrc = (HBaseRpcController)controller; pcrc.setCallTimeout(callTimeout); ClientProtos.GetResponse response = getStub().get(controller, request); if (response == null) { diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ScannerCallable.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ScannerCallable.java index 0ee54d0..adf1153 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ScannerCallable.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ScannerCallable.java @@ -40,7 +40,7 @@ import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.UnknownScannerException; import org.apache.hadoop.hbase.classification.InterfaceAudience; import org.apache.hadoop.hbase.client.metrics.ScanMetrics; -import org.apache.hadoop.hbase.ipc.PayloadCarryingRpcController; +import org.apache.hadoop.hbase.ipc.HBaseRpcController; import org.apache.hadoop.hbase.ipc.RpcControllerFactory; import org.apache.hadoop.hbase.protobuf.ProtobufUtil; import org.apache.hadoop.hbase.protobuf.RequestConverter; diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/AbstractRpcClient.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/AbstractRpcClient.java index 00c4cdd..6cb07861 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/AbstractRpcClient.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/AbstractRpcClient.java @@ -214,11 +214,11 @@ public abstract class AbstractRpcClient implements RpcClient { * new Connection each time. * @return A pair with the Message response and the Cell data (if any). */ - private Message callBlockingMethod(Descriptors.MethodDescriptor md, PayloadCarryingRpcController pcrc, + private Message callBlockingMethod(Descriptors.MethodDescriptor md, HBaseRpcController pcrc, Message param, Message returnType, final User ticket, final InetSocketAddress isa) throws ServiceException { if (pcrc == null) { - pcrc = new PayloadCarryingRpcController(); + pcrc = new HBaseRpcControllerImpl(); } Pair val; @@ -257,7 +257,7 @@ public abstract class AbstractRpcClient implements RpcClient { * @throws InterruptedException if call is interrupted * @throws java.io.IOException if transport failed */ - protected abstract Pair call(PayloadCarryingRpcController pcrc, + protected abstract Pair call(HBaseRpcController pcrc, Descriptors.MethodDescriptor md, Message param, Message returnType, User ticket, InetSocketAddress isa, MetricsConnection.CallStats callStats) throws IOException, InterruptedException; @@ -274,16 +274,16 @@ public abstract class AbstractRpcClient implements RpcClient { * @param channelOperationTimeout timeout for operation * @return configured payload controller */ - static PayloadCarryingRpcController configurePayloadCarryingRpcController( + static HBaseRpcController configurePayloadCarryingRpcController( RpcController controller, int channelOperationTimeout) { - PayloadCarryingRpcController pcrc; - if (controller != null && controller instanceof PayloadCarryingRpcController) { - pcrc = (PayloadCarryingRpcController) controller; + HBaseRpcController pcrc; + if (controller != null && controller instanceof HBaseRpcController) { + pcrc = (HBaseRpcController) controller; if (!pcrc.hasCallTimeout()) { pcrc.setCallTimeout(channelOperationTimeout); } } else { - pcrc = new PayloadCarryingRpcController(); + pcrc = new HBaseRpcControllerImpl(); 
pcrc.setCallTimeout(channelOperationTimeout); } return pcrc; @@ -317,7 +317,7 @@ public abstract class AbstractRpcClient implements RpcClient { @Override public Message callBlockingMethod(Descriptors.MethodDescriptor md, RpcController controller, Message param, Message returnType) throws ServiceException { - PayloadCarryingRpcController pcrc = configurePayloadCarryingRpcController( + HBaseRpcController pcrc = configurePayloadCarryingRpcController( controller, channelOperationTimeout); diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/AsyncRpcChannel.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/AsyncRpcChannel.java index 9550f2a..2ec5adc 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/AsyncRpcChannel.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/AsyncRpcChannel.java @@ -430,7 +430,7 @@ public class AsyncRpcChannel { requestHeaderBuilder.setCellBlockMeta(cellBlockBuilder.build()); } // Only pass priority if there one. Let zero be same as no priority. - if (call.getPriority() != PayloadCarryingRpcController.PRIORITY_UNSET) { + if (call.getPriority() != HBaseRpcController.PRIORITY_UNSET) { requestHeaderBuilder.setPriority(call.getPriority()); } requestHeaderBuilder.setTimeout(call.rpcTimeout > Integer.MAX_VALUE ? diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/AsyncRpcClient.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/AsyncRpcClient.java index e368c43..5f4d2f4 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/AsyncRpcClient.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/AsyncRpcClient.java @@ -231,12 +231,12 @@ public class AsyncRpcClient extends AbstractRpcClient { * @throws java.io.IOException if a connection failure is encountered */ @Override - protected Pair call(PayloadCarryingRpcController pcrc, + protected Pair call(HBaseRpcController pcrc, Descriptors.MethodDescriptor md, Message param, Message returnType, User ticket, InetSocketAddress addr, MetricsConnection.CallStats callStats) throws IOException, InterruptedException { if (pcrc == null) { - pcrc = new PayloadCarryingRpcController(); + pcrc = new HBaseRpcControllerImpl(); } final AsyncRpcChannel connection = createRpcChannel(md.getService().getName(), addr, ticket); @@ -269,7 +269,7 @@ public class AsyncRpcClient extends AbstractRpcClient { } private MessageConverter getMessageConverterWithRpcController( - final PayloadCarryingRpcController pcrc) { + final HBaseRpcController pcrc) { return new MessageConverter() { @Override @@ -284,7 +284,7 @@ public class AsyncRpcClient extends AbstractRpcClient { * Call method async */ private void callMethod(final Descriptors.MethodDescriptor md, - final PayloadCarryingRpcController pcrc, final Message param, Message returnType, User ticket, + final HBaseRpcController pcrc, final Message param, Message returnType, User ticket, InetSocketAddress addr, final RpcCallback done) { final AsyncRpcChannel connection; try { @@ -490,7 +490,7 @@ public class AsyncRpcClient extends AbstractRpcClient { @Override public void callMethod(Descriptors.MethodDescriptor md, RpcController controller, Message param, Message returnType, RpcCallback done) { - PayloadCarryingRpcController pcrc = + HBaseRpcController pcrc = configurePayloadCarryingRpcController(controller, channelOperationTimeout); this.rpcClient.callMethod(md, pcrc, param, returnType, this.ticket, this.isa, done); diff --git 
a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/DelegatingHBaseRpcController.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/DelegatingHBaseRpcController.java new file mode 100644 index 0000000..2c97ffb --- /dev/null +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/DelegatingHBaseRpcController.java @@ -0,0 +1,127 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.ipc; + +import com.google.protobuf.RpcCallback; + +import java.io.IOException; +import java.util.concurrent.Callable; + +import org.apache.hadoop.hbase.CellScanner; +import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.classification.InterfaceAudience; + +/** + * Simple delegating controller for use with the {@link RpcControllerFactory} to help override + * standard behavior of a {@link HBaseRpcController}. + */ +@InterfaceAudience.Private +public class DelegatingHBaseRpcController implements HBaseRpcController { + + private final HBaseRpcController delegate; + + public DelegatingHBaseRpcController(HBaseRpcController delegate) { + this.delegate = delegate; + } + + @Override + public void reset() { + delegate.reset(); + } + + @Override + public boolean failed() { + return delegate.failed(); + } + + @Override + public String errorText() { + return delegate.errorText(); + } + + @Override + public void startCancel() { + delegate.startCancel(); + } + + @Override + public void setFailed(String reason) { + delegate.setFailed(reason); + } + + @Override + public boolean isCanceled() { + return delegate.isCanceled(); + } + + @Override + public void notifyOnCancel(RpcCallback callback) { + delegate.notifyOnCancel(callback); + } + + @Override + public CellScanner cellScanner() { + return delegate.cellScanner(); + } + + @Override + public void setCellScanner(CellScanner cellScanner) { + delegate.setCellScanner(cellScanner); + } + + @Override + public void setPriority(int priority) { + delegate.setPriority(priority); + } + + @Override + public void setPriority(TableName tn) { + delegate.setPriority(tn); + } + + @Override + public int getPriority() { + return delegate.getPriority(); + } + + @Override + public int getCallTimeout() { + return delegate.getCallTimeout(); + } + + @Override + public void setCallTimeout(int callTimeout) { + delegate.setCallTimeout(callTimeout); + } + + @Override + public boolean hasCallTimeout() { + return delegate.hasCallTimeout(); + } + + @Override + public void setFailed(IOException e) { + delegate.setFailed(e); + } + + @Override + public void notifyOnCancel(RpcCallback callback, Callable actionIfNotCancelled) + throws Exception { + delegate.notifyOnCancel(callback, actionIfNotCancelled); + } +} diff --git 
a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/DelegatingPayloadCarryingRpcController.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/DelegatingPayloadCarryingRpcController.java deleted file mode 100644 index aafd492..0000000 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/DelegatingPayloadCarryingRpcController.java +++ /dev/null @@ -1,60 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.hadoop.hbase.ipc; - -import org.apache.hadoop.hbase.CellScanner; -import org.apache.hadoop.hbase.TableName; -import org.apache.hadoop.hbase.classification.InterfaceAudience; - -/** - * Simple delegating controller for use with the {@link RpcControllerFactory} to help override - * standard behavior of a {@link PayloadCarryingRpcController}. - */ -@InterfaceAudience.Private -public class DelegatingPayloadCarryingRpcController extends PayloadCarryingRpcController { - private final PayloadCarryingRpcController delegate; - - public DelegatingPayloadCarryingRpcController(PayloadCarryingRpcController delegate) { - this.delegate = delegate; - } - - @Override - public CellScanner cellScanner() { - return delegate.cellScanner(); - } - - @Override - public void setCellScanner(final CellScanner cellScanner) { - delegate.setCellScanner(cellScanner); - } - - @Override - public void setPriority(int priority) { - delegate.setPriority(priority); - } - - @Override - public void setPriority(final TableName tn) { - delegate.setPriority(tn); - } - - @Override - public int getPriority() { - return delegate.getPriority(); - } -} diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/HBaseRpcController.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/HBaseRpcController.java new file mode 100644 index 0000000..e7a4913 --- /dev/null +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/HBaseRpcController.java @@ -0,0 +1,96 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.hadoop.hbase.ipc; + +import com.google.protobuf.RpcCallback; +import com.google.protobuf.RpcController; + +import java.io.IOException; +import java.util.concurrent.Callable; + +import org.apache.hadoop.hbase.CellScannable; +import org.apache.hadoop.hbase.CellScanner; +import org.apache.hadoop.hbase.HConstants; +import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.classification.InterfaceAudience; + +/** + * Optionally carries Cells across the proxy/service interface down into ipc. On its way out it + * optionally carries a set of result Cell data. We stick the Cells here when we want to avoid + * having to protobuf them (for performance reasons). This class is used ferrying data across the + * proxy/protobuf service chasm. Also does call timeout. Used by client and server ipc'ing. + */ +@InterfaceAudience.Private +public interface HBaseRpcController extends RpcController, CellScannable { + + static final int PRIORITY_UNSET = -1; + + /** + * IMPORTANT: always call this method if the call finished without any exception to tell + * the {@code HBaseRpcController} that we are done. + */ + void setCellScanner(CellScanner cellScanner); + + /** + * @param priority Priority for this request; should fall roughly in the range + * {@link HConstants#NORMAL_QOS} to {@link HConstants#HIGH_QOS} + */ + void setPriority(int priority); + + /** + * @param tn Set priority based off the table we are going against. + */ + void setPriority(final TableName tn); + + /** + * @return The priority of this request + */ + int getPriority(); + + int getCallTimeout(); + + void setCallTimeout(int callTimeout); + + boolean hasCallTimeout(); + + /** + * Set failed with an exception to pass on. For use in async rpc clients + * @param e exception to set with + */ + void setFailed(IOException e); + + /** + * A little different from the basic RpcController: + *
<ol> + * <li>You can register multiple callbacks to an {@code HBaseRpcController}.</li> + * <li>The callback will not be called if the rpc call is finished without any cancellation.</li> + * <li>You can call me at client side also.</li> + * </ol>
+ */ + @Override + void notifyOnCancel(RpcCallback callback); + + /** + * If already cancelled, do nothing, otherwise add the callback to notification list. The + * actionIfNotCancelled will be called if the rpc call has not been cancelled yet. The caller + * should guarantee that the rpc call has not been finished yet, and the implementation should + * guarantee that there is no cancel operation happen before actionIfNotCancelled is executed. + */ + void notifyOnCancel(RpcCallback callback, Callable actionIfNotCancelled) + throws Exception; +} \ No newline at end of file diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/HBaseRpcControllerImpl.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/HBaseRpcControllerImpl.java new file mode 100644 index 0000000..d978ed6 --- /dev/null +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/HBaseRpcControllerImpl.java @@ -0,0 +1,226 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.ipc; + +import com.google.protobuf.RpcCallback; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.Callable; + +import org.apache.hadoop.hbase.CellScannable; +import org.apache.hadoop.hbase.CellScanner; +import org.apache.hadoop.hbase.CellUtil; +import org.apache.hadoop.hbase.HConstants; +import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.classification.InterfaceAudience; + +/** + * Optionally carries Cells across the proxy/service interface down into ipc. On its way out it + * optionally carries a set of result Cell data. We stick the Cells here when we want to avoid + * having to protobuf them (for performance reasons). This class is used ferrying data across the + * proxy/protobuf service chasm. Also does call timeout. Used by client and server ipc'ing. + */ +@InterfaceAudience.Private +public class HBaseRpcControllerImpl implements HBaseRpcController { + /** + * The time, in ms before the call should expire. + */ + private Integer callTimeout; + + private boolean done = false; + + private boolean cancelled = false; + + private final List> cancellationCbs = new ArrayList<>(); + + private IOException exception; + + /** + * Priority to set on this request. Set it here in controller so available composing the request. + * This is the ordained way of setting priorities going forward. We will be undoing the old + * annotation-based mechanism. + */ + private int priority = PRIORITY_UNSET; + + /** + * They are optionally set on construction, cleared after we make the call, and then optionally + * set on response with the result. 
We use this lowest common denominator access to Cells because + * sometimes the scanner is backed by a List of Cells and other times, it is backed by an encoded + * block that implements CellScanner. + */ + private CellScanner cellScanner; + + public HBaseRpcControllerImpl() { + this((CellScanner) null); + } + + public HBaseRpcControllerImpl(final CellScanner cellScanner) { + this.cellScanner = cellScanner; + } + + public HBaseRpcControllerImpl(final List cellIterables) { + this.cellScanner = cellIterables == null ? null : CellUtil.createCellScanner(cellIterables); + } + + /** + * @return One-shot cell scanner (you cannot back it up and restart) + */ + @Override + public CellScanner cellScanner() { + return cellScanner; + } + + @Override + public void setCellScanner(final CellScanner cellScanner) { + this.cellScanner = cellScanner; + } + + @Override + public void setPriority(int priority) { + this.priority = priority; + } + + @Override + public void setPriority(final TableName tn) { + setPriority( + tn != null && tn.isSystemTable() ? HConstants.SYSTEMTABLE_QOS : HConstants.NORMAL_QOS); + } + + @Override + public int getPriority() { + return priority; + } + + // In the implementations of some callable with replicas, rpc calls are executed in a executor and + // we will cancel the operation from outside which means there could be a race between reset and + // startCancel. Although I think this race should be handled by the callable since the reset may + // clear the cancel state... + @Override + public synchronized void reset() { + priority = 0; + cellScanner = null; + exception = null; + done = false; + cancelled = false; + cancellationCbs.clear(); + callTimeout = null; + } + + @Override + public int getCallTimeout() { + if (callTimeout != null) { + return callTimeout.intValue(); + } else { + return 0; + } + } + + @Override + public void setCallTimeout(int callTimeout) { + this.callTimeout = callTimeout; + } + + @Override + public boolean hasCallTimeout() { + return callTimeout != null; + } + + @Override + public synchronized String errorText() { + if (!done || exception == null) { + return null; + } + return exception.getMessage(); + } + + @Override + public synchronized boolean failed() { + return done && this.exception != null; + } + + @Override + public synchronized boolean isCanceled() { + return cancelled; + } + + @Override + public void notifyOnCancel(RpcCallback callback) { + synchronized (this) { + if (done) { + return; + } + if (!cancelled) { + cancellationCbs.add(callback); + return; + } + } + // run it directly as we have already been cancelled. + callback.run(null); + } + + @Override + public synchronized void setFailed(String reason) { + if (done) { + return; + } + done = true; + exception = new IOException(reason); + } + + @Override + public synchronized void setFailed(IOException e) { + if (done) { + return; + } + done = true; + exception = e; + } + + @Override + public void startCancel() { + // As said above in the comment of reset, the cancellationCbs maybe cleared by reset, so we need + // to copy it. + List> cbs; + synchronized (this) { + if (done) { + return; + } + done = true; + cancelled = true; + cbs = new ArrayList<>(cancellationCbs); + } + // after cancelled, the cancellationCbs will not be changed, so it is safe to access it + // directly. 
+ for (RpcCallback cb : cbs) { + cb.run(null); + } + } + + @Override + public synchronized void notifyOnCancel(RpcCallback callback, + Callable actionIfNotCancelled) throws Exception { + if (cancelled) { + return; + } + assert !done; + cancellationCbs.add(callback); + actionIfNotCancelled.call(); + } +} \ No newline at end of file diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/PayloadCarryingRpcController.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/PayloadCarryingRpcController.java deleted file mode 100644 index d9877dc..0000000 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/PayloadCarryingRpcController.java +++ /dev/null @@ -1,214 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.hadoop.hbase.ipc; - -import java.io.IOException; -import java.util.List; -import java.util.concurrent.atomic.AtomicReference; - -import org.apache.hadoop.hbase.CellScannable; -import org.apache.hadoop.hbase.CellScanner; -import org.apache.hadoop.hbase.CellUtil; -import org.apache.hadoop.hbase.HConstants; -import org.apache.hadoop.hbase.TableName; -import org.apache.hadoop.hbase.classification.InterfaceAudience; - -import com.google.protobuf.RpcCallback; -import com.google.protobuf.RpcController; - -/** - * Optionally carries Cells across the proxy/service interface down into ipc. On its - * way out it optionally carries a set of result Cell data. We stick the Cells here when we want - * to avoid having to protobuf them (for performance reasons). This class is used ferrying data - * across the proxy/protobuf service chasm. Also does call timeout. Used by client and server - * ipc'ing. - */ -@InterfaceAudience.Private -public class PayloadCarryingRpcController implements RpcController, CellScannable { - /** - * The time, in ms before the call should expire. - */ - protected volatile Integer callTimeout; - protected volatile boolean cancelled = false; - protected final AtomicReference> cancellationCb = new AtomicReference<>(null); - protected final AtomicReference> failureCb = new AtomicReference<>(null); - private IOException exception; - - public static final int PRIORITY_UNSET = -1; - /** - * Priority to set on this request. Set it here in controller so available composing the - * request. This is the ordained way of setting priorities going forward. We will be - * undoing the old annotation-based mechanism. - */ - private int priority = PRIORITY_UNSET; - - /** - * They are optionally set on construction, cleared after we make the call, and then optionally - * set on response with the result. 
We use this lowest common denominator access to Cells because - * sometimes the scanner is backed by a List of Cells and other times, it is backed by an - * encoded block that implements CellScanner. - */ - private CellScanner cellScanner; - - public PayloadCarryingRpcController() { - this((CellScanner)null); - } - - public PayloadCarryingRpcController(final CellScanner cellScanner) { - this.cellScanner = cellScanner; - } - - public PayloadCarryingRpcController(final List cellIterables) { - this.cellScanner = cellIterables == null? null: CellUtil.createCellScanner(cellIterables); - } - - /** - * @return One-shot cell scanner (you cannot back it up and restart) - */ - @Override - public CellScanner cellScanner() { - return cellScanner; - } - - public void setCellScanner(final CellScanner cellScanner) { - this.cellScanner = cellScanner; - } - - /** - * @param priority Priority for this request; should fall roughly in the range - * {@link HConstants#NORMAL_QOS} to {@link HConstants#HIGH_QOS} - */ - public void setPriority(int priority) { - this.priority = priority; - } - - /** - * @param tn Set priority based off the table we are going against. - */ - public void setPriority(final TableName tn) { - setPriority(tn != null && tn.isSystemTable()? HConstants.SYSTEMTABLE_QOS: - HConstants.NORMAL_QOS); - } - - /** - * @return The priority of this request - */ - public int getPriority() { - return priority; - } - - @Override - public void reset() { - priority = 0; - cellScanner = null; - exception = null; - cancelled = false; - failureCb.set(null); - cancellationCb.set(null); - callTimeout = null; - } - - public int getCallTimeout() { - if (callTimeout != null) { - return callTimeout; - } else { - return 0; - } - } - - public void setCallTimeout(int callTimeout) { - this.callTimeout = callTimeout; - } - - public boolean hasCallTimeout(){ - return callTimeout != null; - } - - @Override - public String errorText() { - if (exception != null) { - return exception.getMessage(); - } else { - return null; - } - } - - /** - * For use in async rpc clients - * @return true if failed - */ - @Override - public boolean failed() { - return this.exception != null; - } - - @Override - public boolean isCanceled() { - return cancelled; - } - - @Override - public void notifyOnCancel(RpcCallback cancellationCb) { - this.cancellationCb.set(cancellationCb); - if (this.cancelled) { - cancellationCb.run(null); - } - } - - /** - * Notify a callback on error. - * For use in async rpc clients - * - * @param failureCb the callback to call on error - */ - public void notifyOnFail(RpcCallback failureCb) { - this.failureCb.set(failureCb); - if (this.exception != null) { - failureCb.run(this.exception); - } - } - - @Override - public void setFailed(String reason) { - this.exception = new IOException(reason); - if (this.failureCb.get() != null) { - this.failureCb.get().run(this.exception); - } - } - - /** - * Set failed with an exception to pass on. 
- * For use in async rpc clients - * - * @param e exception to set with - */ - public void setFailed(IOException e) { - this.exception = e; - if (this.failureCb.get() != null) { - this.failureCb.get().run(this.exception); - } - } - - @Override - public void startCancel() { - cancelled = true; - if (cancellationCb.get() != null) { - cancellationCb.get().run(null); - } - } -} \ No newline at end of file diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/RpcClientImpl.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/RpcClientImpl.java index 03b2953..4546c8d 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/RpcClientImpl.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/RpcClientImpl.java @@ -903,7 +903,7 @@ public class RpcClientImpl extends AbstractRpcClient { builder.setCellBlockMeta(cellBlockBuilder.build()); } // Only pass priority if there is one set. - if (priority != PayloadCarryingRpcController.PRIORITY_UNSET) { + if (priority != HBaseRpcController.PRIORITY_UNSET) { builder.setPriority(priority); } builder.setTimeout(call.timeout); @@ -1208,12 +1208,12 @@ public class RpcClientImpl extends AbstractRpcClient { * @throws IOException if something fails on the connection */ @Override - protected Pair call(PayloadCarryingRpcController pcrc, MethodDescriptor md, + protected Pair call(HBaseRpcController pcrc, MethodDescriptor md, Message param, Message returnType, User ticket, InetSocketAddress addr, MetricsConnection.CallStats callStats) throws IOException, InterruptedException { if (pcrc == null) { - pcrc = new PayloadCarryingRpcController(); + pcrc = new HBaseRpcControllerImpl(); } Call call = this.call(md, param, returnType, pcrc, ticket, addr, callStats); @@ -1236,7 +1236,7 @@ public class RpcClientImpl extends AbstractRpcClient { * @throws IOException if something fails on the connection */ private Call call(MethodDescriptor method, Message request, - R responsePrototype, PayloadCarryingRpcController pcrc, User ticket, + R responsePrototype, HBaseRpcController pcrc, User ticket, InetSocketAddress addr, MetricsConnection.CallStats callStats) throws IOException, InterruptedException { diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/RpcControllerFactory.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/RpcControllerFactory.java index 4b84df1..3c90a92 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/RpcControllerFactory.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/RpcControllerFactory.java @@ -28,7 +28,7 @@ import org.apache.hadoop.hbase.classification.InterfaceAudience; import org.apache.hadoop.hbase.util.ReflectionUtils; /** - * Factory to create a {@link PayloadCarryingRpcController} + * Factory to create a {@link HBaseRpcController} */ @InterfaceAudience.Private public class RpcControllerFactory { @@ -46,17 +46,17 @@ public class RpcControllerFactory { this.conf = conf; } - public PayloadCarryingRpcController newController() { + public HBaseRpcController newController() { // TODO: Set HConstants default rpc timeout here rather than nothing? 
- return new PayloadCarryingRpcController(); + return new HBaseRpcControllerImpl(); } - public PayloadCarryingRpcController newController(final CellScanner cellScanner) { - return new PayloadCarryingRpcController(cellScanner); + public HBaseRpcController newController(final CellScanner cellScanner) { + return new HBaseRpcControllerImpl(cellScanner); } - public PayloadCarryingRpcController newController(final List cellIterables) { - return new PayloadCarryingRpcController(cellIterables); + public HBaseRpcController newController(final List cellIterables) { + return new HBaseRpcControllerImpl(cellIterables); } diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/security/access/AccessControlClient.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/security/access/AccessControlClient.java index 79dbd05..edbd0a3 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/security/access/AccessControlClient.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/security/access/AccessControlClient.java @@ -36,7 +36,7 @@ import org.apache.hadoop.hbase.client.Connection; import org.apache.hadoop.hbase.client.Table; import org.apache.hadoop.hbase.client.security.SecurityCapability; import org.apache.hadoop.hbase.ipc.CoprocessorRpcChannel; -import org.apache.hadoop.hbase.ipc.PayloadCarryingRpcController; +import org.apache.hadoop.hbase.ipc.HBaseRpcController; import org.apache.hadoop.hbase.protobuf.ProtobufUtil; import org.apache.hadoop.hbase.protobuf.generated.AccessControlProtos; import org.apache.hadoop.hbase.protobuf.generated.AccessControlProtos.AccessControlService.BlockingInterface; @@ -94,7 +94,7 @@ public class AccessControlClient { public static void grant(Connection connection, final TableName tableName, final String userName, final byte[] family, final byte[] qual, final Permission.Action... actions) throws Throwable { - PayloadCarryingRpcController controller + HBaseRpcController controller = ((ClusterConnection) connection).getRpcControllerFactory().newController(); controller.setPriority(tableName); try (Table table = connection.getTable(ACL_TABLE_NAME)) { @@ -113,7 +113,7 @@ public class AccessControlClient { */ public static void grant(Connection connection, final String namespace, final String userName, final Permission.Action... actions) throws Throwable { - PayloadCarryingRpcController controller + HBaseRpcController controller = ((ClusterConnection) connection).getRpcControllerFactory().newController(); try (Table table = connection.getTable(ACL_TABLE_NAME)) { @@ -128,7 +128,7 @@ public class AccessControlClient { */ public static void grant(Connection connection, final String userName, final Permission.Action... actions) throws Throwable { - PayloadCarryingRpcController controller + HBaseRpcController controller = ((ClusterConnection) connection).getRpcControllerFactory().newController(); try (Table table = connection.getTable(ACL_TABLE_NAME)) { ProtobufUtil.grant(controller, getAccessControlServiceStub(table), userName, actions); @@ -155,7 +155,7 @@ public class AccessControlClient { public static void revoke(Connection connection, final TableName tableName, final String username, final byte[] family, final byte[] qualifier, final Permission.Action... 
actions) throws Throwable { - PayloadCarryingRpcController controller + HBaseRpcController controller = ((ClusterConnection) connection).getRpcControllerFactory().newController(); controller.setPriority(tableName); try (Table table = connection.getTable(ACL_TABLE_NAME)) { @@ -174,7 +174,7 @@ public class AccessControlClient { */ public static void revoke(Connection connection, final String namespace, final String userName, final Permission.Action... actions) throws Throwable { - PayloadCarryingRpcController controller + HBaseRpcController controller = ((ClusterConnection) connection).getRpcControllerFactory().newController(); try (Table table = connection.getTable(ACL_TABLE_NAME)) { ProtobufUtil.revoke(controller, getAccessControlServiceStub(table), userName, namespace, @@ -188,7 +188,7 @@ public class AccessControlClient { */ public static void revoke(Connection connection, final String userName, final Permission.Action... actions) throws Throwable { - PayloadCarryingRpcController controller + HBaseRpcController controller = ((ClusterConnection) connection).getRpcControllerFactory().newController(); try (Table table = connection.getTable(ACL_TABLE_NAME)) { ProtobufUtil.revoke(controller, getAccessControlServiceStub(table), userName, actions); @@ -206,7 +206,7 @@ public class AccessControlClient { */ public static List getUserPermissions(Connection connection, String tableRegex) throws Throwable { - PayloadCarryingRpcController controller + HBaseRpcController controller = ((ClusterConnection) connection).getRpcControllerFactory().newController(); List permList = new ArrayList(); try (Table table = connection.getTable(ACL_TABLE_NAME)) { diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/zookeeper/MetaTableLocator.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/zookeeper/MetaTableLocator.java index 497e8c4..359617a 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/zookeeper/MetaTableLocator.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/zookeeper/MetaTableLocator.java @@ -44,7 +44,7 @@ import org.apache.hadoop.hbase.client.RegionReplicaUtil; import org.apache.hadoop.hbase.client.RetriesExhaustedException; import org.apache.hadoop.hbase.exceptions.DeserializationException; import org.apache.hadoop.hbase.ipc.FailedServerException; -import org.apache.hadoop.hbase.ipc.PayloadCarryingRpcController; +import org.apache.hadoop.hbase.ipc.HBaseRpcController; import org.apache.hadoop.hbase.ipc.ServerNotRunningYetException; import org.apache.hadoop.hbase.master.RegionState; import org.apache.hadoop.hbase.protobuf.ProtobufUtil; @@ -318,7 +318,7 @@ public class MetaTableLocator { return false; } Throwable t; - PayloadCarryingRpcController controller = connection.getRpcControllerFactory().newController(); + HBaseRpcController controller = connection.getRpcControllerFactory().newController(); try { // Try and get regioninfo from the hosting server. 
return ProtobufUtil.getRegionInfo(controller, hostingServer, regionName) != null; diff --git a/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestSnapshotFromAdmin.java b/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestSnapshotFromAdmin.java index b41c859..80980fd 100644 --- a/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestSnapshotFromAdmin.java +++ b/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestSnapshotFromAdmin.java @@ -28,7 +28,7 @@ import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.HBaseConfiguration; import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.TableName; -import org.apache.hadoop.hbase.ipc.PayloadCarryingRpcController; +import org.apache.hadoop.hbase.ipc.HBaseRpcController; import org.apache.hadoop.hbase.ipc.RpcControllerFactory; import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.IsSnapshotDoneRequest; import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.IsSnapshotDoneResponse; @@ -88,7 +88,7 @@ public class TestSnapshotFromAdmin { RpcRetryingCallerFactory callerFactory = new RpcRetryingCallerFactory(conf); RpcControllerFactory controllerFactory = Mockito.mock(RpcControllerFactory.class); Mockito.when(controllerFactory.newController()).thenReturn( - Mockito.mock(PayloadCarryingRpcController.class)); + Mockito.mock(HBaseRpcController.class)); Mockito.when(mockConnection.getRpcRetryingCallerFactory()).thenReturn(callerFactory); Mockito.when(mockConnection.getRpcControllerFactory()).thenReturn(controllerFactory); // set the max wait time for the snapshot to complete @@ -136,7 +136,7 @@ public class TestSnapshotFromAdmin { RpcRetryingCallerFactory callerFactory = new RpcRetryingCallerFactory(conf); RpcControllerFactory controllerFactory = Mockito.mock(RpcControllerFactory.class); Mockito.when(controllerFactory.newController()).thenReturn( - Mockito.mock(PayloadCarryingRpcController.class)); + Mockito.mock(HBaseRpcController.class)); Mockito.when(mockConnection.getRpcRetryingCallerFactory()).thenReturn(callerFactory); Mockito.when(mockConnection.getRpcControllerFactory()).thenReturn(controllerFactory); Admin admin = new HBaseAdmin(mockConnection); diff --git a/hbase-client/src/test/java/org/apache/hadoop/hbase/ipc/TestHBaseRpcControllerImpl.java b/hbase-client/src/test/java/org/apache/hadoop/hbase/ipc/TestHBaseRpcControllerImpl.java new file mode 100644 index 0000000..d9535a6 --- /dev/null +++ b/hbase-client/src/test/java/org/apache/hadoop/hbase/ipc/TestHBaseRpcControllerImpl.java @@ -0,0 +1,195 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hadoop.hbase.ipc; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; + +import org.apache.hadoop.hbase.Cell; +import org.apache.hadoop.hbase.CellScannable; +import org.apache.hadoop.hbase.CellScanner; +import org.apache.hadoop.hbase.testclassification.ClientTests; +import org.apache.hadoop.hbase.testclassification.SmallTests; +import org.apache.hadoop.hbase.util.Bytes; +import org.junit.Test; +import org.junit.experimental.categories.Category; + +@Category({ ClientTests.class, SmallTests.class }) +public class TestHBaseRpcControllerImpl { + + @Test + public void testListOfCellScannerables() throws IOException { + List cells = new ArrayList(); + final int count = 10; + for (int i = 0; i < count; i++) { + cells.add(createCell(i)); + } + HBaseRpcController controller = new HBaseRpcControllerImpl(cells); + CellScanner cellScanner = controller.cellScanner(); + int index = 0; + for (; cellScanner.advance(); index++) { + Cell cell = cellScanner.current(); + byte[] indexBytes = Bytes.toBytes(index); + assertTrue("" + index, Bytes.equals(indexBytes, 0, indexBytes.length, cell.getValueArray(), + cell.getValueOffset(), cell.getValueLength())); + } + assertEquals(count, index); + } + + /** + * @param index + * @return A faked out 'Cell' that does nothing but return index as its value + */ + static CellScannable createCell(final int index) { + return new CellScannable() { + @Override + public CellScanner cellScanner() { + return new CellScanner() { + @Override + public Cell current() { + // Fake out a Cell. All this Cell has is a value that is an int in size and equal + // to the above 'index' param serialized as an int. + return new Cell() { + private final int i = index; + + @Override + public byte[] getRowArray() { + // unused + return null; + } + + @Override + public int getRowOffset() { + // unused + return 0; + } + + @Override + public short getRowLength() { + // unused + return 0; + } + + @Override + public byte[] getFamilyArray() { + // unused + return null; + } + + @Override + public int getFamilyOffset() { + // unused + return 0; + } + + @Override + public byte getFamilyLength() { + // unused + return 0; + } + + @Override + public byte[] getQualifierArray() { + // unused + return null; + } + + @Override + public int getQualifierOffset() { + // unused + return 0; + } + + @Override + public int getQualifierLength() { + // unused + return 0; + } + + @Override + public long getTimestamp() { + // unused + return 0; + } + + @Override + public byte getTypeByte() { + // unused + return 0; + } + + @Override + public long getSequenceId() { + // unused + return 0; + } + + @Override + public byte[] getValueArray() { + return Bytes.toBytes(this.i); + } + + @Override + public int getValueOffset() { + return 0; + } + + @Override + public int getValueLength() { + return Bytes.SIZEOF_INT; + } + + @Override + public int getTagsOffset() { + // unused + return 0; + } + + @Override + public int getTagsLength() { + // unused + return 0; + } + + @Override + public byte[] getTagsArray() { + // unused + return null; + } + }; + } + + private boolean hasCell = true; + + @Override + public boolean advance() { + // We have one Cell only so return true first time then false ever after. 
+ if (!hasCell) return hasCell; + hasCell = false; + return true; + } + }; + } + }; + } +} diff --git a/hbase-client/src/test/java/org/apache/hadoop/hbase/ipc/TestPayloadCarryingRpcController.java b/hbase-client/src/test/java/org/apache/hadoop/hbase/ipc/TestPayloadCarryingRpcController.java deleted file mode 100644 index 11c8ff8..0000000 --- a/hbase-client/src/test/java/org/apache/hadoop/hbase/ipc/TestPayloadCarryingRpcController.java +++ /dev/null @@ -1,194 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.hadoop.hbase.ipc; - -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertTrue; - -import java.io.IOException; -import java.util.ArrayList; -import java.util.List; - -import org.apache.hadoop.hbase.Cell; -import org.apache.hadoop.hbase.CellScannable; -import org.apache.hadoop.hbase.CellScanner; -import org.apache.hadoop.hbase.testclassification.ClientTests; -import org.apache.hadoop.hbase.testclassification.SmallTests; -import org.apache.hadoop.hbase.util.Bytes; -import org.junit.Test; -import org.junit.experimental.categories.Category; - - -@Category({ClientTests.class, SmallTests.class}) -public class TestPayloadCarryingRpcController { - @Test - public void testListOfCellScannerables() throws IOException { - List cells = new ArrayList(); - final int count = 10; - for (int i = 0; i < count; i++) { - cells.add(createCell(i)); - } - PayloadCarryingRpcController controller = new PayloadCarryingRpcController(cells); - CellScanner cellScanner = controller.cellScanner(); - int index = 0; - for (; cellScanner.advance(); index++) { - Cell cell = cellScanner.current(); - byte [] indexBytes = Bytes.toBytes(index); - assertTrue("" + index, Bytes.equals(indexBytes, 0, indexBytes.length, cell.getValueArray(), - cell.getValueOffset(), cell.getValueLength())); - } - assertEquals(count, index); - } - - /** - * @param index - * @return A faked out 'Cell' that does nothing but return index as its value - */ - static CellScannable createCell(final int index) { - return new CellScannable() { - @Override - public CellScanner cellScanner() { - return new CellScanner() { - @Override - public Cell current() { - // Fake out a Cell. All this Cell has is a value that is an int in size and equal - // to the above 'index' param serialized as an int. 
- return new Cell() { - private final int i = index; - - @Override - public byte[] getRowArray() { - // unused - return null; - } - - @Override - public int getRowOffset() { - // unused - return 0; - } - - @Override - public short getRowLength() { - // unused - return 0; - } - - @Override - public byte[] getFamilyArray() { - // unused - return null; - } - - @Override - public int getFamilyOffset() { - // unused - return 0; - } - - @Override - public byte getFamilyLength() { - // unused - return 0; - } - - @Override - public byte[] getQualifierArray() { - // unused - return null; - } - - @Override - public int getQualifierOffset() { - // unused - return 0; - } - - @Override - public int getQualifierLength() { - // unused - return 0; - } - - @Override - public long getTimestamp() { - // unused - return 0; - } - - @Override - public byte getTypeByte() { - // unused - return 0; - } - - @Override - public long getSequenceId() { - // unused - return 0; - } - - @Override - public byte[] getValueArray() { - return Bytes.toBytes(this.i); - } - - @Override - public int getValueOffset() { - return 0; - } - - @Override - public int getValueLength() { - return Bytes.SIZEOF_INT; - } - - @Override - public int getTagsOffset() { - // unused - return 0; - } - - @Override - public int getTagsLength() { - // unused - return 0; - } - - @Override - public byte[] getTagsArray() { - // unused - return null; - } - }; - } - - private boolean hasCell = true; - @Override - public boolean advance() { - // We have one Cell only so return true first time then false ever after. - if (!hasCell) return hasCell; - hasCell = false; - return true; - } - }; - } - }; - } -} diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcServer.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcServer.java index 759da82..c6cefbf 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcServer.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcServer.java @@ -2205,7 +2205,7 @@ public class RpcServer implements RpcServerInterface, ConfigurationObserver { status.setRPCPacket(param); status.resume("Servicing call"); //get an instance of the method arg type - PayloadCarryingRpcController controller = new PayloadCarryingRpcController(cellScanner); + HBaseRpcController controller = new HBaseRpcControllerImpl(cellScanner); controller.setCallTimeout(timeout); Message result = service.callBlockingMethod(md, controller, param); long endTime = System.currentTimeMillis(); diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/ServerManager.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/ServerManager.java index d7ba4f3..5ee3790 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/ServerManager.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/ServerManager.java @@ -49,7 +49,7 @@ import org.apache.hadoop.hbase.ZooKeeperConnectionException; import org.apache.hadoop.hbase.classification.InterfaceAudience; import org.apache.hadoop.hbase.client.ClusterConnection; import org.apache.hadoop.hbase.client.RetriesExhaustedException; -import org.apache.hadoop.hbase.ipc.PayloadCarryingRpcController; +import org.apache.hadoop.hbase.ipc.HBaseRpcController; import org.apache.hadoop.hbase.ipc.RpcControllerFactory; import org.apache.hadoop.hbase.master.balancer.BaseLoadBalancer; import org.apache.hadoop.hbase.master.procedure.ServerCrashProcedure; @@ -789,7 +789,7 @@ public class ServerManager { } } - private PayloadCarryingRpcController 
newRpcController() { + private HBaseRpcController newRpcController() { return rpcControllerFactory == null ? null : rpcControllerFactory.newController(); } @@ -813,7 +813,7 @@ public class ServerManager { region.getRegionNameAsString() + " failed because no RPC connection found to this server"); } - PayloadCarryingRpcController controller = newRpcController(); + HBaseRpcController controller = newRpcController(); return ProtobufUtil.closeRegion(controller, admin, server, region.getRegionName(), dest); } @@ -835,7 +835,7 @@ public class ServerManager { if (server == null) return; try { AdminService.BlockingInterface admin = getRsAdmin(server); - PayloadCarryingRpcController controller = newRpcController(); + HBaseRpcController controller = newRpcController(); ProtobufUtil.warmupRegion(controller, admin, region); } catch (IOException e) { LOG.error("Received exception in RPC for warmup server:" + @@ -851,7 +851,7 @@ public class ServerManager { public static void closeRegionSilentlyAndWait(ClusterConnection connection, ServerName server, HRegionInfo region, long timeout) throws IOException, InterruptedException { AdminService.BlockingInterface rs = connection.getAdmin(server); - PayloadCarryingRpcController controller = connection.getRpcControllerFactory().newController(); + HBaseRpcController controller = connection.getRpcControllerFactory().newController(); try { ProtobufUtil.closeRegion(controller, rs, server, region.getRegionName()); } catch (IOException e) { @@ -902,7 +902,7 @@ public class ServerManager { + region_b.getRegionNameAsString() + " failed because no RPC connection found to this server"); } - PayloadCarryingRpcController controller = newRpcController(); + HBaseRpcController controller = newRpcController(); ProtobufUtil.mergeRegions(controller, admin, region_a, region_b, forcible, user); } @@ -916,7 +916,7 @@ public class ServerManager { RetryCounter retryCounter = pingRetryCounterFactory.create(); while (retryCounter.shouldRetry()) { try { - PayloadCarryingRpcController controller = newRpcController(); + HBaseRpcController controller = newRpcController(); AdminService.BlockingInterface admin = getRsAdmin(server); if (admin != null) { ServerInfo info = ProtobufUtil.getServerInfo(controller, admin); diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/protobuf/ReplicationProtbufUtil.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/protobuf/ReplicationProtbufUtil.java index 8cb2237..795dfad 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/protobuf/ReplicationProtbufUtil.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/protobuf/ReplicationProtbufUtil.java @@ -20,6 +20,8 @@ package org.apache.hadoop.hbase.protobuf; +import com.google.protobuf.ServiceException; + import java.io.IOException; import java.util.ArrayList; import java.util.Iterator; @@ -35,7 +37,8 @@ import org.apache.hadoop.hbase.CellUtil; import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.classification.InterfaceAudience; import org.apache.hadoop.hbase.io.SizedCellScanner; -import org.apache.hadoop.hbase.ipc.PayloadCarryingRpcController; +import org.apache.hadoop.hbase.ipc.HBaseRpcController; +import org.apache.hadoop.hbase.ipc.HBaseRpcControllerImpl; import org.apache.hadoop.hbase.protobuf.generated.AdminProtos; import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.AdminService; import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos; @@ -46,8 +49,6 @@ import org.apache.hadoop.hbase.util.Pair; import 
org.apache.hadoop.hbase.wal.WAL.Entry; import org.apache.hadoop.hbase.wal.WALKey; -import com.google.protobuf.ServiceException; - @InterfaceAudience.Private public class ReplicationProtbufUtil { /** @@ -66,7 +67,7 @@ public class ReplicationProtbufUtil { Pair p = buildReplicateWALEntryRequest(entries, null, replicationClusterId, sourceBaseNamespaceDir, sourceHFileArchiveDir); - PayloadCarryingRpcController controller = new PayloadCarryingRpcController(p.getSecond()); + HBaseRpcController controller = new HBaseRpcControllerImpl(p.getSecond()); try { admin.replicateWALEntry(controller, p.getFirst()); } catch (ServiceException se) { diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java index 3859d18..e857c8d 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java @@ -77,7 +77,7 @@ import org.apache.hadoop.hbase.exceptions.OutOfOrderScannerNextException; import org.apache.hadoop.hbase.filter.ByteArrayComparable; import org.apache.hadoop.hbase.filter.CompareFilter.CompareOp; import org.apache.hadoop.hbase.ipc.HBaseRPCErrorHandler; -import org.apache.hadoop.hbase.ipc.PayloadCarryingRpcController; +import org.apache.hadoop.hbase.ipc.HBaseRpcController; import org.apache.hadoop.hbase.ipc.PriorityFunction; import org.apache.hadoop.hbase.ipc.QosPriority; import org.apache.hadoop.hbase.ipc.RpcCallContext; @@ -465,7 +465,7 @@ public class RSRpcServices implements HBaseRPCErrorHandler, } private void addResult(final MutateResponse.Builder builder, final Result result, - final PayloadCarryingRpcController rpcc) { + final HBaseRpcController rpcc) { if (result == null) return; if (isClientCellBlockSupport()) { builder.setResult(ProtobufUtil.toResultNoData(result)); @@ -485,7 +485,7 @@ public class RSRpcServices implements HBaseRPCErrorHandler, builder.addCellsPerResult(res.size()); builder.addPartialFlagPerResult(res.isPartial()); } - ((PayloadCarryingRpcController)controller). + ((HBaseRpcController)controller). 
setCellScanner(CellUtil.createCellScanner(results)); } else { for (Result res: results) { @@ -1839,7 +1839,7 @@ public class RSRpcServices implements HBaseRPCErrorHandler, public ReplicateWALEntryResponse replay(final RpcController controller, final ReplicateWALEntryRequest request) throws ServiceException { long before = EnvironmentEdgeManager.currentTime(); - CellScanner cells = ((PayloadCarryingRpcController) controller).cellScanner(); + CellScanner cells = ((HBaseRpcController) controller).cellScanner(); try { checkOpen(); List entries = request.getEntryList(); @@ -1944,7 +1944,7 @@ public class RSRpcServices implements HBaseRPCErrorHandler, if (regionServer.replicationSinkHandler != null) { requestCount.increment(); List entries = request.getEntryList(); - CellScanner cellScanner = ((PayloadCarryingRpcController)controller).cellScanner(); + CellScanner cellScanner = ((HBaseRpcController)controller).cellScanner(); regionServer.getRegionServerCoprocessorHost().preReplicateLogEntries(entries, cellScanner); regionServer.replicationSinkHandler.replicateLogEntries(entries, cellScanner, request.getReplicationClusterId(), request.getSourceBaseNamespaceDirPath(), @@ -2208,10 +2208,10 @@ public class RSRpcServices implements HBaseRPCErrorHandler, } else if (r != null) { ClientProtos.Result pbr; RpcCallContext call = RpcServer.getCurrentCall(); - if (isClientCellBlockSupport(call) && controller instanceof PayloadCarryingRpcController + if (isClientCellBlockSupport(call) && controller instanceof HBaseRpcController && VersionInfoUtil.hasMinimumVersion(call.getClientVersionInfo(), 1, 3)) { pbr = ProtobufUtil.toResultNoData(r); - ((PayloadCarryingRpcController) controller).setCellScanner(CellUtil.createCellScanner(r + ((HBaseRpcController) controller).setCellScanner(CellUtil.createCellScanner(r .rawCells())); } else { pbr = ProtobufUtil.toResult(r); @@ -2298,7 +2298,7 @@ public class RSRpcServices implements HBaseRPCErrorHandler, // rpc controller is how we bring in data via the back door; it is unprotobuf'ed data. // It is also the conduit via which we pass back data. - PayloadCarryingRpcController controller = (PayloadCarryingRpcController)rpcc; + HBaseRpcController controller = (HBaseRpcController)rpcc; CellScanner cellScanner = controller != null ? controller.cellScanner(): null; if (controller != null) { controller.setCellScanner(null); @@ -2436,7 +2436,7 @@ public class RSRpcServices implements HBaseRPCErrorHandler, final MutateRequest request) throws ServiceException { // rpc controller is how we bring in data via the back door; it is unprotobuf'ed data. // It is also the conduit via which we pass back data. - PayloadCarryingRpcController controller = (PayloadCarryingRpcController)rpcc; + HBaseRpcController controller = (HBaseRpcController)rpcc; CellScanner cellScanner = controller != null ? controller.cellScanner() : null; OperationQuota quota = null; // Clear scanner so we are not holding on to reference across call. @@ -2768,9 +2768,9 @@ public class RSRpcServices implements HBaseRPCErrorHandler, scannerLeaseTimeoutPeriod > 0 ? 
scannerLeaseTimeoutPeriod : rpcTimeout; } if (controller != null) { - if (controller instanceof PayloadCarryingRpcController) { - PayloadCarryingRpcController pRpcController = - (PayloadCarryingRpcController)controller; + if (controller instanceof HBaseRpcController) { + HBaseRpcController pRpcController = + (HBaseRpcController)controller; if (pRpcController.getCallTimeout() > 0) { timeLimitDelta = Math.min(timeLimitDelta, pRpcController.getCallTimeout()); } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/RegionReplicaReplicationEndpoint.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/RegionReplicaReplicationEndpoint.java index c756294..9695aa9 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/RegionReplicaReplicationEndpoint.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/RegionReplicaReplicationEndpoint.java @@ -55,7 +55,7 @@ import org.apache.hadoop.hbase.client.RegionAdminServiceCallable; import org.apache.hadoop.hbase.client.RegionReplicaUtil; import org.apache.hadoop.hbase.client.RetryingCallable; import org.apache.hadoop.hbase.client.RpcRetryingCallerFactory; -import org.apache.hadoop.hbase.ipc.PayloadCarryingRpcController; +import org.apache.hadoop.hbase.ipc.HBaseRpcController; import org.apache.hadoop.hbase.ipc.RpcControllerFactory; import org.apache.hadoop.hbase.protobuf.ReplicationProtbufUtil; import org.apache.hadoop.hbase.protobuf.generated.AdminProtos; @@ -625,7 +625,7 @@ public class RegionReplicaReplicationEndpoint extends HBaseReplicationEndpoint { this.initialEncodedRegionName = regionInfo.getEncodedNameAsBytes(); } - public ReplicateWALEntryResponse call(PayloadCarryingRpcController controller) throws Exception { + public ReplicateWALEntryResponse call(HBaseRpcController controller) throws Exception { // Check whether we should still replay this entry. If the regions are changed, or the // entry is not coming form the primary region, filter it out because we do not need it. 
// Regions can change because of (1) region split (2) region merge (3) table recreated diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/TestMetaTableAccessorNoCluster.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/TestMetaTableAccessorNoCluster.java index affa9b3..e7afe48 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/TestMetaTableAccessorNoCluster.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/TestMetaTableAccessorNoCluster.java @@ -32,7 +32,7 @@ import org.apache.commons.logging.LogFactory; import org.apache.hadoop.hbase.client.ClusterConnection; import org.apache.hadoop.hbase.client.HConnectionTestingUtility; import org.apache.hadoop.hbase.client.Result; -import org.apache.hadoop.hbase.ipc.PayloadCarryingRpcController; +import org.apache.hadoop.hbase.ipc.HBaseRpcController; import org.apache.hadoop.hbase.protobuf.generated.ClientProtos; import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.ScanRequest; import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.ScanResponse; @@ -165,7 +165,7 @@ public class TestMetaTableAccessorNoCluster { .thenReturn(ScanResponse.newBuilder().setScannerId(1234567890L).build()) .thenAnswer(new Answer() { public ScanResponse answer(InvocationOnMock invocation) throws Throwable { - ((PayloadCarryingRpcController) invocation.getArguments()[0]).setCellScanner(CellUtil + ((HBaseRpcController) invocation.getArguments()[0]).setCellScanner(CellUtil .createCellScanner(cellScannables)); return builder.build(); } diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/TestMetaTableLocator.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/TestMetaTableLocator.java index ba6e1d4..f14eaaf 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/TestMetaTableLocator.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/TestMetaTableLocator.java @@ -31,7 +31,7 @@ import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.client.ClusterConnection; import org.apache.hadoop.hbase.client.HConnectionTestingUtility; -import org.apache.hadoop.hbase.ipc.PayloadCarryingRpcController; +import org.apache.hadoop.hbase.ipc.HBaseRpcController; import org.apache.hadoop.hbase.ipc.RpcControllerFactory; import org.apache.hadoop.hbase.ipc.ServerNotRunningYetException; import org.apache.hadoop.hbase.master.RegionState; @@ -254,7 +254,7 @@ public class TestMetaTableLocator { thenReturn(implementation); RpcControllerFactory controllerFactory = Mockito.mock(RpcControllerFactory.class); Mockito.when(controllerFactory.newController()).thenReturn( - Mockito.mock(PayloadCarryingRpcController.class)); + Mockito.mock(HBaseRpcController.class)); Mockito.when(connection.getRpcControllerFactory()).thenReturn(controllerFactory); ServerName sn = ServerName.valueOf("example.com", 1234, System.currentTimeMillis()); diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestHBaseAdminNoCluster.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestHBaseAdminNoCluster.java index 3948d18..25620aa 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestHBaseAdminNoCluster.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestHBaseAdminNoCluster.java @@ -35,7 +35,7 @@ import org.apache.hadoop.hbase.MasterNotRunningException; import org.apache.hadoop.hbase.PleaseHoldException; import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.ZooKeeperConnectionException; 
-import org.apache.hadoop.hbase.ipc.PayloadCarryingRpcController; +import org.apache.hadoop.hbase.ipc.HBaseRpcController; import org.apache.hadoop.hbase.ipc.RpcControllerFactory; import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.BalanceRequest; import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.CreateTableRequest; @@ -301,7 +301,7 @@ public class TestHBaseAdminNoCluster { RpcControllerFactory rpcControllerFactory = Mockito.mock(RpcControllerFactory.class); Mockito.when(connection.getRpcControllerFactory()).thenReturn(rpcControllerFactory); Mockito.when(rpcControllerFactory.newController()).thenReturn( - Mockito.mock(PayloadCarryingRpcController.class)); + Mockito.mock(HBaseRpcController.class)); // we need a real retrying caller RpcRetryingCallerFactory callerFactory = new RpcRetryingCallerFactory(configuration); diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestHCM.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestHCM.java index bda80de..4e61fd3 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestHCM.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestHCM.java @@ -58,7 +58,7 @@ import org.apache.hadoop.hbase.exceptions.DeserializationException; import org.apache.hadoop.hbase.exceptions.RegionMovedException; import org.apache.hadoop.hbase.filter.Filter; import org.apache.hadoop.hbase.filter.FilterBase; -import org.apache.hadoop.hbase.ipc.PayloadCarryingRpcController; +import org.apache.hadoop.hbase.ipc.HBaseRpcController; import org.apache.hadoop.hbase.ipc.RpcClient; import org.apache.hadoop.hbase.ipc.RpcControllerFactory; import org.apache.hadoop.hbase.master.HMaster; @@ -548,7 +548,7 @@ public class TestHCM { (ClusterConnection) TEST_UTIL.getConnection(), new RpcControllerFactory(TEST_UTIL.getConfiguration()), tableName, ROW) { @Override - public Object call(PayloadCarryingRpcController controller) throws Exception { + public Object call(HBaseRpcController controller) throws Exception { return null; } }; diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestRpcControllerFactory.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestRpcControllerFactory.java index 1f093fe..c1c9b1e 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestRpcControllerFactory.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestRpcControllerFactory.java @@ -36,8 +36,8 @@ import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.coprocessor.CoprocessorHost; import org.apache.hadoop.hbase.coprocessor.ProtobufCoprocessorService; -import org.apache.hadoop.hbase.ipc.DelegatingPayloadCarryingRpcController; -import org.apache.hadoop.hbase.ipc.PayloadCarryingRpcController; +import org.apache.hadoop.hbase.ipc.DelegatingHBaseRpcController; +import org.apache.hadoop.hbase.ipc.HBaseRpcController; import org.apache.hadoop.hbase.ipc.RpcControllerFactory; import org.apache.hadoop.hbase.testclassification.ClientTests; import org.apache.hadoop.hbase.testclassification.MediumTests; @@ -57,27 +57,27 @@ public class TestRpcControllerFactory { } @Override - public PayloadCarryingRpcController newController() { + public HBaseRpcController newController() { return new CountingRpcController(super.newController()); } @Override - public PayloadCarryingRpcController newController(final CellScanner cellScanner) { + public HBaseRpcController newController(final CellScanner cellScanner) { return 
new CountingRpcController(super.newController(cellScanner)); } @Override - public PayloadCarryingRpcController newController(final List cellIterables) { + public HBaseRpcController newController(final List cellIterables) { return new CountingRpcController(super.newController(cellIterables)); } } - public static class CountingRpcController extends DelegatingPayloadCarryingRpcController { + public static class CountingRpcController extends DelegatingHBaseRpcController { private static AtomicInteger INT_PRIORITY = new AtomicInteger(); private static AtomicInteger TABLE_PRIORITY = new AtomicInteger(); - public CountingRpcController(PayloadCarryingRpcController delegate) { + public CountingRpcController(HBaseRpcController delegate) { super(delegate); } diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/AbstractTestIPC.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/AbstractTestIPC.java index 771ef93..9686afb 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/AbstractTestIPC.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/AbstractTestIPC.java @@ -103,7 +103,7 @@ public abstract class AbstractTestIPC { try (AbstractRpcClient client = createRpcClientNoCodec(conf)) { rpcServer.start(); BlockingInterface stub = newBlockingStub(client, rpcServer.getListenerAddress()); - PayloadCarryingRpcController pcrc = new PayloadCarryingRpcController(); + HBaseRpcController pcrc = new HBaseRpcControllerImpl(); String message = "hello"; assertEquals(message, stub.echo(pcrc, EchoRequestProto.newBuilder().setMessage(message).build()).getMessage()); @@ -133,8 +133,7 @@ public abstract class AbstractTestIPC { try (AbstractRpcClient client = createRpcClient(conf)) { rpcServer.start(); BlockingInterface stub = newBlockingStub(client, rpcServer.getListenerAddress()); - PayloadCarryingRpcController pcrc = new PayloadCarryingRpcController( - CellUtil.createCellScanner(cells)); + HBaseRpcController pcrc = new HBaseRpcControllerImpl(CellUtil.createCellScanner(cells)); String message = "hello"; assertEquals(message, stub.echo(pcrc, EchoRequestProto.newBuilder().setMessage(message).build()).getMessage()); @@ -210,7 +209,7 @@ public abstract class AbstractTestIPC { // set total RPC size bigger than 100 bytes EchoRequestProto param = EchoRequestProto.newBuilder().setMessage(message.toString()).build(); stub.echo( - new PayloadCarryingRpcController(CellUtil.createCellScanner(ImmutableList. of(CELL))), + new HBaseRpcControllerImpl(CellUtil.createCellScanner(ImmutableList. 
of(CELL))), param); fail("RPC should have failed because it exceeds max request size"); } catch (ServiceException e) { @@ -264,7 +263,7 @@ public abstract class AbstractTestIPC { try (AbstractRpcClient client = createRpcClient(CONF)) { rpcServer.start(); BlockingInterface stub = newBlockingStub(client, rpcServer.getListenerAddress()); - PayloadCarryingRpcController pcrc = new PayloadCarryingRpcController(); + HBaseRpcController pcrc = new HBaseRpcControllerImpl(); int ms = 1000; int timeout = 100; for (int i = 0; i < 10; i++) { diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestProtobufRpcServiceImpl.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestProtobufRpcServiceImpl.java index ce7521e..ae658a3 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestProtobufRpcServiceImpl.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestProtobufRpcServiceImpl.java @@ -76,8 +76,8 @@ public class TestProtobufRpcServiceImpl implements BlockingInterface { @Override public EchoResponseProto echo(RpcController controller, EchoRequestProto request) throws ServiceException { - if (controller instanceof PayloadCarryingRpcController) { - PayloadCarryingRpcController pcrc = (PayloadCarryingRpcController) controller; + if (controller instanceof HBaseRpcController) { + HBaseRpcController pcrc = (HBaseRpcController) controller; // If cells, scan them to check we are able to iterate what we were given and since this is an // echo, just put them back on the controller creating a new block. Tests our block building. CellScanner cellScanner = pcrc.cellScanner(); @@ -93,7 +93,7 @@ public class TestProtobufRpcServiceImpl implements BlockingInterface { } } cellScanner = CellUtil.createCellScanner(list); - ((PayloadCarryingRpcController) controller).setCellScanner(cellScanner); + pcrc.setCellScanner(cellScanner); } return EchoResponseProto.newBuilder().setMessage(request.getMessage()).build(); } diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/MockRegionServer.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/MockRegionServer.java index 1940afa..2e47eb7 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/MockRegionServer.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/MockRegionServer.java @@ -44,7 +44,7 @@ import org.apache.hadoop.hbase.client.Get; import org.apache.hadoop.hbase.client.Result; import org.apache.hadoop.hbase.client.Scan; import org.apache.hadoop.hbase.executor.ExecutorService; -import org.apache.hadoop.hbase.ipc.PayloadCarryingRpcController; +import org.apache.hadoop.hbase.ipc.HBaseRpcController; import org.apache.hadoop.hbase.ipc.RpcServerInterface; import org.apache.hadoop.hbase.master.TableLockManager.NullTableLockManager; import org.apache.hadoop.hbase.protobuf.ProtobufUtil; @@ -412,7 +412,7 @@ ClientProtos.ClientService.BlockingInterface, RegionServerServices { builder.addCellsPerResult(result.size()); List results = new ArrayList(1); results.add(result); - ((PayloadCarryingRpcController) controller).setCellScanner(CellUtil + ((HBaseRpcController) controller).setCellScanner(CellUtil .createCellScanner(results)); builder.setMoreResults(true); } diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestEndToEndSplitTransaction.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestEndToEndSplitTransaction.java index 1c1a603..f756ebe 100644 --- 
a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestEndToEndSplitTransaction.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestEndToEndSplitTransaction.java @@ -22,6 +22,10 @@ import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertTrue; +import com.google.common.collect.Iterators; +import com.google.common.collect.Sets; +import com.google.protobuf.ServiceException; + import java.io.IOException; import java.util.ArrayList; import java.util.List; @@ -52,7 +56,7 @@ import org.apache.hadoop.hbase.client.RegionLocator; import org.apache.hadoop.hbase.client.Result; import org.apache.hadoop.hbase.client.Scan; import org.apache.hadoop.hbase.client.Table; -import org.apache.hadoop.hbase.ipc.PayloadCarryingRpcController; +import org.apache.hadoop.hbase.ipc.HBaseRpcControllerImpl; import org.apache.hadoop.hbase.protobuf.ProtobufUtil; import org.apache.hadoop.hbase.protobuf.RequestConverter; import org.apache.hadoop.hbase.protobuf.generated.ClientProtos; @@ -69,10 +73,6 @@ import org.junit.BeforeClass; import org.junit.Test; import org.junit.experimental.categories.Category; -import com.google.common.collect.Iterators; -import com.google.common.collect.Sets; -import com.google.protobuf.ServiceException; - @Category(LargeTests.class) public class TestEndToEndSplitTransaction { private static final Log LOG = LogFactory.getLog(TestEndToEndSplitTransaction.class); @@ -164,7 +164,7 @@ public class TestEndToEndSplitTransaction { regionName, new Scan(row), 1, true); try { server.getRSRpcServices().scan( - new PayloadCarryingRpcController(), scanRequest); + new HBaseRpcControllerImpl(), scanRequest); } catch (ServiceException se) { throw ProtobufUtil.getRemoteException(se); } -- 1.9.1
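The hunks above convert every caller from the concrete PayloadCarryingRpcController to the new HBaseRpcController interface handed out by RpcControllerFactory. The sketch below illustrates the calling pattern the patch standardizes on (factory-created controller, table-based priority, per-call timeout); the class name, helper method, table name, and timeout value are illustrative and not part of the patch.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.ipc.HBaseRpcController;
import org.apache.hadoop.hbase.ipc.RpcControllerFactory;

public class HBaseRpcControllerUsageSketch {

  // Hypothetical helper mirroring what the patched callers (HBaseAdmin, ServerManager,
  // AccessControlClient) now do before issuing an admin RPC: ask the factory for a
  // controller, then set priority and timeout on it.
  static HBaseRpcController newConfiguredController(Configuration conf, TableName table,
      int callTimeoutMs) {
    RpcControllerFactory factory = new RpcControllerFactory(conf);
    HBaseRpcController controller = factory.newController();
    controller.setPriority(table);            // table-based priority, as in the split/grant callers
    controller.setCallTimeout(callTimeoutMs); // per-call timeout, as set server-side in RpcServer
    return controller;
  }

  public static void main(String[] args) {
    Configuration conf = HBaseConfiguration.create();
    HBaseRpcController controller =
        newConfiguredController(conf, TableName.valueOf("someTable"), 60000);
    System.out.println("call timeout = " + controller.getCallTimeout() + " ms");
  }
}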
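TestRpcControllerFactory exercises the renamed DelegatingHBaseRpcController by wrapping the factory's controllers and counting priority calls. The sketch below shows that wrapping pattern in isolation; the class and counter names are illustrative, and only setPriority(TableName) is intercepted here.

import java.util.concurrent.atomic.AtomicInteger;

import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.ipc.DelegatingHBaseRpcController;
import org.apache.hadoop.hbase.ipc.HBaseRpcController;

// Wraps a factory-created controller and counts how often a table priority is set,
// while delegating every other HBaseRpcController method to the wrapped instance.
public class PriorityCountingRpcController extends DelegatingHBaseRpcController {

  static final AtomicInteger TABLE_PRIORITY_CALLS = new AtomicInteger();

  public PriorityCountingRpcController(HBaseRpcController delegate) {
    super(delegate);
  }

  @Override
  public void setPriority(TableName tn) {
    TABLE_PRIORITY_CALLS.incrementAndGet();
    super.setPriority(tn);
  }
}

A custom RpcControllerFactory subclass returns such wrappers from its newController() overloads, which is how the test plugs its counting controller into the client path.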
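The new HBaseRpcControllerImpl keeps the payload-carrying behaviour: cell blocks still travel on the controller rather than inside the protobuf message, which is what ReplicationProtbufUtil, RSRpcServices, and the new TestHBaseRpcControllerImpl above rely on. A minimal round-trip sketch, assuming a single KeyValue as the payload:

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellScanner;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.ipc.HBaseRpcController;
import org.apache.hadoop.hbase.ipc.HBaseRpcControllerImpl;
import org.apache.hadoop.hbase.util.Bytes;

public class CellPayloadSketch {

  public static void main(String[] args) throws IOException {
    // Sender side: attach the cell block to the controller instead of the protobuf body.
    List<Cell> cells = new ArrayList<Cell>();
    cells.add(new KeyValue(Bytes.toBytes("row"), Bytes.toBytes("f"),
        Bytes.toBytes("q"), Bytes.toBytes("value")));
    HBaseRpcController controller =
        new HBaseRpcControllerImpl(CellUtil.createCellScanner(cells));

    // Receiver side: drain the payload back off the same controller.
    CellScanner scanner = controller.cellScanner();
    while (scanner.advance()) {
      Cell cell = scanner.current();
      System.out.println(Bytes.toString(cell.getValueArray(), cell.getValueOffset(),
          cell.getValueLength()));
    }
  }
}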