diff --git a/hbase-it/src/test/java/org/apache/hadoop/hbase/ipc/IntegrationTestRpcClient.java b/hbase-it/src/test/java/org/apache/hadoop/hbase/ipc/IntegrationTestRpcClient.java index 7ce86bd..34c53b9 100644 --- a/hbase-it/src/test/java/org/apache/hadoop/hbase/ipc/IntegrationTestRpcClient.java +++ b/hbase-it/src/test/java/org/apache/hadoop/hbase/ipc/IntegrationTestRpcClient.java @@ -81,7 +81,7 @@ public class IntegrationTestRpcClient { TestRpcServer(RpcScheduler scheduler, Configuration conf) throws IOException { super(null, "testRpcServer", Lists .newArrayList(new BlockingServiceAndInterface(SERVICE, null)), new InetSocketAddress( - "localhost", 0), conf, scheduler); + "localhost", 0), conf, scheduler, null); } @Override diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcServer.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcServer.java index 7bcf3a7..b9c110f 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcServer.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcServer.java @@ -99,7 +99,9 @@ import org.apache.hadoop.hbase.io.ByteBufferPool; import org.apache.hadoop.hbase.io.crypto.aes.CryptoAES; import org.apache.hadoop.hbase.monitoring.MonitoredRPCHandler; import org.apache.hadoop.hbase.monitoring.TaskMonitor; +import org.apache.hadoop.hbase.regionserver.RSRpcServices; import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil; +import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos; import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.VersionInfo; import org.apache.hadoop.hbase.shaded.protobuf.generated.RPCProtos; import org.apache.hadoop.hbase.shaded.protobuf.generated.RPCProtos.CellBlockMeta; @@ -305,6 +307,8 @@ public class RpcServer implements RpcServerInterface, ConfigurationObserver { private volatile boolean allowFallbackToSimpleAuth; + private final RSRpcServices rsRpcServices; + /** * Datastructure that holds all necessary to a method invocation and then afterward, carries * the result. @@ -2281,11 +2285,12 @@ public class RpcServer implements RpcServerInterface, ConfigurationObserver { * @param bindAddress Where to listen * @param conf * @param scheduler + * @param rsRpcServices The {@link RSRpcServices} instance to get details for scan.next */ public RpcServer(final Server server, final String name, final List services, final InetSocketAddress bindAddress, Configuration conf, - RpcScheduler scheduler) + RpcScheduler scheduler, RSRpcServices rsRpcServices) throws IOException { if (conf.getBoolean("hbase.ipc.server.reservoir.enabled", true)) { this.reservoir = new ByteBufferPool( @@ -2337,6 +2342,8 @@ public class RpcServer implements RpcServerInterface, ConfigurationObserver { this.scheduler = scheduler; this.scheduler.init(new RpcSchedulerContext(this)); + + this.rsRpcServices = rsRpcServices; } @Override @@ -2554,6 +2561,16 @@ public class RpcServer implements RpcServerInterface, ConfigurationObserver { responseInfo.put("method", methodName); responseInfo.put("call", call); responseInfo.put("param", ProtobufUtil.getShortTextFormat(param)); + if (param instanceof ClientProtos.ScanRequest && rsRpcServices != null) { + ClientProtos.ScanRequest request = ((ClientProtos.ScanRequest) param); + if (request.hasScannerId()) { + long scannerId = request.getScannerId(); + String scanDetails = rsRpcServices.getScanDetailsWithId(scannerId); + if (scanDetails != null) { + responseInfo.put("scandetails", scanDetails); + } + } + } LOG.warn("(response" + tag + "): " + MAPPER.writeValueAsString(responseInfo)); } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java index 35b2ab0..46882ef 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java @@ -1064,7 +1064,7 @@ public class RSRpcServices implements HBaseRPCErrorHandler, rpcServer = new RpcServer(rs, name, getServices(), bindAddress, // use final bindAddress for this server. rs.conf, - rpcSchedulerFactory.create(rs.conf, this, rs)); + rpcSchedulerFactory.create(rs.conf, this, rs), this); } catch (BindException be) { String configName = (this instanceof MasterRpcServices) ? HConstants.MASTER_PORT : HConstants.REGIONSERVER_PORT; @@ -1130,6 +1130,17 @@ public class RSRpcServices implements HBaseRPCErrorHandler, return null; } + public String getScanDetailsWithId(long scannerId) { + RegionScanner scanner = getScanner(scannerId); + if (scanner == null) { + return null; + } + StringBuilder builder = new StringBuilder(); + builder.append("table: ").append(scanner.getRegionInfo().getTable().getNameAsString()); + builder.append(" region: ").append(scanner.getRegionInfo().getRegionNameAsString()); + return builder.toString(); + } + /** * Get the vtime associated with the scanner. * Currently the vtime is the number of "next" calls. diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/AbstractTestIPC.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/AbstractTestIPC.java index 2211e8f..c782fda 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/AbstractTestIPC.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/AbstractTestIPC.java @@ -94,7 +94,7 @@ public abstract class AbstractTestIPC { TestRpcServer(RpcScheduler scheduler, Configuration conf) throws IOException { super(null, "testRpcServer", Lists.newArrayList(new BlockingServiceAndInterface(SERVICE, null)), - new InetSocketAddress("localhost", 0), conf, scheduler); + new InetSocketAddress("localhost", 0), conf, scheduler, null); } } diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestProtoBufRpc.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestProtoBufRpc.java index 9a02d5b..7b48bbd 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestProtoBufRpc.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestProtoBufRpc.java @@ -67,7 +67,7 @@ public class TestProtoBufRpc { // Get RPC server for server side implementation this.server = new RpcServer(null, "testrpc", Lists.newArrayList(new RpcServer.BlockingServiceAndInterface(SERVICE, null)), - new InetSocketAddress(ADDRESS, PORT), conf, new FifoRpcScheduler(conf, 10)); + new InetSocketAddress(ADDRESS, PORT), conf, new FifoRpcScheduler(conf, 10), null); InetSocketAddress address = server.getListenerAddress(); if (address == null) { throw new IOException("Listener channel is closed"); diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestRpcHandlerException.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestRpcHandlerException.java index 8eed01c..343d03b 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestRpcHandlerException.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestRpcHandlerException.java @@ -52,7 +52,7 @@ public class TestRpcHandlerException { TestRpcServer(RpcScheduler scheduler) throws IOException { super(null, "testRpcServer", Lists.newArrayList(new BlockingServiceAndInterface((BlockingService) SERVICE, null)), - new InetSocketAddress("localhost", 0), CONF, scheduler); + new InetSocketAddress("localhost", 0), CONF, scheduler, null); } } diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/security/TestSecureIPC.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/security/TestSecureIPC.java index b7d6f87..a0671e8 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/security/TestSecureIPC.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/security/TestSecureIPC.java @@ -252,7 +252,7 @@ public class TestSecureIPC { RpcServerInterface rpcServer = new RpcServer(null, "AbstractTestSecureIPC", Lists.newArrayList(new RpcServer.BlockingServiceAndInterface((BlockingService) SERVICE, null)), isa, - serverConf, new FifoRpcScheduler(serverConf, 1)); + serverConf, new FifoRpcScheduler(serverConf, 1), null); rpcServer.start(); try (RpcClient rpcClient = RpcClientFactory.createClient(clientConf, HConstants.DEFAULT_CLUSTER_ID.toString())) { diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/security/token/TestTokenAuthentication.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/security/token/TestTokenAuthentication.java index b7517bf0..42f3028 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/security/token/TestTokenAuthentication.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/security/token/TestTokenAuthentication.java @@ -189,7 +189,7 @@ public class TestTokenAuthentication { sai.add(new BlockingServiceAndInterface(proxy, AuthenticationProtos.AuthenticationService.BlockingInterface.class)); this.rpcServer = - new RpcServer(this, "tokenServer", sai, initialIsa, conf, new FifoRpcScheduler(conf, 1)); + new RpcServer(this, "tokenServer", sai, initialIsa, conf, new FifoRpcScheduler(conf, 1), null); InetSocketAddress address = rpcServer.getListenerAddress(); if (address == null) { throw new IOException("Listener channel is closed");