diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcServer.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcServer.java index 96f506f..64bf843 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcServer.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcServer.java @@ -20,9 +20,6 @@ package org.apache.hadoop.hbase.ipc; import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.HADOOP_SECURITY_AUTHORIZATION; -import com.google.common.annotations.VisibleForTesting; -import com.google.common.util.concurrent.ThreadFactoryBuilder; - import java.io.ByteArrayInputStream; import java.io.ByteArrayOutputStream; import java.io.DataOutputStream; @@ -103,16 +100,6 @@ import org.apache.hadoop.hbase.nio.ByteBuff; import org.apache.hadoop.hbase.nio.MultiByteBuff; import org.apache.hadoop.hbase.nio.SingleByteBuff; import org.apache.hadoop.hbase.regionserver.RSRpcServices; -import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil; -import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos; -import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.VersionInfo; -import org.apache.hadoop.hbase.shaded.protobuf.generated.RPCProtos; -import org.apache.hadoop.hbase.shaded.protobuf.generated.RPCProtos.CellBlockMeta; -import org.apache.hadoop.hbase.shaded.protobuf.generated.RPCProtos.ConnectionHeader; -import org.apache.hadoop.hbase.shaded.protobuf.generated.RPCProtos.ExceptionResponse; -import org.apache.hadoop.hbase.shaded.protobuf.generated.RPCProtos.RequestHeader; -import org.apache.hadoop.hbase.shaded.protobuf.generated.RPCProtos.ResponseHeader; -import org.apache.hadoop.hbase.shaded.protobuf.generated.RPCProtos.UserInformation; import org.apache.hadoop.hbase.security.AccessDeniedException; import org.apache.hadoop.hbase.security.AuthMethod; import org.apache.hadoop.hbase.security.HBasePolicyProvider; @@ -124,6 +111,26 @@ import org.apache.hadoop.hbase.security.SaslUtil; import org.apache.hadoop.hbase.security.User; import org.apache.hadoop.hbase.security.UserProvider; import org.apache.hadoop.hbase.security.token.AuthenticationTokenSecretManager; +import org.apache.hadoop.hbase.shaded.com.google.protobuf.BlockingService; +import org.apache.hadoop.hbase.shaded.com.google.protobuf.ByteInput; +import org.apache.hadoop.hbase.shaded.com.google.protobuf.ByteString; +import org.apache.hadoop.hbase.shaded.com.google.protobuf.CodedInputStream; +import org.apache.hadoop.hbase.shaded.com.google.protobuf.CodedOutputStream; +import org.apache.hadoop.hbase.shaded.com.google.protobuf.Descriptors.MethodDescriptor; +import org.apache.hadoop.hbase.shaded.com.google.protobuf.Message; +import org.apache.hadoop.hbase.shaded.com.google.protobuf.ServiceException; +import org.apache.hadoop.hbase.shaded.com.google.protobuf.TextFormat; +import org.apache.hadoop.hbase.shaded.com.google.protobuf.UnsafeByteOperations; +import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil; +import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos; +import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.VersionInfo; +import org.apache.hadoop.hbase.shaded.protobuf.generated.RPCProtos; +import org.apache.hadoop.hbase.shaded.protobuf.generated.RPCProtos.CellBlockMeta; +import org.apache.hadoop.hbase.shaded.protobuf.generated.RPCProtos.ConnectionHeader; +import org.apache.hadoop.hbase.shaded.protobuf.generated.RPCProtos.ExceptionResponse; +import org.apache.hadoop.hbase.shaded.protobuf.generated.RPCProtos.RequestHeader; +import org.apache.hadoop.hbase.shaded.protobuf.generated.RPCProtos.ResponseHeader; +import org.apache.hadoop.hbase.shaded.protobuf.generated.RPCProtos.UserInformation; import org.apache.hadoop.hbase.util.ByteBufferUtils; import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.Pair; @@ -147,16 +154,8 @@ import org.apache.hadoop.util.StringUtils; import org.apache.htrace.TraceInfo; import org.codehaus.jackson.map.ObjectMapper; -import org.apache.hadoop.hbase.shaded.com.google.protobuf.ByteString; -import org.apache.hadoop.hbase.shaded.com.google.protobuf.BlockingService; -import org.apache.hadoop.hbase.shaded.com.google.protobuf.ByteInput; -import org.apache.hadoop.hbase.shaded.com.google.protobuf.CodedInputStream; -import org.apache.hadoop.hbase.shaded.com.google.protobuf.CodedOutputStream; -import org.apache.hadoop.hbase.shaded.com.google.protobuf.Descriptors.MethodDescriptor; -import org.apache.hadoop.hbase.shaded.com.google.protobuf.Message; -import org.apache.hadoop.hbase.shaded.com.google.protobuf.ServiceException; -import org.apache.hadoop.hbase.shaded.com.google.protobuf.TextFormat; -import org.apache.hadoop.hbase.shaded.com.google.protobuf.UnsafeByteOperations; +import com.google.common.annotations.VisibleForTesting; +import com.google.common.util.concurrent.ThreadFactoryBuilder; /** * An RPC server that hosts protobuf described Services. @@ -270,7 +269,9 @@ public class RpcServer implements RpcServerInterface, ConfigurationObserver { // maintains the set of client connections and handles idle timeouts private ConnectionManager connectionManager; private Listener listener = null; - protected Responder responder = null; + private int writeThreads; // number of write threads + private int currentWriter = 0; + protected Responder[] responders = null; protected AuthenticationTokenSecretManager authTokenSecretMgr = null; protected HBaseRPCErrorHandler errorHandler = null; @@ -1024,6 +1025,7 @@ public class RpcServer implements RpcServerInterface, ConfigurationObserver { } continue; } + c.responder = getResponder(); key.attach(c); // so closeCurrentConnection can get the object reader.addConnection(c); } @@ -1076,6 +1078,11 @@ public class RpcServer implements RpcServerInterface, ConfigurationObserver { currentReader = (currentReader + 1) % readers.length; return readers[currentReader]; } + + Responder getResponder() { + currentWriter = (currentWriter + 1) % responders.length; + return responders[currentWriter]; + } } // Sends responses of RPC back to clients. @@ -1427,6 +1434,8 @@ public class RpcServer implements RpcServerInterface, ConfigurationObserver { protected User user = null; protected UserGroupInformation ugi = null; + protected Responder responder = null; + public Connection(SocketChannel channel, long lastContact) { this.channel = channel; this.lastContact = lastContact; @@ -2484,7 +2493,14 @@ public class RpcServer implements RpcServerInterface, ConfigurationObserver { // Create the responder here - responder = new Responder(); + this.writeThreads = conf.getInt("hbase.ipc.server.write.threadpool.size", + 10); + LOG.info("hbase.ipc.server.write.threadpool.size: " + writeThreads); + responders = new Responder[writeThreads]; + for (int i = 0; i < writeThreads; ++i) { + Responder responder = new Responder(); + responders[i] = responder; + } connectionManager = new ConnectionManager(); this.authorize = conf.getBoolean(HADOOP_SECURITY_AUTHORIZATION, false); this.userProvider = UserProvider.instantiate(conf); @@ -2577,7 +2593,9 @@ public class RpcServer implements RpcServerInterface, ConfigurationObserver { } this.authManager = new ServiceAuthorizationManager(); HBasePolicyProvider.init(conf, authManager); - responder.start(); + for (int i = 0; i < writeThreads; ++i) { + responders[i].start(); + } listener.start(); scheduler.start(); started = true; @@ -2761,7 +2779,9 @@ public class RpcServer implements RpcServerInterface, ConfigurationObserver { } listener.interrupt(); listener.doStop(); - responder.interrupt(); + for (int i = 0; i < writeThreads; ++i) { + responders[i].interrupt(); + } scheduler.stop(); notifyAll(); }