diff --git common/src/java/org/apache/hadoop/hive/conf/HiveConf.java common/src/java/org/apache/hadoop/hive/conf/HiveConf.java index a0a5f54..12d475f 100644 --- common/src/java/org/apache/hadoop/hive/conf/HiveConf.java +++ common/src/java/org/apache/hadoop/hive/conf/HiveConf.java @@ -1617,6 +1617,14 @@ HIVE_SECURITY_COMMAND_WHITELIST("hive.security.command.whitelist", "set,reset,dfs,add,list,delete,compile", "Comma separated list of non-SQL Hive commands users are authorized to execute"), + // server socket related + HIVE_SERVER2_READ_SOCKET_TIMEOUT("hive.server2.read.socket.timeout", "0s", + new TimeValidator(TimeUnit.SECONDS), + "Timeout for the HiveServer2 to close the connection if no response from the client.\n" + + "In http mode, it's translated to maxIdleTime of jetty connector."), + HIVE_SERVER2_TCP_KEEP_ALIVE("hive.server2.tcp.keepalive", false, + "Whether to enable TCP keepalive for HiveServer2. Not effective in http mode."), + HIVE_SERVER2_SESSION_CHECK_INTERVAL("hive.server2.session.check.interval", "0ms", new TimeValidator(TimeUnit.MILLISECONDS, 3000l, true, null, false), "The check interval for session/operation timeout, which can be disabled by setting to zero or negative value."), diff --git metastore/src/java/org/apache/hadoop/hive/metastore/TServerSocketKeepAlive.java metastore/src/java/org/apache/hadoop/hive/metastore/TServerSocketKeepAlive.java index 9ac18dc..c62a475 100644 --- metastore/src/java/org/apache/hadoop/hive/metastore/TServerSocketKeepAlive.java +++ metastore/src/java/org/apache/hadoop/hive/metastore/TServerSocketKeepAlive.java @@ -18,6 +18,7 @@ package org.apache.hadoop.hive.metastore; +import java.net.ServerSocket; import java.net.SocketException; import org.apache.thrift.transport.TServerSocket; @@ -30,6 +31,11 @@ * */ public class TServerSocketKeepAlive extends TServerSocket { + + public TServerSocketKeepAlive(ServerSocket serverSocket) throws TTransportException { + super(serverSocket); + } + public TServerSocketKeepAlive(int port) throws TTransportException { super(port, 0); } diff --git service/src/java/org/apache/hive/service/auth/HiveAuthFactory.java service/src/java/org/apache/hive/service/auth/HiveAuthFactory.java index a0f7667..4237f72 100644 --- service/src/java/org/apache/hive/service/auth/HiveAuthFactory.java +++ service/src/java/org/apache/hive/service/auth/HiveAuthFactory.java @@ -201,19 +201,19 @@ public static TTransport getSSLSocket(String host, int port, int loginTimeout, return TSSLTransportFactory.getClientSocket(host, port, loginTimeout, params); } - public static TServerSocket getServerSocket(String hiveHost, int portNum) - throws TTransportException { + public static TServerSocket getServerSocket(String hiveHost, int portNum, int socketTimeout) + throws TTransportException { InetSocketAddress serverAddress; if (hiveHost == null || hiveHost.isEmpty()) { serverAddress = new InetSocketAddress(portNum); } else { serverAddress = new InetSocketAddress(hiveHost, portNum); } - return new TServerSocket(serverAddress); + return new TServerSocket(serverAddress, socketTimeout); } - public static TServerSocket getServerSSLSocket(String hiveHost, int portNum, String keyStorePath, - String keyStorePassWord) throws TTransportException, UnknownHostException { + public static TServerSocket getServerSSLSocket(String hiveHost, int portNum, int socketTimeout, + String keyStorePath, String keyStorePassWord) throws TTransportException, UnknownHostException { TSSLTransportFactory.TSSLTransportParameters params = new TSSLTransportFactory.TSSLTransportParameters(); params.setKeyStore(keyStorePath, keyStorePassWord); @@ -224,7 +224,7 @@ public static TServerSocket getServerSSLSocket(String hiveHost, int portNum, Str } else { serverAddress = InetAddress.getByName(hiveHost); } - return TSSLTransportFactory.getServerSocket(portNum, 0, serverAddress, params); + return TSSLTransportFactory.getServerSocket(portNum, socketTimeout, serverAddress, params); } // retrieve delegation token for the given user diff --git service/src/java/org/apache/hive/service/cli/thrift/ThriftBinaryCLIService.java service/src/java/org/apache/hive/service/cli/thrift/ThriftBinaryCLIService.java index 2b80adc..1dac2b8 100644 --- service/src/java/org/apache/hive/service/cli/thrift/ThriftBinaryCLIService.java +++ service/src/java/org/apache/hive/service/cli/thrift/ThriftBinaryCLIService.java @@ -18,6 +18,7 @@ package org.apache.hive.service.cli.thrift; +import java.io.IOException; import java.net.InetSocketAddress; import java.util.concurrent.ExecutorService; import java.util.concurrent.SynchronousQueue; @@ -27,6 +28,8 @@ import org.apache.hadoop.hive.conf.HiveConf; import org.apache.hadoop.hive.conf.HiveConf.ConfVars; import org.apache.hadoop.hive.shims.ShimLoader; + +import org.apache.hadoop.hive.metastore.TServerSocketKeepAlive; import org.apache.hive.service.auth.HiveAuthFactory; import org.apache.hive.service.cli.CLIService; import org.apache.hive.service.server.ThreadFactoryWithGarbageCleanup; @@ -34,6 +37,7 @@ import org.apache.thrift.protocol.TBinaryProtocol; import org.apache.thrift.server.TThreadPoolServer; import org.apache.thrift.transport.TServerSocket; +import org.apache.thrift.transport.TTransportException; import org.apache.thrift.transport.TTransportFactory; @@ -77,20 +81,8 @@ public void run() { workerKeepAliveTime, TimeUnit.SECONDS, new SynchronousQueue(), new ThreadFactoryWithGarbageCleanup(threadPoolName)); - TServerSocket serverSocket = null; - if (!hiveConf.getBoolVar(ConfVars.HIVE_SERVER2_USE_SSL)) { - serverSocket = HiveAuthFactory.getServerSocket(hiveHost, portNum); - } else { - String keyStorePath = hiveConf.getVar(ConfVars.HIVE_SERVER2_SSL_KEYSTORE_PATH).trim(); - if (keyStorePath.isEmpty()) { - throw new IllegalArgumentException(ConfVars.HIVE_SERVER2_SSL_KEYSTORE_PATH.varname + - " Not configured for SSL connection"); - } - String keyStorePassword = ShimLoader.getHadoopShims().getPassword(hiveConf, - HiveConf.ConfVars.HIVE_SERVER2_SSL_KEYSTORE_PASSWORD.varname); - serverSocket = HiveAuthFactory.getServerSSLSocket(hiveHost, portNum, - keyStorePath, keyStorePassword); - } + TServerSocket serverSocket = createServerSocket(hiveHost); + TThreadPoolServer.Args sargs = new TThreadPoolServer.Args(serverSocket) .processorFactory(processorFactory) .transportFactory(transportFactory) @@ -108,4 +100,26 @@ public void run() { } } + + private TServerSocket createServerSocket(String hiveHost) throws TTransportException, IOException { + TServerSocket serverSocket; + if (!hiveConf.getBoolVar(ConfVars.HIVE_SERVER2_USE_SSL)) { + serverSocket = HiveAuthFactory.getServerSocket(hiveHost, portNum, socketTimeout); + } else { + String keyStorePath = hiveConf.getVar(ConfVars.HIVE_SERVER2_SSL_KEYSTORE_PATH).trim(); + if (keyStorePath.isEmpty()) { + throw new IllegalArgumentException(ConfVars.HIVE_SERVER2_SSL_KEYSTORE_PATH.varname + + " is not configured for SSL connection"); + } + String keyStorePassword = ShimLoader.getHadoopShims().getPassword(hiveConf, + HiveConf.ConfVars.HIVE_SERVER2_SSL_KEYSTORE_PASSWORD.varname); + serverSocket = HiveAuthFactory.getServerSSLSocket(hiveHost, portNum, socketTimeout, + keyStorePath, keyStorePassword); + } + if (socketKeepAlive) { + // TServerSocket is simple wrapper of internal server socket + serverSocket = new TServerSocketKeepAlive(serverSocket.getServerSocket()); + } + return serverSocket; + } } diff --git service/src/java/org/apache/hive/service/cli/thrift/ThriftCLIService.java service/src/java/org/apache/hive/service/cli/thrift/ThriftCLIService.java index 443c371..eff8a76 100644 --- service/src/java/org/apache/hive/service/cli/thrift/ThriftCLIService.java +++ service/src/java/org/apache/hive/service/cli/thrift/ThriftCLIService.java @@ -22,6 +22,7 @@ import java.net.InetSocketAddress; import java.util.HashMap; import java.util.Map; +import java.util.concurrent.TimeUnit; import javax.security.auth.login.LoginException; @@ -63,6 +64,9 @@ protected int maxWorkerThreads; protected long workerKeepAliveTime; + protected int socketTimeout; + protected boolean socketKeepAlive; + protected static HiveAuthFactory hiveAuthFactory; public ThriftCLIService(CLIService cliService, String serviceName) { @@ -73,6 +77,11 @@ public ThriftCLIService(CLIService cliService, String serviceName) { @Override public synchronized void init(HiveConf hiveConf) { this.hiveConf = hiveConf; + minWorkerThreads = hiveConf.getIntVar(ConfVars.HIVE_SERVER2_THRIFT_MIN_WORKER_THREADS); + maxWorkerThreads = hiveConf.getIntVar(ConfVars.HIVE_SERVER2_THRIFT_MAX_WORKER_THREADS); + socketTimeout = (int) hiveConf.getTimeVar( + ConfVars.HIVE_SERVER2_READ_SOCKET_TIMEOUT, TimeUnit.SECONDS); + socketKeepAlive = hiveConf.getBoolVar(ConfVars.HIVE_SERVER2_TCP_KEEP_ALIVE); super.init(hiveConf); } diff --git service/src/java/org/apache/hive/service/cli/thrift/ThriftHttpCLIService.java service/src/java/org/apache/hive/service/cli/thrift/ThriftHttpCLIService.java index 4067106..40b2ee2 100644 --- service/src/java/org/apache/hive/service/cli/thrift/ThriftHttpCLIService.java +++ service/src/java/org/apache/hive/service/cli/thrift/ThriftHttpCLIService.java @@ -110,11 +110,13 @@ public void run() { connector.setPort(portNum); // Linux:yes, Windows:no connector.setReuseAddress(!Shell.WINDOWS); - - int maxIdleTime = (int) hiveConf.getTimeVar( + + socketTimeout = (int) hiveConf.getTimeVar( ConfVars.HIVE_SERVER2_THRIFT_HTTP_MAX_IDLE_TIME, TimeUnit.MILLISECONDS); - connector.setMaxIdleTime(maxIdleTime); - + if (socketTimeout > 0) { + connector.setMaxIdleTime(socketTimeout); + } + httpServer.addConnector(connector); hiveAuthFactory = new HiveAuthFactory(hiveConf);