diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcServer.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcServer.java index e603c4a..08860d6 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcServer.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcServer.java @@ -58,7 +58,6 @@ import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentLinkedDeque; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; -import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReentrantLock; @@ -68,20 +67,17 @@ import javax.security.sasl.SaslServer; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; -import org.apache.hadoop.hbase.CallQueueTooBigException; -import org.apache.hadoop.hbase.classification.InterfaceAudience; -import org.apache.hadoop.hbase.classification.InterfaceStability; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.CallQueueTooBigException; import org.apache.hadoop.hbase.CellScanner; import org.apache.hadoop.hbase.DoNotRetryIOException; import org.apache.hadoop.hbase.HBaseIOException; import org.apache.hadoop.hbase.HBaseInterfaceAudience; import org.apache.hadoop.hbase.HConstants; -import org.apache.hadoop.hbase.HRegionInfo; import org.apache.hadoop.hbase.Server; -import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.classification.InterfaceAudience; +import org.apache.hadoop.hbase.classification.InterfaceStability; import org.apache.hadoop.hbase.client.NeedUnmanagedConnectionException; -import org.apache.hadoop.hbase.client.Operation; import org.apache.hadoop.hbase.client.VersionInfoUtil; import org.apache.hadoop.hbase.codec.Codec; import org.apache.hadoop.hbase.conf.ConfigurationObserver; @@ -90,7 +86,6 @@ import org.apache.hadoop.hbase.exceptions.RequestTooBigException; import org.apache.hadoop.hbase.io.BoundedByteBufferPool; import org.apache.hadoop.hbase.io.ByteBufferInputStream; import org.apache.hadoop.hbase.io.ByteBufferOutputStream; -import org.apache.hadoop.hbase.io.BoundedByteBufferPool; import org.apache.hadoop.hbase.monitoring.MonitoredRPCHandler; import org.apache.hadoop.hbase.monitoring.TaskMonitor; import org.apache.hadoop.hbase.protobuf.ProtobufUtil; @@ -102,17 +97,16 @@ import org.apache.hadoop.hbase.protobuf.generated.RPCProtos.ExceptionResponse; import org.apache.hadoop.hbase.protobuf.generated.RPCProtos.RequestHeader; import org.apache.hadoop.hbase.protobuf.generated.RPCProtos.ResponseHeader; import org.apache.hadoop.hbase.protobuf.generated.RPCProtos.UserInformation; -import org.apache.hadoop.hbase.regionserver.HRegionServer; import org.apache.hadoop.hbase.regionserver.RSRpcServices; import org.apache.hadoop.hbase.security.AccessDeniedException; import org.apache.hadoop.hbase.security.AuthMethod; import org.apache.hadoop.hbase.security.HBasePolicyProvider; import org.apache.hadoop.hbase.security.HBaseSaslRpcServer; -import org.apache.hadoop.hbase.security.User; import org.apache.hadoop.hbase.security.HBaseSaslRpcServer.SaslDigestCallbackHandler; import org.apache.hadoop.hbase.security.HBaseSaslRpcServer.SaslGssCallbackHandler; import org.apache.hadoop.hbase.security.SaslStatus; import org.apache.hadoop.hbase.security.SaslUtil; +import org.apache.hadoop.hbase.security.User; import org.apache.hadoop.hbase.security.UserProvider; import org.apache.hadoop.hbase.security.token.AuthenticationTokenSecretManager; import org.apache.hadoop.hbase.util.Bytes; @@ -134,8 +128,8 @@ import org.apache.hadoop.security.token.SecretManager; import org.apache.hadoop.security.token.SecretManager.InvalidToken; import org.apache.hadoop.security.token.TokenIdentifier; import org.apache.hadoop.util.StringUtils; -import org.codehaus.jackson.map.ObjectMapper; import org.apache.htrace.TraceInfo; +import org.codehaus.jackson.map.ObjectMapper; import com.google.common.util.concurrent.ThreadFactoryBuilder; import com.google.protobuf.BlockingService; @@ -260,7 +254,9 @@ public class RpcServer implements RpcServerInterface, ConfigurationObserver { //maintain a list //of client connections private Listener listener = null; - protected Responder responder = null; + private int writeThreads; // number of write threads + private int currentWriter = 0; + protected Responder[] responders = null; protected AuthenticationTokenSecretManager authTokenSecretMgr = null; protected int numConnections = 0; @@ -908,6 +904,7 @@ public class RpcServer implements RpcServerInterface, ConfigurationObserver { reader.startAdd(); SelectionKey readKey = reader.registerChannel(channel); c = getConnection(channel, System.currentTimeMillis()); + c.responder = getResponder(); readKey.attach(c); synchronized (connectionList) { connectionList.add(numConnections, c); @@ -975,6 +972,11 @@ public class RpcServer implements RpcServerInterface, ConfigurationObserver { currentReader = (currentReader + 1) % readers.length; return readers[currentReader]; } + + Responder getResponder() { + currentWriter = (currentWriter + 1) % responders.length; + return responders[currentWriter]; + } } // Sends responses of RPC back to clients. @@ -1316,6 +1318,8 @@ public class RpcServer implements RpcServerInterface, ConfigurationObserver { protected User user = null; protected UserGroupInformation ugi = null; + protected Responder responder = null; + public Connection(SocketChannel channel, long lastContact) { this.channel = channel; this.lastContact = lastContact; @@ -2187,7 +2191,13 @@ public class RpcServer implements RpcServerInterface, ConfigurationObserver { // Create the responder here - responder = new Responder(); + this.writeThreads = conf.getInt("hbase.ipc.server.write.threadpool.size", + 10); + LOG.info("hbase.ipc.server.write.threadpool.size: " + writeThreads); + responders = new Responder[writeThreads]; + for (int i = 0; i < writeThreads; ++i) { + responders[i] = new Responder(); + } this.authorize = conf.getBoolean(HADOOP_SECURITY_AUTHORIZATION, false); this.userProvider = UserProvider.instantiate(conf); this.isSecurityEnabled = userProvider.isHBaseSecurityEnabled(); @@ -2279,7 +2289,9 @@ public class RpcServer implements RpcServerInterface, ConfigurationObserver { } this.authManager = new ServiceAuthorizationManager(); HBasePolicyProvider.init(conf, authManager); - responder.start(); + for (int i = 0; i < writeThreads; ++i) { + responders[i].start(); + } listener.start(); scheduler.start(); started = true; @@ -2445,7 +2457,9 @@ public class RpcServer implements RpcServerInterface, ConfigurationObserver { } listener.interrupt(); listener.doStop(); - responder.interrupt(); + for (int i = 0; i < writeThreads; ++i) { + responders[i].interrupt(); + } scheduler.stop(); notifyAll(); }